From a65cb118cc324d23e0ece52ad5808890ed074c9d Mon Sep 17 00:00:00 2001 From: passthem Date: Sun, 19 Oct 2025 04:51:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A5=E5=85=A5=E6=88=91=E5=86=99=E7=9A=84?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=E6=9D=A5=E8=8E=B7=E5=BE=97=E7=BE=A4=E4=B8=8A?= =?UTF-8?q?=E4=B8=8B=E6=96=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- konabot/plugins/idiomgame/__init__.py | 315 ++++++++++++++++++-------- 1 file changed, 216 insertions(+), 99 deletions(-) diff --git a/konabot/plugins/idiomgame/__init__.py b/konabot/plugins/idiomgame/__init__.py index c528f2b..d7e382b 100644 --- a/konabot/plugins/idiomgame/__init__.py +++ b/konabot/plugins/idiomgame/__init__.py @@ -12,12 +12,23 @@ from nonebot import on_message from nonebot.adapters import Event as BaseEvent from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent -from nonebot_plugin_alconna import (Alconna, Args, Field, Subcommand, - UniMessage, UniMsg, on_alconna) +from nonebot_plugin_alconna import ( + Alconna, + Args, + Field, + Subcommand, + UniMessage, + UniMsg, + on_alconna, +) +from konabot.common.longtask import DepLongTaskTarget from konabot.common.path import ASSETS_PATH -DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json" +DATA_FILE_PATH = ( + Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json" +) + def load_banned_ids() -> list[str]: if not DATA_FILE_PATH.exists(): @@ -27,32 +38,38 @@ def load_banned_ids() -> list[str]: except Exception as e: logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}") return [] - + + def is_idiom_game_banned(group_id: str) -> bool: banned_ids = load_banned_ids() return group_id in banned_ids + def add_banned_id(group_id: str): banned_ids = load_banned_ids() if group_id not in banned_ids: banned_ids.append(group_id) DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4)) + def remove_banned_id(group_id: str): banned_ids = load_banned_ids() if group_id in banned_ids: banned_ids.remove(group_id) DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4)) + class TryStartState(Enum): STARTED = 0 ALREADY_PLAYING = 1 NO_REMAINING_TIMES = 2 + class TryStopState(Enum): STOPPED = 0 NOT_PLAYING = 1 + class TryVerifyState(Enum): VERIFIED = 0 NOT_IDIOM = 1 @@ -60,11 +77,12 @@ class TryVerifyState(Enum): VERIFIED_BUT_NO_NEXT = 3 VERIFIED_GAME_END = 4 + class IdiomGame: - ALL_WORDS = [] # 所有四字词语 - ALL_IDIOMS = [] # 所有成语 - INSTANCE_LIST : dict[str, "IdiomGame"] = {} # 群号对应的游戏实例 - IDIOM_FIRST_CHAR = {} # 成语首字字典 + ALL_WORDS = [] # 所有四字词语 + ALL_IDIOMS = [] # 所有成语 + INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例 + IDIOM_FIRST_CHAR = {} # 成语首字字典 __inited = False @@ -79,22 +97,22 @@ class IdiomGame: self.last_play_date = "" self.all_buff_score = 0 self.lock = asynkio.Lock() - self.remain_rounds = 0 # 剩余回合数 + self.remain_rounds = 0 # 剩余回合数 IdiomGame.INSTANCE_LIST[group_id] = self def be_able_to_play(self) -> bool: - if(self.last_play_date != datetime.date.today()): + if self.last_play_date != datetime.date.today(): self.last_play_date = datetime.date.today() self.remain_playing_times = 1 - if(self.remain_playing_times > 0): + if self.remain_playing_times > 0: self.remain_playing_times -= 1 return True return False def choose_start_idiom(self) -> str: - ''' + """ 随机选择一个成语作为起始成语 - ''' + """ self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS) self.last_char = self.last_idiom[-1] if not self.is_nextable(self.last_char): @@ -113,7 +131,7 @@ class IdiomGame: return TryStartState.NO_REMAINING_TIMES instance.now_playing = True return TryStartState.STARTED - + def start_game(self, rounds: int = 100): self.now_playing = True self.remain_rounds = rounds @@ -128,93 +146,89 @@ class IdiomGame: return TryStopState.NOT_PLAYING instance.now_playing = False return TryStopState.STOPPED - + def clear_score_board(self): self.score_board = {} self.last_char = "" - + def get_score_board(self) -> dict: return self.score_board - + def get_all_buff_score(self) -> int: return self.all_buff_score async def skip_idiom(self, buff_score: int = -100) -> str: - ''' + """ 跳过当前成语,选择下一个成语 - ''' + """ await self.lock.acquire() self._skip_idiom_async(buff_score) self.lock.release() return self.last_idiom - + def _skip_idiom_async(self, buff_score: int = -100) -> str: self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS) self.last_char = self.last_idiom[-1] self.add_buff_score(buff_score) - return self.last_idiom - + return self.last_idiom + async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState: - ''' + """ 用户发送成语 - ''' + """ await self.lock.acquire() state = self._verify_idiom(idiom, user_id) self.lock.release() return state - - def is_nextable(self, last_char: str) -> bool: - ''' - 判断是否有成语可以接 - ''' - return last_char in IdiomGame.IDIOM_FIRST_CHAR + def is_nextable(self, last_char: str) -> bool: + """ + 判断是否有成语可以接 + """ + return last_char in IdiomGame.IDIOM_FIRST_CHAR def _verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState: # 新成语的首字应与上一条成语的尾字相同 if idiom[0] != self.last_char: return TryVerifyState.WRONG_FIRST_CHAR - if(idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS): + if idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS: self.add_score(user_id, -0.1) return TryVerifyState.NOT_IDIOM self.last_idiom = idiom self.last_char = idiom[-1] self.add_score(user_id, 1) self.remain_rounds -= 1 - if(self.remain_rounds <= 0): + if self.remain_rounds <= 0: self.now_playing = False return TryVerifyState.VERIFIED_GAME_END - if(not self.is_nextable(self.last_char)): + if not self.is_nextable(self.last_char): # 没有成语可以接了,自动跳过 self._skip_idiom_async() return TryVerifyState.VERIFIED_BUT_NO_NEXT return TryVerifyState.VERIFIED - + def get_user_score(self, user_id: str) -> int: if user_id not in self.score_board: return 0 return self.score_board[user_id]["score"] - + def add_score(self, user_id: str, score: int): if user_id not in self.score_board: - self.score_board[user_id] = { - "name": user_id, - "score": 0 - } + self.score_board[user_id] = {"name": user_id, "score": 0} self.score_board[user_id]["score"] += score - + def add_buff_score(self, score: int): self.all_buff_score += score def get_playing_state(self) -> bool: return self.now_playing - + def get_last_char(self) -> str: return self.last_char @classmethod def init_lexicon(cls): - if(cls.__inited): + if cls.__inited: return cls.__inited = True @@ -235,28 +249,40 @@ class IdiomGame: COMMON_WORDS.append(word) # 读取 THUOCL 成语库 - with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8") as f: + with open( + ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", + "r", + encoding="utf-8", + ) as f: THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f] # 读取 THUOCL 剩下的所有 txt 文件,只保留四字词 THUOCL_WORDS = [] import os + for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"): if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt": - with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8") as f: + with open( + ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, + "r", + encoding="utf-8", + ) as f: for line in f: word = line.lstrip().split(" ")[0].strip() if len(word) == 4: THUOCL_WORDS.append(word) - # 只有成语的大表 cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS - cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重 + cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重 # 其他四字词语表,仅表示可以有这个词 - cls.ALL_WORDS = [word for word in cls.ALL_WORDS if len(word) == 4] + THUOCL_WORDS + COMMON_WORDS - cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重 + cls.ALL_WORDS = ( + [word for word in cls.ALL_WORDS if len(word) == 4] + + THUOCL_WORDS + + COMMON_WORDS + ) + cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重 # 根据成语大表,划分出成语首字字典 for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS: @@ -264,42 +290,80 @@ class IdiomGame: cls.IDIOM_FIRST_CHAR[idiom[0]] = [] cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom) -evt = on_alconna(Alconna( - "我要玩成语接龙", - Args['rounds?', int], -), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) + +evt = on_alconna( + Alconna( + "我要玩成语接龙", + Args["rounds?", int], + ), + use_cmd_start=True, + use_cmd_sep=False, + skip_for_unmatch=True, +) + @evt.handle() -async def play_game(event: BaseEvent, force = False, rounds: Optional[int] = 100): - group_id = str(event.get_session_id()) +async def play_game( + event: BaseEvent, + target: DepLongTaskTarget, + force=False, + rounds: Optional[int] = 100, +): + # group_id = str(event.get_session_id()) + group_id = target.channel_id if is_idiom_game_banned(group_id): - await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export()) + await evt.send( + await UniMessage().text("本群已被禁止使用成语接龙功能!").export() + ) return + rounds = rounds or 0 if rounds <= 0: await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export()) return state = IdiomGame.try_start_game(group_id, force) if state == TryStartState.ALREADY_PLAYING: - await evt.send(await UniMessage().text("当前已有成语接龙游戏在进行中,请稍后再试!").export()) + await evt.send( + await UniMessage() + .text("当前已有成语接龙游戏在进行中,请稍后再试!") + .export() + ) return if state == TryStartState.NO_REMAINING_TIMES: await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export()) return - await evt.send(await UniMessage().text("你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!").export()) + await evt.send( + await UniMessage() + .text( + "你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!" + ) + .export() + ) instance = IdiomGame.INSTANCE_LIST[group_id] instance.start_game(rounds) # 发布成语 - await evt.send(await UniMessage().text(f"第一个成语:「{instance.last_idiom}」,请接!").export()) + await evt.send( + await UniMessage() + .text(f"第一个成语:「{instance.last_idiom}」,请接!") + .export() + ) + + +evt = on_alconna( + Alconna( + "老子就是要玩成语接龙!!!", + Args["rounds?", int], + ), + use_cmd_start=True, + use_cmd_sep=False, + skip_for_unmatch=True, +) -evt = on_alconna(Alconna( - "老子就是要玩成语接龙!!!", - Args['rounds?', int], -), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) @evt.handle() async def force_play_game(event: BaseEvent, rounds: Optional[int] = 100): await play_game(event, force=True, rounds=rounds) + async def end_game(event: BaseEvent, group_id: str): instance = IdiomGame.INSTANCE_LIST[group_id] result_text = UniMessage().text("游戏结束!\n最终得分榜:\n") @@ -308,19 +372,28 @@ async def end_game(event: BaseEvent, group_id: str): result_text += "无人得分!" else: # 按分数排序,名字用 at 的方式 - sorted_score = sorted(score_board.items(), key=lambda x: x[1]["score"], reverse=True) + sorted_score = sorted( + score_board.items(), key=lambda x: x[1]["score"], reverse=True + ) for i, (user_id, info) in enumerate(sorted_score): - result_text += f"{i+1}. " + UniMessage().at(user_id) + f": {info['score'] + instance.get_all_buff_score()} 分\n" + result_text += ( + f"{i + 1}. " + + UniMessage().at(user_id) + + f": {info['score'] + instance.get_all_buff_score()} 分\n" + ) await evt.send(await result_text.export()) instance.clear_score_board() -evt = on_alconna(Alconna( - "不玩了" -), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) + +evt = on_alconna( + Alconna("不玩了"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True +) + @evt.handle() -async def _(event: BaseEvent): - group_id = str(event.get_session_id()) +async def _(event: BaseEvent, target: DepLongTaskTarget): + # group_id = str(event.get_session_id()) + group_id = target.channel_id state = IdiomGame.try_stop_game(group_id) if state == TryStopState.STOPPED: # 发送好吧狗图片 @@ -330,22 +403,30 @@ async def _(event: BaseEvent): await evt.send(await UniMessage().image(raw=img_data).export()) await end_game(event, group_id) else: - await evt.send(await UniMessage().text("当前没有成语接龙游戏在进行中!").export()) + await evt.send( + await UniMessage().text("当前没有成语接龙游戏在进行中!").export() + ) + # 跳过 -evt = on_alconna(Alconna( - "跳过成语" -), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) +evt = on_alconna( + Alconna("跳过成语"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True +) + @evt.handle() -async def _(event: BaseEvent): - group_id = str(event.get_session_id()) +async def _(target: DepLongTaskTarget): + # group_id = str(event.get_session_id()) + group_id = target.channel_id instance = IdiomGame.INSTANCE_LIST.get(group_id) if not instance or not instance.get_playing_state(): return await evt.send(await UniMessage().text("你们太菜了!全部扣100分!").export()) idiom = await instance.skip_idiom(-100) - await evt.send(await UniMessage().text(f"重新开始,下一个成语是「{idiom}」").export()) + await evt.send( + await UniMessage().text(f"重新开始,下一个成语是「{idiom}」").export() + ) + def get_user_info(event: BaseEvent): if isinstance(event, DiscordMessageEvent): @@ -356,49 +437,85 @@ def get_user_info(event: BaseEvent): user_name = str(event.get_user_id()) return user_id, user_name + # 直接读取消息 evt = on_message() + @evt.handle() -async def _(event: BaseEvent, msg: UniMsg): - group_id = str(event.get_session_id()) +async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget): + # group_id = str(event.get_session_id()) + group_id = target.channel_id instance = IdiomGame.INSTANCE_LIST.get(group_id) if not instance or not instance.get_playing_state(): return user_idiom = msg.extract_plain_text().strip() - user_id , user_name = get_user_info(event) + user_id, user_name = get_user_info(event) state = await instance.try_verify_idiom(user_idiom, user_id) - if(state == TryVerifyState.WRONG_FIRST_CHAR): + if state == TryVerifyState.WRONG_FIRST_CHAR: return - if(state == TryVerifyState.NOT_IDIOM): - await evt.send(await UniMessage().at(user_id).text("接不上!这个不一样!你被扣了 0.1 分!").export()) + if state == TryVerifyState.NOT_IDIOM: + await evt.send( + await UniMessage() + .at(user_id) + .text("接不上!这个不一样!你被扣了 0.1 分!") + .export() + ) return - await evt.send(await UniMessage().at(user_id).text(f"接对了!你有 {instance.get_user_score(user_id)} 分!").export()) - if(state == TryVerifyState.VERIFIED_GAME_END): + await evt.send( + await UniMessage() + .at(user_id) + .text(f"接对了!你有 {instance.get_user_score(user_id)} 分!") + .export() + ) + if state == TryVerifyState.VERIFIED_GAME_END: await evt.send(await UniMessage().text("全部回合结束!").export()) await end_game(event, group_id) return - if(state == TryVerifyState.VERIFIED_BUT_NO_NEXT): - await evt.send(await UniMessage().text("但是,这是条死路!你们全部都要扣 100 分!").export()) - await evt.send(await UniMessage().text(f"重新抽取成语「{instance.last_idiom}」").export()) - await evt.send(await UniMessage().text(f"下一个成语请以「{instance.get_last_char()}」开头!").export()) + if state == TryVerifyState.VERIFIED_BUT_NO_NEXT: + await evt.send( + await UniMessage() + .text("但是,这是条死路!你们全部都要扣 100 分!") + .export() + ) + await evt.send( + await UniMessage().text(f"重新抽取成语「{instance.last_idiom}」").export() + ) + await evt.send( + await UniMessage() + .text(f"下一个成语请以「{instance.get_last_char()}」开头!") + .export() + ) + + +evt = on_alconna( + Alconna("禁止成语接龙"), + use_cmd_start=True, + use_cmd_sep=False, + skip_for_unmatch=True, +) -evt = on_alconna(Alconna( - "禁止成语接龙" -), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) @evt.handle() -async def _(event: BaseEvent): - group_id = str(event.get_session_id()) +async def _(event: BaseEvent, target: DepLongTaskTarget): + # group_id = str(event.get_session_id()) + group_id = target.channel_id add_banned_id(group_id) await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export()) -evt = on_alconna(Alconna( - "开启成语接龙" -), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) + +evt = on_alconna( + Alconna("开启成语接龙"), + use_cmd_start=True, + use_cmd_sep=False, + skip_for_unmatch=True, +) + @evt.handle() -async def _(event: BaseEvent): - group_id = str(event.get_session_id()) +async def _(event: BaseEvent, target: DepLongTaskTarget): + # group_id = str(event.get_session_id()) + group_id = target.channel_id remove_banned_id(group_id) - await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export()) \ No newline at end of file + await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export()) +