277 lines
11 KiB
Python
277 lines
11 KiB
Python
import asyncio as asynkio
|
||
from io import BytesIO
|
||
|
||
from inspect import signature
|
||
import random
|
||
|
||
from konabot.common.longtask import DepLongTaskTarget
|
||
from konabot.common.nb.exc import BotExceptionMessage
|
||
from konabot.common.nb.extract_image import DepImageBytesOrNone
|
||
from nonebot.adapters import Event as BaseEvent
|
||
from nonebot import on_message, logger
|
||
|
||
from nonebot_plugin_alconna import (
|
||
UniMessage,
|
||
UniMsg
|
||
)
|
||
|
||
from konabot.plugins.fx_process.fx_handle import ImageFilterStorage
|
||
from konabot.plugins.fx_process.fx_manager import ImageFilterManager
|
||
|
||
from PIL import Image, ImageSequence
|
||
|
||
from konabot.plugins.fx_process.types import FilterItem, ImageRequireSignal, ImagesListRequireSignal, SenderInfo, StoredInfo
|
||
|
||
def try_convert_type(param_type, input_param, sender_info: SenderInfo | None = None) -> tuple[bool, object]:
    """Convert one raw command token to the type a filter parameter declares.

    Args:
        param_type: The parameter annotation taken from the filter signature.
        input_param: The next unconsumed token of the command, or ``None``
            when no token is left.
        sender_info: Sender context handed to parameters annotated
            ``SenderInfo``.

    Returns:
        A ``(consumed, value)`` pair. ``consumed`` is ``True`` only when
        ``input_param`` was used to build ``value``, which tells the caller
        to drop that token. Parameters annotated ``Image.Image``,
        ``list[Image.Image]`` or ``SenderInfo`` never consume a token; they
        yield a placeholder signal (or ``sender_info``) instead. A missing
        or unparsable token, or an unsupported annotation, yields
        ``(False, None)``.
    """
    # Parameters that are filled from context rather than from the command text.
    if param_type is Image.Image:
        return False, ImageRequireSignal()
    if param_type is SenderInfo:
        return False, sender_info
    if param_type == list[Image.Image]:
        return False, ImagesListRequireSignal()

    try:
        if param_type is float:
            return True, float(input_param)
        if param_type is int:
            return True, int(input_param)
        if param_type is bool:
            # Any token outside the truthy set converts to False and is
            # still consumed.
            return True, input_param.lower() in ('true', '1', 'yes', '是', '开')
        if param_type is str and input_param is not None:
            return True, str(input_param)
    except (TypeError, ValueError, AttributeError):
        # TypeError/AttributeError: the token is missing (None);
        # ValueError: the token is not a valid number.
        return False, None
    return False, None
|
||
|
||
def prase_input_args(input_str: str, sender_info: SenderInfo | None = None) -> list[FilterItem]:
    """Parse a filter command string into a list of ``FilterItem`` objects.

    Filters are separated by semicolons or newlines; within one filter the
    name and its arguments are separated by whitespace. Names unknown to
    ``ImageFilterManager`` are skipped silently.

    Args:
        input_str: The command text with the leading "fx" already removed.
        sender_info: Sender context forwarded to ``try_convert_type``.

    Returns:
        One ``FilterItem`` per recognised filter, in input order. Each item's
        ``args`` has one entry for every parameter of the filter function
        after the first (the image).
    """
    parsed: list[FilterItem] = []
    for part in input_str.replace('\n', ';').split(';'):
        tokens = part.split()
        if not tokens:
            continue
        filter_name, *raw_args = tokens
        if not ImageFilterManager.has_filter(filter_name):
            continue
        filter_func = ImageFilterManager.get_filter(filter_name)

        # Skip the first parameter (the image); build the list once per filter
        # instead of once per parameter.
        params = list(signature(filter_func).parameters.values())[1:]

        func_args = []
        next_token = 0  # index of the first token not yet consumed
        for param in params:
            token = raw_args[next_token] if next_token < len(raw_args) else None
            consumed, value = try_convert_type(param.annotation, token, sender_info)
            if consumed:
                next_token += 1
            # A token that does not fit this parameter stays available for
            # the next one; this parameter falls back to its default.
            if value is None and param.default is not param.empty:
                value = param.default
            func_args.append(value)
        parsed.append(FilterItem(name=filter_name, filter=filter_func, args=func_args))
    return parsed
|
||
|
||
def handle_filters_to_image(images: list[Image.Image], filters: list[FilterItem]) -> Image.Image:
    """Apply every filter in order to ``images[0]`` and return the result.

    ``images[0]`` is overwritten with each filter's output. Placeholder
    arguments are resolved just before the call: each ``ImageRequireSignal``
    becomes the next extra image (``images[1]``, ``images[2]``, ...) and each
    ``ImagesListRequireSignal`` becomes the whole ``images`` list.

    Raises:
        BotExceptionMessage: when a filter asks for more extra images than
            ``images`` holds.
    """
    for filter_item in filters:
        logger.debug(f"{filter_item}")
        apply_filter = filter_item.filter
        func_args = filter_item.args

        needs_resolving = any(
            isinstance(arg, (ImageRequireSignal, ImagesListRequireSignal))
            for arg in func_args
        )
        if needs_resolving:
            resolved = []
            next_extra = 1  # extra images start at images[1]
            for arg in func_args:
                if isinstance(arg, ImageRequireSignal):
                    if next_extra >= len(images):
                        raise BotExceptionMessage("图像数量不足,无法满足滤镜需求!")
                    resolved.append(images[next_extra])
                    next_extra += 1
                elif isinstance(arg, ImagesListRequireSignal):
                    resolved.append(images)
                else:
                    resolved.append(arg)
            func_args = resolved

        logger.debug(f"Applying filter: {filter_item.name} with args: {func_args}")

        images[0] = apply_filter(images[0], *func_args)
    return images[0]
|
||
|
||
def copy_images_by_index(images: list[Image.Image], index: int) -> list[Image.Image]:
    """Return independent copies of ``images`` for frame number ``index``.

    An animated image is first moved to frame ``index % n_frames`` (this
    changes the source image's current frame), so shorter animations wrap
    around. Still images are copied as they are.
    """
    snapshots = []
    for source in images:
        if getattr(source, "is_animated", False):
            source.seek(index % source.n_frames)
        snapshots.append(source.copy())
    return snapshots
|
||
|
||
def generate_image(images: list[Image.Image], filters: list[FilterItem]) -> None:
    """Run the generator filters that lead the chain, removing them from ``filters``.

    A filter counts as a generator when its stripped name is a key of
    ``ImageFilterManager.generate_filter_map``. Processing stops at the first
    filter that is not a generator. Each generator is called with ``None`` in
    the image position and the live ``images`` list; its return value is
    discarded (presumably it adds its output to ``images`` - confirm against
    the generator implementations).

    Args:
        images: The working image list, passed to every generator.
        filters: The filter chain; leading generator items are popped in place.
    """
    while filters and filters[0].name.strip() in ImageFilterManager.generate_filter_map:
        gen_filter = filters.pop(0)
        # args[0] stands for the image-list parameter; the real list is
        # passed explicitly instead.
        gen_filter.filter(None, images, *gen_filter.args[1:])
|
||
|
||
def save_or_load_image(images: list[Image.Image], filters: list[FilterItem], sender_info: SenderInfo) -> StoredInfo | None:
    """Execute the "读取图像" / "存入图像" filters that lead the chain.

    Leading storage filters are popped from ``filters`` and executed in
    order. Afterwards every remaining storage filter is removed from
    ``filters`` without being executed, so storage operations can only
    happen at the start of a chain.

    Args:
        images: The working image list; passed to the loader, and
            ``images[0]`` is the image that gets stored.
        filters: The filter chain, modified in place.
        sender_info: Sender context forwarded to ``ImageFilterStorage``.

    Returns:
        The result of the last store operation, or ``None`` when nothing was
        stored.

    Raises:
        BotExceptionMessage: when a store is requested but there is no image.
    """
    storage_ops = ("读取图像", "存入图像")
    stored_info = None
    if not filters:
        return None

    while filters and filters[0].name.strip() in storage_ops:
        op = filters.pop(0)
        if op.name.strip() == "读取图像":
            path = op.args[0] if op.args else ""
            ImageFilterStorage.load_image(None, path, images, sender_info)
        else:
            if not images:
                # Report a readable error instead of a bare IndexError.
                raise BotExceptionMessage("没有可处理的图像!")
            # Fall back to a random five-digit name when none was given.
            if op.args and op.args[0]:
                name = op.args[0]
            else:
                name = str(random.randint(10000, 99999))
            stored_info = ImageFilterStorage.store_image(images[0], name, sender_info)

    # Drop storage filters that appear later in the chain so they are never
    # run by the regular filter pipeline.
    filters[:] = [f for f in filters if f.name.strip() not in storage_ops]
    return stored_info
|
||
|
||
_DEFAULT_STATIC_FPS = 10  # fps used when "动图" gives no usable value
_STATIC_EXTRA_FRAMES = 10  # frames added after the original still image


def _static_gif_fps(filters: list[FilterItem]) -> int:
    """Return the fps requested by the first "动图" filter that has arguments."""
    for filter_item in filters:
        if filter_item.name == "动图" and filter_item.args:
            try:
                fps = int(filter_item.args[0])
            except (TypeError, ValueError, OverflowError):
                return _DEFAULT_STATIC_FPS
            # Zero or negative fps would divide by zero or give a negative
            # frame duration.
            return fps if fps > 0 else _DEFAULT_STATIC_FPS
    return _DEFAULT_STATIC_FPS


async def apply_filters_to_images(images: list[Image.Image], filters: list[FilterItem], sender_info: SenderInfo) -> BytesIO | StoredInfo:
    """Run a parsed filter chain and return the encoded result.

    Storage and generator filters at the head of the chain run first (both
    consume their items from ``filters``). The remaining filters are applied
    to ``images[0]``; an animated image, or any image when the chain contains
    a "动图" filter, is processed frame by frame in worker threads and
    encoded as GIF, anything else as PNG.

    Args:
        images: The working image list; ``images[0]`` is the primary image.
        filters: The filter chain, modified in place by the storage and
            generator steps.
        sender_info: Sender context used by the storage step.

    Returns:
        The ``StoredInfo`` when an image was stored and no filters remain,
        otherwise a ``BytesIO`` positioned at 0 holding the GIF or PNG.

    Raises:
        BotExceptionMessage: when there is no image to process. Exceptions
            raised by a filter propagate to the caller.
    """
    stored_info = save_or_load_image(images, filters, sender_info)
    generate_image(images, filters)

    if stored_info and not filters:
        return stored_info

    if not images:
        raise BotExceptionMessage("没有可处理的图像!")

    # A "动图" filter asks for a still image to be treated as an animation.
    frozen_to_move = any(filter_item.name == "动图" for filter_item in filters)

    img = images[0]
    is_animated = getattr(img, "is_animated", False)
    logger.debug("开始图像处理")
    output = BytesIO()

    if not (is_animated or frozen_to_move):
        # Run in a worker thread, like the per-frame path, so the event loop
        # is not blocked while filters execute.
        result = await asynkio.to_thread(handle_filters_to_image, images, filters)
        result.save(output, format="PNG")
        logger.debug("Image processing completed")
        output.seek(0)
        return output

    extra_frames = 0
    if is_animated:
        logger.debug("处理动图帧")
    else:
        logger.debug("处理静态图为多帧动图")
        extra_frames = _STATIC_EXTRA_FRAMES
        img.info['duration'] = int(1000 / _static_gif_fps(filters))

    async def process_single_frame(frame_images: list[Image.Image], frame_idx: int) -> Image.Image:
        """Apply the filter chain to one frame in a worker thread."""
        logger.debug(f"开始处理帧 {frame_idx}")
        result = await asynkio.to_thread(handle_filters_to_image, frame_images, filters)
        logger.debug(f"完成处理帧 {frame_idx}")
        return result

    # Only the number of frames is needed here; the per-frame pixel data is
    # taken by copy_images_by_index, which seeks every animated input itself.
    frame_total = sum(1 for _ in ImageSequence.Iterator(img)) + extra_frames
    tasks = [
        process_single_frame(copy_images_by_index(images, frame_idx), frame_idx)
        for frame_idx in range(frame_total)
    ]

    # gather() re-raises the first frame failure, so a filter error
    # (e.g. BotExceptionMessage) reaches the caller instead of being
    # replaced by an unprocessed frame.
    frames = await asynkio.gather(*tasks)

    logger.debug("保存动图")
    frames[0].save(
        output,
        format="GIF",
        save_all=True,
        append_images=frames[1:],
        loop=img.info.get("loop", 0),
        disposal=img.info.get("disposal", 2),
        duration=img.info.get("duration", 100),
    )
    logger.debug("Animated image saved")
    output.seek(0)
    return output
|
||
|
||
def is_fx_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
    """Rule: accept a message when "fx" occurs, ignoring case, within the
    first three characters of its plain text."""
    prefix = msg.extract_plain_text()[:3]
    return "fx" in prefix.lower()
|
||
|
||
# Message matcher for the fx command; is_fx_mentioned decides which messages it accepts.
fx_on = on_message(rule=is_fx_mentioned)
|
||
|
||
@fx_on.handle()
async def _(msg: UniMsg, event: BaseEvent, target: DepLongTaskTarget, image_data: DepImageBytesOrNone):
    """Handle an fx command: parse the filters, process the image, reply.

    Messages with fewer than two whitespace-separated words are ignored.
    The reply is a text confirmation when the image was stored, or the
    processed image otherwise.
    """
    text = msg.extract_plain_text()
    if len(text.split()) < 2:
        return

    # The attached image is optional; generator or load filters can supply one.
    preload_imgs = []
    if image_data is None:
        logger.info("No image found in message for FX processing.")
    else:
        try:
            preload_imgs.append(Image.open(BytesIO(image_data)))
        except Exception as exc:
            # Best effort: continue without the image, but do not hide the
            # reason it could not be decoded.
            logger.warning(f"Attached image could not be decoded for FX processing: {exc}")

    sender_info = SenderInfo(
        group_id=target.channel_id,
        qq_id=target.target_id
    )

    # is_fx_mentioned accepts "fx" at offset 0 or 1 (e.g. "/fx ..."), so cut
    # right after the keyword instead of always dropping two characters.
    for start in (0, 1):
        if text[start:start + 2].lower() == "fx":
            command = text[start + 2:]
            break
    else:
        command = text[2:]

    filters = prase_input_args(command, sender_info=sender_info)
    output = await apply_filters_to_images(preload_imgs, filters, sender_info)
    if isinstance(output, StoredInfo):
        await fx_on.send(await UniMessage().text(f"图像已存为「{output.name}」!").export())
    elif isinstance(output, BytesIO):
        await fx_on.send(await UniMessage().image(raw=output).export())
|
||
|