改为使用中国地震台网的正式报
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2026-03-10 21:58:42 +08:00
parent 8f5061ba41
commit 265e9cc583
2 changed files with 101 additions and 20 deletions

View File

@ -4,11 +4,11 @@ Wolfx 防灾免费 API
import asyncio
import json
from typing import TypeVar, cast
from typing import Literal, TypeVar, cast
import aiohttp
from aiosignal import Signal
from loguru import logger
from pydantic import BaseModel
from pydantic import BaseModel, RootModel
import pydantic
from konabot.common.appcontext import after_init
@ -92,6 +92,39 @@ class CencEewReport(BaseModel):
"最大烈度"
class CencEqReport(BaseModel):
type: Literal["automatic", "reviewed"]
"报告类型,自动报还是正式报"
EventID: str
"事件 ID"
time: str
"UTC+8 格式的地震发生时间"
location: str
"地震发生位置"
magnitude: str
"震级"
depth: str
"地震深度"
latitude: str
"纬度"
longtitude: str
"经度"
intensity: str
"烈度"
class CencEqlist(RootModel):
root: dict[str, CencEqReport]
class WolfxWebSocket:
def __init__(self, url: str) -> None:
self.url = url
@ -181,6 +214,9 @@ class WolfxAPIService:
cenc_eew: Signal[CencEewReport]
"中国地震台网地震速报"
cenc_eqlist: Signal[CencEqReport]
"中国地震台网地震信息发布"
def __init__(self) -> None:
self.sc_eew = Signal(self)
self._sc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/sc_eew")
@ -190,6 +226,10 @@ class WolfxAPIService:
self._cenc_eew_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/cenc_eew")
WolfxAPIService.bind(self.cenc_eew, self._cenc_eew_ws, CencEewReport)
self.cenc_eqlist = Signal(self)
self._cenc_eqlist_ws = WolfxWebSocket("wss://ws-api.wolfx.jp/cenc_eqlist")
WolfxAPIService.bind(self.cenc_eqlist, self._cenc_eqlist_ws, CencEqReport)
@staticmethod
def bind(signal: Signal[T], ws: WolfxWebSocket, t: type[T]):
@ws.signal.append
@ -205,14 +245,22 @@ class WolfxAPIService:
async def start(self): # pragma: no cover
self.cenc_eew.freeze()
self.sc_eew.freeze()
self.cenc_eqlist.freeze()
async with asyncio.TaskGroup() as task_group:
task_group.create_task(self._cenc_eew_ws.start())
task_group.create_task(self._sc_eew_ws.start())
if len(self.cenc_eew) > 0:
task_group.create_task(self._cenc_eew_ws.start())
if len(self.sc_eew) > 0:
task_group.create_task(self._sc_eew_ws.start())
if len(self.cenc_eqlist) > 0:
task_group.create_task(self._cenc_eqlist_ws.start())
async def stop(self): # pragma: no cover
async with asyncio.TaskGroup() as task_group:
task_group.create_task(self._cenc_eew_ws.stop())
task_group.create_task(self._sc_eew_ws.stop())
task_group.create_task(self._cenc_eqlist_ws.stop())
wolfx_api = WolfxAPIService()

View File

@ -1,6 +1,6 @@
import datetime
from nonebot_plugin_alconna import UniMessage
from konabot.common.apis.wolfx import CencEewReport, wolfx_api
from konabot.common.apis.wolfx import CencEewReport, CencEqReport, wolfx_api
from konabot.common.subscribe import PosterInfo, broadcast, register_poster_info
@ -54,14 +54,50 @@ register_poster_info(
)
@wolfx_api.cenc_eew.append
async def broadcast_eew(report: CencEewReport):
is_cn = any(report.HypoCenter.startswith(prefix) for prefix in provinces_short)
if (is_cn and report.Magnitude >= 4.2) or ((not is_cn) and report.Magnitude >= 7.0):
# 这是中国地震台网网站上,会默认展示的地震信息的等级
origin_time_dt = datetime.datetime.strptime(
report.OriginTime, "%Y-%m-%d %H:%M:%S"
)
CENC_EEW_DISABLED = True
if not CENC_EEW_DISABLED:
@wolfx_api.cenc_eew.append
async def broadcast_eew(report: CencEewReport):
# 这个好像没那么准确...
is_cn = any(report.HypoCenter.startswith(prefix) for prefix in provinces_short)
if (is_cn and report.Magnitude >= 4.2) or (
(not is_cn) and report.Magnitude >= 7.0
):
# 这是中国地震台网网站上,会默认展示的地震信息的等级
origin_time_dt = datetime.datetime.strptime(
report.OriginTime, "%Y-%m-%d %H:%M:%S"
)
origin_time_str = (
f"{origin_time_dt.month}"
f"{origin_time_dt.day}"
f"{origin_time_dt.hour}"
f"{origin_time_dt.minute}"
)
# vvv 下面这个其实不准确
eid_in_link = report.EventID.split(".")[0]
link = f"https://www.cenc.ac.cn/earthquake-manage-publish-web/product-list/{eid_in_link}/summarize"
msg = UniMessage.text(
"据中国地震台网中心 (https://www.cenc.ac.cn/) 报道,"
f"北京时间{origin_time_str}"
f"{report.HypoCenter}发生{report.Magnitude:.1f}级地震。"
f"震源位于 {report.Longitude}° {report.Latitude}°,深度 {report.Depth}km。\n\n"
f"详细信息请见 {link}"
)
await broadcast("中国地震台网地震速报", msg)
@wolfx_api.cenc_eqlist.append
async def broadcast_cenc_eqlist(report: CencEqReport):
is_cn = any(report.location.startswith(prefix) for prefix in provinces_short)
if (is_cn and float(report.magnitude) >= 4.2) or (
(not is_cn) and float(report.magnitude) >= 7.0
):
origin_time_dt = datetime.datetime.strptime(report.time, "%Y-%m-%d %H:%M:%S")
origin_time_str = (
f"{origin_time_dt.month}"
f"{origin_time_dt.day}"
@ -69,14 +105,11 @@ async def broadcast_eew(report: CencEewReport):
f"{origin_time_dt.minute}"
)
eid_in_link = report.EventID.split(".")[0]
link = f"https://www.cenc.ac.cn/earthquake-manage-publish-web/product-list/{eid_in_link}/summarize"
msg = UniMessage.text(
"据中国地震台网中心 (https://www.cenc.ac.cn/) 报道"
"据中国地震台网中心 (https://www.cenc.ac.cn/) 消息"
f"北京时间{origin_time_str}"
f"{report.HypoCenter}发生{report.Magnitude:.1f}级地震。"
f"震源位于 {report.Longitude}° {report.Latitude}°,深度 {report.Depth}km。\n\n"
f"详细信息请见 {link}"
f"{report.location}发生{report.magnitude}级地震。"
f"震源位于 {report.longtitude}° {report.latitude}°,深度 {report.depth}km。\n\n"
f"数据来源于 Wolfx OpenAPI事件 ID: {report.EventID}"
)
await broadcast("中国地震台网地震速报", msg)