Compare commits
19 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ce2b7fd6f6 | |||
| b28f8f85a2 | |||
| 0acffea86d | |||
| 3e395f8a35 | |||
| 312e203bbe | |||
| f9deabfce0 | |||
| 0a822bf440 | |||
| 534a2c9e75 | |||
| a03cef4124 | |||
| 7a20c3fe2f | |||
| 16351792b6 | |||
| 7bbd4f81ee | |||
| 4d5678efac | |||
| c7229bb763 | |||
| 6abc963ccf | |||
| 881f38d187 | |||
| 56d32bc9f4 | |||
| 76f19f9eac | |||
| 1479d8f8da |
@ -14,7 +14,8 @@ RUN apt-get update && \
|
||||
libxkbfile1 libxmu6 libxpm4 libxrender1 libxt6t64 x11-common x11-xkb-utils \
|
||||
xfonts-encodings xfonts-utils xkb-data xserver-common libnspr4 libatk1.0-0t64 \
|
||||
libatk-bridge2.0-0t64 libatspi2.0-0t64 libxcomposite1 libxdamage1 libxfixes3 \
|
||||
libxkbcommon0 libasound2t64 libnss3 \
|
||||
libxkbcommon0 libasound2t64 libnss3 fonts-noto-cjk fonts-noto-cjk-extra \
|
||||
fonts-noto-color-emoji \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
|
||||
|
||||
BIN
assets/img/ac/ac.png
Normal file
BIN
assets/img/ac/ac.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 57 KiB |
BIN
assets/img/ac/broken_ac.png
Normal file
BIN
assets/img/ac/broken_ac.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 29 KiB |
BIN
assets/img/ac/frozen_ac.png
Normal file
BIN
assets/img/ac/frozen_ac.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 87 KiB |
BIN
assets/img/other/boom.jpg
Normal file
BIN
assets/img/other/boom.jpg
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 29 KiB |
BIN
assets/webpage/ac/assets/background.png
Normal file
BIN
assets/webpage/ac/assets/background.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 1.1 MiB |
76
assets/webpage/ac/index.html
Normal file
76
assets/webpage/ac/index.html
Normal file
@ -0,0 +1,76 @@
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>空调炸炸排行榜</title>
|
||||
</head>
|
||||
<body>
|
||||
<div class="box">
|
||||
<div class="text">位居全球第 <span id="ranking" class="ranking">200</span>!</div>
|
||||
<div class="text-2">您的群总共坏了 <span id="number" class="number">200</span> 台空调</div>
|
||||
<img class="background" src="./assets/background.png" alt="空调炸炸排行榜">
|
||||
</div>
|
||||
</body>
|
||||
<style>
|
||||
.box {
|
||||
position: relative;
|
||||
width: 1024px;
|
||||
}
|
||||
.number {
|
||||
font-size: 2em;
|
||||
color: #ffdd00;
|
||||
text-shadow: 3px 3px 6px rgba(0, 0, 0, 0.7);
|
||||
font-weight: bold;
|
||||
font-stretch: 50%;
|
||||
max-width: 520px;
|
||||
word-wrap: break-word;
|
||||
line-height: 0.8em;
|
||||
}
|
||||
.background {
|
||||
width: 1024px;
|
||||
}
|
||||
.text {
|
||||
position: absolute;
|
||||
top: 125px;
|
||||
width: 100%;
|
||||
font-size: 72px;
|
||||
color: white;
|
||||
text-shadow: 2px 2px 4px rgba(0, 0, 0, 0.7);
|
||||
font-weight: bolder;
|
||||
display: flex;
|
||||
align-items: baseline;
|
||||
justify-content: center;
|
||||
}
|
||||
.text-2 {
|
||||
position: absolute;
|
||||
top: 50px;
|
||||
width: 100%;
|
||||
font-size: 48px;
|
||||
color: white;
|
||||
text-shadow: 2px 2px 4px rgba(0, 0, 0, 0.7);
|
||||
font-weight: bolder;
|
||||
display: flex;
|
||||
align-items: baseline;
|
||||
justify-content: center;
|
||||
}
|
||||
.ranking {
|
||||
font-size: 2em;
|
||||
color: #ff0000;
|
||||
-webkit-text-stroke: #ffffff 2px;
|
||||
text-shadow: 3px 3px 6px rgba(0, 0, 0, 0.7);
|
||||
font-weight: bold;
|
||||
font-stretch: 50%;
|
||||
}
|
||||
</style>
|
||||
<script>
|
||||
// 从 URL 参数中获取 number 的值
|
||||
const urlParams = new URLSearchParams(window.location.search);
|
||||
const number = urlParams.get('number');
|
||||
// 将 number 显示在页面上
|
||||
document.getElementById('number').textContent = number;
|
||||
// 从 URL 参数中获取 ranking 的值
|
||||
const ranking = urlParams.get('ranking');
|
||||
// 将 ranking 显示在页面上
|
||||
document.getElementById('ranking').textContent = ranking;
|
||||
</script>
|
||||
</html>
|
||||
@ -51,13 +51,20 @@ class LongTaskTarget(BaseModel):
|
||||
target_id: str
|
||||
"沟通对象的 ID"
|
||||
|
||||
async def send_message(self, msg: UniMessage, at: bool = True) -> bool:
|
||||
@property
|
||||
def is_private_chat(self):
|
||||
return self.channel_id.startswith(QQ_PRIVATE_CHAT_CHANNEL_PREFIX)
|
||||
|
||||
async def send_message(self, msg: UniMessage | str, at: bool = True) -> bool:
|
||||
try:
|
||||
bot = nonebot.get_bot(self.self_id)
|
||||
except KeyError:
|
||||
logger.warning(f"试图访问了不存在的 Bot。ID={self.self_id}")
|
||||
return False
|
||||
|
||||
if isinstance(msg, str):
|
||||
msg = UniMessage.text(msg)
|
||||
|
||||
if self.platform == "qq":
|
||||
if not isinstance(bot, OBBot):
|
||||
logger.warning(
|
||||
|
||||
@ -16,8 +16,6 @@ from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
|
||||
from PIL import UnidentifiedImageError
|
||||
from returns.result import Failure, Result, Success
|
||||
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
|
||||
|
||||
async def download_image_bytes(url: str) -> Result[bytes, str]:
|
||||
# if "/matcha/cache/" in url:
|
||||
|
||||
30
konabot/common/nb/qq_broadcast.py
Normal file
30
konabot/common/nb/qq_broadcast.py
Normal file
@ -0,0 +1,30 @@
|
||||
from typing import Any, cast
|
||||
import nonebot
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from nonebot.adapters.onebot.v11 import Bot as OBBot
|
||||
|
||||
|
||||
async def qq_broadcast(groups: list[str], msg: UniMessage[Any]):
|
||||
bots: dict[str, OBBot] = {}
|
||||
|
||||
# group_id -> bot_id
|
||||
availabilities: dict[str, str] = {}
|
||||
|
||||
for bot_id, bot in nonebot.get_bots().items():
|
||||
if not isinstance(bot, OBBot):
|
||||
continue
|
||||
bots[bot_id] = bot
|
||||
gl = await bot.get_group_list()
|
||||
for g in gl:
|
||||
gid = str(g.get("group_id", -1))
|
||||
if gid in groups:
|
||||
availabilities[gid] = bot_id
|
||||
|
||||
for group in groups:
|
||||
if group in availabilities:
|
||||
bot = bots[availabilities[group]]
|
||||
await bot.send_group_msg(
|
||||
group_id=int(group),
|
||||
message=cast(Any, await msg.export(bot)),
|
||||
auto_escape=False,
|
||||
)
|
||||
54
konabot/common/username.py
Normal file
54
konabot/common/username.py
Normal file
@ -0,0 +1,54 @@
|
||||
import re
|
||||
import nonebot
|
||||
|
||||
from nonebot.adapters.onebot.v11 import Bot as OBBot
|
||||
|
||||
|
||||
class UsernameManager:
|
||||
grouped_data: dict[int, dict[int, str]]
|
||||
individual_data: dict[int, str]
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.grouped_data = {}
|
||||
self.individual_data = {}
|
||||
|
||||
async def update(self):
|
||||
for bot in nonebot.get_bots().values():
|
||||
if isinstance(bot, OBBot):
|
||||
for user in await bot.get_friend_list():
|
||||
uid = user["user_id"]
|
||||
nickname = user["nickname"]
|
||||
self.individual_data[uid] = nickname
|
||||
for group in await bot.get_group_list():
|
||||
gid = group["group_id"]
|
||||
for member in await bot.get_group_member_list(group_id=gid):
|
||||
uid = member["user_id"]
|
||||
card = member.get("card", "")
|
||||
nickname = member.get("nickname", "")
|
||||
if card:
|
||||
self.grouped_data.setdefault(gid, {})[uid] = card
|
||||
if nickname:
|
||||
self.individual_data[uid] = nickname
|
||||
|
||||
def get(self, qqid: int, groupid: int | None = None) -> str:
|
||||
if groupid is not None and groupid in self.grouped_data:
|
||||
n = self.grouped_data[groupid].get(qqid)
|
||||
if n is not None:
|
||||
return n
|
||||
if qqid in self.individual_data:
|
||||
return self.individual_data[qqid]
|
||||
return str(qqid)
|
||||
|
||||
|
||||
manager = UsernameManager()
|
||||
|
||||
def get_username(qqid: int | str, group: int | str | None = None):
|
||||
if isinstance(group, str):
|
||||
group = None if not re.match(r"^\d+$", group) else int(group)
|
||||
if isinstance(qqid, str):
|
||||
if re.match(r"^\d+$", qqid):
|
||||
qqid = int(qqid)
|
||||
else:
|
||||
return qqid
|
||||
return manager.get(qqid, group)
|
||||
|
||||
@ -1,13 +1,45 @@
|
||||
import asyncio
|
||||
import queue
|
||||
from typing import Any, Callable, Coroutine
|
||||
from loguru import logger
|
||||
from playwright.async_api import async_playwright, Browser
|
||||
from playwright.async_api import Page, Playwright, async_playwright, Browser, Page, BrowserContext
|
||||
|
||||
|
||||
PageFunction = Callable[[Page], Coroutine[Any, Any, Any]]
|
||||
|
||||
|
||||
class WebRenderer:
|
||||
browser_pool: queue.Queue["WebRendererInstance"] = queue.Queue()
|
||||
context_pool: dict[int, BrowserContext] = {} # 长期挂载的浏览器上下文池
|
||||
page_pool: dict[str, Page] = {} # 长期挂载的页面池
|
||||
|
||||
@classmethod
|
||||
async def render(cls, url: str, target: str, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
|
||||
async def get_browser_instance(cls) -> "WebRendererInstance":
|
||||
if cls.browser_pool.empty():
|
||||
instance = await WebRendererInstance.create()
|
||||
cls.browser_pool.put(instance)
|
||||
instance = cls.browser_pool.get()
|
||||
cls.browser_pool.put(instance)
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
async def get_browser_context(cls) -> BrowserContext:
|
||||
instance = await cls.get_browser_instance()
|
||||
if id(instance) not in cls.context_pool:
|
||||
context = await instance.browser.new_context()
|
||||
cls.context_pool[id(instance)] = context
|
||||
logger.debug(f"Created new persistent browser context for WebRendererInstance {id(instance)}")
|
||||
return cls.context_pool[id(instance)]
|
||||
|
||||
@classmethod
|
||||
async def render(
|
||||
cls,
|
||||
url: str,
|
||||
target: str,
|
||||
params: dict = {},
|
||||
other_function: PageFunction | None = None,
|
||||
timeout: int = 30,
|
||||
) -> bytes:
|
||||
'''
|
||||
访问指定URL并返回截图
|
||||
|
||||
@ -19,30 +51,109 @@ class WebRenderer:
|
||||
:return: 截图的字节数据
|
||||
|
||||
'''
|
||||
logger.debug(f"Requesting render for {url} targeting {target} with timeout {timeout}")
|
||||
if cls.browser_pool.empty():
|
||||
instance = await WebRendererInstance.create()
|
||||
cls.browser_pool.put(instance)
|
||||
instance = cls.browser_pool.get()
|
||||
cls.browser_pool.put(instance)
|
||||
instance = await cls.get_browser_instance()
|
||||
logger.debug(f"Using WebRendererInstance {id(instance)} to render {url} targeting {target}")
|
||||
return await instance.render(url, target, params=params, other_function=other_function, timeout=timeout)
|
||||
|
||||
|
||||
@classmethod
|
||||
async def render_persistent_page(cls, page_id: str, url: str, target: str, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
|
||||
'''
|
||||
使用长期挂载的页面访问指定URL并返回截图
|
||||
|
||||
:param page_id: 页面唯一标识符
|
||||
:param url: 目标URL
|
||||
:param target: 渲染目标,如 ".box"、"#main" 等CSS选择器
|
||||
:param timeout: 页面加载超时时间,单位秒
|
||||
:param params: URL键值对参数
|
||||
:param other_function: 其他自定义操作函数,接受page参数
|
||||
:return: 截图的字节数据
|
||||
|
||||
'''
|
||||
logger.debug(f"Requesting persistent render for page_id {page_id} at {url} targeting {target} with timeout {timeout}")
|
||||
instance = await cls.get_browser_instance()
|
||||
if page_id not in cls.page_pool:
|
||||
context = await cls.get_browser_context()
|
||||
page = await context.new_page()
|
||||
cls.page_pool[page_id] = page
|
||||
logger.debug(f"Created new persistent page for page_id {page_id} using WebRendererInstance {id(instance)}")
|
||||
page = cls.page_pool[page_id]
|
||||
return await instance.render_with_page(page, url, target, params=params, other_function=other_function, timeout=timeout)
|
||||
|
||||
@classmethod
|
||||
async def render_file(
|
||||
cls,
|
||||
file_path: str,
|
||||
target: str,
|
||||
params: dict = {},
|
||||
other_function: PageFunction | None = None,
|
||||
timeout: int = 30,
|
||||
) -> bytes:
|
||||
'''
|
||||
访问指定本地文件URL并返回截图
|
||||
|
||||
:param file_path: 目标文件路径
|
||||
:param target: 渲染目标,如 ".box"、"#main" 等CSS选择器
|
||||
:param timeout: 页面加载超时时间,单位秒
|
||||
:param params: URL键值对参数
|
||||
:param other_function: 其他自定义操作函数,接受page参数
|
||||
:return: 截图的字节数据
|
||||
|
||||
'''
|
||||
instance = await cls.get_browser_instance()
|
||||
logger.debug(f"Using WebRendererInstance {id(instance)} to render file {file_path} targeting {target}")
|
||||
return await instance.render_file(file_path, target, params=params, other_function=other_function, timeout=timeout)
|
||||
|
||||
@classmethod
|
||||
async def close_persistent_page(cls, page_id: str) -> None:
|
||||
'''
|
||||
关闭并移除长期挂载的页面
|
||||
|
||||
:param page_id: 页面唯一标识符
|
||||
'''
|
||||
if page_id in cls.page_pool:
|
||||
page = cls.page_pool[page_id]
|
||||
await page.close()
|
||||
del cls.page_pool[page_id]
|
||||
logger.debug(f"Closed and removed persistent page for page_id {page_id}")
|
||||
|
||||
|
||||
|
||||
class WebRendererInstance:
|
||||
def __init__(self):
|
||||
self.playwright = None
|
||||
self.browser: Browser = None
|
||||
self.lock: asyncio.Lock = None
|
||||
self._playwright: Playwright | None = None
|
||||
self._browser: Browser | None = None
|
||||
self.lock = asyncio.Lock()
|
||||
|
||||
@property
|
||||
def playwright(self) -> Playwright:
|
||||
assert self._playwright is not None
|
||||
return self._playwright
|
||||
|
||||
@property
|
||||
def browser(self) -> Browser:
|
||||
assert self._browser is not None
|
||||
return self._browser
|
||||
|
||||
async def init(self):
|
||||
self._playwright = await async_playwright().start()
|
||||
self._browser = await self.playwright.chromium.launch(headless=True)
|
||||
|
||||
@classmethod
|
||||
async def create(cls) -> "WebRendererInstance":
|
||||
instance = cls()
|
||||
instance.playwright = await async_playwright().start()
|
||||
instance.browser = await instance.playwright.chromium.launch(headless=True)
|
||||
instance.lock = asyncio.Lock()
|
||||
await instance.init()
|
||||
return instance
|
||||
|
||||
async def render(self, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
|
||||
async def render(
|
||||
self,
|
||||
url: str,
|
||||
target: str,
|
||||
index: int = 0,
|
||||
params: dict = {},
|
||||
other_function: PageFunction | None = None,
|
||||
timeout: int = 30
|
||||
) -> bytes:
|
||||
'''
|
||||
访问指定URL并返回截图
|
||||
|
||||
@ -58,29 +169,43 @@ class WebRendererInstance:
|
||||
async with self.lock:
|
||||
context = await self.browser.new_context()
|
||||
page = await context.new_page()
|
||||
logger.debug(f"Navigating to {url} with timeout {timeout}")
|
||||
try:
|
||||
url_with_params = url + ("?" + "&".join(f"{k}={v}" for k, v in params.items()) if params else "")
|
||||
await page.goto(url_with_params, timeout=timeout * 1000, wait_until="load")
|
||||
logger.debug(f"Page loaded successfully")
|
||||
# 等待目标元素出现
|
||||
await page.wait_for_selector(target, timeout=timeout * 1000)
|
||||
logger.debug(f"Target element '{target}' found, taking screenshot")
|
||||
if other_function:
|
||||
await other_function(page)
|
||||
elements = await page.query_selector_all(target)
|
||||
if not elements:
|
||||
raise Exception(f"Target element '{target}' not found on the page.")
|
||||
if index >= len(elements):
|
||||
raise Exception(f"Index {index} out of range for elements matching '{target}'.")
|
||||
element = elements[index]
|
||||
screenshot = await element.screenshot()
|
||||
logger.debug(f"Screenshot taken successfully")
|
||||
return screenshot
|
||||
finally:
|
||||
await page.close()
|
||||
await context.close()
|
||||
screenshot = await self.inner_render(page, url, target, index, params, other_function, timeout)
|
||||
await page.close()
|
||||
await context.close()
|
||||
return screenshot
|
||||
|
||||
async def render_with_page(self, page: Page, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
|
||||
async with self.lock:
|
||||
screenshot = await self.inner_render(page, url, target, index, params, other_function, timeout)
|
||||
return screenshot
|
||||
|
||||
async def render_file(self, file_path: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
|
||||
file_path = "file:///" + str(file_path).replace("\\", "/")
|
||||
return await self.render(file_path, target, index, params, other_function, timeout)
|
||||
|
||||
async def inner_render(self, page: Page, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
|
||||
logger.debug(f"Navigating to {url} with timeout {timeout}")
|
||||
url_with_params = url + ("?" + "&".join(f"{k}={v}" for k, v in params.items()) if params else "")
|
||||
await page.goto(url_with_params, timeout=timeout * 1000, wait_until="load")
|
||||
logger.debug("Page loaded successfully")
|
||||
# 等待目标元素出现
|
||||
await page.wait_for_selector(target, timeout=timeout * 1000)
|
||||
logger.debug(f"Target element '{target}' found, taking screenshot")
|
||||
if other_function:
|
||||
await other_function(page)
|
||||
elements = await page.query_selector_all(target)
|
||||
if not elements:
|
||||
logger.error(f"Target element '{target}' not found on the page.")
|
||||
return None
|
||||
if index >= len(elements):
|
||||
logger.error(f"Index {index} out of range for elements matching '{target}'")
|
||||
return None
|
||||
element = elements[index]
|
||||
screenshot = await element.screenshot()
|
||||
logger.debug(f"Screenshot taken successfully")
|
||||
return screenshot
|
||||
|
||||
async def close(self):
|
||||
await self.browser.close()
|
||||
await self.playwright.stop()
|
||||
|
||||
|
||||
4
konabot/docs/sys/konaph.txt
Normal file
4
konabot/docs/sys/konaph.txt
Normal file
@ -0,0 +1,4 @@
|
||||
指令介绍
|
||||
konaph - KonaBot 的 PuzzleHunt 管理工具
|
||||
|
||||
详细介绍请直接输入 konaph 获取使用指引(该指令权限仅对部分人开放。如果你有权限的话才有响应。建议在此方 BOT 私聊使用该指令。)
|
||||
162
konabot/plugins/air_conditioner/__init__.py
Normal file
162
konabot/plugins/air_conditioner/__init__.py
Normal file
@ -0,0 +1,162 @@
|
||||
from io import BytesIO
|
||||
from typing import Optional, Union
|
||||
import cv2
|
||||
from nonebot.adapters import Event as BaseEvent
|
||||
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
|
||||
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
||||
from nonebot_plugin_alconna import Alconna, AlconnaMatcher, Args, UniMessage, on_alconna
|
||||
from PIL import Image
|
||||
import numpy as np
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.path import ASSETS_PATH
|
||||
from konabot.common.web_render import WebRenderer
|
||||
from konabot.plugins.air_conditioner.ac import AirConditioner, CrashType, generate_ac_image, wiggle_transform
|
||||
|
||||
import random
|
||||
import math
|
||||
|
||||
def get_ac(id: str) -> AirConditioner:
|
||||
ac = AirConditioner.air_conditioners.get(id)
|
||||
if ac is None:
|
||||
ac = AirConditioner(id)
|
||||
return ac
|
||||
|
||||
async def send_ac_image(event: type[AlconnaMatcher], ac: AirConditioner):
|
||||
if(ac.burnt == True):
|
||||
# 打开坏掉的空调图片
|
||||
with open(ASSETS_PATH / "img" / "ac" / "broken_ac.png", "rb") as f:
|
||||
# 将其转为 GIF 格式发送
|
||||
output = BytesIO()
|
||||
Image.open(f).save(output, format="GIF")
|
||||
output.seek(0)
|
||||
await event.send(await UniMessage().image(raw=output).export())
|
||||
return
|
||||
if(ac.frozen == True):
|
||||
# 打开坏掉的空调图片
|
||||
with open(ASSETS_PATH / "img" / "ac" / "frozen_ac.png", "rb") as f:
|
||||
# 将其转为 GIF 格式发送
|
||||
output = BytesIO()
|
||||
Image.open(f).save(output, format="GIF")
|
||||
output.seek(0)
|
||||
await event.send(await UniMessage().image(raw=output).export())
|
||||
return
|
||||
ac_image = await generate_ac_image(ac)
|
||||
await event.send(await UniMessage().image(raw=ac_image).export())
|
||||
|
||||
evt = on_alconna(Alconna(
|
||||
"群空调"
|
||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
|
||||
|
||||
@evt.handle()
|
||||
async def _(event: BaseEvent, target: DepLongTaskTarget):
|
||||
id = target.channel_id
|
||||
ac = get_ac(id)
|
||||
await send_ac_image(evt, ac)
|
||||
|
||||
evt = on_alconna(Alconna(
|
||||
"开空调"
|
||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
|
||||
|
||||
@evt.handle()
|
||||
async def _(event: BaseEvent, target: DepLongTaskTarget):
|
||||
id = target.channel_id
|
||||
ac = get_ac(id)
|
||||
ac.on = True
|
||||
await send_ac_image(evt, ac)
|
||||
|
||||
evt = on_alconna(Alconna(
|
||||
"关空调"
|
||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
|
||||
|
||||
@evt.handle()
|
||||
async def _(event: BaseEvent, target: DepLongTaskTarget):
|
||||
id = target.channel_id
|
||||
ac = get_ac(id)
|
||||
ac.on = False
|
||||
await send_ac_image(evt, ac)
|
||||
|
||||
evt = on_alconna(Alconna(
|
||||
"空调升温",
|
||||
Args["temp?", Optional[Union[int, float]]] # 可选参数,升温的度数,默认为1
|
||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
|
||||
|
||||
@evt.handle()
|
||||
async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
|
||||
if temp <= 0:
|
||||
return
|
||||
id = target.channel_id
|
||||
ac = get_ac(id)
|
||||
if not ac.on or ac.burnt == True or ac.frozen == True:
|
||||
await send_ac_image(evt, ac)
|
||||
return
|
||||
ac.temperature += temp
|
||||
if ac.temperature > 40:
|
||||
# 根据温度随机出是否爆炸,40度开始,呈指数增长
|
||||
possibility = -math.e ** ((40-ac.temperature) / 50) + 1
|
||||
if random.random() < possibility:
|
||||
# 打开爆炸图片
|
||||
with open(ASSETS_PATH / "img" / "other" / "boom.jpg", "rb") as f:
|
||||
output = BytesIO()
|
||||
# 爆炸抖动
|
||||
frames = wiggle_transform(np.array(Image.open(f)), intensity=5)
|
||||
pil_frames = [Image.fromarray(frame) for frame in frames]
|
||||
pil_frames[0].save(output, format="GIF", save_all=True, append_images=pil_frames[1:], loop=0, duration=35, disposal=2)
|
||||
output.seek(0)
|
||||
await evt.send(await UniMessage().image(raw=output).export())
|
||||
ac.broke_ac(CrashType.BURNT)
|
||||
await evt.send("太热啦,空调炸了!")
|
||||
return
|
||||
await send_ac_image(evt, ac)
|
||||
|
||||
evt = on_alconna(Alconna(
|
||||
"空调降温",
|
||||
Args["temp?", Optional[Union[int, float]]] # 可选参数,降温的度数,默认为1
|
||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
|
||||
|
||||
@evt.handle()
|
||||
async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
|
||||
if temp <= 0:
|
||||
return
|
||||
id = target.channel_id
|
||||
ac = get_ac(id)
|
||||
if not ac.on or ac.burnt == True or ac.frozen == True:
|
||||
await send_ac_image(evt, ac)
|
||||
return
|
||||
ac.temperature -= temp
|
||||
if ac.temperature < 0:
|
||||
# 根据温度随机出是否冻结,0度开始,呈指数增长
|
||||
possibility = -math.e ** (ac.temperature / 50) + 1
|
||||
if random.random() < possibility:
|
||||
ac.broke_ac(CrashType.FROZEN)
|
||||
await send_ac_image(evt, ac)
|
||||
|
||||
evt = on_alconna(Alconna(
|
||||
"换空调"
|
||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
|
||||
|
||||
@evt.handle()
|
||||
async def _(event: BaseEvent, target: DepLongTaskTarget):
|
||||
id = target.channel_id
|
||||
ac = get_ac(id)
|
||||
ac.change_ac()
|
||||
await send_ac_image(evt, ac)
|
||||
|
||||
evt = on_alconna(Alconna(
|
||||
"空调炸炸排行榜",
|
||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
|
||||
|
||||
@evt.handle()
|
||||
async def _(event: BaseEvent, target: DepLongTaskTarget):
|
||||
id = target.channel_id
|
||||
ac = get_ac(id)
|
||||
number, ranking = ac.get_crashes_and_ranking()
|
||||
params = {
|
||||
"number": number,
|
||||
"ranking": ranking
|
||||
}
|
||||
image = await WebRenderer.render_file(
|
||||
file_path=ASSETS_PATH / "webpage" / "ac" / "index.html",
|
||||
target=".box",
|
||||
params=params
|
||||
)
|
||||
await evt.send(await UniMessage().image(raw=image).export())
|
||||
288
konabot/plugins/air_conditioner/ac.py
Normal file
288
konabot/plugins/air_conditioner/ac.py
Normal file
@ -0,0 +1,288 @@
|
||||
from enum import Enum
|
||||
from io import BytesIO
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
from konabot.common.path import ASSETS_PATH, FONTS_PATH
|
||||
from konabot.common.path import DATA_PATH
|
||||
import json
|
||||
|
||||
class CrashType(Enum):
|
||||
BURNT = 0
|
||||
FROZEN = 1
|
||||
|
||||
class AirConditioner:
|
||||
air_conditioners: dict[str, "AirConditioner"] = {}
|
||||
|
||||
def __init__(self, id: str) -> None:
|
||||
self.id = id
|
||||
self.on = False
|
||||
self.temperature = 24 # 默认温度
|
||||
self.burnt = False
|
||||
self.frozen = False
|
||||
AirConditioner.air_conditioners[id] = self
|
||||
|
||||
def change_ac(self):
|
||||
self.burnt = False
|
||||
self.frozen = False
|
||||
self.on = False
|
||||
self.temperature = 24 # 重置为默认温度
|
||||
|
||||
def broke_ac(self, crash_type: CrashType):
|
||||
'''
|
||||
让空调坏掉,并保存数据
|
||||
|
||||
:param crash_type: CrashType 枚举,表示空调坏掉的类型
|
||||
'''
|
||||
match crash_type:
|
||||
case CrashType.BURNT:
|
||||
self.burnt = True
|
||||
case CrashType.FROZEN:
|
||||
self.frozen = True
|
||||
self.save_crash_data(crash_type)
|
||||
|
||||
def save_crash_data(self, crash_type: CrashType):
|
||||
'''
|
||||
如果空调爆炸了,就往本地的 ac_crash_data.json 里该 id 的记录加一
|
||||
'''
|
||||
data_file = DATA_PATH / "ac_crash_data.json"
|
||||
crash_data = {}
|
||||
if data_file.exists():
|
||||
with open(data_file, "r", encoding="utf-8") as f:
|
||||
crash_data = json.load(f)
|
||||
if self.id not in crash_data:
|
||||
crash_data[self.id] = {"burnt": 0, "frozen": 0}
|
||||
match crash_type:
|
||||
case CrashType.BURNT:
|
||||
crash_data[self.id]["burnt"] += 1
|
||||
case CrashType.FROZEN:
|
||||
crash_data[self.id]["frozen"] += 1
|
||||
with open(data_file, "w", encoding="utf-8") as f:
|
||||
json.dump(crash_data, f, ensure_ascii=False, indent=4)
|
||||
|
||||
def get_crashes_and_ranking(self) -> tuple[int, int]:
|
||||
'''
|
||||
获取该群在全国空调损坏的数量与排行榜的位置
|
||||
'''
|
||||
data_file = DATA_PATH / "ac_crash_data.json"
|
||||
if not data_file.exists():
|
||||
return 0, 1
|
||||
with open(data_file, "r", encoding="utf-8") as f:
|
||||
crash_data = json.load(f)
|
||||
ranking_list = []
|
||||
for gid, record in crash_data.items():
|
||||
total = record.get("burnt", 0) + record.get("frozen", 0)
|
||||
ranking_list.append((gid, total))
|
||||
ranking_list.sort(key=lambda x: x[1], reverse=True)
|
||||
total_crashes = crash_data.get(self.id, {}).get("burnt", 0) + crash_data.get(self.id, {}).get("frozen", 0)
|
||||
rank = 1
|
||||
for gid, total in ranking_list:
|
||||
if gid == self.id:
|
||||
break
|
||||
rank += 1
|
||||
return total_crashes, rank
|
||||
|
||||
def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0)):
|
||||
"""
|
||||
将文本转换为带透明背景的图像,图像大小刚好包含文本
|
||||
"""
|
||||
# 创建临时图像来计算文本尺寸
|
||||
temp_image = Image.new('RGB', (1, 1), (255, 255, 255))
|
||||
temp_draw = ImageDraw.Draw(temp_image)
|
||||
|
||||
font = ImageFont.truetype(FONTS_PATH / "montserrat.otf", font_size)
|
||||
|
||||
# 获取文本边界框
|
||||
bbox = temp_draw.textbbox((0, 0), text, font=font)
|
||||
text_width = bbox[2] - bbox[0]
|
||||
text_height = bbox[3] - bbox[1]
|
||||
|
||||
# 计算图像大小(文本大小 + 内边距)
|
||||
image_width = int(text_width + 2 * padding)
|
||||
image_height = int(text_height + 2 * padding)
|
||||
|
||||
# 创建RGBA模式的空白图像(带透明通道)
|
||||
image = Image.new('RGBA', (image_width, image_height), (0, 0, 0, 0))
|
||||
draw = ImageDraw.Draw(image)
|
||||
|
||||
# 绘制文本(考虑内边距)
|
||||
x = padding - bbox[0] # 调整起始位置
|
||||
y = padding - bbox[1]
|
||||
|
||||
# 设置文本颜色(带透明度)
|
||||
if len(text_color) == 3:
|
||||
text_color = text_color + (255,) # 添加完全不透明的alpha值
|
||||
|
||||
draw.text((x, y), text, fill=text_color, font=font)
|
||||
|
||||
# 转换为OpenCV格式(BGRA)
|
||||
image_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGBA2BGRA)
|
||||
return image_cv
|
||||
|
||||
def perspective_transform(image, target, corners):
|
||||
"""
|
||||
对图像进行透视变换(保持透明通道)
|
||||
target: 画布
|
||||
corners: 四个角点的坐标,顺序为 [左上, 右上, 右下, 左下]
|
||||
"""
|
||||
height, width = image.shape[:2]
|
||||
|
||||
# 源点(原始图像的四个角)
|
||||
src_points = np.array([
|
||||
[0, 0], # 左上
|
||||
[width-1, 0], # 右上
|
||||
[width-1, height-1], # 右下
|
||||
[0, height-1] # 左下
|
||||
], dtype=np.float32)
|
||||
|
||||
# 目标点(变换后的四个角)
|
||||
dst_points = np.array(corners, dtype=np.float32)
|
||||
|
||||
# 计算透视变换矩阵
|
||||
matrix = cv2.getPerspectiveTransform(src_points, dst_points)
|
||||
|
||||
# 获取画布大小
|
||||
target_height, target_width = target.shape[:2]
|
||||
|
||||
# 应用透视变换(保持所有通道,包括alpha)
|
||||
transformed = cv2.warpPerspective(image, matrix, (target_width, target_height), flags=cv2.INTER_LINEAR)
|
||||
|
||||
return transformed, matrix
|
||||
|
||||
def blend_with_transparency(background, foreground, position):
|
||||
"""
|
||||
将带透明通道的前景图像合成到背景图像上
|
||||
position: 前景图像在背景图像上的位置 (x, y)
|
||||
"""
|
||||
bg = background.copy()
|
||||
|
||||
# 如果背景没有alpha通道,添加一个
|
||||
if bg.shape[2] == 3:
|
||||
bg = cv2.cvtColor(bg, cv2.COLOR_BGR2BGRA)
|
||||
bg[:, :, 3] = 255 # 完全不透明
|
||||
|
||||
x, y = position
|
||||
fg_height, fg_width = foreground.shape[:2]
|
||||
bg_height, bg_width = bg.shape[:2]
|
||||
|
||||
# 确保位置在图像范围内
|
||||
x = max(0, min(x, bg_width - fg_width))
|
||||
y = max(0, min(y, bg_height - fg_height))
|
||||
|
||||
# 提取前景的alpha通道并归一化
|
||||
alpha_foreground = foreground[:, :, 3] / 255.0
|
||||
|
||||
# 对于每个颜色通道进行合成
|
||||
for c in range(3):
|
||||
bg_region = bg[y:y+fg_height, x:x+fg_width, c]
|
||||
fg_region = foreground[:, :, c]
|
||||
|
||||
# alpha混合公式
|
||||
bg[y:y+fg_height, x:x+fg_width, c] = (
|
||||
alpha_foreground * fg_region +
|
||||
(1 - alpha_foreground) * bg_region
|
||||
)
|
||||
|
||||
# 更新背景的alpha通道(如果需要)
|
||||
bg_alpha_region = bg[y:y+fg_height, x:x+fg_width, 3]
|
||||
bg[y:y+fg_height, x:x+fg_width, 3] = np.maximum(bg_alpha_region, foreground[:, :, 3])
|
||||
|
||||
return bg
|
||||
|
||||
def precise_blend_with_perspective(background, foreground, corners):
|
||||
"""
|
||||
精确合成:根据四个角点将前景图像透视合成到背景上
|
||||
"""
|
||||
# 创建与背景相同大小的空白图像
|
||||
bg_height, bg_width = background.shape[:2]
|
||||
|
||||
# 如果背景没有alpha通道,转换为BGRA
|
||||
if background.shape[2] == 3:
|
||||
background_bgra = cv2.cvtColor(background, cv2.COLOR_BGR2BGRA)
|
||||
else:
|
||||
background_bgra = background.copy()
|
||||
|
||||
# 创建与背景相同大小的前景图层
|
||||
foreground_layer = np.zeros((bg_height, bg_width, 4), dtype=np.uint8)
|
||||
|
||||
# 计算前景图像在背景中的边界框
|
||||
min_x = int(min(corners[:, 0]))
|
||||
max_x = int(max(corners[:, 0]))
|
||||
min_y = int(min(corners[:, 1]))
|
||||
max_y = int(max(corners[:, 1]))
|
||||
|
||||
# 将变换后的前景图像放置到对应位置
|
||||
fg_height, fg_width = foreground.shape[:2]
|
||||
if min_y + fg_height <= bg_height and min_x + fg_width <= bg_width:
|
||||
foreground_layer[min_y:min_y+fg_height, min_x:min_x+fg_width] = foreground
|
||||
|
||||
# 创建掩码(只在前景有内容的地方合成)
|
||||
mask = (foreground_layer[:, :, 3] > 0)
|
||||
|
||||
# 合成图像
|
||||
result = background_bgra.copy()
|
||||
for c in range(3):
|
||||
result[:, :, c][mask] = foreground_layer[:, :, c][mask]
|
||||
result[:, :, 3][mask] = foreground_layer[:, :, 3][mask]
|
||||
|
||||
return result
|
||||
|
||||
def wiggle_transform(image, intensity=2) -> list[np.ndarray]:
|
||||
'''
|
||||
返回一组图像振动的帧组,模拟空调运作时的抖动效果
|
||||
'''
|
||||
frames = []
|
||||
height, width = image.shape[:2]
|
||||
shifts = [(-intensity, 0), (intensity, 0), (0, -intensity), (0, intensity), (0, 0)]
|
||||
for dx, dy in shifts:
|
||||
M = np.float32([[1, 0, dx], [0, 1, dy]])
|
||||
shifted = cv2.warpAffine(image, M, (width, height))
|
||||
frames.append(shifted)
|
||||
return frames
|
||||
|
||||
async def generate_ac_image(ac: AirConditioner) -> BytesIO:
|
||||
# 找到空调底图
|
||||
ac_image = cv2.imread(str(ASSETS_PATH / "img" / "ac" / "ac.png"), cv2.IMREAD_UNCHANGED)
|
||||
|
||||
if not ac.on:
|
||||
# 空调关闭状态,直接返回底图
|
||||
pil_final = Image.fromarray(ac_image)
|
||||
output = BytesIO()
|
||||
pil_final.save(output, format="GIF")
|
||||
return output
|
||||
|
||||
# 根据生成温度文本图像
|
||||
text = f"{round(ac.temperature, 1)}°C"
|
||||
text_image = text_to_transparent_image(
|
||||
text,
|
||||
font_size=60,
|
||||
text_color=(0, 0, 0) # 黑色文字
|
||||
)
|
||||
|
||||
# 获取长宽比
|
||||
height, width = text_image.shape[:2]
|
||||
aspect_ratio = width / height
|
||||
|
||||
# 定义3D变换的四个角点(透视效果)
|
||||
# 顺序: [左上, 右上, 右下, 左下]
|
||||
corners = np.array([
|
||||
[123, 45], # 左上
|
||||
[284, 101], # 右上
|
||||
[290, 140], # 右下
|
||||
[119, 100] # 左下
|
||||
], dtype=np.float32)
|
||||
|
||||
# 对文本图像进行3D变换(保持透明通道)
|
||||
transformed_text, transform_matrix = perspective_transform(text_image, ac_image, corners)
|
||||
|
||||
final_image_simple = blend_with_transparency(ac_image, transformed_text, (0, 0))
|
||||
|
||||
intensity = max(2, abs(int(ac.temperature) - 24) // 2)
|
||||
|
||||
frames = wiggle_transform(final_image_simple, intensity=intensity)
|
||||
pil_frames = [Image.fromarray(frame) for frame in frames]
|
||||
output = BytesIO()
|
||||
pil_frames[0].save(output, format="GIF", save_all=True, append_images=pil_frames[1:], loop=0, duration=50, disposal=2)
|
||||
return output
|
||||
@ -69,10 +69,11 @@ class TryStopState(Enum):
|
||||
class TryVerifyState(Enum):
|
||||
VERIFIED = 0
|
||||
VERIFIED_AND_REAL = 1
|
||||
NOT_IDIOM = 2
|
||||
WRONG_FIRST_CHAR = 3
|
||||
BUT_NO_NEXT = 4
|
||||
GAME_END = 5
|
||||
ALREADY_USED = 2
|
||||
NOT_IDIOM = 3
|
||||
WRONG_FIRST_CHAR = 4
|
||||
BUT_NO_NEXT = 5
|
||||
GAME_END = 6
|
||||
|
||||
|
||||
class IdiomGame:
|
||||
@ -96,12 +97,14 @@ class IdiomGame:
|
||||
self.all_buff_score = 0
|
||||
self.lock = asynkio.Lock()
|
||||
self.remain_rounds = 0 # 剩余回合数
|
||||
self.already_idioms: dict[str, int] = {} # 已经使用过的成语和使用过的次数
|
||||
self.idiom_history: list[list[str]] = [] # 成语使用历史记录,多个数组以存储不同成语链
|
||||
IdiomGame.INSTANCE_LIST[group_id] = self
|
||||
|
||||
def be_able_to_play(self) -> bool:
|
||||
if self.last_play_date != datetime.date.today():
|
||||
self.last_play_date = datetime.date.today()
|
||||
self.remain_playing_times = 1
|
||||
self.remain_playing_times = 3
|
||||
if self.remain_playing_times > 0:
|
||||
self.remain_playing_times -= 1
|
||||
return True
|
||||
@ -115,6 +118,8 @@ class IdiomGame:
|
||||
self.last_char = self.last_idiom[-1]
|
||||
if not self.is_nextable(self.last_char):
|
||||
self.choose_start_idiom()
|
||||
else:
|
||||
self.add_history_idiom(self.last_idiom, new_chain=True)
|
||||
return self.last_idiom
|
||||
|
||||
@classmethod
|
||||
@ -148,6 +153,9 @@ class IdiomGame:
|
||||
def clear_score_board(self):
|
||||
self.score_board = {}
|
||||
self.last_char = ""
|
||||
self.all_buff_score = 0
|
||||
self.already_idioms = {}
|
||||
self.idiom_history = []
|
||||
|
||||
def get_score_board(self) -> dict:
|
||||
return self.score_board
|
||||
@ -169,6 +177,8 @@ class IdiomGame:
|
||||
self.last_char = self.last_idiom[-1]
|
||||
if not self.is_nextable(self.last_char):
|
||||
self._skip_idiom_async()
|
||||
else:
|
||||
self.add_history_idiom(self.last_idiom, new_chain=True)
|
||||
return self.last_idiom
|
||||
|
||||
async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
|
||||
@ -184,6 +194,29 @@ class IdiomGame:
|
||||
判断是否有成语可以接
|
||||
"""
|
||||
return last_char in IdiomGame.AVALIABLE_IDIOM_FIRST_CHAR
|
||||
|
||||
def add_already_idiom(self, idiom: str):
|
||||
if idiom in self.already_idioms:
|
||||
self.already_idioms[idiom] += 1
|
||||
else:
|
||||
self.already_idioms[idiom] = 1
|
||||
|
||||
def get_already_used_num(self, idiom: str) -> int:
|
||||
if idiom in self.already_idioms:
|
||||
return self.already_idioms[idiom]
|
||||
return 0
|
||||
|
||||
def add_history_idiom(self, idiom: str, new_chain: bool = False):
|
||||
if new_chain or len(self.idiom_history) == 0:
|
||||
self.idiom_history.append([idiom])
|
||||
else:
|
||||
self.idiom_history[-1].append(idiom)
|
||||
|
||||
def display_history(self) -> list[str]:
|
||||
result = []
|
||||
for chain in self.idiom_history:
|
||||
result.append(" -> ".join(chain))
|
||||
return result
|
||||
|
||||
def _verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]:
|
||||
state = []
|
||||
@ -196,13 +229,18 @@ class IdiomGame:
|
||||
state.append(TryVerifyState.NOT_IDIOM)
|
||||
return state
|
||||
# 成语合法,更新状态
|
||||
self.add_history_idiom(idiom)
|
||||
score_k = 0.5 ** self.get_already_used_num(idiom) # 每被使用过一次,得分减半
|
||||
if(score_k != 1):
|
||||
state.append(TryVerifyState.ALREADY_USED)
|
||||
self.add_already_idiom(idiom)
|
||||
state.append(TryVerifyState.VERIFIED)
|
||||
self.last_idiom = idiom
|
||||
self.last_char = idiom[-1]
|
||||
self.add_score(user_id, 1)
|
||||
self.add_score(user_id, 1 * score_k) # 先加 1 分
|
||||
if idiom in IdiomGame.ALL_IDIOMS:
|
||||
state.append(TryVerifyState.VERIFIED_AND_REAL)
|
||||
self.add_score(user_id, 4) # 再加 4 分
|
||||
self.add_score(user_id, 4 * score_k) # 再加 4 分
|
||||
self.remain_rounds -= 1
|
||||
if self.remain_rounds <= 0:
|
||||
self.now_playing = False
|
||||
@ -210,6 +248,7 @@ class IdiomGame:
|
||||
if not self.is_nextable(self.last_char):
|
||||
# 没有成语可以接了,自动跳过
|
||||
self._skip_idiom_async()
|
||||
self.add_buff_score(-100)
|
||||
state.append(TryVerifyState.BUT_NO_NEXT)
|
||||
return state
|
||||
|
||||
@ -217,7 +256,7 @@ class IdiomGame:
|
||||
if user_id not in self.score_board:
|
||||
return 0
|
||||
# 避免浮点数精度问题导致过长
|
||||
handled_score = round(self.score_board[user_id]["score"], 1)
|
||||
handled_score = round(self.score_board[user_id]["score"] + self.all_buff_score, 1)
|
||||
return handled_score
|
||||
|
||||
def add_score(self, user_id: str, score: int):
|
||||
@ -401,7 +440,7 @@ async def end_game(event: BaseEvent, group_id: str):
|
||||
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
|
||||
score_board = instance.get_score_board()
|
||||
if len(score_board) == 0:
|
||||
result_text += "无人得分!"
|
||||
result_text += "无人得分!\n"
|
||||
else:
|
||||
# 按分数排序,名字用 at 的方式
|
||||
sorted_score = sorted(
|
||||
@ -413,6 +452,13 @@ async def end_game(event: BaseEvent, group_id: str):
|
||||
+ UniMessage().at(user_id)
|
||||
+ f": {round(info['score'] + instance.get_all_buff_score(), 1)} 分\n"
|
||||
)
|
||||
if len(instance.idiom_history) == 0:
|
||||
result_text += "\n本局没有任何接龙记录。"
|
||||
else:
|
||||
result_text += "\n你们的接龙记录是:\n"
|
||||
history_lines = instance.display_history()
|
||||
for line in history_lines:
|
||||
result_text += line + "\n"
|
||||
await evt.send(await result_text.export())
|
||||
instance.clear_score_board()
|
||||
|
||||
@ -499,20 +545,39 @@ async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget):
|
||||
.export()
|
||||
)
|
||||
return
|
||||
already_used_num = instance.get_already_used_num(user_idiom)
|
||||
if TryVerifyState.VERIFIED_AND_REAL in state:
|
||||
await evt.send(
|
||||
await UniMessage()
|
||||
.at(user_id)
|
||||
.text(f" 接上了,这是个真实成语,喜提 5 分!你有 {instance.get_user_score(user_id)} 分!")
|
||||
.export()
|
||||
)
|
||||
score = 5 * (0.5 ** (already_used_num - 1))
|
||||
if already_used_num > 1:
|
||||
await evt.send(
|
||||
await UniMessage()
|
||||
.at(user_id)
|
||||
.text(f" 接上了,这是个被重复用过的成语,喜提 {score} 分!你有 {instance.get_user_score(user_id)} 分!")
|
||||
.export()
|
||||
)
|
||||
else:
|
||||
await evt.send(
|
||||
await UniMessage()
|
||||
.at(user_id)
|
||||
.text(f" 接上了,这是个真实成语,喜提 5 分!你有 {instance.get_user_score(user_id)} 分!")
|
||||
.export()
|
||||
)
|
||||
elif TryVerifyState.VERIFIED in state:
|
||||
await evt.send(
|
||||
await UniMessage()
|
||||
.at(user_id)
|
||||
.text(f" 接上了,喜提 1 分!你有 {instance.get_user_score(user_id)} 分!")
|
||||
.export()
|
||||
)
|
||||
score = 1 * (0.5 ** (already_used_num - 1))
|
||||
if already_used_num > 1:
|
||||
await evt.send(
|
||||
await UniMessage()
|
||||
.at(user_id)
|
||||
.text(f" 接上了,但重复了,喜提 {score} 分!你有 {instance.get_user_score(user_id)} 分!")
|
||||
.export()
|
||||
)
|
||||
else:
|
||||
await evt.send(
|
||||
await UniMessage()
|
||||
.at(user_id)
|
||||
.text(f" 接上了,喜提 1 分!你有 {instance.get_user_score(user_id)} 分!")
|
||||
.export()
|
||||
)
|
||||
if TryVerifyState.GAME_END in state:
|
||||
await evt.send(await UniMessage().text("全部回合结束!").export())
|
||||
await end_game(event, group_id)
|
||||
|
||||
155
konabot/plugins/kona_ph/__init__.py
Normal file
155
konabot/plugins/kona_ph/__init__.py
Normal file
@ -0,0 +1,155 @@
|
||||
from functools import reduce
|
||||
from math import ceil
|
||||
import re
|
||||
from loguru import logger
|
||||
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
|
||||
from konabot.common.nb.qq_broadcast import qq_broadcast
|
||||
from konabot.common.username import get_username
|
||||
from konabot.plugins.kona_ph.core.storage import get_today_date
|
||||
from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
|
||||
from nonebot_plugin_apscheduler import scheduler
|
||||
|
||||
|
||||
create_admin_commands()
|
||||
|
||||
|
||||
async def is_play_group(target: DepLongTaskTarget):
|
||||
if target.is_private_chat:
|
||||
return True
|
||||
if target.channel_id in config.plugin_puzzle_playgroup:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
cmd_submit = on_alconna(Alconna(
|
||||
"re:提交(?:答案|题解|[fF]lag)",
|
||||
Args["flag", str],
|
||||
), rule=is_play_group)
|
||||
|
||||
@cmd_submit.handle()
|
||||
async def _(flag: str, target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
result = manager.submit(target.target_id, flag)
|
||||
await target.send_message(result.get_unimessage())
|
||||
|
||||
|
||||
cmd_query = on_alconna(Alconna(
|
||||
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?)|今日谜?题目?)"
|
||||
), rule=is_play_group)
|
||||
|
||||
@cmd_query.handle()
|
||||
async def _(target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
p = manager.get_today_puzzle()
|
||||
if p is None:
|
||||
return await target.send_message("今天无题,改日再来吧!")
|
||||
await target.send_message(p.get_unimessage())
|
||||
|
||||
|
||||
cmd_query_submission = on_alconna(Alconna(
|
||||
"今日答题情况"
|
||||
), rule=is_play_group)
|
||||
|
||||
@cmd_query_submission.handle()
|
||||
async def _(target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
p = manager.get_today_puzzle()
|
||||
if p is None:
|
||||
return await target.send_message("今天无题")
|
||||
msg = UniMessage.text("==== 今日答题情况 ====\n\n")
|
||||
|
||||
subcount = len(reduce(
|
||||
lambda x, y: x + y,
|
||||
manager.submissions.get(p.raw_id, {}).values(),
|
||||
[],
|
||||
))
|
||||
info = manager.daily_puzzle[p.index_id]
|
||||
|
||||
msg = msg.text(
|
||||
f"总体情况:答对 {len(info.success_users)} / "
|
||||
f"参与 {len(info.tried_users)} / "
|
||||
f"提交 {subcount}\n"
|
||||
)
|
||||
|
||||
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
|
||||
gid = None
|
||||
if re.match(r"^\d+$", target.channel_id):
|
||||
gid = int(target.channel_id)
|
||||
for u, d in success_users:
|
||||
uname = u
|
||||
if re.match(r"^\d+$", u):
|
||||
uname = get_username(int(u), gid)
|
||||
t = d.strftime("%H:%M")
|
||||
tries = len(manager.submissions[p.raw_id][u])
|
||||
msg = msg.text(f"\n- {uname} [Solved at {t} in {tries} times]")
|
||||
for u in info.tried_users - set(info.success_users.keys()):
|
||||
uname = u
|
||||
if re.match(r"^\d+$", u):
|
||||
uname = get_username(int(u), gid)
|
||||
tries = len(manager.submissions[p.raw_id][u])
|
||||
msg = msg.text(f"\n- {uname} [Unsolved in {tries} times]")
|
||||
|
||||
await target.send_message(msg)
|
||||
|
||||
|
||||
cmd_history = on_alconna(Alconna(
|
||||
"历史题目",
|
||||
Args["page?", int],
|
||||
Args["index_id?", str],
|
||||
), rule=is_play_group)
|
||||
|
||||
@cmd_history.handle()
|
||||
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
||||
async with puzzle_manager() as manager:
|
||||
today = get_today_date()
|
||||
if index_id:
|
||||
index_id = index_id.removeprefix("#")
|
||||
if index_id == manager.daily_puzzle_of_date.get(today, ""):
|
||||
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
|
||||
return await target.send_message(puzzle.get_unimessage())
|
||||
if index_id in manager.daily_puzzle:
|
||||
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
|
||||
msg = puzzle.get_unimessage()
|
||||
msg = msg.text(f"\n\n------\n\n题解:{puzzle.flag}")
|
||||
return await target.send_message(msg)
|
||||
return await target.send_message("没有这道题哦")
|
||||
msg = UniMessage.text("====== 历史题目清单 ======\n\n")
|
||||
puzzles = [
|
||||
(manager.puzzle_data[manager.daily_puzzle[i].raw_id], d)
|
||||
for d, i in manager.daily_puzzle_of_date.items()
|
||||
]
|
||||
puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True)
|
||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||
if page <= 0 or page > count_pages:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"页数只有 1 ~ {count_pages} 啦!"
|
||||
))
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
||||
for p, d in puzzles:
|
||||
info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]]
|
||||
msg = msg.text(
|
||||
f"- [#{p.index_id}: {len(info.success_users)}/{len(info.tried_users)}]"
|
||||
f" {p.title} ({d})"
|
||||
)
|
||||
msg = msg.text("\n")
|
||||
msg = msg.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
|
||||
await target.send_message(msg)
|
||||
|
||||
|
||||
@scheduler.scheduled_job("cron", hour="8")
|
||||
async def _():
|
||||
async with puzzle_manager() as manager:
|
||||
msg2 = manager.get_report_yesterday()
|
||||
if msg2 is not None:
|
||||
await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
|
||||
|
||||
puzzle = manager.get_today_puzzle()
|
||||
if puzzle is not None:
|
||||
logger.info(f"找到了题目 {puzzle.raw_id},发送")
|
||||
await qq_broadcast(config.plugin_puzzle_playgroup, puzzle.get_unimessage())
|
||||
else:
|
||||
logger.info("自动任务:没有找到题目,跳过")
|
||||
|
||||
|
||||
0
konabot/plugins/kona_ph/core/__init__.py
Normal file
0
konabot/plugins/kona_ph/core/__init__.py
Normal file
324
konabot/plugins/kona_ph/core/storage.py
Normal file
324
konabot/plugins/kona_ph/core/storage.py
Normal file
@ -0,0 +1,324 @@
|
||||
import asyncio
|
||||
import datetime
|
||||
import random
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import nanoid
|
||||
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from pydantic import BaseModel, Field, ValidationError
|
||||
|
||||
from konabot.common.path import DATA_PATH
|
||||
from konabot.common.username import get_username
|
||||
|
||||
|
||||
KONAPH_BASE = DATA_PATH / "KonaPH"
|
||||
KONAPH_DATA_JSON = KONAPH_BASE / "data.json"
|
||||
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs"
|
||||
|
||||
# 保证所有文件夹存在
|
||||
KONAPH_BASE.mkdir(exist_ok=True)
|
||||
KONAPH_IMAGE_BASE.mkdir(exist_ok=True)
|
||||
|
||||
|
||||
class Puzzle(BaseModel):
|
||||
raw_id: str
|
||||
"用于给出题者管理的 ID"
|
||||
|
||||
index_id: str
|
||||
"展出的 ID,以展出顺序为准"
|
||||
|
||||
title: str
|
||||
content: str
|
||||
img_name: str
|
||||
author_id: str
|
||||
flag: str
|
||||
|
||||
ready: bool = False
|
||||
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
|
||||
|
||||
def get_image_path(self) -> Path:
|
||||
return KONAPH_IMAGE_BASE / self.img_name
|
||||
|
||||
def get_unimessage(self) -> UniMessage[Any]:
|
||||
result = UniMessage.text(f"[KonaPH#{self.index_id}] {self.title}")
|
||||
result = result.text(f"\n\n{self.content}")
|
||||
|
||||
if self.img_name:
|
||||
result = result.text("\n\n").image(raw=self.get_image_path().read_bytes())
|
||||
|
||||
result = result.text(f"\n\n出题者:{get_username(self.author_id)}")
|
||||
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
|
||||
|
||||
return result
|
||||
|
||||
def add_image(self, img: bytes, suffix: str = ".png"):
|
||||
if self.img_name:
|
||||
self.get_image_path().unlink(True)
|
||||
img_id = nanoid.generate(
|
||||
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
|
||||
21,
|
||||
)
|
||||
self.img_name = f"{img_id}{suffix}"
|
||||
self.get_image_path().write_bytes(img)
|
||||
|
||||
def remove_image(self):
|
||||
if self.img_name:
|
||||
self.get_image_path().unlink(True)
|
||||
self.img_name = ""
|
||||
|
||||
|
||||
class PuzzleSubmission(BaseModel):
|
||||
success: bool
|
||||
flag: str
|
||||
time: datetime.datetime
|
||||
|
||||
|
||||
class DailyPuzzleInfo(BaseModel):
|
||||
raw_id: str
|
||||
time: datetime.date
|
||||
tried_users: set[str] = set()
|
||||
success_users: dict[str, datetime.datetime] = {}
|
||||
|
||||
|
||||
class PuzzleSubmissionResultMessage(BaseModel):
|
||||
success: bool
|
||||
rank: int = -1
|
||||
message: str = ""
|
||||
|
||||
def get_unimessage(self) -> UniMessage[Any]:
|
||||
if self.success:
|
||||
return UniMessage.text(f"🎉 恭喜你答对了!你是今天第 {self.rank} 个解出来的!")
|
||||
return UniMessage.text(self.message)
|
||||
|
||||
|
||||
def get_today_date() -> datetime.date:
|
||||
now = datetime.datetime.now()
|
||||
if now.hour < 8:
|
||||
now -= datetime.timedelta(days=1)
|
||||
return now.date()
|
||||
|
||||
|
||||
class PuzzleManager(BaseModel):
|
||||
puzzle_data: dict[str, Puzzle] = {}
|
||||
|
||||
daily_puzzle: dict[str, DailyPuzzleInfo] = {}
|
||||
daily_puzzle_of_date: dict[datetime.date, str] = {}
|
||||
|
||||
puzzle_pinned: str = ""
|
||||
|
||||
index_id_counter: int = 1
|
||||
submissions: dict[str, dict[str, list[PuzzleSubmission]]] = {}
|
||||
last_checked_date: datetime.date = Field(
|
||||
default_factory=lambda: get_today_date() - datetime.timedelta(days=1)
|
||||
)
|
||||
|
||||
@property
|
||||
def last_publish_date(self):
|
||||
return max(self.daily_puzzle_of_date.keys())
|
||||
|
||||
@property
|
||||
def unpublished_puzzles(self):
|
||||
return set((
|
||||
p.raw_id for p in self.puzzle_data.values()
|
||||
if not self.is_puzzle_published(p.raw_id) and p.ready
|
||||
))
|
||||
|
||||
@property
|
||||
def unready_puzzles(self):
|
||||
return set((
|
||||
p.raw_id for p in self.puzzle_data.values()
|
||||
if not self.is_puzzle_published(p.raw_id) and not p.ready
|
||||
))
|
||||
|
||||
@property
|
||||
def published_puzzles(self):
|
||||
return set((
|
||||
p.raw_id for p in self.puzzle_data.values()
|
||||
if self.is_puzzle_published(p.raw_id)
|
||||
))
|
||||
|
||||
def is_puzzle_published(self, raw_id: str):
|
||||
return raw_id in [i.raw_id for i in self.daily_puzzle.values()]
|
||||
|
||||
def publish_puzzle(self, raw_id: str):
|
||||
assert raw_id in self.puzzle_data
|
||||
|
||||
today = get_today_date()
|
||||
|
||||
p = self.puzzle_data[raw_id]
|
||||
p.index_id = str(self.index_id_counter)
|
||||
p.ready = True
|
||||
self.puzzle_pinned = ""
|
||||
self.last_checked_date = today
|
||||
self.daily_puzzle[p.index_id] = DailyPuzzleInfo(
|
||||
raw_id=raw_id,
|
||||
time=today,
|
||||
)
|
||||
self.daily_puzzle_of_date[today] = p.index_id
|
||||
|
||||
self.index_id_counter += 1
|
||||
|
||||
def fix(self):
|
||||
# 尝试修复今日的数据
|
||||
for p in self.puzzle_data.values():
|
||||
if self.is_puzzle_published(p.raw_id):
|
||||
p.ready = True
|
||||
|
||||
if self.puzzle_pinned not in self.unpublished_puzzles:
|
||||
self.puzzle_pinned = ""
|
||||
|
||||
# 撤回重复发布的题
|
||||
already_published: set[str] = set()
|
||||
for date in list(self.daily_puzzle_of_date.keys()):
|
||||
index_id = self.daily_puzzle_of_date[date]
|
||||
info = self.daily_puzzle[index_id]
|
||||
if info.raw_id in already_published:
|
||||
del self.daily_puzzle[index_id]
|
||||
del self.daily_puzzle_of_date[date]
|
||||
else:
|
||||
already_published.add(info.raw_id)
|
||||
|
||||
def admin_pin_puzzle(self, raw_id: str):
|
||||
if raw_id in self.puzzle_data:
|
||||
self.puzzle_pinned = raw_id
|
||||
else:
|
||||
self.puzzle_pinned = ""
|
||||
|
||||
def get_today_puzzle(self, strong: bool = False) -> Puzzle | None:
|
||||
today = get_today_date()
|
||||
if today in self.daily_puzzle_of_date:
|
||||
index_id = self.daily_puzzle_of_date[today]
|
||||
info = self.daily_puzzle[index_id]
|
||||
return self.puzzle_data[info.raw_id]
|
||||
if today == self.last_checked_date and not strong:
|
||||
return
|
||||
self.last_checked_date = today
|
||||
if self.puzzle_pinned and self.puzzle_pinned in self.puzzle_data:
|
||||
d = self.puzzle_pinned
|
||||
self.publish_puzzle(d)
|
||||
self.puzzle_pinned = ""
|
||||
return self.puzzle_data[d]
|
||||
elif len(self.unpublished_puzzles) > 0:
|
||||
d = random.choice(list(self.unpublished_puzzles))
|
||||
self.publish_puzzle(d)
|
||||
return self.puzzle_data[d]
|
||||
|
||||
def get_today_info(self) -> DailyPuzzleInfo | None:
|
||||
p = self.get_today_puzzle()
|
||||
if p is None:
|
||||
return
|
||||
return self.daily_puzzle[p.index_id]
|
||||
|
||||
def submit(self, user: str, flag: str) -> PuzzleSubmissionResultMessage:
|
||||
p = self.get_today_puzzle()
|
||||
d = self.get_today_info()
|
||||
now = datetime.datetime.now()
|
||||
if p is None or d is None:
|
||||
return PuzzleSubmissionResultMessage(
|
||||
success=False,
|
||||
message="今天没有题哦,改天再来吧!",
|
||||
)
|
||||
if user in d.success_users:
|
||||
return PuzzleSubmissionResultMessage(
|
||||
success=False,
|
||||
message="你今天已经答对过啦!不用重复提交哦!",
|
||||
)
|
||||
if flag != p.flag:
|
||||
d.tried_users.add(user)
|
||||
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
|
||||
success=False,
|
||||
flag=flag,
|
||||
time=now,
|
||||
))
|
||||
return PuzzleSubmissionResultMessage(
|
||||
success=False,
|
||||
message="❌ 答错了,请检查你的答案哦",
|
||||
)
|
||||
d.tried_users.add(user)
|
||||
d.success_users[user] = now
|
||||
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
|
||||
success=True,
|
||||
flag=flag,
|
||||
time=now,
|
||||
))
|
||||
return PuzzleSubmissionResultMessage(
|
||||
success=True,
|
||||
rank=len(d.success_users),
|
||||
)
|
||||
|
||||
def admin_create_puzzle(self, user: str):
|
||||
p = Puzzle(
|
||||
raw_id=nanoid.generate(
|
||||
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
|
||||
12,
|
||||
),
|
||||
index_id="",
|
||||
title="示例标题",
|
||||
content="题目的内容填写内容",
|
||||
img_name="",
|
||||
author_id=user,
|
||||
flag="konaph{this_is_a_flag}",
|
||||
ready=False,
|
||||
)
|
||||
self.puzzle_data[p.raw_id] = p
|
||||
return p
|
||||
|
||||
def get_puzzles_of_user(self, user: str):
|
||||
return sorted([
|
||||
p for p in self.puzzle_data.values()
|
||||
if p.author_id == user
|
||||
], key=lambda p: p.created_at, reverse=True)
|
||||
|
||||
def get_report_yesterday(self):
|
||||
yesterday = get_today_date() - datetime.timedelta(days=1)
|
||||
index_id = self.daily_puzzle_of_date.get(yesterday)
|
||||
if index_id is None:
|
||||
return None
|
||||
info = self.daily_puzzle[index_id]
|
||||
puzzle = self.puzzle_data[info.raw_id]
|
||||
message = UniMessage.text(f"[KonaPH#{index_id}] 「{puzzle.title}」解答报告")
|
||||
|
||||
if len(info.success_users) == 0:
|
||||
message = message.text(
|
||||
"\n\n昨日,竟无人解出此题!"
|
||||
)
|
||||
else:
|
||||
message = message.text(
|
||||
f"\n\n昨日,共有 {len(info.success_users)} 人解出此题。\n\n前五名的解答者:"
|
||||
)
|
||||
us = [(u, d) for u, d in info.success_users.items()]
|
||||
us = sorted(us, key=lambda t: t[1])
|
||||
us = us[:5]
|
||||
for u, _ in us:
|
||||
m = self.submissions[puzzle.raw_id][u][-1]
|
||||
message = message.text(f"- {get_username(u)} 于 {m.time.strftime('%H:%M')}")
|
||||
|
||||
message = message.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
|
||||
return message
|
||||
|
||||
|
||||
lock = asyncio.Lock()
|
||||
|
||||
|
||||
def read_data():
|
||||
try:
|
||||
data_raw = KONAPH_DATA_JSON.read_text()
|
||||
return PuzzleManager.model_validate_json(data_raw)
|
||||
except (FileNotFoundError, ValidationError):
|
||||
return PuzzleManager()
|
||||
|
||||
|
||||
def write_data(data: PuzzleManager):
|
||||
KONAPH_DATA_JSON.write_text(data.model_dump_json())
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def puzzle_manager():
|
||||
async with lock:
|
||||
data = read_data()
|
||||
yield data
|
||||
write_data(data)
|
||||
329
konabot/plugins/kona_ph/manager.py
Normal file
329
konabot/plugins/kona_ph/manager.py
Normal file
@ -0,0 +1,329 @@
|
||||
import datetime
|
||||
from math import ceil
|
||||
from typing import Any
|
||||
from nonebot import get_plugin_config
|
||||
from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, Subcommand, UniMessage, on_alconna
|
||||
from pydantic import BaseModel
|
||||
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.nb.extract_image import download_image_bytes
|
||||
from konabot.common.nb.qq_broadcast import qq_broadcast
|
||||
from konabot.plugins.kona_ph.core.storage import Puzzle, PuzzleManager, get_today_date, puzzle_manager
|
||||
|
||||
PUZZLE_PAGE_SIZE = 10
|
||||
|
||||
|
||||
class PuzzleConfig(BaseModel):
|
||||
plugin_puzzle_manager: list[str] = []
|
||||
plugin_puzzle_admin: list[str] = []
|
||||
plugin_puzzle_playgroup: list[str] = []
|
||||
|
||||
|
||||
config = get_plugin_config(PuzzleConfig)
|
||||
|
||||
|
||||
def is_puzzle_manager(target: DepLongTaskTarget):
|
||||
return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target)
|
||||
|
||||
|
||||
def is_puzzle_admin(target: DepLongTaskTarget):
|
||||
return target.target_id in config.plugin_puzzle_admin
|
||||
|
||||
|
||||
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
|
||||
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
|
||||
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
|
||||
|
||||
status_suffix = ""
|
||||
if puzzle.raw_id == manager.puzzle_pinned:
|
||||
status_suffix += " | 📌 已被管理员置顶"
|
||||
|
||||
msg = UniMessage.text(
|
||||
f"--- 谜题信息 ---\n"
|
||||
f"Raw ID: {puzzle.raw_id}\n"
|
||||
f"标题: {puzzle.title}\n"
|
||||
f"出题者 ID: {puzzle.author_id}\n"
|
||||
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
|
||||
f"Flag: {puzzle.flag}\n"
|
||||
f"状态: {status}{status_suffix}\n\n"
|
||||
f"{puzzle.content}"
|
||||
)
|
||||
|
||||
if puzzle.img_name:
|
||||
msg = msg.image(raw=puzzle.get_image_path().read_bytes())
|
||||
|
||||
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
|
||||
|
||||
return msg
|
||||
|
||||
|
||||
def create_admin_commands():
|
||||
cmd_admin = on_alconna(
|
||||
Alconna(
|
||||
"konaph",
|
||||
Subcommand("create", dest="create"),
|
||||
Subcommand("ready", Args["raw_id", str], dest="ready"),
|
||||
Subcommand("unready", Args["raw_id", str], dest="unready"),
|
||||
Subcommand("info", Args["raw_id", str], dest="info"),
|
||||
Subcommand("my", Args["page?", int], dest="my"),
|
||||
Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"),
|
||||
Subcommand("pin", Args["raw_id?", str], dest="pin"),
|
||||
Subcommand("unpin", dest="unpin"),
|
||||
Subcommand(
|
||||
"modify",
|
||||
Args["raw_id?", str],
|
||||
Option("--title", Args["title", str], alias=["-t"]),
|
||||
Option("--description", Args["description", str], alias=["-d"]),
|
||||
Option("--image", Args["image?", Image], alias=["-i"]),
|
||||
Option("--flag", Args["flag", str], alias=["-f"]),
|
||||
Option("--remove-image"),
|
||||
dest="modify",
|
||||
),
|
||||
Subcommand("publish", Args["raw_id?", str], dest="publish"),
|
||||
),
|
||||
rule=is_puzzle_manager,
|
||||
)
|
||||
|
||||
@cmd_admin.assign("$main")
|
||||
async def _(target: DepLongTaskTarget):
|
||||
msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
|
||||
msg = msg.text("konaph create - 创建一个新的谜题\n")
|
||||
msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
|
||||
msg = msg.text("konaph unready <id> - 取消准备一道谜题\n")
|
||||
msg = msg.text("konaph info <id> - 查看谜题\n")
|
||||
msg = msg.text("konaph my <page?> - 查看我的谜题列表\n")
|
||||
msg = msg.text("konaph modify - 查看如何修改谜题信息\n")
|
||||
|
||||
if is_puzzle_admin(target):
|
||||
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
|
||||
msg = msg.text("konaph pin - 查看当前置顶谜题\n")
|
||||
msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
|
||||
msg = msg.text("konaph unpin - 取消置顶所有谜题\n")
|
||||
msg = msg.text("konaph publish <id?> - 强制发题")
|
||||
|
||||
await target.send_message(msg)
|
||||
|
||||
@cmd_admin.assign("create")
|
||||
async def _(target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
puzzle = manager.admin_create_puzzle(target.target_id)
|
||||
await target.send_message(UniMessage.text(
|
||||
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
|
||||
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
|
||||
f"- 输入 `konaph my` 查看你创建的谜题\n"
|
||||
f"- 输入 `konaph modify` 查看更改谜题的方法"
|
||||
))
|
||||
|
||||
@cmd_admin.assign("ready")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
p = manager.puzzle_data[raw_id]
|
||||
if p.author_id != target.target_id and not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
if p.ready:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"题目早就准备好啦!"
|
||||
))
|
||||
p.ready = True
|
||||
await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经准备就绪!"
|
||||
))
|
||||
|
||||
@cmd_admin.assign("unready")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
p = manager.puzzle_data[raw_id]
|
||||
if p.author_id != target.target_id and not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
if not p.ready:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经是未取消状态了!"
|
||||
))
|
||||
if manager.is_puzzle_published(p.raw_id):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"已发布的谜题不能取消准备状态!"
|
||||
))
|
||||
|
||||
p.ready = False
|
||||
await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经取消准备!"
|
||||
))
|
||||
|
||||
@cmd_admin.assign("info")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
p = manager.puzzle_data[raw_id]
|
||||
if p.author_id != target.target_id and not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"这不是你的题,你没有权限查看详细信息!"
|
||||
))
|
||||
|
||||
await target.send_message(get_puzzle_info_message(manager, p))
|
||||
|
||||
@cmd_admin.assign("my")
|
||||
async def _(target: DepLongTaskTarget, page: int = 1):
|
||||
async with puzzle_manager() as manager:
|
||||
puzzles = manager.get_puzzles_of_user(target.target_id)
|
||||
if len(puzzles) == 0:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有谜题哦,使用 `konaph create` 创建一个吧!"
|
||||
))
|
||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||
if page <= 0 or page > count_pages:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"页数只有 1 ~ {count_pages} 啦!"
|
||||
))
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
||||
message = UniMessage.text("==== 我的谜题 ====\n\n")
|
||||
for p in puzzles:
|
||||
message = message.text("- ")
|
||||
if manager.puzzle_pinned == p.raw_id:
|
||||
message = message.text("[📌]")
|
||||
if manager.is_puzzle_published(p.raw_id):
|
||||
message = message.text(f"[#{p.index_id}] ")
|
||||
elif p.ready:
|
||||
message = message.text("[✅] ")
|
||||
else:
|
||||
message = message.text("[⚙️] ")
|
||||
message = message.text(f"{p.title} ({p.raw_id})")
|
||||
message = message.text("\n")
|
||||
message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
|
||||
await target.send_message(message)
|
||||
|
||||
@cmd_admin.assign("all")
|
||||
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text("你没有权限查看所有的哦"))
|
||||
async with puzzle_manager() as manager:
|
||||
puzzles = [*manager.puzzle_data.values()]
|
||||
if ready.available:
|
||||
puzzles = [p for p in puzzles if p.ready]
|
||||
puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
|
||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||
if page <= 0 or page > count_pages:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"页数只有 1 ~ {count_pages} 啦!"
|
||||
))
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
||||
message = UniMessage.text("==== 所有谜题 ====\n\n")
|
||||
for p in puzzles:
|
||||
message = message.text("- ")
|
||||
if p.raw_id == manager.puzzle_pinned:
|
||||
message = message.text("[📌]")
|
||||
if manager.is_puzzle_published(p.raw_id):
|
||||
message = message.text(f"[#{p.index_id}] ")
|
||||
elif p.ready:
|
||||
message = message.text("[✅] ")
|
||||
else:
|
||||
message = message.text("[⚙️] ")
|
||||
message = message.text(f"{p.title} ({p.raw_id} by {p.author_id})")
|
||||
message = message.text("\n")
|
||||
message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
|
||||
await target.send_message(message)
|
||||
|
||||
@cmd_admin.assign("pin")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str = ""):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有权限使用该指令"
|
||||
))
|
||||
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id == "":
|
||||
if manager.puzzle_pinned:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"被 Pin 的谜题 ID = {manager.puzzle_pinned}"
|
||||
))
|
||||
return await target.send_message("没有置顶谜题")
|
||||
if raw_id not in manager.unpublished_puzzles:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"这个谜题已经发布了,或者还没准备好,或者不存在"
|
||||
))
|
||||
manager.admin_pin_puzzle(raw_id)
|
||||
return await target.send_message(f"已置顶谜题 {raw_id}")
|
||||
|
||||
@cmd_admin.assign("unpin")
|
||||
async def _(target: DepLongTaskTarget):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有权限使用该指令"
|
||||
))
|
||||
async with puzzle_manager() as manager:
|
||||
manager.admin_pin_puzzle("")
|
||||
return await target.send_message("已取消所有置顶")
|
||||
|
||||
@cmd_admin.assign("modify")
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
raw_id: str = "",
|
||||
title: str | None = None,
|
||||
description: str | None = None,
|
||||
flag: str | None = None,
|
||||
image: Image | None = None,
|
||||
remove_image: Query[bool] = Query("modify.remove-image"),
|
||||
):
|
||||
if raw_id == "":
|
||||
return await target.send_message(
|
||||
"konaph modify <raw_id> - 修改一个谜题\n\n"
|
||||
"支持的参数:\n"
|
||||
" --title <str> 标题\n"
|
||||
" --description <str> 题目详情描述(用直引号包裹以支持多行)\n"
|
||||
" --flag <str> flag\n"
|
||||
" --image <图片> 图片\n"
|
||||
" --remove-image 删除图片"
|
||||
)
|
||||
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
return await target.send_message("没有这个谜题")
|
||||
p = manager.puzzle_data[raw_id]
|
||||
if not is_puzzle_admin(target) and target.target_id != p.author_id:
|
||||
return await target.send_message("你没有权限编辑这个谜题")
|
||||
if title is not None:
|
||||
p.title = title
|
||||
if description is not None:
|
||||
p.content = description
|
||||
if flag is not None:
|
||||
p.flag = flag
|
||||
if image is not None and image.url is not None:
|
||||
b = await download_image_bytes(image.url)
|
||||
p.add_image(b.unwrap())
|
||||
elif remove_image.available:
|
||||
p.remove_image()
|
||||
|
||||
info2 = get_puzzle_info_message(manager, p)
|
||||
|
||||
return await target.send_message("修改好啦!看看效果:\n\n" + info2)
|
||||
|
||||
@cmd_admin.assign("publish")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str | None = None):
|
||||
today = get_today_date()
|
||||
async with puzzle_manager() as manager:
|
||||
if today in manager.daily_puzzle_of_date:
|
||||
return await target.send_message("今日已经有题了哦")
|
||||
manager.last_checked_date = today - datetime.timedelta(days=-1)
|
||||
if raw_id is not None:
|
||||
manager.admin_pin_puzzle(raw_id)
|
||||
p = manager.get_today_puzzle(strong=True)
|
||||
if p is None:
|
||||
return await target.send_message("上架失败了orz,可能是没题了")
|
||||
await qq_broadcast(config.plugin_puzzle_playgroup, p.get_unimessage())
|
||||
return await target.send_message("Ok!")
|
||||
|
||||
return cmd_admin
|
||||
@ -394,7 +394,8 @@ async def generate_dice_image(number: str) -> BytesIO:
|
||||
append_images=images[1:],
|
||||
duration=frame_durations,
|
||||
format='GIF',
|
||||
loop=1)
|
||||
loop=1,
|
||||
disposal=2)
|
||||
output.seek(0)
|
||||
# pil_final.save(output, format='PNG')
|
||||
return output
|
||||
@ -4,10 +4,13 @@ from typing import cast
|
||||
|
||||
from loguru import logger
|
||||
from nonebot import get_bot, on_request
|
||||
import nonebot
|
||||
from nonebot.adapters.onebot.v11.event import FriendRequestEvent
|
||||
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
|
||||
from nonebot_plugin_apscheduler import scheduler
|
||||
|
||||
from konabot.common.nb.is_admin import cfg as adminConfig
|
||||
from konabot.common.username import manager
|
||||
|
||||
add_request = on_request()
|
||||
|
||||
@ -23,3 +26,15 @@ async def _(req: FriendRequestEvent):
|
||||
await req.approve(bot)
|
||||
logger.info(f"已经自动同意 {req.user_id} 的好友请求")
|
||||
|
||||
@scheduler.scheduled_job("cron", minute="*/5")
|
||||
async def _():
|
||||
logger.info("尝试更新群成员信息")
|
||||
await manager.update()
|
||||
|
||||
driver = nonebot.get_driver()
|
||||
|
||||
@driver.on_bot_connect
|
||||
async def _():
|
||||
logger.info("有 Bot 连接,5 秒后试着更新群成员信息")
|
||||
await asyncio.sleep(5)
|
||||
await manager.update()
|
||||
@ -1,23 +1,19 @@
|
||||
import aiohttp
|
||||
import asyncio as asynkio
|
||||
import datetime
|
||||
from math import ceil
|
||||
from pathlib import Path
|
||||
from typing import Any, Literal
|
||||
from typing import Any
|
||||
|
||||
import nanoid
|
||||
import nonebot
|
||||
import ptimeparse
|
||||
from loguru import logger
|
||||
from nonebot import get_plugin_config, on_message
|
||||
from nonebot.adapters import Bot, Event
|
||||
from nonebot.adapters.onebot.v11 import Bot as OBBot
|
||||
from nonebot.adapters.console import Bot as CBot
|
||||
from nonebot.adapters.discord import Bot as DCBot
|
||||
from nonebot.adapters import Event
|
||||
from nonebot_plugin_alconna import Alconna, Args, Subcommand, UniMessage, UniMsg, on_alconna
|
||||
from pydantic import BaseModel
|
||||
|
||||
from konabot.common.longtask import DepLongTaskTarget, LongTask, LongTaskTarget, create_longtask, handle_long_task, longtask_data
|
||||
from konabot.common.longtask import DepLongTaskTarget, LongTask, create_longtask, handle_long_task, longtask_data
|
||||
|
||||
evt = on_message()
|
||||
|
||||
@ -32,27 +28,8 @@ PAGE_SIZE = 6
|
||||
FMT_STRING = "%Y年%m月%d日 %H:%M:%S"
|
||||
|
||||
|
||||
class NotifyMessage(BaseModel):
|
||||
message: str
|
||||
|
||||
|
||||
class Notify(BaseModel):
|
||||
platform: Literal["console", "qq", "discord"]
|
||||
|
||||
target: str
|
||||
"需要接受通知的个体"
|
||||
|
||||
target_env: str | None
|
||||
"在哪里进行通知,如果是 None 代表私聊通知"
|
||||
|
||||
notify_time: datetime.datetime
|
||||
notify_msg: str
|
||||
|
||||
|
||||
class NotifyConfigFile(BaseModel):
|
||||
version: int = 2
|
||||
notifies: list[Notify] = []
|
||||
unsent: list[Notify] = []
|
||||
notify_channels: dict[str, str] = {}
|
||||
|
||||
|
||||
@ -78,36 +55,6 @@ async def send_notify_to_ntfy_instance(msg: str, channel: str):
|
||||
logger.info(f"访问 {url} 的结果是 {response.status}")
|
||||
|
||||
|
||||
def _get_bot_of(_type: type[Bot]):
|
||||
for bot in nonebot.get_bots().values():
|
||||
if isinstance(bot, _type):
|
||||
return bot.self_id
|
||||
return ""
|
||||
|
||||
|
||||
def get_target_from_notify(notify: Notify) -> LongTaskTarget:
|
||||
if notify.platform == "console":
|
||||
return LongTaskTarget(
|
||||
platform="console",
|
||||
self_id=_get_bot_of(CBot),
|
||||
channel_id=notify.target_env or "",
|
||||
target_id=notify.target,
|
||||
)
|
||||
if notify.platform == "discord":
|
||||
return LongTaskTarget(
|
||||
platform="discord",
|
||||
self_id=_get_bot_of(DCBot),
|
||||
channel_id=notify.target_env or "",
|
||||
target_id=notify.target,
|
||||
)
|
||||
return LongTaskTarget(
|
||||
platform="qq",
|
||||
self_id=_get_bot_of(OBBot),
|
||||
channel_id=notify.target_env or "",
|
||||
target_id=notify.target,
|
||||
)
|
||||
|
||||
|
||||
def load_notify_config() -> NotifyConfigFile:
|
||||
if not DATA_FILE_PATH.exists():
|
||||
return NotifyConfigFile()
|
||||
@ -160,40 +107,6 @@ async def _(msg: UniMsg, mEvt: Event, target: DepLongTaskTarget):
|
||||
|
||||
driver = nonebot.get_driver()
|
||||
|
||||
NOTIFIED_FLAG = {
|
||||
"task_added": False,
|
||||
}
|
||||
|
||||
|
||||
@driver.on_bot_connect
|
||||
async def _():
|
||||
if NOTIFIED_FLAG["task_added"]:
|
||||
return
|
||||
|
||||
NOTIFIED_FLAG["task_added"] = True
|
||||
|
||||
DELTA = 2
|
||||
logger.info(f"第一次探测到 Bot 连接,等待 {DELTA} 秒后开始通知")
|
||||
await asynkio.sleep(DELTA)
|
||||
|
||||
await DATA_FILE_LOCK.acquire()
|
||||
|
||||
cfg = load_notify_config()
|
||||
if cfg.version == 1:
|
||||
logger.info("将配置文件的版本升级为 2")
|
||||
cfg.version = 2
|
||||
else:
|
||||
for notify in [*cfg.notifies]:
|
||||
await create_longtask(
|
||||
handler=LONG_TASK_NAME,
|
||||
data={ "message": notify.notify_msg },
|
||||
target=get_target_from_notify(notify),
|
||||
deadline=notify.notify_time,
|
||||
)
|
||||
cfg.notifies = []
|
||||
save_notify_config(cfg)
|
||||
DATA_FILE_LOCK.release()
|
||||
|
||||
|
||||
@handle_long_task("TASK_SIMPLE_NOTIFY")
|
||||
async def _(task: LongTask):
|
||||
@ -284,7 +197,17 @@ cmd_notify_channel = on_alconna(Alconna(
|
||||
|
||||
@cmd_notify_channel.assign("$main")
|
||||
async def _(target: DepLongTaskTarget):
|
||||
async with DATA_FILE_LOCK:
|
||||
data = load_notify_config()
|
||||
target_channel = data.notify_channels.get(target.target_id)
|
||||
|
||||
if target_channel is None:
|
||||
channel_msg = "目前还没有配置 ntfy 地址"
|
||||
else:
|
||||
channel_msg = f"配置的 ntfy Channel 为:{target_channel}\n\n服务器地址:{config.plugin_notify_base_url}"
|
||||
|
||||
await target.send_message(UniMessage.text(
|
||||
f"{channel_msg}\n\n"
|
||||
"配置 ntfy 通知:\n\n"
|
||||
"- ntfy 创建: 启用 ntfy 通知,并为你随机生成一个通知渠道\n"
|
||||
"- ntfy 删除:禁用 ntfy 通知\n"
|
||||
|
||||
@ -75,6 +75,44 @@ async def _(msg: UniMsg, event: BaseEvent, content: Optional[str] = ""):
|
||||
other_function=lambda page: beibao_continue_handle(page, content),
|
||||
timeout=30
|
||||
)
|
||||
await evt.send(
|
||||
await UniMessage().image(raw=screenshot).export()
|
||||
)
|
||||
|
||||
async def continue_handle_3(page: Page, arg1: str, arg2: str) -> None:
|
||||
# 这里可以添加一些预处理逻辑
|
||||
# 找到 id 为 textL 的 input,id 为 textR 的 input
|
||||
input1 = await page.query_selector("#textL")
|
||||
input2 = await page.query_selector("#textR")
|
||||
if input1:
|
||||
await input1.fill(arg1)
|
||||
if input2:
|
||||
await input2.fill(arg2)
|
||||
# 等待 0.3 秒钟
|
||||
await page.wait_for_timeout(300)
|
||||
# 等待 id 为 loading 的元素不可见
|
||||
loading = await page.query_selector("#loading")
|
||||
if loading:
|
||||
await loading.wait_for_element_state("hidden")
|
||||
|
||||
evt = on_alconna(
|
||||
Alconna(
|
||||
f"BA生成",
|
||||
Args["arg1", str],
|
||||
Args["arg2", str]
|
||||
),
|
||||
use_cmd_start=True,
|
||||
use_cmd_sep=False,
|
||||
skip_for_unmatch=True,
|
||||
)
|
||||
@evt.handle()
|
||||
async def _(msg: UniMsg, event: BaseEvent, arg1: str, arg2: str):
|
||||
screenshot = await WebRenderer.render(
|
||||
url="https://tmp.nulla.top/ba-logo/",
|
||||
target="#canvas",
|
||||
other_function=lambda page: continue_handle_3(page, arg1, arg2),
|
||||
timeout=30
|
||||
)
|
||||
await evt.send(
|
||||
await UniMessage().image(raw=screenshot).export()
|
||||
)
|
||||
Reference in New Issue
Block a user