Files
konabot/konabot/plugins/typst/__init__.py
passthem 1eb7e62cfe
All checks were successful
continuous-integration/drone/push Build is passing
添加 Typst 支持
2026-01-18 12:25:17 +08:00

98 lines
3.1 KiB
Python

import asyncio
import subprocess
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import cast
from loguru import logger
from nonebot import on_command
from nonebot.adapters import Event, Bot
from nonebot_plugin_alconna import UniMessage, UniMsg
from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent
from nonebot.adapters.onebot.v11.bot import Bot as OB11Bot
from nonebot.adapters.onebot.v11.message import Message as OB11Message
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import TMP_PATH
TEMPLATE_PATH = Path(__file__).parent / "template.typ"
TEMPLATE = TEMPLATE_PATH.read_text()
def render_sync(code: str) -> bytes:
with TemporaryDirectory(dir=TMP_PATH) as tmpdirname:
temp_dir = Path(tmpdirname).resolve()
temp_typ = temp_dir / "page.typ"
temp_typ.write_text(TEMPLATE + "\n\n" + code)
cmd = ["typst", "compile", temp_typ.name, "--format", "png", "--root", temp_dir]
result = subprocess.run(
cmd, capture_output=True, text=True, cwd=temp_dir.resolve(), timeout=50
)
logger.info(
f"渲染了 Typst "
f"STDOUT={result.stdout} "
f"STDERR={result.stderr} "
f"RETURNCODE={result.returncode}"
)
if result.returncode != 0:
raise subprocess.CalledProcessError(
result.returncode, cmd, result.stdout, result.stderr
)
result_png = temp_dir / "page.png"
if not result_png.exists():
raise FileNotFoundError("Typst 没有输出图片文件")
return result_png.read_bytes()
async def render(code: str) -> bytes:
task = asyncio.to_thread(lambda: render_sync(code))
return await task
cmd = on_command("typst")
@cmd.handle()
async def _(evt: Event, bot: Bot, msg: UniMsg, target: DepLongTaskTarget):
typst_code = ""
if isinstance(evt, OB11MessageEvent):
if evt.reply is not None:
typst_code = evt.reply.message.extract_plain_text()
else:
for seg in evt.get_message():
if seg.type == 'reply':
msgid = seg.get('id')
if msgid is not None:
msg2data = await cast(OB11Bot, bot).get_msg(message_id=msgid)
typst_code = OB11Message(msg2data.get("message")).extract_plain_text()
typst_code += msg.extract_plain_text().removeprefix("typst").strip()
if len(typst_code) == 0:
return
try:
res = await render(typst_code)
except FileNotFoundError as e:
await target.send_message("渲染出错:内部错误")
raise e from e
except subprocess.CalledProcessError as e:
await target.send_message("渲染出错,以下是输出消息:\n\n" + e.stderr)
return
except TimeoutError:
await target.send_message("渲染出错:渲染超时")
return
except PermissionError as e:
await target.send_message("渲染出错:内部错误")
raise e from e
await target.send_message(UniMessage.image(raw=res), at=False)