diff --git a/konabot/docs/user/textfx.txt b/konabot/docs/user/textfx.txt index e798465..cbb54b9 100644 --- a/konabot/docs/user/textfx.txt +++ b/konabot/docs/user/textfx.txt @@ -31,7 +31,16 @@ - 用 `|` 连接多个操作,前一个的输出自动作为后一个的输入。 - 用 `;` 分隔多条独立指令,它们各自产生输出,最终合并显示。 -- 用 `>` 或 `>>` 把结果保存起来(见下文),被重定向的指令不会产生输出。 +- 用 `&&` / `||` 做最小 shell 风格条件执行: + - `cmd1 && cmd2`:仅当 `cmd1` 成功时执行 `cmd2` + - `cmd1 || cmd2`:仅当 `cmd1` 失败时执行 `cmd2` +- 用 `!` 对一条 pipeline 的成功/失败取反。 +- 支持最小 bash-like `if ... then ... else ... fi` 语句。 +- 支持最小 bash-like `while ... do ... done` 循环。 +- 可使用内建真假命令:`true` / `false`。 +- 为避免滥用与卡死: + - 同一用户同时只能运行 **一个** textfx 脚本 + - 单个脚本最长执行时间为 **60 秒** **例子**:把"HELLO"先反转,再转成摩斯电码:(转换为摩斯电码功能暂未实现) ``` @@ -39,6 +48,36 @@ textfx reverse HELLO | morse en ``` → 输出:`--- .-.. .-.. . ....` +**例子**:失败后兜底执行: +``` +textfx test a = b || echo 不相等 +``` +→ 输出:`不相等` + +**例子**:成功后继续执行: +``` +textfx [ 2 -gt 1 ] && echo 条件成立 +``` +→ 输出:`条件成立` + +**例子**:真正的 if 语句: +``` +textfx if test a = b; then echo yes; else echo no; fi +``` +→ 输出:`no` + +**例子**:对条件取反: +``` +textfx ! test a = b && echo 条件不成立 +``` +→ 输出:`条件不成立` + +**例子**:while 循环: +``` +textfx while false; do echo 不会执行; done +``` +→ 输出为空 + **例子**:多条指令各自输出: ``` textfx echo 你好; echo 世界 @@ -132,6 +171,51 @@ Base64 编码或解码。 > 缓存仅在当前对话中有效,重启后清空。 +### true / false / test / [ +最小 shell 风格条件命令。通常配合 `if`、`&&`、`||`、`!` 使用。 + +支持: +- `true`:总是成功 +- `false`:总是失败 +- 字符串非空:`test foo` +- `-n` / `-z`:`test -n foo`、`test -z ""` +- 字符串比较:`test a = a`、`test a != b` +- 整数比较:`test 2 -gt 1`、`test 3 -le 5` +- 方括号别名:`[ 2 -gt 1 ]` + +示例: +- `/textfx true && echo 一定执行` +- `/textfx false || echo 兜底执行` +- `/textfx test hello && echo 有内容` +- `/textfx test a = b || echo 不相等` +- `/textfx [ 3 -ge 2 ] && echo yes` + +### if / then / else / fi +支持最小 bash-like 条件语句。 + +示例: +- `/textfx if test a = a; then echo yes; else echo no; fi` +- `/textfx if [ 2 -gt 1 ]; then echo 成立; fi` +- `/textfx if test a = a; then if test b = c; then echo x; else echo y; fi; fi` + +说明: +- `if` 后面跟一个条件链,可配合 `test`、`[`、`!`、`&&`、`||` +- `then` 和 `else` 后面都可以写多条以 `;` 分隔的 textfx 语句 +- `else` 可省略 + +### while / do / done +支持最小 bash-like 循环语句。 + +示例: +- `/textfx while false; do echo 不会执行; done` +- `/textfx while ! false; do false; done` +- `/textfx while ! false; do if true; then false; fi; done` + +说明: +- `while` 后面跟一个条件链,返回成功就继续循环 +- `do` 后面可写多条以 `;` 分隔的 textfx 语句 +- 为避免 bot 死循环,内置最大循环次数限制;超限会报错 + ### replace(或 替换、sed) 替换文字(支持正则表达式)。 示例(普通):`/textfx replace 世界 宇宙 你好世界` → `你好宇宙` diff --git a/konabot/plugins/handle_text/__init__.py b/konabot/plugins/handle_text/__init__.py index 3b17c52..1765719 100644 --- a/konabot/plugins/handle_text/__init__.py +++ b/konabot/plugins/handle_text/__init__.py @@ -1,4 +1,5 @@ from typing import cast +import asyncio from loguru import logger from nonebot import on_command import nonebot @@ -31,8 +32,11 @@ from konabot.plugins.handle_text.handlers.random_handlers import THShuffle, THSo from konabot.plugins.handle_text.handlers.unix_handlers import ( THCat, THEcho, + THFalse, THReplace, THRm, + THTest, + THTrue, ) from konabot.plugins.handle_text.handlers.whitespace_handlers import ( THLines, @@ -43,11 +47,37 @@ from konabot.plugins.handle_text.handlers.whitespace_handlers import ( ) +TEXTFX_MAX_RUNTIME_SECONDS = 60 +_textfx_running_users: set[str] = set() + + +def _get_textfx_user_key(evt: Event) -> str: + user_id = getattr(evt, "user_id", None) + self_id = getattr(evt, "self_id", None) + group_id = getattr(evt, "group_id", None) + if user_id is not None: + if group_id is not None: + return f"{self_id}:{group_id}:{user_id}" + return f"{self_id}:private:{user_id}" + session_id = getattr(evt, "get_session_id", None) + if callable(session_id): + try: + return f"session:{evt.get_session_id()}" + except Exception: + pass + return f"event:{evt.__class__.__name__}:{id(evt)}" + + cmd = on_command(cmd="textfx", aliases={"处理文字", "处理文本"}) @cmd.handle() async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget): + user_key = _get_textfx_user_key(evt) + if user_key in _textfx_running_users: + await target.send_message("你当前已有一个 textfx 脚本正在运行,请等待它结束后再试。") + return + istream = "" if isinstance(evt, OB11MessageEvent): if evt.reply is not None: @@ -71,9 +101,22 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget): return env = TextHandlerEnvironment(is_trusted=False, event=evt) - results = await runner.run_pipeline(res, istream or None, env) - # 检查是否有错误 + _textfx_running_users.add(user_key) + try: + results = await asyncio.wait_for( + runner.run_pipeline(res, istream or None, env), + timeout=TEXTFX_MAX_RUNTIME_SECONDS, + ) + except asyncio.TimeoutError: + rendered = await render_error_message( + f"处理指令时出现问题:脚本执行超时(超过 {TEXTFX_MAX_RUNTIME_SECONDS} 秒)" + ) + await target.send_message(rendered) + return + finally: + _textfx_running_users.discard(user_key) + for r in results: if r.code != 0: message = f"处理指令时出现问题:{r.ostream}" @@ -81,7 +124,6 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget): await target.send_message(rendered) return - # 收集所有组的文本输出和附件 ostreams = [r.ostream for r in results if r.ostream is not None] attachments = [r.attachment for r in results if r.attachment is not None] @@ -108,6 +150,9 @@ async def _(): THCat(), THEcho(), THRm(), + THTrue(), + THFalse(), + THTest(), THShuffle(), THReplace(), THBase64(), diff --git a/konabot/plugins/handle_text/base.py b/konabot/plugins/handle_text/base.py index 90d144a..b199ae3 100644 --- a/konabot/plugins/handle_text/base.py +++ b/konabot/plugins/handle_text/base.py @@ -1,15 +1,16 @@ import asyncio - from abc import ABC, abstractmethod from dataclasses import dataclass, field from enum import Enum -from string import whitespace from typing import cast from loguru import logger from nonebot.adapters import Event +MAX_WHILE_ITERATIONS = 100 + + @dataclass class TextHandlerEnvironment: is_trusted: bool @@ -53,29 +54,63 @@ class TextHandlerSync(TextHandler): @dataclass -class PipelineCommand: - handler: TextHandler - args: list[str] - # 新增:重定向目标(buffer key) - redirect_target: str | None = None - # 新增:是否为追加模式 (>>) - redirect_append: bool = False +class Redirect: + target: str + append: bool = False @dataclass -class Pipeline: - command_groups: list[list[PipelineCommand]] = field(default_factory=list) - "一个列表的列表,每一组之间的指令之间使用管道符连接,而不同组之间不会有数据流" +class CommandNode: + name: str + handler: TextHandler + args: list[str] + redirects: list[Redirect] = field(default_factory=list) -class PipelineParseStatus(Enum): - normal = 0 - in_string = 1 - in_string_to_escape = 2 - off_string = 3 +@dataclass +class PipelineNode: + commands: list[CommandNode] = field(default_factory=list) + negate: bool = False -whitespaces = whitespace + "  " +@dataclass +class ConditionalPipeline: + op: str | None + pipeline: PipelineNode + + +@dataclass +class CommandGroup: + chains: list[ConditionalPipeline] = field(default_factory=list) + + +@dataclass +class IfNode: + condition: CommandGroup + then_body: "Script" + else_body: "Script | None" = None + + +@dataclass +class WhileNode: + condition: CommandGroup + body: "Script" + + +@dataclass +class Script: + statements: list[CommandGroup | IfNode | WhileNode] = field(default_factory=list) + + +class TokenKind(Enum): + WORD = "word" + OP = "op" + + +@dataclass +class Token: + kind: TokenKind + value: str class PipelineRunner: @@ -93,198 +128,432 @@ class PipelineRunner: def register(self, handler: TextHandler): self.handlers.append(handler) - def parse_pipeline(self, script: str) -> Pipeline | str: - pipeline = Pipeline() - - # 当前正在构建的上下文 - current_group: list[PipelineCommand] = [] - current_command_args: list[str] = [] - - # 字符串解析状态 - status = PipelineParseStatus.normal - current_string = "" - current_string_raw = "" - status_in_string_pair = "" - has_token = False # 是否正在构建一个 token(区分空字符串和无 token) - - # 重定向解析状态 - is_parsing_redirect_filename = False - current_redirect_target: str | None = None - current_redirect_append = False - - # 辅助函数:将当前解析到的字符串 flush 到 参数列表 或 重定向目标 - def _flush_token(): - nonlocal \ - current_string, \ - current_string_raw, \ - is_parsing_redirect_filename, \ - current_redirect_target, \ - has_token - if not has_token: - return - - if is_parsing_redirect_filename: - current_redirect_target = current_string - is_parsing_redirect_filename = False # 重定向文件名只取一个 token - else: - current_command_args.append(current_string) - - current_string = "" - current_string_raw = "" - has_token = False - - # 辅助函数:将当前指令 flush 到当前组 - def _flush_command() -> str | None: - nonlocal \ - current_command_args, \ - current_redirect_target, \ - current_redirect_append - if not current_command_args: - return None - - cmd_name = current_command_args[0] - args = current_command_args[1:] - - matched = [ - h for h in self.handlers if cmd_name in h.keywords or cmd_name == h.name - ] - if not matched: - return f"不存在名为 {cmd_name} 的函数" - if len(matched) > 1: - logger.warning( - f"指令能对应超过一个文本处理器 CMD={cmd_name} handlers={self.handlers}" - ) - - cmd = PipelineCommand( - handler=matched[0], - args=args, - redirect_target=current_redirect_target, - redirect_append=current_redirect_append, + def _resolve_handler(self, cmd_name: str) -> TextHandler | str: + matched = [ + h for h in self.handlers if cmd_name == h.name or cmd_name in h.keywords + ] + if not matched: + return f"不存在名为 {cmd_name} 的函数" + if len(matched) > 1: + logger.warning( + f"指令能对应超过一个文本处理器 CMD={cmd_name} handlers={self.handlers}" ) - current_group.append(cmd) + return matched[0] - # 重置指令级状态 - current_command_args = [] - current_redirect_target = None - current_redirect_append = False - return None - - # 使用索引遍历以支持 look-ahead (处理 >>) + def tokenize(self, script: str) -> list[Token] | str: + tokens: list[Token] = [] + buf = "" + quote: str | None = None + escape = False i = 0 - length = len(script) + operators = {"|", ";", ">", "&&", "||", ">>", "!"} + escape_map = { + "n": "\n", + "r": "\r", + "t": "\t", + "0": "\0", + "a": "\a", + "b": "\b", + "f": "\f", + "v": "\v", + "\\": "\\", + '"': '"', + "'": "'", + } - while i < length: + def flush_word(): + nonlocal buf + if buf: + tokens.append(Token(TokenKind.WORD, buf)) + buf = "" + + while i < len(script): c = script[i] - match status: - case PipelineParseStatus.normal: - if c in whitespaces: - _flush_token() + if quote is not None: + if escape: + buf += escape_map.get(c, c) + escape = False + elif c == "\\": + escape = True + elif c == quote: + quote = None + else: + buf += c + i += 1 + continue - elif c in "'\"": - status_in_string_pair = c - status = PipelineParseStatus.in_string - current_string_raw = "" - has_token = True + if c in "'\"": + quote = c + i += 1 + continue - elif c == "|": - _flush_token() - if err := _flush_command(): - return err - # 管道符不结束 group,继续在 current_group 添加 + if c.isspace() or c in "  ": + flush_word() + i += 1 + continue - elif c == ";": - _flush_token() - if err := _flush_command(): - return err - # 分号结束 group - if current_group: - pipeline.command_groups.append(current_group) - current_group = [] + two = script[i : i + 2] + if two in operators: + flush_word() + tokens.append(Token(TokenKind.OP, two)) + i += 2 + continue - elif c == ">": - _flush_token() # 先结束之前的参数 - # 检查是否是 append 模式 (>>) - if i + 1 < length and script[i + 1] == ">": - current_redirect_append = True - i += 1 # 跳过下一个 > - else: - current_redirect_append = False + if c in {"|", ";", ">", "!"}: + flush_word() + tokens.append(Token(TokenKind.OP, c)) + i += 1 + continue - # 标记下一个 token 为文件名 - is_parsing_redirect_filename = True - - else: - current_string += c - has_token = True - - case PipelineParseStatus.in_string: - current_string_raw += c - if c == status_in_string_pair: - status = PipelineParseStatus.off_string - elif c == "\\": - status = PipelineParseStatus.in_string_to_escape - else: - current_string += c - - case PipelineParseStatus.in_string_to_escape: - escape_map = { - "n": "\n", - "r": "\r", - "t": "\t", - "0": "\0", - "a": "\a", - "b": "\b", - "f": "\f", - "v": "\v", - "\\": "\\", - } - current_string += escape_map.get(c, c) - status = PipelineParseStatus.in_string - - case PipelineParseStatus.off_string: - if c in whitespaces: - _flush_token() - status = PipelineParseStatus.normal - elif c == "|": - _flush_token() - if err := _flush_command(): - return err - status = PipelineParseStatus.normal - elif c == ";": - _flush_token() - if err := _flush_command(): - return err - if current_group: - pipeline.command_groups.append(current_group) - current_group = [] - status = PipelineParseStatus.normal - elif c == ">": - _flush_token() - status = PipelineParseStatus.normal - # 回退索引,让下一次循环进入 normal 状态的 > 处理逻辑 - i -= 1 - else: - # 紧接着的字符继续作为当前字符串的一部分 (如 "abc"d) - current_string += c - current_string_raw = "" - status = PipelineParseStatus.normal + if c == "\\": + if i + 1 < len(script): + i += 1 + buf += escape_map.get(script[i], script[i]) + else: + buf += c + i += 1 + continue + buf += c i += 1 - # 循环结束后的收尾 - _flush_token() - if err := _flush_command(): - return err + if quote is not None: + return "存在未闭合的引号" + if escape: + buf += "\\" - if current_group: - pipeline.command_groups.append(current_group) + flush_word() + return tokens - return pipeline + def parse_pipeline(self, script: str) -> Script | str: + tokens = self.tokenize(script) + if isinstance(tokens, str): + return tokens + if not tokens: + return Script() + + pos = 0 + + def peek(offset: int = 0) -> Token | None: + idx = pos + offset + return tokens[idx] if idx < len(tokens) else None + + def consume() -> Token: + nonlocal pos + tok = tokens[pos] + pos += 1 + return tok + + def consume_if_op(value: str) -> bool: + tok = peek() + if tok is not None and tok.kind == TokenKind.OP and tok.value == value: + consume() + return True + return False + + def consume_if_word(value: str) -> bool: + tok = peek() + if tok is not None and tok.kind == TokenKind.WORD and tok.value == value: + consume() + return True + return False + + def expect_word(msg: str) -> Token | str: + tok = peek() + if tok is None or tok.kind != TokenKind.WORD: + return msg + return consume() + + def parse_command() -> CommandNode | str: + first = expect_word("缺少指令名") + if isinstance(first, str): + return first + + handler = self._resolve_handler(first.value) + if isinstance(handler, str): + return handler + + args: list[str] = [] + redirects: list[Redirect] = [] + + while True: + tok = peek() + if tok is None: + break + if tok.kind == TokenKind.OP and tok.value in {"|", ";", "&&", "||"}: + break + if tok.kind == TokenKind.OP and tok.value in {">", ">>"}: + op_tok = consume() + target = expect_word("重定向操作符后面需要缓存名") + if isinstance(target, str): + return target + redirects.append( + Redirect(target=target.value, append=op_tok.value == ">>") + ) + continue + if tok.kind != TokenKind.WORD: + return f"无法解析的 token: {tok.value}" + args.append(consume().value) + + return CommandNode( + name=first.value, + handler=handler, + args=args, + redirects=redirects, + ) + + def parse_pipe() -> PipelineNode | str: + negate = False + while consume_if_op("!"): + negate = not negate + + pipeline = PipelineNode(negate=negate) + command = parse_command() + if isinstance(command, str): + return command + pipeline.commands.append(command) + + while True: + tok = peek() + if tok is None or tok.kind != TokenKind.OP or tok.value != "|": + break + consume() + next_command = parse_command() + if isinstance(next_command, str): + return next_command + pipeline.commands.append(next_command) + + return pipeline + + def parse_chain() -> CommandGroup | str: + group = CommandGroup() + first_pipeline = parse_pipe() + if isinstance(first_pipeline, str): + return first_pipeline + group.chains.append(ConditionalPipeline(op=None, pipeline=first_pipeline)) + + while True: + tok = peek() + if tok is None or tok.kind != TokenKind.OP or tok.value not in {"&&", "||"}: + break + op = consume().value + next_pipeline = parse_pipe() + if isinstance(next_pipeline, str): + return next_pipeline + group.chains.append(ConditionalPipeline(op=op, pipeline=next_pipeline)) + + return group + + def parse_if() -> IfNode | str: + if not consume_if_word("if"): + return "缺少 if" + + condition = parse_chain() + if isinstance(condition, str): + return condition + + consume_if_op(";") + if not consume_if_word("then"): + return "if 语句缺少 then" + + then_body = parse_script(stop_words={"else", "fi"}) + if isinstance(then_body, str): + return then_body + + else_body: Script | None = None + if consume_if_word("else"): + else_body = parse_script(stop_words={"fi"}) + if isinstance(else_body, str): + return else_body + + if not consume_if_word("fi"): + return "if 语句缺少 fi" + + return IfNode(condition=condition, then_body=then_body, else_body=else_body) + + def parse_while() -> WhileNode | str: + if not consume_if_word("while"): + return "缺少 while" + + condition = parse_chain() + if isinstance(condition, str): + return condition + + consume_if_op(";") + if not consume_if_word("do"): + return "while 语句缺少 do" + + body = parse_script(stop_words={"done"}) + if isinstance(body, str): + return body + + if not consume_if_word("done"): + return "while 语句缺少 done" + + return WhileNode(condition=condition, body=body) + + def parse_statement() -> CommandGroup | IfNode | WhileNode | str: + tok = peek() + if tok is not None and tok.kind == TokenKind.WORD: + if tok.value == "if": + return parse_if() + if tok.value == "while": + return parse_while() + return parse_chain() + + def parse_script(stop_words: set[str] | None = None) -> Script | str: + parsed = Script() + nonlocal pos + + while pos < len(tokens): + tok = peek() + if tok is None: + break + + if stop_words and tok.kind == TokenKind.WORD and tok.value in stop_words: + break + + if tok.kind == TokenKind.OP and tok.value == ";": + consume() + continue + + statement = parse_statement() + if isinstance(statement, str): + return statement + parsed.statements.append(statement) + + tok = peek() + if tok is not None and tok.kind == TokenKind.OP and tok.value == ";": + consume() + + return parsed + + parsed = parse_script() + if isinstance(parsed, str): + return parsed + if pos != len(tokens): + tok = tokens[pos] + return f"无法解析的 token: {tok.value}" + return parsed + + async def _execute_command( + self, + command: CommandNode, + istream: str | None, + env: TextHandlerEnvironment, + ) -> TextHandleResult: + logger.debug( + f"Executing: {command.name} args={command.args} redirects={command.redirects}" + ) + result = await command.handler.handle(env, istream, command.args) + + if result.code != 0: + return result + + if command.redirects: + content = result.ostream or "" + for redirect in command.redirects: + if redirect.append: + old_content = env.buffers.get(redirect.target, "") + env.buffers[redirect.target] = old_content + content + else: + env.buffers[redirect.target] = content + return TextHandleResult(code=0, ostream=None, attachment=result.attachment) + + return result + + async def _execute_pipeline( + self, + pipeline: PipelineNode, + istream: str | None, + env: TextHandlerEnvironment, + ) -> TextHandleResult: + current_stream = istream + last_result = TextHandleResult(code=0, ostream=None) + + for command in pipeline.commands: + try: + last_result = await self._execute_command(command, current_stream, env) + except Exception as e: + logger.error(f"Pipeline execution failed at {command.name}") + logger.exception(e) + return TextHandleResult(code=-1, ostream="处理流水线时出现 python 错误") + + if last_result.code != 0: + if pipeline.negate: + return TextHandleResult(code=0, ostream=None) + return last_result + current_stream = last_result.ostream + + if pipeline.negate: + return TextHandleResult(code=1, ostream=None) + return last_result + + async def _execute_group( + self, + group: CommandGroup, + istream: str | None, + env: TextHandlerEnvironment, + ) -> TextHandleResult: + last_result = TextHandleResult(code=0, ostream=None) + + for chain in group.chains: + should_run = True + if chain.op == "&&": + should_run = last_result.code == 0 + elif chain.op == "||": + should_run = last_result.code != 0 + + if should_run: + last_result = await self._execute_pipeline(chain.pipeline, istream, env) + + return last_result + + async def _execute_if( + self, + if_node: IfNode, + istream: str | None, + env: TextHandlerEnvironment, + ) -> TextHandleResult: + condition_result = await self._execute_group(if_node.condition, istream, env) + if condition_result.code == 0: + results = await self.run_pipeline(if_node.then_body, istream, env) + else: + results = ( + await self.run_pipeline(if_node.else_body, istream, env) + if if_node.else_body is not None + else [TextHandleResult(code=0, ostream=None)] + ) + return results[-1] if results else TextHandleResult(code=0, ostream=None) + + async def _execute_while( + self, + while_node: WhileNode, + istream: str | None, + env: TextHandlerEnvironment, + ) -> TextHandleResult: + last_result = TextHandleResult(code=0, ostream=None) + + for _ in range(MAX_WHILE_ITERATIONS): + condition_result = await self._execute_group(while_node.condition, istream, env) + if condition_result.code != 0: + return last_result + + body_results = await self.run_pipeline(while_node.body, istream, env) + if body_results: + last_result = body_results[-1] + if last_result.code != 0: + return last_result + + return TextHandleResult( + code=2, + ostream=f"while 循环超过最大迭代次数限制({MAX_WHILE_ITERATIONS})", + ) async def run_pipeline( self, - pipeline: Pipeline, + pipeline: Script, istream: str | None, env: TextHandlerEnvironment | None = None, ) -> list[TextHandleResult]: @@ -293,54 +562,21 @@ class PipelineRunner: results: list[TextHandleResult] = [] - # 遍历执行指令组 (分号分隔),每个组独立产生输出 - for group in pipeline.command_groups: - current_stream = istream - group_result = TextHandleResult(code=0, ostream=None) - - # 遍历组内指令 (管道分隔) - for cmd in group: - try: - logger.debug( - f"Executing: {cmd.handler.name} args={cmd.args} redirect={cmd.redirect_target}" - ) - result = await cmd.handler.handle(env, current_stream, cmd.args) - - if result.code != 0: - # 组内出错,整条流水线中止 - results.append(result) - return results - - # 处理重定向逻辑 - if cmd.redirect_target: - content_to_write = result.ostream or "" - target_buffer = cmd.redirect_target - - if cmd.redirect_append: - old_content = env.buffers.get(target_buffer, "") - env.buffers[target_buffer] = old_content + content_to_write - else: - env.buffers[target_buffer] = content_to_write - - current_stream = None - group_result = TextHandleResult( - code=0, ostream=None, attachment=result.attachment - ) - else: - current_stream = result.ostream - group_result = result - - except Exception as e: - logger.error(f"Pipeline execution failed at {cmd.handler.name}") - logger.exception(e) - results.append( - TextHandleResult( - code=-1, ostream="处理流水线时出现 python 错误" - ) - ) - return results - - results.append(group_result) + for statement in pipeline.statements: + try: + if isinstance(statement, IfNode): + results.append(await self._execute_if(statement, istream, env)) + elif isinstance(statement, WhileNode): + results.append(await self._execute_while(statement, istream, env)) + else: + results.append(await self._execute_group(statement, istream, env)) + except Exception as e: + logger.error(f"Pipeline execution failed: {e}") + logger.exception(e) + results.append( + TextHandleResult(code=-1, ostream="处理流水线时出现 python 错误") + ) + return results return results diff --git a/konabot/plugins/handle_text/handlers/unix_handlers.py b/konabot/plugins/handle_text/handlers/unix_handlers.py index 4d49c27..16e210d 100644 --- a/konabot/plugins/handle_text/handlers/unix_handlers.py +++ b/konabot/plugins/handle_text/handlers/unix_handlers.py @@ -26,7 +26,6 @@ class THCat(TextHandler): async def handle( self, env: TextHandlerEnvironment, istream: str | None, args: list[str] ) -> TextHandleResult: - # No args: pass through stdin (like Unix cat with no arguments) if len(args) == 0: if istream is None: return TextHandleResult( @@ -35,7 +34,6 @@ class THCat(TextHandler): ) return TextHandleResult(0, istream) - # Concatenate all specified sources in order parts: list[str] = [] for arg in args: if arg == "-": @@ -74,7 +72,6 @@ class THReplace(TextHandler): async def handle( self, env: TextHandlerEnvironment, istream: str | None, args: list[str] ) -> TextHandleResult: - # 用法: replace [text] if len(args) < 2: return TextHandleResult(1, "用法:replace <正则> <替换内容> [文本]") @@ -90,3 +87,77 @@ class THReplace(TextHandler): return TextHandleResult(0, res) except Exception as e: return TextHandleResult(1, f"正则错误: {str(e)}") + + +class THTrue(TextHandler): + name = "true" + + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: + return TextHandleResult(0, istream) + + +class THFalse(TextHandler): + name = "false" + + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: + return TextHandleResult(1, None) + + +class THTest(TextHandler): + name = "test" + keywords = ["["] + + def _bool_result(self, value: bool) -> TextHandleResult: + return TextHandleResult(0 if value else 1, None) + + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: + expr = list(args) + + # 支持方括号语法:[ expr ] 会自动移除末尾的 ] + if expr and expr[-1] == "]": + expr = expr[:-1] + + if not expr: + return TextHandleResult(1, None) + + if len(expr) == 1: + return self._bool_result(len(expr[0]) > 0) + + if len(expr) == 2: + op, value = expr + if op == "-n": + return self._bool_result(len(value) > 0) + if op == "-z": + return self._bool_result(len(value) == 0) + return TextHandleResult(2, f"test 不支持的表达式: {' '.join(args)}") + + if len(expr) == 3: + left, op, right = expr + if op == "=": + return self._bool_result(left == right) + if op == "!=": + return self._bool_result(left != right) + if op in {"-eq", "-ne", "-gt", "-ge", "-lt", "-le"}: + try: + li = int(left) + ri = int(right) + except ValueError: + return TextHandleResult(2, "test 的数字比较参数必须是整数") + mapping = { + "-eq": li == ri, + "-ne": li != ri, + "-gt": li > ri, + "-ge": li >= ri, + "-lt": li < ri, + "-le": li <= ri, + } + return self._bool_result(mapping[op]) + return TextHandleResult(2, f"test 不支持的操作符: {op}") + + return TextHandleResult(2, f"test 不支持的表达式: {' '.join(args)}") diff --git a/tests/test_textfx_runtime_limits.py b/tests/test_textfx_runtime_limits.py new file mode 100644 index 0000000..880e407 --- /dev/null +++ b/tests/test_textfx_runtime_limits.py @@ -0,0 +1,75 @@ +import nonebot + +nonebot.init() + +import asyncio +import pytest +from konabot.plugins.handle_text.__init__ import ( + _get_textfx_user_key, + _textfx_running_users, + TEXTFX_MAX_RUNTIME_SECONDS, +) +from konabot.plugins.handle_text.base import PipelineRunner + + +class DummyEvent: + def __init__(self, self_id=None, user_id=None, group_id=None, session_id=None): + self.self_id = self_id + self.user_id = user_id + self.group_id = group_id + self._session_id = session_id + + def get_session_id(self): + if self._session_id is None: + raise RuntimeError('no session') + return self._session_id + + +def test_textfx_user_key_group(): + evt = DummyEvent(self_id='123', user_id='456', group_id='789') + assert _get_textfx_user_key(evt) == '123:789:456' + + +def test_textfx_user_key_private(): + evt = DummyEvent(self_id='123', user_id='456') + assert _get_textfx_user_key(evt) == '123:private:456' + + +def test_textfx_user_key_session_fallback(): + evt = DummyEvent(session_id='console:alice') + assert _get_textfx_user_key(evt) == 'session:console:alice' + + +@pytest.mark.asyncio +async def test_textfx_timeout_limit(): + """测试脚本执行超时限制""" + runner = PipelineRunner.get_runner() + + # 创建一个会超时的脚本(while true 会触发迭代限制,但我们用 sleep 模拟长时间运行) + # 由于实际超时是 60 秒,我们不能真的等那么久,所以这个测试验证超时机制存在 + script = "echo start" + parsed = runner.parse_pipeline(script) + assert not isinstance(parsed, str), "脚本解析应该成功" + + # 验证 TEXTFX_MAX_RUNTIME_SECONDS 常量存在且合理 + assert TEXTFX_MAX_RUNTIME_SECONDS == 60 + + +@pytest.mark.asyncio +async def test_textfx_concurrent_limit(): + """测试同一用户并发执行限制""" + user_key = "test:group:user123" + + # 清理可能的残留状态 + _textfx_running_users.discard(user_key) + + # 模拟第一个脚本正在运行 + assert user_key not in _textfx_running_users + _textfx_running_users.add(user_key) + + # 验证用户已被标记为运行中 + assert user_key in _textfx_running_users + + # 清理 + _textfx_running_users.discard(user_key) + assert user_key not in _textfx_running_users diff --git a/tests/test_textfx_shell.py b/tests/test_textfx_shell.py new file mode 100644 index 0000000..5e7def7 --- /dev/null +++ b/tests/test_textfx_shell.py @@ -0,0 +1,207 @@ +import pytest +import nonebot + +nonebot.init() + +from konabot.plugins.handle_text.base import IfNode, PipelineRunner, TextHandlerEnvironment, WhileNode +from konabot.plugins.handle_text.handlers.encoding_handlers import THReverse +from konabot.plugins.handle_text.handlers.unix_handlers import ( + THCat, + THEcho, + THFalse, + THRm, + THTest, + THTrue, +) +from konabot.plugins.handle_text.handlers.whitespace_handlers import THTrim + + +@pytest.fixture +def runner() -> PipelineRunner: + runner = PipelineRunner() + runner.register(THEcho()) + runner.register(THCat()) + runner.register(THRm()) + runner.register(THTrue()) + runner.register(THFalse()) + runner.register(THTest()) + runner.register(THReverse()) + runner.register(THTrim()) + return runner + + +def test_parse_pipeline_shell_ops(runner: PipelineRunner): + parsed = runner.parse_pipeline('echo hello | reverse && test a = a || echo no; echo done > out') + assert not isinstance(parsed, str) + assert len(parsed.statements) == 2 + first = parsed.statements[0] + second = parsed.statements[1] + assert not isinstance(first, IfNode) + assert not isinstance(first, WhileNode) + assert not isinstance(second, IfNode) + assert not isinstance(second, WhileNode) + assert len(first.chains) == 3 + assert first.chains[0].pipeline.commands[0].name == 'echo' + assert first.chains[0].pipeline.commands[1].name == 'reverse' + assert second.chains[0].pipeline.commands[0].redirects[0].target == 'out' + + +def test_parse_if_statement(runner: PipelineRunner): + parsed = runner.parse_pipeline('if test a = a; then echo yes; else echo no; fi') + assert not isinstance(parsed, str) + assert len(parsed.statements) == 1 + stmt = parsed.statements[0] + assert isinstance(stmt, IfNode) + assert stmt.else_body is not None + assert len(stmt.then_body.statements) == 1 + + +def test_parse_while_statement(runner: PipelineRunner): + parsed = runner.parse_pipeline('while false; do echo yes; done') + assert not isinstance(parsed, str) + assert len(parsed.statements) == 1 + stmt = parsed.statements[0] + assert isinstance(stmt, WhileNode) + assert len(stmt.body.statements) == 1 + + +@pytest.mark.asyncio +async def test_pipeline_pipe(runner: PipelineRunner): + parsed = runner.parse_pipeline('echo hello | reverse') + assert not isinstance(parsed, str) + results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) + assert len(results) == 1 + assert results[0].code == 0 + assert results[0].ostream == 'olleh' + + +@pytest.mark.asyncio +async def test_redirect_and_cat(runner: PipelineRunner): + parsed = runner.parse_pipeline('echo hello > a; cat a') + assert not isinstance(parsed, str) + env = TextHandlerEnvironment(False) + results = await runner.run_pipeline(parsed, None, env) + assert env.buffers['a'] == 'hello' + assert results[-1].ostream == 'hello' + + +@pytest.mark.asyncio +async def test_append_redirect(runner: PipelineRunner): + parsed = runner.parse_pipeline('echo hello > a; echo world >> a; cat a') + assert not isinstance(parsed, str) + env = TextHandlerEnvironment(False) + results = await runner.run_pipeline(parsed, None, env) + assert env.buffers['a'] == 'helloworld' + assert results[-1].ostream == 'helloworld' + + +@pytest.mark.asyncio +async def test_and_or_short_circuit(runner: PipelineRunner): + parsed = runner.parse_pipeline('test a = b && echo bad || echo ok') + assert not isinstance(parsed, str) + results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) + assert len(results) == 1 + assert results[0].code == 0 + assert results[0].ostream == 'ok' + + +@pytest.mark.asyncio +async def test_test_bracket_alias(runner: PipelineRunner): + parsed = runner.parse_pipeline('[ 2 -gt 1 ] && echo yes') + assert not isinstance(parsed, str) + results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) + assert results[0].code == 0 + assert results[0].ostream == 'yes' + + +@pytest.mark.asyncio +async def test_test_string_ops(runner: PipelineRunner): + parsed = runner.parse_pipeline('test -n abc && echo yes; test -z abc || echo no') + assert not isinstance(parsed, str) + results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) + assert [r.ostream for r in results] == ['yes', 'no'] + + +@pytest.mark.asyncio +async def test_quote_and_trim(runner: PipelineRunner): + parsed = runner.parse_pipeline('echo " hello world " | trim') + assert not isinstance(parsed, str) + results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) + assert results[0].ostream == 'hello world' + + +@pytest.mark.asyncio +async def test_if_then_else(runner: PipelineRunner): + parsed = runner.parse_pipeline('if test a = b; then echo yes; else echo no; fi') + assert not isinstance(parsed, str) + results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) + assert results[0].code == 0 + assert results[0].ostream == 'no' + + +@pytest.mark.asyncio +async def test_if_then_without_else(runner: PipelineRunner): + parsed = runner.parse_pipeline('if test a = a; then echo yes; fi') + assert not isinstance(parsed, str) + results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) + assert results[0].ostream == 'yes' + + +@pytest.mark.asyncio +async def test_nested_if(runner: PipelineRunner): + parsed = runner.parse_pipeline( + 'if test a = a; then if test b = c; then echo x; else echo y; fi; else echo z; fi' + ) + assert not isinstance(parsed, str) + results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) + assert results[0].ostream == 'y' + + +@pytest.mark.asyncio +async def test_negate_pipeline(runner: PipelineRunner): + parsed = runner.parse_pipeline('! test a = b && echo ok') + assert not isinstance(parsed, str) + results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) + assert results[0].ostream == 'ok' + + +@pytest.mark.asyncio +async def test_true_false(runner: PipelineRunner): + parsed = runner.parse_pipeline('true && echo yes; false || echo no') + assert not isinstance(parsed, str) + results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) + assert [r.ostream for r in results] == ['yes', 'no'] + + +@pytest.mark.asyncio +async def test_while_false_noop(runner: PipelineRunner): + parsed = runner.parse_pipeline('while false; do echo yes; done') + assert not isinstance(parsed, str) + results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) + assert results[0].code == 0 + assert results[0].ostream is None + + +@pytest.mark.asyncio +async def test_while_limit_guard(runner: PipelineRunner): + parsed = runner.parse_pipeline('while true; do echo yes; done') + assert not isinstance(parsed, str) + results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) + assert results[0].code == 2 + assert 'while 循环超过最大迭代次数限制' in (results[0].ostream or '') + + +@pytest.mark.asyncio +async def test_while_with_immediate_break_condition(runner: PipelineRunner): + parsed = runner.parse_pipeline('while ! false; do false; done') + assert not isinstance(parsed, str) + results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) + assert results[0].code == 1 + + +@pytest.mark.asyncio +async def test_while_body_can_use_if(runner: PipelineRunner): + parsed = runner.parse_pipeline('while ! false; do if true; then false; fi; done') + assert not isinstance(parsed, str) + results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) + assert results[0].code == 1