Compare commits
26 Commits
feature/pe
...
feature/su
| Author | SHA1 | Date | |
|---|---|---|---|
| d37c4870d8 | |||
| 23b9f101b3 | |||
|
8c1651ad3d
|
|||
| ff60642c62 | |||
| 69b5908445 | |||
| a542ed1fd9 | |||
| e86a385448 | |||
| d4bb36a074 | |||
| 1a2a3c0468 | |||
| 67502cb932 | |||
| f9a312b80a | |||
| 1980f8a895 | |||
|
d273ed4b1a
|
|||
|
265e9cc583
|
|||
|
8f5061ba41
|
|||
|
b3c3c77f3c
|
|||
|
6a84ce2cd8
|
|||
|
392c699b33
|
|||
|
72e21cd9aa
|
|||
|
f3389ff2b9
|
|||
|
e59d3c2e4b
|
|||
|
31d19b7ec0
|
|||
|
c2f677911d
|
|||
|
f5b81319f8
|
|||
|
870e2383d8
|
|||
| 7e8fa45f36 |
@ -39,7 +39,7 @@ steps:
|
|||||||
commands:
|
commands:
|
||||||
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
|
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
|
||||||
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py
|
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py
|
||||||
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov-report term-missing:skip-covered
|
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov=./konabot/ --cov-report term-missing:skip-covered
|
||||||
- name: 发送构建结果到 ntfy
|
- name: 发送构建结果到 ntfy
|
||||||
image: parrazam/drone-ntfy
|
image: parrazam/drone-ntfy
|
||||||
when:
|
when:
|
||||||
|
|||||||
10
.gitignore
vendored
10
.gitignore
vendored
@ -3,9 +3,14 @@
|
|||||||
/data
|
/data
|
||||||
/pyrightconfig.json
|
/pyrightconfig.json
|
||||||
/pyrightconfig.toml
|
/pyrightconfig.toml
|
||||||
|
/uv.lock
|
||||||
|
|
||||||
# 缓存文件
|
# 缓存文件
|
||||||
__pycache__
|
__pycache__
|
||||||
|
/.ruff_cache
|
||||||
|
/.pytest_cache
|
||||||
|
/.mypy_cache
|
||||||
|
/.black_cache
|
||||||
|
|
||||||
# 可能会偶然生成的 diff 文件
|
# 可能会偶然生成的 diff 文件
|
||||||
/*.diff
|
/*.diff
|
||||||
@ -14,3 +19,8 @@ __pycache__
|
|||||||
/.coverage
|
/.coverage
|
||||||
/.coverage.db
|
/.coverage.db
|
||||||
/htmlcov
|
/htmlcov
|
||||||
|
|
||||||
|
# 对手动创建虚拟环境的人
|
||||||
|
/.venv
|
||||||
|
/venv
|
||||||
|
*.egg-info
|
||||||
|
|||||||
4
.vscode/settings.json
vendored
4
.vscode/settings.json
vendored
@ -1,3 +1,5 @@
|
|||||||
{
|
{
|
||||||
"python.REPL.enableREPLSmartSend": false
|
"python.REPL.enableREPLSmartSend": false,
|
||||||
|
"python-envs.defaultEnvManager": "ms-python.python:poetry",
|
||||||
|
"python-envs.defaultPackageManager": "ms-python.python:poetry"
|
||||||
}
|
}
|
||||||
@ -61,6 +61,7 @@ COPY bot.py pyproject.toml .env.prod .env.test ./
|
|||||||
COPY assets ./assets
|
COPY assets ./assets
|
||||||
COPY scripts ./scripts
|
COPY scripts ./scripts
|
||||||
COPY konabot ./konabot
|
COPY konabot ./konabot
|
||||||
|
COPY tests ./tests
|
||||||
|
|
||||||
ENV PYTHONPATH=/app
|
ENV PYTHONPATH=/app
|
||||||
|
|
||||||
|
|||||||
5
bot.py
5
bot.py
@ -7,6 +7,7 @@ from nonebot.adapters.discord import Adapter as DiscordAdapter
|
|||||||
from nonebot.adapters.minecraft import Adapter as MinecraftAdapter
|
from nonebot.adapters.minecraft import Adapter as MinecraftAdapter
|
||||||
from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter
|
from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter
|
||||||
|
|
||||||
|
from konabot.common.appcontext import run_afterinit_functions
|
||||||
from konabot.common.log import init_logger
|
from konabot.common.log import init_logger
|
||||||
from konabot.common.nb.exc import BotExceptionMessage
|
from konabot.common.nb.exc import BotExceptionMessage
|
||||||
from konabot.common.path import LOG_PATH
|
from konabot.common.path import LOG_PATH
|
||||||
@ -56,9 +57,7 @@ def main():
|
|||||||
nonebot.load_plugins("konabot/plugins")
|
nonebot.load_plugins("konabot/plugins")
|
||||||
nonebot.load_plugin("nonebot_plugin_analysis_bilibili")
|
nonebot.load_plugin("nonebot_plugin_analysis_bilibili")
|
||||||
|
|
||||||
from konabot.common import permsys
|
run_afterinit_functions()
|
||||||
|
|
||||||
permsys.create_startup()
|
|
||||||
|
|
||||||
# 注册关闭钩子
|
# 注册关闭钩子
|
||||||
@driver.on_shutdown
|
@driver.on_shutdown
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
- `konabot/common/permsys/__init__.py`
|
- `konabot/common/permsys/__init__.py`
|
||||||
- 暴露 `PermManager`、`DepPermManager`、`require_permission`
|
- 暴露 `PermManager`、`DepPermManager`、`require_permission`
|
||||||
- 负责数据库初始化、启动迁移、超级管理员默认授权
|
- 负责数据库初始化、启动迁移、超级管理员默认授权
|
||||||
|
- 提供 `register_default_allow_permission()` 用于注册“启动时默认放行”的权限键
|
||||||
- `konabot/common/permsys/entity.py`
|
- `konabot/common/permsys/entity.py`
|
||||||
- 定义 `PermEntity`
|
- 定义 `PermEntity`
|
||||||
- 将事件转换为可查询的实体链
|
- 将事件转换为可查询的实体链
|
||||||
@ -134,6 +135,14 @@ PermEntity("ob11", "user", str(account)), "*", True
|
|||||||
|
|
||||||
也就是说,配置中的超级管理员会直接拥有全部权限。
|
也就是说,配置中的超级管理员会直接拥有全部权限。
|
||||||
|
|
||||||
|
此外,模块也支持插件在导入阶段通过 `register_default_allow_permission("some.key")` 注册默认放行的权限键;这些键会在启动时被写入到:
|
||||||
|
|
||||||
|
```python
|
||||||
|
PermEntity("sys", "global", "global"), "some.key", True
|
||||||
|
```
|
||||||
|
|
||||||
|
这适合“默认所有人可用,但仍希望后续能被权限系统单独关闭”的功能。
|
||||||
|
|
||||||
这属于启动时自动灌入的保底策略,不依赖手工授权命令。
|
这属于启动时自动灌入的保底策略,不依赖手工授权命令。
|
||||||
|
|
||||||
## 在插件中使用
|
## 在插件中使用
|
||||||
|
|||||||
37
docs/subscribe.md
Normal file
37
docs/subscribe.md
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
# subscribe 模块
|
||||||
|
|
||||||
|
一套统一的接口,让用户可以订阅一些延迟或者定时消息。
|
||||||
|
|
||||||
|
```python
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from konabot.common.subscribe import register_poster_info, broadcast, PosterInfo
|
||||||
|
from nonebot_plugin_alconna import UniMessage
|
||||||
|
|
||||||
|
# 注册了服务信息,用户可以用「查询可用订阅」指令了解可用的订阅清单。
|
||||||
|
# 用户可以使用「订阅 某某服务通知」或者「订阅 某某服务」来订阅消息。
|
||||||
|
# 如果用户在群聊发起订阅,则会在 QQ 群订阅,不然会在私聊订阅
|
||||||
|
register_poster_info("某某服务通知", PosterInfo(
|
||||||
|
aliases={"某某服务"},
|
||||||
|
description="告诉你关于某某的最新资讯等信息",
|
||||||
|
))
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
while True:
|
||||||
|
# 这里的服务 channel 名字必须填写该服务的名字,不可以是 alias
|
||||||
|
# 这会给所有订阅了该通道的用户发送「向大家发送纯文本通知」
|
||||||
|
await broadcast("某某服务通知", "向大家发送纯文本通知")
|
||||||
|
|
||||||
|
# 也可以发送 UniMessage 对象,可以构造包含图片的通知等
|
||||||
|
data = Path('image.png').read_bytes()
|
||||||
|
await broadcast(
|
||||||
|
"某某服务通知",
|
||||||
|
UniMessage.text("很遗憾告诉大家,我们倒闭了:").image(raw=data),
|
||||||
|
)
|
||||||
|
|
||||||
|
await asyncio.sleep(114.514)
|
||||||
|
```
|
||||||
|
|
||||||
|
该模块的代码请查阅 `/konabot/common/subscribe/` 下的文件。
|
||||||
281
konabot/common/apis/wolfx.py
Normal file
281
konabot/common/apis/wolfx.py
Normal file
@ -0,0 +1,281 @@
|
|||||||
|
"""
|
||||||
|
Wolfx 防灾免费 API
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
from typing import Literal, TypeVar, cast
|
||||||
|
import aiohttp
|
||||||
|
from aiosignal import Signal
|
||||||
|
from loguru import logger
|
||||||
|
from pydantic import BaseModel, RootModel
|
||||||
|
import pydantic
|
||||||
|
|
||||||
|
from konabot.common.appcontext import after_init
|
||||||
|
|
||||||
|
|
||||||
|
class ScEewReport(BaseModel):
|
||||||
|
"""
|
||||||
|
四川地震局报文
|
||||||
|
"""
|
||||||
|
|
||||||
|
ID: str
|
||||||
|
"EEW 发报 ID"
|
||||||
|
|
||||||
|
EventID: str
|
||||||
|
"EEW 发报事件 ID"
|
||||||
|
|
||||||
|
ReportTime: str
|
||||||
|
"EEW 发报时间(UTC+8)"
|
||||||
|
|
||||||
|
ReportNum: int
|
||||||
|
"EEW 发报数"
|
||||||
|
|
||||||
|
OriginTime: str
|
||||||
|
"发震时间(UTC+8)"
|
||||||
|
|
||||||
|
HypoCenter: str
|
||||||
|
"震源地"
|
||||||
|
|
||||||
|
Latitude: float
|
||||||
|
"震源地纬度"
|
||||||
|
|
||||||
|
Longitude: float
|
||||||
|
"震源地经度"
|
||||||
|
|
||||||
|
Magnitude: float
|
||||||
|
"震级"
|
||||||
|
|
||||||
|
Depth: float | None
|
||||||
|
"震源深度"
|
||||||
|
|
||||||
|
MaxIntensity: float
|
||||||
|
"最大烈度"
|
||||||
|
|
||||||
|
|
||||||
|
class CencEewReport(BaseModel):
|
||||||
|
"""
|
||||||
|
中国地震台网报文
|
||||||
|
"""
|
||||||
|
|
||||||
|
ID: str
|
||||||
|
"EEW 发报 ID"
|
||||||
|
|
||||||
|
EventID: str
|
||||||
|
"EEW 发报事件 ID"
|
||||||
|
|
||||||
|
ReportTime: str
|
||||||
|
"EEW 发报时间(UTC+8)"
|
||||||
|
|
||||||
|
ReportNum: int
|
||||||
|
"EEW 发报数"
|
||||||
|
|
||||||
|
OriginTime: str
|
||||||
|
"发震时间(UTC+8)"
|
||||||
|
|
||||||
|
HypoCenter: str
|
||||||
|
"震源地"
|
||||||
|
|
||||||
|
Latitude: float
|
||||||
|
"震源地纬度"
|
||||||
|
|
||||||
|
Longitude: float
|
||||||
|
"震源地经度"
|
||||||
|
|
||||||
|
Magnitude: float
|
||||||
|
"震级"
|
||||||
|
|
||||||
|
Depth: float | None
|
||||||
|
"震源深度"
|
||||||
|
|
||||||
|
MaxIntensity: float
|
||||||
|
"最大烈度"
|
||||||
|
|
||||||
|
|
||||||
|
class CencEqReport(BaseModel):
|
||||||
|
type: str
|
||||||
|
"报告类型"
|
||||||
|
|
||||||
|
EventID: str
|
||||||
|
"事件 ID"
|
||||||
|
|
||||||
|
time: str
|
||||||
|
"UTC+8 格式的地震发生时间"
|
||||||
|
|
||||||
|
location: str
|
||||||
|
"地震发生位置"
|
||||||
|
|
||||||
|
magnitude: str
|
||||||
|
"震级"
|
||||||
|
|
||||||
|
depth: str
|
||||||
|
"地震深度"
|
||||||
|
|
||||||
|
latitude: str
|
||||||
|
"纬度"
|
||||||
|
|
||||||
|
longtitude: str
|
||||||
|
"经度"
|
||||||
|
|
||||||
|
intensity: str
|
||||||
|
"烈度"
|
||||||
|
|
||||||
|
|
||||||
|
class CencEqlist(RootModel):
|
||||||
|
root: dict[str, CencEqReport]
|
||||||
|
|
||||||
|
|
||||||
|
class WolfxWebSocket:
|
||||||
|
def __init__(self, url: str) -> None:
|
||||||
|
self.url = url
|
||||||
|
self.signal: Signal[bytes] = Signal(self)
|
||||||
|
self._running = False
|
||||||
|
self._task: asyncio.Task | None = None
|
||||||
|
self._session: aiohttp.ClientSession | None = None
|
||||||
|
self._ws: aiohttp.ClientWebSocketResponse | None = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def session(self) -> aiohttp.ClientSession: # pragma: no cover
|
||||||
|
assert self._session is not None
|
||||||
|
return self._session
|
||||||
|
|
||||||
|
async def start(self): # pragma: no cover
|
||||||
|
if self._running:
|
||||||
|
return
|
||||||
|
self._running = True
|
||||||
|
self._session = aiohttp.ClientSession()
|
||||||
|
self._task = asyncio.create_task(self._run())
|
||||||
|
self.signal.freeze()
|
||||||
|
|
||||||
|
async def stop(self): # pragma: no cover
|
||||||
|
self._running = False
|
||||||
|
if self._task:
|
||||||
|
self._task.cancel()
|
||||||
|
try:
|
||||||
|
await self._task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
if self._session:
|
||||||
|
await self._session.close()
|
||||||
|
|
||||||
|
async def _run(self): # pragma: no cover
|
||||||
|
retry_delay = 1
|
||||||
|
|
||||||
|
while self._running:
|
||||||
|
try:
|
||||||
|
async with self.session.ws_connect(self.url) as ws:
|
||||||
|
self._ws = ws
|
||||||
|
logger.info(f"Wolfx API 服务连接上了 {self.url} 的 WebSocket")
|
||||||
|
async for msg in ws:
|
||||||
|
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||||
|
await self.handle(cast(str, msg.data).encode())
|
||||||
|
elif msg.type == aiohttp.WSMsgType.BINARY:
|
||||||
|
await self.handle(cast(bytes, msg.data))
|
||||||
|
elif msg.type == aiohttp.WSMsgType.CLOSED:
|
||||||
|
break
|
||||||
|
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||||||
|
break
|
||||||
|
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
|
||||||
|
logger.warning("连接 WebSocket 时发生错误")
|
||||||
|
logger.exception(e)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Wolfx API 发生未知错误")
|
||||||
|
logger.exception(e)
|
||||||
|
self._ws = None
|
||||||
|
|
||||||
|
if self._running:
|
||||||
|
logger.info(f"Wolfx API 准备断线重连 {self.url}")
|
||||||
|
await asyncio.sleep(retry_delay)
|
||||||
|
retry_delay = min(retry_delay * 2, 60)
|
||||||
|
|
||||||
|
async def handle(self, data: bytes):
|
||||||
|
try:
|
||||||
|
obj = json.loads(data)
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
logger.warning("解析 Wolfs API 时出错")
|
||||||
|
logger.exception(e)
|
||||||
|
return
|
||||||
|
|
||||||
|
if obj.get("type") == "heartbeat" or obj.get("type") == "pong":
|
||||||
|
logger.debug(f"Wolfx API 收到了来自 {self.url} 的心跳: {obj}")
|
||||||
|
else:
|
||||||
|
await self.signal.send(data)
|
||||||
|
|
||||||
|
|
||||||
|
T = TypeVar("T", bound=BaseModel)
|
||||||
|
|
||||||
|
|
||||||
|
class WolfxAPIService:
|
||||||
|
sc_eew: Signal[ScEewReport]
|
||||||
|
"四川地震局地震速报"
|
||||||
|
|
||||||
|
cenc_eew: Signal[CencEewReport]
|
||||||
|
"中国地震台网地震速报"
|
||||||
|
|
||||||
|
cenc_eqlist: Signal[CencEqReport]
|
||||||
|
"中国地震台网地震信息发布"
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.sc_eew = Signal(self)
|
||||||
|
self._sc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/sc_eew")
|
||||||
|
WolfxAPIService.bind(self.sc_eew, self._sc_eew_ws, ScEewReport)
|
||||||
|
|
||||||
|
self.cenc_eew = Signal(self)
|
||||||
|
self._cenc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/cenc_eew")
|
||||||
|
WolfxAPIService.bind(self.cenc_eew, self._cenc_eew_ws, CencEewReport)
|
||||||
|
|
||||||
|
self.cenc_eqlist = Signal(self)
|
||||||
|
self._cenc_eqlist_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/cenc_eqlist")
|
||||||
|
WolfxAPIService.bind(self.cenc_eqlist, self._cenc_eqlist_ws, CencEqReport)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def bind(signal: Signal[T], ws: WolfxWebSocket, t: type[T]):
|
||||||
|
@ws.signal.append
|
||||||
|
async def _(data: bytes):
|
||||||
|
try:
|
||||||
|
obj = t.model_validate_json(data)
|
||||||
|
logger.info(f"接收到来自 Wolfx API 的信息:{data}")
|
||||||
|
await signal.send(obj)
|
||||||
|
except pydantic.ValidationError as e:
|
||||||
|
logger.warning(f"解析 Wolfx API 时出错 URL={ws.url}")
|
||||||
|
logger.error(e)
|
||||||
|
|
||||||
|
async def start(self): # pragma: no cover
|
||||||
|
self.cenc_eew.freeze()
|
||||||
|
self.sc_eew.freeze()
|
||||||
|
self.cenc_eqlist.freeze()
|
||||||
|
async with asyncio.TaskGroup() as task_group:
|
||||||
|
if len(self.cenc_eew) > 0:
|
||||||
|
task_group.create_task(self._cenc_eew_ws.start())
|
||||||
|
|
||||||
|
if len(self.sc_eew) > 0:
|
||||||
|
task_group.create_task(self._sc_eew_ws.start())
|
||||||
|
|
||||||
|
if len(self.cenc_eqlist) > 0:
|
||||||
|
task_group.create_task(self._cenc_eqlist_ws.start())
|
||||||
|
|
||||||
|
async def stop(self): # pragma: no cover
|
||||||
|
async with asyncio.TaskGroup() as task_group:
|
||||||
|
task_group.create_task(self._cenc_eew_ws.stop())
|
||||||
|
task_group.create_task(self._sc_eew_ws.stop())
|
||||||
|
task_group.create_task(self._cenc_eqlist_ws.stop())
|
||||||
|
|
||||||
|
|
||||||
|
wolfx_api = WolfxAPIService()
|
||||||
|
|
||||||
|
|
||||||
|
@after_init
|
||||||
|
def init(): # pragma: no cover
|
||||||
|
import nonebot
|
||||||
|
|
||||||
|
driver = nonebot.get_driver()
|
||||||
|
|
||||||
|
@driver.on_startup
|
||||||
|
async def _():
|
||||||
|
await wolfx_api.start()
|
||||||
|
|
||||||
|
@driver.on_shutdown
|
||||||
|
async def _():
|
||||||
|
await wolfx_api.stop()
|
||||||
15
konabot/common/appcontext.py
Normal file
15
konabot/common/appcontext.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
from typing import Any, Callable
|
||||||
|
|
||||||
|
|
||||||
|
AFTER_INIT_FUNCTION = Callable[[], Any]
|
||||||
|
|
||||||
|
_after_init_functions: list[AFTER_INIT_FUNCTION] = []
|
||||||
|
|
||||||
|
|
||||||
|
def after_init(func: AFTER_INIT_FUNCTION):
|
||||||
|
_after_init_functions.append(func)
|
||||||
|
|
||||||
|
|
||||||
|
def run_afterinit_functions(): # pragma: no cover
|
||||||
|
for f in _after_init_functions:
|
||||||
|
f()
|
||||||
@ -4,6 +4,7 @@ from nonebot.adapters import Event
|
|||||||
from nonebot.params import Depends
|
from nonebot.params import Depends
|
||||||
from nonebot.rule import Rule
|
from nonebot.rule import Rule
|
||||||
|
|
||||||
|
from konabot.common.appcontext import after_init
|
||||||
from konabot.common.database import DatabaseManager
|
from konabot.common.database import DatabaseManager
|
||||||
from konabot.common.pager import PagerQuery
|
from konabot.common.pager import PagerQuery
|
||||||
from konabot.common.path import DATA_PATH
|
from konabot.common.path import DATA_PATH
|
||||||
@ -13,6 +14,7 @@ from konabot.common.permsys.repo import PermRepo
|
|||||||
|
|
||||||
|
|
||||||
db = DatabaseManager(DATA_PATH / "perm.sqlite3")
|
db = DatabaseManager(DATA_PATH / "perm.sqlite3")
|
||||||
|
_default_allow_permissions: set[str] = set()
|
||||||
|
|
||||||
|
|
||||||
_EntityLike = Event | PermEntity | list[PermEntity]
|
_EntityLike = Event | PermEntity | list[PermEntity]
|
||||||
@ -73,6 +75,7 @@ def perm_manager(_db: DatabaseManager | None = None) -> PermManager: # pragma:
|
|||||||
return PermManager(_db)
|
return PermManager(_db)
|
||||||
|
|
||||||
|
|
||||||
|
@after_init
|
||||||
def create_startup(): # pragma: no cover
|
def create_startup(): # pragma: no cover
|
||||||
from konabot.common.nb.is_admin import cfg
|
from konabot.common.nb.is_admin import cfg
|
||||||
|
|
||||||
@ -89,6 +92,10 @@ def create_startup(): # pragma: no cover
|
|||||||
await pm.update_permission(
|
await pm.update_permission(
|
||||||
PermEntity("ob11", "user", str(account)), "*", True
|
PermEntity("ob11", "user", str(account)), "*", True
|
||||||
)
|
)
|
||||||
|
for key in _default_allow_permissions:
|
||||||
|
await pm.update_permission(
|
||||||
|
PermEntity("sys", "global", "global"), key, True
|
||||||
|
)
|
||||||
|
|
||||||
@driver.on_shutdown
|
@driver.on_shutdown
|
||||||
async def _():
|
async def _():
|
||||||
@ -101,6 +108,10 @@ def create_startup(): # pragma: no cover
|
|||||||
DepPermManager = Annotated[PermManager, Depends(perm_manager)]
|
DepPermManager = Annotated[PermManager, Depends(perm_manager)]
|
||||||
|
|
||||||
|
|
||||||
|
def register_default_allow_permission(key: str):
|
||||||
|
_default_allow_permissions.add(key)
|
||||||
|
|
||||||
|
|
||||||
def require_permission(perm: str) -> Rule: # pragma: no cover
|
def require_permission(perm: str) -> Rule: # pragma: no cover
|
||||||
async def check_permission(event: Event, pm: DepPermManager) -> bool:
|
async def check_permission(event: Event, pm: DepPermManager) -> bool:
|
||||||
return await pm.check_has_permission(event, perm)
|
return await pm.check_has_permission(event, perm)
|
||||||
|
|||||||
11
konabot/common/subscribe/__init__.py
Normal file
11
konabot/common/subscribe/__init__.py
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
"""
|
||||||
|
Subscribe 模块,用于向一些订阅的频道广播消息
|
||||||
|
"""
|
||||||
|
|
||||||
|
from .service import broadcast as broadcast
|
||||||
|
from .service import dep_poster_service as dep_poster_service
|
||||||
|
from .service import DepPosterService as DepPosterService
|
||||||
|
from .service import PosterService as PosterService
|
||||||
|
from .subscribe_info import PosterInfo as PosterInfo
|
||||||
|
from .subscribe_info import POSTER_INFO_DATA as POSTER_INFO_DATA
|
||||||
|
from .subscribe_info import register_poster_info as register_poster_info
|
||||||
@ -6,7 +6,8 @@ from pydantic import BaseModel, ValidationError
|
|||||||
from konabot.common.longtask import LongTaskTarget
|
from konabot.common.longtask import LongTaskTarget
|
||||||
from konabot.common.pager import PagerQuery, PagerResult
|
from konabot.common.pager import PagerQuery, PagerResult
|
||||||
from konabot.common.path import DATA_PATH
|
from konabot.common.path import DATA_PATH
|
||||||
from konabot.plugins.poster.repository import IPosterRepo
|
|
||||||
|
from .repository import IPosterRepo
|
||||||
|
|
||||||
|
|
||||||
class ChannelData(BaseModel):
|
class ChannelData(BaseModel):
|
||||||
@ -18,9 +19,9 @@ class PosterData(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
def is_the_same_target(target1: LongTaskTarget, target2: LongTaskTarget) -> bool:
|
def is_the_same_target(target1: LongTaskTarget, target2: LongTaskTarget) -> bool:
|
||||||
if (target1.is_private_chat and not target2.is_private_chat):
|
if target1.is_private_chat and not target2.is_private_chat:
|
||||||
return False
|
return False
|
||||||
if (target2.is_private_chat and not target1.is_private_chat):
|
if target2.is_private_chat and not target1.is_private_chat:
|
||||||
return False
|
return False
|
||||||
if target1.platform != target2.platform:
|
if target1.platform != target2.platform:
|
||||||
return False
|
return False
|
||||||
@ -58,7 +59,9 @@ class LocalPosterRepo(IPosterRepo):
|
|||||||
len1 = len(self.data.channels[channel].targets)
|
len1 = len(self.data.channels[channel].targets)
|
||||||
return len0 != len1
|
return len0 != len1
|
||||||
|
|
||||||
async def get_subscribed_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
|
async def get_subscribed_channels(
|
||||||
|
self, target: LongTaskTarget, pager: PagerQuery
|
||||||
|
) -> PagerResult[str]:
|
||||||
channels: list[str] = []
|
channels: list[str] = []
|
||||||
for channel_id, channel in self.data.channels.items():
|
for channel_id, channel in self.data.channels.items():
|
||||||
for t in channel.targets:
|
for t in channel.targets:
|
||||||
@ -95,7 +98,9 @@ async def local_poster_data():
|
|||||||
data = PosterData()
|
data = PosterData()
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
data = PosterData.model_validate_json(LOCAL_POSTER_DATA_PATH.read_text())
|
data = PosterData.model_validate_json(
|
||||||
|
LOCAL_POSTER_DATA_PATH.read_text()
|
||||||
|
)
|
||||||
except ValidationError:
|
except ValidationError:
|
||||||
data = PosterData()
|
data = PosterData()
|
||||||
yield data
|
yield data
|
||||||
@ -109,4 +114,3 @@ async def local_poster():
|
|||||||
|
|
||||||
|
|
||||||
DepLocalPosterRepo = Annotated[LocalPosterRepo, Depends(local_poster)]
|
DepLocalPosterRepo = Annotated[LocalPosterRepo, Depends(local_poster)]
|
||||||
|
|
||||||
@ -4,9 +4,10 @@ from nonebot.params import Depends
|
|||||||
from nonebot_plugin_alconna import UniMessage
|
from nonebot_plugin_alconna import UniMessage
|
||||||
from konabot.common.longtask import LongTaskTarget
|
from konabot.common.longtask import LongTaskTarget
|
||||||
from konabot.common.pager import PagerQuery, PagerResult
|
from konabot.common.pager import PagerQuery, PagerResult
|
||||||
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
|
|
||||||
from konabot.plugins.poster.repo_local_data import local_poster
|
from .subscribe_info import POSTER_INFO_DATA
|
||||||
from konabot.plugins.poster.repository import IPosterRepo
|
from .repo_local_data import local_poster
|
||||||
|
from .repository import IPosterRepo
|
||||||
|
|
||||||
|
|
||||||
class PosterService:
|
class PosterService:
|
||||||
@ -27,7 +28,9 @@ class PosterService:
|
|||||||
channel = self.parse_channel_id(channel)
|
channel = self.parse_channel_id(channel)
|
||||||
return await self.repo.remove_channel_target(channel, target)
|
return await self.repo.remove_channel_target(channel, target)
|
||||||
|
|
||||||
async def broadcast(self, channel: str, message: UniMessage[Any] | str) -> list[LongTaskTarget]:
|
async def broadcast(
|
||||||
|
self, channel: str, message: UniMessage[Any] | str
|
||||||
|
) -> list[LongTaskTarget]:
|
||||||
channel = self.parse_channel_id(channel)
|
channel = self.parse_channel_id(channel)
|
||||||
targets = await self.repo.get_channel_targets(channel)
|
targets = await self.repo.get_channel_targets(channel)
|
||||||
for target in targets:
|
for target in targets:
|
||||||
@ -35,7 +38,9 @@ class PosterService:
|
|||||||
await target.send_message(message, at=False)
|
await target.send_message(message, at=False)
|
||||||
return targets
|
return targets
|
||||||
|
|
||||||
async def get_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
|
async def get_channels(
|
||||||
|
self, target: LongTaskTarget, pager: PagerQuery
|
||||||
|
) -> PagerResult[str]:
|
||||||
return await self.repo.get_subscribed_channels(target, pager)
|
return await self.repo.get_subscribed_channels(target, pager)
|
||||||
|
|
||||||
async def fix_data(self):
|
async def fix_data(self):
|
||||||
@ -56,4 +61,3 @@ async def broadcast(channel: str, message: UniMessage[Any] | str):
|
|||||||
|
|
||||||
|
|
||||||
DepPosterService = Annotated[PosterService, Depends(dep_poster_service)]
|
DepPosterService = Annotated[PosterService, Depends(dep_poster_service)]
|
||||||
|
|
||||||
@ -4,7 +4,7 @@ from dataclasses import dataclass, field
|
|||||||
@dataclass
|
@dataclass
|
||||||
class PosterInfo:
|
class PosterInfo:
|
||||||
aliases: set[str] = field(default_factory=set)
|
aliases: set[str] = field(default_factory=set)
|
||||||
description: str = field(default='')
|
description: str = field(default="")
|
||||||
|
|
||||||
|
|
||||||
POSTER_INFO_DATA: dict[str, PosterInfo] = {}
|
POSTER_INFO_DATA: dict[str, PosterInfo] = {}
|
||||||
@ -12,4 +12,3 @@ POSTER_INFO_DATA: dict[str, PosterInfo] = {}
|
|||||||
|
|
||||||
def register_poster_info(channel: str, info: PosterInfo):
|
def register_poster_info(channel: str, info: PosterInfo):
|
||||||
POSTER_INFO_DATA[channel] = info
|
POSTER_INFO_DATA[channel] = info
|
||||||
|
|
||||||
@ -76,6 +76,8 @@ fx [滤镜名称] <参数1> <参数2> ...
|
|||||||
* ```fx 设置遮罩```
|
* ```fx 设置遮罩```
|
||||||
* ```fx 色键 <目标颜色="rgb(255,0,0)"> <容差=60>```
|
* ```fx 色键 <目标颜色="rgb(255,0,0)"> <容差=60>```
|
||||||
* ```fx 晃动 <最大偏移量=5> <运动模糊=False>```
|
* ```fx 晃动 <最大偏移量=5> <运动模糊=False>```
|
||||||
|
* ```fx JPEG损坏 <质量=10>```
|
||||||
|
* 质量范围建议为 1~95,数值越低,压缩痕迹越重、效果越搞笑。
|
||||||
* ```fx 动图 <帧率=10>```
|
* ```fx 动图 <帧率=10>```
|
||||||
|
|
||||||
### 多图像处理器
|
### 多图像处理器
|
||||||
|
|||||||
53
konabot/docs/user/roll.txt
Normal file
53
konabot/docs/user/roll.txt
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
**roll** - 面向跑团的文本骰子指令
|
||||||
|
|
||||||
|
## 用法
|
||||||
|
|
||||||
|
`roll 表达式`
|
||||||
|
|
||||||
|
支持常见骰子写法:
|
||||||
|
|
||||||
|
- `roll 3d6`
|
||||||
|
- `roll d20+5`
|
||||||
|
- `roll 2d8+1d4+3`
|
||||||
|
- `roll d%`
|
||||||
|
- `roll 4dF`
|
||||||
|
|
||||||
|
## 说明
|
||||||
|
|
||||||
|
- `NdM` 表示掷 N 个 M 面骰,例如 `3d6`
|
||||||
|
- `d20` 等价于 `1d20`
|
||||||
|
- `d%` 表示百分骰,范围 1 到 100
|
||||||
|
- `dF` 表示 Fate/Fudge 骰,单骰结果为 -1、0、+1
|
||||||
|
- 支持用 `+`、`-` 连接多个项,也支持常数修正
|
||||||
|
|
||||||
|
## 返回格式
|
||||||
|
|
||||||
|
会返回总结果,以及每一项的明细。
|
||||||
|
|
||||||
|
例如:
|
||||||
|
|
||||||
|
- `roll 3d6`
|
||||||
|
可能返回:
|
||||||
|
- `3d6 = 11`
|
||||||
|
- `+3d6=[2, 4, 5]`
|
||||||
|
|
||||||
|
- `roll d20+5`
|
||||||
|
可能返回:
|
||||||
|
- `d20+5 = 19`
|
||||||
|
- `+1d20=[14] +5=5`
|
||||||
|
|
||||||
|
## 限制
|
||||||
|
|
||||||
|
为防止刷屏和滥用,当前实现会限制:
|
||||||
|
|
||||||
|
- 单项最多 100 个骰子
|
||||||
|
- 单个骰子最多 1000 面
|
||||||
|
- 一次表达式最多 20 项
|
||||||
|
- 一次表达式最多实际掷 200 个骰子
|
||||||
|
- 结果过长时会直接拒绝
|
||||||
|
|
||||||
|
## 权限
|
||||||
|
|
||||||
|
需要 `trpg.roll` 权限。
|
||||||
|
|
||||||
|
默认启动时会给系统全局授予允许,因此通常所有人都能用;如有需要可再用权限系统单独关闭。
|
||||||
@ -1,12 +1,14 @@
|
|||||||
import re
|
import re
|
||||||
|
|
||||||
from nonebot import get_plugin_config, on_message
|
from nonebot import get_plugin_config, on_message
|
||||||
|
from nonebot.rule import Rule
|
||||||
from nonebot_plugin_alconna import Reference, Reply, UniMsg
|
from nonebot_plugin_alconna import Reference, Reply, UniMsg
|
||||||
|
|
||||||
from nonebot.adapters import Event
|
from nonebot.adapters import Event
|
||||||
from nonebot.adapters.onebot.v11.event import GroupMessageEvent as OB11GroupEvent
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from konabot.common.permsys import require_permission
|
||||||
|
|
||||||
|
|
||||||
class Config(BaseModel):
|
class Config(BaseModel):
|
||||||
bilifetch_enabled_groups: list[int] = []
|
bilifetch_enabled_groups: list[int] = []
|
||||||
@ -19,11 +21,7 @@ pattern = (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _rule(msg: UniMsg, evt: Event) -> bool:
|
def _rule(msg: UniMsg) -> bool:
|
||||||
if isinstance(evt, OB11GroupEvent):
|
|
||||||
if evt.group_id not in config.bilifetch_enabled_groups:
|
|
||||||
return False
|
|
||||||
|
|
||||||
to_search = msg.exclude(Reply, Reference).dump(json=True)
|
to_search = msg.exclude(Reply, Reference).dump(json=True)
|
||||||
to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
|
to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
|
||||||
if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
|
if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
|
||||||
@ -31,11 +29,11 @@ def _rule(msg: UniMsg, evt: Event) -> bool:
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
matcher_fix = on_message(rule=_rule)
|
matcher_fix = on_message(rule=Rule(_rule) & require_permission("bilifetch"))
|
||||||
|
|
||||||
|
|
||||||
@matcher_fix.handle()
|
@matcher_fix.handle()
|
||||||
async def _(event: Event):
|
async def _(event: Event):
|
||||||
from nonebot_plugin_analysis_bilibili import handle_analysis
|
from nonebot_plugin_analysis_bilibili import handle_analysis
|
||||||
|
|
||||||
await handle_analysis(event)
|
await handle_analysis(event)
|
||||||
|
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
import random
|
import random
|
||||||
|
from io import BytesIO
|
||||||
from PIL import Image, ImageFilter, ImageDraw, ImageStat, ImageFont
|
from PIL import Image, ImageFilter, ImageDraw, ImageStat, ImageFont
|
||||||
from PIL import ImageEnhance
|
from PIL import ImageEnhance
|
||||||
from PIL import ImageChops
|
from PIL import ImageChops
|
||||||
@ -167,26 +168,53 @@ class ImageFilterImplement:
|
|||||||
|
|
||||||
return Image.fromarray(result, 'RGBA')
|
return Image.fromarray(result, 'RGBA')
|
||||||
|
|
||||||
|
# JPEG 损坏感压缩
|
||||||
|
@staticmethod
|
||||||
|
def apply_jpeg_damage(image: Image.Image, quality: int = 10) -> Image.Image:
|
||||||
|
quality = max(1, min(95, int(quality)))
|
||||||
|
|
||||||
|
alpha = None
|
||||||
|
if image.mode in ('RGBA', 'LA') or (image.mode == 'P' and 'transparency' in image.info):
|
||||||
|
rgba_image = image.convert('RGBA')
|
||||||
|
alpha = rgba_image.getchannel('A')
|
||||||
|
rgb_image = Image.new('RGB', rgba_image.size, (255, 255, 255))
|
||||||
|
rgb_image.paste(rgba_image, mask=alpha)
|
||||||
|
else:
|
||||||
|
rgb_image = image.convert('RGB')
|
||||||
|
|
||||||
|
output = BytesIO()
|
||||||
|
rgb_image.save(output, format='JPEG', quality=quality, optimize=False)
|
||||||
|
output.seek(0)
|
||||||
|
damaged = Image.open(output).convert('RGB')
|
||||||
|
|
||||||
|
if alpha is not None:
|
||||||
|
return Image.merge('RGBA', (*damaged.split(), alpha))
|
||||||
|
return damaged.convert('RGBA')
|
||||||
|
|
||||||
# 缩放
|
# 缩放
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def apply_resize(image: Image.Image, scale: float = 1.5, scale_y = None) -> Image.Image:
|
def apply_resize(image: Image.Image, scale: float = 1.5, scale_y: float = None) -> Image.Image:
|
||||||
# scale 可以为负
|
scale_x = float(scale)
|
||||||
# 如果 scale 为负,则代表翻转
|
scale_y_value = float(scale_y) if scale_y is not None else None
|
||||||
if scale_y is not None:
|
|
||||||
if float(scale_y) < 0:
|
if scale_y_value is not None:
|
||||||
|
if scale_y_value < 0:
|
||||||
image = ImageOps.flip(image)
|
image = ImageOps.flip(image)
|
||||||
scale_y = abs(float(scale_y))
|
scale_y_value = abs(scale_y_value)
|
||||||
if scale < 0:
|
if scale_x < 0:
|
||||||
image = ImageOps.mirror(image)
|
image = ImageOps.mirror(image)
|
||||||
scale = abs(scale)
|
scale_x = abs(scale_x)
|
||||||
new_size = (int(image.width * scale), int(image.height * float(scale_y)))
|
target_scale_y = scale_y_value
|
||||||
return image.resize(new_size, Image.Resampling.LANCZOS)
|
else:
|
||||||
if scale < 0:
|
if scale_x < 0:
|
||||||
image = ImageOps.mirror(image)
|
image = ImageOps.mirror(image)
|
||||||
image = ImageOps.flip(image)
|
image = ImageOps.flip(image)
|
||||||
scale = abs(scale)
|
scale_x = abs(scale_x)
|
||||||
new_size = (int(image.width * scale), int(image.height * scale))
|
target_scale_y = scale_x
|
||||||
return image.resize(new_size, Image.Resampling.LANCZOS)
|
|
||||||
|
new_width = max(1, round(image.width * scale_x))
|
||||||
|
new_height = max(1, round(image.height * target_scale_y))
|
||||||
|
return image.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
||||||
|
|
||||||
# 波纹
|
# 波纹
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|||||||
@ -50,6 +50,7 @@ class ImageFilterManager:
|
|||||||
"描边": ImageFilterImplement.apply_stroke,
|
"描边": ImageFilterImplement.apply_stroke,
|
||||||
"形状描边": ImageFilterImplement.apply_shape_stroke,
|
"形状描边": ImageFilterImplement.apply_shape_stroke,
|
||||||
"半调": ImageFilterImplement.apply_halftone,
|
"半调": ImageFilterImplement.apply_halftone,
|
||||||
|
"JPEG损坏": ImageFilterImplement.apply_jpeg_damage,
|
||||||
"设置通道": ImageFilterImplement.apply_set_channel,
|
"设置通道": ImageFilterImplement.apply_set_channel,
|
||||||
"设置遮罩": ImageFilterImplement.apply_set_mask,
|
"设置遮罩": ImageFilterImplement.apply_set_mask,
|
||||||
# 图像处理
|
# 图像处理
|
||||||
|
|||||||
@ -70,7 +70,7 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
|
|||||||
await target.send_message(res)
|
await target.send_message(res)
|
||||||
return
|
return
|
||||||
|
|
||||||
env = TextHandlerEnvironment(is_trusted=False)
|
env = TextHandlerEnvironment(is_trusted=False, event=evt)
|
||||||
results = await runner.run_pipeline(res, istream or None, env)
|
results = await runner.run_pipeline(res, istream or None, env)
|
||||||
|
|
||||||
# 检查是否有错误
|
# 检查是否有错误
|
||||||
|
|||||||
@ -7,11 +7,13 @@ from string import whitespace
|
|||||||
from typing import cast
|
from typing import cast
|
||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
from nonebot.adapters import Event
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class TextHandlerEnvironment:
|
class TextHandlerEnvironment:
|
||||||
is_trusted: bool
|
is_trusted: bool
|
||||||
|
event: Event | None = None
|
||||||
buffers: dict[str, str] = field(default_factory=dict)
|
buffers: dict[str, str] = field(default_factory=dict)
|
||||||
|
|
||||||
|
|
||||||
@ -287,7 +289,7 @@ class PipelineRunner:
|
|||||||
env: TextHandlerEnvironment | None = None,
|
env: TextHandlerEnvironment | None = None,
|
||||||
) -> list[TextHandleResult]:
|
) -> list[TextHandleResult]:
|
||||||
if env is None:
|
if env is None:
|
||||||
env = TextHandlerEnvironment(is_trusted=False, buffers={})
|
env = TextHandlerEnvironment(is_trusted=False, event=None, buffers={})
|
||||||
|
|
||||||
results: list[TextHandleResult] = []
|
results: list[TextHandleResult] = []
|
||||||
|
|
||||||
|
|||||||
@ -1,36 +1,53 @@
|
|||||||
from typing import Any, cast
|
from typing import Any, cast
|
||||||
from konabot.common.llm import get_llm
|
from konabot.common.llm import get_llm
|
||||||
from konabot.plugins.handle_text.base import TextHandler, TextHandlerEnvironment, TextHandleResult
|
from konabot.common.permsys import perm_manager
|
||||||
|
from konabot.plugins.handle_text.base import (
|
||||||
|
TextHandler,
|
||||||
|
TextHandlerEnvironment,
|
||||||
|
TextHandleResult,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class THQwen(TextHandler):
|
class THQwen(TextHandler):
|
||||||
name = "qwen"
|
name = "qwen"
|
||||||
|
|
||||||
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
|
async def handle(
|
||||||
|
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||||
|
) -> TextHandleResult:
|
||||||
|
pm = perm_manager()
|
||||||
|
if env.event is None or not await pm.check_has_permission(
|
||||||
|
env.event, "textfx.qwen"
|
||||||
|
):
|
||||||
|
return TextHandleResult(
|
||||||
|
code=1,
|
||||||
|
ostream="你或当前环境没有使用 qwen 的权限。如有疑问请联系管理员",
|
||||||
|
)
|
||||||
|
|
||||||
llm = get_llm()
|
llm = get_llm()
|
||||||
messages = []
|
messages = []
|
||||||
|
|
||||||
if istream is not None:
|
if istream is not None:
|
||||||
messages.append({
|
messages.append({"role": "user", "content": istream})
|
||||||
"role": "user",
|
|
||||||
"content": istream
|
|
||||||
})
|
|
||||||
if len(args) > 0:
|
if len(args) > 0:
|
||||||
message = ' '.join(args)
|
message = " ".join(args)
|
||||||
messages.append({
|
messages.append(
|
||||||
"role": "user",
|
{
|
||||||
"content": message,
|
"role": "user",
|
||||||
})
|
"content": message,
|
||||||
|
}
|
||||||
|
)
|
||||||
if len(messages) == 0:
|
if len(messages) == 0:
|
||||||
return TextHandleResult(
|
return TextHandleResult(
|
||||||
code=1,
|
code=1,
|
||||||
ostream="使用方法:qwen <提示词>",
|
ostream="使用方法:qwen <提示词>",
|
||||||
)
|
)
|
||||||
|
|
||||||
messages = [{
|
messages = [
|
||||||
"role": "system",
|
{
|
||||||
"content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答"
|
"role": "system",
|
||||||
}] + messages
|
"content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答",
|
||||||
|
}
|
||||||
|
] + messages
|
||||||
result = await llm.chat(cast(Any, messages))
|
result = await llm.chat(cast(Any, messages))
|
||||||
content = result.content
|
content = result.content
|
||||||
if content is None:
|
if content is None:
|
||||||
|
|||||||
@ -6,29 +6,34 @@ from loguru import logger
|
|||||||
from nonebot import on_message
|
from nonebot import on_message
|
||||||
import nonebot
|
import nonebot
|
||||||
from nonebot.rule import to_me
|
from nonebot.rule import to_me
|
||||||
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
|
from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna
|
||||||
on_alconna)
|
|
||||||
from nonebot_plugin_apscheduler import scheduler
|
from nonebot_plugin_apscheduler import scheduler
|
||||||
|
|
||||||
from konabot.common import username
|
from konabot.common import username
|
||||||
from konabot.common.longtask import DepLongTaskTarget
|
from konabot.common.longtask import DepLongTaskTarget
|
||||||
from konabot.common.pager import PagerQuery
|
from konabot.common.pager import PagerQuery
|
||||||
from konabot.plugins.kona_ph.core.message import (get_daily_report,
|
from konabot.plugins.kona_ph.core.message import (
|
||||||
get_daily_report_v2,
|
get_daily_report,
|
||||||
get_puzzle_description,
|
get_daily_report_v2,
|
||||||
get_submission_message)
|
get_puzzle_description,
|
||||||
|
get_submission_message,
|
||||||
|
)
|
||||||
from konabot.plugins.kona_ph.core.storage import get_today_date
|
from konabot.plugins.kona_ph.core.storage import get_today_date
|
||||||
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE,
|
from konabot.plugins.kona_ph.manager import (
|
||||||
create_admin_commands,
|
PUZZLE_PAGE_SIZE,
|
||||||
puzzle_manager)
|
create_admin_commands,
|
||||||
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info
|
puzzle_manager,
|
||||||
from konabot.plugins.poster.service import broadcast
|
)
|
||||||
|
from konabot.common.subscribe import PosterInfo, register_poster_info, broadcast
|
||||||
|
|
||||||
create_admin_commands()
|
create_admin_commands()
|
||||||
register_poster_info("每日谜题", info=PosterInfo(
|
register_poster_info(
|
||||||
aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"},
|
"每日谜题",
|
||||||
description="此方 BOT 每日谜题推送",
|
info=PosterInfo(
|
||||||
))
|
aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"},
|
||||||
|
description="此方 BOT 每日谜题推送",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
cmd_submit = on_message(rule=to_me())
|
cmd_submit = on_message(rule=to_me())
|
||||||
@ -44,16 +49,22 @@ async def _(msg: UniMsg, target: DepLongTaskTarget):
|
|||||||
if isinstance(result, str):
|
if isinstance(result, str):
|
||||||
await target.send_message(result)
|
await target.send_message(result)
|
||||||
else:
|
else:
|
||||||
await target.send_message(get_submission_message(
|
await target.send_message(
|
||||||
daily_puzzle_info=result.info,
|
get_submission_message(
|
||||||
submission=result.submission,
|
daily_puzzle_info=result.info,
|
||||||
puzzle=result.puzzle,
|
submission=result.submission,
|
||||||
))
|
puzzle=result.puzzle,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
cmd_query = on_alconna(Alconna(
|
cmd_query = on_alconna(
|
||||||
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?)|今日谜?题目?)"
|
Alconna(
|
||||||
), rule=to_me())
|
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?)|今日谜?题目?)"
|
||||||
|
),
|
||||||
|
rule=to_me(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@cmd_query.handle()
|
@cmd_query.handle()
|
||||||
async def _(target: DepLongTaskTarget):
|
async def _(target: DepLongTaskTarget):
|
||||||
@ -64,9 +75,8 @@ async def _(target: DepLongTaskTarget):
|
|||||||
await target.send_message(get_puzzle_description(p))
|
await target.send_message(get_puzzle_description(p))
|
||||||
|
|
||||||
|
|
||||||
cmd_query_submission = on_alconna(Alconna(
|
cmd_query_submission = on_alconna(Alconna("今日答题情况"), rule=to_me())
|
||||||
"今日答题情况"
|
|
||||||
), rule=to_me())
|
|
||||||
|
|
||||||
@cmd_query_submission.handle()
|
@cmd_query_submission.handle()
|
||||||
async def _(target: DepLongTaskTarget):
|
async def _(target: DepLongTaskTarget):
|
||||||
@ -77,11 +87,15 @@ async def _(target: DepLongTaskTarget):
|
|||||||
await target.send_message(get_daily_report_v2(manager, gid))
|
await target.send_message(get_daily_report_v2(manager, gid))
|
||||||
|
|
||||||
|
|
||||||
cmd_history = on_alconna(Alconna(
|
cmd_history = on_alconna(
|
||||||
"re:历史(题目|谜题)",
|
Alconna(
|
||||||
Args["page?", int],
|
"re:历史(题目|谜题)",
|
||||||
Args["index_id?", str],
|
Args["page?", int],
|
||||||
), rule=to_me())
|
Args["index_id?", str],
|
||||||
|
),
|
||||||
|
rule=to_me(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@cmd_history.handle()
|
@cmd_history.handle()
|
||||||
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
||||||
@ -105,10 +119,10 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
|||||||
puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True)
|
puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True)
|
||||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||||
if page <= 0 or page > count_pages:
|
if page <= 0 or page > count_pages:
|
||||||
return await target.send_message(UniMessage.text(
|
return await target.send_message(
|
||||||
f"页数只有 1 ~ {count_pages} 啦!"
|
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
|
||||||
))
|
)
|
||||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
|
||||||
for p, d in puzzles:
|
for p, d in puzzles:
|
||||||
info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]]
|
info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]]
|
||||||
msg = msg.text(
|
msg = msg.text(
|
||||||
@ -120,22 +134,26 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
|||||||
await target.send_message(msg)
|
await target.send_message(msg)
|
||||||
|
|
||||||
|
|
||||||
cmd_leadboard = on_alconna(Alconna(
|
cmd_leadboard = on_alconna(
|
||||||
"re:此方(解谜|谜题)排行榜",
|
Alconna(
|
||||||
Args["page?", int],
|
"re:此方(解谜|谜题)排行榜",
|
||||||
))
|
Args["page?", int],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@cmd_leadboard.handle()
|
@cmd_leadboard.handle()
|
||||||
async def _(target: DepLongTaskTarget, page: int = 1):
|
async def _(target: DepLongTaskTarget, page: int = 1):
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
result = manager.get_leadboard(PagerQuery(page, 10))
|
result = manager.get_leadboard(PagerQuery(page, 10))
|
||||||
await target.send_message(result.to_unimessage(
|
await target.send_message(
|
||||||
title="此方解谜排行榜",
|
result.to_unimessage(
|
||||||
formatter=lambda data: (
|
title="此方解谜排行榜",
|
||||||
f"✨ {data[1]} 已完成 | "
|
formatter=lambda data: (
|
||||||
f"{username.get_username(data[0])}"
|
f"✨ {data[1]} 已完成 | {username.get_username(data[0])}"
|
||||||
|
),
|
||||||
)
|
)
|
||||||
))
|
)
|
||||||
|
|
||||||
|
|
||||||
@scheduler.scheduled_job("cron", hour="8")
|
@scheduler.scheduled_job("cron", hour="8")
|
||||||
@ -155,4 +173,3 @@ async def _():
|
|||||||
|
|
||||||
|
|
||||||
driver = nonebot.get_driver()
|
driver = nonebot.get_driver()
|
||||||
|
|
||||||
|
|||||||
@ -1,50 +1,54 @@
|
|||||||
import datetime
|
import datetime
|
||||||
from math import ceil
|
from math import ceil
|
||||||
|
|
||||||
from nonebot import get_plugin_config
|
from nonebot.adapters import Event
|
||||||
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query,
|
from nonebot_plugin_alconna import (
|
||||||
Subcommand, SubcommandResult, UniMessage,
|
Alconna,
|
||||||
on_alconna)
|
Args,
|
||||||
from pydantic import BaseModel
|
Image,
|
||||||
|
Option,
|
||||||
|
Query,
|
||||||
|
Subcommand,
|
||||||
|
SubcommandResult,
|
||||||
|
UniMessage,
|
||||||
|
on_alconna,
|
||||||
|
)
|
||||||
|
|
||||||
from konabot.common.longtask import DepLongTaskTarget
|
from konabot.common.longtask import DepLongTaskTarget
|
||||||
from konabot.common.nb.exc import BotExceptionMessage
|
from konabot.common.nb.exc import BotExceptionMessage
|
||||||
from konabot.common.nb.extract_image import download_image_bytes
|
from konabot.common.nb.extract_image import download_image_bytes
|
||||||
|
from konabot.common.permsys import DepPermManager, require_permission
|
||||||
from konabot.common.username import get_username
|
from konabot.common.username import get_username
|
||||||
from konabot.plugins.kona_ph.core.image import get_image_manager
|
from konabot.plugins.kona_ph.core.image import get_image_manager
|
||||||
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list,
|
from konabot.plugins.kona_ph.core.message import (
|
||||||
get_puzzle_info_message,
|
get_puzzle_description,
|
||||||
get_submission_message)
|
get_puzzle_hint_list,
|
||||||
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager,
|
get_puzzle_info_message,
|
||||||
get_today_date,
|
get_submission_message,
|
||||||
puzzle_manager)
|
)
|
||||||
from konabot.plugins.poster.service import broadcast
|
from konabot.plugins.kona_ph.core.storage import (
|
||||||
|
Puzzle,
|
||||||
|
PuzzleHint,
|
||||||
|
PuzzleManager,
|
||||||
|
get_today_date,
|
||||||
|
puzzle_manager,
|
||||||
|
)
|
||||||
|
from konabot.common.subscribe import broadcast
|
||||||
|
|
||||||
PUZZLE_PAGE_SIZE = 10
|
PUZZLE_PAGE_SIZE = 10
|
||||||
|
|
||||||
|
|
||||||
class PuzzleConfig(BaseModel):
|
async def check_puzzle(
|
||||||
plugin_puzzle_manager: list[str] = []
|
manager: PuzzleManager,
|
||||||
plugin_puzzle_admin: list[str] = []
|
perm: DepPermManager,
|
||||||
plugin_puzzle_playgroup: list[str] = []
|
raw_id: str,
|
||||||
|
event: Event,
|
||||||
|
target: DepLongTaskTarget,
|
||||||
config = get_plugin_config(PuzzleConfig)
|
) -> Puzzle:
|
||||||
|
|
||||||
|
|
||||||
def is_puzzle_manager(target: DepLongTaskTarget):
|
|
||||||
return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target)
|
|
||||||
|
|
||||||
|
|
||||||
def is_puzzle_admin(target: DepLongTaskTarget):
|
|
||||||
return target.target_id in config.plugin_puzzle_admin
|
|
||||||
|
|
||||||
|
|
||||||
def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle:
|
|
||||||
if raw_id not in manager.puzzle_data:
|
if raw_id not in manager.puzzle_data:
|
||||||
raise BotExceptionMessage("没有这个谜题")
|
raise BotExceptionMessage("没有这个谜题")
|
||||||
puzzle = manager.puzzle_data[raw_id]
|
puzzle = manager.puzzle_data[raw_id]
|
||||||
if is_puzzle_admin(target):
|
if await perm.check_has_permission(event, "konaph.admin"):
|
||||||
return puzzle
|
return puzzle
|
||||||
if target.target_id != puzzle.author_id:
|
if target.target_id != puzzle.author_id:
|
||||||
raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
|
raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
|
||||||
@ -60,7 +64,9 @@ def create_admin_commands():
|
|||||||
Subcommand("unready", Args["raw_id", str], dest="unready"),
|
Subcommand("unready", Args["raw_id", str], dest="unready"),
|
||||||
Subcommand("info", Args["raw_id", str], dest="info"),
|
Subcommand("info", Args["raw_id", str], dest="info"),
|
||||||
Subcommand("my", Args["page?", int], dest="my"),
|
Subcommand("my", Args["page?", int], dest="my"),
|
||||||
Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"),
|
Subcommand(
|
||||||
|
"all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"
|
||||||
|
),
|
||||||
Subcommand("pin", Args["raw_id?", str], dest="pin"),
|
Subcommand("pin", Args["raw_id?", str], dest="pin"),
|
||||||
Subcommand("unpin", dest="unpin"),
|
Subcommand("unpin", dest="unpin"),
|
||||||
Subcommand(
|
Subcommand(
|
||||||
@ -115,11 +121,11 @@ def create_admin_commands():
|
|||||||
dest="hint",
|
dest="hint",
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
rule=is_puzzle_manager,
|
rule=require_permission("konaph.manager"),
|
||||||
)
|
)
|
||||||
|
|
||||||
@cmd_admin.assign("$main")
|
@cmd_admin.assign("$main")
|
||||||
async def _(target: DepLongTaskTarget):
|
async def _(target: DepLongTaskTarget, pm: DepPermManager, event: Event):
|
||||||
msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
|
msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
|
||||||
msg = msg.text("konaph create - 创建一个新的谜题\n")
|
msg = msg.text("konaph create - 创建一个新的谜题\n")
|
||||||
msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
|
msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
|
||||||
@ -132,7 +138,7 @@ def create_admin_commands():
|
|||||||
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
|
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
|
||||||
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
|
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
|
||||||
|
|
||||||
if is_puzzle_admin(target):
|
if await pm.check_has_permission(event, "konaph.admin"):
|
||||||
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
|
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
|
||||||
msg = msg.text("konaph pin - 查看当前置顶谜题\n")
|
msg = msg.text("konaph pin - 查看当前置顶谜题\n")
|
||||||
msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
|
msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
|
||||||
@ -145,48 +151,54 @@ def create_admin_commands():
|
|||||||
async def _(target: DepLongTaskTarget):
|
async def _(target: DepLongTaskTarget):
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
puzzle = manager.admin_create_puzzle(target.target_id)
|
puzzle = manager.admin_create_puzzle(target.target_id)
|
||||||
await target.send_message(UniMessage.text(
|
await target.send_message(
|
||||||
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
|
UniMessage.text(
|
||||||
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
|
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
|
||||||
f"- 输入 `konaph my` 查看你创建的谜题\n"
|
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
|
||||||
f"- 输入 `konaph modify` 查看更改谜题的方法"
|
f"- 输入 `konaph my` 查看你创建的谜题\n"
|
||||||
))
|
f"- 输入 `konaph modify` 查看更改谜题的方法"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
@cmd_admin.assign("ready")
|
@cmd_admin.assign("ready")
|
||||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
async def _(
|
||||||
|
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
|
||||||
|
):
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
p = check_puzzle(manager, target, raw_id)
|
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||||
if p.ready:
|
if p.ready:
|
||||||
return await target.send_message(UniMessage.text(
|
return await target.send_message(UniMessage.text("题目早就准备好啦!"))
|
||||||
"题目早就准备好啦!"
|
|
||||||
))
|
|
||||||
p.ready = True
|
p.ready = True
|
||||||
await target.send_message(UniMessage.text(
|
await target.send_message(
|
||||||
f"谜题「{p.title}」已经准备就绪!"
|
UniMessage.text(f"谜题「{p.title}」已经准备就绪!")
|
||||||
))
|
)
|
||||||
|
|
||||||
@cmd_admin.assign("unready")
|
@cmd_admin.assign("unready")
|
||||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
async def _(
|
||||||
|
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
|
||||||
|
):
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
p = check_puzzle(manager, target, raw_id)
|
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||||
if not p.ready:
|
if not p.ready:
|
||||||
return await target.send_message(UniMessage.text(
|
return await target.send_message(
|
||||||
f"谜题「{p.title}」已经是未取消状态了!"
|
UniMessage.text(f"谜题「{p.title}」已经是未取消状态了!")
|
||||||
))
|
)
|
||||||
if manager.is_puzzle_published(p.raw_id):
|
if manager.is_puzzle_published(p.raw_id):
|
||||||
return await target.send_message(UniMessage.text(
|
return await target.send_message(
|
||||||
"已发布的谜题不能取消准备状态!"
|
UniMessage.text("已发布的谜题不能取消准备状态!")
|
||||||
))
|
)
|
||||||
|
|
||||||
p.ready = False
|
p.ready = False
|
||||||
await target.send_message(UniMessage.text(
|
await target.send_message(
|
||||||
f"谜题「{p.title}」已经取消准备!"
|
UniMessage.text(f"谜题「{p.title}」已经取消准备!")
|
||||||
))
|
)
|
||||||
|
|
||||||
@cmd_admin.assign("info")
|
@cmd_admin.assign("info")
|
||||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
async def _(
|
||||||
|
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
|
||||||
|
):
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
p = check_puzzle(manager, target, raw_id)
|
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||||
await target.send_message(get_puzzle_info_message(manager, p))
|
await target.send_message(get_puzzle_info_message(manager, p))
|
||||||
|
|
||||||
@cmd_admin.assign("my")
|
@cmd_admin.assign("my")
|
||||||
@ -194,15 +206,15 @@ def create_admin_commands():
|
|||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
puzzles = manager.get_puzzles_of_user(target.target_id)
|
puzzles = manager.get_puzzles_of_user(target.target_id)
|
||||||
if len(puzzles) == 0:
|
if len(puzzles) == 0:
|
||||||
return await target.send_message(UniMessage.text(
|
return await target.send_message(
|
||||||
"你没有谜题哦,使用 `konaph create` 创建一个吧!"
|
UniMessage.text("你没有谜题哦,使用 `konaph create` 创建一个吧!")
|
||||||
))
|
)
|
||||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||||
if page <= 0 or page > count_pages:
|
if page <= 0 or page > count_pages:
|
||||||
return await target.send_message(UniMessage.text(
|
return await target.send_message(
|
||||||
f"页数只有 1 ~ {count_pages} 啦!"
|
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
|
||||||
))
|
)
|
||||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
|
||||||
message = UniMessage.text("==== 我的谜题 ====\n\n")
|
message = UniMessage.text("==== 我的谜题 ====\n\n")
|
||||||
for p in puzzles:
|
for p in puzzles:
|
||||||
message = message.text("- ")
|
message = message.text("- ")
|
||||||
@ -220,11 +232,15 @@ def create_admin_commands():
|
|||||||
await target.send_message(message)
|
await target.send_message(message)
|
||||||
|
|
||||||
@cmd_admin.assign("all")
|
@cmd_admin.assign("all")
|
||||||
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
|
async def _(
|
||||||
if not is_puzzle_admin(target):
|
target: DepLongTaskTarget,
|
||||||
return await target.send_message(UniMessage.text(
|
event: Event,
|
||||||
"你没有权限使用该指令"
|
perm: DepPermManager,
|
||||||
))
|
ready: Query[bool] = Query("all.ready"),
|
||||||
|
page: int = 1,
|
||||||
|
):
|
||||||
|
if not perm.check_has_permission(event, "konaph.admin"):
|
||||||
|
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
puzzles = [*manager.puzzle_data.values()]
|
puzzles = [*manager.puzzle_data.values()]
|
||||||
if ready.available:
|
if ready.available:
|
||||||
@ -232,10 +248,10 @@ def create_admin_commands():
|
|||||||
puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
|
puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
|
||||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||||
if page <= 0 or page > count_pages:
|
if page <= 0 or page > count_pages:
|
||||||
return await target.send_message(UniMessage.text(
|
return await target.send_message(
|
||||||
f"页数只有 1 ~ {count_pages} 啦!"
|
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
|
||||||
))
|
)
|
||||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
|
||||||
message = UniMessage.text("==== 所有谜题 ====\n\n")
|
message = UniMessage.text("==== 所有谜题 ====\n\n")
|
||||||
for p in puzzles:
|
for p in puzzles:
|
||||||
message = message.text("- ")
|
message = message.text("- ")
|
||||||
@ -253,32 +269,30 @@ def create_admin_commands():
|
|||||||
await target.send_message(message)
|
await target.send_message(message)
|
||||||
|
|
||||||
@cmd_admin.assign("pin")
|
@cmd_admin.assign("pin")
|
||||||
async def _(target: DepLongTaskTarget, raw_id: str = ""):
|
async def _(
|
||||||
if not is_puzzle_admin(target):
|
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str = ""
|
||||||
return await target.send_message(UniMessage.text(
|
):
|
||||||
"你没有权限使用该指令"
|
if not perm.check_has_permission(event, "konaph.admin"):
|
||||||
))
|
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||||||
|
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
if raw_id == "":
|
if raw_id == "":
|
||||||
if manager.puzzle_pinned:
|
if manager.puzzle_pinned:
|
||||||
return await target.send_message(UniMessage.text(
|
return await target.send_message(
|
||||||
f"被 Pin 的谜题 ID = {manager.puzzle_pinned}"
|
UniMessage.text(f"被 Pin 的谜题 ID = {manager.puzzle_pinned}")
|
||||||
))
|
)
|
||||||
return await target.send_message("没有置顶谜题")
|
return await target.send_message("没有置顶谜题")
|
||||||
if raw_id not in manager.unpublished_puzzles:
|
if raw_id not in manager.unpublished_puzzles:
|
||||||
return await target.send_message(UniMessage.text(
|
return await target.send_message(
|
||||||
"这个谜题已经发布了,或者还没准备好,或者不存在"
|
UniMessage.text("这个谜题已经发布了,或者还没准备好,或者不存在")
|
||||||
))
|
)
|
||||||
manager.admin_pin_puzzle(raw_id)
|
manager.admin_pin_puzzle(raw_id)
|
||||||
return await target.send_message(f"已置顶谜题 {raw_id}")
|
return await target.send_message(f"已置顶谜题 {raw_id}")
|
||||||
|
|
||||||
@cmd_admin.assign("unpin")
|
@cmd_admin.assign("unpin")
|
||||||
async def _(target: DepLongTaskTarget):
|
async def _(target: DepLongTaskTarget, event: Event, perm: DepPermManager):
|
||||||
if not is_puzzle_admin(target):
|
if not perm.check_has_permission(event, "konaph.admin"):
|
||||||
return await target.send_message(UniMessage.text(
|
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||||||
"你没有权限使用该指令"
|
|
||||||
))
|
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
manager.admin_pin_puzzle("")
|
manager.admin_pin_puzzle("")
|
||||||
return await target.send_message("已取消所有置顶")
|
return await target.send_message("已取消所有置顶")
|
||||||
@ -286,6 +300,8 @@ def create_admin_commands():
|
|||||||
@cmd_admin.assign("modify")
|
@cmd_admin.assign("modify")
|
||||||
async def _(
|
async def _(
|
||||||
target: DepLongTaskTarget,
|
target: DepLongTaskTarget,
|
||||||
|
event: Event,
|
||||||
|
perm: DepPermManager,
|
||||||
raw_id: str = "",
|
raw_id: str = "",
|
||||||
title: str | None = None,
|
title: str | None = None,
|
||||||
description: str | None = None,
|
description: str | None = None,
|
||||||
@ -306,7 +322,7 @@ def create_admin_commands():
|
|||||||
image_manager = get_image_manager()
|
image_manager = get_image_manager()
|
||||||
|
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
p = check_puzzle(manager, target, raw_id)
|
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||||
if title is not None:
|
if title is not None:
|
||||||
p.title = title
|
p.title = title
|
||||||
if description is not None:
|
if description is not None:
|
||||||
@ -329,11 +345,14 @@ def create_admin_commands():
|
|||||||
return await target.send_message("修改好啦!看看效果:\n\n" + info2)
|
return await target.send_message("修改好啦!看看效果:\n\n" + info2)
|
||||||
|
|
||||||
@cmd_admin.assign("publish")
|
@cmd_admin.assign("publish")
|
||||||
async def _(target: DepLongTaskTarget, raw_id: str | None = None):
|
async def _(
|
||||||
if not is_puzzle_admin(target):
|
target: DepLongTaskTarget,
|
||||||
return await target.send_message(UniMessage.text(
|
event: Event,
|
||||||
"你没有权限使用该指令"
|
perm: DepPermManager,
|
||||||
))
|
raw_id: str | None = None,
|
||||||
|
):
|
||||||
|
if not perm.check_has_permission(event, "konaph.admin"):
|
||||||
|
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||||||
today = get_today_date()
|
today = get_today_date()
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
if today in manager.daily_puzzle_of_date:
|
if today in manager.daily_puzzle_of_date:
|
||||||
@ -348,46 +367,64 @@ def create_admin_commands():
|
|||||||
return await target.send_message("Ok!")
|
return await target.send_message("Ok!")
|
||||||
|
|
||||||
@cmd_admin.assign("preview")
|
@cmd_admin.assign("preview")
|
||||||
async def _(target: DepLongTaskTarget, raw_id: str):
|
async def _(
|
||||||
|
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
|
||||||
|
):
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
p = check_puzzle(manager, target, raw_id)
|
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||||
return await target.send_message(get_puzzle_description(p))
|
return await target.send_message(get_puzzle_description(p))
|
||||||
|
|
||||||
@cmd_admin.assign("get-submits")
|
@cmd_admin.assign("get-submits")
|
||||||
async def _(target: DepLongTaskTarget, raw_id: str):
|
async def _(
|
||||||
|
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
|
||||||
|
):
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
puzzle = manager.puzzle_data.get(raw_id)
|
puzzle = manager.puzzle_data.get(raw_id)
|
||||||
if puzzle is None:
|
if puzzle is None:
|
||||||
return await target.send_message("没有这个谜题")
|
return await target.send_message("没有这个谜题")
|
||||||
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id:
|
if (
|
||||||
|
not perm.check_has_permission(event, "konaph.admin")
|
||||||
|
and target.target_id != puzzle.author_id
|
||||||
|
):
|
||||||
return await target.send_message("你没有权限预览这个谜题")
|
return await target.send_message("你没有权限预览这个谜题")
|
||||||
|
|
||||||
msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n")
|
msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n")
|
||||||
submits = manager.submissions.get(raw_id, {})
|
submits = manager.submissions.get(raw_id, {})
|
||||||
for uid, ls in submits.items():
|
for uid, ls in submits.items():
|
||||||
s = ', '.join((i.flag for i in ls))
|
s = ", ".join((i.flag for i in ls))
|
||||||
msg = msg.text(f"- {get_username(uid)}:{s}\n")
|
msg = msg.text(f"- {get_username(uid)}:{s}\n")
|
||||||
return await target.send_message(msg)
|
return await target.send_message(msg)
|
||||||
|
|
||||||
@cmd_admin.assign("test")
|
@cmd_admin.assign("test")
|
||||||
async def _(target: DepLongTaskTarget, raw_id: str, submission: str):
|
async def _(
|
||||||
|
target: DepLongTaskTarget,
|
||||||
|
raw_id: str,
|
||||||
|
submission: str,
|
||||||
|
event: Event,
|
||||||
|
perm: DepPermManager,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
测试一道谜题的回答,并给出结果
|
测试一道谜题的回答,并给出结果
|
||||||
"""
|
"""
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
p = check_puzzle(manager, target, raw_id)
|
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||||
result = p.check_submission(submission)
|
result = p.check_submission(submission)
|
||||||
msg = get_submission_message(p, result)
|
msg = get_submission_message(p, result)
|
||||||
return await target.send_message("[测试提交] " + msg)
|
return await target.send_message("[测试提交] " + msg)
|
||||||
|
|
||||||
@cmd_admin.assign("subcommands.hint")
|
@cmd_admin.assign("subcommands.hint")
|
||||||
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")):
|
async def _(
|
||||||
|
target: DepLongTaskTarget,
|
||||||
|
subcommands: Query[SubcommandResult] = Query("subcommands.hint"),
|
||||||
|
):
|
||||||
if len(subcommands.result.subcommands) > 0:
|
if len(subcommands.result.subcommands) > 0:
|
||||||
return
|
return
|
||||||
return await target.send_message(
|
return await target.send_message(
|
||||||
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
|
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
|
||||||
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
|
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
|
||||||
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n")
|
.text(
|
||||||
|
"- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n"
|
||||||
|
)
|
||||||
.text("- konaph hint modify <id> <hint_id>\n")
|
.text("- konaph hint modify <id> <hint_id>\n")
|
||||||
.text(" - --pattern <pattern>\n - 更改匹配规则\n")
|
.text(" - --pattern <pattern>\n - 更改匹配规则\n")
|
||||||
.text(" - --message <message>\n - 更改提示文本\n")
|
.text(" - --message <message>\n - 更改提示文本\n")
|
||||||
@ -402,9 +439,11 @@ def create_admin_commands():
|
|||||||
raw_id: str,
|
raw_id: str,
|
||||||
pattern: str,
|
pattern: str,
|
||||||
message: str,
|
message: str,
|
||||||
|
event: Event,
|
||||||
|
perm: DepPermManager,
|
||||||
):
|
):
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
p = check_puzzle(manager, target, raw_id)
|
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||||
p.hints[p.hint_id_max + 1] = PuzzleHint(
|
p.hints[p.hint_id_max + 1] = PuzzleHint(
|
||||||
pattern=pattern,
|
pattern=pattern,
|
||||||
message=message,
|
message=message,
|
||||||
@ -416,9 +455,11 @@ def create_admin_commands():
|
|||||||
async def _(
|
async def _(
|
||||||
target: DepLongTaskTarget,
|
target: DepLongTaskTarget,
|
||||||
raw_id: str,
|
raw_id: str,
|
||||||
|
event: Event,
|
||||||
|
perm: DepPermManager,
|
||||||
):
|
):
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
p = check_puzzle(manager, target, raw_id)
|
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||||
await target.send_message(get_puzzle_hint_list(p))
|
await target.send_message(get_puzzle_hint_list(p))
|
||||||
|
|
||||||
@cmd_admin.assign("subcommands.hint.modify")
|
@cmd_admin.assign("subcommands.hint.modify")
|
||||||
@ -426,12 +467,14 @@ def create_admin_commands():
|
|||||||
target: DepLongTaskTarget,
|
target: DepLongTaskTarget,
|
||||||
raw_id: str,
|
raw_id: str,
|
||||||
hint_id: int,
|
hint_id: int,
|
||||||
|
event: Event,
|
||||||
|
perm: DepPermManager,
|
||||||
pattern: str | None = None,
|
pattern: str | None = None,
|
||||||
message: str | None = None,
|
message: str | None = None,
|
||||||
is_checkpoint: bool | None = None,
|
is_checkpoint: bool | None = None,
|
||||||
):
|
):
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
p = check_puzzle(manager, target, raw_id)
|
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||||
if hint_id not in p.hints:
|
if hint_id not in p.hints:
|
||||||
raise BotExceptionMessage(
|
raise BotExceptionMessage(
|
||||||
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
|
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
|
||||||
@ -450,9 +493,11 @@ def create_admin_commands():
|
|||||||
target: DepLongTaskTarget,
|
target: DepLongTaskTarget,
|
||||||
raw_id: str,
|
raw_id: str,
|
||||||
hint_id: int,
|
hint_id: int,
|
||||||
|
event: Event,
|
||||||
|
perm: DepPermManager,
|
||||||
):
|
):
|
||||||
async with puzzle_manager() as manager:
|
async with puzzle_manager() as manager:
|
||||||
p = check_puzzle(manager, target, raw_id)
|
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||||
if hint_id not in p.hints:
|
if hint_id not in p.hints:
|
||||||
raise BotExceptionMessage(
|
raise BotExceptionMessage(
|
||||||
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
|
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
|
||||||
@ -460,5 +505,4 @@ def create_admin_commands():
|
|||||||
del p.hints[hint_id]
|
del p.hints[hint_id]
|
||||||
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
|
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
|
||||||
|
|
||||||
|
|
||||||
return cmd_admin
|
return cmd_admin
|
||||||
|
|||||||
@ -1,57 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
import mcstatus
|
|
||||||
|
|
||||||
from nonebot import on_command
|
|
||||||
from nonebot.adapters import Event
|
|
||||||
from nonebot_plugin_alconna import UniMessage
|
|
||||||
from konabot.common.nb.is_admin import is_admin
|
|
||||||
from mcstatus.responses import JavaStatusResponse
|
|
||||||
|
|
||||||
|
|
||||||
cmd = on_command("宾几人", aliases=set(("宾人数", "mcbingo")), rule=is_admin)
|
|
||||||
|
|
||||||
|
|
||||||
def parse_status(motd: str) -> str:
|
|
||||||
if "[PRE-GAME]" in motd:
|
|
||||||
return "[✨ 空闲]"
|
|
||||||
if "[IN-GAME]" in motd:
|
|
||||||
return "[🕜 游戏中]"
|
|
||||||
if "[POST-GAME]" in motd:
|
|
||||||
return "[🕜 游戏中]"
|
|
||||||
return "[✨ 开放]"
|
|
||||||
|
|
||||||
|
|
||||||
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
|
|
||||||
if isinstance(status, JavaStatusResponse):
|
|
||||||
motd = status.motd.to_plain()
|
|
||||||
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
|
|
||||||
st = parse_status(motd)
|
|
||||||
players_sample = status.players.sample or []
|
|
||||||
players_sample_suffix = ""
|
|
||||||
if len(players_sample) > 0:
|
|
||||||
player_list = [s.name for s in players_sample]
|
|
||||||
players_sample_suffix = " (" + ", ".join(player_list) + ")"
|
|
||||||
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
|
|
||||||
else:
|
|
||||||
return f"{name}: 好像没开"
|
|
||||||
|
|
||||||
|
|
||||||
@cmd.handle()
|
|
||||||
async def _(evt: Event):
|
|
||||||
servers = (
|
|
||||||
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
|
|
||||||
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
|
|
||||||
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
|
|
||||||
)
|
|
||||||
|
|
||||||
responses = await asyncio.gather(
|
|
||||||
*map(lambda s: s[0].async_status(), servers),
|
|
||||||
return_exceptions=True,
|
|
||||||
)
|
|
||||||
messages = "\n".join((
|
|
||||||
dump_server_status(n, r)
|
|
||||||
for n, r in zip(map(lambda s: s[1], servers), responses)
|
|
||||||
))
|
|
||||||
|
|
||||||
await UniMessage.text(messages).finish(evt, at_sender=False)
|
|
||||||
|
|
||||||
131
konabot/plugins/minecraft_servers/__init__.py
Normal file
131
konabot/plugins/minecraft_servers/__init__.py
Normal file
@ -0,0 +1,131 @@
|
|||||||
|
import asyncio
|
||||||
|
import datetime
|
||||||
|
from typing import Literal
|
||||||
|
import mcstatus
|
||||||
|
|
||||||
|
from nonebot import on_command
|
||||||
|
from nonebot.adapters import Event
|
||||||
|
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
|
||||||
|
from mcstatus.responses import JavaStatusResponse
|
||||||
|
from nonebot_plugin_apscheduler import scheduler
|
||||||
|
|
||||||
|
from konabot.common.permsys import DepPermManager, require_permission
|
||||||
|
from konabot.plugins.minecraft_servers.simpfun_server import SimpfunServer
|
||||||
|
|
||||||
|
|
||||||
|
cmd = on_command(
|
||||||
|
"宾几人",
|
||||||
|
aliases=set(("宾人数", "mcbingo")),
|
||||||
|
rule=require_permission("minecraft.bingo.check"),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_status(motd: str) -> str:
|
||||||
|
if "[PRE-GAME]" in motd:
|
||||||
|
return "[✨ 空闲]"
|
||||||
|
if "[IN-GAME]" in motd:
|
||||||
|
return "[🕜 游戏中]"
|
||||||
|
if "[POST-GAME]" in motd:
|
||||||
|
return "[🕜 游戏中]"
|
||||||
|
return "[✨ 开放]"
|
||||||
|
|
||||||
|
|
||||||
|
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
|
||||||
|
if isinstance(status, JavaStatusResponse):
|
||||||
|
motd = status.motd.to_plain()
|
||||||
|
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
|
||||||
|
st = parse_status(motd)
|
||||||
|
players_sample = status.players.sample or []
|
||||||
|
players_sample_suffix = ""
|
||||||
|
if len(players_sample) > 0:
|
||||||
|
player_list = [s.name for s in players_sample]
|
||||||
|
players_sample_suffix = " (" + ", ".join(player_list) + ")"
|
||||||
|
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
|
||||||
|
else:
|
||||||
|
return f"{name}: 好像没开"
|
||||||
|
|
||||||
|
|
||||||
|
@cmd.handle()
|
||||||
|
async def _(evt: Event, pm: DepPermManager):
|
||||||
|
servers = (
|
||||||
|
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
|
||||||
|
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
|
||||||
|
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
|
||||||
|
)
|
||||||
|
|
||||||
|
responses = await asyncio.gather(
|
||||||
|
*map(lambda s: s[0].async_status(), servers),
|
||||||
|
return_exceptions=True,
|
||||||
|
)
|
||||||
|
messages = "\n".join(
|
||||||
|
(
|
||||||
|
dump_server_status(n, r)
|
||||||
|
for n, r in zip(map(lambda s: s[1], servers), responses)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if await pm.check_has_permission(evt, "minecraft.bingo.manipulate"):
|
||||||
|
messages += "\n\n---\n\n你可以使用 bingoman start 开启小帕的 bingo 服,用 bingoman stop 关闭小帕的 bingo 服"
|
||||||
|
|
||||||
|
await UniMessage.text(messages).finish(evt, at_sender=False)
|
||||||
|
|
||||||
|
|
||||||
|
cmd_bingo_manipulate = on_alconna(
|
||||||
|
Alconna("bingoman", Args["action", str]),
|
||||||
|
aliases=("宾服务器", "bingo服"),
|
||||||
|
rule=require_permission("minecraft.bingo.manipulate"),
|
||||||
|
)
|
||||||
|
|
||||||
|
actions: dict[str, Literal["start", "stop", "restart", "kill"]] = {
|
||||||
|
"up": "start",
|
||||||
|
"down": "stop",
|
||||||
|
"start": "start",
|
||||||
|
"stop": "stop",
|
||||||
|
"开机": "start",
|
||||||
|
"关机": "stop",
|
||||||
|
"restart": "restart",
|
||||||
|
"kill": "kill",
|
||||||
|
"重启": "restart",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@cmd_bingo_manipulate.handle()
|
||||||
|
async def _(action: str, event: Event):
|
||||||
|
server = SimpfunServer.new() # 使用默认配置管理服务器
|
||||||
|
a = actions.get(action.lower().strip())
|
||||||
|
if a is None:
|
||||||
|
await UniMessage.text(f"操作 {action} 不存在").send(event, at_sender=True)
|
||||||
|
return
|
||||||
|
resp = await server.power(a)
|
||||||
|
if resp.code == 200:
|
||||||
|
await UniMessage.text("好了").send(event, at_sender=True)
|
||||||
|
else:
|
||||||
|
await UniMessage.text(f"不好:{resp}").send(event, at_sender=True)
|
||||||
|
|
||||||
|
|
||||||
|
@scheduler.scheduled_job("cron", hour="4,23")
|
||||||
|
async def _():
|
||||||
|
server = SimpfunServer.new()
|
||||||
|
today = datetime.datetime.now()
|
||||||
|
|
||||||
|
# 获取服务器当前状态,重试多次以保证不会误判服务器未开启
|
||||||
|
server_up = False
|
||||||
|
server_players = 0
|
||||||
|
for _ in range(3):
|
||||||
|
mcs = mcstatus.JavaServer("play.simpfun.cn", 11495)
|
||||||
|
try:
|
||||||
|
resp = await mcs.async_status()
|
||||||
|
server_up = True
|
||||||
|
server_players = resp.players.online
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if today.weekday() == 5 and today.hour < 12:
|
||||||
|
# 每周六开机一天,保证可以让服务器不被自动销毁
|
||||||
|
if not server_up:
|
||||||
|
await server.power("start")
|
||||||
|
else:
|
||||||
|
# 每用一个自然日都会计费,所以要赶在这一天结束之前关服
|
||||||
|
# 平时如果没人,也自动关上
|
||||||
|
if server_up and server_players == 0:
|
||||||
|
await server.power("stop")
|
||||||
90
konabot/plugins/minecraft_servers/simpfun_server.py
Normal file
90
konabot/plugins/minecraft_servers/simpfun_server.py
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
import datetime
|
||||||
|
from typing import Literal
|
||||||
|
|
||||||
|
import aiohttp
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
|
||||||
|
class SimpfunServerConfig(BaseModel):
|
||||||
|
plugin_simpfun_api_key: str = ""
|
||||||
|
plugin_simpfun_base_url: str = "https://api.simpfun.cn"
|
||||||
|
plugin_simpfun_instance_id: int = 0
|
||||||
|
|
||||||
|
|
||||||
|
def get_config():
|
||||||
|
from nonebot import get_plugin_config
|
||||||
|
|
||||||
|
return get_plugin_config(SimpfunServerConfig)
|
||||||
|
|
||||||
|
|
||||||
|
class PowerManageResult(BaseModel):
|
||||||
|
code: int
|
||||||
|
status: bool
|
||||||
|
msg: str
|
||||||
|
|
||||||
|
|
||||||
|
class SimpfunServerDetailUtilization(BaseModel):
|
||||||
|
memory_bytes: int
|
||||||
|
cpu_absolute: float
|
||||||
|
disk_bytes: int
|
||||||
|
network_rx_bytes: int
|
||||||
|
network_tx_bytes: int
|
||||||
|
uptime: float
|
||||||
|
disk_last_check_time: datetime.datetime
|
||||||
|
|
||||||
|
|
||||||
|
class SimpfunServerDetailData(BaseModel):
|
||||||
|
id: int
|
||||||
|
name: str
|
||||||
|
is_pro: bool
|
||||||
|
|
||||||
|
status: str
|
||||||
|
"运行中的话,是 running"
|
||||||
|
|
||||||
|
is_suspended: bool
|
||||||
|
utilization: SimpfunServerDetailUtilization
|
||||||
|
|
||||||
|
|
||||||
|
class SimpfunServerDetailResp(BaseModel):
|
||||||
|
code: int
|
||||||
|
data: SimpfunServerDetailData
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SimpfunServer:
|
||||||
|
instance_id: int
|
||||||
|
api_key: str
|
||||||
|
base_url: str
|
||||||
|
|
||||||
|
async def power(
|
||||||
|
self, action: Literal["start", "stop", "restart", "kill"]
|
||||||
|
) -> PowerManageResult:
|
||||||
|
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
|
||||||
|
|
||||||
|
async with aiohttp.ClientSession(
|
||||||
|
headers={"Authorization": self.api_key}
|
||||||
|
) as session:
|
||||||
|
async with session.get(url, params={"action": action}) as resp:
|
||||||
|
resp.raise_for_status()
|
||||||
|
return PowerManageResult.model_validate_json(await resp.read())
|
||||||
|
|
||||||
|
async def detail(self) -> SimpfunServerDetailResp:
|
||||||
|
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
|
||||||
|
|
||||||
|
async with aiohttp.ClientSession(
|
||||||
|
headers={"Authorization": self.api_key}
|
||||||
|
) as session:
|
||||||
|
async with session.get(url) as resp:
|
||||||
|
resp.raise_for_status()
|
||||||
|
return SimpfunServerDetailResp.model_validate_json(await resp.read())
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def new(config: SimpfunServerConfig | None = None):
|
||||||
|
if config is None:
|
||||||
|
config = get_config()
|
||||||
|
return SimpfunServer(
|
||||||
|
instance_id=config.plugin_simpfun_instance_id,
|
||||||
|
api_key=config.plugin_simpfun_api_key,
|
||||||
|
base_url=config.plugin_simpfun_base_url,
|
||||||
|
)
|
||||||
@ -6,6 +6,7 @@ from konabot.common.nb.match_keyword import match_keyword
|
|||||||
|
|
||||||
evt_nya = on_message(rule=match_keyword("喵"))
|
evt_nya = on_message(rule=match_keyword("喵"))
|
||||||
|
|
||||||
|
|
||||||
@evt_nya.handle()
|
@evt_nya.handle()
|
||||||
async def _():
|
async def _():
|
||||||
await evt_nya.send(await UniMessage().text("喵").export())
|
await evt_nya.send(await UniMessage().text("喵").export())
|
||||||
@ -25,8 +26,9 @@ NYA_SYMBOL_MAPPING = {
|
|||||||
"~": "~",
|
"~": "~",
|
||||||
"~": "~",
|
"~": "~",
|
||||||
" ": " ",
|
" ": " ",
|
||||||
"\n": "\n",
|
|
||||||
}
|
}
|
||||||
|
NYA_SYMBOL_KEEP = "—¹₁²₂³₃⁴₄⁵₅⁶₆⁷₇⁸₈⁹₉⁰₀\n"
|
||||||
|
NYA_SYMBOL_MAPPING.update((k, k) for k in NYA_SYMBOL_KEEP)
|
||||||
|
|
||||||
|
|
||||||
async def has_nya(msg: UniMsg) -> bool:
|
async def has_nya(msg: UniMsg) -> bool:
|
||||||
@ -49,10 +51,10 @@ async def has_nya(msg: UniMsg) -> bool:
|
|||||||
|
|
||||||
evt_nya_v2 = on_message(rule=has_nya)
|
evt_nya_v2 = on_message(rule=has_nya)
|
||||||
|
|
||||||
|
|
||||||
@evt_nya_v2.handle()
|
@evt_nya_v2.handle()
|
||||||
async def _(msg: UniMsg, evt: Event):
|
async def _(msg: UniMsg, evt: Event):
|
||||||
text = msg.extract_plain_text()
|
text = msg.extract_plain_text()
|
||||||
await UniMessage.text(''.join(
|
await UniMessage.text("".join((NYA_SYMBOL_MAPPING.get(c, "") for c in text))).send(
|
||||||
(NYA_SYMBOL_MAPPING.get(c, '') for c in text)
|
evt
|
||||||
)).send(evt)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -3,14 +3,15 @@ from nonebot_plugin_alconna import Alconna, Args, on_alconna
|
|||||||
|
|
||||||
from konabot.common.longtask import DepLongTaskTarget
|
from konabot.common.longtask import DepLongTaskTarget
|
||||||
from konabot.common.pager import PagerQuery
|
from konabot.common.pager import PagerQuery
|
||||||
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
|
from konabot.common.subscribe import POSTER_INFO_DATA, dep_poster_service
|
||||||
from konabot.plugins.poster.service import dep_poster_service
|
|
||||||
|
|
||||||
|
|
||||||
cmd_subscribe = on_alconna(Alconna(
|
cmd_subscribe = on_alconna(
|
||||||
"订阅",
|
Alconna(
|
||||||
Args["channel", str],
|
"订阅",
|
||||||
))
|
Args["channel", str],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@cmd_subscribe.handle()
|
@cmd_subscribe.handle()
|
||||||
@ -23,10 +24,12 @@ async def _(target: DepLongTaskTarget, channel: str):
|
|||||||
await target.send_message(f"已经订阅过「{channel}」了")
|
await target.send_message(f"已经订阅过「{channel}」了")
|
||||||
|
|
||||||
|
|
||||||
cmd_list = on_alconna(Alconna(
|
cmd_list = on_alconna(
|
||||||
"re:(?:查询|我的|获取)订阅(列表)?",
|
Alconna(
|
||||||
Args["page?", int],
|
"re:(?:查询|我的|获取)订阅(列表)?",
|
||||||
))
|
Args["page?", int],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def better_channel_message(channel_id: str) -> str:
|
def better_channel_message(channel_id: str) -> str:
|
||||||
@ -39,17 +42,24 @@ def better_channel_message(channel_id: str) -> str:
|
|||||||
@cmd_list.handle()
|
@cmd_list.handle()
|
||||||
async def _(target: DepLongTaskTarget, page: int = 1):
|
async def _(target: DepLongTaskTarget, page: int = 1):
|
||||||
async with dep_poster_service() as service:
|
async with dep_poster_service() as service:
|
||||||
result = await service.get_channels(target, PagerQuery(
|
result = await service.get_channels(
|
||||||
page_index=page,
|
target,
|
||||||
page_size=10,
|
PagerQuery(
|
||||||
))
|
page_index=page,
|
||||||
await target.send_message(result.to_unimessage(title="订阅列表", formatter=better_channel_message))
|
page_size=10,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
await target.send_message(
|
||||||
|
result.to_unimessage(title="订阅列表", formatter=better_channel_message)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
cmd_list_available = on_alconna(Alconna(
|
cmd_list_available = on_alconna(
|
||||||
"re:(查询)?可用订阅(列表)?",
|
Alconna(
|
||||||
Args["page?", int],
|
"re:(查询)?可用订阅(列表)?",
|
||||||
))
|
Args["page?", int],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@cmd_list_available.handle()
|
@cmd_list_available.handle()
|
||||||
@ -58,13 +68,17 @@ async def _(target: DepLongTaskTarget, page: int = 1):
|
|||||||
page_index=page,
|
page_index=page,
|
||||||
page_size=10,
|
page_size=10,
|
||||||
).apply(sorted(POSTER_INFO_DATA.keys()))
|
).apply(sorted(POSTER_INFO_DATA.keys()))
|
||||||
await target.send_message(result.to_unimessage(title="可用订阅列表", formatter=better_channel_message))
|
await target.send_message(
|
||||||
|
result.to_unimessage(title="可用订阅列表", formatter=better_channel_message)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
cmd_unsubscribe = on_alconna(Alconna(
|
cmd_unsubscribe = on_alconna(
|
||||||
"取消订阅",
|
Alconna(
|
||||||
Args["channel", str],
|
"取消订阅",
|
||||||
))
|
Args["channel", str],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@cmd_unsubscribe.handle()
|
@cmd_unsubscribe.handle()
|
||||||
@ -79,6 +93,7 @@ async def _(target: DepLongTaskTarget, channel: str):
|
|||||||
|
|
||||||
driver = nonebot.get_driver()
|
driver = nonebot.get_driver()
|
||||||
|
|
||||||
|
|
||||||
@driver.on_startup
|
@driver.on_startup
|
||||||
async def _():
|
async def _():
|
||||||
async with dep_poster_service() as service:
|
async with dep_poster_service() as service:
|
||||||
|
|||||||
@ -4,8 +4,7 @@ from nonebot.internal.adapter.event import Event
|
|||||||
from nonebot_plugin_alconna import UniMessage
|
from nonebot_plugin_alconna import UniMessage
|
||||||
from nonebot_plugin_apscheduler import scheduler
|
from nonebot_plugin_apscheduler import scheduler
|
||||||
|
|
||||||
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info
|
from konabot.common.subscribe import PosterInfo, register_poster_info, broadcast
|
||||||
from konabot.plugins.poster.service import broadcast
|
|
||||||
|
|
||||||
register_poster_info(
|
register_poster_info(
|
||||||
"二十四节气",
|
"二十四节气",
|
||||||
@ -98,4 +97,3 @@ async def _(event: Event):
|
|||||||
|
|
||||||
msg = UniMessage.text(f"现在的节气是{date.term}")
|
msg = UniMessage.text(f"现在的节气是{date.term}")
|
||||||
await msg.send(event)
|
await msg.send(event)
|
||||||
|
|
||||||
|
|||||||
@ -1,20 +1,23 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
from nonebot import get_driver
|
from nonebot import get_driver
|
||||||
from nonebot_plugin_alconna import UniMessage
|
from nonebot_plugin_alconna import UniMessage
|
||||||
from konabot.plugins.poster.poster_info import register_poster_info, PosterInfo
|
from konabot.common.subscribe import register_poster_info, PosterInfo, broadcast
|
||||||
from konabot.plugins.poster.service import broadcast
|
|
||||||
|
|
||||||
|
|
||||||
CHANNEL_STARTUP = "启动通知"
|
CHANNEL_STARTUP = "启动通知"
|
||||||
|
|
||||||
|
|
||||||
register_poster_info(CHANNEL_STARTUP, PosterInfo(
|
register_poster_info(
|
||||||
aliases=set(),
|
CHANNEL_STARTUP,
|
||||||
description="当 Bot 重启时告知",
|
PosterInfo(
|
||||||
))
|
aliases=set(),
|
||||||
|
description="当 Bot 重启时告知",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
driver = get_driver()
|
driver = get_driver()
|
||||||
|
|
||||||
|
|
||||||
@driver.on_startup
|
@driver.on_startup
|
||||||
async def _():
|
async def _():
|
||||||
# 要尽量保证接受讯息的服务存在
|
# 要尽量保证接受讯息的服务存在
|
||||||
@ -30,4 +33,3 @@ async def _():
|
|||||||
await broadcast(CHANNEL_STARTUP, UniMessage.text("此方 BOT 重启好了"))
|
await broadcast(CHANNEL_STARTUP, UniMessage.text("此方 BOT 重启好了"))
|
||||||
|
|
||||||
asyncio.create_task(task())
|
asyncio.create_task(task())
|
||||||
|
|
||||||
|
|||||||
210
konabot/plugins/syntactic_sugar/__init__.py
Normal file
210
konabot/plugins/syntactic_sugar/__init__.py
Normal file
@ -0,0 +1,210 @@
|
|||||||
|
import copy
|
||||||
|
import re
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import nonebot
|
||||||
|
from nonebot import on_command
|
||||||
|
from nonebot.adapters import Bot, Event, Message
|
||||||
|
from nonebot.log import logger
|
||||||
|
from nonebot.message import handle_event
|
||||||
|
from nonebot.params import CommandArg
|
||||||
|
|
||||||
|
from konabot.common.database import DatabaseManager
|
||||||
|
from konabot.common.longtask import DepLongTaskTarget
|
||||||
|
|
||||||
|
ROOT_PATH = Path(__file__).resolve().parent
|
||||||
|
|
||||||
|
cmd = on_command(cmd="语法糖", aliases={"糖", "sugar"}, block=True)
|
||||||
|
|
||||||
|
db_manager = DatabaseManager()
|
||||||
|
driver = nonebot.get_driver()
|
||||||
|
|
||||||
|
|
||||||
|
@driver.on_startup
|
||||||
|
async def register_startup_hook():
|
||||||
|
await init_db()
|
||||||
|
|
||||||
|
|
||||||
|
@driver.on_shutdown
|
||||||
|
async def register_shutdown_hook():
|
||||||
|
await db_manager.close_all_connections()
|
||||||
|
|
||||||
|
|
||||||
|
async def init_db():
|
||||||
|
await db_manager.execute_by_sql_file(ROOT_PATH / "sql" / "create_table.sql")
|
||||||
|
|
||||||
|
table_info = await db_manager.query("PRAGMA table_info(syntactic_sugar)")
|
||||||
|
columns = {str(row.get("name")) for row in table_info}
|
||||||
|
if "channel_id" not in columns:
|
||||||
|
await db_manager.execute(
|
||||||
|
"ALTER TABLE syntactic_sugar ADD COLUMN channel_id VARCHAR(255) NOT NULL DEFAULT ''"
|
||||||
|
)
|
||||||
|
|
||||||
|
await db_manager.execute("DROP INDEX IF EXISTS idx_syntactic_sugar_name_belong_to")
|
||||||
|
await db_manager.execute(
|
||||||
|
"CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target "
|
||||||
|
"ON syntactic_sugar(name, channel_id, belong_to)"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_reply_plain_text(evt: Event) -> str:
|
||||||
|
reply = getattr(evt, "reply", None)
|
||||||
|
if reply is None:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
reply_message = getattr(reply, "message", None)
|
||||||
|
if reply_message is None:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
extract_plain_text = getattr(reply_message, "extract_plain_text", None)
|
||||||
|
if callable(extract_plain_text):
|
||||||
|
return extract_plain_text().strip()
|
||||||
|
return str(reply_message).strip()
|
||||||
|
|
||||||
|
|
||||||
|
def _split_variables(tokens: list[str]) -> tuple[list[str], dict[str, str]]:
|
||||||
|
positional: list[str] = []
|
||||||
|
named: dict[str, str] = {}
|
||||||
|
|
||||||
|
for token in tokens:
|
||||||
|
if "=" in token:
|
||||||
|
key, value = token.split("=", 1)
|
||||||
|
key = key.strip()
|
||||||
|
if key:
|
||||||
|
named[key] = value
|
||||||
|
continue
|
||||||
|
positional.append(token)
|
||||||
|
|
||||||
|
return positional, named
|
||||||
|
|
||||||
|
|
||||||
|
def _render_template(content: str, positional: list[str], named: dict[str, str]) -> str:
|
||||||
|
def replace(match: re.Match[str]) -> str:
|
||||||
|
key = match.group(1).strip()
|
||||||
|
if key.isdigit():
|
||||||
|
idx = int(key) - 1
|
||||||
|
if 0 <= idx < len(positional):
|
||||||
|
return positional[idx]
|
||||||
|
return match.group(0)
|
||||||
|
return named.get(key, match.group(0))
|
||||||
|
|
||||||
|
return re.sub(r"\{([^{}]+)\}", replace, content)
|
||||||
|
|
||||||
|
|
||||||
|
async def _store_sugar(name: str, content: str, belong_to: str, channel_id: str):
|
||||||
|
await db_manager.execute_by_sql_file(
|
||||||
|
ROOT_PATH / "sql" / "insert_sugar.sql",
|
||||||
|
(name, content, belong_to, channel_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _delete_sugar(name: str, belong_to: str, channel_id: str):
|
||||||
|
await db_manager.execute(
|
||||||
|
"DELETE FROM syntactic_sugar WHERE name = ? AND belong_to = ? AND channel_id = ?",
|
||||||
|
(name, belong_to, channel_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _find_sugar(name: str, belong_to: str, channel_id: str) -> str | None:
|
||||||
|
rows = await db_manager.query(
|
||||||
|
(
|
||||||
|
"SELECT content FROM syntactic_sugar "
|
||||||
|
"WHERE name = ? AND channel_id = ? "
|
||||||
|
"ORDER BY CASE WHEN belong_to = ? THEN 0 ELSE 1 END, id ASC "
|
||||||
|
"LIMIT 1"
|
||||||
|
),
|
||||||
|
(name, channel_id, belong_to),
|
||||||
|
)
|
||||||
|
if not rows:
|
||||||
|
return None
|
||||||
|
return rows[0].get("content")
|
||||||
|
|
||||||
|
|
||||||
|
async def _reinject_command(bot: Bot, evt: Event, command_text: str) -> bool:
|
||||||
|
depth = int(getattr(evt, "_syntactic_sugar_depth", 0))
|
||||||
|
if depth >= 3:
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
cloned_evt = copy.deepcopy(evt)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("语法糖克隆事件失败")
|
||||||
|
return False
|
||||||
|
|
||||||
|
message = getattr(cloned_evt, "message", None)
|
||||||
|
if message is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
msg_obj = type(message)(command_text)
|
||||||
|
except Exception:
|
||||||
|
msg_obj = command_text
|
||||||
|
|
||||||
|
setattr(cloned_evt, "message", msg_obj)
|
||||||
|
if hasattr(cloned_evt, "original_message"):
|
||||||
|
setattr(cloned_evt, "original_message", msg_obj)
|
||||||
|
if hasattr(cloned_evt, "raw_message"):
|
||||||
|
setattr(cloned_evt, "raw_message", command_text)
|
||||||
|
|
||||||
|
setattr(cloned_evt, "_syntactic_sugar_depth", depth + 1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await handle_event(bot, cloned_evt)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("语法糖回注事件失败")
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
@cmd.handle()
|
||||||
|
async def _(bot: Bot, evt: Event, target: DepLongTaskTarget, args: Message = CommandArg()):
|
||||||
|
raw = args.extract_plain_text().strip()
|
||||||
|
if not raw:
|
||||||
|
return
|
||||||
|
|
||||||
|
tokens = raw.split()
|
||||||
|
action = tokens[0]
|
||||||
|
target_id = target.target_id
|
||||||
|
channel_id = target.channel_id
|
||||||
|
|
||||||
|
if action == "存入":
|
||||||
|
if len(tokens) < 2:
|
||||||
|
await cmd.finish("请提供要存入的名称")
|
||||||
|
name = tokens[1].strip()
|
||||||
|
content = " ".join(tokens[2:]).strip()
|
||||||
|
if not content:
|
||||||
|
content = _extract_reply_plain_text(evt)
|
||||||
|
if not content:
|
||||||
|
await cmd.finish("请提供要存入的内容")
|
||||||
|
|
||||||
|
await _store_sugar(name, content, target_id, channel_id)
|
||||||
|
await cmd.finish(f"糖已存入:「{name}」!")
|
||||||
|
|
||||||
|
if action == "删除":
|
||||||
|
if len(tokens) < 2:
|
||||||
|
await cmd.finish("请提供要删除的名称")
|
||||||
|
name = tokens[1].strip()
|
||||||
|
await _delete_sugar(name, target_id, channel_id)
|
||||||
|
await cmd.finish(f"已删除糖:「{name}」!")
|
||||||
|
|
||||||
|
if action == "查看":
|
||||||
|
if len(tokens) < 2:
|
||||||
|
await cmd.finish("请提供要查看的名称")
|
||||||
|
name = tokens[1].strip()
|
||||||
|
content = await _find_sugar(name, target_id, channel_id)
|
||||||
|
if content is None:
|
||||||
|
await cmd.finish(f"没有糖:「{name}」")
|
||||||
|
await cmd.finish(f"糖的内容:「{content}」")
|
||||||
|
|
||||||
|
|
||||||
|
name = action
|
||||||
|
content = await _find_sugar(name, target_id, channel_id)
|
||||||
|
if content is None:
|
||||||
|
await cmd.finish(f"没有糖:「{name}」")
|
||||||
|
|
||||||
|
positional, named = _split_variables(tokens[1:])
|
||||||
|
rendered = _render_template(content, positional, named)
|
||||||
|
|
||||||
|
ok = await _reinject_command(bot, evt, rendered)
|
||||||
|
if not ok:
|
||||||
|
await cmd.finish(f"糖的展开结果:「{rendered}」")
|
||||||
12
konabot/plugins/syntactic_sugar/sql/create_table.sql
Normal file
12
konabot/plugins/syntactic_sugar/sql/create_table.sql
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
-- 创建语法糖表
|
||||||
|
CREATE TABLE IF NOT EXISTS syntactic_sugar (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
name VARCHAR(255) NOT NULL,
|
||||||
|
content TEXT NOT NULL,
|
||||||
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
belong_to VARCHAR(255) NOT NULL,
|
||||||
|
channel_id VARCHAR(255) NOT NULL DEFAULT ''
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target
|
||||||
|
ON syntactic_sugar(name, channel_id, belong_to);
|
||||||
5
konabot/plugins/syntactic_sugar/sql/insert_sugar.sql
Normal file
5
konabot/plugins/syntactic_sugar/sql/insert_sugar.sql
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
-- 插入语法糖,如果同一用户下名称已存在则更新内容
|
||||||
|
INSERT INTO syntactic_sugar (name, content, belong_to, channel_id)
|
||||||
|
VALUES (?, ?, ?, ?)
|
||||||
|
ON CONFLICT(name, channel_id, belong_to) DO UPDATE SET
|
||||||
|
content = excluded.content;
|
||||||
35
konabot/plugins/trpg_roll/__init__.py
Normal file
35
konabot/plugins/trpg_roll/__init__.py
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
import re
|
||||||
|
|
||||||
|
import nonebot
|
||||||
|
from nonebot.adapters import Event
|
||||||
|
from nonebot_plugin_alconna import UniMessage, UniMsg
|
||||||
|
|
||||||
|
from konabot.common.nb import match_keyword
|
||||||
|
from konabot.common.permsys import register_default_allow_permission, require_permission
|
||||||
|
from konabot.plugins.trpg_roll.core import RollError, roll_expression
|
||||||
|
|
||||||
|
|
||||||
|
PERMISSION_KEY = "trpg.roll"
|
||||||
|
register_default_allow_permission(PERMISSION_KEY)
|
||||||
|
|
||||||
|
|
||||||
|
matcher = nonebot.on_message(
|
||||||
|
rule=match_keyword.match_keyword(re.compile(r"^roll(?:\s+.+)?$", re.I))
|
||||||
|
& require_permission(PERMISSION_KEY),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@matcher.handle()
|
||||||
|
async def _(event: Event, msg: UniMsg):
|
||||||
|
text = msg.extract_plain_text().strip()
|
||||||
|
expr = text[4:].strip()
|
||||||
|
|
||||||
|
if not expr:
|
||||||
|
await UniMessage.text("用法:roll 3d6 / roll d20+5 / roll 2d8+1d4+3 / roll 4dF").send(event)
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = roll_expression(expr)
|
||||||
|
await UniMessage.text(result.format()).send(event)
|
||||||
|
except RollError as e:
|
||||||
|
await UniMessage.text(str(e)).send(event)
|
||||||
143
konabot/plugins/trpg_roll/core.py
Normal file
143
konabot/plugins/trpg_roll/core.py
Normal file
@ -0,0 +1,143 @@
|
|||||||
|
import random
|
||||||
|
import re
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
MAX_DICE_COUNT = 100
|
||||||
|
MAX_DICE_FACES = 1000
|
||||||
|
MAX_TERM_COUNT = 20
|
||||||
|
MAX_TOTAL_ROLLS = 200
|
||||||
|
MAX_EXPRESSION_LENGTH = 200
|
||||||
|
MAX_MESSAGE_LENGTH = 1200
|
||||||
|
|
||||||
|
_TOKEN_RE = re.compile(r"([+-]?)(\d*d(?:%|[fF]|\d+)|\d+)")
|
||||||
|
_DICE_RE = re.compile(r"(?i)(\d*)d(%|f|\d+)")
|
||||||
|
|
||||||
|
# 常见跑团表达式示例:3d6、d20+5、2d8+1d4+3、d%、4dF
|
||||||
|
|
||||||
|
|
||||||
|
class RollError(ValueError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(slots=True)
|
||||||
|
class RollTermResult:
|
||||||
|
sign: int
|
||||||
|
source: str
|
||||||
|
detail: str
|
||||||
|
value: int
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(slots=True)
|
||||||
|
class RollResult:
|
||||||
|
expression: str
|
||||||
|
total: int
|
||||||
|
terms: list[RollTermResult]
|
||||||
|
|
||||||
|
def format(self) -> str:
|
||||||
|
parts = [f"{term.source}={term.detail}" for term in self.terms]
|
||||||
|
detail = " ".join(parts)
|
||||||
|
text = f"{self.expression} = {self.total}"
|
||||||
|
if detail:
|
||||||
|
text += f"\n{detail}"
|
||||||
|
if len(text) > MAX_MESSAGE_LENGTH:
|
||||||
|
raise RollError("结果过长,请减少骰子数量或简化表达式")
|
||||||
|
return text
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_single_term(raw: str, sign: int, rng: random.Random) -> RollTermResult:
|
||||||
|
dice_match = _DICE_RE.fullmatch(raw)
|
||||||
|
if dice_match:
|
||||||
|
count_text, faces_text = dice_match.groups()
|
||||||
|
count = int(count_text) if count_text else 1
|
||||||
|
if count <= 0:
|
||||||
|
raise RollError("骰子个数必须大于 0")
|
||||||
|
if count > MAX_DICE_COUNT:
|
||||||
|
raise RollError(f"单项最多只能掷 {MAX_DICE_COUNT} 个骰子")
|
||||||
|
|
||||||
|
if faces_text == "%":
|
||||||
|
faces = 100
|
||||||
|
rolls = [rng.randint(1, 100) for _ in range(count)]
|
||||||
|
elif faces_text.lower() == "f":
|
||||||
|
rolls = [rng.choice((-1, 0, 1)) for _ in range(count)]
|
||||||
|
total = sum(rolls) * sign
|
||||||
|
signed = "+" if sign > 0 else "-"
|
||||||
|
return RollTermResult(
|
||||||
|
sign=sign,
|
||||||
|
source=f"{signed}{count}dF",
|
||||||
|
detail="[" + ", ".join(f"{v:+d}" for v in rolls) + "]",
|
||||||
|
value=total,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
faces = int(faces_text)
|
||||||
|
if faces <= 0:
|
||||||
|
raise RollError("骰子面数必须大于 0")
|
||||||
|
if faces > MAX_DICE_FACES:
|
||||||
|
raise RollError(f"骰子面数不能超过 {MAX_DICE_FACES}")
|
||||||
|
rolls = [rng.randint(1, faces) for _ in range(count)]
|
||||||
|
|
||||||
|
total = sum(rolls) * sign
|
||||||
|
signed = "+" if sign > 0 else "-"
|
||||||
|
return RollTermResult(
|
||||||
|
sign=sign,
|
||||||
|
source=f"{signed}{count}d{faces_text.upper()}",
|
||||||
|
detail="[" + ", ".join(map(str, rolls)) + "]",
|
||||||
|
value=total,
|
||||||
|
)
|
||||||
|
|
||||||
|
value = int(raw) * sign
|
||||||
|
signed = "+" if sign > 0 else "-"
|
||||||
|
return RollTermResult(sign=sign, source=f"{signed}{raw}", detail=str(abs(value)), value=value)
|
||||||
|
|
||||||
|
|
||||||
|
def roll_expression(expr: str, rng: random.Random | None = None) -> RollResult:
|
||||||
|
expr = expr.strip().replace(" ", "")
|
||||||
|
if not expr:
|
||||||
|
raise RollError("请提供要掷的表达式,例如 roll 3d6 或 roll d20+5")
|
||||||
|
if len(expr) > MAX_EXPRESSION_LENGTH:
|
||||||
|
raise RollError("表达式太长了")
|
||||||
|
|
||||||
|
matches = list(_TOKEN_RE.finditer(expr))
|
||||||
|
if not matches:
|
||||||
|
raise RollError("无法解析表达式,请使用如 3d6、d20+5、2d8+1d4+3、4dF 这样的格式")
|
||||||
|
|
||||||
|
rebuilt = "".join(m.group(0) for m in matches)
|
||||||
|
if rebuilt != expr:
|
||||||
|
raise RollError("表达式中含有无法识别的内容")
|
||||||
|
if len(matches) > MAX_TERM_COUNT:
|
||||||
|
raise RollError(f"表达式项数不能超过 {MAX_TERM_COUNT}")
|
||||||
|
|
||||||
|
total_rolls = 0
|
||||||
|
for m in matches:
|
||||||
|
token = m.group(2)
|
||||||
|
dice_match = _DICE_RE.fullmatch(token)
|
||||||
|
if dice_match:
|
||||||
|
count_text, faces_text = dice_match.groups()
|
||||||
|
count = int(count_text) if count_text else 1
|
||||||
|
if count <= 0:
|
||||||
|
raise RollError("骰子个数必须大于 0")
|
||||||
|
if count > MAX_DICE_COUNT:
|
||||||
|
raise RollError(f"单项最多只能掷 {MAX_DICE_COUNT} 个骰子")
|
||||||
|
if faces_text not in {"%", "f", "F"}:
|
||||||
|
faces = int(faces_text)
|
||||||
|
if faces <= 0:
|
||||||
|
raise RollError("骰子面数必须大于 0")
|
||||||
|
if faces > MAX_DICE_FACES:
|
||||||
|
raise RollError(f"骰子面数不能超过 {MAX_DICE_FACES}")
|
||||||
|
total_rolls += count
|
||||||
|
if total_rolls > MAX_TOTAL_ROLLS:
|
||||||
|
raise RollError(f"一次最多只能实际掷 {MAX_TOTAL_ROLLS} 个骰子")
|
||||||
|
|
||||||
|
rng = rng or random.Random()
|
||||||
|
terms: list[RollTermResult] = []
|
||||||
|
total = 0
|
||||||
|
for idx, match in enumerate(matches):
|
||||||
|
sign_text, raw = match.groups()
|
||||||
|
sign = -1 if sign_text == "-" else 1
|
||||||
|
if idx == 0 and sign_text == "":
|
||||||
|
sign = 1
|
||||||
|
term = _parse_single_term(raw, sign, rng)
|
||||||
|
terms.append(term)
|
||||||
|
total += term.value
|
||||||
|
|
||||||
|
return RollResult(expression=expr, total=total, terms=terms)
|
||||||
115
konabot/plugins/wolfx_eew.py
Normal file
115
konabot/plugins/wolfx_eew.py
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
import datetime
|
||||||
|
from nonebot_plugin_alconna import UniMessage
|
||||||
|
from konabot.common.apis.wolfx import CencEewReport, CencEqReport, wolfx_api
|
||||||
|
from konabot.common.subscribe import PosterInfo, broadcast, register_poster_info
|
||||||
|
|
||||||
|
|
||||||
|
provinces_short = [
|
||||||
|
"北京",
|
||||||
|
"天津",
|
||||||
|
"河北",
|
||||||
|
"山西",
|
||||||
|
"内蒙古",
|
||||||
|
"辽宁",
|
||||||
|
"吉林",
|
||||||
|
"黑龙江",
|
||||||
|
"上海",
|
||||||
|
"江苏",
|
||||||
|
"浙江",
|
||||||
|
"安徽",
|
||||||
|
"福建",
|
||||||
|
"江西",
|
||||||
|
"山东",
|
||||||
|
"河南",
|
||||||
|
"湖北",
|
||||||
|
"湖南",
|
||||||
|
"广东",
|
||||||
|
"广西",
|
||||||
|
"海南",
|
||||||
|
"重庆",
|
||||||
|
"四川",
|
||||||
|
"贵州",
|
||||||
|
"云南",
|
||||||
|
"西藏",
|
||||||
|
"陕西",
|
||||||
|
"甘肃",
|
||||||
|
"青海",
|
||||||
|
"宁夏",
|
||||||
|
"新疆",
|
||||||
|
"香港",
|
||||||
|
"澳门",
|
||||||
|
"台湾",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
register_poster_info(
|
||||||
|
"中国地震台网地震速报",
|
||||||
|
PosterInfo(
|
||||||
|
aliases={
|
||||||
|
"地震速报",
|
||||||
|
"地震预警",
|
||||||
|
},
|
||||||
|
description="来自中国地震台网的地震速报",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
CENC_EEW_DISABLED = True
|
||||||
|
|
||||||
|
|
||||||
|
if not CENC_EEW_DISABLED:
|
||||||
|
|
||||||
|
@wolfx_api.cenc_eew.append
|
||||||
|
async def broadcast_eew(report: CencEewReport):
|
||||||
|
# 这个好像没那么准确...
|
||||||
|
is_cn = any(report.HypoCenter.startswith(prefix) for prefix in provinces_short)
|
||||||
|
if (is_cn and report.Magnitude >= 4.2) or (
|
||||||
|
(not is_cn) and report.Magnitude >= 7.0
|
||||||
|
):
|
||||||
|
# 这是中国地震台网网站上,会默认展示的地震信息的等级
|
||||||
|
origin_time_dt = datetime.datetime.strptime(
|
||||||
|
report.OriginTime, "%Y-%m-%d %H:%M:%S"
|
||||||
|
)
|
||||||
|
origin_time_str = (
|
||||||
|
f"{origin_time_dt.month}月"
|
||||||
|
f"{origin_time_dt.day}日"
|
||||||
|
f"{origin_time_dt.hour}时"
|
||||||
|
f"{origin_time_dt.minute}分"
|
||||||
|
)
|
||||||
|
|
||||||
|
# vvv 下面这个其实不准确
|
||||||
|
eid_in_link = report.EventID.split(".")[0]
|
||||||
|
link = f"https://www.cenc.ac.cn/earthquake-manage-publish-web/product-list/{eid_in_link}/summarize"
|
||||||
|
|
||||||
|
msg = UniMessage.text(
|
||||||
|
"据中国地震台网中心 (https://www.cenc.ac.cn/) 报道,"
|
||||||
|
f"北京时间{origin_time_str},"
|
||||||
|
f"{report.HypoCenter}发生{report.Magnitude:.1f}级地震。"
|
||||||
|
f"震源位于 {report.Longitude}° {report.Latitude}°,深度 {report.Depth}km。\n\n"
|
||||||
|
f"详细信息请见 {link}"
|
||||||
|
)
|
||||||
|
await broadcast("中国地震台网地震速报", msg)
|
||||||
|
|
||||||
|
|
||||||
|
@wolfx_api.cenc_eqlist.append
|
||||||
|
async def broadcast_cenc_eqlist(report: CencEqReport):
|
||||||
|
is_cn = any(report.location.startswith(prefix) for prefix in provinces_short)
|
||||||
|
if (is_cn and float(report.magnitude) >= 4.2) or (
|
||||||
|
(not is_cn) and float(report.magnitude) >= 7.0
|
||||||
|
):
|
||||||
|
origin_time_dt = datetime.datetime.strptime(report.time, "%Y-%m-%d %H:%M:%S")
|
||||||
|
origin_time_str = (
|
||||||
|
f"{origin_time_dt.month}月"
|
||||||
|
f"{origin_time_dt.day}日"
|
||||||
|
f"{origin_time_dt.hour}时"
|
||||||
|
f"{origin_time_dt.minute}分"
|
||||||
|
)
|
||||||
|
|
||||||
|
msg = UniMessage.text(
|
||||||
|
"据中国地震台网中心 (https://www.cenc.ac.cn/) 消息,"
|
||||||
|
f"北京时间{origin_time_str},"
|
||||||
|
f"{report.location}发生{report.magnitude}级地震。"
|
||||||
|
f"震源位于 {report.longtitude}° {report.latitude}°,深度 {report.depth}km。\n\n"
|
||||||
|
f"数据来源于 Wolfx OpenAPI,事件 ID: {report.EventID}"
|
||||||
|
)
|
||||||
|
await broadcast("中国地震台网地震速报", msg)
|
||||||
25
poetry.lock
generated
25
poetry.lock
generated
@ -4067,6 +4067,29 @@ type = "legacy"
|
|||||||
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
|
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
|
||||||
reference = "mirrors"
|
reference = "mirrors"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pytest-mock"
|
||||||
|
version = "3.15.1"
|
||||||
|
description = "Thin-wrapper around the mock package for easier use with pytest"
|
||||||
|
optional = false
|
||||||
|
python-versions = ">=3.9"
|
||||||
|
groups = ["main"]
|
||||||
|
files = [
|
||||||
|
{file = "pytest_mock-3.15.1-py3-none-any.whl", hash = "sha256:0a25e2eb88fe5168d535041d09a4529a188176ae608a6d249ee65abc0949630d"},
|
||||||
|
{file = "pytest_mock-3.15.1.tar.gz", hash = "sha256:1849a238f6f396da19762269de72cb1814ab44416fa73a8686deac10b0d87a0f"},
|
||||||
|
]
|
||||||
|
|
||||||
|
[package.dependencies]
|
||||||
|
pytest = ">=6.2.5"
|
||||||
|
|
||||||
|
[package.extras]
|
||||||
|
dev = ["pre-commit", "pytest-asyncio", "tox"]
|
||||||
|
|
||||||
|
[package.source]
|
||||||
|
type = "legacy"
|
||||||
|
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
|
||||||
|
reference = "mirrors"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "python-dotenv"
|
name = "python-dotenv"
|
||||||
version = "1.2.2"
|
version = "1.2.2"
|
||||||
@ -5311,4 +5334,4 @@ reference = "mirrors"
|
|||||||
[metadata]
|
[metadata]
|
||||||
lock-version = "2.1"
|
lock-version = "2.1"
|
||||||
python-versions = ">=3.12,<4.0"
|
python-versions = ">=3.12,<4.0"
|
||||||
content-hash = "f2d5345d93636e19e49852af636d598c88394aaef4020f383402394b58452e3d"
|
content-hash = "23d2eadd1c36d017ff77934bd02b56e37d26005e6a99793b623c01162b90d67d"
|
||||||
|
|||||||
@ -37,6 +37,8 @@ dependencies = [
|
|||||||
"pytest (>=8.0.0,<9.0.0)",
|
"pytest (>=8.0.0,<9.0.0)",
|
||||||
"nonebug (>=0.4.3,<0.5.0)",
|
"nonebug (>=0.4.3,<0.5.0)",
|
||||||
"pytest-cov (>=7.0.0,<8.0.0)",
|
"pytest-cov (>=7.0.0,<8.0.0)",
|
||||||
|
"aiosignal (>=1.4.0,<2.0.0)",
|
||||||
|
"pytest-mock (>=3.15.1,<4.0.0)",
|
||||||
]
|
]
|
||||||
|
|
||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
|
|||||||
@ -12,8 +12,22 @@ def filter(change: Change, path: str) -> bool:
|
|||||||
return False
|
return False
|
||||||
if Path(path).absolute().is_relative_to((base / ".git").absolute()):
|
if Path(path).absolute().is_relative_to((base / ".git").absolute()):
|
||||||
return False
|
return False
|
||||||
if Path(path).absolute().is_relative_to((base / "assets" / "oracle" / "image").absolute()):
|
if (
|
||||||
|
Path(path)
|
||||||
|
.absolute()
|
||||||
|
.is_relative_to((base / "assets" / "oracle" / "image").absolute())
|
||||||
|
):
|
||||||
# 还要解决坏枪的这个问题
|
# 还要解决坏枪的这个问题
|
||||||
return False
|
return False
|
||||||
|
if Path(path).absolute().is_relative_to((base / "htmlcov").absolute()):
|
||||||
|
return False
|
||||||
|
if Path(path).absolute().is_relative_to((base / "test").absolute()):
|
||||||
|
return False
|
||||||
|
if Path(path).absolute().is_relative_to((base / ".pytest_cache").absolute()):
|
||||||
|
return False
|
||||||
|
if Path(path).absolute().is_relative_to((base / ".ruff_cache").absolute()):
|
||||||
|
return False
|
||||||
|
if path.endswith(".coverage"):
|
||||||
|
return False
|
||||||
print(path)
|
print(path)
|
||||||
return True
|
return True
|
||||||
|
|||||||
78
tests/services/test_wolfx_api.py
Normal file
78
tests/services/test_wolfx_api.py
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
import json
|
||||||
|
from unittest.mock import AsyncMock
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from konabot.common.apis.wolfx import CencEewReport, WolfxAPIService, WolfxWebSocket
|
||||||
|
|
||||||
|
|
||||||
|
obj_example = {
|
||||||
|
"ID": "bacby4yab1oyb",
|
||||||
|
"EventID": "202603100805.0001",
|
||||||
|
"ReportTime": "2026-03-10 08:05:29",
|
||||||
|
"ReportNum": 1,
|
||||||
|
"OriginTime": "2026-03-10 08:05:29",
|
||||||
|
"HypoCenter": "新疆昌吉州呼图壁县",
|
||||||
|
"Latitude": 43.687,
|
||||||
|
"Longitude": 86.427,
|
||||||
|
"Magnitude": 4.0,
|
||||||
|
"Depth": 14,
|
||||||
|
"MaxIntensity": 5,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_wolfx_websocket_handle():
|
||||||
|
ws = WolfxWebSocket("")
|
||||||
|
|
||||||
|
mock_callback = AsyncMock()
|
||||||
|
ws.signal.append(mock_callback)
|
||||||
|
ws.signal.freeze()
|
||||||
|
|
||||||
|
obj1 = {
|
||||||
|
"type": "heartbeat",
|
||||||
|
"ver": 18,
|
||||||
|
"id": "a69edf6436c5b605",
|
||||||
|
"timestamp": 1773111106701,
|
||||||
|
}
|
||||||
|
data1 = json.dumps(obj1).encode()
|
||||||
|
await ws.handle(data1)
|
||||||
|
mock_callback.assert_not_called()
|
||||||
|
mock_callback.reset_mock()
|
||||||
|
|
||||||
|
obj2 = obj_example
|
||||||
|
data2 = json.dumps(obj2).encode()
|
||||||
|
await ws.handle(data2)
|
||||||
|
mock_callback.assert_called_once_with(data2)
|
||||||
|
mock_callback.reset_mock()
|
||||||
|
|
||||||
|
data3 = b"what the f"
|
||||||
|
await ws.handle(data3)
|
||||||
|
mock_callback.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_wolfx_bind_pydantic():
|
||||||
|
sv = WolfxAPIService()
|
||||||
|
called: list[CencEewReport] = []
|
||||||
|
|
||||||
|
@sv.cenc_eew.append
|
||||||
|
async def _(data: CencEewReport):
|
||||||
|
called.append(data)
|
||||||
|
|
||||||
|
sv._cenc_eew_ws.signal.freeze()
|
||||||
|
sv.cenc_eew.freeze()
|
||||||
|
|
||||||
|
data = json.dumps(obj_example).encode()
|
||||||
|
await sv._cenc_eew_ws.signal.send(data)
|
||||||
|
|
||||||
|
assert len(called) == 1
|
||||||
|
data = called[0]
|
||||||
|
|
||||||
|
assert data.HypoCenter == obj_example["HypoCenter"]
|
||||||
|
assert data.EventID == obj_example["EventID"]
|
||||||
|
|
||||||
|
# Don't panic when the object is invalid
|
||||||
|
data = json.dumps({"type": "给"}).encode()
|
||||||
|
await sv._cenc_eew_ws.signal.send(data)
|
||||||
|
|
||||||
|
assert len(called) == 1
|
||||||
88
tests/test_fx_process.py
Normal file
88
tests/test_fx_process.py
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
from importlib.util import module_from_spec, spec_from_file_location
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import nonebot
|
||||||
|
from PIL import Image
|
||||||
|
|
||||||
|
|
||||||
|
nonebot.init()
|
||||||
|
|
||||||
|
MODULE_PATH = Path(__file__).resolve().parents[1] / "konabot/plugins/fx_process/fx_handle.py"
|
||||||
|
SPEC = spec_from_file_location("test_fx_handle_module", MODULE_PATH)
|
||||||
|
assert SPEC is not None and SPEC.loader is not None
|
||||||
|
fx_handle = module_from_spec(SPEC)
|
||||||
|
SPEC.loader.exec_module(fx_handle)
|
||||||
|
ImageFilterImplement = fx_handle.ImageFilterImplement
|
||||||
|
|
||||||
|
INIT_MODULE_PATH = Path(__file__).resolve().parents[1] / "konabot/plugins/fx_process/__init__.py"
|
||||||
|
INIT_SPEC = spec_from_file_location("test_fx_init_module", INIT_MODULE_PATH)
|
||||||
|
assert INIT_SPEC is not None and INIT_SPEC.loader is not None
|
||||||
|
fx_init = module_from_spec(INIT_SPEC)
|
||||||
|
INIT_SPEC.loader.exec_module(fx_init)
|
||||||
|
prase_input_args = fx_init.prase_input_args
|
||||||
|
|
||||||
|
|
||||||
|
def test_apply_jpeg_damage_keeps_size_and_rgba_mode():
|
||||||
|
image = Image.new("RGBA", (32, 24), (255, 0, 0, 128))
|
||||||
|
|
||||||
|
result = ImageFilterImplement.apply_jpeg_damage(image, 5)
|
||||||
|
|
||||||
|
assert result.size == image.size
|
||||||
|
assert result.mode == "RGBA"
|
||||||
|
assert result.getchannel("A").getextrema() == (128, 128)
|
||||||
|
|
||||||
|
|
||||||
|
def test_apply_jpeg_damage_clamps_quality_range():
|
||||||
|
image = Image.new("RGB", (16, 16), (123, 222, 111))
|
||||||
|
|
||||||
|
low = ImageFilterImplement.apply_jpeg_damage(image, -10)
|
||||||
|
high = ImageFilterImplement.apply_jpeg_damage(image, 999)
|
||||||
|
|
||||||
|
assert low.size == image.size
|
||||||
|
assert high.size == image.size
|
||||||
|
assert low.mode == "RGBA"
|
||||||
|
assert high.mode == "RGBA"
|
||||||
|
|
||||||
|
|
||||||
|
def test_apply_resize_clamps_small_result_to_at_least_one_pixel():
|
||||||
|
image = Image.new("RGBA", (10, 10), (255, 0, 0, 255))
|
||||||
|
|
||||||
|
result = ImageFilterImplement.apply_resize(image, 0.01)
|
||||||
|
|
||||||
|
assert result.size == (1, 1)
|
||||||
|
|
||||||
|
|
||||||
|
def test_apply_resize_negative_x_with_positive_y_only_mirrors_horizontally():
|
||||||
|
image = Image.new("RGBA", (2, 1))
|
||||||
|
image.putpixel((0, 0), (255, 0, 0, 255))
|
||||||
|
image.putpixel((1, 0), (0, 0, 255, 255))
|
||||||
|
|
||||||
|
result = ImageFilterImplement.apply_resize(image, -1, 1)
|
||||||
|
|
||||||
|
assert result.size == (2, 1)
|
||||||
|
assert result.getpixel((0, 0)) == (0, 0, 255, 255)
|
||||||
|
assert result.getpixel((1, 0)) == (255, 0, 0, 255)
|
||||||
|
|
||||||
|
|
||||||
|
def test_apply_resize_negative_scale_without_y_flips_both_axes():
|
||||||
|
image = Image.new("RGBA", (2, 2))
|
||||||
|
image.putpixel((0, 0), (255, 0, 0, 255))
|
||||||
|
image.putpixel((1, 0), (0, 255, 0, 255))
|
||||||
|
image.putpixel((0, 1), (0, 0, 255, 255))
|
||||||
|
image.putpixel((1, 1), (255, 255, 0, 255))
|
||||||
|
|
||||||
|
result = ImageFilterImplement.apply_resize(image, -1)
|
||||||
|
|
||||||
|
assert result.size == (2, 2)
|
||||||
|
assert result.getpixel((0, 0)) == (255, 255, 0, 255)
|
||||||
|
assert result.getpixel((1, 0)) == (0, 0, 255, 255)
|
||||||
|
assert result.getpixel((0, 1)) == (0, 255, 0, 255)
|
||||||
|
assert result.getpixel((1, 1)) == (255, 0, 0, 255)
|
||||||
|
|
||||||
|
|
||||||
|
def test_prase_input_args_parses_resize_second_argument_as_float():
|
||||||
|
filters = prase_input_args("缩放 2 3")
|
||||||
|
|
||||||
|
assert len(filters) == 1
|
||||||
|
assert filters[0].name == "缩放"
|
||||||
|
assert filters[0].args == [2.0, 3.0]
|
||||||
40
tests/test_permsys_default_allow.py
Normal file
40
tests/test_permsys_default_allow.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
from contextlib import asynccontextmanager
|
||||||
|
from pathlib import Path
|
||||||
|
from tempfile import TemporaryDirectory
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from konabot.common.database import DatabaseManager
|
||||||
|
from konabot.common.permsys import PermManager, register_default_allow_permission
|
||||||
|
from konabot.common.permsys.entity import PermEntity
|
||||||
|
from konabot.common.permsys.migrates import execute_migration
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def tempdb():
|
||||||
|
with TemporaryDirectory() as _tempdir:
|
||||||
|
tempdir = Path(_tempdir)
|
||||||
|
db = DatabaseManager(tempdir / "perm.sqlite3")
|
||||||
|
yield db
|
||||||
|
await db.close_all_connections()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_register_default_allow_permission_records_key():
|
||||||
|
register_default_allow_permission("test.default.allow")
|
||||||
|
|
||||||
|
async with tempdb() as db:
|
||||||
|
async with db.get_conn() as conn:
|
||||||
|
await execute_migration(conn)
|
||||||
|
|
||||||
|
pm = PermManager(db)
|
||||||
|
await pm.update_permission(
|
||||||
|
PermEntity("sys", "global", "global"),
|
||||||
|
"test.default.allow",
|
||||||
|
True,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert await pm.check_has_permission(
|
||||||
|
[PermEntity("dummy", "user", "1"), PermEntity("sys", "global", "global")],
|
||||||
|
"test.default.allow.sub",
|
||||||
|
)
|
||||||
66
tests/test_trpg_roll.py
Normal file
66
tests/test_trpg_roll.py
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
import random
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from konabot.plugins.trpg_roll.core import RollError, roll_expression
|
||||||
|
|
||||||
|
|
||||||
|
class FakeRandom:
|
||||||
|
def __init__(self, randint_values: list[int] | None = None, choice_values: list[int] | None = None):
|
||||||
|
self._randint_values = list(randint_values or [])
|
||||||
|
self._choice_values = list(choice_values or [])
|
||||||
|
|
||||||
|
def randint(self, _a: int, _b: int) -> int:
|
||||||
|
assert self._randint_values
|
||||||
|
return self._randint_values.pop(0)
|
||||||
|
|
||||||
|
def choice(self, _seq):
|
||||||
|
assert self._choice_values
|
||||||
|
return self._choice_values.pop(0)
|
||||||
|
|
||||||
|
|
||||||
|
def test_roll_expression_basic():
|
||||||
|
rng = FakeRandom(randint_values=[2, 4, 5])
|
||||||
|
result = roll_expression("3d6", rng=rng)
|
||||||
|
|
||||||
|
assert result.total == 11
|
||||||
|
assert result.format() == "3d6 = 11\n+3d6=[2, 4, 5]"
|
||||||
|
|
||||||
|
|
||||||
|
def test_roll_expression_multiple_terms():
|
||||||
|
rng = FakeRandom(randint_values=[14, 3, 1])
|
||||||
|
result = roll_expression("d20+1d4-2", rng=rng)
|
||||||
|
|
||||||
|
assert result.total == 15
|
||||||
|
assert result.format() == "d20+1d4-2 = 15\n+1d20=[14] +1d4=[3] -2=2"
|
||||||
|
|
||||||
|
|
||||||
|
def test_roll_expression_df():
|
||||||
|
rng = FakeRandom(choice_values=[-1, 0, 1, 1])
|
||||||
|
result = roll_expression("4dF", rng=rng)
|
||||||
|
|
||||||
|
assert result.total == 1
|
||||||
|
assert result.format() == "4dF = 1\n+4dF=[-1, +0, +1, +1]"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
("expr", "message"),
|
||||||
|
[
|
||||||
|
("", "请提供要掷的表达式"),
|
||||||
|
("abc", "无法解析表达式"),
|
||||||
|
("1d0", "骰子面数必须大于 0"),
|
||||||
|
("0d6", "骰子个数必须大于 0"),
|
||||||
|
("101d6", "单项最多只能掷 100 个骰子"),
|
||||||
|
("1d1001", "骰子面数不能超过 1000"),
|
||||||
|
("201d1", "单项最多只能掷 100 个骰子"),
|
||||||
|
("1d6*2", "表达式中含有无法识别的内容"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_roll_expression_invalid(expr: str, message: str):
|
||||||
|
with pytest.raises(RollError, match=message):
|
||||||
|
roll_expression(expr, rng=random.Random(0))
|
||||||
|
|
||||||
|
|
||||||
|
def test_roll_expression_total_roll_limit():
|
||||||
|
with pytest.raises(RollError, match="一次最多只能实际掷 200 个骰子"):
|
||||||
|
roll_expression("100d6+100d6+1d6", rng=random.Random(0))
|
||||||
Reference in New Issue
Block a user