From a03cef412413f7b999c6a85e0001b170e315479d Mon Sep 17 00:00:00 2001 From: passthem Date: Sun, 26 Oct 2025 03:23:51 +0800 Subject: [PATCH 1/7] =?UTF-8?q?=E8=A7=A3=E5=AF=86=E5=8E=A8=E6=9D=A5?= =?UTF-8?q?=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- konabot/common/longtask.py | 5 +- konabot/common/nb/extract_image.py | 2 - konabot/plugins/kona_ph/__init__.py | 86 ++++++ konabot/plugins/kona_ph/core/__init__.py | 0 konabot/plugins/kona_ph/core/storage.py | 314 ++++++++++++++++++++++ konabot/plugins/kona_ph/manager.py | 327 +++++++++++++++++++++++ 6 files changed, 731 insertions(+), 3 deletions(-) create mode 100644 konabot/plugins/kona_ph/__init__.py create mode 100644 konabot/plugins/kona_ph/core/__init__.py create mode 100644 konabot/plugins/kona_ph/core/storage.py create mode 100644 konabot/plugins/kona_ph/manager.py diff --git a/konabot/common/longtask.py b/konabot/common/longtask.py index b988e36..5f37391 100644 --- a/konabot/common/longtask.py +++ b/konabot/common/longtask.py @@ -51,13 +51,16 @@ class LongTaskTarget(BaseModel): target_id: str "沟通对象的 ID" - async def send_message(self, msg: UniMessage, at: bool = True) -> bool: + async def send_message(self, msg: UniMessage | str, at: bool = True) -> bool: try: bot = nonebot.get_bot(self.self_id) except KeyError: logger.warning(f"试图访问了不存在的 Bot。ID={self.self_id}") return False + if isinstance(msg, str): + msg = UniMessage.text(msg) + if self.platform == "qq": if not isinstance(bot, OBBot): logger.warning( diff --git a/konabot/common/nb/extract_image.py b/konabot/common/nb/extract_image.py index eecbf87..d0bed4d 100644 --- a/konabot/common/nb/extract_image.py +++ b/konabot/common/nb/extract_image.py @@ -16,8 +16,6 @@ from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage from PIL import UnidentifiedImageError from returns.result import Failure, Result, Success -from konabot.common.nb.exc import BotExceptionMessage - async def download_image_bytes(url: str) -> Result[bytes, str]: # if "/matcha/cache/" in url: diff --git a/konabot/plugins/kona_ph/__init__.py b/konabot/plugins/kona_ph/__init__.py new file mode 100644 index 0000000..51fdcef --- /dev/null +++ b/konabot/plugins/kona_ph/__init__.py @@ -0,0 +1,86 @@ +from math import ceil +from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna +from konabot.plugins.kona_ph.core.storage import get_today_date +from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager +from konabot.common.longtask import DepLongTaskTarget + + +create_admin_commands() + + +async def is_play_group(target: DepLongTaskTarget): + if target.channel_id in config.plugin_puzzle_playgroup: + return True + if target.target_id in target.channel_id: + return True + return False + + +cmd_submit = on_alconna(Alconna( + "提交答案", + Args["flag", str], +), rule=is_play_group) + +@cmd_submit.handle() +async def _(flag: str, target: DepLongTaskTarget): + async with puzzle_manager() as manager: + result = manager.submit(target.target_id, flag) + await target.send_message(result.get_unimessage()) + + +cmd_query = on_alconna(Alconna( + r"re:(?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?" +), rule=is_play_group) + +@cmd_query.handle() +async def _(target: DepLongTaskTarget): + async with puzzle_manager() as manager: + p = manager.get_today_puzzle() + if p is None: + return await target.send_message("今天无题,改日再来吧!") + await target.send_message(p.get_unimessage()) + + +cmd_history = on_alconna(Alconna( + "历史题目", + Args["page?", int], + Args["index_id?", str], +), rule=is_play_group) + +@cmd_history.handle() +async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1): + async with puzzle_manager() as manager: + today = get_today_date() + if index_id: + index_id = index_id.removeprefix("#") + if index_id == manager.daily_puzzle_of_date.get(today, ""): + puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id] + return await target.send_message(puzzle.get_unimessage()) + if index_id in manager.daily_puzzle: + puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id] + msg = puzzle.get_unimessage() + msg = msg.text(f"\n\n------\n\n题解:{puzzle.flag}") + return await target.send_message(msg) + return await target.send_message("没有这道题哦") + msg = UniMessage.text("====== 历史题目清单 ======\n\n") + puzzles = [ + (manager.puzzle_data[manager.daily_puzzle[i].raw_id], d) + for d, i in manager.daily_puzzle_of_date.items() + ] + puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True) + count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) + if page <= 0 or page > count_pages: + return await target.send_message(UniMessage.text( + f"页数只有 1 ~ {count_pages} 啦!" + )) + puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] + for p, d in puzzles: + info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]] + msg = msg.text( + f"- [#{p.index_id}: {len(info.success_users)}/{len(info.tried_users)}]" + f" {p.title} ({d})" + ) + msg = msg.text("\n") + msg = msg.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====") + await target.send_message(msg) + diff --git a/konabot/plugins/kona_ph/core/__init__.py b/konabot/plugins/kona_ph/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/konabot/plugins/kona_ph/core/storage.py b/konabot/plugins/kona_ph/core/storage.py new file mode 100644 index 0000000..d5e90c9 --- /dev/null +++ b/konabot/plugins/kona_ph/core/storage.py @@ -0,0 +1,314 @@ +import asyncio +import datetime +import random + +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Any + +import nanoid + +from nonebot_plugin_alconna import UniMessage +from pydantic import BaseModel, Field, ValidationError + +from konabot.common.path import DATA_PATH + + +KONAPH_BASE = DATA_PATH / "KonaPH" +KONAPH_DATA_JSON = KONAPH_BASE / "data.json" +KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs" + +# 保证所有文件夹存在 +KONAPH_BASE.mkdir(exist_ok=True) +KONAPH_IMAGE_BASE.mkdir(exist_ok=True) + + +class Puzzle(BaseModel): + raw_id: str + "用于给出题者管理的 ID" + + index_id: str + "展出的 ID,以展出顺序为准" + + title: str + content: str + img_name: str + author_id: str + flag: str + + ready: bool = False + published: bool = False + pinned: bool = False + + created_at: datetime.datetime = Field(default_factory=datetime.datetime.now) + + def get_image_path(self) -> Path: + return KONAPH_IMAGE_BASE / self.img_name + + def get_unimessage(self) -> UniMessage[Any]: + result = UniMessage.text(f"[KonaPH#{self.index_id}] {self.title}") + result = result.text(f"\n\n{self.content}") + + if self.img_name: + result = result.text("\n\n").image(raw=self.get_image_path().read_bytes()) + + result = result.text("\n\n出题者:").at(self.author_id) + + return result + + def add_image(self, img: bytes, suffix: str = ".png"): + if self.img_name: + self.get_image_path().unlink(True) + img_id = nanoid.generate( + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz", + 21, + ) + self.img_name = f"{img_id}{suffix}" + self.get_image_path().write_bytes(img) + + def remove_image(self): + if self.img_name: + self.get_image_path().unlink(True) + self.img_name = "" + + +class PuzzleSubmission(BaseModel): + success: bool + flag: str + time: datetime.datetime + + +class DailyPuzzleInfo(BaseModel): + raw_id: str + time: datetime.date + tried_users: set[str] = set() + success_users: dict[str, datetime.datetime] = {} + + +class PuzzleSubmissionResultMessage(BaseModel): + success: bool + rank: int = -1 + message: str = "" + + def get_unimessage(self) -> UniMessage[Any]: + if self.success: + return UniMessage.text(f"🎉 恭喜你答对了!你是今天第 {self.rank} 个解出来的!") + return UniMessage.text(self.message) + + +def get_today_date() -> datetime.date: + now = datetime.datetime.now() + if now.hour < 8: + now -= datetime.timedelta(days=1) + return now.date() + + +class PuzzleManager(BaseModel): + puzzle_data: dict[str, Puzzle] = {} + + daily_puzzle: dict[str, DailyPuzzleInfo] = {} + daily_puzzle_of_date: dict[datetime.date, str] = {} + + puzzle_pinned: str = "" + unpublished_puzzles: set[str] = set() + unready_puzzles: set[str] = set() + published_puzzles: set[str] = set() + + index_id_counter: int = 1 + submissions: dict[str, dict[str, list[PuzzleSubmission]]] = {} + last_pubish_date: datetime.date = Field( + default_factory=lambda: get_today_date() - datetime.timedelta(days=1) + ) + last_checked_date: datetime.date = Field( + default_factory=lambda: get_today_date() - datetime.timedelta(days=1) + ) + + def publish_puzzle(self, raw_id: str): + assert raw_id in self.puzzle_data + + self.unpublished_puzzles -= set(raw_id) + self.unready_puzzles -= set(raw_id) + p = self.puzzle_data[raw_id] + p.index_id = str(self.index_id_counter) + p.ready = True + p.published = True + p.pinned = False + self.puzzle_pinned = "" + self.last_pubish_date = get_today_date() + self.last_checked_date = self.last_pubish_date + self.daily_puzzle[p.index_id] = DailyPuzzleInfo( + raw_id=raw_id, + time=self.last_pubish_date, + ) + self.daily_puzzle_of_date[self.last_pubish_date] = p.index_id + self.published_puzzles.add(raw_id) + + self.index_id_counter += 1 + + def admin_mark_ready(self, raw_id: str, ready: bool = True): + if raw_id not in self.puzzle_data: + return + if ready: + self.unready_puzzles -= set(raw_id) + if raw_id not in self.published_puzzles: + self.unpublished_puzzles.add(raw_id) + p = self.puzzle_data[raw_id] + p.ready = True + p.published = raw_id in self.published_puzzles + else: + self.unready_puzzles.add(raw_id) + self.unpublished_puzzles -= set(raw_id) + p = self.puzzle_data[raw_id] + p.ready = False + p.published = False + # if p.raw_id == self.puzzle_pinned: + # self.puzzle_pinned = "" + + def admin_pin_puzzle(self, raw_id: str): + if self.puzzle_pinned: + p = self.puzzle_data.get(self.puzzle_pinned) + if p is not None: + p.pinned = False + if raw_id in self.puzzle_data: + p = self.puzzle_data[raw_id] + p.pinned = True + self.puzzle_pinned = raw_id + else: + self.puzzle_pinned = "" + + def get_today_puzzle(self, strong: bool = False) -> Puzzle | None: + today = get_today_date() + if today in self.daily_puzzle_of_date: + index_id = self.daily_puzzle_of_date[today] + info = self.daily_puzzle[index_id] + return self.puzzle_data[info.raw_id] + if today == self.last_checked_date and not strong: + return + self.last_checked_date = today + if self.puzzle_pinned and self.puzzle_pinned in self.puzzle_data: + d = self.puzzle_pinned + self.publish_puzzle(d) + self.puzzle_pinned = "" + return self.puzzle_data[d] + elif len(self.unpublished_puzzles) > 0: + d = random.choice(list(self.unpublished_puzzles)) + self.publish_puzzle(d) + return self.puzzle_data[d] + + def get_today_info(self) -> DailyPuzzleInfo | None: + p = self.get_today_puzzle() + if p is None: + return + return self.daily_puzzle[p.index_id] + + def submit(self, user: str, flag: str) -> PuzzleSubmissionResultMessage: + p = self.get_today_puzzle() + d = self.get_today_info() + now = datetime.datetime.now() + if p is None or d is None: + return PuzzleSubmissionResultMessage( + success=False, + message="今天没有题哦,改天再来吧!", + ) + if user in d.success_users: + return PuzzleSubmissionResultMessage( + success=False, + message="你今天已经答对过啦!不用重复提交哦!", + ) + if flag != p.flag: + d.tried_users.add(user) + self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission( + success=False, + flag=flag, + time=now, + )) + return PuzzleSubmissionResultMessage( + success=False, + message="❌ 答错了,请检查你的答案哦", + ) + d.tried_users.add(user) + d.success_users[user] = now + self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission( + success=True, + flag=flag, + time=now, + )) + return PuzzleSubmissionResultMessage( + success=True, + rank=len(d.success_users), + ) + + def admin_create_puzzle(self, user: str): + p = Puzzle( + raw_id=nanoid.generate( + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz", + 12, + ), + index_id="", + title="示例标题", + content="题目的内容填写内容", + img_name="", + author_id=user, + flag="konaph{this_is_a_flag}", + ready=False, + published=False, + ) + self.unready_puzzles.add(p.raw_id) + self.puzzle_data[p.raw_id] = p + return p + + def get_puzzles_of_user(self, user: str): + return sorted([ + p for p in self.puzzle_data.values() + if p.author_id == user + ], key=lambda p: p.created_at, reverse=True) + + def get_report_yesterday(self): + yesterday = get_today_date() - datetime.timedelta(days=1) + index_id = self.daily_puzzle_of_date.get(yesterday) + if index_id is None: + return None + info = self.daily_puzzle[index_id] + puzzle = self.puzzle_data[info.raw_id] + message = UniMessage.text(f"[KonaPH#{index_id}] 「{puzzle.title}」解答报告") + + if len(info.success_users) == 0: + message = message.text( + "\n\n昨日,竟无人解出此题!" + ) + else: + message = message.text( + f"\n\n昨日,共有 {len(info.success_users)} 人解出此题。\n\n前五名的解答者:" + ) + us = [(u, d) for u, d in info.success_users.items()] + us = sorted(us, key=lambda t: t[1]) + us = us[:5] + for u, _ in us: + m = self.submissions[puzzle.raw_id][u][-1] + message = message.text("- ").at(u).text(f" 于 {m.time.strftime('%H:%M')}") + + message = message.text("\n\n出题者:").at(puzzle.author_id) + return message + + +lock = asyncio.Lock() + + +def read_data(): + try: + data_raw = KONAPH_DATA_JSON.read_text() + return PuzzleManager.model_validate_json(data_raw) + except (FileNotFoundError, ValidationError): + return PuzzleManager() + + +def write_data(data: PuzzleManager): + KONAPH_DATA_JSON.write_text(data.model_dump_json()) + + +@asynccontextmanager +async def puzzle_manager(): + async with lock: + data = read_data() + yield data + write_data(data) diff --git a/konabot/plugins/kona_ph/manager.py b/konabot/plugins/kona_ph/manager.py new file mode 100644 index 0000000..786d4dc --- /dev/null +++ b/konabot/plugins/kona_ph/manager.py @@ -0,0 +1,327 @@ +import datetime +from math import ceil +from typing import Any +from nonebot import get_plugin_config +from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, Subcommand, UniMessage, on_alconna +from pydantic import BaseModel + +from konabot.common.longtask import DepLongTaskTarget +from konabot.common.nb.extract_image import download_image_bytes +from konabot.plugins.kona_ph.core.storage import Puzzle, get_today_date, puzzle_manager + +PUZZLE_PAGE_SIZE = 10 + + +class PuzzleConfig(BaseModel): + plugin_puzzle_manager: list[str] = [] + plugin_puzzle_admin: list[str] = [] + plugin_puzzle_playgroup: list[str] = [] + + +config = get_plugin_config(PuzzleConfig) + + +def is_puzzle_manager(target: DepLongTaskTarget): + return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target) + + +def is_puzzle_admin(target: DepLongTaskTarget): + return target.target_id in config.plugin_puzzle_admin + + +def get_puzzle_info_message(puzzle: Puzzle) -> UniMessage[Any]: + status = "✅ 已准备,待发布" if puzzle.ready and not puzzle.published else \ + (f"🟢 已发布: #{puzzle.index_id}" if puzzle.published else "⚙️ 未准备") + + status_suffix = "" + if puzzle.pinned: + status_suffix += " | 📌 已被管理员置顶" + + msg = UniMessage.text( + f"--- 谜题信息 ---\n" + f"Raw ID: {puzzle.raw_id}\n" + f"标题: {puzzle.title}\n" + f"出题者 ID: {puzzle.author_id}\n" + f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n" + f"Flag: {puzzle.flag}\n" + f"状态: {status}{status_suffix}\n\n" + f"{puzzle.content}" + ) + + if puzzle.img_name: + msg = msg.image(raw=puzzle.get_image_path().read_bytes()) + + msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑") + + return msg + + +def create_admin_commands(): + cmd_admin = on_alconna( + Alconna( + "konaph", + Subcommand("create", dest="create"), + Subcommand("ready", Args["raw_id", str], dest="ready"), + Subcommand("unready", Args["raw_id", str], dest="unready"), + Subcommand("info", Args["raw_id", str], dest="info"), + Subcommand("my", Args["page?", int], dest="my"), + Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"), + Subcommand("pin", Args["raw_id?", str], dest="pin"), + Subcommand("unpin", dest="unpin"), + Subcommand( + "modify", + Args["raw_id?", str], + Option("--title", Args["title", str], alias=["-t"]), + Option("--description", Args["description", str], alias=["-d"]), + Option("--image", Args["image?", Image], alias=["-i"]), + Option("--flag", Args["flag", str], alias=["-f"]), + Option("--remove-image"), + dest="modify", + ), + Subcommand("publish", Args["raw_id?", str], dest="publish"), + ), + rule=is_puzzle_manager, + ) + + @cmd_admin.assign("$main") + async def _(target: DepLongTaskTarget): + msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n") + msg = msg.text("konaph create - 创建一个新的谜题\n") + msg = msg.text("konaph ready - 准备好一道谜题\n") + msg = msg.text("konaph unready - 取消准备一道谜题\n") + msg = msg.text("konaph info - 查看谜题\n") + msg = msg.text("konaph my - 查看我的谜题列表\n") + msg = msg.text("konaph modify - 查看如何修改谜题信息\n") + + if is_puzzle_admin(target): + msg = msg.text("konaph all [--ready] - 查看所有谜题\n") + msg = msg.text("konaph pin - 查看当前置顶谜题\n") + msg = msg.text("konaph pin - 置顶一个谜题\n") + msg = msg.text("konaph unpin - 取消置顶所有谜题\n") + msg = msg.text("konaph publish - 强制发题") + + await target.send_message(msg) + + @cmd_admin.assign("create") + async def _(target: DepLongTaskTarget): + async with puzzle_manager() as manager: + puzzle = manager.admin_create_puzzle(target.target_id) + await target.send_message(UniMessage.text( + f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n" + f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n" + f"- 输入 `konaph my` 查看你创建的谜题\n" + f"- 输入 `konaph modify` 查看更改谜题的方法" + )) + + @cmd_admin.assign("ready") + async def _(raw_id: str, target: DepLongTaskTarget): + async with puzzle_manager() as manager: + if raw_id not in manager.puzzle_data: + return await target.send_message(UniMessage.text( + "你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题" + )) + p = manager.puzzle_data[raw_id] + if p.author_id != target.target_id and not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题" + )) + if p.ready: + return await target.send_message(UniMessage.text( + "题目早就准备好啦!" + )) + manager.admin_mark_ready(raw_id, True) + await target.send_message(UniMessage.text( + f"谜题「{p.title}」已经准备就绪!" + )) + + @cmd_admin.assign("unready") + async def _(raw_id: str, target: DepLongTaskTarget): + async with puzzle_manager() as manager: + if raw_id not in manager.puzzle_data: + return await target.send_message(UniMessage.text( + "你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题" + )) + p = manager.puzzle_data[raw_id] + if p.author_id != target.target_id and not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题" + )) + if not p.ready: + return await target.send_message(UniMessage.text( + f"谜题「{p.title}」已经是未取消状态了!" + )) + if p.published: + return await target.send_message(UniMessage.text( + "已发布的谜题不能取消准备状态!" + )) + + manager.admin_mark_ready(raw_id, False) + await target.send_message(UniMessage.text( + f"谜题「{p.title}」已经取消准备!" + )) + + @cmd_admin.assign("info") + async def _(raw_id: str, target: DepLongTaskTarget): + async with puzzle_manager() as manager: + if raw_id not in manager.puzzle_data: + return await target.send_message(UniMessage.text( + "你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题" + )) + p = manager.puzzle_data[raw_id] + if p.author_id != target.target_id and not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "这不是你的题,你没有权限查看详细信息!" + )) + + await target.send_message(get_puzzle_info_message(p)) + + @cmd_admin.assign("my") + async def _(target: DepLongTaskTarget, page: int = 1): + async with puzzle_manager() as manager: + puzzles = manager.get_puzzles_of_user(target.target_id) + if len(puzzles) == 0: + return await target.send_message(UniMessage.text( + "你没有谜题哦,使用 `konaph create` 创建一个吧!" + )) + count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) + if page <= 0 or page > count_pages: + return await target.send_message(UniMessage.text( + f"页数只有 1 ~ {count_pages} 啦!" + )) + puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] + message = UniMessage.text("==== 我的谜题 ====\n\n") + for p in puzzles: + message = message.text("- ") + if p.pinned: + message = message.text("[📌]") + if p.published: + message = message.text(f"[#{p.index_id}] ") + elif p.ready: + message = message.text("[✅] ") + else: + message = message.text("[⚙️] ") + message = message.text(f"{p.title} ({p.raw_id})") + message = message.text("\n") + message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====") + await target.send_message(message) + + @cmd_admin.assign("all") + async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("ready"), page: int = 1): + if not is_puzzle_admin(target): + return await target.send_message(UniMessage.text("你没有权限查看所有的哦")) + async with puzzle_manager() as manager: + puzzles = [*manager.puzzle_data.values()] + if ready.available: + puzzles = [p for p in puzzles if p.ready] + puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True) + count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) + if page <= 0 or page > count_pages: + return await target.send_message(UniMessage.text( + f"页数只有 1 ~ {count_pages} 啦!" + )) + puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] + message = UniMessage.text("==== 所有谜题 ====\n\n") + for p in puzzles: + message = message.text("- ") + if p.pinned: + message = message.text("[📌]") + if p.published: + message = message.text(f"[#{p.index_id}] ") + elif p.ready: + message = message.text("[✅] ") + else: + message = message.text("[⚙️] ") + message = message.text(f"{p.title} ({p.raw_id} by {p.author_id})") + message = message.text("\n") + message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====") + await target.send_message(message) + + @cmd_admin.assign("pin") + async def _(target: DepLongTaskTarget, raw_id: str = ""): + if not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "你没有权限使用该指令" + )) + + async with puzzle_manager() as manager: + if raw_id == "": + if manager.puzzle_pinned: + return await target.send_message(UniMessage.text( + f"被 Pin 的谜题 ID = {manager.puzzle_pinned}" + )) + return await target.send_message("没有置顶谜题") + if raw_id not in manager.unpublished_puzzles: + return await target.send_message(UniMessage.text( + "这个谜题已经发布了,或者还没准备好,或者不存在" + )) + manager.admin_pin_puzzle(raw_id) + return await target.send_message(f"已置顶谜题 {raw_id}") + + @cmd_admin.assign("unpin") + async def _(target: DepLongTaskTarget): + if not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "你没有权限使用该指令" + )) + async with puzzle_manager() as manager: + manager.admin_pin_puzzle("") + return await target.send_message("已取消所有置顶") + + @cmd_admin.assign("modify") + async def _( + target: DepLongTaskTarget, + raw_id: str = "", + title: str | None = None, + description: str | None = None, + flag: str | None = None, + image: Image | None = None, + remove_image: Query[bool] = Query("--remove-image"), + ): + if raw_id == "": + return await target.send_message( + "konaph modify - 修改一个谜题\n\n" + "支持的参数:\n" + " --title 标题\n" + " --description 题目详情描述(用直引号包裹以支持多行)\n" + " --flag flag\n" + " --image <图片> 图片\n" + " --remove-image 删除图片" + ) + + async with puzzle_manager() as manager: + if raw_id not in manager.puzzle_data: + return await target.send_message("没有这个谜题") + p = manager.puzzle_data[raw_id] + if not is_puzzle_admin(target) and target.target_id != p.author_id: + return await target.send_message("你没有权限编辑这个谜题") + if title is not None: + p.title = title + if description is not None: + p.content = description + if flag is not None: + p.flag = flag + if image is not None and image.url is not None: + b = await download_image_bytes(image.url) + p.add_image(b.unwrap()) + elif remove_image.available: + p.remove_image() + + info2 = get_puzzle_info_message(p) + + return await target.send_message("修改好啦!看看效果:\n\n" + info2) + + @cmd_admin.assign("publish") + async def _(target: DepLongTaskTarget, raw_id: str | None = None): + today = get_today_date() + async with puzzle_manager() as manager: + if today in manager.daily_puzzle_of_date: + return await target.send_message("今日已经有题了哦") + manager.last_checked_date = today - datetime.timedelta(days=-1) + if raw_id is not None: + manager.admin_pin_puzzle(raw_id) + p = manager.get_today_puzzle(strong=True) + if p is None: + return await target.send_message("上架失败了orz,可能是没题了") + return await target.send_message("Ok!") + + return cmd_admin From 534a2c9e753c17cc99478047cf4c89078d0fe2c0 Mon Sep 17 00:00:00 2001 From: passthem Date: Sun, 26 Oct 2025 03:42:28 +0800 Subject: [PATCH 2/7] =?UTF-8?q?=E8=A7=A3=E5=AF=86=E5=8E=A8=E6=9D=A5?= =?UTF-8?q?=E4=BA=862?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- konabot/common/nb/qq_broadcast.py | 30 +++++++++++++++++++++++++++++ konabot/plugins/kona_ph/__init__.py | 15 +++++++++++++++ konabot/plugins/kona_ph/manager.py | 2 ++ 3 files changed, 47 insertions(+) create mode 100644 konabot/common/nb/qq_broadcast.py diff --git a/konabot/common/nb/qq_broadcast.py b/konabot/common/nb/qq_broadcast.py new file mode 100644 index 0000000..8adab2f --- /dev/null +++ b/konabot/common/nb/qq_broadcast.py @@ -0,0 +1,30 @@ +from typing import Any, cast +import nonebot +from nonebot_plugin_alconna import UniMessage +from nonebot.adapters.onebot.v11 import Bot as OBBot + + +async def qq_broadcast(groups: list[str], msg: UniMessage[Any]): + bots: dict[str, OBBot] = {} + + # group_id -> bot_id + availabilities: dict[str, str] = {} + + for bot_id, bot in nonebot.get_bots().items(): + if not isinstance(bot, OBBot): + continue + bots[bot_id] = bot + gl = await bot.get_group_list() + for g in gl: + gid = str(g.get("group_id", -1)) + if gid in groups: + availabilities[gid] = bot_id + + for group in groups: + if group in availabilities: + bot = bots[availabilities[group]] + await bot.send_group_msg( + group_id=int(group), + message=cast(Any, await msg.export(bot)), + auto_escape=False, + ) diff --git a/konabot/plugins/kona_ph/__init__.py b/konabot/plugins/kona_ph/__init__.py index 51fdcef..949f9f3 100644 --- a/konabot/plugins/kona_ph/__init__.py +++ b/konabot/plugins/kona_ph/__init__.py @@ -1,9 +1,13 @@ from math import ceil +from loguru import logger from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna +from konabot.common.nb.qq_broadcast import qq_broadcast from konabot.plugins.kona_ph.core.storage import get_today_date from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager from konabot.common.longtask import DepLongTaskTarget +from nonebot_plugin_apscheduler import scheduler + create_admin_commands() @@ -84,3 +88,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1): msg = msg.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====") await target.send_message(msg) + +@scheduler.scheduled_job("cron", hour="8") +async def _(): + async with puzzle_manager() as manager: + puzzle = manager.get_today_puzzle() + if puzzle is not None: + logger.info(f"找到了题目 {puzzle.raw_id},发送") + await qq_broadcast(config.plugin_puzzle_playgroup, puzzle.get_unimessage()) + else: + logger.info("自动任务:没有找到题目,跳过") + diff --git a/konabot/plugins/kona_ph/manager.py b/konabot/plugins/kona_ph/manager.py index 786d4dc..f4d4d54 100644 --- a/konabot/plugins/kona_ph/manager.py +++ b/konabot/plugins/kona_ph/manager.py @@ -7,6 +7,7 @@ from pydantic import BaseModel from konabot.common.longtask import DepLongTaskTarget from konabot.common.nb.extract_image import download_image_bytes +from konabot.common.nb.qq_broadcast import qq_broadcast from konabot.plugins.kona_ph.core.storage import Puzzle, get_today_date, puzzle_manager PUZZLE_PAGE_SIZE = 10 @@ -322,6 +323,7 @@ def create_admin_commands(): p = manager.get_today_puzzle(strong=True) if p is None: return await target.send_message("上架失败了orz,可能是没题了") + await qq_broadcast(config.plugin_puzzle_playgroup, p.get_unimessage()) return await target.send_message("Ok!") return cmd_admin From 0a822bf440a4c606ec8cf953c31bfbe3ef245d66 Mon Sep 17 00:00:00 2001 From: passthem Date: Sun, 26 Oct 2025 03:55:31 +0800 Subject: [PATCH 3/7] =?UTF-8?q?=E4=BC=98=E5=8C=96=20konaph=20UX=20?= =?UTF-8?q?=E5=B9=B6=E6=B7=BB=E5=8A=A0=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- konabot/docs/sys/konaph.txt | 4 ++++ konabot/plugins/kona_ph/__init__.py | 2 +- konabot/plugins/kona_ph/core/storage.py | 1 + 3 files changed, 6 insertions(+), 1 deletion(-) create mode 100644 konabot/docs/sys/konaph.txt diff --git a/konabot/docs/sys/konaph.txt b/konabot/docs/sys/konaph.txt new file mode 100644 index 0000000..f6a7a89 --- /dev/null +++ b/konabot/docs/sys/konaph.txt @@ -0,0 +1,4 @@ +指令介绍 + konaph - KonaBot 的 PuzzleHunt 管理工具 + +详细介绍请直接输入 konaph 获取使用指引(该指令权限仅对部分人开放。如果你有权限的话才有响应。建议在此方 BOT 私聊使用该指令。) diff --git a/konabot/plugins/kona_ph/__init__.py b/konabot/plugins/kona_ph/__init__.py index 949f9f3..7e10507 100644 --- a/konabot/plugins/kona_ph/__init__.py +++ b/konabot/plugins/kona_ph/__init__.py @@ -21,7 +21,7 @@ async def is_play_group(target: DepLongTaskTarget): cmd_submit = on_alconna(Alconna( - "提交答案", + "re:提交(?:答案|题解|[fF]lag)", Args["flag", str], ), rule=is_play_group) diff --git a/konabot/plugins/kona_ph/core/storage.py b/konabot/plugins/kona_ph/core/storage.py index d5e90c9..e4d23a3 100644 --- a/konabot/plugins/kona_ph/core/storage.py +++ b/konabot/plugins/kona_ph/core/storage.py @@ -53,6 +53,7 @@ class Puzzle(BaseModel): result = result.text("\n\n").image(raw=self.get_image_path().read_bytes()) result = result.text("\n\n出题者:").at(self.author_id) + result = result.text("\n\n输入「提交答案 答案」来提交你的解答") return result From f9deabfce0608f300a4842b3afea71981904154e Mon Sep 17 00:00:00 2001 From: passthem Date: Sun, 26 Oct 2025 04:03:51 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20Query=20=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- konabot/plugins/kona_ph/manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/konabot/plugins/kona_ph/manager.py b/konabot/plugins/kona_ph/manager.py index f4d4d54..9de32be 100644 --- a/konabot/plugins/kona_ph/manager.py +++ b/konabot/plugins/kona_ph/manager.py @@ -207,7 +207,7 @@ def create_admin_commands(): await target.send_message(message) @cmd_admin.assign("all") - async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("ready"), page: int = 1): + async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1): if not is_puzzle_admin(target): return await target.send_message(UniMessage.text("你没有权限查看所有的哦")) async with puzzle_manager() as manager: @@ -276,7 +276,7 @@ def create_admin_commands(): description: str | None = None, flag: str | None = None, image: Image | None = None, - remove_image: Query[bool] = Query("--remove-image"), + remove_image: Query[bool] = Query("modify.remove-image"), ): if raw_id == "": return await target.send_message( From 312e203bbe00e878a774019a6c0c0ab3db87512a Mon Sep 17 00:00:00 2001 From: passthem Date: Sun, 26 Oct 2025 04:07:26 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E5=BF=98=E8=AE=B0=E6=8A=8A=E8=BF=99?= =?UTF-8?q?=E4=B8=AA=E7=AD=94=E9=A2=98=E6=83=85=E5=86=B5=E9=80=9A=E7=9F=A5?= =?UTF-8?q?=E5=8A=A0=E4=B8=8A=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- konabot/plugins/kona_ph/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/konabot/plugins/kona_ph/__init__.py b/konabot/plugins/kona_ph/__init__.py index 7e10507..f0e9f78 100644 --- a/konabot/plugins/kona_ph/__init__.py +++ b/konabot/plugins/kona_ph/__init__.py @@ -92,6 +92,10 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1): @scheduler.scheduled_job("cron", hour="8") async def _(): async with puzzle_manager() as manager: + msg2 = manager.get_report_yesterday() + if msg2 is not None: + await qq_broadcast(config.plugin_puzzle_playgroup, msg2) + puzzle = manager.get_today_puzzle() if puzzle is not None: logger.info(f"找到了题目 {puzzle.raw_id},发送") @@ -99,3 +103,4 @@ async def _(): else: logger.info("自动任务:没有找到题目,跳过") + From 3e395f8a35b62c789275a27b5c74ddfde769a476 Mon Sep 17 00:00:00 2001 From: passthem Date: Sun, 26 Oct 2025 11:56:03 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E6=9B=B4=E5=B0=91=E7=9A=84=E9=87=8F?= =?UTF-8?q?=EF=BC=8C=E6=9B=B4=E8=A7=A3=E8=80=A6=E7=9A=84=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=EF=BC=8C=E6=9B=B4=E5=81=A5=E5=A3=AE=E7=9A=84=E7=B3=BB=E7=BB=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- konabot/common/longtask.py | 4 ++ konabot/plugins/kona_ph/__init__.py | 6 +- konabot/plugins/kona_ph/core/storage.py | 96 +++++++++++++------------ konabot/plugins/kona_ph/manager.py | 28 ++++---- 4 files changed, 73 insertions(+), 61 deletions(-) diff --git a/konabot/common/longtask.py b/konabot/common/longtask.py index 5f37391..72e42b7 100644 --- a/konabot/common/longtask.py +++ b/konabot/common/longtask.py @@ -51,6 +51,10 @@ class LongTaskTarget(BaseModel): target_id: str "沟通对象的 ID" + @property + def is_private_chat(self): + return self.channel_id.startswith(QQ_PRIVATE_CHAT_CHANNEL_PREFIX) + async def send_message(self, msg: UniMessage | str, at: bool = True) -> bool: try: bot = nonebot.get_bot(self.self_id) diff --git a/konabot/plugins/kona_ph/__init__.py b/konabot/plugins/kona_ph/__init__.py index f0e9f78..f3e4a9f 100644 --- a/konabot/plugins/kona_ph/__init__.py +++ b/konabot/plugins/kona_ph/__init__.py @@ -13,9 +13,9 @@ create_admin_commands() async def is_play_group(target: DepLongTaskTarget): - if target.channel_id in config.plugin_puzzle_playgroup: + if target.is_private_chat: return True - if target.target_id in target.channel_id: + if target.channel_id in config.plugin_puzzle_playgroup: return True return False @@ -33,7 +33,7 @@ async def _(flag: str, target: DepLongTaskTarget): cmd_query = on_alconna(Alconna( - r"re:(?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?" + r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?)|今日谜?题目?)" ), rule=is_play_group) @cmd_query.handle() diff --git a/konabot/plugins/kona_ph/core/storage.py b/konabot/plugins/kona_ph/core/storage.py index e4d23a3..467e546 100644 --- a/konabot/plugins/kona_ph/core/storage.py +++ b/konabot/plugins/kona_ph/core/storage.py @@ -37,9 +37,6 @@ class Puzzle(BaseModel): flag: str ready: bool = False - published: bool = False - pinned: bool = False - created_at: datetime.datetime = Field(default_factory=datetime.datetime.now) def get_image_path(self) -> Path: @@ -111,68 +108,81 @@ class PuzzleManager(BaseModel): daily_puzzle_of_date: dict[datetime.date, str] = {} puzzle_pinned: str = "" - unpublished_puzzles: set[str] = set() - unready_puzzles: set[str] = set() - published_puzzles: set[str] = set() index_id_counter: int = 1 submissions: dict[str, dict[str, list[PuzzleSubmission]]] = {} - last_pubish_date: datetime.date = Field( - default_factory=lambda: get_today_date() - datetime.timedelta(days=1) - ) last_checked_date: datetime.date = Field( default_factory=lambda: get_today_date() - datetime.timedelta(days=1) ) + @property + def last_publish_date(self): + return max(self.daily_puzzle_of_date.keys()) + + @property + def unpublished_puzzles(self): + return set(( + p.raw_id for p in self.puzzle_data.values() + if not self.is_puzzle_published(p.raw_id) and p.ready + )) + + @property + def unready_puzzles(self): + return set(( + p.raw_id for p in self.puzzle_data.values() + if not self.is_puzzle_published(p.raw_id) and not p.ready + )) + + @property + def published_puzzles(self): + return set(( + p.raw_id for p in self.puzzle_data.values() + if self.is_puzzle_published(p.raw_id) + )) + + def is_puzzle_published(self, raw_id: str): + return raw_id in [i.raw_id for i in self.daily_puzzle.values()] + def publish_puzzle(self, raw_id: str): assert raw_id in self.puzzle_data - self.unpublished_puzzles -= set(raw_id) - self.unready_puzzles -= set(raw_id) + today = get_today_date() + p = self.puzzle_data[raw_id] p.index_id = str(self.index_id_counter) p.ready = True - p.published = True - p.pinned = False self.puzzle_pinned = "" - self.last_pubish_date = get_today_date() - self.last_checked_date = self.last_pubish_date + self.last_checked_date = today self.daily_puzzle[p.index_id] = DailyPuzzleInfo( raw_id=raw_id, - time=self.last_pubish_date, + time=today, ) - self.daily_puzzle_of_date[self.last_pubish_date] = p.index_id - self.published_puzzles.add(raw_id) + self.daily_puzzle_of_date[today] = p.index_id self.index_id_counter += 1 - def admin_mark_ready(self, raw_id: str, ready: bool = True): - if raw_id not in self.puzzle_data: - return - if ready: - self.unready_puzzles -= set(raw_id) - if raw_id not in self.published_puzzles: - self.unpublished_puzzles.add(raw_id) - p = self.puzzle_data[raw_id] - p.ready = True - p.published = raw_id in self.published_puzzles - else: - self.unready_puzzles.add(raw_id) - self.unpublished_puzzles -= set(raw_id) - p = self.puzzle_data[raw_id] - p.ready = False - p.published = False - # if p.raw_id == self.puzzle_pinned: - # self.puzzle_pinned = "" + def fix(self): + # 尝试修复今日的数据 + for p in self.puzzle_data.values(): + if self.is_puzzle_published(p.raw_id): + p.ready = True + + if self.puzzle_pinned not in self.unpublished_puzzles: + self.puzzle_pinned = "" + + # 撤回重复发布的题 + already_published: set[str] = set() + for date in list(self.daily_puzzle_of_date.keys()): + index_id = self.daily_puzzle_of_date[date] + info = self.daily_puzzle[index_id] + if info.raw_id in already_published: + del self.daily_puzzle[index_id] + del self.daily_puzzle_of_date[date] + else: + already_published.add(info.raw_id) def admin_pin_puzzle(self, raw_id: str): - if self.puzzle_pinned: - p = self.puzzle_data.get(self.puzzle_pinned) - if p is not None: - p.pinned = False if raw_id in self.puzzle_data: - p = self.puzzle_data[raw_id] - p.pinned = True self.puzzle_pinned = raw_id else: self.puzzle_pinned = "" @@ -252,9 +262,7 @@ class PuzzleManager(BaseModel): author_id=user, flag="konaph{this_is_a_flag}", ready=False, - published=False, ) - self.unready_puzzles.add(p.raw_id) self.puzzle_data[p.raw_id] = p return p diff --git a/konabot/plugins/kona_ph/manager.py b/konabot/plugins/kona_ph/manager.py index 9de32be..a6ba642 100644 --- a/konabot/plugins/kona_ph/manager.py +++ b/konabot/plugins/kona_ph/manager.py @@ -8,7 +8,7 @@ from pydantic import BaseModel from konabot.common.longtask import DepLongTaskTarget from konabot.common.nb.extract_image import download_image_bytes from konabot.common.nb.qq_broadcast import qq_broadcast -from konabot.plugins.kona_ph.core.storage import Puzzle, get_today_date, puzzle_manager +from konabot.plugins.kona_ph.core.storage import Puzzle, PuzzleManager, get_today_date, puzzle_manager PUZZLE_PAGE_SIZE = 10 @@ -30,12 +30,12 @@ def is_puzzle_admin(target: DepLongTaskTarget): return target.target_id in config.plugin_puzzle_admin -def get_puzzle_info_message(puzzle: Puzzle) -> UniMessage[Any]: - status = "✅ 已准备,待发布" if puzzle.ready and not puzzle.published else \ - (f"🟢 已发布: #{puzzle.index_id}" if puzzle.published else "⚙️ 未准备") +def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]: + status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \ + (f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备") status_suffix = "" - if puzzle.pinned: + if puzzle.raw_id == manager.puzzle_pinned: status_suffix += " | 📌 已被管理员置顶" msg = UniMessage.text( @@ -130,7 +130,7 @@ def create_admin_commands(): return await target.send_message(UniMessage.text( "题目早就准备好啦!" )) - manager.admin_mark_ready(raw_id, True) + p.ready = True await target.send_message(UniMessage.text( f"谜题「{p.title}」已经准备就绪!" )) @@ -151,12 +151,12 @@ def create_admin_commands(): return await target.send_message(UniMessage.text( f"谜题「{p.title}」已经是未取消状态了!" )) - if p.published: + if manager.is_puzzle_published(p.raw_id): return await target.send_message(UniMessage.text( "已发布的谜题不能取消准备状态!" )) - manager.admin_mark_ready(raw_id, False) + p.ready = False await target.send_message(UniMessage.text( f"谜题「{p.title}」已经取消准备!" )) @@ -174,7 +174,7 @@ def create_admin_commands(): "这不是你的题,你没有权限查看详细信息!" )) - await target.send_message(get_puzzle_info_message(p)) + await target.send_message(get_puzzle_info_message(manager, p)) @cmd_admin.assign("my") async def _(target: DepLongTaskTarget, page: int = 1): @@ -193,9 +193,9 @@ def create_admin_commands(): message = UniMessage.text("==== 我的谜题 ====\n\n") for p in puzzles: message = message.text("- ") - if p.pinned: + if manager.puzzle_pinned == p.raw_id: message = message.text("[📌]") - if p.published: + if manager.is_puzzle_published(p.raw_id): message = message.text(f"[#{p.index_id}] ") elif p.ready: message = message.text("[✅] ") @@ -224,9 +224,9 @@ def create_admin_commands(): message = UniMessage.text("==== 所有谜题 ====\n\n") for p in puzzles: message = message.text("- ") - if p.pinned: + if p.raw_id == manager.puzzle_pinned: message = message.text("[📌]") - if p.published: + if manager.is_puzzle_published(p.raw_id): message = message.text(f"[#{p.index_id}] ") elif p.ready: message = message.text("[✅] ") @@ -307,7 +307,7 @@ def create_admin_commands(): elif remove_image.available: p.remove_image() - info2 = get_puzzle_info_message(p) + info2 = get_puzzle_info_message(manager, p) return await target.send_message("修改好啦!看看效果:\n\n" + info2) From 0acffea86d87f30a069da61281ba77894f853154 Mon Sep 17 00:00:00 2001 From: passthem Date: Sun, 26 Oct 2025 12:45:29 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=8E=92=E8=A1=8C?= =?UTF-8?q?=E6=A6=9C=EF=BC=8C=E4=BC=98=E5=8C=96=20UX?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- konabot/common/username.py | 54 +++++++++++++++++++ konabot/plugins/kona_ph/__init__.py | 49 +++++++++++++++++ konabot/plugins/kona_ph/core/storage.py | 7 +-- .../plugins/{auto_accept.py => routine.py} | 15 ++++++ 4 files changed, 122 insertions(+), 3 deletions(-) create mode 100644 konabot/common/username.py rename konabot/plugins/{auto_accept.py => routine.py} (66%) diff --git a/konabot/common/username.py b/konabot/common/username.py new file mode 100644 index 0000000..c5209e4 --- /dev/null +++ b/konabot/common/username.py @@ -0,0 +1,54 @@ +import re +import nonebot + +from nonebot.adapters.onebot.v11 import Bot as OBBot + + +class UsernameManager: + grouped_data: dict[int, dict[int, str]] + individual_data: dict[int, str] + + def __init__(self) -> None: + self.grouped_data = {} + self.individual_data = {} + + async def update(self): + for bot in nonebot.get_bots().values(): + if isinstance(bot, OBBot): + for user in await bot.get_friend_list(): + uid = user["user_id"] + nickname = user["nickname"] + self.individual_data[uid] = nickname + for group in await bot.get_group_list(): + gid = group["group_id"] + for member in await bot.get_group_member_list(group_id=gid): + uid = member["user_id"] + card = member.get("card", "") + nickname = member.get("nickname", "") + if card: + self.grouped_data.setdefault(gid, {})[uid] = card + if nickname: + self.individual_data[uid] = nickname + + def get(self, qqid: int, groupid: int | None = None) -> str: + if groupid is not None and groupid in self.grouped_data: + n = self.grouped_data[groupid].get(qqid) + if n is not None: + return n + if qqid in self.individual_data: + return self.individual_data[qqid] + return str(qqid) + + +manager = UsernameManager() + +def get_username(qqid: int | str, group: int | str | None = None): + if isinstance(group, str): + group = None if not re.match(r"^\d+$", group) else int(group) + if isinstance(qqid, str): + if re.match(r"^\d+$", qqid): + qqid = int(qqid) + else: + return qqid + return manager.get(qqid, group) + diff --git a/konabot/plugins/kona_ph/__init__.py b/konabot/plugins/kona_ph/__init__.py index f3e4a9f..8b53ea6 100644 --- a/konabot/plugins/kona_ph/__init__.py +++ b/konabot/plugins/kona_ph/__init__.py @@ -1,7 +1,10 @@ +from functools import reduce from math import ceil +import re from loguru import logger from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna from konabot.common.nb.qq_broadcast import qq_broadcast +from konabot.common.username import get_username from konabot.plugins.kona_ph.core.storage import get_today_date from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager from konabot.common.longtask import DepLongTaskTarget @@ -45,6 +48,52 @@ async def _(target: DepLongTaskTarget): await target.send_message(p.get_unimessage()) +cmd_query_submission = on_alconna(Alconna( + "今日答题情况" +), rule=is_play_group) + +@cmd_query_submission.handle() +async def _(target: DepLongTaskTarget): + async with puzzle_manager() as manager: + p = manager.get_today_puzzle() + if p is None: + return await target.send_message("今天无题") + msg = UniMessage.text("==== 今日答题情况 ====\n\n") + + subcount = len(reduce( + lambda x, y: x + y, + manager.submissions.get(p.raw_id, {}).values(), + [], + )) + info = manager.daily_puzzle[p.index_id] + + msg = msg.text( + f"总体情况:答对 {len(info.success_users)} / " + f"参与 {len(info.tried_users)} / " + f"提交 {subcount}\n" + ) + + success_users = sorted(list(info.success_users.items()), key=lambda v: v[1]) + gid = None + if re.match(r"^\d+$", target.channel_id): + gid = int(target.channel_id) + for u, d in success_users: + uname = u + if re.match(r"^\d+$", u): + uname = get_username(int(u), gid) + t = d.strftime("%H:%M") + tries = len(manager.submissions[p.raw_id][u]) + msg = msg.text(f"\n- {uname} [Solved at {t} in {tries} times]") + for u in info.tried_users - set(info.success_users.keys()): + uname = u + if re.match(r"^\d+$", u): + uname = get_username(int(u), gid) + tries = len(manager.submissions[p.raw_id][u]) + msg = msg.text(f"\n- {uname} [Unsolved in {tries} times]") + + await target.send_message(msg) + + cmd_history = on_alconna(Alconna( "历史题目", Args["page?", int], diff --git a/konabot/plugins/kona_ph/core/storage.py b/konabot/plugins/kona_ph/core/storage.py index 467e546..dd5a1fe 100644 --- a/konabot/plugins/kona_ph/core/storage.py +++ b/konabot/plugins/kona_ph/core/storage.py @@ -12,6 +12,7 @@ from nonebot_plugin_alconna import UniMessage from pydantic import BaseModel, Field, ValidationError from konabot.common.path import DATA_PATH +from konabot.common.username import get_username KONAPH_BASE = DATA_PATH / "KonaPH" @@ -49,7 +50,7 @@ class Puzzle(BaseModel): if self.img_name: result = result.text("\n\n").image(raw=self.get_image_path().read_bytes()) - result = result.text("\n\n出题者:").at(self.author_id) + result = result.text(f"\n\n出题者:{get_username(self.author_id)}") result = result.text("\n\n输入「提交答案 答案」来提交你的解答") return result @@ -294,9 +295,9 @@ class PuzzleManager(BaseModel): us = us[:5] for u, _ in us: m = self.submissions[puzzle.raw_id][u][-1] - message = message.text("- ").at(u).text(f" 于 {m.time.strftime('%H:%M')}") + message = message.text(f"- {get_username(u)} 于 {m.time.strftime('%H:%M')}") - message = message.text("\n\n出题者:").at(puzzle.author_id) + message = message.text(f"\n\n出题者:{get_username(puzzle.author_id)}") return message diff --git a/konabot/plugins/auto_accept.py b/konabot/plugins/routine.py similarity index 66% rename from konabot/plugins/auto_accept.py rename to konabot/plugins/routine.py index 3283f03..5305435 100644 --- a/konabot/plugins/auto_accept.py +++ b/konabot/plugins/routine.py @@ -4,10 +4,13 @@ from typing import cast from loguru import logger from nonebot import get_bot, on_request +import nonebot from nonebot.adapters.onebot.v11.event import FriendRequestEvent from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot +from nonebot_plugin_apscheduler import scheduler from konabot.common.nb.is_admin import cfg as adminConfig +from konabot.common.username import manager add_request = on_request() @@ -23,3 +26,15 @@ async def _(req: FriendRequestEvent): await req.approve(bot) logger.info(f"已经自动同意 {req.user_id} 的好友请求") +@scheduler.scheduled_job("cron", minute="*/5") +async def _(): + logger.info("尝试更新群成员信息") + await manager.update() + +driver = nonebot.get_driver() + +@driver.on_bot_connect +async def _(): + logger.info("有 Bot 连接,5 秒后试着更新群成员信息") + await asyncio.sleep(5) + await manager.update()