forked from mttu-developers/konabot
最新最热
This commit is contained in:
@ -1,10 +1,12 @@
|
||||
import asyncio as asynkio
|
||||
from dataclasses import dataclass
|
||||
from io import BytesIO
|
||||
|
||||
from inspect import signature
|
||||
|
||||
from konabot.common.nb.extract_image import DepPILImage
|
||||
from konabot.common.nb.extract_image import DepImageBytes, DepPILImage
|
||||
from nonebot.adapters import Event as BaseEvent
|
||||
from nonebot import on_message
|
||||
from nonebot import on_message, logger
|
||||
|
||||
from nonebot_plugin_alconna import (
|
||||
UniMessage,
|
||||
@ -13,46 +15,126 @@ from nonebot_plugin_alconna import (
|
||||
|
||||
from konabot.plugins.fx_process.fx_manager import ImageFilterManager
|
||||
|
||||
from PIL import Image, ImageSequence
|
||||
|
||||
@dataclass
|
||||
class FilterItem:
|
||||
filter: callable
|
||||
args: list
|
||||
|
||||
def prase_input_args(input_str: str) -> list[FilterItem]:
|
||||
# 按分号或换行符分割参数
|
||||
args = []
|
||||
for part in input_str.replace('\n', ';').split(';'):
|
||||
part = part.strip()
|
||||
if not part:
|
||||
continue
|
||||
split_part = part.split()
|
||||
filter_name = split_part[0]
|
||||
if not ImageFilterManager.has_filter(filter_name):
|
||||
continue
|
||||
filter_func = ImageFilterManager.get_filter(filter_name)
|
||||
input_filter_args = split_part[1:]
|
||||
# 获取函数最大参数数量
|
||||
sig = signature(filter_func)
|
||||
max_params = len(sig.parameters) - 1 # 减去第一个参数 image
|
||||
# 从 args 提取参数,并转换为适当类型
|
||||
func_args = []
|
||||
for i in range(0, min(len(input_filter_args), max_params)):
|
||||
# 尝试将参数转换为函数签名中对应的类型
|
||||
param = list(sig.parameters.values())[i + 1]
|
||||
param_type = param.annotation
|
||||
arg_value = input_filter_args[i]
|
||||
try:
|
||||
if param_type is float:
|
||||
converted_value = float(arg_value)
|
||||
elif param_type is int:
|
||||
converted_value = int(arg_value)
|
||||
else:
|
||||
converted_value = arg_value
|
||||
except Exception:
|
||||
converted_value = arg_value
|
||||
func_args.append(converted_value)
|
||||
args.append(FilterItem(filter=filter_func, args=func_args))
|
||||
return args
|
||||
|
||||
def apply_filters_to_image(img: Image, filters: list[FilterItem]) -> Image:
|
||||
for filter_item in filters:
|
||||
filter_func = filter_item.filter
|
||||
func_args = filter_item.args
|
||||
img = filter_func(img, *func_args)
|
||||
return img
|
||||
|
||||
async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem]) -> BytesIO:
|
||||
# 如果 image 是动图,则逐帧处理
|
||||
img = Image.open(BytesIO(image_bytes))
|
||||
logger.debug("开始图像处理")
|
||||
output = BytesIO()
|
||||
if getattr(img, "is_animated", False):
|
||||
frames = []
|
||||
all_frames = []
|
||||
for frame in ImageSequence.Iterator(img):
|
||||
frame_copy = frame.copy()
|
||||
all_frames.append(frame_copy)
|
||||
|
||||
async def process_single_frame(frame: Image.Image, frame_idx: int) -> Image.Image:
|
||||
"""处理单帧的异步函数"""
|
||||
logger.debug(f"开始处理帧 {frame_idx}")
|
||||
result = await asynkio.to_thread(apply_filters_to_image, frame, filters)
|
||||
logger.debug(f"完成处理帧 {frame_idx}")
|
||||
return result
|
||||
|
||||
# 并发处理所有帧
|
||||
tasks = []
|
||||
for i, frame in enumerate(all_frames):
|
||||
task = process_single_frame(frame, i)
|
||||
tasks.append(task)
|
||||
|
||||
frames = await asynkio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# 检查是否有处理失败的帧
|
||||
for i, result in enumerate(frames):
|
||||
if isinstance(result, Exception):
|
||||
logger.error(f"帧 {i} 处理失败: {result}")
|
||||
# 使用原始帧作为回退
|
||||
frames[i] = all_frames[i]
|
||||
|
||||
logger.debug("保存动图")
|
||||
frames[0].save(
|
||||
output,
|
||||
format="GIF",
|
||||
save_all=True,
|
||||
append_images=frames[1:],
|
||||
loop=img.info.get("loop", 0),
|
||||
disposal=img.info.get("disposal", 2),
|
||||
duration=img.info.get("duration", 100),
|
||||
)
|
||||
logger.debug("Animated image saved")
|
||||
else:
|
||||
img = apply_filters_to_image(img, filters)
|
||||
img.save(output, format="PNG")
|
||||
logger.debug("Image processing completed")
|
||||
output.seek(0)
|
||||
return output
|
||||
|
||||
|
||||
def is_fx_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
|
||||
txt = msg.extract_plain_text()
|
||||
if "fx" not in txt[:3]:
|
||||
if "fx" not in txt[:3].lower():
|
||||
return False
|
||||
return True
|
||||
|
||||
fx_on = on_message(rule=is_fx_mentioned)
|
||||
|
||||
@fx_on.handle()
|
||||
async def _(msg: UniMsg, event: BaseEvent, img: DepPILImage):
|
||||
async def _(msg: UniMsg, event: BaseEvent, img: DepImageBytes):
|
||||
args = msg.extract_plain_text().split()
|
||||
if len(args) < 2:
|
||||
return
|
||||
filter_name = args[1]
|
||||
filter_func = ImageFilterManager.get_filter(filter_name)
|
||||
if not filter_func:
|
||||
filters = prase_input_args(msg.extract_plain_text()[2:])
|
||||
if not filters:
|
||||
return
|
||||
# 获取函数最大参数数量
|
||||
sig = signature(filter_func)
|
||||
max_params = len(sig.parameters) - 1 # 减去第一个参数 image
|
||||
# 从 args 提取参数,并转换为适当类型
|
||||
func_args = []
|
||||
for i in range(2, min(len(args), max_params + 2)):
|
||||
# 尝试将参数转换为函数签名中对应的类型
|
||||
param = list(sig.parameters.values())[i - 1]
|
||||
param_type = param.annotation
|
||||
arg_value = args[i]
|
||||
try:
|
||||
if param_type is float:
|
||||
converted_value = float(arg_value)
|
||||
elif param_type is int:
|
||||
converted_value = int(arg_value)
|
||||
else:
|
||||
converted_value = arg_value
|
||||
except Exception:
|
||||
converted_value = arg_value
|
||||
func_args.append(converted_value)
|
||||
# 应用滤镜
|
||||
out_img = filter_func(img, *func_args)
|
||||
output = BytesIO()
|
||||
out_img.save(output, format="PNG")
|
||||
output = await apply_filters_to_bytes(img, filters)
|
||||
logger.debug("FX processing completed, sending result.")
|
||||
await fx_on.send(await UniMessage().image(raw=output).export())
|
||||
|
||||
Reference in New Issue
Block a user