330 lines
14 KiB
Python
330 lines
14 KiB
Python
import datetime
|
||
from math import ceil
|
||
from typing import Any
|
||
from nonebot import get_plugin_config
|
||
from nonebot_plugin_alconna import Alconna, Args, Image, Option, Query, Subcommand, UniMessage, on_alconna
|
||
from pydantic import BaseModel
|
||
|
||
from konabot.common.longtask import DepLongTaskTarget
|
||
from konabot.common.nb.extract_image import download_image_bytes
|
||
from konabot.common.nb.qq_broadcast import qq_broadcast
|
||
from konabot.plugins.kona_ph.core.storage import Puzzle, get_today_date, puzzle_manager
|
||
|
||
# Number of puzzles shown on one page of the paginated listing commands.
PUZZLE_PAGE_SIZE = 10
|
||
|
||
|
||
class PuzzleConfig(BaseModel):
    """Plugin settings for the puzzle feature.

    Every field is a list of string ids and defaults to an empty list.
    """

    # Target ids compared against ``target.target_id`` to grant manager rights.
    plugin_puzzle_manager: list[str] = []

    # Target ids compared against ``target.target_id`` to grant admin rights.
    plugin_puzzle_admin: list[str] = []

    # Destinations handed to ``qq_broadcast`` when a puzzle is published
    # (presumably group ids -- confirm against ``qq_broadcast``).
    plugin_puzzle_playgroup: list[str] = []
|
||
|
||
|
||
# Module-wide plugin configuration, resolved via nonebot's get_plugin_config.
config = get_plugin_config(PuzzleConfig)
|
||
|
||
|
||
def is_puzzle_manager(target: DepLongTaskTarget):
    """Return whether ``target`` may use the puzzle management commands.

    A target qualifies when its id is listed in
    ``config.plugin_puzzle_manager`` or when it is a puzzle admin.
    """
    if target.target_id in config.plugin_puzzle_manager:
        return True
    # Admins implicitly have every manager permission.
    return is_puzzle_admin(target)
|
||
|
||
|
||
def is_puzzle_admin(target: DepLongTaskTarget):
    """Return whether ``target`` is listed in ``config.plugin_puzzle_admin``."""
    admin_ids = config.plugin_puzzle_admin
    return target.target_id in admin_ids
|
||
|
||
|
||
def get_puzzle_info_message(puzzle: Puzzle) -> UniMessage[Any]:
    """Build the detailed information message for a single puzzle.

    The message contains the raw id, title, author id, creation time, flag,
    current status and content of the puzzle, followed by the puzzle image
    (when ``puzzle.img_name`` is set) and a hint on how to mark it ready.

    Args:
        puzzle: The puzzle to describe.

    Returns:
        The assembled ``UniMessage``.
    """
    # Published wins over ready; anything else is still being prepared.
    if puzzle.published:
        status = f"🟢 已发布: #{puzzle.index_id}"
    elif puzzle.ready:
        status = "✅ 已准备,待发布"
    else:
        status = "⚙️ 未准备"

    status_suffix = " | 📌 已被管理员置顶" if puzzle.pinned else ""
    created_at = puzzle.created_at.strftime('%Y-%m-%d %H:%M:%S')

    body = (
        f"--- 谜题信息 ---\n"
        f"Raw ID: {puzzle.raw_id}\n"
        f"标题: {puzzle.title}\n"
        f"出题者 ID: {puzzle.author_id}\n"
        f"创建时间: {created_at}\n"
        f"Flag: {puzzle.flag}\n"
        f"状态: {status}{status_suffix}\n\n"
        f"{puzzle.content}"
    )
    msg = UniMessage.text(body)

    if puzzle.img_name:
        image_bytes = puzzle.get_image_path().read_bytes()
        msg = msg.image(raw=image_bytes)

    return msg.text(f"\n---------\n使用 `konaph ready {puzzle.raw_id}` 完成编辑")
|
||
|
||
|
||
def _puzzle_list_entry(puzzle: Puzzle, show_author: bool = False) -> str:
    """Render one line of a puzzle listing, including the trailing newline.

    Args:
        puzzle: The puzzle to render.
        show_author: When true, append ``by <author_id>`` after the raw id.

    Returns:
        A line of the form ``- [📌][status] title (raw_id)``; the pin marker
        is only present for pinned puzzles.
    """
    pin_mark = "[📌]" if puzzle.pinned else ""
    if puzzle.published:
        status_mark = f"[#{puzzle.index_id}] "
    elif puzzle.ready:
        status_mark = "[✅] "
    else:
        status_mark = "[⚙️] "
    if show_author:
        ident = f"{puzzle.raw_id} by {puzzle.author_id}"
    else:
        ident = puzzle.raw_id
    return f"- {pin_mark}{status_mark}{puzzle.title} ({ident})\n"


def _puzzle_list_message(
    header: str,
    puzzles: list[Puzzle],
    page: int,
    count_pages: int,
    show_author: bool = False,
) -> UniMessage[Any]:
    """Assemble a paginated listing: header, one line per puzzle, page footer."""
    message = UniMessage.text(header)
    for puzzle in puzzles:
        message = message.text(_puzzle_list_entry(puzzle, show_author))
    return message.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")


def create_admin_commands():
    """Register the ``konaph`` management command and all of its handlers.

    The matcher is guarded by ``is_puzzle_manager``; the ``all``, ``pin``,
    ``unpin`` and ``publish`` sub-commands additionally require
    ``is_puzzle_admin``.

    Returns:
        The matcher created by ``on_alconna``.
    """
    cmd_admin = on_alconna(
        Alconna(
            "konaph",
            Subcommand("create", dest="create"),
            Subcommand("ready", Args["raw_id", str], dest="ready"),
            Subcommand("unready", Args["raw_id", str], dest="unready"),
            Subcommand("info", Args["raw_id", str], dest="info"),
            Subcommand("my", Args["page?", int], dest="my"),
            Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"),
            Subcommand("pin", Args["raw_id?", str], dest="pin"),
            Subcommand("unpin", dest="unpin"),
            Subcommand(
                "modify",
                Args["raw_id?", str],
                Option("--title", Args["title", str], alias=["-t"]),
                Option("--description", Args["description", str], alias=["-d"]),
                Option("--image", Args["image?", Image], alias=["-i"]),
                Option("--flag", Args["flag", str], alias=["-f"]),
                Option("--remove-image"),
                dest="modify",
            ),
            Subcommand("publish", Args["raw_id?", str], dest="publish"),
        ),
        rule=is_puzzle_manager,
    )

    @cmd_admin.assign("$main")
    async def _(target: DepLongTaskTarget):
        """Send the command overview; admin-only commands are shown to admins only."""
        msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
        msg = msg.text("konaph create - 创建一个新的谜题\n")
        msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
        msg = msg.text("konaph unready <id> - 取消准备一道谜题\n")
        msg = msg.text("konaph info <id> - 查看谜题\n")
        msg = msg.text("konaph my <page?> - 查看我的谜题列表\n")
        msg = msg.text("konaph modify - 查看如何修改谜题信息\n")

        if is_puzzle_admin(target):
            msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
            msg = msg.text("konaph pin - 查看当前置顶谜题\n")
            msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
            msg = msg.text("konaph unpin - 取消置顶所有谜题\n")
            msg = msg.text("konaph publish <id?> - 强制发题")

        await target.send_message(msg)

    @cmd_admin.assign("create")
    async def _(target: DepLongTaskTarget):
        """Create a new puzzle owned by the caller and report its raw id."""
        async with puzzle_manager() as manager:
            puzzle = manager.admin_create_puzzle(target.target_id)
            await target.send_message(UniMessage.text(
                f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
                f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
                "- 输入 `konaph my` 查看你创建的谜题\n"
                "- 输入 `konaph modify` 查看更改谜题的方法"
            ))

    @cmd_admin.assign("ready")
    async def _(raw_id: str, target: DepLongTaskTarget):
        """Mark a puzzle as ready; only its author or an admin may do so."""
        async with puzzle_manager() as manager:
            if raw_id not in manager.puzzle_data:
                return await target.send_message(UniMessage.text(
                    "你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
                ))
            p = manager.puzzle_data[raw_id]
            if p.author_id != target.target_id and not is_puzzle_admin(target):
                return await target.send_message(UniMessage.text(
                    "这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
                ))
            if p.ready:
                return await target.send_message(UniMessage.text(
                    "题目早就准备好啦!"
                ))
            manager.admin_mark_ready(raw_id, True)
            await target.send_message(UniMessage.text(
                f"谜题「{p.title}」已经准备就绪!"
            ))

    @cmd_admin.assign("unready")
    async def _(raw_id: str, target: DepLongTaskTarget):
        """Revert a ready puzzle to not-ready; published puzzles are refused."""
        async with puzzle_manager() as manager:
            if raw_id not in manager.puzzle_data:
                return await target.send_message(UniMessage.text(
                    "你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
                ))
            p = manager.puzzle_data[raw_id]
            if p.author_id != target.target_id and not is_puzzle_admin(target):
                return await target.send_message(UniMessage.text(
                    "这不是你的题,你没有权限编辑!输入 `konaph my` 查看你创建的谜题"
                ))
            if not p.ready:
                # The puzzle is already in the "not ready" state.
                return await target.send_message(UniMessage.text(
                    f"谜题「{p.title}」已经是未准备状态了!"
                ))
            if p.published:
                return await target.send_message(UniMessage.text(
                    "已发布的谜题不能取消准备状态!"
                ))

            manager.admin_mark_ready(raw_id, False)
            await target.send_message(UniMessage.text(
                f"谜题「{p.title}」已经取消准备!"
            ))

    @cmd_admin.assign("info")
    async def _(raw_id: str, target: DepLongTaskTarget):
        """Show the full details of a puzzle to its author or an admin."""
        async with puzzle_manager() as manager:
            if raw_id not in manager.puzzle_data:
                return await target.send_message(UniMessage.text(
                    "你输入的谜题不存在!输入 `konaph my` 查看你创建的谜题"
                ))
            p = manager.puzzle_data[raw_id]
            if p.author_id != target.target_id and not is_puzzle_admin(target):
                return await target.send_message(UniMessage.text(
                    "这不是你的题,你没有权限查看详细信息!"
                ))

            await target.send_message(get_puzzle_info_message(p))

    @cmd_admin.assign("my")
    async def _(target: DepLongTaskTarget, page: int = 1):
        """List the caller's own puzzles, ``PUZZLE_PAGE_SIZE`` per page."""
        async with puzzle_manager() as manager:
            puzzles = manager.get_puzzles_of_user(target.target_id)
            if not puzzles:
                return await target.send_message(UniMessage.text(
                    "你没有谜题哦,使用 `konaph create` 创建一个吧!"
                ))
            count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
            if not 1 <= page <= count_pages:
                return await target.send_message(UniMessage.text(
                    f"页数只有 1 ~ {count_pages} 啦!"
                ))
            start = (page - 1) * PUZZLE_PAGE_SIZE
            page_items = puzzles[start: start + PUZZLE_PAGE_SIZE]
            await target.send_message(_puzzle_list_message(
                "==== 我的谜题 ====\n\n", page_items, page, count_pages,
            ))

    @cmd_admin.assign("all")
    async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
        """List every puzzle (admin only), newest first.

        With ``--ready`` only puzzles whose ``ready`` flag is set are listed.
        """
        if not is_puzzle_admin(target):
            return await target.send_message(UniMessage.text("你没有权限查看所有的哦"))
        async with puzzle_manager() as manager:
            puzzles = [*manager.puzzle_data.values()]
            if ready.available:
                puzzles = [p for p in puzzles if p.ready]
            if not puzzles:
                # Without this guard count_pages would be 0 and the range
                # message below would read "1 ~ 0".
                return await target.send_message(UniMessage.text(
                    "没有符合条件的谜题哦"
                ))
            puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
            count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
            if not 1 <= page <= count_pages:
                return await target.send_message(UniMessage.text(
                    f"页数只有 1 ~ {count_pages} 啦!"
                ))
            start = (page - 1) * PUZZLE_PAGE_SIZE
            page_items = puzzles[start: start + PUZZLE_PAGE_SIZE]
            await target.send_message(_puzzle_list_message(
                "==== 所有谜题 ====\n\n", page_items, page, count_pages,
                show_author=True,
            ))

    @cmd_admin.assign("pin")
    async def _(target: DepLongTaskTarget, raw_id: str = ""):
        """Show the pinned puzzle (no argument) or pin the given one (admin only)."""
        if not is_puzzle_admin(target):
            return await target.send_message(UniMessage.text(
                "你没有权限使用该指令"
            ))

        async with puzzle_manager() as manager:
            if raw_id == "":
                if manager.puzzle_pinned:
                    return await target.send_message(UniMessage.text(
                        f"被 Pin 的谜题 ID = {manager.puzzle_pinned}"
                    ))
                return await target.send_message("没有置顶谜题")
            if raw_id not in manager.unpublished_puzzles:
                return await target.send_message(UniMessage.text(
                    "这个谜题已经发布了,或者还没准备好,或者不存在"
                ))
            manager.admin_pin_puzzle(raw_id)
            return await target.send_message(f"已置顶谜题 {raw_id}")

    @cmd_admin.assign("unpin")
    async def _(target: DepLongTaskTarget):
        """Clear the pinned puzzle (admin only)."""
        if not is_puzzle_admin(target):
            return await target.send_message(UniMessage.text(
                "你没有权限使用该指令"
            ))
        async with puzzle_manager() as manager:
            # Pinning the empty id is how the pin is cleared here.
            manager.admin_pin_puzzle("")
            return await target.send_message("已取消所有置顶")

    @cmd_admin.assign("modify")
    async def _(
        target: DepLongTaskTarget,
        raw_id: str = "",
        title: str | None = None,
        description: str | None = None,
        flag: str | None = None,
        image: Image | None = None,
        remove_image: Query[bool] = Query("modify.remove-image"),
    ):
        """Edit a puzzle's fields, or print usage when no raw id is given.

        Only the author or an admin may edit. A supplied image takes
        precedence over ``--remove-image``.
        """
        if raw_id == "":
            return await target.send_message(
                "konaph modify <raw_id> - 修改一个谜题\n\n"
                "支持的参数:\n"
                " --title <str> 标题\n"
                " --description <str> 题目详情描述(用直引号包裹以支持多行)\n"
                " --flag <str> flag\n"
                " --image <图片> 图片\n"
                " --remove-image 删除图片"
            )

        async with puzzle_manager() as manager:
            if raw_id not in manager.puzzle_data:
                return await target.send_message("没有这个谜题")
            p = manager.puzzle_data[raw_id]
            if not is_puzzle_admin(target) and target.target_id != p.author_id:
                return await target.send_message("你没有权限编辑这个谜题")
            if title is not None:
                p.title = title
            if description is not None:
                p.content = description
            if flag is not None:
                p.flag = flag
            if image is not None and image.url is not None:
                b = await download_image_bytes(image.url)
                p.add_image(b.unwrap())
            elif remove_image.available:
                p.remove_image()

            info2 = get_puzzle_info_message(p)

            return await target.send_message("修改好啦!看看效果:\n\n" + info2)

    @cmd_admin.assign("publish")
    async def _(target: DepLongTaskTarget, raw_id: str | None = None):
        """Force today's puzzle to be published and broadcast (admin only).

        When ``raw_id`` is given that puzzle is pinned first. Nothing is
        published if today already has a puzzle.
        """
        # The help text lists this command as admin-only, so enforce it the
        # same way the other admin-only sub-commands do.
        if not is_puzzle_admin(target):
            return await target.send_message(UniMessage.text(
                "你没有权限使用该指令"
            ))

        today = get_today_date()
        async with puzzle_manager() as manager:
            if today in manager.daily_puzzle_of_date:
                return await target.send_message("今日已经有题了哦")
            # NOTE(review): subtracting a negative delta yields *tomorrow*;
            # confirm whether ``timedelta(days=1)`` (yesterday) was intended.
            manager.last_checked_date = today - datetime.timedelta(days=-1)
            if raw_id is not None:
                manager.admin_pin_puzzle(raw_id)
            p = manager.get_today_puzzle(strong=True)
            if p is None:
                return await target.send_message("上架失败了orz,可能是没题了")
            await qq_broadcast(config.plugin_puzzle_playgroup, p.get_unimessage())
            return await target.send_message("Ok!")

    return cmd_admin
|