"""Simple reminder ("提醒我") plugin with optional ntfy push notifications.

Users create a reminder by sending ``<time expression>提醒我<text>``; the
reminder is stored as a long task and delivered when it is due.  Pending
reminders can be listed and deleted by index, and each target may bind an ntfy
channel that receives a copy of every delivered reminder.
"""

import asyncio as asynkio
from math import ceil
from pathlib import Path
from typing import Any

import aiohttp
import nanoid
import nonebot
from loguru import logger
from nonebot import get_plugin_config, on_message
from nonebot.adapters import Event
from nonebot_plugin_alconna import (
    Alconna,
    Args,
    Subcommand,
    UniMessage,
    UniMsg,
    on_alconna,
)
from pydantic import BaseModel

from konabot.common.longtask import (
    DepLongTaskTarget,
    LongTask,
    create_longtask,
    handle_long_task,
    longtask_data,
)
from konabot.common.ptimeparse import Parser

evt = on_message()

# Make sure the data directory exists before anything reads or writes in it.
(Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True)
DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "notify.json"
# Serializes load/modify/save cycles on DATA_FILE_PATH within this module.
DATA_FILE_LOCK = asynkio.Lock()

ASYNK_TASKS: set[asynkio.Task[Any]] = set()
LONG_TASK_NAME = "TASK_SIMPLE_NOTIFY"
# Number of reminders shown per page by the list command.
PAGE_SIZE = 6
FMT_STRING = "%Y年%m月%d日 %H:%M:%S"


class NotifyConfigFile(BaseModel):
    """On-disk schema of ``notify.json``."""

    version: int = 2
    # Maps a target id to the ntfy channel name bound to it.
    notify_channels: dict[str, str] = {}


class NotifyPluginConfig(BaseModel):
    """Plugin settings read through ``get_plugin_config``."""

    plugin_notify_enable_ntfy: bool = False
    plugin_notify_base_url: str = ""
    plugin_notify_access_token: str = ""
    plugin_notify_prefix: str = "kona-notice-"


config = get_plugin_config(NotifyPluginConfig)


async def send_notify_to_ntfy_instance(msg: str, channel: str):
    """POST ``msg`` to ``channel`` on the configured ntfy server.

    Does nothing when ntfy support is disabled in the plugin config.  The
    HTTP status of the response is logged; it is not otherwise checked.

    Args:
        msg: Body of the notification.
        channel: Channel name appended to the configured base URL.
    """
    if not config.plugin_notify_enable_ntfy:
        return

    url = f"{config.plugin_notify_base_url}/{channel}"
    async with aiohttp.ClientSession() as session:
        session.headers["Authorization"] = f"Bearer {config.plugin_notify_access_token}"
        session.headers["Title"] = "🔔 此方 BOT 提醒"
        async with session.post(url, data=msg) as response:
            logger.info(f"访问 {url} 的结果是 {response.status}")


def load_notify_config() -> NotifyConfigFile:
    """Load the notify data file, falling back to defaults.

    Returns:
        The parsed file, or a default ``NotifyConfigFile`` when the file does
        not exist or cannot be read/validated (the failure is logged).
    """
    if not DATA_FILE_PATH.exists():
        return NotifyConfigFile()
    try:
        return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text("utf-8"))
    except Exception as e:
        # Best effort: a broken data file must not take the plugin down.
        logger.warning(f"在解析 Notify 时遇到问题:{e}")
        return NotifyConfigFile()


def save_notify_config(config: NotifyConfigFile):
    """Write ``config`` to the notify data file as indented JSON (UTF-8)."""
    DATA_FILE_PATH.write_text(config.model_dump_json(indent=4), "utf-8")


@evt.handle()
async def _(msg: UniMsg, mEvt: Event, target: DepLongTaskTarget):
    """Create a reminder from a message shaped like ``<time>提醒我<text>``.

    Messages are ignored silently when they come from one of the bots, do not
    contain the keyword, carry an unparseable time, or have an empty text.
    """
    # Ignore messages sent by a bot account.
    if mEvt.get_user_id() in nonebot.get_bots():
        return

    text = msg.extract_plain_text()
    if "提醒我" not in text:
        return

    # Split on the first keyword only: the left part is the time expression,
    # everything after it is the reminder text.
    segments = text.split("提醒我", maxsplit=1)
    if len(segments) != 2:
        return

    notify_time, notify_text = segments
    try:
        target_time = Parser().parse(notify_time)
        logger.info(f"从 {notify_time} 解析出了时间:{target_time}")
    except Exception:
        logger.info(f"无法从 {notify_time} 中解析出时间")
        return
    if not notify_text:
        return

    await create_longtask(
        LONG_TASK_NAME,
        {"message": notify_text},
        target,
        target_time,
    )

    await target.send_message(
        UniMessage().text(f"了解啦!将会在 {target_time.strftime(FMT_STRING)} 提醒你哦~")
    )
    logger.info(f"创建了一条于 {target_time} 的代办提醒")


driver = nonebot.get_driver()


@handle_long_task(LONG_TASK_NAME)
async def _(task: LongTask):
    """Deliver a due reminder to its target and, if bound, to its ntfy channel."""
    message = task.data["message"]
    await task.target.send_message(
        UniMessage().text(f"代办提醒:{message}")
    )
    async with DATA_FILE_LOCK:
        data = load_notify_config()
        if (chan := data.notify_channels.get(task.target.target_id)) is not None:
            await send_notify_to_ntfy_instance(message, chan)
        # NOTE(review): `data` is not modified above, so this write only
        # re-serializes what was loaded; kept to preserve existing behavior.
        save_notify_config(data)


# Per target id: the index shown in the most recent listing -> task uuid.
# The delete command resolves the index a user types through this mapping.
USER_CHECKOUT_TASK_CACHE: dict[str, dict[str, str]] = {}


cmd_check_notify_list = on_alconna(Alconna(
    "re:(?:我有哪些|查询)(?:提醒|代办)",
    Args["page", int, 1]
))


@cmd_check_notify_list.handle()
async def _(page: int, target: DepLongTaskTarget):
    """Send one page of the target's pending reminders, ordered by deadline.

    The indices shown are remembered in ``USER_CHECKOUT_TASK_CACHE`` so the
    delete command can resolve them afterwards.

    Args:
        page: 1-based page number.
        target: The conversation target whose reminders are listed.
    """
    if page <= 0:
        await target.send_message(UniMessage().text("页数应该大于 0 吧"))
        return

    async with longtask_data() as data:
        pending = data.to_handle.get(LONG_TASK_NAME, {}).values()
        tasks = sorted(
            (t for t in pending if t.target.target_id == target.target_id),
            key=lambda t: t.deadline,
        )

    pages = ceil(len(tasks) / PAGE_SIZE)
    if tasks and page > pages:
        # Out of range: report the bound and stop.  Falling through would send
        # a second, empty listing and wipe the cached indices of the last
        # valid listing.  With no reminders at all we skip this message and
        # show the empty list below instead.
        await target.send_message(UniMessage().text(f"最多也就 {pages} 页啦!"))
        return

    shown = tasks[(page - 1) * PAGE_SIZE: page * PAGE_SIZE]
    parts = ["你可以输入「删除提醒 序号」来删除一个提醒\n====== 代办清单 ======\n\n"]
    to_cache = {}
    if not shown:
        parts.append("空空如也\n")
    else:
        # Indices restart at 1 on every page; the cache holds this page only.
        for index, task in enumerate(shown, start=1):
            to_cache[str(index)] = task.uuid
            parts.append(
                f"{index}) {task.data['message']}({task.deadline.strftime(FMT_STRING)})\n"
            )
    parts.append(f"\n==== 第 {page} 页,共 {pages} 页 ====")

    USER_CHECKOUT_TASK_CACHE[target.target_id] = to_cache
    await target.send_message(UniMessage().text("".join(parts)))


cmd_remove_task = on_alconna(Alconna(
    "re:删除(?:提醒|代办)",
    Args["checker", str],
))


@cmd_remove_task.handle()
async def _(checker: str, target: DepLongTaskTarget):
    """Delete the reminder shown under index ``checker`` in the last listing.

    Args:
        checker: Index string as displayed by the list command.
        target: The conversation target issuing the command.
    """
    if target.target_id not in USER_CHECKOUT_TASK_CACHE:
        await target.send_message(UniMessage().text(
            "先用「查询提醒」来查询你有哪些提醒吧"
        ))
        return

    cached = USER_CHECKOUT_TASK_CACHE[target.target_id]
    if checker not in cached:
        await target.send_message(UniMessage().text(
            "没有这个任务哦,请检查一下吧"
        ))
        # Without this return the lookup below raises KeyError.
        return

    uuid = cached[checker]
    async with longtask_data() as data:
        # Use .get() like the list command does, so a store without any task
        # of this kind is handled the same way as a missing uuid.
        pending = data.to_handle.get(LONG_TASK_NAME, {})
        if uuid not in pending:
            await target.send_message(UniMessage().text(
                "似乎这个提醒已经发出去了,或者已经被删除"
            ))
            return
        _msg = pending[uuid].data["message"]
        del pending[uuid]

    await target.send_message(UniMessage().text(
        f"成功取消了提醒:{_msg}"
    ))


cmd_notify_channel = on_alconna(Alconna(
    "ntfy",
    Subcommand("删除", dest="delete"),
    Subcommand("创建", Args["notify_id?", str], dest="create"),
), rule=lambda: config.plugin_notify_enable_ntfy)


@cmd_notify_channel.assign("$main")
async def _(target: DepLongTaskTarget):
    """Show the target's current ntfy binding together with usage help."""
    async with DATA_FILE_LOCK:
        data = load_notify_config()

    target_channel = data.notify_channels.get(target.target_id)
    if target_channel is None:
        channel_msg = "目前还没有配置 ntfy 地址"
    else:
        channel_msg = f"配置的 ntfy Channel 为:{target_channel}\n\n服务器地址:{config.plugin_notify_base_url}"

    await target.send_message(UniMessage.text(
        f"{channel_msg}\n\n"
        "配置 ntfy 通知:\n\n"
        "- ntfy 创建: 启用 ntfy 通知,并为你随机生成一个通知渠道\n"
        "- ntfy 删除:禁用 ntfy 通知\n"
    ))


@cmd_notify_channel.assign("create")
async def _(target: DepLongTaskTarget, notify_id: str = ""):
    """Bind an ntfy channel to the target and send a test notification.

    Args:
        target: The conversation target to bind.
        notify_id: Channel suffix; a random 16-character id is generated when
            it is empty.  The configured prefix is prepended in either case.
    """
    if notify_id == "":
        notify_id = nanoid.generate(
            alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz-",
            size=16,
        )

    channel_name = f"{config.plugin_notify_prefix}{notify_id}"

    async with DATA_FILE_LOCK:
        data = load_notify_config()
        data.notify_channels[target.target_id] = channel_name
        save_notify_config(data)

    await target.send_message(UniMessage.text(
        f"了解!将会在 {channel_name} 为你提醒!\n"
        "\n"
        "食用教程:在你的手机端 / 网页端 ntfy 点击「订阅主题」,选择「使用其他服务器」,"
        f"服务器填写 {config.plugin_notify_base_url} ,主题名填写 {channel_name}\n"
        f"最后点击订阅,就能看到我给你发的消息啦!"
    ))

    await send_notify_to_ntfy_instance(
        "如果你看到这条消息,说明你已经成功订阅主题!此方 BOT 将会在这里提醒你你的代办!",
        channel_name,
    )


@cmd_notify_channel.assign("delete")
async def _(target: DepLongTaskTarget):
    """Remove the target's ntfy binding; a no-op when none is configured."""
    async with DATA_FILE_LOCK:
        data = load_notify_config()
        # pop() with a default instead of `del`: deleting a binding that was
        # never created must not raise KeyError.
        data.notify_channels.pop(target.target_id, None)
        save_notify_config(data)
    await target.send_message(UniMessage.text("ok."))