87 lines
3.8 KiB
Python
87 lines
3.8 KiB
Python
import asyncio
|
||
import queue
|
||
from loguru import logger
|
||
from playwright.async_api import async_playwright, Browser
|
||
|
||
class WebRenderer:
    """Class-level entry point that renders pages through pooled browsers.

    Browser instances are created lazily on first use. An instance taken from
    ``browser_pool`` is put straight back, so the queue acts as a round-robin
    rotation rather than an exclusive checkout; exclusive use of a browser is
    enforced by the lock inside each ``WebRendererInstance``.
    """

    browser_pool: queue.Queue["WebRendererInstance"] = queue.Queue()
    # Created lazily from inside a coroutine so it belongs to the running
    # event loop; it serializes the very first browser launch.
    _create_lock = None

    @classmethod
    async def render(
        cls,
        url: str,
        target: str,
        params: dict = None,
        other_function: callable = None,
        timeout: int = 30,
        index: int = 0,
    ) -> bytes:
        """
        Visit the given URL and return a screenshot of the target element.

        :param url: Target URL.
        :param target: CSS selector of the element to capture, e.g. ".box"
            or "#main".
        :param params: Key/value pairs appended to the URL as query
            parameters. ``None`` (the default) means no parameters.
        :param other_function: Optional custom async callback that receives
            the page before the screenshot is taken.
        :param timeout: Page load timeout, in seconds.
        :param index: When the selector matches several elements, the index
            of the element to capture.
        :return: The screenshot as bytes.
        """
        logger.debug(f"Requesting render for {url} targeting {target} with timeout {timeout}")
        if cls.browser_pool.empty():
            if cls._create_lock is None:
                cls._create_lock = asyncio.Lock()
            async with cls._create_lock:
                # Re-check under the lock: another coroutine may have finished
                # launching a browser while this one was waiting, and launching
                # a second Chromium would only waste resources.
                if cls.browser_pool.empty():
                    cls.browser_pool.put_nowait(await WebRendererInstance.create())
        # The pool is guaranteed non-empty here and there is no await between
        # the two calls, so the non-blocking variants can never fail or stall
        # the event loop.
        instance = cls.browser_pool.get_nowait()
        cls.browser_pool.put_nowait(instance)
        logger.debug(f"Using WebRendererInstance {id(instance)} to render {url} targeting {target}")
        return await instance.render(
            url,
            target,
            index=index,
            params=params,
            other_function=other_function,
            timeout=timeout,
        )
|
||
|
||
class WebRendererInstance:
    """A single headless Chromium browser plus a lock serializing its renders.

    Use ``await WebRendererInstance.create()`` to obtain a usable instance;
    the plain constructor only initializes the attributes to ``None``.
    """

    def __init__(self):
        # All three are filled in by create().
        self.playwright = None
        self.browser: Browser = None
        self.lock: asyncio.Lock = None

    @classmethod
    async def create(cls) -> "WebRendererInstance":
        """Start Playwright, launch headless Chromium and return the instance."""
        instance = cls()
        instance.playwright = await async_playwright().start()
        try:
            instance.browser = await instance.playwright.chromium.launch(headless=True)
        except BaseException:
            # Do not leave the Playwright driver running when the browser
            # cannot be launched (or the launch is cancelled).
            await instance.playwright.stop()
            raise
        instance.lock = asyncio.Lock()
        return instance

    @staticmethod
    def _build_url(url: str, params: dict = None) -> str:
        """Return ``url`` with ``params`` merged into its query string.

        Keys and values are percent-encoded, an existing query string is
        extended rather than given a second ``?``, and a fragment stays at
        the end. With no params the URL is returned unchanged.
        """
        if not params:
            return url
        from urllib.parse import urlencode, urlsplit, urlunsplit

        parts = urlsplit(url)
        extra = urlencode(params)
        query = f"{parts.query}&{extra}" if parts.query else extra
        return urlunsplit(parts._replace(query=query))

    async def render(
        self,
        url: str,
        target: str,
        index: int = 0,
        params: dict = None,
        other_function: callable = None,
        timeout: int = 30,
    ) -> bytes:
        """
        Visit the given URL and return a screenshot of the target element.

        :param url: Target URL.
        :param target: CSS selector of the element to capture, e.g. ".box"
            or "#main".
        :param index: When the selector matches several elements, the index
            of the element to capture.
        :param params: Key/value pairs appended to the URL as query
            parameters. ``None`` (the default) means no parameters.
        :param other_function: Optional custom async callback that receives
            the page; it runs after the target appears and before the
            screenshot is taken.
        :param timeout: Timeout in seconds, applied to the navigation and to
            waiting for the target element.
        :return: The screenshot as bytes.
        :raises Exception: If no element matches ``target`` or ``index`` is
            past the last matching element.
        """
        timeout_ms = timeout * 1000  # Playwright expects milliseconds.
        async with self.lock:
            context = await self.browser.new_context()
            try:
                page = await context.new_page()
                try:
                    logger.debug(f"Navigating to {url} with timeout {timeout}")
                    await page.goto(self._build_url(url, params), timeout=timeout_ms, wait_until="load")
                    logger.debug("Page loaded successfully")
                    # Wait for the target element to appear.
                    await page.wait_for_selector(target, timeout=timeout_ms)
                    logger.debug(f"Target element '{target}' found, taking screenshot")
                    if other_function:
                        await other_function(page)
                    # Query again: the callback may have changed the DOM.
                    elements = await page.query_selector_all(target)
                    if not elements:
                        raise Exception(f"Target element '{target}' not found on the page.")
                    if index >= len(elements):
                        raise Exception(f"Index {index} out of range for elements matching '{target}'.")
                    screenshot = await elements[index].screenshot()
                    logger.debug("Screenshot taken successfully")
                    return screenshot
                finally:
                    await page.close()
            finally:
                # Runs even when new_page() or page.close() fails, so the
                # context is never leaked.
                await context.close()

    async def close(self):
        """Close the browser and stop Playwright, skipping parts never started."""
        try:
            if self.browser is not None:
                await self.browser.close()
        finally:
            # Stop the driver even if closing the browser raised.
            if self.playwright is not None:
                await self.playwright.stop()
|