@ -1,8 +1,13 @@
import asyncio as asynkio
import base64
from pathlib import Path
import secrets
import json
from typing import Literal
import datetime
from typing import Literal , Optional
from enum import Enum
from loguru import logger
from nonebot import on_message
from nonebot . adapters import Event as BaseEvent
from nonebot . adapters . console . event import MessageEvent as ConsoleMessageEvent
@ -12,120 +17,302 @@ from nonebot_plugin_alconna import (Alconna, Args, Field, Subcommand,
from konabot . common . path import ASSETS_PATH
ALL_WORDS = [ ] # 所有四字词语
ALL_IDIOMS = [ ] # 所有成语
IDIOM_FIRST_CHAR = { } # 成语首字字典
DATA_FILE_PATH = Path ( __file__ ) . parent . parent . parent . parent / " data " / " idiom_banned.json "
INITED = False
def init_lexicon ( ) :
global ALL_WORDS , ALL_IDIOMS , IDIOM_FIRST_CHAR
# 成语大表
with open ( ASSETS_PATH / " lexicon " / " idiom.json " , " r " , encoding = " utf-8 " ) as f :
ALL_IDIOMS_INFOS = json . load ( f )
# 词语大表
with open ( ASSETS_PATH / " lexicon " / " ci.json " , " r " , encoding = " utf-8 " ) as f :
ALL_WORDS = json . load ( f )
def load_banned_ids ( ) - > list [ str ] :
if not DATA_FILE_PATH . exists ( ) :
return [ ]
try :
return json . loads ( DATA_FILE_PATH . read_text ( ) )
except Exception as e :
logger . warning ( f " 在解析成语接龙封禁文件时遇到问题: { e } " )
return [ ]
COMMON_WORDS = [ ]
# 读取 COMMON 词语大表
with open ( ASSETS_PATH / " lexicon " / " common.txt " , " r " , encoding = " utf-8 " ) as f :
for line in f :
word = line . strip ( )
if len ( word ) == 4 :
COMMON_WORDS . append ( word )
def is_idiom_game_banned ( group_id : str ) - > bool :
banned_ids = load_banned_ids ( )
return group_id in banned_ids
# 读取 THUOCL 成语库
with open ( ASSETS_PATH / " lexicon " / " THUOCL " / " data " / " THUOCL_chengyu.txt " , " r " , encoding = " utf-8 " ) as f :
THUOCL_IDIOMS = [ line . split ( " " ) [ 0 ] . strip ( ) for line in f ]
def add_banned_id ( group_id : str ) :
banned_ids = load_banned_ids ( )
if group_id not in banned_ids :
banned_ids . append ( group_id )
DATA_FILE_PATH . write_text ( json . dumps ( banned_ids , ensure_ascii = False , indent = 4 ) )
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
THUOCL_WORDS = [ ]
import os
for filename in os . listdir ( ASSETS_PATH / " lexicon " / " THUOCL " / " data " ) :
if filename . endswith ( " .txt " ) and filename != " THUOCL_chengyu.txt " :
with open ( ASSETS_PATH / " lexicon " / " THUOCL " / " data " / filename , " r " , encoding = " utf-8 " ) as f :
for line in f :
word = line . lstrip ( ) . split ( " " ) [ 0 ] . strip ( )
if len ( word ) == 4 :
THUOCL_WORDS . append ( word )
def remove_banned_id ( group_id : str ) :
banned_ids = load_banned_ids ( )
if group_id in banned_ids :
banned_ids . remove ( group_id )
DATA_FILE_PATH . write_text ( json . dumps ( banned_ids , ensure_ascii = False , indent = 4 ) )
class TryStartState ( Enum ) :
STARTED = 0
ALREADY_PLAYING = 1
NO_REMAINING_TIMES = 2
class TryStopState ( Enum ) :
STOPPED = 0
NOT_PLAYING = 1
class TryVerifyState ( Enum ) :
VERIFIED = 0
NOT_IDIOM = 1
WRONG_FIRST_CHAR = 2
VERIFIED_BUT_NO_NEXT = 3
VERIFIED_GAME_END = 4
class IdiomGame :
ALL_WORDS = [ ] # 所有四字词语
ALL_IDIOMS = [ ] # 所有成语
INSTANCE_LIST : dict [ str , " IdiomGame " ] = { } # 群号对应的游戏实例
IDIOM_FIRST_CHAR = { } # 成语首字字典
__inited = False
def __init__ ( self , group_id : str ) :
# 初始化一局游戏
self . group_id = " "
self . now_playing = False
self . score_board = { }
self . last_idiom = " "
self . last_char = " "
self . remain_playing_times = 3
self . last_play_date = " "
self . all_buff_score = 0
self . lock = asynkio . Lock ( )
self . remain_rounds = 0 # 剩余回合数
IdiomGame . INSTANCE_LIST [ group_id ] = self
def be_able_to_play ( self ) - > bool :
if ( self . last_play_date != datetime . date . today ( ) ) :
self . last_play_date = datetime . date . today ( )
self . remain_playing_times = 1
if ( self . remain_playing_times > 0 ) :
self . remain_playing_times - = 1
return True
return False
def choose_start_idiom ( self ) - > str :
'''
随机选择一个成语作为起始成语
'''
self . last_idiom = secrets . choice ( IdiomGame . ALL_IDIOMS )
self . last_char = self . last_idiom [ - 1 ]
if not self . is_nextable ( self . last_char ) :
self . choose_start_idiom ( )
return self . last_idiom
@classmethod
def try_start_game ( cls , group_id : str , force : bool = False ) - > TryStartState :
cls . init_lexicon ( )
if not cls . INSTANCE_LIST . get ( group_id ) :
cls ( group_id )
instance = cls . INSTANCE_LIST [ group_id ]
if instance . now_playing :
return TryStartState . ALREADY_PLAYING
if not instance . be_able_to_play ( ) and not force :
return TryStartState . NO_REMAINING_TIMES
instance . now_playing = True
return TryStartState . STARTED
def start_game ( self , rounds : int = 100 ) :
self . now_playing = True
self . remain_rounds = rounds
self . choose_start_idiom ( )
@classmethod
def try_stop_game ( cls , group_id : str ) - > TryStopState :
if not cls . INSTANCE_LIST . get ( group_id ) :
return TryStopState . NOT_PLAYING
instance = cls . INSTANCE_LIST [ group_id ]
if not instance . now_playing :
return TryStopState . NOT_PLAYING
instance . now_playing = False
return TryStopState . STOPPED
def clear_score_board ( self ) :
self . score_board = { }
self . last_char = " "
def get_score_board ( self ) - > dict :
return self . score_board
def get_all_buff_score ( self ) - > int :
return self . all_buff_score
async def skip_idiom ( self , buff_score : int = - 100 ) - > str :
'''
跳过当前成语,选择下一个成语
'''
await self . lock . acquire ( )
self . _skip_idiom_async ( buff_score )
self . lock . release ( )
return self . last_idiom
def _skip_idiom_async ( self , buff_score : int = - 100 ) - > str :
self . last_idiom = secrets . choice ( IdiomGame . ALL_IDIOMS )
self . last_char = self . last_idiom [ - 1 ]
self . add_buff_score ( buff_score )
return self . last_idiom
async def try_verify_idiom ( self , idiom : str , user_id : str ) - > TryVerifyState :
'''
用户发送成语
'''
await self . lock . acquire ( )
state = self . _verify_idiom ( idiom , user_id )
self . lock . release ( )
return state
def is_nextable ( self , last_char : str ) - > bool :
'''
判断是否有成语可以接
'''
return last_char in IdiomGame . IDIOM_FIRST_CHAR
# 只有成语的大表
ALL_IDIOMS = [ idiom [ " word " ] for idiom in ALL_IDIOMS_INFOS ] + THUOCL_IDIOMS
ALL_IDIOMS = list ( set ( ALL_IDIOMS ) ) # 去重
def _verify_idiom ( self , idiom : str , user_id : str ) - > TryVerifyState :
# 新成语的首字应与上一条成语的尾字相同
if idiom [ 0 ] ! = self . last_char :
return TryVerifyState . WRONG_FIRST_CHAR
if ( idiom not in IdiomGame . ALL_IDIOMS and idiom not in IdiomGame . ALL_WORDS ) :
self . add_score ( user_id , - 0.1 )
return TryVerifyState . NOT_IDIOM
self . last_idiom = idiom
self . last_char = idiom [ - 1 ]
self . add_score ( user_id , 1 )
self . remain_rounds - = 1
if ( self . remain_rounds < = 0 ) :
self . now_playing = False
return TryVerifyState . VERIFIED_GAME_END
if ( not self . is_nextable ( self . last_char ) ) :
# 没有成语可以接了,自动跳过
self . _skip_idiom_async ( )
return TryVerifyState . VERIFIED_BUT_NO_NEXT
return TryVerifyState . VERIFIED
def get_user_score ( self , user_id : str ) - > int :
if user_id not in self . score_board :
return 0
return self . score_board [ user_id ] [ " score " ]
def add_score ( self , user_id : str , score : int ) :
if user_id not in self . score_board :
self . score_board [ user_id ] = {
" name " : user_id ,
" score " : 0
}
self . score_board [ user_id ] [ " score " ] + = score
def add_buff_score ( self , score : int ) :
self . all_buff_score + = score
# 其他四字词语表,仅表示可以有这个词
ALL_WORDS = [ word for word in ALL_WORDS if len ( word ) == 4 ] + THUOCL_WORDS + COMMON_WORDS
ALL_WORDS = list ( set ( ALL_WORDS ) ) # 去重
def get_playing_state ( self ) - > bool :
return self . now_playing
def get_last_char ( self ) - > str :
return self . last_char
# 根据成语大表,划分出成语首字字典
IDIOM_FIRST_CHAR = { }
for idiom in ALL_IDIOMS + ALL_WORDS :
if idiom [ 0 ] not in IDIOM_FIRST_CHAR :
IDIOM_FIRST_CHAR [ idiom [ 0 ] ] = [ ]
IDIOM_FIRST_CHAR [ idiom [ 0 ] ] . append ( idiom )
@classmethod
def init_lexicon ( cls ) :
if ( cls . __inited ) :
return
cls . __inited = True
NOW_PLAYING = False
# 成语大表
with open ( ASSETS_PATH / " lexicon " / " idiom.json " , " r " , encoding = " utf-8 " ) as f :
ALL_IDIOMS_INFOS = json . load ( f )
SCORE_BOARD = { }
# 词语大表
with open ( ASSETS_PATH / " lexicon " / " ci.json " , " r " , encoding = " utf-8 " ) as f :
cls . ALL_WORDS = json . load ( f )
LAST_CHAR = " "
COMMON_WORDS = [ ]
# 读取 COMMON 词语大表
with open ( ASSETS_PATH / " lexicon " / " common.txt " , " r " , encoding = " utf-8 " ) as f :
for line in f :
word = line . strip ( )
if len ( word ) == 4 :
COMMON_WORDS . append ( word )
USER_NAME_CACHE = { } # 缓存用户名称,避免多次获取
# 读取 THUOCL 成语库
with open ( ASSETS_PATH / " lexicon " / " THUOCL " / " data " / " THUOCL_chengyu.txt " , " r " , encoding = " utf-8 " ) as f :
THUOCL_IDIOMS = [ line . split ( " " ) [ 0 ] . strip ( ) for line in f ]
REMAIN_PLAYING_TIMES = 1
LAST_PLAY_DATE = " "
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
THUOCL_WORDS = [ ]
import os
for filename in os . listdir ( ASSETS_PATH / " lexicon " / " THUOCL " / " data " ) :
if filename . endswith ( " .txt " ) and filename != " THUOCL_chengyu.txt " :
with open ( ASSETS_PATH / " lexicon " / " THUOCL " / " data " / filename , " r " , encoding = " utf-8 " ) as f :
for line in f :
word = line . lstrip ( ) . split ( " " ) [ 0 ] . strip ( )
if len ( word ) == 4 :
THUOCL_WORDS . append ( word )
LOCK = False
ALL_BUFF_SCORE = 0 # 全体分数
# 只有成语的大表
cls . ALL_IDIOMS = [ idiom [ " word " ] for idiom in ALL_IDIOMS_INFOS ] + THUOCL_IDIOMS
cls . ALL_IDIOMS = list ( set ( cls . ALL_IDIOMS ) ) # 去重
import datetime
# 其他四字词语表,仅表示可以有这个词
cls . ALL_WORDS = [ word for word in cls . ALL_WORDS if len ( word ) == 4 ] + THUOCL_WORDS + COMMON_WORDS
cls . ALL_WORDS = list ( set ( cls . ALL_WORDS ) ) # 去重
def be_able_to_play ( ) :
global REMAIN_PLAYING_TIMES , LAST_PLAY_DATE
if ( LAST_PLAY_DATE != datetime . date . today ( ) ) :
LAST_PLAY_DATE = datetime . date . today ( )
REMAIN_PLAYING_TIMES = 1
if ( REMAIN_PLAYING_TIMES > 0 ) :
REMAIN_PLAYING_TIMES - = 1
return True
return False
# 根据成语大表,划分出成语首字字典
for idiom in cls . ALL_IDIOMS + cls . ALL_WORDS :
if idiom [ 0 ] not in cls . IDIOM_FIRST_CHAR :
cls . IDIOM_FIRST_CHAR [ idiom [ 0 ] ] = [ ]
cls . IDIOM_FIRST_CHAR [ idiom [ 0 ] ] . append ( idiom )
evt = on_alconna ( Alconna (
" 我要玩成语接龙 "
" 我要玩成语接龙 " ,
Args [ ' rounds? ' , int ] ,
) , use_cmd_start = True , use_cmd_sep = False , skip_for_unmatch = True )
@evt.handle ( )
async def play_game ( event : BaseEvent , force = False ) :
global NOW_PLAYING , LAST_CHAR , INITED
if NOW_PLAYING :
async def play_game ( event : BaseEvent , force = False , rounds : Optional [ int ] = 100 ):
group_id = str ( event . get_session_id ( ) )
if is_idiom_game_banned ( group_id ) :
await evt . send ( await UniMessage ( ) . text ( " 本群已被禁止使用成语接龙功能! " ) . export ( ) )
return
if rounds < = 0 :
await evt . send ( await UniMessage ( ) . text ( " 干什么!你想玩负数局吗? " ) . export ( ) )
return
state = IdiomGame . try_start_game ( group_id , force )
if state == TryStartState . ALREADY_PLAYING :
await evt . send ( await UniMessage ( ) . text ( " 当前已有成语接龙游戏在进行中,请稍后再试! " ) . export ( ) )
return
if not be_able_to_play ( ) and not force :
if state == TryStartState . NO_REMAINING_TIMES :
await evt . send ( await UniMessage ( ) . text ( " 玩玩玩,就知道玩,快去睡觉! " ) . export ( ) )
return
if not INITED :
init_lexicon ( )
INITED = True
NOW_PLAYING = True
await evt . send ( await UniMessage ( ) . text ( " 你小子,还真有意思! \n 好,成语接龙游戏开始!我说一个成语,请大家接下去! " ) . export ( ) )
# 选择一个随机成语
idiom = secrets . choice ( ALL_IDIOMS )
LAST_CHAR = idiom [ - 1 ]
instance = IdiomGame . INSTANCE_LIST [ group_id ]
instance . start_game ( rounds )
# 发布成语
await evt . send ( await UniMessage ( ) . text ( f " 第一个成语:「 { idiom } 」,请接! " ) . export ( ) )
await evt . send ( await UniMessage ( ) . text ( f " 第一个成语:「 { instance . last_ idiom} 」,请接! " ) . export ( ) )
evt = on_alconna ( Alconna (
" 老子就是要玩成语接龙!!! "
" 老子就是要玩成语接龙!!! " ,
Args [ ' rounds? ' , int ] ,
) , use_cmd_start = True , use_cmd_sep = False , skip_for_unmatch = True )
@evt.handle ( )
async def force_play_game ( event : BaseEvent ) :
await play_game ( event , force = True )
async def force_play_game ( event : BaseEvent , rounds : Optional [ int ] = 100 ):
await play_game ( event , force = True , rounds = rounds )
async def end_game ( event : BaseEvent , group_id : str ) :
instance = IdiomGame . INSTANCE_LIST [ group_id ]
result_text = UniMessage ( ) . text ( " 游戏结束! \n 最终得分榜: \n " )
score_board = instance . get_score_board ( )
if len ( score_board ) == 0 :
result_text + = " 无人得分! "
else :
# 按分数排序,名字用 at 的方式
sorted_score = sorted ( score_board . items ( ) , key = lambda x : x [ 1 ] [ " score " ] , reverse = True )
for i , ( user_id , info ) in enumerate ( sorted_score ) :
result_text + = f " { i + 1 } . " + UniMessage ( ) . at ( user_id ) + f " : { info [ ' score ' ] + instance . get_all_buff_score ( ) } 分 \n "
await evt . send ( await result_text . export ( ) )
instance . clear_score_board ( )
evt = on_alconna ( Alconna (
" 不玩了 "
@ -133,26 +320,15 @@ evt = on_alconna(Alconna(
@evt.handle ( )
async def _ ( event : BaseEvent ) :
global NOW_PLAYING , SCORE_BOARD , LAST_CHAR , ALL_BUFF_SCORE
if NOW_PLAYING :
NOW_PLAYING = False
group_id = str ( event . get_session_id ( ) )
state = IdiomGame . try_stop_game ( group_id )
if state == TryStopState . STOPPED :
# 发送好吧狗图片
# 打开好吧狗本地文件
with open ( ASSETS_PATH / " img " / " dog " / " haoba_dog.jpg " , " rb " ) as f :
img_data = f . read ( )
await evt . send ( await UniMessage ( ) . image ( raw = img_data ) . export ( ) )
result_text = UniMessage ( ) . text ( " 游戏结束! \n 最终得分榜: \n " )
if len ( SCORE_BOARD ) == 0 :
result_text + = " 无人得分! "
else :
# 按分数排序,名字用 at 的方式
sorted_score = sorted ( SCORE_BOARD . items ( ) , key = lambda x : x [ 1 ] [ " score " ] , reverse = True )
for i , ( user_id , info ) in enumerate ( sorted_score ) :
result_text + = f " { i + 1 } . " + UniMessage ( ) . at ( user_id ) + f " : { info [ ' score ' ] + ALL_BUFF_SCORE } 分 \n "
await evt . send ( await result_text . export ( ) )
# 重置分数板
SCORE_BOARD = { }
LAST_CHAR = " "
await end_game ( event , group_id )
else :
await evt . send ( await UniMessage ( ) . text ( " 当前没有成语接龙游戏在进行中! " ) . export ( ) )
@ -163,67 +339,66 @@ evt = on_alconna(Alconna(
@evt.handle ( )
async def _ ( event : BaseEvent ) :
global NOW_PLAYING , LAST_CHAR , ALL_BUFF_SCORE
if not NOW_PLAYING :
group_id = str ( event . get_session_id ( ) )
instance = IdiomGame . INSTANCE_LIST . get ( group_id )
if not instance or not instance . get_playing_state ( ) :
return
await evt . send ( await UniMessage ( ) . text ( " 你们太菜了! 全部扣100分! " ) . export ( ) )
ALL_BUFF_SCORE - = 100
# 选择下一个成语
idiom = secrets . choice ( ALL_IDIOMS )
LAST_CHAR = idiom [ - 1 ]
idiom = await instance . skip_idiom ( - 100 )
await evt . send ( await UniMessage ( ) . text ( f " 重新开始,下一个成语是「 { idiom } 」 " ) . export ( ) )
# 直接读取消息
evt = on_message ( )
@evt.handle ( )
async def _ ( event : BaseEvent , msg : UniMsg ) :
global NOW_PLAYING , LAST_CHAR , LOCK
if not NOW_PLAYING :
return
user_idiom = msg . extract_plain_text ( ) . strip ( )
if ( user_idiom [ 0 ] != LAST_CHAR ) :
return
if LOCK :
return
LOCK = True
await handle_send_info ( event , msg )
LOCK = False
async def handle_send_info ( event : BaseEvent , msg : UniMsg ) :
global NOW_PLAYING , LAST_CHAR , SCORE_BOARD , ALL_BUFF_SCORE
user_idiom = msg . extract_plain_text ( ) . strip ( )
if ( user_idiom not in ALL_IDIOMS and user_idiom not in ALL_WORDS ) :
# 扣0.1分
if isinstance ( event , DiscordMessageEvent ) :
user_id = str ( event . author . id )
user_name = str ( event . author . name )
else :
user_id = str ( event . get_user_id ( ) )
user_name = str ( event . get_user_id ( ) )
if user_id not in SCORE_BOARD :
SCORE_BOARD [ user_id ] = {
" name " : user_name ,
" score " : 0
}
SCORE_BOARD [ user_id ] [ " score " ] - = 0.1
await evt . send ( await UniMessage ( ) . at ( user_id ) . text ( " 接不上!这个不一样!你被扣了 0.1 分! " ) . export ( ) )
return
# 成功接上
def get_user_info ( event : BaseEvent ) :
if isinstance ( event , DiscordMessageEvent ) :
user_id = str ( event . author . id )
user_name = str ( event . author . name )
else :
user_id = str ( event . get_user_id ( ) )
user_name = str ( event . get_user_id ( ) )
return user_id , user_name
if user_id not in SCORE_BOARD :
SCORE_BOARD [ user_id ] = {
" name " : user_name ,
" score " : 0
}
SCORE_BOARD [ user_id ] [ " score " ] + = 1
# at 指定玩家
await evt . send ( await UniMessage ( ) . at ( user_id ) . text ( f " 接对了!你有 { SCORE_BOARD [ user_id ] [ ' score ' ] + ALL_BUFF_SCORE } 分! " ) . export ( ) )
LAST_CHAR = user_idiom [ - 1 ]
await evt . send ( await UniMessage ( ) . text ( f " 下一个成语请以「 { LAST_CHAR } 」开头! " ) . export ( ) )
# 直接读取消息
evt = on_message ( )
@evt.handle ( )
async def _ ( event : BaseEvent , msg : UniMsg ) :
group_id = str ( event . get_session_id ( ) )
instance = IdiomGame . INSTANCE_LIST . get ( group_id )
if not instance or not instance . get_playing_state ( ) :
return
user_idiom = msg . extract_plain_text ( ) . strip ( )
user_id , user_name = get_user_info ( event )
state = await instance . try_verify_idiom ( user_idiom , user_id )
if ( state == TryVerifyState . WRONG_FIRST_CHAR ) :
return
if ( state == TryVerifyState . NOT_IDIOM ) :
await evt . send ( await UniMessage ( ) . at ( user_id ) . text ( " 接不上!这个不一样!你被扣了 0.1 分! " ) . export ( ) )
return
await evt . send ( await UniMessage ( ) . at ( user_id ) . text ( f " 接对了!你有 { instance . get_user_score ( user_id ) } 分! " ) . export ( ) )
if ( state == TryVerifyState . VERIFIED_GAME_END ) :
await evt . send ( await UniMessage ( ) . text ( " 全部回合结束! " ) . export ( ) )
await end_game ( event , group_id )
return
if ( state == TryVerifyState . VERIFIED_BUT_NO_NEXT ) :
await evt . send ( await UniMessage ( ) . text ( " 但是,这是条死路!你们全部都要扣 100 分! " ) . export ( ) )
await evt . send ( await UniMessage ( ) . text ( f " 重新抽取成语「 { instance . last_idiom } 」 " ) . export ( ) )
await evt . send ( await UniMessage ( ) . text ( f " 下一个成语请以「 { instance . get_last_char ( ) } 」开头! " ) . export ( ) )
evt = on_alconna ( Alconna (
" 禁止成语接龙 "
) , use_cmd_start = True , use_cmd_sep = False , skip_for_unmatch = True )
@evt.handle ( )
async def _ ( event : BaseEvent ) :
group_id = str ( event . get_session_id ( ) )
add_banned_id ( group_id )
await evt . send ( await UniMessage ( ) . text ( " 本群已被禁止使用成语接龙功能! " ) . export ( ) )
evt = on_alconna ( Alconna (
" 开启成语接龙 "
) , use_cmd_start = True , use_cmd_sep = False , skip_for_unmatch = True )
@evt.handle ( )
async def _ ( event : BaseEvent ) :
group_id = str ( event . get_session_id ( ) )
remove_banned_id ( group_id )
await evt . send ( await UniMessage ( ) . text ( " 本群已开启成语接龙功能! " ) . export ( ) )