diff --git a/konabot/common/nb/extract_image.py b/konabot/common/nb/extract_image.py index 14d0122..a278795 100644 --- a/konabot/common/nb/extract_image.py +++ b/konabot/common/nb/extract_image.py @@ -207,9 +207,21 @@ async def _ext_img( await matcher.send(await UniMessage.text(msg).export()) return None - +async def _try_ext_img( + evt: Event, + bot: Bot, + matcher: Matcher, +) -> bytes | None: + match await extract_image_data_from_message(evt.get_message(), evt, bot): + case Success(img): + return img + case Failure(err): + # raise BotExceptionMessage(err) + # await matcher.send(await UniMessage().text(err).export()) + return None + assert False DepImageBytes = Annotated[bytes, nonebot.params.Depends(_ext_img_data)] DepPILImage = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)] -DepImageBytesOrNone = Annotated[bytes | None, nonebot.params.Depends(_ext_img_data)] +DepImageBytesOrNone = Annotated[bytes | None, nonebot.params.Depends(_try_ext_img)] diff --git a/konabot/plugins/fx_process/__init__.py b/konabot/plugins/fx_process/__init__.py index 19dd613..a2b76b3 100644 --- a/konabot/plugins/fx_process/__init__.py +++ b/konabot/plugins/fx_process/__init__.py @@ -1,25 +1,54 @@ import asyncio as asynkio -from dataclasses import dataclass from io import BytesIO from inspect import signature +import random from konabot.common.longtask import DepLongTaskTarget +from konabot.common.nb.exc import BotExceptionMessage from konabot.common.nb.extract_image import DepImageBytesOrNone from nonebot.adapters import Event as BaseEvent from nonebot import on_message, logger -from returns.result import Failure, Result, Success from nonebot_plugin_alconna import ( UniMessage, UniMsg ) +from konabot.plugins.fx_process.fx_handle import ImageFilterStorage from konabot.plugins.fx_process.fx_manager import ImageFilterManager from PIL import Image, ImageSequence -from konabot.plugins.fx_process.types import FilterItem, ImageRequireSignal, ImagesListRequireSignal, SenderInfo +from konabot.plugins.fx_process.types import FilterItem, ImageRequireSignal, ImagesListRequireSignal, SenderInfo, StoredInfo + +def try_convert_type(param_type, input_param, sender_info: SenderInfo = None) -> tuple[bool, any]: + converted_value = None + try: + if param_type is float: + converted_value = float(input_param) + elif param_type is int: + converted_value = int(input_param) + elif param_type is bool: + converted_value = input_param.lower() in ['true', '1', 'yes', '是', '开'] + elif param_type is Image.Image: + converted_value = ImageRequireSignal() + return False, converted_value + elif param_type is SenderInfo: + converted_value = sender_info + return False, converted_value + elif param_type == list[Image.Image]: + converted_value = ImagesListRequireSignal() + return False, converted_value + elif param_type is str: + if input_param is None: + return False, None + converted_value = str(input_param) + else: + return False, None + except Exception: + return False, None + return True, converted_value def prase_input_args(input_str: str, sender_info: SenderInfo = None) -> list[FilterItem]: # 按分号或换行符分割参数 @@ -39,34 +68,24 @@ def prase_input_args(input_str: str, sender_info: SenderInfo = None) -> list[Fil max_params = len(sig.parameters) - 1 # 减去第一个参数 image # 从 args 提取参数,并转换为适当类型 func_args = [] - for i in range(0, min(len(input_filter_args), max_params)): - # 尝试将参数转换为函数签名中对应的类型,并检测是不是 Image 类型,如果有则表示多个图像输入 + for i in range(0, max_params): + # 尝试将参数转换为函数签名中对应的类型 param = list(sig.parameters.values())[i + 1] param_type = param.annotation - arg_value = input_filter_args[i] - try: - if param_type is float: - converted_value = float(arg_value) - elif param_type is int: - converted_value = int(arg_value) - elif param_type is bool: - converted_value = arg_value.lower() in ['true', '1', 'yes', '是', '开'] - elif param_type is Image.Image: - converted_value = ImageRequireSignal() - elif param_type is SenderInfo: - converted_value = sender_info - elif param_type is list[Image.Image]: - converted_value = ImagesListRequireSignal() - else: - converted_value = arg_value - except Exception: - converted_value = arg_value - func_args.append(converted_value) + # 根据函数所需要的参数,从输入参数中提取,如果不匹配就使用默认值,将当前参数递交给下一个循环 + input_param = input_filter_args[0] if len(input_filter_args) > 0 else None + state, converted_param = try_convert_type(param_type, input_param, sender_info) + if state: + input_filter_args.pop(0) + if converted_param is None and param.default != param.empty: + converted_param = param.default + func_args.append(converted_param) args.append(FilterItem(name=filter_name,filter=filter_func, args=func_args)) return args def handle_filters_to_image(images: list[Image.Image], filters: list[FilterItem]) -> Image.Image: for filter_item in filters: + logger.debug(f"{filter_item}") filter_func = filter_item.filter func_args = filter_item.args # 检测参数中是否有 ImageRequireSignal,如果有则传入对应数量的图像列表 @@ -90,21 +109,53 @@ def handle_filters_to_image(images: list[Image.Image], filters: list[FilterItem] else: actual_args.append(arg) func_args = actual_args + + logger.debug(f"Applying filter: {filter_item.name} with args: {func_args}") images[0] = filter_func(images[0], *func_args) return images[0] -async def apply_filters_to_images(images: list[Image.Image], filters: list[FilterItem]) -> BytesIO: - # 如果第一项是“加载图像”参数,那么就加载图像 - if filters and filters[0].name == "加载图像": - load_filter = filters.pop(0) - # 加载全部路径 - for path in load_filter.args: - img = Image.open(path) - images.append(img) +def copy_images_by_index(images: list[Image.Image], index: int) -> list[Image.Image]: + # 将导入图像列表复制为新的图像列表,如果是动图,那么就找到对应索引下的帧 + new_images = [] + for img in images: + if getattr(img, "is_animated", False): + frames = img.n_frames + frame_idx = index % frames + img.seek(frame_idx) + new_images.append(img.copy()) + else: + new_images.append(img.copy()) + + return new_images + +def save_or_load_image(images: list[Image.Image], filters: list[FilterItem], sender_info: SenderInfo) -> StoredInfo | None: + stored_info = None + # 处理位于最前面的“读取图像”、“存入图像” + if not filters: + return + while filters and filters[0].name.strip() in ["读取图像", "存入图像"]: + if filters[0].name.strip() == "读取图像": + load_filter = filters.pop(0) + path = load_filter.args[0] if load_filter.args else "" + ImageFilterStorage.load_image(None, path, images, sender_info) + elif filters[0].name.strip() == "存入图像": + store_filter = filters.pop(0) + name = store_filter.args[0] if store_filter.args[0] else str(random.randint(10000,99999)) + stored_info = ImageFilterStorage.store_image(images[0], name, sender_info) + # 将剩下的“读取图像”或“存入图像”参数全部删除,避免后续非法操作 + filters[:] = [f for f in filters if f.name.strip() not in ["读取图像", "存入图像"]] + return stored_info + +async def apply_filters_to_images(images: list[Image.Image], filters: list[FilterItem], sender_info: SenderInfo) -> BytesIO | StoredInfo: + # 先处理存取图像的操作 + stored_info = save_or_load_image(images, filters, sender_info) + + if stored_info and len(filters) <= 0: + return stored_info if len(images) <= 0: - raise ValueError("没有提供任何图像进行处理") + raise BotExceptionMessage("没有可处理的图像!") # 检测是否需要将静态图视作动图处理 frozen_to_move = any( @@ -127,32 +178,30 @@ async def apply_filters_to_images(images: list[Image.Image], filters: list[Filte output = BytesIO() if getattr(img, "is_animated", False) or frozen_to_move: frames = [] - all_frames = [] if getattr(img, "is_animated", False): logger.debug("处理动图帧") - for frame in ImageSequence.Iterator(img): - frame_copy = frame.copy() - all_frames.append(frame_copy) else: # 将静态图视作单帧动图处理,拷贝多份 logger.debug("处理静态图为多帧动图") - for _ in range(10): # 默认复制10帧 - all_frames.append(img.copy()) img.info['duration'] = int(1000 / static_fps) - async def process_single_frame(frame: list[Image.Image], frame_idx: int) -> Image.Image: + async def process_single_frame(frame_images: list[Image.Image], frame_idx: int) -> Image.Image: """处理单帧的异步函数""" logger.debug(f"开始处理帧 {frame_idx}") - result = await asynkio.to_thread(handle_filters_to_image, frame, images, filters) + result = await asynkio.to_thread(handle_filters_to_image, frame_images, filters) logger.debug(f"完成处理帧 {frame_idx}") - return result[0] + return result + # 并发处理所有帧 tasks = [] - for i, frame in enumerate(all_frames): - task = process_single_frame(frame, i) + all_frames = [] + for i, frame in enumerate(ImageSequence.Iterator(img)): + all_frames.append(frame.copy()) + images_copy = copy_images_by_index(images, i) + task = process_single_frame(images_copy, i) tasks.append(task) - frames = await asynkio.gather(*tasks, return_exceptions=True) + frames = await asynkio.gather(*tasks, return_exceptions=False) # 检查是否有处理失败的帧 for i, result in enumerate(frames): @@ -188,13 +237,11 @@ def is_fx_mentioned(evt: BaseEvent, msg: UniMsg) -> bool: fx_on = on_message(rule=is_fx_mentioned) @fx_on.handle() -async def _(msg: UniMsg, event: BaseEvent, target: DepLongTaskTarget, image_data: DepImageBytesOrNone = None): +async def _(msg: UniMsg, event: BaseEvent, target: DepLongTaskTarget, image_data: DepImageBytesOrNone): preload_imgs = [] # 提取图像 try: - if image_data is not None: - preload_imgs.append(Image.open(BytesIO(image_data))) - logger.debug("Image extracted for FX processing.") + preload_imgs.append(Image.open(BytesIO(image_data))) except Exception: logger.info("No image found in message for FX processing.") args = msg.extract_plain_text().split() @@ -207,9 +254,11 @@ async def _(msg: UniMsg, event: BaseEvent, target: DepLongTaskTarget, image_data ) filters = prase_input_args(msg.extract_plain_text()[2:], sender_info=sender_info) - if not filters: - return - output = await apply_filters_to_images(preload_imgs, filters) - logger.debug("FX processing completed, sending result.") - await fx_on.send(await UniMessage().image(raw=output).export()) + # if not filters: + # return + output = await apply_filters_to_images(preload_imgs, filters, sender_info) + if isinstance(output,StoredInfo): + await fx_on.send(await UniMessage().text(f"图像已存为「{output.name}」!").export()) + elif isinstance(output,BytesIO): + await fx_on.send(await UniMessage().image(raw=output).export()) \ No newline at end of file diff --git a/konabot/plugins/fx_process/fx_handle.py b/konabot/plugins/fx_process/fx_handle.py index dd421ba..012e8b3 100644 --- a/konabot/plugins/fx_process/fx_handle.py +++ b/konabot/plugins/fx_process/fx_handle.py @@ -12,7 +12,7 @@ from konabot.plugins.fx_process.gradient import GradientGenerator import numpy as np from konabot.plugins.fx_process.image_storage import ImageStorager -from konabot.plugins.fx_process.types import SenderInfo +from konabot.plugins.fx_process.types import SenderInfo, StoredInfo class ImageFilterImplement: @staticmethod @@ -548,6 +548,18 @@ class ImageFilterImplement: return Image.fromarray(blended, 'RGBA') + # 两张图像直接覆盖 + @staticmethod + def apply_overlay(image1: Image.Image, image2: Image.Image) -> Image.Image: + if image1.mode != 'RGBA': + image1 = image1.convert('RGBA') + if image2.mode != 'RGBA': + image2 = image2.convert('RGBA') + + image2 = image2.resize(image1.size, Image.Resampling.LANCZOS) + + return Image.alpha_composite(image1, image2) + # 叠加渐变色 @staticmethod def apply_gradient_overlay( @@ -560,6 +572,18 @@ class ImageFilterImplement: gradient = gradient_gen.create_gradient(image.size[0], image.size[1], color_nodes) return ImageFilterImplement.apply_blend(image, gradient, mode=overlay_mode, alpha=0.5) + # 生成颜色,类似固态层 + @staticmethod + def generate_solid( + image: Image.Image, + color_list: str = '[rgb(255,0,0)|(0,0),rgb(0,255,0)|(0,100),rgb(0,0,255)|(50,100)]' + ): + gradient_gen = GradientGenerator() + color_nodes = gradient_gen.parse_color_list(color_list) + gradient = gradient_gen.create_gradient(image.size[0], image.size[1], color_nodes) + return gradient + + # 阴影 @staticmethod def apply_shadow(image: Image.Image, @@ -1090,17 +1114,50 @@ class ImageFilterEmpty: return image class ImageFilterStorage: - # 用于存储图像 - @staticmethod - def store_image(image: Image.Image, name: str, sender_info: SenderInfo) -> Image.Image: - ImageStorager.save_image(image, name, sender_info.group_id, sender_info.qq_id) - return image - - # 用于暂存图像 + # 用于暂存图像,存储在第一项的后面 @staticmethod def temp_store_image(image: Image.Image, images: list[Image.Image]) -> Image.Image: - images.append(image) + images.insert(1, image.copy()) return image + + # 用于交换移动索引 + @staticmethod + def swap_image_index(image: Image.Image, images: list[Image.Image], src: int = 2, dest: int = 1) -> Image.Image: + if len(images) == 0: + return image + # 将二者交换 + src -= 1 + dest -= 1 + if 0 <= src < len(images) and 0 <= dest < len(images): + images[src], images[dest] = images[dest], images[src] + return images[0] + + # 用于删除指定索引的图像 + @staticmethod + def delete_image_by_index(image: Image.Image, images: list[Image.Image], index: int = 1) -> Image.Image: + # 以 1 为基准 + index -= 1 + if len(images) == 1: + # 只有一张图像,不能删除 + return image + if 0 <= index < len(images): + del images[index] + return images[0] + + # 用于选择指定索引的图像为首图 + @staticmethod + def select_image_by_index(image: Image.Image, images: list[Image.Image], index: int = 2) -> Image.Image: + # 以 1 为基准 + index -= 1 + if 0 <= index < len(images): + return images[index] + return image + + # 用于存储图像 + @staticmethod + def store_image(image: Image.Image, name: str, sender_info: SenderInfo) -> StoredInfo: + ImageStorager.save_image_by_pil(image, name, sender_info.group_id, sender_info.qq_id) + return StoredInfo(name=name) # 用于读取图像 @staticmethod diff --git a/konabot/plugins/fx_process/fx_manager.py b/konabot/plugins/fx_process/fx_manager.py index 210e879..3c1d653 100644 --- a/konabot/plugins/fx_process/fx_manager.py +++ b/konabot/plugins/fx_process/fx_manager.py @@ -47,9 +47,18 @@ class ImageFilterManager: "晃动": ImageFilterImplement.apply_random_wiggle, "动图": ImageFilterEmpty.empty_filter_param, "像素抖动": ImageFilterImplement.apply_pixel_jitter, + # 图像处理 "存入图像": ImageFilterStorage.store_image, "读取图像": ImageFilterStorage.load_image, "暂存图像": ImageFilterStorage.temp_store_image, + "交换图像": ImageFilterStorage.swap_image_index, + "删除图像": ImageFilterStorage.delete_image_by_index, + "选择图像": ImageFilterStorage.select_image_by_index, + # 多图像处理 + "混合图像": ImageFilterImplement.apply_blend, + "覆盖图像": ImageFilterImplement.apply_overlay, + # 生成式 + "生成颜色": ImageFilterImplement.generate_solid } @classmethod diff --git a/konabot/plugins/fx_process/gradient.py b/konabot/plugins/fx_process/gradient.py index 44a1802..e898603 100644 --- a/konabot/plugins/fx_process/gradient.py +++ b/konabot/plugins/fx_process/gradient.py @@ -14,20 +14,23 @@ class GradientGenerator: """解析渐变颜色列表字符串 Args: - color_list_str: 格式如 '[rgb(255,0,0)|(0,0),rgb(0,255,0)|(0,100),rgb(0,0,255)|(50,100)]' + color_list_str: 格式如 '[rgb(255,0,0)|(0,0)+rgb(0,255,0)|(0,100)+rgb(0,0,255)|(50,100)]' Returns: list: 包含颜色和位置信息的字典列表 """ color_nodes = [] - color_list_str = color_list_str.strip('[] ').strip() - pattern = r'([^|]+)\|\(([^)]+)\)' - matches = re.findall(pattern, color_list_str) + color_list_str = color_list_str.strip('[]').strip() + matches = color_list_str.split('+') - for color_str, pos_str in matches: + for single_str in matches: + color_str = single_str.split('|')[0] + pos_str = single_str.split('|')[1] if '|' in single_str else '0,0' + color = ColorHandle.parse_color(color_str.strip()) - + try: + pos_str = pos_str.replace('(', '').replace(')', '') x_str, y_str = pos_str.split(',') x_percent = float(x_str.strip().replace('%', '')) y_percent = float(y_str.strip().replace('%', '')) diff --git a/konabot/plugins/fx_process/image_storage.py b/konabot/plugins/fx_process/image_storage.py index 09ccb2b..8f5e4f3 100644 --- a/konabot/plugins/fx_process/image_storage.py +++ b/konabot/plugins/fx_process/image_storage.py @@ -8,12 +8,13 @@ from nonebot_plugin_apscheduler import driver from konabot.common.path import DATA_PATH import os from PIL import Image +from io import BytesIO IMAGE_PATH = DATA_PATH / "temp" / "images" @dataclass class ImageResource: - name: str + filename: str expire: int @dataclass @@ -39,6 +40,14 @@ class ImageStorager: if resource_path.exists(): os.remove(resource_path) + @staticmethod + async def clear_all_image(): + # 清理 temp 目录下的所有图片资源 + for file in os.listdir(IMAGE_PATH): + file_path = IMAGE_PATH / file + if file_path.is_file(): + os.remove(file_path) + @classmethod async def clear_expire_image(cls): # 清理过期的图片资源,将未被删除的放入列表中,如果超过最大数量则删除最早过期的 @@ -63,15 +72,17 @@ class ImageStorager: resource = ImageStorager.images_pool[name].resources[group_id][qq_id] del ImageStorager.images_pool[name].resources[group_id][qq_id] cls.delete_path_image(name) + logger.info("过期图片清理完成") @classmethod - def _add_to_pool(cls, image: bytes, name: str, group_id: str, qq_id: str, expire: int = 36000): + def _add_to_pool(cls, filename: str, name: str, group_id: str, qq_id: str, expire: int = 36000): expire_time = time.time() + expire if name not in cls.images_pool: cls.images_pool[name] = StorageImage(name=name,resources={}) if group_id not in cls.images_pool[name].resources: cls.images_pool[name].resources[group_id] = {} - cls.images_pool[name].resources[group_id][qq_id] = ImageResource(name=name, expire=expire_time) + cls.images_pool[name].resources[group_id][qq_id] = ImageResource(filename=filename, expire=expire_time) + logger.debug(f"{cls.images_pool}") @classmethod def save_image(cls, image: bytes, name: str, group_id: str, qq_id: str) -> None: @@ -88,20 +99,39 @@ class ImageStorager: with open(full_path, "wb") as f: f.write(image) # 将文件写入 images_pool - cls._add_to_pool(image, file_name, group_id, qq_id) + logger.debug(f"Image saved: {file_name} for group {group_id}, qq {qq_id}") + cls._add_to_pool(file_name, name, group_id, qq_id) + + @classmethod + def save_image_by_pil(cls, image: Image.Image, name: str, group_id: str, qq_id: str) -> None: + """ + 以哈希值命名保存图片,并返回图片资源信息 + """ + img_byte_arr = BytesIO() + # 如果图片是动图,保存为 GIF 格式 + if getattr(image, "is_animated", False): + image.save(img_byte_arr, format="GIF", save_all=True, loop=0) + else: + image.save(img_byte_arr, format=image.format or "PNG") + img_bytes = img_byte_arr.getvalue() + cls.save_image(img_bytes, name, group_id, qq_id) @classmethod def load_image(cls, name: str, group_id: str, qq_id: str) -> Image: + logger.debug(f"Loading image: {name} for group {group_id}, qq {qq_id}") if name not in cls.images_pool: + logger.debug(f"Image {name} not found in pool") return None if group_id not in cls.images_pool[name].resources: + logger.debug(f"No resources for group {group_id} in image {name}") return None # 寻找对应 QQ 号 的资源,如果没有就返回相同群下的第一个资源 if qq_id not in cls.images_pool[name].resources[group_id]: first_qq_id = next(iter(cls.images_pool[name].resources[group_id])) qq_id = first_qq_id resource = cls.images_pool[name].resources[group_id][qq_id] - resource_path = IMAGE_PATH / resource.name + resource_path = IMAGE_PATH / resource.filename + logger.debug(f"Image path: {resource_path}") return Image.open(resource_path) class ImageStoragerManager: @@ -112,6 +142,8 @@ class ImageStoragerManager: async def start_auto_clear(self): """启动自动任务""" + # 先清理一次 + await ImageStorager.clear_all_image() self._running = True self._clear_task = asyncio.create_task(self._auto_clear_loop()) diff --git a/konabot/plugins/fx_process/types.py b/konabot/plugins/fx_process/types.py index 303502b..43a71f9 100644 --- a/konabot/plugins/fx_process/types.py +++ b/konabot/plugins/fx_process/types.py @@ -10,10 +10,13 @@ class FilterItem: class ImageRequireSignal: pass -@dataclass class ImagesListRequireSignal: pass +@dataclass +class StoredInfo: + name: str + @dataclass class SenderInfo: group_id: str