Files
homework-template/scripts/img2typ.py
2026-04-08 13:55:17 +08:00

344 lines
9.8 KiB
Python

#!/usr/bin/env python3
"""
img2typ.py - Convert image files to typst format using AI API.
This script scans the data directory for image files matching a pattern,
converts them to typst format using an OpenAI-compatible API, and generates
a questions.json manifest.
"""
import argparse
import asyncio
import json
import logging
import re
import sys
from dataclasses import dataclass
from pathlib import Path
import aiohttp
from common import DATA_DIR, console, load_env, load_prompt, setup_logging
IMAGE_PATTERN = re.compile(r"^(\S)\s?([\d.]+)$")
EXCLUDED_PREFIXES = {"", "A", "a"}
IMAGE_EXTENSIONS = {
".png",
".jpg",
".jpeg",
".gif",
".bmp",
".webp",
".PNG",
".JPG",
".JPEG",
".GIF",
".BMP",
".WEBP",
}
@dataclass
class ConversionResult:
"""Result of an image to typst conversion."""
question: str
target: str
skipped: bool
success: bool
error: str | None = None
def find_images() -> list[Path]:
"""Find all image files in data directory matching the pattern."""
images = []
for file_path in DATA_DIR.iterdir():
if file_path.is_file() and file_path.suffix in IMAGE_EXTENSIONS:
stem = file_path.stem
match = IMAGE_PATTERN.match(stem)
if match and match.group(1) not in EXCLUDED_PREFIXES:
images.append(file_path)
return images
def check_typ_exists(image_path: Path) -> bool:
"""Check if corresponding .typ file exists."""
return image_path.with_suffix(".typ").exists()
def parse_markdown_blocks(text: str) -> str:
"""Remove markdown code blocks from text."""
block_pattern = re.compile(r"```(?:typst)?\s*\n?(.*?)\n?```", re.DOTALL)
matches = list(block_pattern.finditer(text))
if matches:
return matches[0].group(1).strip()
return text.strip()
async def call_api(
session: aiohttp.ClientSession,
image_path: Path,
prompt: str,
endpoint: str,
api_key: str,
model: str,
logger: logging.Logger,
) -> str | None:
"""Call the AI API to convert image to typst format."""
import base64
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
with open(image_path, "rb") as f:
image_data = base64.b64encode(f.read()).decode("utf-8")
payload = {
"model": model,
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": f"data:image/{image_path.suffix[1:]};base64,{image_data}"
},
},
],
}
],
"max_tokens": 4096,
}
logger.info(f"[{image_path.stem}] Converting... (timeout 300s)")
try:
async with session.post(
endpoint,
headers=headers,
json=payload,
timeout=aiohttp.ClientTimeout(total=300),
) as response:
response.raise_for_status()
result = await response.json()
if "choices" not in result or len(result["choices"]) == 0:
logger.error(f"Invalid API response")
return None
content = result["choices"][0]["message"]["content"]
usage = result.get("usage", {})
input_tokens = usage.get("prompt_tokens", 0)
output_tokens = usage.get("completion_tokens", 0)
logger.info(
f"[{image_path.stem}] Done: {input_tokens} in, {output_tokens} out"
)
return content
except asyncio.TimeoutError:
logger.error(f"[{image_path.stem}] Timeout")
return None
except asyncio.CancelledError:
logger.warning(f"[{image_path.stem}] Cancelled")
raise
except Exception as e:
logger.error(f"[{image_path.stem}] Error: {e}")
return None
async def convert_image(
session: aiohttp.ClientSession,
image_path: Path,
prompt: str,
api_config: dict,
logger: logging.Logger,
dry_run: bool,
) -> ConversionResult:
"""Convert a single image to typst format."""
stem = image_path.stem
match = IMAGE_PATTERN.match(stem)
question_name = match.group(1) + match.group(2) if match else stem
typ_path = image_path.with_suffix(".typ")
if typ_path.exists():
logger.info(f"[{question_name}] Skipping: .typ already exists")
return ConversionResult(
question=question_name, target=typ_path.name, skipped=True, success=True
)
if dry_run:
logger.info(f"[{question_name}] Would convert -> {typ_path.name}")
return ConversionResult(
question=question_name, target=typ_path.name, skipped=False, success=True
)
content = await call_api(
session,
image_path,
prompt,
str(api_config["endpoint"]),
api_config["key"],
api_config["model"],
logger,
)
if content is None:
return ConversionResult(
question=question_name,
target=typ_path.name,
skipped=False,
success=False,
error="API call failed",
)
typst_code = parse_markdown_blocks(content)
try:
typ_path.write_text(typst_code, encoding="utf-8")
logger.info(
f"[{question_name}] Wrote {typ_path.name} ({len(typst_code)} bytes)"
)
return ConversionResult(
question=question_name, target=typ_path.name, skipped=False, success=True
)
except IOError as e:
logger.error(f"[{question_name}] Write failed: {e}")
return ConversionResult(
question=question_name,
target=typ_path.name,
skipped=False,
success=False,
error=str(e),
)
def generate_questions_json(
results: list[ConversionResult],
logger: logging.Logger,
dry_run: bool,
) -> None:
"""Generate questions.json from conversion results."""
from common import find_attachments
questions = []
for r in results:
attachments = find_attachments(r.question)
questions.append(
{
"question": r.question,
"format": "typst",
"target": r.target,
"attachments": attachments,
}
)
output_path = DATA_DIR / "questions.json"
if dry_run:
logger.info(
f"[DRY-RUN] Would write questions.json with {len(questions)} entries"
)
logger.debug(f"Content: {json.dumps(questions, indent=4, ensure_ascii=False)}")
else:
output_path.write_text(
json.dumps(questions, indent=4, ensure_ascii=False), encoding="utf-8"
)
logger.info(f"Wrote {output_path.name} ({len(questions)} entries)")
def parse_args() -> argparse.Namespace:
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="Convert image files to typst format using AI API"
)
parser.add_argument(
"-f",
"--file",
action="append",
dest="files",
help="Specific image files to process",
)
parser.add_argument(
"--dry-run", action="store_true", help="Do not call AI or write files"
)
parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
parser.add_argument(
"--retry", type=int, default=3, help="Retry attempts (default: 3)"
)
parser.add_argument("-n", type=int, default=3, help="Concurrent limit (default: 3)")
return parser.parse_args()
async def async_main(args: argparse.Namespace, logger: logging.Logger) -> None:
"""Async main entry point."""
if args.files:
image_paths = []
for f in args.files:
p = Path(f)
if not p.is_absolute():
p = DATA_DIR / f
image_paths.append(p)
else:
image_paths = find_images()
logger.info(f"Found {len(image_paths)} images to process")
if not image_paths:
logger.warning("No images found to process")
return
api_config = load_env()
prompt = load_prompt("img2typ.prompt.txt")
semaphore = asyncio.Semaphore(args.n)
async def limited_convert(
session: aiohttp.ClientSession, img_path: Path
) -> ConversionResult:
async with semaphore:
return await convert_image(
session, img_path, prompt, api_config, logger, args.dry_run
)
async with aiohttp.ClientSession() as session:
tasks = [
asyncio.create_task(limited_convert(session, img)) for img in image_paths
]
results = []
try:
for coro in asyncio.as_completed(tasks):
result = await coro
results.append(result)
except asyncio.CancelledError:
logger.warning("Cancelled! Shutting down...")
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
sys.exit(1)
results.sort(key=lambda r: r.question)
generate_questions_json(results, logger, args.dry_run)
skipped = sum(1 for r in results if r.skipped)
solved = sum(1 for r in results if r.success and not r.skipped)
logger.info(f"Complete: {solved}/{len(results)} converted, {skipped} skipped")
def main() -> None:
"""Main entry point."""
args = parse_args()
logger = setup_logging("img2typ", args.verbose)
logger.info(f"img2typ starting (Dry-run: {args.dry_run}, Workers: {args.n})")
logger.info(f"Data directory: {DATA_DIR}")
try:
asyncio.run(async_main(args, logger))
except KeyboardInterrupt:
logger.warning("Interrupted by user")
sys.exit(1)
if __name__ == "__main__":
main()