import asyncio import io import struct from loguru import logger import numpy as np import skia from PIL import Image def _pack_uniforms(uniforms_dict, width, height, time_val): """ 根据常见的教学用 uniform 布局打包字节数据 假设 SkSL 中 uniform 顺序为: float u_time; float2 u_resolution; 内存布局: [u_time(4B)][u_res_x(4B)][u_res_y(4B)] (总共 12 字节,无填充) 注意:为匹配 skia.RuntimeEffect 的紧凑布局,已移除 float 和 float2 之间的 4 字节填充。 """ # u_time (float) - 4 bytes time_bytes = struct.pack('f', time_val) # u_resolution (vec2/float2) - 8 bytes res_bytes = struct.pack('ff', float(width), float(height)) # 移除填充字节,使用紧凑布局 return time_bytes + res_bytes def _render_sksl_shader_to_gif( sksl_code: str, width: int = 256, height: int = 256, duration: float = 2.0, fps: float = 15, ) -> io.BytesIO: """ 渲染 SkSL 着色器动画为 GIF(适配 skia-python >= 138) """ logger.info(f"开始编译\n{sksl_code}") runtime_effect = skia.RuntimeEffect.MakeForShader(sksl_code) if runtime_effect is None: # SkSL 编译失败时,尝试获取错误信息(如果 skia 版本支持) error_message = "" # 注意:skia-python 的错误信息捕获可能因版本而异 # 尝试检查编译错误是否在日志中,但最直接的是抛出已知错误 raise ValueError("SkSL 编译失败,请检查语法") # --- 修复: 移除对不存在的 uniformSize() 的调用,直接使用硬编码的 12 字节尺寸 --- # float (4 bytes) + float2 (8 bytes) = 12 bytes (基于 _pack_uniforms 函数的紧凑布局) EXPECTED_UNIFORM_SIZE = 12 # 创建 CPU 后端 Surface surface = skia.Surface(width, height) frames = [] total_frames = int(duration * fps) for frame in range(total_frames): time_val = frame / fps # 打包 uniform 数据 uniform_bytes = _pack_uniforms(None, width, height, time_val) # [检查] 确保打包后的字节数与期望值匹配 if len(uniform_bytes) != EXPECTED_UNIFORM_SIZE: raise ValueError( f"Uniform 数据大小不匹配!期望 {EXPECTED_UNIFORM_SIZE} 字节,实际 {len(uniform_bytes)} 字节。请检查 _pack_uniforms 函数。" ) uniform_data = skia.Data.MakeWithCopy(uniform_bytes) # 创建着色器 try: # makeShader 的参数: uniform_data, children_shaders, child_count shader = runtime_effect.makeShader(uniform_data, None, 0) if shader is None: # 如果 SkSL 语法正确但 uniform 匹配失败,makeShader 会返回 None raise RuntimeError("着色器创建返回 None!请检查 SkSL 语法和 uniform 数据匹配。") except Exception as e: raise ValueError(f"着色器创建失败: {e}") # 绘制 canvas = surface.getCanvas() canvas.clear(skia.Color(0, 0, 0, 255)) paint = skia.Paint() paint.setShader(shader) canvas.drawRect(skia.Rect.MakeWH(width, height), paint) # --- 修复 peekPixels() 错误:改用 readPixels() 将数据复制到缓冲区 --- image = surface.makeImageSnapshot() # 1. 准备目标 ImageInfo (通常是 kBGRA_8888_ColorType) target_info = skia.ImageInfo.Make( image.width(), image.height(), skia.ColorType.kBGRA_8888_ColorType, # 目标格式,匹配 Skia 常见的输出格式 skia.AlphaType.kPremul_AlphaType ) # 2. 创建一个用于接收像素数据的 bytearray 缓冲区 pixel_data = bytearray(image.width() * image.height() * 4) # 4 bytes per pixel (BGRA) # 3. 将图像数据复制到缓冲区 success = image.readPixels(target_info, pixel_data, target_info.minRowBytes()) if not success: raise RuntimeError("无法通过 readPixels() 获取图像像素") # 4. 转换 bytearray 到 NumPy 数组 (BGRA 顺序) img_array = np.frombuffer(pixel_data, dtype=np.uint8).reshape((height, width, 4)) # 5. BGRA 转换成 RGB 顺序 (交换 R 和 B 通道) # [B, G, R, A] -> [R, G, B] (丢弃 A 通道) rgb_array = img_array[:, :, [2, 1, 0]] # 6. 创建 PIL Image pil_img = Image.fromarray(rgb_array) frames.append(pil_img) # ------------------------------------------------------------------ # 生成 GIF gif_buffer = io.BytesIO() # 计算每帧的毫秒延迟 frame_duration_ms = int(1000 / fps) frames[0].save( gif_buffer, format='GIF', save_all=True, append_images=frames[1:], duration=frame_duration_ms, loop=0, # 0 表示无限循环 optimize=True ) gif_buffer.seek(0) return gif_buffer async def render_sksl_shader_to_gif( sksl_code: str, width: int = 256, height: int = 256, duration: float = 2.0, fps: float = 15, ) -> io.BytesIO: return await asyncio.to_thread( _render_sksl_shader_to_gif, sksl_code, width, height, duration, fps, )