Files
konabot/konabot/plugins/air_conditioner/ac.py
MixBadGun f6e7dfcd93
All checks were successful
continuous-integration/drone/push Build is passing
空调最高峰,空调数据库挂载再优化
2025-11-19 16:24:24 +08:00

458 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from enum import Enum
from io import BytesIO
import math
from pathlib import Path
import random
import signal
import time
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from nonebot import logger
from konabot.common.database import DatabaseManager
from konabot.common.path import ASSETS_PATH, FONTS_PATH
from konabot.common.path import DATA_PATH
import nonebot
import json
ROOT_PATH = Path(__file__).resolve().parent
# Create the global database manager instance used by the code in this module.
db_manager = DatabaseManager()
class CrashType(Enum):
    """Kinds of breakdown an air conditioner can suffer.

    The integer value is what gets written to the crash table by
    ``AirConditioner.broke_ac``.
    """

    # Burnt out: triggered when the temperature is pushed too high.
    BURNT = 0
    # Frozen: triggered when the temperature is pushed too low.
    FROZEN = 1
driver = nonebot.get_driver()


@driver.on_startup
async def register_startup_hook():
    """Driver startup hook: launch the periodic auto-save task."""
    await ac_manager.start_auto_save()


@driver.on_shutdown
async def register_shutdown_hook():
    """Driver shutdown hook: stop the periodic auto-save task."""
    # Nothing to stop when no manager is available.
    if not ac_manager:
        return
    await ac_manager.stop_auto_save()
class AirConditionerManager:
    """Periodically write the pooled ``AirConditioner`` instances to the database.

    A background asyncio task sleeps for ``save_interval`` seconds, saves every
    instance found in ``AirConditioner.InstancesPool`` and then evicts the
    instances that have not been fetched for ``idle_timeout`` seconds.
    """

    def __init__(self, save_interval: int = 300, idle_timeout: int = 300):
        """
        :param save_interval: seconds between two automatic saves (default 5 minutes)
        :param idle_timeout: seconds an instance may stay unfetched before it is
            dropped from the pool after a save (default 5 minutes)
        """
        self.save_interval = save_interval
        self.idle_timeout = idle_timeout
        self._save_task = None
        self._running = False

    async def start_auto_save(self):
        """Start the background auto-save task.

        Calling this while a task is already running only logs a warning;
        starting a second loop would leak the first task.
        """
        if self._save_task is not None and not self._save_task.done():
            logger.warning("自动保存任务已在运行")
            return
        self._running = True
        self._save_task = asyncio.create_task(self._auto_save_loop())
        logger.info(f"自动保存任务已启动,间隔: {self.save_interval}")

    async def stop_auto_save(self):
        """Cancel the background auto-save task and wait for it to finish.

        NOTE(review): no final save is performed here, so changes made since
        the last periodic save are not written on shutdown -- confirm whether
        that is intended.
        """
        if self._save_task:
            self._running = False
            self._save_task.cancel()
            try:
                await self._save_task
            except asyncio.CancelledError:
                pass
            # Forget the finished task so a later stop reports "nothing running".
            self._save_task = None
            logger.info("自动保存任务已停止")
        else:
            logger.warning("没有正在运行的自动保存任务")

    async def _auto_save_loop(self):
        """Sleep for ``save_interval`` seconds, save, repeat until stopped."""
        while self._running:
            try:
                await asyncio.sleep(self.save_interval)
                await self.save_all_instances()
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Keep the loop alive: one failed round must not stop future saves.
                logger.error(f"定时保存失败: {e}")

    async def save_all_instances(self):
        """Save every pooled instance to the database, then evict idle ones."""
        save_time = time.time()
        saved_count = 0
        idle_ids = []
        # Iterate over a snapshot: other coroutines may add instances to the
        # pool while we await the database, and a dict must not change size
        # while it is being iterated.
        for ac_id, ac_instance in list(AirConditioner.InstancesPool.items()):
            try:
                await db_manager.execute_by_sql_file(
                    ROOT_PATH / "sql" / "update_ac.sql",
                    [(ac_instance.on, ac_instance.temperature,
                      ac_instance.burnt, ac_instance.frozen, ac_id), (ac_id,)]
                )
            except Exception as e:
                logger.error(f"保存空调 {ac_id} 失败: {e}")
                continue
            saved_count += 1
            # Only instances that were saved successfully may be evicted.
            if save_time - ac_instance.instance_get_time >= self.idle_timeout:
                idle_ids.append(ac_id)
        logger.info(f"定时保存完成,共保存 {saved_count} 个空调实例")
        # Drop instances that have been idle for too long.
        for ac_id in idle_ids:
            cached = AirConditioner.InstancesPool.get(ac_id)
            # Re-check: the instance may have been fetched again (or already
            # removed) while the saves above were awaiting the database.
            if cached is not None and save_time - cached.instance_get_time >= self.idle_timeout:
                del AirConditioner.InstancesPool[ac_id]
        logger.info(f"清理长期不活跃的空调实例完成,目前池内共有 {len(AirConditioner.InstancesPool)} 个实例")
ac_manager = AirConditionerManager(save_interval=300) # save every 5 minutes
class AirConditioner:
InstancesPool: dict[str, 'AirConditioner'] = {}
@classmethod
async def refresh_ac(cls, id: str):
cls.InstancesPool[id].instance_get_time = time.time()
@classmethod
async def storage_ac(cls, id: str, ac: 'AirConditioner'):
cls.InstancesPool[id] = ac
@classmethod
async def get_ac(cls, id: str) -> 'AirConditioner':
if(id in cls.InstancesPool):
await cls.refresh_ac(id)
return cls.InstancesPool[id]
# 如果没有,那么从数据库重新实例化一个 AC 出来
result = await db_manager.query_by_sql_file(ROOT_PATH / "sql" / "query_ac.sql", (id,))
if len(result) == 0:
ac = await cls.create_ac(id)
return ac
ac_data = result[0]
ac = AirConditioner(id)
ac.on = bool(ac_data["on"])
ac.temperature = float(ac_data["temperature"])
ac.burnt = bool(ac_data["burnt"])
ac.frozen = bool(ac_data["frozen"])
await cls.storage_ac(id, ac)
return ac
@classmethod
async def create_ac(cls, id: str) -> 'AirConditioner':
ac = AirConditioner(id)
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "insert_ac.sql",
[(id, ac.on, ac.temperature, ac.burnt, ac.frozen),(id,)]
)
await cls.storage_ac(id, ac)
return ac
async def change_ac_temp(self, temperature_delta: float) -> None:
'''
改变空调的温度
:param temperature_delta: float 温度变化量
'''
changed_temp = self.temperature + temperature_delta
random_poss = random.random()
if temperature_delta < 0 and changed_temp < 0:
# 根据温度随机出是否冻结0度开始呈指数增长
possibility = -math.e ** (changed_temp / 50) + 1
if random_poss < possibility:
await self.broke_ac(CrashType.FROZEN)
elif temperature_delta > 0 and changed_temp > 40:
# 根据温度随机出是否烧坏40度开始呈指数增长
possibility = -math.e ** ((40-changed_temp) / 50) + 1
if random_poss < possibility:
await self.broke_ac(CrashType.BURNT)
self.temperature = changed_temp
async def update_ac(self, state: bool = None, temperature_delta: float = None, burnt: bool = None, frozen: bool = None) -> 'AirConditioner':
if state is not None:
self.on = state
if temperature_delta is not None:
await self.change_ac_temp(temperature_delta)
if burnt is not None:
self.burnt = burnt
if frozen is not None:
self.frozen = frozen
# await db_manager.execute_by_sql_file(
# ROOT_PATH / "sql" / "update_ac.sql",
# (self.on, self.temperature, self.burnt, self.frozen, self.id)
# )
return self
async def change_ac(self) -> 'AirConditioner':
self.on = False
self.temperature = 24
self.burnt = False
self.frozen = False
# await db_manager.execute_by_sql_file(
# ROOT_PATH / "sql" / "update_ac.sql",
# (self.on, self.temperature, self.burnt, self.frozen, self.id)
# )
return self
def __init__(self, id: str) -> None:
self.id = id
self.on = False
self.temperature = 24 # 默认温度
self.burnt = False
self.frozen = False
self.instance_get_time = time.time()
async def broke_ac(self, crash_type: CrashType):
'''
让空调坏掉
:param crash_type: CrashType 枚举,表示空调坏掉的类型
'''
match crash_type:
case CrashType.BURNT:
await self.update_ac(burnt=True)
case CrashType.FROZEN:
await self.update_ac(frozen=True)
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "insert_crash.sql",
(self.id, crash_type.value)
)
# def save_crash_data(self, crash_type: CrashType):
# '''
# 如果空调爆炸了,就往本地的 ac_crash_data.json 里该 id 的记录加一
# '''
# data_file = DATA_PATH / "ac_crash_data.json"
# crash_data = {}
# if data_file.exists():
# with open(data_file, "r", encoding="utf-8") as f:
# crash_data = json.load(f)
# if self.id not in crash_data:
# crash_data[self.id] = {"burnt": 0, "frozen": 0}
# match crash_type:
# case CrashType.BURNT:
# crash_data[self.id]["burnt"] += 1
# case CrashType.FROZEN:
# crash_data[self.id]["frozen"] += 1
# with open(data_file, "w", encoding="utf-8") as f:
# json.dump(crash_data, f, ensure_ascii=False, indent=4)
def get_crashes_and_ranking(self) -> tuple[int, int]:
'''
获取该群在全国空调损坏的数量与排行榜的位置
'''
data_file = DATA_PATH / "ac_crash_data.json"
if not data_file.exists():
return 0, 1
with open(data_file, "r", encoding="utf-8") as f:
crash_data = json.load(f)
ranking_list = []
for gid, record in crash_data.items():
total = record.get("burnt", 0) + record.get("frozen", 0)
ranking_list.append((gid, total))
ranking_list.sort(key=lambda x: x[1], reverse=True)
total_crashes = crash_data.get(self.id, {}).get("burnt", 0) + crash_data.get(self.id, {}).get("frozen", 0)
rank = 1
for gid, total in ranking_list:
if gid == self.id:
break
rank += 1
return total_crashes, rank
def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0)):
    """
    Render *text* on a transparent canvas that is just large enough to hold it.

    The text is drawn with the "montserrat.otf" font from FONTS_PATH and the
    result is returned as an OpenCV-style BGRA array.
    """
    font = ImageFont.truetype(FONTS_PATH / "montserrat.otf", font_size)

    # Measure the text on a throwaway 1x1 canvas.
    probe = ImageDraw.Draw(Image.new('RGB', (1, 1), (255, 255, 255)))
    left, top, right, bottom = probe.textbbox((0, 0), text, font=font)

    # Canvas size = text size plus padding on every side.
    canvas_size = (
        int((right - left) + 2 * padding),
        int((bottom - top) + 2 * padding),
    )
    canvas = Image.new('RGBA', canvas_size, (0, 0, 0, 0))

    # An RGB colour gets a fully opaque alpha appended.
    fill = text_color + (255,) if len(text_color) == 3 else text_color

    # Offset by the bounding-box origin so the glyphs start right at the padding.
    origin = (padding - left, padding - top)
    ImageDraw.Draw(canvas).text(origin, text, fill=fill, font=font)

    # Pillow works in RGBA; hand back BGRA for OpenCV.
    return cv2.cvtColor(np.array(canvas), cv2.COLOR_RGBA2BGRA)
def perspective_transform(image, target, corners):
    """
    Warp *image* so that its four corners land on *corners* (all channels are warped).

    target: canvas; only its size is used, as the size of the output
    corners: destination points in the order [top-left, top-right, bottom-right, bottom-left]

    Returns the warped image together with the 3x3 transform matrix.
    """
    src_h, src_w = image.shape[:2]
    right, bottom = src_w - 1, src_h - 1

    # Corners of the source image, in the same order as *corners*.
    source_quad = np.array(
        [[0, 0], [right, 0], [right, bottom], [0, bottom]],
        dtype=np.float32,
    )
    dest_quad = np.array(corners, dtype=np.float32)
    homography = cv2.getPerspectiveTransform(source_quad, dest_quad)

    # The output is as large as the canvas.
    canvas_h, canvas_w = target.shape[:2]
    warped = cv2.warpPerspective(
        image, homography, (canvas_w, canvas_h), flags=cv2.INTER_LINEAR
    )
    return warped, homography
def blend_with_transparency(background, foreground, position):
    """
    Alpha-blend a BGRA foreground onto a copy of the background.

    background: BGR or BGRA image; it is not modified (BGR gets an opaque alpha channel)
    foreground: BGRA image; any part that exceeds the background size is cropped away
    position: (x, y) of the foreground's top-left corner on the background; it is
        clamped so that the foreground stays inside the background

    Returns the blended BGRA image.
    """
    canvas = background.copy()
    # Give the background an opaque alpha channel if it has none.
    if canvas.shape[2] == 3:
        canvas = cv2.cvtColor(canvas, cv2.COLOR_BGR2BGRA)
        canvas[:, :, 3] = 255
    bg_height, bg_width = canvas.shape[:2]

    # A foreground larger than the background cannot be placed as a whole;
    # keep the part that fits instead of failing on mismatched shapes.
    foreground = foreground[:bg_height, :bg_width]
    fg_height, fg_width = foreground.shape[:2]

    # Keep the paste rectangle inside the canvas.
    x, y = position
    x = max(0, min(x, bg_width - fg_width))
    y = max(0, min(y, bg_height - fg_height))

    # View into the canvas: writing to it updates the canvas in place.
    region = canvas[y:y + fg_height, x:x + fg_width]
    # Normalized alpha, kept 3-D so it broadcasts over the colour channels.
    alpha = foreground[:, :, 3:4] / 255.0
    region[:, :, :3] = alpha * foreground[:, :, :3] + (1 - alpha) * region[:, :, :3]
    # The result is at least as opaque as either layer.
    region[:, :, 3] = np.maximum(region[:, :, 3], foreground[:, :, 3])
    return canvas
def precise_blend_with_perspective(background, foreground, corners):
    """
    Paste a BGRA foreground onto a copy of the background at the bounding-box
    origin of *corners*.

    The foreground is expected to be warped already: this function does not
    transform it, it only uses the smallest x and y found in *corners* as the
    paste position. Only pixels whose alpha is greater than 0 are copied, and
    they replace the background pixel (colour and alpha) without blending.
    When the foreground does not fit inside the background at that position,
    the background copy is returned unchanged.

    background: BGR or BGRA image; it is not modified
    foreground: BGRA image
    corners: four (x, y) points, as a numpy array or a nested sequence
    """
    bg_height, bg_width = background.shape[:2]
    # Work on a BGRA copy of the background.
    if background.shape[2] == 3:
        result = cv2.cvtColor(background, cv2.COLOR_BGR2BGRA)
    else:
        result = background.copy()

    # Top-left corner of the bounding box of the target quadrilateral.
    corners = np.asarray(corners)
    min_x = int(corners[:, 0].min())
    min_y = int(corners[:, 1].min())

    # Background-sized layer holding the foreground at its target position.
    layer = np.zeros((bg_height, bg_width, 4), dtype=np.uint8)
    fg_height, fg_width = foreground.shape[:2]
    fits = (
        min_x >= 0
        and min_y >= 0
        and min_y + fg_height <= bg_height
        and min_x + fg_width <= bg_width
    )
    if fits:
        layer[min_y:min_y + fg_height, min_x:min_x + fg_width] = foreground

    # Copy all four channels wherever the foreground has content.
    mask = layer[:, :, 3] > 0
    result[mask] = layer[mask]
    return result
def wiggle_transform(image, intensity=2) -> list[np.ndarray]:
    """
    Build the frames of a shaking animation that mimics a running air conditioner.

    The image is translated by *intensity* pixels left, right, up and down,
    followed by one frame with no translation; every frame keeps the input size.
    """
    rows, cols = image.shape[:2]
    offsets = (
        (-intensity, 0),
        (intensity, 0),
        (0, -intensity),
        (0, intensity),
        (0, 0),
    )
    return [
        # 2x3 affine matrix of a pure translation by (dx, dy).
        cv2.warpAffine(image, np.float32([[1, 0, dx], [0, 1, dy]]), (cols, rows))
        for dx, dy in offsets
    ]
def _bgr_to_pil(frame):
    """Convert an OpenCV BGR/BGRA array into a Pillow image with correct colours."""
    # OpenCV stores channels as B, G, R(, A) while Image.fromarray reads them
    # as R, G, B(, A); without the swap red and blue would be exchanged.
    if frame.ndim == 3 and frame.shape[2] == 4:
        frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGBA)
    elif frame.ndim == 3 and frame.shape[2] == 3:
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return Image.fromarray(frame)


async def generate_ac_image(ac: AirConditioner) -> BytesIO:
    """Render the air conditioner as a GIF.

    When the air conditioner is off, the GIF is just the base picture. When it
    is on, the current temperature is drawn onto the base picture in
    perspective and the result shakes; the shake amplitude grows with the
    distance of the temperature from 24 degrees.

    :param ac: the air conditioner to draw; ``on`` and ``temperature`` are read
    :return: buffer holding the GIF, positioned at its start
    :raises FileNotFoundError: if the base picture cannot be read
    """
    # Load the base picture of the air conditioner.
    asset_path = ASSETS_PATH / "img" / "ac" / "ac.png"
    ac_image = cv2.imread(str(asset_path), cv2.IMREAD_UNCHANGED)
    if ac_image is None:
        # cv2.imread reports a missing or unreadable file by returning None.
        raise FileNotFoundError(f"cannot read air conditioner image: {asset_path}")

    output = BytesIO()
    if not ac.on:
        # Switched off: the base picture alone is the result.
        _bgr_to_pil(ac_image).save(output, format="GIF")
        output.seek(0)  # let callers read the buffer from the beginning
        return output

    # Render the temperature as a transparent text image.
    text = f"{round(ac.temperature, 1)}°C"
    text_image = text_to_transparent_image(
        text,
        font_size=60,
        text_color=(0, 0, 0)  # black text
    )
    # Target quadrilateral of the text on the base picture,
    # in the order [top-left, top-right, bottom-right, bottom-left].
    corners = np.array([
        [123, 45],
        [284, 101],
        [290, 140],
        [119, 100]
    ], dtype=np.float32)
    # Warp the text (alpha included) onto a canvas of the base picture's size.
    transformed_text, _ = perspective_transform(text_image, ac_image, corners)
    final_image_simple = blend_with_transparency(ac_image, transformed_text, (0, 0))

    # Shake by at least 2 pixels, more the further the temperature is from 24.
    intensity = max(2, abs(int(ac.temperature) - 24) // 2)
    frames = wiggle_transform(final_image_simple, intensity=intensity)
    pil_frames = [_bgr_to_pil(frame) for frame in frames]
    pil_frames[0].save(output, format="GIF", save_all=True, append_images=pil_frames[1:], loop=0, duration=50, disposal=2)
    output.seek(0)  # let callers read the buffer from the beginning
    return output