Compare commits

..

28 Commits

Author SHA1 Message Date
e0c55545ec 添加此方提醒的 CURD 和 ntfy 联动
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 05:08:54 +08:00
164305e81f 调整 man 2025-10-24 02:27:56 +08:00
96679033f3 不再有 fortune
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 02:21:56 +08:00
afda0680ec 调整衰减函数
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:59:41 +08:00
021133954e 调整 man 默认范围
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:33:59 +08:00
7baa04dbc2 添加罗文提示 2025-10-24 01:33:01 +08:00
e55bdbdf4a 怪话不可为空!!!
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:27:40 +08:00
a30c7b8093 添加怪话过滤功能
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:21:54 +08:00
3da2c2266f 说怪话 bot
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-24 01:08:46 +08:00
96e3c3fe17 让 Onebot private channel 也有 ID 2025-10-24 00:46:05 +08:00
851c9eb3c7 修复程序退出耗时太久的问题 2025-10-24 00:01:13 +08:00
11269b2a5a 在罗文被念错时提醒他 2025-10-23 23:32:56 +08:00
875e0efc2f Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-23 22:12:41 +08:00
4f43312663 升级 ptimeparse 2025-10-23 22:12:34 +08:00
b2f4768573 优化判定与计分规则
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-21 23:52:05 +08:00
bc6263ec31 Merge pull request '添加安安展示' (#27) from tnot/konabot:添加安安展示 into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #27
2025-10-21 22:07:19 +08:00
bc9d025836 修好了bug的安安展示 2025-10-21 22:02:41 +08:00
b552aacf89 添加安安展示 2025-10-21 21:33:32 +08:00
f9a0249772 优化 giftool 的截取逻辑
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-21 18:31:14 +08:00
c94db33b11 更新 ptimeparse 到 0.2.0
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 22:45:34 +08:00
67382a0c0a 在我写的模块采用更安全的 asyncio 锁写法
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 20:27:18 +08:00
fd4c9302c2 async with lock
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 20:24:47 +08:00
f30ad0cb7d 判定部分优化
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 18:48:10 +08:00
f7afe48680 精度修复
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 18:36:27 +08:00
b42385f780 修复成语接龙
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 18:24:03 +08:00
6cae38dea9 提升 LongTask 的健壮性
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 16:54:59 +08:00
8594b59783 修复 LongTask 在 Discord 和控制台无法正确返回是否顺利完成任务的问题 2025-10-19 16:51:22 +08:00
f768c91430 完善 LongTask 模块
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 16:47:50 +08:00
20 changed files with 765 additions and 376 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 841 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 821 KiB

6
bot.py
View File

@ -20,9 +20,13 @@ env_enable_minecraft = os.environ.get("ENABLE_MINECRAFT", "none")
def main(): def main():
if env.upper() == 'DEBUG' or env.upper() == 'DEV':
console_log_level = 'DEBUG'
else:
console_log_level = 'INFO'
init_logger(LOG_PATH, [ init_logger(LOG_PATH, [
BotExceptionMessage, BotExceptionMessage,
]) ], console_log_level=console_log_level)
nonebot.init() nonebot.init()

View File

@ -18,7 +18,7 @@ def file_exception_filter(
否则,返回 True允许记录 否则,返回 True允许记录
""" """
exception_info = record.get("exception") exception_info = record.get("exception")
if exception_info: if exception_info:
exception_type = exception_info[0] exception_type = exception_info[0]
@ -29,8 +29,9 @@ def file_exception_filter(
def init_logger( def init_logger(
log_dir: Path, log_dir: Path,
ignored_exceptions: List[Type[Exception]] ignored_exceptions: List[Type[Exception]],
console_log_level: str = "INFO",
) -> None: ) -> None:
""" """
配置全局 Loguru Logger。 配置全局 Loguru Logger。
@ -47,7 +48,7 @@ def init_logger(
logger.add( logger.add(
sys.stderr, sys.stderr,
level="INFO", level=console_log_level,
colorize=True, colorize=True,
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>", format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
) )
@ -76,4 +77,4 @@ def init_logger(
) )
logger.info("Loguru Logger 初始化完成!") logger.info("Loguru Logger 初始化完成!")
logger.info(f"控制台日志级别: INFO") logger.info(f"控制台日志级别: {console_log_level}")

View File

@ -1,3 +1,5 @@
from __future__ import annotations
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
import datetime import datetime
import json import json
@ -23,6 +25,7 @@ from pydantic import BaseModel, ValidationError
from .path import DATA_PATH from .path import DATA_PATH
LONGTASK_DATA_DIR = DATA_PATH / "longtasks.json" LONGTASK_DATA_DIR = DATA_PATH / "longtasks.json"
QQ_PRIVATE_CHAT_CHANNEL_PREFIX = "_CHANNEL_QQ_PRIVATE_"
class LongTaskTarget(BaseModel): class LongTaskTarget(BaseModel):
@ -63,7 +66,7 @@ class LongTaskTarget(BaseModel):
} BOT_CLASS={bot.__class__.__name__}" } BOT_CLASS={bot.__class__.__name__}"
) )
return False return False
if self.channel_id == "": if self.channel_id.startswith(QQ_PRIVATE_CHAT_CHANNEL_PREFIX) or not self.channel_id.strip():
# 私聊模式 # 私聊模式
await bot.send_private_msg( await bot.send_private_msg(
user_id=int(self.target_id), user_id=int(self.target_id),
@ -89,6 +92,7 @@ class LongTaskTarget(BaseModel):
) )
return False return False
await bot.send_message(self.channel_id, cast(Any, await msg.export())) await bot.send_message(self.channel_id, cast(Any, await msg.export()))
return True
if self.platform == "discord": if self.platform == "discord":
if not isinstance(bot, DCBot): if not isinstance(bot, DCBot):
logger.warning( logger.warning(
@ -104,6 +108,7 @@ class LongTaskTarget(BaseModel):
), ),
tts=False, tts=False,
) )
return True
logger.warning(f"没有一个平台是期望的平台 PLATFORM={self.platform}") logger.warning(f"没有一个平台是期望的平台 PLATFORM={self.platform}")
return False return False
@ -111,21 +116,21 @@ class LongTaskTarget(BaseModel):
class LongTask(BaseModel): class LongTask(BaseModel):
uuid: str uuid: str
data_json: str data_json: str
target: "LongTaskTarget" target: LongTaskTarget
callback: str callback: str
deadline: datetime.datetime deadline: datetime.datetime
canceled: bool = False
_aio_task: asynkio.Task | None = None _aio_task: asynkio.Task | None = None
async def run(self): async def run(self):
now = datetime.datetime.now() now = datetime.datetime.now()
if self.deadline < now and not self.canceled: if self.deadline < now:
await self._run_task() await self._run_task()
return return
await asynkio.sleep((self.deadline - now).total_seconds()) await asynkio.sleep((self.deadline - now).total_seconds())
if self.canceled: async with longtask_data() as data:
return if self.uuid not in data.to_handle[self.callback]:
return
await self._run_task() await self._run_task()
async def _run_task(self): async def _run_task(self):
@ -135,19 +140,27 @@ class LongTask(BaseModel):
f"Callback {self.callback} 未曾被注册,但是被期待调用,已忽略" f"Callback {self.callback} 未曾被注册,但是被期待调用,已忽略"
) )
async with longtask_data() as datafile: async with longtask_data() as datafile:
datafile.to_handle[self.callback] = [ del datafile.to_handle[self.callback][self.uuid]
t
for t in datafile.to_handle.get(self.callback, [])
if t.uuid != self.uuid
]
datafile.unhandled.setdefault(self.callback, []).append(self) datafile.unhandled.setdefault(self.callback, []).append(self)
return return
await hdl(self) success = False
try:
await hdl(self)
success = True
except Exception as e:
logger.exception(e)
async with longtask_data() as datafile: async with longtask_data() as datafile:
datafile.to_handle[self.callback] = [ del datafile.to_handle[self.callback][self.uuid]
t for t in datafile.to_handle[self.callback] if t.uuid != self.uuid if not success:
] datafile.unhandled.setdefault(self.callback, []).append(self)
logger.info(
f"LongTask 执行失败 UUID={self.uuid} callback={self.callback}"
)
else:
logger.info(
f"LongTask 工作完成 UUID={self.uuid} callback={self.callback}"
)
def clean(self): def clean(self):
self._aio_task = None self._aio_task = None
@ -162,7 +175,7 @@ class LongTask(BaseModel):
class LongTaskModuleData(BaseModel): class LongTaskModuleData(BaseModel):
to_handle: dict[str, list[LongTask]] to_handle: dict[str, dict[str, LongTask]]
unhandled: dict[str, list[LongTask]] unhandled: dict[str, list[LongTask]]
@ -178,7 +191,7 @@ async def get_long_task_target(event: BaseEvent, bot: BaseBot) -> LongTaskTarget
return LongTaskTarget( return LongTaskTarget(
platform="qq", platform="qq",
self_id=str(event.self_id), self_id=str(event.self_id),
channel_id="", channel_id=f"{QQ_PRIVATE_CHAT_CHANNEL_PREFIX}{event.self_id}",
target_id=str(event.user_id), target_id=str(event.user_id),
) )
if isinstance(event, ConsoleMessageEvent): if isinstance(event, ConsoleMessageEvent):
@ -236,11 +249,10 @@ def _save_longtask_data(data: LongTaskModuleData):
@asynccontextmanager @asynccontextmanager
async def longtask_data(): async def longtask_data():
await longtask_lock.acquire() async with longtask_lock:
data = _load_longtask_data() data = _load_longtask_data()
yield data yield data
_save_longtask_data(data) _save_longtask_data(data)
longtask_lock.release()
async def create_longtask( async def create_longtask(
@ -257,19 +269,27 @@ async def create_longtask(
deadline=deadline, deadline=deadline,
) )
logger.info(f"创建了新的 LongTask UUID={task.uuid} CALLBACK={task.callback}")
await task.start() await task.start()
async with longtask_data() as d: async with longtask_data() as d:
d.to_handle.setdefault(handler, []).append(task) d.to_handle.setdefault(handler, {})[task.uuid] = task
return task return task
async def init_longtask(): async def init_longtask():
counter = 0
req: set[str] = set()
async with longtask_data() as data: async with longtask_data() as data:
for v in data.to_handle.values(): for v in data.to_handle.values():
for t in v: for t in v.values():
await t.start() await t.start()
counter += 1
req.add(t.callback)
logger.info(f"LongTask 启动了任务 数量={counter} 期望的门类=[{','.join(req)}]")
DepLongTaskTarget = Annotated[LongTaskTarget, Depends(get_long_task_target)] DepLongTaskTarget = Annotated[LongTaskTarget, Depends(get_long_task_target)]

View File

@ -0,0 +1,2 @@
关于罗文和洛温:
AdoreLowen 希望和洛温阿特金森区分,所以最好就不要叫他洛温了!此方 BOT 会在一些群提醒叫错了的人。

View File

@ -0,0 +1,15 @@
指令介绍
ntfy - 配置使用 ntfy 来更好地为你通知此方 BOT 代办
指令示例
`ntfy 创建`
创建一个随机的 ntfy 订阅主题来提醒代办,此方 Bot 将会给你使用指引。你可以前往 https://ntfy.sh/ 官网下载 ntfy APP或者使用网页版 ntfy。
`ntfy 创建 kagami-notice`
创建一个名字含有 kagami-notice 的 ntfy 订阅主题
`ntfy 删除`
清除并不再使用 ntfy 向你通知
另见
提醒我(1) 查询提醒(1) 删除提醒(1)

View File

@ -0,0 +1,8 @@
指令介绍
删除提醒 - 删除在`查询提醒(1)`中查到的提醒
指令示例
`删除提醒 1` 在查询提醒后,删除编号为 1 的提醒
另见
提醒我(1) 查询提醒(1) ntfy(1)

View File

@ -0,0 +1,15 @@
指令介绍
提醒我 - 在指定的时间提醒人事项的工具
使用示例
`下午五点提醒我吃饭`
创建一个下午五点的提醒,提醒你吃饭
`两分钟后提醒我睡觉`
创建一个相对于现在推迟 2 分钟的提醒,提醒你睡觉
`2026年4月25日20点整提醒我生日快乐`
创建一个指定日期和时间的提醒
另见
查询提醒(1) 删除提醒(1) ntfy(1)

View File

@ -0,0 +1,9 @@
指令介绍
查询提醒 - 查询已经创建的提醒
指令格式
`查询提醒` 查询提醒
`查询提醒 2` 查询第二页提醒
另见
提醒我(1) 删除提醒(1) ntfy(1)

View File

@ -1,22 +1,18 @@
import asyncio as asynkio import asyncio as asynkio
import base64
from pathlib import Path
import secrets
import json
import datetime import datetime
from typing import Literal, Optional import json
import secrets
from enum import Enum from enum import Enum
from pathlib import Path
from typing import Optional
from loguru import logger from loguru import logger
from nonebot import on_message from nonebot import on_message
from nonebot.adapters import Event as BaseEvent from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import ( from nonebot_plugin_alconna import (
Alconna, Alconna,
Args, Args,
Field,
Subcommand,
UniMessage, UniMessage,
UniMsg, UniMsg,
on_alconna, on_alconna,
@ -72,17 +68,19 @@ class TryStopState(Enum):
class TryVerifyState(Enum): class TryVerifyState(Enum):
VERIFIED = 0 VERIFIED = 0
NOT_IDIOM = 1 VERIFIED_AND_REAL = 1
WRONG_FIRST_CHAR = 2 NOT_IDIOM = 2
VERIFIED_BUT_NO_NEXT = 3 WRONG_FIRST_CHAR = 3
VERIFIED_GAME_END = 4 VERIFIED_BUT_NO_NEXT = 4
VERIFIED_GAME_END = 5
class IdiomGame: class IdiomGame:
ALL_WORDS = [] # 所有四字词语 ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语 ALL_IDIOMS = [] # 所有成语
INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例 INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例
IDIOM_FIRST_CHAR = {} # 成语首字字典 IDIOM_FIRST_CHAR = {} # 所有成语包括词语的首字字典
AVALIABLE_IDIOM_FIRST_CHAR = {} # 真正有效的成语首字字典
__inited = False __inited = False
@ -161,31 +159,31 @@ class IdiomGame:
""" """
跳过当前成语,选择下一个成语 跳过当前成语,选择下一个成语
""" """
await self.lock.acquire() async with self.lock:
self._skip_idiom_async(buff_score) self._skip_idiom_async()
self.lock.release() self.add_buff_score(buff_score)
return self.last_idiom return self.last_idiom
def _skip_idiom_async(self, buff_score: int = -100) -> str: def _skip_idiom_async(self) -> str:
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS) self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_char = self.last_idiom[-1] self.last_char = self.last_idiom[-1]
self.add_buff_score(buff_score) if not self.is_nextable(self.last_char):
self._skip_idiom_async()
return self.last_idiom return self.last_idiom
async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState: async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
""" """
用户发送成语 用户发送成语
""" """
await self.lock.acquire() async with self.lock:
state = self._verify_idiom(idiom, user_id) state = self._verify_idiom(idiom, user_id)
self.lock.release()
return state return state
def is_nextable(self, last_char: str) -> bool: def is_nextable(self, last_char: str) -> bool:
""" """
判断是否有成语可以接 判断是否有成语可以接
""" """
return last_char in IdiomGame.IDIOM_FIRST_CHAR return last_char in IdiomGame.AVALIABLE_IDIOM_FIRST_CHAR
def _verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState: def _verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
# 新成语的首字应与上一条成语的尾字相同 # 新成语的首字应与上一条成语的尾字相同
@ -197,6 +195,8 @@ class IdiomGame:
self.last_idiom = idiom self.last_idiom = idiom
self.last_char = idiom[-1] self.last_char = idiom[-1]
self.add_score(user_id, 1) self.add_score(user_id, 1)
if idiom in IdiomGame.ALL_IDIOMS:
self.add_score(user_id, 4) # 再加 4 分
self.remain_rounds -= 1 self.remain_rounds -= 1
if self.remain_rounds <= 0: if self.remain_rounds <= 0:
self.now_playing = False self.now_playing = False
@ -205,12 +205,16 @@ class IdiomGame:
# 没有成语可以接了,自动跳过 # 没有成语可以接了,自动跳过
self._skip_idiom_async() self._skip_idiom_async()
return TryVerifyState.VERIFIED_BUT_NO_NEXT return TryVerifyState.VERIFIED_BUT_NO_NEXT
if idiom in IdiomGame.ALL_IDIOMS:
return TryVerifyState.VERIFIED_AND_REAL # 真实成语
return TryVerifyState.VERIFIED return TryVerifyState.VERIFIED
def get_user_score(self, user_id: str) -> int: def get_user_score(self, user_id: str) -> float:
if user_id not in self.score_board: if user_id not in self.score_board:
return 0 return 0
return self.score_board[user_id]["score"] # 避免浮点数精度问题导致过长
handled_score = round(self.score_board[user_id]["score"], 1)
return handled_score
def add_score(self, user_id: str, score: int): def add_score(self, user_id: str, score: int):
if user_id not in self.score_board: if user_id not in self.score_board:
@ -225,6 +229,13 @@ class IdiomGame:
def get_last_char(self) -> str: def get_last_char(self) -> str:
return self.last_char return self.last_char
@classmethod
def random_idiom_starting_with(cls, first_char: str) -> Optional[str]:
cls.init_lexicon()
if first_char not in cls.IDIOM_FIRST_CHAR:
return None
return secrets.choice(cls.IDIOM_FIRST_CHAR[first_char])
@classmethod @classmethod
def init_lexicon(cls): def init_lexicon(cls):
@ -290,6 +301,12 @@ class IdiomGame:
cls.IDIOM_FIRST_CHAR[idiom[0]] = [] cls.IDIOM_FIRST_CHAR[idiom[0]] = []
cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom) cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
# 根据真正的成语大表,划分出有效成语首字字典
for idiom in cls.ALL_IDIOMS:
if idiom[0] not in cls.AVALIABLE_IDIOM_FIRST_CHAR:
cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]] = []
cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
evt = on_alconna( evt = on_alconna(
Alconna( Alconna(
@ -360,8 +377,10 @@ evt = on_alconna(
@evt.handle() @evt.handle()
async def force_play_game(event: BaseEvent, rounds: Optional[int] = 100): async def force_play_game(
await play_game(event, force=True, rounds=rounds) event: BaseEvent, target: DepLongTaskTarget, rounds: Optional[int] = 100
):
await play_game(event, target, force=True, rounds=rounds)
async def end_game(event: BaseEvent, group_id: str): async def end_game(event: BaseEvent, group_id: str):
@ -379,7 +398,7 @@ async def end_game(event: BaseEvent, group_id: str):
result_text += ( result_text += (
f"{i + 1}. " f"{i + 1}. "
+ UniMessage().at(user_id) + UniMessage().at(user_id)
+ f": {info['score'] + instance.get_all_buff_score()}\n" + f": {round(info['score'] + instance.get_all_buff_score(), 1)}\n"
) )
await evt.send(await result_text.export()) await evt.send(await result_text.export())
instance.clear_score_board() instance.clear_score_board()
@ -421,7 +440,8 @@ async def _(target: DepLongTaskTarget):
instance = IdiomGame.INSTANCE_LIST.get(group_id) instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state(): if not instance or not instance.get_playing_state():
return return
await evt.send(await UniMessage().text("你们太菜了全部扣100分").export()) avaliable_idiom = IdiomGame.random_idiom_starting_with(instance.get_last_char())
await evt.send(await UniMessage().text(f"你们太菜了全部扣100分明明还可以接「{avaliable_idiom}」的!").export())
idiom = await instance.skip_idiom(-100) idiom = await instance.skip_idiom(-100)
await evt.send( await evt.send(
await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export() await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export()
@ -458,16 +478,24 @@ async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget):
await evt.send( await evt.send(
await UniMessage() await UniMessage()
.at(user_id) .at(user_id)
.text("接不上!这个不一样!你被扣了 0.1 分!") .text(" 接不上!这个不一样!你被扣了 0.1 分!")
.export() .export()
) )
return return
await evt.send( if state == TryVerifyState.VERIFIED:
await UniMessage() await evt.send(
.at(user_id) await UniMessage()
.text(f"接对了!你有 {instance.get_user_score(user_id)} 分!") .at(user_id)
.export() .text(f" 接上了,喜提 1 分!你有 {instance.get_user_score(user_id)} 分!")
) .export()
)
elif state == TryVerifyState.VERIFIED_AND_REAL:
await evt.send(
await UniMessage()
.at(user_id)
.text(f" 接上了,这是个真实成语,喜提 5 分!你有 {instance.get_user_score(user_id)} 分!")
.export()
)
if state == TryVerifyState.VERIFIED_GAME_END: if state == TryVerifyState.VERIFIED_GAME_END:
await evt.send(await UniMessage().text("全部回合结束!").export()) await evt.send(await UniMessage().text("全部回合结束!").export())
await end_game(event, group_id) await end_game(event, group_id)
@ -518,4 +546,3 @@ async def _(event: BaseEvent, target: DepLongTaskTarget):
group_id = target.channel_id group_id = target.channel_id
remove_banned_id(group_id) remove_banned_id(group_id)
await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export()) await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export())

View File

@ -1,10 +1,10 @@
import re import re
from io import BytesIO from io import BytesIO
import PIL.Image
from nonebot import on_message from nonebot import on_message
from nonebot.adapters import Bot from nonebot.adapters import Bot
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage, from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna
on_alconna)
from konabot.common.nb.exc import BotExceptionMessage from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import PIL_Image from konabot.common.nb.extract_image import PIL_Image
@ -29,15 +29,17 @@ def parse_timestamp(tx: str) -> float | None:
return res return res
cmd_giftool = on_alconna(Alconna( cmd_giftool = on_alconna(
"giftool", Alconna(
Args["img", Image | None], "giftool",
Option("--ss", Args["start_point", str]), Args["img", Image | None],
Option("--frames:v", Args["frame_count", int]), Option("--ss", Args["start_point", str]),
Option("-t", Args["length", str]), Option("--frames:v", Args["frame_count", int]),
Option("-to", Args["end_point", str]), Option("-t", Args["length", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]), Option("-to", Args["end_point", str]),
)) Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
)
)
@cmd_giftool.handle() @cmd_giftool.handle()
@ -80,81 +82,66 @@ async def _(
if not getattr(image, "is_animated", False): if not getattr(image, "is_animated", False):
raise BotExceptionMessage("错误输入的不是动图GIF") raise BotExceptionMessage("错误输入的不是动图GIF")
frames = [] ##
durations = [] # 从这里开始,采样整个 GIF 图
total_duration = 0.0 frames: list[PIL.Image.Image] = []
durations: list[float] = []
try: try:
for i in range(getattr(image, "n_frames")): for i in range(getattr(image, "n_frames")):
image.seek(i) image.seek(i)
frames.append(image.copy()) frames.append(image.copy())
duration = image.info.get("duration", 100) # 单位:毫秒 duration = image.info.get("duration", 100) / 1000
durations.append(duration) durations.append(duration)
total_duration += duration / 1000.0 # 转为秒
except EOFError: except EOFError:
pass pass
if not frames: if not frames:
raise BotExceptionMessage("错误:读取 GIF 帧失败") raise BotExceptionMessage("错误:读取 GIF 帧失败")
# 采样结束
def time_to_frame_index(target_time: float) -> int: ##
if target_time <= 0: # 根据开始、结束时间或者帧数量来裁取 GIF 图
return 0
cum = 0.0
for idx, dur in enumerate(durations):
cum += dur / 1000.0
if cum >= target_time:
return min(idx, len(frames) - 1)
return len(frames) - 1
start_frame = 0
end_frame = len(frames) - 1
if ss is not None:
start_frame = time_to_frame_index(ss)
if to is not None:
end_frame = time_to_frame_index(to)
if end_frame < start_frame:
end_frame = start_frame
elif t is not None:
end_time = (ss or 0.0) + t
end_frame = time_to_frame_index(end_time)
if end_frame < start_frame:
end_frame = start_frame
start_frame = max(0, start_frame) begin_time = ss or 0
end_frame = min(len(frames) - 1, end_frame) end_time = sum(durations)
selected_frames = frames[start_frame : end_frame + 1] end_time = min(begin_time + (t or end_time), to or end_time, end_time)
selected_durations = durations[start_frame : end_frame + 1]
if frame_count is not None and frame_count > 0: accumulated = 0.0
if frame_count >= len(selected_frames): status = 0
pass
else:
step = len(selected_frames) / frame_count
sampled_frames = []
sampled_durations = []
for i in range(frame_count):
idx = int(i * step)
sampled_frames.append(selected_frames[idx])
sampled_durations.append(
sum(selected_durations) // len(selected_durations)
)
selected_frames = sampled_frames
selected_durations = sampled_durations
output_img = BytesIO() sel_frames: list[PIL.Image.Image] = []
sel_durations: list[float] = []
adjusted_durations = [ for i in range(len(frames)):
dur / speed_factor for dur in selected_durations frame = frames[i]
] duration = durations[i]
if status == 0:
if accumulated + duration > begin_time:
status = 1
sel_frames.append(frame)
sel_durations.append(accumulated + duration - begin_time)
elif status == 1:
if accumulated + duration > end_time:
sel_frames.append(frame)
sel_durations.append(end_time - accumulated)
break
sel_frames.append(frame)
sel_durations.append(duration)
accumulated += duration
##
# 加速!
sel_durations = [dur / speed_factor * 1000 for dur in durations]
rframes = [] rframes = []
rdur = [] rdur = []
acc_mod_20 = 0 acc_mod_20 = 0
for i in range(len(selected_frames)): for i in range(len(sel_frames)):
fr = selected_frames[i] fr = sel_frames[i]
du: float = adjusted_durations[i] du = round(sel_durations[i])
if du >= 20: if du >= 20:
rframes.append(fr) rframes.append(fr)
@ -170,10 +157,12 @@ async def _(
if acc_mod_20 >= 20: if acc_mod_20 >= 20:
acc_mod_20 = 0 acc_mod_20 = 0
if len(rframes) == 1 and len(selected_frames) > 1: if len(rframes) == 1 and len(sel_frames) > 1:
rframes.append(selected_frames[max(2, len(selected_frames) // 2)]) rframes.append(sel_frames[max(2, len(sel_frames) // 2)])
rdur.append(20) rdur.append(20)
##
# 收尾:看看透明度这块
transparency_flag = False transparency_flag = False
for f in rframes: for f in rframes:
if f.mode == "RGBA": if f.mode == "RGBA":
@ -186,12 +175,13 @@ async def _(
tf = {} tf = {}
if transparency_flag: if transparency_flag:
tf['transparency'] = 0 tf["transparency"] = 0
if is_rev: if is_rev:
rframes = rframes[::-1] rframes = rframes[::-1]
rdur = rdur[::-1] rdur = rdur[::-1]
output_img = BytesIO()
if rframes: if rframes:
rframes[0].save( rframes[0].save(
output_img, output_img,

View File

@ -4,13 +4,11 @@ import asyncio
from loguru import logger from loguru import logger
import nonebot import nonebot
# from nonebot.adapters import Bot, Event
# from nonebot_plugin_alconna import UniMessage # from nonebot_plugin_alconna import UniMessage
from konabot.common.longtask import ( from konabot.common.longtask import (
# DepLongTaskTarget, # DepLongTaskTarget,
# LongTask, # LongTask,
# create_longtask, # create_longtask,
# get_long_task_target,
# handle_long_task, # handle_long_task,
init_longtask, init_longtask,
) )
@ -45,6 +43,5 @@ async def _():
# handler="test_callback_001", # handler="test_callback_001",
# data={}, # data={},
# target=target, # target=target,
# deadline=datetime.datetime.now() + datetime.timedelta(seconds=2), # deadline=datetime.datetime.now() + datetime.timedelta(seconds=20),
# ) # )
# await target.send_message(UniMessage().text("Hello, world!"), at=True)

View File

@ -53,7 +53,7 @@ async def _(
if doc is None: if doc is None:
# 检索模式 # 检索模式
if section is None: if section is None:
section_set = {1} section_set = {1, 7}
else: else:
section_set = {section} section_set = {section}
if 1 in section_set and is_admin(event): if 1 in section_set and is_admin(event):
@ -75,7 +75,7 @@ async def _(
else: else:
# 查阅模式 # 查阅模式
if section is None: if section is None:
section_set = {1} section_set = {1, 7}
else: else:
section_set = {section} section_set = {section}
if 1 in section_set and is_admin(event): if 1 in section_set and is_admin(event):

View File

@ -2,25 +2,52 @@ from io import BytesIO
from typing import Iterable, cast from typing import Iterable, cast
from nonebot import on_message from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, Field, Image, MultiVar, Option, Text, from nonebot_plugin_alconna import (
UniMessage, UniMsg, on_alconna) Alconna,
Args,
Field,
Image,
MultiVar,
Option,
Text,
UniMessage,
UniMsg,
on_alconna,
)
from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message
from konabot.plugins.memepack.drawing.display import draw_cao_display, draw_snaur_display from konabot.plugins.memepack.drawing.display import (
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten, draw_cao_display,
draw_geimao, draw_mnk, draw_snaur_display,
draw_pt, draw_suan) draw_anan_display,
)
from konabot.plugins.memepack.drawing.saying import (
draw_cute_ten,
draw_geimao,
draw_mnk,
draw_pt,
draw_suan,
)
from nonebot.adapters import Bot, Event from nonebot.adapters import Bot, Event
from returns.result import Success, Failure from returns.result import Success, Failure
geimao = on_alconna(Alconna( geimao = on_alconna(
"给猫说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "给猫说",
missing_tips=lambda: "你没有写给猫说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"给猫哈"}) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写给猫说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"给猫哈"},
)
@geimao.handle() @geimao.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -31,12 +58,21 @@ async def _(saying: list[str]):
await geimao.send(await UniMessage().image(raw=img_bytes).export()) await geimao.send(await UniMessage().image(raw=img_bytes).export())
pt = on_alconna(Alconna( pt = on_alconna(
"pt说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "pt说",
missing_tips=lambda: "你没有写小帕说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"小帕说"}) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写小帕说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"小帕说"},
)
@pt.handle() @pt.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -47,12 +83,21 @@ async def _(saying: list[str]):
await pt.send(await UniMessage().image(raw=img_bytes).export()) await pt.send(await UniMessage().image(raw=img_bytes).export())
mnk = on_alconna(Alconna( mnk = on_alconna(
"re:小?黑白子?说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "re:小?黑白子?说",
missing_tips=lambda: "你没有写黑白子说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"}) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写黑白子说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"mnk说"},
)
@mnk.handle() @mnk.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -63,12 +108,21 @@ async def _(saying: list[str]):
await mnk.send(await UniMessage().image(raw=img_bytes).export()) await mnk.send(await UniMessage().image(raw=img_bytes).export())
suan = on_alconna(Alconna( suan = on_alconna(
"小蒜说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "小蒜说",
missing_tips=lambda: "你没有写小蒜说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set()) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写小蒜说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@suan.handle() @suan.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -79,12 +133,21 @@ async def _(saying: list[str]):
await suan.send(await UniMessage().image(raw=img_bytes).export()) await suan.send(await UniMessage().image(raw=img_bytes).export())
dsuan = on_alconna(Alconna( dsuan = on_alconna(
"大蒜说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "大蒜说",
missing_tips=lambda: "你没有写大蒜说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set()) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写大蒜说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@dsuan.handle() @dsuan.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -95,12 +158,21 @@ async def _(saying: list[str]):
await dsuan.send(await UniMessage().image(raw=img_bytes).export()) await dsuan.send(await UniMessage().image(raw=img_bytes).export())
cutecat = on_alconna(Alconna( cutecat = on_alconna(
"乖猫说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "乖猫说",
missing_tips=lambda: "你没有写十猫说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"十猫说"}) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写十猫说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"十猫说"},
)
@cutecat.handle() @cutecat.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -113,13 +185,14 @@ async def _(saying: list[str]):
cao_display_cmd = on_message() cao_display_cmd = on_message()
@cao_display_cmd.handle() @cao_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot): async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False flag = False
for text in cast(Iterable[Text], msg.get(Text)): for text in cast(Iterable[Text], msg.get(Text)):
if text.text.strip() == "小槽展示": if text.text.strip() == "小槽展示":
flag = True flag = True
elif text.text.strip() == '': elif text.text.strip() == "":
continue continue
else: else:
return return
@ -134,29 +207,71 @@ async def _(msg: UniMsg, evt: Event, bot: Bot):
case Failure(err): case Failure(err):
await cao_display_cmd.send( await cao_display_cmd.send(
await UniMessage() await UniMessage()
.at(user_id=evt.get_user_id()) .at(user_id=evt.get_user_id())
.text(' ') .text(" ")
.text(err) .text(err)
.export() .export()
) )
snaur_display_cmd = on_alconna(Alconna( snaur_display_cmd = on_alconna(
"卵总展示", Alconna(
Option("--whiteness", Args["whiteness", float], alias=["-w"]), "卵总展示",
Option("--black-level", Args["black_level", float], alias=["-b"]), Option("--whiteness", Args["whiteness", float], alias=["-w"]),
Option("--opacity", Args["opacity", float], alias=["-o"]), Option("--black-level", Args["black_level", float], alias=["-b"]),
Option("--saturation", Args["saturation", float], alias=["-s"]), Option("--opacity", Args["opacity", float], alias=["-o"]),
Args["image", Image | None], Option("--saturation", Args["saturation", float], alias=["-s"]),
)) Args["image", Image | None],
)
)
@snaur_display_cmd.handle() @snaur_display_cmd.handle()
async def _(img: PIL_Image, whiteness: float = 0.0, black_level: float = 0.2, async def _(
opacity: float = 0.8, saturation: float = 0.85): img: PIL_Image,
whiteness: float = 0.0,
black_level: float = 0.2,
opacity: float = 0.8,
saturation: float = 0.85,
):
img_processed = await draw_snaur_display( img_processed = await draw_snaur_display(
img, whiteness, black_level, opacity, saturation, img,
whiteness,
black_level,
opacity,
saturation,
) )
img_data = BytesIO() img_data = BytesIO()
img_processed.save(img_data, "PNG") img_processed.save(img_data, "PNG")
await snaur_display_cmd.send(await UniMessage().image(raw=img_data).export()) await snaur_display_cmd.send(await UniMessage().image(raw=img_data).export())
anan_display_cmd = on_message()
@anan_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False
for text in cast(Iterable[Text], msg.get(Text)):
stripped = text.text.strip()
if stripped == "安安展示":
flag = True
elif stripped == "":
continue
else:
return
if not flag:
return
match await extract_image_from_message(evt.get_message(), evt, bot):
case Success(img):
img_handled = await draw_anan_display(img)
img_bytes = BytesIO()
img_handled.save(img_bytes, format="PNG")
await anan_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
case Failure(err):
await anan_display_cmd.send(
await UniMessage()
.at(user_id=evt.get_user_id())
.text(" ")
.text(err)
.export()
)

View File

@ -27,6 +27,15 @@ SNAUR_QUAD_POINTS = np.float32(cast(Any, [
[106, 1280], [106, 1280],
])) ]))
anan_image_base = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "anan_base.png")
anan_image_top = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "anan_top.png")
ANAN_QUAD_POINTS = np.float32([
[157, 585],
[793, 599],
[781, 908],
[160, 908]
])
def _draw_cao_display(image: PIL.Image.Image): def _draw_cao_display(image: PIL.Image.Image):
src = np.array(image.convert("RGB")) src = np.array(image.convert("RGB"))
h, w = src.shape[:2] h, w = src.shape[:2]
@ -139,3 +148,52 @@ async def draw_snaur_display(
opacity, saturation, opacity, saturation,
) )
def _draw_anan_display(image: PIL.Image.Image) -> PIL.Image.Image:
src = np.array(image.convert("RGBA"))
h, w = src.shape[:2]
src_points = np.float32([
[0, 0],
[w, 0],
[w, h],
[0, h]
])
dst_points = ANAN_QUAD_POINTS
M = cv2.getPerspectiveTransform(src_points, dst_points)
output_w, output_h = anan_image_top.size
src_rgb = cv2.cvtColor(src, cv2.COLOR_RGBA2RGB) if src.shape[2] == 4 else src
warped_rgb = cv2.warpPerspective(
src_rgb,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
mask = np.zeros((h, w), dtype=np.uint8)
mask[:, :] = 255
warped_mask = cv2.warpPerspective(
mask,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=0
)
warped_rgba = cv2.cvtColor(warped_rgb, cv2.COLOR_RGB2RGBA)
warped_rgba[:, :, 3] = warped_mask
warped_pil = PIL.Image.fromarray(warped_rgba, 'RGBA')
result = PIL.Image.alpha_composite(anan_image_base, warped_pil)
result = PIL.Image.alpha_composite(result, anan_image_top)
return result
async def draw_anan_display(image: PIL.Image.Image) -> PIL.Image.Image:
return await asyncio.to_thread(_draw_anan_display, image)

View File

@ -0,0 +1,44 @@
import nonebot
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
from nonebot_plugin_alconna import UniMsg, UniMessage
from pydantic import BaseModel
class NoLuowenConfig(BaseModel):
plugin_noluowen_qqid: int = -1
plugin_noluowen_enable_group: list[int] = []
config = nonebot.get_plugin_config(NoLuowenConfig)
async def is_luowen_mentioned(evt: GroupMessageEvent, msg: UniMsg) -> bool:
if config.plugin_noluowen_qqid <= 0:
return False
if evt.user_id == config.plugin_noluowen_qqid:
return False
if evt.group_id not in config.plugin_noluowen_enable_group:
return False
txt = msg.extract_plain_text()
if "洛温" not in txt:
return False
if "罗文" in txt:
return False
if "阿特金森" in txt:
return False
return True
evt_luowen_mentioned = nonebot.on_message(rule=is_luowen_mentioned)
@evt_luowen_mentioned.handle()
async def _(evt: GroupMessageEvent, bot: Bot):
msg = (
UniMessage()
.reply(str(evt.message_id))
.at(str(config.plugin_noluowen_qqid))
.text(" 好像有人念错了你的 ID")
)
await evt_luowen_mentioned.send(await msg.export(bot=bot))

View File

@ -1,27 +1,24 @@
import aiohttp
import asyncio as asynkio import asyncio as asynkio
import datetime import datetime
import functools from math import ceil
from pathlib import Path from pathlib import Path
from typing import Any, Literal, cast from typing import Any, Literal
import signal import nanoid
import nonebot import nonebot
import ptimeparse import ptimeparse
from loguru import logger from loguru import logger
from nonebot import on_message from nonebot import get_plugin_config, on_message
from nonebot.adapters import Event from nonebot.adapters import Bot, Event
from nonebot.adapters.console import Bot as ConsoleBot from nonebot.adapters.onebot.v11 import Bot as OBBot
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent from nonebot.adapters.console import Bot as CBot
from nonebot.adapters.discord import Bot as DiscordBot from nonebot.adapters.discord import Bot as DCBot
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent from nonebot_plugin_alconna import Alconna, Args, Subcommand, UniMessage, UniMsg, on_alconna
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
from nonebot.adapters.onebot.v11.event import \
GroupMessageEvent as OnebotV11GroupMessageEvent
from nonebot.adapters.onebot.v11.event import \
MessageEvent as OnebotV11MessageEvent
from nonebot_plugin_alconna import UniMessage, UniMsg
from pydantic import BaseModel from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget, LongTask, LongTaskTarget, create_longtask, handle_long_task, longtask_data
evt = on_message() evt = on_message()
(Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True) (Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True)
@ -29,6 +26,14 @@ DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "notify.j
DATA_FILE_LOCK = asynkio.Lock() DATA_FILE_LOCK = asynkio.Lock()
ASYNK_TASKS: set[asynkio.Task[Any]] = set() ASYNK_TASKS: set[asynkio.Task[Any]] = set()
LONG_TASK_NAME = "TASK_SIMPLE_NOTIFY"
PAGE_SIZE = 6
FMT_STRING = "%Y年%m月%d%H:%M:%S"
class NotifyMessage(BaseModel):
message: str
class Notify(BaseModel): class Notify(BaseModel):
@ -43,14 +48,64 @@ class Notify(BaseModel):
notify_time: datetime.datetime notify_time: datetime.datetime
notify_msg: str notify_msg: str
def get_str(self):
return f"{self.target}-{self.target_env}-{self.platform}-{self.notify_time}"
class NotifyConfigFile(BaseModel): class NotifyConfigFile(BaseModel):
version: int = 2 version: int = 2
notifies: list[Notify] = [] notifies: list[Notify] = []
unsent: list[Notify] = [] unsent: list[Notify] = []
notify_channels: dict[str, str] = {}
class NotifyPluginConfig(BaseModel):
plugin_notify_enable_ntfy: bool = False
plugin_notify_base_url: str = ""
plugin_notify_access_token: str = ""
plugin_notify_prefix: str = "kona-notice-"
config = get_plugin_config(NotifyPluginConfig)
async def send_notify_to_ntfy_instance(msg: str, channel: str):
if not config.plugin_notify_enable_ntfy:
return
url = f"{config.plugin_notify_base_url}/{channel}"
async with aiohttp.ClientSession() as session:
session.headers["Authorization"] = f"Bearer {config.plugin_notify_access_token}"
session.headers["Title"] = "🔔 此方 BOT 提醒"
async with session.post(url, data=msg) as response:
logger.info(f"访问 {url} 的结果是 {response.status}")
def _get_bot_of(_type: type[Bot]):
for bot in nonebot.get_bots().values():
if isinstance(bot, _type):
return bot.self_id
return ""
def get_target_from_notify(notify: Notify) -> LongTaskTarget:
if notify.platform == "console":
return LongTaskTarget(
platform="console",
self_id=_get_bot_of(CBot),
channel_id=notify.target_env or "",
target_id=notify.target,
)
if notify.platform == "discord":
return LongTaskTarget(
platform="discord",
self_id=_get_bot_of(DCBot),
channel_id=notify.target_env or "",
target_id=notify.target,
)
return LongTaskTarget(
platform="qq",
self_id=_get_bot_of(OBBot),
channel_id=notify.target_env or "",
target_id=notify.target,
)
def load_notify_config() -> NotifyConfigFile: def load_notify_config() -> NotifyConfigFile:
@ -67,80 +122,8 @@ def save_notify_config(config: NotifyConfigFile):
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4)) DATA_FILE_PATH.write_text(config.model_dump_json(indent=4))
async def notify_now(notify: Notify):
if notify.platform == 'console':
bot = [b for b in nonebot.get_bots().values() if isinstance(b, ConsoleBot)]
if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
return False
bot = bot[0]
await bot.send_private_message(notify.target, f"代办通知:{notify.notify_msg}")
elif notify.platform == 'discord':
bot = [b for b in nonebot.get_bots().values() if isinstance(b, DiscordBot)]
if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
return False
bot = bot[0]
channel = await bot.create_DM(recipient_id=int(notify.target))
await bot.send_to(channel.id, f"代办通知:{notify.notify_msg}")
elif notify.platform == 'qq':
bot = [b for b in nonebot.get_bots().values() if isinstance(b, OnebotV11Bot)]
if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
return False
bot = bot[0]
if notify.target_env is None:
await bot.send_private_msg(
user_id=int(notify.target),
message=cast(Any, await UniMessage.text(f"代办通知:{notify.notify_msg}").export(
bot=bot,
)),
)
else:
await bot.send_group_msg(
group_id=int(notify.target_env),
message=cast(Any,
await UniMessage().at(
notify.target
).text(f" 代办通知:{notify.notify_msg}").export(bot=bot)
),
)
else:
logger.warning(f"提醒未成功发送出去:{notify}")
return False
return True
def create_notify_task(notify: Notify, fail2remove: bool = True):
async def mission():
begin_time = datetime.datetime.now()
if begin_time < notify.notify_time:
try:
await asynkio.sleep((notify.notify_time - begin_time).total_seconds())
except asynkio.CancelledError:
logger.debug("代办提醒被信号中止,任务退出")
return
else:
logger.warning(
f"期望在 {notify.notify_time} 在平台 {notify.platform} {notify.target_env}"
f" {notify.target} 的代办通知 {notify.notify_msg} 已经超时,将会直接通知!"
)
res = await notify_now(notify)
if fail2remove or res:
await DATA_FILE_LOCK.acquire()
cfg = load_notify_config()
cfg.notifies = [n for n in cfg.notifies if n.get_str() != notify.get_str()]
if not res:
cfg.unsent.append(notify)
save_notify_config(cfg)
DATA_FILE_LOCK.release()
else:
pass
return asynkio.create_task(mission())
@evt.handle() @evt.handle()
async def _(msg: UniMsg, mEvt: Event): async def _(msg: UniMsg, mEvt: Event, target: DepLongTaskTarget):
if mEvt.get_user_id() in nonebot.get_bots(): if mEvt.get_user_id() in nonebot.get_bots():
return return
@ -153,57 +136,26 @@ async def _(msg: UniMsg, mEvt: Event):
return return
notify_time, notify_text = segments notify_time, notify_text = segments
# target_time = get_target_time(notify_time)
try: try:
target_time = ptimeparse.parse(notify_time) target_time = ptimeparse.Parser().parse(notify_time)
logger.info(f"{notify_time} 解析出了时间:{target_time}") logger.info(f"{notify_time} 解析出了时间:{target_time}")
except Exception: except Exception:
logger.info(f"无法从 {notify_time} 中解析出时间") logger.info(f"无法从 {notify_time} 中解析出时间")
return return
# if target_time is None:
# logger.info(f"无法从 {notify_time} 中解析出时间")
# return
if not notify_text: if not notify_text:
return return
await DATA_FILE_LOCK.acquire() await create_longtask(
cfg = load_notify_config() LONG_TASK_NAME,
{ "message": notify_text },
if isinstance(mEvt, ConsoleMessageEvent): target,
platform = "console" target_time,
target = mEvt.get_user_id()
target_env = None
elif isinstance(mEvt, OnebotV11MessageEvent):
platform = "qq"
target = mEvt.get_user_id()
if isinstance(mEvt, OnebotV11GroupMessageEvent):
target_env = str(mEvt.group_id)
else:
target_env = None
elif isinstance(mEvt, DiscordMessageEvent):
platform = "discord"
target = mEvt.get_user_id()
target_env = None
else:
logger.warning(f"Notify 遇到不支持的平台:{type(mEvt).__name__}")
return
notify = Notify(
platform=platform,
target=target,
target_env=target_env,
notify_time=target_time,
notify_msg=notify_text,
) )
create_notify_task(notify)
cfg.notifies.append(notify) await target.send_message(
save_notify_config(cfg) UniMessage().text(f"了解啦!将会在 {target_time.strftime(FMT_STRING)} 提醒你哦~")
DATA_FILE_LOCK.release() )
logger.info(f"创建了一条于 {target_time} 的代办提醒")
await evt.send(await UniMessage().at(mEvt.get_user_id()).text(
f" 了解啦!将会在 {notify.notify_time} 提醒你哦~").export())
logger.info(f"创建了一条于 {notify.notify_time} 的代办提醒")
driver = nonebot.get_driver() driver = nonebot.get_driver()
@ -226,35 +178,152 @@ async def _():
await DATA_FILE_LOCK.acquire() await DATA_FILE_LOCK.acquire()
# tasks: set[asynkio.Task[Any]] = set()
cfg = load_notify_config() cfg = load_notify_config()
if cfg.version == 1: if cfg.version == 1:
logger.info("将配置文件的版本升级为 2") logger.info("将配置文件的版本升级为 2")
cfg.version = 2 cfg.version = 2
else: else:
counter = 0
for notify in [*cfg.notifies]: for notify in [*cfg.notifies]:
task = create_notify_task(notify, fail2remove=False) await create_longtask(
ASYNK_TASKS.add(task) handler=LONG_TASK_NAME,
task.add_done_callback(lambda self: ASYNK_TASKS.remove(self)) data={ "message": notify.notify_msg },
counter += 1 target=get_target_from_notify(notify),
logger.info(f"成功创建了 {counter} 条代办事项") deadline=notify.notify_time,
)
cfg.notifies = []
save_notify_config(cfg) save_notify_config(cfg)
DATA_FILE_LOCK.release() DATA_FILE_LOCK.release()
loop = asynkio.get_running_loop()
# 解决 asynk task 没有被 cancel 的问题 @handle_long_task("TASK_SIMPLE_NOTIFY")
async def shutdown(sig: signal.Signals): async def _(task: LongTask):
logger.info(f"收到 {sig.name} 指令,正在关闭所有的东西") message = task.data["message"]
for task in ASYNK_TASKS: await task.target.send_message(
task.cancel() UniMessage().text(f"代办提醒:{message}")
await asynkio.gather(*ASYNK_TASKS, return_exceptions=True) )
logger.info("所有的代办提醒 Task 都已经退出了") async with DATA_FILE_LOCK:
data = load_notify_config()
if (chan := data.notify_channels.get(task.target.target_id)) is not None:
await send_notify_to_ntfy_instance(message, chan)
save_notify_config(data)
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, functools.partial( USER_CHECKOUT_TASK_CACHE: dict[str, dict[str, str]] = {}
asynkio.create_task, shutdown(sig)
cmd_check_notify_list = on_alconna(Alconna(
"re:(?:我有哪些|查询)(?:提醒|代办)",
Args["page", int, 1]
))
@cmd_check_notify_list.handle()
async def _(page: int, target: DepLongTaskTarget):
if page <= 0:
await target.send_message(UniMessage().text("页数应该大于 0 吧"))
return
async with longtask_data() as data:
tasks = data.to_handle.get(LONG_TASK_NAME, {}).values()
tasks = [t for t in tasks if t.target.target_id == target.target_id]
tasks = sorted(tasks, key=lambda t: t.deadline)
pages = ceil(len(tasks) / PAGE_SIZE)
if page > pages:
await target.send_message(UniMessage().text(f"最多也就 {pages} 页啦!"))
tasks = tasks[(page - 1) * PAGE_SIZE: page * PAGE_SIZE]
message = "你可以输入「删除提醒 序号」来删除一个提醒\n====== 代办清单 ======\n\n"
to_cache = {}
if len(tasks) == 0:
message += "空空如也\n"
else:
for i, task in enumerate(tasks):
to_cache[str(i + 1)] = task.uuid
message += f"{i + 1}) {task.data['message']}{task.deadline.strftime(FMT_STRING)}\n"
message += f"\n==== 第 {page} 页,共 {pages} 页 ===="
USER_CHECKOUT_TASK_CACHE[target.target_id] = to_cache
await target.send_message(UniMessage().text(message))
cmd_remove_task = on_alconna(Alconna(
"re:删除(?:提醒|代办)",
Args["checker", str],
))
@cmd_remove_task.handle()
async def _(checker: str, target: DepLongTaskTarget):
if target.target_id not in USER_CHECKOUT_TASK_CACHE:
await target.send_message(UniMessage().text(
"先用「查询提醒」来查询你有哪些提醒吧"
))
return
if checker not in USER_CHECKOUT_TASK_CACHE[target.target_id]:
await target.send_message(UniMessage().text(
"没有这个任务哦,请检查一下吧"
))
uuid = USER_CHECKOUT_TASK_CACHE[target.target_id][checker]
async with longtask_data() as data:
if uuid not in data.to_handle[LONG_TASK_NAME]:
await target.send_message(UniMessage().text(
"似乎这个提醒已经发出去了,或者已经被删除"
))
return
_msg = data.to_handle[LONG_TASK_NAME][uuid].data["message"]
del data.to_handle[LONG_TASK_NAME][uuid]
await target.send_message(UniMessage().text(
f"成功取消了提醒:{_msg}"
)) ))
await asynkio.gather(*ASYNK_TASKS)
cmd_notify_channel = on_alconna(Alconna(
"ntfy",
Subcommand("删除", dest="delete"),
Subcommand("创建", Args["notify_id?", str], dest="create"),
), rule=lambda: config.plugin_notify_enable_ntfy)
@cmd_notify_channel.assign("$main")
async def _(target: DepLongTaskTarget):
await target.send_message(UniMessage.text(
"配置 ntfy 通知:\n\n"
"- ntfy 创建: 启用 ntfy 通知,并为你随机生成一个通知渠道\n"
"- ntfy 删除:禁用 ntfy 通知\n"
))
@cmd_notify_channel.assign("create")
async def _(target: DepLongTaskTarget, notify_id: str = ""):
if notify_id == "":
notify_id = nanoid.generate(
alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz-",
size=16,
)
channel_name = f"{config.plugin_notify_prefix}{notify_id}"
async with DATA_FILE_LOCK:
data = load_notify_config()
data.notify_channels[target.target_id] = channel_name
save_notify_config(data)
await target.send_message(UniMessage.text(
f"了解!将会在 {channel_name} 为你提醒!\n"
"\n"
"食用教程:在你的手机端 / 网页端 ntfy 点击「订阅主题」,选择「使用其他服务器」,"
f"服务器填写 {config.plugin_notify_base_url} ,主题名填写 {channel_name}\n"
f"最后点击订阅,就能看到我给你发的消息啦!"
))
await send_notify_to_ntfy_instance(
"如果你看到这条消息,说明你已经成功订阅主题!此方 BOT 将会在这里提醒你你的代办!",
channel_name,
)
@cmd_notify_channel.assign("delete")
async def _(target: DepLongTaskTarget):
async with DATA_FILE_LOCK:
data = load_notify_config()
del data.notify_channels[target.target_id]
save_notify_config(data)
await target.send_message(UniMessage.text("ok."))

25
poetry.lock generated
View File

@ -1743,6 +1743,23 @@ type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple" url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors" reference = "mirrors"
[[package]]
name = "nanoid"
version = "2.0.0"
description = "A tiny, secure, URL-friendly, unique string ID generator for Python"
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "nanoid-2.0.0-py3-none-any.whl", hash = "sha256:90aefa650e328cffb0893bbd4c236cfd44c48bc1f2d0b525ecc53c3187b653bb"},
{file = "nanoid-2.0.0.tar.gz", hash = "sha256:5a80cad5e9c6e9ae3a41fa2fb34ae189f7cb420b2a5d8f82bd9d23466e4efa68"},
]
[package.source]
type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]] [[package]]
name = "nepattern" name = "nepattern"
version = "0.7.7" version = "0.7.7"
@ -2402,14 +2419,14 @@ reference = "mirrors"
[[package]] [[package]]
name = "ptimeparse" name = "ptimeparse"
version = "0.1.2" version = "0.2.1"
description = "一个用于解析中文的时间表达的库" description = "一个用于解析中文的时间表达的库"
optional = false optional = false
python-versions = ">=3.9" python-versions = ">=3.9"
groups = ["main"] groups = ["main"]
files = [ files = [
{file = "ptimeparse-0.1.2-py3-none-any.whl", hash = "sha256:0eea791396e53b63330fadb40d9f0a2e6272bd5467246f10d1d6971bc606edff"}, {file = "ptimeparse-0.2.1-py3-none-any.whl", hash = "sha256:cf1115784d5d983da2d5b7af327108bf04c218c795d63291e71f76d7c6ffd2d4"},
{file = "ptimeparse-0.1.2.tar.gz", hash = "sha256:658be90a3cc2994c09c4ea2f276d257e7eb84bc330be79950baefe32b19779a2"}, {file = "ptimeparse-0.2.1.tar.gz", hash = "sha256:9b640e0a315d19b1e3821a290d236a051d8320348970ce3a835ed675bd2d832f"},
] ]
[package.source] [package.source]
@ -3807,4 +3824,4 @@ reference = "mirrors"
[metadata] [metadata]
lock-version = "2.1" lock-version = "2.1"
python-versions = ">=3.12,<4.0" python-versions = ">=3.12,<4.0"
content-hash = "d6a325b769fb3ed207c1a8891e65ea20bae20166fa281d6fa620faf54ad15bd8" content-hash = "96080ea588b3ac52b19909379585cd647646faf3dce291f8d2b5801a3111c838"

View File

@ -2,30 +2,29 @@
name = "konabot" name = "konabot"
version = "0.1.0" version = "0.1.0"
description = "在 MTTU 内部使用的 bot" description = "在 MTTU 内部使用的 bot"
authors = [ authors = [{ name = "passthem", email = "Passthem183@gmail.com" }]
{name = "passthem",email = "Passthem183@gmail.com"}
]
readme = "README.md" readme = "README.md"
requires-python = ">=3.12,<4.0" requires-python = ">=3.12,<4.0"
dependencies = [ dependencies = [
"nonebot2[all] (>=2.4.3,<3.0.0)", "nonebot2[all] (>=2.4.3,<3.0.0)",
"nonebot-adapter-onebot (>=2.4.6,<3.0.0)", "nonebot-adapter-onebot (>=2.4.6,<3.0.0)",
"nonebot-adapter-console (>=0.9.0,<0.10.0)", "nonebot-adapter-console (>=0.9.0,<0.10.0)",
"nonebot-adapter-discord (>=0.1.8,<0.2.0)", "nonebot-adapter-discord (>=0.1.8,<0.2.0)",
"nonebot-adapter-minecraft (>=1.5.2,<2.0.0)", "nonebot-adapter-minecraft (>=1.5.2,<2.0.0)",
"nonebot-plugin-alconna (>=0.59.4,<0.60.0)", "nonebot-plugin-alconna (>=0.59.4,<0.60.0)",
"nonebot-plugin-apscheduler (>=0.5.0,<0.6.0)", "nonebot-plugin-apscheduler (>=0.5.0,<0.6.0)",
"requests (>=2.32.5,<3.0.0)", "requests (>=2.32.5,<3.0.0)",
"beautifulsoup4 (>=4.13.5,<5.0.0)", "beautifulsoup4 (>=4.13.5,<5.0.0)",
"lxml (>=6.0.2,<7.0.0)", "lxml (>=6.0.2,<7.0.0)",
"pillow (>=11.3.0,<12.0.0)", "pillow (>=11.3.0,<12.0.0)",
"imagetext-py (>=2.2.0,<3.0.0)", "imagetext-py (>=2.2.0,<3.0.0)",
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)", "opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
"returns (>=0.26.0,<0.27.0)", "returns (>=0.26.0,<0.27.0)",
"ptimeparse (>=0.1.1,<0.2.0)", "skia-python (>=138.0,<139.0)",
"skia-python (>=138.0,<139.0)", "nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)",
"nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)", "qrcode (>=8.2,<9.0)",
"qrcode (>=8.2,<9.0)", "ptimeparse (>=0.2.1,<0.3.0)",
"nanoid (>=2.0.0,<3.0.0)",
] ]
[build-system] [build-system]
@ -43,4 +42,3 @@ url = "https://pypi.tuna.tsinghua.edu.cn/simple/"
priority = "primary" priority = "primary"
[tool.poetry.dependencies] [tool.poetry.dependencies]
ptimeparse = {source = "pt-gitea-pypi"}