93 lines
2.9 KiB
Python
93 lines
2.9 KiB
Python
import re
|
||
|
||
from konabot.plugins.handle_text.base import (
|
||
TextHandleResult,
|
||
TextHandler,
|
||
TextHandlerEnvironment,
|
||
)
|
||
|
||
|
||
class THEcho(TextHandler):
    """Text handler named ``echo``.

    Outputs the piped input (when present) followed by each argument,
    one item per line.
    """

    name = "echo"

    async def handle(
        self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
    ) -> TextHandleResult:
        """Join the piped input and the arguments with newlines.

        Args:
            env: Handler environment; not used by this handler.
            istream: Piped input text, or ``None`` when there is none.
            args: Texts to output after ``istream``.

        Returns:
            A result with code 0 carrying the joined text, or a result with
            code 1 and a usage hint when there is neither piped input nor
            any argument.
        """
        if istream is None:
            if not args:
                # Nothing to echo at all: report the usage hint.
                return TextHandleResult(1, "请在 echo 后面添加需要输出的文本")
            lines = args
        else:
            # Piped input always comes first, then the arguments in order.
            lines = [istream, *args]
        return TextHandleResult(0, "\n".join(lines))
|
||
|
||
|
||
class THCat(TextHandler):
    """Text handler named ``cat``.

    Concatenates the piped input and/or entries of ``env.buffers``,
    separated by newlines.
    """

    name = "cat"

    async def handle(
        self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
    ) -> TextHandleResult:
        """Concatenate the requested sources in the order given.

        Args:
            env: Handler environment; ``env.buffers`` is looked up by name.
            istream: Piped input text, or ``None`` when there is none.
            args: Source names; ``"-"`` selects ``istream``, anything else
                is treated as a key of ``env.buffers``.

        Returns:
            Code 0 with the joined text on success. Code 1 with a usage
            message when there are no arguments and no piped input. Code 2
            when ``"-"`` is requested without piped input or when a named
            buffer does not exist.
        """
        if not args:
            # Without arguments the piped input is passed through unchanged.
            if istream is not None:
                return TextHandleResult(0, istream)
            return TextHandleResult(
                1,
                "cat 使用方法:cat [缓存名 ...]\n使用 - 代表标准输入,可拼接多个缓存",
            )

        collected: list[str] = []
        for source in args:
            if source != "-":
                if source not in env.buffers:
                    return TextHandleResult(2, f"缓存 {source} 不存在")
                collected.append(env.buffers[source])
                continue
            # "-" refers to the piped input, which must be present.
            if istream is None:
                return TextHandleResult(2, "标准输入为空(没有管道输入或回复消息)")
            collected.append(istream)

        return TextHandleResult(0, "\n".join(collected))
|
||
|
||
|
||
class THRm(TextHandler):
    """Text handler named ``rm``: deletes one entry from ``env.buffers``."""

    name = "rm"

    async def handle(
        self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
    ) -> TextHandleResult:
        """Delete the buffer named by the single argument.

        Args:
            env: Handler environment; the entry is removed from
                ``env.buffers``.
            istream: Piped input text, or ``None`` when there is none. It is
                used as the buffer name when the argument is ``"-"``.
            args: Exactly one element: the buffer name, or ``"-"`` to take
                the name from ``istream``.

        Returns:
            Code 0 with no output after a successful deletion. Code 1 with a
            usage message when the argument count is not one. Code 2 when
            ``"-"`` is given without piped input or when the buffer does not
            exist.
        """
        if len(args) != 1:
            return TextHandleResult(1, "rm 使用方法:rm <缓存名>")

        buf = args[0]
        if buf == "-":
            # Without this check the lookup below would run with None and
            # report a confusing "缓存 None 不存在"; use the same message the
            # cat handler gives for a missing standard input instead.
            if istream is None:
                return TextHandleResult(2, "标准输入为空(没有管道输入或回复消息)")
            buf = istream

        if buf not in env.buffers:
            return TextHandleResult(2, f"缓存 {buf} 不存在")

        del env.buffers[buf]
        return TextHandleResult(0, None)
|
||
|
||
|
||
class THReplace(TextHandler):
    """Text handler named ``replace``: regex substitution via ``re.sub``."""

    name = "replace"
    # Extra keywords for this handler; presumably aliases resolved by the
    # TextHandler machinery -- confirm against the base class.
    keywords = ["sed", "替换"]

    async def handle(
        self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
    ) -> TextHandleResult:
        """Substitute regex matches in the input text.

        Usage: ``replace <pattern> <replacement> [text]``

        Args:
            env: Handler environment; not used by this handler.
            istream: Piped input text. When it is not ``None`` it is the
                text operated on and any trailing arguments are ignored.
            args: Pattern, replacement, and optionally the text split into
                words (rejoined with single spaces).

        Returns:
            Code 0 with the substituted text, or code 1 with a usage message
            (fewer than two arguments) or with the error text of whatever
            exception the substitution raised.
        """
        if len(args) < 2:
            return TextHandleResult(1, "用法:replace <正则> <替换内容> [文本]")

        pattern, repl, *words = args
        if istream is not None:
            text = istream
        else:
            # Joining an empty list yields "", covering the no-text case.
            text = " ".join(words)

        try:
            replaced = re.sub(pattern, repl, text)
            return TextHandleResult(0, replaced)
        except Exception as err:
            # Any failure here is reported to the user rather than raised.
            return TextHandleResult(1, f"正则错误: {err!s}")
|