#!/usr/bin/env python3
"""
Step 4: Translate transcripts to Chinese.

Input: Speaker files in "_speakers/" folder
Output: JSON files with bilingual content in "_translated/" folder

Features:
- Caches translations in "_translate_cache/" to avoid re-translating
- Reuses cached translations when available

Output format:
[
  {
    "timestamp": "[00:00]",
    "speaker": "Malabar",
    "english": "To the moon and back.",
    "chinese": "到月球再回来。"
  },
  ...
]

Usage:
    uv run step4_translate.py

Environment Variables:
    OPENAI_API_KEY  - Required
    OPENAI_BASE_URL - Optional (for Kimi/GLM APIs)
    LLM_MODEL       - Optional (e.g., "glm-4.5-air")
"""

import hashlib
import json
import os
import re
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from openai import OpenAI

# ============== Configuration ==============

INPUT_DIR = Path("_speakers")         # speaker transcripts consumed by this step
OUTPUT_DIR = Path("_translated")      # bilingual JSON written by this step
CACHE_DIR = Path("_translate_cache")  # one small JSON file per cached sentence
BATCH_SIZE = 5  # Translate 5 lines at a time for efficiency

# Default configurations for different providers.
# A "base_url" of None means the stock OpenAI endpoint; "model" is the
# fallback used when the LLM_MODEL environment variable is not set.
DEFAULT_CONFIGS = {
    "openai": {
        "base_url": None,
        "model": "gpt-4o-mini"
    },
    "moonshot": {
        "base_url": "https://api.moonshot.cn/v1",
        "model": "kimi-latest"
    },
    "bigmodel": {  # Zhipu AI (GLM)
        "base_url": "https://open.bigmodel.cn/api/paas/v4",
        "model": "glm-4.5-air"
    }
}


def ensure_dirs():
    """Create the translated-output and cache folders if they are missing."""
    for directory in (OUTPUT_DIR, CACHE_DIR):
        directory.mkdir(exist_ok=True)


def get_llm_config() -> Tuple[Optional[str], str]:
    """Resolve the LLM endpoint and model from environment variables.

    Returns:
        (base_url, model): ``base_url`` is ``None`` when the default
        OpenAI endpoint should be used.

    Raises:
        ValueError: If OPENAI_API_KEY is not set.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OPENAI_API_KEY environment variable is required")

    base_url = os.getenv("OPENAI_BASE_URL")
    model = os.getenv("LLM_MODEL")

    if not base_url:
        # Stock OpenAI endpoint; honour an explicit model override.
        return None, model or DEFAULT_CONFIGS["openai"]["model"]

    if model:
        return base_url, model

    # No explicit model: infer a sensible default from the endpoint URL.
    if "bigmodel" in base_url:
        return base_url, DEFAULT_CONFIGS["bigmodel"]["model"]
    if "moonshot" in base_url or "kimi" in base_url:
        return base_url, DEFAULT_CONFIGS["moonshot"]["model"]
    return base_url, DEFAULT_CONFIGS["openai"]["model"]


def get_cache_key(english: str) -> str:
    """Derive a short, stable cache key for an English sentence.

    Whitespace is stripped first so that cosmetic padding does not
    produce distinct keys.
    """
    digest = hashlib.md5(english.strip().encode('utf-8'))
    # 16 hex chars keeps filenames short while staying collision-safe
    # for a corpus of this size.
    return digest.hexdigest()[:16]


def get_cached_translation(english: str) -> Optional[str]:
    """Return the cached Chinese translation for *english*, or None.

    A hit requires the stored English text to match exactly, guarding
    against collisions of the truncated hash key.
    """
    cache_file = CACHE_DIR / f"{get_cache_key(english)}.json"
    if not cache_file.exists():
        return None
    try:
        with open(cache_file, 'r', encoding='utf-8') as f:
            entry = json.load(f)
    except Exception:
        # Corrupt or unreadable cache entry — treat as a miss.
        return None
    if entry.get("english") != english.strip():
        return None
    return entry.get("chinese")


def save_translation_cache(english: str, chinese: str):
    """Persist one English→Chinese pair to the on-disk cache.

    Empty strings and the "[Translation failed]" sentinel are never
    cached, so failures get retried on the next run.
    """
    if not chinese or chinese == "[Translation failed]":
        return

    cache_file = CACHE_DIR / f"{get_cache_key(english)}.json"
    payload = {"english": english.strip(), "chinese": chinese.strip()}
    try:
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(payload, f, ensure_ascii=False)
    except Exception as e:
        # Caching is best-effort; a failed write must not stop translation.
        print(f" Warning: Failed to save cache: {e}")


def parse_line(line: str) -> Optional[Dict[str, str]]:
    """Split a transcript line of the form "[MM:SS](Speaker) text".

    Returns a dict with "timestamp", "speaker" and "english" keys, or
    None when the line does not match the expected format.
    """
    m = re.match(r'^(\[\d{2}:\d{2}\])\(([^)]+)\) (.+)$', line.strip())
    if m is None:
        return None
    timestamp, speaker, english = m.groups()
    return {
        "timestamp": timestamp,
        "speaker": speaker,
        "english": english,
    }


def _extract_translation(result_lines: List[str], expected_num: int) -> str:
    """Find the reply line numbered ``expected_num`` ("N. text") and return its text.

    Falls back to the first non-empty line that does not start with a
    digit, and finally to the "[Translation failed]" sentinel.
    """
    for raw in result_lines:
        line = raw.strip()
        match = re.match(rf'^{expected_num}\.\s*(.+)$', line)
        if match:
            return match.group(1).strip()
    # Fallback: grab any un-numbered text the model produced.
    for raw in result_lines:
        line = raw.strip()
        if line and not line[0].isdigit():
            return line
    return "[Translation failed]"


def translate_batch(lines: List[Dict[str, str]], client: OpenAI, model: str) -> List[str]:
    """Translate a batch of lines to Chinese.

    Args:
        lines: Parsed transcript records; only the "english" field is read.
        client: Configured OpenAI-compatible client.
        model: Model name to request.

    Returns:
        One Chinese string per input line, in order. Entries that could
        not be parsed (or the whole batch, on API error) are the
        "[Translation failed]" sentinel.
    """
    # Number the lines so the reply can be re-aligned positionally.
    lines_text = "\n".join([f"{i+1}. {line['english']}" for i, line in enumerate(lines)])

    prompt = f"""Translate these lines from "Little Malabar" to natural Chinese.
Maintain the meaning and keep it child-friendly.

Lines to translate:
{lines_text}

Reply with ONLY the Chinese translations, one per line, numbered the same way:
1. [translation]
2. [translation]
..."""

    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_tokens=1000,
            # NOTE(review): "thinking" is a GLM/Kimi-style extension field;
            # confirm the configured endpoint tolerates it — the stock
            # OpenAI API may reject unknown parameters.
            extra_body={"thinking": {"type": "disabled"}}  # Disable thinking
        )

        message = response.choices[0].message
        result = message.content.strip() if message.content else ""

        # Parse the numbered responses back into positional order.
        result_lines = result.strip().split('\n')
        return [_extract_translation(result_lines, i + 1) for i in range(len(lines))]

    except Exception as e:
        # Best-effort: a failed batch yields sentinels rather than aborting
        # the whole file; callers skip caching sentinel values.
        print(f" Batch translation error: {e}")
        return ["[Translation failed]"] * len(lines)


def translate_file(input_path: Path, client: OpenAI, model: str) -> Path:
    """Translate a single speaker file to Chinese.

    Reads "[MM:SS](Speaker) text" lines from *input_path*, fills in a
    "chinese" field for each record — from the on-disk cache, via the LLM
    in batches, or copied verbatim for "Song" lines — and writes the
    bilingual records as a JSON array into OUTPUT_DIR.

    Returns the path of the JSON file that was written.
    """
    print(f"\n{'='*50}")
    print(f"Translating: {input_path.name}")
    print(f"{'='*50}")

    # Read the file
    with open(input_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    print(f" Total lines: {len(lines)}")

    # Parse all lines first; unparseable lines are reported and dropped.
    parsed_lines = []
    for i, line in enumerate(lines, 1):
        line = line.strip()
        if not line:
            continue

        line_data = parse_line(line)
        if line_data:
            parsed_lines.append(line_data)
        else:
            print(f" Warning: Could not parse line {i}: {line[:50]}...")

    print(f" Parsed {len(parsed_lines)} valid lines")

    # Check cache for each line
    cached_count = 0
    skipped_count = 0
    to_translate = []

    for line_data in parsed_lines:
        english = line_data["english"]
        speaker = line_data["speaker"]

        # Skip translation for opening song (speaker is "Song")
        if speaker == "Song":
            line_data["chinese"] = english  # Keep English for song lyrics
            skipped_count += 1
            continue

        cached = get_cached_translation(english)
        if cached:
            line_data["chinese"] = cached
            cached_count += 1
        else:
            to_translate.append(line_data)

    print(f" Cached translations: {cached_count}")
    print(f" Skipped (Song): {skipped_count}")
    print(f" To translate: {len(to_translate)}")

    # Translate in batches (ceil division for the batch count).
    total_batches = (len(to_translate) + BATCH_SIZE - 1) // BATCH_SIZE
    translated_count = 0

    for batch_idx in range(total_batches):
        start = batch_idx * BATCH_SIZE
        end = min(start + BATCH_SIZE, len(to_translate))
        batch = to_translate[start:end]

        print(f" Translating batch {batch_idx + 1}/{total_batches} (lines {start+1}-{end})...")

        translations = translate_batch(batch, client, model)

        for i, line_data in enumerate(batch):
            translation = translations[i] if i < len(translations) else "[Translation failed]"
            line_data["chinese"] = translation
            translated_count += 1

            # Save to cache (sentinel failures are never cached, so they
            # get retried on the next run).
            if translation and translation != "[Translation failed]":
                save_translation_cache(line_data["english"], translation)

    # Combine results (cached + translated + skipped).
    # parsed_lines was mutated in place above, so it already holds every
    # record in original order.
    result = parsed_lines

    print(f" Translation complete: {cached_count} from cache, {translated_count} new, {skipped_count} skipped (Song)")

    # Save JSON output
    output_filename = input_path.stem.replace("_speakers", "") + "_translated.json"
    output_path = OUTPUT_DIR / output_filename

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2)

    print(f" Saved to: {output_path}")

    return output_path


def get_input_files() -> list[Path]:
    """Collect every .txt file in the _speakers/ folder, sorted by name."""
    if not INPUT_DIR.exists():
        return []
    return sorted(
        entry for entry in INPUT_DIR.iterdir()
        if entry.is_file() and entry.suffix == '.txt'
    )


def main() -> None:
    """Entry point: translate every speaker file and print a summary.

    Exits with status 1 when no input files are found or when any file
    fails to translate.
    """
    ensure_dirs()

    # Get LLM config
    base_url, model = get_llm_config()
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), base_url=base_url)

    # Count existing cache entries
    cache_count = len(list(CACHE_DIR.glob("*.json")))

    print(f"Using model: {model}")
    print(f"Endpoint: {base_url or 'OpenAI default'}")
    print(f"Batch size: {BATCH_SIZE} lines per request")
    print(f"Cache entries: {cache_count}")

    # Discover input files
    input_files = get_input_files()

    if not input_files:
        print(f"No .txt files found in {INPUT_DIR}/")
        sys.exit(1)

    print(f"Found {len(input_files)} file(s) to translate")
    print("")

    # Process all files; keep going on per-file failures and report
    # them in the summary instead of aborting the run.
    success_count = 0
    fail_count = 0

    for input_path in input_files:
        try:
            output_path = translate_file(input_path, client, model)
            if output_path:
                success_count += 1
        except Exception as e:
            print(f"\n❌ Failed to translate {input_path.name}: {e}")
            import traceback
            traceback.print_exc()
            fail_count += 1

    # Summary
    final_cache_count = len(list(CACHE_DIR.glob("*.json")))
    print("\n" + "="*50)
    print(f"Step 4 Complete: {success_count} succeeded, {fail_count} failed")
    print(f"Total cache entries: {final_cache_count}")
    print(f"Output saved to: {OUTPUT_DIR}/")
    print("="*50)

    # Non-zero exit so shell pipelines can detect partial failure.
    if fail_count > 0:
        sys.exit(1)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()