diff --git a/assets/webpage/ac/assets/background.png b/assets/webpage/ac/assets/background.png new file mode 100644 index 0000000..e4280c9 Binary files /dev/null and b/assets/webpage/ac/assets/background.png differ diff --git a/assets/webpage/ac/index.html b/assets/webpage/ac/index.html new file mode 100644 index 0000000..34f9fec --- /dev/null +++ b/assets/webpage/ac/index.html @@ -0,0 +1,76 @@ + + + + + 空调炸炸排行榜 + + +
+
位居全球第 200
+
您的群总共坏了 200 台空调
+ 空调炸炸排行榜 +
+ + + + \ No newline at end of file diff --git a/konabot/common/web_render/__init__.py b/konabot/common/web_render/__init__.py index a06e1bd..3478392 100644 --- a/konabot/common/web_render/__init__.py +++ b/konabot/common/web_render/__init__.py @@ -80,6 +80,30 @@ class WebRenderer: page = cls.page_pool[page_id] return await instance.render_with_page(page, url, target, params=params, other_function=other_function, timeout=timeout) + @classmethod + async def render_file( + cls, + file_path: str, + target: str, + params: dict = {}, + other_function: PageFunction | None = None, + timeout: int = 30, + ) -> bytes: + ''' + 访问指定本地文件URL并返回截图 + + :param file_path: 目标文件路径 + :param target: 渲染目标,如 ".box"、"#main" 等CSS选择器 + :param timeout: 页面加载超时时间,单位秒 + :param params: URL键值对参数 + :param other_function: 其他自定义操作函数,接受page参数 + :return: 截图的字节数据 + + ''' + instance = await cls.get_browser_instance() + logger.debug(f"Using WebRendererInstance {id(instance)} to render file {file_path} targeting {target}") + return await instance.render_file(file_path, target, params=params, other_function=other_function, timeout=timeout) + @classmethod async def close_persistent_page(cls, page_id: str) -> None: ''' @@ -154,6 +178,10 @@ class WebRendererInstance: async with self.lock: screenshot = await self.inner_render(page, url, target, index, params, other_function, timeout) return screenshot + + async def render_file(self, file_path: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes: + file_path = "file:///" + str(file_path).replace("\\", "/") + return await self.render(file_path, target, index, params, other_function, timeout) async def inner_render(self, page: Page, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes: logger.debug(f"Navigating to {url} with timeout {timeout}") diff --git a/konabot/plugins/air_conditioner/__init__.py b/konabot/plugins/air_conditioner/__init__.py index cf86e65..4f921fe 100644 --- a/konabot/plugins/air_conditioner/__init__.py +++ b/konabot/plugins/air_conditioner/__init__.py @@ -1,13 +1,16 @@ from io import BytesIO from typing import Optional, Union +import cv2 from nonebot.adapters import Event as BaseEvent from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent from nonebot_plugin_alconna import Alconna, AlconnaMatcher, Args, UniMessage, on_alconna from PIL import Image +import numpy as np from konabot.common.longtask import DepLongTaskTarget from konabot.common.path import ASSETS_PATH -from konabot.plugins.air_conditioner.ac import AirConditioner, generate_ac_image +from konabot.common.web_render import WebRenderer +from konabot.plugins.air_conditioner.ac import AirConditioner, CrashType, generate_ac_image, wiggle_transform import random import math @@ -73,17 +76,20 @@ async def _(event: BaseEvent, target: DepLongTaskTarget): await send_ac_image(evt, ac) evt = on_alconna(Alconna( - "空调升温" + "空调升温", + Args["temp?", Optional[Union[int, float]]] # 可选参数,升温的度数,默认为1 ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) @evt.handle() -async def _(event: BaseEvent, target: DepLongTaskTarget): +async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1): + if temp <= 0: + return id = target.channel_id ac = get_ac(id) if not ac.on or ac.burnt == True or ac.frozen == True: await send_ac_image(evt, ac) return - ac.temperature += 1 + ac.temperature += temp if ac.temperature > 40: # 根据温度随机出是否爆炸,40度开始,呈指数增长 possibility = -math.e ** ((40-ac.temperature) / 50) + 1 @@ -91,30 +97,37 @@ async def _(event: BaseEvent, target: DepLongTaskTarget): # 打开爆炸图片 with open(ASSETS_PATH / "img" / "other" / "boom.jpg", "rb") as f: output = BytesIO() - Image.open(f).save(output, format="GIF") + # 爆炸抖动 + frames = wiggle_transform(np.array(Image.open(f)), intensity=5) + pil_frames = [Image.fromarray(frame) for frame in frames] + pil_frames[0].save(output, format="GIF", save_all=True, append_images=pil_frames[1:], loop=0, duration=35, disposal=2) + output.seek(0) await evt.send(await UniMessage().image(raw=output).export()) - ac.burnt = True + ac.broke_ac(CrashType.BURNT) await evt.send("太热啦,空调炸了!") return await send_ac_image(evt, ac) evt = on_alconna(Alconna( - "空调降温" + "空调降温", + Args["temp?", Optional[Union[int, float]]] # 可选参数,降温的度数,默认为1 ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) @evt.handle() -async def _(event: BaseEvent, target: DepLongTaskTarget): +async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1): + if temp <= 0: + return id = target.channel_id ac = get_ac(id) if not ac.on or ac.burnt == True or ac.frozen == True: await send_ac_image(evt, ac) return - ac.temperature -= 1 + ac.temperature -= temp if ac.temperature < 0: # 根据温度随机出是否冻结,0度开始,呈指数增长 possibility = -math.e ** (ac.temperature / 50) + 1 if random.random() < possibility: - ac.frozen = True + ac.broke_ac(CrashType.FROZEN) await send_ac_image(evt, ac) evt = on_alconna(Alconna( @@ -126,4 +139,24 @@ async def _(event: BaseEvent, target: DepLongTaskTarget): id = target.channel_id ac = get_ac(id) ac.change_ac() - await send_ac_image(evt, ac) \ No newline at end of file + await send_ac_image(evt, ac) + +evt = on_alconna(Alconna( + "空调炸炸排行榜", +), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) + +@evt.handle() +async def _(event: BaseEvent, target: DepLongTaskTarget): + id = target.channel_id + ac = get_ac(id) + number, ranking = ac.get_crashes_and_ranking() + params = { + "number": number, + "ranking": ranking + } + image = await WebRenderer.render_file( + file_path=ASSETS_PATH / "webpage" / "ac" / "index.html", + target=".box", + params=params + ) + await evt.send(await UniMessage().image(raw=image).export()) \ No newline at end of file diff --git a/konabot/plugins/air_conditioner/ac.py b/konabot/plugins/air_conditioner/ac.py index 7e3468e..6614784 100644 --- a/konabot/plugins/air_conditioner/ac.py +++ b/konabot/plugins/air_conditioner/ac.py @@ -1,3 +1,4 @@ +from enum import Enum from io import BytesIO import cv2 @@ -5,6 +6,12 @@ import numpy as np from PIL import Image, ImageDraw, ImageFont from konabot.common.path import ASSETS_PATH, FONTS_PATH +from konabot.common.path import DATA_PATH +import json + +class CrashType(Enum): + BURNT = 0 + FROZEN = 1 class AirConditioner: air_conditioners: dict[str, "AirConditioner"] = {} @@ -23,6 +30,60 @@ class AirConditioner: self.on = False self.temperature = 24 # 重置为默认温度 + def broke_ac(self, crash_type: CrashType): + ''' + 让空调坏掉,并保存数据 + + :param crash_type: CrashType 枚举,表示空调坏掉的类型 + ''' + match crash_type: + case CrashType.BURNT: + self.burnt = True + case CrashType.FROZEN: + self.frozen = True + self.save_crash_data(crash_type) + + def save_crash_data(self, crash_type: CrashType): + ''' + 如果空调爆炸了,就往本地的 ac_crash_data.json 里该 id 的记录加一 + ''' + data_file = DATA_PATH / "ac_crash_data.json" + crash_data = {} + if data_file.exists(): + with open(data_file, "r", encoding="utf-8") as f: + crash_data = json.load(f) + if self.id not in crash_data: + crash_data[self.id] = {"burnt": 0, "frozen": 0} + match crash_type: + case CrashType.BURNT: + crash_data[self.id]["burnt"] += 1 + case CrashType.FROZEN: + crash_data[self.id]["frozen"] += 1 + with open(data_file, "w", encoding="utf-8") as f: + json.dump(crash_data, f, ensure_ascii=False, indent=4) + + def get_crashes_and_ranking(self) -> tuple[int, int]: + ''' + 获取该群在全国空调损坏的数量与排行榜的位置 + ''' + data_file = DATA_PATH / "ac_crash_data.json" + if not data_file.exists(): + return 0, 1 + with open(data_file, "r", encoding="utf-8") as f: + crash_data = json.load(f) + ranking_list = [] + for gid, record in crash_data.items(): + total = record.get("burnt", 0) + record.get("frozen", 0) + ranking_list.append((gid, total)) + ranking_list.sort(key=lambda x: x[1], reverse=True) + total_crashes = crash_data.get(self.id, {}).get("burnt", 0) + crash_data.get(self.id, {}).get("frozen", 0) + rank = 1 + for gid, total in ranking_list: + if gid == self.id: + break + rank += 1 + return total_crashes, rank + def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0)): """ 将文本转换为带透明背景的图像,图像大小刚好包含文本 @@ -193,7 +254,7 @@ async def generate_ac_image(ac: AirConditioner) -> BytesIO: return output # 根据生成温度文本图像 - text = f"{ac.temperature}°C" + text = f"{round(ac.temperature, 1)}°C" text_image = text_to_transparent_image( text, font_size=60, @@ -218,7 +279,7 @@ async def generate_ac_image(ac: AirConditioner) -> BytesIO: final_image_simple = blend_with_transparency(ac_image, transformed_text, (0, 0)) - intensity = max(2, abs(ac.temperature - 24) // 2) + intensity = max(2, abs(int(ac.temperature) - 24) // 2) frames = wiggle_transform(final_image_simple, intensity=intensity) pil_frames = [Image.fromarray(frame) for frame in frames]