Files
konabot/konabot/plugins/simple_notify/__init__.py
passthem cdfb822f42
All checks were successful
continuous-integration/drone/push Build is passing
把所有代办都换成待办
2025-12-07 23:11:21 +08:00

251 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import aiohttp
import asyncio as asynkio
from math import ceil
from pathlib import Path
from typing import Any
import nanoid
from nonebot.rule import KeywordsRule, Rule
from konabot.plugins.notice_ui.notice import NoticeUI
import nonebot
from loguru import logger
from nonebot import get_plugin_config, on_message
from nonebot.adapters import Event
from nonebot_plugin_alconna import Alconna, Args, Subcommand, UniMessage, UniMsg, on_alconna
from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget, LongTask, create_longtask, handle_long_task, longtask_data
from konabot.plugins.simple_notify.ask_llm import ask_ai
evt = on_message(rule=Rule(KeywordsRule("提醒我")))
(Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True)
DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "notify.json"
DATA_FILE_LOCK = asynkio.Lock()
ASYNK_TASKS: set[asynkio.Task[Any]] = set()
LONG_TASK_NAME = "TASK_SIMPLE_NOTIFY"
PAGE_SIZE = 6
FMT_STRING = "%Y年%m月%d%H:%M:%S"
class NotifyConfigFile(BaseModel):
version: int = 2
notify_channels: dict[str, str] = {}
class NotifyPluginConfig(BaseModel):
plugin_notify_enable_ntfy: bool = False
plugin_notify_base_url: str = ""
plugin_notify_access_token: str = ""
plugin_notify_prefix: str = "kona-notice-"
config = get_plugin_config(NotifyPluginConfig)
async def send_notify_to_ntfy_instance(msg: str, channel: str):
if not config.plugin_notify_enable_ntfy:
return
url = f"{config.plugin_notify_base_url}/{channel}"
async with aiohttp.ClientSession() as session:
session.headers["Authorization"] = f"Bearer {config.plugin_notify_access_token}"
session.headers["Title"] = "🔔 此方 BOT 提醒"
async with session.post(url, data=msg) as response:
logger.info(f"访问 {url} 的结果是 {response.status}")
def load_notify_config() -> NotifyConfigFile:
if not DATA_FILE_PATH.exists():
return NotifyConfigFile()
try:
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text("utf-8"))
except Exception as e:
logger.warning(f"在解析 Notify 时遇到问题:{e}")
return NotifyConfigFile()
def save_notify_config(config: NotifyConfigFile):
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4), "utf-8")
@evt.handle()
async def _(msg: UniMsg, mEvt: Event, target: DepLongTaskTarget):
if mEvt.get_user_id() in nonebot.get_bots():
return
text = msg.extract_plain_text()
segments = text.split("提醒我", maxsplit=1)
if len(segments) != 2:
return
target_time, notify_text = await ask_ai(text)
if target_time is None:
return
await create_longtask(
LONG_TASK_NAME,
{ "message": notify_text },
target,
target_time,
)
await target.send_message(
UniMessage().text(f"了解啦!将会在 {target_time.strftime(FMT_STRING)} 提醒你哦~")
)
logger.info(f"创建了一条于 {target_time} 的待办提醒")
driver = nonebot.get_driver()
@handle_long_task("TASK_SIMPLE_NOTIFY")
async def _(task: LongTask):
message = task.data["message"]
await task.target.send_message(
UniMessage().text(f"待办提醒:{message}")
)
notice_bytes = await NoticeUI.render_notice("待办提醒", message)
await task.target.send_message(
UniMessage().image(raw=notice_bytes),
at=False
)
async with DATA_FILE_LOCK:
data = load_notify_config()
if (chan := data.notify_channels.get(task.target.target_id)) is not None:
await send_notify_to_ntfy_instance(message, chan)
save_notify_config(data)
USER_CHECKOUT_TASK_CACHE: dict[str, dict[str, str]] = {}
cmd_check_notify_list = on_alconna(Alconna(
"re:(?:我有哪些|查询)(?:提醒|待办)",
Args["page", int, 1]
))
@cmd_check_notify_list.handle()
async def _(page: int, target: DepLongTaskTarget):
if page <= 0:
await target.send_message(UniMessage().text("页数应该大于 0 吧"))
return
async with longtask_data() as data:
tasks = data.to_handle.get(LONG_TASK_NAME, {}).values()
tasks = [t for t in tasks if t.target.target_id == target.target_id]
tasks = sorted(tasks, key=lambda t: t.deadline)
pages = ceil(len(tasks) / PAGE_SIZE)
if page > pages:
await target.send_message(UniMessage().text(f"最多也就 {pages} 页啦!"))
tasks = tasks[(page - 1) * PAGE_SIZE: page * PAGE_SIZE]
message = "你可以输入「删除提醒 序号」来删除一个提醒\n====== 待办清单 ======\n\n"
to_cache = {}
if len(tasks) == 0:
message += "空空如也\n"
else:
for i, task in enumerate(tasks):
to_cache[str(i + 1)] = task.uuid
message += f"{i + 1}) {task.data['message']}{task.deadline.strftime(FMT_STRING)}\n"
message += f"\n==== 第 {page} 页,共 {pages} 页 ===="
USER_CHECKOUT_TASK_CACHE[target.target_id] = to_cache
await target.send_message(UniMessage().text(message))
cmd_remove_task = on_alconna(Alconna(
"re:删除(?:提醒|待办)",
Args["checker", str],
))
@cmd_remove_task.handle()
async def _(checker: str, target: DepLongTaskTarget):
if target.target_id not in USER_CHECKOUT_TASK_CACHE:
await target.send_message(UniMessage().text(
"先用「查询提醒」来查询你有哪些提醒吧"
))
return
if checker not in USER_CHECKOUT_TASK_CACHE[target.target_id]:
await target.send_message(UniMessage().text(
"没有这个任务哦,请检查一下吧"
))
uuid = USER_CHECKOUT_TASK_CACHE[target.target_id][checker]
async with longtask_data() as data:
if uuid not in data.to_handle[LONG_TASK_NAME]:
await target.send_message(UniMessage().text(
"似乎这个提醒已经发出去了,或者已经被删除"
))
return
_msg = data.to_handle[LONG_TASK_NAME][uuid].data["message"]
del data.to_handle[LONG_TASK_NAME][uuid]
await target.send_message(UniMessage().text(
f"成功取消了提醒:{_msg}"
))
cmd_notify_channel = on_alconna(Alconna(
"ntfy",
Subcommand("删除", dest="delete"),
Subcommand("创建", Args["notify_id?", str], dest="create"),
), rule=lambda: config.plugin_notify_enable_ntfy)
@cmd_notify_channel.assign("$main")
async def _(target: DepLongTaskTarget):
async with DATA_FILE_LOCK:
data = load_notify_config()
target_channel = data.notify_channels.get(target.target_id)
if target_channel is None:
channel_msg = "目前还没有配置 ntfy 地址"
else:
channel_msg = f"配置的 ntfy Channel 为:{target_channel}\n\n服务器地址:{config.plugin_notify_base_url}"
await target.send_message(UniMessage.text(
f"{channel_msg}\n\n"
"配置 ntfy 通知:\n\n"
"- ntfy 创建: 启用 ntfy 通知,并为你随机生成一个通知渠道\n"
"- ntfy 删除:禁用 ntfy 通知\n"
))
@cmd_notify_channel.assign("create")
async def _(target: DepLongTaskTarget, notify_id: str = ""):
if notify_id == "":
notify_id = nanoid.generate(
alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz-",
size=16,
)
channel_name = f"{config.plugin_notify_prefix}{notify_id}"
async with DATA_FILE_LOCK:
data = load_notify_config()
data.notify_channels[target.target_id] = channel_name
save_notify_config(data)
await target.send_message(UniMessage.text(
f"了解!将会在 {channel_name} 为你提醒!\n"
"\n"
"食用教程:在你的手机端 / 网页端 ntfy 点击「订阅主题」,选择「使用其他服务器」,"
f"服务器填写 {config.plugin_notify_base_url} ,主题名填写 {channel_name}\n"
f"最后点击订阅,就能看到我给你发的消息啦!"
))
await send_notify_to_ntfy_instance(
"如果你看到这条消息,说明你已经成功订阅主题!此方 BOT 将会在这里提醒你你的待办!",
channel_name,
)
@cmd_notify_channel.assign("delete")
async def _(target: DepLongTaskTarget):
async with DATA_FILE_LOCK:
data = load_notify_config()
del data.notify_channels[target.target_id]
save_notify_config(data)
await target.send_message(UniMessage.text("ok."))