Compare commits

..

2 Commits

Author SHA1 Message Date
6a2fe11753 不再直接在 Dockerfile 里面下载构建产物 2026-03-18 17:29:34 +08:00
97e87c7ec3 添加 Typst 的二进制文件下载 2026-03-18 17:26:36 +08:00
19 changed files with 383 additions and 1471 deletions

View File

@ -1,5 +1,3 @@
{
"python.REPL.enableREPLSmartSend": false,
"python-envs.defaultEnvManager": "ms-python.python:poetry",
"python-envs.defaultPackageManager": "ms-python.python:poetry"
"python.REPL.enableREPLSmartSend": false
}

View File

@ -1,16 +1,3 @@
FROM alpine:latest AS artifacts
RUN apk add --no-cache curl xz
WORKDIR /tmp
RUN mkdir -p /artifacts
RUN curl -L -o typst.tar.xz "https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-unknown-linux-musl.tar.xz" \
&& tar -xJf typst.tar.xz \
&& mv typst-x86_64-unknown-linux-musl/typst /artifacts
RUN chmod -R +x /artifacts/
FROM python:3.13-slim AS base
ENV VIRTUAL_ENV=/app/.venv \
@ -51,7 +38,6 @@ RUN uv sync --no-install-project
FROM base AS runtime
COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
COPY --from=artifacts /artifacts/ /usr/local/bin/
WORKDIR /app

View File

@ -1,9 +1,10 @@
import asyncio
from typing import Any, Awaitable, Callable
import aiohttp
import hashlib
import platform
from dataclasses import dataclass
from dataclasses import dataclass, field
from pathlib import Path
import nonebot
@ -14,6 +15,8 @@ from pydantic import BaseModel
@dataclass
class ArtifactDepends:
_Callback = Callable[[bool], Awaitable[Any]]
url: str
sha256: str
target: Path
@ -27,6 +30,9 @@ class ArtifactDepends:
use_proxy: bool = True
"网络问题,赫赫;使用的是 Discord 模块配置的 proxy"
callbacks: list[_Callback] = field(default_factory=list)
"在任务完成以后,应该做的事情"
def is_corresponding_platform(self) -> bool:
if self.required_os is not None:
if self.required_os.lower() != platform.system().lower():
@ -36,26 +42,43 @@ class ArtifactDepends:
return False
return True
def on_finished(self, task: _Callback) -> _Callback:
self.callbacks.append(task)
return task
async def _finished(self, downloaded: bool) -> list[Any | BaseException]:
tasks = set()
for f in self.callbacks:
tasks.add(f(downloaded))
return await asyncio.gather(*tasks, return_exceptions=True)
class Config(BaseModel):
prefetch_artifact: bool = False
"是否提前下载好二进制依赖"
artifact_list = []
artifact_list: list[ArtifactDepends] = []
driver = nonebot.get_driver()
config = nonebot.get_plugin_config(Config)
@driver.on_startup
async def _():
if config.prefetch_artifact:
logger.info("启动检测中:正在检测需求的二进制是否下载")
semaphore = asyncio.Semaphore(10)
async def _task(artifact: ArtifactDepends):
async with semaphore:
await ensure_artifact(artifact)
downloaded = await ensure_artifact(artifact)
result = await artifact._finished(downloaded)
for r in result:
if isinstance(r, BaseException):
logger.warning("完成了二进制文件的下载,但是有未捕捉的错误")
logger.exception(r)
tasks: set[asyncio.Task] = set()
for a in artifact_list:
@ -78,35 +101,43 @@ async def download_artifact(artifact: ArtifactDepends):
async with aiohttp.ClientSession(proxy=proxy) as client:
result = await client.get(artifact.url)
if result.status != 200:
logger.warning(f"已经下载了二进制,但是注意服务器没有返回 200 URL={artifact.url} TARGET={artifact.target} CODE={result.status}")
logger.warning(
f"已经下载了二进制,但是注意服务器没有返回 200 URL={artifact.url} TARGET={artifact.target} CODE={result.status}"
)
data = await result.read()
artifact.target.write_bytes(data)
if not platform.system().lower() == 'windows':
if not platform.system().lower() == "windows":
artifact.target.chmod(0o755)
logger.info(f"下载好了 TARGET={artifact.target} URL={artifact.url}")
m = hashlib.sha256(artifact.target.read_bytes())
if m.hexdigest().lower() != artifact.sha256.lower():
logger.warning(f"下载到的二进制的 sha256 与需求不同 TARGET={artifact.target} REQUESTED={artifact.sha256} ACTUAL={m.hexdigest()}")
logger.warning(
f"下载到的二进制的 sha256 与需求不同 TARGET={artifact.target} REQUESTED={artifact.sha256} ACTUAL={m.hexdigest()}"
)
async def ensure_artifact(artifact: ArtifactDepends):
async def ensure_artifact(artifact: ArtifactDepends) -> bool:
if not artifact.is_corresponding_platform():
return
return False
if not artifact.target.exists():
logger.info(f"二进制依赖 {artifact.target} 不存在")
if not artifact.target.parent.exists():
artifact.target.parent.mkdir(parents=True, exist_ok=True)
await download_artifact(artifact)
return True
else:
m = hashlib.sha256(artifact.target.read_bytes())
if m.hexdigest().lower() != artifact.sha256.lower():
logger.info(f"二进制依赖 {artifact.target} 的哈希无法对应需求的哈希,准备重新下载")
logger.info(
f"二进制依赖 {artifact.target} 的哈希无法对应需求的哈希,准备重新下载"
)
artifact.target.unlink()
await download_artifact(artifact)
return True
return False
def register_artifacts(*artifacts: ArtifactDepends):
artifact_list.extend(artifacts)

View File

@ -79,14 +79,6 @@ fx [滤镜名称] <参数1> <参数2> ...
* ```fx JPEG损坏 <质量=10>```
* 质量范围建议为 1~95数值越低压缩痕迹越重、效果越搞笑。
* ```fx 动图 <帧率=10>```
* ```fx 像素排序 <方向=horizontal> <阈值=0> <自动阈值=true> <排序依据=brightness> <遮罩阈值=128> <反向=false> <块大小=1>```
* 对像素按指定属性进行排序,效果类似 Photoshop/GIMP Pixel Sort。
* **方向**horizontal水平/ vertical垂直
* **排序依据**brightness亮度/ hue色相/ red / green / blue
* **自动阈值**true 时使用图像亮度中位数作为遮罩阈值
* **遮罩阈值**:决定哪些像素参与排序(亮度 >= 阈值)
* **反向**true 时从亮到暗排序
* **块大小**:每 N 行/列作为一个整体排序单位
### 多图像处理器
* ```fx 存入图像 <目标名称>```

View File

@ -71,14 +71,6 @@ giftool [图片] [选项]
- 调整 GIF 图的速度。若为负数,则代表倒放。
### `--pingpong`(可选)
- 开启乒乓模式,生成正放-倒放拼接的 GIF 图。
- 即播放完正向后,会倒放回去,形成往复循环效果。
- 可与 `--speed` 配合使用,调整播放速度。
- 示例:`giftool [图片] --pingpong`
- 示例:`giftool [图片] --pingpong --speed 2.0`
## 使用方式
1. 发送指令前,请确保:

View File

@ -31,16 +31,7 @@
- 用 `|` 连接多个操作,前一个的输出自动作为后一个的输入。
- 用 `;` 分隔多条独立指令,它们各自产生输出,最终合并显示。
- 用 `&&` / `||` 做最小 shell 风格条件执行:
- `cmd1 && cmd2`:仅当 `cmd1` 成功时执行 `cmd2`
- `cmd1 || cmd2`:仅当 `cmd1` 失败时执行 `cmd2`
- 用 `!` 对一条 pipeline 的成功/失败取反。
- 支持最小 bash-like `if ... then ... else ... fi` 语句。
- 支持最小 bash-like `while ... do ... done` 循环。
- 可使用内建真假命令:`true` / `false`。
- 为避免滥用与卡死:
- 同一用户同时只能运行 **一个** textfx 脚本
- 单个脚本最长执行时间为 **60 秒**
- 用 `>` `>>` 把结果保存起来(见下文),被重定向的指令不会产生输出。
**例子**:把"HELLO"先反转,再转成摩斯电码:(转换为摩斯电码功能暂未实现)
```
@ -48,36 +39,6 @@ textfx reverse HELLO | morse en
```
→ 输出:`--- .-.. .-.. . ....`
**例子**:失败后兜底执行:
```
textfx test a = b || echo 不相等
```
→ 输出:`不相等`
**例子**:成功后继续执行:
```
textfx [ 2 -gt 1 ] && echo 条件成立
```
→ 输出:`条件成立`
**例子**:真正的 if 语句:
```
textfx if test a = b; then echo yes; else echo no; fi
```
→ 输出:`no`
**例子**:对条件取反:
```
textfx ! test a = b && echo 条件不成立
```
→ 输出:`条件不成立`
**例子**while 循环:
```
textfx while false; do echo 不会执行; done
```
→ 输出为空
**例子**:多条指令各自输出:
```
textfx echo 你好; echo 世界
@ -171,51 +132,6 @@ Base64 编码或解码。
> 缓存仅在当前对话中有效,重启后清空。
### true / false / test / [
最小 shell 风格条件命令。通常配合 `if`、`&&`、`||`、`!` 使用。
支持:
- `true`:总是成功
- `false`:总是失败
- 字符串非空:`test foo`
- `-n` / `-z``test -n foo`、`test -z ""`
- 字符串比较:`test a = a`、`test a != b`
- 整数比较:`test 2 -gt 1`、`test 3 -le 5`
- 方括号别名:`[ 2 -gt 1 ]`
示例:
- `/textfx true && echo 一定执行`
- `/textfx false || echo 兜底执行`
- `/textfx test hello && echo 有内容`
- `/textfx test a = b || echo 不相等`
- `/textfx [ 3 -ge 2 ] && echo yes`
### if / then / else / fi
支持最小 bash-like 条件语句。
示例:
- `/textfx if test a = a; then echo yes; else echo no; fi`
- `/textfx if [ 2 -gt 1 ]; then echo 成立; fi`
- `/textfx if test a = a; then if test b = c; then echo x; else echo y; fi; fi`
说明:
- `if` 后面跟一个条件链,可配合 `test`、`[`、`!`、`&&`、`||`
- `then` 和 `else` 后面都可以写多条以 `;` 分隔的 textfx 语句
- `else` 可省略
### while / do / done
支持最小 bash-like 循环语句。
示例:
- `/textfx while false; do echo 不会执行; done`
- `/textfx while ! false; do false; done`
- `/textfx while ! false; do if true; then false; fi; done`
说明:
- `while` 后面跟一个条件链,返回成功就继续循环
- `do` 后面可写多条以 `;` 分隔的 textfx 语句
- 为避免 bot 死循环,内置最大循环次数限制;超限会报错
### replace或 替换、sed
替换文字(支持正则表达式)。
示例(普通):`/textfx replace 世界 宇宙 你好世界` → `你好宇宙`

View File

@ -1354,140 +1354,6 @@ class ImageFilterImplement:
images.append(text_image)
return image
# Pixel Sort - 像素排序效果
@staticmethod
def apply_pixel_sort(
image: Image.Image,
direction: str = "horizontal",
threshold: float = 0.0,
auto_threshold: bool = True,
sort_by: str = "brightness",
mask_threshold: float = 128.0,
reverse: bool = False,
block_size: int = 1
) -> Image.Image:
"""
Pixel Sort 效果
参数:
image: 输入图像
direction: 排序方向,"horizontal"(水平) 或 "vertical"(垂直)
threshold: 亮度阈值 (0-255),低于此值的像素会被排序(仅在 auto_threshold=False 时生效)
auto_threshold: 是否自动计算阈值(使用图像中位数)
sort_by: 排序依据,"brightness"(亮度)、"hue"(色相)、"red""green""blue"
mask_threshold: 遮罩阈值 (0-255),决定哪些像素参与排序
reverse: 是否反向排序
block_size: 块大小,每 N 行/列作为一个整体排序单位
"""
if image.mode != 'RGBA':
image = image.convert('RGBA')
arr = np.array(image)
height, width = arr.shape[:2]
# 获取排序属性
def get_sort_value(pixel):
r, g, b = pixel[0], pixel[1], pixel[2]
if sort_by == "brightness":
return 0.299 * r + 0.587 * g + 0.114 * b
elif sort_by == "hue":
max_c = max(r, g, b)
min_c = min(r, g, b)
diff = max_c - min_c
if diff == 0:
return 0
if max_c == r:
return 60 * (((g - b) / diff) % 6)
elif max_c == g:
return 60 * ((b - r) / diff + 2)
else:
return 60 * ((r - g) / diff + 4)
elif sort_by == "red":
return r
elif sort_by == "green":
return g
elif sort_by == "blue":
return b
return 0.299 * r + 0.587 * g + 0.114 * b
# 自动计算阈值
if auto_threshold:
# 使用图像亮度中位数作为阈值
gray = np.array(image.convert('L'))
mask_threshold = float(np.median(gray))
# 创建遮罩:哪些像素需要排序
mask = np.zeros((height, width), dtype=bool)
for y in range(height):
for x in range(width):
brightness = 0.299 * arr[y, x, 0] + 0.587 * arr[y, x, 1] + 0.114 * arr[y, x, 2]
mask[y, x] = brightness >= mask_threshold
result = arr.copy()
if direction.lower() in ["horizontal", "h", "水平"]:
# 水平排序(逐行)
for y in range(height):
# 收集当前行中需要排序的像素
if block_size > 1:
# 按块处理
for block_start in range(0, width, block_size):
block_end = min(block_start + block_size, width)
pixels = []
indices = []
for x in range(block_start, block_end):
if mask[y, x]:
pixels.append(arr[y, x].copy())
indices.append(x)
if len(pixels) > 1:
# 按指定属性排序
sorted_pixels = sorted(pixels, key=get_sort_value, reverse=reverse)
for i, x in enumerate(indices):
result[y, x] = sorted_pixels[i]
else:
# 逐像素处理
pixels = []
indices = []
for x in range(width):
if mask[y, x]:
pixels.append(arr[y, x].copy())
indices.append(x)
if len(pixels) > 1:
sorted_pixels = sorted(pixels, key=get_sort_value, reverse=reverse)
for i, x in enumerate(indices):
result[y, x] = sorted_pixels[i]
elif direction.lower() in ["vertical", "v", "垂直"]:
# 垂直排序(逐列)
for x in range(width):
if block_size > 1:
# 按块处理
for block_start in range(0, height, block_size):
block_end = min(block_start + block_size, height)
pixels = []
indices = []
for y in range(block_start, block_end):
if mask[y, x]:
pixels.append(arr[y, x].copy())
indices.append(y)
if len(pixels) > 1:
sorted_pixels = sorted(pixels, key=get_sort_value, reverse=reverse)
for i, y in enumerate(indices):
result[y, x] = sorted_pixels[i]
else:
pixels = []
indices = []
for y in range(height):
if mask[y, x]:
pixels.append(arr[y, x].copy())
indices.append(y)
if len(pixels) > 1:
sorted_pixels = sorted(pixels, key=get_sort_value, reverse=reverse)
for i, y in enumerate(indices):
result[y, x] = sorted_pixels[i]
return Image.fromarray(result, 'RGBA')
class ImageFilterEmpty:

View File

@ -65,8 +65,6 @@ class ImageFilterManager:
"覆盖图像": ImageFilterImplement.apply_overlay,
# 生成式
"覆加颜色": ImageFilterImplement.generate_solid,
# Pixel Sort
"像素排序": ImageFilterImplement.apply_pixel_sort,
}
generate_filter_map = {

View File

@ -1,5 +1,4 @@
from typing import cast
import asyncio
from loguru import logger
from nonebot import on_command
import nonebot
@ -32,11 +31,8 @@ from konabot.plugins.handle_text.handlers.random_handlers import THShuffle, THSo
from konabot.plugins.handle_text.handlers.unix_handlers import (
THCat,
THEcho,
THFalse,
THReplace,
THRm,
THTest,
THTrue,
)
from konabot.plugins.handle_text.handlers.whitespace_handlers import (
THLines,
@ -47,37 +43,11 @@ from konabot.plugins.handle_text.handlers.whitespace_handlers import (
)
TEXTFX_MAX_RUNTIME_SECONDS = 60
_textfx_running_users: set[str] = set()
def _get_textfx_user_key(evt: Event) -> str:
user_id = getattr(evt, "user_id", None)
self_id = getattr(evt, "self_id", None)
group_id = getattr(evt, "group_id", None)
if user_id is not None:
if group_id is not None:
return f"{self_id}:{group_id}:{user_id}"
return f"{self_id}:private:{user_id}"
session_id = getattr(evt, "get_session_id", None)
if callable(session_id):
try:
return f"session:{evt.get_session_id()}"
except Exception:
pass
return f"event:{evt.__class__.__name__}:{id(evt)}"
cmd = on_command(cmd="textfx", aliases={"处理文字", "处理文本"})
@cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
user_key = _get_textfx_user_key(evt)
if user_key in _textfx_running_users:
await target.send_message("你当前已有一个 textfx 脚本正在运行,请等待它结束后再试。")
return
istream = ""
if isinstance(evt, OB11MessageEvent):
if evt.reply is not None:
@ -101,22 +71,9 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
return
env = TextHandlerEnvironment(is_trusted=False, event=evt)
results = await runner.run_pipeline(res, istream or None, env)
_textfx_running_users.add(user_key)
try:
results = await asyncio.wait_for(
runner.run_pipeline(res, istream or None, env),
timeout=TEXTFX_MAX_RUNTIME_SECONDS,
)
except asyncio.TimeoutError:
rendered = await render_error_message(
f"处理指令时出现问题:脚本执行超时(超过 {TEXTFX_MAX_RUNTIME_SECONDS} 秒)"
)
await target.send_message(rendered)
return
finally:
_textfx_running_users.discard(user_key)
# 检查是否有错误
for r in results:
if r.code != 0:
message = f"处理指令时出现问题:{r.ostream}"
@ -124,6 +81,7 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
await target.send_message(rendered)
return
# 收集所有组的文本输出和附件
ostreams = [r.ostream for r in results if r.ostream is not None]
attachments = [r.attachment for r in results if r.attachment is not None]
@ -150,9 +108,6 @@ async def _():
THCat(),
THEcho(),
THRm(),
THTrue(),
THFalse(),
THTest(),
THShuffle(),
THReplace(),
THBase64(),

View File

@ -1,16 +1,15 @@
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from string import whitespace
from typing import cast
from loguru import logger
from nonebot.adapters import Event
MAX_WHILE_ITERATIONS = 100
@dataclass
class TextHandlerEnvironment:
is_trusted: bool
@ -54,63 +53,29 @@ class TextHandlerSync(TextHandler):
@dataclass
class Redirect:
target: str
append: bool = False
@dataclass
class CommandNode:
name: str
class PipelineCommand:
handler: TextHandler
args: list[str]
redirects: list[Redirect] = field(default_factory=list)
# 新增重定向目标buffer key
redirect_target: str | None = None
# 新增:是否为追加模式 (>>)
redirect_append: bool = False
@dataclass
class PipelineNode:
commands: list[CommandNode] = field(default_factory=list)
negate: bool = False
class Pipeline:
command_groups: list[list[PipelineCommand]] = field(default_factory=list)
"一个列表的列表,每一组之间的指令之间使用管道符连接,而不同组之间不会有数据流"
@dataclass
class ConditionalPipeline:
op: str | None
pipeline: PipelineNode
class PipelineParseStatus(Enum):
normal = 0
in_string = 1
in_string_to_escape = 2
off_string = 3
@dataclass
class CommandGroup:
chains: list[ConditionalPipeline] = field(default_factory=list)
@dataclass
class IfNode:
condition: CommandGroup
then_body: "Script"
else_body: "Script | None" = None
@dataclass
class WhileNode:
condition: CommandGroup
body: "Script"
@dataclass
class Script:
statements: list[CommandGroup | IfNode | WhileNode] = field(default_factory=list)
class TokenKind(Enum):
WORD = "word"
OP = "op"
@dataclass
class Token:
kind: TokenKind
value: str
whitespaces = whitespace + ""
class PipelineRunner:
@ -128,433 +93,198 @@ class PipelineRunner:
def register(self, handler: TextHandler):
self.handlers.append(handler)
def _resolve_handler(self, cmd_name: str) -> TextHandler | str:
matched = [
h for h in self.handlers if cmd_name == h.name or cmd_name in h.keywords
]
if not matched:
return f"不存在名为 {cmd_name} 的函数"
if len(matched) > 1:
logger.warning(
f"指令能对应超过一个文本处理器 CMD={cmd_name} handlers={self.handlers}"
def parse_pipeline(self, script: str) -> Pipeline | str:
pipeline = Pipeline()
# 当前正在构建的上下文
current_group: list[PipelineCommand] = []
current_command_args: list[str] = []
# 字符串解析状态
status = PipelineParseStatus.normal
current_string = ""
current_string_raw = ""
status_in_string_pair = ""
has_token = False # 是否正在构建一个 token区分空字符串和无 token
# 重定向解析状态
is_parsing_redirect_filename = False
current_redirect_target: str | None = None
current_redirect_append = False
# 辅助函数:将当前解析到的字符串 flush 到 参数列表 或 重定向目标
def _flush_token():
nonlocal \
current_string, \
current_string_raw, \
is_parsing_redirect_filename, \
current_redirect_target, \
has_token
if not has_token:
return
if is_parsing_redirect_filename:
current_redirect_target = current_string
is_parsing_redirect_filename = False # 重定向文件名只取一个 token
else:
current_command_args.append(current_string)
current_string = ""
current_string_raw = ""
has_token = False
# 辅助函数:将当前指令 flush 到当前组
def _flush_command() -> str | None:
nonlocal \
current_command_args, \
current_redirect_target, \
current_redirect_append
if not current_command_args:
return None
cmd_name = current_command_args[0]
args = current_command_args[1:]
matched = [
h for h in self.handlers if cmd_name in h.keywords or cmd_name == h.name
]
if not matched:
return f"不存在名为 {cmd_name} 的函数"
if len(matched) > 1:
logger.warning(
f"指令能对应超过一个文本处理器 CMD={cmd_name} handlers={self.handlers}"
)
cmd = PipelineCommand(
handler=matched[0],
args=args,
redirect_target=current_redirect_target,
redirect_append=current_redirect_append,
)
return matched[0]
current_group.append(cmd)
def tokenize(self, script: str) -> list[Token] | str:
tokens: list[Token] = []
buf = ""
quote: str | None = None
escape = False
# 重置指令级状态
current_command_args = []
current_redirect_target = None
current_redirect_append = False
return None
# 使用索引遍历以支持 look-ahead (处理 >>)
i = 0
operators = {"|", ";", ">", "&&", "||", ">>", "!"}
escape_map = {
"n": "\n",
"r": "\r",
"t": "\t",
"0": "\0",
"a": "\a",
"b": "\b",
"f": "\f",
"v": "\v",
"\\": "\\",
'"': '"',
"'": "'",
}
length = len(script)
def flush_word(force: bool = False):
nonlocal buf
if buf or force:
tokens.append(Token(TokenKind.WORD, buf))
buf = ""
while i < len(script):
while i < length:
c = script[i]
if quote is not None:
if escape:
buf += escape_map.get(c, c)
escape = False
elif c == "\\":
escape = True
elif c == quote:
quote = None
flush_word(force=True) # 引号闭合时强制 flush即使是空字符串
else:
buf += c
i += 1
continue
match status:
case PipelineParseStatus.normal:
if c in whitespaces:
_flush_token()
if c in "'\"":
quote = c
i += 1
continue
elif c in "'\"":
status_in_string_pair = c
status = PipelineParseStatus.in_string
current_string_raw = ""
has_token = True
if c.isspace():
flush_word()
i += 1
continue
elif c == "|":
_flush_token()
if err := _flush_command():
return err
# 管道符不结束 group继续在 current_group 添加
two = script[i : i + 2]
if two in operators:
flush_word()
tokens.append(Token(TokenKind.OP, two))
i += 2
continue
elif c == ";":
_flush_token()
if err := _flush_command():
return err
# 分号结束 group
if current_group:
pipeline.command_groups.append(current_group)
current_group = []
if c in {"|", ";", ">", "!"}:
flush_word()
tokens.append(Token(TokenKind.OP, c))
i += 1
continue
elif c == ">":
_flush_token() # 先结束之前的参数
# 检查是否是 append 模式 (>>)
if i + 1 < length and script[i + 1] == ">":
current_redirect_append = True
i += 1 # 跳过下一个 >
else:
current_redirect_append = False
if c == "\\":
if i + 1 < len(script):
i += 1
buf += escape_map.get(script[i], script[i])
else:
buf += c
i += 1
continue
# 标记下一个 token 为文件名
is_parsing_redirect_filename = True
else:
current_string += c
has_token = True
case PipelineParseStatus.in_string:
current_string_raw += c
if c == status_in_string_pair:
status = PipelineParseStatus.off_string
elif c == "\\":
status = PipelineParseStatus.in_string_to_escape
else:
current_string += c
case PipelineParseStatus.in_string_to_escape:
escape_map = {
"n": "\n",
"r": "\r",
"t": "\t",
"0": "\0",
"a": "\a",
"b": "\b",
"f": "\f",
"v": "\v",
"\\": "\\",
}
current_string += escape_map.get(c, c)
status = PipelineParseStatus.in_string
case PipelineParseStatus.off_string:
if c in whitespaces:
_flush_token()
status = PipelineParseStatus.normal
elif c == "|":
_flush_token()
if err := _flush_command():
return err
status = PipelineParseStatus.normal
elif c == ";":
_flush_token()
if err := _flush_command():
return err
if current_group:
pipeline.command_groups.append(current_group)
current_group = []
status = PipelineParseStatus.normal
elif c == ">":
_flush_token()
status = PipelineParseStatus.normal
# 回退索引,让下一次循环进入 normal 状态的 > 处理逻辑
i -= 1
else:
# 紧接着的字符继续作为当前字符串的一部分 (如 "abc"d)
current_string += c
current_string_raw = ""
status = PipelineParseStatus.normal
buf += c
i += 1
if quote is not None:
return "存在未闭合的引号"
if escape:
buf += "\\"
# 循环结束后的收尾
_flush_token()
if err := _flush_command():
return err
flush_word()
return tokens
if current_group:
pipeline.command_groups.append(current_group)
def parse_pipeline(self, script: str) -> Script | str:
tokens = self.tokenize(script)
if isinstance(tokens, str):
return tokens
if not tokens:
return Script()
pos = 0
def peek(offset: int = 0) -> Token | None:
idx = pos + offset
return tokens[idx] if idx < len(tokens) else None
def consume() -> Token:
nonlocal pos
tok = tokens[pos]
pos += 1
return tok
def consume_if_op(value: str) -> bool:
tok = peek()
if tok is not None and tok.kind == TokenKind.OP and tok.value == value:
consume()
return True
return False
def consume_if_word(value: str) -> bool:
tok = peek()
if tok is not None and tok.kind == TokenKind.WORD and tok.value == value:
consume()
return True
return False
def expect_word(msg: str) -> Token | str:
tok = peek()
if tok is None or tok.kind != TokenKind.WORD:
return msg
return consume()
def parse_command() -> CommandNode | str:
first = expect_word("缺少指令名")
if isinstance(first, str):
return first
handler = self._resolve_handler(first.value)
if isinstance(handler, str):
return handler
args: list[str] = []
redirects: list[Redirect] = []
while True:
tok = peek()
if tok is None:
break
if tok.kind == TokenKind.OP and tok.value in {"|", ";", "&&", "||"}:
break
if tok.kind == TokenKind.OP and tok.value in {">", ">>"}:
op_tok = consume()
target = expect_word("重定向操作符后面需要缓存名")
if isinstance(target, str):
return target
redirects.append(
Redirect(target=target.value, append=op_tok.value == ">>")
)
continue
if tok.kind != TokenKind.WORD:
return f"无法解析的 token: {tok.value}"
args.append(consume().value)
return CommandNode(
name=first.value,
handler=handler,
args=args,
redirects=redirects,
)
def parse_pipe() -> PipelineNode | str:
negate = False
while consume_if_op("!"):
negate = not negate
pipeline = PipelineNode(negate=negate)
command = parse_command()
if isinstance(command, str):
return command
pipeline.commands.append(command)
while True:
tok = peek()
if tok is None or tok.kind != TokenKind.OP or tok.value != "|":
break
consume()
next_command = parse_command()
if isinstance(next_command, str):
return next_command
pipeline.commands.append(next_command)
return pipeline
def parse_chain() -> CommandGroup | str:
group = CommandGroup()
first_pipeline = parse_pipe()
if isinstance(first_pipeline, str):
return first_pipeline
group.chains.append(ConditionalPipeline(op=None, pipeline=first_pipeline))
while True:
tok = peek()
if tok is None or tok.kind != TokenKind.OP or tok.value not in {"&&", "||"}:
break
op = consume().value
next_pipeline = parse_pipe()
if isinstance(next_pipeline, str):
return next_pipeline
group.chains.append(ConditionalPipeline(op=op, pipeline=next_pipeline))
return group
def parse_if() -> IfNode | str:
if not consume_if_word("if"):
return "缺少 if"
condition = parse_chain()
if isinstance(condition, str):
return condition
consume_if_op(";")
if not consume_if_word("then"):
return "if 语句缺少 then"
then_body = parse_script(stop_words={"else", "fi"})
if isinstance(then_body, str):
return then_body
else_body: Script | None = None
if consume_if_word("else"):
else_body = parse_script(stop_words={"fi"})
if isinstance(else_body, str):
return else_body
if not consume_if_word("fi"):
return "if 语句缺少 fi"
return IfNode(condition=condition, then_body=then_body, else_body=else_body)
def parse_while() -> WhileNode | str:
if not consume_if_word("while"):
return "缺少 while"
condition = parse_chain()
if isinstance(condition, str):
return condition
consume_if_op(";")
if not consume_if_word("do"):
return "while 语句缺少 do"
body = parse_script(stop_words={"done"})
if isinstance(body, str):
return body
if not consume_if_word("done"):
return "while 语句缺少 done"
return WhileNode(condition=condition, body=body)
def parse_statement() -> CommandGroup | IfNode | WhileNode | str:
tok = peek()
if tok is not None and tok.kind == TokenKind.WORD:
if tok.value == "if":
return parse_if()
if tok.value == "while":
return parse_while()
return parse_chain()
def parse_script(stop_words: set[str] | None = None) -> Script | str:
parsed = Script()
nonlocal pos
while pos < len(tokens):
tok = peek()
if tok is None:
break
if stop_words and tok.kind == TokenKind.WORD and tok.value in stop_words:
break
if tok.kind == TokenKind.OP and tok.value == ";":
consume()
continue
statement = parse_statement()
if isinstance(statement, str):
return statement
parsed.statements.append(statement)
tok = peek()
if tok is not None and tok.kind == TokenKind.OP and tok.value == ";":
consume()
return parsed
parsed = parse_script()
if isinstance(parsed, str):
return parsed
if pos != len(tokens):
tok = tokens[pos]
return f"无法解析的 token: {tok.value}"
return parsed
async def _execute_command(
self,
command: CommandNode,
istream: str | None,
env: TextHandlerEnvironment,
) -> TextHandleResult:
logger.debug(
f"Executing: {command.name} args={command.args} redirects={command.redirects}"
)
result = await command.handler.handle(env, istream, command.args)
if result.code != 0:
return result
if command.redirects:
content = result.ostream or ""
for redirect in command.redirects:
if redirect.append:
old_content = env.buffers.get(redirect.target, "")
env.buffers[redirect.target] = old_content + content
else:
env.buffers[redirect.target] = content
return TextHandleResult(code=0, ostream=None, attachment=result.attachment)
return result
async def _execute_pipeline(
self,
pipeline: PipelineNode,
istream: str | None,
env: TextHandlerEnvironment,
) -> TextHandleResult:
current_stream = istream
last_result = TextHandleResult(code=0, ostream=None)
for command in pipeline.commands:
try:
last_result = await self._execute_command(command, current_stream, env)
except Exception as e:
logger.error(f"Pipeline execution failed at {command.name}")
logger.exception(e)
return TextHandleResult(code=-1, ostream="处理流水线时出现 python 错误")
if last_result.code != 0:
if pipeline.negate:
return TextHandleResult(code=0, ostream=None)
return last_result
current_stream = last_result.ostream
if pipeline.negate:
return TextHandleResult(code=1, ostream=None)
return last_result
async def _execute_group(
self,
group: CommandGroup,
istream: str | None,
env: TextHandlerEnvironment,
) -> TextHandleResult:
last_result = TextHandleResult(code=0, ostream=None)
for chain in group.chains:
should_run = True
if chain.op == "&&":
should_run = last_result.code == 0
elif chain.op == "||":
should_run = last_result.code != 0
if should_run:
last_result = await self._execute_pipeline(chain.pipeline, istream, env)
return last_result
async def _execute_if(
self,
if_node: IfNode,
istream: str | None,
env: TextHandlerEnvironment,
) -> TextHandleResult:
condition_result = await self._execute_group(if_node.condition, istream, env)
if condition_result.code == 0:
results = await self.run_pipeline(if_node.then_body, istream, env)
else:
results = (
await self.run_pipeline(if_node.else_body, istream, env)
if if_node.else_body is not None
else [TextHandleResult(code=0, ostream=None)]
)
return results[-1] if results else TextHandleResult(code=0, ostream=None)
async def _execute_while(
self,
while_node: WhileNode,
istream: str | None,
env: TextHandlerEnvironment,
) -> TextHandleResult:
last_result = TextHandleResult(code=0, ostream=None)
for _ in range(MAX_WHILE_ITERATIONS):
condition_result = await self._execute_group(while_node.condition, istream, env)
if condition_result.code != 0:
return last_result
body_results = await self.run_pipeline(while_node.body, istream, env)
if body_results:
last_result = body_results[-1]
if last_result.code != 0:
return last_result
return TextHandleResult(
code=2,
ostream=f"while 循环超过最大迭代次数限制({MAX_WHILE_ITERATIONS}",
)
return pipeline
async def run_pipeline(
self,
pipeline: Script,
pipeline: Pipeline,
istream: str | None,
env: TextHandlerEnvironment | None = None,
) -> list[TextHandleResult]:
@ -563,21 +293,54 @@ class PipelineRunner:
results: list[TextHandleResult] = []
for statement in pipeline.statements:
try:
if isinstance(statement, IfNode):
results.append(await self._execute_if(statement, istream, env))
elif isinstance(statement, WhileNode):
results.append(await self._execute_while(statement, istream, env))
else:
results.append(await self._execute_group(statement, istream, env))
except Exception as e:
logger.error(f"Pipeline execution failed: {e}")
logger.exception(e)
results.append(
TextHandleResult(code=-1, ostream="处理流水线时出现 python 错误")
)
return results
# 遍历执行指令组 (分号分隔),每个组独立产生输出
for group in pipeline.command_groups:
current_stream = istream
group_result = TextHandleResult(code=0, ostream=None)
# 遍历组内指令 (管道分隔)
for cmd in group:
try:
logger.debug(
f"Executing: {cmd.handler.name} args={cmd.args} redirect={cmd.redirect_target}"
)
result = await cmd.handler.handle(env, current_stream, cmd.args)
if result.code != 0:
# 组内出错,整条流水线中止
results.append(result)
return results
# 处理重定向逻辑
if cmd.redirect_target:
content_to_write = result.ostream or ""
target_buffer = cmd.redirect_target
if cmd.redirect_append:
old_content = env.buffers.get(target_buffer, "")
env.buffers[target_buffer] = old_content + content_to_write
else:
env.buffers[target_buffer] = content_to_write
current_stream = None
group_result = TextHandleResult(
code=0, ostream=None, attachment=result.attachment
)
else:
current_stream = result.ostream
group_result = result
except Exception as e:
logger.error(f"Pipeline execution failed at {cmd.handler.name}")
logger.exception(e)
results.append(
TextHandleResult(
code=-1, ostream="处理流水线时出现 python 错误"
)
)
return results
results.append(group_result)
return results

View File

@ -13,8 +13,10 @@ class THEcho(TextHandler):
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
# echo 不读 stdin只输出参数Unix 语义)
# 无参数时输出空行(与 Unix echo 行为一致)
if len(args) == 0 and istream is None:
return TextHandleResult(1, "请在 echo 后面添加需要输出的文本")
if istream is not None:
return TextHandleResult(0, "\n".join([istream] + args))
return TextHandleResult(0, "\n".join(args))
@ -24,6 +26,7 @@ class THCat(TextHandler):
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
# No args: pass through stdin (like Unix cat with no arguments)
if len(args) == 0:
if istream is None:
return TextHandleResult(
@ -32,6 +35,7 @@ class THCat(TextHandler):
)
return TextHandleResult(0, istream)
# Concatenate all specified sources in order
parts: list[str] = []
for arg in args:
if arg == "-":
@ -70,6 +74,7 @@ class THReplace(TextHandler):
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
# 用法: replace <pattern> <replacement> [text]
if len(args) < 2:
return TextHandleResult(1, "用法replace <正则> <替换内容> [文本]")
@ -85,77 +90,3 @@ class THReplace(TextHandler):
return TextHandleResult(0, res)
except Exception as e:
return TextHandleResult(1, f"正则错误: {str(e)}")
class THTrue(TextHandler):
name = "true"
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
return TextHandleResult(0, istream)
class THFalse(TextHandler):
name = "false"
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
return TextHandleResult(1, None)
class THTest(TextHandler):
name = "test"
keywords = ["["]
def _bool_result(self, value: bool) -> TextHandleResult:
return TextHandleResult(0 if value else 1, None)
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
expr = list(args)
# 支持方括号语法:[ expr ] 会自动移除末尾的 ]
if expr and expr[-1] == "]":
expr = expr[:-1]
if not expr:
return TextHandleResult(1, None)
if len(expr) == 1:
return self._bool_result(len(expr[0]) > 0)
if len(expr) == 2:
op, value = expr
if op == "-n":
return self._bool_result(len(value) > 0)
if op == "-z":
return self._bool_result(len(value) == 0)
return TextHandleResult(2, f"test 不支持的表达式: {' '.join(args)}")
if len(expr) == 3:
left, op, right = expr
if op == "=":
return self._bool_result(left == right)
if op == "!=":
return self._bool_result(left != right)
if op in {"-eq", "-ne", "-gt", "-ge", "-lt", "-le"}:
try:
li = int(left)
ri = int(right)
except ValueError:
return TextHandleResult(2, "test 的数字比较参数必须是整数")
mapping = {
"-eq": li == ri,
"-ne": li != ri,
"-gt": li > ri,
"-ge": li >= ri,
"-lt": li < ri,
"-le": li <= ri,
}
return self._bool_result(mapping[op])
return TextHandleResult(2, f"test 不支持的操作符: {op}")
return TextHandleResult(2, f"test 不支持的表达式: {' '.join(args)}")

View File

@ -6,7 +6,7 @@ import PIL
import PIL.Image
import cv2
import imageio.v3 as iio
from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, UniMessage, on_alconna
from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna
import numpy
from konabot.common.nb.exc import BotExceptionMessage
@ -34,7 +34,6 @@ cmd_giftool = on_alconna(
Option("-t", Args["length", str]),
Option("-to", Args["end_point", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
Option("--pingpong"),
)
)
@ -47,7 +46,6 @@ async def _(
length: str | None = None,
speed_factor: float = 1.0,
end_point: str | None = None,
pingpong: Query[bool] = Query("pingpong"),
):
ss: None | float = None
if start_point:
@ -164,16 +162,6 @@ async def _(
rframes = rframes[::-1]
rdur_ms = rdur_ms[::-1]
# 处理 pingpong 模式
if pingpong.available:
# 复制一份反转的帧序列(去掉第一帧避免重复)
pingpong_frames = rframes[1:][::-1] if len(rframes) > 1 else rframes[::-1]
pingpong_durations = rdur_ms[1:][::-1] if len(rdur_ms) > 1 else rdur_ms[::-1]
# 拼接正放和倒放
rframes = rframes + pingpong_frames
rdur_ms = rdur_ms + pingpong_durations
output_img = BytesIO()
if rframes:

View File

@ -1,210 +0,0 @@
import copy
import re
from pathlib import Path
import nonebot
from nonebot import on_command
from nonebot.adapters import Bot, Event, Message
from nonebot.log import logger
from nonebot.message import handle_event
from nonebot.params import CommandArg
from konabot.common.database import DatabaseManager
from konabot.common.longtask import DepLongTaskTarget
ROOT_PATH = Path(__file__).resolve().parent
cmd = on_command(cmd="语法糖", aliases={"", "sugar"}, block=True)
db_manager = DatabaseManager()
driver = nonebot.get_driver()
@driver.on_startup
async def register_startup_hook():
await init_db()
@driver.on_shutdown
async def register_shutdown_hook():
await db_manager.close_all_connections()
async def init_db():
await db_manager.execute_by_sql_file(ROOT_PATH / "sql" / "create_table.sql")
table_info = await db_manager.query("PRAGMA table_info(syntactic_sugar)")
columns = {str(row.get("name")) for row in table_info}
if "channel_id" not in columns:
await db_manager.execute(
"ALTER TABLE syntactic_sugar ADD COLUMN channel_id VARCHAR(255) NOT NULL DEFAULT ''"
)
await db_manager.execute("DROP INDEX IF EXISTS idx_syntactic_sugar_name_belong_to")
await db_manager.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target "
"ON syntactic_sugar(name, channel_id, belong_to)"
)
def _extract_reply_plain_text(evt: Event) -> str:
reply = getattr(evt, "reply", None)
if reply is None:
return ""
reply_message = getattr(reply, "message", None)
if reply_message is None:
return ""
extract_plain_text = getattr(reply_message, "extract_plain_text", None)
if callable(extract_plain_text):
return extract_plain_text().strip()
return str(reply_message).strip()
def _split_variables(tokens: list[str]) -> tuple[list[str], dict[str, str]]:
positional: list[str] = []
named: dict[str, str] = {}
for token in tokens:
if "=" in token:
key, value = token.split("=", 1)
key = key.strip()
if key:
named[key] = value
continue
positional.append(token)
return positional, named
def _render_template(content: str, positional: list[str], named: dict[str, str]) -> str:
def replace(match: re.Match[str]) -> str:
key = match.group(1).strip()
if key.isdigit():
idx = int(key) - 1
if 0 <= idx < len(positional):
return positional[idx]
return match.group(0)
return named.get(key, match.group(0))
return re.sub(r"\{([^{}]+)\}", replace, content)
async def _store_sugar(name: str, content: str, belong_to: str, channel_id: str):
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "insert_sugar.sql",
(name, content, belong_to, channel_id),
)
async def _delete_sugar(name: str, belong_to: str, channel_id: str):
await db_manager.execute(
"DELETE FROM syntactic_sugar WHERE name = ? AND belong_to = ? AND channel_id = ?",
(name, belong_to, channel_id),
)
async def _find_sugar(name: str, belong_to: str, channel_id: str) -> str | None:
rows = await db_manager.query(
(
"SELECT content FROM syntactic_sugar "
"WHERE name = ? AND channel_id = ? "
"ORDER BY CASE WHEN belong_to = ? THEN 0 ELSE 1 END, id ASC "
"LIMIT 1"
),
(name, channel_id, belong_to),
)
if not rows:
return None
return rows[0].get("content")
async def _reinject_command(bot: Bot, evt: Event, command_text: str) -> bool:
depth = int(getattr(evt, "_syntactic_sugar_depth", 0))
if depth >= 3:
return False
try:
cloned_evt = copy.deepcopy(evt)
except Exception:
logger.exception("语法糖克隆事件失败")
return False
message = getattr(cloned_evt, "message", None)
if message is None:
return False
try:
msg_obj = type(message)(command_text)
except Exception:
msg_obj = command_text
setattr(cloned_evt, "message", msg_obj)
if hasattr(cloned_evt, "original_message"):
setattr(cloned_evt, "original_message", msg_obj)
if hasattr(cloned_evt, "raw_message"):
setattr(cloned_evt, "raw_message", command_text)
setattr(cloned_evt, "_syntactic_sugar_depth", depth + 1)
try:
await handle_event(bot, cloned_evt)
except Exception:
logger.exception("语法糖回注事件失败")
return False
return True
@cmd.handle()
async def _(bot: Bot, evt: Event, target: DepLongTaskTarget, args: Message = CommandArg()):
raw = args.extract_plain_text().strip()
if not raw:
return
tokens = raw.split()
action = tokens[0]
target_id = target.target_id
channel_id = target.channel_id
if action == "存入":
if len(tokens) < 2:
await cmd.finish("请提供要存入的名称")
name = tokens[1].strip()
content = " ".join(tokens[2:]).strip()
if not content:
content = _extract_reply_plain_text(evt)
if not content:
await cmd.finish("请提供要存入的内容")
await _store_sugar(name, content, target_id, channel_id)
await cmd.finish(f"糖已存入:「{name}」!")
if action == "删除":
if len(tokens) < 2:
await cmd.finish("请提供要删除的名称")
name = tokens[1].strip()
await _delete_sugar(name, target_id, channel_id)
await cmd.finish(f"已删除糖:「{name}」!")
if action == "查看":
if len(tokens) < 2:
await cmd.finish("请提供要查看的名称")
name = tokens[1].strip()
content = await _find_sugar(name, target_id, channel_id)
if content is None:
await cmd.finish(f"没有糖:「{name}")
await cmd.finish(f"糖的内容:「{content}")
name = action
content = await _find_sugar(name, target_id, channel_id)
if content is None:
await cmd.finish(f"没有糖:「{name}")
positional, named = _split_variables(tokens[1:])
rendered = _render_template(content, positional, named)
ok = await _reinject_command(bot, evt, rendered)
if not ok:
await cmd.finish(f"糖的展开结果:「{rendered}")

View File

@ -1,12 +0,0 @@
-- 创建语法糖表
CREATE TABLE IF NOT EXISTS syntactic_sugar (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
belong_to VARCHAR(255) NOT NULL,
channel_id VARCHAR(255) NOT NULL DEFAULT ''
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target
ON syntactic_sugar(name, channel_id, belong_to);

View File

@ -1,5 +0,0 @@
-- 插入语法糖,如果同一用户下名称已存在则更新内容
INSERT INTO syntactic_sugar (name, content, belong_to, channel_id)
VALUES (?, ?, ?, ?)
ON CONFLICT(name, channel_id, belong_to) DO UPDATE SET
content = excluded.content;

View File

@ -1,9 +1,11 @@
import asyncio
import os
import subprocess
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import cast
import zipfile
from loguru import logger
from nonebot import on_command
@ -13,22 +15,97 @@ from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent
from nonebot.adapters.onebot.v11.bot import Bot as OB11Bot
from nonebot.adapters.onebot.v11.message import Message as OB11Message
from konabot.common.artifact import ArtifactDepends, register_artifacts
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import TMP_PATH
from konabot.common.path import BINARY_PATH, TMP_PATH
arti_typst_linux = ArtifactDepends(
url="https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-unknown-linux-musl.tar.xz",
sha256="a6044cbad2a954deb921167e257e120ac0a16b20339ec01121194ff9d394996d",
target=BINARY_PATH / "typst.tar.xz",
required_os="Linux",
required_arch="x86_64",
)
arti_typst_windows = ArtifactDepends(
url="https://github.com/typst/typst/releases/download/v0.14.2/typst-x86_64-pc-windows-msvc.zip",
sha256="51353994ac83218c3497052e89b2c432c53b9d4439cdc1b361e2ea4798ebfc13",
target=BINARY_PATH / "typst.zip",
required_os="Windows",
required_arch="AMD64",
)
bin_path: Path | None = None
@arti_typst_linux.on_finished
async def _(downloaded: bool):
global bin_path
tar_path = arti_typst_linux.target
bin_path = tar_path.with_name("typst")
if downloaded or not bin_path.exists():
bin_path.unlink(missing_ok=True)
process = await asyncio.create_subprocess_exec(
"tar",
"-xvf",
tar_path,
"--strip-components=1",
"-C",
BINARY_PATH,
"typst-x86_64-unknown-linux-musl/typst",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode != 0 or not bin_path.exists():
logger.warning(
"似乎没有成功解压 Typst 二进制文件,检查一下吧! "
f"stdout={stdout} stderr={stderr}"
)
else:
os.chmod(bin_path, 0o755)
@arti_typst_windows.on_finished
async def _(downloaded: bool):
global bin_path
zip_path = arti_typst_windows.target
bin_path = BINARY_PATH / "typst.exe"
if downloaded or not bin_path.exists():
bin_path.unlink(missing_ok=True)
with zipfile.ZipFile(zip_path, "r") as zf:
target_name = "typst-x86_64-pc-windows-msvc/typst.exe"
if target_name not in zf.namelist():
logger.warning("在 Zip 压缩包里面没有找到目标文件")
return
zf.extract(target_name, BINARY_PATH)
(BINARY_PATH / target_name).rename(bin_path)
(BINARY_PATH / "typst-x86_64-pc-windows-msvc").rmdir()
register_artifacts(arti_typst_linux)
register_artifacts(arti_typst_windows)
TEMPLATE_PATH = Path(__file__).parent / "template.typ"
TEMPLATE = TEMPLATE_PATH.read_text()
def render_sync(code: str) -> bytes:
def render_sync(code: str) -> bytes | None:
if bin_path is None:
return
with TemporaryDirectory(dir=TMP_PATH) as tmpdirname:
temp_dir = Path(tmpdirname).resolve()
temp_typ = temp_dir / "page.typ"
temp_typ.write_text(TEMPLATE + "\n\n" + code)
cmd = [
"typst",
bin_path,
"compile",
temp_typ.name,
"--format",
@ -61,7 +138,7 @@ def render_sync(code: str) -> bytes:
return result_png.read_bytes()
async def render(code: str) -> bytes:
async def render(code: str) -> bytes | None:
task = asyncio.to_thread(lambda: render_sync(code))
return await task
@ -70,7 +147,15 @@ cmd = on_command("typst")
@cmd.handle()
async def _(evt: Event, bot: Bot, msg: UniMsg, target: DepLongTaskTarget):
async def _(
evt: Event,
bot: Bot,
msg: UniMsg,
target: DepLongTaskTarget,
):
if bin_path is None:
return
typst_code = ""
if isinstance(evt, OB11MessageEvent):
if evt.reply is not None:
@ -92,6 +177,8 @@ async def _(evt: Event, bot: Bot, msg: UniMsg, target: DepLongTaskTarget):
try:
res = await render(typst_code)
if res is None:
raise FileNotFoundError("没有渲染出来内容")
except FileNotFoundError as e:
await target.send_message("渲染出错:内部错误")
raise e from e

View File

@ -86,67 +86,3 @@ def test_prase_input_args_parses_resize_second_argument_as_float():
assert len(filters) == 1
assert filters[0].name == "缩放"
assert filters[0].args == [2.0, 3.0]
def test_apply_pixel_sort_keeps_image_mode_and_size():
"""测试 Pixel Sort 保持图像的 mode 和 size"""
image = Image.new("RGBA", (10, 10), (255, 0, 0, 128))
result = ImageFilterImplement.apply_pixel_sort(image)
assert result.size == image.size
assert result.mode == "RGBA"
def test_apply_pixel_sort_horizontal():
"""测试水平方向的 Pixel Sort"""
# 创建一个简单的渐变图像
image = Image.new("RGB", (5, 3))
# 第一行:红到蓝渐变
image.putpixel((0, 0), (255, 0, 0))
image.putpixel((1, 0), (200, 0, 0))
image.putpixel((2, 0), (100, 0, 0))
image.putpixel((3, 0), (50, 0, 0))
image.putpixel((4, 0), (0, 0, 255))
# 填充其他行
for y in range(1, 3):
for x in range(5):
image.putpixel((x, y), (128, 128, 128))
result = ImageFilterImplement.apply_pixel_sort(
image, direction="horizontal", auto_threshold=False, mask_threshold=10
)
assert result.size == image.size
assert result.mode == "RGBA"
def test_apply_pixel_sort_vertical():
"""测试垂直方向的 Pixel Sort"""
image = Image.new("RGB", (3, 5))
# 第一列:绿到红渐变
image.putpixel((0, 0), (0, 255, 0))
image.putpixel((0, 1), (0, 200, 0))
image.putpixel((0, 2), (0, 100, 0))
image.putpixel((0, 3), (0, 50, 0))
image.putpixel((0, 4), (255, 0, 0))
# 填充其他列
for y in range(5):
for x in range(1, 3):
image.putpixel((x, y), (128, 128, 128))
result = ImageFilterImplement.apply_pixel_sort(
image, direction="vertical", auto_threshold=False, mask_threshold=10
)
assert result.size == image.size
assert result.mode == "RGBA"
def test_prase_input_args_parses_pixel_sort_arguments():
"""测试解析 Pixel Sort 参数"""
filters = prase_input_args("像素排序 horizontal 0 false brightness 128 false 1")
assert len(filters) == 1
assert filters[0].name == "像素排序"
assert filters[0].args == ["horizontal", 0.0, False, "brightness", 128.0, False, 1]

View File

@ -1,75 +0,0 @@
import nonebot
nonebot.init()
import asyncio
import pytest
from konabot.plugins.handle_text.__init__ import (
_get_textfx_user_key,
_textfx_running_users,
TEXTFX_MAX_RUNTIME_SECONDS,
)
from konabot.plugins.handle_text.base import PipelineRunner
class DummyEvent:
def __init__(self, self_id=None, user_id=None, group_id=None, session_id=None):
self.self_id = self_id
self.user_id = user_id
self.group_id = group_id
self._session_id = session_id
def get_session_id(self):
if self._session_id is None:
raise RuntimeError('no session')
return self._session_id
def test_textfx_user_key_group():
evt = DummyEvent(self_id='123', user_id='456', group_id='789')
assert _get_textfx_user_key(evt) == '123:789:456'
def test_textfx_user_key_private():
evt = DummyEvent(self_id='123', user_id='456')
assert _get_textfx_user_key(evt) == '123:private:456'
def test_textfx_user_key_session_fallback():
evt = DummyEvent(session_id='console:alice')
assert _get_textfx_user_key(evt) == 'session:console:alice'
@pytest.mark.asyncio
async def test_textfx_timeout_limit():
"""测试脚本执行超时限制"""
runner = PipelineRunner.get_runner()
# 创建一个会超时的脚本while true 会触发迭代限制,但我们用 sleep 模拟长时间运行)
# 由于实际超时是 60 秒,我们不能真的等那么久,所以这个测试验证超时机制存在
script = "echo start"
parsed = runner.parse_pipeline(script)
assert not isinstance(parsed, str), "脚本解析应该成功"
# 验证 TEXTFX_MAX_RUNTIME_SECONDS 常量存在且合理
assert TEXTFX_MAX_RUNTIME_SECONDS == 60
@pytest.mark.asyncio
async def test_textfx_concurrent_limit():
"""测试同一用户并发执行限制"""
user_key = "test:group:user123"
# 清理可能的残留状态
_textfx_running_users.discard(user_key)
# 模拟第一个脚本正在运行
assert user_key not in _textfx_running_users
_textfx_running_users.add(user_key)
# 验证用户已被标记为运行中
assert user_key in _textfx_running_users
# 清理
_textfx_running_users.discard(user_key)
assert user_key not in _textfx_running_users

View File

@ -1,225 +0,0 @@
import pytest
import nonebot
nonebot.init()
from konabot.plugins.handle_text.base import IfNode, PipelineRunner, TextHandlerEnvironment, WhileNode
from konabot.plugins.handle_text.handlers.encoding_handlers import THReverse
from konabot.plugins.handle_text.handlers.unix_handlers import (
THCat,
THEcho,
THFalse,
THRm,
THTest,
THTrue,
)
from konabot.plugins.handle_text.handlers.whitespace_handlers import THTrim
@pytest.fixture
def runner() -> PipelineRunner:
runner = PipelineRunner()
runner.register(THEcho())
runner.register(THCat())
runner.register(THRm())
runner.register(THTrue())
runner.register(THFalse())
runner.register(THTest())
runner.register(THReverse())
runner.register(THTrim())
return runner
def test_parse_pipeline_shell_ops(runner: PipelineRunner):
parsed = runner.parse_pipeline('echo hello | reverse && test a = a || echo no; echo done > out')
assert not isinstance(parsed, str)
assert len(parsed.statements) == 2
first = parsed.statements[0]
second = parsed.statements[1]
assert not isinstance(first, IfNode)
assert not isinstance(first, WhileNode)
assert not isinstance(second, IfNode)
assert not isinstance(second, WhileNode)
assert len(first.chains) == 3
assert first.chains[0].pipeline.commands[0].name == 'echo'
assert first.chains[0].pipeline.commands[1].name == 'reverse'
assert second.chains[0].pipeline.commands[0].redirects[0].target == 'out'
def test_parse_if_statement(runner: PipelineRunner):
parsed = runner.parse_pipeline('if test a = a; then echo yes; else echo no; fi')
assert not isinstance(parsed, str)
assert len(parsed.statements) == 1
stmt = parsed.statements[0]
assert isinstance(stmt, IfNode)
assert stmt.else_body is not None
assert len(stmt.then_body.statements) == 1
def test_parse_while_statement(runner: PipelineRunner):
parsed = runner.parse_pipeline('while false; do echo yes; done')
assert not isinstance(parsed, str)
assert len(parsed.statements) == 1
stmt = parsed.statements[0]
assert isinstance(stmt, WhileNode)
assert len(stmt.body.statements) == 1
@pytest.mark.asyncio
async def test_pipeline_pipe(runner: PipelineRunner):
parsed = runner.parse_pipeline('echo hello | reverse')
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert len(results) == 1
assert results[0].code == 0
assert results[0].ostream == 'olleh'
@pytest.mark.asyncio
async def test_redirect_and_cat(runner: PipelineRunner):
parsed = runner.parse_pipeline('echo hello > a; cat a')
assert not isinstance(parsed, str)
env = TextHandlerEnvironment(False)
results = await runner.run_pipeline(parsed, None, env)
assert env.buffers['a'] == 'hello'
assert results[-1].ostream == 'hello'
@pytest.mark.asyncio
async def test_append_redirect(runner: PipelineRunner):
parsed = runner.parse_pipeline('echo hello > a; echo world >> a; cat a')
assert not isinstance(parsed, str)
env = TextHandlerEnvironment(False)
results = await runner.run_pipeline(parsed, None, env)
assert env.buffers['a'] == 'helloworld'
assert results[-1].ostream == 'helloworld'
@pytest.mark.asyncio
async def test_and_or_short_circuit(runner: PipelineRunner):
parsed = runner.parse_pipeline('test a = b && echo bad || echo ok')
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert len(results) == 1
assert results[0].code == 0
assert results[0].ostream == 'ok'
@pytest.mark.asyncio
async def test_test_bracket_alias(runner: PipelineRunner):
parsed = runner.parse_pipeline('[ 2 -gt 1 ] && echo yes')
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert results[0].code == 0
assert results[0].ostream == 'yes'
@pytest.mark.asyncio
async def test_test_string_ops(runner: PipelineRunner):
parsed = runner.parse_pipeline('test -n abc && echo yes; test -z abc || echo no')
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert [r.ostream for r in results] == ['yes', 'no']
@pytest.mark.asyncio
async def test_quote_and_trim(runner: PipelineRunner):
parsed = runner.parse_pipeline('echo " hello world " | trim')
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert results[0].ostream == 'hello world'
@pytest.mark.asyncio
async def test_if_then_else(runner: PipelineRunner):
parsed = runner.parse_pipeline('if test a = b; then echo yes; else echo no; fi')
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert results[0].code == 0
assert results[0].ostream == 'no'
@pytest.mark.asyncio
async def test_if_then_without_else(runner: PipelineRunner):
parsed = runner.parse_pipeline('if test a = a; then echo yes; fi')
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert results[0].ostream == 'yes'
@pytest.mark.asyncio
async def test_nested_if(runner: PipelineRunner):
parsed = runner.parse_pipeline(
'if test a = a; then if test b = c; then echo x; else echo y; fi; else echo z; fi'
)
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert results[0].ostream == 'y'
@pytest.mark.asyncio
async def test_negate_pipeline(runner: PipelineRunner):
parsed = runner.parse_pipeline('! test a = b && echo ok')
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert results[0].ostream == 'ok'
@pytest.mark.asyncio
async def test_true_false(runner: PipelineRunner):
parsed = runner.parse_pipeline('true && echo yes; false || echo no')
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert [r.ostream for r in results] == ['yes', 'no']
@pytest.mark.asyncio
async def test_while_false_noop(runner: PipelineRunner):
parsed = runner.parse_pipeline('while false; do echo yes; done')
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert results[0].code == 0
assert results[0].ostream is None
@pytest.mark.asyncio
async def test_while_limit_guard(runner: PipelineRunner):
parsed = runner.parse_pipeline('while true; do echo yes; done')
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert results[0].code == 2
assert 'while 循环超过最大迭代次数限制' in (results[0].ostream or '')
@pytest.mark.asyncio
async def test_while_with_immediate_break_condition(runner: PipelineRunner):
parsed = runner.parse_pipeline('while ! false; do false; done')
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert results[0].code == 1
@pytest.mark.asyncio
async def test_while_body_can_use_if(runner: PipelineRunner):
parsed = runner.parse_pipeline('while ! false; do if true; then false; fi; done')
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert results[0].code == 1
@pytest.mark.asyncio
async def test_echo_empty_string(runner: PipelineRunner):
"""测试 echo 空字符串"""
# 双引号空字符串
parsed = runner.parse_pipeline('echo ""')
assert not isinstance(parsed, str)
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
assert results[0].code == 0
assert results[0].ostream == ''
# 单引号空字符串
parsed2 = runner.parse_pipeline("echo ''")
assert not isinstance(parsed2, str)
results2 = await runner.run_pipeline(parsed2, None, TextHandlerEnvironment(False))
assert results2[0].code == 0
assert results2[0].ostream == ''