From 3da2c2266f74862814ff45edaf6f77d9a2af2166 Mon Sep 17 00:00:00 2001 From: passthem Date: Fri, 24 Oct 2025 01:08:46 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AF=B4=E6=80=AA=E8=AF=9D=20bot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- konabot/plugins/fortune/__init__.py | 55 +++++++++++++++++++ konabot/plugins/fortune/random_text_record.py | 53 ++++++++++++++++++ 2 files changed, 108 insertions(+) create mode 100644 konabot/plugins/fortune/__init__.py create mode 100644 konabot/plugins/fortune/random_text_record.py diff --git a/konabot/plugins/fortune/__init__.py b/konabot/plugins/fortune/__init__.py new file mode 100644 index 0000000..072cf40 --- /dev/null +++ b/konabot/plugins/fortune/__init__.py @@ -0,0 +1,55 @@ +import asyncio +from nonebot import get_plugin_config, on_command, on_message +from nonebot.adapters import Bot +from nonebot.adapters.onebot.v11.event import GroupMessageEvent +from nonebot_plugin_alconna import UniMessage, UniMsg +from pydantic import BaseModel + +from konabot.common.path import DATA_PATH + +from .random_text_record import RandomTextManager + + +class FortuneConfig(BaseModel): + plugin_fortune_collect_groups: list[int] = [] + + +fortune_wtf = RandomTextManager(DATA_PATH / "fortune_wtf.txt") +fortune_insert_lock = asyncio.Lock() +fortune_config = get_plugin_config(FortuneConfig) + + +async def is_collect_target(evt: GroupMessageEvent) -> bool: + if evt.group_id not in fortune_config.plugin_fortune_collect_groups: + return False + return True + + +evt_collector = on_message(rule=is_collect_target) + +@evt_collector.handle() +async def _(msg: UniMsg): + txt = msg.extract_plain_text() + if len(txt) > 50: + return + if txt.startswith("说点怪话") or txt.startswith("说些怪话") or txt.startswith("怪话过滤"): + return + async with fortune_insert_lock: + fortune_wtf.insert(txt) + + +cmd_guaihua = on_command("说点怪话", rule=is_collect_target) + +@cmd_guaihua.handle() +async def _(bot: Bot): + await cmd_guaihua.send(await UniMessage().text(fortune_wtf.choice()).export(bot)) + + +cmd_guaihuas = on_command("说些怪话", rule=is_collect_target) + +@cmd_guaihuas.handle() +async def _(bot: Bot): + for _ in range(3): + await cmd_guaihuas.send(await UniMessage().text(fortune_wtf.choice()).export(bot)) + await asyncio.sleep(1) + diff --git a/konabot/plugins/fortune/random_text_record.py b/konabot/plugins/fortune/random_text_record.py new file mode 100644 index 0000000..7e1c6fe --- /dev/null +++ b/konabot/plugins/fortune/random_text_record.py @@ -0,0 +1,53 @@ +import base64 +import random +import time + +from pathlib import Path + + +class RandomTextManager: + _cache: list[tuple[float, str]] + + def __init__(self, fp: Path) -> None: + self.fp = fp + self._cache = [] + if not self.fp.exists(): + self.fp.touch() + else: + self.load() + + def load(self): + self._cache = [] + with self.fp.open("r") as f: + for line in f.readlines(): + if not line.strip(): + continue + if "|" not in line: + continue + ts, cn = line.split("|") + try: + ts = float(ts) + except Exception: + continue + self._cache.append((ts, base64.b64decode(cn).decode("utf-8"))) + + def insert(self, text: str, timestamp: float | None = None): + if timestamp is None: + timestamp = time.time() + with self.fp.open("a") as f: + f.write(str(timestamp) + "|" + base64.b64encode(text.encode("utf-8")).decode() + "\n") + self._cache.append((timestamp, text)) + + def choice(self, now: float | None = None): + contents: list[str] = [] + weights: list[float] = [] + + if now is None: + now = time.time() + + for ts, cn in self._cache: + contents.append(cn) + weights.append((abs(now - ts) + 0.01) ** (-1)) + + return random.choices(contents, weights)[0] +