Compare commits
5 Commits
16b0451133
...
fx_storage
| Author | SHA1 | Date | |
|---|---|---|---|
| 54fae88914 | |||
| eed21e6223 | |||
| bf5c10b7a7 | |||
| 274ca0fa9a | |||
| c72cdd6a6b |
@ -208,5 +208,8 @@ async def _ext_img(
|
||||
return None
|
||||
|
||||
|
||||
|
||||
DepImageBytes = Annotated[bytes, nonebot.params.Depends(_ext_img_data)]
|
||||
DepPILImage = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)]
|
||||
|
||||
DepImageBytesOrNone = Annotated[bytes | None, nonebot.params.Depends(_ext_img_data)]
|
||||
|
||||
@ -26,7 +26,7 @@ fx [滤镜名称] <参数1> <参数2> ...
|
||||
* ```fx 查找边缘```
|
||||
* ```fx 平滑```
|
||||
* ```fx 暗角 <半径=1.5>```
|
||||
* ```fx 发光 <强度=1.5> <模糊半径=15>```
|
||||
* ```fx 发光 <强度=0.5> <模糊半径=15>```
|
||||
* ```fx 噪点 <数量=0.05>```
|
||||
* ```fx 素描```
|
||||
* ```fx 阴影 <x偏移量=10> <y偏移量=10> <模糊量=10> <不透明度=0.5> <阴影颜色=black>```
|
||||
@ -50,10 +50,11 @@ fx [滤镜名称] <参数1> <参数2> ...
|
||||
* ```fx 色调 <颜色="rgb(255,0,0)">```
|
||||
* ```fx RGB分离 <偏移量=5>```
|
||||
* ```fx 叠加颜色 <颜色列表=[rgb(255,0,0)|(0,0),rgb(0,255,0)|(0,100),rgb(0,0,255)|(50,100)]> <叠加模式=overlay>```
|
||||
* ```fx 像素抖动 <最大偏移量=2>```
|
||||
|
||||
### 几何变换滤镜
|
||||
* ```fx 平移 <x偏移量=10> <y偏移量=10>```
|
||||
* ```fx 缩放 <比例=1.5>```
|
||||
* ```fx 缩放 <比例(X)=1.5> <比例Y=None>```
|
||||
* ```fx 旋转 <角度=45>```
|
||||
* ```fx 透视变换 <变换矩阵>```
|
||||
* ```fx 裁剪 <左=0> <上=0> <右=100> <下=100>(百分比)```
|
||||
@ -61,9 +62,15 @@ fx [滤镜名称] <参数1> <参数2> ...
|
||||
* ```fx 波纹 <振幅=5> <波长=20>```
|
||||
* ```fx 光学补偿 <数量=100> <反转=false>```
|
||||
* ```fx 球面化 <强度=0.5>```
|
||||
* ```fx 镜像 <角度=90>```
|
||||
* ```fx 水平翻转```
|
||||
* ```fx 垂直翻转```
|
||||
* ```fx 复制 <目标位置=(100,100)> <缩放=1.0> <源区域=(0,0,100,100)>(百分比)```
|
||||
|
||||
### 特殊效果滤镜
|
||||
* ```fx 色键 <目标颜色="rgb(255,0,0)"> <容差=60>```
|
||||
* ```fx 晃动 <最大偏移量=5> <运动模糊=False>```
|
||||
* ```fx 动图 <帧率=10>```
|
||||
|
||||
## 颜色名称支持
|
||||
- **基本颜色**:红、绿、蓝、黄、紫、黑、白、橙、粉、灰、青、靛、棕
|
||||
|
||||
@ -4,9 +4,11 @@ from io import BytesIO
|
||||
|
||||
from inspect import signature
|
||||
|
||||
from konabot.common.nb.extract_image import DepImageBytes, DepPILImage
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.nb.extract_image import DepImageBytesOrNone
|
||||
from nonebot.adapters import Event as BaseEvent
|
||||
from nonebot import on_message, logger
|
||||
from returns.result import Failure, Result, Success
|
||||
|
||||
from nonebot_plugin_alconna import (
|
||||
UniMessage,
|
||||
@ -17,12 +19,9 @@ from konabot.plugins.fx_process.fx_manager import ImageFilterManager
|
||||
|
||||
from PIL import Image, ImageSequence
|
||||
|
||||
@dataclass
|
||||
class FilterItem:
|
||||
filter: callable
|
||||
args: list
|
||||
from konabot.plugins.fx_process.types import FilterItem, ImageRequireSignal, ImagesListRequireSignal, SenderInfo
|
||||
|
||||
def prase_input_args(input_str: str) -> list[FilterItem]:
|
||||
def prase_input_args(input_str: str, sender_info: SenderInfo = None) -> list[FilterItem]:
|
||||
# 按分号或换行符分割参数
|
||||
args = []
|
||||
for part in input_str.replace('\n', ';').split(';'):
|
||||
@ -41,7 +40,7 @@ def prase_input_args(input_str: str) -> list[FilterItem]:
|
||||
# 从 args 提取参数,并转换为适当类型
|
||||
func_args = []
|
||||
for i in range(0, min(len(input_filter_args), max_params)):
|
||||
# 尝试将参数转换为函数签名中对应的类型
|
||||
# 尝试将参数转换为函数签名中对应的类型,并检测是不是 Image 类型,如果有则表示多个图像输入
|
||||
param = list(sig.parameters.values())[i + 1]
|
||||
param_type = param.annotation
|
||||
arg_value = input_filter_args[i]
|
||||
@ -50,40 +49,103 @@ def prase_input_args(input_str: str) -> list[FilterItem]:
|
||||
converted_value = float(arg_value)
|
||||
elif param_type is int:
|
||||
converted_value = int(arg_value)
|
||||
elif param_type is bool:
|
||||
converted_value = arg_value.lower() in ['true', '1', 'yes', '是', '开']
|
||||
elif param_type is Image.Image:
|
||||
converted_value = ImageRequireSignal()
|
||||
elif param_type is SenderInfo:
|
||||
converted_value = sender_info
|
||||
elif param_type is list[Image.Image]:
|
||||
converted_value = ImagesListRequireSignal()
|
||||
else:
|
||||
converted_value = arg_value
|
||||
except Exception:
|
||||
converted_value = arg_value
|
||||
func_args.append(converted_value)
|
||||
args.append(FilterItem(filter=filter_func, args=func_args))
|
||||
args.append(FilterItem(name=filter_name,filter=filter_func, args=func_args))
|
||||
return args
|
||||
|
||||
def apply_filters_to_image(img: Image, filters: list[FilterItem]) -> Image:
|
||||
def handle_filters_to_image(images: list[Image.Image], filters: list[FilterItem]) -> Image.Image:
|
||||
for filter_item in filters:
|
||||
filter_func = filter_item.filter
|
||||
func_args = filter_item.args
|
||||
img = filter_func(img, *func_args)
|
||||
return img
|
||||
# 检测参数中是否有 ImageRequireSignal,如果有则传入对应数量的图像列表
|
||||
if any(isinstance(arg, ImageRequireSignal) for arg in func_args):
|
||||
# 替换 ImageRequireSignal 为 images 对应索引的图像
|
||||
actual_args = []
|
||||
img_signal_count = 1 # 从 images[1] 开始取图像
|
||||
for arg in func_args:
|
||||
if isinstance(arg, ImageRequireSignal):
|
||||
actual_args.append(images[img_signal_count])
|
||||
img_signal_count += 1
|
||||
else:
|
||||
actual_args.append(arg)
|
||||
func_args = actual_args
|
||||
# 检测参数中是否有 ImagesListRequireSignal,如果有则传入整个图像列表
|
||||
if any(isinstance(arg, ImagesListRequireSignal) for arg in func_args):
|
||||
actual_args = []
|
||||
for arg in func_args:
|
||||
if isinstance(arg, ImagesListRequireSignal):
|
||||
actual_args.append(images)
|
||||
else:
|
||||
actual_args.append(arg)
|
||||
func_args = actual_args
|
||||
|
||||
async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem]) -> BytesIO:
|
||||
images[0] = filter_func(images[0], *func_args)
|
||||
return images[0]
|
||||
|
||||
async def apply_filters_to_images(images: list[Image.Image], filters: list[FilterItem]) -> BytesIO:
|
||||
# 如果第一项是“加载图像”参数,那么就加载图像
|
||||
if filters and filters[0].name == "加载图像":
|
||||
load_filter = filters.pop(0)
|
||||
# 加载全部路径
|
||||
for path in load_filter.args:
|
||||
img = Image.open(path)
|
||||
images.append(img)
|
||||
|
||||
if len(images) <= 0:
|
||||
raise ValueError("没有提供任何图像进行处理")
|
||||
|
||||
# 检测是否需要将静态图视作动图处理
|
||||
frozen_to_move = any(
|
||||
filter_item.name == "动图"
|
||||
for filter_item in filters
|
||||
)
|
||||
static_fps = 10
|
||||
# 找到动图参数 fps
|
||||
if frozen_to_move:
|
||||
for filter_item in filters:
|
||||
if filter_item.name == "动图" and filter_item.args:
|
||||
try:
|
||||
static_fps = int(filter_item.args[0])
|
||||
except Exception:
|
||||
static_fps = 10
|
||||
break
|
||||
# 如果 image 是动图,则逐帧处理
|
||||
img = Image.open(BytesIO(image_bytes))
|
||||
img = images[0]
|
||||
logger.debug("开始图像处理")
|
||||
output = BytesIO()
|
||||
if getattr(img, "is_animated", False):
|
||||
if getattr(img, "is_animated", False) or frozen_to_move:
|
||||
frames = []
|
||||
all_frames = []
|
||||
for frame in ImageSequence.Iterator(img):
|
||||
frame_copy = frame.copy()
|
||||
all_frames.append(frame_copy)
|
||||
if getattr(img, "is_animated", False):
|
||||
logger.debug("处理动图帧")
|
||||
for frame in ImageSequence.Iterator(img):
|
||||
frame_copy = frame.copy()
|
||||
all_frames.append(frame_copy)
|
||||
else:
|
||||
# 将静态图视作单帧动图处理,拷贝多份
|
||||
logger.debug("处理静态图为多帧动图")
|
||||
for _ in range(10): # 默认复制10帧
|
||||
all_frames.append(img.copy())
|
||||
img.info['duration'] = int(1000 / static_fps)
|
||||
|
||||
async def process_single_frame(frame: Image.Image, frame_idx: int) -> Image.Image:
|
||||
async def process_single_frame(frame: list[Image.Image], frame_idx: int) -> Image.Image:
|
||||
"""处理单帧的异步函数"""
|
||||
logger.debug(f"开始处理帧 {frame_idx}")
|
||||
result = await asynkio.to_thread(apply_filters_to_image, frame, filters)
|
||||
result = await asynkio.to_thread(handle_filters_to_image, frame, images, filters)
|
||||
logger.debug(f"完成处理帧 {frame_idx}")
|
||||
return result
|
||||
|
||||
return result[0]
|
||||
# 并发处理所有帧
|
||||
tasks = []
|
||||
for i, frame in enumerate(all_frames):
|
||||
@ -111,13 +173,12 @@ async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem])
|
||||
)
|
||||
logger.debug("Animated image saved")
|
||||
else:
|
||||
img = apply_filters_to_image(img, filters)
|
||||
img = handle_filters_to_image(images=images, filters=filters)
|
||||
img.save(output, format="PNG")
|
||||
logger.debug("Image processing completed")
|
||||
output.seek(0)
|
||||
return output
|
||||
|
||||
|
||||
def is_fx_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
|
||||
txt = msg.extract_plain_text()
|
||||
if "fx" not in txt[:3].lower():
|
||||
@ -127,14 +188,28 @@ def is_fx_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
|
||||
fx_on = on_message(rule=is_fx_mentioned)
|
||||
|
||||
@fx_on.handle()
|
||||
async def _(msg: UniMsg, event: BaseEvent, img: DepImageBytes):
|
||||
async def _(msg: UniMsg, event: BaseEvent, target: DepLongTaskTarget, image_data: DepImageBytesOrNone = None):
|
||||
preload_imgs = []
|
||||
# 提取图像
|
||||
try:
|
||||
if image_data is not None:
|
||||
preload_imgs.append(Image.open(BytesIO(image_data)))
|
||||
logger.debug("Image extracted for FX processing.")
|
||||
except Exception:
|
||||
logger.info("No image found in message for FX processing.")
|
||||
args = msg.extract_plain_text().split()
|
||||
if len(args) < 2:
|
||||
return
|
||||
filters = prase_input_args(msg.extract_plain_text()[2:])
|
||||
|
||||
sender_info = SenderInfo(
|
||||
group_id=target.channel_id,
|
||||
qq_id=target.target_id
|
||||
)
|
||||
|
||||
filters = prase_input_args(msg.extract_plain_text()[2:], sender_info=sender_info)
|
||||
if not filters:
|
||||
return
|
||||
output = await apply_filters_to_bytes(img, filters)
|
||||
output = await apply_filters_to_images(preload_imgs, filters)
|
||||
logger.debug("FX processing completed, sending result.")
|
||||
await fx_on.send(await UniMessage().image(raw=output).export())
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import random
|
||||
from PIL import Image, ImageFilter
|
||||
from PIL import ImageEnhance
|
||||
from PIL import ImageChops
|
||||
@ -10,6 +11,9 @@ import math
|
||||
from konabot.plugins.fx_process.gradient import GradientGenerator
|
||||
import numpy as np
|
||||
|
||||
from konabot.plugins.fx_process.image_storage import ImageStorager
|
||||
from konabot.plugins.fx_process.types import SenderInfo
|
||||
|
||||
class ImageFilterImplement:
|
||||
@staticmethod
|
||||
def apply_blur(image: Image.Image, radius: float = 10) -> Image.Image:
|
||||
@ -137,9 +141,22 @@ class ImageFilterImplement:
|
||||
|
||||
# 缩放
|
||||
@staticmethod
|
||||
def apply_resize(image: Image.Image, scale: float = 1.5) -> Image.Image:
|
||||
if scale <= 0:
|
||||
scale = 1.0
|
||||
def apply_resize(image: Image.Image, scale: float = 1.5, scale_y = None) -> Image.Image:
|
||||
# scale 可以为负
|
||||
# 如果 scale 为负,则代表翻转
|
||||
if scale_y is not None:
|
||||
if float(scale_y) < 0:
|
||||
image = ImageOps.flip(image)
|
||||
scale_y = abs(float(scale_y))
|
||||
if scale < 0:
|
||||
image = ImageOps.mirror(image)
|
||||
scale = abs(scale)
|
||||
new_size = (int(image.width * scale), int(image.height * float(scale_y)))
|
||||
return image.resize(new_size, Image.Resampling.LANCZOS)
|
||||
if scale < 0:
|
||||
image = ImageOps.mirror(image)
|
||||
image = ImageOps.flip(image)
|
||||
scale = abs(scale)
|
||||
new_size = (int(image.width * scale), int(image.height * scale))
|
||||
return image.resize(new_size, Image.Resampling.LANCZOS)
|
||||
|
||||
@ -224,7 +241,7 @@ class ImageFilterImplement:
|
||||
|
||||
# 发光
|
||||
@staticmethod
|
||||
def apply_glow(image: Image.Image, intensity: float = 1.5, blur_radius: float = 15) -> Image.Image:
|
||||
def apply_glow(image: Image.Image, intensity: float = 0.5, blur_radius: float = 15) -> Image.Image:
|
||||
if image.mode != 'RGBA':
|
||||
image = image.convert('RGBA')
|
||||
# 创建发光图层
|
||||
@ -423,6 +440,8 @@ class ImageFilterImplement:
|
||||
# 裁剪
|
||||
@staticmethod
|
||||
def apply_crop(image: Image.Image, left: float = 0, upper: float = 0, right: float = 100, lower: float = 100) -> Image.Image:
|
||||
if image.mode != 'RGBA':
|
||||
image = image.convert('RGBA')
|
||||
# 按百分比裁剪
|
||||
width, height = image.size
|
||||
left_px = int(width * left / 100)
|
||||
@ -451,9 +470,10 @@ class ImageFilterImplement:
|
||||
arr = np.array(image)
|
||||
noise = np.random.randint(0, 256, arr.shape, dtype=np.uint8)
|
||||
|
||||
# 为每个像素创建掩码,然后扩展到所有通道
|
||||
# 为每个像素创建掩码,然后扩展到所有通道,除了 alpha 通道
|
||||
mask = np.random.rand(*arr.shape[:2]) < amount
|
||||
mask_3d = mask[:, :, np.newaxis] # 添加第三个维度
|
||||
mask_3d = np.repeat(mask[:, :, np.newaxis], 4, axis=2)
|
||||
mask_3d[:, :, 3] = False # 保持 alpha 通道不变
|
||||
|
||||
# 混合噪点
|
||||
result = np.where(mask_3d, noise, arr)
|
||||
@ -545,8 +565,8 @@ class ImageFilterImplement:
|
||||
def apply_shadow(image: Image.Image,
|
||||
x_offset: int = 10,
|
||||
y_offset: int = 10,
|
||||
blur = 10,
|
||||
opacity = 0.5,
|
||||
blur: float = 10,
|
||||
opacity: float = 0.5,
|
||||
shadow_color = "black") -> Image.Image:
|
||||
if image.mode != 'RGBA':
|
||||
image = image.convert('RGBA')
|
||||
@ -887,4 +907,205 @@ class ImageFilterImplement:
|
||||
result[:, :, c] = arr[:, :, c] * (1 - distance) + blurred_arr[:, :, c] * distance
|
||||
|
||||
return Image.fromarray(np.clip(result, 0, 255).astype(np.uint8), 'RGBA')
|
||||
|
||||
# 复制画面
|
||||
@staticmethod
|
||||
def copy_area(image: Image.Image,
|
||||
dst_move: str = '(100,100)',
|
||||
scale: float = 1.0,
|
||||
src_area: str = '(0,0,100,100)') -> Image.Image:
|
||||
if image.mode != 'RGBA':
|
||||
image = image.convert('RGBA')
|
||||
result = image.copy()
|
||||
# 参数表示的是百分比
|
||||
width, height = image.size
|
||||
# 首先裁剪出源区域
|
||||
src_tuple = eval(src_area)
|
||||
src_area = ImageFilterImplement.apply_crop(image, src_tuple[0], src_tuple[1], src_tuple[2], src_tuple[3])
|
||||
# 计算目标位置
|
||||
dst_tuple = eval(dst_move)
|
||||
dst_x = int(width * dst_tuple[0] / 100)
|
||||
dst_y = int(height * dst_tuple[1] / 100)
|
||||
# 缩放源区域
|
||||
if scale != 1.0:
|
||||
new_width = int(src_area.width * scale)
|
||||
new_height = int(src_area.height * scale)
|
||||
src_area = src_area.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
||||
# 粘贴到目标位置
|
||||
result.paste(src_area, (dst_x, dst_y), src_area)
|
||||
return result
|
||||
|
||||
# 水平翻转
|
||||
@staticmethod
|
||||
def apply_flip_horizontal(image: Image.Image) -> Image.Image:
|
||||
return image.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
|
||||
|
||||
# 垂直翻转
|
||||
@staticmethod
|
||||
def apply_flip_vertical(image: Image.Image) -> Image.Image:
|
||||
return image.transpose(Image.Transpose.FLIP_TOP_BOTTOM)
|
||||
|
||||
# 依方向镜像一半画面
|
||||
@staticmethod
|
||||
def apply_mirror_half(image: Image.Image, angle: float = 90) -> Image.Image:
|
||||
if image.mode != 'RGBA':
|
||||
image = image.convert('RGBA')
|
||||
|
||||
width, height = image.size
|
||||
arr = np.array(image, dtype=np.uint8)
|
||||
|
||||
# 角度标准化
|
||||
angle = angle % 360
|
||||
|
||||
# 计算折痕的斜率
|
||||
# 注意:我们的角度是折痕与水平线的夹角
|
||||
# 0° = 垂直线,90° = 水平线
|
||||
rad = math.radians(angle)
|
||||
|
||||
# 计算中心点
|
||||
center_x = width / 2
|
||||
center_y = height / 2
|
||||
|
||||
# 创建坐标网格
|
||||
y_coords, x_coords = np.mgrid[0:height, 0:width]
|
||||
|
||||
# 计算折痕的方向向量
|
||||
# 折痕方向垂直于法向量
|
||||
# 对于角度θ,折痕方向向量为(cosθ, sinθ)
|
||||
direction_x = math.cos(rad)
|
||||
direction_y = math.sin(rad)
|
||||
|
||||
# 计算法向量(垂直于折痕)
|
||||
# 旋转90度:(-sinθ, cosθ)
|
||||
normal_x = -direction_y
|
||||
normal_y = direction_x
|
||||
|
||||
# 计算每个像素相对于折痕的位置
|
||||
# 点到直线的有向距离: (x-cx)*nx + (y-cy)*ny
|
||||
rel_x = x_coords - center_x
|
||||
rel_y = y_coords - center_y
|
||||
signed_distance = rel_x * normal_x + rel_y * normal_y
|
||||
|
||||
source_mask = signed_distance >= 0
|
||||
|
||||
# 创建结果图像
|
||||
result = arr.copy()
|
||||
|
||||
# 获取镜像侧的像素(需要被替换的像素)
|
||||
mirror_mask = ~source_mask
|
||||
|
||||
if mirror_mask.any():
|
||||
# 对于镜像侧的每个像素,计算其关于折痕的镜像点
|
||||
mirror_x = x_coords[mirror_mask]
|
||||
mirror_y = y_coords[mirror_mask]
|
||||
|
||||
# 计算相对坐标
|
||||
mirror_rel_x = mirror_x - center_x
|
||||
mirror_rel_y = mirror_y - center_y
|
||||
|
||||
# 计算到折痕的距离(带符号)
|
||||
dist_to_line = signed_distance[mirror_mask]
|
||||
|
||||
# 计算镜像点的相对坐标
|
||||
# 镜像公式: p' = p - 2 * (p·n) * n
|
||||
# 其中p是相对中心点的向量,n是单位法向量
|
||||
mirror_target_rel_x = mirror_rel_x - 2 * dist_to_line * normal_x
|
||||
mirror_target_rel_y = mirror_rel_y - 2 * dist_to_line * normal_y
|
||||
|
||||
# 计算镜像点的绝对坐标
|
||||
target_x = np.round(center_x + mirror_target_rel_x).astype(int)
|
||||
target_y = np.round(center_y + mirror_target_rel_y).astype(int)
|
||||
|
||||
# 确保坐标在图像范围内
|
||||
valid = (target_x >= 0) & (target_x < width) & (target_y >= 0) & (target_y < height)
|
||||
|
||||
if valid.any():
|
||||
# 将源侧的像素复制到镜像侧的位置
|
||||
valid_target_x = target_x[valid]
|
||||
valid_target_y = target_y[valid]
|
||||
valid_mirror_x = mirror_x[valid]
|
||||
valid_mirror_y = mirror_y[valid]
|
||||
|
||||
# 获取源侧的像素值
|
||||
source_pixels = arr[valid_target_y, valid_target_x]
|
||||
|
||||
# 复制到镜像位置
|
||||
result[valid_mirror_y, valid_mirror_x] = source_pixels
|
||||
|
||||
return Image.fromarray(result, 'RGBA')
|
||||
|
||||
# 随机画面整体晃动,对于动图会很有效
|
||||
@staticmethod
|
||||
def apply_random_wiggle(image: Image.Image, max_offset: int = 5, motion_blur = False) -> Image.Image:
|
||||
if image.mode != 'RGBA':
|
||||
image = image.convert('RGBA')
|
||||
|
||||
width, height = image.size
|
||||
arr = np.array(image)
|
||||
# 生成随机偏移
|
||||
x_offset = random.randint(-max_offset, max_offset)
|
||||
y_offset = random.randint(-max_offset, max_offset)
|
||||
# 画面平移
|
||||
translated_image = ImageFilterImplement.apply_translate(image, x_offset, y_offset)
|
||||
if motion_blur:
|
||||
# 添加运动模糊
|
||||
translated_image = ImageFilterImplement.apply_directional_blur(translated_image,
|
||||
angle=random.uniform(0, 360),
|
||||
distance=max_offset,
|
||||
samples=6)
|
||||
return translated_image
|
||||
|
||||
# 像素随机抖动
|
||||
@staticmethod
|
||||
def apply_pixel_jitter(image: Image.Image, max_offset: int = 2) -> Image.Image:
|
||||
if image.mode != 'RGBA':
|
||||
image = image.convert('RGBA')
|
||||
|
||||
width, height = image.size
|
||||
arr = np.array(image)
|
||||
|
||||
# 创建结果图像
|
||||
result = np.zeros_like(arr)
|
||||
|
||||
# numpy 向量化随机偏移
|
||||
x_indices = np.arange(width)
|
||||
y_indices = np.arange(height)
|
||||
x_grid, y_grid = np.meshgrid(x_indices, y_indices)
|
||||
x_offsets = np.random.randint(-max_offset, max_offset + 1, size=(height, width))
|
||||
y_offsets = np.random.randint(-max_offset, max_offset + 1, size=(height, width))
|
||||
new_x = np.clip(x_grid + x_offsets, 0, width - 1)
|
||||
new_y = np.clip(y_grid + y_offsets, 0, height - 1)
|
||||
result[y_grid, x_grid] = arr[new_y, new_x]
|
||||
|
||||
return Image.fromarray(result, 'RGBA')
|
||||
|
||||
class ImageFilterEmpty:
|
||||
# 空滤镜,不做任何处理,形式化参数用
|
||||
@staticmethod
|
||||
def empty_filter(image):
|
||||
return image
|
||||
|
||||
@staticmethod
|
||||
def empty_filter_param(image, param = 10):
|
||||
return image
|
||||
|
||||
class ImageFilterStorage:
|
||||
# 用于存储图像
|
||||
@staticmethod
|
||||
def store_image(image: Image.Image, name: str, sender_info: SenderInfo) -> Image.Image:
|
||||
ImageStorager.save_image(image, name, sender_info.group_id, sender_info.qq_id)
|
||||
return image
|
||||
|
||||
# 用于暂存图像
|
||||
@staticmethod
|
||||
def temp_store_image(image: Image.Image, images: list[Image.Image]) -> Image.Image:
|
||||
images.append(image)
|
||||
return image
|
||||
|
||||
# 用于读取图像
|
||||
@staticmethod
|
||||
def load_image(image: Image.Image, name: str, images: list[Image.Image], sender_info: SenderInfo) -> Image.Image:
|
||||
loaded_image = ImageStorager.load_image(name, sender_info.group_id, sender_info.qq_id)
|
||||
if loaded_image is not None:
|
||||
images.append(loaded_image)
|
||||
return image
|
||||
@ -1,5 +1,5 @@
|
||||
from typing import Optional
|
||||
from konabot.plugins.fx_process.fx_handle import ImageFilterImplement
|
||||
from konabot.plugins.fx_process.fx_handle import ImageFilterEmpty, ImageFilterImplement, ImageFilterStorage
|
||||
|
||||
class ImageFilterManager:
|
||||
filter_map = {
|
||||
@ -40,6 +40,16 @@ class ImageFilterManager:
|
||||
"方向模糊": ImageFilterImplement.apply_directional_blur,
|
||||
"边缘模糊": ImageFilterImplement.apply_focus_blur,
|
||||
"缩放模糊": ImageFilterImplement.apply_zoom_blur,
|
||||
"镜像": ImageFilterImplement.apply_mirror_half,
|
||||
"水平翻转": ImageFilterImplement.apply_flip_horizontal,
|
||||
"垂直翻转": ImageFilterImplement.apply_flip_vertical,
|
||||
"复制": ImageFilterImplement.copy_area,
|
||||
"晃动": ImageFilterImplement.apply_random_wiggle,
|
||||
"动图": ImageFilterEmpty.empty_filter_param,
|
||||
"像素抖动": ImageFilterImplement.apply_pixel_jitter,
|
||||
"存入图像": ImageFilterStorage.store_image,
|
||||
"读取图像": ImageFilterStorage.load_image,
|
||||
"暂存图像": ImageFilterStorage.temp_store_image,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
@ -48,4 +58,6 @@ class ImageFilterManager:
|
||||
|
||||
@classmethod
|
||||
def has_filter(cls, name: str) -> bool:
|
||||
return name in cls.filter_map
|
||||
return name in cls.filter_map
|
||||
|
||||
|
||||
150
konabot/plugins/fx_process/image_storage.py
Normal file
150
konabot/plugins/fx_process/image_storage.py
Normal file
@ -0,0 +1,150 @@
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from hashlib import md5
|
||||
import time
|
||||
|
||||
from nonebot import logger
|
||||
from nonebot_plugin_apscheduler import driver
|
||||
from konabot.common.path import DATA_PATH
|
||||
import os
|
||||
from PIL import Image
|
||||
|
||||
IMAGE_PATH = DATA_PATH / "temp" / "images"
|
||||
|
||||
@dataclass
|
||||
class ImageResource:
|
||||
name: str
|
||||
expire: int
|
||||
|
||||
@dataclass
|
||||
class StorageImage:
|
||||
name: str
|
||||
resources: dict[str,
|
||||
dict[str,ImageResource]] # {群号: {QQ号: ImageResource}}
|
||||
|
||||
class ImageStorager:
|
||||
images_pool: dict[str,StorageImage] = {}
|
||||
|
||||
max_storage: int = 10 * 1024 * 1024 # 最大存储10MB
|
||||
max_image_count: int = 200 # 最大存储图片数量
|
||||
|
||||
@staticmethod
|
||||
def init():
|
||||
if not IMAGE_PATH.exists():
|
||||
IMAGE_PATH.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
@staticmethod
|
||||
def delete_path_image(name: str):
|
||||
resource_path = IMAGE_PATH / name
|
||||
if resource_path.exists():
|
||||
os.remove(resource_path)
|
||||
|
||||
@classmethod
|
||||
async def clear_expire_image(cls):
|
||||
# 清理过期的图片资源,将未被删除的放入列表中,如果超过最大数量则删除最早过期的
|
||||
remaining_images = []
|
||||
current_time = time.time()
|
||||
for name, storage_image in list(ImageStorager.images_pool.items()):
|
||||
for group_id, resources in list(storage_image.resources.items()):
|
||||
for qq_id, resource in list(resources.items()):
|
||||
if resource.expire < current_time:
|
||||
del storage_image.resources[group_id][qq_id]
|
||||
cls.delete_path_image(name)
|
||||
else:
|
||||
remaining_images.append((name, group_id, qq_id, resource.expire))
|
||||
if not storage_image.resources:
|
||||
del ImageStorager.images_pool[name]
|
||||
# 如果剩余图片超过最大数量,按过期时间排序并删除最早过期的
|
||||
if len(remaining_images) > ImageStorager.max_image_count:
|
||||
remaining_images.sort(key=lambda x: x[3]) # 按过期时间排序
|
||||
to_delete = len(remaining_images) - ImageStorager.max_image_count
|
||||
for i in range(to_delete):
|
||||
name, group_id, qq_id, _ = remaining_images[i]
|
||||
resource = ImageStorager.images_pool[name].resources[group_id][qq_id]
|
||||
del ImageStorager.images_pool[name].resources[group_id][qq_id]
|
||||
cls.delete_path_image(name)
|
||||
|
||||
@classmethod
|
||||
def _add_to_pool(cls, image: bytes, name: str, group_id: str, qq_id: str, expire: int = 36000):
|
||||
expire_time = time.time() + expire
|
||||
if name not in cls.images_pool:
|
||||
cls.images_pool[name] = StorageImage(name=name,resources={})
|
||||
if group_id not in cls.images_pool[name].resources:
|
||||
cls.images_pool[name].resources[group_id] = {}
|
||||
cls.images_pool[name].resources[group_id][qq_id] = ImageResource(name=name, expire=expire_time)
|
||||
|
||||
@classmethod
|
||||
def save_image(cls, image: bytes, name: str, group_id: str, qq_id: str) -> None:
|
||||
"""
|
||||
以哈希值命名保存图片,并返回图片资源信息
|
||||
"""
|
||||
# 检测图像大小,不得超过 10 MB
|
||||
if len(image) > cls.max_storage:
|
||||
raise ValueError("图片大小超过 10 MB 限制")
|
||||
hash_name = md5(image).hexdigest()
|
||||
ext = os.path.splitext(name)[1]
|
||||
file_name = f"{hash_name}{ext}"
|
||||
full_path = IMAGE_PATH / file_name
|
||||
with open(full_path, "wb") as f:
|
||||
f.write(image)
|
||||
# 将文件写入 images_pool
|
||||
cls._add_to_pool(image, file_name, group_id, qq_id)
|
||||
|
||||
@classmethod
|
||||
def load_image(cls, name: str, group_id: str, qq_id: str) -> Image:
|
||||
if name not in cls.images_pool:
|
||||
return None
|
||||
if group_id not in cls.images_pool[name].resources:
|
||||
return None
|
||||
# 寻找对应 QQ 号 的资源,如果没有就返回相同群下的第一个资源
|
||||
if qq_id not in cls.images_pool[name].resources[group_id]:
|
||||
first_qq_id = next(iter(cls.images_pool[name].resources[group_id]))
|
||||
qq_id = first_qq_id
|
||||
resource = cls.images_pool[name].resources[group_id][qq_id]
|
||||
resource_path = IMAGE_PATH / resource.name
|
||||
return Image.open(resource_path)
|
||||
|
||||
class ImageStoragerManager:
|
||||
def __init__(self, interval: int = 300): # 默认 5 分钟执行一次
|
||||
self.interval = interval
|
||||
self._clear_task = None
|
||||
self._running = False
|
||||
|
||||
async def start_auto_clear(self):
|
||||
"""启动自动任务"""
|
||||
self._running = True
|
||||
self._clear_task = asyncio.create_task(self._auto_clear_loop())
|
||||
|
||||
logger.info(f"自动清理任务已启动,间隔: {self.interval}秒")
|
||||
|
||||
async def stop_auto_clear(self):
|
||||
"""停止自动清理任务"""
|
||||
if self._clear_task:
|
||||
self._running = False
|
||||
self._clear_task.cancel()
|
||||
try:
|
||||
await self._clear_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
logger.info("自动清理任务已停止")
|
||||
else:
|
||||
logger.warning("没有正在运行的自动清理任务")
|
||||
|
||||
async def _auto_clear_loop(self):
|
||||
"""自动清理循环"""
|
||||
while self._running:
|
||||
try:
|
||||
await asyncio.sleep(self.interval)
|
||||
await ImageStorager.clear_expire_image()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"定时清理失败: {e}")
|
||||
|
||||
image_manager = ImageStoragerManager(interval=300) # 每5分钟清理一次
|
||||
|
||||
@driver.on_startup
|
||||
async def init_image_storage():
|
||||
ImageStorager.init()
|
||||
# 启用定时任务清理过期图片
|
||||
await image_manager.start_auto_clear()
|
||||
20
konabot/plugins/fx_process/types.py
Normal file
20
konabot/plugins/fx_process/types.py
Normal file
@ -0,0 +1,20 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class FilterItem:
|
||||
name: str
|
||||
filter: callable
|
||||
args: list
|
||||
|
||||
class ImageRequireSignal:
|
||||
pass
|
||||
|
||||
@dataclass
|
||||
class ImagesListRequireSignal:
|
||||
pass
|
||||
|
||||
@dataclass
|
||||
class SenderInfo:
|
||||
group_id: str
|
||||
qq_id: str
|
||||
26
konabot/plugins/notice_ui/__init__.py
Normal file
26
konabot/plugins/notice_ui/__init__.py
Normal file
@ -0,0 +1,26 @@
|
||||
from loguru import logger
|
||||
import nonebot
|
||||
from nonebot.adapters import Event as BaseEvent
|
||||
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
||||
from nonebot_plugin_alconna import (
|
||||
UniMessage,
|
||||
UniMsg
|
||||
)
|
||||
from konabot.plugins.notice_ui.notice import NoticeUI
|
||||
from nonebot_plugin_alconna import on_alconna, Alconna, Args
|
||||
|
||||
evt = on_alconna(Alconna(
|
||||
"notice",
|
||||
Args["title", str],
|
||||
Args["message", str]
|
||||
),
|
||||
use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
|
||||
)
|
||||
|
||||
@evt.handle()
|
||||
async def _(title: str, message: str, msg: UniMsg, event: BaseEvent):
|
||||
logger.debug(f"Received notice command with title: {title}, message: {message}")
|
||||
|
||||
out = await NoticeUI.render_notice(title, message)
|
||||
|
||||
await evt.send(await UniMessage().image(raw=out).export())
|
||||
77
konabot/plugins/notice_ui/notice.py
Normal file
77
konabot/plugins/notice_ui/notice.py
Normal file
@ -0,0 +1,77 @@
|
||||
from io import BytesIO
|
||||
from PIL import Image
|
||||
from konabot.common.web_render import konaweb
|
||||
from konabot.common.web_render.core import WebRenderer
|
||||
|
||||
from playwright.async_api import Page
|
||||
|
||||
class NoticeUI:
|
||||
@staticmethod
|
||||
async def render_notice(title: str, message: str) -> bytes:
|
||||
"""
|
||||
渲染一个通知图片,包含标题和消息内容。
|
||||
"""
|
||||
async def page_function(page: Page):
|
||||
# 直到 setMaskMode 函数加载完成
|
||||
await page.wait_for_function("typeof setMaskMode === 'function'", timeout=1000)
|
||||
await page.evaluate('setMaskMode(false)')
|
||||
# 直到 setContent 函数加载完成
|
||||
await page.wait_for_function("typeof setContent === 'function'", timeout=1000)
|
||||
# 设置标题和消息内容
|
||||
await page.evaluate(f'setContent("{title}", "{message}")')
|
||||
|
||||
async def mask_function(page: Page):
|
||||
# 直到 setContent 函数加载完成
|
||||
await page.wait_for_function("typeof setContent === 'function'", timeout=1000)
|
||||
# 设置标题和消息内容
|
||||
await page.evaluate(f'setContent("{title}", "{message}")')
|
||||
# 直到 setMaskMode 函数加载完成
|
||||
await page.wait_for_function("typeof setMaskMode === 'function'", timeout=1000)
|
||||
await page.evaluate('setMaskMode(true)')
|
||||
|
||||
image_bytes = await WebRenderer.render_with_persistent_page(
|
||||
"notice_renderer",
|
||||
konaweb('notice'),
|
||||
target='#main',
|
||||
other_function=page_function,
|
||||
)
|
||||
|
||||
mask_bytes = await WebRenderer.render_with_persistent_page(
|
||||
"notice_renderer",
|
||||
konaweb('notice'),
|
||||
target='#main',
|
||||
other_function=mask_function)
|
||||
|
||||
image = Image.open(BytesIO(image_bytes)).convert("RGBA")
|
||||
mask = Image.open(BytesIO(mask_bytes)).convert("L")
|
||||
|
||||
# 使用mask作为alpha通道
|
||||
r, g, b, _ = image.split()
|
||||
transparent_image = Image.merge("RGBA", (r, g, b, mask))
|
||||
|
||||
# 先创建一个纯白色背景,然后粘贴透明图像
|
||||
background = Image.new("RGBA", transparent_image.size, (255, 255, 255, 255))
|
||||
composite = Image.alpha_composite(background, transparent_image)
|
||||
|
||||
palette_img = composite.convert("RGB").convert(
|
||||
"P",
|
||||
palette=Image.Palette.WEB,
|
||||
colors=256,
|
||||
dither=Image.Dither.NONE
|
||||
)
|
||||
|
||||
# 将alpha值小于128的设为透明
|
||||
alpha_mask = mask.point(lambda x: 0 if x < 128 else 255)
|
||||
|
||||
# 保存为GIF
|
||||
output_buffer = BytesIO()
|
||||
palette_img.save(
|
||||
output_buffer,
|
||||
format="GIF",
|
||||
transparency=0, # 将索引0设为透明
|
||||
disposal=2,
|
||||
loop=0
|
||||
)
|
||||
|
||||
output_buffer.seek(0)
|
||||
return output_buffer.getvalue()
|
||||
@ -6,6 +6,7 @@ from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import nanoid
|
||||
from konabot.plugins.notice_ui.notice import NoticeUI
|
||||
import nonebot
|
||||
from loguru import logger
|
||||
from nonebot import get_plugin_config, on_message
|
||||
@ -107,6 +108,11 @@ async def _(task: LongTask):
|
||||
await task.target.send_message(
|
||||
UniMessage().text(f"代办提醒:{message}")
|
||||
)
|
||||
notice_bytes = await NoticeUI.render_notice("代办提醒", message)
|
||||
await task.target.send_message(
|
||||
UniMessage().image(raw=notice_bytes),
|
||||
at=False
|
||||
)
|
||||
async with DATA_FILE_LOCK:
|
||||
data = load_notify_config()
|
||||
if (chan := data.notify_channels.get(task.target.target_id)) is not None:
|
||||
|
||||
Reference in New Issue
Block a user