#!/usr/bin/env python3
"""
Step 2: Format AssemblyAI transcript into lines with timestamps and speaker labels.

Input:  JSON files in "_assembleai/" folder
Output: Formatted lines in "_lines/" folder

Output format:
    [mm:ss](Speaker) line content

Usage:
    uv run step2_format.py
"""

import sys
import json
import re
from pathlib import Path
from typing import List, Dict, Any, Tuple

# ============== Configuration ==============

# Split utterances wherever the gap between two consecutive words is at
# least this long (milliseconds).
PAUSE_THRESHOLD_MS = 1500

# Utterances that start before this offset (milliseconds) are treated as part
# of the opening song.
OPENING_SONG_THRESHOLD_MS = 15000

INPUT_DIR = Path("_assembleai")
OUTPUT_DIR = Path("_lines")

# Patterns for non-word utterances to merge
NON_WORD_PATTERNS = [
    # Chinese modal particles. This is a character class, so it lists single
    # characters only; multi-character particles (e.g. 哎呀) are matched through
    # the trailing "+". A "|" inside [...] would be a literal pipe, not
    # alternation, so none is used here.
    r'^[\s]*[嗯啊哦呃唉哎哈哼哟唔呦豁呀呜呼]+[\s]*$',
    r'^[\s]*[Mm]hm+[\s]*$',  # Mhm
    r'^[\s]*[Uu]h+[\s]*$',  # Uh
    r'^[\s]*[Uu]m+[\s]*$',  # Um
    r'^[\s]*[Aa]h+[\s]*$',  # Ah
    r'^[\s]*[Oo]h+[\s]*$',  # Oh
    r'^[\s]*[Hh]uh+[\s]*$',  # Huh
    r'^[\s]*[Hh]mm+[\s]*$',  # Hmm
    r'^[\s]*[Yy]eah?[\s]*$',  # Yeah (standalone)
    r'^[\s]*[Nn]o+[\s]*$',  # No (standalone)
    r'^[\s]*[Oo]k+[\s]*$',  # Ok
    r'^[\s]*[Oo]kay+[\s]*$',  # Okay
    r'^[\s]*[Rr]ight+[\s]*$',  # Right (standalone)
    r'^[\s]*[Ww]hat+[\s]*$',  # What (standalone)
    r'^[\s]*\([^)]*\)[\s]*$',  # (laughs), (coughs), etc.
    r'^[\s]*\[[^\]]*\][\s]*$',  # [laughs], [coughs], etc.
]
NON_WORD_REGEX = re.compile('|'.join(f'({p})' for p in NON_WORD_PATTERNS), re.IGNORECASE)

# A word ends a sentence when it finishes with ., ! or ?, optionally followed
# by closing quotes/brackets.
_SENTENCE_END_RE = re.compile(r'[.!?]+["\')\]]*$')

# Trailing sentence punctuation that is stripped from an extracted title.
_TRAILING_PUNCT_RE = re.compile(r'[.!?]+$')


def ensure_dirs():
    """Ensure output directories exist."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


def split_words_by_sentences(words: list) -> list:
    """Split words into sentence segments based on punctuation.

    Args:
        words: Word dicts; each word's "text" value is inspected.

    Returns:
        A list of word lists. A segment is closed after every word whose text
        ends with sentence punctuation; trailing words without such
        punctuation form a final segment.
    """
    if not words:
        return []

    segments = []
    current_segment = []
    for word in words:
        current_segment.append(word)
        if _SENTENCE_END_RE.search(word.get("text", "")):
            segments.append(current_segment)
            current_segment = []

    if current_segment:
        segments.append(current_segment)
    return segments


def _words_to_utterances(speaker: Any, words: list) -> list:
    """Turn one pause-delimited run of words into sentence-level utterances."""
    utterances = []
    for sentence in split_words_by_sentences(words):
        text = " ".join(w.get("text", "") for w in sentence).strip()
        if not text:
            continue
        utterances.append({
            "speaker": speaker,
            "text": text,
            "start": sentence[0].get("start", 0),
            "end": sentence[-1].get("end", 0),
            "words": sentence,
        })
    return utterances


def split_utterances_by_pauses(utterances: list, pause_threshold_ms: int = PAUSE_THRESHOLD_MS) -> list:
    """Split utterances based on pauses between words and sentence boundaries.

    Args:
        utterances: Utterance dicts. Those with a non-empty "words" list are
            re-segmented; those without are passed through unchanged.
        pause_threshold_ms: Minimum gap (previous word "end" to next word
            "start") that forces a split.

    Returns:
        A new list of utterance dicts. Each re-segmented utterance carries
        "speaker", "text", "start", "end" and "words".
    """
    result = []
    for utt in utterances:
        words = utt.get("words", [])
        if not words:
            result.append(utt)
            continue

        speaker = utt.get("speaker", "?")
        run = [words[0]]
        for word in words[1:]:
            gap = word.get("start", 0) - run[-1].get("end", 0)
            if gap >= pause_threshold_ms:
                # Pause reached: emit the run (split further by sentence).
                result.extend(_words_to_utterances(speaker, run))
                run = [word]
            else:
                run.append(word)
        result.extend(_words_to_utterances(speaker, run))

    return result


def format_timestamp(ms: int) -> str:
    """Format milliseconds as [mm:ss].

    Fractional input is truncated and negative input is clamped to zero so
    the result is always a well-formed timestamp.
    """
    minutes, secs = divmod(max(0, int(ms)) // 1000, 60)
    return f"[{minutes:02d}:{secs:02d}]"


def is_non_word(text: str) -> bool:
    """Check if text is a non-word utterance (matches NON_WORD_REGEX)."""
    return bool(NON_WORD_REGEX.match(text.strip()))


def merge_utterances(utterances: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Merge non-word utterances into the next meaningful utterance from the
    same speaker.

    A run of consecutive non-word utterances by one speaker that is directly
    followed by a meaningful utterance from that speaker is folded into it:
    the texts are joined with spaces (in order) and the merged utterance
    takes the start time of the first non-word. Non-word runs with no such
    follower are kept as they are.

    The input list and its dicts are not modified; merged utterances are
    shallow copies.
    """
    if not utterances:
        return []

    merged = []
    total = len(utterances)
    i = 0
    while i < total:
        current = utterances[i]
        if not is_non_word(current.get("text", "")):
            merged.append(current)
            i += 1
            continue

        # Extend over the whole run of non-words from this speaker.
        speaker = current.get("speaker")
        j = i + 1
        while (
            j < total
            and utterances[j].get("speaker") == speaker
            and is_non_word(utterances[j].get("text", ""))
        ):
            j += 1
        fillers = utterances[i:j]

        if j < total and utterances[j].get("speaker") == speaker:
            # utterances[j] is meaningful: prepend every filler, not just the
            # first one, so no spoken text is lost.
            target = dict(utterances[j])
            parts = [u.get("text", "") for u in fillers]
            parts.append(target.get("text", ""))
            target["text"] = " ".join(parts)
            target["start"] = current.get("start", 0)
            merged.append(target)
            i = j + 1
        else:
            # Speaker changed or input ended: nothing to merge into.
            merged.extend(fillers)
            i = j

    return merged


def extract_opening_song_title(utterances: List[Dict[str, Any]]) -> Tuple[str, str, str, List[Dict[str, Any]]]:
    """
    Extract title from opening song (utterances starting within the first
    OPENING_SONG_THRESHOLD_MS milliseconds).

    Returns (title, song_speaker, joined_song_lyrics, remaining_utterances).

    The title is the text after 'Malabar' in the first opening utterance that
    mentions it, or, when nothing follows 'Malabar' there, the next non-empty
    opening utterance. Trailing sentence punctuation is removed from it.
    All opening song lyrics (except the title) are joined into one string;
    lyrics that precede the title inside the same utterance are kept.

    If no utterance falls in the opening window, the result is
    ("", "", "", utterances). The input dicts are not modified.
    """
    # Separate opening song utterances from the rest
    opening_song = []
    remaining = []
    for utt in utterances:
        if utt.get("start", 0) < OPENING_SONG_THRESHOLD_MS:
            opening_song.append(utt)
        else:
            remaining.append(utt)

    if not opening_song:
        return "", "", "", utterances

    # Work on a private copy of the texts so callers' dicts stay untouched.
    lyrics = [utt.get("text", "") for utt in opening_song]
    song_speaker = opening_song[0].get("speaker", "A")
    title = ""
    malabar_idx = -1
    title_utterance_idx = -1  # Utterance consisting solely of the title

    for i, text in enumerate(lyrics):
        if "malabar" not in text.lower():
            continue
        malabar_idx = i
        song_speaker = opening_song[i].get("speaker", song_speaker)
        # Extract title: text after "Malabar" (and any punctuation/space)
        match = re.search(r'Malabar[\s,]*(.+)', text, re.IGNORECASE)
        if match:
            title = _TRAILING_PUNCT_RE.sub('', match.group(1).strip()).strip()
            # Keep only the lyric part (up to "Malabar") for the song
            lyrics[i] = re.sub(r'Malabar[\s,]*.+$', 'Malabar', text, flags=re.IGNORECASE).strip()
        break

    # If title not in same utterance as Malabar, check next utterance(s)
    if not title and malabar_idx >= 0:
        for j in range(malabar_idx + 1, len(lyrics)):
            candidate = lyrics[j].strip()
            if candidate:
                title = _TRAILING_PUNCT_RE.sub('', candidate).strip()
                title_utterance_idx = j
                break

    # Join all opening song lyrics except the title utterance
    song_lines = []
    for i, text in enumerate(lyrics):
        stripped = text.strip()
        if i != title_utterance_idx and stripped:
            song_lines.append(stripped)
    joined_song = " ".join(song_lines)

    return title, song_speaker, joined_song, remaining


def format_lines(transcript_data: Dict[str, Any]) -> str:
    """
    Format transcript utterances into lines.

    Reads transcript_data["utterances"], splits them on pauses and sentence
    boundaries, pulls out the opening-song title and lyrics, merges non-word
    utterances, and renders one "[mm:ss](Speaker X) text" line per remaining
    utterance, followed by a closing "THE END" line stamped with the end time
    of the last emitted utterance.

    Returns the formatted text ("" when there are no utterances).
    """
    utterances = transcript_data.get("utterances", [])
    if not utterances:
        return ""

    # Split long utterances based on pauses and sentence boundaries
    utterances = split_utterances_by_pauses(utterances, PAUSE_THRESHOLD_MS)

    # Extract title from opening song and get joined song lyrics
    title, _song_speaker, joined_song, utterances = extract_opening_song_title(utterances)

    # Merge non-word utterances
    merged = merge_utterances(utterances)

    lines = []

    # Add title as first line if found (use "Song" as speaker)
    if title:
        lines.append(f"[00:00](Song) {title}")

    # Add joined opening song as second line if exists (use "Song" as speaker)
    if joined_song:
        lines.append(f"[00:01](Song) {joined_song}")

    # Track the last emitted utterance for the THE END timestamp
    last_utt = None
    last_index = len(merged) - 1

    for index, utt in enumerate(merged):
        # Skip utterances within opening song window (already in joined_song)
        if utt.get("start", 0) < OPENING_SONG_THRESHOLD_MS:
            continue

        text = utt.get("text", "").strip()

        # Skip empty lines
        if not text:
            continue

        # Skip standalone non-words unless they're at the end. Compared by
        # position so an earlier utterance that merely equals the last one
        # is not mistaken for it.
        if is_non_word(text) and index != last_index:
            continue

        speaker = utt.get("speaker", "?")
        timestamp = format_timestamp(utt.get("start", 0))
        lines.append(f"{timestamp}(Speaker {speaker}) {text}")
        last_utt = utt

    # Add dummy "THE END" line at the end time of the last emitted line
    if last_utt:
        the_end_timestamp = format_timestamp(last_utt.get("end", 0))
        lines.append(f"{the_end_timestamp}(Narrator) THE END")

    return '\n'.join(lines)


def process_transcript(input_path: Path) -> Path:
    """
    Process a single transcript file.

    Loads the JSON at input_path, formats it with format_lines, and writes
    the result to OUTPUT_DIR (which must already exist).

    Returns the path to the output file.
    """
    print(f"\n{'='*50}")
    print(f"Processing: {input_path.name}")
    print(f"{'='*50}")

    # Load transcript
    with open(input_path, 'r', encoding='utf-8') as f:
        transcript_data = json.load(f)

    raw_count = len(transcript_data.get("utterances", []))
    print(f"  Loaded {raw_count} raw utterances")

    # Format lines (includes splitting by pauses)
    formatted_text = format_lines(transcript_data)

    # Save output
    # NOTE(review): the stripped suffix is "_assemblyai" while the input
    # folder is "_assembleai" - confirm which spelling step 1 uses for files.
    output_filename = input_path.stem.replace("_assemblyai", "") + "_lines.txt"
    output_path = OUTPUT_DIR / output_filename

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(formatted_text)

    line_count = sum(1 for line in formatted_text.split('\n') if line.strip())
    print(f"  Formatted {line_count} lines")
    print(f"  Saved to: {output_path}")

    return output_path


def get_input_files() -> list[Path]:
    """Discover all JSON files in _assembleai/ folder, sorted by path."""
    if not INPUT_DIR.exists():
        return []

    return sorted(
        f for f in INPUT_DIR.iterdir()
        if f.is_file() and f.suffix == '.json'
    )


def main():
    """Format every transcript in INPUT_DIR; exit with status 1 on any failure."""
    ensure_dirs()

    # Discover input files
    json_files = get_input_files()

    if not json_files:
        print(f"No JSON files found in {INPUT_DIR}/")
        sys.exit(1)

    print(f"Found {len(json_files)} transcript(s) in {INPUT_DIR}/")
    print("")

    # Process all transcripts
    success_count = 0
    fail_count = 0

    for input_path in json_files:
        # Top-level boundary: one bad file must not stop the batch.
        try:
            process_transcript(input_path)
        except Exception as e:
            print(f"\n❌ Failed to process {input_path.name}: {e}")
            fail_count += 1
        else:
            success_count += 1

    # Summary
    print("\n" + "="*50)
    print(f"Step 2 Complete: {success_count} succeeded, {fail_count} failed")
    print("="*50)

    if fail_count > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()