import re
from io import BytesIO
from typing import Any

import PIL
import PIL.Image
import cv2
import imageio.v3 as iio
from nonebot import on_message
from nonebot.adapters import Bot
from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna
import numpy

from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import DepImageBytes, DepPILImage
from konabot.common.nb.match_keyword import match_keyword
from konabot.common.nb.reply_image import reply_image

# Shortest per-frame duration (in milliseconds) this module reads or emits.
# Source frame durations are raised to this value, and sped-up frames that
# would fall below it are dropped/merged instead of being written as-is.
_MIN_FRAME_DURATION_MS = 20


# Message matcher registered with the keyword rule "黑白".
cmd_black_white = on_message(rule=match_keyword("黑白"))


@cmd_black_white.handle()
async def _(img: DepPILImage, bot: Bot):
    """Reply with the received image converted to Pillow mode ``"LA"``."""
    await reply_image(cmd_black_white, bot, img.convert("LA"))


def parse_timestamp(tx: str) -> float | None:
    """Parse a colon-separated timestamp into seconds.

    Each component must be a non-negative decimal number (``12`` or
    ``12.5``). Every additional component to the right multiplies the value
    accumulated so far by 60, so ``"1:30"`` yields ``90.0`` and ``"1:00:00"``
    yields ``3600.0``.

    Args:
        tx: The timestamp text, e.g. ``"5"``, ``"1:30"`` or ``"0:01:02.5"``.

    Returns:
        The timestamp in seconds, or ``None`` if any component is malformed.
    """
    res = 0.0
    for component in tx.split(":"):
        res *= 60
        if not re.match(r"^\d+(\.\d+)?$", component):
            return None
        res += float(component)
    return res


# Command definition for "giftool": an optional image plus trimming and
# speed options.
cmd_giftool = on_alconna(
    Alconna(
        "giftool",
        Args["img", Image | None],
        Option("--ss", Args["start_point", str]),
        Option("--frames:v", Args["frame_count", int]),
        Option("-t", Args["length", str]),
        Option("-to", Args["end_point", str]),
        Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
    )
)


def _parse_option_timestamp(raw: str | None, option_name: str) -> float | None:
    """Parse the timestamp given for one option, or return None if it is unset.

    Raises:
        BotExceptionMessage: If ``raw`` is non-empty but not a valid timestamp.
    """
    if not raw:
        return None
    value = parse_timestamp(raw)
    if value is None:
        raise BotExceptionMessage(f"{option_name} 的格式不满足条件")
    return value


def _read_gif(image: bytes) -> tuple[list[numpy.ndarray[Any, Any]], list[float]]:
    """Decode GIF bytes into a list of frames and per-frame durations in seconds.

    Raises:
        BotExceptionMessage: If the frames or their durations cannot be read.
    """
    try:
        reader = iio.imread(BytesIO(image), extension=".gif", index=None)
        np_frames = list(reader)

        durations: list[float] = []
        # The image is opened a second time with Pillow only to walk the
        # frames and collect each frame's "duration" info entry; the context
        # manager makes sure the handle is released afterwards.
        with PIL.Image.open(BytesIO(image)) as pil_image:
            while True:
                try:
                    duration = pil_image.info.get("duration", _MIN_FRAME_DURATION_MS)
                    durations.append(max(duration, _MIN_FRAME_DURATION_MS) / 1000)
                    pil_image.seek(pil_image.tell() + 1)
                except EOFError:
                    # Seeking past the last frame ends the walk.
                    break
    except Exception as e:
        raise BotExceptionMessage("错误:读取 GIF 帧失败") from e
    return np_frames, durations


def _select_time_range(
    frames: list[numpy.ndarray[Any, Any]],
    durations: list[float],
    begin_time: float,
    end_time: float,
) -> tuple[list[numpy.ndarray[Any, Any]], list[float]]:
    """Keep the frames shown between begin_time and end_time (seconds).

    The first kept frame's duration is shortened to the part after
    ``begin_time`` and the last kept frame's duration to the part before
    ``end_time``.
    """
    sel_frames: list[numpy.ndarray[Any, Any]] = []
    sel_durations: list[float] = []
    elapsed = 0.0
    started = False
    # zip() stops at the shorter sequence, so a frame-count mismatch between
    # the two decoders cannot cause an out-of-range index.
    for frame, duration in zip(frames, durations):
        frame_end = elapsed + duration
        if not started:
            if frame_end > begin_time:
                # begin_time falls inside this frame: keep its remainder.
                started = True
                sel_frames.append(frame)
                sel_durations.append(frame_end - begin_time)
            elif frame_end == begin_time:
                # begin_time is exactly on a frame boundary: start with the
                # next frame.
                started = True
        else:
            if frame_end >= end_time:
                remaining = end_time - elapsed
                if remaining > 0:
                    sel_frames.append(frame)
                    sel_durations.append(remaining)
                break
            sel_frames.append(frame)
            sel_durations.append(duration)
        elapsed = frame_end
    return sel_frames, sel_durations


def _retime_frames(
    frames: list[numpy.ndarray[Any, Any]],
    durations: list[float],
    speed_factor: float,
) -> tuple[list[numpy.ndarray[Any, Any]], list[int]]:
    """Scale frame durations by 1/speed_factor and return frames with ms durations.

    Frames whose scaled duration stays at or above the minimum are kept as
    they are. Runs of shorter frames are thinned out: the first one of a run
    is emitted with the minimum duration and the following ones are dropped
    until their accumulated scaled time reaches the minimum.
    """
    scaled_ms = [dur / speed_factor * 1000 for dur in durations]

    out_frames: list[numpy.ndarray[Any, Any]] = []
    out_ms: list[int] = []
    carry = 0
    for frame, ms in zip(frames, scaled_ms):
        if ms >= _MIN_FRAME_DURATION_MS:
            out_frames.append(frame)
            out_ms.append(int(round(ms)))
            carry = 0
        elif carry == 0:
            out_frames.append(frame)
            out_ms.append(_MIN_FRAME_DURATION_MS)
            carry += ms
        else:
            carry += ms
            if carry >= _MIN_FRAME_DURATION_MS:
                carry = 0

    if len(out_frames) == 1 and len(frames) > 1:
        # Only one frame survived the thinning although several were
        # selected: add one more so the output has a second frame. The index
        # is clamped to the last valid position; without the clamp a
        # two-frame selection would index past the end of the list.
        middle_index = min(max(2, len(frames) // 2), len(frames) - 1)
        out_frames.append(frames[middle_index])
        out_ms.append(_MIN_FRAME_DURATION_MS)

    return out_frames, out_ms


@cmd_giftool.handle()
async def _(
    image: DepImageBytes,
    start_point: str | None = None,
    frame_count: int | None = None,
    length: str | None = None,
    speed_factor: float = 1.0,
    end_point: str | None = None,
):
    """Trim and/or re-time a GIF and send the result back.

    Args:
        image: Raw bytes of the GIF to process.
        start_point: ``--ss`` timestamp where the output starts.
        frame_count: ``--frames:v`` value; must be positive when given.
        length: ``-t`` timestamp, the length to keep after the start point.
        speed_factor: ``--speed`` multiplier; a negative value also reverses
            the frame order. Must not be 0.
        end_point: ``-to`` timestamp where the output ends.

    Raises:
        BotExceptionMessage: On malformed options, on failure to read or
            write the GIF, or when no frame is left after trimming.
    """
    ss = _parse_option_timestamp(start_point, "--ss")
    t = _parse_option_timestamp(length, "-t")
    to = _parse_option_timestamp(end_point, "-to")

    if to is not None and ss is not None and to <= ss:
        raise BotExceptionMessage("错误:出点时间小于入点")
    # NOTE(review): frame_count is validated here but is not used anywhere
    # else in this handler, so --frames:v currently has no effect on the
    # output — confirm the intended behaviour before implementing it.
    if frame_count is not None and frame_count <= 0:
        raise BotExceptionMessage("错误:帧数量应该大于 0")
    if speed_factor == 0:
        raise BotExceptionMessage("错误:速度不能为 0")

    is_rev = speed_factor < 0
    speed_factor = abs(speed_factor)

    np_frames, durations = _read_gif(image)

    # Trim the GIF according to the requested start/end times.
    begin_time = ss or 0
    end_time = sum(durations)
    end_time = min(begin_time + (t or end_time), to or end_time, end_time)

    sel_np_frames, sel_durations = _select_time_range(
        np_frames, durations, begin_time, end_time
    )
    if not sel_np_frames:
        raise BotExceptionMessage("错误:裁取 GIF 帧失败(可能时间设置错误)")

    rframes, rdur_ms = _retime_frames(sel_np_frames, sel_durations, speed_factor)

    if is_rev:
        rframes = rframes[::-1]
        rdur_ms = rdur_ms[::-1]

    if not rframes:
        raise BotExceptionMessage("错误:没有可输出的帧")

    kwargs: dict[str, Any] = {}
    if any(f.shape[2] == 4 for f in rframes):
        # At least one frame has four channels: bring every frame to four
        # channels and pass the transparency-related writer options.
        rframes = [
            f if f.shape[2] == 4 else cv2.cvtColor(f, cv2.COLOR_RGB2RGBA)
            for f in rframes
        ]
        kwargs = {
            "transparency": 0,
            "disposal": 2,
            "mode": "RGBA",
        }

    output_img = BytesIO()
    try:
        iio.imwrite(
            output_img,
            rframes,
            extension=".gif",
            duration=rdur_ms,
            loop=0,
            optimize=True,
            plugin="pillow",
            **kwargs,
        )
    except Exception as e:
        raise BotExceptionMessage(f"错误:写入 GIF 失败: {e}") from e

    output_img.seek(0)
    await cmd_giftool.send(await UniMessage().image(raw=output_img).export())