forked from mttu-developers/konabot
205 lines
6.1 KiB
Python
205 lines
6.1 KiB
Python
import re
|
||
from io import BytesIO
|
||
|
||
from nonebot import on_message
|
||
from nonebot.adapters import Bot
|
||
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage,
|
||
on_alconna)
|
||
|
||
from konabot.common.nb.exc import BotExceptionMessage
|
||
from konabot.common.nb.extract_image import PIL_Image
|
||
from konabot.common.nb.match_keyword import match_keyword
|
||
from konabot.common.nb.reply_image import reply_image
|
||
|
||
cmd_black_white = on_message(rule=match_keyword("黑白"))
|
||
|
||
|
||
@cmd_black_white.handle()
|
||
async def _(img: PIL_Image, bot: Bot):
|
||
await reply_image(cmd_black_white, bot, img.convert("LA"))
|
||
|
||
|
||
def parse_timestamp(tx: str) -> float | None:
|
||
res = 0.0
|
||
for component in tx.split(":"):
|
||
res *= 60
|
||
if not re.match(r"^\d+(\.\d+)?$", component):
|
||
return
|
||
res += float(component)
|
||
return res
|
||
|
||
|
||
cmd_giftool = on_alconna(Alconna(
|
||
"giftool",
|
||
Args["img", Image | None],
|
||
Option("--ss", Args["start_point", str]),
|
||
Option("--frames:v", Args["frame_count", int]),
|
||
Option("-t", Args["length", str]),
|
||
Option("-to", Args["end_point", str]),
|
||
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
|
||
))
|
||
|
||
|
||
@cmd_giftool.handle()
|
||
async def _(
|
||
image: PIL_Image,
|
||
start_point: str | None = None,
|
||
frame_count: int | None = None,
|
||
length: str | None = None,
|
||
speed_factor: float = 1.0,
|
||
end_point: str | None = None,
|
||
):
|
||
ss: None | float = None
|
||
if start_point:
|
||
ss = parse_timestamp(start_point)
|
||
if ss is None:
|
||
raise BotExceptionMessage("--ss 的格式不满足条件")
|
||
|
||
t: None | float = None
|
||
if length:
|
||
t = parse_timestamp(length)
|
||
if t is None:
|
||
raise BotExceptionMessage("-t 的格式不满足条件")
|
||
|
||
to: None | float = None
|
||
if end_point:
|
||
to = parse_timestamp(end_point)
|
||
if to is None:
|
||
raise BotExceptionMessage("-to 的格式不满足条件")
|
||
|
||
if to is not None and ss is not None and to <= ss:
|
||
raise BotExceptionMessage("错误:出点时间小于入点")
|
||
if frame_count is not None and frame_count <= 0:
|
||
raise BotExceptionMessage("错误:帧数量应该大于 0")
|
||
if speed_factor <= 0:
|
||
raise BotExceptionMessage("错误:--speed 必须大于 0")
|
||
|
||
if not getattr(image, "is_animated", False):
|
||
raise BotExceptionMessage("错误:输入的不是动图(GIF)")
|
||
|
||
frames = []
|
||
durations = []
|
||
total_duration = 0.0
|
||
|
||
try:
|
||
for i in range(getattr(image, "n_frames")):
|
||
image.seek(i)
|
||
frames.append(image.copy())
|
||
duration = image.info.get("duration", 100) # 单位:毫秒
|
||
durations.append(duration)
|
||
total_duration += duration / 1000.0 # 转为秒
|
||
except EOFError:
|
||
pass
|
||
|
||
if not frames:
|
||
raise BotExceptionMessage("错误:读取 GIF 帧失败")
|
||
|
||
def time_to_frame_index(target_time: float) -> int:
|
||
if target_time <= 0:
|
||
return 0
|
||
cum = 0.0
|
||
for idx, dur in enumerate(durations):
|
||
cum += dur / 1000.0
|
||
if cum >= target_time:
|
||
return min(idx, len(frames) - 1)
|
||
return len(frames) - 1
|
||
start_frame = 0
|
||
end_frame = len(frames) - 1
|
||
if ss is not None:
|
||
start_frame = time_to_frame_index(ss)
|
||
if to is not None:
|
||
end_frame = time_to_frame_index(to)
|
||
if end_frame < start_frame:
|
||
end_frame = start_frame
|
||
elif t is not None:
|
||
end_time = (ss or 0.0) + t
|
||
end_frame = time_to_frame_index(end_time)
|
||
if end_frame < start_frame:
|
||
end_frame = start_frame
|
||
|
||
start_frame = max(0, start_frame)
|
||
end_frame = min(len(frames) - 1, end_frame)
|
||
selected_frames = frames[start_frame : end_frame + 1]
|
||
selected_durations = durations[start_frame : end_frame + 1]
|
||
|
||
if frame_count is not None and frame_count > 0:
|
||
if frame_count >= len(selected_frames):
|
||
pass
|
||
else:
|
||
step = len(selected_frames) / frame_count
|
||
sampled_frames = []
|
||
sampled_durations = []
|
||
for i in range(frame_count):
|
||
idx = int(i * step)
|
||
sampled_frames.append(selected_frames[idx])
|
||
sampled_durations.append(
|
||
sum(selected_durations) // len(selected_durations)
|
||
)
|
||
selected_frames = sampled_frames
|
||
selected_durations = sampled_durations
|
||
|
||
output_img = BytesIO()
|
||
|
||
adjusted_durations = [
|
||
dur / speed_factor for dur in selected_durations
|
||
]
|
||
|
||
rframes = []
|
||
rdur = []
|
||
|
||
acc_mod_20 = 0
|
||
|
||
for i in range(len(selected_frames)):
|
||
fr = selected_frames[i]
|
||
du: float = adjusted_durations[i]
|
||
|
||
if du >= 20:
|
||
rframes.append(fr)
|
||
rdur.append(int(du))
|
||
acc_mod_20 = 0
|
||
else:
|
||
if acc_mod_20 == 0:
|
||
rframes.append(fr)
|
||
rdur.append(20)
|
||
acc_mod_20 += du
|
||
else:
|
||
acc_mod_20 += du
|
||
if acc_mod_20 >= 20:
|
||
acc_mod_20 = 0
|
||
|
||
if len(rframes) == 1 and len(selected_frames) > 1:
|
||
rframes.append(selected_frames[max(2, len(selected_frames) // 2)])
|
||
rdur.append(20)
|
||
|
||
transparency_flag = False
|
||
for f in rframes:
|
||
if f.mode == "RGBA":
|
||
if any(pix < 255 for pix in f.getchannel("A").getdata()):
|
||
transparency_flag = True
|
||
break
|
||
elif f.mode == "P" and "transparency" in f.info:
|
||
transparency_flag = True
|
||
break
|
||
|
||
tf = {}
|
||
if transparency_flag:
|
||
tf['transparency'] = 0
|
||
|
||
if rframes:
|
||
rframes[0].save(
|
||
output_img,
|
||
format="GIF",
|
||
save_all=True,
|
||
append_images=rframes[1:],
|
||
duration=rdur,
|
||
loop=0,
|
||
optimize=False,
|
||
disposal=2,
|
||
**tf,
|
||
)
|
||
else:
|
||
raise BotExceptionMessage("错误:没有可输出的帧")
|
||
output_img.seek(0)
|
||
|
||
await cmd_giftool.send(await UniMessage().image(raw=output_img).export())
|