#!/usr/bin/env python3
"""
Step 4: Translate transcripts to Chinese.

Input:  Speaker files in "_speakers/" folder
Output: JSON files with bilingual content in "_translated/" folder

Features:
- Caches translations in "_translate_cache/" to avoid re-translating
- Reuses cached translations when available

Output format:
[
  {
    "timestamp": "[00:00]",
    "speaker": "Malabar",
    "english": "To the moon and back.",
    "chinese": "到月球再回来。"
  },
  ...
]

Usage:
    uv run step4_translate.py

Environment Variables:
    OPENAI_API_KEY  - Required
    OPENAI_BASE_URL - Optional (for Kimi/GLM APIs)
    LLM_MODEL       - Optional (e.g., "glm-4.5-air")
"""

import os
import re
import sys
import json
import hashlib
from pathlib import Path
from typing import List, Dict, Tuple, Optional

from openai import OpenAI

# ============== Configuration ==============
INPUT_DIR = Path("_speakers")
OUTPUT_DIR = Path("_translated")
CACHE_DIR = Path("_translate_cache")
BATCH_SIZE = 5  # Translate 5 lines at a time for efficiency

# Default configurations for different providers
DEFAULT_CONFIGS = {
    "openai": {
        "base_url": None,
        "model": "gpt-4o-mini"
    },
    "moonshot": {
        "base_url": "https://api.moonshot.cn/v1",
        "model": "kimi-latest"
    },
    "bigmodel": {  # Zhipu AI (GLM)
        "base_url": "https://open.bigmodel.cn/api/paas/v4",
        "model": "glm-4.5-air"
    }
}


def ensure_dirs():
    """Ensure output directories exist."""
    OUTPUT_DIR.mkdir(exist_ok=True)
    CACHE_DIR.mkdir(exist_ok=True)


def get_llm_config() -> Tuple[Optional[str], str]:
    """Get LLM configuration from environment.

    Returns:
        (base_url, model) tuple.  ``base_url`` is None when the default
        OpenAI endpoint should be used.  When LLM_MODEL is not set, the
        model is inferred from the base URL host (bigmodel / moonshot /
        kimi) and falls back to the OpenAI default otherwise.

    Raises:
        ValueError: if OPENAI_API_KEY is not set.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OPENAI_API_KEY environment variable is required")

    base_url = os.getenv("OPENAI_BASE_URL")
    model = os.getenv("LLM_MODEL")

    if base_url:
        if model:
            return base_url, model
        # No explicit model: pick a provider-appropriate default by host.
        if "bigmodel" in base_url:
            return base_url, DEFAULT_CONFIGS["bigmodel"]["model"]
        elif "moonshot" in base_url or "kimi" in base_url:
            return base_url, DEFAULT_CONFIGS["moonshot"]["model"]
        else:
            return base_url, DEFAULT_CONFIGS["openai"]["model"]
    else:
        return None, model or DEFAULT_CONFIGS["openai"]["model"]


def get_cache_key(english: str) -> str:
    """Generate a cache key for an English sentence."""
    # Use MD5 hash for consistent cache keys (not security-sensitive).
    return hashlib.md5(english.strip().encode('utf-8')).hexdigest()[:16]


def get_cached_translation(english: str) -> Optional[str]:
    """Check if a translation exists in cache.

    Returns the cached Chinese text, or None on a cache miss.  The stored
    English text is compared against the input to guard against the
    (truncated-)hash colliding for different sentences.
    """
    cache_key = get_cache_key(english)
    cache_file = CACHE_DIR / f"{cache_key}.json"
    if cache_file.exists():
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if data.get("english") == english.strip():
                return data.get("chinese")
        except Exception:
            # Corrupt/unreadable cache entry: treat as a miss.
            pass
    return None


def save_translation_cache(english: str, chinese: str):
    """Save a translation to the cache.

    Failed/empty translations are never cached so they get retried on the
    next run.  Cache write errors are non-fatal (warning only).
    """
    if not chinese or chinese == "[Translation failed]":
        return
    cache_key = get_cache_key(english)
    cache_file = CACHE_DIR / f"{cache_key}.json"
    try:
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump({
                "english": english.strip(),
                "chinese": chinese.strip()
            }, f, ensure_ascii=False)
    except Exception as e:
        print(f" Warning: Failed to save cache: {e}")


def parse_line(line: str) -> Optional[Dict[str, str]]:
    """Parse a line from the speaker file.

    Expected format: ``[MM:SS](Speaker) text``.  Returns a dict with
    "timestamp", "speaker" and "english" keys, or None if the line does
    not match.
    """
    pattern = r'^(\[\d{2}:\d{2}\])\(([^)]+)\) (.+)$'
    match = re.match(pattern, line.strip())
    if match:
        return {
            "timestamp": match.group(1),
            "speaker": match.group(2),
            "english": match.group(3)
        }
    return None


def translate_batch(lines: List[Dict[str, str]], client: OpenAI, model: str) -> List[str]:
    """Translate a batch of lines to Chinese.

    Sends the batch as a single numbered prompt and parses the numbered
    reply.  Always returns exactly ``len(lines)`` strings; entries that
    could not be translated are "[Translation failed]".
    """
    # Build the prompt with numbered lines so replies can be re-aligned.
    lines_text = "\n".join([f"{i+1}. {line['english']}" for i, line in enumerate(lines)])

    prompt = f"""Translate these lines from "Little Malabar" to natural Chinese. Maintain the meaning and keep it child-friendly.

Lines to translate:
{lines_text}

Reply with ONLY the Chinese translations, one per line, numbered the same way:
1. [translation]
2. [translation]
..."""

    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "You translate children's show dialogue to Chinese. Reply with ONLY the translations."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_tokens=1000
        )
        message = response.choices[0].message
        result = message.content or ""

        # GLM models may put response in reasoning_content
        if not result and hasattr(message, 'reasoning_content') and message.reasoning_content:
            result = message.reasoning_content

        # Parse the numbered responses back into per-line translations.
        translations = []
        result_lines = result.strip().split('\n')
        for i in range(len(lines)):
            expected_num = i + 1
            translation = ""
            # Find line starting with "N. "
            for line in result_lines:
                line = line.strip()
                match = re.match(rf'^{expected_num}\.\s*(.+)$', line)
                if match:
                    translation = match.group(1).strip()
                    break
            if not translation:
                # Fallback: take the first non-numbered line of the reply.
                # NOTE(review): if several indices fail, this picks the SAME
                # fallback line for each of them — acceptable best-effort.
                for line in result_lines:
                    if line.strip() and not line.strip()[0].isdigit():
                        translation = line.strip()
                        break
            translations.append(translation or "[Translation failed]")
        return translations
    except Exception as e:
        print(f" Batch translation error: {e}")
        return ["[Translation failed]"] * len(lines)


def translate_file(input_path: Path, client: OpenAI, model: str) -> Path:
    """Translate a single speaker file to Chinese.

    Reads "[MM:SS](Speaker) text" lines, fills in the "chinese" field for
    each (from cache, via the LLM, or verbatim for "Song" lines), and
    writes the combined records as JSON into OUTPUT_DIR.

    Returns:
        Path of the written JSON file.
    """
    print(f"\n{'='*50}")
    print(f"Translating: {input_path.name}")
    print(f"{'='*50}")

    # Read the file
    with open(input_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    print(f" Total lines: {len(lines)}")

    # Parse all lines first
    parsed_lines = []
    for i, line in enumerate(lines, 1):
        line = line.strip()
        if not line:
            continue
        line_data = parse_line(line)
        if line_data:
            parsed_lines.append(line_data)
        else:
            print(f" Warning: Could not parse line {i}: {line[:50]}...")
    print(f" Parsed {len(parsed_lines)} valid lines")

    # Check cache for each line
    cached_count = 0
    skipped_count = 0
    to_translate = []
    for line_data in parsed_lines:
        english = line_data["english"]
        speaker = line_data["speaker"]
        # Skip translation for opening song (speaker is "Song")
        if speaker == "Song":
            line_data["chinese"] = english  # Keep English for song lyrics
            skipped_count += 1
            continue
        cached = get_cached_translation(english)
        if cached:
            line_data["chinese"] = cached
            cached_count += 1
        else:
            to_translate.append(line_data)

    print(f" Cached translations: {cached_count}")
    print(f" Skipped (Song): {skipped_count}")
    print(f" To translate: {len(to_translate)}")

    # Translate in batches (ceiling division for the batch count).
    total_batches = (len(to_translate) + BATCH_SIZE - 1) // BATCH_SIZE
    translated_count = 0
    for batch_idx in range(total_batches):
        start = batch_idx * BATCH_SIZE
        end = min(start + BATCH_SIZE, len(to_translate))
        batch = to_translate[start:end]
        print(f" Translating batch {batch_idx + 1}/{total_batches} (lines {start+1}-{end})...")
        translations = translate_batch(batch, client, model)
        for i, line_data in enumerate(batch):
            translation = translations[i] if i < len(translations) else "[Translation failed]"
            line_data["chinese"] = translation
            translated_count += 1
            # Save to cache (save_translation_cache also skips failures,
            # so this guard is belt-and-braces)
            if translation and translation != "[Translation failed]":
                save_translation_cache(line_data["english"], translation)

    # Combine results (cached + translated + skipped) — the dicts in
    # parsed_lines were mutated in place, so they already hold "chinese".
    result = parsed_lines
    print(f" Translation complete: {cached_count} from cache, {translated_count} new, {skipped_count} skipped (Song)")

    # Save JSON output
    output_filename = input_path.stem.replace("_speakers", "") + "_translated.json"
    output_path = OUTPUT_DIR / output_filename
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2)
    print(f" Saved to: {output_path}")
    return output_path


def get_input_files() -> list[Path]:
    """Discover all text files in _speakers/ folder."""
    if not INPUT_DIR.exists():
        return []
    files = [f for f in INPUT_DIR.iterdir() if f.is_file() and f.suffix == '.txt']
    return sorted(files)


def main():
    """Translate every speaker file; exit non-zero if any file failed."""
    ensure_dirs()

    # Get LLM config
    base_url, model = get_llm_config()
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), base_url=base_url)

    # Count existing cache entries
    cache_count = len(list(CACHE_DIR.glob("*.json")))
    print(f"Using model: {model}")
    print(f"Endpoint: {base_url or 'OpenAI default'}")
    print(f"Batch size: {BATCH_SIZE} lines per request")
    print(f"Cache entries: {cache_count}")

    # Discover input files
    input_files = get_input_files()
    if not input_files:
        print(f"No .txt files found in {INPUT_DIR}/")
        sys.exit(1)
    print(f"Found {len(input_files)} file(s) to translate")
    print("")

    # Process all files
    success_count = 0
    fail_count = 0
    for input_path in input_files:
        try:
            output_path = translate_file(input_path, client, model)
            if output_path:
                success_count += 1
        except Exception as e:
            print(f"\n❌ Failed to translate {input_path.name}: {e}")
            import traceback
            traceback.print_exc()
            fail_count += 1

    # Summary
    final_cache_count = len(list(CACHE_DIR.glob("*.json")))
    print("\n" + "="*50)
    print(f"Step 4 Complete: {success_count} succeeded, {fail_count} failed")
    print(f"Total cache entries: {final_cache_count}")
    print(f"Output saved to: {OUTPUT_DIR}/")
    print("="*50)
    if fail_count > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()