From 2f22f11d57a376fff3a09a4eea7b5a27e8276de3 Mon Sep 17 00:00:00 2001 From: passthem Date: Sat, 15 Nov 2025 20:16:42 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=20Gif=20=E5=9B=BE=E6=B8=B2?= =?UTF-8?q?=E6=9F=93=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- konabot/common/nb/extract_image.py | 67 +++++++--- konabot/plugins/image_process/__init__.py | 148 +++++++++++----------- konabot/plugins/memepack/__init__.py | 51 ++------ konabot/plugins/ytpgif/__init__.py | 22 +--- poetry.lock | 41 +++++- pyproject.toml | 1 + 6 files changed, 183 insertions(+), 147 deletions(-) diff --git a/konabot/common/nb/extract_image.py b/konabot/common/nb/extract_image.py index 4b4eb50..df54699 100644 --- a/konabot/common/nb/extract_image.py +++ b/konabot/common/nb/extract_image.py @@ -1,4 +1,5 @@ from io import BytesIO +from pathlib import Path from typing import Annotated import httpx @@ -19,15 +20,21 @@ from PIL import UnidentifiedImageError from pydantic import BaseModel from returns.result import Failure, Result, Success -from konabot.common.path import ASSETS_PATH - discordConfig = nonebot.get_plugin_config(DiscordConfig) class ExtractImageConfig(BaseModel): module_extract_image_no_download: bool = False - "要不要算了,不下载了,直接爆炸算了,适用于一些比较奇怪的网络环境,无法从协议端下载文件" + """ + 要不要算了,不下载了,直接爆炸算了, + 适用于一些比较奇怪的网络环境,无法从协议端下载文件 + """ + + module_extract_image_target: str = './assets/img/other/boom.jpg' + """ + 使用哪个图片呢 + """ module_config = nonebot.get_plugin_config(ExtractImageConfig) @@ -37,7 +44,7 @@ async def download_image_bytes(url: str, proxy: str | None = None) -> Result[byt # if "/matcha/cache/" in url: # url = url.replace('127.0.0.1', '10.126.126.101') if module_config.module_extract_image_no_download: - return Success((ASSETS_PATH / "img" / "other" / "boom.jpg").read_bytes()) + return Success(Path(module_config.module_extract_image_target).read_bytes()) logger.debug(f"开始从 {url} 下载图片") async with httpx.AsyncClient(proxy=proxy) as c: try: @@ -70,15 +77,22 @@ def bytes_to_pil(raw_data: bytes | BytesIO) -> Result[PIL.Image.Image, str]: return Failure("图像无法读取,可能是网络存在问题orz") -async def unimsg_img_to_pil(image: Image) -> Result[PIL.Image.Image, str]: +async def unimsg_img_to_bytes(image: Image) -> Result[bytes, str]: if image.url is not None: raw_result = await download_image_bytes(image.url) elif image.raw is not None: - raw_result = Success(image.raw) + if isinstance(image.raw, bytes): + raw_result = Success(image.raw) + else: + raw_result = Success(image.raw.getvalue()) else: return Failure("由于一些内部问题,下载图片失败了orz") - return raw_result.bind(bytes_to_pil) + return raw_result + + +async def unimsg_img_to_pil(image: Image) -> Result[PIL.Image.Image, str]: + return (await unimsg_img_to_bytes(image)).bind(bytes_to_pil) async def extract_image_from_qq_message( @@ -86,7 +100,7 @@ async def extract_image_from_qq_message( evt: OnebotV11MessageEvent, bot: OnebotV11Bot, allow_reply: bool = True, -) -> Result[PIL.Image.Image, str]: +) -> Result[bytes, str]: if allow_reply and (reply := evt.reply) is not None: return await extract_image_from_qq_message( reply.message, @@ -118,18 +132,17 @@ async def extract_image_from_qq_message( url = seg.data.get("url") if url is None: return Failure("无法下载图片,可能有一些网络问题") - data = await download_image_bytes(url) - return data.bind(bytes_to_pil) + return await download_image_bytes(url) return Failure("请在消息中包含图片,或者引用一个含有图片的消息") -async def extract_image_from_message( +async def extract_image_data_from_message( msg: Message, evt: Event, bot: Bot, allow_reply: bool = True, -) -> Result[PIL.Image.Image, str]: +) -> Result[bytes, str]: if ( isinstance(bot, OnebotV11Bot) and isinstance(msg, OnebotV11Message) @@ -145,18 +158,18 @@ async def extract_image_from_message( if "image/" not in a.content_type: continue url = a.proxy_url - return (await download_image_bytes(url, discordConfig.discord_proxy)).bind(bytes_to_pil) + return await download_image_bytes(url, discordConfig.discord_proxy) for seg in UniMessage.of(msg, bot): logger.info(seg) if isinstance(seg, Image): - return await unimsg_img_to_pil(seg) + return await unimsg_img_to_bytes(seg) elif isinstance(seg, Reply) and allow_reply: msg2 = seg.msg logger.debug(f"深入搜索引用的消息:{msg2}") if msg2 is None or isinstance(msg2, str): continue - return await extract_image_from_message(msg2, evt, bot, False) + return await extract_image_data_from_message(msg2, evt, bot, False) elif isinstance(seg, RefNode) and allow_reply: if isinstance(bot, DiscordBot): return Failure("暂时不支持在 Discord 中通过引用的方式获取图片") @@ -165,12 +178,12 @@ async def extract_image_from_message( return Failure("请在消息中包含图片,或者引用一个含有图片的消息") -async def _ext_img( +async def _ext_img_data( evt: Event, bot: Bot, matcher: Matcher, -) -> PIL.Image.Image | None: - match await extract_image_from_message(evt.get_message(), evt, bot): +) -> bytes | None: + match await extract_image_data_from_message(evt.get_message(), evt, bot): case Success(img): return img case Failure(err): @@ -180,4 +193,20 @@ async def _ext_img( assert False -PIL_Image = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)] +async def _ext_img( + evt: Event, + bot: Bot, + matcher: Matcher, +) -> PIL.Image.Image | None: + r = await _ext_img_data(evt, bot, matcher) + if r: + match bytes_to_pil(r): + case Success(img): + return img + case Failure(msg): + await matcher.send(await UniMessage.text(msg).export()) + return None + + +DepImageBytes = Annotated[bytes, nonebot.params.Depends(_ext_img_data)] +DepPILImage = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)] diff --git a/konabot/plugins/image_process/__init__.py b/konabot/plugins/image_process/__init__.py index 04d53d9..e57dbe6 100644 --- a/konabot/plugins/image_process/__init__.py +++ b/konabot/plugins/image_process/__init__.py @@ -1,24 +1,32 @@ import re from io import BytesIO +from typing import Any +import PIL import PIL.Image +import cv2 +import imageio.v3 as iio from nonebot import on_message from nonebot.adapters import Bot from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna +import numpy from konabot.common.nb.exc import BotExceptionMessage -from konabot.common.nb.extract_image import PIL_Image +from konabot.common.nb.extract_image import DepImageBytes, DepPILImage from konabot.common.nb.match_keyword import match_keyword from konabot.common.nb.reply_image import reply_image +# 保持不变 cmd_black_white = on_message(rule=match_keyword("黑白")) @cmd_black_white.handle() -async def _(img: PIL_Image, bot: Bot): +async def _(img: DepPILImage, bot: Bot): + # 保持不变 await reply_image(cmd_black_white, bot, img.convert("LA")) +# 保持不变 def parse_timestamp(tx: str) -> float | None: res = 0.0 for component in tx.split(":"): @@ -29,6 +37,7 @@ def parse_timestamp(tx: str) -> float | None: return res +# 保持不变 cmd_giftool = on_alconna( Alconna( "giftool", @@ -44,7 +53,7 @@ cmd_giftool = on_alconna( @cmd_giftool.handle() async def _( - image: PIL_Image, + image: DepImageBytes, start_point: str | None = None, frame_count: int | None = None, length: str | None = None, @@ -79,28 +88,24 @@ async def _( is_rev = speed_factor < 0 speed_factor = abs(speed_factor) - if not getattr(image, "is_animated", False): - raise BotExceptionMessage("错误:输入的不是动图(GIF)") - - ## - # 从这里开始,采样整个 GIF 图 - frames: list[PIL.Image.Image] = [] - durations: list[float] = [] try: - for i in range(getattr(image, "n_frames")): - image.seek(i) - frames.append(image.copy()) - duration = image.info.get("duration", 100) / 1000 - durations.append(duration) - except EOFError: - pass - if not frames: + reader = iio.imread(BytesIO(image), extension=".gif", index=None) + np_frames = list(reader) + + _pil = PIL.Image.open(BytesIO(image)) + durations: list[float] = [] + while True: + try: + duration = _pil.info.get('duration', 20) + durations.append(max(duration, 20) / 1000) + _pil.seek(_pil.tell() + 1) + except EOFError: + break + except Exception: raise BotExceptionMessage("错误:读取 GIF 帧失败") - # 采样结束 ## # 根据开始、结束时间或者帧数量来裁取 GIF 图 - begin_time = ss or 0 end_time = sum(durations) end_time = min(begin_time + (t or end_time), to or end_time, end_time) @@ -108,94 +113,95 @@ async def _( accumulated = 0.0 status = 0 - sel_frames: list[PIL.Image.Image] = [] + sel_np_frames: list[numpy.ndarray[Any, Any]] = [] sel_durations: list[float] = [] - for i in range(len(frames)): - frame = frames[i] + for i in range(len(np_frames)): + frame = np_frames[i] duration = durations[i] if status == 0: if accumulated + duration > begin_time: status = 1 - sel_frames.append(frame) - sel_durations.append(accumulated + duration - begin_time) + sel_np_frames.append(frame) + sel_durations.append(accumulated + duration - begin_time) + elif accumulated + duration == begin_time: + status = 1 elif status == 1: - if accumulated + duration > end_time: - sel_frames.append(frame) - sel_durations.append(end_time - accumulated) + if accumulated + duration >= end_time: + included_duration = end_time - accumulated + if included_duration > 0: + sel_np_frames.append(frame) + sel_durations.append(included_duration) break - sel_frames.append(frame) + sel_np_frames.append(frame) sel_durations.append(duration) accumulated += duration - ## - # 加速! - sel_durations = [dur / speed_factor * 1000 for dur in durations] + if not sel_np_frames: + raise BotExceptionMessage("错误:裁取 GIF 帧失败(可能时间设置错误)") - rframes = [] - rdur = [] + rdur_ms_unprocessed = [dur / speed_factor * 1000 for dur in sel_durations] + rframes: list[numpy.ndarray] = [] + rdur_ms: list[int] = [] acc_mod_20 = 0 - for i in range(len(sel_frames)): - fr = sel_frames[i] - du = round(sel_durations[i]) + for i in range(len(sel_np_frames)): + fr = sel_np_frames[i] + du = rdur_ms_unprocessed[i] if du >= 20: rframes.append(fr) - rdur.append(int(du)) + rdur_ms.append(int(round(du))) acc_mod_20 = 0 else: if acc_mod_20 == 0: rframes.append(fr) - rdur.append(20) + rdur_ms.append(20) acc_mod_20 += du else: acc_mod_20 += du if acc_mod_20 >= 20: acc_mod_20 = 0 - if len(rframes) == 1 and len(sel_frames) > 1: - rframes.append(sel_frames[max(2, len(sel_frames) // 2)]) - rdur.append(20) - - ## - # 收尾:看看透明度这块 - transparency_flag = False - for f in rframes: - if f.mode == "RGBA": - if any(pix < 255 for pix in f.getchannel("A").getdata()): - transparency_flag = True - break - elif f.mode == "P" and "transparency" in f.info: - transparency_flag = True - break - - tf = {} - if transparency_flag: - tf["transparency"] = 0 + if len(rframes) == 1 and len(sel_np_frames) > 1: + middle_index = max(2, len(sel_np_frames) // 2) + rframes.append(sel_np_frames[middle_index]) + rdur_ms.append(20) if is_rev: rframes = rframes[::-1] - rdur = rdur[::-1] + rdur_ms = rdur_ms[::-1] output_img = BytesIO() + if rframes: - rframes[0].save( - output_img, - format="GIF", - save_all=True, - append_images=rframes[1:], - duration=rdur, - loop=0, - optimize=False, - disposal=2, - **tf, - ) + do_transparent = any((f.shape[2] == 4 for f in rframes)) + if do_transparent: + rframes = [( + f + if f.shape[2] == 4 + else cv2.cvtColor(f, cv2.COLOR_RGB2RGBA) + ) for f in rframes] + kwargs = { "transparency": 0, "disposal": 2, "mode": "RGBA" } + else: + kwargs = {} + try: + iio.imwrite( + output_img, + rframes, + extension=".gif", + duration=rdur_ms, + loop=0, + optimize=True, + plugin="pillow", + **kwargs, + ) + except Exception as e: + raise BotExceptionMessage(f"错误:写入 GIF 失败: {e}") else: raise BotExceptionMessage("错误:没有可输出的帧") output_img.seek(0) - await cmd_giftool.send(await UniMessage().image(raw=output_img).export()) diff --git a/konabot/plugins/memepack/__init__.py b/konabot/plugins/memepack/__init__.py index c6f2905..fa29093 100644 --- a/konabot/plugins/memepack/__init__.py +++ b/konabot/plugins/memepack/__init__.py @@ -17,7 +17,7 @@ from nonebot_plugin_alconna import ( ) from playwright.async_api import ConsoleMessage, Page -from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message +from konabot.common.nb.extract_image import DepPILImage from konabot.common.web_render import konaweb from konabot.common.web_render.core import WebRenderer from konabot.common.web_render.host_images import host_tempdir @@ -36,9 +36,6 @@ from konabot.plugins.memepack.drawing.saying import ( ) from konabot.plugins.memepack.drawing.watermark import draw_doubao_watermark -from nonebot.adapters import Bot, Event - -from returns.result import Success, Failure geimao = on_alconna( Alconna( @@ -194,7 +191,7 @@ cao_display_cmd = on_message() @cao_display_cmd.handle() -async def _(msg: UniMsg, evt: Event, bot: Bot): +async def _(msg: UniMsg, img: DepPILImage): flag = False for text in cast(Iterable[Text], msg.get(Text)): if text.text.strip() == "小槽展示": @@ -205,20 +202,10 @@ async def _(msg: UniMsg, evt: Event, bot: Bot): return if not flag: return - match await extract_image_from_message(evt.get_message(), evt, bot): - case Success(img): - img_handled = await draw_cao_display(img) - img_bytes = BytesIO() - img_handled.save(img_bytes, format="PNG") - await cao_display_cmd.send(await UniMessage().image(raw=img_bytes).export()) - case Failure(err): - await cao_display_cmd.send( - await UniMessage() - .at(user_id=evt.get_user_id()) - .text(" ") - .text(err) - .export() - ) + img_handled = await draw_cao_display(img) + img_bytes = BytesIO() + img_handled.save(img_bytes, format="PNG") + await cao_display_cmd.send(await UniMessage().image(raw=img_bytes).export()) snaur_display_cmd = on_alconna( @@ -235,7 +222,7 @@ snaur_display_cmd = on_alconna( @snaur_display_cmd.handle() async def _( - img: PIL_Image, + img: DepPILImage, whiteness: float = 0.0, black_level: float = 0.2, opacity: float = 0.8, @@ -254,7 +241,7 @@ async def _( anan_display_cmd = on_message() @anan_display_cmd.handle() -async def _(msg: UniMsg, evt: Event, bot: Bot): +async def _(msg: UniMsg, img: DepPILImage): flag = False for text in cast(Iterable[Text], msg.get(Text)): stripped = text.text.strip() @@ -267,20 +254,10 @@ async def _(msg: UniMsg, evt: Event, bot: Bot): if not flag: return - match await extract_image_from_message(evt.get_message(), evt, bot): - case Success(img): - img_handled = await draw_anan_display(img) - img_bytes = BytesIO() - img_handled.save(img_bytes, format="PNG") - await anan_display_cmd.send(await UniMessage().image(raw=img_bytes).export()) - case Failure(err): - await anan_display_cmd.send( - await UniMessage() - .at(user_id=evt.get_user_id()) - .text(" ") - .text(err) - .export() - ) + img_handled = await draw_anan_display(img) + img_bytes = BytesIO() + img_handled.save(img_bytes, format="PNG") + await anan_display_cmd.send(await UniMessage().image(raw=img_bytes).export()) kiosay = on_alconna( @@ -316,7 +293,7 @@ quote_cmd = on_alconna(Alconna( ), aliases={"quote"}) @quote_cmd.handle() -async def _(quote: str, author: str, img: PIL_Image): +async def _(quote: str, author: str, img: DepPILImage): async with host_tempdir() as tempdir: img_path = tempdir.path / "image.png" img_url = tempdir.url_of(img_path) @@ -351,7 +328,7 @@ doubao_cmd = on_alconna(Alconna( @doubao_cmd.handle() -async def _(img: PIL_Image): +async def _(img: DepPILImage): result = await draw_doubao_watermark(img) result_bytes = BytesIO() result.save(result_bytes, format="PNG") diff --git a/konabot/plugins/ytpgif/__init__.py b/konabot/plugins/ytpgif/__init__.py index 19b2bca..3c34e74 100644 --- a/konabot/plugins/ytpgif/__init__.py +++ b/konabot/plugins/ytpgif/__init__.py @@ -1,14 +1,11 @@ from io import BytesIO from loguru import logger -from nonebot.adapters import Bot as BaseBot -from nonebot.adapters import Event as BaseEvent from nonebot.plugin import PluginMetadata from nonebot_plugin_alconna import Alconna, Args, Field, UniMessage, on_alconna from PIL import Image -from returns.result import Failure, Success -from konabot.common.nb.extract_image import extract_image_from_message +from konabot.common.nb.extract_image import DepPILImage __plugin_meta__ = PluginMetadata( name="ytpgif", @@ -63,7 +60,7 @@ def resize_frame(frame: Image.Image) -> Image.Image: @ytpgif_cmd.handle() -async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0): +async def handle_ytpgif(src_img: DepPILImage, speed: float = 1.0): # === 校验 speed 范围 === if not (MIN_SPEED <= speed <= MAX_SPEED): await ytpgif_cmd.send( @@ -71,19 +68,6 @@ async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0): ) return - match await extract_image_from_message(event.get_message(), event, bot): - case Success(img): - src_img = img - - case Failure(msg): - await ytpgif_cmd.send( - await UniMessage.text(msg).export() - ) - return - - case _: - return - try: try: n_frames = getattr(src_img, "n_frames", 1) @@ -217,4 +201,4 @@ async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0): print(f"[YTPGIF] 处理失败: {e}") await ytpgif_cmd.send( await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export() - ) \ No newline at end of file + ) diff --git a/poetry.lock b/poetry.lock index 2ac05ad..640bd0d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1469,6 +1469,45 @@ type = "legacy" url = "https://pypi.tuna.tsinghua.edu.cn/simple" reference = "mirrors" +[[package]] +name = "imageio" +version = "2.37.2" +description = "Read and write images and video across all major formats. Supports scientific and volumetric data." +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "imageio-2.37.2-py3-none-any.whl", hash = "sha256:ad9adfb20335d718c03de457358ed69f141021a333c40a53e57273d8a5bd0b9b"}, + {file = "imageio-2.37.2.tar.gz", hash = "sha256:0212ef2727ac9caa5ca4b2c75ae89454312f440a756fcfc8ef1993e718f50f8a"}, +] + +[package.dependencies] +numpy = "*" +pillow = ">=8.3.2" + +[package.extras] +all-plugins = ["astropy", "av", "fsspec[http]", "imageio-ffmpeg", "numpy (>2)", "pillow-heif", "psutil", "rawpy", "tifffile"] +all-plugins-pypy = ["fsspec[http]", "imageio-ffmpeg", "pillow-heif", "psutil", "tifffile"] +dev = ["black", "flake8", "fsspec[github]", "pytest", "pytest-cov"] +docs = ["numpydoc", "pydata-sphinx-theme", "sphinx (<6)"] +ffmpeg = ["imageio-ffmpeg", "psutil"] +fits = ["astropy"] +freeimage = ["fsspec[http]"] +full = ["astropy", "av", "black", "flake8", "fsspec[github,http]", "imageio-ffmpeg", "numpy (>2)", "numpydoc", "pillow-heif", "psutil", "pydata-sphinx-theme", "pytest", "pytest-cov", "rawpy", "sphinx (<6)", "tifffile"] +gdal = ["gdal"] +itk = ["itk"] +linting = ["black", "flake8"] +pillow-heif = ["pillow-heif"] +pyav = ["av"] +rawpy = ["numpy (>2)", "rawpy"] +test = ["fsspec[github]", "pytest", "pytest-cov"] +tifffile = ["tifffile"] + +[package.source] +type = "legacy" +url = "https://pypi.tuna.tsinghua.edu.cn/simple" +reference = "mirrors" + [[package]] name = "imagetext-py" version = "2.2.0" @@ -4489,4 +4528,4 @@ reference = "mirrors" [metadata] lock-version = "2.1" python-versions = ">=3.12,<4.0" -content-hash = "af9fc535dd8c4e33c2cac481839ba07bcb8014b9a9cbd6bd1b6f5942640ecefe" +content-hash = "478bd59d60d3b73397241c6ed552434486bd26d56cc3805ef34d1cfa1be7006e" diff --git a/pyproject.toml b/pyproject.toml index 9d76256..0c11e66 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ "opencc (>=1.1.9,<2.0.0)", "playwright (>=1.55.0,<2.0.0)", "openai (>=2.7.1,<3.0.0)", + "imageio (>=2.37.2,<3.0.0)", ] [tool.poetry]