diff --git a/bot.py b/bot.py index 446a4fe..84d07b3 100644 --- a/bot.py +++ b/bot.py @@ -7,6 +7,7 @@ from nonebot.adapters.discord import Adapter as DiscordAdapter from nonebot.adapters.minecraft import Adapter as MinecraftAdapter from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter +from konabot.common.appcontext import run_afterinit_functions from konabot.common.log import init_logger from konabot.common.nb.exc import BotExceptionMessage from konabot.common.path import LOG_PATH @@ -56,9 +57,7 @@ def main(): nonebot.load_plugins("konabot/plugins") nonebot.load_plugin("nonebot_plugin_analysis_bilibili") - from konabot.common import permsys - - permsys.create_startup() + run_afterinit_functions() # 注册关闭钩子 @driver.on_shutdown diff --git a/konabot/common/apis/wolfx.py b/konabot/common/apis/wolfx.py new file mode 100644 index 0000000..e41d099 --- /dev/null +++ b/konabot/common/apis/wolfx.py @@ -0,0 +1,233 @@ +""" +Wolfx 防灾免费 API +""" + +import asyncio +import json +from typing import TypeVar, cast +import aiohttp +from aiosignal import Signal +from loguru import logger +from pydantic import BaseModel +import pydantic + +from konabot.common.appcontext import after_init + + +class ScEewReport(BaseModel): + """ + 四川地震局报文 + """ + + ID: str + "EEW 发报 ID" + + EventID: str + "EEW 发报事件 ID" + + ReportTime: str + "EEW 发报时间(UTC+8)" + + ReportNum: int + "EEW 发报数" + + OriginTime: str + "发震时间(UTC+8)" + + HypoCenter: str + "震源地" + + Latitude: float + "震源地纬度" + + Longitude: float + "震源地经度" + + Magnitude: float + "震级" + + Depth: float | None + "震源深度" + + MaxIntensity: float + "最大烈度" + + +class CencEewReport(BaseModel): + """ + 中国地震台网报文 + """ + + ID: str + "EEW 发报 ID" + + EventID: str + "EEW 发报事件 ID" + + ReportTime: str + "EEW 发报时间(UTC+8)" + + ReportNum: int + "EEW 发报数" + + OriginTime: str + "发震时间(UTC+8)" + + HypoCenter: str + "震源地" + + Latitude: float + "震源地纬度" + + Longitude: float + "震源地经度" + + Magnitude: float + "震级" + + Depth: float | None + "震源深度" + + MaxIntensity: float + "最大烈度" + + +class WolfxWebSocket: + def __init__(self, url: str) -> None: + self.url = url + self.signal: Signal[bytes] = Signal(self) + self._running = False + self._task: asyncio.Task | None = None + self._session: aiohttp.ClientSession | None = None + self._ws: aiohttp.ClientWebSocketResponse | None = None + + @property + def session(self) -> aiohttp.ClientSession: # pragma: no cover + assert self._session is not None + return self._session + + async def start(self): # pragma: no cover + if self._running: + return + self._running = True + self._session = aiohttp.ClientSession() + self._task = asyncio.create_task(self._run()) + self.signal.freeze() + + async def stop(self): # pragma: no cover + self._running = False + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + if self._session: + await self._session.close() + + async def _run(self): # pragma: no cover + retry_delay = 1 + + while self._running: + try: + async with self.session.ws_connect(self.url) as ws: + self._ws = ws + logger.info(f"Wolfx API 服务连接上了 {self.url} 的 WebSocket") + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + await self.handle(cast(str, msg.data).encode()) + elif msg.type == aiohttp.WSMsgType.BINARY: + await self.handle(cast(bytes, msg.data)) + elif msg.type == aiohttp.WSMsgType.CLOSED: + break + elif msg.type == aiohttp.WSMsgType.ERROR: + break + except (aiohttp.ClientError, asyncio.TimeoutError) as e: + logger.warning("连接 WebSocket 时发生错误") + logger.exception(e) + except asyncio.CancelledError: + break + except Exception as e: + logger.error("Wolfx API 发生未知错误") + logger.exception(e) + self._ws = None + + if self._running: + logger.info(f"Wolfx API 准备断线重连 {self.url}") + await asyncio.sleep(retry_delay) + retry_delay = min(retry_delay * 2, 60) + + async def handle(self, data: bytes): + try: + obj = json.loads(data) + except json.JSONDecodeError as e: + logger.warning("解析 Wolfs API 时出错") + logger.exception(e) + return + + if obj.get("type") == "heartbeat" or obj.get("type") == "pong": + logger.debug(f"Wolfx API 收到了来自 {self.url} 的心跳: {obj}") + else: + await self.signal.send(data) + + +T = TypeVar("T", bound=BaseModel) + + +class WolfxAPIService: + sc_eew: Signal[ScEewReport] + "四川地震局地震速报" + + cenc_eew: Signal[CencEewReport] + "中国地震台网地震速报" + + def __init__(self) -> None: + self.sc_eew = Signal(self) + self._sc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/sc_eew") + WolfxAPIService.bind(self.sc_eew, self._sc_eew_ws, ScEewReport) + + self.cenc_eew = Signal(self) + self._cenc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/cenc_eew") + WolfxAPIService.bind(self.cenc_eew, self._cenc_eew_ws, CencEewReport) + + @staticmethod + def bind(signal: Signal[T], ws: WolfxWebSocket, t: type[T]): + @ws.signal.append + async def _(data: bytes): + try: + obj = t.model_validate_json(data) + logger.info(f"接收到来自 Wolfx API 的信息:{data}") + await signal.send(obj) + except pydantic.ValidationError as e: + logger.warning(f"解析 Wolfx API 时出错 URL={ws.url}") + logger.error(e) + + async def start(self): # pragma: no cover + self.cenc_eew.freeze() + self.sc_eew.freeze() + async with asyncio.TaskGroup() as task_group: + task_group.create_task(self._cenc_eew_ws.start()) + task_group.create_task(self._sc_eew_ws.start()) + + async def stop(self): # pragma: no cover + async with asyncio.TaskGroup() as task_group: + task_group.create_task(self._cenc_eew_ws.stop()) + task_group.create_task(self._sc_eew_ws.stop()) + + +wolfx_api = WolfxAPIService() + + +@after_init +def init(): # pragma: no cover + import nonebot + + driver = nonebot.get_driver() + + @driver.on_startup + async def _(): + await wolfx_api.start() + + @driver.on_shutdown + async def _(): + await wolfx_api.stop() diff --git a/konabot/common/appcontext.py b/konabot/common/appcontext.py new file mode 100644 index 0000000..411303d --- /dev/null +++ b/konabot/common/appcontext.py @@ -0,0 +1,15 @@ +from typing import Any, Callable + + +AFTER_INIT_FUNCTION = Callable[[], Any] + +_after_init_functions: list[AFTER_INIT_FUNCTION] = [] + + +def after_init(func: AFTER_INIT_FUNCTION): + _after_init_functions.append(func) + + +def run_afterinit_functions(): # pragma: no cover + for f in _after_init_functions: + f() diff --git a/konabot/common/permsys/__init__.py b/konabot/common/permsys/__init__.py index 274dbeb..b660c1b 100644 --- a/konabot/common/permsys/__init__.py +++ b/konabot/common/permsys/__init__.py @@ -4,6 +4,7 @@ from nonebot.adapters import Event from nonebot.params import Depends from nonebot.rule import Rule +from konabot.common.appcontext import after_init from konabot.common.database import DatabaseManager from konabot.common.pager import PagerQuery from konabot.common.path import DATA_PATH @@ -73,6 +74,7 @@ def perm_manager(_db: DatabaseManager | None = None) -> PermManager: # pragma: return PermManager(_db) +@after_init def create_startup(): # pragma: no cover from konabot.common.nb.is_admin import cfg diff --git a/konabot/plugins/wolfx_eew.py b/konabot/plugins/wolfx_eew.py new file mode 100644 index 0000000..8d353ca --- /dev/null +++ b/konabot/plugins/wolfx_eew.py @@ -0,0 +1,82 @@ +import datetime +from nonebot_plugin_alconna import UniMessage +from konabot.common.apis.wolfx import CencEewReport, wolfx_api +from konabot.common.subscribe import PosterInfo, broadcast, register_poster_info + + +provinces_short = [ + "北京", + "天津", + "河北", + "山西", + "内蒙古", + "辽宁", + "吉林", + "黑龙江", + "上海", + "江苏", + "浙江", + "安徽", + "福建", + "江西", + "山东", + "河南", + "湖北", + "湖南", + "广东", + "广西", + "海南", + "重庆", + "四川", + "贵州", + "云南", + "西藏", + "陕西", + "甘肃", + "青海", + "宁夏", + "新疆", + "香港", + "澳门", + "台湾", +] + + +register_poster_info( + "中国地震台网地震速报", + PosterInfo( + aliases={ + "地震速报", + "地震预警", + }, + description="来自中国地震台网的地震速报", + ), +) + + +@wolfx_api.cenc_eew.append +async def broadcast_eew(report: CencEewReport): + is_cn = any(report.HypoCenter.startswith(prefix) for prefix in provinces_short) + if (is_cn and report.Magnitude >= 4.2) or ((not is_cn) and report.Magnitude >= 7.0): + # 这是中国地震台网网站上,会默认展示的地震信息的等级 + origin_time_dt = datetime.datetime.strptime( + report.OriginTime, "%Y-%m-%d %H:%M:%S" + ) + origin_time_str = ( + f"{origin_time_dt.month}月" + f"{origin_time_dt.day}日" + f"{origin_time_dt.hour}时" + f"{origin_time_dt.minute}分" + ) + + eid_in_link = report.EventID.split(".")[0] + link = f"https://www.cenc.ac.cn/earthquake-manage-publish-web/product-list/{eid_in_link}/summarize" + + msg = UniMessage.text( + "据中国地震台网中心 (https://www.cenc.ac.cn/) 报道," + f"北京时间{origin_time_str}," + f"{report.HypoCenter}发生{report.Magnitude:.1f}级地震。" + f"震源位于 {report.Longitude}° {report.Latitude}°,深度 {report.Depth}km。\n\n" + f"详细信息请见 {link}" + ) + await broadcast("中国地震台网地震速报", msg) diff --git a/poetry.lock b/poetry.lock index 16d8e2f..267df3e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -4067,6 +4067,29 @@ type = "legacy" url = "https://pypi.tuna.tsinghua.edu.cn/simple" reference = "mirrors" +[[package]] +name = "pytest-mock" +version = "3.15.1" +description = "Thin-wrapper around the mock package for easier use with pytest" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "pytest_mock-3.15.1-py3-none-any.whl", hash = "sha256:0a25e2eb88fe5168d535041d09a4529a188176ae608a6d249ee65abc0949630d"}, + {file = "pytest_mock-3.15.1.tar.gz", hash = "sha256:1849a238f6f396da19762269de72cb1814ab44416fa73a8686deac10b0d87a0f"}, +] + +[package.dependencies] +pytest = ">=6.2.5" + +[package.extras] +dev = ["pre-commit", "pytest-asyncio", "tox"] + +[package.source] +type = "legacy" +url = "https://pypi.tuna.tsinghua.edu.cn/simple" +reference = "mirrors" + [[package]] name = "python-dotenv" version = "1.2.2" @@ -5311,4 +5334,4 @@ reference = "mirrors" [metadata] lock-version = "2.1" python-versions = ">=3.12,<4.0" -content-hash = "f2d5345d93636e19e49852af636d598c88394aaef4020f383402394b58452e3d" +content-hash = "23d2eadd1c36d017ff77934bd02b56e37d26005e6a99793b623c01162b90d67d" diff --git a/pyproject.toml b/pyproject.toml index 61f0558..7e5dfc4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,8 @@ dependencies = [ "pytest (>=8.0.0,<9.0.0)", "nonebug (>=0.4.3,<0.5.0)", "pytest-cov (>=7.0.0,<8.0.0)", + "aiosignal (>=1.4.0,<2.0.0)", + "pytest-mock (>=3.15.1,<4.0.0)", ] [tool.poetry] diff --git a/scripts/watch_filter.py b/scripts/watch_filter.py index 9554e41..669fef3 100644 --- a/scripts/watch_filter.py +++ b/scripts/watch_filter.py @@ -12,8 +12,22 @@ def filter(change: Change, path: str) -> bool: return False if Path(path).absolute().is_relative_to((base / ".git").absolute()): return False - if Path(path).absolute().is_relative_to((base / "assets" / "oracle" / "image").absolute()): + if ( + Path(path) + .absolute() + .is_relative_to((base / "assets" / "oracle" / "image").absolute()) + ): # 还要解决坏枪的这个问题 return False + if Path(path).absolute().is_relative_to((base / "htmlcov").absolute()): + return False + if Path(path).absolute().is_relative_to((base / "test").absolute()): + return False + if Path(path).absolute().is_relative_to((base / ".pytest_cache").absolute()): + return False + if Path(path).absolute().is_relative_to((base / ".ruff_cache").absolute()): + return False + if path.endswith(".coverage"): + return False print(path) return True diff --git a/tests/services/test_wolfx_api.py b/tests/services/test_wolfx_api.py new file mode 100644 index 0000000..4227d93 --- /dev/null +++ b/tests/services/test_wolfx_api.py @@ -0,0 +1,78 @@ +import json +from unittest.mock import AsyncMock +import pytest + +from konabot.common.apis.wolfx import CencEewReport, WolfxAPIService, WolfxWebSocket + + +obj_example = { + "ID": "bacby4yab1oyb", + "EventID": "202603100805.0001", + "ReportTime": "2026-03-10 08:05:29", + "ReportNum": 1, + "OriginTime": "2026-03-10 08:05:29", + "HypoCenter": "新疆昌吉州呼图壁县", + "Latitude": 43.687, + "Longitude": 86.427, + "Magnitude": 4.0, + "Depth": 14, + "MaxIntensity": 5, +} + + +@pytest.mark.asyncio +async def test_wolfx_websocket_handle(): + ws = WolfxWebSocket("") + + mock_callback = AsyncMock() + ws.signal.append(mock_callback) + ws.signal.freeze() + + obj1 = { + "type": "heartbeat", + "ver": 18, + "id": "a69edf6436c5b605", + "timestamp": 1773111106701, + } + data1 = json.dumps(obj1).encode() + await ws.handle(data1) + mock_callback.assert_not_called() + mock_callback.reset_mock() + + obj2 = obj_example + data2 = json.dumps(obj2).encode() + await ws.handle(data2) + mock_callback.assert_called_once_with(data2) + mock_callback.reset_mock() + + data3 = b"what the f" + await ws.handle(data3) + mock_callback.assert_not_called() + + +@pytest.mark.asyncio +async def test_wolfx_bind_pydantic(): + sv = WolfxAPIService() + called: list[CencEewReport] = [] + + @sv.cenc_eew.append + async def _(data: CencEewReport): + called.append(data) + + sv._cenc_eew_ws.signal.freeze() + sv.cenc_eew.freeze() + + data = json.dumps(obj_example).encode() + await sv._cenc_eew_ws.signal.send(data) + + assert len(called) == 1 + data = called[0] + + assert data.HypoCenter == obj_example["HypoCenter"] + assert data.EventID == obj_example["EventID"] + + # Don't panic when the object is invalid + data = json.dumps({"type": "给"}).encode() + await sv._cenc_eew_ws.signal.send(data) + + assert len(called) == 1