from io import BytesIO from typing import Optional, Union import cv2 import nonebot from nonebot.adapters import Event as BaseEvent from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent from nonebot_plugin_alconna import Alconna, AlconnaMatcher, Args, UniMessage, on_alconna from PIL import Image import numpy as np from konabot.common.database import DatabaseManager from konabot.common.longtask import DepLongTaskTarget from konabot.common.path import ASSETS_PATH from konabot.common.web_render import WebRenderer from konabot.plugins.air_conditioner.ac import AirConditioner, CrashType, generate_ac_image, wiggle_transform from pathlib import Path import random import math ROOT_PATH = Path(__file__).resolve().parent # 创建全局数据库管理器实例 db_manager = DatabaseManager() async def get_ac(id: str) -> AirConditioner: ac = await AirConditioner.get_ac(id) if ac is None: ac = AirConditioner(id) return ac async def send_ac_image(event: type[AlconnaMatcher], ac: AirConditioner): if(ac.burnt == True): # 打开坏掉的空调图片 with open(ASSETS_PATH / "img" / "ac" / "broken_ac.png", "rb") as f: # 将其转为 GIF 格式发送 output = BytesIO() Image.open(f).save(output, format="GIF") output.seek(0) await event.send(await UniMessage().image(raw=output).export()) return if(ac.frozen == True): # 打开坏掉的空调图片 with open(ASSETS_PATH / "img" / "ac" / "frozen_ac.png", "rb") as f: # 将其转为 GIF 格式发送 output = BytesIO() Image.open(f).save(output, format="GIF") output.seek(0) await event.send(await UniMessage().image(raw=output).export()) return ac_image = await generate_ac_image(ac) await event.send(await UniMessage().image(raw=ac_image).export()) driver = nonebot.get_driver() @driver.on_startup async def register_startup_hook(): """注册启动时需要执行的函数""" # 初始化数据库表 await db_manager.execute_by_sql_file( Path(__file__).resolve().parent / "sql" / "create_table.sql" ) @driver.on_shutdown async def register_shutdown_hook(): """注册关闭时需要执行的函数""" # 关闭所有数据库连接 await db_manager.close_all_connections() evt = on_alconna(Alconna( "群空调" ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) @evt.handle() async def _(target: DepLongTaskTarget): id = target.channel_id ac = await get_ac(id) await send_ac_image(evt, ac) evt = on_alconna(Alconna( "开空调" ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) @evt.handle() async def _(target: DepLongTaskTarget): id = target.channel_id ac = await get_ac(id) await ac.update_ac(state=True) await send_ac_image(evt, ac) evt = on_alconna(Alconna( "关空调" ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) @evt.handle() async def _(target: DepLongTaskTarget): id = target.channel_id ac = await get_ac(id) await ac.update_ac(state=False) await send_ac_image(evt, ac) evt = on_alconna(Alconna( "空调升温", Args["temp?", Optional[Union[int, float]]] # 可选参数,升温的度数,默认为1 ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) @evt.handle() async def _(target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1): if temp is None: temp = 1 if temp <= 0: return id = target.channel_id ac = await get_ac(id) if not ac.on or ac.burnt == True or ac.frozen == True: await send_ac_image(evt, ac) return await ac.update_ac(temperature_delta=temp) if ac.temperature > 40: # 根据温度随机出是否爆炸,40度开始,呈指数增长 possibility = -math.e ** ((40-ac.temperature) / 50) + 1 if random.random() < possibility: # 打开爆炸图片 with open(ASSETS_PATH / "img" / "other" / "boom.jpg", "rb") as f: output = BytesIO() # 爆炸抖动 frames = wiggle_transform(np.array(Image.open(f)), intensity=5) pil_frames = [Image.fromarray(frame) for frame in frames] pil_frames[0].save(output, format="GIF", save_all=True, append_images=pil_frames[1:], loop=0, duration=35, disposal=2) output.seek(0) await evt.send(await UniMessage().image(raw=output).export()) await ac.broke_ac(CrashType.BURNT) await evt.send("太热啦,空调炸了!") return await send_ac_image(evt, ac) evt = on_alconna(Alconna( "空调降温", Args["temp?", Optional[Union[int, float]]] # 可选参数,降温的度数,默认为1 ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) @evt.handle() async def _(target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1): if temp is None: temp = 1 if temp <= 0: return id = target.channel_id ac = await get_ac(id) if not ac.on or ac.burnt == True or ac.frozen == True: await send_ac_image(evt, ac) return await ac.update_ac(temperature_delta=-temp) if ac.temperature < 0: # 根据温度随机出是否冻结,0度开始,呈指数增长 possibility = -math.e ** (ac.temperature / 50) + 1 if random.random() < possibility: await ac.broke_ac(CrashType.FROZEN) await send_ac_image(evt, ac) evt = on_alconna(Alconna( "换空调" ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) @evt.handle() async def _(target: DepLongTaskTarget): id = target.channel_id ac = await get_ac(id) await ac.change_ac() await send_ac_image(evt, ac) async def query_number_ranking(id: str) -> tuple[int, int]: result = await db_manager.query_by_sql_file( ROOT_PATH / "sql" / "query_crash_and_rank.sql", (id,id) ) if len(result) == 0: return 0, 0 else: # 将字典转换为值的元组 values = list(result[0].values()) return values[0], values[1] evt = on_alconna(Alconna( "空调炸炸排行榜", ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) @evt.handle() async def _(target: DepLongTaskTarget): id = target.channel_id # ac = get_ac(id) # number, ranking = ac.get_crashes_and_ranking() number, ranking = await query_number_ranking(id) params = { "number": number, "ranking": ranking } image = await WebRenderer.render_file( file_path=ASSETS_PATH / "webpage" / "ac" / "index.html", target=".box", params=params ) await evt.send(await UniMessage().image(raw=image).export())