新成语接龙
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing

This commit is contained in:
2025-10-19 01:25:34 +08:00
parent 9f3f79f51d
commit 8f560ce1ba

View File

@ -1,8 +1,13 @@
import asyncio as asynkio
import base64
from pathlib import Path
import secrets
import json
from typing import Literal
import datetime
from typing import Literal, Optional
from enum import Enum
from loguru import logger
from nonebot import on_message
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
@ -12,120 +17,302 @@ from nonebot_plugin_alconna import (Alconna, Args, Field, Subcommand,
from konabot.common.path import ASSETS_PATH
ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语
IDIOM_FIRST_CHAR = {} # 成语首字字典
DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json"
INITED = False
def init_lexicon():
global ALL_WORDS, ALL_IDIOMS, IDIOM_FIRST_CHAR
# 成语大表
with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f:
ALL_IDIOMS_INFOS = json.load(f)
# 词语大表
with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f:
ALL_WORDS = json.load(f)
def load_banned_ids() -> list[str]:
if not DATA_FILE_PATH.exists():
return []
try:
return json.loads(DATA_FILE_PATH.read_text())
except Exception as e:
logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}")
return []
COMMON_WORDS = []
# 读取 COMMON 词语大表
with open(ASSETS_PATH / "lexicon" / "common.txt", "r", encoding="utf-8") as f:
for line in f:
word = line.strip()
if len(word) == 4:
COMMON_WORDS.append(word)
def is_idiom_game_banned(group_id: str) -> bool:
banned_ids = load_banned_ids()
return group_id in banned_ids
# 读取 THUOCL 成语库
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8") as f:
THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f]
def add_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id not in banned_ids:
banned_ids.append(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
THUOCL_WORDS = []
import os
for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"):
if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt":
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8") as f:
for line in f:
word = line.lstrip().split(" ")[0].strip()
if len(word) == 4:
THUOCL_WORDS.append(word)
def remove_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id in banned_ids:
banned_ids.remove(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
class TryStartState(Enum):
STARTED = 0
ALREADY_PLAYING = 1
NO_REMAINING_TIMES = 2
class TryStopState(Enum):
STOPPED = 0
NOT_PLAYING = 1
class TryVerifyState(Enum):
VERIFIED = 0
NOT_IDIOM = 1
WRONG_FIRST_CHAR = 2
VERIFIED_BUT_NO_NEXT = 3
VERIFIED_GAME_END = 4
class IdiomGame:
ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语
INSTANCE_LIST : dict[str, "IdiomGame"] = {} # 群号对应的游戏实例
IDIOM_FIRST_CHAR = {} # 成语首字字典
__inited = False
def __init__(self, group_id: str):
# 初始化一局游戏
self.group_id = ""
self.now_playing = False
self.score_board = {}
self.last_idiom = ""
self.last_char = ""
self.remain_playing_times = 3
self.last_play_date = ""
self.all_buff_score = 0
self.lock = asynkio.Lock()
self.remain_rounds = 0 # 剩余回合数
IdiomGame.INSTANCE_LIST[group_id] = self
def be_able_to_play(self) -> bool:
if(self.last_play_date != datetime.date.today()):
self.last_play_date = datetime.date.today()
self.remain_playing_times = 1
if(self.remain_playing_times > 0):
self.remain_playing_times -= 1
return True
return False
def choose_start_idiom(self) -> str:
'''
随机选择一个成语作为起始成语
'''
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_char = self.last_idiom[-1]
if not self.is_nextable(self.last_char):
self.choose_start_idiom()
return self.last_idiom
@classmethod
def try_start_game(cls, group_id: str, force: bool = False) -> TryStartState:
cls.init_lexicon()
if not cls.INSTANCE_LIST.get(group_id):
cls(group_id)
instance = cls.INSTANCE_LIST[group_id]
if instance.now_playing:
return TryStartState.ALREADY_PLAYING
if not instance.be_able_to_play() and not force:
return TryStartState.NO_REMAINING_TIMES
instance.now_playing = True
return TryStartState.STARTED
def start_game(self, rounds: int = 100):
self.now_playing = True
self.remain_rounds = rounds
self.choose_start_idiom()
@classmethod
def try_stop_game(cls, group_id: str) -> TryStopState:
if not cls.INSTANCE_LIST.get(group_id):
return TryStopState.NOT_PLAYING
instance = cls.INSTANCE_LIST[group_id]
if not instance.now_playing:
return TryStopState.NOT_PLAYING
instance.now_playing = False
return TryStopState.STOPPED
def clear_score_board(self):
self.score_board = {}
self.last_char = ""
def get_score_board(self) -> dict:
return self.score_board
def get_all_buff_score(self) -> int:
return self.all_buff_score
async def skip_idiom(self, buff_score: int = -100) -> str:
'''
跳过当前成语,选择下一个成语
'''
await self.lock.acquire()
self._skip_idiom_async(buff_score)
self.lock.release()
return self.last_idiom
def _skip_idiom_async(self, buff_score: int = -100) -> str:
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_char = self.last_idiom[-1]
self.add_buff_score(buff_score)
return self.last_idiom
async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
'''
用户发送成语
'''
await self.lock.acquire()
state = self._verify_idiom(idiom, user_id)
self.lock.release()
return state
def is_nextable(self, last_char: str) -> bool:
'''
判断是否有成语可以接
'''
return last_char in IdiomGame.IDIOM_FIRST_CHAR
# 只有成语的大表
ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
ALL_IDIOMS = list(set(ALL_IDIOMS)) # 去重
def _verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
# 新成语的首字应与上一条成语的尾字相同
if idiom[0] != self.last_char:
return TryVerifyState.WRONG_FIRST_CHAR
if(idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS):
self.add_score(user_id, -0.1)
return TryVerifyState.NOT_IDIOM
self.last_idiom = idiom
self.last_char = idiom[-1]
self.add_score(user_id, 1)
self.remain_rounds -= 1
if(self.remain_rounds <= 0):
self.now_playing = False
return TryVerifyState.VERIFIED_GAME_END
if(not self.is_nextable(self.last_char)):
# 没有成语可以接了,自动跳过
self._skip_idiom_async()
return TryVerifyState.VERIFIED_BUT_NO_NEXT
return TryVerifyState.VERIFIED
def get_user_score(self, user_id: str) -> int:
if user_id not in self.score_board:
return 0
return self.score_board[user_id]["score"]
def add_score(self, user_id: str, score: int):
if user_id not in self.score_board:
self.score_board[user_id] = {
"name": user_id,
"score": 0
}
self.score_board[user_id]["score"] += score
def add_buff_score(self, score: int):
self.all_buff_score += score
# 其他四字词语表,仅表示可以有这个词
ALL_WORDS = [word for word in ALL_WORDS if len(word) == 4] + THUOCL_WORDS + COMMON_WORDS
ALL_WORDS = list(set(ALL_WORDS)) # 去重
def get_playing_state(self) -> bool:
return self.now_playing
def get_last_char(self) -> str:
return self.last_char
# 根据成语大表,划分出成语首字字典
IDIOM_FIRST_CHAR = {}
for idiom in ALL_IDIOMS + ALL_WORDS:
if idiom[0] not in IDIOM_FIRST_CHAR:
IDIOM_FIRST_CHAR[idiom[0]] = []
IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
@classmethod
def init_lexicon(cls):
if(cls.__inited):
return
cls.__inited = True
NOW_PLAYING = False
# 成语大表
with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f:
ALL_IDIOMS_INFOS = json.load(f)
SCORE_BOARD = {}
# 词语大表
with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f:
cls.ALL_WORDS = json.load(f)
LAST_CHAR = ""
COMMON_WORDS = []
# 读取 COMMON 词语大表
with open(ASSETS_PATH / "lexicon" / "common.txt", "r", encoding="utf-8") as f:
for line in f:
word = line.strip()
if len(word) == 4:
COMMON_WORDS.append(word)
USER_NAME_CACHE = {} # 缓存用户名称,避免多次获取
# 读取 THUOCL 成语库
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8") as f:
THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f]
REMAIN_PLAYING_TIMES = 1
LAST_PLAY_DATE = ""
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
THUOCL_WORDS = []
import os
for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"):
if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt":
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8") as f:
for line in f:
word = line.lstrip().split(" ")[0].strip()
if len(word) == 4:
THUOCL_WORDS.append(word)
LOCK = False
ALL_BUFF_SCORE = 0 # 全体分数
# 只有成语的大表
cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重
import datetime
# 其他四字词语表,仅表示可以有这个词
cls.ALL_WORDS = [word for word in cls.ALL_WORDS if len(word) == 4] + THUOCL_WORDS + COMMON_WORDS
cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重
def be_able_to_play():
global REMAIN_PLAYING_TIMES, LAST_PLAY_DATE
if(LAST_PLAY_DATE != datetime.date.today()):
LAST_PLAY_DATE = datetime.date.today()
REMAIN_PLAYING_TIMES = 1
if(REMAIN_PLAYING_TIMES > 0):
REMAIN_PLAYING_TIMES -= 1
return True
return False
# 根据成语大表,划分出成语首字字典
for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS:
if idiom[0] not in cls.IDIOM_FIRST_CHAR:
cls.IDIOM_FIRST_CHAR[idiom[0]] = []
cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
evt = on_alconna(Alconna(
"我要玩成语接龙"
"我要玩成语接龙",
Args['rounds?', int],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def play_game(event: BaseEvent, force = False):
global NOW_PLAYING, LAST_CHAR, INITED
if NOW_PLAYING:
async def play_game(event: BaseEvent, force = False, rounds: Optional[int] = 100):
group_id = str(event.get_session_id())
if is_idiom_game_banned(group_id):
await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export())
return
if rounds <= 0:
await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export())
return
state = IdiomGame.try_start_game(group_id, force)
if state == TryStartState.ALREADY_PLAYING:
await evt.send(await UniMessage().text("当前已有成语接龙游戏在进行中,请稍后再试!").export())
return
if not be_able_to_play() and not force:
if state == TryStartState.NO_REMAINING_TIMES:
await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export())
return
if not INITED:
init_lexicon()
INITED = True
NOW_PLAYING = True
await evt.send(await UniMessage().text("你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!").export())
# 选择一个随机成语
idiom = secrets.choice(ALL_IDIOMS)
LAST_CHAR = idiom[-1]
instance = IdiomGame.INSTANCE_LIST[group_id]
instance.start_game(rounds)
# 发布成语
await evt.send(await UniMessage().text(f"第一个成语:「{idiom}」,请接!").export())
await evt.send(await UniMessage().text(f"第一个成语:「{instance.last_idiom}」,请接!").export())
evt = on_alconna(Alconna(
"老子就是要玩成语接龙!!!"
"老子就是要玩成语接龙!!!",
Args['rounds?', int],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def force_play_game(event: BaseEvent):
await play_game(event, force=True)
async def force_play_game(event: BaseEvent, rounds: Optional[int] = 100):
await play_game(event, force=True, rounds=rounds)
async def end_game(event: BaseEvent, group_id: str):
instance = IdiomGame.INSTANCE_LIST[group_id]
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
score_board = instance.get_score_board()
if len(score_board) == 0:
result_text += "无人得分!"
else:
# 按分数排序,名字用 at 的方式
sorted_score = sorted(score_board.items(), key=lambda x: x[1]["score"], reverse=True)
for i, (user_id, info) in enumerate(sorted_score):
result_text += f"{i+1}. " + UniMessage().at(user_id) + f": {info['score'] + instance.get_all_buff_score()}\n"
await evt.send(await result_text.export())
instance.clear_score_board()
evt = on_alconna(Alconna(
"不玩了"
@ -133,26 +320,15 @@ evt = on_alconna(Alconna(
@evt.handle()
async def _(event: BaseEvent):
global NOW_PLAYING, SCORE_BOARD, LAST_CHAR, ALL_BUFF_SCORE
if NOW_PLAYING:
NOW_PLAYING = False
group_id = str(event.get_session_id())
state = IdiomGame.try_stop_game(group_id)
if state == TryStopState.STOPPED:
# 发送好吧狗图片
# 打开好吧狗本地文件
with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f:
img_data = f.read()
await evt.send(await UniMessage().image(raw=img_data).export())
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
if len(SCORE_BOARD) == 0:
result_text += "无人得分!"
else:
# 按分数排序,名字用 at 的方式
sorted_score = sorted(SCORE_BOARD.items(), key=lambda x: x[1]["score"], reverse=True)
for i, (user_id, info) in enumerate(sorted_score):
result_text += f"{i+1}. " + UniMessage().at(user_id) + f": {info['score'] + ALL_BUFF_SCORE}\n"
await evt.send(await result_text.export())
# 重置分数板
SCORE_BOARD = {}
LAST_CHAR = ""
await end_game(event, group_id)
else:
await evt.send(await UniMessage().text("当前没有成语接龙游戏在进行中!").export())
@ -163,67 +339,66 @@ evt = on_alconna(Alconna(
@evt.handle()
async def _(event: BaseEvent):
global NOW_PLAYING, LAST_CHAR, ALL_BUFF_SCORE
if not NOW_PLAYING:
group_id = str(event.get_session_id())
instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
await evt.send(await UniMessage().text("你们太菜了全部扣100分").export())
ALL_BUFF_SCORE -= 100
# 选择下一个成语
idiom = secrets.choice(ALL_IDIOMS)
LAST_CHAR = idiom[-1]
idiom = await instance.skip_idiom(-100)
await evt.send(await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export())
# 直接读取消息
evt = on_message()
@evt.handle()
async def _(event: BaseEvent, msg: UniMsg):
global NOW_PLAYING, LAST_CHAR, LOCK
if not NOW_PLAYING:
return
user_idiom = msg.extract_plain_text().strip()
if(user_idiom[0] != LAST_CHAR):
return
if LOCK:
return
LOCK = True
await handle_send_info(event, msg)
LOCK = False
async def handle_send_info(event: BaseEvent, msg: UniMsg):
global NOW_PLAYING, LAST_CHAR, SCORE_BOARD, ALL_BUFF_SCORE
user_idiom = msg.extract_plain_text().strip()
if(user_idiom not in ALL_IDIOMS and user_idiom not in ALL_WORDS):
# 扣0.1分
if isinstance(event, DiscordMessageEvent):
user_id = str(event.author.id)
user_name = str(event.author.name)
else:
user_id = str(event.get_user_id())
user_name = str(event.get_user_id())
if user_id not in SCORE_BOARD:
SCORE_BOARD[user_id] = {
"name": user_name,
"score": 0
}
SCORE_BOARD[user_id]["score"] -= 0.1
await evt.send(await UniMessage().at(user_id).text("接不上!这个不一样!你被扣了 0.1 分!").export())
return
# 成功接上
def get_user_info(event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
user_id = str(event.author.id)
user_name = str(event.author.name)
else:
user_id = str(event.get_user_id())
user_name = str(event.get_user_id())
return user_id, user_name
if user_id not in SCORE_BOARD:
SCORE_BOARD[user_id] = {
"name": user_name,
"score": 0
}
SCORE_BOARD[user_id]["score"] += 1
# at 指定玩家
await evt.send(await UniMessage().at(user_id).text(f"接对了!你有 {SCORE_BOARD[user_id]['score'] + ALL_BUFF_SCORE} 分!").export())
LAST_CHAR = user_idiom[-1]
await evt.send(await UniMessage().text(f"下一个成语请以「{LAST_CHAR}」开头!").export())
# 直接读取消息
evt = on_message()
@evt.handle()
async def _(event: BaseEvent, msg: UniMsg):
group_id = str(event.get_session_id())
instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
user_idiom = msg.extract_plain_text().strip()
user_id , user_name = get_user_info(event)
state = await instance.try_verify_idiom(user_idiom, user_id)
if(state == TryVerifyState.WRONG_FIRST_CHAR):
return
if(state == TryVerifyState.NOT_IDIOM):
await evt.send(await UniMessage().at(user_id).text("接不上!这个不一样!你被扣了 0.1 分!").export())
return
await evt.send(await UniMessage().at(user_id).text(f"接对了!你有 {instance.get_user_score(user_id)} 分!").export())
if(state == TryVerifyState.VERIFIED_GAME_END):
await evt.send(await UniMessage().text("全部回合结束!").export())
await end_game(event, group_id)
return
if(state == TryVerifyState.VERIFIED_BUT_NO_NEXT):
await evt.send(await UniMessage().text("但是,这是条死路!你们全部都要扣 100 分!").export())
await evt.send(await UniMessage().text(f"重新抽取成语「{instance.last_idiom}").export())
await evt.send(await UniMessage().text(f"下一个成语请以「{instance.get_last_char()}」开头!").export())
evt = on_alconna(Alconna(
"禁止成语接龙"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent):
group_id = str(event.get_session_id())
add_banned_id(group_id)
await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export())
evt = on_alconna(Alconna(
"开启成语接龙"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent):
group_id = str(event.get_session_id())
remove_banned_id(group_id)
await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export())