Compare commits
2 Commits
master
...
6a2fe11753
| Author | SHA1 | Date | |
|---|---|---|---|
|
6a2fe11753
|
|||
|
97e87c7ec3
|
4
.vscode/settings.json
vendored
4
.vscode/settings.json
vendored
@ -1,5 +1,3 @@
|
||||
{
|
||||
"python.REPL.enableREPLSmartSend": false,
|
||||
"python-envs.defaultEnvManager": "ms-python.python:poetry",
|
||||
"python-envs.defaultPackageManager": "ms-python.python:poetry"
|
||||
"python.REPL.enableREPLSmartSend": false
|
||||
}
|
||||
14
Dockerfile
14
Dockerfile
@ -1,16 +1,3 @@
|
||||
FROM alpine:latest AS artifacts
|
||||
|
||||
RUN apk add --no-cache curl xz
|
||||
WORKDIR /tmp
|
||||
|
||||
RUN mkdir -p /artifacts
|
||||
RUN curl -L -o typst.tar.xz "https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-unknown-linux-musl.tar.xz" \
|
||||
&& tar -xJf typst.tar.xz \
|
||||
&& mv typst-x86_64-unknown-linux-musl/typst /artifacts
|
||||
|
||||
RUN chmod -R +x /artifacts/
|
||||
|
||||
|
||||
FROM python:3.13-slim AS base
|
||||
|
||||
ENV VIRTUAL_ENV=/app/.venv \
|
||||
@ -51,7 +38,6 @@ RUN uv sync --no-install-project
|
||||
FROM base AS runtime
|
||||
|
||||
COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
|
||||
COPY --from=artifacts /artifacts/ /usr/local/bin/
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
|
||||
@ -1,9 +1,10 @@
|
||||
import asyncio
|
||||
from typing import Any, Awaitable, Callable
|
||||
import aiohttp
|
||||
import hashlib
|
||||
import platform
|
||||
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
import nonebot
|
||||
@ -14,6 +15,8 @@ from pydantic import BaseModel
|
||||
|
||||
@dataclass
|
||||
class ArtifactDepends:
|
||||
_Callback = Callable[[bool], Awaitable[Any]]
|
||||
|
||||
url: str
|
||||
sha256: str
|
||||
target: Path
|
||||
@ -27,6 +30,9 @@ class ArtifactDepends:
|
||||
use_proxy: bool = True
|
||||
"网络问题,赫赫;使用的是 Discord 模块配置的 proxy"
|
||||
|
||||
callbacks: list[_Callback] = field(default_factory=list)
|
||||
"在任务完成以后,应该做的事情"
|
||||
|
||||
def is_corresponding_platform(self) -> bool:
|
||||
if self.required_os is not None:
|
||||
if self.required_os.lower() != platform.system().lower():
|
||||
@ -36,26 +42,43 @@ class ArtifactDepends:
|
||||
return False
|
||||
return True
|
||||
|
||||
def on_finished(self, task: _Callback) -> _Callback:
|
||||
self.callbacks.append(task)
|
||||
return task
|
||||
|
||||
async def _finished(self, downloaded: bool) -> list[Any | BaseException]:
|
||||
tasks = set()
|
||||
for f in self.callbacks:
|
||||
tasks.add(f(downloaded))
|
||||
return await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
prefetch_artifact: bool = False
|
||||
"是否提前下载好二进制依赖"
|
||||
|
||||
|
||||
artifact_list = []
|
||||
artifact_list: list[ArtifactDepends] = []
|
||||
|
||||
|
||||
driver = nonebot.get_driver()
|
||||
config = nonebot.get_plugin_config(Config)
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
if config.prefetch_artifact:
|
||||
logger.info("启动检测中:正在检测需求的二进制是否下载")
|
||||
semaphore = asyncio.Semaphore(10)
|
||||
|
||||
async def _task(artifact: ArtifactDepends):
|
||||
async with semaphore:
|
||||
await ensure_artifact(artifact)
|
||||
downloaded = await ensure_artifact(artifact)
|
||||
result = await artifact._finished(downloaded)
|
||||
for r in result:
|
||||
if isinstance(r, BaseException):
|
||||
logger.warning("完成了二进制文件的下载,但是有未捕捉的错误")
|
||||
logger.exception(r)
|
||||
|
||||
tasks: set[asyncio.Task] = set()
|
||||
for a in artifact_list:
|
||||
@ -78,35 +101,43 @@ async def download_artifact(artifact: ArtifactDepends):
|
||||
async with aiohttp.ClientSession(proxy=proxy) as client:
|
||||
result = await client.get(artifact.url)
|
||||
if result.status != 200:
|
||||
logger.warning(f"已经下载了二进制,但是注意服务器没有返回 200! URL={artifact.url} TARGET={artifact.target} CODE={result.status}")
|
||||
logger.warning(
|
||||
f"已经下载了二进制,但是注意服务器没有返回 200! URL={artifact.url} TARGET={artifact.target} CODE={result.status}"
|
||||
)
|
||||
data = await result.read()
|
||||
artifact.target.write_bytes(data)
|
||||
if not platform.system().lower() == 'windows':
|
||||
if not platform.system().lower() == "windows":
|
||||
artifact.target.chmod(0o755)
|
||||
|
||||
logger.info(f"下载好了 TARGET={artifact.target} URL={artifact.url}")
|
||||
m = hashlib.sha256(artifact.target.read_bytes())
|
||||
if m.hexdigest().lower() != artifact.sha256.lower():
|
||||
logger.warning(f"下载到的二进制的 sha256 与需求不同 TARGET={artifact.target} REQUESTED={artifact.sha256} ACTUAL={m.hexdigest()}")
|
||||
logger.warning(
|
||||
f"下载到的二进制的 sha256 与需求不同 TARGET={artifact.target} REQUESTED={artifact.sha256} ACTUAL={m.hexdigest()}"
|
||||
)
|
||||
|
||||
|
||||
async def ensure_artifact(artifact: ArtifactDepends):
|
||||
async def ensure_artifact(artifact: ArtifactDepends) -> bool:
|
||||
if not artifact.is_corresponding_platform():
|
||||
return
|
||||
return False
|
||||
|
||||
if not artifact.target.exists():
|
||||
logger.info(f"二进制依赖 {artifact.target} 不存在")
|
||||
if not artifact.target.parent.exists():
|
||||
artifact.target.parent.mkdir(parents=True, exist_ok=True)
|
||||
await download_artifact(artifact)
|
||||
return True
|
||||
else:
|
||||
m = hashlib.sha256(artifact.target.read_bytes())
|
||||
if m.hexdigest().lower() != artifact.sha256.lower():
|
||||
logger.info(f"二进制依赖 {artifact.target} 的哈希无法对应需求的哈希,准备重新下载")
|
||||
logger.info(
|
||||
f"二进制依赖 {artifact.target} 的哈希无法对应需求的哈希,准备重新下载"
|
||||
)
|
||||
artifact.target.unlink()
|
||||
await download_artifact(artifact)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def register_artifacts(*artifacts: ArtifactDepends):
|
||||
artifact_list.extend(artifacts)
|
||||
|
||||
|
||||
@ -79,14 +79,6 @@ fx [滤镜名称] <参数1> <参数2> ...
|
||||
* ```fx JPEG损坏 <质量=10>```
|
||||
* 质量范围建议为 1~95,数值越低,压缩痕迹越重、效果越搞笑。
|
||||
* ```fx 动图 <帧率=10>```
|
||||
* ```fx 像素排序 <方向=horizontal> <阈值=0> <自动阈值=true> <排序依据=brightness> <遮罩阈值=128> <反向=false> <块大小=1>```
|
||||
* 对像素按指定属性进行排序,效果类似 Photoshop/GIMP Pixel Sort。
|
||||
* **方向**:horizontal(水平)/ vertical(垂直)
|
||||
* **排序依据**:brightness(亮度)/ hue(色相)/ red / green / blue
|
||||
* **自动阈值**:true 时使用图像亮度中位数作为遮罩阈值
|
||||
* **遮罩阈值**:决定哪些像素参与排序(亮度 >= 阈值)
|
||||
* **反向**:true 时从亮到暗排序
|
||||
* **块大小**:每 N 行/列作为一个整体排序单位
|
||||
|
||||
### 多图像处理器
|
||||
* ```fx 存入图像 <目标名称>```
|
||||
|
||||
@ -71,14 +71,6 @@ giftool [图片] [选项]
|
||||
|
||||
- 调整 GIF 图的速度。若为负数,则代表倒放。
|
||||
|
||||
### `--pingpong`(可选)
|
||||
|
||||
- 开启乒乓模式,生成正放-倒放拼接的 GIF 图。
|
||||
- 即播放完正向后,会倒放回去,形成往复循环效果。
|
||||
- 可与 `--speed` 配合使用,调整播放速度。
|
||||
- 示例:`giftool [图片] --pingpong`
|
||||
- 示例:`giftool [图片] --pingpong --speed 2.0`
|
||||
|
||||
## 使用方式
|
||||
|
||||
1. 发送指令前,请确保:
|
||||
|
||||
@ -31,16 +31,7 @@
|
||||
|
||||
- 用 `|` 连接多个操作,前一个的输出自动作为后一个的输入。
|
||||
- 用 `;` 分隔多条独立指令,它们各自产生输出,最终合并显示。
|
||||
- 用 `&&` / `||` 做最小 shell 风格条件执行:
|
||||
- `cmd1 && cmd2`:仅当 `cmd1` 成功时执行 `cmd2`
|
||||
- `cmd1 || cmd2`:仅当 `cmd1` 失败时执行 `cmd2`
|
||||
- 用 `!` 对一条 pipeline 的成功/失败取反。
|
||||
- 支持最小 bash-like `if ... then ... else ... fi` 语句。
|
||||
- 支持最小 bash-like `while ... do ... done` 循环。
|
||||
- 可使用内建真假命令:`true` / `false`。
|
||||
- 为避免滥用与卡死:
|
||||
- 同一用户同时只能运行 **一个** textfx 脚本
|
||||
- 单个脚本最长执行时间为 **60 秒**
|
||||
- 用 `>` 或 `>>` 把结果保存起来(见下文),被重定向的指令不会产生输出。
|
||||
|
||||
**例子**:把"HELLO"先反转,再转成摩斯电码:(转换为摩斯电码功能暂未实现)
|
||||
```
|
||||
@ -48,36 +39,6 @@ textfx reverse HELLO | morse en
|
||||
```
|
||||
→ 输出:`--- .-.. .-.. . ....`
|
||||
|
||||
**例子**:失败后兜底执行:
|
||||
```
|
||||
textfx test a = b || echo 不相等
|
||||
```
|
||||
→ 输出:`不相等`
|
||||
|
||||
**例子**:成功后继续执行:
|
||||
```
|
||||
textfx [ 2 -gt 1 ] && echo 条件成立
|
||||
```
|
||||
→ 输出:`条件成立`
|
||||
|
||||
**例子**:真正的 if 语句:
|
||||
```
|
||||
textfx if test a = b; then echo yes; else echo no; fi
|
||||
```
|
||||
→ 输出:`no`
|
||||
|
||||
**例子**:对条件取反:
|
||||
```
|
||||
textfx ! test a = b && echo 条件不成立
|
||||
```
|
||||
→ 输出:`条件不成立`
|
||||
|
||||
**例子**:while 循环:
|
||||
```
|
||||
textfx while false; do echo 不会执行; done
|
||||
```
|
||||
→ 输出为空
|
||||
|
||||
**例子**:多条指令各自输出:
|
||||
```
|
||||
textfx echo 你好; echo 世界
|
||||
@ -171,51 +132,6 @@ Base64 编码或解码。
|
||||
|
||||
> 缓存仅在当前对话中有效,重启后清空。
|
||||
|
||||
### true / false / test / [
|
||||
最小 shell 风格条件命令。通常配合 `if`、`&&`、`||`、`!` 使用。
|
||||
|
||||
支持:
|
||||
- `true`:总是成功
|
||||
- `false`:总是失败
|
||||
- 字符串非空:`test foo`
|
||||
- `-n` / `-z`:`test -n foo`、`test -z ""`
|
||||
- 字符串比较:`test a = a`、`test a != b`
|
||||
- 整数比较:`test 2 -gt 1`、`test 3 -le 5`
|
||||
- 方括号别名:`[ 2 -gt 1 ]`
|
||||
|
||||
示例:
|
||||
- `/textfx true && echo 一定执行`
|
||||
- `/textfx false || echo 兜底执行`
|
||||
- `/textfx test hello && echo 有内容`
|
||||
- `/textfx test a = b || echo 不相等`
|
||||
- `/textfx [ 3 -ge 2 ] && echo yes`
|
||||
|
||||
### if / then / else / fi
|
||||
支持最小 bash-like 条件语句。
|
||||
|
||||
示例:
|
||||
- `/textfx if test a = a; then echo yes; else echo no; fi`
|
||||
- `/textfx if [ 2 -gt 1 ]; then echo 成立; fi`
|
||||
- `/textfx if test a = a; then if test b = c; then echo x; else echo y; fi; fi`
|
||||
|
||||
说明:
|
||||
- `if` 后面跟一个条件链,可配合 `test`、`[`、`!`、`&&`、`||`
|
||||
- `then` 和 `else` 后面都可以写多条以 `;` 分隔的 textfx 语句
|
||||
- `else` 可省略
|
||||
|
||||
### while / do / done
|
||||
支持最小 bash-like 循环语句。
|
||||
|
||||
示例:
|
||||
- `/textfx while false; do echo 不会执行; done`
|
||||
- `/textfx while ! false; do false; done`
|
||||
- `/textfx while ! false; do if true; then false; fi; done`
|
||||
|
||||
说明:
|
||||
- `while` 后面跟一个条件链,返回成功就继续循环
|
||||
- `do` 后面可写多条以 `;` 分隔的 textfx 语句
|
||||
- 为避免 bot 死循环,内置最大循环次数限制;超限会报错
|
||||
|
||||
### replace(或 替换、sed)
|
||||
替换文字(支持正则表达式)。
|
||||
示例(普通):`/textfx replace 世界 宇宙 你好世界` → `你好宇宙`
|
||||
|
||||
@ -1354,140 +1354,6 @@ class ImageFilterImplement:
|
||||
images.append(text_image)
|
||||
return image
|
||||
|
||||
# Pixel Sort - 像素排序效果
|
||||
@staticmethod
|
||||
def apply_pixel_sort(
|
||||
image: Image.Image,
|
||||
direction: str = "horizontal",
|
||||
threshold: float = 0.0,
|
||||
auto_threshold: bool = True,
|
||||
sort_by: str = "brightness",
|
||||
mask_threshold: float = 128.0,
|
||||
reverse: bool = False,
|
||||
block_size: int = 1
|
||||
) -> Image.Image:
|
||||
"""
|
||||
Pixel Sort 效果
|
||||
|
||||
参数:
|
||||
image: 输入图像
|
||||
direction: 排序方向,"horizontal"(水平) 或 "vertical"(垂直)
|
||||
threshold: 亮度阈值 (0-255),低于此值的像素会被排序(仅在 auto_threshold=False 时生效)
|
||||
auto_threshold: 是否自动计算阈值(使用图像中位数)
|
||||
sort_by: 排序依据,"brightness"(亮度)、"hue"(色相)、"red"、"green"、"blue"
|
||||
mask_threshold: 遮罩阈值 (0-255),决定哪些像素参与排序
|
||||
reverse: 是否反向排序
|
||||
block_size: 块大小,每 N 行/列作为一个整体排序单位
|
||||
"""
|
||||
if image.mode != 'RGBA':
|
||||
image = image.convert('RGBA')
|
||||
|
||||
arr = np.array(image)
|
||||
height, width = arr.shape[:2]
|
||||
|
||||
# 获取排序属性
|
||||
def get_sort_value(pixel):
|
||||
r, g, b = pixel[0], pixel[1], pixel[2]
|
||||
if sort_by == "brightness":
|
||||
return 0.299 * r + 0.587 * g + 0.114 * b
|
||||
elif sort_by == "hue":
|
||||
max_c = max(r, g, b)
|
||||
min_c = min(r, g, b)
|
||||
diff = max_c - min_c
|
||||
if diff == 0:
|
||||
return 0
|
||||
if max_c == r:
|
||||
return 60 * (((g - b) / diff) % 6)
|
||||
elif max_c == g:
|
||||
return 60 * ((b - r) / diff + 2)
|
||||
else:
|
||||
return 60 * ((r - g) / diff + 4)
|
||||
elif sort_by == "red":
|
||||
return r
|
||||
elif sort_by == "green":
|
||||
return g
|
||||
elif sort_by == "blue":
|
||||
return b
|
||||
return 0.299 * r + 0.587 * g + 0.114 * b
|
||||
|
||||
# 自动计算阈值
|
||||
if auto_threshold:
|
||||
# 使用图像亮度中位数作为阈值
|
||||
gray = np.array(image.convert('L'))
|
||||
mask_threshold = float(np.median(gray))
|
||||
|
||||
# 创建遮罩:哪些像素需要排序
|
||||
mask = np.zeros((height, width), dtype=bool)
|
||||
for y in range(height):
|
||||
for x in range(width):
|
||||
brightness = 0.299 * arr[y, x, 0] + 0.587 * arr[y, x, 1] + 0.114 * arr[y, x, 2]
|
||||
mask[y, x] = brightness >= mask_threshold
|
||||
|
||||
result = arr.copy()
|
||||
|
||||
if direction.lower() in ["horizontal", "h", "水平"]:
|
||||
# 水平排序(逐行)
|
||||
for y in range(height):
|
||||
# 收集当前行中需要排序的像素
|
||||
if block_size > 1:
|
||||
# 按块处理
|
||||
for block_start in range(0, width, block_size):
|
||||
block_end = min(block_start + block_size, width)
|
||||
pixels = []
|
||||
indices = []
|
||||
for x in range(block_start, block_end):
|
||||
if mask[y, x]:
|
||||
pixels.append(arr[y, x].copy())
|
||||
indices.append(x)
|
||||
if len(pixels) > 1:
|
||||
# 按指定属性排序
|
||||
sorted_pixels = sorted(pixels, key=get_sort_value, reverse=reverse)
|
||||
for i, x in enumerate(indices):
|
||||
result[y, x] = sorted_pixels[i]
|
||||
else:
|
||||
# 逐像素处理
|
||||
pixels = []
|
||||
indices = []
|
||||
for x in range(width):
|
||||
if mask[y, x]:
|
||||
pixels.append(arr[y, x].copy())
|
||||
indices.append(x)
|
||||
if len(pixels) > 1:
|
||||
sorted_pixels = sorted(pixels, key=get_sort_value, reverse=reverse)
|
||||
for i, x in enumerate(indices):
|
||||
result[y, x] = sorted_pixels[i]
|
||||
|
||||
elif direction.lower() in ["vertical", "v", "垂直"]:
|
||||
# 垂直排序(逐列)
|
||||
for x in range(width):
|
||||
if block_size > 1:
|
||||
# 按块处理
|
||||
for block_start in range(0, height, block_size):
|
||||
block_end = min(block_start + block_size, height)
|
||||
pixels = []
|
||||
indices = []
|
||||
for y in range(block_start, block_end):
|
||||
if mask[y, x]:
|
||||
pixels.append(arr[y, x].copy())
|
||||
indices.append(y)
|
||||
if len(pixels) > 1:
|
||||
sorted_pixels = sorted(pixels, key=get_sort_value, reverse=reverse)
|
||||
for i, y in enumerate(indices):
|
||||
result[y, x] = sorted_pixels[i]
|
||||
else:
|
||||
pixels = []
|
||||
indices = []
|
||||
for y in range(height):
|
||||
if mask[y, x]:
|
||||
pixels.append(arr[y, x].copy())
|
||||
indices.append(y)
|
||||
if len(pixels) > 1:
|
||||
sorted_pixels = sorted(pixels, key=get_sort_value, reverse=reverse)
|
||||
for i, y in enumerate(indices):
|
||||
result[y, x] = sorted_pixels[i]
|
||||
|
||||
return Image.fromarray(result, 'RGBA')
|
||||
|
||||
|
||||
|
||||
class ImageFilterEmpty:
|
||||
|
||||
@ -65,8 +65,6 @@ class ImageFilterManager:
|
||||
"覆盖图像": ImageFilterImplement.apply_overlay,
|
||||
# 生成式
|
||||
"覆加颜色": ImageFilterImplement.generate_solid,
|
||||
# Pixel Sort
|
||||
"像素排序": ImageFilterImplement.apply_pixel_sort,
|
||||
}
|
||||
|
||||
generate_filter_map = {
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
from typing import cast
|
||||
import asyncio
|
||||
from loguru import logger
|
||||
from nonebot import on_command
|
||||
import nonebot
|
||||
@ -32,11 +31,8 @@ from konabot.plugins.handle_text.handlers.random_handlers import THShuffle, THSo
|
||||
from konabot.plugins.handle_text.handlers.unix_handlers import (
|
||||
THCat,
|
||||
THEcho,
|
||||
THFalse,
|
||||
THReplace,
|
||||
THRm,
|
||||
THTest,
|
||||
THTrue,
|
||||
)
|
||||
from konabot.plugins.handle_text.handlers.whitespace_handlers import (
|
||||
THLines,
|
||||
@ -47,37 +43,11 @@ from konabot.plugins.handle_text.handlers.whitespace_handlers import (
|
||||
)
|
||||
|
||||
|
||||
TEXTFX_MAX_RUNTIME_SECONDS = 60
|
||||
_textfx_running_users: set[str] = set()
|
||||
|
||||
|
||||
def _get_textfx_user_key(evt: Event) -> str:
|
||||
user_id = getattr(evt, "user_id", None)
|
||||
self_id = getattr(evt, "self_id", None)
|
||||
group_id = getattr(evt, "group_id", None)
|
||||
if user_id is not None:
|
||||
if group_id is not None:
|
||||
return f"{self_id}:{group_id}:{user_id}"
|
||||
return f"{self_id}:private:{user_id}"
|
||||
session_id = getattr(evt, "get_session_id", None)
|
||||
if callable(session_id):
|
||||
try:
|
||||
return f"session:{evt.get_session_id()}"
|
||||
except Exception:
|
||||
pass
|
||||
return f"event:{evt.__class__.__name__}:{id(evt)}"
|
||||
|
||||
|
||||
cmd = on_command(cmd="textfx", aliases={"处理文字", "处理文本"})
|
||||
|
||||
|
||||
@cmd.handle()
|
||||
async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
|
||||
user_key = _get_textfx_user_key(evt)
|
||||
if user_key in _textfx_running_users:
|
||||
await target.send_message("你当前已有一个 textfx 脚本正在运行,请等待它结束后再试。")
|
||||
return
|
||||
|
||||
istream = ""
|
||||
if isinstance(evt, OB11MessageEvent):
|
||||
if evt.reply is not None:
|
||||
@ -101,22 +71,9 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
|
||||
return
|
||||
|
||||
env = TextHandlerEnvironment(is_trusted=False, event=evt)
|
||||
results = await runner.run_pipeline(res, istream or None, env)
|
||||
|
||||
_textfx_running_users.add(user_key)
|
||||
try:
|
||||
results = await asyncio.wait_for(
|
||||
runner.run_pipeline(res, istream or None, env),
|
||||
timeout=TEXTFX_MAX_RUNTIME_SECONDS,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
rendered = await render_error_message(
|
||||
f"处理指令时出现问题:脚本执行超时(超过 {TEXTFX_MAX_RUNTIME_SECONDS} 秒)"
|
||||
)
|
||||
await target.send_message(rendered)
|
||||
return
|
||||
finally:
|
||||
_textfx_running_users.discard(user_key)
|
||||
|
||||
# 检查是否有错误
|
||||
for r in results:
|
||||
if r.code != 0:
|
||||
message = f"处理指令时出现问题:{r.ostream}"
|
||||
@ -124,6 +81,7 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
|
||||
await target.send_message(rendered)
|
||||
return
|
||||
|
||||
# 收集所有组的文本输出和附件
|
||||
ostreams = [r.ostream for r in results if r.ostream is not None]
|
||||
attachments = [r.attachment for r in results if r.attachment is not None]
|
||||
|
||||
@ -150,9 +108,6 @@ async def _():
|
||||
THCat(),
|
||||
THEcho(),
|
||||
THRm(),
|
||||
THTrue(),
|
||||
THFalse(),
|
||||
THTest(),
|
||||
THShuffle(),
|
||||
THReplace(),
|
||||
THBase64(),
|
||||
|
||||
@ -1,16 +1,15 @@
|
||||
import asyncio
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from string import whitespace
|
||||
from typing import cast
|
||||
|
||||
from loguru import logger
|
||||
from nonebot.adapters import Event
|
||||
|
||||
|
||||
MAX_WHILE_ITERATIONS = 100
|
||||
|
||||
|
||||
@dataclass
|
||||
class TextHandlerEnvironment:
|
||||
is_trusted: bool
|
||||
@ -54,63 +53,29 @@ class TextHandlerSync(TextHandler):
|
||||
|
||||
|
||||
@dataclass
|
||||
class Redirect:
|
||||
target: str
|
||||
append: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class CommandNode:
|
||||
name: str
|
||||
class PipelineCommand:
|
||||
handler: TextHandler
|
||||
args: list[str]
|
||||
redirects: list[Redirect] = field(default_factory=list)
|
||||
# 新增:重定向目标(buffer key)
|
||||
redirect_target: str | None = None
|
||||
# 新增:是否为追加模式 (>>)
|
||||
redirect_append: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class PipelineNode:
|
||||
commands: list[CommandNode] = field(default_factory=list)
|
||||
negate: bool = False
|
||||
class Pipeline:
|
||||
command_groups: list[list[PipelineCommand]] = field(default_factory=list)
|
||||
"一个列表的列表,每一组之间的指令之间使用管道符连接,而不同组之间不会有数据流"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConditionalPipeline:
|
||||
op: str | None
|
||||
pipeline: PipelineNode
|
||||
class PipelineParseStatus(Enum):
|
||||
normal = 0
|
||||
in_string = 1
|
||||
in_string_to_escape = 2
|
||||
off_string = 3
|
||||
|
||||
|
||||
@dataclass
|
||||
class CommandGroup:
|
||||
chains: list[ConditionalPipeline] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class IfNode:
|
||||
condition: CommandGroup
|
||||
then_body: "Script"
|
||||
else_body: "Script | None" = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class WhileNode:
|
||||
condition: CommandGroup
|
||||
body: "Script"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Script:
|
||||
statements: list[CommandGroup | IfNode | WhileNode] = field(default_factory=list)
|
||||
|
||||
|
||||
class TokenKind(Enum):
|
||||
WORD = "word"
|
||||
OP = "op"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Token:
|
||||
kind: TokenKind
|
||||
value: str
|
||||
whitespaces = whitespace + " "
|
||||
|
||||
|
||||
class PipelineRunner:
|
||||
@ -128,433 +93,198 @@ class PipelineRunner:
|
||||
def register(self, handler: TextHandler):
|
||||
self.handlers.append(handler)
|
||||
|
||||
def _resolve_handler(self, cmd_name: str) -> TextHandler | str:
|
||||
matched = [
|
||||
h for h in self.handlers if cmd_name == h.name or cmd_name in h.keywords
|
||||
]
|
||||
if not matched:
|
||||
return f"不存在名为 {cmd_name} 的函数"
|
||||
if len(matched) > 1:
|
||||
logger.warning(
|
||||
f"指令能对应超过一个文本处理器 CMD={cmd_name} handlers={self.handlers}"
|
||||
def parse_pipeline(self, script: str) -> Pipeline | str:
|
||||
pipeline = Pipeline()
|
||||
|
||||
# 当前正在构建的上下文
|
||||
current_group: list[PipelineCommand] = []
|
||||
current_command_args: list[str] = []
|
||||
|
||||
# 字符串解析状态
|
||||
status = PipelineParseStatus.normal
|
||||
current_string = ""
|
||||
current_string_raw = ""
|
||||
status_in_string_pair = ""
|
||||
has_token = False # 是否正在构建一个 token(区分空字符串和无 token)
|
||||
|
||||
# 重定向解析状态
|
||||
is_parsing_redirect_filename = False
|
||||
current_redirect_target: str | None = None
|
||||
current_redirect_append = False
|
||||
|
||||
# 辅助函数:将当前解析到的字符串 flush 到 参数列表 或 重定向目标
|
||||
def _flush_token():
|
||||
nonlocal \
|
||||
current_string, \
|
||||
current_string_raw, \
|
||||
is_parsing_redirect_filename, \
|
||||
current_redirect_target, \
|
||||
has_token
|
||||
if not has_token:
|
||||
return
|
||||
|
||||
if is_parsing_redirect_filename:
|
||||
current_redirect_target = current_string
|
||||
is_parsing_redirect_filename = False # 重定向文件名只取一个 token
|
||||
else:
|
||||
current_command_args.append(current_string)
|
||||
|
||||
current_string = ""
|
||||
current_string_raw = ""
|
||||
has_token = False
|
||||
|
||||
# 辅助函数:将当前指令 flush 到当前组
|
||||
def _flush_command() -> str | None:
|
||||
nonlocal \
|
||||
current_command_args, \
|
||||
current_redirect_target, \
|
||||
current_redirect_append
|
||||
if not current_command_args:
|
||||
return None
|
||||
|
||||
cmd_name = current_command_args[0]
|
||||
args = current_command_args[1:]
|
||||
|
||||
matched = [
|
||||
h for h in self.handlers if cmd_name in h.keywords or cmd_name == h.name
|
||||
]
|
||||
if not matched:
|
||||
return f"不存在名为 {cmd_name} 的函数"
|
||||
if len(matched) > 1:
|
||||
logger.warning(
|
||||
f"指令能对应超过一个文本处理器 CMD={cmd_name} handlers={self.handlers}"
|
||||
)
|
||||
|
||||
cmd = PipelineCommand(
|
||||
handler=matched[0],
|
||||
args=args,
|
||||
redirect_target=current_redirect_target,
|
||||
redirect_append=current_redirect_append,
|
||||
)
|
||||
return matched[0]
|
||||
current_group.append(cmd)
|
||||
|
||||
def tokenize(self, script: str) -> list[Token] | str:
|
||||
tokens: list[Token] = []
|
||||
buf = ""
|
||||
quote: str | None = None
|
||||
escape = False
|
||||
# 重置指令级状态
|
||||
current_command_args = []
|
||||
current_redirect_target = None
|
||||
current_redirect_append = False
|
||||
return None
|
||||
|
||||
# 使用索引遍历以支持 look-ahead (处理 >>)
|
||||
i = 0
|
||||
operators = {"|", ";", ">", "&&", "||", ">>", "!"}
|
||||
escape_map = {
|
||||
"n": "\n",
|
||||
"r": "\r",
|
||||
"t": "\t",
|
||||
"0": "\0",
|
||||
"a": "\a",
|
||||
"b": "\b",
|
||||
"f": "\f",
|
||||
"v": "\v",
|
||||
"\\": "\\",
|
||||
'"': '"',
|
||||
"'": "'",
|
||||
}
|
||||
length = len(script)
|
||||
|
||||
def flush_word(force: bool = False):
|
||||
nonlocal buf
|
||||
if buf or force:
|
||||
tokens.append(Token(TokenKind.WORD, buf))
|
||||
buf = ""
|
||||
|
||||
while i < len(script):
|
||||
while i < length:
|
||||
c = script[i]
|
||||
|
||||
if quote is not None:
|
||||
if escape:
|
||||
buf += escape_map.get(c, c)
|
||||
escape = False
|
||||
elif c == "\\":
|
||||
escape = True
|
||||
elif c == quote:
|
||||
quote = None
|
||||
flush_word(force=True) # 引号闭合时强制 flush,即使是空字符串
|
||||
else:
|
||||
buf += c
|
||||
i += 1
|
||||
continue
|
||||
match status:
|
||||
case PipelineParseStatus.normal:
|
||||
if c in whitespaces:
|
||||
_flush_token()
|
||||
|
||||
if c in "'\"":
|
||||
quote = c
|
||||
i += 1
|
||||
continue
|
||||
elif c in "'\"":
|
||||
status_in_string_pair = c
|
||||
status = PipelineParseStatus.in_string
|
||||
current_string_raw = ""
|
||||
has_token = True
|
||||
|
||||
if c.isspace():
|
||||
flush_word()
|
||||
i += 1
|
||||
continue
|
||||
elif c == "|":
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
# 管道符不结束 group,继续在 current_group 添加
|
||||
|
||||
two = script[i : i + 2]
|
||||
if two in operators:
|
||||
flush_word()
|
||||
tokens.append(Token(TokenKind.OP, two))
|
||||
i += 2
|
||||
continue
|
||||
elif c == ";":
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
# 分号结束 group
|
||||
if current_group:
|
||||
pipeline.command_groups.append(current_group)
|
||||
current_group = []
|
||||
|
||||
if c in {"|", ";", ">", "!"}:
|
||||
flush_word()
|
||||
tokens.append(Token(TokenKind.OP, c))
|
||||
i += 1
|
||||
continue
|
||||
elif c == ">":
|
||||
_flush_token() # 先结束之前的参数
|
||||
# 检查是否是 append 模式 (>>)
|
||||
if i + 1 < length and script[i + 1] == ">":
|
||||
current_redirect_append = True
|
||||
i += 1 # 跳过下一个 >
|
||||
else:
|
||||
current_redirect_append = False
|
||||
|
||||
if c == "\\":
|
||||
if i + 1 < len(script):
|
||||
i += 1
|
||||
buf += escape_map.get(script[i], script[i])
|
||||
else:
|
||||
buf += c
|
||||
i += 1
|
||||
continue
|
||||
# 标记下一个 token 为文件名
|
||||
is_parsing_redirect_filename = True
|
||||
|
||||
else:
|
||||
current_string += c
|
||||
has_token = True
|
||||
|
||||
case PipelineParseStatus.in_string:
|
||||
current_string_raw += c
|
||||
if c == status_in_string_pair:
|
||||
status = PipelineParseStatus.off_string
|
||||
elif c == "\\":
|
||||
status = PipelineParseStatus.in_string_to_escape
|
||||
else:
|
||||
current_string += c
|
||||
|
||||
case PipelineParseStatus.in_string_to_escape:
|
||||
escape_map = {
|
||||
"n": "\n",
|
||||
"r": "\r",
|
||||
"t": "\t",
|
||||
"0": "\0",
|
||||
"a": "\a",
|
||||
"b": "\b",
|
||||
"f": "\f",
|
||||
"v": "\v",
|
||||
"\\": "\\",
|
||||
}
|
||||
current_string += escape_map.get(c, c)
|
||||
status = PipelineParseStatus.in_string
|
||||
|
||||
case PipelineParseStatus.off_string:
|
||||
if c in whitespaces:
|
||||
_flush_token()
|
||||
status = PipelineParseStatus.normal
|
||||
elif c == "|":
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
status = PipelineParseStatus.normal
|
||||
elif c == ";":
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
if current_group:
|
||||
pipeline.command_groups.append(current_group)
|
||||
current_group = []
|
||||
status = PipelineParseStatus.normal
|
||||
elif c == ">":
|
||||
_flush_token()
|
||||
status = PipelineParseStatus.normal
|
||||
# 回退索引,让下一次循环进入 normal 状态的 > 处理逻辑
|
||||
i -= 1
|
||||
else:
|
||||
# 紧接着的字符继续作为当前字符串的一部分 (如 "abc"d)
|
||||
current_string += c
|
||||
current_string_raw = ""
|
||||
status = PipelineParseStatus.normal
|
||||
|
||||
buf += c
|
||||
i += 1
|
||||
|
||||
if quote is not None:
|
||||
return "存在未闭合的引号"
|
||||
if escape:
|
||||
buf += "\\"
|
||||
# 循环结束后的收尾
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
|
||||
flush_word()
|
||||
return tokens
|
||||
if current_group:
|
||||
pipeline.command_groups.append(current_group)
|
||||
|
||||
def parse_pipeline(self, script: str) -> Script | str:
|
||||
tokens = self.tokenize(script)
|
||||
if isinstance(tokens, str):
|
||||
return tokens
|
||||
if not tokens:
|
||||
return Script()
|
||||
|
||||
pos = 0
|
||||
|
||||
def peek(offset: int = 0) -> Token | None:
|
||||
idx = pos + offset
|
||||
return tokens[idx] if idx < len(tokens) else None
|
||||
|
||||
def consume() -> Token:
|
||||
nonlocal pos
|
||||
tok = tokens[pos]
|
||||
pos += 1
|
||||
return tok
|
||||
|
||||
def consume_if_op(value: str) -> bool:
|
||||
tok = peek()
|
||||
if tok is not None and tok.kind == TokenKind.OP and tok.value == value:
|
||||
consume()
|
||||
return True
|
||||
return False
|
||||
|
||||
def consume_if_word(value: str) -> bool:
|
||||
tok = peek()
|
||||
if tok is not None and tok.kind == TokenKind.WORD and tok.value == value:
|
||||
consume()
|
||||
return True
|
||||
return False
|
||||
|
||||
def expect_word(msg: str) -> Token | str:
|
||||
tok = peek()
|
||||
if tok is None or tok.kind != TokenKind.WORD:
|
||||
return msg
|
||||
return consume()
|
||||
|
||||
def parse_command() -> CommandNode | str:
|
||||
first = expect_word("缺少指令名")
|
||||
if isinstance(first, str):
|
||||
return first
|
||||
|
||||
handler = self._resolve_handler(first.value)
|
||||
if isinstance(handler, str):
|
||||
return handler
|
||||
|
||||
args: list[str] = []
|
||||
redirects: list[Redirect] = []
|
||||
|
||||
while True:
|
||||
tok = peek()
|
||||
if tok is None:
|
||||
break
|
||||
if tok.kind == TokenKind.OP and tok.value in {"|", ";", "&&", "||"}:
|
||||
break
|
||||
if tok.kind == TokenKind.OP and tok.value in {">", ">>"}:
|
||||
op_tok = consume()
|
||||
target = expect_word("重定向操作符后面需要缓存名")
|
||||
if isinstance(target, str):
|
||||
return target
|
||||
redirects.append(
|
||||
Redirect(target=target.value, append=op_tok.value == ">>")
|
||||
)
|
||||
continue
|
||||
if tok.kind != TokenKind.WORD:
|
||||
return f"无法解析的 token: {tok.value}"
|
||||
args.append(consume().value)
|
||||
|
||||
return CommandNode(
|
||||
name=first.value,
|
||||
handler=handler,
|
||||
args=args,
|
||||
redirects=redirects,
|
||||
)
|
||||
|
||||
def parse_pipe() -> PipelineNode | str:
|
||||
negate = False
|
||||
while consume_if_op("!"):
|
||||
negate = not negate
|
||||
|
||||
pipeline = PipelineNode(negate=negate)
|
||||
command = parse_command()
|
||||
if isinstance(command, str):
|
||||
return command
|
||||
pipeline.commands.append(command)
|
||||
|
||||
while True:
|
||||
tok = peek()
|
||||
if tok is None or tok.kind != TokenKind.OP or tok.value != "|":
|
||||
break
|
||||
consume()
|
||||
next_command = parse_command()
|
||||
if isinstance(next_command, str):
|
||||
return next_command
|
||||
pipeline.commands.append(next_command)
|
||||
|
||||
return pipeline
|
||||
|
||||
def parse_chain() -> CommandGroup | str:
|
||||
group = CommandGroup()
|
||||
first_pipeline = parse_pipe()
|
||||
if isinstance(first_pipeline, str):
|
||||
return first_pipeline
|
||||
group.chains.append(ConditionalPipeline(op=None, pipeline=first_pipeline))
|
||||
|
||||
while True:
|
||||
tok = peek()
|
||||
if tok is None or tok.kind != TokenKind.OP or tok.value not in {"&&", "||"}:
|
||||
break
|
||||
op = consume().value
|
||||
next_pipeline = parse_pipe()
|
||||
if isinstance(next_pipeline, str):
|
||||
return next_pipeline
|
||||
group.chains.append(ConditionalPipeline(op=op, pipeline=next_pipeline))
|
||||
|
||||
return group
|
||||
|
||||
def parse_if() -> IfNode | str:
|
||||
if not consume_if_word("if"):
|
||||
return "缺少 if"
|
||||
|
||||
condition = parse_chain()
|
||||
if isinstance(condition, str):
|
||||
return condition
|
||||
|
||||
consume_if_op(";")
|
||||
if not consume_if_word("then"):
|
||||
return "if 语句缺少 then"
|
||||
|
||||
then_body = parse_script(stop_words={"else", "fi"})
|
||||
if isinstance(then_body, str):
|
||||
return then_body
|
||||
|
||||
else_body: Script | None = None
|
||||
if consume_if_word("else"):
|
||||
else_body = parse_script(stop_words={"fi"})
|
||||
if isinstance(else_body, str):
|
||||
return else_body
|
||||
|
||||
if not consume_if_word("fi"):
|
||||
return "if 语句缺少 fi"
|
||||
|
||||
return IfNode(condition=condition, then_body=then_body, else_body=else_body)
|
||||
|
||||
def parse_while() -> WhileNode | str:
|
||||
if not consume_if_word("while"):
|
||||
return "缺少 while"
|
||||
|
||||
condition = parse_chain()
|
||||
if isinstance(condition, str):
|
||||
return condition
|
||||
|
||||
consume_if_op(";")
|
||||
if not consume_if_word("do"):
|
||||
return "while 语句缺少 do"
|
||||
|
||||
body = parse_script(stop_words={"done"})
|
||||
if isinstance(body, str):
|
||||
return body
|
||||
|
||||
if not consume_if_word("done"):
|
||||
return "while 语句缺少 done"
|
||||
|
||||
return WhileNode(condition=condition, body=body)
|
||||
|
||||
def parse_statement() -> CommandGroup | IfNode | WhileNode | str:
|
||||
tok = peek()
|
||||
if tok is not None and tok.kind == TokenKind.WORD:
|
||||
if tok.value == "if":
|
||||
return parse_if()
|
||||
if tok.value == "while":
|
||||
return parse_while()
|
||||
return parse_chain()
|
||||
|
||||
def parse_script(stop_words: set[str] | None = None) -> Script | str:
|
||||
parsed = Script()
|
||||
nonlocal pos
|
||||
|
||||
while pos < len(tokens):
|
||||
tok = peek()
|
||||
if tok is None:
|
||||
break
|
||||
|
||||
if stop_words and tok.kind == TokenKind.WORD and tok.value in stop_words:
|
||||
break
|
||||
|
||||
if tok.kind == TokenKind.OP and tok.value == ";":
|
||||
consume()
|
||||
continue
|
||||
|
||||
statement = parse_statement()
|
||||
if isinstance(statement, str):
|
||||
return statement
|
||||
parsed.statements.append(statement)
|
||||
|
||||
tok = peek()
|
||||
if tok is not None and tok.kind == TokenKind.OP and tok.value == ";":
|
||||
consume()
|
||||
|
||||
return parsed
|
||||
|
||||
parsed = parse_script()
|
||||
if isinstance(parsed, str):
|
||||
return parsed
|
||||
if pos != len(tokens):
|
||||
tok = tokens[pos]
|
||||
return f"无法解析的 token: {tok.value}"
|
||||
return parsed
|
||||
|
||||
async def _execute_command(
|
||||
self,
|
||||
command: CommandNode,
|
||||
istream: str | None,
|
||||
env: TextHandlerEnvironment,
|
||||
) -> TextHandleResult:
|
||||
logger.debug(
|
||||
f"Executing: {command.name} args={command.args} redirects={command.redirects}"
|
||||
)
|
||||
result = await command.handler.handle(env, istream, command.args)
|
||||
|
||||
if result.code != 0:
|
||||
return result
|
||||
|
||||
if command.redirects:
|
||||
content = result.ostream or ""
|
||||
for redirect in command.redirects:
|
||||
if redirect.append:
|
||||
old_content = env.buffers.get(redirect.target, "")
|
||||
env.buffers[redirect.target] = old_content + content
|
||||
else:
|
||||
env.buffers[redirect.target] = content
|
||||
return TextHandleResult(code=0, ostream=None, attachment=result.attachment)
|
||||
|
||||
return result
|
||||
|
||||
async def _execute_pipeline(
|
||||
self,
|
||||
pipeline: PipelineNode,
|
||||
istream: str | None,
|
||||
env: TextHandlerEnvironment,
|
||||
) -> TextHandleResult:
|
||||
current_stream = istream
|
||||
last_result = TextHandleResult(code=0, ostream=None)
|
||||
|
||||
for command in pipeline.commands:
|
||||
try:
|
||||
last_result = await self._execute_command(command, current_stream, env)
|
||||
except Exception as e:
|
||||
logger.error(f"Pipeline execution failed at {command.name}")
|
||||
logger.exception(e)
|
||||
return TextHandleResult(code=-1, ostream="处理流水线时出现 python 错误")
|
||||
|
||||
if last_result.code != 0:
|
||||
if pipeline.negate:
|
||||
return TextHandleResult(code=0, ostream=None)
|
||||
return last_result
|
||||
current_stream = last_result.ostream
|
||||
|
||||
if pipeline.negate:
|
||||
return TextHandleResult(code=1, ostream=None)
|
||||
return last_result
|
||||
|
||||
async def _execute_group(
|
||||
self,
|
||||
group: CommandGroup,
|
||||
istream: str | None,
|
||||
env: TextHandlerEnvironment,
|
||||
) -> TextHandleResult:
|
||||
last_result = TextHandleResult(code=0, ostream=None)
|
||||
|
||||
for chain in group.chains:
|
||||
should_run = True
|
||||
if chain.op == "&&":
|
||||
should_run = last_result.code == 0
|
||||
elif chain.op == "||":
|
||||
should_run = last_result.code != 0
|
||||
|
||||
if should_run:
|
||||
last_result = await self._execute_pipeline(chain.pipeline, istream, env)
|
||||
|
||||
return last_result
|
||||
|
||||
async def _execute_if(
|
||||
self,
|
||||
if_node: IfNode,
|
||||
istream: str | None,
|
||||
env: TextHandlerEnvironment,
|
||||
) -> TextHandleResult:
|
||||
condition_result = await self._execute_group(if_node.condition, istream, env)
|
||||
if condition_result.code == 0:
|
||||
results = await self.run_pipeline(if_node.then_body, istream, env)
|
||||
else:
|
||||
results = (
|
||||
await self.run_pipeline(if_node.else_body, istream, env)
|
||||
if if_node.else_body is not None
|
||||
else [TextHandleResult(code=0, ostream=None)]
|
||||
)
|
||||
return results[-1] if results else TextHandleResult(code=0, ostream=None)
|
||||
|
||||
async def _execute_while(
|
||||
self,
|
||||
while_node: WhileNode,
|
||||
istream: str | None,
|
||||
env: TextHandlerEnvironment,
|
||||
) -> TextHandleResult:
|
||||
last_result = TextHandleResult(code=0, ostream=None)
|
||||
|
||||
for _ in range(MAX_WHILE_ITERATIONS):
|
||||
condition_result = await self._execute_group(while_node.condition, istream, env)
|
||||
if condition_result.code != 0:
|
||||
return last_result
|
||||
|
||||
body_results = await self.run_pipeline(while_node.body, istream, env)
|
||||
if body_results:
|
||||
last_result = body_results[-1]
|
||||
if last_result.code != 0:
|
||||
return last_result
|
||||
|
||||
return TextHandleResult(
|
||||
code=2,
|
||||
ostream=f"while 循环超过最大迭代次数限制({MAX_WHILE_ITERATIONS})",
|
||||
)
|
||||
return pipeline
|
||||
|
||||
async def run_pipeline(
|
||||
self,
|
||||
pipeline: Script,
|
||||
pipeline: Pipeline,
|
||||
istream: str | None,
|
||||
env: TextHandlerEnvironment | None = None,
|
||||
) -> list[TextHandleResult]:
|
||||
@ -563,21 +293,54 @@ class PipelineRunner:
|
||||
|
||||
results: list[TextHandleResult] = []
|
||||
|
||||
for statement in pipeline.statements:
|
||||
try:
|
||||
if isinstance(statement, IfNode):
|
||||
results.append(await self._execute_if(statement, istream, env))
|
||||
elif isinstance(statement, WhileNode):
|
||||
results.append(await self._execute_while(statement, istream, env))
|
||||
else:
|
||||
results.append(await self._execute_group(statement, istream, env))
|
||||
except Exception as e:
|
||||
logger.error(f"Pipeline execution failed: {e}")
|
||||
logger.exception(e)
|
||||
results.append(
|
||||
TextHandleResult(code=-1, ostream="处理流水线时出现 python 错误")
|
||||
)
|
||||
return results
|
||||
# 遍历执行指令组 (分号分隔),每个组独立产生输出
|
||||
for group in pipeline.command_groups:
|
||||
current_stream = istream
|
||||
group_result = TextHandleResult(code=0, ostream=None)
|
||||
|
||||
# 遍历组内指令 (管道分隔)
|
||||
for cmd in group:
|
||||
try:
|
||||
logger.debug(
|
||||
f"Executing: {cmd.handler.name} args={cmd.args} redirect={cmd.redirect_target}"
|
||||
)
|
||||
result = await cmd.handler.handle(env, current_stream, cmd.args)
|
||||
|
||||
if result.code != 0:
|
||||
# 组内出错,整条流水线中止
|
||||
results.append(result)
|
||||
return results
|
||||
|
||||
# 处理重定向逻辑
|
||||
if cmd.redirect_target:
|
||||
content_to_write = result.ostream or ""
|
||||
target_buffer = cmd.redirect_target
|
||||
|
||||
if cmd.redirect_append:
|
||||
old_content = env.buffers.get(target_buffer, "")
|
||||
env.buffers[target_buffer] = old_content + content_to_write
|
||||
else:
|
||||
env.buffers[target_buffer] = content_to_write
|
||||
|
||||
current_stream = None
|
||||
group_result = TextHandleResult(
|
||||
code=0, ostream=None, attachment=result.attachment
|
||||
)
|
||||
else:
|
||||
current_stream = result.ostream
|
||||
group_result = result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Pipeline execution failed at {cmd.handler.name}")
|
||||
logger.exception(e)
|
||||
results.append(
|
||||
TextHandleResult(
|
||||
code=-1, ostream="处理流水线时出现 python 错误"
|
||||
)
|
||||
)
|
||||
return results
|
||||
|
||||
results.append(group_result)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
@ -13,8 +13,10 @@ class THEcho(TextHandler):
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
# echo 不读 stdin,只输出参数(Unix 语义)
|
||||
# 无参数时输出空行(与 Unix echo 行为一致)
|
||||
if len(args) == 0 and istream is None:
|
||||
return TextHandleResult(1, "请在 echo 后面添加需要输出的文本")
|
||||
if istream is not None:
|
||||
return TextHandleResult(0, "\n".join([istream] + args))
|
||||
return TextHandleResult(0, "\n".join(args))
|
||||
|
||||
|
||||
@ -24,6 +26,7 @@ class THCat(TextHandler):
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
# No args: pass through stdin (like Unix cat with no arguments)
|
||||
if len(args) == 0:
|
||||
if istream is None:
|
||||
return TextHandleResult(
|
||||
@ -32,6 +35,7 @@ class THCat(TextHandler):
|
||||
)
|
||||
return TextHandleResult(0, istream)
|
||||
|
||||
# Concatenate all specified sources in order
|
||||
parts: list[str] = []
|
||||
for arg in args:
|
||||
if arg == "-":
|
||||
@ -70,6 +74,7 @@ class THReplace(TextHandler):
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
# 用法: replace <pattern> <replacement> [text]
|
||||
if len(args) < 2:
|
||||
return TextHandleResult(1, "用法:replace <正则> <替换内容> [文本]")
|
||||
|
||||
@ -85,77 +90,3 @@ class THReplace(TextHandler):
|
||||
return TextHandleResult(0, res)
|
||||
except Exception as e:
|
||||
return TextHandleResult(1, f"正则错误: {str(e)}")
|
||||
|
||||
|
||||
class THTrue(TextHandler):
|
||||
name = "true"
|
||||
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
return TextHandleResult(0, istream)
|
||||
|
||||
|
||||
class THFalse(TextHandler):
|
||||
name = "false"
|
||||
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
return TextHandleResult(1, None)
|
||||
|
||||
|
||||
class THTest(TextHandler):
|
||||
name = "test"
|
||||
keywords = ["["]
|
||||
|
||||
def _bool_result(self, value: bool) -> TextHandleResult:
|
||||
return TextHandleResult(0 if value else 1, None)
|
||||
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
expr = list(args)
|
||||
|
||||
# 支持方括号语法:[ expr ] 会自动移除末尾的 ]
|
||||
if expr and expr[-1] == "]":
|
||||
expr = expr[:-1]
|
||||
|
||||
if not expr:
|
||||
return TextHandleResult(1, None)
|
||||
|
||||
if len(expr) == 1:
|
||||
return self._bool_result(len(expr[0]) > 0)
|
||||
|
||||
if len(expr) == 2:
|
||||
op, value = expr
|
||||
if op == "-n":
|
||||
return self._bool_result(len(value) > 0)
|
||||
if op == "-z":
|
||||
return self._bool_result(len(value) == 0)
|
||||
return TextHandleResult(2, f"test 不支持的表达式: {' '.join(args)}")
|
||||
|
||||
if len(expr) == 3:
|
||||
left, op, right = expr
|
||||
if op == "=":
|
||||
return self._bool_result(left == right)
|
||||
if op == "!=":
|
||||
return self._bool_result(left != right)
|
||||
if op in {"-eq", "-ne", "-gt", "-ge", "-lt", "-le"}:
|
||||
try:
|
||||
li = int(left)
|
||||
ri = int(right)
|
||||
except ValueError:
|
||||
return TextHandleResult(2, "test 的数字比较参数必须是整数")
|
||||
mapping = {
|
||||
"-eq": li == ri,
|
||||
"-ne": li != ri,
|
||||
"-gt": li > ri,
|
||||
"-ge": li >= ri,
|
||||
"-lt": li < ri,
|
||||
"-le": li <= ri,
|
||||
}
|
||||
return self._bool_result(mapping[op])
|
||||
return TextHandleResult(2, f"test 不支持的操作符: {op}")
|
||||
|
||||
return TextHandleResult(2, f"test 不支持的表达式: {' '.join(args)}")
|
||||
|
||||
@ -6,7 +6,7 @@ import PIL
|
||||
import PIL.Image
|
||||
import cv2
|
||||
import imageio.v3 as iio
|
||||
from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, UniMessage, on_alconna
|
||||
from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna
|
||||
import numpy
|
||||
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
@ -34,7 +34,6 @@ cmd_giftool = on_alconna(
|
||||
Option("-t", Args["length", str]),
|
||||
Option("-to", Args["end_point", str]),
|
||||
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
|
||||
Option("--pingpong"),
|
||||
)
|
||||
)
|
||||
|
||||
@ -47,7 +46,6 @@ async def _(
|
||||
length: str | None = None,
|
||||
speed_factor: float = 1.0,
|
||||
end_point: str | None = None,
|
||||
pingpong: Query[bool] = Query("pingpong"),
|
||||
):
|
||||
ss: None | float = None
|
||||
if start_point:
|
||||
@ -164,16 +162,6 @@ async def _(
|
||||
rframes = rframes[::-1]
|
||||
rdur_ms = rdur_ms[::-1]
|
||||
|
||||
# 处理 pingpong 模式
|
||||
if pingpong.available:
|
||||
# 复制一份反转的帧序列(去掉第一帧避免重复)
|
||||
pingpong_frames = rframes[1:][::-1] if len(rframes) > 1 else rframes[::-1]
|
||||
pingpong_durations = rdur_ms[1:][::-1] if len(rdur_ms) > 1 else rdur_ms[::-1]
|
||||
|
||||
# 拼接正放和倒放
|
||||
rframes = rframes + pingpong_frames
|
||||
rdur_ms = rdur_ms + pingpong_durations
|
||||
|
||||
output_img = BytesIO()
|
||||
|
||||
if rframes:
|
||||
|
||||
@ -1,210 +0,0 @@
|
||||
import copy
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
import nonebot
|
||||
from nonebot import on_command
|
||||
from nonebot.adapters import Bot, Event, Message
|
||||
from nonebot.log import logger
|
||||
from nonebot.message import handle_event
|
||||
from nonebot.params import CommandArg
|
||||
|
||||
from konabot.common.database import DatabaseManager
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
|
||||
ROOT_PATH = Path(__file__).resolve().parent
|
||||
|
||||
cmd = on_command(cmd="语法糖", aliases={"糖", "sugar"}, block=True)
|
||||
|
||||
db_manager = DatabaseManager()
|
||||
driver = nonebot.get_driver()
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def register_startup_hook():
|
||||
await init_db()
|
||||
|
||||
|
||||
@driver.on_shutdown
|
||||
async def register_shutdown_hook():
|
||||
await db_manager.close_all_connections()
|
||||
|
||||
|
||||
async def init_db():
|
||||
await db_manager.execute_by_sql_file(ROOT_PATH / "sql" / "create_table.sql")
|
||||
|
||||
table_info = await db_manager.query("PRAGMA table_info(syntactic_sugar)")
|
||||
columns = {str(row.get("name")) for row in table_info}
|
||||
if "channel_id" not in columns:
|
||||
await db_manager.execute(
|
||||
"ALTER TABLE syntactic_sugar ADD COLUMN channel_id VARCHAR(255) NOT NULL DEFAULT ''"
|
||||
)
|
||||
|
||||
await db_manager.execute("DROP INDEX IF EXISTS idx_syntactic_sugar_name_belong_to")
|
||||
await db_manager.execute(
|
||||
"CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target "
|
||||
"ON syntactic_sugar(name, channel_id, belong_to)"
|
||||
)
|
||||
|
||||
|
||||
def _extract_reply_plain_text(evt: Event) -> str:
|
||||
reply = getattr(evt, "reply", None)
|
||||
if reply is None:
|
||||
return ""
|
||||
|
||||
reply_message = getattr(reply, "message", None)
|
||||
if reply_message is None:
|
||||
return ""
|
||||
|
||||
extract_plain_text = getattr(reply_message, "extract_plain_text", None)
|
||||
if callable(extract_plain_text):
|
||||
return extract_plain_text().strip()
|
||||
return str(reply_message).strip()
|
||||
|
||||
|
||||
def _split_variables(tokens: list[str]) -> tuple[list[str], dict[str, str]]:
|
||||
positional: list[str] = []
|
||||
named: dict[str, str] = {}
|
||||
|
||||
for token in tokens:
|
||||
if "=" in token:
|
||||
key, value = token.split("=", 1)
|
||||
key = key.strip()
|
||||
if key:
|
||||
named[key] = value
|
||||
continue
|
||||
positional.append(token)
|
||||
|
||||
return positional, named
|
||||
|
||||
|
||||
def _render_template(content: str, positional: list[str], named: dict[str, str]) -> str:
|
||||
def replace(match: re.Match[str]) -> str:
|
||||
key = match.group(1).strip()
|
||||
if key.isdigit():
|
||||
idx = int(key) - 1
|
||||
if 0 <= idx < len(positional):
|
||||
return positional[idx]
|
||||
return match.group(0)
|
||||
return named.get(key, match.group(0))
|
||||
|
||||
return re.sub(r"\{([^{}]+)\}", replace, content)
|
||||
|
||||
|
||||
async def _store_sugar(name: str, content: str, belong_to: str, channel_id: str):
|
||||
await db_manager.execute_by_sql_file(
|
||||
ROOT_PATH / "sql" / "insert_sugar.sql",
|
||||
(name, content, belong_to, channel_id),
|
||||
)
|
||||
|
||||
|
||||
async def _delete_sugar(name: str, belong_to: str, channel_id: str):
|
||||
await db_manager.execute(
|
||||
"DELETE FROM syntactic_sugar WHERE name = ? AND belong_to = ? AND channel_id = ?",
|
||||
(name, belong_to, channel_id),
|
||||
)
|
||||
|
||||
|
||||
async def _find_sugar(name: str, belong_to: str, channel_id: str) -> str | None:
|
||||
rows = await db_manager.query(
|
||||
(
|
||||
"SELECT content FROM syntactic_sugar "
|
||||
"WHERE name = ? AND channel_id = ? "
|
||||
"ORDER BY CASE WHEN belong_to = ? THEN 0 ELSE 1 END, id ASC "
|
||||
"LIMIT 1"
|
||||
),
|
||||
(name, channel_id, belong_to),
|
||||
)
|
||||
if not rows:
|
||||
return None
|
||||
return rows[0].get("content")
|
||||
|
||||
|
||||
async def _reinject_command(bot: Bot, evt: Event, command_text: str) -> bool:
|
||||
depth = int(getattr(evt, "_syntactic_sugar_depth", 0))
|
||||
if depth >= 3:
|
||||
return False
|
||||
|
||||
try:
|
||||
cloned_evt = copy.deepcopy(evt)
|
||||
except Exception:
|
||||
logger.exception("语法糖克隆事件失败")
|
||||
return False
|
||||
|
||||
message = getattr(cloned_evt, "message", None)
|
||||
if message is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
msg_obj = type(message)(command_text)
|
||||
except Exception:
|
||||
msg_obj = command_text
|
||||
|
||||
setattr(cloned_evt, "message", msg_obj)
|
||||
if hasattr(cloned_evt, "original_message"):
|
||||
setattr(cloned_evt, "original_message", msg_obj)
|
||||
if hasattr(cloned_evt, "raw_message"):
|
||||
setattr(cloned_evt, "raw_message", command_text)
|
||||
|
||||
setattr(cloned_evt, "_syntactic_sugar_depth", depth + 1)
|
||||
|
||||
try:
|
||||
await handle_event(bot, cloned_evt)
|
||||
except Exception:
|
||||
logger.exception("语法糖回注事件失败")
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@cmd.handle()
|
||||
async def _(bot: Bot, evt: Event, target: DepLongTaskTarget, args: Message = CommandArg()):
|
||||
raw = args.extract_plain_text().strip()
|
||||
if not raw:
|
||||
return
|
||||
|
||||
tokens = raw.split()
|
||||
action = tokens[0]
|
||||
target_id = target.target_id
|
||||
channel_id = target.channel_id
|
||||
|
||||
if action == "存入":
|
||||
if len(tokens) < 2:
|
||||
await cmd.finish("请提供要存入的名称")
|
||||
name = tokens[1].strip()
|
||||
content = " ".join(tokens[2:]).strip()
|
||||
if not content:
|
||||
content = _extract_reply_plain_text(evt)
|
||||
if not content:
|
||||
await cmd.finish("请提供要存入的内容")
|
||||
|
||||
await _store_sugar(name, content, target_id, channel_id)
|
||||
await cmd.finish(f"糖已存入:「{name}」!")
|
||||
|
||||
if action == "删除":
|
||||
if len(tokens) < 2:
|
||||
await cmd.finish("请提供要删除的名称")
|
||||
name = tokens[1].strip()
|
||||
await _delete_sugar(name, target_id, channel_id)
|
||||
await cmd.finish(f"已删除糖:「{name}」!")
|
||||
|
||||
if action == "查看":
|
||||
if len(tokens) < 2:
|
||||
await cmd.finish("请提供要查看的名称")
|
||||
name = tokens[1].strip()
|
||||
content = await _find_sugar(name, target_id, channel_id)
|
||||
if content is None:
|
||||
await cmd.finish(f"没有糖:「{name}」")
|
||||
await cmd.finish(f"糖的内容:「{content}」")
|
||||
|
||||
|
||||
name = action
|
||||
content = await _find_sugar(name, target_id, channel_id)
|
||||
if content is None:
|
||||
await cmd.finish(f"没有糖:「{name}」")
|
||||
|
||||
positional, named = _split_variables(tokens[1:])
|
||||
rendered = _render_template(content, positional, named)
|
||||
|
||||
ok = await _reinject_command(bot, evt, rendered)
|
||||
if not ok:
|
||||
await cmd.finish(f"糖的展开结果:「{rendered}」")
|
||||
@ -1,12 +0,0 @@
|
||||
-- 创建语法糖表
|
||||
CREATE TABLE IF NOT EXISTS syntactic_sugar (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
content TEXT NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
belong_to VARCHAR(255) NOT NULL,
|
||||
channel_id VARCHAR(255) NOT NULL DEFAULT ''
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target
|
||||
ON syntactic_sugar(name, channel_id, belong_to);
|
||||
@ -1,5 +0,0 @@
|
||||
-- 插入语法糖,如果同一用户下名称已存在则更新内容
|
||||
INSERT INTO syntactic_sugar (name, content, belong_to, channel_id)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(name, channel_id, belong_to) DO UPDATE SET
|
||||
content = excluded.content;
|
||||
@ -1,9 +1,11 @@
|
||||
import asyncio
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import cast
|
||||
import zipfile
|
||||
|
||||
from loguru import logger
|
||||
from nonebot import on_command
|
||||
@ -13,22 +15,97 @@ from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent
|
||||
from nonebot.adapters.onebot.v11.bot import Bot as OB11Bot
|
||||
from nonebot.adapters.onebot.v11.message import Message as OB11Message
|
||||
|
||||
from konabot.common.artifact import ArtifactDepends, register_artifacts
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.path import TMP_PATH
|
||||
from konabot.common.path import BINARY_PATH, TMP_PATH
|
||||
|
||||
|
||||
arti_typst_linux = ArtifactDepends(
|
||||
url="https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-unknown-linux-musl.tar.xz",
|
||||
sha256="a6044cbad2a954deb921167e257e120ac0a16b20339ec01121194ff9d394996d",
|
||||
target=BINARY_PATH / "typst.tar.xz",
|
||||
required_os="Linux",
|
||||
required_arch="x86_64",
|
||||
)
|
||||
arti_typst_windows = ArtifactDepends(
|
||||
url="https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-pc-windows-msvc.zip",
|
||||
sha256="51353994ac83218c3497052e89b2c432c53b9d4439cdc1b361e2ea4798ebfc13",
|
||||
target=BINARY_PATH / "typst.zip",
|
||||
required_os="Windows",
|
||||
required_arch="AMD64",
|
||||
)
|
||||
|
||||
|
||||
bin_path: Path | None = None
|
||||
|
||||
|
||||
@arti_typst_linux.on_finished
|
||||
async def _(downloaded: bool):
|
||||
global bin_path
|
||||
|
||||
tar_path = arti_typst_linux.target
|
||||
bin_path = tar_path.with_name("typst")
|
||||
|
||||
if downloaded or not bin_path.exists():
|
||||
bin_path.unlink(missing_ok=True)
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
"tar",
|
||||
"-xvf",
|
||||
tar_path,
|
||||
"--strip-components=1",
|
||||
"-C",
|
||||
BINARY_PATH,
|
||||
"typst-x86_64-unknown-linux-musl/typst",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
stdout, stderr = await process.communicate()
|
||||
if process.returncode != 0 or not bin_path.exists():
|
||||
logger.warning(
|
||||
"似乎没有成功解压 Typst 二进制文件,检查一下吧! "
|
||||
f"stdout={stdout} stderr={stderr}"
|
||||
)
|
||||
else:
|
||||
os.chmod(bin_path, 0o755)
|
||||
|
||||
|
||||
@arti_typst_windows.on_finished
|
||||
async def _(downloaded: bool):
|
||||
global bin_path
|
||||
zip_path = arti_typst_windows.target
|
||||
bin_path = BINARY_PATH / "typst.exe"
|
||||
|
||||
if downloaded or not bin_path.exists():
|
||||
bin_path.unlink(missing_ok=True)
|
||||
with zipfile.ZipFile(zip_path, "r") as zf:
|
||||
target_name = "typst-x86_64-pc-windows-msvc/typst.exe"
|
||||
if target_name not in zf.namelist():
|
||||
logger.warning("在 Zip 压缩包里面没有找到目标文件")
|
||||
return
|
||||
zf.extract(target_name, BINARY_PATH)
|
||||
(BINARY_PATH / target_name).rename(bin_path)
|
||||
(BINARY_PATH / "typst-x86_64-pc-windows-msvc").rmdir()
|
||||
|
||||
|
||||
register_artifacts(arti_typst_linux)
|
||||
register_artifacts(arti_typst_windows)
|
||||
|
||||
|
||||
TEMPLATE_PATH = Path(__file__).parent / "template.typ"
|
||||
TEMPLATE = TEMPLATE_PATH.read_text()
|
||||
|
||||
|
||||
def render_sync(code: str) -> bytes:
|
||||
def render_sync(code: str) -> bytes | None:
|
||||
if bin_path is None:
|
||||
return
|
||||
|
||||
with TemporaryDirectory(dir=TMP_PATH) as tmpdirname:
|
||||
temp_dir = Path(tmpdirname).resolve()
|
||||
temp_typ = temp_dir / "page.typ"
|
||||
temp_typ.write_text(TEMPLATE + "\n\n" + code)
|
||||
|
||||
cmd = [
|
||||
"typst",
|
||||
bin_path,
|
||||
"compile",
|
||||
temp_typ.name,
|
||||
"--format",
|
||||
@ -61,7 +138,7 @@ def render_sync(code: str) -> bytes:
|
||||
return result_png.read_bytes()
|
||||
|
||||
|
||||
async def render(code: str) -> bytes:
|
||||
async def render(code: str) -> bytes | None:
|
||||
task = asyncio.to_thread(lambda: render_sync(code))
|
||||
return await task
|
||||
|
||||
@ -70,7 +147,15 @@ cmd = on_command("typst")
|
||||
|
||||
|
||||
@cmd.handle()
|
||||
async def _(evt: Event, bot: Bot, msg: UniMsg, target: DepLongTaskTarget):
|
||||
async def _(
|
||||
evt: Event,
|
||||
bot: Bot,
|
||||
msg: UniMsg,
|
||||
target: DepLongTaskTarget,
|
||||
):
|
||||
if bin_path is None:
|
||||
return
|
||||
|
||||
typst_code = ""
|
||||
if isinstance(evt, OB11MessageEvent):
|
||||
if evt.reply is not None:
|
||||
@ -92,6 +177,8 @@ async def _(evt: Event, bot: Bot, msg: UniMsg, target: DepLongTaskTarget):
|
||||
|
||||
try:
|
||||
res = await render(typst_code)
|
||||
if res is None:
|
||||
raise FileNotFoundError("没有渲染出来内容")
|
||||
except FileNotFoundError as e:
|
||||
await target.send_message("渲染出错:内部错误")
|
||||
raise e from e
|
||||
|
||||
@ -86,67 +86,3 @@ def test_prase_input_args_parses_resize_second_argument_as_float():
|
||||
assert len(filters) == 1
|
||||
assert filters[0].name == "缩放"
|
||||
assert filters[0].args == [2.0, 3.0]
|
||||
|
||||
|
||||
def test_apply_pixel_sort_keeps_image_mode_and_size():
|
||||
"""测试 Pixel Sort 保持图像的 mode 和 size"""
|
||||
image = Image.new("RGBA", (10, 10), (255, 0, 0, 128))
|
||||
|
||||
result = ImageFilterImplement.apply_pixel_sort(image)
|
||||
|
||||
assert result.size == image.size
|
||||
assert result.mode == "RGBA"
|
||||
|
||||
|
||||
def test_apply_pixel_sort_horizontal():
|
||||
"""测试水平方向的 Pixel Sort"""
|
||||
# 创建一个简单的渐变图像
|
||||
image = Image.new("RGB", (5, 3))
|
||||
# 第一行:红到蓝渐变
|
||||
image.putpixel((0, 0), (255, 0, 0))
|
||||
image.putpixel((1, 0), (200, 0, 0))
|
||||
image.putpixel((2, 0), (100, 0, 0))
|
||||
image.putpixel((3, 0), (50, 0, 0))
|
||||
image.putpixel((4, 0), (0, 0, 255))
|
||||
# 填充其他行
|
||||
for y in range(1, 3):
|
||||
for x in range(5):
|
||||
image.putpixel((x, y), (128, 128, 128))
|
||||
|
||||
result = ImageFilterImplement.apply_pixel_sort(
|
||||
image, direction="horizontal", auto_threshold=False, mask_threshold=10
|
||||
)
|
||||
|
||||
assert result.size == image.size
|
||||
assert result.mode == "RGBA"
|
||||
|
||||
|
||||
def test_apply_pixel_sort_vertical():
|
||||
"""测试垂直方向的 Pixel Sort"""
|
||||
image = Image.new("RGB", (3, 5))
|
||||
# 第一列:绿到红渐变
|
||||
image.putpixel((0, 0), (0, 255, 0))
|
||||
image.putpixel((0, 1), (0, 200, 0))
|
||||
image.putpixel((0, 2), (0, 100, 0))
|
||||
image.putpixel((0, 3), (0, 50, 0))
|
||||
image.putpixel((0, 4), (255, 0, 0))
|
||||
# 填充其他列
|
||||
for y in range(5):
|
||||
for x in range(1, 3):
|
||||
image.putpixel((x, y), (128, 128, 128))
|
||||
|
||||
result = ImageFilterImplement.apply_pixel_sort(
|
||||
image, direction="vertical", auto_threshold=False, mask_threshold=10
|
||||
)
|
||||
|
||||
assert result.size == image.size
|
||||
assert result.mode == "RGBA"
|
||||
|
||||
|
||||
def test_prase_input_args_parses_pixel_sort_arguments():
|
||||
"""测试解析 Pixel Sort 参数"""
|
||||
filters = prase_input_args("像素排序 horizontal 0 false brightness 128 false 1")
|
||||
|
||||
assert len(filters) == 1
|
||||
assert filters[0].name == "像素排序"
|
||||
assert filters[0].args == ["horizontal", 0.0, False, "brightness", 128.0, False, 1]
|
||||
|
||||
@ -1,75 +0,0 @@
|
||||
import nonebot
|
||||
|
||||
nonebot.init()
|
||||
|
||||
import asyncio
|
||||
import pytest
|
||||
from konabot.plugins.handle_text.__init__ import (
|
||||
_get_textfx_user_key,
|
||||
_textfx_running_users,
|
||||
TEXTFX_MAX_RUNTIME_SECONDS,
|
||||
)
|
||||
from konabot.plugins.handle_text.base import PipelineRunner
|
||||
|
||||
|
||||
class DummyEvent:
|
||||
def __init__(self, self_id=None, user_id=None, group_id=None, session_id=None):
|
||||
self.self_id = self_id
|
||||
self.user_id = user_id
|
||||
self.group_id = group_id
|
||||
self._session_id = session_id
|
||||
|
||||
def get_session_id(self):
|
||||
if self._session_id is None:
|
||||
raise RuntimeError('no session')
|
||||
return self._session_id
|
||||
|
||||
|
||||
def test_textfx_user_key_group():
|
||||
evt = DummyEvent(self_id='123', user_id='456', group_id='789')
|
||||
assert _get_textfx_user_key(evt) == '123:789:456'
|
||||
|
||||
|
||||
def test_textfx_user_key_private():
|
||||
evt = DummyEvent(self_id='123', user_id='456')
|
||||
assert _get_textfx_user_key(evt) == '123:private:456'
|
||||
|
||||
|
||||
def test_textfx_user_key_session_fallback():
|
||||
evt = DummyEvent(session_id='console:alice')
|
||||
assert _get_textfx_user_key(evt) == 'session:console:alice'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_textfx_timeout_limit():
|
||||
"""测试脚本执行超时限制"""
|
||||
runner = PipelineRunner.get_runner()
|
||||
|
||||
# 创建一个会超时的脚本(while true 会触发迭代限制,但我们用 sleep 模拟长时间运行)
|
||||
# 由于实际超时是 60 秒,我们不能真的等那么久,所以这个测试验证超时机制存在
|
||||
script = "echo start"
|
||||
parsed = runner.parse_pipeline(script)
|
||||
assert not isinstance(parsed, str), "脚本解析应该成功"
|
||||
|
||||
# 验证 TEXTFX_MAX_RUNTIME_SECONDS 常量存在且合理
|
||||
assert TEXTFX_MAX_RUNTIME_SECONDS == 60
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_textfx_concurrent_limit():
|
||||
"""测试同一用户并发执行限制"""
|
||||
user_key = "test:group:user123"
|
||||
|
||||
# 清理可能的残留状态
|
||||
_textfx_running_users.discard(user_key)
|
||||
|
||||
# 模拟第一个脚本正在运行
|
||||
assert user_key not in _textfx_running_users
|
||||
_textfx_running_users.add(user_key)
|
||||
|
||||
# 验证用户已被标记为运行中
|
||||
assert user_key in _textfx_running_users
|
||||
|
||||
# 清理
|
||||
_textfx_running_users.discard(user_key)
|
||||
assert user_key not in _textfx_running_users
|
||||
@ -1,225 +0,0 @@
|
||||
import pytest
|
||||
import nonebot
|
||||
|
||||
nonebot.init()
|
||||
|
||||
from konabot.plugins.handle_text.base import IfNode, PipelineRunner, TextHandlerEnvironment, WhileNode
|
||||
from konabot.plugins.handle_text.handlers.encoding_handlers import THReverse
|
||||
from konabot.plugins.handle_text.handlers.unix_handlers import (
|
||||
THCat,
|
||||
THEcho,
|
||||
THFalse,
|
||||
THRm,
|
||||
THTest,
|
||||
THTrue,
|
||||
)
|
||||
from konabot.plugins.handle_text.handlers.whitespace_handlers import THTrim
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def runner() -> PipelineRunner:
|
||||
runner = PipelineRunner()
|
||||
runner.register(THEcho())
|
||||
runner.register(THCat())
|
||||
runner.register(THRm())
|
||||
runner.register(THTrue())
|
||||
runner.register(THFalse())
|
||||
runner.register(THTest())
|
||||
runner.register(THReverse())
|
||||
runner.register(THTrim())
|
||||
return runner
|
||||
|
||||
|
||||
def test_parse_pipeline_shell_ops(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo hello | reverse && test a = a || echo no; echo done > out')
|
||||
assert not isinstance(parsed, str)
|
||||
assert len(parsed.statements) == 2
|
||||
first = parsed.statements[0]
|
||||
second = parsed.statements[1]
|
||||
assert not isinstance(first, IfNode)
|
||||
assert not isinstance(first, WhileNode)
|
||||
assert not isinstance(second, IfNode)
|
||||
assert not isinstance(second, WhileNode)
|
||||
assert len(first.chains) == 3
|
||||
assert first.chains[0].pipeline.commands[0].name == 'echo'
|
||||
assert first.chains[0].pipeline.commands[1].name == 'reverse'
|
||||
assert second.chains[0].pipeline.commands[0].redirects[0].target == 'out'
|
||||
|
||||
|
||||
def test_parse_if_statement(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('if test a = a; then echo yes; else echo no; fi')
|
||||
assert not isinstance(parsed, str)
|
||||
assert len(parsed.statements) == 1
|
||||
stmt = parsed.statements[0]
|
||||
assert isinstance(stmt, IfNode)
|
||||
assert stmt.else_body is not None
|
||||
assert len(stmt.then_body.statements) == 1
|
||||
|
||||
|
||||
def test_parse_while_statement(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while false; do echo yes; done')
|
||||
assert not isinstance(parsed, str)
|
||||
assert len(parsed.statements) == 1
|
||||
stmt = parsed.statements[0]
|
||||
assert isinstance(stmt, WhileNode)
|
||||
assert len(stmt.body.statements) == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pipeline_pipe(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo hello | reverse')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert len(results) == 1
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream == 'olleh'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_redirect_and_cat(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo hello > a; cat a')
|
||||
assert not isinstance(parsed, str)
|
||||
env = TextHandlerEnvironment(False)
|
||||
results = await runner.run_pipeline(parsed, None, env)
|
||||
assert env.buffers['a'] == 'hello'
|
||||
assert results[-1].ostream == 'hello'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_append_redirect(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo hello > a; echo world >> a; cat a')
|
||||
assert not isinstance(parsed, str)
|
||||
env = TextHandlerEnvironment(False)
|
||||
results = await runner.run_pipeline(parsed, None, env)
|
||||
assert env.buffers['a'] == 'helloworld'
|
||||
assert results[-1].ostream == 'helloworld'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_and_or_short_circuit(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('test a = b && echo bad || echo ok')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert len(results) == 1
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream == 'ok'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_test_bracket_alias(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('[ 2 -gt 1 ] && echo yes')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream == 'yes'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_test_string_ops(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('test -n abc && echo yes; test -z abc || echo no')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert [r.ostream for r in results] == ['yes', 'no']
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_quote_and_trim(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo " hello world " | trim')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].ostream == 'hello world'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_if_then_else(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('if test a = b; then echo yes; else echo no; fi')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream == 'no'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_if_then_without_else(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('if test a = a; then echo yes; fi')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].ostream == 'yes'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_nested_if(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline(
|
||||
'if test a = a; then if test b = c; then echo x; else echo y; fi; else echo z; fi'
|
||||
)
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].ostream == 'y'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_negate_pipeline(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('! test a = b && echo ok')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].ostream == 'ok'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_true_false(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('true && echo yes; false || echo no')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert [r.ostream for r in results] == ['yes', 'no']
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_while_false_noop(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while false; do echo yes; done')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_while_limit_guard(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while true; do echo yes; done')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 2
|
||||
assert 'while 循环超过最大迭代次数限制' in (results[0].ostream or '')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_while_with_immediate_break_condition(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while ! false; do false; done')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_while_body_can_use_if(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while ! false; do if true; then false; fi; done')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_echo_empty_string(runner: PipelineRunner):
|
||||
"""测试 echo 空字符串"""
|
||||
# 双引号空字符串
|
||||
parsed = runner.parse_pipeline('echo ""')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream == ''
|
||||
|
||||
# 单引号空字符串
|
||||
parsed2 = runner.parse_pipeline("echo ''")
|
||||
assert not isinstance(parsed2, str)
|
||||
results2 = await runner.run_pipeline(parsed2, None, TextHandlerEnvironment(False))
|
||||
assert results2[0].code == 0
|
||||
assert results2[0].ostream == ''
|
||||
Reference in New Issue
Block a user