forked from mttu-developers/konabot
85 lines
3.0 KiB
Python
85 lines
3.0 KiB
Python
from typing import cast
|
||
from loguru import logger
|
||
from nonebot import on_command
|
||
import nonebot
|
||
from nonebot.adapters import Event, Bot
|
||
from nonebot_plugin_alconna import UniMessage, UniMsg
|
||
from nonebot.adapters.onebot.v11.event import MessageEvent as OB11MessageEvent
|
||
from nonebot.adapters.onebot.v11.bot import Bot as OB11Bot
|
||
from nonebot.adapters.onebot.v11.message import Message as OB11Message
|
||
|
||
from konabot.common.apis.ali_content_safety import AlibabaGreen
|
||
from konabot.common.longtask import DepLongTaskTarget
|
||
from konabot.plugins.handle_text.base import PipelineRunner, TextHandlerEnvironment, register_text_handlers
|
||
from konabot.plugins.handle_text.handlers.encoding_handlers import THAlign, THAlphaConv, THB64Hex, THBase64, THBaseConv, THCaesar, THMorse, THReverse
|
||
from konabot.plugins.handle_text.handlers.random_handlers import THShuffle, THSorted
|
||
from konabot.plugins.handle_text.handlers.unix_handlers import THCat, THEcho, THReplace, THRm
|
||
|
||
|
||
cmd = on_command(cmd="textfx", aliases={"处理文字", "处理文本"})
|
||
|
||
|
||
@cmd.handle()
|
||
async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
|
||
istream = ""
|
||
if isinstance(evt, OB11MessageEvent):
|
||
if evt.reply is not None:
|
||
istream = evt.reply.message.extract_plain_text()
|
||
else:
|
||
for seg in evt.get_message():
|
||
if seg.type == 'reply':
|
||
msgid = seg.get('id')
|
||
if msgid is not None:
|
||
msg2data = await cast(OB11Bot, bot).get_msg(message_id=msgid)
|
||
istream = OB11Message(msg2data.get("message")).extract_plain_text()
|
||
|
||
script = msg.extract_plain_text().removeprefix("textfx").removeprefix("处理文字")
|
||
runner = PipelineRunner.get_runner()
|
||
res = runner.parse_pipeline(script)
|
||
|
||
if isinstance(res, str):
|
||
await target.send_message(res)
|
||
return
|
||
|
||
env = TextHandlerEnvironment(
|
||
is_trusted=False
|
||
)
|
||
res2 = await runner.run_pipeline(res, istream or None, env)
|
||
if res2.code != 0:
|
||
await target.send_message(f"处理指令时出现问题:{res2.ostream}")
|
||
elif res2.ostream is not None:
|
||
txt = res2.ostream
|
||
err = await AlibabaGreen.detect(txt)
|
||
if not err:
|
||
await target.send_message("处理指令时出现问题:内容被拦截!请你检查你的内容是否合理!")
|
||
return
|
||
await target.send_message(res2.ostream, at=False)
|
||
if res2.attachment is not None:
|
||
# 潜在风险点:这里没有人可以做安全检查
|
||
await target.send_message(UniMessage.image(raw=res2.attachment), at=False)
|
||
|
||
|
||
driver = nonebot.get_driver()
|
||
|
||
|
||
@driver.on_startup
|
||
async def _():
|
||
register_text_handlers(
|
||
THCat(),
|
||
THEcho(),
|
||
THRm(),
|
||
THShuffle(),
|
||
THReplace(),
|
||
THBase64(),
|
||
THCaesar(),
|
||
THReverse(),
|
||
THBaseConv(),
|
||
THAlphaConv(),
|
||
THB64Hex(),
|
||
THAlign(),
|
||
THSorted(),
|
||
THMorse(),
|
||
)
|
||
logger.info(f"注册了 TextHandler:{PipelineRunner.get_runner().handlers}")
|
||
|