Compare commits

...

19 Commits

Author SHA1 Message Date
d37c4870d8 Merge branch 'master' into feature/sugar 2026-03-18 17:38:59 +08:00
23b9f101b3 语法糖 2026-03-18 17:29:42 +08:00
8c1651ad3d 忘记 await 相关权限了,导致永远判 True
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-18 16:29:36 +08:00
ff60642c62 Merge pull request 'feat: add TRPG roll command' (#59) from pi-agent/konabot:feat/trpg-roll into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #59
Reviewed-by: 钟晓帕 <Passthem183@gmail.com>
2026-03-14 02:19:15 +08:00
69b5908445 refactor: narrow trpg roll message matching 2026-03-14 02:17:20 +08:00
a542ed1fd9 feat: add TRPG roll command 2026-03-14 02:02:41 +08:00
e86a385448 Merge pull request 'fix: parse fx resize y scale argument' (#58) from pi-agent/konabot:fix/fx-resize-arg-parsing into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #58
Reviewed-by: 钟晓帕 <Passthem183@gmail.com>
2026-03-14 01:27:40 +08:00
d4bb36a074 fix: parse fx resize y scale argument 2026-03-14 01:26:16 +08:00
1a2a3c0468 Merge pull request 'fix: correct fx resize behavior' (#57) from pi-agent/konabot:fix/fx-resize-behavior into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #57
Reviewed-by: 钟晓帕 <Passthem183@gmail.com>
2026-03-14 01:11:44 +08:00
67502cb932 fix: correct fx resize behavior 2026-03-14 01:07:24 +08:00
f9a312b80a Merge pull request 'feat: add JPEG damage filter to fx' (#56) from pi-agent/konabot:feat/fx-jpeg-damage into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #56
2026-03-14 01:01:38 +08:00
1980f8a895 feat: add jpeg damage filter to fx 2026-03-14 00:52:34 +08:00
d273ed4b1a 放宽 wolfx api 限制
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-12 20:33:03 +08:00
265e9cc583 改为使用中国地震台网的正式报
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-10 21:58:42 +08:00
8f5061ba41 wolfx api
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-10 11:39:41 +08:00
b3c3c77f3c 添加 Ignore 2026-03-10 11:16:23 +08:00
6a84ce2cd8 提供订阅模块的文档
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-09 14:56:31 +08:00
392c699b33 移动 poster 模块到 common
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-09 14:40:27 +08:00
72e21cd9aa 添加多字符喵对一些符号的响应 2026-03-09 13:46:56 +08:00
37 changed files with 1457 additions and 135 deletions

10
.gitignore vendored
View File

@ -3,9 +3,14 @@
/data
/pyrightconfig.json
/pyrightconfig.toml
/uv.lock
# 缓存文件
__pycache__
/.ruff_cache
/.pytest_cache
/.mypy_cache
/.black_cache
# 可能会偶然生成的 diff 文件
/*.diff
@ -14,3 +19,8 @@ __pycache__
/.coverage
/.coverage.db
/htmlcov
# 对手动创建虚拟环境的人
/.venv
/venv
*.egg-info

View File

@ -1,3 +1,5 @@
{
"python.REPL.enableREPLSmartSend": false
"python.REPL.enableREPLSmartSend": false,
"python-envs.defaultEnvManager": "ms-python.python:poetry",
"python-envs.defaultPackageManager": "ms-python.python:poetry"
}

5
bot.py
View File

@ -7,6 +7,7 @@ from nonebot.adapters.discord import Adapter as DiscordAdapter
from nonebot.adapters.minecraft import Adapter as MinecraftAdapter
from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter
from konabot.common.appcontext import run_afterinit_functions
from konabot.common.log import init_logger
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.path import LOG_PATH
@ -56,9 +57,7 @@ def main():
nonebot.load_plugins("konabot/plugins")
nonebot.load_plugin("nonebot_plugin_analysis_bilibili")
from konabot.common import permsys
permsys.create_startup()
run_afterinit_functions()
# 注册关闭钩子
@driver.on_shutdown

View File

@ -16,6 +16,7 @@
- `konabot/common/permsys/__init__.py`
- 暴露 `PermManager``DepPermManager``require_permission`
- 负责数据库初始化、启动迁移、超级管理员默认授权
- 提供 `register_default_allow_permission()` 用于注册“启动时默认放行”的权限键
- `konabot/common/permsys/entity.py`
- 定义 `PermEntity`
- 将事件转换为可查询的实体链
@ -134,6 +135,14 @@ PermEntity("ob11", "user", str(account)), "*", True
也就是说,配置中的超级管理员会直接拥有全部权限。
此外,模块也支持插件在导入阶段通过 `register_default_allow_permission("some.key")` 注册默认放行的权限键;这些键会在启动时被写入到:
```python
PermEntity("sys", "global", "global"), "some.key", True
```
这适合“默认所有人可用,但仍希望后续能被权限系统单独关闭”的功能。
这属于启动时自动灌入的保底策略,不依赖手工授权命令。
## 在插件中使用

37
docs/subscribe.md Normal file
View File

@ -0,0 +1,37 @@
# subscribe 模块
一套统一的接口,让用户可以订阅一些延迟或者定时消息。
```python
import asyncio
from pathlib import Path
from konabot.common.subscribe import register_poster_info, broadcast, PosterInfo
from nonebot_plugin_alconna import UniMessage
# 注册了服务信息,用户可以用「查询可用订阅」指令了解可用的订阅清单。
# 用户可以使用「订阅 某某服务通知」或者「订阅 某某服务」来订阅消息。
# 如果用户在群聊发起订阅,则会在 QQ 群订阅,不然会在私聊订阅
register_poster_info("某某服务通知", PosterInfo(
aliases={"某某服务"},
description="告诉你关于某某的最新资讯等信息",
))
async def main():
while True:
# 这里的服务 channel 名字必须填写该服务的名字,不可以是 alias
# 这会给所有订阅了该通道的用户发送「向大家发送纯文本通知」
await broadcast("某某服务通知", "向大家发送纯文本通知")
# 也可以发送 UniMessage 对象,可以构造包含图片的通知等
data = Path('image.png').read_bytes()
await broadcast(
"某某服务通知",
UniMessage.text("很遗憾告诉大家,我们倒闭了:").image(raw=data),
)
await asyncio.sleep(114.514)
```
该模块的代码请查阅 `/konabot/common/subscribe/` 下的文件。

View File

@ -0,0 +1,281 @@
"""
Wolfx 防灾免费 API
"""
import asyncio
import json
from typing import Literal, TypeVar, cast
import aiohttp
from aiosignal import Signal
from loguru import logger
from pydantic import BaseModel, RootModel
import pydantic
from konabot.common.appcontext import after_init
class ScEewReport(BaseModel):
"""
四川地震局报文
"""
ID: str
"EEW 发报 ID"
EventID: str
"EEW 发报事件 ID"
ReportTime: str
"EEW 发报时间(UTC+8)"
ReportNum: int
"EEW 发报数"
OriginTime: str
"发震时间(UTC+8)"
HypoCenter: str
"震源地"
Latitude: float
"震源地纬度"
Longitude: float
"震源地经度"
Magnitude: float
"震级"
Depth: float | None
"震源深度"
MaxIntensity: float
"最大烈度"
class CencEewReport(BaseModel):
"""
中国地震台网报文
"""
ID: str
"EEW 发报 ID"
EventID: str
"EEW 发报事件 ID"
ReportTime: str
"EEW 发报时间(UTC+8)"
ReportNum: int
"EEW 发报数"
OriginTime: str
"发震时间(UTC+8)"
HypoCenter: str
"震源地"
Latitude: float
"震源地纬度"
Longitude: float
"震源地经度"
Magnitude: float
"震级"
Depth: float | None
"震源深度"
MaxIntensity: float
"最大烈度"
class CencEqReport(BaseModel):
type: str
"报告类型"
EventID: str
"事件 ID"
time: str
"UTC+8 格式的地震发生时间"
location: str
"地震发生位置"
magnitude: str
"震级"
depth: str
"地震深度"
latitude: str
"纬度"
longtitude: str
"经度"
intensity: str
"烈度"
class CencEqlist(RootModel):
root: dict[str, CencEqReport]
class WolfxWebSocket:
def __init__(self, url: str) -> None:
self.url = url
self.signal: Signal[bytes] = Signal(self)
self._running = False
self._task: asyncio.Task | None = None
self._session: aiohttp.ClientSession | None = None
self._ws: aiohttp.ClientWebSocketResponse | None = None
@property
def session(self) -> aiohttp.ClientSession: # pragma: no cover
assert self._session is not None
return self._session
async def start(self): # pragma: no cover
if self._running:
return
self._running = True
self._session = aiohttp.ClientSession()
self._task = asyncio.create_task(self._run())
self.signal.freeze()
async def stop(self): # pragma: no cover
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
if self._session:
await self._session.close()
async def _run(self): # pragma: no cover
retry_delay = 1
while self._running:
try:
async with self.session.ws_connect(self.url) as ws:
self._ws = ws
logger.info(f"Wolfx API 服务连接上了 {self.url} 的 WebSocket")
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
await self.handle(cast(str, msg.data).encode())
elif msg.type == aiohttp.WSMsgType.BINARY:
await self.handle(cast(bytes, msg.data))
elif msg.type == aiohttp.WSMsgType.CLOSED:
break
elif msg.type == aiohttp.WSMsgType.ERROR:
break
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
logger.warning("连接 WebSocket 时发生错误")
logger.exception(e)
except asyncio.CancelledError:
break
except Exception as e:
logger.error("Wolfx API 发生未知错误")
logger.exception(e)
self._ws = None
if self._running:
logger.info(f"Wolfx API 准备断线重连 {self.url}")
await asyncio.sleep(retry_delay)
retry_delay = min(retry_delay * 2, 60)
async def handle(self, data: bytes):
try:
obj = json.loads(data)
except json.JSONDecodeError as e:
logger.warning("解析 Wolfs API 时出错")
logger.exception(e)
return
if obj.get("type") == "heartbeat" or obj.get("type") == "pong":
logger.debug(f"Wolfx API 收到了来自 {self.url} 的心跳: {obj}")
else:
await self.signal.send(data)
T = TypeVar("T", bound=BaseModel)
class WolfxAPIService:
sc_eew: Signal[ScEewReport]
"四川地震局地震速报"
cenc_eew: Signal[CencEewReport]
"中国地震台网地震速报"
cenc_eqlist: Signal[CencEqReport]
"中国地震台网地震信息发布"
def __init__(self) -> None:
self.sc_eew = Signal(self)
self._sc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/sc_eew")
WolfxAPIService.bind(self.sc_eew, self._sc_eew_ws, ScEewReport)
self.cenc_eew = Signal(self)
self._cenc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/cenc_eew")
WolfxAPIService.bind(self.cenc_eew, self._cenc_eew_ws, CencEewReport)
self.cenc_eqlist = Signal(self)
self._cenc_eqlist_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/cenc_eqlist")
WolfxAPIService.bind(self.cenc_eqlist, self._cenc_eqlist_ws, CencEqReport)
@staticmethod
def bind(signal: Signal[T], ws: WolfxWebSocket, t: type[T]):
@ws.signal.append
async def _(data: bytes):
try:
obj = t.model_validate_json(data)
logger.info(f"接收到来自 Wolfx API 的信息:{data}")
await signal.send(obj)
except pydantic.ValidationError as e:
logger.warning(f"解析 Wolfx API 时出错 URL={ws.url}")
logger.error(e)
async def start(self): # pragma: no cover
self.cenc_eew.freeze()
self.sc_eew.freeze()
self.cenc_eqlist.freeze()
async with asyncio.TaskGroup() as task_group:
if len(self.cenc_eew) > 0:
task_group.create_task(self._cenc_eew_ws.start())
if len(self.sc_eew) > 0:
task_group.create_task(self._sc_eew_ws.start())
if len(self.cenc_eqlist) > 0:
task_group.create_task(self._cenc_eqlist_ws.start())
async def stop(self): # pragma: no cover
async with asyncio.TaskGroup() as task_group:
task_group.create_task(self._cenc_eew_ws.stop())
task_group.create_task(self._sc_eew_ws.stop())
task_group.create_task(self._cenc_eqlist_ws.stop())
wolfx_api = WolfxAPIService()
@after_init
def init(): # pragma: no cover
import nonebot
driver = nonebot.get_driver()
@driver.on_startup
async def _():
await wolfx_api.start()
@driver.on_shutdown
async def _():
await wolfx_api.stop()

View File

@ -0,0 +1,15 @@
from typing import Any, Callable
AFTER_INIT_FUNCTION = Callable[[], Any]
_after_init_functions: list[AFTER_INIT_FUNCTION] = []
def after_init(func: AFTER_INIT_FUNCTION):
_after_init_functions.append(func)
def run_afterinit_functions(): # pragma: no cover
for f in _after_init_functions:
f()

View File

@ -4,6 +4,7 @@ from nonebot.adapters import Event
from nonebot.params import Depends
from nonebot.rule import Rule
from konabot.common.appcontext import after_init
from konabot.common.database import DatabaseManager
from konabot.common.pager import PagerQuery
from konabot.common.path import DATA_PATH
@ -13,6 +14,7 @@ from konabot.common.permsys.repo import PermRepo
db = DatabaseManager(DATA_PATH / "perm.sqlite3")
_default_allow_permissions: set[str] = set()
_EntityLike = Event | PermEntity | list[PermEntity]
@ -73,6 +75,7 @@ def perm_manager(_db: DatabaseManager | None = None) -> PermManager: # pragma:
return PermManager(_db)
@after_init
def create_startup(): # pragma: no cover
from konabot.common.nb.is_admin import cfg
@ -89,6 +92,10 @@ def create_startup(): # pragma: no cover
await pm.update_permission(
PermEntity("ob11", "user", str(account)), "*", True
)
for key in _default_allow_permissions:
await pm.update_permission(
PermEntity("sys", "global", "global"), key, True
)
@driver.on_shutdown
async def _():
@ -101,6 +108,10 @@ def create_startup(): # pragma: no cover
DepPermManager = Annotated[PermManager, Depends(perm_manager)]
def register_default_allow_permission(key: str):
_default_allow_permissions.add(key)
def require_permission(perm: str) -> Rule: # pragma: no cover
async def check_permission(event: Event, pm: DepPermManager) -> bool:
return await pm.check_has_permission(event, perm)

View File

@ -0,0 +1,11 @@
"""
Subscribe 模块,用于向一些订阅的频道广播消息
"""
from .service import broadcast as broadcast
from .service import dep_poster_service as dep_poster_service
from .service import DepPosterService as DepPosterService
from .service import PosterService as PosterService
from .subscribe_info import PosterInfo as PosterInfo
from .subscribe_info import POSTER_INFO_DATA as POSTER_INFO_DATA
from .subscribe_info import register_poster_info as register_poster_info

View File

@ -6,7 +6,8 @@ from pydantic import BaseModel, ValidationError
from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult
from konabot.common.path import DATA_PATH
from konabot.plugins.poster.repository import IPosterRepo
from .repository import IPosterRepo
class ChannelData(BaseModel):
@ -18,9 +19,9 @@ class PosterData(BaseModel):
def is_the_same_target(target1: LongTaskTarget, target2: LongTaskTarget) -> bool:
if (target1.is_private_chat and not target2.is_private_chat):
if target1.is_private_chat and not target2.is_private_chat:
return False
if (target2.is_private_chat and not target1.is_private_chat):
if target2.is_private_chat and not target1.is_private_chat:
return False
if target1.platform != target2.platform:
return False
@ -58,7 +59,9 @@ class LocalPosterRepo(IPosterRepo):
len1 = len(self.data.channels[channel].targets)
return len0 != len1
async def get_subscribed_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
async def get_subscribed_channels(
self, target: LongTaskTarget, pager: PagerQuery
) -> PagerResult[str]:
channels: list[str] = []
for channel_id, channel in self.data.channels.items():
for t in channel.targets:
@ -95,7 +98,9 @@ async def local_poster_data():
data = PosterData()
else:
try:
data = PosterData.model_validate_json(LOCAL_POSTER_DATA_PATH.read_text())
data = PosterData.model_validate_json(
LOCAL_POSTER_DATA_PATH.read_text()
)
except ValidationError:
data = PosterData()
yield data
@ -109,4 +114,3 @@ async def local_poster():
DepLocalPosterRepo = Annotated[LocalPosterRepo, Depends(local_poster)]

View File

@ -4,9 +4,10 @@ from nonebot.params import Depends
from nonebot_plugin_alconna import UniMessage
from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
from konabot.plugins.poster.repo_local_data import local_poster
from konabot.plugins.poster.repository import IPosterRepo
from .subscribe_info import POSTER_INFO_DATA
from .repo_local_data import local_poster
from .repository import IPosterRepo
class PosterService:
@ -27,7 +28,9 @@ class PosterService:
channel = self.parse_channel_id(channel)
return await self.repo.remove_channel_target(channel, target)
async def broadcast(self, channel: str, message: UniMessage[Any] | str) -> list[LongTaskTarget]:
async def broadcast(
self, channel: str, message: UniMessage[Any] | str
) -> list[LongTaskTarget]:
channel = self.parse_channel_id(channel)
targets = await self.repo.get_channel_targets(channel)
for target in targets:
@ -35,7 +38,9 @@ class PosterService:
await target.send_message(message, at=False)
return targets
async def get_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
async def get_channels(
self, target: LongTaskTarget, pager: PagerQuery
) -> PagerResult[str]:
return await self.repo.get_subscribed_channels(target, pager)
async def fix_data(self):
@ -56,4 +61,3 @@ async def broadcast(channel: str, message: UniMessage[Any] | str):
DepPosterService = Annotated[PosterService, Depends(dep_poster_service)]

View File

@ -4,7 +4,7 @@ from dataclasses import dataclass, field
@dataclass
class PosterInfo:
aliases: set[str] = field(default_factory=set)
description: str = field(default='')
description: str = field(default="")
POSTER_INFO_DATA: dict[str, PosterInfo] = {}
@ -12,4 +12,3 @@ POSTER_INFO_DATA: dict[str, PosterInfo] = {}
def register_poster_info(channel: str, info: PosterInfo):
POSTER_INFO_DATA[channel] = info

View File

@ -76,6 +76,8 @@ fx [滤镜名称] <参数1> <参数2> ...
* ```fx 设置遮罩```
* ```fx 色键 <目标颜色="rgb(255,0,0)"> <容差=60>```
* ```fx 晃动 <最大偏移量=5> <运动模糊=False>```
* ```fx JPEG损坏 <质量=10>```
* 质量范围建议为 1~95数值越低压缩痕迹越重、效果越搞笑。
* ```fx 动图 <帧率=10>```
### 多图像处理器

View File

@ -0,0 +1,53 @@
**roll** - 面向跑团的文本骰子指令
## 用法
`roll 表达式`
支持常见骰子写法:
- `roll 3d6`
- `roll d20+5`
- `roll 2d8+1d4+3`
- `roll d%`
- `roll 4dF`
## 说明
- `NdM` 表示掷 N 个 M 面骰,例如 `3d6`
- `d20` 等价于 `1d20`
- `d%` 表示百分骰,范围 1 到 100
- `dF` 表示 Fate/Fudge 骰,单骰结果为 -1、0、+1
- 支持用 `+`、`-` 连接多个项,也支持常数修正
## 返回格式
会返回总结果,以及每一项的明细。
例如:
- `roll 3d6`
可能返回:
- `3d6 = 11`
- `+3d6=[2, 4, 5]`
- `roll d20+5`
可能返回:
- `d20+5 = 19`
- `+1d20=[14] +5=5`
## 限制
为防止刷屏和滥用,当前实现会限制:
- 单项最多 100 个骰子
- 单个骰子最多 1000 面
- 一次表达式最多 20 项
- 一次表达式最多实际掷 200 个骰子
- 结果过长时会直接拒绝
## 权限
需要 `trpg.roll` 权限。
默认启动时会给系统全局授予允许,因此通常所有人都能用;如有需要可再用权限系统单独关闭。

View File

@ -1,4 +1,5 @@
import random
from io import BytesIO
from PIL import Image, ImageFilter, ImageDraw, ImageStat, ImageFont
from PIL import ImageEnhance
from PIL import ImageChops
@ -167,26 +168,53 @@ class ImageFilterImplement:
return Image.fromarray(result, 'RGBA')
# JPEG 损坏感压缩
@staticmethod
def apply_jpeg_damage(image: Image.Image, quality: int = 10) -> Image.Image:
quality = max(1, min(95, int(quality)))
alpha = None
if image.mode in ('RGBA', 'LA') or (image.mode == 'P' and 'transparency' in image.info):
rgba_image = image.convert('RGBA')
alpha = rgba_image.getchannel('A')
rgb_image = Image.new('RGB', rgba_image.size, (255, 255, 255))
rgb_image.paste(rgba_image, mask=alpha)
else:
rgb_image = image.convert('RGB')
output = BytesIO()
rgb_image.save(output, format='JPEG', quality=quality, optimize=False)
output.seek(0)
damaged = Image.open(output).convert('RGB')
if alpha is not None:
return Image.merge('RGBA', (*damaged.split(), alpha))
return damaged.convert('RGBA')
# 缩放
@staticmethod
def apply_resize(image: Image.Image, scale: float = 1.5, scale_y = None) -> Image.Image:
# scale 可以为负
# 如果 scale 为负,则代表翻转
if scale_y is not None:
if float(scale_y) < 0:
def apply_resize(image: Image.Image, scale: float = 1.5, scale_y: float = None) -> Image.Image:
scale_x = float(scale)
scale_y_value = float(scale_y) if scale_y is not None else None
if scale_y_value is not None:
if scale_y_value < 0:
image = ImageOps.flip(image)
scale_y = abs(float(scale_y))
if scale < 0:
scale_y_value = abs(scale_y_value)
if scale_x < 0:
image = ImageOps.mirror(image)
scale = abs(scale)
new_size = (int(image.width * scale), int(image.height * float(scale_y)))
return image.resize(new_size, Image.Resampling.LANCZOS)
if scale < 0:
image = ImageOps.mirror(image)
image = ImageOps.flip(image)
scale = abs(scale)
new_size = (int(image.width * scale), int(image.height * scale))
return image.resize(new_size, Image.Resampling.LANCZOS)
scale_x = abs(scale_x)
target_scale_y = scale_y_value
else:
if scale_x < 0:
image = ImageOps.mirror(image)
image = ImageOps.flip(image)
scale_x = abs(scale_x)
target_scale_y = scale_x
new_width = max(1, round(image.width * scale_x))
new_height = max(1, round(image.height * target_scale_y))
return image.resize((new_width, new_height), Image.Resampling.LANCZOS)
# 波纹
@staticmethod

View File

@ -50,6 +50,7 @@ class ImageFilterManager:
"描边": ImageFilterImplement.apply_stroke,
"形状描边": ImageFilterImplement.apply_shape_stroke,
"半调": ImageFilterImplement.apply_halftone,
"JPEG损坏": ImageFilterImplement.apply_jpeg_damage,
"设置通道": ImageFilterImplement.apply_set_channel,
"设置遮罩": ImageFilterImplement.apply_set_mask,
# 图像处理

View File

@ -15,10 +15,12 @@ class THQwen(TextHandler):
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
pm = perm_manager()
if env.event is None or not pm.check_has_permission(env.event, "textfx.qwen"):
if env.event is None or not await pm.check_has_permission(
env.event, "textfx.qwen"
):
return TextHandleResult(
code=1,
ostream="这里暂未开启 AI 功能",
ostream="你或当前环境没有使用 qwen 的权限。如有疑问请联系管理员",
)
llm = get_llm()

View File

@ -6,29 +6,34 @@ from loguru import logger
from nonebot import on_message
import nonebot
from nonebot.rule import to_me
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
on_alconna)
from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna
from nonebot_plugin_apscheduler import scheduler
from konabot.common import username
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.pager import PagerQuery
from konabot.plugins.kona_ph.core.message import (get_daily_report,
get_daily_report_v2,
get_puzzle_description,
get_submission_message)
from konabot.plugins.kona_ph.core.message import (
get_daily_report,
get_daily_report_v2,
get_puzzle_description,
get_submission_message,
)
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE,
create_admin_commands,
puzzle_manager)
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info
from konabot.plugins.poster.service import broadcast
from konabot.plugins.kona_ph.manager import (
PUZZLE_PAGE_SIZE,
create_admin_commands,
puzzle_manager,
)
from konabot.common.subscribe import PosterInfo, register_poster_info, broadcast
create_admin_commands()
register_poster_info("每日谜题", info=PosterInfo(
aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"},
description="此方 BOT 每日谜题推送",
))
register_poster_info(
"每日谜题",
info=PosterInfo(
aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"},
description="此方 BOT 每日谜题推送",
),
)
cmd_submit = on_message(rule=to_me())
@ -44,16 +49,22 @@ async def _(msg: UniMsg, target: DepLongTaskTarget):
if isinstance(result, str):
await target.send_message(result)
else:
await target.send_message(get_submission_message(
daily_puzzle_info=result.info,
submission=result.submission,
puzzle=result.puzzle,
))
await target.send_message(
get_submission_message(
daily_puzzle_info=result.info,
submission=result.submission,
puzzle=result.puzzle,
)
)
cmd_query = on_alconna(Alconna(
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?)|今日谜?题目?)"
), rule=to_me())
cmd_query = on_alconna(
Alconna(
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?)|今日谜?题目?)"
),
rule=to_me(),
)
@cmd_query.handle()
async def _(target: DepLongTaskTarget):
@ -64,9 +75,8 @@ async def _(target: DepLongTaskTarget):
await target.send_message(get_puzzle_description(p))
cmd_query_submission = on_alconna(Alconna(
"今日答题情况"
), rule=to_me())
cmd_query_submission = on_alconna(Alconna("今日答题情况"), rule=to_me())
@cmd_query_submission.handle()
async def _(target: DepLongTaskTarget):
@ -77,11 +87,15 @@ async def _(target: DepLongTaskTarget):
await target.send_message(get_daily_report_v2(manager, gid))
cmd_history = on_alconna(Alconna(
"re:历史(题目|谜题)",
Args["page?", int],
Args["index_id?", str],
), rule=to_me())
cmd_history = on_alconna(
Alconna(
"re:历史(题目|谜题)",
Args["page?", int],
Args["index_id?", str],
),
rule=to_me(),
)
@cmd_history.handle()
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
@ -105,10 +119,10 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text(
f"页数只有 1 ~ {count_pages} 啦!"
))
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
return await target.send_message(
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
)
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
for p, d in puzzles:
info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]]
msg = msg.text(
@ -120,22 +134,26 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
await target.send_message(msg)
cmd_leadboard = on_alconna(Alconna(
"re:此方(解谜|谜题)排行榜",
Args["page?", int],
))
cmd_leadboard = on_alconna(
Alconna(
"re:此方(解谜|谜题)排行榜",
Args["page?", int],
)
)
@cmd_leadboard.handle()
async def _(target: DepLongTaskTarget, page: int = 1):
async with puzzle_manager() as manager:
result = manager.get_leadboard(PagerQuery(page, 10))
await target.send_message(result.to_unimessage(
title="此方解谜排行榜",
formatter=lambda data: (
f"{data[1]} 已完成 | "
f"{username.get_username(data[0])}"
await target.send_message(
result.to_unimessage(
title="此方解谜排行榜",
formatter=lambda data: (
f"{data[1]} 已完成 | {username.get_username(data[0])}"
),
)
))
)
@scheduler.scheduled_job("cron", hour="8")
@ -155,4 +173,3 @@ async def _():
driver = nonebot.get_driver()

View File

@ -1,7 +1,6 @@
import datetime
from math import ceil
from nonebot import get_plugin_config
from nonebot.adapters import Event
from nonebot_plugin_alconna import (
Alconna,
@ -14,7 +13,6 @@ from nonebot_plugin_alconna import (
UniMessage,
on_alconna,
)
from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.exc import BotExceptionMessage
@ -35,20 +33,11 @@ from konabot.plugins.kona_ph.core.storage import (
get_today_date,
puzzle_manager,
)
from konabot.plugins.poster.service import broadcast
from konabot.common.subscribe import broadcast
PUZZLE_PAGE_SIZE = 10
class PuzzleConfig(BaseModel):
plugin_puzzle_manager: list[str] = []
plugin_puzzle_admin: list[str] = []
plugin_puzzle_playgroup: list[str] = []
config = get_plugin_config(PuzzleConfig)
async def check_puzzle(
manager: PuzzleManager,
perm: DepPermManager,

View File

@ -6,6 +6,7 @@ from konabot.common.nb.match_keyword import match_keyword
evt_nya = on_message(rule=match_keyword(""))
@evt_nya.handle()
async def _():
await evt_nya.send(await UniMessage().text("").export())
@ -25,8 +26,9 @@ NYA_SYMBOL_MAPPING = {
"~": "~",
"": "",
" ": " ",
"\n": "\n",
}
NYA_SYMBOL_KEEP = "—¹₁²₂³₃⁴₄⁵₅⁶₆⁷₇⁸₈⁹₉⁰₀\n"
NYA_SYMBOL_MAPPING.update((k, k) for k in NYA_SYMBOL_KEEP)
async def has_nya(msg: UniMsg) -> bool:
@ -49,10 +51,10 @@ async def has_nya(msg: UniMsg) -> bool:
evt_nya_v2 = on_message(rule=has_nya)
@evt_nya_v2.handle()
async def _(msg: UniMsg, evt: Event):
text = msg.extract_plain_text()
await UniMessage.text(''.join(
(NYA_SYMBOL_MAPPING.get(c, '') for c in text)
)).send(evt)
await UniMessage.text("".join((NYA_SYMBOL_MAPPING.get(c, "") for c in text))).send(
evt
)

View File

@ -3,14 +3,15 @@ from nonebot_plugin_alconna import Alconna, Args, on_alconna
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.pager import PagerQuery
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
from konabot.plugins.poster.service import dep_poster_service
from konabot.common.subscribe import POSTER_INFO_DATA, dep_poster_service
cmd_subscribe = on_alconna(Alconna(
"订阅",
Args["channel", str],
))
cmd_subscribe = on_alconna(
Alconna(
"订阅",
Args["channel", str],
)
)
@cmd_subscribe.handle()
@ -23,10 +24,12 @@ async def _(target: DepLongTaskTarget, channel: str):
await target.send_message(f"已经订阅过「{channel}」了")
cmd_list = on_alconna(Alconna(
"re:(?:查询|我的|获取)订阅(列表)?",
Args["page?", int],
))
cmd_list = on_alconna(
Alconna(
"re:(?:查询|我的|获取)订阅(列表)?",
Args["page?", int],
)
)
def better_channel_message(channel_id: str) -> str:
@ -39,17 +42,24 @@ def better_channel_message(channel_id: str) -> str:
@cmd_list.handle()
async def _(target: DepLongTaskTarget, page: int = 1):
async with dep_poster_service() as service:
result = await service.get_channels(target, PagerQuery(
page_index=page,
page_size=10,
))
await target.send_message(result.to_unimessage(title="订阅列表", formatter=better_channel_message))
result = await service.get_channels(
target,
PagerQuery(
page_index=page,
page_size=10,
),
)
await target.send_message(
result.to_unimessage(title="订阅列表", formatter=better_channel_message)
)
cmd_list_available = on_alconna(Alconna(
"re:(查询)?可用订阅(列表)?",
Args["page?", int],
))
cmd_list_available = on_alconna(
Alconna(
"re:(查询)?可用订阅(列表)?",
Args["page?", int],
)
)
@cmd_list_available.handle()
@ -58,13 +68,17 @@ async def _(target: DepLongTaskTarget, page: int = 1):
page_index=page,
page_size=10,
).apply(sorted(POSTER_INFO_DATA.keys()))
await target.send_message(result.to_unimessage(title="可用订阅列表", formatter=better_channel_message))
await target.send_message(
result.to_unimessage(title="可用订阅列表", formatter=better_channel_message)
)
cmd_unsubscribe = on_alconna(Alconna(
"取消订阅",
Args["channel", str],
))
cmd_unsubscribe = on_alconna(
Alconna(
"取消订阅",
Args["channel", str],
)
)
@cmd_unsubscribe.handle()
@ -79,6 +93,7 @@ async def _(target: DepLongTaskTarget, channel: str):
driver = nonebot.get_driver()
@driver.on_startup
async def _():
async with dep_poster_service() as service:

View File

@ -4,8 +4,7 @@ from nonebot.internal.adapter.event import Event
from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_apscheduler import scheduler
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info
from konabot.plugins.poster.service import broadcast
from konabot.common.subscribe import PosterInfo, register_poster_info, broadcast
register_poster_info(
"二十四节气",
@ -98,4 +97,3 @@ async def _(event: Event):
msg = UniMessage.text(f"现在的节气是{date.term}")
await msg.send(event)

View File

@ -1,20 +1,23 @@
import asyncio
from nonebot import get_driver
from nonebot_plugin_alconna import UniMessage
from konabot.plugins.poster.poster_info import register_poster_info, PosterInfo
from konabot.plugins.poster.service import broadcast
from konabot.common.subscribe import register_poster_info, PosterInfo, broadcast
CHANNEL_STARTUP = "启动通知"
register_poster_info(CHANNEL_STARTUP, PosterInfo(
aliases=set(),
description="当 Bot 重启时告知",
))
register_poster_info(
CHANNEL_STARTUP,
PosterInfo(
aliases=set(),
description="当 Bot 重启时告知",
),
)
driver = get_driver()
@driver.on_startup
async def _():
# 要尽量保证接受讯息的服务存在
@ -30,4 +33,3 @@ async def _():
await broadcast(CHANNEL_STARTUP, UniMessage.text("此方 BOT 重启好了"))
asyncio.create_task(task())

View File

@ -0,0 +1,210 @@
import copy
import re
from pathlib import Path
import nonebot
from nonebot import on_command
from nonebot.adapters import Bot, Event, Message
from nonebot.log import logger
from nonebot.message import handle_event
from nonebot.params import CommandArg
from konabot.common.database import DatabaseManager
from konabot.common.longtask import DepLongTaskTarget
ROOT_PATH = Path(__file__).resolve().parent
cmd = on_command(cmd="语法糖", aliases={"", "sugar"}, block=True)
db_manager = DatabaseManager()
driver = nonebot.get_driver()
@driver.on_startup
async def register_startup_hook():
await init_db()
@driver.on_shutdown
async def register_shutdown_hook():
await db_manager.close_all_connections()
async def init_db():
await db_manager.execute_by_sql_file(ROOT_PATH / "sql" / "create_table.sql")
table_info = await db_manager.query("PRAGMA table_info(syntactic_sugar)")
columns = {str(row.get("name")) for row in table_info}
if "channel_id" not in columns:
await db_manager.execute(
"ALTER TABLE syntactic_sugar ADD COLUMN channel_id VARCHAR(255) NOT NULL DEFAULT ''"
)
await db_manager.execute("DROP INDEX IF EXISTS idx_syntactic_sugar_name_belong_to")
await db_manager.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target "
"ON syntactic_sugar(name, channel_id, belong_to)"
)
def _extract_reply_plain_text(evt: Event) -> str:
reply = getattr(evt, "reply", None)
if reply is None:
return ""
reply_message = getattr(reply, "message", None)
if reply_message is None:
return ""
extract_plain_text = getattr(reply_message, "extract_plain_text", None)
if callable(extract_plain_text):
return extract_plain_text().strip()
return str(reply_message).strip()
def _split_variables(tokens: list[str]) -> tuple[list[str], dict[str, str]]:
positional: list[str] = []
named: dict[str, str] = {}
for token in tokens:
if "=" in token:
key, value = token.split("=", 1)
key = key.strip()
if key:
named[key] = value
continue
positional.append(token)
return positional, named
def _render_template(content: str, positional: list[str], named: dict[str, str]) -> str:
def replace(match: re.Match[str]) -> str:
key = match.group(1).strip()
if key.isdigit():
idx = int(key) - 1
if 0 <= idx < len(positional):
return positional[idx]
return match.group(0)
return named.get(key, match.group(0))
return re.sub(r"\{([^{}]+)\}", replace, content)
async def _store_sugar(name: str, content: str, belong_to: str, channel_id: str):
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "insert_sugar.sql",
(name, content, belong_to, channel_id),
)
async def _delete_sugar(name: str, belong_to: str, channel_id: str):
await db_manager.execute(
"DELETE FROM syntactic_sugar WHERE name = ? AND belong_to = ? AND channel_id = ?",
(name, belong_to, channel_id),
)
async def _find_sugar(name: str, belong_to: str, channel_id: str) -> str | None:
rows = await db_manager.query(
(
"SELECT content FROM syntactic_sugar "
"WHERE name = ? AND channel_id = ? "
"ORDER BY CASE WHEN belong_to = ? THEN 0 ELSE 1 END, id ASC "
"LIMIT 1"
),
(name, channel_id, belong_to),
)
if not rows:
return None
return rows[0].get("content")
async def _reinject_command(bot: Bot, evt: Event, command_text: str) -> bool:
depth = int(getattr(evt, "_syntactic_sugar_depth", 0))
if depth >= 3:
return False
try:
cloned_evt = copy.deepcopy(evt)
except Exception:
logger.exception("语法糖克隆事件失败")
return False
message = getattr(cloned_evt, "message", None)
if message is None:
return False
try:
msg_obj = type(message)(command_text)
except Exception:
msg_obj = command_text
setattr(cloned_evt, "message", msg_obj)
if hasattr(cloned_evt, "original_message"):
setattr(cloned_evt, "original_message", msg_obj)
if hasattr(cloned_evt, "raw_message"):
setattr(cloned_evt, "raw_message", command_text)
setattr(cloned_evt, "_syntactic_sugar_depth", depth + 1)
try:
await handle_event(bot, cloned_evt)
except Exception:
logger.exception("语法糖回注事件失败")
return False
return True
@cmd.handle()
async def _(bot: Bot, evt: Event, target: DepLongTaskTarget, args: Message = CommandArg()):
raw = args.extract_plain_text().strip()
if not raw:
return
tokens = raw.split()
action = tokens[0]
target_id = target.target_id
channel_id = target.channel_id
if action == "存入":
if len(tokens) < 2:
await cmd.finish("请提供要存入的名称")
name = tokens[1].strip()
content = " ".join(tokens[2:]).strip()
if not content:
content = _extract_reply_plain_text(evt)
if not content:
await cmd.finish("请提供要存入的内容")
await _store_sugar(name, content, target_id, channel_id)
await cmd.finish(f"糖已存入:「{name}」!")
if action == "删除":
if len(tokens) < 2:
await cmd.finish("请提供要删除的名称")
name = tokens[1].strip()
await _delete_sugar(name, target_id, channel_id)
await cmd.finish(f"已删除糖:「{name}」!")
if action == "查看":
if len(tokens) < 2:
await cmd.finish("请提供要查看的名称")
name = tokens[1].strip()
content = await _find_sugar(name, target_id, channel_id)
if content is None:
await cmd.finish(f"没有糖:「{name}")
await cmd.finish(f"糖的内容:「{content}")
name = action
content = await _find_sugar(name, target_id, channel_id)
if content is None:
await cmd.finish(f"没有糖:「{name}")
positional, named = _split_variables(tokens[1:])
rendered = _render_template(content, positional, named)
ok = await _reinject_command(bot, evt, rendered)
if not ok:
await cmd.finish(f"糖的展开结果:「{rendered}")

View File

@ -0,0 +1,12 @@
-- 创建语法糖表
CREATE TABLE IF NOT EXISTS syntactic_sugar (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
belong_to VARCHAR(255) NOT NULL,
channel_id VARCHAR(255) NOT NULL DEFAULT ''
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target
ON syntactic_sugar(name, channel_id, belong_to);

View File

@ -0,0 +1,5 @@
-- 插入语法糖,如果同一用户下名称已存在则更新内容
INSERT INTO syntactic_sugar (name, content, belong_to, channel_id)
VALUES (?, ?, ?, ?)
ON CONFLICT(name, channel_id, belong_to) DO UPDATE SET
content = excluded.content;

View File

@ -0,0 +1,35 @@
import re
import nonebot
from nonebot.adapters import Event
from nonebot_plugin_alconna import UniMessage, UniMsg
from konabot.common.nb import match_keyword
from konabot.common.permsys import register_default_allow_permission, require_permission
from konabot.plugins.trpg_roll.core import RollError, roll_expression
PERMISSION_KEY = "trpg.roll"
register_default_allow_permission(PERMISSION_KEY)
matcher = nonebot.on_message(
rule=match_keyword.match_keyword(re.compile(r"^roll(?:\s+.+)?$", re.I))
& require_permission(PERMISSION_KEY),
)
@matcher.handle()
async def _(event: Event, msg: UniMsg):
text = msg.extract_plain_text().strip()
expr = text[4:].strip()
if not expr:
await UniMessage.text("用法roll 3d6 / roll d20+5 / roll 2d8+1d4+3 / roll 4dF").send(event)
return
try:
result = roll_expression(expr)
await UniMessage.text(result.format()).send(event)
except RollError as e:
await UniMessage.text(str(e)).send(event)

View File

@ -0,0 +1,143 @@
import random
import re
from dataclasses import dataclass
MAX_DICE_COUNT = 100
MAX_DICE_FACES = 1000
MAX_TERM_COUNT = 20
MAX_TOTAL_ROLLS = 200
MAX_EXPRESSION_LENGTH = 200
MAX_MESSAGE_LENGTH = 1200
_TOKEN_RE = re.compile(r"([+-]?)(\d*d(?:%|[fF]|\d+)|\d+)")
_DICE_RE = re.compile(r"(?i)(\d*)d(%|f|\d+)")
# 常见跑团表达式示例3d6、d20+5、2d8+1d4+3、d%、4dF
class RollError(ValueError):
pass
@dataclass(slots=True)
class RollTermResult:
sign: int
source: str
detail: str
value: int
@dataclass(slots=True)
class RollResult:
expression: str
total: int
terms: list[RollTermResult]
def format(self) -> str:
parts = [f"{term.source}={term.detail}" for term in self.terms]
detail = " ".join(parts)
text = f"{self.expression} = {self.total}"
if detail:
text += f"\n{detail}"
if len(text) > MAX_MESSAGE_LENGTH:
raise RollError("结果过长,请减少骰子数量或简化表达式")
return text
def _parse_single_term(raw: str, sign: int, rng: random.Random) -> RollTermResult:
dice_match = _DICE_RE.fullmatch(raw)
if dice_match:
count_text, faces_text = dice_match.groups()
count = int(count_text) if count_text else 1
if count <= 0:
raise RollError("骰子个数必须大于 0")
if count > MAX_DICE_COUNT:
raise RollError(f"单项最多只能掷 {MAX_DICE_COUNT} 个骰子")
if faces_text == "%":
faces = 100
rolls = [rng.randint(1, 100) for _ in range(count)]
elif faces_text.lower() == "f":
rolls = [rng.choice((-1, 0, 1)) for _ in range(count)]
total = sum(rolls) * sign
signed = "+" if sign > 0 else "-"
return RollTermResult(
sign=sign,
source=f"{signed}{count}dF",
detail="[" + ", ".join(f"{v:+d}" for v in rolls) + "]",
value=total,
)
else:
faces = int(faces_text)
if faces <= 0:
raise RollError("骰子面数必须大于 0")
if faces > MAX_DICE_FACES:
raise RollError(f"骰子面数不能超过 {MAX_DICE_FACES}")
rolls = [rng.randint(1, faces) for _ in range(count)]
total = sum(rolls) * sign
signed = "+" if sign > 0 else "-"
return RollTermResult(
sign=sign,
source=f"{signed}{count}d{faces_text.upper()}",
detail="[" + ", ".join(map(str, rolls)) + "]",
value=total,
)
value = int(raw) * sign
signed = "+" if sign > 0 else "-"
return RollTermResult(sign=sign, source=f"{signed}{raw}", detail=str(abs(value)), value=value)
def roll_expression(expr: str, rng: random.Random | None = None) -> RollResult:
expr = expr.strip().replace(" ", "")
if not expr:
raise RollError("请提供要掷的表达式,例如 roll 3d6 或 roll d20+5")
if len(expr) > MAX_EXPRESSION_LENGTH:
raise RollError("表达式太长了")
matches = list(_TOKEN_RE.finditer(expr))
if not matches:
raise RollError("无法解析表达式,请使用如 3d6、d20+5、2d8+1d4+3、4dF 这样的格式")
rebuilt = "".join(m.group(0) for m in matches)
if rebuilt != expr:
raise RollError("表达式中含有无法识别的内容")
if len(matches) > MAX_TERM_COUNT:
raise RollError(f"表达式项数不能超过 {MAX_TERM_COUNT}")
total_rolls = 0
for m in matches:
token = m.group(2)
dice_match = _DICE_RE.fullmatch(token)
if dice_match:
count_text, faces_text = dice_match.groups()
count = int(count_text) if count_text else 1
if count <= 0:
raise RollError("骰子个数必须大于 0")
if count > MAX_DICE_COUNT:
raise RollError(f"单项最多只能掷 {MAX_DICE_COUNT} 个骰子")
if faces_text not in {"%", "f", "F"}:
faces = int(faces_text)
if faces <= 0:
raise RollError("骰子面数必须大于 0")
if faces > MAX_DICE_FACES:
raise RollError(f"骰子面数不能超过 {MAX_DICE_FACES}")
total_rolls += count
if total_rolls > MAX_TOTAL_ROLLS:
raise RollError(f"一次最多只能实际掷 {MAX_TOTAL_ROLLS} 个骰子")
rng = rng or random.Random()
terms: list[RollTermResult] = []
total = 0
for idx, match in enumerate(matches):
sign_text, raw = match.groups()
sign = -1 if sign_text == "-" else 1
if idx == 0 and sign_text == "":
sign = 1
term = _parse_single_term(raw, sign, rng)
terms.append(term)
total += term.value
return RollResult(expression=expr, total=total, terms=terms)

View File

@ -0,0 +1,115 @@
import datetime
from nonebot_plugin_alconna import UniMessage
from konabot.common.apis.wolfx import CencEewReport, CencEqReport, wolfx_api
from konabot.common.subscribe import PosterInfo, broadcast, register_poster_info
provinces_short = [
"北京",
"天津",
"河北",
"山西",
"内蒙古",
"辽宁",
"吉林",
"黑龙江",
"上海",
"江苏",
"浙江",
"安徽",
"福建",
"江西",
"山东",
"河南",
"湖北",
"湖南",
"广东",
"广西",
"海南",
"重庆",
"四川",
"贵州",
"云南",
"西藏",
"陕西",
"甘肃",
"青海",
"宁夏",
"新疆",
"香港",
"澳门",
"台湾",
]
register_poster_info(
"中国地震台网地震速报",
PosterInfo(
aliases={
"地震速报",
"地震预警",
},
description="来自中国地震台网的地震速报",
),
)
CENC_EEW_DISABLED = True
if not CENC_EEW_DISABLED:
@wolfx_api.cenc_eew.append
async def broadcast_eew(report: CencEewReport):
# 这个好像没那么准确...
is_cn = any(report.HypoCenter.startswith(prefix) for prefix in provinces_short)
if (is_cn and report.Magnitude >= 4.2) or (
(not is_cn) and report.Magnitude >= 7.0
):
# 这是中国地震台网网站上,会默认展示的地震信息的等级
origin_time_dt = datetime.datetime.strptime(
report.OriginTime, "%Y-%m-%d %H:%M:%S"
)
origin_time_str = (
f"{origin_time_dt.month}"
f"{origin_time_dt.day}"
f"{origin_time_dt.hour}"
f"{origin_time_dt.minute}"
)
# vvv 下面这个其实不准确
eid_in_link = report.EventID.split(".")[0]
link = f"https://www.cenc.ac.cn/earthquake-manage-publish-web/product-list/{eid_in_link}/summarize"
msg = UniMessage.text(
"据中国地震台网中心 (https://www.cenc.ac.cn/) 报道,"
f"北京时间{origin_time_str}"
f"{report.HypoCenter}发生{report.Magnitude:.1f}级地震。"
f"震源位于 {report.Longitude}° {report.Latitude}°,深度 {report.Depth}km。\n\n"
f"详细信息请见 {link}"
)
await broadcast("中国地震台网地震速报", msg)
@wolfx_api.cenc_eqlist.append
async def broadcast_cenc_eqlist(report: CencEqReport):
is_cn = any(report.location.startswith(prefix) for prefix in provinces_short)
if (is_cn and float(report.magnitude) >= 4.2) or (
(not is_cn) and float(report.magnitude) >= 7.0
):
origin_time_dt = datetime.datetime.strptime(report.time, "%Y-%m-%d %H:%M:%S")
origin_time_str = (
f"{origin_time_dt.month}"
f"{origin_time_dt.day}"
f"{origin_time_dt.hour}"
f"{origin_time_dt.minute}"
)
msg = UniMessage.text(
"据中国地震台网中心 (https://www.cenc.ac.cn/) 消息,"
f"北京时间{origin_time_str}"
f"{report.location}发生{report.magnitude}级地震。"
f"震源位于 {report.longtitude}° {report.latitude}°,深度 {report.depth}km。\n\n"
f"数据来源于 Wolfx OpenAPI事件 ID: {report.EventID}"
)
await broadcast("中国地震台网地震速报", msg)

25
poetry.lock generated
View File

@ -4067,6 +4067,29 @@ type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]]
name = "pytest-mock"
version = "3.15.1"
description = "Thin-wrapper around the mock package for easier use with pytest"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "pytest_mock-3.15.1-py3-none-any.whl", hash = "sha256:0a25e2eb88fe5168d535041d09a4529a188176ae608a6d249ee65abc0949630d"},
{file = "pytest_mock-3.15.1.tar.gz", hash = "sha256:1849a238f6f396da19762269de72cb1814ab44416fa73a8686deac10b0d87a0f"},
]
[package.dependencies]
pytest = ">=6.2.5"
[package.extras]
dev = ["pre-commit", "pytest-asyncio", "tox"]
[package.source]
type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]]
name = "python-dotenv"
version = "1.2.2"
@ -5311,4 +5334,4 @@ reference = "mirrors"
[metadata]
lock-version = "2.1"
python-versions = ">=3.12,<4.0"
content-hash = "f2d5345d93636e19e49852af636d598c88394aaef4020f383402394b58452e3d"
content-hash = "23d2eadd1c36d017ff77934bd02b56e37d26005e6a99793b623c01162b90d67d"

View File

@ -37,6 +37,8 @@ dependencies = [
"pytest (>=8.0.0,<9.0.0)",
"nonebug (>=0.4.3,<0.5.0)",
"pytest-cov (>=7.0.0,<8.0.0)",
"aiosignal (>=1.4.0,<2.0.0)",
"pytest-mock (>=3.15.1,<4.0.0)",
]
[tool.poetry]

View File

@ -12,8 +12,22 @@ def filter(change: Change, path: str) -> bool:
return False
if Path(path).absolute().is_relative_to((base / ".git").absolute()):
return False
if Path(path).absolute().is_relative_to((base / "assets" / "oracle" / "image").absolute()):
if (
Path(path)
.absolute()
.is_relative_to((base / "assets" / "oracle" / "image").absolute())
):
# 还要解决坏枪的这个问题
return False
if Path(path).absolute().is_relative_to((base / "htmlcov").absolute()):
return False
if Path(path).absolute().is_relative_to((base / "test").absolute()):
return False
if Path(path).absolute().is_relative_to((base / ".pytest_cache").absolute()):
return False
if Path(path).absolute().is_relative_to((base / ".ruff_cache").absolute()):
return False
if path.endswith(".coverage"):
return False
print(path)
return True

View File

@ -0,0 +1,78 @@
import json
from unittest.mock import AsyncMock
import pytest
from konabot.common.apis.wolfx import CencEewReport, WolfxAPIService, WolfxWebSocket
obj_example = {
"ID": "bacby4yab1oyb",
"EventID": "202603100805.0001",
"ReportTime": "2026-03-10 08:05:29",
"ReportNum": 1,
"OriginTime": "2026-03-10 08:05:29",
"HypoCenter": "新疆昌吉州呼图壁县",
"Latitude": 43.687,
"Longitude": 86.427,
"Magnitude": 4.0,
"Depth": 14,
"MaxIntensity": 5,
}
@pytest.mark.asyncio
async def test_wolfx_websocket_handle():
ws = WolfxWebSocket("")
mock_callback = AsyncMock()
ws.signal.append(mock_callback)
ws.signal.freeze()
obj1 = {
"type": "heartbeat",
"ver": 18,
"id": "a69edf6436c5b605",
"timestamp": 1773111106701,
}
data1 = json.dumps(obj1).encode()
await ws.handle(data1)
mock_callback.assert_not_called()
mock_callback.reset_mock()
obj2 = obj_example
data2 = json.dumps(obj2).encode()
await ws.handle(data2)
mock_callback.assert_called_once_with(data2)
mock_callback.reset_mock()
data3 = b"what the f"
await ws.handle(data3)
mock_callback.assert_not_called()
@pytest.mark.asyncio
async def test_wolfx_bind_pydantic():
sv = WolfxAPIService()
called: list[CencEewReport] = []
@sv.cenc_eew.append
async def _(data: CencEewReport):
called.append(data)
sv._cenc_eew_ws.signal.freeze()
sv.cenc_eew.freeze()
data = json.dumps(obj_example).encode()
await sv._cenc_eew_ws.signal.send(data)
assert len(called) == 1
data = called[0]
assert data.HypoCenter == obj_example["HypoCenter"]
assert data.EventID == obj_example["EventID"]
# Don't panic when the object is invalid
data = json.dumps({"type": ""}).encode()
await sv._cenc_eew_ws.signal.send(data)
assert len(called) == 1

88
tests/test_fx_process.py Normal file
View File

@ -0,0 +1,88 @@
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
import nonebot
from PIL import Image
nonebot.init()
MODULE_PATH = Path(__file__).resolve().parents[1] / "konabot/plugins/fx_process/fx_handle.py"
SPEC = spec_from_file_location("test_fx_handle_module", MODULE_PATH)
assert SPEC is not None and SPEC.loader is not None
fx_handle = module_from_spec(SPEC)
SPEC.loader.exec_module(fx_handle)
ImageFilterImplement = fx_handle.ImageFilterImplement
INIT_MODULE_PATH = Path(__file__).resolve().parents[1] / "konabot/plugins/fx_process/__init__.py"
INIT_SPEC = spec_from_file_location("test_fx_init_module", INIT_MODULE_PATH)
assert INIT_SPEC is not None and INIT_SPEC.loader is not None
fx_init = module_from_spec(INIT_SPEC)
INIT_SPEC.loader.exec_module(fx_init)
prase_input_args = fx_init.prase_input_args
def test_apply_jpeg_damage_keeps_size_and_rgba_mode():
image = Image.new("RGBA", (32, 24), (255, 0, 0, 128))
result = ImageFilterImplement.apply_jpeg_damage(image, 5)
assert result.size == image.size
assert result.mode == "RGBA"
assert result.getchannel("A").getextrema() == (128, 128)
def test_apply_jpeg_damage_clamps_quality_range():
image = Image.new("RGB", (16, 16), (123, 222, 111))
low = ImageFilterImplement.apply_jpeg_damage(image, -10)
high = ImageFilterImplement.apply_jpeg_damage(image, 999)
assert low.size == image.size
assert high.size == image.size
assert low.mode == "RGBA"
assert high.mode == "RGBA"
def test_apply_resize_clamps_small_result_to_at_least_one_pixel():
image = Image.new("RGBA", (10, 10), (255, 0, 0, 255))
result = ImageFilterImplement.apply_resize(image, 0.01)
assert result.size == (1, 1)
def test_apply_resize_negative_x_with_positive_y_only_mirrors_horizontally():
image = Image.new("RGBA", (2, 1))
image.putpixel((0, 0), (255, 0, 0, 255))
image.putpixel((1, 0), (0, 0, 255, 255))
result = ImageFilterImplement.apply_resize(image, -1, 1)
assert result.size == (2, 1)
assert result.getpixel((0, 0)) == (0, 0, 255, 255)
assert result.getpixel((1, 0)) == (255, 0, 0, 255)
def test_apply_resize_negative_scale_without_y_flips_both_axes():
image = Image.new("RGBA", (2, 2))
image.putpixel((0, 0), (255, 0, 0, 255))
image.putpixel((1, 0), (0, 255, 0, 255))
image.putpixel((0, 1), (0, 0, 255, 255))
image.putpixel((1, 1), (255, 255, 0, 255))
result = ImageFilterImplement.apply_resize(image, -1)
assert result.size == (2, 2)
assert result.getpixel((0, 0)) == (255, 255, 0, 255)
assert result.getpixel((1, 0)) == (0, 0, 255, 255)
assert result.getpixel((0, 1)) == (0, 255, 0, 255)
assert result.getpixel((1, 1)) == (255, 0, 0, 255)
def test_prase_input_args_parses_resize_second_argument_as_float():
filters = prase_input_args("缩放 2 3")
assert len(filters) == 1
assert filters[0].name == "缩放"
assert filters[0].args == [2.0, 3.0]

View File

@ -0,0 +1,40 @@
from contextlib import asynccontextmanager
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
from konabot.common.database import DatabaseManager
from konabot.common.permsys import PermManager, register_default_allow_permission
from konabot.common.permsys.entity import PermEntity
from konabot.common.permsys.migrates import execute_migration
@asynccontextmanager
async def tempdb():
with TemporaryDirectory() as _tempdir:
tempdir = Path(_tempdir)
db = DatabaseManager(tempdir / "perm.sqlite3")
yield db
await db.close_all_connections()
@pytest.mark.asyncio
async def test_register_default_allow_permission_records_key():
register_default_allow_permission("test.default.allow")
async with tempdb() as db:
async with db.get_conn() as conn:
await execute_migration(conn)
pm = PermManager(db)
await pm.update_permission(
PermEntity("sys", "global", "global"),
"test.default.allow",
True,
)
assert await pm.check_has_permission(
[PermEntity("dummy", "user", "1"), PermEntity("sys", "global", "global")],
"test.default.allow.sub",
)

66
tests/test_trpg_roll.py Normal file
View File

@ -0,0 +1,66 @@
import random
import pytest
from konabot.plugins.trpg_roll.core import RollError, roll_expression
class FakeRandom:
def __init__(self, randint_values: list[int] | None = None, choice_values: list[int] | None = None):
self._randint_values = list(randint_values or [])
self._choice_values = list(choice_values or [])
def randint(self, _a: int, _b: int) -> int:
assert self._randint_values
return self._randint_values.pop(0)
def choice(self, _seq):
assert self._choice_values
return self._choice_values.pop(0)
def test_roll_expression_basic():
rng = FakeRandom(randint_values=[2, 4, 5])
result = roll_expression("3d6", rng=rng)
assert result.total == 11
assert result.format() == "3d6 = 11\n+3d6=[2, 4, 5]"
def test_roll_expression_multiple_terms():
rng = FakeRandom(randint_values=[14, 3, 1])
result = roll_expression("d20+1d4-2", rng=rng)
assert result.total == 15
assert result.format() == "d20+1d4-2 = 15\n+1d20=[14] +1d4=[3] -2=2"
def test_roll_expression_df():
rng = FakeRandom(choice_values=[-1, 0, 1, 1])
result = roll_expression("4dF", rng=rng)
assert result.total == 1
assert result.format() == "4dF = 1\n+4dF=[-1, +0, +1, +1]"
@pytest.mark.parametrize(
("expr", "message"),
[
("", "请提供要掷的表达式"),
("abc", "无法解析表达式"),
("1d0", "骰子面数必须大于 0"),
("0d6", "骰子个数必须大于 0"),
("101d6", "单项最多只能掷 100 个骰子"),
("1d1001", "骰子面数不能超过 1000"),
("201d1", "单项最多只能掷 100 个骰子"),
("1d6*2", "表达式中含有无法识别的内容"),
],
)
def test_roll_expression_invalid(expr: str, message: str):
with pytest.raises(RollError, match=message):
roll_expression(expr, rng=random.Random(0))
def test_roll_expression_total_roll_limit():
with pytest.raises(RollError, match="一次最多只能实际掷 200 个骰子"):
roll_expression("100d6+100d6+1d6", rng=random.Random(0))