Compare commits

..

6 Commits

Author SHA1 Message Date
9320815d3f 修复无法更改图片的问题
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-01 20:59:58 +08:00
795300cb83 在每日答题情况添加记录点显示
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-01 18:42:47 +08:00
0231aa04f4 添加中间答案功能
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-01 18:29:10 +08:00
01fe33eb9f 部分解耦了 konaph 的一些层 2025-11-01 17:52:05 +08:00
adfbac7d90 支持正义 utf-8 2025-11-01 13:48:48 +08:00
994c1412da 为 Watchfiles 添加更可配置的过滤器
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-01 12:40:01 +08:00
19 changed files with 550 additions and 293 deletions

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"python.REPL.enableREPLSmartSend": false
}

View File

@ -68,7 +68,7 @@ code .
使用命令行手动启动 Bot 使用命令行手动启动 Bot
```bash ```bash
poetry run watchfiles bot.main konabot poetry run watchfiles bot.main . --filter scripts.watch_filter.filter
``` ```
如果你不希望自动重载,只是想运行 Bot可以直接运行 如果你不希望自动重载,只是想运行 Bot可以直接运行

View File

@ -19,12 +19,12 @@ class DataManager(Generic[T]):
if not self.fp.exists(): if not self.fp.exists():
return self.cls() return self.cls()
try: try:
return self.cls.model_validate_json(self.fp.read_text()) return self.cls.model_validate_json(self.fp.read_text("utf-8"))
except ValidationError: except ValidationError:
return self.cls() return self.cls()
def save(self, data: T): def save(self, data: T):
self.fp.write_text(data.model_dump_json()) self.fp.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager @asynccontextmanager
async def get_data(self): async def get_data(self):

View File

@ -240,7 +240,7 @@ def handle_long_task(callback_id: str):
def _load_longtask_data() -> LongTaskModuleData: def _load_longtask_data() -> LongTaskModuleData:
try: try:
txt = LONGTASK_DATA_DIR.read_text() txt = LONGTASK_DATA_DIR.read_text("utf-8")
return LongTaskModuleData.model_validate_json(txt) return LongTaskModuleData.model_validate_json(txt)
except (FileNotFoundError, ValidationError) as e: except (FileNotFoundError, ValidationError) as e:
logger.info(f"取得 LongTask 数据时出现问题:{e}") logger.info(f"取得 LongTask 数据时出现问题:{e}")
@ -251,7 +251,7 @@ def _load_longtask_data() -> LongTaskModuleData:
def _save_longtask_data(data: LongTaskModuleData): def _save_longtask_data(data: LongTaskModuleData):
LONGTASK_DATA_DIR.write_text(data.model_dump_json()) LONGTASK_DATA_DIR.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager @asynccontextmanager

View File

@ -1,10 +1,13 @@
from typing import Any, cast from typing import Any, cast
import nonebot import nonebot
from nonebot_plugin_alconna import UniMessage
from nonebot.adapters.onebot.v11 import Bot as OBBot from nonebot.adapters.onebot.v11 import Bot as OBBot
from nonebot_plugin_alconna import UniMessage
async def qq_broadcast(groups: list[str], msg: UniMessage[Any]): async def qq_broadcast(groups: list[str], msg: UniMessage[Any] | str):
if isinstance(msg, str):
msg = UniMessage.text(msg)
bots: dict[str, OBBot] = {} bots: dict[str, OBBot] = {}
# group_id -> bot_id # group_id -> bot_id

View File

@ -0,0 +1,11 @@
关于「中间答案」或者「提示」:
在 KonaPH 中,当有人发送「提交答案 答案」时,会检查答案是否符合你设置的中间答案的 pattern。这个 pattern 可以有两种方式:
- 纯文本的完整匹配:你设置的 pattern 如果和提交的答案完全相等,则会触发提示。
- regex 匹配:你设置的 pattern 如果以斜杠(/)开头和结尾,就会检查提交的答案是否匹配正则表达式。注意 ^ 和 $ 符号的使用。
- 例如:/^commit$/ 会匹配 commit不会匹配 acommit、Commit 等。
- 而如果是 /commit/,则会匹配 commit、acommit而不会匹配 Commit。
- 无法使用 Javascript 的字符串声明模式,例如,/case_insensitive/i 就不会被视作一个正则表达式。
一个提示是提示,还是中间答案,取决于它是否有 checkpoint 标记。如果有 checkpoint 标记,则会提示用户「你回答了一个中间答案」,并且这个中间答案的回答会在排行榜中显示。

View File

@ -30,7 +30,7 @@ def load_banned_ids() -> list[str]:
if not DATA_FILE_PATH.exists(): if not DATA_FILE_PATH.exists():
return [] return []
try: try:
return json.loads(DATA_FILE_PATH.read_text()) return json.loads(DATA_FILE_PATH.read_text("utf-8"))
except Exception as e: except Exception as e:
logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}") logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}")
return [] return []
@ -45,14 +45,14 @@ def add_banned_id(group_id: str):
banned_ids = load_banned_ids() banned_ids = load_banned_ids()
if group_id not in banned_ids: if group_id not in banned_ids:
banned_ids.append(group_id) banned_ids.append(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4)) DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
def remove_banned_id(group_id: str): def remove_banned_id(group_id: str):
banned_ids = load_banned_ids() banned_ids = load_banned_ids()
if group_id in banned_ids: if group_id in banned_ids:
banned_ids.remove(group_id) banned_ids.remove(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4)) DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
class TryStartState(Enum): class TryStartState(Enum):

View File

@ -1,17 +1,23 @@
from functools import reduce import datetime
from math import ceil
import re import re
from math import ceil
from loguru import logger from loguru import logger
from nonebot import on_message from nonebot import on_message
from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
from konabot.common.nb.qq_broadcast import qq_broadcast on_alconna)
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager
from konabot.common.longtask import DepLongTaskTarget
from nonebot_plugin_apscheduler import scheduler from nonebot_plugin_apscheduler import scheduler
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.plugins.kona_ph.core.message import (get_daily_report,
get_daily_report_v2,
get_puzzle_description,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE, config,
create_admin_commands,
puzzle_manager)
create_admin_commands() create_admin_commands()
@ -24,18 +30,6 @@ async def is_play_group(target: DepLongTaskTarget):
return False return False
# cmd_submit = on_alconna(Alconna(
# "re:提交(?:答案|题解|[fF]lag)",
# Args["flag", str],
# ), rule=is_play_group)
#
# @cmd_submit.handle()
# async def _(flag: str, target: DepLongTaskTarget):
# async with puzzle_manager() as manager:
# result = manager.submit(target.target_id, flag)
# await target.send_message(result.get_unimessage())
cmd_submit = on_message(rule=is_play_group) cmd_submit = on_message(rule=is_play_group)
@ -46,7 +40,14 @@ async def _(msg: UniMsg, target: DepLongTaskTarget):
submission: str = match.group("submission") submission: str = match.group("submission")
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
result = manager.submit(target.target_id, submission) result = manager.submit(target.target_id, submission)
await target.send_message(result.get_unimessage()) if isinstance(result, str):
await target.send_message(result)
else:
await target.send_message(get_submission_message(
daily_puzzle_info=result.info,
submission=result.submission,
puzzle=result.puzzle,
))
cmd_query = on_alconna(Alconna( cmd_query = on_alconna(Alconna(
@ -59,7 +60,7 @@ async def _(target: DepLongTaskTarget):
p = manager.get_today_puzzle() p = manager.get_today_puzzle()
if p is None: if p is None:
return await target.send_message("今天无题,改日再来吧!") return await target.send_message("今天无题,改日再来吧!")
await target.send_message(p.get_unimessage()) await target.send_message(get_puzzle_description(p))
cmd_query_submission = on_alconna(Alconna( cmd_query_submission = on_alconna(Alconna(
@ -68,44 +69,11 @@ cmd_query_submission = on_alconna(Alconna(
@cmd_query_submission.handle() @cmd_query_submission.handle()
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget):
gid = None
if re.match(r"^\d+$", target.channel_id):
gid = int(target.channel_id)
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = manager.get_today_puzzle() await target.send_message(get_daily_report_v2(manager, gid))
if p is None:
return await target.send_message("今天无题")
msg = UniMessage.text("==== 今日答题情况 ====\n\n")
subcount = len(reduce(
lambda x, y: x + y,
manager.submissions.get(p.raw_id, {}).values(),
[],
))
info = manager.daily_puzzle[p.index_id]
msg = msg.text(
f"总体情况:答对 {len(info.success_users)} / "
f"参与 {len(info.tried_users)} / "
f"提交 {subcount}\n"
)
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
gid = None
if re.match(r"^\d+$", target.channel_id):
gid = int(target.channel_id)
for u, d in success_users:
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
t = d.strftime("%H:%M")
tries = len(manager.submissions[p.raw_id][u])
msg = msg.text(f"\n- {uname} [🎉 {t} 完成 | {tries} 提交]")
for u in info.tried_users - set(info.success_users.keys()):
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
tries = len(manager.submissions[p.raw_id][u])
msg = msg.text(f"\n- {uname} [💦 {tries} 提交]")
await target.send_message(msg)
cmd_history = on_alconna(Alconna( cmd_history = on_alconna(Alconna(
@ -120,15 +88,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
today = get_today_date() today = get_today_date()
if index_id: if index_id:
index_id = index_id.removeprefix("#") index_id = index_id.removeprefix("#")
if index_id == manager.daily_puzzle_of_date.get(today, ""): if index_id not in manager.daily_puzzle:
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id] return await target.send_message("没有这道题哦")
return await target.send_message(puzzle.get_unimessage()) puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
if index_id in manager.daily_puzzle: msg = get_puzzle_description(
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id] puzzle,
msg = puzzle.get_unimessage() with_answer=(index_id != manager.daily_puzzle_of_date.get(today, "")),
msg = msg.text(f"\n\n------\n\n题解:{puzzle.flag}") )
return await target.send_message(msg) return await target.send_message(msg)
return await target.send_message("没有这道题哦")
msg = UniMessage.text("====== 历史题目清单 ======\n\n") msg = UniMessage.text("====== 历史题目清单 ======\n\n")
puzzles = [ puzzles = [
(manager.puzzle_data[manager.daily_puzzle[i].raw_id], d) (manager.puzzle_data[manager.daily_puzzle[i].raw_id], d)
@ -155,15 +122,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
@scheduler.scheduled_job("cron", hour="8") @scheduler.scheduled_job("cron", hour="8")
async def _(): async def _():
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
msg2 = manager.get_report_yesterday() yesterday = get_today_date() - datetime.timedelta(days=1)
msg2 = get_daily_report(manager, yesterday)
if msg2 is not None: if msg2 is not None:
await qq_broadcast(config.plugin_puzzle_playgroup, msg2) await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
puzzle = manager.get_today_puzzle() puzzle = manager.get_today_puzzle()
if puzzle is not None: if puzzle is not None:
logger.info(f"找到了题目 {puzzle.raw_id},发送") logger.info(f"找到了题目 {puzzle.raw_id},发送")
await qq_broadcast(config.plugin_puzzle_playgroup, puzzle.get_unimessage()) await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(puzzle))
else: else:
logger.info("自动任务:没有找到题目,跳过") logger.info("自动任务:没有找到题目,跳过")

View File

@ -0,0 +1,29 @@
import nanoid
from konabot.common.path import ASSETS_PATH
from konabot.plugins.kona_ph.core.path import KONAPH_IMAGE_BASE
class PuzzleImageManager:
def read_puzzle_image(self, img_name: str) -> bytes:
fp = KONAPH_IMAGE_BASE / img_name
if fp.exists():
return fp.read_bytes()
return (ASSETS_PATH / "img" / "other" / "boom.jpg").read_bytes()
def upload_puzzle_image(self, data: bytes, suffix: str = ".png") -> str:
id = nanoid.generate(
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
21,
)
img_name = f"{id}{suffix}"
(KONAPH_IMAGE_BASE / img_name).write_bytes(data)
return img_name
def remove_puzzle_image(self, img_name: str):
if img_name:
(KONAPH_IMAGE_BASE / img_name).unlink(True)
def get_image_manager() -> PuzzleImageManager:
return PuzzleImageManager()

View File

@ -0,0 +1,187 @@
"""
生成各种各样的 Message 的函数集合
"""
import datetime
import functools
import re
from typing import Any
from nonebot_plugin_alconna import UniMessage
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.storage import (DailyPuzzleInfo, Puzzle,
PuzzleManager,
PuzzleSubmission)
def get_puzzle_description(puzzle: Puzzle, with_answer: bool = False) -> UniMessage[Any]:
"""
获取一个谜题的描述
"""
img_manager = get_image_manager()
result = UniMessage.text(f"[KonaPH#{puzzle.index_id}] {puzzle.title}")
result = result.text(f"\n\n{puzzle.content}")
if puzzle.img_name:
result = result.text("\n\n").image(
raw=img_manager.read_puzzle_image(puzzle.img_name)
)
result = result.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
if with_answer:
result = result.text(f"\n\n题目答案:{puzzle.flag}")
else:
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
return result
def get_submission_message(
puzzle: Puzzle,
submission: PuzzleSubmission,
daily_puzzle_info: DailyPuzzleInfo | None = None,
) -> str:
"""
获得提交答案的反馈信息
"""
if submission.success:
rank = -1
if daily_puzzle_info is not None:
rank = len(daily_puzzle_info.success_users)
return f"🎉 恭喜你答对了!你是今天第 {rank} 个解出来的!"
if submission.hint_id >= 0 and (
hint := puzzle.hints.get(submission.hint_id)
) is not None:
if hint.is_checkpoint:
hint_msg = "✨ 恭喜!这是本题的中间答案,加油!"
else:
hint_msg = "🤔 答错啦!请检查你的答案。"
return f"{hint_msg}\n\n提示:{hint.message}"
return "❌ 答错啦!请检查你的答案。"
def get_daily_report(
manager: PuzzleManager,
date: datetime.date,
) -> str | None:
"""
获得某日的提交的报告信息
"""
index_id = manager.daily_puzzle_of_date.get(date)
if index_id is None:
return None
info = manager.daily_puzzle[index_id]
puzzle = manager.puzzle_data[info.raw_id]
msg = f"[KonaPH#{puzzle.index_id}] 「{puzzle.title}」解答报告\n\n"
if len(info.success_users) == 0:
msg += "昨日,无人解出此题 😭😭\n\n"
else:
msg += f"昨日,共有 {len(info.success_users)} 人解出此题。\n\n"
msg += "前五名的解答者:\n\n"
us = [(u, d) for u, d in info.success_users.items()]
us = sorted(us, key=lambda t: t[1])
us = us[:5]
for u, _ in us:
m = manager.submissions[puzzle.raw_id][u][-1]
msg += f"- {get_username(u)}{m.time.strftime('%H:%M')}\n"
msg += "\n"
msg += f"出题人:{get_username(puzzle.author_id)}"
return msg
def get_daily_report_v2(manager: PuzzleManager, gid: int | None = None):
p = manager.get_today_puzzle()
if p is None:
return "今天无题"
msg = "==== 今日答题情况 ====\n\n"
subcount = len(functools.reduce(
lambda x, y: x + y,
manager.submissions.get(p.raw_id, {}).values(),
[],
))
info = manager.daily_puzzle[p.index_id]
msg += (
f"总体情况:答对 {len(info.success_users)} / "
f"参与 {len(info.tried_users)} / "
f"提交 {subcount}\n"
)
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
for u, d in success_users:
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
t = d.strftime("%H:%M")
tries = len(manager.submissions[p.raw_id][u])
msg += f"\n- {uname} [🎉 {t} 完成 | {tries} 提交]"
for u in info.tried_users - set(info.success_users.keys()):
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
tries = len(manager.submissions[p.raw_id][u])
checkpoints_touched = len(set((
s.hint_id for s in manager.submissions[p.raw_id][u]
if (
s.hint_id >= 0
and s.hint_id in p.hints
and p.hints[s.hint_id].is_checkpoint
)
)))
checkpoint_message = ""
if checkpoints_touched > 0:
checkpoint_message = f" | 🚩 {checkpoints_touched} 记录点"
msg += f"\n- {uname} [💦 {tries} 提交{checkpoint_message}]"
return msg
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
image_manager = get_image_manager()
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
status_suffix = ""
if puzzle.raw_id == manager.puzzle_pinned:
status_suffix += " | 📌 已被管理员置顶"
msg = UniMessage.text(
f"--- 谜题信息 ---\n"
f"Raw ID: {puzzle.raw_id}\n"
f"出题者: {get_username(puzzle.author_id)} | {puzzle.author_id}\n"
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"状态: {status}{status_suffix}\n\n"
f"标题: {puzzle.title}\n"
f"Flag: {puzzle.flag}\n\n"
f"{puzzle.content}"
)
if puzzle.img_name:
msg = msg.image(raw=image_manager.read_puzzle_image(puzzle.img_name))
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
return msg
def get_puzzle_hint_list(puzzle: Puzzle) -> str:
msg = f"==== {puzzle.title} 提示与中间答案 ====\n"
if len(puzzle.hints) == 0:
msg += "\n你没有添加任何中间答案。"
return msg
for hint_id, hint in puzzle.hints.items():
n = {False: "[提示]", True: "[中间答案]"}[hint.is_checkpoint]
msg += f"\n{n}[{hint_id}] {hint.pattern}"
msg += f"\n {hint.message}"
return msg

View File

@ -0,0 +1,9 @@
from konabot.common.path import DATA_PATH
KONAPH_BASE = DATA_PATH / "KonaPH"
KONAPH_DATA_JSON = KONAPH_BASE / "data.json"
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs"
# 保证所有文件夹存在
KONAPH_BASE.mkdir(exist_ok=True)
KONAPH_IMAGE_BASE.mkdir(exist_ok=True)

View File

@ -1,27 +1,26 @@
import asyncio import asyncio
import datetime import datetime
import random import random
import re
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
import nanoid import nanoid
from nonebot_plugin_alconna import UniMessage
from pydantic import BaseModel, Field, ValidationError from pydantic import BaseModel, Field, ValidationError
from konabot.common.path import DATA_PATH from konabot.plugins.kona_ph.core.path import KONAPH_DATA_JSON
from konabot.common.username import get_username
KONAPH_BASE = DATA_PATH / "KonaPH" class PuzzleHint(BaseModel):
KONAPH_DATA_JSON = KONAPH_BASE / "data.json" pattern: str
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs" message: str
is_checkpoint: bool
# 保证所有文件夹存在
KONAPH_BASE.mkdir(exist_ok=True) class PuzzleSubmission(BaseModel):
KONAPH_IMAGE_BASE.mkdir(exist_ok=True) success: bool
flag: str
time: datetime.datetime
hint_id: int = -1
class Puzzle(BaseModel): class Puzzle(BaseModel):
@ -40,41 +39,47 @@ class Puzzle(BaseModel):
ready: bool = False ready: bool = False
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now) created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
def get_image_path(self) -> Path: hints: dict[int, PuzzleHint] = Field(default_factory=dict)
return KONAPH_IMAGE_BASE / self.img_name
def get_unimessage(self) -> UniMessage[Any]: @property
result = UniMessage.text(f"[KonaPH#{self.index_id}] {self.title}") def hint_id_max(self) -> int:
result = result.text(f"\n\n{self.content}") return max((0, *self.hints.keys()))
if self.img_name: def check_submission(
result = result.text("\n\n").image(raw=self.get_image_path().read_bytes()) self,
submission: str,
result = result.text(f"\n\n出题者:{get_username(self.author_id)}") time: datetime.datetime | None = None,
result = result.text("\n\n输入「提交答案 答案」来提交你的解答") ) -> PuzzleSubmission:
if time is None:
return result time = datetime.datetime.now()
if submission == self.flag:
def add_image(self, img: bytes, suffix: str = ".png"): return PuzzleSubmission(
if self.img_name: success=True,
self.get_image_path().unlink(True) flag=submission,
img_id = nanoid.generate( time=time,
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz", )
21, for hint_id, hint in self.hints.items():
if hint.pattern.startswith('/') and hint.pattern.endswith('/'):
if re.match(hint.pattern.strip('/'), submission):
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
hint_id=hint_id,
)
else:
if hint.pattern == submission:
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
hint_id=hint_id,
)
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
) )
self.img_name = f"{img_id}{suffix}"
self.get_image_path().write_bytes(img)
def remove_image(self):
if self.img_name:
self.get_image_path().unlink(True)
self.img_name = ""
class PuzzleSubmission(BaseModel):
success: bool
flag: str
time: datetime.datetime
class DailyPuzzleInfo(BaseModel): class DailyPuzzleInfo(BaseModel):
@ -84,15 +89,10 @@ class DailyPuzzleInfo(BaseModel):
success_users: dict[str, datetime.datetime] = {} success_users: dict[str, datetime.datetime] = {}
class PuzzleSubmissionResultMessage(BaseModel): class PuzzleSubmissionFeedback(BaseModel):
success: bool submission: PuzzleSubmission
rank: int = -1 puzzle: Puzzle
message: str = "" info: DailyPuzzleInfo | None = None
def get_unimessage(self) -> UniMessage[Any]:
if self.success:
return UniMessage.text(f"🎉 恭喜你答对了!你是今天第 {self.rank} 个解出来的!")
return UniMessage.text(self.message)
def get_today_date() -> datetime.date: def get_today_date() -> datetime.date:
@ -162,26 +162,6 @@ class PuzzleManager(BaseModel):
self.index_id_counter += 1 self.index_id_counter += 1
def fix(self):
# 尝试修复今日的数据
for p in self.puzzle_data.values():
if self.is_puzzle_published(p.raw_id):
p.ready = True
if self.puzzle_pinned not in self.unpublished_puzzles:
self.puzzle_pinned = ""
# 撤回重复发布的题
already_published: set[str] = set()
for date in list(self.daily_puzzle_of_date.keys()):
index_id = self.daily_puzzle_of_date[date]
info = self.daily_puzzle[index_id]
if info.raw_id in already_published:
del self.daily_puzzle[index_id]
del self.daily_puzzle_of_date[date]
else:
already_published.add(info.raw_id)
def admin_pin_puzzle(self, raw_id: str): def admin_pin_puzzle(self, raw_id: str):
if raw_id in self.puzzle_data: if raw_id in self.puzzle_data:
self.puzzle_pinned = raw_id self.puzzle_pinned = raw_id
@ -213,41 +193,23 @@ class PuzzleManager(BaseModel):
return return
return self.daily_puzzle[p.index_id] return self.daily_puzzle[p.index_id]
def submit(self, user: str, flag: str) -> PuzzleSubmissionResultMessage: def submit(self, user: str, flag: str) -> PuzzleSubmissionFeedback | str:
p = self.get_today_puzzle() p = self.get_today_puzzle()
d = self.get_today_info() d = self.get_today_info()
now = datetime.datetime.now() now = datetime.datetime.now()
if p is None or d is None: if p is None or d is None:
return PuzzleSubmissionResultMessage( return "今天没有题哦,改天再来吧!"
success=False,
message="今天没有题哦,改天再来吧!",
)
if user in d.success_users: if user in d.success_users:
return PuzzleSubmissionResultMessage( return "你今天已经答对过啦!不用重复提交哦!"
success=False,
message="你今天已经答对过啦!不用重复提交哦!",
)
if flag != p.flag:
d.tried_users.add(user)
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
success=False,
flag=flag,
time=now,
))
return PuzzleSubmissionResultMessage(
success=False,
message="❌ 答错了,请检查你的答案哦",
)
d.tried_users.add(user) d.tried_users.add(user)
d.success_users[user] = now result = p.check_submission(flag, now)
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission( self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(result)
success=True, if result.success:
flag=flag, d.success_users[user] = now
time=now, return PuzzleSubmissionFeedback(
)) submission=result,
return PuzzleSubmissionResultMessage( puzzle=p,
success=True, info=d,
rank=len(d.success_users),
) )
def admin_create_puzzle(self, user: str): def admin_create_puzzle(self, user: str):
@ -273,47 +235,20 @@ class PuzzleManager(BaseModel):
if p.author_id == user if p.author_id == user
], key=lambda p: p.created_at, reverse=True) ], key=lambda p: p.created_at, reverse=True)
def get_report_yesterday(self):
yesterday = get_today_date() - datetime.timedelta(days=1)
index_id = self.daily_puzzle_of_date.get(yesterday)
if index_id is None:
return None
info = self.daily_puzzle[index_id]
puzzle = self.puzzle_data[info.raw_id]
message = UniMessage.text(f"[KonaPH#{index_id}] 「{puzzle.title}」解答报告")
if len(info.success_users) == 0:
message = message.text(
"\n\n昨日,竟无人解出此题!"
)
else:
message = message.text(
f"\n\n昨日,共有 {len(info.success_users)} 人解出此题。\n\n前五名的解答者:"
)
us = [(u, d) for u, d in info.success_users.items()]
us = sorted(us, key=lambda t: t[1])
us = us[:5]
for u, _ in us:
m = self.submissions[puzzle.raw_id][u][-1]
message = message.text(f"- {get_username(u)}{m.time.strftime('%H:%M')}")
message = message.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
return message
lock = asyncio.Lock() lock = asyncio.Lock()
def read_data(): def read_data():
try: try:
data_raw = KONAPH_DATA_JSON.read_text() data_raw = KONAPH_DATA_JSON.read_text("utf-8")
return PuzzleManager.model_validate_json(data_raw) return PuzzleManager.model_validate_json(data_raw)
except (FileNotFoundError, ValidationError): except (FileNotFoundError, ValidationError):
return PuzzleManager() return PuzzleManager()
def write_data(data: PuzzleManager): def write_data(data: PuzzleManager):
KONAPH_DATA_JSON.write_text(data.model_dump_json()) KONAPH_DATA_JSON.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager @asynccontextmanager

View File

@ -1,15 +1,24 @@
import datetime import datetime
from math import ceil from math import ceil
from typing import Any
from nonebot import get_plugin_config from nonebot import get_plugin_config
from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, Subcommand, UniMessage, on_alconna from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query,
Subcommand, SubcommandResult, UniMessage,
on_alconna)
from pydantic import BaseModel from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import download_image_bytes from konabot.common.nb.extract_image import download_image_bytes
from konabot.common.nb.qq_broadcast import qq_broadcast from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.common.username import get_username from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.storage import Puzzle, PuzzleManager, get_today_date, puzzle_manager from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list,
get_puzzle_info_message,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager,
get_today_date,
puzzle_manager)
PUZZLE_PAGE_SIZE = 10 PUZZLE_PAGE_SIZE = 10
@ -31,31 +40,15 @@ def is_puzzle_admin(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_admin return target.target_id in config.plugin_puzzle_admin
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]: def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle:
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \ if raw_id not in manager.puzzle_data:
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备") raise BotExceptionMessage("没有这个谜题")
puzzle = manager.puzzle_data[raw_id]
status_suffix = "" if is_puzzle_admin(target):
if puzzle.raw_id == manager.puzzle_pinned: return puzzle
status_suffix += " | 📌 已被管理员置顶" if target.target_id != puzzle.author_id:
raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
msg = UniMessage.text( return puzzle
f"--- 谜题信息 ---\n"
f"Raw ID: {puzzle.raw_id}\n"
f"出题者: {get_username(puzzle.author_id)} | {puzzle.author_id}\n"
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"状态: {status}{status_suffix}\n\n"
f"标题: {puzzle.title}\n"
f"Flag: {puzzle.flag}\n\n"
f"{puzzle.content}"
)
if puzzle.img_name:
msg = msg.image(raw=puzzle.get_image_path().read_bytes())
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
return msg
def create_admin_commands(): def create_admin_commands():
@ -82,7 +75,45 @@ def create_admin_commands():
), ),
Subcommand("publish", Args["raw_id?", str], dest="publish"), Subcommand("publish", Args["raw_id?", str], dest="publish"),
Subcommand("preview", Args["raw_id", str], dest="preview"), Subcommand("preview", Args["raw_id", str], dest="preview"),
Subcommand("get-submits", Args["raw_id", str], dest="get-submits") Subcommand("get-submits", Args["raw_id", str], dest="get-submits"),
Subcommand(
"test",
Args["raw_id", str],
Args["submission", str],
dest="test",
),
Subcommand(
"hint",
Subcommand(
"add",
Args["raw_id", str],
Args["pattern", str],
Args["message", str],
dest="add",
),
Subcommand(
"list",
Args["raw_id", str],
Args["page?", int],
dest="list",
),
Subcommand(
"modify",
Args["raw_id", str],
Args["hint_id", int],
Option("--pattern", Args["pattern", str], alias=["-p"]),
Option("--message", Args["message", str], alias=["-m"]),
Option("--checkpoint", Args["is_checkpoint", bool], alias=["-c"]),
dest="modify",
),
Subcommand(
"delete",
Args["raw_id", str],
Args["hint_id", int],
dest="delete",
),
dest="hint",
),
), ),
rule=is_puzzle_manager, rule=is_puzzle_manager,
) )
@ -98,6 +129,8 @@ def create_admin_commands():
msg = msg.text("konaph modify - 查看如何修改谜题信息\n") msg = msg.text("konaph modify - 查看如何修改谜题信息\n")
msg = msg.text("konaph preview <id> - 预览一个题目的效果,不会展示答案\n") msg = msg.text("konaph preview <id> - 预览一个题目的效果,不会展示答案\n")
msg = msg.text("konaph get-submits <id> - 获得题目的提交记录\n") msg = msg.text("konaph get-submits <id> - 获得题目的提交记录\n")
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
if is_puzzle_admin(target): if is_puzzle_admin(target):
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n") msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
@ -122,15 +155,7 @@ def create_admin_commands():
@cmd_admin.assign("ready") @cmd_admin.assign("ready")
async def _(raw_id: str, target: DepLongTaskTarget): async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data: p = check_puzzle(manager, target, raw_id)
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
))
if p.ready: if p.ready:
return await target.send_message(UniMessage.text( return await target.send_message(UniMessage.text(
"题目早就准备好啦!" "题目早就准备好啦!"
@ -143,15 +168,7 @@ def create_admin_commands():
@cmd_admin.assign("unready") @cmd_admin.assign("unready")
async def _(raw_id: str, target: DepLongTaskTarget): async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data: p = check_puzzle(manager, target, raw_id)
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
))
if not p.ready: if not p.ready:
return await target.send_message(UniMessage.text( return await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经是未取消状态了!" f"谜题「{p.title}」已经是未取消状态了!"
@ -169,16 +186,7 @@ def create_admin_commands():
@cmd_admin.assign("info") @cmd_admin.assign("info")
async def _(raw_id: str, target: DepLongTaskTarget): async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data: p = check_puzzle(manager, target, raw_id)
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限查看详细信息!"
))
await target.send_message(get_puzzle_info_message(manager, p)) await target.send_message(get_puzzle_info_message(manager, p))
@cmd_admin.assign("my") @cmd_admin.assign("my")
@ -214,7 +222,9 @@ def create_admin_commands():
@cmd_admin.assign("all") @cmd_admin.assign("all")
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1): async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
if not is_puzzle_admin(target): if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text("你没有权限查看所有的哦")) return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
puzzles = [*manager.puzzle_data.values()] puzzles = [*manager.puzzle_data.values()]
if ready.available: if ready.available:
@ -293,13 +303,10 @@ def create_admin_commands():
" --image <图片> 图片\n" " --image <图片> 图片\n"
" --remove-image 删除图片" " --remove-image 删除图片"
) )
image_manager = get_image_manager()
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data: p = check_puzzle(manager, target, raw_id)
return await target.send_message("没有这个谜题")
p = manager.puzzle_data[raw_id]
if not is_puzzle_admin(target) and target.target_id != p.author_id:
return await target.send_message("你没有权限编辑这个谜题")
if title is not None: if title is not None:
p.title = title p.title = title
if description is not None: if description is not None:
@ -312,9 +319,10 @@ def create_admin_commands():
) )
if image is not None and image.url is not None: if image is not None and image.url is not None:
b = await download_image_bytes(image.url) b = await download_image_bytes(image.url)
p.add_image(b.unwrap()) image_manager.remove_puzzle_image(p.img_name)
p.img_name = image_manager.upload_puzzle_image(b.unwrap())
elif remove_image.available: elif remove_image.available:
p.remove_image() image_manager.remove_puzzle_image(p.img_name)
info2 = get_puzzle_info_message(manager, p) info2 = get_puzzle_info_message(manager, p)
@ -322,6 +330,10 @@ def create_admin_commands():
@cmd_admin.assign("publish") @cmd_admin.assign("publish")
async def _(target: DepLongTaskTarget, raw_id: str | None = None): async def _(target: DepLongTaskTarget, raw_id: str | None = None):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
today = get_today_date() today = get_today_date()
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
if today in manager.daily_puzzle_of_date: if today in manager.daily_puzzle_of_date:
@ -332,18 +344,14 @@ def create_admin_commands():
p = manager.get_today_puzzle(strong=True) p = manager.get_today_puzzle(strong=True)
if p is None: if p is None:
return await target.send_message("上架失败了orz可能是没题了") return await target.send_message("上架失败了orz可能是没题了")
await qq_broadcast(config.plugin_puzzle_playgroup, p.get_unimessage()) await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(p))
return await target.send_message("Ok!") return await target.send_message("Ok!")
@cmd_admin.assign("preview") @cmd_admin.assign("preview")
async def _(target: DepLongTaskTarget, raw_id: str): async def _(target: DepLongTaskTarget, raw_id: str):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
puzzle = manager.puzzle_data.get(raw_id) p = check_puzzle(manager, target, raw_id)
if puzzle is None: return await target.send_message(get_puzzle_description(p))
return await target.send_message("没有这个谜题")
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id:
return await target.send_message("你没有权限预览这个谜题")
return await target.send_message(puzzle.get_unimessage())
@cmd_admin.assign("get-submits") @cmd_admin.assign("get-submits")
async def _(target: DepLongTaskTarget, raw_id: str): async def _(target: DepLongTaskTarget, raw_id: str):
@ -361,5 +369,96 @@ def create_admin_commands():
msg = msg.text(f"- {get_username(uid)}{s}\n") msg = msg.text(f"- {get_username(uid)}{s}\n")
return await target.send_message(msg) return await target.send_message(msg)
@cmd_admin.assign("test")
async def _(target: DepLongTaskTarget, raw_id: str, submission: str):
"""
测试一道谜题的回答,并给出结果
"""
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
result = p.check_submission(submission)
msg = get_submission_message(p, result)
return await target.send_message("[测试提交] " + msg)
@cmd_admin.assign("subcommands.hint")
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")):
if len(subcommands.result.subcommands) > 0:
return
return await target.send_message(
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n")
.text("- konaph hint modify <id> <hint_id>\n")
.text(" - --pattern <pattern>\n - 更改匹配规则\n")
.text(" - --message <message>\n - 更改提示文本\n")
.text(" - --checkpoint [True|False]\n - 更改是否为中间答案\n")
.text("- konaph hint delete <id> <hint_id>\n - 删除一个提示 / 中间答案\n")
.text("\n更多关于 pattern 和中间答案的信息,请见 man中间答案(7)")
)
@cmd_admin.assign("subcommands.hint.add")
async def _(
target: DepLongTaskTarget,
raw_id: str,
pattern: str,
message: str,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p.hints[p.hint_id_max + 1] = PuzzleHint(
pattern=pattern,
message=message,
is_checkpoint=False,
)
await target.send_message("创建成功!\n\n" + get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.list")
async def _(
target: DepLongTaskTarget,
raw_id: str,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
await target.send_message(get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.modify")
async def _(
target: DepLongTaskTarget,
raw_id: str,
hint_id: int,
pattern: str | None = None,
message: str | None = None,
is_checkpoint: bool | None = None,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
if hint_id not in p.hints:
raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
)
hint = p.hints[hint_id]
if pattern is not None:
hint.pattern = pattern
if message is not None:
hint.message = message
if is_checkpoint is not None:
hint.is_checkpoint = is_checkpoint
await target.send_message("更改成功!\n\n" + get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.delete")
async def _(
target: DepLongTaskTarget,
raw_id: str,
hint_id: int,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
if hint_id not in p.hints:
raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
)
del p.hints[hint_id]
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
return cmd_admin return cmd_admin

View File

@ -1,4 +1,3 @@
from curses.ascii import isdigit
from pathlib import Path from pathlib import Path
import nonebot import nonebot
@ -40,7 +39,10 @@ async def _(
doc: str | None, doc: str | None,
event: nonebot.adapters.Event, event: nonebot.adapters.Event,
): ):
if doc is not None and section is None and all(isdigit(c) for c in doc): if doc is not None and section is None and all(
ord('0') <= ord(c) <= ord('9')
for c in doc
):
section = int(doc) section = int(doc)
doc = None doc = None

View File

@ -15,7 +15,7 @@ if not POLL_DATA_FILE.exists():
POLL_DATA_FILE.write_bytes(POLL_TEMPLATE_FILE.read_bytes()) POLL_DATA_FILE.write_bytes(POLL_TEMPLATE_FILE.read_bytes())
poll_list = json.loads(POLL_DATA_FILE.read_text())['poll'] poll_list = json.loads(POLL_DATA_FILE.read_text("utf-8"))['poll']
async def createpoll(title,qqid,options): async def createpoll(title,qqid,options):
polllength = len(poll_list) polllength = len(poll_list)
@ -53,7 +53,7 @@ def writeback():
# json.dump({'poll':poll_list},file,ensure_ascii=False,sort_keys=True) # json.dump({'poll':poll_list},file,ensure_ascii=False,sort_keys=True)
POLL_DATA_FILE.write_text(json.dumps({ POLL_DATA_FILE.write_text(json.dumps({
'poll': poll_list, 'poll': poll_list,
}, ensure_ascii=False, sort_keys=True)) }, ensure_ascii=False, sort_keys=True), "utf-8")
async def pollvote(polnum,optionnum,qqnum): async def pollvote(polnum,optionnum,qqnum):
optiond = poll_list[polnum]["polldata"] optiond = poll_list[polnum]["polldata"]

View File

@ -59,14 +59,14 @@ def load_notify_config() -> NotifyConfigFile:
if not DATA_FILE_PATH.exists(): if not DATA_FILE_PATH.exists():
return NotifyConfigFile() return NotifyConfigFile()
try: try:
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text()) return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text("utf-8"))
except Exception as e: except Exception as e:
logger.warning(f"在解析 Notify 时遇到问题:{e}") logger.warning(f"在解析 Notify 时遇到问题:{e}")
return NotifyConfigFile() return NotifyConfigFile()
def save_notify_config(config: NotifyConfigFile): def save_notify_config(config: NotifyConfigFile):
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4)) DATA_FILE_PATH.write_text(config.model_dump_json(indent=4), "utf-8")
@evt.handle() @evt.handle()

13
scripts/watch_filter.py Normal file
View File

@ -0,0 +1,13 @@
from pathlib import Path
from watchfiles import Change
base = Path(__file__).parent.parent.absolute()
def filter(change: Change, path: str) -> bool:
if "__pycache__" in path:
return False
if Path(path).absolute().is_relative_to(base / "data"):
return False
return True