Compare commits

...

4 Commits

Author SHA1 Message Date
0231aa04f4 添加中间答案功能
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-01 18:29:10 +08:00
01fe33eb9f 部分解耦了 konaph 的一些层 2025-11-01 17:52:05 +08:00
adfbac7d90 支持正义 utf-8 2025-11-01 13:48:48 +08:00
994c1412da 为 Watchfiles 添加更可配置的过滤器
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-01 12:40:01 +08:00
19 changed files with 539 additions and 293 deletions

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"python.REPL.enableREPLSmartSend": false
}

View File

@ -68,7 +68,7 @@ code .
使用命令行手动启动 Bot
```bash
poetry run watchfiles bot.main konabot
poetry run watchfiles bot.main . --filter scripts.watch_filter.filter
```
如果你不希望自动重载,只是想运行 Bot可以直接运行

View File

@ -19,12 +19,12 @@ class DataManager(Generic[T]):
if not self.fp.exists():
return self.cls()
try:
return self.cls.model_validate_json(self.fp.read_text())
return self.cls.model_validate_json(self.fp.read_text("utf-8"))
except ValidationError:
return self.cls()
def save(self, data: T):
self.fp.write_text(data.model_dump_json())
self.fp.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager
async def get_data(self):

View File

@ -240,7 +240,7 @@ def handle_long_task(callback_id: str):
def _load_longtask_data() -> LongTaskModuleData:
try:
txt = LONGTASK_DATA_DIR.read_text()
txt = LONGTASK_DATA_DIR.read_text("utf-8")
return LongTaskModuleData.model_validate_json(txt)
except (FileNotFoundError, ValidationError) as e:
logger.info(f"取得 LongTask 数据时出现问题:{e}")
@ -251,7 +251,7 @@ def _load_longtask_data() -> LongTaskModuleData:
def _save_longtask_data(data: LongTaskModuleData):
LONGTASK_DATA_DIR.write_text(data.model_dump_json())
LONGTASK_DATA_DIR.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager

View File

@ -1,10 +1,13 @@
from typing import Any, cast
import nonebot
from nonebot_plugin_alconna import UniMessage
from nonebot.adapters.onebot.v11 import Bot as OBBot
from nonebot_plugin_alconna import UniMessage
async def qq_broadcast(groups: list[str], msg: UniMessage[Any]):
async def qq_broadcast(groups: list[str], msg: UniMessage[Any] | str):
if isinstance(msg, str):
msg = UniMessage.text(msg)
bots: dict[str, OBBot] = {}
# group_id -> bot_id

View File

@ -0,0 +1,11 @@
关于「中间答案」或者「提示」:
在 KonaPH 中,当有人发送「提交答案 答案」时,会检查答案是否符合你设置的中间答案的 pattern。这个 pattern 可以有两种方式:
- 纯文本的完整匹配:你设置的 pattern 如果和提交的答案完全相等,则会触发提示。
- regex 匹配:你设置的 pattern 如果以斜杠(/)开头和结尾,就会检查提交的答案是否匹配正则表达式。注意 ^ 和 $ 符号的使用。
- 例如:/^commit$/ 会匹配 commit不会匹配 acommit、Commit 等。
- 而如果是 /commit/,则会匹配 commit、acommit而不会匹配 Commit。
- 无法使用 Javascript 的字符串声明模式,例如,/case_insensitive/i 就不会被视作一个正则表达式。
一个提示是提示,还是中间答案,取决于它是否有 checkpoint 标记。如果有 checkpoint 标记,则会提示用户「你回答了一个中间答案」,并且这个中间答案的回答会在排行榜中显示。

View File

@ -30,7 +30,7 @@ def load_banned_ids() -> list[str]:
if not DATA_FILE_PATH.exists():
return []
try:
return json.loads(DATA_FILE_PATH.read_text())
return json.loads(DATA_FILE_PATH.read_text("utf-8"))
except Exception as e:
logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}")
return []
@ -45,14 +45,14 @@ def add_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id not in banned_ids:
banned_ids.append(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
def remove_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id in banned_ids:
banned_ids.remove(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
class TryStartState(Enum):

View File

@ -1,17 +1,23 @@
from functools import reduce
from math import ceil
import datetime
import re
from math import ceil
from loguru import logger
from nonebot import on_message
from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager
from konabot.common.longtask import DepLongTaskTarget
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
on_alconna)
from nonebot_plugin_apscheduler import scheduler
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.plugins.kona_ph.core.message import (get_daily_report,
get_daily_report_v2,
get_puzzle_description,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE, config,
create_admin_commands,
puzzle_manager)
create_admin_commands()
@ -24,18 +30,6 @@ async def is_play_group(target: DepLongTaskTarget):
return False
# cmd_submit = on_alconna(Alconna(
# "re:提交(?:答案|题解|[fF]lag)",
# Args["flag", str],
# ), rule=is_play_group)
#
# @cmd_submit.handle()
# async def _(flag: str, target: DepLongTaskTarget):
# async with puzzle_manager() as manager:
# result = manager.submit(target.target_id, flag)
# await target.send_message(result.get_unimessage())
cmd_submit = on_message(rule=is_play_group)
@ -46,7 +40,14 @@ async def _(msg: UniMsg, target: DepLongTaskTarget):
submission: str = match.group("submission")
async with puzzle_manager() as manager:
result = manager.submit(target.target_id, submission)
await target.send_message(result.get_unimessage())
if isinstance(result, str):
await target.send_message(result)
else:
await target.send_message(get_submission_message(
daily_puzzle_info=result.info,
submission=result.submission,
puzzle=result.puzzle,
))
cmd_query = on_alconna(Alconna(
@ -59,7 +60,7 @@ async def _(target: DepLongTaskTarget):
p = manager.get_today_puzzle()
if p is None:
return await target.send_message("今天无题,改日再来吧!")
await target.send_message(p.get_unimessage())
await target.send_message(get_puzzle_description(p))
cmd_query_submission = on_alconna(Alconna(
@ -68,44 +69,11 @@ cmd_query_submission = on_alconna(Alconna(
@cmd_query_submission.handle()
async def _(target: DepLongTaskTarget):
gid = None
if re.match(r"^\d+$", target.channel_id):
gid = int(target.channel_id)
async with puzzle_manager() as manager:
p = manager.get_today_puzzle()
if p is None:
return await target.send_message("今天无题")
msg = UniMessage.text("==== 今日答题情况 ====\n\n")
subcount = len(reduce(
lambda x, y: x + y,
manager.submissions.get(p.raw_id, {}).values(),
[],
))
info = manager.daily_puzzle[p.index_id]
msg = msg.text(
f"总体情况:答对 {len(info.success_users)} / "
f"参与 {len(info.tried_users)} / "
f"提交 {subcount}\n"
)
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
gid = None
if re.match(r"^\d+$", target.channel_id):
gid = int(target.channel_id)
for u, d in success_users:
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
t = d.strftime("%H:%M")
tries = len(manager.submissions[p.raw_id][u])
msg = msg.text(f"\n- {uname} [🎉 {t} 完成 | {tries} 提交]")
for u in info.tried_users - set(info.success_users.keys()):
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
tries = len(manager.submissions[p.raw_id][u])
msg = msg.text(f"\n- {uname} [💦 {tries} 提交]")
await target.send_message(msg)
await target.send_message(get_daily_report_v2(manager, gid))
cmd_history = on_alconna(Alconna(
@ -120,15 +88,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
today = get_today_date()
if index_id:
index_id = index_id.removeprefix("#")
if index_id == manager.daily_puzzle_of_date.get(today, ""):
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
return await target.send_message(puzzle.get_unimessage())
if index_id in manager.daily_puzzle:
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
msg = puzzle.get_unimessage()
msg = msg.text(f"\n\n------\n\n题解:{puzzle.flag}")
return await target.send_message(msg)
return await target.send_message("没有这道题哦")
if index_id not in manager.daily_puzzle:
return await target.send_message("没有这道题哦")
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
msg = get_puzzle_description(
puzzle,
with_answer=(index_id != manager.daily_puzzle_of_date.get(today, "")),
)
return await target.send_message(msg)
msg = UniMessage.text("====== 历史题目清单 ======\n\n")
puzzles = [
(manager.puzzle_data[manager.daily_puzzle[i].raw_id], d)
@ -155,15 +122,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
@scheduler.scheduled_job("cron", hour="8")
async def _():
async with puzzle_manager() as manager:
msg2 = manager.get_report_yesterday()
yesterday = get_today_date() - datetime.timedelta(days=1)
msg2 = get_daily_report(manager, yesterday)
if msg2 is not None:
await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
puzzle = manager.get_today_puzzle()
if puzzle is not None:
logger.info(f"找到了题目 {puzzle.raw_id},发送")
await qq_broadcast(config.plugin_puzzle_playgroup, puzzle.get_unimessage())
await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(puzzle))
else:
logger.info("自动任务:没有找到题目,跳过")

View File

@ -0,0 +1,29 @@
import nanoid
from konabot.common.path import ASSETS_PATH
from konabot.plugins.kona_ph.core.path import KONAPH_IMAGE_BASE
class PuzzleImageManager:
def read_puzzle_image(self, img_name: str) -> bytes:
fp = KONAPH_IMAGE_BASE / img_name
if fp.exists():
return fp.read_bytes()
return (ASSETS_PATH / "img" / "other" / "boom.jpg").read_bytes()
def upload_puzzle_image(self, data: bytes, suffix: str = ".png") -> str:
id = nanoid.generate(
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
21,
)
img_name = f"{id}{suffix}"
(KONAPH_IMAGE_BASE / img_name).write_bytes(data)
return img_name
def remove_puzzle_image(self, img_name: str):
if img_name:
(KONAPH_IMAGE_BASE / img_name).unlink(True)
def get_image_manager() -> PuzzleImageManager:
return PuzzleImageManager()

View File

@ -0,0 +1,176 @@
"""
生成各种各样的 Message 的函数集合
"""
import datetime
import functools
import re
from typing import Any
from nonebot_plugin_alconna import UniMessage
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.storage import (DailyPuzzleInfo, Puzzle,
PuzzleManager,
PuzzleSubmission)
def get_puzzle_description(puzzle: Puzzle, with_answer: bool = False) -> UniMessage[Any]:
"""
获取一个谜题的描述
"""
img_manager = get_image_manager()
result = UniMessage.text(f"[KonaPH#{puzzle.index_id}] {puzzle.title}")
result = result.text(f"\n\n{puzzle.content}")
if puzzle.img_name:
result = result.text("\n\n").image(
raw=img_manager.read_puzzle_image(puzzle.img_name)
)
result = result.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
if with_answer:
result = result.text(f"\n\n题目答案:{puzzle.flag}")
else:
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
return result
def get_submission_message(
puzzle: Puzzle,
submission: PuzzleSubmission,
daily_puzzle_info: DailyPuzzleInfo | None = None,
) -> str:
"""
获得提交答案的反馈信息
"""
if submission.success:
rank = -1
if daily_puzzle_info is not None:
rank = len(daily_puzzle_info.success_users)
return f"🎉 恭喜你答对了!你是今天第 {rank} 个解出来的!"
if submission.hint_id >= 0 and (
hint := puzzle.hints.get(submission.hint_id)
) is not None:
if hint.is_checkpoint:
hint_msg = "✨ 恭喜!这是本题的中间答案,加油!"
else:
hint_msg = "🤔 答错啦!请检查你的答案。"
return f"{hint_msg}\n\n提示:{hint.message}"
return "❌ 答错啦!请检查你的答案。"
def get_daily_report(
manager: PuzzleManager,
date: datetime.date,
) -> str | None:
"""
获得某日的提交的报告信息
"""
index_id = manager.daily_puzzle_of_date.get(date)
if index_id is None:
return None
info = manager.daily_puzzle[index_id]
puzzle = manager.puzzle_data[info.raw_id]
msg = f"[KonaPH#{puzzle.index_id}] 「{puzzle.title}」解答报告\n\n"
if len(info.success_users) == 0:
msg += "昨日,无人解出此题 😭😭\n\n"
else:
msg += f"昨日,共有 {len(info.success_users)} 人解出此题。\n\n"
msg += "前五名的解答者:\n\n"
us = [(u, d) for u, d in info.success_users.items()]
us = sorted(us, key=lambda t: t[1])
us = us[:5]
for u, _ in us:
m = manager.submissions[puzzle.raw_id][u][-1]
msg += f"- {get_username(u)}{m.time.strftime('%H:%M')}\n"
msg += "\n"
msg += f"出题人:{get_username(puzzle.author_id)}"
return msg
def get_daily_report_v2(manager: PuzzleManager, gid: int | None = None):
p = manager.get_today_puzzle()
if p is None:
return "今天无题"
msg = "==== 今日答题情况 ====\n\n"
subcount = len(functools.reduce(
lambda x, y: x + y,
manager.submissions.get(p.raw_id, {}).values(),
[],
))
info = manager.daily_puzzle[p.index_id]
msg += (
f"总体情况:答对 {len(info.success_users)} / "
f"参与 {len(info.tried_users)} / "
f"提交 {subcount}\n"
)
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
for u, d in success_users:
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
t = d.strftime("%H:%M")
tries = len(manager.submissions[p.raw_id][u])
msg += f"\n- {uname} [🎉 {t} 完成 | {tries} 提交]"
for u in info.tried_users - set(info.success_users.keys()):
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
tries = len(manager.submissions[p.raw_id][u])
msg += f"\n- {uname} [💦 {tries} 提交]"
return msg
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
image_manager = get_image_manager()
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
status_suffix = ""
if puzzle.raw_id == manager.puzzle_pinned:
status_suffix += " | 📌 已被管理员置顶"
msg = UniMessage.text(
f"--- 谜题信息 ---\n"
f"Raw ID: {puzzle.raw_id}\n"
f"出题者: {get_username(puzzle.author_id)} | {puzzle.author_id}\n"
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"状态: {status}{status_suffix}\n\n"
f"标题: {puzzle.title}\n"
f"Flag: {puzzle.flag}\n\n"
f"{puzzle.content}"
)
if puzzle.img_name:
msg = msg.image(raw=image_manager.read_puzzle_image(puzzle.img_name))
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
return msg
def get_puzzle_hint_list(puzzle: Puzzle) -> str:
msg = f"==== {puzzle.title} 提示与中间答案 ====\n"
if len(puzzle.hints) == 0:
msg += "\n你没有添加任何中间答案。"
return msg
for hint_id, hint in puzzle.hints.items():
n = {False: "[提示]", True: "[中间答案]"}[hint.is_checkpoint]
msg += f"\n{n}[{hint_id}] {hint.pattern}"
msg += f"\n {hint.message}"
return msg

View File

@ -0,0 +1,9 @@
from konabot.common.path import DATA_PATH
KONAPH_BASE = DATA_PATH / "KonaPH"
KONAPH_DATA_JSON = KONAPH_BASE / "data.json"
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs"
# 保证所有文件夹存在
KONAPH_BASE.mkdir(exist_ok=True)
KONAPH_IMAGE_BASE.mkdir(exist_ok=True)

View File

@ -1,27 +1,26 @@
import asyncio
import datetime
import random
import re
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
import nanoid
from nonebot_plugin_alconna import UniMessage
from pydantic import BaseModel, Field, ValidationError
from konabot.common.path import DATA_PATH
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.path import KONAPH_DATA_JSON
KONAPH_BASE = DATA_PATH / "KonaPH"
KONAPH_DATA_JSON = KONAPH_BASE / "data.json"
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs"
class PuzzleHint(BaseModel):
pattern: str
message: str
is_checkpoint: bool
# 保证所有文件夹存在
KONAPH_BASE.mkdir(exist_ok=True)
KONAPH_IMAGE_BASE.mkdir(exist_ok=True)
class PuzzleSubmission(BaseModel):
success: bool
flag: str
time: datetime.datetime
hint_id: int = -1
class Puzzle(BaseModel):
@ -40,41 +39,47 @@ class Puzzle(BaseModel):
ready: bool = False
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
def get_image_path(self) -> Path:
return KONAPH_IMAGE_BASE / self.img_name
hints: dict[int, PuzzleHint] = Field(default_factory=dict)
def get_unimessage(self) -> UniMessage[Any]:
result = UniMessage.text(f"[KonaPH#{self.index_id}] {self.title}")
result = result.text(f"\n\n{self.content}")
@property
def hint_id_max(self) -> int:
return max((0, *self.hints.keys()))
if self.img_name:
result = result.text("\n\n").image(raw=self.get_image_path().read_bytes())
result = result.text(f"\n\n出题者:{get_username(self.author_id)}")
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
return result
def add_image(self, img: bytes, suffix: str = ".png"):
if self.img_name:
self.get_image_path().unlink(True)
img_id = nanoid.generate(
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
21,
def check_submission(
self,
submission: str,
time: datetime.datetime | None = None,
) -> PuzzleSubmission:
if time is None:
time = datetime.datetime.now()
if submission == self.flag:
return PuzzleSubmission(
success=True,
flag=submission,
time=time,
)
for hint_id, hint in self.hints.items():
if hint.pattern.startswith('/') and hint.pattern.endswith('/'):
if re.match(hint.pattern.strip('/'), submission):
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
hint_id=hint_id,
)
else:
if hint.pattern == submission:
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
hint_id=hint_id,
)
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
)
self.img_name = f"{img_id}{suffix}"
self.get_image_path().write_bytes(img)
def remove_image(self):
if self.img_name:
self.get_image_path().unlink(True)
self.img_name = ""
class PuzzleSubmission(BaseModel):
success: bool
flag: str
time: datetime.datetime
class DailyPuzzleInfo(BaseModel):
@ -84,15 +89,10 @@ class DailyPuzzleInfo(BaseModel):
success_users: dict[str, datetime.datetime] = {}
class PuzzleSubmissionResultMessage(BaseModel):
success: bool
rank: int = -1
message: str = ""
def get_unimessage(self) -> UniMessage[Any]:
if self.success:
return UniMessage.text(f"🎉 恭喜你答对了!你是今天第 {self.rank} 个解出来的!")
return UniMessage.text(self.message)
class PuzzleSubmissionFeedback(BaseModel):
submission: PuzzleSubmission
puzzle: Puzzle
info: DailyPuzzleInfo | None = None
def get_today_date() -> datetime.date:
@ -162,26 +162,6 @@ class PuzzleManager(BaseModel):
self.index_id_counter += 1
def fix(self):
# 尝试修复今日的数据
for p in self.puzzle_data.values():
if self.is_puzzle_published(p.raw_id):
p.ready = True
if self.puzzle_pinned not in self.unpublished_puzzles:
self.puzzle_pinned = ""
# 撤回重复发布的题
already_published: set[str] = set()
for date in list(self.daily_puzzle_of_date.keys()):
index_id = self.daily_puzzle_of_date[date]
info = self.daily_puzzle[index_id]
if info.raw_id in already_published:
del self.daily_puzzle[index_id]
del self.daily_puzzle_of_date[date]
else:
already_published.add(info.raw_id)
def admin_pin_puzzle(self, raw_id: str):
if raw_id in self.puzzle_data:
self.puzzle_pinned = raw_id
@ -213,41 +193,23 @@ class PuzzleManager(BaseModel):
return
return self.daily_puzzle[p.index_id]
def submit(self, user: str, flag: str) -> PuzzleSubmissionResultMessage:
def submit(self, user: str, flag: str) -> PuzzleSubmissionFeedback | str:
p = self.get_today_puzzle()
d = self.get_today_info()
now = datetime.datetime.now()
if p is None or d is None:
return PuzzleSubmissionResultMessage(
success=False,
message="今天没有题哦,改天再来吧!",
)
return "今天没有题哦,改天再来吧!"
if user in d.success_users:
return PuzzleSubmissionResultMessage(
success=False,
message="你今天已经答对过啦!不用重复提交哦!",
)
if flag != p.flag:
d.tried_users.add(user)
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
success=False,
flag=flag,
time=now,
))
return PuzzleSubmissionResultMessage(
success=False,
message="❌ 答错了,请检查你的答案哦",
)
return "你今天已经答对过啦!不用重复提交哦!"
d.tried_users.add(user)
d.success_users[user] = now
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
success=True,
flag=flag,
time=now,
))
return PuzzleSubmissionResultMessage(
success=True,
rank=len(d.success_users),
result = p.check_submission(flag, now)
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(result)
if result.success:
d.success_users[user] = now
return PuzzleSubmissionFeedback(
submission=result,
puzzle=p,
info=d,
)
def admin_create_puzzle(self, user: str):
@ -273,47 +235,20 @@ class PuzzleManager(BaseModel):
if p.author_id == user
], key=lambda p: p.created_at, reverse=True)
def get_report_yesterday(self):
yesterday = get_today_date() - datetime.timedelta(days=1)
index_id = self.daily_puzzle_of_date.get(yesterday)
if index_id is None:
return None
info = self.daily_puzzle[index_id]
puzzle = self.puzzle_data[info.raw_id]
message = UniMessage.text(f"[KonaPH#{index_id}] 「{puzzle.title}」解答报告")
if len(info.success_users) == 0:
message = message.text(
"\n\n昨日,竟无人解出此题!"
)
else:
message = message.text(
f"\n\n昨日,共有 {len(info.success_users)} 人解出此题。\n\n前五名的解答者:"
)
us = [(u, d) for u, d in info.success_users.items()]
us = sorted(us, key=lambda t: t[1])
us = us[:5]
for u, _ in us:
m = self.submissions[puzzle.raw_id][u][-1]
message = message.text(f"- {get_username(u)}{m.time.strftime('%H:%M')}")
message = message.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
return message
lock = asyncio.Lock()
def read_data():
try:
data_raw = KONAPH_DATA_JSON.read_text()
data_raw = KONAPH_DATA_JSON.read_text("utf-8")
return PuzzleManager.model_validate_json(data_raw)
except (FileNotFoundError, ValidationError):
return PuzzleManager()
def write_data(data: PuzzleManager):
KONAPH_DATA_JSON.write_text(data.model_dump_json())
KONAPH_DATA_JSON.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager

View File

@ -1,15 +1,24 @@
import datetime
from math import ceil
from typing import Any
from nonebot import get_plugin_config
from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, Subcommand, UniMessage, on_alconna
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query,
Subcommand, SubcommandResult, UniMessage,
on_alconna)
from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import download_image_bytes
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.storage import Puzzle, PuzzleManager, get_today_date, puzzle_manager
from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list,
get_puzzle_info_message,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager,
get_today_date,
puzzle_manager)
PUZZLE_PAGE_SIZE = 10
@ -31,31 +40,15 @@ def is_puzzle_admin(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_admin
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
status_suffix = ""
if puzzle.raw_id == manager.puzzle_pinned:
status_suffix += " | 📌 已被管理员置顶"
msg = UniMessage.text(
f"--- 谜题信息 ---\n"
f"Raw ID: {puzzle.raw_id}\n"
f"出题者: {get_username(puzzle.author_id)} | {puzzle.author_id}\n"
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"状态: {status}{status_suffix}\n\n"
f"标题: {puzzle.title}\n"
f"Flag: {puzzle.flag}\n\n"
f"{puzzle.content}"
)
if puzzle.img_name:
msg = msg.image(raw=puzzle.get_image_path().read_bytes())
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
return msg
def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle:
if raw_id not in manager.puzzle_data:
raise BotExceptionMessage("没有这个谜题")
puzzle = manager.puzzle_data[raw_id]
if is_puzzle_admin(target):
return puzzle
if target.target_id != puzzle.author_id:
raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
return puzzle
def create_admin_commands():
@ -82,7 +75,45 @@ def create_admin_commands():
),
Subcommand("publish", Args["raw_id?", str], dest="publish"),
Subcommand("preview", Args["raw_id", str], dest="preview"),
Subcommand("get-submits", Args["raw_id", str], dest="get-submits")
Subcommand("get-submits", Args["raw_id", str], dest="get-submits"),
Subcommand(
"test",
Args["raw_id", str],
Args["submission", str],
dest="test",
),
Subcommand(
"hint",
Subcommand(
"add",
Args["raw_id", str],
Args["pattern", str],
Args["message", str],
dest="add",
),
Subcommand(
"list",
Args["raw_id", str],
Args["page?", int],
dest="list",
),
Subcommand(
"modify",
Args["raw_id", str],
Args["hint_id", int],
Option("--pattern", Args["pattern", str], alias=["-p"]),
Option("--message", Args["message", str], alias=["-m"]),
Option("--checkpoint", Args["is_checkpoint", bool], alias=["-c"]),
dest="modify",
),
Subcommand(
"delete",
Args["raw_id", str],
Args["hint_id", int],
dest="delete",
),
dest="hint",
),
),
rule=is_puzzle_manager,
)
@ -98,6 +129,8 @@ def create_admin_commands():
msg = msg.text("konaph modify - 查看如何修改谜题信息\n")
msg = msg.text("konaph preview <id> - 预览一个题目的效果,不会展示答案\n")
msg = msg.text("konaph get-submits <id> - 获得题目的提交记录\n")
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
if is_puzzle_admin(target):
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
@ -122,15 +155,7 @@ def create_admin_commands():
@cmd_admin.assign("ready")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
))
p = check_puzzle(manager, target, raw_id)
if p.ready:
return await target.send_message(UniMessage.text(
"题目早就准备好啦!"
@ -143,15 +168,7 @@ def create_admin_commands():
@cmd_admin.assign("unready")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
))
p = check_puzzle(manager, target, raw_id)
if not p.ready:
return await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经是未取消状态了!"
@ -169,16 +186,7 @@ def create_admin_commands():
@cmd_admin.assign("info")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限查看详细信息!"
))
p = check_puzzle(manager, target, raw_id)
await target.send_message(get_puzzle_info_message(manager, p))
@cmd_admin.assign("my")
@ -214,7 +222,9 @@ def create_admin_commands():
@cmd_admin.assign("all")
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text("你没有权限查看所有的哦"))
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async with puzzle_manager() as manager:
puzzles = [*manager.puzzle_data.values()]
if ready.available:
@ -293,13 +303,10 @@ def create_admin_commands():
" --image <图片> 图片\n"
" --remove-image 删除图片"
)
image_manager = get_image_manager()
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message("没有这个谜题")
p = manager.puzzle_data[raw_id]
if not is_puzzle_admin(target) and target.target_id != p.author_id:
return await target.send_message("你没有权限编辑这个谜题")
p = check_puzzle(manager, target, raw_id)
if title is not None:
p.title = title
if description is not None:
@ -312,9 +319,10 @@ def create_admin_commands():
)
if image is not None and image.url is not None:
b = await download_image_bytes(image.url)
p.add_image(b.unwrap())
image_manager.remove_puzzle_image(p.img_name)
image_manager.upload_puzzle_image(b.unwrap())
elif remove_image.available:
p.remove_image()
image_manager.remove_puzzle_image(p.img_name)
info2 = get_puzzle_info_message(manager, p)
@ -322,6 +330,10 @@ def create_admin_commands():
@cmd_admin.assign("publish")
async def _(target: DepLongTaskTarget, raw_id: str | None = None):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
today = get_today_date()
async with puzzle_manager() as manager:
if today in manager.daily_puzzle_of_date:
@ -332,18 +344,14 @@ def create_admin_commands():
p = manager.get_today_puzzle(strong=True)
if p is None:
return await target.send_message("上架失败了orz可能是没题了")
await qq_broadcast(config.plugin_puzzle_playgroup, p.get_unimessage())
await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(p))
return await target.send_message("Ok!")
@cmd_admin.assign("preview")
async def _(target: DepLongTaskTarget, raw_id: str):
async with puzzle_manager() as manager:
puzzle = manager.puzzle_data.get(raw_id)
if puzzle is None:
return await target.send_message("没有这个谜题")
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id:
return await target.send_message("你没有权限预览这个谜题")
return await target.send_message(puzzle.get_unimessage())
p = check_puzzle(manager, target, raw_id)
return await target.send_message(get_puzzle_description(p))
@cmd_admin.assign("get-submits")
async def _(target: DepLongTaskTarget, raw_id: str):
@ -361,5 +369,96 @@ def create_admin_commands():
msg = msg.text(f"- {get_username(uid)}{s}\n")
return await target.send_message(msg)
@cmd_admin.assign("test")
async def _(target: DepLongTaskTarget, raw_id: str, submission: str):
"""
测试一道谜题的回答,并给出结果
"""
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
result = p.check_submission(submission)
msg = get_submission_message(p, result)
return await target.send_message("[测试提交] " + msg)
@cmd_admin.assign("subcommands.hint")
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")):
if len(subcommands.result.subcommands) > 0:
return
return await target.send_message(
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n")
.text("- konaph hint modify <id> <hint_id>\n")
.text(" - --pattern <pattern>\n - 更改匹配规则\n")
.text(" - --message <message>\n - 更改提示文本\n")
.text(" - --checkpoint [True|False]\n - 更改是否为中间答案\n")
.text("- konaph hint delete <id> <hint_id>\n - 删除一个提示 / 中间答案\n")
.text("\n更多关于 pattern 和中间答案的信息,请见 man中间答案(7)")
)
@cmd_admin.assign("subcommands.hint.add")
async def _(
target: DepLongTaskTarget,
raw_id: str,
pattern: str,
message: str,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p.hints[p.hint_id_max + 1] = PuzzleHint(
pattern=pattern,
message=message,
is_checkpoint=False,
)
await target.send_message("创建成功!\n\n" + get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.list")
async def _(
target: DepLongTaskTarget,
raw_id: str,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
await target.send_message(get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.modify")
async def _(
target: DepLongTaskTarget,
raw_id: str,
hint_id: int,
pattern: str | None = None,
message: str | None = None,
is_checkpoint: bool | None = None,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
if hint_id not in p.hints:
raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
)
hint = p.hints[hint_id]
if pattern is not None:
hint.pattern = pattern
if message is not None:
hint.message = message
if is_checkpoint is not None:
hint.is_checkpoint = is_checkpoint
await target.send_message("更改成功!\n\n" + get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.delete")
async def _(
target: DepLongTaskTarget,
raw_id: str,
hint_id: int,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
if hint_id not in p.hints:
raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
)
del p.hints[hint_id]
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
return cmd_admin

View File

@ -1,4 +1,3 @@
from curses.ascii import isdigit
from pathlib import Path
import nonebot
@ -40,7 +39,10 @@ async def _(
doc: str | None,
event: nonebot.adapters.Event,
):
if doc is not None and section is None and all(isdigit(c) for c in doc):
if doc is not None and section is None and all(
ord('0') <= ord(c) <= ord('9')
for c in doc
):
section = int(doc)
doc = None

View File

@ -15,7 +15,7 @@ if not POLL_DATA_FILE.exists():
POLL_DATA_FILE.write_bytes(POLL_TEMPLATE_FILE.read_bytes())
poll_list = json.loads(POLL_DATA_FILE.read_text())['poll']
poll_list = json.loads(POLL_DATA_FILE.read_text("utf-8"))['poll']
async def createpoll(title,qqid,options):
polllength = len(poll_list)
@ -53,7 +53,7 @@ def writeback():
# json.dump({'poll':poll_list},file,ensure_ascii=False,sort_keys=True)
POLL_DATA_FILE.write_text(json.dumps({
'poll': poll_list,
}, ensure_ascii=False, sort_keys=True))
}, ensure_ascii=False, sort_keys=True), "utf-8")
async def pollvote(polnum,optionnum,qqnum):
optiond = poll_list[polnum]["polldata"]

View File

@ -59,14 +59,14 @@ def load_notify_config() -> NotifyConfigFile:
if not DATA_FILE_PATH.exists():
return NotifyConfigFile()
try:
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text())
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text("utf-8"))
except Exception as e:
logger.warning(f"在解析 Notify 时遇到问题:{e}")
return NotifyConfigFile()
def save_notify_config(config: NotifyConfigFile):
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4))
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4), "utf-8")
@evt.handle()

13
scripts/watch_filter.py Normal file
View File

@ -0,0 +1,13 @@
from pathlib import Path
from watchfiles import Change
base = Path(__file__).parent.parent.absolute()
def filter(change: Change, path: str) -> bool:
if "__pycache__" in path:
return False
if Path(path).absolute().is_relative_to(base / "data"):
return False
return True