From a03cef412413f7b999c6a85e0001b170e315479d Mon Sep 17 00:00:00 2001 From: passthem Date: Sun, 26 Oct 2025 03:23:51 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=AF=86=E5=8E=A8=E6=9D=A5=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- konabot/common/longtask.py | 5 +- konabot/common/nb/extract_image.py | 2 - konabot/plugins/kona_ph/__init__.py | 86 ++++++ konabot/plugins/kona_ph/core/__init__.py | 0 konabot/plugins/kona_ph/core/storage.py | 314 ++++++++++++++++++++++ konabot/plugins/kona_ph/manager.py | 327 +++++++++++++++++++++++ 6 files changed, 731 insertions(+), 3 deletions(-) create mode 100644 konabot/plugins/kona_ph/__init__.py create mode 100644 konabot/plugins/kona_ph/core/__init__.py create mode 100644 konabot/plugins/kona_ph/core/storage.py create mode 100644 konabot/plugins/kona_ph/manager.py diff --git a/konabot/common/longtask.py b/konabot/common/longtask.py index b988e36..5f37391 100644 --- a/konabot/common/longtask.py +++ b/konabot/common/longtask.py @@ -51,13 +51,16 @@ class LongTaskTarget(BaseModel): target_id: str "沟通对象的 ID" - async def send_message(self, msg: UniMessage, at: bool = True) -> bool: + async def send_message(self, msg: UniMessage | str, at: bool = True) -> bool: try: bot = nonebot.get_bot(self.self_id) except KeyError: logger.warning(f"试图访问了不存在的 Bot。ID={self.self_id}") return False + if isinstance(msg, str): + msg = UniMessage.text(msg) + if self.platform == "qq": if not isinstance(bot, OBBot): logger.warning( diff --git a/konabot/common/nb/extract_image.py b/konabot/common/nb/extract_image.py index eecbf87..d0bed4d 100644 --- a/konabot/common/nb/extract_image.py +++ b/konabot/common/nb/extract_image.py @@ -16,8 +16,6 @@ from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage from PIL import UnidentifiedImageError from returns.result import Failure, Result, Success -from konabot.common.nb.exc import BotExceptionMessage - async def download_image_bytes(url: str) -> Result[bytes, str]: # if "/matcha/cache/" in url: diff --git a/konabot/plugins/kona_ph/__init__.py b/konabot/plugins/kona_ph/__init__.py new file mode 100644 index 0000000..51fdcef --- /dev/null +++ b/konabot/plugins/kona_ph/__init__.py @@ -0,0 +1,86 @@ +from math import ceil +from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna +from konabot.plugins.kona_ph.core.storage import get_today_date +from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager +from konabot.common.longtask import DepLongTaskTarget + + +create_admin_commands() + + +async def is_play_group(target: DepLongTaskTarget): + if target.channel_id in config.plugin_puzzle_playgroup: + return True + if target.target_id in target.channel_id: + return True + return False + + +cmd_submit = on_alconna(Alconna( + "提交答案", + Args["flag", str], +), rule=is_play_group) + +@cmd_submit.handle() +async def _(flag: str, target: DepLongTaskTarget): + async with puzzle_manager() as manager: + result = manager.submit(target.target_id, flag) + await target.send_message(result.get_unimessage()) + + +cmd_query = on_alconna(Alconna( + r"re:(?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?" +), rule=is_play_group) + +@cmd_query.handle() +async def _(target: DepLongTaskTarget): + async with puzzle_manager() as manager: + p = manager.get_today_puzzle() + if p is None: + return await target.send_message("今天无题,改日再来吧!") + await target.send_message(p.get_unimessage()) + + +cmd_history = on_alconna(Alconna( + "历史题目", + Args["page?", int], + Args["index_id?", str], +), rule=is_play_group) + +@cmd_history.handle() +async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1): + async with puzzle_manager() as manager: + today = get_today_date() + if index_id: + index_id = index_id.removeprefix("#") + if index_id == manager.daily_puzzle_of_date.get(today, ""): + puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id] + return await target.send_message(puzzle.get_unimessage()) + if index_id in manager.daily_puzzle: + puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id] + msg = puzzle.get_unimessage() + msg = msg.text(f"\n\n------\n\n题解:{puzzle.flag}") + return await target.send_message(msg) + return await target.send_message("没有这道题哦") + msg = UniMessage.text("====== 历史题目清单 ======\n\n") + puzzles = [ + (manager.puzzle_data[manager.daily_puzzle[i].raw_id], d) + for d, i in manager.daily_puzzle_of_date.items() + ] + puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True) + count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) + if page <= 0 or page > count_pages: + return await target.send_message(UniMessage.text( + f"页数只有 1 ~ {count_pages} 啦!" + )) + puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] + for p, d in puzzles: + info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]] + msg = msg.text( + f"- [#{p.index_id}: {len(info.success_users)}/{len(info.tried_users)}]" + f" {p.title} ({d})" + ) + msg = msg.text("\n") + msg = msg.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====") + await target.send_message(msg) + diff --git a/konabot/plugins/kona_ph/core/__init__.py b/konabot/plugins/kona_ph/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/konabot/plugins/kona_ph/core/storage.py b/konabot/plugins/kona_ph/core/storage.py new file mode 100644 index 0000000..d5e90c9 --- /dev/null +++ b/konabot/plugins/kona_ph/core/storage.py @@ -0,0 +1,314 @@ +import asyncio +import datetime +import random + +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Any + +import nanoid + +from nonebot_plugin_alconna import UniMessage +from pydantic import BaseModel, Field, ValidationError + +from konabot.common.path import DATA_PATH + + +KONAPH_BASE = DATA_PATH / "KonaPH" +KONAPH_DATA_JSON = KONAPH_BASE / "data.json" +KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs" + +# 保证所有文件夹存在 +KONAPH_BASE.mkdir(exist_ok=True) +KONAPH_IMAGE_BASE.mkdir(exist_ok=True) + + +class Puzzle(BaseModel): + raw_id: str + "用于给出题者管理的 ID" + + index_id: str + "展出的 ID,以展出顺序为准" + + title: str + content: str + img_name: str + author_id: str + flag: str + + ready: bool = False + published: bool = False + pinned: bool = False + + created_at: datetime.datetime = Field(default_factory=datetime.datetime.now) + + def get_image_path(self) -> Path: + return KONAPH_IMAGE_BASE / self.img_name + + def get_unimessage(self) -> UniMessage[Any]: + result = UniMessage.text(f"[KonaPH#{self.index_id}] {self.title}") + result = result.text(f"\n\n{self.content}") + + if self.img_name: + result = result.text("\n\n").image(raw=self.get_image_path().read_bytes()) + + result = result.text("\n\n出题者:").at(self.author_id) + + return result + + def add_image(self, img: bytes, suffix: str = ".png"): + if self.img_name: + self.get_image_path().unlink(True) + img_id = nanoid.generate( + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz", + 21, + ) + self.img_name = f"{img_id}{suffix}" + self.get_image_path().write_bytes(img) + + def remove_image(self): + if self.img_name: + self.get_image_path().unlink(True) + self.img_name = "" + + +class PuzzleSubmission(BaseModel): + success: bool + flag: str + time: datetime.datetime + + +class DailyPuzzleInfo(BaseModel): + raw_id: str + time: datetime.date + tried_users: set[str] = set() + success_users: dict[str, datetime.datetime] = {} + + +class PuzzleSubmissionResultMessage(BaseModel): + success: bool + rank: int = -1 + message: str = "" + + def get_unimessage(self) -> UniMessage[Any]: + if self.success: + return UniMessage.text(f"🎉 恭喜你答对了!你是今天第 {self.rank} 个解出来的!") + return UniMessage.text(self.message) + + +def get_today_date() -> datetime.date: + now = datetime.datetime.now() + if now.hour < 8: + now -= datetime.timedelta(days=1) + return now.date() + + +class PuzzleManager(BaseModel): + puzzle_data: dict[str, Puzzle] = {} + + daily_puzzle: dict[str, DailyPuzzleInfo] = {} + daily_puzzle_of_date: dict[datetime.date, str] = {} + + puzzle_pinned: str = "" + unpublished_puzzles: set[str] = set() + unready_puzzles: set[str] = set() + published_puzzles: set[str] = set() + + index_id_counter: int = 1 + submissions: dict[str, dict[str, list[PuzzleSubmission]]] = {} + last_pubish_date: datetime.date = Field( + default_factory=lambda: get_today_date() - datetime.timedelta(days=1) + ) + last_checked_date: datetime.date = Field( + default_factory=lambda: get_today_date() - datetime.timedelta(days=1) + ) + + def publish_puzzle(self, raw_id: str): + assert raw_id in self.puzzle_data + + self.unpublished_puzzles -= set(raw_id) + self.unready_puzzles -= set(raw_id) + p = self.puzzle_data[raw_id] + p.index_id = str(self.index_id_counter) + p.ready = True + p.published = True + p.pinned = False + self.puzzle_pinned = "" + self.last_pubish_date = get_today_date() + self.last_checked_date = self.last_pubish_date + self.daily_puzzle[p.index_id] = DailyPuzzleInfo( + raw_id=raw_id, + time=self.last_pubish_date, + ) + self.daily_puzzle_of_date[self.last_pubish_date] = p.index_id + self.published_puzzles.add(raw_id) + + self.index_id_counter += 1 + + def admin_mark_ready(self, raw_id: str, ready: bool = True): + if raw_id not in self.puzzle_data: + return + if ready: + self.unready_puzzles -= set(raw_id) + if raw_id not in self.published_puzzles: + self.unpublished_puzzles.add(raw_id) + p = self.puzzle_data[raw_id] + p.ready = True + p.published = raw_id in self.published_puzzles + else: + self.unready_puzzles.add(raw_id) + self.unpublished_puzzles -= set(raw_id) + p = self.puzzle_data[raw_id] + p.ready = False + p.published = False + # if p.raw_id == self.puzzle_pinned: + # self.puzzle_pinned = "" + + def admin_pin_puzzle(self, raw_id: str): + if self.puzzle_pinned: + p = self.puzzle_data.get(self.puzzle_pinned) + if p is not None: + p.pinned = False + if raw_id in self.puzzle_data: + p = self.puzzle_data[raw_id] + p.pinned = True + self.puzzle_pinned = raw_id + else: + self.puzzle_pinned = "" + + def get_today_puzzle(self, strong: bool = False) -> Puzzle | None: + today = get_today_date() + if today in self.daily_puzzle_of_date: + index_id = self.daily_puzzle_of_date[today] + info = self.daily_puzzle[index_id] + return self.puzzle_data[info.raw_id] + if today == self.last_checked_date and not strong: + return + self.last_checked_date = today + if self.puzzle_pinned and self.puzzle_pinned in self.puzzle_data: + d = self.puzzle_pinned + self.publish_puzzle(d) + self.puzzle_pinned = "" + return self.puzzle_data[d] + elif len(self.unpublished_puzzles) > 0: + d = random.choice(list(self.unpublished_puzzles)) + self.publish_puzzle(d) + return self.puzzle_data[d] + + def get_today_info(self) -> DailyPuzzleInfo | None: + p = self.get_today_puzzle() + if p is None: + return + return self.daily_puzzle[p.index_id] + + def submit(self, user: str, flag: str) -> PuzzleSubmissionResultMessage: + p = self.get_today_puzzle() + d = self.get_today_info() + now = datetime.datetime.now() + if p is None or d is None: + return PuzzleSubmissionResultMessage( + success=False, + message="今天没有题哦,改天再来吧!", + ) + if user in d.success_users: + return PuzzleSubmissionResultMessage( + success=False, + message="你今天已经答对过啦!不用重复提交哦!", + ) + if flag != p.flag: + d.tried_users.add(user) + self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission( + success=False, + flag=flag, + time=now, + )) + return PuzzleSubmissionResultMessage( + success=False, + message="❌ 答错了,请检查你的答案哦", + ) + d.tried_users.add(user) + d.success_users[user] = now + self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission( + success=True, + flag=flag, + time=now, + )) + return PuzzleSubmissionResultMessage( + success=True, + rank=len(d.success_users), + ) + + def admin_create_puzzle(self, user: str): + p = Puzzle( + raw_id=nanoid.generate( + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz", + 12, + ), + index_id="", + title="示例标题", + content="题目的内容填写内容", + img_name="", + author_id=user, + flag="konaph{this_is_a_flag}", + ready=False, + published=False, + ) + self.unready_puzzles.add(p.raw_id) + self.puzzle_data[p.raw_id] = p + return p + + def get_puzzles_of_user(self, user: str): + return sorted([ + p for p in self.puzzle_data.values() + if p.author_id == user + ], key=lambda p: p.created_at, reverse=True) + + def get_report_yesterday(self): + yesterday = get_today_date() - datetime.timedelta(days=1) + index_id = self.daily_puzzle_of_date.get(yesterday) + if index_id is None: + return None + info = self.daily_puzzle[index_id] + puzzle = self.puzzle_data[info.raw_id] + message = UniMessage.text(f"[KonaPH#{index_id}] 「{puzzle.title}」解答报告") + + if len(info.success_users) == 0: + message = message.text( + "\n\n昨日,竟无人解出此题!" + ) + else: + message = message.text( + f"\n\n昨日,共有 {len(info.success_users)} 人解出此题。\n\n前五名的解答者:" + ) + us = [(u, d) for u, d in info.success_users.items()] + us = sorted(us, key=lambda t: t[1]) + us = us[:5] + for u, _ in us: + m = self.submissions[puzzle.raw_id][u][-1] + message = message.text("- ").at(u).text(f" 于 {m.time.strftime('%H:%M')}") + + message = message.text("\n\n出题者:").at(puzzle.author_id) + return message + + +lock = asyncio.Lock() + + +def read_data(): + try: + data_raw = KONAPH_DATA_JSON.read_text() + return PuzzleManager.model_validate_json(data_raw) + except (FileNotFoundError, ValidationError): + return PuzzleManager() + + +def write_data(data: PuzzleManager): + KONAPH_DATA_JSON.write_text(data.model_dump_json()) + + +@asynccontextmanager +async def puzzle_manager(): + async with lock: + data = read_data() + yield data + write_data(data) diff --git a/konabot/plugins/kona_ph/manager.py b/konabot/plugins/kona_ph/manager.py new file mode 100644 index 0000000..786d4dc --- /dev/null +++ b/konabot/plugins/kona_ph/manager.py @@ -0,0 +1,327 @@ +import datetime +from math import ceil +from typing import Any +from nonebot import get_plugin_config +from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, Subcommand, UniMessage, on_alconna +from pydantic import BaseModel + +from konabot.common.longtask import DepLongTaskTarget +from konabot.common.nb.extract_image import download_image_bytes +from konabot.plugins.kona_ph.core.storage import Puzzle, get_today_date, puzzle_manager + +PUZZLE_PAGE_SIZE = 10 + + +class PuzzleConfig(BaseModel): + plugin_puzzle_manager: list[str] = [] + plugin_puzzle_admin: list[str] = [] + plugin_puzzle_playgroup: list[str] = [] + + +config = get_plugin_config(PuzzleConfig) + + +def is_puzzle_manager(target: DepLongTaskTarget): + return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target) + + +def is_puzzle_admin(target: DepLongTaskTarget): + return target.target_id in config.plugin_puzzle_admin + + +def get_puzzle_info_message(puzzle: Puzzle) -> UniMessage[Any]: + status = "✅ 已准备,待发布" if puzzle.ready and not puzzle.published else \ + (f"🟢 已发布: #{puzzle.index_id}" if puzzle.published else "⚙️ 未准备") + + status_suffix = "" + if puzzle.pinned: + status_suffix += " | 📌 已被管理员置顶" + + msg = UniMessage.text( + f"--- 谜题信息 ---\n" + f"Raw ID: {puzzle.raw_id}\n" + f"标题: {puzzle.title}\n" + f"出题者 ID: {puzzle.author_id}\n" + f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n" + f"Flag: {puzzle.flag}\n" + f"状态: {status}{status_suffix}\n\n" + f"{puzzle.content}" + ) + + if puzzle.img_name: + msg = msg.image(raw=puzzle.get_image_path().read_bytes()) + + msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑") + + return msg + + +def create_admin_commands(): + cmd_admin = on_alconna( + Alconna( + "konaph", + Subcommand("create", dest="create"), + Subcommand("ready", Args["raw_id", str], dest="ready"), + Subcommand("unready", Args["raw_id", str], dest="unready"), + Subcommand("info", Args["raw_id", str], dest="info"), + Subcommand("my", Args["page?", int], dest="my"), + Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"), + Subcommand("pin", Args["raw_id?", str], dest="pin"), + Subcommand("unpin", dest="unpin"), + Subcommand( + "modify", + Args["raw_id?", str], + Option("--title", Args["title", str], alias=["-t"]), + Option("--description", Args["description", str], alias=["-d"]), + Option("--image", Args["image?", Image], alias=["-i"]), + Option("--flag", Args["flag", str], alias=["-f"]), + Option("--remove-image"), + dest="modify", + ), + Subcommand("publish", Args["raw_id?", str], dest="publish"), + ), + rule=is_puzzle_manager, + ) + + @cmd_admin.assign("$main") + async def _(target: DepLongTaskTarget): + msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n") + msg = msg.text("konaph create - 创建一个新的谜题\n") + msg = msg.text("konaph ready - 准备好一道谜题\n") + msg = msg.text("konaph unready - 取消准备一道谜题\n") + msg = msg.text("konaph info - 查看谜题\n") + msg = msg.text("konaph my - 查看我的谜题列表\n") + msg = msg.text("konaph modify - 查看如何修改谜题信息\n") + + if is_puzzle_admin(target): + msg = msg.text("konaph all [--ready] - 查看所有谜题\n") + msg = msg.text("konaph pin - 查看当前置顶谜题\n") + msg = msg.text("konaph pin - 置顶一个谜题\n") + msg = msg.text("konaph unpin - 取消置顶所有谜题\n") + msg = msg.text("konaph publish - 强制发题") + + await target.send_message(msg) + + @cmd_admin.assign("create") + async def _(target: DepLongTaskTarget): + async with puzzle_manager() as manager: + puzzle = manager.admin_create_puzzle(target.target_id) + await target.send_message(UniMessage.text( + f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n" + f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n" + f"- 输入 `konaph my` 查看你创建的谜题\n" + f"- 输入 `konaph modify` 查看更改谜题的方法" + )) + + @cmd_admin.assign("ready") + async def _(raw_id: str, target: DepLongTaskTarget): + async with puzzle_manager() as manager: + if raw_id not in manager.puzzle_data: + return await target.send_message(UniMessage.text( + "你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题" + )) + p = manager.puzzle_data[raw_id] + if p.author_id != target.target_id and not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题" + )) + if p.ready: + return await target.send_message(UniMessage.text( + "题目早就准备好啦!" + )) + manager.admin_mark_ready(raw_id, True) + await target.send_message(UniMessage.text( + f"谜题「{p.title}」已经准备就绪!" + )) + + @cmd_admin.assign("unready") + async def _(raw_id: str, target: DepLongTaskTarget): + async with puzzle_manager() as manager: + if raw_id not in manager.puzzle_data: + return await target.send_message(UniMessage.text( + "你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题" + )) + p = manager.puzzle_data[raw_id] + if p.author_id != target.target_id and not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题" + )) + if not p.ready: + return await target.send_message(UniMessage.text( + f"谜题「{p.title}」已经是未取消状态了!" + )) + if p.published: + return await target.send_message(UniMessage.text( + "已发布的谜题不能取消准备状态!" + )) + + manager.admin_mark_ready(raw_id, False) + await target.send_message(UniMessage.text( + f"谜题「{p.title}」已经取消准备!" + )) + + @cmd_admin.assign("info") + async def _(raw_id: str, target: DepLongTaskTarget): + async with puzzle_manager() as manager: + if raw_id not in manager.puzzle_data: + return await target.send_message(UniMessage.text( + "你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题" + )) + p = manager.puzzle_data[raw_id] + if p.author_id != target.target_id and not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "这不是你的题,你没有权限查看详细信息!" + )) + + await target.send_message(get_puzzle_info_message(p)) + + @cmd_admin.assign("my") + async def _(target: DepLongTaskTarget, page: int = 1): + async with puzzle_manager() as manager: + puzzles = manager.get_puzzles_of_user(target.target_id) + if len(puzzles) == 0: + return await target.send_message(UniMessage.text( + "你没有谜题哦,使用 `konaph create` 创建一个吧!" + )) + count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) + if page <= 0 or page > count_pages: + return await target.send_message(UniMessage.text( + f"页数只有 1 ~ {count_pages} 啦!" + )) + puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] + message = UniMessage.text("==== 我的谜题 ====\n\n") + for p in puzzles: + message = message.text("- ") + if p.pinned: + message = message.text("[📌]") + if p.published: + message = message.text(f"[#{p.index_id}] ") + elif p.ready: + message = message.text("[✅] ") + else: + message = message.text("[⚙️] ") + message = message.text(f"{p.title} ({p.raw_id})") + message = message.text("\n") + message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====") + await target.send_message(message) + + @cmd_admin.assign("all") + async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("ready"), page: int = 1): + if not is_puzzle_admin(target): + return await target.send_message(UniMessage.text("你没有权限查看所有的哦")) + async with puzzle_manager() as manager: + puzzles = [*manager.puzzle_data.values()] + if ready.available: + puzzles = [p for p in puzzles if p.ready] + puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True) + count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) + if page <= 0 or page > count_pages: + return await target.send_message(UniMessage.text( + f"页数只有 1 ~ {count_pages} 啦!" + )) + puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] + message = UniMessage.text("==== 所有谜题 ====\n\n") + for p in puzzles: + message = message.text("- ") + if p.pinned: + message = message.text("[📌]") + if p.published: + message = message.text(f"[#{p.index_id}] ") + elif p.ready: + message = message.text("[✅] ") + else: + message = message.text("[⚙️] ") + message = message.text(f"{p.title} ({p.raw_id} by {p.author_id})") + message = message.text("\n") + message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====") + await target.send_message(message) + + @cmd_admin.assign("pin") + async def _(target: DepLongTaskTarget, raw_id: str = ""): + if not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "你没有权限使用该指令" + )) + + async with puzzle_manager() as manager: + if raw_id == "": + if manager.puzzle_pinned: + return await target.send_message(UniMessage.text( + f"被 Pin 的谜题 ID = {manager.puzzle_pinned}" + )) + return await target.send_message("没有置顶谜题") + if raw_id not in manager.unpublished_puzzles: + return await target.send_message(UniMessage.text( + "这个谜题已经发布了,或者还没准备好,或者不存在" + )) + manager.admin_pin_puzzle(raw_id) + return await target.send_message(f"已置顶谜题 {raw_id}") + + @cmd_admin.assign("unpin") + async def _(target: DepLongTaskTarget): + if not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "你没有权限使用该指令" + )) + async with puzzle_manager() as manager: + manager.admin_pin_puzzle("") + return await target.send_message("已取消所有置顶") + + @cmd_admin.assign("modify") + async def _( + target: DepLongTaskTarget, + raw_id: str = "", + title: str | None = None, + description: str | None = None, + flag: str | None = None, + image: Image | None = None, + remove_image: Query[bool] = Query("--remove-image"), + ): + if raw_id == "": + return await target.send_message( + "konaph modify - 修改一个谜题\n\n" + "支持的参数:\n" + " --title 标题\n" + " --description 题目详情描述(用直引号包裹以支持多行)\n" + " --flag flag\n" + " --image <图片> 图片\n" + " --remove-image 删除图片" + ) + + async with puzzle_manager() as manager: + if raw_id not in manager.puzzle_data: + return await target.send_message("没有这个谜题") + p = manager.puzzle_data[raw_id] + if not is_puzzle_admin(target) and target.target_id != p.author_id: + return await target.send_message("你没有权限编辑这个谜题") + if title is not None: + p.title = title + if description is not None: + p.content = description + if flag is not None: + p.flag = flag + if image is not None and image.url is not None: + b = await download_image_bytes(image.url) + p.add_image(b.unwrap()) + elif remove_image.available: + p.remove_image() + + info2 = get_puzzle_info_message(p) + + return await target.send_message("修改好啦!看看效果:\n\n" + info2) + + @cmd_admin.assign("publish") + async def _(target: DepLongTaskTarget, raw_id: str | None = None): + today = get_today_date() + async with puzzle_manager() as manager: + if today in manager.daily_puzzle_of_date: + return await target.send_message("今日已经有题了哦") + manager.last_checked_date = today - datetime.timedelta(days=-1) + if raw_id is not None: + manager.admin_pin_puzzle(raw_id) + p = manager.get_today_puzzle(strong=True) + if p is None: + return await target.send_message("上架失败了orz,可能是没题了") + return await target.send_message("Ok!") + + return cmd_admin