添加好多好多 crypto
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2026-01-10 00:39:45 +08:00
parent 3adbd38d65
commit 16a55ae69a
4 changed files with 318 additions and 9 deletions

View File

@ -11,7 +11,7 @@ from nonebot.adapters.onebot.v11.message import Message as OB11Message
from konabot.common.apis.ali_content_safety import AlibabaGreen
from konabot.common.longtask import DepLongTaskTarget
from konabot.plugins.handle_text.base import PipelineRunner, TextHandlerEnvironment, register_text_handlers
from konabot.plugins.handle_text.handlers.encoding_handlers import THBase64, THCaesar, THReverse
from konabot.plugins.handle_text.handlers.encoding_handlers import THAlign, THAlphaConv, THB64Hex, THBase64, THBaseConv, THCaesar, THReverse
from konabot.plugins.handle_text.handlers.random_handlers import THShuffle
from konabot.plugins.handle_text.handlers.unix_handlers import THCat, THEcho, THReplace, THRm
@ -73,6 +73,10 @@ async def _():
THBase64(),
THCaesar(),
THReverse(),
THBaseConv(),
THAlphaConv(),
THB64Hex(),
THAlign(),
)
logger.info(f"注册了 TextHandler{PipelineRunner.get_runner().handlers}")

View File

@ -1,3 +1,5 @@
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
@ -33,6 +35,17 @@ class TextHandler(ABC):
return f"<{self.__class__.__name__}: {self.name} [{''.join(self.keywords)}]>"
class TextHandlerSync(TextHandler):
@abstractmethod
def handle_sync(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
...
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
def _hs():
return self.handle_sync(env, istream, args)
return await asyncio.to_thread(_hs)
@dataclass
class PipelineCommand:
handler: TextHandler

View File

@ -33,9 +33,9 @@ class THBase64(TextHandler):
try:
if mode == "encode":
res = base64.b64encode(text.encode(encoding)).decode("ascii")
res = base64.b64encode(text.encode(encoding, "replace")).decode("ascii")
else:
res = base64.b64decode(text.encode("ascii")).decode(encoding)
res = base64.b64decode(text.encode("ascii")).decode(encoding, "replace")
return TextHandleResult(0, res)
except Exception as e:
return TextHandleResult(1, f"Base64 转换失败: {str(e)}")
@ -45,15 +45,21 @@ class THCaesar(TextHandler):
name = "caesar"
keywords = ["凯撒", "rot"]
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
# 用法: caesar <shift> [text]
shift = int(args[0]) if args else 13
text = istream if istream is not None else (" ".join(args[1:]) if len(args) > 1 else "")
text = (
istream
if istream is not None
else (" ".join(args[1:]) if len(args) > 1 else "")
)
def _shift(char):
if not char.isalpha():
return char
start = ord('A') if char.isupper() else ord('a')
start = ord("A") if char.isupper() else ord("a")
return chr((ord(char) - start + shift) % 26 + start)
res = "".join(_shift(c) for c in text)
@ -64,7 +70,277 @@ class THReverse(TextHandler):
name = "reverse"
keywords = ["rev", "反转"]
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
text = istream if istream is not None else (" ".join(args) if args else "")
return TextHandleResult(0, text[::-1])
class THMorse(TextHandler):
name = "morse"
keywords = ["摩斯", "decode_morse"]
# 国际摩斯电码表 (部分)
MORSE_EN = {
".-": "A",
"-...": "B",
"-.-.": "C",
"-..": "D",
".": "E",
"..-.": "F",
"--.": "G",
"....": "H",
"..": "I",
".---": "J",
"-.-": "K",
".-..": "L",
"--": "M",
"-.": "N",
"---": "O",
".--.": "P",
"--.-": "Q",
".-.": "R",
"...": "S",
"-": "T",
"..-": "U",
"...-": "V",
".--": "W",
"-..-": "X",
"-.--": "Y",
"--..": "Z",
"-----": "0",
".----": "1",
"..---": "2",
"...--": "3",
"....-": "4",
".....": "5",
"-....": "6",
"--...": "7",
"---..": "8",
"----.": "9",
"/": " ",
}
# 日文和文摩斯电码表 (Wabun Code)
MORSE_JP = {
"--.--": "",
".-": "",
"..-": "",
"-.---": "",
".-...": "",
".-..": "",
"-.-..": "",
"...-": "",
"-.--": "",
"----": "",
"-.-.-": "",
"--.-.": "",
"---.-": "",
".---.": "",
"---.": "",
"-.": "",
"..-.": "",
".--.": "",
".-.--": "",
"..-..": "",
".-.": "",
"-.-.": "",
"....": "",
"--.-": "",
"..--": "",
"-...": "",
"--..-": "",
"--..": "",
".": "",
"-..": "",
"-..-": "",
"..-.-": "",
"-": "",
"-...-": "",
"-..-.": "",
".--": "",
"-..--": "",
"--": "",
"...": "",
"--.": "",
"-.--.": "",
"---": "",
".-.-": "",
"-.-": "",
".-..-": "",
".--..": "",
".---": "",
".-.-.": "",
"-..-.--.": "",
"-..-.--": "",
"-..--..--": "",
"-..---": "",
"-..---.--": "",
"-..-.-": "",
"-..-..-": "",
"-..--.---": "",
"-..-.-...": "",
"-..-.-..": "",
"-..--.--": "",
"..": "",
"..--.": "",
".--.-": "",
".-.-.-": "",
".-.-..": "",
"-.--.-": "",
".-..-.": "",
}
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
"""
用法: morse <mode: en|jp> [text]
例子: morse en .... . .-.. .-.. ---
"""
if not args and istream is None:
return TextHandleResult(
1, "用法morse <en|jp> <电码>。使用空格分隔字符,/ 分隔单词。"
)
mode = args[0].lower() if args else "en"
text = (
istream
if istream is not None
else (" ".join(args[1:]) if len(args) > 1 else "")
)
if not text:
return TextHandleResult(1, "请输入电码内容")
# 选择词典
mapping = self.MORSE_JP if mode == "jp" else self.MORSE_EN
try:
# 按空格切分符号,过滤掉多余空位
tokens = [t for t in text.split(" ") if t]
decoded = []
for token in tokens:
# 处理部分解谜中可能出现的换行或特殊斜杠
token = token.strip()
if token in mapping:
decoded.append(mapping[token])
else:
decoded.append("[?]") # 无法识别的符号
return TextHandleResult(0, "".join(decoded))
except Exception as e:
return TextHandleResult(1, f"摩斯电码解析出错: {str(e)}")
class THBaseConv(TextHandler):
name = "baseconv"
keywords = ["进制转换"]
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
# 用法: baseconv <src_base> <dst_base> [text]
if len(args) < 2 and istream is None:
return TextHandleResult(1, "用法baseconv <原进制> <目标进制> [文本]")
src_base = int(args[0])
dst_base = int(args[1])
val_str = istream if istream is not None else "".join(args[2:])
try:
# 先转为 10 进制中间量,再转为目标进制
decimal_val = int(val_str, src_base)
if dst_base == 10:
res = str(decimal_val)
elif dst_base == 16:
res = hex(decimal_val)[2:]
else:
# 通用任意进制转换逻辑
chars = "0123456789abcdefghijklmnopqrstuvwxyz"
res = ""
temp = decimal_val
while temp > 0:
res = chars[temp % dst_base] + res
temp //= dst_base
res = res or "0"
return TextHandleResult(0, res.upper() if dst_base == 16 else res)
except Exception as e:
return TextHandleResult(1, f"转换失败: {str(e)}")
class THAlphaConv(TextHandler):
name = "alphaconv"
keywords = ["字母表转换"]
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
# 用法: alphaconv <alphabet> <to_hex|from_hex> [text]
if len(args) < 2:
return TextHandleResult(1, "用法alphaconv <字母表> <to_hex|from_hex> [文本]")
alphabet = args[0]
mode = args[1].lower()
base = len(alphabet)
text = istream if istream is not None else "".join(args[2:])
try:
if mode == "to_hex":
# 自定义字母表 -> 10进制 -> 16进制
val = 0
for char in text:
val = val * base + alphabet.index(char)
return TextHandleResult(0, hex(val)[2:])
else:
# 16进制 -> 10进制 -> 自定义字母表
val = int(text, 16)
res = ""
while val > 0:
res = alphabet[val % base] + res
val //= base
return TextHandleResult(0, res or alphabet[0])
except Exception as e:
return TextHandleResult(1, f"字母表转换失败: {str(e)}")
class THB64Hex(TextHandler):
name = "b64hex"
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
# 用法: b64hex <enc|dec> [text]
mode = args[0] if args else "dec"
text = istream if istream is not None else "".join(args[1:])
try:
if mode == "enc": # Hex -> B64
raw_bytes = bytes.fromhex(text)
res = base64.b64encode(raw_bytes).decode()
else: # B64 -> Hex
raw_bytes = base64.b64decode(text)
res = raw_bytes.hex()
return TextHandleResult(0, res)
except Exception as e:
return TextHandleResult(1, f"Base64-Hex 转换失败: {str(e)}")
class THAlign(TextHandler):
name = "align"
keywords = ["format", "排版"]
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
# 用法: align <n:每组长度> <m:每行组数> [text]
# 例子: align 2 8 (即 2个一组8组一行类似 0011 2233...)
n = int(args[0]) if len(args) > 0 else 2
m = int(args[1]) if len(args) > 1 else 8
text = istream if istream is not None else "".join(args[2:])
# 移除现有空格换行以便重新排版
text = "".join(text.split())
chunks = [text[i:i+n] for i in range(0, len(text), n)]
lines = []
for i in range(0, len(chunks), m):
lines.append(" ".join(chunks[i:i+m]))
return TextHandleResult(0, "\n".join(lines))

View File

@ -1,5 +1,5 @@
import random
from konabot.plugins.handle_text.base import TextHandleResult, TextHandler, TextHandlerEnvironment
from konabot.plugins.handle_text.base import TextHandleResult, TextHandler, TextHandlerEnvironment, TextHandlerSync
class THShuffle(TextHandler):
@ -19,3 +19,19 @@ class THShuffle(TextHandler):
random.shuffle(w)
return TextHandleResult(0, ''.join(w))
class THSorted(TextHandlerSync):
name = "sort"
keywords = ["排序"]
def handle_sync(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
if istream is not None:
w = istream
elif len(args) == 0:
return TextHandleResult(1, "使用方法:排序 <待排序的文本>,或者使用管道符传入待打乱的文本")
else:
w = args[0]
args = args[1:]
return TextHandleResult(0, ''.join(sorted([*w])))