diff --git a/konabot/plugins/kona_ph/manager.py b/konabot/plugins/kona_ph/manager.py index 4eada29..5c181fb 100644 --- a/konabot/plugins/kona_ph/manager.py +++ b/konabot/plugins/kona_ph/manager.py @@ -2,22 +2,39 @@ import datetime from math import ceil from nonebot import get_plugin_config -from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query, - Subcommand, SubcommandResult, UniMessage, - on_alconna) +from nonebot.adapters import Event +from nonebot_plugin_alconna import ( + Alconna, + Args, + Image, + Option, + Query, + Subcommand, + SubcommandResult, + UniMessage, + on_alconna, +) from pydantic import BaseModel from konabot.common.longtask import DepLongTaskTarget from konabot.common.nb.exc import BotExceptionMessage from konabot.common.nb.extract_image import download_image_bytes +from konabot.common.permsys import DepPermManager, require_permission from konabot.common.username import get_username from konabot.plugins.kona_ph.core.image import get_image_manager -from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list, - get_puzzle_info_message, - get_submission_message) -from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager, - get_today_date, - puzzle_manager) +from konabot.plugins.kona_ph.core.message import ( + get_puzzle_description, + get_puzzle_hint_list, + get_puzzle_info_message, + get_submission_message, +) +from konabot.plugins.kona_ph.core.storage import ( + Puzzle, + PuzzleHint, + PuzzleManager, + get_today_date, + puzzle_manager, +) from konabot.plugins.poster.service import broadcast PUZZLE_PAGE_SIZE = 10 @@ -32,19 +49,17 @@ class PuzzleConfig(BaseModel): config = get_plugin_config(PuzzleConfig) -def is_puzzle_manager(target: DepLongTaskTarget): - return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target) - - -def is_puzzle_admin(target: DepLongTaskTarget): - return target.target_id in config.plugin_puzzle_admin - - -def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle: +async def check_puzzle( + manager: PuzzleManager, + perm: DepPermManager, + raw_id: str, + event: Event, + target: DepLongTaskTarget, +) -> Puzzle: if raw_id not in manager.puzzle_data: raise BotExceptionMessage("没有这个谜题") puzzle = manager.puzzle_data[raw_id] - if is_puzzle_admin(target): + if await perm.check_has_permission(event, "konaph.admin"): return puzzle if target.target_id != puzzle.author_id: raise BotExceptionMessage("你没有权限查看或编辑这个谜题") @@ -60,7 +75,9 @@ def create_admin_commands(): Subcommand("unready", Args["raw_id", str], dest="unready"), Subcommand("info", Args["raw_id", str], dest="info"), Subcommand("my", Args["page?", int], dest="my"), - Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"), + Subcommand( + "all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all" + ), Subcommand("pin", Args["raw_id?", str], dest="pin"), Subcommand("unpin", dest="unpin"), Subcommand( @@ -115,11 +132,11 @@ def create_admin_commands(): dest="hint", ), ), - rule=is_puzzle_manager, + rule=require_permission("konaph.manager"), ) @cmd_admin.assign("$main") - async def _(target: DepLongTaskTarget): + async def _(target: DepLongTaskTarget, pm: DepPermManager, event: Event): msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n") msg = msg.text("konaph create - 创建一个新的谜题\n") msg = msg.text("konaph ready - 准备好一道谜题\n") @@ -132,7 +149,7 @@ def create_admin_commands(): msg = msg.text("konaph test - 尝试提交一个答案,看回答的效果\n") msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n") - if is_puzzle_admin(target): + if await pm.check_has_permission(event, "konaph.admin"): msg = msg.text("konaph all [--ready] - 查看所有谜题\n") msg = msg.text("konaph pin - 查看当前置顶谜题\n") msg = msg.text("konaph pin - 置顶一个谜题\n") @@ -145,48 +162,54 @@ def create_admin_commands(): async def _(target: DepLongTaskTarget): async with puzzle_manager() as manager: puzzle = manager.admin_create_puzzle(target.target_id) - await target.send_message(UniMessage.text( - f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n" - f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n" - f"- 输入 `konaph my` 查看你创建的谜题\n" - f"- 输入 `konaph modify` 查看更改谜题的方法" - )) + await target.send_message( + UniMessage.text( + f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n" + f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n" + f"- 输入 `konaph my` 查看你创建的谜题\n" + f"- 输入 `konaph modify` 查看更改谜题的方法" + ) + ) @cmd_admin.assign("ready") - async def _(raw_id: str, target: DepLongTaskTarget): + async def _( + raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager + ): async with puzzle_manager() as manager: - p = check_puzzle(manager, target, raw_id) + p = await check_puzzle(manager, perm, raw_id, event, target) if p.ready: - return await target.send_message(UniMessage.text( - "题目早就准备好啦!" - )) + return await target.send_message(UniMessage.text("题目早就准备好啦!")) p.ready = True - await target.send_message(UniMessage.text( - f"谜题「{p.title}」已经准备就绪!" - )) + await target.send_message( + UniMessage.text(f"谜题「{p.title}」已经准备就绪!") + ) @cmd_admin.assign("unready") - async def _(raw_id: str, target: DepLongTaskTarget): + async def _( + raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager + ): async with puzzle_manager() as manager: - p = check_puzzle(manager, target, raw_id) + p = await check_puzzle(manager, perm, raw_id, event, target) if not p.ready: - return await target.send_message(UniMessage.text( - f"谜题「{p.title}」已经是未取消状态了!" - )) + return await target.send_message( + UniMessage.text(f"谜题「{p.title}」已经是未取消状态了!") + ) if manager.is_puzzle_published(p.raw_id): - return await target.send_message(UniMessage.text( - "已发布的谜题不能取消准备状态!" - )) + return await target.send_message( + UniMessage.text("已发布的谜题不能取消准备状态!") + ) p.ready = False - await target.send_message(UniMessage.text( - f"谜题「{p.title}」已经取消准备!" - )) + await target.send_message( + UniMessage.text(f"谜题「{p.title}」已经取消准备!") + ) @cmd_admin.assign("info") - async def _(raw_id: str, target: DepLongTaskTarget): + async def _( + raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager + ): async with puzzle_manager() as manager: - p = check_puzzle(manager, target, raw_id) + p = await check_puzzle(manager, perm, raw_id, event, target) await target.send_message(get_puzzle_info_message(manager, p)) @cmd_admin.assign("my") @@ -194,15 +217,15 @@ def create_admin_commands(): async with puzzle_manager() as manager: puzzles = manager.get_puzzles_of_user(target.target_id) if len(puzzles) == 0: - return await target.send_message(UniMessage.text( - "你没有谜题哦,使用 `konaph create` 创建一个吧!" - )) + return await target.send_message( + UniMessage.text("你没有谜题哦,使用 `konaph create` 创建一个吧!") + ) count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) if page <= 0 or page > count_pages: - return await target.send_message(UniMessage.text( - f"页数只有 1 ~ {count_pages} 啦!" - )) - puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] + return await target.send_message( + UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!") + ) + puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE] message = UniMessage.text("==== 我的谜题 ====\n\n") for p in puzzles: message = message.text("- ") @@ -220,11 +243,15 @@ def create_admin_commands(): await target.send_message(message) @cmd_admin.assign("all") - async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1): - if not is_puzzle_admin(target): - return await target.send_message(UniMessage.text( - "你没有权限使用该指令" - )) + async def _( + target: DepLongTaskTarget, + event: Event, + perm: DepPermManager, + ready: Query[bool] = Query("all.ready"), + page: int = 1, + ): + if not perm.check_has_permission(event, "konaph.admin"): + return await target.send_message(UniMessage.text("你没有权限使用该指令")) async with puzzle_manager() as manager: puzzles = [*manager.puzzle_data.values()] if ready.available: @@ -232,10 +259,10 @@ def create_admin_commands(): puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True) count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) if page <= 0 or page > count_pages: - return await target.send_message(UniMessage.text( - f"页数只有 1 ~ {count_pages} 啦!" - )) - puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] + return await target.send_message( + UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!") + ) + puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE] message = UniMessage.text("==== 所有谜题 ====\n\n") for p in puzzles: message = message.text("- ") @@ -253,32 +280,30 @@ def create_admin_commands(): await target.send_message(message) @cmd_admin.assign("pin") - async def _(target: DepLongTaskTarget, raw_id: str = ""): - if not is_puzzle_admin(target): - return await target.send_message(UniMessage.text( - "你没有权限使用该指令" - )) + async def _( + target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str = "" + ): + if not perm.check_has_permission(event, "konaph.admin"): + return await target.send_message(UniMessage.text("你没有权限使用该指令")) async with puzzle_manager() as manager: if raw_id == "": if manager.puzzle_pinned: - return await target.send_message(UniMessage.text( - f"被 Pin 的谜题 ID = {manager.puzzle_pinned}" - )) + return await target.send_message( + UniMessage.text(f"被 Pin 的谜题 ID = {manager.puzzle_pinned}") + ) return await target.send_message("没有置顶谜题") if raw_id not in manager.unpublished_puzzles: - return await target.send_message(UniMessage.text( - "这个谜题已经发布了,或者还没准备好,或者不存在" - )) + return await target.send_message( + UniMessage.text("这个谜题已经发布了,或者还没准备好,或者不存在") + ) manager.admin_pin_puzzle(raw_id) return await target.send_message(f"已置顶谜题 {raw_id}") @cmd_admin.assign("unpin") - async def _(target: DepLongTaskTarget): - if not is_puzzle_admin(target): - return await target.send_message(UniMessage.text( - "你没有权限使用该指令" - )) + async def _(target: DepLongTaskTarget, event: Event, perm: DepPermManager): + if not perm.check_has_permission(event, "konaph.admin"): + return await target.send_message(UniMessage.text("你没有权限使用该指令")) async with puzzle_manager() as manager: manager.admin_pin_puzzle("") return await target.send_message("已取消所有置顶") @@ -286,6 +311,8 @@ def create_admin_commands(): @cmd_admin.assign("modify") async def _( target: DepLongTaskTarget, + event: Event, + perm: DepPermManager, raw_id: str = "", title: str | None = None, description: str | None = None, @@ -306,7 +333,7 @@ def create_admin_commands(): image_manager = get_image_manager() async with puzzle_manager() as manager: - p = check_puzzle(manager, target, raw_id) + p = await check_puzzle(manager, perm, raw_id, event, target) if title is not None: p.title = title if description is not None: @@ -329,11 +356,14 @@ def create_admin_commands(): return await target.send_message("修改好啦!看看效果:\n\n" + info2) @cmd_admin.assign("publish") - async def _(target: DepLongTaskTarget, raw_id: str | None = None): - if not is_puzzle_admin(target): - return await target.send_message(UniMessage.text( - "你没有权限使用该指令" - )) + async def _( + target: DepLongTaskTarget, + event: Event, + perm: DepPermManager, + raw_id: str | None = None, + ): + if not perm.check_has_permission(event, "konaph.admin"): + return await target.send_message(UniMessage.text("你没有权限使用该指令")) today = get_today_date() async with puzzle_manager() as manager: if today in manager.daily_puzzle_of_date: @@ -348,46 +378,64 @@ def create_admin_commands(): return await target.send_message("Ok!") @cmd_admin.assign("preview") - async def _(target: DepLongTaskTarget, raw_id: str): + async def _( + target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str + ): async with puzzle_manager() as manager: - p = check_puzzle(manager, target, raw_id) + p = await check_puzzle(manager, perm, raw_id, event, target) return await target.send_message(get_puzzle_description(p)) @cmd_admin.assign("get-submits") - async def _(target: DepLongTaskTarget, raw_id: str): + async def _( + target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str + ): async with puzzle_manager() as manager: puzzle = manager.puzzle_data.get(raw_id) if puzzle is None: return await target.send_message("没有这个谜题") - if not is_puzzle_admin(target) and target.target_id != puzzle.author_id: + if ( + not perm.check_has_permission(event, "konaph.admin") + and target.target_id != puzzle.author_id + ): return await target.send_message("你没有权限预览这个谜题") msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n") submits = manager.submissions.get(raw_id, {}) for uid, ls in submits.items(): - s = ', '.join((i.flag for i in ls)) + s = ", ".join((i.flag for i in ls)) msg = msg.text(f"- {get_username(uid)}:{s}\n") return await target.send_message(msg) @cmd_admin.assign("test") - async def _(target: DepLongTaskTarget, raw_id: str, submission: str): + async def _( + target: DepLongTaskTarget, + raw_id: str, + submission: str, + event: Event, + perm: DepPermManager, + ): """ 测试一道谜题的回答,并给出结果 """ async with puzzle_manager() as manager: - p = check_puzzle(manager, target, raw_id) + p = await check_puzzle(manager, perm, raw_id, event, target) result = p.check_submission(submission) msg = get_submission_message(p, result) return await target.send_message("[测试提交] " + msg) @cmd_admin.assign("subcommands.hint") - async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")): + async def _( + target: DepLongTaskTarget, + subcommands: Query[SubcommandResult] = Query("subcommands.hint"), + ): if len(subcommands.result.subcommands) > 0: return return await target.send_message( UniMessage.text("==== 提示/中间答案编辑器 ====\n\n") .text("- konaph hint list \n - 查看某道题的所有提示 / 中间答案\n") - .text("- konaph hint add \n - 添加一个提示 / 中间答案\n") + .text( + "- konaph hint add \n - 添加一个提示 / 中间答案\n" + ) .text("- konaph hint modify \n") .text(" - --pattern \n - 更改匹配规则\n") .text(" - --message \n - 更改提示文本\n") @@ -402,9 +450,11 @@ def create_admin_commands(): raw_id: str, pattern: str, message: str, + event: Event, + perm: DepPermManager, ): async with puzzle_manager() as manager: - p = check_puzzle(manager, target, raw_id) + p = await check_puzzle(manager, perm, raw_id, event, target) p.hints[p.hint_id_max + 1] = PuzzleHint( pattern=pattern, message=message, @@ -416,9 +466,11 @@ def create_admin_commands(): async def _( target: DepLongTaskTarget, raw_id: str, + event: Event, + perm: DepPermManager, ): async with puzzle_manager() as manager: - p = check_puzzle(manager, target, raw_id) + p = await check_puzzle(manager, perm, raw_id, event, target) await target.send_message(get_puzzle_hint_list(p)) @cmd_admin.assign("subcommands.hint.modify") @@ -426,12 +478,14 @@ def create_admin_commands(): target: DepLongTaskTarget, raw_id: str, hint_id: int, + event: Event, + perm: DepPermManager, pattern: str | None = None, message: str | None = None, is_checkpoint: bool | None = None, ): async with puzzle_manager() as manager: - p = check_puzzle(manager, target, raw_id) + p = await check_puzzle(manager, perm, raw_id, event, target) if hint_id not in p.hints: raise BotExceptionMessage( f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单" @@ -450,9 +504,11 @@ def create_admin_commands(): target: DepLongTaskTarget, raw_id: str, hint_id: int, + event: Event, + perm: DepPermManager, ): async with puzzle_manager() as manager: - p = check_puzzle(manager, target, raw_id) + p = await check_puzzle(manager, perm, raw_id, event, target) if hint_id not in p.hints: raise BotExceptionMessage( f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单" @@ -460,5 +516,4 @@ def create_admin_commands(): del p.hints[hint_id] await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p)) - return cmd_admin