"""Celeste plugin: render a list of key presses into a GIF via ``ccleste-wrap``.

A message triggers the plugin when its plain text starts with one of
``aliases`` or when it replies to a message this plugin recorded earlier.
In the reply case the recorded action list is passed to the tool with ``-p``
and the new actions are appended to it for the next round.
"""

import asyncio
from pathlib import Path
import subprocess
import tempfile
from typing import Any

from loguru import logger
from nonebot import on_message
from pydantic import BaseModel
from nonebot.adapters import Event, Bot
from nonebot_plugin_alconna import UniMessage, UniMsg
from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent

from konabot.common.artifact import ArtifactDepends, ensure_artifact, register_artifacts
from konabot.common.data_man import DataManager
from konabot.common.path import BINARY_PATH, DATA_PATH


arti_ccleste_wrap_linux = ArtifactDepends(
    url="https://github.com/Passthem-desu/pt-ccleste-wrap/releases/download/v0.1.5/ccleste-wrap",
    sha256="ba4118c6465d1ca1547cdd1bd11c6b9e6a6a98ea8967b55485aeb6b77bb7e921",
    target=BINARY_PATH / "ccleste-wrap",
    required_os="Linux",
    required_arch="x86_64",
)

arti_ccleste_wrap_windows = ArtifactDepends(
    url="https://github.com/Passthem-desu/pt-ccleste-wrap/releases/download/v0.1.5/ccleste-wrap.exe",
    sha256="7df382486a452485cdcf2115eabd7f772339ece470ab344074dc163fc7981feb",
    target=BINARY_PATH / "ccleste-wrap.exe",
    required_os="Windows",
    required_arch="AMD64",
)

register_artifacts(arti_ccleste_wrap_linux)
register_artifacts(arti_ccleste_wrap_windows)


class CelesteStatus(BaseModel):
    """Persisted plugin state."""

    # Maps "<PLATFORM>:<bot self_id>:<message id>" to the accumulated action
    # list that produced that message.
    records: dict[str, str] = {}


celeste_status = DataManager(CelesteStatus, DATA_PATH / "celeste-status.json")


# The Any annotation lets this set fit a parameter typed
# set[str | tuple[str, ...]] (see the on_command variant kept below).
aliases: set[Any] = {"celeste", "蔚蓝", "爬山", "鳌太线"}
ALLOW_CHARS = "wasdxc0123456789 \t\n\r"

# Upper bound, in seconds, for a single ccleste-wrap invocation.
_RENDER_TIMEOUT = 5


async def get_prev(evt: Event, bot: Bot) -> str | None:
    """Return the recorded action list of the message ``evt`` replies to.

    Only OneBot v11 message events are inspected. The replied-to message id
    is taken from ``evt.reply`` when it is set, otherwise from a ``reply``
    segment in the message itself.

    Returns:
        The stored action list, or ``None`` when the event is not a reply or
        the replied-to message has no record.
    """
    key: str | None = None
    if isinstance(evt, OB11MessageEvent):
        if evt.reply is not None:
            key = f"QQ:{bot.self_id}:{evt.reply.message_id}"
        else:
            for seg in evt.get_message():
                if seg.type != "reply":
                    continue
                # The id lives in the segment payload. NOTE(review): as far
                # as I can tell MessageSegment.get() reads the segment's own
                # fields rather than the payload, so seg.get("id") was always
                # None here -- confirm against the nonebot API.
                msgid = seg.data.get("id")
                if msgid is not None:
                    key = f"QQ:{bot.self_id}:{msgid}"
    if key is None:
        return None
    async with celeste_status.get_data() as data:
        return data.records.get(key)


async def match_celeste(evt: Event, bot: Bot, msg: UniMsg) -> bool:
    """Rule: accept messages that start with an alias or reply to a record."""
    text = msg.extract_plain_text().strip()
    # Cheap prefix test first; the record lookup is only needed otherwise.
    if any(text.startswith(a) for a in aliases):
        return True
    return await get_prev(evt, bot) is not None


# cmd = on_command(cmd="celeste", aliases=aliases)
cmd = on_message(rule=match_celeste)


def _find_binary() -> Path | None:
    """Return the ccleste-wrap binary for this platform if it exists on disk."""
    for arti in (arti_ccleste_wrap_linux, arti_ccleste_wrap_windows):
        # Both conditions must hold; a platform match whose file is missing
        # must not be returned.
        if arti.is_corresponding_platform() and arti.target.exists():
            return arti.target
    return None


def _render_gif(
    binary: Path, actions: str, prev: str | None
) -> "tuple[subprocess.CompletedProcess[bytes], bytes | None]":
    """Run ccleste-wrap synchronously and return its result and the GIF bytes.

    The GIF bytes are ``None`` when the process exited non-zero or did not
    produce the output file.

    Raises:
        subprocess.TimeoutExpired: the tool ran longer than _RENDER_TIMEOUT.
    """
    with tempfile.TemporaryDirectory() as _tempdir:
        gif_path = Path(_tempdir) / "render.gif"
        cmd_celeste = [binary, "-a", actions, "-o", gif_path]
        if prev is not None:
            cmd_celeste += ["-p", prev]
        logger.info(f"执行指令调用 celeste: CMD={cmd_celeste}")
        res = subprocess.run(cmd_celeste, timeout=_RENDER_TIMEOUT, capture_output=True)
        # Read the file before the temporary directory is removed.
        if res.returncode == 0 and gif_path.exists():
            return res, gif_path.read_bytes()
        return res, None


@cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
    """Validate the action list, render it, send the GIF and record it."""
    prev = await get_prev(evt, bot)
    actions = msg.extract_plain_text().strip()
    for alias in aliases:
        actions = actions.removeprefix(alias)
    actions = actions.strip()
    if not actions:
        return
    # Silently ignore anything that is not a pure key-press list.
    if any(c not in ALLOW_CHARS for c in actions):
        return

    await ensure_artifact(arti_ccleste_wrap_linux)
    await ensure_artifact(arti_ccleste_wrap_windows)

    binary = _find_binary()
    if binary is None:
        logger.warning("Celeste 模块没有找到该系统需要的二进制文件")
        return

    try:
        # Run in a worker thread so the blocking subprocess call does not
        # stall the event loop for up to _RENDER_TIMEOUT seconds.
        res, gif_data = await asyncio.to_thread(_render_gif, binary, actions, prev)
    except subprocess.TimeoutExpired:
        # subprocess.run signals a timeout with TimeoutExpired, which is not
        # a subclass of the builtin TimeoutError.
        logger.warning("在渲染 Celeste 时超时了")
        await UniMessage("渲染 Celeste 时超时了!请检查你的操作清单,不能太长").send(evt, bot, at_sender=True)
        return

    if res.returncode != 0:
        logger.warning(f"渲染 Celeste 时的输出不是 0 CODE={res.returncode} STDOUT={res.stdout} STDERR={res.stderr}")
        # errors="replace": tool output is not guaranteed to be valid UTF-8.
        out_text = res.stdout.decode(errors="replace")
        err_text = res.stderr.decode(errors="replace")
        await UniMessage.text(f"渲染 Celeste 时出错啦!下面是输出:\n\n{out_text}{err_text}").send(evt, bot, at_sender=True)
        return
    if gif_data is None:
        logger.warning("没有找到 Celeste 渲染的文件")
        await UniMessage.text("渲染 Celeste 时出错啦!").send(evt, bot, at_sender=True)
        return

    receipt = await UniMessage.image(raw=gif_data).send(evt, bot)
    async with celeste_status.get_data() as data:
        # Store the full history so a reply to the new message continues it.
        if prev:
            actions = prev + "\n" + actions
        if isinstance(evt, OB11MessageEvent):
            for _msgid in receipt.msg_ids:
                msgid = _msgid["message_id"]
                data.records[f"QQ:{bot.self_id}:{msgid}"] = actions
        else:
            # NOTE(review): get_prev only looks up "QQ:" keys, so these
            # records are written but never read back in this file.
            for msgid in receipt.msg_ids:
                data.records[f"DISCORD:{bot.self_id}:{msgid}"] = actions