diff --git a/konabot/plugins/handle_text/__init__.py b/konabot/plugins/handle_text/__init__.py new file mode 100644 index 0000000..52df24c --- /dev/null +++ b/konabot/plugins/handle_text/__init__.py @@ -0,0 +1,78 @@ +from typing import cast +from loguru import logger +from nonebot import on_command +import nonebot +from nonebot.adapters import Event, Bot +from nonebot_plugin_alconna import UniMessage, UniMsg +from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent +from nonebot.adapters.onebot.v11.bot import Bot as OB11Bot +from nonebot.adapters.onebot.v11.message import Message as OB11Message + +from konabot.common.apis.ali_content_safety import AlibabaGreen +from konabot.common.longtask import DepLongTaskTarget +from konabot.plugins.handle_text.base import PipelineRunner, TextHandlerEnvironment, register_text_handlers +from konabot.plugins.handle_text.handlers.encoding_handlers import THBase64, THCaesar, THReverse +from konabot.plugins.handle_text.handlers.random_handlers import THShuffle +from konabot.plugins.handle_text.handlers.unix_handlers import THCat, THEcho, THReplace, THRm + + +cmd = on_command(cmd="textfx", aliases={"处理文字", "处理文本"}) + + +@cmd.handle() +async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget): + istream = "" + if isinstance(evt, OB11MessageEvent): + if evt.reply is not None: + istream = evt.reply.message.extract_plain_text() + else: + for seg in evt.get_message(): + if seg.type == 'reply': + msgid = seg.get('id') + if msgid is not None: + msg2data = await cast(OB11Bot, bot).get_msg(message_id=msgid) + istream = OB11Message(msg2data.get("message")).extract_plain_text() + + script = msg.extract_plain_text().removeprefix("textfx").removeprefix("处理文字") + runner = PipelineRunner.get_runner() + res = runner.parse_pipeline(script) + + if isinstance(res, str): + await target.send_message(res) + return + + env = TextHandlerEnvironment( + is_trusted=False + ) + res2 = await runner.run_pipeline(res, istream or None, env) + if res2.code != 0: + await target.send_message(f"处理指令时出现问题:{res2.ostream}") + elif res2.ostream is not None: + txt = res2.ostream + err = await AlibabaGreen.detect(txt) + if not err: + await target.send_message("处理指令时出现问题:内容被拦截!请你检查你的内容是否合理!") + return + await target.send_message(res2.ostream) + if res2.attachment is not None: + # 潜在风险点:这里没有人可以做安全检查 + await target.send_message(UniMessage.image(raw=res2.attachment)) + + +driver = nonebot.get_driver() + + +@driver.on_startup +async def _(): + register_text_handlers( + THCat(), + THEcho(), + THRm(), + THShuffle(), + THReplace(), + THBase64(), + THCaesar(), + THReverse(), + ) + logger.info(f"注册了 TextHandler:{PipelineRunner.get_runner().handlers}") + diff --git a/konabot/plugins/handle_text/base.py b/konabot/plugins/handle_text/base.py new file mode 100644 index 0000000..5765ff3 --- /dev/null +++ b/konabot/plugins/handle_text/base.py @@ -0,0 +1,321 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from enum import Enum +from string import whitespace +from typing import cast + +from loguru import logger + + +@dataclass +class TextHandlerEnvironment: + is_trusted: bool + buffers: dict[str, str] = field(default_factory=dict) + + +@dataclass +class TextHandleResult: + code: int + ostream: str | None + attachment: bytes | None = None + + +class TextHandler(ABC): + name: str = '' + keywords: list[str] = [] + + @abstractmethod + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: ... + + def __repr__(self) -> str: + return f"<{self.__class__.__name__}: {self.name} [{''.join(self.keywords)}]>" + + +@dataclass +class PipelineCommand: + handler: TextHandler + args: list[str] + # 新增:重定向目标(buffer key) + redirect_target: str | None = None + # 新增:是否为追加模式 (>>) + redirect_append: bool = False + + +@dataclass +class Pipeline: + command_groups: list[list[PipelineCommand]] = field(default_factory=list) + "一个列表的列表,每一组之间的指令之间使用管道符连接,而不同组之间不会有数据流" + + +class PipelineParseStatus(Enum): + normal = 0 + in_string = 1 + in_string_to_escape = 2 + off_string = 3 + + +whitespaces = whitespace + "  " + + +class PipelineRunner: + handlers: list[TextHandler] + + def __init__(self) -> None: + self.handlers = [] + + @staticmethod + def get_runner(): + if "singleton" not in PipelineRunner.__annotations__: + PipelineRunner.__annotations__["singleton"] = PipelineRunner() + return cast(PipelineRunner, PipelineRunner.__annotations__.get("singleton")) + + def register(self, handler: TextHandler): + self.handlers.append(handler) + + def parse_pipeline(self, script: str) -> Pipeline | str: + pipeline = Pipeline() + + # 当前正在构建的上下文 + current_group: list[PipelineCommand] = [] + current_command_args: list[str] = [] + + # 字符串解析状态 + status = PipelineParseStatus.normal + current_string = "" + current_string_raw = "" + status_in_string_pair = "" + + # 重定向解析状态 + is_parsing_redirect_filename = False + current_redirect_target: str | None = None + current_redirect_append = False + + # 辅助函数:将当前解析到的字符串 flush 到 参数列表 或 重定向目标 + def _flush_token(): + nonlocal \ + current_string, \ + current_string_raw, \ + is_parsing_redirect_filename, \ + current_redirect_target + if not current_string: + return + + if is_parsing_redirect_filename: + current_redirect_target = current_string + is_parsing_redirect_filename = False # 重定向文件名只取一个 token + else: + current_command_args.append(current_string) + + current_string = "" + current_string_raw = "" + + # 辅助函数:将当前指令 flush 到当前组 + def _flush_command() -> str | None: + nonlocal \ + current_command_args, \ + current_redirect_target, \ + current_redirect_append + if not current_command_args: + return None + + cmd_name = current_command_args[0] + args = current_command_args[1:] + + matched = [ + h for h in self.handlers if cmd_name in h.keywords or cmd_name == h.name + ] + if not matched: + return f"不存在名为 {cmd_name} 的函数" + if len(matched) > 1: + logger.warning( + f"指令能对应超过一个文本处理器 CMD={cmd_name} handlers={self.handlers}" + ) + + cmd = PipelineCommand( + handler=matched[0], + args=args, + redirect_target=current_redirect_target, + redirect_append=current_redirect_append, + ) + current_group.append(cmd) + + # 重置指令级状态 + current_command_args = [] + current_redirect_target = None + current_redirect_append = False + return None + + # 使用索引遍历以支持 look-ahead (处理 >>) + i = 0 + length = len(script) + + while i < length: + c = script[i] + + match status: + case PipelineParseStatus.normal: + if c in whitespaces: + _flush_token() + + elif c in "'\"": + status_in_string_pair = c + status = PipelineParseStatus.in_string + current_string_raw = "" + + elif c == "|": + _flush_token() + if err := _flush_command(): + return err + # 管道符不结束 group,继续在 current_group 添加 + + elif c == ";": + _flush_token() + if err := _flush_command(): + return err + # 分号结束 group + if current_group: + pipeline.command_groups.append(current_group) + current_group = [] + + elif c == ">": + _flush_token() # 先结束之前的参数 + # 检查是否是 append 模式 (>>) + if i + 1 < length and script[i + 1] == ">": + current_redirect_append = True + i += 1 # 跳过下一个 > + else: + current_redirect_append = False + + # 标记下一个 token 为文件名 + is_parsing_redirect_filename = True + + else: + current_string += c + + case PipelineParseStatus.in_string: + current_string_raw += c + if c == status_in_string_pair: + status = PipelineParseStatus.off_string + elif c == "\\": + status = PipelineParseStatus.in_string_to_escape + else: + current_string += c + + case PipelineParseStatus.in_string_to_escape: + current_string += c + status = PipelineParseStatus.in_string + + case PipelineParseStatus.off_string: + if c in whitespaces: + _flush_token() + status = PipelineParseStatus.normal + elif c == "|": + _flush_token() + if err := _flush_command(): + return err + status = PipelineParseStatus.normal + elif c == ";": + _flush_token() + if err := _flush_command(): + return err + if current_group: + pipeline.command_groups.append(current_group) + current_group = [] + status = PipelineParseStatus.normal + elif c == ">": + _flush_token() + status = PipelineParseStatus.normal + # 回退索引,让下一次循环进入 normal 状态的 > 处理逻辑 + i -= 1 + else: + # 紧接着的字符继续作为当前字符串的一部分 (如 "abc"d) + current_string += c + current_string_raw = "" + status = PipelineParseStatus.normal + + i += 1 + + # 循环结束后的收尾 + _flush_token() + if err := _flush_command(): + return err + + if current_group: + pipeline.command_groups.append(current_group) + + return pipeline + + async def run_pipeline( + self, + pipeline: Pipeline, + istream: str | None, + env: TextHandlerEnvironment | None = None, + ) -> TextHandleResult: + if env is None: + # 默认环境 + env = TextHandlerEnvironment(is_trusted=False, buffers={}) + + final_result = TextHandleResult(code=0, ostream=istream) + + # 遍历执行指令组 (分号分隔) + for group in pipeline.command_groups: + # 每个组开始时,使用原始输入(或者根据需求设为 None,这里假设每个组独立处理 istream) + # 通常分号分隔的命令组,第一条命令如果没有 pipe 输入,它接收的 istream 取决于整体输入 + current_stream = istream + + # 遍历组内指令 (管道分隔) + for cmd in group: + if final_result.code != 0: + break + + try: + logger.debug( + f"Executing: {cmd.handler.name} args={cmd.args} redirect={cmd.redirect_target}" + ) + result = await cmd.handler.handle(env, current_stream, cmd.args) + + if result.code != 0: + final_result = result + break + + # 处理重定向逻辑 + if cmd.redirect_target: + content_to_write = result.ostream or "" + target_buffer = cmd.redirect_target + + if cmd.redirect_append: + old_content = env.buffers.get(target_buffer, "") + env.buffers[target_buffer] = old_content + content_to_write + else: + env.buffers[target_buffer] = content_to_write + + # 重定向后,标准输出通常被消耗,后续管道接收到的流为空 (或 None) + # 除非实现 tee 逻辑,否则视为流已终止 + current_stream = None + + # 更新最终结果,但 ostream 设为 None 因为被重定向了 + final_result = TextHandleResult( + code=0, ostream=None, attachment=result.attachment + ) + else: + current_stream = result.ostream + final_result = result + + except Exception as e: + logger.error(f"Pipeline execution failed at {cmd.handler.name}") + logger.exception(e) + return TextHandleResult(code=-1, ostream="处理流水线时出现 python 错误") + + # 一个组执行完,final_result保留该组最后的状态。 + # 如果还有下一个组,final_result.code 如果是 0 则继续执行下一个组 + if final_result.code != 0: + break + + return final_result + + +def register_text_handlers(*handlers: TextHandler): + for handler in handlers: + PipelineRunner.get_runner().register(handler) diff --git a/konabot/plugins/handle_text/handlers/encoding_handlers.py b/konabot/plugins/handle_text/handlers/encoding_handlers.py new file mode 100644 index 0000000..384e778 --- /dev/null +++ b/konabot/plugins/handle_text/handlers/encoding_handlers.py @@ -0,0 +1,70 @@ +import base64 +from konabot.plugins.handle_text.base import ( + TextHandleResult, + TextHandler, + TextHandlerEnvironment, +) + + +class THBase64(TextHandler): + name = "b64" + keywords = ["base64"] + + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: + # 用法: b64 encode/decode [encoding] [text] + if not args and istream is None: + return TextHandleResult( + 1, "用法:b64 [编码, 默认utf-8] [文本]" + ) + + mode = args[0].lower() if args else "encode" + encoding = args[1] if len(args) > 1 else "utf-8" + + # 确定输入源 + text = ( + istream + if istream is not None + else (" ".join(args[2:]) if len(args) > 2 else "") + ) + if not text: + return TextHandleResult(1, "输入文本为空") + + try: + if mode == "encode": + res = base64.b64encode(text.encode(encoding)).decode("ascii") + else: + res = base64.b64decode(text.encode("ascii")).decode(encoding) + return TextHandleResult(0, res) + except Exception as e: + return TextHandleResult(1, f"Base64 转换失败: {str(e)}") + + +class THCaesar(TextHandler): + name = "caesar" + keywords = ["凯撒", "rot"] + + async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + # 用法: caesar [text] + shift = int(args[0]) if args else 13 + text = istream if istream is not None else (" ".join(args[1:]) if len(args) > 1 else "") + + def _shift(char): + if not char.isalpha(): + return char + start = ord('A') if char.isupper() else ord('a') + return chr((ord(char) - start + shift) % 26 + start) + + res = "".join(_shift(c) for c in text) + return TextHandleResult(0, res) + + +class THReverse(TextHandler): + name = "reverse" + keywords = ["rev", "反转"] + + async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + text = istream if istream is not None else (" ".join(args) if args else "") + return TextHandleResult(0, text[::-1]) + diff --git a/konabot/plugins/handle_text/handlers/random_handlers.py b/konabot/plugins/handle_text/handlers/random_handlers.py new file mode 100644 index 0000000..227d9b2 --- /dev/null +++ b/konabot/plugins/handle_text/handlers/random_handlers.py @@ -0,0 +1,21 @@ +import random +from konabot.plugins.handle_text.base import TextHandleResult, TextHandler, TextHandlerEnvironment + + +class THShuffle(TextHandler): + name: str = "shuffle" + keywords: list = ["打乱"] + + async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + if istream is not None: + w = istream + elif len(args) == 0: + return TextHandleResult(1, "使用方法:打乱 <待打乱的文本>,或者使用管道符传入待打乱的文本") + else: + w = args[0] + args = args[1:] + + w = [*w] + random.shuffle(w) + return TextHandleResult(0, ''.join(w)) + diff --git a/konabot/plugins/handle_text/handlers/unix_handlers.py b/konabot/plugins/handle_text/handlers/unix_handlers.py new file mode 100644 index 0000000..f248104 --- /dev/null +++ b/konabot/plugins/handle_text/handlers/unix_handlers.py @@ -0,0 +1,63 @@ +import re + +from konabot.plugins.handle_text.base import TextHandleResult, TextHandler, TextHandlerEnvironment + + +class THEcho(TextHandler): + name = 'echo' + + async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + if len(args) == 0 and istream is None: + return TextHandleResult(1, "请在 echo 后面添加需要输出的文本") + if istream is not None: + return TextHandleResult(0, '\n'.join([istream] + args)) + return TextHandleResult(0, '\n'.join(args)) + + +class THCat(TextHandler): + name = 'cat' + + async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + if len(args) != 1: + return TextHandleResult(1, "cat 使用方法:cat <缓存名>") + buf = args[0] + if buf == '-': + buf = istream + if buf not in env.buffers: + return TextHandleResult(2, f"缓存 {buf} 不存在") + return TextHandleResult(0, env.buffers[buf]) + + +class THRm(TextHandler): + name = 'rm' + + async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + if len(args) != 1: + return TextHandleResult(1, "rm 使用方法:rm <缓存名>") + buf = args[0] + if buf == '-': + buf = istream + if buf not in env.buffers: + return TextHandleResult(2, f"缓存 {buf} 不存在") + del env.buffers[buf] + return TextHandleResult(0, None) + + +class THReplace(TextHandler): + name = "replace" + keywords = ["sed", "替换"] + + async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + # 用法: replace [text] + if len(args) < 2: + return TextHandleResult(1, "用法:replace <正则> <替换内容> [文本]") + + pattern, repl = args[0], args[1] + text = istream if istream is not None else (" ".join(args[2:]) if len(args) > 2 else "") + + try: + res = re.sub(pattern, repl, text) + return TextHandleResult(0, res) + except Exception as e: + return TextHandleResult(1, f"正则错误: {str(e)}") +