Compare commits
39 Commits
feature/pe
...
enhancemen
| Author | SHA1 | Date | |
|---|---|---|---|
| 157236d9a6 | |||
|
8725f28caf
|
|||
| 51c0bf4229 | |||
| 5b1c6d446c | |||
| 717b7a95e8 | |||
| 9bac2b8cdf | |||
| bfb8ebab29 | |||
| 805e60a9ff | |||
| 1331f8f893 | |||
|
fef9041a97
|
|||
| 00f42dbdf1 | |||
| d37c4870d8 | |||
| 23b9f101b3 | |||
|
6a2fe11753
|
|||
|
97e87c7ec3
|
|||
|
8c1651ad3d
|
|||
| ff60642c62 | |||
| 69b5908445 | |||
| a542ed1fd9 | |||
| e86a385448 | |||
| d4bb36a074 | |||
| 1a2a3c0468 | |||
| 67502cb932 | |||
| f9a312b80a | |||
| 1980f8a895 | |||
|
d273ed4b1a
|
|||
|
265e9cc583
|
|||
|
8f5061ba41
|
|||
|
b3c3c77f3c
|
|||
|
6a84ce2cd8
|
|||
|
392c699b33
|
|||
|
72e21cd9aa
|
|||
|
f3389ff2b9
|
|||
|
e59d3c2e4b
|
|||
|
31d19b7ec0
|
|||
|
c2f677911d
|
|||
|
f5b81319f8
|
|||
|
870e2383d8
|
|||
| 7e8fa45f36 |
@ -39,7 +39,7 @@ steps:
|
||||
commands:
|
||||
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
|
||||
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py
|
||||
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov-report term-missing:skip-covered
|
||||
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov=./konabot/ --cov-report term-missing:skip-covered
|
||||
- name: 发送构建结果到 ntfy
|
||||
image: parrazam/drone-ntfy
|
||||
when:
|
||||
|
||||
10
.gitignore
vendored
10
.gitignore
vendored
@ -3,9 +3,14 @@
|
||||
/data
|
||||
/pyrightconfig.json
|
||||
/pyrightconfig.toml
|
||||
/uv.lock
|
||||
|
||||
# 缓存文件
|
||||
__pycache__
|
||||
/.ruff_cache
|
||||
/.pytest_cache
|
||||
/.mypy_cache
|
||||
/.black_cache
|
||||
|
||||
# 可能会偶然生成的 diff 文件
|
||||
/*.diff
|
||||
@ -14,3 +19,8 @@ __pycache__
|
||||
/.coverage
|
||||
/.coverage.db
|
||||
/htmlcov
|
||||
|
||||
# 对手动创建虚拟环境的人
|
||||
/.venv
|
||||
/venv
|
||||
*.egg-info
|
||||
|
||||
4
.vscode/settings.json
vendored
4
.vscode/settings.json
vendored
@ -1,3 +1,5 @@
|
||||
{
|
||||
"python.REPL.enableREPLSmartSend": false
|
||||
"python.REPL.enableREPLSmartSend": false,
|
||||
"python-envs.defaultEnvManager": "ms-python.python:poetry",
|
||||
"python-envs.defaultPackageManager": "ms-python.python:poetry"
|
||||
}
|
||||
15
Dockerfile
15
Dockerfile
@ -1,16 +1,3 @@
|
||||
FROM alpine:latest AS artifacts
|
||||
|
||||
RUN apk add --no-cache curl xz
|
||||
WORKDIR /tmp
|
||||
|
||||
RUN mkdir -p /artifacts
|
||||
RUN curl -L -o typst.tar.xz "https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-unknown-linux-musl.tar.xz" \
|
||||
&& tar -xJf typst.tar.xz \
|
||||
&& mv typst-x86_64-unknown-linux-musl/typst /artifacts
|
||||
|
||||
RUN chmod -R +x /artifacts/
|
||||
|
||||
|
||||
FROM python:3.13-slim AS base
|
||||
|
||||
ENV VIRTUAL_ENV=/app/.venv \
|
||||
@ -51,7 +38,6 @@ RUN uv sync --no-install-project
|
||||
FROM base AS runtime
|
||||
|
||||
COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
|
||||
COPY --from=artifacts /artifacts/ /usr/local/bin/
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
@ -61,6 +47,7 @@ COPY bot.py pyproject.toml .env.prod .env.test ./
|
||||
COPY assets ./assets
|
||||
COPY scripts ./scripts
|
||||
COPY konabot ./konabot
|
||||
COPY tests ./tests
|
||||
|
||||
ENV PYTHONPATH=/app
|
||||
|
||||
|
||||
5
bot.py
5
bot.py
@ -7,6 +7,7 @@ from nonebot.adapters.discord import Adapter as DiscordAdapter
|
||||
from nonebot.adapters.minecraft import Adapter as MinecraftAdapter
|
||||
from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter
|
||||
|
||||
from konabot.common.appcontext import run_afterinit_functions
|
||||
from konabot.common.log import init_logger
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
from konabot.common.path import LOG_PATH
|
||||
@ -56,9 +57,7 @@ def main():
|
||||
nonebot.load_plugins("konabot/plugins")
|
||||
nonebot.load_plugin("nonebot_plugin_analysis_bilibili")
|
||||
|
||||
from konabot.common import permsys
|
||||
|
||||
permsys.create_startup()
|
||||
run_afterinit_functions()
|
||||
|
||||
# 注册关闭钩子
|
||||
@driver.on_shutdown
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
- `konabot/common/permsys/__init__.py`
|
||||
- 暴露 `PermManager`、`DepPermManager`、`require_permission`
|
||||
- 负责数据库初始化、启动迁移、超级管理员默认授权
|
||||
- 提供 `register_default_allow_permission()` 用于注册“启动时默认放行”的权限键
|
||||
- `konabot/common/permsys/entity.py`
|
||||
- 定义 `PermEntity`
|
||||
- 将事件转换为可查询的实体链
|
||||
@ -134,6 +135,14 @@ PermEntity("ob11", "user", str(account)), "*", True
|
||||
|
||||
也就是说,配置中的超级管理员会直接拥有全部权限。
|
||||
|
||||
此外,模块也支持插件在导入阶段通过 `register_default_allow_permission("some.key")` 注册默认放行的权限键;这些键会在启动时被写入到:
|
||||
|
||||
```python
|
||||
PermEntity("sys", "global", "global"), "some.key", True
|
||||
```
|
||||
|
||||
这适合“默认所有人可用,但仍希望后续能被权限系统单独关闭”的功能。
|
||||
|
||||
这属于启动时自动灌入的保底策略,不依赖手工授权命令。
|
||||
|
||||
## 在插件中使用
|
||||
|
||||
37
docs/subscribe.md
Normal file
37
docs/subscribe.md
Normal file
@ -0,0 +1,37 @@
|
||||
# subscribe 模块
|
||||
|
||||
一套统一的接口,让用户可以订阅一些延迟或者定时消息。
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from konabot.common.subscribe import register_poster_info, broadcast, PosterInfo
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
|
||||
# 注册了服务信息,用户可以用「查询可用订阅」指令了解可用的订阅清单。
|
||||
# 用户可以使用「订阅 某某服务通知」或者「订阅 某某服务」来订阅消息。
|
||||
# 如果用户在群聊发起订阅,则会在 QQ 群订阅,不然会在私聊订阅
|
||||
register_poster_info("某某服务通知", PosterInfo(
|
||||
aliases={"某某服务"},
|
||||
description="告诉你关于某某的最新资讯等信息",
|
||||
))
|
||||
|
||||
async def main():
|
||||
while True:
|
||||
# 这里的服务 channel 名字必须填写该服务的名字,不可以是 alias
|
||||
# 这会给所有订阅了该通道的用户发送「向大家发送纯文本通知」
|
||||
await broadcast("某某服务通知", "向大家发送纯文本通知")
|
||||
|
||||
# 也可以发送 UniMessage 对象,可以构造包含图片的通知等
|
||||
data = Path('image.png').read_bytes()
|
||||
await broadcast(
|
||||
"某某服务通知",
|
||||
UniMessage.text("很遗憾告诉大家,我们倒闭了:").image(raw=data),
|
||||
)
|
||||
|
||||
await asyncio.sleep(114.514)
|
||||
```
|
||||
|
||||
该模块的代码请查阅 `/konabot/common/subscribe/` 下的文件。
|
||||
281
konabot/common/apis/wolfx.py
Normal file
281
konabot/common/apis/wolfx.py
Normal file
@ -0,0 +1,281 @@
|
||||
"""
|
||||
Wolfx 防灾免费 API
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from typing import Literal, TypeVar, cast
|
||||
import aiohttp
|
||||
from aiosignal import Signal
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, RootModel
|
||||
import pydantic
|
||||
|
||||
from konabot.common.appcontext import after_init
|
||||
|
||||
|
||||
class ScEewReport(BaseModel):
|
||||
"""
|
||||
四川地震局报文
|
||||
"""
|
||||
|
||||
ID: str
|
||||
"EEW 发报 ID"
|
||||
|
||||
EventID: str
|
||||
"EEW 发报事件 ID"
|
||||
|
||||
ReportTime: str
|
||||
"EEW 发报时间(UTC+8)"
|
||||
|
||||
ReportNum: int
|
||||
"EEW 发报数"
|
||||
|
||||
OriginTime: str
|
||||
"发震时间(UTC+8)"
|
||||
|
||||
HypoCenter: str
|
||||
"震源地"
|
||||
|
||||
Latitude: float
|
||||
"震源地纬度"
|
||||
|
||||
Longitude: float
|
||||
"震源地经度"
|
||||
|
||||
Magnitude: float
|
||||
"震级"
|
||||
|
||||
Depth: float | None
|
||||
"震源深度"
|
||||
|
||||
MaxIntensity: float
|
||||
"最大烈度"
|
||||
|
||||
|
||||
class CencEewReport(BaseModel):
|
||||
"""
|
||||
中国地震台网报文
|
||||
"""
|
||||
|
||||
ID: str
|
||||
"EEW 发报 ID"
|
||||
|
||||
EventID: str
|
||||
"EEW 发报事件 ID"
|
||||
|
||||
ReportTime: str
|
||||
"EEW 发报时间(UTC+8)"
|
||||
|
||||
ReportNum: int
|
||||
"EEW 发报数"
|
||||
|
||||
OriginTime: str
|
||||
"发震时间(UTC+8)"
|
||||
|
||||
HypoCenter: str
|
||||
"震源地"
|
||||
|
||||
Latitude: float
|
||||
"震源地纬度"
|
||||
|
||||
Longitude: float
|
||||
"震源地经度"
|
||||
|
||||
Magnitude: float
|
||||
"震级"
|
||||
|
||||
Depth: float | None
|
||||
"震源深度"
|
||||
|
||||
MaxIntensity: float
|
||||
"最大烈度"
|
||||
|
||||
|
||||
class CencEqReport(BaseModel):
|
||||
type: str
|
||||
"报告类型"
|
||||
|
||||
EventID: str
|
||||
"事件 ID"
|
||||
|
||||
time: str
|
||||
"UTC+8 格式的地震发生时间"
|
||||
|
||||
location: str
|
||||
"地震发生位置"
|
||||
|
||||
magnitude: str
|
||||
"震级"
|
||||
|
||||
depth: str
|
||||
"地震深度"
|
||||
|
||||
latitude: str
|
||||
"纬度"
|
||||
|
||||
longtitude: str
|
||||
"经度"
|
||||
|
||||
intensity: str
|
||||
"烈度"
|
||||
|
||||
|
||||
class CencEqlist(RootModel):
|
||||
root: dict[str, CencEqReport]
|
||||
|
||||
|
||||
class WolfxWebSocket:
|
||||
def __init__(self, url: str) -> None:
|
||||
self.url = url
|
||||
self.signal: Signal[bytes] = Signal(self)
|
||||
self._running = False
|
||||
self._task: asyncio.Task | None = None
|
||||
self._session: aiohttp.ClientSession | None = None
|
||||
self._ws: aiohttp.ClientWebSocketResponse | None = None
|
||||
|
||||
@property
|
||||
def session(self) -> aiohttp.ClientSession: # pragma: no cover
|
||||
assert self._session is not None
|
||||
return self._session
|
||||
|
||||
async def start(self): # pragma: no cover
|
||||
if self._running:
|
||||
return
|
||||
self._running = True
|
||||
self._session = aiohttp.ClientSession()
|
||||
self._task = asyncio.create_task(self._run())
|
||||
self.signal.freeze()
|
||||
|
||||
async def stop(self): # pragma: no cover
|
||||
self._running = False
|
||||
if self._task:
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
if self._session:
|
||||
await self._session.close()
|
||||
|
||||
async def _run(self): # pragma: no cover
|
||||
retry_delay = 1
|
||||
|
||||
while self._running:
|
||||
try:
|
||||
async with self.session.ws_connect(self.url) as ws:
|
||||
self._ws = ws
|
||||
logger.info(f"Wolfx API 服务连接上了 {self.url} 的 WebSocket")
|
||||
async for msg in ws:
|
||||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||
await self.handle(cast(str, msg.data).encode())
|
||||
elif msg.type == aiohttp.WSMsgType.BINARY:
|
||||
await self.handle(cast(bytes, msg.data))
|
||||
elif msg.type == aiohttp.WSMsgType.CLOSED:
|
||||
break
|
||||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||||
break
|
||||
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
|
||||
logger.warning("连接 WebSocket 时发生错误")
|
||||
logger.exception(e)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error("Wolfx API 发生未知错误")
|
||||
logger.exception(e)
|
||||
self._ws = None
|
||||
|
||||
if self._running:
|
||||
logger.info(f"Wolfx API 准备断线重连 {self.url}")
|
||||
await asyncio.sleep(retry_delay)
|
||||
retry_delay = min(retry_delay * 2, 60)
|
||||
|
||||
async def handle(self, data: bytes):
|
||||
try:
|
||||
obj = json.loads(data)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.warning("解析 Wolfs API 时出错")
|
||||
logger.exception(e)
|
||||
return
|
||||
|
||||
if obj.get("type") == "heartbeat" or obj.get("type") == "pong":
|
||||
logger.debug(f"Wolfx API 收到了来自 {self.url} 的心跳: {obj}")
|
||||
else:
|
||||
await self.signal.send(data)
|
||||
|
||||
|
||||
T = TypeVar("T", bound=BaseModel)
|
||||
|
||||
|
||||
class WolfxAPIService:
|
||||
sc_eew: Signal[ScEewReport]
|
||||
"四川地震局地震速报"
|
||||
|
||||
cenc_eew: Signal[CencEewReport]
|
||||
"中国地震台网地震速报"
|
||||
|
||||
cenc_eqlist: Signal[CencEqReport]
|
||||
"中国地震台网地震信息发布"
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.sc_eew = Signal(self)
|
||||
self._sc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/sc_eew")
|
||||
WolfxAPIService.bind(self.sc_eew, self._sc_eew_ws, ScEewReport)
|
||||
|
||||
self.cenc_eew = Signal(self)
|
||||
self._cenc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/cenc_eew")
|
||||
WolfxAPIService.bind(self.cenc_eew, self._cenc_eew_ws, CencEewReport)
|
||||
|
||||
self.cenc_eqlist = Signal(self)
|
||||
self._cenc_eqlist_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/cenc_eqlist")
|
||||
WolfxAPIService.bind(self.cenc_eqlist, self._cenc_eqlist_ws, CencEqReport)
|
||||
|
||||
@staticmethod
|
||||
def bind(signal: Signal[T], ws: WolfxWebSocket, t: type[T]):
|
||||
@ws.signal.append
|
||||
async def _(data: bytes):
|
||||
try:
|
||||
obj = t.model_validate_json(data)
|
||||
logger.info(f"接收到来自 Wolfx API 的信息:{data}")
|
||||
await signal.send(obj)
|
||||
except pydantic.ValidationError as e:
|
||||
logger.warning(f"解析 Wolfx API 时出错 URL={ws.url}")
|
||||
logger.error(e)
|
||||
|
||||
async def start(self): # pragma: no cover
|
||||
self.cenc_eew.freeze()
|
||||
self.sc_eew.freeze()
|
||||
self.cenc_eqlist.freeze()
|
||||
async with asyncio.TaskGroup() as task_group:
|
||||
if len(self.cenc_eew) > 0:
|
||||
task_group.create_task(self._cenc_eew_ws.start())
|
||||
|
||||
if len(self.sc_eew) > 0:
|
||||
task_group.create_task(self._sc_eew_ws.start())
|
||||
|
||||
if len(self.cenc_eqlist) > 0:
|
||||
task_group.create_task(self._cenc_eqlist_ws.start())
|
||||
|
||||
async def stop(self): # pragma: no cover
|
||||
async with asyncio.TaskGroup() as task_group:
|
||||
task_group.create_task(self._cenc_eew_ws.stop())
|
||||
task_group.create_task(self._sc_eew_ws.stop())
|
||||
task_group.create_task(self._cenc_eqlist_ws.stop())
|
||||
|
||||
|
||||
wolfx_api = WolfxAPIService()
|
||||
|
||||
|
||||
@after_init
|
||||
def init(): # pragma: no cover
|
||||
import nonebot
|
||||
|
||||
driver = nonebot.get_driver()
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
await wolfx_api.start()
|
||||
|
||||
@driver.on_shutdown
|
||||
async def _():
|
||||
await wolfx_api.stop()
|
||||
15
konabot/common/appcontext.py
Normal file
15
konabot/common/appcontext.py
Normal file
@ -0,0 +1,15 @@
|
||||
from typing import Any, Callable
|
||||
|
||||
|
||||
AFTER_INIT_FUNCTION = Callable[[], Any]
|
||||
|
||||
_after_init_functions: list[AFTER_INIT_FUNCTION] = []
|
||||
|
||||
|
||||
def after_init(func: AFTER_INIT_FUNCTION):
|
||||
_after_init_functions.append(func)
|
||||
|
||||
|
||||
def run_afterinit_functions(): # pragma: no cover
|
||||
for f in _after_init_functions:
|
||||
f()
|
||||
@ -1,9 +1,10 @@
|
||||
import asyncio
|
||||
from typing import Any, Awaitable, Callable
|
||||
import aiohttp
|
||||
import hashlib
|
||||
import platform
|
||||
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
import nonebot
|
||||
@ -14,6 +15,8 @@ from pydantic import BaseModel
|
||||
|
||||
@dataclass
|
||||
class ArtifactDepends:
|
||||
_Callback = Callable[[bool], Awaitable[Any]]
|
||||
|
||||
url: str
|
||||
sha256: str
|
||||
target: Path
|
||||
@ -27,6 +30,9 @@ class ArtifactDepends:
|
||||
use_proxy: bool = True
|
||||
"网络问题,赫赫;使用的是 Discord 模块配置的 proxy"
|
||||
|
||||
callbacks: list[_Callback] = field(default_factory=list)
|
||||
"在任务完成以后,应该做的事情"
|
||||
|
||||
def is_corresponding_platform(self) -> bool:
|
||||
if self.required_os is not None:
|
||||
if self.required_os.lower() != platform.system().lower():
|
||||
@ -36,26 +42,43 @@ class ArtifactDepends:
|
||||
return False
|
||||
return True
|
||||
|
||||
def on_finished(self, task: _Callback) -> _Callback:
|
||||
self.callbacks.append(task)
|
||||
return task
|
||||
|
||||
async def _finished(self, downloaded: bool) -> list[Any | BaseException]:
|
||||
tasks = set()
|
||||
for f in self.callbacks:
|
||||
tasks.add(f(downloaded))
|
||||
return await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
prefetch_artifact: bool = False
|
||||
"是否提前下载好二进制依赖"
|
||||
|
||||
|
||||
artifact_list = []
|
||||
artifact_list: list[ArtifactDepends] = []
|
||||
|
||||
|
||||
driver = nonebot.get_driver()
|
||||
config = nonebot.get_plugin_config(Config)
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
if config.prefetch_artifact:
|
||||
logger.info("启动检测中:正在检测需求的二进制是否下载")
|
||||
semaphore = asyncio.Semaphore(10)
|
||||
|
||||
async def _task(artifact: ArtifactDepends):
|
||||
async with semaphore:
|
||||
await ensure_artifact(artifact)
|
||||
downloaded = await ensure_artifact(artifact)
|
||||
result = await artifact._finished(downloaded)
|
||||
for r in result:
|
||||
if isinstance(r, BaseException):
|
||||
logger.warning("完成了二进制文件的下载,但是有未捕捉的错误")
|
||||
logger.exception(r)
|
||||
|
||||
tasks: set[asyncio.Task] = set()
|
||||
for a in artifact_list:
|
||||
@ -78,35 +101,43 @@ async def download_artifact(artifact: ArtifactDepends):
|
||||
async with aiohttp.ClientSession(proxy=proxy) as client:
|
||||
result = await client.get(artifact.url)
|
||||
if result.status != 200:
|
||||
logger.warning(f"已经下载了二进制,但是注意服务器没有返回 200! URL={artifact.url} TARGET={artifact.target} CODE={result.status}")
|
||||
logger.warning(
|
||||
f"已经下载了二进制,但是注意服务器没有返回 200! URL={artifact.url} TARGET={artifact.target} CODE={result.status}"
|
||||
)
|
||||
data = await result.read()
|
||||
artifact.target.write_bytes(data)
|
||||
if not platform.system().lower() == 'windows':
|
||||
if not platform.system().lower() == "windows":
|
||||
artifact.target.chmod(0o755)
|
||||
|
||||
logger.info(f"下载好了 TARGET={artifact.target} URL={artifact.url}")
|
||||
m = hashlib.sha256(artifact.target.read_bytes())
|
||||
if m.hexdigest().lower() != artifact.sha256.lower():
|
||||
logger.warning(f"下载到的二进制的 sha256 与需求不同 TARGET={artifact.target} REQUESTED={artifact.sha256} ACTUAL={m.hexdigest()}")
|
||||
logger.warning(
|
||||
f"下载到的二进制的 sha256 与需求不同 TARGET={artifact.target} REQUESTED={artifact.sha256} ACTUAL={m.hexdigest()}"
|
||||
)
|
||||
|
||||
|
||||
async def ensure_artifact(artifact: ArtifactDepends):
|
||||
async def ensure_artifact(artifact: ArtifactDepends) -> bool:
|
||||
if not artifact.is_corresponding_platform():
|
||||
return
|
||||
return False
|
||||
|
||||
if not artifact.target.exists():
|
||||
logger.info(f"二进制依赖 {artifact.target} 不存在")
|
||||
if not artifact.target.parent.exists():
|
||||
artifact.target.parent.mkdir(parents=True, exist_ok=True)
|
||||
await download_artifact(artifact)
|
||||
return True
|
||||
else:
|
||||
m = hashlib.sha256(artifact.target.read_bytes())
|
||||
if m.hexdigest().lower() != artifact.sha256.lower():
|
||||
logger.info(f"二进制依赖 {artifact.target} 的哈希无法对应需求的哈希,准备重新下载")
|
||||
logger.info(
|
||||
f"二进制依赖 {artifact.target} 的哈希无法对应需求的哈希,准备重新下载"
|
||||
)
|
||||
artifact.target.unlink()
|
||||
await download_artifact(artifact)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def register_artifacts(*artifacts: ArtifactDepends):
|
||||
artifact_list.extend(artifacts)
|
||||
|
||||
|
||||
@ -4,6 +4,7 @@ from nonebot.adapters import Event
|
||||
from nonebot.params import Depends
|
||||
from nonebot.rule import Rule
|
||||
|
||||
from konabot.common.appcontext import after_init
|
||||
from konabot.common.database import DatabaseManager
|
||||
from konabot.common.pager import PagerQuery
|
||||
from konabot.common.path import DATA_PATH
|
||||
@ -13,6 +14,7 @@ from konabot.common.permsys.repo import PermRepo
|
||||
|
||||
|
||||
db = DatabaseManager(DATA_PATH / "perm.sqlite3")
|
||||
_default_allow_permissions: set[str] = set()
|
||||
|
||||
|
||||
_EntityLike = Event | PermEntity | list[PermEntity]
|
||||
@ -73,6 +75,7 @@ def perm_manager(_db: DatabaseManager | None = None) -> PermManager: # pragma:
|
||||
return PermManager(_db)
|
||||
|
||||
|
||||
@after_init
|
||||
def create_startup(): # pragma: no cover
|
||||
from konabot.common.nb.is_admin import cfg
|
||||
|
||||
@ -89,6 +92,10 @@ def create_startup(): # pragma: no cover
|
||||
await pm.update_permission(
|
||||
PermEntity("ob11", "user", str(account)), "*", True
|
||||
)
|
||||
for key in _default_allow_permissions:
|
||||
await pm.update_permission(
|
||||
PermEntity("sys", "global", "global"), key, True
|
||||
)
|
||||
|
||||
@driver.on_shutdown
|
||||
async def _():
|
||||
@ -101,6 +108,10 @@ def create_startup(): # pragma: no cover
|
||||
DepPermManager = Annotated[PermManager, Depends(perm_manager)]
|
||||
|
||||
|
||||
def register_default_allow_permission(key: str):
|
||||
_default_allow_permissions.add(key)
|
||||
|
||||
|
||||
def require_permission(perm: str) -> Rule: # pragma: no cover
|
||||
async def check_permission(event: Event, pm: DepPermManager) -> bool:
|
||||
return await pm.check_has_permission(event, perm)
|
||||
|
||||
11
konabot/common/subscribe/__init__.py
Normal file
11
konabot/common/subscribe/__init__.py
Normal file
@ -0,0 +1,11 @@
|
||||
"""
|
||||
Subscribe 模块,用于向一些订阅的频道广播消息
|
||||
"""
|
||||
|
||||
from .service import broadcast as broadcast
|
||||
from .service import dep_poster_service as dep_poster_service
|
||||
from .service import DepPosterService as DepPosterService
|
||||
from .service import PosterService as PosterService
|
||||
from .subscribe_info import PosterInfo as PosterInfo
|
||||
from .subscribe_info import POSTER_INFO_DATA as POSTER_INFO_DATA
|
||||
from .subscribe_info import register_poster_info as register_poster_info
|
||||
@ -6,7 +6,8 @@ from pydantic import BaseModel, ValidationError
|
||||
from konabot.common.longtask import LongTaskTarget
|
||||
from konabot.common.pager import PagerQuery, PagerResult
|
||||
from konabot.common.path import DATA_PATH
|
||||
from konabot.plugins.poster.repository import IPosterRepo
|
||||
|
||||
from .repository import IPosterRepo
|
||||
|
||||
|
||||
class ChannelData(BaseModel):
|
||||
@ -18,9 +19,9 @@ class PosterData(BaseModel):
|
||||
|
||||
|
||||
def is_the_same_target(target1: LongTaskTarget, target2: LongTaskTarget) -> bool:
|
||||
if (target1.is_private_chat and not target2.is_private_chat):
|
||||
if target1.is_private_chat and not target2.is_private_chat:
|
||||
return False
|
||||
if (target2.is_private_chat and not target1.is_private_chat):
|
||||
if target2.is_private_chat and not target1.is_private_chat:
|
||||
return False
|
||||
if target1.platform != target2.platform:
|
||||
return False
|
||||
@ -58,7 +59,9 @@ class LocalPosterRepo(IPosterRepo):
|
||||
len1 = len(self.data.channels[channel].targets)
|
||||
return len0 != len1
|
||||
|
||||
async def get_subscribed_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
|
||||
async def get_subscribed_channels(
|
||||
self, target: LongTaskTarget, pager: PagerQuery
|
||||
) -> PagerResult[str]:
|
||||
channels: list[str] = []
|
||||
for channel_id, channel in self.data.channels.items():
|
||||
for t in channel.targets:
|
||||
@ -95,7 +98,9 @@ async def local_poster_data():
|
||||
data = PosterData()
|
||||
else:
|
||||
try:
|
||||
data = PosterData.model_validate_json(LOCAL_POSTER_DATA_PATH.read_text())
|
||||
data = PosterData.model_validate_json(
|
||||
LOCAL_POSTER_DATA_PATH.read_text()
|
||||
)
|
||||
except ValidationError:
|
||||
data = PosterData()
|
||||
yield data
|
||||
@ -109,4 +114,3 @@ async def local_poster():
|
||||
|
||||
|
||||
DepLocalPosterRepo = Annotated[LocalPosterRepo, Depends(local_poster)]
|
||||
|
||||
@ -4,9 +4,10 @@ from nonebot.params import Depends
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from konabot.common.longtask import LongTaskTarget
|
||||
from konabot.common.pager import PagerQuery, PagerResult
|
||||
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
|
||||
from konabot.plugins.poster.repo_local_data import local_poster
|
||||
from konabot.plugins.poster.repository import IPosterRepo
|
||||
|
||||
from .subscribe_info import POSTER_INFO_DATA
|
||||
from .repo_local_data import local_poster
|
||||
from .repository import IPosterRepo
|
||||
|
||||
|
||||
class PosterService:
|
||||
@ -27,7 +28,9 @@ class PosterService:
|
||||
channel = self.parse_channel_id(channel)
|
||||
return await self.repo.remove_channel_target(channel, target)
|
||||
|
||||
async def broadcast(self, channel: str, message: UniMessage[Any] | str) -> list[LongTaskTarget]:
|
||||
async def broadcast(
|
||||
self, channel: str, message: UniMessage[Any] | str
|
||||
) -> list[LongTaskTarget]:
|
||||
channel = self.parse_channel_id(channel)
|
||||
targets = await self.repo.get_channel_targets(channel)
|
||||
for target in targets:
|
||||
@ -35,7 +38,9 @@ class PosterService:
|
||||
await target.send_message(message, at=False)
|
||||
return targets
|
||||
|
||||
async def get_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
|
||||
async def get_channels(
|
||||
self, target: LongTaskTarget, pager: PagerQuery
|
||||
) -> PagerResult[str]:
|
||||
return await self.repo.get_subscribed_channels(target, pager)
|
||||
|
||||
async def fix_data(self):
|
||||
@ -56,4 +61,3 @@ async def broadcast(channel: str, message: UniMessage[Any] | str):
|
||||
|
||||
|
||||
DepPosterService = Annotated[PosterService, Depends(dep_poster_service)]
|
||||
|
||||
@ -4,7 +4,7 @@ from dataclasses import dataclass, field
|
||||
@dataclass
|
||||
class PosterInfo:
|
||||
aliases: set[str] = field(default_factory=set)
|
||||
description: str = field(default='')
|
||||
description: str = field(default="")
|
||||
|
||||
|
||||
POSTER_INFO_DATA: dict[str, PosterInfo] = {}
|
||||
@ -12,4 +12,3 @@ POSTER_INFO_DATA: dict[str, PosterInfo] = {}
|
||||
|
||||
def register_poster_info(channel: str, info: PosterInfo):
|
||||
POSTER_INFO_DATA[channel] = info
|
||||
|
||||
@ -76,6 +76,8 @@ fx [滤镜名称] <参数1> <参数2> ...
|
||||
* ```fx 设置遮罩```
|
||||
* ```fx 色键 <目标颜色="rgb(255,0,0)"> <容差=60>```
|
||||
* ```fx 晃动 <最大偏移量=5> <运动模糊=False>```
|
||||
* ```fx JPEG损坏 <质量=10>```
|
||||
* 质量范围建议为 1~95,数值越低,压缩痕迹越重、效果越搞笑。
|
||||
* ```fx 动图 <帧率=10>```
|
||||
|
||||
### 多图像处理器
|
||||
|
||||
53
konabot/docs/user/roll.txt
Normal file
53
konabot/docs/user/roll.txt
Normal file
@ -0,0 +1,53 @@
|
||||
**roll** - 面向跑团的文本骰子指令
|
||||
|
||||
## 用法
|
||||
|
||||
`roll 表达式`
|
||||
|
||||
支持常见骰子写法:
|
||||
|
||||
- `roll 3d6`
|
||||
- `roll d20+5`
|
||||
- `roll 2d8+1d4+3`
|
||||
- `roll d%`
|
||||
- `roll 4dF`
|
||||
|
||||
## 说明
|
||||
|
||||
- `NdM` 表示掷 N 个 M 面骰,例如 `3d6`
|
||||
- `d20` 等价于 `1d20`
|
||||
- `d%` 表示百分骰,范围 1 到 100
|
||||
- `dF` 表示 Fate/Fudge 骰,单骰结果为 -1、0、+1
|
||||
- 支持用 `+`、`-` 连接多个项,也支持常数修正
|
||||
|
||||
## 返回格式
|
||||
|
||||
会返回总结果,以及每一项的明细。
|
||||
|
||||
例如:
|
||||
|
||||
- `roll 3d6`
|
||||
可能返回:
|
||||
- `3d6 = 11`
|
||||
- `+3d6=[2, 4, 5]`
|
||||
|
||||
- `roll d20+5`
|
||||
可能返回:
|
||||
- `d20+5 = 19`
|
||||
- `+1d20=[14] +5=5`
|
||||
|
||||
## 限制
|
||||
|
||||
为防止刷屏和滥用,当前实现会限制:
|
||||
|
||||
- 单项最多 100 个骰子
|
||||
- 单个骰子最多 1000 面
|
||||
- 一次表达式最多 20 项
|
||||
- 一次表达式最多实际掷 200 个骰子
|
||||
- 结果过长时会直接拒绝
|
||||
|
||||
## 权限
|
||||
|
||||
需要 `trpg.roll` 权限。
|
||||
|
||||
默认启动时会给系统全局授予允许,因此通常所有人都能用;如有需要可再用权限系统单独关闭。
|
||||
@ -31,7 +31,16 @@
|
||||
|
||||
- 用 `|` 连接多个操作,前一个的输出自动作为后一个的输入。
|
||||
- 用 `;` 分隔多条独立指令,它们各自产生输出,最终合并显示。
|
||||
- 用 `>` 或 `>>` 把结果保存起来(见下文),被重定向的指令不会产生输出。
|
||||
- 用 `&&` / `||` 做最小 shell 风格条件执行:
|
||||
- `cmd1 && cmd2`:仅当 `cmd1` 成功时执行 `cmd2`
|
||||
- `cmd1 || cmd2`:仅当 `cmd1` 失败时执行 `cmd2`
|
||||
- 用 `!` 对一条 pipeline 的成功/失败取反。
|
||||
- 支持最小 bash-like `if ... then ... else ... fi` 语句。
|
||||
- 支持最小 bash-like `while ... do ... done` 循环。
|
||||
- 可使用内建真假命令:`true` / `false`。
|
||||
- 为避免滥用与卡死:
|
||||
- 同一用户同时只能运行 **一个** textfx 脚本
|
||||
- 单个脚本最长执行时间为 **60 秒**
|
||||
|
||||
**例子**:把"HELLO"先反转,再转成摩斯电码:(转换为摩斯电码功能暂未实现)
|
||||
```
|
||||
@ -39,6 +48,36 @@ textfx reverse HELLO | morse en
|
||||
```
|
||||
→ 输出:`--- .-.. .-.. . ....`
|
||||
|
||||
**例子**:失败后兜底执行:
|
||||
```
|
||||
textfx test a = b || echo 不相等
|
||||
```
|
||||
→ 输出:`不相等`
|
||||
|
||||
**例子**:成功后继续执行:
|
||||
```
|
||||
textfx [ 2 -gt 1 ] && echo 条件成立
|
||||
```
|
||||
→ 输出:`条件成立`
|
||||
|
||||
**例子**:真正的 if 语句:
|
||||
```
|
||||
textfx if test a = b; then echo yes; else echo no; fi
|
||||
```
|
||||
→ 输出:`no`
|
||||
|
||||
**例子**:对条件取反:
|
||||
```
|
||||
textfx ! test a = b && echo 条件不成立
|
||||
```
|
||||
→ 输出:`条件不成立`
|
||||
|
||||
**例子**:while 循环:
|
||||
```
|
||||
textfx while false; do echo 不会执行; done
|
||||
```
|
||||
→ 输出为空
|
||||
|
||||
**例子**:多条指令各自输出:
|
||||
```
|
||||
textfx echo 你好; echo 世界
|
||||
@ -132,6 +171,51 @@ Base64 编码或解码。
|
||||
|
||||
> 缓存仅在当前对话中有效,重启后清空。
|
||||
|
||||
### true / false / test / [
|
||||
最小 shell 风格条件命令。通常配合 `if`、`&&`、`||`、`!` 使用。
|
||||
|
||||
支持:
|
||||
- `true`:总是成功
|
||||
- `false`:总是失败
|
||||
- 字符串非空:`test foo`
|
||||
- `-n` / `-z`:`test -n foo`、`test -z ""`
|
||||
- 字符串比较:`test a = a`、`test a != b`
|
||||
- 整数比较:`test 2 -gt 1`、`test 3 -le 5`
|
||||
- 方括号别名:`[ 2 -gt 1 ]`
|
||||
|
||||
示例:
|
||||
- `/textfx true && echo 一定执行`
|
||||
- `/textfx false || echo 兜底执行`
|
||||
- `/textfx test hello && echo 有内容`
|
||||
- `/textfx test a = b || echo 不相等`
|
||||
- `/textfx [ 3 -ge 2 ] && echo yes`
|
||||
|
||||
### if / then / else / fi
|
||||
支持最小 bash-like 条件语句。
|
||||
|
||||
示例:
|
||||
- `/textfx if test a = a; then echo yes; else echo no; fi`
|
||||
- `/textfx if [ 2 -gt 1 ]; then echo 成立; fi`
|
||||
- `/textfx if test a = a; then if test b = c; then echo x; else echo y; fi; fi`
|
||||
|
||||
说明:
|
||||
- `if` 后面跟一个条件链,可配合 `test`、`[`、`!`、`&&`、`||`
|
||||
- `then` 和 `else` 后面都可以写多条以 `;` 分隔的 textfx 语句
|
||||
- `else` 可省略
|
||||
|
||||
### while / do / done
|
||||
支持最小 bash-like 循环语句。
|
||||
|
||||
示例:
|
||||
- `/textfx while false; do echo 不会执行; done`
|
||||
- `/textfx while ! false; do false; done`
|
||||
- `/textfx while ! false; do if true; then false; fi; done`
|
||||
|
||||
说明:
|
||||
- `while` 后面跟一个条件链,返回成功就继续循环
|
||||
- `do` 后面可写多条以 `;` 分隔的 textfx 语句
|
||||
- 为避免 bot 死循环,内置最大循环次数限制;超限会报错
|
||||
|
||||
### replace(或 替换、sed)
|
||||
替换文字(支持正则表达式)。
|
||||
示例(普通):`/textfx replace 世界 宇宙 你好世界` → `你好宇宙`
|
||||
|
||||
@ -1,12 +1,14 @@
|
||||
import re
|
||||
|
||||
from nonebot import get_plugin_config, on_message
|
||||
from nonebot.rule import Rule
|
||||
from nonebot_plugin_alconna import Reference, Reply, UniMsg
|
||||
|
||||
from nonebot.adapters import Event
|
||||
from nonebot.adapters.onebot.v11.event import GroupMessageEvent as OB11GroupEvent
|
||||
from pydantic import BaseModel
|
||||
|
||||
from konabot.common.permsys import require_permission
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
bilifetch_enabled_groups: list[int] = []
|
||||
@ -19,11 +21,7 @@ pattern = (
|
||||
)
|
||||
|
||||
|
||||
def _rule(msg: UniMsg, evt: Event) -> bool:
|
||||
if isinstance(evt, OB11GroupEvent):
|
||||
if evt.group_id not in config.bilifetch_enabled_groups:
|
||||
return False
|
||||
|
||||
def _rule(msg: UniMsg) -> bool:
|
||||
to_search = msg.exclude(Reply, Reference).dump(json=True)
|
||||
to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
|
||||
if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
|
||||
@ -31,11 +29,11 @@ def _rule(msg: UniMsg, evt: Event) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
matcher_fix = on_message(rule=_rule)
|
||||
matcher_fix = on_message(rule=Rule(_rule) & require_permission("bilifetch"))
|
||||
|
||||
|
||||
@matcher_fix.handle()
|
||||
async def _(event: Event):
|
||||
from nonebot_plugin_analysis_bilibili import handle_analysis
|
||||
|
||||
await handle_analysis(event)
|
||||
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import random
|
||||
from io import BytesIO
|
||||
from PIL import Image, ImageFilter, ImageDraw, ImageStat, ImageFont
|
||||
from PIL import ImageEnhance
|
||||
from PIL import ImageChops
|
||||
@ -167,26 +168,53 @@ class ImageFilterImplement:
|
||||
|
||||
return Image.fromarray(result, 'RGBA')
|
||||
|
||||
# JPEG 损坏感压缩
|
||||
@staticmethod
|
||||
def apply_jpeg_damage(image: Image.Image, quality: int = 10) -> Image.Image:
|
||||
quality = max(1, min(95, int(quality)))
|
||||
|
||||
alpha = None
|
||||
if image.mode in ('RGBA', 'LA') or (image.mode == 'P' and 'transparency' in image.info):
|
||||
rgba_image = image.convert('RGBA')
|
||||
alpha = rgba_image.getchannel('A')
|
||||
rgb_image = Image.new('RGB', rgba_image.size, (255, 255, 255))
|
||||
rgb_image.paste(rgba_image, mask=alpha)
|
||||
else:
|
||||
rgb_image = image.convert('RGB')
|
||||
|
||||
output = BytesIO()
|
||||
rgb_image.save(output, format='JPEG', quality=quality, optimize=False)
|
||||
output.seek(0)
|
||||
damaged = Image.open(output).convert('RGB')
|
||||
|
||||
if alpha is not None:
|
||||
return Image.merge('RGBA', (*damaged.split(), alpha))
|
||||
return damaged.convert('RGBA')
|
||||
|
||||
# 缩放
|
||||
@staticmethod
|
||||
def apply_resize(image: Image.Image, scale: float = 1.5, scale_y = None) -> Image.Image:
|
||||
# scale 可以为负
|
||||
# 如果 scale 为负,则代表翻转
|
||||
if scale_y is not None:
|
||||
if float(scale_y) < 0:
|
||||
def apply_resize(image: Image.Image, scale: float = 1.5, scale_y: float = None) -> Image.Image:
|
||||
scale_x = float(scale)
|
||||
scale_y_value = float(scale_y) if scale_y is not None else None
|
||||
|
||||
if scale_y_value is not None:
|
||||
if scale_y_value < 0:
|
||||
image = ImageOps.flip(image)
|
||||
scale_y = abs(float(scale_y))
|
||||
if scale < 0:
|
||||
scale_y_value = abs(scale_y_value)
|
||||
if scale_x < 0:
|
||||
image = ImageOps.mirror(image)
|
||||
scale = abs(scale)
|
||||
new_size = (int(image.width * scale), int(image.height * float(scale_y)))
|
||||
return image.resize(new_size, Image.Resampling.LANCZOS)
|
||||
if scale < 0:
|
||||
image = ImageOps.mirror(image)
|
||||
image = ImageOps.flip(image)
|
||||
scale = abs(scale)
|
||||
new_size = (int(image.width * scale), int(image.height * scale))
|
||||
return image.resize(new_size, Image.Resampling.LANCZOS)
|
||||
scale_x = abs(scale_x)
|
||||
target_scale_y = scale_y_value
|
||||
else:
|
||||
if scale_x < 0:
|
||||
image = ImageOps.mirror(image)
|
||||
image = ImageOps.flip(image)
|
||||
scale_x = abs(scale_x)
|
||||
target_scale_y = scale_x
|
||||
|
||||
new_width = max(1, round(image.width * scale_x))
|
||||
new_height = max(1, round(image.height * target_scale_y))
|
||||
return image.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
||||
|
||||
# 波纹
|
||||
@staticmethod
|
||||
|
||||
@ -50,6 +50,7 @@ class ImageFilterManager:
|
||||
"描边": ImageFilterImplement.apply_stroke,
|
||||
"形状描边": ImageFilterImplement.apply_shape_stroke,
|
||||
"半调": ImageFilterImplement.apply_halftone,
|
||||
"JPEG损坏": ImageFilterImplement.apply_jpeg_damage,
|
||||
"设置通道": ImageFilterImplement.apply_set_channel,
|
||||
"设置遮罩": ImageFilterImplement.apply_set_mask,
|
||||
# 图像处理
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
from typing import cast
|
||||
import asyncio
|
||||
from loguru import logger
|
||||
from nonebot import on_command
|
||||
import nonebot
|
||||
@ -31,8 +32,11 @@ from konabot.plugins.handle_text.handlers.random_handlers import THShuffle, THSo
|
||||
from konabot.plugins.handle_text.handlers.unix_handlers import (
|
||||
THCat,
|
||||
THEcho,
|
||||
THFalse,
|
||||
THReplace,
|
||||
THRm,
|
||||
THTest,
|
||||
THTrue,
|
||||
)
|
||||
from konabot.plugins.handle_text.handlers.whitespace_handlers import (
|
||||
THLines,
|
||||
@ -43,11 +47,37 @@ from konabot.plugins.handle_text.handlers.whitespace_handlers import (
|
||||
)
|
||||
|
||||
|
||||
TEXTFX_MAX_RUNTIME_SECONDS = 60
|
||||
_textfx_running_users: set[str] = set()
|
||||
|
||||
|
||||
def _get_textfx_user_key(evt: Event) -> str:
|
||||
user_id = getattr(evt, "user_id", None)
|
||||
self_id = getattr(evt, "self_id", None)
|
||||
group_id = getattr(evt, "group_id", None)
|
||||
if user_id is not None:
|
||||
if group_id is not None:
|
||||
return f"{self_id}:{group_id}:{user_id}"
|
||||
return f"{self_id}:private:{user_id}"
|
||||
session_id = getattr(evt, "get_session_id", None)
|
||||
if callable(session_id):
|
||||
try:
|
||||
return f"session:{evt.get_session_id()}"
|
||||
except Exception:
|
||||
pass
|
||||
return f"event:{evt.__class__.__name__}:{id(evt)}"
|
||||
|
||||
|
||||
cmd = on_command(cmd="textfx", aliases={"处理文字", "处理文本"})
|
||||
|
||||
|
||||
@cmd.handle()
|
||||
async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
|
||||
user_key = _get_textfx_user_key(evt)
|
||||
if user_key in _textfx_running_users:
|
||||
await target.send_message("你当前已有一个 textfx 脚本正在运行,请等待它结束后再试。")
|
||||
return
|
||||
|
||||
istream = ""
|
||||
if isinstance(evt, OB11MessageEvent):
|
||||
if evt.reply is not None:
|
||||
@ -70,10 +100,23 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
|
||||
await target.send_message(res)
|
||||
return
|
||||
|
||||
env = TextHandlerEnvironment(is_trusted=False)
|
||||
results = await runner.run_pipeline(res, istream or None, env)
|
||||
env = TextHandlerEnvironment(is_trusted=False, event=evt)
|
||||
|
||||
_textfx_running_users.add(user_key)
|
||||
try:
|
||||
results = await asyncio.wait_for(
|
||||
runner.run_pipeline(res, istream or None, env),
|
||||
timeout=TEXTFX_MAX_RUNTIME_SECONDS,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
rendered = await render_error_message(
|
||||
f"处理指令时出现问题:脚本执行超时(超过 {TEXTFX_MAX_RUNTIME_SECONDS} 秒)"
|
||||
)
|
||||
await target.send_message(rendered)
|
||||
return
|
||||
finally:
|
||||
_textfx_running_users.discard(user_key)
|
||||
|
||||
# 检查是否有错误
|
||||
for r in results:
|
||||
if r.code != 0:
|
||||
message = f"处理指令时出现问题:{r.ostream}"
|
||||
@ -81,7 +124,6 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
|
||||
await target.send_message(rendered)
|
||||
return
|
||||
|
||||
# 收集所有组的文本输出和附件
|
||||
ostreams = [r.ostream for r in results if r.ostream is not None]
|
||||
attachments = [r.attachment for r in results if r.attachment is not None]
|
||||
|
||||
@ -108,6 +150,9 @@ async def _():
|
||||
THCat(),
|
||||
THEcho(),
|
||||
THRm(),
|
||||
THTrue(),
|
||||
THFalse(),
|
||||
THTest(),
|
||||
THShuffle(),
|
||||
THReplace(),
|
||||
THBase64(),
|
||||
|
||||
@ -1,17 +1,20 @@
|
||||
import asyncio
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from string import whitespace
|
||||
from typing import cast
|
||||
|
||||
from loguru import logger
|
||||
from nonebot.adapters import Event
|
||||
|
||||
|
||||
MAX_WHILE_ITERATIONS = 100
|
||||
|
||||
|
||||
@dataclass
|
||||
class TextHandlerEnvironment:
|
||||
is_trusted: bool
|
||||
event: Event | None = None
|
||||
buffers: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
|
||||
@ -51,29 +54,63 @@ class TextHandlerSync(TextHandler):
|
||||
|
||||
|
||||
@dataclass
|
||||
class PipelineCommand:
|
||||
handler: TextHandler
|
||||
args: list[str]
|
||||
# 新增:重定向目标(buffer key)
|
||||
redirect_target: str | None = None
|
||||
# 新增:是否为追加模式 (>>)
|
||||
redirect_append: bool = False
|
||||
class Redirect:
|
||||
target: str
|
||||
append: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class Pipeline:
|
||||
command_groups: list[list[PipelineCommand]] = field(default_factory=list)
|
||||
"一个列表的列表,每一组之间的指令之间使用管道符连接,而不同组之间不会有数据流"
|
||||
class CommandNode:
|
||||
name: str
|
||||
handler: TextHandler
|
||||
args: list[str]
|
||||
redirects: list[Redirect] = field(default_factory=list)
|
||||
|
||||
|
||||
class PipelineParseStatus(Enum):
|
||||
normal = 0
|
||||
in_string = 1
|
||||
in_string_to_escape = 2
|
||||
off_string = 3
|
||||
@dataclass
|
||||
class PipelineNode:
|
||||
commands: list[CommandNode] = field(default_factory=list)
|
||||
negate: bool = False
|
||||
|
||||
|
||||
whitespaces = whitespace + " "
|
||||
@dataclass
|
||||
class ConditionalPipeline:
|
||||
op: str | None
|
||||
pipeline: PipelineNode
|
||||
|
||||
|
||||
@dataclass
|
||||
class CommandGroup:
|
||||
chains: list[ConditionalPipeline] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class IfNode:
|
||||
condition: CommandGroup
|
||||
then_body: "Script"
|
||||
else_body: "Script | None" = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class WhileNode:
|
||||
condition: CommandGroup
|
||||
body: "Script"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Script:
|
||||
statements: list[CommandGroup | IfNode | WhileNode] = field(default_factory=list)
|
||||
|
||||
|
||||
class TokenKind(Enum):
|
||||
WORD = "word"
|
||||
OP = "op"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Token:
|
||||
kind: TokenKind
|
||||
value: str
|
||||
|
||||
|
||||
class PipelineRunner:
|
||||
@ -91,254 +128,456 @@ class PipelineRunner:
|
||||
def register(self, handler: TextHandler):
|
||||
self.handlers.append(handler)
|
||||
|
||||
def parse_pipeline(self, script: str) -> Pipeline | str:
|
||||
pipeline = Pipeline()
|
||||
|
||||
# 当前正在构建的上下文
|
||||
current_group: list[PipelineCommand] = []
|
||||
current_command_args: list[str] = []
|
||||
|
||||
# 字符串解析状态
|
||||
status = PipelineParseStatus.normal
|
||||
current_string = ""
|
||||
current_string_raw = ""
|
||||
status_in_string_pair = ""
|
||||
has_token = False # 是否正在构建一个 token(区分空字符串和无 token)
|
||||
|
||||
# 重定向解析状态
|
||||
is_parsing_redirect_filename = False
|
||||
current_redirect_target: str | None = None
|
||||
current_redirect_append = False
|
||||
|
||||
# 辅助函数:将当前解析到的字符串 flush 到 参数列表 或 重定向目标
|
||||
def _flush_token():
|
||||
nonlocal \
|
||||
current_string, \
|
||||
current_string_raw, \
|
||||
is_parsing_redirect_filename, \
|
||||
current_redirect_target, \
|
||||
has_token
|
||||
if not has_token:
|
||||
return
|
||||
|
||||
if is_parsing_redirect_filename:
|
||||
current_redirect_target = current_string
|
||||
is_parsing_redirect_filename = False # 重定向文件名只取一个 token
|
||||
else:
|
||||
current_command_args.append(current_string)
|
||||
|
||||
current_string = ""
|
||||
current_string_raw = ""
|
||||
has_token = False
|
||||
|
||||
# 辅助函数:将当前指令 flush 到当前组
|
||||
def _flush_command() -> str | None:
|
||||
nonlocal \
|
||||
current_command_args, \
|
||||
current_redirect_target, \
|
||||
current_redirect_append
|
||||
if not current_command_args:
|
||||
return None
|
||||
|
||||
cmd_name = current_command_args[0]
|
||||
args = current_command_args[1:]
|
||||
|
||||
matched = [
|
||||
h for h in self.handlers if cmd_name in h.keywords or cmd_name == h.name
|
||||
]
|
||||
if not matched:
|
||||
return f"不存在名为 {cmd_name} 的函数"
|
||||
if len(matched) > 1:
|
||||
logger.warning(
|
||||
f"指令能对应超过一个文本处理器 CMD={cmd_name} handlers={self.handlers}"
|
||||
)
|
||||
|
||||
cmd = PipelineCommand(
|
||||
handler=matched[0],
|
||||
args=args,
|
||||
redirect_target=current_redirect_target,
|
||||
redirect_append=current_redirect_append,
|
||||
def _resolve_handler(self, cmd_name: str) -> TextHandler | str:
|
||||
matched = [
|
||||
h for h in self.handlers if cmd_name == h.name or cmd_name in h.keywords
|
||||
]
|
||||
if not matched:
|
||||
return f"不存在名为 {cmd_name} 的函数"
|
||||
if len(matched) > 1:
|
||||
logger.warning(
|
||||
f"指令能对应超过一个文本处理器 CMD={cmd_name} handlers={self.handlers}"
|
||||
)
|
||||
current_group.append(cmd)
|
||||
return matched[0]
|
||||
|
||||
# 重置指令级状态
|
||||
current_command_args = []
|
||||
current_redirect_target = None
|
||||
current_redirect_append = False
|
||||
return None
|
||||
|
||||
# 使用索引遍历以支持 look-ahead (处理 >>)
|
||||
def tokenize(self, script: str) -> list[Token] | str:
|
||||
tokens: list[Token] = []
|
||||
buf = ""
|
||||
quote: str | None = None
|
||||
escape = False
|
||||
i = 0
|
||||
length = len(script)
|
||||
operators = {"|", ";", ">", "&&", "||", ">>", "!"}
|
||||
escape_map = {
|
||||
"n": "\n",
|
||||
"r": "\r",
|
||||
"t": "\t",
|
||||
"0": "\0",
|
||||
"a": "\a",
|
||||
"b": "\b",
|
||||
"f": "\f",
|
||||
"v": "\v",
|
||||
"\\": "\\",
|
||||
'"': '"',
|
||||
"'": "'",
|
||||
}
|
||||
|
||||
while i < length:
|
||||
def flush_word(force: bool = False):
|
||||
nonlocal buf
|
||||
if buf or force:
|
||||
tokens.append(Token(TokenKind.WORD, buf))
|
||||
buf = ""
|
||||
|
||||
while i < len(script):
|
||||
c = script[i]
|
||||
|
||||
match status:
|
||||
case PipelineParseStatus.normal:
|
||||
if c in whitespaces:
|
||||
_flush_token()
|
||||
if quote is not None:
|
||||
if escape:
|
||||
buf += escape_map.get(c, c)
|
||||
escape = False
|
||||
elif c == "\\":
|
||||
escape = True
|
||||
elif c == quote:
|
||||
quote = None
|
||||
flush_word(force=True) # 引号闭合时强制 flush,即使是空字符串
|
||||
else:
|
||||
buf += c
|
||||
i += 1
|
||||
continue
|
||||
|
||||
elif c in "'\"":
|
||||
status_in_string_pair = c
|
||||
status = PipelineParseStatus.in_string
|
||||
current_string_raw = ""
|
||||
has_token = True
|
||||
if c in "'\"":
|
||||
quote = c
|
||||
i += 1
|
||||
continue
|
||||
|
||||
elif c == "|":
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
# 管道符不结束 group,继续在 current_group 添加
|
||||
if c.isspace():
|
||||
flush_word()
|
||||
i += 1
|
||||
continue
|
||||
|
||||
elif c == ";":
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
# 分号结束 group
|
||||
if current_group:
|
||||
pipeline.command_groups.append(current_group)
|
||||
current_group = []
|
||||
two = script[i : i + 2]
|
||||
if two in operators:
|
||||
flush_word()
|
||||
tokens.append(Token(TokenKind.OP, two))
|
||||
i += 2
|
||||
continue
|
||||
|
||||
elif c == ">":
|
||||
_flush_token() # 先结束之前的参数
|
||||
# 检查是否是 append 模式 (>>)
|
||||
if i + 1 < length and script[i + 1] == ">":
|
||||
current_redirect_append = True
|
||||
i += 1 # 跳过下一个 >
|
||||
else:
|
||||
current_redirect_append = False
|
||||
if c in {"|", ";", ">", "!"}:
|
||||
flush_word()
|
||||
tokens.append(Token(TokenKind.OP, c))
|
||||
i += 1
|
||||
continue
|
||||
|
||||
# 标记下一个 token 为文件名
|
||||
is_parsing_redirect_filename = True
|
||||
|
||||
else:
|
||||
current_string += c
|
||||
has_token = True
|
||||
|
||||
case PipelineParseStatus.in_string:
|
||||
current_string_raw += c
|
||||
if c == status_in_string_pair:
|
||||
status = PipelineParseStatus.off_string
|
||||
elif c == "\\":
|
||||
status = PipelineParseStatus.in_string_to_escape
|
||||
else:
|
||||
current_string += c
|
||||
|
||||
case PipelineParseStatus.in_string_to_escape:
|
||||
escape_map = {
|
||||
"n": "\n",
|
||||
"r": "\r",
|
||||
"t": "\t",
|
||||
"0": "\0",
|
||||
"a": "\a",
|
||||
"b": "\b",
|
||||
"f": "\f",
|
||||
"v": "\v",
|
||||
"\\": "\\",
|
||||
}
|
||||
current_string += escape_map.get(c, c)
|
||||
status = PipelineParseStatus.in_string
|
||||
|
||||
case PipelineParseStatus.off_string:
|
||||
if c in whitespaces:
|
||||
_flush_token()
|
||||
status = PipelineParseStatus.normal
|
||||
elif c == "|":
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
status = PipelineParseStatus.normal
|
||||
elif c == ";":
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
if current_group:
|
||||
pipeline.command_groups.append(current_group)
|
||||
current_group = []
|
||||
status = PipelineParseStatus.normal
|
||||
elif c == ">":
|
||||
_flush_token()
|
||||
status = PipelineParseStatus.normal
|
||||
# 回退索引,让下一次循环进入 normal 状态的 > 处理逻辑
|
||||
i -= 1
|
||||
else:
|
||||
# 紧接着的字符继续作为当前字符串的一部分 (如 "abc"d)
|
||||
current_string += c
|
||||
current_string_raw = ""
|
||||
status = PipelineParseStatus.normal
|
||||
if c == "\\":
|
||||
if i + 1 < len(script):
|
||||
i += 1
|
||||
buf += escape_map.get(script[i], script[i])
|
||||
else:
|
||||
buf += c
|
||||
i += 1
|
||||
continue
|
||||
|
||||
buf += c
|
||||
i += 1
|
||||
|
||||
# 循环结束后的收尾
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
if quote is not None:
|
||||
return "存在未闭合的引号"
|
||||
if escape:
|
||||
buf += "\\"
|
||||
|
||||
if current_group:
|
||||
pipeline.command_groups.append(current_group)
|
||||
flush_word()
|
||||
return tokens
|
||||
|
||||
return pipeline
|
||||
def parse_pipeline(self, script: str) -> Script | str:
|
||||
tokens = self.tokenize(script)
|
||||
if isinstance(tokens, str):
|
||||
return tokens
|
||||
if not tokens:
|
||||
return Script()
|
||||
|
||||
pos = 0
|
||||
|
||||
def peek(offset: int = 0) -> Token | None:
|
||||
idx = pos + offset
|
||||
return tokens[idx] if idx < len(tokens) else None
|
||||
|
||||
def consume() -> Token:
|
||||
nonlocal pos
|
||||
tok = tokens[pos]
|
||||
pos += 1
|
||||
return tok
|
||||
|
||||
def consume_if_op(value: str) -> bool:
|
||||
tok = peek()
|
||||
if tok is not None and tok.kind == TokenKind.OP and tok.value == value:
|
||||
consume()
|
||||
return True
|
||||
return False
|
||||
|
||||
def consume_if_word(value: str) -> bool:
|
||||
tok = peek()
|
||||
if tok is not None and tok.kind == TokenKind.WORD and tok.value == value:
|
||||
consume()
|
||||
return True
|
||||
return False
|
||||
|
||||
def expect_word(msg: str) -> Token | str:
|
||||
tok = peek()
|
||||
if tok is None or tok.kind != TokenKind.WORD:
|
||||
return msg
|
||||
return consume()
|
||||
|
||||
def parse_command() -> CommandNode | str:
|
||||
first = expect_word("缺少指令名")
|
||||
if isinstance(first, str):
|
||||
return first
|
||||
|
||||
handler = self._resolve_handler(first.value)
|
||||
if isinstance(handler, str):
|
||||
return handler
|
||||
|
||||
args: list[str] = []
|
||||
redirects: list[Redirect] = []
|
||||
|
||||
while True:
|
||||
tok = peek()
|
||||
if tok is None:
|
||||
break
|
||||
if tok.kind == TokenKind.OP and tok.value in {"|", ";", "&&", "||"}:
|
||||
break
|
||||
if tok.kind == TokenKind.OP and tok.value in {">", ">>"}:
|
||||
op_tok = consume()
|
||||
target = expect_word("重定向操作符后面需要缓存名")
|
||||
if isinstance(target, str):
|
||||
return target
|
||||
redirects.append(
|
||||
Redirect(target=target.value, append=op_tok.value == ">>")
|
||||
)
|
||||
continue
|
||||
if tok.kind != TokenKind.WORD:
|
||||
return f"无法解析的 token: {tok.value}"
|
||||
args.append(consume().value)
|
||||
|
||||
return CommandNode(
|
||||
name=first.value,
|
||||
handler=handler,
|
||||
args=args,
|
||||
redirects=redirects,
|
||||
)
|
||||
|
||||
def parse_pipe() -> PipelineNode | str:
|
||||
negate = False
|
||||
while consume_if_op("!"):
|
||||
negate = not negate
|
||||
|
||||
pipeline = PipelineNode(negate=negate)
|
||||
command = parse_command()
|
||||
if isinstance(command, str):
|
||||
return command
|
||||
pipeline.commands.append(command)
|
||||
|
||||
while True:
|
||||
tok = peek()
|
||||
if tok is None or tok.kind != TokenKind.OP or tok.value != "|":
|
||||
break
|
||||
consume()
|
||||
next_command = parse_command()
|
||||
if isinstance(next_command, str):
|
||||
return next_command
|
||||
pipeline.commands.append(next_command)
|
||||
|
||||
return pipeline
|
||||
|
||||
def parse_chain() -> CommandGroup | str:
|
||||
group = CommandGroup()
|
||||
first_pipeline = parse_pipe()
|
||||
if isinstance(first_pipeline, str):
|
||||
return first_pipeline
|
||||
group.chains.append(ConditionalPipeline(op=None, pipeline=first_pipeline))
|
||||
|
||||
while True:
|
||||
tok = peek()
|
||||
if tok is None or tok.kind != TokenKind.OP or tok.value not in {"&&", "||"}:
|
||||
break
|
||||
op = consume().value
|
||||
next_pipeline = parse_pipe()
|
||||
if isinstance(next_pipeline, str):
|
||||
return next_pipeline
|
||||
group.chains.append(ConditionalPipeline(op=op, pipeline=next_pipeline))
|
||||
|
||||
return group
|
||||
|
||||
def parse_if() -> IfNode | str:
|
||||
if not consume_if_word("if"):
|
||||
return "缺少 if"
|
||||
|
||||
condition = parse_chain()
|
||||
if isinstance(condition, str):
|
||||
return condition
|
||||
|
||||
consume_if_op(";")
|
||||
if not consume_if_word("then"):
|
||||
return "if 语句缺少 then"
|
||||
|
||||
then_body = parse_script(stop_words={"else", "fi"})
|
||||
if isinstance(then_body, str):
|
||||
return then_body
|
||||
|
||||
else_body: Script | None = None
|
||||
if consume_if_word("else"):
|
||||
else_body = parse_script(stop_words={"fi"})
|
||||
if isinstance(else_body, str):
|
||||
return else_body
|
||||
|
||||
if not consume_if_word("fi"):
|
||||
return "if 语句缺少 fi"
|
||||
|
||||
return IfNode(condition=condition, then_body=then_body, else_body=else_body)
|
||||
|
||||
def parse_while() -> WhileNode | str:
|
||||
if not consume_if_word("while"):
|
||||
return "缺少 while"
|
||||
|
||||
condition = parse_chain()
|
||||
if isinstance(condition, str):
|
||||
return condition
|
||||
|
||||
consume_if_op(";")
|
||||
if not consume_if_word("do"):
|
||||
return "while 语句缺少 do"
|
||||
|
||||
body = parse_script(stop_words={"done"})
|
||||
if isinstance(body, str):
|
||||
return body
|
||||
|
||||
if not consume_if_word("done"):
|
||||
return "while 语句缺少 done"
|
||||
|
||||
return WhileNode(condition=condition, body=body)
|
||||
|
||||
def parse_statement() -> CommandGroup | IfNode | WhileNode | str:
|
||||
tok = peek()
|
||||
if tok is not None and tok.kind == TokenKind.WORD:
|
||||
if tok.value == "if":
|
||||
return parse_if()
|
||||
if tok.value == "while":
|
||||
return parse_while()
|
||||
return parse_chain()
|
||||
|
||||
def parse_script(stop_words: set[str] | None = None) -> Script | str:
|
||||
parsed = Script()
|
||||
nonlocal pos
|
||||
|
||||
while pos < len(tokens):
|
||||
tok = peek()
|
||||
if tok is None:
|
||||
break
|
||||
|
||||
if stop_words and tok.kind == TokenKind.WORD and tok.value in stop_words:
|
||||
break
|
||||
|
||||
if tok.kind == TokenKind.OP and tok.value == ";":
|
||||
consume()
|
||||
continue
|
||||
|
||||
statement = parse_statement()
|
||||
if isinstance(statement, str):
|
||||
return statement
|
||||
parsed.statements.append(statement)
|
||||
|
||||
tok = peek()
|
||||
if tok is not None and tok.kind == TokenKind.OP and tok.value == ";":
|
||||
consume()
|
||||
|
||||
return parsed
|
||||
|
||||
parsed = parse_script()
|
||||
if isinstance(parsed, str):
|
||||
return parsed
|
||||
if pos != len(tokens):
|
||||
tok = tokens[pos]
|
||||
return f"无法解析的 token: {tok.value}"
|
||||
return parsed
|
||||
|
||||
async def _execute_command(
|
||||
self,
|
||||
command: CommandNode,
|
||||
istream: str | None,
|
||||
env: TextHandlerEnvironment,
|
||||
) -> TextHandleResult:
|
||||
logger.debug(
|
||||
f"Executing: {command.name} args={command.args} redirects={command.redirects}"
|
||||
)
|
||||
result = await command.handler.handle(env, istream, command.args)
|
||||
|
||||
if result.code != 0:
|
||||
return result
|
||||
|
||||
if command.redirects:
|
||||
content = result.ostream or ""
|
||||
for redirect in command.redirects:
|
||||
if redirect.append:
|
||||
old_content = env.buffers.get(redirect.target, "")
|
||||
env.buffers[redirect.target] = old_content + content
|
||||
else:
|
||||
env.buffers[redirect.target] = content
|
||||
return TextHandleResult(code=0, ostream=None, attachment=result.attachment)
|
||||
|
||||
return result
|
||||
|
||||
async def _execute_pipeline(
|
||||
self,
|
||||
pipeline: PipelineNode,
|
||||
istream: str | None,
|
||||
env: TextHandlerEnvironment,
|
||||
) -> TextHandleResult:
|
||||
current_stream = istream
|
||||
last_result = TextHandleResult(code=0, ostream=None)
|
||||
|
||||
for command in pipeline.commands:
|
||||
try:
|
||||
last_result = await self._execute_command(command, current_stream, env)
|
||||
except Exception as e:
|
||||
logger.error(f"Pipeline execution failed at {command.name}")
|
||||
logger.exception(e)
|
||||
return TextHandleResult(code=-1, ostream="处理流水线时出现 python 错误")
|
||||
|
||||
if last_result.code != 0:
|
||||
if pipeline.negate:
|
||||
return TextHandleResult(code=0, ostream=None)
|
||||
return last_result
|
||||
current_stream = last_result.ostream
|
||||
|
||||
if pipeline.negate:
|
||||
return TextHandleResult(code=1, ostream=None)
|
||||
return last_result
|
||||
|
||||
async def _execute_group(
|
||||
self,
|
||||
group: CommandGroup,
|
||||
istream: str | None,
|
||||
env: TextHandlerEnvironment,
|
||||
) -> TextHandleResult:
|
||||
last_result = TextHandleResult(code=0, ostream=None)
|
||||
|
||||
for chain in group.chains:
|
||||
should_run = True
|
||||
if chain.op == "&&":
|
||||
should_run = last_result.code == 0
|
||||
elif chain.op == "||":
|
||||
should_run = last_result.code != 0
|
||||
|
||||
if should_run:
|
||||
last_result = await self._execute_pipeline(chain.pipeline, istream, env)
|
||||
|
||||
return last_result
|
||||
|
||||
async def _execute_if(
|
||||
self,
|
||||
if_node: IfNode,
|
||||
istream: str | None,
|
||||
env: TextHandlerEnvironment,
|
||||
) -> TextHandleResult:
|
||||
condition_result = await self._execute_group(if_node.condition, istream, env)
|
||||
if condition_result.code == 0:
|
||||
results = await self.run_pipeline(if_node.then_body, istream, env)
|
||||
else:
|
||||
results = (
|
||||
await self.run_pipeline(if_node.else_body, istream, env)
|
||||
if if_node.else_body is not None
|
||||
else [TextHandleResult(code=0, ostream=None)]
|
||||
)
|
||||
return results[-1] if results else TextHandleResult(code=0, ostream=None)
|
||||
|
||||
async def _execute_while(
|
||||
self,
|
||||
while_node: WhileNode,
|
||||
istream: str | None,
|
||||
env: TextHandlerEnvironment,
|
||||
) -> TextHandleResult:
|
||||
last_result = TextHandleResult(code=0, ostream=None)
|
||||
|
||||
for _ in range(MAX_WHILE_ITERATIONS):
|
||||
condition_result = await self._execute_group(while_node.condition, istream, env)
|
||||
if condition_result.code != 0:
|
||||
return last_result
|
||||
|
||||
body_results = await self.run_pipeline(while_node.body, istream, env)
|
||||
if body_results:
|
||||
last_result = body_results[-1]
|
||||
if last_result.code != 0:
|
||||
return last_result
|
||||
|
||||
return TextHandleResult(
|
||||
code=2,
|
||||
ostream=f"while 循环超过最大迭代次数限制({MAX_WHILE_ITERATIONS})",
|
||||
)
|
||||
|
||||
async def run_pipeline(
|
||||
self,
|
||||
pipeline: Pipeline,
|
||||
pipeline: Script,
|
||||
istream: str | None,
|
||||
env: TextHandlerEnvironment | None = None,
|
||||
) -> list[TextHandleResult]:
|
||||
if env is None:
|
||||
env = TextHandlerEnvironment(is_trusted=False, buffers={})
|
||||
env = TextHandlerEnvironment(is_trusted=False, event=None, buffers={})
|
||||
|
||||
results: list[TextHandleResult] = []
|
||||
|
||||
# 遍历执行指令组 (分号分隔),每个组独立产生输出
|
||||
for group in pipeline.command_groups:
|
||||
current_stream = istream
|
||||
group_result = TextHandleResult(code=0, ostream=None)
|
||||
|
||||
# 遍历组内指令 (管道分隔)
|
||||
for cmd in group:
|
||||
try:
|
||||
logger.debug(
|
||||
f"Executing: {cmd.handler.name} args={cmd.args} redirect={cmd.redirect_target}"
|
||||
)
|
||||
result = await cmd.handler.handle(env, current_stream, cmd.args)
|
||||
|
||||
if result.code != 0:
|
||||
# 组内出错,整条流水线中止
|
||||
results.append(result)
|
||||
return results
|
||||
|
||||
# 处理重定向逻辑
|
||||
if cmd.redirect_target:
|
||||
content_to_write = result.ostream or ""
|
||||
target_buffer = cmd.redirect_target
|
||||
|
||||
if cmd.redirect_append:
|
||||
old_content = env.buffers.get(target_buffer, "")
|
||||
env.buffers[target_buffer] = old_content + content_to_write
|
||||
else:
|
||||
env.buffers[target_buffer] = content_to_write
|
||||
|
||||
current_stream = None
|
||||
group_result = TextHandleResult(
|
||||
code=0, ostream=None, attachment=result.attachment
|
||||
)
|
||||
else:
|
||||
current_stream = result.ostream
|
||||
group_result = result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Pipeline execution failed at {cmd.handler.name}")
|
||||
logger.exception(e)
|
||||
results.append(
|
||||
TextHandleResult(
|
||||
code=-1, ostream="处理流水线时出现 python 错误"
|
||||
)
|
||||
)
|
||||
return results
|
||||
|
||||
results.append(group_result)
|
||||
for statement in pipeline.statements:
|
||||
try:
|
||||
if isinstance(statement, IfNode):
|
||||
results.append(await self._execute_if(statement, istream, env))
|
||||
elif isinstance(statement, WhileNode):
|
||||
results.append(await self._execute_while(statement, istream, env))
|
||||
else:
|
||||
results.append(await self._execute_group(statement, istream, env))
|
||||
except Exception as e:
|
||||
logger.error(f"Pipeline execution failed: {e}")
|
||||
logger.exception(e)
|
||||
results.append(
|
||||
TextHandleResult(code=-1, ostream="处理流水线时出现 python 错误")
|
||||
)
|
||||
return results
|
||||
|
||||
return results
|
||||
|
||||
|
||||
@ -1,36 +1,53 @@
|
||||
from typing import Any, cast
|
||||
from konabot.common.llm import get_llm
|
||||
from konabot.plugins.handle_text.base import TextHandler, TextHandlerEnvironment, TextHandleResult
|
||||
from konabot.common.permsys import perm_manager
|
||||
from konabot.plugins.handle_text.base import (
|
||||
TextHandler,
|
||||
TextHandlerEnvironment,
|
||||
TextHandleResult,
|
||||
)
|
||||
|
||||
|
||||
class THQwen(TextHandler):
|
||||
name = "qwen"
|
||||
|
||||
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
pm = perm_manager()
|
||||
if env.event is None or not await pm.check_has_permission(
|
||||
env.event, "textfx.qwen"
|
||||
):
|
||||
return TextHandleResult(
|
||||
code=1,
|
||||
ostream="你或当前环境没有使用 qwen 的权限。如有疑问请联系管理员",
|
||||
)
|
||||
|
||||
llm = get_llm()
|
||||
messages = []
|
||||
|
||||
if istream is not None:
|
||||
messages.append({
|
||||
"role": "user",
|
||||
"content": istream
|
||||
})
|
||||
messages.append({"role": "user", "content": istream})
|
||||
if len(args) > 0:
|
||||
message = ' '.join(args)
|
||||
messages.append({
|
||||
"role": "user",
|
||||
"content": message,
|
||||
})
|
||||
message = " ".join(args)
|
||||
messages.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": message,
|
||||
}
|
||||
)
|
||||
if len(messages) == 0:
|
||||
return TextHandleResult(
|
||||
code=1,
|
||||
ostream="使用方法:qwen <提示词>",
|
||||
)
|
||||
|
||||
messages = [{
|
||||
"role": "system",
|
||||
"content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答"
|
||||
}] + messages
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答",
|
||||
}
|
||||
] + messages
|
||||
result = await llm.chat(cast(Any, messages))
|
||||
content = result.content
|
||||
if content is None:
|
||||
|
||||
@ -13,10 +13,8 @@ class THEcho(TextHandler):
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
if len(args) == 0 and istream is None:
|
||||
return TextHandleResult(1, "请在 echo 后面添加需要输出的文本")
|
||||
if istream is not None:
|
||||
return TextHandleResult(0, "\n".join([istream] + args))
|
||||
# echo 不读 stdin,只输出参数(Unix 语义)
|
||||
# 无参数时输出空行(与 Unix echo 行为一致)
|
||||
return TextHandleResult(0, "\n".join(args))
|
||||
|
||||
|
||||
@ -26,7 +24,6 @@ class THCat(TextHandler):
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
# No args: pass through stdin (like Unix cat with no arguments)
|
||||
if len(args) == 0:
|
||||
if istream is None:
|
||||
return TextHandleResult(
|
||||
@ -35,7 +32,6 @@ class THCat(TextHandler):
|
||||
)
|
||||
return TextHandleResult(0, istream)
|
||||
|
||||
# Concatenate all specified sources in order
|
||||
parts: list[str] = []
|
||||
for arg in args:
|
||||
if arg == "-":
|
||||
@ -74,7 +70,6 @@ class THReplace(TextHandler):
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
# 用法: replace <pattern> <replacement> [text]
|
||||
if len(args) < 2:
|
||||
return TextHandleResult(1, "用法:replace <正则> <替换内容> [文本]")
|
||||
|
||||
@ -90,3 +85,77 @@ class THReplace(TextHandler):
|
||||
return TextHandleResult(0, res)
|
||||
except Exception as e:
|
||||
return TextHandleResult(1, f"正则错误: {str(e)}")
|
||||
|
||||
|
||||
class THTrue(TextHandler):
|
||||
name = "true"
|
||||
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
return TextHandleResult(0, istream)
|
||||
|
||||
|
||||
class THFalse(TextHandler):
|
||||
name = "false"
|
||||
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
return TextHandleResult(1, None)
|
||||
|
||||
|
||||
class THTest(TextHandler):
|
||||
name = "test"
|
||||
keywords = ["["]
|
||||
|
||||
def _bool_result(self, value: bool) -> TextHandleResult:
|
||||
return TextHandleResult(0 if value else 1, None)
|
||||
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
expr = list(args)
|
||||
|
||||
# 支持方括号语法:[ expr ] 会自动移除末尾的 ]
|
||||
if expr and expr[-1] == "]":
|
||||
expr = expr[:-1]
|
||||
|
||||
if not expr:
|
||||
return TextHandleResult(1, None)
|
||||
|
||||
if len(expr) == 1:
|
||||
return self._bool_result(len(expr[0]) > 0)
|
||||
|
||||
if len(expr) == 2:
|
||||
op, value = expr
|
||||
if op == "-n":
|
||||
return self._bool_result(len(value) > 0)
|
||||
if op == "-z":
|
||||
return self._bool_result(len(value) == 0)
|
||||
return TextHandleResult(2, f"test 不支持的表达式: {' '.join(args)}")
|
||||
|
||||
if len(expr) == 3:
|
||||
left, op, right = expr
|
||||
if op == "=":
|
||||
return self._bool_result(left == right)
|
||||
if op == "!=":
|
||||
return self._bool_result(left != right)
|
||||
if op in {"-eq", "-ne", "-gt", "-ge", "-lt", "-le"}:
|
||||
try:
|
||||
li = int(left)
|
||||
ri = int(right)
|
||||
except ValueError:
|
||||
return TextHandleResult(2, "test 的数字比较参数必须是整数")
|
||||
mapping = {
|
||||
"-eq": li == ri,
|
||||
"-ne": li != ri,
|
||||
"-gt": li > ri,
|
||||
"-ge": li >= ri,
|
||||
"-lt": li < ri,
|
||||
"-le": li <= ri,
|
||||
}
|
||||
return self._bool_result(mapping[op])
|
||||
return TextHandleResult(2, f"test 不支持的操作符: {op}")
|
||||
|
||||
return TextHandleResult(2, f"test 不支持的表达式: {' '.join(args)}")
|
||||
|
||||
@ -6,29 +6,34 @@ from loguru import logger
|
||||
from nonebot import on_message
|
||||
import nonebot
|
||||
from nonebot.rule import to_me
|
||||
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
|
||||
on_alconna)
|
||||
from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna
|
||||
from nonebot_plugin_apscheduler import scheduler
|
||||
|
||||
from konabot.common import username
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.pager import PagerQuery
|
||||
from konabot.plugins.kona_ph.core.message import (get_daily_report,
|
||||
get_daily_report_v2,
|
||||
get_puzzle_description,
|
||||
get_submission_message)
|
||||
from konabot.plugins.kona_ph.core.message import (
|
||||
get_daily_report,
|
||||
get_daily_report_v2,
|
||||
get_puzzle_description,
|
||||
get_submission_message,
|
||||
)
|
||||
from konabot.plugins.kona_ph.core.storage import get_today_date
|
||||
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE,
|
||||
create_admin_commands,
|
||||
puzzle_manager)
|
||||
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info
|
||||
from konabot.plugins.poster.service import broadcast
|
||||
from konabot.plugins.kona_ph.manager import (
|
||||
PUZZLE_PAGE_SIZE,
|
||||
create_admin_commands,
|
||||
puzzle_manager,
|
||||
)
|
||||
from konabot.common.subscribe import PosterInfo, register_poster_info, broadcast
|
||||
|
||||
create_admin_commands()
|
||||
register_poster_info("每日谜题", info=PosterInfo(
|
||||
aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"},
|
||||
description="此方 BOT 每日谜题推送",
|
||||
))
|
||||
register_poster_info(
|
||||
"每日谜题",
|
||||
info=PosterInfo(
|
||||
aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"},
|
||||
description="此方 BOT 每日谜题推送",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
cmd_submit = on_message(rule=to_me())
|
||||
@ -44,16 +49,22 @@ async def _(msg: UniMsg, target: DepLongTaskTarget):
|
||||
if isinstance(result, str):
|
||||
await target.send_message(result)
|
||||
else:
|
||||
await target.send_message(get_submission_message(
|
||||
daily_puzzle_info=result.info,
|
||||
submission=result.submission,
|
||||
puzzle=result.puzzle,
|
||||
))
|
||||
await target.send_message(
|
||||
get_submission_message(
|
||||
daily_puzzle_info=result.info,
|
||||
submission=result.submission,
|
||||
puzzle=result.puzzle,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
cmd_query = on_alconna(Alconna(
|
||||
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?)|今日谜?题目?)"
|
||||
), rule=to_me())
|
||||
cmd_query = on_alconna(
|
||||
Alconna(
|
||||
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?)|今日谜?题目?)"
|
||||
),
|
||||
rule=to_me(),
|
||||
)
|
||||
|
||||
|
||||
@cmd_query.handle()
|
||||
async def _(target: DepLongTaskTarget):
|
||||
@ -64,9 +75,8 @@ async def _(target: DepLongTaskTarget):
|
||||
await target.send_message(get_puzzle_description(p))
|
||||
|
||||
|
||||
cmd_query_submission = on_alconna(Alconna(
|
||||
"今日答题情况"
|
||||
), rule=to_me())
|
||||
cmd_query_submission = on_alconna(Alconna("今日答题情况"), rule=to_me())
|
||||
|
||||
|
||||
@cmd_query_submission.handle()
|
||||
async def _(target: DepLongTaskTarget):
|
||||
@ -77,11 +87,15 @@ async def _(target: DepLongTaskTarget):
|
||||
await target.send_message(get_daily_report_v2(manager, gid))
|
||||
|
||||
|
||||
cmd_history = on_alconna(Alconna(
|
||||
"re:历史(题目|谜题)",
|
||||
Args["page?", int],
|
||||
Args["index_id?", str],
|
||||
), rule=to_me())
|
||||
cmd_history = on_alconna(
|
||||
Alconna(
|
||||
"re:历史(题目|谜题)",
|
||||
Args["page?", int],
|
||||
Args["index_id?", str],
|
||||
),
|
||||
rule=to_me(),
|
||||
)
|
||||
|
||||
|
||||
@cmd_history.handle()
|
||||
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
||||
@ -105,10 +119,10 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
||||
puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True)
|
||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||
if page <= 0 or page > count_pages:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"页数只有 1 ~ {count_pages} 啦!"
|
||||
))
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
||||
return await target.send_message(
|
||||
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
|
||||
)
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
|
||||
for p, d in puzzles:
|
||||
info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]]
|
||||
msg = msg.text(
|
||||
@ -120,22 +134,26 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
||||
await target.send_message(msg)
|
||||
|
||||
|
||||
cmd_leadboard = on_alconna(Alconna(
|
||||
"re:此方(解谜|谜题)排行榜",
|
||||
Args["page?", int],
|
||||
))
|
||||
cmd_leadboard = on_alconna(
|
||||
Alconna(
|
||||
"re:此方(解谜|谜题)排行榜",
|
||||
Args["page?", int],
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@cmd_leadboard.handle()
|
||||
async def _(target: DepLongTaskTarget, page: int = 1):
|
||||
async with puzzle_manager() as manager:
|
||||
result = manager.get_leadboard(PagerQuery(page, 10))
|
||||
await target.send_message(result.to_unimessage(
|
||||
title="此方解谜排行榜",
|
||||
formatter=lambda data: (
|
||||
f"✨ {data[1]} 已完成 | "
|
||||
f"{username.get_username(data[0])}"
|
||||
await target.send_message(
|
||||
result.to_unimessage(
|
||||
title="此方解谜排行榜",
|
||||
formatter=lambda data: (
|
||||
f"✨ {data[1]} 已完成 | {username.get_username(data[0])}"
|
||||
),
|
||||
)
|
||||
))
|
||||
)
|
||||
|
||||
|
||||
@scheduler.scheduled_job("cron", hour="8")
|
||||
@ -155,4 +173,3 @@ async def _():
|
||||
|
||||
|
||||
driver = nonebot.get_driver()
|
||||
|
||||
|
||||
@ -1,50 +1,54 @@
|
||||
import datetime
|
||||
from math import ceil
|
||||
|
||||
from nonebot import get_plugin_config
|
||||
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query,
|
||||
Subcommand, SubcommandResult, UniMessage,
|
||||
on_alconna)
|
||||
from pydantic import BaseModel
|
||||
from nonebot.adapters import Event
|
||||
from nonebot_plugin_alconna import (
|
||||
Alconna,
|
||||
Args,
|
||||
Image,
|
||||
Option,
|
||||
Query,
|
||||
Subcommand,
|
||||
SubcommandResult,
|
||||
UniMessage,
|
||||
on_alconna,
|
||||
)
|
||||
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
from konabot.common.nb.extract_image import download_image_bytes
|
||||
from konabot.common.permsys import DepPermManager, require_permission
|
||||
from konabot.common.username import get_username
|
||||
from konabot.plugins.kona_ph.core.image import get_image_manager
|
||||
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list,
|
||||
get_puzzle_info_message,
|
||||
get_submission_message)
|
||||
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager,
|
||||
get_today_date,
|
||||
puzzle_manager)
|
||||
from konabot.plugins.poster.service import broadcast
|
||||
from konabot.plugins.kona_ph.core.message import (
|
||||
get_puzzle_description,
|
||||
get_puzzle_hint_list,
|
||||
get_puzzle_info_message,
|
||||
get_submission_message,
|
||||
)
|
||||
from konabot.plugins.kona_ph.core.storage import (
|
||||
Puzzle,
|
||||
PuzzleHint,
|
||||
PuzzleManager,
|
||||
get_today_date,
|
||||
puzzle_manager,
|
||||
)
|
||||
from konabot.common.subscribe import broadcast
|
||||
|
||||
PUZZLE_PAGE_SIZE = 10
|
||||
|
||||
|
||||
class PuzzleConfig(BaseModel):
|
||||
plugin_puzzle_manager: list[str] = []
|
||||
plugin_puzzle_admin: list[str] = []
|
||||
plugin_puzzle_playgroup: list[str] = []
|
||||
|
||||
|
||||
config = get_plugin_config(PuzzleConfig)
|
||||
|
||||
|
||||
def is_puzzle_manager(target: DepLongTaskTarget):
|
||||
return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target)
|
||||
|
||||
|
||||
def is_puzzle_admin(target: DepLongTaskTarget):
|
||||
return target.target_id in config.plugin_puzzle_admin
|
||||
|
||||
|
||||
def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle:
|
||||
async def check_puzzle(
|
||||
manager: PuzzleManager,
|
||||
perm: DepPermManager,
|
||||
raw_id: str,
|
||||
event: Event,
|
||||
target: DepLongTaskTarget,
|
||||
) -> Puzzle:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
raise BotExceptionMessage("没有这个谜题")
|
||||
puzzle = manager.puzzle_data[raw_id]
|
||||
if is_puzzle_admin(target):
|
||||
if await perm.check_has_permission(event, "konaph.admin"):
|
||||
return puzzle
|
||||
if target.target_id != puzzle.author_id:
|
||||
raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
|
||||
@ -60,7 +64,9 @@ def create_admin_commands():
|
||||
Subcommand("unready", Args["raw_id", str], dest="unready"),
|
||||
Subcommand("info", Args["raw_id", str], dest="info"),
|
||||
Subcommand("my", Args["page?", int], dest="my"),
|
||||
Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"),
|
||||
Subcommand(
|
||||
"all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"
|
||||
),
|
||||
Subcommand("pin", Args["raw_id?", str], dest="pin"),
|
||||
Subcommand("unpin", dest="unpin"),
|
||||
Subcommand(
|
||||
@ -115,11 +121,11 @@ def create_admin_commands():
|
||||
dest="hint",
|
||||
),
|
||||
),
|
||||
rule=is_puzzle_manager,
|
||||
rule=require_permission("konaph.manager"),
|
||||
)
|
||||
|
||||
@cmd_admin.assign("$main")
|
||||
async def _(target: DepLongTaskTarget):
|
||||
async def _(target: DepLongTaskTarget, pm: DepPermManager, event: Event):
|
||||
msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
|
||||
msg = msg.text("konaph create - 创建一个新的谜题\n")
|
||||
msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
|
||||
@ -132,7 +138,7 @@ def create_admin_commands():
|
||||
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
|
||||
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
|
||||
|
||||
if is_puzzle_admin(target):
|
||||
if await pm.check_has_permission(event, "konaph.admin"):
|
||||
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
|
||||
msg = msg.text("konaph pin - 查看当前置顶谜题\n")
|
||||
msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
|
||||
@ -145,48 +151,54 @@ def create_admin_commands():
|
||||
async def _(target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
puzzle = manager.admin_create_puzzle(target.target_id)
|
||||
await target.send_message(UniMessage.text(
|
||||
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
|
||||
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
|
||||
f"- 输入 `konaph my` 查看你创建的谜题\n"
|
||||
f"- 输入 `konaph modify` 查看更改谜题的方法"
|
||||
))
|
||||
await target.send_message(
|
||||
UniMessage.text(
|
||||
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
|
||||
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
|
||||
f"- 输入 `konaph my` 查看你创建的谜题\n"
|
||||
f"- 输入 `konaph modify` 查看更改谜题的方法"
|
||||
)
|
||||
)
|
||||
|
||||
@cmd_admin.assign("ready")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async def _(
|
||||
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
if p.ready:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"题目早就准备好啦!"
|
||||
))
|
||||
return await target.send_message(UniMessage.text("题目早就准备好啦!"))
|
||||
p.ready = True
|
||||
await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经准备就绪!"
|
||||
))
|
||||
await target.send_message(
|
||||
UniMessage.text(f"谜题「{p.title}」已经准备就绪!")
|
||||
)
|
||||
|
||||
@cmd_admin.assign("unready")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async def _(
|
||||
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
if not p.ready:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经是未取消状态了!"
|
||||
))
|
||||
return await target.send_message(
|
||||
UniMessage.text(f"谜题「{p.title}」已经是未取消状态了!")
|
||||
)
|
||||
if manager.is_puzzle_published(p.raw_id):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"已发布的谜题不能取消准备状态!"
|
||||
))
|
||||
return await target.send_message(
|
||||
UniMessage.text("已发布的谜题不能取消准备状态!")
|
||||
)
|
||||
|
||||
p.ready = False
|
||||
await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经取消准备!"
|
||||
))
|
||||
await target.send_message(
|
||||
UniMessage.text(f"谜题「{p.title}」已经取消准备!")
|
||||
)
|
||||
|
||||
@cmd_admin.assign("info")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async def _(
|
||||
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
await target.send_message(get_puzzle_info_message(manager, p))
|
||||
|
||||
@cmd_admin.assign("my")
|
||||
@ -194,15 +206,15 @@ def create_admin_commands():
|
||||
async with puzzle_manager() as manager:
|
||||
puzzles = manager.get_puzzles_of_user(target.target_id)
|
||||
if len(puzzles) == 0:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有谜题哦,使用 `konaph create` 创建一个吧!"
|
||||
))
|
||||
return await target.send_message(
|
||||
UniMessage.text("你没有谜题哦,使用 `konaph create` 创建一个吧!")
|
||||
)
|
||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||
if page <= 0 or page > count_pages:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"页数只有 1 ~ {count_pages} 啦!"
|
||||
))
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
||||
return await target.send_message(
|
||||
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
|
||||
)
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
|
||||
message = UniMessage.text("==== 我的谜题 ====\n\n")
|
||||
for p in puzzles:
|
||||
message = message.text("- ")
|
||||
@ -220,11 +232,15 @@ def create_admin_commands():
|
||||
await target.send_message(message)
|
||||
|
||||
@cmd_admin.assign("all")
|
||||
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有权限使用该指令"
|
||||
))
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
ready: Query[bool] = Query("all.ready"),
|
||||
page: int = 1,
|
||||
):
|
||||
if not perm.check_has_permission(event, "konaph.admin"):
|
||||
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||||
async with puzzle_manager() as manager:
|
||||
puzzles = [*manager.puzzle_data.values()]
|
||||
if ready.available:
|
||||
@ -232,10 +248,10 @@ def create_admin_commands():
|
||||
puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
|
||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||
if page <= 0 or page > count_pages:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"页数只有 1 ~ {count_pages} 啦!"
|
||||
))
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
||||
return await target.send_message(
|
||||
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
|
||||
)
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
|
||||
message = UniMessage.text("==== 所有谜题 ====\n\n")
|
||||
for p in puzzles:
|
||||
message = message.text("- ")
|
||||
@ -253,32 +269,30 @@ def create_admin_commands():
|
||||
await target.send_message(message)
|
||||
|
||||
@cmd_admin.assign("pin")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str = ""):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有权限使用该指令"
|
||||
))
|
||||
async def _(
|
||||
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str = ""
|
||||
):
|
||||
if not perm.check_has_permission(event, "konaph.admin"):
|
||||
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||||
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id == "":
|
||||
if manager.puzzle_pinned:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"被 Pin 的谜题 ID = {manager.puzzle_pinned}"
|
||||
))
|
||||
return await target.send_message(
|
||||
UniMessage.text(f"被 Pin 的谜题 ID = {manager.puzzle_pinned}")
|
||||
)
|
||||
return await target.send_message("没有置顶谜题")
|
||||
if raw_id not in manager.unpublished_puzzles:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"这个谜题已经发布了,或者还没准备好,或者不存在"
|
||||
))
|
||||
return await target.send_message(
|
||||
UniMessage.text("这个谜题已经发布了,或者还没准备好,或者不存在")
|
||||
)
|
||||
manager.admin_pin_puzzle(raw_id)
|
||||
return await target.send_message(f"已置顶谜题 {raw_id}")
|
||||
|
||||
@cmd_admin.assign("unpin")
|
||||
async def _(target: DepLongTaskTarget):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有权限使用该指令"
|
||||
))
|
||||
async def _(target: DepLongTaskTarget, event: Event, perm: DepPermManager):
|
||||
if not perm.check_has_permission(event, "konaph.admin"):
|
||||
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||||
async with puzzle_manager() as manager:
|
||||
manager.admin_pin_puzzle("")
|
||||
return await target.send_message("已取消所有置顶")
|
||||
@ -286,6 +300,8 @@ def create_admin_commands():
|
||||
@cmd_admin.assign("modify")
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
raw_id: str = "",
|
||||
title: str | None = None,
|
||||
description: str | None = None,
|
||||
@ -306,7 +322,7 @@ def create_admin_commands():
|
||||
image_manager = get_image_manager()
|
||||
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
if title is not None:
|
||||
p.title = title
|
||||
if description is not None:
|
||||
@ -329,11 +345,14 @@ def create_admin_commands():
|
||||
return await target.send_message("修改好啦!看看效果:\n\n" + info2)
|
||||
|
||||
@cmd_admin.assign("publish")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str | None = None):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有权限使用该指令"
|
||||
))
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
raw_id: str | None = None,
|
||||
):
|
||||
if not perm.check_has_permission(event, "konaph.admin"):
|
||||
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||||
today = get_today_date()
|
||||
async with puzzle_manager() as manager:
|
||||
if today in manager.daily_puzzle_of_date:
|
||||
@ -348,46 +367,64 @@ def create_admin_commands():
|
||||
return await target.send_message("Ok!")
|
||||
|
||||
@cmd_admin.assign("preview")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str):
|
||||
async def _(
|
||||
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
return await target.send_message(get_puzzle_description(p))
|
||||
|
||||
@cmd_admin.assign("get-submits")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str):
|
||||
async def _(
|
||||
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
puzzle = manager.puzzle_data.get(raw_id)
|
||||
if puzzle is None:
|
||||
return await target.send_message("没有这个谜题")
|
||||
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id:
|
||||
if (
|
||||
not perm.check_has_permission(event, "konaph.admin")
|
||||
and target.target_id != puzzle.author_id
|
||||
):
|
||||
return await target.send_message("你没有权限预览这个谜题")
|
||||
|
||||
msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n")
|
||||
submits = manager.submissions.get(raw_id, {})
|
||||
for uid, ls in submits.items():
|
||||
s = ', '.join((i.flag for i in ls))
|
||||
s = ", ".join((i.flag for i in ls))
|
||||
msg = msg.text(f"- {get_username(uid)}:{s}\n")
|
||||
return await target.send_message(msg)
|
||||
|
||||
@cmd_admin.assign("test")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str, submission: str):
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
raw_id: str,
|
||||
submission: str,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
):
|
||||
"""
|
||||
测试一道谜题的回答,并给出结果
|
||||
"""
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
result = p.check_submission(submission)
|
||||
msg = get_submission_message(p, result)
|
||||
return await target.send_message("[测试提交] " + msg)
|
||||
|
||||
@cmd_admin.assign("subcommands.hint")
|
||||
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")):
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
subcommands: Query[SubcommandResult] = Query("subcommands.hint"),
|
||||
):
|
||||
if len(subcommands.result.subcommands) > 0:
|
||||
return
|
||||
return await target.send_message(
|
||||
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
|
||||
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
|
||||
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n")
|
||||
.text(
|
||||
"- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n"
|
||||
)
|
||||
.text("- konaph hint modify <id> <hint_id>\n")
|
||||
.text(" - --pattern <pattern>\n - 更改匹配规则\n")
|
||||
.text(" - --message <message>\n - 更改提示文本\n")
|
||||
@ -402,9 +439,11 @@ def create_admin_commands():
|
||||
raw_id: str,
|
||||
pattern: str,
|
||||
message: str,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
p.hints[p.hint_id_max + 1] = PuzzleHint(
|
||||
pattern=pattern,
|
||||
message=message,
|
||||
@ -416,9 +455,11 @@ def create_admin_commands():
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
raw_id: str,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
await target.send_message(get_puzzle_hint_list(p))
|
||||
|
||||
@cmd_admin.assign("subcommands.hint.modify")
|
||||
@ -426,12 +467,14 @@ def create_admin_commands():
|
||||
target: DepLongTaskTarget,
|
||||
raw_id: str,
|
||||
hint_id: int,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
pattern: str | None = None,
|
||||
message: str | None = None,
|
||||
is_checkpoint: bool | None = None,
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
if hint_id not in p.hints:
|
||||
raise BotExceptionMessage(
|
||||
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
|
||||
@ -450,9 +493,11 @@ def create_admin_commands():
|
||||
target: DepLongTaskTarget,
|
||||
raw_id: str,
|
||||
hint_id: int,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
if hint_id not in p.hints:
|
||||
raise BotExceptionMessage(
|
||||
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
|
||||
@ -460,5 +505,4 @@ def create_admin_commands():
|
||||
del p.hints[hint_id]
|
||||
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
|
||||
|
||||
|
||||
return cmd_admin
|
||||
|
||||
@ -1,57 +0,0 @@
|
||||
import asyncio
|
||||
import mcstatus
|
||||
|
||||
from nonebot import on_command
|
||||
from nonebot.adapters import Event
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from konabot.common.nb.is_admin import is_admin
|
||||
from mcstatus.responses import JavaStatusResponse
|
||||
|
||||
|
||||
cmd = on_command("宾几人", aliases=set(("宾人数", "mcbingo")), rule=is_admin)
|
||||
|
||||
|
||||
def parse_status(motd: str) -> str:
|
||||
if "[PRE-GAME]" in motd:
|
||||
return "[✨ 空闲]"
|
||||
if "[IN-GAME]" in motd:
|
||||
return "[🕜 游戏中]"
|
||||
if "[POST-GAME]" in motd:
|
||||
return "[🕜 游戏中]"
|
||||
return "[✨ 开放]"
|
||||
|
||||
|
||||
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
|
||||
if isinstance(status, JavaStatusResponse):
|
||||
motd = status.motd.to_plain()
|
||||
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
|
||||
st = parse_status(motd)
|
||||
players_sample = status.players.sample or []
|
||||
players_sample_suffix = ""
|
||||
if len(players_sample) > 0:
|
||||
player_list = [s.name for s in players_sample]
|
||||
players_sample_suffix = " (" + ", ".join(player_list) + ")"
|
||||
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
|
||||
else:
|
||||
return f"{name}: 好像没开"
|
||||
|
||||
|
||||
@cmd.handle()
|
||||
async def _(evt: Event):
|
||||
servers = (
|
||||
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
|
||||
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
|
||||
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
|
||||
)
|
||||
|
||||
responses = await asyncio.gather(
|
||||
*map(lambda s: s[0].async_status(), servers),
|
||||
return_exceptions=True,
|
||||
)
|
||||
messages = "\n".join((
|
||||
dump_server_status(n, r)
|
||||
for n, r in zip(map(lambda s: s[1], servers), responses)
|
||||
))
|
||||
|
||||
await UniMessage.text(messages).finish(evt, at_sender=False)
|
||||
|
||||
131
konabot/plugins/minecraft_servers/__init__.py
Normal file
131
konabot/plugins/minecraft_servers/__init__.py
Normal file
@ -0,0 +1,131 @@
|
||||
import asyncio
|
||||
import datetime
|
||||
from typing import Literal
|
||||
import mcstatus
|
||||
|
||||
from nonebot import on_command
|
||||
from nonebot.adapters import Event
|
||||
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
|
||||
from mcstatus.responses import JavaStatusResponse
|
||||
from nonebot_plugin_apscheduler import scheduler
|
||||
|
||||
from konabot.common.permsys import DepPermManager, require_permission
|
||||
from konabot.plugins.minecraft_servers.simpfun_server import SimpfunServer
|
||||
|
||||
|
||||
cmd = on_command(
|
||||
"宾几人",
|
||||
aliases=set(("宾人数", "mcbingo")),
|
||||
rule=require_permission("minecraft.bingo.check"),
|
||||
)
|
||||
|
||||
|
||||
def parse_status(motd: str) -> str:
|
||||
if "[PRE-GAME]" in motd:
|
||||
return "[✨ 空闲]"
|
||||
if "[IN-GAME]" in motd:
|
||||
return "[🕜 游戏中]"
|
||||
if "[POST-GAME]" in motd:
|
||||
return "[🕜 游戏中]"
|
||||
return "[✨ 开放]"
|
||||
|
||||
|
||||
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
|
||||
if isinstance(status, JavaStatusResponse):
|
||||
motd = status.motd.to_plain()
|
||||
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
|
||||
st = parse_status(motd)
|
||||
players_sample = status.players.sample or []
|
||||
players_sample_suffix = ""
|
||||
if len(players_sample) > 0:
|
||||
player_list = [s.name for s in players_sample]
|
||||
players_sample_suffix = " (" + ", ".join(player_list) + ")"
|
||||
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
|
||||
else:
|
||||
return f"{name}: 好像没开"
|
||||
|
||||
|
||||
@cmd.handle()
|
||||
async def _(evt: Event, pm: DepPermManager):
|
||||
servers = (
|
||||
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
|
||||
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
|
||||
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
|
||||
)
|
||||
|
||||
responses = await asyncio.gather(
|
||||
*map(lambda s: s[0].async_status(), servers),
|
||||
return_exceptions=True,
|
||||
)
|
||||
messages = "\n".join(
|
||||
(
|
||||
dump_server_status(n, r)
|
||||
for n, r in zip(map(lambda s: s[1], servers), responses)
|
||||
)
|
||||
)
|
||||
|
||||
if await pm.check_has_permission(evt, "minecraft.bingo.manipulate"):
|
||||
messages += "\n\n---\n\n你可以使用 bingoman start 开启小帕的 bingo 服,用 bingoman stop 关闭小帕的 bingo 服"
|
||||
|
||||
await UniMessage.text(messages).finish(evt, at_sender=False)
|
||||
|
||||
|
||||
cmd_bingo_manipulate = on_alconna(
|
||||
Alconna("bingoman", Args["action", str]),
|
||||
aliases=("宾服务器", "bingo服"),
|
||||
rule=require_permission("minecraft.bingo.manipulate"),
|
||||
)
|
||||
|
||||
actions: dict[str, Literal["start", "stop", "restart", "kill"]] = {
|
||||
"up": "start",
|
||||
"down": "stop",
|
||||
"start": "start",
|
||||
"stop": "stop",
|
||||
"开机": "start",
|
||||
"关机": "stop",
|
||||
"restart": "restart",
|
||||
"kill": "kill",
|
||||
"重启": "restart",
|
||||
}
|
||||
|
||||
|
||||
@cmd_bingo_manipulate.handle()
|
||||
async def _(action: str, event: Event):
|
||||
server = SimpfunServer.new() # 使用默认配置管理服务器
|
||||
a = actions.get(action.lower().strip())
|
||||
if a is None:
|
||||
await UniMessage.text(f"操作 {action} 不存在").send(event, at_sender=True)
|
||||
return
|
||||
resp = await server.power(a)
|
||||
if resp.code == 200:
|
||||
await UniMessage.text("好了").send(event, at_sender=True)
|
||||
else:
|
||||
await UniMessage.text(f"不好:{resp}").send(event, at_sender=True)
|
||||
|
||||
|
||||
@scheduler.scheduled_job("cron", hour="4,23")
|
||||
async def _():
|
||||
server = SimpfunServer.new()
|
||||
today = datetime.datetime.now()
|
||||
|
||||
# 获取服务器当前状态,重试多次以保证不会误判服务器未开启
|
||||
server_up = False
|
||||
server_players = 0
|
||||
for _ in range(3):
|
||||
mcs = mcstatus.JavaServer("play.simpfun.cn", 11495)
|
||||
try:
|
||||
resp = await mcs.async_status()
|
||||
server_up = True
|
||||
server_players = resp.players.online
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if today.weekday() == 5 and today.hour < 12:
|
||||
# 每周六开机一天,保证可以让服务器不被自动销毁
|
||||
if not server_up:
|
||||
await server.power("start")
|
||||
else:
|
||||
# 每用一个自然日都会计费,所以要赶在这一天结束之前关服
|
||||
# 平时如果没人,也自动关上
|
||||
if server_up and server_players == 0:
|
||||
await server.power("stop")
|
||||
90
konabot/plugins/minecraft_servers/simpfun_server.py
Normal file
90
konabot/plugins/minecraft_servers/simpfun_server.py
Normal file
@ -0,0 +1,90 @@
|
||||
from dataclasses import dataclass
|
||||
import datetime
|
||||
from typing import Literal
|
||||
|
||||
import aiohttp
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class SimpfunServerConfig(BaseModel):
|
||||
plugin_simpfun_api_key: str = ""
|
||||
plugin_simpfun_base_url: str = "https://api.simpfun.cn"
|
||||
plugin_simpfun_instance_id: int = 0
|
||||
|
||||
|
||||
def get_config():
|
||||
from nonebot import get_plugin_config
|
||||
|
||||
return get_plugin_config(SimpfunServerConfig)
|
||||
|
||||
|
||||
class PowerManageResult(BaseModel):
|
||||
code: int
|
||||
status: bool
|
||||
msg: str
|
||||
|
||||
|
||||
class SimpfunServerDetailUtilization(BaseModel):
|
||||
memory_bytes: int
|
||||
cpu_absolute: float
|
||||
disk_bytes: int
|
||||
network_rx_bytes: int
|
||||
network_tx_bytes: int
|
||||
uptime: float
|
||||
disk_last_check_time: datetime.datetime
|
||||
|
||||
|
||||
class SimpfunServerDetailData(BaseModel):
|
||||
id: int
|
||||
name: str
|
||||
is_pro: bool
|
||||
|
||||
status: str
|
||||
"运行中的话,是 running"
|
||||
|
||||
is_suspended: bool
|
||||
utilization: SimpfunServerDetailUtilization
|
||||
|
||||
|
||||
class SimpfunServerDetailResp(BaseModel):
|
||||
code: int
|
||||
data: SimpfunServerDetailData
|
||||
|
||||
|
||||
@dataclass
|
||||
class SimpfunServer:
|
||||
instance_id: int
|
||||
api_key: str
|
||||
base_url: str
|
||||
|
||||
async def power(
|
||||
self, action: Literal["start", "stop", "restart", "kill"]
|
||||
) -> PowerManageResult:
|
||||
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
|
||||
|
||||
async with aiohttp.ClientSession(
|
||||
headers={"Authorization": self.api_key}
|
||||
) as session:
|
||||
async with session.get(url, params={"action": action}) as resp:
|
||||
resp.raise_for_status()
|
||||
return PowerManageResult.model_validate_json(await resp.read())
|
||||
|
||||
async def detail(self) -> SimpfunServerDetailResp:
|
||||
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
|
||||
|
||||
async with aiohttp.ClientSession(
|
||||
headers={"Authorization": self.api_key}
|
||||
) as session:
|
||||
async with session.get(url) as resp:
|
||||
resp.raise_for_status()
|
||||
return SimpfunServerDetailResp.model_validate_json(await resp.read())
|
||||
|
||||
@staticmethod
|
||||
def new(config: SimpfunServerConfig | None = None):
|
||||
if config is None:
|
||||
config = get_config()
|
||||
return SimpfunServer(
|
||||
instance_id=config.plugin_simpfun_instance_id,
|
||||
api_key=config.plugin_simpfun_api_key,
|
||||
base_url=config.plugin_simpfun_base_url,
|
||||
)
|
||||
@ -6,6 +6,7 @@ from konabot.common.nb.match_keyword import match_keyword
|
||||
|
||||
evt_nya = on_message(rule=match_keyword("喵"))
|
||||
|
||||
|
||||
@evt_nya.handle()
|
||||
async def _():
|
||||
await evt_nya.send(await UniMessage().text("喵").export())
|
||||
@ -25,8 +26,9 @@ NYA_SYMBOL_MAPPING = {
|
||||
"~": "~",
|
||||
"~": "~",
|
||||
" ": " ",
|
||||
"\n": "\n",
|
||||
}
|
||||
NYA_SYMBOL_KEEP = "—¹₁²₂³₃⁴₄⁵₅⁶₆⁷₇⁸₈⁹₉⁰₀\n"
|
||||
NYA_SYMBOL_MAPPING.update((k, k) for k in NYA_SYMBOL_KEEP)
|
||||
|
||||
|
||||
async def has_nya(msg: UniMsg) -> bool:
|
||||
@ -49,10 +51,10 @@ async def has_nya(msg: UniMsg) -> bool:
|
||||
|
||||
evt_nya_v2 = on_message(rule=has_nya)
|
||||
|
||||
|
||||
@evt_nya_v2.handle()
|
||||
async def _(msg: UniMsg, evt: Event):
|
||||
text = msg.extract_plain_text()
|
||||
await UniMessage.text(''.join(
|
||||
(NYA_SYMBOL_MAPPING.get(c, '') for c in text)
|
||||
)).send(evt)
|
||||
|
||||
await UniMessage.text("".join((NYA_SYMBOL_MAPPING.get(c, "") for c in text))).send(
|
||||
evt
|
||||
)
|
||||
|
||||
@ -3,14 +3,15 @@ from nonebot_plugin_alconna import Alconna, Args, on_alconna
|
||||
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.pager import PagerQuery
|
||||
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
|
||||
from konabot.plugins.poster.service import dep_poster_service
|
||||
from konabot.common.subscribe import POSTER_INFO_DATA, dep_poster_service
|
||||
|
||||
|
||||
cmd_subscribe = on_alconna(Alconna(
|
||||
"订阅",
|
||||
Args["channel", str],
|
||||
))
|
||||
cmd_subscribe = on_alconna(
|
||||
Alconna(
|
||||
"订阅",
|
||||
Args["channel", str],
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@cmd_subscribe.handle()
|
||||
@ -23,10 +24,12 @@ async def _(target: DepLongTaskTarget, channel: str):
|
||||
await target.send_message(f"已经订阅过「{channel}」了")
|
||||
|
||||
|
||||
cmd_list = on_alconna(Alconna(
|
||||
"re:(?:查询|我的|获取)订阅(列表)?",
|
||||
Args["page?", int],
|
||||
))
|
||||
cmd_list = on_alconna(
|
||||
Alconna(
|
||||
"re:(?:查询|我的|获取)订阅(列表)?",
|
||||
Args["page?", int],
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def better_channel_message(channel_id: str) -> str:
|
||||
@ -39,17 +42,24 @@ def better_channel_message(channel_id: str) -> str:
|
||||
@cmd_list.handle()
|
||||
async def _(target: DepLongTaskTarget, page: int = 1):
|
||||
async with dep_poster_service() as service:
|
||||
result = await service.get_channels(target, PagerQuery(
|
||||
page_index=page,
|
||||
page_size=10,
|
||||
))
|
||||
await target.send_message(result.to_unimessage(title="订阅列表", formatter=better_channel_message))
|
||||
result = await service.get_channels(
|
||||
target,
|
||||
PagerQuery(
|
||||
page_index=page,
|
||||
page_size=10,
|
||||
),
|
||||
)
|
||||
await target.send_message(
|
||||
result.to_unimessage(title="订阅列表", formatter=better_channel_message)
|
||||
)
|
||||
|
||||
|
||||
cmd_list_available = on_alconna(Alconna(
|
||||
"re:(查询)?可用订阅(列表)?",
|
||||
Args["page?", int],
|
||||
))
|
||||
cmd_list_available = on_alconna(
|
||||
Alconna(
|
||||
"re:(查询)?可用订阅(列表)?",
|
||||
Args["page?", int],
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@cmd_list_available.handle()
|
||||
@ -58,13 +68,17 @@ async def _(target: DepLongTaskTarget, page: int = 1):
|
||||
page_index=page,
|
||||
page_size=10,
|
||||
).apply(sorted(POSTER_INFO_DATA.keys()))
|
||||
await target.send_message(result.to_unimessage(title="可用订阅列表", formatter=better_channel_message))
|
||||
await target.send_message(
|
||||
result.to_unimessage(title="可用订阅列表", formatter=better_channel_message)
|
||||
)
|
||||
|
||||
|
||||
cmd_unsubscribe = on_alconna(Alconna(
|
||||
"取消订阅",
|
||||
Args["channel", str],
|
||||
))
|
||||
cmd_unsubscribe = on_alconna(
|
||||
Alconna(
|
||||
"取消订阅",
|
||||
Args["channel", str],
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@cmd_unsubscribe.handle()
|
||||
@ -79,6 +93,7 @@ async def _(target: DepLongTaskTarget, channel: str):
|
||||
|
||||
driver = nonebot.get_driver()
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
async with dep_poster_service() as service:
|
||||
|
||||
@ -4,8 +4,7 @@ from nonebot.internal.adapter.event import Event
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from nonebot_plugin_apscheduler import scheduler
|
||||
|
||||
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info
|
||||
from konabot.plugins.poster.service import broadcast
|
||||
from konabot.common.subscribe import PosterInfo, register_poster_info, broadcast
|
||||
|
||||
register_poster_info(
|
||||
"二十四节气",
|
||||
@ -98,4 +97,3 @@ async def _(event: Event):
|
||||
|
||||
msg = UniMessage.text(f"现在的节气是{date.term}")
|
||||
await msg.send(event)
|
||||
|
||||
|
||||
@ -1,20 +1,23 @@
|
||||
import asyncio
|
||||
from nonebot import get_driver
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from konabot.plugins.poster.poster_info import register_poster_info, PosterInfo
|
||||
from konabot.plugins.poster.service import broadcast
|
||||
from konabot.common.subscribe import register_poster_info, PosterInfo, broadcast
|
||||
|
||||
|
||||
CHANNEL_STARTUP = "启动通知"
|
||||
|
||||
|
||||
register_poster_info(CHANNEL_STARTUP, PosterInfo(
|
||||
aliases=set(),
|
||||
description="当 Bot 重启时告知",
|
||||
))
|
||||
register_poster_info(
|
||||
CHANNEL_STARTUP,
|
||||
PosterInfo(
|
||||
aliases=set(),
|
||||
description="当 Bot 重启时告知",
|
||||
),
|
||||
)
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
# 要尽量保证接受讯息的服务存在
|
||||
@ -30,4 +33,3 @@ async def _():
|
||||
await broadcast(CHANNEL_STARTUP, UniMessage.text("此方 BOT 重启好了"))
|
||||
|
||||
asyncio.create_task(task())
|
||||
|
||||
|
||||
210
konabot/plugins/syntactic_sugar/__init__.py
Normal file
210
konabot/plugins/syntactic_sugar/__init__.py
Normal file
@ -0,0 +1,210 @@
|
||||
import copy
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
import nonebot
|
||||
from nonebot import on_command
|
||||
from nonebot.adapters import Bot, Event, Message
|
||||
from nonebot.log import logger
|
||||
from nonebot.message import handle_event
|
||||
from nonebot.params import CommandArg
|
||||
|
||||
from konabot.common.database import DatabaseManager
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
|
||||
ROOT_PATH = Path(__file__).resolve().parent
|
||||
|
||||
cmd = on_command(cmd="语法糖", aliases={"糖", "sugar"}, block=True)
|
||||
|
||||
db_manager = DatabaseManager()
|
||||
driver = nonebot.get_driver()
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def register_startup_hook():
|
||||
await init_db()
|
||||
|
||||
|
||||
@driver.on_shutdown
|
||||
async def register_shutdown_hook():
|
||||
await db_manager.close_all_connections()
|
||||
|
||||
|
||||
async def init_db():
|
||||
await db_manager.execute_by_sql_file(ROOT_PATH / "sql" / "create_table.sql")
|
||||
|
||||
table_info = await db_manager.query("PRAGMA table_info(syntactic_sugar)")
|
||||
columns = {str(row.get("name")) for row in table_info}
|
||||
if "channel_id" not in columns:
|
||||
await db_manager.execute(
|
||||
"ALTER TABLE syntactic_sugar ADD COLUMN channel_id VARCHAR(255) NOT NULL DEFAULT ''"
|
||||
)
|
||||
|
||||
await db_manager.execute("DROP INDEX IF EXISTS idx_syntactic_sugar_name_belong_to")
|
||||
await db_manager.execute(
|
||||
"CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target "
|
||||
"ON syntactic_sugar(name, channel_id, belong_to)"
|
||||
)
|
||||
|
||||
|
||||
def _extract_reply_plain_text(evt: Event) -> str:
|
||||
reply = getattr(evt, "reply", None)
|
||||
if reply is None:
|
||||
return ""
|
||||
|
||||
reply_message = getattr(reply, "message", None)
|
||||
if reply_message is None:
|
||||
return ""
|
||||
|
||||
extract_plain_text = getattr(reply_message, "extract_plain_text", None)
|
||||
if callable(extract_plain_text):
|
||||
return extract_plain_text().strip()
|
||||
return str(reply_message).strip()
|
||||
|
||||
|
||||
def _split_variables(tokens: list[str]) -> tuple[list[str], dict[str, str]]:
|
||||
positional: list[str] = []
|
||||
named: dict[str, str] = {}
|
||||
|
||||
for token in tokens:
|
||||
if "=" in token:
|
||||
key, value = token.split("=", 1)
|
||||
key = key.strip()
|
||||
if key:
|
||||
named[key] = value
|
||||
continue
|
||||
positional.append(token)
|
||||
|
||||
return positional, named
|
||||
|
||||
|
||||
def _render_template(content: str, positional: list[str], named: dict[str, str]) -> str:
|
||||
def replace(match: re.Match[str]) -> str:
|
||||
key = match.group(1).strip()
|
||||
if key.isdigit():
|
||||
idx = int(key) - 1
|
||||
if 0 <= idx < len(positional):
|
||||
return positional[idx]
|
||||
return match.group(0)
|
||||
return named.get(key, match.group(0))
|
||||
|
||||
return re.sub(r"\{([^{}]+)\}", replace, content)
|
||||
|
||||
|
||||
async def _store_sugar(name: str, content: str, belong_to: str, channel_id: str):
|
||||
await db_manager.execute_by_sql_file(
|
||||
ROOT_PATH / "sql" / "insert_sugar.sql",
|
||||
(name, content, belong_to, channel_id),
|
||||
)
|
||||
|
||||
|
||||
async def _delete_sugar(name: str, belong_to: str, channel_id: str):
|
||||
await db_manager.execute(
|
||||
"DELETE FROM syntactic_sugar WHERE name = ? AND belong_to = ? AND channel_id = ?",
|
||||
(name, belong_to, channel_id),
|
||||
)
|
||||
|
||||
|
||||
async def _find_sugar(name: str, belong_to: str, channel_id: str) -> str | None:
|
||||
rows = await db_manager.query(
|
||||
(
|
||||
"SELECT content FROM syntactic_sugar "
|
||||
"WHERE name = ? AND channel_id = ? "
|
||||
"ORDER BY CASE WHEN belong_to = ? THEN 0 ELSE 1 END, id ASC "
|
||||
"LIMIT 1"
|
||||
),
|
||||
(name, channel_id, belong_to),
|
||||
)
|
||||
if not rows:
|
||||
return None
|
||||
return rows[0].get("content")
|
||||
|
||||
|
||||
async def _reinject_command(bot: Bot, evt: Event, command_text: str) -> bool:
|
||||
depth = int(getattr(evt, "_syntactic_sugar_depth", 0))
|
||||
if depth >= 3:
|
||||
return False
|
||||
|
||||
try:
|
||||
cloned_evt = copy.deepcopy(evt)
|
||||
except Exception:
|
||||
logger.exception("语法糖克隆事件失败")
|
||||
return False
|
||||
|
||||
message = getattr(cloned_evt, "message", None)
|
||||
if message is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
msg_obj = type(message)(command_text)
|
||||
except Exception:
|
||||
msg_obj = command_text
|
||||
|
||||
setattr(cloned_evt, "message", msg_obj)
|
||||
if hasattr(cloned_evt, "original_message"):
|
||||
setattr(cloned_evt, "original_message", msg_obj)
|
||||
if hasattr(cloned_evt, "raw_message"):
|
||||
setattr(cloned_evt, "raw_message", command_text)
|
||||
|
||||
setattr(cloned_evt, "_syntactic_sugar_depth", depth + 1)
|
||||
|
||||
try:
|
||||
await handle_event(bot, cloned_evt)
|
||||
except Exception:
|
||||
logger.exception("语法糖回注事件失败")
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@cmd.handle()
|
||||
async def _(bot: Bot, evt: Event, target: DepLongTaskTarget, args: Message = CommandArg()):
|
||||
raw = args.extract_plain_text().strip()
|
||||
if not raw:
|
||||
return
|
||||
|
||||
tokens = raw.split()
|
||||
action = tokens[0]
|
||||
target_id = target.target_id
|
||||
channel_id = target.channel_id
|
||||
|
||||
if action == "存入":
|
||||
if len(tokens) < 2:
|
||||
await cmd.finish("请提供要存入的名称")
|
||||
name = tokens[1].strip()
|
||||
content = " ".join(tokens[2:]).strip()
|
||||
if not content:
|
||||
content = _extract_reply_plain_text(evt)
|
||||
if not content:
|
||||
await cmd.finish("请提供要存入的内容")
|
||||
|
||||
await _store_sugar(name, content, target_id, channel_id)
|
||||
await cmd.finish(f"糖已存入:「{name}」!")
|
||||
|
||||
if action == "删除":
|
||||
if len(tokens) < 2:
|
||||
await cmd.finish("请提供要删除的名称")
|
||||
name = tokens[1].strip()
|
||||
await _delete_sugar(name, target_id, channel_id)
|
||||
await cmd.finish(f"已删除糖:「{name}」!")
|
||||
|
||||
if action == "查看":
|
||||
if len(tokens) < 2:
|
||||
await cmd.finish("请提供要查看的名称")
|
||||
name = tokens[1].strip()
|
||||
content = await _find_sugar(name, target_id, channel_id)
|
||||
if content is None:
|
||||
await cmd.finish(f"没有糖:「{name}」")
|
||||
await cmd.finish(f"糖的内容:「{content}」")
|
||||
|
||||
|
||||
name = action
|
||||
content = await _find_sugar(name, target_id, channel_id)
|
||||
if content is None:
|
||||
await cmd.finish(f"没有糖:「{name}」")
|
||||
|
||||
positional, named = _split_variables(tokens[1:])
|
||||
rendered = _render_template(content, positional, named)
|
||||
|
||||
ok = await _reinject_command(bot, evt, rendered)
|
||||
if not ok:
|
||||
await cmd.finish(f"糖的展开结果:「{rendered}」")
|
||||
12
konabot/plugins/syntactic_sugar/sql/create_table.sql
Normal file
12
konabot/plugins/syntactic_sugar/sql/create_table.sql
Normal file
@ -0,0 +1,12 @@
|
||||
-- 创建语法糖表
|
||||
CREATE TABLE IF NOT EXISTS syntactic_sugar (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
content TEXT NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
belong_to VARCHAR(255) NOT NULL,
|
||||
channel_id VARCHAR(255) NOT NULL DEFAULT ''
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target
|
||||
ON syntactic_sugar(name, channel_id, belong_to);
|
||||
5
konabot/plugins/syntactic_sugar/sql/insert_sugar.sql
Normal file
5
konabot/plugins/syntactic_sugar/sql/insert_sugar.sql
Normal file
@ -0,0 +1,5 @@
|
||||
-- 插入语法糖,如果同一用户下名称已存在则更新内容
|
||||
INSERT INTO syntactic_sugar (name, content, belong_to, channel_id)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(name, channel_id, belong_to) DO UPDATE SET
|
||||
content = excluded.content;
|
||||
35
konabot/plugins/trpg_roll/__init__.py
Normal file
35
konabot/plugins/trpg_roll/__init__.py
Normal file
@ -0,0 +1,35 @@
|
||||
import re
|
||||
|
||||
import nonebot
|
||||
from nonebot.adapters import Event
|
||||
from nonebot_plugin_alconna import UniMessage, UniMsg
|
||||
|
||||
from konabot.common.nb import match_keyword
|
||||
from konabot.common.permsys import register_default_allow_permission, require_permission
|
||||
from konabot.plugins.trpg_roll.core import RollError, roll_expression
|
||||
|
||||
|
||||
PERMISSION_KEY = "trpg.roll"
|
||||
register_default_allow_permission(PERMISSION_KEY)
|
||||
|
||||
|
||||
matcher = nonebot.on_message(
|
||||
rule=match_keyword.match_keyword(re.compile(r"^roll(?:\s+.+)?$", re.I))
|
||||
& require_permission(PERMISSION_KEY),
|
||||
)
|
||||
|
||||
|
||||
@matcher.handle()
|
||||
async def _(event: Event, msg: UniMsg):
|
||||
text = msg.extract_plain_text().strip()
|
||||
expr = text[4:].strip()
|
||||
|
||||
if not expr:
|
||||
await UniMessage.text("用法:roll 3d6 / roll d20+5 / roll 2d8+1d4+3 / roll 4dF").send(event)
|
||||
return
|
||||
|
||||
try:
|
||||
result = roll_expression(expr)
|
||||
await UniMessage.text(result.format()).send(event)
|
||||
except RollError as e:
|
||||
await UniMessage.text(str(e)).send(event)
|
||||
143
konabot/plugins/trpg_roll/core.py
Normal file
143
konabot/plugins/trpg_roll/core.py
Normal file
@ -0,0 +1,143 @@
|
||||
import random
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
MAX_DICE_COUNT = 100
|
||||
MAX_DICE_FACES = 1000
|
||||
MAX_TERM_COUNT = 20
|
||||
MAX_TOTAL_ROLLS = 200
|
||||
MAX_EXPRESSION_LENGTH = 200
|
||||
MAX_MESSAGE_LENGTH = 1200
|
||||
|
||||
_TOKEN_RE = re.compile(r"([+-]?)(\d*d(?:%|[fF]|\d+)|\d+)")
|
||||
_DICE_RE = re.compile(r"(?i)(\d*)d(%|f|\d+)")
|
||||
|
||||
# 常见跑团表达式示例:3d6、d20+5、2d8+1d4+3、d%、4dF
|
||||
|
||||
|
||||
class RollError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class RollTermResult:
|
||||
sign: int
|
||||
source: str
|
||||
detail: str
|
||||
value: int
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class RollResult:
|
||||
expression: str
|
||||
total: int
|
||||
terms: list[RollTermResult]
|
||||
|
||||
def format(self) -> str:
|
||||
parts = [f"{term.source}={term.detail}" for term in self.terms]
|
||||
detail = " ".join(parts)
|
||||
text = f"{self.expression} = {self.total}"
|
||||
if detail:
|
||||
text += f"\n{detail}"
|
||||
if len(text) > MAX_MESSAGE_LENGTH:
|
||||
raise RollError("结果过长,请减少骰子数量或简化表达式")
|
||||
return text
|
||||
|
||||
|
||||
def _parse_single_term(raw: str, sign: int, rng: random.Random) -> RollTermResult:
|
||||
dice_match = _DICE_RE.fullmatch(raw)
|
||||
if dice_match:
|
||||
count_text, faces_text = dice_match.groups()
|
||||
count = int(count_text) if count_text else 1
|
||||
if count <= 0:
|
||||
raise RollError("骰子个数必须大于 0")
|
||||
if count > MAX_DICE_COUNT:
|
||||
raise RollError(f"单项最多只能掷 {MAX_DICE_COUNT} 个骰子")
|
||||
|
||||
if faces_text == "%":
|
||||
faces = 100
|
||||
rolls = [rng.randint(1, 100) for _ in range(count)]
|
||||
elif faces_text.lower() == "f":
|
||||
rolls = [rng.choice((-1, 0, 1)) for _ in range(count)]
|
||||
total = sum(rolls) * sign
|
||||
signed = "+" if sign > 0 else "-"
|
||||
return RollTermResult(
|
||||
sign=sign,
|
||||
source=f"{signed}{count}dF",
|
||||
detail="[" + ", ".join(f"{v:+d}" for v in rolls) + "]",
|
||||
value=total,
|
||||
)
|
||||
else:
|
||||
faces = int(faces_text)
|
||||
if faces <= 0:
|
||||
raise RollError("骰子面数必须大于 0")
|
||||
if faces > MAX_DICE_FACES:
|
||||
raise RollError(f"骰子面数不能超过 {MAX_DICE_FACES}")
|
||||
rolls = [rng.randint(1, faces) for _ in range(count)]
|
||||
|
||||
total = sum(rolls) * sign
|
||||
signed = "+" if sign > 0 else "-"
|
||||
return RollTermResult(
|
||||
sign=sign,
|
||||
source=f"{signed}{count}d{faces_text.upper()}",
|
||||
detail="[" + ", ".join(map(str, rolls)) + "]",
|
||||
value=total,
|
||||
)
|
||||
|
||||
value = int(raw) * sign
|
||||
signed = "+" if sign > 0 else "-"
|
||||
return RollTermResult(sign=sign, source=f"{signed}{raw}", detail=str(abs(value)), value=value)
|
||||
|
||||
|
||||
def roll_expression(expr: str, rng: random.Random | None = None) -> RollResult:
|
||||
expr = expr.strip().replace(" ", "")
|
||||
if not expr:
|
||||
raise RollError("请提供要掷的表达式,例如 roll 3d6 或 roll d20+5")
|
||||
if len(expr) > MAX_EXPRESSION_LENGTH:
|
||||
raise RollError("表达式太长了")
|
||||
|
||||
matches = list(_TOKEN_RE.finditer(expr))
|
||||
if not matches:
|
||||
raise RollError("无法解析表达式,请使用如 3d6、d20+5、2d8+1d4+3、4dF 这样的格式")
|
||||
|
||||
rebuilt = "".join(m.group(0) for m in matches)
|
||||
if rebuilt != expr:
|
||||
raise RollError("表达式中含有无法识别的内容")
|
||||
if len(matches) > MAX_TERM_COUNT:
|
||||
raise RollError(f"表达式项数不能超过 {MAX_TERM_COUNT}")
|
||||
|
||||
total_rolls = 0
|
||||
for m in matches:
|
||||
token = m.group(2)
|
||||
dice_match = _DICE_RE.fullmatch(token)
|
||||
if dice_match:
|
||||
count_text, faces_text = dice_match.groups()
|
||||
count = int(count_text) if count_text else 1
|
||||
if count <= 0:
|
||||
raise RollError("骰子个数必须大于 0")
|
||||
if count > MAX_DICE_COUNT:
|
||||
raise RollError(f"单项最多只能掷 {MAX_DICE_COUNT} 个骰子")
|
||||
if faces_text not in {"%", "f", "F"}:
|
||||
faces = int(faces_text)
|
||||
if faces <= 0:
|
||||
raise RollError("骰子面数必须大于 0")
|
||||
if faces > MAX_DICE_FACES:
|
||||
raise RollError(f"骰子面数不能超过 {MAX_DICE_FACES}")
|
||||
total_rolls += count
|
||||
if total_rolls > MAX_TOTAL_ROLLS:
|
||||
raise RollError(f"一次最多只能实际掷 {MAX_TOTAL_ROLLS} 个骰子")
|
||||
|
||||
rng = rng or random.Random()
|
||||
terms: list[RollTermResult] = []
|
||||
total = 0
|
||||
for idx, match in enumerate(matches):
|
||||
sign_text, raw = match.groups()
|
||||
sign = -1 if sign_text == "-" else 1
|
||||
if idx == 0 and sign_text == "":
|
||||
sign = 1
|
||||
term = _parse_single_term(raw, sign, rng)
|
||||
terms.append(term)
|
||||
total += term.value
|
||||
|
||||
return RollResult(expression=expr, total=total, terms=terms)
|
||||
@ -1,9 +1,11 @@
|
||||
import asyncio
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import cast
|
||||
import zipfile
|
||||
|
||||
from loguru import logger
|
||||
from nonebot import on_command
|
||||
@ -13,22 +15,99 @@ from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent
|
||||
from nonebot.adapters.onebot.v11.bot import Bot as OB11Bot
|
||||
from nonebot.adapters.onebot.v11.message import Message as OB11Message
|
||||
|
||||
from konabot.common.artifact import ArtifactDepends, ensure_artifact, register_artifacts
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.path import TMP_PATH
|
||||
from konabot.common.path import BINARY_PATH, TMP_PATH
|
||||
|
||||
|
||||
arti_typst_linux = ArtifactDepends(
|
||||
url="https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-unknown-linux-musl.tar.xz",
|
||||
sha256="a6044cbad2a954deb921167e257e120ac0a16b20339ec01121194ff9d394996d",
|
||||
target=BINARY_PATH / "typst.tar.xz",
|
||||
required_os="Linux",
|
||||
required_arch="x86_64",
|
||||
)
|
||||
arti_typst_windows = ArtifactDepends(
|
||||
url="https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-pc-windows-msvc.zip",
|
||||
sha256="51353994ac83218c3497052e89b2c432c53b9d4439cdc1b361e2ea4798ebfc13",
|
||||
target=BINARY_PATH / "typst.zip",
|
||||
required_os="Windows",
|
||||
required_arch="AMD64",
|
||||
)
|
||||
|
||||
|
||||
bin_path: Path | None = None
|
||||
|
||||
|
||||
@arti_typst_linux.on_finished
|
||||
async def _(downloaded: bool):
|
||||
global bin_path
|
||||
|
||||
tar_path = arti_typst_linux.target
|
||||
bin_path = BINARY_PATH / "typst"
|
||||
|
||||
if downloaded or not bin_path.exists():
|
||||
bin_path.unlink(missing_ok=True)
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
"tar",
|
||||
"-xvf",
|
||||
tar_path,
|
||||
"--strip-components=1",
|
||||
"-C",
|
||||
BINARY_PATH,
|
||||
"typst-x86_64-unknown-linux-musl/typst",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
stdout, stderr = await process.communicate()
|
||||
if process.returncode != 0 or not bin_path.exists():
|
||||
logger.warning(
|
||||
"似乎没有成功解压 Typst 二进制文件,检查一下吧! "
|
||||
f"stdout={stdout} stderr={stderr}"
|
||||
)
|
||||
else:
|
||||
os.chmod(bin_path, 0o755)
|
||||
|
||||
|
||||
@arti_typst_windows.on_finished
|
||||
async def _(downloaded: bool):
|
||||
global bin_path
|
||||
zip_path = arti_typst_windows.target
|
||||
bin_path = BINARY_PATH / "typst.exe"
|
||||
|
||||
if downloaded or not bin_path.exists():
|
||||
bin_path.unlink(missing_ok=True)
|
||||
with zipfile.ZipFile(zip_path, "r") as zf:
|
||||
target_name = "typst-x86_64-pc-windows-msvc/typst.exe"
|
||||
if target_name not in zf.namelist():
|
||||
logger.warning("在 Zip 压缩包里面没有找到目标文件")
|
||||
return
|
||||
zf.extract(target_name, BINARY_PATH)
|
||||
(BINARY_PATH / target_name).rename(bin_path)
|
||||
(BINARY_PATH / "typst-x86_64-pc-windows-msvc").rmdir()
|
||||
|
||||
|
||||
register_artifacts(arti_typst_linux)
|
||||
register_artifacts(arti_typst_windows)
|
||||
|
||||
|
||||
TEMPLATE_PATH = Path(__file__).parent / "template.typ"
|
||||
TEMPLATE = TEMPLATE_PATH.read_text()
|
||||
|
||||
|
||||
def render_sync(code: str) -> bytes:
|
||||
def render_sync(code: str) -> bytes | None:
|
||||
global bin_path
|
||||
|
||||
if bin_path is None:
|
||||
return
|
||||
|
||||
with TemporaryDirectory(dir=TMP_PATH) as tmpdirname:
|
||||
temp_dir = Path(tmpdirname).resolve()
|
||||
temp_typ = temp_dir / "page.typ"
|
||||
temp_typ.write_text(TEMPLATE + "\n\n" + code)
|
||||
|
||||
cmd = [
|
||||
"typst",
|
||||
bin_path,
|
||||
"compile",
|
||||
temp_typ.name,
|
||||
"--format",
|
||||
@ -61,7 +140,7 @@ def render_sync(code: str) -> bytes:
|
||||
return result_png.read_bytes()
|
||||
|
||||
|
||||
async def render(code: str) -> bytes:
|
||||
async def render(code: str) -> bytes | None:
|
||||
task = asyncio.to_thread(lambda: render_sync(code))
|
||||
return await task
|
||||
|
||||
@ -70,7 +149,21 @@ cmd = on_command("typst")
|
||||
|
||||
|
||||
@cmd.handle()
|
||||
async def _(evt: Event, bot: Bot, msg: UniMsg, target: DepLongTaskTarget):
|
||||
async def _(
|
||||
evt: Event,
|
||||
bot: Bot,
|
||||
msg: UniMsg,
|
||||
target: DepLongTaskTarget,
|
||||
):
|
||||
global bin_path
|
||||
|
||||
# 对于本地机器,一般不会在应用启动时自动下载,这里再保证存在
|
||||
await ensure_artifact(arti_typst_linux)
|
||||
await ensure_artifact(arti_typst_windows)
|
||||
if bin_path is None or not bin_path.exists():
|
||||
logger.warning("当前环境不存在 Typst,但仍然调用了")
|
||||
return
|
||||
|
||||
typst_code = ""
|
||||
if isinstance(evt, OB11MessageEvent):
|
||||
if evt.reply is not None:
|
||||
@ -92,6 +185,8 @@ async def _(evt: Event, bot: Bot, msg: UniMsg, target: DepLongTaskTarget):
|
||||
|
||||
try:
|
||||
res = await render(typst_code)
|
||||
if res is None:
|
||||
raise FileNotFoundError("没有渲染出来内容")
|
||||
except FileNotFoundError as e:
|
||||
await target.send_message("渲染出错:内部错误")
|
||||
raise e from e
|
||||
|
||||
115
konabot/plugins/wolfx_eew.py
Normal file
115
konabot/plugins/wolfx_eew.py
Normal file
@ -0,0 +1,115 @@
|
||||
import datetime
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from konabot.common.apis.wolfx import CencEewReport, CencEqReport, wolfx_api
|
||||
from konabot.common.subscribe import PosterInfo, broadcast, register_poster_info
|
||||
|
||||
|
||||
provinces_short = [
|
||||
"北京",
|
||||
"天津",
|
||||
"河北",
|
||||
"山西",
|
||||
"内蒙古",
|
||||
"辽宁",
|
||||
"吉林",
|
||||
"黑龙江",
|
||||
"上海",
|
||||
"江苏",
|
||||
"浙江",
|
||||
"安徽",
|
||||
"福建",
|
||||
"江西",
|
||||
"山东",
|
||||
"河南",
|
||||
"湖北",
|
||||
"湖南",
|
||||
"广东",
|
||||
"广西",
|
||||
"海南",
|
||||
"重庆",
|
||||
"四川",
|
||||
"贵州",
|
||||
"云南",
|
||||
"西藏",
|
||||
"陕西",
|
||||
"甘肃",
|
||||
"青海",
|
||||
"宁夏",
|
||||
"新疆",
|
||||
"香港",
|
||||
"澳门",
|
||||
"台湾",
|
||||
]
|
||||
|
||||
|
||||
register_poster_info(
|
||||
"中国地震台网地震速报",
|
||||
PosterInfo(
|
||||
aliases={
|
||||
"地震速报",
|
||||
"地震预警",
|
||||
},
|
||||
description="来自中国地震台网的地震速报",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
CENC_EEW_DISABLED = True
|
||||
|
||||
|
||||
if not CENC_EEW_DISABLED:
|
||||
|
||||
@wolfx_api.cenc_eew.append
|
||||
async def broadcast_eew(report: CencEewReport):
|
||||
# 这个好像没那么准确...
|
||||
is_cn = any(report.HypoCenter.startswith(prefix) for prefix in provinces_short)
|
||||
if (is_cn and report.Magnitude >= 4.2) or (
|
||||
(not is_cn) and report.Magnitude >= 7.0
|
||||
):
|
||||
# 这是中国地震台网网站上,会默认展示的地震信息的等级
|
||||
origin_time_dt = datetime.datetime.strptime(
|
||||
report.OriginTime, "%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
origin_time_str = (
|
||||
f"{origin_time_dt.month}月"
|
||||
f"{origin_time_dt.day}日"
|
||||
f"{origin_time_dt.hour}时"
|
||||
f"{origin_time_dt.minute}分"
|
||||
)
|
||||
|
||||
# vvv 下面这个其实不准确
|
||||
eid_in_link = report.EventID.split(".")[0]
|
||||
link = f"https://www.cenc.ac.cn/earthquake-manage-publish-web/product-list/{eid_in_link}/summarize"
|
||||
|
||||
msg = UniMessage.text(
|
||||
"据中国地震台网中心 (https://www.cenc.ac.cn/) 报道,"
|
||||
f"北京时间{origin_time_str},"
|
||||
f"{report.HypoCenter}发生{report.Magnitude:.1f}级地震。"
|
||||
f"震源位于 {report.Longitude}° {report.Latitude}°,深度 {report.Depth}km。\n\n"
|
||||
f"详细信息请见 {link}"
|
||||
)
|
||||
await broadcast("中国地震台网地震速报", msg)
|
||||
|
||||
|
||||
@wolfx_api.cenc_eqlist.append
|
||||
async def broadcast_cenc_eqlist(report: CencEqReport):
|
||||
is_cn = any(report.location.startswith(prefix) for prefix in provinces_short)
|
||||
if (is_cn and float(report.magnitude) >= 4.2) or (
|
||||
(not is_cn) and float(report.magnitude) >= 7.0
|
||||
):
|
||||
origin_time_dt = datetime.datetime.strptime(report.time, "%Y-%m-%d %H:%M:%S")
|
||||
origin_time_str = (
|
||||
f"{origin_time_dt.month}月"
|
||||
f"{origin_time_dt.day}日"
|
||||
f"{origin_time_dt.hour}时"
|
||||
f"{origin_time_dt.minute}分"
|
||||
)
|
||||
|
||||
msg = UniMessage.text(
|
||||
"据中国地震台网中心 (https://www.cenc.ac.cn/) 消息,"
|
||||
f"北京时间{origin_time_str},"
|
||||
f"{report.location}发生{report.magnitude}级地震。"
|
||||
f"震源位于 {report.longtitude}° {report.latitude}°,深度 {report.depth}km。\n\n"
|
||||
f"数据来源于 Wolfx OpenAPI,事件 ID: {report.EventID}"
|
||||
)
|
||||
await broadcast("中国地震台网地震速报", msg)
|
||||
25
poetry.lock
generated
25
poetry.lock
generated
@ -4067,6 +4067,29 @@ type = "legacy"
|
||||
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
|
||||
reference = "mirrors"
|
||||
|
||||
[[package]]
|
||||
name = "pytest-mock"
|
||||
version = "3.15.1"
|
||||
description = "Thin-wrapper around the mock package for easier use with pytest"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "pytest_mock-3.15.1-py3-none-any.whl", hash = "sha256:0a25e2eb88fe5168d535041d09a4529a188176ae608a6d249ee65abc0949630d"},
|
||||
{file = "pytest_mock-3.15.1.tar.gz", hash = "sha256:1849a238f6f396da19762269de72cb1814ab44416fa73a8686deac10b0d87a0f"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
pytest = ">=6.2.5"
|
||||
|
||||
[package.extras]
|
||||
dev = ["pre-commit", "pytest-asyncio", "tox"]
|
||||
|
||||
[package.source]
|
||||
type = "legacy"
|
||||
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
|
||||
reference = "mirrors"
|
||||
|
||||
[[package]]
|
||||
name = "python-dotenv"
|
||||
version = "1.2.2"
|
||||
@ -5311,4 +5334,4 @@ reference = "mirrors"
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.12,<4.0"
|
||||
content-hash = "f2d5345d93636e19e49852af636d598c88394aaef4020f383402394b58452e3d"
|
||||
content-hash = "23d2eadd1c36d017ff77934bd02b56e37d26005e6a99793b623c01162b90d67d"
|
||||
|
||||
@ -37,6 +37,8 @@ dependencies = [
|
||||
"pytest (>=8.0.0,<9.0.0)",
|
||||
"nonebug (>=0.4.3,<0.5.0)",
|
||||
"pytest-cov (>=7.0.0,<8.0.0)",
|
||||
"aiosignal (>=1.4.0,<2.0.0)",
|
||||
"pytest-mock (>=3.15.1,<4.0.0)",
|
||||
]
|
||||
|
||||
[tool.poetry]
|
||||
|
||||
@ -12,8 +12,22 @@ def filter(change: Change, path: str) -> bool:
|
||||
return False
|
||||
if Path(path).absolute().is_relative_to((base / ".git").absolute()):
|
||||
return False
|
||||
if Path(path).absolute().is_relative_to((base / "assets" / "oracle" / "image").absolute()):
|
||||
if (
|
||||
Path(path)
|
||||
.absolute()
|
||||
.is_relative_to((base / "assets" / "oracle" / "image").absolute())
|
||||
):
|
||||
# 还要解决坏枪的这个问题
|
||||
return False
|
||||
if Path(path).absolute().is_relative_to((base / "htmlcov").absolute()):
|
||||
return False
|
||||
if Path(path).absolute().is_relative_to((base / "test").absolute()):
|
||||
return False
|
||||
if Path(path).absolute().is_relative_to((base / ".pytest_cache").absolute()):
|
||||
return False
|
||||
if Path(path).absolute().is_relative_to((base / ".ruff_cache").absolute()):
|
||||
return False
|
||||
if path.endswith(".coverage"):
|
||||
return False
|
||||
print(path)
|
||||
return True
|
||||
|
||||
78
tests/services/test_wolfx_api.py
Normal file
78
tests/services/test_wolfx_api.py
Normal file
@ -0,0 +1,78 @@
|
||||
import json
|
||||
from unittest.mock import AsyncMock
|
||||
import pytest
|
||||
|
||||
from konabot.common.apis.wolfx import CencEewReport, WolfxAPIService, WolfxWebSocket
|
||||
|
||||
|
||||
obj_example = {
|
||||
"ID": "bacby4yab1oyb",
|
||||
"EventID": "202603100805.0001",
|
||||
"ReportTime": "2026-03-10 08:05:29",
|
||||
"ReportNum": 1,
|
||||
"OriginTime": "2026-03-10 08:05:29",
|
||||
"HypoCenter": "新疆昌吉州呼图壁县",
|
||||
"Latitude": 43.687,
|
||||
"Longitude": 86.427,
|
||||
"Magnitude": 4.0,
|
||||
"Depth": 14,
|
||||
"MaxIntensity": 5,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wolfx_websocket_handle():
|
||||
ws = WolfxWebSocket("")
|
||||
|
||||
mock_callback = AsyncMock()
|
||||
ws.signal.append(mock_callback)
|
||||
ws.signal.freeze()
|
||||
|
||||
obj1 = {
|
||||
"type": "heartbeat",
|
||||
"ver": 18,
|
||||
"id": "a69edf6436c5b605",
|
||||
"timestamp": 1773111106701,
|
||||
}
|
||||
data1 = json.dumps(obj1).encode()
|
||||
await ws.handle(data1)
|
||||
mock_callback.assert_not_called()
|
||||
mock_callback.reset_mock()
|
||||
|
||||
obj2 = obj_example
|
||||
data2 = json.dumps(obj2).encode()
|
||||
await ws.handle(data2)
|
||||
mock_callback.assert_called_once_with(data2)
|
||||
mock_callback.reset_mock()
|
||||
|
||||
data3 = b"what the f"
|
||||
await ws.handle(data3)
|
||||
mock_callback.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wolfx_bind_pydantic():
|
||||
sv = WolfxAPIService()
|
||||
called: list[CencEewReport] = []
|
||||
|
||||
@sv.cenc_eew.append
|
||||
async def _(data: CencEewReport):
|
||||
called.append(data)
|
||||
|
||||
sv._cenc_eew_ws.signal.freeze()
|
||||
sv.cenc_eew.freeze()
|
||||
|
||||
data = json.dumps(obj_example).encode()
|
||||
await sv._cenc_eew_ws.signal.send(data)
|
||||
|
||||
assert len(called) == 1
|
||||
data = called[0]
|
||||
|
||||
assert data.HypoCenter == obj_example["HypoCenter"]
|
||||
assert data.EventID == obj_example["EventID"]
|
||||
|
||||
# Don't panic when the object is invalid
|
||||
data = json.dumps({"type": "给"}).encode()
|
||||
await sv._cenc_eew_ws.signal.send(data)
|
||||
|
||||
assert len(called) == 1
|
||||
88
tests/test_fx_process.py
Normal file
88
tests/test_fx_process.py
Normal file
@ -0,0 +1,88 @@
|
||||
from importlib.util import module_from_spec, spec_from_file_location
|
||||
from pathlib import Path
|
||||
|
||||
import nonebot
|
||||
from PIL import Image
|
||||
|
||||
|
||||
nonebot.init()
|
||||
|
||||
MODULE_PATH = Path(__file__).resolve().parents[1] / "konabot/plugins/fx_process/fx_handle.py"
|
||||
SPEC = spec_from_file_location("test_fx_handle_module", MODULE_PATH)
|
||||
assert SPEC is not None and SPEC.loader is not None
|
||||
fx_handle = module_from_spec(SPEC)
|
||||
SPEC.loader.exec_module(fx_handle)
|
||||
ImageFilterImplement = fx_handle.ImageFilterImplement
|
||||
|
||||
INIT_MODULE_PATH = Path(__file__).resolve().parents[1] / "konabot/plugins/fx_process/__init__.py"
|
||||
INIT_SPEC = spec_from_file_location("test_fx_init_module", INIT_MODULE_PATH)
|
||||
assert INIT_SPEC is not None and INIT_SPEC.loader is not None
|
||||
fx_init = module_from_spec(INIT_SPEC)
|
||||
INIT_SPEC.loader.exec_module(fx_init)
|
||||
prase_input_args = fx_init.prase_input_args
|
||||
|
||||
|
||||
def test_apply_jpeg_damage_keeps_size_and_rgba_mode():
|
||||
image = Image.new("RGBA", (32, 24), (255, 0, 0, 128))
|
||||
|
||||
result = ImageFilterImplement.apply_jpeg_damage(image, 5)
|
||||
|
||||
assert result.size == image.size
|
||||
assert result.mode == "RGBA"
|
||||
assert result.getchannel("A").getextrema() == (128, 128)
|
||||
|
||||
|
||||
def test_apply_jpeg_damage_clamps_quality_range():
|
||||
image = Image.new("RGB", (16, 16), (123, 222, 111))
|
||||
|
||||
low = ImageFilterImplement.apply_jpeg_damage(image, -10)
|
||||
high = ImageFilterImplement.apply_jpeg_damage(image, 999)
|
||||
|
||||
assert low.size == image.size
|
||||
assert high.size == image.size
|
||||
assert low.mode == "RGBA"
|
||||
assert high.mode == "RGBA"
|
||||
|
||||
|
||||
def test_apply_resize_clamps_small_result_to_at_least_one_pixel():
|
||||
image = Image.new("RGBA", (10, 10), (255, 0, 0, 255))
|
||||
|
||||
result = ImageFilterImplement.apply_resize(image, 0.01)
|
||||
|
||||
assert result.size == (1, 1)
|
||||
|
||||
|
||||
def test_apply_resize_negative_x_with_positive_y_only_mirrors_horizontally():
|
||||
image = Image.new("RGBA", (2, 1))
|
||||
image.putpixel((0, 0), (255, 0, 0, 255))
|
||||
image.putpixel((1, 0), (0, 0, 255, 255))
|
||||
|
||||
result = ImageFilterImplement.apply_resize(image, -1, 1)
|
||||
|
||||
assert result.size == (2, 1)
|
||||
assert result.getpixel((0, 0)) == (0, 0, 255, 255)
|
||||
assert result.getpixel((1, 0)) == (255, 0, 0, 255)
|
||||
|
||||
|
||||
def test_apply_resize_negative_scale_without_y_flips_both_axes():
|
||||
image = Image.new("RGBA", (2, 2))
|
||||
image.putpixel((0, 0), (255, 0, 0, 255))
|
||||
image.putpixel((1, 0), (0, 255, 0, 255))
|
||||
image.putpixel((0, 1), (0, 0, 255, 255))
|
||||
image.putpixel((1, 1), (255, 255, 0, 255))
|
||||
|
||||
result = ImageFilterImplement.apply_resize(image, -1)
|
||||
|
||||
assert result.size == (2, 2)
|
||||
assert result.getpixel((0, 0)) == (255, 255, 0, 255)
|
||||
assert result.getpixel((1, 0)) == (0, 0, 255, 255)
|
||||
assert result.getpixel((0, 1)) == (0, 255, 0, 255)
|
||||
assert result.getpixel((1, 1)) == (255, 0, 0, 255)
|
||||
|
||||
|
||||
def test_prase_input_args_parses_resize_second_argument_as_float():
|
||||
filters = prase_input_args("缩放 2 3")
|
||||
|
||||
assert len(filters) == 1
|
||||
assert filters[0].name == "缩放"
|
||||
assert filters[0].args == [2.0, 3.0]
|
||||
40
tests/test_permsys_default_allow.py
Normal file
40
tests/test_permsys_default_allow.py
Normal file
@ -0,0 +1,40 @@
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
import pytest
|
||||
|
||||
from konabot.common.database import DatabaseManager
|
||||
from konabot.common.permsys import PermManager, register_default_allow_permission
|
||||
from konabot.common.permsys.entity import PermEntity
|
||||
from konabot.common.permsys.migrates import execute_migration
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def tempdb():
|
||||
with TemporaryDirectory() as _tempdir:
|
||||
tempdir = Path(_tempdir)
|
||||
db = DatabaseManager(tempdir / "perm.sqlite3")
|
||||
yield db
|
||||
await db.close_all_connections()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_default_allow_permission_records_key():
|
||||
register_default_allow_permission("test.default.allow")
|
||||
|
||||
async with tempdb() as db:
|
||||
async with db.get_conn() as conn:
|
||||
await execute_migration(conn)
|
||||
|
||||
pm = PermManager(db)
|
||||
await pm.update_permission(
|
||||
PermEntity("sys", "global", "global"),
|
||||
"test.default.allow",
|
||||
True,
|
||||
)
|
||||
|
||||
assert await pm.check_has_permission(
|
||||
[PermEntity("dummy", "user", "1"), PermEntity("sys", "global", "global")],
|
||||
"test.default.allow.sub",
|
||||
)
|
||||
75
tests/test_textfx_runtime_limits.py
Normal file
75
tests/test_textfx_runtime_limits.py
Normal file
@ -0,0 +1,75 @@
|
||||
import nonebot
|
||||
|
||||
nonebot.init()
|
||||
|
||||
import asyncio
|
||||
import pytest
|
||||
from konabot.plugins.handle_text.__init__ import (
|
||||
_get_textfx_user_key,
|
||||
_textfx_running_users,
|
||||
TEXTFX_MAX_RUNTIME_SECONDS,
|
||||
)
|
||||
from konabot.plugins.handle_text.base import PipelineRunner
|
||||
|
||||
|
||||
class DummyEvent:
|
||||
def __init__(self, self_id=None, user_id=None, group_id=None, session_id=None):
|
||||
self.self_id = self_id
|
||||
self.user_id = user_id
|
||||
self.group_id = group_id
|
||||
self._session_id = session_id
|
||||
|
||||
def get_session_id(self):
|
||||
if self._session_id is None:
|
||||
raise RuntimeError('no session')
|
||||
return self._session_id
|
||||
|
||||
|
||||
def test_textfx_user_key_group():
|
||||
evt = DummyEvent(self_id='123', user_id='456', group_id='789')
|
||||
assert _get_textfx_user_key(evt) == '123:789:456'
|
||||
|
||||
|
||||
def test_textfx_user_key_private():
|
||||
evt = DummyEvent(self_id='123', user_id='456')
|
||||
assert _get_textfx_user_key(evt) == '123:private:456'
|
||||
|
||||
|
||||
def test_textfx_user_key_session_fallback():
|
||||
evt = DummyEvent(session_id='console:alice')
|
||||
assert _get_textfx_user_key(evt) == 'session:console:alice'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_textfx_timeout_limit():
|
||||
"""测试脚本执行超时限制"""
|
||||
runner = PipelineRunner.get_runner()
|
||||
|
||||
# 创建一个会超时的脚本(while true 会触发迭代限制,但我们用 sleep 模拟长时间运行)
|
||||
# 由于实际超时是 60 秒,我们不能真的等那么久,所以这个测试验证超时机制存在
|
||||
script = "echo start"
|
||||
parsed = runner.parse_pipeline(script)
|
||||
assert not isinstance(parsed, str), "脚本解析应该成功"
|
||||
|
||||
# 验证 TEXTFX_MAX_RUNTIME_SECONDS 常量存在且合理
|
||||
assert TEXTFX_MAX_RUNTIME_SECONDS == 60
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_textfx_concurrent_limit():
|
||||
"""测试同一用户并发执行限制"""
|
||||
user_key = "test:group:user123"
|
||||
|
||||
# 清理可能的残留状态
|
||||
_textfx_running_users.discard(user_key)
|
||||
|
||||
# 模拟第一个脚本正在运行
|
||||
assert user_key not in _textfx_running_users
|
||||
_textfx_running_users.add(user_key)
|
||||
|
||||
# 验证用户已被标记为运行中
|
||||
assert user_key in _textfx_running_users
|
||||
|
||||
# 清理
|
||||
_textfx_running_users.discard(user_key)
|
||||
assert user_key not in _textfx_running_users
|
||||
225
tests/test_textfx_shell.py
Normal file
225
tests/test_textfx_shell.py
Normal file
@ -0,0 +1,225 @@
|
||||
import pytest
|
||||
import nonebot
|
||||
|
||||
nonebot.init()
|
||||
|
||||
from konabot.plugins.handle_text.base import IfNode, PipelineRunner, TextHandlerEnvironment, WhileNode
|
||||
from konabot.plugins.handle_text.handlers.encoding_handlers import THReverse
|
||||
from konabot.plugins.handle_text.handlers.unix_handlers import (
|
||||
THCat,
|
||||
THEcho,
|
||||
THFalse,
|
||||
THRm,
|
||||
THTest,
|
||||
THTrue,
|
||||
)
|
||||
from konabot.plugins.handle_text.handlers.whitespace_handlers import THTrim
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def runner() -> PipelineRunner:
|
||||
runner = PipelineRunner()
|
||||
runner.register(THEcho())
|
||||
runner.register(THCat())
|
||||
runner.register(THRm())
|
||||
runner.register(THTrue())
|
||||
runner.register(THFalse())
|
||||
runner.register(THTest())
|
||||
runner.register(THReverse())
|
||||
runner.register(THTrim())
|
||||
return runner
|
||||
|
||||
|
||||
def test_parse_pipeline_shell_ops(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo hello | reverse && test a = a || echo no; echo done > out')
|
||||
assert not isinstance(parsed, str)
|
||||
assert len(parsed.statements) == 2
|
||||
first = parsed.statements[0]
|
||||
second = parsed.statements[1]
|
||||
assert not isinstance(first, IfNode)
|
||||
assert not isinstance(first, WhileNode)
|
||||
assert not isinstance(second, IfNode)
|
||||
assert not isinstance(second, WhileNode)
|
||||
assert len(first.chains) == 3
|
||||
assert first.chains[0].pipeline.commands[0].name == 'echo'
|
||||
assert first.chains[0].pipeline.commands[1].name == 'reverse'
|
||||
assert second.chains[0].pipeline.commands[0].redirects[0].target == 'out'
|
||||
|
||||
|
||||
def test_parse_if_statement(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('if test a = a; then echo yes; else echo no; fi')
|
||||
assert not isinstance(parsed, str)
|
||||
assert len(parsed.statements) == 1
|
||||
stmt = parsed.statements[0]
|
||||
assert isinstance(stmt, IfNode)
|
||||
assert stmt.else_body is not None
|
||||
assert len(stmt.then_body.statements) == 1
|
||||
|
||||
|
||||
def test_parse_while_statement(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while false; do echo yes; done')
|
||||
assert not isinstance(parsed, str)
|
||||
assert len(parsed.statements) == 1
|
||||
stmt = parsed.statements[0]
|
||||
assert isinstance(stmt, WhileNode)
|
||||
assert len(stmt.body.statements) == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pipeline_pipe(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo hello | reverse')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert len(results) == 1
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream == 'olleh'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_redirect_and_cat(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo hello > a; cat a')
|
||||
assert not isinstance(parsed, str)
|
||||
env = TextHandlerEnvironment(False)
|
||||
results = await runner.run_pipeline(parsed, None, env)
|
||||
assert env.buffers['a'] == 'hello'
|
||||
assert results[-1].ostream == 'hello'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_append_redirect(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo hello > a; echo world >> a; cat a')
|
||||
assert not isinstance(parsed, str)
|
||||
env = TextHandlerEnvironment(False)
|
||||
results = await runner.run_pipeline(parsed, None, env)
|
||||
assert env.buffers['a'] == 'helloworld'
|
||||
assert results[-1].ostream == 'helloworld'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_and_or_short_circuit(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('test a = b && echo bad || echo ok')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert len(results) == 1
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream == 'ok'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_test_bracket_alias(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('[ 2 -gt 1 ] && echo yes')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream == 'yes'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_test_string_ops(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('test -n abc && echo yes; test -z abc || echo no')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert [r.ostream for r in results] == ['yes', 'no']
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_quote_and_trim(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo " hello world " | trim')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].ostream == 'hello world'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_if_then_else(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('if test a = b; then echo yes; else echo no; fi')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream == 'no'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_if_then_without_else(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('if test a = a; then echo yes; fi')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].ostream == 'yes'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_nested_if(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline(
|
||||
'if test a = a; then if test b = c; then echo x; else echo y; fi; else echo z; fi'
|
||||
)
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].ostream == 'y'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_negate_pipeline(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('! test a = b && echo ok')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].ostream == 'ok'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_true_false(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('true && echo yes; false || echo no')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert [r.ostream for r in results] == ['yes', 'no']
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_while_false_noop(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while false; do echo yes; done')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_while_limit_guard(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while true; do echo yes; done')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 2
|
||||
assert 'while 循环超过最大迭代次数限制' in (results[0].ostream or '')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_while_with_immediate_break_condition(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while ! false; do false; done')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_while_body_can_use_if(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while ! false; do if true; then false; fi; done')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_echo_empty_string(runner: PipelineRunner):
|
||||
"""测试 echo 空字符串"""
|
||||
# 双引号空字符串
|
||||
parsed = runner.parse_pipeline('echo ""')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream == ''
|
||||
|
||||
# 单引号空字符串
|
||||
parsed2 = runner.parse_pipeline("echo ''")
|
||||
assert not isinstance(parsed2, str)
|
||||
results2 = await runner.run_pipeline(parsed2, None, TextHandlerEnvironment(False))
|
||||
assert results2[0].code == 0
|
||||
assert results2[0].ostream == ''
|
||||
66
tests/test_trpg_roll.py
Normal file
66
tests/test_trpg_roll.py
Normal file
@ -0,0 +1,66 @@
|
||||
import random
|
||||
|
||||
import pytest
|
||||
|
||||
from konabot.plugins.trpg_roll.core import RollError, roll_expression
|
||||
|
||||
|
||||
class FakeRandom:
|
||||
def __init__(self, randint_values: list[int] | None = None, choice_values: list[int] | None = None):
|
||||
self._randint_values = list(randint_values or [])
|
||||
self._choice_values = list(choice_values or [])
|
||||
|
||||
def randint(self, _a: int, _b: int) -> int:
|
||||
assert self._randint_values
|
||||
return self._randint_values.pop(0)
|
||||
|
||||
def choice(self, _seq):
|
||||
assert self._choice_values
|
||||
return self._choice_values.pop(0)
|
||||
|
||||
|
||||
def test_roll_expression_basic():
|
||||
rng = FakeRandom(randint_values=[2, 4, 5])
|
||||
result = roll_expression("3d6", rng=rng)
|
||||
|
||||
assert result.total == 11
|
||||
assert result.format() == "3d6 = 11\n+3d6=[2, 4, 5]"
|
||||
|
||||
|
||||
def test_roll_expression_multiple_terms():
|
||||
rng = FakeRandom(randint_values=[14, 3, 1])
|
||||
result = roll_expression("d20+1d4-2", rng=rng)
|
||||
|
||||
assert result.total == 15
|
||||
assert result.format() == "d20+1d4-2 = 15\n+1d20=[14] +1d4=[3] -2=2"
|
||||
|
||||
|
||||
def test_roll_expression_df():
|
||||
rng = FakeRandom(choice_values=[-1, 0, 1, 1])
|
||||
result = roll_expression("4dF", rng=rng)
|
||||
|
||||
assert result.total == 1
|
||||
assert result.format() == "4dF = 1\n+4dF=[-1, +0, +1, +1]"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("expr", "message"),
|
||||
[
|
||||
("", "请提供要掷的表达式"),
|
||||
("abc", "无法解析表达式"),
|
||||
("1d0", "骰子面数必须大于 0"),
|
||||
("0d6", "骰子个数必须大于 0"),
|
||||
("101d6", "单项最多只能掷 100 个骰子"),
|
||||
("1d1001", "骰子面数不能超过 1000"),
|
||||
("201d1", "单项最多只能掷 100 个骰子"),
|
||||
("1d6*2", "表达式中含有无法识别的内容"),
|
||||
],
|
||||
)
|
||||
def test_roll_expression_invalid(expr: str, message: str):
|
||||
with pytest.raises(RollError, match=message):
|
||||
roll_expression(expr, rng=random.Random(0))
|
||||
|
||||
|
||||
def test_roll_expression_total_roll_limit():
|
||||
with pytest.raises(RollError, match="一次最多只能实际掷 200 个骰子"):
|
||||
roll_expression("100d6+100d6+1d6", rng=random.Random(0))
|
||||
Reference in New Issue
Block a user