diff --git a/konabot/plugins/ytpgif/__init__.py b/konabot/plugins/ytpgif/__init__.py index 9126b62..4390dac 100644 --- a/konabot/plugins/ytpgif/__init__.py +++ b/konabot/plugins/ytpgif/__init__.py @@ -15,14 +15,14 @@ from nonebot_plugin_alconna import ( __plugin_meta__ = PluginMetadata( name="ytpgif", - description="生成来回镜像翻转的动图:动图按时间分段播放,静态图高频翻转(带参数范围保护)", - usage="/ytpgif [倍速=1.0] (倍速范围:0.1~20.0)", + description="生成来回镜像翻转的仿 YTPMV 动图。", + usage="ytpgif [倍速=1.0] (倍速范围:0.1~20.0)", type="application", config=None, homepage=None, ) -# 参数定义(带硬性限制) +# 参数定义 BASE_SEGMENT_DURATION = 0.25 BASE_INTERVAL = 0.25 MAX_SIZE = 256 @@ -54,12 +54,15 @@ ytpgif_cmd = on_alconna( async def get_image_url(event: BaseEvent) -> Optional[str]: + """从事件中提取图片 URL,支持直接消息和回复""" msg = event.get_message() for seg in msg: if seg.type == "image" and seg.data.get("url"): return str(seg.data["url"]) + if hasattr(event, "reply") and (reply := event.reply): - for seg in reply.message: + reply_msg = reply.message + for seg in reply_msg: if seg.type == "image" and seg.data.get("url"): return str(seg.data["url"]) return None @@ -74,9 +77,11 @@ async def download_image(url: str) -> bytes: def resize_frame(frame: Image.Image) -> Image.Image: + """缩放图像,保持宽高比,不超过 MAX_SIZE""" w, h = frame.size if w <= MAX_SIZE and h <= MAX_SIZE: return frame + scale = MAX_SIZE / max(w, h) new_w = int(w * scale) new_h = int(h * scale) @@ -96,14 +101,15 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0): if not img_url: await ytpgif_cmd.send( await UniMessage.text( - "请发送一张图片并使用 /ytpgif,或回复一张图片来生成镜像动图。" + "请发送一张图片或回复一张图片来生成镜像动图。" ).export() ) return try: image_data = await download_image(img_url) - except Exception: + except Exception as e: + print(f"[YTPGIF] 下载失败: {e}") await ytpgif_cmd.send( await UniMessage.text("❌ 图片下载失败,请重试。").export() ) @@ -119,22 +125,47 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0): output_path = tmp_out.name with Image.open(input_path) as src_img: - is_animated = getattr(src_img, "is_animated", False) or src_img.n_frames > 1 + # === 判断是否为动图 === + try: + n_frames = getattr(src_img, "n_frames", 1) + is_animated = n_frames > 1 + except Exception: + is_animated = False output_frames = [] output_durations_ms = [] if is_animated: - # === 动图模式:播放两段,每段最多 BASE_SEGMENT_DURATION * speed 秒,且帧数 ≤ 100 === + # === 动图模式:截取正向 + 镜像两段 === frames_with_duration = [] - for frame in ImageSequence.Iterator(src_img): - rgb_frame = frame.convert("RGB") - resized_frame = resize_frame(rgb_frame) + palette = src_img.getpalette() + + for idx in range(n_frames): + src_img.seek(idx) + frame = src_img.copy() + # 检查是否需要透明通道 + has_alpha = ( + frame.mode in ("RGBA", "LA") + or (frame.mode == "P" and "transparency" in frame.info) + ) + if has_alpha: + frame = frame.convert("RGBA") + else: + frame = frame.convert("RGB") + resized_frame = resize_frame(frame) + + # 若原图有调色板,尝试保留(可选) + if palette and resized_frame.mode == "P": + try: + resized_frame.putpalette(palette) + except Exception: # noqa + pass + ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000)) - dur_sec = max(0.01, ms / 1000.0) # 至少 10ms + dur_sec = max(0.01, ms / 1000.0) frames_with_duration.append((resized_frame, dur_sec)) - max_dur = BASE_SEGMENT_DURATION * speed # 每段最大播放时间 + max_dur = BASE_SEGMENT_DURATION * speed accumulated = 0.0 frame_count = 0 @@ -167,10 +198,10 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0): else: # === 静态图模式:制作翻转动画 === - raw_frame = src_img.convert("RGB") + raw_frame = src_img.convert("RGBA") resized_frame = resize_frame(raw_frame) - interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed)) # 限制在 25ms ~ 2.5s + interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed)) duration_ms = int(interval_sec * 1000) frame1 = resized_frame @@ -185,16 +216,38 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0): ) return - # 保存 GIF - output_frames[0].save( - output_path, - save_all=True, - append_images=output_frames[1:], - format="GIF", - loop=0, - duration=output_durations_ms, - disposal=2, - ) + # === 🔐 关键修复:防止无透明图的颜色被当成透明 === + need_transparency = False + for frame in output_frames: + if frame.mode == "RGBA": + alpha_channel = frame.getchannel("A") + if any(pix < 255 for pix in alpha_channel.getdata()): + need_transparency = True + break + elif frame.mode == "P" and "transparency" in frame.info: + need_transparency = True + break + + # 如果不需要透明,则统一转为 RGB 避免调色板污染 + if not need_transparency: + output_frames = [f.convert("RGB") for f in output_frames] + + # 构建保存参数 + save_kwargs = { + "save_all": True, + "append_images": output_frames[1:], + "format": "GIF", + "loop": 0, # 无限循环 + "duration": output_durations_ms, + "disposal": 2, # 清除到背景色,避免残留 + "optimize": False, # 关闭抖动(等效 -dither none) + } + + # 只有真正需要透明时才启用 transparency + if need_transparency: + save_kwargs["transparency"] = 0 + + output_frames[0].save(output_path, **save_kwargs) # 发送结果 with open(output_path, "rb") as f: @@ -204,9 +257,12 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0): except Exception as e: print(f"[YTPGIF] 处理失败: {e}") await ytpgif_cmd.send( - await UniMessage.text("❌ 处理失败,可能是图片格式不支持或文件过大。").export() + await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export() ) finally: for path in filter(None, [input_path, output_path]): if os.path.exists(path): - os.unlink(path) \ No newline at end of file + try: + os.unlink(path) + except: # noqa + pass \ No newline at end of file