Files
konabot/konabot/plugins/ytpgif/__init__.py
2025-10-12 11:02:51 +08:00

220 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from io import BytesIO
from loguru import logger
from nonebot.adapters import Bot as BaseBot
from nonebot.adapters import Event as BaseEvent
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Args, Field, UniMessage, on_alconna
from PIL import Image
from returns.result import Failure, Success
from konabot.common.nb.extract_image import extract_image_from_message
__plugin_meta__ = PluginMetadata(
name="ytpgif",
description="生成来回镜像翻转的仿 YTPMV 动图。",
usage="ytpgif [倍速=1.0] 倍速范围0.120.0",
type="application",
config=None,
homepage=None,
)
# 参数定义
BASE_SEGMENT_DURATION = 0.25
BASE_INTERVAL = 0.25
MAX_SIZE = 256
MIN_SPEED = 0.1
MAX_SPEED = 20.0
MAX_FRAMES_PER_SEGMENT = 500
# 提示语
SPEED_TIPS = f"倍速必须是 {MIN_SPEED}{MAX_SPEED} 之间的数字"
# 定义命令 + 参数校验
ytpgif_cmd = on_alconna(
Alconna(
"ytpgif",
Args[
"speed?",
float,
Field(
default=1.0,
unmatch_tips=lambda x: f"{x}”不是有效数值。{SPEED_TIPS}",
),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
)
def resize_frame(frame: Image.Image) -> Image.Image:
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
w, h = frame.size
if w <= MAX_SIZE and h <= MAX_SIZE:
return frame
scale = MAX_SIZE / max(w, h)
new_w = int(w * scale)
new_h = int(h * scale)
return frame.resize((new_w, new_h), Image.Resampling.LANCZOS)
@ytpgif_cmd.handle()
async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0):
# === 校验 speed 范围 ===
if not (MIN_SPEED <= speed <= MAX_SPEED):
await ytpgif_cmd.send(
await UniMessage.text(f"{SPEED_TIPS}").export()
)
return
match await extract_image_from_message(event.get_message(), event, bot):
case Success(img):
src_img = img
case Failure(msg):
await ytpgif_cmd.send(
await UniMessage.text(msg).export()
)
return
case _:
return
try:
try:
n_frames = getattr(src_img, "n_frames", 1)
is_animated = n_frames > 1
logger.debug(f"收到的动图的运动状态:{is_animated} 帧数量:{n_frames}")
except Exception:
is_animated = False
n_frames = 1
output_frames = []
output_durations_ms = []
if is_animated:
# === 动图模式:截取正向 + 镜像两段 ===
frames_with_duration: list[tuple[Image.Image, float]] = []
palette = src_img.getpalette()
for idx in range(n_frames):
src_img.seek(idx)
frame = src_img.copy()
# 检查是否需要透明通道
has_alpha = (
frame.mode in ("RGBA", "LA")
or (frame.mode == "P" and "transparency" in frame.info)
)
if has_alpha:
frame = frame.convert("RGBA")
else:
frame = frame.convert("RGB")
resized_frame = resize_frame(frame)
# 若原图有调色板,尝试保留(可选)
if palette and resized_frame.mode == "P":
try:
resized_frame.putpalette(palette)
except Exception: # noqa
logger.debug("色板应用失败")
pass
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
dur_sec = max(0.01, ms / 1000.0)
frames_with_duration.append((resized_frame, dur_sec))
max_dur = BASE_SEGMENT_DURATION * speed
accumulated = 0.0
frame_count = 0
# 正向段
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
output_frames.append(img)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
if frame_count == 0:
await ytpgif_cmd.send(
await UniMessage.text("动图帧太短,无法生成有效片段。").export()
)
return
# 镜像段(从头开始)
accumulated = 0.0
frame_count = 0
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
flipped = img.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
output_frames.append(flipped)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
else:
# === 静态图模式:制作翻转动画 ===
raw_frame = src_img.convert("RGBA")
resized_frame = resize_frame(raw_frame)
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
duration_ms = int(interval_sec * 1000)
frame1 = resized_frame
frame2 = resized_frame.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
output_frames = [frame1, frame2]
output_durations_ms = [duration_ms, duration_ms]
if len(output_frames) < 1:
await ytpgif_cmd.send(
await UniMessage.text("未能生成任何帧。").export()
)
return
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
need_transparency = False
for frame in output_frames:
if frame.mode == "RGBA":
alpha_channel = frame.getchannel("A")
if any(pix < 255 for pix in alpha_channel.getdata()):
need_transparency = True
break
elif frame.mode == "P" and "transparency" in frame.info:
need_transparency = True
break
# 如果不需要透明,则统一转为 RGB 避免调色板污染
if not need_transparency:
output_frames = [f.convert("RGB") for f in output_frames]
# 构建保存参数
save_kwargs = {
"save_all": True,
"append_images": output_frames[1:],
"format": "GIF",
"loop": 0, # 无限循环
"duration": output_durations_ms,
"disposal": 2, # 清除到背景色,避免残留
"optimize": False, # 关闭抖动(等效 -dither none
}
# 只有真正需要透明时才启用 transparency
if need_transparency:
save_kwargs["transparency"] = 0
bio = BytesIO()
output_frames[0].save(bio, **save_kwargs)
result_image = UniMessage.image(raw=bio)
await ytpgif_cmd.send(await result_image.export())
except Exception as e:
print(f"[YTPGIF] 处理失败: {e}")
await ytpgif_cmd.send(
await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export()
)