Compare commits

...

15 Commits

Author SHA1 Message Date
795300cb83 在每日答题情况添加记录点显示
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-01 18:42:47 +08:00
0231aa04f4 添加中间答案功能
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-01 18:29:10 +08:00
01fe33eb9f 部分解耦了 konaph 的一些层 2025-11-01 17:52:05 +08:00
adfbac7d90 支持正义 utf-8 2025-11-01 13:48:48 +08:00
994c1412da 为 Watchfiles 添加更可配置的过滤器
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-01 12:40:01 +08:00
8780dfec6f 在 Tag 成功后也进行 ntfy 通知
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-30 16:52:55 +08:00
490d807e7a 添加一些对题解提交空格的情况判定
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-30 16:48:09 +08:00
fa208199ab 我不小心多加了一个 s
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-29 22:01:13 +08:00
38a17f42a3 添加 Ntfy 构建消息的报告
Some checks failed
continuous-integration/drone/push Build is failing
2025-10-29 21:59:11 +08:00
37179fc4d7 添加显示提交记录的指令
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-27 15:31:12 +08:00
56e0aabbf3 优化 UX,添加 preview 指令 2025-10-27 15:20:40 +08:00
ce2b7fd6f6 空调调温优化与排行榜,浏览器添加本地HTML支持
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-27 00:04:27 +08:00
b28f8f85a2 Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot 2025-10-26 22:49:04 +08:00
0acffea86d 添加排行榜,优化 UX
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 12:45:29 +08:00
7a20c3fe2f 空调指数概率损坏与空调、骰子gif图的背景优化 2025-10-26 01:06:26 +08:00
28 changed files with 925 additions and 267 deletions

View File

@ -38,6 +38,17 @@ steps:
path: /var/run/docker.sock
commands:
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
- name: 发送构建结果到 ntfy
image: parrazam/drone-ntfy
when:
status: [success, failure]
settings:
url: https://ntfy.service.jazzwhom.top
topic: drone_ci
tags:
- drone-ci
token:
from_secret: NTFY_TOKEN
volumes:
- name: docker-socket
@ -74,6 +85,17 @@ steps:
volumes:
- name: docker-socket
path: /var/run/docker.sock
- name: 发送构建结果到 ntfy
image: parrazam/drone-ntfy
when:
status: [success, failure]
settings:
url: https://ntfy.service.jazzwhom.top
topic: drone_ci
tags:
- drone-ci
token:
from_secret: NTFY_TOKEN
volumes:
- name: docker-socket

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"python.REPL.enableREPLSmartSend": false
}

View File

@ -68,7 +68,7 @@ code .
使用命令行手动启动 Bot
```bash
poetry run watchfiles bot.main konabot
poetry run watchfiles bot.main . --filter scripts.watch_filter.filter
```
如果你不希望自动重载,只是想运行 Bot可以直接运行

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

View File

@ -0,0 +1,76 @@
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>空调炸炸排行榜</title>
</head>
<body>
<div class="box">
<div class="text">位居全球第 <span id="ranking" class="ranking">200</span></div>
<div class="text-2">您的群总共坏了 <span id="number" class="number">200</span> 台空调</div>
<img class="background" src="./assets/background.png" alt="空调炸炸排行榜">
</div>
</body>
<style>
.box {
position: relative;
width: 1024px;
}
.number {
font-size: 2em;
color: #ffdd00;
text-shadow: 3px 3px 6px rgba(0, 0, 0, 0.7);
font-weight: bold;
font-stretch: 50%;
max-width: 520px;
word-wrap: break-word;
line-height: 0.8em;
}
.background {
width: 1024px;
}
.text {
position: absolute;
top: 125px;
width: 100%;
font-size: 72px;
color: white;
text-shadow: 2px 2px 4px rgba(0, 0, 0, 0.7);
font-weight: bolder;
display: flex;
align-items: baseline;
justify-content: center;
}
.text-2 {
position: absolute;
top: 50px;
width: 100%;
font-size: 48px;
color: white;
text-shadow: 2px 2px 4px rgba(0, 0, 0, 0.7);
font-weight: bolder;
display: flex;
align-items: baseline;
justify-content: center;
}
.ranking {
font-size: 2em;
color: #ff0000;
-webkit-text-stroke: #ffffff 2px;
text-shadow: 3px 3px 6px rgba(0, 0, 0, 0.7);
font-weight: bold;
font-stretch: 50%;
}
</style>
<script>
// 从 URL 参数中获取 number 的值
const urlParams = new URLSearchParams(window.location.search);
const number = urlParams.get('number');
// 将 number 显示在页面上
document.getElementById('number').textContent = number;
// 从 URL 参数中获取 ranking 的值
const ranking = urlParams.get('ranking');
// 将 ranking 显示在页面上
document.getElementById('ranking').textContent = ranking;
</script>
</html>

View File

@ -19,12 +19,12 @@ class DataManager(Generic[T]):
if not self.fp.exists():
return self.cls()
try:
return self.cls.model_validate_json(self.fp.read_text())
return self.cls.model_validate_json(self.fp.read_text("utf-8"))
except ValidationError:
return self.cls()
def save(self, data: T):
self.fp.write_text(data.model_dump_json())
self.fp.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager
async def get_data(self):

View File

@ -240,7 +240,7 @@ def handle_long_task(callback_id: str):
def _load_longtask_data() -> LongTaskModuleData:
try:
txt = LONGTASK_DATA_DIR.read_text()
txt = LONGTASK_DATA_DIR.read_text("utf-8")
return LongTaskModuleData.model_validate_json(txt)
except (FileNotFoundError, ValidationError) as e:
logger.info(f"取得 LongTask 数据时出现问题:{e}")
@ -251,7 +251,7 @@ def _load_longtask_data() -> LongTaskModuleData:
def _save_longtask_data(data: LongTaskModuleData):
LONGTASK_DATA_DIR.write_text(data.model_dump_json())
LONGTASK_DATA_DIR.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager

View File

@ -1,10 +1,13 @@
from typing import Any, cast
import nonebot
from nonebot_plugin_alconna import UniMessage
from nonebot.adapters.onebot.v11 import Bot as OBBot
from nonebot_plugin_alconna import UniMessage
async def qq_broadcast(groups: list[str], msg: UniMessage[Any]):
async def qq_broadcast(groups: list[str], msg: UniMessage[Any] | str):
if isinstance(msg, str):
msg = UniMessage.text(msg)
bots: dict[str, OBBot] = {}
# group_id -> bot_id

View File

@ -0,0 +1,54 @@
import re
import nonebot
from nonebot.adapters.onebot.v11 import Bot as OBBot
class UsernameManager:
grouped_data: dict[int, dict[int, str]]
individual_data: dict[int, str]
def __init__(self) -> None:
self.grouped_data = {}
self.individual_data = {}
async def update(self):
for bot in nonebot.get_bots().values():
if isinstance(bot, OBBot):
for user in await bot.get_friend_list():
uid = user["user_id"]
nickname = user["nickname"]
self.individual_data[uid] = nickname
for group in await bot.get_group_list():
gid = group["group_id"]
for member in await bot.get_group_member_list(group_id=gid):
uid = member["user_id"]
card = member.get("card", "")
nickname = member.get("nickname", "")
if card:
self.grouped_data.setdefault(gid, {})[uid] = card
if nickname:
self.individual_data[uid] = nickname
def get(self, qqid: int, groupid: int | None = None) -> str:
if groupid is not None and groupid in self.grouped_data:
n = self.grouped_data[groupid].get(qqid)
if n is not None:
return n
if qqid in self.individual_data:
return self.individual_data[qqid]
return str(qqid)
manager = UsernameManager()
def get_username(qqid: int | str, group: int | str | None = None):
if isinstance(group, str):
group = None if not re.match(r"^\d+$", group) else int(group)
if isinstance(qqid, str):
if re.match(r"^\d+$", qqid):
qqid = int(qqid)
else:
return qqid
return manager.get(qqid, group)

View File

@ -80,6 +80,30 @@ class WebRenderer:
page = cls.page_pool[page_id]
return await instance.render_with_page(page, url, target, params=params, other_function=other_function, timeout=timeout)
@classmethod
async def render_file(
cls,
file_path: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
'''
访问指定本地文件URL并返回截图
:param file_path: 目标文件路径
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
'''
instance = await cls.get_browser_instance()
logger.debug(f"Using WebRendererInstance {id(instance)} to render file {file_path} targeting {target}")
return await instance.render_file(file_path, target, params=params, other_function=other_function, timeout=timeout)
@classmethod
async def close_persistent_page(cls, page_id: str) -> None:
'''
@ -154,6 +178,10 @@ class WebRendererInstance:
async with self.lock:
screenshot = await self.inner_render(page, url, target, index, params, other_function, timeout)
return screenshot
async def render_file(self, file_path: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
file_path = "file:///" + str(file_path).replace("\\", "/")
return await self.render(file_path, target, index, params, other_function, timeout)
async def inner_render(self, page: Page, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
logger.debug(f"Navigating to {url} with timeout {timeout}")

View File

@ -0,0 +1,11 @@
关于「中间答案」或者「提示」:
在 KonaPH 中,当有人发送「提交答案 答案」时,会检查答案是否符合你设置的中间答案的 pattern。这个 pattern 可以有两种方式:
- 纯文本的完整匹配:你设置的 pattern 如果和提交的答案完全相等,则会触发提示。
- regex 匹配:你设置的 pattern 如果以斜杠(/)开头和结尾,就会检查提交的答案是否匹配正则表达式。注意 ^ 和 $ 符号的使用。
- 例如:/^commit$/ 会匹配 commit不会匹配 acommit、Commit 等。
- 而如果是 /commit/,则会匹配 commit、acommit而不会匹配 Commit。
- 无法使用 Javascript 的字符串声明模式,例如,/case_insensitive/i 就不会被视作一个正则表达式。
一个提示是提示,还是中间答案,取决于它是否有 checkpoint 标记。如果有 checkpoint 标记,则会提示用户「你回答了一个中间答案」,并且这个中间答案的回答会在排行榜中显示。

View File

@ -1,13 +1,19 @@
from io import BytesIO
from typing import Optional, Union
import cv2
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import Alconna, AlconnaMatcher, Args, UniMessage, on_alconna
from PIL import Image
import numpy as np
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
from konabot.plugins.air_conditioner.ac import AirConditioner, generate_ac_image
from konabot.common.web_render import WebRenderer
from konabot.plugins.air_conditioner.ac import AirConditioner, CrashType, generate_ac_image, wiggle_transform
import random
import math
def get_ac(id: str) -> AirConditioner:
ac = AirConditioner.air_conditioners.get(id)
@ -70,42 +76,58 @@ async def _(event: BaseEvent, target: DepLongTaskTarget):
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
"空调升温"
"空调升温",
Args["temp?", Optional[Union[int, float]]] # 可选参数升温的度数默认为1
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
if temp <= 0:
return
id = target.channel_id
ac = get_ac(id)
if not ac.on or ac.burnt == True or ac.frozen == True:
await send_ac_image(evt, ac)
return
ac.temperature += 1
ac.temperature += temp
if ac.temperature > 40:
# 打开爆炸图片
with open(ASSETS_PATH / "img" / "other" / "boom.jpg", "rb") as f:
output = BytesIO()
Image.open(f).save(output, format="GIF")
await evt.send(await UniMessage().image(raw=output).export())
ac.burnt = True
await evt.send("太热啦,空调炸了!")
return
# 根据温度随机出是否爆炸40度开始呈指数增长
possibility = -math.e ** ((40-ac.temperature) / 50) + 1
if random.random() < possibility:
# 打开爆炸图片
with open(ASSETS_PATH / "img" / "other" / "boom.jpg", "rb") as f:
output = BytesIO()
# 爆炸抖动
frames = wiggle_transform(np.array(Image.open(f)), intensity=5)
pil_frames = [Image.fromarray(frame) for frame in frames]
pil_frames[0].save(output, format="GIF", save_all=True, append_images=pil_frames[1:], loop=0, duration=35, disposal=2)
output.seek(0)
await evt.send(await UniMessage().image(raw=output).export())
ac.broke_ac(CrashType.BURNT)
await evt.send("太热啦,空调炸了!")
return
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
"空调降温"
"空调降温",
Args["temp?", Optional[Union[int, float]]] # 可选参数降温的度数默认为1
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
if temp <= 0:
return
id = target.channel_id
ac = get_ac(id)
if not ac.on or ac.burnt == True or ac.frozen == True:
await send_ac_image(evt, ac)
return
ac.temperature -= 1
ac.temperature -= temp
if ac.temperature < 0:
ac.frozen = True
# 根据温度随机出是否冻结0度开始呈指数增长
possibility = -math.e ** (ac.temperature / 50) + 1
if random.random() < possibility:
ac.broke_ac(CrashType.FROZEN)
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
@ -117,4 +139,24 @@ async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
ac.change_ac()
await send_ac_image(evt, ac)
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
"空调炸炸排行榜",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
number, ranking = ac.get_crashes_and_ranking()
params = {
"number": number,
"ranking": ranking
}
image = await WebRenderer.render_file(
file_path=ASSETS_PATH / "webpage" / "ac" / "index.html",
target=".box",
params=params
)
await evt.send(await UniMessage().image(raw=image).export())

View File

@ -1,3 +1,4 @@
from enum import Enum
from io import BytesIO
import cv2
@ -5,6 +6,12 @@ import numpy as np
from PIL import Image, ImageDraw, ImageFont
from konabot.common.path import ASSETS_PATH, FONTS_PATH
from konabot.common.path import DATA_PATH
import json
class CrashType(Enum):
BURNT = 0
FROZEN = 1
class AirConditioner:
air_conditioners: dict[str, "AirConditioner"] = {}
@ -23,6 +30,60 @@ class AirConditioner:
self.on = False
self.temperature = 24 # 重置为默认温度
def broke_ac(self, crash_type: CrashType):
'''
让空调坏掉,并保存数据
:param crash_type: CrashType 枚举,表示空调坏掉的类型
'''
match crash_type:
case CrashType.BURNT:
self.burnt = True
case CrashType.FROZEN:
self.frozen = True
self.save_crash_data(crash_type)
def save_crash_data(self, crash_type: CrashType):
'''
如果空调爆炸了,就往本地的 ac_crash_data.json 里该 id 的记录加一
'''
data_file = DATA_PATH / "ac_crash_data.json"
crash_data = {}
if data_file.exists():
with open(data_file, "r", encoding="utf-8") as f:
crash_data = json.load(f)
if self.id not in crash_data:
crash_data[self.id] = {"burnt": 0, "frozen": 0}
match crash_type:
case CrashType.BURNT:
crash_data[self.id]["burnt"] += 1
case CrashType.FROZEN:
crash_data[self.id]["frozen"] += 1
with open(data_file, "w", encoding="utf-8") as f:
json.dump(crash_data, f, ensure_ascii=False, indent=4)
def get_crashes_and_ranking(self) -> tuple[int, int]:
'''
获取该群在全国空调损坏的数量与排行榜的位置
'''
data_file = DATA_PATH / "ac_crash_data.json"
if not data_file.exists():
return 0, 1
with open(data_file, "r", encoding="utf-8") as f:
crash_data = json.load(f)
ranking_list = []
for gid, record in crash_data.items():
total = record.get("burnt", 0) + record.get("frozen", 0)
ranking_list.append((gid, total))
ranking_list.sort(key=lambda x: x[1], reverse=True)
total_crashes = crash_data.get(self.id, {}).get("burnt", 0) + crash_data.get(self.id, {}).get("frozen", 0)
rank = 1
for gid, total in ranking_list:
if gid == self.id:
break
rank += 1
return total_crashes, rank
def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0)):
"""
将文本转换为带透明背景的图像,图像大小刚好包含文本
@ -168,13 +229,13 @@ def precise_blend_with_perspective(background, foreground, corners):
return result
def wiggle_transform(image) -> list[np.ndarray]:
def wiggle_transform(image, intensity=2) -> list[np.ndarray]:
'''
返回一组图像振动的帧组,模拟空调运作时的抖动效果
'''
frames = []
height, width = image.shape[:2]
shifts = [(-2, 0), (2, 0), (0, -2), (0, 2), (0, 0)]
shifts = [(-intensity, 0), (intensity, 0), (0, -intensity), (0, intensity), (0, 0)]
for dx, dy in shifts:
M = np.float32([[1, 0, dx], [0, 1, dy]])
shifted = cv2.warpAffine(image, M, (width, height))
@ -193,7 +254,7 @@ async def generate_ac_image(ac: AirConditioner) -> BytesIO:
return output
# 根据生成温度文本图像
text = f"{ac.temperature}°C"
text = f"{round(ac.temperature, 1)}°C"
text_image = text_to_transparent_image(
text,
font_size=60,
@ -218,8 +279,10 @@ async def generate_ac_image(ac: AirConditioner) -> BytesIO:
final_image_simple = blend_with_transparency(ac_image, transformed_text, (0, 0))
frames = wiggle_transform(final_image_simple)
intensity = max(2, abs(int(ac.temperature) - 24) // 2)
frames = wiggle_transform(final_image_simple, intensity=intensity)
pil_frames = [Image.fromarray(frame) for frame in frames]
output = BytesIO()
pil_frames[0].save(output, format="GIF", save_all=True, append_images=pil_frames[1:], loop=0, duration=50)
pil_frames[0].save(output, format="GIF", save_all=True, append_images=pil_frames[1:], loop=0, duration=50, disposal=2)
return output

View File

@ -30,7 +30,7 @@ def load_banned_ids() -> list[str]:
if not DATA_FILE_PATH.exists():
return []
try:
return json.loads(DATA_FILE_PATH.read_text())
return json.loads(DATA_FILE_PATH.read_text("utf-8"))
except Exception as e:
logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}")
return []
@ -45,14 +45,14 @@ def add_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id not in banned_ids:
banned_ids.append(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
def remove_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id in banned_ids:
banned_ids.remove(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
class TryStartState(Enum):

View File

@ -1,13 +1,23 @@
import datetime
import re
from math import ceil
from loguru import logger
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager
from konabot.common.longtask import DepLongTaskTarget
from loguru import logger
from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
on_alconna)
from nonebot_plugin_apscheduler import scheduler
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.plugins.kona_ph.core.message import (get_daily_report,
get_daily_report_v2,
get_puzzle_description,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE, config,
create_admin_commands,
puzzle_manager)
create_admin_commands()
@ -20,16 +30,24 @@ async def is_play_group(target: DepLongTaskTarget):
return False
cmd_submit = on_alconna(Alconna(
"re:提交(?:答案|题解|[fF]lag)",
Args["flag", str],
), rule=is_play_group)
cmd_submit = on_message(rule=is_play_group)
@cmd_submit.handle()
async def _(flag: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
result = manager.submit(target.target_id, flag)
await target.send_message(result.get_unimessage())
async def _(msg: UniMsg, target: DepLongTaskTarget):
txt = msg.extract_plain_text().strip()
if match := re.match(r"^提交(?:答案|题解|[fF]lag)\s*(?P<submission>.+?)\s*$", txt):
submission: str = match.group("submission")
async with puzzle_manager() as manager:
result = manager.submit(target.target_id, submission)
if isinstance(result, str):
await target.send_message(result)
else:
await target.send_message(get_submission_message(
daily_puzzle_info=result.info,
submission=result.submission,
puzzle=result.puzzle,
))
cmd_query = on_alconna(Alconna(
@ -42,7 +60,20 @@ async def _(target: DepLongTaskTarget):
p = manager.get_today_puzzle()
if p is None:
return await target.send_message("今天无题,改日再来吧!")
await target.send_message(p.get_unimessage())
await target.send_message(get_puzzle_description(p))
cmd_query_submission = on_alconna(Alconna(
"今日答题情况"
), rule=is_play_group)
@cmd_query_submission.handle()
async def _(target: DepLongTaskTarget):
gid = None
if re.match(r"^\d+$", target.channel_id):
gid = int(target.channel_id)
async with puzzle_manager() as manager:
await target.send_message(get_daily_report_v2(manager, gid))
cmd_history = on_alconna(Alconna(
@ -57,15 +88,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
today = get_today_date()
if index_id:
index_id = index_id.removeprefix("#")
if index_id == manager.daily_puzzle_of_date.get(today, ""):
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
return await target.send_message(puzzle.get_unimessage())
if index_id in manager.daily_puzzle:
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
msg = puzzle.get_unimessage()
msg = msg.text(f"\n\n------\n\n题解:{puzzle.flag}")
return await target.send_message(msg)
return await target.send_message("没有这道题哦")
if index_id not in manager.daily_puzzle:
return await target.send_message("没有这道题哦")
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
msg = get_puzzle_description(
puzzle,
with_answer=(index_id != manager.daily_puzzle_of_date.get(today, "")),
)
return await target.send_message(msg)
msg = UniMessage.text("====== 历史题目清单 ======\n\n")
puzzles = [
(manager.puzzle_data[manager.daily_puzzle[i].raw_id], d)
@ -92,15 +122,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
@scheduler.scheduled_job("cron", hour="8")
async def _():
async with puzzle_manager() as manager:
msg2 = manager.get_report_yesterday()
yesterday = get_today_date() - datetime.timedelta(days=1)
msg2 = get_daily_report(manager, yesterday)
if msg2 is not None:
await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
puzzle = manager.get_today_puzzle()
if puzzle is not None:
logger.info(f"找到了题目 {puzzle.raw_id},发送")
await qq_broadcast(config.plugin_puzzle_playgroup, puzzle.get_unimessage())
await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(puzzle))
else:
logger.info("自动任务:没有找到题目,跳过")

View File

@ -0,0 +1,29 @@
import nanoid
from konabot.common.path import ASSETS_PATH
from konabot.plugins.kona_ph.core.path import KONAPH_IMAGE_BASE
class PuzzleImageManager:
def read_puzzle_image(self, img_name: str) -> bytes:
fp = KONAPH_IMAGE_BASE / img_name
if fp.exists():
return fp.read_bytes()
return (ASSETS_PATH / "img" / "other" / "boom.jpg").read_bytes()
def upload_puzzle_image(self, data: bytes, suffix: str = ".png") -> str:
id = nanoid.generate(
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
21,
)
img_name = f"{id}{suffix}"
(KONAPH_IMAGE_BASE / img_name).write_bytes(data)
return img_name
def remove_puzzle_image(self, img_name: str):
if img_name:
(KONAPH_IMAGE_BASE / img_name).unlink(True)
def get_image_manager() -> PuzzleImageManager:
return PuzzleImageManager()

View File

@ -0,0 +1,187 @@
"""
生成各种各样的 Message 的函数集合
"""
import datetime
import functools
import re
from typing import Any
from nonebot_plugin_alconna import UniMessage
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.storage import (DailyPuzzleInfo, Puzzle,
PuzzleManager,
PuzzleSubmission)
def get_puzzle_description(puzzle: Puzzle, with_answer: bool = False) -> UniMessage[Any]:
"""
获取一个谜题的描述
"""
img_manager = get_image_manager()
result = UniMessage.text(f"[KonaPH#{puzzle.index_id}] {puzzle.title}")
result = result.text(f"\n\n{puzzle.content}")
if puzzle.img_name:
result = result.text("\n\n").image(
raw=img_manager.read_puzzle_image(puzzle.img_name)
)
result = result.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
if with_answer:
result = result.text(f"\n\n题目答案:{puzzle.flag}")
else:
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
return result
def get_submission_message(
puzzle: Puzzle,
submission: PuzzleSubmission,
daily_puzzle_info: DailyPuzzleInfo | None = None,
) -> str:
"""
获得提交答案的反馈信息
"""
if submission.success:
rank = -1
if daily_puzzle_info is not None:
rank = len(daily_puzzle_info.success_users)
return f"🎉 恭喜你答对了!你是今天第 {rank} 个解出来的!"
if submission.hint_id >= 0 and (
hint := puzzle.hints.get(submission.hint_id)
) is not None:
if hint.is_checkpoint:
hint_msg = "✨ 恭喜!这是本题的中间答案,加油!"
else:
hint_msg = "🤔 答错啦!请检查你的答案。"
return f"{hint_msg}\n\n提示:{hint.message}"
return "❌ 答错啦!请检查你的答案。"
def get_daily_report(
manager: PuzzleManager,
date: datetime.date,
) -> str | None:
"""
获得某日的提交的报告信息
"""
index_id = manager.daily_puzzle_of_date.get(date)
if index_id is None:
return None
info = manager.daily_puzzle[index_id]
puzzle = manager.puzzle_data[info.raw_id]
msg = f"[KonaPH#{puzzle.index_id}] 「{puzzle.title}」解答报告\n\n"
if len(info.success_users) == 0:
msg += "昨日,无人解出此题 😭😭\n\n"
else:
msg += f"昨日,共有 {len(info.success_users)} 人解出此题。\n\n"
msg += "前五名的解答者:\n\n"
us = [(u, d) for u, d in info.success_users.items()]
us = sorted(us, key=lambda t: t[1])
us = us[:5]
for u, _ in us:
m = manager.submissions[puzzle.raw_id][u][-1]
msg += f"- {get_username(u)}{m.time.strftime('%H:%M')}\n"
msg += "\n"
msg += f"出题人:{get_username(puzzle.author_id)}"
return msg
def get_daily_report_v2(manager: PuzzleManager, gid: int | None = None):
p = manager.get_today_puzzle()
if p is None:
return "今天无题"
msg = "==== 今日答题情况 ====\n\n"
subcount = len(functools.reduce(
lambda x, y: x + y,
manager.submissions.get(p.raw_id, {}).values(),
[],
))
info = manager.daily_puzzle[p.index_id]
msg += (
f"总体情况:答对 {len(info.success_users)} / "
f"参与 {len(info.tried_users)} / "
f"提交 {subcount}\n"
)
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
for u, d in success_users:
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
t = d.strftime("%H:%M")
tries = len(manager.submissions[p.raw_id][u])
msg += f"\n- {uname} [🎉 {t} 完成 | {tries} 提交]"
for u in info.tried_users - set(info.success_users.keys()):
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
tries = len(manager.submissions[p.raw_id][u])
checkpoints_touched = len(set((
s.hint_id for s in manager.submissions[p.raw_id][u]
if (
s.hint_id >= 0
and s.hint_id in p.hints
and p.hints[s.hint_id].is_checkpoint
)
)))
checkpoint_message = ""
if checkpoints_touched > 0:
checkpoint_message = f" | 🚩 {checkpoints_touched} 记录点"
msg += f"\n- {uname} [💦 {tries} 提交{checkpoint_message}]"
return msg
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
image_manager = get_image_manager()
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
status_suffix = ""
if puzzle.raw_id == manager.puzzle_pinned:
status_suffix += " | 📌 已被管理员置顶"
msg = UniMessage.text(
f"--- 谜题信息 ---\n"
f"Raw ID: {puzzle.raw_id}\n"
f"出题者: {get_username(puzzle.author_id)} | {puzzle.author_id}\n"
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"状态: {status}{status_suffix}\n\n"
f"标题: {puzzle.title}\n"
f"Flag: {puzzle.flag}\n\n"
f"{puzzle.content}"
)
if puzzle.img_name:
msg = msg.image(raw=image_manager.read_puzzle_image(puzzle.img_name))
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
return msg
def get_puzzle_hint_list(puzzle: Puzzle) -> str:
msg = f"==== {puzzle.title} 提示与中间答案 ====\n"
if len(puzzle.hints) == 0:
msg += "\n你没有添加任何中间答案。"
return msg
for hint_id, hint in puzzle.hints.items():
n = {False: "[提示]", True: "[中间答案]"}[hint.is_checkpoint]
msg += f"\n{n}[{hint_id}] {hint.pattern}"
msg += f"\n {hint.message}"
return msg

View File

@ -0,0 +1,9 @@
from konabot.common.path import DATA_PATH
KONAPH_BASE = DATA_PATH / "KonaPH"
KONAPH_DATA_JSON = KONAPH_BASE / "data.json"
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs"
# 保证所有文件夹存在
KONAPH_BASE.mkdir(exist_ok=True)
KONAPH_IMAGE_BASE.mkdir(exist_ok=True)

View File

@ -1,26 +1,26 @@
import asyncio
import datetime
import random
import re
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
import nanoid
from nonebot_plugin_alconna import UniMessage
from pydantic import BaseModel, Field, ValidationError
from konabot.common.path import DATA_PATH
from konabot.plugins.kona_ph.core.path import KONAPH_DATA_JSON
KONAPH_BASE = DATA_PATH / "KonaPH"
KONAPH_DATA_JSON = KONAPH_BASE / "data.json"
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs"
class PuzzleHint(BaseModel):
pattern: str
message: str
is_checkpoint: bool
# 保证所有文件夹存在
KONAPH_BASE.mkdir(exist_ok=True)
KONAPH_IMAGE_BASE.mkdir(exist_ok=True)
class PuzzleSubmission(BaseModel):
success: bool
flag: str
time: datetime.datetime
hint_id: int = -1
class Puzzle(BaseModel):
@ -39,41 +39,47 @@ class Puzzle(BaseModel):
ready: bool = False
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
def get_image_path(self) -> Path:
return KONAPH_IMAGE_BASE / self.img_name
hints: dict[int, PuzzleHint] = Field(default_factory=dict)
def get_unimessage(self) -> UniMessage[Any]:
result = UniMessage.text(f"[KonaPH#{self.index_id}] {self.title}")
result = result.text(f"\n\n{self.content}")
@property
def hint_id_max(self) -> int:
return max((0, *self.hints.keys()))
if self.img_name:
result = result.text("\n\n").image(raw=self.get_image_path().read_bytes())
result = result.text("\n\n出题者:").at(self.author_id)
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
return result
def add_image(self, img: bytes, suffix: str = ".png"):
if self.img_name:
self.get_image_path().unlink(True)
img_id = nanoid.generate(
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
21,
def check_submission(
self,
submission: str,
time: datetime.datetime | None = None,
) -> PuzzleSubmission:
if time is None:
time = datetime.datetime.now()
if submission == self.flag:
return PuzzleSubmission(
success=True,
flag=submission,
time=time,
)
for hint_id, hint in self.hints.items():
if hint.pattern.startswith('/') and hint.pattern.endswith('/'):
if re.match(hint.pattern.strip('/'), submission):
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
hint_id=hint_id,
)
else:
if hint.pattern == submission:
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
hint_id=hint_id,
)
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
)
self.img_name = f"{img_id}{suffix}"
self.get_image_path().write_bytes(img)
def remove_image(self):
if self.img_name:
self.get_image_path().unlink(True)
self.img_name = ""
class PuzzleSubmission(BaseModel):
success: bool
flag: str
time: datetime.datetime
class DailyPuzzleInfo(BaseModel):
@ -83,15 +89,10 @@ class DailyPuzzleInfo(BaseModel):
success_users: dict[str, datetime.datetime] = {}
class PuzzleSubmissionResultMessage(BaseModel):
success: bool
rank: int = -1
message: str = ""
def get_unimessage(self) -> UniMessage[Any]:
if self.success:
return UniMessage.text(f"🎉 恭喜你答对了!你是今天第 {self.rank} 个解出来的!")
return UniMessage.text(self.message)
class PuzzleSubmissionFeedback(BaseModel):
submission: PuzzleSubmission
puzzle: Puzzle
info: DailyPuzzleInfo | None = None
def get_today_date() -> datetime.date:
@ -161,26 +162,6 @@ class PuzzleManager(BaseModel):
self.index_id_counter += 1
def fix(self):
# 尝试修复今日的数据
for p in self.puzzle_data.values():
if self.is_puzzle_published(p.raw_id):
p.ready = True
if self.puzzle_pinned not in self.unpublished_puzzles:
self.puzzle_pinned = ""
# 撤回重复发布的题
already_published: set[str] = set()
for date in list(self.daily_puzzle_of_date.keys()):
index_id = self.daily_puzzle_of_date[date]
info = self.daily_puzzle[index_id]
if info.raw_id in already_published:
del self.daily_puzzle[index_id]
del self.daily_puzzle_of_date[date]
else:
already_published.add(info.raw_id)
def admin_pin_puzzle(self, raw_id: str):
if raw_id in self.puzzle_data:
self.puzzle_pinned = raw_id
@ -212,41 +193,23 @@ class PuzzleManager(BaseModel):
return
return self.daily_puzzle[p.index_id]
def submit(self, user: str, flag: str) -> PuzzleSubmissionResultMessage:
def submit(self, user: str, flag: str) -> PuzzleSubmissionFeedback | str:
p = self.get_today_puzzle()
d = self.get_today_info()
now = datetime.datetime.now()
if p is None or d is None:
return PuzzleSubmissionResultMessage(
success=False,
message="今天没有题哦,改天再来吧!",
)
return "今天没有题哦,改天再来吧!"
if user in d.success_users:
return PuzzleSubmissionResultMessage(
success=False,
message="你今天已经答对过啦!不用重复提交哦!",
)
if flag != p.flag:
d.tried_users.add(user)
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
success=False,
flag=flag,
time=now,
))
return PuzzleSubmissionResultMessage(
success=False,
message="❌ 答错了,请检查你的答案哦",
)
return "你今天已经答对过啦!不用重复提交哦!"
d.tried_users.add(user)
d.success_users[user] = now
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
success=True,
flag=flag,
time=now,
))
return PuzzleSubmissionResultMessage(
success=True,
rank=len(d.success_users),
result = p.check_submission(flag, now)
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(result)
if result.success:
d.success_users[user] = now
return PuzzleSubmissionFeedback(
submission=result,
puzzle=p,
info=d,
)
def admin_create_puzzle(self, user: str):
@ -272,47 +235,20 @@ class PuzzleManager(BaseModel):
if p.author_id == user
], key=lambda p: p.created_at, reverse=True)
def get_report_yesterday(self):
yesterday = get_today_date() - datetime.timedelta(days=1)
index_id = self.daily_puzzle_of_date.get(yesterday)
if index_id is None:
return None
info = self.daily_puzzle[index_id]
puzzle = self.puzzle_data[info.raw_id]
message = UniMessage.text(f"[KonaPH#{index_id}] 「{puzzle.title}」解答报告")
if len(info.success_users) == 0:
message = message.text(
"\n\n昨日,竟无人解出此题!"
)
else:
message = message.text(
f"\n\n昨日,共有 {len(info.success_users)} 人解出此题。\n\n前五名的解答者:"
)
us = [(u, d) for u, d in info.success_users.items()]
us = sorted(us, key=lambda t: t[1])
us = us[:5]
for u, _ in us:
m = self.submissions[puzzle.raw_id][u][-1]
message = message.text("- ").at(u).text(f"{m.time.strftime('%H:%M')}")
message = message.text("\n\n出题者:").at(puzzle.author_id)
return message
lock = asyncio.Lock()
def read_data():
try:
data_raw = KONAPH_DATA_JSON.read_text()
data_raw = KONAPH_DATA_JSON.read_text("utf-8")
return PuzzleManager.model_validate_json(data_raw)
except (FileNotFoundError, ValidationError):
return PuzzleManager()
def write_data(data: PuzzleManager):
KONAPH_DATA_JSON.write_text(data.model_dump_json())
KONAPH_DATA_JSON.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager

View File

@ -1,14 +1,24 @@
import datetime
from math import ceil
from typing import Any
from nonebot import get_plugin_config
from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, Subcommand, UniMessage, on_alconna
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query,
Subcommand, SubcommandResult, UniMessage,
on_alconna)
from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import download_image_bytes
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.plugins.kona_ph.core.storage import Puzzle, PuzzleManager, get_today_date, puzzle_manager
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list,
get_puzzle_info_message,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager,
get_today_date,
puzzle_manager)
PUZZLE_PAGE_SIZE = 10
@ -30,31 +40,15 @@ def is_puzzle_admin(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_admin
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
status_suffix = ""
if puzzle.raw_id == manager.puzzle_pinned:
status_suffix += " | 📌 已被管理员置顶"
msg = UniMessage.text(
f"--- 谜题信息 ---\n"
f"Raw ID: {puzzle.raw_id}\n"
f"标题: {puzzle.title}\n"
f"出题者 ID: {puzzle.author_id}\n"
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Flag: {puzzle.flag}\n"
f"状态: {status}{status_suffix}\n\n"
f"{puzzle.content}"
)
if puzzle.img_name:
msg = msg.image(raw=puzzle.get_image_path().read_bytes())
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
return msg
def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle:
if raw_id not in manager.puzzle_data:
raise BotExceptionMessage("没有这个谜题")
puzzle = manager.puzzle_data[raw_id]
if is_puzzle_admin(target):
return puzzle
if target.target_id != puzzle.author_id:
raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
return puzzle
def create_admin_commands():
@ -80,6 +74,46 @@ def create_admin_commands():
dest="modify",
),
Subcommand("publish", Args["raw_id?", str], dest="publish"),
Subcommand("preview", Args["raw_id", str], dest="preview"),
Subcommand("get-submits", Args["raw_id", str], dest="get-submits"),
Subcommand(
"test",
Args["raw_id", str],
Args["submission", str],
dest="test",
),
Subcommand(
"hint",
Subcommand(
"add",
Args["raw_id", str],
Args["pattern", str],
Args["message", str],
dest="add",
),
Subcommand(
"list",
Args["raw_id", str],
Args["page?", int],
dest="list",
),
Subcommand(
"modify",
Args["raw_id", str],
Args["hint_id", int],
Option("--pattern", Args["pattern", str], alias=["-p"]),
Option("--message", Args["message", str], alias=["-m"]),
Option("--checkpoint", Args["is_checkpoint", bool], alias=["-c"]),
dest="modify",
),
Subcommand(
"delete",
Args["raw_id", str],
Args["hint_id", int],
dest="delete",
),
dest="hint",
),
),
rule=is_puzzle_manager,
)
@ -93,6 +127,10 @@ def create_admin_commands():
msg = msg.text("konaph info <id> - 查看谜题\n")
msg = msg.text("konaph my <page?> - 查看我的谜题列表\n")
msg = msg.text("konaph modify - 查看如何修改谜题信息\n")
msg = msg.text("konaph preview <id> - 预览一个题目的效果,不会展示答案\n")
msg = msg.text("konaph get-submits <id> - 获得题目的提交记录\n")
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
if is_puzzle_admin(target):
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
@ -117,15 +155,7 @@ def create_admin_commands():
@cmd_admin.assign("ready")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
))
p = check_puzzle(manager, target, raw_id)
if p.ready:
return await target.send_message(UniMessage.text(
"题目早就准备好啦!"
@ -138,15 +168,7 @@ def create_admin_commands():
@cmd_admin.assign("unready")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
))
p = check_puzzle(manager, target, raw_id)
if not p.ready:
return await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经是未取消状态了!"
@ -164,16 +186,7 @@ def create_admin_commands():
@cmd_admin.assign("info")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限查看详细信息!"
))
p = check_puzzle(manager, target, raw_id)
await target.send_message(get_puzzle_info_message(manager, p))
@cmd_admin.assign("my")
@ -196,7 +209,7 @@ def create_admin_commands():
if manager.puzzle_pinned == p.raw_id:
message = message.text("[📌]")
if manager.is_puzzle_published(p.raw_id):
message = message.text(f"[#{p.index_id}] ")
message = message.text(f"[✨][#{p.index_id}] ")
elif p.ready:
message = message.text("[✅] ")
else:
@ -209,7 +222,9 @@ def create_admin_commands():
@cmd_admin.assign("all")
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text("你没有权限查看所有的哦"))
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async with puzzle_manager() as manager:
puzzles = [*manager.puzzle_data.values()]
if ready.available:
@ -227,7 +242,7 @@ def create_admin_commands():
if p.raw_id == manager.puzzle_pinned:
message = message.text("[📌]")
if manager.is_puzzle_published(p.raw_id):
message = message.text(f"[#{p.index_id}] ")
message = message.text(f"[✨][#{p.index_id}] ")
elif p.ready:
message = message.text("[✅] ")
else:
@ -284,28 +299,30 @@ def create_admin_commands():
"支持的参数:\n"
" --title <str> 标题\n"
" --description <str> 题目详情描述(用直引号包裹以支持多行)\n"
" --flag <str> flag\n"
" --flag <str> flag,也就是题目的答案\n"
" --image <图片> 图片\n"
" --remove-image 删除图片"
)
image_manager = get_image_manager()
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message("没有这个谜题")
p = manager.puzzle_data[raw_id]
if not is_puzzle_admin(target) and target.target_id != p.author_id:
return await target.send_message("你没有权限编辑这个谜题")
p = check_puzzle(manager, target, raw_id)
if title is not None:
p.title = title
if description is not None:
p.content = description
if flag is not None:
p.flag = flag
p.flag = flag.strip()
if flag.strip() != flag:
await target.send_message(
"⚠️ 注意:你输入的 Flag 含有开头或结尾的空格,已经帮你去除"
)
if image is not None and image.url is not None:
b = await download_image_bytes(image.url)
p.add_image(b.unwrap())
image_manager.remove_puzzle_image(p.img_name)
image_manager.upload_puzzle_image(b.unwrap())
elif remove_image.available:
p.remove_image()
image_manager.remove_puzzle_image(p.img_name)
info2 = get_puzzle_info_message(manager, p)
@ -313,6 +330,10 @@ def create_admin_commands():
@cmd_admin.assign("publish")
async def _(target: DepLongTaskTarget, raw_id: str | None = None):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
today = get_today_date()
async with puzzle_manager() as manager:
if today in manager.daily_puzzle_of_date:
@ -323,7 +344,121 @@ def create_admin_commands():
p = manager.get_today_puzzle(strong=True)
if p is None:
return await target.send_message("上架失败了orz可能是没题了")
await qq_broadcast(config.plugin_puzzle_playgroup, p.get_unimessage())
await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(p))
return await target.send_message("Ok!")
@cmd_admin.assign("preview")
async def _(target: DepLongTaskTarget, raw_id: str):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
return await target.send_message(get_puzzle_description(p))
@cmd_admin.assign("get-submits")
async def _(target: DepLongTaskTarget, raw_id: str):
async with puzzle_manager() as manager:
puzzle = manager.puzzle_data.get(raw_id)
if puzzle is None:
return await target.send_message("没有这个谜题")
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id:
return await target.send_message("你没有权限预览这个谜题")
msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n")
submits = manager.submissions.get(raw_id, {})
for uid, ls in submits.items():
s = ', '.join((i.flag for i in ls))
msg = msg.text(f"- {get_username(uid)}{s}\n")
return await target.send_message(msg)
@cmd_admin.assign("test")
async def _(target: DepLongTaskTarget, raw_id: str, submission: str):
"""
测试一道谜题的回答,并给出结果
"""
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
result = p.check_submission(submission)
msg = get_submission_message(p, result)
return await target.send_message("[测试提交] " + msg)
@cmd_admin.assign("subcommands.hint")
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")):
if len(subcommands.result.subcommands) > 0:
return
return await target.send_message(
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n")
.text("- konaph hint modify <id> <hint_id>\n")
.text(" - --pattern <pattern>\n - 更改匹配规则\n")
.text(" - --message <message>\n - 更改提示文本\n")
.text(" - --checkpoint [True|False]\n - 更改是否为中间答案\n")
.text("- konaph hint delete <id> <hint_id>\n - 删除一个提示 / 中间答案\n")
.text("\n更多关于 pattern 和中间答案的信息,请见 man中间答案(7)")
)
@cmd_admin.assign("subcommands.hint.add")
async def _(
target: DepLongTaskTarget,
raw_id: str,
pattern: str,
message: str,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p.hints[p.hint_id_max + 1] = PuzzleHint(
pattern=pattern,
message=message,
is_checkpoint=False,
)
await target.send_message("创建成功!\n\n" + get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.list")
async def _(
target: DepLongTaskTarget,
raw_id: str,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
await target.send_message(get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.modify")
async def _(
target: DepLongTaskTarget,
raw_id: str,
hint_id: int,
pattern: str | None = None,
message: str | None = None,
is_checkpoint: bool | None = None,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
if hint_id not in p.hints:
raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
)
hint = p.hints[hint_id]
if pattern is not None:
hint.pattern = pattern
if message is not None:
hint.message = message
if is_checkpoint is not None:
hint.is_checkpoint = is_checkpoint
await target.send_message("更改成功!\n\n" + get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.delete")
async def _(
target: DepLongTaskTarget,
raw_id: str,
hint_id: int,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
if hint_id not in p.hints:
raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
)
del p.hints[hint_id]
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
return cmd_admin

View File

@ -1,4 +1,3 @@
from curses.ascii import isdigit
from pathlib import Path
import nonebot
@ -40,7 +39,10 @@ async def _(
doc: str | None,
event: nonebot.adapters.Event,
):
if doc is not None and section is None and all(isdigit(c) for c in doc):
if doc is not None and section is None and all(
ord('0') <= ord(c) <= ord('9')
for c in doc
):
section = int(doc)
doc = None

View File

@ -15,7 +15,7 @@ if not POLL_DATA_FILE.exists():
POLL_DATA_FILE.write_bytes(POLL_TEMPLATE_FILE.read_bytes())
poll_list = json.loads(POLL_DATA_FILE.read_text())['poll']
poll_list = json.loads(POLL_DATA_FILE.read_text("utf-8"))['poll']
async def createpoll(title,qqid,options):
polllength = len(poll_list)
@ -53,7 +53,7 @@ def writeback():
# json.dump({'poll':poll_list},file,ensure_ascii=False,sort_keys=True)
POLL_DATA_FILE.write_text(json.dumps({
'poll': poll_list,
}, ensure_ascii=False, sort_keys=True))
}, ensure_ascii=False, sort_keys=True), "utf-8")
async def pollvote(polnum,optionnum,qqnum):
optiond = poll_list[polnum]["polldata"]

View File

@ -394,7 +394,8 @@ async def generate_dice_image(number: str) -> BytesIO:
append_images=images[1:],
duration=frame_durations,
format='GIF',
loop=1)
loop=1,
disposal=2)
output.seek(0)
# pil_final.save(output, format='PNG')
return output

View File

@ -4,10 +4,13 @@ from typing import cast
from loguru import logger
from nonebot import get_bot, on_request
import nonebot
from nonebot.adapters.onebot.v11.event import FriendRequestEvent
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
from nonebot_plugin_apscheduler import scheduler
from konabot.common.nb.is_admin import cfg as adminConfig
from konabot.common.username import manager
add_request = on_request()
@ -23,3 +26,15 @@ async def _(req: FriendRequestEvent):
await req.approve(bot)
logger.info(f"已经自动同意 {req.user_id} 的好友请求")
@scheduler.scheduled_job("cron", minute="*/5")
async def _():
logger.info("尝试更新群成员信息")
await manager.update()
driver = nonebot.get_driver()
@driver.on_bot_connect
async def _():
logger.info("有 Bot 连接5 秒后试着更新群成员信息")
await asyncio.sleep(5)
await manager.update()

View File

@ -59,14 +59,14 @@ def load_notify_config() -> NotifyConfigFile:
if not DATA_FILE_PATH.exists():
return NotifyConfigFile()
try:
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text())
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text("utf-8"))
except Exception as e:
logger.warning(f"在解析 Notify 时遇到问题:{e}")
return NotifyConfigFile()
def save_notify_config(config: NotifyConfigFile):
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4))
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4), "utf-8")
@evt.handle()

13
scripts/watch_filter.py Normal file
View File

@ -0,0 +1,13 @@
from pathlib import Path
from watchfiles import Change
base = Path(__file__).parent.parent.absolute()
def filter(change: Change, path: str) -> bool:
if "__pycache__" in path:
return False
if Path(path).absolute().is_relative_to(base / "data"):
return False
return True