forked from mttu-developers/konabot
509 lines
21 KiB
Python
509 lines
21 KiB
Python
import datetime
|
||
from math import ceil
|
||
|
||
from nonebot.adapters import Event
|
||
from nonebot_plugin_alconna import (
|
||
Alconna,
|
||
Args,
|
||
Image,
|
||
Option,
|
||
Query,
|
||
Subcommand,
|
||
SubcommandResult,
|
||
UniMessage,
|
||
on_alconna,
|
||
)
|
||
|
||
from konabot.common.longtask import DepLongTaskTarget
|
||
from konabot.common.nb.exc import BotExceptionMessage
|
||
from konabot.common.nb.extract_image import download_image_bytes
|
||
from konabot.common.permsys import DepPermManager, require_permission
|
||
from konabot.common.username import get_username
|
||
from konabot.plugins.kona_ph.core.image import get_image_manager
|
||
from konabot.plugins.kona_ph.core.message import (
|
||
get_puzzle_description,
|
||
get_puzzle_hint_list,
|
||
get_puzzle_info_message,
|
||
get_submission_message,
|
||
)
|
||
from konabot.plugins.kona_ph.core.storage import (
|
||
Puzzle,
|
||
PuzzleHint,
|
||
PuzzleManager,
|
||
get_today_date,
|
||
puzzle_manager,
|
||
)
|
||
from konabot.plugins.poster.service import broadcast
|
||
|
||
PUZZLE_PAGE_SIZE = 10
|
||
|
||
|
||
async def check_puzzle(
|
||
manager: PuzzleManager,
|
||
perm: DepPermManager,
|
||
raw_id: str,
|
||
event: Event,
|
||
target: DepLongTaskTarget,
|
||
) -> Puzzle:
|
||
if raw_id not in manager.puzzle_data:
|
||
raise BotExceptionMessage("没有这个谜题")
|
||
puzzle = manager.puzzle_data[raw_id]
|
||
if await perm.check_has_permission(event, "konaph.admin"):
|
||
return puzzle
|
||
if target.target_id != puzzle.author_id:
|
||
raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
|
||
return puzzle
|
||
|
||
|
||
def create_admin_commands():
|
||
cmd_admin = on_alconna(
|
||
Alconna(
|
||
"konaph",
|
||
Subcommand("create", dest="create"),
|
||
Subcommand("ready", Args["raw_id", str], dest="ready"),
|
||
Subcommand("unready", Args["raw_id", str], dest="unready"),
|
||
Subcommand("info", Args["raw_id", str], dest="info"),
|
||
Subcommand("my", Args["page?", int], dest="my"),
|
||
Subcommand(
|
||
"all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"
|
||
),
|
||
Subcommand("pin", Args["raw_id?", str], dest="pin"),
|
||
Subcommand("unpin", dest="unpin"),
|
||
Subcommand(
|
||
"modify",
|
||
Args["raw_id?", str],
|
||
Option("--title", Args["title", str], alias=["-t"]),
|
||
Option("--description", Args["description", str], alias=["-d"]),
|
||
Option("--image", Args["image?", Image], alias=["-i"]),
|
||
Option("--flag", Args["flag", str], alias=["-f"]),
|
||
Option("--remove-image"),
|
||
dest="modify",
|
||
),
|
||
Subcommand("publish", Args["raw_id?", str], dest="publish"),
|
||
Subcommand("preview", Args["raw_id", str], dest="preview"),
|
||
Subcommand("get-submits", Args["raw_id", str], dest="get-submits"),
|
||
Subcommand(
|
||
"test",
|
||
Args["raw_id", str],
|
||
Args["submission", str],
|
||
dest="test",
|
||
),
|
||
Subcommand(
|
||
"hint",
|
||
Subcommand(
|
||
"add",
|
||
Args["raw_id", str],
|
||
Args["pattern", str],
|
||
Args["message", str],
|
||
dest="add",
|
||
),
|
||
Subcommand(
|
||
"list",
|
||
Args["raw_id", str],
|
||
Args["page?", int],
|
||
dest="list",
|
||
),
|
||
Subcommand(
|
||
"modify",
|
||
Args["raw_id", str],
|
||
Args["hint_id", int],
|
||
Option("--pattern", Args["pattern", str], alias=["-p"]),
|
||
Option("--message", Args["message", str], alias=["-m"]),
|
||
Option("--checkpoint", Args["is_checkpoint", bool], alias=["-c"]),
|
||
dest="modify",
|
||
),
|
||
Subcommand(
|
||
"delete",
|
||
Args["raw_id", str],
|
||
Args["hint_id", int],
|
||
dest="delete",
|
||
),
|
||
dest="hint",
|
||
),
|
||
),
|
||
rule=require_permission("konaph.manager"),
|
||
)
|
||
|
||
@cmd_admin.assign("$main")
|
||
async def _(target: DepLongTaskTarget, pm: DepPermManager, event: Event):
|
||
msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
|
||
msg = msg.text("konaph create - 创建一个新的谜题\n")
|
||
msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
|
||
msg = msg.text("konaph unready <id> - 取消准备一道谜题\n")
|
||
msg = msg.text("konaph info <id> - 查看谜题\n")
|
||
msg = msg.text("konaph my <page?> - 查看我的谜题列表\n")
|
||
msg = msg.text("konaph modify - 查看如何修改谜题信息\n")
|
||
msg = msg.text("konaph preview <id> - 预览一个题目的效果,不会展示答案\n")
|
||
msg = msg.text("konaph get-submits <id> - 获得题目的提交记录\n")
|
||
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
|
||
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
|
||
|
||
if await pm.check_has_permission(event, "konaph.admin"):
|
||
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
|
||
msg = msg.text("konaph pin - 查看当前置顶谜题\n")
|
||
msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
|
||
msg = msg.text("konaph unpin - 取消置顶所有谜题\n")
|
||
msg = msg.text("konaph publish <id?> - 强制发题")
|
||
|
||
await target.send_message(msg)
|
||
|
||
@cmd_admin.assign("create")
|
||
async def _(target: DepLongTaskTarget):
|
||
async with puzzle_manager() as manager:
|
||
puzzle = manager.admin_create_puzzle(target.target_id)
|
||
await target.send_message(
|
||
UniMessage.text(
|
||
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
|
||
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
|
||
f"- 输入 `konaph my` 查看你创建的谜题\n"
|
||
f"- 输入 `konaph modify` 查看更改谜题的方法"
|
||
)
|
||
)
|
||
|
||
@cmd_admin.assign("ready")
|
||
async def _(
|
||
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
|
||
):
|
||
async with puzzle_manager() as manager:
|
||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||
if p.ready:
|
||
return await target.send_message(UniMessage.text("题目早就准备好啦!"))
|
||
p.ready = True
|
||
await target.send_message(
|
||
UniMessage.text(f"谜题「{p.title}」已经准备就绪!")
|
||
)
|
||
|
||
@cmd_admin.assign("unready")
|
||
async def _(
|
||
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
|
||
):
|
||
async with puzzle_manager() as manager:
|
||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||
if not p.ready:
|
||
return await target.send_message(
|
||
UniMessage.text(f"谜题「{p.title}」已经是未取消状态了!")
|
||
)
|
||
if manager.is_puzzle_published(p.raw_id):
|
||
return await target.send_message(
|
||
UniMessage.text("已发布的谜题不能取消准备状态!")
|
||
)
|
||
|
||
p.ready = False
|
||
await target.send_message(
|
||
UniMessage.text(f"谜题「{p.title}」已经取消准备!")
|
||
)
|
||
|
||
@cmd_admin.assign("info")
|
||
async def _(
|
||
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
|
||
):
|
||
async with puzzle_manager() as manager:
|
||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||
await target.send_message(get_puzzle_info_message(manager, p))
|
||
|
||
@cmd_admin.assign("my")
|
||
async def _(target: DepLongTaskTarget, page: int = 1):
|
||
async with puzzle_manager() as manager:
|
||
puzzles = manager.get_puzzles_of_user(target.target_id)
|
||
if len(puzzles) == 0:
|
||
return await target.send_message(
|
||
UniMessage.text("你没有谜题哦,使用 `konaph create` 创建一个吧!")
|
||
)
|
||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||
if page <= 0 or page > count_pages:
|
||
return await target.send_message(
|
||
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
|
||
)
|
||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
|
||
message = UniMessage.text("==== 我的谜题 ====\n\n")
|
||
for p in puzzles:
|
||
message = message.text("- ")
|
||
if manager.puzzle_pinned == p.raw_id:
|
||
message = message.text("[📌]")
|
||
if manager.is_puzzle_published(p.raw_id):
|
||
message = message.text(f"[✨][#{p.index_id}] ")
|
||
elif p.ready:
|
||
message = message.text("[✅] ")
|
||
else:
|
||
message = message.text("[⚙️] ")
|
||
message = message.text(f"{p.title} ({p.raw_id})")
|
||
message = message.text("\n")
|
||
message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
|
||
await target.send_message(message)
|
||
|
||
@cmd_admin.assign("all")
|
||
async def _(
|
||
target: DepLongTaskTarget,
|
||
event: Event,
|
||
perm: DepPermManager,
|
||
ready: Query[bool] = Query("all.ready"),
|
||
page: int = 1,
|
||
):
|
||
if not perm.check_has_permission(event, "konaph.admin"):
|
||
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||
async with puzzle_manager() as manager:
|
||
puzzles = [*manager.puzzle_data.values()]
|
||
if ready.available:
|
||
puzzles = [p for p in puzzles if p.ready]
|
||
puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
|
||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||
if page <= 0 or page > count_pages:
|
||
return await target.send_message(
|
||
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
|
||
)
|
||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
|
||
message = UniMessage.text("==== 所有谜题 ====\n\n")
|
||
for p in puzzles:
|
||
message = message.text("- ")
|
||
if p.raw_id == manager.puzzle_pinned:
|
||
message = message.text("[📌]")
|
||
if manager.is_puzzle_published(p.raw_id):
|
||
message = message.text(f"[✨][#{p.index_id}] ")
|
||
elif p.ready:
|
||
message = message.text("[✅] ")
|
||
else:
|
||
message = message.text("[⚙️] ")
|
||
message = message.text(f"{p.title} ({p.raw_id} by {p.author_id})")
|
||
message = message.text("\n")
|
||
message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
|
||
await target.send_message(message)
|
||
|
||
@cmd_admin.assign("pin")
|
||
async def _(
|
||
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str = ""
|
||
):
|
||
if not perm.check_has_permission(event, "konaph.admin"):
|
||
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||
|
||
async with puzzle_manager() as manager:
|
||
if raw_id == "":
|
||
if manager.puzzle_pinned:
|
||
return await target.send_message(
|
||
UniMessage.text(f"被 Pin 的谜题 ID = {manager.puzzle_pinned}")
|
||
)
|
||
return await target.send_message("没有置顶谜题")
|
||
if raw_id not in manager.unpublished_puzzles:
|
||
return await target.send_message(
|
||
UniMessage.text("这个谜题已经发布了,或者还没准备好,或者不存在")
|
||
)
|
||
manager.admin_pin_puzzle(raw_id)
|
||
return await target.send_message(f"已置顶谜题 {raw_id}")
|
||
|
||
@cmd_admin.assign("unpin")
|
||
async def _(target: DepLongTaskTarget, event: Event, perm: DepPermManager):
|
||
if not perm.check_has_permission(event, "konaph.admin"):
|
||
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||
async with puzzle_manager() as manager:
|
||
manager.admin_pin_puzzle("")
|
||
return await target.send_message("已取消所有置顶")
|
||
|
||
@cmd_admin.assign("modify")
|
||
async def _(
|
||
target: DepLongTaskTarget,
|
||
event: Event,
|
||
perm: DepPermManager,
|
||
raw_id: str = "",
|
||
title: str | None = None,
|
||
description: str | None = None,
|
||
flag: str | None = None,
|
||
image: Image | None = None,
|
||
remove_image: Query[bool] = Query("modify.remove-image"),
|
||
):
|
||
if raw_id == "":
|
||
return await target.send_message(
|
||
"konaph modify <raw_id> - 修改一个谜题\n\n"
|
||
"支持的参数:\n"
|
||
" --title <str> 标题\n"
|
||
" --description <str> 题目详情描述(用直引号包裹以支持多行)\n"
|
||
" --flag <str> flag,也就是题目的答案\n"
|
||
" --image <图片> 图片\n"
|
||
" --remove-image 删除图片"
|
||
)
|
||
image_manager = get_image_manager()
|
||
|
||
async with puzzle_manager() as manager:
|
||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||
if title is not None:
|
||
p.title = title
|
||
if description is not None:
|
||
p.content = description
|
||
if flag is not None:
|
||
p.flag = flag.strip()
|
||
if flag.strip() != flag:
|
||
await target.send_message(
|
||
"⚠️ 注意:你输入的 Flag 含有开头或结尾的空格,已经帮你去除"
|
||
)
|
||
if image is not None and image.url is not None:
|
||
b = await download_image_bytes(image.url)
|
||
image_manager.remove_puzzle_image(p.img_name)
|
||
p.img_name = image_manager.upload_puzzle_image(b.unwrap())
|
||
elif remove_image.available:
|
||
image_manager.remove_puzzle_image(p.img_name)
|
||
|
||
info2 = get_puzzle_info_message(manager, p)
|
||
|
||
return await target.send_message("修改好啦!看看效果:\n\n" + info2)
|
||
|
||
@cmd_admin.assign("publish")
|
||
async def _(
|
||
target: DepLongTaskTarget,
|
||
event: Event,
|
||
perm: DepPermManager,
|
||
raw_id: str | None = None,
|
||
):
|
||
if not perm.check_has_permission(event, "konaph.admin"):
|
||
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||
today = get_today_date()
|
||
async with puzzle_manager() as manager:
|
||
if today in manager.daily_puzzle_of_date:
|
||
return await target.send_message("今日已经有题了哦")
|
||
manager.last_checked_date = today - datetime.timedelta(days=-1)
|
||
if raw_id is not None:
|
||
manager.admin_pin_puzzle(raw_id)
|
||
p = manager.get_today_puzzle(strong=True)
|
||
if p is None:
|
||
return await target.send_message("上架失败了orz,可能是没题了")
|
||
await broadcast("每日谜题", get_puzzle_description(p))
|
||
return await target.send_message("Ok!")
|
||
|
||
@cmd_admin.assign("preview")
|
||
async def _(
|
||
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
|
||
):
|
||
async with puzzle_manager() as manager:
|
||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||
return await target.send_message(get_puzzle_description(p))
|
||
|
||
@cmd_admin.assign("get-submits")
|
||
async def _(
|
||
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
|
||
):
|
||
async with puzzle_manager() as manager:
|
||
puzzle = manager.puzzle_data.get(raw_id)
|
||
if puzzle is None:
|
||
return await target.send_message("没有这个谜题")
|
||
if (
|
||
not perm.check_has_permission(event, "konaph.admin")
|
||
and target.target_id != puzzle.author_id
|
||
):
|
||
return await target.send_message("你没有权限预览这个谜题")
|
||
|
||
msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n")
|
||
submits = manager.submissions.get(raw_id, {})
|
||
for uid, ls in submits.items():
|
||
s = ", ".join((i.flag for i in ls))
|
||
msg = msg.text(f"- {get_username(uid)}:{s}\n")
|
||
return await target.send_message(msg)
|
||
|
||
@cmd_admin.assign("test")
|
||
async def _(
|
||
target: DepLongTaskTarget,
|
||
raw_id: str,
|
||
submission: str,
|
||
event: Event,
|
||
perm: DepPermManager,
|
||
):
|
||
"""
|
||
测试一道谜题的回答,并给出结果
|
||
"""
|
||
async with puzzle_manager() as manager:
|
||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||
result = p.check_submission(submission)
|
||
msg = get_submission_message(p, result)
|
||
return await target.send_message("[测试提交] " + msg)
|
||
|
||
@cmd_admin.assign("subcommands.hint")
|
||
async def _(
|
||
target: DepLongTaskTarget,
|
||
subcommands: Query[SubcommandResult] = Query("subcommands.hint"),
|
||
):
|
||
if len(subcommands.result.subcommands) > 0:
|
||
return
|
||
return await target.send_message(
|
||
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
|
||
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
|
||
.text(
|
||
"- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n"
|
||
)
|
||
.text("- konaph hint modify <id> <hint_id>\n")
|
||
.text(" - --pattern <pattern>\n - 更改匹配规则\n")
|
||
.text(" - --message <message>\n - 更改提示文本\n")
|
||
.text(" - --checkpoint [True|False]\n - 更改是否为中间答案\n")
|
||
.text("- konaph hint delete <id> <hint_id>\n - 删除一个提示 / 中间答案\n")
|
||
.text("\n更多关于 pattern 和中间答案的信息,请见 man:中间答案(7)")
|
||
)
|
||
|
||
@cmd_admin.assign("subcommands.hint.add")
|
||
async def _(
|
||
target: DepLongTaskTarget,
|
||
raw_id: str,
|
||
pattern: str,
|
||
message: str,
|
||
event: Event,
|
||
perm: DepPermManager,
|
||
):
|
||
async with puzzle_manager() as manager:
|
||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||
p.hints[p.hint_id_max + 1] = PuzzleHint(
|
||
pattern=pattern,
|
||
message=message,
|
||
is_checkpoint=False,
|
||
)
|
||
await target.send_message("创建成功!\n\n" + get_puzzle_hint_list(p))
|
||
|
||
@cmd_admin.assign("subcommands.hint.list")
|
||
async def _(
|
||
target: DepLongTaskTarget,
|
||
raw_id: str,
|
||
event: Event,
|
||
perm: DepPermManager,
|
||
):
|
||
async with puzzle_manager() as manager:
|
||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||
await target.send_message(get_puzzle_hint_list(p))
|
||
|
||
@cmd_admin.assign("subcommands.hint.modify")
|
||
async def _(
|
||
target: DepLongTaskTarget,
|
||
raw_id: str,
|
||
hint_id: int,
|
||
event: Event,
|
||
perm: DepPermManager,
|
||
pattern: str | None = None,
|
||
message: str | None = None,
|
||
is_checkpoint: bool | None = None,
|
||
):
|
||
async with puzzle_manager() as manager:
|
||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||
if hint_id not in p.hints:
|
||
raise BotExceptionMessage(
|
||
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
|
||
)
|
||
hint = p.hints[hint_id]
|
||
if pattern is not None:
|
||
hint.pattern = pattern
|
||
if message is not None:
|
||
hint.message = message
|
||
if is_checkpoint is not None:
|
||
hint.is_checkpoint = is_checkpoint
|
||
await target.send_message("更改成功!\n\n" + get_puzzle_hint_list(p))
|
||
|
||
@cmd_admin.assign("subcommands.hint.delete")
|
||
async def _(
|
||
target: DepLongTaskTarget,
|
||
raw_id: str,
|
||
hint_id: int,
|
||
event: Event,
|
||
perm: DepPermManager,
|
||
):
|
||
async with puzzle_manager() as manager:
|
||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||
if hint_id not in p.hints:
|
||
raise BotExceptionMessage(
|
||
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
|
||
)
|
||
del p.hints[hint_id]
|
||
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
|
||
|
||
return cmd_admin
|