Files
konabot/konabot/plugins/handle_text/base.py
passthem d4cde42bdc
All checks were successful
continuous-integration/drone/push Build is passing
Vibe Coding: textfx 若干 issue 更新
2026-02-16 19:36:24 +08:00

349 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from string import whitespace
from typing import ClassVar, cast

from loguru import logger
@dataclass
class TextHandlerEnvironment:
    """Shared execution context for every handler in one pipeline run."""

    # Whether the caller is trusted. NOTE(review): the code here only threads
    # this flag through to handlers; what "trusted" permits is decided by the
    # individual handlers — verify against their implementations.
    is_trusted: bool
    # Named string buffers used as redirection targets for ">" / ">>".
    buffers: dict[str, str] = field(default_factory=dict)
@dataclass
class TextHandleResult:
    """Outcome of a single handler invocation."""

    # 0 means success; any non-zero code makes run_pipeline abort the whole run.
    code: int
    # Text output fed to the next piped command (or returned), None for none.
    ostream: str | None
    # Optional binary payload carried alongside the text output.
    attachment: bytes | None = None
class TextHandler(ABC):
name: str = ""
keywords: list[str] = []
@abstractmethod
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult: ...
def __repr__(self) -> str:
return f"<{self.__class__.__name__}: {self.name} [{''.join(self.keywords)}]>"
class TextHandlerSync(TextHandler):
    """Adapter base for blocking handlers.

    Implement the synchronous :meth:`handle_sync`; the async :meth:`handle`
    required by :class:`TextHandler` runs it on a worker thread so the event
    loop is never blocked.
    """

    @abstractmethod
    def handle_sync(
        self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
    ) -> TextHandleResult: ...

    async def handle(
        self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
    ) -> TextHandleResult:
        # Hand the bound method straight to the thread pool; no wrapper
        # closure is needed since to_thread forwards positional arguments.
        return await asyncio.to_thread(self.handle_sync, env, istream, args)
@dataclass
class PipelineCommand:
    """One parsed command: a handler, its arguments, and optional redirection."""

    handler: TextHandler
    args: list[str]
    # Buffer key the command's output is redirected to (">" / ">>"), if any.
    redirect_target: str | None = None
    # True when redirecting in append mode (">>") instead of overwrite (">").
    redirect_append: bool = False
@dataclass
class Pipeline:
    """A fully parsed script, ready for PipelineRunner.run_pipeline."""

    command_groups: list[list[PipelineCommand]] = field(default_factory=list)
    "A list of lists: commands within one group are connected by pipes; no data flows between different groups."
class PipelineParseStatus(Enum):
    """Lexer states used by PipelineRunner.parse_pipeline."""

    normal = 0  # outside any quoted string
    in_string = 1  # inside a quoted string
    in_string_to_escape = 2  # just consumed a backslash inside a string
    off_string = 3  # immediately after a closing quote
# Characters the parser treats as token separators.
# NOTE(review): the trailing + "" looks like a no-op, but this file was flagged
# as containing invisible Unicode characters — the literal may originally have
# held extra invisible whitespace (e.g. U+3000). Verify against the repository.
whitespaces = whitespace + ""
class PipelineRunner:
    """Parses pipeline scripts and executes them against registered handlers.

    Use ``PipelineRunner.get_runner()`` to obtain the shared singleton.
    """

    # All handlers known to this runner, in registration order.
    handlers: list[TextHandler]

    def __init__(self) -> None:
        self.handlers = []
@staticmethod
def get_runner():
if "singleton" not in PipelineRunner.__annotations__:
PipelineRunner.__annotations__["singleton"] = PipelineRunner()
return cast(PipelineRunner, PipelineRunner.__annotations__.get("singleton"))
    def register(self, handler: TextHandler) -> None:
        """Make *handler* available to subsequently parsed pipelines."""
        self.handlers.append(handler)
    def parse_pipeline(self, script: str) -> Pipeline | str:
        """Parse *script* into a :class:`Pipeline`.

        Grammar (hand-rolled lexer/parser):
        - whitespace separates tokens; single or double quotes delimit strings
          with backslash escapes;
        - ``|`` pipes one command's output into the next within a group;
        - ``;`` terminates a command group (groups do not share data);
        - ``>`` / ``>>`` redirect a command's output into a named buffer,
          overwriting / appending respectively.

        Returns the Pipeline on success, or an error-message string when a
        command word matches no registered handler.
        """
        pipeline = Pipeline()
        # Parsing context currently being built.
        current_group: list[PipelineCommand] = []
        current_command_args: list[str] = []
        # String-lexing state.
        status = PipelineParseStatus.normal
        current_string = ""
        current_string_raw = ""
        status_in_string_pair = ""
        has_token = False  # whether a token is in progress — distinguishes an empty string token from no token at all
        # Redirection-parsing state.
        is_parsing_redirect_filename = False
        current_redirect_target: str | None = None
        current_redirect_append = False

        # Helper: flush the string parsed so far into either the argument list
        # or the redirect target.
        def _flush_token():
            nonlocal \
                current_string, \
                current_string_raw, \
                is_parsing_redirect_filename, \
                current_redirect_target, \
                has_token
            if not has_token:
                return
            if is_parsing_redirect_filename:
                current_redirect_target = current_string
                is_parsing_redirect_filename = False  # a redirect filename is exactly one token
            else:
                current_command_args.append(current_string)
            current_string = ""
            current_string_raw = ""
            has_token = False

        # Helper: flush the current command into the current group.
        # Returns an error message string when the command name is unknown.
        def _flush_command() -> str | None:
            nonlocal \
                current_command_args, \
                current_redirect_target, \
                current_redirect_append
            if not current_command_args:
                return None
            cmd_name = current_command_args[0]
            args = current_command_args[1:]
            matched = [
                h for h in self.handlers if cmd_name in h.keywords or cmd_name == h.name
            ]
            if not matched:
                return f"不存在名为 {cmd_name} 的函数"
            if len(matched) > 1:
                # Ambiguous command word: first match wins, but warn loudly.
                logger.warning(
                    f"指令能对应超过一个文本处理器 CMD={cmd_name} handlers={self.handlers}"
                )
            cmd = PipelineCommand(
                handler=matched[0],
                args=args,
                redirect_target=current_redirect_target,
                redirect_append=current_redirect_append,
            )
            current_group.append(cmd)
            # Reset per-command state.
            current_command_args = []
            current_redirect_target = None
            current_redirect_append = False
            return None

        # Iterate by index to allow one-character look-ahead (for ">>").
        i = 0
        length = len(script)
        while i < length:
            c = script[i]
            match status:
                case PipelineParseStatus.normal:
                    if c in whitespaces:
                        _flush_token()
                    elif c in "'\"":
                        status_in_string_pair = c
                        status = PipelineParseStatus.in_string
                        current_string_raw = ""
                        has_token = True
                    elif c == "|":
                        _flush_token()
                        if err := _flush_command():
                            return err
                        # A pipe does not end the group; keep appending to current_group.
                    elif c == ";":
                        _flush_token()
                        if err := _flush_command():
                            return err
                        # A semicolon ends the group.
                        if current_group:
                            pipeline.command_groups.append(current_group)
                            current_group = []
                    elif c == ">":
                        _flush_token()  # finish the preceding argument first
                        # Check for append mode (">>").
                        if i + 1 < length and script[i + 1] == ">":
                            current_redirect_append = True
                            i += 1  # skip the second ">"
                        else:
                            current_redirect_append = False
                        # The next token is the redirect target name.
                        is_parsing_redirect_filename = True
                    else:
                        current_string += c
                        has_token = True
                case PipelineParseStatus.in_string:
                    current_string_raw += c
                    if c == status_in_string_pair:
                        status = PipelineParseStatus.off_string
                    elif c == "\\":
                        status = PipelineParseStatus.in_string_to_escape
                    else:
                        current_string += c
                case PipelineParseStatus.in_string_to_escape:
                    # C-style escape sequences; unknown escapes yield the
                    # character itself (so \" and \' work).
                    escape_map = {
                        "n": "\n",
                        "r": "\r",
                        "t": "\t",
                        "0": "\0",
                        "a": "\a",
                        "b": "\b",
                        "f": "\f",
                        "v": "\v",
                        "\\": "\\",
                    }
                    current_string += escape_map.get(c, c)
                    status = PipelineParseStatus.in_string
                case PipelineParseStatus.off_string:
                    if c in whitespaces:
                        _flush_token()
                        status = PipelineParseStatus.normal
                    elif c == "|":
                        _flush_token()
                        if err := _flush_command():
                            return err
                        status = PipelineParseStatus.normal
                    elif c == ";":
                        _flush_token()
                        if err := _flush_command():
                            return err
                        if current_group:
                            pipeline.command_groups.append(current_group)
                            current_group = []
                        status = PipelineParseStatus.normal
                    elif c == ">":
                        _flush_token()
                        status = PipelineParseStatus.normal
                        # Step the index back so the next iteration re-handles
                        # ">" through the normal-state logic above.
                        i -= 1
                    else:
                        # A character directly after the closing quote continues
                        # the same token (e.g. "abc"d -> abcd).
                        current_string += c
                        current_string_raw = ""
                        status = PipelineParseStatus.normal
            i += 1
        # Final flush once the input is exhausted.
        _flush_token()
        if err := _flush_command():
            return err
        if current_group:
            pipeline.command_groups.append(current_group)
        return pipeline
    async def run_pipeline(
        self,
        pipeline: Pipeline,
        istream: str | None,
        env: TextHandlerEnvironment | None = None,
    ) -> list[TextHandleResult]:
        """Execute a parsed *pipeline*.

        Each semicolon-separated group starts from the same initial *istream*
        and contributes one result to the returned list. Within a group,
        each command's ostream is piped into the next. On a non-zero result
        code or a raised exception the failing result is appended and the
        whole run aborts (the returned list is then shorter than the number
        of groups).

        When *env* is None an untrusted environment with empty buffers is used.
        """
        if env is None:
            env = TextHandlerEnvironment(is_trusted=False, buffers={})
        results: list[TextHandleResult] = []
        # Execute the command groups (semicolon-separated); each group
        # produces its own output independently.
        for group in pipeline.command_groups:
            current_stream = istream
            group_result = TextHandleResult(code=0, ostream=None)
            # Execute the commands within the group (pipe-separated).
            for cmd in group:
                try:
                    logger.debug(
                        f"Executing: {cmd.handler.name} args={cmd.args} redirect={cmd.redirect_target}"
                    )
                    result = await cmd.handler.handle(env, current_stream, cmd.args)
                    if result.code != 0:
                        # A command in the group failed: abort the entire pipeline.
                        results.append(result)
                        return results
                    # Handle output redirection.
                    if cmd.redirect_target:
                        content_to_write = result.ostream or ""
                        target_buffer = cmd.redirect_target
                        if cmd.redirect_append:
                            old_content = env.buffers.get(target_buffer, "")
                            env.buffers[target_buffer] = old_content + content_to_write
                        else:
                            env.buffers[target_buffer] = content_to_write
                        # Redirected output does not flow on through the pipe;
                        # only the attachment (if any) is carried forward.
                        current_stream = None
                        group_result = TextHandleResult(
                            code=0, ostream=None, attachment=result.attachment
                        )
                    else:
                        current_stream = result.ostream
                        group_result = result
                except Exception as e:
                    logger.error(f"Pipeline execution failed at {cmd.handler.name}")
                    logger.exception(e)
                    results.append(
                        TextHandleResult(
                            code=-1, ostream="处理流水线时出现 python 错误"
                        )
                    )
                    return results
            results.append(group_result)
        return results
def register_text_handlers(*handlers: TextHandler) -> None:
    """Register every given handler with the global PipelineRunner singleton."""
    runner = PipelineRunner.get_runner()
    for h in handlers:
        runner.register(h)