diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..642ff51 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "python.REPL.enableREPLSmartSend": false +} \ No newline at end of file diff --git a/konabot/common/nb/qq_broadcast.py b/konabot/common/nb/qq_broadcast.py index 8adab2f..3e20ac1 100644 --- a/konabot/common/nb/qq_broadcast.py +++ b/konabot/common/nb/qq_broadcast.py @@ -1,10 +1,13 @@ from typing import Any, cast + import nonebot -from nonebot_plugin_alconna import UniMessage from nonebot.adapters.onebot.v11 import Bot as OBBot +from nonebot_plugin_alconna import UniMessage -async def qq_broadcast(groups: list[str], msg: UniMessage[Any]): +async def qq_broadcast(groups: list[str], msg: UniMessage[Any] | str): + if isinstance(msg, str): + msg = UniMessage.text(msg) bots: dict[str, OBBot] = {} # group_id -> bot_id diff --git a/konabot/docs/concepts/中间答案.txt b/konabot/docs/concepts/中间答案.txt new file mode 100644 index 0000000..23ddaae --- /dev/null +++ b/konabot/docs/concepts/中间答案.txt @@ -0,0 +1,9 @@ +关于「中间答案」或者「提示」: + +在 KonaPH 中,当有人发送「提交答案 答案」时,会检查答案是否符合你设置的中间答案的 pattern。这个 pattern 可以有两种方式: + +- 纯文本的完整匹配:你设置的 pattern 如果和提交的答案完全相等,则会触发提示。 +- regex 匹配:你设置的 pattern 如果以斜杠(/)开头和结尾,就会检查提交的答案是否匹配正则表达式。注意 ^ 和 $ 符号的使用。 + - 例如:/^commit$/ 会匹配 commit,不会匹配 acommit、Commit 等。 + - 而如果是 /commit/,则会匹配 commit、acommit,而不会匹配 Commit。 + - 无法使用 Javascript 的字符串声明模式,例如,/case_insensitive/i 就不会被视作一个正则表达式。 diff --git a/konabot/docs/concepts/占位.md b/konabot/docs/concepts/占位.md deleted file mode 100644 index e69de29..0000000 diff --git a/konabot/docs/lib/占位.md b/konabot/docs/lib/占位.md deleted file mode 100644 index e69de29..0000000 diff --git a/konabot/plugins/kona_ph/__init__.py b/konabot/plugins/kona_ph/__init__.py index 99a1bfc..2c88186 100644 --- a/konabot/plugins/kona_ph/__init__.py +++ b/konabot/plugins/kona_ph/__init__.py @@ -1,17 +1,23 @@ -from functools import reduce -from math import ceil +import datetime import re +from math import ceil + from loguru import logger from nonebot import on_message -from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna -from konabot.common.nb.qq_broadcast import qq_broadcast -from konabot.common.username import get_username -from konabot.plugins.kona_ph.core.storage import get_today_date -from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager -from konabot.common.longtask import DepLongTaskTarget - +from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg, + on_alconna) from nonebot_plugin_apscheduler import scheduler +from konabot.common.longtask import DepLongTaskTarget +from konabot.common.nb.qq_broadcast import qq_broadcast +from konabot.plugins.kona_ph.core.message import (get_daily_report, + get_daily_report_v2, + get_puzzle_description, + get_submission_message) +from konabot.plugins.kona_ph.core.storage import get_today_date +from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE, config, + create_admin_commands, + puzzle_manager) create_admin_commands() @@ -24,18 +30,6 @@ async def is_play_group(target: DepLongTaskTarget): return False -# cmd_submit = on_alconna(Alconna( -# "re:提交(?:答案|题解|[fF]lag)", -# Args["flag", str], -# ), rule=is_play_group) -# -# @cmd_submit.handle() -# async def _(flag: str, target: DepLongTaskTarget): -# async with puzzle_manager() as manager: -# result = manager.submit(target.target_id, flag) -# await target.send_message(result.get_unimessage()) - - cmd_submit = on_message(rule=is_play_group) @@ -46,7 +40,14 @@ async def _(msg: UniMsg, target: DepLongTaskTarget): submission: str = match.group("submission") async with puzzle_manager() as manager: result = manager.submit(target.target_id, submission) - await target.send_message(result.get_unimessage()) + if isinstance(result, str): + await target.send_message(result) + else: + await target.send_message(get_submission_message( + daily_puzzle_info=result.info, + submission=result.submission, + puzzle=result.puzzle, + )) cmd_query = on_alconna(Alconna( @@ -59,7 +60,7 @@ async def _(target: DepLongTaskTarget): p = manager.get_today_puzzle() if p is None: return await target.send_message("今天无题,改日再来吧!") - await target.send_message(p.get_unimessage()) + await target.send_message(get_puzzle_description(p)) cmd_query_submission = on_alconna(Alconna( @@ -68,44 +69,11 @@ cmd_query_submission = on_alconna(Alconna( @cmd_query_submission.handle() async def _(target: DepLongTaskTarget): + gid = None + if re.match(r"^\d+$", target.channel_id): + gid = int(target.channel_id) async with puzzle_manager() as manager: - p = manager.get_today_puzzle() - if p is None: - return await target.send_message("今天无题") - msg = UniMessage.text("==== 今日答题情况 ====\n\n") - - subcount = len(reduce( - lambda x, y: x + y, - manager.submissions.get(p.raw_id, {}).values(), - [], - )) - info = manager.daily_puzzle[p.index_id] - - msg = msg.text( - f"总体情况:答对 {len(info.success_users)} / " - f"参与 {len(info.tried_users)} / " - f"提交 {subcount}\n" - ) - - success_users = sorted(list(info.success_users.items()), key=lambda v: v[1]) - gid = None - if re.match(r"^\d+$", target.channel_id): - gid = int(target.channel_id) - for u, d in success_users: - uname = u - if re.match(r"^\d+$", u): - uname = get_username(int(u), gid) - t = d.strftime("%H:%M") - tries = len(manager.submissions[p.raw_id][u]) - msg = msg.text(f"\n- {uname} [🎉 {t} 完成 | {tries} 提交]") - for u in info.tried_users - set(info.success_users.keys()): - uname = u - if re.match(r"^\d+$", u): - uname = get_username(int(u), gid) - tries = len(manager.submissions[p.raw_id][u]) - msg = msg.text(f"\n- {uname} [💦 {tries} 提交]") - - await target.send_message(msg) + await target.send_message(get_daily_report_v2(manager, gid)) cmd_history = on_alconna(Alconna( @@ -120,15 +88,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1): today = get_today_date() if index_id: index_id = index_id.removeprefix("#") - if index_id == manager.daily_puzzle_of_date.get(today, ""): - puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id] - return await target.send_message(puzzle.get_unimessage()) - if index_id in manager.daily_puzzle: - puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id] - msg = puzzle.get_unimessage() - msg = msg.text(f"\n\n------\n\n题解:{puzzle.flag}") - return await target.send_message(msg) - return await target.send_message("没有这道题哦") + if index_id not in manager.daily_puzzle: + return await target.send_message("没有这道题哦") + puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id] + msg = get_puzzle_description( + puzzle, + with_answer=(index_id != manager.daily_puzzle_of_date.get(today, "")), + ) + return await target.send_message(msg) msg = UniMessage.text("====== 历史题目清单 ======\n\n") puzzles = [ (manager.puzzle_data[manager.daily_puzzle[i].raw_id], d) @@ -155,15 +122,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1): @scheduler.scheduled_job("cron", hour="8") async def _(): async with puzzle_manager() as manager: - msg2 = manager.get_report_yesterday() + yesterday = get_today_date() - datetime.timedelta(days=1) + msg2 = get_daily_report(manager, yesterday) if msg2 is not None: await qq_broadcast(config.plugin_puzzle_playgroup, msg2) puzzle = manager.get_today_puzzle() if puzzle is not None: logger.info(f"找到了题目 {puzzle.raw_id},发送") - await qq_broadcast(config.plugin_puzzle_playgroup, puzzle.get_unimessage()) + await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(puzzle)) else: logger.info("自动任务:没有找到题目,跳过") - - diff --git a/konabot/plugins/kona_ph/core/image.py b/konabot/plugins/kona_ph/core/image.py new file mode 100644 index 0000000..590238e --- /dev/null +++ b/konabot/plugins/kona_ph/core/image.py @@ -0,0 +1,29 @@ +import nanoid + +from konabot.common.path import ASSETS_PATH +from konabot.plugins.kona_ph.core.path import KONAPH_IMAGE_BASE + + +class PuzzleImageManager: + def read_puzzle_image(self, img_name: str) -> bytes: + fp = KONAPH_IMAGE_BASE / img_name + if fp.exists(): + return fp.read_bytes() + return (ASSETS_PATH / "img" / "other" / "boom.jpg").read_bytes() + + def upload_puzzle_image(self, data: bytes, suffix: str = ".png") -> str: + id = nanoid.generate( + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz", + 21, + ) + img_name = f"{id}{suffix}" + (KONAPH_IMAGE_BASE / img_name).write_bytes(data) + return img_name + + def remove_puzzle_image(self, img_name: str): + if img_name: + (KONAPH_IMAGE_BASE / img_name).unlink(True) + + +def get_image_manager() -> PuzzleImageManager: + return PuzzleImageManager() diff --git a/konabot/plugins/kona_ph/core/message.py b/konabot/plugins/kona_ph/core/message.py new file mode 100644 index 0000000..72aa92a --- /dev/null +++ b/konabot/plugins/kona_ph/core/message.py @@ -0,0 +1,164 @@ +""" +生成各种各样的 Message 的函数集合 +""" + + +import datetime +import functools +import re +from typing import Any + +from nonebot_plugin_alconna import UniMessage + +from konabot.common.username import get_username +from konabot.plugins.kona_ph.core.image import get_image_manager +from konabot.plugins.kona_ph.core.storage import (DailyPuzzleInfo, Puzzle, + PuzzleManager, + PuzzleSubmission) + + +def get_puzzle_description(puzzle: Puzzle, with_answer: bool = False) -> UniMessage[Any]: + """ + 获取一个谜题的描述 + """ + + img_manager = get_image_manager() + + result = UniMessage.text(f"[KonaPH#{puzzle.index_id}] {puzzle.title}") + result = result.text(f"\n\n{puzzle.content}") + + if puzzle.img_name: + result = result.text("\n\n").image( + raw=img_manager.read_puzzle_image(puzzle.img_name) + ) + + result = result.text(f"\n\n出题者:{get_username(puzzle.author_id)}") + + if with_answer: + result = result.text(f"\n\n题目答案:{puzzle.flag}") + else: + result = result.text("\n\n输入「提交答案 答案」来提交你的解答") + + return result + + +def get_submission_message( + puzzle: Puzzle, + submission: PuzzleSubmission, + daily_puzzle_info: DailyPuzzleInfo | None = None, +) -> str: + """ + 获得提交答案的反馈信息 + """ + + if submission.success: + rank = -1 + if daily_puzzle_info is not None: + rank = len(daily_puzzle_info.success_users) + return f"🎉 恭喜你答对了!你是今天第 {rank} 个解出来的!" + if submission.hint_id >= 0 and ( + hint := puzzle.hints.get(submission.hint_id) + ) is not None: + if hint.is_checkpoint: + hint_msg = "✨ 恭喜!这是本题的中间答案,加油!" + else: + hint_msg = "🤔 答错啦!请检查你的答案。" + return f"{hint_msg}\n\n提示:{hint.hint}" + return "❌ 答错啦!请检查你的答案。" + + +def get_daily_report( + manager: PuzzleManager, + date: datetime.date, +) -> str | None: + """ + 获得某日的提交的报告信息 + """ + + index_id = manager.daily_puzzle_of_date.get(date) + if index_id is None: + return None + info = manager.daily_puzzle[index_id] + puzzle = manager.puzzle_data[info.raw_id] + + msg = f"[KonaPH#{puzzle.index_id}] 「{puzzle.title}」解答报告\n\n" + if len(info.success_users) == 0: + msg += "昨日,无人解出此题 😭😭\n\n" + else: + msg += f"昨日,共有 {len(info.success_users)} 人解出此题。\n\n" + msg += "前五名的解答者:\n\n" + us = [(u, d) for u, d in info.success_users.items()] + us = sorted(us, key=lambda t: t[1]) + us = us[:5] + for u, _ in us: + m = manager.submissions[puzzle.raw_id][u][-1] + msg += f"- {get_username(u)} 于 {m.time.strftime('%H:%M')}\n" + msg += "\n" + msg += f"出题人:{get_username(puzzle.author_id)}" + return msg + + +def get_daily_report_v2(manager: PuzzleManager, gid: int | None = None): + p = manager.get_today_puzzle() + if p is None: + return "今天无题" + msg = "==== 今日答题情况 ====\n\n" + + subcount = len(functools.reduce( + lambda x, y: x + y, + manager.submissions.get(p.raw_id, {}).values(), + [], + )) + info = manager.daily_puzzle[p.index_id] + + msg += ( + f"总体情况:答对 {len(info.success_users)} / " + f"参与 {len(info.tried_users)} / " + f"提交 {subcount}\n" + ) + + success_users = sorted(list(info.success_users.items()), key=lambda v: v[1]) + for u, d in success_users: + uname = u + if re.match(r"^\d+$", u): + uname = get_username(int(u), gid) + t = d.strftime("%H:%M") + tries = len(manager.submissions[p.raw_id][u]) + msg += f"\n- {uname} [🎉 {t} 完成 | {tries} 提交]" + for u in info.tried_users - set(info.success_users.keys()): + uname = u + if re.match(r"^\d+$", u): + uname = get_username(int(u), gid) + tries = len(manager.submissions[p.raw_id][u]) + msg += f"\n- {uname} [💦 {tries} 提交]" + + return msg + + +def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]: + image_manager = get_image_manager() + + status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \ + (f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备") + + status_suffix = "" + if puzzle.raw_id == manager.puzzle_pinned: + status_suffix += " | 📌 已被管理员置顶" + + msg = UniMessage.text( + f"--- 谜题信息 ---\n" + f"Raw ID: {puzzle.raw_id}\n" + f"出题者: {get_username(puzzle.author_id)} | {puzzle.author_id}\n" + f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n" + f"状态: {status}{status_suffix}\n\n" + f"标题: {puzzle.title}\n" + f"Flag: {puzzle.flag}\n\n" + f"{puzzle.content}" + ) + + if puzzle.img_name: + msg = msg.image(raw=image_manager.read_puzzle_image(puzzle.img_name)) + + msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑") + + return msg diff --git a/konabot/plugins/kona_ph/core/path.py b/konabot/plugins/kona_ph/core/path.py new file mode 100644 index 0000000..d760bbf --- /dev/null +++ b/konabot/plugins/kona_ph/core/path.py @@ -0,0 +1,9 @@ +from konabot.common.path import DATA_PATH + +KONAPH_BASE = DATA_PATH / "KonaPH" +KONAPH_DATA_JSON = KONAPH_BASE / "data.json" +KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs" + +# 保证所有文件夹存在 +KONAPH_BASE.mkdir(exist_ok=True) +KONAPH_IMAGE_BASE.mkdir(exist_ok=True) diff --git a/konabot/plugins/kona_ph/core/storage.py b/konabot/plugins/kona_ph/core/storage.py index 1b7c38c..cca4ce6 100644 --- a/konabot/plugins/kona_ph/core/storage.py +++ b/konabot/plugins/kona_ph/core/storage.py @@ -1,27 +1,26 @@ import asyncio import datetime import random - +import re from contextlib import asynccontextmanager -from pathlib import Path -from typing import Any import nanoid - -from nonebot_plugin_alconna import UniMessage from pydantic import BaseModel, Field, ValidationError -from konabot.common.path import DATA_PATH -from konabot.common.username import get_username +from konabot.plugins.kona_ph.core.path import KONAPH_DATA_JSON -KONAPH_BASE = DATA_PATH / "KonaPH" -KONAPH_DATA_JSON = KONAPH_BASE / "data.json" -KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs" +class PuzzleHint(BaseModel): + pattern: str + hint: str + is_checkpoint: bool -# 保证所有文件夹存在 -KONAPH_BASE.mkdir(exist_ok=True) -KONAPH_IMAGE_BASE.mkdir(exist_ok=True) + +class PuzzleSubmission(BaseModel): + success: bool + flag: str + time: datetime.datetime + hint_id: int = -1 class Puzzle(BaseModel): @@ -40,41 +39,38 @@ class Puzzle(BaseModel): ready: bool = False created_at: datetime.datetime = Field(default_factory=datetime.datetime.now) - def get_image_path(self) -> Path: - return KONAPH_IMAGE_BASE / self.img_name + hints: dict[int, PuzzleHint] = Field(default_factory=dict) - def get_unimessage(self) -> UniMessage[Any]: - result = UniMessage.text(f"[KonaPH#{self.index_id}] {self.title}") - result = result.text(f"\n\n{self.content}") + @property + def hint_id_max(self) -> int: + return max((0, *self.hints.keys())) - if self.img_name: - result = result.text("\n\n").image(raw=self.get_image_path().read_bytes()) - - result = result.text(f"\n\n出题者:{get_username(self.author_id)}") - result = result.text("\n\n输入「提交答案 答案」来提交你的解答") - - return result - - def add_image(self, img: bytes, suffix: str = ".png"): - if self.img_name: - self.get_image_path().unlink(True) - img_id = nanoid.generate( - "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz", - 21, + def check_submission( + self, + submission: str, + time: datetime.datetime | None = None, + ) -> PuzzleSubmission: + if time is None: + time = datetime.datetime.now() + if submission == self.flag: + return PuzzleSubmission( + success=True, + flag=submission, + time=time, + ) + for hint_id, hint in self.hints.items(): + if re.match(hint.pattern, submission): + return PuzzleSubmission( + success=False, + flag=submission, + time=time, + hint_id=hint_id, + ) + return PuzzleSubmission( + success=False, + flag=submission, + time=time, ) - self.img_name = f"{img_id}{suffix}" - self.get_image_path().write_bytes(img) - - def remove_image(self): - if self.img_name: - self.get_image_path().unlink(True) - self.img_name = "" - - -class PuzzleSubmission(BaseModel): - success: bool - flag: str - time: datetime.datetime class DailyPuzzleInfo(BaseModel): @@ -84,15 +80,10 @@ class DailyPuzzleInfo(BaseModel): success_users: dict[str, datetime.datetime] = {} -class PuzzleSubmissionResultMessage(BaseModel): - success: bool - rank: int = -1 - message: str = "" - - def get_unimessage(self) -> UniMessage[Any]: - if self.success: - return UniMessage.text(f"🎉 恭喜你答对了!你是今天第 {self.rank} 个解出来的!") - return UniMessage.text(self.message) +class PuzzleSubmissionFeedback(BaseModel): + submission: PuzzleSubmission + puzzle: Puzzle + info: DailyPuzzleInfo | None = None def get_today_date() -> datetime.date: @@ -162,26 +153,6 @@ class PuzzleManager(BaseModel): self.index_id_counter += 1 - def fix(self): - # 尝试修复今日的数据 - for p in self.puzzle_data.values(): - if self.is_puzzle_published(p.raw_id): - p.ready = True - - if self.puzzle_pinned not in self.unpublished_puzzles: - self.puzzle_pinned = "" - - # 撤回重复发布的题 - already_published: set[str] = set() - for date in list(self.daily_puzzle_of_date.keys()): - index_id = self.daily_puzzle_of_date[date] - info = self.daily_puzzle[index_id] - if info.raw_id in already_published: - del self.daily_puzzle[index_id] - del self.daily_puzzle_of_date[date] - else: - already_published.add(info.raw_id) - def admin_pin_puzzle(self, raw_id: str): if raw_id in self.puzzle_data: self.puzzle_pinned = raw_id @@ -213,41 +184,23 @@ class PuzzleManager(BaseModel): return return self.daily_puzzle[p.index_id] - def submit(self, user: str, flag: str) -> PuzzleSubmissionResultMessage: + def submit(self, user: str, flag: str) -> PuzzleSubmissionFeedback | str: p = self.get_today_puzzle() d = self.get_today_info() now = datetime.datetime.now() if p is None or d is None: - return PuzzleSubmissionResultMessage( - success=False, - message="今天没有题哦,改天再来吧!", - ) + return "今天没有题哦,改天再来吧!" if user in d.success_users: - return PuzzleSubmissionResultMessage( - success=False, - message="你今天已经答对过啦!不用重复提交哦!", - ) - if flag != p.flag: - d.tried_users.add(user) - self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission( - success=False, - flag=flag, - time=now, - )) - return PuzzleSubmissionResultMessage( - success=False, - message="❌ 答错了,请检查你的答案哦", - ) + return "你今天已经答对过啦!不用重复提交哦!" d.tried_users.add(user) - d.success_users[user] = now - self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission( - success=True, - flag=flag, - time=now, - )) - return PuzzleSubmissionResultMessage( - success=True, - rank=len(d.success_users), + result = p.check_submission(flag, now) + self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(result) + if result.success: + d.success_users[user] = now + return PuzzleSubmissionFeedback( + submission=result, + puzzle=p, + info=d, ) def admin_create_puzzle(self, user: str): @@ -273,33 +226,6 @@ class PuzzleManager(BaseModel): if p.author_id == user ], key=lambda p: p.created_at, reverse=True) - def get_report_yesterday(self): - yesterday = get_today_date() - datetime.timedelta(days=1) - index_id = self.daily_puzzle_of_date.get(yesterday) - if index_id is None: - return None - info = self.daily_puzzle[index_id] - puzzle = self.puzzle_data[info.raw_id] - message = UniMessage.text(f"[KonaPH#{index_id}] 「{puzzle.title}」解答报告") - - if len(info.success_users) == 0: - message = message.text( - "\n\n昨日,竟无人解出此题!" - ) - else: - message = message.text( - f"\n\n昨日,共有 {len(info.success_users)} 人解出此题。\n\n前五名的解答者:" - ) - us = [(u, d) for u, d in info.success_users.items()] - us = sorted(us, key=lambda t: t[1]) - us = us[:5] - for u, _ in us: - m = self.submissions[puzzle.raw_id][u][-1] - message = message.text(f"- {get_username(u)} 于 {m.time.strftime('%H:%M')}") - - message = message.text(f"\n\n出题者:{get_username(puzzle.author_id)}") - return message - lock = asyncio.Lock() diff --git a/konabot/plugins/kona_ph/manager.py b/konabot/plugins/kona_ph/manager.py index 60425ba..bf30bb1 100644 --- a/konabot/plugins/kona_ph/manager.py +++ b/konabot/plugins/kona_ph/manager.py @@ -1,15 +1,20 @@ import datetime from math import ceil -from typing import Any + from nonebot import get_plugin_config -from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, Subcommand, UniMessage, on_alconna +from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query, + Subcommand, SubcommandResult, UniMessage, on_alconna) from pydantic import BaseModel from konabot.common.longtask import DepLongTaskTarget from konabot.common.nb.extract_image import download_image_bytes from konabot.common.nb.qq_broadcast import qq_broadcast from konabot.common.username import get_username -from konabot.plugins.kona_ph.core.storage import Puzzle, PuzzleManager, get_today_date, puzzle_manager +from konabot.plugins.kona_ph.core.image import get_image_manager +from konabot.plugins.kona_ph.core.message import (get_puzzle_description, + get_puzzle_info_message, + get_submission_message) +from konabot.plugins.kona_ph.core.storage import get_today_date, puzzle_manager PUZZLE_PAGE_SIZE = 10 @@ -31,33 +36,6 @@ def is_puzzle_admin(target: DepLongTaskTarget): return target.target_id in config.plugin_puzzle_admin -def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]: - status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \ - (f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备") - - status_suffix = "" - if puzzle.raw_id == manager.puzzle_pinned: - status_suffix += " | 📌 已被管理员置顶" - - msg = UniMessage.text( - f"--- 谜题信息 ---\n" - f"Raw ID: {puzzle.raw_id}\n" - f"出题者: {get_username(puzzle.author_id)} | {puzzle.author_id}\n" - f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n" - f"状态: {status}{status_suffix}\n\n" - f"标题: {puzzle.title}\n" - f"Flag: {puzzle.flag}\n\n" - f"{puzzle.content}" - ) - - if puzzle.img_name: - msg = msg.image(raw=puzzle.get_image_path().read_bytes()) - - msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑") - - return msg - - def create_admin_commands(): cmd_admin = on_alconna( Alconna( @@ -82,7 +60,45 @@ def create_admin_commands(): ), Subcommand("publish", Args["raw_id?", str], dest="publish"), Subcommand("preview", Args["raw_id", str], dest="preview"), - Subcommand("get-submits", Args["raw_id", str], dest="get-submits") + Subcommand("get-submits", Args["raw_id", str], dest="get-submits"), + Subcommand( + "test", + Args["raw_id", str], + Args["submission", str], + dest="test", + ), + Subcommand( + "hint", + Subcommand( + "add", + Args["raw_id", str], + Args["pattern", str], + Args["hint", str], + dest="add", + ), + Subcommand( + "list", + Args["raw_id", str], + Args["page?", int], + dest="get", + ), + Subcommand( + "modify", + Args["raw_id", str], + Args["hint_id", int], + Option("--pattern", Args["pattern", str], alias=["-p"]), + Option("--hint", Args["hint", str], alias=["-h"]), + Option("--checkpoint", Args["is_checkpoint", bool], alias=["-c"]), + dest="modify", + ), + Subcommand( + "delete", + Args["raw_id", str], + Args["hint_id", int], + dest="delete", + ), + dest="hint", + ), ), rule=is_puzzle_manager, ) @@ -98,6 +114,8 @@ def create_admin_commands(): msg = msg.text("konaph modify - 查看如何修改谜题信息\n") msg = msg.text("konaph preview - 预览一个题目的效果,不会展示答案\n") msg = msg.text("konaph get-submits - 获得题目的提交记录\n") + msg = msg.text("konaph test - 尝试提交一个答案,看回答的效果\n") + msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n") if is_puzzle_admin(target): msg = msg.text("konaph all [--ready] - 查看所有谜题\n") @@ -293,6 +311,7 @@ def create_admin_commands(): " --image <图片> 图片\n" " --remove-image 删除图片" ) + image_manager = get_image_manager() async with puzzle_manager() as manager: if raw_id not in manager.puzzle_data: @@ -312,9 +331,10 @@ def create_admin_commands(): ) if image is not None and image.url is not None: b = await download_image_bytes(image.url) - p.add_image(b.unwrap()) + image_manager.remove_puzzle_image(p.img_name) + image_manager.upload_puzzle_image(b.unwrap()) elif remove_image.available: - p.remove_image() + image_manager.remove_puzzle_image(p.img_name) info2 = get_puzzle_info_message(manager, p) @@ -332,7 +352,7 @@ def create_admin_commands(): p = manager.get_today_puzzle(strong=True) if p is None: return await target.send_message("上架失败了orz,可能是没题了") - await qq_broadcast(config.plugin_puzzle_playgroup, p.get_unimessage()) + await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(p)) return await target.send_message("Ok!") @cmd_admin.assign("preview") @@ -343,7 +363,7 @@ def create_admin_commands(): return await target.send_message("没有这个谜题") if not is_puzzle_admin(target) and target.target_id != puzzle.author_id: return await target.send_message("你没有权限预览这个谜题") - return await target.send_message(puzzle.get_unimessage()) + return await target.send_message(get_puzzle_description(puzzle)) @cmd_admin.assign("get-submits") async def _(target: DepLongTaskTarget, raw_id: str): @@ -361,5 +381,43 @@ def create_admin_commands(): msg = msg.text(f"- {get_username(uid)}:{s}\n") return await target.send_message(msg) + @cmd_admin.assign("test") + async def _(target: DepLongTaskTarget, raw_id: str, submission: str): + """ + 测试一道谜题的回答,并给出结果 + """ + async with puzzle_manager() as manager: + puzzle = manager.puzzle_data.get(raw_id) + if puzzle is None: + return await target.send_message("没有这个谜题") + if not is_puzzle_admin(target) and target.target_id != puzzle.author_id: + return await target.send_message("你没有权限预览这个谜题") + + result = puzzle.check_submission(submission) + msg = get_submission_message(puzzle, result) + return await target.send_message( + UniMessage.text("[测试提交] ") + msg + ) + + @cmd_admin.assign("subcommands.hint") + async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")): + if len(subcommands.result.subcommands) > 0: + return + return await target.send_message( + UniMessage.text("==== 提示/中间答案编辑器 ====\n\n") + .text("- konaph hint list \n - 查看某道题的所有提示 / 中间答案\n") + .text("- konaph hint add \n - 添加一个提示 / 中间答案\n") + .text("- konaph hint modify \n") + .text(" - --pattern \n - 更改匹配规则\n") + .text(" - --hint \n - 更改提示文本\n") + .text(" - --checkpoint [True|False]\n - 更改是否为中间答案\n") + .text("- konaph hint delete \n - 删除一个提示 / 中间答案\n") + .text("\n更多关于 pattern 和中间答案的信息,请见 man:中间答案(7)") + ) + + @cmd_admin.assign("hint.add") + async def _(target: DepLongTaskTarget): + await target.send_message("114514") + return cmd_admin diff --git a/konabot/plugins/man/__init__.py b/konabot/plugins/man/__init__.py index 0d3a75a..451a079 100644 --- a/konabot/plugins/man/__init__.py +++ b/konabot/plugins/man/__init__.py @@ -1,4 +1,3 @@ -from curses.ascii import isdigit from pathlib import Path import nonebot @@ -40,7 +39,10 @@ async def _( doc: str | None, event: nonebot.adapters.Event, ): - if doc is not None and section is None and all(isdigit(c) for c in doc): + if doc is not None and section is None and all( + ord('0') <= ord(c) <= ord('9') + for c in doc + ): section = int(doc) doc = None