Compare commits

...

7 Commits

Author SHA1 Message Date
b42385f780 修复成语接龙
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 18:24:03 +08:00
6cae38dea9 提升 LongTask 的健壮性
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 16:54:59 +08:00
8594b59783 修复 LongTask 在 Discord 和控制台无法正确返回是否顺利完成任务的问题 2025-10-19 16:51:22 +08:00
f768c91430 完善 LongTask 模块
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 16:47:50 +08:00
a65cb118cc 接入我写的模块来获得群上下文
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 04:51:49 +08:00
75c6bbd23f Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot 2025-10-19 04:45:26 +08:00
aaf0a75d65 添加若干有用的小模块 2025-10-19 04:45:15 +08:00
4 changed files with 605 additions and 107 deletions

View File

@ -0,0 +1,36 @@
import asyncio
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Generic, TypeVar
from pydantic import BaseModel, ValidationError
T = TypeVar("T", bound=BaseModel)
class DataManager(Generic[T]):
def __init__(self, cls: type[T], fp: Path) -> None:
self.cls = cls
self.fp = fp
self._aio_lock = asyncio.Lock()
self._data: T | None = None
def load(self) -> T:
if not self.fp.exists():
return self.cls()
try:
return self.cls.model_validate_json(self.fp.read_text())
except ValidationError:
return self.cls()
def save(self, data: T):
self.fp.write_text(data.model_dump_json())
@asynccontextmanager
async def get_data(self):
await self._aio_lock.acquire()
self._data = self.load()
yield self._data
self.save(self._data)
self._data = None
self._aio_lock.release()

301
konabot/common/longtask.py Normal file
View File

@ -0,0 +1,301 @@
from __future__ import annotations
from contextlib import asynccontextmanager
import datetime
import json
from typing import Annotated, Any, Callable, Coroutine, cast
import asyncio as asynkio
import uuid
from loguru import logger
import nonebot
from nonebot.params import Depends
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters import Bot as BaseBot
from nonebot.adapters.onebot.v11 import Bot as OBBot
from nonebot.adapters.onebot.v11 import GroupMessageEvent as OBGroupMessageEvent
from nonebot.adapters.onebot.v11 import PrivateMessageEvent as OBPrivateMessageEvent
from nonebot.adapters.console import Bot as ConsoleBot
from nonebot.adapters.console import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord import MessageEvent as DCMessageEvent
from nonebot.adapters.discord import Bot as DCBot
from nonebot_plugin_alconna import UniMessage
from pydantic import BaseModel, ValidationError
from .path import DATA_PATH
LONGTASK_DATA_DIR = DATA_PATH / "longtasks.json"
class LongTaskTarget(BaseModel):
"""
用于定义长期任务的目标沟通对象,一般通过 DepLongTaskTarget 依赖注入获取:
```python
@cmd.handle()
async def _(target: DepLongTaskTarget):
...
```
"""
platform: str
"沟通对象所在的平台"
self_id: str
"进行沟通的对象自己的 ID"
channel_id: str
"沟通对象所在的群或者 Discord Channel。若为空则代表是私聊"
target_id: str
"沟通对象的 ID"
async def send_message(self, msg: UniMessage, at: bool = True) -> bool:
try:
bot = nonebot.get_bot(self.self_id)
except KeyError:
logger.warning(f"试图访问了不存在的 Bot。ID={self.self_id}")
return False
if self.platform == "qq":
if not isinstance(bot, OBBot):
logger.warning(
f"编号对应的平台并非期望的平台 ID={self.self_id} PLATFORM={
self.platform
} BOT_CLASS={bot.__class__.__name__}"
)
return False
if self.channel_id == "":
# 私聊模式
await bot.send_private_msg(
user_id=int(self.target_id),
message=cast(Any, await msg.export(bot)),
auto_escape=False,
)
return True
else:
if at:
msg = UniMessage().at(self.target_id).text(" ") + msg
await bot.send_group_msg(
group_id=int(self.channel_id),
message=cast(Any, await msg.export(bot)),
auto_escape=False,
)
return True
if self.platform == "console":
if not isinstance(bot, ConsoleBot):
logger.warning(
f"编号对应的平台并非期望的平台 ID={self.self_id} PLATFORM={
self.platform
} BOT_CLASS={bot.__class__.__name__}"
)
return False
await bot.send_message(self.channel_id, cast(Any, await msg.export()))
return True
if self.platform == "discord":
if not isinstance(bot, DCBot):
logger.warning(
f"编号对应的平台并非期望的平台 ID={self.self_id} PLATFORM={
self.platform
} BOT_CLASS={bot.__class__.__name__}"
)
return False
await bot.send_to(
channel_id=int(self.channel_id),
message=cast(
Any, await (UniMessage().at(self.target_id) + msg).export()
),
tts=False,
)
return True
logger.warning(f"没有一个平台是期望的平台 PLATFORM={self.platform}")
return False
class LongTask(BaseModel):
uuid: str
data_json: str
target: LongTaskTarget
callback: str
deadline: datetime.datetime
canceled: bool = False
_aio_task: asynkio.Task | None = None
async def run(self):
now = datetime.datetime.now()
if self.deadline < now and not self.canceled:
await self._run_task()
return
await asynkio.sleep((self.deadline - now).total_seconds())
if self.canceled:
return
await self._run_task()
async def _run_task(self):
hdl = registered_long_task_handler.get(self.callback, None)
if hdl is None:
logger.warning(
f"Callback {self.callback} 未曾被注册,但是被期待调用,已忽略"
)
async with longtask_data() as datafile:
datafile.to_handle[self.callback] = [
t
for t in datafile.to_handle.get(self.callback, [])
if t.uuid != self.uuid
]
datafile.unhandled.setdefault(self.callback, []).append(self)
return
success = False
try:
await hdl(self)
success = True
except Exception as e:
logger.exception(e)
async with longtask_data() as datafile:
datafile.to_handle[self.callback] = [
t for t in datafile.to_handle[self.callback] if t.uuid != self.uuid
]
if not success:
datafile.unhandled.setdefault(self.callback, []).append(self)
logger.info(
f"LongTask 执行失败 UUID={self.uuid} callback={self.callback}"
)
else:
logger.info(
f"LongTask 工作完成 UUID={self.uuid} callback={self.callback}"
)
def clean(self):
self._aio_task = None
@property
def data(self):
return json.loads(self.data_json)
async def start(self):
self._aio_task = asynkio.Task(self.run())
self._aio_task.add_done_callback(lambda _: self.clean())
class LongTaskModuleData(BaseModel):
to_handle: dict[str, list[LongTask]]
unhandled: dict[str, list[LongTask]]
async def get_long_task_target(event: BaseEvent, bot: BaseBot) -> LongTaskTarget | None:
if isinstance(event, OBGroupMessageEvent):
return LongTaskTarget(
platform="qq",
self_id=str(event.self_id),
channel_id=str(event.group_id),
target_id=str(event.user_id),
)
if isinstance(event, OBPrivateMessageEvent):
return LongTaskTarget(
platform="qq",
self_id=str(event.self_id),
channel_id="",
target_id=str(event.user_id),
)
if isinstance(event, ConsoleMessageEvent):
return LongTaskTarget(
platform="console",
self_id=str(event.self_id),
channel_id=str(event.channel.id),
target_id=str(event.user.id),
)
if isinstance(event, DCMessageEvent):
self_id = ""
if isinstance(bot, DCBot):
self_id = str(bot.self_id)
return LongTaskTarget(
platform="discord",
self_id=self_id,
channel_id=str(event.channel_id),
target_id=str(event.user_id),
)
_TaskHandler = Callable[[LongTask], Coroutine[Any, Any, Any]]
registered_long_task_handler: dict[str, _TaskHandler] = {}
longtask_lock = asynkio.Lock()
def handle_long_task(callback_id: str):
def _decorator(func: _TaskHandler):
assert callback_id not in registered_long_task_handler, (
"有长任务的 ID 出现冲突,请换个名字!"
)
registered_long_task_handler[callback_id] = func
return func
return _decorator
def _load_longtask_data() -> LongTaskModuleData:
try:
txt = LONGTASK_DATA_DIR.read_text()
return LongTaskModuleData.model_validate_json(txt)
except (FileNotFoundError, ValidationError) as e:
logger.info(f"取得 LongTask 数据时出现问题:{e}")
return LongTaskModuleData(
to_handle={},
unhandled={},
)
def _save_longtask_data(data: LongTaskModuleData):
LONGTASK_DATA_DIR.write_text(data.model_dump_json())
@asynccontextmanager
async def longtask_data():
await longtask_lock.acquire()
data = _load_longtask_data()
yield data
_save_longtask_data(data)
longtask_lock.release()
async def create_longtask(
handler: str,
data: dict[str, Any],
target: LongTaskTarget,
deadline: datetime.datetime,
):
task = LongTask(
uuid=str(uuid.uuid4()),
data_json=json.dumps(data),
target=target,
callback=handler,
deadline=deadline,
)
logger.info(f"创建了新的 LongTask UUID={task.uuid} CALLBACK={task.callback}")
await task.start()
async with longtask_data() as d:
d.to_handle.setdefault(handler, []).append(task)
return task
async def init_longtask():
counter = 0
req: set[str] = set()
async with longtask_data() as data:
for v in data.to_handle.values():
for t in v:
await t.start()
counter += 1
req.add(t.callback)
logger.info(f"LongTask 启动了任务 数量={counter} 期望的门类=[{','.join(req)}]")
DepLongTaskTarget = Annotated[LongTaskTarget, Depends(get_long_task_target)]

View File

@ -1,23 +1,30 @@
import asyncio as asynkio
import base64
from pathlib import Path
import secrets
import json
import datetime
from typing import Literal, Optional
import json
import secrets
from enum import Enum
from pathlib import Path
from typing import Optional
from loguru import logger
from nonebot import on_message
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import (Alconna, Args, Field, Subcommand,
UniMessage, UniMsg, on_alconna)
from nonebot_plugin_alconna import (
Alconna,
Args,
UniMessage,
UniMsg,
on_alconna,
)
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json"
DATA_FILE_PATH = (
Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json"
)
def load_banned_ids() -> list[str]:
if not DATA_FILE_PATH.exists():
@ -27,32 +34,38 @@ def load_banned_ids() -> list[str]:
except Exception as e:
logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}")
return []
def is_idiom_game_banned(group_id: str) -> bool:
banned_ids = load_banned_ids()
return group_id in banned_ids
def add_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id not in banned_ids:
banned_ids.append(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
def remove_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id in banned_ids:
banned_ids.remove(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
class TryStartState(Enum):
STARTED = 0
ALREADY_PLAYING = 1
NO_REMAINING_TIMES = 2
class TryStopState(Enum):
STOPPED = 0
NOT_PLAYING = 1
class TryVerifyState(Enum):
VERIFIED = 0
NOT_IDIOM = 1
@ -60,11 +73,12 @@ class TryVerifyState(Enum):
VERIFIED_BUT_NO_NEXT = 3
VERIFIED_GAME_END = 4
class IdiomGame:
ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语
INSTANCE_LIST : dict[str, "IdiomGame"] = {} # 群号对应的游戏实例
IDIOM_FIRST_CHAR = {} # 成语首字字典
ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语
INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例
IDIOM_FIRST_CHAR = {} # 成语首字字典
__inited = False
@ -79,22 +93,22 @@ class IdiomGame:
self.last_play_date = ""
self.all_buff_score = 0
self.lock = asynkio.Lock()
self.remain_rounds = 0 # 剩余回合数
self.remain_rounds = 0 # 剩余回合数
IdiomGame.INSTANCE_LIST[group_id] = self
def be_able_to_play(self) -> bool:
if(self.last_play_date != datetime.date.today()):
if self.last_play_date != datetime.date.today():
self.last_play_date = datetime.date.today()
self.remain_playing_times = 1
if(self.remain_playing_times > 0):
if self.remain_playing_times > 0:
self.remain_playing_times -= 1
return True
return False
def choose_start_idiom(self) -> str:
'''
"""
随机选择一个成语作为起始成语
'''
"""
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_char = self.last_idiom[-1]
if not self.is_nextable(self.last_char):
@ -113,7 +127,7 @@ class IdiomGame:
return TryStartState.NO_REMAINING_TIMES
instance.now_playing = True
return TryStartState.STARTED
def start_game(self, rounds: int = 100):
self.now_playing = True
self.remain_rounds = rounds
@ -128,93 +142,89 @@ class IdiomGame:
return TryStopState.NOT_PLAYING
instance.now_playing = False
return TryStopState.STOPPED
def clear_score_board(self):
self.score_board = {}
self.last_char = ""
def get_score_board(self) -> dict:
return self.score_board
def get_all_buff_score(self) -> int:
return self.all_buff_score
async def skip_idiom(self, buff_score: int = -100) -> str:
'''
"""
跳过当前成语,选择下一个成语
'''
"""
await self.lock.acquire()
self._skip_idiom_async(buff_score)
self.lock.release()
return self.last_idiom
def _skip_idiom_async(self, buff_score: int = -100) -> str:
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_char = self.last_idiom[-1]
self.add_buff_score(buff_score)
return self.last_idiom
return self.last_idiom
async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
'''
"""
用户发送成语
'''
"""
await self.lock.acquire()
state = self._verify_idiom(idiom, user_id)
self.lock.release()
return state
def is_nextable(self, last_char: str) -> bool:
'''
判断是否有成语可以接
'''
return last_char in IdiomGame.IDIOM_FIRST_CHAR
def is_nextable(self, last_char: str) -> bool:
"""
判断是否有成语可以接
"""
return last_char in IdiomGame.IDIOM_FIRST_CHAR
def _verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
# 新成语的首字应与上一条成语的尾字相同
if idiom[0] != self.last_char:
return TryVerifyState.WRONG_FIRST_CHAR
if(idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS):
if idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS:
self.add_score(user_id, -0.1)
return TryVerifyState.NOT_IDIOM
self.last_idiom = idiom
self.last_char = idiom[-1]
self.add_score(user_id, 1)
self.remain_rounds -= 1
if(self.remain_rounds <= 0):
if self.remain_rounds <= 0:
self.now_playing = False
return TryVerifyState.VERIFIED_GAME_END
if(not self.is_nextable(self.last_char)):
if not self.is_nextable(self.last_char):
# 没有成语可以接了,自动跳过
self._skip_idiom_async()
return TryVerifyState.VERIFIED_BUT_NO_NEXT
return TryVerifyState.VERIFIED
def get_user_score(self, user_id: str) -> int:
if user_id not in self.score_board:
return 0
return self.score_board[user_id]["score"]
def add_score(self, user_id: str, score: int):
if user_id not in self.score_board:
self.score_board[user_id] = {
"name": user_id,
"score": 0
}
self.score_board[user_id] = {"name": user_id, "score": 0}
self.score_board[user_id]["score"] += score
def add_buff_score(self, score: int):
self.all_buff_score += score
def get_playing_state(self) -> bool:
return self.now_playing
def get_last_char(self) -> str:
return self.last_char
@classmethod
def init_lexicon(cls):
if(cls.__inited):
if cls.__inited:
return
cls.__inited = True
@ -235,28 +245,40 @@ class IdiomGame:
COMMON_WORDS.append(word)
# 读取 THUOCL 成语库
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8") as f:
with open(
ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt",
"r",
encoding="utf-8",
) as f:
THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f]
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
THUOCL_WORDS = []
import os
for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"):
if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt":
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8") as f:
with open(
ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename,
"r",
encoding="utf-8",
) as f:
for line in f:
word = line.lstrip().split(" ")[0].strip()
if len(word) == 4:
THUOCL_WORDS.append(word)
# 只有成语的大表
cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重
cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重
# 其他四字词语表,仅表示可以有这个词
cls.ALL_WORDS = [word for word in cls.ALL_WORDS if len(word) == 4] + THUOCL_WORDS + COMMON_WORDS
cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重
cls.ALL_WORDS = (
[word for word in cls.ALL_WORDS if len(word) == 4]
+ THUOCL_WORDS
+ COMMON_WORDS
)
cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重
# 根据成语大表,划分出成语首字字典
for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS:
@ -264,41 +286,81 @@ class IdiomGame:
cls.IDIOM_FIRST_CHAR[idiom[0]] = []
cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
evt = on_alconna(Alconna(
"我要玩成语接龙",
Args['rounds?', int],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
evt = on_alconna(
Alconna(
"我要玩成语接龙",
Args["rounds?", int],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def play_game(event: BaseEvent, force = False, rounds: Optional[int] = 100):
group_id = str(event.get_session_id())
async def play_game(
event: BaseEvent,
target: DepLongTaskTarget,
force=False,
rounds: Optional[int] = 100,
):
# group_id = str(event.get_session_id())
group_id = target.channel_id
if is_idiom_game_banned(group_id):
await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export())
await evt.send(
await UniMessage().text("本群已被禁止使用成语接龙功能!").export()
)
return
rounds = rounds or 0
if rounds <= 0:
await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export())
return
state = IdiomGame.try_start_game(group_id, force)
if state == TryStartState.ALREADY_PLAYING:
await evt.send(await UniMessage().text("当前已有成语接龙游戏在进行中,请稍后再试!").export())
await evt.send(
await UniMessage()
.text("当前已有成语接龙游戏在进行中,请稍后再试!")
.export()
)
return
if state == TryStartState.NO_REMAINING_TIMES:
await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export())
return
await evt.send(await UniMessage().text("你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!").export())
await evt.send(
await UniMessage()
.text(
"你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!"
)
.export()
)
instance = IdiomGame.INSTANCE_LIST[group_id]
instance.start_game(rounds)
# 发布成语
await evt.send(await UniMessage().text(f"第一个成语:「{instance.last_idiom}」,请接!").export())
await evt.send(
await UniMessage()
.text(f"第一个成语:「{instance.last_idiom}」,请接!")
.export()
)
evt = on_alconna(
Alconna(
"老子就是要玩成语接龙!!!",
Args["rounds?", int],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
evt = on_alconna(Alconna(
"老子就是要玩成语接龙!!!",
Args['rounds?', int],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def force_play_game(event: BaseEvent, rounds: Optional[int] = 100):
await play_game(event, force=True, rounds=rounds)
async def force_play_game(
event: BaseEvent, target: DepLongTaskTarget, rounds: Optional[int] = 100
):
await play_game(event, target, force=True, rounds=rounds)
async def end_game(event: BaseEvent, group_id: str):
instance = IdiomGame.INSTANCE_LIST[group_id]
@ -308,19 +370,28 @@ async def end_game(event: BaseEvent, group_id: str):
result_text += "无人得分!"
else:
# 按分数排序,名字用 at 的方式
sorted_score = sorted(score_board.items(), key=lambda x: x[1]["score"], reverse=True)
sorted_score = sorted(
score_board.items(), key=lambda x: x[1]["score"], reverse=True
)
for i, (user_id, info) in enumerate(sorted_score):
result_text += f"{i+1}. " + UniMessage().at(user_id) + f": {info['score'] + instance.get_all_buff_score()}\n"
result_text += (
f"{i + 1}. "
+ UniMessage().at(user_id)
+ f": {info['score'] + instance.get_all_buff_score()}\n"
)
await evt.send(await result_text.export())
instance.clear_score_board()
evt = on_alconna(Alconna(
"不玩了"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
evt = on_alconna(
Alconna("不玩了"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
)
@evt.handle()
async def _(event: BaseEvent):
group_id = str(event.get_session_id())
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
state = IdiomGame.try_stop_game(group_id)
if state == TryStopState.STOPPED:
# 发送好吧狗图片
@ -330,22 +401,30 @@ async def _(event: BaseEvent):
await evt.send(await UniMessage().image(raw=img_data).export())
await end_game(event, group_id)
else:
await evt.send(await UniMessage().text("当前没有成语接龙游戏在进行中!").export())
await evt.send(
await UniMessage().text("当前没有成语接龙游戏在进行中!").export()
)
# 跳过
evt = on_alconna(Alconna(
"跳过成语"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
evt = on_alconna(
Alconna("跳过成语"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
)
@evt.handle()
async def _(event: BaseEvent):
group_id = str(event.get_session_id())
async def _(target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
await evt.send(await UniMessage().text("你们太菜了全部扣100分").export())
idiom = await instance.skip_idiom(-100)
await evt.send(await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export())
await evt.send(
await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export()
)
def get_user_info(event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
@ -356,49 +435,84 @@ def get_user_info(event: BaseEvent):
user_name = str(event.get_user_id())
return user_id, user_name
# 直接读取消息
evt = on_message()
@evt.handle()
async def _(event: BaseEvent, msg: UniMsg):
group_id = str(event.get_session_id())
async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
user_idiom = msg.extract_plain_text().strip()
user_id , user_name = get_user_info(event)
user_id, user_name = get_user_info(event)
state = await instance.try_verify_idiom(user_idiom, user_id)
if(state == TryVerifyState.WRONG_FIRST_CHAR):
if state == TryVerifyState.WRONG_FIRST_CHAR:
return
if(state == TryVerifyState.NOT_IDIOM):
await evt.send(await UniMessage().at(user_id).text("接不上!这个不一样!你被扣了 0.1 分!").export())
if state == TryVerifyState.NOT_IDIOM:
await evt.send(
await UniMessage()
.at(user_id)
.text("接不上!这个不一样!你被扣了 0.1 分!")
.export()
)
return
await evt.send(await UniMessage().at(user_id).text(f"接对了!你有 {instance.get_user_score(user_id)} 分!").export())
if(state == TryVerifyState.VERIFIED_GAME_END):
await evt.send(
await UniMessage()
.at(user_id)
.text(f"接对了!你有 {instance.get_user_score(user_id)} 分!")
.export()
)
if state == TryVerifyState.VERIFIED_GAME_END:
await evt.send(await UniMessage().text("全部回合结束!").export())
await end_game(event, group_id)
return
if(state == TryVerifyState.VERIFIED_BUT_NO_NEXT):
await evt.send(await UniMessage().text("但是,这是条死路!你们全部都要扣 100 分!").export())
await evt.send(await UniMessage().text(f"重新抽取成语「{instance.last_idiom}").export())
await evt.send(await UniMessage().text(f"下一个成语请以「{instance.get_last_char()}」开头!").export())
if state == TryVerifyState.VERIFIED_BUT_NO_NEXT:
await evt.send(
await UniMessage()
.text("但是,这是条死路!你们全部都要扣 100 分!")
.export()
)
await evt.send(
await UniMessage().text(f"重新抽取成语「{instance.last_idiom}").export()
)
await evt.send(
await UniMessage()
.text(f"下一个成语请以「{instance.get_last_char()}」开头!")
.export()
)
evt = on_alconna(
Alconna("禁止成语接龙"),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
evt = on_alconna(Alconna(
"禁止成语接龙"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent):
group_id = str(event.get_session_id())
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
add_banned_id(group_id)
await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export())
evt = on_alconna(Alconna(
"开启成语接龙"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
evt = on_alconna(
Alconna("开启成语接龙"),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(event: BaseEvent):
group_id = str(event.get_session_id())
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
remove_banned_id(group_id)
await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export())
await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export())

View File

@ -0,0 +1,47 @@
import asyncio
# import datetime
from loguru import logger
import nonebot
# from nonebot_plugin_alconna import UniMessage
from konabot.common.longtask import (
# DepLongTaskTarget,
# LongTask,
# create_longtask,
# handle_long_task,
init_longtask,
)
driver = nonebot.get_driver()
INIT_FLAG = {"flag": False}
@driver.on_bot_connect
async def _():
if INIT_FLAG["flag"]:
return
INIT_FLAG["flag"] = True
logger.info("有 Bot 连接,等待 5 秒后初始化 LongTask 模块")
await asyncio.sleep(5)
await init_longtask()
logger.info("LongTask 初始化完成")
# cmd1 = nonebot.on_command("test114")
#
#
# @handle_long_task("test_callback_001")
# async def _(lt: LongTask):
# await lt.target.send_message(UniMessage().text("Hello, world!"), at=True)
#
#
# @cmd1.handle()
# async def _(target: DepLongTaskTarget):
# await create_longtask(
# handler="test_callback_001",
# data={},
# target=target,
# deadline=datetime.datetime.now() + datetime.timedelta(seconds=20),
# )