diff --git a/konabot/plugins/idiomgame/__init__.py b/konabot/plugins/idiomgame/__init__.py index 99a5a7b..c528f2b 100644 --- a/konabot/plugins/idiomgame/__init__.py +++ b/konabot/plugins/idiomgame/__init__.py @@ -1,8 +1,13 @@ +import asyncio as asynkio import base64 +from pathlib import Path import secrets import json -from typing import Literal +import datetime +from typing import Literal, Optional +from enum import Enum +from loguru import logger from nonebot import on_message from nonebot.adapters import Event as BaseEvent from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent @@ -12,120 +17,302 @@ from nonebot_plugin_alconna import (Alconna, Args, Field, Subcommand, from konabot.common.path import ASSETS_PATH -ALL_WORDS = [] # 所有四字词语 -ALL_IDIOMS = [] # 所有成语 -IDIOM_FIRST_CHAR = {} # 成语首字字典 +DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json" -INITED = False - -def init_lexicon(): - global ALL_WORDS, ALL_IDIOMS, IDIOM_FIRST_CHAR - # 成语大表 - with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f: - ALL_IDIOMS_INFOS = json.load(f) - - # 词语大表 - with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f: - ALL_WORDS = json.load(f) +def load_banned_ids() -> list[str]: + if not DATA_FILE_PATH.exists(): + return [] + try: + return json.loads(DATA_FILE_PATH.read_text()) + except Exception as e: + logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}") + return [] - COMMON_WORDS = [] - # 读取 COMMON 词语大表 - with open(ASSETS_PATH / "lexicon" / "common.txt", "r", encoding="utf-8") as f: - for line in f: - word = line.strip() - if len(word) == 4: - COMMON_WORDS.append(word) +def is_idiom_game_banned(group_id: str) -> bool: + banned_ids = load_banned_ids() + return group_id in banned_ids - # 读取 THUOCL 成语库 - with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8") as f: - THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f] +def add_banned_id(group_id: str): + banned_ids = load_banned_ids() + if group_id not in banned_ids: + banned_ids.append(group_id) + DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4)) - # 读取 THUOCL 剩下的所有 txt 文件,只保留四字词 - THUOCL_WORDS = [] - import os - for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"): - if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt": - with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8") as f: - for line in f: - word = line.lstrip().split(" ")[0].strip() - if len(word) == 4: - THUOCL_WORDS.append(word) +def remove_banned_id(group_id: str): + banned_ids = load_banned_ids() + if group_id in banned_ids: + banned_ids.remove(group_id) + DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4)) + +class TryStartState(Enum): + STARTED = 0 + ALREADY_PLAYING = 1 + NO_REMAINING_TIMES = 2 + +class TryStopState(Enum): + STOPPED = 0 + NOT_PLAYING = 1 + +class TryVerifyState(Enum): + VERIFIED = 0 + NOT_IDIOM = 1 + WRONG_FIRST_CHAR = 2 + VERIFIED_BUT_NO_NEXT = 3 + VERIFIED_GAME_END = 4 + +class IdiomGame: + ALL_WORDS = [] # 所有四字词语 + ALL_IDIOMS = [] # 所有成语 + INSTANCE_LIST : dict[str, "IdiomGame"] = {} # 群号对应的游戏实例 + IDIOM_FIRST_CHAR = {} # 成语首字字典 + + __inited = False + + def __init__(self, group_id: str): + # 初始化一局游戏 + self.group_id = "" + self.now_playing = False + self.score_board = {} + self.last_idiom = "" + self.last_char = "" + self.remain_playing_times = 3 + self.last_play_date = "" + self.all_buff_score = 0 + self.lock = asynkio.Lock() + self.remain_rounds = 0 # 剩余回合数 + IdiomGame.INSTANCE_LIST[group_id] = self + + def be_able_to_play(self) -> bool: + if(self.last_play_date != datetime.date.today()): + self.last_play_date = datetime.date.today() + self.remain_playing_times = 1 + if(self.remain_playing_times > 0): + self.remain_playing_times -= 1 + return True + return False + + def choose_start_idiom(self) -> str: + ''' + 随机选择一个成语作为起始成语 + ''' + self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS) + self.last_char = self.last_idiom[-1] + if not self.is_nextable(self.last_char): + self.choose_start_idiom() + return self.last_idiom + + @classmethod + def try_start_game(cls, group_id: str, force: bool = False) -> TryStartState: + cls.init_lexicon() + if not cls.INSTANCE_LIST.get(group_id): + cls(group_id) + instance = cls.INSTANCE_LIST[group_id] + if instance.now_playing: + return TryStartState.ALREADY_PLAYING + if not instance.be_able_to_play() and not force: + return TryStartState.NO_REMAINING_TIMES + instance.now_playing = True + return TryStartState.STARTED + + def start_game(self, rounds: int = 100): + self.now_playing = True + self.remain_rounds = rounds + self.choose_start_idiom() + + @classmethod + def try_stop_game(cls, group_id: str) -> TryStopState: + if not cls.INSTANCE_LIST.get(group_id): + return TryStopState.NOT_PLAYING + instance = cls.INSTANCE_LIST[group_id] + if not instance.now_playing: + return TryStopState.NOT_PLAYING + instance.now_playing = False + return TryStopState.STOPPED + + def clear_score_board(self): + self.score_board = {} + self.last_char = "" + + def get_score_board(self) -> dict: + return self.score_board + + def get_all_buff_score(self) -> int: + return self.all_buff_score + + async def skip_idiom(self, buff_score: int = -100) -> str: + ''' + 跳过当前成语,选择下一个成语 + ''' + await self.lock.acquire() + self._skip_idiom_async(buff_score) + self.lock.release() + return self.last_idiom + + def _skip_idiom_async(self, buff_score: int = -100) -> str: + self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS) + self.last_char = self.last_idiom[-1] + self.add_buff_score(buff_score) + return self.last_idiom + + async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState: + ''' + 用户发送成语 + ''' + await self.lock.acquire() + state = self._verify_idiom(idiom, user_id) + self.lock.release() + return state + + def is_nextable(self, last_char: str) -> bool: + ''' + 判断是否有成语可以接 + ''' + return last_char in IdiomGame.IDIOM_FIRST_CHAR - # 只有成语的大表 - ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS - ALL_IDIOMS = list(set(ALL_IDIOMS)) # 去重 + def _verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState: + # 新成语的首字应与上一条成语的尾字相同 + if idiom[0] != self.last_char: + return TryVerifyState.WRONG_FIRST_CHAR + if(idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS): + self.add_score(user_id, -0.1) + return TryVerifyState.NOT_IDIOM + self.last_idiom = idiom + self.last_char = idiom[-1] + self.add_score(user_id, 1) + self.remain_rounds -= 1 + if(self.remain_rounds <= 0): + self.now_playing = False + return TryVerifyState.VERIFIED_GAME_END + if(not self.is_nextable(self.last_char)): + # 没有成语可以接了,自动跳过 + self._skip_idiom_async() + return TryVerifyState.VERIFIED_BUT_NO_NEXT + return TryVerifyState.VERIFIED + + def get_user_score(self, user_id: str) -> int: + if user_id not in self.score_board: + return 0 + return self.score_board[user_id]["score"] + + def add_score(self, user_id: str, score: int): + if user_id not in self.score_board: + self.score_board[user_id] = { + "name": user_id, + "score": 0 + } + self.score_board[user_id]["score"] += score + + def add_buff_score(self, score: int): + self.all_buff_score += score - # 其他四字词语表,仅表示可以有这个词 - ALL_WORDS = [word for word in ALL_WORDS if len(word) == 4] + THUOCL_WORDS + COMMON_WORDS - ALL_WORDS = list(set(ALL_WORDS)) # 去重 + def get_playing_state(self) -> bool: + return self.now_playing + + def get_last_char(self) -> str: + return self.last_char - # 根据成语大表,划分出成语首字字典 - IDIOM_FIRST_CHAR = {} - for idiom in ALL_IDIOMS + ALL_WORDS: - if idiom[0] not in IDIOM_FIRST_CHAR: - IDIOM_FIRST_CHAR[idiom[0]] = [] - IDIOM_FIRST_CHAR[idiom[0]].append(idiom) + @classmethod + def init_lexicon(cls): + if(cls.__inited): + return + cls.__inited = True -NOW_PLAYING = False + # 成语大表 + with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f: + ALL_IDIOMS_INFOS = json.load(f) -SCORE_BOARD = {} + # 词语大表 + with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f: + cls.ALL_WORDS = json.load(f) -LAST_CHAR = "" + COMMON_WORDS = [] + # 读取 COMMON 词语大表 + with open(ASSETS_PATH / "lexicon" / "common.txt", "r", encoding="utf-8") as f: + for line in f: + word = line.strip() + if len(word) == 4: + COMMON_WORDS.append(word) -USER_NAME_CACHE = {} # 缓存用户名称,避免多次获取 + # 读取 THUOCL 成语库 + with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8") as f: + THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f] -REMAIN_PLAYING_TIMES = 1 -LAST_PLAY_DATE = "" + # 读取 THUOCL 剩下的所有 txt 文件,只保留四字词 + THUOCL_WORDS = [] + import os + for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"): + if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt": + with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8") as f: + for line in f: + word = line.lstrip().split(" ")[0].strip() + if len(word) == 4: + THUOCL_WORDS.append(word) -LOCK = False -ALL_BUFF_SCORE = 0 # 全体分数 + # 只有成语的大表 + cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS + cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重 -import datetime + # 其他四字词语表,仅表示可以有这个词 + cls.ALL_WORDS = [word for word in cls.ALL_WORDS if len(word) == 4] + THUOCL_WORDS + COMMON_WORDS + cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重 -def be_able_to_play(): - global REMAIN_PLAYING_TIMES, LAST_PLAY_DATE - if(LAST_PLAY_DATE != datetime.date.today()): - LAST_PLAY_DATE = datetime.date.today() - REMAIN_PLAYING_TIMES = 1 - if(REMAIN_PLAYING_TIMES > 0): - REMAIN_PLAYING_TIMES -= 1 - return True - return False + # 根据成语大表,划分出成语首字字典 + for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS: + if idiom[0] not in cls.IDIOM_FIRST_CHAR: + cls.IDIOM_FIRST_CHAR[idiom[0]] = [] + cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom) evt = on_alconna(Alconna( - "我要玩成语接龙" + "我要玩成语接龙", + Args['rounds?', int], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) @evt.handle() -async def play_game(event: BaseEvent, force = False): - global NOW_PLAYING, LAST_CHAR, INITED - if NOW_PLAYING: +async def play_game(event: BaseEvent, force = False, rounds: Optional[int] = 100): + group_id = str(event.get_session_id()) + if is_idiom_game_banned(group_id): + await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export()) + return + if rounds <= 0: + await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export()) + return + state = IdiomGame.try_start_game(group_id, force) + if state == TryStartState.ALREADY_PLAYING: await evt.send(await UniMessage().text("当前已有成语接龙游戏在进行中,请稍后再试!").export()) return - if not be_able_to_play() and not force: + if state == TryStartState.NO_REMAINING_TIMES: await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export()) return - if not INITED: - init_lexicon() - INITED = True - NOW_PLAYING = True await evt.send(await UniMessage().text("你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!").export()) - # 选择一个随机成语 - idiom = secrets.choice(ALL_IDIOMS) - LAST_CHAR = idiom[-1] + instance = IdiomGame.INSTANCE_LIST[group_id] + instance.start_game(rounds) # 发布成语 - await evt.send(await UniMessage().text(f"第一个成语:「{idiom}」,请接!").export()) + await evt.send(await UniMessage().text(f"第一个成语:「{instance.last_idiom}」,请接!").export()) evt = on_alconna(Alconna( - "老子就是要玩成语接龙!!!" + "老子就是要玩成语接龙!!!", + Args['rounds?', int], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) @evt.handle() -async def force_play_game(event: BaseEvent): - await play_game(event, force=True) - +async def force_play_game(event: BaseEvent, rounds: Optional[int] = 100): + await play_game(event, force=True, rounds=rounds) + +async def end_game(event: BaseEvent, group_id: str): + instance = IdiomGame.INSTANCE_LIST[group_id] + result_text = UniMessage().text("游戏结束!\n最终得分榜:\n") + score_board = instance.get_score_board() + if len(score_board) == 0: + result_text += "无人得分!" + else: + # 按分数排序,名字用 at 的方式 + sorted_score = sorted(score_board.items(), key=lambda x: x[1]["score"], reverse=True) + for i, (user_id, info) in enumerate(sorted_score): + result_text += f"{i+1}. " + UniMessage().at(user_id) + f": {info['score'] + instance.get_all_buff_score()} 分\n" + await evt.send(await result_text.export()) + instance.clear_score_board() evt = on_alconna(Alconna( "不玩了" @@ -133,26 +320,15 @@ evt = on_alconna(Alconna( @evt.handle() async def _(event: BaseEvent): - global NOW_PLAYING, SCORE_BOARD, LAST_CHAR, ALL_BUFF_SCORE - if NOW_PLAYING: - NOW_PLAYING = False + group_id = str(event.get_session_id()) + state = IdiomGame.try_stop_game(group_id) + if state == TryStopState.STOPPED: # 发送好吧狗图片 # 打开好吧狗本地文件 with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f: img_data = f.read() await evt.send(await UniMessage().image(raw=img_data).export()) - result_text = UniMessage().text("游戏结束!\n最终得分榜:\n") - if len(SCORE_BOARD) == 0: - result_text += "无人得分!" - else: - # 按分数排序,名字用 at 的方式 - sorted_score = sorted(SCORE_BOARD.items(), key=lambda x: x[1]["score"], reverse=True) - for i, (user_id, info) in enumerate(sorted_score): - result_text += f"{i+1}. " + UniMessage().at(user_id) + f": {info['score'] + ALL_BUFF_SCORE} 分\n" - await evt.send(await result_text.export()) - # 重置分数板 - SCORE_BOARD = {} - LAST_CHAR = "" + await end_game(event, group_id) else: await evt.send(await UniMessage().text("当前没有成语接龙游戏在进行中!").export()) @@ -163,67 +339,66 @@ evt = on_alconna(Alconna( @evt.handle() async def _(event: BaseEvent): - global NOW_PLAYING, LAST_CHAR, ALL_BUFF_SCORE - if not NOW_PLAYING: + group_id = str(event.get_session_id()) + instance = IdiomGame.INSTANCE_LIST.get(group_id) + if not instance or not instance.get_playing_state(): return await evt.send(await UniMessage().text("你们太菜了!全部扣100分!").export()) - ALL_BUFF_SCORE -= 100 - # 选择下一个成语 - idiom = secrets.choice(ALL_IDIOMS) - LAST_CHAR = idiom[-1] + idiom = await instance.skip_idiom(-100) await evt.send(await UniMessage().text(f"重新开始,下一个成语是「{idiom}」").export()) -# 直接读取消息 -evt = on_message() - -@evt.handle() -async def _(event: BaseEvent, msg: UniMsg): - global NOW_PLAYING, LAST_CHAR, LOCK - if not NOW_PLAYING: - return - user_idiom = msg.extract_plain_text().strip() - if(user_idiom[0] != LAST_CHAR): - return - if LOCK: - return - LOCK = True - await handle_send_info(event, msg) - LOCK = False - -async def handle_send_info(event: BaseEvent, msg: UniMsg): - global NOW_PLAYING, LAST_CHAR, SCORE_BOARD, ALL_BUFF_SCORE - user_idiom = msg.extract_plain_text().strip() - if(user_idiom not in ALL_IDIOMS and user_idiom not in ALL_WORDS): - # 扣0.1分 - if isinstance(event, DiscordMessageEvent): - user_id = str(event.author.id) - user_name = str(event.author.name) - else: - user_id = str(event.get_user_id()) - user_name = str(event.get_user_id()) - if user_id not in SCORE_BOARD: - SCORE_BOARD[user_id] = { - "name": user_name, - "score": 0 - } - SCORE_BOARD[user_id]["score"] -= 0.1 - await evt.send(await UniMessage().at(user_id).text("接不上!这个不一样!你被扣了 0.1 分!").export()) - return - # 成功接上 +def get_user_info(event: BaseEvent): if isinstance(event, DiscordMessageEvent): user_id = str(event.author.id) user_name = str(event.author.name) else: user_id = str(event.get_user_id()) user_name = str(event.get_user_id()) + return user_id, user_name - if user_id not in SCORE_BOARD: - SCORE_BOARD[user_id] = { - "name": user_name, - "score": 0 - } - SCORE_BOARD[user_id]["score"] += 1 - # at 指定玩家 - await evt.send(await UniMessage().at(user_id).text(f"接对了!你有 {SCORE_BOARD[user_id]['score'] + ALL_BUFF_SCORE} 分!").export()) - LAST_CHAR = user_idiom[-1] - await evt.send(await UniMessage().text(f"下一个成语请以「{LAST_CHAR}」开头!").export()) \ No newline at end of file +# 直接读取消息 +evt = on_message() + +@evt.handle() +async def _(event: BaseEvent, msg: UniMsg): + group_id = str(event.get_session_id()) + instance = IdiomGame.INSTANCE_LIST.get(group_id) + if not instance or not instance.get_playing_state(): + return + user_idiom = msg.extract_plain_text().strip() + user_id , user_name = get_user_info(event) + state = await instance.try_verify_idiom(user_idiom, user_id) + if(state == TryVerifyState.WRONG_FIRST_CHAR): + return + if(state == TryVerifyState.NOT_IDIOM): + await evt.send(await UniMessage().at(user_id).text("接不上!这个不一样!你被扣了 0.1 分!").export()) + return + await evt.send(await UniMessage().at(user_id).text(f"接对了!你有 {instance.get_user_score(user_id)} 分!").export()) + if(state == TryVerifyState.VERIFIED_GAME_END): + await evt.send(await UniMessage().text("全部回合结束!").export()) + await end_game(event, group_id) + return + if(state == TryVerifyState.VERIFIED_BUT_NO_NEXT): + await evt.send(await UniMessage().text("但是,这是条死路!你们全部都要扣 100 分!").export()) + await evt.send(await UniMessage().text(f"重新抽取成语「{instance.last_idiom}」").export()) + await evt.send(await UniMessage().text(f"下一个成语请以「{instance.get_last_char()}」开头!").export()) + +evt = on_alconna(Alconna( + "禁止成语接龙" +), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) + +@evt.handle() +async def _(event: BaseEvent): + group_id = str(event.get_session_id()) + add_banned_id(group_id) + await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export()) + +evt = on_alconna(Alconna( + "开启成语接龙" +), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) + +@evt.handle() +async def _(event: BaseEvent): + group_id = str(event.get_session_id()) + remove_banned_id(group_id) + await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export()) \ No newline at end of file