Compare commits

...

7 Commits

Author SHA1 Message Date
f3389ff2b9 添加服务器管理相关,以及 cronjob
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-08 03:34:14 +08:00
e59d3c2e4b 哎哟喂这个文件怎么没交
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-08 00:40:11 +08:00
31d19b7ec0 我没辙了直接把测试打包进去吧
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-07 18:41:59 +08:00
c2f677911d 添加一些权限目标
Some checks failed
continuous-integration/drone/push Build is failing
2026-03-07 18:36:51 +08:00
f5b81319f8 konaph 接入权限系统
Some checks failed
continuous-integration/drone/push Build is failing
2026-03-07 18:15:28 +08:00
870e2383d8 为 Drone 提供单元测试目录 2026-03-07 18:15:16 +08:00
7e8fa45f36 Merge pull request '权限系统' (#55) from feature/permsystem into master
Some checks failed
continuous-integration/drone/push Build is failing
Reviewed-on: #55
2026-03-07 17:55:27 +08:00
10 changed files with 420 additions and 185 deletions

View File

@ -39,7 +39,7 @@ steps:
commands: commands:
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py - docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py - docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov-report term-missing:skip-covered - docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov=./konabot/ --cov-report term-missing:skip-covered
- name: 发送构建结果到 ntfy - name: 发送构建结果到 ntfy
image: parrazam/drone-ntfy image: parrazam/drone-ntfy
when: when:

View File

@ -61,6 +61,7 @@ COPY bot.py pyproject.toml .env.prod .env.test ./
COPY assets ./assets COPY assets ./assets
COPY scripts ./scripts COPY scripts ./scripts
COPY konabot ./konabot COPY konabot ./konabot
COPY tests ./tests
ENV PYTHONPATH=/app ENV PYTHONPATH=/app

View File

@ -1,12 +1,14 @@
import re import re
from nonebot import get_plugin_config, on_message from nonebot import get_plugin_config, on_message
from nonebot.rule import Rule
from nonebot_plugin_alconna import Reference, Reply, UniMsg from nonebot_plugin_alconna import Reference, Reply, UniMsg
from nonebot.adapters import Event from nonebot.adapters import Event
from nonebot.adapters.onebot.v11.event import GroupMessageEvent as OB11GroupEvent
from pydantic import BaseModel from pydantic import BaseModel
from konabot.common.permsys import require_permission
class Config(BaseModel): class Config(BaseModel):
bilifetch_enabled_groups: list[int] = [] bilifetch_enabled_groups: list[int] = []
@ -19,11 +21,7 @@ pattern = (
) )
def _rule(msg: UniMsg, evt: Event) -> bool: def _rule(msg: UniMsg) -> bool:
if isinstance(evt, OB11GroupEvent):
if evt.group_id not in config.bilifetch_enabled_groups:
return False
to_search = msg.exclude(Reply, Reference).dump(json=True) to_search = msg.exclude(Reply, Reference).dump(json=True)
to_search2 = msg.exclude(Reply, Reference).extract_plain_text() to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
if not re.search(pattern, to_search) and not re.search(pattern, to_search2): if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
@ -31,11 +29,11 @@ def _rule(msg: UniMsg, evt: Event) -> bool:
return True return True
matcher_fix = on_message(rule=_rule) matcher_fix = on_message(rule=Rule(_rule) & require_permission("bilifetch"))
@matcher_fix.handle() @matcher_fix.handle()
async def _(event: Event): async def _(event: Event):
from nonebot_plugin_analysis_bilibili import handle_analysis from nonebot_plugin_analysis_bilibili import handle_analysis
await handle_analysis(event) await handle_analysis(event)

View File

@ -70,7 +70,7 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
await target.send_message(res) await target.send_message(res)
return return
env = TextHandlerEnvironment(is_trusted=False) env = TextHandlerEnvironment(is_trusted=False, event=evt)
results = await runner.run_pipeline(res, istream or None, env) results = await runner.run_pipeline(res, istream or None, env)
# 检查是否有错误 # 检查是否有错误

View File

@ -7,11 +7,13 @@ from string import whitespace
from typing import cast from typing import cast
from loguru import logger from loguru import logger
from nonebot.adapters import Event
@dataclass @dataclass
class TextHandlerEnvironment: class TextHandlerEnvironment:
is_trusted: bool is_trusted: bool
event: Event | None = None
buffers: dict[str, str] = field(default_factory=dict) buffers: dict[str, str] = field(default_factory=dict)
@ -287,7 +289,7 @@ class PipelineRunner:
env: TextHandlerEnvironment | None = None, env: TextHandlerEnvironment | None = None,
) -> list[TextHandleResult]: ) -> list[TextHandleResult]:
if env is None: if env is None:
env = TextHandlerEnvironment(is_trusted=False, buffers={}) env = TextHandlerEnvironment(is_trusted=False, event=None, buffers={})
results: list[TextHandleResult] = [] results: list[TextHandleResult] = []

View File

@ -1,36 +1,51 @@
from typing import Any, cast from typing import Any, cast
from konabot.common.llm import get_llm from konabot.common.llm import get_llm
from konabot.plugins.handle_text.base import TextHandler, TextHandlerEnvironment, TextHandleResult from konabot.common.permsys import perm_manager
from konabot.plugins.handle_text.base import (
TextHandler,
TextHandlerEnvironment,
TextHandleResult,
)
class THQwen(TextHandler): class THQwen(TextHandler):
name = "qwen" name = "qwen"
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
pm = perm_manager()
if env.event is None or not pm.check_has_permission(env.event, "textfx.qwen"):
return TextHandleResult(
code=1,
ostream="这里暂未开启 AI 功能",
)
llm = get_llm() llm = get_llm()
messages = [] messages = []
if istream is not None: if istream is not None:
messages.append({ messages.append({"role": "user", "content": istream})
"role": "user",
"content": istream
})
if len(args) > 0: if len(args) > 0:
message = ' '.join(args) message = " ".join(args)
messages.append({ messages.append(
"role": "user", {
"content": message, "role": "user",
}) "content": message,
}
)
if len(messages) == 0: if len(messages) == 0:
return TextHandleResult( return TextHandleResult(
code=1, code=1,
ostream="使用方法qwen <提示词>", ostream="使用方法qwen <提示词>",
) )
messages = [{ messages = [
"role": "system", {
"content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答" "role": "system",
}] + messages "content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答",
}
] + messages
result = await llm.chat(cast(Any, messages)) result = await llm.chat(cast(Any, messages))
content = result.content content = result.content
if content is None: if content is None:

View File

@ -2,22 +2,39 @@ import datetime
from math import ceil from math import ceil
from nonebot import get_plugin_config from nonebot import get_plugin_config
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query, from nonebot.adapters import Event
Subcommand, SubcommandResult, UniMessage, from nonebot_plugin_alconna import (
on_alconna) Alconna,
Args,
Image,
Option,
Query,
Subcommand,
SubcommandResult,
UniMessage,
on_alconna,
)
from pydantic import BaseModel from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.exc import BotExceptionMessage from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import download_image_bytes from konabot.common.nb.extract_image import download_image_bytes
from konabot.common.permsys import DepPermManager, require_permission
from konabot.common.username import get_username from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.image import get_image_manager from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list, from konabot.plugins.kona_ph.core.message import (
get_puzzle_info_message, get_puzzle_description,
get_submission_message) get_puzzle_hint_list,
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager, get_puzzle_info_message,
get_today_date, get_submission_message,
puzzle_manager) )
from konabot.plugins.kona_ph.core.storage import (
Puzzle,
PuzzleHint,
PuzzleManager,
get_today_date,
puzzle_manager,
)
from konabot.plugins.poster.service import broadcast from konabot.plugins.poster.service import broadcast
PUZZLE_PAGE_SIZE = 10 PUZZLE_PAGE_SIZE = 10
@ -32,19 +49,17 @@ class PuzzleConfig(BaseModel):
config = get_plugin_config(PuzzleConfig) config = get_plugin_config(PuzzleConfig)
def is_puzzle_manager(target: DepLongTaskTarget): async def check_puzzle(
return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target) manager: PuzzleManager,
perm: DepPermManager,
raw_id: str,
def is_puzzle_admin(target: DepLongTaskTarget): event: Event,
return target.target_id in config.plugin_puzzle_admin target: DepLongTaskTarget,
) -> Puzzle:
def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle:
if raw_id not in manager.puzzle_data: if raw_id not in manager.puzzle_data:
raise BotExceptionMessage("没有这个谜题") raise BotExceptionMessage("没有这个谜题")
puzzle = manager.puzzle_data[raw_id] puzzle = manager.puzzle_data[raw_id]
if is_puzzle_admin(target): if await perm.check_has_permission(event, "konaph.admin"):
return puzzle return puzzle
if target.target_id != puzzle.author_id: if target.target_id != puzzle.author_id:
raise BotExceptionMessage("你没有权限查看或编辑这个谜题") raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
@ -60,7 +75,9 @@ def create_admin_commands():
Subcommand("unready", Args["raw_id", str], dest="unready"), Subcommand("unready", Args["raw_id", str], dest="unready"),
Subcommand("info", Args["raw_id", str], dest="info"), Subcommand("info", Args["raw_id", str], dest="info"),
Subcommand("my", Args["page?", int], dest="my"), Subcommand("my", Args["page?", int], dest="my"),
Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"), Subcommand(
"all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"
),
Subcommand("pin", Args["raw_id?", str], dest="pin"), Subcommand("pin", Args["raw_id?", str], dest="pin"),
Subcommand("unpin", dest="unpin"), Subcommand("unpin", dest="unpin"),
Subcommand( Subcommand(
@ -115,11 +132,11 @@ def create_admin_commands():
dest="hint", dest="hint",
), ),
), ),
rule=is_puzzle_manager, rule=require_permission("konaph.manager"),
) )
@cmd_admin.assign("$main") @cmd_admin.assign("$main")
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget, pm: DepPermManager, event: Event):
msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n") msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
msg = msg.text("konaph create - 创建一个新的谜题\n") msg = msg.text("konaph create - 创建一个新的谜题\n")
msg = msg.text("konaph ready <id> - 准备好一道谜题\n") msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
@ -132,7 +149,7 @@ def create_admin_commands():
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n") msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n") msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
if is_puzzle_admin(target): if await pm.check_has_permission(event, "konaph.admin"):
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n") msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
msg = msg.text("konaph pin - 查看当前置顶谜题\n") msg = msg.text("konaph pin - 查看当前置顶谜题\n")
msg = msg.text("konaph pin <id> - 置顶一个谜题\n") msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
@ -145,48 +162,54 @@ def create_admin_commands():
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
puzzle = manager.admin_create_puzzle(target.target_id) puzzle = manager.admin_create_puzzle(target.target_id)
await target.send_message(UniMessage.text( await target.send_message(
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n" UniMessage.text(
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n" f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
f"- 输入 `konaph my` 查看你创建的谜题\n" f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
f"- 输入 `konaph modify` 查看更改谜题的方法" f"- 输入 `konaph my` 查看你创建的谜题\n"
)) f"- 输入 `konaph modify` 查看更改谜题的方法"
)
)
@cmd_admin.assign("ready") @cmd_admin.assign("ready")
async def _(raw_id: str, target: DepLongTaskTarget): async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if p.ready: if p.ready:
return await target.send_message(UniMessage.text( return await target.send_message(UniMessage.text("题目早就准备好啦!"))
"题目早就准备好啦!"
))
p.ready = True p.ready = True
await target.send_message(UniMessage.text( await target.send_message(
f"谜题「{p.title}」已经准备就绪!" UniMessage.text(f"谜题「{p.title}」已经准备就绪!")
)) )
@cmd_admin.assign("unready") @cmd_admin.assign("unready")
async def _(raw_id: str, target: DepLongTaskTarget): async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if not p.ready: if not p.ready:
return await target.send_message(UniMessage.text( return await target.send_message(
f"谜题「{p.title}」已经是未取消状态了!" UniMessage.text(f"谜题「{p.title}」已经是未取消状态了!")
)) )
if manager.is_puzzle_published(p.raw_id): if manager.is_puzzle_published(p.raw_id):
return await target.send_message(UniMessage.text( return await target.send_message(
"已发布的谜题不能取消准备状态!" UniMessage.text("已发布的谜题不能取消准备状态!")
)) )
p.ready = False p.ready = False
await target.send_message(UniMessage.text( await target.send_message(
f"谜题「{p.title}」已经取消准备!" UniMessage.text(f"谜题「{p.title}」已经取消准备!")
)) )
@cmd_admin.assign("info") @cmd_admin.assign("info")
async def _(raw_id: str, target: DepLongTaskTarget): async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
await target.send_message(get_puzzle_info_message(manager, p)) await target.send_message(get_puzzle_info_message(manager, p))
@cmd_admin.assign("my") @cmd_admin.assign("my")
@ -194,15 +217,15 @@ def create_admin_commands():
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
puzzles = manager.get_puzzles_of_user(target.target_id) puzzles = manager.get_puzzles_of_user(target.target_id)
if len(puzzles) == 0: if len(puzzles) == 0:
return await target.send_message(UniMessage.text( return await target.send_message(
"你没有谜题哦,使用 `konaph create` 创建一个吧!" UniMessage.text("你没有谜题哦,使用 `konaph create` 创建一个吧!")
)) )
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages: if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text( return await target.send_message(
f"页数只有 1 ~ {count_pages} 啦!" UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
)) )
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
message = UniMessage.text("==== 我的谜题 ====\n\n") message = UniMessage.text("==== 我的谜题 ====\n\n")
for p in puzzles: for p in puzzles:
message = message.text("- ") message = message.text("- ")
@ -220,11 +243,15 @@ def create_admin_commands():
await target.send_message(message) await target.send_message(message)
@cmd_admin.assign("all") @cmd_admin.assign("all")
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1): async def _(
if not is_puzzle_admin(target): target: DepLongTaskTarget,
return await target.send_message(UniMessage.text( event: Event,
"你没有权限使用该指令" perm: DepPermManager,
)) ready: Query[bool] = Query("all.ready"),
page: int = 1,
):
if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
puzzles = [*manager.puzzle_data.values()] puzzles = [*manager.puzzle_data.values()]
if ready.available: if ready.available:
@ -232,10 +259,10 @@ def create_admin_commands():
puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True) puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages: if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text( return await target.send_message(
f"页数只有 1 ~ {count_pages} 啦!" UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
)) )
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
message = UniMessage.text("==== 所有谜题 ====\n\n") message = UniMessage.text("==== 所有谜题 ====\n\n")
for p in puzzles: for p in puzzles:
message = message.text("- ") message = message.text("- ")
@ -253,32 +280,30 @@ def create_admin_commands():
await target.send_message(message) await target.send_message(message)
@cmd_admin.assign("pin") @cmd_admin.assign("pin")
async def _(target: DepLongTaskTarget, raw_id: str = ""): async def _(
if not is_puzzle_admin(target): target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str = ""
return await target.send_message(UniMessage.text( ):
"你没有权限使用该指令" if not perm.check_has_permission(event, "konaph.admin"):
)) return await target.send_message(UniMessage.text("你没有权限使用该指令"))
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
if raw_id == "": if raw_id == "":
if manager.puzzle_pinned: if manager.puzzle_pinned:
return await target.send_message(UniMessage.text( return await target.send_message(
f"被 Pin 的谜题 ID = {manager.puzzle_pinned}" UniMessage.text(f"被 Pin 的谜题 ID = {manager.puzzle_pinned}")
)) )
return await target.send_message("没有置顶谜题") return await target.send_message("没有置顶谜题")
if raw_id not in manager.unpublished_puzzles: if raw_id not in manager.unpublished_puzzles:
return await target.send_message(UniMessage.text( return await target.send_message(
"这个谜题已经发布了,或者还没准备好,或者不存在" UniMessage.text("这个谜题已经发布了,或者还没准备好,或者不存在")
)) )
manager.admin_pin_puzzle(raw_id) manager.admin_pin_puzzle(raw_id)
return await target.send_message(f"已置顶谜题 {raw_id}") return await target.send_message(f"已置顶谜题 {raw_id}")
@cmd_admin.assign("unpin") @cmd_admin.assign("unpin")
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget, event: Event, perm: DepPermManager):
if not is_puzzle_admin(target): if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text( return await target.send_message(UniMessage.text("你没有权限使用该指令"))
"你没有权限使用该指令"
))
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
manager.admin_pin_puzzle("") manager.admin_pin_puzzle("")
return await target.send_message("已取消所有置顶") return await target.send_message("已取消所有置顶")
@ -286,6 +311,8 @@ def create_admin_commands():
@cmd_admin.assign("modify") @cmd_admin.assign("modify")
async def _( async def _(
target: DepLongTaskTarget, target: DepLongTaskTarget,
event: Event,
perm: DepPermManager,
raw_id: str = "", raw_id: str = "",
title: str | None = None, title: str | None = None,
description: str | None = None, description: str | None = None,
@ -306,7 +333,7 @@ def create_admin_commands():
image_manager = get_image_manager() image_manager = get_image_manager()
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if title is not None: if title is not None:
p.title = title p.title = title
if description is not None: if description is not None:
@ -329,11 +356,14 @@ def create_admin_commands():
return await target.send_message("修改好啦!看看效果:\n\n" + info2) return await target.send_message("修改好啦!看看效果:\n\n" + info2)
@cmd_admin.assign("publish") @cmd_admin.assign("publish")
async def _(target: DepLongTaskTarget, raw_id: str | None = None): async def _(
if not is_puzzle_admin(target): target: DepLongTaskTarget,
return await target.send_message(UniMessage.text( event: Event,
"你没有权限使用该指令" perm: DepPermManager,
)) raw_id: str | None = None,
):
if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
today = get_today_date() today = get_today_date()
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
if today in manager.daily_puzzle_of_date: if today in manager.daily_puzzle_of_date:
@ -348,46 +378,64 @@ def create_admin_commands():
return await target.send_message("Ok!") return await target.send_message("Ok!")
@cmd_admin.assign("preview") @cmd_admin.assign("preview")
async def _(target: DepLongTaskTarget, raw_id: str): async def _(
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
return await target.send_message(get_puzzle_description(p)) return await target.send_message(get_puzzle_description(p))
@cmd_admin.assign("get-submits") @cmd_admin.assign("get-submits")
async def _(target: DepLongTaskTarget, raw_id: str): async def _(
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
puzzle = manager.puzzle_data.get(raw_id) puzzle = manager.puzzle_data.get(raw_id)
if puzzle is None: if puzzle is None:
return await target.send_message("没有这个谜题") return await target.send_message("没有这个谜题")
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id: if (
not perm.check_has_permission(event, "konaph.admin")
and target.target_id != puzzle.author_id
):
return await target.send_message("你没有权限预览这个谜题") return await target.send_message("你没有权限预览这个谜题")
msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n") msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n")
submits = manager.submissions.get(raw_id, {}) submits = manager.submissions.get(raw_id, {})
for uid, ls in submits.items(): for uid, ls in submits.items():
s = ', '.join((i.flag for i in ls)) s = ", ".join((i.flag for i in ls))
msg = msg.text(f"- {get_username(uid)}{s}\n") msg = msg.text(f"- {get_username(uid)}{s}\n")
return await target.send_message(msg) return await target.send_message(msg)
@cmd_admin.assign("test") @cmd_admin.assign("test")
async def _(target: DepLongTaskTarget, raw_id: str, submission: str): async def _(
target: DepLongTaskTarget,
raw_id: str,
submission: str,
event: Event,
perm: DepPermManager,
):
""" """
测试一道谜题的回答,并给出结果 测试一道谜题的回答,并给出结果
""" """
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
result = p.check_submission(submission) result = p.check_submission(submission)
msg = get_submission_message(p, result) msg = get_submission_message(p, result)
return await target.send_message("[测试提交] " + msg) return await target.send_message("[测试提交] " + msg)
@cmd_admin.assign("subcommands.hint") @cmd_admin.assign("subcommands.hint")
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")): async def _(
target: DepLongTaskTarget,
subcommands: Query[SubcommandResult] = Query("subcommands.hint"),
):
if len(subcommands.result.subcommands) > 0: if len(subcommands.result.subcommands) > 0:
return return
return await target.send_message( return await target.send_message(
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n") UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n") .text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n") .text(
"- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n"
)
.text("- konaph hint modify <id> <hint_id>\n") .text("- konaph hint modify <id> <hint_id>\n")
.text(" - --pattern <pattern>\n - 更改匹配规则\n") .text(" - --pattern <pattern>\n - 更改匹配规则\n")
.text(" - --message <message>\n - 更改提示文本\n") .text(" - --message <message>\n - 更改提示文本\n")
@ -402,9 +450,11 @@ def create_admin_commands():
raw_id: str, raw_id: str,
pattern: str, pattern: str,
message: str, message: str,
event: Event,
perm: DepPermManager,
): ):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
p.hints[p.hint_id_max + 1] = PuzzleHint( p.hints[p.hint_id_max + 1] = PuzzleHint(
pattern=pattern, pattern=pattern,
message=message, message=message,
@ -416,9 +466,11 @@ def create_admin_commands():
async def _( async def _(
target: DepLongTaskTarget, target: DepLongTaskTarget,
raw_id: str, raw_id: str,
event: Event,
perm: DepPermManager,
): ):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
await target.send_message(get_puzzle_hint_list(p)) await target.send_message(get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.modify") @cmd_admin.assign("subcommands.hint.modify")
@ -426,12 +478,14 @@ def create_admin_commands():
target: DepLongTaskTarget, target: DepLongTaskTarget,
raw_id: str, raw_id: str,
hint_id: int, hint_id: int,
event: Event,
perm: DepPermManager,
pattern: str | None = None, pattern: str | None = None,
message: str | None = None, message: str | None = None,
is_checkpoint: bool | None = None, is_checkpoint: bool | None = None,
): ):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if hint_id not in p.hints: if hint_id not in p.hints:
raise BotExceptionMessage( raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单" f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
@ -450,9 +504,11 @@ def create_admin_commands():
target: DepLongTaskTarget, target: DepLongTaskTarget,
raw_id: str, raw_id: str,
hint_id: int, hint_id: int,
event: Event,
perm: DepPermManager,
): ):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if hint_id not in p.hints: if hint_id not in p.hints:
raise BotExceptionMessage( raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单" f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
@ -460,5 +516,4 @@ def create_admin_commands():
del p.hints[hint_id] del p.hints[hint_id]
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p)) await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
return cmd_admin return cmd_admin

View File

@ -1,57 +0,0 @@
import asyncio
import mcstatus
from nonebot import on_command
from nonebot.adapters import Event
from nonebot_plugin_alconna import UniMessage
from konabot.common.nb.is_admin import is_admin
from mcstatus.responses import JavaStatusResponse
cmd = on_command("宾几人", aliases=set(("宾人数", "mcbingo")), rule=is_admin)
def parse_status(motd: str) -> str:
if "[PRE-GAME]" in motd:
return "[✨ 空闲]"
if "[IN-GAME]" in motd:
return "[🕜 游戏中]"
if "[POST-GAME]" in motd:
return "[🕜 游戏中]"
return "[✨ 开放]"
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
if isinstance(status, JavaStatusResponse):
motd = status.motd.to_plain()
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
st = parse_status(motd)
players_sample = status.players.sample or []
players_sample_suffix = ""
if len(players_sample) > 0:
player_list = [s.name for s in players_sample]
players_sample_suffix = " (" + ", ".join(player_list) + ")"
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
else:
return f"{name}: 好像没开"
@cmd.handle()
async def _(evt: Event):
servers = (
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
)
responses = await asyncio.gather(
*map(lambda s: s[0].async_status(), servers),
return_exceptions=True,
)
messages = "\n".join((
dump_server_status(n, r)
for n, r in zip(map(lambda s: s[1], servers), responses)
))
await UniMessage.text(messages).finish(evt, at_sender=False)

View File

@ -0,0 +1,131 @@
import asyncio
import datetime
from typing import Literal
import mcstatus
from nonebot import on_command
from nonebot.adapters import Event
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from mcstatus.responses import JavaStatusResponse
from nonebot_plugin_apscheduler import scheduler
from konabot.common.permsys import DepPermManager, require_permission
from konabot.plugins.minecraft_servers.simpfun_server import SimpfunServer
cmd = on_command(
"宾几人",
aliases=set(("宾人数", "mcbingo")),
rule=require_permission("minecraft.bingo.check"),
)
def parse_status(motd: str) -> str:
if "[PRE-GAME]" in motd:
return "[✨ 空闲]"
if "[IN-GAME]" in motd:
return "[🕜 游戏中]"
if "[POST-GAME]" in motd:
return "[🕜 游戏中]"
return "[✨ 开放]"
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
if isinstance(status, JavaStatusResponse):
motd = status.motd.to_plain()
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
st = parse_status(motd)
players_sample = status.players.sample or []
players_sample_suffix = ""
if len(players_sample) > 0:
player_list = [s.name for s in players_sample]
players_sample_suffix = " (" + ", ".join(player_list) + ")"
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
else:
return f"{name}: 好像没开"
@cmd.handle()
async def _(evt: Event, pm: DepPermManager):
servers = (
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
)
responses = await asyncio.gather(
*map(lambda s: s[0].async_status(), servers),
return_exceptions=True,
)
messages = "\n".join(
(
dump_server_status(n, r)
for n, r in zip(map(lambda s: s[1], servers), responses)
)
)
if await pm.check_has_permission(evt, "minecraft.bingo.manipulate"):
messages += "\n\n---\n\n你可以使用 bingoman start 开启小帕的 bingo 服,用 bingoman stop 关闭小帕的 bingo 服"
await UniMessage.text(messages).finish(evt, at_sender=False)
cmd_bingo_manipulate = on_alconna(
Alconna("bingoman", Args["action", str]),
aliases=("宾服务器", "bingo服"),
rule=require_permission("minecraft.bingo.manipulate"),
)
actions: dict[str, Literal["start", "stop", "restart", "kill"]] = {
"up": "start",
"down": "stop",
"start": "start",
"stop": "stop",
"开机": "start",
"关机": "stop",
"restart": "restart",
"kill": "kill",
"重启": "restart",
}
@cmd_bingo_manipulate.handle()
async def _(action: str, event: Event):
server = SimpfunServer.new() # 使用默认配置管理服务器
a = actions.get(action.lower().strip())
if a is None:
await UniMessage.text(f"操作 {action} 不存在").send(event, at_sender=True)
return
resp = await server.power(a)
if resp.code == 200:
await UniMessage.text("好了").send(event, at_sender=True)
else:
await UniMessage.text(f"不好:{resp}").send(event, at_sender=True)
@scheduler.scheduled_job("cron", hour="4,23")
async def _():
server = SimpfunServer.new()
today = datetime.datetime.now()
# 获取服务器当前状态,重试多次以保证不会误判服务器未开启
server_up = False
server_players = 0
for _ in range(3):
mcs = mcstatus.JavaServer("play.simpfun.cn", 11495)
try:
resp = await mcs.async_status()
server_up = True
server_players = resp.players.online
except Exception:
pass
if today.weekday() == 5 and today.hour < 12:
# 每周六开机一天,保证可以让服务器不被自动销毁
if not server_up:
await server.power("start")
else:
# 每用一个自然日都会计费,所以要赶在这一天结束之前关服
# 平时如果没人,也自动关上
if server_up and server_players == 0:
await server.power("stop")

View File

@ -0,0 +1,90 @@
from dataclasses import dataclass
import datetime
from typing import Literal
import aiohttp
from pydantic import BaseModel
class SimpfunServerConfig(BaseModel):
plugin_simpfun_api_key: str = ""
plugin_simpfun_base_url: str = "https://api.simpfun.cn"
plugin_simpfun_instance_id: int = 0
def get_config():
from nonebot import get_plugin_config
return get_plugin_config(SimpfunServerConfig)
class PowerManageResult(BaseModel):
code: int
status: bool
msg: str
class SimpfunServerDetailUtilization(BaseModel):
memory_bytes: int
cpu_absolute: float
disk_bytes: int
network_rx_bytes: int
network_tx_bytes: int
uptime: float
disk_last_check_time: datetime.datetime
class SimpfunServerDetailData(BaseModel):
id: int
name: str
is_pro: bool
status: str
"运行中的话,是 running"
is_suspended: bool
utilization: SimpfunServerDetailUtilization
class SimpfunServerDetailResp(BaseModel):
code: int
data: SimpfunServerDetailData
@dataclass
class SimpfunServer:
instance_id: int
api_key: str
base_url: str
async def power(
self, action: Literal["start", "stop", "restart", "kill"]
) -> PowerManageResult:
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
async with aiohttp.ClientSession(
headers={"Authorization": self.api_key}
) as session:
async with session.get(url, params={"action": action}) as resp:
resp.raise_for_status()
return PowerManageResult.model_validate_json(await resp.read())
async def detail(self) -> SimpfunServerDetailResp:
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
async with aiohttp.ClientSession(
headers={"Authorization": self.api_key}
) as session:
async with session.get(url) as resp:
resp.raise_for_status()
return SimpfunServerDetailResp.model_validate_json(await resp.read())
@staticmethod
def new(config: SimpfunServerConfig | None = None):
if config is None:
config = get_config()
return SimpfunServer(
instance_id=config.plugin_simpfun_instance_id,
api_key=config.plugin_simpfun_api_key,
base_url=config.plugin_simpfun_base_url,
)