Compare commits
19 Commits
f3389ff2b9
...
feature/su
| Author | SHA1 | Date | |
|---|---|---|---|
| d37c4870d8 | |||
| 23b9f101b3 | |||
|
8c1651ad3d
|
|||
| ff60642c62 | |||
| 69b5908445 | |||
| a542ed1fd9 | |||
| e86a385448 | |||
| d4bb36a074 | |||
| 1a2a3c0468 | |||
| 67502cb932 | |||
| f9a312b80a | |||
| 1980f8a895 | |||
|
d273ed4b1a
|
|||
|
265e9cc583
|
|||
|
8f5061ba41
|
|||
|
b3c3c77f3c
|
|||
|
6a84ce2cd8
|
|||
|
392c699b33
|
|||
|
72e21cd9aa
|
10
.gitignore
vendored
10
.gitignore
vendored
@ -3,9 +3,14 @@
|
||||
/data
|
||||
/pyrightconfig.json
|
||||
/pyrightconfig.toml
|
||||
/uv.lock
|
||||
|
||||
# 缓存文件
|
||||
__pycache__
|
||||
/.ruff_cache
|
||||
/.pytest_cache
|
||||
/.mypy_cache
|
||||
/.black_cache
|
||||
|
||||
# 可能会偶然生成的 diff 文件
|
||||
/*.diff
|
||||
@ -14,3 +19,8 @@ __pycache__
|
||||
/.coverage
|
||||
/.coverage.db
|
||||
/htmlcov
|
||||
|
||||
# 对手动创建虚拟环境的人
|
||||
/.venv
|
||||
/venv
|
||||
*.egg-info
|
||||
|
||||
4
.vscode/settings.json
vendored
4
.vscode/settings.json
vendored
@ -1,3 +1,5 @@
|
||||
{
|
||||
"python.REPL.enableREPLSmartSend": false
|
||||
"python.REPL.enableREPLSmartSend": false,
|
||||
"python-envs.defaultEnvManager": "ms-python.python:poetry",
|
||||
"python-envs.defaultPackageManager": "ms-python.python:poetry"
|
||||
}
|
||||
5
bot.py
5
bot.py
@ -7,6 +7,7 @@ from nonebot.adapters.discord import Adapter as DiscordAdapter
|
||||
from nonebot.adapters.minecraft import Adapter as MinecraftAdapter
|
||||
from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter
|
||||
|
||||
from konabot.common.appcontext import run_afterinit_functions
|
||||
from konabot.common.log import init_logger
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
from konabot.common.path import LOG_PATH
|
||||
@ -56,9 +57,7 @@ def main():
|
||||
nonebot.load_plugins("konabot/plugins")
|
||||
nonebot.load_plugin("nonebot_plugin_analysis_bilibili")
|
||||
|
||||
from konabot.common import permsys
|
||||
|
||||
permsys.create_startup()
|
||||
run_afterinit_functions()
|
||||
|
||||
# 注册关闭钩子
|
||||
@driver.on_shutdown
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
- `konabot/common/permsys/__init__.py`
|
||||
- 暴露 `PermManager`、`DepPermManager`、`require_permission`
|
||||
- 负责数据库初始化、启动迁移、超级管理员默认授权
|
||||
- 提供 `register_default_allow_permission()` 用于注册“启动时默认放行”的权限键
|
||||
- `konabot/common/permsys/entity.py`
|
||||
- 定义 `PermEntity`
|
||||
- 将事件转换为可查询的实体链
|
||||
@ -134,6 +135,14 @@ PermEntity("ob11", "user", str(account)), "*", True
|
||||
|
||||
也就是说,配置中的超级管理员会直接拥有全部权限。
|
||||
|
||||
此外,模块也支持插件在导入阶段通过 `register_default_allow_permission("some.key")` 注册默认放行的权限键;这些键会在启动时被写入到:
|
||||
|
||||
```python
|
||||
PermEntity("sys", "global", "global"), "some.key", True
|
||||
```
|
||||
|
||||
这适合“默认所有人可用,但仍希望后续能被权限系统单独关闭”的功能。
|
||||
|
||||
这属于启动时自动灌入的保底策略,不依赖手工授权命令。
|
||||
|
||||
## 在插件中使用
|
||||
|
||||
37
docs/subscribe.md
Normal file
37
docs/subscribe.md
Normal file
@ -0,0 +1,37 @@
|
||||
# subscribe 模块
|
||||
|
||||
一套统一的接口,让用户可以订阅一些延迟或者定时消息。
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from konabot.common.subscribe import register_poster_info, broadcast, PosterInfo
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
|
||||
# 注册了服务信息,用户可以用「查询可用订阅」指令了解可用的订阅清单。
|
||||
# 用户可以使用「订阅 某某服务通知」或者「订阅 某某服务」来订阅消息。
|
||||
# 如果用户在群聊发起订阅,则会在 QQ 群订阅,不然会在私聊订阅
|
||||
register_poster_info("某某服务通知", PosterInfo(
|
||||
aliases={"某某服务"},
|
||||
description="告诉你关于某某的最新资讯等信息",
|
||||
))
|
||||
|
||||
async def main():
|
||||
while True:
|
||||
# 这里的服务 channel 名字必须填写该服务的名字,不可以是 alias
|
||||
# 这会给所有订阅了该通道的用户发送「向大家发送纯文本通知」
|
||||
await broadcast("某某服务通知", "向大家发送纯文本通知")
|
||||
|
||||
# 也可以发送 UniMessage 对象,可以构造包含图片的通知等
|
||||
data = Path('image.png').read_bytes()
|
||||
await broadcast(
|
||||
"某某服务通知",
|
||||
UniMessage.text("很遗憾告诉大家,我们倒闭了:").image(raw=data),
|
||||
)
|
||||
|
||||
await asyncio.sleep(114.514)
|
||||
```
|
||||
|
||||
该模块的代码请查阅 `/konabot/common/subscribe/` 下的文件。
|
||||
281
konabot/common/apis/wolfx.py
Normal file
281
konabot/common/apis/wolfx.py
Normal file
@ -0,0 +1,281 @@
|
||||
"""
|
||||
Wolfx 防灾免费 API
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from typing import Literal, TypeVar, cast
|
||||
import aiohttp
|
||||
from aiosignal import Signal
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, RootModel
|
||||
import pydantic
|
||||
|
||||
from konabot.common.appcontext import after_init
|
||||
|
||||
|
||||
class ScEewReport(BaseModel):
|
||||
"""
|
||||
四川地震局报文
|
||||
"""
|
||||
|
||||
ID: str
|
||||
"EEW 发报 ID"
|
||||
|
||||
EventID: str
|
||||
"EEW 发报事件 ID"
|
||||
|
||||
ReportTime: str
|
||||
"EEW 发报时间(UTC+8)"
|
||||
|
||||
ReportNum: int
|
||||
"EEW 发报数"
|
||||
|
||||
OriginTime: str
|
||||
"发震时间(UTC+8)"
|
||||
|
||||
HypoCenter: str
|
||||
"震源地"
|
||||
|
||||
Latitude: float
|
||||
"震源地纬度"
|
||||
|
||||
Longitude: float
|
||||
"震源地经度"
|
||||
|
||||
Magnitude: float
|
||||
"震级"
|
||||
|
||||
Depth: float | None
|
||||
"震源深度"
|
||||
|
||||
MaxIntensity: float
|
||||
"最大烈度"
|
||||
|
||||
|
||||
class CencEewReport(BaseModel):
|
||||
"""
|
||||
中国地震台网报文
|
||||
"""
|
||||
|
||||
ID: str
|
||||
"EEW 发报 ID"
|
||||
|
||||
EventID: str
|
||||
"EEW 发报事件 ID"
|
||||
|
||||
ReportTime: str
|
||||
"EEW 发报时间(UTC+8)"
|
||||
|
||||
ReportNum: int
|
||||
"EEW 发报数"
|
||||
|
||||
OriginTime: str
|
||||
"发震时间(UTC+8)"
|
||||
|
||||
HypoCenter: str
|
||||
"震源地"
|
||||
|
||||
Latitude: float
|
||||
"震源地纬度"
|
||||
|
||||
Longitude: float
|
||||
"震源地经度"
|
||||
|
||||
Magnitude: float
|
||||
"震级"
|
||||
|
||||
Depth: float | None
|
||||
"震源深度"
|
||||
|
||||
MaxIntensity: float
|
||||
"最大烈度"
|
||||
|
||||
|
||||
class CencEqReport(BaseModel):
|
||||
type: str
|
||||
"报告类型"
|
||||
|
||||
EventID: str
|
||||
"事件 ID"
|
||||
|
||||
time: str
|
||||
"UTC+8 格式的地震发生时间"
|
||||
|
||||
location: str
|
||||
"地震发生位置"
|
||||
|
||||
magnitude: str
|
||||
"震级"
|
||||
|
||||
depth: str
|
||||
"地震深度"
|
||||
|
||||
latitude: str
|
||||
"纬度"
|
||||
|
||||
longtitude: str
|
||||
"经度"
|
||||
|
||||
intensity: str
|
||||
"烈度"
|
||||
|
||||
|
||||
class CencEqlist(RootModel):
|
||||
root: dict[str, CencEqReport]
|
||||
|
||||
|
||||
class WolfxWebSocket:
|
||||
def __init__(self, url: str) -> None:
|
||||
self.url = url
|
||||
self.signal: Signal[bytes] = Signal(self)
|
||||
self._running = False
|
||||
self._task: asyncio.Task | None = None
|
||||
self._session: aiohttp.ClientSession | None = None
|
||||
self._ws: aiohttp.ClientWebSocketResponse | None = None
|
||||
|
||||
@property
|
||||
def session(self) -> aiohttp.ClientSession: # pragma: no cover
|
||||
assert self._session is not None
|
||||
return self._session
|
||||
|
||||
async def start(self): # pragma: no cover
|
||||
if self._running:
|
||||
return
|
||||
self._running = True
|
||||
self._session = aiohttp.ClientSession()
|
||||
self._task = asyncio.create_task(self._run())
|
||||
self.signal.freeze()
|
||||
|
||||
async def stop(self): # pragma: no cover
|
||||
self._running = False
|
||||
if self._task:
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
if self._session:
|
||||
await self._session.close()
|
||||
|
||||
async def _run(self): # pragma: no cover
|
||||
retry_delay = 1
|
||||
|
||||
while self._running:
|
||||
try:
|
||||
async with self.session.ws_connect(self.url) as ws:
|
||||
self._ws = ws
|
||||
logger.info(f"Wolfx API 服务连接上了 {self.url} 的 WebSocket")
|
||||
async for msg in ws:
|
||||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||
await self.handle(cast(str, msg.data).encode())
|
||||
elif msg.type == aiohttp.WSMsgType.BINARY:
|
||||
await self.handle(cast(bytes, msg.data))
|
||||
elif msg.type == aiohttp.WSMsgType.CLOSED:
|
||||
break
|
||||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||||
break
|
||||
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
|
||||
logger.warning("连接 WebSocket 时发生错误")
|
||||
logger.exception(e)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error("Wolfx API 发生未知错误")
|
||||
logger.exception(e)
|
||||
self._ws = None
|
||||
|
||||
if self._running:
|
||||
logger.info(f"Wolfx API 准备断线重连 {self.url}")
|
||||
await asyncio.sleep(retry_delay)
|
||||
retry_delay = min(retry_delay * 2, 60)
|
||||
|
||||
async def handle(self, data: bytes):
|
||||
try:
|
||||
obj = json.loads(data)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.warning("解析 Wolfs API 时出错")
|
||||
logger.exception(e)
|
||||
return
|
||||
|
||||
if obj.get("type") == "heartbeat" or obj.get("type") == "pong":
|
||||
logger.debug(f"Wolfx API 收到了来自 {self.url} 的心跳: {obj}")
|
||||
else:
|
||||
await self.signal.send(data)
|
||||
|
||||
|
||||
T = TypeVar("T", bound=BaseModel)
|
||||
|
||||
|
||||
class WolfxAPIService:
|
||||
sc_eew: Signal[ScEewReport]
|
||||
"四川地震局地震速报"
|
||||
|
||||
cenc_eew: Signal[CencEewReport]
|
||||
"中国地震台网地震速报"
|
||||
|
||||
cenc_eqlist: Signal[CencEqReport]
|
||||
"中国地震台网地震信息发布"
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.sc_eew = Signal(self)
|
||||
self._sc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/sc_eew")
|
||||
WolfxAPIService.bind(self.sc_eew, self._sc_eew_ws, ScEewReport)
|
||||
|
||||
self.cenc_eew = Signal(self)
|
||||
self._cenc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/cenc_eew")
|
||||
WolfxAPIService.bind(self.cenc_eew, self._cenc_eew_ws, CencEewReport)
|
||||
|
||||
self.cenc_eqlist = Signal(self)
|
||||
self._cenc_eqlist_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/cenc_eqlist")
|
||||
WolfxAPIService.bind(self.cenc_eqlist, self._cenc_eqlist_ws, CencEqReport)
|
||||
|
||||
@staticmethod
|
||||
def bind(signal: Signal[T], ws: WolfxWebSocket, t: type[T]):
|
||||
@ws.signal.append
|
||||
async def _(data: bytes):
|
||||
try:
|
||||
obj = t.model_validate_json(data)
|
||||
logger.info(f"接收到来自 Wolfx API 的信息:{data}")
|
||||
await signal.send(obj)
|
||||
except pydantic.ValidationError as e:
|
||||
logger.warning(f"解析 Wolfx API 时出错 URL={ws.url}")
|
||||
logger.error(e)
|
||||
|
||||
async def start(self): # pragma: no cover
|
||||
self.cenc_eew.freeze()
|
||||
self.sc_eew.freeze()
|
||||
self.cenc_eqlist.freeze()
|
||||
async with asyncio.TaskGroup() as task_group:
|
||||
if len(self.cenc_eew) > 0:
|
||||
task_group.create_task(self._cenc_eew_ws.start())
|
||||
|
||||
if len(self.sc_eew) > 0:
|
||||
task_group.create_task(self._sc_eew_ws.start())
|
||||
|
||||
if len(self.cenc_eqlist) > 0:
|
||||
task_group.create_task(self._cenc_eqlist_ws.start())
|
||||
|
||||
async def stop(self): # pragma: no cover
|
||||
async with asyncio.TaskGroup() as task_group:
|
||||
task_group.create_task(self._cenc_eew_ws.stop())
|
||||
task_group.create_task(self._sc_eew_ws.stop())
|
||||
task_group.create_task(self._cenc_eqlist_ws.stop())
|
||||
|
||||
|
||||
wolfx_api = WolfxAPIService()
|
||||
|
||||
|
||||
@after_init
|
||||
def init(): # pragma: no cover
|
||||
import nonebot
|
||||
|
||||
driver = nonebot.get_driver()
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
await wolfx_api.start()
|
||||
|
||||
@driver.on_shutdown
|
||||
async def _():
|
||||
await wolfx_api.stop()
|
||||
15
konabot/common/appcontext.py
Normal file
15
konabot/common/appcontext.py
Normal file
@ -0,0 +1,15 @@
|
||||
from typing import Any, Callable
|
||||
|
||||
|
||||
AFTER_INIT_FUNCTION = Callable[[], Any]
|
||||
|
||||
_after_init_functions: list[AFTER_INIT_FUNCTION] = []
|
||||
|
||||
|
||||
def after_init(func: AFTER_INIT_FUNCTION):
|
||||
_after_init_functions.append(func)
|
||||
|
||||
|
||||
def run_afterinit_functions(): # pragma: no cover
|
||||
for f in _after_init_functions:
|
||||
f()
|
||||
@ -4,6 +4,7 @@ from nonebot.adapters import Event
|
||||
from nonebot.params import Depends
|
||||
from nonebot.rule import Rule
|
||||
|
||||
from konabot.common.appcontext import after_init
|
||||
from konabot.common.database import DatabaseManager
|
||||
from konabot.common.pager import PagerQuery
|
||||
from konabot.common.path import DATA_PATH
|
||||
@ -13,6 +14,7 @@ from konabot.common.permsys.repo import PermRepo
|
||||
|
||||
|
||||
db = DatabaseManager(DATA_PATH / "perm.sqlite3")
|
||||
_default_allow_permissions: set[str] = set()
|
||||
|
||||
|
||||
_EntityLike = Event | PermEntity | list[PermEntity]
|
||||
@ -73,6 +75,7 @@ def perm_manager(_db: DatabaseManager | None = None) -> PermManager: # pragma:
|
||||
return PermManager(_db)
|
||||
|
||||
|
||||
@after_init
|
||||
def create_startup(): # pragma: no cover
|
||||
from konabot.common.nb.is_admin import cfg
|
||||
|
||||
@ -89,6 +92,10 @@ def create_startup(): # pragma: no cover
|
||||
await pm.update_permission(
|
||||
PermEntity("ob11", "user", str(account)), "*", True
|
||||
)
|
||||
for key in _default_allow_permissions:
|
||||
await pm.update_permission(
|
||||
PermEntity("sys", "global", "global"), key, True
|
||||
)
|
||||
|
||||
@driver.on_shutdown
|
||||
async def _():
|
||||
@ -101,6 +108,10 @@ def create_startup(): # pragma: no cover
|
||||
DepPermManager = Annotated[PermManager, Depends(perm_manager)]
|
||||
|
||||
|
||||
def register_default_allow_permission(key: str):
|
||||
_default_allow_permissions.add(key)
|
||||
|
||||
|
||||
def require_permission(perm: str) -> Rule: # pragma: no cover
|
||||
async def check_permission(event: Event, pm: DepPermManager) -> bool:
|
||||
return await pm.check_has_permission(event, perm)
|
||||
|
||||
11
konabot/common/subscribe/__init__.py
Normal file
11
konabot/common/subscribe/__init__.py
Normal file
@ -0,0 +1,11 @@
|
||||
"""
|
||||
Subscribe 模块,用于向一些订阅的频道广播消息
|
||||
"""
|
||||
|
||||
from .service import broadcast as broadcast
|
||||
from .service import dep_poster_service as dep_poster_service
|
||||
from .service import DepPosterService as DepPosterService
|
||||
from .service import PosterService as PosterService
|
||||
from .subscribe_info import PosterInfo as PosterInfo
|
||||
from .subscribe_info import POSTER_INFO_DATA as POSTER_INFO_DATA
|
||||
from .subscribe_info import register_poster_info as register_poster_info
|
||||
@ -6,7 +6,8 @@ from pydantic import BaseModel, ValidationError
|
||||
from konabot.common.longtask import LongTaskTarget
|
||||
from konabot.common.pager import PagerQuery, PagerResult
|
||||
from konabot.common.path import DATA_PATH
|
||||
from konabot.plugins.poster.repository import IPosterRepo
|
||||
|
||||
from .repository import IPosterRepo
|
||||
|
||||
|
||||
class ChannelData(BaseModel):
|
||||
@ -18,9 +19,9 @@ class PosterData(BaseModel):
|
||||
|
||||
|
||||
def is_the_same_target(target1: LongTaskTarget, target2: LongTaskTarget) -> bool:
|
||||
if (target1.is_private_chat and not target2.is_private_chat):
|
||||
if target1.is_private_chat and not target2.is_private_chat:
|
||||
return False
|
||||
if (target2.is_private_chat and not target1.is_private_chat):
|
||||
if target2.is_private_chat and not target1.is_private_chat:
|
||||
return False
|
||||
if target1.platform != target2.platform:
|
||||
return False
|
||||
@ -58,7 +59,9 @@ class LocalPosterRepo(IPosterRepo):
|
||||
len1 = len(self.data.channels[channel].targets)
|
||||
return len0 != len1
|
||||
|
||||
async def get_subscribed_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
|
||||
async def get_subscribed_channels(
|
||||
self, target: LongTaskTarget, pager: PagerQuery
|
||||
) -> PagerResult[str]:
|
||||
channels: list[str] = []
|
||||
for channel_id, channel in self.data.channels.items():
|
||||
for t in channel.targets:
|
||||
@ -95,7 +98,9 @@ async def local_poster_data():
|
||||
data = PosterData()
|
||||
else:
|
||||
try:
|
||||
data = PosterData.model_validate_json(LOCAL_POSTER_DATA_PATH.read_text())
|
||||
data = PosterData.model_validate_json(
|
||||
LOCAL_POSTER_DATA_PATH.read_text()
|
||||
)
|
||||
except ValidationError:
|
||||
data = PosterData()
|
||||
yield data
|
||||
@ -109,4 +114,3 @@ async def local_poster():
|
||||
|
||||
|
||||
DepLocalPosterRepo = Annotated[LocalPosterRepo, Depends(local_poster)]
|
||||
|
||||
@ -4,9 +4,10 @@ from nonebot.params import Depends
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from konabot.common.longtask import LongTaskTarget
|
||||
from konabot.common.pager import PagerQuery, PagerResult
|
||||
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
|
||||
from konabot.plugins.poster.repo_local_data import local_poster
|
||||
from konabot.plugins.poster.repository import IPosterRepo
|
||||
|
||||
from .subscribe_info import POSTER_INFO_DATA
|
||||
from .repo_local_data import local_poster
|
||||
from .repository import IPosterRepo
|
||||
|
||||
|
||||
class PosterService:
|
||||
@ -27,7 +28,9 @@ class PosterService:
|
||||
channel = self.parse_channel_id(channel)
|
||||
return await self.repo.remove_channel_target(channel, target)
|
||||
|
||||
async def broadcast(self, channel: str, message: UniMessage[Any] | str) -> list[LongTaskTarget]:
|
||||
async def broadcast(
|
||||
self, channel: str, message: UniMessage[Any] | str
|
||||
) -> list[LongTaskTarget]:
|
||||
channel = self.parse_channel_id(channel)
|
||||
targets = await self.repo.get_channel_targets(channel)
|
||||
for target in targets:
|
||||
@ -35,7 +38,9 @@ class PosterService:
|
||||
await target.send_message(message, at=False)
|
||||
return targets
|
||||
|
||||
async def get_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
|
||||
async def get_channels(
|
||||
self, target: LongTaskTarget, pager: PagerQuery
|
||||
) -> PagerResult[str]:
|
||||
return await self.repo.get_subscribed_channels(target, pager)
|
||||
|
||||
async def fix_data(self):
|
||||
@ -56,4 +61,3 @@ async def broadcast(channel: str, message: UniMessage[Any] | str):
|
||||
|
||||
|
||||
DepPosterService = Annotated[PosterService, Depends(dep_poster_service)]
|
||||
|
||||
@ -4,7 +4,7 @@ from dataclasses import dataclass, field
|
||||
@dataclass
|
||||
class PosterInfo:
|
||||
aliases: set[str] = field(default_factory=set)
|
||||
description: str = field(default='')
|
||||
description: str = field(default="")
|
||||
|
||||
|
||||
POSTER_INFO_DATA: dict[str, PosterInfo] = {}
|
||||
@ -12,4 +12,3 @@ POSTER_INFO_DATA: dict[str, PosterInfo] = {}
|
||||
|
||||
def register_poster_info(channel: str, info: PosterInfo):
|
||||
POSTER_INFO_DATA[channel] = info
|
||||
|
||||
@ -76,6 +76,8 @@ fx [滤镜名称] <参数1> <参数2> ...
|
||||
* ```fx 设置遮罩```
|
||||
* ```fx 色键 <目标颜色="rgb(255,0,0)"> <容差=60>```
|
||||
* ```fx 晃动 <最大偏移量=5> <运动模糊=False>```
|
||||
* ```fx JPEG损坏 <质量=10>```
|
||||
* 质量范围建议为 1~95,数值越低,压缩痕迹越重、效果越搞笑。
|
||||
* ```fx 动图 <帧率=10>```
|
||||
|
||||
### 多图像处理器
|
||||
|
||||
53
konabot/docs/user/roll.txt
Normal file
53
konabot/docs/user/roll.txt
Normal file
@ -0,0 +1,53 @@
|
||||
**roll** - 面向跑团的文本骰子指令
|
||||
|
||||
## 用法
|
||||
|
||||
`roll 表达式`
|
||||
|
||||
支持常见骰子写法:
|
||||
|
||||
- `roll 3d6`
|
||||
- `roll d20+5`
|
||||
- `roll 2d8+1d4+3`
|
||||
- `roll d%`
|
||||
- `roll 4dF`
|
||||
|
||||
## 说明
|
||||
|
||||
- `NdM` 表示掷 N 个 M 面骰,例如 `3d6`
|
||||
- `d20` 等价于 `1d20`
|
||||
- `d%` 表示百分骰,范围 1 到 100
|
||||
- `dF` 表示 Fate/Fudge 骰,单骰结果为 -1、0、+1
|
||||
- 支持用 `+`、`-` 连接多个项,也支持常数修正
|
||||
|
||||
## 返回格式
|
||||
|
||||
会返回总结果,以及每一项的明细。
|
||||
|
||||
例如:
|
||||
|
||||
- `roll 3d6`
|
||||
可能返回:
|
||||
- `3d6 = 11`
|
||||
- `+3d6=[2, 4, 5]`
|
||||
|
||||
- `roll d20+5`
|
||||
可能返回:
|
||||
- `d20+5 = 19`
|
||||
- `+1d20=[14] +5=5`
|
||||
|
||||
## 限制
|
||||
|
||||
为防止刷屏和滥用,当前实现会限制:
|
||||
|
||||
- 单项最多 100 个骰子
|
||||
- 单个骰子最多 1000 面
|
||||
- 一次表达式最多 20 项
|
||||
- 一次表达式最多实际掷 200 个骰子
|
||||
- 结果过长时会直接拒绝
|
||||
|
||||
## 权限
|
||||
|
||||
需要 `trpg.roll` 权限。
|
||||
|
||||
默认启动时会给系统全局授予允许,因此通常所有人都能用;如有需要可再用权限系统单独关闭。
|
||||
@ -1,4 +1,5 @@
|
||||
import random
|
||||
from io import BytesIO
|
||||
from PIL import Image, ImageFilter, ImageDraw, ImageStat, ImageFont
|
||||
from PIL import ImageEnhance
|
||||
from PIL import ImageChops
|
||||
@ -167,26 +168,53 @@ class ImageFilterImplement:
|
||||
|
||||
return Image.fromarray(result, 'RGBA')
|
||||
|
||||
# JPEG 损坏感压缩
|
||||
@staticmethod
|
||||
def apply_jpeg_damage(image: Image.Image, quality: int = 10) -> Image.Image:
|
||||
quality = max(1, min(95, int(quality)))
|
||||
|
||||
alpha = None
|
||||
if image.mode in ('RGBA', 'LA') or (image.mode == 'P' and 'transparency' in image.info):
|
||||
rgba_image = image.convert('RGBA')
|
||||
alpha = rgba_image.getchannel('A')
|
||||
rgb_image = Image.new('RGB', rgba_image.size, (255, 255, 255))
|
||||
rgb_image.paste(rgba_image, mask=alpha)
|
||||
else:
|
||||
rgb_image = image.convert('RGB')
|
||||
|
||||
output = BytesIO()
|
||||
rgb_image.save(output, format='JPEG', quality=quality, optimize=False)
|
||||
output.seek(0)
|
||||
damaged = Image.open(output).convert('RGB')
|
||||
|
||||
if alpha is not None:
|
||||
return Image.merge('RGBA', (*damaged.split(), alpha))
|
||||
return damaged.convert('RGBA')
|
||||
|
||||
# 缩放
|
||||
@staticmethod
|
||||
def apply_resize(image: Image.Image, scale: float = 1.5, scale_y = None) -> Image.Image:
|
||||
# scale 可以为负
|
||||
# 如果 scale 为负,则代表翻转
|
||||
if scale_y is not None:
|
||||
if float(scale_y) < 0:
|
||||
def apply_resize(image: Image.Image, scale: float = 1.5, scale_y: float = None) -> Image.Image:
|
||||
scale_x = float(scale)
|
||||
scale_y_value = float(scale_y) if scale_y is not None else None
|
||||
|
||||
if scale_y_value is not None:
|
||||
if scale_y_value < 0:
|
||||
image = ImageOps.flip(image)
|
||||
scale_y = abs(float(scale_y))
|
||||
if scale < 0:
|
||||
scale_y_value = abs(scale_y_value)
|
||||
if scale_x < 0:
|
||||
image = ImageOps.mirror(image)
|
||||
scale = abs(scale)
|
||||
new_size = (int(image.width * scale), int(image.height * float(scale_y)))
|
||||
return image.resize(new_size, Image.Resampling.LANCZOS)
|
||||
if scale < 0:
|
||||
image = ImageOps.mirror(image)
|
||||
image = ImageOps.flip(image)
|
||||
scale = abs(scale)
|
||||
new_size = (int(image.width * scale), int(image.height * scale))
|
||||
return image.resize(new_size, Image.Resampling.LANCZOS)
|
||||
scale_x = abs(scale_x)
|
||||
target_scale_y = scale_y_value
|
||||
else:
|
||||
if scale_x < 0:
|
||||
image = ImageOps.mirror(image)
|
||||
image = ImageOps.flip(image)
|
||||
scale_x = abs(scale_x)
|
||||
target_scale_y = scale_x
|
||||
|
||||
new_width = max(1, round(image.width * scale_x))
|
||||
new_height = max(1, round(image.height * target_scale_y))
|
||||
return image.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
||||
|
||||
# 波纹
|
||||
@staticmethod
|
||||
|
||||
@ -50,6 +50,7 @@ class ImageFilterManager:
|
||||
"描边": ImageFilterImplement.apply_stroke,
|
||||
"形状描边": ImageFilterImplement.apply_shape_stroke,
|
||||
"半调": ImageFilterImplement.apply_halftone,
|
||||
"JPEG损坏": ImageFilterImplement.apply_jpeg_damage,
|
||||
"设置通道": ImageFilterImplement.apply_set_channel,
|
||||
"设置遮罩": ImageFilterImplement.apply_set_mask,
|
||||
# 图像处理
|
||||
|
||||
@ -15,10 +15,12 @@ class THQwen(TextHandler):
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
pm = perm_manager()
|
||||
if env.event is None or not pm.check_has_permission(env.event, "textfx.qwen"):
|
||||
if env.event is None or not await pm.check_has_permission(
|
||||
env.event, "textfx.qwen"
|
||||
):
|
||||
return TextHandleResult(
|
||||
code=1,
|
||||
ostream="这里暂未开启 AI 功能",
|
||||
ostream="你或当前环境没有使用 qwen 的权限。如有疑问请联系管理员",
|
||||
)
|
||||
|
||||
llm = get_llm()
|
||||
|
||||
@ -6,29 +6,34 @@ from loguru import logger
|
||||
from nonebot import on_message
|
||||
import nonebot
|
||||
from nonebot.rule import to_me
|
||||
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
|
||||
on_alconna)
|
||||
from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna
|
||||
from nonebot_plugin_apscheduler import scheduler
|
||||
|
||||
from konabot.common import username
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.pager import PagerQuery
|
||||
from konabot.plugins.kona_ph.core.message import (get_daily_report,
|
||||
get_daily_report_v2,
|
||||
get_puzzle_description,
|
||||
get_submission_message)
|
||||
from konabot.plugins.kona_ph.core.message import (
|
||||
get_daily_report,
|
||||
get_daily_report_v2,
|
||||
get_puzzle_description,
|
||||
get_submission_message,
|
||||
)
|
||||
from konabot.plugins.kona_ph.core.storage import get_today_date
|
||||
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE,
|
||||
create_admin_commands,
|
||||
puzzle_manager)
|
||||
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info
|
||||
from konabot.plugins.poster.service import broadcast
|
||||
from konabot.plugins.kona_ph.manager import (
|
||||
PUZZLE_PAGE_SIZE,
|
||||
create_admin_commands,
|
||||
puzzle_manager,
|
||||
)
|
||||
from konabot.common.subscribe import PosterInfo, register_poster_info, broadcast
|
||||
|
||||
create_admin_commands()
|
||||
register_poster_info("每日谜题", info=PosterInfo(
|
||||
aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"},
|
||||
description="此方 BOT 每日谜题推送",
|
||||
))
|
||||
register_poster_info(
|
||||
"每日谜题",
|
||||
info=PosterInfo(
|
||||
aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"},
|
||||
description="此方 BOT 每日谜题推送",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
cmd_submit = on_message(rule=to_me())
|
||||
@ -44,16 +49,22 @@ async def _(msg: UniMsg, target: DepLongTaskTarget):
|
||||
if isinstance(result, str):
|
||||
await target.send_message(result)
|
||||
else:
|
||||
await target.send_message(get_submission_message(
|
||||
daily_puzzle_info=result.info,
|
||||
submission=result.submission,
|
||||
puzzle=result.puzzle,
|
||||
))
|
||||
await target.send_message(
|
||||
get_submission_message(
|
||||
daily_puzzle_info=result.info,
|
||||
submission=result.submission,
|
||||
puzzle=result.puzzle,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
cmd_query = on_alconna(Alconna(
|
||||
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?)|今日谜?题目?)"
|
||||
), rule=to_me())
|
||||
cmd_query = on_alconna(
|
||||
Alconna(
|
||||
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?)|今日谜?题目?)"
|
||||
),
|
||||
rule=to_me(),
|
||||
)
|
||||
|
||||
|
||||
@cmd_query.handle()
|
||||
async def _(target: DepLongTaskTarget):
|
||||
@ -64,9 +75,8 @@ async def _(target: DepLongTaskTarget):
|
||||
await target.send_message(get_puzzle_description(p))
|
||||
|
||||
|
||||
cmd_query_submission = on_alconna(Alconna(
|
||||
"今日答题情况"
|
||||
), rule=to_me())
|
||||
cmd_query_submission = on_alconna(Alconna("今日答题情况"), rule=to_me())
|
||||
|
||||
|
||||
@cmd_query_submission.handle()
|
||||
async def _(target: DepLongTaskTarget):
|
||||
@ -77,11 +87,15 @@ async def _(target: DepLongTaskTarget):
|
||||
await target.send_message(get_daily_report_v2(manager, gid))
|
||||
|
||||
|
||||
cmd_history = on_alconna(Alconna(
|
||||
"re:历史(题目|谜题)",
|
||||
Args["page?", int],
|
||||
Args["index_id?", str],
|
||||
), rule=to_me())
|
||||
cmd_history = on_alconna(
|
||||
Alconna(
|
||||
"re:历史(题目|谜题)",
|
||||
Args["page?", int],
|
||||
Args["index_id?", str],
|
||||
),
|
||||
rule=to_me(),
|
||||
)
|
||||
|
||||
|
||||
@cmd_history.handle()
|
||||
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
||||
@ -105,10 +119,10 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
||||
puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True)
|
||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||
if page <= 0 or page > count_pages:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"页数只有 1 ~ {count_pages} 啦!"
|
||||
))
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
||||
return await target.send_message(
|
||||
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
|
||||
)
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
|
||||
for p, d in puzzles:
|
||||
info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]]
|
||||
msg = msg.text(
|
||||
@ -120,22 +134,26 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
||||
await target.send_message(msg)
|
||||
|
||||
|
||||
cmd_leadboard = on_alconna(Alconna(
|
||||
"re:此方(解谜|谜题)排行榜",
|
||||
Args["page?", int],
|
||||
))
|
||||
cmd_leadboard = on_alconna(
|
||||
Alconna(
|
||||
"re:此方(解谜|谜题)排行榜",
|
||||
Args["page?", int],
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@cmd_leadboard.handle()
|
||||
async def _(target: DepLongTaskTarget, page: int = 1):
|
||||
async with puzzle_manager() as manager:
|
||||
result = manager.get_leadboard(PagerQuery(page, 10))
|
||||
await target.send_message(result.to_unimessage(
|
||||
title="此方解谜排行榜",
|
||||
formatter=lambda data: (
|
||||
f"✨ {data[1]} 已完成 | "
|
||||
f"{username.get_username(data[0])}"
|
||||
await target.send_message(
|
||||
result.to_unimessage(
|
||||
title="此方解谜排行榜",
|
||||
formatter=lambda data: (
|
||||
f"✨ {data[1]} 已完成 | {username.get_username(data[0])}"
|
||||
),
|
||||
)
|
||||
))
|
||||
)
|
||||
|
||||
|
||||
@scheduler.scheduled_job("cron", hour="8")
|
||||
@ -155,4 +173,3 @@ async def _():
|
||||
|
||||
|
||||
driver = nonebot.get_driver()
|
||||
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
import datetime
|
||||
from math import ceil
|
||||
|
||||
from nonebot import get_plugin_config
|
||||
from nonebot.adapters import Event
|
||||
from nonebot_plugin_alconna import (
|
||||
Alconna,
|
||||
@ -14,7 +13,6 @@ from nonebot_plugin_alconna import (
|
||||
UniMessage,
|
||||
on_alconna,
|
||||
)
|
||||
from pydantic import BaseModel
|
||||
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
@ -35,20 +33,11 @@ from konabot.plugins.kona_ph.core.storage import (
|
||||
get_today_date,
|
||||
puzzle_manager,
|
||||
)
|
||||
from konabot.plugins.poster.service import broadcast
|
||||
from konabot.common.subscribe import broadcast
|
||||
|
||||
PUZZLE_PAGE_SIZE = 10
|
||||
|
||||
|
||||
class PuzzleConfig(BaseModel):
|
||||
plugin_puzzle_manager: list[str] = []
|
||||
plugin_puzzle_admin: list[str] = []
|
||||
plugin_puzzle_playgroup: list[str] = []
|
||||
|
||||
|
||||
config = get_plugin_config(PuzzleConfig)
|
||||
|
||||
|
||||
async def check_puzzle(
|
||||
manager: PuzzleManager,
|
||||
perm: DepPermManager,
|
||||
|
||||
@ -6,6 +6,7 @@ from konabot.common.nb.match_keyword import match_keyword
|
||||
|
||||
evt_nya = on_message(rule=match_keyword("喵"))
|
||||
|
||||
|
||||
@evt_nya.handle()
|
||||
async def _():
|
||||
await evt_nya.send(await UniMessage().text("喵").export())
|
||||
@ -25,8 +26,9 @@ NYA_SYMBOL_MAPPING = {
|
||||
"~": "~",
|
||||
"~": "~",
|
||||
" ": " ",
|
||||
"\n": "\n",
|
||||
}
|
||||
NYA_SYMBOL_KEEP = "—¹₁²₂³₃⁴₄⁵₅⁶₆⁷₇⁸₈⁹₉⁰₀\n"
|
||||
NYA_SYMBOL_MAPPING.update((k, k) for k in NYA_SYMBOL_KEEP)
|
||||
|
||||
|
||||
async def has_nya(msg: UniMsg) -> bool:
|
||||
@ -49,10 +51,10 @@ async def has_nya(msg: UniMsg) -> bool:
|
||||
|
||||
evt_nya_v2 = on_message(rule=has_nya)
|
||||
|
||||
|
||||
@evt_nya_v2.handle()
|
||||
async def _(msg: UniMsg, evt: Event):
|
||||
text = msg.extract_plain_text()
|
||||
await UniMessage.text(''.join(
|
||||
(NYA_SYMBOL_MAPPING.get(c, '') for c in text)
|
||||
)).send(evt)
|
||||
|
||||
await UniMessage.text("".join((NYA_SYMBOL_MAPPING.get(c, "") for c in text))).send(
|
||||
evt
|
||||
)
|
||||
|
||||
@ -3,14 +3,15 @@ from nonebot_plugin_alconna import Alconna, Args, on_alconna
|
||||
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.pager import PagerQuery
|
||||
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
|
||||
from konabot.plugins.poster.service import dep_poster_service
|
||||
from konabot.common.subscribe import POSTER_INFO_DATA, dep_poster_service
|
||||
|
||||
|
||||
cmd_subscribe = on_alconna(Alconna(
|
||||
"订阅",
|
||||
Args["channel", str],
|
||||
))
|
||||
cmd_subscribe = on_alconna(
|
||||
Alconna(
|
||||
"订阅",
|
||||
Args["channel", str],
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@cmd_subscribe.handle()
|
||||
@ -23,10 +24,12 @@ async def _(target: DepLongTaskTarget, channel: str):
|
||||
await target.send_message(f"已经订阅过「{channel}」了")
|
||||
|
||||
|
||||
cmd_list = on_alconna(Alconna(
|
||||
"re:(?:查询|我的|获取)订阅(列表)?",
|
||||
Args["page?", int],
|
||||
))
|
||||
cmd_list = on_alconna(
|
||||
Alconna(
|
||||
"re:(?:查询|我的|获取)订阅(列表)?",
|
||||
Args["page?", int],
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def better_channel_message(channel_id: str) -> str:
|
||||
@ -39,17 +42,24 @@ def better_channel_message(channel_id: str) -> str:
|
||||
@cmd_list.handle()
|
||||
async def _(target: DepLongTaskTarget, page: int = 1):
|
||||
async with dep_poster_service() as service:
|
||||
result = await service.get_channels(target, PagerQuery(
|
||||
page_index=page,
|
||||
page_size=10,
|
||||
))
|
||||
await target.send_message(result.to_unimessage(title="订阅列表", formatter=better_channel_message))
|
||||
result = await service.get_channels(
|
||||
target,
|
||||
PagerQuery(
|
||||
page_index=page,
|
||||
page_size=10,
|
||||
),
|
||||
)
|
||||
await target.send_message(
|
||||
result.to_unimessage(title="订阅列表", formatter=better_channel_message)
|
||||
)
|
||||
|
||||
|
||||
cmd_list_available = on_alconna(Alconna(
|
||||
"re:(查询)?可用订阅(列表)?",
|
||||
Args["page?", int],
|
||||
))
|
||||
cmd_list_available = on_alconna(
|
||||
Alconna(
|
||||
"re:(查询)?可用订阅(列表)?",
|
||||
Args["page?", int],
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@cmd_list_available.handle()
|
||||
@ -58,13 +68,17 @@ async def _(target: DepLongTaskTarget, page: int = 1):
|
||||
page_index=page,
|
||||
page_size=10,
|
||||
).apply(sorted(POSTER_INFO_DATA.keys()))
|
||||
await target.send_message(result.to_unimessage(title="可用订阅列表", formatter=better_channel_message))
|
||||
await target.send_message(
|
||||
result.to_unimessage(title="可用订阅列表", formatter=better_channel_message)
|
||||
)
|
||||
|
||||
|
||||
cmd_unsubscribe = on_alconna(Alconna(
|
||||
"取消订阅",
|
||||
Args["channel", str],
|
||||
))
|
||||
cmd_unsubscribe = on_alconna(
|
||||
Alconna(
|
||||
"取消订阅",
|
||||
Args["channel", str],
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@cmd_unsubscribe.handle()
|
||||
@ -79,6 +93,7 @@ async def _(target: DepLongTaskTarget, channel: str):
|
||||
|
||||
driver = nonebot.get_driver()
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
async with dep_poster_service() as service:
|
||||
|
||||
@ -4,8 +4,7 @@ from nonebot.internal.adapter.event import Event
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from nonebot_plugin_apscheduler import scheduler
|
||||
|
||||
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info
|
||||
from konabot.plugins.poster.service import broadcast
|
||||
from konabot.common.subscribe import PosterInfo, register_poster_info, broadcast
|
||||
|
||||
register_poster_info(
|
||||
"二十四节气",
|
||||
@ -98,4 +97,3 @@ async def _(event: Event):
|
||||
|
||||
msg = UniMessage.text(f"现在的节气是{date.term}")
|
||||
await msg.send(event)
|
||||
|
||||
|
||||
@ -1,20 +1,23 @@
|
||||
import asyncio
|
||||
from nonebot import get_driver
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from konabot.plugins.poster.poster_info import register_poster_info, PosterInfo
|
||||
from konabot.plugins.poster.service import broadcast
|
||||
from konabot.common.subscribe import register_poster_info, PosterInfo, broadcast
|
||||
|
||||
|
||||
CHANNEL_STARTUP = "启动通知"
|
||||
|
||||
|
||||
register_poster_info(CHANNEL_STARTUP, PosterInfo(
|
||||
aliases=set(),
|
||||
description="当 Bot 重启时告知",
|
||||
))
|
||||
register_poster_info(
|
||||
CHANNEL_STARTUP,
|
||||
PosterInfo(
|
||||
aliases=set(),
|
||||
description="当 Bot 重启时告知",
|
||||
),
|
||||
)
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
# 要尽量保证接受讯息的服务存在
|
||||
@ -30,4 +33,3 @@ async def _():
|
||||
await broadcast(CHANNEL_STARTUP, UniMessage.text("此方 BOT 重启好了"))
|
||||
|
||||
asyncio.create_task(task())
|
||||
|
||||
|
||||
210
konabot/plugins/syntactic_sugar/__init__.py
Normal file
210
konabot/plugins/syntactic_sugar/__init__.py
Normal file
@ -0,0 +1,210 @@
|
||||
import copy
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
import nonebot
|
||||
from nonebot import on_command
|
||||
from nonebot.adapters import Bot, Event, Message
|
||||
from nonebot.log import logger
|
||||
from nonebot.message import handle_event
|
||||
from nonebot.params import CommandArg
|
||||
|
||||
from konabot.common.database import DatabaseManager
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
|
||||
ROOT_PATH = Path(__file__).resolve().parent
|
||||
|
||||
cmd = on_command(cmd="语法糖", aliases={"糖", "sugar"}, block=True)
|
||||
|
||||
db_manager = DatabaseManager()
|
||||
driver = nonebot.get_driver()
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def register_startup_hook():
|
||||
await init_db()
|
||||
|
||||
|
||||
@driver.on_shutdown
|
||||
async def register_shutdown_hook():
|
||||
await db_manager.close_all_connections()
|
||||
|
||||
|
||||
async def init_db():
|
||||
await db_manager.execute_by_sql_file(ROOT_PATH / "sql" / "create_table.sql")
|
||||
|
||||
table_info = await db_manager.query("PRAGMA table_info(syntactic_sugar)")
|
||||
columns = {str(row.get("name")) for row in table_info}
|
||||
if "channel_id" not in columns:
|
||||
await db_manager.execute(
|
||||
"ALTER TABLE syntactic_sugar ADD COLUMN channel_id VARCHAR(255) NOT NULL DEFAULT ''"
|
||||
)
|
||||
|
||||
await db_manager.execute("DROP INDEX IF EXISTS idx_syntactic_sugar_name_belong_to")
|
||||
await db_manager.execute(
|
||||
"CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target "
|
||||
"ON syntactic_sugar(name, channel_id, belong_to)"
|
||||
)
|
||||
|
||||
|
||||
def _extract_reply_plain_text(evt: Event) -> str:
|
||||
reply = getattr(evt, "reply", None)
|
||||
if reply is None:
|
||||
return ""
|
||||
|
||||
reply_message = getattr(reply, "message", None)
|
||||
if reply_message is None:
|
||||
return ""
|
||||
|
||||
extract_plain_text = getattr(reply_message, "extract_plain_text", None)
|
||||
if callable(extract_plain_text):
|
||||
return extract_plain_text().strip()
|
||||
return str(reply_message).strip()
|
||||
|
||||
|
||||
def _split_variables(tokens: list[str]) -> tuple[list[str], dict[str, str]]:
|
||||
positional: list[str] = []
|
||||
named: dict[str, str] = {}
|
||||
|
||||
for token in tokens:
|
||||
if "=" in token:
|
||||
key, value = token.split("=", 1)
|
||||
key = key.strip()
|
||||
if key:
|
||||
named[key] = value
|
||||
continue
|
||||
positional.append(token)
|
||||
|
||||
return positional, named
|
||||
|
||||
|
||||
def _render_template(content: str, positional: list[str], named: dict[str, str]) -> str:
|
||||
def replace(match: re.Match[str]) -> str:
|
||||
key = match.group(1).strip()
|
||||
if key.isdigit():
|
||||
idx = int(key) - 1
|
||||
if 0 <= idx < len(positional):
|
||||
return positional[idx]
|
||||
return match.group(0)
|
||||
return named.get(key, match.group(0))
|
||||
|
||||
return re.sub(r"\{([^{}]+)\}", replace, content)
|
||||
|
||||
|
||||
async def _store_sugar(name: str, content: str, belong_to: str, channel_id: str):
|
||||
await db_manager.execute_by_sql_file(
|
||||
ROOT_PATH / "sql" / "insert_sugar.sql",
|
||||
(name, content, belong_to, channel_id),
|
||||
)
|
||||
|
||||
|
||||
async def _delete_sugar(name: str, belong_to: str, channel_id: str):
|
||||
await db_manager.execute(
|
||||
"DELETE FROM syntactic_sugar WHERE name = ? AND belong_to = ? AND channel_id = ?",
|
||||
(name, belong_to, channel_id),
|
||||
)
|
||||
|
||||
|
||||
async def _find_sugar(name: str, belong_to: str, channel_id: str) -> str | None:
|
||||
rows = await db_manager.query(
|
||||
(
|
||||
"SELECT content FROM syntactic_sugar "
|
||||
"WHERE name = ? AND channel_id = ? "
|
||||
"ORDER BY CASE WHEN belong_to = ? THEN 0 ELSE 1 END, id ASC "
|
||||
"LIMIT 1"
|
||||
),
|
||||
(name, channel_id, belong_to),
|
||||
)
|
||||
if not rows:
|
||||
return None
|
||||
return rows[0].get("content")
|
||||
|
||||
|
||||
async def _reinject_command(bot: Bot, evt: Event, command_text: str) -> bool:
|
||||
depth = int(getattr(evt, "_syntactic_sugar_depth", 0))
|
||||
if depth >= 3:
|
||||
return False
|
||||
|
||||
try:
|
||||
cloned_evt = copy.deepcopy(evt)
|
||||
except Exception:
|
||||
logger.exception("语法糖克隆事件失败")
|
||||
return False
|
||||
|
||||
message = getattr(cloned_evt, "message", None)
|
||||
if message is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
msg_obj = type(message)(command_text)
|
||||
except Exception:
|
||||
msg_obj = command_text
|
||||
|
||||
setattr(cloned_evt, "message", msg_obj)
|
||||
if hasattr(cloned_evt, "original_message"):
|
||||
setattr(cloned_evt, "original_message", msg_obj)
|
||||
if hasattr(cloned_evt, "raw_message"):
|
||||
setattr(cloned_evt, "raw_message", command_text)
|
||||
|
||||
setattr(cloned_evt, "_syntactic_sugar_depth", depth + 1)
|
||||
|
||||
try:
|
||||
await handle_event(bot, cloned_evt)
|
||||
except Exception:
|
||||
logger.exception("语法糖回注事件失败")
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@cmd.handle()
|
||||
async def _(bot: Bot, evt: Event, target: DepLongTaskTarget, args: Message = CommandArg()):
|
||||
raw = args.extract_plain_text().strip()
|
||||
if not raw:
|
||||
return
|
||||
|
||||
tokens = raw.split()
|
||||
action = tokens[0]
|
||||
target_id = target.target_id
|
||||
channel_id = target.channel_id
|
||||
|
||||
if action == "存入":
|
||||
if len(tokens) < 2:
|
||||
await cmd.finish("请提供要存入的名称")
|
||||
name = tokens[1].strip()
|
||||
content = " ".join(tokens[2:]).strip()
|
||||
if not content:
|
||||
content = _extract_reply_plain_text(evt)
|
||||
if not content:
|
||||
await cmd.finish("请提供要存入的内容")
|
||||
|
||||
await _store_sugar(name, content, target_id, channel_id)
|
||||
await cmd.finish(f"糖已存入:「{name}」!")
|
||||
|
||||
if action == "删除":
|
||||
if len(tokens) < 2:
|
||||
await cmd.finish("请提供要删除的名称")
|
||||
name = tokens[1].strip()
|
||||
await _delete_sugar(name, target_id, channel_id)
|
||||
await cmd.finish(f"已删除糖:「{name}」!")
|
||||
|
||||
if action == "查看":
|
||||
if len(tokens) < 2:
|
||||
await cmd.finish("请提供要查看的名称")
|
||||
name = tokens[1].strip()
|
||||
content = await _find_sugar(name, target_id, channel_id)
|
||||
if content is None:
|
||||
await cmd.finish(f"没有糖:「{name}」")
|
||||
await cmd.finish(f"糖的内容:「{content}」")
|
||||
|
||||
|
||||
name = action
|
||||
content = await _find_sugar(name, target_id, channel_id)
|
||||
if content is None:
|
||||
await cmd.finish(f"没有糖:「{name}」")
|
||||
|
||||
positional, named = _split_variables(tokens[1:])
|
||||
rendered = _render_template(content, positional, named)
|
||||
|
||||
ok = await _reinject_command(bot, evt, rendered)
|
||||
if not ok:
|
||||
await cmd.finish(f"糖的展开结果:「{rendered}」")
|
||||
12
konabot/plugins/syntactic_sugar/sql/create_table.sql
Normal file
12
konabot/plugins/syntactic_sugar/sql/create_table.sql
Normal file
@ -0,0 +1,12 @@
|
||||
-- 创建语法糖表
|
||||
CREATE TABLE IF NOT EXISTS syntactic_sugar (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
content TEXT NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
belong_to VARCHAR(255) NOT NULL,
|
||||
channel_id VARCHAR(255) NOT NULL DEFAULT ''
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target
|
||||
ON syntactic_sugar(name, channel_id, belong_to);
|
||||
5
konabot/plugins/syntactic_sugar/sql/insert_sugar.sql
Normal file
5
konabot/plugins/syntactic_sugar/sql/insert_sugar.sql
Normal file
@ -0,0 +1,5 @@
|
||||
-- 插入语法糖,如果同一用户下名称已存在则更新内容
|
||||
INSERT INTO syntactic_sugar (name, content, belong_to, channel_id)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(name, channel_id, belong_to) DO UPDATE SET
|
||||
content = excluded.content;
|
||||
35
konabot/plugins/trpg_roll/__init__.py
Normal file
35
konabot/plugins/trpg_roll/__init__.py
Normal file
@ -0,0 +1,35 @@
|
||||
import re
|
||||
|
||||
import nonebot
|
||||
from nonebot.adapters import Event
|
||||
from nonebot_plugin_alconna import UniMessage, UniMsg
|
||||
|
||||
from konabot.common.nb import match_keyword
|
||||
from konabot.common.permsys import register_default_allow_permission, require_permission
|
||||
from konabot.plugins.trpg_roll.core import RollError, roll_expression
|
||||
|
||||
|
||||
PERMISSION_KEY = "trpg.roll"
|
||||
register_default_allow_permission(PERMISSION_KEY)
|
||||
|
||||
|
||||
matcher = nonebot.on_message(
|
||||
rule=match_keyword.match_keyword(re.compile(r"^roll(?:\s+.+)?$", re.I))
|
||||
& require_permission(PERMISSION_KEY),
|
||||
)
|
||||
|
||||
|
||||
@matcher.handle()
|
||||
async def _(event: Event, msg: UniMsg):
|
||||
text = msg.extract_plain_text().strip()
|
||||
expr = text[4:].strip()
|
||||
|
||||
if not expr:
|
||||
await UniMessage.text("用法:roll 3d6 / roll d20+5 / roll 2d8+1d4+3 / roll 4dF").send(event)
|
||||
return
|
||||
|
||||
try:
|
||||
result = roll_expression(expr)
|
||||
await UniMessage.text(result.format()).send(event)
|
||||
except RollError as e:
|
||||
await UniMessage.text(str(e)).send(event)
|
||||
143
konabot/plugins/trpg_roll/core.py
Normal file
143
konabot/plugins/trpg_roll/core.py
Normal file
@ -0,0 +1,143 @@
|
||||
import random
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
MAX_DICE_COUNT = 100
|
||||
MAX_DICE_FACES = 1000
|
||||
MAX_TERM_COUNT = 20
|
||||
MAX_TOTAL_ROLLS = 200
|
||||
MAX_EXPRESSION_LENGTH = 200
|
||||
MAX_MESSAGE_LENGTH = 1200
|
||||
|
||||
_TOKEN_RE = re.compile(r"([+-]?)(\d*d(?:%|[fF]|\d+)|\d+)")
|
||||
_DICE_RE = re.compile(r"(?i)(\d*)d(%|f|\d+)")
|
||||
|
||||
# 常见跑团表达式示例:3d6、d20+5、2d8+1d4+3、d%、4dF
|
||||
|
||||
|
||||
class RollError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class RollTermResult:
|
||||
sign: int
|
||||
source: str
|
||||
detail: str
|
||||
value: int
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class RollResult:
|
||||
expression: str
|
||||
total: int
|
||||
terms: list[RollTermResult]
|
||||
|
||||
def format(self) -> str:
|
||||
parts = [f"{term.source}={term.detail}" for term in self.terms]
|
||||
detail = " ".join(parts)
|
||||
text = f"{self.expression} = {self.total}"
|
||||
if detail:
|
||||
text += f"\n{detail}"
|
||||
if len(text) > MAX_MESSAGE_LENGTH:
|
||||
raise RollError("结果过长,请减少骰子数量或简化表达式")
|
||||
return text
|
||||
|
||||
|
||||
def _parse_single_term(raw: str, sign: int, rng: random.Random) -> RollTermResult:
|
||||
dice_match = _DICE_RE.fullmatch(raw)
|
||||
if dice_match:
|
||||
count_text, faces_text = dice_match.groups()
|
||||
count = int(count_text) if count_text else 1
|
||||
if count <= 0:
|
||||
raise RollError("骰子个数必须大于 0")
|
||||
if count > MAX_DICE_COUNT:
|
||||
raise RollError(f"单项最多只能掷 {MAX_DICE_COUNT} 个骰子")
|
||||
|
||||
if faces_text == "%":
|
||||
faces = 100
|
||||
rolls = [rng.randint(1, 100) for _ in range(count)]
|
||||
elif faces_text.lower() == "f":
|
||||
rolls = [rng.choice((-1, 0, 1)) for _ in range(count)]
|
||||
total = sum(rolls) * sign
|
||||
signed = "+" if sign > 0 else "-"
|
||||
return RollTermResult(
|
||||
sign=sign,
|
||||
source=f"{signed}{count}dF",
|
||||
detail="[" + ", ".join(f"{v:+d}" for v in rolls) + "]",
|
||||
value=total,
|
||||
)
|
||||
else:
|
||||
faces = int(faces_text)
|
||||
if faces <= 0:
|
||||
raise RollError("骰子面数必须大于 0")
|
||||
if faces > MAX_DICE_FACES:
|
||||
raise RollError(f"骰子面数不能超过 {MAX_DICE_FACES}")
|
||||
rolls = [rng.randint(1, faces) for _ in range(count)]
|
||||
|
||||
total = sum(rolls) * sign
|
||||
signed = "+" if sign > 0 else "-"
|
||||
return RollTermResult(
|
||||
sign=sign,
|
||||
source=f"{signed}{count}d{faces_text.upper()}",
|
||||
detail="[" + ", ".join(map(str, rolls)) + "]",
|
||||
value=total,
|
||||
)
|
||||
|
||||
value = int(raw) * sign
|
||||
signed = "+" if sign > 0 else "-"
|
||||
return RollTermResult(sign=sign, source=f"{signed}{raw}", detail=str(abs(value)), value=value)
|
||||
|
||||
|
||||
def roll_expression(expr: str, rng: random.Random | None = None) -> RollResult:
|
||||
expr = expr.strip().replace(" ", "")
|
||||
if not expr:
|
||||
raise RollError("请提供要掷的表达式,例如 roll 3d6 或 roll d20+5")
|
||||
if len(expr) > MAX_EXPRESSION_LENGTH:
|
||||
raise RollError("表达式太长了")
|
||||
|
||||
matches = list(_TOKEN_RE.finditer(expr))
|
||||
if not matches:
|
||||
raise RollError("无法解析表达式,请使用如 3d6、d20+5、2d8+1d4+3、4dF 这样的格式")
|
||||
|
||||
rebuilt = "".join(m.group(0) for m in matches)
|
||||
if rebuilt != expr:
|
||||
raise RollError("表达式中含有无法识别的内容")
|
||||
if len(matches) > MAX_TERM_COUNT:
|
||||
raise RollError(f"表达式项数不能超过 {MAX_TERM_COUNT}")
|
||||
|
||||
total_rolls = 0
|
||||
for m in matches:
|
||||
token = m.group(2)
|
||||
dice_match = _DICE_RE.fullmatch(token)
|
||||
if dice_match:
|
||||
count_text, faces_text = dice_match.groups()
|
||||
count = int(count_text) if count_text else 1
|
||||
if count <= 0:
|
||||
raise RollError("骰子个数必须大于 0")
|
||||
if count > MAX_DICE_COUNT:
|
||||
raise RollError(f"单项最多只能掷 {MAX_DICE_COUNT} 个骰子")
|
||||
if faces_text not in {"%", "f", "F"}:
|
||||
faces = int(faces_text)
|
||||
if faces <= 0:
|
||||
raise RollError("骰子面数必须大于 0")
|
||||
if faces > MAX_DICE_FACES:
|
||||
raise RollError(f"骰子面数不能超过 {MAX_DICE_FACES}")
|
||||
total_rolls += count
|
||||
if total_rolls > MAX_TOTAL_ROLLS:
|
||||
raise RollError(f"一次最多只能实际掷 {MAX_TOTAL_ROLLS} 个骰子")
|
||||
|
||||
rng = rng or random.Random()
|
||||
terms: list[RollTermResult] = []
|
||||
total = 0
|
||||
for idx, match in enumerate(matches):
|
||||
sign_text, raw = match.groups()
|
||||
sign = -1 if sign_text == "-" else 1
|
||||
if idx == 0 and sign_text == "":
|
||||
sign = 1
|
||||
term = _parse_single_term(raw, sign, rng)
|
||||
terms.append(term)
|
||||
total += term.value
|
||||
|
||||
return RollResult(expression=expr, total=total, terms=terms)
|
||||
115
konabot/plugins/wolfx_eew.py
Normal file
115
konabot/plugins/wolfx_eew.py
Normal file
@ -0,0 +1,115 @@
|
||||
import datetime
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from konabot.common.apis.wolfx import CencEewReport, CencEqReport, wolfx_api
|
||||
from konabot.common.subscribe import PosterInfo, broadcast, register_poster_info
|
||||
|
||||
|
||||
provinces_short = [
|
||||
"北京",
|
||||
"天津",
|
||||
"河北",
|
||||
"山西",
|
||||
"内蒙古",
|
||||
"辽宁",
|
||||
"吉林",
|
||||
"黑龙江",
|
||||
"上海",
|
||||
"江苏",
|
||||
"浙江",
|
||||
"安徽",
|
||||
"福建",
|
||||
"江西",
|
||||
"山东",
|
||||
"河南",
|
||||
"湖北",
|
||||
"湖南",
|
||||
"广东",
|
||||
"广西",
|
||||
"海南",
|
||||
"重庆",
|
||||
"四川",
|
||||
"贵州",
|
||||
"云南",
|
||||
"西藏",
|
||||
"陕西",
|
||||
"甘肃",
|
||||
"青海",
|
||||
"宁夏",
|
||||
"新疆",
|
||||
"香港",
|
||||
"澳门",
|
||||
"台湾",
|
||||
]
|
||||
|
||||
|
||||
register_poster_info(
|
||||
"中国地震台网地震速报",
|
||||
PosterInfo(
|
||||
aliases={
|
||||
"地震速报",
|
||||
"地震预警",
|
||||
},
|
||||
description="来自中国地震台网的地震速报",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
CENC_EEW_DISABLED = True
|
||||
|
||||
|
||||
if not CENC_EEW_DISABLED:
|
||||
|
||||
@wolfx_api.cenc_eew.append
|
||||
async def broadcast_eew(report: CencEewReport):
|
||||
# 这个好像没那么准确...
|
||||
is_cn = any(report.HypoCenter.startswith(prefix) for prefix in provinces_short)
|
||||
if (is_cn and report.Magnitude >= 4.2) or (
|
||||
(not is_cn) and report.Magnitude >= 7.0
|
||||
):
|
||||
# 这是中国地震台网网站上,会默认展示的地震信息的等级
|
||||
origin_time_dt = datetime.datetime.strptime(
|
||||
report.OriginTime, "%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
origin_time_str = (
|
||||
f"{origin_time_dt.month}月"
|
||||
f"{origin_time_dt.day}日"
|
||||
f"{origin_time_dt.hour}时"
|
||||
f"{origin_time_dt.minute}分"
|
||||
)
|
||||
|
||||
# vvv 下面这个其实不准确
|
||||
eid_in_link = report.EventID.split(".")[0]
|
||||
link = f"https://www.cenc.ac.cn/earthquake-manage-publish-web/product-list/{eid_in_link}/summarize"
|
||||
|
||||
msg = UniMessage.text(
|
||||
"据中国地震台网中心 (https://www.cenc.ac.cn/) 报道,"
|
||||
f"北京时间{origin_time_str},"
|
||||
f"{report.HypoCenter}发生{report.Magnitude:.1f}级地震。"
|
||||
f"震源位于 {report.Longitude}° {report.Latitude}°,深度 {report.Depth}km。\n\n"
|
||||
f"详细信息请见 {link}"
|
||||
)
|
||||
await broadcast("中国地震台网地震速报", msg)
|
||||
|
||||
|
||||
@wolfx_api.cenc_eqlist.append
|
||||
async def broadcast_cenc_eqlist(report: CencEqReport):
|
||||
is_cn = any(report.location.startswith(prefix) for prefix in provinces_short)
|
||||
if (is_cn and float(report.magnitude) >= 4.2) or (
|
||||
(not is_cn) and float(report.magnitude) >= 7.0
|
||||
):
|
||||
origin_time_dt = datetime.datetime.strptime(report.time, "%Y-%m-%d %H:%M:%S")
|
||||
origin_time_str = (
|
||||
f"{origin_time_dt.month}月"
|
||||
f"{origin_time_dt.day}日"
|
||||
f"{origin_time_dt.hour}时"
|
||||
f"{origin_time_dt.minute}分"
|
||||
)
|
||||
|
||||
msg = UniMessage.text(
|
||||
"据中国地震台网中心 (https://www.cenc.ac.cn/) 消息,"
|
||||
f"北京时间{origin_time_str},"
|
||||
f"{report.location}发生{report.magnitude}级地震。"
|
||||
f"震源位于 {report.longtitude}° {report.latitude}°,深度 {report.depth}km。\n\n"
|
||||
f"数据来源于 Wolfx OpenAPI,事件 ID: {report.EventID}"
|
||||
)
|
||||
await broadcast("中国地震台网地震速报", msg)
|
||||
25
poetry.lock
generated
25
poetry.lock
generated
@ -4067,6 +4067,29 @@ type = "legacy"
|
||||
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
|
||||
reference = "mirrors"
|
||||
|
||||
[[package]]
|
||||
name = "pytest-mock"
|
||||
version = "3.15.1"
|
||||
description = "Thin-wrapper around the mock package for easier use with pytest"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "pytest_mock-3.15.1-py3-none-any.whl", hash = "sha256:0a25e2eb88fe5168d535041d09a4529a188176ae608a6d249ee65abc0949630d"},
|
||||
{file = "pytest_mock-3.15.1.tar.gz", hash = "sha256:1849a238f6f396da19762269de72cb1814ab44416fa73a8686deac10b0d87a0f"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
pytest = ">=6.2.5"
|
||||
|
||||
[package.extras]
|
||||
dev = ["pre-commit", "pytest-asyncio", "tox"]
|
||||
|
||||
[package.source]
|
||||
type = "legacy"
|
||||
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
|
||||
reference = "mirrors"
|
||||
|
||||
[[package]]
|
||||
name = "python-dotenv"
|
||||
version = "1.2.2"
|
||||
@ -5311,4 +5334,4 @@ reference = "mirrors"
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.12,<4.0"
|
||||
content-hash = "f2d5345d93636e19e49852af636d598c88394aaef4020f383402394b58452e3d"
|
||||
content-hash = "23d2eadd1c36d017ff77934bd02b56e37d26005e6a99793b623c01162b90d67d"
|
||||
|
||||
@ -37,6 +37,8 @@ dependencies = [
|
||||
"pytest (>=8.0.0,<9.0.0)",
|
||||
"nonebug (>=0.4.3,<0.5.0)",
|
||||
"pytest-cov (>=7.0.0,<8.0.0)",
|
||||
"aiosignal (>=1.4.0,<2.0.0)",
|
||||
"pytest-mock (>=3.15.1,<4.0.0)",
|
||||
]
|
||||
|
||||
[tool.poetry]
|
||||
|
||||
@ -12,8 +12,22 @@ def filter(change: Change, path: str) -> bool:
|
||||
return False
|
||||
if Path(path).absolute().is_relative_to((base / ".git").absolute()):
|
||||
return False
|
||||
if Path(path).absolute().is_relative_to((base / "assets" / "oracle" / "image").absolute()):
|
||||
if (
|
||||
Path(path)
|
||||
.absolute()
|
||||
.is_relative_to((base / "assets" / "oracle" / "image").absolute())
|
||||
):
|
||||
# 还要解决坏枪的这个问题
|
||||
return False
|
||||
if Path(path).absolute().is_relative_to((base / "htmlcov").absolute()):
|
||||
return False
|
||||
if Path(path).absolute().is_relative_to((base / "test").absolute()):
|
||||
return False
|
||||
if Path(path).absolute().is_relative_to((base / ".pytest_cache").absolute()):
|
||||
return False
|
||||
if Path(path).absolute().is_relative_to((base / ".ruff_cache").absolute()):
|
||||
return False
|
||||
if path.endswith(".coverage"):
|
||||
return False
|
||||
print(path)
|
||||
return True
|
||||
|
||||
78
tests/services/test_wolfx_api.py
Normal file
78
tests/services/test_wolfx_api.py
Normal file
@ -0,0 +1,78 @@
|
||||
import json
|
||||
from unittest.mock import AsyncMock
|
||||
import pytest
|
||||
|
||||
from konabot.common.apis.wolfx import CencEewReport, WolfxAPIService, WolfxWebSocket
|
||||
|
||||
|
||||
obj_example = {
|
||||
"ID": "bacby4yab1oyb",
|
||||
"EventID": "202603100805.0001",
|
||||
"ReportTime": "2026-03-10 08:05:29",
|
||||
"ReportNum": 1,
|
||||
"OriginTime": "2026-03-10 08:05:29",
|
||||
"HypoCenter": "新疆昌吉州呼图壁县",
|
||||
"Latitude": 43.687,
|
||||
"Longitude": 86.427,
|
||||
"Magnitude": 4.0,
|
||||
"Depth": 14,
|
||||
"MaxIntensity": 5,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wolfx_websocket_handle():
|
||||
ws = WolfxWebSocket("")
|
||||
|
||||
mock_callback = AsyncMock()
|
||||
ws.signal.append(mock_callback)
|
||||
ws.signal.freeze()
|
||||
|
||||
obj1 = {
|
||||
"type": "heartbeat",
|
||||
"ver": 18,
|
||||
"id": "a69edf6436c5b605",
|
||||
"timestamp": 1773111106701,
|
||||
}
|
||||
data1 = json.dumps(obj1).encode()
|
||||
await ws.handle(data1)
|
||||
mock_callback.assert_not_called()
|
||||
mock_callback.reset_mock()
|
||||
|
||||
obj2 = obj_example
|
||||
data2 = json.dumps(obj2).encode()
|
||||
await ws.handle(data2)
|
||||
mock_callback.assert_called_once_with(data2)
|
||||
mock_callback.reset_mock()
|
||||
|
||||
data3 = b"what the f"
|
||||
await ws.handle(data3)
|
||||
mock_callback.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wolfx_bind_pydantic():
|
||||
sv = WolfxAPIService()
|
||||
called: list[CencEewReport] = []
|
||||
|
||||
@sv.cenc_eew.append
|
||||
async def _(data: CencEewReport):
|
||||
called.append(data)
|
||||
|
||||
sv._cenc_eew_ws.signal.freeze()
|
||||
sv.cenc_eew.freeze()
|
||||
|
||||
data = json.dumps(obj_example).encode()
|
||||
await sv._cenc_eew_ws.signal.send(data)
|
||||
|
||||
assert len(called) == 1
|
||||
data = called[0]
|
||||
|
||||
assert data.HypoCenter == obj_example["HypoCenter"]
|
||||
assert data.EventID == obj_example["EventID"]
|
||||
|
||||
# Don't panic when the object is invalid
|
||||
data = json.dumps({"type": "给"}).encode()
|
||||
await sv._cenc_eew_ws.signal.send(data)
|
||||
|
||||
assert len(called) == 1
|
||||
88
tests/test_fx_process.py
Normal file
88
tests/test_fx_process.py
Normal file
@ -0,0 +1,88 @@
|
||||
from importlib.util import module_from_spec, spec_from_file_location
|
||||
from pathlib import Path
|
||||
|
||||
import nonebot
|
||||
from PIL import Image
|
||||
|
||||
|
||||
nonebot.init()
|
||||
|
||||
MODULE_PATH = Path(__file__).resolve().parents[1] / "konabot/plugins/fx_process/fx_handle.py"
|
||||
SPEC = spec_from_file_location("test_fx_handle_module", MODULE_PATH)
|
||||
assert SPEC is not None and SPEC.loader is not None
|
||||
fx_handle = module_from_spec(SPEC)
|
||||
SPEC.loader.exec_module(fx_handle)
|
||||
ImageFilterImplement = fx_handle.ImageFilterImplement
|
||||
|
||||
INIT_MODULE_PATH = Path(__file__).resolve().parents[1] / "konabot/plugins/fx_process/__init__.py"
|
||||
INIT_SPEC = spec_from_file_location("test_fx_init_module", INIT_MODULE_PATH)
|
||||
assert INIT_SPEC is not None and INIT_SPEC.loader is not None
|
||||
fx_init = module_from_spec(INIT_SPEC)
|
||||
INIT_SPEC.loader.exec_module(fx_init)
|
||||
prase_input_args = fx_init.prase_input_args
|
||||
|
||||
|
||||
def test_apply_jpeg_damage_keeps_size_and_rgba_mode():
|
||||
image = Image.new("RGBA", (32, 24), (255, 0, 0, 128))
|
||||
|
||||
result = ImageFilterImplement.apply_jpeg_damage(image, 5)
|
||||
|
||||
assert result.size == image.size
|
||||
assert result.mode == "RGBA"
|
||||
assert result.getchannel("A").getextrema() == (128, 128)
|
||||
|
||||
|
||||
def test_apply_jpeg_damage_clamps_quality_range():
|
||||
image = Image.new("RGB", (16, 16), (123, 222, 111))
|
||||
|
||||
low = ImageFilterImplement.apply_jpeg_damage(image, -10)
|
||||
high = ImageFilterImplement.apply_jpeg_damage(image, 999)
|
||||
|
||||
assert low.size == image.size
|
||||
assert high.size == image.size
|
||||
assert low.mode == "RGBA"
|
||||
assert high.mode == "RGBA"
|
||||
|
||||
|
||||
def test_apply_resize_clamps_small_result_to_at_least_one_pixel():
|
||||
image = Image.new("RGBA", (10, 10), (255, 0, 0, 255))
|
||||
|
||||
result = ImageFilterImplement.apply_resize(image, 0.01)
|
||||
|
||||
assert result.size == (1, 1)
|
||||
|
||||
|
||||
def test_apply_resize_negative_x_with_positive_y_only_mirrors_horizontally():
|
||||
image = Image.new("RGBA", (2, 1))
|
||||
image.putpixel((0, 0), (255, 0, 0, 255))
|
||||
image.putpixel((1, 0), (0, 0, 255, 255))
|
||||
|
||||
result = ImageFilterImplement.apply_resize(image, -1, 1)
|
||||
|
||||
assert result.size == (2, 1)
|
||||
assert result.getpixel((0, 0)) == (0, 0, 255, 255)
|
||||
assert result.getpixel((1, 0)) == (255, 0, 0, 255)
|
||||
|
||||
|
||||
def test_apply_resize_negative_scale_without_y_flips_both_axes():
|
||||
image = Image.new("RGBA", (2, 2))
|
||||
image.putpixel((0, 0), (255, 0, 0, 255))
|
||||
image.putpixel((1, 0), (0, 255, 0, 255))
|
||||
image.putpixel((0, 1), (0, 0, 255, 255))
|
||||
image.putpixel((1, 1), (255, 255, 0, 255))
|
||||
|
||||
result = ImageFilterImplement.apply_resize(image, -1)
|
||||
|
||||
assert result.size == (2, 2)
|
||||
assert result.getpixel((0, 0)) == (255, 255, 0, 255)
|
||||
assert result.getpixel((1, 0)) == (0, 0, 255, 255)
|
||||
assert result.getpixel((0, 1)) == (0, 255, 0, 255)
|
||||
assert result.getpixel((1, 1)) == (255, 0, 0, 255)
|
||||
|
||||
|
||||
def test_prase_input_args_parses_resize_second_argument_as_float():
|
||||
filters = prase_input_args("缩放 2 3")
|
||||
|
||||
assert len(filters) == 1
|
||||
assert filters[0].name == "缩放"
|
||||
assert filters[0].args == [2.0, 3.0]
|
||||
40
tests/test_permsys_default_allow.py
Normal file
40
tests/test_permsys_default_allow.py
Normal file
@ -0,0 +1,40 @@
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
import pytest
|
||||
|
||||
from konabot.common.database import DatabaseManager
|
||||
from konabot.common.permsys import PermManager, register_default_allow_permission
|
||||
from konabot.common.permsys.entity import PermEntity
|
||||
from konabot.common.permsys.migrates import execute_migration
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def tempdb():
|
||||
with TemporaryDirectory() as _tempdir:
|
||||
tempdir = Path(_tempdir)
|
||||
db = DatabaseManager(tempdir / "perm.sqlite3")
|
||||
yield db
|
||||
await db.close_all_connections()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_default_allow_permission_records_key():
|
||||
register_default_allow_permission("test.default.allow")
|
||||
|
||||
async with tempdb() as db:
|
||||
async with db.get_conn() as conn:
|
||||
await execute_migration(conn)
|
||||
|
||||
pm = PermManager(db)
|
||||
await pm.update_permission(
|
||||
PermEntity("sys", "global", "global"),
|
||||
"test.default.allow",
|
||||
True,
|
||||
)
|
||||
|
||||
assert await pm.check_has_permission(
|
||||
[PermEntity("dummy", "user", "1"), PermEntity("sys", "global", "global")],
|
||||
"test.default.allow.sub",
|
||||
)
|
||||
66
tests/test_trpg_roll.py
Normal file
66
tests/test_trpg_roll.py
Normal file
@ -0,0 +1,66 @@
|
||||
import random
|
||||
|
||||
import pytest
|
||||
|
||||
from konabot.plugins.trpg_roll.core import RollError, roll_expression
|
||||
|
||||
|
||||
class FakeRandom:
|
||||
def __init__(self, randint_values: list[int] | None = None, choice_values: list[int] | None = None):
|
||||
self._randint_values = list(randint_values or [])
|
||||
self._choice_values = list(choice_values or [])
|
||||
|
||||
def randint(self, _a: int, _b: int) -> int:
|
||||
assert self._randint_values
|
||||
return self._randint_values.pop(0)
|
||||
|
||||
def choice(self, _seq):
|
||||
assert self._choice_values
|
||||
return self._choice_values.pop(0)
|
||||
|
||||
|
||||
def test_roll_expression_basic():
|
||||
rng = FakeRandom(randint_values=[2, 4, 5])
|
||||
result = roll_expression("3d6", rng=rng)
|
||||
|
||||
assert result.total == 11
|
||||
assert result.format() == "3d6 = 11\n+3d6=[2, 4, 5]"
|
||||
|
||||
|
||||
def test_roll_expression_multiple_terms():
|
||||
rng = FakeRandom(randint_values=[14, 3, 1])
|
||||
result = roll_expression("d20+1d4-2", rng=rng)
|
||||
|
||||
assert result.total == 15
|
||||
assert result.format() == "d20+1d4-2 = 15\n+1d20=[14] +1d4=[3] -2=2"
|
||||
|
||||
|
||||
def test_roll_expression_df():
|
||||
rng = FakeRandom(choice_values=[-1, 0, 1, 1])
|
||||
result = roll_expression("4dF", rng=rng)
|
||||
|
||||
assert result.total == 1
|
||||
assert result.format() == "4dF = 1\n+4dF=[-1, +0, +1, +1]"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("expr", "message"),
|
||||
[
|
||||
("", "请提供要掷的表达式"),
|
||||
("abc", "无法解析表达式"),
|
||||
("1d0", "骰子面数必须大于 0"),
|
||||
("0d6", "骰子个数必须大于 0"),
|
||||
("101d6", "单项最多只能掷 100 个骰子"),
|
||||
("1d1001", "骰子面数不能超过 1000"),
|
||||
("201d1", "单项最多只能掷 100 个骰子"),
|
||||
("1d6*2", "表达式中含有无法识别的内容"),
|
||||
],
|
||||
)
|
||||
def test_roll_expression_invalid(expr: str, message: str):
|
||||
with pytest.raises(RollError, match=message):
|
||||
roll_expression(expr, rng=random.Random(0))
|
||||
|
||||
|
||||
def test_roll_expression_total_roll_limit():
|
||||
with pytest.raises(RollError, match="一次最多只能实际掷 200 个骰子"):
|
||||
roll_expression("100d6+100d6+1d6", rng=random.Random(0))
|
||||
Reference in New Issue
Block a user