database 接入

This commit is contained in:
2025-11-18 19:36:05 +08:00
parent a8a7b62f76
commit f21da657db
23 changed files with 376 additions and 82 deletions

View File

@ -1,4 +1,4 @@
ENVIRONMENT=dev ENVIRONMENT=dev
PORT=21333 PORT=21333
DATABASE_PATH="./data/database.db"
ENABLE_CONSOLE=true ENABLE_CONSOLE=true

4
.gitignore vendored
View File

@ -1,4 +1,6 @@
/.env /.env
/data /data
__pycache__ __pycache__
*.db

4
bot.py
View File

@ -10,6 +10,7 @@ from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter
from konabot.common.log import init_logger from konabot.common.log import init_logger
from konabot.common.nb.exc import BotExceptionMessage from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.path import LOG_PATH from konabot.common.path import LOG_PATH
from konabot.core.preinit import preinit
dotenv.load_dotenv() dotenv.load_dotenv()
env = os.environ.get("ENVIRONMENT", "prod") env = os.environ.get("ENVIRONMENT", "prod")
@ -48,6 +49,9 @@ def main():
nonebot.load_plugins("konabot/plugins") nonebot.load_plugins("konabot/plugins")
nonebot.load_plugin("nonebot_plugin_analysis_bilibili") nonebot.load_plugin("nonebot_plugin_analysis_bilibili")
# 预加载
preinit("konabot/plugins")
nonebot.run() nonebot.run()
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -0,0 +1,64 @@
import os
import sqlite3
from typing import List, Dict, Any, Optional
class DatabaseManager:
"""超级无敌神奇的数据库!"""
@classmethod
def query(cls, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
"""执行查询语句并返回结果"""
conn = sqlite3.connect(os.environ.get('DATABASE_PATH', './data/database.db'))
cursor = conn.cursor()
cursor.execute(query, params or ())
columns = [description[0] for description in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
cursor.close()
conn.close()
return results
@classmethod
def query_by_sql_file(cls, file_path: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
"""从 SQL 文件中读取查询语句并执行"""
with open(file_path, 'r', encoding='utf-8') as f:
query = f.read()
return cls.query(query, params)
@classmethod
def execute(cls, command: str, params: Optional[tuple] = None) -> None:
"""执行非查询语句"""
conn = sqlite3.connect(os.environ.get('DATABASE_PATH', './data/database.db'))
cursor = conn.cursor()
cursor.execute(command, params or ())
conn.commit()
cursor.close()
conn.close()
@classmethod
def execute_by_sql_file(cls, file_path: str, params: Optional[tuple] = None) -> None:
"""从 SQL 文件中读取非查询语句并执行"""
with open(file_path, 'r', encoding='utf-8') as f:
command = f.read()
# 按照需要执行多条语句
commands = command.split(';')
for cmd in commands:
cmd = cmd.strip()
if cmd:
cls.execute(cmd, params)
@classmethod
def execute_many(cls, command: str, seq_of_params: List[tuple]) -> None:
"""执行多条非查询语句"""
conn = sqlite3.connect(os.environ.get('DATABASE_PATH', './data/database.db'))
cursor = conn.cursor()
cursor.executemany(command, seq_of_params)
conn.commit()
cursor.close()
conn.close()
@classmethod
def execute_many_values_by_sql_file(cls, file_path: str, seq_of_params: List[tuple]) -> None:
"""从 SQL 文件中读取一条语句,但是被不同值同时执行"""
with open(file_path, 'r', encoding='utf-8') as f:
command = f.read()
cls.execute_many(command, seq_of_params)

15
konabot/core/preinit.py Normal file
View File

@ -0,0 +1,15 @@
from pathlib import Path
from nonebot import logger
def preinit(path: str):
# 执行预初始化,递归找到位于对应路径内文件名为 __preinit__.py 的所有文件都会被执行
dir_path = Path(path)
for item in dir_path.iterdir():
if item.is_dir():
preinit(item)
elif item.is_file() and item.name == "__preinit__.py":
# 动态导入该文件以执行预初始化代码
module_path = str(item.with_suffix("")).replace("/", ".").replace("\\", ".")
__import__(module_path)
logger.info(f"Preinitialized module: {module_path}")

View File

@ -7,16 +7,19 @@ from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import Alconna, AlconnaMatcher, Args, UniMessage, on_alconna from nonebot_plugin_alconna import Alconna, AlconnaMatcher, Args, UniMessage, on_alconna
from PIL import Image from PIL import Image
import numpy as np import numpy as np
from konabot.common.database import DatabaseManager
from konabot.common.longtask import DepLongTaskTarget from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH from konabot.common.path import ASSETS_PATH
from konabot.common.web_render import WebRenderer from konabot.common.web_render import WebRenderer
from konabot.plugins.air_conditioner.ac import AirConditioner, CrashType, generate_ac_image, wiggle_transform from konabot.plugins.air_conditioner.ac import AirConditioner, CrashType, generate_ac_image, wiggle_transform
from pathlib import Path
import random import random
import math import math
ROOT_PATH = Path(__file__).resolve().parent
def get_ac(id: str) -> AirConditioner: def get_ac(id: str) -> AirConditioner:
ac = AirConditioner.air_conditioners.get(id) ac = AirConditioner.get_ac(id)
if ac is None: if ac is None:
ac = AirConditioner(id) ac = AirConditioner(id)
return ac return ac
@ -61,7 +64,7 @@ evt = on_alconna(Alconna(
async def _(event: BaseEvent, target: DepLongTaskTarget): async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id id = target.channel_id
ac = get_ac(id) ac = get_ac(id)
ac.on = True ac.update_ac(state=True)
await send_ac_image(evt, ac) await send_ac_image(evt, ac)
evt = on_alconna(Alconna( evt = on_alconna(Alconna(
@ -72,7 +75,7 @@ evt = on_alconna(Alconna(
async def _(event: BaseEvent, target: DepLongTaskTarget): async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id id = target.channel_id
ac = get_ac(id) ac = get_ac(id)
ac.on = False ac.update_ac(state=False)
await send_ac_image(evt, ac) await send_ac_image(evt, ac)
evt = on_alconna(Alconna( evt = on_alconna(Alconna(
@ -82,6 +85,8 @@ evt = on_alconna(Alconna(
@evt.handle() @evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1): async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
if temp is None:
temp = 1
if temp <= 0: if temp <= 0:
return return
id = target.channel_id id = target.channel_id
@ -89,7 +94,7 @@ async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[in
if not ac.on or ac.burnt == True or ac.frozen == True: if not ac.on or ac.burnt == True or ac.frozen == True:
await send_ac_image(evt, ac) await send_ac_image(evt, ac)
return return
ac.temperature += temp ac.update_ac(temperature_delta=temp)
if ac.temperature > 40: if ac.temperature > 40:
# 根据温度随机出是否爆炸40度开始呈指数增长 # 根据温度随机出是否爆炸40度开始呈指数增长
possibility = -math.e ** ((40-ac.temperature) / 50) + 1 possibility = -math.e ** ((40-ac.temperature) / 50) + 1
@ -115,6 +120,8 @@ evt = on_alconna(Alconna(
@evt.handle() @evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1): async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
if temp is None:
temp = 1
if temp <= 0: if temp <= 0:
return return
id = target.channel_id id = target.channel_id
@ -122,7 +129,7 @@ async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[in
if not ac.on or ac.burnt == True or ac.frozen == True: if not ac.on or ac.burnt == True or ac.frozen == True:
await send_ac_image(evt, ac) await send_ac_image(evt, ac)
return return
ac.temperature -= temp ac.update_ac(temperature_delta=-temp)
if ac.temperature < 0: if ac.temperature < 0:
# 根据温度随机出是否冻结0度开始呈指数增长 # 根据温度随机出是否冻结0度开始呈指数增长
possibility = -math.e ** (ac.temperature / 50) + 1 possibility = -math.e ** (ac.temperature / 50) + 1
@ -141,6 +148,16 @@ async def _(event: BaseEvent, target: DepLongTaskTarget):
ac.change_ac() ac.change_ac()
await send_ac_image(evt, ac) await send_ac_image(evt, ac)
def query_number_ranking(id: str) -> tuple[int, int]:
result = DatabaseManager.query_by_sql_file(
ROOT_PATH / "sql" / "query_crash_and_rank.sql",
(id,id)
)
if len(result) == 0:
return 0, 0
else:
return result[0].values()
evt = on_alconna(Alconna( evt = on_alconna(Alconna(
"空调炸炸排行榜", "空调炸炸排行榜",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@ -148,8 +165,9 @@ evt = on_alconna(Alconna(
@evt.handle() @evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget): async def _(event: BaseEvent, target: DepLongTaskTarget):
id = target.channel_id id = target.channel_id
ac = get_ac(id) # ac = get_ac(id)
number, ranking = ac.get_crashes_and_ranking() # number, ranking = ac.get_crashes_and_ranking()
number, ranking = query_number_ranking(id)
params = { params = {
"number": number, "number": number,
"ranking": ranking "ranking": ranking

View File

@ -0,0 +1,9 @@
# 预初始化,只要是导入本插件包就会执行这里的代码
from pathlib import Path
from konabot.common.database import DatabaseManager
# 初始化数据库表
DatabaseManager.execute_by_sql_file(
Path(__file__).resolve().parent / "sql" / "create_table.sql"
)

View File

@ -1,20 +1,71 @@
from enum import Enum from enum import Enum
from io import BytesIO from io import BytesIO
from pathlib import Path
import cv2 import cv2
import numpy as np import numpy as np
from PIL import Image, ImageDraw, ImageFont from PIL import Image, ImageDraw, ImageFont
from konabot.common.database import DatabaseManager
from konabot.common.path import ASSETS_PATH, FONTS_PATH from konabot.common.path import ASSETS_PATH, FONTS_PATH
from konabot.common.path import DATA_PATH from konabot.common.path import DATA_PATH
import json import json
ROOT_PATH = Path(__file__).resolve().parent
class CrashType(Enum): class CrashType(Enum):
BURNT = 0 BURNT = 0
FROZEN = 1 FROZEN = 1
class AirConditioner: class AirConditioner:
air_conditioners: dict[str, "AirConditioner"] = {} @classmethod
def get_ac(cls, id: str) -> 'AirConditioner':
result = DatabaseManager.query_by_sql_file(ROOT_PATH / "sql" / "query_ac.sql", (id,))
if len(result) == 0:
ac = cls.create_ac(id)
return ac
ac_data = result[0]
ac = AirConditioner(id)
ac.on = bool(ac_data["on"])
ac.temperature = float(ac_data["temperature"])
ac.burnt = bool(ac_data["burnt"])
ac.frozen = bool(ac_data["frozen"])
return ac
@classmethod
def create_ac(cls, id: str) -> 'AirConditioner':
ac = AirConditioner(id)
DatabaseManager.execute_by_sql_file(
ROOT_PATH / "sql" / "insert_ac.sql",
(id, ac.on, ac.temperature, ac.burnt, ac.frozen)
)
return ac
def update_ac(self, state: bool = None, temperature_delta: float = None, burnt: bool = None, frozen: bool = None) -> 'AirConditioner':
if state is not None:
self.on = state
if temperature_delta is not None:
self.temperature += temperature_delta
if burnt is not None:
self.burnt = burnt
if frozen is not None:
self.frozen = frozen
DatabaseManager.execute_by_sql_file(
ROOT_PATH / "sql" / "update_ac.sql",
(self.on, self.temperature, self.burnt, self.frozen, self.id)
)
return self
def change_ac(self) -> 'AirConditioner':
self.on = False
self.temperature = 24
self.burnt = False
self.frozen = False
DatabaseManager.execute_by_sql_file(
ROOT_PATH / "sql" / "update_ac.sql",
(self.on, self.temperature, self.burnt, self.frozen, self.id)
)
return self
def __init__(self, id: str) -> None: def __init__(self, id: str) -> None:
self.id = id self.id = id
@ -22,45 +73,40 @@ class AirConditioner:
self.temperature = 24 # 默认温度 self.temperature = 24 # 默认温度
self.burnt = False self.burnt = False
self.frozen = False self.frozen = False
AirConditioner.air_conditioners[id] = self
def change_ac(self):
self.burnt = False
self.frozen = False
self.on = False
self.temperature = 24 # 重置为默认温度
def broke_ac(self, crash_type: CrashType): def broke_ac(self, crash_type: CrashType):
''' '''
让空调坏掉,并保存数据 让空调坏掉
:param crash_type: CrashType 枚举,表示空调坏掉的类型 :param crash_type: CrashType 枚举,表示空调坏掉的类型
''' '''
match crash_type: match crash_type:
case CrashType.BURNT: case CrashType.BURNT:
self.burnt = True self.update_ac(burnt=True)
case CrashType.FROZEN: case CrashType.FROZEN:
self.frozen = True self.update_ac(frozen=True)
self.save_crash_data(crash_type) DatabaseManager.execute_by_sql_file(
ROOT_PATH / "sql" / "insert_crash.sql",
(self.id, crash_type.value)
)
def save_crash_data(self, crash_type: CrashType): # def save_crash_data(self, crash_type: CrashType):
''' # '''
如果空调爆炸了,就往本地的 ac_crash_data.json 里该 id 的记录加一 # 如果空调爆炸了,就往本地的 ac_crash_data.json 里该 id 的记录加一
''' # '''
data_file = DATA_PATH / "ac_crash_data.json" # data_file = DATA_PATH / "ac_crash_data.json"
crash_data = {} # crash_data = {}
if data_file.exists(): # if data_file.exists():
with open(data_file, "r", encoding="utf-8") as f: # with open(data_file, "r", encoding="utf-8") as f:
crash_data = json.load(f) # crash_data = json.load(f)
if self.id not in crash_data: # if self.id not in crash_data:
crash_data[self.id] = {"burnt": 0, "frozen": 0} # crash_data[self.id] = {"burnt": 0, "frozen": 0}
match crash_type: # match crash_type:
case CrashType.BURNT: # case CrashType.BURNT:
crash_data[self.id]["burnt"] += 1 # crash_data[self.id]["burnt"] += 1
case CrashType.FROZEN: # case CrashType.FROZEN:
crash_data[self.id]["frozen"] += 1 # crash_data[self.id]["frozen"] += 1
with open(data_file, "w", encoding="utf-8") as f: # with open(data_file, "w", encoding="utf-8") as f:
json.dump(crash_data, f, ensure_ascii=False, indent=4) # json.dump(crash_data, f, ensure_ascii=False, indent=4)
def get_crashes_and_ranking(self) -> tuple[int, int]: def get_crashes_and_ranking(self) -> tuple[int, int]:
''' '''

View File

@ -0,0 +1,15 @@
-- 创建所有表
CREATE TABLE IF NOT EXISTS air_conditioner (
id VARCHAR(128) PRIMARY KEY,
'on' BOOLEAN NOT NULL,
temperature REAL NOT NULL,
burnt BOOLEAN NOT NULL,
frozen BOOLEAN NOT NULL
);
CREATE TABLE IF NOT EXISTS air_conditioner_crash_log (
id VARCHAR(128) NOT NULL,
crash_type INT NOT NULL,
timestamp DATETIME NOT NULL,
FOREIGN KEY (id) REFERENCES air_conditioner(id)
);

View File

@ -0,0 +1,3 @@
-- 插入一台新空调
INSERT INTO air_conditioner (id, 'on', temperature, burnt, frozen)
VALUES (?, ?, ?, ?, ?);

View File

@ -0,0 +1,3 @@
-- 插入一条空调爆炸记录
INSERT INTO air_conditioner_crash_log (id, crash_type, timestamp)
VALUES (?, ?, CURRENT_TIMESTAMP);

View File

@ -0,0 +1,4 @@
-- 查询空调状态,如果没有就插入一条新的记录
SELECT *
FROM air_conditioner
WHERE id = ?;

View File

@ -0,0 +1,23 @@
-- 从 air_conditioner_crash_log 表中获取指定 id 损坏的次数以及损坏次数的排名
SELECT crash_count, crash_rank
FROM (
SELECT id,
COUNT(*) AS crash_count,
RANK() OVER (ORDER BY COUNT(*) DESC) AS crash_rank
FROM air_conditioner_crash_log
GROUP BY id
) AS ranked_data
WHERE id = ?
-- 如果该 id 没有损坏记录,则返回 0 次损坏和对应的最后一名
UNION
SELECT 0 AS crash_count,
(SELECT COUNT(DISTINCT id) + 1 FROM air_conditioner_crash_log) AS crash_rank
FROM (
SELECT DISTINCT id
FROM air_conditioner_crash_log
) AS ranked_data
WHERE NOT EXISTS (
SELECT 1
FROM air_conditioner_crash_log
WHERE id = ?
);

View File

@ -0,0 +1,4 @@
-- 更新空调状态
UPDATE air_conditioner
SET 'on' = ?, temperature = ?, burnt = ?, frozen = ?
WHERE id = ?;

View File

@ -18,11 +18,14 @@ from nonebot_plugin_alconna import (
on_alconna, on_alconna,
) )
from konabot.common.database import DatabaseManager
from konabot.common.longtask import DepLongTaskTarget from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH from konabot.common.path import ASSETS_PATH
from konabot.common.llm import get_llm from konabot.common.llm import get_llm
ROOT_PATH = Path(__file__).resolve().parent
DATA_DIR = Path(__file__).parent.parent.parent.parent / "data" DATA_DIR = Path(__file__).parent.parent.parent.parent / "data"
DATA_FILE_PATH = ( DATA_FILE_PATH = (
@ -94,18 +97,19 @@ class IdiomGameLLM:
@classmethod @classmethod
async def storage_idiom(cls, idiom: str): async def storage_idiom(cls, idiom: str):
# 将 idiom 存入本地文件以备后续分析 # 将 idiom 存入数据库
with open(DATA_DIR / "idiom_llm_storage.txt", "a", encoding="utf-8") as f: DatabaseManager.execute_by_sql_file(
f.write(idiom + "\n") ROOT_PATH / "sql" / "insert_custom_word.sql",
IdiomGame.append_into_word_list(idiom) (idiom,)
)
class IdiomGame: class IdiomGame:
ALL_WORDS = [] # 所有四字词语 # ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语 # ALL_IDIOMS = [] # 所有成语
INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例 INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例
IDIOM_FIRST_CHAR = {} # 所有成语包括词语的首字字典 # IDIOM_FIRST_CHAR = {} # 所有成语包括词语的首字字典
AVALIABLE_IDIOM_FIRST_CHAR = {} # 真正有效的成语首字字典 # AVALIABLE_IDIOM_FIRST_CHAR = {} # 真正有效的成语首字字典
__inited = False __inited = False
@ -130,11 +134,10 @@ class IdiomGame:
''' '''
将一个新词加入到词语列表中 将一个新词加入到词语列表中
''' '''
if word not in cls.ALL_WORDS: DatabaseManager.execute_by_sql_file(
cls.ALL_WORDS.append(word) ROOT_PATH / "sql" / "insert_custom_word.sql",
if word[0] not in cls.IDIOM_FIRST_CHAR: (word,)
cls.IDIOM_FIRST_CHAR[word[0]] = [] )
cls.IDIOM_FIRST_CHAR[word[0]].append(word)
def be_able_to_play(self) -> bool: def be_able_to_play(self) -> bool:
if self.last_play_date != datetime.date.today(): if self.last_play_date != datetime.date.today():
@ -145,11 +148,17 @@ class IdiomGame:
return True return True
return False return False
@staticmethod
def random_idiom() -> str:
return DatabaseManager.query_by_sql_file(
ROOT_PATH / "sql" / "random_choose_idiom.sql"
)[0]["idiom"]
def choose_start_idiom(self) -> str: def choose_start_idiom(self) -> str:
""" """
随机选择一个成语作为起始成语 随机选择一个成语作为起始成语
""" """
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS) self.last_idiom = IdiomGame.random_idiom()
self.last_char = self.last_idiom[-1] self.last_char = self.last_idiom[-1]
if not self.is_nextable(self.last_char): if not self.is_nextable(self.last_char):
self.choose_start_idiom() self.choose_start_idiom()
@ -208,7 +217,7 @@ class IdiomGame:
return self.last_idiom return self.last_idiom
def _skip_idiom_async(self) -> str: def _skip_idiom_async(self) -> str:
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS) self.last_idiom = IdiomGame.random_idiom()
self.last_char = self.last_idiom[-1] self.last_char = self.last_idiom[-1]
if not self.is_nextable(self.last_char): if not self.is_nextable(self.last_char):
self._skip_idiom_async() self._skip_idiom_async()
@ -228,8 +237,11 @@ class IdiomGame:
""" """
判断是否有成语可以接 判断是否有成语可以接
""" """
return last_char in IdiomGame.AVALIABLE_IDIOM_FIRST_CHAR return DatabaseManager.query_by_sql_file(
ROOT_PATH / "sql" / "is_nextable.sql",
(last_char,)
)[0]["DEED"] == 1
def add_already_idiom(self, idiom: str): def add_already_idiom(self, idiom: str):
if idiom in self.already_idioms: if idiom in self.already_idioms:
self.already_idioms[idiom] += 1 self.already_idioms[idiom] += 1
@ -259,7 +271,12 @@ class IdiomGame:
if idiom[0] != self.last_char: if idiom[0] != self.last_char:
state.append(TryVerifyState.WRONG_FIRST_CHAR) state.append(TryVerifyState.WRONG_FIRST_CHAR)
return state return state
if idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS: # 成语是否存在
result = DatabaseManager.query_by_sql_file(
ROOT_PATH / "sql" / "query_idiom.sql",
(idiom, idiom, idiom)
)[0]["status"]
if result == -1:
logger.info(f"用户 {user_id} 发送了未知词语 {idiom},正在使用 LLM 进行验证") logger.info(f"用户 {user_id} 发送了未知词语 {idiom},正在使用 LLM 进行验证")
try: try:
if not await IdiomGameLLM.verify_idiom_with_llm(idiom): if not await IdiomGameLLM.verify_idiom_with_llm(idiom):
@ -281,7 +298,7 @@ class IdiomGame:
self.last_idiom = idiom self.last_idiom = idiom
self.last_char = idiom[-1] self.last_char = idiom[-1]
self.add_score(user_id, 1 * score_k) # 先加 1 分 self.add_score(user_id, 1 * score_k) # 先加 1 分
if idiom in IdiomGame.ALL_IDIOMS: if result == 1:
state.append(TryVerifyState.VERIFIED_AND_REAL) state.append(TryVerifyState.VERIFIED_AND_REAL)
self.add_score(user_id, 4 * score_k) # 再加 4 分 self.add_score(user_id, 4 * score_k) # 再加 4 分
self.remain_rounds -= 1 self.remain_rounds -= 1
@ -319,14 +336,21 @@ class IdiomGame:
@classmethod @classmethod
def random_idiom_starting_with(cls, first_char: str) -> Optional[str]: def random_idiom_starting_with(cls, first_char: str) -> Optional[str]:
cls.init_lexicon() cls.init_lexicon()
if first_char not in cls.AVALIABLE_IDIOM_FIRST_CHAR: result = DatabaseManager.query_by_sql_file(
ROOT_PATH / "sql" / "query_idiom_start_with.sql",
(first_char,)
)
if len(result) == 0:
return None return None
return secrets.choice(cls.AVALIABLE_IDIOM_FIRST_CHAR[first_char]) return result[0]["idiom"]
@classmethod @classmethod
def init_lexicon(cls): def init_lexicon(cls):
if cls.__inited: if cls.__inited:
return return
DatabaseManager.execute_by_sql_file(
ROOT_PATH / "sql" / "create_table.sql"
) # 确保数据库初始化
cls.__inited = True cls.__inited = True
# 成语大表 # 成语大表
@ -334,11 +358,12 @@ class IdiomGame:
ALL_IDIOMS_INFOS = json.load(f) ALL_IDIOMS_INFOS = json.load(f)
# 词语大表 # 词语大表
ALL_WORDS = []
with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f: with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f:
jsonData = json.load(f) jsonData = json.load(f)
cls.ALL_WORDS = [item["ci"] for item in jsonData] ALL_WORDS = [item["ci"] for item in jsonData]
logger.debug(f"Loaded {len(cls.ALL_WORDS)} words from ci.json") logger.debug(f"Loaded {len(ALL_WORDS)} words from ci.json")
logger.debug(f"Sample words: {cls.ALL_WORDS[:5]}") logger.debug(f"Sample words: {ALL_WORDS[:5]}")
COMMON_WORDS = [] COMMON_WORDS = []
# 读取 COMMON 词语大表 # 读取 COMMON 词语大表
@ -389,29 +414,44 @@ class IdiomGame:
logger.debug(f"Loaded additional {len(LOCAL_LLM_WORDS)} words from idiom_llm_storage.txt") logger.debug(f"Loaded additional {len(LOCAL_LLM_WORDS)} words from idiom_llm_storage.txt")
# 只有成语的大表 # 只有成语的大表
cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重 ALL_IDIOMS = list(set(ALL_IDIOMS)) # 去重
# 批量插入数据库
DatabaseManager.execute_many_values_by_sql_file(
ROOT_PATH / "sql" / "insert_idiom.sql",
[(idiom,) for idiom in ALL_IDIOMS]
)
# 其他四字词语表,仅表示可以有这个词 # 其他四字词语表,仅表示可以有这个词
cls.ALL_WORDS = ( ALL_WORDS = (
[word for word in cls.ALL_WORDS if len(word) == 4] [word for word in ALL_WORDS if len(word) == 4]
+ THUOCL_WORDS + THUOCL_WORDS
+ COMMON_WORDS + COMMON_WORDS
+ LOCAL_LLM_WORDS
) )
cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重 # 插入数据库
DatabaseManager.execute_many_values_by_sql_file(
ROOT_PATH / "sql" / "insert_word.sql",
[(word,) for word in ALL_WORDS]
)
# 根据成语大表,划分出成语首字字典 # 自定义词语 LOCAL_LLM_WORDS 插入数据库,兼容用
for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS: DatabaseManager.execute_many_values_by_sql_file(
if idiom[0] not in cls.IDIOM_FIRST_CHAR: ROOT_PATH / "sql" / "insert_custom_word.sql",
cls.IDIOM_FIRST_CHAR[idiom[0]] = [] [(word,) for word in LOCAL_LLM_WORDS]
cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom) )
# 根据真正的成语大表,划分出有效成语首字字典 # # 根据成语大表,划分出成语首字字典
for idiom in cls.ALL_IDIOMS: # for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS:
if idiom[0] not in cls.AVALIABLE_IDIOM_FIRST_CHAR: # if idiom[0] not in cls.IDIOM_FIRST_CHAR:
cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]] = [] # cls.IDIOM_FIRST_CHAR[idiom[0]] = []
cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]].append(idiom) # cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
# # 根据真正的成语大表,划分出有效成语首字字典
# for idiom in cls.ALL_IDIOMS:
# if idiom[0] not in cls.AVALIABLE_IDIOM_FIRST_CHAR:
# cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]] = []
# cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
evt = on_alconna( evt = on_alconna(
@ -514,7 +554,9 @@ async def end_game(event: BaseEvent, group_id: str):
for line in history_lines: for line in history_lines:
result_text += line + "\n" result_text += line + "\n"
await evt.send(await result_text.export()) await evt.send(await result_text.export())
instance.clear_score_board() # instance.clear_score_board()
# 将实例删除
del IdiomGame.INSTANCE_LIST[group_id]
evt = on_alconna( evt = on_alconna(

View File

@ -0,0 +1,15 @@
-- 创建成语大表
CREATE TABLE IF NOT EXISTS all_idioms (
id INTEGER PRIMARY KEY AUTOINCREMENT,
idiom VARCHAR(128) NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS all_words (
id INTEGER PRIMARY KEY AUTOINCREMENT,
word VARCHAR(128) NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS custom_words (
id INTEGER PRIMARY KEY AUTOINCREMENT,
word VARCHAR(128) NOT NULL UNIQUE
);

View File

@ -0,0 +1,3 @@
-- 插入自定义词
INSERT OR IGNORE INTO custom_words (word)
VALUES (?);

View File

@ -0,0 +1,3 @@
-- 插入成语大表,避免重复插入
INSERT OR IGNORE INTO all_idioms (idiom)
VALUES (?);

View File

@ -0,0 +1,3 @@
-- 插入词
INSERT OR IGNORE INTO all_words (word)
VALUES (?);

View File

@ -0,0 +1,5 @@
-- 查询是否有以 xx 开头的成语,有则返回真,否则假
SELECT EXISTS(
SELECT 1 FROM all_idioms
WHERE idiom LIKE ? || '%'
) AS DEED;

View File

@ -0,0 +1,7 @@
-- 查询成语是否在 all_idioms 中,如果存在则返回 1否则再判断是否在 custom_words 或 all_words 中,存在则返回 0否则返回 -1
SELECT
CASE
WHEN EXISTS (SELECT 1 FROM all_idioms WHERE idiom = ?) THEN 1
WHEN EXISTS (SELECT 1 FROM custom_words WHERE word = ?) OR EXISTS (SELECT 1 FROM all_words WHERE word = ?) THEN 0
ELSE -1
END AS status;

View File

@ -0,0 +1,4 @@
-- 查询以 xx 开头的成语,随机打乱后只取第一个
SELECT idiom FROM all_idioms
WHERE idiom LIKE ? || '%'
ORDER BY RANDOM() LIMIT 1;

View File

@ -0,0 +1,2 @@
-- 随机从 all_idioms 表中选择一个成语
SELECT idiom FROM all_idioms ORDER BY RANDOM() LIMIT 1;