120 lines
3.9 KiB
Python
120 lines
3.9 KiB
Python
from io import BytesIO
|
|
from typing import Optional, Union
|
|
from nonebot.adapters import Event as BaseEvent
|
|
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
|
|
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
|
from nonebot_plugin_alconna import Alconna, AlconnaMatcher, Args, UniMessage, on_alconna
|
|
from PIL import Image
|
|
from konabot.common.longtask import DepLongTaskTarget
|
|
from konabot.common.path import ASSETS_PATH
|
|
from konabot.plugins.air_conditioner.ac import AirConditioner, generate_ac_image
|
|
|
|
def get_ac(id: str) -> AirConditioner:
    """Return the air conditioner for ``id``, creating one when none exists.

    The lookup goes through ``AirConditioner.air_conditioners``; on a miss a
    fresh ``AirConditioner(id)`` is constructed and returned.
    """
    # NOTE(review): presumably the constructor registers the new instance in
    # ``AirConditioner.air_conditioners`` - confirm in ac.py.
    existing = AirConditioner.air_conditioners.get(id)
    return AirConditioner(id) if existing is None else existing
|
|
|
|
async def _send_asset_as_gif(event: type[AlconnaMatcher], asset_path) -> None:
    """Re-encode the image file at ``asset_path`` as GIF in memory and send it."""
    output = BytesIO()
    with open(asset_path, "rb") as f:
        Image.open(f).save(output, format="GIF")
    # Rewind so a consumer that reads the buffer starts at the first byte.
    output.seek(0)
    await event.send(await UniMessage().image(raw=output).export())


async def send_ac_image(event: type[AlconnaMatcher], ac: AirConditioner):
    """Send a picture describing the current state of ``ac``.

    Args:
        event: Despite its name, the matcher class whose ``send`` is used to
            deliver the reply.
        ac: The air conditioner to depict.

    A burnt unit is shown with ``broken_ac.png`` and a frozen one with
    ``frozen_ac.png`` (burnt is checked first); both are converted to GIF
    before sending. Otherwise the image comes from ``generate_ac_image``.
    """
    ac_assets = ASSETS_PATH / "img" / "ac"

    # Plain truthiness instead of ``== True``; assumes both flags are bools.
    if ac.burnt:
        await _send_asset_as_gif(event, ac_assets / "broken_ac.png")
        return

    if ac.frozen:
        await _send_asset_as_gif(event, ac_assets / "frozen_ac.png")
        return

    ac_image = await generate_ac_image(ac)
    await event.send(await UniMessage().image(raw=ac_image).export())
|
|
|
|
# "群空调": show the current state of this channel's air conditioner.
evt = on_alconna(
    Alconna("群空调"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)
# ``evt`` is rebound by every later command in this module, so the handler
# must not resolve the global at call time; keep a dedicated reference.
_status_matcher = evt


@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
    """Reply with an image of the air conditioner of the target channel."""
    ac = get_ac(target.channel_id)
    await send_ac_image(_status_matcher, ac)
|
|
|
|
# "开空调": switch this channel's air conditioner on.
evt = on_alconna(
    Alconna("开空调"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)
# ``evt`` is rebound by every later command in this module, so the handler
# must not resolve the global at call time; keep a dedicated reference.
_turn_on_matcher = evt


@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
    """Set ``on`` to True for the channel's air conditioner and show it."""
    ac = get_ac(target.channel_id)
    ac.on = True
    await send_ac_image(_turn_on_matcher, ac)
|
|
|
|
# "关空调": switch this channel's air conditioner off.
evt = on_alconna(
    Alconna("关空调"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)
# ``evt`` is rebound by every later command in this module, so the handler
# must not resolve the global at call time; keep a dedicated reference.
_turn_off_matcher = evt


@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
    """Set ``on`` to False for the channel's air conditioner and show it."""
    ac = get_ac(target.channel_id)
    ac.on = False
    await send_ac_image(_turn_off_matcher, ac)
|
|
|
|
# "空调升温": raise the temperature of this channel's air conditioner.
evt = on_alconna(
    Alconna("空调升温"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)
# ``evt`` is rebound by every later command in this module, so the handler
# must not resolve the global at call time; keep a dedicated reference.
_heat_matcher = evt


@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
    """Raise the temperature by 1; above 40 the unit is marked as burnt.

    When the air conditioner is off, burnt or frozen, nothing changes and its
    current image is sent instead.
    """
    ac = get_ac(target.channel_id)

    # Plain truthiness instead of ``== True``; assumes both flags are bools.
    if not ac.on or ac.burnt or ac.frozen:
        await send_ac_image(_heat_matcher, ac)
        return

    ac.temperature += 1
    if ac.temperature > 40:
        # Convert the explosion picture to GIF in memory before sending.
        output = BytesIO()
        with open(ASSETS_PATH / "img" / "other" / "boom.jpg", "rb") as f:
            Image.open(f).save(output, format="GIF")
        # Rewind the buffer, matching the conversions in send_ac_image, so a
        # consumer that reads the stream does not start at end-of-data.
        output.seek(0)
        await _heat_matcher.send(await UniMessage().image(raw=output).export())
        ac.burnt = True
        await _heat_matcher.send("太热啦,空调炸了!")
        return

    await send_ac_image(_heat_matcher, ac)
|
|
|
|
# "空调降温": lower the temperature of this channel's air conditioner.
evt = on_alconna(
    Alconna("空调降温"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)
# ``evt`` is rebound by a later command in this module, so the handler must
# not resolve the global at call time; keep a dedicated reference.
_cool_matcher = evt


@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
    """Lower the temperature by 1; below 0 the unit is marked as frozen.

    When the air conditioner is off, burnt or frozen, nothing changes and its
    current image is sent instead.
    """
    ac = get_ac(target.channel_id)

    # Plain truthiness instead of ``== True``; assumes both flags are bools.
    if not ac.on or ac.burnt or ac.frozen:
        await send_ac_image(_cool_matcher, ac)
        return

    ac.temperature -= 1
    if ac.temperature < 0:
        ac.frozen = True
    await send_ac_image(_cool_matcher, ac)
|
|
|
|
# "换空调": replace this channel's air conditioner.
evt = on_alconna(
    Alconna("换空调"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)


@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
    """Call ``change_ac()`` on the channel's air conditioner, then show it."""
    channel_ac = get_ac(target.channel_id)
    channel_ac.change_ac()
    await send_ac_image(evt, channel_ac)