Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0231aa04f4 | |||
| 01fe33eb9f | |||
| adfbac7d90 | |||
| 994c1412da | |||
| 8780dfec6f | |||
| 490d807e7a | |||
| fa208199ab | |||
| 38a17f42a3 |
22
.drone.yml
22
.drone.yml
@ -38,6 +38,17 @@ steps:
|
||||
path: /var/run/docker.sock
|
||||
commands:
|
||||
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
|
||||
- name: 发送构建结果到 ntfy
|
||||
image: parrazam/drone-ntfy
|
||||
when:
|
||||
status: [success, failure]
|
||||
settings:
|
||||
url: https://ntfy.service.jazzwhom.top
|
||||
topic: drone_ci
|
||||
tags:
|
||||
- drone-ci
|
||||
token:
|
||||
from_secret: NTFY_TOKEN
|
||||
|
||||
volumes:
|
||||
- name: docker-socket
|
||||
@ -74,6 +85,17 @@ steps:
|
||||
volumes:
|
||||
- name: docker-socket
|
||||
path: /var/run/docker.sock
|
||||
- name: 发送构建结果到 ntfy
|
||||
image: parrazam/drone-ntfy
|
||||
when:
|
||||
status: [success, failure]
|
||||
settings:
|
||||
url: https://ntfy.service.jazzwhom.top
|
||||
topic: drone_ci
|
||||
tags:
|
||||
- drone-ci
|
||||
token:
|
||||
from_secret: NTFY_TOKEN
|
||||
|
||||
volumes:
|
||||
- name: docker-socket
|
||||
|
||||
3
.vscode/settings.json
vendored
Normal file
3
.vscode/settings.json
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
{
|
||||
"python.REPL.enableREPLSmartSend": false
|
||||
}
|
||||
@ -68,7 +68,7 @@ code .
|
||||
使用命令行手动启动 Bot:
|
||||
|
||||
```bash
|
||||
poetry run watchfiles bot.main konabot
|
||||
poetry run watchfiles bot.main . --filter scripts.watch_filter.filter
|
||||
```
|
||||
|
||||
如果你不希望自动重载,只是想运行 Bot,可以直接运行:
|
||||
|
||||
@ -19,12 +19,12 @@ class DataManager(Generic[T]):
|
||||
if not self.fp.exists():
|
||||
return self.cls()
|
||||
try:
|
||||
return self.cls.model_validate_json(self.fp.read_text())
|
||||
return self.cls.model_validate_json(self.fp.read_text("utf-8"))
|
||||
except ValidationError:
|
||||
return self.cls()
|
||||
|
||||
def save(self, data: T):
|
||||
self.fp.write_text(data.model_dump_json())
|
||||
self.fp.write_text(data.model_dump_json(), "utf-8")
|
||||
|
||||
@asynccontextmanager
|
||||
async def get_data(self):
|
||||
|
||||
@ -240,7 +240,7 @@ def handle_long_task(callback_id: str):
|
||||
|
||||
def _load_longtask_data() -> LongTaskModuleData:
|
||||
try:
|
||||
txt = LONGTASK_DATA_DIR.read_text()
|
||||
txt = LONGTASK_DATA_DIR.read_text("utf-8")
|
||||
return LongTaskModuleData.model_validate_json(txt)
|
||||
except (FileNotFoundError, ValidationError) as e:
|
||||
logger.info(f"取得 LongTask 数据时出现问题:{e}")
|
||||
@ -251,7 +251,7 @@ def _load_longtask_data() -> LongTaskModuleData:
|
||||
|
||||
|
||||
def _save_longtask_data(data: LongTaskModuleData):
|
||||
LONGTASK_DATA_DIR.write_text(data.model_dump_json())
|
||||
LONGTASK_DATA_DIR.write_text(data.model_dump_json(), "utf-8")
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
|
||||
@ -1,10 +1,13 @@
|
||||
from typing import Any, cast
|
||||
|
||||
import nonebot
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from nonebot.adapters.onebot.v11 import Bot as OBBot
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
|
||||
|
||||
async def qq_broadcast(groups: list[str], msg: UniMessage[Any]):
|
||||
async def qq_broadcast(groups: list[str], msg: UniMessage[Any] | str):
|
||||
if isinstance(msg, str):
|
||||
msg = UniMessage.text(msg)
|
||||
bots: dict[str, OBBot] = {}
|
||||
|
||||
# group_id -> bot_id
|
||||
|
||||
11
konabot/docs/concepts/中间答案.txt
Normal file
11
konabot/docs/concepts/中间答案.txt
Normal file
@ -0,0 +1,11 @@
|
||||
关于「中间答案」或者「提示」:
|
||||
|
||||
在 KonaPH 中,当有人发送「提交答案 答案」时,会检查答案是否符合你设置的中间答案的 pattern。这个 pattern 可以有两种方式:
|
||||
|
||||
- 纯文本的完整匹配:你设置的 pattern 如果和提交的答案完全相等,则会触发提示。
|
||||
- regex 匹配:你设置的 pattern 如果以斜杠(/)开头和结尾,就会检查提交的答案是否匹配正则表达式。注意 ^ 和 $ 符号的使用。
|
||||
- 例如:/^commit$/ 会匹配 commit,不会匹配 acommit、Commit 等。
|
||||
- 而如果是 /commit/,则会匹配 commit、acommit,而不会匹配 Commit。
|
||||
- 无法使用 Javascript 的字符串声明模式,例如,/case_insensitive/i 就不会被视作一个正则表达式。
|
||||
|
||||
一个提示是提示,还是中间答案,取决于它是否有 checkpoint 标记。如果有 checkpoint 标记,则会提示用户「你回答了一个中间答案」,并且这个中间答案的回答会在排行榜中显示。
|
||||
@ -30,7 +30,7 @@ def load_banned_ids() -> list[str]:
|
||||
if not DATA_FILE_PATH.exists():
|
||||
return []
|
||||
try:
|
||||
return json.loads(DATA_FILE_PATH.read_text())
|
||||
return json.loads(DATA_FILE_PATH.read_text("utf-8"))
|
||||
except Exception as e:
|
||||
logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}")
|
||||
return []
|
||||
@ -45,14 +45,14 @@ def add_banned_id(group_id: str):
|
||||
banned_ids = load_banned_ids()
|
||||
if group_id not in banned_ids:
|
||||
banned_ids.append(group_id)
|
||||
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
|
||||
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
|
||||
|
||||
|
||||
def remove_banned_id(group_id: str):
|
||||
banned_ids = load_banned_ids()
|
||||
if group_id in banned_ids:
|
||||
banned_ids.remove(group_id)
|
||||
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
|
||||
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
|
||||
|
||||
|
||||
class TryStartState(Enum):
|
||||
|
||||
@ -1,16 +1,23 @@
|
||||
from functools import reduce
|
||||
from math import ceil
|
||||
import datetime
|
||||
import re
|
||||
from loguru import logger
|
||||
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
|
||||
from konabot.common.nb.qq_broadcast import qq_broadcast
|
||||
from konabot.common.username import get_username
|
||||
from konabot.plugins.kona_ph.core.storage import get_today_date
|
||||
from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from math import ceil
|
||||
|
||||
from loguru import logger
|
||||
from nonebot import on_message
|
||||
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
|
||||
on_alconna)
|
||||
from nonebot_plugin_apscheduler import scheduler
|
||||
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.nb.qq_broadcast import qq_broadcast
|
||||
from konabot.plugins.kona_ph.core.message import (get_daily_report,
|
||||
get_daily_report_v2,
|
||||
get_puzzle_description,
|
||||
get_submission_message)
|
||||
from konabot.plugins.kona_ph.core.storage import get_today_date
|
||||
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE, config,
|
||||
create_admin_commands,
|
||||
puzzle_manager)
|
||||
|
||||
create_admin_commands()
|
||||
|
||||
@ -23,16 +30,24 @@ async def is_play_group(target: DepLongTaskTarget):
|
||||
return False
|
||||
|
||||
|
||||
cmd_submit = on_alconna(Alconna(
|
||||
"re:提交(?:答案|题解|[fF]lag)",
|
||||
Args["flag", str],
|
||||
), rule=is_play_group)
|
||||
cmd_submit = on_message(rule=is_play_group)
|
||||
|
||||
|
||||
@cmd_submit.handle()
|
||||
async def _(flag: str, target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
result = manager.submit(target.target_id, flag)
|
||||
await target.send_message(result.get_unimessage())
|
||||
async def _(msg: UniMsg, target: DepLongTaskTarget):
|
||||
txt = msg.extract_plain_text().strip()
|
||||
if match := re.match(r"^提交(?:答案|题解|[fF]lag)\s*(?P<submission>.+?)\s*$", txt):
|
||||
submission: str = match.group("submission")
|
||||
async with puzzle_manager() as manager:
|
||||
result = manager.submit(target.target_id, submission)
|
||||
if isinstance(result, str):
|
||||
await target.send_message(result)
|
||||
else:
|
||||
await target.send_message(get_submission_message(
|
||||
daily_puzzle_info=result.info,
|
||||
submission=result.submission,
|
||||
puzzle=result.puzzle,
|
||||
))
|
||||
|
||||
|
||||
cmd_query = on_alconna(Alconna(
|
||||
@ -45,7 +60,7 @@ async def _(target: DepLongTaskTarget):
|
||||
p = manager.get_today_puzzle()
|
||||
if p is None:
|
||||
return await target.send_message("今天无题,改日再来吧!")
|
||||
await target.send_message(p.get_unimessage())
|
||||
await target.send_message(get_puzzle_description(p))
|
||||
|
||||
|
||||
cmd_query_submission = on_alconna(Alconna(
|
||||
@ -54,44 +69,11 @@ cmd_query_submission = on_alconna(Alconna(
|
||||
|
||||
@cmd_query_submission.handle()
|
||||
async def _(target: DepLongTaskTarget):
|
||||
gid = None
|
||||
if re.match(r"^\d+$", target.channel_id):
|
||||
gid = int(target.channel_id)
|
||||
async with puzzle_manager() as manager:
|
||||
p = manager.get_today_puzzle()
|
||||
if p is None:
|
||||
return await target.send_message("今天无题")
|
||||
msg = UniMessage.text("==== 今日答题情况 ====\n\n")
|
||||
|
||||
subcount = len(reduce(
|
||||
lambda x, y: x + y,
|
||||
manager.submissions.get(p.raw_id, {}).values(),
|
||||
[],
|
||||
))
|
||||
info = manager.daily_puzzle[p.index_id]
|
||||
|
||||
msg = msg.text(
|
||||
f"总体情况:答对 {len(info.success_users)} / "
|
||||
f"参与 {len(info.tried_users)} / "
|
||||
f"提交 {subcount}\n"
|
||||
)
|
||||
|
||||
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
|
||||
gid = None
|
||||
if re.match(r"^\d+$", target.channel_id):
|
||||
gid = int(target.channel_id)
|
||||
for u, d in success_users:
|
||||
uname = u
|
||||
if re.match(r"^\d+$", u):
|
||||
uname = get_username(int(u), gid)
|
||||
t = d.strftime("%H:%M")
|
||||
tries = len(manager.submissions[p.raw_id][u])
|
||||
msg = msg.text(f"\n- {uname} [🎉 {t} 完成 | {tries} 提交]")
|
||||
for u in info.tried_users - set(info.success_users.keys()):
|
||||
uname = u
|
||||
if re.match(r"^\d+$", u):
|
||||
uname = get_username(int(u), gid)
|
||||
tries = len(manager.submissions[p.raw_id][u])
|
||||
msg = msg.text(f"\n- {uname} [💦 {tries} 提交]")
|
||||
|
||||
await target.send_message(msg)
|
||||
await target.send_message(get_daily_report_v2(manager, gid))
|
||||
|
||||
|
||||
cmd_history = on_alconna(Alconna(
|
||||
@ -106,15 +88,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
||||
today = get_today_date()
|
||||
if index_id:
|
||||
index_id = index_id.removeprefix("#")
|
||||
if index_id == manager.daily_puzzle_of_date.get(today, ""):
|
||||
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
|
||||
return await target.send_message(puzzle.get_unimessage())
|
||||
if index_id in manager.daily_puzzle:
|
||||
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
|
||||
msg = puzzle.get_unimessage()
|
||||
msg = msg.text(f"\n\n------\n\n题解:{puzzle.flag}")
|
||||
return await target.send_message(msg)
|
||||
return await target.send_message("没有这道题哦")
|
||||
if index_id not in manager.daily_puzzle:
|
||||
return await target.send_message("没有这道题哦")
|
||||
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
|
||||
msg = get_puzzle_description(
|
||||
puzzle,
|
||||
with_answer=(index_id != manager.daily_puzzle_of_date.get(today, "")),
|
||||
)
|
||||
return await target.send_message(msg)
|
||||
msg = UniMessage.text("====== 历史题目清单 ======\n\n")
|
||||
puzzles = [
|
||||
(manager.puzzle_data[manager.daily_puzzle[i].raw_id], d)
|
||||
@ -141,15 +122,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
||||
@scheduler.scheduled_job("cron", hour="8")
|
||||
async def _():
|
||||
async with puzzle_manager() as manager:
|
||||
msg2 = manager.get_report_yesterday()
|
||||
yesterday = get_today_date() - datetime.timedelta(days=1)
|
||||
msg2 = get_daily_report(manager, yesterday)
|
||||
if msg2 is not None:
|
||||
await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
|
||||
|
||||
puzzle = manager.get_today_puzzle()
|
||||
if puzzle is not None:
|
||||
logger.info(f"找到了题目 {puzzle.raw_id},发送")
|
||||
await qq_broadcast(config.plugin_puzzle_playgroup, puzzle.get_unimessage())
|
||||
await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(puzzle))
|
||||
else:
|
||||
logger.info("自动任务:没有找到题目,跳过")
|
||||
|
||||
|
||||
|
||||
29
konabot/plugins/kona_ph/core/image.py
Normal file
29
konabot/plugins/kona_ph/core/image.py
Normal file
@ -0,0 +1,29 @@
|
||||
import nanoid
|
||||
|
||||
from konabot.common.path import ASSETS_PATH
|
||||
from konabot.plugins.kona_ph.core.path import KONAPH_IMAGE_BASE
|
||||
|
||||
|
||||
class PuzzleImageManager:
|
||||
def read_puzzle_image(self, img_name: str) -> bytes:
|
||||
fp = KONAPH_IMAGE_BASE / img_name
|
||||
if fp.exists():
|
||||
return fp.read_bytes()
|
||||
return (ASSETS_PATH / "img" / "other" / "boom.jpg").read_bytes()
|
||||
|
||||
def upload_puzzle_image(self, data: bytes, suffix: str = ".png") -> str:
|
||||
id = nanoid.generate(
|
||||
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
|
||||
21,
|
||||
)
|
||||
img_name = f"{id}{suffix}"
|
||||
(KONAPH_IMAGE_BASE / img_name).write_bytes(data)
|
||||
return img_name
|
||||
|
||||
def remove_puzzle_image(self, img_name: str):
|
||||
if img_name:
|
||||
(KONAPH_IMAGE_BASE / img_name).unlink(True)
|
||||
|
||||
|
||||
def get_image_manager() -> PuzzleImageManager:
|
||||
return PuzzleImageManager()
|
||||
176
konabot/plugins/kona_ph/core/message.py
Normal file
176
konabot/plugins/kona_ph/core/message.py
Normal file
@ -0,0 +1,176 @@
|
||||
"""
|
||||
生成各种各样的 Message 的函数集合
|
||||
"""
|
||||
|
||||
|
||||
import datetime
|
||||
import functools
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
|
||||
from konabot.common.username import get_username
|
||||
from konabot.plugins.kona_ph.core.image import get_image_manager
|
||||
from konabot.plugins.kona_ph.core.storage import (DailyPuzzleInfo, Puzzle,
|
||||
PuzzleManager,
|
||||
PuzzleSubmission)
|
||||
|
||||
|
||||
def get_puzzle_description(puzzle: Puzzle, with_answer: bool = False) -> UniMessage[Any]:
|
||||
"""
|
||||
获取一个谜题的描述
|
||||
"""
|
||||
|
||||
img_manager = get_image_manager()
|
||||
|
||||
result = UniMessage.text(f"[KonaPH#{puzzle.index_id}] {puzzle.title}")
|
||||
result = result.text(f"\n\n{puzzle.content}")
|
||||
|
||||
if puzzle.img_name:
|
||||
result = result.text("\n\n").image(
|
||||
raw=img_manager.read_puzzle_image(puzzle.img_name)
|
||||
)
|
||||
|
||||
result = result.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
|
||||
|
||||
if with_answer:
|
||||
result = result.text(f"\n\n题目答案:{puzzle.flag}")
|
||||
else:
|
||||
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def get_submission_message(
|
||||
puzzle: Puzzle,
|
||||
submission: PuzzleSubmission,
|
||||
daily_puzzle_info: DailyPuzzleInfo | None = None,
|
||||
) -> str:
|
||||
"""
|
||||
获得提交答案的反馈信息
|
||||
"""
|
||||
|
||||
if submission.success:
|
||||
rank = -1
|
||||
if daily_puzzle_info is not None:
|
||||
rank = len(daily_puzzle_info.success_users)
|
||||
return f"🎉 恭喜你答对了!你是今天第 {rank} 个解出来的!"
|
||||
if submission.hint_id >= 0 and (
|
||||
hint := puzzle.hints.get(submission.hint_id)
|
||||
) is not None:
|
||||
if hint.is_checkpoint:
|
||||
hint_msg = "✨ 恭喜!这是本题的中间答案,加油!"
|
||||
else:
|
||||
hint_msg = "🤔 答错啦!请检查你的答案。"
|
||||
return f"{hint_msg}\n\n提示:{hint.message}"
|
||||
return "❌ 答错啦!请检查你的答案。"
|
||||
|
||||
|
||||
def get_daily_report(
|
||||
manager: PuzzleManager,
|
||||
date: datetime.date,
|
||||
) -> str | None:
|
||||
"""
|
||||
获得某日的提交的报告信息
|
||||
"""
|
||||
|
||||
index_id = manager.daily_puzzle_of_date.get(date)
|
||||
if index_id is None:
|
||||
return None
|
||||
info = manager.daily_puzzle[index_id]
|
||||
puzzle = manager.puzzle_data[info.raw_id]
|
||||
|
||||
msg = f"[KonaPH#{puzzle.index_id}] 「{puzzle.title}」解答报告\n\n"
|
||||
if len(info.success_users) == 0:
|
||||
msg += "昨日,无人解出此题 😭😭\n\n"
|
||||
else:
|
||||
msg += f"昨日,共有 {len(info.success_users)} 人解出此题。\n\n"
|
||||
msg += "前五名的解答者:\n\n"
|
||||
us = [(u, d) for u, d in info.success_users.items()]
|
||||
us = sorted(us, key=lambda t: t[1])
|
||||
us = us[:5]
|
||||
for u, _ in us:
|
||||
m = manager.submissions[puzzle.raw_id][u][-1]
|
||||
msg += f"- {get_username(u)} 于 {m.time.strftime('%H:%M')}\n"
|
||||
msg += "\n"
|
||||
msg += f"出题人:{get_username(puzzle.author_id)}"
|
||||
return msg
|
||||
|
||||
|
||||
def get_daily_report_v2(manager: PuzzleManager, gid: int | None = None):
|
||||
p = manager.get_today_puzzle()
|
||||
if p is None:
|
||||
return "今天无题"
|
||||
msg = "==== 今日答题情况 ====\n\n"
|
||||
|
||||
subcount = len(functools.reduce(
|
||||
lambda x, y: x + y,
|
||||
manager.submissions.get(p.raw_id, {}).values(),
|
||||
[],
|
||||
))
|
||||
info = manager.daily_puzzle[p.index_id]
|
||||
|
||||
msg += (
|
||||
f"总体情况:答对 {len(info.success_users)} / "
|
||||
f"参与 {len(info.tried_users)} / "
|
||||
f"提交 {subcount}\n"
|
||||
)
|
||||
|
||||
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
|
||||
for u, d in success_users:
|
||||
uname = u
|
||||
if re.match(r"^\d+$", u):
|
||||
uname = get_username(int(u), gid)
|
||||
t = d.strftime("%H:%M")
|
||||
tries = len(manager.submissions[p.raw_id][u])
|
||||
msg += f"\n- {uname} [🎉 {t} 完成 | {tries} 提交]"
|
||||
for u in info.tried_users - set(info.success_users.keys()):
|
||||
uname = u
|
||||
if re.match(r"^\d+$", u):
|
||||
uname = get_username(int(u), gid)
|
||||
tries = len(manager.submissions[p.raw_id][u])
|
||||
msg += f"\n- {uname} [💦 {tries} 提交]"
|
||||
|
||||
return msg
|
||||
|
||||
|
||||
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
|
||||
image_manager = get_image_manager()
|
||||
|
||||
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
|
||||
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
|
||||
|
||||
status_suffix = ""
|
||||
if puzzle.raw_id == manager.puzzle_pinned:
|
||||
status_suffix += " | 📌 已被管理员置顶"
|
||||
|
||||
msg = UniMessage.text(
|
||||
f"--- 谜题信息 ---\n"
|
||||
f"Raw ID: {puzzle.raw_id}\n"
|
||||
f"出题者: {get_username(puzzle.author_id)} | {puzzle.author_id}\n"
|
||||
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
|
||||
f"状态: {status}{status_suffix}\n\n"
|
||||
f"标题: {puzzle.title}\n"
|
||||
f"Flag: {puzzle.flag}\n\n"
|
||||
f"{puzzle.content}"
|
||||
)
|
||||
|
||||
if puzzle.img_name:
|
||||
msg = msg.image(raw=image_manager.read_puzzle_image(puzzle.img_name))
|
||||
|
||||
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
|
||||
|
||||
return msg
|
||||
|
||||
|
||||
def get_puzzle_hint_list(puzzle: Puzzle) -> str:
|
||||
msg = f"==== {puzzle.title} 提示与中间答案 ====\n"
|
||||
if len(puzzle.hints) == 0:
|
||||
msg += "\n你没有添加任何中间答案。"
|
||||
return msg
|
||||
for hint_id, hint in puzzle.hints.items():
|
||||
n = {False: "[提示]", True: "[中间答案]"}[hint.is_checkpoint]
|
||||
msg += f"\n{n}[{hint_id}] {hint.pattern}"
|
||||
msg += f"\n {hint.message}"
|
||||
return msg
|
||||
9
konabot/plugins/kona_ph/core/path.py
Normal file
9
konabot/plugins/kona_ph/core/path.py
Normal file
@ -0,0 +1,9 @@
|
||||
from konabot.common.path import DATA_PATH
|
||||
|
||||
KONAPH_BASE = DATA_PATH / "KonaPH"
|
||||
KONAPH_DATA_JSON = KONAPH_BASE / "data.json"
|
||||
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs"
|
||||
|
||||
# 保证所有文件夹存在
|
||||
KONAPH_BASE.mkdir(exist_ok=True)
|
||||
KONAPH_IMAGE_BASE.mkdir(exist_ok=True)
|
||||
@ -1,27 +1,26 @@
|
||||
import asyncio
|
||||
import datetime
|
||||
import random
|
||||
|
||||
import re
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import nanoid
|
||||
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from pydantic import BaseModel, Field, ValidationError
|
||||
|
||||
from konabot.common.path import DATA_PATH
|
||||
from konabot.common.username import get_username
|
||||
from konabot.plugins.kona_ph.core.path import KONAPH_DATA_JSON
|
||||
|
||||
|
||||
KONAPH_BASE = DATA_PATH / "KonaPH"
|
||||
KONAPH_DATA_JSON = KONAPH_BASE / "data.json"
|
||||
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs"
|
||||
class PuzzleHint(BaseModel):
|
||||
pattern: str
|
||||
message: str
|
||||
is_checkpoint: bool
|
||||
|
||||
# 保证所有文件夹存在
|
||||
KONAPH_BASE.mkdir(exist_ok=True)
|
||||
KONAPH_IMAGE_BASE.mkdir(exist_ok=True)
|
||||
|
||||
class PuzzleSubmission(BaseModel):
|
||||
success: bool
|
||||
flag: str
|
||||
time: datetime.datetime
|
||||
hint_id: int = -1
|
||||
|
||||
|
||||
class Puzzle(BaseModel):
|
||||
@ -40,41 +39,47 @@ class Puzzle(BaseModel):
|
||||
ready: bool = False
|
||||
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
|
||||
|
||||
def get_image_path(self) -> Path:
|
||||
return KONAPH_IMAGE_BASE / self.img_name
|
||||
hints: dict[int, PuzzleHint] = Field(default_factory=dict)
|
||||
|
||||
def get_unimessage(self) -> UniMessage[Any]:
|
||||
result = UniMessage.text(f"[KonaPH#{self.index_id}] {self.title}")
|
||||
result = result.text(f"\n\n{self.content}")
|
||||
@property
|
||||
def hint_id_max(self) -> int:
|
||||
return max((0, *self.hints.keys()))
|
||||
|
||||
if self.img_name:
|
||||
result = result.text("\n\n").image(raw=self.get_image_path().read_bytes())
|
||||
|
||||
result = result.text(f"\n\n出题者:{get_username(self.author_id)}")
|
||||
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
|
||||
|
||||
return result
|
||||
|
||||
def add_image(self, img: bytes, suffix: str = ".png"):
|
||||
if self.img_name:
|
||||
self.get_image_path().unlink(True)
|
||||
img_id = nanoid.generate(
|
||||
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
|
||||
21,
|
||||
def check_submission(
|
||||
self,
|
||||
submission: str,
|
||||
time: datetime.datetime | None = None,
|
||||
) -> PuzzleSubmission:
|
||||
if time is None:
|
||||
time = datetime.datetime.now()
|
||||
if submission == self.flag:
|
||||
return PuzzleSubmission(
|
||||
success=True,
|
||||
flag=submission,
|
||||
time=time,
|
||||
)
|
||||
for hint_id, hint in self.hints.items():
|
||||
if hint.pattern.startswith('/') and hint.pattern.endswith('/'):
|
||||
if re.match(hint.pattern.strip('/'), submission):
|
||||
return PuzzleSubmission(
|
||||
success=False,
|
||||
flag=submission,
|
||||
time=time,
|
||||
hint_id=hint_id,
|
||||
)
|
||||
else:
|
||||
if hint.pattern == submission:
|
||||
return PuzzleSubmission(
|
||||
success=False,
|
||||
flag=submission,
|
||||
time=time,
|
||||
hint_id=hint_id,
|
||||
)
|
||||
return PuzzleSubmission(
|
||||
success=False,
|
||||
flag=submission,
|
||||
time=time,
|
||||
)
|
||||
self.img_name = f"{img_id}{suffix}"
|
||||
self.get_image_path().write_bytes(img)
|
||||
|
||||
def remove_image(self):
|
||||
if self.img_name:
|
||||
self.get_image_path().unlink(True)
|
||||
self.img_name = ""
|
||||
|
||||
|
||||
class PuzzleSubmission(BaseModel):
|
||||
success: bool
|
||||
flag: str
|
||||
time: datetime.datetime
|
||||
|
||||
|
||||
class DailyPuzzleInfo(BaseModel):
|
||||
@ -84,15 +89,10 @@ class DailyPuzzleInfo(BaseModel):
|
||||
success_users: dict[str, datetime.datetime] = {}
|
||||
|
||||
|
||||
class PuzzleSubmissionResultMessage(BaseModel):
|
||||
success: bool
|
||||
rank: int = -1
|
||||
message: str = ""
|
||||
|
||||
def get_unimessage(self) -> UniMessage[Any]:
|
||||
if self.success:
|
||||
return UniMessage.text(f"🎉 恭喜你答对了!你是今天第 {self.rank} 个解出来的!")
|
||||
return UniMessage.text(self.message)
|
||||
class PuzzleSubmissionFeedback(BaseModel):
|
||||
submission: PuzzleSubmission
|
||||
puzzle: Puzzle
|
||||
info: DailyPuzzleInfo | None = None
|
||||
|
||||
|
||||
def get_today_date() -> datetime.date:
|
||||
@ -162,26 +162,6 @@ class PuzzleManager(BaseModel):
|
||||
|
||||
self.index_id_counter += 1
|
||||
|
||||
def fix(self):
|
||||
# 尝试修复今日的数据
|
||||
for p in self.puzzle_data.values():
|
||||
if self.is_puzzle_published(p.raw_id):
|
||||
p.ready = True
|
||||
|
||||
if self.puzzle_pinned not in self.unpublished_puzzles:
|
||||
self.puzzle_pinned = ""
|
||||
|
||||
# 撤回重复发布的题
|
||||
already_published: set[str] = set()
|
||||
for date in list(self.daily_puzzle_of_date.keys()):
|
||||
index_id = self.daily_puzzle_of_date[date]
|
||||
info = self.daily_puzzle[index_id]
|
||||
if info.raw_id in already_published:
|
||||
del self.daily_puzzle[index_id]
|
||||
del self.daily_puzzle_of_date[date]
|
||||
else:
|
||||
already_published.add(info.raw_id)
|
||||
|
||||
def admin_pin_puzzle(self, raw_id: str):
|
||||
if raw_id in self.puzzle_data:
|
||||
self.puzzle_pinned = raw_id
|
||||
@ -213,41 +193,23 @@ class PuzzleManager(BaseModel):
|
||||
return
|
||||
return self.daily_puzzle[p.index_id]
|
||||
|
||||
def submit(self, user: str, flag: str) -> PuzzleSubmissionResultMessage:
|
||||
def submit(self, user: str, flag: str) -> PuzzleSubmissionFeedback | str:
|
||||
p = self.get_today_puzzle()
|
||||
d = self.get_today_info()
|
||||
now = datetime.datetime.now()
|
||||
if p is None or d is None:
|
||||
return PuzzleSubmissionResultMessage(
|
||||
success=False,
|
||||
message="今天没有题哦,改天再来吧!",
|
||||
)
|
||||
return "今天没有题哦,改天再来吧!"
|
||||
if user in d.success_users:
|
||||
return PuzzleSubmissionResultMessage(
|
||||
success=False,
|
||||
message="你今天已经答对过啦!不用重复提交哦!",
|
||||
)
|
||||
if flag != p.flag:
|
||||
d.tried_users.add(user)
|
||||
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
|
||||
success=False,
|
||||
flag=flag,
|
||||
time=now,
|
||||
))
|
||||
return PuzzleSubmissionResultMessage(
|
||||
success=False,
|
||||
message="❌ 答错了,请检查你的答案哦",
|
||||
)
|
||||
return "你今天已经答对过啦!不用重复提交哦!"
|
||||
d.tried_users.add(user)
|
||||
d.success_users[user] = now
|
||||
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
|
||||
success=True,
|
||||
flag=flag,
|
||||
time=now,
|
||||
))
|
||||
return PuzzleSubmissionResultMessage(
|
||||
success=True,
|
||||
rank=len(d.success_users),
|
||||
result = p.check_submission(flag, now)
|
||||
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(result)
|
||||
if result.success:
|
||||
d.success_users[user] = now
|
||||
return PuzzleSubmissionFeedback(
|
||||
submission=result,
|
||||
puzzle=p,
|
||||
info=d,
|
||||
)
|
||||
|
||||
def admin_create_puzzle(self, user: str):
|
||||
@ -273,47 +235,20 @@ class PuzzleManager(BaseModel):
|
||||
if p.author_id == user
|
||||
], key=lambda p: p.created_at, reverse=True)
|
||||
|
||||
def get_report_yesterday(self):
|
||||
yesterday = get_today_date() - datetime.timedelta(days=1)
|
||||
index_id = self.daily_puzzle_of_date.get(yesterday)
|
||||
if index_id is None:
|
||||
return None
|
||||
info = self.daily_puzzle[index_id]
|
||||
puzzle = self.puzzle_data[info.raw_id]
|
||||
message = UniMessage.text(f"[KonaPH#{index_id}] 「{puzzle.title}」解答报告")
|
||||
|
||||
if len(info.success_users) == 0:
|
||||
message = message.text(
|
||||
"\n\n昨日,竟无人解出此题!"
|
||||
)
|
||||
else:
|
||||
message = message.text(
|
||||
f"\n\n昨日,共有 {len(info.success_users)} 人解出此题。\n\n前五名的解答者:"
|
||||
)
|
||||
us = [(u, d) for u, d in info.success_users.items()]
|
||||
us = sorted(us, key=lambda t: t[1])
|
||||
us = us[:5]
|
||||
for u, _ in us:
|
||||
m = self.submissions[puzzle.raw_id][u][-1]
|
||||
message = message.text(f"- {get_username(u)} 于 {m.time.strftime('%H:%M')}")
|
||||
|
||||
message = message.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
|
||||
return message
|
||||
|
||||
|
||||
lock = asyncio.Lock()
|
||||
|
||||
|
||||
def read_data():
|
||||
try:
|
||||
data_raw = KONAPH_DATA_JSON.read_text()
|
||||
data_raw = KONAPH_DATA_JSON.read_text("utf-8")
|
||||
return PuzzleManager.model_validate_json(data_raw)
|
||||
except (FileNotFoundError, ValidationError):
|
||||
return PuzzleManager()
|
||||
|
||||
|
||||
def write_data(data: PuzzleManager):
|
||||
KONAPH_DATA_JSON.write_text(data.model_dump_json())
|
||||
KONAPH_DATA_JSON.write_text(data.model_dump_json(), "utf-8")
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
|
||||
@ -1,15 +1,24 @@
|
||||
import datetime
|
||||
from math import ceil
|
||||
from typing import Any
|
||||
|
||||
from nonebot import get_plugin_config
|
||||
from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, Subcommand, UniMessage, on_alconna
|
||||
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query,
|
||||
Subcommand, SubcommandResult, UniMessage,
|
||||
on_alconna)
|
||||
from pydantic import BaseModel
|
||||
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
from konabot.common.nb.extract_image import download_image_bytes
|
||||
from konabot.common.nb.qq_broadcast import qq_broadcast
|
||||
from konabot.common.username import get_username
|
||||
from konabot.plugins.kona_ph.core.storage import Puzzle, PuzzleManager, get_today_date, puzzle_manager
|
||||
from konabot.plugins.kona_ph.core.image import get_image_manager
|
||||
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list,
|
||||
get_puzzle_info_message,
|
||||
get_submission_message)
|
||||
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager,
|
||||
get_today_date,
|
||||
puzzle_manager)
|
||||
|
||||
PUZZLE_PAGE_SIZE = 10
|
||||
|
||||
@ -31,31 +40,15 @@ def is_puzzle_admin(target: DepLongTaskTarget):
|
||||
return target.target_id in config.plugin_puzzle_admin
|
||||
|
||||
|
||||
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
|
||||
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
|
||||
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
|
||||
|
||||
status_suffix = ""
|
||||
if puzzle.raw_id == manager.puzzle_pinned:
|
||||
status_suffix += " | 📌 已被管理员置顶"
|
||||
|
||||
msg = UniMessage.text(
|
||||
f"--- 谜题信息 ---\n"
|
||||
f"Raw ID: {puzzle.raw_id}\n"
|
||||
f"出题者: {get_username(puzzle.author_id)} | {puzzle.author_id}\n"
|
||||
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
|
||||
f"状态: {status}{status_suffix}\n\n"
|
||||
f"标题: {puzzle.title}\n"
|
||||
f"Flag: {puzzle.flag}\n\n"
|
||||
f"{puzzle.content}"
|
||||
)
|
||||
|
||||
if puzzle.img_name:
|
||||
msg = msg.image(raw=puzzle.get_image_path().read_bytes())
|
||||
|
||||
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
|
||||
|
||||
return msg
|
||||
def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
raise BotExceptionMessage("没有这个谜题")
|
||||
puzzle = manager.puzzle_data[raw_id]
|
||||
if is_puzzle_admin(target):
|
||||
return puzzle
|
||||
if target.target_id != puzzle.author_id:
|
||||
raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
|
||||
return puzzle
|
||||
|
||||
|
||||
def create_admin_commands():
|
||||
@ -82,7 +75,45 @@ def create_admin_commands():
|
||||
),
|
||||
Subcommand("publish", Args["raw_id?", str], dest="publish"),
|
||||
Subcommand("preview", Args["raw_id", str], dest="preview"),
|
||||
Subcommand("get-submits", Args["raw_id", str], dest="get-submits")
|
||||
Subcommand("get-submits", Args["raw_id", str], dest="get-submits"),
|
||||
Subcommand(
|
||||
"test",
|
||||
Args["raw_id", str],
|
||||
Args["submission", str],
|
||||
dest="test",
|
||||
),
|
||||
Subcommand(
|
||||
"hint",
|
||||
Subcommand(
|
||||
"add",
|
||||
Args["raw_id", str],
|
||||
Args["pattern", str],
|
||||
Args["message", str],
|
||||
dest="add",
|
||||
),
|
||||
Subcommand(
|
||||
"list",
|
||||
Args["raw_id", str],
|
||||
Args["page?", int],
|
||||
dest="list",
|
||||
),
|
||||
Subcommand(
|
||||
"modify",
|
||||
Args["raw_id", str],
|
||||
Args["hint_id", int],
|
||||
Option("--pattern", Args["pattern", str], alias=["-p"]),
|
||||
Option("--message", Args["message", str], alias=["-m"]),
|
||||
Option("--checkpoint", Args["is_checkpoint", bool], alias=["-c"]),
|
||||
dest="modify",
|
||||
),
|
||||
Subcommand(
|
||||
"delete",
|
||||
Args["raw_id", str],
|
||||
Args["hint_id", int],
|
||||
dest="delete",
|
||||
),
|
||||
dest="hint",
|
||||
),
|
||||
),
|
||||
rule=is_puzzle_manager,
|
||||
)
|
||||
@ -98,6 +129,8 @@ def create_admin_commands():
|
||||
msg = msg.text("konaph modify - 查看如何修改谜题信息\n")
|
||||
msg = msg.text("konaph preview <id> - 预览一个题目的效果,不会展示答案\n")
|
||||
msg = msg.text("konaph get-submits <id> - 获得题目的提交记录\n")
|
||||
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
|
||||
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
|
||||
|
||||
if is_puzzle_admin(target):
|
||||
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
|
||||
@ -122,15 +155,7 @@ def create_admin_commands():
|
||||
@cmd_admin.assign("ready")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
p = manager.puzzle_data[raw_id]
|
||||
if p.author_id != target.target_id and not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
if p.ready:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"题目早就准备好啦!"
|
||||
@ -143,15 +168,7 @@ def create_admin_commands():
|
||||
@cmd_admin.assign("unready")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
p = manager.puzzle_data[raw_id]
|
||||
if p.author_id != target.target_id and not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
if not p.ready:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经是未取消状态了!"
|
||||
@ -169,16 +186,7 @@ def create_admin_commands():
|
||||
@cmd_admin.assign("info")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
p = manager.puzzle_data[raw_id]
|
||||
if p.author_id != target.target_id and not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"这不是你的题,你没有权限查看详细信息!"
|
||||
))
|
||||
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
await target.send_message(get_puzzle_info_message(manager, p))
|
||||
|
||||
@cmd_admin.assign("my")
|
||||
@ -214,7 +222,9 @@ def create_admin_commands():
|
||||
@cmd_admin.assign("all")
|
||||
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text("你没有权限查看所有的哦"))
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有权限使用该指令"
|
||||
))
|
||||
async with puzzle_manager() as manager:
|
||||
puzzles = [*manager.puzzle_data.values()]
|
||||
if ready.available:
|
||||
@ -293,24 +303,26 @@ def create_admin_commands():
|
||||
" --image <图片> 图片\n"
|
||||
" --remove-image 删除图片"
|
||||
)
|
||||
image_manager = get_image_manager()
|
||||
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
return await target.send_message("没有这个谜题")
|
||||
p = manager.puzzle_data[raw_id]
|
||||
if not is_puzzle_admin(target) and target.target_id != p.author_id:
|
||||
return await target.send_message("你没有权限编辑这个谜题")
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
if title is not None:
|
||||
p.title = title
|
||||
if description is not None:
|
||||
p.content = description
|
||||
if flag is not None:
|
||||
p.flag = flag
|
||||
p.flag = flag.strip()
|
||||
if flag.strip() != flag:
|
||||
await target.send_message(
|
||||
"⚠️ 注意:你输入的 Flag 含有开头或结尾的空格,已经帮你去除"
|
||||
)
|
||||
if image is not None and image.url is not None:
|
||||
b = await download_image_bytes(image.url)
|
||||
p.add_image(b.unwrap())
|
||||
image_manager.remove_puzzle_image(p.img_name)
|
||||
image_manager.upload_puzzle_image(b.unwrap())
|
||||
elif remove_image.available:
|
||||
p.remove_image()
|
||||
image_manager.remove_puzzle_image(p.img_name)
|
||||
|
||||
info2 = get_puzzle_info_message(manager, p)
|
||||
|
||||
@ -318,6 +330,10 @@ def create_admin_commands():
|
||||
|
||||
@cmd_admin.assign("publish")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str | None = None):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有权限使用该指令"
|
||||
))
|
||||
today = get_today_date()
|
||||
async with puzzle_manager() as manager:
|
||||
if today in manager.daily_puzzle_of_date:
|
||||
@ -328,18 +344,14 @@ def create_admin_commands():
|
||||
p = manager.get_today_puzzle(strong=True)
|
||||
if p is None:
|
||||
return await target.send_message("上架失败了orz,可能是没题了")
|
||||
await qq_broadcast(config.plugin_puzzle_playgroup, p.get_unimessage())
|
||||
await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(p))
|
||||
return await target.send_message("Ok!")
|
||||
|
||||
@cmd_admin.assign("preview")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str):
|
||||
async with puzzle_manager() as manager:
|
||||
puzzle = manager.puzzle_data.get(raw_id)
|
||||
if puzzle is None:
|
||||
return await target.send_message("没有这个谜题")
|
||||
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id:
|
||||
return await target.send_message("你没有权限预览这个谜题")
|
||||
return await target.send_message(puzzle.get_unimessage())
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
return await target.send_message(get_puzzle_description(p))
|
||||
|
||||
@cmd_admin.assign("get-submits")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str):
|
||||
@ -357,5 +369,96 @@ def create_admin_commands():
|
||||
msg = msg.text(f"- {get_username(uid)}:{s}\n")
|
||||
return await target.send_message(msg)
|
||||
|
||||
@cmd_admin.assign("test")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str, submission: str):
|
||||
"""
|
||||
测试一道谜题的回答,并给出结果
|
||||
"""
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
result = p.check_submission(submission)
|
||||
msg = get_submission_message(p, result)
|
||||
return await target.send_message("[测试提交] " + msg)
|
||||
|
||||
@cmd_admin.assign("subcommands.hint")
|
||||
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")):
|
||||
if len(subcommands.result.subcommands) > 0:
|
||||
return
|
||||
return await target.send_message(
|
||||
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
|
||||
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
|
||||
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n")
|
||||
.text("- konaph hint modify <id> <hint_id>\n")
|
||||
.text(" - --pattern <pattern>\n - 更改匹配规则\n")
|
||||
.text(" - --message <message>\n - 更改提示文本\n")
|
||||
.text(" - --checkpoint [True|False]\n - 更改是否为中间答案\n")
|
||||
.text("- konaph hint delete <id> <hint_id>\n - 删除一个提示 / 中间答案\n")
|
||||
.text("\n更多关于 pattern 和中间答案的信息,请见 man:中间答案(7)")
|
||||
)
|
||||
|
||||
@cmd_admin.assign("subcommands.hint.add")
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
raw_id: str,
|
||||
pattern: str,
|
||||
message: str,
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p.hints[p.hint_id_max + 1] = PuzzleHint(
|
||||
pattern=pattern,
|
||||
message=message,
|
||||
is_checkpoint=False,
|
||||
)
|
||||
await target.send_message("创建成功!\n\n" + get_puzzle_hint_list(p))
|
||||
|
||||
@cmd_admin.assign("subcommands.hint.list")
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
raw_id: str,
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
await target.send_message(get_puzzle_hint_list(p))
|
||||
|
||||
@cmd_admin.assign("subcommands.hint.modify")
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
raw_id: str,
|
||||
hint_id: int,
|
||||
pattern: str | None = None,
|
||||
message: str | None = None,
|
||||
is_checkpoint: bool | None = None,
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
if hint_id not in p.hints:
|
||||
raise BotExceptionMessage(
|
||||
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
|
||||
)
|
||||
hint = p.hints[hint_id]
|
||||
if pattern is not None:
|
||||
hint.pattern = pattern
|
||||
if message is not None:
|
||||
hint.message = message
|
||||
if is_checkpoint is not None:
|
||||
hint.is_checkpoint = is_checkpoint
|
||||
await target.send_message("更改成功!\n\n" + get_puzzle_hint_list(p))
|
||||
|
||||
@cmd_admin.assign("subcommands.hint.delete")
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
raw_id: str,
|
||||
hint_id: int,
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
if hint_id not in p.hints:
|
||||
raise BotExceptionMessage(
|
||||
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
|
||||
)
|
||||
del p.hints[hint_id]
|
||||
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
|
||||
|
||||
|
||||
return cmd_admin
|
||||
|
||||
@ -1,4 +1,3 @@
|
||||
from curses.ascii import isdigit
|
||||
from pathlib import Path
|
||||
|
||||
import nonebot
|
||||
@ -40,7 +39,10 @@ async def _(
|
||||
doc: str | None,
|
||||
event: nonebot.adapters.Event,
|
||||
):
|
||||
if doc is not None and section is None and all(isdigit(c) for c in doc):
|
||||
if doc is not None and section is None and all(
|
||||
ord('0') <= ord(c) <= ord('9')
|
||||
for c in doc
|
||||
):
|
||||
section = int(doc)
|
||||
doc = None
|
||||
|
||||
|
||||
@ -15,7 +15,7 @@ if not POLL_DATA_FILE.exists():
|
||||
POLL_DATA_FILE.write_bytes(POLL_TEMPLATE_FILE.read_bytes())
|
||||
|
||||
|
||||
poll_list = json.loads(POLL_DATA_FILE.read_text())['poll']
|
||||
poll_list = json.loads(POLL_DATA_FILE.read_text("utf-8"))['poll']
|
||||
|
||||
async def createpoll(title,qqid,options):
|
||||
polllength = len(poll_list)
|
||||
@ -53,7 +53,7 @@ def writeback():
|
||||
# json.dump({'poll':poll_list},file,ensure_ascii=False,sort_keys=True)
|
||||
POLL_DATA_FILE.write_text(json.dumps({
|
||||
'poll': poll_list,
|
||||
}, ensure_ascii=False, sort_keys=True))
|
||||
}, ensure_ascii=False, sort_keys=True), "utf-8")
|
||||
|
||||
async def pollvote(polnum,optionnum,qqnum):
|
||||
optiond = poll_list[polnum]["polldata"]
|
||||
|
||||
@ -59,14 +59,14 @@ def load_notify_config() -> NotifyConfigFile:
|
||||
if not DATA_FILE_PATH.exists():
|
||||
return NotifyConfigFile()
|
||||
try:
|
||||
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text())
|
||||
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text("utf-8"))
|
||||
except Exception as e:
|
||||
logger.warning(f"在解析 Notify 时遇到问题:{e}")
|
||||
return NotifyConfigFile()
|
||||
|
||||
|
||||
def save_notify_config(config: NotifyConfigFile):
|
||||
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4))
|
||||
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4), "utf-8")
|
||||
|
||||
|
||||
@evt.handle()
|
||||
|
||||
13
scripts/watch_filter.py
Normal file
13
scripts/watch_filter.py
Normal file
@ -0,0 +1,13 @@
|
||||
from pathlib import Path
|
||||
from watchfiles import Change
|
||||
|
||||
|
||||
base = Path(__file__).parent.parent.absolute()
|
||||
|
||||
|
||||
def filter(change: Change, path: str) -> bool:
|
||||
if "__pycache__" in path:
|
||||
return False
|
||||
if Path(path).absolute().is_relative_to(base / "data"):
|
||||
return False
|
||||
return True
|
||||
Reference in New Issue
Block a user