Files
konabot/konabot/plugins/fx_process/__init__.py
MixBadGun 9148073095
All checks were successful
continuous-integration/drone/push Build is passing
完善形状描边,新增文本图层、空白图层生成
2025-12-10 22:45:33 +08:00

277 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio as asynkio
from io import BytesIO
from inspect import signature
import random
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import DepImageBytesOrNone
from nonebot.adapters import Event as BaseEvent
from nonebot import on_message, logger
from nonebot_plugin_alconna import (
UniMessage,
UniMsg
)
from konabot.plugins.fx_process.fx_handle import ImageFilterStorage
from konabot.plugins.fx_process.fx_manager import ImageFilterManager
from PIL import Image, ImageSequence
from konabot.plugins.fx_process.types import FilterItem, ImageRequireSignal, ImagesListRequireSignal, SenderInfo, StoredInfo
def try_convert_type(param_type, input_param, sender_info: SenderInfo = None) -> tuple[bool, any]:
converted_value = None
try:
if param_type is float:
converted_value = float(input_param)
elif param_type is int:
converted_value = int(input_param)
elif param_type is bool:
converted_value = input_param.lower() in ['true', '1', 'yes', '', '']
elif param_type is Image.Image:
converted_value = ImageRequireSignal()
return False, converted_value
elif param_type is SenderInfo:
converted_value = sender_info
return False, converted_value
elif param_type == list[Image.Image]:
converted_value = ImagesListRequireSignal()
return False, converted_value
elif param_type is str:
if input_param is None:
return False, None
converted_value = str(input_param)
else:
return False, None
except Exception:
return False, None
return True, converted_value
def prase_input_args(input_str: str, sender_info: SenderInfo = None) -> list[FilterItem]:
# 按分号或换行符分割参数
args = []
for part in input_str.replace('\n', ';').split(';'):
part = part.strip()
if not part:
continue
split_part = part.split()
filter_name = split_part[0]
if not ImageFilterManager.has_filter(filter_name):
continue
filter_func = ImageFilterManager.get_filter(filter_name)
input_filter_args = split_part[1:]
# 获取函数最大参数数量
sig = signature(filter_func)
max_params = len(sig.parameters) - 1 # 减去第一个参数 image
# 从 args 提取参数,并转换为适当类型
func_args = []
for i in range(0, max_params):
# 尝试将参数转换为函数签名中对应的类型
param = list(sig.parameters.values())[i + 1]
param_type = param.annotation
# 根据函数所需要的参数,从输入参数中提取,如果不匹配就使用默认值,将当前参数递交给下一个循环
input_param = input_filter_args[0] if len(input_filter_args) > 0 else None
state, converted_param = try_convert_type(param_type, input_param, sender_info)
if state:
input_filter_args.pop(0)
if converted_param is None and param.default != param.empty:
converted_param = param.default
func_args.append(converted_param)
args.append(FilterItem(name=filter_name,filter=filter_func, args=func_args))
return args
def handle_filters_to_image(images: list[Image.Image], filters: list[FilterItem]) -> Image.Image:
for filter_item in filters:
logger.debug(f"{filter_item}")
filter_func = filter_item.filter
func_args = filter_item.args
# 检测参数中是否有 ImageRequireSignal如果有则传入对应数量的图像列表
if any(isinstance(arg, ImageRequireSignal) for arg in func_args):
# 替换 ImageRequireSignal 为 images 对应索引的图像
actual_args = []
img_signal_count = 1 # 从 images[1] 开始取图像
for arg in func_args:
if isinstance(arg, ImageRequireSignal):
if img_signal_count >= len(images):
raise BotExceptionMessage("图像数量不足,无法满足滤镜需求!")
actual_args.append(images[img_signal_count])
img_signal_count += 1
else:
actual_args.append(arg)
func_args = actual_args
# 检测参数中是否有 ImagesListRequireSignal如果有则传入整个图像列表
if any(isinstance(arg, ImagesListRequireSignal) for arg in func_args):
actual_args = []
for arg in func_args:
if isinstance(arg, ImagesListRequireSignal):
actual_args.append(images)
else:
actual_args.append(arg)
func_args = actual_args
logger.debug(f"Applying filter: {filter_item.name} with args: {func_args}")
images[0] = filter_func(images[0], *func_args)
return images[0]
def copy_images_by_index(images: list[Image.Image], index: int) -> list[Image.Image]:
# 将导入图像列表复制为新的图像列表,如果是动图,那么就找到对应索引下的帧
new_images = []
for img in images:
if getattr(img, "is_animated", False):
frames = img.n_frames
frame_idx = index % frames
img.seek(frame_idx)
new_images.append(img.copy())
else:
new_images.append(img.copy())
return new_images
def generate_image(images: list[Image.Image], filters: list[FilterItem]) -> Image.Image:
# 处理位于最前面的生成类滤镜
while filters and filters[0].name.strip() in ImageFilterManager.generate_filter_map:
gen_filter = filters.pop(0)
gen_func = gen_filter.filter
func_args = gen_filter.args[1:] # 去掉第一个 list 参数
gen_func(None, images, *func_args)
def save_or_load_image(images: list[Image.Image], filters: list[FilterItem], sender_info: SenderInfo) -> StoredInfo | None:
stored_info = None
# 处理位于最前面的“读取图像”、“存入图像”
if not filters:
return
while filters and filters[0].name.strip() in ["读取图像", "存入图像"]:
if filters[0].name.strip() == "读取图像":
load_filter = filters.pop(0)
path = load_filter.args[0] if load_filter.args else ""
ImageFilterStorage.load_image(None, path, images, sender_info)
elif filters[0].name.strip() == "存入图像":
store_filter = filters.pop(0)
name = store_filter.args[0] if store_filter.args[0] else str(random.randint(10000,99999))
stored_info = ImageFilterStorage.store_image(images[0], name, sender_info)
# 将剩下的“读取图像”或“存入图像”参数全部删除,避免后续非法操作
filters[:] = [f for f in filters if f.name.strip() not in ["读取图像", "存入图像"]]
return stored_info
async def apply_filters_to_images(images: list[Image.Image], filters: list[FilterItem], sender_info: SenderInfo) -> BytesIO | StoredInfo:
# 先处理存取图像、生成图像的操作
stored_info = save_or_load_image(images, filters, sender_info)
generate_image(images, filters)
if stored_info and len(filters) <= 0:
return stored_info
if len(images) <= 0:
raise BotExceptionMessage("没有可处理的图像!")
# 检测是否需要将静态图视作动图处理
frozen_to_move = any(
filter_item.name == "动图"
for filter_item in filters
)
static_fps = 10
# 找到动图参数 fps
if frozen_to_move:
for filter_item in filters:
if filter_item.name == "动图" and filter_item.args:
try:
static_fps = int(filter_item.args[0])
except Exception:
static_fps = 10
break
# 如果 image 是动图,则逐帧处理
img = images[0]
logger.debug("开始图像处理")
output = BytesIO()
if getattr(img, "is_animated", False) or frozen_to_move:
frames = []
append_images = []
if getattr(img, "is_animated", False):
logger.debug("处理动图帧")
else:
# 将静态图视作单帧动图处理,拷贝 10 帧
logger.debug("处理静态图为多帧动图")
append_images = [img.copy() for _ in range(10)]
img.info['duration'] = int(1000 / static_fps)
async def process_single_frame(frame_images: list[Image.Image], frame_idx: int) -> Image.Image:
"""处理单帧的异步函数"""
logger.debug(f"开始处理帧 {frame_idx}")
result = await asynkio.to_thread(handle_filters_to_image, frame_images, filters)
logger.debug(f"完成处理帧 {frame_idx}")
return result
# 并发处理所有帧
tasks = []
all_frames = []
for i, frame in enumerate(list(ImageSequence.Iterator(img)) + append_images):
all_frames.append(frame.copy())
images_copy = copy_images_by_index(images, i)
task = process_single_frame(images_copy, i)
tasks.append(task)
frames = await asynkio.gather(*tasks, return_exceptions=False)
# 检查是否有处理失败的帧
for i, result in enumerate(frames):
if isinstance(result, Exception):
logger.error(f"{i} 处理失败: {result}")
# 使用原始帧作为回退
frames[i] = all_frames[i]
logger.debug("保存动图")
frames[0].save(
output,
format="GIF",
save_all=True,
append_images=frames[1:],
loop=img.info.get("loop", 0),
disposal=img.info.get("disposal", 2),
duration=img.info.get("duration", 100),
)
logger.debug("Animated image saved")
else:
img = handle_filters_to_image(images=images, filters=filters)
img.save(output, format="PNG")
logger.debug("Image processing completed")
output.seek(0)
return output
def is_fx_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
txt = msg.extract_plain_text()
if "fx" not in txt[:3].lower():
return False
return True
fx_on = on_message(rule=is_fx_mentioned)
@fx_on.handle()
async def _(msg: UniMsg, event: BaseEvent, target: DepLongTaskTarget, image_data: DepImageBytesOrNone):
preload_imgs = []
# 提取图像
try:
preload_imgs.append(Image.open(BytesIO(image_data)))
except Exception:
logger.info("No image found in message for FX processing.")
args = msg.extract_plain_text().split()
if len(args) < 2:
return
sender_info = SenderInfo(
group_id=target.channel_id,
qq_id=target.target_id
)
filters = prase_input_args(msg.extract_plain_text()[2:], sender_info=sender_info)
# if not filters:
# return
output = await apply_filters_to_images(preload_imgs, filters, sender_info)
if isinstance(output,StoredInfo):
await fx_on.send(await UniMessage().text(f"图像已存为「{output.name}」!").export())
elif isinstance(output,BytesIO):
await fx_on.send(await UniMessage().image(raw=output).export())