调整 ytpgif 使用共用方法读取图片
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
2025-10-09 19:56:16 +08:00
parent c35ee57976
commit b4e400b626

View File

@ -1,9 +1,9 @@
import os from io import BytesIO
import tempfile
from typing import Optional from typing import Optional
from PIL import Image, ImageSequence from PIL import Image
from nonebot.adapters import Event as BaseEvent from nonebot.adapters import Event as BaseEvent
from nonebot.adapters import Bot as BaseBot
from nonebot.plugin import PluginMetadata from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import ( from nonebot_plugin_alconna import (
Alconna, Alconna,
@ -12,6 +12,9 @@ from nonebot_plugin_alconna import (
UniMessage, UniMessage,
on_alconna, on_alconna,
) )
from returns.result import Failure, Success
from konabot.common.nb.extract_image import extract_image_from_message
__plugin_meta__ = PluginMetadata( __plugin_meta__ = PluginMetadata(
name="ytpgif", name="ytpgif",
@ -60,7 +63,7 @@ async def get_image_url(event: BaseEvent) -> Optional[str]:
if seg.type == "image" and seg.data.get("url"): if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"]) return str(seg.data["url"])
if hasattr(event, "reply") and (reply := event.reply): if hasattr(event, "reply") and (reply := getattr(event, "reply")):
reply_msg = reply.message reply_msg = reply.message
for seg in reply_msg: for seg in reply_msg:
if seg.type == "image" and seg.data.get("url"): if seg.type == "image" and seg.data.get("url"):
@ -89,7 +92,7 @@ def resize_frame(frame: Image.Image) -> Image.Image:
@ytpgif_cmd.handle() @ytpgif_cmd.handle()
async def handle_ytpgif(event: BaseEvent, speed: float = 1.0): async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0):
# === 校验 speed 范围 === # === 校验 speed 范围 ===
if not (MIN_SPEED <= speed <= MAX_SPEED): if not (MIN_SPEED <= speed <= MAX_SPEED):
await ytpgif_cmd.send( await ytpgif_cmd.send(
@ -97,172 +100,148 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
) )
return return
img_url = await get_image_url(event) match await extract_image_from_message(event.get_message(), event, bot):
if not img_url: case Success(img):
await ytpgif_cmd.send( src_img = img
await UniMessage.text(
"请发送一张图片或回复一张图片来生成镜像动图。" case Failure(msg):
).export() await ytpgif_cmd.send(
) await UniMessage.text(msg).export()
return )
return
case _:
return
try: try:
image_data = await download_image(img_url) try:
except Exception as e: n_frames = getattr(src_img, "n_frames", 1)
print(f"[YTPGIF] 下载失败: {e}") is_animated = n_frames > 1
await ytpgif_cmd.send( except Exception:
await UniMessage.text("❌ 图片下载失败,请重试。").export() is_animated = False
) n_frames = 1
return
input_path = output_path = None output_frames = []
try: output_durations_ms = []
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_in:
tmp_in.write(image_data)
input_path = tmp_in.name
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_out: if is_animated:
output_path = tmp_out.name # === 动图模式:截取正向 + 镜像两段 ===
frames_with_duration = []
palette = src_img.getpalette()
with Image.open(input_path) as src_img: for idx in range(n_frames):
# === 判断是否为动图 === src_img.seek(idx)
try: frame = src_img.copy()
n_frames = getattr(src_img, "n_frames", 1) # 检查是否需要透明通道
is_animated = n_frames > 1 has_alpha = (
except Exception: frame.mode in ("RGBA", "LA")
is_animated = False or (frame.mode == "P" and "transparency" in frame.info)
)
if has_alpha:
frame = frame.convert("RGBA")
else:
frame = frame.convert("RGB")
resized_frame = resize_frame(frame)
output_frames = [] # 若原图有调色板,尝试保留(可选)
output_durations_ms = [] if palette and resized_frame.mode == "P":
try:
resized_frame.putpalette(palette)
except Exception: # noqa
pass
if is_animated: ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
# === 动图模式:截取正向 + 镜像两段 === dur_sec = max(0.01, ms / 1000.0)
frames_with_duration = [] frames_with_duration.append((resized_frame, dur_sec))
palette = src_img.getpalette()
for idx in range(n_frames): max_dur = BASE_SEGMENT_DURATION * speed
src_img.seek(idx) accumulated = 0.0
frame = src_img.copy() frame_count = 0
# 检查是否需要透明通道
has_alpha = (
frame.mode in ("RGBA", "LA")
or (frame.mode == "P" and "transparency" in frame.info)
)
if has_alpha:
frame = frame.convert("RGBA")
else:
frame = frame.convert("RGB")
resized_frame = resize_frame(frame)
# 若原图有调色板,尝试保留(可选) # 正向段
if palette and resized_frame.mode == "P": for img, dur in frames_with_duration:
try: if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
resized_frame.putpalette(palette) break
except Exception: # noqa output_frames.append(img)
pass output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000)) if frame_count == 0:
dur_sec = max(0.01, ms / 1000.0)
frames_with_duration.append((resized_frame, dur_sec))
max_dur = BASE_SEGMENT_DURATION * speed
accumulated = 0.0
frame_count = 0
# 正向段
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
output_frames.append(img)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
if frame_count == 0:
await ytpgif_cmd.send(
await UniMessage.text("动图帧太短,无法生成有效片段。").export()
)
return
# 镜像段(从头开始)
accumulated = 0.0
frame_count = 0
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
flipped = img.transpose(Image.FLIP_LEFT_RIGHT)
output_frames.append(flipped)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
else:
# === 静态图模式:制作翻转动画 ===
raw_frame = src_img.convert("RGBA")
resized_frame = resize_frame(raw_frame)
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
duration_ms = int(interval_sec * 1000)
frame1 = resized_frame
frame2 = resized_frame.transpose(Image.FLIP_LEFT_RIGHT)
output_frames = [frame1, frame2]
output_durations_ms = [duration_ms, duration_ms]
if len(output_frames) < 1:
await ytpgif_cmd.send( await ytpgif_cmd.send(
await UniMessage.text("未能生成任何帧").export() await UniMessage.text("动图帧太短,无法生成有效片段").export()
) )
return return
# === 🔐 关键修复:防止无透明图的颜色被当成透明 === # 镜像段(从头开始)
need_transparency = False accumulated = 0.0
for frame in output_frames: frame_count = 0
if frame.mode == "RGBA": for img, dur in frames_with_duration:
alpha_channel = frame.getchannel("A") if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
if any(pix < 255 for pix in alpha_channel.getdata()): break
need_transparency = True flipped = img.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
break output_frames.append(flipped)
elif frame.mode == "P" and "transparency" in frame.info: output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
else:
# === 静态图模式:制作翻转动画 ===
raw_frame = src_img.convert("RGBA")
resized_frame = resize_frame(raw_frame)
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
duration_ms = int(interval_sec * 1000)
frame1 = resized_frame
frame2 = resized_frame.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
output_frames = [frame1, frame2]
output_durations_ms = [duration_ms, duration_ms]
if len(output_frames) < 1:
await ytpgif_cmd.send(
await UniMessage.text("未能生成任何帧。").export()
)
return
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
need_transparency = False
for frame in output_frames:
if frame.mode == "RGBA":
alpha_channel = frame.getchannel("A")
if any(pix < 255 for pix in alpha_channel.getdata()):
need_transparency = True need_transparency = True
break break
elif frame.mode == "P" and "transparency" in frame.info:
need_transparency = True
break
# 如果不需要透明,则统一转为 RGB 避免调色板污染 # 如果不需要透明,则统一转为 RGB 避免调色板污染
if not need_transparency: if not need_transparency:
output_frames = [f.convert("RGB") for f in output_frames] output_frames = [f.convert("RGB") for f in output_frames]
# 构建保存参数 # 构建保存参数
save_kwargs = { save_kwargs = {
"save_all": True, "save_all": True,
"append_images": output_frames[1:], "append_images": output_frames[1:],
"format": "GIF", "format": "GIF",
"loop": 0, # 无限循环 "loop": 0, # 无限循环
"duration": output_durations_ms, "duration": output_durations_ms,
"disposal": 2, # 清除到背景色,避免残留 "disposal": 2, # 清除到背景色,避免残留
"optimize": False, # 关闭抖动(等效 -dither none "optimize": False, # 关闭抖动(等效 -dither none
} }
# 只有真正需要透明时才启用 transparency # 只有真正需要透明时才启用 transparency
if need_transparency: if need_transparency:
save_kwargs["transparency"] = 0 save_kwargs["transparency"] = 0
output_frames[0].save(output_path, **save_kwargs) bio = BytesIO()
output_frames[0].save(bio, **save_kwargs)
# 发送结果 result_image = UniMessage.image(raw=bio)
with open(output_path, "rb") as f:
result_image = UniMessage.image(raw=f.read())
await ytpgif_cmd.send(await result_image.export()) await ytpgif_cmd.send(await result_image.export())
except Exception as e: except Exception as e:
print(f"[YTPGIF] 处理失败: {e}") print(f"[YTPGIF] 处理失败: {e}")
await ytpgif_cmd.send( await ytpgif_cmd.send(
await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export() await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export()
) )
finally:
for path in filter(None, [input_path, output_path]):
if os.path.exists(path):
try:
os.unlink(path)
except: # noqa
pass