Compare commits

..

29 Commits

Author SHA1 Message Date
0231aa04f4 添加中间答案功能
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-01 18:29:10 +08:00
01fe33eb9f 部分解耦了 konaph 的一些层 2025-11-01 17:52:05 +08:00
adfbac7d90 支持正义 utf-8 2025-11-01 13:48:48 +08:00
994c1412da 为 Watchfiles 添加更可配置的过滤器
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-01 12:40:01 +08:00
8780dfec6f 在 Tag 成功后也进行 ntfy 通知
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-30 16:52:55 +08:00
490d807e7a 添加一些对题解提交空格的情况判定
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-30 16:48:09 +08:00
fa208199ab 我不小心多加了一个 s
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-29 22:01:13 +08:00
38a17f42a3 添加 Ntfy 构建消息的报告
Some checks failed
continuous-integration/drone/push Build is failing
2025-10-29 21:59:11 +08:00
37179fc4d7 添加显示提交记录的指令
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-27 15:31:12 +08:00
56e0aabbf3 优化 UX,添加 preview 指令 2025-10-27 15:20:40 +08:00
ce2b7fd6f6 空调调温优化与排行榜,浏览器添加本地HTML支持
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-27 00:04:27 +08:00
b28f8f85a2 Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot 2025-10-26 22:49:04 +08:00
0acffea86d 添加排行榜,优化 UX
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 12:45:29 +08:00
3e395f8a35 更少的量,更解耦的数据,更健壮的系统
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 11:56:03 +08:00
312e203bbe 忘记把这个答题情况通知加上了
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 04:07:26 +08:00
f9deabfce0 修复 Query 逻辑
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 04:03:51 +08:00
0a822bf440 优化 konaph UX 并添加文档
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 03:55:31 +08:00
534a2c9e75 解密厨来了2
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 03:42:28 +08:00
a03cef4124 解密厨来了 2025-10-26 03:23:51 +08:00
7a20c3fe2f 空调指数概率损坏与空调、骰子gif图的背景优化 2025-10-26 01:06:26 +08:00
16351792b6 修复成语接龙大家没被扣分的BUG
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-25 23:59:41 +08:00
7bbd4f81ee 成语接龙5.0、群空调功能
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-25 23:39:32 +08:00
4d5678efac Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot 2025-10-25 22:00:57 +08:00
c7229bb763 new render 2025-10-25 21:54:38 +08:00
6abc963ccf 优化提醒 UX
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-25 00:28:29 +08:00
881f38d187 调整 Web Renderer 的代码风格,完善类型注解
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-24 23:49:32 +08:00
56d32bc9f4 BA
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 23:25:00 +08:00
76f19f9eac 添加 Emoji 字体
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 22:11:46 +08:00
1479d8f8da 添加 CJK 字体依赖 2025-10-24 22:10:16 +08:00
36 changed files with 2080 additions and 167 deletions

View File

@ -38,6 +38,17 @@ steps:
path: /var/run/docker.sock
commands:
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
- name: 发送构建结果到 ntfy
image: parrazam/drone-ntfy
when:
status: [success, failure]
settings:
url: https://ntfy.service.jazzwhom.top
topic: drone_ci
tags:
- drone-ci
token:
from_secret: NTFY_TOKEN
volumes:
- name: docker-socket
@ -74,6 +85,17 @@ steps:
volumes:
- name: docker-socket
path: /var/run/docker.sock
- name: 发送构建结果到 ntfy
image: parrazam/drone-ntfy
when:
status: [success, failure]
settings:
url: https://ntfy.service.jazzwhom.top
topic: drone_ci
tags:
- drone-ci
token:
from_secret: NTFY_TOKEN
volumes:
- name: docker-socket

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"python.REPL.enableREPLSmartSend": false
}

View File

@ -14,7 +14,8 @@ RUN apt-get update && \
libxkbfile1 libxmu6 libxpm4 libxrender1 libxt6t64 x11-common x11-xkb-utils \
xfonts-encodings xfonts-utils xkb-data xserver-common libnspr4 libatk1.0-0t64 \
libatk-bridge2.0-0t64 libatspi2.0-0t64 libxcomposite1 libxdamage1 libxfixes3 \
libxkbcommon0 libasound2t64 libnss3 \
libxkbcommon0 libasound2t64 libnss3 fonts-noto-cjk fonts-noto-cjk-extra \
fonts-noto-color-emoji \
&& rm -rf /var/lib/apt/lists/*

View File

@ -68,7 +68,7 @@ code .
使用命令行手动启动 Bot
```bash
poetry run watchfiles bot.main konabot
poetry run watchfiles bot.main . --filter scripts.watch_filter.filter
```
如果你不希望自动重载,只是想运行 Bot可以直接运行

BIN
assets/img/ac/ac.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 57 KiB

BIN
assets/img/ac/broken_ac.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

BIN
assets/img/ac/frozen_ac.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 87 KiB

BIN
assets/img/other/boom.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

View File

@ -0,0 +1,76 @@
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>空调炸炸排行榜</title>
</head>
<body>
<div class="box">
<div class="text">位居全球第 <span id="ranking" class="ranking">200</span></div>
<div class="text-2">您的群总共坏了 <span id="number" class="number">200</span> 台空调</div>
<img class="background" src="./assets/background.png" alt="空调炸炸排行榜">
</div>
</body>
<style>
.box {
position: relative;
width: 1024px;
}
.number {
font-size: 2em;
color: #ffdd00;
text-shadow: 3px 3px 6px rgba(0, 0, 0, 0.7);
font-weight: bold;
font-stretch: 50%;
max-width: 520px;
word-wrap: break-word;
line-height: 0.8em;
}
.background {
width: 1024px;
}
.text {
position: absolute;
top: 125px;
width: 100%;
font-size: 72px;
color: white;
text-shadow: 2px 2px 4px rgba(0, 0, 0, 0.7);
font-weight: bolder;
display: flex;
align-items: baseline;
justify-content: center;
}
.text-2 {
position: absolute;
top: 50px;
width: 100%;
font-size: 48px;
color: white;
text-shadow: 2px 2px 4px rgba(0, 0, 0, 0.7);
font-weight: bolder;
display: flex;
align-items: baseline;
justify-content: center;
}
.ranking {
font-size: 2em;
color: #ff0000;
-webkit-text-stroke: #ffffff 2px;
text-shadow: 3px 3px 6px rgba(0, 0, 0, 0.7);
font-weight: bold;
font-stretch: 50%;
}
</style>
<script>
// 从 URL 参数中获取 number 的值
const urlParams = new URLSearchParams(window.location.search);
const number = urlParams.get('number');
// 将 number 显示在页面上
document.getElementById('number').textContent = number;
// 从 URL 参数中获取 ranking 的值
const ranking = urlParams.get('ranking');
// 将 ranking 显示在页面上
document.getElementById('ranking').textContent = ranking;
</script>
</html>

View File

@ -19,12 +19,12 @@ class DataManager(Generic[T]):
if not self.fp.exists():
return self.cls()
try:
return self.cls.model_validate_json(self.fp.read_text())
return self.cls.model_validate_json(self.fp.read_text("utf-8"))
except ValidationError:
return self.cls()
def save(self, data: T):
self.fp.write_text(data.model_dump_json())
self.fp.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager
async def get_data(self):

View File

@ -51,13 +51,20 @@ class LongTaskTarget(BaseModel):
target_id: str
"沟通对象的 ID"
async def send_message(self, msg: UniMessage, at: bool = True) -> bool:
@property
def is_private_chat(self):
return self.channel_id.startswith(QQ_PRIVATE_CHAT_CHANNEL_PREFIX)
async def send_message(self, msg: UniMessage | str, at: bool = True) -> bool:
try:
bot = nonebot.get_bot(self.self_id)
except KeyError:
logger.warning(f"试图访问了不存在的 Bot。ID={self.self_id}")
return False
if isinstance(msg, str):
msg = UniMessage.text(msg)
if self.platform == "qq":
if not isinstance(bot, OBBot):
logger.warning(
@ -233,7 +240,7 @@ def handle_long_task(callback_id: str):
def _load_longtask_data() -> LongTaskModuleData:
try:
txt = LONGTASK_DATA_DIR.read_text()
txt = LONGTASK_DATA_DIR.read_text("utf-8")
return LongTaskModuleData.model_validate_json(txt)
except (FileNotFoundError, ValidationError) as e:
logger.info(f"取得 LongTask 数据时出现问题:{e}")
@ -244,7 +251,7 @@ def _load_longtask_data() -> LongTaskModuleData:
def _save_longtask_data(data: LongTaskModuleData):
LONGTASK_DATA_DIR.write_text(data.model_dump_json())
LONGTASK_DATA_DIR.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager

View File

@ -16,8 +16,6 @@ from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
from PIL import UnidentifiedImageError
from returns.result import Failure, Result, Success
from konabot.common.nb.exc import BotExceptionMessage
async def download_image_bytes(url: str) -> Result[bytes, str]:
# if "/matcha/cache/" in url:

View File

@ -0,0 +1,33 @@
from typing import Any, cast
import nonebot
from nonebot.adapters.onebot.v11 import Bot as OBBot
from nonebot_plugin_alconna import UniMessage
async def qq_broadcast(groups: list[str], msg: UniMessage[Any] | str):
if isinstance(msg, str):
msg = UniMessage.text(msg)
bots: dict[str, OBBot] = {}
# group_id -> bot_id
availabilities: dict[str, str] = {}
for bot_id, bot in nonebot.get_bots().items():
if not isinstance(bot, OBBot):
continue
bots[bot_id] = bot
gl = await bot.get_group_list()
for g in gl:
gid = str(g.get("group_id", -1))
if gid in groups:
availabilities[gid] = bot_id
for group in groups:
if group in availabilities:
bot = bots[availabilities[group]]
await bot.send_group_msg(
group_id=int(group),
message=cast(Any, await msg.export(bot)),
auto_escape=False,
)

View File

@ -0,0 +1,54 @@
import re
import nonebot
from nonebot.adapters.onebot.v11 import Bot as OBBot
class UsernameManager:
grouped_data: dict[int, dict[int, str]]
individual_data: dict[int, str]
def __init__(self) -> None:
self.grouped_data = {}
self.individual_data = {}
async def update(self):
for bot in nonebot.get_bots().values():
if isinstance(bot, OBBot):
for user in await bot.get_friend_list():
uid = user["user_id"]
nickname = user["nickname"]
self.individual_data[uid] = nickname
for group in await bot.get_group_list():
gid = group["group_id"]
for member in await bot.get_group_member_list(group_id=gid):
uid = member["user_id"]
card = member.get("card", "")
nickname = member.get("nickname", "")
if card:
self.grouped_data.setdefault(gid, {})[uid] = card
if nickname:
self.individual_data[uid] = nickname
def get(self, qqid: int, groupid: int | None = None) -> str:
if groupid is not None and groupid in self.grouped_data:
n = self.grouped_data[groupid].get(qqid)
if n is not None:
return n
if qqid in self.individual_data:
return self.individual_data[qqid]
return str(qqid)
manager = UsernameManager()
def get_username(qqid: int | str, group: int | str | None = None):
if isinstance(group, str):
group = None if not re.match(r"^\d+$", group) else int(group)
if isinstance(qqid, str):
if re.match(r"^\d+$", qqid):
qqid = int(qqid)
else:
return qqid
return manager.get(qqid, group)

View File

@ -1,13 +1,45 @@
import asyncio
import queue
from typing import Any, Callable, Coroutine
from loguru import logger
from playwright.async_api import async_playwright, Browser
from playwright.async_api import Page, Playwright, async_playwright, Browser, Page, BrowserContext
PageFunction = Callable[[Page], Coroutine[Any, Any, Any]]
class WebRenderer:
browser_pool: queue.Queue["WebRendererInstance"] = queue.Queue()
context_pool: dict[int, BrowserContext] = {} # 长期挂载的浏览器上下文池
page_pool: dict[str, Page] = {} # 长期挂载的页面池
@classmethod
async def render(cls, url: str, target: str, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
async def get_browser_instance(cls) -> "WebRendererInstance":
if cls.browser_pool.empty():
instance = await WebRendererInstance.create()
cls.browser_pool.put(instance)
instance = cls.browser_pool.get()
cls.browser_pool.put(instance)
return instance
@classmethod
async def get_browser_context(cls) -> BrowserContext:
instance = await cls.get_browser_instance()
if id(instance) not in cls.context_pool:
context = await instance.browser.new_context()
cls.context_pool[id(instance)] = context
logger.debug(f"Created new persistent browser context for WebRendererInstance {id(instance)}")
return cls.context_pool[id(instance)]
@classmethod
async def render(
cls,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
'''
访问指定URL并返回截图
@ -19,30 +51,109 @@ class WebRenderer:
:return: 截图的字节数据
'''
logger.debug(f"Requesting render for {url} targeting {target} with timeout {timeout}")
if cls.browser_pool.empty():
instance = await WebRendererInstance.create()
cls.browser_pool.put(instance)
instance = cls.browser_pool.get()
cls.browser_pool.put(instance)
instance = await cls.get_browser_instance()
logger.debug(f"Using WebRendererInstance {id(instance)} to render {url} targeting {target}")
return await instance.render(url, target, params=params, other_function=other_function, timeout=timeout)
@classmethod
async def render_persistent_page(cls, page_id: str, url: str, target: str, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
'''
使用长期挂载的页面访问指定URL并返回截图
:param page_id: 页面唯一标识符
:param url: 目标URL
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
'''
logger.debug(f"Requesting persistent render for page_id {page_id} at {url} targeting {target} with timeout {timeout}")
instance = await cls.get_browser_instance()
if page_id not in cls.page_pool:
context = await cls.get_browser_context()
page = await context.new_page()
cls.page_pool[page_id] = page
logger.debug(f"Created new persistent page for page_id {page_id} using WebRendererInstance {id(instance)}")
page = cls.page_pool[page_id]
return await instance.render_with_page(page, url, target, params=params, other_function=other_function, timeout=timeout)
@classmethod
async def render_file(
cls,
file_path: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
'''
访问指定本地文件URL并返回截图
:param file_path: 目标文件路径
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
'''
instance = await cls.get_browser_instance()
logger.debug(f"Using WebRendererInstance {id(instance)} to render file {file_path} targeting {target}")
return await instance.render_file(file_path, target, params=params, other_function=other_function, timeout=timeout)
@classmethod
async def close_persistent_page(cls, page_id: str) -> None:
'''
关闭并移除长期挂载的页面
:param page_id: 页面唯一标识符
'''
if page_id in cls.page_pool:
page = cls.page_pool[page_id]
await page.close()
del cls.page_pool[page_id]
logger.debug(f"Closed and removed persistent page for page_id {page_id}")
class WebRendererInstance:
def __init__(self):
self.playwright = None
self.browser: Browser = None
self.lock: asyncio.Lock = None
self._playwright: Playwright | None = None
self._browser: Browser | None = None
self.lock = asyncio.Lock()
@property
def playwright(self) -> Playwright:
assert self._playwright is not None
return self._playwright
@property
def browser(self) -> Browser:
assert self._browser is not None
return self._browser
async def init(self):
self._playwright = await async_playwright().start()
self._browser = await self.playwright.chromium.launch(headless=True)
@classmethod
async def create(cls) -> "WebRendererInstance":
instance = cls()
instance.playwright = await async_playwright().start()
instance.browser = await instance.playwright.chromium.launch(headless=True)
instance.lock = asyncio.Lock()
await instance.init()
return instance
async def render(self, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
async def render(
self,
url: str,
target: str,
index: int = 0,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30
) -> bytes:
'''
访问指定URL并返回截图
@ -58,29 +169,43 @@ class WebRendererInstance:
async with self.lock:
context = await self.browser.new_context()
page = await context.new_page()
logger.debug(f"Navigating to {url} with timeout {timeout}")
try:
url_with_params = url + ("?" + "&".join(f"{k}={v}" for k, v in params.items()) if params else "")
await page.goto(url_with_params, timeout=timeout * 1000, wait_until="load")
logger.debug(f"Page loaded successfully")
# 等待目标元素出现
await page.wait_for_selector(target, timeout=timeout * 1000)
logger.debug(f"Target element '{target}' found, taking screenshot")
if other_function:
await other_function(page)
elements = await page.query_selector_all(target)
if not elements:
raise Exception(f"Target element '{target}' not found on the page.")
if index >= len(elements):
raise Exception(f"Index {index} out of range for elements matching '{target}'.")
element = elements[index]
screenshot = await element.screenshot()
logger.debug(f"Screenshot taken successfully")
return screenshot
finally:
await page.close()
await context.close()
screenshot = await self.inner_render(page, url, target, index, params, other_function, timeout)
await page.close()
await context.close()
return screenshot
async def render_with_page(self, page: Page, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
async with self.lock:
screenshot = await self.inner_render(page, url, target, index, params, other_function, timeout)
return screenshot
async def render_file(self, file_path: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
file_path = "file:///" + str(file_path).replace("\\", "/")
return await self.render(file_path, target, index, params, other_function, timeout)
async def inner_render(self, page: Page, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
logger.debug(f"Navigating to {url} with timeout {timeout}")
url_with_params = url + ("?" + "&".join(f"{k}={v}" for k, v in params.items()) if params else "")
await page.goto(url_with_params, timeout=timeout * 1000, wait_until="load")
logger.debug("Page loaded successfully")
# 等待目标元素出现
await page.wait_for_selector(target, timeout=timeout * 1000)
logger.debug(f"Target element '{target}' found, taking screenshot")
if other_function:
await other_function(page)
elements = await page.query_selector_all(target)
if not elements:
logger.error(f"Target element '{target}' not found on the page.")
return None
if index >= len(elements):
logger.error(f"Index {index} out of range for elements matching '{target}'")
return None
element = elements[index]
screenshot = await element.screenshot()
logger.debug(f"Screenshot taken successfully")
return screenshot
async def close(self):
await self.browser.close()
await self.playwright.stop()

View File

@ -0,0 +1,11 @@
关于「中间答案」或者「提示」:
在 KonaPH 中,当有人发送「提交答案 答案」时,会检查答案是否符合你设置的中间答案的 pattern。这个 pattern 可以有两种方式:
- 纯文本的完整匹配:你设置的 pattern 如果和提交的答案完全相等,则会触发提示。
- regex 匹配:你设置的 pattern 如果以斜杠(/)开头和结尾,就会检查提交的答案是否匹配正则表达式。注意 ^ 和 $ 符号的使用。
- 例如:/^commit$/ 会匹配 commit不会匹配 acommit、Commit 等。
- 而如果是 /commit/,则会匹配 commit、acommit而不会匹配 Commit。
- 无法使用 Javascript 的字符串声明模式,例如,/case_insensitive/i 就不会被视作一个正则表达式。
一个提示是提示,还是中间答案,取决于它是否有 checkpoint 标记。如果有 checkpoint 标记,则会提示用户「你回答了一个中间答案」,并且这个中间答案的回答会在排行榜中显示。

View File

@ -0,0 +1,4 @@
指令介绍
konaph - KonaBot 的 PuzzleHunt 管理工具
详细介绍请直接输入 konaph 获取使用指引(该指令权限仅对部分人开放。如果你有权限的话才有响应。建议在此方 BOT 私聊使用该指令。)

View File

@ -0,0 +1,162 @@
from io import BytesIO
from typing import Optional, Union
import cv2
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import Alconna, AlconnaMatcher, Args, UniMessage, on_alconna
from PIL import Image
import numpy as np
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
from konabot.common.web_render import WebRenderer
from konabot.plugins.air_conditioner.ac import AirConditioner, CrashType, generate_ac_image, wiggle_transform
import random
import math
def get_ac(id: str) -> AirConditioner:
ac = AirConditioner.air_conditioners.get(id)
if ac is None:
ac = AirConditioner(id)
return ac
async def send_ac_image(event: type[AlconnaMatcher], ac: AirConditioner):
if(ac.burnt == True):
# 打开坏掉的空调图片
with open(ASSETS_PATH / "img" / "ac" / "broken_ac.png", "rb") as f:
# 将其转为 GIF 格式发送
output = BytesIO()
Image.open(f).save(output, format="GIF")
output.seek(0)
await event.send(await UniMessage().image(raw=output).export())
return
if(ac.frozen == True):
# 打开坏掉的空调图片
with open(ASSETS_PATH / "img" / "ac" / "frozen_ac.png", "rb") as f:
# 将其转为 GIF 格式发送
output = BytesIO()
Image.open(f).save(output, format="GIF")
output.seek(0)
await event.send(await UniMessage().image(raw=output).export())
return
ac_image = await generate_ac_image(ac)
await event.send(await UniMessage().image(raw=ac_image).export())
evt = on_alconna(Alconna(
"群空调"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
"开空调"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
ac.on = True
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
"关空调"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
ac.on = False
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
"空调升温",
Args["temp?", Optional[Union[int, float]]] # 可选参数升温的度数默认为1
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
if temp <= 0:
return
id = target.channel_id
ac = get_ac(id)
if not ac.on or ac.burnt == True or ac.frozen == True:
await send_ac_image(evt, ac)
return
ac.temperature += temp
if ac.temperature > 40:
# 根据温度随机出是否爆炸40度开始呈指数增长
possibility = -math.e ** ((40-ac.temperature) / 50) + 1
if random.random() < possibility:
# 打开爆炸图片
with open(ASSETS_PATH / "img" / "other" / "boom.jpg", "rb") as f:
output = BytesIO()
# 爆炸抖动
frames = wiggle_transform(np.array(Image.open(f)), intensity=5)
pil_frames = [Image.fromarray(frame) for frame in frames]
pil_frames[0].save(output, format="GIF", save_all=True, append_images=pil_frames[1:], loop=0, duration=35, disposal=2)
output.seek(0)
await evt.send(await UniMessage().image(raw=output).export())
ac.broke_ac(CrashType.BURNT)
await evt.send("太热啦,空调炸了!")
return
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
"空调降温",
Args["temp?", Optional[Union[int, float]]] # 可选参数降温的度数默认为1
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
if temp <= 0:
return
id = target.channel_id
ac = get_ac(id)
if not ac.on or ac.burnt == True or ac.frozen == True:
await send_ac_image(evt, ac)
return
ac.temperature -= temp
if ac.temperature < 0:
# 根据温度随机出是否冻结0度开始呈指数增长
possibility = -math.e ** (ac.temperature / 50) + 1
if random.random() < possibility:
ac.broke_ac(CrashType.FROZEN)
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
"换空调"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
ac.change_ac()
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
"空调炸炸排行榜",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
number, ranking = ac.get_crashes_and_ranking()
params = {
"number": number,
"ranking": ranking
}
image = await WebRenderer.render_file(
file_path=ASSETS_PATH / "webpage" / "ac" / "index.html",
target=".box",
params=params
)
await evt.send(await UniMessage().image(raw=image).export())

View File

@ -0,0 +1,288 @@
from enum import Enum
from io import BytesIO
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from konabot.common.path import ASSETS_PATH, FONTS_PATH
from konabot.common.path import DATA_PATH
import json
class CrashType(Enum):
BURNT = 0
FROZEN = 1
class AirConditioner:
air_conditioners: dict[str, "AirConditioner"] = {}
def __init__(self, id: str) -> None:
self.id = id
self.on = False
self.temperature = 24 # 默认温度
self.burnt = False
self.frozen = False
AirConditioner.air_conditioners[id] = self
def change_ac(self):
self.burnt = False
self.frozen = False
self.on = False
self.temperature = 24 # 重置为默认温度
def broke_ac(self, crash_type: CrashType):
'''
让空调坏掉,并保存数据
:param crash_type: CrashType 枚举,表示空调坏掉的类型
'''
match crash_type:
case CrashType.BURNT:
self.burnt = True
case CrashType.FROZEN:
self.frozen = True
self.save_crash_data(crash_type)
def save_crash_data(self, crash_type: CrashType):
'''
如果空调爆炸了,就往本地的 ac_crash_data.json 里该 id 的记录加一
'''
data_file = DATA_PATH / "ac_crash_data.json"
crash_data = {}
if data_file.exists():
with open(data_file, "r", encoding="utf-8") as f:
crash_data = json.load(f)
if self.id not in crash_data:
crash_data[self.id] = {"burnt": 0, "frozen": 0}
match crash_type:
case CrashType.BURNT:
crash_data[self.id]["burnt"] += 1
case CrashType.FROZEN:
crash_data[self.id]["frozen"] += 1
with open(data_file, "w", encoding="utf-8") as f:
json.dump(crash_data, f, ensure_ascii=False, indent=4)
def get_crashes_and_ranking(self) -> tuple[int, int]:
'''
获取该群在全国空调损坏的数量与排行榜的位置
'''
data_file = DATA_PATH / "ac_crash_data.json"
if not data_file.exists():
return 0, 1
with open(data_file, "r", encoding="utf-8") as f:
crash_data = json.load(f)
ranking_list = []
for gid, record in crash_data.items():
total = record.get("burnt", 0) + record.get("frozen", 0)
ranking_list.append((gid, total))
ranking_list.sort(key=lambda x: x[1], reverse=True)
total_crashes = crash_data.get(self.id, {}).get("burnt", 0) + crash_data.get(self.id, {}).get("frozen", 0)
rank = 1
for gid, total in ranking_list:
if gid == self.id:
break
rank += 1
return total_crashes, rank
def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0)):
"""
将文本转换为带透明背景的图像,图像大小刚好包含文本
"""
# 创建临时图像来计算文本尺寸
temp_image = Image.new('RGB', (1, 1), (255, 255, 255))
temp_draw = ImageDraw.Draw(temp_image)
font = ImageFont.truetype(FONTS_PATH / "montserrat.otf", font_size)
# 获取文本边界框
bbox = temp_draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# 计算图像大小(文本大小 + 内边距)
image_width = int(text_width + 2 * padding)
image_height = int(text_height + 2 * padding)
# 创建RGBA模式的空白图像带透明通道
image = Image.new('RGBA', (image_width, image_height), (0, 0, 0, 0))
draw = ImageDraw.Draw(image)
# 绘制文本(考虑内边距)
x = padding - bbox[0] # 调整起始位置
y = padding - bbox[1]
# 设置文本颜色(带透明度)
if len(text_color) == 3:
text_color = text_color + (255,) # 添加完全不透明的alpha值
draw.text((x, y), text, fill=text_color, font=font)
# 转换为OpenCV格式BGRA
image_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGBA2BGRA)
return image_cv
def perspective_transform(image, target, corners):
"""
对图像进行透视变换(保持透明通道)
target: 画布
corners: 四个角点的坐标,顺序为 [左上, 右上, 右下, 左下]
"""
height, width = image.shape[:2]
# 源点(原始图像的四个角)
src_points = np.array([
[0, 0], # 左上
[width-1, 0], # 右上
[width-1, height-1], # 右下
[0, height-1] # 左下
], dtype=np.float32)
# 目标点(变换后的四个角)
dst_points = np.array(corners, dtype=np.float32)
# 计算透视变换矩阵
matrix = cv2.getPerspectiveTransform(src_points, dst_points)
# 获取画布大小
target_height, target_width = target.shape[:2]
# 应用透视变换保持所有通道包括alpha
transformed = cv2.warpPerspective(image, matrix, (target_width, target_height), flags=cv2.INTER_LINEAR)
return transformed, matrix
def blend_with_transparency(background, foreground, position):
"""
将带透明通道的前景图像合成到背景图像上
position: 前景图像在背景图像上的位置 (x, y)
"""
bg = background.copy()
# 如果背景没有alpha通道添加一个
if bg.shape[2] == 3:
bg = cv2.cvtColor(bg, cv2.COLOR_BGR2BGRA)
bg[:, :, 3] = 255 # 完全不透明
x, y = position
fg_height, fg_width = foreground.shape[:2]
bg_height, bg_width = bg.shape[:2]
# 确保位置在图像范围内
x = max(0, min(x, bg_width - fg_width))
y = max(0, min(y, bg_height - fg_height))
# 提取前景的alpha通道并归一化
alpha_foreground = foreground[:, :, 3] / 255.0
# 对于每个颜色通道进行合成
for c in range(3):
bg_region = bg[y:y+fg_height, x:x+fg_width, c]
fg_region = foreground[:, :, c]
# alpha混合公式
bg[y:y+fg_height, x:x+fg_width, c] = (
alpha_foreground * fg_region +
(1 - alpha_foreground) * bg_region
)
# 更新背景的alpha通道如果需要
bg_alpha_region = bg[y:y+fg_height, x:x+fg_width, 3]
bg[y:y+fg_height, x:x+fg_width, 3] = np.maximum(bg_alpha_region, foreground[:, :, 3])
return bg
def precise_blend_with_perspective(background, foreground, corners):
"""
精确合成:根据四个角点将前景图像透视合成到背景上
"""
# 创建与背景相同大小的空白图像
bg_height, bg_width = background.shape[:2]
# 如果背景没有alpha通道转换为BGRA
if background.shape[2] == 3:
background_bgra = cv2.cvtColor(background, cv2.COLOR_BGR2BGRA)
else:
background_bgra = background.copy()
# 创建与背景相同大小的前景图层
foreground_layer = np.zeros((bg_height, bg_width, 4), dtype=np.uint8)
# 计算前景图像在背景中的边界框
min_x = int(min(corners[:, 0]))
max_x = int(max(corners[:, 0]))
min_y = int(min(corners[:, 1]))
max_y = int(max(corners[:, 1]))
# 将变换后的前景图像放置到对应位置
fg_height, fg_width = foreground.shape[:2]
if min_y + fg_height <= bg_height and min_x + fg_width <= bg_width:
foreground_layer[min_y:min_y+fg_height, min_x:min_x+fg_width] = foreground
# 创建掩码(只在前景有内容的地方合成)
mask = (foreground_layer[:, :, 3] > 0)
# 合成图像
result = background_bgra.copy()
for c in range(3):
result[:, :, c][mask] = foreground_layer[:, :, c][mask]
result[:, :, 3][mask] = foreground_layer[:, :, 3][mask]
return result
def wiggle_transform(image, intensity=2) -> list[np.ndarray]:
'''
返回一组图像振动的帧组,模拟空调运作时的抖动效果
'''
frames = []
height, width = image.shape[:2]
shifts = [(-intensity, 0), (intensity, 0), (0, -intensity), (0, intensity), (0, 0)]
for dx, dy in shifts:
M = np.float32([[1, 0, dx], [0, 1, dy]])
shifted = cv2.warpAffine(image, M, (width, height))
frames.append(shifted)
return frames
async def generate_ac_image(ac: AirConditioner) -> BytesIO:
# 找到空调底图
ac_image = cv2.imread(str(ASSETS_PATH / "img" / "ac" / "ac.png"), cv2.IMREAD_UNCHANGED)
if not ac.on:
# 空调关闭状态,直接返回底图
pil_final = Image.fromarray(ac_image)
output = BytesIO()
pil_final.save(output, format="GIF")
return output
# 根据生成温度文本图像
text = f"{round(ac.temperature, 1)}°C"
text_image = text_to_transparent_image(
text,
font_size=60,
text_color=(0, 0, 0) # 黑色文字
)
# 获取长宽比
height, width = text_image.shape[:2]
aspect_ratio = width / height
# 定义3D变换的四个角点透视效果
# 顺序: [左上, 右上, 右下, 左下]
corners = np.array([
[123, 45], # 左上
[284, 101], # 右上
[290, 140], # 右下
[119, 100] # 左下
], dtype=np.float32)
# 对文本图像进行3D变换保持透明通道
transformed_text, transform_matrix = perspective_transform(text_image, ac_image, corners)
final_image_simple = blend_with_transparency(ac_image, transformed_text, (0, 0))
intensity = max(2, abs(int(ac.temperature) - 24) // 2)
frames = wiggle_transform(final_image_simple, intensity=intensity)
pil_frames = [Image.fromarray(frame) for frame in frames]
output = BytesIO()
pil_frames[0].save(output, format="GIF", save_all=True, append_images=pil_frames[1:], loop=0, duration=50, disposal=2)
return output

View File

@ -30,7 +30,7 @@ def load_banned_ids() -> list[str]:
if not DATA_FILE_PATH.exists():
return []
try:
return json.loads(DATA_FILE_PATH.read_text())
return json.loads(DATA_FILE_PATH.read_text("utf-8"))
except Exception as e:
logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}")
return []
@ -45,14 +45,14 @@ def add_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id not in banned_ids:
banned_ids.append(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
def remove_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id in banned_ids:
banned_ids.remove(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
class TryStartState(Enum):
@ -69,10 +69,11 @@ class TryStopState(Enum):
class TryVerifyState(Enum):
VERIFIED = 0
VERIFIED_AND_REAL = 1
NOT_IDIOM = 2
WRONG_FIRST_CHAR = 3
BUT_NO_NEXT = 4
GAME_END = 5
ALREADY_USED = 2
NOT_IDIOM = 3
WRONG_FIRST_CHAR = 4
BUT_NO_NEXT = 5
GAME_END = 6
class IdiomGame:
@ -96,12 +97,14 @@ class IdiomGame:
self.all_buff_score = 0
self.lock = asynkio.Lock()
self.remain_rounds = 0 # 剩余回合数
self.already_idioms: dict[str, int] = {} # 已经使用过的成语和使用过的次数
self.idiom_history: list[list[str]] = [] # 成语使用历史记录,多个数组以存储不同成语链
IdiomGame.INSTANCE_LIST[group_id] = self
def be_able_to_play(self) -> bool:
if self.last_play_date != datetime.date.today():
self.last_play_date = datetime.date.today()
self.remain_playing_times = 1
self.remain_playing_times = 3
if self.remain_playing_times > 0:
self.remain_playing_times -= 1
return True
@ -115,6 +118,8 @@ class IdiomGame:
self.last_char = self.last_idiom[-1]
if not self.is_nextable(self.last_char):
self.choose_start_idiom()
else:
self.add_history_idiom(self.last_idiom, new_chain=True)
return self.last_idiom
@classmethod
@ -148,6 +153,9 @@ class IdiomGame:
def clear_score_board(self):
self.score_board = {}
self.last_char = ""
self.all_buff_score = 0
self.already_idioms = {}
self.idiom_history = []
def get_score_board(self) -> dict:
return self.score_board
@ -169,6 +177,8 @@ class IdiomGame:
self.last_char = self.last_idiom[-1]
if not self.is_nextable(self.last_char):
self._skip_idiom_async()
else:
self.add_history_idiom(self.last_idiom, new_chain=True)
return self.last_idiom
async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
@ -184,6 +194,29 @@ class IdiomGame:
判断是否有成语可以接
"""
return last_char in IdiomGame.AVALIABLE_IDIOM_FIRST_CHAR
def add_already_idiom(self, idiom: str):
if idiom in self.already_idioms:
self.already_idioms[idiom] += 1
else:
self.already_idioms[idiom] = 1
def get_already_used_num(self, idiom: str) -> int:
if idiom in self.already_idioms:
return self.already_idioms[idiom]
return 0
def add_history_idiom(self, idiom: str, new_chain: bool = False):
if new_chain or len(self.idiom_history) == 0:
self.idiom_history.append([idiom])
else:
self.idiom_history[-1].append(idiom)
def display_history(self) -> list[str]:
result = []
for chain in self.idiom_history:
result.append(" -> ".join(chain))
return result
def _verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]:
state = []
@ -196,13 +229,18 @@ class IdiomGame:
state.append(TryVerifyState.NOT_IDIOM)
return state
# 成语合法,更新状态
self.add_history_idiom(idiom)
score_k = 0.5 ** self.get_already_used_num(idiom) # 每被使用过一次,得分减半
if(score_k != 1):
state.append(TryVerifyState.ALREADY_USED)
self.add_already_idiom(idiom)
state.append(TryVerifyState.VERIFIED)
self.last_idiom = idiom
self.last_char = idiom[-1]
self.add_score(user_id, 1)
self.add_score(user_id, 1 * score_k) # 先加 1 分
if idiom in IdiomGame.ALL_IDIOMS:
state.append(TryVerifyState.VERIFIED_AND_REAL)
self.add_score(user_id, 4) # 再加 4 分
self.add_score(user_id, 4 * score_k) # 再加 4 分
self.remain_rounds -= 1
if self.remain_rounds <= 0:
self.now_playing = False
@ -210,6 +248,7 @@ class IdiomGame:
if not self.is_nextable(self.last_char):
# 没有成语可以接了,自动跳过
self._skip_idiom_async()
self.add_buff_score(-100)
state.append(TryVerifyState.BUT_NO_NEXT)
return state
@ -217,7 +256,7 @@ class IdiomGame:
if user_id not in self.score_board:
return 0
# 避免浮点数精度问题导致过长
handled_score = round(self.score_board[user_id]["score"], 1)
handled_score = round(self.score_board[user_id]["score"] + self.all_buff_score, 1)
return handled_score
def add_score(self, user_id: str, score: int):
@ -401,7 +440,7 @@ async def end_game(event: BaseEvent, group_id: str):
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
score_board = instance.get_score_board()
if len(score_board) == 0:
result_text += "无人得分!"
result_text += "无人得分!\n"
else:
# 按分数排序,名字用 at 的方式
sorted_score = sorted(
@ -413,6 +452,13 @@ async def end_game(event: BaseEvent, group_id: str):
+ UniMessage().at(user_id)
+ f": {round(info['score'] + instance.get_all_buff_score(), 1)}\n"
)
if len(instance.idiom_history) == 0:
result_text += "\n本局没有任何接龙记录。"
else:
result_text += "\n你们的接龙记录是:\n"
history_lines = instance.display_history()
for line in history_lines:
result_text += line + "\n"
await evt.send(await result_text.export())
instance.clear_score_board()
@ -499,20 +545,39 @@ async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget):
.export()
)
return
already_used_num = instance.get_already_used_num(user_idiom)
if TryVerifyState.VERIFIED_AND_REAL in state:
await evt.send(
await UniMessage()
.at(user_id)
.text(f" 接上了,这是个真实成语,喜提 5 分!你有 {instance.get_user_score(user_id)} 分!")
.export()
)
score = 5 * (0.5 ** (already_used_num - 1))
if already_used_num > 1:
await evt.send(
await UniMessage()
.at(user_id)
.text(f" 接上了,这是个被重复用过的成语,喜提 {score} 分!你有 {instance.get_user_score(user_id)} 分!")
.export()
)
else:
await evt.send(
await UniMessage()
.at(user_id)
.text(f" 接上了,这是个真实成语,喜提 5 分!你有 {instance.get_user_score(user_id)} 分!")
.export()
)
elif TryVerifyState.VERIFIED in state:
await evt.send(
await UniMessage()
.at(user_id)
.text(f" 接上了,喜提 1 分!你有 {instance.get_user_score(user_id)} 分!")
.export()
)
score = 1 * (0.5 ** (already_used_num - 1))
if already_used_num > 1:
await evt.send(
await UniMessage()
.at(user_id)
.text(f" 接上了,但重复了,喜提 {score} 分!你有 {instance.get_user_score(user_id)} 分!")
.export()
)
else:
await evt.send(
await UniMessage()
.at(user_id)
.text(f" 接上了,喜提 1 分!你有 {instance.get_user_score(user_id)} 分!")
.export()
)
if TryVerifyState.GAME_END in state:
await evt.send(await UniMessage().text("全部回合结束!").export())
await end_game(event, group_id)

View File

@ -0,0 +1,135 @@
import datetime
import re
from math import ceil
from loguru import logger
from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
on_alconna)
from nonebot_plugin_apscheduler import scheduler
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.plugins.kona_ph.core.message import (get_daily_report,
get_daily_report_v2,
get_puzzle_description,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE, config,
create_admin_commands,
puzzle_manager)
create_admin_commands()
async def is_play_group(target: DepLongTaskTarget):
if target.is_private_chat:
return True
if target.channel_id in config.plugin_puzzle_playgroup:
return True
return False
cmd_submit = on_message(rule=is_play_group)
@cmd_submit.handle()
async def _(msg: UniMsg, target: DepLongTaskTarget):
txt = msg.extract_plain_text().strip()
if match := re.match(r"^提交(?:答案|题解|[fF]lag)\s*(?P<submission>.+?)\s*$", txt):
submission: str = match.group("submission")
async with puzzle_manager() as manager:
result = manager.submit(target.target_id, submission)
if isinstance(result, str):
await target.send_message(result)
else:
await target.send_message(get_submission_message(
daily_puzzle_info=result.info,
submission=result.submission,
puzzle=result.puzzle,
))
cmd_query = on_alconna(Alconna(
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?)|今日谜?题目?)"
), rule=is_play_group)
@cmd_query.handle()
async def _(target: DepLongTaskTarget):
async with puzzle_manager() as manager:
p = manager.get_today_puzzle()
if p is None:
return await target.send_message("今天无题,改日再来吧!")
await target.send_message(get_puzzle_description(p))
cmd_query_submission = on_alconna(Alconna(
"今日答题情况"
), rule=is_play_group)
@cmd_query_submission.handle()
async def _(target: DepLongTaskTarget):
gid = None
if re.match(r"^\d+$", target.channel_id):
gid = int(target.channel_id)
async with puzzle_manager() as manager:
await target.send_message(get_daily_report_v2(manager, gid))
cmd_history = on_alconna(Alconna(
"历史题目",
Args["page?", int],
Args["index_id?", str],
), rule=is_play_group)
@cmd_history.handle()
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
async with puzzle_manager() as manager:
today = get_today_date()
if index_id:
index_id = index_id.removeprefix("#")
if index_id not in manager.daily_puzzle:
return await target.send_message("没有这道题哦")
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
msg = get_puzzle_description(
puzzle,
with_answer=(index_id != manager.daily_puzzle_of_date.get(today, "")),
)
return await target.send_message(msg)
msg = UniMessage.text("====== 历史题目清单 ======\n\n")
puzzles = [
(manager.puzzle_data[manager.daily_puzzle[i].raw_id], d)
for d, i in manager.daily_puzzle_of_date.items()
]
puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text(
f"页数只有 1 ~ {count_pages} 啦!"
))
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
for p, d in puzzles:
info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]]
msg = msg.text(
f"- [#{p.index_id}: {len(info.success_users)}/{len(info.tried_users)}]"
f" {p.title} ({d})"
)
msg = msg.text("\n")
msg = msg.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
await target.send_message(msg)
@scheduler.scheduled_job("cron", hour="8")
async def _():
async with puzzle_manager() as manager:
yesterday = get_today_date() - datetime.timedelta(days=1)
msg2 = get_daily_report(manager, yesterday)
if msg2 is not None:
await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
puzzle = manager.get_today_puzzle()
if puzzle is not None:
logger.info(f"找到了题目 {puzzle.raw_id},发送")
await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(puzzle))
else:
logger.info("自动任务:没有找到题目,跳过")

View File

@ -0,0 +1,29 @@
import nanoid
from konabot.common.path import ASSETS_PATH
from konabot.plugins.kona_ph.core.path import KONAPH_IMAGE_BASE
class PuzzleImageManager:
def read_puzzle_image(self, img_name: str) -> bytes:
fp = KONAPH_IMAGE_BASE / img_name
if fp.exists():
return fp.read_bytes()
return (ASSETS_PATH / "img" / "other" / "boom.jpg").read_bytes()
def upload_puzzle_image(self, data: bytes, suffix: str = ".png") -> str:
id = nanoid.generate(
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
21,
)
img_name = f"{id}{suffix}"
(KONAPH_IMAGE_BASE / img_name).write_bytes(data)
return img_name
def remove_puzzle_image(self, img_name: str):
if img_name:
(KONAPH_IMAGE_BASE / img_name).unlink(True)
def get_image_manager() -> PuzzleImageManager:
return PuzzleImageManager()

View File

@ -0,0 +1,176 @@
"""
生成各种各样的 Message 的函数集合
"""
import datetime
import functools
import re
from typing import Any
from nonebot_plugin_alconna import UniMessage
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.storage import (DailyPuzzleInfo, Puzzle,
PuzzleManager,
PuzzleSubmission)
def get_puzzle_description(puzzle: Puzzle, with_answer: bool = False) -> UniMessage[Any]:
"""
获取一个谜题的描述
"""
img_manager = get_image_manager()
result = UniMessage.text(f"[KonaPH#{puzzle.index_id}] {puzzle.title}")
result = result.text(f"\n\n{puzzle.content}")
if puzzle.img_name:
result = result.text("\n\n").image(
raw=img_manager.read_puzzle_image(puzzle.img_name)
)
result = result.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
if with_answer:
result = result.text(f"\n\n题目答案:{puzzle.flag}")
else:
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
return result
def get_submission_message(
puzzle: Puzzle,
submission: PuzzleSubmission,
daily_puzzle_info: DailyPuzzleInfo | None = None,
) -> str:
"""
获得提交答案的反馈信息
"""
if submission.success:
rank = -1
if daily_puzzle_info is not None:
rank = len(daily_puzzle_info.success_users)
return f"🎉 恭喜你答对了!你是今天第 {rank} 个解出来的!"
if submission.hint_id >= 0 and (
hint := puzzle.hints.get(submission.hint_id)
) is not None:
if hint.is_checkpoint:
hint_msg = "✨ 恭喜!这是本题的中间答案,加油!"
else:
hint_msg = "🤔 答错啦!请检查你的答案。"
return f"{hint_msg}\n\n提示:{hint.message}"
return "❌ 答错啦!请检查你的答案。"
def get_daily_report(
manager: PuzzleManager,
date: datetime.date,
) -> str | None:
"""
获得某日的提交的报告信息
"""
index_id = manager.daily_puzzle_of_date.get(date)
if index_id is None:
return None
info = manager.daily_puzzle[index_id]
puzzle = manager.puzzle_data[info.raw_id]
msg = f"[KonaPH#{puzzle.index_id}] 「{puzzle.title}」解答报告\n\n"
if len(info.success_users) == 0:
msg += "昨日,无人解出此题 😭😭\n\n"
else:
msg += f"昨日,共有 {len(info.success_users)} 人解出此题。\n\n"
msg += "前五名的解答者:\n\n"
us = [(u, d) for u, d in info.success_users.items()]
us = sorted(us, key=lambda t: t[1])
us = us[:5]
for u, _ in us:
m = manager.submissions[puzzle.raw_id][u][-1]
msg += f"- {get_username(u)}{m.time.strftime('%H:%M')}\n"
msg += "\n"
msg += f"出题人:{get_username(puzzle.author_id)}"
return msg
def get_daily_report_v2(manager: PuzzleManager, gid: int | None = None):
p = manager.get_today_puzzle()
if p is None:
return "今天无题"
msg = "==== 今日答题情况 ====\n\n"
subcount = len(functools.reduce(
lambda x, y: x + y,
manager.submissions.get(p.raw_id, {}).values(),
[],
))
info = manager.daily_puzzle[p.index_id]
msg += (
f"总体情况:答对 {len(info.success_users)} / "
f"参与 {len(info.tried_users)} / "
f"提交 {subcount}\n"
)
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
for u, d in success_users:
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
t = d.strftime("%H:%M")
tries = len(manager.submissions[p.raw_id][u])
msg += f"\n- {uname} [🎉 {t} 完成 | {tries} 提交]"
for u in info.tried_users - set(info.success_users.keys()):
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
tries = len(manager.submissions[p.raw_id][u])
msg += f"\n- {uname} [💦 {tries} 提交]"
return msg
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
image_manager = get_image_manager()
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
status_suffix = ""
if puzzle.raw_id == manager.puzzle_pinned:
status_suffix += " | 📌 已被管理员置顶"
msg = UniMessage.text(
f"--- 谜题信息 ---\n"
f"Raw ID: {puzzle.raw_id}\n"
f"出题者: {get_username(puzzle.author_id)} | {puzzle.author_id}\n"
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"状态: {status}{status_suffix}\n\n"
f"标题: {puzzle.title}\n"
f"Flag: {puzzle.flag}\n\n"
f"{puzzle.content}"
)
if puzzle.img_name:
msg = msg.image(raw=image_manager.read_puzzle_image(puzzle.img_name))
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
return msg
def get_puzzle_hint_list(puzzle: Puzzle) -> str:
msg = f"==== {puzzle.title} 提示与中间答案 ====\n"
if len(puzzle.hints) == 0:
msg += "\n你没有添加任何中间答案。"
return msg
for hint_id, hint in puzzle.hints.items():
n = {False: "[提示]", True: "[中间答案]"}[hint.is_checkpoint]
msg += f"\n{n}[{hint_id}] {hint.pattern}"
msg += f"\n {hint.message}"
return msg

View File

@ -0,0 +1,9 @@
from konabot.common.path import DATA_PATH
KONAPH_BASE = DATA_PATH / "KonaPH"
KONAPH_DATA_JSON = KONAPH_BASE / "data.json"
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs"
# 保证所有文件夹存在
KONAPH_BASE.mkdir(exist_ok=True)
KONAPH_IMAGE_BASE.mkdir(exist_ok=True)

View File

@ -0,0 +1,259 @@
import asyncio
import datetime
import random
import re
from contextlib import asynccontextmanager
import nanoid
from pydantic import BaseModel, Field, ValidationError
from konabot.plugins.kona_ph.core.path import KONAPH_DATA_JSON
class PuzzleHint(BaseModel):
pattern: str
message: str
is_checkpoint: bool
class PuzzleSubmission(BaseModel):
success: bool
flag: str
time: datetime.datetime
hint_id: int = -1
class Puzzle(BaseModel):
raw_id: str
"用于给出题者管理的 ID"
index_id: str
"展出的 ID以展出顺序为准"
title: str
content: str
img_name: str
author_id: str
flag: str
ready: bool = False
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
hints: dict[int, PuzzleHint] = Field(default_factory=dict)
@property
def hint_id_max(self) -> int:
return max((0, *self.hints.keys()))
def check_submission(
self,
submission: str,
time: datetime.datetime | None = None,
) -> PuzzleSubmission:
if time is None:
time = datetime.datetime.now()
if submission == self.flag:
return PuzzleSubmission(
success=True,
flag=submission,
time=time,
)
for hint_id, hint in self.hints.items():
if hint.pattern.startswith('/') and hint.pattern.endswith('/'):
if re.match(hint.pattern.strip('/'), submission):
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
hint_id=hint_id,
)
else:
if hint.pattern == submission:
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
hint_id=hint_id,
)
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
)
class DailyPuzzleInfo(BaseModel):
raw_id: str
time: datetime.date
tried_users: set[str] = set()
success_users: dict[str, datetime.datetime] = {}
class PuzzleSubmissionFeedback(BaseModel):
submission: PuzzleSubmission
puzzle: Puzzle
info: DailyPuzzleInfo | None = None
def get_today_date() -> datetime.date:
now = datetime.datetime.now()
if now.hour < 8:
now -= datetime.timedelta(days=1)
return now.date()
class PuzzleManager(BaseModel):
puzzle_data: dict[str, Puzzle] = {}
daily_puzzle: dict[str, DailyPuzzleInfo] = {}
daily_puzzle_of_date: dict[datetime.date, str] = {}
puzzle_pinned: str = ""
index_id_counter: int = 1
submissions: dict[str, dict[str, list[PuzzleSubmission]]] = {}
last_checked_date: datetime.date = Field(
default_factory=lambda: get_today_date() - datetime.timedelta(days=1)
)
@property
def last_publish_date(self):
return max(self.daily_puzzle_of_date.keys())
@property
def unpublished_puzzles(self):
return set((
p.raw_id for p in self.puzzle_data.values()
if not self.is_puzzle_published(p.raw_id) and p.ready
))
@property
def unready_puzzles(self):
return set((
p.raw_id for p in self.puzzle_data.values()
if not self.is_puzzle_published(p.raw_id) and not p.ready
))
@property
def published_puzzles(self):
return set((
p.raw_id for p in self.puzzle_data.values()
if self.is_puzzle_published(p.raw_id)
))
def is_puzzle_published(self, raw_id: str):
return raw_id in [i.raw_id for i in self.daily_puzzle.values()]
def publish_puzzle(self, raw_id: str):
assert raw_id in self.puzzle_data
today = get_today_date()
p = self.puzzle_data[raw_id]
p.index_id = str(self.index_id_counter)
p.ready = True
self.puzzle_pinned = ""
self.last_checked_date = today
self.daily_puzzle[p.index_id] = DailyPuzzleInfo(
raw_id=raw_id,
time=today,
)
self.daily_puzzle_of_date[today] = p.index_id
self.index_id_counter += 1
def admin_pin_puzzle(self, raw_id: str):
if raw_id in self.puzzle_data:
self.puzzle_pinned = raw_id
else:
self.puzzle_pinned = ""
def get_today_puzzle(self, strong: bool = False) -> Puzzle | None:
today = get_today_date()
if today in self.daily_puzzle_of_date:
index_id = self.daily_puzzle_of_date[today]
info = self.daily_puzzle[index_id]
return self.puzzle_data[info.raw_id]
if today == self.last_checked_date and not strong:
return
self.last_checked_date = today
if self.puzzle_pinned and self.puzzle_pinned in self.puzzle_data:
d = self.puzzle_pinned
self.publish_puzzle(d)
self.puzzle_pinned = ""
return self.puzzle_data[d]
elif len(self.unpublished_puzzles) > 0:
d = random.choice(list(self.unpublished_puzzles))
self.publish_puzzle(d)
return self.puzzle_data[d]
def get_today_info(self) -> DailyPuzzleInfo | None:
p = self.get_today_puzzle()
if p is None:
return
return self.daily_puzzle[p.index_id]
def submit(self, user: str, flag: str) -> PuzzleSubmissionFeedback | str:
p = self.get_today_puzzle()
d = self.get_today_info()
now = datetime.datetime.now()
if p is None or d is None:
return "今天没有题哦,改天再来吧!"
if user in d.success_users:
return "你今天已经答对过啦!不用重复提交哦!"
d.tried_users.add(user)
result = p.check_submission(flag, now)
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(result)
if result.success:
d.success_users[user] = now
return PuzzleSubmissionFeedback(
submission=result,
puzzle=p,
info=d,
)
def admin_create_puzzle(self, user: str):
p = Puzzle(
raw_id=nanoid.generate(
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
12,
),
index_id="",
title="示例标题",
content="题目的内容填写内容",
img_name="",
author_id=user,
flag="konaph{this_is_a_flag}",
ready=False,
)
self.puzzle_data[p.raw_id] = p
return p
def get_puzzles_of_user(self, user: str):
return sorted([
p for p in self.puzzle_data.values()
if p.author_id == user
], key=lambda p: p.created_at, reverse=True)
lock = asyncio.Lock()
def read_data():
try:
data_raw = KONAPH_DATA_JSON.read_text("utf-8")
return PuzzleManager.model_validate_json(data_raw)
except (FileNotFoundError, ValidationError):
return PuzzleManager()
def write_data(data: PuzzleManager):
KONAPH_DATA_JSON.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager
async def puzzle_manager():
async with lock:
data = read_data()
yield data
write_data(data)

View File

@ -0,0 +1,464 @@
import datetime
from math import ceil
from nonebot import get_plugin_config
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query,
Subcommand, SubcommandResult, UniMessage,
on_alconna)
from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import download_image_bytes
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list,
get_puzzle_info_message,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager,
get_today_date,
puzzle_manager)
PUZZLE_PAGE_SIZE = 10
class PuzzleConfig(BaseModel):
plugin_puzzle_manager: list[str] = []
plugin_puzzle_admin: list[str] = []
plugin_puzzle_playgroup: list[str] = []
config = get_plugin_config(PuzzleConfig)
def is_puzzle_manager(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target)
def is_puzzle_admin(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_admin
def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle:
if raw_id not in manager.puzzle_data:
raise BotExceptionMessage("没有这个谜题")
puzzle = manager.puzzle_data[raw_id]
if is_puzzle_admin(target):
return puzzle
if target.target_id != puzzle.author_id:
raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
return puzzle
def create_admin_commands():
cmd_admin = on_alconna(
Alconna(
"konaph",
Subcommand("create", dest="create"),
Subcommand("ready", Args["raw_id", str], dest="ready"),
Subcommand("unready", Args["raw_id", str], dest="unready"),
Subcommand("info", Args["raw_id", str], dest="info"),
Subcommand("my", Args["page?", int], dest="my"),
Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"),
Subcommand("pin", Args["raw_id?", str], dest="pin"),
Subcommand("unpin", dest="unpin"),
Subcommand(
"modify",
Args["raw_id?", str],
Option("--title", Args["title", str], alias=["-t"]),
Option("--description", Args["description", str], alias=["-d"]),
Option("--image", Args["image?", Image], alias=["-i"]),
Option("--flag", Args["flag", str], alias=["-f"]),
Option("--remove-image"),
dest="modify",
),
Subcommand("publish", Args["raw_id?", str], dest="publish"),
Subcommand("preview", Args["raw_id", str], dest="preview"),
Subcommand("get-submits", Args["raw_id", str], dest="get-submits"),
Subcommand(
"test",
Args["raw_id", str],
Args["submission", str],
dest="test",
),
Subcommand(
"hint",
Subcommand(
"add",
Args["raw_id", str],
Args["pattern", str],
Args["message", str],
dest="add",
),
Subcommand(
"list",
Args["raw_id", str],
Args["page?", int],
dest="list",
),
Subcommand(
"modify",
Args["raw_id", str],
Args["hint_id", int],
Option("--pattern", Args["pattern", str], alias=["-p"]),
Option("--message", Args["message", str], alias=["-m"]),
Option("--checkpoint", Args["is_checkpoint", bool], alias=["-c"]),
dest="modify",
),
Subcommand(
"delete",
Args["raw_id", str],
Args["hint_id", int],
dest="delete",
),
dest="hint",
),
),
rule=is_puzzle_manager,
)
@cmd_admin.assign("$main")
async def _(target: DepLongTaskTarget):
msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
msg = msg.text("konaph create - 创建一个新的谜题\n")
msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
msg = msg.text("konaph unready <id> - 取消准备一道谜题\n")
msg = msg.text("konaph info <id> - 查看谜题\n")
msg = msg.text("konaph my <page?> - 查看我的谜题列表\n")
msg = msg.text("konaph modify - 查看如何修改谜题信息\n")
msg = msg.text("konaph preview <id> - 预览一个题目的效果,不会展示答案\n")
msg = msg.text("konaph get-submits <id> - 获得题目的提交记录\n")
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
if is_puzzle_admin(target):
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
msg = msg.text("konaph pin - 查看当前置顶谜题\n")
msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
msg = msg.text("konaph unpin - 取消置顶所有谜题\n")
msg = msg.text("konaph publish <id?> - 强制发题")
await target.send_message(msg)
@cmd_admin.assign("create")
async def _(target: DepLongTaskTarget):
async with puzzle_manager() as manager:
puzzle = manager.admin_create_puzzle(target.target_id)
await target.send_message(UniMessage.text(
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
f"- 输入 `konaph my` 查看你创建的谜题\n"
f"- 输入 `konaph modify` 查看更改谜题的方法"
))
@cmd_admin.assign("ready")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
if p.ready:
return await target.send_message(UniMessage.text(
"题目早就准备好啦!"
))
p.ready = True
await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经准备就绪!"
))
@cmd_admin.assign("unready")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
if not p.ready:
return await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经是未取消状态了!"
))
if manager.is_puzzle_published(p.raw_id):
return await target.send_message(UniMessage.text(
"已发布的谜题不能取消准备状态!"
))
p.ready = False
await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经取消准备!"
))
@cmd_admin.assign("info")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
await target.send_message(get_puzzle_info_message(manager, p))
@cmd_admin.assign("my")
async def _(target: DepLongTaskTarget, page: int = 1):
async with puzzle_manager() as manager:
puzzles = manager.get_puzzles_of_user(target.target_id)
if len(puzzles) == 0:
return await target.send_message(UniMessage.text(
"你没有谜题哦,使用 `konaph create` 创建一个吧!"
))
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text(
f"页数只有 1 ~ {count_pages} 啦!"
))
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
message = UniMessage.text("==== 我的谜题 ====\n\n")
for p in puzzles:
message = message.text("- ")
if manager.puzzle_pinned == p.raw_id:
message = message.text("[📌]")
if manager.is_puzzle_published(p.raw_id):
message = message.text(f"[✨][#{p.index_id}] ")
elif p.ready:
message = message.text("[✅] ")
else:
message = message.text("[⚙️] ")
message = message.text(f"{p.title} ({p.raw_id})")
message = message.text("\n")
message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
await target.send_message(message)
@cmd_admin.assign("all")
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async with puzzle_manager() as manager:
puzzles = [*manager.puzzle_data.values()]
if ready.available:
puzzles = [p for p in puzzles if p.ready]
puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text(
f"页数只有 1 ~ {count_pages} 啦!"
))
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
message = UniMessage.text("==== 所有谜题 ====\n\n")
for p in puzzles:
message = message.text("- ")
if p.raw_id == manager.puzzle_pinned:
message = message.text("[📌]")
if manager.is_puzzle_published(p.raw_id):
message = message.text(f"[✨][#{p.index_id}] ")
elif p.ready:
message = message.text("[✅] ")
else:
message = message.text("[⚙️] ")
message = message.text(f"{p.title} ({p.raw_id} by {p.author_id})")
message = message.text("\n")
message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
await target.send_message(message)
@cmd_admin.assign("pin")
async def _(target: DepLongTaskTarget, raw_id: str = ""):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async with puzzle_manager() as manager:
if raw_id == "":
if manager.puzzle_pinned:
return await target.send_message(UniMessage.text(
f"被 Pin 的谜题 ID = {manager.puzzle_pinned}"
))
return await target.send_message("没有置顶谜题")
if raw_id not in manager.unpublished_puzzles:
return await target.send_message(UniMessage.text(
"这个谜题已经发布了,或者还没准备好,或者不存在"
))
manager.admin_pin_puzzle(raw_id)
return await target.send_message(f"已置顶谜题 {raw_id}")
@cmd_admin.assign("unpin")
async def _(target: DepLongTaskTarget):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async with puzzle_manager() as manager:
manager.admin_pin_puzzle("")
return await target.send_message("已取消所有置顶")
@cmd_admin.assign("modify")
async def _(
target: DepLongTaskTarget,
raw_id: str = "",
title: str | None = None,
description: str | None = None,
flag: str | None = None,
image: Image | None = None,
remove_image: Query[bool] = Query("modify.remove-image"),
):
if raw_id == "":
return await target.send_message(
"konaph modify <raw_id> - 修改一个谜题\n\n"
"支持的参数:\n"
" --title <str> 标题\n"
" --description <str> 题目详情描述(用直引号包裹以支持多行)\n"
" --flag <str> flag也就是题目的答案\n"
" --image <图片> 图片\n"
" --remove-image 删除图片"
)
image_manager = get_image_manager()
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
if title is not None:
p.title = title
if description is not None:
p.content = description
if flag is not None:
p.flag = flag.strip()
if flag.strip() != flag:
await target.send_message(
"⚠️ 注意:你输入的 Flag 含有开头或结尾的空格,已经帮你去除"
)
if image is not None and image.url is not None:
b = await download_image_bytes(image.url)
image_manager.remove_puzzle_image(p.img_name)
image_manager.upload_puzzle_image(b.unwrap())
elif remove_image.available:
image_manager.remove_puzzle_image(p.img_name)
info2 = get_puzzle_info_message(manager, p)
return await target.send_message("修改好啦!看看效果:\n\n" + info2)
@cmd_admin.assign("publish")
async def _(target: DepLongTaskTarget, raw_id: str | None = None):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
today = get_today_date()
async with puzzle_manager() as manager:
if today in manager.daily_puzzle_of_date:
return await target.send_message("今日已经有题了哦")
manager.last_checked_date = today - datetime.timedelta(days=-1)
if raw_id is not None:
manager.admin_pin_puzzle(raw_id)
p = manager.get_today_puzzle(strong=True)
if p is None:
return await target.send_message("上架失败了orz可能是没题了")
await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(p))
return await target.send_message("Ok!")
@cmd_admin.assign("preview")
async def _(target: DepLongTaskTarget, raw_id: str):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
return await target.send_message(get_puzzle_description(p))
@cmd_admin.assign("get-submits")
async def _(target: DepLongTaskTarget, raw_id: str):
async with puzzle_manager() as manager:
puzzle = manager.puzzle_data.get(raw_id)
if puzzle is None:
return await target.send_message("没有这个谜题")
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id:
return await target.send_message("你没有权限预览这个谜题")
msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n")
submits = manager.submissions.get(raw_id, {})
for uid, ls in submits.items():
s = ', '.join((i.flag for i in ls))
msg = msg.text(f"- {get_username(uid)}{s}\n")
return await target.send_message(msg)
@cmd_admin.assign("test")
async def _(target: DepLongTaskTarget, raw_id: str, submission: str):
"""
测试一道谜题的回答,并给出结果
"""
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
result = p.check_submission(submission)
msg = get_submission_message(p, result)
return await target.send_message("[测试提交] " + msg)
@cmd_admin.assign("subcommands.hint")
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")):
if len(subcommands.result.subcommands) > 0:
return
return await target.send_message(
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n")
.text("- konaph hint modify <id> <hint_id>\n")
.text(" - --pattern <pattern>\n - 更改匹配规则\n")
.text(" - --message <message>\n - 更改提示文本\n")
.text(" - --checkpoint [True|False]\n - 更改是否为中间答案\n")
.text("- konaph hint delete <id> <hint_id>\n - 删除一个提示 / 中间答案\n")
.text("\n更多关于 pattern 和中间答案的信息,请见 man中间答案(7)")
)
@cmd_admin.assign("subcommands.hint.add")
async def _(
target: DepLongTaskTarget,
raw_id: str,
pattern: str,
message: str,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p.hints[p.hint_id_max + 1] = PuzzleHint(
pattern=pattern,
message=message,
is_checkpoint=False,
)
await target.send_message("创建成功!\n\n" + get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.list")
async def _(
target: DepLongTaskTarget,
raw_id: str,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
await target.send_message(get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.modify")
async def _(
target: DepLongTaskTarget,
raw_id: str,
hint_id: int,
pattern: str | None = None,
message: str | None = None,
is_checkpoint: bool | None = None,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
if hint_id not in p.hints:
raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
)
hint = p.hints[hint_id]
if pattern is not None:
hint.pattern = pattern
if message is not None:
hint.message = message
if is_checkpoint is not None:
hint.is_checkpoint = is_checkpoint
await target.send_message("更改成功!\n\n" + get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.delete")
async def _(
target: DepLongTaskTarget,
raw_id: str,
hint_id: int,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
if hint_id not in p.hints:
raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
)
del p.hints[hint_id]
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
return cmd_admin

View File

@ -1,4 +1,3 @@
from curses.ascii import isdigit
from pathlib import Path
import nonebot
@ -40,7 +39,10 @@ async def _(
doc: str | None,
event: nonebot.adapters.Event,
):
if doc is not None and section is None and all(isdigit(c) for c in doc):
if doc is not None and section is None and all(
ord('0') <= ord(c) <= ord('9')
for c in doc
):
section = int(doc)
doc = None

View File

@ -15,7 +15,7 @@ if not POLL_DATA_FILE.exists():
POLL_DATA_FILE.write_bytes(POLL_TEMPLATE_FILE.read_bytes())
poll_list = json.loads(POLL_DATA_FILE.read_text())['poll']
poll_list = json.loads(POLL_DATA_FILE.read_text("utf-8"))['poll']
async def createpoll(title,qqid,options):
polllength = len(poll_list)
@ -53,7 +53,7 @@ def writeback():
# json.dump({'poll':poll_list},file,ensure_ascii=False,sort_keys=True)
POLL_DATA_FILE.write_text(json.dumps({
'poll': poll_list,
}, ensure_ascii=False, sort_keys=True))
}, ensure_ascii=False, sort_keys=True), "utf-8")
async def pollvote(polnum,optionnum,qqnum):
optiond = poll_list[polnum]["polldata"]

View File

@ -394,7 +394,8 @@ async def generate_dice_image(number: str) -> BytesIO:
append_images=images[1:],
duration=frame_durations,
format='GIF',
loop=1)
loop=1,
disposal=2)
output.seek(0)
# pil_final.save(output, format='PNG')
return output

View File

@ -4,10 +4,13 @@ from typing import cast
from loguru import logger
from nonebot import get_bot, on_request
import nonebot
from nonebot.adapters.onebot.v11.event import FriendRequestEvent
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
from nonebot_plugin_apscheduler import scheduler
from konabot.common.nb.is_admin import cfg as adminConfig
from konabot.common.username import manager
add_request = on_request()
@ -23,3 +26,15 @@ async def _(req: FriendRequestEvent):
await req.approve(bot)
logger.info(f"已经自动同意 {req.user_id} 的好友请求")
@scheduler.scheduled_job("cron", minute="*/5")
async def _():
logger.info("尝试更新群成员信息")
await manager.update()
driver = nonebot.get_driver()
@driver.on_bot_connect
async def _():
logger.info("有 Bot 连接5 秒后试着更新群成员信息")
await asyncio.sleep(5)
await manager.update()

View File

@ -1,23 +1,19 @@
import aiohttp
import asyncio as asynkio
import datetime
from math import ceil
from pathlib import Path
from typing import Any, Literal
from typing import Any
import nanoid
import nonebot
import ptimeparse
from loguru import logger
from nonebot import get_plugin_config, on_message
from nonebot.adapters import Bot, Event
from nonebot.adapters.onebot.v11 import Bot as OBBot
from nonebot.adapters.console import Bot as CBot
from nonebot.adapters.discord import Bot as DCBot
from nonebot.adapters import Event
from nonebot_plugin_alconna import Alconna, Args, Subcommand, UniMessage, UniMsg, on_alconna
from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget, LongTask, LongTaskTarget, create_longtask, handle_long_task, longtask_data
from konabot.common.longtask import DepLongTaskTarget, LongTask, create_longtask, handle_long_task, longtask_data
evt = on_message()
@ -32,27 +28,8 @@ PAGE_SIZE = 6
FMT_STRING = "%Y年%m月%d%H:%M:%S"
class NotifyMessage(BaseModel):
message: str
class Notify(BaseModel):
platform: Literal["console", "qq", "discord"]
target: str
"需要接受通知的个体"
target_env: str | None
"在哪里进行通知,如果是 None 代表私聊通知"
notify_time: datetime.datetime
notify_msg: str
class NotifyConfigFile(BaseModel):
version: int = 2
notifies: list[Notify] = []
unsent: list[Notify] = []
notify_channels: dict[str, str] = {}
@ -78,48 +55,18 @@ async def send_notify_to_ntfy_instance(msg: str, channel: str):
logger.info(f"访问 {url} 的结果是 {response.status}")
def _get_bot_of(_type: type[Bot]):
for bot in nonebot.get_bots().values():
if isinstance(bot, _type):
return bot.self_id
return ""
def get_target_from_notify(notify: Notify) -> LongTaskTarget:
if notify.platform == "console":
return LongTaskTarget(
platform="console",
self_id=_get_bot_of(CBot),
channel_id=notify.target_env or "",
target_id=notify.target,
)
if notify.platform == "discord":
return LongTaskTarget(
platform="discord",
self_id=_get_bot_of(DCBot),
channel_id=notify.target_env or "",
target_id=notify.target,
)
return LongTaskTarget(
platform="qq",
self_id=_get_bot_of(OBBot),
channel_id=notify.target_env or "",
target_id=notify.target,
)
def load_notify_config() -> NotifyConfigFile:
if not DATA_FILE_PATH.exists():
return NotifyConfigFile()
try:
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text())
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text("utf-8"))
except Exception as e:
logger.warning(f"在解析 Notify 时遇到问题:{e}")
return NotifyConfigFile()
def save_notify_config(config: NotifyConfigFile):
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4))
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4), "utf-8")
@evt.handle()
@ -160,40 +107,6 @@ async def _(msg: UniMsg, mEvt: Event, target: DepLongTaskTarget):
driver = nonebot.get_driver()
NOTIFIED_FLAG = {
"task_added": False,
}
@driver.on_bot_connect
async def _():
if NOTIFIED_FLAG["task_added"]:
return
NOTIFIED_FLAG["task_added"] = True
DELTA = 2
logger.info(f"第一次探测到 Bot 连接,等待 {DELTA} 秒后开始通知")
await asynkio.sleep(DELTA)
await DATA_FILE_LOCK.acquire()
cfg = load_notify_config()
if cfg.version == 1:
logger.info("将配置文件的版本升级为 2")
cfg.version = 2
else:
for notify in [*cfg.notifies]:
await create_longtask(
handler=LONG_TASK_NAME,
data={ "message": notify.notify_msg },
target=get_target_from_notify(notify),
deadline=notify.notify_time,
)
cfg.notifies = []
save_notify_config(cfg)
DATA_FILE_LOCK.release()
@handle_long_task("TASK_SIMPLE_NOTIFY")
async def _(task: LongTask):
@ -284,7 +197,17 @@ cmd_notify_channel = on_alconna(Alconna(
@cmd_notify_channel.assign("$main")
async def _(target: DepLongTaskTarget):
async with DATA_FILE_LOCK:
data = load_notify_config()
target_channel = data.notify_channels.get(target.target_id)
if target_channel is None:
channel_msg = "目前还没有配置 ntfy 地址"
else:
channel_msg = f"配置的 ntfy Channel 为:{target_channel}\n\n服务器地址:{config.plugin_notify_base_url}"
await target.send_message(UniMessage.text(
f"{channel_msg}\n\n"
"配置 ntfy 通知:\n\n"
"- ntfy 创建: 启用 ntfy 通知,并为你随机生成一个通知渠道\n"
"- ntfy 删除:禁用 ntfy 通知\n"

View File

@ -75,6 +75,44 @@ async def _(msg: UniMsg, event: BaseEvent, content: Optional[str] = ""):
other_function=lambda page: beibao_continue_handle(page, content),
timeout=30
)
await evt.send(
await UniMessage().image(raw=screenshot).export()
)
async def continue_handle_3(page: Page, arg1: str, arg2: str) -> None:
# 这里可以添加一些预处理逻辑
# 找到 id 为 textL 的 inputid 为 textR 的 input
input1 = await page.query_selector("#textL")
input2 = await page.query_selector("#textR")
if input1:
await input1.fill(arg1)
if input2:
await input2.fill(arg2)
# 等待 0.3 秒钟
await page.wait_for_timeout(300)
# 等待 id 为 loading 的元素不可见
loading = await page.query_selector("#loading")
if loading:
await loading.wait_for_element_state("hidden")
evt = on_alconna(
Alconna(
f"BA生成",
Args["arg1", str],
Args["arg2", str]
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(msg: UniMsg, event: BaseEvent, arg1: str, arg2: str):
screenshot = await WebRenderer.render(
url="https://tmp.nulla.top/ba-logo/",
target="#canvas",
other_function=lambda page: continue_handle_3(page, arg1, arg2),
timeout=30
)
await evt.send(
await UniMessage().image(raw=screenshot).export()
)

13
scripts/watch_filter.py Normal file
View File

@ -0,0 +1,13 @@
from pathlib import Path
from watchfiles import Change
base = Path(__file__).parent.parent.absolute()
def filter(change: Change, path: str) -> bool:
if "__pycache__" in path:
return False
if Path(path).absolute().is_relative_to(base / "data"):
return False
return True