Compare commits

..

11 Commits

Author SHA1 Message Date
021133954e 调整 man 默认范围
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:33:59 +08:00
7baa04dbc2 添加罗文提示 2025-10-24 01:33:01 +08:00
e55bdbdf4a 怪话不可为空!!!
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:27:40 +08:00
a30c7b8093 添加怪话过滤功能
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:21:54 +08:00
3da2c2266f 说怪话 bot
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-24 01:08:46 +08:00
96e3c3fe17 让 Onebot private channel 也有 ID 2025-10-24 00:46:05 +08:00
851c9eb3c7 修复程序退出耗时太久的问题 2025-10-24 00:01:13 +08:00
11269b2a5a 在罗文被念错时提醒他 2025-10-23 23:32:56 +08:00
875e0efc2f Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-23 22:12:41 +08:00
4f43312663 升级 ptimeparse 2025-10-23 22:12:34 +08:00
b2f4768573 优化判定与计分规则
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-21 23:52:05 +08:00
14 changed files with 270 additions and 48 deletions

6
bot.py
View File

@ -20,9 +20,13 @@ env_enable_minecraft = os.environ.get("ENABLE_MINECRAFT", "none")
def main(): def main():
if env.upper() == 'DEBUG' or env.upper() == 'DEV':
console_log_level = 'DEBUG'
else:
console_log_level = 'INFO'
init_logger(LOG_PATH, [ init_logger(LOG_PATH, [
BotExceptionMessage, BotExceptionMessage,
]) ], console_log_level=console_log_level)
nonebot.init() nonebot.init()

View File

@ -18,7 +18,7 @@ def file_exception_filter(
否则,返回 True允许记录 否则,返回 True允许记录
""" """
exception_info = record.get("exception") exception_info = record.get("exception")
if exception_info: if exception_info:
exception_type = exception_info[0] exception_type = exception_info[0]
@ -29,8 +29,9 @@ def file_exception_filter(
def init_logger( def init_logger(
log_dir: Path, log_dir: Path,
ignored_exceptions: List[Type[Exception]] ignored_exceptions: List[Type[Exception]],
console_log_level: str = "INFO",
) -> None: ) -> None:
""" """
配置全局 Loguru Logger。 配置全局 Loguru Logger。
@ -47,7 +48,7 @@ def init_logger(
logger.add( logger.add(
sys.stderr, sys.stderr,
level="INFO", level=console_log_level,
colorize=True, colorize=True,
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>", format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
) )
@ -76,4 +77,4 @@ def init_logger(
) )
logger.info("Loguru Logger 初始化完成!") logger.info("Loguru Logger 初始化完成!")
logger.info(f"控制台日志级别: INFO") logger.info(f"控制台日志级别: {console_log_level}")

View File

@ -25,6 +25,7 @@ from pydantic import BaseModel, ValidationError
from .path import DATA_PATH from .path import DATA_PATH
LONGTASK_DATA_DIR = DATA_PATH / "longtasks.json" LONGTASK_DATA_DIR = DATA_PATH / "longtasks.json"
QQ_PRIVATE_CHAT_CHANNEL_PREFIX = "_CHANNEL_QQ_PRIVATE_"
class LongTaskTarget(BaseModel): class LongTaskTarget(BaseModel):
@ -65,7 +66,7 @@ class LongTaskTarget(BaseModel):
} BOT_CLASS={bot.__class__.__name__}" } BOT_CLASS={bot.__class__.__name__}"
) )
return False return False
if self.channel_id == "": if self.channel_id.startswith(QQ_PRIVATE_CHAT_CHANNEL_PREFIX):
# 私聊模式 # 私聊模式
await bot.send_private_msg( await bot.send_private_msg(
user_id=int(self.target_id), user_id=int(self.target_id),
@ -196,7 +197,7 @@ async def get_long_task_target(event: BaseEvent, bot: BaseBot) -> LongTaskTarget
return LongTaskTarget( return LongTaskTarget(
platform="qq", platform="qq",
self_id=str(event.self_id), self_id=str(event.self_id),
channel_id="", channel_id=f"{QQ_PRIVATE_CHAT_CHANNEL_PREFIX}{event.self_id}",
target_id=str(event.user_id), target_id=str(event.user_id),
) )
if isinstance(event, ConsoleMessageEvent): if isinstance(event, ConsoleMessageEvent):

View File

@ -0,0 +1,2 @@
关于罗文和洛温:
AdoreLowen 希望和洛温阿特金森区分,所以最好就不要叫他洛温了!此方 BOT 会在一些群提醒叫错了的人。

View File

@ -0,0 +1,9 @@
指令介绍
怪话过滤 - 去除含有关键词的怪话
使用方法
`怪话过滤 说的道理`
去除所有含有“说的道理”的怪话
另见
怪话(1)

View File

@ -0,0 +1,12 @@
指令介绍
说点怪话/说些怪话 - 让 BOT 学群友胡言乱语
适用范围
为保证安全,只有少数授权的群聊可以使用该指令
使用方法
`说点怪话 今天吃什么`
期待 Bot 会回答你什么吧
`说些怪话 明天不想上体育课`
Bot 会回复你三句怪话

View File

@ -0,0 +1,68 @@
import asyncio
from nonebot import get_plugin_config, on_command, on_message
from nonebot.adapters import Bot
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna
from pydantic import BaseModel
from konabot.common.nb.is_admin import is_admin
from konabot.common.path import DATA_PATH
from .random_text_record import RandomTextManager
class FortuneConfig(BaseModel):
plugin_fortune_collect_groups: list[int] = []
fortune_wtf = RandomTextManager(DATA_PATH / "fortune_wtf.txt")
fortune_insert_lock = asyncio.Lock()
fortune_config = get_plugin_config(FortuneConfig)
async def is_collect_target(evt: GroupMessageEvent) -> bool:
if evt.group_id not in fortune_config.plugin_fortune_collect_groups:
return False
return True
evt_collector = on_message(rule=is_collect_target)
@evt_collector.handle()
async def _(msg: UniMsg):
txt = msg.extract_plain_text()
if len(txt) > 50 or not txt.strip():
return
if txt.startswith("说点怪话") or txt.startswith("说些怪话") or txt.startswith("怪话过滤"):
return
async with fortune_insert_lock:
fortune_wtf.insert(txt)
cmd_guaihua = on_command("说点怪话", rule=is_collect_target)
@cmd_guaihua.handle()
async def _(bot: Bot):
await cmd_guaihua.send(await UniMessage().text(fortune_wtf.choice()).export(bot))
cmd_guaihuas = on_command("说些怪话", rule=is_collect_target)
@cmd_guaihuas.handle()
async def _(bot: Bot):
for _ in range(3):
await cmd_guaihuas.send(await UniMessage().text(fortune_wtf.choice()).export(bot))
await asyncio.sleep(1)
cmd_filter_guaihua = on_alconna(Alconna(
"怪话过滤",
Args["keyword", str],
), rule=is_admin)
@cmd_filter_guaihua.handle()
async def _(keyword: str, bot: Bot):
async with fortune_insert_lock:
c = fortune_wtf.filter_out(keyword)
await cmd_filter_guaihua.send(await UniMessage().text(f"删除了 {c} 条怪话").export(bot))

View File

@ -0,0 +1,70 @@
import base64
import random
import time
from pathlib import Path
class RandomTextManager:
_cache: list[tuple[float, str]]
def __init__(self, fp: Path) -> None:
self.fp = fp
self._cache = []
if not self.fp.exists():
self.fp.touch()
else:
self.load()
def load(self):
self._cache = []
with self.fp.open("r") as f:
for line in f.readlines():
if not line.strip():
continue
if "|" not in line:
continue
ts, cn = line.split("|")
try:
ts = float(ts)
except Exception:
continue
self._cache.append((ts, base64.b64decode(cn).decode("utf-8")))
def save(self):
lines = [
str(ts) + "|" + base64.b64encode(cn.encode("utf-8")).decode()
for ts, cn in self._cache
]
with self.fp.open("w") as f:
f.writelines(lines)
def insert(self, text: str, timestamp: float | None = None):
if timestamp is None:
timestamp = time.time()
with self.fp.open("a") as f:
f.write(str(timestamp) + "|" + base64.b64encode(text.encode("utf-8")).decode() + "\n")
self._cache.append((timestamp, text))
def choice(self, now: float | None = None):
contents: list[str] = []
weights: list[float] = []
if now is None:
now = time.time()
for ts, cn in self._cache:
contents.append(cn)
weights.append((abs(now - ts) + 0.01) ** (-1))
return random.choices(contents, weights)[0]
def filter_out(self, keyword: str):
len1 = len(self._cache)
self._cache = [
(ts, cn) for ts, cn in self._cache
if keyword not in cn
]
self.save()
return len1 - len(self._cache)

View File

@ -68,17 +68,19 @@ class TryStopState(Enum):
class TryVerifyState(Enum): class TryVerifyState(Enum):
VERIFIED = 0 VERIFIED = 0
NOT_IDIOM = 1 VERIFIED_AND_REAL = 1
WRONG_FIRST_CHAR = 2 NOT_IDIOM = 2
VERIFIED_BUT_NO_NEXT = 3 WRONG_FIRST_CHAR = 3
VERIFIED_GAME_END = 4 VERIFIED_BUT_NO_NEXT = 4
VERIFIED_GAME_END = 5
class IdiomGame: class IdiomGame:
ALL_WORDS = [] # 所有四字词语 ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语 ALL_IDIOMS = [] # 所有成语
INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例 INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例
IDIOM_FIRST_CHAR = {} # 成语首字字典 IDIOM_FIRST_CHAR = {} # 所有成语包括词语的首字字典
AVALIABLE_IDIOM_FIRST_CHAR = {} # 真正有效的成语首字字典
__inited = False __inited = False
@ -181,7 +183,7 @@ class IdiomGame:
""" """
判断是否有成语可以接 判断是否有成语可以接
""" """
return last_char in IdiomGame.IDIOM_FIRST_CHAR return last_char in IdiomGame.AVALIABLE_IDIOM_FIRST_CHAR
def _verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState: def _verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
# 新成语的首字应与上一条成语的尾字相同 # 新成语的首字应与上一条成语的尾字相同
@ -193,6 +195,8 @@ class IdiomGame:
self.last_idiom = idiom self.last_idiom = idiom
self.last_char = idiom[-1] self.last_char = idiom[-1]
self.add_score(user_id, 1) self.add_score(user_id, 1)
if idiom in IdiomGame.ALL_IDIOMS:
self.add_score(user_id, 4) # 再加 4 分
self.remain_rounds -= 1 self.remain_rounds -= 1
if self.remain_rounds <= 0: if self.remain_rounds <= 0:
self.now_playing = False self.now_playing = False
@ -201,6 +205,8 @@ class IdiomGame:
# 没有成语可以接了,自动跳过 # 没有成语可以接了,自动跳过
self._skip_idiom_async() self._skip_idiom_async()
return TryVerifyState.VERIFIED_BUT_NO_NEXT return TryVerifyState.VERIFIED_BUT_NO_NEXT
if idiom in IdiomGame.ALL_IDIOMS:
return TryVerifyState.VERIFIED_AND_REAL # 真实成语
return TryVerifyState.VERIFIED return TryVerifyState.VERIFIED
def get_user_score(self, user_id: str) -> float: def get_user_score(self, user_id: str) -> float:
@ -223,6 +229,13 @@ class IdiomGame:
def get_last_char(self) -> str: def get_last_char(self) -> str:
return self.last_char return self.last_char
@classmethod
def random_idiom_starting_with(cls, first_char: str) -> Optional[str]:
cls.init_lexicon()
if first_char not in cls.IDIOM_FIRST_CHAR:
return None
return secrets.choice(cls.IDIOM_FIRST_CHAR[first_char])
@classmethod @classmethod
def init_lexicon(cls): def init_lexicon(cls):
@ -288,6 +301,12 @@ class IdiomGame:
cls.IDIOM_FIRST_CHAR[idiom[0]] = [] cls.IDIOM_FIRST_CHAR[idiom[0]] = []
cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom) cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
# 根据真正的成语大表,划分出有效成语首字字典
for idiom in cls.ALL_IDIOMS:
if idiom[0] not in cls.AVALIABLE_IDIOM_FIRST_CHAR:
cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]] = []
cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
evt = on_alconna( evt = on_alconna(
Alconna( Alconna(
@ -421,7 +440,8 @@ async def _(target: DepLongTaskTarget):
instance = IdiomGame.INSTANCE_LIST.get(group_id) instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state(): if not instance or not instance.get_playing_state():
return return
await evt.send(await UniMessage().text("你们太菜了全部扣100分").export()) avaliable_idiom = IdiomGame.random_idiom_starting_with(instance.get_last_char())
await evt.send(await UniMessage().text(f"你们太菜了全部扣100分明明还可以接「{avaliable_idiom}」的!").export())
idiom = await instance.skip_idiom(-100) idiom = await instance.skip_idiom(-100)
await evt.send( await evt.send(
await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export() await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export()
@ -458,16 +478,24 @@ async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget):
await evt.send( await evt.send(
await UniMessage() await UniMessage()
.at(user_id) .at(user_id)
.text("接不上!这个不一样!你被扣了 0.1 分!") .text(" 接不上!这个不一样!你被扣了 0.1 分!")
.export() .export()
) )
return return
await evt.send( if state == TryVerifyState.VERIFIED:
await UniMessage() await evt.send(
.at(user_id) await UniMessage()
.text(f"接对了!你有 {instance.get_user_score(user_id)} 分!") .at(user_id)
.export() .text(f" 接上了,喜提 1 分!你有 {instance.get_user_score(user_id)} 分!")
) .export()
)
elif state == TryVerifyState.VERIFIED_AND_REAL:
await evt.send(
await UniMessage()
.at(user_id)
.text(f" 接上了,这是个真实成语,喜提 5 分!你有 {instance.get_user_score(user_id)} 分!")
.export()
)
if state == TryVerifyState.VERIFIED_GAME_END: if state == TryVerifyState.VERIFIED_GAME_END:
await evt.send(await UniMessage().text("全部回合结束!").export()) await evt.send(await UniMessage().text("全部回合结束!").export())
await end_game(event, group_id) await end_game(event, group_id)

View File

@ -53,7 +53,7 @@ async def _(
if doc is None: if doc is None:
# 检索模式 # 检索模式
if section is None: if section is None:
section_set = {1} section_set = {1, 7}
else: else:
section_set = {section} section_set = {section}
if 1 in section_set and is_admin(event): if 1 in section_set and is_admin(event):

View File

@ -0,0 +1,44 @@
import nonebot
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
from nonebot_plugin_alconna import UniMsg, UniMessage
from pydantic import BaseModel
class NoLuowenConfig(BaseModel):
plugin_noluowen_qqid: int = -1
plugin_noluowen_enable_group: list[int] = []
config = nonebot.get_plugin_config(NoLuowenConfig)
async def is_luowen_mentioned(evt: GroupMessageEvent, msg: UniMsg) -> bool:
if config.plugin_noluowen_qqid <= 0:
return False
if evt.user_id == config.plugin_noluowen_qqid:
return False
if evt.group_id not in config.plugin_noluowen_enable_group:
return False
txt = msg.extract_plain_text()
if "洛温" not in txt:
return False
if "罗文" in txt:
return False
if "阿特金森" in txt:
return False
return True
evt_luowen_mentioned = nonebot.on_message(rule=is_luowen_mentioned)
@evt_luowen_mentioned.handle()
async def _(evt: GroupMessageEvent, bot: Bot):
msg = (
UniMessage()
.reply(str(evt.message_id))
.at(str(config.plugin_noluowen_qqid))
.text(" 好像有人念错了你的 ID")
)
await evt_luowen_mentioned.send(await msg.export(bot=bot))

View File

@ -1,10 +1,8 @@
import asyncio as asynkio import asyncio as asynkio
import datetime import datetime
import functools
from pathlib import Path from pathlib import Path
from typing import Any, Literal, cast from typing import Any, Literal, cast
import signal
import nonebot import nonebot
import ptimeparse import ptimeparse
from loguru import logger from loguru import logger
@ -123,7 +121,9 @@ def create_notify_task(notify: Notify, fail2remove: bool = True):
try: try:
await asynkio.sleep((notify.notify_time - begin_time).total_seconds()) await asynkio.sleep((notify.notify_time - begin_time).total_seconds())
except asynkio.CancelledError: except asynkio.CancelledError:
logger.debug("代办提醒被信号中止,任务退出") logger.debug(
f"代办提醒被信号中止,任务退出 NOTIFY={notify.notify_msg} TIME={notify.notify_time}"
)
return return
else: else:
logger.warning( logger.warning(
@ -254,19 +254,3 @@ async def _():
save_notify_config(cfg) save_notify_config(cfg)
DATA_FILE_LOCK.release() DATA_FILE_LOCK.release()
loop = asynkio.get_running_loop()
# 解决 asynk task 没有被 cancel 的问题
async def shutdown(sig: signal.Signals):
logger.info(f"收到 {sig.name} 指令,正在关闭所有的东西")
for task in ASYNK_TASKS:
task.cancel()
await asynkio.gather(*ASYNK_TASKS, return_exceptions=True)
logger.info("所有的代办提醒 Task 都已经退出了")
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(
sig, functools.partial(asynkio.create_task, shutdown(sig))
)
await asynkio.gather(*ASYNK_TASKS)

8
poetry.lock generated
View File

@ -2402,14 +2402,14 @@ reference = "mirrors"
[[package]] [[package]]
name = "ptimeparse" name = "ptimeparse"
version = "0.2.0" version = "0.2.1"
description = "一个用于解析中文的时间表达的库" description = "一个用于解析中文的时间表达的库"
optional = false optional = false
python-versions = ">=3.9" python-versions = ">=3.9"
groups = ["main"] groups = ["main"]
files = [ files = [
{file = "ptimeparse-0.2.0-py3-none-any.whl", hash = "sha256:57055f8fd99fb69e19deac3b8a5c7ac91af86c7ac09781632e9abf318df0d6d2"}, {file = "ptimeparse-0.2.1-py3-none-any.whl", hash = "sha256:cf1115784d5d983da2d5b7af327108bf04c218c795d63291e71f76d7c6ffd2d4"},
{file = "ptimeparse-0.2.0.tar.gz", hash = "sha256:867c265f2e157fe4d793d20fe9c449b8ede5c855f336d7e6b2eb78551e622766"}, {file = "ptimeparse-0.2.1.tar.gz", hash = "sha256:9b640e0a315d19b1e3821a290d236a051d8320348970ce3a835ed675bd2d832f"},
] ]
[package.source] [package.source]
@ -3807,4 +3807,4 @@ reference = "mirrors"
[metadata] [metadata]
lock-version = "2.1" lock-version = "2.1"
python-versions = ">=3.12,<4.0" python-versions = ">=3.12,<4.0"
content-hash = "02530953efe65da1a788845cd43f8856be62db5bfb59de691cad813f57bab25e" content-hash = "78a299c64ba07999fae807300b10a1c622d45b8b387aded5a34d17cf5550e777"

View File

@ -20,10 +20,10 @@ dependencies = [
"imagetext-py (>=2.2.0,<3.0.0)", "imagetext-py (>=2.2.0,<3.0.0)",
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)", "opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
"returns (>=0.26.0,<0.27.0)", "returns (>=0.26.0,<0.27.0)",
"ptimeparse (>=0.1.1,<1.0.0)",
"skia-python (>=138.0,<139.0)", "skia-python (>=138.0,<139.0)",
"nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)", "nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)",
"qrcode (>=8.2,<9.0)", "qrcode (>=8.2,<9.0)",
"ptimeparse (>=0.2.1,<0.3.0)",
] ]
[build-system] [build-system]
@ -41,4 +41,3 @@ url = "https://pypi.tuna.tsinghua.edu.cn/simple/"
priority = "primary" priority = "primary"
[tool.poetry.dependencies] [tool.poetry.dependencies]
ptimeparse = { source = "pt-gitea-pypi" }