import asyncio as asynkio import datetime import json import secrets from enum import Enum from pathlib import Path from typing import Optional from loguru import logger from nonebot import on_message from nonebot.adapters import Event as BaseEvent from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent from nonebot_plugin_alconna import ( Alconna, Args, UniMessage, UniMsg, on_alconna, ) from konabot.common.longtask import DepLongTaskTarget from konabot.common.path import ASSETS_PATH DATA_FILE_PATH = ( Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json" ) def load_banned_ids() -> list[str]: if not DATA_FILE_PATH.exists(): return [] try: return json.loads(DATA_FILE_PATH.read_text("utf-8")) except Exception as e: logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}") return [] def is_idiom_game_banned(group_id: str) -> bool: banned_ids = load_banned_ids() return group_id in banned_ids def add_banned_id(group_id: str): banned_ids = load_banned_ids() if group_id not in banned_ids: banned_ids.append(group_id) DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8") def remove_banned_id(group_id: str): banned_ids = load_banned_ids() if group_id in banned_ids: banned_ids.remove(group_id) DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8") class TryStartState(Enum): STARTED = 0 ALREADY_PLAYING = 1 NO_REMAINING_TIMES = 2 class TryStopState(Enum): STOPPED = 0 NOT_PLAYING = 1 class TryVerifyState(Enum): VERIFIED = 0 VERIFIED_AND_REAL = 1 ALREADY_USED = 2 NOT_IDIOM = 3 WRONG_FIRST_CHAR = 4 BUT_NO_NEXT = 5 GAME_END = 6 class IdiomGame: ALL_WORDS = [] # 所有四字词语 ALL_IDIOMS = [] # 所有成语 INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例 IDIOM_FIRST_CHAR = {} # 所有成语包括词语的首字字典 AVALIABLE_IDIOM_FIRST_CHAR = {} # 真正有效的成语首字字典 __inited = False def __init__(self, group_id: str): # 初始化一局游戏 self.group_id = "" self.now_playing = False self.score_board = {} self.last_idiom = "" self.last_char = "" self.remain_playing_times = 3 self.last_play_date = "" self.all_buff_score = 0 self.lock = asynkio.Lock() self.remain_rounds = 0 # 剩余回合数 self.already_idioms: dict[str, int] = {} # 已经使用过的成语和使用过的次数 self.idiom_history: list[list[str]] = [] # 成语使用历史记录,多个数组以存储不同成语链 IdiomGame.INSTANCE_LIST[group_id] = self def be_able_to_play(self) -> bool: if self.last_play_date != datetime.date.today(): self.last_play_date = datetime.date.today() self.remain_playing_times = 3 if self.remain_playing_times > 0: self.remain_playing_times -= 1 return True return False def choose_start_idiom(self) -> str: """ 随机选择一个成语作为起始成语 """ self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS) self.last_char = self.last_idiom[-1] if not self.is_nextable(self.last_char): self.choose_start_idiom() else: self.add_history_idiom(self.last_idiom, new_chain=True) return self.last_idiom @classmethod def try_start_game(cls, group_id: str, force: bool = False) -> TryStartState: cls.init_lexicon() if not cls.INSTANCE_LIST.get(group_id): cls(group_id) instance = cls.INSTANCE_LIST[group_id] if instance.now_playing: return TryStartState.ALREADY_PLAYING if not instance.be_able_to_play() and not force: return TryStartState.NO_REMAINING_TIMES instance.now_playing = True return TryStartState.STARTED def start_game(self, rounds: int = 100): self.now_playing = True self.remain_rounds = rounds self.choose_start_idiom() @classmethod def try_stop_game(cls, group_id: str) -> TryStopState: if not cls.INSTANCE_LIST.get(group_id): return TryStopState.NOT_PLAYING instance = cls.INSTANCE_LIST[group_id] if not instance.now_playing: return TryStopState.NOT_PLAYING instance.now_playing = False return TryStopState.STOPPED def clear_score_board(self): self.score_board = {} self.last_char = "" self.all_buff_score = 0 self.already_idioms = {} self.idiom_history = [] def get_score_board(self) -> dict: return self.score_board def get_all_buff_score(self) -> int: return self.all_buff_score async def skip_idiom(self, buff_score: int = -100) -> str: """ 跳过当前成语,选择下一个成语 """ async with self.lock: self._skip_idiom_async() self.add_buff_score(buff_score) return self.last_idiom def _skip_idiom_async(self) -> str: self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS) self.last_char = self.last_idiom[-1] if not self.is_nextable(self.last_char): self._skip_idiom_async() else: self.add_history_idiom(self.last_idiom, new_chain=True) return self.last_idiom async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState: """ 用户发送成语 """ async with self.lock: state = self._verify_idiom(idiom, user_id) return state def is_nextable(self, last_char: str) -> bool: """ 判断是否有成语可以接 """ return last_char in IdiomGame.AVALIABLE_IDIOM_FIRST_CHAR def add_already_idiom(self, idiom: str): if idiom in self.already_idioms: self.already_idioms[idiom] += 1 else: self.already_idioms[idiom] = 1 def get_already_used_num(self, idiom: str) -> int: if idiom in self.already_idioms: return self.already_idioms[idiom] return 0 def add_history_idiom(self, idiom: str, new_chain: bool = False): if new_chain or len(self.idiom_history) == 0: self.idiom_history.append([idiom]) else: self.idiom_history[-1].append(idiom) def display_history(self) -> list[str]: result = [] for chain in self.idiom_history: result.append(" -> ".join(chain)) return result def _verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]: state = [] # 新成语的首字应与上一条成语的尾字相同 if idiom[0] != self.last_char: state.append(TryVerifyState.WRONG_FIRST_CHAR) return state if idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS: self.add_score(user_id, -0.1) state.append(TryVerifyState.NOT_IDIOM) return state # 成语合法,更新状态 self.add_history_idiom(idiom) score_k = 0.5 ** self.get_already_used_num(idiom) # 每被使用过一次,得分减半 if(score_k != 1): state.append(TryVerifyState.ALREADY_USED) self.add_already_idiom(idiom) state.append(TryVerifyState.VERIFIED) self.last_idiom = idiom self.last_char = idiom[-1] self.add_score(user_id, 1 * score_k) # 先加 1 分 if idiom in IdiomGame.ALL_IDIOMS: state.append(TryVerifyState.VERIFIED_AND_REAL) self.add_score(user_id, 4 * score_k) # 再加 4 分 self.remain_rounds -= 1 if self.remain_rounds <= 0: self.now_playing = False state.append(TryVerifyState.GAME_END) if not self.is_nextable(self.last_char): # 没有成语可以接了,自动跳过 self._skip_idiom_async() self.add_buff_score(-100) state.append(TryVerifyState.BUT_NO_NEXT) return state def get_user_score(self, user_id: str) -> float: if user_id not in self.score_board: return 0 # 避免浮点数精度问题导致过长 handled_score = round(self.score_board[user_id]["score"] + self.all_buff_score, 1) return handled_score def add_score(self, user_id: str, score: int): if user_id not in self.score_board: self.score_board[user_id] = {"name": user_id, "score": 0} self.score_board[user_id]["score"] += score def add_buff_score(self, score: int): self.all_buff_score += score def get_playing_state(self) -> bool: return self.now_playing def get_last_char(self) -> str: return self.last_char @classmethod def random_idiom_starting_with(cls, first_char: str) -> Optional[str]: cls.init_lexicon() if first_char not in cls.AVALIABLE_IDIOM_FIRST_CHAR: return None return secrets.choice(cls.AVALIABLE_IDIOM_FIRST_CHAR[first_char]) @classmethod def init_lexicon(cls): if cls.__inited: return cls.__inited = True # 成语大表 with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f: ALL_IDIOMS_INFOS = json.load(f) # 词语大表 with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f: jsonData = json.load(f) cls.ALL_WORDS = [item["ci"] for item in jsonData] logger.debug(f"Loaded {len(cls.ALL_WORDS)} words from ci.json") logger.debug(f"Sample words: {cls.ALL_WORDS[:5]}") COMMON_WORDS = [] # 读取 COMMON 词语大表 with open(ASSETS_PATH / "lexicon" / "common.txt", "r", encoding="utf-8") as f: for line in f: word = line.strip() if len(word) == 4: COMMON_WORDS.append(word) logger.debug(f"Loaded {len(COMMON_WORDS)} common words from common.txt") logger.debug(f"Sample common words: {COMMON_WORDS[:5]}") # 读取 THUOCL 成语库 with open( ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8", ) as f: THUOCL_IDIOMS = [line.split(" ")[0].split("\t")[0].strip() for line in f] logger.debug(f"Loaded {len(THUOCL_IDIOMS)} idioms from THUOCL_chengyu.txt") logger.debug(f"Sample idioms: {THUOCL_IDIOMS[:5]}") # 读取 THUOCL 剩下的所有 txt 文件,只保留四字词 THUOCL_WORDS = [] import os for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"): if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt": with open( ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8", ) as f: for line in f: word = line.lstrip().split(" ")[0].split("\t")[0].strip() if len(word) == 4: THUOCL_WORDS.append(word) logger.debug(f"Loaded {len(THUOCL_WORDS)} words from THUOCL txt files") logger.debug(f"Sample words: {THUOCL_WORDS[:5]}") # 只有成语的大表 cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重 # 其他四字词语表,仅表示可以有这个词 cls.ALL_WORDS = ( [word for word in cls.ALL_WORDS if len(word) == 4] + THUOCL_WORDS + COMMON_WORDS ) cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重 # 根据成语大表,划分出成语首字字典 for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS: if idiom[0] not in cls.IDIOM_FIRST_CHAR: cls.IDIOM_FIRST_CHAR[idiom[0]] = [] cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom) # 根据真正的成语大表,划分出有效成语首字字典 for idiom in cls.ALL_IDIOMS: if idiom[0] not in cls.AVALIABLE_IDIOM_FIRST_CHAR: cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]] = [] cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]].append(idiom) evt = on_alconna( Alconna( "我要玩成语接龙", Args["rounds?", int], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True, ) @evt.handle() async def play_game( event: BaseEvent, target: DepLongTaskTarget, force=False, rounds: Optional[int] = 100, ): # group_id = str(event.get_session_id()) group_id = target.channel_id if is_idiom_game_banned(group_id): await evt.send( await UniMessage().text("本群已被禁止使用成语接龙功能!").export() ) return rounds = rounds or 0 if rounds <= 0: await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export()) return state = IdiomGame.try_start_game(group_id, force) if state == TryStartState.ALREADY_PLAYING: await evt.send( await UniMessage() .text("当前已有成语接龙游戏在进行中,请稍后再试!") .export() ) return if state == TryStartState.NO_REMAINING_TIMES: await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export()) return await evt.send( await UniMessage() .text( "你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!" ) .export() ) instance = IdiomGame.INSTANCE_LIST[group_id] instance.start_game(rounds) # 发布成语 await evt.send( await UniMessage() .text(f"第一个成语:「{instance.last_idiom}」,请接!") .export() ) evt = on_alconna( Alconna( "老子就是要玩成语接龙!!!", Args["rounds?", int], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True, ) @evt.handle() async def force_play_game( event: BaseEvent, target: DepLongTaskTarget, rounds: Optional[int] = 100 ): await play_game(event, target, force=True, rounds=rounds) async def end_game(event: BaseEvent, group_id: str): instance = IdiomGame.INSTANCE_LIST[group_id] result_text = UniMessage().text("游戏结束!\n最终得分榜:\n") score_board = instance.get_score_board() if len(score_board) == 0: result_text += "无人得分!\n" else: # 按分数排序,名字用 at 的方式 sorted_score = sorted( score_board.items(), key=lambda x: x[1]["score"], reverse=True ) for i, (user_id, info) in enumerate(sorted_score): result_text += ( f"{i + 1}. " + UniMessage().at(user_id) + f": {round(info['score'] + instance.get_all_buff_score(), 1)} 分\n" ) if len(instance.idiom_history) == 0: result_text += "\n本局没有任何接龙记录。" else: result_text += "\n你们的接龙记录是:\n" history_lines = instance.display_history() for line in history_lines: result_text += line + "\n" await evt.send(await result_text.export()) instance.clear_score_board() evt = on_alconna( Alconna("不玩了"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True ) @evt.handle() async def _(event: BaseEvent, target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id state = IdiomGame.try_stop_game(group_id) if state == TryStopState.STOPPED: # 发送好吧狗图片 # 打开好吧狗本地文件 with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f: img_data = f.read() await evt.send(await UniMessage().image(raw=img_data).export()) await end_game(event, group_id) else: await evt.send( await UniMessage().text("当前没有成语接龙游戏在进行中!").export() ) # 跳过 evt = on_alconna( Alconna("跳过成语"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True ) @evt.handle() async def _(target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id instance = IdiomGame.INSTANCE_LIST.get(group_id) if not instance or not instance.get_playing_state(): return avaliable_idiom = IdiomGame.random_idiom_starting_with(instance.get_last_char()) # 发送哈哈狗图片 with open(ASSETS_PATH / "img" / "dog" / "haha_dog.jpg", "rb") as f: img_data = f.read() await evt.send(await UniMessage().image(raw=img_data).export()) await evt.send(await UniMessage().text(f"你们太菜了,全部扣100分!明明还可以接「{avaliable_idiom}」的!").export()) idiom = await instance.skip_idiom(-100) await evt.send( await UniMessage().text(f"重新开始,下一个成语是「{idiom}」").export() ) def get_user_info(event: BaseEvent): if isinstance(event, DiscordMessageEvent): user_id = str(event.author.id) user_name = str(event.author.name) else: user_id = str(event.get_user_id()) user_name = str(event.get_user_id()) return user_id, user_name # 直接读取消息 evt = on_message() @evt.handle() async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id instance = IdiomGame.INSTANCE_LIST.get(group_id) if not instance or not instance.get_playing_state(): return user_idiom = msg.extract_plain_text().strip() user_id, user_name = get_user_info(event) state = await instance.try_verify_idiom(user_idiom, user_id) if TryVerifyState.WRONG_FIRST_CHAR in state: return if TryVerifyState.NOT_IDIOM in state: await evt.send( await UniMessage() .at(user_id) .text(" 接不上!这个不一样!你被扣了 0.1 分!") .export() ) return already_used_num = instance.get_already_used_num(user_idiom) if TryVerifyState.VERIFIED_AND_REAL in state: score = 5 * (0.5 ** (already_used_num - 1)) if already_used_num > 1: await evt.send( await UniMessage() .at(user_id) .text(f" 接上了,这是个被重复用过的成语,喜提 {score} 分!你有 {instance.get_user_score(user_id)} 分!") .export() ) else: await evt.send( await UniMessage() .at(user_id) .text(f" 接上了,这是个真实成语,喜提 5 分!你有 {instance.get_user_score(user_id)} 分!") .export() ) elif TryVerifyState.VERIFIED in state: score = 1 * (0.5 ** (already_used_num - 1)) if already_used_num > 1: await evt.send( await UniMessage() .at(user_id) .text(f" 接上了,但重复了,喜提 {score} 分!你有 {instance.get_user_score(user_id)} 分!") .export() ) else: await evt.send( await UniMessage() .at(user_id) .text(f" 接上了,喜提 1 分!你有 {instance.get_user_score(user_id)} 分!") .export() ) if TryVerifyState.GAME_END in state: await evt.send(await UniMessage().text("全部回合结束!").export()) await end_game(event, group_id) return if TryVerifyState.BUT_NO_NEXT in state: await evt.send( await UniMessage() .text("但是,这是条死路!你们全部都要扣 100 分!") .export() ) await evt.send( await UniMessage().text(f"重新抽取成语「{instance.last_idiom}」").export() ) await evt.send( await UniMessage() .text(f"下一个成语请以「{instance.get_last_char()}」开头!") .export() ) evt = on_alconna( Alconna("禁止成语接龙"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True, ) @evt.handle() async def _(event: BaseEvent, target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id add_banned_id(group_id) await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export()) evt = on_alconna( Alconna("开启成语接龙"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True, ) @evt.handle() async def _(event: BaseEvent, target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id remove_banned_id(group_id) await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export())