412 lines
13 KiB
Python
412 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
Step 2: Format AssemblyAI transcript into lines with timestamps and speaker labels.

Input:  JSON files in the "_assembleai/" folder
Output: Formatted lines in the "_lines/" folder

Output format:
    [mm:ss](Speaker X) line content

Usage:
    uv run step2_format.py
"""
|
|
|
|
import sys
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Tuple
|
|
|
|
# ============== Configuration ==============

# Split utterances on pauses longer than this (milliseconds)
PAUSE_THRESHOLD_MS = 1500

# Folder scanned for transcript JSON files, and folder that receives the output.
INPUT_DIR = Path("_assembleai")
OUTPUT_DIR = Path("_lines")

# Patterns for non-word utterances to merge.
# Each pattern must match the whole (whitespace-padded) utterance text.
NON_WORD_PATTERNS = [
    # Chinese modal particles. This is a character class, so it lists single
    # characters only; '|' is NOT an alternation operator inside [...] and
    # must not appear here, otherwise a bare "|" is treated as a non-word.
    r'^[\s]*[嗯啊哦呃唉哎哈哼哟唔呦豁呀呜呼]+[\s]*$',
    r'^[\s]*[Mm]hm+[\s]*$',        # Mhm
    r'^[\s]*[Uu]h+[\s]*$',         # Uh
    r'^[\s]*[Uu]m+[\s]*$',         # Um
    r'^[\s]*[Aa]h+[\s]*$',         # Ah
    r'^[\s]*[Oo]h+[\s]*$',         # Oh
    r'^[\s]*[Hh]uh+[\s]*$',        # Huh
    r'^[\s]*[Hh]mm+[\s]*$',        # Hmm
    r'^[\s]*[Yy]eah?[\s]*$',       # Yeah (standalone)
    r'^[\s]*[Nn]o+[\s]*$',         # No (standalone)
    r'^[\s]*[Oo]k+[\s]*$',         # Ok
    r'^[\s]*[Oo]kay+[\s]*$',       # Okay
    r'^[\s]*[Rr]ight+[\s]*$',      # Right (standalone)
    r'^[\s]*[Ww]hat+[\s]*$',       # What (standalone)
    r'^[\s]*\([^)]*\)[\s]*$',      # (laughs), (coughs), etc.
    r'^[\s]*\[[^\]]*\][\s]*$',     # [laughs], [coughs], etc.
]

# Single case-insensitive alternation of every pattern above.
NON_WORD_REGEX = re.compile('|'.join(f'({p})' for p in NON_WORD_PATTERNS), re.IGNORECASE)
|
|
|
|
|
|
def ensure_dirs():
    """Create the output folder (OUTPUT_DIR) when it is not already present."""
    # exist_ok=True turns a repeated run into a no-op instead of an error.
    OUTPUT_DIR.mkdir(exist_ok=True)
|
|
|
|
|
|
def split_words_by_sentences(words: list) -> list:
    """
    Group word dicts into sentences.

    A sentence ends on a word whose "text" finishes with one or more of
    ``.``, ``!`` or ``?``, optionally followed by closing quotes/brackets.
    Trailing words without a terminator form a final, unterminated group.

    Returns a list of lists of the original word dicts (empty input -> []).
    """
    if not words:
        return []

    terminator = re.compile(r'[.!?]+["\')\]]*$')
    sentences = []
    pending = []

    for entry in words:
        pending.append(entry)
        if terminator.search(entry.get("text", "")) is None:
            continue
        # The word closes a sentence: emit the group and start a new one.
        sentences.append(pending)
        pending = []

    if pending:
        sentences.append(pending)

    return sentences
|
|
|
|
|
|
def _words_to_utterances(speaker, words: list) -> list:
    """Turn one pause-delimited run of words into sentence-level utterance dicts."""
    utterances = []
    for sentence in split_words_by_sentences(words):
        text = " ".join(w.get("text", "") for w in sentence).strip()
        if not text:
            # Nothing printable in this sentence; drop it.
            continue
        utterances.append({
            "speaker": speaker,
            "text": text,
            # Timestamps come from the first/last word of the sentence.
            "start": sentence[0]["start"],
            "end": sentence[-1]["end"],
            "words": sentence,
        })
    return utterances


def split_utterances_by_pauses(utterances: list, pause_threshold_ms: int = 1500) -> list:
    """
    Split long utterances based on pauses between words and sentence boundaries.

    For every utterance that carries a "words" list, the words are cut into
    runs wherever the gap between one word's "end" and the next word's
    "start" is at least ``pause_threshold_ms``; each run is then further cut
    into sentences. Utterances without words are passed through unchanged.

    Args:
        utterances: utterance dicts ("speaker", "words", ...).
        pause_threshold_ms: minimum silence, in milliseconds, that forces a split.

    Returns:
        A new list of utterance dicts with keys "speaker", "text", "start",
        "end" and "words" (pass-through utterances keep their original keys).

    Raises:
        KeyError: if a word that begins or ends a sentence lacks "start"/"end".
    """
    result = []

    for utt in utterances:
        words = utt.get("words", [])
        if not words:
            result.append(utt)
            continue

        speaker = utt.get("speaker", "?")
        run = [words[0]]

        for word in words[1:]:
            gap = word.get("start", 0) - run[-1].get("end", 0)
            if gap >= pause_threshold_ms:
                # Long enough silence: close the current run and start a new one.
                result.extend(_words_to_utterances(speaker, run))
                run = [word]
            else:
                run.append(word)

        # Flush whatever is left after the last word.
        result.extend(_words_to_utterances(speaker, run))

    return result
|
|
|
|
|
|
def format_timestamp(ms: int) -> str:
    """Render a millisecond offset as a zero-padded ``[mm:ss]`` tag."""
    # Whole seconds first (sub-second part is discarded), then split into
    # minutes and leftover seconds in one step.
    minutes, secs = divmod(ms // 1000, 60)
    return f"[{minutes:02d}:{secs:02d}]"
|
|
|
|
|
|
def is_non_word(text: str) -> bool:
    """Return True when the whitespace-trimmed text matches NON_WORD_REGEX."""
    trimmed = text.strip()
    return NON_WORD_REGEX.match(trimmed) is not None
|
|
|
|
|
|
def merge_utterances(utterances: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Merge consecutive utterances from the same speaker that are non-words
    with adjacent meaningful utterances.

    A run of non-word utterances (as judged by ``is_non_word``) that is
    directly followed by a meaningful utterance from the same speaker is
    folded into that utterance: the texts are joined with single spaces in
    their original order and "start" is moved back to the first filler's
    start. Every filler in the run is kept in the merged text. The other
    keys ("end", "words", ...) stay those of the meaningful utterance.

    Non-words that are not followed by a meaningful utterance from the same
    speaker (speaker change or end of input) are kept as separate entries.

    The input dicts are not modified; the result contains shallow copies.

    Returns:
        The merged list of utterance dicts ([] for empty/None input).
    """
    if not utterances:
        return []

    merged = []
    # Non-word utterances of the current speaker still waiting for a
    # meaningful utterance to attach to.
    pending = []

    for utt in utterances:
        # A speaker change ends the run: the waiting fillers stand alone.
        if pending and pending[0].get("speaker") != utt.get("speaker"):
            merged.extend(pending)
            pending = []

        entry = dict(utt)  # shallow copy so the caller's dict is untouched
        if is_non_word(entry.get("text", "")):
            pending.append(entry)
            continue

        if pending:
            fillers = [p.get("text", "") for p in pending]
            entry["text"] = " ".join(fillers + [entry.get("text", "")])
            if "start" in pending[0]:
                entry["start"] = pending[0]["start"]
            pending = []

        merged.append(entry)

    # Trailing fillers with nothing to attach to are kept as they are.
    merged.extend(pending)
    return merged
|
|
|
|
|
|
def extract_opening_song_title(utterances: List[Dict[str, Any]]) -> Tuple[str, str, str, List[Dict[str, Any]]]:
    """
    Extract title from opening song (lines within first 15 seconds).

    Utterances whose "start" is below 15000 ms form the opening song; all
    others are returned as the remaining utterances.

    The title is the text after 'Malabar' (case-insensitive) in the first
    opening-song utterance that mentions it, with trailing ``.!?`` removed.
    If nothing usable follows 'Malabar' in that utterance, the next
    non-empty opening-song utterance is taken as the title instead.

    The joined lyrics contain every opening-song utterance except the title:
    - title in the same utterance as 'Malabar': that utterance is kept, cut
      off right after 'Malabar';
    - title in a following utterance: that whole utterance is left out.

    The input dicts are not modified.

    Returns:
        (title, song_speaker, joined_song_lyrics, remaining_utterances).
        With no opening-song utterances this is ("", "", "", utterances).
    """
    OPENING_SONG_THRESHOLD_MS = 15000  # 15 seconds

    # Separate opening song utterances (within first 15s) from the rest
    opening_song = []
    remaining = []
    for utt in utterances:
        if utt.get("start", 0) < OPENING_SONG_THRESHOLD_MS:
            opening_song.append(utt)
        else:
            remaining.append(utt)

    if not opening_song:
        return "", "", "", utterances

    # Work on a private copy of the texts so the caller's dicts stay intact.
    lyric_texts = [utt.get("text", "") for utt in opening_song]
    song_speaker = opening_song[0].get("speaker", "A")
    title = ""
    title_utterance_idx = -1  # utterance consumed entirely by the title, if any

    # First utterance mentioning "Malabar" (any casing), or -1.
    malabar_idx = next(
        (i for i, text in enumerate(lyric_texts) if "malabar" in text.lower()),
        -1,
    )

    if malabar_idx >= 0:
        text = lyric_texts[malabar_idx]
        song_speaker = opening_song[malabar_idx].get("speaker", song_speaker)

        # Title in the same utterance: text after "Malabar" (and any comma/space)
        match = re.search(r'Malabar[\s,]*(.+)', text, re.IGNORECASE)
        if match:
            title = re.sub(r'[.!?]+$', '', match.group(1).strip()).strip()
            # Keep the lyric part of this utterance, minus the title.
            lyric_texts[malabar_idx] = re.sub(
                r'Malabar[\s,]*.+$', 'Malabar', text, flags=re.IGNORECASE
            ).strip()

        # Otherwise the title is the next non-empty utterance, if any.
        if not title:
            for j in range(malabar_idx + 1, len(lyric_texts)):
                candidate = lyric_texts[j].strip()
                if candidate:
                    title = re.sub(r'[.!?]+$', '', candidate).strip()
                    title_utterance_idx = j
                    break

    # Join all opening song lyrics except the utterance used up by the title
    song_lines = [
        text.strip()
        for i, text in enumerate(lyric_texts)
        if i != title_utterance_idx and text.strip()
    ]

    return title, song_speaker, " ".join(song_lines), remaining
|
|
|
|
|
|
def format_lines(transcript_data: Dict[str, Any]) -> str:
    """
    Format transcript utterances into lines.

    Pipeline: split utterances on pauses/sentences, pull the title and the
    opening-song lyrics out of the first 15 seconds, merge non-word fillers,
    then render one ``[mm:ss](Speaker X) text`` line per utterance.

    Fixed lines added around the dialogue:
    - ``[00:00](Song) <title>`` when a title was found,
    - ``[00:01](Song) <lyrics>`` when opening-song lyrics exist,
    - ``[mm:ss](Narrator) THE END`` stamped with the "end" of the last
      emitted utterance (only when at least one utterance was emitted).

    Args:
        transcript_data: parsed transcript JSON; only "utterances" is read.

    Returns:
        The formatted text, lines joined by newlines ("" without utterances).
    """
    utterances = transcript_data.get("utterances", [])
    if not utterances:
        return ""

    # Split long utterances based on pauses and sentence boundaries
    utterances = split_utterances_by_pauses(utterances, PAUSE_THRESHOLD_MS)

    # Extract title and joined lyrics from the opening song. The song lines
    # are always labelled "Song", so the detected speaker is not needed.
    title, _song_speaker, joined_song, utterances = extract_opening_song_title(utterances)

    # Merge non-word utterances
    merged = merge_utterances(utterances)

    lines = []
    if title:
        lines.append(f"[00:00](Song) {title}")
    if joined_song:
        lines.append(f"[00:01](Song) {joined_song}")

    # Last utterance actually written; its end time stamps THE END.
    last_utt = None
    last_index = len(merged) - 1

    for index, utt in enumerate(merged):
        # Anything inside the opening-song window is already in joined_song.
        if utt.get("start", 0) < 15000:
            continue

        text = utt.get("text", "").strip()
        if not text:
            continue

        # Skip standalone non-words unless it is the very last utterance.
        # Position is compared, not dict contents, so an earlier utterance
        # that merely equals the last one is still skipped.
        if index != last_index and is_non_word(text):
            continue

        speaker = utt.get("speaker", "?")
        timestamp = format_timestamp(utt.get("start", 0))
        lines.append(f"{timestamp}(Speaker {speaker}) {text}")
        last_utt = utt

    # Closing marker, stamped with the end time of the last emitted line.
    if last_utt:
        the_end_timestamp = format_timestamp(last_utt.get("end", 0))
        lines.append(f"{the_end_timestamp}(Narrator) THE END")

    return '\n'.join(lines)
|
|
|
|
|
|
def process_transcript(input_path: Path) -> Path:
    """
    Convert one transcript JSON file into a formatted lines file.

    Reads ``input_path`` as UTF-8 JSON, formats it with ``format_lines`` and
    writes the result to OUTPUT_DIR as ``<stem>_lines.txt`` (any
    "_assemblyai" part of the stem is removed). Progress is printed.

    Returns the path of the file that was written.
    """
    banner = '=' * 50
    print(f"\n{banner}")
    print(f"Processing: {input_path.name}")
    print(banner)

    # Load transcript
    transcript_data = json.loads(input_path.read_text(encoding='utf-8'))
    raw_count = len(transcript_data.get("utterances", []))
    print(f" Loaded {raw_count} raw utterances")

    # Format lines (includes splitting by pauses)
    formatted_text = format_lines(transcript_data)

    # Save output
    stem = input_path.stem.replace("_assemblyai", "")
    output_path = OUTPUT_DIR / f"{stem}_lines.txt"
    output_path.write_text(formatted_text, encoding='utf-8')

    # Only lines with visible content are counted.
    line_count = sum(1 for line in formatted_text.split('\n') if line.strip())
    print(f" Formatted {line_count} lines")
    print(f" Saved to: {output_path}")

    return output_path
|
|
|
|
|
|
def get_input_files() -> list[Path]:
    """Return the sorted ``.json`` files directly inside INPUT_DIR ([] if it is missing)."""
    if not INPUT_DIR.exists():
        return []
    # Direct children only; the suffix comparison is case-sensitive.
    return sorted(
        entry
        for entry in INPUT_DIR.iterdir()
        if entry.is_file() and entry.suffix == '.json'
    )
|
|
|
|
|
|
def main():
    """
    Format every transcript found in INPUT_DIR.

    Exits with status 1 when no input files exist or when at least one file
    fails; a failure in one file does not stop the remaining files.
    """
    ensure_dirs()

    # Discover input files
    json_files = get_input_files()
    if not json_files:
        print(f"No JSON files found in {INPUT_DIR}/")
        sys.exit(1)

    print(f"Found {len(json_files)} transcript(s) in {INPUT_DIR}/")
    print("")

    # Process all transcripts
    succeeded = 0
    failed = 0
    for input_path in json_files:
        try:
            process_transcript(input_path)
        except Exception as e:
            # Top-level boundary: report the problem and go on to the next file.
            print(f"\n❌ Failed to process {input_path.name}: {e}")
            failed += 1
        else:
            succeeded += 1

    # Summary
    divider = "=" * 50
    print("\n" + divider)
    print(f"Step 2 Complete: {succeeded} succeeded, {failed} failed")
    print(divider)

    if failed:
        sys.exit(1)


if __name__ == "__main__":
    main()
|