This commit is contained in:
Submodule assets/oracle updated: 29eea55632...5a3ef41d61
@ -1,11 +1,12 @@
|
|||||||
import asyncio as asynkio
|
import asyncio as asynkio
|
||||||
import datetime
|
import datetime
|
||||||
|
from io import BytesIO
|
||||||
import json
|
import json
|
||||||
import secrets
|
import secrets
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
from PIL import Image
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from nonebot import on_message
|
from nonebot import on_message
|
||||||
import nonebot
|
import nonebot
|
||||||
@ -617,14 +618,23 @@ async def _(event: BaseEvent, target: DepLongTaskTarget):
|
|||||||
# 打开好吧狗本地文件
|
# 打开好吧狗本地文件
|
||||||
with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f:
|
with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f:
|
||||||
img_data = f.read()
|
img_data = f.read()
|
||||||
|
# 把好吧狗变成 GIF 格式以缩小尺寸
|
||||||
|
img_data = await convert_image_to_gif(img_data)
|
||||||
await evt.send(await UniMessage().image(raw=img_data).export())
|
await evt.send(await UniMessage().image(raw=img_data).export())
|
||||||
await end_game(event, group_id)
|
await end_game(event, group_id)
|
||||||
else:
|
else:
|
||||||
await evt.send(
|
# await evt.send(
|
||||||
await UniMessage().text("当前没有成语接龙游戏在进行中!").export()
|
# await UniMessage().text("当前没有成语接龙游戏在进行中!").export()
|
||||||
)
|
# )
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
async def convert_image_to_gif(image_data: bytes) -> bytes:
|
||||||
|
with Image.open(BytesIO(image_data)) as img:
|
||||||
|
with BytesIO() as output:
|
||||||
|
img.save(output, format="GIF")
|
||||||
|
return output.getvalue()
|
||||||
|
|
||||||
# 跳过
|
# 跳过
|
||||||
evt = on_alconna(
|
evt = on_alconna(
|
||||||
Alconna("跳过成语"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
|
Alconna("跳过成语"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
|
||||||
@ -642,6 +652,8 @@ async def _(target: DepLongTaskTarget):
|
|||||||
# 发送哈哈狗图片
|
# 发送哈哈狗图片
|
||||||
with open(ASSETS_PATH / "img" / "dog" / "haha_dog.jpg", "rb") as f:
|
with open(ASSETS_PATH / "img" / "dog" / "haha_dog.jpg", "rb") as f:
|
||||||
img_data = f.read()
|
img_data = f.read()
|
||||||
|
# 把哈哈狗变成 GIF 格式以缩小尺寸
|
||||||
|
img_data = await convert_image_to_gif(img_data)
|
||||||
await evt.send(await UniMessage().image(raw=img_data).export())
|
await evt.send(await UniMessage().image(raw=img_data).export())
|
||||||
await evt.send(await UniMessage().text(f"你们太菜了,全部扣100分!明明还可以接「{avaliable_idiom}」的!").export())
|
await evt.send(await UniMessage().text(f"你们太菜了,全部扣100分!明明还可以接「{avaliable_idiom}」的!").export())
|
||||||
idiom = await instance.skip_idiom(-100)
|
idiom = await instance.skip_idiom(-100)
|
||||||
|
|||||||
@ -4,6 +4,8 @@ import json
|
|||||||
import secrets
|
import secrets
|
||||||
import csv
|
import csv
|
||||||
import zipfile
|
import zipfile
|
||||||
|
from PIL import Image
|
||||||
|
from io import BytesIO
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
@ -126,12 +128,12 @@ class oracleGame:
|
|||||||
|
|
||||||
def get_oracle_image(self) -> bytes:
|
def get_oracle_image(self) -> bytes:
|
||||||
IMAGE_PATH = ASSETS_PATH / "oracle" / "image"
|
IMAGE_PATH = ASSETS_PATH / "oracle" / "image"
|
||||||
with open(IMAGE_PATH / self.current_oracle_id / self.current_oracle_id / f"{self.current_oracle_id}.png", "rb") as f:
|
with open(IMAGE_PATH / self.ALL_ORACLES[self.current_oracle_id]["image"], "rb") as f:
|
||||||
img_data = f.read()
|
img_data = f.read()
|
||||||
return img_data
|
return img_data
|
||||||
|
|
||||||
def get_oracle_name(self) -> str:
|
def get_oracle_name(self) -> str:
|
||||||
return self.ALL_ORACLES.get(self.current_oracle_id, "?")[0]
|
return self.ALL_ORACLES.get(self.current_oracle_id, {}).get("oracle", "?")[0]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def random_oracle() -> str:
|
async def random_oracle() -> str:
|
||||||
@ -205,14 +207,19 @@ class oracleGame:
|
|||||||
|
|
||||||
async def _verify_oracle(self, oracle: str, user_id: str) -> list[TryVerifyState]:
|
async def _verify_oracle(self, oracle: str, user_id: str) -> list[TryVerifyState]:
|
||||||
state = []
|
state = []
|
||||||
if oracle not in self.ALL_ORACLES[self.current_oracle_id]:
|
if oracle.strip() not in self.ALL_ORACLES[self.current_oracle_id].get("oracle", ""):
|
||||||
|
return [TryVerifyState.NOT_ORACLE]
|
||||||
|
if oracle.strip() == "":
|
||||||
return [TryVerifyState.NOT_ORACLE]
|
return [TryVerifyState.NOT_ORACLE]
|
||||||
# 甲骨文合法,更新状态
|
# 甲骨文合法,更新状态
|
||||||
|
state.append(TryVerifyState.VERIFIED)
|
||||||
self.add_score(user_id, 1) # 加 1 分
|
self.add_score(user_id, 1) # 加 1 分
|
||||||
self.remain_rounds -= 1
|
self.remain_rounds -= 1
|
||||||
if self.remain_rounds <= 0:
|
if self.remain_rounds <= 0:
|
||||||
self.now_playing = False
|
self.now_playing = False
|
||||||
state.append(TryVerifyState.GAME_END)
|
state.append(TryVerifyState.GAME_END)
|
||||||
|
else:
|
||||||
|
await self._skip_oracle_async()
|
||||||
return state
|
return state
|
||||||
|
|
||||||
def get_user_score(self, user_id: str) -> float:
|
def get_user_score(self, user_id: str) -> float:
|
||||||
@ -248,7 +255,11 @@ class oracleGame:
|
|||||||
for row in reader:
|
for row in reader:
|
||||||
char = row["子字头"].strip()
|
char = row["子字头"].strip()
|
||||||
oracle = row["释文"].strip()
|
oracle = row["释文"].strip()
|
||||||
cls.ALL_ORACLES[char] = oracle
|
img_path = row.get("路径", "").strip()
|
||||||
|
cls.ALL_ORACLES[char] = {
|
||||||
|
"oracle": oracle,
|
||||||
|
"image": img_path,
|
||||||
|
}
|
||||||
|
|
||||||
logger.info(f"加载甲骨文字典,共计 {len(cls.ALL_ORACLES)} 条记录")
|
logger.info(f"加载甲骨文字典,共计 {len(cls.ALL_ORACLES)} 条记录")
|
||||||
|
|
||||||
@ -377,14 +388,22 @@ async def _(event: BaseEvent, target: DepLongTaskTarget):
|
|||||||
# 打开好吧狗本地文件
|
# 打开好吧狗本地文件
|
||||||
with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f:
|
with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f:
|
||||||
img_data = f.read()
|
img_data = f.read()
|
||||||
|
# 把好吧狗变成 GIF 格式以缩小尺寸
|
||||||
|
img_data = await convert_image_to_gif(img_data)
|
||||||
await evt.send(await UniMessage().image(raw=img_data).export())
|
await evt.send(await UniMessage().image(raw=img_data).export())
|
||||||
await end_game(event, group_id)
|
await end_game(event, group_id)
|
||||||
else:
|
else:
|
||||||
await evt.send(
|
# await evt.send(
|
||||||
await UniMessage().text("当前没有甲骨文游戏在进行中!").export()
|
# await UniMessage().text("当前没有甲骨文游戏在进行中!").export()
|
||||||
)
|
# )
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
async def convert_image_to_gif(image_data: bytes) -> bytes:
|
||||||
|
with Image.open(BytesIO(image_data)) as img:
|
||||||
|
with BytesIO() as output:
|
||||||
|
img.save(output, format="GIF")
|
||||||
|
return output.getvalue()
|
||||||
# 跳过
|
# 跳过
|
||||||
evt = on_alconna(
|
evt = on_alconna(
|
||||||
Alconna("跳过甲骨文"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
|
Alconna("跳过甲骨文"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
|
||||||
@ -401,6 +420,8 @@ async def _(target: DepLongTaskTarget):
|
|||||||
# 发送哈哈狗图片
|
# 发送哈哈狗图片
|
||||||
with open(ASSETS_PATH / "img" / "dog" / "haha_dog.jpg", "rb") as f:
|
with open(ASSETS_PATH / "img" / "dog" / "haha_dog.jpg", "rb") as f:
|
||||||
img_data = f.read()
|
img_data = f.read()
|
||||||
|
# 把哈哈狗变成 GIF 格式以缩小尺寸
|
||||||
|
img_data = await convert_image_to_gif(img_data)
|
||||||
oracle = instance.get_oracle_name()
|
oracle = instance.get_oracle_name()
|
||||||
await evt.send(await UniMessage().image(raw=img_data).export())
|
await evt.send(await UniMessage().image(raw=img_data).export())
|
||||||
await evt.send(await UniMessage().text(f"你们太菜了,全部扣100分!这个甲骨文是「{oracle}」!").export())
|
await evt.send(await UniMessage().text(f"你们太菜了,全部扣100分!这个甲骨文是「{oracle}」!").export())
|
||||||
@ -438,10 +459,11 @@ async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget):
|
|||||||
state = await instance.try_verify_oracle(user_oracle, user_id)
|
state = await instance.try_verify_oracle(user_oracle, user_id)
|
||||||
if TryVerifyState.NOT_ORACLE in state:
|
if TryVerifyState.NOT_ORACLE in state:
|
||||||
return
|
return
|
||||||
if TryVerifyState.VERIFIED:
|
if TryVerifyState.VERIFIED in state:
|
||||||
await evt.send(
|
await evt.send(
|
||||||
await UniMessage()
|
await UniMessage()
|
||||||
.text(f"{user_name} 答对了!获得 1 分!")
|
.at(user_id)
|
||||||
|
.text(" 答对了!获得 1 分!")
|
||||||
.export()
|
.export()
|
||||||
)
|
)
|
||||||
if TryVerifyState.GAME_END in state:
|
if TryVerifyState.GAME_END in state:
|
||||||
|
|||||||
59
konabot/plugins/oracle_game/find_path.py
Normal file
59
konabot/plugins/oracle_game/find_path.py
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
import csv
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
# 获取当前文件所在目录
|
||||||
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
# 向上两级到 konabot 目录
|
||||||
|
konabot_dir = os.path.abspath(os.path.join(current_dir, '../../../'))
|
||||||
|
|
||||||
|
if konabot_dir not in sys.path:
|
||||||
|
sys.path.insert(0, konabot_dir)
|
||||||
|
|
||||||
|
from konabot.common.path import ASSETS_PATH
|
||||||
|
|
||||||
|
|
||||||
|
ORACLE_PATH = ASSETS_PATH / "oracle"
|
||||||
|
|
||||||
|
final_zi_dict = {}
|
||||||
|
with open(ORACLE_PATH / "zi_dict.csv", "r", encoding="utf-8-sig") as f:
|
||||||
|
reader = csv.DictReader(f)
|
||||||
|
for row in reader:
|
||||||
|
logger.info(f"Progress: {reader.line_num}")
|
||||||
|
# 找到子字头字段,并找到对应的图像路径
|
||||||
|
char = row["子字头"].strip()
|
||||||
|
# 寻找路径
|
||||||
|
image_path = ORACLE_PATH / "image"
|
||||||
|
# 遍历所有子目录,寻找对应的图片文件
|
||||||
|
found_image = None
|
||||||
|
for subdir in image_path.iterdir():
|
||||||
|
if subdir.is_dir():
|
||||||
|
candidate = subdir / char
|
||||||
|
if candidate.exists():
|
||||||
|
# 寻找该目录下有没有以 char 命名的图片文件,没有就选第一个图片文件
|
||||||
|
if (candidate / f"{char}.png").exists():
|
||||||
|
found_image = candidate / f"{char}.png"
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
for file in candidate.iterdir():
|
||||||
|
if file.suffix.lower() in [".png", ".jpg", ".jpeg", ".gif", ".bmp"]:
|
||||||
|
found_image = file
|
||||||
|
break
|
||||||
|
if found_image is not None:
|
||||||
|
# 提取相对路径
|
||||||
|
found_image = found_image.relative_to(ORACLE_PATH / "image")
|
||||||
|
# 反斜杠改正为斜杠
|
||||||
|
found_image = found_image.as_posix()
|
||||||
|
# 更新行数据
|
||||||
|
row.update({"路径": str(found_image)})
|
||||||
|
final_zi_dict[char] = row
|
||||||
|
|
||||||
|
# 将最终的字典写入新的 CSV 文件
|
||||||
|
with open(ORACLE_PATH / "zi_dict_with_images.csv", "w", encoding="utf-8-sig", newline="") as f:
|
||||||
|
fieldnames = list(final_zi_dict[next(iter(final_zi_dict))].keys())
|
||||||
|
writer = csv.DictWriter(f, fieldnames=fieldnames)
|
||||||
|
writer.writeheader()
|
||||||
|
for char, data in final_zi_dict.items():
|
||||||
|
writer.writerow(data)
|
||||||
Reference in New Issue
Block a user