import aiohttp import asyncio as asynkio from math import ceil from pathlib import Path from typing import Any import nanoid import nonebot import ptimeparse from loguru import logger from nonebot import get_plugin_config, on_message from nonebot.adapters import Event from nonebot_plugin_alconna import Alconna, Args, Subcommand, UniMessage, UniMsg, on_alconna from pydantic import BaseModel from konabot.common.longtask import DepLongTaskTarget, LongTask, create_longtask, handle_long_task, longtask_data evt = on_message() (Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True) DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "notify.json" DATA_FILE_LOCK = asynkio.Lock() ASYNK_TASKS: set[asynkio.Task[Any]] = set() LONG_TASK_NAME = "TASK_SIMPLE_NOTIFY" PAGE_SIZE = 6 FMT_STRING = "%Y年%m月%d日 %H:%M:%S" class NotifyConfigFile(BaseModel): version: int = 2 notify_channels: dict[str, str] = {} class NotifyPluginConfig(BaseModel): plugin_notify_enable_ntfy: bool = False plugin_notify_base_url: str = "" plugin_notify_access_token: str = "" plugin_notify_prefix: str = "kona-notice-" config = get_plugin_config(NotifyPluginConfig) async def send_notify_to_ntfy_instance(msg: str, channel: str): if not config.plugin_notify_enable_ntfy: return url = f"{config.plugin_notify_base_url}/{channel}" async with aiohttp.ClientSession() as session: session.headers["Authorization"] = f"Bearer {config.plugin_notify_access_token}" session.headers["Title"] = "🔔 此方 BOT 提醒" async with session.post(url, data=msg) as response: logger.info(f"访问 {url} 的结果是 {response.status}") def load_notify_config() -> NotifyConfigFile: if not DATA_FILE_PATH.exists(): return NotifyConfigFile() try: return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text("utf-8")) except Exception as e: logger.warning(f"在解析 Notify 时遇到问题:{e}") return NotifyConfigFile() def save_notify_config(config: NotifyConfigFile): DATA_FILE_PATH.write_text(config.model_dump_json(indent=4), "utf-8") @evt.handle() async def _(msg: UniMsg, mEvt: Event, target: DepLongTaskTarget): if mEvt.get_user_id() in nonebot.get_bots(): return text = msg.extract_plain_text() if "提醒我" not in text: return segments = text.split("提醒我", maxsplit=1) if len(segments) != 2: return notify_time, notify_text = segments try: target_time = ptimeparse.Parser().parse(notify_time) logger.info(f"从 {notify_time} 解析出了时间:{target_time}") except Exception: logger.info(f"无法从 {notify_time} 中解析出时间") return if not notify_text: return await create_longtask( LONG_TASK_NAME, { "message": notify_text }, target, target_time, ) await target.send_message( UniMessage().text(f"了解啦!将会在 {target_time.strftime(FMT_STRING)} 提醒你哦~") ) logger.info(f"创建了一条于 {target_time} 的代办提醒") driver = nonebot.get_driver() @handle_long_task("TASK_SIMPLE_NOTIFY") async def _(task: LongTask): message = task.data["message"] await task.target.send_message( UniMessage().text(f"代办提醒:{message}") ) async with DATA_FILE_LOCK: data = load_notify_config() if (chan := data.notify_channels.get(task.target.target_id)) is not None: await send_notify_to_ntfy_instance(message, chan) save_notify_config(data) USER_CHECKOUT_TASK_CACHE: dict[str, dict[str, str]] = {} cmd_check_notify_list = on_alconna(Alconna( "re:(?:我有哪些|查询)(?:提醒|代办)", Args["page", int, 1] )) @cmd_check_notify_list.handle() async def _(page: int, target: DepLongTaskTarget): if page <= 0: await target.send_message(UniMessage().text("页数应该大于 0 吧")) return async with longtask_data() as data: tasks = data.to_handle.get(LONG_TASK_NAME, {}).values() tasks = [t for t in tasks if t.target.target_id == target.target_id] tasks = sorted(tasks, key=lambda t: t.deadline) pages = ceil(len(tasks) / PAGE_SIZE) if page > pages: await target.send_message(UniMessage().text(f"最多也就 {pages} 页啦!")) tasks = tasks[(page - 1) * PAGE_SIZE: page * PAGE_SIZE] message = "你可以输入「删除提醒 序号」来删除一个提醒\n====== 代办清单 ======\n\n" to_cache = {} if len(tasks) == 0: message += "空空如也\n" else: for i, task in enumerate(tasks): to_cache[str(i + 1)] = task.uuid message += f"{i + 1}) {task.data['message']}({task.deadline.strftime(FMT_STRING)})\n" message += f"\n==== 第 {page} 页,共 {pages} 页 ====" USER_CHECKOUT_TASK_CACHE[target.target_id] = to_cache await target.send_message(UniMessage().text(message)) cmd_remove_task = on_alconna(Alconna( "re:删除(?:提醒|代办)", Args["checker", str], )) @cmd_remove_task.handle() async def _(checker: str, target: DepLongTaskTarget): if target.target_id not in USER_CHECKOUT_TASK_CACHE: await target.send_message(UniMessage().text( "先用「查询提醒」来查询你有哪些提醒吧" )) return if checker not in USER_CHECKOUT_TASK_CACHE[target.target_id]: await target.send_message(UniMessage().text( "没有这个任务哦,请检查一下吧" )) uuid = USER_CHECKOUT_TASK_CACHE[target.target_id][checker] async with longtask_data() as data: if uuid not in data.to_handle[LONG_TASK_NAME]: await target.send_message(UniMessage().text( "似乎这个提醒已经发出去了,或者已经被删除" )) return _msg = data.to_handle[LONG_TASK_NAME][uuid].data["message"] del data.to_handle[LONG_TASK_NAME][uuid] await target.send_message(UniMessage().text( f"成功取消了提醒:{_msg}" )) cmd_notify_channel = on_alconna(Alconna( "ntfy", Subcommand("删除", dest="delete"), Subcommand("创建", Args["notify_id?", str], dest="create"), ), rule=lambda: config.plugin_notify_enable_ntfy) @cmd_notify_channel.assign("$main") async def _(target: DepLongTaskTarget): async with DATA_FILE_LOCK: data = load_notify_config() target_channel = data.notify_channels.get(target.target_id) if target_channel is None: channel_msg = "目前还没有配置 ntfy 地址" else: channel_msg = f"配置的 ntfy Channel 为:{target_channel}\n\n服务器地址:{config.plugin_notify_base_url}" await target.send_message(UniMessage.text( f"{channel_msg}\n\n" "配置 ntfy 通知:\n\n" "- ntfy 创建: 启用 ntfy 通知,并为你随机生成一个通知渠道\n" "- ntfy 删除:禁用 ntfy 通知\n" )) @cmd_notify_channel.assign("create") async def _(target: DepLongTaskTarget, notify_id: str = ""): if notify_id == "": notify_id = nanoid.generate( alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz-", size=16, ) channel_name = f"{config.plugin_notify_prefix}{notify_id}" async with DATA_FILE_LOCK: data = load_notify_config() data.notify_channels[target.target_id] = channel_name save_notify_config(data) await target.send_message(UniMessage.text( f"了解!将会在 {channel_name} 为你提醒!\n" "\n" "食用教程:在你的手机端 / 网页端 ntfy 点击「订阅主题」,选择「使用其他服务器」," f"服务器填写 {config.plugin_notify_base_url} ,主题名填写 {channel_name}\n" f"最后点击订阅,就能看到我给你发的消息啦!" )) await send_notify_to_ntfy_instance( "如果你看到这条消息,说明你已经成功订阅主题!此方 BOT 将会在这里提醒你你的代办!", channel_name, ) @cmd_notify_channel.assign("delete") async def _(target: DepLongTaskTarget): async with DATA_FILE_LOCK: data = load_notify_config() del data.notify_channels[target.target_id] save_notify_config(data) await target.send_message(UniMessage.text("ok."))