Compare commits

...

12 Commits

Author SHA1 Message Date
09c9d44798 Merge pull request 'Feature: 好多好多的说' (#21) from feature-新说 into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #21
2025-10-03 13:52:31 +08:00
0c4206f461 好多好多的说 2025-10-03 13:49:36 +08:00
9fb8fd90dc 修复类型注解
Some checks are pending
continuous-integration/drone/push Build is running
2025-10-02 12:06:20 +08:00
8c4fa2b5e4 Merge pull request 'fix: 透明底正常生成;静动图分离完成' (#18) from tnot/konabot:fix--修复部分Bug into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #18
2025-10-01 19:24:23 +08:00
fb2c3f1ce2 fix: 透明底正常生成;静动图分离完成 2025-10-01 11:54:54 +08:00
265415e727 Merge pull request 'feat: ytpgif' (#16) from tnot/konabot:feat--ytpgif into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #16
2025-09-30 22:38:15 +08:00
06555b2225 feat: ytpgif 2025-09-30 22:24:16 +08:00
f6fd25a41d fix: 添加 Notify 任务创建限制
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 19:08:30 +08:00
9f6c70bf0f 合并后的部分骰子路径修复
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 15:21:53 +08:00
1c01e49d5d Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot 2025-09-30 15:18:16 +08:00
48c719bc33 全新骰子 2025-09-30 15:13:59 +08:00
6bc9f94e83 拷贝环境变量
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 02:45:01 +08:00
16 changed files with 660 additions and 76 deletions

View File

@ -4,7 +4,7 @@ WORKDIR /app
COPY requirements.txt ./
RUN pip install -r requirements.txt --no-deps
COPY bot.py pyproject.toml ./
COPY bot.py pyproject.toml .env.prod .env.test ./
COPY assets ./assets
COPY scripts ./scripts
COPY konabot ./konabot

Binary file not shown.

BIN
assets/img/dice/stick.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 80 KiB

BIN
assets/img/meme/dss.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 172 KiB

BIN
assets/img/meme/mnksay.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 69 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 364 KiB

View File

@ -3,8 +3,7 @@ from io import BytesIO
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
on_alconna)
from konabot.plugins.memepack.drawing.geimao import draw_geimao
from konabot.plugins.memepack.drawing.pt import draw_pt
from konabot.plugins.memepack.drawing.saying import draw_geimao, draw_mnk, draw_pt, draw_suan
geimao = on_alconna(Alconna(
"给猫说",
@ -36,3 +35,51 @@ async def _(saying: list[str]):
img.save(img_bytes, format="PNG")
await pt.send(await UniMessage().image(raw=img_bytes).export())
mnk = on_alconna(Alconna(
"re:小?黑白子?说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写黑白子说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"})
@mnk.handle()
async def _(saying: list[str]):
img = await draw_mnk("\n".join(saying))
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await pt.send(await UniMessage().image(raw=img_bytes).export())
suan = on_alconna(Alconna(
"小蒜说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写小蒜说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
@suan.handle()
async def _(saying: list[str]):
img = await draw_suan("\n".join(saying))
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await pt.send(await UniMessage().image(raw=img_bytes).export())
dsuan = on_alconna(Alconna(
"大蒜说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写大蒜说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
@dsuan.handle()
async def _(saying: list[str]):
img = await draw_suan("\n".join(saying), True)
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await pt.send(await UniMessage().image(raw=img_bytes).export())

View File

@ -10,3 +10,4 @@ FontDB.SetDefaultEmojiOptions(EmojiOptions(
HARMONYOS_SANS_SC_BLACK = FontDB.Query("HarmonyOS_Sans_SC_Black")
HARMONYOS_SANS_SC_REGULAR = FontDB.Query("HarmonyOS_Sans_SC_Regular")
LXGWWENKAI_REGULAR = FontDB.Query("LXGWWenKai-Regular")

View File

@ -1,30 +0,0 @@
import asyncio
from typing import Any, cast
import imagetext_py
import PIL.Image
from konabot.common.path import ASSETS_PATH
from .base.fonts import HARMONYOS_SANS_SC_BLACK
geimao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "geimao.jpg").convert("RGBA")
def _draw_geimao(saying: str):
img = geimao_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 960, 50, 00.5, 0, 1920, 240, HARMONYOS_SANS_SC_BLACK,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
0.8,
imagetext_py.TextAlign.Center,
cast(Any, 30.0),
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
draw_emojis=True,
)
return img
async def draw_geimao(saying: str):
return await asyncio.to_thread(_draw_geimao, saying)

View File

@ -1,27 +0,0 @@
import asyncio
import imagetext_py
import PIL.Image
from konabot.common.path import ASSETS_PATH
from .base.fonts import HARMONYOS_SANS_SC_REGULAR
pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("RGBA")
def _draw_pt(saying: str):
img = pt_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 259, 278, 0.5, 0.5, 360, 48, HARMONYOS_SANS_SC_REGULAR,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
1.0,
imagetext_py.TextAlign.Center,
draw_emojis=True,
)
return img
async def draw_pt(saying: str):
return await asyncio.to_thread(_draw_pt, saying)

View File

@ -0,0 +1,90 @@
import asyncio
from typing import Any, cast
import imagetext_py
import PIL.Image
from konabot.common.path import ASSETS_PATH
from .base.fonts import HARMONYOS_SANS_SC_BLACK, HARMONYOS_SANS_SC_REGULAR, LXGWWENKAI_REGULAR
geimao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "geimao.jpg").convert("RGBA")
pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("RGBA")
mnk_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "mnksay.jpg").convert("RGBA")
dasuan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "dss.png").convert("RGBA")
suan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "suanleba.png").convert("RGBA")
def _draw_geimao(saying: str):
img = geimao_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 960, 50, 0.5, 0, 1920, 240, HARMONYOS_SANS_SC_BLACK,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
0.8,
imagetext_py.TextAlign.Center,
cast(Any, 30.0),
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
draw_emojis=True,
)
return img
async def draw_geimao(saying: str):
return await asyncio.to_thread(_draw_geimao, saying)
def _draw_pt(saying: str):
img = pt_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 259, 278, 0.5, 0.5, 360, 48, HARMONYOS_SANS_SC_REGULAR,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
1.0,
imagetext_py.TextAlign.Center,
draw_emojis=True,
)
return img
async def draw_pt(saying: str):
return await asyncio.to_thread(_draw_pt, saying)
def _draw_mnk(saying: str):
img = mnk_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 540, 25, 0.5, 0, 1080, 120, HARMONYOS_SANS_SC_BLACK,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
0.8,
imagetext_py.TextAlign.Center,
cast(Any, 15.0),
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
draw_emojis=True,
)
return img
async def draw_mnk(saying: str):
return await asyncio.to_thread(_draw_mnk, saying)
def _draw_suan(saying: str, dasuan: bool = False):
if dasuan:
img = dasuan_image.copy()
else:
img = suan_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 1020, 290, 0.5, 0.5, 400, 48, LXGWWENKAI_REGULAR,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
1.0,
imagetext_py.TextAlign.Center,
draw_emojis=True,
)
return img
async def draw_suan(saying: str, dasuan: bool = False):
return await asyncio.to_thread(_draw_suan, saying, dasuan)

View File

@ -1,11 +1,11 @@
from typing import Optional
from typing import Optional, Union
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from konabot.plugins.roll_dice.roll_dice import generate_dice_image
from konabot.plugins.roll_dice.roll_number import get_random_number, roll_number
from konabot.plugins.roll_dice.roll_number import get_random_number, get_random_number_string, roll_number
evt = on_alconna(Alconna(
"摇数字"
@ -22,21 +22,26 @@ async def _(event: BaseEvent):
evt = on_alconna(Alconna(
"摇骰子",
Args["f1?", int]["f2?", int]
Args["f1?", str]["f2?", str]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, f1: Optional[int] = None, f2: Optional[int] = None):
async def _(event: BaseEvent, f1: Optional[str] = None, f2: Optional[str] = None):
# if isinstance(event, DiscordMessageEvent):
# await evt.send(await UniMessage().text("```\n" + roll_dice() + "\n```").export())
# elif isinstance(event, ConsoleMessageEvent):
number = 0
number = ""
if(f1 is not None and f2 is not None):
number = get_random_number(f1, f2)
number = get_random_number_string(f1, f2)
elif f1 is not None:
number = get_random_number(1, f1)
if(float(f1) > 1):
number = get_random_number_string("1", f1)
elif (float(f1) > 0):
number = get_random_number_string("0", f1)
else:
number = get_random_number_string(f1, "0")
else:
number = get_random_number()
number = get_random_number_string()
await evt.send(await UniMessage().image(raw=await generate_dice_image(number)).export())
# else:
# await evt.send(await UniMessage().text(roll_dice(wide=True)).export())

View File

@ -152,27 +152,208 @@ def precise_blend_with_perspective(background, foreground, corners):
return result
async def generate_dice_image(number: int) -> BytesIO:
def draw_line_bresenham(image, x0, y0, x1, y1, color):
"""使用Bresenham算法画线避免间隙"""
dx = abs(x1 - x0)
dy = abs(y1 - y0)
sx = 1 if x0 < x1 else -1
sy = 1 if y0 < y1 else -1
err = dx - dy
while True:
if 0 <= x0 < image.shape[1] and 0 <= y0 < image.shape[0]:
image[y0, x0] = color
if x0 == x1 and y0 == y1:
break
e2 = 2 * err
if e2 > -dy:
err -= dy
x0 += sx
if e2 < dx:
err += dx
y0 += sy
def slice_and_stretch(image, slice_lines, direction):
'''
image: 图像
slice_lines: 切割线两个点的列表一般是倾斜45度的直线
direction: 移动方向向量(二元数组)
'''
# 获取图片的尺寸
height, width = image.shape[:2]
# 创建一个由移动方向扩充后,更大的图片
new_width = int(width + abs(direction[0]))
new_height = int(height + abs(direction[1]))
new_image = np.zeros((new_height, new_width, 4), dtype=image.dtype)
# 先把图片放在新图的和方向相反的一侧
offset_x = int(abs(min(0, direction[0])))
offset_y = int(abs(min(0, direction[1])))
new_image[offset_y:offset_y+height, offset_x:offset_x+width] = image
# 切割线也跟着偏移
slice_lines = [(x + offset_x, y + offset_y) for (x, y) in slice_lines]
# 复制切割线经过的像素,沿着方向移动,实现类似拖尾的效果
apply_trail_effect_vectorized(new_image, slice_lines, direction)
apply_stroke_vectorized(new_image, slice_lines, direction)
return new_image, offset_x, offset_y
def apply_trail_effect_vectorized(new_image, slice_lines, direction):
"""向量化实现拖尾效果"""
height, width = new_image.shape[:2]
# 创建坐标网格
y_coords, x_coords = np.mgrid[0:height, 0:width]
# 向量化计算点到直线的距离
line_vec = np.array([slice_lines[1][0] - slice_lines[0][0],
slice_lines[1][1] - slice_lines[0][1]])
point_vecs = np.stack([x_coords - slice_lines[0][0],
y_coords - slice_lines[0][1]], axis=-1)
# 计算叉积(有向距离)
cross_products = (line_vec[0] * point_vecs[:, :, 1] -
line_vec[1] * point_vecs[:, :, 0])
# 选择直线右侧的像素 (d1 > 0)
mask = cross_products > 0
# 计算目标位置
target_x = (x_coords + direction[0]).astype(int)
target_y = (y_coords + direction[1]).astype(int)
# 创建有效位置掩码
valid_mask = mask & (target_x >= 0) & (target_x < width) & \
(target_y >= 0) & (target_y < height)
# 批量复制像素
new_image[target_y[valid_mask], target_x[valid_mask]] = \
new_image[y_coords[valid_mask], x_coords[valid_mask]]
def apply_stroke_vectorized(new_image, slice_lines, direction):
"""使用向量化操作优化笔画效果"""
height, width = new_image.shape[:2]
# 1. 找到所有非透明像素
non_transparent = np.where(new_image[:, :, 3] > 0)
if len(non_transparent[0]) == 0:
return
y_coords, x_coords = non_transparent
# 2. 向量化计算点到直线的距离
line_vec = np.array([slice_lines[1][0] - slice_lines[0][0],
slice_lines[1][1] - slice_lines[0][1]])
point_vecs = np.column_stack([x_coords - slice_lines[0][0],
y_coords - slice_lines[0][1]])
# 计算叉积(距离)
cross_products = (line_vec[0] * point_vecs[:, 1] -
line_vec[1] * point_vecs[:, 0])
# 3. 选择靠近直线的像素
mask = np.abs(cross_products) < 1.0
selected_y = y_coords[mask]
selected_x = x_coords[mask]
selected_pixels = new_image[selected_y, selected_x]
if len(selected_x) == 0:
return
# 4. 预计算采样点
length = np.sqrt(direction[0]**2 + direction[1]**2)
if length == 0:
return
# 创建采样偏移
dx_dy = np.array([(dx, dy) for dx in [-0.5, 0, 0.5]
for dy in [-0.5, 0, 0.5]])
# 5. 批量计算目标位置
steps = max(1, int(length * 2))
alpha = 0.7
for k in range(1, steps + 1):
# 对所有选中的像素批量计算新位置
scale = k / steps
# 为每个像素和每个采样点计算目标位置
for dx, dy in dx_dy:
target_x = np.round(selected_x + dx + direction[0] * scale).astype(int)
target_y = np.round(selected_y + dy + direction[1] * scale).astype(int)
# 创建有效位置掩码
valid_mask = (target_x >= 0) & (target_x < width) & \
(target_y >= 0) & (target_y < height)
if np.any(valid_mask):
valid_target_x = target_x[valid_mask]
valid_target_y = target_y[valid_mask]
valid_source_idx = np.where(valid_mask)[0]
# 批量混合像素
source_pixels = selected_pixels[valid_source_idx]
target_pixels = new_image[valid_target_y, valid_target_x]
new_image[valid_target_y, valid_target_x] = (
alpha * source_pixels + (1 - alpha) * target_pixels
)
async def generate_dice_image(number: str) -> BytesIO:
# 将文本转换为带透明背景的图像
text = str(number)
text = number
# 如果文本太长,直接返回金箍棒
if(len(text) > 50):
output = BytesIO()
push_image = Image.open(ASSETS_PATH / "img" / "dice" / "stick.png")
push_image.save(output,format='PNG')
output.seek(0)
return output
text_image = text_to_transparent_image(
text,
font_size=60,
text_color=(0, 0, 0) # 黑色文字
)
# 获取长宽比
height, width = text_image.shape[:2]
aspect_ratio = width / height
# 根据长宽比设置拉伸系数
stretch_k = 1
if aspect_ratio > 1:
stretch_k = aspect_ratio
# 骰子的方向
up_direction = (51 - 16, 5 - 30) # 右上角点 - 左上角点
move_distance = (up_direction[0] * (stretch_k - 1), up_direction[1] * (stretch_k - 1))
# 加载背景图像,保留透明通道
background = cv2.imread(str(ASSETS_PATH / "img" / "dice" / "template.png"), cv2.IMREAD_UNCHANGED)
assert background is not None
height, width = background.shape[:2]
background, offset_x, offset_y = slice_and_stretch(background,
[(10,10),(0,0)],
move_distance)
# 定义3D变换的四个角点透视效果
# 顺序: [左上, 右上, 右下, 左下]
corners = np.array([
[16, 30], # 左上
[51, 5], # 右上(上移,创建透视)
[88, 33], # 右下
[51 + move_distance[0], 5 + move_distance[1]], # 右上(上移,创建透视)
[88 + move_distance[0], 33 + move_distance[1]], # 右下
[49, 62] # 左下(下移)
], dtype=np.float32)
corners[:, 0] += offset_x
corners[:, 1] += offset_y
# 加载背景图像,保留透明通道
background = cv2.imread(str(ASSETS_PATH / "img" / "dice" / "template.png"), cv2.IMREAD_UNCHANGED)
# 对文本图像进行3D变换保持透明通道
transformed_text, transform_matrix = perspective_transform(text_image, background, corners)
@ -186,6 +367,26 @@ async def generate_dice_image(number: int) -> BytesIO:
images: list[Image.Image] = [Image.open(ASSETS_PATH / "img" / "dice" / f"{i}.png") for i in range(1, 12)]
images.append(pil_final)
frame_durations = [100] * (len(images) - 1) + [100000]
# 将导入的图像尺寸扩展为和 pil_final 相同的大小,随帧数进行扩展,然后不放大的情况下放在最中间
if(aspect_ratio > 1):
target_size = pil_final.size
for i in range(len(images) - 1):
k = i / (len(images) - 1)
now_distance = (move_distance[0] * k, move_distance[1] * k)
img = np.array(images[i])
img, _, _ = slice_and_stretch(img,
[(10,10),(0,0)],
now_distance)
# 只扩展边界,图像本身不放大
img_width, img_height = img.shape[1], img.shape[0]
new_img = Image.new("RGBA", target_size, (0, 0, 0, 0))
this_offset_x = (target_size[0] - img_width) // 2
this_offset_y = (target_size[1] - img_height) // 2
# new_img.paste(img, (this_offset_x, this_offset_y))
new_img.paste(Image.fromarray(img), (this_offset_x, this_offset_y))
images[i] = new_img
# 保存为BytesIO对象
output = BytesIO()
images[0].save(output,
@ -194,4 +395,6 @@ async def generate_dice_image(number: int) -> BytesIO:
duration=frame_durations,
format='GIF',
loop=1)
output.seek(0)
# pil_final.save(output, format='PNG')
return output

View File

@ -42,6 +42,24 @@ def get_random_number(min: int = 1, max: int = 6) -> int:
import random
return random.randint(min, max)
def get_random_number_string(min_value: str = "1", max_value: str = "6") -> str:
import random
# 先判断二者是不是整数
if (float(min_value).is_integer()
and float(max_value).is_integer()
and "." not in min_value
and "." not in max_value):
return str(random.randint(int(float(min_value)), int(float(max_value))))
# 根据传入小数的位数,决定保留几位小数
if "." in str(min_value) or "." in str(max_value):
decimal_places = max(len(str(min_value).split(".")[1]) if "." in str(min_value) else 0,
len(str(max_value).split(".")[1]) if "." in str(max_value) else 0)
return str(round(random.uniform(float(min_value), float(max_value)), decimal_places))
# 如果没有小数点,很可能二者都是指数表示或均为 inf直接返回随机小数
return str(random.uniform(float(min_value), float(max_value)))
def roll_number(wide: bool = False) -> str:
raw = number_arts[get_random_number()]
if wide:

View File

@ -185,9 +185,18 @@ async def _(msg: UniMsg, mEvt: Event):
driver = nonebot.get_driver()
NOTIFIED_FLAG = {
"task_added": False,
}
@driver.on_bot_connect
async def _():
if NOTIFIED_FLAG["task_added"]:
return
NOTIFIED_FLAG["task_added"] = True
await DATA_FILE_LOCK.acquire()
tasks = []
cfg = load_notify_config()

View File

@ -0,0 +1,268 @@
import os
import tempfile
from typing import Optional
from PIL import Image, ImageSequence
from nonebot.adapters import Event as BaseEvent
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import (
Alconna,
Args,
Field,
UniMessage,
on_alconna,
)
__plugin_meta__ = PluginMetadata(
name="ytpgif",
description="生成来回镜像翻转的仿 YTPMV 动图。",
usage="ytpgif [倍速=1.0] 倍速范围0.120.0",
type="application",
config=None,
homepage=None,
)
# 参数定义
BASE_SEGMENT_DURATION = 0.25
BASE_INTERVAL = 0.25
MAX_SIZE = 256
MIN_SPEED = 0.1
MAX_SPEED = 20.0
MAX_FRAMES_PER_SEGMENT = 500
# 提示语
SPEED_TIPS = f"倍速必须是 {MIN_SPEED}{MAX_SPEED} 之间的数字"
# 定义命令 + 参数校验
ytpgif_cmd = on_alconna(
Alconna(
"ytpgif",
Args[
"speed?",
float,
Field(
default=1.0,
unmatch_tips=lambda x: f"{x}”不是有效数值。{SPEED_TIPS}",
),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
)
async def get_image_url(event: BaseEvent) -> Optional[str]:
"""从事件中提取图片 URL支持直接消息和回复"""
msg = event.get_message()
for seg in msg:
if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"])
if hasattr(event, "reply") and (reply := event.reply):
reply_msg = reply.message
for seg in reply_msg:
if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"])
return None
async def download_image(url: str) -> bytes:
import httpx
async with httpx.AsyncClient() as client:
resp = await client.get(url, timeout=10)
resp.raise_for_status()
return resp.content
def resize_frame(frame: Image.Image) -> Image.Image:
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
w, h = frame.size
if w <= MAX_SIZE and h <= MAX_SIZE:
return frame
scale = MAX_SIZE / max(w, h)
new_w = int(w * scale)
new_h = int(h * scale)
return frame.resize((new_w, new_h), Image.Resampling.LANCZOS)
@ytpgif_cmd.handle()
async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
# === 校验 speed 范围 ===
if not (MIN_SPEED <= speed <= MAX_SPEED):
await ytpgif_cmd.send(
await UniMessage.text(f"{SPEED_TIPS}").export()
)
return
img_url = await get_image_url(event)
if not img_url:
await ytpgif_cmd.send(
await UniMessage.text(
"请发送一张图片或回复一张图片来生成镜像动图。"
).export()
)
return
try:
image_data = await download_image(img_url)
except Exception as e:
print(f"[YTPGIF] 下载失败: {e}")
await ytpgif_cmd.send(
await UniMessage.text("❌ 图片下载失败,请重试。").export()
)
return
input_path = output_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_in:
tmp_in.write(image_data)
input_path = tmp_in.name
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_out:
output_path = tmp_out.name
with Image.open(input_path) as src_img:
# === 判断是否为动图 ===
try:
n_frames = getattr(src_img, "n_frames", 1)
is_animated = n_frames > 1
except Exception:
is_animated = False
output_frames = []
output_durations_ms = []
if is_animated:
# === 动图模式:截取正向 + 镜像两段 ===
frames_with_duration = []
palette = src_img.getpalette()
for idx in range(n_frames):
src_img.seek(idx)
frame = src_img.copy()
# 检查是否需要透明通道
has_alpha = (
frame.mode in ("RGBA", "LA")
or (frame.mode == "P" and "transparency" in frame.info)
)
if has_alpha:
frame = frame.convert("RGBA")
else:
frame = frame.convert("RGB")
resized_frame = resize_frame(frame)
# 若原图有调色板,尝试保留(可选)
if palette and resized_frame.mode == "P":
try:
resized_frame.putpalette(palette)
except Exception: # noqa
pass
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
dur_sec = max(0.01, ms / 1000.0)
frames_with_duration.append((resized_frame, dur_sec))
max_dur = BASE_SEGMENT_DURATION * speed
accumulated = 0.0
frame_count = 0
# 正向段
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
output_frames.append(img)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
if frame_count == 0:
await ytpgif_cmd.send(
await UniMessage.text("动图帧太短,无法生成有效片段。").export()
)
return
# 镜像段(从头开始)
accumulated = 0.0
frame_count = 0
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
flipped = img.transpose(Image.FLIP_LEFT_RIGHT)
output_frames.append(flipped)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
else:
# === 静态图模式:制作翻转动画 ===
raw_frame = src_img.convert("RGBA")
resized_frame = resize_frame(raw_frame)
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
duration_ms = int(interval_sec * 1000)
frame1 = resized_frame
frame2 = resized_frame.transpose(Image.FLIP_LEFT_RIGHT)
output_frames = [frame1, frame2]
output_durations_ms = [duration_ms, duration_ms]
if len(output_frames) < 1:
await ytpgif_cmd.send(
await UniMessage.text("未能生成任何帧。").export()
)
return
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
need_transparency = False
for frame in output_frames:
if frame.mode == "RGBA":
alpha_channel = frame.getchannel("A")
if any(pix < 255 for pix in alpha_channel.getdata()):
need_transparency = True
break
elif frame.mode == "P" and "transparency" in frame.info:
need_transparency = True
break
# 如果不需要透明,则统一转为 RGB 避免调色板污染
if not need_transparency:
output_frames = [f.convert("RGB") for f in output_frames]
# 构建保存参数
save_kwargs = {
"save_all": True,
"append_images": output_frames[1:],
"format": "GIF",
"loop": 0, # 无限循环
"duration": output_durations_ms,
"disposal": 2, # 清除到背景色,避免残留
"optimize": False, # 关闭抖动(等效 -dither none
}
# 只有真正需要透明时才启用 transparency
if need_transparency:
save_kwargs["transparency"] = 0
output_frames[0].save(output_path, **save_kwargs)
# 发送结果
with open(output_path, "rb") as f:
result_image = UniMessage.image(raw=f.read())
await ytpgif_cmd.send(await result_image.export())
except Exception as e:
print(f"[YTPGIF] 处理失败: {e}")
await ytpgif_cmd.send(
await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export()
)
finally:
for path in filter(None, [input_path, output_path]):
if os.path.exists(path):
try:
os.unlink(path)
except: # noqa
pass