"""Chat commands for a per-channel virtual "air conditioner" mini-game.

Each channel owns one ``AirConditioner``. Commands can show it, switch it on
or off, raise or lower its temperature (which may randomly burn or freeze
it), replace it, and render a crash ranking page.
"""

from io import BytesIO
import math
import random
from typing import Optional, Union

import cv2
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import Alconna, AlconnaMatcher, Args, UniMessage, on_alconna
from PIL import Image
import numpy as np

from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
from konabot.common.web_render import WebRenderer
from konabot.plugins.air_conditioner.ac import (
    AirConditioner,
    CrashType,
    generate_ac_image,
    wiggle_transform,
)


def get_ac(id: str) -> AirConditioner:
    """Return the air conditioner registered under *id*, creating one if absent.

    The lookup goes through ``AirConditioner.air_conditioners``; when nothing
    is found a new ``AirConditioner(id)`` is constructed and returned.
    """
    # NOTE(review): the new instance is not stored here; presumably the
    # AirConditioner constructor registers itself -- confirm.
    ac = AirConditioner.air_conditioners.get(id)
    if ac is None:
        ac = AirConditioner(id)
    return ac


async def _send_static_ac_gif(matcher: type[AlconnaMatcher], filename: str) -> None:
    """Send the still image ``img/ac/<filename>`` from the assets, re-encoded as GIF."""
    output = BytesIO()
    with open(ASSETS_PATH / "img" / "ac" / filename, "rb") as f:
        # Re-encode as GIF before sending, as the original code did.
        Image.open(f).save(output, format="GIF")
    output.seek(0)
    await matcher.send(await UniMessage().image(raw=output).export())


async def _send_explosion_gif(matcher: type[AlconnaMatcher]) -> None:
    """Send the explosion picture as a looping, shaking GIF."""
    output = BytesIO()
    with open(ASSETS_PATH / "img" / "other" / "boom.jpg", "rb") as f:
        # wiggle_transform yields the frames of the shake animation.
        frames = wiggle_transform(np.array(Image.open(f)), intensity=5)
    pil_frames = [Image.fromarray(frame) for frame in frames]
    pil_frames[0].save(
        output,
        format="GIF",
        save_all=True,
        append_images=pil_frames[1:],
        loop=0,
        duration=35,
        disposal=2,
    )
    output.seek(0)
    await matcher.send(await UniMessage().image(raw=output).export())


async def send_ac_image(event: type[AlconnaMatcher], ac: AirConditioner):
    """Send the picture matching the current state of *ac*.

    A burnt unit sends ``broken_ac.png`` and a frozen unit sends
    ``frozen_ac.png`` (both converted to GIF); otherwise the image produced by
    ``generate_ac_image`` is sent.

    Args:
        event: Matcher class used to send the message (despite the name).
        ac: The air conditioner to display.
    """
    if ac.burnt:
        await _send_static_ac_gif(event, "broken_ac.png")
        return
    if ac.frozen:
        await _send_static_ac_gif(event, "frozen_ac.png")
        return
    ac_image = await generate_ac_image(ac)
    await event.send(await UniMessage().image(raw=ac_image).export())


# NOTE(review): ``evt`` is rebound for every command below, so by the time a
# handler body runs, the global ``evt`` refers to the last matcher defined in
# this module. This presumably works because ``send`` resolves the target from
# the current handling context rather than from the matcher class -- confirm.

# Show the current air conditioner.
evt = on_alconna(Alconna(
    "群空调"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)


@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
    """Show the channel's air conditioner."""
    channel_id = target.channel_id
    ac = get_ac(channel_id)
    await send_ac_image(evt, ac)


# Switch the air conditioner on.
evt = on_alconna(Alconna(
    "开空调"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)


@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
    """Turn the channel's air conditioner on and show it."""
    channel_id = target.channel_id
    ac = get_ac(channel_id)
    ac.on = True
    await send_ac_image(evt, ac)


# Switch the air conditioner off.
evt = on_alconna(Alconna(
    "关空调"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)


@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
    """Turn the channel's air conditioner off and show it."""
    channel_id = target.channel_id
    ac = get_ac(channel_id)
    ac.on = False
    await send_ac_image(evt, ac)


# Raise the temperature.
evt = on_alconna(Alconna(
    "空调升温",
    Args["temp?", Optional[Union[int, float]]]  # optional: degrees to raise, defaults to 1
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)


@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
    """Raise the temperature by *temp* degrees; above 40 the unit may explode.

    Non-positive amounts are ignored. If the unit is off, burnt or frozen the
    temperature is left untouched and only the current picture is sent.
    """
    if temp is None:
        # The annotation allows None; fall back to the documented default
        # instead of failing on the comparison below.
        temp = 1
    if temp <= 0:
        return
    channel_id = target.channel_id
    ac = get_ac(channel_id)
    if not ac.on or ac.burnt or ac.frozen:
        await send_ac_image(evt, ac)
        return
    ac.temperature += temp
    if ac.temperature > 40:
        # Explosion chance is 0 at 40 degrees and approaches 1 as the
        # temperature climbs: 1 - e^((40 - T) / 50).
        possibility = 1 - math.e ** ((40 - ac.temperature) / 50)
        if random.random() < possibility:
            await _send_explosion_gif(evt)
            ac.broke_ac(CrashType.BURNT)
            await evt.send("太热啦,空调炸了!")
            return
    await send_ac_image(evt, ac)


# Lower the temperature.
evt = on_alconna(Alconna(
    "空调降温",
    Args["temp?", Optional[Union[int, float]]]  # optional: degrees to lower, defaults to 1
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)


@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
    """Lower the temperature by *temp* degrees; below 0 the unit may freeze.

    Non-positive amounts are ignored. If the unit is off, burnt or frozen the
    temperature is left untouched and only the current picture is sent.
    """
    if temp is None:
        # The annotation allows None; fall back to the documented default
        # instead of failing on the comparison below.
        temp = 1
    if temp <= 0:
        return
    channel_id = target.channel_id
    ac = get_ac(channel_id)
    if not ac.on or ac.burnt or ac.frozen:
        await send_ac_image(evt, ac)
        return
    ac.temperature -= temp
    if ac.temperature < 0:
        # Freeze chance is 0 at 0 degrees and approaches 1 as the
        # temperature drops: 1 - e^(T / 50).
        possibility = 1 - math.e ** (ac.temperature / 50)
        if random.random() < possibility:
            ac.broke_ac(CrashType.FROZEN)
    await send_ac_image(evt, ac)


# Replace the air conditioner.
evt = on_alconna(Alconna(
    "换空调"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)


@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
    """Call ``change_ac`` on the channel's air conditioner and show the result."""
    channel_id = target.channel_id
    ac = get_ac(channel_id)
    ac.change_ac()
    await send_ac_image(evt, ac)


# Crash ranking.
evt = on_alconna(Alconna(
    "空调炸炸排行榜",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)


@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
    """Render the crash ranking page for this channel and send it as an image."""
    channel_id = target.channel_id
    ac = get_ac(channel_id)
    number, ranking = ac.get_crashes_and_ranking()
    params = {
        "number": number,
        "ranking": ranking,
    }
    image = await WebRenderer.render_file(
        file_path=ASSETS_PATH / "webpage" / "ac" / "index.html",
        target=".box",
        params=params,
    )
    await evt.send(await UniMessage().image(raw=image).export())