diff --git a/konabot/common/longtask.py b/konabot/common/longtask.py index b988e36..72e42b7 100644 --- a/konabot/common/longtask.py +++ b/konabot/common/longtask.py @@ -51,13 +51,20 @@ class LongTaskTarget(BaseModel): target_id: str "沟通对象的 ID" - async def send_message(self, msg: UniMessage, at: bool = True) -> bool: + @property + def is_private_chat(self): + return self.channel_id.startswith(QQ_PRIVATE_CHAT_CHANNEL_PREFIX) + + async def send_message(self, msg: UniMessage | str, at: bool = True) -> bool: try: bot = nonebot.get_bot(self.self_id) except KeyError: logger.warning(f"试图访问了不存在的 Bot。ID={self.self_id}") return False + if isinstance(msg, str): + msg = UniMessage.text(msg) + if self.platform == "qq": if not isinstance(bot, OBBot): logger.warning( diff --git a/konabot/common/nb/extract_image.py b/konabot/common/nb/extract_image.py index eecbf87..d0bed4d 100644 --- a/konabot/common/nb/extract_image.py +++ b/konabot/common/nb/extract_image.py @@ -16,8 +16,6 @@ from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage from PIL import UnidentifiedImageError from returns.result import Failure, Result, Success -from konabot.common.nb.exc import BotExceptionMessage - async def download_image_bytes(url: str) -> Result[bytes, str]: # if "/matcha/cache/" in url: diff --git a/konabot/common/nb/qq_broadcast.py b/konabot/common/nb/qq_broadcast.py new file mode 100644 index 0000000..8adab2f --- /dev/null +++ b/konabot/common/nb/qq_broadcast.py @@ -0,0 +1,30 @@ +from typing import Any, cast +import nonebot +from nonebot_plugin_alconna import UniMessage +from nonebot.adapters.onebot.v11 import Bot as OBBot + + +async def qq_broadcast(groups: list[str], msg: UniMessage[Any]): + bots: dict[str, OBBot] = {} + + # group_id -> bot_id + availabilities: dict[str, str] = {} + + for bot_id, bot in nonebot.get_bots().items(): + if not isinstance(bot, OBBot): + continue + bots[bot_id] = bot + gl = await bot.get_group_list() + for g in gl: + gid = str(g.get("group_id", -1)) + if gid in groups: + availabilities[gid] = bot_id + + for group in groups: + if group in availabilities: + bot = bots[availabilities[group]] + await bot.send_group_msg( + group_id=int(group), + message=cast(Any, await msg.export(bot)), + auto_escape=False, + ) diff --git a/konabot/common/username.py b/konabot/common/username.py new file mode 100644 index 0000000..c5209e4 --- /dev/null +++ b/konabot/common/username.py @@ -0,0 +1,54 @@ +import re +import nonebot + +from nonebot.adapters.onebot.v11 import Bot as OBBot + + +class UsernameManager: + grouped_data: dict[int, dict[int, str]] + individual_data: dict[int, str] + + def __init__(self) -> None: + self.grouped_data = {} + self.individual_data = {} + + async def update(self): + for bot in nonebot.get_bots().values(): + if isinstance(bot, OBBot): + for user in await bot.get_friend_list(): + uid = user["user_id"] + nickname = user["nickname"] + self.individual_data[uid] = nickname + for group in await bot.get_group_list(): + gid = group["group_id"] + for member in await bot.get_group_member_list(group_id=gid): + uid = member["user_id"] + card = member.get("card", "") + nickname = member.get("nickname", "") + if card: + self.grouped_data.setdefault(gid, {})[uid] = card + if nickname: + self.individual_data[uid] = nickname + + def get(self, qqid: int, groupid: int | None = None) -> str: + if groupid is not None and groupid in self.grouped_data: + n = self.grouped_data[groupid].get(qqid) + if n is not None: + return n + if qqid in self.individual_data: + return self.individual_data[qqid] + return str(qqid) + + +manager = UsernameManager() + +def get_username(qqid: int | str, group: int | str | None = None): + if isinstance(group, str): + group = None if not re.match(r"^\d+$", group) else int(group) + if isinstance(qqid, str): + if re.match(r"^\d+$", qqid): + qqid = int(qqid) + else: + return qqid + return manager.get(qqid, group) + diff --git a/konabot/docs/sys/konaph.txt b/konabot/docs/sys/konaph.txt new file mode 100644 index 0000000..f6a7a89 --- /dev/null +++ b/konabot/docs/sys/konaph.txt @@ -0,0 +1,4 @@ +指令介绍 + konaph - KonaBot 的 PuzzleHunt 管理工具 + +详细介绍请直接输入 konaph 获取使用指引(该指令权限仅对部分人开放。如果你有权限的话才有响应。建议在此方 BOT 私聊使用该指令。) diff --git a/konabot/plugins/kona_ph/__init__.py b/konabot/plugins/kona_ph/__init__.py new file mode 100644 index 0000000..8b53ea6 --- /dev/null +++ b/konabot/plugins/kona_ph/__init__.py @@ -0,0 +1,155 @@ +from functools import reduce +from math import ceil +import re +from loguru import logger +from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna +from konabot.common.nb.qq_broadcast import qq_broadcast +from konabot.common.username import get_username +from konabot.plugins.kona_ph.core.storage import get_today_date +from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager +from konabot.common.longtask import DepLongTaskTarget + +from nonebot_plugin_apscheduler import scheduler + + +create_admin_commands() + + +async def is_play_group(target: DepLongTaskTarget): + if target.is_private_chat: + return True + if target.channel_id in config.plugin_puzzle_playgroup: + return True + return False + + +cmd_submit = on_alconna(Alconna( + "re:提交(?:答案|题解|[fF]lag)", + Args["flag", str], +), rule=is_play_group) + +@cmd_submit.handle() +async def _(flag: str, target: DepLongTaskTarget): + async with puzzle_manager() as manager: + result = manager.submit(target.target_id, flag) + await target.send_message(result.get_unimessage()) + + +cmd_query = on_alconna(Alconna( + r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?)|今日谜?题目?)" +), rule=is_play_group) + +@cmd_query.handle() +async def _(target: DepLongTaskTarget): + async with puzzle_manager() as manager: + p = manager.get_today_puzzle() + if p is None: + return await target.send_message("今天无题,改日再来吧!") + await target.send_message(p.get_unimessage()) + + +cmd_query_submission = on_alconna(Alconna( + "今日答题情况" +), rule=is_play_group) + +@cmd_query_submission.handle() +async def _(target: DepLongTaskTarget): + async with puzzle_manager() as manager: + p = manager.get_today_puzzle() + if p is None: + return await target.send_message("今天无题") + msg = UniMessage.text("==== 今日答题情况 ====\n\n") + + subcount = len(reduce( + lambda x, y: x + y, + manager.submissions.get(p.raw_id, {}).values(), + [], + )) + info = manager.daily_puzzle[p.index_id] + + msg = msg.text( + f"总体情况:答对 {len(info.success_users)} / " + f"参与 {len(info.tried_users)} / " + f"提交 {subcount}\n" + ) + + success_users = sorted(list(info.success_users.items()), key=lambda v: v[1]) + gid = None + if re.match(r"^\d+$", target.channel_id): + gid = int(target.channel_id) + for u, d in success_users: + uname = u + if re.match(r"^\d+$", u): + uname = get_username(int(u), gid) + t = d.strftime("%H:%M") + tries = len(manager.submissions[p.raw_id][u]) + msg = msg.text(f"\n- {uname} [Solved at {t} in {tries} times]") + for u in info.tried_users - set(info.success_users.keys()): + uname = u + if re.match(r"^\d+$", u): + uname = get_username(int(u), gid) + tries = len(manager.submissions[p.raw_id][u]) + msg = msg.text(f"\n- {uname} [Unsolved in {tries} times]") + + await target.send_message(msg) + + +cmd_history = on_alconna(Alconna( + "历史题目", + Args["page?", int], + Args["index_id?", str], +), rule=is_play_group) + +@cmd_history.handle() +async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1): + async with puzzle_manager() as manager: + today = get_today_date() + if index_id: + index_id = index_id.removeprefix("#") + if index_id == manager.daily_puzzle_of_date.get(today, ""): + puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id] + return await target.send_message(puzzle.get_unimessage()) + if index_id in manager.daily_puzzle: + puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id] + msg = puzzle.get_unimessage() + msg = msg.text(f"\n\n------\n\n题解:{puzzle.flag}") + return await target.send_message(msg) + return await target.send_message("没有这道题哦") + msg = UniMessage.text("====== 历史题目清单 ======\n\n") + puzzles = [ + (manager.puzzle_data[manager.daily_puzzle[i].raw_id], d) + for d, i in manager.daily_puzzle_of_date.items() + ] + puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True) + count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) + if page <= 0 or page > count_pages: + return await target.send_message(UniMessage.text( + f"页数只有 1 ~ {count_pages} 啦!" + )) + puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] + for p, d in puzzles: + info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]] + msg = msg.text( + f"- [#{p.index_id}: {len(info.success_users)}/{len(info.tried_users)}]" + f" {p.title} ({d})" + ) + msg = msg.text("\n") + msg = msg.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====") + await target.send_message(msg) + + +@scheduler.scheduled_job("cron", hour="8") +async def _(): + async with puzzle_manager() as manager: + msg2 = manager.get_report_yesterday() + if msg2 is not None: + await qq_broadcast(config.plugin_puzzle_playgroup, msg2) + + puzzle = manager.get_today_puzzle() + if puzzle is not None: + logger.info(f"找到了题目 {puzzle.raw_id},发送") + await qq_broadcast(config.plugin_puzzle_playgroup, puzzle.get_unimessage()) + else: + logger.info("自动任务:没有找到题目,跳过") + + diff --git a/konabot/plugins/kona_ph/core/__init__.py b/konabot/plugins/kona_ph/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/konabot/plugins/kona_ph/core/storage.py b/konabot/plugins/kona_ph/core/storage.py new file mode 100644 index 0000000..dd5a1fe --- /dev/null +++ b/konabot/plugins/kona_ph/core/storage.py @@ -0,0 +1,324 @@ +import asyncio +import datetime +import random + +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Any + +import nanoid + +from nonebot_plugin_alconna import UniMessage +from pydantic import BaseModel, Field, ValidationError + +from konabot.common.path import DATA_PATH +from konabot.common.username import get_username + + +KONAPH_BASE = DATA_PATH / "KonaPH" +KONAPH_DATA_JSON = KONAPH_BASE / "data.json" +KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs" + +# 保证所有文件夹存在 +KONAPH_BASE.mkdir(exist_ok=True) +KONAPH_IMAGE_BASE.mkdir(exist_ok=True) + + +class Puzzle(BaseModel): + raw_id: str + "用于给出题者管理的 ID" + + index_id: str + "展出的 ID,以展出顺序为准" + + title: str + content: str + img_name: str + author_id: str + flag: str + + ready: bool = False + created_at: datetime.datetime = Field(default_factory=datetime.datetime.now) + + def get_image_path(self) -> Path: + return KONAPH_IMAGE_BASE / self.img_name + + def get_unimessage(self) -> UniMessage[Any]: + result = UniMessage.text(f"[KonaPH#{self.index_id}] {self.title}") + result = result.text(f"\n\n{self.content}") + + if self.img_name: + result = result.text("\n\n").image(raw=self.get_image_path().read_bytes()) + + result = result.text(f"\n\n出题者:{get_username(self.author_id)}") + result = result.text("\n\n输入「提交答案 答案」来提交你的解答") + + return result + + def add_image(self, img: bytes, suffix: str = ".png"): + if self.img_name: + self.get_image_path().unlink(True) + img_id = nanoid.generate( + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz", + 21, + ) + self.img_name = f"{img_id}{suffix}" + self.get_image_path().write_bytes(img) + + def remove_image(self): + if self.img_name: + self.get_image_path().unlink(True) + self.img_name = "" + + +class PuzzleSubmission(BaseModel): + success: bool + flag: str + time: datetime.datetime + + +class DailyPuzzleInfo(BaseModel): + raw_id: str + time: datetime.date + tried_users: set[str] = set() + success_users: dict[str, datetime.datetime] = {} + + +class PuzzleSubmissionResultMessage(BaseModel): + success: bool + rank: int = -1 + message: str = "" + + def get_unimessage(self) -> UniMessage[Any]: + if self.success: + return UniMessage.text(f"🎉 恭喜你答对了!你是今天第 {self.rank} 个解出来的!") + return UniMessage.text(self.message) + + +def get_today_date() -> datetime.date: + now = datetime.datetime.now() + if now.hour < 8: + now -= datetime.timedelta(days=1) + return now.date() + + +class PuzzleManager(BaseModel): + puzzle_data: dict[str, Puzzle] = {} + + daily_puzzle: dict[str, DailyPuzzleInfo] = {} + daily_puzzle_of_date: dict[datetime.date, str] = {} + + puzzle_pinned: str = "" + + index_id_counter: int = 1 + submissions: dict[str, dict[str, list[PuzzleSubmission]]] = {} + last_checked_date: datetime.date = Field( + default_factory=lambda: get_today_date() - datetime.timedelta(days=1) + ) + + @property + def last_publish_date(self): + return max(self.daily_puzzle_of_date.keys()) + + @property + def unpublished_puzzles(self): + return set(( + p.raw_id for p in self.puzzle_data.values() + if not self.is_puzzle_published(p.raw_id) and p.ready + )) + + @property + def unready_puzzles(self): + return set(( + p.raw_id for p in self.puzzle_data.values() + if not self.is_puzzle_published(p.raw_id) and not p.ready + )) + + @property + def published_puzzles(self): + return set(( + p.raw_id for p in self.puzzle_data.values() + if self.is_puzzle_published(p.raw_id) + )) + + def is_puzzle_published(self, raw_id: str): + return raw_id in [i.raw_id for i in self.daily_puzzle.values()] + + def publish_puzzle(self, raw_id: str): + assert raw_id in self.puzzle_data + + today = get_today_date() + + p = self.puzzle_data[raw_id] + p.index_id = str(self.index_id_counter) + p.ready = True + self.puzzle_pinned = "" + self.last_checked_date = today + self.daily_puzzle[p.index_id] = DailyPuzzleInfo( + raw_id=raw_id, + time=today, + ) + self.daily_puzzle_of_date[today] = p.index_id + + self.index_id_counter += 1 + + def fix(self): + # 尝试修复今日的数据 + for p in self.puzzle_data.values(): + if self.is_puzzle_published(p.raw_id): + p.ready = True + + if self.puzzle_pinned not in self.unpublished_puzzles: + self.puzzle_pinned = "" + + # 撤回重复发布的题 + already_published: set[str] = set() + for date in list(self.daily_puzzle_of_date.keys()): + index_id = self.daily_puzzle_of_date[date] + info = self.daily_puzzle[index_id] + if info.raw_id in already_published: + del self.daily_puzzle[index_id] + del self.daily_puzzle_of_date[date] + else: + already_published.add(info.raw_id) + + def admin_pin_puzzle(self, raw_id: str): + if raw_id in self.puzzle_data: + self.puzzle_pinned = raw_id + else: + self.puzzle_pinned = "" + + def get_today_puzzle(self, strong: bool = False) -> Puzzle | None: + today = get_today_date() + if today in self.daily_puzzle_of_date: + index_id = self.daily_puzzle_of_date[today] + info = self.daily_puzzle[index_id] + return self.puzzle_data[info.raw_id] + if today == self.last_checked_date and not strong: + return + self.last_checked_date = today + if self.puzzle_pinned and self.puzzle_pinned in self.puzzle_data: + d = self.puzzle_pinned + self.publish_puzzle(d) + self.puzzle_pinned = "" + return self.puzzle_data[d] + elif len(self.unpublished_puzzles) > 0: + d = random.choice(list(self.unpublished_puzzles)) + self.publish_puzzle(d) + return self.puzzle_data[d] + + def get_today_info(self) -> DailyPuzzleInfo | None: + p = self.get_today_puzzle() + if p is None: + return + return self.daily_puzzle[p.index_id] + + def submit(self, user: str, flag: str) -> PuzzleSubmissionResultMessage: + p = self.get_today_puzzle() + d = self.get_today_info() + now = datetime.datetime.now() + if p is None or d is None: + return PuzzleSubmissionResultMessage( + success=False, + message="今天没有题哦,改天再来吧!", + ) + if user in d.success_users: + return PuzzleSubmissionResultMessage( + success=False, + message="你今天已经答对过啦!不用重复提交哦!", + ) + if flag != p.flag: + d.tried_users.add(user) + self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission( + success=False, + flag=flag, + time=now, + )) + return PuzzleSubmissionResultMessage( + success=False, + message="❌ 答错了,请检查你的答案哦", + ) + d.tried_users.add(user) + d.success_users[user] = now + self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission( + success=True, + flag=flag, + time=now, + )) + return PuzzleSubmissionResultMessage( + success=True, + rank=len(d.success_users), + ) + + def admin_create_puzzle(self, user: str): + p = Puzzle( + raw_id=nanoid.generate( + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz", + 12, + ), + index_id="", + title="示例标题", + content="题目的内容填写内容", + img_name="", + author_id=user, + flag="konaph{this_is_a_flag}", + ready=False, + ) + self.puzzle_data[p.raw_id] = p + return p + + def get_puzzles_of_user(self, user: str): + return sorted([ + p for p in self.puzzle_data.values() + if p.author_id == user + ], key=lambda p: p.created_at, reverse=True) + + def get_report_yesterday(self): + yesterday = get_today_date() - datetime.timedelta(days=1) + index_id = self.daily_puzzle_of_date.get(yesterday) + if index_id is None: + return None + info = self.daily_puzzle[index_id] + puzzle = self.puzzle_data[info.raw_id] + message = UniMessage.text(f"[KonaPH#{index_id}] 「{puzzle.title}」解答报告") + + if len(info.success_users) == 0: + message = message.text( + "\n\n昨日,竟无人解出此题!" + ) + else: + message = message.text( + f"\n\n昨日,共有 {len(info.success_users)} 人解出此题。\n\n前五名的解答者:" + ) + us = [(u, d) for u, d in info.success_users.items()] + us = sorted(us, key=lambda t: t[1]) + us = us[:5] + for u, _ in us: + m = self.submissions[puzzle.raw_id][u][-1] + message = message.text(f"- {get_username(u)} 于 {m.time.strftime('%H:%M')}") + + message = message.text(f"\n\n出题者:{get_username(puzzle.author_id)}") + return message + + +lock = asyncio.Lock() + + +def read_data(): + try: + data_raw = KONAPH_DATA_JSON.read_text() + return PuzzleManager.model_validate_json(data_raw) + except (FileNotFoundError, ValidationError): + return PuzzleManager() + + +def write_data(data: PuzzleManager): + KONAPH_DATA_JSON.write_text(data.model_dump_json()) + + +@asynccontextmanager +async def puzzle_manager(): + async with lock: + data = read_data() + yield data + write_data(data) diff --git a/konabot/plugins/kona_ph/manager.py b/konabot/plugins/kona_ph/manager.py new file mode 100644 index 0000000..a6ba642 --- /dev/null +++ b/konabot/plugins/kona_ph/manager.py @@ -0,0 +1,329 @@ +import datetime +from math import ceil +from typing import Any +from nonebot import get_plugin_config +from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, Subcommand, UniMessage, on_alconna +from pydantic import BaseModel + +from konabot.common.longtask import DepLongTaskTarget +from konabot.common.nb.extract_image import download_image_bytes +from konabot.common.nb.qq_broadcast import qq_broadcast +from konabot.plugins.kona_ph.core.storage import Puzzle, PuzzleManager, get_today_date, puzzle_manager + +PUZZLE_PAGE_SIZE = 10 + + +class PuzzleConfig(BaseModel): + plugin_puzzle_manager: list[str] = [] + plugin_puzzle_admin: list[str] = [] + plugin_puzzle_playgroup: list[str] = [] + + +config = get_plugin_config(PuzzleConfig) + + +def is_puzzle_manager(target: DepLongTaskTarget): + return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target) + + +def is_puzzle_admin(target: DepLongTaskTarget): + return target.target_id in config.plugin_puzzle_admin + + +def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]: + status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \ + (f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备") + + status_suffix = "" + if puzzle.raw_id == manager.puzzle_pinned: + status_suffix += " | 📌 已被管理员置顶" + + msg = UniMessage.text( + f"--- 谜题信息 ---\n" + f"Raw ID: {puzzle.raw_id}\n" + f"标题: {puzzle.title}\n" + f"出题者 ID: {puzzle.author_id}\n" + f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n" + f"Flag: {puzzle.flag}\n" + f"状态: {status}{status_suffix}\n\n" + f"{puzzle.content}" + ) + + if puzzle.img_name: + msg = msg.image(raw=puzzle.get_image_path().read_bytes()) + + msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑") + + return msg + + +def create_admin_commands(): + cmd_admin = on_alconna( + Alconna( + "konaph", + Subcommand("create", dest="create"), + Subcommand("ready", Args["raw_id", str], dest="ready"), + Subcommand("unready", Args["raw_id", str], dest="unready"), + Subcommand("info", Args["raw_id", str], dest="info"), + Subcommand("my", Args["page?", int], dest="my"), + Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"), + Subcommand("pin", Args["raw_id?", str], dest="pin"), + Subcommand("unpin", dest="unpin"), + Subcommand( + "modify", + Args["raw_id?", str], + Option("--title", Args["title", str], alias=["-t"]), + Option("--description", Args["description", str], alias=["-d"]), + Option("--image", Args["image?", Image], alias=["-i"]), + Option("--flag", Args["flag", str], alias=["-f"]), + Option("--remove-image"), + dest="modify", + ), + Subcommand("publish", Args["raw_id?", str], dest="publish"), + ), + rule=is_puzzle_manager, + ) + + @cmd_admin.assign("$main") + async def _(target: DepLongTaskTarget): + msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n") + msg = msg.text("konaph create - 创建一个新的谜题\n") + msg = msg.text("konaph ready - 准备好一道谜题\n") + msg = msg.text("konaph unready - 取消准备一道谜题\n") + msg = msg.text("konaph info - 查看谜题\n") + msg = msg.text("konaph my - 查看我的谜题列表\n") + msg = msg.text("konaph modify - 查看如何修改谜题信息\n") + + if is_puzzle_admin(target): + msg = msg.text("konaph all [--ready] - 查看所有谜题\n") + msg = msg.text("konaph pin - 查看当前置顶谜题\n") + msg = msg.text("konaph pin - 置顶一个谜题\n") + msg = msg.text("konaph unpin - 取消置顶所有谜题\n") + msg = msg.text("konaph publish - 强制发题") + + await target.send_message(msg) + + @cmd_admin.assign("create") + async def _(target: DepLongTaskTarget): + async with puzzle_manager() as manager: + puzzle = manager.admin_create_puzzle(target.target_id) + await target.send_message(UniMessage.text( + f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n" + f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n" + f"- 输入 `konaph my` 查看你创建的谜题\n" + f"- 输入 `konaph modify` 查看更改谜题的方法" + )) + + @cmd_admin.assign("ready") + async def _(raw_id: str, target: DepLongTaskTarget): + async with puzzle_manager() as manager: + if raw_id not in manager.puzzle_data: + return await target.send_message(UniMessage.text( + "你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题" + )) + p = manager.puzzle_data[raw_id] + if p.author_id != target.target_id and not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题" + )) + if p.ready: + return await target.send_message(UniMessage.text( + "题目早就准备好啦!" + )) + p.ready = True + await target.send_message(UniMessage.text( + f"谜题「{p.title}」已经准备就绪!" + )) + + @cmd_admin.assign("unready") + async def _(raw_id: str, target: DepLongTaskTarget): + async with puzzle_manager() as manager: + if raw_id not in manager.puzzle_data: + return await target.send_message(UniMessage.text( + "你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题" + )) + p = manager.puzzle_data[raw_id] + if p.author_id != target.target_id and not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题" + )) + if not p.ready: + return await target.send_message(UniMessage.text( + f"谜题「{p.title}」已经是未取消状态了!" + )) + if manager.is_puzzle_published(p.raw_id): + return await target.send_message(UniMessage.text( + "已发布的谜题不能取消准备状态!" + )) + + p.ready = False + await target.send_message(UniMessage.text( + f"谜题「{p.title}」已经取消准备!" + )) + + @cmd_admin.assign("info") + async def _(raw_id: str, target: DepLongTaskTarget): + async with puzzle_manager() as manager: + if raw_id not in manager.puzzle_data: + return await target.send_message(UniMessage.text( + "你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题" + )) + p = manager.puzzle_data[raw_id] + if p.author_id != target.target_id and not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "这不是你的题,你没有权限查看详细信息!" + )) + + await target.send_message(get_puzzle_info_message(manager, p)) + + @cmd_admin.assign("my") + async def _(target: DepLongTaskTarget, page: int = 1): + async with puzzle_manager() as manager: + puzzles = manager.get_puzzles_of_user(target.target_id) + if len(puzzles) == 0: + return await target.send_message(UniMessage.text( + "你没有谜题哦,使用 `konaph create` 创建一个吧!" + )) + count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) + if page <= 0 or page > count_pages: + return await target.send_message(UniMessage.text( + f"页数只有 1 ~ {count_pages} 啦!" + )) + puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] + message = UniMessage.text("==== 我的谜题 ====\n\n") + for p in puzzles: + message = message.text("- ") + if manager.puzzle_pinned == p.raw_id: + message = message.text("[📌]") + if manager.is_puzzle_published(p.raw_id): + message = message.text(f"[#{p.index_id}] ") + elif p.ready: + message = message.text("[✅] ") + else: + message = message.text("[⚙️] ") + message = message.text(f"{p.title} ({p.raw_id})") + message = message.text("\n") + message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====") + await target.send_message(message) + + @cmd_admin.assign("all") + async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1): + if not is_puzzle_admin(target): + return await target.send_message(UniMessage.text("你没有权限查看所有的哦")) + async with puzzle_manager() as manager: + puzzles = [*manager.puzzle_data.values()] + if ready.available: + puzzles = [p for p in puzzles if p.ready] + puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True) + count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) + if page <= 0 or page > count_pages: + return await target.send_message(UniMessage.text( + f"页数只有 1 ~ {count_pages} 啦!" + )) + puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] + message = UniMessage.text("==== 所有谜题 ====\n\n") + for p in puzzles: + message = message.text("- ") + if p.raw_id == manager.puzzle_pinned: + message = message.text("[📌]") + if manager.is_puzzle_published(p.raw_id): + message = message.text(f"[#{p.index_id}] ") + elif p.ready: + message = message.text("[✅] ") + else: + message = message.text("[⚙️] ") + message = message.text(f"{p.title} ({p.raw_id} by {p.author_id})") + message = message.text("\n") + message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====") + await target.send_message(message) + + @cmd_admin.assign("pin") + async def _(target: DepLongTaskTarget, raw_id: str = ""): + if not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "你没有权限使用该指令" + )) + + async with puzzle_manager() as manager: + if raw_id == "": + if manager.puzzle_pinned: + return await target.send_message(UniMessage.text( + f"被 Pin 的谜题 ID = {manager.puzzle_pinned}" + )) + return await target.send_message("没有置顶谜题") + if raw_id not in manager.unpublished_puzzles: + return await target.send_message(UniMessage.text( + "这个谜题已经发布了,或者还没准备好,或者不存在" + )) + manager.admin_pin_puzzle(raw_id) + return await target.send_message(f"已置顶谜题 {raw_id}") + + @cmd_admin.assign("unpin") + async def _(target: DepLongTaskTarget): + if not is_puzzle_admin(target): + return await target.send_message(UniMessage.text( + "你没有权限使用该指令" + )) + async with puzzle_manager() as manager: + manager.admin_pin_puzzle("") + return await target.send_message("已取消所有置顶") + + @cmd_admin.assign("modify") + async def _( + target: DepLongTaskTarget, + raw_id: str = "", + title: str | None = None, + description: str | None = None, + flag: str | None = None, + image: Image | None = None, + remove_image: Query[bool] = Query("modify.remove-image"), + ): + if raw_id == "": + return await target.send_message( + "konaph modify - 修改一个谜题\n\n" + "支持的参数:\n" + " --title 标题\n" + " --description 题目详情描述(用直引号包裹以支持多行)\n" + " --flag flag\n" + " --image <图片> 图片\n" + " --remove-image 删除图片" + ) + + async with puzzle_manager() as manager: + if raw_id not in manager.puzzle_data: + return await target.send_message("没有这个谜题") + p = manager.puzzle_data[raw_id] + if not is_puzzle_admin(target) and target.target_id != p.author_id: + return await target.send_message("你没有权限编辑这个谜题") + if title is not None: + p.title = title + if description is not None: + p.content = description + if flag is not None: + p.flag = flag + if image is not None and image.url is not None: + b = await download_image_bytes(image.url) + p.add_image(b.unwrap()) + elif remove_image.available: + p.remove_image() + + info2 = get_puzzle_info_message(manager, p) + + return await target.send_message("修改好啦!看看效果:\n\n" + info2) + + @cmd_admin.assign("publish") + async def _(target: DepLongTaskTarget, raw_id: str | None = None): + today = get_today_date() + async with puzzle_manager() as manager: + if today in manager.daily_puzzle_of_date: + return await target.send_message("今日已经有题了哦") + manager.last_checked_date = today - datetime.timedelta(days=-1) + if raw_id is not None: + manager.admin_pin_puzzle(raw_id) + p = manager.get_today_puzzle(strong=True) + if p is None: + return await target.send_message("上架失败了orz,可能是没题了") + await qq_broadcast(config.plugin_puzzle_playgroup, p.get_unimessage()) + return await target.send_message("Ok!") + + return cmd_admin diff --git a/konabot/plugins/auto_accept.py b/konabot/plugins/routine.py similarity index 66% rename from konabot/plugins/auto_accept.py rename to konabot/plugins/routine.py index 3283f03..5305435 100644 --- a/konabot/plugins/auto_accept.py +++ b/konabot/plugins/routine.py @@ -4,10 +4,13 @@ from typing import cast from loguru import logger from nonebot import get_bot, on_request +import nonebot from nonebot.adapters.onebot.v11.event import FriendRequestEvent from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot +from nonebot_plugin_apscheduler import scheduler from konabot.common.nb.is_admin import cfg as adminConfig +from konabot.common.username import manager add_request = on_request() @@ -23,3 +26,15 @@ async def _(req: FriendRequestEvent): await req.approve(bot) logger.info(f"已经自动同意 {req.user_id} 的好友请求") +@scheduler.scheduled_job("cron", minute="*/5") +async def _(): + logger.info("尝试更新群成员信息") + await manager.update() + +driver = nonebot.get_driver() + +@driver.on_bot_connect +async def _(): + logger.info("有 Bot 连接,5 秒后试着更新群成员信息") + await asyncio.sleep(5) + await manager.update()