@ -15,14 +15,14 @@ from nonebot_plugin_alconna import (
__plugin_meta__ = PluginMetadata (
name = " ytpgif " ,
description = " 生成来回镜像翻转的动图:动图按时间分段播放,静态图高频翻转(带参数范围保护) " ,
usage = " / ytpgif [倍速=1.0] ( 倍速范围: 0.1~ 20.0) " ,
description = " 生成来回镜像翻转的仿 YTPMV 动图。 " ,
usage = " ytpgif [倍速=1.0] ( 倍速范围: 0.1~ 20.0) " ,
type = " application " ,
config = None ,
homepage = None ,
)
# 参数定义(带硬性限制)
# 参数定义
BASE_SEGMENT_DURATION = 0.25
BASE_INTERVAL = 0.25
MAX_SIZE = 256
@ -54,12 +54,15 @@ ytpgif_cmd = on_alconna(
async def get_image_url ( event : BaseEvent ) - > Optional [ str ] :
""" 从事件中提取图片 URL, 支持直接消息和回复 """
msg = event . get_message ( )
for seg in msg :
if seg . type == " image " and seg . data . get ( " url " ) :
return str ( seg . data [ " url " ] )
if hasattr ( event , " reply " ) and ( reply := event . reply ) :
for seg in reply . message :
reply_msg = reply . message
for seg in reply_msg :
if seg . type == " image " and seg . data . get ( " url " ) :
return str ( seg . data [ " url " ] )
return None
@ -74,9 +77,11 @@ async def download_image(url: str) -> bytes:
def resize_frame ( frame : Image . Image ) - > Image . Image :
""" 缩放图像,保持宽高比,不超过 MAX_SIZE """
w , h = frame . size
if w < = MAX_SIZE and h < = MAX_SIZE :
return frame
scale = MAX_SIZE / max ( w , h )
new_w = int ( w * scale )
new_h = int ( h * scale )
@ -96,14 +101,15 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
if not img_url :
await ytpgif_cmd . send (
await UniMessage . text (
" 请发送一张图片并使用 /ytpgif, 或回复一张图片来生成镜像动图。 "
" 请发送一张图片或回复一张图片来生成镜像动图。 "
) . export ( )
)
return
try :
image_data = await download_image ( img_url )
except Exception :
except Exception as e :
print ( f " [YTPGIF] 下载失败: { e } " )
await ytpgif_cmd . send (
await UniMessage . text ( " ❌ 图片下载失败,请重试。 " ) . export ( )
)
@ -119,22 +125,47 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
output_path = tmp_out . name
with Image . open ( input_path ) as src_img :
is_animated = getattr ( src_img , " is_animated " , False ) or src_img . n_frames > 1
# === 判断是否为动图 ===
try :
n_frames = getattr ( src_img , " n_frames " , 1 )
is_animated = n_frames > 1
except Exception :
is_animated = False
output_frames = [ ]
output_durations_ms = [ ]
if is_animated :
# === 动图模式:播放两段,每段最多 BASE_SEGMENT_DURATION * speed 秒,且帧数 ≤ 100 ===
# === 动图模式:截取正向 + 镜像两段 ===
frames_with_duration = [ ]
for frame in ImageSequence . Iterator ( src_img ) :
rgb_frame = frame . convert ( " RGB " )
resized_frame = resize_fram e ( rgb _frame)
palette = src_img . getpalette ( )
for idx in rang e( n _frames ) :
src_img . seek ( idx )
frame = src_img . copy ( )
# 检查是否需要透明通道
has_alpha = (
frame . mode in ( " RGBA " , " LA " )
or ( frame . mode == " P " and " transparency " in frame . info )
)
if has_alpha :
frame = frame . convert ( " RGBA " )
else :
frame = frame . convert ( " RGB " )
resized_frame = resize_frame ( frame )
# 若原图有调色板,尝试保留(可选)
if palette and resized_frame . mode == " P " :
try :
resized_frame . putpalette ( palette )
except Exception : # noqa
pass
ms = frame . info . get ( " duration " , int ( BASE_SEGMENT_DURATION * 1000 ) )
dur_sec = max ( 0.01 , ms / 1000.0 ) # 至少 10ms
dur_sec = max ( 0.01 , ms / 1000.0 )
frames_with_duration . append ( ( resized_frame , dur_sec ) )
max_dur = BASE_SEGMENT_DURATION * speed # 每段最大播放时间
max_dur = BASE_SEGMENT_DURATION * speed
accumulated = 0.0
frame_count = 0
@ -167,10 +198,10 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
else :
# === 静态图模式:制作翻转动画 ===
raw_frame = src_img . convert ( " RGB " )
raw_frame = src_img . convert ( " RGBA " )
resized_frame = resize_frame ( raw_frame )
interval_sec = max ( 0.025 , min ( 2.5 , BASE_INTERVAL / speed ) ) # 限制在 25ms ~ 2.5s
interval_sec = max ( 0.025 , min ( 2.5 , BASE_INTERVAL / speed ) )
duration_ms = int ( interval_sec * 1000 )
frame1 = resized_frame
@ -185,16 +216,38 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
)
return
# 保存 GIF
output_frames [ 0 ] . save (
output_path ,
save_all = True ,
append_images = output_frames [ 1 : ] ,
format = " GIF " ,
loop = 0 ,
duration = output_durations_ms ,
disposal = 2 ,
)
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
need_transparency = False
for frame in output_frames :
if frame . mode == " RGBA " :
alpha_channel = frame . getchannel ( " A " )
if any ( pix < 255 for pix in alpha_channel . getdata ( ) ) :
need_transparency = True
break
elif frame . mode == " P " and " transparency " in frame . info :
need_transparency = True
break
# 如果不需要透明,则统一转为 RGB 避免调色板污染
if not need_transparency :
output_frames = [ f . convert ( " RGB " ) for f in output_frames ]
# 构建保存参数
save_kwargs = {
" save_all " : True ,
" append_images " : output_frames [ 1 : ] ,
" format " : " GIF " ,
" loop " : 0 , # 无限循环
" duration " : output_durations_ms ,
" disposal " : 2 , # 清除到背景色,避免残留
" optimize " : False , # 关闭抖动(等效 -dither none)
}
# 只有真正需要透明时才启用 transparency
if need_transparency :
save_kwargs [ " transparency " ] = 0
output_frames [ 0 ] . save ( output_path , * * save_kwargs )
# 发送结果
with open ( output_path , " rb " ) as f :
@ -204,9 +257,12 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
except Exception as e :
print ( f " [YTPGIF] 处理失败: { e } " )
await ytpgif_cmd . send (
await UniMessage . text ( " ❌ 处理失败,可能是图片格式不支持或 文件过大。 " ) . export ( )
await UniMessage . text ( " ❌ 处理失败,可能是图片格式不支持、 文件损坏或 过大。 " ) . export ( )
)
finally :
for path in filter ( None , [ input_path , output_path ] ) :
if os . path . exists ( path ) :
os . unlink ( path )
try :
os . unlink ( path )
except : # noqa
pass