Files
konabot/konabot/plugins/air_conditioner/__init__.py
MixBadGun f6e7dfcd93
All checks were successful
continuous-integration/drone/push Build is passing
空调最高峰,空调数据库挂载再优化
2025-11-19 16:24:24 +08:00

228 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from io import BytesIO
from typing import Optional, Union
import cv2
import nonebot
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import Alconna, AlconnaMatcher, Args, UniMessage, on_alconna
from PIL import Image
import numpy as np
from konabot.common.database import DatabaseManager
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
from konabot.common.web_render import WebRenderer
from konabot.plugins.air_conditioner.ac import AirConditioner, CrashType, generate_ac_image, wiggle_transform
from pathlib import Path
import random
import math
# Directory containing this module; the plugin's SQL files are resolved relative to it
ROOT_PATH = Path(__file__).resolve().parent
# Create the module-wide database manager instance
db_manager = DatabaseManager()
async def get_ac(id: str) -> AirConditioner:
    """Return the air conditioner looked up by ``id``, creating one if needed.

    ``AirConditioner.get_ac`` is awaited first; when it yields ``None`` a new
    ``AirConditioner(id)`` is constructed and returned instead.
    """
    existing = await AirConditioner.get_ac(id)
    return AirConditioner(id) if existing is None else existing
async def _send_png_as_gif(event: type[AlconnaMatcher], png_path: Path) -> None:
    """Re-encode the image at ``png_path`` as a GIF in memory and send it via ``event``."""
    output = BytesIO()
    with open(png_path, "rb") as f:
        # Convert to GIF before sending; the file is closed before the send
        Image.open(f).save(output, format="GIF")
    output.seek(0)
    await event.send(await UniMessage().image(raw=output).export())


async def send_ac_image(event: type[AlconnaMatcher], ac: AirConditioner):
    """Send the picture that matches the current state of ``ac``.

    Args:
        event: Matcher used to send the message (its ``send`` is awaited).
        ac: Air conditioner whose state decides which picture is sent.

    A burnt unit gets the static "broken" picture, a frozen unit the static
    "frozen" picture (both re-encoded as GIF); otherwise the picture produced
    by ``generate_ac_image`` is sent. ``burnt`` is checked before ``frozen``.
    """
    ac_asset_dir = ASSETS_PATH / "img" / "ac"
    if ac.burnt:
        await _send_png_as_gif(event, ac_asset_dir / "broken_ac.png")
        return
    if ac.frozen:
        await _send_png_as_gif(event, ac_asset_dir / "frozen_ac.png")
        return
    ac_image = await generate_ac_image(ac)
    await event.send(await UniMessage().image(raw=ac_image).export())
driver = nonebot.get_driver()


@driver.on_startup
async def register_startup_hook():
    """Run at driver startup: execute the plugin's table-creation SQL file."""
    # Initialise the database tables
    plugin_dir = Path(__file__).resolve().parent
    schema_sql = plugin_dir / "sql" / "create_table.sql"
    await db_manager.execute_by_sql_file(schema_sql)
@driver.on_shutdown
async def register_shutdown_hook():
    """Run at driver shutdown: close every connection held by ``db_manager``."""
    await db_manager.close_all_connections()
# Command "群空调": show the air conditioner of the current channel.
# The matcher gets its own name because ``evt`` is rebound by every later
# command in this module; a handler that reads ``evt`` when it runs would get
# whichever matcher was assigned last. ``evt`` is kept as an alias.
evt = ac_show_cmd = on_alconna(
    Alconna("群空调"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)


@ac_show_cmd.handle()
async def _(target: DepLongTaskTarget):
    """Send the picture of the air conditioner for ``target.channel_id``."""
    ac = await get_ac(target.channel_id)
    await send_ac_image(ac_show_cmd, ac)
# Command "开空调": switch the channel's air conditioner on.
# The matcher gets its own name because ``evt`` is rebound by every later
# command in this module; a handler that reads ``evt`` when it runs would get
# whichever matcher was assigned last. ``evt`` is kept as an alias.
evt = ac_turn_on_cmd = on_alconna(
    Alconna("开空调"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)


@ac_turn_on_cmd.handle()
async def _(target: DepLongTaskTarget):
    """Call ``update_ac(state=True)`` on the channel's unit, then send its picture."""
    ac = await get_ac(target.channel_id)
    await ac.update_ac(state=True)
    await send_ac_image(ac_turn_on_cmd, ac)
# Command "关空调": switch the channel's air conditioner off.
# The matcher gets its own name because ``evt`` is rebound by every later
# command in this module; a handler that reads ``evt`` when it runs would get
# whichever matcher was assigned last. ``evt`` is kept as an alias.
evt = ac_turn_off_cmd = on_alconna(
    Alconna("关空调"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)


@ac_turn_off_cmd.handle()
async def _(target: DepLongTaskTarget):
    """Call ``update_ac(state=False)`` on the channel's unit, then send its picture."""
    ac = await get_ac(target.channel_id)
    await ac.update_ac(state=False)
    await send_ac_image(ac_turn_off_cmd, ac)
# Command "空调升温": raise the temperature of the channel's air conditioner.
# The matcher gets its own name because ``evt`` is rebound by every later
# command in this module; a handler that reads ``evt`` when it runs would get
# whichever matcher was assigned last. ``evt`` is kept as an alias.
evt = ac_heat_cmd = on_alconna(
    Alconna(
        "空调升温",
        # Optional number of degrees to raise; the handler falls back to 1
        Args["temp?", Optional[Union[int, float]]],
    ),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)


@ac_heat_cmd.handle()
async def _(target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
    """Raise the temperature of the channel's air conditioner by ``temp`` degrees.

    A missing ``temp`` is treated as 1; non-positive values are ignored without
    any reply. If the unit is off, burnt or frozen, its current picture is sent
    and nothing is changed. If the update leaves the unit burnt, a shaking
    explosion GIF and a text message are sent instead of the regular picture.
    """
    if temp is None:
        temp = 1
    if temp <= 0:
        return
    ac = await get_ac(target.channel_id)
    if not ac.on or ac.burnt or ac.frozen:
        await send_ac_image(ac_heat_cmd, ac)
        return
    await ac.update_ac(temperature_delta=temp)
    if not ac.burnt:
        await send_ac_image(ac_heat_cmd, ac)
        return
    # The update burnt the unit: load the explosion picture and shake it
    with open(ASSETS_PATH / "img" / "other" / "boom.jpg", "rb") as f:
        frames = wiggle_transform(np.array(Image.open(f)), intensity=5)
    pil_frames = [Image.fromarray(frame) for frame in frames]
    output = BytesIO()
    pil_frames[0].save(
        output,
        format="GIF",
        save_all=True,
        append_images=pil_frames[1:],
        loop=0,
        duration=35,
        disposal=2,
    )
    output.seek(0)
    await ac_heat_cmd.send(await UniMessage().image(raw=output).export())
    await ac_heat_cmd.send("太热啦,空调炸了!")
# Command "空调降温": lower the temperature of the channel's air conditioner.
# The matcher gets its own name because ``evt`` is rebound by every later
# command in this module; a handler that reads ``evt`` when it runs would get
# whichever matcher was assigned last. ``evt`` is kept as an alias.
evt = ac_cool_cmd = on_alconna(
    Alconna(
        "空调降温",
        # Optional number of degrees to lower; the handler falls back to 1
        Args["temp?", Optional[Union[int, float]]],
    ),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)


@ac_cool_cmd.handle()
async def _(target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
    """Lower the temperature of the channel's air conditioner by ``temp`` degrees.

    A missing ``temp`` is treated as 1; non-positive values are ignored without
    any reply. If the unit is off, burnt or frozen, its current picture is sent
    and nothing is changed; otherwise ``update_ac`` is called with the negated
    delta and the resulting picture is sent.
    """
    if temp is None:
        temp = 1
    if temp <= 0:
        return
    ac = await get_ac(target.channel_id)
    if ac.on and not ac.burnt and not ac.frozen:
        await ac.update_ac(temperature_delta=-temp)
    await send_ac_image(ac_cool_cmd, ac)
# Command "换空调": replace the channel's air conditioner.
# The matcher gets its own name because ``evt`` is rebound by every later
# command in this module; a handler that reads ``evt`` when it runs would get
# whichever matcher was assigned last. ``evt`` is kept as an alias.
evt = ac_change_cmd = on_alconna(
    Alconna("换空调"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)


@ac_change_cmd.handle()
async def _(target: DepLongTaskTarget):
    """Call ``change_ac()`` on the channel's unit, then send its picture."""
    ac = await get_ac(target.channel_id)
    await ac.change_ac()
    await send_ac_image(ac_change_cmd, ac)
async def query_number_ranking(id: str) -> tuple[int, int]:
    """Run ``query_crash_and_rank.sql`` for ``id`` and return its first two columns.

    ``id`` is passed twice as the query parameters. Returns ``(0, 0)`` when the
    query yields no rows; otherwise the first and second value of the first
    row, in the column order of the result mapping.
    """
    sql_file = ROOT_PATH / "sql" / "query_crash_and_rank.sql"
    rows = await db_manager.query_by_sql_file(sql_file, (id, id))
    if len(rows) == 0:
        return 0, 0
    # Flatten the row mapping into a positional list of its values
    columns = list(rows[0].values())
    return columns[0], columns[1]
# Command "空调炸炸排行榜": render the crash count and ranking of the channel.
# The matcher gets its own name because ``evt`` is rebound by a later command
# in this module; a handler that reads ``evt`` when it runs would get whichever
# matcher was assigned last. ``evt`` is kept as an alias.
evt = ac_ranking_cmd = on_alconna(
    Alconna("空调炸炸排行榜"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)


@ac_ranking_cmd.handle()
async def _(target: DepLongTaskTarget):
    """Query the two ranking values for ``target.channel_id`` and send them as an image.

    The values from ``query_number_ranking`` are passed as the ``number`` and
    ``ranking`` parameters to the ``ac/index.html`` page, whose ``.box``
    element is rendered and sent.
    """
    number, ranking = await query_number_ranking(target.channel_id)
    image = await WebRenderer.render_file(
        file_path=ASSETS_PATH / "webpage" / "ac" / "index.html",
        target=".box",
        params={"number": number, "ranking": ranking},
    )
    await ac_ranking_cmd.send(await UniMessage().image(raw=image).export())
# Command "空调最高峰": report current and historical temperature extremes.
# The matcher gets its own name so the handler does not depend on the shared
# module-level ``evt`` name, which every command definition in this module
# rebinds. ``evt`` is kept as an alias.
evt = ac_peak_cmd = on_alconna(
    Alconna("空调最高峰"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)


@ac_peak_cmd.handle()
async def _(target: DepLongTaskTarget):
    """Send the highest/lowest current temperature and the historical extremes.

    The first row of ``query_peak.sql`` supplies ``max``/``min`` and
    ``his_max``/``his_min``. The current extremes are then widened with every
    in-memory unit that is on and neither burnt nor frozen. The historical
    message is only sent when both historical values are present.
    """
    result = await db_manager.query_by_sql_file(ROOT_PATH / "sql" / "query_peak.sql")
    if len(result) == 0:
        await ac_peak_cmd.send("没有空调记录!")
        return
    row = result[0]
    max_temp = row.get("max")
    min_temp = row.get("min")
    his_max = row.get("his_max")
    his_min = row.get("his_min")
    # Also take the units held in the in-memory pool into account
    for ac in AirConditioner.InstancesPool.values():
        if not ac.on or ac.burnt or ac.frozen:
            continue
        if max_temp is None or min_temp is None:
            # No usable pair yet: seed both bounds from this unit
            max_temp = min_temp = ac.temperature
        max_temp = max(max_temp, ac.temperature)
        min_temp = min(min_temp, ac.temperature)
    if max_temp is None or min_temp is None:
        await ac_peak_cmd.send("目前全部空调都被炸掉了!")
    else:
        await ac_peak_cmd.send(
            f"全球在线空调最高温度为 {max_temp:.1f}°C最低温度为 {min_temp:.1f}°C"
        )
    if his_max is not None and his_min is not None:
        await ac_peak_cmd.send(
            f"历史最高温度为 {his_max:.1f}°C最低温度为 {his_min:.1f}°C\n(要进入历史记录,温度需至少保持 5 分钟)"
        )