Files
konabot/konabot/plugins/air_conditioner/__init__.py
2025-10-25 21:54:38 +08:00

91 lines
3.2 KiB
Python

from typing import Optional
from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
from konabot.common.web_render import WebRenderer
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from playwright.async_api import Page
async def open_handle(page: Page) -> None:
'''
开空调
'''
# 找到 id 为 power 的开关按钮元素
power_button = await page.query_selector("#power")
if power_button:
# 点击按钮打开空调
await power_button.click(force=True)
async def up_handle(page: Page) -> None:
'''
升温
'''
# 找到 id 为 add 的按钮元素
add_button = await page.query_selector("#add")
if add_button:
# 点击按钮升温,无需检测是否稳定
await add_button.click(force=True)
async def down_handle(page: Page) -> None:
'''
降温
'''
# 找到 id 为 minus 的按钮元素
minus_button = await page.query_selector("#minus")
if minus_button:
# 点击按钮降温
await minus_button.click(force=True)
def get_user_info(event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
user_id = str(event.author.id)
user_name = str(event.author.name)
else:
user_id = str(event.get_user_id())
user_name = str(event.get_user_id())
return user_id, user_name
evt = on_alconna(
Alconna(
f"群空调",
Args["condition", str]
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(msg: UniMsg, event: BaseEvent, target: DepLongTaskTarget, condition: Optional[str] = ""):
identify_code = f"air_conditioner_{target.channel_id}"
function_handle = None
match condition:
case "开空调" | "打开空调" | "启动空调" | "关闭空调" | "关空调" | "开关空调":
function_handle = open_handle
case "升温" | "调高温度" | "加温" | "" | "调高" | "提高" | "加一度":
function_handle = up_handle
case "降温" | "调低温度" | "减温" | "" | "调低" | "降低" | "减一度":
function_handle = down_handle
case "炸空调":
await WebRenderer.close_persistent_page(identify_code)
# 读取 boom 图片
with open(ASSETS_PATH / "img" / "other" / "boom.jpg", "rb") as f:
boom_image = f.read()
await evt.send(await UniMessage().image(raw=boom_image).export())
user_id, _ = get_user_info(event)
await evt.send(await UniMessage().at(user_id).text("空调被你炸毁了!我们重新装了一台!").export())
return
case _:
return
screenshot = await WebRenderer.render_persistent_page(
page_id=identify_code,
url="https://toolwa.com/ac/",
target="#kt",
other_function=lambda page: function_handle(page) if function_handle else None,
timeout=30
)
await evt.send(
await UniMessage().image(raw=screenshot).export()
)