Compare commits

..

19 Commits

Author SHA1 Message Date
f6fd25a41d fix: 添加 Notify 任务创建限制
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 19:08:30 +08:00
9f6c70bf0f 合并后的部分骰子路径修复
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 15:21:53 +08:00
1c01e49d5d Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot 2025-09-30 15:18:16 +08:00
48c719bc33 全新骰子 2025-09-30 15:13:59 +08:00
6bc9f94e83 拷贝环境变量
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 02:45:01 +08:00
deab2d7b2b 更新 PYTHONPATH 环境变量
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 02:39:30 +08:00
2a6abbe0d4 置入 __init__ 文件
Some checks failed
continuous-integration/drone/push Build is failing
2025-09-30 02:35:10 +08:00
30bdc50024 将 pyproject.toml 置入项目目录
Some checks failed
continuous-integration/drone/push Build is failing
2025-09-30 02:33:03 +08:00
be8b1b9999 修复文件夹错误展开的问题
Some checks failed
continuous-integration/drone/push Build is failing
2025-09-30 02:28:54 +08:00
43d0a09de2 修复 docker 在 CI 不在线的问题
Some checks failed
continuous-integration/drone/push Build is failing
2025-09-30 02:24:50 +08:00
6e0082c1c9 大变:移动了很多很多文件并优化构建流程
Some checks failed
continuous-integration/drone/push Build is failing
2025-09-30 02:23:40 +08:00
3b8b060c5b Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot
Some checks failed
continuous-integration/drone/push Build is failing
2025-09-30 02:09:26 +08:00
8cfe58c7dd 添加测试流水线 2025-09-30 02:08:35 +08:00
f997bf945a Merge pull request '解决了提醒事项功能的若干 Issue' (#13) from fix-提醒功能若干问题修复 into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #13
2025-09-30 02:02:22 +08:00
0dbe164703 先调整到一个可用的状态 2025-09-30 01:59:40 +08:00
818f2b64ec 修复汉语数字问题 2025-09-30 00:43:09 +08:00
a855c69f61 添加「半」钟
All checks were successful
continuous-integration/drone/tag Build is passing
continuous-integration/drone/push Build is passing
2025-09-29 23:25:09 +08:00
90ee296f55 调整UX 2025-09-29 23:22:12 +08:00
915f186955 添加代办通知
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-29 23:12:13 +08:00
38 changed files with 866 additions and 38 deletions

View File

@ -1,4 +1,5 @@
/.env
/.git
/data
__pycache__

View File

@ -26,6 +26,14 @@ steps:
volumes:
- name: docker-socket
path: /var/run/docker.sock
- name: 在容器中测试插件加载
image: docker:dind
privileged: true
volumes:
- name: docker-socket
path: /var/run/docker.sock
commands:
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
volumes:
- name: docker-socket

4
.env.test Normal file
View File

@ -0,0 +1,4 @@
ENVIRONMENT=test
ENABLE_CONSOLE=false
ENABLE_QQ=false
ENABLE_DISCORD=false

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
/.env
/data
__pycache__

View File

@ -4,5 +4,11 @@ WORKDIR /app
COPY requirements.txt ./
RUN pip install -r requirements.txt --no-deps
COPY . .
COPY bot.py pyproject.toml .env.prod .env.test ./
COPY assets ./assets
COPY scripts ./scripts
COPY konabot ./konabot
ENV PYTHONPATH=/app
CMD [ "python", "bot.py" ]

View File

Before

Width:  |  Height:  |  Size: 9.2 KiB

After

Width:  |  Height:  |  Size: 9.2 KiB

View File

Before

Width:  |  Height:  |  Size: 10 KiB

After

Width:  |  Height:  |  Size: 10 KiB

View File

Before

Width:  |  Height:  |  Size: 8.7 KiB

After

Width:  |  Height:  |  Size: 8.7 KiB

View File

Before

Width:  |  Height:  |  Size: 11 KiB

After

Width:  |  Height:  |  Size: 11 KiB

View File

Before

Width:  |  Height:  |  Size: 10 KiB

After

Width:  |  Height:  |  Size: 10 KiB

View File

Before

Width:  |  Height:  |  Size: 8.7 KiB

After

Width:  |  Height:  |  Size: 8.7 KiB

View File

Before

Width:  |  Height:  |  Size: 11 KiB

After

Width:  |  Height:  |  Size: 11 KiB

View File

Before

Width:  |  Height:  |  Size: 9.3 KiB

After

Width:  |  Height:  |  Size: 9.3 KiB

View File

Before

Width:  |  Height:  |  Size: 10 KiB

After

Width:  |  Height:  |  Size: 10 KiB

View File

Before

Width:  |  Height:  |  Size: 8.7 KiB

After

Width:  |  Height:  |  Size: 8.7 KiB

View File

Before

Width:  |  Height:  |  Size: 11 KiB

After

Width:  |  Height:  |  Size: 11 KiB

View File

Before

Width:  |  Height:  |  Size: 9.3 KiB

After

Width:  |  Height:  |  Size: 9.3 KiB

BIN
assets/img/dice/stick.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 80 KiB

View File

Before

Width:  |  Height:  |  Size: 9.0 KiB

After

Width:  |  Height:  |  Size: 9.0 KiB

View File

Before

Width:  |  Height:  |  Size: 219 KiB

After

Width:  |  Height:  |  Size: 219 KiB

View File

Before

Width:  |  Height:  |  Size: 272 KiB

After

Width:  |  Height:  |  Size: 272 KiB

0
konabot/__init__.py Normal file
View File

4
konabot/common/path.py Normal file
View File

@ -0,0 +1,4 @@
from pathlib import Path
ASSETS_PATH = Path(__file__).resolve().parent.parent.parent / "assets"
FONTS_PATH = ASSETS_PATH / "fonts"

View File

@ -1,8 +1,8 @@
from imagetext_py import EmojiOptions, FontDB
from .path import ASSETS
from konabot.common.path import FONTS_PATH
FontDB.LoadFromDir(str(ASSETS))
FontDB.LoadFromDir(str(FONTS_PATH))
FontDB.SetDefaultEmojiOptions(EmojiOptions(
parse_shortcodes=False,

View File

@ -1,3 +0,0 @@
from pathlib import Path
ASSETS = Path(__file__).parent.parent.parent / "assets"

View File

@ -4,10 +4,11 @@ from typing import Any, cast
import imagetext_py
import PIL.Image
from .base.fonts import HARMONYOS_SANS_SC_BLACK
from .base.path import ASSETS
from konabot.common.path import ASSETS_PATH
geimao_image = PIL.Image.open(ASSETS / "geimao.jpg").convert("RGBA")
from .base.fonts import HARMONYOS_SANS_SC_BLACK
geimao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "geimao.jpg").convert("RGBA")
def _draw_geimao(saying: str):

View File

@ -3,10 +3,11 @@ import asyncio
import imagetext_py
import PIL.Image
from .base.fonts import HARMONYOS_SANS_SC_REGULAR
from .base.path import ASSETS
from konabot.common.path import ASSETS_PATH
pt_image = PIL.Image.open(ASSETS / "ptsay.png").convert("RGBA")
from .base.fonts import HARMONYOS_SANS_SC_REGULAR
pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("RGBA")
def _draw_pt(saying: str):

View File

@ -1,11 +1,11 @@
from typing import Optional
from typing import Optional, Union
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from konabot.plugins.roll_dice.roll_dice import generate_dice_image
from konabot.plugins.roll_dice.roll_number import get_random_number, roll_number
from konabot.plugins.roll_dice.roll_number import get_random_number, get_random_number_string, roll_number
evt = on_alconna(Alconna(
"摇数字"
@ -22,21 +22,26 @@ async def _(event: BaseEvent):
evt = on_alconna(Alconna(
"摇骰子",
Args["f1?", int]["f2?", int]
Args["f1?", str]["f2?", str]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, f1: Optional[int] = None, f2: Optional[int] = None):
async def _(event: BaseEvent, f1: Optional[str] = None, f2: Optional[str] = None):
# if isinstance(event, DiscordMessageEvent):
# await evt.send(await UniMessage().text("```\n" + roll_dice() + "\n```").export())
# elif isinstance(event, ConsoleMessageEvent):
number = 0
number = ""
if(f1 is not None and f2 is not None):
number = get_random_number(f1, f2)
number = get_random_number_string(f1, f2)
elif f1 is not None:
number = get_random_number(1, f1)
if(float(f1) > 1):
number = get_random_number_string("1", f1)
elif (float(f1) > 0):
number = get_random_number_string("0", f1)
else:
number = get_random_number_string(f1, "0")
else:
number = get_random_number()
number = get_random_number_string()
await evt.send(await UniMessage().image(raw=await generate_dice_image(number)).export())
# else:
# await evt.send(await UniMessage().text(roll_dice(wide=True)).export())

View File

@ -1,9 +1,11 @@
from io import BytesIO
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from konabot.plugins.roll_dice.base.path import ASSETS
from konabot.common.path import ASSETS_PATH, FONTS_PATH
def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0)):
"""
@ -13,15 +15,7 @@ def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0
temp_image = Image.new('RGB', (1, 1), (255, 255, 255))
temp_draw = ImageDraw.Draw(temp_image)
font = ImageFont.truetype(ASSETS / "montserrat.otf", font_size)
# try:
# font = ImageFont.truetype(ASSETS / "montserrat.otf", font_size)
# except:
# try:
# font = ImageFont.truetype("arial.ttf", font_size)
# except:
# # 如果系统字体不可用,使用默认字体
# font = ImageFont.load_default()
font = ImageFont.truetype(FONTS_PATH / "montserrat.otf", font_size)
# 获取文本边界框
bbox = temp_draw.textbbox((0, 0), text, font=font)
@ -158,26 +152,206 @@ def precise_blend_with_perspective(background, foreground, corners):
return result
async def generate_dice_image(number: int) -> BytesIO:
def draw_line_bresenham(image, x0, y0, x1, y1, color):
"""使用Bresenham算法画线避免间隙"""
dx = abs(x1 - x0)
dy = abs(y1 - y0)
sx = 1 if x0 < x1 else -1
sy = 1 if y0 < y1 else -1
err = dx - dy
while True:
if 0 <= x0 < image.shape[1] and 0 <= y0 < image.shape[0]:
image[y0, x0] = color
if x0 == x1 and y0 == y1:
break
e2 = 2 * err
if e2 > -dy:
err -= dy
x0 += sx
if e2 < dx:
err += dx
y0 += sy
def slice_and_stretch(image, slice_lines, direction):
'''
image: 图像
slice_lines: 切割线两个点的列表一般是倾斜45度的直线
direction: 移动方向向量(二元数组)
'''
# 获取图片的尺寸
height, width = image.shape[:2]
# 创建一个由移动方向扩充后,更大的图片
new_width = int(width + abs(direction[0]))
new_height = int(height + abs(direction[1]))
new_image = np.zeros((new_height, new_width, 4), dtype=image.dtype)
# 先把图片放在新图的和方向相反的一侧
offset_x = int(abs(min(0, direction[0])))
offset_y = int(abs(min(0, direction[1])))
new_image[offset_y:offset_y+height, offset_x:offset_x+width] = image
# 切割线也跟着偏移
slice_lines = [(x + offset_x, y + offset_y) for (x, y) in slice_lines]
# 复制切割线经过的像素,沿着方向移动,实现类似拖尾的效果
apply_trail_effect_vectorized(new_image, slice_lines, direction)
apply_stroke_vectorized(new_image, slice_lines, direction)
return new_image, offset_x, offset_y
def apply_trail_effect_vectorized(new_image, slice_lines, direction):
"""向量化实现拖尾效果"""
height, width = new_image.shape[:2]
# 创建坐标网格
y_coords, x_coords = np.mgrid[0:height, 0:width]
# 向量化计算点到直线的距离
line_vec = np.array([slice_lines[1][0] - slice_lines[0][0],
slice_lines[1][1] - slice_lines[0][1]])
point_vecs = np.stack([x_coords - slice_lines[0][0],
y_coords - slice_lines[0][1]], axis=-1)
# 计算叉积(有向距离)
cross_products = (line_vec[0] * point_vecs[:, :, 1] -
line_vec[1] * point_vecs[:, :, 0])
# 选择直线右侧的像素 (d1 > 0)
mask = cross_products > 0
# 计算目标位置
target_x = (x_coords + direction[0]).astype(int)
target_y = (y_coords + direction[1]).astype(int)
# 创建有效位置掩码
valid_mask = mask & (target_x >= 0) & (target_x < width) & \
(target_y >= 0) & (target_y < height)
# 批量复制像素
new_image[target_y[valid_mask], target_x[valid_mask]] = \
new_image[y_coords[valid_mask], x_coords[valid_mask]]
def apply_stroke_vectorized(new_image, slice_lines, direction):
"""使用向量化操作优化笔画效果"""
height, width = new_image.shape[:2]
# 1. 找到所有非透明像素
non_transparent = np.where(new_image[:, :, 3] > 0)
if len(non_transparent[0]) == 0:
return
y_coords, x_coords = non_transparent
# 2. 向量化计算点到直线的距离
line_vec = np.array([slice_lines[1][0] - slice_lines[0][0],
slice_lines[1][1] - slice_lines[0][1]])
point_vecs = np.column_stack([x_coords - slice_lines[0][0],
y_coords - slice_lines[0][1]])
# 计算叉积(距离)
cross_products = (line_vec[0] * point_vecs[:, 1] -
line_vec[1] * point_vecs[:, 0])
# 3. 选择靠近直线的像素
mask = np.abs(cross_products) < 1.0
selected_y = y_coords[mask]
selected_x = x_coords[mask]
selected_pixels = new_image[selected_y, selected_x]
if len(selected_x) == 0:
return
# 4. 预计算采样点
length = np.sqrt(direction[0]**2 + direction[1]**2)
if length == 0:
return
# 创建采样偏移
dx_dy = np.array([(dx, dy) for dx in [-0.5, 0, 0.5]
for dy in [-0.5, 0, 0.5]])
# 5. 批量计算目标位置
steps = max(1, int(length * 2))
alpha = 0.7
for k in range(1, steps + 1):
# 对所有选中的像素批量计算新位置
scale = k / steps
# 为每个像素和每个采样点计算目标位置
for dx, dy in dx_dy:
target_x = np.round(selected_x + dx + direction[0] * scale).astype(int)
target_y = np.round(selected_y + dy + direction[1] * scale).astype(int)
# 创建有效位置掩码
valid_mask = (target_x >= 0) & (target_x < width) & \
(target_y >= 0) & (target_y < height)
if np.any(valid_mask):
valid_target_x = target_x[valid_mask]
valid_target_y = target_y[valid_mask]
valid_source_idx = np.where(valid_mask)[0]
# 批量混合像素
source_pixels = selected_pixels[valid_source_idx]
target_pixels = new_image[valid_target_y, valid_target_x]
new_image[valid_target_y, valid_target_x] = (
alpha * source_pixels + (1 - alpha) * target_pixels
)
async def generate_dice_image(number: str) -> BytesIO:
# 将文本转换为带透明背景的图像
text = str(number)
text = number
# 如果文本太长,直接返回金箍棒
if(len(text) > 50):
output = BytesIO()
push_image = Image.open(ASSETS_PATH / "img" / "dice" / "stick.png")
push_image.save(output,format='PNG')
output.seek(0)
return output
text_image = text_to_transparent_image(
text,
font_size=60,
text_color=(0, 0, 0) # 黑色文字
)
# 获取长宽比
height, width = text_image.shape[:2]
aspect_ratio = width / height
# 根据长宽比设置拉伸系数
stretch_k = 1
if aspect_ratio > 1:
stretch_k = aspect_ratio
# 骰子的方向
up_direction = (51 - 16, 5 - 30) # 右上角点 - 左上角点
move_distance = (up_direction[0] * (stretch_k - 1), up_direction[1] * (stretch_k - 1))
# 加载背景图像,保留透明通道
background = cv2.imread(ASSETS_PATH / "img" / "dice" / "template.png", cv2.IMREAD_UNCHANGED)
height, width = background.shape[:2]
background, offset_x, offset_y = slice_and_stretch(background,
[(10,10),(0,0)],
move_distance)
# 定义3D变换的四个角点透视效果
# 顺序: [左上, 右上, 右下, 左下]
corners = np.array([
[16, 30], # 左上
[51, 5], # 右上(上移,创建透视)
[88, 33], # 右下
[51 + move_distance[0], 5 + move_distance[1]], # 右上(上移,创建透视)
[88 + move_distance[0], 33 + move_distance[1]], # 右下
[49, 62] # 左下(下移)
], dtype=np.float32)
# 加载背景图像,保留透明通道
background = cv2.imread(str(ASSETS / "template.png"), cv2.IMREAD_UNCHANGED)
corners[:, 0] += offset_x
corners[:, 1] += offset_y
# 对文本图像进行3D变换保持透明通道
@ -189,9 +363,29 @@ async def generate_dice_image(number: int) -> BytesIO:
pil_final = Image.fromarray(final_image_simple)
# 导入一系列图像
images: list[Image.Image] = [Image.open(ASSETS / f"{i}.png") for i in range(1, 12)]
images: list[Image.Image] = [Image.open(ASSETS_PATH / "img" / "dice" / f"{i}.png") for i in range(1, 12)]
images.append(pil_final)
frame_durations = [100] * (len(images) - 1) + [100000]
# 将导入的图像尺寸扩展为和 pil_final 相同的大小,随帧数进行扩展,然后不放大的情况下放在最中间
if(aspect_ratio > 1):
target_size = pil_final.size
for i in range(len(images) - 1):
k = i / (len(images) - 1)
now_distance = (move_distance[0] * k, move_distance[1] * k)
img = np.array(images[i])
img, _, _ = slice_and_stretch(img,
[(10,10),(0,0)],
now_distance)
# 只扩展边界,图像本身不放大
img_width, img_height = img.shape[1], img.shape[0]
new_img = Image.new("RGBA", target_size, (0, 0, 0, 0))
this_offset_x = (target_size[0] - img_width) // 2
this_offset_y = (target_size[1] - img_height) // 2
# new_img.paste(img, (this_offset_x, this_offset_y))
new_img.paste(Image.fromarray(img), (this_offset_x, this_offset_y))
images[i] = new_img
# 保存为BytesIO对象
output = BytesIO()
images[0].save(output,
@ -200,4 +394,6 @@ async def generate_dice_image(number: int) -> BytesIO:
duration=frame_durations,
format='GIF',
loop=1)
output.seek(0)
# pil_final.save(output, format='PNG')
return output

View File

@ -42,6 +42,24 @@ def get_random_number(min: int = 1, max: int = 6) -> int:
import random
return random.randint(min, max)
def get_random_number_string(min_value: str = "1", max_value: str = "6") -> str:
import random
# 先判断二者是不是整数
if (float(min_value).is_integer()
and float(max_value).is_integer()
and "." not in min_value
and "." not in max_value):
return str(random.randint(int(float(min_value)), int(float(max_value))))
# 根据传入小数的位数,决定保留几位小数
if "." in str(min_value) or "." in str(max_value):
decimal_places = max(len(str(min_value).split(".")[1]) if "." in str(min_value) else 0,
len(str(max_value).split(".")[1]) if "." in str(max_value) else 0)
return str(round(random.uniform(float(min_value), float(max_value)), decimal_places))
# 如果没有小数点,很可能二者都是指数表示或均为 inf直接返回随机小数
return str(random.uniform(float(min_value), float(max_value)))
def roll_number(wide: bool = False) -> str:
raw = number_arts[get_random_number()]
if wide:

View File

@ -0,0 +1,207 @@
import asyncio
import datetime
from pathlib import Path
from typing import Any, Literal, cast
import nonebot
from loguru import logger
from nonebot import on_message
from nonebot.adapters import Event
from nonebot.adapters.console import Bot as ConsoleBot
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord import Bot as DiscordBot
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
from nonebot.adapters.onebot.v11.event import \
GroupMessageEvent as OnebotV11GroupMessageEvent
from nonebot.adapters.onebot.v11.event import \
MessageEvent as OnebotV11MessageEvent
from nonebot_plugin_alconna import UniMessage, UniMsg
from pydantic import BaseModel
from konabot.plugins.simple_notify.parse_time import get_target_time
evt = on_message()
(Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True)
DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "notify.json"
DATA_FILE_LOCK = asyncio.Lock()
class Notify(BaseModel):
platform: Literal["console", "qq", "discord"]
target: str
"需要接受通知的个体"
target_env: str | None
"在哪里进行通知,如果是 None 代表私聊通知"
notify_time: datetime.datetime
notify_msg: str
def get_str(self):
return f"{self.target}-{self.target_env}-{self.platform}-{self.notify_time}"
class NotifyConfigFile(BaseModel):
version: int = 1
notifies: list[Notify] = []
unsent: list[Notify] = []
def load_notify_config() -> NotifyConfigFile:
if not DATA_FILE_PATH.exists():
return NotifyConfigFile()
try:
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text())
except Exception as e:
logger.warning(f"在解析 Notify 时遇到问题:{e}")
return NotifyConfigFile()
def save_notify_config(config: NotifyConfigFile):
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4))
async def notify_now(notify: Notify):
if notify.platform == 'console':
bot = [b for b in nonebot.get_bots().values() if isinstance(b, ConsoleBot)]
if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
return False
bot = bot[0]
await bot.send_private_message(notify.target, f"代办通知:{notify.notify_msg}")
elif notify.platform == 'discord':
bot = [b for b in nonebot.get_bots().values() if isinstance(b, DiscordBot)]
if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
return False
bot = bot[0]
channel = await bot.create_DM(recipient_id=int(notify.target))
await bot.send_to(channel.id, f"代办通知:{notify.notify_msg}")
elif notify.platform == 'qq':
bot = [b for b in nonebot.get_bots().values() if isinstance(b, OnebotV11Bot)]
if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
return False
bot = bot[0]
if notify.target_env is None:
await bot.send_private_msg(
user_id=int(notify.target),
message=f"代办通知:{notify.notify_msg}",
)
else:
await bot.send_group_msg(
group_id=int(notify.target_env),
message=cast(Any,
await UniMessage().at(notify.target).text(f" 代办通知:{notify.notify_msg}").export()
),
)
else:
logger.warning(f"提醒未成功发送出去:{notify}")
return False
return True
async def create_notify_task(notify: Notify, fail2remove: bool = True):
async def mission():
begin_time = datetime.datetime.now()
if begin_time < notify.notify_time:
await asyncio.sleep((notify.notify_time - begin_time).total_seconds())
res = await notify_now(notify)
if fail2remove or res:
await DATA_FILE_LOCK.acquire()
cfg = load_notify_config()
cfg.notifies = [n for n in cfg.notifies if n.get_str() != notify.get_str()]
if not res:
cfg.unsent.append(notify)
save_notify_config(cfg)
DATA_FILE_LOCK.release()
else:
pass
return asyncio.create_task(mission())
@evt.handle()
async def _(msg: UniMsg, mEvt: Event):
if mEvt.get_user_id() in nonebot.get_bots():
return
text = msg.extract_plain_text()
if "提醒我" not in text:
return
segments = text.split("提醒我", maxsplit=1)
if len(segments) != 2:
return
notify_time, notify_text = segments
target_time = get_target_time(notify_time)
if target_time is None:
logger.info(f"无法从 {notify_time} 中解析出时间")
return
if not notify_text:
return
await DATA_FILE_LOCK.acquire()
cfg = load_notify_config()
if isinstance(mEvt, ConsoleMessageEvent):
platform = "console"
target = mEvt.get_user_id()
target_env = None
elif isinstance(mEvt, OnebotV11MessageEvent):
platform = "qq"
target = mEvt.get_user_id()
if isinstance(mEvt, OnebotV11GroupMessageEvent):
target_env = str(mEvt.group_id)
else:
target_env = None
elif isinstance(mEvt, DiscordMessageEvent):
platform = "discord"
target = mEvt.get_user_id()
target_env = None
else:
logger.warning(f"Notify 遇到不支持的平台:{type(mEvt).__name__}")
return
notify = Notify(
platform=platform,
target=target,
target_env=target_env,
notify_time=target_time,
notify_msg=notify_text,
)
await create_notify_task(notify)
cfg.notifies.append(notify)
save_notify_config(cfg)
DATA_FILE_LOCK.release()
await evt.send(await UniMessage().at(mEvt.get_user_id()).text(
f" 了解啦!将会在 {notify.notify_time} 提醒你哦~").export())
driver = nonebot.get_driver()
NOTIFIED_FLAG = {
"task_added": False,
}
@driver.on_bot_connect
async def _():
if NOTIFIED_FLAG["task_added"]:
return
NOTIFIED_FLAG["task_added"] = True
await DATA_FILE_LOCK.acquire()
tasks = []
cfg = load_notify_config()
for notify in cfg.notifies:
tasks.append(create_notify_task(notify, fail2remove=False))
DATA_FILE_LOCK.release()
await asyncio.gather(*tasks)

View File

@ -0,0 +1,358 @@
import datetime
import re
from typing import Optional, Dict, List, Callable, Tuple
from loguru import logger
# --- 常量与正则表达式定义 (Constants and Regex Definitions) ---
# 数字模式,兼容中文和阿拉伯数字
P_NUM = r"(\d+|[零一两二三四五六七八九十]+)"
# 预编译的正则表达式
PATTERNS = {
# 相对时间, e.g., "5分钟后"
"DELTA": re.compile(
r"^"
r"((?P<days>" + P_NUM + r") ?天)?"
r"((?P<hours>" + P_NUM + r") ?个?小?时)?"
r"((?P<minutes>" + P_NUM + r") ?分钟?)?"
r"((?P<seconds>" + P_NUM + r") ?秒钟?)?"
r" ?后 ?$"
),
# 绝对时间
"YEAR": re.compile(r"(" + P_NUM + r") ?年"),
"MONTH": re.compile(r"(" + P_NUM + r") ?月"),
"DAY": re.compile(r"(" + P_NUM + r") ?[日号]"),
"HOUR": re.compile(r"(" + P_NUM + r") ?[点时](半)?钟?"),
"MINUTE": re.compile(r"(" + P_NUM + r") ?分(钟)?"),
"SECOND": re.compile(r"(" + P_NUM + r") ?秒(钟)?"),
"HMS_COLON": re.compile(r"(\d{1,2})[:](\d{1,2})([:](\d{1,2}))?"),
"PM": re.compile(r"(下午|PM|晚上)"),
# 相对日期
"TOMORROW": re.compile(r"明天"),
"DAY_AFTER_TOMORROW": re.compile(r"后天"),
"TODAY": re.compile(r"今天"),
}
# 中文数字到阿拉伯数字的映射
CHINESE_TO_ARABIC_MAP: Dict[str, int] = {
'': 0, '': 1, '': 2, '': 3, '': 4,
'': 5, '': 6, '': 7, '': 8, '': 9, '': 10
}
# --- 核心工具函数 (Core Utility Functions) ---
def parse_number(s: str) -> int:
"""
将包含中文或阿拉伯数字的字符串解析为整数。
例如: "" -> 5, "十五" -> 15, "二十三" -> 23, "12" -> 12。
返回 -1 表示解析失败。
"""
if not s:
return -1
s = s.strip().replace("", "")
if s.isdigit():
return int(s)
if s in CHINESE_TO_ARABIC_MAP:
return CHINESE_TO_ARABIC_MAP[s]
# 处理 "十" 在不同位置的情况
if s.startswith(''):
if len(s) == 1:
return 10
num = CHINESE_TO_ARABIC_MAP.get(s[1])
return 10 + num if num is not None else -1
if s.endswith(''):
if len(s) == 2:
num = CHINESE_TO_ARABIC_MAP.get(s[0])
return 10 * num if num is not None else -1
if '' in s:
parts = s.split('')
if len(parts) == 2:
left = CHINESE_TO_ARABIC_MAP.get(parts[0])
right = CHINESE_TO_ARABIC_MAP.get(parts[1])
if left is not None and right is not None:
return left * 10 + right
return -1
# --- 时间解析器类 (Time Parser Class) ---
class TimeParser:
"""
一个用于解析自然语言时间描述的类。
"""
def __init__(self, content: str):
self.original_content: str = content
self.content_to_parse: str = self._preprocess(content)
self.now: datetime.datetime = datetime.datetime.now()
# 将 t 作为结果构建器,初始化为今天的午夜
self.t: datetime.datetime = self.now.replace(hour=0, minute=0, second=0, microsecond=0)
self.is_pm_specified: bool = False
self.is_date_specified: bool = False
self.is_time_specified: bool = False
def _preprocess(self, content: str) -> str:
"""预处理字符串,移除不相关字符。"""
content = re.sub(r"\s+", "", content)
content = re.sub(r"[,\.。::、]", "", content)
return content
def _consume_match(self, match: re.Match) -> str:
"""从待解析字符串中移除已匹配的部分。"""
self.content_to_parse = self.content_to_parse.replace(match.group(0), "", 1)
return match.group(0)
def parse(self) -> Optional[datetime.datetime]:
"""
主解析方法。
首先尝试解析相对时间如“5分钟后”失败则尝试解析绝对时间。
"""
logger.debug(f"🎉 开始解析: '{self.original_content}' -> 清洗后: '{self.content_to_parse}'")
if not self.content_to_parse:
logger.debug("❌ 内容为空,无法解析。")
return None
# 1. 尝试相对时间解析
if (target_time := self._parse_relative_time()) is not None:
return target_time
# 2. 尝试绝对时间解析
if (target_time := self._parse_absolute_time()) is not None:
return target_time
logger.debug(f"❌ 所有解析模式均未匹配成功。")
return None
def _parse_relative_time(self) -> Optional[datetime.datetime]:
"""解析 'X天X小时X分钟后' 这种格式。"""
if match := PATTERNS["DELTA"].match(self.content_to_parse):
logger.debug("⏳ 匹配到相对时间模式 (DELTA)。")
try:
delta_parts = {
"days": parse_number(match.group("days") or "0"),
"hours": parse_number(match.group("hours") or "0"),
"minutes": parse_number(match.group("minutes") or "0"),
"seconds": parse_number(match.group("seconds") or "0"),
}
# 检查是否有无效的数字解析
if any(v < 0 for v in delta_parts.values()):
logger.debug(f"❌ 解析时间片段为数字时失败: {delta_parts}")
return None
delta = datetime.timedelta(**delta_parts)
if delta.total_seconds() == 0:
logger.debug("❌ 解析出的时间增量为0。")
return None
target_time = self.now + delta
logger.debug(f"✅ 相对时间解析成功 -> {target_time}")
return target_time
except (ValueError, TypeError) as e:
logger.debug(f"❌ 解析相对时间时出错: {e}", exc_info=True)
return None
return None
def _parse_absolute_time(self) -> Optional[datetime.datetime]:
"""解析一个指定的日期和时间。"""
logger.debug(f"🎯 启动绝对时间解析,基准时间: {self.t}")
# 定义解析步骤和顺序
# (pattern_key, handler_method)
parsing_steps: List[Tuple[str, Callable[[re.Match], bool]]] = [
("TOMORROW", self._handle_tomorrow),
("DAY_AFTER_TOMORROW", self._handle_day_after_tomorrow),
("TODAY", self._handle_today),
("YEAR", self._handle_year),
("MONTH", self._handle_month),
("DAY", self._handle_day),
("HMS_COLON", self._handle_hms_colon),
("PM", self._handle_pm),
("HOUR", self._handle_hour),
("MINUTE", self._handle_minute),
("SECOND", self._handle_second),
]
for key, handler in parsing_steps:
if match := PATTERNS[key].search(self.content_to_parse):
if not handler(match):
# 如果任何一个处理器返回False说明解析失败
return None
# 移除无意义的上午关键词
self.content_to_parse = self.content_to_parse.replace("上午", "").replace("AM", "").replace("凌晨", "")
# 如果解析后还有剩余字符,说明有无法识别的部分
if self.content_to_parse.strip():
logger.debug(f"❌ 匹配失败,存在未解析的残留内容: '{self.content_to_parse.strip()}'")
return None
# 最终调整和检查
return self._finalize_datetime()
# --- Handler Methods for Absolute Time Parsing ---
def _handle_tomorrow(self, match: re.Match) -> bool:
self.t += datetime.timedelta(days=1)
self.is_date_specified = True
logger.debug(f"📅 匹配到 '明天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_day_after_tomorrow(self, match: re.Match) -> bool:
self.t += datetime.timedelta(days=2)
self.is_date_specified = True
logger.debug(f"📅 匹配到 '后天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_today(self, match: re.Match) -> bool:
self.is_date_specified = True
logger.debug(f"📅 匹配到 '今天', 日期基准不变, 消耗: '{self._consume_match(match)}'")
return True
def _handle_year(self, match: re.Match) -> bool:
year = parse_number(match.group(1))
if year < 0: return False
if year < 100: year += 2000 # 处理 "25年" -> 2025
if year < self.now.year:
logger.debug(f"❌ 指定的年份 {year} 已过去。")
return False
self.t = self.t.replace(year=year)
self.is_date_specified = True
logger.debug(f"Y| 年份更新 -> {self.t.year}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_month(self, match: re.Match) -> bool:
month = parse_number(match.group(1))
if not (1 <= month <= 12):
logger.debug(f"❌ 无效的月份: {month}")
return False
# 如果设置的月份在当前月份之前,且没有指定年份,则年份加一
if month < self.t.month and not self.is_date_specified:
self.t = self.t.replace(year=self.t.year + 1)
logger.debug(f"💡 月份小于当前月份,年份自动进位 -> {self.t.year}")
self.t = self.t.replace(month=month)
self.is_date_specified = True
logger.debug(f"M| 月份更新 -> {self.t.month}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_day(self, match: re.Match) -> bool:
day = parse_number(match.group(1))
if not (1 <= day <= 31):
logger.debug(f"❌ 无效的日期: {day}")
return False
try:
# 如果日期小于当前日期,且只指定了日,则月份加一
if day < self.t.day and not self.is_date_specified:
if self.t.month == 12:
self.t = self.t.replace(year=self.t.year + 1, month=1)
else:
self.t = self.t.replace(month=self.t.month + 1)
logger.debug(f"💡 日期小于当前日期,月份自动进位 -> {self.t.year}-{self.t.month}")
self.t = self.t.replace(day=day)
self.is_date_specified = True
logger.debug(f"D| 日期更新 -> {self.t.day}, 消耗: '{self._consume_match(match)}'")
return True
except ValueError:
logger.debug(f"❌ 日期 {day} 对于月份 {self.t.month} 无效 (例如2月30号)。")
return False
def _handle_hms_colon(self, match: re.Match) -> bool:
h = int(match.group(1))
m = int(match.group(2))
s_str = match.group(4) # group(3) is with colon, group(4) is the number
s = int(s_str) if s_str else 0
if not (0 <= h <= 23 and 0 <= m <= 59 and 0 <= s <= 59):
logger.debug(f"❌ 无效的时间格式: H={h}, M={m}, S={s}")
return False
self.t = self.t.replace(hour=h, minute=m, second=s)
self.is_time_specified = True
logger.debug(f"T| 时分秒(冒号格式)更新 -> {self.t.time()}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_pm(self, match: re.Match) -> bool:
self.is_pm_specified = True
logger.debug(f"PM| 匹配到下午/晚上, 消耗: '{self._consume_match(match)}'")
return True
def _handle_hour(self, match: re.Match) -> bool:
hour = parse_number(match.group(1))
has_half = match.group(2) == ''
if not (0 <= hour <= 23):
logger.debug(f"❌ 无效的小时: {hour}")
return False
minute = 30 if has_half else self.t.minute
self.t = self.t.replace(hour=hour, minute=minute)
self.is_time_specified = True
logger.debug(f"H| 小时更新 -> {self.t.hour}{':30' if has_half else ''}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_minute(self, match: re.Match) -> bool:
minute = parse_number(match.group(1))
if not (0 <= minute <= 59):
logger.debug(f"❌ 无效的分钟: {minute}")
return False
self.t = self.t.replace(minute=minute)
self.is_time_specified = True
logger.debug(f"M| 分钟更新 -> {self.t.minute}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_second(self, match: re.Match) -> bool:
second = parse_number(match.group(1))
if not (0 <= second <= 59):
logger.debug(f"❌ 无效的秒: {second}")
return False
self.t = self.t.replace(second=second)
self.is_time_specified = True
logger.debug(f"S| 秒更新 -> {self.t.second}, 消耗: '{self._consume_match(match)}'")
return True
def _finalize_datetime(self) -> Optional[datetime.datetime]:
"""对解析出的时间进行最后的调整和检查。"""
# 处理下午/晚上
if self.is_pm_specified and self.t.hour < 12:
self.t = self.t.replace(hour=self.t.hour + 12)
logger.debug(f"💡 根据 PM 标识,小时调整为 -> {self.t.hour}")
# 如果没有指定任何时间或日期部分,则认为解析无效
if not self.is_date_specified and not self.is_time_specified:
logger.debug("❌ 未能从输入中解析出任何有效的日期或时间部分。")
return None
# 如果最终计算出的时间点在当前时间之前,自动往后推
# 例如:现在是 15:00说 "14点"应该是指明天的14点
if self.t < self.now:
# 只有在明确指定了时间的情况下,才自动加一天
# 如果只指定了一个过去的日期如“去年5月1号”则不应该调整
if self.is_time_specified:
self.t += datetime.timedelta(days=1)
logger.debug(f"🔁 目标时间已过,自动调整为明天 -> {self.t}")
logger.debug(f"✅ 解析成功,最终时间: {self.t}")
return self.t
# --- 公共接口 (Public Interface) ---
def get_target_time(content: str) -> Optional[datetime.datetime]:
"""
高级接口,用于将自然语言时间描述转换为 datetime 对象。
Args:
content: 包含时间信息的字符串。
Returns:
一个 datetime 对象,如果解析失败则返回 None。
"""
parser = TimeParser(content)
return parser.parse()

0
scripts/__init__.py Normal file
View File

View File

@ -0,0 +1,21 @@
from pathlib import Path
import nonebot
from loguru import logger
nonebot.init()
nonebot.load_plugins("konabot/plugins")
plugins = nonebot.get_loaded_plugins()
len_requires = len(
[f for f in (
Path(__file__).parent.parent / "konabot" / "plugins"
).iterdir() if f.is_dir() and (f / "__init__.py").exists()]
)
plugins = [p for p in plugins if p.module.__name__.startswith("konabot.plugins")]
logger.info(f"已经加载的插件数量 {len(plugins)}")
logger.info(f"期待加载的插件数量 {len_requires}")
assert len(plugins) == len_requires