diff --git a/konabot/plugins/handle_text/__init__.py b/konabot/plugins/handle_text/__init__.py index de48905..b40cb08 100644 --- a/konabot/plugins/handle_text/__init__.py +++ b/konabot/plugins/handle_text/__init__.py @@ -11,7 +11,7 @@ from nonebot.adapters.onebot.v11.message import Message as OB11Message from konabot.common.apis.ali_content_safety import AlibabaGreen from konabot.common.longtask import DepLongTaskTarget from konabot.plugins.handle_text.base import PipelineRunner, TextHandlerEnvironment, register_text_handlers -from konabot.plugins.handle_text.handlers.encoding_handlers import THBase64, THCaesar, THReverse +from konabot.plugins.handle_text.handlers.encoding_handlers import THAlign, THAlphaConv, THB64Hex, THBase64, THBaseConv, THCaesar, THReverse from konabot.plugins.handle_text.handlers.random_handlers import THShuffle from konabot.plugins.handle_text.handlers.unix_handlers import THCat, THEcho, THReplace, THRm @@ -73,6 +73,10 @@ async def _(): THBase64(), THCaesar(), THReverse(), + THBaseConv(), + THAlphaConv(), + THB64Hex(), + THAlign(), ) logger.info(f"注册了 TextHandler:{PipelineRunner.get_runner().handlers}") diff --git a/konabot/plugins/handle_text/base.py b/konabot/plugins/handle_text/base.py index 5765ff3..cb65578 100644 --- a/konabot/plugins/handle_text/base.py +++ b/konabot/plugins/handle_text/base.py @@ -1,3 +1,5 @@ +import asyncio + from abc import ABC, abstractmethod from dataclasses import dataclass, field from enum import Enum @@ -33,6 +35,17 @@ class TextHandler(ABC): return f"<{self.__class__.__name__}: {self.name} [{''.join(self.keywords)}]>" +class TextHandlerSync(TextHandler): + @abstractmethod + def handle_sync(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + ... + + async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + def _hs(): + return self.handle_sync(env, istream, args) + return await asyncio.to_thread(_hs) + + @dataclass class PipelineCommand: handler: TextHandler diff --git a/konabot/plugins/handle_text/handlers/encoding_handlers.py b/konabot/plugins/handle_text/handlers/encoding_handlers.py index 384e778..1699107 100644 --- a/konabot/plugins/handle_text/handlers/encoding_handlers.py +++ b/konabot/plugins/handle_text/handlers/encoding_handlers.py @@ -33,9 +33,9 @@ class THBase64(TextHandler): try: if mode == "encode": - res = base64.b64encode(text.encode(encoding)).decode("ascii") + res = base64.b64encode(text.encode(encoding, "replace")).decode("ascii") else: - res = base64.b64decode(text.encode("ascii")).decode(encoding) + res = base64.b64decode(text.encode("ascii")).decode(encoding, "replace") return TextHandleResult(0, res) except Exception as e: return TextHandleResult(1, f"Base64 转换失败: {str(e)}") @@ -45,15 +45,21 @@ class THCaesar(TextHandler): name = "caesar" keywords = ["凯撒", "rot"] - async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: # 用法: caesar [text] shift = int(args[0]) if args else 13 - text = istream if istream is not None else (" ".join(args[1:]) if len(args) > 1 else "") - + text = ( + istream + if istream is not None + else (" ".join(args[1:]) if len(args) > 1 else "") + ) + def _shift(char): if not char.isalpha(): return char - start = ord('A') if char.isupper() else ord('a') + start = ord("A") if char.isupper() else ord("a") return chr((ord(char) - start + shift) % 26 + start) res = "".join(_shift(c) for c in text) @@ -64,7 +70,277 @@ class THReverse(TextHandler): name = "reverse" keywords = ["rev", "反转"] - async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: text = istream if istream is not None else (" ".join(args) if args else "") return TextHandleResult(0, text[::-1]) + +class THMorse(TextHandler): + name = "morse" + keywords = ["摩斯", "decode_morse"] + + # 国际摩斯电码表 (部分) + MORSE_EN = { + ".-": "A", + "-...": "B", + "-.-.": "C", + "-..": "D", + ".": "E", + "..-.": "F", + "--.": "G", + "....": "H", + "..": "I", + ".---": "J", + "-.-": "K", + ".-..": "L", + "--": "M", + "-.": "N", + "---": "O", + ".--.": "P", + "--.-": "Q", + ".-.": "R", + "...": "S", + "-": "T", + "..-": "U", + "...-": "V", + ".--": "W", + "-..-": "X", + "-.--": "Y", + "--..": "Z", + "-----": "0", + ".----": "1", + "..---": "2", + "...--": "3", + "....-": "4", + ".....": "5", + "-....": "6", + "--...": "7", + "---..": "8", + "----.": "9", + "/": " ", + } + + # 日文和文摩斯电码表 (Wabun Code) + MORSE_JP = { + "--.--": "ア", + ".-": "イ", + "..-": "ウ", + "-.---": "エ", + ".-...": "オ", + ".-..": "カ", + "-.-..": "キ", + "...-": "ク", + "-.--": "ケ", + "----": "コ", + "-.-.-": "サ", + "--.-.": "シ", + "---.-": "ス", + ".---.": "セ", + "---.": "ソ", + "-.": "タ", + "..-.": "チ", + ".--.": "ツ", + ".-.--": "テ", + "..-..": "ト", + ".-.": "ナ", + "-.-.": "ニ", + "....": "ヌ", + "--.-": "ネ", + "..--": "ノ", + "-...": "ハ", + "--..-": "ヒ", + "--..": "フ", + ".": "ヘ", + "-..": "ホ", + "-..-": "マ", + "..-.-": "ミ", + "-": "ム", + "-...-": "メ", + "-..-.": "モ", + ".--": "ヤ", + "-..--": "ユ", + "--": "ヨ", + "...": "ラ", + "--.": "リ", + "-.--.": "ル", + "---": "レ", + ".-.-": "ロ", + "-.-": "ワ", + ".-..-": "ヰ", + ".--..": "ヱ", + ".---": "ヲ", + ".-.-.": "ン", + "-..-.--.": "ッ", + "-..-.--": "ャ", + "-..--..--": "ュ", + "-..---": "ョ", + "-..---.--": "ァ", + "-..-.-": "ィ", + "-..-..-": "ゥ", + "-..--.---": "ェ", + "-..-.-...": "ォ", + "-..-.-..": "ヵ", + "-..--.--": "ヶ", + "..": "゛", + "..--.": "゜", + ".--.-": "ー", + ".-.-.-": "、", + ".-.-..": "。", + "-.--.-": "(", + ".-..-.": ")", + } + + async def handle( + self, env: TextHandlerEnvironment, istream: str | None, args: list[str] + ) -> TextHandleResult: + """ + 用法: morse [text] + 例子: morse en .... . .-.. .-.. --- + """ + if not args and istream is None: + return TextHandleResult( + 1, "用法:morse <电码>。使用空格分隔字符,/ 分隔单词。" + ) + + mode = args[0].lower() if args else "en" + text = ( + istream + if istream is not None + else (" ".join(args[1:]) if len(args) > 1 else "") + ) + + if not text: + return TextHandleResult(1, "请输入电码内容") + + # 选择词典 + mapping = self.MORSE_JP if mode == "jp" else self.MORSE_EN + + try: + # 按空格切分符号,过滤掉多余空位 + tokens = [t for t in text.split(" ") if t] + decoded = [] + + for token in tokens: + # 处理部分解谜中可能出现的换行或特殊斜杠 + token = token.strip() + if token in mapping: + decoded.append(mapping[token]) + else: + decoded.append("[?]") # 无法识别的符号 + + return TextHandleResult(0, "".join(decoded)) + except Exception as e: + return TextHandleResult(1, f"摩斯电码解析出错: {str(e)}") + + +class THBaseConv(TextHandler): + name = "baseconv" + keywords = ["进制转换"] + + async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + # 用法: baseconv [text] + if len(args) < 2 and istream is None: + return TextHandleResult(1, "用法:baseconv <原进制> <目标进制> [文本]") + + src_base = int(args[0]) + dst_base = int(args[1]) + val_str = istream if istream is not None else "".join(args[2:]) + + try: + # 先转为 10 进制中间量,再转为目标进制 + decimal_val = int(val_str, src_base) + + if dst_base == 10: + res = str(decimal_val) + elif dst_base == 16: + res = hex(decimal_val)[2:] + else: + # 通用任意进制转换逻辑 + chars = "0123456789abcdefghijklmnopqrstuvwxyz" + res = "" + temp = decimal_val + while temp > 0: + res = chars[temp % dst_base] + res + temp //= dst_base + res = res or "0" + + return TextHandleResult(0, res.upper() if dst_base == 16 else res) + except Exception as e: + return TextHandleResult(1, f"转换失败: {str(e)}") + + +class THAlphaConv(TextHandler): + name = "alphaconv" + keywords = ["字母表转换"] + + async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + # 用法: alphaconv [text] + if len(args) < 2: + return TextHandleResult(1, "用法:alphaconv <字母表> [文本]") + + alphabet = args[0] + mode = args[1].lower() + base = len(alphabet) + text = istream if istream is not None else "".join(args[2:]) + + try: + if mode == "to_hex": + # 自定义字母表 -> 10进制 -> 16进制 + val = 0 + for char in text: + val = val * base + alphabet.index(char) + return TextHandleResult(0, hex(val)[2:]) + else: + # 16进制 -> 10进制 -> 自定义字母表 + val = int(text, 16) + res = "" + while val > 0: + res = alphabet[val % base] + res + val //= base + return TextHandleResult(0, res or alphabet[0]) + except Exception as e: + return TextHandleResult(1, f"字母表转换失败: {str(e)}") + + +class THB64Hex(TextHandler): + name = "b64hex" + + async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + # 用法: b64hex [text] + mode = args[0] if args else "dec" + text = istream if istream is not None else "".join(args[1:]) + + try: + if mode == "enc": # Hex -> B64 + raw_bytes = bytes.fromhex(text) + res = base64.b64encode(raw_bytes).decode() + else: # B64 -> Hex + raw_bytes = base64.b64decode(text) + res = raw_bytes.hex() + return TextHandleResult(0, res) + except Exception as e: + return TextHandleResult(1, f"Base64-Hex 转换失败: {str(e)}") + + +class THAlign(TextHandler): + name = "align" + keywords = ["format", "排版"] + + async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + # 用法: align [text] + # 例子: align 2 8 (即 2个一组,8组一行,类似 0011 2233...) + n = int(args[0]) if len(args) > 0 else 2 + m = int(args[1]) if len(args) > 1 else 8 + text = istream if istream is not None else "".join(args[2:]) + + # 移除现有空格换行以便重新排版 + text = "".join(text.split()) + + chunks = [text[i:i+n] for i in range(0, len(text), n)] + lines = [] + for i in range(0, len(chunks), m): + lines.append(" ".join(chunks[i:i+m])) + + return TextHandleResult(0, "\n".join(lines)) diff --git a/konabot/plugins/handle_text/handlers/random_handlers.py b/konabot/plugins/handle_text/handlers/random_handlers.py index 227d9b2..8ce4542 100644 --- a/konabot/plugins/handle_text/handlers/random_handlers.py +++ b/konabot/plugins/handle_text/handlers/random_handlers.py @@ -1,5 +1,5 @@ import random -from konabot.plugins.handle_text.base import TextHandleResult, TextHandler, TextHandlerEnvironment +from konabot.plugins.handle_text.base import TextHandleResult, TextHandler, TextHandlerEnvironment, TextHandlerSync class THShuffle(TextHandler): @@ -19,3 +19,19 @@ class THShuffle(TextHandler): random.shuffle(w) return TextHandleResult(0, ''.join(w)) + +class THSorted(TextHandlerSync): + name = "sort" + keywords = ["排序"] + + def handle_sync(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: + if istream is not None: + w = istream + elif len(args) == 0: + return TextHandleResult(1, "使用方法:排序 <待排序的文本>,或者使用管道符传入待打乱的文本") + else: + w = args[0] + args = args[1:] + + return TextHandleResult(0, ''.join(sorted([*w]))) +