#!/usr/bin/env python3
"""
Step 1: Extract transcript from video using AssemblyAI.

Input:  Video files in "episodes/" folder
Output: Raw AssemblyAI transcript JSON in "_assembleai/" folder

Usage:
    uv run step1_transcribe.py
    uv run step1_transcribe.py -f  # Force reprocessing
"""

import json
import os
import re
import sys
from pathlib import Path
from typing import Optional

import assemblyai as aai

# ============== Configuration ==============

EPISODES_DIR = Path("episodes")
OUTPUT_DIR = Path("_assembleai")
PROGRESS_FILE = Path(".step1_progress.json")

# Matches an episode code such as "S01E01" anywhere in a filename.
_EPISODE_CODE_RE = re.compile(r'(S\d{2}E\d{2})', re.IGNORECASE)


def ensure_dirs() -> None:
    """Create the output directory if it does not exist yet."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


def load_progress() -> dict:
    """
    Load the progress-tracking mapping from PROGRESS_FILE.

    Returns an empty dict when the file is missing. A file that cannot be
    parsed as JSON, or whose top-level value is not an object, is reported
    with a warning and treated as empty so a damaged progress file does not
    abort the whole run.
    """
    try:
        with open(PROGRESS_FILE, 'r', encoding='utf-8') as f:
            progress = json.load(f)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError as e:
        print(f"Warning: ignoring unreadable progress file {PROGRESS_FILE}: {e}")
        return {}

    if not isinstance(progress, dict):
        print(f"Warning: ignoring progress file {PROGRESS_FILE}: expected a JSON object")
        return {}
    return progress


def save_progress(progress: dict) -> None:
    """
    Write the progress-tracking mapping to PROGRESS_FILE.

    The data is written to a sibling temporary file first and then moved
    into place, so an interruption mid-write cannot leave a truncated
    progress file behind.
    """
    tmp_path = PROGRESS_FILE.with_name(PROGRESS_FILE.name + ".tmp")
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(progress, f, indent=2)
    os.replace(tmp_path, PROGRESS_FILE)


def _utterance_to_dict(utt) -> dict:
    """Convert one transcript utterance object into a JSON-serializable dict."""
    return {
        "speaker": utt.speaker,
        "text": utt.text.strip(),
        "start": utt.start,
        "end": utt.end,
        "confidence": getattr(utt, 'confidence', None),
        "words": [
            {
                "text": w.text,
                "start": w.start,
                "end": w.end,
                "speaker": getattr(w, 'speaker', None),
            }
            # Tolerate utterances with a missing or None word list.
            for w in (getattr(utt, 'words', None) or [])
        ],
    }


def transcribe_video(video_path: Path) -> dict:
    """
    Transcribe video using AssemblyAI with speaker diarization.

    Args:
        video_path: Path of the media file handed to the transcriber.

    Returns:
        The transcript as a JSON-serializable dictionary (id, status, text,
        confidence, duration, language code and a list of utterances).

    Raises:
        ValueError: If the ASSEMBLYAI_API_KEY environment variable is unset.
        RuntimeError: If the transcript comes back with an error status.
    """
    api_key = os.getenv("ASSEMBLYAI_API_KEY")
    if not api_key:
        raise ValueError("ASSEMBLYAI_API_KEY environment variable is required")

    aai.settings.api_key = api_key

    print(f"  Uploading {video_path.name}...")

    # Speaker diarization config
    # Lower speaker_sensitivity = more aggressive speaker detection (more speakers)
    # NOTE(review): the keyword names below are kept exactly as written;
    # confirm they match the SpeakerOptions / TranscriptionConfig fields of
    # the installed assemblyai SDK version.
    speaker_options = aai.SpeakerOptions(
        min_speakers=2,
        max_speakers=10,  # Allow up to 10 speakers
        speaker_sensitivity=0.2,  # Low value = more sensitive to speaker changes
    )

    config = aai.TranscriptionConfig(
        speaker_labels=True,
        speech_models=["universal-2"],
        language_detection=True,  # Auto-detect language
        speaker_options=speaker_options,
    )

    transcriber = aai.Transcriber(config=config)
    transcript = transcriber.transcribe(str(video_path))

    if transcript.status == aai.TranscriptStatus.error:
        raise RuntimeError(f"Transcription failed: {transcript.error}")

    print("  Transcription complete!")

    # Guard against a missing raw response or utterance list so that such a
    # transcript is saved with zero utterances instead of raising here.
    raw_response = transcript.json_response or {}
    utterances = transcript.utterances or []

    # Convert transcript to serializable dictionary - NO POSTPROCESSING
    # (apart from stripping surrounding whitespace from utterance text).
    return {
        "id": transcript.id,
        "status": str(transcript.status),
        "audio_url": transcript.audio_url,
        "text": transcript.text,
        "confidence": transcript.confidence,
        "audio_duration": transcript.audio_duration,
        "language_code": raw_response.get("language_code", "unknown"),
        "utterances": [_utterance_to_dict(utt) for utt in utterances],
    }


def extract_episode_code(filename: str) -> Optional[str]:
    """
    Extract SxxExx code from filename (e.g., 'S01E01_The_Eye.mp4' -> 'S01E01').

    The match is case-insensitive and the result is upper-cased. Returns
    None when the filename contains no such code.
    """
    match = _EPISODE_CODE_RE.search(filename)
    if match is None:
        return None
    return match.group(1).upper()


def process_video(video_path: Path, force: bool = False) -> Path:
    """
    Process a single video file.

    Args:
        video_path: Video file to transcribe.
        force: When True, transcribe again even if the progress file marks
            this video as completed.

    Returns:
        The path to the output JSON file.

    Raises:
        ValueError: If no SxxExx episode code can be found in the filename.
        Exception: Any failure during transcription or saving is recorded in
            the progress file with status "error" and then re-raised.
    """
    progress = load_progress()
    filename = video_path.name

    # Extract episode code for output naming (e.g., S01E01_The_Name.mp4 -> S01E01)
    episode_code = extract_episode_code(filename)
    if not episode_code:
        raise ValueError(f"Could not extract episode code (SxxExx) from filename: {filename}")

    # Check if already processed. A malformed entry (not a dict, or without
    # an output_file) is treated as "not processed" rather than raising.
    entry = progress.get(filename)
    if not force and isinstance(entry, dict) and entry.get("status") == "completed":
        recorded_output = entry.get("output_file")
        if recorded_output and Path(recorded_output).exists():
            print(f"Skipping {filename} (already processed)")
            return Path(recorded_output)

    print(f"\n{'='*50}")
    print(f"Processing: {filename}")
    print(f"Episode code: {episode_code}")
    print(f"{'='*50}")

    try:
        # Mark as in-flight before the long-running transcription call.
        progress[filename] = {"status": "transcribing"}
        save_progress(progress)

        transcript_data = transcribe_video(video_path)
        utterance_count = len(transcript_data["utterances"])

        # Save to JSON using episode code only (drop the name part)
        output_path = OUTPUT_DIR / f"{episode_code}_assemblyai.json"
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(transcript_data, f, indent=2, ensure_ascii=False)

        # Update progress
        progress[filename] = {
            "status": "completed",
            "output_file": str(output_path),
            "utterance_count": utterance_count,
        }
        save_progress(progress)

        print(f"  Saved to: {output_path}")
        print(f"  Utterances: {utterance_count}")

        return output_path

    except Exception as e:
        # Record the failure so it is visible in the progress file, then let
        # the caller decide how to proceed.
        progress[filename] = {"status": "error", "error": str(e)}
        save_progress(progress)
        print(f"  ERROR: {e}")
        raise


def get_input_files() -> list[Path]:
    """
    Discover all video files in episodes/ folder.

    Returns a sorted list of regular files whose extension (compared
    case-insensitively) is one of the supported video formats, or an empty
    list when the folder does not exist.
    """
    if not EPISODES_DIR.exists():
        return []

    # Support common video formats
    video_extensions = {'.mp4', '.mkv', '.avi', '.mov', '.webm'}

    return sorted(
        f
        for f in EPISODES_DIR.iterdir()
        if f.is_file() and f.suffix.lower() in video_extensions
    )


def main():
    """
    Transcribe every video in the episodes folder and print a summary.

    Exits with status 1 when no video files are found or when at least one
    video fails to process.
    """
    ensure_dirs()

    # Check for force flag
    force = "--force" in sys.argv or "-f" in sys.argv

    # Discover input files
    video_files = get_input_files()

    if not video_files:
        print(f"No video files found in {EPISODES_DIR}/")
        print("Supported formats: .mp4, .mkv, .avi, .mov, .webm")
        sys.exit(1)

    print(f"Found {len(video_files)} video(s) in {EPISODES_DIR}/")
    if force:
        print("Force mode: ON (reprocessing all files)")
    print("")

    # Process all videos; one failure does not stop the remaining files.
    success_count = 0
    fail_count = 0

    for video_path in video_files:
        try:
            process_video(video_path, force=force)
            success_count += 1
        except Exception as e:
            print(f"\n❌ Failed to process {video_path.name}: {e}")
            fail_count += 1

    # Summary
    print("\n" + "="*50)
    print(f"Step 1 Complete: {success_count} succeeded, {fail_count} failed")
    print("="*50)

    if fail_count > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()