207 lines
6.4 KiB
Python
207 lines
6.4 KiB
Python
import asyncio
|
||
import os
|
||
import subprocess
|
||
|
||
from pathlib import Path
|
||
from tempfile import TemporaryDirectory
|
||
from typing import cast
|
||
import zipfile
|
||
|
||
from loguru import logger
|
||
from nonebot import on_command
|
||
from nonebot.adapters import Event, Bot
|
||
from nonebot_plugin_alconna import UniMessage, UniMsg
|
||
from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent
|
||
from nonebot.adapters.onebot.v11.bot import Bot as OB11Bot
|
||
from nonebot.adapters.onebot.v11.message import Message as OB11Message
|
||
|
||
from konabot.common.artifact import ArtifactDepends, ensure_artifact, register_artifacts
|
||
from konabot.common.longtask import DepLongTaskTarget
|
||
from konabot.common.path import BINARY_PATH, TMP_PATH
|
||
|
||
|
||
# Typst v0.14.2 release archives, one per supported platform. Each archive is
# stored under BINARY_PATH and unpacked by the matching `on_finished` callback.
# NOTE(review): `sha256` is presumably checked against the downloaded archive
# by the artifact machinery -- confirm in konabot.common.artifact.
arti_typst_linux = ArtifactDepends(
    required_os="Linux",
    required_arch="x86_64",
    target=BINARY_PATH / "typst.tar.xz",
    url="https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-unknown-linux-musl.tar.xz",
    sha256="a6044cbad2a954deb921167e257e120ac0a16b20339ec01121194ff9d394996d",
)
arti_typst_windows = ArtifactDepends(
    required_os="Windows",
    required_arch="AMD64",
    target=BINARY_PATH / "typst.zip",
    url="https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-pc-windows-msvc.zip",
    sha256="51353994ac83218c3497052e89b2c432c53b9d4439cdc1b361e2ea4798ebfc13",
)


# Location of the unpacked Typst executable. Stays None until one of the
# `on_finished` callbacks in this module has run and assigned it.
bin_path: Path | None = None
|
||
|
||
|
||
@arti_typst_linux.on_finished
async def _(downloaded: bool):
    """Unpack the Linux Typst binary once its archive is available.

    Points the module-level ``bin_path`` at ``BINARY_PATH / "typst"``. The
    archive is (re-)extracted with ``tar`` when it was freshly downloaded or
    when the binary is missing; otherwise the existing binary is reused.

    Args:
        downloaded: Whether the archive was downloaded just now.
    """
    global bin_path

    logger.debug("安装好了 Linux 版本的 Typst")

    archive = arti_typst_linux.target
    bin_path = BINARY_PATH / "typst"

    # Nothing new was downloaded and the binary is already there: keep it.
    if not downloaded and bin_path.exists():
        return

    bin_path.unlink(missing_ok=True)

    # Extract only the executable; --strip-components=1 drops the archive's
    # top-level directory so the file lands directly in BINARY_PATH.
    tar_argv = (
        "tar",
        "-xvf",
        archive,
        "--strip-components=1",
        "-C",
        BINARY_PATH,
        "typst-x86_64-unknown-linux-musl/typst",
    )
    proc = await asyncio.create_subprocess_exec(
        *tar_argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()

    if proc.returncode == 0 and bin_path.exists():
        # Make sure the extracted file is executable.
        os.chmod(bin_path, 0o755)
        return

    logger.warning(
        "似乎没有成功解压 Typst 二进制文件,检查一下吧! "
        f"stdout={stdout} stderr={stderr}"
    )
|
||
|
||
|
||
@arti_typst_windows.on_finished
async def _(downloaded: bool):
    """Unpack the Windows Typst binary once its archive is available.

    Points the module-level ``bin_path`` at ``BINARY_PATH / "typst.exe"``.
    The executable is (re-)extracted from the zip when the archive was freshly
    downloaded or when the executable is missing.

    Args:
        downloaded: Whether the archive was downloaded just now.
    """
    global bin_path

    logger.debug("安装好了 Windows 版本的 Typst")

    archive = arti_typst_windows.target
    bin_path = BINARY_PATH / "typst.exe"

    # Nothing new was downloaded and the executable is already there: keep it.
    if not downloaded and bin_path.exists():
        return

    bin_path.unlink(missing_ok=True)

    member = "typst-x86_64-pc-windows-msvc/typst.exe"
    with zipfile.ZipFile(archive, "r") as zf:
        if member not in zf.namelist():
            logger.warning("在 Zip 压缩包里面没有找到目标文件")
            return
        zf.extract(member, BINARY_PATH)

    # The member is extracted under its archive directory; move it next to
    # the other binaries and remove the now-empty directory.
    extracted = BINARY_PATH / member
    extracted.rename(bin_path)
    extracted.parent.rmdir()
|
||
|
||
|
||
# Register both platform archives with the artifact system.
# NOTE(review): presumably only the artifact whose required_os/required_arch
# matches the host is actually fetched -- confirm in konabot.common.artifact.
register_artifacts(arti_typst_linux)
register_artifacts(arti_typst_windows)


# Typst prelude that is prepended to every user snippet before compiling.
TEMPLATE_PATH = Path(__file__).parent / "template.typ"
# Decode explicitly as UTF-8: without it Python falls back to the locale
# encoding, which is not UTF-8 on a typical Windows host and would corrupt
# (or fail to read) any non-ASCII text in the template.
TEMPLATE = TEMPLATE_PATH.read_text(encoding="utf-8")
|
||
|
||
|
||
def render_sync(code: str) -> bytes | None:
|
||
global bin_path
|
||
|
||
if bin_path is None:
|
||
return
|
||
|
||
with TemporaryDirectory(dir=TMP_PATH) as tmpdirname:
|
||
temp_dir = Path(tmpdirname).resolve()
|
||
temp_typ = temp_dir / "page.typ"
|
||
temp_typ.write_text(TEMPLATE + "\n\n" + code)
|
||
|
||
cmd = [
|
||
bin_path,
|
||
"compile",
|
||
temp_typ.name,
|
||
"--format",
|
||
"png",
|
||
"--root",
|
||
temp_dir,
|
||
"--ppi",
|
||
"300",
|
||
]
|
||
|
||
result = subprocess.run(
|
||
cmd, capture_output=True, text=True, cwd=temp_dir.resolve(), timeout=50
|
||
)
|
||
logger.info(
|
||
f"渲染了 Typst "
|
||
f"STDOUT={result.stdout} "
|
||
f"STDERR={result.stderr} "
|
||
f"RETURNCODE={result.returncode}"
|
||
)
|
||
|
||
if result.returncode != 0:
|
||
raise subprocess.CalledProcessError(
|
||
result.returncode, cmd, result.stdout, result.stderr
|
||
)
|
||
|
||
result_png = temp_dir / "page.png"
|
||
if not result_png.exists():
|
||
raise FileNotFoundError("Typst 没有输出图片文件")
|
||
|
||
return result_png.read_bytes()
|
||
|
||
|
||
async def render(code: str) -> bytes | None:
    """Render Typst source to PNG bytes without blocking the event loop.

    Runs ``render_sync(code)`` in a worker thread via ``asyncio.to_thread``
    and returns its result; any exception it raises propagates to the caller.
    """
    return await asyncio.to_thread(render_sync, code)
|
||
|
||
|
||
cmd = on_command("typst")


@cmd.handle()
async def _(
    evt: Event,
    bot: Bot,
    msg: UniMsg,
    target: DepLongTaskTarget,
):
    """Handle the ``typst`` command: render Typst source and reply with a PNG.

    The source is the plain text of the replied-to message (OneBot v11 only),
    followed by the plain text of the command message itself with a leading
    ``typst`` removed. Nothing is sent when no source text is found or when
    no Typst executable is available.

    Args:
        evt: Incoming event; reply lookup is only done for OneBot v11.
        bot: Bot that received the event, used to fetch a replied message.
        msg: The command message.
        target: Where rendering results and error notices are sent.

    Raises:
        FileNotFoundError, PermissionError: Internal rendering failures are
            reported to the user and then re-raised.
    """
    # Local machines usually do not download artifacts at application
    # startup, so make sure they are present here as well.
    await ensure_artifact(arti_typst_linux)
    await ensure_artifact(arti_typst_windows)

    if bin_path is None or not bin_path.exists():
        logger.warning("当前环境不存在 Typst,但仍然调用了")
        return

    typst_code = ""
    if isinstance(evt, OB11MessageEvent):
        if evt.reply is not None:
            typst_code = evt.reply.message.extract_plain_text()
        else:
            # Fallback: look for a reply segment that is still in the message.
            for seg in evt.get_message():
                if seg.type != "reply":
                    continue
                # The replied message id lives in the segment payload
                # (`seg.data`), not on the segment object itself.
                msgid = seg.data.get("id")
                if msgid is None:
                    continue
                msg2data = await cast(OB11Bot, bot).get_msg(message_id=msgid)
                typst_code = OB11Message(
                    msg2data.get("message")
                ).extract_plain_text()

    typst_code += msg.extract_plain_text().removeprefix("typst").strip()

    if not typst_code:
        return

    try:
        res = await render(typst_code)
        if res is None:
            raise FileNotFoundError("没有渲染出来内容")
    except (FileNotFoundError, PermissionError):
        await target.send_message("渲染出错:内部错误")
        # Bare raise keeps the original traceback and avoids making the
        # exception its own cause.
        raise
    except subprocess.CalledProcessError as e:
        await target.send_message(
            "渲染出错,以下是输出消息:\n\n" + (e.stderr or "")
        )
        return
    except (TimeoutError, subprocess.TimeoutExpired):
        # subprocess.run(timeout=...) raises subprocess.TimeoutExpired, which
        # is not a TimeoutError subclass, so it must be caught explicitly.
        await target.send_message("渲染出错:渲染超时")
        return

    await target.send_message(UniMessage.image(raw=res), at=False)
|