完整版
This commit is contained in:
@ -207,9 +207,21 @@ async def _ext_img(
|
||||
await matcher.send(await UniMessage.text(msg).export())
|
||||
return None
|
||||
|
||||
|
||||
async def _try_ext_img(
|
||||
evt: Event,
|
||||
bot: Bot,
|
||||
matcher: Matcher,
|
||||
) -> bytes | None:
|
||||
match await extract_image_data_from_message(evt.get_message(), evt, bot):
|
||||
case Success(img):
|
||||
return img
|
||||
case Failure(err):
|
||||
# raise BotExceptionMessage(err)
|
||||
# await matcher.send(await UniMessage().text(err).export())
|
||||
return None
|
||||
assert False
|
||||
|
||||
DepImageBytes = Annotated[bytes, nonebot.params.Depends(_ext_img_data)]
|
||||
DepPILImage = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)]
|
||||
|
||||
DepImageBytesOrNone = Annotated[bytes | None, nonebot.params.Depends(_ext_img_data)]
|
||||
DepImageBytesOrNone = Annotated[bytes | None, nonebot.params.Depends(_try_ext_img)]
|
||||
|
||||
@ -1,25 +1,54 @@
|
||||
import asyncio as asynkio
|
||||
from dataclasses import dataclass
|
||||
from io import BytesIO
|
||||
|
||||
from inspect import signature
|
||||
import random
|
||||
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
from konabot.common.nb.extract_image import DepImageBytesOrNone
|
||||
from nonebot.adapters import Event as BaseEvent
|
||||
from nonebot import on_message, logger
|
||||
from returns.result import Failure, Result, Success
|
||||
|
||||
from nonebot_plugin_alconna import (
|
||||
UniMessage,
|
||||
UniMsg
|
||||
)
|
||||
|
||||
from konabot.plugins.fx_process.fx_handle import ImageFilterStorage
|
||||
from konabot.plugins.fx_process.fx_manager import ImageFilterManager
|
||||
|
||||
from PIL import Image, ImageSequence
|
||||
|
||||
from konabot.plugins.fx_process.types import FilterItem, ImageRequireSignal, ImagesListRequireSignal, SenderInfo
|
||||
from konabot.plugins.fx_process.types import FilterItem, ImageRequireSignal, ImagesListRequireSignal, SenderInfo, StoredInfo
|
||||
|
||||
def try_convert_type(param_type, input_param, sender_info: SenderInfo = None) -> tuple[bool, any]:
|
||||
converted_value = None
|
||||
try:
|
||||
if param_type is float:
|
||||
converted_value = float(input_param)
|
||||
elif param_type is int:
|
||||
converted_value = int(input_param)
|
||||
elif param_type is bool:
|
||||
converted_value = input_param.lower() in ['true', '1', 'yes', '是', '开']
|
||||
elif param_type is Image.Image:
|
||||
converted_value = ImageRequireSignal()
|
||||
return False, converted_value
|
||||
elif param_type is SenderInfo:
|
||||
converted_value = sender_info
|
||||
return False, converted_value
|
||||
elif param_type == list[Image.Image]:
|
||||
converted_value = ImagesListRequireSignal()
|
||||
return False, converted_value
|
||||
elif param_type is str:
|
||||
if input_param is None:
|
||||
return False, None
|
||||
converted_value = str(input_param)
|
||||
else:
|
||||
return False, None
|
||||
except Exception:
|
||||
return False, None
|
||||
return True, converted_value
|
||||
|
||||
def prase_input_args(input_str: str, sender_info: SenderInfo = None) -> list[FilterItem]:
|
||||
# 按分号或换行符分割参数
|
||||
@ -39,34 +68,24 @@ def prase_input_args(input_str: str, sender_info: SenderInfo = None) -> list[Fil
|
||||
max_params = len(sig.parameters) - 1 # 减去第一个参数 image
|
||||
# 从 args 提取参数,并转换为适当类型
|
||||
func_args = []
|
||||
for i in range(0, min(len(input_filter_args), max_params)):
|
||||
# 尝试将参数转换为函数签名中对应的类型,并检测是不是 Image 类型,如果有则表示多个图像输入
|
||||
for i in range(0, max_params):
|
||||
# 尝试将参数转换为函数签名中对应的类型
|
||||
param = list(sig.parameters.values())[i + 1]
|
||||
param_type = param.annotation
|
||||
arg_value = input_filter_args[i]
|
||||
try:
|
||||
if param_type is float:
|
||||
converted_value = float(arg_value)
|
||||
elif param_type is int:
|
||||
converted_value = int(arg_value)
|
||||
elif param_type is bool:
|
||||
converted_value = arg_value.lower() in ['true', '1', 'yes', '是', '开']
|
||||
elif param_type is Image.Image:
|
||||
converted_value = ImageRequireSignal()
|
||||
elif param_type is SenderInfo:
|
||||
converted_value = sender_info
|
||||
elif param_type is list[Image.Image]:
|
||||
converted_value = ImagesListRequireSignal()
|
||||
else:
|
||||
converted_value = arg_value
|
||||
except Exception:
|
||||
converted_value = arg_value
|
||||
func_args.append(converted_value)
|
||||
# 根据函数所需要的参数,从输入参数中提取,如果不匹配就使用默认值,将当前参数递交给下一个循环
|
||||
input_param = input_filter_args[0] if len(input_filter_args) > 0 else None
|
||||
state, converted_param = try_convert_type(param_type, input_param, sender_info)
|
||||
if state:
|
||||
input_filter_args.pop(0)
|
||||
if converted_param is None and param.default != param.empty:
|
||||
converted_param = param.default
|
||||
func_args.append(converted_param)
|
||||
args.append(FilterItem(name=filter_name,filter=filter_func, args=func_args))
|
||||
return args
|
||||
|
||||
def handle_filters_to_image(images: list[Image.Image], filters: list[FilterItem]) -> Image.Image:
|
||||
for filter_item in filters:
|
||||
logger.debug(f"{filter_item}")
|
||||
filter_func = filter_item.filter
|
||||
func_args = filter_item.args
|
||||
# 检测参数中是否有 ImageRequireSignal,如果有则传入对应数量的图像列表
|
||||
@ -90,21 +109,53 @@ def handle_filters_to_image(images: list[Image.Image], filters: list[FilterItem]
|
||||
else:
|
||||
actual_args.append(arg)
|
||||
func_args = actual_args
|
||||
|
||||
logger.debug(f"Applying filter: {filter_item.name} with args: {func_args}")
|
||||
|
||||
images[0] = filter_func(images[0], *func_args)
|
||||
return images[0]
|
||||
|
||||
async def apply_filters_to_images(images: list[Image.Image], filters: list[FilterItem]) -> BytesIO:
|
||||
# 如果第一项是“加载图像”参数,那么就加载图像
|
||||
if filters and filters[0].name == "加载图像":
|
||||
load_filter = filters.pop(0)
|
||||
# 加载全部路径
|
||||
for path in load_filter.args:
|
||||
img = Image.open(path)
|
||||
images.append(img)
|
||||
def copy_images_by_index(images: list[Image.Image], index: int) -> list[Image.Image]:
|
||||
# 将导入图像列表复制为新的图像列表,如果是动图,那么就找到对应索引下的帧
|
||||
new_images = []
|
||||
for img in images:
|
||||
if getattr(img, "is_animated", False):
|
||||
frames = img.n_frames
|
||||
frame_idx = index % frames
|
||||
img.seek(frame_idx)
|
||||
new_images.append(img.copy())
|
||||
else:
|
||||
new_images.append(img.copy())
|
||||
|
||||
return new_images
|
||||
|
||||
def save_or_load_image(images: list[Image.Image], filters: list[FilterItem], sender_info: SenderInfo) -> StoredInfo | None:
|
||||
stored_info = None
|
||||
# 处理位于最前面的“读取图像”、“存入图像”
|
||||
if not filters:
|
||||
return
|
||||
while filters and filters[0].name.strip() in ["读取图像", "存入图像"]:
|
||||
if filters[0].name.strip() == "读取图像":
|
||||
load_filter = filters.pop(0)
|
||||
path = load_filter.args[0] if load_filter.args else ""
|
||||
ImageFilterStorage.load_image(None, path, images, sender_info)
|
||||
elif filters[0].name.strip() == "存入图像":
|
||||
store_filter = filters.pop(0)
|
||||
name = store_filter.args[0] if store_filter.args[0] else str(random.randint(10000,99999))
|
||||
stored_info = ImageFilterStorage.store_image(images[0], name, sender_info)
|
||||
# 将剩下的“读取图像”或“存入图像”参数全部删除,避免后续非法操作
|
||||
filters[:] = [f for f in filters if f.name.strip() not in ["读取图像", "存入图像"]]
|
||||
return stored_info
|
||||
|
||||
async def apply_filters_to_images(images: list[Image.Image], filters: list[FilterItem], sender_info: SenderInfo) -> BytesIO | StoredInfo:
|
||||
# 先处理存取图像的操作
|
||||
stored_info = save_or_load_image(images, filters, sender_info)
|
||||
|
||||
if stored_info and len(filters) <= 0:
|
||||
return stored_info
|
||||
|
||||
if len(images) <= 0:
|
||||
raise ValueError("没有提供任何图像进行处理")
|
||||
raise BotExceptionMessage("没有可处理的图像!")
|
||||
|
||||
# 检测是否需要将静态图视作动图处理
|
||||
frozen_to_move = any(
|
||||
@ -127,32 +178,30 @@ async def apply_filters_to_images(images: list[Image.Image], filters: list[Filte
|
||||
output = BytesIO()
|
||||
if getattr(img, "is_animated", False) or frozen_to_move:
|
||||
frames = []
|
||||
all_frames = []
|
||||
if getattr(img, "is_animated", False):
|
||||
logger.debug("处理动图帧")
|
||||
for frame in ImageSequence.Iterator(img):
|
||||
frame_copy = frame.copy()
|
||||
all_frames.append(frame_copy)
|
||||
else:
|
||||
# 将静态图视作单帧动图处理,拷贝多份
|
||||
logger.debug("处理静态图为多帧动图")
|
||||
for _ in range(10): # 默认复制10帧
|
||||
all_frames.append(img.copy())
|
||||
img.info['duration'] = int(1000 / static_fps)
|
||||
|
||||
async def process_single_frame(frame: list[Image.Image], frame_idx: int) -> Image.Image:
|
||||
async def process_single_frame(frame_images: list[Image.Image], frame_idx: int) -> Image.Image:
|
||||
"""处理单帧的异步函数"""
|
||||
logger.debug(f"开始处理帧 {frame_idx}")
|
||||
result = await asynkio.to_thread(handle_filters_to_image, frame, images, filters)
|
||||
result = await asynkio.to_thread(handle_filters_to_image, frame_images, filters)
|
||||
logger.debug(f"完成处理帧 {frame_idx}")
|
||||
return result[0]
|
||||
return result
|
||||
|
||||
# 并发处理所有帧
|
||||
tasks = []
|
||||
for i, frame in enumerate(all_frames):
|
||||
task = process_single_frame(frame, i)
|
||||
all_frames = []
|
||||
for i, frame in enumerate(ImageSequence.Iterator(img)):
|
||||
all_frames.append(frame.copy())
|
||||
images_copy = copy_images_by_index(images, i)
|
||||
task = process_single_frame(images_copy, i)
|
||||
tasks.append(task)
|
||||
|
||||
frames = await asynkio.gather(*tasks, return_exceptions=True)
|
||||
frames = await asynkio.gather(*tasks, return_exceptions=False)
|
||||
|
||||
# 检查是否有处理失败的帧
|
||||
for i, result in enumerate(frames):
|
||||
@ -188,13 +237,11 @@ def is_fx_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
|
||||
fx_on = on_message(rule=is_fx_mentioned)
|
||||
|
||||
@fx_on.handle()
|
||||
async def _(msg: UniMsg, event: BaseEvent, target: DepLongTaskTarget, image_data: DepImageBytesOrNone = None):
|
||||
async def _(msg: UniMsg, event: BaseEvent, target: DepLongTaskTarget, image_data: DepImageBytesOrNone):
|
||||
preload_imgs = []
|
||||
# 提取图像
|
||||
try:
|
||||
if image_data is not None:
|
||||
preload_imgs.append(Image.open(BytesIO(image_data)))
|
||||
logger.debug("Image extracted for FX processing.")
|
||||
preload_imgs.append(Image.open(BytesIO(image_data)))
|
||||
except Exception:
|
||||
logger.info("No image found in message for FX processing.")
|
||||
args = msg.extract_plain_text().split()
|
||||
@ -207,9 +254,11 @@ async def _(msg: UniMsg, event: BaseEvent, target: DepLongTaskTarget, image_data
|
||||
)
|
||||
|
||||
filters = prase_input_args(msg.extract_plain_text()[2:], sender_info=sender_info)
|
||||
if not filters:
|
||||
return
|
||||
output = await apply_filters_to_images(preload_imgs, filters)
|
||||
logger.debug("FX processing completed, sending result.")
|
||||
await fx_on.send(await UniMessage().image(raw=output).export())
|
||||
# if not filters:
|
||||
# return
|
||||
output = await apply_filters_to_images(preload_imgs, filters, sender_info)
|
||||
if isinstance(output,StoredInfo):
|
||||
await fx_on.send(await UniMessage().text(f"图像已存为「{output.name}」!").export())
|
||||
elif isinstance(output,BytesIO):
|
||||
await fx_on.send(await UniMessage().image(raw=output).export())
|
||||
|
||||
@ -12,7 +12,7 @@ from konabot.plugins.fx_process.gradient import GradientGenerator
|
||||
import numpy as np
|
||||
|
||||
from konabot.plugins.fx_process.image_storage import ImageStorager
|
||||
from konabot.plugins.fx_process.types import SenderInfo
|
||||
from konabot.plugins.fx_process.types import SenderInfo, StoredInfo
|
||||
|
||||
class ImageFilterImplement:
|
||||
@staticmethod
|
||||
@ -548,6 +548,18 @@ class ImageFilterImplement:
|
||||
|
||||
return Image.fromarray(blended, 'RGBA')
|
||||
|
||||
# 两张图像直接覆盖
|
||||
@staticmethod
|
||||
def apply_overlay(image1: Image.Image, image2: Image.Image) -> Image.Image:
|
||||
if image1.mode != 'RGBA':
|
||||
image1 = image1.convert('RGBA')
|
||||
if image2.mode != 'RGBA':
|
||||
image2 = image2.convert('RGBA')
|
||||
|
||||
image2 = image2.resize(image1.size, Image.Resampling.LANCZOS)
|
||||
|
||||
return Image.alpha_composite(image1, image2)
|
||||
|
||||
# 叠加渐变色
|
||||
@staticmethod
|
||||
def apply_gradient_overlay(
|
||||
@ -560,6 +572,18 @@ class ImageFilterImplement:
|
||||
gradient = gradient_gen.create_gradient(image.size[0], image.size[1], color_nodes)
|
||||
return ImageFilterImplement.apply_blend(image, gradient, mode=overlay_mode, alpha=0.5)
|
||||
|
||||
# 生成颜色,类似固态层
|
||||
@staticmethod
|
||||
def generate_solid(
|
||||
image: Image.Image,
|
||||
color_list: str = '[rgb(255,0,0)|(0,0),rgb(0,255,0)|(0,100),rgb(0,0,255)|(50,100)]'
|
||||
):
|
||||
gradient_gen = GradientGenerator()
|
||||
color_nodes = gradient_gen.parse_color_list(color_list)
|
||||
gradient = gradient_gen.create_gradient(image.size[0], image.size[1], color_nodes)
|
||||
return gradient
|
||||
|
||||
|
||||
# 阴影
|
||||
@staticmethod
|
||||
def apply_shadow(image: Image.Image,
|
||||
@ -1090,17 +1114,50 @@ class ImageFilterEmpty:
|
||||
return image
|
||||
|
||||
class ImageFilterStorage:
|
||||
# 用于存储图像
|
||||
@staticmethod
|
||||
def store_image(image: Image.Image, name: str, sender_info: SenderInfo) -> Image.Image:
|
||||
ImageStorager.save_image(image, name, sender_info.group_id, sender_info.qq_id)
|
||||
return image
|
||||
|
||||
# 用于暂存图像
|
||||
# 用于暂存图像,存储在第一项的后面
|
||||
@staticmethod
|
||||
def temp_store_image(image: Image.Image, images: list[Image.Image]) -> Image.Image:
|
||||
images.append(image)
|
||||
images.insert(1, image.copy())
|
||||
return image
|
||||
|
||||
# 用于交换移动索引
|
||||
@staticmethod
|
||||
def swap_image_index(image: Image.Image, images: list[Image.Image], src: int = 2, dest: int = 1) -> Image.Image:
|
||||
if len(images) == 0:
|
||||
return image
|
||||
# 将二者交换
|
||||
src -= 1
|
||||
dest -= 1
|
||||
if 0 <= src < len(images) and 0 <= dest < len(images):
|
||||
images[src], images[dest] = images[dest], images[src]
|
||||
return images[0]
|
||||
|
||||
# 用于删除指定索引的图像
|
||||
@staticmethod
|
||||
def delete_image_by_index(image: Image.Image, images: list[Image.Image], index: int = 1) -> Image.Image:
|
||||
# 以 1 为基准
|
||||
index -= 1
|
||||
if len(images) == 1:
|
||||
# 只有一张图像,不能删除
|
||||
return image
|
||||
if 0 <= index < len(images):
|
||||
del images[index]
|
||||
return images[0]
|
||||
|
||||
# 用于选择指定索引的图像为首图
|
||||
@staticmethod
|
||||
def select_image_by_index(image: Image.Image, images: list[Image.Image], index: int = 2) -> Image.Image:
|
||||
# 以 1 为基准
|
||||
index -= 1
|
||||
if 0 <= index < len(images):
|
||||
return images[index]
|
||||
return image
|
||||
|
||||
# 用于存储图像
|
||||
@staticmethod
|
||||
def store_image(image: Image.Image, name: str, sender_info: SenderInfo) -> StoredInfo:
|
||||
ImageStorager.save_image_by_pil(image, name, sender_info.group_id, sender_info.qq_id)
|
||||
return StoredInfo(name=name)
|
||||
|
||||
# 用于读取图像
|
||||
@staticmethod
|
||||
|
||||
@ -47,9 +47,18 @@ class ImageFilterManager:
|
||||
"晃动": ImageFilterImplement.apply_random_wiggle,
|
||||
"动图": ImageFilterEmpty.empty_filter_param,
|
||||
"像素抖动": ImageFilterImplement.apply_pixel_jitter,
|
||||
# 图像处理
|
||||
"存入图像": ImageFilterStorage.store_image,
|
||||
"读取图像": ImageFilterStorage.load_image,
|
||||
"暂存图像": ImageFilterStorage.temp_store_image,
|
||||
"交换图像": ImageFilterStorage.swap_image_index,
|
||||
"删除图像": ImageFilterStorage.delete_image_by_index,
|
||||
"选择图像": ImageFilterStorage.select_image_by_index,
|
||||
# 多图像处理
|
||||
"混合图像": ImageFilterImplement.apply_blend,
|
||||
"覆盖图像": ImageFilterImplement.apply_overlay,
|
||||
# 生成式
|
||||
"生成颜色": ImageFilterImplement.generate_solid
|
||||
}
|
||||
|
||||
@classmethod
|
||||
|
||||
@ -14,20 +14,23 @@ class GradientGenerator:
|
||||
"""解析渐变颜色列表字符串
|
||||
|
||||
Args:
|
||||
color_list_str: 格式如 '[rgb(255,0,0)|(0,0),rgb(0,255,0)|(0,100),rgb(0,0,255)|(50,100)]'
|
||||
color_list_str: 格式如 '[rgb(255,0,0)|(0,0)+rgb(0,255,0)|(0,100)+rgb(0,0,255)|(50,100)]'
|
||||
|
||||
Returns:
|
||||
list: 包含颜色和位置信息的字典列表
|
||||
"""
|
||||
color_nodes = []
|
||||
color_list_str = color_list_str.strip('[] ').strip()
|
||||
pattern = r'([^|]+)\|\(([^)]+)\)'
|
||||
matches = re.findall(pattern, color_list_str)
|
||||
color_list_str = color_list_str.strip('[]').strip()
|
||||
matches = color_list_str.split('+')
|
||||
|
||||
for color_str, pos_str in matches:
|
||||
for single_str in matches:
|
||||
color_str = single_str.split('|')[0]
|
||||
pos_str = single_str.split('|')[1] if '|' in single_str else '0,0'
|
||||
|
||||
color = ColorHandle.parse_color(color_str.strip())
|
||||
|
||||
|
||||
try:
|
||||
pos_str = pos_str.replace('(', '').replace(')', '')
|
||||
x_str, y_str = pos_str.split(',')
|
||||
x_percent = float(x_str.strip().replace('%', ''))
|
||||
y_percent = float(y_str.strip().replace('%', ''))
|
||||
|
||||
@ -8,12 +8,13 @@ from nonebot_plugin_apscheduler import driver
|
||||
from konabot.common.path import DATA_PATH
|
||||
import os
|
||||
from PIL import Image
|
||||
from io import BytesIO
|
||||
|
||||
IMAGE_PATH = DATA_PATH / "temp" / "images"
|
||||
|
||||
@dataclass
|
||||
class ImageResource:
|
||||
name: str
|
||||
filename: str
|
||||
expire: int
|
||||
|
||||
@dataclass
|
||||
@ -39,6 +40,14 @@ class ImageStorager:
|
||||
if resource_path.exists():
|
||||
os.remove(resource_path)
|
||||
|
||||
@staticmethod
|
||||
async def clear_all_image():
|
||||
# 清理 temp 目录下的所有图片资源
|
||||
for file in os.listdir(IMAGE_PATH):
|
||||
file_path = IMAGE_PATH / file
|
||||
if file_path.is_file():
|
||||
os.remove(file_path)
|
||||
|
||||
@classmethod
|
||||
async def clear_expire_image(cls):
|
||||
# 清理过期的图片资源,将未被删除的放入列表中,如果超过最大数量则删除最早过期的
|
||||
@ -63,15 +72,17 @@ class ImageStorager:
|
||||
resource = ImageStorager.images_pool[name].resources[group_id][qq_id]
|
||||
del ImageStorager.images_pool[name].resources[group_id][qq_id]
|
||||
cls.delete_path_image(name)
|
||||
logger.info("过期图片清理完成")
|
||||
|
||||
@classmethod
|
||||
def _add_to_pool(cls, image: bytes, name: str, group_id: str, qq_id: str, expire: int = 36000):
|
||||
def _add_to_pool(cls, filename: str, name: str, group_id: str, qq_id: str, expire: int = 36000):
|
||||
expire_time = time.time() + expire
|
||||
if name not in cls.images_pool:
|
||||
cls.images_pool[name] = StorageImage(name=name,resources={})
|
||||
if group_id not in cls.images_pool[name].resources:
|
||||
cls.images_pool[name].resources[group_id] = {}
|
||||
cls.images_pool[name].resources[group_id][qq_id] = ImageResource(name=name, expire=expire_time)
|
||||
cls.images_pool[name].resources[group_id][qq_id] = ImageResource(filename=filename, expire=expire_time)
|
||||
logger.debug(f"{cls.images_pool}")
|
||||
|
||||
@classmethod
|
||||
def save_image(cls, image: bytes, name: str, group_id: str, qq_id: str) -> None:
|
||||
@ -88,20 +99,39 @@ class ImageStorager:
|
||||
with open(full_path, "wb") as f:
|
||||
f.write(image)
|
||||
# 将文件写入 images_pool
|
||||
cls._add_to_pool(image, file_name, group_id, qq_id)
|
||||
logger.debug(f"Image saved: {file_name} for group {group_id}, qq {qq_id}")
|
||||
cls._add_to_pool(file_name, name, group_id, qq_id)
|
||||
|
||||
@classmethod
|
||||
def save_image_by_pil(cls, image: Image.Image, name: str, group_id: str, qq_id: str) -> None:
|
||||
"""
|
||||
以哈希值命名保存图片,并返回图片资源信息
|
||||
"""
|
||||
img_byte_arr = BytesIO()
|
||||
# 如果图片是动图,保存为 GIF 格式
|
||||
if getattr(image, "is_animated", False):
|
||||
image.save(img_byte_arr, format="GIF", save_all=True, loop=0)
|
||||
else:
|
||||
image.save(img_byte_arr, format=image.format or "PNG")
|
||||
img_bytes = img_byte_arr.getvalue()
|
||||
cls.save_image(img_bytes, name, group_id, qq_id)
|
||||
|
||||
@classmethod
|
||||
def load_image(cls, name: str, group_id: str, qq_id: str) -> Image:
|
||||
logger.debug(f"Loading image: {name} for group {group_id}, qq {qq_id}")
|
||||
if name not in cls.images_pool:
|
||||
logger.debug(f"Image {name} not found in pool")
|
||||
return None
|
||||
if group_id not in cls.images_pool[name].resources:
|
||||
logger.debug(f"No resources for group {group_id} in image {name}")
|
||||
return None
|
||||
# 寻找对应 QQ 号 的资源,如果没有就返回相同群下的第一个资源
|
||||
if qq_id not in cls.images_pool[name].resources[group_id]:
|
||||
first_qq_id = next(iter(cls.images_pool[name].resources[group_id]))
|
||||
qq_id = first_qq_id
|
||||
resource = cls.images_pool[name].resources[group_id][qq_id]
|
||||
resource_path = IMAGE_PATH / resource.name
|
||||
resource_path = IMAGE_PATH / resource.filename
|
||||
logger.debug(f"Image path: {resource_path}")
|
||||
return Image.open(resource_path)
|
||||
|
||||
class ImageStoragerManager:
|
||||
@ -112,6 +142,8 @@ class ImageStoragerManager:
|
||||
|
||||
async def start_auto_clear(self):
|
||||
"""启动自动任务"""
|
||||
# 先清理一次
|
||||
await ImageStorager.clear_all_image()
|
||||
self._running = True
|
||||
self._clear_task = asyncio.create_task(self._auto_clear_loop())
|
||||
|
||||
|
||||
@ -10,10 +10,13 @@ class FilterItem:
|
||||
class ImageRequireSignal:
|
||||
pass
|
||||
|
||||
@dataclass
|
||||
class ImagesListRequireSignal:
|
||||
pass
|
||||
|
||||
@dataclass
|
||||
class StoredInfo:
|
||||
name: str
|
||||
|
||||
@dataclass
|
||||
class SenderInfo:
|
||||
group_id: str
|
||||
|
||||
Reference in New Issue
Block a user