Merge pull request 'fix: 透明底正常生成;静动图分离完成' (#18) from tnot/konabot:fix--修复部分Bug into master

Reviewed-on: mttu-developers/konabot#18
This commit is contained in:
2025-10-01 19:24:23 +08:00

View File

@ -15,14 +15,14 @@ from nonebot_plugin_alconna import (
__plugin_meta__ = PluginMetadata( __plugin_meta__ = PluginMetadata(
name="ytpgif", name="ytpgif",
description="生成来回镜像翻转的动图:动图按时间分段播放,静态图高频翻转(带参数范围保护)", description="生成来回镜像翻转的仿 YTPMV 动图。",
usage="/ytpgif [倍速=1.0] 倍速范围0.120.0", usage="ytpgif [倍速=1.0] 倍速范围0.120.0",
type="application", type="application",
config=None, config=None,
homepage=None, homepage=None,
) )
# 参数定义(带硬性限制) # 参数定义
BASE_SEGMENT_DURATION = 0.25 BASE_SEGMENT_DURATION = 0.25
BASE_INTERVAL = 0.25 BASE_INTERVAL = 0.25
MAX_SIZE = 256 MAX_SIZE = 256
@ -54,12 +54,15 @@ ytpgif_cmd = on_alconna(
async def get_image_url(event: BaseEvent) -> Optional[str]: async def get_image_url(event: BaseEvent) -> Optional[str]:
"""从事件中提取图片 URL支持直接消息和回复"""
msg = event.get_message() msg = event.get_message()
for seg in msg: for seg in msg:
if seg.type == "image" and seg.data.get("url"): if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"]) return str(seg.data["url"])
if hasattr(event, "reply") and (reply := event.reply): if hasattr(event, "reply") and (reply := event.reply):
for seg in reply.message: reply_msg = reply.message
for seg in reply_msg:
if seg.type == "image" and seg.data.get("url"): if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"]) return str(seg.data["url"])
return None return None
@ -74,9 +77,11 @@ async def download_image(url: str) -> bytes:
def resize_frame(frame: Image.Image) -> Image.Image: def resize_frame(frame: Image.Image) -> Image.Image:
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
w, h = frame.size w, h = frame.size
if w <= MAX_SIZE and h <= MAX_SIZE: if w <= MAX_SIZE and h <= MAX_SIZE:
return frame return frame
scale = MAX_SIZE / max(w, h) scale = MAX_SIZE / max(w, h)
new_w = int(w * scale) new_w = int(w * scale)
new_h = int(h * scale) new_h = int(h * scale)
@ -96,14 +101,15 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
if not img_url: if not img_url:
await ytpgif_cmd.send( await ytpgif_cmd.send(
await UniMessage.text( await UniMessage.text(
"请发送一张图片并使用 /ytpgif或回复一张图片来生成镜像动图。" "请发送一张图片或回复一张图片来生成镜像动图。"
).export() ).export()
) )
return return
try: try:
image_data = await download_image(img_url) image_data = await download_image(img_url)
except Exception: except Exception as e:
print(f"[YTPGIF] 下载失败: {e}")
await ytpgif_cmd.send( await ytpgif_cmd.send(
await UniMessage.text("❌ 图片下载失败,请重试。").export() await UniMessage.text("❌ 图片下载失败,请重试。").export()
) )
@ -119,22 +125,47 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
output_path = tmp_out.name output_path = tmp_out.name
with Image.open(input_path) as src_img: with Image.open(input_path) as src_img:
is_animated = getattr(src_img, "is_animated", False) or src_img.n_frames > 1 # === 判断是否为动图 ===
try:
n_frames = getattr(src_img, "n_frames", 1)
is_animated = n_frames > 1
except Exception:
is_animated = False
output_frames = [] output_frames = []
output_durations_ms = [] output_durations_ms = []
if is_animated: if is_animated:
# === 动图模式:播放两段,每段最多 BASE_SEGMENT_DURATION * speed 秒,且帧数 ≤ 100 === # === 动图模式:截取正向 + 镜像两段 ===
frames_with_duration = [] frames_with_duration = []
for frame in ImageSequence.Iterator(src_img): palette = src_img.getpalette()
rgb_frame = frame.convert("RGB")
resized_frame = resize_frame(rgb_frame) for idx in range(n_frames):
src_img.seek(idx)
frame = src_img.copy()
# 检查是否需要透明通道
has_alpha = (
frame.mode in ("RGBA", "LA")
or (frame.mode == "P" and "transparency" in frame.info)
)
if has_alpha:
frame = frame.convert("RGBA")
else:
frame = frame.convert("RGB")
resized_frame = resize_frame(frame)
# 若原图有调色板,尝试保留(可选)
if palette and resized_frame.mode == "P":
try:
resized_frame.putpalette(palette)
except Exception: # noqa
pass
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000)) ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
dur_sec = max(0.01, ms / 1000.0) # 至少 10ms dur_sec = max(0.01, ms / 1000.0)
frames_with_duration.append((resized_frame, dur_sec)) frames_with_duration.append((resized_frame, dur_sec))
max_dur = BASE_SEGMENT_DURATION * speed # 每段最大播放时间 max_dur = BASE_SEGMENT_DURATION * speed
accumulated = 0.0 accumulated = 0.0
frame_count = 0 frame_count = 0
@ -167,10 +198,10 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
else: else:
# === 静态图模式:制作翻转动画 === # === 静态图模式:制作翻转动画 ===
raw_frame = src_img.convert("RGB") raw_frame = src_img.convert("RGBA")
resized_frame = resize_frame(raw_frame) resized_frame = resize_frame(raw_frame)
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed)) # 限制在 25ms ~ 2.5s interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
duration_ms = int(interval_sec * 1000) duration_ms = int(interval_sec * 1000)
frame1 = resized_frame frame1 = resized_frame
@ -185,16 +216,38 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
) )
return return
# 保存 GIF # === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
output_frames[0].save( need_transparency = False
output_path, for frame in output_frames:
save_all=True, if frame.mode == "RGBA":
append_images=output_frames[1:], alpha_channel = frame.getchannel("A")
format="GIF", if any(pix < 255 for pix in alpha_channel.getdata()):
loop=0, need_transparency = True
duration=output_durations_ms, break
disposal=2, elif frame.mode == "P" and "transparency" in frame.info:
) need_transparency = True
break
# 如果不需要透明,则统一转为 RGB 避免调色板污染
if not need_transparency:
output_frames = [f.convert("RGB") for f in output_frames]
# 构建保存参数
save_kwargs = {
"save_all": True,
"append_images": output_frames[1:],
"format": "GIF",
"loop": 0, # 无限循环
"duration": output_durations_ms,
"disposal": 2, # 清除到背景色,避免残留
"optimize": False, # 关闭抖动(等效 -dither none
}
# 只有真正需要透明时才启用 transparency
if need_transparency:
save_kwargs["transparency"] = 0
output_frames[0].save(output_path, **save_kwargs)
# 发送结果 # 发送结果
with open(output_path, "rb") as f: with open(output_path, "rb") as f:
@ -204,9 +257,12 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
except Exception as e: except Exception as e:
print(f"[YTPGIF] 处理失败: {e}") print(f"[YTPGIF] 处理失败: {e}")
await ytpgif_cmd.send( await ytpgif_cmd.send(
await UniMessage.text("❌ 处理失败,可能是图片格式不支持文件过大。").export() await UniMessage.text("❌ 处理失败,可能是图片格式不支持文件损坏或过大。").export()
) )
finally: finally:
for path in filter(None, [input_path, output_path]): for path in filter(None, [input_path, output_path]):
if os.path.exists(path): if os.path.exists(path):
os.unlink(path) try:
os.unlink(path)
except: # noqa
pass