Files
konabot/konabot/plugins/handle_text/__init__.py
passthem d4cde42bdc
All checks were successful
continuous-integration/drone/push Build is passing
Vibe Coding: textfx 若干 issue 更新
2026-02-16 19:36:24 +08:00

127 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import cast
from loguru import logger
from nonebot import on_command
import nonebot
from nonebot.adapters import Event, Bot
from nonebot_plugin_alconna import UniMessage, UniMsg
from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent
from nonebot.adapters.onebot.v11.bot import Bot as OB11Bot
from nonebot.adapters.onebot.v11.message import Message as OB11Message
from konabot.common.apis.ali_content_safety import AlibabaGreen
from konabot.common.longtask import DepLongTaskTarget
from konabot.plugins.handle_text.base import (
PipelineRunner,
TextHandlerEnvironment,
register_text_handlers,
)
from konabot.plugins.handle_text.handlers.ai_handlers import THQwen
from konabot.plugins.handle_text.handlers.encoding_handlers import (
THAlign,
THAlphaConv,
THB64Hex,
THBase64,
THBaseConv,
THCaesar,
THMorse,
THReverse,
)
from konabot.plugins.handle_text.handlers.random_handlers import THShuffle, THSorted
from konabot.plugins.handle_text.handlers.unix_handlers import (
THCat,
THEcho,
THReplace,
THRm,
)
from konabot.plugins.handle_text.handlers.whitespace_handlers import (
THLines,
THLTrim,
THRTrim,
THSqueeze,
THTrim,
)
cmd = on_command(cmd="textfx", aliases={"处理文字", "处理文本"})
@cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
istream = ""
if isinstance(evt, OB11MessageEvent):
if evt.reply is not None:
istream = evt.reply.message.extract_plain_text()
else:
for seg in evt.get_message():
if seg.type == "reply":
msgid = seg.get("id")
if msgid is not None:
msg2data = await cast(OB11Bot, bot).get_msg(message_id=msgid)
istream = OB11Message(
msg2data.get("message")
).extract_plain_text()
script = msg.extract_plain_text().removeprefix("textfx").removeprefix("处理文字")
runner = PipelineRunner.get_runner()
res = runner.parse_pipeline(script)
if isinstance(res, str):
await target.send_message(res)
return
env = TextHandlerEnvironment(is_trusted=False)
results = await runner.run_pipeline(res, istream or None, env)
# 检查是否有错误
for r in results:
if r.code != 0:
await target.send_message(f"处理指令时出现问题:{r.ostream}")
return
# 收集所有组的文本输出和附件
ostreams = [r.ostream for r in results if r.ostream is not None]
attachments = [r.attachment for r in results if r.attachment is not None]
if ostreams:
txt = "\n".join(ostreams)
err = await AlibabaGreen.detect(txt)
if not err:
await target.send_message(
"处理指令时出现问题:内容被拦截!请你检查你的内容是否合理!"
)
return
await target.send_message(txt, at=False)
for att in attachments:
await target.send_message(UniMessage.image(raw=att), at=False)
driver = nonebot.get_driver()
@driver.on_startup
async def _():
register_text_handlers(
THCat(),
THEcho(),
THRm(),
THShuffle(),
THReplace(),
THBase64(),
THCaesar(),
THReverse(),
THBaseConv(),
THAlphaConv(),
THB64Hex(),
THAlign(),
THSorted(),
THMorse(),
THQwen(),
THTrim(),
THLTrim(),
THRTrim(),
THSqueeze(),
THLines(),
)
logger.info(f"注册了 TextHandler{PipelineRunner.get_runner().handlers}")