diff --git a/konabot/docs/user/textfx.txt b/konabot/docs/user/textfx.txt index e274e62..e798465 100644 --- a/konabot/docs/user/textfx.txt +++ b/konabot/docs/user/textfx.txt @@ -30,14 +30,35 @@ ## 二、流水线语法(超简单) - 用 `|` 连接多个操作,前一个的输出自动作为后一个的输入。 -- 用 `>` 或 `>>` 把结果保存起来(见下文)。 +- 用 `;` 分隔多条独立指令,它们各自产生输出,最终合并显示。 +- 用 `>` 或 `>>` 把结果保存起来(见下文),被重定向的指令不会产生输出。 -**例子**:把“HELLO”先反转,再转成摩斯电码:(转换为摩斯电码功能暂未实现) +**例子**:把"HELLO"先反转,再转成摩斯电码:(转换为摩斯电码功能暂未实现) ``` textfx reverse HELLO | morse en ``` → 输出:`--- .-.. .-.. . ....` +**例子**:多条指令各自输出: +``` +textfx echo 你好; echo 世界 +``` +→ 输出: +``` +你好 +世界 +``` + +**例子**:重定向的指令不输出,其余正常输出: +``` +textfx echo 1; echo 2 > a; echo 3 +``` +→ 输出: +``` +1 +3 +``` + --- ## 三、功能清单(含示例) @@ -91,6 +112,18 @@ Base64 编码或解码。 输出指定文字。 示例:`/textfx echo 你好` → `你好` +### cat +读取并拼接缓存内容,类似 Unix cat 命令。 +- 无参数时直接传递标准输入(管道输入或回复的消息)。 +- 使用 `-` 代表标准输入,可与缓存名混合使用。 +- 支持多个参数,按顺序拼接输出。 + +示例: +- 传递输入:`/textfx echo 你好 | cat` → `你好` +- 读取缓存:`/textfx cat mytext` → 输出 mytext 的内容 +- 拼接多个缓存:`/textfx cat a b c` → 依次拼接缓存 a、b、c +- 混合标准输入和缓存:`/textfx echo 前缀 | cat - mytext` → 拼接标准输入与缓存 mytext + ### 缓存操作(保存中间结果) - 保存:`/textfx reverse 你好 > mytext`(不输出,存入 mytext) - 读取:`/textfx cat mytext` → `好你` @@ -104,6 +137,32 @@ Base64 编码或解码。 示例(普通):`/textfx replace 世界 宇宙 你好世界` → `你好宇宙` 示例(正则):`/textfx replace \d+ [数字] 我有123个苹果` → `我有[数字]个苹果` +### trim(或 strip、去空格) +去除文本首尾空白字符。 +示例:`/textfx trim " 你好 "` → `你好` +示例:`/textfx echo " hello " | trim` → `hello` + +### ltrim(或 lstrip) +去除文本左侧空白字符。 +示例:`/textfx ltrim " 你好 "` → `你好 ` + +### rtrim(或 rstrip) +去除文本右侧空白字符。 +示例:`/textfx rtrim " 你好 "` → ` 你好` + +### squeeze(或 压缩空白) +将连续的空白字符(空格、制表符)压缩为单个空格。 +示例:`/textfx squeeze "你好 世界"` → `你好 世界` + +### lines(或 行处理) +按行处理文本,支持以下子命令: +- `lines trim` — 去除每行首尾空白 +- `lines empty` — 去除所有空行 +- `lines squeeze` — 将连续空行压缩为一行 + +示例:`/textfx echo " hello\n\n\n world " | lines trim` → `hello\n\n\n world` +示例:`/textfx echo "a\n\n\nb" | lines squeeze` → `a\n\nb` + --- ## 常见问题 diff --git a/konabot/plugins/handle_text/__init__.py b/konabot/plugins/handle_text/__init__.py index a7c5c8f..f8d417f 100644 --- a/konabot/plugins/handle_text/__init__.py +++ b/konabot/plugins/handle_text/__init__.py @@ -10,11 +10,36 @@ from nonebot.adapters.onebot.v11.message import Message as OB11Message from konabot.common.apis.ali_content_safety import AlibabaGreen from konabot.common.longtask import DepLongTaskTarget -from konabot.plugins.handle_text.base import PipelineRunner, TextHandlerEnvironment, register_text_handlers +from konabot.plugins.handle_text.base import ( + PipelineRunner, + TextHandlerEnvironment, + register_text_handlers, +) from konabot.plugins.handle_text.handlers.ai_handlers import THQwen -from konabot.plugins.handle_text.handlers.encoding_handlers import THAlign, THAlphaConv, THB64Hex, THBase64, THBaseConv, THCaesar, THMorse, THReverse +from konabot.plugins.handle_text.handlers.encoding_handlers import ( + THAlign, + THAlphaConv, + THB64Hex, + THBase64, + THBaseConv, + THCaesar, + THMorse, + THReverse, +) from konabot.plugins.handle_text.handlers.random_handlers import THShuffle, THSorted -from konabot.plugins.handle_text.handlers.unix_handlers import THCat, THEcho, THReplace, THRm +from konabot.plugins.handle_text.handlers.unix_handlers import ( + THCat, + THEcho, + THReplace, + THRm, +) +from konabot.plugins.handle_text.handlers.whitespace_handlers import ( + THLines, + THLTrim, + THRTrim, + THSqueeze, + THTrim, +) cmd = on_command(cmd="textfx", aliases={"处理文字", "处理文本"}) @@ -28,11 +53,13 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget): istream = evt.reply.message.extract_plain_text() else: for seg in evt.get_message(): - if seg.type == 'reply': - msgid = seg.get('id') + if seg.type == "reply": + msgid = seg.get("id") if msgid is not None: msg2data = await cast(OB11Bot, bot).get_msg(message_id=msgid) - istream = OB11Message(msg2data.get("message")).extract_plain_text() + istream = OB11Message( + msg2data.get("message") + ).extract_plain_text() script = msg.extract_plain_text().removeprefix("textfx").removeprefix("处理文字") runner = PipelineRunner.get_runner() @@ -42,22 +69,31 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget): await target.send_message(res) return - env = TextHandlerEnvironment( - is_trusted=False - ) - res2 = await runner.run_pipeline(res, istream or None, env) - if res2.code != 0: - await target.send_message(f"处理指令时出现问题:{res2.ostream}") - elif res2.ostream is not None: - txt = res2.ostream + env = TextHandlerEnvironment(is_trusted=False) + results = await runner.run_pipeline(res, istream or None, env) + + # 检查是否有错误 + for r in results: + if r.code != 0: + await target.send_message(f"处理指令时出现问题:{r.ostream}") + return + + # 收集所有组的文本输出和附件 + ostreams = [r.ostream for r in results if r.ostream is not None] + attachments = [r.attachment for r in results if r.attachment is not None] + + if ostreams: + txt = "\n".join(ostreams) err = await AlibabaGreen.detect(txt) if not err: - await target.send_message("处理指令时出现问题:内容被拦截!请你检查你的内容是否合理!") + await target.send_message( + "处理指令时出现问题:内容被拦截!请你检查你的内容是否合理!" + ) return - await target.send_message(res2.ostream, at=False) - if res2.attachment is not None: - # 潜在风险点:这里没有人可以做安全检查 - await target.send_message(UniMessage.image(raw=res2.attachment), at=False) + await target.send_message(txt, at=False) + + for att in attachments: + await target.send_message(UniMessage.image(raw=att), at=False) driver = nonebot.get_driver() @@ -81,6 +117,10 @@ async def _(): THSorted(), THMorse(), THQwen(), + THTrim(), + THLTrim(), + THRTrim(), + THSqueeze(), + THLines(), ) logger.info(f"注册了 TextHandler:{PipelineRunner.get_runner().handlers}") - diff --git a/konabot/plugins/handle_text/base.py b/konabot/plugins/handle_text/base.py index cb65578..ede362c 100644 --- a/konabot/plugins/handle_text/base.py +++ b/konabot/plugins/handle_text/base.py @@ -23,7 +23,7 @@ class TextHandleResult: class TextHandler(ABC): - name: str = '' + name: str = "" keywords: list[str] = [] @abstractmethod @@ -37,12 +37,16 @@ class TextHandler(ABC): class TextHandlerSync(TextHandler): @abstractmethod - def handle_sync(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: - ... + def handle_sync( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: ... - async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: def _hs(): return self.handle_sync(env, istream, args) + return await asyncio.to_thread(_hs) @@ -99,6 +103,7 @@ class PipelineRunner: current_string = "" current_string_raw = "" status_in_string_pair = "" + has_token = False # 是否正在构建一个 token(区分空字符串和无 token) # 重定向解析状态 is_parsing_redirect_filename = False @@ -111,8 +116,9 @@ class PipelineRunner: current_string, \ current_string_raw, \ is_parsing_redirect_filename, \ - current_redirect_target - if not current_string: + current_redirect_target, \ + has_token + if not has_token: return if is_parsing_redirect_filename: @@ -123,6 +129,7 @@ class PipelineRunner: current_string = "" current_string_raw = "" + has_token = False # 辅助函数:将当前指令 flush 到当前组 def _flush_command() -> str | None: @@ -176,6 +183,7 @@ class PipelineRunner: status_in_string_pair = c status = PipelineParseStatus.in_string current_string_raw = "" + has_token = True elif c == "|": _flush_token() @@ -206,6 +214,7 @@ class PipelineRunner: else: current_string += c + has_token = True case PipelineParseStatus.in_string: current_string_raw += c @@ -217,7 +226,18 @@ class PipelineRunner: current_string += c case PipelineParseStatus.in_string_to_escape: - current_string += c + escape_map = { + "n": "\n", + "r": "\r", + "t": "\t", + "0": "\0", + "a": "\a", + "b": "\b", + "f": "\f", + "v": "\v", + "\\": "\\", + } + current_string += escape_map.get(c, c) status = PipelineParseStatus.in_string case PipelineParseStatus.off_string: @@ -265,24 +285,19 @@ class PipelineRunner: pipeline: Pipeline, istream: str | None, env: TextHandlerEnvironment | None = None, - ) -> TextHandleResult: + ) -> list[TextHandleResult]: if env is None: - # 默认环境 env = TextHandlerEnvironment(is_trusted=False, buffers={}) - final_result = TextHandleResult(code=0, ostream=istream) + results: list[TextHandleResult] = [] - # 遍历执行指令组 (分号分隔) + # 遍历执行指令组 (分号分隔),每个组独立产生输出 for group in pipeline.command_groups: - # 每个组开始时,使用原始输入(或者根据需求设为 None,这里假设每个组独立处理 istream) - # 通常分号分隔的命令组,第一条命令如果没有 pipe 输入,它接收的 istream 取决于整体输入 current_stream = istream + group_result = TextHandleResult(code=0, ostream=None) # 遍历组内指令 (管道分隔) for cmd in group: - if final_result.code != 0: - break - try: logger.debug( f"Executing: {cmd.handler.name} args={cmd.args} redirect={cmd.redirect_target}" @@ -290,8 +305,9 @@ class PipelineRunner: result = await cmd.handler.handle(env, current_stream, cmd.args) if result.code != 0: - final_result = result - break + # 组内出错,整条流水线中止 + results.append(result) + return results # 处理重定向逻辑 if cmd.redirect_target: @@ -304,29 +320,27 @@ class PipelineRunner: else: env.buffers[target_buffer] = content_to_write - # 重定向后,标准输出通常被消耗,后续管道接收到的流为空 (或 None) - # 除非实现 tee 逻辑,否则视为流已终止 current_stream = None - - # 更新最终结果,但 ostream 设为 None 因为被重定向了 - final_result = TextHandleResult( + group_result = TextHandleResult( code=0, ostream=None, attachment=result.attachment ) else: current_stream = result.ostream - final_result = result + group_result = result except Exception as e: logger.error(f"Pipeline execution failed at {cmd.handler.name}") logger.exception(e) - return TextHandleResult(code=-1, ostream="处理流水线时出现 python 错误") + results.append( + TextHandleResult( + code=-1, ostream="处理流水线时出现 python 错误" + ) + ) + return results - # 一个组执行完,final_result保留该组最后的状态。 - # 如果还有下一个组,final_result.code 如果是 0 则继续执行下一个组 - if final_result.code != 0: - break + results.append(group_result) - return final_result + return results def register_text_handlers(*handlers: TextHandler): diff --git a/konabot/plugins/handle_text/handlers/unix_handlers.py b/konabot/plugins/handle_text/handlers/unix_handlers.py index f248104..4d49c27 100644 --- a/konabot/plugins/handle_text/handlers/unix_handlers.py +++ b/konabot/plugins/handle_text/handlers/unix_handlers.py @@ -1,41 +1,65 @@ import re -from konabot.plugins.handle_text.base import TextHandleResult, TextHandler, TextHandlerEnvironment +from konabot.plugins.handle_text.base import ( + TextHandleResult, + TextHandler, + TextHandlerEnvironment, +) class THEcho(TextHandler): - name = 'echo' + name = "echo" - async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: if len(args) == 0 and istream is None: return TextHandleResult(1, "请在 echo 后面添加需要输出的文本") if istream is not None: - return TextHandleResult(0, '\n'.join([istream] + args)) - return TextHandleResult(0, '\n'.join(args)) + return TextHandleResult(0, "\n".join([istream] + args)) + return TextHandleResult(0, "\n".join(args)) class THCat(TextHandler): - name = 'cat' + name = "cat" - async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: - if len(args) != 1: - return TextHandleResult(1, "cat 使用方法:cat <缓存名>") - buf = args[0] - if buf == '-': - buf = istream - if buf not in env.buffers: - return TextHandleResult(2, f"缓存 {buf} 不存在") - return TextHandleResult(0, env.buffers[buf]) + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: + # No args: pass through stdin (like Unix cat with no arguments) + if len(args) == 0: + if istream is None: + return TextHandleResult( + 1, + "cat 使用方法:cat [缓存名 ...]\n使用 - 代表标准输入,可拼接多个缓存", + ) + return TextHandleResult(0, istream) + + # Concatenate all specified sources in order + parts: list[str] = [] + for arg in args: + if arg == "-": + if istream is None: + return TextHandleResult(2, "标准输入为空(没有管道输入或回复消息)") + parts.append(istream) + else: + if arg not in env.buffers: + return TextHandleResult(2, f"缓存 {arg} 不存在") + parts.append(env.buffers[arg]) + + return TextHandleResult(0, "\n".join(parts)) class THRm(TextHandler): - name = 'rm' + name = "rm" - async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: if len(args) != 1: return TextHandleResult(1, "rm 使用方法:rm <缓存名>") buf = args[0] - if buf == '-': + if buf == "-": buf = istream if buf not in env.buffers: return TextHandleResult(2, f"缓存 {buf} 不存在") @@ -47,17 +71,22 @@ class THReplace(TextHandler): name = "replace" keywords = ["sed", "替换"] - async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: # 用法: replace [text] if len(args) < 2: return TextHandleResult(1, "用法:replace <正则> <替换内容> [文本]") - + pattern, repl = args[0], args[1] - text = istream if istream is not None else (" ".join(args[2:]) if len(args) > 2 else "") - + text = ( + istream + if istream is not None + else (" ".join(args[2:]) if len(args) > 2 else "") + ) + try: res = re.sub(pattern, repl, text) return TextHandleResult(0, res) except Exception as e: return TextHandleResult(1, f"正则错误: {str(e)}") - diff --git a/konabot/plugins/handle_text/handlers/whitespace_handlers.py b/konabot/plugins/handle_text/handlers/whitespace_handlers.py new file mode 100644 index 0000000..c1ff6d2 --- /dev/null +++ b/konabot/plugins/handle_text/handlers/whitespace_handlers.py @@ -0,0 +1,126 @@ +import re + +from konabot.plugins.handle_text.base import ( + TextHandleResult, + TextHandler, + TextHandlerEnvironment, +) + + +def _get_text(istream: str | None, args: list[str]) -> str | None: + """从 istream 或 args 中获取待处理文本""" + if istream is not None: + return istream + if args: + return " ".join(args) + return None + + +class THTrim(TextHandler): + name = "trim" + keywords = ["strip", "去空格"] + + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: + text = _get_text(istream, args) + if text is None: + return TextHandleResult(1, "trim 使用方法:trim [文本]\n去除首尾空白字符") + return TextHandleResult(0, text.strip()) + + +class THLTrim(TextHandler): + name = "ltrim" + keywords = ["lstrip"] + + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: + text = _get_text(istream, args) + if text is None: + return TextHandleResult(1, "ltrim 使用方法:ltrim [文本]\n去除左侧空白字符") + return TextHandleResult(0, text.lstrip()) + + +class THRTrim(TextHandler): + name = "rtrim" + keywords = ["rstrip"] + + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: + text = _get_text(istream, args) + if text is None: + return TextHandleResult(1, "rtrim 使用方法:rtrim [文本]\n去除右侧空白字符") + return TextHandleResult(0, text.rstrip()) + + +class THSqueeze(TextHandler): + name = "squeeze" + keywords = ["压缩空白"] + + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: + text = _get_text(istream, args) + if text is None: + return TextHandleResult( + 1, "squeeze 使用方法:squeeze [文本]\n将连续空白字符压缩为单个空格" + ) + return TextHandleResult(0, re.sub(r"[ \t]+", " ", text)) + + +class THLines(TextHandler): + name = "lines" + keywords = ["行处理"] + + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: + # lines <子命令> [文本] + # 子命令: trim | empty | squeeze + if len(args) < 1: + return TextHandleResult( + 1, + "lines 使用方法:lines <子命令> [文本]\n" + "子命令:\n" + " trim - 去除每行首尾空白\n" + " empty - 去除所有空行\n" + " squeeze - 将连续空行压缩为一行", + ) + + subcmd = args[0] + text = ( + istream + if istream is not None + else (" ".join(args[1:]) if len(args) > 1 else None) + ) + if text is None: + return TextHandleResult(1, "请提供需要处理的文本(通过管道或参数)") + + raw_lines = text.split("\n") + + match subcmd: + case "trim": + result = "\n".join(line.strip() for line in raw_lines) + case "empty": + result = "\n".join(line for line in raw_lines if line.strip()) + case "squeeze": + squeezed: list[str] = [] + prev_empty = False + for line in raw_lines: + is_empty = not line.strip() + if is_empty: + if not prev_empty: + squeezed.append("") + prev_empty = True + else: + squeezed.append(line) + prev_empty = False + result = "\n".join(squeezed) + case _: + return TextHandleResult( + 1, f"未知子命令:{subcmd}\n可用:trim, empty, squeeze" + ) + + return TextHandleResult(0, result)