diff --git a/konabot/plugins/ytpgif/__init__.py b/konabot/plugins/ytpgif/__init__.py index 4390dac..2be0a62 100644 --- a/konabot/plugins/ytpgif/__init__.py +++ b/konabot/plugins/ytpgif/__init__.py @@ -1,9 +1,9 @@ -import os -import tempfile +from io import BytesIO from typing import Optional -from PIL import Image, ImageSequence +from PIL import Image from nonebot.adapters import Event as BaseEvent +from nonebot.adapters import Bot as BaseBot from nonebot.plugin import PluginMetadata from nonebot_plugin_alconna import ( Alconna, @@ -12,6 +12,9 @@ from nonebot_plugin_alconna import ( UniMessage, on_alconna, ) +from returns.result import Failure, Success + +from konabot.common.nb.extract_image import extract_image_from_message __plugin_meta__ = PluginMetadata( name="ytpgif", @@ -60,7 +63,7 @@ async def get_image_url(event: BaseEvent) -> Optional[str]: if seg.type == "image" and seg.data.get("url"): return str(seg.data["url"]) - if hasattr(event, "reply") and (reply := event.reply): + if hasattr(event, "reply") and (reply := getattr(event, "reply")): reply_msg = reply.message for seg in reply_msg: if seg.type == "image" and seg.data.get("url"): @@ -89,7 +92,7 @@ def resize_frame(frame: Image.Image) -> Image.Image: @ytpgif_cmd.handle() -async def handle_ytpgif(event: BaseEvent, speed: float = 1.0): +async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0): # === 校验 speed 范围 === if not (MIN_SPEED <= speed <= MAX_SPEED): await ytpgif_cmd.send( @@ -97,172 +100,148 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0): ) return - img_url = await get_image_url(event) - if not img_url: - await ytpgif_cmd.send( - await UniMessage.text( - "请发送一张图片或回复一张图片来生成镜像动图。" - ).export() - ) - return + match await extract_image_from_message(event.get_message(), event, bot): + case Success(img): + src_img = img + + case Failure(msg): + await ytpgif_cmd.send( + await UniMessage.text(msg).export() + ) + return + + case _: + return try: - image_data = await download_image(img_url) - except Exception as e: - print(f"[YTPGIF] 下载失败: {e}") - await ytpgif_cmd.send( - await UniMessage.text("❌ 图片下载失败,请重试。").export() - ) - return + try: + n_frames = getattr(src_img, "n_frames", 1) + is_animated = n_frames > 1 + except Exception: + is_animated = False + n_frames = 1 - input_path = output_path = None - try: - with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_in: - tmp_in.write(image_data) - input_path = tmp_in.name + output_frames = [] + output_durations_ms = [] - with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_out: - output_path = tmp_out.name + if is_animated: + # === 动图模式:截取正向 + 镜像两段 === + frames_with_duration = [] + palette = src_img.getpalette() - with Image.open(input_path) as src_img: - # === 判断是否为动图 === - try: - n_frames = getattr(src_img, "n_frames", 1) - is_animated = n_frames > 1 - except Exception: - is_animated = False + for idx in range(n_frames): + src_img.seek(idx) + frame = src_img.copy() + # 检查是否需要透明通道 + has_alpha = ( + frame.mode in ("RGBA", "LA") + or (frame.mode == "P" and "transparency" in frame.info) + ) + if has_alpha: + frame = frame.convert("RGBA") + else: + frame = frame.convert("RGB") + resized_frame = resize_frame(frame) - output_frames = [] - output_durations_ms = [] + # 若原图有调色板,尝试保留(可选) + if palette and resized_frame.mode == "P": + try: + resized_frame.putpalette(palette) + except Exception: # noqa + pass - if is_animated: - # === 动图模式:截取正向 + 镜像两段 === - frames_with_duration = [] - palette = src_img.getpalette() + ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000)) + dur_sec = max(0.01, ms / 1000.0) + frames_with_duration.append((resized_frame, dur_sec)) - for idx in range(n_frames): - src_img.seek(idx) - frame = src_img.copy() - # 检查是否需要透明通道 - has_alpha = ( - frame.mode in ("RGBA", "LA") - or (frame.mode == "P" and "transparency" in frame.info) - ) - if has_alpha: - frame = frame.convert("RGBA") - else: - frame = frame.convert("RGB") - resized_frame = resize_frame(frame) + max_dur = BASE_SEGMENT_DURATION * speed + accumulated = 0.0 + frame_count = 0 - # 若原图有调色板,尝试保留(可选) - if palette and resized_frame.mode == "P": - try: - resized_frame.putpalette(palette) - except Exception: # noqa - pass + # 正向段 + for img, dur in frames_with_duration: + if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT: + break + output_frames.append(img) + output_durations_ms.append(int(dur * 1000)) + accumulated += dur + frame_count += 1 - ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000)) - dur_sec = max(0.01, ms / 1000.0) - frames_with_duration.append((resized_frame, dur_sec)) - - max_dur = BASE_SEGMENT_DURATION * speed - accumulated = 0.0 - frame_count = 0 - - # 正向段 - for img, dur in frames_with_duration: - if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT: - break - output_frames.append(img) - output_durations_ms.append(int(dur * 1000)) - accumulated += dur - frame_count += 1 - - if frame_count == 0: - await ytpgif_cmd.send( - await UniMessage.text("动图帧太短,无法生成有效片段。").export() - ) - return - - # 镜像段(从头开始) - accumulated = 0.0 - frame_count = 0 - for img, dur in frames_with_duration: - if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT: - break - flipped = img.transpose(Image.FLIP_LEFT_RIGHT) - output_frames.append(flipped) - output_durations_ms.append(int(dur * 1000)) - accumulated += dur - frame_count += 1 - - else: - # === 静态图模式:制作翻转动画 === - raw_frame = src_img.convert("RGBA") - resized_frame = resize_frame(raw_frame) - - interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed)) - duration_ms = int(interval_sec * 1000) - - frame1 = resized_frame - frame2 = resized_frame.transpose(Image.FLIP_LEFT_RIGHT) - - output_frames = [frame1, frame2] - output_durations_ms = [duration_ms, duration_ms] - - if len(output_frames) < 1: + if frame_count == 0: await ytpgif_cmd.send( - await UniMessage.text("未能生成任何帧。").export() + await UniMessage.text("动图帧太短,无法生成有效片段。").export() ) return - # === 🔐 关键修复:防止无透明图的颜色被当成透明 === - need_transparency = False - for frame in output_frames: - if frame.mode == "RGBA": - alpha_channel = frame.getchannel("A") - if any(pix < 255 for pix in alpha_channel.getdata()): - need_transparency = True - break - elif frame.mode == "P" and "transparency" in frame.info: + # 镜像段(从头开始) + accumulated = 0.0 + frame_count = 0 + for img, dur in frames_with_duration: + if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT: + break + flipped = img.transpose(Image.Transpose.FLIP_LEFT_RIGHT) + output_frames.append(flipped) + output_durations_ms.append(int(dur * 1000)) + accumulated += dur + frame_count += 1 + + else: + # === 静态图模式:制作翻转动画 === + raw_frame = src_img.convert("RGBA") + resized_frame = resize_frame(raw_frame) + + interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed)) + duration_ms = int(interval_sec * 1000) + + frame1 = resized_frame + frame2 = resized_frame.transpose(Image.Transpose.FLIP_LEFT_RIGHT) + + output_frames = [frame1, frame2] + output_durations_ms = [duration_ms, duration_ms] + + if len(output_frames) < 1: + await ytpgif_cmd.send( + await UniMessage.text("未能生成任何帧。").export() + ) + return + + # === 🔐 关键修复:防止无透明图的颜色被当成透明 === + need_transparency = False + for frame in output_frames: + if frame.mode == "RGBA": + alpha_channel = frame.getchannel("A") + if any(pix < 255 for pix in alpha_channel.getdata()): need_transparency = True break + elif frame.mode == "P" and "transparency" in frame.info: + need_transparency = True + break - # 如果不需要透明,则统一转为 RGB 避免调色板污染 - if not need_transparency: - output_frames = [f.convert("RGB") for f in output_frames] + # 如果不需要透明,则统一转为 RGB 避免调色板污染 + if not need_transparency: + output_frames = [f.convert("RGB") for f in output_frames] - # 构建保存参数 - save_kwargs = { - "save_all": True, - "append_images": output_frames[1:], - "format": "GIF", - "loop": 0, # 无限循环 - "duration": output_durations_ms, - "disposal": 2, # 清除到背景色,避免残留 - "optimize": False, # 关闭抖动(等效 -dither none) - } + # 构建保存参数 + save_kwargs = { + "save_all": True, + "append_images": output_frames[1:], + "format": "GIF", + "loop": 0, # 无限循环 + "duration": output_durations_ms, + "disposal": 2, # 清除到背景色,避免残留 + "optimize": False, # 关闭抖动(等效 -dither none) + } - # 只有真正需要透明时才启用 transparency - if need_transparency: - save_kwargs["transparency"] = 0 + # 只有真正需要透明时才启用 transparency + if need_transparency: + save_kwargs["transparency"] = 0 - output_frames[0].save(output_path, **save_kwargs) - - # 发送结果 - with open(output_path, "rb") as f: - result_image = UniMessage.image(raw=f.read()) + bio = BytesIO() + output_frames[0].save(bio, **save_kwargs) + result_image = UniMessage.image(raw=bio) await ytpgif_cmd.send(await result_image.export()) - except Exception as e: print(f"[YTPGIF] 处理失败: {e}") await ytpgif_cmd.send( await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export() - ) - finally: - for path in filter(None, [input_path, output_path]): - if os.path.exists(path): - try: - os.unlink(path) - except: # noqa - pass \ No newline at end of file + ) \ No newline at end of file