Compare commits

..

29 Commits

Author SHA1 Message Date
6b152235cf Merge pull request '新增 marchtoy' (#70) from bkbkzzzz/konabot:marchtoy_gl into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #70
Reviewed-by: 钟晓帕 <Passthem183@gmail.com>
2026-04-27 02:42:16 +08:00
b4f167e5f6 regex fix 2026-04-27 02:07:01 +08:00
b720504e48 bug fixes 2026-04-27 01:26:42 +08:00
5d93af0666 补充 manual 2026-04-27 00:24:46 +08:00
24e59a7f52 more builtin colors 2026-04-27 00:06:55 +08:00
197535cd34 garbage collection 2026-04-27 00:02:45 +08:00
c3c22e7145 丰富了基本图形 2026-04-25 16:09:47 +08:00
6a68db70f5 fixed args 2026-04-25 15:45:41 +08:00
3f3a375dd6 column major 2026-04-25 14:58:50 +08:00
facd2d0e84 再见了,所有的skia 2026-04-25 14:41:43 +08:00
cc97ca5493 尝试 gl backend 2026-04-25 14:31:23 +08:00
81e0c05686 文件夹结构 2026-04-25 01:59:29 +08:00
3bf0735af4 marchtoy 初步 2026-04-25 01:13:32 +08:00
b73d020b67 不小心多commit了个png文件 2026-04-25 00:58:50 +08:00
0eb957ba87 写通了,存个档 2026-04-25 00:58:20 +08:00
2c1709a77d Raymarch 初步 2026-04-25 00:26:05 +08:00
87e5029be0 Merge pull request 'Enhancement/在应用层而非镜像构建层下载 Typst 构建产物' (#60) from enhancement/typst-binary into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #60
2026-04-23 21:50:15 +08:00
0ba51bc9b2 修复 rollback 失效问题
All checks were successful
continuous-integration/drone/push Build is passing
2026-04-18 10:53:53 +08:00
27670920f6 vrsay
All checks were successful
continuous-integration/drone/push Build is passing
2026-04-11 23:24:07 +08:00
e0268ec86b Merge pull request 'feat(fx): 添加像素排序 (Pixel Sort) 滤镜' (#67) from pi-agent/konabot:feature/pixel-sort into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #67
2026-04-08 15:52:01 +08:00
575cd43538 docs: 补充像素排序滤镜的 man 文档 2026-04-08 15:50:31 +08:00
cd010afc24 feat(fx): 添加像素排序 (Pixel Sort) 滤镜
- 新增 '像素排序' 滤镜,实现类似 Photoshop Pixel Sort 效果
- 支持水平/垂直方向排序
- 支持多种排序依据:亮度、色相、红/绿/蓝通道
- 支持自动阈值计算(使用图像亮度中位数)
- 支持自定义遮罩阈值
- 支持反向排序
- 支持块大小参数
- 添加相关单元测试
2026-04-08 14:20:46 +08:00
c2161635a8 Merge pull request 'fix: use Query to properly handle --pingpong flag' (#65) from pi-agent/konabot:fix/giftool-pingpong-bool into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #65
2026-04-02 20:47:34 +08:00
f21b7067df fix: use Query to properly handle --pingpong flag
Use nonebot_plugin_alconna Query to correctly handle the --pingpong
boolean flag. Previously the flag wasn't working because the
parameter wasn't being properly injected.

Changes:
- Import Query from nonebot_plugin_alconna
- Change Option to not have args (just Option('--pingpong'))
- Use Query[bool] type annotation with default Query('pingpong')
- Check pingpong.available to determine if flag was set
2026-04-02 20:34:23 +08:00
157236d9a6 Merge branch 'master' into enhancement/typst-binary 2026-03-21 20:13:34 +08:00
8725f28caf 更改文件 2026-03-19 00:08:49 +08:00
fef9041a97 在本地开发环境保证 typst 二进制存在 2026-03-18 17:53:45 +08:00
6a2fe11753 不再直接在 Dockerfile 里面下载构建产物 2026-03-18 17:29:34 +08:00
97e87c7ec3 添加 Typst 的二进制文件下载 2026-03-18 17:26:36 +08:00
31 changed files with 2468 additions and 1117 deletions

3
.gitignore vendored
View File

@ -24,3 +24,6 @@ __pycache__
/.venv /.venv
/venv /venv
*.egg-info *.egg-info
# OS 相关
.DS_Store

View File

@ -1,16 +1,3 @@
FROM alpine:latest AS artifacts
RUN apk add --no-cache curl xz
WORKDIR /tmp
RUN mkdir -p /artifacts
RUN curl -L -o typst.tar.xz "https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-unknown-linux-musl.tar.xz" \
&& tar -xJf typst.tar.xz \
&& mv typst-x86_64-unknown-linux-musl/typst /artifacts
RUN chmod -R +x /artifacts/
FROM python:3.13-slim AS base FROM python:3.13-slim AS base
ENV VIRTUAL_ENV=/app/.venv \ ENV VIRTUAL_ENV=/app/.venv \
@ -51,7 +38,6 @@ RUN uv sync --no-install-project
FROM base AS runtime FROM base AS runtime
COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV} COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
COPY --from=artifacts /artifacts/ /usr/local/bin/
WORKDIR /app WORKDIR /app

BIN
assets/img/meme/vr.jpg Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

View File

@ -1,9 +1,10 @@
import asyncio import asyncio
from typing import Any, Awaitable, Callable
import aiohttp import aiohttp
import hashlib import hashlib
import platform import platform
from dataclasses import dataclass from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
import nonebot import nonebot
@ -14,6 +15,8 @@ from pydantic import BaseModel
@dataclass @dataclass
class ArtifactDepends: class ArtifactDepends:
_Callback = Callable[[bool], Awaitable[Any]]
url: str url: str
sha256: str sha256: str
target: Path target: Path
@ -27,6 +30,9 @@ class ArtifactDepends:
use_proxy: bool = True use_proxy: bool = True
"网络问题,赫赫;使用的是 Discord 模块配置的 proxy" "网络问题,赫赫;使用的是 Discord 模块配置的 proxy"
callbacks: list[_Callback] = field(default_factory=list)
"在任务完成以后,应该做的事情"
def is_corresponding_platform(self) -> bool: def is_corresponding_platform(self) -> bool:
if self.required_os is not None: if self.required_os is not None:
if self.required_os.lower() != platform.system().lower(): if self.required_os.lower() != platform.system().lower():
@ -36,26 +42,43 @@ class ArtifactDepends:
return False return False
return True return True
def on_finished(self, task: _Callback) -> _Callback:
self.callbacks.append(task)
return task
async def _finished(self, downloaded: bool) -> list[Any | BaseException]:
tasks = set()
for f in self.callbacks:
tasks.add(f(downloaded))
return await asyncio.gather(*tasks, return_exceptions=True)
class Config(BaseModel): class Config(BaseModel):
prefetch_artifact: bool = False prefetch_artifact: bool = False
"是否提前下载好二进制依赖" "是否提前下载好二进制依赖"
artifact_list = [] artifact_list: list[ArtifactDepends] = []
driver = nonebot.get_driver() driver = nonebot.get_driver()
config = nonebot.get_plugin_config(Config) config = nonebot.get_plugin_config(Config)
@driver.on_startup @driver.on_startup
async def _(): async def _():
if config.prefetch_artifact: if config.prefetch_artifact:
logger.info("启动检测中:正在检测需求的二进制是否下载") logger.info("启动检测中:正在检测需求的二进制是否下载")
semaphore = asyncio.Semaphore(10) semaphore = asyncio.Semaphore(10)
async def _task(artifact: ArtifactDepends): async def _task(artifact: ArtifactDepends):
async with semaphore: async with semaphore:
await ensure_artifact(artifact) downloaded = await ensure_artifact(artifact)
result = await artifact._finished(downloaded)
for r in result:
if isinstance(r, BaseException):
logger.warning("完成了二进制文件的下载,但是有未捕捉的错误")
logger.exception(r)
tasks: set[asyncio.Task] = set() tasks: set[asyncio.Task] = set()
for a in artifact_list: for a in artifact_list:
@ -78,35 +101,43 @@ async def download_artifact(artifact: ArtifactDepends):
async with aiohttp.ClientSession(proxy=proxy) as client: async with aiohttp.ClientSession(proxy=proxy) as client:
result = await client.get(artifact.url) result = await client.get(artifact.url)
if result.status != 200: if result.status != 200:
logger.warning(f"已经下载了二进制,但是注意服务器没有返回 200 URL={artifact.url} TARGET={artifact.target} CODE={result.status}") logger.warning(
f"已经下载了二进制,但是注意服务器没有返回 200 URL={artifact.url} TARGET={artifact.target} CODE={result.status}"
)
data = await result.read() data = await result.read()
artifact.target.write_bytes(data) artifact.target.write_bytes(data)
if not platform.system().lower() == 'windows': if not platform.system().lower() == "windows":
artifact.target.chmod(0o755) artifact.target.chmod(0o755)
logger.info(f"下载好了 TARGET={artifact.target} URL={artifact.url}") logger.info(f"下载好了 TARGET={artifact.target} URL={artifact.url}")
m = hashlib.sha256(artifact.target.read_bytes()) m = hashlib.sha256(artifact.target.read_bytes())
if m.hexdigest().lower() != artifact.sha256.lower(): if m.hexdigest().lower() != artifact.sha256.lower():
logger.warning(f"下载到的二进制的 sha256 与需求不同 TARGET={artifact.target} REQUESTED={artifact.sha256} ACTUAL={m.hexdigest()}") logger.warning(
f"下载到的二进制的 sha256 与需求不同 TARGET={artifact.target} REQUESTED={artifact.sha256} ACTUAL={m.hexdigest()}"
)
async def ensure_artifact(artifact: ArtifactDepends): async def ensure_artifact(artifact: ArtifactDepends) -> bool:
if not artifact.is_corresponding_platform(): if not artifact.is_corresponding_platform():
return return False
if not artifact.target.exists(): if not artifact.target.exists():
logger.info(f"二进制依赖 {artifact.target} 不存在") logger.info(f"二进制依赖 {artifact.target} 不存在")
if not artifact.target.parent.exists(): if not artifact.target.parent.exists():
artifact.target.parent.mkdir(parents=True, exist_ok=True) artifact.target.parent.mkdir(parents=True, exist_ok=True)
await download_artifact(artifact) await download_artifact(artifact)
return True
else: else:
m = hashlib.sha256(artifact.target.read_bytes()) m = hashlib.sha256(artifact.target.read_bytes())
if m.hexdigest().lower() != artifact.sha256.lower(): if m.hexdigest().lower() != artifact.sha256.lower():
logger.info(f"二进制依赖 {artifact.target} 的哈希无法对应需求的哈希,准备重新下载") logger.info(
f"二进制依赖 {artifact.target} 的哈希无法对应需求的哈希,准备重新下载"
)
artifact.target.unlink() artifact.target.unlink()
await download_artifact(artifact) await download_artifact(artifact)
return True
return False
def register_artifacts(*artifacts: ArtifactDepends): def register_artifacts(*artifacts: ArtifactDepends):
artifact_list.extend(artifacts) artifact_list.extend(artifacts)

View File

@ -131,7 +131,7 @@ class DatabaseManager:
await conn.execute(command, params or ()) await conn.execute(command, params or ())
await conn.commit() await conn.commit()
except Exception as e: except Exception as e:
# 记录错误但重新抛出,让调用者处理 await conn.rollback()
raise Exception(f"数据库执行失败: {str(e)}") from e raise Exception(f"数据库执行失败: {str(e)}") from e
finally: finally:
await self._return_connection(conn) await self._return_connection(conn)
@ -143,7 +143,7 @@ class DatabaseManager:
await conn.executescript(script) await conn.executescript(script)
await conn.commit() await conn.commit()
except Exception as e: except Exception as e:
# 记录错误但重新抛出,让调用者处理 await conn.rollback()
raise Exception(f"数据库脚本执行失败: {str(e)}") from e raise Exception(f"数据库脚本执行失败: {str(e)}") from e
finally: finally:
await self._return_connection(conn) await self._return_connection(conn)
@ -197,7 +197,7 @@ class DatabaseManager:
await conn.executemany(command, seq_of_params) await conn.executemany(command, seq_of_params)
await conn.commit() await conn.commit()
except Exception as e: except Exception as e:
# 记录错误但重新抛出,让调用者处理 await conn.rollback()
raise Exception(f"数据库批量执行失败: {str(e)}") from e raise Exception(f"数据库批量执行失败: {str(e)}") from e
finally: finally:
await self._return_connection(conn) await self._return_connection(conn)

View File

@ -52,7 +52,11 @@ async def get_current_version(conn: aiosqlite.Connection) -> int:
if count[0] < 1: if count[0] < 1:
logger.info("权限系统数据表不存在,现在创建表") logger.info("权限系统数据表不存在,现在创建表")
await conn.executescript(SQL_CREATE_TABLE) await conn.executescript(SQL_CREATE_TABLE)
await conn.commit() try:
await conn.commit()
except Exception:
await conn.rollback()
raise
return 0 return 0
cursor = await conn.execute(SQL_GET_MIGRATE_VERSION) cursor = await conn.execute(SQL_GET_MIGRATE_VERSION)
row = await cursor.fetchone() row = await cursor.fetchone()
@ -72,10 +76,18 @@ async def execute_migration(
await conn.executescript(migration.get_upgrade_script()) await conn.executescript(migration.get_upgrade_script())
now_version += 1 now_version += 1
await conn.execute(SQL_UPDATE_VERSION, (now_version,)) await conn.execute(SQL_UPDATE_VERSION, (now_version,))
await conn.commit() try:
await conn.commit()
except Exception:
await conn.rollback()
raise
while now_version > version: while now_version > version:
migration = migrations[now_version - 1] migration = migrations[now_version - 1]
await conn.executescript(migration.get_downgrade_script()) await conn.executescript(migration.get_downgrade_script())
now_version -= 1 now_version -= 1
await conn.execute(SQL_UPDATE_VERSION, (now_version,)) await conn.execute(SQL_UPDATE_VERSION, (now_version,))
await conn.commit() try:
await conn.commit()
except Exception:
await conn.rollback()
raise

View File

@ -43,11 +43,15 @@ class PermRepo:
Raises: Raises:
AssertionError: 如果创建后无法获取实体 ID。 AssertionError: 如果创建后无法获取实体 ID。
""" """
await self.conn.execute( try:
s("create_entity.sql"), await self.conn.execute(
(entity.platform, entity.entity_type, entity.external_id), s("create_entity.sql"),
) (entity.platform, entity.entity_type, entity.external_id),
await self.conn.commit() )
await self.conn.commit()
except Exception:
await self.conn.rollback()
raise
eid = await self._get_entity_id_or_none(entity) eid = await self._get_entity_id_or_none(entity)
assert eid is not None assert eid is not None
return eid return eid
@ -115,8 +119,12 @@ class PermRepo:
value: 要设置的配置值True/False/None value: 要设置的配置值True/False/None
""" """
eid = await self.get_entity_id(entity) eid = await self.get_entity_id(entity)
await self.conn.execute(s("update_perm_info.sql"), (eid, config_key, value)) try:
await self.conn.commit() await self.conn.execute(s("update_perm_info.sql"), (eid, config_key, value))
await self.conn.commit()
except Exception:
await self.conn.rollback()
raise
async def get_entity_id_batch( async def get_entity_id_batch(
self, entities: list[PermEntity] self, entities: list[PermEntity]
@ -135,11 +143,15 @@ class PermRepo:
# s("create_entity.sql"), # s("create_entity.sql"),
# (entity.platform, entity.entity_type, entity.external_id), # (entity.platform, entity.entity_type, entity.external_id),
# ) # )
await self.conn.executemany( try:
s("create_entity.sql"), await self.conn.executemany(
[(e.platform, e.entity_type, e.external_id) for e in entities], s("create_entity.sql"),
) [(e.platform, e.entity_type, e.external_id) for e in entities],
await self.conn.commit() )
await self.conn.commit()
except Exception:
await self.conn.rollback()
raise
val_placeholders = ", ".join(["(?, ?, ?)"] * len(entities)) val_placeholders = ", ".join(["(?, ?, ?)"] * len(entities))
params = [] params = []
for e in entities: for e in entities:

View File

@ -79,6 +79,14 @@ fx [滤镜名称] <参数1> <参数2> ...
* ```fx JPEG损坏 <质量=10>``` * ```fx JPEG损坏 <质量=10>```
* 质量范围建议为 1~95数值越低压缩痕迹越重、效果越搞笑。 * 质量范围建议为 1~95数值越低压缩痕迹越重、效果越搞笑。
* ```fx 动图 <帧率=10>``` * ```fx 动图 <帧率=10>```
* ```fx 像素排序 <方向=horizontal> <阈值=0> <自动阈值=true> <排序依据=brightness> <遮罩阈值=128> <反向=false> <块大小=1>```
* 对像素按指定属性进行排序,效果类似 Photoshop/GIMP Pixel Sort。
* **方向**horizontal水平/ vertical垂直
* **排序依据**brightness亮度/ hue色相/ red / green / blue
* **自动阈值**true 时使用图像亮度中位数作为遮罩阈值
* **遮罩阈值**:决定哪些像素参与排序(亮度 >= 阈值)
* **反向**true 时从亮到暗排序
* **块大小**:每 N 行/列作为一个整体排序单位
### 多图像处理器 ### 多图像处理器
* ```fx 存入图像 <目标名称>``` * ```fx 存入图像 <目标名称>```

View File

@ -0,0 +1,22 @@
# 指令介绍
简易 Raymarch 小玩具
用法march <scene>
march sphere(1).color(red) box(0.5, 2.0, 0.5).pos(0, 0, 0) cam(0.5).pos(-5).lookat(0)
# 主要语法
<scene> ::= <scene> "." <op>
| <obj>
<obj> ::= <obj_ty>
| <obj_ty> "(" <args> ")"
<op> ::= <op_ty>
| <op_ty> "(" <args> ")"
<args> ::= <args> "," <arg>
| <arg>
其中 obj_ty、op_ty 分别为物体类型(如 cube、sphere、torus 等)与变换类型(如 pos、rot
# 特殊说明
<op_ty> 不包含 scale因为本工具基于 SDF 渲染,非正交的变换会破坏 SDF 的性质。

View File

@ -1354,6 +1354,140 @@ class ImageFilterImplement:
images.append(text_image) images.append(text_image)
return image return image
# Pixel Sort - 像素排序效果
@staticmethod
def apply_pixel_sort(
image: Image.Image,
direction: str = "horizontal",
threshold: float = 0.0,
auto_threshold: bool = True,
sort_by: str = "brightness",
mask_threshold: float = 128.0,
reverse: bool = False,
block_size: int = 1
) -> Image.Image:
"""
Pixel Sort 效果
参数:
image: 输入图像
direction: 排序方向,"horizontal"(水平) 或 "vertical"(垂直)
threshold: 亮度阈值 (0-255),低于此值的像素会被排序(仅在 auto_threshold=False 时生效)
auto_threshold: 是否自动计算阈值(使用图像中位数)
sort_by: 排序依据,"brightness"(亮度)、"hue"(色相)、"red""green""blue"
mask_threshold: 遮罩阈值 (0-255),决定哪些像素参与排序
reverse: 是否反向排序
block_size: 块大小,每 N 行/列作为一个整体排序单位
"""
if image.mode != 'RGBA':
image = image.convert('RGBA')
arr = np.array(image)
height, width = arr.shape[:2]
# 获取排序属性
def get_sort_value(pixel):
r, g, b = pixel[0], pixel[1], pixel[2]
if sort_by == "brightness":
return 0.299 * r + 0.587 * g + 0.114 * b
elif sort_by == "hue":
max_c = max(r, g, b)
min_c = min(r, g, b)
diff = max_c - min_c
if diff == 0:
return 0
if max_c == r:
return 60 * (((g - b) / diff) % 6)
elif max_c == g:
return 60 * ((b - r) / diff + 2)
else:
return 60 * ((r - g) / diff + 4)
elif sort_by == "red":
return r
elif sort_by == "green":
return g
elif sort_by == "blue":
return b
return 0.299 * r + 0.587 * g + 0.114 * b
# 自动计算阈值
if auto_threshold:
# 使用图像亮度中位数作为阈值
gray = np.array(image.convert('L'))
mask_threshold = float(np.median(gray))
# 创建遮罩:哪些像素需要排序
mask = np.zeros((height, width), dtype=bool)
for y in range(height):
for x in range(width):
brightness = 0.299 * arr[y, x, 0] + 0.587 * arr[y, x, 1] + 0.114 * arr[y, x, 2]
mask[y, x] = brightness >= mask_threshold
result = arr.copy()
if direction.lower() in ["horizontal", "h", "水平"]:
# 水平排序(逐行)
for y in range(height):
# 收集当前行中需要排序的像素
if block_size > 1:
# 按块处理
for block_start in range(0, width, block_size):
block_end = min(block_start + block_size, width)
pixels = []
indices = []
for x in range(block_start, block_end):
if mask[y, x]:
pixels.append(arr[y, x].copy())
indices.append(x)
if len(pixels) > 1:
# 按指定属性排序
sorted_pixels = sorted(pixels, key=get_sort_value, reverse=reverse)
for i, x in enumerate(indices):
result[y, x] = sorted_pixels[i]
else:
# 逐像素处理
pixels = []
indices = []
for x in range(width):
if mask[y, x]:
pixels.append(arr[y, x].copy())
indices.append(x)
if len(pixels) > 1:
sorted_pixels = sorted(pixels, key=get_sort_value, reverse=reverse)
for i, x in enumerate(indices):
result[y, x] = sorted_pixels[i]
elif direction.lower() in ["vertical", "v", "垂直"]:
# 垂直排序(逐列)
for x in range(width):
if block_size > 1:
# 按块处理
for block_start in range(0, height, block_size):
block_end = min(block_start + block_size, height)
pixels = []
indices = []
for y in range(block_start, block_end):
if mask[y, x]:
pixels.append(arr[y, x].copy())
indices.append(y)
if len(pixels) > 1:
sorted_pixels = sorted(pixels, key=get_sort_value, reverse=reverse)
for i, y in enumerate(indices):
result[y, x] = sorted_pixels[i]
else:
pixels = []
indices = []
for y in range(height):
if mask[y, x]:
pixels.append(arr[y, x].copy())
indices.append(y)
if len(pixels) > 1:
sorted_pixels = sorted(pixels, key=get_sort_value, reverse=reverse)
for i, y in enumerate(indices):
result[y, x] = sorted_pixels[i]
return Image.fromarray(result, 'RGBA')
class ImageFilterEmpty: class ImageFilterEmpty:

View File

@ -65,6 +65,8 @@ class ImageFilterManager:
"覆盖图像": ImageFilterImplement.apply_overlay, "覆盖图像": ImageFilterImplement.apply_overlay,
# 生成式 # 生成式
"覆加颜色": ImageFilterImplement.generate_solid, "覆加颜色": ImageFilterImplement.generate_solid,
# Pixel Sort
"像素排序": ImageFilterImplement.apply_pixel_sort,
} }
generate_filter_map = { generate_filter_map = {

View File

@ -6,7 +6,7 @@ import PIL
import PIL.Image import PIL.Image
import cv2 import cv2
import imageio.v3 as iio import imageio.v3 as iio
from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, UniMessage, on_alconna
import numpy import numpy
from konabot.common.nb.exc import BotExceptionMessage from konabot.common.nb.exc import BotExceptionMessage
@ -34,7 +34,7 @@ cmd_giftool = on_alconna(
Option("-t", Args["length", str]), Option("-t", Args["length", str]),
Option("-to", Args["end_point", str]), Option("-to", Args["end_point", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]), Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
Option("--pingpong", default=False), Option("--pingpong"),
) )
) )
@ -47,7 +47,7 @@ async def _(
length: str | None = None, length: str | None = None,
speed_factor: float = 1.0, speed_factor: float = 1.0,
end_point: str | None = None, end_point: str | None = None,
pingpong: bool = False, pingpong: Query[bool] = Query("pingpong"),
): ):
ss: None | float = None ss: None | float = None
if start_point: if start_point:
@ -165,7 +165,7 @@ async def _(
rdur_ms = rdur_ms[::-1] rdur_ms = rdur_ms[::-1]
# 处理 pingpong 模式 # 处理 pingpong 模式
if pingpong: if pingpong.available:
# 复制一份反转的帧序列(去掉第一帧避免重复) # 复制一份反转的帧序列(去掉第一帧避免重复)
pingpong_frames = rframes[1:][::-1] if len(rframes) > 1 else rframes[::-1] pingpong_frames = rframes[1:][::-1] if len(rframes) > 1 else rframes[::-1]
pingpong_durations = rdur_ms[1:][::-1] if len(rdur_ms) > 1 else rdur_ms[::-1] pingpong_durations = rdur_ms[1:][::-1] if len(rdur_ms) > 1 else rdur_ms[::-1]

View File

@ -0,0 +1,19 @@
from nonebot import on_command
from nonebot.adapters import Message
from nonebot_plugin_alconna import UniMessage
from nonebot.params import CommandArg
import konabot.plugins.marchtoy.gl_render as render
import io
cmd_marchtoy = on_command("march")
@cmd_marchtoy.handle()
async def _(args: Message = CommandArg()):
if cmd := args.extract_plain_text():
try:
img = await render.render(cmd, (512, 512))
buffer = io.BytesIO()
# img.show()
img.save(buffer, format="PNG")
buffer.seek(0)
await cmd_marchtoy.send(await UniMessage().image(raw=buffer).export())
except Exception as e:
await cmd_marchtoy.send(await UniMessage.text(f"发生了错误: {e}").export())

View File

@ -0,0 +1,148 @@
"""
raymarch toy
usage: march <scene1> <scene2> ... <sceneN>
<scene> ::= <scene> "." <command>
| <command>
<command> ::= <id>
| <id> "(" <args> ")"
example:
march cube.pos(0, 0, 0).color(red) cam(1.0).pos(1, 1, 1).lookat(0, 0, 0)
"""
import pathlib
from dataclasses import dataclass
import re
import numpy as np
from dataclasses import dataclass
from typing import Optional
from konabot.plugins.marchtoy.obj import Object, Camera, OBJECT_ENTRIES
from konabot.plugins.marchtoy.op import OPERATION_ENTRIES
@dataclass
class Command:
id: str
args: list[str]
class CommandChainParser:
CHAIN_PATTERN = r"^[a-zA-Z]+(\([^(]*\))?(\.[a-zA-Z]+(\([^(]*\))?)*"
def __init__(self, _command_chain: str) -> None:
self.command_chain = _command_chain
def __iter__(self):
return self
def __next__(self):
if query := re.match(CommandChainParser.CHAIN_PATTERN, self.command_chain):
cmd_chain = query[0]
self.command_chain = self.command_chain[len(cmd_chain) + 1 :]
return cmd_chain
raise StopIteration
class CommandParser:
CMD_PATTERN = r"^[a-zA-Z]+(\([^(]*\))?"
ID_PATTERN = r"^[a-zA-Z]+(?=\(|\.|$)"
def __init__(self, _command: str) -> None:
self.command = _command
def __iter__(self):
return self
def __next__(self):
if query := re.match(CommandParser.CMD_PATTERN, self.command):
cmd = query[0]
if cmd_id_qry := re.match(CommandParser.ID_PATTERN, cmd):
cmd_id = cmd_id_qry[0]
self.command = self.command[len(cmd) + 1 :]
cmd_args = cmd[len(cmd_id) + 1 : -1].replace(" ", "").split(",")
while "" in cmd_args:
cmd_args.remove("")
return Command(cmd_id, cmd_args)
raise StopIteration
class Scene:
def __init__(self, _instruction: str) -> None:
self.canvas_objs: list[tuple[Object, str]] = []
self.camera: Camera = Camera()
for raw_cmd in CommandChainParser(_instruction):
cmd_queue = CommandParser(raw_cmd)
cmd_obj = next(cmd_queue)
obj_id, obj_args = cmd_obj.id, cmd_obj.args
obj_instance: Optional[Object] = None
if obj_id in OBJECT_ENTRIES:
obj_cls = OBJECT_ENTRIES[obj_id]
if not issubclass(obj_cls, Object):
raise Exception(f"{obj_id} is not a subclass of Object.")
obj_instance = obj_cls()
try:
if len(obj_args) != 0:
obj_instance.parse_args(obj_args)
except Exception as e:
raise Exception(
f"object {obj_id} failed to parse args passed in: {obj_args}"
) from e
else:
raise Exception(f"{obj_id} is not a valid object type.")
if obj_instance != None:
for cmd in cmd_queue:
op_id, op_args = cmd.id, cmd.args
if op_id in OPERATION_ENTRIES:
op_func = OPERATION_ENTRIES[op_id]
if not callable(op_func):
raise Exception(f"{op_id} is not a valid operation.")
op_func(obj_instance, op_args)
else:
raise Exception(f"{op_id} is not a valid operation.")
try:
sdf_block = obj_instance.sdf_block()
self.canvas_objs.append((obj_instance, sdf_block))
except:
if type(obj_instance) == Camera:
self.camera = obj_instance
def __str__(self) -> str:
return ", ".join([str(type(obj)) for obj in self.canvas_objs])
def compile(self, fs_src: str = "frag.glsl") -> str:
PATH = pathlib.Path(__file__).parent / "shaders" / fs_src
with PATH.open(encoding="utf-8") as f:
content = f.read()
sdf_block: str = ""
color_block: str = ""
index = 0
for canvas_item in self.canvas_objs:
obj, sdf_expr = canvas_item
sdf_block += f"float sd{index} = {sdf_expr};"
sdf_block += f"if(sd{index} < qry.value)"
sdf_block += "{" + f"qry.value = sd{index}; qry.obj_id = {index}; " + "}\n"
color = obj.texture.color
color_block += (
f"if(obj_id == {index}) return float4("
f"{color[0]}, {color[1]}, {color[2]}, {color[3]});\n"
)
index += 1
content = content.replace("<SDF_BLOCK>", sdf_block)
content = content.replace("<COLOR_BLOCK>", color_block)
cam_pos = self.camera.transform.t[0:3, 3]
cam_focus = self.camera.focus
cam_dir = self.camera.transform.t @ np.array((0.0, 0.0, -1.0, 0.0))
content = content.replace(
"<CAM_POS>", f"{cam_pos[0]}, {cam_pos[1]}, {cam_pos[2]}"
)
content = content.replace("<CAM_FOCUS>", str(cam_focus))
content = content.replace(
"<CAM_DIR>", f"{cam_dir[0]}, {cam_dir[1]}, {cam_dir[2]}"
)
return content

View File

@ -0,0 +1,35 @@
"""
headless moderngl
"""
import pathlib
import moderngl
import numpy as np
from PIL import Image
from konabot.plugins.marchtoy.command import Scene
async def render(command: str, res: tuple[int, int]):
fs = Scene(command).compile()
PATH = pathlib.Path(__file__).parent / "shaders"
with (PATH / "vert.glsl").open(encoding='utf-8') as f:
vs = f.read()
ctx = moderngl.create_context(standalone=True)
ctx.gc_mode = "auto"
try:
program = ctx.program(
vertex_shader=vs,
fragment_shader=fs
)
except Exception as e:
raise Exception(f"cannot compile glsl: {e}") from e
uniform = program['u_resolution']
uniform.write(np.array(res, dtype=np.float32))
vertices = np.array([-1.0, -1.0, -1.0, 1.0, 1.0, -1.0, 1.0, 1.0], dtype=np.float32)
indices = np.array([0, 1, 2, 1, 2, 3], dtype=np.int32)
ibo = ctx.buffer(indices)
vbo = ctx.buffer(vertices)
vao = ctx.vertex_array(program, vbo, 'in_position', index_buffer = ibo)
fbo = ctx.simple_framebuffer(res)
fbo.use()
fbo.clear(0.0, 0.0, 0.0, 0.0)
vao.render(moderngl.TRIANGLES)
return Image.frombytes('RGBA', fbo.size, fbo.read(components=4), 'raw', 'RGBA', 0, -1)

View File

@ -0,0 +1,174 @@
import numpy as np
from konabot.plugins.marchtoy.texture import Texture
from konabot.plugins.marchtoy.utilities import ArgParser, Formatter
OBJECT_ENTRIES = {}
def make_obj(*name: str):
def decorator(cls):
# OBJECT_ENTRIES[name] = cls
for alias in [*name]:
OBJECT_ENTRIES[alias] = cls
return cls
return decorator
class Transform:
def __init__(self) -> None:
self.t: np.ndarray = np.identity(4, dtype=np.float32)
@staticmethod
def normalize(p: np.ndarray) -> np.ndarray:
return p / (np.sqrt(np.dot(p, p)) + 1e-8)
def translate(self, x: float, y: float, z: float):
mat = np.identity(4, dtype=np.float32)
mat[0:3, 3] = [x, y, z]
self.t = mat @ self.t
return self
# scale 会破坏 sdf 的性质 梯度大小会变 导致 overshoot 等问题
def scale(self, x: float, y: float, z: float):
mat = np.identity(4, dtype=np.float32)
mat[0, 0], mat[1, 1], mat[2, 2] = x, y, z
self.t = mat @ self.t
return self
def rotate(self, x: float, y: float, z: float):
cx, sx = np.cos(x), np.sin(x)
cy, sy = np.cos(y), np.sin(y)
cz, sz = np.cos(z), np.sin(z)
mat = np.identity(4, dtype=np.float32)
mat[0, 0] = cy * cz
mat[0, 1] = sx * sy * cz - cx * sz
mat[0, 2] = cx * sy * cz + sx * sz
mat[1, 0] = cy * sz
mat[1, 1] = sx * sy * sz + cx * cz
mat[1, 2] = cx * sy * sz - sx * cz
mat[2, 0] = -sy
mat[2, 1] = sx * cy
mat[2, 2] = cx * cy
self.t = mat @ self.t
return self
def lookat(
self, x: float, y: float, z: float, up: np.ndarray = np.array([0.0, 1.0, 0.0])
):
p = self.t[0:3, 3]
q = np.array([x, y, z])
zaxis = Transform.normalize(q - p)
xaxis = Transform.normalize(np.cross(zaxis, up))
yaxis = Transform.normalize(np.cross(xaxis, zaxis))
self.t[:3, 0] = xaxis
self.t[:3, 1] = yaxis
self.t[:3, 2] = -zaxis
return self
def p_expr(self) -> str:
inv = np.linalg.inv(self.t) # + 1e-5 * np.identity(4, dtype=np.float32))
return f"({Formatter.float4(inv)} * vec4(p, 1.0)).xyz"
class Object:
def __init__(self) -> None:
self.transform: Transform = Transform()
self.texture: Texture = Texture()
def parse_args(self, args: list[str]):
raise NotImplementedError
def sdf_block(self) -> str:
raise NotImplementedError
@make_obj("cube", "box")
class Cube(Object):
def __init__(self, _size: np.ndarray = np.array([1.0, 1.0, 1.0])) -> None:
super().__init__()
self.size = _size
def parse_args(self, args: list[str]):
self.size = ArgParser.as_vec3(args)
def sdf_block(self) -> str:
return f"sdCube({self.transform.p_expr()}, vec3({self.size[0]}, {self.size[1]}, {self.size[2]}))"
@make_obj("sphere", "ball")
class Sphere(Object):
def __init__(self, _radius: float = 1.0) -> None:
super().__init__()
self.radius = _radius
def parse_args(self, args: list[str]):
self.radius = ArgParser.as_float(args)
def sdf_block(self) -> str:
return f"sdSphere({self.transform.p_expr()}, {self.radius})"
@make_obj("cylinder", "cyl")
class Cylinder(Object):
def __init__(self, _radius: float = 1.0, _height: float = 1.0) -> None:
super().__init__()
self.radius = _radius
self.height = _height
def parse_args(self, args: list[str]):
param = ArgParser.as_vec2(args)
self.radius = param[0]
self.height = param[1]
def sdf_block(self) -> str:
return (
f"sdCappedCylinder({self.transform.p_expr()}, {self.radius}, {self.height})"
)
@make_obj("torus")
class Torus(Object):
def __init__(self, _r1: float = 1.0, _r2: float = 0.4) -> None:
super().__init__()
self.r1 = _r1
self.r2 = _r2
def parse_args(self, args: list[str]):
param = ArgParser.as_vec2(args)
self.r1 = param[0]
self.r2 = param[1]
def sdf_block(self) -> str:
return f"sdTorus({self.transform.p_expr()}, vec2({self.r1}, {self.r2}))"
@make_obj("capsule", "pill")
class Capsule(Object):
def __init__(self, _h: float = 1.0, _r: float = 0.25) -> None:
super().__init__()
self._h = _h
self._r = _r
def parse_args(self, args: list[str]):
param = ArgParser.as_vec2(args)
self._h = param[0]
self._r = param[1]
def sdf_block(self) -> str:
return f"sdVerticalCapsule({self.transform.p_expr()}, {self._h}, {self._r})"
@make_obj("camera", "cam")
class Camera(Object):
def __init__(self, _focus: float = 1.0) -> None:
super().__init__()
self.focus = _focus
self.transform.translate(5.0, 5.0, 5.0).lookat(0.0, 0.0, 0.0)
def parse_args(self, args: list[str]):
self.focus = ArgParser.as_float(args)

View File

@ -0,0 +1,50 @@
from konabot.plugins.marchtoy.obj import Object
from konabot.plugins.marchtoy.utilities import ArgParser
OPERATION_ENTRIES = {}
def make_operation(*name: str):
def decorator(op):
# OPERATION_ENTRIES[name] = op
for alias in [*name]:
OPERATION_ENTRIES[alias] = op
return op
return decorator
@make_operation("pos", "translate", "position", "p")
def translate(obj: Object, args: list[str]):
pos = ArgParser.as_vec3(args)
obj.transform.translate(pos[0], pos[1], pos[2])
@make_operation("rot", "rotate", "r")
def rotate(obj: Object, args: list[str]):
pos = ArgParser.as_vec3(args)
obj.transform.rotate(pos[0], pos[1], pos[2])
@make_operation("lookat", "look", "l")
def lookat(obj: Object, args: list[str]):
pos = ArgParser.as_vec3(args)
obj.transform.lookat(pos[0], pos[1], pos[2])
@make_operation("color", "col", "texture")
def color(obj: Object, args: list[str]):
try:
if len(args) == 1:
col = ArgParser.as_literal_color(args)
obj.texture.color = (col[0], col[1], col[2], col[3])
elif len(args) == 3:
col = ArgParser.as_vec3(args)
obj.texture.color = (col[0], col[1], col[2], 1.0)
elif len(args) == 4:
col = ArgParser.as_vec4(args)
obj.texture.color = (col[0], col[1], col[2], col[3])
else:
raise Exception("unknown color")
except:
raise Exception("unknown color")

View File

@ -0,0 +1,99 @@
#version 330
// compatibility
#define float4x4 mat4x4
#define float4 vec4
#define float3 vec3
#define float2 vec2
const float EPS = 0.001;
const int MAX_ITER = 128;
uniform vec2 u_resolution;
out vec4 fragColor;
struct sdQuery {
float value;
int obj_id;
};
float sdCube(vec3 p, vec3 b) {
p = abs(p) - b;
return length(max(p, 0.0)) + min(max(p.x, max(p.y, p.z)), 0.0);
}
float sdSphere(vec3 p, float r) {
return length(p) - r;
}
float sdCappedCylinder( vec3 p, float r, float h )
{
vec2 d = abs(vec2(length(p.xz),p.y)) - vec2(r,h);
return min(max(d.x,d.y),0.0) + length(max(d,0.0));
}
float sdVerticalCapsule( vec3 p, float h, float r )
{
p.y -= clamp( p.y, 0.0, h );
return length( p ) - r;
}
float sdTorus( vec3 p, vec2 t )
{
vec2 q = vec2(length(p.xz)-t.x,p.y);
return length(q)-t.y;
}
sdQuery sd(vec3 p) {
sdQuery qry;
qry.value = 100000000.0;
qry.obj_id = -1;
<SDF_BLOCK>
return qry;
}
vec3 nrm(vec3 p) {
vec2 d = vec2(EPS, 0.0);
return normalize(vec3(
sd(p + d.xyy).value - sd(p - d.xyy).value,
sd(p + d.yxy).value - sd(p - d.yxy).value,
sd(p + d.yyx).value - sd(p - d.yyx).value
));
}
vec4 materialColor(int obj_id) {
<COLOR_BLOCK>
return vec4(1.0);
}
vec4 color(vec3 p, int obj_id) {
vec3 normal = nrm(p);
vec3 light_dir = normalize(vec3(0.5, 0.8, -0.6));
float light = 0.2 + 0.8 * max(dot(normal, light_dir), 0.0);
vec4 base = materialColor(obj_id);
return vec4(base.rgb * light, base.a);
}
vec4 march(vec3 p, vec3 r) {
sdQuery qry;
vec4 col = vec4(0.0);
for(int i = 0; i < MAX_ITER; ++i) {
qry = sd(p);
if(qry.value < EPS){
col = color(p, qry.obj_id);
break;
}
p += qry.value * r;
}
return col;
}
void main() {
vec2 uv = 2. * (gl_FragCoord.xy / u_resolution - .5) * vec2(u_resolution.x / u_resolution.y, 1.);
vec3 c_p = vec3(<CAM_POS>);
vec3 c_z = normalize(vec3(<CAM_DIR>));
vec3 world_up = abs(c_z.y) > 0.999 ? vec3(0., 0., 1.) : vec3(0., 1., 0.);
vec3 c_x = normalize(cross(c_z, world_up));
vec3 c_y = normalize(cross(c_x, c_z));
mat3 view = mat3(c_x, c_y, c_z);
vec3 r = normalize(vec3(uv, <CAM_FOCUS>));
fragColor = march(c_p, view * r);
}

View File

@ -0,0 +1,6 @@
#version 330
in vec2 in_position;
void main() {
gl_Position = vec4(in_position, 0.0, 1.0);
}

View File

@ -0,0 +1,42 @@
raise DeprecationWarning
from command import Scene
import skia
import struct
from PIL import Image
import numpy as np
# 暂时先照抄小帕的了,之后有空再单独封装一下
async def render(cmd: str, width: int, height: int):
scene = Scene(cmd)
surface = skia.Surface(width, height)
sksl_code = scene.compile()
runtime_effect = skia.RuntimeEffect.MakeForShader(scene.compile())
if runtime_effect is None:
raise Exception("cannot compile sksl shader")
uv_uniform = struct.pack("ff", float(width), float(height))
uniform_data = skia.Data.MakeWithCopy(uv_uniform)
shader = runtime_effect.makeShader(uniform_data, None, 0)
canvas = surface.getCanvas()
canvas.clear(skia.Color(0, 0, 0, 0))
paint = skia.Paint()
paint.setShader(shader)
canvas.drawRect(skia.Rect.MakeWH(width, height), paint)
image = surface.makeImageSnapshot()
target_info = skia.ImageInfo.Make(
image.width(),
image.height(),
skia.ColorType.kBGRA_8888_ColorType,
skia.AlphaType.kPremul_AlphaType,
)
pixel_data = bytearray(
image.width() * image.height() * 4
) # 4 bytes per pixel (BGRA)
success = image.readPixels(target_info, pixel_data, target_info.minRowBytes())
img_array = np.frombuffer(pixel_data, dtype=np.uint8).reshape((height, width, 4))
rgb_array = img_array[:, :, [2, 1, 0]]
pil_img = Image.fromarray(rgb_array)
return pil_img

View File

@ -0,0 +1,54 @@
from dataclasses import dataclass
COLORS = {
"red": (1.0, 0.0, 0.0, 1.0),
"green": (0.0, 1.0, 0.0, 1.0),
"blue": (0.0, 0.0, 1.0, 1.0),
"white": (1.0, 1.0, 1.0, 1.0),
"black": (0.0, 0.0, 0.0, 1.0),
"gray": (0.5, 0.5, 0.5, 1.0),
"light_gray": (0.75, 0.75, 0.75, 1.0),
"dark_gray": (0.25, 0.25, 0.25, 1.0),
"dark_red": (0.5, 0.0, 0.0, 1.0),
"crimson": (0.86, 0.08, 0.24, 1.0),
"pink": (1.0, 0.75, 0.8, 1.0),
"hot_pink": (1.0, 0.41, 0.71, 1.0),
"orange_red": (1.0, 0.27, 0.0, 1.0),
"orange": (1.0, 0.65, 0.0, 1.0),
"gold": (1.0, 0.84, 0.0, 1.0),
"yellow": (1.0, 1.0, 0.0, 1.0),
"light_yellow": (1.0, 1.0, 0.88, 1.0),
"khaki": (0.94, 0.90, 0.55, 1.0),
"light_green": (0.56, 0.93, 0.56, 1.0),
"lime": (0.0, 1.0, 0.0, 1.0),
"forest_green": (0.13, 0.55, 0.13, 1.0),
"dark_green": (0.0, 0.39, 0.0, 1.0),
"olive": (0.5, 0.5, 0.0, 1.0),
"teal": (0.0, 0.5, 0.5, 1.0),
"light_blue": (0.68, 0.85, 0.9, 1.0),
"sky_blue": (0.53, 0.81, 0.92, 1.0),
"cyan": (0.0, 1.0, 1.0, 1.0),
"navy": (0.0, 0.0, 0.5, 1.0),
"royal_blue": (0.25, 0.41, 0.88, 1.0),
"steel_blue": (0.27, 0.51, 0.71, 1.0),
"purple": (0.5, 0.0, 0.5, 1.0),
"magenta": (1.0, 0.0, 1.0, 1.0),
"violet": (0.93, 0.51, 0.93, 1.0),
"lavender": (0.90, 0.90, 0.98, 1.0),
"indigo": (0.29, 0.0, 0.51, 1.0),
"brown": (0.65, 0.16, 0.16, 1.0),
"saddle_brown": (0.55, 0.27, 0.07, 1.0),
"chocolate": (0.82, 0.41, 0.12, 1.0),
"tan": (0.82, 0.71, 0.55, 1.0),
"beige": (0.96, 0.96, 0.86, 1.0),
"coral": (1.0, 0.5, 0.31, 1.0),
"salmon": (0.98, 0.5, 0.45, 1.0),
"turquoise": (0.25, 0.88, 0.82, 1.0),
"aqua": (0.0, 1.0, 1.0, 1.0),
"plum": (0.87, 0.63, 0.87, 1.0),
"wheat": (0.96, 0.87, 0.70, 1.0),
}
@dataclass
class Texture:
color: tuple[float, float, float, float] = (1.0, 1.0, 1.0, 1.0)

View File

@ -0,0 +1,94 @@
import numpy as np
from konabot.plugins.marchtoy.texture import COLORS
class Formatter:
@staticmethod
def float4(m: np.ndarray) -> str:
if m.shape != (4, 4):
m = np.identity(4)
v_0 = ", ".join([str(x) for x in m[:, 0]])
v_1 = ", ".join([str(x) for x in m[:, 1]])
v_2 = ", ".join([str(x) for x in m[:, 2]])
v_3 = ", ".join([str(x) for x in m[:, 3]])
return f"float4x4(float4({v_0}), float4({v_1}), float4({v_2}), float4({v_3}))"
@staticmethod
def vec4(m: np.ndarray) -> str:
if m.shape != (4, 4):
m = np.identity(4)
v_0 = ", ".join([str(x) for x in m[:, 0]])
v_1 = ", ".join([str(x) for x in m[:, 1]])
v_2 = ", ".join([str(x) for x in m[:, 2]])
v_3 = ", ".join([str(x) for x in m[:, 3]])
return f"mat4(vec4({v_0}), vec4({v_1}), vec4({v_2}), vec4({v_3}))"
"""
TODO: 除零出现 nan 情况的单独处理
"""
class ArgParser:
@staticmethod
def as_float(args: list[str], default: float = 0.0) -> float:
try:
if len(args) >= 1:
x = float(args[0])
return x
except:
raise Exception(f"cannot parse {args}")
return default
@staticmethod
def as_vec2(
args: list[str], default: np.ndarray = np.array((0.0, 0.0))
) -> np.ndarray:
try:
if len(args) == 1:
x = float(args[0])
return np.array((x, x))
elif len(args) == 2:
x = float(args[0])
y = float(args[1])
return np.array((x, y))
except:
raise Exception(f"cannot parse {args}")
return default
@staticmethod
def as_vec3(
args: list[str], default: np.ndarray = np.array((0.0, 0.0, 0.0))
) -> np.ndarray:
try:
if len(args) == 1:
x = float(args[0])
return np.array((x, x, x))
elif len(args) == 3:
x = float(args[0])
y = float(args[1])
z = float(args[2])
return np.array((x, y, z))
except:
raise Exception(f"cannot parse {args}")
return default
@staticmethod
def as_vec4(
args: list[str], default: np.ndarray = np.array((0.0, 0.0, 0.0, 0.0))
) -> np.ndarray:
try:
if len(args) == 1:
x = float(args[0])
return np.array((x, x, x, x))
elif len(args) == 4:
x = float(args[0])
y = float(args[1])
z = float(args[2])
w = float(args[3])
return np.array((x, y, z, w))
except:
raise Exception(f"cannot parse {args}")
return default
@staticmethod
def as_literal_color(args: list[str], default=np.array((1.0, 1.0, 1.0, 1.0))):
if len(args) == 1 and args[0] in COLORS:
return np.array(COLORS[args[0]])
return default

View File

@ -34,6 +34,7 @@ from konabot.plugins.memepack.drawing.saying import (
draw_mnk, draw_mnk,
draw_pt, draw_pt,
draw_suan, draw_suan,
draw_vr,
) )
from konabot.plugins.memepack.drawing.watermark import draw_doubao_watermark from konabot.plugins.memepack.drawing.watermark import draw_doubao_watermark
@ -334,3 +335,29 @@ async def _(img: DepPILImage):
result_bytes = BytesIO() result_bytes = BytesIO()
result.save(result_bytes, format="PNG") result.save(result_bytes, format="PNG")
await doubao_cmd.send(await UniMessage().image(raw=result_bytes).export()) await doubao_cmd.send(await UniMessage().image(raw=result_bytes).export())
vrsay = on_alconna(
Alconna(
"vr说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写vr说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@vrsay.handle()
async def _(saying: list[str]):
img = await draw_vr("\n".join(saying))
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await vrsay.send(await UniMessage().image(raw=img_bytes).export())

View File

@ -16,6 +16,7 @@ dasuan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "dss.png").convert(
suan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "suanleba.png").convert("RGBA") suan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "suanleba.png").convert("RGBA")
cute_ten_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "tententen.png").convert("RGBA") cute_ten_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "tententen.png").convert("RGBA")
kio_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "kiosay.jpg").convert("RGBA") kio_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "kiosay.jpg").convert("RGBA")
vr_image = PIL.Image.open(ASSETS_PATH / 'img' / 'meme' / 'vr.jpg').convert("RGBA")
def _draw_geimao(saying: str): def _draw_geimao(saying: str):
@ -123,3 +124,24 @@ def draw_kiosay(saying: str):
) )
return img return img
@make_async
def draw_vr(saying: str):
img = vr_image.copy()
w, h = img.size
hw = 300
img2 = PIL.Image.new("RGBA", (w, h + hw), 'white')
img2.paste(img, (0, hw))
with imagetext_py.Writer(img2) as iw:
iw.draw_text_wrapped(
saying, w // 2, hw // 2 + 15, 0.5, 0.5, w, 64, LXGWWENKAI_REGULAR,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
1.0,
imagetext_py.TextAlign.Center,
draw_emojis=True,
)
return img2

View File

@ -3,15 +3,17 @@ from nonebot_plugin_alconna import Alconna, Args, Option, UniMessage, on_alconna
from konabot.common.nb.exc import BotExceptionMessage from konabot.common.nb.exc import BotExceptionMessage
from konabot.plugins.sksl.run_sksl import render_sksl_shader_to_gif from konabot.plugins.sksl.run_sksl import render_sksl_shader_to_gif
cmd_run_sksl = on_alconna(
Alconna(
"shadertool",
Option("--width", Args["width_", int]),
Option("--height", Args["height_", int]),
Option("--duration", Args["duration_", float]),
Option("--fps", Args["fps_", float]),
Args["code", str],
)
)
cmd_run_sksl = on_alconna(Alconna(
"shadertool",
Option("--width", Args["width_", int]),
Option("--height", Args["height_", int]),
Option("--duration", Args["duration_", float]),
Option("--fps", Args["fps_", float]),
Args["code", str],
))
@cmd_run_sksl.handle() @cmd_run_sksl.handle()
async def _( async def _(
@ -34,10 +36,12 @@ async def _(
if width_ > 640 or height_ > 640: if width_ > 640 or height_ > 640:
raise BotExceptionMessage("最大支持 640x640 啦!不要太大啦!") raise BotExceptionMessage("最大支持 640x640 啦!不要太大啦!")
code = code.strip("\"").strip("'") code = code.strip('"').strip("'")
try: try:
res = await render_sksl_shader_to_gif(code, width_, height_, duration_, fps_) res = await render_sksl_shader_to_gif(code, width_, height_, duration_, fps_)
await cmd_run_sksl.send(await UniMessage().image(raw=res).export()) await cmd_run_sksl.send(await UniMessage().image(raw=res).export())
except (ValueError, RuntimeError) as e: except (ValueError, RuntimeError) as e:
await cmd_run_sksl.send(await UniMessage().text(f"渲染时遇到了问题:\n\n{e}").export()) await cmd_run_sksl.send(
await UniMessage().text(f"渲染时遇到了问题:\n\n{e}").export()
)

View File

@ -24,7 +24,6 @@ def _pack_uniforms(uniforms_dict, width, height, time_val):
# 移除填充字节,使用紧凑布局 # 移除填充字节,使用紧凑布局
return time_bytes + res_bytes return time_bytes + res_bytes
def _render_sksl_shader_to_gif( def _render_sksl_shader_to_gif(
sksl_code: str, sksl_code: str,
width: int = 256, width: int = 256,
@ -152,4 +151,4 @@ async def render_sksl_shader_to_gif(
height, height,
duration, duration,
fps, fps,
) )

View File

@ -1,9 +1,11 @@
import asyncio import asyncio
import os
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import cast from typing import cast
import zipfile
from loguru import logger from loguru import logger
from nonebot import on_command from nonebot import on_command
@ -13,22 +15,99 @@ from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent
from nonebot.adapters.onebot.v11.bot import Bot as OB11Bot from nonebot.adapters.onebot.v11.bot import Bot as OB11Bot
from nonebot.adapters.onebot.v11.message import Message as OB11Message from nonebot.adapters.onebot.v11.message import Message as OB11Message
from konabot.common.artifact import ArtifactDepends, ensure_artifact, register_artifacts
from konabot.common.longtask import DepLongTaskTarget from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import TMP_PATH from konabot.common.path import BINARY_PATH, TMP_PATH
arti_typst_linux = ArtifactDepends(
url="https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-unknown-linux-musl.tar.xz",
sha256="a6044cbad2a954deb921167e257e120ac0a16b20339ec01121194ff9d394996d",
target=BINARY_PATH / "typst.tar.xz",
required_os="Linux",
required_arch="x86_64",
)
arti_typst_windows = ArtifactDepends(
url="https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-pc-windows-msvc.zip",
sha256="51353994ac83218c3497052e89b2c432c53b9d4439cdc1b361e2ea4798ebfc13",
target=BINARY_PATH / "typst.zip",
required_os="Windows",
required_arch="AMD64",
)
bin_path: Path | None = None
@arti_typst_linux.on_finished
async def _(downloaded: bool):
global bin_path
tar_path = arti_typst_linux.target
bin_path = BINARY_PATH / "typst"
if downloaded or not bin_path.exists():
bin_path.unlink(missing_ok=True)
process = await asyncio.create_subprocess_exec(
"tar",
"-xvf",
tar_path,
"--strip-components=1",
"-C",
BINARY_PATH,
"typst-x86_64-unknown-linux-musl/typst",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode != 0 or not bin_path.exists():
logger.warning(
"似乎没有成功解压 Typst 二进制文件,检查一下吧! "
f"stdout={stdout} stderr={stderr}"
)
else:
os.chmod(bin_path, 0o755)
@arti_typst_windows.on_finished
async def _(downloaded: bool):
global bin_path
zip_path = arti_typst_windows.target
bin_path = BINARY_PATH / "typst.exe"
if downloaded or not bin_path.exists():
bin_path.unlink(missing_ok=True)
with zipfile.ZipFile(zip_path, "r") as zf:
target_name = "typst-x86_64-pc-windows-msvc/typst.exe"
if target_name not in zf.namelist():
logger.warning("在 Zip 压缩包里面没有找到目标文件")
return
zf.extract(target_name, BINARY_PATH)
(BINARY_PATH / target_name).rename(bin_path)
(BINARY_PATH / "typst-x86_64-pc-windows-msvc").rmdir()
register_artifacts(arti_typst_linux)
register_artifacts(arti_typst_windows)
TEMPLATE_PATH = Path(__file__).parent / "template.typ" TEMPLATE_PATH = Path(__file__).parent / "template.typ"
TEMPLATE = TEMPLATE_PATH.read_text() TEMPLATE = TEMPLATE_PATH.read_text()
def render_sync(code: str) -> bytes: def render_sync(code: str) -> bytes | None:
global bin_path
if bin_path is None:
return
with TemporaryDirectory(dir=TMP_PATH) as tmpdirname: with TemporaryDirectory(dir=TMP_PATH) as tmpdirname:
temp_dir = Path(tmpdirname).resolve() temp_dir = Path(tmpdirname).resolve()
temp_typ = temp_dir / "page.typ" temp_typ = temp_dir / "page.typ"
temp_typ.write_text(TEMPLATE + "\n\n" + code) temp_typ.write_text(TEMPLATE + "\n\n" + code)
cmd = [ cmd = [
"typst", bin_path,
"compile", "compile",
temp_typ.name, temp_typ.name,
"--format", "--format",
@ -61,7 +140,7 @@ def render_sync(code: str) -> bytes:
return result_png.read_bytes() return result_png.read_bytes()
async def render(code: str) -> bytes: async def render(code: str) -> bytes | None:
task = asyncio.to_thread(lambda: render_sync(code)) task = asyncio.to_thread(lambda: render_sync(code))
return await task return await task
@ -70,7 +149,21 @@ cmd = on_command("typst")
@cmd.handle() @cmd.handle()
async def _(evt: Event, bot: Bot, msg: UniMsg, target: DepLongTaskTarget): async def _(
evt: Event,
bot: Bot,
msg: UniMsg,
target: DepLongTaskTarget,
):
global bin_path
# 对于本地机器,一般不会在应用启动时自动下载,这里再保证存在
await ensure_artifact(arti_typst_linux)
await ensure_artifact(arti_typst_windows)
if bin_path is None or not bin_path.exists():
logger.warning("当前环境不存在 Typst但仍然调用了")
return
typst_code = "" typst_code = ""
if isinstance(evt, OB11MessageEvent): if isinstance(evt, OB11MessageEvent):
if evt.reply is not None: if evt.reply is not None:
@ -92,6 +185,8 @@ async def _(evt: Event, bot: Bot, msg: UniMsg, target: DepLongTaskTarget):
try: try:
res = await render(typst_code) res = await render(typst_code)
if res is None:
raise FileNotFoundError("没有渲染出来内容")
except FileNotFoundError as e: except FileNotFoundError as e:
await target.send_message("渲染出错:内部错误") await target.send_message("渲染出错:内部错误")
raise e from e raise e from e

2297
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -19,7 +19,6 @@ dependencies = [
"imagetext-py (>=2.2.0,<3.0.0)", "imagetext-py (>=2.2.0,<3.0.0)",
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)", "opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
"returns (>=0.26.0,<0.27.0)", "returns (>=0.26.0,<0.27.0)",
"skia-python (>=138.0,<139.0)",
"nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)", "nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)",
"qrcode (>=8.2,<9.0)", "qrcode (>=8.2,<9.0)",
"nanoid (>=2.0.0,<3.0.0)", "nanoid (>=2.0.0,<3.0.0)",
@ -39,6 +38,8 @@ dependencies = [
"pytest-cov (>=7.0.0,<8.0.0)", "pytest-cov (>=7.0.0,<8.0.0)",
"aiosignal (>=1.4.0,<2.0.0)", "aiosignal (>=1.4.0,<2.0.0)",
"pytest-mock (>=3.15.1,<4.0.0)", "pytest-mock (>=3.15.1,<4.0.0)",
"skia-python (>=144.0.post2,<145.0)",
"moderngl (>=5.12.0,<6.0.0)",
] ]
[tool.poetry] [tool.poetry]

View File

@ -86,3 +86,67 @@ def test_prase_input_args_parses_resize_second_argument_as_float():
assert len(filters) == 1 assert len(filters) == 1
assert filters[0].name == "缩放" assert filters[0].name == "缩放"
assert filters[0].args == [2.0, 3.0] assert filters[0].args == [2.0, 3.0]
def test_apply_pixel_sort_keeps_image_mode_and_size():
"""测试 Pixel Sort 保持图像的 mode 和 size"""
image = Image.new("RGBA", (10, 10), (255, 0, 0, 128))
result = ImageFilterImplement.apply_pixel_sort(image)
assert result.size == image.size
assert result.mode == "RGBA"
def test_apply_pixel_sort_horizontal():
"""测试水平方向的 Pixel Sort"""
# 创建一个简单的渐变图像
image = Image.new("RGB", (5, 3))
# 第一行:红到蓝渐变
image.putpixel((0, 0), (255, 0, 0))
image.putpixel((1, 0), (200, 0, 0))
image.putpixel((2, 0), (100, 0, 0))
image.putpixel((3, 0), (50, 0, 0))
image.putpixel((4, 0), (0, 0, 255))
# 填充其他行
for y in range(1, 3):
for x in range(5):
image.putpixel((x, y), (128, 128, 128))
result = ImageFilterImplement.apply_pixel_sort(
image, direction="horizontal", auto_threshold=False, mask_threshold=10
)
assert result.size == image.size
assert result.mode == "RGBA"
def test_apply_pixel_sort_vertical():
"""测试垂直方向的 Pixel Sort"""
image = Image.new("RGB", (3, 5))
# 第一列:绿到红渐变
image.putpixel((0, 0), (0, 255, 0))
image.putpixel((0, 1), (0, 200, 0))
image.putpixel((0, 2), (0, 100, 0))
image.putpixel((0, 3), (0, 50, 0))
image.putpixel((0, 4), (255, 0, 0))
# 填充其他列
for y in range(5):
for x in range(1, 3):
image.putpixel((x, y), (128, 128, 128))
result = ImageFilterImplement.apply_pixel_sort(
image, direction="vertical", auto_threshold=False, mask_threshold=10
)
assert result.size == image.size
assert result.mode == "RGBA"
def test_prase_input_args_parses_pixel_sort_arguments():
"""测试解析 Pixel Sort 参数"""
filters = prase_input_args("像素排序 horizontal 0 false brightness 128 false 1")
assert len(filters) == 1
assert filters[0].name == "像素排序"
assert filters[0].args == ["horizontal", 0.0, False, "brightness", 128.0, False, 1]

View File

@ -0,0 +1,17 @@
import sys
from pathlib import Path
PLUGIN_DIR = Path(__file__).resolve().parents[1] / "konabot" / "plugins" / "marchtoy"
if str(PLUGIN_DIR) not in sys.path:
sys.path.insert(0, str(PLUGIN_DIR))
from obj import Transform
def test_translate_expression_puts_offset_in_matrix_column():
transform = Transform().translate(1.0, 2.0, 3.0)
expr = transform.p_expr()
assert "float4(-1.0, -2.0, -3.0, 1.0)" in expr