from typing import cast from loguru import logger from nonebot import on_command import nonebot from nonebot.adapters import Event, Bot from nonebot_plugin_alconna import UniMessage, UniMsg from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent from nonebot.adapters.onebot.v11.bot import Bot as OB11Bot from nonebot.adapters.onebot.v11.message import Message as OB11Message from konabot.common.apis.ali_content_safety import AlibabaGreen from konabot.common.longtask import DepLongTaskTarget from konabot.plugins.handle_text.base import ( PipelineRunner, TextHandlerEnvironment, register_text_handlers, ) from konabot.plugins.handle_text.handlers.ai_handlers import THQwen from konabot.plugins.handle_text.handlers.encoding_handlers import ( THAlign, THAlphaConv, THB64Hex, THBase64, THBaseConv, THCaesar, THMorse, THReverse, ) from konabot.plugins.handle_text.handlers.random_handlers import THShuffle, THSorted from konabot.plugins.handle_text.handlers.unix_handlers import ( THCat, THEcho, THReplace, THRm, ) from konabot.plugins.handle_text.handlers.whitespace_handlers import ( THLines, THLTrim, THRTrim, THSqueeze, THTrim, ) cmd = on_command(cmd="textfx", aliases={"处理文字", "处理文本"}) @cmd.handle() async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget): istream = "" if isinstance(evt, OB11MessageEvent): if evt.reply is not None: istream = evt.reply.message.extract_plain_text() else: for seg in evt.get_message(): if seg.type == "reply": msgid = seg.get("id") if msgid is not None: msg2data = await cast(OB11Bot, bot).get_msg(message_id=msgid) istream = OB11Message( msg2data.get("message") ).extract_plain_text() script = msg.extract_plain_text().removeprefix("textfx").removeprefix("处理文字") runner = PipelineRunner.get_runner() res = runner.parse_pipeline(script) if isinstance(res, str): await target.send_message(res) return env = TextHandlerEnvironment(is_trusted=False) results = await runner.run_pipeline(res, istream or None, env) # 检查是否有错误 for r in results: if r.code != 0: await target.send_message(f"处理指令时出现问题:{r.ostream}") return # 收集所有组的文本输出和附件 ostreams = [r.ostream for r in results if r.ostream is not None] attachments = [r.attachment for r in results if r.attachment is not None] if ostreams: txt = "\n".join(ostreams) err = await AlibabaGreen.detect(txt) if not err: await target.send_message( "处理指令时出现问题:内容被拦截!请你检查你的内容是否合理!" ) return await target.send_message(txt, at=False) for att in attachments: await target.send_message(UniMessage.image(raw=att), at=False) driver = nonebot.get_driver() @driver.on_startup async def _(): register_text_handlers( THCat(), THEcho(), THRm(), THShuffle(), THReplace(), THBase64(), THCaesar(), THReverse(), THBaseConv(), THAlphaConv(), THB64Hex(), THAlign(), THSorted(), THMorse(), THQwen(), THTrim(), THLTrim(), THRTrim(), THSqueeze(), THLines(), ) logger.info(f"注册了 TextHandler:{PipelineRunner.get_runner().handlers}")