更新 Celeste Classic 功能
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2026-02-02 19:06:24 +08:00
parent b32ddcaf38
commit 58ff8f02da
3 changed files with 246 additions and 0 deletions

112
konabot/common/artifact.py Normal file
View File

@ -0,0 +1,112 @@
import asyncio
import aiohttp
import hashlib
import platform
from dataclasses import dataclass
from pathlib import Path
import nonebot
from loguru import logger
from nonebot.adapters.discord.config import Config as DiscordConfig
from pydantic import BaseModel
@dataclass
class ArtifactDepends:
url: str
sha256: str
target: Path
required_os: str | None = None
"示例值Windows, Linux, Darwin"
required_arch: str | None = None
"示例值AMD64, x86_64, arm64"
use_proxy: bool = True
"网络问题,赫赫;使用的是 Discord 模块配置的 proxy"
def is_corresponding_platform(self) -> bool:
if self.required_os is not None:
if self.required_os.lower() != platform.system().lower():
return False
if self.required_arch is not None:
if self.required_arch.lower() != platform.machine().lower():
return False
return True
class Config(BaseModel):
prefetch_artifact: bool = False
"是否提前下载好二进制依赖"
artifact_list = []
driver = nonebot.get_driver()
config = nonebot.get_plugin_config(Config)
@driver.on_startup
async def _():
if config.prefetch_artifact:
logger.info("启动检测中:正在检测需求的二进制是否下载")
semaphore = asyncio.Semaphore(10)
async def _task(artifact: ArtifactDepends):
async with semaphore:
await ensure_artifact(artifact)
tasks: set[asyncio.Task] = set()
for a in artifact_list:
tasks.add(asyncio.Task(_task(a)))
await asyncio.gather(*tasks, return_exceptions=False)
logger.info("检测好了")
async def download_artifact(artifact: ArtifactDepends):
proxy = None
if artifact.use_proxy:
discord_config = nonebot.get_plugin_config(DiscordConfig)
proxy = discord_config.discord_proxy
if proxy is not None:
logger.info(f"正在使用 Proxy 下载 TARGET={artifact.target} PROXY={proxy}")
else:
logger.info(f"正在下载 TARGET={artifact.target}")
async with aiohttp.ClientSession(proxy=proxy) as client:
result = await client.get(artifact.url)
if result.status != 200:
logger.warning(f"已经下载了二进制,但是注意服务器没有返回 200 URL={artifact.url} TARGET={artifact.target} CODE={result.status}")
data = await result.read()
artifact.target.write_bytes(data)
if not platform.system().lower() == 'windows':
artifact.target.chmod(0o755)
logger.info(f"下载好了 TARGET={artifact.target} URL={artifact.url}")
m = hashlib.sha256(artifact.target.read_bytes())
if m.hexdigest().lower() != artifact.sha256.lower():
logger.warning(f"下载到的二进制的 sha256 与需求不同 TARGET={artifact.target} REQUESTED={artifact.sha256} ACTUAL={m.hexdigest()}")
async def ensure_artifact(artifact: ArtifactDepends):
if not artifact.is_corresponding_platform():
return
if not artifact.target.exists():
logger.info(f"二进制依赖 {artifact.target} 不存在")
if not artifact.target.parent.exists():
artifact.target.parent.mkdir(parents=True, exist_ok=True)
await download_artifact(artifact)
else:
m = hashlib.sha256(artifact.target.read_bytes())
if m.hexdigest().lower() != artifact.sha256.lower():
logger.info(f"二进制依赖 {artifact.target} 的哈希无法对应需求的哈希,准备重新下载")
artifact.target.unlink()
await download_artifact(artifact)
def register_artifacts(*artifacts: ArtifactDepends):
artifact_list.extend(artifacts)

View File

@ -8,6 +8,7 @@ DATA_PATH = SRC_PATH.parent / "data"
TMP_PATH = DATA_PATH / "tmp"
LOG_PATH = DATA_PATH / "logs"
CONFIG_PATH = DATA_PATH / "config"
BINARY_PATH = DATA_PATH / "bin"
DOCS_PATH = SRC_PATH / "docs"
DOCS_PATH_MAN1 = DOCS_PATH / "user"
@ -23,4 +24,5 @@ if not LOG_PATH.exists():
CONFIG_PATH.mkdir(exist_ok=True)
TMP_PATH.mkdir(exist_ok=True)
BINARY_PATH.mkdir(exist_ok=True)

View File

@ -0,0 +1,132 @@
from pathlib import Path
import subprocess
import tempfile
from loguru import logger
from nonebot import on_command
from pydantic import BaseModel
from nonebot.adapters import Event, Bot
from nonebot_plugin_alconna import UniMessage, UniMsg
from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent
from konabot.common.artifact import ArtifactDepends, ensure_artifact, register_artifacts
from konabot.common.data_man import DataManager
from konabot.common.path import BINARY_PATH, DATA_PATH
arti_ccleste_wrap_linux = ArtifactDepends(
url="https://github.com/Passthem-desu/pt-ccleste-wrap/releases/download/v0.1.5/ccleste-wrap",
sha256="ba4118c6465d1ca1547cdd1bd11c6b9e6a6a98ea8967b55485aeb6b77bb7e921",
target=BINARY_PATH / "ccleste-wrap",
required_os="Linux",
required_arch="x86_64",
)
arti_ccleste_wrap_windows = ArtifactDepends(
url="https://github.com/Passthem-desu/pt-ccleste-wrap/releases/download/v0.1.5/ccleste-wrap.exe",
sha256="7df382486a452485cdcf2115eabd7f772339ece470ab344074dc163fc7981feb",
target=BINARY_PATH / "ccleste-wrap.exe",
required_os="Windows",
required_arch="AMD64",
)
register_artifacts(arti_ccleste_wrap_linux)
register_artifacts(arti_ccleste_wrap_windows)
class CelesteStatus(BaseModel):
records: dict[str, str] = {}
celeste_status = DataManager(CelesteStatus, DATA_PATH / "celeste-status.json")
cmd = on_command(cmd="celeste", aliases={"蔚蓝", "爬山", "鳌太线"})
@cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
prev = None
if isinstance(evt, OB11MessageEvent):
if evt.reply is not None:
prev = f"QQ:{bot.self_id}:" + str(evt.reply.message_id)
else:
for seg in evt.get_message():
if seg.type == 'reply':
msgid = seg.get('id')
prev = f"QQ:{bot.self_id}:" + str(msgid)
actions = msg.extract_plain_text().strip().removeprefix("celeste")
for alias in {"蔚蓝", "爬山", "鳌太线"}:
actions = actions.removeprefix(alias)
actions = actions.strip()
if len(actions) == 0:
return
if prev is not None:
async with celeste_status.get_data() as data:
prev = data.records.get(prev)
await ensure_artifact(arti_ccleste_wrap_linux)
await ensure_artifact(arti_ccleste_wrap_windows)
bin: Path | None = None
for arti in (
arti_ccleste_wrap_linux,
arti_ccleste_wrap_windows,
):
if not arti.is_corresponding_platform():
continue
bin = arti.target
if not bin.exists():
continue
break
if bin is None:
logger.warning("Celeste 模块没有找到该系统需要的二进制文件")
return
if prev is not None:
prev_append = ["-p", prev]
else:
prev_append = []
try:
with tempfile.TemporaryDirectory() as _tempdir:
tempdir = Path(_tempdir)
gif_path = tempdir / "render.gif"
cmd_celeste = [
bin,
"-a",
actions,
"-o",
gif_path,
] + prev_append
logger.info(f"执行指令调用 celeste: CMD={cmd_celeste}")
res = subprocess.run(cmd_celeste, timeout=5, capture_output=True)
if res.returncode != 0:
logger.warning(f"渲染 Celeste 时的输出不是 0 CODE={res.returncode} STDOUT={res.stdout} STDERR={res.stderr}")
await UniMessage.text(f"渲染 Celeste 时出错啦!下面是输出:\n\n{res.stdout.decode()}{res.stderr.decode()}").send(evt, bot, at_sender=True)
return
if not gif_path.exists():
logger.warning("没有找到 Celeste 渲染的文件")
await UniMessage.text("渲染 Celeste 时出错啦!").send(evt, bot, at_sender=True)
return
gif_data = gif_path.read_bytes()
except TimeoutError:
logger.warning("在渲染 Celeste 时超时了")
await UniMessage("渲染 Celeste 时超时了!请检查你的操作清单,不能太长").send(evt, bot, at_sender=True)
return
receipt = await UniMessage.image(raw=gif_data).send(evt, bot)
async with celeste_status.get_data() as data:
if prev:
actions = prev + "\n" + actions
if isinstance(evt, OB11MessageEvent):
for _msgid in receipt.msg_ids:
msgid = _msgid["message_id"]
data.records[f"QQ:{bot.self_id}:{msgid}"] = actions
else:
for msgid in receipt.msg_ids:
data.records[f"DISCORD:{bot.self_id}:{msgid}"] = actions