This commit is contained in:
78
konabot/plugins/handle_text/__init__.py
Normal file
78
konabot/plugins/handle_text/__init__.py
Normal file
@ -0,0 +1,78 @@
|
||||
from typing import cast
|
||||
from loguru import logger
|
||||
from nonebot import on_command
|
||||
import nonebot
|
||||
from nonebot.adapters import Event, Bot
|
||||
from nonebot_plugin_alconna import UniMessage, UniMsg
|
||||
from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent
|
||||
from nonebot.adapters.onebot.v11.bot import Bot as OB11Bot
|
||||
from nonebot.adapters.onebot.v11.message import Message as OB11Message
|
||||
|
||||
from konabot.common.apis.ali_content_safety import AlibabaGreen
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.plugins.handle_text.base import PipelineRunner, TextHandlerEnvironment, register_text_handlers
|
||||
from konabot.plugins.handle_text.handlers.encoding_handlers import THBase64, THCaesar, THReverse
|
||||
from konabot.plugins.handle_text.handlers.random_handlers import THShuffle
|
||||
from konabot.plugins.handle_text.handlers.unix_handlers import THCat, THEcho, THReplace, THRm
|
||||
|
||||
|
||||
cmd = on_command(cmd="textfx", aliases={"处理文字", "处理文本"})
|
||||
|
||||
|
||||
@cmd.handle()
|
||||
async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
|
||||
istream = ""
|
||||
if isinstance(evt, OB11MessageEvent):
|
||||
if evt.reply is not None:
|
||||
istream = evt.reply.message.extract_plain_text()
|
||||
else:
|
||||
for seg in evt.get_message():
|
||||
if seg.type == 'reply':
|
||||
msgid = seg.get('id')
|
||||
if msgid is not None:
|
||||
msg2data = await cast(OB11Bot, bot).get_msg(message_id=msgid)
|
||||
istream = OB11Message(msg2data.get("message")).extract_plain_text()
|
||||
|
||||
script = msg.extract_plain_text().removeprefix("textfx").removeprefix("处理文字")
|
||||
runner = PipelineRunner.get_runner()
|
||||
res = runner.parse_pipeline(script)
|
||||
|
||||
if isinstance(res, str):
|
||||
await target.send_message(res)
|
||||
return
|
||||
|
||||
env = TextHandlerEnvironment(
|
||||
is_trusted=False
|
||||
)
|
||||
res2 = await runner.run_pipeline(res, istream or None, env)
|
||||
if res2.code != 0:
|
||||
await target.send_message(f"处理指令时出现问题:{res2.ostream}")
|
||||
elif res2.ostream is not None:
|
||||
txt = res2.ostream
|
||||
err = await AlibabaGreen.detect(txt)
|
||||
if not err:
|
||||
await target.send_message("处理指令时出现问题:内容被拦截!请你检查你的内容是否合理!")
|
||||
return
|
||||
await target.send_message(res2.ostream)
|
||||
if res2.attachment is not None:
|
||||
# 潜在风险点:这里没有人可以做安全检查
|
||||
await target.send_message(UniMessage.image(raw=res2.attachment))
|
||||
|
||||
|
||||
driver = nonebot.get_driver()
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
register_text_handlers(
|
||||
THCat(),
|
||||
THEcho(),
|
||||
THRm(),
|
||||
THShuffle(),
|
||||
THReplace(),
|
||||
THBase64(),
|
||||
THCaesar(),
|
||||
THReverse(),
|
||||
)
|
||||
logger.info(f"注册了 TextHandler:{PipelineRunner.get_runner().handlers}")
|
||||
|
||||
321
konabot/plugins/handle_text/base.py
Normal file
321
konabot/plugins/handle_text/base.py
Normal file
@ -0,0 +1,321 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from string import whitespace
|
||||
from typing import cast
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
@dataclass
|
||||
class TextHandlerEnvironment:
|
||||
is_trusted: bool
|
||||
buffers: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TextHandleResult:
|
||||
code: int
|
||||
ostream: str | None
|
||||
attachment: bytes | None = None
|
||||
|
||||
|
||||
class TextHandler(ABC):
|
||||
name: str = ''
|
||||
keywords: list[str] = []
|
||||
|
||||
@abstractmethod
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult: ...
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<{self.__class__.__name__}: {self.name} [{''.join(self.keywords)}]>"
|
||||
|
||||
|
||||
@dataclass
|
||||
class PipelineCommand:
|
||||
handler: TextHandler
|
||||
args: list[str]
|
||||
# 新增:重定向目标(buffer key)
|
||||
redirect_target: str | None = None
|
||||
# 新增:是否为追加模式 (>>)
|
||||
redirect_append: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class Pipeline:
|
||||
command_groups: list[list[PipelineCommand]] = field(default_factory=list)
|
||||
"一个列表的列表,每一组之间的指令之间使用管道符连接,而不同组之间不会有数据流"
|
||||
|
||||
|
||||
class PipelineParseStatus(Enum):
|
||||
normal = 0
|
||||
in_string = 1
|
||||
in_string_to_escape = 2
|
||||
off_string = 3
|
||||
|
||||
|
||||
whitespaces = whitespace + " "
|
||||
|
||||
|
||||
class PipelineRunner:
|
||||
handlers: list[TextHandler]
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.handlers = []
|
||||
|
||||
@staticmethod
|
||||
def get_runner():
|
||||
if "singleton" not in PipelineRunner.__annotations__:
|
||||
PipelineRunner.__annotations__["singleton"] = PipelineRunner()
|
||||
return cast(PipelineRunner, PipelineRunner.__annotations__.get("singleton"))
|
||||
|
||||
def register(self, handler: TextHandler):
|
||||
self.handlers.append(handler)
|
||||
|
||||
def parse_pipeline(self, script: str) -> Pipeline | str:
|
||||
pipeline = Pipeline()
|
||||
|
||||
# 当前正在构建的上下文
|
||||
current_group: list[PipelineCommand] = []
|
||||
current_command_args: list[str] = []
|
||||
|
||||
# 字符串解析状态
|
||||
status = PipelineParseStatus.normal
|
||||
current_string = ""
|
||||
current_string_raw = ""
|
||||
status_in_string_pair = ""
|
||||
|
||||
# 重定向解析状态
|
||||
is_parsing_redirect_filename = False
|
||||
current_redirect_target: str | None = None
|
||||
current_redirect_append = False
|
||||
|
||||
# 辅助函数:将当前解析到的字符串 flush 到 参数列表 或 重定向目标
|
||||
def _flush_token():
|
||||
nonlocal \
|
||||
current_string, \
|
||||
current_string_raw, \
|
||||
is_parsing_redirect_filename, \
|
||||
current_redirect_target
|
||||
if not current_string:
|
||||
return
|
||||
|
||||
if is_parsing_redirect_filename:
|
||||
current_redirect_target = current_string
|
||||
is_parsing_redirect_filename = False # 重定向文件名只取一个 token
|
||||
else:
|
||||
current_command_args.append(current_string)
|
||||
|
||||
current_string = ""
|
||||
current_string_raw = ""
|
||||
|
||||
# 辅助函数:将当前指令 flush 到当前组
|
||||
def _flush_command() -> str | None:
|
||||
nonlocal \
|
||||
current_command_args, \
|
||||
current_redirect_target, \
|
||||
current_redirect_append
|
||||
if not current_command_args:
|
||||
return None
|
||||
|
||||
cmd_name = current_command_args[0]
|
||||
args = current_command_args[1:]
|
||||
|
||||
matched = [
|
||||
h for h in self.handlers if cmd_name in h.keywords or cmd_name == h.name
|
||||
]
|
||||
if not matched:
|
||||
return f"不存在名为 {cmd_name} 的函数"
|
||||
if len(matched) > 1:
|
||||
logger.warning(
|
||||
f"指令能对应超过一个文本处理器 CMD={cmd_name} handlers={self.handlers}"
|
||||
)
|
||||
|
||||
cmd = PipelineCommand(
|
||||
handler=matched[0],
|
||||
args=args,
|
||||
redirect_target=current_redirect_target,
|
||||
redirect_append=current_redirect_append,
|
||||
)
|
||||
current_group.append(cmd)
|
||||
|
||||
# 重置指令级状态
|
||||
current_command_args = []
|
||||
current_redirect_target = None
|
||||
current_redirect_append = False
|
||||
return None
|
||||
|
||||
# 使用索引遍历以支持 look-ahead (处理 >>)
|
||||
i = 0
|
||||
length = len(script)
|
||||
|
||||
while i < length:
|
||||
c = script[i]
|
||||
|
||||
match status:
|
||||
case PipelineParseStatus.normal:
|
||||
if c in whitespaces:
|
||||
_flush_token()
|
||||
|
||||
elif c in "'\"":
|
||||
status_in_string_pair = c
|
||||
status = PipelineParseStatus.in_string
|
||||
current_string_raw = ""
|
||||
|
||||
elif c == "|":
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
# 管道符不结束 group,继续在 current_group 添加
|
||||
|
||||
elif c == ";":
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
# 分号结束 group
|
||||
if current_group:
|
||||
pipeline.command_groups.append(current_group)
|
||||
current_group = []
|
||||
|
||||
elif c == ">":
|
||||
_flush_token() # 先结束之前的参数
|
||||
# 检查是否是 append 模式 (>>)
|
||||
if i + 1 < length and script[i + 1] == ">":
|
||||
current_redirect_append = True
|
||||
i += 1 # 跳过下一个 >
|
||||
else:
|
||||
current_redirect_append = False
|
||||
|
||||
# 标记下一个 token 为文件名
|
||||
is_parsing_redirect_filename = True
|
||||
|
||||
else:
|
||||
current_string += c
|
||||
|
||||
case PipelineParseStatus.in_string:
|
||||
current_string_raw += c
|
||||
if c == status_in_string_pair:
|
||||
status = PipelineParseStatus.off_string
|
||||
elif c == "\\":
|
||||
status = PipelineParseStatus.in_string_to_escape
|
||||
else:
|
||||
current_string += c
|
||||
|
||||
case PipelineParseStatus.in_string_to_escape:
|
||||
current_string += c
|
||||
status = PipelineParseStatus.in_string
|
||||
|
||||
case PipelineParseStatus.off_string:
|
||||
if c in whitespaces:
|
||||
_flush_token()
|
||||
status = PipelineParseStatus.normal
|
||||
elif c == "|":
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
status = PipelineParseStatus.normal
|
||||
elif c == ";":
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
if current_group:
|
||||
pipeline.command_groups.append(current_group)
|
||||
current_group = []
|
||||
status = PipelineParseStatus.normal
|
||||
elif c == ">":
|
||||
_flush_token()
|
||||
status = PipelineParseStatus.normal
|
||||
# 回退索引,让下一次循环进入 normal 状态的 > 处理逻辑
|
||||
i -= 1
|
||||
else:
|
||||
# 紧接着的字符继续作为当前字符串的一部分 (如 "abc"d)
|
||||
current_string += c
|
||||
current_string_raw = ""
|
||||
status = PipelineParseStatus.normal
|
||||
|
||||
i += 1
|
||||
|
||||
# 循环结束后的收尾
|
||||
_flush_token()
|
||||
if err := _flush_command():
|
||||
return err
|
||||
|
||||
if current_group:
|
||||
pipeline.command_groups.append(current_group)
|
||||
|
||||
return pipeline
|
||||
|
||||
async def run_pipeline(
|
||||
self,
|
||||
pipeline: Pipeline,
|
||||
istream: str | None,
|
||||
env: TextHandlerEnvironment | None = None,
|
||||
) -> TextHandleResult:
|
||||
if env is None:
|
||||
# 默认环境
|
||||
env = TextHandlerEnvironment(is_trusted=False, buffers={})
|
||||
|
||||
final_result = TextHandleResult(code=0, ostream=istream)
|
||||
|
||||
# 遍历执行指令组 (分号分隔)
|
||||
for group in pipeline.command_groups:
|
||||
# 每个组开始时,使用原始输入(或者根据需求设为 None,这里假设每个组独立处理 istream)
|
||||
# 通常分号分隔的命令组,第一条命令如果没有 pipe 输入,它接收的 istream 取决于整体输入
|
||||
current_stream = istream
|
||||
|
||||
# 遍历组内指令 (管道分隔)
|
||||
for cmd in group:
|
||||
if final_result.code != 0:
|
||||
break
|
||||
|
||||
try:
|
||||
logger.debug(
|
||||
f"Executing: {cmd.handler.name} args={cmd.args} redirect={cmd.redirect_target}"
|
||||
)
|
||||
result = await cmd.handler.handle(env, current_stream, cmd.args)
|
||||
|
||||
if result.code != 0:
|
||||
final_result = result
|
||||
break
|
||||
|
||||
# 处理重定向逻辑
|
||||
if cmd.redirect_target:
|
||||
content_to_write = result.ostream or ""
|
||||
target_buffer = cmd.redirect_target
|
||||
|
||||
if cmd.redirect_append:
|
||||
old_content = env.buffers.get(target_buffer, "")
|
||||
env.buffers[target_buffer] = old_content + content_to_write
|
||||
else:
|
||||
env.buffers[target_buffer] = content_to_write
|
||||
|
||||
# 重定向后,标准输出通常被消耗,后续管道接收到的流为空 (或 None)
|
||||
# 除非实现 tee 逻辑,否则视为流已终止
|
||||
current_stream = None
|
||||
|
||||
# 更新最终结果,但 ostream 设为 None 因为被重定向了
|
||||
final_result = TextHandleResult(
|
||||
code=0, ostream=None, attachment=result.attachment
|
||||
)
|
||||
else:
|
||||
current_stream = result.ostream
|
||||
final_result = result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Pipeline execution failed at {cmd.handler.name}")
|
||||
logger.exception(e)
|
||||
return TextHandleResult(code=-1, ostream="处理流水线时出现 python 错误")
|
||||
|
||||
# 一个组执行完,final_result保留该组最后的状态。
|
||||
# 如果还有下一个组,final_result.code 如果是 0 则继续执行下一个组
|
||||
if final_result.code != 0:
|
||||
break
|
||||
|
||||
return final_result
|
||||
|
||||
|
||||
def register_text_handlers(*handlers: TextHandler):
|
||||
for handler in handlers:
|
||||
PipelineRunner.get_runner().register(handler)
|
||||
70
konabot/plugins/handle_text/handlers/encoding_handlers.py
Normal file
70
konabot/plugins/handle_text/handlers/encoding_handlers.py
Normal file
@ -0,0 +1,70 @@
|
||||
import base64
|
||||
from konabot.plugins.handle_text.base import (
|
||||
TextHandleResult,
|
||||
TextHandler,
|
||||
TextHandlerEnvironment,
|
||||
)
|
||||
|
||||
|
||||
class THBase64(TextHandler):
|
||||
name = "b64"
|
||||
keywords = ["base64"]
|
||||
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
# 用法: b64 encode/decode [encoding] [text]
|
||||
if not args and istream is None:
|
||||
return TextHandleResult(
|
||||
1, "用法:b64 <encode|decode> [编码, 默认utf-8] [文本]"
|
||||
)
|
||||
|
||||
mode = args[0].lower() if args else "encode"
|
||||
encoding = args[1] if len(args) > 1 else "utf-8"
|
||||
|
||||
# 确定输入源
|
||||
text = (
|
||||
istream
|
||||
if istream is not None
|
||||
else (" ".join(args[2:]) if len(args) > 2 else "")
|
||||
)
|
||||
if not text:
|
||||
return TextHandleResult(1, "输入文本为空")
|
||||
|
||||
try:
|
||||
if mode == "encode":
|
||||
res = base64.b64encode(text.encode(encoding)).decode("ascii")
|
||||
else:
|
||||
res = base64.b64decode(text.encode("ascii")).decode(encoding)
|
||||
return TextHandleResult(0, res)
|
||||
except Exception as e:
|
||||
return TextHandleResult(1, f"Base64 转换失败: {str(e)}")
|
||||
|
||||
|
||||
class THCaesar(TextHandler):
|
||||
name = "caesar"
|
||||
keywords = ["凯撒", "rot"]
|
||||
|
||||
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
|
||||
# 用法: caesar <shift> [text]
|
||||
shift = int(args[0]) if args else 13
|
||||
text = istream if istream is not None else (" ".join(args[1:]) if len(args) > 1 else "")
|
||||
|
||||
def _shift(char):
|
||||
if not char.isalpha():
|
||||
return char
|
||||
start = ord('A') if char.isupper() else ord('a')
|
||||
return chr((ord(char) - start + shift) % 26 + start)
|
||||
|
||||
res = "".join(_shift(c) for c in text)
|
||||
return TextHandleResult(0, res)
|
||||
|
||||
|
||||
class THReverse(TextHandler):
|
||||
name = "reverse"
|
||||
keywords = ["rev", "反转"]
|
||||
|
||||
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
|
||||
text = istream if istream is not None else (" ".join(args) if args else "")
|
||||
return TextHandleResult(0, text[::-1])
|
||||
|
||||
21
konabot/plugins/handle_text/handlers/random_handlers.py
Normal file
21
konabot/plugins/handle_text/handlers/random_handlers.py
Normal file
@ -0,0 +1,21 @@
|
||||
import random
|
||||
from konabot.plugins.handle_text.base import TextHandleResult, TextHandler, TextHandlerEnvironment
|
||||
|
||||
|
||||
class THShuffle(TextHandler):
|
||||
name: str = "shuffle"
|
||||
keywords: list = ["打乱"]
|
||||
|
||||
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
|
||||
if istream is not None:
|
||||
w = istream
|
||||
elif len(args) == 0:
|
||||
return TextHandleResult(1, "使用方法:打乱 <待打乱的文本>,或者使用管道符传入待打乱的文本")
|
||||
else:
|
||||
w = args[0]
|
||||
args = args[1:]
|
||||
|
||||
w = [*w]
|
||||
random.shuffle(w)
|
||||
return TextHandleResult(0, ''.join(w))
|
||||
|
||||
63
konabot/plugins/handle_text/handlers/unix_handlers.py
Normal file
63
konabot/plugins/handle_text/handlers/unix_handlers.py
Normal file
@ -0,0 +1,63 @@
|
||||
import re
|
||||
|
||||
from konabot.plugins.handle_text.base import TextHandleResult, TextHandler, TextHandlerEnvironment
|
||||
|
||||
|
||||
class THEcho(TextHandler):
|
||||
name = 'echo'
|
||||
|
||||
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
|
||||
if len(args) == 0 and istream is None:
|
||||
return TextHandleResult(1, "请在 echo 后面添加需要输出的文本")
|
||||
if istream is not None:
|
||||
return TextHandleResult(0, '\n'.join([istream] + args))
|
||||
return TextHandleResult(0, '\n'.join(args))
|
||||
|
||||
|
||||
class THCat(TextHandler):
|
||||
name = 'cat'
|
||||
|
||||
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
|
||||
if len(args) != 1:
|
||||
return TextHandleResult(1, "cat 使用方法:cat <缓存名>")
|
||||
buf = args[0]
|
||||
if buf == '-':
|
||||
buf = istream
|
||||
if buf not in env.buffers:
|
||||
return TextHandleResult(2, f"缓存 {buf} 不存在")
|
||||
return TextHandleResult(0, env.buffers[buf])
|
||||
|
||||
|
||||
class THRm(TextHandler):
|
||||
name = 'rm'
|
||||
|
||||
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
|
||||
if len(args) != 1:
|
||||
return TextHandleResult(1, "rm 使用方法:rm <缓存名>")
|
||||
buf = args[0]
|
||||
if buf == '-':
|
||||
buf = istream
|
||||
if buf not in env.buffers:
|
||||
return TextHandleResult(2, f"缓存 {buf} 不存在")
|
||||
del env.buffers[buf]
|
||||
return TextHandleResult(0, None)
|
||||
|
||||
|
||||
class THReplace(TextHandler):
|
||||
name = "replace"
|
||||
keywords = ["sed", "替换"]
|
||||
|
||||
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
|
||||
# 用法: replace <pattern> <replacement> [text]
|
||||
if len(args) < 2:
|
||||
return TextHandleResult(1, "用法:replace <正则> <替换内容> [文本]")
|
||||
|
||||
pattern, repl = args[0], args[1]
|
||||
text = istream if istream is not None else (" ".join(args[2:]) if len(args) > 2 else "")
|
||||
|
||||
try:
|
||||
res = re.sub(pattern, repl, text)
|
||||
return TextHandleResult(0, res)
|
||||
except Exception as e:
|
||||
return TextHandleResult(1, f"正则错误: {str(e)}")
|
||||
|
||||
Reference in New Issue
Block a user