diff --git a/konabot/plugins/ytpgif/__init__.py b/konabot/plugins/ytpgif/__init__.py new file mode 100644 index 0000000..9126b62 --- /dev/null +++ b/konabot/plugins/ytpgif/__init__.py @@ -0,0 +1,212 @@ +import os +import tempfile +from typing import Optional + +from PIL import Image, ImageSequence +from nonebot.adapters import Event as BaseEvent +from nonebot.plugin import PluginMetadata +from nonebot_plugin_alconna import ( + Alconna, + Args, + Field, + UniMessage, + on_alconna, +) + +__plugin_meta__ = PluginMetadata( + name="ytpgif", + description="生成来回镜像翻转的动图:动图按时间分段播放,静态图高频翻转(带参数范围保护)", + usage="/ytpgif [倍速=1.0] (倍速范围:0.1~20.0)", + type="application", + config=None, + homepage=None, +) + +# 参数定义(带硬性限制) +BASE_SEGMENT_DURATION = 0.25 +BASE_INTERVAL = 0.25 +MAX_SIZE = 256 +MIN_SPEED = 0.1 +MAX_SPEED = 20.0 +MAX_FRAMES_PER_SEGMENT = 500 + +# 提示语 +SPEED_TIPS = f"倍速必须是 {MIN_SPEED} 到 {MAX_SPEED} 之间的数字" + + +# 定义命令 + 参数校验 +ytpgif_cmd = on_alconna( + Alconna( + "ytpgif", + Args[ + "speed?", + float, + Field( + default=1.0, + unmatch_tips=lambda x: f"“{x}”不是有效数值。{SPEED_TIPS}", + ), + ], + ), + use_cmd_start=True, + use_cmd_sep=False, + skip_for_unmatch=False, +) + + +async def get_image_url(event: BaseEvent) -> Optional[str]: + msg = event.get_message() + for seg in msg: + if seg.type == "image" and seg.data.get("url"): + return str(seg.data["url"]) + if hasattr(event, "reply") and (reply := event.reply): + for seg in reply.message: + if seg.type == "image" and seg.data.get("url"): + return str(seg.data["url"]) + return None + + +async def download_image(url: str) -> bytes: + import httpx + async with httpx.AsyncClient() as client: + resp = await client.get(url, timeout=10) + resp.raise_for_status() + return resp.content + + +def resize_frame(frame: Image.Image) -> Image.Image: + w, h = frame.size + if w <= MAX_SIZE and h <= MAX_SIZE: + return frame + scale = MAX_SIZE / max(w, h) + new_w = int(w * scale) + new_h = int(h * scale) + return frame.resize((new_w, new_h), Image.Resampling.LANCZOS) + + +@ytpgif_cmd.handle() +async def handle_ytpgif(event: BaseEvent, speed: float = 1.0): + # === 校验 speed 范围 === + if not (MIN_SPEED <= speed <= MAX_SPEED): + await ytpgif_cmd.send( + await UniMessage.text(f"❌ {SPEED_TIPS}").export() + ) + return + + img_url = await get_image_url(event) + if not img_url: + await ytpgif_cmd.send( + await UniMessage.text( + "请发送一张图片并使用 /ytpgif,或回复一张图片来生成镜像动图。" + ).export() + ) + return + + try: + image_data = await download_image(img_url) + except Exception: + await ytpgif_cmd.send( + await UniMessage.text("❌ 图片下载失败,请重试。").export() + ) + return + + input_path = output_path = None + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_in: + tmp_in.write(image_data) + input_path = tmp_in.name + + with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_out: + output_path = tmp_out.name + + with Image.open(input_path) as src_img: + is_animated = getattr(src_img, "is_animated", False) or src_img.n_frames > 1 + + output_frames = [] + output_durations_ms = [] + + if is_animated: + # === 动图模式:播放两段,每段最多 BASE_SEGMENT_DURATION * speed 秒,且帧数 ≤ 100 === + frames_with_duration = [] + for frame in ImageSequence.Iterator(src_img): + rgb_frame = frame.convert("RGB") + resized_frame = resize_frame(rgb_frame) + ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000)) + dur_sec = max(0.01, ms / 1000.0) # 至少 10ms + frames_with_duration.append((resized_frame, dur_sec)) + + max_dur = BASE_SEGMENT_DURATION * speed # 每段最大播放时间 + accumulated = 0.0 + frame_count = 0 + + # 正向段 + for img, dur in frames_with_duration: + if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT: + break + output_frames.append(img) + output_durations_ms.append(int(dur * 1000)) + accumulated += dur + frame_count += 1 + + if frame_count == 0: + await ytpgif_cmd.send( + await UniMessage.text("动图帧太短,无法生成有效片段。").export() + ) + return + + # 镜像段(从头开始) + accumulated = 0.0 + frame_count = 0 + for img, dur in frames_with_duration: + if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT: + break + flipped = img.transpose(Image.FLIP_LEFT_RIGHT) + output_frames.append(flipped) + output_durations_ms.append(int(dur * 1000)) + accumulated += dur + frame_count += 1 + + else: + # === 静态图模式:制作翻转动画 === + raw_frame = src_img.convert("RGB") + resized_frame = resize_frame(raw_frame) + + interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed)) # 限制在 25ms ~ 2.5s + duration_ms = int(interval_sec * 1000) + + frame1 = resized_frame + frame2 = resized_frame.transpose(Image.FLIP_LEFT_RIGHT) + + output_frames = [frame1, frame2] + output_durations_ms = [duration_ms, duration_ms] + + if len(output_frames) < 1: + await ytpgif_cmd.send( + await UniMessage.text("未能生成任何帧。").export() + ) + return + + # 保存 GIF + output_frames[0].save( + output_path, + save_all=True, + append_images=output_frames[1:], + format="GIF", + loop=0, + duration=output_durations_ms, + disposal=2, + ) + + # 发送结果 + with open(output_path, "rb") as f: + result_image = UniMessage.image(raw=f.read()) + await ytpgif_cmd.send(await result_image.export()) + + except Exception as e: + print(f"[YTPGIF] 处理失败: {e}") + await ytpgif_cmd.send( + await UniMessage.text("❌ 处理失败,可能是图片格式不支持或文件过大。").export() + ) + finally: + for path in filter(None, [input_path, output_path]): + if os.path.exists(path): + os.unlink(path) \ No newline at end of file