import asyncio as asynkio import datetime import json import secrets import csv import zipfile from PIL import Image from io import BytesIO from enum import Enum from pathlib import Path from typing import Optional from loguru import logger from nonebot import on_message import nonebot from nonebot.adapters import Event as BaseEvent from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent from nonebot_plugin_alconna import ( Alconna, Args, UniMessage, UniMsg, on_alconna, ) from konabot.common.database import DatabaseManager from konabot.common.longtask import DepLongTaskTarget from konabot.common.path import ASSETS_PATH from konabot.plugins.hanzi import get_char ROOT_PATH = Path(__file__).resolve().parent DATA_DIR = Path(__file__).parent.parent.parent.parent / "data" DATA_FILE_PATH = ( DATA_DIR / "oracle_banned.json" ) # 创建全局数据库管理器实例 db_manager = DatabaseManager() def load_banned_ids() -> list[str]: if not DATA_FILE_PATH.exists(): return [] try: return json.loads(DATA_FILE_PATH.read_text("utf-8")) except Exception as e: logger.warning(f"在解析甲骨文封禁文件时遇到问题:{e}") return [] def is_oracle_game_banned(group_id: str) -> bool: banned_ids = load_banned_ids() return group_id in banned_ids def add_banned_id(group_id: str): banned_ids = load_banned_ids() if group_id not in banned_ids: banned_ids.append(group_id) DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8") def remove_banned_id(group_id: str): banned_ids = load_banned_ids() if group_id in banned_ids: banned_ids.remove(group_id) DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8") driver = nonebot.get_driver() @driver.on_startup async def register_startup_hook(): """注册启动时需要执行的函数""" await oracleGame.init_lexicon() @driver.on_shutdown async def register_shutdown_hook(): """注册关闭时需要执行的函数""" # 关闭所有数据库连接 await db_manager.close_all_connections() class TryStartState(Enum): STARTED = 0 ALREADY_PLAYING = 1 NO_REMAINING_TIMES = 2 class TryStopState(Enum): STOPPED = 0 NOT_PLAYING = 1 class TryVerifyState(Enum): VERIFIED = 0 NOT_ORACLE = 1 HINT_ONE = 2 HINT_TWO = 3 GAME_END = 4 class oracleGame: ALL_ORACLES = {} INSTANCE_LIST: dict[str, "oracleGame"] = {} # 群号对应的游戏实例 __inited = False def __init__(self, group_id: str): # 初始化一局游戏 self.group_id = "" self.now_playing = False self.score_board = {} self.remain_playing_times = 3 self.last_play_date = "" self.all_buff_score = 0 self.lock = asynkio.Lock() self.remain_rounds = 0 # 剩余回合数 self.current_oracle_id = "" self.wrong_attempts = 0 oracleGame.INSTANCE_LIST[group_id] = self def be_able_to_play(self) -> bool: if self.last_play_date != datetime.date.today(): self.last_play_date = datetime.date.today() self.remain_playing_times = 3 if self.remain_playing_times > 0: self.remain_playing_times -= 1 return True return False def get_oracle_image(self) -> bytes: IMAGE_PATH = ASSETS_PATH / "oracle" / "image" with open(IMAGE_PATH / self.ALL_ORACLES[self.current_oracle_id]["image"], "rb") as f: img_data = f.read() return img_data def get_oracle_name(self) -> str: return self.ALL_ORACLES.get(self.current_oracle_id, {}).get("oracle", "?")[0] @staticmethod async def random_oracle() -> str: return secrets.choice(list(oracleGame.ALL_ORACLES.keys())) async def choose_start_oracle(self) -> str: """ 随机选择一个甲骨文作为起始甲骨文 """ self.current_oracle_id = await oracleGame.random_oracle() return self.current_oracle_id @classmethod async def try_start_game(cls, group_id: str, force: bool = False) -> TryStartState: await cls.init_lexicon() if not cls.INSTANCE_LIST.get(group_id): cls(group_id) instance = cls.INSTANCE_LIST[group_id] if instance.now_playing: return TryStartState.ALREADY_PLAYING if not instance.be_able_to_play() and not force: return TryStartState.NO_REMAINING_TIMES instance.now_playing = True return TryStartState.STARTED async def start_game(self, rounds: int = 100): self.now_playing = True self.remain_rounds = rounds await self.choose_start_oracle() @classmethod def try_stop_game(cls, group_id: str) -> TryStopState: if not cls.INSTANCE_LIST.get(group_id): return TryStopState.NOT_PLAYING instance = cls.INSTANCE_LIST[group_id] if not instance.now_playing: return TryStopState.NOT_PLAYING instance.now_playing = False return TryStopState.STOPPED def clear_score_board(self): self.wrong_attempts = 0 self.score_board = {} self.all_buff_score = 0 def get_score_board(self) -> dict: return self.score_board def get_all_buff_score(self) -> int: return self.all_buff_score async def skip_oracle(self, buff_score: int = -100) -> str: """ 跳过当前甲骨文,选择下一个甲骨文 """ async with self.lock: await self._skip_oracle_async() self.add_buff_score(buff_score) return self.current_oracle_id async def _skip_oracle_async(self) -> str: self.wrong_attempts = 0 self.current_oracle_id = await oracleGame.random_oracle() return self.current_oracle_id async def try_verify_oracle(self, oracle: str, user_id: str) -> list[TryVerifyState]: """ 用户发送甲骨文 """ async with self.lock: state = await self._verify_oracle(oracle, user_id) return state async def _verify_oracle(self, oracle: str, user_id: str) -> list[TryVerifyState]: state = [] if oracle.strip() not in self.ALL_ORACLES[self.current_oracle_id].get("oracle", ""): state.append(TryVerifyState.NOT_ORACLE) self.wrong_attempts += 1 if self.wrong_attempts == 5: state.append(TryVerifyState.HINT_ONE) elif self.wrong_attempts == 10: state.append(TryVerifyState.HINT_TWO) return state if oracle.strip() == "": return [TryVerifyState.NOT_ORACLE] # 甲骨文合法,更新状态 self.wrong_attempts = 0 state.append(TryVerifyState.VERIFIED) self.add_score(user_id, 1) # 加 1 分 self.remain_rounds -= 1 if self.remain_rounds <= 0: self.now_playing = False state.append(TryVerifyState.GAME_END) else: await self._skip_oracle_async() return state def get_user_score(self, user_id: str) -> float: if user_id not in self.score_board: return 0 # 避免浮点数精度问题导致过长 handled_score = round(self.score_board[user_id]["score"] + self.all_buff_score, 1) return handled_score def add_score(self, user_id: str, score: int): if user_id not in self.score_board: self.score_board[user_id] = {"name": user_id, "score": 0} self.score_board[user_id]["score"] += score def add_buff_score(self, score: int): self.all_buff_score += score def get_playing_state(self) -> bool: return self.now_playing def get_pinyin_hint(self) -> str: return self.ALL_ORACLES[self.current_oracle_id].get("pinyin", "无") def get_meaning_hint(self) -> str: return self.ALL_ORACLES[self.current_oracle_id].get("meaning", "无") @classmethod async def init_lexicon(cls): if cls.__inited: return cls.__inited = True # 加载甲骨文 ORACLE_DATA_PATH = ASSETS_PATH / "oracle" with open(ORACLE_DATA_PATH / "zi_dict.csv", "r", encoding="utf-8-sig") as f: reader = csv.DictReader(f) # 以“子字头”为key,释文为value,构建字典 for row in reader: char = row["子字头"].strip() oracle = row["释文"].strip() img_path = row.get("路径", "").strip() cls.ALL_ORACLES[char] = { "oracle": oracle, "image": img_path, "pinyin": row.get("拼音", "").strip(), "meaning": row.get("含义", "").strip(), } logger.info(f"加载甲骨文字典,共计 {len(cls.ALL_ORACLES)} 条记录") # 解包图片资源 IMAGE_PATH = ASSETS_PATH / "oracle" / "image" if not IMAGE_PATH.exists(): IMAGE_PATH.mkdir(parents=True, exist_ok=True) # 将 image.zip 解压到 IMAGE_PATH if (ASSETS_PATH / "oracle" / "image.zip").exists(): with zipfile.ZipFile(ASSETS_PATH / "oracle" / "image.zip", "r") as zip_ref: zip_ref.extractall(IMAGE_PATH) evt = on_alconna( Alconna( "我要玩甲骨文", Args["rounds?", int], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True, ) @evt.handle() async def play_game( event: BaseEvent, target: DepLongTaskTarget, force=False, rounds: Optional[int] = 100, ): # group_id = str(event.get_session_id()) group_id = target.channel_id if is_oracle_game_banned(group_id): await evt.send( await UniMessage().text("本群已被禁止使用甲骨文功能!").export() ) return rounds = rounds or 0 if rounds <= 0: await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export()) return state = await oracleGame.try_start_game(group_id, force) if state == TryStartState.ALREADY_PLAYING: await evt.send( await UniMessage() .text("当前已有甲骨文游戏在进行中,请稍后再试!") .export() ) return if state == TryStartState.NO_REMAINING_TIMES: await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export()) return await evt.send( await UniMessage() .text( "你小子,还真有意思!\n好,甲骨文游戏开始!我发一个甲骨文,尼赖硕!" ) .export() ) instance = oracleGame.INSTANCE_LIST[group_id] await instance.start_game(rounds) # 发布甲骨文 await evt.send( await UniMessage() .image(raw=instance.get_oracle_image()) .export() ) evt = on_alconna( Alconna( "老子就是要玩甲骨文!!!", Args["rounds?", int], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True, ) @evt.handle() async def force_play_game( event: BaseEvent, target: DepLongTaskTarget, rounds: Optional[int] = 100 ): await play_game(event, target, force=True, rounds=rounds) async def end_game(event: BaseEvent, group_id: str): instance = oracleGame.INSTANCE_LIST[group_id] result_text = UniMessage().text("游戏结束!\n最终得分榜:\n") score_board = instance.get_score_board() if len(score_board) == 0: result_text += "无人得分!\n" else: # 按分数排序,名字用 at 的方式 sorted_score = sorted( score_board.items(), key=lambda x: x[1]["score"], reverse=True ) for i, (user_id, info) in enumerate(sorted_score): result_text += ( f"{i + 1}. " + UniMessage().at(user_id) + f": {round(info['score'] + instance.get_all_buff_score(), 1)} 分\n" ) await evt.send(await result_text.export()) # instance.clear_score_board() # 将实例删除 del oracleGame.INSTANCE_LIST[group_id] evt = on_alconna( Alconna("不玩了"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True ) @evt.handle() async def _(event: BaseEvent, target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id state = oracleGame.try_stop_game(group_id) if state == TryStopState.STOPPED: # 发送好吧狗图片 # 打开好吧狗本地文件 with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f: img_data = f.read() # 把好吧狗变成 GIF 格式以缩小尺寸 img_data = await convert_image_to_gif(img_data) await evt.send(await UniMessage().image(raw=img_data).export()) await end_game(event, group_id) else: # await evt.send( # await UniMessage().text("当前没有甲骨文游戏在进行中!").export() # ) return async def convert_image_to_gif(image_data: bytes) -> bytes: with Image.open(BytesIO(image_data)) as img: with BytesIO() as output: img.save(output, format="GIF") return output.getvalue() # 跳过 evt = on_alconna( Alconna("跳过甲骨文"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True ) @evt.handle() async def _(target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id instance = oracleGame.INSTANCE_LIST.get(group_id) if not instance or not instance.get_playing_state(): return # 发送哈哈狗图片 with open(ASSETS_PATH / "img" / "dog" / "haha_dog.jpg", "rb") as f: img_data = f.read() # 把哈哈狗变成 GIF 格式以缩小尺寸 img_data = await convert_image_to_gif(img_data) oracle = instance.get_oracle_name() await evt.send(await UniMessage().image(raw=img_data).export()) await evt.send(await UniMessage().text(f"你们太菜了,全部扣100分!这个甲骨文是「{oracle}」!").export()) oracle = await instance.skip_oracle(-100) await evt.send( await UniMessage() .image(raw=instance.get_oracle_image()) .export() ) def get_user_info(event: BaseEvent): if isinstance(event, DiscordMessageEvent): user_id = str(event.author.id) user_name = str(event.author.name) else: user_id = str(event.get_user_id()) user_name = str(event.get_user_id()) return user_id, user_name # 直接读取消息 evt = on_message() @evt.handle() async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id instance = oracleGame.INSTANCE_LIST.get(group_id) if not instance or not instance.get_playing_state(): return user_oracle = msg.extract_plain_text().strip() # 甲骨文应该是单个汉字 if len(user_oracle) != 1: return user_id, user_name = get_user_info(event) state = await instance.try_verify_oracle(user_oracle, user_id) if TryVerifyState.HINT_ONE in state: hint_pinyin = instance.get_pinyin_hint() await evt.send( await UniMessage() .text(f"提示:这个甲骨文的拼音是「{hint_pinyin}」") .export() ) if TryVerifyState.HINT_TWO in state: hint_meaning = instance.get_meaning_hint() await evt.send( await UniMessage() .text(f"提示:这个甲骨文的含义是「{hint_meaning}」") .export() ) if TryVerifyState.NOT_ORACLE in state: return if TryVerifyState.VERIFIED in state: await evt.send( await UniMessage() .at(user_id) .text(" 答对了!获得 1 分!") .export() ) if TryVerifyState.GAME_END in state: await evt.send(await UniMessage().text("全部回合结束!").export()) await end_game(event, group_id) return await evt.send( await UniMessage() .image(raw=instance.get_oracle_image()) .export() ) evt = on_alconna( Alconna("禁止甲骨文"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True, ) @evt.handle() async def _(event: BaseEvent, target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id add_banned_id(group_id) await evt.send(await UniMessage().text("本群已被禁止使用甲骨文功能!").export()) evt = on_alconna( Alconna("开启甲骨文"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True, ) @evt.handle() async def _(event: BaseEvent, target: DepLongTaskTarget): # group_id = str(event.get_session_id()) group_id = target.channel_id remove_banned_id(group_id) await evt.send(await UniMessage().text("本群已开启甲骨文功能!").export())