接入我写的模块来获得群上下文

This commit is contained in:
2025-10-19 04:51:49 +08:00
parent 75c6bbd23f
commit a65cb118cc

View File

@ -12,12 +12,23 @@ from nonebot import on_message
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import (Alconna, Args, Field, Subcommand,
UniMessage, UniMsg, on_alconna)
from nonebot_plugin_alconna import (
Alconna,
Args,
Field,
Subcommand,
UniMessage,
UniMsg,
on_alconna,
)
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json"
DATA_FILE_PATH = (
Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json"
)
def load_banned_ids() -> list[str]:
if not DATA_FILE_PATH.exists():
@ -27,32 +38,38 @@ def load_banned_ids() -> list[str]:
except Exception as e:
logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}")
return []
def is_idiom_game_banned(group_id: str) -> bool:
banned_ids = load_banned_ids()
return group_id in banned_ids
def add_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id not in banned_ids:
banned_ids.append(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
def remove_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id in banned_ids:
banned_ids.remove(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
class TryStartState(Enum):
STARTED = 0
ALREADY_PLAYING = 1
NO_REMAINING_TIMES = 2
class TryStopState(Enum):
STOPPED = 0
NOT_PLAYING = 1
class TryVerifyState(Enum):
VERIFIED = 0
NOT_IDIOM = 1
@ -60,11 +77,12 @@ class TryVerifyState(Enum):
VERIFIED_BUT_NO_NEXT = 3
VERIFIED_GAME_END = 4
class IdiomGame:
ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语
INSTANCE_LIST : dict[str, "IdiomGame"] = {} # 群号对应的游戏实例
IDIOM_FIRST_CHAR = {} # 成语首字字典
ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语
INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例
IDIOM_FIRST_CHAR = {} # 成语首字字典
__inited = False
@ -79,22 +97,22 @@ class IdiomGame:
self.last_play_date = ""
self.all_buff_score = 0
self.lock = asynkio.Lock()
self.remain_rounds = 0 # 剩余回合数
self.remain_rounds = 0 # 剩余回合数
IdiomGame.INSTANCE_LIST[group_id] = self
def be_able_to_play(self) -> bool:
if(self.last_play_date != datetime.date.today()):
if self.last_play_date != datetime.date.today():
self.last_play_date = datetime.date.today()
self.remain_playing_times = 1
if(self.remain_playing_times > 0):
if self.remain_playing_times > 0:
self.remain_playing_times -= 1
return True
return False
def choose_start_idiom(self) -> str:
'''
"""
随机选择一个成语作为起始成语
'''
"""
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_char = self.last_idiom[-1]
if not self.is_nextable(self.last_char):
@ -113,7 +131,7 @@ class IdiomGame:
return TryStartState.NO_REMAINING_TIMES
instance.now_playing = True
return TryStartState.STARTED
def start_game(self, rounds: int = 100):
self.now_playing = True
self.remain_rounds = rounds
@ -128,93 +146,89 @@ class IdiomGame:
return TryStopState.NOT_PLAYING
instance.now_playing = False
return TryStopState.STOPPED
def clear_score_board(self):
self.score_board = {}
self.last_char = ""
def get_score_board(self) -> dict:
return self.score_board
def get_all_buff_score(self) -> int:
return self.all_buff_score
async def skip_idiom(self, buff_score: int = -100) -> str:
'''
"""
跳过当前成语,选择下一个成语
'''
"""
await self.lock.acquire()
self._skip_idiom_async(buff_score)
self.lock.release()
return self.last_idiom
def _skip_idiom_async(self, buff_score: int = -100) -> str:
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_char = self.last_idiom[-1]
self.add_buff_score(buff_score)
return self.last_idiom
return self.last_idiom
async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
'''
"""
用户发送成语
'''
"""
await self.lock.acquire()
state = self._verify_idiom(idiom, user_id)
self.lock.release()
return state
def is_nextable(self, last_char: str) -> bool:
'''
判断是否有成语可以接
'''
return last_char in IdiomGame.IDIOM_FIRST_CHAR
def is_nextable(self, last_char: str) -> bool:
"""
判断是否有成语可以接
"""
return last_char in IdiomGame.IDIOM_FIRST_CHAR
def _verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
# 新成语的首字应与上一条成语的尾字相同
if idiom[0] != self.last_char:
return TryVerifyState.WRONG_FIRST_CHAR
if(idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS):
if idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS:
self.add_score(user_id, -0.1)
return TryVerifyState.NOT_IDIOM
self.last_idiom = idiom
self.last_char = idiom[-1]
self.add_score(user_id, 1)
self.remain_rounds -= 1
if(self.remain_rounds <= 0):
if self.remain_rounds <= 0:
self.now_playing = False
return TryVerifyState.VERIFIED_GAME_END
if(not self.is_nextable(self.last_char)):
if not self.is_nextable(self.last_char):
# 没有成语可以接了,自动跳过
self._skip_idiom_async()
return TryVerifyState.VERIFIED_BUT_NO_NEXT
return TryVerifyState.VERIFIED
def get_user_score(self, user_id: str) -> int:
if user_id not in self.score_board:
return 0
return self.score_board[user_id]["score"]
def add_score(self, user_id: str, score: int):
if user_id not in self.score_board:
self.score_board[user_id] = {
"name": user_id,
"score": 0
}
self.score_board[user_id] = {"name": user_id, "score": 0}
self.score_board[user_id]["score"] += score
def add_buff_score(self, score: int):
self.all_buff_score += score
def get_playing_state(self) -> bool:
return self.now_playing
def get_last_char(self) -> str:
return self.last_char
@classmethod
def init_lexicon(cls):
if(cls.__inited):
if cls.__inited:
return
cls.__inited = True
@ -235,28 +249,40 @@ class IdiomGame:
COMMON_WORDS.append(word)
# 读取 THUOCL 成语库
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8") as f:
with open(
ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt",
"r",
encoding="utf-8",
) as f:
THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f]
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
THUOCL_WORDS = []
import os
for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"):
if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt":
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8") as f:
with open(
ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename,
"r",
encoding="utf-8",
) as f:
for line in f:
word = line.lstrip().split(" ")[0].strip()
if len(word) == 4:
THUOCL_WORDS.append(word)
# 只有成语的大表
cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重
cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重
# 其他四字词语表,仅表示可以有这个词
cls.ALL_WORDS = [word for word in cls.ALL_WORDS if len(word) == 4] + THUOCL_WORDS + COMMON_WORDS
cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重
cls.ALL_WORDS = (
[word for word in cls.ALL_WORDS if len(word) == 4]
+ THUOCL_WORDS
+ COMMON_WORDS
)
cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重
# 根据成语大表,划分出成语首字字典
for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS:
@ -264,42 +290,80 @@ class IdiomGame:
cls.IDIOM_FIRST_CHAR[idiom[0]] = []
cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
evt = on_alconna(Alconna(
"我要玩成语接龙",
Args['rounds?', int],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
evt = on_alconna(
Alconna(
"我要玩成语接龙",
Args["rounds?", int],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def play_game(event: BaseEvent, force = False, rounds: Optional[int] = 100):
group_id = str(event.get_session_id())
async def play_game(
event: BaseEvent,
target: DepLongTaskTarget,
force=False,
rounds: Optional[int] = 100,
):
# group_id = str(event.get_session_id())
group_id = target.channel_id
if is_idiom_game_banned(group_id):
await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export())
await evt.send(
await UniMessage().text("本群已被禁止使用成语接龙功能!").export()
)
return
rounds = rounds or 0
if rounds <= 0:
await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export())
return
state = IdiomGame.try_start_game(group_id, force)
if state == TryStartState.ALREADY_PLAYING:
await evt.send(await UniMessage().text("当前已有成语接龙游戏在进行中,请稍后再试!").export())
await evt.send(
await UniMessage()
.text("当前已有成语接龙游戏在进行中,请稍后再试!")
.export()
)
return
if state == TryStartState.NO_REMAINING_TIMES:
await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export())
return
await evt.send(await UniMessage().text("你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!").export())
await evt.send(
await UniMessage()
.text(
"你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!"
)
.export()
)
instance = IdiomGame.INSTANCE_LIST[group_id]
instance.start_game(rounds)
# 发布成语
await evt.send(await UniMessage().text(f"第一个成语:「{instance.last_idiom}」,请接!").export())
await evt.send(
await UniMessage()
.text(f"第一个成语:「{instance.last_idiom}」,请接!")
.export()
)
evt = on_alconna(
Alconna(
"老子就是要玩成语接龙!!!",
Args["rounds?", int],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
evt = on_alconna(Alconna(
"老子就是要玩成语接龙!!!",
Args['rounds?', int],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def force_play_game(event: BaseEvent, rounds: Optional[int] = 100):
await play_game(event, force=True, rounds=rounds)
async def end_game(event: BaseEvent, group_id: str):
instance = IdiomGame.INSTANCE_LIST[group_id]
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
@ -308,19 +372,28 @@ async def end_game(event: BaseEvent, group_id: str):
result_text += "无人得分!"
else:
# 按分数排序,名字用 at 的方式
sorted_score = sorted(score_board.items(), key=lambda x: x[1]["score"], reverse=True)
sorted_score = sorted(
score_board.items(), key=lambda x: x[1]["score"], reverse=True
)
for i, (user_id, info) in enumerate(sorted_score):
result_text += f"{i+1}. " + UniMessage().at(user_id) + f": {info['score'] + instance.get_all_buff_score()}\n"
result_text += (
f"{i + 1}. "
+ UniMessage().at(user_id)
+ f": {info['score'] + instance.get_all_buff_score()}\n"
)
await evt.send(await result_text.export())
instance.clear_score_board()
evt = on_alconna(Alconna(
"不玩了"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
evt = on_alconna(
Alconna("不玩了"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
)
@evt.handle()
async def _(event: BaseEvent):
group_id = str(event.get_session_id())
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
state = IdiomGame.try_stop_game(group_id)
if state == TryStopState.STOPPED:
# 发送好吧狗图片
@ -330,22 +403,30 @@ async def _(event: BaseEvent):
await evt.send(await UniMessage().image(raw=img_data).export())
await end_game(event, group_id)
else:
await evt.send(await UniMessage().text("当前没有成语接龙游戏在进行中!").export())
await evt.send(
await UniMessage().text("当前没有成语接龙游戏在进行中!").export()
)
# 跳过
evt = on_alconna(Alconna(
"跳过成语"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
evt = on_alconna(
Alconna("跳过成语"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
)
@evt.handle()
async def _(event: BaseEvent):
group_id = str(event.get_session_id())
async def _(target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
await evt.send(await UniMessage().text("你们太菜了全部扣100分").export())
idiom = await instance.skip_idiom(-100)
await evt.send(await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export())
await evt.send(
await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export()
)
def get_user_info(event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
@ -356,49 +437,85 @@ def get_user_info(event: BaseEvent):
user_name = str(event.get_user_id())
return user_id, user_name
# 直接读取消息
evt = on_message()
@evt.handle()
async def _(event: BaseEvent, msg: UniMsg):
group_id = str(event.get_session_id())
async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
user_idiom = msg.extract_plain_text().strip()
user_id , user_name = get_user_info(event)
user_id, user_name = get_user_info(event)
state = await instance.try_verify_idiom(user_idiom, user_id)
if(state == TryVerifyState.WRONG_FIRST_CHAR):
if state == TryVerifyState.WRONG_FIRST_CHAR:
return
if(state == TryVerifyState.NOT_IDIOM):
await evt.send(await UniMessage().at(user_id).text("接不上!这个不一样!你被扣了 0.1 分!").export())
if state == TryVerifyState.NOT_IDIOM:
await evt.send(
await UniMessage()
.at(user_id)
.text("接不上!这个不一样!你被扣了 0.1 分!")
.export()
)
return
await evt.send(await UniMessage().at(user_id).text(f"接对了!你有 {instance.get_user_score(user_id)} 分!").export())
if(state == TryVerifyState.VERIFIED_GAME_END):
await evt.send(
await UniMessage()
.at(user_id)
.text(f"接对了!你有 {instance.get_user_score(user_id)} 分!")
.export()
)
if state == TryVerifyState.VERIFIED_GAME_END:
await evt.send(await UniMessage().text("全部回合结束!").export())
await end_game(event, group_id)
return
if(state == TryVerifyState.VERIFIED_BUT_NO_NEXT):
await evt.send(await UniMessage().text("但是,这是条死路!你们全部都要扣 100 分!").export())
await evt.send(await UniMessage().text(f"重新抽取成语「{instance.last_idiom}").export())
await evt.send(await UniMessage().text(f"下一个成语请以「{instance.get_last_char()}」开头!").export())
if state == TryVerifyState.VERIFIED_BUT_NO_NEXT:
await evt.send(
await UniMessage()
.text("但是,这是条死路!你们全部都要扣 100 分!")
.export()
)
await evt.send(
await UniMessage().text(f"重新抽取成语「{instance.last_idiom}").export()
)
await evt.send(
await UniMessage()
.text(f"下一个成语请以「{instance.get_last_char()}」开头!")
.export()
)
evt = on_alconna(
Alconna("禁止成语接龙"),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
evt = on_alconna(Alconna(
"禁止成语接龙"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent):
group_id = str(event.get_session_id())
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
add_banned_id(group_id)
await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export())
evt = on_alconna(Alconna(
"开启成语接龙"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
evt = on_alconna(
Alconna("开启成语接龙"),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(event: BaseEvent):
group_id = str(event.get_session_id())
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
remove_banned_id(group_id)
await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export())
await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export())