Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ae6297b98d | |||
| dacae29054 | |||
| 8acb546c6a | |||
| 49e0914416 | |||
| 5b74c78ec3 | |||
| c911410276 | |||
| 37ca4bf11f | |||
| 8ef084c22a | |||
| 57f0cd728f | |||
| 627a29f57e | |||
| 650c500f47 | |||
| 86acbe51e9 | |||
| 4900a7e0ad | |||
| 34da08126b | |||
| 00f416c8bc | |||
| 9c7d0a4486 | |||
| e3b9d6723f |
@ -67,6 +67,10 @@ code .
|
||||
|
||||
详见[LLM 配置文档](/docs/LLM.md)。
|
||||
|
||||
#### 配置 konabot-web 以支持更高级的图片渲染
|
||||
|
||||
详见[konabot-web 配置文档](/docs/konabot-web.md)
|
||||
|
||||
### 运行
|
||||
|
||||
使用命令行手动启动 Bot:
|
||||
|
||||
BIN
assets/img/meme/doubao.png
Executable file
BIN
assets/img/meme/doubao.png
Executable file
Binary file not shown.
|
After Width: | Height: | Size: 8.0 KiB |
BIN
assets/img/meme/kiosay.jpg
Executable file
BIN
assets/img/meme/kiosay.jpg
Executable file
Binary file not shown.
|
After Width: | Height: | Size: 71 KiB |
18
docs/konabot-web.md
Normal file
18
docs/konabot-web.md
Normal file
@ -0,0 +1,18 @@
|
||||
# konabot-web 配置文档
|
||||
|
||||
本文档教你配置一个此方 Bot 的 Web 服务器。
|
||||
|
||||
## 安装并运行 konabot-web
|
||||
|
||||
按照 [konabot-web README](https://gitea.service.jazzwhom.top/mttu-developers/konabot-web) 安装并运行 konabot-web 实例。
|
||||
|
||||
## 指定 konabot-web 实例地址
|
||||
|
||||
如果你的 Web 服务器的端口不是 5173,或者你有特殊的网络结构,你需要手动设置 konabot-web。编辑 `.env` 文件:
|
||||
|
||||
```
|
||||
MODULE_WEB_RENDER_WEBURL=http://web-server:port
|
||||
MODULE_WEB_RENDER_INSTANCE=http://konabot-server:port
|
||||
```
|
||||
|
||||
替换 web-server 为你的前端服务器地址,konabot-server 为后端服务器地址,port 为端口号。
|
||||
@ -8,21 +8,38 @@ import nonebot
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.adapters import Bot, Event, Message
|
||||
from nonebot.adapters.discord import Bot as DiscordBot
|
||||
from nonebot.adapters.discord import GuildMessageCreateEvent as DiscordMessageEvent
|
||||
from nonebot.adapters.discord import MessageEvent as DiscordMessageEvent
|
||||
from nonebot.adapters.discord.config import Config as DiscordConfig
|
||||
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
|
||||
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
|
||||
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
|
||||
import nonebot.params
|
||||
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
|
||||
from PIL import UnidentifiedImageError
|
||||
from pydantic import BaseModel
|
||||
from returns.result import Failure, Result, Success
|
||||
|
||||
from konabot.common.path import ASSETS_PATH
|
||||
|
||||
async def download_image_bytes(url: str) -> Result[bytes, str]:
|
||||
|
||||
discordConfig = nonebot.get_plugin_config(DiscordConfig)
|
||||
|
||||
|
||||
class ExtractImageConfig(BaseModel):
|
||||
module_extract_image_no_download: bool = False
|
||||
"要不要算了,不下载了,直接爆炸算了,适用于一些比较奇怪的网络环境,无法从协议端下载文件"
|
||||
|
||||
|
||||
module_config = nonebot.get_plugin_config(ExtractImageConfig)
|
||||
|
||||
|
||||
async def download_image_bytes(url: str, proxy: str | None = None) -> Result[bytes, str]:
|
||||
# if "/matcha/cache/" in url:
|
||||
# url = url.replace('127.0.0.1', '10.126.126.101')
|
||||
if module_config.module_extract_image_no_download:
|
||||
return Success((ASSETS_PATH / "img" / "other" / "boom.jpg").read_bytes())
|
||||
logger.debug(f"开始从 {url} 下载图片")
|
||||
async with httpx.AsyncClient() as c:
|
||||
async with httpx.AsyncClient(proxy=proxy) as c:
|
||||
try:
|
||||
response = await c.get(url)
|
||||
except (httpx.ConnectError, httpx.RemoteProtocolError) as e:
|
||||
@ -127,8 +144,8 @@ async def extract_image_from_message(
|
||||
for a in evt.attachments:
|
||||
if "image/" not in a.content_type:
|
||||
continue
|
||||
url = a.url
|
||||
return (await download_image_bytes(url)).bind(bytes_to_pil)
|
||||
url = a.proxy_url
|
||||
return (await download_image_bytes(url, discordConfig.discord_proxy)).bind(bytes_to_pil)
|
||||
|
||||
for seg in UniMessage.of(msg, bot):
|
||||
logger.info(seg)
|
||||
|
||||
17
konabot/common/utils/to_async.py
Normal file
17
konabot/common/utils/to_async.py
Normal file
@ -0,0 +1,17 @@
|
||||
import asyncio
|
||||
import functools
|
||||
|
||||
from typing import Awaitable, Callable, ParamSpec, TypeVar
|
||||
|
||||
|
||||
TA = ParamSpec("TA")
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def make_async(func: Callable[TA, T]) -> Callable[TA, Awaitable[T]]:
|
||||
@functools.wraps(func, assigned=("__module__", "__name__", "__qualname__", "__doc__", "__annotations__"))
|
||||
async def wrapper(*args: TA.args, **kwargs: TA.kwargs):
|
||||
return await asyncio.to_thread(func, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
@ -1,211 +1,9 @@
|
||||
import asyncio
|
||||
import queue
|
||||
from typing import Any, Callable, Coroutine
|
||||
from loguru import logger
|
||||
from playwright.async_api import Page, Playwright, async_playwright, Browser, Page, BrowserContext
|
||||
from .config import web_render_config
|
||||
from .core import WebRenderer as WebRenderer
|
||||
from .core import WebRendererInstance as WebRendererInstance
|
||||
|
||||
|
||||
PageFunction = Callable[[Page], Coroutine[Any, Any, Any]]
|
||||
|
||||
|
||||
class WebRenderer:
|
||||
browser_pool: queue.Queue["WebRendererInstance"] = queue.Queue()
|
||||
context_pool: dict[int, BrowserContext] = {} # 长期挂载的浏览器上下文池
|
||||
page_pool: dict[str, Page] = {} # 长期挂载的页面池
|
||||
|
||||
@classmethod
|
||||
async def get_browser_instance(cls) -> "WebRendererInstance":
|
||||
if cls.browser_pool.empty():
|
||||
instance = await WebRendererInstance.create()
|
||||
cls.browser_pool.put(instance)
|
||||
instance = cls.browser_pool.get()
|
||||
cls.browser_pool.put(instance)
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
async def get_browser_context(cls) -> BrowserContext:
|
||||
instance = await cls.get_browser_instance()
|
||||
if id(instance) not in cls.context_pool:
|
||||
context = await instance.browser.new_context()
|
||||
cls.context_pool[id(instance)] = context
|
||||
logger.debug(f"Created new persistent browser context for WebRendererInstance {id(instance)}")
|
||||
return cls.context_pool[id(instance)]
|
||||
|
||||
@classmethod
|
||||
async def render(
|
||||
cls,
|
||||
url: str,
|
||||
target: str,
|
||||
params: dict = {},
|
||||
other_function: PageFunction | None = None,
|
||||
timeout: int = 30,
|
||||
) -> bytes:
|
||||
'''
|
||||
访问指定URL并返回截图
|
||||
|
||||
:param url: 目标URL
|
||||
:param target: 渲染目标,如 ".box"、"#main" 等CSS选择器
|
||||
:param timeout: 页面加载超时时间,单位秒
|
||||
:param params: URL键值对参数
|
||||
:param other_function: 其他自定义操作函数,接受page参数
|
||||
:return: 截图的字节数据
|
||||
|
||||
'''
|
||||
instance = await cls.get_browser_instance()
|
||||
logger.debug(f"Using WebRendererInstance {id(instance)} to render {url} targeting {target}")
|
||||
return await instance.render(url, target, params=params, other_function=other_function, timeout=timeout)
|
||||
|
||||
|
||||
@classmethod
|
||||
async def render_persistent_page(cls, page_id: str, url: str, target: str, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
|
||||
'''
|
||||
使用长期挂载的页面访问指定URL并返回截图
|
||||
|
||||
:param page_id: 页面唯一标识符
|
||||
:param url: 目标URL
|
||||
:param target: 渲染目标,如 ".box"、"#main" 等CSS选择器
|
||||
:param timeout: 页面加载超时时间,单位秒
|
||||
:param params: URL键值对参数
|
||||
:param other_function: 其他自定义操作函数,接受page参数
|
||||
:return: 截图的字节数据
|
||||
|
||||
'''
|
||||
logger.debug(f"Requesting persistent render for page_id {page_id} at {url} targeting {target} with timeout {timeout}")
|
||||
instance = await cls.get_browser_instance()
|
||||
if page_id not in cls.page_pool:
|
||||
context = await cls.get_browser_context()
|
||||
page = await context.new_page()
|
||||
cls.page_pool[page_id] = page
|
||||
logger.debug(f"Created new persistent page for page_id {page_id} using WebRendererInstance {id(instance)}")
|
||||
page = cls.page_pool[page_id]
|
||||
return await instance.render_with_page(page, url, target, params=params, other_function=other_function, timeout=timeout)
|
||||
|
||||
@classmethod
|
||||
async def render_file(
|
||||
cls,
|
||||
file_path: str,
|
||||
target: str,
|
||||
params: dict = {},
|
||||
other_function: PageFunction | None = None,
|
||||
timeout: int = 30,
|
||||
) -> bytes:
|
||||
'''
|
||||
访问指定本地文件URL并返回截图
|
||||
|
||||
:param file_path: 目标文件路径
|
||||
:param target: 渲染目标,如 ".box"、"#main" 等CSS选择器
|
||||
:param timeout: 页面加载超时时间,单位秒
|
||||
:param params: URL键值对参数
|
||||
:param other_function: 其他自定义操作函数,接受page参数
|
||||
:return: 截图的字节数据
|
||||
|
||||
'''
|
||||
instance = await cls.get_browser_instance()
|
||||
logger.debug(f"Using WebRendererInstance {id(instance)} to render file {file_path} targeting {target}")
|
||||
return await instance.render_file(file_path, target, params=params, other_function=other_function, timeout=timeout)
|
||||
|
||||
@classmethod
|
||||
async def close_persistent_page(cls, page_id: str) -> None:
|
||||
'''
|
||||
关闭并移除长期挂载的页面
|
||||
|
||||
:param page_id: 页面唯一标识符
|
||||
'''
|
||||
if page_id in cls.page_pool:
|
||||
page = cls.page_pool[page_id]
|
||||
await page.close()
|
||||
del cls.page_pool[page_id]
|
||||
logger.debug(f"Closed and removed persistent page for page_id {page_id}")
|
||||
|
||||
|
||||
|
||||
class WebRendererInstance:
|
||||
def __init__(self):
|
||||
self._playwright: Playwright | None = None
|
||||
self._browser: Browser | None = None
|
||||
self.lock = asyncio.Lock()
|
||||
|
||||
@property
|
||||
def playwright(self) -> Playwright:
|
||||
assert self._playwright is not None
|
||||
return self._playwright
|
||||
|
||||
@property
|
||||
def browser(self) -> Browser:
|
||||
assert self._browser is not None
|
||||
return self._browser
|
||||
|
||||
async def init(self):
|
||||
self._playwright = await async_playwright().start()
|
||||
self._browser = await self.playwright.chromium.launch(headless=True)
|
||||
|
||||
@classmethod
|
||||
async def create(cls) -> "WebRendererInstance":
|
||||
instance = cls()
|
||||
await instance.init()
|
||||
return instance
|
||||
|
||||
async def render(
|
||||
self,
|
||||
url: str,
|
||||
target: str,
|
||||
index: int = 0,
|
||||
params: dict = {},
|
||||
other_function: PageFunction | None = None,
|
||||
timeout: int = 30
|
||||
) -> bytes:
|
||||
'''
|
||||
访问指定URL并返回截图
|
||||
|
||||
:param url: 目标URL
|
||||
:param target: 渲染目标,如 ".box"、"#main" 等CSS选择器
|
||||
:param timeout: 页面加载超时时间,单位秒
|
||||
:param index: 如果目标是一个列表,指定要截图的元素索引
|
||||
:param params: URL键值对参数
|
||||
:param other_function: 其他自定义操作函数,接受page参数
|
||||
:return: 截图的字节数据
|
||||
|
||||
'''
|
||||
async with self.lock:
|
||||
context = await self.browser.new_context()
|
||||
page = await context.new_page()
|
||||
screenshot = await self.inner_render(page, url, target, index, params, other_function, timeout)
|
||||
await page.close()
|
||||
await context.close()
|
||||
return screenshot
|
||||
|
||||
async def render_with_page(self, page: Page, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
|
||||
async with self.lock:
|
||||
screenshot = await self.inner_render(page, url, target, index, params, other_function, timeout)
|
||||
return screenshot
|
||||
|
||||
async def render_file(self, file_path: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
|
||||
file_path = "file:///" + str(file_path).replace("\\", "/")
|
||||
return await self.render(file_path, target, index, params, other_function, timeout)
|
||||
|
||||
async def inner_render(self, page: Page, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
|
||||
logger.debug(f"Navigating to {url} with timeout {timeout}")
|
||||
url_with_params = url + ("?" + "&".join(f"{k}={v}" for k, v in params.items()) if params else "")
|
||||
await page.goto(url_with_params, timeout=timeout * 1000, wait_until="load")
|
||||
logger.debug("Page loaded successfully")
|
||||
# 等待目标元素出现
|
||||
await page.wait_for_selector(target, timeout=timeout * 1000)
|
||||
logger.debug(f"Target element '{target}' found, taking screenshot")
|
||||
if other_function:
|
||||
await other_function(page)
|
||||
elements = await page.query_selector_all(target)
|
||||
if not elements:
|
||||
logger.error(f"Target element '{target}' not found on the page.")
|
||||
return None
|
||||
if index >= len(elements):
|
||||
logger.error(f"Index {index} out of range for elements matching '{target}'")
|
||||
return None
|
||||
element = elements[index]
|
||||
screenshot = await element.screenshot()
|
||||
logger.debug(f"Screenshot taken successfully")
|
||||
return screenshot
|
||||
|
||||
async def close(self):
|
||||
await self.browser.close()
|
||||
await self.playwright.stop()
|
||||
def konaweb(sub_url: str):
|
||||
sub_url = '/' + sub_url.removeprefix('/')
|
||||
return web_render_config.module_web_render_weburl.removesuffix('/') + sub_url
|
||||
|
||||
|
||||
19
konabot/common/web_render/config.py
Normal file
19
konabot/common/web_render/config.py
Normal file
@ -0,0 +1,19 @@
|
||||
import nonebot
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
class Config(BaseModel):
|
||||
module_web_render_weburl: str = "localhost:5173"
|
||||
module_web_render_instance: str = ""
|
||||
|
||||
def get_instance_baseurl(self):
|
||||
if self.module_web_render_instance:
|
||||
return self.module_web_render_instance.removesuffix('/')
|
||||
config = nonebot.get_driver().config
|
||||
ip = str(config.host)
|
||||
if ip == "0.0.0.0":
|
||||
ip = "127.0.0.1"
|
||||
port = config.port
|
||||
return f'http://{ip}:{port}'
|
||||
|
||||
web_render_config = nonebot.get_plugin_config(Config)
|
||||
281
konabot/common/web_render/core.py
Normal file
281
konabot/common/web_render/core.py
Normal file
@ -0,0 +1,281 @@
|
||||
import asyncio
|
||||
import queue
|
||||
from typing import Any, Callable, Coroutine
|
||||
from loguru import logger
|
||||
from playwright.async_api import (
|
||||
Page,
|
||||
Playwright,
|
||||
async_playwright,
|
||||
Browser,
|
||||
BrowserContext,
|
||||
)
|
||||
|
||||
|
||||
PageFunction = Callable[[Page], Coroutine[Any, Any, Any]]
|
||||
|
||||
|
||||
class WebRenderer:
|
||||
browser_pool: queue.Queue["WebRendererInstance"] = queue.Queue()
|
||||
context_pool: dict[int, BrowserContext] = {} # 长期挂载的浏览器上下文池
|
||||
page_pool: dict[str, Page] = {} # 长期挂载的页面池
|
||||
|
||||
@classmethod
|
||||
async def get_browser_instance(cls) -> "WebRendererInstance":
|
||||
if cls.browser_pool.empty():
|
||||
instance = await WebRendererInstance.create()
|
||||
cls.browser_pool.put(instance)
|
||||
instance = cls.browser_pool.get()
|
||||
cls.browser_pool.put(instance)
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
async def get_browser_context(cls) -> BrowserContext:
|
||||
instance = await cls.get_browser_instance()
|
||||
if id(instance) not in cls.context_pool:
|
||||
context = await instance.browser.new_context()
|
||||
cls.context_pool[id(instance)] = context
|
||||
logger.debug(
|
||||
f"Created new persistent browser context for WebRendererInstance {id(instance)}"
|
||||
)
|
||||
return cls.context_pool[id(instance)]
|
||||
|
||||
@classmethod
|
||||
async def render(
|
||||
cls,
|
||||
url: str,
|
||||
target: str,
|
||||
params: dict = {},
|
||||
other_function: PageFunction | None = None,
|
||||
timeout: int = 30,
|
||||
) -> bytes:
|
||||
"""
|
||||
访问指定URL并返回截图
|
||||
|
||||
:param url: 目标URL
|
||||
:param target: 渲染目标,如 ".box"、"#main" 等CSS选择器
|
||||
:param timeout: 页面加载超时时间,单位秒
|
||||
:param params: URL键值对参数
|
||||
:param other_function: 其他自定义操作函数,接受page参数
|
||||
:return: 截图的字节数据
|
||||
|
||||
"""
|
||||
instance = await cls.get_browser_instance()
|
||||
logger.debug(
|
||||
f"Using WebRendererInstance {id(instance)} to render {url} targeting {target}"
|
||||
)
|
||||
return await instance.render(
|
||||
url, target, params=params, other_function=other_function, timeout=timeout
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def render_persistent_page(
|
||||
cls,
|
||||
page_id: str,
|
||||
url: str,
|
||||
target: str,
|
||||
params: dict = {},
|
||||
other_function: PageFunction | None = None,
|
||||
timeout: int = 30,
|
||||
) -> bytes:
|
||||
"""
|
||||
使用长期挂载的页面访问指定URL并返回截图
|
||||
|
||||
:param page_id: 页面唯一标识符
|
||||
:param url: 目标URL
|
||||
:param target: 渲染目标,如 ".box"、"#main" 等CSS选择器
|
||||
:param timeout: 页面加载超时时间,单位秒
|
||||
:param params: URL键值对参数
|
||||
:param other_function: 其他自定义操作函数,接受page参数
|
||||
:return: 截图的字节数据
|
||||
|
||||
"""
|
||||
logger.debug(
|
||||
f"Requesting persistent render for page_id {page_id} at {url} targeting {target} with timeout {timeout}"
|
||||
)
|
||||
instance = await cls.get_browser_instance()
|
||||
if page_id not in cls.page_pool:
|
||||
context = await cls.get_browser_context()
|
||||
page = await context.new_page()
|
||||
cls.page_pool[page_id] = page
|
||||
logger.debug(
|
||||
f"Created new persistent page for page_id {page_id} using WebRendererInstance {id(instance)}"
|
||||
)
|
||||
page = cls.page_pool[page_id]
|
||||
return await instance.render_with_page(
|
||||
page,
|
||||
url,
|
||||
target,
|
||||
params=params,
|
||||
other_function=other_function,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def render_file(
|
||||
cls,
|
||||
file_path: str,
|
||||
target: str,
|
||||
params: dict = {},
|
||||
other_function: PageFunction | None = None,
|
||||
timeout: int = 30,
|
||||
) -> bytes:
|
||||
"""
|
||||
访问指定本地文件URL并返回截图
|
||||
|
||||
:param file_path: 目标文件路径
|
||||
:param target: 渲染目标,如 ".box"、"#main" 等CSS选择器
|
||||
:param timeout: 页面加载超时时间,单位秒
|
||||
:param params: URL键值对参数
|
||||
:param other_function: 其他自定义操作函数,接受page参数
|
||||
:return: 截图的字节数据
|
||||
|
||||
"""
|
||||
instance = await cls.get_browser_instance()
|
||||
logger.debug(
|
||||
f"Using WebRendererInstance {id(instance)} to render file {file_path} targeting {target}"
|
||||
)
|
||||
return await instance.render_file(
|
||||
file_path,
|
||||
target,
|
||||
params=params,
|
||||
other_function=other_function,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def close_persistent_page(cls, page_id: str) -> None:
|
||||
"""
|
||||
关闭并移除长期挂载的页面
|
||||
|
||||
:param page_id: 页面唯一标识符
|
||||
"""
|
||||
if page_id in cls.page_pool:
|
||||
page = cls.page_pool[page_id]
|
||||
await page.close()
|
||||
del cls.page_pool[page_id]
|
||||
logger.debug(f"Closed and removed persistent page for page_id {page_id}")
|
||||
|
||||
|
||||
class WebRendererInstance:
|
||||
def __init__(self):
|
||||
self._playwright: Playwright | None = None
|
||||
self._browser: Browser | None = None
|
||||
self.lock = asyncio.Lock()
|
||||
|
||||
@property
|
||||
def playwright(self) -> Playwright:
|
||||
assert self._playwright is not None
|
||||
return self._playwright
|
||||
|
||||
@property
|
||||
def browser(self) -> Browser:
|
||||
assert self._browser is not None
|
||||
return self._browser
|
||||
|
||||
async def init(self):
|
||||
self._playwright = await async_playwright().start()
|
||||
self._browser = await self.playwright.chromium.launch(headless=True)
|
||||
|
||||
@classmethod
|
||||
async def create(cls) -> "WebRendererInstance":
|
||||
instance = cls()
|
||||
await instance.init()
|
||||
return instance
|
||||
|
||||
async def render(
|
||||
self,
|
||||
url: str,
|
||||
target: str,
|
||||
index: int = 0,
|
||||
params: dict = {},
|
||||
other_function: PageFunction | None = None,
|
||||
timeout: int = 30,
|
||||
) -> bytes:
|
||||
"""
|
||||
访问指定URL并返回截图
|
||||
|
||||
:param url: 目标URL
|
||||
:param target: 渲染目标,如 ".box"、"#main" 等CSS选择器
|
||||
:param timeout: 页面加载超时时间,单位秒
|
||||
:param index: 如果目标是一个列表,指定要截图的元素索引
|
||||
:param params: URL键值对参数
|
||||
:param other_function: 其他自定义操作函数,接受page参数
|
||||
:return: 截图的字节数据
|
||||
|
||||
"""
|
||||
async with self.lock:
|
||||
context = await self.browser.new_context()
|
||||
page = await context.new_page()
|
||||
screenshot = await self.inner_render(
|
||||
page, url, target, index, params, other_function, timeout
|
||||
)
|
||||
await page.close()
|
||||
await context.close()
|
||||
return screenshot
|
||||
|
||||
async def render_with_page(
|
||||
self,
|
||||
page: Page,
|
||||
url: str,
|
||||
target: str,
|
||||
index: int = 0,
|
||||
params: dict = {},
|
||||
other_function: PageFunction | None = None,
|
||||
timeout: int = 30,
|
||||
) -> bytes:
|
||||
async with self.lock:
|
||||
screenshot = await self.inner_render(
|
||||
page, url, target, index, params, other_function, timeout
|
||||
)
|
||||
return screenshot
|
||||
|
||||
async def render_file(
|
||||
self,
|
||||
file_path: str,
|
||||
target: str,
|
||||
index: int = 0,
|
||||
params: dict = {},
|
||||
other_function: PageFunction | None = None,
|
||||
timeout: int = 30,
|
||||
) -> bytes:
|
||||
file_path = "file:///" + str(file_path).replace("\\", "/")
|
||||
return await self.render(
|
||||
file_path, target, index, params, other_function, timeout
|
||||
)
|
||||
|
||||
async def inner_render(
|
||||
self,
|
||||
page: Page,
|
||||
url: str,
|
||||
target: str,
|
||||
index: int = 0,
|
||||
params: dict = {},
|
||||
other_function: PageFunction | None = None,
|
||||
timeout: int = 30,
|
||||
) -> bytes:
|
||||
logger.debug(f"Navigating to {url} with timeout {timeout}")
|
||||
url_with_params = url + (
|
||||
"?" + "&".join(f"{k}={v}" for k, v in params.items()) if params else ""
|
||||
)
|
||||
await page.goto(url_with_params, timeout=timeout * 1000, wait_until="load")
|
||||
logger.debug("Page loaded successfully")
|
||||
# 等待目标元素出现
|
||||
await page.wait_for_selector(target, timeout=timeout * 1000)
|
||||
logger.debug(f"Target element '{target}' found, taking screenshot")
|
||||
if other_function:
|
||||
await other_function(page)
|
||||
elements = await page.query_selector_all(target)
|
||||
if not elements:
|
||||
logger.warning(f"Target element '{target}' not found on the page.")
|
||||
elements = await page.query_selector_all('body')
|
||||
if index >= len(elements):
|
||||
logger.warning(f"Index {index} out of range for elements matching '{target}'")
|
||||
index = 0
|
||||
element = elements[index]
|
||||
screenshot = await element.screenshot()
|
||||
logger.debug("Screenshot taken successfully")
|
||||
return screenshot
|
||||
|
||||
async def close(self):
|
||||
await self.browser.close()
|
||||
await self.playwright.stop()
|
||||
66
konabot/common/web_render/host_images.py
Normal file
66
konabot/common/web_render/host_images.py
Normal file
@ -0,0 +1,66 @@
|
||||
import asyncio
|
||||
import tempfile
|
||||
from contextlib import asynccontextmanager
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
|
||||
from fastapi import HTTPException
|
||||
from fastapi.responses import FileResponse
|
||||
import nanoid
|
||||
import nonebot
|
||||
|
||||
from nonebot.drivers.fastapi import Driver as FastAPIDriver
|
||||
|
||||
from .config import web_render_config
|
||||
|
||||
app = cast(FastAPIDriver, nonebot.get_driver()).asgi
|
||||
|
||||
hosted_tempdirs: dict[str, Path] = {}
|
||||
hosted_tempdirs_lock = asyncio.Lock()
|
||||
|
||||
|
||||
@dataclass
|
||||
class TempDir:
|
||||
path: Path
|
||||
url_base: str
|
||||
|
||||
def url_of(self, file: Path):
|
||||
assert file.is_relative_to(self.path)
|
||||
relative_path = file.relative_to(self.path)
|
||||
url_path_segment = str(relative_path).replace("\\", "/")
|
||||
return f"{self.url_base}/{url_path_segment}"
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def host_tempdir():
|
||||
with tempfile.TemporaryDirectory() as tempdir:
|
||||
fp = Path(tempdir)
|
||||
nid = nanoid.generate(size=10)
|
||||
async with hosted_tempdirs_lock:
|
||||
hosted_tempdirs[nid] = fp
|
||||
yield TempDir(
|
||||
path=fp,
|
||||
url_base=f"{web_render_config.get_instance_baseurl()}/tempdir/{nid}",
|
||||
)
|
||||
async with hosted_tempdirs_lock:
|
||||
del hosted_tempdirs[nid]
|
||||
|
||||
|
||||
@app.get("/tempdir/{nid}/{file_path:path}")
|
||||
async def _(nid: str, file_path: str):
|
||||
async with hosted_tempdirs_lock:
|
||||
base_path = hosted_tempdirs.get(nid)
|
||||
if base_path is None:
|
||||
raise HTTPException(404)
|
||||
full_path = base_path / file_path
|
||||
try:
|
||||
if not full_path.resolve().is_relative_to(base_path.resolve()):
|
||||
raise HTTPException(status_code=403, detail="Access denied.")
|
||||
except Exception:
|
||||
raise HTTPException(status_code=403, detail="Access denied.")
|
||||
if not full_path.is_file():
|
||||
raise HTTPException(status_code=404, detail="File not found.")
|
||||
|
||||
return FileResponse(full_path.resolve())
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
from io import BytesIO
|
||||
from typing import Iterable, cast
|
||||
|
||||
from loguru import logger
|
||||
from nonebot import on_message
|
||||
from nonebot_plugin_alconna import (
|
||||
Alconna,
|
||||
@ -14,8 +15,12 @@ from nonebot_plugin_alconna import (
|
||||
UniMsg,
|
||||
on_alconna,
|
||||
)
|
||||
from playwright.async_api import ConsoleMessage, Page
|
||||
|
||||
from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message
|
||||
from konabot.common.web_render import konaweb
|
||||
from konabot.common.web_render.core import WebRenderer
|
||||
from konabot.common.web_render.host_images import host_tempdir
|
||||
from konabot.plugins.memepack.drawing.display import (
|
||||
draw_cao_display,
|
||||
draw_snaur_display,
|
||||
@ -24,10 +29,12 @@ from konabot.plugins.memepack.drawing.display import (
|
||||
from konabot.plugins.memepack.drawing.saying import (
|
||||
draw_cute_ten,
|
||||
draw_geimao,
|
||||
draw_kiosay,
|
||||
draw_mnk,
|
||||
draw_pt,
|
||||
draw_suan,
|
||||
)
|
||||
from konabot.plugins.memepack.drawing.watermark import draw_doubao_watermark
|
||||
|
||||
from nonebot.adapters import Bot, Event
|
||||
|
||||
@ -275,3 +282,77 @@ async def _(msg: UniMsg, evt: Event, bot: Bot):
|
||||
.export()
|
||||
)
|
||||
|
||||
|
||||
kiosay = on_alconna(
|
||||
Alconna(
|
||||
"西多说",
|
||||
Args[
|
||||
"saying",
|
||||
MultiVar(str, "+"),
|
||||
Field(missing_tips=lambda: "你没有写西多说了什么"),
|
||||
],
|
||||
),
|
||||
use_cmd_start=True,
|
||||
use_cmd_sep=False,
|
||||
skip_for_unmatch=False,
|
||||
aliases=set(),
|
||||
)
|
||||
|
||||
|
||||
@kiosay.handle()
|
||||
async def _(saying: list[str]):
|
||||
img = await draw_kiosay("\n".join(saying))
|
||||
img_bytes = BytesIO()
|
||||
img.save(img_bytes, format="PNG")
|
||||
|
||||
await kiosay.send(await UniMessage().image(raw=img_bytes).export())
|
||||
|
||||
|
||||
quote_cmd = on_alconna(Alconna(
|
||||
"名人名言",
|
||||
Args["quote", str],
|
||||
Args["author", str],
|
||||
Args["image?", Image | None],
|
||||
), aliases={"quote"})
|
||||
|
||||
@quote_cmd.handle()
|
||||
async def _(quote: str, author: str, img: PIL_Image):
|
||||
async with host_tempdir() as tempdir:
|
||||
img_path = tempdir.path / "image.png"
|
||||
img_url = tempdir.url_of(img_path)
|
||||
img.save(img_path)
|
||||
|
||||
async def page_function(page: Page):
|
||||
async def on_console(msg: ConsoleMessage):
|
||||
logger.debug(f"WEB CONSOLE {msg.text}")
|
||||
|
||||
page.on('console', on_console)
|
||||
|
||||
await page.locator('input[name=image]').fill(img_url)
|
||||
await page.locator('input[name=quote]').fill(quote)
|
||||
await page.locator('input[name=author]').fill(author)
|
||||
|
||||
await page.wait_for_timeout(500)
|
||||
await page.wait_for_load_state('networkidle')
|
||||
await page.wait_for_timeout(500)
|
||||
|
||||
out = await WebRenderer.render(
|
||||
konaweb('makequote'),
|
||||
target='#main',
|
||||
other_function=page_function,
|
||||
)
|
||||
await quote_cmd.send(await UniMessage().image(raw=out).export())
|
||||
|
||||
|
||||
doubao_cmd = on_alconna(Alconna(
|
||||
"豆包水印",
|
||||
Args["image?", Image | None],
|
||||
))
|
||||
|
||||
|
||||
@doubao_cmd.handle()
|
||||
async def _(img: PIL_Image):
|
||||
result = await draw_doubao_watermark(img)
|
||||
result_bytes = BytesIO()
|
||||
result.save(result_bytes, format="PNG")
|
||||
await doubao_cmd.send(await UniMessage().image(raw=result_bytes).export())
|
||||
|
||||
@ -5,6 +5,7 @@ import imagetext_py
|
||||
import PIL.Image
|
||||
|
||||
from konabot.common.path import ASSETS_PATH
|
||||
from konabot.common.utils.to_async import make_async
|
||||
|
||||
from .base.fonts import HARMONYOS_SANS_SC_BLACK, HARMONYOS_SANS_SC_REGULAR, LXGWWENKAI_REGULAR
|
||||
|
||||
@ -14,6 +15,7 @@ mnk_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "mnksay.jpg").convert(
|
||||
dasuan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "dss.png").convert("RGBA")
|
||||
suan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "suanleba.png").convert("RGBA")
|
||||
cute_ten_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "tententen.png").convert("RGBA")
|
||||
kio_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "kiosay.jpg").convert("RGBA")
|
||||
|
||||
|
||||
def _draw_geimao(saying: str):
|
||||
@ -29,7 +31,7 @@ def _draw_geimao(saying: str):
|
||||
draw_emojis=True,
|
||||
)
|
||||
return img
|
||||
|
||||
|
||||
|
||||
async def draw_geimao(saying: str):
|
||||
return await asyncio.to_thread(_draw_geimao, saying)
|
||||
@ -106,3 +108,18 @@ def _draw_cute_ten(saying: str):
|
||||
|
||||
async def draw_cute_ten(saying: str):
|
||||
return await asyncio.to_thread(_draw_cute_ten, saying)
|
||||
|
||||
|
||||
@make_async
|
||||
def draw_kiosay(saying: str):
|
||||
img = kio_image.copy()
|
||||
with imagetext_py.Writer(img) as iw:
|
||||
iw.draw_text_wrapped(
|
||||
saying, 450, 540, 0.5, 0.5, 900, 96, LXGWWENKAI_REGULAR,
|
||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||
1.0,
|
||||
imagetext_py.TextAlign.Center,
|
||||
draw_emojis=True,
|
||||
)
|
||||
return img
|
||||
|
||||
|
||||
15
konabot/plugins/memepack/drawing/watermark.py
Normal file
15
konabot/plugins/memepack/drawing/watermark.py
Normal file
@ -0,0 +1,15 @@
|
||||
import PIL
|
||||
import PIL.Image
|
||||
|
||||
from konabot.common.path import ASSETS_PATH
|
||||
from konabot.common.utils.to_async import make_async
|
||||
|
||||
doubao_watermark = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "doubao.png").convert("RGBA").resize((140, 40))
|
||||
|
||||
|
||||
@make_async
|
||||
def draw_doubao_watermark(base: PIL.Image.Image) -> PIL.Image.Image:
|
||||
base = base.copy().convert("RGBA")
|
||||
base.alpha_composite(doubao_watermark, (base.size[0] - 160, base.size[1] - 60))
|
||||
return base
|
||||
|
||||
Reference in New Issue
Block a user