Compare commits

...

36 Commits

Author SHA1 Message Date
03900f4416 成语接龙接入 LLM 和 MarkDown、LaTeX 接入
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-09 23:12:04 +08:00
62f4195e46 Merge pull request '让豆包水印使用相对大小' (#47) from feature/doubao-watermark into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #47
2025-11-07 21:17:16 +08:00
751297e3bc Merge branch 'master' into feature/doubao-watermark 2025-11-07 21:17:09 +08:00
b450998f3f 让豆包水印使用相对大小 2025-11-07 21:15:19 +08:00
ae6297b98d Merge pull request '添加豆包水印' (#46) from feature/doubao-watermark into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #46
2025-11-07 19:18:41 +08:00
dacae29054 添加豆包水印 2025-11-07 19:18:24 +08:00
8acb546c6a Merge pull request '让浏览器等久一点' (#42) from feature/konaweb into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #42
2025-11-07 02:41:49 +08:00
49e0914416 让浏览器等久一点 2025-11-07 02:41:33 +08:00
5b74c78ec3 Merge pull request '更新 web_render 模块并支持前端渲染' (#41) from feature/konaweb into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #41
2025-11-07 02:31:06 +08:00
c911410276 更新 web_render 模块并支持前端渲染 2025-11-07 02:30:46 +08:00
37ca4bf11f Merge pull request '西多说 by 姬嵇' (#40) from feature/memepack/kiosay into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #40
2025-11-06 23:35:48 +08:00
8ef084c22a 西多说 by 姬嵇 2025-11-06 23:35:20 +08:00
57f0cd728f Merge pull request '使用 Discord Proxy 选项来下载图片' (#39) from bugfix/discord-image-download into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #39
2025-11-06 00:12:32 +08:00
627a29f57e 使用 Discord Proxy 选项来下载图片 2025-11-06 00:12:08 +08:00
650c500f47 Merge pull request '监听更广泛的 Discord 消息' (#38) from bugfix/discord-image-download into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #38
2025-11-06 00:06:49 +08:00
86acbe51e9 Merge remote-tracking branch 'origin/master' into bugfix/discord-image-download 2025-11-06 00:05:20 +08:00
4900a7e0ad Merge branch 'bugfix/discord-image-download' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot into bugfix/discord-image-download 2025-11-06 00:05:08 +08:00
34da08126b 监听更广泛的 event 2025-11-06 00:04:57 +08:00
00f416c8bc Merge pull request '改为使用 proxy url' (#37) from bugfix/discord-image-download into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #37
2025-11-05 23:59:54 +08:00
9c7d0a4486 Merge branch 'master' into bugfix/discord-image-download 2025-11-05 23:59:43 +08:00
e3b9d6723f 改为使用 proxy url 2025-11-05 23:58:55 +08:00
ef80399a90 Merge pull request '尝试解决 Discord 无法读取图片的问题' (#30) from bugfix/discord-image-download into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #30
2025-11-05 23:17:25 +08:00
bfbfa9d9be 尝试解决这个问题 2025-11-05 23:15:30 +08:00
6b7be4d3b0 Merge pull request '添加基础的 LLM 支持' (#29) from feature/LLM-base into master
Reviewed-on: #29
2025-11-05 20:40:40 +08:00
7c19c52d9f 添加关于 LLM 配置的文档 2025-11-05 20:36:51 +08:00
a5f4ae9bdc 添加基础的 LLM 支持 2025-11-05 18:40:13 +08:00
9320815d3f 修复无法更改图片的问题
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-01 20:59:58 +08:00
795300cb83 在每日答题情况添加记录点显示
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-01 18:42:47 +08:00
0231aa04f4 添加中间答案功能
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-01 18:29:10 +08:00
01fe33eb9f 部分解耦了 konaph 的一些层 2025-11-01 17:52:05 +08:00
adfbac7d90 支持正义 utf-8 2025-11-01 13:48:48 +08:00
994c1412da 为 Watchfiles 添加更可配置的过滤器
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-01 12:40:01 +08:00
8780dfec6f 在 Tag 成功后也进行 ntfy 通知
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-30 16:52:55 +08:00
490d807e7a 添加一些对题解提交空格的情况判定
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-30 16:48:09 +08:00
fa208199ab 我不小心多加了一个 s
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-29 22:01:13 +08:00
38a17f42a3 添加 Ntfy 构建消息的报告
Some checks failed
continuous-integration/drone/push Build is failing
2025-10-29 21:59:11 +08:00
40 changed files with 1712 additions and 510 deletions

View File

@ -38,6 +38,17 @@ steps:
path: /var/run/docker.sock
commands:
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
- name: 发送构建结果到 ntfy
image: parrazam/drone-ntfy
when:
status: [success, failure]
settings:
url: https://ntfy.service.jazzwhom.top
topic: drone_ci
tags:
- drone-ci
token:
from_secret: NTFY_TOKEN
volumes:
- name: docker-socket
@ -74,6 +85,17 @@ steps:
volumes:
- name: docker-socket
path: /var/run/docker.sock
- name: 发送构建结果到 ntfy
image: parrazam/drone-ntfy
when:
status: [success, failure]
settings:
url: https://ntfy.service.jazzwhom.top
topic: drone_ci
tags:
- drone-ci
token:
from_secret: NTFY_TOKEN
volumes:
- name: docker-socket

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"python.REPL.enableREPLSmartSend": false
}

View File

@ -1,4 +1,4 @@
# 此方 bot
# konabot
在 MTTU 内部使用的 bot 一只。
@ -63,12 +63,20 @@ code .
配置 `ENABLE_CONSOLE=false`
#### 配置并支持 LLM大语言模型
详见[LLM 配置文档](/docs/LLM.md)。
#### 配置 konabot-web 以支持更高级的图片渲染
详见[konabot-web 配置文档](/docs/konabot-web.md)
### 运行
使用命令行手动启动 Bot
```bash
poetry run watchfiles bot.main konabot
poetry run watchfiles bot.main . --filter scripts.watch_filter.filter
```
如果你不希望自动重载,只是想运行 Bot可以直接运行

BIN
assets/img/meme/doubao.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.0 KiB

BIN
assets/img/meme/kiosay.jpg Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 71 KiB

65
docs/LLM.md Normal file
View File

@ -0,0 +1,65 @@
# 大语言模型平台接入
为实现更多神秘小功能,此方 Bot 需要接入 AI。如果你需要参与开发或测试涉及 AI 的相关功能,麻烦请根据下面的文档继续操作。
## 配置项目接入 AI
AI 相关的配置文件在 `data/config/llm.json` 文件中。示例格式如下,这也将是到时候在云端的配置文件格式(给出的模型都会有):
```json
{
"llms": {
"Qwen2.5-7B-Instruct": {
"base_url": "https://api.siliconflow.cn/v1",
"api_key": "sk-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"model_name": "Qwen/Qwen2.5-7B-Instruct"
},
"qwen3-max": {
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"api_key": "sk-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"model_name": "qwen3-max"
}
},
"default_llm": "Qwen2.5-7B-Instruct"
}
```
其中,形如 `qwen3-max` 的名称,是你在程序中调用 LLM 使用的键名。若不给出,则会默认使用配置文件中指定的默认模型。
```python
from konabot.common.llm import get_llm
llm = get_llm() # 获得的是 Qwen2.5-7B-Instruct 模型
llm = get_llm("qwen3-max") # 获得的是 qwen3-max 模型
message = await llm.chat([
{ "role": "system", "content": "你是一只猫娘" },
{ "role": "user", "content": "晚上好呀!" },
], timeout=None, max_tokens=16384)
# 获得了的是 openai.types.chat.ChatCompletionMessage 对象
print(f"AI 返回值:{message.content}") # 注意 content 可能为 None需要做空值检测
client = llm.get_openai_client() # 获得的是一个 OpenAI Client 对象,可以做更多操作
# 例如,调用 Embedding 模型来做知识库向量化等工作
```
## 本项目使用的模型清单
为了便利大家使用,我在这里给出该项目将会使用的模型清单,请根据你的开发需求注册并选择你最喜欢的模型。如果需要接入新的模型,或者使用到文档之外的模型,欢迎在这里给出!
### 硅基流动 Qwen/Qwen2.5-7B-Instruct
一个 7B 大小的 AI 模型。其性能不太能指望,但是它小,而且比较快,可以做一些轻量的操作。
该模型是免费的,但是也需要你注册[硅基流动](https://cloud.siliconflow.cn/me/models)账号,并生成 `api_key` 添加到配置文件中。
### 通义千问 qwen3-max
贵但是很先进的最新模型,其能力可以信赖。但是不要拿它做大量工作哦!
在[百炼大模型平台](https://bailian.console.aliyun.com/)注册账号并申请 `api_key`,新用户会赠送 1M tokens足够做测试了。
## 安全须知
请注意提防 AI 越狱等情况。

18
docs/konabot-web.md Normal file
View File

@ -0,0 +1,18 @@
# konabot-web 配置文档
本文档教你配置一个此方 Bot 的 Web 服务器。
## 安装并运行 konabot-web
按照 [konabot-web README](https://gitea.service.jazzwhom.top/mttu-developers/konabot-web) 安装并运行 konabot-web 实例。
## 指定 konabot-web 实例地址
如果你的 Web 服务器的端口不是 5173或者你有特殊的网络结构你需要手动设置 konabot-web。编辑 `.env` 文件:
```
MODULE_WEB_RENDER_WEBURL=http://web-server:port
MODULE_WEB_RENDER_INSTANCE=http://konabot-server:port
```
替换 web-server 为你的前端服务器地址konabot-server 为后端服务器地址port 为端口号。

View File

@ -19,12 +19,12 @@ class DataManager(Generic[T]):
if not self.fp.exists():
return self.cls()
try:
return self.cls.model_validate_json(self.fp.read_text())
return self.cls.model_validate_json(self.fp.read_text("utf-8"))
except ValidationError:
return self.cls()
def save(self, data: T):
self.fp.write_text(data.model_dump_json())
self.fp.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager
async def get_data(self):

View File

@ -0,0 +1,64 @@
from typing import Any
import openai
from loguru import logger
from openai.types.chat import ChatCompletion, ChatCompletionMessage, ChatCompletionMessageParam
from pydantic import BaseModel, Field
from konabot.common.path import CONFIG_PATH
LLM_CONFIG_PATH = CONFIG_PATH / 'llm.json'
if not LLM_CONFIG_PATH.exists():
LLM_CONFIG_PATH.write_text("{}")
class LLMInfo(BaseModel):
base_url: str
api_key: str
model_name: str
def get_openai_client(self):
return openai.AsyncClient(
api_key=self.api_key,
base_url=self.base_url,
)
async def chat(
self,
messages: list[ChatCompletionMessageParam],
timeout: float | None = 30.0,
max_tokens: int | None = None,
**kwargs: Any,
) -> ChatCompletionMessage:
logger.info(f"调用 LLM: BASE_URL={self.base_url} MODEL_NAME={self.model_name}")
completion: ChatCompletion = await self.get_openai_client().chat.completions.create(
messages=messages,
model=self.model_name,
max_tokens=max_tokens,
timeout=timeout,
stream=False,
**kwargs,
)
choice = completion.choices[0]
logger.info(
f"调用 LLM 完成: BASE_URL={self.base_url} MODEL_NAME={self.model_name} REASON={choice.finish_reason}"
)
return choice.message
class LLMConfig(BaseModel):
llms: dict[str, LLMInfo] = Field(default_factory=dict)
default_llm: str = "Qwen2.5-7B-Instruct"
llm_config = LLMConfig.model_validate_json(LLM_CONFIG_PATH.read_text())
def get_llm(llm_model: str | None = None):
if llm_model is None:
llm_model = llm_config.default_llm
if llm_model not in llm_config.llms:
raise NotImplementedError("LLM 未配置,该功能无法使用")
return llm_config.llms[llm_model]

View File

@ -240,7 +240,7 @@ def handle_long_task(callback_id: str):
def _load_longtask_data() -> LongTaskModuleData:
try:
txt = LONGTASK_DATA_DIR.read_text()
txt = LONGTASK_DATA_DIR.read_text("utf-8")
return LongTaskModuleData.model_validate_json(txt)
except (FileNotFoundError, ValidationError) as e:
logger.info(f"取得 LongTask 数据时出现问题:{e}")
@ -251,7 +251,7 @@ def _load_longtask_data() -> LongTaskModuleData:
def _save_longtask_data(data: LongTaskModuleData):
LONGTASK_DATA_DIR.write_text(data.model_dump_json())
LONGTASK_DATA_DIR.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager

View File

@ -8,20 +8,38 @@ import nonebot
from nonebot.matcher import Matcher
from nonebot.adapters import Bot, Event, Message
from nonebot.adapters.discord import Bot as DiscordBot
from nonebot.adapters.discord import MessageEvent as DiscordMessageEvent
from nonebot.adapters.discord.config import Config as DiscordConfig
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
import nonebot.params
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
from PIL import UnidentifiedImageError
from pydantic import BaseModel
from returns.result import Failure, Result, Success
from konabot.common.path import ASSETS_PATH
async def download_image_bytes(url: str) -> Result[bytes, str]:
discordConfig = nonebot.get_plugin_config(DiscordConfig)
class ExtractImageConfig(BaseModel):
module_extract_image_no_download: bool = False
"要不要算了,不下载了,直接爆炸算了,适用于一些比较奇怪的网络环境,无法从协议端下载文件"
module_config = nonebot.get_plugin_config(ExtractImageConfig)
async def download_image_bytes(url: str, proxy: str | None = None) -> Result[bytes, str]:
# if "/matcha/cache/" in url:
# url = url.replace('127.0.0.1', '10.126.126.101')
if module_config.module_extract_image_no_download:
return Success((ASSETS_PATH / "img" / "other" / "boom.jpg").read_bytes())
logger.debug(f"开始从 {url} 下载图片")
async with httpx.AsyncClient() as c:
async with httpx.AsyncClient(proxy=proxy) as c:
try:
response = await c.get(url)
except (httpx.ConnectError, httpx.RemoteProtocolError) as e:
@ -121,6 +139,14 @@ async def extract_image_from_message(
logger.debug('获取图片的路径 Fallback 到 QQ 模块')
return await extract_image_from_qq_message(msg, evt, bot, allow_reply)
if isinstance(evt, DiscordMessageEvent):
logger.debug('获取图片的路径方式走 Discord')
for a in evt.attachments:
if "image/" not in a.content_type:
continue
url = a.proxy_url
return (await download_image_bytes(url, discordConfig.discord_proxy)).bind(bytes_to_pil)
for seg in UniMessage.of(msg, bot):
logger.info(seg)
if isinstance(seg, Image):

View File

@ -1,10 +1,13 @@
from typing import Any, cast
import nonebot
from nonebot_plugin_alconna import UniMessage
from nonebot.adapters.onebot.v11 import Bot as OBBot
from nonebot_plugin_alconna import UniMessage
async def qq_broadcast(groups: list[str], msg: UniMessage[Any]):
async def qq_broadcast(groups: list[str], msg: UniMessage[Any] | str):
if isinstance(msg, str):
msg = UniMessage.text(msg)
bots: dict[str, OBBot] = {}
# group_id -> bot_id

View File

@ -6,6 +6,7 @@ FONTS_PATH = ASSETS_PATH / "fonts"
SRC_PATH = Path(__file__).resolve().parent.parent
DATA_PATH = SRC_PATH.parent / "data"
LOG_PATH = DATA_PATH / "logs"
CONFIG_PATH = DATA_PATH / "config"
DOCS_PATH = SRC_PATH / "docs"
DOCS_PATH_MAN1 = DOCS_PATH / "user"
@ -19,3 +20,5 @@ if not DATA_PATH.exists():
if not LOG_PATH.exists():
LOG_PATH.mkdir()
CONFIG_PATH.mkdir(exist_ok=True)

View File

@ -0,0 +1,17 @@
import asyncio
import functools
from typing import Awaitable, Callable, ParamSpec, TypeVar
TA = ParamSpec("TA")
T = TypeVar("T")
def make_async(func: Callable[TA, T]) -> Callable[TA, Awaitable[T]]:
@functools.wraps(func, assigned=("__module__", "__name__", "__qualname__", "__doc__", "__annotations__"))
async def wrapper(*args: TA.args, **kwargs: TA.kwargs):
return await asyncio.to_thread(func, *args, **kwargs)
return wrapper

View File

@ -1,211 +1,9 @@
import asyncio
import queue
from typing import Any, Callable, Coroutine
from loguru import logger
from playwright.async_api import Page, Playwright, async_playwright, Browser, Page, BrowserContext
from .config import web_render_config
from .core import WebRenderer as WebRenderer
from .core import WebRendererInstance as WebRendererInstance
PageFunction = Callable[[Page], Coroutine[Any, Any, Any]]
class WebRenderer:
browser_pool: queue.Queue["WebRendererInstance"] = queue.Queue()
context_pool: dict[int, BrowserContext] = {} # 长期挂载的浏览器上下文池
page_pool: dict[str, Page] = {} # 长期挂载的页面池
@classmethod
async def get_browser_instance(cls) -> "WebRendererInstance":
if cls.browser_pool.empty():
instance = await WebRendererInstance.create()
cls.browser_pool.put(instance)
instance = cls.browser_pool.get()
cls.browser_pool.put(instance)
return instance
@classmethod
async def get_browser_context(cls) -> BrowserContext:
instance = await cls.get_browser_instance()
if id(instance) not in cls.context_pool:
context = await instance.browser.new_context()
cls.context_pool[id(instance)] = context
logger.debug(f"Created new persistent browser context for WebRendererInstance {id(instance)}")
return cls.context_pool[id(instance)]
@classmethod
async def render(
cls,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
'''
访问指定URL并返回截图
:param url: 目标URL
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
'''
instance = await cls.get_browser_instance()
logger.debug(f"Using WebRendererInstance {id(instance)} to render {url} targeting {target}")
return await instance.render(url, target, params=params, other_function=other_function, timeout=timeout)
@classmethod
async def render_persistent_page(cls, page_id: str, url: str, target: str, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
'''
使用长期挂载的页面访问指定URL并返回截图
:param page_id: 页面唯一标识符
:param url: 目标URL
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
'''
logger.debug(f"Requesting persistent render for page_id {page_id} at {url} targeting {target} with timeout {timeout}")
instance = await cls.get_browser_instance()
if page_id not in cls.page_pool:
context = await cls.get_browser_context()
page = await context.new_page()
cls.page_pool[page_id] = page
logger.debug(f"Created new persistent page for page_id {page_id} using WebRendererInstance {id(instance)}")
page = cls.page_pool[page_id]
return await instance.render_with_page(page, url, target, params=params, other_function=other_function, timeout=timeout)
@classmethod
async def render_file(
cls,
file_path: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
'''
访问指定本地文件URL并返回截图
:param file_path: 目标文件路径
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
'''
instance = await cls.get_browser_instance()
logger.debug(f"Using WebRendererInstance {id(instance)} to render file {file_path} targeting {target}")
return await instance.render_file(file_path, target, params=params, other_function=other_function, timeout=timeout)
@classmethod
async def close_persistent_page(cls, page_id: str) -> None:
'''
关闭并移除长期挂载的页面
:param page_id: 页面唯一标识符
'''
if page_id in cls.page_pool:
page = cls.page_pool[page_id]
await page.close()
del cls.page_pool[page_id]
logger.debug(f"Closed and removed persistent page for page_id {page_id}")
class WebRendererInstance:
def __init__(self):
self._playwright: Playwright | None = None
self._browser: Browser | None = None
self.lock = asyncio.Lock()
@property
def playwright(self) -> Playwright:
assert self._playwright is not None
return self._playwright
@property
def browser(self) -> Browser:
assert self._browser is not None
return self._browser
async def init(self):
self._playwright = await async_playwright().start()
self._browser = await self.playwright.chromium.launch(headless=True)
@classmethod
async def create(cls) -> "WebRendererInstance":
instance = cls()
await instance.init()
return instance
async def render(
self,
url: str,
target: str,
index: int = 0,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30
) -> bytes:
'''
访问指定URL并返回截图
:param url: 目标URL
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param index: 如果目标是一个列表,指定要截图的元素索引
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
'''
async with self.lock:
context = await self.browser.new_context()
page = await context.new_page()
screenshot = await self.inner_render(page, url, target, index, params, other_function, timeout)
await page.close()
await context.close()
return screenshot
async def render_with_page(self, page: Page, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
async with self.lock:
screenshot = await self.inner_render(page, url, target, index, params, other_function, timeout)
return screenshot
async def render_file(self, file_path: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
file_path = "file:///" + str(file_path).replace("\\", "/")
return await self.render(file_path, target, index, params, other_function, timeout)
async def inner_render(self, page: Page, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
logger.debug(f"Navigating to {url} with timeout {timeout}")
url_with_params = url + ("?" + "&".join(f"{k}={v}" for k, v in params.items()) if params else "")
await page.goto(url_with_params, timeout=timeout * 1000, wait_until="load")
logger.debug("Page loaded successfully")
# 等待目标元素出现
await page.wait_for_selector(target, timeout=timeout * 1000)
logger.debug(f"Target element '{target}' found, taking screenshot")
if other_function:
await other_function(page)
elements = await page.query_selector_all(target)
if not elements:
logger.error(f"Target element '{target}' not found on the page.")
return None
if index >= len(elements):
logger.error(f"Index {index} out of range for elements matching '{target}'")
return None
element = elements[index]
screenshot = await element.screenshot()
logger.debug(f"Screenshot taken successfully")
return screenshot
async def close(self):
await self.browser.close()
await self.playwright.stop()
def konaweb(sub_url: str):
sub_url = '/' + sub_url.removeprefix('/')
return web_render_config.module_web_render_weburl.removesuffix('/') + sub_url

View File

@ -0,0 +1,19 @@
import nonebot
from pydantic import BaseModel
class Config(BaseModel):
module_web_render_weburl: str = "localhost:5173"
module_web_render_instance: str = ""
def get_instance_baseurl(self):
if self.module_web_render_instance:
return self.module_web_render_instance.removesuffix('/')
config = nonebot.get_driver().config
ip = str(config.host)
if ip == "0.0.0.0":
ip = "127.0.0.1"
port = config.port
return f'http://{ip}:{port}'
web_render_config = nonebot.get_plugin_config(Config)

View File

@ -0,0 +1,281 @@
import asyncio
import queue
from typing import Any, Callable, Coroutine
from loguru import logger
from playwright.async_api import (
Page,
Playwright,
async_playwright,
Browser,
BrowserContext,
)
PageFunction = Callable[[Page], Coroutine[Any, Any, Any]]
class WebRenderer:
browser_pool: queue.Queue["WebRendererInstance"] = queue.Queue()
context_pool: dict[int, BrowserContext] = {} # 长期挂载的浏览器上下文池
page_pool: dict[str, Page] = {} # 长期挂载的页面池
@classmethod
async def get_browser_instance(cls) -> "WebRendererInstance":
if cls.browser_pool.empty():
instance = await WebRendererInstance.create()
cls.browser_pool.put(instance)
instance = cls.browser_pool.get()
cls.browser_pool.put(instance)
return instance
@classmethod
async def get_browser_context(cls) -> BrowserContext:
instance = await cls.get_browser_instance()
if id(instance) not in cls.context_pool:
context = await instance.browser.new_context()
cls.context_pool[id(instance)] = context
logger.debug(
f"Created new persistent browser context for WebRendererInstance {id(instance)}"
)
return cls.context_pool[id(instance)]
@classmethod
async def render(
cls,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
"""
访问指定URL并返回截图
:param url: 目标URL
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
"""
instance = await cls.get_browser_instance()
logger.debug(
f"Using WebRendererInstance {id(instance)} to render {url} targeting {target}"
)
return await instance.render(
url, target, params=params, other_function=other_function, timeout=timeout
)
@classmethod
async def render_persistent_page(
cls,
page_id: str,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
"""
使用长期挂载的页面访问指定URL并返回截图
:param page_id: 页面唯一标识符
:param url: 目标URL
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
"""
logger.debug(
f"Requesting persistent render for page_id {page_id} at {url} targeting {target} with timeout {timeout}"
)
instance = await cls.get_browser_instance()
if page_id not in cls.page_pool:
context = await cls.get_browser_context()
page = await context.new_page()
cls.page_pool[page_id] = page
logger.debug(
f"Created new persistent page for page_id {page_id} using WebRendererInstance {id(instance)}"
)
page = cls.page_pool[page_id]
return await instance.render_with_page(
page,
url,
target,
params=params,
other_function=other_function,
timeout=timeout,
)
@classmethod
async def render_file(
cls,
file_path: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
"""
访问指定本地文件URL并返回截图
:param file_path: 目标文件路径
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
"""
instance = await cls.get_browser_instance()
logger.debug(
f"Using WebRendererInstance {id(instance)} to render file {file_path} targeting {target}"
)
return await instance.render_file(
file_path,
target,
params=params,
other_function=other_function,
timeout=timeout,
)
@classmethod
async def close_persistent_page(cls, page_id: str) -> None:
"""
关闭并移除长期挂载的页面
:param page_id: 页面唯一标识符
"""
if page_id in cls.page_pool:
page = cls.page_pool[page_id]
await page.close()
del cls.page_pool[page_id]
logger.debug(f"Closed and removed persistent page for page_id {page_id}")
class WebRendererInstance:
def __init__(self):
self._playwright: Playwright | None = None
self._browser: Browser | None = None
self.lock = asyncio.Lock()
@property
def playwright(self) -> Playwright:
assert self._playwright is not None
return self._playwright
@property
def browser(self) -> Browser:
assert self._browser is not None
return self._browser
async def init(self):
self._playwright = await async_playwright().start()
self._browser = await self.playwright.chromium.launch(headless=True)
@classmethod
async def create(cls) -> "WebRendererInstance":
instance = cls()
await instance.init()
return instance
async def render(
self,
url: str,
target: str,
index: int = 0,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
"""
访问指定URL并返回截图
:param url: 目标URL
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param index: 如果目标是一个列表,指定要截图的元素索引
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
"""
async with self.lock:
context = await self.browser.new_context()
page = await context.new_page()
screenshot = await self.inner_render(
page, url, target, index, params, other_function, timeout
)
await page.close()
await context.close()
return screenshot
async def render_with_page(
self,
page: Page,
url: str,
target: str,
index: int = 0,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
async with self.lock:
screenshot = await self.inner_render(
page, url, target, index, params, other_function, timeout
)
return screenshot
async def render_file(
self,
file_path: str,
target: str,
index: int = 0,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
file_path = "file:///" + str(file_path).replace("\\", "/")
return await self.render(
file_path, target, index, params, other_function, timeout
)
async def inner_render(
self,
page: Page,
url: str,
target: str,
index: int = 0,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
logger.debug(f"Navigating to {url} with timeout {timeout}")
url_with_params = url + (
"?" + "&".join(f"{k}={v}" for k, v in params.items()) if params else ""
)
await page.goto(url_with_params, timeout=timeout * 1000, wait_until="load")
logger.debug("Page loaded successfully")
# 等待目标元素出现
await page.wait_for_selector(target, timeout=timeout * 1000)
logger.debug(f"Target element '{target}' found, taking screenshot")
if other_function:
await other_function(page)
elements = await page.query_selector_all(target)
if not elements:
logger.warning(f"Target element '{target}' not found on the page.")
elements = await page.query_selector_all('body')
if index >= len(elements):
logger.warning(f"Index {index} out of range for elements matching '{target}'")
index = 0
element = elements[index]
screenshot = await element.screenshot()
logger.debug("Screenshot taken successfully")
return screenshot
async def close(self):
await self.browser.close()
await self.playwright.stop()

View File

@ -0,0 +1,66 @@
import asyncio
import tempfile
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import cast
from fastapi import HTTPException
from fastapi.responses import FileResponse
import nanoid
import nonebot
from nonebot.drivers.fastapi import Driver as FastAPIDriver
from .config import web_render_config
app = cast(FastAPIDriver, nonebot.get_driver()).asgi
hosted_tempdirs: dict[str, Path] = {}
hosted_tempdirs_lock = asyncio.Lock()
@dataclass
class TempDir:
path: Path
url_base: str
def url_of(self, file: Path):
assert file.is_relative_to(self.path)
relative_path = file.relative_to(self.path)
url_path_segment = str(relative_path).replace("\\", "/")
return f"{self.url_base}/{url_path_segment}"
@asynccontextmanager
async def host_tempdir():
with tempfile.TemporaryDirectory() as tempdir:
fp = Path(tempdir)
nid = nanoid.generate(size=10)
async with hosted_tempdirs_lock:
hosted_tempdirs[nid] = fp
yield TempDir(
path=fp,
url_base=f"{web_render_config.get_instance_baseurl()}/tempdir/{nid}",
)
async with hosted_tempdirs_lock:
del hosted_tempdirs[nid]
@app.get("/tempdir/{nid}/{file_path:path}")
async def _(nid: str, file_path: str):
async with hosted_tempdirs_lock:
base_path = hosted_tempdirs.get(nid)
if base_path is None:
raise HTTPException(404)
full_path = base_path / file_path
try:
if not full_path.resolve().is_relative_to(base_path.resolve()):
raise HTTPException(status_code=403, detail="Access denied.")
except Exception:
raise HTTPException(status_code=403, detail="Access denied.")
if not full_path.is_file():
raise HTTPException(status_code=404, detail="File not found.")
return FileResponse(full_path.resolve())

View File

@ -0,0 +1,11 @@
关于「中间答案」或者「提示」:
在 KonaPH 中,当有人发送「提交答案 答案」时,会检查答案是否符合你设置的中间答案的 pattern。这个 pattern 可以有两种方式:
- 纯文本的完整匹配:你设置的 pattern 如果和提交的答案完全相等,则会触发提示。
- regex 匹配:你设置的 pattern 如果以斜杠(/)开头和结尾,就会检查提交的答案是否匹配正则表达式。注意 ^ 和 $ 符号的使用。
- 例如:/^commit$/ 会匹配 commit不会匹配 acommit、Commit 等。
- 而如果是 /commit/,则会匹配 commit、acommit而不会匹配 Commit。
- 无法使用 Javascript 的字符串声明模式,例如,/case_insensitive/i 就不会被视作一个正则表达式。
一个提示是提示,还是中间答案,取决于它是否有 checkpoint 标记。如果有 checkpoint 标记,则会提示用户「你回答了一个中间答案」,并且这个中间答案的回答会在排行榜中显示。

View File

@ -21,16 +21,19 @@ from nonebot_plugin_alconna import (
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
DATA_FILE_PATH = (
Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json"
)
from konabot.common.llm import get_llm
DATA_DIR = Path(__file__).parent.parent.parent.parent / "data"
DATA_FILE_PATH = (
DATA_DIR / "idiom_banned.json"
)
def load_banned_ids() -> list[str]:
if not DATA_FILE_PATH.exists():
return []
try:
return json.loads(DATA_FILE_PATH.read_text())
return json.loads(DATA_FILE_PATH.read_text("utf-8"))
except Exception as e:
logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}")
return []
@ -45,14 +48,14 @@ def add_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id not in banned_ids:
banned_ids.append(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
def remove_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id in banned_ids:
banned_ids.remove(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
class TryStartState(Enum):
@ -75,6 +78,27 @@ class TryVerifyState(Enum):
BUT_NO_NEXT = 5
GAME_END = 6
class IdiomGameLLM:
@classmethod
async def verify_idiom_with_llm(cls, idiom: str) -> bool:
if len(idiom) != 4:
return False
llm = get_llm()
system_prompt = "请判断用户的输入是否为一个合理的成语或者这四个字在中文环境下是否说得通。如果是请回答「T」否则回答「F」。请注意即使这个词不是成语如果说得通也就是能念起来很通顺你也该输出「T」。请不要包含任何解释也不要包含任何标点符号。"
message = await llm.chat([{"role": "system", "content": system_prompt}, {"role": "user", "content": idiom}])
answer = message.content
logger.info(f"LLM 对成语 {idiom} 的判断结果是 {answer}")
if answer == "T":
await cls.storage_idiom(idiom)
return answer == "T"
@classmethod
async def storage_idiom(cls, idiom: str):
# 将 idiom 存入本地文件以备后续分析
with open(DATA_DIR / "idiom_llm_storage.txt", "a", encoding="utf-8") as f:
f.write(idiom + "\n")
IdiomGame.append_into_word_list(idiom)
class IdiomGame:
ALL_WORDS = [] # 所有四字词语
@ -101,6 +125,17 @@ class IdiomGame:
self.idiom_history: list[list[str]] = [] # 成语使用历史记录,多个数组以存储不同成语链
IdiomGame.INSTANCE_LIST[group_id] = self
@classmethod
def append_into_word_list(cls, word: str):
'''
将一个新词加入到词语列表中
'''
if word not in cls.ALL_WORDS:
cls.ALL_WORDS.append(word)
if word[0] not in cls.IDIOM_FIRST_CHAR:
cls.IDIOM_FIRST_CHAR[word[0]] = []
cls.IDIOM_FIRST_CHAR[word[0]].append(word)
def be_able_to_play(self) -> bool:
if self.last_play_date != datetime.date.today():
self.last_play_date = datetime.date.today()
@ -186,7 +221,7 @@ class IdiomGame:
用户发送成语
"""
async with self.lock:
state = self._verify_idiom(idiom, user_id)
state = await self._verify_idiom(idiom, user_id)
return state
def is_nextable(self, last_char: str) -> bool:
@ -218,16 +253,24 @@ class IdiomGame:
result.append(" -> ".join(chain))
return result
def _verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]:
async def _verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]:
state = []
# 新成语的首字应与上一条成语的尾字相同
if idiom[0] != self.last_char:
state.append(TryVerifyState.WRONG_FIRST_CHAR)
return state
if idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS:
self.add_score(user_id, -0.1)
state.append(TryVerifyState.NOT_IDIOM)
return state
logger.info(f"用户 {user_id} 发送了未知词语 {idiom},正在使用 LLM 进行验证")
try:
if not await IdiomGameLLM.verify_idiom_with_llm(idiom):
self.add_score(user_id, -0.1)
state.append(TryVerifyState.NOT_IDIOM)
return state
except Exception as e:
logger.error(f"LLM 验证成语 {idiom} 时出现错误:{e}")
self.add_score(user_id, -0.1)
state.append(TryVerifyState.NOT_IDIOM)
return state
# 成语合法,更新状态
self.add_history_idiom(idiom)
score_k = 0.5 ** self.get_already_used_num(idiom) # 每被使用过一次,得分减半
@ -335,6 +378,16 @@ class IdiomGame:
logger.debug(f"Loaded {len(THUOCL_WORDS)} words from THUOCL txt files")
logger.debug(f"Sample words: {THUOCL_WORDS[:5]}")
# 读取本地的 idiom_llm_storage.txt 文件,补充词语表
LOCAL_LLM_WORDS = []
if (DATA_DIR / "idiom_llm_storage.txt").exists():
with open(DATA_DIR / "idiom_llm_storage.txt", "r", encoding="utf-8") as f:
for line in f:
word = line.strip()
if len(word) == 4:
LOCAL_LLM_WORDS.append(word)
logger.debug(f"Loaded additional {len(LOCAL_LLM_WORDS)} words from idiom_llm_storage.txt")
# 只有成语的大表
cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重
@ -344,6 +397,7 @@ class IdiomGame:
[word for word in cls.ALL_WORDS if len(word) == 4]
+ THUOCL_WORDS
+ COMMON_WORDS
+ LOCAL_LLM_WORDS
)
cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重

View File

@ -1,16 +1,23 @@
from functools import reduce
from math import ceil
import datetime
import re
from loguru import logger
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager
from konabot.common.longtask import DepLongTaskTarget
from math import ceil
from loguru import logger
from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
on_alconna)
from nonebot_plugin_apscheduler import scheduler
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.plugins.kona_ph.core.message import (get_daily_report,
get_daily_report_v2,
get_puzzle_description,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE, config,
create_admin_commands,
puzzle_manager)
create_admin_commands()
@ -23,16 +30,24 @@ async def is_play_group(target: DepLongTaskTarget):
return False
cmd_submit = on_alconna(Alconna(
"re:提交(?:答案|题解|[fF]lag)",
Args["flag", str],
), rule=is_play_group)
cmd_submit = on_message(rule=is_play_group)
@cmd_submit.handle()
async def _(flag: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
result = manager.submit(target.target_id, flag)
await target.send_message(result.get_unimessage())
async def _(msg: UniMsg, target: DepLongTaskTarget):
txt = msg.extract_plain_text().strip()
if match := re.match(r"^提交(?:答案|题解|[fF]lag)\s*(?P<submission>.+?)\s*$", txt):
submission: str = match.group("submission")
async with puzzle_manager() as manager:
result = manager.submit(target.target_id, submission)
if isinstance(result, str):
await target.send_message(result)
else:
await target.send_message(get_submission_message(
daily_puzzle_info=result.info,
submission=result.submission,
puzzle=result.puzzle,
))
cmd_query = on_alconna(Alconna(
@ -45,7 +60,7 @@ async def _(target: DepLongTaskTarget):
p = manager.get_today_puzzle()
if p is None:
return await target.send_message("今天无题,改日再来吧!")
await target.send_message(p.get_unimessage())
await target.send_message(get_puzzle_description(p))
cmd_query_submission = on_alconna(Alconna(
@ -54,44 +69,11 @@ cmd_query_submission = on_alconna(Alconna(
@cmd_query_submission.handle()
async def _(target: DepLongTaskTarget):
gid = None
if re.match(r"^\d+$", target.channel_id):
gid = int(target.channel_id)
async with puzzle_manager() as manager:
p = manager.get_today_puzzle()
if p is None:
return await target.send_message("今天无题")
msg = UniMessage.text("==== 今日答题情况 ====\n\n")
subcount = len(reduce(
lambda x, y: x + y,
manager.submissions.get(p.raw_id, {}).values(),
[],
))
info = manager.daily_puzzle[p.index_id]
msg = msg.text(
f"总体情况:答对 {len(info.success_users)} / "
f"参与 {len(info.tried_users)} / "
f"提交 {subcount}\n"
)
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
gid = None
if re.match(r"^\d+$", target.channel_id):
gid = int(target.channel_id)
for u, d in success_users:
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
t = d.strftime("%H:%M")
tries = len(manager.submissions[p.raw_id][u])
msg = msg.text(f"\n- {uname} [🎉 {t} 完成 | {tries} 提交]")
for u in info.tried_users - set(info.success_users.keys()):
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
tries = len(manager.submissions[p.raw_id][u])
msg = msg.text(f"\n- {uname} [💦 {tries} 提交]")
await target.send_message(msg)
await target.send_message(get_daily_report_v2(manager, gid))
cmd_history = on_alconna(Alconna(
@ -106,15 +88,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
today = get_today_date()
if index_id:
index_id = index_id.removeprefix("#")
if index_id == manager.daily_puzzle_of_date.get(today, ""):
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
return await target.send_message(puzzle.get_unimessage())
if index_id in manager.daily_puzzle:
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
msg = puzzle.get_unimessage()
msg = msg.text(f"\n\n------\n\n题解:{puzzle.flag}")
return await target.send_message(msg)
return await target.send_message("没有这道题哦")
if index_id not in manager.daily_puzzle:
return await target.send_message("没有这道题哦")
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
msg = get_puzzle_description(
puzzle,
with_answer=(index_id != manager.daily_puzzle_of_date.get(today, "")),
)
return await target.send_message(msg)
msg = UniMessage.text("====== 历史题目清单 ======\n\n")
puzzles = [
(manager.puzzle_data[manager.daily_puzzle[i].raw_id], d)
@ -141,15 +122,14 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
@scheduler.scheduled_job("cron", hour="8")
async def _():
async with puzzle_manager() as manager:
msg2 = manager.get_report_yesterday()
yesterday = get_today_date() - datetime.timedelta(days=1)
msg2 = get_daily_report(manager, yesterday)
if msg2 is not None:
await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
puzzle = manager.get_today_puzzle()
if puzzle is not None:
logger.info(f"找到了题目 {puzzle.raw_id},发送")
await qq_broadcast(config.plugin_puzzle_playgroup, puzzle.get_unimessage())
await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(puzzle))
else:
logger.info("自动任务:没有找到题目,跳过")

View File

@ -0,0 +1,29 @@
import nanoid
from konabot.common.path import ASSETS_PATH
from konabot.plugins.kona_ph.core.path import KONAPH_IMAGE_BASE
class PuzzleImageManager:
def read_puzzle_image(self, img_name: str) -> bytes:
fp = KONAPH_IMAGE_BASE / img_name
if fp.exists():
return fp.read_bytes()
return (ASSETS_PATH / "img" / "other" / "boom.jpg").read_bytes()
def upload_puzzle_image(self, data: bytes, suffix: str = ".png") -> str:
id = nanoid.generate(
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
21,
)
img_name = f"{id}{suffix}"
(KONAPH_IMAGE_BASE / img_name).write_bytes(data)
return img_name
def remove_puzzle_image(self, img_name: str):
if img_name:
(KONAPH_IMAGE_BASE / img_name).unlink(True)
def get_image_manager() -> PuzzleImageManager:
return PuzzleImageManager()

View File

@ -0,0 +1,187 @@
"""
生成各种各样的 Message 的函数集合
"""
import datetime
import functools
import re
from typing import Any
from nonebot_plugin_alconna import UniMessage
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.storage import (DailyPuzzleInfo, Puzzle,
PuzzleManager,
PuzzleSubmission)
def get_puzzle_description(puzzle: Puzzle, with_answer: bool = False) -> UniMessage[Any]:
"""
获取一个谜题的描述
"""
img_manager = get_image_manager()
result = UniMessage.text(f"[KonaPH#{puzzle.index_id}] {puzzle.title}")
result = result.text(f"\n\n{puzzle.content}")
if puzzle.img_name:
result = result.text("\n\n").image(
raw=img_manager.read_puzzle_image(puzzle.img_name)
)
result = result.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
if with_answer:
result = result.text(f"\n\n题目答案:{puzzle.flag}")
else:
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
return result
def get_submission_message(
puzzle: Puzzle,
submission: PuzzleSubmission,
daily_puzzle_info: DailyPuzzleInfo | None = None,
) -> str:
"""
获得提交答案的反馈信息
"""
if submission.success:
rank = -1
if daily_puzzle_info is not None:
rank = len(daily_puzzle_info.success_users)
return f"🎉 恭喜你答对了!你是今天第 {rank} 个解出来的!"
if submission.hint_id >= 0 and (
hint := puzzle.hints.get(submission.hint_id)
) is not None:
if hint.is_checkpoint:
hint_msg = "✨ 恭喜!这是本题的中间答案,加油!"
else:
hint_msg = "🤔 答错啦!请检查你的答案。"
return f"{hint_msg}\n\n提示:{hint.message}"
return "❌ 答错啦!请检查你的答案。"
def get_daily_report(
manager: PuzzleManager,
date: datetime.date,
) -> str | None:
"""
获得某日的提交的报告信息
"""
index_id = manager.daily_puzzle_of_date.get(date)
if index_id is None:
return None
info = manager.daily_puzzle[index_id]
puzzle = manager.puzzle_data[info.raw_id]
msg = f"[KonaPH#{puzzle.index_id}] 「{puzzle.title}」解答报告\n\n"
if len(info.success_users) == 0:
msg += "昨日,无人解出此题 😭😭\n\n"
else:
msg += f"昨日,共有 {len(info.success_users)} 人解出此题。\n\n"
msg += "前五名的解答者:\n\n"
us = [(u, d) for u, d in info.success_users.items()]
us = sorted(us, key=lambda t: t[1])
us = us[:5]
for u, _ in us:
m = manager.submissions[puzzle.raw_id][u][-1]
msg += f"- {get_username(u)}{m.time.strftime('%H:%M')}\n"
msg += "\n"
msg += f"出题人:{get_username(puzzle.author_id)}"
return msg
def get_daily_report_v2(manager: PuzzleManager, gid: int | None = None):
p = manager.get_today_puzzle()
if p is None:
return "今天无题"
msg = "==== 今日答题情况 ====\n\n"
subcount = len(functools.reduce(
lambda x, y: x + y,
manager.submissions.get(p.raw_id, {}).values(),
[],
))
info = manager.daily_puzzle[p.index_id]
msg += (
f"总体情况:答对 {len(info.success_users)} / "
f"参与 {len(info.tried_users)} / "
f"提交 {subcount}\n"
)
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
for u, d in success_users:
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
t = d.strftime("%H:%M")
tries = len(manager.submissions[p.raw_id][u])
msg += f"\n- {uname} [🎉 {t} 完成 | {tries} 提交]"
for u in info.tried_users - set(info.success_users.keys()):
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
tries = len(manager.submissions[p.raw_id][u])
checkpoints_touched = len(set((
s.hint_id for s in manager.submissions[p.raw_id][u]
if (
s.hint_id >= 0
and s.hint_id in p.hints
and p.hints[s.hint_id].is_checkpoint
)
)))
checkpoint_message = ""
if checkpoints_touched > 0:
checkpoint_message = f" | 🚩 {checkpoints_touched} 记录点"
msg += f"\n- {uname} [💦 {tries} 提交{checkpoint_message}]"
return msg
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
image_manager = get_image_manager()
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
status_suffix = ""
if puzzle.raw_id == manager.puzzle_pinned:
status_suffix += " | 📌 已被管理员置顶"
msg = UniMessage.text(
f"--- 谜题信息 ---\n"
f"Raw ID: {puzzle.raw_id}\n"
f"出题者: {get_username(puzzle.author_id)} | {puzzle.author_id}\n"
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"状态: {status}{status_suffix}\n\n"
f"标题: {puzzle.title}\n"
f"Flag: {puzzle.flag}\n\n"
f"{puzzle.content}"
)
if puzzle.img_name:
msg = msg.image(raw=image_manager.read_puzzle_image(puzzle.img_name))
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
return msg
def get_puzzle_hint_list(puzzle: Puzzle) -> str:
msg = f"==== {puzzle.title} 提示与中间答案 ====\n"
if len(puzzle.hints) == 0:
msg += "\n你没有添加任何中间答案。"
return msg
for hint_id, hint in puzzle.hints.items():
n = {False: "[提示]", True: "[中间答案]"}[hint.is_checkpoint]
msg += f"\n{n}[{hint_id}] {hint.pattern}"
msg += f"\n {hint.message}"
return msg

View File

@ -0,0 +1,9 @@
from konabot.common.path import DATA_PATH
KONAPH_BASE = DATA_PATH / "KonaPH"
KONAPH_DATA_JSON = KONAPH_BASE / "data.json"
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs"
# 保证所有文件夹存在
KONAPH_BASE.mkdir(exist_ok=True)
KONAPH_IMAGE_BASE.mkdir(exist_ok=True)

View File

@ -1,27 +1,26 @@
import asyncio
import datetime
import random
import re
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
import nanoid
from nonebot_plugin_alconna import UniMessage
from pydantic import BaseModel, Field, ValidationError
from konabot.common.path import DATA_PATH
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.path import KONAPH_DATA_JSON
KONAPH_BASE = DATA_PATH / "KonaPH"
KONAPH_DATA_JSON = KONAPH_BASE / "data.json"
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs"
class PuzzleHint(BaseModel):
pattern: str
message: str
is_checkpoint: bool
# 保证所有文件夹存在
KONAPH_BASE.mkdir(exist_ok=True)
KONAPH_IMAGE_BASE.mkdir(exist_ok=True)
class PuzzleSubmission(BaseModel):
success: bool
flag: str
time: datetime.datetime
hint_id: int = -1
class Puzzle(BaseModel):
@ -40,41 +39,47 @@ class Puzzle(BaseModel):
ready: bool = False
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
def get_image_path(self) -> Path:
return KONAPH_IMAGE_BASE / self.img_name
hints: dict[int, PuzzleHint] = Field(default_factory=dict)
def get_unimessage(self) -> UniMessage[Any]:
result = UniMessage.text(f"[KonaPH#{self.index_id}] {self.title}")
result = result.text(f"\n\n{self.content}")
@property
def hint_id_max(self) -> int:
return max((0, *self.hints.keys()))
if self.img_name:
result = result.text("\n\n").image(raw=self.get_image_path().read_bytes())
result = result.text(f"\n\n出题者:{get_username(self.author_id)}")
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
return result
def add_image(self, img: bytes, suffix: str = ".png"):
if self.img_name:
self.get_image_path().unlink(True)
img_id = nanoid.generate(
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
21,
def check_submission(
self,
submission: str,
time: datetime.datetime | None = None,
) -> PuzzleSubmission:
if time is None:
time = datetime.datetime.now()
if submission == self.flag:
return PuzzleSubmission(
success=True,
flag=submission,
time=time,
)
for hint_id, hint in self.hints.items():
if hint.pattern.startswith('/') and hint.pattern.endswith('/'):
if re.match(hint.pattern.strip('/'), submission):
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
hint_id=hint_id,
)
else:
if hint.pattern == submission:
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
hint_id=hint_id,
)
return PuzzleSubmission(
success=False,
flag=submission,
time=time,
)
self.img_name = f"{img_id}{suffix}"
self.get_image_path().write_bytes(img)
def remove_image(self):
if self.img_name:
self.get_image_path().unlink(True)
self.img_name = ""
class PuzzleSubmission(BaseModel):
success: bool
flag: str
time: datetime.datetime
class DailyPuzzleInfo(BaseModel):
@ -84,15 +89,10 @@ class DailyPuzzleInfo(BaseModel):
success_users: dict[str, datetime.datetime] = {}
class PuzzleSubmissionResultMessage(BaseModel):
success: bool
rank: int = -1
message: str = ""
def get_unimessage(self) -> UniMessage[Any]:
if self.success:
return UniMessage.text(f"🎉 恭喜你答对了!你是今天第 {self.rank} 个解出来的!")
return UniMessage.text(self.message)
class PuzzleSubmissionFeedback(BaseModel):
submission: PuzzleSubmission
puzzle: Puzzle
info: DailyPuzzleInfo | None = None
def get_today_date() -> datetime.date:
@ -162,26 +162,6 @@ class PuzzleManager(BaseModel):
self.index_id_counter += 1
def fix(self):
# 尝试修复今日的数据
for p in self.puzzle_data.values():
if self.is_puzzle_published(p.raw_id):
p.ready = True
if self.puzzle_pinned not in self.unpublished_puzzles:
self.puzzle_pinned = ""
# 撤回重复发布的题
already_published: set[str] = set()
for date in list(self.daily_puzzle_of_date.keys()):
index_id = self.daily_puzzle_of_date[date]
info = self.daily_puzzle[index_id]
if info.raw_id in already_published:
del self.daily_puzzle[index_id]
del self.daily_puzzle_of_date[date]
else:
already_published.add(info.raw_id)
def admin_pin_puzzle(self, raw_id: str):
if raw_id in self.puzzle_data:
self.puzzle_pinned = raw_id
@ -213,41 +193,23 @@ class PuzzleManager(BaseModel):
return
return self.daily_puzzle[p.index_id]
def submit(self, user: str, flag: str) -> PuzzleSubmissionResultMessage:
def submit(self, user: str, flag: str) -> PuzzleSubmissionFeedback | str:
p = self.get_today_puzzle()
d = self.get_today_info()
now = datetime.datetime.now()
if p is None or d is None:
return PuzzleSubmissionResultMessage(
success=False,
message="今天没有题哦,改天再来吧!",
)
return "今天没有题哦,改天再来吧!"
if user in d.success_users:
return PuzzleSubmissionResultMessage(
success=False,
message="你今天已经答对过啦!不用重复提交哦!",
)
if flag != p.flag:
d.tried_users.add(user)
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
success=False,
flag=flag,
time=now,
))
return PuzzleSubmissionResultMessage(
success=False,
message="❌ 答错了,请检查你的答案哦",
)
return "你今天已经答对过啦!不用重复提交哦!"
d.tried_users.add(user)
d.success_users[user] = now
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
success=True,
flag=flag,
time=now,
))
return PuzzleSubmissionResultMessage(
success=True,
rank=len(d.success_users),
result = p.check_submission(flag, now)
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(result)
if result.success:
d.success_users[user] = now
return PuzzleSubmissionFeedback(
submission=result,
puzzle=p,
info=d,
)
def admin_create_puzzle(self, user: str):
@ -273,47 +235,20 @@ class PuzzleManager(BaseModel):
if p.author_id == user
], key=lambda p: p.created_at, reverse=True)
def get_report_yesterday(self):
yesterday = get_today_date() - datetime.timedelta(days=1)
index_id = self.daily_puzzle_of_date.get(yesterday)
if index_id is None:
return None
info = self.daily_puzzle[index_id]
puzzle = self.puzzle_data[info.raw_id]
message = UniMessage.text(f"[KonaPH#{index_id}] 「{puzzle.title}」解答报告")
if len(info.success_users) == 0:
message = message.text(
"\n\n昨日,竟无人解出此题!"
)
else:
message = message.text(
f"\n\n昨日,共有 {len(info.success_users)} 人解出此题。\n\n前五名的解答者:"
)
us = [(u, d) for u, d in info.success_users.items()]
us = sorted(us, key=lambda t: t[1])
us = us[:5]
for u, _ in us:
m = self.submissions[puzzle.raw_id][u][-1]
message = message.text(f"- {get_username(u)}{m.time.strftime('%H:%M')}")
message = message.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
return message
lock = asyncio.Lock()
def read_data():
try:
data_raw = KONAPH_DATA_JSON.read_text()
data_raw = KONAPH_DATA_JSON.read_text("utf-8")
return PuzzleManager.model_validate_json(data_raw)
except (FileNotFoundError, ValidationError):
return PuzzleManager()
def write_data(data: PuzzleManager):
KONAPH_DATA_JSON.write_text(data.model_dump_json())
KONAPH_DATA_JSON.write_text(data.model_dump_json(), "utf-8")
@asynccontextmanager

View File

@ -1,15 +1,24 @@
import datetime
from math import ceil
from typing import Any
from nonebot import get_plugin_config
from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, Subcommand, UniMessage, on_alconna
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query,
Subcommand, SubcommandResult, UniMessage,
on_alconna)
from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import download_image_bytes
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.storage import Puzzle, PuzzleManager, get_today_date, puzzle_manager
from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list,
get_puzzle_info_message,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager,
get_today_date,
puzzle_manager)
PUZZLE_PAGE_SIZE = 10
@ -31,31 +40,15 @@ def is_puzzle_admin(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_admin
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
status_suffix = ""
if puzzle.raw_id == manager.puzzle_pinned:
status_suffix += " | 📌 已被管理员置顶"
msg = UniMessage.text(
f"--- 谜题信息 ---\n"
f"Raw ID: {puzzle.raw_id}\n"
f"出题者: {get_username(puzzle.author_id)} | {puzzle.author_id}\n"
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"状态: {status}{status_suffix}\n\n"
f"标题: {puzzle.title}\n"
f"Flag: {puzzle.flag}\n\n"
f"{puzzle.content}"
)
if puzzle.img_name:
msg = msg.image(raw=puzzle.get_image_path().read_bytes())
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
return msg
def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle:
if raw_id not in manager.puzzle_data:
raise BotExceptionMessage("没有这个谜题")
puzzle = manager.puzzle_data[raw_id]
if is_puzzle_admin(target):
return puzzle
if target.target_id != puzzle.author_id:
raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
return puzzle
def create_admin_commands():
@ -82,7 +75,45 @@ def create_admin_commands():
),
Subcommand("publish", Args["raw_id?", str], dest="publish"),
Subcommand("preview", Args["raw_id", str], dest="preview"),
Subcommand("get-submits", Args["raw_id", str], dest="get-submits")
Subcommand("get-submits", Args["raw_id", str], dest="get-submits"),
Subcommand(
"test",
Args["raw_id", str],
Args["submission", str],
dest="test",
),
Subcommand(
"hint",
Subcommand(
"add",
Args["raw_id", str],
Args["pattern", str],
Args["message", str],
dest="add",
),
Subcommand(
"list",
Args["raw_id", str],
Args["page?", int],
dest="list",
),
Subcommand(
"modify",
Args["raw_id", str],
Args["hint_id", int],
Option("--pattern", Args["pattern", str], alias=["-p"]),
Option("--message", Args["message", str], alias=["-m"]),
Option("--checkpoint", Args["is_checkpoint", bool], alias=["-c"]),
dest="modify",
),
Subcommand(
"delete",
Args["raw_id", str],
Args["hint_id", int],
dest="delete",
),
dest="hint",
),
),
rule=is_puzzle_manager,
)
@ -98,6 +129,8 @@ def create_admin_commands():
msg = msg.text("konaph modify - 查看如何修改谜题信息\n")
msg = msg.text("konaph preview <id> - 预览一个题目的效果,不会展示答案\n")
msg = msg.text("konaph get-submits <id> - 获得题目的提交记录\n")
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
if is_puzzle_admin(target):
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
@ -122,15 +155,7 @@ def create_admin_commands():
@cmd_admin.assign("ready")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
))
p = check_puzzle(manager, target, raw_id)
if p.ready:
return await target.send_message(UniMessage.text(
"题目早就准备好啦!"
@ -143,15 +168,7 @@ def create_admin_commands():
@cmd_admin.assign("unready")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
))
p = check_puzzle(manager, target, raw_id)
if not p.ready:
return await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经是未取消状态了!"
@ -169,16 +186,7 @@ def create_admin_commands():
@cmd_admin.assign("info")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限查看详细信息!"
))
p = check_puzzle(manager, target, raw_id)
await target.send_message(get_puzzle_info_message(manager, p))
@cmd_admin.assign("my")
@ -214,7 +222,9 @@ def create_admin_commands():
@cmd_admin.assign("all")
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text("你没有权限查看所有的哦"))
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async with puzzle_manager() as manager:
puzzles = [*manager.puzzle_data.values()]
if ready.available:
@ -293,24 +303,26 @@ def create_admin_commands():
" --image <图片> 图片\n"
" --remove-image 删除图片"
)
image_manager = get_image_manager()
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message("没有这个谜题")
p = manager.puzzle_data[raw_id]
if not is_puzzle_admin(target) and target.target_id != p.author_id:
return await target.send_message("你没有权限编辑这个谜题")
p = check_puzzle(manager, target, raw_id)
if title is not None:
p.title = title
if description is not None:
p.content = description
if flag is not None:
p.flag = flag
p.flag = flag.strip()
if flag.strip() != flag:
await target.send_message(
"⚠️ 注意:你输入的 Flag 含有开头或结尾的空格,已经帮你去除"
)
if image is not None and image.url is not None:
b = await download_image_bytes(image.url)
p.add_image(b.unwrap())
image_manager.remove_puzzle_image(p.img_name)
p.img_name = image_manager.upload_puzzle_image(b.unwrap())
elif remove_image.available:
p.remove_image()
image_manager.remove_puzzle_image(p.img_name)
info2 = get_puzzle_info_message(manager, p)
@ -318,6 +330,10 @@ def create_admin_commands():
@cmd_admin.assign("publish")
async def _(target: DepLongTaskTarget, raw_id: str | None = None):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
today = get_today_date()
async with puzzle_manager() as manager:
if today in manager.daily_puzzle_of_date:
@ -328,18 +344,14 @@ def create_admin_commands():
p = manager.get_today_puzzle(strong=True)
if p is None:
return await target.send_message("上架失败了orz可能是没题了")
await qq_broadcast(config.plugin_puzzle_playgroup, p.get_unimessage())
await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(p))
return await target.send_message("Ok!")
@cmd_admin.assign("preview")
async def _(target: DepLongTaskTarget, raw_id: str):
async with puzzle_manager() as manager:
puzzle = manager.puzzle_data.get(raw_id)
if puzzle is None:
return await target.send_message("没有这个谜题")
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id:
return await target.send_message("你没有权限预览这个谜题")
return await target.send_message(puzzle.get_unimessage())
p = check_puzzle(manager, target, raw_id)
return await target.send_message(get_puzzle_description(p))
@cmd_admin.assign("get-submits")
async def _(target: DepLongTaskTarget, raw_id: str):
@ -357,5 +369,96 @@ def create_admin_commands():
msg = msg.text(f"- {get_username(uid)}{s}\n")
return await target.send_message(msg)
@cmd_admin.assign("test")
async def _(target: DepLongTaskTarget, raw_id: str, submission: str):
"""
测试一道谜题的回答,并给出结果
"""
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
result = p.check_submission(submission)
msg = get_submission_message(p, result)
return await target.send_message("[测试提交] " + msg)
@cmd_admin.assign("subcommands.hint")
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")):
if len(subcommands.result.subcommands) > 0:
return
return await target.send_message(
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n")
.text("- konaph hint modify <id> <hint_id>\n")
.text(" - --pattern <pattern>\n - 更改匹配规则\n")
.text(" - --message <message>\n - 更改提示文本\n")
.text(" - --checkpoint [True|False]\n - 更改是否为中间答案\n")
.text("- konaph hint delete <id> <hint_id>\n - 删除一个提示 / 中间答案\n")
.text("\n更多关于 pattern 和中间答案的信息,请见 man中间答案(7)")
)
@cmd_admin.assign("subcommands.hint.add")
async def _(
target: DepLongTaskTarget,
raw_id: str,
pattern: str,
message: str,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p.hints[p.hint_id_max + 1] = PuzzleHint(
pattern=pattern,
message=message,
is_checkpoint=False,
)
await target.send_message("创建成功!\n\n" + get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.list")
async def _(
target: DepLongTaskTarget,
raw_id: str,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
await target.send_message(get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.modify")
async def _(
target: DepLongTaskTarget,
raw_id: str,
hint_id: int,
pattern: str | None = None,
message: str | None = None,
is_checkpoint: bool | None = None,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
if hint_id not in p.hints:
raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
)
hint = p.hints[hint_id]
if pattern is not None:
hint.pattern = pattern
if message is not None:
hint.message = message
if is_checkpoint is not None:
hint.is_checkpoint = is_checkpoint
await target.send_message("更改成功!\n\n" + get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.delete")
async def _(
target: DepLongTaskTarget,
raw_id: str,
hint_id: int,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
if hint_id not in p.hints:
raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
)
del p.hints[hint_id]
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
return cmd_admin

View File

@ -0,0 +1,40 @@
"""
肥肠危险注意:本文件仅用于开发环境测试 LLM 模块能否正常工作!
请不要在生产环境启用它!
"""
import nonebot
from nonebot_plugin_alconna import Alconna, Args, on_alconna
from pydantic import BaseModel
from konabot.common.llm import get_llm
from konabot.common.longtask import DepLongTaskTarget
class LLMTestConfig(BaseModel):
debug_enable_llm_test: bool = False
config = nonebot.get_plugin_config(LLMTestConfig)
if config.debug_enable_llm_test:
cmd = on_alconna(Alconna(
"debug-ask-llm",
Args["prompt", str],
))
@cmd.handle()
async def _(prompt: str, target: DepLongTaskTarget):
llm = get_llm()
msg = await llm.chat(
[
{"role": "user", "content": prompt}
],
timeout=None,
max_tokens=1024,
)
content = msg.content or ""
await target.send_message(content)

View File

@ -1,4 +1,3 @@
from curses.ascii import isdigit
from pathlib import Path
import nonebot
@ -40,7 +39,10 @@ async def _(
doc: str | None,
event: nonebot.adapters.Event,
):
if doc is not None and section is None and all(isdigit(c) for c in doc):
if doc is not None and section is None and all(
ord('0') <= ord(c) <= ord('9')
for c in doc
):
section = int(doc)
doc = None

View File

@ -0,0 +1,76 @@
from loguru import logger
import nonebot
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import (
UniMessage,
UniMsg
)
from playwright.async_api import ConsoleMessage, Page
from konabot.common.web_render import konaweb
from konabot.common.web_render.core import WebRenderer
from konabot.plugins.markdown.core import MarkDownCore
def is_markdown_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
txt = msg.extract_plain_text()
if "markdown" not in txt[:10] or "md" not in txt[:3]:
return False
return True
evt = nonebot.on_message(rule=is_markdown_mentioned)
@evt.handle()
async def _(msg: UniMsg, event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
content = msg.extract_plain_text()
else:
content = msg.extract_plain_text()
logger.debug(f"Received markdown command with content: {content}")
if "md" in content[:3]:
message = content.replace("md", "", 1).strip()
else:
message = content.replace("markdown", "", 1).strip()
# 如果回复了消息,则转换回复的内容
if(len(message) == 0):
if event.reply:
message = event.reply.message.extract_plain_text()
else:
return
logger.debug(f"Markdown content to render: {message}")
out = await MarkDownCore.render_markdown(message, theme="dark")
await evt.send(await UniMessage().image(raw=out).export())
def is_latex_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
txt = msg.extract_plain_text()
if "latex" not in txt[:8]:
return False
return True
evt = nonebot.on_message(rule=is_latex_mentioned)
@evt.handle()
async def _(msg: UniMsg, event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
content = msg.extract_plain_text()
else:
content = msg.extract_plain_text()
logger.debug(f"Received markdown command with content: {content}")
message = content.replace("latex", "", 1).strip()
# 如果回复了消息,则转换回复的内容
if(len(message) == 0):
if event.reply:
message = event.reply.message.extract_plain_text()
else:
return
logger.debug(f"Latex content to render: {message}")
out = await MarkDownCore.render_latex(message, theme="dark")
await evt.send(await UniMessage().image(raw=out).export())

View File

@ -0,0 +1,57 @@
from loguru import logger
from playwright.async_api import ConsoleMessage, Page
from konabot.common.web_render import konaweb
from konabot.common.web_render.core import WebRenderer
class MarkDownCore:
@staticmethod
async def render_markdown(markdown_text: str, theme: str = "dark", params: dict = {}) -> bytes:
async def page_function(page: Page):
async def on_console(msg: ConsoleMessage):
logger.debug(f"WEB CONSOLE {msg.text}")
await page.emulate_media(color_scheme=theme)
page.on('console', on_console)
await page.locator('textarea[name=content]').fill(markdown_text)
await page.wait_for_timeout(200)
await page.locator('#button').click()
await page.wait_for_timeout(200)
out = await WebRenderer.render(
konaweb('markdown'),
target='#main',
other_function=page_function,
params=params
)
return out
@staticmethod
async def render_latex(text: str, theme: str = "dark") -> bytes:
params = {
"size": "2em",
}
async def page_function(page: Page):
async def on_console(msg: ConsoleMessage):
logger.debug(f"WEB CONSOLE {msg.text}")
await page.emulate_media(color_scheme=theme)
page.on('console', on_console)
await page.locator('textarea[name=content]').fill(f"$$ {text} $$")
await page.wait_for_timeout(200)
await page.locator('#button').click()
await page.wait_for_timeout(200)
out = await WebRenderer.render(
konaweb('latex'),
target='#main',
other_function=page_function,
params=params
)
return out

View File

@ -1,6 +1,7 @@
from io import BytesIO
from typing import Iterable, cast
from loguru import logger
from nonebot import on_message
from nonebot_plugin_alconna import (
Alconna,
@ -14,8 +15,12 @@ from nonebot_plugin_alconna import (
UniMsg,
on_alconna,
)
from playwright.async_api import ConsoleMessage, Page
from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message
from konabot.common.web_render import konaweb
from konabot.common.web_render.core import WebRenderer
from konabot.common.web_render.host_images import host_tempdir
from konabot.plugins.memepack.drawing.display import (
draw_cao_display,
draw_snaur_display,
@ -24,10 +29,12 @@ from konabot.plugins.memepack.drawing.display import (
from konabot.plugins.memepack.drawing.saying import (
draw_cute_ten,
draw_geimao,
draw_kiosay,
draw_mnk,
draw_pt,
draw_suan,
)
from konabot.plugins.memepack.drawing.watermark import draw_doubao_watermark
from nonebot.adapters import Bot, Event
@ -275,3 +282,77 @@ async def _(msg: UniMsg, evt: Event, bot: Bot):
.export()
)
kiosay = on_alconna(
Alconna(
"西多说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写西多说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@kiosay.handle()
async def _(saying: list[str]):
img = await draw_kiosay("\n".join(saying))
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await kiosay.send(await UniMessage().image(raw=img_bytes).export())
quote_cmd = on_alconna(Alconna(
"名人名言",
Args["quote", str],
Args["author", str],
Args["image?", Image | None],
), aliases={"quote"})
@quote_cmd.handle()
async def _(quote: str, author: str, img: PIL_Image):
async with host_tempdir() as tempdir:
img_path = tempdir.path / "image.png"
img_url = tempdir.url_of(img_path)
img.save(img_path)
async def page_function(page: Page):
async def on_console(msg: ConsoleMessage):
logger.debug(f"WEB CONSOLE {msg.text}")
page.on('console', on_console)
await page.locator('input[name=image]').fill(img_url)
await page.locator('input[name=quote]').fill(quote)
await page.locator('input[name=author]').fill(author)
await page.wait_for_timeout(500)
await page.wait_for_load_state('networkidle')
await page.wait_for_timeout(500)
out = await WebRenderer.render(
konaweb('makequote'),
target='#main',
other_function=page_function,
)
await quote_cmd.send(await UniMessage().image(raw=out).export())
doubao_cmd = on_alconna(Alconna(
"豆包水印",
Args["image?", Image | None],
))
@doubao_cmd.handle()
async def _(img: PIL_Image):
result = await draw_doubao_watermark(img)
result_bytes = BytesIO()
result.save(result_bytes, format="PNG")
await doubao_cmd.send(await UniMessage().image(raw=result_bytes).export())

View File

@ -5,6 +5,7 @@ import imagetext_py
import PIL.Image
from konabot.common.path import ASSETS_PATH
from konabot.common.utils.to_async import make_async
from .base.fonts import HARMONYOS_SANS_SC_BLACK, HARMONYOS_SANS_SC_REGULAR, LXGWWENKAI_REGULAR
@ -14,6 +15,7 @@ mnk_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "mnksay.jpg").convert(
dasuan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "dss.png").convert("RGBA")
suan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "suanleba.png").convert("RGBA")
cute_ten_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "tententen.png").convert("RGBA")
kio_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "kiosay.jpg").convert("RGBA")
def _draw_geimao(saying: str):
@ -29,7 +31,7 @@ def _draw_geimao(saying: str):
draw_emojis=True,
)
return img
async def draw_geimao(saying: str):
return await asyncio.to_thread(_draw_geimao, saying)
@ -106,3 +108,18 @@ def _draw_cute_ten(saying: str):
async def draw_cute_ten(saying: str):
return await asyncio.to_thread(_draw_cute_ten, saying)
@make_async
def draw_kiosay(saying: str):
img = kio_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 450, 540, 0.5, 0.5, 900, 96, LXGWWENKAI_REGULAR,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
1.0,
imagetext_py.TextAlign.Center,
draw_emojis=True,
)
return img

View File

@ -0,0 +1,20 @@
import PIL
import PIL.Image
from konabot.common.path import ASSETS_PATH
from konabot.common.utils.to_async import make_async
doubao_watermark = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "doubao.png").convert("RGBA")
@make_async
def draw_doubao_watermark(base: PIL.Image.Image) -> PIL.Image.Image:
base = base.copy().convert("RGBA")
w = base.size[0] / 768 * 140
h = base.size[0] / 768 * 40
x = base.size[0] / 768 * 160
y = base.size[0] / 768 * 60
w, h, x, y = map(int, (w, h, x, y))
base.alpha_composite(doubao_watermark.resize((w, h)), (base.size[0] - x, base.size[1] - y))
return base

View File

@ -15,7 +15,7 @@ if not POLL_DATA_FILE.exists():
POLL_DATA_FILE.write_bytes(POLL_TEMPLATE_FILE.read_bytes())
poll_list = json.loads(POLL_DATA_FILE.read_text())['poll']
poll_list = json.loads(POLL_DATA_FILE.read_text("utf-8"))['poll']
async def createpoll(title,qqid,options):
polllength = len(poll_list)
@ -53,7 +53,7 @@ def writeback():
# json.dump({'poll':poll_list},file,ensure_ascii=False,sort_keys=True)
POLL_DATA_FILE.write_text(json.dumps({
'poll': poll_list,
}, ensure_ascii=False, sort_keys=True))
}, ensure_ascii=False, sort_keys=True), "utf-8")
async def pollvote(polnum,optionnum,qqnum):
optiond = poll_list[polnum]["polldata"]

View File

@ -59,14 +59,14 @@ def load_notify_config() -> NotifyConfigFile:
if not DATA_FILE_PATH.exists():
return NotifyConfigFile()
try:
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text())
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text("utf-8"))
except Exception as e:
logger.warning(f"在解析 Notify 时遇到问题:{e}")
return NotifyConfigFile()
def save_notify_config(config: NotifyConfigFile):
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4))
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4), "utf-8")
@evt.handle()

196
poetry.lock generated
View File

@ -803,6 +803,23 @@ type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]]
name = "distro"
version = "1.9.0"
description = "Distro - an OS platform information API"
optional = false
python-versions = ">=3.6"
groups = ["main"]
files = [
{file = "distro-1.9.0-py3-none-any.whl", hash = "sha256:7bffd925d65168f85027d8da9af6bddab658135b840670a223589bc0c8ef02b2"},
{file = "distro-1.9.0.tar.gz", hash = "sha256:2fa77c6fd8940f116ee1d6b94a2f90b13b5ea8d019b98bc8bafdcabcdd9bdbed"},
]
[package.source]
type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]]
name = "exceptiongroup"
version = "1.3.0"
@ -1333,6 +1350,123 @@ type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]]
name = "jiter"
version = "0.11.1"
description = "Fast iterable JSON parser."
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "jiter-0.11.1-cp310-cp310-macosx_10_12_x86_64.whl", hash = "sha256:ed58841a491bbbf3f7c55a6b68fff568439ab73b2cce27ace0e169057b5851df"},
{file = "jiter-0.11.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:499beb9b2d7e51d61095a8de39ebcab1d1778f2a74085f8305a969f6cee9f3e4"},
{file = "jiter-0.11.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b87b2821795e28cc990939b68ce7a038edea680a24910bd68a79d54ff3f03c02"},
{file = "jiter-0.11.1-cp310-cp310-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:83f6fa494d8bba14ab100417c80e70d32d737e805cb85be2052d771c76fcd1f8"},
{file = "jiter-0.11.1-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:5fbc6aea1daa2ec6f5ed465f0c5e7b0607175062ceebbea5ca70dd5ddab58083"},
{file = "jiter-0.11.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:302288e2edc43174bb2db838e94688d724f9aad26c5fb9a74f7a5fb427452a6a"},
{file = "jiter-0.11.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:85db563fe3b367bb568af5d29dea4d4066d923b8e01f3417d25ebecd958de815"},
{file = "jiter-0.11.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:f1c1ba2b6b22f775444ef53bc2d5778396d3520abc7b2e1da8eb0c27cb3ffb10"},
{file = "jiter-0.11.1-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:523be464b14f8fd0cc78da6964b87b5515a056427a2579f9085ce30197a1b54a"},
{file = "jiter-0.11.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:25b99b3f04cd2a38fefb22e822e35eb203a2cd37d680dbbc0c0ba966918af336"},
{file = "jiter-0.11.1-cp310-cp310-win32.whl", hash = "sha256:47a79e90545a596bb9104109777894033347b11180d4751a216afef14072dbe7"},
{file = "jiter-0.11.1-cp310-cp310-win_amd64.whl", hash = "sha256:cace75621ae9bd66878bf69fbd4dfc1a28ef8661e0c2d0eb72d3d6f1268eddf5"},
{file = "jiter-0.11.1-cp311-cp311-macosx_10_12_x86_64.whl", hash = "sha256:9b0088ff3c374ce8ce0168523ec8e97122ebb788f950cf7bb8e39c7dc6a876a2"},
{file = "jiter-0.11.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:74433962dd3c3090655e02e461267095d6c84f0741c7827de11022ef8d7ff661"},
{file = "jiter-0.11.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6d98030e345e6546df2cc2c08309c502466c66c4747b043f1a0d415fada862b8"},
{file = "jiter-0.11.1-cp311-cp311-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:1d6db0b2e788db46bec2cf729a88b6dd36959af2abd9fa2312dfba5acdd96dcb"},
{file = "jiter-0.11.1-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:55678fbbda261eafe7289165dd2ddd0e922df5f9a1ae46d7c79a5a15242bd7d1"},
{file = "jiter-0.11.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6a6b74fae8e40497653b52ce6ca0f1b13457af769af6fb9c1113efc8b5b4d9be"},
{file = "jiter-0.11.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0a55a453f8b035eb4f7852a79a065d616b7971a17f5e37a9296b4b38d3b619e4"},
{file = "jiter-0.11.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:2638148099022e6bdb3f42904289cd2e403609356fb06eb36ddec2d50958bc29"},
{file = "jiter-0.11.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:252490567a5d990986f83b95a5f1ca1bf205ebd27b3e9e93bb7c2592380e29b9"},
{file = "jiter-0.11.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:d431d52b0ca2436eea6195f0f48528202100c7deda354cb7aac0a302167594d5"},
{file = "jiter-0.11.1-cp311-cp311-win32.whl", hash = "sha256:db6f41e40f8bae20c86cb574b48c4fd9f28ee1c71cb044e9ec12e78ab757ba3a"},
{file = "jiter-0.11.1-cp311-cp311-win_amd64.whl", hash = "sha256:0cc407b8e6cdff01b06bb80f61225c8b090c3df108ebade5e0c3c10993735b19"},
{file = "jiter-0.11.1-cp311-cp311-win_arm64.whl", hash = "sha256:fe04ea475392a91896d1936367854d346724a1045a247e5d1c196410473b8869"},
{file = "jiter-0.11.1-cp312-cp312-macosx_10_12_x86_64.whl", hash = "sha256:c92148eec91052538ce6823dfca9525f5cfc8b622d7f07e9891a280f61b8c96c"},
{file = "jiter-0.11.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:ecd4da91b5415f183a6be8f7158d127bdd9e6a3174138293c0d48d6ea2f2009d"},
{file = "jiter-0.11.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d7e3ac25c00b9275684d47aa42febaa90a9958e19fd1726c4ecf755fbe5e553b"},
{file = "jiter-0.11.1-cp312-cp312-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:57d7305c0a841858f866cd459cd9303f73883fb5e097257f3d4a3920722c69d4"},
{file = "jiter-0.11.1-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e86fa10e117dce22c547f31dd6d2a9a222707d54853d8de4e9a2279d2c97f239"},
{file = "jiter-0.11.1-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:ae5ef1d48aec7e01ee8420155d901bb1d192998fa811a65ebb82c043ee186711"},
{file = "jiter-0.11.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eb68e7bf65c990531ad8715e57d50195daf7c8e6f1509e617b4e692af1108939"},
{file = "jiter-0.11.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:43b30c8154ded5845fa454ef954ee67bfccce629b2dea7d01f795b42bc2bda54"},
{file = "jiter-0.11.1-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:586cafbd9dd1f3ce6a22b4a085eaa6be578e47ba9b18e198d4333e598a91db2d"},
{file = "jiter-0.11.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:677cc2517d437a83bb30019fd4cf7cad74b465914c56ecac3440d597ac135250"},
{file = "jiter-0.11.1-cp312-cp312-win32.whl", hash = "sha256:fa992af648fcee2b850a3286a35f62bbbaeddbb6dbda19a00d8fbc846a947b6e"},
{file = "jiter-0.11.1-cp312-cp312-win_amd64.whl", hash = "sha256:88b5cae9fa51efeb3d4bd4e52bfd4c85ccc9cac44282e2a9640893a042ba4d87"},
{file = "jiter-0.11.1-cp312-cp312-win_arm64.whl", hash = "sha256:9a6cae1ab335551917f882f2c3c1efe7617b71b4c02381e4382a8fc80a02588c"},
{file = "jiter-0.11.1-cp313-cp313-macosx_10_12_x86_64.whl", hash = "sha256:71b6a920a5550f057d49d0e8bcc60945a8da998019e83f01adf110e226267663"},
{file = "jiter-0.11.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:0b3de72e925388453a5171be83379549300db01284f04d2a6f244d1d8de36f94"},
{file = "jiter-0.11.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cc19dd65a2bd3d9c044c5b4ebf657ca1e6003a97c0fc10f555aa4f7fb9821c00"},
{file = "jiter-0.11.1-cp313-cp313-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:d58faaa936743cd1464540562f60b7ce4fd927e695e8bc31b3da5b914baa9abd"},
{file = "jiter-0.11.1-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:902640c3103625317291cb73773413b4d71847cdf9383ba65528745ff89f1d14"},
{file = "jiter-0.11.1-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:30405f726e4c2ed487b176c09f8b877a957f535d60c1bf194abb8dadedb5836f"},
{file = "jiter-0.11.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3217f61728b0baadd2551844870f65219ac4a1285d5e1a4abddff3d51fdabe96"},
{file = "jiter-0.11.1-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:b1364cc90c03a8196f35f396f84029f12abe925415049204446db86598c8b72c"},
{file = "jiter-0.11.1-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:53a54bf8e873820ab186b2dca9f6c3303f00d65ae5e7b7d6bda1b95aa472d646"},
{file = "jiter-0.11.1-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:7e29aca023627b0e0c2392d4248f6414d566ff3974fa08ff2ac8dbb96dfee92a"},
{file = "jiter-0.11.1-cp313-cp313-win32.whl", hash = "sha256:f153e31d8bca11363751e875c0a70b3d25160ecbaee7b51e457f14498fb39d8b"},
{file = "jiter-0.11.1-cp313-cp313-win_amd64.whl", hash = "sha256:f773f84080b667c69c4ea0403fc67bb08b07e2b7ce1ef335dea5868451e60fed"},
{file = "jiter-0.11.1-cp313-cp313-win_arm64.whl", hash = "sha256:635ecd45c04e4c340d2187bcb1cea204c7cc9d32c1364d251564bf42e0e39c2d"},
{file = "jiter-0.11.1-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:d892b184da4d94d94ddb4031296931c74ec8b325513a541ebfd6dfb9ae89904b"},
{file = "jiter-0.11.1-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:aa22c223a3041dacb2fcd37c70dfd648b44662b4a48e242592f95bda5ab09d58"},
{file = "jiter-0.11.1-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:330e8e6a11ad4980cd66a0f4a3e0e2e0f646c911ce047014f984841924729789"},
{file = "jiter-0.11.1-cp313-cp313t-win_amd64.whl", hash = "sha256:09e2e386ebf298547ca3a3704b729471f7ec666c2906c5c26c1a915ea24741ec"},
{file = "jiter-0.11.1-cp313-cp313t-win_arm64.whl", hash = "sha256:fe4a431c291157e11cee7c34627990ea75e8d153894365a3bc84b7a959d23ca8"},
{file = "jiter-0.11.1-cp314-cp314-macosx_10_12_x86_64.whl", hash = "sha256:0fa1f70da7a8a9713ff8e5f75ec3f90c0c870be6d526aa95e7c906f6a1c8c676"},
{file = "jiter-0.11.1-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:569ee559e5046a42feb6828c55307cf20fe43308e3ae0d8e9e4f8d8634d99944"},
{file = "jiter-0.11.1-cp314-cp314-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f69955fa1d92e81987f092b233f0be49d4c937da107b7f7dcf56306f1d3fcce9"},
{file = "jiter-0.11.1-cp314-cp314-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:090f4c9d4a825e0fcbd0a2647c9a88a0f366b75654d982d95a9590745ff0c48d"},
{file = "jiter-0.11.1-cp314-cp314-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:bbf3d8cedf9e9d825233e0dcac28ff15c47b7c5512fdfe2e25fd5bbb6e6b0cee"},
{file = "jiter-0.11.1-cp314-cp314-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:2aa9b1958f9c30d3d1a558b75f0626733c60eb9b7774a86b34d88060be1e67fe"},
{file = "jiter-0.11.1-cp314-cp314-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e42d1ca16590b768c5e7d723055acd2633908baacb3628dd430842e2e035aa90"},
{file = "jiter-0.11.1-cp314-cp314-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:5db4c2486a023820b701a17aec9c5a6173c5ba4393f26662f032f2de9c848b0f"},
{file = "jiter-0.11.1-cp314-cp314-musllinux_1_1_aarch64.whl", hash = "sha256:4573b78777ccfac954859a6eff45cbd9d281d80c8af049d0f1a3d9fc323d5c3a"},
{file = "jiter-0.11.1-cp314-cp314-musllinux_1_1_x86_64.whl", hash = "sha256:7593ac6f40831d7961cb67633c39b9fef6689a211d7919e958f45710504f52d3"},
{file = "jiter-0.11.1-cp314-cp314-win32.whl", hash = "sha256:87202ec6ff9626ff5f9351507def98fcf0df60e9a146308e8ab221432228f4ea"},
{file = "jiter-0.11.1-cp314-cp314-win_amd64.whl", hash = "sha256:a5dd268f6531a182c89d0dd9a3f8848e86e92dfff4201b77a18e6b98aa59798c"},
{file = "jiter-0.11.1-cp314-cp314-win_arm64.whl", hash = "sha256:5d761f863f912a44748a21b5c4979c04252588ded8d1d2760976d2e42cd8d991"},
{file = "jiter-0.11.1-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:2cc5a3965285ddc33e0cab933e96b640bc9ba5940cea27ebbbf6695e72d6511c"},
{file = "jiter-0.11.1-cp314-cp314t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6b572b3636a784c2768b2342f36a23078c8d3aa6d8a30745398b1bab58a6f1a8"},
{file = "jiter-0.11.1-cp314-cp314t-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ad93e3d67a981f96596d65d2298fe8d1aa649deb5374a2fb6a434410ee11915e"},
{file = "jiter-0.11.1-cp314-cp314t-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a83097ce379e202dcc3fe3fc71a16d523d1ee9192c8e4e854158f96b3efe3f2f"},
{file = "jiter-0.11.1-cp314-cp314t-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:7042c51e7fbeca65631eb0c332f90c0c082eab04334e7ccc28a8588e8e2804d9"},
{file = "jiter-0.11.1-cp314-cp314t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0a68d679c0e47649a61df591660507608adc2652442de7ec8276538ac46abe08"},
{file = "jiter-0.11.1-cp314-cp314t-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:a1b0da75dbf4b6ec0b3c9e604d1ee8beaf15bc046fff7180f7d89e3cdbd3bb51"},
{file = "jiter-0.11.1-cp314-cp314t-musllinux_1_1_aarch64.whl", hash = "sha256:69dd514bf0fa31c62147d6002e5ca2b3e7ef5894f5ac6f0a19752385f4e89437"},
{file = "jiter-0.11.1-cp314-cp314t-musllinux_1_1_x86_64.whl", hash = "sha256:bb31ac0b339efa24c0ca606febd8b77ef11c58d09af1b5f2be4c99e907b11111"},
{file = "jiter-0.11.1-cp314-cp314t-win32.whl", hash = "sha256:b2ce0d6156a1d3ad41da3eec63b17e03e296b78b0e0da660876fccfada86d2f7"},
{file = "jiter-0.11.1-cp314-cp314t-win_amd64.whl", hash = "sha256:f4db07d127b54c4a2d43b4cf05ff0193e4f73e0dd90c74037e16df0b29f666e1"},
{file = "jiter-0.11.1-cp314-cp314t-win_arm64.whl", hash = "sha256:28e4fdf2d7ebfc935523e50d1efa3970043cfaa161674fe66f9642409d001dfe"},
{file = "jiter-0.11.1-cp39-cp39-macosx_10_12_x86_64.whl", hash = "sha256:baa99c8db49467527658bb479857344daf0a14dff909b7f6714579ac439d1253"},
{file = "jiter-0.11.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:860fe55fa3b01ad0edf2adde1098247ff5c303d0121f9ce028c03d4f88c69502"},
{file = "jiter-0.11.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:173dd349d99b6feaf5a25a6fbcaf3489a6f947708d808240587a23df711c67db"},
{file = "jiter-0.11.1-cp39-cp39-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:14ac1dca837514cc946a6ac2c4995d9695303ecc754af70a3163d057d1a444ab"},
{file = "jiter-0.11.1-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:69af47de5f93a231d5b85f7372d3284a5be8edb4cc758f006ec5a1406965ac5e"},
{file = "jiter-0.11.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:685f8b3abd3bbd3e06e4dfe2429ff87fd5d7a782701151af99b1fcbd80e31b2b"},
{file = "jiter-0.11.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6d04afa2d4e5526e54ae8a58feea953b1844bf6e3526bc589f9de68e86d0ea01"},
{file = "jiter-0.11.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:1e92b927259035b50d8e11a8fdfe0ebd014d883e4552d37881643fa289a4bcf1"},
{file = "jiter-0.11.1-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:e7bd8be4fad8d4c5558b7801770cd2da6c072919c6f247cc5336edb143f25304"},
{file = "jiter-0.11.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:121381a77a3c85987f3eba0d30ceaca9116f7463bedeec2fa79b2e7286b89b60"},
{file = "jiter-0.11.1-cp39-cp39-win32.whl", hash = "sha256:160225407f6dfabdf9be1b44e22f06bc293a78a28ffa4347054698bd712dad06"},
{file = "jiter-0.11.1-cp39-cp39-win_amd64.whl", hash = "sha256:028e0d59bcdfa1079f8df886cdaefc6f515c27a5288dec956999260c7e4a7cfd"},
{file = "jiter-0.11.1-graalpy311-graalpy242_311_native-macosx_10_12_x86_64.whl", hash = "sha256:e642b5270e61dd02265866398707f90e365b5db2eb65a4f30c789d826682e1f6"},
{file = "jiter-0.11.1-graalpy311-graalpy242_311_native-macosx_11_0_arm64.whl", hash = "sha256:464ba6d000585e4e2fd1e891f31f1231f497273414f5019e27c00a4b8f7a24ad"},
{file = "jiter-0.11.1-graalpy311-graalpy242_311_native-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:055568693ab35e0bf3a171b03bb40b2dcb10352359e0ab9b5ed0da2bf1eb6f6f"},
{file = "jiter-0.11.1-graalpy311-graalpy242_311_native-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e0c69ea798d08a915ba4478113efa9e694971e410056392f4526d796f136d3fa"},
{file = "jiter-0.11.1-graalpy312-graalpy250_312_native-macosx_10_12_x86_64.whl", hash = "sha256:0d4d6993edc83cf75e8c6828a8d6ce40a09ee87e38c7bfba6924f39e1337e21d"},
{file = "jiter-0.11.1-graalpy312-graalpy250_312_native-macosx_11_0_arm64.whl", hash = "sha256:f78d151c83a87a6cf5461d5ee55bc730dd9ae227377ac6f115b922989b95f838"},
{file = "jiter-0.11.1-graalpy312-graalpy250_312_native-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c9022974781155cd5521d5cb10997a03ee5e31e8454c9d999dcdccd253f2353f"},
{file = "jiter-0.11.1-graalpy312-graalpy250_312_native-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:18c77aaa9117510d5bdc6a946baf21b1f0cfa58ef04d31c8d016f206f2118960"},
{file = "jiter-0.11.1.tar.gz", hash = "sha256:849dcfc76481c0ea0099391235b7ca97d7279e0fa4c86005457ac7c88e8b76dc"},
]
[package.source]
type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]]
name = "linkify-it-py"
version = "2.0.3"
@ -2200,6 +2334,39 @@ type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]]
name = "openai"
version = "2.7.1"
description = "The official Python library for the openai API"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "openai-2.7.1-py3-none-any.whl", hash = "sha256:2f2530354d94c59c614645a4662b9dab0a5b881c5cd767a8587398feac0c9021"},
{file = "openai-2.7.1.tar.gz", hash = "sha256:df4d4a3622b2df3475ead8eb0fbb3c27fd1c070fa2e55d778ca4f40e0186c726"},
]
[package.dependencies]
anyio = ">=3.5.0,<5"
distro = ">=1.7.0,<2"
httpx = ">=0.23.0,<1"
jiter = ">=0.10.0,<1"
pydantic = ">=1.9.0,<3"
sniffio = "*"
tqdm = ">4"
typing-extensions = ">=4.11,<5"
[package.extras]
aiohttp = ["aiohttp", "httpx-aiohttp (>=0.1.9)"]
datalib = ["numpy (>=1)", "pandas (>=1.2.3)", "pandas-stubs (>=1.1.0.11)"]
realtime = ["websockets (>=13,<16)"]
voice-helpers = ["numpy (>=2.0.2)", "sounddevice (>=0.5.1)"]
[package.source]
type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]]
name = "opencc"
version = "1.1.9"
@ -3387,6 +3554,33 @@ type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]]
name = "tqdm"
version = "4.67.1"
description = "Fast, Extensible Progress Meter"
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "tqdm-4.67.1-py3-none-any.whl", hash = "sha256:26445eca388f82e72884e0d580d5464cd801a3ea01e63e5601bdff9ba6a48de2"},
{file = "tqdm-4.67.1.tar.gz", hash = "sha256:f8aef9c52c08c13a65f30ea34f4e5aac3fd1a34959879d7e59e63027286627f2"},
]
[package.dependencies]
colorama = {version = "*", markers = "platform_system == \"Windows\""}
[package.extras]
dev = ["nbval", "pytest (>=6)", "pytest-asyncio (>=0.24)", "pytest-cov", "pytest-timeout"]
discord = ["requests"]
notebook = ["ipywidgets (>=6)"]
slack = ["slack-sdk"]
telegram = ["requests"]
[package.source]
type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]]
name = "typing-extensions"
version = "4.15.0"
@ -3978,4 +4172,4 @@ reference = "mirrors"
[metadata]
lock-version = "2.1"
python-versions = ">=3.12,<4.0"
content-hash = "ec73430f70658a303c47e6f536ccb0863a475f7f25d5334c8766e6149075648c"
content-hash = "dcb6567ccb9eb6357179dd8b8eaa5fb69373cef0e17d3a49c7c895d289c0d642"

View File

@ -27,6 +27,7 @@ dependencies = [
"nanoid (>=2.0.0,<3.0.0)",
"opencc (>=1.1.9,<2.0.0)",
"playwright (>=1.55.0,<2.0.0)",
"openai (>=2.7.1,<3.0.0)",
]
[build-system]

13
scripts/watch_filter.py Normal file
View File

@ -0,0 +1,13 @@
from pathlib import Path
from watchfiles import Change
base = Path(__file__).parent.parent.absolute()
def filter(change: Change, path: str) -> bool:
if "__pycache__" in path:
return False
if Path(path).absolute().is_relative_to(base / "data"):
return False
return True