from typing import cast import asyncio from loguru import logger from nonebot import on_command import nonebot from nonebot.adapters import Event, Bot from nonebot_plugin_alconna import UniMessage, UniMsg from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent from nonebot.adapters.onebot.v11.bot import Bot as OB11Bot from nonebot.adapters.onebot.v11.message import Message as OB11Message from konabot.common.apis.ali_content_safety import AlibabaGreen from konabot.common.longtask import DepLongTaskTarget from konabot.common.render_error_message import render_error_message from konabot.plugins.handle_text.base import ( PipelineRunner, TextHandlerEnvironment, register_text_handlers, ) from konabot.plugins.handle_text.handlers.ai_handlers import THQwen from konabot.plugins.handle_text.handlers.encoding_handlers import ( THAlign, THAlphaConv, THB64Hex, THBase64, THBaseConv, THCaesar, THMorse, THReverse, ) from konabot.plugins.handle_text.handlers.random_handlers import THShuffle, THSorted from konabot.plugins.handle_text.handlers.unix_handlers import ( THCat, THEcho, THFalse, THReplace, THRm, THTest, THTrue, ) from konabot.plugins.handle_text.handlers.whitespace_handlers import ( THLines, THLTrim, THRTrim, THSqueeze, THTrim, ) TEXTFX_MAX_RUNTIME_SECONDS = 60 _textfx_running_users: set[str] = set() def _get_textfx_user_key(evt: Event) -> str: user_id = getattr(evt, "user_id", None) self_id = getattr(evt, "self_id", None) group_id = getattr(evt, "group_id", None) if user_id is not None: if group_id is not None: return f"{self_id}:{group_id}:{user_id}" return f"{self_id}:private:{user_id}" session_id = getattr(evt, "get_session_id", None) if callable(session_id): try: return f"session:{evt.get_session_id()}" except Exception: pass return f"event:{evt.__class__.__name__}:{id(evt)}" cmd = on_command(cmd="textfx", aliases={"处理文字", "处理文本"}) @cmd.handle() async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget): user_key = _get_textfx_user_key(evt) if user_key in _textfx_running_users: await target.send_message("你当前已有一个 textfx 脚本正在运行,请等待它结束后再试。") return istream = "" if isinstance(evt, OB11MessageEvent): if evt.reply is not None: istream = evt.reply.message.extract_plain_text() else: for seg in evt.get_message(): if seg.type == "reply": msgid = seg.get("id") if msgid is not None: msg2data = await cast(OB11Bot, bot).get_msg(message_id=msgid) istream = OB11Message( msg2data.get("message") ).extract_plain_text() script = msg.extract_plain_text().removeprefix("textfx").removeprefix("处理文字") runner = PipelineRunner.get_runner() res = runner.parse_pipeline(script) if isinstance(res, str): await target.send_message(res) return env = TextHandlerEnvironment(is_trusted=False, event=evt) _textfx_running_users.add(user_key) try: results = await asyncio.wait_for( runner.run_pipeline(res, istream or None, env), timeout=TEXTFX_MAX_RUNTIME_SECONDS, ) except asyncio.TimeoutError: rendered = await render_error_message( f"处理指令时出现问题:脚本执行超时(超过 {TEXTFX_MAX_RUNTIME_SECONDS} 秒)" ) await target.send_message(rendered) return finally: _textfx_running_users.discard(user_key) for r in results: if r.code != 0: message = f"处理指令时出现问题:{r.ostream}" rendered = await render_error_message(message) await target.send_message(rendered) return ostreams = [r.ostream for r in results if r.ostream is not None] attachments = [r.attachment for r in results if r.attachment is not None] if ostreams: txt = "\n".join(ostreams) err = await AlibabaGreen.detect(txt) if not err: await target.send_message( "处理指令时出现问题:内容被拦截!请你检查你的内容是否合理!" ) return await target.send_message(txt, at=False) for att in attachments: await target.send_message(UniMessage.image(raw=att), at=False) driver = nonebot.get_driver() @driver.on_startup async def _(): register_text_handlers( THCat(), THEcho(), THRm(), THTrue(), THFalse(), THTest(), THShuffle(), THReplace(), THBase64(), THCaesar(), THReverse(), THBaseConv(), THAlphaConv(), THB64Hex(), THAlign(), THSorted(), THMorse(), THQwen(), THTrim(), THLTrim(), THRTrim(), THSqueeze(), THLines(), ) logger.info(f"注册了 TextHandler:{PipelineRunner.get_runner().handlers}")