部分解耦了 konaph 的一些层

This commit is contained in:
2025-11-01 17:52:05 +08:00
parent adfbac7d90
commit 01fe33eb9f
12 changed files with 411 additions and 242 deletions

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"python.REPL.enableREPLSmartSend": false
}

View File

@ -1,10 +1,13 @@
from typing import Any, cast from typing import Any, cast
import nonebot import nonebot
from nonebot_plugin_alconna import UniMessage
from nonebot.adapters.onebot.v11 import Bot as OBBot from nonebot.adapters.onebot.v11 import Bot as OBBot
from nonebot_plugin_alconna import UniMessage
async def qq_broadcast(groups: list[str], msg: UniMessage[Any]): async def qq_broadcast(groups: list[str], msg: UniMessage[Any] | str):
if isinstance(msg, str):
msg = UniMessage.text(msg)
bots: dict[str, OBBot] = {} bots: dict[str, OBBot] = {}
# group_id -> bot_id # group_id -> bot_id

View File

@ -0,0 +1,9 @@
关于「中间答案」或者「提示」:
在 KonaPH 中,当有人发送「提交答案 答案」时,会检查答案是否符合你设置的中间答案的 pattern。这个 pattern 可以有两种方式:
- 纯文本的完整匹配:你设置的 pattern 如果和提交的答案完全相等,则会触发提示。
- regex 匹配:你设置的 pattern 如果以斜杠(/)开头和结尾,就会检查提交的答案是否匹配正则表达式。注意 ^ 和 $ 符号的使用。
- 例如:/^commit$/ 会匹配 commit不会匹配 acommit、Commit 等。
- 而如果是 /commit/,则会匹配 commit、acommit而不会匹配 Commit。
- 无法使用 Javascript 的字符串声明模式,例如,/case_insensitive/i 就不会被视作一个正则表达式。

View File

@ -1,17 +1,23 @@
from functools import reduce import datetime
from math import ceil
import re import re
from math import ceil
from loguru import logger from loguru import logger
from nonebot import on_message from nonebot import on_message
from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
from konabot.common.nb.qq_broadcast import qq_broadcast on_alconna)
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager
from konabot.common.longtask import DepLongTaskTarget
from nonebot_plugin_apscheduler import scheduler from nonebot_plugin_apscheduler import scheduler
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.plugins.kona_ph.core.message import (get_daily_report,
get_daily_report_v2,
get_puzzle_description,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE, config,
create_admin_commands,
puzzle_manager)
create_admin_commands() create_admin_commands()
@ -24,18 +30,6 @@ async def is_play_group(target: DepLongTaskTarget):
return False return False
# cmd_submit = on_alconna(Alconna(
# "re:提交(?:答案|题解|[fF]lag)",
# Args["flag", str],
# ), rule=is_play_group)
#
# @cmd_submit.handle()
# async def _(flag: str, target: DepLongTaskTarget):
# async with puzzle_manager() as manager:
# result = manager.submit(target.target_id, flag)
# await target.send_message(result.get_unimessage())
cmd_submit = on_message(rule=is_play_group) cmd_submit = on_message(rule=is_play_group)
@ -46,7 +40,14 @@ async def _(msg: UniMsg, target: DepLongTaskTarget):
submission: str = match.group("submission") submission: str = match.group("submission")
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
result = manager.submit(target.target_id, submission) result = manager.submit(target.target_id, submission)
await target.send_message(result.get_unimessage()) if isinstance(result, str):
await target.send_message(result)
else:
await target.send_message(get_submission_message(
daily_puzzle_info=result.info,
submission=result.submission,
puzzle=result.puzzle,
))
cmd_query = on_alconna(Alconna( cmd_query = on_alconna(Alconna(
@ -59,7 +60,7 @@ async def _(target: DepLongTaskTarget):
p = manager.get_today_puzzle() p = manager.get_today_puzzle()
if p is None: if p is None:
return await target.send_message("今天无题,改日再来吧!") return await target.send_message("今天无题,改日再来吧!")
await target.send_message(p.get_unimessage()) await target.send_message(get_puzzle_description(p))
cmd_query_submission = on_alconna(Alconna( cmd_query_submission = on_alconna(Alconna(
@ -68,44 +69,11 @@ cmd_query_submission = on_alconna(Alconna(
@cmd_query_submission.handle() @cmd_query_submission.handle()
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget):
gid = None
if re.match(r"^\d+$", target.channel_id):
gid = int(target.channel_id)
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = manager.get_today_puzzle() await target.send_message(get_daily_report_v2(manager, gid))
if p is None:
return await target.send_message("今天无题")
msg = UniMessage.text("==== 今日答题情况 ====\n\n")
subcount = len(reduce(
lambda x, y: x + y,
manager.submissions.get(p.raw_id, {}).values(),
[],
))
info = manager.daily_puzzle[p.index_id]
msg = msg.text(
f"总体情况:答对 {len(info.success_users)} / "
f"参与 {len(info.tried_users)} / "
f"提交 {subcount}\n"
)
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
gid = None
if re.match(r"^\d+$", target.channel_id):
gid = int(target.channel_id)
for u, d in success_users:
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
t = d.strftime("%H:%M")
tries = len(manager.submissions[p.raw_id][u])
msg = msg.text(f"\n- {uname} [🎉 {t} 完成 | {tries} 提交]")
for u in info.tried_users - set(info.success_users.keys()):
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
tries = len(manager.submissions[p.raw_id][u])
msg = msg.text(f"\n- {uname} [💦 {tries} 提交]")
await target.send_message(msg)
cmd_history = on_alconna(Alconna( cmd_history = on_alconna(Alconna(
@ -120,15 +88,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
today = get_today_date() today = get_today_date()
if index_id: if index_id:
index_id = index_id.removeprefix("#") index_id = index_id.removeprefix("#")
if index_id == manager.daily_puzzle_of_date.get(today, ""): if index_id not in manager.daily_puzzle:
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id] return await target.send_message("没有这道题哦")
return await target.send_message(puzzle.get_unimessage()) puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
if index_id in manager.daily_puzzle: msg = get_puzzle_description(
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id] puzzle,
msg = puzzle.get_unimessage() with_answer=(index_id != manager.daily_puzzle_of_date.get(today, "")),
msg = msg.text(f"\n\n------\n\n题解:{puzzle.flag}") )
return await target.send_message(msg) return await target.send_message(msg)
return await target.send_message("没有这道题哦")
msg = UniMessage.text("====== 历史题目清单 ======\n\n") msg = UniMessage.text("====== 历史题目清单 ======\n\n")
puzzles = [ puzzles = [
(manager.puzzle_data[manager.daily_puzzle[i].raw_id], d) (manager.puzzle_data[manager.daily_puzzle[i].raw_id], d)
@ -155,15 +122,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
@scheduler.scheduled_job("cron", hour="8") @scheduler.scheduled_job("cron", hour="8")
async def _(): async def _():
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
msg2 = manager.get_report_yesterday() yesterday = get_today_date() - datetime.timedelta(days=1)
msg2 = get_daily_report(manager, yesterday)
if msg2 is not None: if msg2 is not None:
await qq_broadcast(config.plugin_puzzle_playgroup, msg2) await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
puzzle = manager.get_today_puzzle() puzzle = manager.get_today_puzzle()
if puzzle is not None: if puzzle is not None:
logger.info(f"找到了题目 {puzzle.raw_id},发送") logger.info(f"找到了题目 {puzzle.raw_id},发送")
await qq_broadcast(config.plugin_puzzle_playgroup, puzzle.get_unimessage()) await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(puzzle))
else: else:
logger.info("自动任务:没有找到题目,跳过") logger.info("自动任务:没有找到题目,跳过")

View File

@ -0,0 +1,29 @@
import nanoid
from konabot.common.path import ASSETS_PATH
from konabot.plugins.kona_ph.core.path import KONAPH_IMAGE_BASE
class PuzzleImageManager:
def read_puzzle_image(self, img_name: str) -> bytes:
fp = KONAPH_IMAGE_BASE / img_name
if fp.exists():
return fp.read_bytes()
return (ASSETS_PATH / "img" / "other" / "boom.jpg").read_bytes()
def upload_puzzle_image(self, data: bytes, suffix: str = ".png") -> str:
id = nanoid.generate(
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
21,
)
img_name = f"{id}{suffix}"
(KONAPH_IMAGE_BASE / img_name).write_bytes(data)
return img_name
def remove_puzzle_image(self, img_name: str):
if img_name:
(KONAPH_IMAGE_BASE / img_name).unlink(True)
def get_image_manager() -> PuzzleImageManager:
return PuzzleImageManager()

View File

@ -0,0 +1,164 @@
"""
生成各种各样的 Message 的函数集合
"""
import datetime
import functools
import re
from typing import Any
from nonebot_plugin_alconna import UniMessage
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.storage import (DailyPuzzleInfo, Puzzle,
PuzzleManager,
PuzzleSubmission)
def get_puzzle_description(puzzle: Puzzle, with_answer: bool = False) -> UniMessage[Any]:
"""
获取一个谜题的描述
"""
img_manager = get_image_manager()
result = UniMessage.text(f"[KonaPH#{puzzle.index_id}] {puzzle.title}")
result = result.text(f"\n\n{puzzle.content}")
if puzzle.img_name:
result = result.text("\n\n").image(
raw=img_manager.read_puzzle_image(puzzle.img_name)
)
result = result.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
if with_answer:
result = result.text(f"\n\n题目答案:{puzzle.flag}")
else:
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
return result
def get_submission_message(
puzzle: Puzzle,
submission: PuzzleSubmission,
daily_puzzle_info: DailyPuzzleInfo | None = None,
) -> str:
"""
获得提交答案的反馈信息
"""
if submission.success:
rank = -1
if daily_puzzle_info is not None:
rank = len(daily_puzzle_info.success_users)
return f"🎉 恭喜你答对了!你是今天第 {rank} 个解出来的!"
if submission.hint_id >= 0 and (
hint := puzzle.hints.get(submission.hint_id)
) is not None:
if hint.is_checkpoint:
hint_msg = "✨ 恭喜!这是本题的中间答案,加油!"
else:
hint_msg = "🤔 答错啦!请检查你的答案。"
return f"{hint_msg}\n\n提示:{hint.hint}"
return "❌ 答错啦!请检查你的答案。"
def get_daily_report(
manager: PuzzleManager,
date: datetime.date,
) -> str | None:
"""
获得某日的提交的报告信息
"""
index_id = manager.daily_puzzle_of_date.get(date)
if index_id is None:
return None
info = manager.daily_puzzle[index_id]
puzzle = manager.puzzle_data[info.raw_id]
msg = f"[KonaPH#{puzzle.index_id}] 「{puzzle.title}」解答报告\n\n"
if len(info.success_users) == 0:
msg += "昨日,无人解出此题 😭😭\n\n"
else:
msg += f"昨日,共有 {len(info.success_users)} 人解出此题。\n\n"
msg += "前五名的解答者:\n\n"
us = [(u, d) for u, d in info.success_users.items()]
us = sorted(us, key=lambda t: t[1])
us = us[:5]
for u, _ in us:
m = manager.submissions[puzzle.raw_id][u][-1]
msg += f"- {get_username(u)}{m.time.strftime('%H:%M')}\n"
msg += "\n"
msg += f"出题人:{get_username(puzzle.author_id)}"
return msg
def get_daily_report_v2(manager: PuzzleManager, gid: int | None = None):
p = manager.get_today_puzzle()
if p is None:
return "今天无题"
msg = "==== 今日答题情况 ====\n\n"
subcount = len(functools.reduce(
lambda x, y: x + y,
manager.submissions.get(p.raw_id, {}).values(),
[],
))
info = manager.daily_puzzle[p.index_id]
msg += (
f"总体情况:答对 {len(info.success_users)} / "
f"参与 {len(info.tried_users)} / "
f"提交 {subcount}\n"
)
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
for u, d in success_users:
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
t = d.strftime("%H:%M")
tries = len(manager.submissions[p.raw_id][u])
msg += f"\n- {uname} [🎉 {t} 完成 | {tries} 提交]"
for u in info.tried_users - set(info.success_users.keys()):
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
tries = len(manager.submissions[p.raw_id][u])
msg += f"\n- {uname} [💦 {tries} 提交]"
return msg
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
image_manager = get_image_manager()
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
status_suffix = ""
if puzzle.raw_id == manager.puzzle_pinned:
status_suffix += " | 📌 已被管理员置顶"
msg = UniMessage.text(
f"--- 谜题信息 ---\n"
f"Raw ID: {puzzle.raw_id}\n"
f"出题者: {get_username(puzzle.author_id)} | {puzzle.author_id}\n"
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"状态: {status}{status_suffix}\n\n"
f"标题: {puzzle.title}\n"
f"Flag: {puzzle.flag}\n\n"
f"{puzzle.content}"
)
if puzzle.img_name:
msg = msg.image(raw=image_manager.read_puzzle_image(puzzle.img_name))
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
return msg

View File

@ -0,0 +1,9 @@
from konabot.common.path import DATA_PATH
KONAPH_BASE = DATA_PATH / "KonaPH"
KONAPH_DATA_JSON = KONAPH_BASE / "data.json"
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs"
# 保证所有文件夹存在
KONAPH_BASE.mkdir(exist_ok=True)
KONAPH_IMAGE_BASE.mkdir(exist_ok=True)

View File

@ -1,27 +1,26 @@
import asyncio import asyncio
import datetime import datetime
import random import random
import re
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
import nanoid import nanoid
from nonebot_plugin_alconna import UniMessage
from pydantic import BaseModel, Field, ValidationError from pydantic import BaseModel, Field, ValidationError
from konabot.common.path import DATA_PATH from konabot.plugins.kona_ph.core.path import KONAPH_DATA_JSON
from konabot.common.username import get_username
KONAPH_BASE = DATA_PATH / "KonaPH" class PuzzleHint(BaseModel):
KONAPH_DATA_JSON = KONAPH_BASE / "data.json" pattern: str
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs" hint: str
is_checkpoint: bool
# 保证所有文件夹存在
KONAPH_BASE.mkdir(exist_ok=True) class PuzzleSubmission(BaseModel):
KONAPH_IMAGE_BASE.mkdir(exist_ok=True) success: bool
flag: str
time: datetime.datetime
hint_id: int = -1
class Puzzle(BaseModel): class Puzzle(BaseModel):
@ -40,41 +39,38 @@ class Puzzle(BaseModel):
ready: bool = False ready: bool = False
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now) created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
def get_image_path(self) -> Path: hints: dict[int, PuzzleHint] = Field(default_factory=dict)
return KONAPH_IMAGE_BASE / self.img_name
def get_unimessage(self) -> UniMessage[Any]: @property
result = UniMessage.text(f"[KonaPH#{self.index_id}] {self.title}") def hint_id_max(self) -> int:
result = result.text(f"\n\n{self.content}") return max((0, *self.hints.keys()))
if self.img_name: def check_submission(
result = result.text("\n\n").image(raw=self.get_image_path().read_bytes()) self,
submission: str,
result = result.text(f"\n\n出题者:{get_username(self.author_id)}") time: datetime.datetime | None = None,
result = result.text("\n\n输入「提交答案 答案」来提交你的解答") ) -> PuzzleSubmission:
if time is None:
return result time = datetime.datetime.now()
if submission == self.flag:
def add_image(self, img: bytes, suffix: str = ".png"): return PuzzleSubmission(
if self.img_name: success=True,
self.get_image_path().unlink(True) flag=submission,
img_id = nanoid.generate( time=time,
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz", )
21, for hint_id, hint in self.hints.items():
if re.match(hint.pattern, submission):
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
hint_id=hint_id,
)
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
) )
self.img_name = f"{img_id}{suffix}"
self.get_image_path().write_bytes(img)
def remove_image(self):
if self.img_name:
self.get_image_path().unlink(True)
self.img_name = ""
class PuzzleSubmission(BaseModel):
success: bool
flag: str
time: datetime.datetime
class DailyPuzzleInfo(BaseModel): class DailyPuzzleInfo(BaseModel):
@ -84,15 +80,10 @@ class DailyPuzzleInfo(BaseModel):
success_users: dict[str, datetime.datetime] = {} success_users: dict[str, datetime.datetime] = {}
class PuzzleSubmissionResultMessage(BaseModel): class PuzzleSubmissionFeedback(BaseModel):
success: bool submission: PuzzleSubmission
rank: int = -1 puzzle: Puzzle
message: str = "" info: DailyPuzzleInfo | None = None
def get_unimessage(self) -> UniMessage[Any]:
if self.success:
return UniMessage.text(f"🎉 恭喜你答对了!你是今天第 {self.rank} 个解出来的!")
return UniMessage.text(self.message)
def get_today_date() -> datetime.date: def get_today_date() -> datetime.date:
@ -162,26 +153,6 @@ class PuzzleManager(BaseModel):
self.index_id_counter += 1 self.index_id_counter += 1
def fix(self):
# 尝试修复今日的数据
for p in self.puzzle_data.values():
if self.is_puzzle_published(p.raw_id):
p.ready = True
if self.puzzle_pinned not in self.unpublished_puzzles:
self.puzzle_pinned = ""
# 撤回重复发布的题
already_published: set[str] = set()
for date in list(self.daily_puzzle_of_date.keys()):
index_id = self.daily_puzzle_of_date[date]
info = self.daily_puzzle[index_id]
if info.raw_id in already_published:
del self.daily_puzzle[index_id]
del self.daily_puzzle_of_date[date]
else:
already_published.add(info.raw_id)
def admin_pin_puzzle(self, raw_id: str): def admin_pin_puzzle(self, raw_id: str):
if raw_id in self.puzzle_data: if raw_id in self.puzzle_data:
self.puzzle_pinned = raw_id self.puzzle_pinned = raw_id
@ -213,41 +184,23 @@ class PuzzleManager(BaseModel):
return return
return self.daily_puzzle[p.index_id] return self.daily_puzzle[p.index_id]
def submit(self, user: str, flag: str) -> PuzzleSubmissionResultMessage: def submit(self, user: str, flag: str) -> PuzzleSubmissionFeedback | str:
p = self.get_today_puzzle() p = self.get_today_puzzle()
d = self.get_today_info() d = self.get_today_info()
now = datetime.datetime.now() now = datetime.datetime.now()
if p is None or d is None: if p is None or d is None:
return PuzzleSubmissionResultMessage( return "今天没有题哦,改天再来吧!"
success=False,
message="今天没有题哦,改天再来吧!",
)
if user in d.success_users: if user in d.success_users:
return PuzzleSubmissionResultMessage( return "你今天已经答对过啦!不用重复提交哦!"
success=False,
message="你今天已经答对过啦!不用重复提交哦!",
)
if flag != p.flag:
d.tried_users.add(user)
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
success=False,
flag=flag,
time=now,
))
return PuzzleSubmissionResultMessage(
success=False,
message="❌ 答错了,请检查你的答案哦",
)
d.tried_users.add(user) d.tried_users.add(user)
d.success_users[user] = now result = p.check_submission(flag, now)
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission( self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(result)
success=True, if result.success:
flag=flag, d.success_users[user] = now
time=now, return PuzzleSubmissionFeedback(
)) submission=result,
return PuzzleSubmissionResultMessage( puzzle=p,
success=True, info=d,
rank=len(d.success_users),
) )
def admin_create_puzzle(self, user: str): def admin_create_puzzle(self, user: str):
@ -273,33 +226,6 @@ class PuzzleManager(BaseModel):
if p.author_id == user if p.author_id == user
], key=lambda p: p.created_at, reverse=True) ], key=lambda p: p.created_at, reverse=True)
def get_report_yesterday(self):
yesterday = get_today_date() - datetime.timedelta(days=1)
index_id = self.daily_puzzle_of_date.get(yesterday)
if index_id is None:
return None
info = self.daily_puzzle[index_id]
puzzle = self.puzzle_data[info.raw_id]
message = UniMessage.text(f"[KonaPH#{index_id}] 「{puzzle.title}」解答报告")
if len(info.success_users) == 0:
message = message.text(
"\n\n昨日,竟无人解出此题!"
)
else:
message = message.text(
f"\n\n昨日,共有 {len(info.success_users)} 人解出此题。\n\n前五名的解答者:"
)
us = [(u, d) for u, d in info.success_users.items()]
us = sorted(us, key=lambda t: t[1])
us = us[:5]
for u, _ in us:
m = self.submissions[puzzle.raw_id][u][-1]
message = message.text(f"- {get_username(u)}{m.time.strftime('%H:%M')}")
message = message.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
return message
lock = asyncio.Lock() lock = asyncio.Lock()

View File

@ -1,15 +1,20 @@
import datetime import datetime
from math import ceil from math import ceil
from typing import Any
from nonebot import get_plugin_config from nonebot import get_plugin_config
from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, Subcommand, UniMessage, on_alconna from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query,
Subcommand, SubcommandResult, UniMessage, on_alconna)
from pydantic import BaseModel from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.extract_image import download_image_bytes from konabot.common.nb.extract_image import download_image_bytes
from konabot.common.nb.qq_broadcast import qq_broadcast from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.common.username import get_username from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.storage import Puzzle, PuzzleManager, get_today_date, puzzle_manager from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.message import (get_puzzle_description,
get_puzzle_info_message,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import get_today_date, puzzle_manager
PUZZLE_PAGE_SIZE = 10 PUZZLE_PAGE_SIZE = 10
@ -31,33 +36,6 @@ def is_puzzle_admin(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_admin return target.target_id in config.plugin_puzzle_admin
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
status_suffix = ""
if puzzle.raw_id == manager.puzzle_pinned:
status_suffix += " | 📌 已被管理员置顶"
msg = UniMessage.text(
f"--- 谜题信息 ---\n"
f"Raw ID: {puzzle.raw_id}\n"
f"出题者: {get_username(puzzle.author_id)} | {puzzle.author_id}\n"
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"状态: {status}{status_suffix}\n\n"
f"标题: {puzzle.title}\n"
f"Flag: {puzzle.flag}\n\n"
f"{puzzle.content}"
)
if puzzle.img_name:
msg = msg.image(raw=puzzle.get_image_path().read_bytes())
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
return msg
def create_admin_commands(): def create_admin_commands():
cmd_admin = on_alconna( cmd_admin = on_alconna(
Alconna( Alconna(
@ -82,7 +60,45 @@ def create_admin_commands():
), ),
Subcommand("publish", Args["raw_id?", str], dest="publish"), Subcommand("publish", Args["raw_id?", str], dest="publish"),
Subcommand("preview", Args["raw_id", str], dest="preview"), Subcommand("preview", Args["raw_id", str], dest="preview"),
Subcommand("get-submits", Args["raw_id", str], dest="get-submits") Subcommand("get-submits", Args["raw_id", str], dest="get-submits"),
Subcommand(
"test",
Args["raw_id", str],
Args["submission", str],
dest="test",
),
Subcommand(
"hint",
Subcommand(
"add",
Args["raw_id", str],
Args["pattern", str],
Args["hint", str],
dest="add",
),
Subcommand(
"list",
Args["raw_id", str],
Args["page?", int],
dest="get",
),
Subcommand(
"modify",
Args["raw_id", str],
Args["hint_id", int],
Option("--pattern", Args["pattern", str], alias=["-p"]),
Option("--hint", Args["hint", str], alias=["-h"]),
Option("--checkpoint", Args["is_checkpoint", bool], alias=["-c"]),
dest="modify",
),
Subcommand(
"delete",
Args["raw_id", str],
Args["hint_id", int],
dest="delete",
),
dest="hint",
),
), ),
rule=is_puzzle_manager, rule=is_puzzle_manager,
) )
@ -98,6 +114,8 @@ def create_admin_commands():
msg = msg.text("konaph modify - 查看如何修改谜题信息\n") msg = msg.text("konaph modify - 查看如何修改谜题信息\n")
msg = msg.text("konaph preview <id> - 预览一个题目的效果,不会展示答案\n") msg = msg.text("konaph preview <id> - 预览一个题目的效果,不会展示答案\n")
msg = msg.text("konaph get-submits <id> - 获得题目的提交记录\n") msg = msg.text("konaph get-submits <id> - 获得题目的提交记录\n")
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
if is_puzzle_admin(target): if is_puzzle_admin(target):
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n") msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
@ -293,6 +311,7 @@ def create_admin_commands():
" --image <图片> 图片\n" " --image <图片> 图片\n"
" --remove-image 删除图片" " --remove-image 删除图片"
) )
image_manager = get_image_manager()
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data: if raw_id not in manager.puzzle_data:
@ -312,9 +331,10 @@ def create_admin_commands():
) )
if image is not None and image.url is not None: if image is not None and image.url is not None:
b = await download_image_bytes(image.url) b = await download_image_bytes(image.url)
p.add_image(b.unwrap()) image_manager.remove_puzzle_image(p.img_name)
image_manager.upload_puzzle_image(b.unwrap())
elif remove_image.available: elif remove_image.available:
p.remove_image() image_manager.remove_puzzle_image(p.img_name)
info2 = get_puzzle_info_message(manager, p) info2 = get_puzzle_info_message(manager, p)
@ -332,7 +352,7 @@ def create_admin_commands():
p = manager.get_today_puzzle(strong=True) p = manager.get_today_puzzle(strong=True)
if p is None: if p is None:
return await target.send_message("上架失败了orz可能是没题了") return await target.send_message("上架失败了orz可能是没题了")
await qq_broadcast(config.plugin_puzzle_playgroup, p.get_unimessage()) await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(p))
return await target.send_message("Ok!") return await target.send_message("Ok!")
@cmd_admin.assign("preview") @cmd_admin.assign("preview")
@ -343,7 +363,7 @@ def create_admin_commands():
return await target.send_message("没有这个谜题") return await target.send_message("没有这个谜题")
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id: if not is_puzzle_admin(target) and target.target_id != puzzle.author_id:
return await target.send_message("你没有权限预览这个谜题") return await target.send_message("你没有权限预览这个谜题")
return await target.send_message(puzzle.get_unimessage()) return await target.send_message(get_puzzle_description(puzzle))
@cmd_admin.assign("get-submits") @cmd_admin.assign("get-submits")
async def _(target: DepLongTaskTarget, raw_id: str): async def _(target: DepLongTaskTarget, raw_id: str):
@ -361,5 +381,43 @@ def create_admin_commands():
msg = msg.text(f"- {get_username(uid)}{s}\n") msg = msg.text(f"- {get_username(uid)}{s}\n")
return await target.send_message(msg) return await target.send_message(msg)
@cmd_admin.assign("test")
async def _(target: DepLongTaskTarget, raw_id: str, submission: str):
"""
测试一道谜题的回答,并给出结果
"""
async with puzzle_manager() as manager:
puzzle = manager.puzzle_data.get(raw_id)
if puzzle is None:
return await target.send_message("没有这个谜题")
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id:
return await target.send_message("你没有权限预览这个谜题")
result = puzzle.check_submission(submission)
msg = get_submission_message(puzzle, result)
return await target.send_message(
UniMessage.text("[测试提交] ") + msg
)
@cmd_admin.assign("subcommands.hint")
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")):
if len(subcommands.result.subcommands) > 0:
return
return await target.send_message(
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n")
.text("- konaph hint modify <id> <hint_id>\n")
.text(" - --pattern <pattern>\n - 更改匹配规则\n")
.text(" - --hint <hint>\n - 更改提示文本\n")
.text(" - --checkpoint [True|False]\n - 更改是否为中间答案\n")
.text("- konaph hint delete <id> <hint_id>\n - 删除一个提示 / 中间答案\n")
.text("\n更多关于 pattern 和中间答案的信息,请见 man中间答案(7)")
)
@cmd_admin.assign("hint.add")
async def _(target: DepLongTaskTarget):
await target.send_message("114514")
return cmd_admin return cmd_admin

View File

@ -1,4 +1,3 @@
from curses.ascii import isdigit
from pathlib import Path from pathlib import Path
import nonebot import nonebot
@ -40,7 +39,10 @@ async def _(
doc: str | None, doc: str | None,
event: nonebot.adapters.Event, event: nonebot.adapters.Event,
): ):
if doc is not None and section is None and all(isdigit(c) for c in doc): if doc is not None and section is None and all(
ord('0') <= ord(c) <= ord('9')
for c in doc
):
section = int(doc) section = int(doc)
doc = None doc = None