From 96679033f352f77917e8df0bb339bad51f517dd3 Mon Sep 17 00:00:00 2001 From: passthem Date: Fri, 24 Oct 2025 02:21:56 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8D=E5=86=8D=E6=9C=89=20fortune?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- konabot/plugins/fortune/__init__.py | 68 ---------------- konabot/plugins/fortune/random_text_record.py | 78 ------------------- 2 files changed, 146 deletions(-) delete mode 100644 konabot/plugins/fortune/__init__.py delete mode 100644 konabot/plugins/fortune/random_text_record.py diff --git a/konabot/plugins/fortune/__init__.py b/konabot/plugins/fortune/__init__.py deleted file mode 100644 index 736b13e..0000000 --- a/konabot/plugins/fortune/__init__.py +++ /dev/null @@ -1,68 +0,0 @@ -import asyncio -from nonebot import get_plugin_config, on_command, on_message -from nonebot.adapters import Bot -from nonebot.adapters.onebot.v11.event import GroupMessageEvent -from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna -from pydantic import BaseModel - -from konabot.common.nb.is_admin import is_admin -from konabot.common.path import DATA_PATH - -from .random_text_record import RandomTextManager - - -class FortuneConfig(BaseModel): - plugin_fortune_collect_groups: list[int] = [] - - -fortune_wtf = RandomTextManager(DATA_PATH / "fortune_wtf.txt") -fortune_insert_lock = asyncio.Lock() -fortune_config = get_plugin_config(FortuneConfig) - - -async def is_collect_target(evt: GroupMessageEvent) -> bool: - if evt.group_id not in fortune_config.plugin_fortune_collect_groups: - return False - return True - - -evt_collector = on_message(rule=is_collect_target) - -@evt_collector.handle() -async def _(msg: UniMsg): - txt = msg.extract_plain_text() - if len(txt) > 50 or not txt.strip(): - return - if txt.startswith("说点怪话") or txt.startswith("说些怪话") or txt.startswith("怪话过滤"): - return - async with fortune_insert_lock: - fortune_wtf.insert(txt) - - -cmd_guaihua = on_command("说点怪话", rule=is_collect_target) - -@cmd_guaihua.handle() -async def _(bot: Bot): - await cmd_guaihua.send(await UniMessage().text(fortune_wtf.choice()).export(bot)) - - -cmd_guaihuas = on_command("说些怪话", rule=is_collect_target) - -@cmd_guaihuas.handle() -async def _(bot: Bot): - for _ in range(3): - await cmd_guaihuas.send(await UniMessage().text(fortune_wtf.choice()).export(bot)) - await asyncio.sleep(1) - - -cmd_filter_guaihua = on_alconna(Alconna( - "怪话过滤", - Args["keyword", str], -), rule=is_admin) - -@cmd_filter_guaihua.handle() -async def _(keyword: str, bot: Bot): - async with fortune_insert_lock: - c = fortune_wtf.filter_out(keyword) - await cmd_filter_guaihua.send(await UniMessage().text(f"删除了 {c} 条怪话").export(bot)) - diff --git a/konabot/plugins/fortune/random_text_record.py b/konabot/plugins/fortune/random_text_record.py deleted file mode 100644 index ee3b385..0000000 --- a/konabot/plugins/fortune/random_text_record.py +++ /dev/null @@ -1,78 +0,0 @@ -import base64 -import math -import random -import time - -from pathlib import Path - - -def dec_func(t: float) -> float: - if t < 86400: - return 0.5 * (1 + math.tanh(t / 43200 - 1)) - return math.exp(-0.00000043 * (t - 86400)) - - -class RandomTextManager: - _cache: list[tuple[float, str]] - - def __init__(self, fp: Path) -> None: - self.fp = fp - self._cache = [] - if not self.fp.exists(): - self.fp.touch() - else: - self.load() - - def load(self): - self._cache = [] - with self.fp.open("r") as f: - for line in f.readlines(): - if not line.strip(): - continue - if "|" not in line: - continue - ts, cn = line.split("|") - try: - ts = float(ts) - except Exception: - continue - self._cache.append((ts, base64.b64decode(cn).decode("utf-8"))) - - def save(self): - lines = [ - str(ts) + "|" + base64.b64encode(cn.encode("utf-8")).decode() - for ts, cn in self._cache - ] - with self.fp.open("w") as f: - f.writelines(lines) - - def insert(self, text: str, timestamp: float | None = None): - if timestamp is None: - timestamp = time.time() - with self.fp.open("a") as f: - f.write(str(timestamp) + "|" + base64.b64encode(text.encode("utf-8")).decode() + "\n") - self._cache.append((timestamp, text)) - - def choice(self, now: float | None = None): - contents: list[str] = [] - weights: list[float] = [] - - if now is None: - now = time.time() - - for ts, cn in self._cache: - contents.append(cn) - # weights.append((abs(now - ts) + 0.01) ** (-1)) - weights.append(dec_func(now - ts)) - - return random.choices(contents, weights)[0] - - def filter_out(self, keyword: str): - len1 = len(self._cache) - self._cache = [ - (ts, cn) for ts, cn in self._cache - if keyword not in cn - ] - self.save() - return len1 - len(self._cache) -