更新 web_render 模块并支持前端渲染

This commit is contained in:
2025-11-07 02:30:46 +08:00
parent 37ca4bf11f
commit c911410276
8 changed files with 445 additions and 208 deletions

View File

@ -1,211 +1,9 @@
import asyncio
import queue
from typing import Any, Callable, Coroutine
from loguru import logger
from playwright.async_api import Page, Playwright, async_playwright, Browser, Page, BrowserContext
from .config import web_render_config
from .core import WebRenderer as WebRenderer
from .core import WebRendererInstance as WebRendererInstance
PageFunction = Callable[[Page], Coroutine[Any, Any, Any]]
class WebRenderer:
browser_pool: queue.Queue["WebRendererInstance"] = queue.Queue()
context_pool: dict[int, BrowserContext] = {} # 长期挂载的浏览器上下文池
page_pool: dict[str, Page] = {} # 长期挂载的页面池
@classmethod
async def get_browser_instance(cls) -> "WebRendererInstance":
if cls.browser_pool.empty():
instance = await WebRendererInstance.create()
cls.browser_pool.put(instance)
instance = cls.browser_pool.get()
cls.browser_pool.put(instance)
return instance
@classmethod
async def get_browser_context(cls) -> BrowserContext:
instance = await cls.get_browser_instance()
if id(instance) not in cls.context_pool:
context = await instance.browser.new_context()
cls.context_pool[id(instance)] = context
logger.debug(f"Created new persistent browser context for WebRendererInstance {id(instance)}")
return cls.context_pool[id(instance)]
@classmethod
async def render(
cls,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
'''
访问指定URL并返回截图
:param url: 目标URL
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
'''
instance = await cls.get_browser_instance()
logger.debug(f"Using WebRendererInstance {id(instance)} to render {url} targeting {target}")
return await instance.render(url, target, params=params, other_function=other_function, timeout=timeout)
@classmethod
async def render_persistent_page(cls, page_id: str, url: str, target: str, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
'''
使用长期挂载的页面访问指定URL并返回截图
:param page_id: 页面唯一标识符
:param url: 目标URL
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
'''
logger.debug(f"Requesting persistent render for page_id {page_id} at {url} targeting {target} with timeout {timeout}")
instance = await cls.get_browser_instance()
if page_id not in cls.page_pool:
context = await cls.get_browser_context()
page = await context.new_page()
cls.page_pool[page_id] = page
logger.debug(f"Created new persistent page for page_id {page_id} using WebRendererInstance {id(instance)}")
page = cls.page_pool[page_id]
return await instance.render_with_page(page, url, target, params=params, other_function=other_function, timeout=timeout)
@classmethod
async def render_file(
cls,
file_path: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
'''
访问指定本地文件URL并返回截图
:param file_path: 目标文件路径
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
'''
instance = await cls.get_browser_instance()
logger.debug(f"Using WebRendererInstance {id(instance)} to render file {file_path} targeting {target}")
return await instance.render_file(file_path, target, params=params, other_function=other_function, timeout=timeout)
@classmethod
async def close_persistent_page(cls, page_id: str) -> None:
'''
关闭并移除长期挂载的页面
:param page_id: 页面唯一标识符
'''
if page_id in cls.page_pool:
page = cls.page_pool[page_id]
await page.close()
del cls.page_pool[page_id]
logger.debug(f"Closed and removed persistent page for page_id {page_id}")
class WebRendererInstance:
def __init__(self):
self._playwright: Playwright | None = None
self._browser: Browser | None = None
self.lock = asyncio.Lock()
@property
def playwright(self) -> Playwright:
assert self._playwright is not None
return self._playwright
@property
def browser(self) -> Browser:
assert self._browser is not None
return self._browser
async def init(self):
self._playwright = await async_playwright().start()
self._browser = await self.playwright.chromium.launch(headless=True)
@classmethod
async def create(cls) -> "WebRendererInstance":
instance = cls()
await instance.init()
return instance
async def render(
self,
url: str,
target: str,
index: int = 0,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30
) -> bytes:
'''
访问指定URL并返回截图
:param url: 目标URL
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param index: 如果目标是一个列表,指定要截图的元素索引
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
'''
async with self.lock:
context = await self.browser.new_context()
page = await context.new_page()
screenshot = await self.inner_render(page, url, target, index, params, other_function, timeout)
await page.close()
await context.close()
return screenshot
async def render_with_page(self, page: Page, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
async with self.lock:
screenshot = await self.inner_render(page, url, target, index, params, other_function, timeout)
return screenshot
async def render_file(self, file_path: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
file_path = "file:///" + str(file_path).replace("\\", "/")
return await self.render(file_path, target, index, params, other_function, timeout)
async def inner_render(self, page: Page, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
logger.debug(f"Navigating to {url} with timeout {timeout}")
url_with_params = url + ("?" + "&".join(f"{k}={v}" for k, v in params.items()) if params else "")
await page.goto(url_with_params, timeout=timeout * 1000, wait_until="load")
logger.debug("Page loaded successfully")
# 等待目标元素出现
await page.wait_for_selector(target, timeout=timeout * 1000)
logger.debug(f"Target element '{target}' found, taking screenshot")
if other_function:
await other_function(page)
elements = await page.query_selector_all(target)
if not elements:
logger.error(f"Target element '{target}' not found on the page.")
return None
if index >= len(elements):
logger.error(f"Index {index} out of range for elements matching '{target}'")
return None
element = elements[index]
screenshot = await element.screenshot()
logger.debug(f"Screenshot taken successfully")
return screenshot
async def close(self):
await self.browser.close()
await self.playwright.stop()
def konaweb(sub_url: str):
sub_url = '/' + sub_url.removeprefix('/')
return web_render_config.module_web_render_weburl.removesuffix('/') + sub_url