Files
malabar/transcribe_episodes.py
2026-03-03 17:20:29 +08:00

748 lines
27 KiB
Python

#!/usr/bin/env python3
"""
Transcribe episodes with speaker diarization and name inference.
Requirements:
uv sync
# or: uv run transcribe_episodes.py (auto-installs deps)
Environment Variables:
ASSEMBLYAI_API_KEY - Your AssemblyAI API key
OPENAI_API_KEY - Your OpenAI/Kimi API key
OPENAI_BASE_URL - (Optional) API base URL
- Regular Kimi: https://api.moonshot.cn/v1
- Kimi Code: https://api.kimi.com/coding/v1
LLM_MODEL - (Optional) Model name, e.g., "kimi-for-coding"
Usage:
uv run transcribe_episodes.py
uv run transcribe_episodes.py status
uv run transcribe_episodes.py reset [filename]
uv run transcribe_episodes.py test-llm
"""
import os
import sys
import json
import re
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import List, Optional, Dict
from datetime import timedelta
import assemblyai as aai
from openai import OpenAI
# ============== Configuration ==============
EPISODES_DIR = Path("episodes")                        # Input videos
OUTPUT_DIR = Path("transcripts")                       # Written .txt transcripts
PROGRESS_FILE = Path(".transcription_progress.json")   # Resume-state file
# Characters to recognize
CHARACTERS = ["Malabar", "Sun", "Jupiter", "Kangarro", "Mole"]
# LLM Configuration
# For Kimi Code API: set OPENAI_BASE_URL="https://api.kimi.com/coding/v1" and LLM_MODEL="kimi-for-coding"
DEFAULT_LLM_BASE_URL = "https://api.moonshot.cn/v1" # Default to regular Kimi
DEFAULT_LLM_MODEL = "kimi-latest"
# Patterns for non-word utterances to merge with adjacent lines
# These are sounds, modal particles, short acknowledgments
NON_WORD_PATTERNS = [
    # FIX: a regex character class lists single characters, so the previous
    # form [嗯|啊|...] also matched strings of literal '|' (and contained a
    # duplicate 哼). Multi-character entries such as 啊哈 are still covered
    # because each of their characters appears in the class, which is applied
    # one-or-more times.
    r'^[\s]*[嗯啊哦呃唉哎哈哼哟唔呦豁呀呜呼]+[\s]*$', # Chinese modal particles
    r'^[\s]*[Mm]hm+[\s]*$', # Mhm
    r'^[\s]*[Uu]h+[\s]*$', # Uh
    r'^[\s]*[Uu]m+[\s]*$', # Um
    r'^[\s]*[Aa]h+[\s]*$', # Ah
    r'^[\s]*[Oo]h+[\s]*$', # Oh
    r'^[\s]*[Hh]uh+[\s]*$', # Huh
    r'^[\s]*[Hh]mm+[\s]*$', # Hmm
    r'^[\s]*[Yy]eah?[\s]*$', # Yeah (standalone)
    r'^[\s]*[Nn]o+[\s]*$', # No (standalone)
    r'^[\s]*[Oo]k+[\s]*$', # Ok
    r'^[\s]*[Oo]kay+[\s]*$', # Okay
    r'^[\s]*[Rr]ight+[\s]*$', # Right (standalone)
    r'^[\s]*[Ww]hat+[\s]*$', # What (standalone)
    r'^[\s]*\([^)]*\)[\s]*$', # (laughs), (coughs), etc.
    r'^[\s]*\[[^\]]*\][\s]*$', # [laughs], [coughs], etc.
]
# Each pattern is individually anchored with ^...$, so wrapping each in a
# group and joining with '|' preserves per-alternative anchoring.
NON_WORD_REGEX = re.compile('|'.join(f'({p})' for p in NON_WORD_PATTERNS), re.IGNORECASE)
# ============== Data Classes ==============
@dataclass
class Utterance:
    """One contiguous speech segment attributed to a single speaker."""
    speaker: str              # Original diarization label (A, B, C...)
    text: str                 # Spoken text for this segment
    start_ms: int             # Segment start time, milliseconds
    end_ms: int               # Segment end time, milliseconds
    inferred_name: Optional[str] = None  # Character name assigned later

    @property
    def timestamp(self) -> str:
        """Render the start time as a '[mm:ss]' marker."""
        minutes, secs = divmod(self.start_ms // 1000, 60)
        return f"[{minutes:02d}:{secs:02d}]"
@dataclass
class EpisodeProgress:
    """Track progress for an episode.

    NOTE(review): this dataclass is not referenced anywhere else in the
    visible file — ProgressManager persists plain dicts with the same keys.
    Kept as the documented shape of a progress record.
    """
    filename: str  # source video filename
    status: str  # "pending", "transcribing", "naming", "completed", "error"
    error_message: Optional[str] = None  # populated when status == "error"
    output_file: Optional[str] = None  # transcript path once written
# ============== Progress Manager ==============
class ProgressManager:
    """Manages progress tracking to avoid re-processing.

    State is persisted as a JSON object mapping filename -> {"status": ...,
    "error": ..., "output_file": ...} so interrupted runs can resume.
    """

    def __init__(self, progress_file: Optional[Path] = None):
        """Load existing progress from *progress_file*.

        The default (module-level PROGRESS_FILE) is resolved lazily at call
        time rather than baked in as an eager default argument.
        """
        self.progress_file = progress_file if progress_file is not None else PROGRESS_FILE
        self.progress: Dict[str, dict] = self._load()

    def _load(self) -> Dict[str, dict]:
        """Read the progress JSON, or return an empty mapping if absent."""
        if self.progress_file.exists():
            with open(self.progress_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {}

    def save(self):
        """Persist the in-memory progress mapping to disk."""
        with open(self.progress_file, 'w', encoding='utf-8') as f:
            json.dump(self.progress, f, indent=2, ensure_ascii=False)

    def get_status(self, filename: str) -> Optional[str]:
        """Return the recorded status for *filename*, or None if unknown."""
        return self.progress.get(filename, {}).get('status')

    def set_status(self, filename: str, status: str,
                   error_message: Optional[str] = None,
                   output_file: Optional[str] = None):
        """Record *status* (plus optional error/output info) and save immediately."""
        entry = self.progress.setdefault(filename, {})
        entry['status'] = status
        if error_message:
            entry['error'] = error_message
        if output_file:
            entry['output_file'] = output_file
        self.save()

    def is_completed(self, filename: str) -> bool:
        """True when *filename* has already been fully processed."""
        return self.get_status(filename) == "completed"

    def reset(self, filename: Optional[str] = None):
        """Reset progress for one file, or for all files when *filename* is None."""
        if filename:
            # Removing a file that was never tracked is a no-op.
            self.progress.pop(filename, None)
        else:
            self.progress = {}
        self.save()
# ============== Transcription Service ==============
class AssemblyAITranscriber:
    """Handles transcription with speaker diarization using AssemblyAI."""

    def __init__(self, api_key: str):
        # The SDK reads the key from module-level settings, not per-client.
        aai.settings.api_key = api_key

    def transcribe(self, audio_file: Path, on_progress=None) -> List[Utterance]:
        """Transcribe *audio_file* with speaker diarization.

        Returns one Utterance per diarized segment. (*on_progress* is kept
        for interface compatibility but is not used.)
        """
        # NOTE(review): `speech_models=["universal-2"]` does not look like the
        # documented TranscriptionConfig parameter name (`speech_model`);
        # confirm against the installed assemblyai SDK version.
        config = aai.TranscriptionConfig(
            speaker_labels=True,
            speech_models=["universal-2"],
            language_detection=True, # Auto-detect language (English, Chinese, etc.)
        )
        print(f" Uploading {audio_file.name}...")
        transcript = aai.Transcriber(config=config).transcribe(str(audio_file))
        if transcript.status == aai.TranscriptStatus.error:
            raise Exception(f"Transcription failed: {transcript.error}")
        print(f" Transcription complete. Processing utterances...")
        return [
            Utterance(
                speaker=segment.speaker,
                text=segment.text.strip(),
                start_ms=segment.start,
                end_ms=segment.end,
            )
            for segment in transcript.utterances
        ]
# ============== Speaker Naming Service ==============
class SpeakerNamer:
"""Uses LLM to infer speaker names from context."""
# Kimi/Moonshot API endpoints to try
KIMI_ENDPOINTS = [
"https://api.moonshot.cn/v1",
"https://api.moonshot.ai/v1",
]
def __init__(self, api_key: str, base_url: Optional[str] = None, model: Optional[str] = None):
# Determine API type
base_url = base_url or DEFAULT_LLM_BASE_URL
is_kimi = "moonshot" in base_url or "kimi" in base_url
# Use provided model or default based on API
if model:
self.model = model
elif is_kimi:
self.model = DEFAULT_LLM_MODEL
else:
self.model = "gpt-4o-mini"
# Debug: Show which API is being used (without exposing the key)
print(f" LLM Config: Using {'Kimi' if is_kimi else 'OpenAI'} API")
print(f" Base URL: {base_url}")
print(f" Model: {self.model}")
print(f" API Key set: {'Yes (starts with ' + api_key[:8] + '...)' if api_key else 'NO - MISSING!'}")
self.api_key = api_key
self.base_url = base_url
self.is_kimi = is_kimi
self.client = OpenAI(
api_key=api_key,
base_url=base_url
)
def _try_infer_with_endpoint(self, endpoint: str, prompt: str, speakers: List[str]) -> Optional[Dict[str, str]]:
"""Try to infer speakers using a specific endpoint."""
client = OpenAI(api_key=self.api_key, base_url=endpoint)
response = client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "You are a helpful assistant that identifies speakers in a dialogue based on context and speaking style."},
{"role": "user", "content": prompt}
],
temperature=0.3
)
result = response.choices[0].message.content
return self._parse_naming_response(result, speakers)
def infer_speakers(self, utterances: List[Utterance]) -> Dict[str, str]:
"""
Infer speaker names based on context.
Returns mapping of speaker_label -> name
"""
# Get unique speaker labels
speakers = sorted(set(utt.speaker for utt in utterances))
# Build context with samples from each speaker
speaker_samples = {s: [] for s in speakers}
for utt in utterances:
if len(speaker_samples[utt.speaker]) < 5: # Sample up to 5 utterances
speaker_samples[utt.speaker].append(utt.text)
# Build prompt
prompt = self._build_naming_prompt(speaker_samples)
print(f" Sending to LLM for speaker naming...")
# Try different endpoints for Kimi
endpoints_to_try = []
if self.is_kimi:
# Start with configured base_url, then try alternatives
if self.base_url:
endpoints_to_try.append(self.base_url)
for ep in self.KIMI_ENDPOINTS:
if ep not in endpoints_to_try:
endpoints_to_try.append(ep)
else:
endpoints_to_try = [self.base_url] if self.base_url else [None]
last_error = None
for endpoint in endpoints_to_try:
try:
print(f" Trying endpoint: {endpoint or 'default (OpenAI)'}")
if self.is_kimi:
mapping = self._try_infer_with_endpoint(endpoint, prompt, speakers)
else:
# Use default client for OpenAI
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "You are a helpful assistant that identifies speakers in a dialogue based on context and speaking style."},
{"role": "user", "content": prompt}
],
temperature=0.3
)
result = response.choices[0].message.content
mapping = self._parse_naming_response(result, speakers)
print(f" Identified speakers: {mapping}")
return mapping
except Exception as e:
last_error = str(e)
print(f" Failed: {last_error[:100]}...")
continue # Try next endpoint
# All endpoints failed
print(f"\n Warning: Speaker naming failed on all endpoints.")
print(f" Last error: {last_error}")
# Provide helpful guidance for authentication errors
if "401" in str(last_error) or "403" in str(last_error) or "Invalid Authentication" in str(last_error):
print("\n === Authentication Error ===")
print(" Your API key was rejected. Please check:")
print(" 1. Is OPENAI_API_KEY set to your Kimi API key?")
print(" Get your key from: https://platform.moonshot.cn/")
print(" 2. Does your API key have sufficient balance/credits?")
print(" 3. Kimi Code API (api.kimi.com/coding) requires special access.")
print(" For regular Kimi, use: export OPENAI_BASE_URL='https://api.moonshot.cn/v1'")
print(" 4. The script will continue with generic speaker labels (Speaker A, B, etc.)")
print(" =============================\n")
# Fallback to speaker labels
return {s: f"Speaker {s}" for s in speakers}
def _build_naming_prompt(self, speaker_samples: Dict[str, List[str]]) -> str:
prompt = """I have a transcript from a video featuring the following characters: """ + ", ".join(CHARACTERS) + """.
Below are sample utterances from each speaker. Based on the context, speaking style, and content, please identify which character is which speaker.
"""
for speaker, samples in speaker_samples.items():
prompt += f"Speaker {speaker}:\n"
for i, sample in enumerate(samples, 1):
prompt += f" {i}. \"{sample}\"\n"
prompt += "\n"
prompt += """Please respond with a JSON object mapping speaker labels to character names.
Example: {"A": "Malabar", "B": "Sun", "C": "Jupiter"}
Only use the character names from the list provided. If you're unsure, make your best guess based on the speaking style and context."""
return prompt
def _parse_naming_response(self, response: str, valid_speakers: List[str]) -> Dict[str, str]:
"""Parse LLM response to extract speaker mapping."""
# Try to find JSON in the response
import json
# Look for JSON block
json_match = re.search(r'\{[^}]+\}', response)
if json_match:
try:
mapping = json.loads(json_match.group())
# Validate keys
return {k: v for k, v in mapping.items() if k in valid_speakers}
except json.JSONDecodeError:
pass
# Fallback: look for "Speaker X: Name" pattern
mapping = {}
for speaker in valid_speakers:
pattern = rf'["\']?{speaker}["\']?\s*[:=]\s*["\']?([^"\'\n,]+)'
match = re.search(pattern, response, re.IGNORECASE)
if match:
name = match.group(1).strip()
# Ensure name is in our character list
for char in CHARACTERS:
if char.lower() in name.lower():
mapping[speaker] = char
break
else:
mapping[speaker] = name
# Fill in any missing speakers
for speaker in valid_speakers:
if speaker not in mapping:
mapping[speaker] = f"Speaker {speaker}"
return mapping
# ============== Output Formatter ==============
class OutputFormatter:
    """Formats and merges utterances for output."""

    @staticmethod
    def is_non_word(text: str) -> bool:
        """Check if text is a non-word utterance (sound, modal particle)."""
        return bool(NON_WORD_REGEX.match(text.strip()))

    @classmethod
    def merge_utterances(cls, utterances: List[Utterance]) -> List[Utterance]:
        """Merge non-word utterances into adjacent meaningful utterances
        from the same speaker.

        FIX: previously, when several consecutive non-words preceded a
        meaningful utterance from the same speaker, only the first non-word
        was merged and the ones in between were silently dropped; now every
        skipped non-word is prepended in order.

        Note: mutates text/start_ms of the input Utterance objects in place.
        """
        if not utterances:
            return []
        merged = []
        i = 0
        while i < len(utterances):
            current = utterances[i]
            if cls.is_non_word(current.text):
                # Scan forward for the next meaningful utterance from the
                # same speaker.
                j = i + 1
                while j < len(utterances) and utterances[j].speaker == current.speaker:
                    if not cls.is_non_word(utterances[j].text):
                        # Prepend ALL non-words collected at i..j-1, keeping
                        # the earliest start time.
                        prefix = " ".join(u.text for u in utterances[i:j])
                        utterances[j].text = prefix + " " + utterances[j].text
                        utterances[j].start_ms = current.start_ms
                        i = j
                        break
                    j += 1
                else:
                    # No meaningful follow-up from this speaker; keep as is.
                    merged.append(current)
                    i += 1
            else:
                # Fold a trailing non-word from the same speaker into this line.
                if merged and merged[-1].speaker == current.speaker and cls.is_non_word(merged[-1].text):
                    current.text = merged[-1].text + " " + current.text
                    current.start_ms = merged[-1].start_ms
                    merged.pop()
                merged.append(current)
                i += 1
        return merged

    @classmethod
    def format_output(cls, utterances: List[Utterance], speaker_mapping: Dict[str, str]) -> str:
        """Format utterances into the final '[mm:ss](Name) text' transcript."""
        # Apply speaker names (unknown labels fall back to "Speaker X").
        for utt in utterances:
            utt.inferred_name = speaker_mapping.get(utt.speaker, f"Speaker {utt.speaker}")
        # Merge consecutive non-words
        merged = cls.merge_utterances(utterances)
        lines = []
        for utt in merged:
            # Skip standalone non-words unless they're the final line.
            # FIX: compare by identity; dataclass equality could wrongly
            # skip an utterance that happens to equal the last one.
            if cls.is_non_word(utt.text) and utt is not merged[-1]:
                continue
            speaker_name = utt.inferred_name or f"Speaker {utt.speaker}"
            lines.append(f"{utt.timestamp}({speaker_name}) {utt.text}")
        return '\n'.join(lines)
# ============== Main Processor ==============
class EpisodeProcessor:
    """Main processor that orchestrates transcription and naming."""

    def __init__(self):
        """Validate configuration and build the service objects.

        Raises:
            ValueError: if a required API key environment variable is missing.
        """
        assembly_key = os.getenv("ASSEMBLYAI_API_KEY")
        openai_key = os.getenv("OPENAI_API_KEY")
        if not assembly_key:
            raise ValueError("ASSEMBLYAI_API_KEY environment variable is required")
        if not openai_key:
            raise ValueError("OPENAI_API_KEY environment variable is required")
        self.transcriber = AssemblyAITranscriber(assembly_key)
        # Get LLM configuration from environment
        openai_base = os.getenv("OPENAI_BASE_URL", DEFAULT_LLM_BASE_URL)
        llm_model = os.getenv("LLM_MODEL") # e.g., "kimi-for-coding"
        self.namer = SpeakerNamer(openai_key, openai_base, llm_model)
        self.progress = ProgressManager()
        self.formatter = OutputFormatter()
        # Ensure output directory exists
        OUTPUT_DIR.mkdir(exist_ok=True)

    def process_episode(self, video_file: Path) -> bool:
        """Process a single episode. Returns True on success.

        FIX: status messages previously printed the literal placeholder
        "(unknown)" (f-strings with no interpolation) instead of the
        episode filename.
        """
        filename = video_file.name
        # Check if already completed
        if self.progress.is_completed(filename):
            print(f"Skipping {filename} (already completed)")
            return True
        print(f"\n{'='*50}")
        print(f"Processing: {filename}")
        print(f"{'='*50}")
        try:
            # Step 1: Transcription
            self.progress.set_status(filename, "transcribing")
            utterances = self.transcriber.transcribe(video_file)
            if not utterances:
                # Nothing spoken — mark done so we don't retry forever.
                print(f" No utterances found in {filename}")
                self.progress.set_status(filename, "completed", output_file=None)
                return True
            print(f" Found {len(utterances)} utterances")
            # Step 2: Speaker naming
            self.progress.set_status(filename, "naming")
            speaker_mapping = self.namer.infer_speakers(utterances)
            # Step 3: Format and save
            output_text = self.formatter.format_output(utterances, speaker_mapping)
            output_filename = video_file.stem + ".txt"
            output_path = OUTPUT_DIR / output_filename
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(output_text)
            self.progress.set_status(filename, "completed", output_file=str(output_path))
            print(f" Saved to: {output_path}")
            return True
        except Exception as e:
            # Record the failure so `status` can report it, and keep going
            # with the remaining episodes.
            error_msg = str(e)
            self.progress.set_status(filename, "error", error_message=error_msg)
            print(f" ERROR: {error_msg}")
            return False

    def process_all(self):
        """Process all video files in the episodes directory, then summarize."""
        # Find all video files
        video_extensions = {'.mp4', '.mkv', '.avi', '.mov', '.webm'}
        video_files = [
            f for f in EPISODES_DIR.iterdir()
            if f.is_file() and f.suffix.lower() in video_extensions
        ]
        if not video_files:
            print(f"No video files found in {EPISODES_DIR}")
            return
        print(f"Found {len(video_files)} video file(s) to process")
        success_count = 0
        fail_count = 0
        for video_file in sorted(video_files):
            if self.process_episode(video_file):
                success_count += 1
            else:
                fail_count += 1
        print(f"\n{'='*50}")
        print(f"Processing complete: {success_count} succeeded, {fail_count} failed")
        print(f"Transcripts saved to: {OUTPUT_DIR}")
        print(f"Progress tracked in: {PROGRESS_FILE}")
# ============== CLI ==============
def print_usage():
    """Print CLI usage: commands, environment variables, and examples."""
    print("""
Usage: uv run transcribe_episodes.py [command]
Commands:
(none) Process all episodes
reset Reset progress for all files (will re-process everything)
reset <file> Reset progress for specific file
status Show current progress status
test-llm Test LLM API connection (diagnostic)
Environment Variables:
ASSEMBLYAI_API_KEY Required - Your AssemblyAI API key
OPENAI_API_KEY Required - Your OpenAI/Kimi API key
OPENAI_BASE_URL Optional - API endpoint URL
- Regular Kimi: https://api.moonshot.cn/v1
- Kimi Code: https://api.kimi.com/coding/v1
LLM_MODEL Optional - Model name (e.g., "kimi-for-coding")
Examples:
# Regular Kimi
export ASSEMBLYAI_API_KEY="your-assembly-key"
export OPENAI_API_KEY="your-kimi-key"
uv run transcribe_episodes.py
# Kimi Code
export ASSEMBLYAI_API_KEY="your-assembly-key"
export OPENAI_API_KEY="your-kimi-code-key"
export OPENAI_BASE_URL="https://api.kimi.com/coding/v1"
export LLM_MODEL="kimi-for-coding"
uv run transcribe_episodes.py
# Test connection:
uv run transcribe_episodes.py test-llm
""")
def test_llm_connection():
    """Test LLM API connection and print diagnostic info.

    Reads OPENAI_API_KEY / OPENAI_BASE_URL / LLM_MODEL from the environment,
    sends a minimal "Say 'Hello'" completion, and prints either success or
    troubleshooting guidance. Purely diagnostic; returns None either way.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    base_url = os.getenv("OPENAI_BASE_URL", DEFAULT_LLM_BASE_URL)
    model = os.getenv("LLM_MODEL") # e.g., "kimi-for-coding"
    print("=" * 50)
    print("LLM API Connection Test")
    print("=" * 50)
    print(f"\nConfiguration:")
    print(f" Base URL: {base_url}")
    print(f" Model: {model or '(auto-detect)'}")
    print(f" API Key: {'Set (starts with ' + api_key[:12] + '...)' if api_key else 'NOT SET'}")
    if not api_key:
        print("\n❌ ERROR: OPENAI_API_KEY environment variable is not set!")
        print("\nTo fix:")
        print(" export OPENAI_API_KEY='your-api-key-here'")
        return
    # Try connecting
    print(f"\nTesting connection...")
    # Same provider heuristic as SpeakerNamer: substring match on the URL.
    is_kimi = "moonshot" in base_url or "kimi" in base_url
    if not model:
        # Auto-pick a sensible default model for the detected provider.
        model = "kimi-latest" if is_kimi else "gpt-4o-mini"
    # Only the configured endpoint is tried; the list form leaves room for
    # fallback endpoints later.
    endpoints_to_try = [base_url]
    for endpoint in endpoints_to_try:
        print(f"\n Trying endpoint: {endpoint}")
        print(f" Model: {model}")
        try:
            client = OpenAI(api_key=api_key, base_url=endpoint)
            # max_tokens=10 keeps the probe call cheap.
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": "Say 'Hello' and nothing else."}],
                max_tokens=10
            )
            print(f" ✅ SUCCESS!")
            print(f" Response: '{response.choices[0].message.content}'")
            print(f"\n✅ LLM API is working correctly!")
            return
        except Exception as e:
            error_str = str(e)
            print(f" ❌ Failed: {error_str}")
            # 401/403 almost always means a key/provider mismatch.
            if "401" in error_str or "403" in error_str:
                print("\n Authentication Error - Possible causes:")
                print(" 1. API key is incorrect or has been revoked")
                print(" 2. API key has no credits remaining")
                print(" 3. Wrong API key for the selected service")
                print(" 4. Kimi Code API requires special access (only for Coding Agents)")
                print("\n To fix:")
                print(" - For regular Kimi, use: export OPENAI_BASE_URL='https://api.moonshot.cn/v1'")
                print(" - For Kimi Code: Ensure your account has Coding Agent access")
                print(" - Get a regular Kimi API key from: https://platform.moonshot.cn/")
    print("\n❌ All endpoints failed. Please check your API key and try again.")
def show_status():
    """Show current progress status for every tracked file.

    FIX: the per-file line previously printed the literal "(unknown)"
    instead of the filename, and the status-icon ternary chain produced
    only empty strings (apparently lost emoji); restored as
    ✅ completed / ❌ error / ⏳ anything else.
    """
    progress = ProgressManager()
    print(f"\nProgress Status ({PROGRESS_FILE}):")
    print("-" * 50)
    if not progress.progress:
        print("No progress recorded yet.")
        return
    for filename, data in sorted(progress.progress.items()):
        status = data.get('status', 'unknown')
        output = data.get('output_file', '-')
        error = data.get('error', '')
        if status == "completed":
            status_icon = "✅"
        elif status == "error":
            status_icon = "❌"
        else:
            status_icon = "⏳"
        print(f"{status_icon} {filename}: {status}")
        if output and status == "completed":
            print(f" Output: {output}")
        if error:
            print(f" Error: {error}")
def main():
    """CLI entry point: dispatch on the first argument, or process all episodes."""
    args = sys.argv[1:]
    if args:
        command = args[0]
        if command in ("help", "--help", "-h"):
            print_usage()
            return
        if command == "status":
            show_status()
            return
        if command == "reset":
            # Reset one file when given, otherwise everything.
            progress = ProgressManager()
            if len(args) > 1:
                target = args[1]
                progress.reset(target)
                print(f"Reset progress for: {target}")
            else:
                progress.reset()
                print("Reset all progress")
            return
        if command == "test-llm":
            test_llm_connection()
            return
        print(f"Unknown command: {command}")
        print_usage()
        return
    # Default: process all episodes
    try:
        EpisodeProcessor().process_all()
    except ValueError as e:
        # Missing configuration: explain which variables are required.
        print(f"Error: {e}")
        print("\nPlease set the required environment variables:")
        print(" export ASSEMBLYAI_API_KEY='your-key'")
        print(" export OPENAI_API_KEY='your-key'")
        print("\nOptional (for Kimi Code):")
        print(" export OPENAI_BASE_URL='https://api.kimi.com/coding/v1'")
        print(" export LLM_MODEL='kimi-for-coding'")
        print("\nFor regular Kimi, the base URL defaults to https://api.moonshot.cn/v1")
        sys.exit(1)
    except Exception as e:
        print(f"Unexpected error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
# Run the CLI only when executed as a script, not on import.
if __name__ == "__main__":
    main()