Files
malabar/step4_translate.py
2026-03-05 17:47:54 +08:00

368 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Step 4: Translate transcripts to Chinese.
Input: Speaker files in "_speakers/" folder
Output: JSON files with bilingual content in "_translated/" folder
Features:
- Caches translations in "_translate_cache/" to avoid re-translating
- Reuses cached translations when available
Output format:
[
{
"timestamp": "[00:00]",
"speaker": "Malabar",
"english": "To the moon and back.",
"chinese": "到月球再回来。"
},
...
]
Usage:
uv run step4_translate.py
Environment Variables:
OPENAI_API_KEY - Required
OPENAI_BASE_URL - Optional (for Kimi/GLM APIs)
LLM_MODEL - Optional (e.g., "glm-4.5-air")
"""
import os
import re
import sys
import json
import hashlib
from pathlib import Path
from typing import List, Dict, Tuple, Optional
from openai import OpenAI
# ============== Configuration ==============
INPUT_DIR = Path("_speakers")         # step-3 output: one .txt transcript per episode
OUTPUT_DIR = Path("_translated")      # bilingual JSON written here
CACHE_DIR = Path("_translate_cache")  # one small JSON file per translated sentence
BATCH_SIZE = 5  # Translate 5 lines at a time for efficiency
# Default configurations for different providers.
# Only "model" is consulted when OPENAI_BASE_URL is set (see get_llm_config);
# the "base_url" entries document each provider's endpoint.
DEFAULT_CONFIGS = {
    "openai": {
        "base_url": None,  # None -> OpenAI SDK default endpoint
        "model": "gpt-4o-mini"
    },
    "moonshot": {
        "base_url": "https://api.moonshot.cn/v1",
        "model": "kimi-latest"
    },
    "bigmodel": {  # Zhipu AI (GLM)
        "base_url": "https://open.bigmodel.cn/api/paas/v4",
        "model": "glm-4.5-air"
    }
}
def ensure_dirs():
    """Create the output and cache directories if they do not exist."""
    for directory in (OUTPUT_DIR, CACHE_DIR):
        directory.mkdir(exist_ok=True)
def get_llm_config() -> Tuple[Optional[str], str]:
    """Resolve the LLM endpoint and model from environment variables.

    Resolution order:
      1. OPENAI_API_KEY must be set (only validated here, read again in main).
      2. If OPENAI_BASE_URL is unset, use the default OpenAI endpoint.
      3. An explicit LLM_MODEL always wins; otherwise the model is inferred
         from the endpoint host (bigmodel / moonshot / kimi), falling back
         to the OpenAI default.

    Returns:
        (base_url, model) — base_url is None when the SDK's default
        OpenAI endpoint should be used (hence Optional in the annotation;
        the original `Tuple[str, str]` annotation was wrong for that path).

    Raises:
        ValueError: if OPENAI_API_KEY is not set.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OPENAI_API_KEY environment variable is required")
    base_url = os.getenv("OPENAI_BASE_URL")
    model = os.getenv("LLM_MODEL")
    if not base_url:
        return None, model or DEFAULT_CONFIGS["openai"]["model"]
    if model:
        # Explicit model override takes precedence over endpoint inference.
        return base_url, model
    # Infer a sensible default model from the endpoint host.
    if "bigmodel" in base_url:
        return base_url, DEFAULT_CONFIGS["bigmodel"]["model"]
    if "moonshot" in base_url or "kimi" in base_url:
        return base_url, DEFAULT_CONFIGS["moonshot"]["model"]
    return base_url, DEFAULT_CONFIGS["openai"]["model"]
def get_cache_key(english: str) -> str:
    """Derive a short, stable cache key for an English sentence.

    The sentence is stripped of surrounding whitespace before hashing so
    that cosmetic variations map to the same key. MD5 is used purely as a
    fast, stable fingerprint — not for security.
    """
    normalized = english.strip().encode('utf-8')
    return hashlib.md5(normalized).hexdigest()[:16]
def get_cached_translation(english: str) -> Optional[str]:
    """Return the cached Chinese translation for *english*, or None.

    A cache hit requires both the key file to exist and the stored English
    text to match exactly — this guards against (truncated-)hash collisions.
    Any read or parse error is treated as a cache miss.
    """
    cache_file = CACHE_DIR / f"{get_cache_key(english)}.json"
    if not cache_file.exists():
        return None
    try:
        with open(cache_file, 'r', encoding='utf-8') as handle:
            entry = json.load(handle)
    except Exception:
        return None
    if entry.get("english") != english.strip():
        return None
    return entry.get("chinese")
def save_translation_cache(english: str, chinese: str):
    """Persist one english→chinese pair to the on-disk cache.

    Empty and failed translations are never cached, so they will be
    retried on the next run. Write errors are reported but non-fatal.
    """
    if not chinese or chinese == "[Translation failed]":
        return
    cache_file = CACHE_DIR / f"{get_cache_key(english)}.json"
    payload = {
        "english": english.strip(),
        "chinese": chinese.strip()
    }
    try:
        with open(cache_file, 'w', encoding='utf-8') as handle:
            json.dump(payload, handle, ensure_ascii=False)
    except Exception as e:
        print(f" Warning: Failed to save cache: {e}")
def parse_line(line: str) -> Optional[Dict[str, str]]:
    """Split a "[MM:SS](Speaker) text" line into its three fields.

    Returns a dict with "timestamp", "speaker" and "english" keys, or
    None when the line does not match the expected format.
    """
    match = re.match(r'^(\[\d{2}:\d{2}\])\(([^)]+)\) (.+)$', line.strip())
    if not match:
        return None
    timestamp, speaker, english = match.groups()
    return {
        "timestamp": timestamp,
        "speaker": speaker,
        "english": english
    }
def translate_batch(lines: List[Dict[str, str]], client: OpenAI, model: str) -> List[str]:
    """Translate a batch of parsed lines to Chinese via one chat completion.

    Args:
        lines: parsed line dicts; only the "english" field is read.
        client: an OpenAI-compatible client.
        model: model name to request.

    Returns:
        One translation string per input line, in order. A line whose
        translation could not be extracted (or the whole batch, if the
        API call raised) is filled with the sentinel "[Translation failed]".
    """
    # Build the prompt with numbered lines so the reply can be matched
    # back to inputs positionally ("1. ...", "2. ...").
    lines_text = "\n".join([f"{i+1}. {line['english']}" for i, line in enumerate(lines)])
    prompt = f"""Translate these lines from "Little Malabar" to natural Chinese.
Maintain the meaning and keep it child-friendly.
Lines to translate:
{lines_text}
Reply with ONLY the Chinese translations, one per line, numbered the same way:
1. [translation]
2. [translation]
..."""
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_tokens=1000,
            # Disable thinking/reasoning output on providers that support this
            # extra field (e.g. GLM); presumably ignored elsewhere — TODO confirm
            # it is not rejected by strict OpenAI-compatible endpoints.
            extra_body={"thinking": {"type": "disabled"}}
        )
        message = response.choices[0].message
        # content can be None on some providers; treat that as an empty reply.
        result = message.content.strip() if message.content else ""
        # Parse the numbered responses back into positional order.
        translations = []
        result_lines = result.strip().split('\n')
        for i in range(len(lines)):
            expected_num = i + 1
            translation = ""
            # Find the reply line starting with "N. ".
            for line in result_lines:
                line = line.strip()
                match = re.match(rf'^{expected_num}\.\s*(.+)$', line)
                if match:
                    translation = match.group(1).strip()
                    break
            if not translation:
                # Fallback: take the first non-numbered, non-empty reply line.
                # NOTE(review): this same line is reused for every unmatched
                # input in the batch — acceptable as a last resort, but it can
                # duplicate one translation across several lines.
                for line in result_lines:
                    if line.strip() and not line.strip()[0].isdigit():
                        translation = line.strip()
                        break
            translations.append(translation or "[Translation failed]")
        return translations
    except Exception as e:
        # Fail the whole batch; callers skip caching the sentinel value.
        print(f" Batch translation error: {e}")
        return ["[Translation failed]"] * len(lines)
def translate_file(input_path: Path, client: OpenAI, model: str) -> Path:
    """Translate a single speaker file to Chinese and write bilingual JSON.

    Pipeline per file: parse every "[MM:SS](Speaker) text" line, fill
    translations from the cache where possible, translate the remainder in
    BATCH_SIZE chunks, then dump the combined list to OUTPUT_DIR.

    Args:
        input_path: a .txt file from INPUT_DIR.
        client: OpenAI-compatible client used for uncached lines.
        model: model name passed through to translate_batch.

    Returns:
        Path of the JSON file written to OUTPUT_DIR.
    """
    print(f"\n{'='*50}")
    print(f"Translating: {input_path.name}")
    print(f"{'='*50}")
    # Read the file
    with open(input_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    print(f" Total lines: {len(lines)}")
    # Parse all lines first; unparseable lines are warned about and dropped.
    parsed_lines = []
    for i, line in enumerate(lines, 1):
        line = line.strip()
        if not line:
            continue
        line_data = parse_line(line)
        if line_data:
            parsed_lines.append(line_data)
        else:
            print(f" Warning: Could not parse line {i}: {line[:50]}...")
    print(f" Parsed {len(parsed_lines)} valid lines")
    # Check cache for each line and partition into cached / skipped / pending.
    cached_count = 0
    skipped_count = 0
    to_translate = []
    for line_data in parsed_lines:
        english = line_data["english"]
        speaker = line_data["speaker"]
        # Skip translation for opening song (speaker is "Song")
        if speaker == "Song":
            line_data["chinese"] = english  # Keep English for song lyrics
            skipped_count += 1
            continue
        # Skip dummy "THE END" line (added by step 2 for timing purposes)
        if english.strip() == "THE END":
            line_data["chinese"] = english  # Keep as-is
            skipped_count += 1
            continue
        cached = get_cached_translation(english)
        if cached:
            line_data["chinese"] = cached
            cached_count += 1
        else:
            to_translate.append(line_data)
    print(f" Cached translations: {cached_count}")
    # NOTE(review): the "(Song)" label undercounts its own meaning — this
    # counter also includes skipped "THE END" lines (see loop above).
    print(f" Skipped (Song): {skipped_count}")
    print(f" To translate: {len(to_translate)}")
    # Translate in batches; ceil-divide to include the final partial batch.
    total_batches = (len(to_translate) + BATCH_SIZE - 1) // BATCH_SIZE
    translated_count = 0
    for batch_idx in range(total_batches):
        start = batch_idx * BATCH_SIZE
        end = min(start + BATCH_SIZE, len(to_translate))
        batch = to_translate[start:end]
        print(f" Translating batch {batch_idx + 1}/{total_batches} (lines {start+1}-{end})...")
        translations = translate_batch(batch, client, model)
        for i, line_data in enumerate(batch):
            # Defensive: translate_batch should return one entry per input.
            translation = translations[i] if i < len(translations) else "[Translation failed]"
            line_data["chinese"] = translation
            translated_count += 1
            # Save to cache (save_translation_cache also rejects the sentinel).
            if translation and translation != "[Translation failed]":
                save_translation_cache(line_data["english"], translation)
    # Combine results (cached + translated + skipped) — the loops above
    # mutated parsed_lines in place, so it already holds everything.
    result = parsed_lines
    print(f" Translation complete: {cached_count} from cache, {translated_count} new, {skipped_count} skipped (Song)")
    # Save JSON output; e.g. "ep1_speakers.txt" -> "ep1_translated.json".
    output_filename = input_path.stem.replace("_speakers", "") + "_translated.json"
    output_path = OUTPUT_DIR / output_filename
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2)
    print(f" Saved to: {output_path}")
    return output_path
def get_input_files() -> list[Path]:
    """Return all .txt files in INPUT_DIR, sorted; [] if the folder is absent."""
    if not INPUT_DIR.exists():
        return []
    return sorted(
        entry for entry in INPUT_DIR.iterdir()
        if entry.is_file() and entry.suffix == '.txt'
    )
def main():
    """Entry point: translate every speaker file and print a summary.

    Exits with status 1 when no input files are found or when any file
    fails to translate.
    """
    ensure_dirs()
    # Get LLM config (raises ValueError if OPENAI_API_KEY is missing).
    base_url, model = get_llm_config()
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), base_url=base_url)
    # Count existing cache entries (informational only).
    cache_count = len(list(CACHE_DIR.glob("*.json")))
    print(f"Using model: {model}")
    print(f"Endpoint: {base_url or 'OpenAI default'}")
    print(f"Batch size: {BATCH_SIZE} lines per request")
    print(f"Cache entries: {cache_count}")
    # Discover input files
    input_files = get_input_files()
    if not input_files:
        print(f"No .txt files found in {INPUT_DIR}/")
        sys.exit(1)
    print(f"Found {len(input_files)} file(s) to translate")
    print("")
    # Process all files; a failure on one file does not stop the rest.
    success_count = 0
    fail_count = 0
    for input_path in input_files:
        try:
            output_path = translate_file(input_path, client, model)
            if output_path:
                success_count += 1
        except Exception as e:
            print(f"\n❌ Failed to translate {input_path.name}: {e}")
            import traceback
            traceback.print_exc()
            fail_count += 1
    # Summary (re-count the cache to show entries added during this run).
    final_cache_count = len(list(CACHE_DIR.glob("*.json")))
    print("\n" + "="*50)
    print(f"Step 4 Complete: {success_count} succeeded, {fail_count} failed")
    print(f"Total cache entries: {final_cache_count}")
    print(f"Output saved to: {OUTPUT_DIR}/")
    print("="*50)
    if fail_count > 0:
        sys.exit(1)
# Guard script entry so importing this module has no side effects.
if __name__ == "__main__":
    main()