This commit is contained in:
@ -1,24 +1,32 @@
|
||||
import re
|
||||
from io import BytesIO
|
||||
from typing import Any
|
||||
|
||||
import PIL
|
||||
import PIL.Image
|
||||
import cv2
|
||||
import imageio.v3 as iio
|
||||
from nonebot import on_message
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna
|
||||
import numpy
|
||||
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
from konabot.common.nb.extract_image import PIL_Image
|
||||
from konabot.common.nb.extract_image import DepImageBytes, DepPILImage
|
||||
from konabot.common.nb.match_keyword import match_keyword
|
||||
from konabot.common.nb.reply_image import reply_image
|
||||
|
||||
# 保持不变
|
||||
cmd_black_white = on_message(rule=match_keyword("黑白"))
|
||||
|
||||
|
||||
@cmd_black_white.handle()
|
||||
async def _(img: PIL_Image, bot: Bot):
|
||||
async def _(img: DepPILImage, bot: Bot):
|
||||
# 保持不变
|
||||
await reply_image(cmd_black_white, bot, img.convert("LA"))
|
||||
|
||||
|
||||
# 保持不变
|
||||
def parse_timestamp(tx: str) -> float | None:
|
||||
res = 0.0
|
||||
for component in tx.split(":"):
|
||||
@ -29,6 +37,7 @@ def parse_timestamp(tx: str) -> float | None:
|
||||
return res
|
||||
|
||||
|
||||
# 保持不变
|
||||
cmd_giftool = on_alconna(
|
||||
Alconna(
|
||||
"giftool",
|
||||
@ -44,7 +53,7 @@ cmd_giftool = on_alconna(
|
||||
|
||||
@cmd_giftool.handle()
|
||||
async def _(
|
||||
image: PIL_Image,
|
||||
image: DepImageBytes,
|
||||
start_point: str | None = None,
|
||||
frame_count: int | None = None,
|
||||
length: str | None = None,
|
||||
@ -79,28 +88,24 @@ async def _(
|
||||
is_rev = speed_factor < 0
|
||||
speed_factor = abs(speed_factor)
|
||||
|
||||
if not getattr(image, "is_animated", False):
|
||||
raise BotExceptionMessage("错误:输入的不是动图(GIF)")
|
||||
|
||||
##
|
||||
# 从这里开始,采样整个 GIF 图
|
||||
frames: list[PIL.Image.Image] = []
|
||||
durations: list[float] = []
|
||||
try:
|
||||
for i in range(getattr(image, "n_frames")):
|
||||
image.seek(i)
|
||||
frames.append(image.copy())
|
||||
duration = image.info.get("duration", 100) / 1000
|
||||
durations.append(duration)
|
||||
except EOFError:
|
||||
pass
|
||||
if not frames:
|
||||
reader = iio.imread(BytesIO(image), extension=".gif", index=None)
|
||||
np_frames = list(reader)
|
||||
|
||||
_pil = PIL.Image.open(BytesIO(image))
|
||||
durations: list[float] = []
|
||||
while True:
|
||||
try:
|
||||
duration = _pil.info.get('duration', 20)
|
||||
durations.append(max(duration, 20) / 1000)
|
||||
_pil.seek(_pil.tell() + 1)
|
||||
except EOFError:
|
||||
break
|
||||
except Exception:
|
||||
raise BotExceptionMessage("错误:读取 GIF 帧失败")
|
||||
# 采样结束
|
||||
|
||||
##
|
||||
# 根据开始、结束时间或者帧数量来裁取 GIF 图
|
||||
|
||||
begin_time = ss or 0
|
||||
end_time = sum(durations)
|
||||
end_time = min(begin_time + (t or end_time), to or end_time, end_time)
|
||||
@ -108,94 +113,95 @@ async def _(
|
||||
accumulated = 0.0
|
||||
status = 0
|
||||
|
||||
sel_frames: list[PIL.Image.Image] = []
|
||||
sel_np_frames: list[numpy.ndarray[Any, Any]] = []
|
||||
sel_durations: list[float] = []
|
||||
|
||||
for i in range(len(frames)):
|
||||
frame = frames[i]
|
||||
for i in range(len(np_frames)):
|
||||
frame = np_frames[i]
|
||||
duration = durations[i]
|
||||
|
||||
if status == 0:
|
||||
if accumulated + duration > begin_time:
|
||||
status = 1
|
||||
sel_frames.append(frame)
|
||||
sel_durations.append(accumulated + duration - begin_time)
|
||||
sel_np_frames.append(frame)
|
||||
sel_durations.append(accumulated + duration - begin_time)
|
||||
elif accumulated + duration == begin_time:
|
||||
status = 1
|
||||
elif status == 1:
|
||||
if accumulated + duration > end_time:
|
||||
sel_frames.append(frame)
|
||||
sel_durations.append(end_time - accumulated)
|
||||
if accumulated + duration >= end_time:
|
||||
included_duration = end_time - accumulated
|
||||
if included_duration > 0:
|
||||
sel_np_frames.append(frame)
|
||||
sel_durations.append(included_duration)
|
||||
break
|
||||
sel_frames.append(frame)
|
||||
sel_np_frames.append(frame)
|
||||
sel_durations.append(duration)
|
||||
|
||||
accumulated += duration
|
||||
|
||||
##
|
||||
# 加速!
|
||||
sel_durations = [dur / speed_factor * 1000 for dur in durations]
|
||||
if not sel_np_frames:
|
||||
raise BotExceptionMessage("错误:裁取 GIF 帧失败(可能时间设置错误)")
|
||||
|
||||
rframes = []
|
||||
rdur = []
|
||||
rdur_ms_unprocessed = [dur / speed_factor * 1000 for dur in sel_durations]
|
||||
rframes: list[numpy.ndarray] = []
|
||||
rdur_ms: list[int] = []
|
||||
|
||||
acc_mod_20 = 0
|
||||
|
||||
for i in range(len(sel_frames)):
|
||||
fr = sel_frames[i]
|
||||
du = round(sel_durations[i])
|
||||
for i in range(len(sel_np_frames)):
|
||||
fr = sel_np_frames[i]
|
||||
du = rdur_ms_unprocessed[i]
|
||||
|
||||
if du >= 20:
|
||||
rframes.append(fr)
|
||||
rdur.append(int(du))
|
||||
rdur_ms.append(int(round(du)))
|
||||
acc_mod_20 = 0
|
||||
else:
|
||||
if acc_mod_20 == 0:
|
||||
rframes.append(fr)
|
||||
rdur.append(20)
|
||||
rdur_ms.append(20)
|
||||
acc_mod_20 += du
|
||||
else:
|
||||
acc_mod_20 += du
|
||||
if acc_mod_20 >= 20:
|
||||
acc_mod_20 = 0
|
||||
|
||||
if len(rframes) == 1 and len(sel_frames) > 1:
|
||||
rframes.append(sel_frames[max(2, len(sel_frames) // 2)])
|
||||
rdur.append(20)
|
||||
|
||||
##
|
||||
# 收尾:看看透明度这块
|
||||
transparency_flag = False
|
||||
for f in rframes:
|
||||
if f.mode == "RGBA":
|
||||
if any(pix < 255 for pix in f.getchannel("A").getdata()):
|
||||
transparency_flag = True
|
||||
break
|
||||
elif f.mode == "P" and "transparency" in f.info:
|
||||
transparency_flag = True
|
||||
break
|
||||
|
||||
tf = {}
|
||||
if transparency_flag:
|
||||
tf["transparency"] = 0
|
||||
if len(rframes) == 1 and len(sel_np_frames) > 1:
|
||||
middle_index = max(2, len(sel_np_frames) // 2)
|
||||
rframes.append(sel_np_frames[middle_index])
|
||||
rdur_ms.append(20)
|
||||
|
||||
if is_rev:
|
||||
rframes = rframes[::-1]
|
||||
rdur = rdur[::-1]
|
||||
rdur_ms = rdur_ms[::-1]
|
||||
|
||||
output_img = BytesIO()
|
||||
|
||||
if rframes:
|
||||
rframes[0].save(
|
||||
output_img,
|
||||
format="GIF",
|
||||
save_all=True,
|
||||
append_images=rframes[1:],
|
||||
duration=rdur,
|
||||
loop=0,
|
||||
optimize=False,
|
||||
disposal=2,
|
||||
**tf,
|
||||
)
|
||||
do_transparent = any((f.shape[2] == 4 for f in rframes))
|
||||
if do_transparent:
|
||||
rframes = [(
|
||||
f
|
||||
if f.shape[2] == 4
|
||||
else cv2.cvtColor(f, cv2.COLOR_RGB2RGBA)
|
||||
) for f in rframes]
|
||||
kwargs = { "transparency": 0, "disposal": 2, "mode": "RGBA" }
|
||||
else:
|
||||
kwargs = {}
|
||||
try:
|
||||
iio.imwrite(
|
||||
output_img,
|
||||
rframes,
|
||||
extension=".gif",
|
||||
duration=rdur_ms,
|
||||
loop=0,
|
||||
optimize=True,
|
||||
plugin="pillow",
|
||||
**kwargs,
|
||||
)
|
||||
except Exception as e:
|
||||
raise BotExceptionMessage(f"错误:写入 GIF 失败: {e}")
|
||||
else:
|
||||
raise BotExceptionMessage("错误:没有可输出的帧")
|
||||
output_img.seek(0)
|
||||
|
||||
await cmd_giftool.send(await UniMessage().image(raw=output_img).export())
|
||||
|
||||
Reference in New Issue
Block a user