Compare commits

...

3 Commits

Author SHA1 Message Date
54fae88914 待完善 2025-12-09 00:02:26 +08:00
eed21e6223 new
All checks were successful
continuous-integration/drone/push Build is passing
2025-12-03 22:38:50 +08:00
bf5c10b7a7 notice test
All checks were successful
continuous-integration/drone/push Build is passing
2025-12-03 22:23:44 +08:00
8 changed files with 304 additions and 27 deletions

View File

@ -208,5 +208,8 @@ async def _ext_img(
return None
DepImageBytes = Annotated[bytes, nonebot.params.Depends(_ext_img_data)]
DepPILImage = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)]
DepImageBytesOrNone = Annotated[bytes | None, nonebot.params.Depends(_ext_img_data)]

View File

@ -4,9 +4,11 @@ from io import BytesIO
from inspect import signature
from konabot.common.nb.extract_image import DepImageBytes, DepPILImage
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.extract_image import DepImageBytesOrNone
from nonebot.adapters import Event as BaseEvent
from nonebot import on_message, logger
from returns.result import Failure, Result, Success
from nonebot_plugin_alconna import (
UniMessage,
@ -17,13 +19,9 @@ from konabot.plugins.fx_process.fx_manager import ImageFilterManager
from PIL import Image, ImageSequence
@dataclass
class FilterItem:
name: str
filter: callable
args: list
from konabot.plugins.fx_process.types import FilterItem, ImageRequireSignal, ImagesListRequireSignal, SenderInfo
def prase_input_args(input_str: str) -> list[FilterItem]:
def prase_input_args(input_str: str, sender_info: SenderInfo = None) -> list[FilterItem]:
# 按分号或换行符分割参数
args = []
for part in input_str.replace('\n', ';').split(';'):
@ -42,7 +40,7 @@ def prase_input_args(input_str: str) -> list[FilterItem]:
# 从 args 提取参数,并转换为适当类型
func_args = []
for i in range(0, min(len(input_filter_args), max_params)):
# 尝试将参数转换为函数签名中对应的类型
# 尝试将参数转换为函数签名中对应的类型,并检测是不是 Image 类型,如果有则表示多个图像输入
param = list(sig.parameters.values())[i + 1]
param_type = param.annotation
arg_value = input_filter_args[i]
@ -51,6 +49,14 @@ def prase_input_args(input_str: str) -> list[FilterItem]:
converted_value = float(arg_value)
elif param_type is int:
converted_value = int(arg_value)
elif param_type is bool:
converted_value = arg_value.lower() in ['true', '1', 'yes', '', '']
elif param_type is Image.Image:
converted_value = ImageRequireSignal()
elif param_type is SenderInfo:
converted_value = sender_info
elif param_type is list[Image.Image]:
converted_value = ImagesListRequireSignal()
else:
converted_value = arg_value
except Exception:
@ -59,14 +65,47 @@ def prase_input_args(input_str: str) -> list[FilterItem]:
args.append(FilterItem(name=filter_name,filter=filter_func, args=func_args))
return args
def apply_filters_to_image(img: Image, filters: list[FilterItem]) -> Image:
def handle_filters_to_image(images: list[Image.Image], filters: list[FilterItem]) -> Image.Image:
for filter_item in filters:
filter_func = filter_item.filter
func_args = filter_item.args
img = filter_func(img, *func_args)
return img
# 检测参数中是否有 ImageRequireSignal如果有则传入对应数量的图像列表
if any(isinstance(arg, ImageRequireSignal) for arg in func_args):
# 替换 ImageRequireSignal 为 images 对应索引的图像
actual_args = []
img_signal_count = 1 # 从 images[1] 开始取图像
for arg in func_args:
if isinstance(arg, ImageRequireSignal):
actual_args.append(images[img_signal_count])
img_signal_count += 1
else:
actual_args.append(arg)
func_args = actual_args
# 检测参数中是否有 ImagesListRequireSignal如果有则传入整个图像列表
if any(isinstance(arg, ImagesListRequireSignal) for arg in func_args):
actual_args = []
for arg in func_args:
if isinstance(arg, ImagesListRequireSignal):
actual_args.append(images)
else:
actual_args.append(arg)
func_args = actual_args
images[0] = filter_func(images[0], *func_args)
return images[0]
async def apply_filters_to_images(images: list[Image.Image], filters: list[FilterItem]) -> BytesIO:
# 如果第一项是“加载图像”参数,那么就加载图像
if filters and filters[0].name == "加载图像":
load_filter = filters.pop(0)
# 加载全部路径
for path in load_filter.args:
img = Image.open(path)
images.append(img)
if len(images) <= 0:
raise ValueError("没有提供任何图像进行处理")
async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem]) -> BytesIO:
# 检测是否需要将静态图视作动图处理
frozen_to_move = any(
filter_item.name == "动图"
@ -83,7 +122,7 @@ async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem])
static_fps = 10
break
# 如果 image 是动图,则逐帧处理
img = Image.open(BytesIO(image_bytes))
img = images[0]
logger.debug("开始图像处理")
output = BytesIO()
if getattr(img, "is_animated", False) or frozen_to_move:
@ -101,13 +140,12 @@ async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem])
all_frames.append(img.copy())
img.info['duration'] = int(1000 / static_fps)
async def process_single_frame(frame: Image.Image, frame_idx: int) -> Image.Image:
async def process_single_frame(frame: list[Image.Image], frame_idx: int) -> Image.Image:
"""处理单帧的异步函数"""
logger.debug(f"开始处理帧 {frame_idx}")
result = await asynkio.to_thread(apply_filters_to_image, frame, filters)
result = await asynkio.to_thread(handle_filters_to_image, frame, images, filters)
logger.debug(f"完成处理帧 {frame_idx}")
return result
return result[0]
# 并发处理所有帧
tasks = []
for i, frame in enumerate(all_frames):
@ -135,7 +173,7 @@ async def apply_filters_to_bytes(image_bytes: bytes, filters: list[FilterItem])
)
logger.debug("Animated image saved")
else:
img = apply_filters_to_image(img, filters)
img = handle_filters_to_image(images=images, filters=filters)
img.save(output, format="PNG")
logger.debug("Image processing completed")
output.seek(0)
@ -150,14 +188,28 @@ def is_fx_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
fx_on = on_message(rule=is_fx_mentioned)
@fx_on.handle()
async def _(msg: UniMsg, event: BaseEvent, img: DepImageBytes):
async def _(msg: UniMsg, event: BaseEvent, target: DepLongTaskTarget, image_data: DepImageBytesOrNone = None):
preload_imgs = []
# 提取图像
try:
if image_data is not None:
preload_imgs.append(Image.open(BytesIO(image_data)))
logger.debug("Image extracted for FX processing.")
except Exception:
logger.info("No image found in message for FX processing.")
args = msg.extract_plain_text().split()
if len(args) < 2:
return
filters = prase_input_args(msg.extract_plain_text()[2:])
sender_info = SenderInfo(
group_id=target.channel_id,
qq_id=target.target_id
)
filters = prase_input_args(msg.extract_plain_text()[2:], sender_info=sender_info)
if not filters:
return
output = await apply_filters_to_bytes(img, filters)
output = await apply_filters_to_images(preload_imgs, filters)
logger.debug("FX processing completed, sending result.")
await fx_on.send(await UniMessage().image(raw=output).export())

View File

@ -11,6 +11,9 @@ import math
from konabot.plugins.fx_process.gradient import GradientGenerator
import numpy as np
from konabot.plugins.fx_process.image_storage import ImageStorager
from konabot.plugins.fx_process.types import SenderInfo
class ImageFilterImplement:
@staticmethod
def apply_blur(image: Image.Image, radius: float = 10) -> Image.Image:
@ -1085,3 +1088,24 @@ class ImageFilterEmpty:
@staticmethod
def empty_filter_param(image, param = 10):
return image
class ImageFilterStorage:
# 用于存储图像
@staticmethod
def store_image(image: Image.Image, name: str, sender_info: SenderInfo) -> Image.Image:
ImageStorager.save_image(image, name, sender_info.group_id, sender_info.qq_id)
return image
# 用于暂存图像
@staticmethod
def temp_store_image(image: Image.Image, images: list[Image.Image]) -> Image.Image:
images.append(image)
return image
# 用于读取图像
@staticmethod
def load_image(image: Image.Image, name: str, images: list[Image.Image], sender_info: SenderInfo) -> Image.Image:
loaded_image = ImageStorager.load_image(name, sender_info.group_id, sender_info.qq_id)
if loaded_image is not None:
images.append(loaded_image)
return image

View File

@ -1,5 +1,5 @@
from typing import Optional
from konabot.plugins.fx_process.fx_handle import ImageFilterEmpty, ImageFilterImplement
from konabot.plugins.fx_process.fx_handle import ImageFilterEmpty, ImageFilterImplement, ImageFilterStorage
class ImageFilterManager:
filter_map = {
@ -47,6 +47,9 @@ class ImageFilterManager:
"晃动": ImageFilterImplement.apply_random_wiggle,
"动图": ImageFilterEmpty.empty_filter_param,
"像素抖动": ImageFilterImplement.apply_pixel_jitter,
"存入图像": ImageFilterStorage.store_image,
"读取图像": ImageFilterStorage.load_image,
"暂存图像": ImageFilterStorage.temp_store_image,
}
@classmethod

View File

@ -0,0 +1,150 @@
import asyncio
from dataclasses import dataclass
from hashlib import md5
import time
from nonebot import logger
from nonebot_plugin_apscheduler import driver
from konabot.common.path import DATA_PATH
import os
from PIL import Image
IMAGE_PATH = DATA_PATH / "temp" / "images"
@dataclass
class ImageResource:
name: str
expire: int
@dataclass
class StorageImage:
name: str
resources: dict[str,
dict[str,ImageResource]] # {群号: {QQ号: ImageResource}}
class ImageStorager:
images_pool: dict[str,StorageImage] = {}
max_storage: int = 10 * 1024 * 1024 # 最大存储10MB
max_image_count: int = 200 # 最大存储图片数量
@staticmethod
def init():
if not IMAGE_PATH.exists():
IMAGE_PATH.mkdir(parents=True, exist_ok=True)
@staticmethod
def delete_path_image(name: str):
resource_path = IMAGE_PATH / name
if resource_path.exists():
os.remove(resource_path)
@classmethod
async def clear_expire_image(cls):
# 清理过期的图片资源,将未被删除的放入列表中,如果超过最大数量则删除最早过期的
remaining_images = []
current_time = time.time()
for name, storage_image in list(ImageStorager.images_pool.items()):
for group_id, resources in list(storage_image.resources.items()):
for qq_id, resource in list(resources.items()):
if resource.expire < current_time:
del storage_image.resources[group_id][qq_id]
cls.delete_path_image(name)
else:
remaining_images.append((name, group_id, qq_id, resource.expire))
if not storage_image.resources:
del ImageStorager.images_pool[name]
# 如果剩余图片超过最大数量,按过期时间排序并删除最早过期的
if len(remaining_images) > ImageStorager.max_image_count:
remaining_images.sort(key=lambda x: x[3]) # 按过期时间排序
to_delete = len(remaining_images) - ImageStorager.max_image_count
for i in range(to_delete):
name, group_id, qq_id, _ = remaining_images[i]
resource = ImageStorager.images_pool[name].resources[group_id][qq_id]
del ImageStorager.images_pool[name].resources[group_id][qq_id]
cls.delete_path_image(name)
@classmethod
def _add_to_pool(cls, image: bytes, name: str, group_id: str, qq_id: str, expire: int = 36000):
expire_time = time.time() + expire
if name not in cls.images_pool:
cls.images_pool[name] = StorageImage(name=name,resources={})
if group_id not in cls.images_pool[name].resources:
cls.images_pool[name].resources[group_id] = {}
cls.images_pool[name].resources[group_id][qq_id] = ImageResource(name=name, expire=expire_time)
@classmethod
def save_image(cls, image: bytes, name: str, group_id: str, qq_id: str) -> None:
"""
以哈希值命名保存图片,并返回图片资源信息
"""
# 检测图像大小,不得超过 10 MB
if len(image) > cls.max_storage:
raise ValueError("图片大小超过 10 MB 限制")
hash_name = md5(image).hexdigest()
ext = os.path.splitext(name)[1]
file_name = f"{hash_name}{ext}"
full_path = IMAGE_PATH / file_name
with open(full_path, "wb") as f:
f.write(image)
# 将文件写入 images_pool
cls._add_to_pool(image, file_name, group_id, qq_id)
@classmethod
def load_image(cls, name: str, group_id: str, qq_id: str) -> Image:
if name not in cls.images_pool:
return None
if group_id not in cls.images_pool[name].resources:
return None
# 寻找对应 QQ 号 的资源,如果没有就返回相同群下的第一个资源
if qq_id not in cls.images_pool[name].resources[group_id]:
first_qq_id = next(iter(cls.images_pool[name].resources[group_id]))
qq_id = first_qq_id
resource = cls.images_pool[name].resources[group_id][qq_id]
resource_path = IMAGE_PATH / resource.name
return Image.open(resource_path)
class ImageStoragerManager:
def __init__(self, interval: int = 300): # 默认 5 分钟执行一次
self.interval = interval
self._clear_task = None
self._running = False
async def start_auto_clear(self):
"""启动自动任务"""
self._running = True
self._clear_task = asyncio.create_task(self._auto_clear_loop())
logger.info(f"自动清理任务已启动,间隔: {self.interval}")
async def stop_auto_clear(self):
"""停止自动清理任务"""
if self._clear_task:
self._running = False
self._clear_task.cancel()
try:
await self._clear_task
except asyncio.CancelledError:
pass
logger.info("自动清理任务已停止")
else:
logger.warning("没有正在运行的自动清理任务")
async def _auto_clear_loop(self):
"""自动清理循环"""
while self._running:
try:
await asyncio.sleep(self.interval)
await ImageStorager.clear_expire_image()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"定时清理失败: {e}")
image_manager = ImageStoragerManager(interval=300) # 每5分钟清理一次
@driver.on_startup
async def init_image_storage():
ImageStorager.init()
# 启用定时任务清理过期图片
await image_manager.start_auto_clear()

View File

@ -0,0 +1,20 @@
from dataclasses import dataclass
@dataclass
class FilterItem:
name: str
filter: callable
args: list
class ImageRequireSignal:
pass
@dataclass
class ImagesListRequireSignal:
pass
@dataclass
class SenderInfo:
group_id: str
qq_id: str

View File

@ -44,10 +44,34 @@ class NoticeUI:
image = Image.open(BytesIO(image_bytes)).convert("RGBA")
mask = Image.open(BytesIO(mask_bytes)).convert("L")
# 应用遮罩
image.putalpha(mask)
# 使用mask作为alpha通道
r, g, b, _ = image.split()
transparent_image = Image.merge("RGBA", (r, g, b, mask))
# 先创建一个纯白色背景,然后粘贴透明图像
background = Image.new("RGBA", transparent_image.size, (255, 255, 255, 255))
composite = Image.alpha_composite(background, transparent_image)
palette_img = composite.convert("RGB").convert(
"P",
palette=Image.Palette.WEB,
colors=256,
dither=Image.Dither.NONE
)
# 将alpha值小于128的设为透明
alpha_mask = mask.point(lambda x: 0 if x < 128 else 255)
# 保存为GIF
output_buffer = BytesIO()
image.save(output_buffer, format="GIF", disposal=2, loop=0)
palette_img.save(
output_buffer,
format="GIF",
transparency=0, # 将索引0设为透明
disposal=2,
loop=0
)
output_buffer.seek(0)
return output_buffer.getvalue()

View File

@ -108,9 +108,10 @@ async def _(task: LongTask):
await task.target.send_message(
UniMessage().text(f"代办提醒:{message}")
)
notice_bytes = NoticeUI.render_notice("代办提醒", message)
notice_bytes = await NoticeUI.render_notice("代办提醒", message)
await task.target.send_message(
await UniMessage().image(raw=notice_bytes).export()
UniMessage().image(raw=notice_bytes),
at=False
)
async with DATA_FILE_LOCK:
data = load_notify_config()