a
This commit is contained in:
221
step2_format.py
Normal file
221
step2_format.py
Normal file
@@ -0,0 +1,221 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Step 2: Format AssemblyAI transcript into lines with timestamps and speaker labels.
|
||||
|
||||
Input: JSON files in "_assembleai/" folder
|
||||
Output: Formatted lines in "_lines/" folder
|
||||
|
||||
Output format:
|
||||
[mm:ss](Speaker) line content
|
||||
|
||||
Usage:
|
||||
uv run step2_format.py
|
||||
"""
|
||||
|
||||
import sys
|
||||
import json
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any
|
||||
|
||||
# ============== Configuration ==============

INPUT_DIR = Path("_assembleai")
OUTPUT_DIR = Path("_lines")

# Patterns for non-word (filler) utterances to merge.
# Each pattern is anchored so it must match the ENTIRE stripped utterance.
NON_WORD_PATTERNS = [
    # Chinese modal particles. NOTE: the original class was written as
    # [嗯|啊|...] — inside a character class "|" is a LITERAL character,
    # so a bare "|" utterance wrongly matched as a non-word. The "|"
    # separators (and a duplicated 哼) are removed; the effective set of
    # characters is unchanged.
    r'^\s*[嗯啊哦呃唉哎哈哼哟唔呦豁呀呜呼]+\s*$',
    r'^\s*[Mm]hm+\s*$',        # Mhm
    r'^\s*[Uu]h+\s*$',         # Uh
    r'^\s*[Uu]m+\s*$',         # Um
    r'^\s*[Aa]h+\s*$',         # Ah
    r'^\s*[Oo]h+\s*$',         # Oh
    r'^\s*[Hh]uh+\s*$',        # Huh
    r'^\s*[Hh]mm+\s*$',        # Hmm
    r'^\s*[Yy]eah?\s*$',       # Yeah (standalone)
    r'^\s*[Nn]o+\s*$',         # No (standalone)
    r'^\s*[Oo]k+\s*$',         # Ok
    r'^\s*[Oo]kay+\s*$',       # Okay
    r'^\s*[Rr]ight+\s*$',      # Right (standalone)
    r'^\s*[Ww]hat+\s*$',       # What (standalone)
    r'^\s*\([^)]*\)\s*$',      # (laughs), (coughs), etc.
    r'^\s*\[[^\]]*\]\s*$',     # [laughs], [coughs], etc.
]

# One compiled alternation over all patterns; IGNORECASE makes the explicit
# [Mm]-style prefixes redundant but harmless.
NON_WORD_REGEX = re.compile('|'.join(f'({p})' for p in NON_WORD_PATTERNS), re.IGNORECASE)
|
||||
|
||||
|
||||
def ensure_dirs():
    """Create the output folder if it does not already exist."""
    OUTPUT_DIR.mkdir(exist_ok=True)
|
||||
|
||||
|
||||
def format_timestamp(ms: int) -> str:
    """Render a millisecond offset as a "[mm:ss]" tag.

    Minutes are not wrapped at 60, so long recordings produce tags
    like "[75:03]".
    """
    minutes, secs = divmod(ms // 1000, 60)
    return f"[{minutes:02d}:{secs:02d}]"
|
||||
|
||||
|
||||
def is_non_word(text: str) -> bool:
    """Return True when the stripped text is a filler/non-word utterance."""
    return NON_WORD_REGEX.match(text.strip()) is not None
|
||||
|
||||
|
||||
def merge_utterances(utterances: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Merge non-word (filler) utterances into adjacent meaningful utterances
    from the same speaker.

    A filler followed by a meaningful same-speaker utterance is prepended to
    that utterance, and the merged utterance's "start" is extended back to
    the filler's start time. A filler with no meaningful same-speaker
    follow-up is kept as-is. Any additional fillers skipped over during a
    forward merge are dropped.

    Args:
        utterances: Utterance dicts with at least "text", "speaker" and
            "start" keys.

    Returns:
        A new list of utterance dicts. The caller's list and dicts are left
        unmodified (the previous implementation mutated them in place).
    """
    if not utterances:
        return []

    # Work on shallow copies so the caller's dicts are never mutated.
    utterances = [dict(u) for u in utterances]

    merged = []
    i = 0

    while i < len(utterances):
        current = utterances[i]

        # Check if current is a non-word
        if is_non_word(current.get("text", "")):
            # Look ahead to find the next meaningful utterance from same speaker
            j = i + 1
            while j < len(utterances) and utterances[j].get("speaker") == current.get("speaker"):
                if not is_non_word(utterances[j].get("text", "")):
                    # Merge current into the next meaningful one; keep the
                    # filler's earlier start time.
                    utterances[j]["text"] = current["text"] + " " + utterances[j]["text"]
                    utterances[j]["start"] = current["start"]
                    i = j
                    break
                j += 1
            else:
                # No meaningful utterance found, keep as is
                merged.append(current)
                i += 1
        else:
            # Check if previous kept utterance was a non-word from same speaker
            if merged and merged[-1].get("speaker") == current.get("speaker") and is_non_word(merged[-1].get("text", "")):
                # Fold the previous filler into current instead.
                current["text"] = merged[-1]["text"] + " " + current["text"]
                current["start"] = merged[-1]["start"]
                merged.pop()

            merged.append(current)
            i += 1

    return merged
|
||||
|
||||
|
||||
def format_lines(transcript_data: Dict[str, Any]) -> str:
    """
    Turn a transcript's utterances into "[mm:ss](Speaker X) text" lines.

    Fillers are first folded into neighbouring utterances; a leftover
    standalone filler is dropped unless it is the final utterance.
    Returns the joined lines, or "" when there are no utterances.
    """
    utterances = transcript_data.get("utterances", [])
    if not utterances:
        return ""

    merged = merge_utterances(utterances)

    lines = []
    for utt in merged:
        text = utt.get("text", "").strip()

        # Drop empty utterances.
        if not text:
            continue

        # Drop a remaining filler unless it closes the transcript.
        if is_non_word(text) and utt != merged[-1]:
            continue

        stamp = format_timestamp(utt.get("start", 0))
        speaker = utt.get("speaker", "?")
        lines.append(f"{stamp}(Speaker {speaker}) {text}")

    return '\n'.join(lines)
|
||||
|
||||
|
||||
def process_transcript(input_path: Path) -> Path:
    """
    Format one AssemblyAI JSON transcript and write it to OUTPUT_DIR.

    Prints progress to stdout and returns the path of the written
    "*_lines.txt" file.
    """
    banner = '=' * 50
    print(f"\n{banner}")
    print(f"Processing: {input_path.name}")
    print(banner)

    # Load the transcript JSON.
    transcript_data = json.loads(input_path.read_text(encoding='utf-8'))

    print(f" Loaded {len(transcript_data.get('utterances', []))} utterances")

    # Build the formatted text.
    formatted_text = format_lines(transcript_data)

    # Derive the output name from the input stem.
    output_filename = input_path.stem.replace("_assemblyai", "") + "_lines.txt"
    output_path = OUTPUT_DIR / output_filename
    output_path.write_text(formatted_text, encoding='utf-8')

    line_count = sum(1 for line in formatted_text.split('\n') if line.strip())
    print(f" Formatted {line_count} lines")
    print(f" Saved to: {output_path}")

    return output_path
|
||||
|
||||
|
||||
def get_input_files() -> list[Path]:
    """Collect every .json file from the input folder, sorted by name."""
    if not INPUT_DIR.exists():
        return []
    return sorted(
        entry for entry in INPUT_DIR.iterdir()
        if entry.is_file() and entry.suffix == '.json'
    )
|
||||
|
||||
|
||||
def main():
    """Format every transcript in INPUT_DIR; exit 1 if none found or any fail."""
    ensure_dirs()

    json_files = get_input_files()
    if not json_files:
        print(f"No JSON files found in {INPUT_DIR}/")
        sys.exit(1)

    print(f"Found {len(json_files)} transcript(s) in {INPUT_DIR}/")
    print("")

    success_count = 0
    fail_count = 0

    for input_path in json_files:
        try:
            process_transcript(input_path)
            success_count += 1
        except Exception as e:
            print(f"\n❌ Failed to process {input_path.name}: {e}")
            fail_count += 1

    # Summary banner.
    banner = "=" * 50
    print("\n" + banner)
    print(f"Step 2 Complete: {success_count} succeeded, {fail_count} failed")
    print(banner)

    if fail_count > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user