Compare commits

..

26 Commits

Author SHA1 Message Date
d37c4870d8 Merge branch 'master' into feature/sugar 2026-03-18 17:38:59 +08:00
23b9f101b3 语法糖 2026-03-18 17:29:42 +08:00
8c1651ad3d 忘记 await 相关权限了,导致永远判 True
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-18 16:29:36 +08:00
ff60642c62 Merge pull request 'feat: add TRPG roll command' (#59) from pi-agent/konabot:feat/trpg-roll into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #59
Reviewed-by: 钟晓帕 <Passthem183@gmail.com>
2026-03-14 02:19:15 +08:00
69b5908445 refactor: narrow trpg roll message matching 2026-03-14 02:17:20 +08:00
a542ed1fd9 feat: add TRPG roll command 2026-03-14 02:02:41 +08:00
e86a385448 Merge pull request 'fix: parse fx resize y scale argument' (#58) from pi-agent/konabot:fix/fx-resize-arg-parsing into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #58
Reviewed-by: 钟晓帕 <Passthem183@gmail.com>
2026-03-14 01:27:40 +08:00
d4bb36a074 fix: parse fx resize y scale argument 2026-03-14 01:26:16 +08:00
1a2a3c0468 Merge pull request 'fix: correct fx resize behavior' (#57) from pi-agent/konabot:fix/fx-resize-behavior into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #57
Reviewed-by: 钟晓帕 <Passthem183@gmail.com>
2026-03-14 01:11:44 +08:00
67502cb932 fix: correct fx resize behavior 2026-03-14 01:07:24 +08:00
f9a312b80a Merge pull request 'feat: add JPEG damage filter to fx' (#56) from pi-agent/konabot:feat/fx-jpeg-damage into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #56
2026-03-14 01:01:38 +08:00
1980f8a895 feat: add jpeg damage filter to fx 2026-03-14 00:52:34 +08:00
d273ed4b1a 放宽 wolfx api 限制
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-12 20:33:03 +08:00
265e9cc583 改为使用中国地震台网的正式报
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-10 21:58:42 +08:00
8f5061ba41 wolfx api
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-10 11:39:41 +08:00
b3c3c77f3c 添加 Ignore 2026-03-10 11:16:23 +08:00
6a84ce2cd8 提供订阅模块的文档
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-09 14:56:31 +08:00
392c699b33 移动 poster 模块到 common
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-09 14:40:27 +08:00
72e21cd9aa 添加多字符喵对一些符号的响应 2026-03-09 13:46:56 +08:00
f3389ff2b9 添加服务器管理相关,以及 cronjob
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-08 03:34:14 +08:00
e59d3c2e4b 哎哟喂这个文件怎么没交
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-08 00:40:11 +08:00
31d19b7ec0 我没辙了直接把测试打包进去吧
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-07 18:41:59 +08:00
c2f677911d 添加一些权限目标
Some checks failed
continuous-integration/drone/push Build is failing
2026-03-07 18:36:51 +08:00
f5b81319f8 konaph 接入权限系统
Some checks failed
continuous-integration/drone/push Build is failing
2026-03-07 18:15:28 +08:00
870e2383d8 为 Drone 提供单元测试目录 2026-03-07 18:15:16 +08:00
7e8fa45f36 Merge pull request '权限系统' (#55) from feature/permsystem into master
Some checks failed
continuous-integration/drone/push Build is failing
Reviewed-on: #55
2026-03-07 17:55:27 +08:00
45 changed files with 1875 additions and 318 deletions

View File

@ -39,7 +39,7 @@ steps:
commands:
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov-report term-missing:skip-covered
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov=./konabot/ --cov-report term-missing:skip-covered
- name: 发送构建结果到 ntfy
image: parrazam/drone-ntfy
when:

10
.gitignore vendored
View File

@ -3,9 +3,14 @@
/data
/pyrightconfig.json
/pyrightconfig.toml
/uv.lock
# 缓存文件
__pycache__
/.ruff_cache
/.pytest_cache
/.mypy_cache
/.black_cache
# 可能会偶然生成的 diff 文件
/*.diff
@ -14,3 +19,8 @@ __pycache__
/.coverage
/.coverage.db
/htmlcov
# 对手动创建虚拟环境的人
/.venv
/venv
*.egg-info

View File

@ -1,3 +1,5 @@
{
"python.REPL.enableREPLSmartSend": false
"python.REPL.enableREPLSmartSend": false,
"python-envs.defaultEnvManager": "ms-python.python:poetry",
"python-envs.defaultPackageManager": "ms-python.python:poetry"
}

View File

@ -61,6 +61,7 @@ COPY bot.py pyproject.toml .env.prod .env.test ./
COPY assets ./assets
COPY scripts ./scripts
COPY konabot ./konabot
COPY tests ./tests
ENV PYTHONPATH=/app

5
bot.py
View File

@ -7,6 +7,7 @@ from nonebot.adapters.discord import Adapter as DiscordAdapter
from nonebot.adapters.minecraft import Adapter as MinecraftAdapter
from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter
from konabot.common.appcontext import run_afterinit_functions
from konabot.common.log import init_logger
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.path import LOG_PATH
@ -56,9 +57,7 @@ def main():
nonebot.load_plugins("konabot/plugins")
nonebot.load_plugin("nonebot_plugin_analysis_bilibili")
from konabot.common import permsys
permsys.create_startup()
run_afterinit_functions()
# 注册关闭钩子
@driver.on_shutdown

View File

@ -16,6 +16,7 @@
- `konabot/common/permsys/__init__.py`
- 暴露 `PermManager``DepPermManager``require_permission`
- 负责数据库初始化、启动迁移、超级管理员默认授权
- 提供 `register_default_allow_permission()` 用于注册“启动时默认放行”的权限键
- `konabot/common/permsys/entity.py`
- 定义 `PermEntity`
- 将事件转换为可查询的实体链
@ -134,6 +135,14 @@ PermEntity("ob11", "user", str(account)), "*", True
也就是说,配置中的超级管理员会直接拥有全部权限。
此外,模块也支持插件在导入阶段通过 `register_default_allow_permission("some.key")` 注册默认放行的权限键;这些键会在启动时被写入到:
```python
PermEntity("sys", "global", "global"), "some.key", True
```
这适合“默认所有人可用,但仍希望后续能被权限系统单独关闭”的功能。
这属于启动时自动灌入的保底策略,不依赖手工授权命令。
## 在插件中使用

37
docs/subscribe.md Normal file
View File

@ -0,0 +1,37 @@
# subscribe 模块
一套统一的接口,让用户可以订阅一些延迟或者定时消息。
```python
import asyncio
from pathlib import Path
from konabot.common.subscribe import register_poster_info, broadcast, PosterInfo
from nonebot_plugin_alconna import UniMessage
# 注册了服务信息,用户可以用「查询可用订阅」指令了解可用的订阅清单。
# 用户可以使用「订阅 某某服务通知」或者「订阅 某某服务」来订阅消息。
# 如果用户在群聊发起订阅,则会在 QQ 群订阅,不然会在私聊订阅
register_poster_info("某某服务通知", PosterInfo(
aliases={"某某服务"},
description="告诉你关于某某的最新资讯等信息",
))
async def main():
while True:
# 这里的服务 channel 名字必须填写该服务的名字,不可以是 alias
# 这会给所有订阅了该通道的用户发送「向大家发送纯文本通知」
await broadcast("某某服务通知", "向大家发送纯文本通知")
# 也可以发送 UniMessage 对象,可以构造包含图片的通知等
data = Path('image.png').read_bytes()
await broadcast(
"某某服务通知",
UniMessage.text("很遗憾告诉大家,我们倒闭了:").image(raw=data),
)
await asyncio.sleep(114.514)
```
该模块的代码请查阅 `/konabot/common/subscribe/` 下的文件。

View File

@ -0,0 +1,281 @@
"""
Wolfx 防灾免费 API
"""
import asyncio
import json
from typing import Literal, TypeVar, cast
import aiohttp
from aiosignal import Signal
from loguru import logger
from pydantic import BaseModel, RootModel
import pydantic
from konabot.common.appcontext import after_init
class ScEewReport(BaseModel):
"""
四川地震局报文
"""
ID: str
"EEW 发报 ID"
EventID: str
"EEW 发报事件 ID"
ReportTime: str
"EEW 发报时间(UTC+8)"
ReportNum: int
"EEW 发报数"
OriginTime: str
"发震时间(UTC+8)"
HypoCenter: str
"震源地"
Latitude: float
"震源地纬度"
Longitude: float
"震源地经度"
Magnitude: float
"震级"
Depth: float | None
"震源深度"
MaxIntensity: float
"最大烈度"
class CencEewReport(BaseModel):
"""
中国地震台网报文
"""
ID: str
"EEW 发报 ID"
EventID: str
"EEW 发报事件 ID"
ReportTime: str
"EEW 发报时间(UTC+8)"
ReportNum: int
"EEW 发报数"
OriginTime: str
"发震时间(UTC+8)"
HypoCenter: str
"震源地"
Latitude: float
"震源地纬度"
Longitude: float
"震源地经度"
Magnitude: float
"震级"
Depth: float | None
"震源深度"
MaxIntensity: float
"最大烈度"
class CencEqReport(BaseModel):
type: str
"报告类型"
EventID: str
"事件 ID"
time: str
"UTC+8 格式的地震发生时间"
location: str
"地震发生位置"
magnitude: str
"震级"
depth: str
"地震深度"
latitude: str
"纬度"
longtitude: str
"经度"
intensity: str
"烈度"
class CencEqlist(RootModel):
root: dict[str, CencEqReport]
class WolfxWebSocket:
def __init__(self, url: str) -> None:
self.url = url
self.signal: Signal[bytes] = Signal(self)
self._running = False
self._task: asyncio.Task | None = None
self._session: aiohttp.ClientSession | None = None
self._ws: aiohttp.ClientWebSocketResponse | None = None
@property
def session(self) -> aiohttp.ClientSession: # pragma: no cover
assert self._session is not None
return self._session
async def start(self): # pragma: no cover
if self._running:
return
self._running = True
self._session = aiohttp.ClientSession()
self._task = asyncio.create_task(self._run())
self.signal.freeze()
async def stop(self): # pragma: no cover
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
if self._session:
await self._session.close()
async def _run(self): # pragma: no cover
retry_delay = 1
while self._running:
try:
async with self.session.ws_connect(self.url) as ws:
self._ws = ws
logger.info(f"Wolfx API 服务连接上了 {self.url} 的 WebSocket")
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
await self.handle(cast(str, msg.data).encode())
elif msg.type == aiohttp.WSMsgType.BINARY:
await self.handle(cast(bytes, msg.data))
elif msg.type == aiohttp.WSMsgType.CLOSED:
break
elif msg.type == aiohttp.WSMsgType.ERROR:
break
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
logger.warning("连接 WebSocket 时发生错误")
logger.exception(e)
except asyncio.CancelledError:
break
except Exception as e:
logger.error("Wolfx API 发生未知错误")
logger.exception(e)
self._ws = None
if self._running:
logger.info(f"Wolfx API 准备断线重连 {self.url}")
await asyncio.sleep(retry_delay)
retry_delay = min(retry_delay * 2, 60)
async def handle(self, data: bytes):
try:
obj = json.loads(data)
except json.JSONDecodeError as e:
logger.warning("解析 Wolfs API 时出错")
logger.exception(e)
return
if obj.get("type") == "heartbeat" or obj.get("type") == "pong":
logger.debug(f"Wolfx API 收到了来自 {self.url} 的心跳: {obj}")
else:
await self.signal.send(data)
T = TypeVar("T", bound=BaseModel)
class WolfxAPIService:
sc_eew: Signal[ScEewReport]
"四川地震局地震速报"
cenc_eew: Signal[CencEewReport]
"中国地震台网地震速报"
cenc_eqlist: Signal[CencEqReport]
"中国地震台网地震信息发布"
def __init__(self) -> None:
self.sc_eew = Signal(self)
self._sc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/sc_eew")
WolfxAPIService.bind(self.sc_eew, self._sc_eew_ws, ScEewReport)
self.cenc_eew = Signal(self)
self._cenc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/cenc_eew")
WolfxAPIService.bind(self.cenc_eew, self._cenc_eew_ws, CencEewReport)
self.cenc_eqlist = Signal(self)
self._cenc_eqlist_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/cenc_eqlist")
WolfxAPIService.bind(self.cenc_eqlist, self._cenc_eqlist_ws, CencEqReport)
@staticmethod
def bind(signal: Signal[T], ws: WolfxWebSocket, t: type[T]):
@ws.signal.append
async def _(data: bytes):
try:
obj = t.model_validate_json(data)
logger.info(f"接收到来自 Wolfx API 的信息:{data}")
await signal.send(obj)
except pydantic.ValidationError as e:
logger.warning(f"解析 Wolfx API 时出错 URL={ws.url}")
logger.error(e)
async def start(self): # pragma: no cover
self.cenc_eew.freeze()
self.sc_eew.freeze()
self.cenc_eqlist.freeze()
async with asyncio.TaskGroup() as task_group:
if len(self.cenc_eew) > 0:
task_group.create_task(self._cenc_eew_ws.start())
if len(self.sc_eew) > 0:
task_group.create_task(self._sc_eew_ws.start())
if len(self.cenc_eqlist) > 0:
task_group.create_task(self._cenc_eqlist_ws.start())
async def stop(self): # pragma: no cover
async with asyncio.TaskGroup() as task_group:
task_group.create_task(self._cenc_eew_ws.stop())
task_group.create_task(self._sc_eew_ws.stop())
task_group.create_task(self._cenc_eqlist_ws.stop())
wolfx_api = WolfxAPIService()
@after_init
def init(): # pragma: no cover
import nonebot
driver = nonebot.get_driver()
@driver.on_startup
async def _():
await wolfx_api.start()
@driver.on_shutdown
async def _():
await wolfx_api.stop()

View File

@ -0,0 +1,15 @@
from typing import Any, Callable
AFTER_INIT_FUNCTION = Callable[[], Any]
_after_init_functions: list[AFTER_INIT_FUNCTION] = []
def after_init(func: AFTER_INIT_FUNCTION):
_after_init_functions.append(func)
def run_afterinit_functions(): # pragma: no cover
for f in _after_init_functions:
f()

View File

@ -4,6 +4,7 @@ from nonebot.adapters import Event
from nonebot.params import Depends
from nonebot.rule import Rule
from konabot.common.appcontext import after_init
from konabot.common.database import DatabaseManager
from konabot.common.pager import PagerQuery
from konabot.common.path import DATA_PATH
@ -13,6 +14,7 @@ from konabot.common.permsys.repo import PermRepo
db = DatabaseManager(DATA_PATH / "perm.sqlite3")
_default_allow_permissions: set[str] = set()
_EntityLike = Event | PermEntity | list[PermEntity]
@ -73,6 +75,7 @@ def perm_manager(_db: DatabaseManager | None = None) -> PermManager: # pragma:
return PermManager(_db)
@after_init
def create_startup(): # pragma: no cover
from konabot.common.nb.is_admin import cfg
@ -89,6 +92,10 @@ def create_startup(): # pragma: no cover
await pm.update_permission(
PermEntity("ob11", "user", str(account)), "*", True
)
for key in _default_allow_permissions:
await pm.update_permission(
PermEntity("sys", "global", "global"), key, True
)
@driver.on_shutdown
async def _():
@ -101,6 +108,10 @@ def create_startup(): # pragma: no cover
DepPermManager = Annotated[PermManager, Depends(perm_manager)]
def register_default_allow_permission(key: str):
_default_allow_permissions.add(key)
def require_permission(perm: str) -> Rule: # pragma: no cover
async def check_permission(event: Event, pm: DepPermManager) -> bool:
return await pm.check_has_permission(event, perm)

View File

@ -0,0 +1,11 @@
"""
Subscribe 模块,用于向一些订阅的频道广播消息
"""
from .service import broadcast as broadcast
from .service import dep_poster_service as dep_poster_service
from .service import DepPosterService as DepPosterService
from .service import PosterService as PosterService
from .subscribe_info import PosterInfo as PosterInfo
from .subscribe_info import POSTER_INFO_DATA as POSTER_INFO_DATA
from .subscribe_info import register_poster_info as register_poster_info

View File

@ -6,7 +6,8 @@ from pydantic import BaseModel, ValidationError
from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult
from konabot.common.path import DATA_PATH
from konabot.plugins.poster.repository import IPosterRepo
from .repository import IPosterRepo
class ChannelData(BaseModel):
@ -18,9 +19,9 @@ class PosterData(BaseModel):
def is_the_same_target(target1: LongTaskTarget, target2: LongTaskTarget) -> bool:
if (target1.is_private_chat and not target2.is_private_chat):
if target1.is_private_chat and not target2.is_private_chat:
return False
if (target2.is_private_chat and not target1.is_private_chat):
if target2.is_private_chat and not target1.is_private_chat:
return False
if target1.platform != target2.platform:
return False
@ -58,7 +59,9 @@ class LocalPosterRepo(IPosterRepo):
len1 = len(self.data.channels[channel].targets)
return len0 != len1
async def get_subscribed_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
async def get_subscribed_channels(
self, target: LongTaskTarget, pager: PagerQuery
) -> PagerResult[str]:
channels: list[str] = []
for channel_id, channel in self.data.channels.items():
for t in channel.targets:
@ -95,7 +98,9 @@ async def local_poster_data():
data = PosterData()
else:
try:
data = PosterData.model_validate_json(LOCAL_POSTER_DATA_PATH.read_text())
data = PosterData.model_validate_json(
LOCAL_POSTER_DATA_PATH.read_text()
)
except ValidationError:
data = PosterData()
yield data
@ -109,4 +114,3 @@ async def local_poster():
DepLocalPosterRepo = Annotated[LocalPosterRepo, Depends(local_poster)]

View File

@ -4,9 +4,10 @@ from nonebot.params import Depends
from nonebot_plugin_alconna import UniMessage
from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
from konabot.plugins.poster.repo_local_data import local_poster
from konabot.plugins.poster.repository import IPosterRepo
from .subscribe_info import POSTER_INFO_DATA
from .repo_local_data import local_poster
from .repository import IPosterRepo
class PosterService:
@ -27,7 +28,9 @@ class PosterService:
channel = self.parse_channel_id(channel)
return await self.repo.remove_channel_target(channel, target)
async def broadcast(self, channel: str, message: UniMessage[Any] | str) -> list[LongTaskTarget]:
async def broadcast(
self, channel: str, message: UniMessage[Any] | str
) -> list[LongTaskTarget]:
channel = self.parse_channel_id(channel)
targets = await self.repo.get_channel_targets(channel)
for target in targets:
@ -35,7 +38,9 @@ class PosterService:
await target.send_message(message, at=False)
return targets
async def get_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
async def get_channels(
self, target: LongTaskTarget, pager: PagerQuery
) -> PagerResult[str]:
return await self.repo.get_subscribed_channels(target, pager)
async def fix_data(self):
@ -56,4 +61,3 @@ async def broadcast(channel: str, message: UniMessage[Any] | str):
DepPosterService = Annotated[PosterService, Depends(dep_poster_service)]

View File

@ -4,7 +4,7 @@ from dataclasses import dataclass, field
@dataclass
class PosterInfo:
aliases: set[str] = field(default_factory=set)
description: str = field(default='')
description: str = field(default="")
POSTER_INFO_DATA: dict[str, PosterInfo] = {}
@ -12,4 +12,3 @@ POSTER_INFO_DATA: dict[str, PosterInfo] = {}
def register_poster_info(channel: str, info: PosterInfo):
POSTER_INFO_DATA[channel] = info

View File

@ -76,6 +76,8 @@ fx [滤镜名称] <参数1> <参数2> ...
* ```fx 设置遮罩```
* ```fx 色键 <目标颜色="rgb(255,0,0)"> <容差=60>```
* ```fx 晃动 <最大偏移量=5> <运动模糊=False>```
* ```fx JPEG损坏 <质量=10>```
* 质量范围建议为 1~95数值越低压缩痕迹越重、效果越搞笑。
* ```fx 动图 <帧率=10>```
### 多图像处理器

View File

@ -0,0 +1,53 @@
**roll** - 面向跑团的文本骰子指令
## 用法
`roll 表达式`
支持常见骰子写法:
- `roll 3d6`
- `roll d20+5`
- `roll 2d8+1d4+3`
- `roll d%`
- `roll 4dF`
## 说明
- `NdM` 表示掷 N 个 M 面骰,例如 `3d6`
- `d20` 等价于 `1d20`
- `d%` 表示百分骰,范围 1 到 100
- `dF` 表示 Fate/Fudge 骰,单骰结果为 -1、0、+1
- 支持用 `+`、`-` 连接多个项,也支持常数修正
## 返回格式
会返回总结果,以及每一项的明细。
例如:
- `roll 3d6`
可能返回:
- `3d6 = 11`
- `+3d6=[2, 4, 5]`
- `roll d20+5`
可能返回:
- `d20+5 = 19`
- `+1d20=[14] +5=5`
## 限制
为防止刷屏和滥用,当前实现会限制:
- 单项最多 100 个骰子
- 单个骰子最多 1000 面
- 一次表达式最多 20 项
- 一次表达式最多实际掷 200 个骰子
- 结果过长时会直接拒绝
## 权限
需要 `trpg.roll` 权限。
默认启动时会给系统全局授予允许,因此通常所有人都能用;如有需要可再用权限系统单独关闭。

View File

@ -1,12 +1,14 @@
import re
from nonebot import get_plugin_config, on_message
from nonebot.rule import Rule
from nonebot_plugin_alconna import Reference, Reply, UniMsg
from nonebot.adapters import Event
from nonebot.adapters.onebot.v11.event import GroupMessageEvent as OB11GroupEvent
from pydantic import BaseModel
from konabot.common.permsys import require_permission
class Config(BaseModel):
bilifetch_enabled_groups: list[int] = []
@ -19,11 +21,7 @@ pattern = (
)
def _rule(msg: UniMsg, evt: Event) -> bool:
if isinstance(evt, OB11GroupEvent):
if evt.group_id not in config.bilifetch_enabled_groups:
return False
def _rule(msg: UniMsg) -> bool:
to_search = msg.exclude(Reply, Reference).dump(json=True)
to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
@ -31,11 +29,11 @@ def _rule(msg: UniMsg, evt: Event) -> bool:
return True
matcher_fix = on_message(rule=_rule)
matcher_fix = on_message(rule=Rule(_rule) & require_permission("bilifetch"))
@matcher_fix.handle()
async def _(event: Event):
from nonebot_plugin_analysis_bilibili import handle_analysis
await handle_analysis(event)

View File

@ -1,4 +1,5 @@
import random
from io import BytesIO
from PIL import Image, ImageFilter, ImageDraw, ImageStat, ImageFont
from PIL import ImageEnhance
from PIL import ImageChops
@ -167,26 +168,53 @@ class ImageFilterImplement:
return Image.fromarray(result, 'RGBA')
# JPEG 损坏感压缩
@staticmethod
def apply_jpeg_damage(image: Image.Image, quality: int = 10) -> Image.Image:
quality = max(1, min(95, int(quality)))
alpha = None
if image.mode in ('RGBA', 'LA') or (image.mode == 'P' and 'transparency' in image.info):
rgba_image = image.convert('RGBA')
alpha = rgba_image.getchannel('A')
rgb_image = Image.new('RGB', rgba_image.size, (255, 255, 255))
rgb_image.paste(rgba_image, mask=alpha)
else:
rgb_image = image.convert('RGB')
output = BytesIO()
rgb_image.save(output, format='JPEG', quality=quality, optimize=False)
output.seek(0)
damaged = Image.open(output).convert('RGB')
if alpha is not None:
return Image.merge('RGBA', (*damaged.split(), alpha))
return damaged.convert('RGBA')
# 缩放
@staticmethod
def apply_resize(image: Image.Image, scale: float = 1.5, scale_y = None) -> Image.Image:
# scale 可以为负
# 如果 scale 为负,则代表翻转
if scale_y is not None:
if float(scale_y) < 0:
def apply_resize(image: Image.Image, scale: float = 1.5, scale_y: float = None) -> Image.Image:
scale_x = float(scale)
scale_y_value = float(scale_y) if scale_y is not None else None
if scale_y_value is not None:
if scale_y_value < 0:
image = ImageOps.flip(image)
scale_y = abs(float(scale_y))
if scale < 0:
scale_y_value = abs(scale_y_value)
if scale_x < 0:
image = ImageOps.mirror(image)
scale = abs(scale)
new_size = (int(image.width * scale), int(image.height * float(scale_y)))
return image.resize(new_size, Image.Resampling.LANCZOS)
if scale < 0:
image = ImageOps.mirror(image)
image = ImageOps.flip(image)
scale = abs(scale)
new_size = (int(image.width * scale), int(image.height * scale))
return image.resize(new_size, Image.Resampling.LANCZOS)
scale_x = abs(scale_x)
target_scale_y = scale_y_value
else:
if scale_x < 0:
image = ImageOps.mirror(image)
image = ImageOps.flip(image)
scale_x = abs(scale_x)
target_scale_y = scale_x
new_width = max(1, round(image.width * scale_x))
new_height = max(1, round(image.height * target_scale_y))
return image.resize((new_width, new_height), Image.Resampling.LANCZOS)
# 波纹
@staticmethod

View File

@ -50,6 +50,7 @@ class ImageFilterManager:
"描边": ImageFilterImplement.apply_stroke,
"形状描边": ImageFilterImplement.apply_shape_stroke,
"半调": ImageFilterImplement.apply_halftone,
"JPEG损坏": ImageFilterImplement.apply_jpeg_damage,
"设置通道": ImageFilterImplement.apply_set_channel,
"设置遮罩": ImageFilterImplement.apply_set_mask,
# 图像处理

View File

@ -70,7 +70,7 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
await target.send_message(res)
return
env = TextHandlerEnvironment(is_trusted=False)
env = TextHandlerEnvironment(is_trusted=False, event=evt)
results = await runner.run_pipeline(res, istream or None, env)
# 检查是否有错误

View File

@ -7,11 +7,13 @@ from string import whitespace
from typing import cast
from loguru import logger
from nonebot.adapters import Event
@dataclass
class TextHandlerEnvironment:
is_trusted: bool
event: Event | None = None
buffers: dict[str, str] = field(default_factory=dict)
@ -287,7 +289,7 @@ class PipelineRunner:
env: TextHandlerEnvironment | None = None,
) -> list[TextHandleResult]:
if env is None:
env = TextHandlerEnvironment(is_trusted=False, buffers={})
env = TextHandlerEnvironment(is_trusted=False, event=None, buffers={})
results: list[TextHandleResult] = []

View File

@ -1,36 +1,53 @@
from typing import Any, cast
from konabot.common.llm import get_llm
from konabot.plugins.handle_text.base import TextHandler, TextHandlerEnvironment, TextHandleResult
from konabot.common.permsys import perm_manager
from konabot.plugins.handle_text.base import (
TextHandler,
TextHandlerEnvironment,
TextHandleResult,
)
class THQwen(TextHandler):
name = "qwen"
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
pm = perm_manager()
if env.event is None or not await pm.check_has_permission(
env.event, "textfx.qwen"
):
return TextHandleResult(
code=1,
ostream="你或当前环境没有使用 qwen 的权限。如有疑问请联系管理员",
)
llm = get_llm()
messages = []
if istream is not None:
messages.append({
"role": "user",
"content": istream
})
messages.append({"role": "user", "content": istream})
if len(args) > 0:
message = ' '.join(args)
messages.append({
"role": "user",
"content": message,
})
message = " ".join(args)
messages.append(
{
"role": "user",
"content": message,
}
)
if len(messages) == 0:
return TextHandleResult(
code=1,
ostream="使用方法qwen <提示词>",
)
messages = [{
"role": "system",
"content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答"
}] + messages
messages = [
{
"role": "system",
"content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答",
}
] + messages
result = await llm.chat(cast(Any, messages))
content = result.content
if content is None:

View File

@ -6,29 +6,34 @@ from loguru import logger
from nonebot import on_message
import nonebot
from nonebot.rule import to_me
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
on_alconna)
from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna
from nonebot_plugin_apscheduler import scheduler
from konabot.common import username
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.pager import PagerQuery
from konabot.plugins.kona_ph.core.message import (get_daily_report,
get_daily_report_v2,
get_puzzle_description,
get_submission_message)
from konabot.plugins.kona_ph.core.message import (
get_daily_report,
get_daily_report_v2,
get_puzzle_description,
get_submission_message,
)
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE,
create_admin_commands,
puzzle_manager)
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info
from konabot.plugins.poster.service import broadcast
from konabot.plugins.kona_ph.manager import (
PUZZLE_PAGE_SIZE,
create_admin_commands,
puzzle_manager,
)
from konabot.common.subscribe import PosterInfo, register_poster_info, broadcast
create_admin_commands()
register_poster_info("每日谜题", info=PosterInfo(
aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"},
description="此方 BOT 每日谜题推送",
))
register_poster_info(
"每日谜题",
info=PosterInfo(
aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"},
description="此方 BOT 每日谜题推送",
),
)
cmd_submit = on_message(rule=to_me())
@ -44,16 +49,22 @@ async def _(msg: UniMsg, target: DepLongTaskTarget):
if isinstance(result, str):
await target.send_message(result)
else:
await target.send_message(get_submission_message(
daily_puzzle_info=result.info,
submission=result.submission,
puzzle=result.puzzle,
))
await target.send_message(
get_submission_message(
daily_puzzle_info=result.info,
submission=result.submission,
puzzle=result.puzzle,
)
)
cmd_query = on_alconna(Alconna(
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?)|今日谜?题目?)"
), rule=to_me())
cmd_query = on_alconna(
Alconna(
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?)|今日谜?题目?)"
),
rule=to_me(),
)
@cmd_query.handle()
async def _(target: DepLongTaskTarget):
@ -64,9 +75,8 @@ async def _(target: DepLongTaskTarget):
await target.send_message(get_puzzle_description(p))
cmd_query_submission = on_alconna(Alconna(
"今日答题情况"
), rule=to_me())
cmd_query_submission = on_alconna(Alconna("今日答题情况"), rule=to_me())
@cmd_query_submission.handle()
async def _(target: DepLongTaskTarget):
@ -77,11 +87,15 @@ async def _(target: DepLongTaskTarget):
await target.send_message(get_daily_report_v2(manager, gid))
cmd_history = on_alconna(Alconna(
"re:历史(题目|谜题)",
Args["page?", int],
Args["index_id?", str],
), rule=to_me())
cmd_history = on_alconna(
Alconna(
"re:历史(题目|谜题)",
Args["page?", int],
Args["index_id?", str],
),
rule=to_me(),
)
@cmd_history.handle()
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
@ -105,10 +119,10 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text(
f"页数只有 1 ~ {count_pages} 啦!"
))
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
return await target.send_message(
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
)
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
for p, d in puzzles:
info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]]
msg = msg.text(
@ -120,22 +134,26 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
await target.send_message(msg)
cmd_leadboard = on_alconna(Alconna(
"re:此方(解谜|谜题)排行榜",
Args["page?", int],
))
cmd_leadboard = on_alconna(
Alconna(
"re:此方(解谜|谜题)排行榜",
Args["page?", int],
)
)
@cmd_leadboard.handle()
async def _(target: DepLongTaskTarget, page: int = 1):
async with puzzle_manager() as manager:
result = manager.get_leadboard(PagerQuery(page, 10))
await target.send_message(result.to_unimessage(
title="此方解谜排行榜",
formatter=lambda data: (
f"{data[1]} 已完成 | "
f"{username.get_username(data[0])}"
await target.send_message(
result.to_unimessage(
title="此方解谜排行榜",
formatter=lambda data: (
f"{data[1]} 已完成 | {username.get_username(data[0])}"
),
)
))
)
@scheduler.scheduled_job("cron", hour="8")
@ -155,4 +173,3 @@ async def _():
driver = nonebot.get_driver()

View File

@ -1,50 +1,54 @@
import datetime
from math import ceil
from nonebot import get_plugin_config
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query,
Subcommand, SubcommandResult, UniMessage,
on_alconna)
from pydantic import BaseModel
from nonebot.adapters import Event
from nonebot_plugin_alconna import (
Alconna,
Args,
Image,
Option,
Query,
Subcommand,
SubcommandResult,
UniMessage,
on_alconna,
)
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import download_image_bytes
from konabot.common.permsys import DepPermManager, require_permission
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list,
get_puzzle_info_message,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager,
get_today_date,
puzzle_manager)
from konabot.plugins.poster.service import broadcast
from konabot.plugins.kona_ph.core.message import (
get_puzzle_description,
get_puzzle_hint_list,
get_puzzle_info_message,
get_submission_message,
)
from konabot.plugins.kona_ph.core.storage import (
Puzzle,
PuzzleHint,
PuzzleManager,
get_today_date,
puzzle_manager,
)
from konabot.common.subscribe import broadcast
PUZZLE_PAGE_SIZE = 10
class PuzzleConfig(BaseModel):
plugin_puzzle_manager: list[str] = []
plugin_puzzle_admin: list[str] = []
plugin_puzzle_playgroup: list[str] = []
config = get_plugin_config(PuzzleConfig)
def is_puzzle_manager(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target)
def is_puzzle_admin(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_admin
def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle:
async def check_puzzle(
manager: PuzzleManager,
perm: DepPermManager,
raw_id: str,
event: Event,
target: DepLongTaskTarget,
) -> Puzzle:
if raw_id not in manager.puzzle_data:
raise BotExceptionMessage("没有这个谜题")
puzzle = manager.puzzle_data[raw_id]
if is_puzzle_admin(target):
if await perm.check_has_permission(event, "konaph.admin"):
return puzzle
if target.target_id != puzzle.author_id:
raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
@ -60,7 +64,9 @@ def create_admin_commands():
Subcommand("unready", Args["raw_id", str], dest="unready"),
Subcommand("info", Args["raw_id", str], dest="info"),
Subcommand("my", Args["page?", int], dest="my"),
Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"),
Subcommand(
"all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"
),
Subcommand("pin", Args["raw_id?", str], dest="pin"),
Subcommand("unpin", dest="unpin"),
Subcommand(
@ -115,11 +121,11 @@ def create_admin_commands():
dest="hint",
),
),
rule=is_puzzle_manager,
rule=require_permission("konaph.manager"),
)
@cmd_admin.assign("$main")
async def _(target: DepLongTaskTarget):
async def _(target: DepLongTaskTarget, pm: DepPermManager, event: Event):
msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
msg = msg.text("konaph create - 创建一个新的谜题\n")
msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
@ -132,7 +138,7 @@ def create_admin_commands():
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
if is_puzzle_admin(target):
if await pm.check_has_permission(event, "konaph.admin"):
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
msg = msg.text("konaph pin - 查看当前置顶谜题\n")
msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
@ -145,48 +151,54 @@ def create_admin_commands():
async def _(target: DepLongTaskTarget):
async with puzzle_manager() as manager:
puzzle = manager.admin_create_puzzle(target.target_id)
await target.send_message(UniMessage.text(
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
f"- 输入 `konaph my` 查看你创建的谜题\n"
f"- 输入 `konaph modify` 查看更改谜题的方法"
))
await target.send_message(
UniMessage.text(
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
f"- 输入 `konaph my` 查看你创建的谜题\n"
f"- 输入 `konaph modify` 查看更改谜题的方法"
)
)
@cmd_admin.assign("ready")
async def _(raw_id: str, target: DepLongTaskTarget):
async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
if p.ready:
return await target.send_message(UniMessage.text(
"题目早就准备好啦!"
))
return await target.send_message(UniMessage.text("题目早就准备好啦!"))
p.ready = True
await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经准备就绪!"
))
await target.send_message(
UniMessage.text(f"谜题「{p.title}」已经准备就绪!")
)
@cmd_admin.assign("unready")
async def _(raw_id: str, target: DepLongTaskTarget):
async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
if not p.ready:
return await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经是未取消状态了!"
))
return await target.send_message(
UniMessage.text(f"谜题「{p.title}」已经是未取消状态了!")
)
if manager.is_puzzle_published(p.raw_id):
return await target.send_message(UniMessage.text(
"已发布的谜题不能取消准备状态!"
))
return await target.send_message(
UniMessage.text("已发布的谜题不能取消准备状态!")
)
p.ready = False
await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经取消准备!"
))
await target.send_message(
UniMessage.text(f"谜题「{p.title}」已经取消准备!")
)
@cmd_admin.assign("info")
async def _(raw_id: str, target: DepLongTaskTarget):
async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
await target.send_message(get_puzzle_info_message(manager, p))
@cmd_admin.assign("my")
@ -194,15 +206,15 @@ def create_admin_commands():
async with puzzle_manager() as manager:
puzzles = manager.get_puzzles_of_user(target.target_id)
if len(puzzles) == 0:
return await target.send_message(UniMessage.text(
"你没有谜题哦,使用 `konaph create` 创建一个吧!"
))
return await target.send_message(
UniMessage.text("你没有谜题哦,使用 `konaph create` 创建一个吧!")
)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text(
f"页数只有 1 ~ {count_pages} 啦!"
))
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
return await target.send_message(
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
)
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
message = UniMessage.text("==== 我的谜题 ====\n\n")
for p in puzzles:
message = message.text("- ")
@ -220,11 +232,15 @@ def create_admin_commands():
await target.send_message(message)
@cmd_admin.assign("all")
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async def _(
target: DepLongTaskTarget,
event: Event,
perm: DepPermManager,
ready: Query[bool] = Query("all.ready"),
page: int = 1,
):
if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
async with puzzle_manager() as manager:
puzzles = [*manager.puzzle_data.values()]
if ready.available:
@ -232,10 +248,10 @@ def create_admin_commands():
puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text(
f"页数只有 1 ~ {count_pages} 啦!"
))
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
return await target.send_message(
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
)
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
message = UniMessage.text("==== 所有谜题 ====\n\n")
for p in puzzles:
message = message.text("- ")
@ -253,32 +269,30 @@ def create_admin_commands():
await target.send_message(message)
@cmd_admin.assign("pin")
async def _(target: DepLongTaskTarget, raw_id: str = ""):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async def _(
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str = ""
):
if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
async with puzzle_manager() as manager:
if raw_id == "":
if manager.puzzle_pinned:
return await target.send_message(UniMessage.text(
f"被 Pin 的谜题 ID = {manager.puzzle_pinned}"
))
return await target.send_message(
UniMessage.text(f"被 Pin 的谜题 ID = {manager.puzzle_pinned}")
)
return await target.send_message("没有置顶谜题")
if raw_id not in manager.unpublished_puzzles:
return await target.send_message(UniMessage.text(
"这个谜题已经发布了,或者还没准备好,或者不存在"
))
return await target.send_message(
UniMessage.text("这个谜题已经发布了,或者还没准备好,或者不存在")
)
manager.admin_pin_puzzle(raw_id)
return await target.send_message(f"已置顶谜题 {raw_id}")
@cmd_admin.assign("unpin")
async def _(target: DepLongTaskTarget):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async def _(target: DepLongTaskTarget, event: Event, perm: DepPermManager):
if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
async with puzzle_manager() as manager:
manager.admin_pin_puzzle("")
return await target.send_message("已取消所有置顶")
@ -286,6 +300,8 @@ def create_admin_commands():
@cmd_admin.assign("modify")
async def _(
target: DepLongTaskTarget,
event: Event,
perm: DepPermManager,
raw_id: str = "",
title: str | None = None,
description: str | None = None,
@ -306,7 +322,7 @@ def create_admin_commands():
image_manager = get_image_manager()
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
if title is not None:
p.title = title
if description is not None:
@ -329,11 +345,14 @@ def create_admin_commands():
return await target.send_message("修改好啦!看看效果:\n\n" + info2)
@cmd_admin.assign("publish")
async def _(target: DepLongTaskTarget, raw_id: str | None = None):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async def _(
target: DepLongTaskTarget,
event: Event,
perm: DepPermManager,
raw_id: str | None = None,
):
if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
today = get_today_date()
async with puzzle_manager() as manager:
if today in manager.daily_puzzle_of_date:
@ -348,46 +367,64 @@ def create_admin_commands():
return await target.send_message("Ok!")
@cmd_admin.assign("preview")
async def _(target: DepLongTaskTarget, raw_id: str):
async def _(
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
return await target.send_message(get_puzzle_description(p))
@cmd_admin.assign("get-submits")
async def _(target: DepLongTaskTarget, raw_id: str):
async def _(
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
):
async with puzzle_manager() as manager:
puzzle = manager.puzzle_data.get(raw_id)
if puzzle is None:
return await target.send_message("没有这个谜题")
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id:
if (
not perm.check_has_permission(event, "konaph.admin")
and target.target_id != puzzle.author_id
):
return await target.send_message("你没有权限预览这个谜题")
msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n")
submits = manager.submissions.get(raw_id, {})
for uid, ls in submits.items():
s = ', '.join((i.flag for i in ls))
s = ", ".join((i.flag for i in ls))
msg = msg.text(f"- {get_username(uid)}{s}\n")
return await target.send_message(msg)
@cmd_admin.assign("test")
async def _(target: DepLongTaskTarget, raw_id: str, submission: str):
async def _(
target: DepLongTaskTarget,
raw_id: str,
submission: str,
event: Event,
perm: DepPermManager,
):
"""
测试一道谜题的回答,并给出结果
"""
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
result = p.check_submission(submission)
msg = get_submission_message(p, result)
return await target.send_message("[测试提交] " + msg)
@cmd_admin.assign("subcommands.hint")
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")):
async def _(
target: DepLongTaskTarget,
subcommands: Query[SubcommandResult] = Query("subcommands.hint"),
):
if len(subcommands.result.subcommands) > 0:
return
return await target.send_message(
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n")
.text(
"- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n"
)
.text("- konaph hint modify <id> <hint_id>\n")
.text(" - --pattern <pattern>\n - 更改匹配规则\n")
.text(" - --message <message>\n - 更改提示文本\n")
@ -402,9 +439,11 @@ def create_admin_commands():
raw_id: str,
pattern: str,
message: str,
event: Event,
perm: DepPermManager,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
p.hints[p.hint_id_max + 1] = PuzzleHint(
pattern=pattern,
message=message,
@ -416,9 +455,11 @@ def create_admin_commands():
async def _(
target: DepLongTaskTarget,
raw_id: str,
event: Event,
perm: DepPermManager,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
await target.send_message(get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.modify")
@ -426,12 +467,14 @@ def create_admin_commands():
target: DepLongTaskTarget,
raw_id: str,
hint_id: int,
event: Event,
perm: DepPermManager,
pattern: str | None = None,
message: str | None = None,
is_checkpoint: bool | None = None,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
if hint_id not in p.hints:
raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
@ -450,9 +493,11 @@ def create_admin_commands():
target: DepLongTaskTarget,
raw_id: str,
hint_id: int,
event: Event,
perm: DepPermManager,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
if hint_id not in p.hints:
raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
@ -460,5 +505,4 @@ def create_admin_commands():
del p.hints[hint_id]
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
return cmd_admin

View File

@ -1,57 +0,0 @@
import asyncio
import mcstatus
from nonebot import on_command
from nonebot.adapters import Event
from nonebot_plugin_alconna import UniMessage
from konabot.common.nb.is_admin import is_admin
from mcstatus.responses import JavaStatusResponse
cmd = on_command("宾几人", aliases=set(("宾人数", "mcbingo")), rule=is_admin)
def parse_status(motd: str) -> str:
if "[PRE-GAME]" in motd:
return "[✨ 空闲]"
if "[IN-GAME]" in motd:
return "[🕜 游戏中]"
if "[POST-GAME]" in motd:
return "[🕜 游戏中]"
return "[✨ 开放]"
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
if isinstance(status, JavaStatusResponse):
motd = status.motd.to_plain()
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
st = parse_status(motd)
players_sample = status.players.sample or []
players_sample_suffix = ""
if len(players_sample) > 0:
player_list = [s.name for s in players_sample]
players_sample_suffix = " (" + ", ".join(player_list) + ")"
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
else:
return f"{name}: 好像没开"
@cmd.handle()
async def _(evt: Event):
servers = (
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
)
responses = await asyncio.gather(
*map(lambda s: s[0].async_status(), servers),
return_exceptions=True,
)
messages = "\n".join((
dump_server_status(n, r)
for n, r in zip(map(lambda s: s[1], servers), responses)
))
await UniMessage.text(messages).finish(evt, at_sender=False)

View File

@ -0,0 +1,131 @@
import asyncio
import datetime
from typing import Literal
import mcstatus
from nonebot import on_command
from nonebot.adapters import Event
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from mcstatus.responses import JavaStatusResponse
from nonebot_plugin_apscheduler import scheduler
from konabot.common.permsys import DepPermManager, require_permission
from konabot.plugins.minecraft_servers.simpfun_server import SimpfunServer
cmd = on_command(
"宾几人",
aliases=set(("宾人数", "mcbingo")),
rule=require_permission("minecraft.bingo.check"),
)
def parse_status(motd: str) -> str:
if "[PRE-GAME]" in motd:
return "[✨ 空闲]"
if "[IN-GAME]" in motd:
return "[🕜 游戏中]"
if "[POST-GAME]" in motd:
return "[🕜 游戏中]"
return "[✨ 开放]"
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
if isinstance(status, JavaStatusResponse):
motd = status.motd.to_plain()
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
st = parse_status(motd)
players_sample = status.players.sample or []
players_sample_suffix = ""
if len(players_sample) > 0:
player_list = [s.name for s in players_sample]
players_sample_suffix = " (" + ", ".join(player_list) + ")"
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
else:
return f"{name}: 好像没开"
@cmd.handle()
async def _(evt: Event, pm: DepPermManager):
servers = (
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
)
responses = await asyncio.gather(
*map(lambda s: s[0].async_status(), servers),
return_exceptions=True,
)
messages = "\n".join(
(
dump_server_status(n, r)
for n, r in zip(map(lambda s: s[1], servers), responses)
)
)
if await pm.check_has_permission(evt, "minecraft.bingo.manipulate"):
messages += "\n\n---\n\n你可以使用 bingoman start 开启小帕的 bingo 服,用 bingoman stop 关闭小帕的 bingo 服"
await UniMessage.text(messages).finish(evt, at_sender=False)
cmd_bingo_manipulate = on_alconna(
Alconna("bingoman", Args["action", str]),
aliases=("宾服务器", "bingo服"),
rule=require_permission("minecraft.bingo.manipulate"),
)
actions: dict[str, Literal["start", "stop", "restart", "kill"]] = {
"up": "start",
"down": "stop",
"start": "start",
"stop": "stop",
"开机": "start",
"关机": "stop",
"restart": "restart",
"kill": "kill",
"重启": "restart",
}
@cmd_bingo_manipulate.handle()
async def _(action: str, event: Event):
server = SimpfunServer.new() # 使用默认配置管理服务器
a = actions.get(action.lower().strip())
if a is None:
await UniMessage.text(f"操作 {action} 不存在").send(event, at_sender=True)
return
resp = await server.power(a)
if resp.code == 200:
await UniMessage.text("好了").send(event, at_sender=True)
else:
await UniMessage.text(f"不好:{resp}").send(event, at_sender=True)
@scheduler.scheduled_job("cron", hour="4,23")
async def _():
server = SimpfunServer.new()
today = datetime.datetime.now()
# 获取服务器当前状态,重试多次以保证不会误判服务器未开启
server_up = False
server_players = 0
for _ in range(3):
mcs = mcstatus.JavaServer("play.simpfun.cn", 11495)
try:
resp = await mcs.async_status()
server_up = True
server_players = resp.players.online
except Exception:
pass
if today.weekday() == 5 and today.hour < 12:
# 每周六开机一天,保证可以让服务器不被自动销毁
if not server_up:
await server.power("start")
else:
# 每用一个自然日都会计费,所以要赶在这一天结束之前关服
# 平时如果没人,也自动关上
if server_up and server_players == 0:
await server.power("stop")

View File

@ -0,0 +1,90 @@
from dataclasses import dataclass
import datetime
from typing import Literal
import aiohttp
from pydantic import BaseModel
class SimpfunServerConfig(BaseModel):
plugin_simpfun_api_key: str = ""
plugin_simpfun_base_url: str = "https://api.simpfun.cn"
plugin_simpfun_instance_id: int = 0
def get_config():
from nonebot import get_plugin_config
return get_plugin_config(SimpfunServerConfig)
class PowerManageResult(BaseModel):
code: int
status: bool
msg: str
class SimpfunServerDetailUtilization(BaseModel):
memory_bytes: int
cpu_absolute: float
disk_bytes: int
network_rx_bytes: int
network_tx_bytes: int
uptime: float
disk_last_check_time: datetime.datetime
class SimpfunServerDetailData(BaseModel):
id: int
name: str
is_pro: bool
status: str
"运行中的话,是 running"
is_suspended: bool
utilization: SimpfunServerDetailUtilization
class SimpfunServerDetailResp(BaseModel):
code: int
data: SimpfunServerDetailData
@dataclass
class SimpfunServer:
instance_id: int
api_key: str
base_url: str
async def power(
self, action: Literal["start", "stop", "restart", "kill"]
) -> PowerManageResult:
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
async with aiohttp.ClientSession(
headers={"Authorization": self.api_key}
) as session:
async with session.get(url, params={"action": action}) as resp:
resp.raise_for_status()
return PowerManageResult.model_validate_json(await resp.read())
async def detail(self) -> SimpfunServerDetailResp:
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
async with aiohttp.ClientSession(
headers={"Authorization": self.api_key}
) as session:
async with session.get(url) as resp:
resp.raise_for_status()
return SimpfunServerDetailResp.model_validate_json(await resp.read())
@staticmethod
def new(config: SimpfunServerConfig | None = None):
if config is None:
config = get_config()
return SimpfunServer(
instance_id=config.plugin_simpfun_instance_id,
api_key=config.plugin_simpfun_api_key,
base_url=config.plugin_simpfun_base_url,
)

View File

@ -6,6 +6,7 @@ from konabot.common.nb.match_keyword import match_keyword
evt_nya = on_message(rule=match_keyword(""))
@evt_nya.handle()
async def _():
await evt_nya.send(await UniMessage().text("").export())
@ -25,8 +26,9 @@ NYA_SYMBOL_MAPPING = {
"~": "~",
"": "",
" ": " ",
"\n": "\n",
}
NYA_SYMBOL_KEEP = "—¹₁²₂³₃⁴₄⁵₅⁶₆⁷₇⁸₈⁹₉⁰₀\n"
NYA_SYMBOL_MAPPING.update((k, k) for k in NYA_SYMBOL_KEEP)
async def has_nya(msg: UniMsg) -> bool:
@ -49,10 +51,10 @@ async def has_nya(msg: UniMsg) -> bool:
evt_nya_v2 = on_message(rule=has_nya)
@evt_nya_v2.handle()
async def _(msg: UniMsg, evt: Event):
text = msg.extract_plain_text()
await UniMessage.text(''.join(
(NYA_SYMBOL_MAPPING.get(c, '') for c in text)
)).send(evt)
await UniMessage.text("".join((NYA_SYMBOL_MAPPING.get(c, "") for c in text))).send(
evt
)

View File

@ -3,14 +3,15 @@ from nonebot_plugin_alconna import Alconna, Args, on_alconna
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.pager import PagerQuery
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
from konabot.plugins.poster.service import dep_poster_service
from konabot.common.subscribe import POSTER_INFO_DATA, dep_poster_service
cmd_subscribe = on_alconna(Alconna(
"订阅",
Args["channel", str],
))
cmd_subscribe = on_alconna(
Alconna(
"订阅",
Args["channel", str],
)
)
@cmd_subscribe.handle()
@ -23,10 +24,12 @@ async def _(target: DepLongTaskTarget, channel: str):
await target.send_message(f"已经订阅过「{channel}」了")
cmd_list = on_alconna(Alconna(
"re:(?:查询|我的|获取)订阅(列表)?",
Args["page?", int],
))
cmd_list = on_alconna(
Alconna(
"re:(?:查询|我的|获取)订阅(列表)?",
Args["page?", int],
)
)
def better_channel_message(channel_id: str) -> str:
@ -39,17 +42,24 @@ def better_channel_message(channel_id: str) -> str:
@cmd_list.handle()
async def _(target: DepLongTaskTarget, page: int = 1):
async with dep_poster_service() as service:
result = await service.get_channels(target, PagerQuery(
page_index=page,
page_size=10,
))
await target.send_message(result.to_unimessage(title="订阅列表", formatter=better_channel_message))
result = await service.get_channels(
target,
PagerQuery(
page_index=page,
page_size=10,
),
)
await target.send_message(
result.to_unimessage(title="订阅列表", formatter=better_channel_message)
)
cmd_list_available = on_alconna(Alconna(
"re:(查询)?可用订阅(列表)?",
Args["page?", int],
))
cmd_list_available = on_alconna(
Alconna(
"re:(查询)?可用订阅(列表)?",
Args["page?", int],
)
)
@cmd_list_available.handle()
@ -58,13 +68,17 @@ async def _(target: DepLongTaskTarget, page: int = 1):
page_index=page,
page_size=10,
).apply(sorted(POSTER_INFO_DATA.keys()))
await target.send_message(result.to_unimessage(title="可用订阅列表", formatter=better_channel_message))
await target.send_message(
result.to_unimessage(title="可用订阅列表", formatter=better_channel_message)
)
cmd_unsubscribe = on_alconna(Alconna(
"取消订阅",
Args["channel", str],
))
cmd_unsubscribe = on_alconna(
Alconna(
"取消订阅",
Args["channel", str],
)
)
@cmd_unsubscribe.handle()
@ -79,6 +93,7 @@ async def _(target: DepLongTaskTarget, channel: str):
driver = nonebot.get_driver()
@driver.on_startup
async def _():
async with dep_poster_service() as service:

View File

@ -4,8 +4,7 @@ from nonebot.internal.adapter.event import Event
from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_apscheduler import scheduler
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info
from konabot.plugins.poster.service import broadcast
from konabot.common.subscribe import PosterInfo, register_poster_info, broadcast
register_poster_info(
"二十四节气",
@ -98,4 +97,3 @@ async def _(event: Event):
msg = UniMessage.text(f"现在的节气是{date.term}")
await msg.send(event)

View File

@ -1,20 +1,23 @@
import asyncio
from nonebot import get_driver
from nonebot_plugin_alconna import UniMessage
from konabot.plugins.poster.poster_info import register_poster_info, PosterInfo
from konabot.plugins.poster.service import broadcast
from konabot.common.subscribe import register_poster_info, PosterInfo, broadcast
CHANNEL_STARTUP = "启动通知"
register_poster_info(CHANNEL_STARTUP, PosterInfo(
aliases=set(),
description="当 Bot 重启时告知",
))
register_poster_info(
CHANNEL_STARTUP,
PosterInfo(
aliases=set(),
description="当 Bot 重启时告知",
),
)
driver = get_driver()
@driver.on_startup
async def _():
# 要尽量保证接受讯息的服务存在
@ -30,4 +33,3 @@ async def _():
await broadcast(CHANNEL_STARTUP, UniMessage.text("此方 BOT 重启好了"))
asyncio.create_task(task())

View File

@ -0,0 +1,210 @@
import copy
import re
from pathlib import Path
import nonebot
from nonebot import on_command
from nonebot.adapters import Bot, Event, Message
from nonebot.log import logger
from nonebot.message import handle_event
from nonebot.params import CommandArg
from konabot.common.database import DatabaseManager
from konabot.common.longtask import DepLongTaskTarget
ROOT_PATH = Path(__file__).resolve().parent
cmd = on_command(cmd="语法糖", aliases={"", "sugar"}, block=True)
db_manager = DatabaseManager()
driver = nonebot.get_driver()
@driver.on_startup
async def register_startup_hook():
await init_db()
@driver.on_shutdown
async def register_shutdown_hook():
await db_manager.close_all_connections()
async def init_db():
await db_manager.execute_by_sql_file(ROOT_PATH / "sql" / "create_table.sql")
table_info = await db_manager.query("PRAGMA table_info(syntactic_sugar)")
columns = {str(row.get("name")) for row in table_info}
if "channel_id" not in columns:
await db_manager.execute(
"ALTER TABLE syntactic_sugar ADD COLUMN channel_id VARCHAR(255) NOT NULL DEFAULT ''"
)
await db_manager.execute("DROP INDEX IF EXISTS idx_syntactic_sugar_name_belong_to")
await db_manager.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target "
"ON syntactic_sugar(name, channel_id, belong_to)"
)
def _extract_reply_plain_text(evt: Event) -> str:
reply = getattr(evt, "reply", None)
if reply is None:
return ""
reply_message = getattr(reply, "message", None)
if reply_message is None:
return ""
extract_plain_text = getattr(reply_message, "extract_plain_text", None)
if callable(extract_plain_text):
return extract_plain_text().strip()
return str(reply_message).strip()
def _split_variables(tokens: list[str]) -> tuple[list[str], dict[str, str]]:
positional: list[str] = []
named: dict[str, str] = {}
for token in tokens:
if "=" in token:
key, value = token.split("=", 1)
key = key.strip()
if key:
named[key] = value
continue
positional.append(token)
return positional, named
def _render_template(content: str, positional: list[str], named: dict[str, str]) -> str:
def replace(match: re.Match[str]) -> str:
key = match.group(1).strip()
if key.isdigit():
idx = int(key) - 1
if 0 <= idx < len(positional):
return positional[idx]
return match.group(0)
return named.get(key, match.group(0))
return re.sub(r"\{([^{}]+)\}", replace, content)
async def _store_sugar(name: str, content: str, belong_to: str, channel_id: str):
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "insert_sugar.sql",
(name, content, belong_to, channel_id),
)
async def _delete_sugar(name: str, belong_to: str, channel_id: str):
await db_manager.execute(
"DELETE FROM syntactic_sugar WHERE name = ? AND belong_to = ? AND channel_id = ?",
(name, belong_to, channel_id),
)
async def _find_sugar(name: str, belong_to: str, channel_id: str) -> str | None:
rows = await db_manager.query(
(
"SELECT content FROM syntactic_sugar "
"WHERE name = ? AND channel_id = ? "
"ORDER BY CASE WHEN belong_to = ? THEN 0 ELSE 1 END, id ASC "
"LIMIT 1"
),
(name, channel_id, belong_to),
)
if not rows:
return None
return rows[0].get("content")
async def _reinject_command(bot: Bot, evt: Event, command_text: str) -> bool:
depth = int(getattr(evt, "_syntactic_sugar_depth", 0))
if depth >= 3:
return False
try:
cloned_evt = copy.deepcopy(evt)
except Exception:
logger.exception("语法糖克隆事件失败")
return False
message = getattr(cloned_evt, "message", None)
if message is None:
return False
try:
msg_obj = type(message)(command_text)
except Exception:
msg_obj = command_text
setattr(cloned_evt, "message", msg_obj)
if hasattr(cloned_evt, "original_message"):
setattr(cloned_evt, "original_message", msg_obj)
if hasattr(cloned_evt, "raw_message"):
setattr(cloned_evt, "raw_message", command_text)
setattr(cloned_evt, "_syntactic_sugar_depth", depth + 1)
try:
await handle_event(bot, cloned_evt)
except Exception:
logger.exception("语法糖回注事件失败")
return False
return True
@cmd.handle()
async def _(bot: Bot, evt: Event, target: DepLongTaskTarget, args: Message = CommandArg()):
raw = args.extract_plain_text().strip()
if not raw:
return
tokens = raw.split()
action = tokens[0]
target_id = target.target_id
channel_id = target.channel_id
if action == "存入":
if len(tokens) < 2:
await cmd.finish("请提供要存入的名称")
name = tokens[1].strip()
content = " ".join(tokens[2:]).strip()
if not content:
content = _extract_reply_plain_text(evt)
if not content:
await cmd.finish("请提供要存入的内容")
await _store_sugar(name, content, target_id, channel_id)
await cmd.finish(f"糖已存入:「{name}」!")
if action == "删除":
if len(tokens) < 2:
await cmd.finish("请提供要删除的名称")
name = tokens[1].strip()
await _delete_sugar(name, target_id, channel_id)
await cmd.finish(f"已删除糖:「{name}」!")
if action == "查看":
if len(tokens) < 2:
await cmd.finish("请提供要查看的名称")
name = tokens[1].strip()
content = await _find_sugar(name, target_id, channel_id)
if content is None:
await cmd.finish(f"没有糖:「{name}")
await cmd.finish(f"糖的内容:「{content}")
name = action
content = await _find_sugar(name, target_id, channel_id)
if content is None:
await cmd.finish(f"没有糖:「{name}")
positional, named = _split_variables(tokens[1:])
rendered = _render_template(content, positional, named)
ok = await _reinject_command(bot, evt, rendered)
if not ok:
await cmd.finish(f"糖的展开结果:「{rendered}")

View File

@ -0,0 +1,12 @@
-- 创建语法糖表
CREATE TABLE IF NOT EXISTS syntactic_sugar (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
belong_to VARCHAR(255) NOT NULL,
channel_id VARCHAR(255) NOT NULL DEFAULT ''
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target
ON syntactic_sugar(name, channel_id, belong_to);

View File

@ -0,0 +1,5 @@
-- 插入语法糖,如果同一用户下名称已存在则更新内容
INSERT INTO syntactic_sugar (name, content, belong_to, channel_id)
VALUES (?, ?, ?, ?)
ON CONFLICT(name, channel_id, belong_to) DO UPDATE SET
content = excluded.content;

View File

@ -0,0 +1,35 @@
import re
import nonebot
from nonebot.adapters import Event
from nonebot_plugin_alconna import UniMessage, UniMsg
from konabot.common.nb import match_keyword
from konabot.common.permsys import register_default_allow_permission, require_permission
from konabot.plugins.trpg_roll.core import RollError, roll_expression
PERMISSION_KEY = "trpg.roll"
register_default_allow_permission(PERMISSION_KEY)
matcher = nonebot.on_message(
rule=match_keyword.match_keyword(re.compile(r"^roll(?:\s+.+)?$", re.I))
& require_permission(PERMISSION_KEY),
)
@matcher.handle()
async def _(event: Event, msg: UniMsg):
text = msg.extract_plain_text().strip()
expr = text[4:].strip()
if not expr:
await UniMessage.text("用法roll 3d6 / roll d20+5 / roll 2d8+1d4+3 / roll 4dF").send(event)
return
try:
result = roll_expression(expr)
await UniMessage.text(result.format()).send(event)
except RollError as e:
await UniMessage.text(str(e)).send(event)

View File

@ -0,0 +1,143 @@
import random
import re
from dataclasses import dataclass
MAX_DICE_COUNT = 100
MAX_DICE_FACES = 1000
MAX_TERM_COUNT = 20
MAX_TOTAL_ROLLS = 200
MAX_EXPRESSION_LENGTH = 200
MAX_MESSAGE_LENGTH = 1200
_TOKEN_RE = re.compile(r"([+-]?)(\d*d(?:%|[fF]|\d+)|\d+)")
_DICE_RE = re.compile(r"(?i)(\d*)d(%|f|\d+)")
# 常见跑团表达式示例3d6、d20+5、2d8+1d4+3、d%、4dF
class RollError(ValueError):
pass
@dataclass(slots=True)
class RollTermResult:
sign: int
source: str
detail: str
value: int
@dataclass(slots=True)
class RollResult:
expression: str
total: int
terms: list[RollTermResult]
def format(self) -> str:
parts = [f"{term.source}={term.detail}" for term in self.terms]
detail = " ".join(parts)
text = f"{self.expression} = {self.total}"
if detail:
text += f"\n{detail}"
if len(text) > MAX_MESSAGE_LENGTH:
raise RollError("结果过长,请减少骰子数量或简化表达式")
return text
def _parse_single_term(raw: str, sign: int, rng: random.Random) -> RollTermResult:
dice_match = _DICE_RE.fullmatch(raw)
if dice_match:
count_text, faces_text = dice_match.groups()
count = int(count_text) if count_text else 1
if count <= 0:
raise RollError("骰子个数必须大于 0")
if count > MAX_DICE_COUNT:
raise RollError(f"单项最多只能掷 {MAX_DICE_COUNT} 个骰子")
if faces_text == "%":
faces = 100
rolls = [rng.randint(1, 100) for _ in range(count)]
elif faces_text.lower() == "f":
rolls = [rng.choice((-1, 0, 1)) for _ in range(count)]
total = sum(rolls) * sign
signed = "+" if sign > 0 else "-"
return RollTermResult(
sign=sign,
source=f"{signed}{count}dF",
detail="[" + ", ".join(f"{v:+d}" for v in rolls) + "]",
value=total,
)
else:
faces = int(faces_text)
if faces <= 0:
raise RollError("骰子面数必须大于 0")
if faces > MAX_DICE_FACES:
raise RollError(f"骰子面数不能超过 {MAX_DICE_FACES}")
rolls = [rng.randint(1, faces) for _ in range(count)]
total = sum(rolls) * sign
signed = "+" if sign > 0 else "-"
return RollTermResult(
sign=sign,
source=f"{signed}{count}d{faces_text.upper()}",
detail="[" + ", ".join(map(str, rolls)) + "]",
value=total,
)
value = int(raw) * sign
signed = "+" if sign > 0 else "-"
return RollTermResult(sign=sign, source=f"{signed}{raw}", detail=str(abs(value)), value=value)
def roll_expression(expr: str, rng: random.Random | None = None) -> RollResult:
expr = expr.strip().replace(" ", "")
if not expr:
raise RollError("请提供要掷的表达式,例如 roll 3d6 或 roll d20+5")
if len(expr) > MAX_EXPRESSION_LENGTH:
raise RollError("表达式太长了")
matches = list(_TOKEN_RE.finditer(expr))
if not matches:
raise RollError("无法解析表达式,请使用如 3d6、d20+5、2d8+1d4+3、4dF 这样的格式")
rebuilt = "".join(m.group(0) for m in matches)
if rebuilt != expr:
raise RollError("表达式中含有无法识别的内容")
if len(matches) > MAX_TERM_COUNT:
raise RollError(f"表达式项数不能超过 {MAX_TERM_COUNT}")
total_rolls = 0
for m in matches:
token = m.group(2)
dice_match = _DICE_RE.fullmatch(token)
if dice_match:
count_text, faces_text = dice_match.groups()
count = int(count_text) if count_text else 1
if count <= 0:
raise RollError("骰子个数必须大于 0")
if count > MAX_DICE_COUNT:
raise RollError(f"单项最多只能掷 {MAX_DICE_COUNT} 个骰子")
if faces_text not in {"%", "f", "F"}:
faces = int(faces_text)
if faces <= 0:
raise RollError("骰子面数必须大于 0")
if faces > MAX_DICE_FACES:
raise RollError(f"骰子面数不能超过 {MAX_DICE_FACES}")
total_rolls += count
if total_rolls > MAX_TOTAL_ROLLS:
raise RollError(f"一次最多只能实际掷 {MAX_TOTAL_ROLLS} 个骰子")
rng = rng or random.Random()
terms: list[RollTermResult] = []
total = 0
for idx, match in enumerate(matches):
sign_text, raw = match.groups()
sign = -1 if sign_text == "-" else 1
if idx == 0 and sign_text == "":
sign = 1
term = _parse_single_term(raw, sign, rng)
terms.append(term)
total += term.value
return RollResult(expression=expr, total=total, terms=terms)

View File

@ -0,0 +1,115 @@
import datetime
from nonebot_plugin_alconna import UniMessage
from konabot.common.apis.wolfx import CencEewReport, CencEqReport, wolfx_api
from konabot.common.subscribe import PosterInfo, broadcast, register_poster_info
provinces_short = [
"北京",
"天津",
"河北",
"山西",
"内蒙古",
"辽宁",
"吉林",
"黑龙江",
"上海",
"江苏",
"浙江",
"安徽",
"福建",
"江西",
"山东",
"河南",
"湖北",
"湖南",
"广东",
"广西",
"海南",
"重庆",
"四川",
"贵州",
"云南",
"西藏",
"陕西",
"甘肃",
"青海",
"宁夏",
"新疆",
"香港",
"澳门",
"台湾",
]
register_poster_info(
"中国地震台网地震速报",
PosterInfo(
aliases={
"地震速报",
"地震预警",
},
description="来自中国地震台网的地震速报",
),
)
CENC_EEW_DISABLED = True
if not CENC_EEW_DISABLED:
@wolfx_api.cenc_eew.append
async def broadcast_eew(report: CencEewReport):
# 这个好像没那么准确...
is_cn = any(report.HypoCenter.startswith(prefix) for prefix in provinces_short)
if (is_cn and report.Magnitude >= 4.2) or (
(not is_cn) and report.Magnitude >= 7.0
):
# 这是中国地震台网网站上,会默认展示的地震信息的等级
origin_time_dt = datetime.datetime.strptime(
report.OriginTime, "%Y-%m-%d %H:%M:%S"
)
origin_time_str = (
f"{origin_time_dt.month}"
f"{origin_time_dt.day}"
f"{origin_time_dt.hour}"
f"{origin_time_dt.minute}"
)
# vvv 下面这个其实不准确
eid_in_link = report.EventID.split(".")[0]
link = f"https://www.cenc.ac.cn/earthquake-manage-publish-web/product-list/{eid_in_link}/summarize"
msg = UniMessage.text(
"据中国地震台网中心 (https://www.cenc.ac.cn/) 报道,"
f"北京时间{origin_time_str}"
f"{report.HypoCenter}发生{report.Magnitude:.1f}级地震。"
f"震源位于 {report.Longitude}° {report.Latitude}°,深度 {report.Depth}km。\n\n"
f"详细信息请见 {link}"
)
await broadcast("中国地震台网地震速报", msg)
@wolfx_api.cenc_eqlist.append
async def broadcast_cenc_eqlist(report: CencEqReport):
is_cn = any(report.location.startswith(prefix) for prefix in provinces_short)
if (is_cn and float(report.magnitude) >= 4.2) or (
(not is_cn) and float(report.magnitude) >= 7.0
):
origin_time_dt = datetime.datetime.strptime(report.time, "%Y-%m-%d %H:%M:%S")
origin_time_str = (
f"{origin_time_dt.month}"
f"{origin_time_dt.day}"
f"{origin_time_dt.hour}"
f"{origin_time_dt.minute}"
)
msg = UniMessage.text(
"据中国地震台网中心 (https://www.cenc.ac.cn/) 消息,"
f"北京时间{origin_time_str}"
f"{report.location}发生{report.magnitude}级地震。"
f"震源位于 {report.longtitude}° {report.latitude}°,深度 {report.depth}km。\n\n"
f"数据来源于 Wolfx OpenAPI事件 ID: {report.EventID}"
)
await broadcast("中国地震台网地震速报", msg)

25
poetry.lock generated
View File

@ -4067,6 +4067,29 @@ type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]]
name = "pytest-mock"
version = "3.15.1"
description = "Thin-wrapper around the mock package for easier use with pytest"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "pytest_mock-3.15.1-py3-none-any.whl", hash = "sha256:0a25e2eb88fe5168d535041d09a4529a188176ae608a6d249ee65abc0949630d"},
{file = "pytest_mock-3.15.1.tar.gz", hash = "sha256:1849a238f6f396da19762269de72cb1814ab44416fa73a8686deac10b0d87a0f"},
]
[package.dependencies]
pytest = ">=6.2.5"
[package.extras]
dev = ["pre-commit", "pytest-asyncio", "tox"]
[package.source]
type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]]
name = "python-dotenv"
version = "1.2.2"
@ -5311,4 +5334,4 @@ reference = "mirrors"
[metadata]
lock-version = "2.1"
python-versions = ">=3.12,<4.0"
content-hash = "f2d5345d93636e19e49852af636d598c88394aaef4020f383402394b58452e3d"
content-hash = "23d2eadd1c36d017ff77934bd02b56e37d26005e6a99793b623c01162b90d67d"

View File

@ -37,6 +37,8 @@ dependencies = [
"pytest (>=8.0.0,<9.0.0)",
"nonebug (>=0.4.3,<0.5.0)",
"pytest-cov (>=7.0.0,<8.0.0)",
"aiosignal (>=1.4.0,<2.0.0)",
"pytest-mock (>=3.15.1,<4.0.0)",
]
[tool.poetry]

View File

@ -12,8 +12,22 @@ def filter(change: Change, path: str) -> bool:
return False
if Path(path).absolute().is_relative_to((base / ".git").absolute()):
return False
if Path(path).absolute().is_relative_to((base / "assets" / "oracle" / "image").absolute()):
if (
Path(path)
.absolute()
.is_relative_to((base / "assets" / "oracle" / "image").absolute())
):
# 还要解决坏枪的这个问题
return False
if Path(path).absolute().is_relative_to((base / "htmlcov").absolute()):
return False
if Path(path).absolute().is_relative_to((base / "test").absolute()):
return False
if Path(path).absolute().is_relative_to((base / ".pytest_cache").absolute()):
return False
if Path(path).absolute().is_relative_to((base / ".ruff_cache").absolute()):
return False
if path.endswith(".coverage"):
return False
print(path)
return True

View File

@ -0,0 +1,78 @@
import json
from unittest.mock import AsyncMock
import pytest
from konabot.common.apis.wolfx import CencEewReport, WolfxAPIService, WolfxWebSocket
obj_example = {
"ID": "bacby4yab1oyb",
"EventID": "202603100805.0001",
"ReportTime": "2026-03-10 08:05:29",
"ReportNum": 1,
"OriginTime": "2026-03-10 08:05:29",
"HypoCenter": "新疆昌吉州呼图壁县",
"Latitude": 43.687,
"Longitude": 86.427,
"Magnitude": 4.0,
"Depth": 14,
"MaxIntensity": 5,
}
@pytest.mark.asyncio
async def test_wolfx_websocket_handle():
ws = WolfxWebSocket("")
mock_callback = AsyncMock()
ws.signal.append(mock_callback)
ws.signal.freeze()
obj1 = {
"type": "heartbeat",
"ver": 18,
"id": "a69edf6436c5b605",
"timestamp": 1773111106701,
}
data1 = json.dumps(obj1).encode()
await ws.handle(data1)
mock_callback.assert_not_called()
mock_callback.reset_mock()
obj2 = obj_example
data2 = json.dumps(obj2).encode()
await ws.handle(data2)
mock_callback.assert_called_once_with(data2)
mock_callback.reset_mock()
data3 = b"what the f"
await ws.handle(data3)
mock_callback.assert_not_called()
@pytest.mark.asyncio
async def test_wolfx_bind_pydantic():
sv = WolfxAPIService()
called: list[CencEewReport] = []
@sv.cenc_eew.append
async def _(data: CencEewReport):
called.append(data)
sv._cenc_eew_ws.signal.freeze()
sv.cenc_eew.freeze()
data = json.dumps(obj_example).encode()
await sv._cenc_eew_ws.signal.send(data)
assert len(called) == 1
data = called[0]
assert data.HypoCenter == obj_example["HypoCenter"]
assert data.EventID == obj_example["EventID"]
# Don't panic when the object is invalid
data = json.dumps({"type": ""}).encode()
await sv._cenc_eew_ws.signal.send(data)
assert len(called) == 1

88
tests/test_fx_process.py Normal file
View File

@ -0,0 +1,88 @@
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
import nonebot
from PIL import Image
nonebot.init()
MODULE_PATH = Path(__file__).resolve().parents[1] / "konabot/plugins/fx_process/fx_handle.py"
SPEC = spec_from_file_location("test_fx_handle_module", MODULE_PATH)
assert SPEC is not None and SPEC.loader is not None
fx_handle = module_from_spec(SPEC)
SPEC.loader.exec_module(fx_handle)
ImageFilterImplement = fx_handle.ImageFilterImplement
INIT_MODULE_PATH = Path(__file__).resolve().parents[1] / "konabot/plugins/fx_process/__init__.py"
INIT_SPEC = spec_from_file_location("test_fx_init_module", INIT_MODULE_PATH)
assert INIT_SPEC is not None and INIT_SPEC.loader is not None
fx_init = module_from_spec(INIT_SPEC)
INIT_SPEC.loader.exec_module(fx_init)
prase_input_args = fx_init.prase_input_args
def test_apply_jpeg_damage_keeps_size_and_rgba_mode():
image = Image.new("RGBA", (32, 24), (255, 0, 0, 128))
result = ImageFilterImplement.apply_jpeg_damage(image, 5)
assert result.size == image.size
assert result.mode == "RGBA"
assert result.getchannel("A").getextrema() == (128, 128)
def test_apply_jpeg_damage_clamps_quality_range():
image = Image.new("RGB", (16, 16), (123, 222, 111))
low = ImageFilterImplement.apply_jpeg_damage(image, -10)
high = ImageFilterImplement.apply_jpeg_damage(image, 999)
assert low.size == image.size
assert high.size == image.size
assert low.mode == "RGBA"
assert high.mode == "RGBA"
def test_apply_resize_clamps_small_result_to_at_least_one_pixel():
image = Image.new("RGBA", (10, 10), (255, 0, 0, 255))
result = ImageFilterImplement.apply_resize(image, 0.01)
assert result.size == (1, 1)
def test_apply_resize_negative_x_with_positive_y_only_mirrors_horizontally():
image = Image.new("RGBA", (2, 1))
image.putpixel((0, 0), (255, 0, 0, 255))
image.putpixel((1, 0), (0, 0, 255, 255))
result = ImageFilterImplement.apply_resize(image, -1, 1)
assert result.size == (2, 1)
assert result.getpixel((0, 0)) == (0, 0, 255, 255)
assert result.getpixel((1, 0)) == (255, 0, 0, 255)
def test_apply_resize_negative_scale_without_y_flips_both_axes():
image = Image.new("RGBA", (2, 2))
image.putpixel((0, 0), (255, 0, 0, 255))
image.putpixel((1, 0), (0, 255, 0, 255))
image.putpixel((0, 1), (0, 0, 255, 255))
image.putpixel((1, 1), (255, 255, 0, 255))
result = ImageFilterImplement.apply_resize(image, -1)
assert result.size == (2, 2)
assert result.getpixel((0, 0)) == (255, 255, 0, 255)
assert result.getpixel((1, 0)) == (0, 0, 255, 255)
assert result.getpixel((0, 1)) == (0, 255, 0, 255)
assert result.getpixel((1, 1)) == (255, 0, 0, 255)
def test_prase_input_args_parses_resize_second_argument_as_float():
filters = prase_input_args("缩放 2 3")
assert len(filters) == 1
assert filters[0].name == "缩放"
assert filters[0].args == [2.0, 3.0]

View File

@ -0,0 +1,40 @@
from contextlib import asynccontextmanager
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
from konabot.common.database import DatabaseManager
from konabot.common.permsys import PermManager, register_default_allow_permission
from konabot.common.permsys.entity import PermEntity
from konabot.common.permsys.migrates import execute_migration
@asynccontextmanager
async def tempdb():
with TemporaryDirectory() as _tempdir:
tempdir = Path(_tempdir)
db = DatabaseManager(tempdir / "perm.sqlite3")
yield db
await db.close_all_connections()
@pytest.mark.asyncio
async def test_register_default_allow_permission_records_key():
register_default_allow_permission("test.default.allow")
async with tempdb() as db:
async with db.get_conn() as conn:
await execute_migration(conn)
pm = PermManager(db)
await pm.update_permission(
PermEntity("sys", "global", "global"),
"test.default.allow",
True,
)
assert await pm.check_has_permission(
[PermEntity("dummy", "user", "1"), PermEntity("sys", "global", "global")],
"test.default.allow.sub",
)

66
tests/test_trpg_roll.py Normal file
View File

@ -0,0 +1,66 @@
import random
import pytest
from konabot.plugins.trpg_roll.core import RollError, roll_expression
class FakeRandom:
def __init__(self, randint_values: list[int] | None = None, choice_values: list[int] | None = None):
self._randint_values = list(randint_values or [])
self._choice_values = list(choice_values or [])
def randint(self, _a: int, _b: int) -> int:
assert self._randint_values
return self._randint_values.pop(0)
def choice(self, _seq):
assert self._choice_values
return self._choice_values.pop(0)
def test_roll_expression_basic():
rng = FakeRandom(randint_values=[2, 4, 5])
result = roll_expression("3d6", rng=rng)
assert result.total == 11
assert result.format() == "3d6 = 11\n+3d6=[2, 4, 5]"
def test_roll_expression_multiple_terms():
rng = FakeRandom(randint_values=[14, 3, 1])
result = roll_expression("d20+1d4-2", rng=rng)
assert result.total == 15
assert result.format() == "d20+1d4-2 = 15\n+1d20=[14] +1d4=[3] -2=2"
def test_roll_expression_df():
rng = FakeRandom(choice_values=[-1, 0, 1, 1])
result = roll_expression("4dF", rng=rng)
assert result.total == 1
assert result.format() == "4dF = 1\n+4dF=[-1, +0, +1, +1]"
@pytest.mark.parametrize(
("expr", "message"),
[
("", "请提供要掷的表达式"),
("abc", "无法解析表达式"),
("1d0", "骰子面数必须大于 0"),
("0d6", "骰子个数必须大于 0"),
("101d6", "单项最多只能掷 100 个骰子"),
("1d1001", "骰子面数不能超过 1000"),
("201d1", "单项最多只能掷 100 个骰子"),
("1d6*2", "表达式中含有无法识别的内容"),
],
)
def test_roll_expression_invalid(expr: str, message: str):
with pytest.raises(RollError, match=message):
roll_expression(expr, rng=random.Random(0))
def test_roll_expression_total_roll_limit():
with pytest.raises(RollError, match="一次最多只能实际掷 200 个骰子"):
roll_expression("100d6+100d6+1d6", rng=random.Random(0))