229 lines
8.0 KiB
Python
229 lines
8.0 KiB
Python
import base64
|
||
import secrets
|
||
import json
|
||
from typing import Literal
|
||
|
||
from nonebot import on_message
|
||
from nonebot.adapters import Event as BaseEvent
|
||
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
|
||
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
||
from nonebot_plugin_alconna import (Alconna, Args, Field, Subcommand,
|
||
UniMessage, UniMsg, on_alconna)
|
||
|
||
from konabot.common.path import ASSETS_PATH
|
||
|
||
# Lexicon tables. They start empty and are filled in by init_lexicon().
ALL_WORDS: list = []        # every four-character word
ALL_IDIOMS: list = []       # every idiom
IDIOM_FIRST_CHAR: dict = {}  # idioms/words grouped by their first character

# Flipped to True by play_game() after init_lexicon() has run once.
INITED: bool = False
|
||
|
||
def _first_tokens(lines):
    """Return the first whitespace-delimited token of each non-blank line.

    Splitting on any whitespace copes with both space- and tab-separated
    "word frequency" lines, and blank lines are skipped so that no empty
    string ever ends up in a lexicon table.
    """
    tokens = []
    for line in lines:
        fields = line.split()
        if fields:
            tokens.append(fields[0])
    return tokens


def init_lexicon():
    """Load every lexicon file and rebuild the module-level lookup tables.

    Reads ``idiom.json``, ``ci.json``, ``common.txt`` and all ``*.txt`` files
    of the THUOCL data directory below ``ASSETS_PATH / "lexicon"`` and then
    rebinds three globals:

    * ``ALL_IDIOMS`` - de-duplicated idioms (``idiom.json`` plus the THUOCL
      idiom list);
    * ``ALL_WORDS`` - de-duplicated four-character words from the remaining
      sources;
    * ``IDIOM_FIRST_CHAR`` - maps a first character to every idiom/word that
      starts with it.

    The globals are only rebound after all files were read successfully, so a
    failing load does not leave half-built tables behind.

    Raises:
        OSError: a lexicon file or directory cannot be read.
        json.JSONDecodeError: one of the JSON files is malformed.
    """
    global ALL_WORDS, ALL_IDIOMS, IDIOM_FIRST_CHAR
    import os

    lexicon_dir = ASSETS_PATH / "lexicon"
    thuocl_dir = lexicon_dir / "THUOCL" / "data"
    idiom_file = "THUOCL_chengyu.txt"

    # Main idiom table; each entry is expected to carry a "word" key.
    with open(lexicon_dir / "idiom.json", "r", encoding="utf-8") as f:
        idiom_infos = json.load(f)

    # General word table.
    # NOTE(review): the length filter below assumes the entries are plain
    # strings - confirm against the actual ci.json schema.
    with open(lexicon_dir / "ci.json", "r", encoding="utf-8") as f:
        raw_words = json.load(f)

    # Common-word list: one word per line, only four-character words are kept.
    with open(lexicon_dir / "common.txt", "r", encoding="utf-8") as f:
        common_words = [w for w in (line.strip() for line in f) if len(w) == 4]

    # THUOCL idiom list: the word is the first column of each line.
    with open(thuocl_dir / idiom_file, "r", encoding="utf-8") as f:
        thuocl_idioms = _first_tokens(f)

    # Every other THUOCL list contributes its four-character words only.
    thuocl_words = []
    for filename in os.listdir(thuocl_dir):
        if not filename.endswith(".txt") or filename == idiom_file:
            continue
        with open(thuocl_dir / filename, "r", encoding="utf-8") as f:
            thuocl_words.extend(w for w in _first_tokens(f) if len(w) == 4)

    idioms = {info["word"] for info in idiom_infos}
    idioms.update(thuocl_idioms)
    # An empty entry would break the first-character indexing below.
    idioms.discard("")

    words = {w for w in raw_words if len(w) == 4}
    words.update(thuocl_words)
    words.update(common_words)

    # Index the union so an entry present in both tables is listed only once.
    first_char_index = {}
    for entry in idioms | words:
        first_char_index.setdefault(entry[0], []).append(entry)

    # Lists (not sets) are kept because callers draw with secrets.choice().
    ALL_IDIOMS = list(idioms)
    ALL_WORDS = list(words)
    IDIOM_FIRST_CHAR = first_char_index
|
||
|
||
# True while a round of the idiom-chain game is running.
NOW_PLAYING: bool = False

# Per-round scores: user id -> {"name": ..., "score": ...}.
SCORE_BOARD: dict = {}

# Character the next answer has to start with; "" when no round is running.
LAST_CHAR: str = ""

# Cache of user names, so they do not have to be fetched repeatedly.
USER_NAME_CACHE: dict = {}

# Daily quota bookkeeping used by be_able_to_play().
REMAIN_PLAYING_TIMES: int = 1
LAST_PLAY_DATE = ""  # replaced by a datetime.date on the first quota check

# Simple re-entrancy flag: set while one answer is being judged.
LOCK: bool = False

# Score adjustment added to every player's displayed score.
ALL_BUFF_SCORE: int = 0
|
||
|
||
import datetime
|
||
|
||
def be_able_to_play():
    """Consume one game from today's quota and report whether one was left.

    The quota (``REMAIN_PLAYING_TIMES``) is refilled to 1 the first time this
    is called on a new calendar day, as tracked by ``LAST_PLAY_DATE``.

    Returns:
        True if a game was available (and has now been used up), else False.
    """
    global REMAIN_PLAYING_TIMES, LAST_PLAY_DATE

    today = datetime.date.today()
    if LAST_PLAY_DATE != today:
        # First call of the day: remember the date and refill the quota.
        LAST_PLAY_DATE = today
        REMAIN_PLAYING_TIMES = 1

    if REMAIN_PLAYING_TIMES <= 0:
        return False

    REMAIN_PLAYING_TIMES -= 1
    return True
|
||
|
||
evt = on_alconna(
    Alconna("我要玩成语接龙"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)


@evt.handle()
async def play_game(event: BaseEvent, force=False):
    """Start a new round of the idiom-chain game.

    Args:
        event: the triggering event (not inspected here).
        force: when true, the round starts even if today's quota is used up.

    The round is refused when one is already running, or when the daily quota
    is exhausted and ``force`` is false. Otherwise the lexicon is loaded on
    first use, a random idiom is announced and its last character becomes the
    required first character of the next answer.

    Note: ``evt`` is rebound by every later matcher definition in this module,
    so ``evt.send`` below resolves to whichever matcher was bound last.
    """
    global NOW_PLAYING, LAST_CHAR, INITED

    async def reply(text):
        # Send a plain-text message through the module-level matcher.
        await evt.send(await UniMessage().text(text).export())

    if NOW_PLAYING:
        await reply("当前已有成语接龙游戏在进行中,请稍后再试!")
        return

    # The quota check always runs (and spends today's game), forced or not.
    quota_ok = be_able_to_play()
    if not (quota_ok or force):
        await reply("玩玩玩,就知道玩,快去睡觉!")
        return

    if not INITED:
        init_lexicon()
        INITED = True

    NOW_PLAYING = True
    await reply("你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!")

    # Draw the opening idiom at random.
    opening = secrets.choice(ALL_IDIOMS)
    LAST_CHAR = opening[-1]

    # Announce it.
    await reply(f"第一个成语:「{opening}」,请接!")
|
||
|
||
evt = on_alconna(
    Alconna("老子就是要玩成语接龙!!!"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)


@evt.handle()
async def force_play_game(event: BaseEvent):
    """Start a round via play_game() with the daily-quota refusal bypassed."""
    await play_game(event, force=True)
|
||
|
||
|
||
evt = on_alconna(
    Alconna("不玩了"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)


@evt.handle()
async def _(event: BaseEvent):
    """End the running round and publish the final score board.

    Sends the "haoba dog" picture followed by the ranking (highest score
    first, each player mentioned with an at-segment). When no round is
    running, only a notice is sent.

    All shared round state - including the global skip penalty
    ``ALL_BUFF_SCORE`` - is reset *before* any file or network I/O, so a
    failing send cannot leak an old score board or penalty into the next
    round.
    """
    global NOW_PLAYING, SCORE_BOARD, LAST_CHAR, ALL_BUFF_SCORE

    if not NOW_PLAYING:
        await evt.send(await UniMessage().text("当前没有成语接龙游戏在进行中!").export())
        return

    # Snapshot what the report needs, then reset the shared state right away.
    final_scores = SCORE_BOARD
    buff = ALL_BUFF_SCORE
    NOW_PLAYING = False
    SCORE_BOARD = {}
    LAST_CHAR = ""
    ALL_BUFF_SCORE = 0

    # Send the "haoba dog" picture from the local assets.
    with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f:
        img_data = f.read()
    await evt.send(await UniMessage().image(raw=img_data).export())

    result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
    if not final_scores:
        result_text += "无人得分!"
    else:
        # Highest score first; players are referenced with an at-segment.
        ranking = sorted(final_scores.items(), key=lambda item: item[1]["score"], reverse=True)
        for rank, (user_id, info) in enumerate(ranking, start=1):
            # Penalties are multiples of 0.1; rounding hides binary float
            # noise such as 0.7999999999999999 (ints stay ints).
            total = round(info["score"] + buff, 1)
            result_text += f"{rank}. " + UniMessage().at(user_id) + f": {total} 分\n"
    await evt.send(await result_text.export())
|
||
|
||
# Skip the current idiom
evt = on_alconna(
    Alconna("跳过成语"),
    use_cmd_start=True,
    use_cmd_sep=False,
    skip_for_unmatch=True,
)


@evt.handle()
async def _(event: BaseEvent):
    """Skip the current idiom: penalise everyone by 100 and draw a new one.

    Does nothing when no round is running. The penalty is applied through the
    global ``ALL_BUFF_SCORE`` rather than per player.
    """
    global NOW_PLAYING, LAST_CHAR, ALL_BUFF_SCORE

    if not NOW_PLAYING:
        return

    penalty_notice = UniMessage().text("你们太菜了!全部扣100分!")
    await evt.send(await penalty_notice.export())
    ALL_BUFF_SCORE -= 100

    # Draw the replacement idiom.
    replacement = secrets.choice(ALL_IDIOMS)
    LAST_CHAR = replacement[-1]

    restart_notice = UniMessage().text(f"重新开始,下一个成语是「{replacement}」")
    await evt.send(await restart_notice.export())
|
||
|
||
# Read every incoming message directly
evt = on_message()


@evt.handle()
async def _(event: BaseEvent, msg: UniMsg):
    """Forward candidate answers of a running round to handle_send_info().

    A message is a candidate only if its plain text is non-empty and starts
    with ``LAST_CHAR``. While one candidate is being judged (``LOCK`` set),
    further candidates are dropped.
    """
    global LOCK

    if not NOW_PLAYING:
        return

    user_idiom = msg.extract_plain_text().strip()
    # Messages without plain text (e.g. image-only) have no first character.
    if not user_idiom or user_idiom[0] != LAST_CHAR:
        return

    if LOCK:
        return

    LOCK = True
    try:
        await handle_send_info(event, msg)
    finally:
        # Always release the flag; otherwise one failed send would make every
        # later answer be ignored.
        LOCK = False
|
||
|
||
def _identify_user(event: BaseEvent):
    """Return ``(user_id, user_name)`` as strings for the sender of *event*.

    Discord events expose the author's id and name; for every other adapter
    the user id doubles as the display name.
    """
    if isinstance(event, DiscordMessageEvent):
        return str(event.author.id), str(event.author.name)
    user_id = str(event.get_user_id())
    return user_id, user_id


async def handle_send_info(event: BaseEvent, msg: UniMsg):
    """Judge one answer and update the sender's score.

    Args:
        event: the message event; used to identify the sender.
        msg: the message whose stripped plain text is the answer.

    An answer found in neither ``ALL_IDIOMS`` nor ``ALL_WORDS`` costs the
    sender 0.1 points. A known answer earns 1 point, and its last character
    becomes the required first character of the next answer (``LAST_CHAR``).
    The caller is expected to have checked the first character already.
    """
    global LAST_CHAR

    user_idiom = msg.extract_plain_text().strip()
    user_id, user_name = _identify_user(event)
    record = SCORE_BOARD.setdefault(user_id, {"name": user_name, "score": 0})

    if user_idiom not in ALL_IDIOMS and user_idiom not in ALL_WORDS:
        # Unknown word: deduct 0.1. Rounding to one decimal keeps repeated
        # penalties from accumulating binary float error.
        record["score"] = round(record["score"] - 0.1, 1)
        await evt.send(await UniMessage().at(user_id).text("接不上!这个不一样!你被扣了 0.1 分!").export())
        return

    # Valid answer: award one point (round() leaves integer scores as ints).
    record["score"] = round(record["score"] + 1, 1)
    total = round(record["score"] + ALL_BUFF_SCORE, 1)

    # Mention the player who answered.
    await evt.send(await UniMessage().at(user_id).text(f"接对了!你有 {total} 分!").export())

    LAST_CHAR = user_idiom[-1]
    await evt.send(await UniMessage().text(f"下一个成语请以「{LAST_CHAR}」开头!").export())