成语接龙5.0、群空调功能
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing

This commit is contained in:
2025-10-25 23:39:32 +08:00
parent 4d5678efac
commit 7bbd4f81ee
7 changed files with 425 additions and 129 deletions

View File

@ -1,91 +1,120 @@
from typing import Optional
from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna
from io import BytesIO
from typing import Optional, Union
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import Alconna, AlconnaMatcher, Args, UniMessage, on_alconna
from PIL import Image
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
from konabot.common.web_render import WebRenderer
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from playwright.async_api import Page
from konabot.plugins.air_conditioner.ac import AirConditioner, generate_ac_image
async def open_handle(page: Page) -> None:
'''
开空调
'''
# 找到 id 为 power 的开关按钮元素
power_button = await page.query_selector("#power")
if power_button:
# 点击按钮打开空调
await power_button.click(force=True)
def get_ac(id: str) -> AirConditioner:
ac = AirConditioner.air_conditioners.get(id)
if ac is None:
ac = AirConditioner(id)
return ac
async def up_handle(page: Page) -> None:
'''
升温
'''
# 找到 id 为 add 的按钮元素
add_button = await page.query_selector("#add")
if add_button:
# 点击按钮升温,无需检测是否稳定
await add_button.click(force=True)
async def send_ac_image(event: type[AlconnaMatcher], ac: AirConditioner):
if(ac.burnt == True):
# 打开坏掉的空调图片
with open(ASSETS_PATH / "img" / "ac" / "broken_ac.png", "rb") as f:
# 将其转为 GIF 格式发送
output = BytesIO()
Image.open(f).save(output, format="GIF")
output.seek(0)
await event.send(await UniMessage().image(raw=output).export())
return
if(ac.frozen == True):
# 打开坏掉的空调图片
with open(ASSETS_PATH / "img" / "ac" / "frozen_ac.png", "rb") as f:
# 将其转为 GIF 格式发送
output = BytesIO()
Image.open(f).save(output, format="GIF")
output.seek(0)
await event.send(await UniMessage().image(raw=output).export())
return
ac_image = await generate_ac_image(ac)
await event.send(await UniMessage().image(raw=ac_image).export())
async def down_handle(page: Page) -> None:
'''
降温
'''
# 找到 id 为 minus 的按钮元素
minus_button = await page.query_selector("#minus")
if minus_button:
# 点击按钮降温
await minus_button.click(force=True)
evt = on_alconna(Alconna(
"群空调"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
def get_user_info(event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
user_id = str(event.author.id)
user_name = str(event.author.name)
else:
user_id = str(event.get_user_id())
user_name = str(event.get_user_id())
return user_id, user_name
evt = on_alconna(
Alconna(
f"群空调",
Args["condition", str]
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(msg: UniMsg, event: BaseEvent, target: DepLongTaskTarget, condition: Optional[str] = ""):
identify_code = f"air_conditioner_{target.channel_id}"
function_handle = None
match condition:
case "开空调" | "打开空调" | "启动空调" | "关闭空调" | "关空调" | "开关空调":
function_handle = open_handle
case "升温" | "调高温度" | "加温" | "" | "调高" | "提高" | "加一度":
function_handle = up_handle
case "降温" | "调低温度" | "减温" | "" | "调低" | "降低" | "减一度":
function_handle = down_handle
case "炸空调":
await WebRenderer.close_persistent_page(identify_code)
# 读取 boom 图片
with open(ASSETS_PATH / "img" / "other" / "boom.jpg", "rb") as f:
boom_image = f.read()
await evt.send(await UniMessage().image(raw=boom_image).export())
user_id, _ = get_user_info(event)
await evt.send(await UniMessage().at(user_id).text("空调被你炸毁了!我们重新装了一台!").export())
return
case _:
return
screenshot = await WebRenderer.render_persistent_page(
page_id=identify_code,
url="https://toolwa.com/ac/",
target="#kt",
other_function=lambda page: function_handle(page) if function_handle else None,
timeout=30
)
async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
await send_ac_image(evt, ac)
await evt.send(
await UniMessage().image(raw=screenshot).export()
)
evt = on_alconna(Alconna(
"开空调"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
ac.on = True
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
"关空调"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
ac.on = False
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
"空调升温"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
if not ac.on or ac.burnt == True or ac.frozen == True:
await send_ac_image(evt, ac)
return
ac.temperature += 1
if ac.temperature > 40:
# 打开爆炸图片
with open(ASSETS_PATH / "img" / "other" / "boom.jpg", "rb") as f:
output = BytesIO()
Image.open(f).save(output, format="GIF")
await evt.send(await UniMessage().image(raw=output).export())
ac.burnt = True
await evt.send("太热啦,空调炸了!")
return
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
"空调降温"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
if not ac.on or ac.burnt == True or ac.frozen == True:
await send_ac_image(evt, ac)
return
ac.temperature -= 1
if ac.temperature < 0:
ac.frozen = True
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
"换空调"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
ac.change_ac()
await send_ac_image(evt, ac)

View File

@ -0,0 +1,225 @@
from io import BytesIO
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from konabot.common.path import ASSETS_PATH, FONTS_PATH
class AirConditioner:
air_conditioners: dict[str, "AirConditioner"] = {}
def __init__(self, id: str) -> None:
self.id = id
self.on = False
self.temperature = 24 # 默认温度
self.burnt = False
self.frozen = False
AirConditioner.air_conditioners[id] = self
def change_ac(self):
self.burnt = False
self.frozen = False
self.on = False
self.temperature = 24 # 重置为默认温度
def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0)):
"""
将文本转换为带透明背景的图像,图像大小刚好包含文本
"""
# 创建临时图像来计算文本尺寸
temp_image = Image.new('RGB', (1, 1), (255, 255, 255))
temp_draw = ImageDraw.Draw(temp_image)
font = ImageFont.truetype(FONTS_PATH / "montserrat.otf", font_size)
# 获取文本边界框
bbox = temp_draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# 计算图像大小(文本大小 + 内边距)
image_width = int(text_width + 2 * padding)
image_height = int(text_height + 2 * padding)
# 创建RGBA模式的空白图像带透明通道
image = Image.new('RGBA', (image_width, image_height), (0, 0, 0, 0))
draw = ImageDraw.Draw(image)
# 绘制文本(考虑内边距)
x = padding - bbox[0] # 调整起始位置
y = padding - bbox[1]
# 设置文本颜色(带透明度)
if len(text_color) == 3:
text_color = text_color + (255,) # 添加完全不透明的alpha值
draw.text((x, y), text, fill=text_color, font=font)
# 转换为OpenCV格式BGRA
image_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGBA2BGRA)
return image_cv
def perspective_transform(image, target, corners):
"""
对图像进行透视变换(保持透明通道)
target: 画布
corners: 四个角点的坐标,顺序为 [左上, 右上, 右下, 左下]
"""
height, width = image.shape[:2]
# 源点(原始图像的四个角)
src_points = np.array([
[0, 0], # 左上
[width-1, 0], # 右上
[width-1, height-1], # 右下
[0, height-1] # 左下
], dtype=np.float32)
# 目标点(变换后的四个角)
dst_points = np.array(corners, dtype=np.float32)
# 计算透视变换矩阵
matrix = cv2.getPerspectiveTransform(src_points, dst_points)
# 获取画布大小
target_height, target_width = target.shape[:2]
# 应用透视变换保持所有通道包括alpha
transformed = cv2.warpPerspective(image, matrix, (target_width, target_height), flags=cv2.INTER_LINEAR)
return transformed, matrix
def blend_with_transparency(background, foreground, position):
"""
将带透明通道的前景图像合成到背景图像上
position: 前景图像在背景图像上的位置 (x, y)
"""
bg = background.copy()
# 如果背景没有alpha通道添加一个
if bg.shape[2] == 3:
bg = cv2.cvtColor(bg, cv2.COLOR_BGR2BGRA)
bg[:, :, 3] = 255 # 完全不透明
x, y = position
fg_height, fg_width = foreground.shape[:2]
bg_height, bg_width = bg.shape[:2]
# 确保位置在图像范围内
x = max(0, min(x, bg_width - fg_width))
y = max(0, min(y, bg_height - fg_height))
# 提取前景的alpha通道并归一化
alpha_foreground = foreground[:, :, 3] / 255.0
# 对于每个颜色通道进行合成
for c in range(3):
bg_region = bg[y:y+fg_height, x:x+fg_width, c]
fg_region = foreground[:, :, c]
# alpha混合公式
bg[y:y+fg_height, x:x+fg_width, c] = (
alpha_foreground * fg_region +
(1 - alpha_foreground) * bg_region
)
# 更新背景的alpha通道如果需要
bg_alpha_region = bg[y:y+fg_height, x:x+fg_width, 3]
bg[y:y+fg_height, x:x+fg_width, 3] = np.maximum(bg_alpha_region, foreground[:, :, 3])
return bg
def precise_blend_with_perspective(background, foreground, corners):
"""
精确合成:根据四个角点将前景图像透视合成到背景上
"""
# 创建与背景相同大小的空白图像
bg_height, bg_width = background.shape[:2]
# 如果背景没有alpha通道转换为BGRA
if background.shape[2] == 3:
background_bgra = cv2.cvtColor(background, cv2.COLOR_BGR2BGRA)
else:
background_bgra = background.copy()
# 创建与背景相同大小的前景图层
foreground_layer = np.zeros((bg_height, bg_width, 4), dtype=np.uint8)
# 计算前景图像在背景中的边界框
min_x = int(min(corners[:, 0]))
max_x = int(max(corners[:, 0]))
min_y = int(min(corners[:, 1]))
max_y = int(max(corners[:, 1]))
# 将变换后的前景图像放置到对应位置
fg_height, fg_width = foreground.shape[:2]
if min_y + fg_height <= bg_height and min_x + fg_width <= bg_width:
foreground_layer[min_y:min_y+fg_height, min_x:min_x+fg_width] = foreground
# 创建掩码(只在前景有内容的地方合成)
mask = (foreground_layer[:, :, 3] > 0)
# 合成图像
result = background_bgra.copy()
for c in range(3):
result[:, :, c][mask] = foreground_layer[:, :, c][mask]
result[:, :, 3][mask] = foreground_layer[:, :, 3][mask]
return result
def wiggle_transform(image) -> list[np.ndarray]:
'''
返回一组图像振动的帧组,模拟空调运作时的抖动效果
'''
frames = []
height, width = image.shape[:2]
shifts = [(-2, 0), (2, 0), (0, -2), (0, 2), (0, 0)]
for dx, dy in shifts:
M = np.float32([[1, 0, dx], [0, 1, dy]])
shifted = cv2.warpAffine(image, M, (width, height))
frames.append(shifted)
return frames
async def generate_ac_image(ac: AirConditioner) -> BytesIO:
# 找到空调底图
ac_image = cv2.imread(str(ASSETS_PATH / "img" / "ac" / "ac.png"), cv2.IMREAD_UNCHANGED)
if not ac.on:
# 空调关闭状态,直接返回底图
pil_final = Image.fromarray(ac_image)
output = BytesIO()
pil_final.save(output, format="GIF")
return output
# 根据生成温度文本图像
text = f"{ac.temperature}°C"
text_image = text_to_transparent_image(
text,
font_size=60,
text_color=(0, 0, 0) # 黑色文字
)
# 获取长宽比
height, width = text_image.shape[:2]
aspect_ratio = width / height
# 定义3D变换的四个角点透视效果
# 顺序: [左上, 右上, 右下, 左下]
corners = np.array([
[123, 45], # 左上
[284, 101], # 右上
[290, 140], # 右下
[119, 100] # 左下
], dtype=np.float32)
# 对文本图像进行3D变换保持透明通道
transformed_text, transform_matrix = perspective_transform(text_image, ac_image, corners)
final_image_simple = blend_with_transparency(ac_image, transformed_text, (0, 0))
frames = wiggle_transform(final_image_simple)
pil_frames = [Image.fromarray(frame) for frame in frames]
output = BytesIO()
pil_frames[0].save(output, format="GIF", save_all=True, append_images=pil_frames[1:], loop=0, duration=50)
return output