初次提交请多关照
This commit is contained in:
9
konabot/plugins/maya_echo/__init__.py
Normal file
9
konabot/plugins/maya_echo/__init__.py
Normal file
@ -0,0 +1,9 @@
|
||||
from nonebot import on_message
|
||||
from nonebot_plugin_alconna import UniMessage, UniMsg
|
||||
|
||||
evt = on_message()
|
||||
|
||||
@evt.handle()
|
||||
async def _(msg: UniMsg):
|
||||
if msg.extract_plain_text() == "喵":
|
||||
await evt.send(await UniMessage().text("喵").export())
|
||||
16
konabot/plugins/weather/__init__.py
Normal file
16
konabot/plugins/weather/__init__.py
Normal file
@ -0,0 +1,16 @@
|
||||
from nonebot_plugin_alconna import Alconna, Args, Field, UniMessage, on_alconna
|
||||
|
||||
from konabot.plugins.weather.fetcher import fetch_radar
|
||||
|
||||
evt = on_alconna(Alconna(
|
||||
"!雷达回波",
|
||||
Args["region", str, Field(missing_tips=lambda: "请输入需要获取的地区,例如 !雷达回波 华南")],
|
||||
), use_cmd_start=False, use_cmd_sep=False, skip_for_unmatch=False)
|
||||
|
||||
@evt.handle()
|
||||
async def _(region: str):
|
||||
img, err = await fetch_radar(region)
|
||||
if err != None:
|
||||
await evt.send(await UniMessage().text(err).export())
|
||||
return
|
||||
await evt.send(await UniMessage.image(raw=img).export())
|
||||
37
konabot/plugins/weather/fetcher.py
Normal file
37
konabot/plugins/weather/fetcher.py
Normal file
@ -0,0 +1,37 @@
|
||||
import functools
|
||||
|
||||
import httpx
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
STATION_LINKS = {
|
||||
"全国": "https://www.nmc.cn/publish/radar/chinaall.html",
|
||||
"华北": "https://www.nmc.cn/publish/radar/huabei.html",
|
||||
"东北": "https://www.nmc.cn/publish/radar/dongbei.html",
|
||||
"华东": "https://www.nmc.cn/publish/radar/huadong.html",
|
||||
"华中": "https://www.nmc.cn/publish/radar/huazhong.html",
|
||||
"华南": "https://www.nmc.cn/publish/radar/huanan.html",
|
||||
"西南": "https://www.nmc.cn/publish/radar/xinan.html",
|
||||
"西北": "https://www.nmc.cn/publish/radar/xibei.html",
|
||||
}
|
||||
|
||||
async def fetch_radar(station: str) -> tuple[bytes, None] | tuple[None, str]:
|
||||
if station not in STATION_LINKS.keys():
|
||||
return None, f"获取失败:站点不存在。可用的站点:{','.join(STATION_LINKS.keys())}"
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(STATION_LINKS[station])
|
||||
if response.status_code != 200:
|
||||
return None, "获取失败:可能存在网络错误"
|
||||
|
||||
soup = BeautifulSoup(response.text, "lxml")
|
||||
links = functools.reduce(lambda x, y: x + y, [
|
||||
img.get_attribute_list("src") for img in soup.select("div.imgblock>img#imgpath")
|
||||
])
|
||||
if len(links) != 1:
|
||||
return None, "获取失败:链接获取失败"
|
||||
|
||||
response2 = await client.get(links[0])
|
||||
if response2.status_code != 200:
|
||||
return None, "获取失败:图片下载失败"
|
||||
|
||||
return response2.content, None
|
||||
Reference in New Issue
Block a user