Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a30c7b8093 | |||
| 3da2c2266f | |||
| 96e3c3fe17 | |||
| 851c9eb3c7 | |||
| 11269b2a5a |
6
bot.py
6
bot.py
@ -20,9 +20,13 @@ env_enable_minecraft = os.environ.get("ENABLE_MINECRAFT", "none")
|
||||
|
||||
|
||||
def main():
|
||||
if env.upper() == 'DEBUG' or env.upper() == 'DEV':
|
||||
console_log_level = 'DEBUG'
|
||||
else:
|
||||
console_log_level = 'INFO'
|
||||
init_logger(LOG_PATH, [
|
||||
BotExceptionMessage,
|
||||
])
|
||||
], console_log_level=console_log_level)
|
||||
|
||||
nonebot.init()
|
||||
|
||||
|
||||
@ -18,7 +18,7 @@ def file_exception_filter(
|
||||
否则,返回 True(允许记录)。
|
||||
"""
|
||||
exception_info = record.get("exception")
|
||||
|
||||
|
||||
if exception_info:
|
||||
exception_type = exception_info[0]
|
||||
|
||||
@ -29,8 +29,9 @@ def file_exception_filter(
|
||||
|
||||
|
||||
def init_logger(
|
||||
log_dir: Path,
|
||||
ignored_exceptions: List[Type[Exception]]
|
||||
log_dir: Path,
|
||||
ignored_exceptions: List[Type[Exception]],
|
||||
console_log_level: str = "INFO",
|
||||
) -> None:
|
||||
"""
|
||||
配置全局 Loguru Logger。
|
||||
@ -47,7 +48,7 @@ def init_logger(
|
||||
|
||||
logger.add(
|
||||
sys.stderr,
|
||||
level="INFO",
|
||||
level=console_log_level,
|
||||
colorize=True,
|
||||
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
|
||||
)
|
||||
@ -76,4 +77,4 @@ def init_logger(
|
||||
)
|
||||
|
||||
logger.info("Loguru Logger 初始化完成!")
|
||||
logger.info(f"控制台日志级别: INFO")
|
||||
logger.info(f"控制台日志级别: {console_log_level}")
|
||||
|
||||
@ -25,6 +25,7 @@ from pydantic import BaseModel, ValidationError
|
||||
from .path import DATA_PATH
|
||||
|
||||
LONGTASK_DATA_DIR = DATA_PATH / "longtasks.json"
|
||||
QQ_PRIVATE_CHAT_CHANNEL_PREFIX = "_CHANNEL_QQ_PRIVATE_"
|
||||
|
||||
|
||||
class LongTaskTarget(BaseModel):
|
||||
@ -65,7 +66,7 @@ class LongTaskTarget(BaseModel):
|
||||
} BOT_CLASS={bot.__class__.__name__}"
|
||||
)
|
||||
return False
|
||||
if self.channel_id == "":
|
||||
if self.channel_id.startswith(QQ_PRIVATE_CHAT_CHANNEL_PREFIX):
|
||||
# 私聊模式
|
||||
await bot.send_private_msg(
|
||||
user_id=int(self.target_id),
|
||||
@ -196,7 +197,7 @@ async def get_long_task_target(event: BaseEvent, bot: BaseBot) -> LongTaskTarget
|
||||
return LongTaskTarget(
|
||||
platform="qq",
|
||||
self_id=str(event.self_id),
|
||||
channel_id="",
|
||||
channel_id=f"{QQ_PRIVATE_CHAT_CHANNEL_PREFIX}{event.self_id}",
|
||||
target_id=str(event.user_id),
|
||||
)
|
||||
if isinstance(event, ConsoleMessageEvent):
|
||||
|
||||
9
konabot/docs/sys/怪话过滤.txt
Normal file
9
konabot/docs/sys/怪话过滤.txt
Normal file
@ -0,0 +1,9 @@
|
||||
指令介绍
|
||||
怪话过滤 - 去除含有关键词的怪话
|
||||
|
||||
使用方法
|
||||
`怪话过滤 说的道理`
|
||||
去除所有含有“说的道理”的怪话
|
||||
|
||||
另见
|
||||
怪话(1)
|
||||
12
konabot/docs/user/怪话.txt
Normal file
12
konabot/docs/user/怪话.txt
Normal file
@ -0,0 +1,12 @@
|
||||
指令介绍
|
||||
说点怪话/说些怪话 - 让 BOT 学群友胡言乱语
|
||||
|
||||
适用范围
|
||||
为保证安全,只有少数授权的群聊可以使用该指令
|
||||
|
||||
使用方法
|
||||
`说点怪话 今天吃什么`
|
||||
期待 Bot 会回答你什么吧
|
||||
|
||||
`说些怪话 明天不想上体育课`
|
||||
Bot 会回复你三句怪话
|
||||
68
konabot/plugins/fortune/__init__.py
Normal file
68
konabot/plugins/fortune/__init__.py
Normal file
@ -0,0 +1,68 @@
|
||||
import asyncio
|
||||
from nonebot import get_plugin_config, on_command, on_message
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
|
||||
from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna
|
||||
from pydantic import BaseModel
|
||||
|
||||
from konabot.common.nb.is_admin import is_admin
|
||||
from konabot.common.path import DATA_PATH
|
||||
|
||||
from .random_text_record import RandomTextManager
|
||||
|
||||
|
||||
class FortuneConfig(BaseModel):
|
||||
plugin_fortune_collect_groups: list[int] = []
|
||||
|
||||
|
||||
fortune_wtf = RandomTextManager(DATA_PATH / "fortune_wtf.txt")
|
||||
fortune_insert_lock = asyncio.Lock()
|
||||
fortune_config = get_plugin_config(FortuneConfig)
|
||||
|
||||
|
||||
async def is_collect_target(evt: GroupMessageEvent) -> bool:
|
||||
if evt.group_id not in fortune_config.plugin_fortune_collect_groups:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
evt_collector = on_message(rule=is_collect_target)
|
||||
|
||||
@evt_collector.handle()
|
||||
async def _(msg: UniMsg):
|
||||
txt = msg.extract_plain_text()
|
||||
if len(txt) > 50:
|
||||
return
|
||||
if txt.startswith("说点怪话") or txt.startswith("说些怪话") or txt.startswith("怪话过滤"):
|
||||
return
|
||||
async with fortune_insert_lock:
|
||||
fortune_wtf.insert(txt)
|
||||
|
||||
|
||||
cmd_guaihua = on_command("说点怪话", rule=is_collect_target)
|
||||
|
||||
@cmd_guaihua.handle()
|
||||
async def _(bot: Bot):
|
||||
await cmd_guaihua.send(await UniMessage().text(fortune_wtf.choice()).export(bot))
|
||||
|
||||
|
||||
cmd_guaihuas = on_command("说些怪话", rule=is_collect_target)
|
||||
|
||||
@cmd_guaihuas.handle()
|
||||
async def _(bot: Bot):
|
||||
for _ in range(3):
|
||||
await cmd_guaihuas.send(await UniMessage().text(fortune_wtf.choice()).export(bot))
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
cmd_filter_guaihua = on_alconna(Alconna(
|
||||
"怪话过滤",
|
||||
Args["keyword", str],
|
||||
), rule=is_admin)
|
||||
|
||||
@cmd_filter_guaihua.handle()
|
||||
async def _(keyword: str, bot: Bot):
|
||||
async with fortune_insert_lock:
|
||||
c = fortune_wtf.filter_out(keyword)
|
||||
await cmd_filter_guaihua.send(await UniMessage().text(f"删除了 {c} 条怪话").export(bot))
|
||||
|
||||
70
konabot/plugins/fortune/random_text_record.py
Normal file
70
konabot/plugins/fortune/random_text_record.py
Normal file
@ -0,0 +1,70 @@
|
||||
import base64
|
||||
import random
|
||||
import time
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class RandomTextManager:
|
||||
_cache: list[tuple[float, str]]
|
||||
|
||||
def __init__(self, fp: Path) -> None:
|
||||
self.fp = fp
|
||||
self._cache = []
|
||||
if not self.fp.exists():
|
||||
self.fp.touch()
|
||||
else:
|
||||
self.load()
|
||||
|
||||
def load(self):
|
||||
self._cache = []
|
||||
with self.fp.open("r") as f:
|
||||
for line in f.readlines():
|
||||
if not line.strip():
|
||||
continue
|
||||
if "|" not in line:
|
||||
continue
|
||||
ts, cn = line.split("|")
|
||||
try:
|
||||
ts = float(ts)
|
||||
except Exception:
|
||||
continue
|
||||
self._cache.append((ts, base64.b64decode(cn).decode("utf-8")))
|
||||
|
||||
def save(self):
|
||||
lines = [
|
||||
str(ts) + "|" + base64.b64encode(cn.encode("utf-8")).decode()
|
||||
for ts, cn in self._cache
|
||||
]
|
||||
with self.fp.open("w") as f:
|
||||
f.writelines(lines)
|
||||
|
||||
def insert(self, text: str, timestamp: float | None = None):
|
||||
if timestamp is None:
|
||||
timestamp = time.time()
|
||||
with self.fp.open("a") as f:
|
||||
f.write(str(timestamp) + "|" + base64.b64encode(text.encode("utf-8")).decode() + "\n")
|
||||
self._cache.append((timestamp, text))
|
||||
|
||||
def choice(self, now: float | None = None):
|
||||
contents: list[str] = []
|
||||
weights: list[float] = []
|
||||
|
||||
if now is None:
|
||||
now = time.time()
|
||||
|
||||
for ts, cn in self._cache:
|
||||
contents.append(cn)
|
||||
weights.append((abs(now - ts) + 0.01) ** (-1))
|
||||
|
||||
return random.choices(contents, weights)[0]
|
||||
|
||||
def filter_out(self, keyword: str):
|
||||
len1 = len(self._cache)
|
||||
self._cache = [
|
||||
(ts, cn) for ts, cn in self._cache
|
||||
if keyword not in cn
|
||||
]
|
||||
self.save()
|
||||
return len1 - len(self._cache)
|
||||
|
||||
44
konabot/plugins/no_luowen.py
Normal file
44
konabot/plugins/no_luowen.py
Normal file
@ -0,0 +1,44 @@
|
||||
import nonebot
|
||||
|
||||
from nonebot.adapters.onebot.v11.bot import Bot
|
||||
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
|
||||
from nonebot_plugin_alconna import UniMsg, UniMessage
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class NoLuowenConfig(BaseModel):
|
||||
plugin_noluowen_qqid: int = -1
|
||||
plugin_noluowen_enable_group: list[int] = []
|
||||
|
||||
config = nonebot.get_plugin_config(NoLuowenConfig)
|
||||
|
||||
|
||||
async def is_luowen_mentioned(evt: GroupMessageEvent, msg: UniMsg) -> bool:
|
||||
if config.plugin_noluowen_qqid <= 0:
|
||||
return False
|
||||
if evt.user_id == config.plugin_noluowen_qqid:
|
||||
return False
|
||||
if evt.group_id not in config.plugin_noluowen_enable_group:
|
||||
return False
|
||||
txt = msg.extract_plain_text()
|
||||
if "洛温" not in txt:
|
||||
return False
|
||||
if "罗文" in txt:
|
||||
return False
|
||||
if "阿特金森" in txt:
|
||||
return False
|
||||
return True
|
||||
|
||||
evt_luowen_mentioned = nonebot.on_message(rule=is_luowen_mentioned)
|
||||
|
||||
|
||||
@evt_luowen_mentioned.handle()
|
||||
async def _(evt: GroupMessageEvent, bot: Bot):
|
||||
msg = (
|
||||
UniMessage()
|
||||
.reply(str(evt.message_id))
|
||||
.at(str(config.plugin_noluowen_qqid))
|
||||
.text(" 好像有人念错了你的 ID")
|
||||
)
|
||||
await evt_luowen_mentioned.send(await msg.export(bot=bot))
|
||||
|
||||
@ -1,10 +1,8 @@
|
||||
import asyncio as asynkio
|
||||
import datetime
|
||||
import functools
|
||||
from pathlib import Path
|
||||
from typing import Any, Literal, cast
|
||||
|
||||
import signal
|
||||
import nonebot
|
||||
import ptimeparse
|
||||
from loguru import logger
|
||||
@ -123,7 +121,9 @@ def create_notify_task(notify: Notify, fail2remove: bool = True):
|
||||
try:
|
||||
await asynkio.sleep((notify.notify_time - begin_time).total_seconds())
|
||||
except asynkio.CancelledError:
|
||||
logger.debug("代办提醒被信号中止,任务退出")
|
||||
logger.debug(
|
||||
f"代办提醒被信号中止,任务退出 NOTIFY={notify.notify_msg} TIME={notify.notify_time}"
|
||||
)
|
||||
return
|
||||
else:
|
||||
logger.warning(
|
||||
@ -254,19 +254,3 @@ async def _():
|
||||
save_notify_config(cfg)
|
||||
DATA_FILE_LOCK.release()
|
||||
|
||||
loop = asynkio.get_running_loop()
|
||||
|
||||
# 解决 asynk task 没有被 cancel 的问题
|
||||
async def shutdown(sig: signal.Signals):
|
||||
logger.info(f"收到 {sig.name} 指令,正在关闭所有的东西")
|
||||
for task in ASYNK_TASKS:
|
||||
task.cancel()
|
||||
await asynkio.gather(*ASYNK_TASKS, return_exceptions=True)
|
||||
logger.info("所有的代办提醒 Task 都已经退出了")
|
||||
|
||||
for sig in (signal.SIGINT, signal.SIGTERM):
|
||||
loop.add_signal_handler(
|
||||
sig, functools.partial(asynkio.create_task, shutdown(sig))
|
||||
)
|
||||
|
||||
await asynkio.gather(*ASYNK_TASKS)
|
||||
|
||||
Reference in New Issue
Block a user