diff --git a/konabot/common/web_render/__init__.py b/konabot/common/web_render/__init__.py index 77f000e..191f178 100644 --- a/konabot/common/web_render/__init__.py +++ b/konabot/common/web_render/__init__.py @@ -1,13 +1,25 @@ import asyncio import queue +from typing import Any, Callable, Coroutine from loguru import logger -from playwright.async_api import async_playwright, Browser +from playwright.async_api import Page, Playwright, async_playwright, Browser + + +PageFunction = Callable[[Page], Coroutine[Any, Any, Any]] + class WebRenderer: browser_pool: queue.Queue["WebRendererInstance"] = queue.Queue() @classmethod - async def render(cls, url: str, target: str, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes: + async def render( + cls, + url: str, + target: str, + params: dict = {}, + other_function: PageFunction | None = None, + timeout: int = 30, + ) -> bytes: ''' 访问指定URL并返回截图 @@ -28,21 +40,42 @@ class WebRenderer: logger.debug(f"Using WebRendererInstance {id(instance)} to render {url} targeting {target}") return await instance.render(url, target, params=params, other_function=other_function, timeout=timeout) + class WebRendererInstance: def __init__(self): - self.playwright = None - self.browser: Browser = None - self.lock: asyncio.Lock = None + self._playwright: Playwright | None = None + self._browser: Browser | None = None + self.lock = asyncio.Lock() + + @property + def playwright(self) -> Playwright: + assert self._playwright is not None + return self._playwright + + @property + def browser(self) -> Browser: + assert self._browser is not None + return self._browser + + async def init(self): + self._playwright = await async_playwright().start() + self._browser = await self.playwright.chromium.launch(headless=True) @classmethod async def create(cls) -> "WebRendererInstance": instance = cls() - instance.playwright = await async_playwright().start() - instance.browser = await instance.playwright.chromium.launch(headless=True) - instance.lock = asyncio.Lock() + await instance.init() return instance - async def render(self, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes: + async def render( + self, + url: str, + target: str, + index: int = 0, + params: dict = {}, + other_function: PageFunction | None = None, + timeout: int = 30 + ) -> bytes: ''' 访问指定URL并返回截图 @@ -62,7 +95,7 @@ class WebRendererInstance: try: url_with_params = url + ("?" + "&".join(f"{k}={v}" for k, v in params.items()) if params else "") await page.goto(url_with_params, timeout=timeout * 1000, wait_until="load") - logger.debug(f"Page loaded successfully") + logger.debug("Page loaded successfully") # 等待目标元素出现 await page.wait_for_selector(target, timeout=timeout * 1000) logger.debug(f"Target element '{target}' found, taking screenshot") @@ -75,7 +108,7 @@ class WebRendererInstance: raise Exception(f"Index {index} out of range for elements matching '{target}'.") element = elements[index] screenshot = await element.screenshot() - logger.debug(f"Screenshot taken successfully") + logger.debug("Screenshot taken successfully") return screenshot finally: await page.close() @@ -84,3 +117,4 @@ class WebRendererInstance: async def close(self): await self.browser.close() await self.playwright.stop() +