更稳定的 MarkDown 和 LaTeX 生成!
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2025-11-10 21:59:45 +08:00
parent e2f99af73b
commit 500053e630
3 changed files with 116 additions and 21 deletions

View File

@ -13,7 +13,7 @@ from playwright.async_api import (
)
from .config import web_render_config
from playwright.async_api import ConsoleMessage, Page
T = TypeVar("T")
TFunction = Callable[[T], Coroutine[Any, Any, Any]]
@ -99,6 +99,75 @@ class WebRenderer:
timeout=timeout,
)
@classmethod
async def render_with_persistent_page(
cls,
page_id: str,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
"""
使用长期挂载的页面进行渲染
:param page_id: 页面唯一标识符
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
"""
instance = await cls.get_browser_instance()
logger.debug(
f"Using WebRendererInstance {id(instance)} to render with persistent page {page_id} targeting {target}"
)
return await instance.render_with_persistent_page(
page_id,
url,
target,
params=params,
other_function=other_function,
timeout=timeout,
)
@classmethod
async def get_persistent_page(cls, page_id: str, url: str) -> Page:
"""
获取长期挂载的页面,如果不存在则创建一个新的页面并存储
"""
if page_id in cls.page_pool:
return cls.page_pool[page_id]
async def on_console(msg: ConsoleMessage):
logger.debug(f"WEB CONSOLE {msg.text}")
instance = await cls.get_browser_instance()
if isinstance(instance, RemotePlaywrightInstance):
context = await instance.browser.new_context()
page = await context.new_page()
await page.goto(url)
cls.page_pool[page_id] = page
logger.debug(f"Created new persistent page for page_id {page_id}, navigated to {url}")
page.on('console', on_console)
return page
elif isinstance(instance, LocalPlaywrightInstance):
context = await instance.browser.new_context()
page = await context.new_page()
await page.goto(url)
cls.page_pool[page_id] = page
logger.debug(f"Created new persistent page for page_id {page_id}, navigated to {url}")
page.on('console', on_console)
return page
else:
raise NotImplementedError("Unsupported WebRendererInstance type")
@classmethod
async def close_persistent_page(cls, page_id: str) -> None:
"""
@ -136,6 +205,17 @@ class WebRendererInstance(ABC, Generic[T]):
timeout: int = 30,
) -> bytes: ...
@abstractmethod
async def render_with_persistent_page(
self,
page_id: str,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes: ...
class PlaywrightInstance(WebRendererInstance[Page]):
def __init__(self) -> None:
@ -191,6 +271,21 @@ class PlaywrightInstance(WebRendererInstance[Page]):
file_path, target, index, params or {}, other_function, timeout
)
async def render_with_persistent_page(
self,
page_id: str,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
page = await WebRenderer.get_persistent_page(page_id, url)
screenshot = await self.inner_render(
page, url, target, 0, params, other_function, timeout
)
return screenshot
async def inner_render(
self,
page: Page,

View File

@ -11,7 +11,7 @@ from konabot.plugins.markdown.core import MarkDownCore
def is_markdown_mentioned(msg: UniMsg) -> bool:
txt = msg.extract_plain_text()
if "markdown" not in txt[:10] and "md" not in txt[:3]:
if "markdown" not in txt[:8] and "md" not in txt[:2]:
return False
return True
@ -25,7 +25,7 @@ async def _(msg: UniMsg, event: BaseEvent):
content = msg.extract_plain_text()
logger.debug(f"Received markdown command with content: {content}")
if "md" in content[:3]:
if "md" in content[:2]:
message = content.replace("md", "", 1).strip()
else:
message = content.replace("markdown", "", 1).strip()
@ -44,7 +44,7 @@ async def _(msg: UniMsg, event: BaseEvent):
def is_latex_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
txt = msg.extract_plain_text()
if "latex" not in txt[:8]:
if "latex" not in txt[:5]:
return False
return True

View File

@ -8,19 +8,18 @@ class MarkDownCore:
@staticmethod
async def render_markdown(markdown_text: str, theme: str = "dark", params: dict = {}) -> bytes:
async def page_function(page: Page):
async def on_console(msg: ConsoleMessage):
logger.debug(f"WEB CONSOLE {msg.text}")
await page.emulate_media(color_scheme=theme)
page.on('console', on_console)
await page.locator('textarea[name=content]').fill(markdown_text)
await page.wait_for_timeout(200)
await page.locator('#button').click()
await page.wait_for_timeout(200)
# 等待 checkState 函数加载完成
await page.wait_for_function("typeof checkState === 'function'", timeout=1000)
# 访问 checkState 函数,确保渲染完成
await page.wait_for_function("checkState() === true", timeout=1000)
out = await WebRenderer.render(
out = await WebRenderer.render_with_persistent_page(
"markdown_renderer",
konaweb('markdown'),
target='#main',
other_function=page_function,
@ -32,22 +31,23 @@ class MarkDownCore:
@staticmethod
async def render_latex(text: str, theme: str = "dark") -> bytes:
params = {
"size": "2em",
"size": "2.5em",
}
async def page_function(page: Page):
async def on_console(msg: ConsoleMessage):
logger.debug(f"WEB CONSOLE {msg.text}")
await page.emulate_media(color_scheme=theme)
page.on('console', on_console)
page.wait_for_selector('textarea[name=content]')
await page.locator('textarea[name=content]').fill(f"$$ {text} $$")
await page.wait_for_timeout(200)
page.wait_for_selector('#button')
await page.locator('#button').click()
await page.wait_for_timeout(200)
# 等待 checkState 函数加载完成
await page.wait_for_function("typeof checkState === 'function'", timeout=2000)
# 访问 checkState 函数,确保渲染完成
await page.wait_for_function("checkState() === true", timeout=10000)
out = await WebRenderer.render(
out = await WebRenderer.render_with_persistent_page(
"latex_renderer",
konaweb('latex'),
target='#main',
other_function=page_function,