import asyncio as asynkio import datetime from io import BytesIO import json import secrets from enum import Enum from pathlib import Path from typing import Optional from PIL import Image from loguru import logger from nonebot import on_message import nonebot from nonebot.adapters import Event as BaseEvent from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent from nonebot_plugin_alconna import ( Alconna, Args, UniMessage, UniMsg, on_alconna, ) from konabot.common.database import DatabaseManager from konabot.common.longtask import DepLongTaskTarget from konabot.common.path import ASSETS_PATH from konabot.common.llm import get_llm ROOT_PATH = Path(__file__).resolve().parent DATA_DIR = Path(__file__).parent.parent.parent.parent / "data" DATA_FILE_PATH = ( DATA_DIR / "idiom_banned.json" ) # 创建全局数据库管理器实例 db_manager = DatabaseManager() def load_banned_ids() -> list[str]: if not DATA_FILE_PATH.exists(): return [] try: return json.loads(DATA_FILE_PATH.read_text("utf-8")) except Exception as e: logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}") return [] def is_idiom_game_banned(group_id: str) -> bool: banned_ids = load_banned_ids() return group_id in banned_ids def add_banned_id(group_id: str): banned_ids = load_banned_ids() if group_id not in banned_ids: banned_ids.append(group_id) DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8") def remove_banned_id(group_id: str): banned_ids = load_banned_ids() if group_id in banned_ids: banned_ids.remove(group_id) DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8") driver = nonebot.get_driver() @driver.on_startup async def register_startup_hook(): """注册启动时需要执行的函数""" await IdiomGame.init_lexicon() @driver.on_shutdown async def register_shutdown_hook(): """注册关闭时需要执行的函数""" # 关闭所有数据库连接 await db_manager.close_all_connections() class TryStartState(Enum): STARTED = 0 ALREADY_PLAYING = 1 NO_REMAINING_TIMES = 2 class TryStopState(Enum): STOPPED = 0 NOT_PLAYING = 1 class TryVerifyState(Enum): VERIFIED = 0 VERIFIED_AND_REAL = 1 ALREADY_USED = 2 NOT_IDIOM = 3 WRONG_FIRST_CHAR = 4 BUT_NO_NEXT = 5 GAME_END = 6 class IdiomGameLLM: @classmethod async def verify_idiom_with_llm(cls, idiom: str) -> bool: if len(idiom) != 4: return False llm = get_llm() system_prompt = "请判断用户的输入是否为一个合理的成语,或者这四个字在中文环境下是否说得通。如果是请回答「T」,否则回答「F」。请注意,即使这个词不是成语,如果说得通(也就是能念起来很通顺),你也该输出「T」。请不要包含任何解释,也不要包含任何标点符号。" message = await llm.chat([{"role": "system", "content": system_prompt}, {"role": "user", "content": idiom}]) answer = message.content logger.info(f"LLM 对成语 {idiom} 的判断结果是 {answer}") if answer == "T": await cls.storage_idiom(idiom) return answer == "T" @classmethod async def storage_idiom(cls, idiom: str): # 将 idiom 存入数据库 # await db_manager.execute_by_sql_file( # ROOT_PATH / "sql" / "insert_custom_word.sql", # (idiom,) # ) # 将 idiom 存入本地文件以备后续分析 with open(DATA_DIR / "idiom_llm_storage.txt", "a", encoding="utf-8") as f: f.write(idiom + "\n") IdiomGame.append_into_word_list(idiom) class IdiomGame: ALL_WORDS = [] # 所有四字词语 ALL_IDIOMS = [] # 所有成语 INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例 IDIOM_FIRST_CHAR = {} # 所有成语包括词语的首字字典 AVALIABLE_IDIOM_FIRST_CHAR = {} # 真正有效的成语首字字典 __inited = False def __init__(self, group_id: str): # 初始化一局游戏 self.group_id = "" self.now_playing = False self.score_board = {} self.last_idiom = "" self.last_char = "" self.remain_playing_times = 3 self.last_play_date = "" self.all_buff_score = 0 self.lock = asynkio.Lock() self.remain_rounds = 0 # 剩余回合数 self.already_idioms: dict[str, int] = {} # 已经使用过的成语和使用过的次数 self.idiom_history: list[list[str]] = [] # 成语使用历史记录,多个数组以存储不同成语链 IdiomGame.INSTANCE_LIST[group_id] = self @classmethod async def append_into_word_list(cls, word: str): ''' 将一个新词加入到词语列表中 ''' if word not in cls.ALL_WORDS: cls.ALL_WORDS.append(word) if word[0] not in cls.IDIOM_FIRST_CHAR: cls.IDIOM_FIRST_CHAR[word[0]] = [] cls.IDIOM_FIRST_CHAR[word[0]].append(word) # await db_manager.execute_by_sql_file( # ROOT_PATH / "sql" / "insert_custom_word.sql", # (word,) # ) def be_able_to_play(self) -> bool: if self.last_play_date != datetime.date.today(): self.last_play_date = datetime.date.today() self.remain_playing_times = 3 if self.remain_playing_times > 0: self.remain_playing_times -= 1 return True return False @staticmethod async def random_idiom() -> str: # result = await db_manager.query_by_sql_file( # ROOT_PATH / "sql" / "random_choose_idiom.sql" # ) # return result[0]["idiom"] return secrets.choice(IdiomGame.ALL_IDIOMS) async def choose_start_idiom(self) -> str: """ 随机选择一个成语作为起始成语 """ self.last_idiom = await IdiomGame.random_idiom() self.last_char = self.last_idiom[-1] if not await self.is_nextable(self.last_char): await self.choose_start_idiom() else: self.add_history_idiom(self.last_idiom, new_chain=True) return self.last_idiom @classmethod async def try_start_game(cls, group_id: str, force: bool = False) -> TryStartState: await cls.init_lexicon() if not cls.INSTANCE_LIST.get(group_id): cls(group_id) instance = cls.INSTANCE_LIST[group_id] if instance.now_playing: return TryStartState.ALREADY_PLAYING if not instance.be_able_to_play() and not force: return TryStartState.NO_REMAINING_TIMES instance.now_playing = True return TryStartState.STARTED async def start_game(self, rounds: int = 100): self.now_playing = True self.remain_rounds = rounds await self.choose_start_idiom() @classmethod def try_stop_game(cls, group_id: str) -> TryStopState: if not cls.INSTANCE_LIST.get(group_id): return TryStopState.NOT_PLAYING instance = cls.INSTANCE_LIST[group_id] if not instance.now_playing: return TryStopState.NOT_PLAYING instance.now_playing = False return TryStopState.STOPPED def clear_score_board(self): self.score_board = {} self.last_char = "" self.all_buff_score = 0 self.already_idioms = {} self.idiom_history = [] def get_score_board(self) -> dict: return self.score_board def get_all_buff_score(self) -> int: return self.all_buff_score async def skip_idiom(self, buff_score: int = -100) -> str: """ 跳过当前成语,选择下一个成语 """ async with self.lock: await self._skip_idiom_async() self.add_buff_score(buff_score) return self.last_idiom async def _skip_idiom_async(self) -> str: self.last_idiom = await IdiomGame.random_idiom() self.last_char = self.last_idiom[-1] if not await self.is_nextable(self.last_char): await self._skip_idiom_async() else: self.add_history_idiom(self.last_idiom, new_chain=True) return self.last_idiom async def try_verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]: """ 用户发送成语 """ async with self.lock: state = await self._verify_idiom(idiom, user_id) return state async def is_nextable(self, last_char: str) -> bool: """ 判断是否有成语可以接 """ # result = await db_manager.query_by_sql_file( # ROOT_PATH / "sql" / "is_nextable.sql", # (last_char,) # ) # return result[0]["DEED"] == 1 return last_char in IdiomGame.AVALIABLE_IDIOM_FIRST_CHAR def add_already_idiom(self, idiom: str): if idiom in self.already_idioms: self.already_idioms[idiom] += 1 else: self.already_idioms[idiom] = 1 def get_already_used_num(self, idiom: str) -> int: if idiom in self.already_idioms: return self.already_idioms[idiom] return 0 def add_history_idiom(self, idiom: str, new_chain: bool = False): if new_chain or len(self.idiom_history) == 0: self.idiom_history.append([idiom]) else: self.idiom_history[-1].append(idiom) def display_history(self) -> list[str]: result = [] for chain in self.idiom_history: result.append(" -> ".join(chain)) return result async def _verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]: state = [] # 新成语的首字应与上一条成语的尾字相同 if idiom[0] != self.last_char: state.append(TryVerifyState.WRONG_FIRST_CHAR) return state # 成语是否存在 # result = await db_manager.query_by_sql_file( # ROOT_PATH / "sql" / "query_idiom.sql", # (idiom, idiom, idiom) # ) # status_result = result[0]["status"] # if status_result == -1: if idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS: logger.info(f"用户 {user_id} 发送了未知词语 {idiom},正在使用 LLM 进行验证") try: if not await IdiomGameLLM.verify_idiom_with_llm(idiom): self.add_score(user_id, -0.1) state.append(TryVerifyState.NOT_IDIOM) return state except Exception as e: logger.error(f"LLM 验证成语 {idiom} 时出现错误:{e}") self.add_score(user_id, -0.1) state.append(TryVerifyState.NOT_IDIOM) return state # 成语合法,更新状态 self.add_history_idiom(idiom) score_k = 0.5 ** self.get_already_used_num(idiom) # 每被使用过一次,得分减半 if(score_k != 1): state.append(TryVerifyState.ALREADY_USED) self.add_already_idiom(idiom) state.append(TryVerifyState.VERIFIED) self.last_idiom = idiom self.last_char = idiom[-1] self.add_score(user_id, 1 * score_k) # 先加 1 分 # if status_result == 1: if idiom in IdiomGame.ALL_IDIOMS: state.append(TryVerifyState.VERIFIED_AND_REAL) self.add_score(user_id, 4 * score_k) # 再加 4 分 self.remain_rounds -= 1 if self.remain_rounds <= 0: self.now_playing = False state.append(TryVerifyState.GAME_END) if not await self.is_nextable(self.last_char): # 没有成语可以接了,自动跳过 await self._skip_idiom_async() self.add_buff_score(-100) state.append(TryVerifyState.BUT_NO_NEXT) return state def get_user_score(self, user_id: str) -> float: if user_id not in self.score_board: return 0 # 避免浮点数精度问题导致过长 handled_score = round(self.score_board[user_id]["score"] + self.all_buff_score, 1) return handled_score def add_score(self, user_id: str, score: int): if user_id not in self.score_board: self.score_board[user_id] = {"name": user_id, "score": 0} self.score_board[user_id]["score"] += score def add_buff_score(self, score: int): self.all_buff_score += score def get_playing_state(self) -> bool: return self.now_playing def get_last_char(self) -> str: return self.last_char @classmethod async def random_idiom_starting_with(cls, first_char: str) -> Optional[str]: # await cls.init_lexicon() # result = await db_manager.query_by_sql_file( # ROOT_PATH / "sql" / "query_idiom_start_with.sql", # (first_char,) # ) # if len(result) == 0: # return None # return result[0]["idiom"] await cls.init_lexicon() if first_char not in cls.AVALIABLE_IDIOM_FIRST_CHAR: return None return secrets.choice(cls.AVALIABLE_IDIOM_FIRST_CHAR[first_char]) @classmethod async def init_lexicon(cls): if cls.__inited: return # await db_manager.execute_by_sql_file( # ROOT_PATH / "sql" / "create_table.sql" # ) # 确保数据库初始化 cls.__inited = True # 成语大表 with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f: ALL_IDIOMS_INFOS = json.load(f) # 词语大表 ALL_WORDS = [] with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f: jsonData = json.load(f) ALL_WORDS = [item["ci"] for item in jsonData] logger.debug(f"Loaded {len(ALL_WORDS)} words from ci.json") logger.debug(f"Sample words: {ALL_WORDS[:5]}") COMMON_WORDS = [] # 读取 COMMON 词语大表 with open(ASSETS_PATH / "lexicon" / "common.txt", "r", encoding="utf-8") as f: for line in f: word = line.strip() if len(word) == 4: COMMON_WORDS.append(word) logger.debug(f"Loaded {len(COMMON_WORDS)} common words from common.txt") logger.debug(f"Sample common words: {COMMON_WORDS[:5]}") # 读取 THUOCL 成语库 with open( ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8", ) as f: THUOCL_IDIOMS = [line.split(" ")[0].split("\t")[0].strip() for line in f] logger.debug(f"Loaded {len(THUOCL_IDIOMS)} idioms from THUOCL_chengyu.txt") logger.debug(f"Sample idioms: {THUOCL_IDIOMS[:5]}") # 读取 THUOCL 剩下的所有 txt 文件,只保留四字词 THUOCL_WORDS = [] import os for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"): if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt": with open( ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8", ) as f: for line in f: word = line.lstrip().split(" ")[0].split("\t")[0].strip() if len(word) == 4: THUOCL_WORDS.append(word) logger.debug(f"Loaded {len(THUOCL_WORDS)} words from THUOCL txt files") logger.debug(f"Sample words: {THUOCL_WORDS[:5]}") # 读取本地的 idiom_llm_storage.txt 文件,补充词语表 LOCAL_LLM_WORDS = [] if (DATA_DIR / "idiom_llm_storage.txt").exists(): with open(DATA_DIR / "idiom_llm_storage.txt", "r", encoding="utf-8") as f: for line in f: word = line.strip() if len(word) == 4: LOCAL_LLM_WORDS.append(word) logger.debug(f"Loaded additional {len(LOCAL_LLM_WORDS)} words from idiom_llm_storage.txt") # 只有成语的大表 ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS ALL_IDIOMS = list(set(ALL_IDIOMS)) # 去重 # 批量插入数据库 # await db_manager.execute_many_values_by_sql_file( # ROOT_PATH / "sql" / "insert_idiom.sql", # [(idiom,) for idiom in ALL_IDIOMS] # ) # 其他四字词语表,仅表示可以有这个词 ALL_WORDS = ( [word for word in ALL_WORDS if len(word) == 4] + THUOCL_WORDS + COMMON_WORDS ) cls.ALL_WORDS = ALL_WORDS + LOCAL_LLM_WORDS cls.ALL_IDIOMS = ALL_IDIOMS # 插入数据库 # await db_manager.execute_many_values_by_sql_file( # ROOT_PATH / "sql" / "insert_word.sql", # [(word,) for word in ALL_WORDS] # ) # 自定义词语 LOCAL_LLM_WORDS 插入数据库,兼容用 # await db_manager.execute_many_values_by_sql_file( # ROOT_PATH / "sql" / "insert_custom_word.sql", # [(word,) for word in LOCAL_LLM_WORDS] # ) # 根据成语大表,划分出成语首字字典 for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS: if idiom[0] not in cls.IDIOM_FIRST_CHAR: cls.IDIOM_FIRST_CHAR[idiom[0]] = [] cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom) # 根据真正的成语大表,划分出有效成语首字字典 for idiom in cls.ALL_IDIOMS: if idiom[0] not in cls.AVALIABLE_IDIOM_FIRST_CHAR: cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]] = [] cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]].append(idiom) evt = on_alconna( Alconna( "我要玩成语接龙", Args["rounds?", int], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True, ) @evt.handle() async def play_game( event: BaseEvent, target: DepLongTaskTarget, force=False, rounds: Optional[int] = 100, ): # group_id = str(event.get_session_id()) group_id = target.channel_id if is_idiom_game_banned(group_id): await evt.send( await UniMessage().text("本群已被禁止使用成语接龙功能!").export() ) return rounds = rounds or 0 if rounds <= 0: await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export()) return state = await IdiomGame.try_start_game(group_id, force) if state == TryStartState.ALREADY_PLAYING: await evt.send( await UniMessage() .text("当前已有成语接龙游戏在进行中,请稍后再试!") .export() ) return if state == TryStartState.NO_REMAINING_TIMES: await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export()) return await evt.send( await UniMessage() .text( "你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!" ) .export() ) instance = IdiomGame.INSTANCE_LIST[group_id] await instance.start_game(rounds) # 发布成语 await evt.send( await UniMessage() .text(f"第一个成语:「{instance.last_idiom}」,请接!") .export() ) evt = on_alconna( Alconna( "老子就是要玩成语接龙!!!", Args["rounds?", int], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True, ) @evt.handle() async def force_play_game( event: BaseEvent, target: DepLongTaskTarget, rounds: Optional[int] = 100 ): await play_game(event, target, force=True, rounds=rounds) async def end_game(event: BaseEvent, group_id: str): instance = IdiomGame.INSTANCE_LIST[group_id] result_text = UniMessage().text("游戏结束!\n最终得分榜:\n") score_board = instance.get_score_board() if len(score_board) == 0: result_text += "无人得分!\n" else: # 按分数排序,名字用 at 的方式 sorted_score = sorted( score_board.items(), key=lambda x: x[1]["score"], reverse=True ) for i, (user_id, info) in enumerate(sorted_score): result_text += ( f"{i + 1}. " + UniMessage().at(user_id) + f": {round(info['score'] + instance.get_all_buff_score(), 1)} 分\n" ) if len(instance.idiom_history) == 0: result_text += "\n本局没有任何接龙记录。" else: result_text += "\n你们的接龙记录是:\n" history_lines = instance.display_history() for line in history_lines: result_text += line + "\n" await evt.send(await result_text.export()) # instance.clear_score_board() # 将实例删除 del IdiomGame.INSTANCE_LIST[group_id] evt = on_alconna( Alconna("不玩了"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True ) @evt.handle() async def _(event: BaseEvent, target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id state = IdiomGame.try_stop_game(group_id) if state == TryStopState.STOPPED: # 发送好吧狗图片 # 打开好吧狗本地文件 with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f: img_data = f.read() # 把好吧狗变成 GIF 格式以缩小尺寸 img_data = await convert_image_to_gif(img_data) await evt.send(await UniMessage().image(raw=img_data).export()) await end_game(event, group_id) else: # await evt.send( # await UniMessage().text("当前没有成语接龙游戏在进行中!").export() # ) return async def convert_image_to_gif(image_data: bytes) -> bytes: with Image.open(BytesIO(image_data)) as img: with BytesIO() as output: img.save(output, format="GIF") return output.getvalue() # 跳过 evt = on_alconna( Alconna("跳过成语"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True ) @evt.handle() async def _(target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id instance = IdiomGame.INSTANCE_LIST.get(group_id) if not instance or not instance.get_playing_state(): return avaliable_idiom = await IdiomGame.random_idiom_starting_with(instance.get_last_char()) # 发送哈哈狗图片 with open(ASSETS_PATH / "img" / "dog" / "haha_dog.jpg", "rb") as f: img_data = f.read() # 把哈哈狗变成 GIF 格式以缩小尺寸 img_data = await convert_image_to_gif(img_data) await evt.send(await UniMessage().image(raw=img_data).export()) await evt.send(await UniMessage().text(f"你们太菜了,全部扣100分!明明还可以接「{avaliable_idiom}」的!").export()) idiom = await instance.skip_idiom(-100) await evt.send( await UniMessage().text(f"重新开始,下一个成语是「{idiom}」").export() ) def get_user_info(event: BaseEvent): if isinstance(event, DiscordMessageEvent): user_id = str(event.author.id) user_name = str(event.author.name) else: user_id = str(event.get_user_id()) user_name = str(event.get_user_id()) return user_id, user_name # 直接读取消息 evt = on_message() @evt.handle() async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id instance = IdiomGame.INSTANCE_LIST.get(group_id) if not instance or not instance.get_playing_state(): return user_idiom = msg.extract_plain_text().strip() user_id, user_name = get_user_info(event) state = await instance.try_verify_idiom(user_idiom, user_id) if TryVerifyState.WRONG_FIRST_CHAR in state: return if TryVerifyState.NOT_IDIOM in state: await evt.send( await UniMessage() .at(user_id) .text(" 接不上!这个不一样!你被扣了 0.1 分!") .export() ) return already_used_num = instance.get_already_used_num(user_idiom) if TryVerifyState.VERIFIED_AND_REAL in state: score = 5 * (0.5 ** (already_used_num - 1)) if already_used_num > 1: await evt.send( await UniMessage() .at(user_id) .text(f" 接上了,这是个被重复用过的成语,喜提 {score} 分!你有 {instance.get_user_score(user_id)} 分!") .export() ) else: await evt.send( await UniMessage() .at(user_id) .text(f" 接上了,这是个真实成语,喜提 5 分!你有 {instance.get_user_score(user_id)} 分!") .export() ) elif TryVerifyState.VERIFIED in state: score = 1 * (0.5 ** (already_used_num - 1)) if already_used_num > 1: await evt.send( await UniMessage() .at(user_id) .text(f" 接上了,但重复了,喜提 {score} 分!你有 {instance.get_user_score(user_id)} 分!") .export() ) else: await evt.send( await UniMessage() .at(user_id) .text(f" 接上了,喜提 1 分!你有 {instance.get_user_score(user_id)} 分!") .export() ) if TryVerifyState.GAME_END in state: await evt.send(await UniMessage().text("全部回合结束!").export()) await end_game(event, group_id) return if TryVerifyState.BUT_NO_NEXT in state: await evt.send( await UniMessage() .text("但是,这是条死路!你们全部都要扣 100 分!") .export() ) await evt.send( await UniMessage().text(f"重新抽取成语「{instance.last_idiom}」").export() ) await evt.send( await UniMessage() .text(f"下一个成语请以「{instance.get_last_char()}」开头!") .export() ) evt = on_alconna( Alconna("禁止成语接龙"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True, ) @evt.handle() async def _(event: BaseEvent, target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id add_banned_id(group_id) await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export()) evt = on_alconna( Alconna("开启成语接龙"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True, ) @evt.handle() async def _(event: BaseEvent, target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id remove_banned_id(group_id) await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export())