#!/usr/bin/env python3
"""
Transcribe episodes with speaker diarization and name inference.

Requirements:
    uv sync   # or: uv run transcribe_episodes.py (auto-installs deps)

Environment Variables:
    ASSEMBLYAI_API_KEY - Your AssemblyAI API key
    OPENAI_API_KEY     - Your OpenAI/Kimi API key
    OPENAI_BASE_URL    - (Optional) API base URL
                         - Regular Kimi: https://api.moonshot.cn/v1
                         - Kimi Code:    https://api.kimi.com/coding/v1
    LLM_MODEL          - (Optional) Model name, e.g., "kimi-for-coding"

Usage:
    uv run transcribe_episodes.py
    uv run transcribe_episodes.py status
    uv run transcribe_episodes.py reset [filename]
    uv run transcribe_episodes.py test-llm
"""

import os
import sys
import json
import re
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import List, Optional, Dict
from datetime import timedelta

import assemblyai as aai
from openai import OpenAI

# ============== Configuration ==============

EPISODES_DIR = Path("episodes")
OUTPUT_DIR = Path("transcripts")
PROGRESS_FILE = Path(".transcription_progress.json")

# Characters to recognize
CHARACTERS = ["Malabar", "Sun", "Jupiter", "Kangarro", "Mole"]

# LLM Configuration
# For Kimi Code API: set OPENAI_BASE_URL="https://api.kimi.com/coding/v1"
# and LLM_MODEL="kimi-for-coding"
DEFAULT_LLM_BASE_URL = "https://api.moonshot.cn/v1"  # Default to regular Kimi
DEFAULT_LLM_MODEL = "kimi-latest"

# Patterns for non-word utterances to merge with adjacent lines.
# These are sounds, modal particles, and short acknowledgments.
# FIX: the Chinese pattern previously wrote the character class as
# [嗯|啊|哦|...]; inside [...] the '|' is a literal pipe (and 哼 was
# duplicated), so a standalone "|" wrongly counted as a non-word.
# A character class needs no separators — same character set, no pipes.
NON_WORD_PATTERNS = [
    r'^[\s]*[嗯啊哦呃唉哎哈哼哟唔呦豁呀呜呼]+[\s]*$',  # Chinese modal particles
    r'^[\s]*[Mm]hm+[\s]*$',      # Mhm
    r'^[\s]*[Uu]h+[\s]*$',       # Uh
    r'^[\s]*[Uu]m+[\s]*$',       # Um
    r'^[\s]*[Aa]h+[\s]*$',       # Ah
    r'^[\s]*[Oo]h+[\s]*$',       # Oh
    r'^[\s]*[Hh]uh+[\s]*$',      # Huh
    r'^[\s]*[Hh]mm+[\s]*$',      # Hmm
    r'^[\s]*[Yy]eah?[\s]*$',     # Yeah (standalone)
    r'^[\s]*[Nn]o+[\s]*$',       # No (standalone)
    r'^[\s]*[Oo]k+[\s]*$',       # Ok
    r'^[\s]*[Oo]kay+[\s]*$',     # Okay
    r'^[\s]*[Rr]ight+[\s]*$',    # Right (standalone)
    r'^[\s]*[Ww]hat+[\s]*$',     # What (standalone)
    r'^[\s]*\([^)]*\)[\s]*$',    # (laughs), (coughs), etc.
    r'^[\s]*\[[^\]]*\][\s]*$',   # [laughs], [coughs], etc.
]
NON_WORD_REGEX = re.compile('|'.join(f'({p})' for p in NON_WORD_PATTERNS), re.IGNORECASE)

# ============== Data Classes ==============

@dataclass
class Utterance:
    """A single utterance from a speaker."""
    speaker: str                        # Original diarization label (A, B, C...)
    text: str
    start_ms: int
    end_ms: int
    inferred_name: Optional[str] = None  # Character name filled in after LLM naming

    @property
    def timestamp(self) -> str:
        """Format the start time as [mm:ss]."""
        seconds = self.start_ms // 1000
        minutes = seconds // 60
        secs = seconds % 60
        return f"[{minutes:02d}:{secs:02d}]"

@dataclass
class EpisodeProgress:
    """Track progress for an episode."""
    filename: str
    status: str  # "pending", "transcribing", "naming", "completed", "error"
    error_message: Optional[str] = None
    output_file: Optional[str] = None

# ============== Progress Manager ==============

class ProgressManager:
    """Manages progress tracking to avoid re-processing."""

    def __init__(self, progress_file: Path = PROGRESS_FILE):
        self.progress_file = progress_file
        self.progress: Dict[str, dict] = self._load()

    def _load(self) -> Dict[str, dict]:
        """Load the progress JSON, or start fresh if none exists."""
        if self.progress_file.exists():
            with open(self.progress_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {}

    def save(self):
        """Persist the in-memory progress dict to disk."""
        with open(self.progress_file, 'w', encoding='utf-8') as f:
            json.dump(self.progress, f, indent=2, ensure_ascii=False)

    def get_status(self, filename: str) -> Optional[str]:
        """Return the recorded status for a file, or None if untracked."""
        return self.progress.get(filename, {}).get('status')

    def set_status(self, filename: str, status: str,
                   error_message: Optional[str] = None,
                   output_file: Optional[str] = None):
        """Record status (and optional error/output path) for a file and save."""
        if filename not in self.progress:
            self.progress[filename] = {}
        self.progress[filename]['status'] = status
        if error_message:
            self.progress[filename]['error'] = error_message
        if output_file:
            self.progress[filename]['output_file'] = output_file
        self.save()

    def is_completed(self, filename: str) -> bool:
        """True if the file was already processed successfully."""
        return self.get_status(filename) == "completed"

    def reset(self, filename: Optional[str] = None):
        """Reset progress for a file or all files."""
        if filename:
            if filename in self.progress:
                del self.progress[filename]
        else:
            self.progress = {}
        self.save()

# ============== Transcription Service ==============

class AssemblyAITranscriber:
    """Handles transcription with speaker diarization using AssemblyAI."""

    def __init__(self, api_key: str):
        aai.settings.api_key = api_key

    def transcribe(self, audio_file: Path, on_progress=None) -> List[Utterance]:
        """
        Transcribe an audio/video file with speaker diarization.

        Returns a list of Utterance objects in chronological order.
        Raises on transcription failure.
        """
        # NOTE(review): 'speech_models' (plural, list) does not look like a
        # TranscriptionConfig parameter — the AssemblyAI SDK documents a
        # singular 'speech_model'. Verify against the installed SDK version.
        config = aai.TranscriptionConfig(
            speaker_labels=True,
            speech_models=["universal-2"],
            language_detection=True,  # Auto-detect language (English, Chinese, etc.)
        )
        transcriber = aai.Transcriber(config=config)

        print(f"  Uploading {audio_file.name}...")
        transcript = transcriber.transcribe(str(audio_file))

        if transcript.status == aai.TranscriptStatus.error:
            raise Exception(f"Transcription failed: {transcript.error}")

        print(f"  Transcription complete. Processing utterances...")

        utterances = []
        for utt in transcript.utterances:
            utterances.append(Utterance(
                speaker=utt.speaker,
                text=utt.text.strip(),
                start_ms=utt.start,
                end_ms=utt.end
            ))
        return utterances

# ============== Speaker Naming Service ==============

class SpeakerNamer:
    """Uses LLM to infer speaker names from context."""

    # Kimi/Moonshot API endpoints to try
    KIMI_ENDPOINTS = [
        "https://api.moonshot.cn/v1",
        "https://api.moonshot.ai/v1",
    ]

    def __init__(self, api_key: str, base_url: Optional[str] = None,
                 model: Optional[str] = None):
        # Determine API type from the base URL
        base_url = base_url or DEFAULT_LLM_BASE_URL
        is_kimi = "moonshot" in base_url or "kimi" in base_url

        # Use provided model or a default appropriate for the API
        if model:
            self.model = model
        elif is_kimi:
            self.model = DEFAULT_LLM_MODEL
        else:
            self.model = "gpt-4o-mini"

        # Debug: Show which API is being used (without exposing the key)
        print(f"  LLM Config: Using {'Kimi' if is_kimi else 'OpenAI'} API")
        print(f"    Base URL: {base_url}")
        print(f"    Model: {self.model}")
        print(f"    API Key set: {'Yes (starts with ' + api_key[:8] + '...)' if api_key else 'NO - MISSING!'}")

        self.api_key = api_key
        self.base_url = base_url
        self.is_kimi = is_kimi
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url
        )

    def _try_infer_with_endpoint(self, endpoint: str, prompt: str,
                                 speakers: List[str]) -> Optional[Dict[str, str]]:
        """Try to infer speakers using a specific endpoint."""
        client = OpenAI(api_key=self.api_key, base_url=endpoint)
        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "You are a helpful assistant that identifies speakers in a dialogue based on context and speaking style."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3
        )
        result = response.choices[0].message.content
        return self._parse_naming_response(result, speakers)

    def infer_speakers(self, utterances: List[Utterance]) -> Dict[str, str]:
        """
        Infer speaker names based on context.

        Returns mapping of speaker_label -> name. Falls back to generic
        "Speaker X" labels if every endpoint fails.
        """
        # Get unique speaker labels
        speakers = sorted(set(utt.speaker for utt in utterances))

        # Build context with samples from each speaker
        speaker_samples = {s: [] for s in speakers}
        for utt in utterances:
            if len(speaker_samples[utt.speaker]) < 5:  # Sample up to 5 utterances
                speaker_samples[utt.speaker].append(utt.text)

        # Build prompt
        prompt = self._build_naming_prompt(speaker_samples)

        print(f"  Sending to LLM for speaker naming...")

        # Try different endpoints for Kimi
        endpoints_to_try = []
        if self.is_kimi:
            # Start with configured base_url, then try alternatives
            if self.base_url:
                endpoints_to_try.append(self.base_url)
            for ep in self.KIMI_ENDPOINTS:
                if ep not in endpoints_to_try:
                    endpoints_to_try.append(ep)
        else:
            endpoints_to_try = [self.base_url] if self.base_url else [None]

        last_error = None
        for endpoint in endpoints_to_try:
            try:
                print(f"  Trying endpoint: {endpoint or 'default (OpenAI)'}")
                if self.is_kimi:
                    mapping = self._try_infer_with_endpoint(endpoint, prompt, speakers)
                else:
                    # Use default client for OpenAI
                    response = self.client.chat.completions.create(
                        model=self.model,
                        messages=[
                            {"role": "system", "content": "You are a helpful assistant that identifies speakers in a dialogue based on context and speaking style."},
                            {"role": "user", "content": prompt}
                        ],
                        temperature=0.3
                    )
                    result = response.choices[0].message.content
                    mapping = self._parse_naming_response(result, speakers)
                print(f"  Identified speakers: {mapping}")
                return mapping
            except Exception as e:
                last_error = str(e)
                print(f"  Failed: {last_error[:100]}...")
                continue  # Try next endpoint

        # All endpoints failed
        print(f"\n  Warning: Speaker naming failed on all endpoints.")
        print(f"  Last error: {last_error}")

        # Provide helpful guidance for authentication errors
        if "401" in str(last_error) or "403" in str(last_error) or "Invalid Authentication" in str(last_error):
            print("\n  === Authentication Error ===")
            print("  Your API key was rejected. Please check:")
            print("  1. Is OPENAI_API_KEY set to your Kimi API key?")
            print("     Get your key from: https://platform.moonshot.cn/")
            print("  2. Does your API key have sufficient balance/credits?")
            print("  3. Kimi Code API (api.kimi.com/coding) requires special access.")
            print("     For regular Kimi, use: export OPENAI_BASE_URL='https://api.moonshot.cn/v1'")
            print("  4. The script will continue with generic speaker labels (Speaker A, B, etc.)")
            print("  =============================\n")

        # Fallback to speaker labels
        return {s: f"Speaker {s}" for s in speakers}

    def _build_naming_prompt(self, speaker_samples: Dict[str, List[str]]) -> str:
        """Build the LLM prompt listing sample utterances for each speaker."""
        prompt = """I have a transcript from a video featuring the following characters: """ + ", ".join(CHARACTERS) + """.

Below are sample utterances from each speaker. Based on the context, speaking style, and content, please identify which character is which speaker.

"""
        for speaker, samples in speaker_samples.items():
            prompt += f"Speaker {speaker}:\n"
            for i, sample in enumerate(samples, 1):
                prompt += f"  {i}. \"{sample}\"\n"
            prompt += "\n"

        prompt += """Please respond with a JSON object mapping speaker labels to character names.
Example: {"A": "Malabar", "B": "Sun", "C": "Jupiter"}

Only use the character names from the list provided. If you're unsure, make your best guess based on the speaking style and context."""
        return prompt

    def _parse_naming_response(self, response: str,
                               valid_speakers: List[str]) -> Dict[str, str]:
        """Parse an LLM response to extract the speaker -> name mapping."""
        # Look for a JSON block first (json is imported at module level)
        json_match = re.search(r'\{[^}]+\}', response)
        if json_match:
            try:
                mapping = json.loads(json_match.group())
                # Validate keys
                return {k: v for k, v in mapping.items() if k in valid_speakers}
            except json.JSONDecodeError:
                pass

        # Fallback: look for "Speaker X: Name" pattern
        mapping = {}
        for speaker in valid_speakers:
            pattern = rf'["\']?{speaker}["\']?\s*[:=]\s*["\']?([^"\'\n,]+)'
            match = re.search(pattern, response, re.IGNORECASE)
            if match:
                name = match.group(1).strip()
                # Ensure name is in our character list
                for char in CHARACTERS:
                    if char.lower() in name.lower():
                        mapping[speaker] = char
                        break
                else:
                    mapping[speaker] = name

        # Fill in any missing speakers
        for speaker in valid_speakers:
            if speaker not in mapping:
                mapping[speaker] = f"Speaker {speaker}"

        return mapping

# ============== Output Formatter ==============

class OutputFormatter:
    """Formats and merges utterances for output."""

    @staticmethod
    def is_non_word(text: str) -> bool:
        """Check if text is a non-word utterance (sound, modal particle)."""
        return bool(NON_WORD_REGEX.match(text.strip()))

    @classmethod
    def merge_utterances(cls, utterances: List[Utterance]) -> List[Utterance]:
        """
        Merge non-word utterances with adjacent meaningful utterances from
        the same speaker. A non-word is prefixed onto the speaker's next
        meaningful utterance (any non-words between them are dropped), or
        merged backwards into the previous entry if that was a non-word.
        """
        if not utterances:
            return []

        merged = []
        i = 0
        while i < len(utterances):
            current = utterances[i]

            # Check if current is a non-word
            if cls.is_non_word(current.text):
                # Look ahead to find the next meaningful utterance from same speaker
                j = i + 1
                while j < len(utterances) and utterances[j].speaker == current.speaker:
                    if not cls.is_non_word(utterances[j].text):
                        # Merge current into the next meaningful one
                        utterances[j].text = current.text + " " + utterances[j].text
                        utterances[j].start_ms = current.start_ms
                        i = j
                        break
                    j += 1
                else:
                    # No meaningful utterance found, keep as is
                    merged.append(current)
                    i += 1
            else:
                # Check if previous was a non-word from same speaker
                if merged and merged[-1].speaker == current.speaker and cls.is_non_word(merged[-1].text):
                    # Merge previous into current
                    current.text = merged[-1].text + " " + current.text
                    current.start_ms = merged[-1].start_ms
                    merged.pop()
                merged.append(current)
                i += 1

        return merged

    @classmethod
    def format_output(cls, utterances: List[Utterance],
                      speaker_mapping: Dict[str, str]) -> str:
        """Format utterances into '[mm:ss](Name) text' lines."""
        # Apply speaker names
        for utt in utterances:
            utt.inferred_name = speaker_mapping.get(utt.speaker, f"Speaker {utt.speaker}")

        # Merge consecutive non-words
        merged = cls.merge_utterances(utterances)

        # Format lines
        lines = []
        for utt in merged:
            # Skip standalone non-words unless they're at the end.
            # FIX: use identity, not dataclass value-equality, so an earlier
            # utterance that happens to equal the last one isn't confused
            # with it.
            if cls.is_non_word(utt.text) and utt is not merged[-1]:
                continue
            speaker_name = utt.inferred_name or f"Speaker {utt.speaker}"
            lines.append(f"{utt.timestamp}({speaker_name}) {utt.text}")

        return '\n'.join(lines)

# ============== Main Processor ==============

class EpisodeProcessor:
    """Main processor that orchestrates transcription and naming."""

    def __init__(self):
        # Check API keys
        assembly_key = os.getenv("ASSEMBLYAI_API_KEY")
        openai_key = os.getenv("OPENAI_API_KEY")

        if not assembly_key:
            raise ValueError("ASSEMBLYAI_API_KEY environment variable is required")
        if not openai_key:
            raise ValueError("OPENAI_API_KEY environment variable is required")

        self.transcriber = AssemblyAITranscriber(assembly_key)

        # Get LLM configuration from environment
        openai_base = os.getenv("OPENAI_BASE_URL", DEFAULT_LLM_BASE_URL)
        llm_model = os.getenv("LLM_MODEL")  # e.g., "kimi-for-coding"
        self.namer = SpeakerNamer(openai_key, openai_base, llm_model)

        self.progress = ProgressManager()
        self.formatter = OutputFormatter()

        # Ensure output directory exists
        OUTPUT_DIR.mkdir(exist_ok=True)

    def process_episode(self, video_file: Path) -> bool:
        """Process a single episode. Returns True on success."""
        filename = video_file.name

        # Check if already completed
        if self.progress.is_completed(filename):
            # FIX: restore the filename placeholder that was lost from this
            # f-string (it printed a literal "(unknown)").
            print(f"Skipping {filename} (already completed)")
            return True

        print(f"\n{'='*50}")
        print(f"Processing: {filename}")
        print(f"{'='*50}")

        try:
            # Step 1: Transcription
            self.progress.set_status(filename, "transcribing")
            utterances = self.transcriber.transcribe(video_file)

            if not utterances:
                print(f"  No utterances found in {filename}")
                self.progress.set_status(filename, "completed", output_file=None)
                return True

            print(f"  Found {len(utterances)} utterances")

            # Step 2: Speaker naming
            self.progress.set_status(filename, "naming")
            speaker_mapping = self.namer.infer_speakers(utterances)

            # Step 3: Format and save
            output_text = self.formatter.format_output(utterances, speaker_mapping)
            output_filename = video_file.stem + ".txt"
            output_path = OUTPUT_DIR / output_filename

            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(output_text)

            self.progress.set_status(filename, "completed", output_file=str(output_path))
            print(f"  Saved to: {output_path}")
            return True

        except Exception as e:
            error_msg = str(e)
            self.progress.set_status(filename, "error", error_message=error_msg)
            print(f"  ERROR: {error_msg}")
            return False

    def process_all(self):
        """Process all video files in episodes directory."""
        # Find all video files
        video_extensions = {'.mp4', '.mkv', '.avi', '.mov', '.webm'}
        video_files = [
            f for f in EPISODES_DIR.iterdir()
            if f.is_file() and f.suffix.lower() in video_extensions
        ]

        if not video_files:
            print(f"No video files found in {EPISODES_DIR}")
            return

        print(f"Found {len(video_files)} video file(s) to process")

        success_count = 0
        fail_count = 0
        for video_file in sorted(video_files):
            if self.process_episode(video_file):
                success_count += 1
            else:
                fail_count += 1

        print(f"\n{'='*50}")
        print(f"Processing complete: {success_count} succeeded, {fail_count} failed")
        print(f"Transcripts saved to: {OUTPUT_DIR}")
        print(f"Progress tracked in: {PROGRESS_FILE}")

# ============== CLI ==============

def print_usage():
    """Print CLI usage, commands, environment variables, and examples."""
    print("""
Usage: uv run transcribe_episodes.py [command]

Commands:
  (none)            Process all episodes
  reset             Reset progress for all files (will re-process everything)
  reset [filename]  Reset progress for specific file
  status            Show current progress status
  test-llm          Test LLM API connection (diagnostic)

Environment Variables:
  ASSEMBLYAI_API_KEY  Required - Your AssemblyAI API key
  OPENAI_API_KEY      Required - Your OpenAI/Kimi API key
  OPENAI_BASE_URL     Optional - API endpoint URL
                        - Regular Kimi: https://api.moonshot.cn/v1
                        - Kimi Code: https://api.kimi.com/coding/v1
  LLM_MODEL           Optional - Model name (e.g., "kimi-for-coding")

Examples:
  # Regular Kimi
  export ASSEMBLYAI_API_KEY="your-assembly-key"
  export OPENAI_API_KEY="your-kimi-key"
  uv run transcribe_episodes.py

  # Kimi Code
  export ASSEMBLYAI_API_KEY="your-assembly-key"
  export OPENAI_API_KEY="your-kimi-code-key"
  export OPENAI_BASE_URL="https://api.kimi.com/coding/v1"
  export LLM_MODEL="kimi-for-coding"
  uv run transcribe_episodes.py

  # Test connection:
  uv run transcribe_episodes.py test-llm
""")

def test_llm_connection():
    """Test LLM API connection and print diagnostic info."""
    api_key = os.getenv("OPENAI_API_KEY")
    base_url = os.getenv("OPENAI_BASE_URL", DEFAULT_LLM_BASE_URL)
    model = os.getenv("LLM_MODEL")  # e.g., "kimi-for-coding"

    print("=" * 50)
    print("LLM API Connection Test")
    print("=" * 50)
    print(f"\nConfiguration:")
    print(f"  Base URL: {base_url}")
    print(f"  Model: {model or '(auto-detect)'}")
    print(f"  API Key: {'Set (starts with ' + api_key[:12] + '...)' if api_key else 'NOT SET'}")

    if not api_key:
        print("\n❌ ERROR: OPENAI_API_KEY environment variable is not set!")
        print("\nTo fix:")
        print("  export OPENAI_API_KEY='your-api-key-here'")
        return

    # Try connecting
    print(f"\nTesting connection...")

    is_kimi = "moonshot" in base_url or "kimi" in base_url
    if not model:
        model = "kimi-latest" if is_kimi else "gpt-4o-mini"

    endpoints_to_try = [base_url]
    for endpoint in endpoints_to_try:
        print(f"\n  Trying endpoint: {endpoint}")
        print(f"  Model: {model}")
        try:
            client = OpenAI(api_key=api_key, base_url=endpoint)
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": "Say 'Hello' and nothing else."}],
                max_tokens=10
            )
            print(f"  ✅ SUCCESS!")
            print(f"  Response: '{response.choices[0].message.content}'")
            print(f"\n✅ LLM API is working correctly!")
            return
        except Exception as e:
            error_str = str(e)
            print(f"  ❌ Failed: {error_str}")
            if "401" in error_str or "403" in error_str:
                print("\n  Authentication Error - Possible causes:")
                print("  1. API key is incorrect or has been revoked")
                print("  2. API key has no credits remaining")
                print("  3. Wrong API key for the selected service")
                print("  4. Kimi Code API requires special access (only for Coding Agents)")
                print("\n  To fix:")
                print("  - For regular Kimi, use: export OPENAI_BASE_URL='https://api.moonshot.cn/v1'")
                print("  - For Kimi Code: Ensure your account has Coding Agent access")
                print("  - Get a regular Kimi API key from: https://platform.moonshot.cn/")

    print("\n❌ All endpoints failed. Please check your API key and try again.")

def show_status():
    """Show current progress status."""
    progress = ProgressManager()

    print(f"\nProgress Status ({PROGRESS_FILE}):")
    print("-" * 50)

    if not progress.progress:
        print("No progress recorded yet.")
        return

    for filename, data in sorted(progress.progress.items()):
        status = data.get('status', 'unknown')
        output = data.get('output_file', '-')
        error = data.get('error', '')
        status_icon = "✓" if status == "completed" else "✗" if status == "error" else "⋯"
        # FIX: restore the filename placeholder that was lost from this
        # f-string (it printed a literal "(unknown)").
        print(f"{status_icon} {filename}: {status}")
        if output and status == "completed":
            print(f"    Output: {output}")
        if error:
            print(f"    Error: {error}")

def main():
    """CLI entry point: dispatch on the first argument, default to processing all."""
    args = sys.argv[1:]

    if len(args) > 0:
        command = args[0]

        if command == "help" or command == "--help" or command == "-h":
            print_usage()
            return
        elif command == "status":
            show_status()
            return
        elif command == "reset":
            progress = ProgressManager()
            if len(args) > 1:
                target = args[1]
                progress.reset(target)
                print(f"Reset progress for: {target}")
            else:
                progress.reset()
                print("Reset all progress")
            return
        elif command == "test-llm":
            test_llm_connection()
            return
        else:
            print(f"Unknown command: {command}")
            print_usage()
            return

    # Default: process all episodes
    try:
        processor = EpisodeProcessor()
        processor.process_all()
    except ValueError as e:
        print(f"Error: {e}")
        print("\nPlease set the required environment variables:")
        print("  export ASSEMBLYAI_API_KEY='your-key'")
        print("  export OPENAI_API_KEY='your-key'")
        print("\nOptional (for Kimi Code):")
        print("  export OPENAI_BASE_URL='https://api.kimi.com/coding/v1'")
        print("  export LLM_MODEL='kimi-for-coding'")
        print("\nFor regular Kimi, the base URL defaults to https://api.moonshot.cn/v1")
        sys.exit(1)
    except Exception as e:
        print(f"Unexpected error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)

if __name__ == "__main__":
    main()