Files
konabot/konabot/plugins/celeste_classic/__init__.py

155 lines
5.2 KiB
Python

from pathlib import Path
import subprocess
import tempfile
from typing import Any
from loguru import logger
from nonebot import on_message
from pydantic import BaseModel
from nonebot.adapters import Event, Bot
from nonebot_plugin_alconna import UniMessage, UniMsg
from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent
from konabot.common.artifact import ArtifactDepends, ensure_artifact, register_artifacts
from konabot.common.data_man import DataManager
from konabot.common.path import BINARY_PATH, DATA_PATH
arti_ccleste_wrap_linux = ArtifactDepends(
url="https://github.com/Passthem-desu/pt-ccleste-wrap/releases/download/v0.1.5/ccleste-wrap",
sha256="ba4118c6465d1ca1547cdd1bd11c6b9e6a6a98ea8967b55485aeb6b77bb7e921",
target=BINARY_PATH / "ccleste-wrap",
required_os="Linux",
required_arch="x86_64",
)
arti_ccleste_wrap_windows = ArtifactDepends(
url="https://github.com/Passthem-desu/pt-ccleste-wrap/releases/download/v0.1.5/ccleste-wrap.exe",
sha256="7df382486a452485cdcf2115eabd7f772339ece470ab344074dc163fc7981feb",
target=BINARY_PATH / "ccleste-wrap.exe",
required_os="Windows",
required_arch="AMD64",
)
register_artifacts(arti_ccleste_wrap_linux)
register_artifacts(arti_ccleste_wrap_windows)
class CelesteStatus(BaseModel):
records: dict[str, str] = {}
celeste_status = DataManager(CelesteStatus, DATA_PATH / "celeste-status.json")
# ↓ 这里的 Type Hinting 是为了能 fit 进去 set[str | tuple[str, ...]]
aliases: set[Any] = {"celeste", "蔚蓝", "爬山", "鳌太线"}
ALLOW_CHARS = "wasdxc0123456789 \t\n\r"
async def get_prev(evt: Event, bot: Bot) -> str | None:
prev = None
if isinstance(evt, OB11MessageEvent):
if evt.reply is not None:
prev = f"QQ:{bot.self_id}:" + str(evt.reply.message_id)
else:
for seg in evt.get_message():
if seg.type == 'reply':
msgid = seg.get('id')
prev = f"QQ:{bot.self_id}:" + str(msgid)
if prev is not None:
async with celeste_status.get_data() as data:
prev = data.records.get(prev)
return prev
async def match_celeste(evt: Event, bot: Bot, msg: UniMsg) -> bool:
prev = await get_prev(evt, bot)
text = msg.extract_plain_text().strip()
if any(text.startswith(a) for a in aliases):
return True
if prev is not None:
return True
return False
# cmd = on_command(cmd="celeste", aliases=aliases)
cmd = on_message(rule=match_celeste)
@cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
prev = await get_prev(evt, bot)
actions = msg.extract_plain_text().strip()
for alias in aliases:
actions = actions.removeprefix(alias)
actions = actions.strip()
if len(actions) == 0:
return
if any((c not in ALLOW_CHARS) for c in actions):
return
await ensure_artifact(arti_ccleste_wrap_linux)
await ensure_artifact(arti_ccleste_wrap_windows)
bin: Path | None = None
for arti in (
arti_ccleste_wrap_linux,
arti_ccleste_wrap_windows,
):
if not arti.is_corresponding_platform():
continue
bin = arti.target
if not bin.exists():
continue
break
if bin is None:
logger.warning("Celeste 模块没有找到该系统需要的二进制文件")
return
if prev is not None:
prev_append = ["-p", prev]
else:
prev_append = []
try:
with tempfile.TemporaryDirectory() as _tempdir:
tempdir = Path(_tempdir)
gif_path = tempdir / "render.gif"
cmd_celeste = [
bin,
"-a",
actions,
"-o",
gif_path,
] + prev_append
logger.info(f"执行指令调用 celeste: CMD={cmd_celeste}")
res = subprocess.run(cmd_celeste, timeout=5, capture_output=True)
if res.returncode != 0:
logger.warning(f"渲染 Celeste 时的输出不是 0 CODE={res.returncode} STDOUT={res.stdout} STDERR={res.stderr}")
await UniMessage.text(f"渲染 Celeste 时出错啦!下面是输出:\n\n{res.stdout.decode()}{res.stderr.decode()}").send(evt, bot, at_sender=True)
return
if not gif_path.exists():
logger.warning("没有找到 Celeste 渲染的文件")
await UniMessage.text("渲染 Celeste 时出错啦!").send(evt, bot, at_sender=True)
return
gif_data = gif_path.read_bytes()
except TimeoutError:
logger.warning("在渲染 Celeste 时超时了")
await UniMessage("渲染 Celeste 时超时了!请检查你的操作清单,不能太长").send(evt, bot, at_sender=True)
return
receipt = await UniMessage.image(raw=gif_data).send(evt, bot)
async with celeste_status.get_data() as data:
if prev:
actions = prev + "\n" + actions
if isinstance(evt, OB11MessageEvent):
for _msgid in receipt.msg_ids:
msgid = _msgid["message_id"]
data.records[f"QQ:{bot.self_id}:{msgid}"] = actions
else:
for msgid in receipt.msg_ids:
data.records[f"DISCORD:{bot.self_id}:{msgid}"] = actions