diff --git a/.gitmodules b/.gitmodules index 64cb43c..251f4d9 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "assets/lexicon/THUOCL"] path = assets/lexicon/THUOCL url = https://github.com/thunlp/THUOCL.git +[submodule "assets/oracle"] + path = assets/oracle + url = https://gitea.service.jazzwhom.top/mttu-developers/oracle-source.git diff --git a/assets/oracle b/assets/oracle new file mode 160000 index 0000000..29eea55 --- /dev/null +++ b/assets/oracle @@ -0,0 +1 @@ +Subproject commit 29eea55632f7a8a51ac0c758798dfbd1a23676f3 diff --git a/konabot/plugins/hanzi/__init__.py b/konabot/plugins/hanzi/__init__.py index e165327..a50f34b 100644 --- a/konabot/plugins/hanzi/__init__.py +++ b/konabot/plugins/hanzi/__init__.py @@ -2,7 +2,6 @@ import random from typing import Optional import opencc -from nonebot import on_message from nonebot.adapters import Event as BaseEvent from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent from nonebot_plugin_alconna import ( @@ -214,4 +213,19 @@ async def _(msg: UniMsg, event: BaseEvent, source: Optional[str] = None): final_text = random_string(to_convert) converted_prefix = convert(random_string("转换结果"), "s", "s") - await evt.send(await UniMessage().text(f"{converted_prefix}:{final_text}").export()) \ No newline at end of file + await evt.send(await UniMessage().text(f"{converted_prefix}:{final_text}").export()) + +def get_char(char: str, abbr: str) -> str: + output = "" + for src_abbr in ["s","hk","jp","tw","t"]: + if src_abbr != abbr: + output += convert(char, src_abbr, abbr) + return output + +def get_all_variants(char: str) -> str: + output = "" + for abbr in ["s","hk","jp","tw","t"]: + for src_abbr in ["s","hk","jp","tw","t"]: + if src_abbr != abbr: + output += convert(char, src_abbr, abbr) + return output \ No newline at end of file diff --git a/konabot/plugins/oracle_game/__init__.py b/konabot/plugins/oracle_game/__init__.py new file mode 100644 index 0000000..3101ed7 --- /dev/null +++ b/konabot/plugins/oracle_game/__init__.py @@ -0,0 +1,487 @@ +import asyncio as asynkio +import datetime +import json +import secrets +import csv +import zipfile +from enum import Enum +from pathlib import Path +from typing import Optional + +from loguru import logger +from nonebot import on_message +import nonebot +from nonebot.adapters import Event as BaseEvent +from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent +from nonebot_plugin_alconna import ( + Alconna, + Args, + UniMessage, + UniMsg, + on_alconna, +) + +from konabot.common.database import DatabaseManager +from konabot.common.longtask import DepLongTaskTarget +from konabot.common.path import ASSETS_PATH + +from konabot.plugins.hanzi import get_char + +ROOT_PATH = Path(__file__).resolve().parent + +DATA_DIR = Path(__file__).parent.parent.parent.parent / "data" + +DATA_FILE_PATH = ( + DATA_DIR / "oracle_banned.json" +) + +# 创建全局数据库管理器实例 +db_manager = DatabaseManager() + +def load_banned_ids() -> list[str]: + if not DATA_FILE_PATH.exists(): + return [] + try: + return json.loads(DATA_FILE_PATH.read_text("utf-8")) + except Exception as e: + logger.warning(f"在解析甲骨文封禁文件时遇到问题:{e}") + return [] + + +def is_oracle_game_banned(group_id: str) -> bool: + banned_ids = load_banned_ids() + return group_id in banned_ids + + +def add_banned_id(group_id: str): + banned_ids = load_banned_ids() + if group_id not in banned_ids: + banned_ids.append(group_id) + DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8") + + +def remove_banned_id(group_id: str): + banned_ids = load_banned_ids() + if group_id in banned_ids: + banned_ids.remove(group_id) + DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8") + + +driver = nonebot.get_driver() + + +@driver.on_startup +async def register_startup_hook(): + """注册启动时需要执行的函数""" + await oracleGame.init_lexicon() + +@driver.on_shutdown +async def register_shutdown_hook(): + """注册关闭时需要执行的函数""" + # 关闭所有数据库连接 + await db_manager.close_all_connections() + + +class TryStartState(Enum): + STARTED = 0 + ALREADY_PLAYING = 1 + NO_REMAINING_TIMES = 2 + + +class TryStopState(Enum): + STOPPED = 0 + NOT_PLAYING = 1 + +class TryVerifyState(Enum): + VERIFIED = 0 + NOT_ORACLE = 1 + GAME_END = 2 + +class oracleGame: + ALL_ORACLES = {} + INSTANCE_LIST: dict[str, "oracleGame"] = {} # 群号对应的游戏实例 + __inited = False + + def __init__(self, group_id: str): + # 初始化一局游戏 + self.group_id = "" + self.now_playing = False + self.score_board = {} + self.remain_playing_times = 3 + self.last_play_date = "" + self.all_buff_score = 0 + self.lock = asynkio.Lock() + self.remain_rounds = 0 # 剩余回合数 + self.current_oracle_id = "" + oracleGame.INSTANCE_LIST[group_id] = self + + def be_able_to_play(self) -> bool: + if self.last_play_date != datetime.date.today(): + self.last_play_date = datetime.date.today() + self.remain_playing_times = 3 + if self.remain_playing_times > 0: + self.remain_playing_times -= 1 + return True + return False + + def get_oracle_image(self) -> bytes: + IMAGE_PATH = ASSETS_PATH / "oracle" / "image" + with open(IMAGE_PATH / self.current_oracle_id / self.current_oracle_id / f"{self.current_oracle_id}.png", "rb") as f: + img_data = f.read() + return img_data + + def get_oracle_name(self) -> str: + return self.ALL_ORACLES.get(self.current_oracle_id, "?")[0] + + @staticmethod + async def random_oracle() -> str: + return secrets.choice(list(oracleGame.ALL_ORACLES.keys())) + + async def choose_start_oracle(self) -> str: + """ + 随机选择一个甲骨文作为起始甲骨文 + """ + self.current_oracle_id = await oracleGame.random_oracle() + return self.current_oracle_id + + @classmethod + async def try_start_game(cls, group_id: str, force: bool = False) -> TryStartState: + await cls.init_lexicon() + if not cls.INSTANCE_LIST.get(group_id): + cls(group_id) + instance = cls.INSTANCE_LIST[group_id] + if instance.now_playing: + return TryStartState.ALREADY_PLAYING + if not instance.be_able_to_play() and not force: + return TryStartState.NO_REMAINING_TIMES + instance.now_playing = True + return TryStartState.STARTED + + async def start_game(self, rounds: int = 100): + self.now_playing = True + self.remain_rounds = rounds + await self.choose_start_oracle() + + @classmethod + def try_stop_game(cls, group_id: str) -> TryStopState: + if not cls.INSTANCE_LIST.get(group_id): + return TryStopState.NOT_PLAYING + instance = cls.INSTANCE_LIST[group_id] + if not instance.now_playing: + return TryStopState.NOT_PLAYING + instance.now_playing = False + return TryStopState.STOPPED + + def clear_score_board(self): + self.score_board = {} + self.all_buff_score = 0 + + def get_score_board(self) -> dict: + return self.score_board + + def get_all_buff_score(self) -> int: + return self.all_buff_score + + async def skip_oracle(self, buff_score: int = -100) -> str: + """ + 跳过当前甲骨文,选择下一个甲骨文 + """ + async with self.lock: + await self._skip_oracle_async() + self.add_buff_score(buff_score) + return self.current_oracle_id + + async def _skip_oracle_async(self) -> str: + self.current_oracle_id = await oracleGame.random_oracle() + return self.current_oracle_id + + async def try_verify_oracle(self, oracle: str, user_id: str) -> list[TryVerifyState]: + """ + 用户发送甲骨文 + """ + async with self.lock: + state = await self._verify_oracle(oracle, user_id) + return state + + async def _verify_oracle(self, oracle: str, user_id: str) -> list[TryVerifyState]: + state = [] + if oracle not in self.ALL_ORACLES[self.current_oracle_id]: + return [TryVerifyState.NOT_ORACLE] + # 甲骨文合法,更新状态 + self.add_score(user_id, 1) # 加 1 分 + self.remain_rounds -= 1 + if self.remain_rounds <= 0: + self.now_playing = False + state.append(TryVerifyState.GAME_END) + return state + + def get_user_score(self, user_id: str) -> float: + if user_id not in self.score_board: + return 0 + # 避免浮点数精度问题导致过长 + handled_score = round(self.score_board[user_id]["score"] + self.all_buff_score, 1) + return handled_score + + def add_score(self, user_id: str, score: int): + if user_id not in self.score_board: + self.score_board[user_id] = {"name": user_id, "score": 0} + self.score_board[user_id]["score"] += score + + def add_buff_score(self, score: int): + self.all_buff_score += score + + def get_playing_state(self) -> bool: + return self.now_playing + + @classmethod + async def init_lexicon(cls): + if cls.__inited: + return + cls.__inited = True + + # 加载甲骨文 + ORACLE_DATA_PATH = ASSETS_PATH / "oracle" + + with open(ORACLE_DATA_PATH / "zi_dict.csv", "r", encoding="utf-8-sig") as f: + reader = csv.DictReader(f) + # 以“子字头”为key,释文为value,构建字典 + for row in reader: + char = row["子字头"].strip() + oracle = row["释文"].strip() + cls.ALL_ORACLES[char] = oracle + + logger.info(f"加载甲骨文字典,共计 {len(cls.ALL_ORACLES)} 条记录") + + # 解包图片资源 + IMAGE_PATH = ASSETS_PATH / "oracle" / "image" + + if not IMAGE_PATH.exists(): + IMAGE_PATH.mkdir(parents=True, exist_ok=True) + # 将 image.zip 解压到 IMAGE_PATH + if (ASSETS_PATH / "oracle" / "image.zip").exists(): + with zipfile.ZipFile(ASSETS_PATH / "oracle" / "image.zip", "r") as zip_ref: + zip_ref.extractall(IMAGE_PATH) + + + +evt = on_alconna( + Alconna( + "我要玩甲骨文", + Args["rounds?", int], + ), + use_cmd_start=True, + use_cmd_sep=False, + skip_for_unmatch=True, +) + + +@evt.handle() +async def play_game( + event: BaseEvent, + target: DepLongTaskTarget, + force=False, + rounds: Optional[int] = 100, +): + # group_id = str(event.get_session_id()) + group_id = target.channel_id + if is_oracle_game_banned(group_id): + await evt.send( + await UniMessage().text("本群已被禁止使用甲骨文功能!").export() + ) + return + rounds = rounds or 0 + if rounds <= 0: + await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export()) + return + state = await oracleGame.try_start_game(group_id, force) + if state == TryStartState.ALREADY_PLAYING: + await evt.send( + await UniMessage() + .text("当前已有甲骨文游戏在进行中,请稍后再试!") + .export() + ) + return + if state == TryStartState.NO_REMAINING_TIMES: + await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export()) + return + await evt.send( + await UniMessage() + .text( + "你小子,还真有意思!\n好,甲骨文游戏开始!我发一个甲骨文,尼赖硕!" + ) + .export() + ) + instance = oracleGame.INSTANCE_LIST[group_id] + await instance.start_game(rounds) + # 发布甲骨文 + await evt.send( + await UniMessage() + .image(raw=instance.get_oracle_image()) + .export() + ) + + +evt = on_alconna( + Alconna( + "老子就是要玩甲骨文!!!", + Args["rounds?", int], + ), + use_cmd_start=True, + use_cmd_sep=False, + skip_for_unmatch=True, +) + + +@evt.handle() +async def force_play_game( + event: BaseEvent, target: DepLongTaskTarget, rounds: Optional[int] = 100 +): + await play_game(event, target, force=True, rounds=rounds) + + +async def end_game(event: BaseEvent, group_id: str): + instance = oracleGame.INSTANCE_LIST[group_id] + result_text = UniMessage().text("游戏结束!\n最终得分榜:\n") + score_board = instance.get_score_board() + if len(score_board) == 0: + result_text += "无人得分!\n" + else: + # 按分数排序,名字用 at 的方式 + sorted_score = sorted( + score_board.items(), key=lambda x: x[1]["score"], reverse=True + ) + for i, (user_id, info) in enumerate(sorted_score): + result_text += ( + f"{i + 1}. " + + UniMessage().at(user_id) + + f": {round(info['score'] + instance.get_all_buff_score(), 1)} 分\n" + ) + await evt.send(await result_text.export()) + # instance.clear_score_board() + # 将实例删除 + del oracleGame.INSTANCE_LIST[group_id] + + +evt = on_alconna( + Alconna("不玩了"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True +) + + +@evt.handle() +async def _(event: BaseEvent, target: DepLongTaskTarget): + # group_id = str(event.get_session_id()) + group_id = target.channel_id + state = oracleGame.try_stop_game(group_id) + if state == TryStopState.STOPPED: + # 发送好吧狗图片 + # 打开好吧狗本地文件 + with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f: + img_data = f.read() + await evt.send(await UniMessage().image(raw=img_data).export()) + await end_game(event, group_id) + else: + await evt.send( + await UniMessage().text("当前没有甲骨文游戏在进行中!").export() + ) + + +# 跳过 +evt = on_alconna( + Alconna("跳过甲骨文"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True +) + + +@evt.handle() +async def _(target: DepLongTaskTarget): + # group_id = str(event.get_session_id()) + group_id = target.channel_id + instance = oracleGame.INSTANCE_LIST.get(group_id) + if not instance or not instance.get_playing_state(): + return + # 发送哈哈狗图片 + with open(ASSETS_PATH / "img" / "dog" / "haha_dog.jpg", "rb") as f: + img_data = f.read() + oracle = instance.get_oracle_name() + await evt.send(await UniMessage().image(raw=img_data).export()) + await evt.send(await UniMessage().text(f"你们太菜了,全部扣100分!这个甲骨文是「{oracle}」!").export()) + oracle = await instance.skip_oracle(-100) + await evt.send( + await UniMessage() + .image(raw=instance.get_oracle_image()) + .export() + ) + + +def get_user_info(event: BaseEvent): + if isinstance(event, DiscordMessageEvent): + user_id = str(event.author.id) + user_name = str(event.author.name) + else: + user_id = str(event.get_user_id()) + user_name = str(event.get_user_id()) + return user_id, user_name + + +# 直接读取消息 +evt = on_message() + + +@evt.handle() +async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget): + # group_id = str(event.get_session_id()) + group_id = target.channel_id + instance = oracleGame.INSTANCE_LIST.get(group_id) + if not instance or not instance.get_playing_state(): + return + user_oracle = msg.extract_plain_text().strip() + user_id, user_name = get_user_info(event) + state = await instance.try_verify_oracle(user_oracle, user_id) + if TryVerifyState.NOT_ORACLE in state: + return + if TryVerifyState.VERIFIED: + await evt.send( + await UniMessage() + .text(f"{user_name} 答对了!获得 1 分!") + .export() + ) + if TryVerifyState.GAME_END in state: + await evt.send(await UniMessage().text("全部回合结束!").export()) + await end_game(event, group_id) + return + await evt.send( + await UniMessage() + .image(raw=instance.get_oracle_image()) + .export() + ) + + +evt = on_alconna( + Alconna("禁止甲骨文"), + use_cmd_start=True, + use_cmd_sep=False, + skip_for_unmatch=True, +) + + +@evt.handle() +async def _(event: BaseEvent, target: DepLongTaskTarget): + # group_id = str(event.get_session_id()) + group_id = target.channel_id + add_banned_id(group_id) + await evt.send(await UniMessage().text("本群已被禁止使用甲骨文功能!").export()) + + +evt = on_alconna( + Alconna("开启甲骨文"), + use_cmd_start=True, + use_cmd_sep=False, + skip_for_unmatch=True, +) + + +@evt.handle() +async def _(event: BaseEvent, target: DepLongTaskTarget): + # group_id = str(event.get_session_id()) + group_id = target.channel_id + remove_banned_id(group_id) + await evt.send(await UniMessage().text("本群已开启甲骨文功能!").export())