待完善

This commit is contained in:
2025-12-09 00:02:26 +08:00
parent eed21e6223
commit 54fae88914
6 changed files with 274 additions and 22 deletions

View File

@ -4,9 +4,11 @@ from io import BytesIO
from inspect import signature
from konabot.common.nb.extract_image import DepImageBytes, DepPILImage
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.extract_image import DepImageBytesOrNone
from nonebot.adapters import Event as BaseEvent
from nonebot import on_message, logger
from returns.result import Failure, Result, Success
from nonebot_plugin_alconna import (
UniMessage,
@ -17,13 +19,9 @@ from konabot.plugins.fx_process.fx_manager import ImageFilterManager
from PIL import Image, ImageSequence
@dataclass
class FilterItem:
name: str
filter: callable
args: list
from konabot.plugins.fx_process.types import FilterItem, ImageRequireSignal, ImagesListRequireSignal, SenderInfo
def prase_input_args(input_str: str) -> list[FilterItem]:
def prase_input_args(input_str: str, sender_info: SenderInfo = None) -> list[FilterItem]:
# 按分号或换行符分割参数
args = []
for part in input_str.replace('\n', ';').split(';'):
@ -42,7 +40,7 @@ def prase_input_args(input_str: str) -> list[FilterItem]:
# 从 args 提取参数,并转换为适当类型
func_args = []
for i in range(0, min(len(input_filter_args), max_params)):
# 尝试将参数转换为函数签名中对应的类型
# 尝试将参数转换为函数签名中对应的类型,并检测是不是 Image 类型,如果有则表示多个图像输入
param = list(sig.parameters.values())[i + 1]
param_type = param.annotation
arg_value = input_filter_args[i]
@ -51,6 +49,14 @@ def prase_input_args(input_str: str) -> list[FilterItem]:
converted_value = float(arg_value)
elif param_type is int:
converted_value = int(arg_value)
elif param_type is bool:
converted_value = arg_value.lower() in ['true', '1', 'yes', '', '']
elif param_type is Image.Image:
converted_value = ImageRequireSignal()
elif param_type is SenderInfo:
converted_value = sender_info
elif param_type is list[Image.Image]:
converted_value = ImagesListRequireSignal()
else:
converted_value = arg_value
except Exception:
@ -59,14 +65,47 @@ def prase_input_args(input_str: str) -> list[FilterItem]:
args.append(FilterItem(name=filter_name,filter=filter_func, args=func_args))
return args
def apply_filters_to_image(img: Image, filters: list[FilterItem]) -> Image:
def handle_filters_to_image(images: list[Image.Image], filters: list[FilterItem]) -> Image.Image:
for filter_item in filters:
filter_func = filter_item.filter
func_args = filter_item.args
img = filter_func(img, *func_args)
return img
# 检测参数中是否有 ImageRequireSignal如果有则传入对应数量的图像列表
if any(isinstance(arg, ImageRequireSignal) for arg in func_args):
# 替换 ImageRequireSignal 为 images 对应索引的图像
actual_args = []
img_signal_count = 1 # 从 images[1] 开始取图像
for arg in func_args:
if isinstance(arg, ImageRequireSignal):
actual_args.append(images[img_signal_count])
img_signal_count += 1
else:
actual_args.append(arg)
func_args = actual_args
# 检测参数中是否有 ImagesListRequireSignal如果有则传入整个图像列表
if any(isinstance(arg, ImagesListRequireSignal) for arg in func_args):
actual_args = []
for arg in func_args:
if isinstance(arg, ImagesListRequireSignal):
actual_args.append(images)
else:
actual_args.append(arg)
func_args = actual_args
images[0] = filter_func(images[0], *func_args)
return images[0]
async def apply_filters_to_images(images: list[Image.Image], filters: list[FilterItem]) -> BytesIO:
# 如果第一项是“加载图像”参数,那么就加载图像
if filters and filters[0].name == "加载图像":
load_filter = filters.pop(0)
# 加载全部路径
for path in load_filter.args:
img = Image.open(path)
images.append(img)
if len(images) <= 0:
raise ValueError("没有提供任何图像进行处理")
async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem]) -> BytesIO:
# 检测是否需要将静态图视作动图处理
frozen_to_move = any(
filter_item.name == "动图"
@ -83,7 +122,7 @@ async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem])
static_fps = 10
break
# 如果 image 是动图,则逐帧处理
img = Image.open(BytesIO(image_bytes))
img = images[0]
logger.debug("开始图像处理")
output = BytesIO()
if getattr(img, "is_animated", False) or frozen_to_move:
@ -101,13 +140,12 @@ async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem])
all_frames.append(img.copy())
img.info['duration'] = int(1000 / static_fps)
async def process_single_frame(frame: Image.Image, frame_idx: int) -> Image.Image:
async def process_single_frame(frame: list[Image.Image], frame_idx: int) -> Image.Image:
"""处理单帧的异步函数"""
logger.debug(f"开始处理帧 {frame_idx}")
result = await asynkio.to_thread(apply_filters_to_image, frame, filters)
result = await asynkio.to_thread(handle_filters_to_image, frame, images, filters)
logger.debug(f"完成处理帧 {frame_idx}")
return result
return result[0]
# 并发处理所有帧
tasks = []
for i, frame in enumerate(all_frames):
@ -135,7 +173,7 @@ async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem])
)
logger.debug("Animated image saved")
else:
img = apply_filters_to_image(img, filters)
img = handle_filters_to_image(images=images, filters=filters)
img.save(output, format="PNG")
logger.debug("Image processing completed")
output.seek(0)
@ -150,14 +188,28 @@ def is_fx_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
fx_on = on_message(rule=is_fx_mentioned)
@fx_on.handle()
async def _(msg: UniMsg, event: BaseEvent, img: DepImageBytes):
async def _(msg: UniMsg, event: BaseEvent, target: DepLongTaskTarget, image_data: DepImageBytesOrNone = None):
preload_imgs = []
# 提取图像
try:
if image_data is not None:
preload_imgs.append(Image.open(BytesIO(image_data)))
logger.debug("Image extracted for FX processing.")
except Exception:
logger.info("No image found in message for FX processing.")
args = msg.extract_plain_text().split()
if len(args) < 2:
return
filters = prase_input_args(msg.extract_plain_text()[2:])
sender_info = SenderInfo(
group_id=target.channel_id,
qq_id=target.target_id
)
filters = prase_input_args(msg.extract_plain_text()[2:], sender_info=sender_info)
if not filters:
return
output = await apply_filters_to_bytes(img, filters)
output = await apply_filters_to_images(preload_imgs, filters)
logger.debug("FX processing completed, sending result.")
await fx_on.send(await UniMessage().image(raw=output).export())