Compare commits

...

12 Commits

Author SHA1 Message Date
8f5061ba41 wolfx api
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-10 11:39:41 +08:00
b3c3c77f3c 添加 Ignore 2026-03-10 11:16:23 +08:00
6a84ce2cd8 提供订阅模块的文档
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-09 14:56:31 +08:00
392c699b33 移动 poster 模块到 common
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-09 14:40:27 +08:00
72e21cd9aa 添加多字符喵对一些符号的响应 2026-03-09 13:46:56 +08:00
f3389ff2b9 添加服务器管理相关,以及 cronjob
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-08 03:34:14 +08:00
e59d3c2e4b 哎哟喂这个文件怎么没交
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-08 00:40:11 +08:00
31d19b7ec0 我没辙了直接把测试打包进去吧
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-07 18:41:59 +08:00
c2f677911d 添加一些权限目标
Some checks failed
continuous-integration/drone/push Build is failing
2026-03-07 18:36:51 +08:00
f5b81319f8 konaph 接入权限系统
Some checks failed
continuous-integration/drone/push Build is failing
2026-03-07 18:15:28 +08:00
870e2383d8 为 Drone 提供单元测试目录 2026-03-07 18:15:16 +08:00
7e8fa45f36 Merge pull request '权限系统' (#55) from feature/permsystem into master
Some checks failed
continuous-integration/drone/push Build is failing
Reviewed-on: #55
2026-03-07 17:55:27 +08:00
31 changed files with 1071 additions and 301 deletions

View File

@ -39,7 +39,7 @@ steps:
commands: commands:
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py - docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py - docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov-report term-missing:skip-covered - docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov=./konabot/ --cov-report term-missing:skip-covered
- name: 发送构建结果到 ntfy - name: 发送构建结果到 ntfy
image: parrazam/drone-ntfy image: parrazam/drone-ntfy
when: when:

9
.gitignore vendored
View File

@ -6,6 +6,10 @@
# 缓存文件 # 缓存文件
__pycache__ __pycache__
/.ruff_cache
/.pytest_cache
/.mypy_cache
/.black_cache
# 可能会偶然生成的 diff 文件 # 可能会偶然生成的 diff 文件
/*.diff /*.diff
@ -14,3 +18,8 @@ __pycache__
/.coverage /.coverage
/.coverage.db /.coverage.db
/htmlcov /htmlcov
# 对手动创建虚拟环境的人
/.venv
/venv
*.egg-info

View File

@ -61,6 +61,7 @@ COPY bot.py pyproject.toml .env.prod .env.test ./
COPY assets ./assets COPY assets ./assets
COPY scripts ./scripts COPY scripts ./scripts
COPY konabot ./konabot COPY konabot ./konabot
COPY tests ./tests
ENV PYTHONPATH=/app ENV PYTHONPATH=/app

5
bot.py
View File

@ -7,6 +7,7 @@ from nonebot.adapters.discord import Adapter as DiscordAdapter
from nonebot.adapters.minecraft import Adapter as MinecraftAdapter from nonebot.adapters.minecraft import Adapter as MinecraftAdapter
from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter
from konabot.common.appcontext import run_afterinit_functions
from konabot.common.log import init_logger from konabot.common.log import init_logger
from konabot.common.nb.exc import BotExceptionMessage from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.path import LOG_PATH from konabot.common.path import LOG_PATH
@ -56,9 +57,7 @@ def main():
nonebot.load_plugins("konabot/plugins") nonebot.load_plugins("konabot/plugins")
nonebot.load_plugin("nonebot_plugin_analysis_bilibili") nonebot.load_plugin("nonebot_plugin_analysis_bilibili")
from konabot.common import permsys run_afterinit_functions()
permsys.create_startup()
# 注册关闭钩子 # 注册关闭钩子
@driver.on_shutdown @driver.on_shutdown

37
docs/subscribe.md Normal file
View File

@ -0,0 +1,37 @@
# subscribe 模块
一套统一的接口,让用户可以订阅一些延迟或者定时消息。
```python
import asyncio
from pathlib import Path
from konabot.common.subscribe import register_poster_info, broadcast, PosterInfo
from nonebot_plugin_alconna import UniMessage
# 注册了服务信息,用户可以用「查询可用订阅」指令了解可用的订阅清单。
# 用户可以使用「订阅 某某服务通知」或者「订阅 某某服务」来订阅消息。
# 如果用户在群聊发起订阅,则会在 QQ 群订阅,不然会在私聊订阅
register_poster_info("某某服务通知", PosterInfo(
aliases={"某某服务"},
description="告诉你关于某某的最新资讯等信息",
))
async def main():
while True:
# 这里的服务 channel 名字必须填写该服务的名字,不可以是 alias
# 这会给所有订阅了该通道的用户发送「向大家发送纯文本通知」
await broadcast("某某服务通知", "向大家发送纯文本通知")
# 也可以发送 UniMessage 对象,可以构造包含图片的通知等
data = Path('image.png').read_bytes()
await broadcast(
"某某服务通知",
UniMessage.text("很遗憾告诉大家,我们倒闭了:").image(raw=data),
)
await asyncio.sleep(114.514)
```
该模块的代码请查阅 `/konabot/common/subscribe/` 下的文件。

View File

@ -0,0 +1,233 @@
"""
Wolfx 防灾免费 API
"""
import asyncio
import json
from typing import TypeVar, cast
import aiohttp
from aiosignal import Signal
from loguru import logger
from pydantic import BaseModel
import pydantic
from konabot.common.appcontext import after_init
class ScEewReport(BaseModel):
"""
四川地震局报文
"""
ID: str
"EEW 发报 ID"
EventID: str
"EEW 发报事件 ID"
ReportTime: str
"EEW 发报时间(UTC+8)"
ReportNum: int
"EEW 发报数"
OriginTime: str
"发震时间(UTC+8)"
HypoCenter: str
"震源地"
Latitude: float
"震源地纬度"
Longitude: float
"震源地经度"
Magnitude: float
"震级"
Depth: float | None
"震源深度"
MaxIntensity: float
"最大烈度"
class CencEewReport(BaseModel):
"""
中国地震台网报文
"""
ID: str
"EEW 发报 ID"
EventID: str
"EEW 发报事件 ID"
ReportTime: str
"EEW 发报时间(UTC+8)"
ReportNum: int
"EEW 发报数"
OriginTime: str
"发震时间(UTC+8)"
HypoCenter: str
"震源地"
Latitude: float
"震源地纬度"
Longitude: float
"震源地经度"
Magnitude: float
"震级"
Depth: float | None
"震源深度"
MaxIntensity: float
"最大烈度"
class WolfxWebSocket:
def __init__(self, url: str) -> None:
self.url = url
self.signal: Signal[bytes] = Signal(self)
self._running = False
self._task: asyncio.Task | None = None
self._session: aiohttp.ClientSession | None = None
self._ws: aiohttp.ClientWebSocketResponse | None = None
@property
def session(self) -> aiohttp.ClientSession: # pragma: no cover
assert self._session is not None
return self._session
async def start(self): # pragma: no cover
if self._running:
return
self._running = True
self._session = aiohttp.ClientSession()
self._task = asyncio.create_task(self._run())
self.signal.freeze()
async def stop(self): # pragma: no cover
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
if self._session:
await self._session.close()
async def _run(self): # pragma: no cover
retry_delay = 1
while self._running:
try:
async with self.session.ws_connect(self.url) as ws:
self._ws = ws
logger.info(f"Wolfx API 服务连接上了 {self.url} 的 WebSocket")
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
await self.handle(cast(str, msg.data).encode())
elif msg.type == aiohttp.WSMsgType.BINARY:
await self.handle(cast(bytes, msg.data))
elif msg.type == aiohttp.WSMsgType.CLOSED:
break
elif msg.type == aiohttp.WSMsgType.ERROR:
break
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
logger.warning("连接 WebSocket 时发生错误")
logger.exception(e)
except asyncio.CancelledError:
break
except Exception as e:
logger.error("Wolfx API 发生未知错误")
logger.exception(e)
self._ws = None
if self._running:
logger.info(f"Wolfx API 准备断线重连 {self.url}")
await asyncio.sleep(retry_delay)
retry_delay = min(retry_delay * 2, 60)
async def handle(self, data: bytes):
try:
obj = json.loads(data)
except json.JSONDecodeError as e:
logger.warning("解析 Wolfs API 时出错")
logger.exception(e)
return
if obj.get("type") == "heartbeat" or obj.get("type") == "pong":
logger.debug(f"Wolfx API 收到了来自 {self.url} 的心跳: {obj}")
else:
await self.signal.send(data)
T = TypeVar("T", bound=BaseModel)
class WolfxAPIService:
sc_eew: Signal[ScEewReport]
"四川地震局地震速报"
cenc_eew: Signal[CencEewReport]
"中国地震台网地震速报"
def __init__(self) -> None:
self.sc_eew = Signal(self)
self._sc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/sc_eew")
WolfxAPIService.bind(self.sc_eew, self._sc_eew_ws, ScEewReport)
self.cenc_eew = Signal(self)
self._cenc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/cenc_eew")
WolfxAPIService.bind(self.cenc_eew, self._cenc_eew_ws, CencEewReport)
@staticmethod
def bind(signal: Signal[T], ws: WolfxWebSocket, t: type[T]):
@ws.signal.append
async def _(data: bytes):
try:
obj = t.model_validate_json(data)
logger.info(f"接收到来自 Wolfx API 的信息:{data}")
await signal.send(obj)
except pydantic.ValidationError as e:
logger.warning(f"解析 Wolfx API 时出错 URL={ws.url}")
logger.error(e)
async def start(self): # pragma: no cover
self.cenc_eew.freeze()
self.sc_eew.freeze()
async with asyncio.TaskGroup() as task_group:
task_group.create_task(self._cenc_eew_ws.start())
task_group.create_task(self._sc_eew_ws.start())
async def stop(self): # pragma: no cover
async with asyncio.TaskGroup() as task_group:
task_group.create_task(self._cenc_eew_ws.stop())
task_group.create_task(self._sc_eew_ws.stop())
wolfx_api = WolfxAPIService()
@after_init
def init(): # pragma: no cover
import nonebot
driver = nonebot.get_driver()
@driver.on_startup
async def _():
await wolfx_api.start()
@driver.on_shutdown
async def _():
await wolfx_api.stop()

View File

@ -0,0 +1,15 @@
from typing import Any, Callable
AFTER_INIT_FUNCTION = Callable[[], Any]
_after_init_functions: list[AFTER_INIT_FUNCTION] = []
def after_init(func: AFTER_INIT_FUNCTION):
_after_init_functions.append(func)
def run_afterinit_functions(): # pragma: no cover
for f in _after_init_functions:
f()

View File

@ -4,6 +4,7 @@ from nonebot.adapters import Event
from nonebot.params import Depends from nonebot.params import Depends
from nonebot.rule import Rule from nonebot.rule import Rule
from konabot.common.appcontext import after_init
from konabot.common.database import DatabaseManager from konabot.common.database import DatabaseManager
from konabot.common.pager import PagerQuery from konabot.common.pager import PagerQuery
from konabot.common.path import DATA_PATH from konabot.common.path import DATA_PATH
@ -73,6 +74,7 @@ def perm_manager(_db: DatabaseManager | None = None) -> PermManager: # pragma:
return PermManager(_db) return PermManager(_db)
@after_init
def create_startup(): # pragma: no cover def create_startup(): # pragma: no cover
from konabot.common.nb.is_admin import cfg from konabot.common.nb.is_admin import cfg

View File

@ -0,0 +1,11 @@
"""
Subscribe 模块,用于向一些订阅的频道广播消息
"""
from .service import broadcast as broadcast
from .service import dep_poster_service as dep_poster_service
from .service import DepPosterService as DepPosterService
from .service import PosterService as PosterService
from .subscribe_info import PosterInfo as PosterInfo
from .subscribe_info import POSTER_INFO_DATA as POSTER_INFO_DATA
from .subscribe_info import register_poster_info as register_poster_info

View File

@ -6,7 +6,8 @@ from pydantic import BaseModel, ValidationError
from konabot.common.longtask import LongTaskTarget from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult from konabot.common.pager import PagerQuery, PagerResult
from konabot.common.path import DATA_PATH from konabot.common.path import DATA_PATH
from konabot.plugins.poster.repository import IPosterRepo
from .repository import IPosterRepo
class ChannelData(BaseModel): class ChannelData(BaseModel):
@ -18,9 +19,9 @@ class PosterData(BaseModel):
def is_the_same_target(target1: LongTaskTarget, target2: LongTaskTarget) -> bool: def is_the_same_target(target1: LongTaskTarget, target2: LongTaskTarget) -> bool:
if (target1.is_private_chat and not target2.is_private_chat): if target1.is_private_chat and not target2.is_private_chat:
return False return False
if (target2.is_private_chat and not target1.is_private_chat): if target2.is_private_chat and not target1.is_private_chat:
return False return False
if target1.platform != target2.platform: if target1.platform != target2.platform:
return False return False
@ -58,7 +59,9 @@ class LocalPosterRepo(IPosterRepo):
len1 = len(self.data.channels[channel].targets) len1 = len(self.data.channels[channel].targets)
return len0 != len1 return len0 != len1
async def get_subscribed_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]: async def get_subscribed_channels(
self, target: LongTaskTarget, pager: PagerQuery
) -> PagerResult[str]:
channels: list[str] = [] channels: list[str] = []
for channel_id, channel in self.data.channels.items(): for channel_id, channel in self.data.channels.items():
for t in channel.targets: for t in channel.targets:
@ -95,7 +98,9 @@ async def local_poster_data():
data = PosterData() data = PosterData()
else: else:
try: try:
data = PosterData.model_validate_json(LOCAL_POSTER_DATA_PATH.read_text()) data = PosterData.model_validate_json(
LOCAL_POSTER_DATA_PATH.read_text()
)
except ValidationError: except ValidationError:
data = PosterData() data = PosterData()
yield data yield data
@ -109,4 +114,3 @@ async def local_poster():
DepLocalPosterRepo = Annotated[LocalPosterRepo, Depends(local_poster)] DepLocalPosterRepo = Annotated[LocalPosterRepo, Depends(local_poster)]

View File

@ -4,9 +4,10 @@ from nonebot.params import Depends
from nonebot_plugin_alconna import UniMessage from nonebot_plugin_alconna import UniMessage
from konabot.common.longtask import LongTaskTarget from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult from konabot.common.pager import PagerQuery, PagerResult
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
from konabot.plugins.poster.repo_local_data import local_poster from .subscribe_info import POSTER_INFO_DATA
from konabot.plugins.poster.repository import IPosterRepo from .repo_local_data import local_poster
from .repository import IPosterRepo
class PosterService: class PosterService:
@ -27,7 +28,9 @@ class PosterService:
channel = self.parse_channel_id(channel) channel = self.parse_channel_id(channel)
return await self.repo.remove_channel_target(channel, target) return await self.repo.remove_channel_target(channel, target)
async def broadcast(self, channel: str, message: UniMessage[Any] | str) -> list[LongTaskTarget]: async def broadcast(
self, channel: str, message: UniMessage[Any] | str
) -> list[LongTaskTarget]:
channel = self.parse_channel_id(channel) channel = self.parse_channel_id(channel)
targets = await self.repo.get_channel_targets(channel) targets = await self.repo.get_channel_targets(channel)
for target in targets: for target in targets:
@ -35,7 +38,9 @@ class PosterService:
await target.send_message(message, at=False) await target.send_message(message, at=False)
return targets return targets
async def get_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]: async def get_channels(
self, target: LongTaskTarget, pager: PagerQuery
) -> PagerResult[str]:
return await self.repo.get_subscribed_channels(target, pager) return await self.repo.get_subscribed_channels(target, pager)
async def fix_data(self): async def fix_data(self):
@ -56,4 +61,3 @@ async def broadcast(channel: str, message: UniMessage[Any] | str):
DepPosterService = Annotated[PosterService, Depends(dep_poster_service)] DepPosterService = Annotated[PosterService, Depends(dep_poster_service)]

View File

@ -4,7 +4,7 @@ from dataclasses import dataclass, field
@dataclass @dataclass
class PosterInfo: class PosterInfo:
aliases: set[str] = field(default_factory=set) aliases: set[str] = field(default_factory=set)
description: str = field(default='') description: str = field(default="")
POSTER_INFO_DATA: dict[str, PosterInfo] = {} POSTER_INFO_DATA: dict[str, PosterInfo] = {}
@ -12,4 +12,3 @@ POSTER_INFO_DATA: dict[str, PosterInfo] = {}
def register_poster_info(channel: str, info: PosterInfo): def register_poster_info(channel: str, info: PosterInfo):
POSTER_INFO_DATA[channel] = info POSTER_INFO_DATA[channel] = info

View File

@ -1,12 +1,14 @@
import re import re
from nonebot import get_plugin_config, on_message from nonebot import get_plugin_config, on_message
from nonebot.rule import Rule
from nonebot_plugin_alconna import Reference, Reply, UniMsg from nonebot_plugin_alconna import Reference, Reply, UniMsg
from nonebot.adapters import Event from nonebot.adapters import Event
from nonebot.adapters.onebot.v11.event import GroupMessageEvent as OB11GroupEvent
from pydantic import BaseModel from pydantic import BaseModel
from konabot.common.permsys import require_permission
class Config(BaseModel): class Config(BaseModel):
bilifetch_enabled_groups: list[int] = [] bilifetch_enabled_groups: list[int] = []
@ -19,11 +21,7 @@ pattern = (
) )
def _rule(msg: UniMsg, evt: Event) -> bool: def _rule(msg: UniMsg) -> bool:
if isinstance(evt, OB11GroupEvent):
if evt.group_id not in config.bilifetch_enabled_groups:
return False
to_search = msg.exclude(Reply, Reference).dump(json=True) to_search = msg.exclude(Reply, Reference).dump(json=True)
to_search2 = msg.exclude(Reply, Reference).extract_plain_text() to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
if not re.search(pattern, to_search) and not re.search(pattern, to_search2): if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
@ -31,11 +29,11 @@ def _rule(msg: UniMsg, evt: Event) -> bool:
return True return True
matcher_fix = on_message(rule=_rule) matcher_fix = on_message(rule=Rule(_rule) & require_permission("bilifetch"))
@matcher_fix.handle() @matcher_fix.handle()
async def _(event: Event): async def _(event: Event):
from nonebot_plugin_analysis_bilibili import handle_analysis from nonebot_plugin_analysis_bilibili import handle_analysis
await handle_analysis(event) await handle_analysis(event)

View File

@ -70,7 +70,7 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
await target.send_message(res) await target.send_message(res)
return return
env = TextHandlerEnvironment(is_trusted=False) env = TextHandlerEnvironment(is_trusted=False, event=evt)
results = await runner.run_pipeline(res, istream or None, env) results = await runner.run_pipeline(res, istream or None, env)
# 检查是否有错误 # 检查是否有错误

View File

@ -7,11 +7,13 @@ from string import whitespace
from typing import cast from typing import cast
from loguru import logger from loguru import logger
from nonebot.adapters import Event
@dataclass @dataclass
class TextHandlerEnvironment: class TextHandlerEnvironment:
is_trusted: bool is_trusted: bool
event: Event | None = None
buffers: dict[str, str] = field(default_factory=dict) buffers: dict[str, str] = field(default_factory=dict)
@ -287,7 +289,7 @@ class PipelineRunner:
env: TextHandlerEnvironment | None = None, env: TextHandlerEnvironment | None = None,
) -> list[TextHandleResult]: ) -> list[TextHandleResult]:
if env is None: if env is None:
env = TextHandlerEnvironment(is_trusted=False, buffers={}) env = TextHandlerEnvironment(is_trusted=False, event=None, buffers={})
results: list[TextHandleResult] = [] results: list[TextHandleResult] = []

View File

@ -1,36 +1,51 @@
from typing import Any, cast from typing import Any, cast
from konabot.common.llm import get_llm from konabot.common.llm import get_llm
from konabot.plugins.handle_text.base import TextHandler, TextHandlerEnvironment, TextHandleResult from konabot.common.permsys import perm_manager
from konabot.plugins.handle_text.base import (
TextHandler,
TextHandlerEnvironment,
TextHandleResult,
)
class THQwen(TextHandler): class THQwen(TextHandler):
name = "qwen" name = "qwen"
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
pm = perm_manager()
if env.event is None or not pm.check_has_permission(env.event, "textfx.qwen"):
return TextHandleResult(
code=1,
ostream="这里暂未开启 AI 功能",
)
llm = get_llm() llm = get_llm()
messages = [] messages = []
if istream is not None: if istream is not None:
messages.append({ messages.append({"role": "user", "content": istream})
"role": "user",
"content": istream
})
if len(args) > 0: if len(args) > 0:
message = ' '.join(args) message = " ".join(args)
messages.append({ messages.append(
"role": "user", {
"content": message, "role": "user",
}) "content": message,
}
)
if len(messages) == 0: if len(messages) == 0:
return TextHandleResult( return TextHandleResult(
code=1, code=1,
ostream="使用方法qwen <提示词>", ostream="使用方法qwen <提示词>",
) )
messages = [{ messages = [
"role": "system", {
"content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答" "role": "system",
}] + messages "content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答",
}
] + messages
result = await llm.chat(cast(Any, messages)) result = await llm.chat(cast(Any, messages))
content = result.content content = result.content
if content is None: if content is None:

View File

@ -6,29 +6,34 @@ from loguru import logger
from nonebot import on_message from nonebot import on_message
import nonebot import nonebot
from nonebot.rule import to_me from nonebot.rule import to_me
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg, from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna
on_alconna)
from nonebot_plugin_apscheduler import scheduler from nonebot_plugin_apscheduler import scheduler
from konabot.common import username from konabot.common import username
from konabot.common.longtask import DepLongTaskTarget from konabot.common.longtask import DepLongTaskTarget
from konabot.common.pager import PagerQuery from konabot.common.pager import PagerQuery
from konabot.plugins.kona_ph.core.message import (get_daily_report, from konabot.plugins.kona_ph.core.message import (
get_daily_report_v2, get_daily_report,
get_puzzle_description, get_daily_report_v2,
get_submission_message) get_puzzle_description,
get_submission_message,
)
from konabot.plugins.kona_ph.core.storage import get_today_date from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE, from konabot.plugins.kona_ph.manager import (
create_admin_commands, PUZZLE_PAGE_SIZE,
puzzle_manager) create_admin_commands,
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info puzzle_manager,
from konabot.plugins.poster.service import broadcast )
from konabot.common.subscribe import PosterInfo, register_poster_info, broadcast
create_admin_commands() create_admin_commands()
register_poster_info("每日谜题", info=PosterInfo( register_poster_info(
aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"}, "每日谜题",
description="此方 BOT 每日谜题推送", info=PosterInfo(
)) aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"},
description="此方 BOT 每日谜题推送",
),
)
cmd_submit = on_message(rule=to_me()) cmd_submit = on_message(rule=to_me())
@ -44,16 +49,22 @@ async def _(msg: UniMsg, target: DepLongTaskTarget):
if isinstance(result, str): if isinstance(result, str):
await target.send_message(result) await target.send_message(result)
else: else:
await target.send_message(get_submission_message( await target.send_message(
daily_puzzle_info=result.info, get_submission_message(
submission=result.submission, daily_puzzle_info=result.info,
puzzle=result.puzzle, submission=result.submission,
)) puzzle=result.puzzle,
)
)
cmd_query = on_alconna(Alconna( cmd_query = on_alconna(
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?)|今日谜?题目?)" Alconna(
), rule=to_me()) r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?)|今日谜?题目?)"
),
rule=to_me(),
)
@cmd_query.handle() @cmd_query.handle()
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget):
@ -64,9 +75,8 @@ async def _(target: DepLongTaskTarget):
await target.send_message(get_puzzle_description(p)) await target.send_message(get_puzzle_description(p))
cmd_query_submission = on_alconna(Alconna( cmd_query_submission = on_alconna(Alconna("今日答题情况"), rule=to_me())
"今日答题情况"
), rule=to_me())
@cmd_query_submission.handle() @cmd_query_submission.handle()
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget):
@ -77,11 +87,15 @@ async def _(target: DepLongTaskTarget):
await target.send_message(get_daily_report_v2(manager, gid)) await target.send_message(get_daily_report_v2(manager, gid))
cmd_history = on_alconna(Alconna( cmd_history = on_alconna(
"re:历史(题目|谜题)", Alconna(
Args["page?", int], "re:历史(题目|谜题)",
Args["index_id?", str], Args["page?", int],
), rule=to_me()) Args["index_id?", str],
),
rule=to_me(),
)
@cmd_history.handle() @cmd_history.handle()
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1): async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
@ -105,10 +119,10 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True) puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages: if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text( return await target.send_message(
f"页数只有 1 ~ {count_pages} 啦!" UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
)) )
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
for p, d in puzzles: for p, d in puzzles:
info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]] info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]]
msg = msg.text( msg = msg.text(
@ -120,22 +134,26 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
await target.send_message(msg) await target.send_message(msg)
cmd_leadboard = on_alconna(Alconna( cmd_leadboard = on_alconna(
"re:此方(解谜|谜题)排行榜", Alconna(
Args["page?", int], "re:此方(解谜|谜题)排行榜",
)) Args["page?", int],
)
)
@cmd_leadboard.handle() @cmd_leadboard.handle()
async def _(target: DepLongTaskTarget, page: int = 1): async def _(target: DepLongTaskTarget, page: int = 1):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
result = manager.get_leadboard(PagerQuery(page, 10)) result = manager.get_leadboard(PagerQuery(page, 10))
await target.send_message(result.to_unimessage( await target.send_message(
title="此方解谜排行榜", result.to_unimessage(
formatter=lambda data: ( title="此方解谜排行榜",
f"{data[1]} 已完成 | " formatter=lambda data: (
f"{username.get_username(data[0])}" f"{data[1]} 已完成 | {username.get_username(data[0])}"
),
) )
)) )
@scheduler.scheduled_job("cron", hour="8") @scheduler.scheduled_job("cron", hour="8")
@ -155,4 +173,3 @@ async def _():
driver = nonebot.get_driver() driver = nonebot.get_driver()

View File

@ -1,50 +1,54 @@
import datetime import datetime
from math import ceil from math import ceil
from nonebot import get_plugin_config from nonebot.adapters import Event
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query, from nonebot_plugin_alconna import (
Subcommand, SubcommandResult, UniMessage, Alconna,
on_alconna) Args,
from pydantic import BaseModel Image,
Option,
Query,
Subcommand,
SubcommandResult,
UniMessage,
on_alconna,
)
from konabot.common.longtask import DepLongTaskTarget from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.exc import BotExceptionMessage from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import download_image_bytes from konabot.common.nb.extract_image import download_image_bytes
from konabot.common.permsys import DepPermManager, require_permission
from konabot.common.username import get_username from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.image import get_image_manager from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list, from konabot.plugins.kona_ph.core.message import (
get_puzzle_info_message, get_puzzle_description,
get_submission_message) get_puzzle_hint_list,
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager, get_puzzle_info_message,
get_today_date, get_submission_message,
puzzle_manager) )
from konabot.plugins.poster.service import broadcast from konabot.plugins.kona_ph.core.storage import (
Puzzle,
PuzzleHint,
PuzzleManager,
get_today_date,
puzzle_manager,
)
from konabot.common.subscribe import broadcast
PUZZLE_PAGE_SIZE = 10 PUZZLE_PAGE_SIZE = 10
class PuzzleConfig(BaseModel): async def check_puzzle(
plugin_puzzle_manager: list[str] = [] manager: PuzzleManager,
plugin_puzzle_admin: list[str] = [] perm: DepPermManager,
plugin_puzzle_playgroup: list[str] = [] raw_id: str,
event: Event,
target: DepLongTaskTarget,
config = get_plugin_config(PuzzleConfig) ) -> Puzzle:
def is_puzzle_manager(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target)
def is_puzzle_admin(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_admin
def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle:
if raw_id not in manager.puzzle_data: if raw_id not in manager.puzzle_data:
raise BotExceptionMessage("没有这个谜题") raise BotExceptionMessage("没有这个谜题")
puzzle = manager.puzzle_data[raw_id] puzzle = manager.puzzle_data[raw_id]
if is_puzzle_admin(target): if await perm.check_has_permission(event, "konaph.admin"):
return puzzle return puzzle
if target.target_id != puzzle.author_id: if target.target_id != puzzle.author_id:
raise BotExceptionMessage("你没有权限查看或编辑这个谜题") raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
@ -60,7 +64,9 @@ def create_admin_commands():
Subcommand("unready", Args["raw_id", str], dest="unready"), Subcommand("unready", Args["raw_id", str], dest="unready"),
Subcommand("info", Args["raw_id", str], dest="info"), Subcommand("info", Args["raw_id", str], dest="info"),
Subcommand("my", Args["page?", int], dest="my"), Subcommand("my", Args["page?", int], dest="my"),
Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"), Subcommand(
"all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"
),
Subcommand("pin", Args["raw_id?", str], dest="pin"), Subcommand("pin", Args["raw_id?", str], dest="pin"),
Subcommand("unpin", dest="unpin"), Subcommand("unpin", dest="unpin"),
Subcommand( Subcommand(
@ -115,11 +121,11 @@ def create_admin_commands():
dest="hint", dest="hint",
), ),
), ),
rule=is_puzzle_manager, rule=require_permission("konaph.manager"),
) )
@cmd_admin.assign("$main") @cmd_admin.assign("$main")
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget, pm: DepPermManager, event: Event):
msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n") msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
msg = msg.text("konaph create - 创建一个新的谜题\n") msg = msg.text("konaph create - 创建一个新的谜题\n")
msg = msg.text("konaph ready <id> - 准备好一道谜题\n") msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
@ -132,7 +138,7 @@ def create_admin_commands():
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n") msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n") msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
if is_puzzle_admin(target): if await pm.check_has_permission(event, "konaph.admin"):
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n") msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
msg = msg.text("konaph pin - 查看当前置顶谜题\n") msg = msg.text("konaph pin - 查看当前置顶谜题\n")
msg = msg.text("konaph pin <id> - 置顶一个谜题\n") msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
@ -145,48 +151,54 @@ def create_admin_commands():
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
puzzle = manager.admin_create_puzzle(target.target_id) puzzle = manager.admin_create_puzzle(target.target_id)
await target.send_message(UniMessage.text( await target.send_message(
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n" UniMessage.text(
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n" f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
f"- 输入 `konaph my` 查看你创建的谜题\n" f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
f"- 输入 `konaph modify` 查看更改谜题的方法" f"- 输入 `konaph my` 查看你创建的谜题\n"
)) f"- 输入 `konaph modify` 查看更改谜题的方法"
)
)
@cmd_admin.assign("ready") @cmd_admin.assign("ready")
async def _(raw_id: str, target: DepLongTaskTarget): async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if p.ready: if p.ready:
return await target.send_message(UniMessage.text( return await target.send_message(UniMessage.text("题目早就准备好啦!"))
"题目早就准备好啦!"
))
p.ready = True p.ready = True
await target.send_message(UniMessage.text( await target.send_message(
f"谜题「{p.title}」已经准备就绪!" UniMessage.text(f"谜题「{p.title}」已经准备就绪!")
)) )
@cmd_admin.assign("unready") @cmd_admin.assign("unready")
async def _(raw_id: str, target: DepLongTaskTarget): async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if not p.ready: if not p.ready:
return await target.send_message(UniMessage.text( return await target.send_message(
f"谜题「{p.title}」已经是未取消状态了!" UniMessage.text(f"谜题「{p.title}」已经是未取消状态了!")
)) )
if manager.is_puzzle_published(p.raw_id): if manager.is_puzzle_published(p.raw_id):
return await target.send_message(UniMessage.text( return await target.send_message(
"已发布的谜题不能取消准备状态!" UniMessage.text("已发布的谜题不能取消准备状态!")
)) )
p.ready = False p.ready = False
await target.send_message(UniMessage.text( await target.send_message(
f"谜题「{p.title}」已经取消准备!" UniMessage.text(f"谜题「{p.title}」已经取消准备!")
)) )
@cmd_admin.assign("info") @cmd_admin.assign("info")
async def _(raw_id: str, target: DepLongTaskTarget): async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
await target.send_message(get_puzzle_info_message(manager, p)) await target.send_message(get_puzzle_info_message(manager, p))
@cmd_admin.assign("my") @cmd_admin.assign("my")
@ -194,15 +206,15 @@ def create_admin_commands():
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
puzzles = manager.get_puzzles_of_user(target.target_id) puzzles = manager.get_puzzles_of_user(target.target_id)
if len(puzzles) == 0: if len(puzzles) == 0:
return await target.send_message(UniMessage.text( return await target.send_message(
"你没有谜题哦,使用 `konaph create` 创建一个吧!" UniMessage.text("你没有谜题哦,使用 `konaph create` 创建一个吧!")
)) )
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages: if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text( return await target.send_message(
f"页数只有 1 ~ {count_pages} 啦!" UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
)) )
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
message = UniMessage.text("==== 我的谜题 ====\n\n") message = UniMessage.text("==== 我的谜题 ====\n\n")
for p in puzzles: for p in puzzles:
message = message.text("- ") message = message.text("- ")
@ -220,11 +232,15 @@ def create_admin_commands():
await target.send_message(message) await target.send_message(message)
@cmd_admin.assign("all") @cmd_admin.assign("all")
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1): async def _(
if not is_puzzle_admin(target): target: DepLongTaskTarget,
return await target.send_message(UniMessage.text( event: Event,
"你没有权限使用该指令" perm: DepPermManager,
)) ready: Query[bool] = Query("all.ready"),
page: int = 1,
):
if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
puzzles = [*manager.puzzle_data.values()] puzzles = [*manager.puzzle_data.values()]
if ready.available: if ready.available:
@ -232,10 +248,10 @@ def create_admin_commands():
puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True) puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages: if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text( return await target.send_message(
f"页数只有 1 ~ {count_pages} 啦!" UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
)) )
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
message = UniMessage.text("==== 所有谜题 ====\n\n") message = UniMessage.text("==== 所有谜题 ====\n\n")
for p in puzzles: for p in puzzles:
message = message.text("- ") message = message.text("- ")
@ -253,32 +269,30 @@ def create_admin_commands():
await target.send_message(message) await target.send_message(message)
@cmd_admin.assign("pin") @cmd_admin.assign("pin")
async def _(target: DepLongTaskTarget, raw_id: str = ""): async def _(
if not is_puzzle_admin(target): target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str = ""
return await target.send_message(UniMessage.text( ):
"你没有权限使用该指令" if not perm.check_has_permission(event, "konaph.admin"):
)) return await target.send_message(UniMessage.text("你没有权限使用该指令"))
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
if raw_id == "": if raw_id == "":
if manager.puzzle_pinned: if manager.puzzle_pinned:
return await target.send_message(UniMessage.text( return await target.send_message(
f"被 Pin 的谜题 ID = {manager.puzzle_pinned}" UniMessage.text(f"被 Pin 的谜题 ID = {manager.puzzle_pinned}")
)) )
return await target.send_message("没有置顶谜题") return await target.send_message("没有置顶谜题")
if raw_id not in manager.unpublished_puzzles: if raw_id not in manager.unpublished_puzzles:
return await target.send_message(UniMessage.text( return await target.send_message(
"这个谜题已经发布了,或者还没准备好,或者不存在" UniMessage.text("这个谜题已经发布了,或者还没准备好,或者不存在")
)) )
manager.admin_pin_puzzle(raw_id) manager.admin_pin_puzzle(raw_id)
return await target.send_message(f"已置顶谜题 {raw_id}") return await target.send_message(f"已置顶谜题 {raw_id}")
@cmd_admin.assign("unpin") @cmd_admin.assign("unpin")
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget, event: Event, perm: DepPermManager):
if not is_puzzle_admin(target): if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text( return await target.send_message(UniMessage.text("你没有权限使用该指令"))
"你没有权限使用该指令"
))
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
manager.admin_pin_puzzle("") manager.admin_pin_puzzle("")
return await target.send_message("已取消所有置顶") return await target.send_message("已取消所有置顶")
@ -286,6 +300,8 @@ def create_admin_commands():
@cmd_admin.assign("modify") @cmd_admin.assign("modify")
async def _( async def _(
target: DepLongTaskTarget, target: DepLongTaskTarget,
event: Event,
perm: DepPermManager,
raw_id: str = "", raw_id: str = "",
title: str | None = None, title: str | None = None,
description: str | None = None, description: str | None = None,
@ -306,7 +322,7 @@ def create_admin_commands():
image_manager = get_image_manager() image_manager = get_image_manager()
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if title is not None: if title is not None:
p.title = title p.title = title
if description is not None: if description is not None:
@ -329,11 +345,14 @@ def create_admin_commands():
return await target.send_message("修改好啦!看看效果:\n\n" + info2) return await target.send_message("修改好啦!看看效果:\n\n" + info2)
@cmd_admin.assign("publish") @cmd_admin.assign("publish")
async def _(target: DepLongTaskTarget, raw_id: str | None = None): async def _(
if not is_puzzle_admin(target): target: DepLongTaskTarget,
return await target.send_message(UniMessage.text( event: Event,
"你没有权限使用该指令" perm: DepPermManager,
)) raw_id: str | None = None,
):
if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
today = get_today_date() today = get_today_date()
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
if today in manager.daily_puzzle_of_date: if today in manager.daily_puzzle_of_date:
@ -348,46 +367,64 @@ def create_admin_commands():
return await target.send_message("Ok!") return await target.send_message("Ok!")
@cmd_admin.assign("preview") @cmd_admin.assign("preview")
async def _(target: DepLongTaskTarget, raw_id: str): async def _(
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
return await target.send_message(get_puzzle_description(p)) return await target.send_message(get_puzzle_description(p))
@cmd_admin.assign("get-submits") @cmd_admin.assign("get-submits")
async def _(target: DepLongTaskTarget, raw_id: str): async def _(
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
puzzle = manager.puzzle_data.get(raw_id) puzzle = manager.puzzle_data.get(raw_id)
if puzzle is None: if puzzle is None:
return await target.send_message("没有这个谜题") return await target.send_message("没有这个谜题")
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id: if (
not perm.check_has_permission(event, "konaph.admin")
and target.target_id != puzzle.author_id
):
return await target.send_message("你没有权限预览这个谜题") return await target.send_message("你没有权限预览这个谜题")
msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n") msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n")
submits = manager.submissions.get(raw_id, {}) submits = manager.submissions.get(raw_id, {})
for uid, ls in submits.items(): for uid, ls in submits.items():
s = ', '.join((i.flag for i in ls)) s = ", ".join((i.flag for i in ls))
msg = msg.text(f"- {get_username(uid)}{s}\n") msg = msg.text(f"- {get_username(uid)}{s}\n")
return await target.send_message(msg) return await target.send_message(msg)
@cmd_admin.assign("test") @cmd_admin.assign("test")
async def _(target: DepLongTaskTarget, raw_id: str, submission: str): async def _(
target: DepLongTaskTarget,
raw_id: str,
submission: str,
event: Event,
perm: DepPermManager,
):
""" """
测试一道谜题的回答,并给出结果 测试一道谜题的回答,并给出结果
""" """
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
result = p.check_submission(submission) result = p.check_submission(submission)
msg = get_submission_message(p, result) msg = get_submission_message(p, result)
return await target.send_message("[测试提交] " + msg) return await target.send_message("[测试提交] " + msg)
@cmd_admin.assign("subcommands.hint") @cmd_admin.assign("subcommands.hint")
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")): async def _(
target: DepLongTaskTarget,
subcommands: Query[SubcommandResult] = Query("subcommands.hint"),
):
if len(subcommands.result.subcommands) > 0: if len(subcommands.result.subcommands) > 0:
return return
return await target.send_message( return await target.send_message(
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n") UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n") .text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n") .text(
"- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n"
)
.text("- konaph hint modify <id> <hint_id>\n") .text("- konaph hint modify <id> <hint_id>\n")
.text(" - --pattern <pattern>\n - 更改匹配规则\n") .text(" - --pattern <pattern>\n - 更改匹配规则\n")
.text(" - --message <message>\n - 更改提示文本\n") .text(" - --message <message>\n - 更改提示文本\n")
@ -402,9 +439,11 @@ def create_admin_commands():
raw_id: str, raw_id: str,
pattern: str, pattern: str,
message: str, message: str,
event: Event,
perm: DepPermManager,
): ):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
p.hints[p.hint_id_max + 1] = PuzzleHint( p.hints[p.hint_id_max + 1] = PuzzleHint(
pattern=pattern, pattern=pattern,
message=message, message=message,
@ -416,9 +455,11 @@ def create_admin_commands():
async def _( async def _(
target: DepLongTaskTarget, target: DepLongTaskTarget,
raw_id: str, raw_id: str,
event: Event,
perm: DepPermManager,
): ):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
await target.send_message(get_puzzle_hint_list(p)) await target.send_message(get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.modify") @cmd_admin.assign("subcommands.hint.modify")
@ -426,12 +467,14 @@ def create_admin_commands():
target: DepLongTaskTarget, target: DepLongTaskTarget,
raw_id: str, raw_id: str,
hint_id: int, hint_id: int,
event: Event,
perm: DepPermManager,
pattern: str | None = None, pattern: str | None = None,
message: str | None = None, message: str | None = None,
is_checkpoint: bool | None = None, is_checkpoint: bool | None = None,
): ):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if hint_id not in p.hints: if hint_id not in p.hints:
raise BotExceptionMessage( raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单" f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
@ -450,9 +493,11 @@ def create_admin_commands():
target: DepLongTaskTarget, target: DepLongTaskTarget,
raw_id: str, raw_id: str,
hint_id: int, hint_id: int,
event: Event,
perm: DepPermManager,
): ):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if hint_id not in p.hints: if hint_id not in p.hints:
raise BotExceptionMessage( raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单" f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
@ -460,5 +505,4 @@ def create_admin_commands():
del p.hints[hint_id] del p.hints[hint_id]
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p)) await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
return cmd_admin return cmd_admin

View File

@ -1,57 +0,0 @@
import asyncio
import mcstatus
from nonebot import on_command
from nonebot.adapters import Event
from nonebot_plugin_alconna import UniMessage
from konabot.common.nb.is_admin import is_admin
from mcstatus.responses import JavaStatusResponse
cmd = on_command("宾几人", aliases=set(("宾人数", "mcbingo")), rule=is_admin)
def parse_status(motd: str) -> str:
if "[PRE-GAME]" in motd:
return "[✨ 空闲]"
if "[IN-GAME]" in motd:
return "[🕜 游戏中]"
if "[POST-GAME]" in motd:
return "[🕜 游戏中]"
return "[✨ 开放]"
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
if isinstance(status, JavaStatusResponse):
motd = status.motd.to_plain()
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
st = parse_status(motd)
players_sample = status.players.sample or []
players_sample_suffix = ""
if len(players_sample) > 0:
player_list = [s.name for s in players_sample]
players_sample_suffix = " (" + ", ".join(player_list) + ")"
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
else:
return f"{name}: 好像没开"
@cmd.handle()
async def _(evt: Event):
servers = (
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
)
responses = await asyncio.gather(
*map(lambda s: s[0].async_status(), servers),
return_exceptions=True,
)
messages = "\n".join((
dump_server_status(n, r)
for n, r in zip(map(lambda s: s[1], servers), responses)
))
await UniMessage.text(messages).finish(evt, at_sender=False)

View File

@ -0,0 +1,131 @@
import asyncio
import datetime
from typing import Literal
import mcstatus
from nonebot import on_command
from nonebot.adapters import Event
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from mcstatus.responses import JavaStatusResponse
from nonebot_plugin_apscheduler import scheduler
from konabot.common.permsys import DepPermManager, require_permission
from konabot.plugins.minecraft_servers.simpfun_server import SimpfunServer
cmd = on_command(
"宾几人",
aliases=set(("宾人数", "mcbingo")),
rule=require_permission("minecraft.bingo.check"),
)
def parse_status(motd: str) -> str:
if "[PRE-GAME]" in motd:
return "[✨ 空闲]"
if "[IN-GAME]" in motd:
return "[🕜 游戏中]"
if "[POST-GAME]" in motd:
return "[🕜 游戏中]"
return "[✨ 开放]"
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
if isinstance(status, JavaStatusResponse):
motd = status.motd.to_plain()
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
st = parse_status(motd)
players_sample = status.players.sample or []
players_sample_suffix = ""
if len(players_sample) > 0:
player_list = [s.name for s in players_sample]
players_sample_suffix = " (" + ", ".join(player_list) + ")"
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
else:
return f"{name}: 好像没开"
@cmd.handle()
async def _(evt: Event, pm: DepPermManager):
servers = (
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
)
responses = await asyncio.gather(
*map(lambda s: s[0].async_status(), servers),
return_exceptions=True,
)
messages = "\n".join(
(
dump_server_status(n, r)
for n, r in zip(map(lambda s: s[1], servers), responses)
)
)
if await pm.check_has_permission(evt, "minecraft.bingo.manipulate"):
messages += "\n\n---\n\n你可以使用 bingoman start 开启小帕的 bingo 服,用 bingoman stop 关闭小帕的 bingo 服"
await UniMessage.text(messages).finish(evt, at_sender=False)
cmd_bingo_manipulate = on_alconna(
Alconna("bingoman", Args["action", str]),
aliases=("宾服务器", "bingo服"),
rule=require_permission("minecraft.bingo.manipulate"),
)
actions: dict[str, Literal["start", "stop", "restart", "kill"]] = {
"up": "start",
"down": "stop",
"start": "start",
"stop": "stop",
"开机": "start",
"关机": "stop",
"restart": "restart",
"kill": "kill",
"重启": "restart",
}
@cmd_bingo_manipulate.handle()
async def _(action: str, event: Event):
server = SimpfunServer.new() # 使用默认配置管理服务器
a = actions.get(action.lower().strip())
if a is None:
await UniMessage.text(f"操作 {action} 不存在").send(event, at_sender=True)
return
resp = await server.power(a)
if resp.code == 200:
await UniMessage.text("好了").send(event, at_sender=True)
else:
await UniMessage.text(f"不好:{resp}").send(event, at_sender=True)
@scheduler.scheduled_job("cron", hour="4,23")
async def _():
server = SimpfunServer.new()
today = datetime.datetime.now()
# 获取服务器当前状态,重试多次以保证不会误判服务器未开启
server_up = False
server_players = 0
for _ in range(3):
mcs = mcstatus.JavaServer("play.simpfun.cn", 11495)
try:
resp = await mcs.async_status()
server_up = True
server_players = resp.players.online
except Exception:
pass
if today.weekday() == 5 and today.hour < 12:
# 每周六开机一天,保证可以让服务器不被自动销毁
if not server_up:
await server.power("start")
else:
# 每用一个自然日都会计费,所以要赶在这一天结束之前关服
# 平时如果没人,也自动关上
if server_up and server_players == 0:
await server.power("stop")

View File

@ -0,0 +1,90 @@
from dataclasses import dataclass
import datetime
from typing import Literal
import aiohttp
from pydantic import BaseModel
class SimpfunServerConfig(BaseModel):
plugin_simpfun_api_key: str = ""
plugin_simpfun_base_url: str = "https://api.simpfun.cn"
plugin_simpfun_instance_id: int = 0
def get_config():
from nonebot import get_plugin_config
return get_plugin_config(SimpfunServerConfig)
class PowerManageResult(BaseModel):
code: int
status: bool
msg: str
class SimpfunServerDetailUtilization(BaseModel):
memory_bytes: int
cpu_absolute: float
disk_bytes: int
network_rx_bytes: int
network_tx_bytes: int
uptime: float
disk_last_check_time: datetime.datetime
class SimpfunServerDetailData(BaseModel):
id: int
name: str
is_pro: bool
status: str
"运行中的话,是 running"
is_suspended: bool
utilization: SimpfunServerDetailUtilization
class SimpfunServerDetailResp(BaseModel):
code: int
data: SimpfunServerDetailData
@dataclass
class SimpfunServer:
instance_id: int
api_key: str
base_url: str
async def power(
self, action: Literal["start", "stop", "restart", "kill"]
) -> PowerManageResult:
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
async with aiohttp.ClientSession(
headers={"Authorization": self.api_key}
) as session:
async with session.get(url, params={"action": action}) as resp:
resp.raise_for_status()
return PowerManageResult.model_validate_json(await resp.read())
async def detail(self) -> SimpfunServerDetailResp:
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
async with aiohttp.ClientSession(
headers={"Authorization": self.api_key}
) as session:
async with session.get(url) as resp:
resp.raise_for_status()
return SimpfunServerDetailResp.model_validate_json(await resp.read())
@staticmethod
def new(config: SimpfunServerConfig | None = None):
if config is None:
config = get_config()
return SimpfunServer(
instance_id=config.plugin_simpfun_instance_id,
api_key=config.plugin_simpfun_api_key,
base_url=config.plugin_simpfun_base_url,
)

View File

@ -6,6 +6,7 @@ from konabot.common.nb.match_keyword import match_keyword
evt_nya = on_message(rule=match_keyword("")) evt_nya = on_message(rule=match_keyword(""))
@evt_nya.handle() @evt_nya.handle()
async def _(): async def _():
await evt_nya.send(await UniMessage().text("").export()) await evt_nya.send(await UniMessage().text("").export())
@ -25,8 +26,9 @@ NYA_SYMBOL_MAPPING = {
"~": "~", "~": "~",
"": "", "": "",
" ": " ", " ": " ",
"\n": "\n",
} }
NYA_SYMBOL_KEEP = "—¹₁²₂³₃⁴₄⁵₅⁶₆⁷₇⁸₈⁹₉⁰₀\n"
NYA_SYMBOL_MAPPING.update((k, k) for k in NYA_SYMBOL_KEEP)
async def has_nya(msg: UniMsg) -> bool: async def has_nya(msg: UniMsg) -> bool:
@ -49,10 +51,10 @@ async def has_nya(msg: UniMsg) -> bool:
evt_nya_v2 = on_message(rule=has_nya) evt_nya_v2 = on_message(rule=has_nya)
@evt_nya_v2.handle() @evt_nya_v2.handle()
async def _(msg: UniMsg, evt: Event): async def _(msg: UniMsg, evt: Event):
text = msg.extract_plain_text() text = msg.extract_plain_text()
await UniMessage.text(''.join( await UniMessage.text("".join((NYA_SYMBOL_MAPPING.get(c, "") for c in text))).send(
(NYA_SYMBOL_MAPPING.get(c, '') for c in text) evt
)).send(evt) )

View File

@ -3,14 +3,15 @@ from nonebot_plugin_alconna import Alconna, Args, on_alconna
from konabot.common.longtask import DepLongTaskTarget from konabot.common.longtask import DepLongTaskTarget
from konabot.common.pager import PagerQuery from konabot.common.pager import PagerQuery
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA from konabot.common.subscribe import POSTER_INFO_DATA, dep_poster_service
from konabot.plugins.poster.service import dep_poster_service
cmd_subscribe = on_alconna(Alconna( cmd_subscribe = on_alconna(
"订阅", Alconna(
Args["channel", str], "订阅",
)) Args["channel", str],
)
)
@cmd_subscribe.handle() @cmd_subscribe.handle()
@ -23,10 +24,12 @@ async def _(target: DepLongTaskTarget, channel: str):
await target.send_message(f"已经订阅过「{channel}」了") await target.send_message(f"已经订阅过「{channel}」了")
cmd_list = on_alconna(Alconna( cmd_list = on_alconna(
"re:(?:查询|我的|获取)订阅(列表)?", Alconna(
Args["page?", int], "re:(?:查询|我的|获取)订阅(列表)?",
)) Args["page?", int],
)
)
def better_channel_message(channel_id: str) -> str: def better_channel_message(channel_id: str) -> str:
@ -39,17 +42,24 @@ def better_channel_message(channel_id: str) -> str:
@cmd_list.handle() @cmd_list.handle()
async def _(target: DepLongTaskTarget, page: int = 1): async def _(target: DepLongTaskTarget, page: int = 1):
async with dep_poster_service() as service: async with dep_poster_service() as service:
result = await service.get_channels(target, PagerQuery( result = await service.get_channels(
page_index=page, target,
page_size=10, PagerQuery(
)) page_index=page,
await target.send_message(result.to_unimessage(title="订阅列表", formatter=better_channel_message)) page_size=10,
),
)
await target.send_message(
result.to_unimessage(title="订阅列表", formatter=better_channel_message)
)
cmd_list_available = on_alconna(Alconna( cmd_list_available = on_alconna(
"re:(查询)?可用订阅(列表)?", Alconna(
Args["page?", int], "re:(查询)?可用订阅(列表)?",
)) Args["page?", int],
)
)
@cmd_list_available.handle() @cmd_list_available.handle()
@ -58,13 +68,17 @@ async def _(target: DepLongTaskTarget, page: int = 1):
page_index=page, page_index=page,
page_size=10, page_size=10,
).apply(sorted(POSTER_INFO_DATA.keys())) ).apply(sorted(POSTER_INFO_DATA.keys()))
await target.send_message(result.to_unimessage(title="可用订阅列表", formatter=better_channel_message)) await target.send_message(
result.to_unimessage(title="可用订阅列表", formatter=better_channel_message)
)
cmd_unsubscribe = on_alconna(Alconna( cmd_unsubscribe = on_alconna(
"取消订阅", Alconna(
Args["channel", str], "取消订阅",
)) Args["channel", str],
)
)
@cmd_unsubscribe.handle() @cmd_unsubscribe.handle()
@ -79,6 +93,7 @@ async def _(target: DepLongTaskTarget, channel: str):
driver = nonebot.get_driver() driver = nonebot.get_driver()
@driver.on_startup @driver.on_startup
async def _(): async def _():
async with dep_poster_service() as service: async with dep_poster_service() as service:

View File

@ -4,8 +4,7 @@ from nonebot.internal.adapter.event import Event
from nonebot_plugin_alconna import UniMessage from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_apscheduler import scheduler from nonebot_plugin_apscheduler import scheduler
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info from konabot.common.subscribe import PosterInfo, register_poster_info, broadcast
from konabot.plugins.poster.service import broadcast
register_poster_info( register_poster_info(
"二十四节气", "二十四节气",
@ -98,4 +97,3 @@ async def _(event: Event):
msg = UniMessage.text(f"现在的节气是{date.term}") msg = UniMessage.text(f"现在的节气是{date.term}")
await msg.send(event) await msg.send(event)

View File

@ -1,20 +1,23 @@
import asyncio import asyncio
from nonebot import get_driver from nonebot import get_driver
from nonebot_plugin_alconna import UniMessage from nonebot_plugin_alconna import UniMessage
from konabot.plugins.poster.poster_info import register_poster_info, PosterInfo from konabot.common.subscribe import register_poster_info, PosterInfo, broadcast
from konabot.plugins.poster.service import broadcast
CHANNEL_STARTUP = "启动通知" CHANNEL_STARTUP = "启动通知"
register_poster_info(CHANNEL_STARTUP, PosterInfo( register_poster_info(
aliases=set(), CHANNEL_STARTUP,
description="当 Bot 重启时告知", PosterInfo(
)) aliases=set(),
description="当 Bot 重启时告知",
),
)
driver = get_driver() driver = get_driver()
@driver.on_startup @driver.on_startup
async def _(): async def _():
# 要尽量保证接受讯息的服务存在 # 要尽量保证接受讯息的服务存在
@ -30,4 +33,3 @@ async def _():
await broadcast(CHANNEL_STARTUP, UniMessage.text("此方 BOT 重启好了")) await broadcast(CHANNEL_STARTUP, UniMessage.text("此方 BOT 重启好了"))
asyncio.create_task(task()) asyncio.create_task(task())

View File

@ -0,0 +1,82 @@
import datetime
from nonebot_plugin_alconna import UniMessage
from konabot.common.apis.wolfx import CencEewReport, wolfx_api
from konabot.common.subscribe import PosterInfo, broadcast, register_poster_info
provinces_short = [
"北京",
"天津",
"河北",
"山西",
"内蒙古",
"辽宁",
"吉林",
"黑龙江",
"上海",
"江苏",
"浙江",
"安徽",
"福建",
"江西",
"山东",
"河南",
"湖北",
"湖南",
"广东",
"广西",
"海南",
"重庆",
"四川",
"贵州",
"云南",
"西藏",
"陕西",
"甘肃",
"青海",
"宁夏",
"新疆",
"香港",
"澳门",
"台湾",
]
register_poster_info(
"中国地震台网地震速报",
PosterInfo(
aliases={
"地震速报",
"地震预警",
},
description="来自中国地震台网的地震速报",
),
)
@wolfx_api.cenc_eew.append
async def broadcast_eew(report: CencEewReport):
is_cn = any(report.HypoCenter.startswith(prefix) for prefix in provinces_short)
if (is_cn and report.Magnitude >= 4.2) or ((not is_cn) and report.Magnitude >= 7.0):
# 这是中国地震台网网站上,会默认展示的地震信息的等级
origin_time_dt = datetime.datetime.strptime(
report.OriginTime, "%Y-%m-%d %H:%M:%S"
)
origin_time_str = (
f"{origin_time_dt.month}"
f"{origin_time_dt.day}"
f"{origin_time_dt.hour}"
f"{origin_time_dt.minute}"
)
eid_in_link = report.EventID.split(".")[0]
link = f"https://www.cenc.ac.cn/earthquake-manage-publish-web/product-list/{eid_in_link}/summarize"
msg = UniMessage.text(
"据中国地震台网中心 (https://www.cenc.ac.cn/) 报道,"
f"北京时间{origin_time_str}"
f"{report.HypoCenter}发生{report.Magnitude:.1f}级地震。"
f"震源位于 {report.Longitude}° {report.Latitude}°,深度 {report.Depth}km。\n\n"
f"详细信息请见 {link}"
)
await broadcast("中国地震台网地震速报", msg)

25
poetry.lock generated
View File

@ -4067,6 +4067,29 @@ type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple" url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors" reference = "mirrors"
[[package]]
name = "pytest-mock"
version = "3.15.1"
description = "Thin-wrapper around the mock package for easier use with pytest"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "pytest_mock-3.15.1-py3-none-any.whl", hash = "sha256:0a25e2eb88fe5168d535041d09a4529a188176ae608a6d249ee65abc0949630d"},
{file = "pytest_mock-3.15.1.tar.gz", hash = "sha256:1849a238f6f396da19762269de72cb1814ab44416fa73a8686deac10b0d87a0f"},
]
[package.dependencies]
pytest = ">=6.2.5"
[package.extras]
dev = ["pre-commit", "pytest-asyncio", "tox"]
[package.source]
type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]] [[package]]
name = "python-dotenv" name = "python-dotenv"
version = "1.2.2" version = "1.2.2"
@ -5311,4 +5334,4 @@ reference = "mirrors"
[metadata] [metadata]
lock-version = "2.1" lock-version = "2.1"
python-versions = ">=3.12,<4.0" python-versions = ">=3.12,<4.0"
content-hash = "f2d5345d93636e19e49852af636d598c88394aaef4020f383402394b58452e3d" content-hash = "23d2eadd1c36d017ff77934bd02b56e37d26005e6a99793b623c01162b90d67d"

View File

@ -37,6 +37,8 @@ dependencies = [
"pytest (>=8.0.0,<9.0.0)", "pytest (>=8.0.0,<9.0.0)",
"nonebug (>=0.4.3,<0.5.0)", "nonebug (>=0.4.3,<0.5.0)",
"pytest-cov (>=7.0.0,<8.0.0)", "pytest-cov (>=7.0.0,<8.0.0)",
"aiosignal (>=1.4.0,<2.0.0)",
"pytest-mock (>=3.15.1,<4.0.0)",
] ]
[tool.poetry] [tool.poetry]

View File

@ -12,8 +12,22 @@ def filter(change: Change, path: str) -> bool:
return False return False
if Path(path).absolute().is_relative_to((base / ".git").absolute()): if Path(path).absolute().is_relative_to((base / ".git").absolute()):
return False return False
if Path(path).absolute().is_relative_to((base / "assets" / "oracle" / "image").absolute()): if (
Path(path)
.absolute()
.is_relative_to((base / "assets" / "oracle" / "image").absolute())
):
# 还要解决坏枪的这个问题 # 还要解决坏枪的这个问题
return False return False
if Path(path).absolute().is_relative_to((base / "htmlcov").absolute()):
return False
if Path(path).absolute().is_relative_to((base / "test").absolute()):
return False
if Path(path).absolute().is_relative_to((base / ".pytest_cache").absolute()):
return False
if Path(path).absolute().is_relative_to((base / ".ruff_cache").absolute()):
return False
if path.endswith(".coverage"):
return False
print(path) print(path)
return True return True

View File

@ -0,0 +1,78 @@
import json
from unittest.mock import AsyncMock
import pytest
from konabot.common.apis.wolfx import CencEewReport, WolfxAPIService, WolfxWebSocket
obj_example = {
"ID": "bacby4yab1oyb",
"EventID": "202603100805.0001",
"ReportTime": "2026-03-10 08:05:29",
"ReportNum": 1,
"OriginTime": "2026-03-10 08:05:29",
"HypoCenter": "新疆昌吉州呼图壁县",
"Latitude": 43.687,
"Longitude": 86.427,
"Magnitude": 4.0,
"Depth": 14,
"MaxIntensity": 5,
}
@pytest.mark.asyncio
async def test_wolfx_websocket_handle():
ws = WolfxWebSocket("")
mock_callback = AsyncMock()
ws.signal.append(mock_callback)
ws.signal.freeze()
obj1 = {
"type": "heartbeat",
"ver": 18,
"id": "a69edf6436c5b605",
"timestamp": 1773111106701,
}
data1 = json.dumps(obj1).encode()
await ws.handle(data1)
mock_callback.assert_not_called()
mock_callback.reset_mock()
obj2 = obj_example
data2 = json.dumps(obj2).encode()
await ws.handle(data2)
mock_callback.assert_called_once_with(data2)
mock_callback.reset_mock()
data3 = b"what the f"
await ws.handle(data3)
mock_callback.assert_not_called()
@pytest.mark.asyncio
async def test_wolfx_bind_pydantic():
sv = WolfxAPIService()
called: list[CencEewReport] = []
@sv.cenc_eew.append
async def _(data: CencEewReport):
called.append(data)
sv._cenc_eew_ws.signal.freeze()
sv.cenc_eew.freeze()
data = json.dumps(obj_example).encode()
await sv._cenc_eew_ws.signal.send(data)
assert len(called) == 1
data = called[0]
assert data.HypoCenter == obj_example["HypoCenter"]
assert data.EventID == obj_example["EventID"]
# Don't panic when the object is invalid
data = json.dumps({"type": ""}).encode()
await sv._cenc_eew_ws.signal.send(data)
assert len(called) == 1