将成语接龙还原为内存存储,空调优化为部分内存存储且具有过期期限,避免频繁数据库查询
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2025-11-19 11:04:13 +08:00
parent 00bdb90e3c
commit 1233677eea
2 changed files with 191 additions and 67 deletions

View File

@ -1,14 +1,20 @@
import asyncio
from enum import Enum
from io import BytesIO
from pathlib import Path
import signal
import time
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from nonebot import logger
from konabot.common.database import DatabaseManager
from konabot.common.path import ASSETS_PATH, FONTS_PATH
from konabot.common.path import DATA_PATH
import nonebot
import json
ROOT_PATH = Path(__file__).resolve().parent
@ -20,9 +26,102 @@ class CrashType(Enum):
BURNT = 0
FROZEN = 1
driver = nonebot.get_driver()
ac_manager: 'AirConditionerManager' = None
@driver.on_startup
async def register_startup_hook():
global ac_manager
# 启动自动保存任务
ac_manager = AirConditionerManager(save_interval=300) # 5分钟
await ac_manager.start_auto_save()
@driver.on_shutdown
async def register_shutdown_hook():
"""注册关闭时需要执行的函数"""
# 停止自动保存任务
if ac_manager:
await ac_manager.stop_auto_save()
class AirConditionerManager:
def __init__(self, save_interval: int = 300): # 默认5分钟保存一次
self.save_interval = save_interval
self._save_task = None
self._running = False
async def start_auto_save(self):
"""启动自动保存任务"""
self._running = True
self._save_task = asyncio.create_task(self._auto_save_loop())
logger.info(f"自动保存任务已启动,间隔: {self.save_interval}")
async def stop_auto_save(self):
"""停止自动保存任务"""
if self._save_task:
self._running = False
self._save_task.cancel()
try:
await self._save_task
except asyncio.CancelledError:
pass
logger.info("自动保存任务已停止")
else:
logger.warning("没有正在运行的自动保存任务")
async def _auto_save_loop(self):
"""自动保存循环"""
while self._running:
try:
await asyncio.sleep(self.save_interval)
await self.save_all_instances()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"定时保存失败: {e}")
async def save_all_instances(self):
save_time = time.time()
to_remove = []
"""保存所有实例到数据库"""
for ac_id, ac_instance in AirConditioner.InstancesPool.items():
try:
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "update_ac.sql",
(ac_instance.on, ac_instance.temperature,
ac_instance.burnt, ac_instance.frozen, ac_id)
)
if(save_time - ac_instance.instance_get_time >= 3600):
to_remove.append(ac_id)
except Exception as e:
logger.error(f"保存空调 {ac_id} 失败: {e}")
logger.info(f"定时保存完成,共保存 {len(AirConditioner.InstancesPool)} 个空调实例")
# 删除时间过长实例
for ac_id in to_remove:
del AirConditioner.InstancesPool[ac_id]
logger.info(f"清理长期不活跃的空调实例完成,目前池内共有 {len(AirConditioner.InstancesPool)} 个实例")
class AirConditioner:
InstancesPool: dict[str, 'AirConditioner'] = {}
@classmethod
async def refresh_ac(cls, id: str):
cls.InstancesPool[id].instance_get_time = time.time()
@classmethod
async def storage_ac(cls, id: str, ac: 'AirConditioner'):
cls.InstancesPool[id] = ac
@classmethod
async def get_ac(cls, id: str) -> 'AirConditioner':
if(id in cls.InstancesPool):
cls.refresh_ac(id)
return cls.InstancesPool[id]
# 如果没有,那么从数据库重新实例化一个 AC 出来
result = await db_manager.query_by_sql_file(ROOT_PATH / "sql" / "query_ac.sql", (id,))
if len(result) == 0:
ac = await cls.create_ac(id)
@ -33,6 +132,7 @@ class AirConditioner:
ac.temperature = float(ac_data["temperature"])
ac.burnt = bool(ac_data["burnt"])
ac.frozen = bool(ac_data["frozen"])
await cls.storage_ac(id, ac)
return ac
@classmethod
@ -42,6 +142,7 @@ class AirConditioner:
ROOT_PATH / "sql" / "insert_ac.sql",
(id, ac.on, ac.temperature, ac.burnt, ac.frozen)
)
await cls.storage_ac(id, ac)
return ac
async def update_ac(self, state: bool = None, temperature_delta: float = None, burnt: bool = None, frozen: bool = None) -> 'AirConditioner':
@ -53,10 +154,10 @@ class AirConditioner:
self.burnt = burnt
if frozen is not None:
self.frozen = frozen
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "update_ac.sql",
(self.on, self.temperature, self.burnt, self.frozen, self.id)
)
# await db_manager.execute_by_sql_file(
# ROOT_PATH / "sql" / "update_ac.sql",
# (self.on, self.temperature, self.burnt, self.frozen, self.id)
# )
return self
async def change_ac(self) -> 'AirConditioner':
@ -64,10 +165,10 @@ class AirConditioner:
self.temperature = 24
self.burnt = False
self.frozen = False
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "update_ac.sql",
(self.on, self.temperature, self.burnt, self.frozen, self.id)
)
# await db_manager.execute_by_sql_file(
# ROOT_PATH / "sql" / "update_ac.sql",
# (self.on, self.temperature, self.burnt, self.frozen, self.id)
# )
return self
def __init__(self, id: str) -> None:
@ -77,6 +178,8 @@ class AirConditioner:
self.burnt = False
self.frozen = False
self.instance_get_time = time.time()
async def broke_ac(self, crash_type: CrashType):
'''
让空调坏掉

View File

@ -117,18 +117,22 @@ class IdiomGameLLM:
@classmethod
async def storage_idiom(cls, idiom: str):
# 将 idiom 存入数据库
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "insert_custom_word.sql",
(idiom,)
)
# await db_manager.execute_by_sql_file(
# ROOT_PATH / "sql" / "insert_custom_word.sql",
# (idiom,)
# )
# 将 idiom 存入本地文件以备后续分析
with open(DATA_DIR / "idiom_llm_storage.txt", "a", encoding="utf-8") as f:
f.write(idiom + "\n")
IdiomGame.append_into_word_list(idiom)
class IdiomGame:
# ALL_WORDS = [] # 所有四字词语
# ALL_IDIOMS = [] # 所有成语
ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语
INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例
# IDIOM_FIRST_CHAR = {} # 所有成语包括词语的首字字典
# AVALIABLE_IDIOM_FIRST_CHAR = {} # 真正有效的成语首字字典
IDIOM_FIRST_CHAR = {} # 所有成语包括词语的首字字典
AVALIABLE_IDIOM_FIRST_CHAR = {} # 真正有效的成语首字字典
__inited = False
@ -153,10 +157,15 @@ class IdiomGame:
'''
将一个新词加入到词语列表中
'''
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "insert_custom_word.sql",
(word,)
)
if word not in cls.ALL_WORDS:
cls.ALL_WORDS.append(word)
if word[0] not in cls.IDIOM_FIRST_CHAR:
cls.IDIOM_FIRST_CHAR[word[0]] = []
cls.IDIOM_FIRST_CHAR[word[0]].append(word)
# await db_manager.execute_by_sql_file(
# ROOT_PATH / "sql" / "insert_custom_word.sql",
# (word,)
# )
def be_able_to_play(self) -> bool:
if self.last_play_date != datetime.date.today():
@ -169,10 +178,11 @@ class IdiomGame:
@staticmethod
async def random_idiom() -> str:
result = await db_manager.query_by_sql_file(
ROOT_PATH / "sql" / "random_choose_idiom.sql"
)
return result[0]["idiom"]
# result = await db_manager.query_by_sql_file(
# ROOT_PATH / "sql" / "random_choose_idiom.sql"
# )
# return result[0]["idiom"]
return secrets.choice(IdiomGame.ALL_IDIOMS)
async def choose_start_idiom(self) -> str:
"""
@ -257,11 +267,12 @@ class IdiomGame:
"""
判断是否有成语可以接
"""
result = await db_manager.query_by_sql_file(
ROOT_PATH / "sql" / "is_nextable.sql",
(last_char,)
)
return result[0]["DEED"] == 1
# result = await db_manager.query_by_sql_file(
# ROOT_PATH / "sql" / "is_nextable.sql",
# (last_char,)
# )
# return result[0]["DEED"] == 1
return last_char in IdiomGame.AVALIABLE_IDIOM_FIRST_CHAR
def add_already_idiom(self, idiom: str):
if idiom in self.already_idioms:
@ -293,12 +304,13 @@ class IdiomGame:
state.append(TryVerifyState.WRONG_FIRST_CHAR)
return state
# 成语是否存在
result = await db_manager.query_by_sql_file(
ROOT_PATH / "sql" / "query_idiom.sql",
(idiom, idiom, idiom)
)
status_result = result[0]["status"]
if status_result == -1:
# result = await db_manager.query_by_sql_file(
# ROOT_PATH / "sql" / "query_idiom.sql",
# (idiom, idiom, idiom)
# )
# status_result = result[0]["status"]
# if status_result == -1:
if idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS:
logger.info(f"用户 {user_id} 发送了未知词语 {idiom},正在使用 LLM 进行验证")
try:
if not await IdiomGameLLM.verify_idiom_with_llm(idiom):
@ -320,7 +332,8 @@ class IdiomGame:
self.last_idiom = idiom
self.last_char = idiom[-1]
self.add_score(user_id, 1 * score_k) # 先加 1 分
if status_result == 1:
# if status_result == 1:
if idiom in IdiomGame.ALL_IDIOMS:
state.append(TryVerifyState.VERIFIED_AND_REAL)
self.add_score(user_id, 4 * score_k) # 再加 4 分
self.remain_rounds -= 1
@ -357,22 +370,26 @@ class IdiomGame:
@classmethod
async def random_idiom_starting_with(cls, first_char: str) -> Optional[str]:
# await cls.init_lexicon()
# result = await db_manager.query_by_sql_file(
# ROOT_PATH / "sql" / "query_idiom_start_with.sql",
# (first_char,)
# )
# if len(result) == 0:
# return None
# return result[0]["idiom"]
await cls.init_lexicon()
result = await db_manager.query_by_sql_file(
ROOT_PATH / "sql" / "query_idiom_start_with.sql",
(first_char,)
)
if len(result) == 0:
if first_char not in cls.AVALIABLE_IDIOM_FIRST_CHAR:
return None
return result[0]["idiom"]
return secrets.choice(cls.AVALIABLE_IDIOM_FIRST_CHAR[first_char])
@classmethod
async def init_lexicon(cls):
if cls.__inited:
return
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "create_table.sql"
) # 确保数据库初始化
# await db_manager.execute_by_sql_file(
# ROOT_PATH / "sql" / "create_table.sql"
# ) # 确保数据库初始化
cls.__inited = True
# 成语大表
@ -439,10 +456,10 @@ class IdiomGame:
ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
ALL_IDIOMS = list(set(ALL_IDIOMS)) # 去重
# 批量插入数据库
await db_manager.execute_many_values_by_sql_file(
ROOT_PATH / "sql" / "insert_idiom.sql",
[(idiom,) for idiom in ALL_IDIOMS]
)
# await db_manager.execute_many_values_by_sql_file(
# ROOT_PATH / "sql" / "insert_idiom.sql",
# [(idiom,) for idiom in ALL_IDIOMS]
# )
# 其他四字词语表,仅表示可以有这个词
@ -451,29 +468,33 @@ class IdiomGame:
+ THUOCL_WORDS
+ COMMON_WORDS
)
cls.ALL_WORDS = ALL_WORDS + LOCAL_LLM_WORDS
cls.ALL_IDIOMS = ALL_IDIOMS
# 插入数据库
await db_manager.execute_many_values_by_sql_file(
ROOT_PATH / "sql" / "insert_word.sql",
[(word,) for word in ALL_WORDS]
)
# await db_manager.execute_many_values_by_sql_file(
# ROOT_PATH / "sql" / "insert_word.sql",
# [(word,) for word in ALL_WORDS]
# )
# 自定义词语 LOCAL_LLM_WORDS 插入数据库,兼容用
await db_manager.execute_many_values_by_sql_file(
ROOT_PATH / "sql" / "insert_custom_word.sql",
[(word,) for word in LOCAL_LLM_WORDS]
)
# await db_manager.execute_many_values_by_sql_file(
# ROOT_PATH / "sql" / "insert_custom_word.sql",
# [(word,) for word in LOCAL_LLM_WORDS]
# )
# # 根据成语大表,划分出成语首字字典
# for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS:
# if idiom[0] not in cls.IDIOM_FIRST_CHAR:
# cls.IDIOM_FIRST_CHAR[idiom[0]] = []
# cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
# 根据成语大表,划分出成语首字字典
for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS:
if idiom[0] not in cls.IDIOM_FIRST_CHAR:
cls.IDIOM_FIRST_CHAR[idiom[0]] = []
cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
# # 根据真正的成语大表,划分出有效成语首字字典
# for idiom in cls.ALL_IDIOMS:
# if idiom[0] not in cls.AVALIABLE_IDIOM_FIRST_CHAR:
# cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]] = []
# cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
# 根据真正的成语大表,划分出有效成语首字字典
for idiom in cls.ALL_IDIOMS:
if idiom[0] not in cls.AVALIABLE_IDIOM_FIRST_CHAR:
cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]] = []
cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
evt = on_alconna(