Files
konabot/konabot/plugins/handle_text/handlers/unix_handlers.py

162 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from konabot.plugins.handle_text.base import (
TextHandleResult,
TextHandler,
TextHandlerEnvironment,
)
class THEcho(TextHandler):
name = "echo"
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
# echo 不读 stdin只输出参数Unix 语义)
# 无参数时输出空行(与 Unix echo 行为一致)
return TextHandleResult(0, "\n".join(args))
class THCat(TextHandler):
name = "cat"
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
if len(args) == 0:
if istream is None:
return TextHandleResult(
1,
"cat 使用方法cat [缓存名 ...]\n使用 - 代表标准输入,可拼接多个缓存",
)
return TextHandleResult(0, istream)
parts: list[str] = []
for arg in args:
if arg == "-":
if istream is None:
return TextHandleResult(2, "标准输入为空(没有管道输入或回复消息)")
parts.append(istream)
else:
if arg not in env.buffers:
return TextHandleResult(2, f"缓存 {arg} 不存在")
parts.append(env.buffers[arg])
return TextHandleResult(0, "\n".join(parts))
class THRm(TextHandler):
name = "rm"
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
if len(args) != 1:
return TextHandleResult(1, "rm 使用方法rm <缓存名>")
buf = args[0]
if buf == "-":
buf = istream
if buf not in env.buffers:
return TextHandleResult(2, f"缓存 {buf} 不存在")
del env.buffers[buf]
return TextHandleResult(0, None)
class THReplace(TextHandler):
name = "replace"
keywords = ["sed", "替换"]
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
if len(args) < 2:
return TextHandleResult(1, "用法replace <正则> <替换内容> [文本]")
pattern, repl = args[0], args[1]
text = (
istream
if istream is not None
else (" ".join(args[2:]) if len(args) > 2 else "")
)
try:
res = re.sub(pattern, repl, text)
return TextHandleResult(0, res)
except Exception as e:
return TextHandleResult(1, f"正则错误: {str(e)}")
class THTrue(TextHandler):
name = "true"
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
return TextHandleResult(0, istream)
class THFalse(TextHandler):
name = "false"
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
return TextHandleResult(1, None)
class THTest(TextHandler):
name = "test"
keywords = ["["]
def _bool_result(self, value: bool) -> TextHandleResult:
return TextHandleResult(0 if value else 1, None)
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
expr = list(args)
# 支持方括号语法:[ expr ] 会自动移除末尾的 ]
if expr and expr[-1] == "]":
expr = expr[:-1]
if not expr:
return TextHandleResult(1, None)
if len(expr) == 1:
return self._bool_result(len(expr[0]) > 0)
if len(expr) == 2:
op, value = expr
if op == "-n":
return self._bool_result(len(value) > 0)
if op == "-z":
return self._bool_result(len(value) == 0)
return TextHandleResult(2, f"test 不支持的表达式: {' '.join(args)}")
if len(expr) == 3:
left, op, right = expr
if op == "=":
return self._bool_result(left == right)
if op == "!=":
return self._bool_result(left != right)
if op in {"-eq", "-ne", "-gt", "-ge", "-lt", "-le"}:
try:
li = int(left)
ri = int(right)
except ValueError:
return TextHandleResult(2, "test 的数字比较参数必须是整数")
mapping = {
"-eq": li == ri,
"-ne": li != ri,
"-gt": li > ri,
"-ge": li >= ri,
"-lt": li < ri,
"-le": li <= ri,
}
return self._bool_result(mapping[op])
return TextHandleResult(2, f"test 不支持的操作符: {op}")
return TextHandleResult(2, f"test 不支持的表达式: {' '.join(args)}")