Compare commits

...

22 Commits

Author SHA1 Message Date
8c4fa2b5e4 Merge pull request 'fix: 透明底正常生成;静动图分离完成' (#18) from tnot/konabot:fix--修复部分Bug into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #18
2025-10-01 19:24:23 +08:00
fb2c3f1ce2 fix: 透明底正常生成;静动图分离完成 2025-10-01 11:54:54 +08:00
265415e727 Merge pull request 'feat: ytpgif' (#16) from tnot/konabot:feat--ytpgif into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #16
2025-09-30 22:38:15 +08:00
06555b2225 feat: ytpgif 2025-09-30 22:24:16 +08:00
f6fd25a41d fix: 添加 Notify 任务创建限制
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 19:08:30 +08:00
9f6c70bf0f 合并后的部分骰子路径修复
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 15:21:53 +08:00
1c01e49d5d Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot 2025-09-30 15:18:16 +08:00
48c719bc33 全新骰子 2025-09-30 15:13:59 +08:00
6bc9f94e83 拷贝环境变量
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 02:45:01 +08:00
deab2d7b2b 更新 PYTHONPATH 环境变量
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 02:39:30 +08:00
2a6abbe0d4 置入 __init__ 文件
Some checks failed
continuous-integration/drone/push Build is failing
2025-09-30 02:35:10 +08:00
30bdc50024 将 pyproject.toml 置入项目目录
Some checks failed
continuous-integration/drone/push Build is failing
2025-09-30 02:33:03 +08:00
be8b1b9999 修复文件夹错误展开的问题
Some checks failed
continuous-integration/drone/push Build is failing
2025-09-30 02:28:54 +08:00
43d0a09de2 修复 docker 在 CI 不在线的问题
Some checks failed
continuous-integration/drone/push Build is failing
2025-09-30 02:24:50 +08:00
6e0082c1c9 大变:移动了很多很多文件并优化构建流程
Some checks failed
continuous-integration/drone/push Build is failing
2025-09-30 02:23:40 +08:00
3b8b060c5b Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot
Some checks failed
continuous-integration/drone/push Build is failing
2025-09-30 02:09:26 +08:00
8cfe58c7dd 添加测试流水线 2025-09-30 02:08:35 +08:00
f997bf945a Merge pull request '解决了提醒事项功能的若干 Issue' (#13) from fix-提醒功能若干问题修复 into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #13
2025-09-30 02:02:22 +08:00
0dbe164703 先调整到一个可用的状态 2025-09-30 01:59:40 +08:00
818f2b64ec 修复汉语数字问题 2025-09-30 00:43:09 +08:00
a855c69f61 添加「半」钟
All checks were successful
continuous-integration/drone/tag Build is passing
continuous-integration/drone/push Build is passing
2025-09-29 23:25:09 +08:00
90ee296f55 调整UX 2025-09-29 23:22:12 +08:00
37 changed files with 943 additions and 189 deletions

View File

@ -26,6 +26,14 @@ steps:
volumes:
- name: docker-socket
path: /var/run/docker.sock
- name: 在容器中测试插件加载
image: docker:dind
privileged: true
volumes:
- name: docker-socket
path: /var/run/docker.sock
commands:
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
volumes:
- name: docker-socket

4
.env.test Normal file
View File

@ -0,0 +1,4 @@
ENVIRONMENT=test
ENABLE_CONSOLE=false
ENABLE_QQ=false
ENABLE_DISCORD=false

View File

@ -4,5 +4,11 @@ WORKDIR /app
COPY requirements.txt ./
RUN pip install -r requirements.txt --no-deps
COPY . .
COPY bot.py pyproject.toml .env.prod .env.test ./
COPY assets ./assets
COPY scripts ./scripts
COPY konabot ./konabot
ENV PYTHONPATH=/app
CMD [ "python", "bot.py" ]

View File

Before

Width:  |  Height:  |  Size: 9.2 KiB

After

Width:  |  Height:  |  Size: 9.2 KiB

View File

Before

Width:  |  Height:  |  Size: 10 KiB

After

Width:  |  Height:  |  Size: 10 KiB

View File

Before

Width:  |  Height:  |  Size: 8.7 KiB

After

Width:  |  Height:  |  Size: 8.7 KiB

View File

Before

Width:  |  Height:  |  Size: 11 KiB

After

Width:  |  Height:  |  Size: 11 KiB

View File

Before

Width:  |  Height:  |  Size: 10 KiB

After

Width:  |  Height:  |  Size: 10 KiB

View File

Before

Width:  |  Height:  |  Size: 8.7 KiB

After

Width:  |  Height:  |  Size: 8.7 KiB

View File

Before

Width:  |  Height:  |  Size: 11 KiB

After

Width:  |  Height:  |  Size: 11 KiB

View File

Before

Width:  |  Height:  |  Size: 9.3 KiB

After

Width:  |  Height:  |  Size: 9.3 KiB

View File

Before

Width:  |  Height:  |  Size: 10 KiB

After

Width:  |  Height:  |  Size: 10 KiB

View File

Before

Width:  |  Height:  |  Size: 8.7 KiB

After

Width:  |  Height:  |  Size: 8.7 KiB

View File

Before

Width:  |  Height:  |  Size: 11 KiB

After

Width:  |  Height:  |  Size: 11 KiB

View File

Before

Width:  |  Height:  |  Size: 9.3 KiB

After

Width:  |  Height:  |  Size: 9.3 KiB

BIN
assets/img/dice/stick.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 80 KiB

View File

Before

Width:  |  Height:  |  Size: 9.0 KiB

After

Width:  |  Height:  |  Size: 9.0 KiB

View File

Before

Width:  |  Height:  |  Size: 219 KiB

After

Width:  |  Height:  |  Size: 219 KiB

View File

Before

Width:  |  Height:  |  Size: 272 KiB

After

Width:  |  Height:  |  Size: 272 KiB

0
konabot/__init__.py Normal file
View File

4
konabot/common/path.py Normal file
View File

@ -0,0 +1,4 @@
from pathlib import Path
ASSETS_PATH = Path(__file__).resolve().parent.parent.parent / "assets"
FONTS_PATH = ASSETS_PATH / "fonts"

View File

@ -1,8 +1,8 @@
from imagetext_py import EmojiOptions, FontDB
from .path import ASSETS
from konabot.common.path import FONTS_PATH
FontDB.LoadFromDir(str(ASSETS))
FontDB.LoadFromDir(str(FONTS_PATH))
FontDB.SetDefaultEmojiOptions(EmojiOptions(
parse_shortcodes=False,

View File

@ -1,3 +0,0 @@
from pathlib import Path
ASSETS = Path(__file__).parent.parent.parent / "assets"

View File

@ -4,10 +4,11 @@ from typing import Any, cast
import imagetext_py
import PIL.Image
from .base.fonts import HARMONYOS_SANS_SC_BLACK
from .base.path import ASSETS
from konabot.common.path import ASSETS_PATH
geimao_image = PIL.Image.open(ASSETS / "geimao.jpg").convert("RGBA")
from .base.fonts import HARMONYOS_SANS_SC_BLACK
geimao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "geimao.jpg").convert("RGBA")
def _draw_geimao(saying: str):

View File

@ -3,10 +3,11 @@ import asyncio
import imagetext_py
import PIL.Image
from .base.fonts import HARMONYOS_SANS_SC_REGULAR
from .base.path import ASSETS
from konabot.common.path import ASSETS_PATH
pt_image = PIL.Image.open(ASSETS / "ptsay.png").convert("RGBA")
from .base.fonts import HARMONYOS_SANS_SC_REGULAR
pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("RGBA")
def _draw_pt(saying: str):

View File

@ -1,11 +1,11 @@
from typing import Optional
from typing import Optional, Union
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from konabot.plugins.roll_dice.roll_dice import generate_dice_image
from konabot.plugins.roll_dice.roll_number import get_random_number, roll_number
from konabot.plugins.roll_dice.roll_number import get_random_number, get_random_number_string, roll_number
evt = on_alconna(Alconna(
"摇数字"
@ -22,21 +22,26 @@ async def _(event: BaseEvent):
evt = on_alconna(Alconna(
"摇骰子",
Args["f1?", int]["f2?", int]
Args["f1?", str]["f2?", str]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, f1: Optional[int] = None, f2: Optional[int] = None):
async def _(event: BaseEvent, f1: Optional[str] = None, f2: Optional[str] = None):
# if isinstance(event, DiscordMessageEvent):
# await evt.send(await UniMessage().text("```\n" + roll_dice() + "\n```").export())
# elif isinstance(event, ConsoleMessageEvent):
number = 0
number = ""
if(f1 is not None and f2 is not None):
number = get_random_number(f1, f2)
number = get_random_number_string(f1, f2)
elif f1 is not None:
number = get_random_number(1, f1)
if(float(f1) > 1):
number = get_random_number_string("1", f1)
elif (float(f1) > 0):
number = get_random_number_string("0", f1)
else:
number = get_random_number_string(f1, "0")
else:
number = get_random_number()
number = get_random_number_string()
await evt.send(await UniMessage().image(raw=await generate_dice_image(number)).export())
# else:
# await evt.send(await UniMessage().text(roll_dice(wide=True)).export())

View File

@ -1,9 +1,11 @@
from io import BytesIO
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from konabot.plugins.roll_dice.base.path import ASSETS
from konabot.common.path import ASSETS_PATH, FONTS_PATH
def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0)):
"""
@ -13,15 +15,7 @@ def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0
temp_image = Image.new('RGB', (1, 1), (255, 255, 255))
temp_draw = ImageDraw.Draw(temp_image)
font = ImageFont.truetype(ASSETS / "montserrat.otf", font_size)
# try:
# font = ImageFont.truetype(ASSETS / "montserrat.otf", font_size)
# except:
# try:
# font = ImageFont.truetype("arial.ttf", font_size)
# except:
# # 如果系统字体不可用,使用默认字体
# font = ImageFont.load_default()
font = ImageFont.truetype(FONTS_PATH / "montserrat.otf", font_size)
# 获取文本边界框
bbox = temp_draw.textbbox((0, 0), text, font=font)
@ -158,26 +152,206 @@ def precise_blend_with_perspective(background, foreground, corners):
return result
async def generate_dice_image(number: int) -> BytesIO:
def draw_line_bresenham(image, x0, y0, x1, y1, color):
"""使用Bresenham算法画线避免间隙"""
dx = abs(x1 - x0)
dy = abs(y1 - y0)
sx = 1 if x0 < x1 else -1
sy = 1 if y0 < y1 else -1
err = dx - dy
while True:
if 0 <= x0 < image.shape[1] and 0 <= y0 < image.shape[0]:
image[y0, x0] = color
if x0 == x1 and y0 == y1:
break
e2 = 2 * err
if e2 > -dy:
err -= dy
x0 += sx
if e2 < dx:
err += dx
y0 += sy
def slice_and_stretch(image, slice_lines, direction):
'''
image: 图像
slice_lines: 切割线两个点的列表一般是倾斜45度的直线
direction: 移动方向向量(二元数组)
'''
# 获取图片的尺寸
height, width = image.shape[:2]
# 创建一个由移动方向扩充后,更大的图片
new_width = int(width + abs(direction[0]))
new_height = int(height + abs(direction[1]))
new_image = np.zeros((new_height, new_width, 4), dtype=image.dtype)
# 先把图片放在新图的和方向相反的一侧
offset_x = int(abs(min(0, direction[0])))
offset_y = int(abs(min(0, direction[1])))
new_image[offset_y:offset_y+height, offset_x:offset_x+width] = image
# 切割线也跟着偏移
slice_lines = [(x + offset_x, y + offset_y) for (x, y) in slice_lines]
# 复制切割线经过的像素,沿着方向移动,实现类似拖尾的效果
apply_trail_effect_vectorized(new_image, slice_lines, direction)
apply_stroke_vectorized(new_image, slice_lines, direction)
return new_image, offset_x, offset_y
def apply_trail_effect_vectorized(new_image, slice_lines, direction):
"""向量化实现拖尾效果"""
height, width = new_image.shape[:2]
# 创建坐标网格
y_coords, x_coords = np.mgrid[0:height, 0:width]
# 向量化计算点到直线的距离
line_vec = np.array([slice_lines[1][0] - slice_lines[0][0],
slice_lines[1][1] - slice_lines[0][1]])
point_vecs = np.stack([x_coords - slice_lines[0][0],
y_coords - slice_lines[0][1]], axis=-1)
# 计算叉积(有向距离)
cross_products = (line_vec[0] * point_vecs[:, :, 1] -
line_vec[1] * point_vecs[:, :, 0])
# 选择直线右侧的像素 (d1 > 0)
mask = cross_products > 0
# 计算目标位置
target_x = (x_coords + direction[0]).astype(int)
target_y = (y_coords + direction[1]).astype(int)
# 创建有效位置掩码
valid_mask = mask & (target_x >= 0) & (target_x < width) & \
(target_y >= 0) & (target_y < height)
# 批量复制像素
new_image[target_y[valid_mask], target_x[valid_mask]] = \
new_image[y_coords[valid_mask], x_coords[valid_mask]]
def apply_stroke_vectorized(new_image, slice_lines, direction):
"""使用向量化操作优化笔画效果"""
height, width = new_image.shape[:2]
# 1. 找到所有非透明像素
non_transparent = np.where(new_image[:, :, 3] > 0)
if len(non_transparent[0]) == 0:
return
y_coords, x_coords = non_transparent
# 2. 向量化计算点到直线的距离
line_vec = np.array([slice_lines[1][0] - slice_lines[0][0],
slice_lines[1][1] - slice_lines[0][1]])
point_vecs = np.column_stack([x_coords - slice_lines[0][0],
y_coords - slice_lines[0][1]])
# 计算叉积(距离)
cross_products = (line_vec[0] * point_vecs[:, 1] -
line_vec[1] * point_vecs[:, 0])
# 3. 选择靠近直线的像素
mask = np.abs(cross_products) < 1.0
selected_y = y_coords[mask]
selected_x = x_coords[mask]
selected_pixels = new_image[selected_y, selected_x]
if len(selected_x) == 0:
return
# 4. 预计算采样点
length = np.sqrt(direction[0]**2 + direction[1]**2)
if length == 0:
return
# 创建采样偏移
dx_dy = np.array([(dx, dy) for dx in [-0.5, 0, 0.5]
for dy in [-0.5, 0, 0.5]])
# 5. 批量计算目标位置
steps = max(1, int(length * 2))
alpha = 0.7
for k in range(1, steps + 1):
# 对所有选中的像素批量计算新位置
scale = k / steps
# 为每个像素和每个采样点计算目标位置
for dx, dy in dx_dy:
target_x = np.round(selected_x + dx + direction[0] * scale).astype(int)
target_y = np.round(selected_y + dy + direction[1] * scale).astype(int)
# 创建有效位置掩码
valid_mask = (target_x >= 0) & (target_x < width) & \
(target_y >= 0) & (target_y < height)
if np.any(valid_mask):
valid_target_x = target_x[valid_mask]
valid_target_y = target_y[valid_mask]
valid_source_idx = np.where(valid_mask)[0]
# 批量混合像素
source_pixels = selected_pixels[valid_source_idx]
target_pixels = new_image[valid_target_y, valid_target_x]
new_image[valid_target_y, valid_target_x] = (
alpha * source_pixels + (1 - alpha) * target_pixels
)
async def generate_dice_image(number: str) -> BytesIO:
# 将文本转换为带透明背景的图像
text = str(number)
text = number
# 如果文本太长,直接返回金箍棒
if(len(text) > 50):
output = BytesIO()
push_image = Image.open(ASSETS_PATH / "img" / "dice" / "stick.png")
push_image.save(output,format='PNG')
output.seek(0)
return output
text_image = text_to_transparent_image(
text,
font_size=60,
text_color=(0, 0, 0) # 黑色文字
)
# 获取长宽比
height, width = text_image.shape[:2]
aspect_ratio = width / height
# 根据长宽比设置拉伸系数
stretch_k = 1
if aspect_ratio > 1:
stretch_k = aspect_ratio
# 骰子的方向
up_direction = (51 - 16, 5 - 30) # 右上角点 - 左上角点
move_distance = (up_direction[0] * (stretch_k - 1), up_direction[1] * (stretch_k - 1))
# 加载背景图像,保留透明通道
background = cv2.imread(ASSETS_PATH / "img" / "dice" / "template.png", cv2.IMREAD_UNCHANGED)
height, width = background.shape[:2]
background, offset_x, offset_y = slice_and_stretch(background,
[(10,10),(0,0)],
move_distance)
# 定义3D变换的四个角点透视效果
# 顺序: [左上, 右上, 右下, 左下]
corners = np.array([
[16, 30], # 左上
[51, 5], # 右上(上移,创建透视)
[88, 33], # 右下
[51 + move_distance[0], 5 + move_distance[1]], # 右上(上移,创建透视)
[88 + move_distance[0], 33 + move_distance[1]], # 右下
[49, 62] # 左下(下移)
], dtype=np.float32)
# 加载背景图像,保留透明通道
background = cv2.imread(str(ASSETS / "template.png"), cv2.IMREAD_UNCHANGED)
corners[:, 0] += offset_x
corners[:, 1] += offset_y
# 对文本图像进行3D变换保持透明通道
@ -189,9 +363,29 @@ async def generate_dice_image(number: int) -> BytesIO:
pil_final = Image.fromarray(final_image_simple)
# 导入一系列图像
images: list[Image.Image] = [Image.open(ASSETS / f"{i}.png") for i in range(1, 12)]
images: list[Image.Image] = [Image.open(ASSETS_PATH / "img" / "dice" / f"{i}.png") for i in range(1, 12)]
images.append(pil_final)
frame_durations = [100] * (len(images) - 1) + [100000]
# 将导入的图像尺寸扩展为和 pil_final 相同的大小,随帧数进行扩展,然后不放大的情况下放在最中间
if(aspect_ratio > 1):
target_size = pil_final.size
for i in range(len(images) - 1):
k = i / (len(images) - 1)
now_distance = (move_distance[0] * k, move_distance[1] * k)
img = np.array(images[i])
img, _, _ = slice_and_stretch(img,
[(10,10),(0,0)],
now_distance)
# 只扩展边界,图像本身不放大
img_width, img_height = img.shape[1], img.shape[0]
new_img = Image.new("RGBA", target_size, (0, 0, 0, 0))
this_offset_x = (target_size[0] - img_width) // 2
this_offset_y = (target_size[1] - img_height) // 2
# new_img.paste(img, (this_offset_x, this_offset_y))
new_img.paste(Image.fromarray(img), (this_offset_x, this_offset_y))
images[i] = new_img
# 保存为BytesIO对象
output = BytesIO()
images[0].save(output,
@ -200,4 +394,6 @@ async def generate_dice_image(number: int) -> BytesIO:
duration=frame_durations,
format='GIF',
loop=1)
output.seek(0)
# pil_final.save(output, format='PNG')
return output

View File

@ -42,6 +42,24 @@ def get_random_number(min: int = 1, max: int = 6) -> int:
import random
return random.randint(min, max)
def get_random_number_string(min_value: str = "1", max_value: str = "6") -> str:
import random
# 先判断二者是不是整数
if (float(min_value).is_integer()
and float(max_value).is_integer()
and "." not in min_value
and "." not in max_value):
return str(random.randint(int(float(min_value)), int(float(max_value))))
# 根据传入小数的位数,决定保留几位小数
if "." in str(min_value) or "." in str(max_value):
decimal_places = max(len(str(min_value).split(".")[1]) if "." in str(min_value) else 0,
len(str(max_value).split(".")[1]) if "." in str(max_value) else 0)
return str(round(random.uniform(float(min_value), float(max_value)), decimal_places))
# 如果没有小数点,很可能二者都是指数表示或均为 inf直接返回随机小数
return str(random.uniform(float(min_value), float(max_value)))
def roll_number(wide: bool = False) -> str:
raw = number_arts[get_random_number()]
if wide:

View File

@ -1,6 +1,5 @@
import asyncio
import datetime
import re
from pathlib import Path
from typing import Any, Literal, cast
@ -20,152 +19,7 @@ from nonebot.adapters.onebot.v11.event import \
from nonebot_plugin_alconna import UniMessage, UniMsg
from pydantic import BaseModel
PATTERN_DELTA_HMS = re.compile(r"^((\d+|[零一两二三四五六七八九十]+) ?天)?((\d+|[零一两二三四五六七八九十]+) ?个?小?时)?((\d+|[零一两二三四五六七八九十]+) ?分钟?)?((\d+|[零一两二三四五六七八九十]+) ?秒钟?)? ?后 ?$")
PATTERN_DATE_SPECIFY = re.compile(r"(\d{1,2}|[零一二三四五六七八九十]+) ?[日号]")
PATTERN_MONTH_SPECIFY = re.compile(r"(\d{1,2}|[零一二三四五六七八九十]+) ?月")
PATTERN_YEAR_SPECIFY = re.compile(r"(\d|[零一二三四五六七八九十]+) ?年")
PATTERN_HOUR_SPECIFY = re.compile(r"(\d|[零一二三四五六七八九十]+) ?[点时]钟?")
PATTERN_MINUTE_SPECIFY = re.compile(r"(\d|[零一二三四五六七八九十]+) ?分(钟)?")
PATTERN_SECOND_SPECIFY = re.compile(r"(\d|[零一二三四五六七八九十]+) ?秒(钟)?")
PATTERN_HMS_SPECIFY = re.compile(r"\d\d[:]\d\d([:]\d\d)?")
PATTERN_PM_SPECIFY = re.compile(r"(下午|PM|晚上)")
def parse_chinese_or_digit(s: str) -> int:
if set(s) <= set("0123456789"):
return int(s)
s = s.replace("", "")
chinese_digits = {
'': 1, '': 2, '': 3, '': 4, '': 5,
'': 6, '': 7, '': 8, '': 9, '': 10
}
if s in chinese_digits:
return chinese_digits[s]
if len(s) == 2 and s[0] == '':
if s[1] in chinese_digits:
return 10 + chinese_digits[s[1]] - 1 # e.g., "十一" = 10 + 1 = 11
try:
chinese_to_arabic = {
'': 0, '': 1, '': 2, '': 3, '': 4,
'': 5, '': 6, '': 7, '': 8, '': 9,
'': 10
}
if s in chinese_to_arabic:
return chinese_to_arabic[s]
if len(s) == 2 and s[0] == '' and s[1] in chinese_to_arabic:
return 10 + chinese_to_arabic[s[1]]
except (ValueError, KeyError):
pass
try:
return int(s)
except ValueError:
return -1
def get_target_time(content: str) -> datetime.datetime | None:
if match := re.match(PATTERN_DELTA_HMS, content.strip()):
days = parse_chinese_or_digit(match.group(2) or "0")
hours = parse_chinese_or_digit(match.group(4) or "0")
minutes = parse_chinese_or_digit(match.group(6) or "0")
seconds = parse_chinese_or_digit(match.group(8) or "0")
return datetime.datetime.now() + datetime.timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)
t = datetime.datetime.now()
content_to_match = content
if "明天" in content_to_match:
content_to_match = "".join(content_to_match.split("明天"))
t += datetime.timedelta(days=1)
elif "后天" in content_to_match:
content_to_match = "".join(content_to_match.split("后天"))
t += datetime.timedelta(days=2)
elif "今天" in content_to_match:
content_to_match = "".join(content_to_match.split("今天"))
if match1 := re.match(PATTERN_DATE_SPECIFY, content_to_match):
content_to_match = "".join(content_to_match.split(match1.group(0)))
day = parse_chinese_or_digit(match1.group(1))
if day <= 0 or day > 31:
return
if day < t.day:
if t.month == 12:
t = t.replace(year=t.year + 1, month=1, day=day)
else:
t = t.replace(month=t.month + 1, day=day)
else:
t = t.replace(day=day)
if match2 := re.match(PATTERN_MONTH_SPECIFY, content_to_match):
content_to_match = "".join(content_to_match.split(match2.group(0)))
month = parse_chinese_or_digit(match2.group(1))
if month <= 0 or month > 12:
return
if month < t.month:
t = t.replace(year=t.year + 1, month=month)
else:
t = t.replace(month=month)
if match3 := re.match(PATTERN_YEAR_SPECIFY, content_to_match):
content_to_match = "".join(content_to_match.split(match3.group(0)))
year = parse_chinese_or_digit(match3.group(1))
if year < 100:
year += 2000
if year < t.year:
return
t = t.replace(year=year)
if match4 := re.match(PATTERN_HOUR_SPECIFY, content_to_match):
content_to_match = "".join(content_to_match.split(match4.group(0)))
hour = parse_chinese_or_digit(match4.group(1))
if hour < 0 or hour > 23:
return
t = t.replace(hour=hour)
if match5 := re.match(PATTERN_MINUTE_SPECIFY, content_to_match):
content_to_match = "".join(content_to_match.split(match5.group(0)))
minute = parse_chinese_or_digit(match5.group(1))
if minute < 0 or minute > 59:
return
t = t.replace(minute=minute)
if match6 := re.match(PATTERN_SECOND_SPECIFY, content_to_match):
content_to_match = "".join(content_to_match.split(match6.group(0)))
second = parse_chinese_or_digit(match6.group(1))
if second < 0 or second > 59:
return
t = t.replace(second=second)
if match7 := re.match(PATTERN_HMS_SPECIFY, content_to_match):
content_to_match = "".join(content_to_match.split(match7.group(0)))
hms = match7.group(0).replace("", ":").split(":")
if len(hms) >= 2:
hour = int(hms[0])
minute = int(hms[1])
if hour < 0 or hour > 23 or minute < 0 or minute > 59:
return
t = t.replace(hour=hour, minute=minute)
if len(hms) == 3:
second = int(hms[2])
if second < 0 or second > 59:
return
t = t.replace(second=second)
content_to_match = content_to_match.replace("上午", "").replace("AM", "").replace("凌晨", "")
if match8 := re.match(PATTERN_PM_SPECIFY, content_to_match):
content_to_match = "".join(content_to_match.split(match8.group(0)))
if t.hour < 12:
t = t.replace(hour=t.hour + 12)
if t.hour == 12:
t += datetime.timedelta(hours=12)
if len(content_to_match.strip()) != 0:
return
if t < datetime.datetime.now():
t += datetime.timedelta(days=1)
return t
from konabot.plugins.simple_notify.parse_time import get_target_time
evt = on_message()
@ -241,7 +95,7 @@ async def notify_now(notify: Notify):
await bot.send_group_msg(
group_id=int(notify.target_env),
message=cast(Any,
await UniMessage().at(notify.target).text(f"代办通知:{notify.notify_msg}").export()
await UniMessage().at(notify.target).text(f" 代办通知:{notify.notify_msg}").export()
),
)
else:
@ -271,11 +125,14 @@ async def create_notify_task(notify: Notify, fail2remove: bool = True):
@evt.handle()
async def _(msg: UniMsg, mEvt: Event):
if mEvt.get_user_id() in nonebot.get_bots():
return
text = msg.extract_plain_text()
if "提醒我" not in text:
return
segments = text.split("提醒我")
segments = text.split("提醒我", maxsplit=1)
if len(segments) != 2:
return
@ -321,15 +178,25 @@ async def _(msg: UniMsg, mEvt: Event):
cfg.notifies.append(notify)
save_notify_config(cfg)
DATA_FILE_LOCK.release()
await evt.send(f"了解啦!将会在 {notify.notify_time} 提醒你哦~")
await evt.send(await UniMessage().at(mEvt.get_user_id()).text(
f" 了解啦!将会在 {notify.notify_time} 提醒你哦~").export())
driver = nonebot.get_driver()
NOTIFIED_FLAG = {
"task_added": False,
}
@driver.on_bot_connect
async def _():
if NOTIFIED_FLAG["task_added"]:
return
NOTIFIED_FLAG["task_added"] = True
await DATA_FILE_LOCK.acquire()
tasks = []
cfg = load_notify_config()

View File

@ -0,0 +1,358 @@
import datetime
import re
from typing import Optional, Dict, List, Callable, Tuple
from loguru import logger
# --- 常量与正则表达式定义 (Constants and Regex Definitions) ---
# 数字模式,兼容中文和阿拉伯数字
P_NUM = r"(\d+|[零一两二三四五六七八九十]+)"
# 预编译的正则表达式
PATTERNS = {
# 相对时间, e.g., "5分钟后"
"DELTA": re.compile(
r"^"
r"((?P<days>" + P_NUM + r") ?天)?"
r"((?P<hours>" + P_NUM + r") ?个?小?时)?"
r"((?P<minutes>" + P_NUM + r") ?分钟?)?"
r"((?P<seconds>" + P_NUM + r") ?秒钟?)?"
r" ?后 ?$"
),
# 绝对时间
"YEAR": re.compile(r"(" + P_NUM + r") ?年"),
"MONTH": re.compile(r"(" + P_NUM + r") ?月"),
"DAY": re.compile(r"(" + P_NUM + r") ?[日号]"),
"HOUR": re.compile(r"(" + P_NUM + r") ?[点时](半)?钟?"),
"MINUTE": re.compile(r"(" + P_NUM + r") ?分(钟)?"),
"SECOND": re.compile(r"(" + P_NUM + r") ?秒(钟)?"),
"HMS_COLON": re.compile(r"(\d{1,2})[:](\d{1,2})([:](\d{1,2}))?"),
"PM": re.compile(r"(下午|PM|晚上)"),
# 相对日期
"TOMORROW": re.compile(r"明天"),
"DAY_AFTER_TOMORROW": re.compile(r"后天"),
"TODAY": re.compile(r"今天"),
}
# 中文数字到阿拉伯数字的映射
CHINESE_TO_ARABIC_MAP: Dict[str, int] = {
'': 0, '': 1, '': 2, '': 3, '': 4,
'': 5, '': 6, '': 7, '': 8, '': 9, '': 10
}
# --- 核心工具函数 (Core Utility Functions) ---
def parse_number(s: str) -> int:
"""
将包含中文或阿拉伯数字的字符串解析为整数。
例如: "" -> 5, "十五" -> 15, "二十三" -> 23, "12" -> 12。
返回 -1 表示解析失败。
"""
if not s:
return -1
s = s.strip().replace("", "")
if s.isdigit():
return int(s)
if s in CHINESE_TO_ARABIC_MAP:
return CHINESE_TO_ARABIC_MAP[s]
# 处理 "十" 在不同位置的情况
if s.startswith(''):
if len(s) == 1:
return 10
num = CHINESE_TO_ARABIC_MAP.get(s[1])
return 10 + num if num is not None else -1
if s.endswith(''):
if len(s) == 2:
num = CHINESE_TO_ARABIC_MAP.get(s[0])
return 10 * num if num is not None else -1
if '' in s:
parts = s.split('')
if len(parts) == 2:
left = CHINESE_TO_ARABIC_MAP.get(parts[0])
right = CHINESE_TO_ARABIC_MAP.get(parts[1])
if left is not None and right is not None:
return left * 10 + right
return -1
# --- 时间解析器类 (Time Parser Class) ---
class TimeParser:
"""
一个用于解析自然语言时间描述的类。
"""
def __init__(self, content: str):
self.original_content: str = content
self.content_to_parse: str = self._preprocess(content)
self.now: datetime.datetime = datetime.datetime.now()
# 将 t 作为结果构建器,初始化为今天的午夜
self.t: datetime.datetime = self.now.replace(hour=0, minute=0, second=0, microsecond=0)
self.is_pm_specified: bool = False
self.is_date_specified: bool = False
self.is_time_specified: bool = False
def _preprocess(self, content: str) -> str:
"""预处理字符串,移除不相关字符。"""
content = re.sub(r"\s+", "", content)
content = re.sub(r"[,\.。::、]", "", content)
return content
def _consume_match(self, match: re.Match) -> str:
"""从待解析字符串中移除已匹配的部分。"""
self.content_to_parse = self.content_to_parse.replace(match.group(0), "", 1)
return match.group(0)
def parse(self) -> Optional[datetime.datetime]:
"""
主解析方法。
首先尝试解析相对时间如“5分钟后”失败则尝试解析绝对时间。
"""
logger.debug(f"🎉 开始解析: '{self.original_content}' -> 清洗后: '{self.content_to_parse}'")
if not self.content_to_parse:
logger.debug("❌ 内容为空,无法解析。")
return None
# 1. 尝试相对时间解析
if (target_time := self._parse_relative_time()) is not None:
return target_time
# 2. 尝试绝对时间解析
if (target_time := self._parse_absolute_time()) is not None:
return target_time
logger.debug(f"❌ 所有解析模式均未匹配成功。")
return None
def _parse_relative_time(self) -> Optional[datetime.datetime]:
"""解析 'X天X小时X分钟后' 这种格式。"""
if match := PATTERNS["DELTA"].match(self.content_to_parse):
logger.debug("⏳ 匹配到相对时间模式 (DELTA)。")
try:
delta_parts = {
"days": parse_number(match.group("days") or "0"),
"hours": parse_number(match.group("hours") or "0"),
"minutes": parse_number(match.group("minutes") or "0"),
"seconds": parse_number(match.group("seconds") or "0"),
}
# 检查是否有无效的数字解析
if any(v < 0 for v in delta_parts.values()):
logger.debug(f"❌ 解析时间片段为数字时失败: {delta_parts}")
return None
delta = datetime.timedelta(**delta_parts)
if delta.total_seconds() == 0:
logger.debug("❌ 解析出的时间增量为0。")
return None
target_time = self.now + delta
logger.debug(f"✅ 相对时间解析成功 -> {target_time}")
return target_time
except (ValueError, TypeError) as e:
logger.debug(f"❌ 解析相对时间时出错: {e}", exc_info=True)
return None
return None
def _parse_absolute_time(self) -> Optional[datetime.datetime]:
"""解析一个指定的日期和时间。"""
logger.debug(f"🎯 启动绝对时间解析,基准时间: {self.t}")
# 定义解析步骤和顺序
# (pattern_key, handler_method)
parsing_steps: List[Tuple[str, Callable[[re.Match], bool]]] = [
("TOMORROW", self._handle_tomorrow),
("DAY_AFTER_TOMORROW", self._handle_day_after_tomorrow),
("TODAY", self._handle_today),
("YEAR", self._handle_year),
("MONTH", self._handle_month),
("DAY", self._handle_day),
("HMS_COLON", self._handle_hms_colon),
("PM", self._handle_pm),
("HOUR", self._handle_hour),
("MINUTE", self._handle_minute),
("SECOND", self._handle_second),
]
for key, handler in parsing_steps:
if match := PATTERNS[key].search(self.content_to_parse):
if not handler(match):
# 如果任何一个处理器返回False说明解析失败
return None
# 移除无意义的上午关键词
self.content_to_parse = self.content_to_parse.replace("上午", "").replace("AM", "").replace("凌晨", "")
# 如果解析后还有剩余字符,说明有无法识别的部分
if self.content_to_parse.strip():
logger.debug(f"❌ 匹配失败,存在未解析的残留内容: '{self.content_to_parse.strip()}'")
return None
# 最终调整和检查
return self._finalize_datetime()
# --- Handler Methods for Absolute Time Parsing ---
def _handle_tomorrow(self, match: re.Match) -> bool:
self.t += datetime.timedelta(days=1)
self.is_date_specified = True
logger.debug(f"📅 匹配到 '明天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_day_after_tomorrow(self, match: re.Match) -> bool:
self.t += datetime.timedelta(days=2)
self.is_date_specified = True
logger.debug(f"📅 匹配到 '后天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_today(self, match: re.Match) -> bool:
self.is_date_specified = True
logger.debug(f"📅 匹配到 '今天', 日期基准不变, 消耗: '{self._consume_match(match)}'")
return True
def _handle_year(self, match: re.Match) -> bool:
year = parse_number(match.group(1))
if year < 0: return False
if year < 100: year += 2000 # 处理 "25年" -> 2025
if year < self.now.year:
logger.debug(f"❌ 指定的年份 {year} 已过去。")
return False
self.t = self.t.replace(year=year)
self.is_date_specified = True
logger.debug(f"Y| 年份更新 -> {self.t.year}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_month(self, match: re.Match) -> bool:
month = parse_number(match.group(1))
if not (1 <= month <= 12):
logger.debug(f"❌ 无效的月份: {month}")
return False
# 如果设置的月份在当前月份之前,且没有指定年份,则年份加一
if month < self.t.month and not self.is_date_specified:
self.t = self.t.replace(year=self.t.year + 1)
logger.debug(f"💡 月份小于当前月份,年份自动进位 -> {self.t.year}")
self.t = self.t.replace(month=month)
self.is_date_specified = True
logger.debug(f"M| 月份更新 -> {self.t.month}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_day(self, match: re.Match) -> bool:
day = parse_number(match.group(1))
if not (1 <= day <= 31):
logger.debug(f"❌ 无效的日期: {day}")
return False
try:
# 如果日期小于当前日期,且只指定了日,则月份加一
if day < self.t.day and not self.is_date_specified:
if self.t.month == 12:
self.t = self.t.replace(year=self.t.year + 1, month=1)
else:
self.t = self.t.replace(month=self.t.month + 1)
logger.debug(f"💡 日期小于当前日期,月份自动进位 -> {self.t.year}-{self.t.month}")
self.t = self.t.replace(day=day)
self.is_date_specified = True
logger.debug(f"D| 日期更新 -> {self.t.day}, 消耗: '{self._consume_match(match)}'")
return True
except ValueError:
logger.debug(f"❌ 日期 {day} 对于月份 {self.t.month} 无效 (例如2月30号)。")
return False
def _handle_hms_colon(self, match: re.Match) -> bool:
h = int(match.group(1))
m = int(match.group(2))
s_str = match.group(4) # group(3) is with colon, group(4) is the number
s = int(s_str) if s_str else 0
if not (0 <= h <= 23 and 0 <= m <= 59 and 0 <= s <= 59):
logger.debug(f"❌ 无效的时间格式: H={h}, M={m}, S={s}")
return False
self.t = self.t.replace(hour=h, minute=m, second=s)
self.is_time_specified = True
logger.debug(f"T| 时分秒(冒号格式)更新 -> {self.t.time()}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_pm(self, match: re.Match) -> bool:
self.is_pm_specified = True
logger.debug(f"PM| 匹配到下午/晚上, 消耗: '{self._consume_match(match)}'")
return True
def _handle_hour(self, match: re.Match) -> bool:
hour = parse_number(match.group(1))
has_half = match.group(2) == ''
if not (0 <= hour <= 23):
logger.debug(f"❌ 无效的小时: {hour}")
return False
minute = 30 if has_half else self.t.minute
self.t = self.t.replace(hour=hour, minute=minute)
self.is_time_specified = True
logger.debug(f"H| 小时更新 -> {self.t.hour}{':30' if has_half else ''}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_minute(self, match: re.Match) -> bool:
minute = parse_number(match.group(1))
if not (0 <= minute <= 59):
logger.debug(f"❌ 无效的分钟: {minute}")
return False
self.t = self.t.replace(minute=minute)
self.is_time_specified = True
logger.debug(f"M| 分钟更新 -> {self.t.minute}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_second(self, match: re.Match) -> bool:
second = parse_number(match.group(1))
if not (0 <= second <= 59):
logger.debug(f"❌ 无效的秒: {second}")
return False
self.t = self.t.replace(second=second)
self.is_time_specified = True
logger.debug(f"S| 秒更新 -> {self.t.second}, 消耗: '{self._consume_match(match)}'")
return True
def _finalize_datetime(self) -> Optional[datetime.datetime]:
"""对解析出的时间进行最后的调整和检查。"""
# 处理下午/晚上
if self.is_pm_specified and self.t.hour < 12:
self.t = self.t.replace(hour=self.t.hour + 12)
logger.debug(f"💡 根据 PM 标识,小时调整为 -> {self.t.hour}")
# 如果没有指定任何时间或日期部分,则认为解析无效
if not self.is_date_specified and not self.is_time_specified:
logger.debug("❌ 未能从输入中解析出任何有效的日期或时间部分。")
return None
# 如果最终计算出的时间点在当前时间之前,自动往后推
# 例如:现在是 15:00说 "14点"应该是指明天的14点
if self.t < self.now:
# 只有在明确指定了时间的情况下,才自动加一天
# 如果只指定了一个过去的日期如“去年5月1号”则不应该调整
if self.is_time_specified:
self.t += datetime.timedelta(days=1)
logger.debug(f"🔁 目标时间已过,自动调整为明天 -> {self.t}")
logger.debug(f"✅ 解析成功,最终时间: {self.t}")
return self.t
# --- 公共接口 (Public Interface) ---
def get_target_time(content: str) -> Optional[datetime.datetime]:
"""
高级接口,用于将自然语言时间描述转换为 datetime 对象。
Args:
content: 包含时间信息的字符串。
Returns:
一个 datetime 对象,如果解析失败则返回 None。
"""
parser = TimeParser(content)
return parser.parse()

View File

@ -0,0 +1,268 @@
import os
import tempfile
from typing import Optional
from PIL import Image, ImageSequence
from nonebot.adapters import Event as BaseEvent
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import (
Alconna,
Args,
Field,
UniMessage,
on_alconna,
)
__plugin_meta__ = PluginMetadata(
name="ytpgif",
description="生成来回镜像翻转的仿 YTPMV 动图。",
usage="ytpgif [倍速=1.0] 倍速范围0.120.0",
type="application",
config=None,
homepage=None,
)
# 参数定义
BASE_SEGMENT_DURATION = 0.25
BASE_INTERVAL = 0.25
MAX_SIZE = 256
MIN_SPEED = 0.1
MAX_SPEED = 20.0
MAX_FRAMES_PER_SEGMENT = 500
# 提示语
SPEED_TIPS = f"倍速必须是 {MIN_SPEED}{MAX_SPEED} 之间的数字"
# 定义命令 + 参数校验
ytpgif_cmd = on_alconna(
Alconna(
"ytpgif",
Args[
"speed?",
float,
Field(
default=1.0,
unmatch_tips=lambda x: f"{x}”不是有效数值。{SPEED_TIPS}",
),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
)
async def get_image_url(event: BaseEvent) -> Optional[str]:
"""从事件中提取图片 URL支持直接消息和回复"""
msg = event.get_message()
for seg in msg:
if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"])
if hasattr(event, "reply") and (reply := event.reply):
reply_msg = reply.message
for seg in reply_msg:
if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"])
return None
async def download_image(url: str) -> bytes:
import httpx
async with httpx.AsyncClient() as client:
resp = await client.get(url, timeout=10)
resp.raise_for_status()
return resp.content
def resize_frame(frame: Image.Image) -> Image.Image:
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
w, h = frame.size
if w <= MAX_SIZE and h <= MAX_SIZE:
return frame
scale = MAX_SIZE / max(w, h)
new_w = int(w * scale)
new_h = int(h * scale)
return frame.resize((new_w, new_h), Image.Resampling.LANCZOS)
@ytpgif_cmd.handle()
async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
# === 校验 speed 范围 ===
if not (MIN_SPEED <= speed <= MAX_SPEED):
await ytpgif_cmd.send(
await UniMessage.text(f"{SPEED_TIPS}").export()
)
return
img_url = await get_image_url(event)
if not img_url:
await ytpgif_cmd.send(
await UniMessage.text(
"请发送一张图片或回复一张图片来生成镜像动图。"
).export()
)
return
try:
image_data = await download_image(img_url)
except Exception as e:
print(f"[YTPGIF] 下载失败: {e}")
await ytpgif_cmd.send(
await UniMessage.text("❌ 图片下载失败,请重试。").export()
)
return
input_path = output_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_in:
tmp_in.write(image_data)
input_path = tmp_in.name
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_out:
output_path = tmp_out.name
with Image.open(input_path) as src_img:
# === 判断是否为动图 ===
try:
n_frames = getattr(src_img, "n_frames", 1)
is_animated = n_frames > 1
except Exception:
is_animated = False
output_frames = []
output_durations_ms = []
if is_animated:
# === 动图模式:截取正向 + 镜像两段 ===
frames_with_duration = []
palette = src_img.getpalette()
for idx in range(n_frames):
src_img.seek(idx)
frame = src_img.copy()
# 检查是否需要透明通道
has_alpha = (
frame.mode in ("RGBA", "LA")
or (frame.mode == "P" and "transparency" in frame.info)
)
if has_alpha:
frame = frame.convert("RGBA")
else:
frame = frame.convert("RGB")
resized_frame = resize_frame(frame)
# 若原图有调色板,尝试保留(可选)
if palette and resized_frame.mode == "P":
try:
resized_frame.putpalette(palette)
except Exception: # noqa
pass
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
dur_sec = max(0.01, ms / 1000.0)
frames_with_duration.append((resized_frame, dur_sec))
max_dur = BASE_SEGMENT_DURATION * speed
accumulated = 0.0
frame_count = 0
# 正向段
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
output_frames.append(img)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
if frame_count == 0:
await ytpgif_cmd.send(
await UniMessage.text("动图帧太短,无法生成有效片段。").export()
)
return
# 镜像段(从头开始)
accumulated = 0.0
frame_count = 0
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
flipped = img.transpose(Image.FLIP_LEFT_RIGHT)
output_frames.append(flipped)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
else:
# === 静态图模式:制作翻转动画 ===
raw_frame = src_img.convert("RGBA")
resized_frame = resize_frame(raw_frame)
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
duration_ms = int(interval_sec * 1000)
frame1 = resized_frame
frame2 = resized_frame.transpose(Image.FLIP_LEFT_RIGHT)
output_frames = [frame1, frame2]
output_durations_ms = [duration_ms, duration_ms]
if len(output_frames) < 1:
await ytpgif_cmd.send(
await UniMessage.text("未能生成任何帧。").export()
)
return
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
need_transparency = False
for frame in output_frames:
if frame.mode == "RGBA":
alpha_channel = frame.getchannel("A")
if any(pix < 255 for pix in alpha_channel.getdata()):
need_transparency = True
break
elif frame.mode == "P" and "transparency" in frame.info:
need_transparency = True
break
# 如果不需要透明,则统一转为 RGB 避免调色板污染
if not need_transparency:
output_frames = [f.convert("RGB") for f in output_frames]
# 构建保存参数
save_kwargs = {
"save_all": True,
"append_images": output_frames[1:],
"format": "GIF",
"loop": 0, # 无限循环
"duration": output_durations_ms,
"disposal": 2, # 清除到背景色,避免残留
"optimize": False, # 关闭抖动(等效 -dither none
}
# 只有真正需要透明时才启用 transparency
if need_transparency:
save_kwargs["transparency"] = 0
output_frames[0].save(output_path, **save_kwargs)
# 发送结果
with open(output_path, "rb") as f:
result_image = UniMessage.image(raw=f.read())
await ytpgif_cmd.send(await result_image.export())
except Exception as e:
print(f"[YTPGIF] 处理失败: {e}")
await ytpgif_cmd.send(
await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export()
)
finally:
for path in filter(None, [input_path, output_path]):
if os.path.exists(path):
try:
os.unlink(path)
except: # noqa
pass

0
scripts/__init__.py Normal file
View File

View File

@ -0,0 +1,21 @@
from pathlib import Path
import nonebot
from loguru import logger
nonebot.init()
nonebot.load_plugins("konabot/plugins")
plugins = nonebot.get_loaded_plugins()
len_requires = len(
[f for f in (
Path(__file__).parent.parent / "konabot" / "plugins"
).iterdir() if f.is_dir() and (f / "__init__.py").exists()]
)
plugins = [p for p in plugins if p.module.__name__.startswith("konabot.plugins")]
logger.info(f"已经加载的插件数量 {len(plugins)}")
logger.info(f"期待加载的插件数量 {len_requires}")
assert len(plugins) == len_requires