From 54fae889144a837507c5d6792e5e161b6fdfd35a Mon Sep 17 00:00:00 2001 From: MixBadGun <1059129006@qq.com> Date: Tue, 9 Dec 2025 00:02:26 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BE=85=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- konabot/common/nb/extract_image.py | 3 + konabot/plugins/fx_process/__init__.py | 94 +++++++++--- konabot/plugins/fx_process/fx_handle.py | 24 ++++ konabot/plugins/fx_process/fx_manager.py | 5 +- konabot/plugins/fx_process/image_storage.py | 150 ++++++++++++++++++++ konabot/plugins/fx_process/types.py | 20 +++ 6 files changed, 274 insertions(+), 22 deletions(-) create mode 100644 konabot/plugins/fx_process/image_storage.py create mode 100644 konabot/plugins/fx_process/types.py diff --git a/konabot/common/nb/extract_image.py b/konabot/common/nb/extract_image.py index df54699..14d0122 100644 --- a/konabot/common/nb/extract_image.py +++ b/konabot/common/nb/extract_image.py @@ -208,5 +208,8 @@ async def _ext_img( return None + DepImageBytes = Annotated[bytes, nonebot.params.Depends(_ext_img_data)] DepPILImage = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)] + +DepImageBytesOrNone = Annotated[bytes | None, nonebot.params.Depends(_ext_img_data)] diff --git a/konabot/plugins/fx_process/__init__.py b/konabot/plugins/fx_process/__init__.py index 2f80113..19dd613 100644 --- a/konabot/plugins/fx_process/__init__.py +++ b/konabot/plugins/fx_process/__init__.py @@ -4,9 +4,11 @@ from io import BytesIO from inspect import signature -from konabot.common.nb.extract_image import DepImageBytes, DepPILImage +from konabot.common.longtask import DepLongTaskTarget +from konabot.common.nb.extract_image import DepImageBytesOrNone from nonebot.adapters import Event as BaseEvent from nonebot import on_message, logger +from returns.result import Failure, Result, Success from nonebot_plugin_alconna import ( UniMessage, @@ -17,13 +19,9 @@ from konabot.plugins.fx_process.fx_manager import ImageFilterManager from PIL import Image, ImageSequence -@dataclass -class FilterItem: - name: str - filter: callable - args: list +from konabot.plugins.fx_process.types import FilterItem, ImageRequireSignal, ImagesListRequireSignal, SenderInfo -def prase_input_args(input_str: str) -> list[FilterItem]: +def prase_input_args(input_str: str, sender_info: SenderInfo = None) -> list[FilterItem]: # 按分号或换行符分割参数 args = [] for part in input_str.replace('\n', ';').split(';'): @@ -42,7 +40,7 @@ def prase_input_args(input_str: str) -> list[FilterItem]: # 从 args 提取参数,并转换为适当类型 func_args = [] for i in range(0, min(len(input_filter_args), max_params)): - # 尝试将参数转换为函数签名中对应的类型 + # 尝试将参数转换为函数签名中对应的类型,并检测是不是 Image 类型,如果有则表示多个图像输入 param = list(sig.parameters.values())[i + 1] param_type = param.annotation arg_value = input_filter_args[i] @@ -51,6 +49,14 @@ def prase_input_args(input_str: str) -> list[FilterItem]: converted_value = float(arg_value) elif param_type is int: converted_value = int(arg_value) + elif param_type is bool: + converted_value = arg_value.lower() in ['true', '1', 'yes', '是', '开'] + elif param_type is Image.Image: + converted_value = ImageRequireSignal() + elif param_type is SenderInfo: + converted_value = sender_info + elif param_type is list[Image.Image]: + converted_value = ImagesListRequireSignal() else: converted_value = arg_value except Exception: @@ -59,14 +65,47 @@ def prase_input_args(input_str: str) -> list[FilterItem]: args.append(FilterItem(name=filter_name,filter=filter_func, args=func_args)) return args -def apply_filters_to_image(img: Image, filters: list[FilterItem]) -> Image: +def handle_filters_to_image(images: list[Image.Image], filters: list[FilterItem]) -> Image.Image: for filter_item in filters: filter_func = filter_item.filter func_args = filter_item.args - img = filter_func(img, *func_args) - return img + # 检测参数中是否有 ImageRequireSignal,如果有则传入对应数量的图像列表 + if any(isinstance(arg, ImageRequireSignal) for arg in func_args): + # 替换 ImageRequireSignal 为 images 对应索引的图像 + actual_args = [] + img_signal_count = 1 # 从 images[1] 开始取图像 + for arg in func_args: + if isinstance(arg, ImageRequireSignal): + actual_args.append(images[img_signal_count]) + img_signal_count += 1 + else: + actual_args.append(arg) + func_args = actual_args + # 检测参数中是否有 ImagesListRequireSignal,如果有则传入整个图像列表 + if any(isinstance(arg, ImagesListRequireSignal) for arg in func_args): + actual_args = [] + for arg in func_args: + if isinstance(arg, ImagesListRequireSignal): + actual_args.append(images) + else: + actual_args.append(arg) + func_args = actual_args + + images[0] = filter_func(images[0], *func_args) + return images[0] + +async def apply_filters_to_images(images: list[Image.Image], filters: list[FilterItem]) -> BytesIO: + # 如果第一项是“加载图像”参数,那么就加载图像 + if filters and filters[0].name == "加载图像": + load_filter = filters.pop(0) + # 加载全部路径 + for path in load_filter.args: + img = Image.open(path) + images.append(img) + + if len(images) <= 0: + raise ValueError("没有提供任何图像进行处理") -async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem]) -> BytesIO: # 检测是否需要将静态图视作动图处理 frozen_to_move = any( filter_item.name == "动图" @@ -83,7 +122,7 @@ async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem]) static_fps = 10 break # 如果 image 是动图,则逐帧处理 - img = Image.open(BytesIO(image_bytes)) + img = images[0] logger.debug("开始图像处理") output = BytesIO() if getattr(img, "is_animated", False) or frozen_to_move: @@ -101,13 +140,12 @@ async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem]) all_frames.append(img.copy()) img.info['duration'] = int(1000 / static_fps) - async def process_single_frame(frame: Image.Image, frame_idx: int) -> Image.Image: + async def process_single_frame(frame: list[Image.Image], frame_idx: int) -> Image.Image: """处理单帧的异步函数""" logger.debug(f"开始处理帧 {frame_idx}") - result = await asynkio.to_thread(apply_filters_to_image, frame, filters) + result = await asynkio.to_thread(handle_filters_to_image, frame, images, filters) logger.debug(f"完成处理帧 {frame_idx}") - return result - + return result[0] # 并发处理所有帧 tasks = [] for i, frame in enumerate(all_frames): @@ -135,7 +173,7 @@ async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem]) ) logger.debug("Animated image saved") else: - img = apply_filters_to_image(img, filters) + img = handle_filters_to_image(images=images, filters=filters) img.save(output, format="PNG") logger.debug("Image processing completed") output.seek(0) @@ -150,14 +188,28 @@ def is_fx_mentioned(evt: BaseEvent, msg: UniMsg) -> bool: fx_on = on_message(rule=is_fx_mentioned) @fx_on.handle() -async def _(msg: UniMsg, event: BaseEvent, img: DepImageBytes): +async def _(msg: UniMsg, event: BaseEvent, target: DepLongTaskTarget, image_data: DepImageBytesOrNone = None): + preload_imgs = [] + # 提取图像 + try: + if image_data is not None: + preload_imgs.append(Image.open(BytesIO(image_data))) + logger.debug("Image extracted for FX processing.") + except Exception: + logger.info("No image found in message for FX processing.") args = msg.extract_plain_text().split() if len(args) < 2: return - filters = prase_input_args(msg.extract_plain_text()[2:]) + + sender_info = SenderInfo( + group_id=target.channel_id, + qq_id=target.target_id + ) + + filters = prase_input_args(msg.extract_plain_text()[2:], sender_info=sender_info) if not filters: return - output = await apply_filters_to_bytes(img, filters) + output = await apply_filters_to_images(preload_imgs, filters) logger.debug("FX processing completed, sending result.") await fx_on.send(await UniMessage().image(raw=output).export()) \ No newline at end of file diff --git a/konabot/plugins/fx_process/fx_handle.py b/konabot/plugins/fx_process/fx_handle.py index d9a40b9..dd421ba 100644 --- a/konabot/plugins/fx_process/fx_handle.py +++ b/konabot/plugins/fx_process/fx_handle.py @@ -11,6 +11,9 @@ import math from konabot.plugins.fx_process.gradient import GradientGenerator import numpy as np +from konabot.plugins.fx_process.image_storage import ImageStorager +from konabot.plugins.fx_process.types import SenderInfo + class ImageFilterImplement: @staticmethod def apply_blur(image: Image.Image, radius: float = 10) -> Image.Image: @@ -1085,3 +1088,24 @@ class ImageFilterEmpty: @staticmethod def empty_filter_param(image, param = 10): return image + +class ImageFilterStorage: + # 用于存储图像 + @staticmethod + def store_image(image: Image.Image, name: str, sender_info: SenderInfo) -> Image.Image: + ImageStorager.save_image(image, name, sender_info.group_id, sender_info.qq_id) + return image + + # 用于暂存图像 + @staticmethod + def temp_store_image(image: Image.Image, images: list[Image.Image]) -> Image.Image: + images.append(image) + return image + + # 用于读取图像 + @staticmethod + def load_image(image: Image.Image, name: str, images: list[Image.Image], sender_info: SenderInfo) -> Image.Image: + loaded_image = ImageStorager.load_image(name, sender_info.group_id, sender_info.qq_id) + if loaded_image is not None: + images.append(loaded_image) + return image \ No newline at end of file diff --git a/konabot/plugins/fx_process/fx_manager.py b/konabot/plugins/fx_process/fx_manager.py index b315563..210e879 100644 --- a/konabot/plugins/fx_process/fx_manager.py +++ b/konabot/plugins/fx_process/fx_manager.py @@ -1,5 +1,5 @@ from typing import Optional -from konabot.plugins.fx_process.fx_handle import ImageFilterEmpty, ImageFilterImplement +from konabot.plugins.fx_process.fx_handle import ImageFilterEmpty, ImageFilterImplement, ImageFilterStorage class ImageFilterManager: filter_map = { @@ -47,6 +47,9 @@ class ImageFilterManager: "晃动": ImageFilterImplement.apply_random_wiggle, "动图": ImageFilterEmpty.empty_filter_param, "像素抖动": ImageFilterImplement.apply_pixel_jitter, + "存入图像": ImageFilterStorage.store_image, + "读取图像": ImageFilterStorage.load_image, + "暂存图像": ImageFilterStorage.temp_store_image, } @classmethod diff --git a/konabot/plugins/fx_process/image_storage.py b/konabot/plugins/fx_process/image_storage.py new file mode 100644 index 0000000..09ccb2b --- /dev/null +++ b/konabot/plugins/fx_process/image_storage.py @@ -0,0 +1,150 @@ +import asyncio +from dataclasses import dataclass +from hashlib import md5 +import time + +from nonebot import logger +from nonebot_plugin_apscheduler import driver +from konabot.common.path import DATA_PATH +import os +from PIL import Image + +IMAGE_PATH = DATA_PATH / "temp" / "images" + +@dataclass +class ImageResource: + name: str + expire: int + +@dataclass +class StorageImage: + name: str + resources: dict[str, + dict[str,ImageResource]] # {群号: {QQ号: ImageResource}} + +class ImageStorager: + images_pool: dict[str,StorageImage] = {} + + max_storage: int = 10 * 1024 * 1024 # 最大存储10MB + max_image_count: int = 200 # 最大存储图片数量 + + @staticmethod + def init(): + if not IMAGE_PATH.exists(): + IMAGE_PATH.mkdir(parents=True, exist_ok=True) + + @staticmethod + def delete_path_image(name: str): + resource_path = IMAGE_PATH / name + if resource_path.exists(): + os.remove(resource_path) + + @classmethod + async def clear_expire_image(cls): + # 清理过期的图片资源,将未被删除的放入列表中,如果超过最大数量则删除最早过期的 + remaining_images = [] + current_time = time.time() + for name, storage_image in list(ImageStorager.images_pool.items()): + for group_id, resources in list(storage_image.resources.items()): + for qq_id, resource in list(resources.items()): + if resource.expire < current_time: + del storage_image.resources[group_id][qq_id] + cls.delete_path_image(name) + else: + remaining_images.append((name, group_id, qq_id, resource.expire)) + if not storage_image.resources: + del ImageStorager.images_pool[name] + # 如果剩余图片超过最大数量,按过期时间排序并删除最早过期的 + if len(remaining_images) > ImageStorager.max_image_count: + remaining_images.sort(key=lambda x: x[3]) # 按过期时间排序 + to_delete = len(remaining_images) - ImageStorager.max_image_count + for i in range(to_delete): + name, group_id, qq_id, _ = remaining_images[i] + resource = ImageStorager.images_pool[name].resources[group_id][qq_id] + del ImageStorager.images_pool[name].resources[group_id][qq_id] + cls.delete_path_image(name) + + @classmethod + def _add_to_pool(cls, image: bytes, name: str, group_id: str, qq_id: str, expire: int = 36000): + expire_time = time.time() + expire + if name not in cls.images_pool: + cls.images_pool[name] = StorageImage(name=name,resources={}) + if group_id not in cls.images_pool[name].resources: + cls.images_pool[name].resources[group_id] = {} + cls.images_pool[name].resources[group_id][qq_id] = ImageResource(name=name, expire=expire_time) + + @classmethod + def save_image(cls, image: bytes, name: str, group_id: str, qq_id: str) -> None: + """ + 以哈希值命名保存图片,并返回图片资源信息 + """ + # 检测图像大小,不得超过 10 MB + if len(image) > cls.max_storage: + raise ValueError("图片大小超过 10 MB 限制") + hash_name = md5(image).hexdigest() + ext = os.path.splitext(name)[1] + file_name = f"{hash_name}{ext}" + full_path = IMAGE_PATH / file_name + with open(full_path, "wb") as f: + f.write(image) + # 将文件写入 images_pool + cls._add_to_pool(image, file_name, group_id, qq_id) + + @classmethod + def load_image(cls, name: str, group_id: str, qq_id: str) -> Image: + if name not in cls.images_pool: + return None + if group_id not in cls.images_pool[name].resources: + return None + # 寻找对应 QQ 号 的资源,如果没有就返回相同群下的第一个资源 + if qq_id not in cls.images_pool[name].resources[group_id]: + first_qq_id = next(iter(cls.images_pool[name].resources[group_id])) + qq_id = first_qq_id + resource = cls.images_pool[name].resources[group_id][qq_id] + resource_path = IMAGE_PATH / resource.name + return Image.open(resource_path) + +class ImageStoragerManager: + def __init__(self, interval: int = 300): # 默认 5 分钟执行一次 + self.interval = interval + self._clear_task = None + self._running = False + + async def start_auto_clear(self): + """启动自动任务""" + self._running = True + self._clear_task = asyncio.create_task(self._auto_clear_loop()) + + logger.info(f"自动清理任务已启动,间隔: {self.interval}秒") + + async def stop_auto_clear(self): + """停止自动清理任务""" + if self._clear_task: + self._running = False + self._clear_task.cancel() + try: + await self._clear_task + except asyncio.CancelledError: + pass + logger.info("自动清理任务已停止") + else: + logger.warning("没有正在运行的自动清理任务") + + async def _auto_clear_loop(self): + """自动清理循环""" + while self._running: + try: + await asyncio.sleep(self.interval) + await ImageStorager.clear_expire_image() + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"定时清理失败: {e}") + +image_manager = ImageStoragerManager(interval=300) # 每5分钟清理一次 + +@driver.on_startup +async def init_image_storage(): + ImageStorager.init() + # 启用定时任务清理过期图片 + await image_manager.start_auto_clear() \ No newline at end of file diff --git a/konabot/plugins/fx_process/types.py b/konabot/plugins/fx_process/types.py new file mode 100644 index 0000000..303502b --- /dev/null +++ b/konabot/plugins/fx_process/types.py @@ -0,0 +1,20 @@ +from dataclasses import dataclass + + +@dataclass +class FilterItem: + name: str + filter: callable + args: list + +class ImageRequireSignal: + pass + +@dataclass +class ImagesListRequireSignal: + pass + +@dataclass +class SenderInfo: + group_id: str + qq_id: str \ No newline at end of file