Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 312e203bbe | |||
| f9deabfce0 | |||
| 0a822bf440 | |||
| 534a2c9e75 | |||
| a03cef4124 | |||
| 16351792b6 |
@ -51,13 +51,16 @@ class LongTaskTarget(BaseModel):
|
||||
target_id: str
|
||||
"沟通对象的 ID"
|
||||
|
||||
async def send_message(self, msg: UniMessage, at: bool = True) -> bool:
|
||||
async def send_message(self, msg: UniMessage | str, at: bool = True) -> bool:
|
||||
try:
|
||||
bot = nonebot.get_bot(self.self_id)
|
||||
except KeyError:
|
||||
logger.warning(f"试图访问了不存在的 Bot。ID={self.self_id}")
|
||||
return False
|
||||
|
||||
if isinstance(msg, str):
|
||||
msg = UniMessage.text(msg)
|
||||
|
||||
if self.platform == "qq":
|
||||
if not isinstance(bot, OBBot):
|
||||
logger.warning(
|
||||
|
||||
@ -16,8 +16,6 @@ from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
|
||||
from PIL import UnidentifiedImageError
|
||||
from returns.result import Failure, Result, Success
|
||||
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
|
||||
|
||||
async def download_image_bytes(url: str) -> Result[bytes, str]:
|
||||
# if "/matcha/cache/" in url:
|
||||
|
||||
30
konabot/common/nb/qq_broadcast.py
Normal file
30
konabot/common/nb/qq_broadcast.py
Normal file
@ -0,0 +1,30 @@
|
||||
from typing import Any, cast
|
||||
import nonebot
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from nonebot.adapters.onebot.v11 import Bot as OBBot
|
||||
|
||||
|
||||
async def qq_broadcast(groups: list[str], msg: UniMessage[Any]):
|
||||
bots: dict[str, OBBot] = {}
|
||||
|
||||
# group_id -> bot_id
|
||||
availabilities: dict[str, str] = {}
|
||||
|
||||
for bot_id, bot in nonebot.get_bots().items():
|
||||
if not isinstance(bot, OBBot):
|
||||
continue
|
||||
bots[bot_id] = bot
|
||||
gl = await bot.get_group_list()
|
||||
for g in gl:
|
||||
gid = str(g.get("group_id", -1))
|
||||
if gid in groups:
|
||||
availabilities[gid] = bot_id
|
||||
|
||||
for group in groups:
|
||||
if group in availabilities:
|
||||
bot = bots[availabilities[group]]
|
||||
await bot.send_group_msg(
|
||||
group_id=int(group),
|
||||
message=cast(Any, await msg.export(bot)),
|
||||
auto_escape=False,
|
||||
)
|
||||
4
konabot/docs/sys/konaph.txt
Normal file
4
konabot/docs/sys/konaph.txt
Normal file
@ -0,0 +1,4 @@
|
||||
指令介绍
|
||||
konaph - KonaBot 的 PuzzleHunt 管理工具
|
||||
|
||||
详细介绍请直接输入 konaph 获取使用指引(该指令权限仅对部分人开放。如果你有权限的话才有响应。建议在此方 BOT 私聊使用该指令。)
|
||||
@ -248,6 +248,7 @@ class IdiomGame:
|
||||
if not self.is_nextable(self.last_char):
|
||||
# 没有成语可以接了,自动跳过
|
||||
self._skip_idiom_async()
|
||||
self.add_buff_score(-100)
|
||||
state.append(TryVerifyState.BUT_NO_NEXT)
|
||||
return state
|
||||
|
||||
|
||||
106
konabot/plugins/kona_ph/__init__.py
Normal file
106
konabot/plugins/kona_ph/__init__.py
Normal file
@ -0,0 +1,106 @@
|
||||
from math import ceil
|
||||
from loguru import logger
|
||||
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
|
||||
from konabot.common.nb.qq_broadcast import qq_broadcast
|
||||
from konabot.plugins.kona_ph.core.storage import get_today_date
|
||||
from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
|
||||
from nonebot_plugin_apscheduler import scheduler
|
||||
|
||||
|
||||
create_admin_commands()
|
||||
|
||||
|
||||
async def is_play_group(target: DepLongTaskTarget):
|
||||
if target.channel_id in config.plugin_puzzle_playgroup:
|
||||
return True
|
||||
if target.target_id in target.channel_id:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
cmd_submit = on_alconna(Alconna(
|
||||
"re:提交(?:答案|题解|[fF]lag)",
|
||||
Args["flag", str],
|
||||
), rule=is_play_group)
|
||||
|
||||
@cmd_submit.handle()
|
||||
async def _(flag: str, target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
result = manager.submit(target.target_id, flag)
|
||||
await target.send_message(result.get_unimessage())
|
||||
|
||||
|
||||
cmd_query = on_alconna(Alconna(
|
||||
r"re:(?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?"
|
||||
), rule=is_play_group)
|
||||
|
||||
@cmd_query.handle()
|
||||
async def _(target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
p = manager.get_today_puzzle()
|
||||
if p is None:
|
||||
return await target.send_message("今天无题,改日再来吧!")
|
||||
await target.send_message(p.get_unimessage())
|
||||
|
||||
|
||||
cmd_history = on_alconna(Alconna(
|
||||
"历史题目",
|
||||
Args["page?", int],
|
||||
Args["index_id?", str],
|
||||
), rule=is_play_group)
|
||||
|
||||
@cmd_history.handle()
|
||||
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
||||
async with puzzle_manager() as manager:
|
||||
today = get_today_date()
|
||||
if index_id:
|
||||
index_id = index_id.removeprefix("#")
|
||||
if index_id == manager.daily_puzzle_of_date.get(today, ""):
|
||||
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
|
||||
return await target.send_message(puzzle.get_unimessage())
|
||||
if index_id in manager.daily_puzzle:
|
||||
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
|
||||
msg = puzzle.get_unimessage()
|
||||
msg = msg.text(f"\n\n------\n\n题解:{puzzle.flag}")
|
||||
return await target.send_message(msg)
|
||||
return await target.send_message("没有这道题哦")
|
||||
msg = UniMessage.text("====== 历史题目清单 ======\n\n")
|
||||
puzzles = [
|
||||
(manager.puzzle_data[manager.daily_puzzle[i].raw_id], d)
|
||||
for d, i in manager.daily_puzzle_of_date.items()
|
||||
]
|
||||
puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True)
|
||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||
if page <= 0 or page > count_pages:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"页数只有 1 ~ {count_pages} 啦!"
|
||||
))
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
||||
for p, d in puzzles:
|
||||
info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]]
|
||||
msg = msg.text(
|
||||
f"- [#{p.index_id}: {len(info.success_users)}/{len(info.tried_users)}]"
|
||||
f" {p.title} ({d})"
|
||||
)
|
||||
msg = msg.text("\n")
|
||||
msg = msg.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
|
||||
await target.send_message(msg)
|
||||
|
||||
|
||||
@scheduler.scheduled_job("cron", hour="8")
|
||||
async def _():
|
||||
async with puzzle_manager() as manager:
|
||||
msg2 = manager.get_report_yesterday()
|
||||
if msg2 is not None:
|
||||
await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
|
||||
|
||||
puzzle = manager.get_today_puzzle()
|
||||
if puzzle is not None:
|
||||
logger.info(f"找到了题目 {puzzle.raw_id},发送")
|
||||
await qq_broadcast(config.plugin_puzzle_playgroup, puzzle.get_unimessage())
|
||||
else:
|
||||
logger.info("自动任务:没有找到题目,跳过")
|
||||
|
||||
|
||||
0
konabot/plugins/kona_ph/core/__init__.py
Normal file
0
konabot/plugins/kona_ph/core/__init__.py
Normal file
315
konabot/plugins/kona_ph/core/storage.py
Normal file
315
konabot/plugins/kona_ph/core/storage.py
Normal file
@ -0,0 +1,315 @@
|
||||
import asyncio
|
||||
import datetime
|
||||
import random
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import nanoid
|
||||
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from pydantic import BaseModel, Field, ValidationError
|
||||
|
||||
from konabot.common.path import DATA_PATH
|
||||
|
||||
|
||||
KONAPH_BASE = DATA_PATH / "KonaPH"
|
||||
KONAPH_DATA_JSON = KONAPH_BASE / "data.json"
|
||||
KONAPH_IMAGE_BASE = KONAPH_BASE / "imgs"
|
||||
|
||||
# 保证所有文件夹存在
|
||||
KONAPH_BASE.mkdir(exist_ok=True)
|
||||
KONAPH_IMAGE_BASE.mkdir(exist_ok=True)
|
||||
|
||||
|
||||
class Puzzle(BaseModel):
|
||||
raw_id: str
|
||||
"用于给出题者管理的 ID"
|
||||
|
||||
index_id: str
|
||||
"展出的 ID,以展出顺序为准"
|
||||
|
||||
title: str
|
||||
content: str
|
||||
img_name: str
|
||||
author_id: str
|
||||
flag: str
|
||||
|
||||
ready: bool = False
|
||||
published: bool = False
|
||||
pinned: bool = False
|
||||
|
||||
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
|
||||
|
||||
def get_image_path(self) -> Path:
|
||||
return KONAPH_IMAGE_BASE / self.img_name
|
||||
|
||||
def get_unimessage(self) -> UniMessage[Any]:
|
||||
result = UniMessage.text(f"[KonaPH#{self.index_id}] {self.title}")
|
||||
result = result.text(f"\n\n{self.content}")
|
||||
|
||||
if self.img_name:
|
||||
result = result.text("\n\n").image(raw=self.get_image_path().read_bytes())
|
||||
|
||||
result = result.text("\n\n出题者:").at(self.author_id)
|
||||
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
|
||||
|
||||
return result
|
||||
|
||||
def add_image(self, img: bytes, suffix: str = ".png"):
|
||||
if self.img_name:
|
||||
self.get_image_path().unlink(True)
|
||||
img_id = nanoid.generate(
|
||||
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
|
||||
21,
|
||||
)
|
||||
self.img_name = f"{img_id}{suffix}"
|
||||
self.get_image_path().write_bytes(img)
|
||||
|
||||
def remove_image(self):
|
||||
if self.img_name:
|
||||
self.get_image_path().unlink(True)
|
||||
self.img_name = ""
|
||||
|
||||
|
||||
class PuzzleSubmission(BaseModel):
|
||||
success: bool
|
||||
flag: str
|
||||
time: datetime.datetime
|
||||
|
||||
|
||||
class DailyPuzzleInfo(BaseModel):
|
||||
raw_id: str
|
||||
time: datetime.date
|
||||
tried_users: set[str] = set()
|
||||
success_users: dict[str, datetime.datetime] = {}
|
||||
|
||||
|
||||
class PuzzleSubmissionResultMessage(BaseModel):
|
||||
success: bool
|
||||
rank: int = -1
|
||||
message: str = ""
|
||||
|
||||
def get_unimessage(self) -> UniMessage[Any]:
|
||||
if self.success:
|
||||
return UniMessage.text(f"🎉 恭喜你答对了!你是今天第 {self.rank} 个解出来的!")
|
||||
return UniMessage.text(self.message)
|
||||
|
||||
|
||||
def get_today_date() -> datetime.date:
|
||||
now = datetime.datetime.now()
|
||||
if now.hour < 8:
|
||||
now -= datetime.timedelta(days=1)
|
||||
return now.date()
|
||||
|
||||
|
||||
class PuzzleManager(BaseModel):
|
||||
puzzle_data: dict[str, Puzzle] = {}
|
||||
|
||||
daily_puzzle: dict[str, DailyPuzzleInfo] = {}
|
||||
daily_puzzle_of_date: dict[datetime.date, str] = {}
|
||||
|
||||
puzzle_pinned: str = ""
|
||||
unpublished_puzzles: set[str] = set()
|
||||
unready_puzzles: set[str] = set()
|
||||
published_puzzles: set[str] = set()
|
||||
|
||||
index_id_counter: int = 1
|
||||
submissions: dict[str, dict[str, list[PuzzleSubmission]]] = {}
|
||||
last_pubish_date: datetime.date = Field(
|
||||
default_factory=lambda: get_today_date() - datetime.timedelta(days=1)
|
||||
)
|
||||
last_checked_date: datetime.date = Field(
|
||||
default_factory=lambda: get_today_date() - datetime.timedelta(days=1)
|
||||
)
|
||||
|
||||
def publish_puzzle(self, raw_id: str):
|
||||
assert raw_id in self.puzzle_data
|
||||
|
||||
self.unpublished_puzzles -= set(raw_id)
|
||||
self.unready_puzzles -= set(raw_id)
|
||||
p = self.puzzle_data[raw_id]
|
||||
p.index_id = str(self.index_id_counter)
|
||||
p.ready = True
|
||||
p.published = True
|
||||
p.pinned = False
|
||||
self.puzzle_pinned = ""
|
||||
self.last_pubish_date = get_today_date()
|
||||
self.last_checked_date = self.last_pubish_date
|
||||
self.daily_puzzle[p.index_id] = DailyPuzzleInfo(
|
||||
raw_id=raw_id,
|
||||
time=self.last_pubish_date,
|
||||
)
|
||||
self.daily_puzzle_of_date[self.last_pubish_date] = p.index_id
|
||||
self.published_puzzles.add(raw_id)
|
||||
|
||||
self.index_id_counter += 1
|
||||
|
||||
def admin_mark_ready(self, raw_id: str, ready: bool = True):
|
||||
if raw_id not in self.puzzle_data:
|
||||
return
|
||||
if ready:
|
||||
self.unready_puzzles -= set(raw_id)
|
||||
if raw_id not in self.published_puzzles:
|
||||
self.unpublished_puzzles.add(raw_id)
|
||||
p = self.puzzle_data[raw_id]
|
||||
p.ready = True
|
||||
p.published = raw_id in self.published_puzzles
|
||||
else:
|
||||
self.unready_puzzles.add(raw_id)
|
||||
self.unpublished_puzzles -= set(raw_id)
|
||||
p = self.puzzle_data[raw_id]
|
||||
p.ready = False
|
||||
p.published = False
|
||||
# if p.raw_id == self.puzzle_pinned:
|
||||
# self.puzzle_pinned = ""
|
||||
|
||||
def admin_pin_puzzle(self, raw_id: str):
|
||||
if self.puzzle_pinned:
|
||||
p = self.puzzle_data.get(self.puzzle_pinned)
|
||||
if p is not None:
|
||||
p.pinned = False
|
||||
if raw_id in self.puzzle_data:
|
||||
p = self.puzzle_data[raw_id]
|
||||
p.pinned = True
|
||||
self.puzzle_pinned = raw_id
|
||||
else:
|
||||
self.puzzle_pinned = ""
|
||||
|
||||
def get_today_puzzle(self, strong: bool = False) -> Puzzle | None:
|
||||
today = get_today_date()
|
||||
if today in self.daily_puzzle_of_date:
|
||||
index_id = self.daily_puzzle_of_date[today]
|
||||
info = self.daily_puzzle[index_id]
|
||||
return self.puzzle_data[info.raw_id]
|
||||
if today == self.last_checked_date and not strong:
|
||||
return
|
||||
self.last_checked_date = today
|
||||
if self.puzzle_pinned and self.puzzle_pinned in self.puzzle_data:
|
||||
d = self.puzzle_pinned
|
||||
self.publish_puzzle(d)
|
||||
self.puzzle_pinned = ""
|
||||
return self.puzzle_data[d]
|
||||
elif len(self.unpublished_puzzles) > 0:
|
||||
d = random.choice(list(self.unpublished_puzzles))
|
||||
self.publish_puzzle(d)
|
||||
return self.puzzle_data[d]
|
||||
|
||||
def get_today_info(self) -> DailyPuzzleInfo | None:
|
||||
p = self.get_today_puzzle()
|
||||
if p is None:
|
||||
return
|
||||
return self.daily_puzzle[p.index_id]
|
||||
|
||||
def submit(self, user: str, flag: str) -> PuzzleSubmissionResultMessage:
|
||||
p = self.get_today_puzzle()
|
||||
d = self.get_today_info()
|
||||
now = datetime.datetime.now()
|
||||
if p is None or d is None:
|
||||
return PuzzleSubmissionResultMessage(
|
||||
success=False,
|
||||
message="今天没有题哦,改天再来吧!",
|
||||
)
|
||||
if user in d.success_users:
|
||||
return PuzzleSubmissionResultMessage(
|
||||
success=False,
|
||||
message="你今天已经答对过啦!不用重复提交哦!",
|
||||
)
|
||||
if flag != p.flag:
|
||||
d.tried_users.add(user)
|
||||
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
|
||||
success=False,
|
||||
flag=flag,
|
||||
time=now,
|
||||
))
|
||||
return PuzzleSubmissionResultMessage(
|
||||
success=False,
|
||||
message="❌ 答错了,请检查你的答案哦",
|
||||
)
|
||||
d.tried_users.add(user)
|
||||
d.success_users[user] = now
|
||||
self.submissions.setdefault(p.raw_id, {}).setdefault(user, []).append(PuzzleSubmission(
|
||||
success=True,
|
||||
flag=flag,
|
||||
time=now,
|
||||
))
|
||||
return PuzzleSubmissionResultMessage(
|
||||
success=True,
|
||||
rank=len(d.success_users),
|
||||
)
|
||||
|
||||
def admin_create_puzzle(self, user: str):
|
||||
p = Puzzle(
|
||||
raw_id=nanoid.generate(
|
||||
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz",
|
||||
12,
|
||||
),
|
||||
index_id="",
|
||||
title="示例标题",
|
||||
content="题目的内容填写内容",
|
||||
img_name="",
|
||||
author_id=user,
|
||||
flag="konaph{this_is_a_flag}",
|
||||
ready=False,
|
||||
published=False,
|
||||
)
|
||||
self.unready_puzzles.add(p.raw_id)
|
||||
self.puzzle_data[p.raw_id] = p
|
||||
return p
|
||||
|
||||
def get_puzzles_of_user(self, user: str):
|
||||
return sorted([
|
||||
p for p in self.puzzle_data.values()
|
||||
if p.author_id == user
|
||||
], key=lambda p: p.created_at, reverse=True)
|
||||
|
||||
def get_report_yesterday(self):
|
||||
yesterday = get_today_date() - datetime.timedelta(days=1)
|
||||
index_id = self.daily_puzzle_of_date.get(yesterday)
|
||||
if index_id is None:
|
||||
return None
|
||||
info = self.daily_puzzle[index_id]
|
||||
puzzle = self.puzzle_data[info.raw_id]
|
||||
message = UniMessage.text(f"[KonaPH#{index_id}] 「{puzzle.title}」解答报告")
|
||||
|
||||
if len(info.success_users) == 0:
|
||||
message = message.text(
|
||||
"\n\n昨日,竟无人解出此题!"
|
||||
)
|
||||
else:
|
||||
message = message.text(
|
||||
f"\n\n昨日,共有 {len(info.success_users)} 人解出此题。\n\n前五名的解答者:"
|
||||
)
|
||||
us = [(u, d) for u, d in info.success_users.items()]
|
||||
us = sorted(us, key=lambda t: t[1])
|
||||
us = us[:5]
|
||||
for u, _ in us:
|
||||
m = self.submissions[puzzle.raw_id][u][-1]
|
||||
message = message.text("- ").at(u).text(f" 于 {m.time.strftime('%H:%M')}")
|
||||
|
||||
message = message.text("\n\n出题者:").at(puzzle.author_id)
|
||||
return message
|
||||
|
||||
|
||||
lock = asyncio.Lock()
|
||||
|
||||
|
||||
def read_data():
|
||||
try:
|
||||
data_raw = KONAPH_DATA_JSON.read_text()
|
||||
return PuzzleManager.model_validate_json(data_raw)
|
||||
except (FileNotFoundError, ValidationError):
|
||||
return PuzzleManager()
|
||||
|
||||
|
||||
def write_data(data: PuzzleManager):
|
||||
KONAPH_DATA_JSON.write_text(data.model_dump_json())
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def puzzle_manager():
|
||||
async with lock:
|
||||
data = read_data()
|
||||
yield data
|
||||
write_data(data)
|
||||
329
konabot/plugins/kona_ph/manager.py
Normal file
329
konabot/plugins/kona_ph/manager.py
Normal file
@ -0,0 +1,329 @@
|
||||
import datetime
|
||||
from math import ceil
|
||||
from typing import Any
|
||||
from nonebot import get_plugin_config
|
||||
from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, Subcommand, UniMessage, on_alconna
|
||||
from pydantic import BaseModel
|
||||
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.nb.extract_image import download_image_bytes
|
||||
from konabot.common.nb.qq_broadcast import qq_broadcast
|
||||
from konabot.plugins.kona_ph.core.storage import Puzzle, get_today_date, puzzle_manager
|
||||
|
||||
PUZZLE_PAGE_SIZE = 10
|
||||
|
||||
|
||||
class PuzzleConfig(BaseModel):
|
||||
plugin_puzzle_manager: list[str] = []
|
||||
plugin_puzzle_admin: list[str] = []
|
||||
plugin_puzzle_playgroup: list[str] = []
|
||||
|
||||
|
||||
config = get_plugin_config(PuzzleConfig)
|
||||
|
||||
|
||||
def is_puzzle_manager(target: DepLongTaskTarget):
|
||||
return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target)
|
||||
|
||||
|
||||
def is_puzzle_admin(target: DepLongTaskTarget):
|
||||
return target.target_id in config.plugin_puzzle_admin
|
||||
|
||||
|
||||
def get_puzzle_info_message(puzzle: Puzzle) -> UniMessage[Any]:
|
||||
status = "✅ 已准备,待发布" if puzzle.ready and not puzzle.published else \
|
||||
(f"🟢 已发布: #{puzzle.index_id}" if puzzle.published else "⚙️ 未准备")
|
||||
|
||||
status_suffix = ""
|
||||
if puzzle.pinned:
|
||||
status_suffix += " | 📌 已被管理员置顶"
|
||||
|
||||
msg = UniMessage.text(
|
||||
f"--- 谜题信息 ---\n"
|
||||
f"Raw ID: {puzzle.raw_id}\n"
|
||||
f"标题: {puzzle.title}\n"
|
||||
f"出题者 ID: {puzzle.author_id}\n"
|
||||
f"创建时间: {puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
|
||||
f"Flag: {puzzle.flag}\n"
|
||||
f"状态: {status}{status_suffix}\n\n"
|
||||
f"{puzzle.content}"
|
||||
)
|
||||
|
||||
if puzzle.img_name:
|
||||
msg = msg.image(raw=puzzle.get_image_path().read_bytes())
|
||||
|
||||
msg = msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
|
||||
|
||||
return msg
|
||||
|
||||
|
||||
def create_admin_commands():
|
||||
cmd_admin = on_alconna(
|
||||
Alconna(
|
||||
"konaph",
|
||||
Subcommand("create", dest="create"),
|
||||
Subcommand("ready", Args["raw_id", str], dest="ready"),
|
||||
Subcommand("unready", Args["raw_id", str], dest="unready"),
|
||||
Subcommand("info", Args["raw_id", str], dest="info"),
|
||||
Subcommand("my", Args["page?", int], dest="my"),
|
||||
Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"),
|
||||
Subcommand("pin", Args["raw_id?", str], dest="pin"),
|
||||
Subcommand("unpin", dest="unpin"),
|
||||
Subcommand(
|
||||
"modify",
|
||||
Args["raw_id?", str],
|
||||
Option("--title", Args["title", str], alias=["-t"]),
|
||||
Option("--description", Args["description", str], alias=["-d"]),
|
||||
Option("--image", Args["image?", Image], alias=["-i"]),
|
||||
Option("--flag", Args["flag", str], alias=["-f"]),
|
||||
Option("--remove-image"),
|
||||
dest="modify",
|
||||
),
|
||||
Subcommand("publish", Args["raw_id?", str], dest="publish"),
|
||||
),
|
||||
rule=is_puzzle_manager,
|
||||
)
|
||||
|
||||
@cmd_admin.assign("$main")
|
||||
async def _(target: DepLongTaskTarget):
|
||||
msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
|
||||
msg = msg.text("konaph create - 创建一个新的谜题\n")
|
||||
msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
|
||||
msg = msg.text("konaph unready <id> - 取消准备一道谜题\n")
|
||||
msg = msg.text("konaph info <id> - 查看谜题\n")
|
||||
msg = msg.text("konaph my <page?> - 查看我的谜题列表\n")
|
||||
msg = msg.text("konaph modify - 查看如何修改谜题信息\n")
|
||||
|
||||
if is_puzzle_admin(target):
|
||||
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
|
||||
msg = msg.text("konaph pin - 查看当前置顶谜题\n")
|
||||
msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
|
||||
msg = msg.text("konaph unpin - 取消置顶所有谜题\n")
|
||||
msg = msg.text("konaph publish <id?> - 强制发题")
|
||||
|
||||
await target.send_message(msg)
|
||||
|
||||
@cmd_admin.assign("create")
|
||||
async def _(target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
puzzle = manager.admin_create_puzzle(target.target_id)
|
||||
await target.send_message(UniMessage.text(
|
||||
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
|
||||
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
|
||||
f"- 输入 `konaph my` 查看你创建的谜题\n"
|
||||
f"- 输入 `konaph modify` 查看更改谜题的方法"
|
||||
))
|
||||
|
||||
@cmd_admin.assign("ready")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
p = manager.puzzle_data[raw_id]
|
||||
if p.author_id != target.target_id and not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
if p.ready:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"题目早就准备好啦!"
|
||||
))
|
||||
manager.admin_mark_ready(raw_id, True)
|
||||
await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经准备就绪!"
|
||||
))
|
||||
|
||||
@cmd_admin.assign("unready")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
p = manager.puzzle_data[raw_id]
|
||||
if p.author_id != target.target_id and not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
if not p.ready:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经是未取消状态了!"
|
||||
))
|
||||
if p.published:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"已发布的谜题不能取消准备状态!"
|
||||
))
|
||||
|
||||
manager.admin_mark_ready(raw_id, False)
|
||||
await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经取消准备!"
|
||||
))
|
||||
|
||||
@cmd_admin.assign("info")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
|
||||
))
|
||||
p = manager.puzzle_data[raw_id]
|
||||
if p.author_id != target.target_id and not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"这不是你的题,你没有权限查看详细信息!"
|
||||
))
|
||||
|
||||
await target.send_message(get_puzzle_info_message(p))
|
||||
|
||||
@cmd_admin.assign("my")
|
||||
async def _(target: DepLongTaskTarget, page: int = 1):
|
||||
async with puzzle_manager() as manager:
|
||||
puzzles = manager.get_puzzles_of_user(target.target_id)
|
||||
if len(puzzles) == 0:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有谜题哦,使用 `konaph create` 创建一个吧!"
|
||||
))
|
||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||
if page <= 0 or page > count_pages:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"页数只有 1 ~ {count_pages} 啦!"
|
||||
))
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
||||
message = UniMessage.text("==== 我的谜题 ====\n\n")
|
||||
for p in puzzles:
|
||||
message = message.text("- ")
|
||||
if p.pinned:
|
||||
message = message.text("[📌]")
|
||||
if p.published:
|
||||
message = message.text(f"[#{p.index_id}] ")
|
||||
elif p.ready:
|
||||
message = message.text("[✅] ")
|
||||
else:
|
||||
message = message.text("[⚙️] ")
|
||||
message = message.text(f"{p.title} ({p.raw_id})")
|
||||
message = message.text("\n")
|
||||
message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
|
||||
await target.send_message(message)
|
||||
|
||||
@cmd_admin.assign("all")
|
||||
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text("你没有权限查看所有的哦"))
|
||||
async with puzzle_manager() as manager:
|
||||
puzzles = [*manager.puzzle_data.values()]
|
||||
if ready.available:
|
||||
puzzles = [p for p in puzzles if p.ready]
|
||||
puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
|
||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||
if page <= 0 or page > count_pages:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"页数只有 1 ~ {count_pages} 啦!"
|
||||
))
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
||||
message = UniMessage.text("==== 所有谜题 ====\n\n")
|
||||
for p in puzzles:
|
||||
message = message.text("- ")
|
||||
if p.pinned:
|
||||
message = message.text("[📌]")
|
||||
if p.published:
|
||||
message = message.text(f"[#{p.index_id}] ")
|
||||
elif p.ready:
|
||||
message = message.text("[✅] ")
|
||||
else:
|
||||
message = message.text("[⚙️] ")
|
||||
message = message.text(f"{p.title} ({p.raw_id} by {p.author_id})")
|
||||
message = message.text("\n")
|
||||
message = message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
|
||||
await target.send_message(message)
|
||||
|
||||
@cmd_admin.assign("pin")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str = ""):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有权限使用该指令"
|
||||
))
|
||||
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id == "":
|
||||
if manager.puzzle_pinned:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"被 Pin 的谜题 ID = {manager.puzzle_pinned}"
|
||||
))
|
||||
return await target.send_message("没有置顶谜题")
|
||||
if raw_id not in manager.unpublished_puzzles:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"这个谜题已经发布了,或者还没准备好,或者不存在"
|
||||
))
|
||||
manager.admin_pin_puzzle(raw_id)
|
||||
return await target.send_message(f"已置顶谜题 {raw_id}")
|
||||
|
||||
@cmd_admin.assign("unpin")
|
||||
async def _(target: DepLongTaskTarget):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有权限使用该指令"
|
||||
))
|
||||
async with puzzle_manager() as manager:
|
||||
manager.admin_pin_puzzle("")
|
||||
return await target.send_message("已取消所有置顶")
|
||||
|
||||
@cmd_admin.assign("modify")
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
raw_id: str = "",
|
||||
title: str | None = None,
|
||||
description: str | None = None,
|
||||
flag: str | None = None,
|
||||
image: Image | None = None,
|
||||
remove_image: Query[bool] = Query("modify.remove-image"),
|
||||
):
|
||||
if raw_id == "":
|
||||
return await target.send_message(
|
||||
"konaph modify <raw_id> - 修改一个谜题\n\n"
|
||||
"支持的参数:\n"
|
||||
" --title <str> 标题\n"
|
||||
" --description <str> 题目详情描述(用直引号包裹以支持多行)\n"
|
||||
" --flag <str> flag\n"
|
||||
" --image <图片> 图片\n"
|
||||
" --remove-image 删除图片"
|
||||
)
|
||||
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
return await target.send_message("没有这个谜题")
|
||||
p = manager.puzzle_data[raw_id]
|
||||
if not is_puzzle_admin(target) and target.target_id != p.author_id:
|
||||
return await target.send_message("你没有权限编辑这个谜题")
|
||||
if title is not None:
|
||||
p.title = title
|
||||
if description is not None:
|
||||
p.content = description
|
||||
if flag is not None:
|
||||
p.flag = flag
|
||||
if image is not None and image.url is not None:
|
||||
b = await download_image_bytes(image.url)
|
||||
p.add_image(b.unwrap())
|
||||
elif remove_image.available:
|
||||
p.remove_image()
|
||||
|
||||
info2 = get_puzzle_info_message(p)
|
||||
|
||||
return await target.send_message("修改好啦!看看效果:\n\n" + info2)
|
||||
|
||||
@cmd_admin.assign("publish")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str | None = None):
|
||||
today = get_today_date()
|
||||
async with puzzle_manager() as manager:
|
||||
if today in manager.daily_puzzle_of_date:
|
||||
return await target.send_message("今日已经有题了哦")
|
||||
manager.last_checked_date = today - datetime.timedelta(days=-1)
|
||||
if raw_id is not None:
|
||||
manager.admin_pin_puzzle(raw_id)
|
||||
p = manager.get_today_puzzle(strong=True)
|
||||
if p is None:
|
||||
return await target.send_message("上架失败了orz,可能是没题了")
|
||||
await qq_broadcast(config.plugin_puzzle_playgroup, p.get_unimessage())
|
||||
return await target.send_message("Ok!")
|
||||
|
||||
return cmd_admin
|
||||
Reference in New Issue
Block a user