甲骨文

This commit is contained in:
2025-12-16 15:40:18 +08:00
parent ef617e1c85
commit 7026337a43
4 changed files with 507 additions and 2 deletions

3
.gitmodules vendored
View File

@ -1,3 +1,6 @@
[submodule "assets/lexicon/THUOCL"]
path = assets/lexicon/THUOCL
url = https://github.com/thunlp/THUOCL.git
[submodule "assets/oracle"]
path = assets/oracle
url = https://gitea.service.jazzwhom.top/mttu-developers/oracle-source.git

1
assets/oracle Submodule

Submodule assets/oracle added at 29eea55632

View File

@ -2,7 +2,6 @@ import random
from typing import Optional
import opencc
from nonebot import on_message
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import (
@ -215,3 +214,18 @@ async def _(msg: UniMsg, event: BaseEvent, source: Optional[str] = None):
converted_prefix = convert(random_string("转换结果"), "s", "s")
await evt.send(await UniMessage().text(f"{converted_prefix}{final_text}").export())
def get_char(char: str, abbr: str) -> str:
output = ""
for src_abbr in ["s","hk","jp","tw","t"]:
if src_abbr != abbr:
output += convert(char, src_abbr, abbr)
return output
def get_all_variants(char: str) -> str:
output = ""
for abbr in ["s","hk","jp","tw","t"]:
for src_abbr in ["s","hk","jp","tw","t"]:
if src_abbr != abbr:
output += convert(char, src_abbr, abbr)
return output

View File

@ -0,0 +1,487 @@
import asyncio as asynkio
import datetime
import json
import secrets
import csv
import zipfile
from enum import Enum
from pathlib import Path
from typing import Optional
from loguru import logger
from nonebot import on_message
import nonebot
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import (
Alconna,
Args,
UniMessage,
UniMsg,
on_alconna,
)
from konabot.common.database import DatabaseManager
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
from konabot.plugins.hanzi import get_char
ROOT_PATH = Path(__file__).resolve().parent
DATA_DIR = Path(__file__).parent.parent.parent.parent / "data"
DATA_FILE_PATH = (
DATA_DIR / "oracle_banned.json"
)
# 创建全局数据库管理器实例
db_manager = DatabaseManager()
def load_banned_ids() -> list[str]:
if not DATA_FILE_PATH.exists():
return []
try:
return json.loads(DATA_FILE_PATH.read_text("utf-8"))
except Exception as e:
logger.warning(f"在解析甲骨文封禁文件时遇到问题:{e}")
return []
def is_oracle_game_banned(group_id: str) -> bool:
banned_ids = load_banned_ids()
return group_id in banned_ids
def add_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id not in banned_ids:
banned_ids.append(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
def remove_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id in banned_ids:
banned_ids.remove(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
driver = nonebot.get_driver()
@driver.on_startup
async def register_startup_hook():
"""注册启动时需要执行的函数"""
await oracleGame.init_lexicon()
@driver.on_shutdown
async def register_shutdown_hook():
"""注册关闭时需要执行的函数"""
# 关闭所有数据库连接
await db_manager.close_all_connections()
class TryStartState(Enum):
STARTED = 0
ALREADY_PLAYING = 1
NO_REMAINING_TIMES = 2
class TryStopState(Enum):
STOPPED = 0
NOT_PLAYING = 1
class TryVerifyState(Enum):
VERIFIED = 0
NOT_ORACLE = 1
GAME_END = 2
class oracleGame:
ALL_ORACLES = {}
INSTANCE_LIST: dict[str, "oracleGame"] = {} # 群号对应的游戏实例
__inited = False
def __init__(self, group_id: str):
# 初始化一局游戏
self.group_id = ""
self.now_playing = False
self.score_board = {}
self.remain_playing_times = 3
self.last_play_date = ""
self.all_buff_score = 0
self.lock = asynkio.Lock()
self.remain_rounds = 0 # 剩余回合数
self.current_oracle_id = ""
oracleGame.INSTANCE_LIST[group_id] = self
def be_able_to_play(self) -> bool:
if self.last_play_date != datetime.date.today():
self.last_play_date = datetime.date.today()
self.remain_playing_times = 3
if self.remain_playing_times > 0:
self.remain_playing_times -= 1
return True
return False
def get_oracle_image(self) -> bytes:
IMAGE_PATH = ASSETS_PATH / "oracle" / "image"
with open(IMAGE_PATH / self.current_oracle_id / self.current_oracle_id / f"{self.current_oracle_id}.png", "rb") as f:
img_data = f.read()
return img_data
def get_oracle_name(self) -> str:
return self.ALL_ORACLES.get(self.current_oracle_id, "?")[0]
@staticmethod
async def random_oracle() -> str:
return secrets.choice(list(oracleGame.ALL_ORACLES.keys()))
async def choose_start_oracle(self) -> str:
"""
随机选择一个甲骨文作为起始甲骨文
"""
self.current_oracle_id = await oracleGame.random_oracle()
return self.current_oracle_id
@classmethod
async def try_start_game(cls, group_id: str, force: bool = False) -> TryStartState:
await cls.init_lexicon()
if not cls.INSTANCE_LIST.get(group_id):
cls(group_id)
instance = cls.INSTANCE_LIST[group_id]
if instance.now_playing:
return TryStartState.ALREADY_PLAYING
if not instance.be_able_to_play() and not force:
return TryStartState.NO_REMAINING_TIMES
instance.now_playing = True
return TryStartState.STARTED
async def start_game(self, rounds: int = 100):
self.now_playing = True
self.remain_rounds = rounds
await self.choose_start_oracle()
@classmethod
def try_stop_game(cls, group_id: str) -> TryStopState:
if not cls.INSTANCE_LIST.get(group_id):
return TryStopState.NOT_PLAYING
instance = cls.INSTANCE_LIST[group_id]
if not instance.now_playing:
return TryStopState.NOT_PLAYING
instance.now_playing = False
return TryStopState.STOPPED
def clear_score_board(self):
self.score_board = {}
self.all_buff_score = 0
def get_score_board(self) -> dict:
return self.score_board
def get_all_buff_score(self) -> int:
return self.all_buff_score
async def skip_oracle(self, buff_score: int = -100) -> str:
"""
跳过当前甲骨文,选择下一个甲骨文
"""
async with self.lock:
await self._skip_oracle_async()
self.add_buff_score(buff_score)
return self.current_oracle_id
async def _skip_oracle_async(self) -> str:
self.current_oracle_id = await oracleGame.random_oracle()
return self.current_oracle_id
async def try_verify_oracle(self, oracle: str, user_id: str) -> list[TryVerifyState]:
"""
用户发送甲骨文
"""
async with self.lock:
state = await self._verify_oracle(oracle, user_id)
return state
async def _verify_oracle(self, oracle: str, user_id: str) -> list[TryVerifyState]:
state = []
if oracle not in self.ALL_ORACLES[self.current_oracle_id]:
return [TryVerifyState.NOT_ORACLE]
# 甲骨文合法,更新状态
self.add_score(user_id, 1) # 加 1 分
self.remain_rounds -= 1
if self.remain_rounds <= 0:
self.now_playing = False
state.append(TryVerifyState.GAME_END)
return state
def get_user_score(self, user_id: str) -> float:
if user_id not in self.score_board:
return 0
# 避免浮点数精度问题导致过长
handled_score = round(self.score_board[user_id]["score"] + self.all_buff_score, 1)
return handled_score
def add_score(self, user_id: str, score: int):
if user_id not in self.score_board:
self.score_board[user_id] = {"name": user_id, "score": 0}
self.score_board[user_id]["score"] += score
def add_buff_score(self, score: int):
self.all_buff_score += score
def get_playing_state(self) -> bool:
return self.now_playing
@classmethod
async def init_lexicon(cls):
if cls.__inited:
return
cls.__inited = True
# 加载甲骨文
ORACLE_DATA_PATH = ASSETS_PATH / "oracle"
with open(ORACLE_DATA_PATH / "zi_dict.csv", "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
# 以“子字头”为key释文为value构建字典
for row in reader:
char = row["子字头"].strip()
oracle = row["释文"].strip()
cls.ALL_ORACLES[char] = oracle
logger.info(f"加载甲骨文字典,共计 {len(cls.ALL_ORACLES)} 条记录")
# 解包图片资源
IMAGE_PATH = ASSETS_PATH / "oracle" / "image"
if not IMAGE_PATH.exists():
IMAGE_PATH.mkdir(parents=True, exist_ok=True)
# 将 image.zip 解压到 IMAGE_PATH
if (ASSETS_PATH / "oracle" / "image.zip").exists():
with zipfile.ZipFile(ASSETS_PATH / "oracle" / "image.zip", "r") as zip_ref:
zip_ref.extractall(IMAGE_PATH)
evt = on_alconna(
Alconna(
"我要玩甲骨文",
Args["rounds?", int],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def play_game(
event: BaseEvent,
target: DepLongTaskTarget,
force=False,
rounds: Optional[int] = 100,
):
# group_id = str(event.get_session_id())
group_id = target.channel_id
if is_oracle_game_banned(group_id):
await evt.send(
await UniMessage().text("本群已被禁止使用甲骨文功能!").export()
)
return
rounds = rounds or 0
if rounds <= 0:
await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export())
return
state = await oracleGame.try_start_game(group_id, force)
if state == TryStartState.ALREADY_PLAYING:
await evt.send(
await UniMessage()
.text("当前已有甲骨文游戏在进行中,请稍后再试!")
.export()
)
return
if state == TryStartState.NO_REMAINING_TIMES:
await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export())
return
await evt.send(
await UniMessage()
.text(
"你小子,还真有意思!\n好,甲骨文游戏开始!我发一个甲骨文,尼赖硕!"
)
.export()
)
instance = oracleGame.INSTANCE_LIST[group_id]
await instance.start_game(rounds)
# 发布甲骨文
await evt.send(
await UniMessage()
.image(raw=instance.get_oracle_image())
.export()
)
evt = on_alconna(
Alconna(
"老子就是要玩甲骨文!!!",
Args["rounds?", int],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def force_play_game(
event: BaseEvent, target: DepLongTaskTarget, rounds: Optional[int] = 100
):
await play_game(event, target, force=True, rounds=rounds)
async def end_game(event: BaseEvent, group_id: str):
instance = oracleGame.INSTANCE_LIST[group_id]
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
score_board = instance.get_score_board()
if len(score_board) == 0:
result_text += "无人得分!\n"
else:
# 按分数排序,名字用 at 的方式
sorted_score = sorted(
score_board.items(), key=lambda x: x[1]["score"], reverse=True
)
for i, (user_id, info) in enumerate(sorted_score):
result_text += (
f"{i + 1}. "
+ UniMessage().at(user_id)
+ f": {round(info['score'] + instance.get_all_buff_score(), 1)}\n"
)
await evt.send(await result_text.export())
# instance.clear_score_board()
# 将实例删除
del oracleGame.INSTANCE_LIST[group_id]
evt = on_alconna(
Alconna("不玩了"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
state = oracleGame.try_stop_game(group_id)
if state == TryStopState.STOPPED:
# 发送好吧狗图片
# 打开好吧狗本地文件
with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f:
img_data = f.read()
await evt.send(await UniMessage().image(raw=img_data).export())
await end_game(event, group_id)
else:
await evt.send(
await UniMessage().text("当前没有甲骨文游戏在进行中!").export()
)
# 跳过
evt = on_alconna(
Alconna("跳过甲骨文"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
)
@evt.handle()
async def _(target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
instance = oracleGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
# 发送哈哈狗图片
with open(ASSETS_PATH / "img" / "dog" / "haha_dog.jpg", "rb") as f:
img_data = f.read()
oracle = instance.get_oracle_name()
await evt.send(await UniMessage().image(raw=img_data).export())
await evt.send(await UniMessage().text(f"你们太菜了全部扣100分这个甲骨文是「{oracle}」!").export())
oracle = await instance.skip_oracle(-100)
await evt.send(
await UniMessage()
.image(raw=instance.get_oracle_image())
.export()
)
def get_user_info(event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
user_id = str(event.author.id)
user_name = str(event.author.name)
else:
user_id = str(event.get_user_id())
user_name = str(event.get_user_id())
return user_id, user_name
# 直接读取消息
evt = on_message()
@evt.handle()
async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
instance = oracleGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
user_oracle = msg.extract_plain_text().strip()
user_id, user_name = get_user_info(event)
state = await instance.try_verify_oracle(user_oracle, user_id)
if TryVerifyState.NOT_ORACLE in state:
return
if TryVerifyState.VERIFIED:
await evt.send(
await UniMessage()
.text(f"{user_name} 答对了!获得 1 分!")
.export()
)
if TryVerifyState.GAME_END in state:
await evt.send(await UniMessage().text("全部回合结束!").export())
await end_game(event, group_id)
return
await evt.send(
await UniMessage()
.image(raw=instance.get_oracle_image())
.export()
)
evt = on_alconna(
Alconna("禁止甲骨文"),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
add_banned_id(group_id)
await evt.send(await UniMessage().text("本群已被禁止使用甲骨文功能!").export())
evt = on_alconna(
Alconna("开启甲骨文"),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
remove_banned_id(group_id)
await evt.send(await UniMessage().text("本群已开启甲骨文功能!").export())