Files
konabot/konabot/plugins/image_process/__init__.py
passthem 2f22f11d57
All checks were successful
continuous-integration/drone/push Build is passing
调整 Gif 图渲染策略
2025-11-15 20:16:42 +08:00

208 lines
6.2 KiB
Python

import re
from io import BytesIO
from typing import Any
import PIL
import PIL.Image
import cv2
import imageio.v3 as iio
from nonebot import on_message
from nonebot.adapters import Bot
from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna
import numpy
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import DepImageBytes, DepPILImage
from konabot.common.nb.match_keyword import match_keyword
from konabot.common.nb.reply_image import reply_image
# 保持不变
cmd_black_white = on_message(rule=match_keyword("黑白"))
@cmd_black_white.handle()
async def _(img: DepPILImage, bot: Bot):
# 保持不变
await reply_image(cmd_black_white, bot, img.convert("LA"))
# 保持不变
def parse_timestamp(tx: str) -> float | None:
res = 0.0
for component in tx.split(":"):
res *= 60
if not re.match(r"^\d+(\.\d+)?$", component):
return
res += float(component)
return res
# 保持不变
cmd_giftool = on_alconna(
Alconna(
"giftool",
Args["img", Image | None],
Option("--ss", Args["start_point", str]),
Option("--frames:v", Args["frame_count", int]),
Option("-t", Args["length", str]),
Option("-to", Args["end_point", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
)
)
@cmd_giftool.handle()
async def _(
image: DepImageBytes,
start_point: str | None = None,
frame_count: int | None = None,
length: str | None = None,
speed_factor: float = 1.0,
end_point: str | None = None,
):
ss: None | float = None
if start_point:
ss = parse_timestamp(start_point)
if ss is None:
raise BotExceptionMessage("--ss 的格式不满足条件")
t: None | float = None
if length:
t = parse_timestamp(length)
if t is None:
raise BotExceptionMessage("-t 的格式不满足条件")
to: None | float = None
if end_point:
to = parse_timestamp(end_point)
if to is None:
raise BotExceptionMessage("-to 的格式不满足条件")
if to is not None and ss is not None and to <= ss:
raise BotExceptionMessage("错误:出点时间小于入点")
if frame_count is not None and frame_count <= 0:
raise BotExceptionMessage("错误:帧数量应该大于 0")
if speed_factor == 0:
raise BotExceptionMessage("错误:速度不能为 0")
is_rev = speed_factor < 0
speed_factor = abs(speed_factor)
try:
reader = iio.imread(BytesIO(image), extension=".gif", index=None)
np_frames = list(reader)
_pil = PIL.Image.open(BytesIO(image))
durations: list[float] = []
while True:
try:
duration = _pil.info.get('duration', 20)
durations.append(max(duration, 20) / 1000)
_pil.seek(_pil.tell() + 1)
except EOFError:
break
except Exception:
raise BotExceptionMessage("错误:读取 GIF 帧失败")
##
# 根据开始、结束时间或者帧数量来裁取 GIF 图
begin_time = ss or 0
end_time = sum(durations)
end_time = min(begin_time + (t or end_time), to or end_time, end_time)
accumulated = 0.0
status = 0
sel_np_frames: list[numpy.ndarray[Any, Any]] = []
sel_durations: list[float] = []
for i in range(len(np_frames)):
frame = np_frames[i]
duration = durations[i]
if status == 0:
if accumulated + duration > begin_time:
status = 1
sel_np_frames.append(frame)
sel_durations.append(accumulated + duration - begin_time)
elif accumulated + duration == begin_time:
status = 1
elif status == 1:
if accumulated + duration >= end_time:
included_duration = end_time - accumulated
if included_duration > 0:
sel_np_frames.append(frame)
sel_durations.append(included_duration)
break
sel_np_frames.append(frame)
sel_durations.append(duration)
accumulated += duration
if not sel_np_frames:
raise BotExceptionMessage("错误:裁取 GIF 帧失败(可能时间设置错误)")
rdur_ms_unprocessed = [dur / speed_factor * 1000 for dur in sel_durations]
rframes: list[numpy.ndarray] = []
rdur_ms: list[int] = []
acc_mod_20 = 0
for i in range(len(sel_np_frames)):
fr = sel_np_frames[i]
du = rdur_ms_unprocessed[i]
if du >= 20:
rframes.append(fr)
rdur_ms.append(int(round(du)))
acc_mod_20 = 0
else:
if acc_mod_20 == 0:
rframes.append(fr)
rdur_ms.append(20)
acc_mod_20 += du
else:
acc_mod_20 += du
if acc_mod_20 >= 20:
acc_mod_20 = 0
if len(rframes) == 1 and len(sel_np_frames) > 1:
middle_index = max(2, len(sel_np_frames) // 2)
rframes.append(sel_np_frames[middle_index])
rdur_ms.append(20)
if is_rev:
rframes = rframes[::-1]
rdur_ms = rdur_ms[::-1]
output_img = BytesIO()
if rframes:
do_transparent = any((f.shape[2] == 4 for f in rframes))
if do_transparent:
rframes = [(
f
if f.shape[2] == 4
else cv2.cvtColor(f, cv2.COLOR_RGB2RGBA)
) for f in rframes]
kwargs = { "transparency": 0, "disposal": 2, "mode": "RGBA" }
else:
kwargs = {}
try:
iio.imwrite(
output_img,
rframes,
extension=".gif",
duration=rdur_ms,
loop=0,
optimize=True,
plugin="pillow",
**kwargs,
)
except Exception as e:
raise BotExceptionMessage(f"错误:写入 GIF 失败: {e}")
else:
raise BotExceptionMessage("错误:没有可输出的帧")
output_img.seek(0)
await cmd_giftool.send(await UniMessage().image(raw=output_img).export())