Compare commits

...

7 Commits

Author SHA1 Message Date
3e395f8a35 更少的量,更解耦的数据,更健壮的系统
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 11:56:03 +08:00
312e203bbe 忘记把这个答题情况通知加上了
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 04:07:26 +08:00
f9deabfce0 修复 Query 逻辑
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 04:03:51 +08:00
0a822bf440 优化 konaph UX 并添加文档
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 03:55:31 +08:00
534a2c9e75 解密厨来了2
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 03:42:28 +08:00
a03cef4124 解密厨来了 2025-10-26 03:23:51 +08:00
16351792b6 修复成语接龙大家没被扣分的BUG
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-25 23:59:41 +08:00
9 changed files with 801 additions and 3 deletions

View File

@ -51,13 +51,20 @@ class LongTaskTarget(BaseModel):
target_id: str
"沟通对象的 ID"
async def send_message(self, msg: UniMessage, at: bool = True) -> bool:
@property
def is_private_chat(self):
return self.channel_id.startswith(QQ_PRIVATE_CHAT_CHANNEL_PREFIX)
async def send_message(self, msg: UniMessage | str, at: bool = True) -> bool:
try:
bot = nonebot.get_bot(self.self_id)
except KeyError:
logger.warning(f"试图访问了不存在的 Bot。ID={self.self_id}")
return False
if isinstance(msg, str):
msg = UniMessage.text(msg)
if self.platform == "qq":
if not isinstance(bot, OBBot):
logger.warning(

View File

@ -16,8 +16,6 @@ from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
from PIL import UnidentifiedImageError
from returns.result import Failure, Result, Success
from konabot.common.nb.exc import BotExceptionMessage
async def download_image_bytes(url: str) -> Result[bytes, str]:
# if "/matcha/cache/" in url:

View File

@ -0,0 +1,30 @@
from typing import Any, cast
import nonebot
from nonebot_plugin_alconna import UniMessage
from nonebot.adapters.onebot.v11 import Bot as OBBot
async def qq_broadcast(groups: list[str], msg: UniMessage[Any]):
bots: dict[str, OBBot] = {}
# group_id -> bot_id
availabilities: dict[str, str] = {}
for bot_id, bot in nonebot.get_bots().items():
if not isinstance(bot, OBBot):
continue
bots[bot_id] = bot
gl = await bot.get_group_list()
for g in gl:
gid = str(g.get("group_id", -1))
if gid in groups:
availabilities[gid] = bot_id
for group in groups:
if group in availabilities:
bot = bots[availabilities[group]]
await bot.send_group_msg(
group_id=int(group),
message=cast(Any, await msg.export(bot)),
auto_escape=False,
)

View File

@ -0,0 +1,4 @@
指令介绍
konaph - KonaBot 的 PuzzleHunt 管理工具
详细介绍请直接输入 konaph 获取使用指引(该指令权限仅对部分人开放。如果你有权限的话才有响应。建议在此方 BOT 私聊使用该指令。)

View File

@ -248,6 +248,7 @@ class IdiomGame:
if not self.is_nextable(self.last_char):
# 没有成语可以接了,自动跳过
self._skip_idiom_async()
self.add_buff_score(-100)
state.append(TryVerifyState.BUT_NO_NEXT)
return state

View File

@ -0,0 +1,106 @@
from math import ceil
from loguru import logger
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager
from konabot.common.longtask import DepLongTaskTarget
from nonebot_plugin_apscheduler import scheduler
create_admin_commands()
async def is_play_group(target: DepLongTaskTarget):
if target.is_private_chat:
return True
if target.channel_id in config.plugin_puzzle_playgroup:
return True
return False
cmd_submit = on_alconna(Alconna(
"re:提交(?:答案|题解|[fF]lag)",
Args["flag", str],
), rule=is_play_group)
@cmd_submit.handle()
async def _(flag: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
result = manager.submit(target.target_id, flag)
await target.send_message(result.get_unimessage())
cmd_query = on_alconna(Alconna(
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?)|今日谜?题目?)"
), rule=is_play_group)
@cmd_query.handle()
async def _(target: DepLongTaskTarget):
async with puzzle_manager() as manager:
p = manager.get_today_puzzle()
if p is None:
return await target.send_message("今天无题,改日再来吧!")
await target.send_message(p.get_unimessage())
cmd_history = on_alconna(Alconna(
"历史题目",
Args["page?", int],
Args["index_id?", str],
), rule=is_play_group)
@cmd_history.handle()
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
async with puzzle_manager() as manager:
today = get_today_date()
if index_id:
index_id = index_id.removeprefix("#")
if index_id == manager.daily_puzzle_of_date.get(today, ""):
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
return await target.send_message(puzzle.get_unimessage())
if index_id in manager.daily_puzzle:
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
msg = puzzle.get_unimessage()
msg = msg.text(f"\n\n------\n\n题解:{puzzle.flag}")
return await target.send_message(msg)
return await target.send_message("没有这道题哦")
msg = UniMessage.text("====== 历史题目清单 ======\n\n")
puzzles = [
(manager.puzzle_data[manager.daily_puzzle[i].raw_id], d)
for d, i in manager.daily_puzzle_of_date.items()
]
puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text(
f"页数只有 1 ~ {count_pages} 啦!"
))
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
for p, d in puzzles:
info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]]
msg = msg.text(
f"- [#{p.index_id}: {len(info.success_users)}/{len(info.tried_users)}]"
f" {p.title} ({d})"
)
msg = msg.text("\n")
msg = msg.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
await target.send_message(msg)
@scheduler.scheduled_job("cron", hour="8")
async def _():
async with puzzle_manager() as manager:
msg2 = manager.get_report_yesterday()
if msg2 is not None:
await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
puzzle = manager.get_today_puzzle()
if puzzle is not None:
logger.info(f"找到了题目 {puzzle.raw_id},发送")
await qq_broadcast(config.plugin_puzzle_playgroup, puzzle.get_unimessage())
else:
logger.info("自动任务:没有找到题目,跳过")

View File

View File

@ -0,0 +1,323 @@
import asyncio
import datetime
import random
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
import nanoid
from nonebot_plugin_alconna import UniMessage
from pydantic import BaseModel, Field, ValidationError
from konabot.common.path import DATA_PATH
KONAPH_BASE = DATA_PATH / "KonaPH"
KONAPH_DATA_JSON = KONAPH_BASE / "data.json"
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs"
# 保证所有文件夹存在
KONAPH_BASE.mkdir(exist_ok=True)
KONAPH_IMAGE_BASE.mkdir(exist_ok=True)
class Puzzle(BaseModel):
raw_id: str
"用于给出题者管理的 ID"
index_id: str
"展出的 ID以展出顺序为准"
title: str
content: str
img_name: str
author_id: str
flag: str
ready: bool = False
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
def get_image_path(self) -> Path:
return KONAPH_IMAGE_BASE / self.img_name
def get_unimessage(self) -> UniMessage[Any]:
result = UniMessage.text(f"[KonaPH#{self.index_id}] {self.title}")
result = result.text(f"\n\n{self.content}")
if self.img_name:
result = result.text("\n\n").image(raw=self.get_image_path().read_bytes())
result = result.text("\n\n出题者:").at(self.author_id)
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
return result
def add_image(self, img: bytes, suffix: str = ".png"):
if self.img_name:
self.get_image_path().unlink(True)
img_id = nanoid.generate(
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
21,
)
self.img_name = f"{img_id}{suffix}"
self.get_image_path().write_bytes(img)
def remove_image(self):
if self.img_name:
self.get_image_path().unlink(True)
self.img_name = ""
class PuzzleSubmission(BaseModel):
success: bool
flag: str
time: datetime.datetime
class DailyPuzzleInfo(BaseModel):
raw_id: str
time: datetime.date
tried_users: set[str] = set()
success_users: dict[str, datetime.datetime] = {}
class PuzzleSubmissionResultMessage(BaseModel):
success: bool
rank: int = -1
message: str = ""
def get_unimessage(self) -> UniMessage[Any]:
if self.success:
return UniMessage.text(f"🎉 恭喜你答对了!你是今天第 {self.rank} 个解出来的!")
return UniMessage.text(self.message)
def get_today_date() -> datetime.date:
now = datetime.datetime.now()
if now.hour < 8:
now -= datetime.timedelta(days=1)
return now.date()
class PuzzleManager(BaseModel):
puzzle_data: dict[str, Puzzle] = {}
daily_puzzle: dict[str, DailyPuzzleInfo] = {}
daily_puzzle_of_date: dict[datetime.date, str] = {}
puzzle_pinned: str = ""
index_id_counter: int = 1
submissions: dict[str, dict[str, list[PuzzleSubmission]]] = {}
last_checked_date: datetime.date = Field(
default_factory=lambda: get_today_date() - datetime.timedelta(days=1)
)
@property
def last_publish_date(self):
return max(self.daily_puzzle_of_date.keys())
@property
def unpublished_puzzles(self):
return set((
p.raw_id for p in self.puzzle_data.values()
if not self.is_puzzle_published(p.raw_id) and p.ready
))
@property
def unready_puzzles(self):
return set((
p.raw_id for p in self.puzzle_data.values()
if not self.is_puzzle_published(p.raw_id) and not p.ready
))
@property
def published_puzzles(self):
return set((
p.raw_id for p in self.puzzle_data.values()
if self.is_puzzle_published(p.raw_id)
))
def is_puzzle_published(self, raw_id: str):
return raw_id in [i.raw_id for i in self.daily_puzzle.values()]
def publish_puzzle(self, raw_id: str):
assert raw_id in self.puzzle_data
today = get_today_date()
p = self.puzzle_data[raw_id]
p.index_id = str(self.index_id_counter)
p.ready = True
self.puzzle_pinned = ""
self.last_checked_date = today
self.daily_puzzle[p.index_id] = DailyPuzzleInfo(
raw_id=raw_id,
time=today,
)
self.daily_puzzle_of_date[today] = p.index_id
self.index_id_counter += 1
def fix(self):
# 尝试修复今日的数据
for p in self.puzzle_data.values():
if self.is_puzzle_published(p.raw_id):
p.ready = True
if self.puzzle_pinned not in self.unpublished_puzzles:
self.puzzle_pinned = ""
# 撤回重复发布的题
already_published: set[str] = set()
for date in list(self.daily_puzzle_of_date.keys()):
index_id = self.daily_puzzle_of_date[date]
info = self.daily_puzzle[index_id]
if info.raw_id in already_published:
del self.daily_puzzle[index_id]
del self.daily_puzzle_of_date[date]
else:
already_published.add(info.raw_id)
def admin_pin_puzzle(self, raw_id: str):
if raw_id in self.puzzle_data:
self.puzzle_pinned = raw_id
else:
self.puzzle_pinned = ""
def get_today_puzzle(self, strong: bool = False) -> Puzzle | None:
today = get_today_date()
if today in self.daily_puzzle_of_date:
index_id = self.daily_puzzle_of_date[today]
info = self.daily_puzzle[index_id]
return self.puzzle_data[info.raw_id]
if today == self.last_checked_date and not strong:
return
self.last_checked_date = today
if self.puzzle_pinned and self.puzzle_pinned in self.puzzle_data:
d = self.puzzle_pinned
self.publish_puzzle(d)
self.puzzle_pinned = ""
return self.puzzle_data[d]
elif len(self.unpublished_puzzles) > 0:
d = random.choice(list(self.unpublished_puzzles))
self.publish_puzzle(d)
return self.puzzle_data[d]
def get_today_info(self) -> DailyPuzzleInfo | None:
p = self.get_today_puzzle()
if p is None:
return
return self.daily_puzzle[p.index_id]
def submit(self, user: str, flag: str) -> PuzzleSubmissionResultMessage:
p = self.get_today_puzzle()
d = self.get_today_info()
now = datetime.datetime.now()
if p is None or d is None:
return PuzzleSubmissionResultMessage(
success=False,
message="今天没有题哦,改天再来吧!",
)
if user in d.success_users:
return PuzzleSubmissionResultMessage(
success=False,
message="你今天已经答对过啦!不用重复提交哦!",
)
if flag != p.flag:
d.tried_users.add(user)
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
success=False,
flag=flag,
time=now,
))
return PuzzleSubmissionResultMessage(
success=False,
message="❌ 答错了,请检查你的答案哦",
)
d.tried_users.add(user)
d.success_users[user] = now
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
success=True,
flag=flag,
time=now,
))
return PuzzleSubmissionResultMessage(
success=True,
rank=len(d.success_users),
)
def admin_create_puzzle(self, user: str):
p = Puzzle(
raw_id=nanoid.generate(
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
12,
),
index_id="",
title="示例标题",
content="题目的内容填写内容",
img_name="",
author_id=user,
flag="konaph{this_is_a_flag}",
ready=False,
)
self.puzzle_data[p.raw_id] = p
return p
def get_puzzles_of_user(self, user: str):
return sorted([
p for p in self.puzzle_data.values()
if p.author_id == user
], key=lambda p: p.created_at, reverse=True)
def get_report_yesterday(self):
yesterday = get_today_date() - datetime.timedelta(days=1)
index_id = self.daily_puzzle_of_date.get(yesterday)
if index_id is None:
return None
info = self.daily_puzzle[index_id]
puzzle = self.puzzle_data[info.raw_id]
message = UniMessage.text(f"[KonaPH#{index_id}] 「{puzzle.title}」解答报告")
if len(info.success_users) == 0:
message = message.text(
"\n\n昨日,竟无人解出此题!"
)
else:
message = message.text(
f"\n\n昨日,共有 {len(info.success_users)} 人解出此题。\n\n前五名的解答者:"
)
us = [(u, d) for u, d in info.success_users.items()]
us = sorted(us, key=lambda t: t[1])
us = us[:5]
for u, _ in us:
m = self.submissions[puzzle.raw_id][u][-1]
message = message.text("- ").at(u).text(f"{m.time.strftime('%H:%M')}")
message = message.text("\n\n出题者:").at(puzzle.author_id)
return message
lock = asyncio.Lock()
def read_data():
try:
data_raw = KONAPH_DATA_JSON.read_text()
return PuzzleManager.model_validate_json(data_raw)
except (FileNotFoundError, ValidationError):
return PuzzleManager()
def write_data(data: PuzzleManager):
KONAPH_DATA_JSON.write_text(data.model_dump_json())
@asynccontextmanager
async def puzzle_manager():
async with lock:
data = read_data()
yield data
write_data(data)

View File

@ -0,0 +1,329 @@
import datetime
from math import ceil
from typing import Any
from nonebot import get_plugin_config
from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, Subcommand, UniMessage, on_alconna
from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.extract_image import download_image_bytes
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.plugins.kona_ph.core.storage import Puzzle, PuzzleManager, get_today_date, puzzle_manager
PUZZLE_PAGE_SIZE = 10
class PuzzleConfig(BaseModel):
plugin_puzzle_manager: list[str] = []
plugin_puzzle_admin: list[str] = []
plugin_puzzle_playgroup: list[str] = []
config = get_plugin_config(PuzzleConfig)
def is_puzzle_manager(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target)
def is_puzzle_admin(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_admin
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
status_suffix = ""
if puzzle.raw_id == manager.puzzle_pinned:
status_suffix += " | 📌 已被管理员置顶"
msg = UniMessage.text(
f"--- 谜题信息 ---\n"
f"Raw ID: {puzzle.raw_id}\n"
f"标题: {puzzle.title}\n"
f"出题者 ID: {puzzle.author_id}\n"
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Flag: {puzzle.flag}\n"
f"状态: {status}{status_suffix}\n\n"
f"{puzzle.content}"
)
if puzzle.img_name:
msg = msg.image(raw=puzzle.get_image_path().read_bytes())
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
return msg
def create_admin_commands():
cmd_admin = on_alconna(
Alconna(
"konaph",
Subcommand("create", dest="create"),
Subcommand("ready", Args["raw_id", str], dest="ready"),
Subcommand("unready", Args["raw_id", str], dest="unready"),
Subcommand("info", Args["raw_id", str], dest="info"),
Subcommand("my", Args["page?", int], dest="my"),
Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"),
Subcommand("pin", Args["raw_id?", str], dest="pin"),
Subcommand("unpin", dest="unpin"),
Subcommand(
"modify",
Args["raw_id?", str],
Option("--title", Args["title", str], alias=["-t"]),
Option("--description", Args["description", str], alias=["-d"]),
Option("--image", Args["image?", Image], alias=["-i"]),
Option("--flag", Args["flag", str], alias=["-f"]),
Option("--remove-image"),
dest="modify",
),
Subcommand("publish", Args["raw_id?", str], dest="publish"),
),
rule=is_puzzle_manager,
)
@cmd_admin.assign("$main")
async def _(target: DepLongTaskTarget):
msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
msg = msg.text("konaph create - 创建一个新的谜题\n")
msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
msg = msg.text("konaph unready <id> - 取消准备一道谜题\n")
msg = msg.text("konaph info <id> - 查看谜题\n")
msg = msg.text("konaph my <page?> - 查看我的谜题列表\n")
msg = msg.text("konaph modify - 查看如何修改谜题信息\n")
if is_puzzle_admin(target):
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
msg = msg.text("konaph pin - 查看当前置顶谜题\n")
msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
msg = msg.text("konaph unpin - 取消置顶所有谜题\n")
msg = msg.text("konaph publish <id?> - 强制发题")
await target.send_message(msg)
@cmd_admin.assign("create")
async def _(target: DepLongTaskTarget):
async with puzzle_manager() as manager:
puzzle = manager.admin_create_puzzle(target.target_id)
await target.send_message(UniMessage.text(
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
f"- 输入 `konaph my` 查看你创建的谜题\n"
f"- 输入 `konaph modify` 查看更改谜题的方法"
))
@cmd_admin.assign("ready")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
))
if p.ready:
return await target.send_message(UniMessage.text(
"题目早就准备好啦!"
))
p.ready = True
await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经准备就绪!"
))
@cmd_admin.assign("unready")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
))
if not p.ready:
return await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经是未取消状态了!"
))
if manager.is_puzzle_published(p.raw_id):
return await target.send_message(UniMessage.text(
"已发布的谜题不能取消准备状态!"
))
p.ready = False
await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经取消准备!"
))
@cmd_admin.assign("info")
async def _(raw_id: str, target: DepLongTaskTarget):
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message(UniMessage.text(
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
))
p = manager.puzzle_data[raw_id]
if p.author_id != target.target_id and not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"这不是你的题,你没有权限查看详细信息!"
))
await target.send_message(get_puzzle_info_message(manager, p))
@cmd_admin.assign("my")
async def _(target: DepLongTaskTarget, page: int = 1):
async with puzzle_manager() as manager:
puzzles = manager.get_puzzles_of_user(target.target_id)
if len(puzzles) == 0:
return await target.send_message(UniMessage.text(
"你没有谜题哦,使用 `konaph create` 创建一个吧!"
))
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text(
f"页数只有 1 ~ {count_pages} 啦!"
))
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
message = UniMessage.text("==== 我的谜题 ====\n\n")
for p in puzzles:
message = message.text("- ")
if manager.puzzle_pinned == p.raw_id:
message = message.text("[📌]")
if manager.is_puzzle_published(p.raw_id):
message = message.text(f"[#{p.index_id}] ")
elif p.ready:
message = message.text("[✅] ")
else:
message = message.text("[⚙️] ")
message = message.text(f"{p.title} ({p.raw_id})")
message = message.text("\n")
message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
await target.send_message(message)
@cmd_admin.assign("all")
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text("你没有权限查看所有的哦"))
async with puzzle_manager() as manager:
puzzles = [*manager.puzzle_data.values()]
if ready.available:
puzzles = [p for p in puzzles if p.ready]
puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text(
f"页数只有 1 ~ {count_pages} 啦!"
))
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
message = UniMessage.text("==== 所有谜题 ====\n\n")
for p in puzzles:
message = message.text("- ")
if p.raw_id == manager.puzzle_pinned:
message = message.text("[📌]")
if manager.is_puzzle_published(p.raw_id):
message = message.text(f"[#{p.index_id}] ")
elif p.ready:
message = message.text("[✅] ")
else:
message = message.text("[⚙️] ")
message = message.text(f"{p.title} ({p.raw_id} by {p.author_id})")
message = message.text("\n")
message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
await target.send_message(message)
@cmd_admin.assign("pin")
async def _(target: DepLongTaskTarget, raw_id: str = ""):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async with puzzle_manager() as manager:
if raw_id == "":
if manager.puzzle_pinned:
return await target.send_message(UniMessage.text(
f"被 Pin 的谜题 ID = {manager.puzzle_pinned}"
))
return await target.send_message("没有置顶谜题")
if raw_id not in manager.unpublished_puzzles:
return await target.send_message(UniMessage.text(
"这个谜题已经发布了,或者还没准备好,或者不存在"
))
manager.admin_pin_puzzle(raw_id)
return await target.send_message(f"已置顶谜题 {raw_id}")
@cmd_admin.assign("unpin")
async def _(target: DepLongTaskTarget):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async with puzzle_manager() as manager:
manager.admin_pin_puzzle("")
return await target.send_message("已取消所有置顶")
@cmd_admin.assign("modify")
async def _(
target: DepLongTaskTarget,
raw_id: str = "",
title: str | None = None,
description: str | None = None,
flag: str | None = None,
image: Image | None = None,
remove_image: Query[bool] = Query("modify.remove-image"),
):
if raw_id == "":
return await target.send_message(
"konaph modify <raw_id> - 修改一个谜题\n\n"
"支持的参数:\n"
" --title <str> 标题\n"
" --description <str> 题目详情描述(用直引号包裹以支持多行)\n"
" --flag <str> flag\n"
" --image <图片> 图片\n"
" --remove-image 删除图片"
)
async with puzzle_manager() as manager:
if raw_id not in manager.puzzle_data:
return await target.send_message("没有这个谜题")
p = manager.puzzle_data[raw_id]
if not is_puzzle_admin(target) and target.target_id != p.author_id:
return await target.send_message("你没有权限编辑这个谜题")
if title is not None:
p.title = title
if description is not None:
p.content = description
if flag is not None:
p.flag = flag
if image is not None and image.url is not None:
b = await download_image_bytes(image.url)
p.add_image(b.unwrap())
elif remove_image.available:
p.remove_image()
info2 = get_puzzle_info_message(manager, p)
return await target.send_message("修改好啦!看看效果:\n\n" + info2)
@cmd_admin.assign("publish")
async def _(target: DepLongTaskTarget, raw_id: str | None = None):
today = get_today_date()
async with puzzle_manager() as manager:
if today in manager.daily_puzzle_of_date:
return await target.send_message("今日已经有题了哦")
manager.last_checked_date = today - datetime.timedelta(days=-1)
if raw_id is not None:
manager.admin_pin_puzzle(raw_id)
p = manager.get_today_puzzle(strong=True)
if p is None:
return await target.send_message("上架失败了orz可能是没题了")
await qq_broadcast(config.plugin_puzzle_playgroup, p.get_unimessage())
return await target.send_message("Ok!")
return cmd_admin