"""Typst rendering command.

Registers the Typst compiler binary as a downloadable artifact (Linux and
Windows builds), unpacks it after download, and exposes a ``typst`` command
that compiles the supplied Typst source to a PNG and sends it back.
"""

import asyncio
import os
import subprocess
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import cast
import zipfile

from loguru import logger
from nonebot import on_command
from nonebot.adapters import Event, Bot
from nonebot_plugin_alconna import UniMessage, UniMsg
from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent
from nonebot.adapters.onebot.v11.bot import Bot as OB11Bot
from nonebot.adapters.onebot.v11.message import Message as OB11Message

from konabot.common.artifact import ArtifactDepends, ensure_artifact, register_artifacts
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import BINARY_PATH, TMP_PATH


arti_typst_linux = ArtifactDepends(
    url="https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-unknown-linux-musl.tar.xz",
    sha256="a6044cbad2a954deb921167e257e120ac0a16b20339ec01121194ff9d394996d",
    target=BINARY_PATH / "typst.tar.xz",
    required_os="Linux",
    required_arch="x86_64",
)

arti_typst_windows = ArtifactDepends(
    url="https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-pc-windows-msvc.zip",
    sha256="51353994ac83218c3497052e89b2c432c53b9d4439cdc1b361e2ea4798ebfc13",
    target=BINARY_PATH / "typst.zip",
    required_os="Windows",
    required_arch="AMD64",
)

# Path of the unpacked Typst executable; stays None until one of the
# on_finished callbacks below has run.
bin_path: Path | None = None


@arti_typst_linux.on_finished
async def _(downloaded: bool) -> None:
    """Unpack the Typst binary from the Linux tarball.

    Extraction runs when the archive was freshly downloaded or when the
    binary is missing. ``bin_path`` is assigned before extraction, so it is
    set even if unpacking fails; callers must check that the file exists.
    """
    global bin_path

    tar_path = arti_typst_linux.target
    bin_path = BINARY_PATH / "typst"

    if downloaded or not bin_path.exists():
        bin_path.unlink(missing_ok=True)
        process = await asyncio.create_subprocess_exec(
            "tar",
            "-xvf",
            tar_path,
            "--strip-components=1",
            "-C",
            BINARY_PATH,
            "typst-x86_64-unknown-linux-musl/typst",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        if process.returncode != 0 or not bin_path.exists():
            logger.warning(
                "似乎没有成功解压 Typst 二进制文件,检查一下吧! "
                f"stdout={stdout} stderr={stderr}"
            )
        else:
            os.chmod(bin_path, 0o755)


@arti_typst_windows.on_finished
async def _(downloaded: bool) -> None:
    """Unpack ``typst.exe`` from the Windows zip archive.

    Extraction runs when the archive was freshly downloaded or when the
    executable is missing. The archive's top-level folder is removed after
    the executable has been moved next to the other binaries.
    """
    global bin_path

    zip_path = arti_typst_windows.target
    bin_path = BINARY_PATH / "typst.exe"

    if downloaded or not bin_path.exists():
        bin_path.unlink(missing_ok=True)
        with zipfile.ZipFile(zip_path, "r") as zf:
            target_name = "typst-x86_64-pc-windows-msvc/typst.exe"
            if target_name not in zf.namelist():
                logger.warning("在 Zip 压缩包里面没有找到目标文件")
                return
            zf.extract(target_name, BINARY_PATH)
        (BINARY_PATH / target_name).rename(bin_path)
        (BINARY_PATH / "typst-x86_64-pc-windows-msvc").rmdir()


register_artifacts(arti_typst_linux)
register_artifacts(arti_typst_windows)


TEMPLATE_PATH = Path(__file__).parent / "template.typ"
# Read explicitly as UTF-8 so the result does not depend on the platform's
# locale encoding (which is not UTF-8 on many Windows installations).
TEMPLATE = TEMPLATE_PATH.read_text(encoding="utf-8")


def render_sync(code: str) -> bytes | None:
    """Compile Typst source to a PNG image, blocking until done.

    The template is prepended to ``code``, written to a temporary directory
    and compiled with the Typst binary at 300 ppi.

    Args:
        code: Typst source appended after the template.

    Returns:
        The PNG bytes, or ``None`` when no Typst binary path is known.

    Raises:
        subprocess.CalledProcessError: Typst exited with a non-zero status.
        subprocess.TimeoutExpired: Typst ran longer than 50 seconds.
        FileNotFoundError: Typst succeeded but produced no ``page.png``.
    """
    if bin_path is None:
        return None

    with TemporaryDirectory(dir=TMP_PATH) as tmpdirname:
        temp_dir = Path(tmpdirname).resolve()
        temp_typ = temp_dir / "page.typ"
        # Write as UTF-8 regardless of locale so non-ASCII source survives.
        temp_typ.write_text(TEMPLATE + "\n\n" + code, encoding="utf-8")

        argv = [
            bin_path,
            "compile",
            temp_typ.name,
            "--format",
            "png",
            "--root",
            temp_dir,
            "--ppi",
            "300",
        ]
        result = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            # Decode compiler output as UTF-8 instead of the locale encoding,
            # and never fail on undecodable bytes.
            encoding="utf-8",
            errors="replace",
            cwd=temp_dir,
            timeout=50,
        )
        logger.info(
            f"渲染了 Typst "
            f"STDOUT={result.stdout} "
            f"STDERR={result.stderr} "
            f"RETURNCODE={result.returncode}"
        )
        if result.returncode != 0:
            raise subprocess.CalledProcessError(
                result.returncode, argv, result.stdout, result.stderr
            )

        result_png = temp_dir / "page.png"
        if not result_png.exists():
            raise FileNotFoundError("Typst 没有输出图片文件")
        return result_png.read_bytes()


async def render(code: str) -> bytes | None:
    """Run :func:`render_sync` in a worker thread and return its result."""
    return await asyncio.to_thread(render_sync, code)


cmd = on_command("typst")


async def _extract_reply_text(evt: Event, bot: Bot) -> str:
    """Return the plain text of the message being replied to, or ``""``.

    Only OneBot V11 message events are inspected. The adapter-provided
    ``evt.reply`` is preferred; otherwise any ``reply`` segment left in the
    message is resolved through the bot's ``get_msg`` call.
    """
    if not isinstance(evt, OB11MessageEvent):
        return ""
    if evt.reply is not None:
        return evt.reply.message.extract_plain_text()

    text = ""
    for seg in evt.get_message():
        if seg.type != "reply":
            continue
        # The replied-to message id is stored in the segment's data payload.
        # NOTE(review): the base MessageSegment.get appears to look up the
        # segment's own fields rather than `data` — confirm against nonebot.
        msgid = seg.data.get("id")
        if msgid is None:
            continue
        msg2data = await cast(OB11Bot, bot).get_msg(message_id=msgid)
        text = OB11Message(msg2data.get("message")).extract_plain_text()
    return text


@cmd.handle()
async def _(
    evt: Event,
    bot: Bot,
    msg: UniMsg,
    target: DepLongTaskTarget,
):
    """Handle the ``typst`` command: render the given source and reply.

    The source is the replied-to message's text (if any) followed by the
    command's own text. Compiler errors and timeouts are reported to the
    user; internal errors are reported and then re-raised.
    """
    # On local machines the artifacts are usually not downloaded automatically
    # at application startup, so make sure they exist here.
    await ensure_artifact(arti_typst_linux)
    await ensure_artifact(arti_typst_windows)
    if bin_path is None or not bin_path.exists():
        logger.warning("当前环境不存在 Typst,但仍然调用了")
        return

    typst_code = await _extract_reply_text(evt, bot)
    typst_code += msg.extract_plain_text().removeprefix("typst").strip()
    if not typst_code:
        return

    try:
        res = await render(typst_code)
        if res is None:
            raise FileNotFoundError("没有渲染出来内容")
    except (FileNotFoundError, PermissionError):
        await target.send_message("渲染出错:内部错误")
        raise
    except subprocess.CalledProcessError as e:
        await target.send_message("渲染出错,以下是输出消息:\n\n" + (e.stderr or ""))
        return
    except (TimeoutError, subprocess.TimeoutExpired):
        # subprocess.run signals its timeout with TimeoutExpired, which is
        # not a TimeoutError subclass, so both must be listed.
        await target.send_message("渲染出错:渲染超时")
        return

    await target.send_message(UniMessage.image(raw=res), at=False)