forked from mttu-developers/konabot
新滤镜,新修复
This commit is contained in:
@ -19,6 +19,7 @@ from PIL import Image, ImageSequence
|
||||
|
||||
@dataclass
|
||||
class FilterItem:
|
||||
name: str
|
||||
filter: callable
|
||||
args: list
|
||||
|
||||
@ -55,7 +56,7 @@ def prase_input_args(input_str: str) -> list[FilterItem]:
|
||||
except Exception:
|
||||
converted_value = arg_value
|
||||
func_args.append(converted_value)
|
||||
args.append(FilterItem(filter=filter_func, args=func_args))
|
||||
args.append(FilterItem(name=filter_name,filter=filter_func, args=func_args))
|
||||
return args
|
||||
|
||||
def apply_filters_to_image(img: Image, filters: list[FilterItem]) -> Image:
|
||||
@ -66,16 +67,39 @@ def apply_filters_to_image(img: Image, filters: list[FilterItem]) -> Image:
|
||||
return img
|
||||
|
||||
async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem]) -> BytesIO:
|
||||
# 检测是否需要将静态图视作动图处理
|
||||
frozen_to_move = any(
|
||||
filter_item.name == "动图"
|
||||
for filter_item in filters
|
||||
)
|
||||
static_fps = 10
|
||||
# 找到动图参数 fps
|
||||
if frozen_to_move:
|
||||
for filter_item in filters:
|
||||
if filter_item.name == "动图" and filter_item.args:
|
||||
try:
|
||||
static_fps = int(filter_item.args[0])
|
||||
except Exception:
|
||||
static_fps = 10
|
||||
break
|
||||
# 如果 image 是动图,则逐帧处理
|
||||
img = Image.open(BytesIO(image_bytes))
|
||||
logger.debug("开始图像处理")
|
||||
output = BytesIO()
|
||||
if getattr(img, "is_animated", False):
|
||||
if getattr(img, "is_animated", False) or frozen_to_move:
|
||||
frames = []
|
||||
all_frames = []
|
||||
for frame in ImageSequence.Iterator(img):
|
||||
frame_copy = frame.copy()
|
||||
all_frames.append(frame_copy)
|
||||
if getattr(img, "is_animated", False):
|
||||
logger.debug("处理动图帧")
|
||||
for frame in ImageSequence.Iterator(img):
|
||||
frame_copy = frame.copy()
|
||||
all_frames.append(frame_copy)
|
||||
else:
|
||||
# 将静态图视作单帧动图处理,拷贝多份
|
||||
logger.debug("处理静态图为多帧动图")
|
||||
for _ in range(10): # 默认复制10帧
|
||||
all_frames.append(img.copy())
|
||||
img.info['duration'] = int(1000 / static_fps)
|
||||
|
||||
async def process_single_frame(frame: Image.Image, frame_idx: int) -> Image.Image:
|
||||
"""处理单帧的异步函数"""
|
||||
@ -117,7 +141,6 @@ async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem])
|
||||
output.seek(0)
|
||||
return output
|
||||
|
||||
|
||||
def is_fx_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
|
||||
txt = msg.extract_plain_text()
|
||||
if "fx" not in txt[:3].lower():
|
||||
|
||||
Reference in New Issue
Block a user