Files
malabar/step1_transcribe.py
2026-03-03 17:20:29 +08:00

391 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Step 1: Extract transcript from video using AssemblyAI.
Input: Video files in "episodes/" folder
Output: Raw AssemblyAI transcript JSON in "_assembleai/" folder
Usage:
uv run step1_transcribe.py
uv run step1_transcribe.py -f # Force reprocessing
"""
import os
import re
import sys
import json
from pathlib import Path
import assemblyai as aai
# ============== Configuration ==============
EPISODES_DIR = Path("episodes")  # input folder scanned for video files
OUTPUT_DIR = Path("_assembleai")  # raw AssemblyAI transcript JSON is written here
PROGRESS_FILE = Path(".step1_progress.json")  # per-file status map used to resume/skip
def ensure_dirs() -> None:
    """Ensure output directories exist.

    Uses parents=True so the call also succeeds if an intermediate
    directory is missing (harmless for the current single-level path,
    robust if OUTPUT_DIR is ever made nested).
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
def split_words_by_sentences(words: list) -> list:
    """
    Split a list of words into sentence segments based on punctuation.

    Args:
        words: List of word dictionaries with 'text' key

    Returns:
        List of word segments (lists of the original dicts), one per sentence;
        trailing words without closing punctuation form a final segment.
    """
    if not words:
        return []

    # A sentence ends on '.', '!' or '?', optionally followed by closing
    # quotes/brackets (e.g. «done."» or «done.)»).
    end_of_sentence = re.compile(r'[.!?]+["\')\]]*$')

    # Indices of words that terminate a sentence.
    breaks = [
        idx for idx, w in enumerate(words)
        if end_of_sentence.search(w.get("text", ""))
    ]

    segments = []
    start = 0
    for stop in breaks:
        segments.append(words[start:stop + 1])
        start = stop + 1
    # Any words after the last sentence boundary become a final segment.
    if start < len(words):
        segments.append(words[start:])
    return segments
def ends_with_sentence_punctuation(text: str) -> bool:
    """Check if text ends with sentence-ending punctuation.

    Trailing whitespace and closing quotes/brackets (" ' ) ]) are ignored,
    so both «done.» and «done."» count as sentence ends.
    """
    trimmed = text.strip().rstrip("\"')]")
    return trimmed.endswith((".", "!", "?"))
def merge_incomplete_sentences(utterances: list) -> list:
    """
    Merge consecutive utterances where the first doesn't end with sentence punctuation.

    This handles cases where AssemblyAI splits mid-sentence between speakers.
    The merged utterance keeps the FIRST speaker's label.
    """
    if not utterances:
        return utterances

    merged = []
    pending = utterances[0].copy()
    for utt in utterances[1:]:
        if ends_with_sentence_punctuation(pending.get("text", "")):
            # Pending utterance is a complete sentence: emit it and start fresh.
            merged.append(pending)
            pending = utt.copy()
        else:
            # Mid-sentence split: absorb the next utterance into the pending
            # one (words, text, end time), keeping the first speaker's label.
            pending["words"] = pending.get("words", []) + utt.get("words", [])
            pending["text"] = pending.get("text", "") + " " + utt.get("text", "")
            pending["end"] = utt.get("end", pending["end"])
    # Emit whatever is still pending at the end.
    merged.append(pending)
    return merged
def split_utterances_by_pauses(utterances: list, pause_threshold_ms: int = 1500) -> list:
    """
    Split long utterances based on pauses between words and sentence boundaries.

    Args:
        utterances: List of utterance dictionaries from AssemblyAI
        pause_threshold_ms: Minimum gap (in milliseconds) to create a new utterance

    Returns:
        List of split utterances (speaker/text/start/end/words dicts);
        utterances without word-level data pass through unchanged.
    """
    # First, merge consecutive utterances that don't end with sentence punctuation.
    utterances = merge_incomplete_sentences(utterances)

    result = []
    for utt in utterances:
        words = utt.get("words", [])
        if not words:
            # No word-level data, keep original.
            result.append(utt)
            continue

        speaker = utt.get("speaker", "?")
        segments = []

        def flush(segment_words: list) -> None:
            # Split a run of words at sentence boundaries and record each piece.
            for seg_words in split_words_by_sentences(segment_words):
                segments.append({
                    "speaker": speaker,
                    "words": seg_words,
                    "start": seg_words[0]["start"],
                    "end": seg_words[-1]["end"]
                })

        current_segment_words = []
        for word in words:
            if current_segment_words:
                # Gap between this word and the previous one, in milliseconds.
                gap = word.get("start", 0) - current_segment_words[-1].get("end", 0)
                if gap >= pause_threshold_ms:
                    # Long pause: close out the current run before starting a new one.
                    flush(current_segment_words)
                    current_segment_words = [word]
                    continue
            current_segment_words.append(word)
        # Don't forget the trailing run of words.
        if current_segment_words:
            flush(current_segment_words)

        # Convert segments to utterance format, dropping empty-text segments.
        for seg in segments:
            text = " ".join(w.get("text", "") for w in seg["words"]).strip()
            if text:
                result.append({
                    "speaker": seg["speaker"],
                    "text": text,
                    "start": seg["start"],
                    "end": seg["end"],
                    "words": seg["words"]
                })
    return result
def load_progress() -> dict:
    """Return the saved progress mapping, or an empty dict if none exists."""
    if not PROGRESS_FILE.exists():
        return {}
    with PROGRESS_FILE.open("r", encoding="utf-8") as fh:
        return json.load(fh)
def save_progress(progress: dict) -> None:
    """Persist the progress mapping to the tracking file as pretty-printed JSON."""
    with PROGRESS_FILE.open("w", encoding="utf-8") as fh:
        json.dump(progress, fh, indent=2)
def transcribe_video(video_path: Path) -> dict:
    """
    Transcribe video using AssemblyAI with speaker diarization.

    Args:
        video_path: Local video file to upload to AssemblyAI.

    Returns:
        The raw transcript as a dictionary, with utterances split on long
        pauses and sentence boundaries (see split_utterances_by_pauses).

    Raises:
        ValueError: If ASSEMBLYAI_API_KEY is not set in the environment.
        Exception: If AssemblyAI reports a failed transcription.
    """
    api_key = os.getenv("ASSEMBLYAI_API_KEY")
    if not api_key:
        raise ValueError("ASSEMBLYAI_API_KEY environment variable is required")
    aai.settings.api_key = api_key
    print(f" Uploading {video_path.name}...")
    # Speaker diarization config
    # By default, AssemblyAI detects 1-10 speakers
    # If you know the expected number, you can set speakers_expected
    # Or set speaker_options for a range
    speaker_options = aai.SpeakerOptions(
        min_speakers=2,
        max_speakers=10  # Allow up to 10 speakers
    )
    # NOTE(review): the AssemblyAI SDK documents a singular `speech_model`
    # option on TranscriptionConfig — confirm that `speech_models` (plural,
    # list) is accepted by the SDK version pinned for this project.
    config = aai.TranscriptionConfig(
        speaker_labels=True,
        speech_models=["universal-2"],
        language_detection=True,  # Auto-detect language
        speaker_options=speaker_options,
    )
    transcriber = aai.Transcriber(config=config)
    # Blocking call: uploads the file and waits for processing to finish.
    transcript = transcriber.transcribe(str(video_path))
    if transcript.status == aai.TranscriptStatus.error:
        raise Exception(f"Transcription failed: {transcript.error}")
    print(f" Transcription complete!")
    # Convert utterances to dictionaries first
    raw_utterances = []
    for utt in transcript.utterances:
        raw_utterances.append({
            "speaker": utt.speaker,
            "text": utt.text.strip(),
            "start": utt.start,
            "end": utt.end,
            # hasattr guards: some SDK objects may lack these attributes.
            "confidence": utt.confidence if hasattr(utt, 'confidence') else None,
            "words": [
                {
                    "text": w.text,
                    "start": w.start,
                    "end": w.end,
                    "speaker": w.speaker if hasattr(w, 'speaker') else None
                }
                for w in (utt.words if hasattr(utt, 'words') else [])
            ]
        })
    # Split long utterances based on pauses
    original_count = len(raw_utterances)
    split_utterances = split_utterances_by_pauses(raw_utterances, pause_threshold_ms=1500)
    new_count = len(split_utterances)
    if new_count > original_count:
        print(f" Split {original_count} utterances into {new_count} (based on 1.5s pauses)")
    # Convert transcript to serializable dictionary
    result = {
        "id": transcript.id,
        "status": str(transcript.status),
        "audio_url": transcript.audio_url,
        "text": transcript.text,
        "confidence": transcript.confidence,
        "audio_duration": transcript.audio_duration,
        # The language code lives in the raw JSON response, not on the object.
        "language_code": transcript.json_response.get("language_code", "unknown"),
        "utterances": split_utterances
    }
    return result
def process_video(video_path: Path, force: bool = False) -> Path:
    """
    Process a single video file end-to-end: transcribe and save JSON.

    Args:
        video_path: Video file to transcribe.
        force: When True, reprocess even if a completed output already exists.

    Returns:
        Path to the output JSON file.

    Raises:
        Exception: Re-raises any transcription/serialization failure after
            recording it in the progress file.
    """
    progress = load_progress()
    filename = video_path.name
    # Skip work that already completed, unless forced or the output vanished.
    if not force and filename in progress and progress[filename].get("status") == "completed":
        output_path = Path(progress[filename]["output_file"])
        if output_path.exists():
            # BUGFIX: these prints contained literal "(unknown)" instead of
            # the filename (placeholder lost in an f-string) — restored.
            print(f"Skipping {filename} (already processed)")
            return output_path
    print(f"\n{'='*50}")
    print(f"Processing: {filename}")
    print(f"{'='*50}")
    try:
        # Mark as in-flight so an interrupted run is visible in the progress file.
        progress[filename] = {"status": "transcribing"}
        save_progress(progress)
        transcript_data = transcribe_video(video_path)
        # Save the raw transcript JSON alongside other outputs.
        output_filename = video_path.stem + "_assemblyai.json"
        output_path = OUTPUT_DIR / output_filename
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(transcript_data, f, indent=2, ensure_ascii=False)
        # Record success with enough metadata to skip this file next run.
        progress[filename] = {
            "status": "completed",
            "output_file": str(output_path),
            "utterance_count": len(transcript_data["utterances"])
        }
        save_progress(progress)
        print(f" Saved to: {output_path}")
        print(f" Utterances: {len(transcript_data['utterances'])}")
        return output_path
    except Exception as e:
        # Record the failure, then propagate so the caller can count it.
        progress[filename] = {"status": "error", "error": str(e)}
        save_progress(progress)
        print(f" ERROR: {e}")
        raise
def get_input_files() -> list[Path]:
    """Discover all video files in episodes/ folder, sorted by name."""
    if not EPISODES_DIR.exists():
        return []
    # Support common video formats
    video_extensions = {'.mp4', '.mkv', '.avi', '.mov', '.webm'}
    return sorted(
        entry for entry in EPISODES_DIR.iterdir()
        if entry.is_file() and entry.suffix.lower() in video_extensions
    )
def main() -> None:
    """Entry point: transcribe every video found in the episodes folder.

    Exits with status 1 if no input files are found or if any video fails.
    """
    ensure_dirs()
    # Check for force flag
    force = "--force" in sys.argv or "-f" in sys.argv
    # Discover input files
    video_files = get_input_files()
    if not video_files:
        print(f"No video files found in {EPISODES_DIR}/")
        print("Supported formats: .mp4, .mkv, .avi, .mov, .webm")
        sys.exit(1)
    print(f"Found {len(video_files)} video(s) in {EPISODES_DIR}/")
    if force:
        print("Force mode: ON (reprocessing all files)")
    print("")
    # Process all videos, counting outcomes rather than aborting on the
    # first failure. (Removed an unused local that held the return value.)
    success_count = 0
    fail_count = 0
    for video_path in video_files:
        try:
            process_video(video_path, force=force)
            success_count += 1
        except Exception as e:
            print(f"\n❌ Failed to process {video_path.name}: {e}")
            fail_count += 1
    # Summary
    print("\n" + "="*50)
    print(f"Step 1 Complete: {success_count} succeeded, {fail_count} failed")
    print("="*50)
    # Non-zero exit signals partial failure to the calling pipeline.
    if fail_count > 0:
        sys.exit(1)
# Allow importing this module (e.g. for reuse of the splitting helpers)
# without triggering the transcription pipeline.
if __name__ == "__main__":
    main()