Compare commits

...

7 Commits

Author SHA1 Message Date
f3389ff2b9 添加服务器管理相关,以及 cronjob
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-08 03:34:14 +08:00
e59d3c2e4b 哎哟喂这个文件怎么没交
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-08 00:40:11 +08:00
31d19b7ec0 我没辙了直接把测试打包进去吧
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-07 18:41:59 +08:00
c2f677911d 添加一些权限目标
Some checks failed
continuous-integration/drone/push Build is failing
2026-03-07 18:36:51 +08:00
f5b81319f8 konaph 接入权限系统
Some checks failed
continuous-integration/drone/push Build is failing
2026-03-07 18:15:28 +08:00
870e2383d8 为 Drone 提供单元测试目录 2026-03-07 18:15:16 +08:00
7e8fa45f36 Merge pull request '权限系统' (#55) from feature/permsystem into master
Some checks failed
continuous-integration/drone/push Build is failing
Reviewed-on: #55
2026-03-07 17:55:27 +08:00
10 changed files with 420 additions and 185 deletions

View File

@ -39,7 +39,7 @@ steps:
commands:
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov-report term-missing:skip-covered
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov=./konabot/ --cov-report term-missing:skip-covered
- name: 发送构建结果到 ntfy
image: parrazam/drone-ntfy
when:

View File

@ -61,6 +61,7 @@ COPY bot.py pyproject.toml .env.prod .env.test ./
COPY assets ./assets
COPY scripts ./scripts
COPY konabot ./konabot
COPY tests ./tests
ENV PYTHONPATH=/app

View File

@ -1,12 +1,14 @@
import re
from nonebot import get_plugin_config, on_message
from nonebot.rule import Rule
from nonebot_plugin_alconna import Reference, Reply, UniMsg
from nonebot.adapters import Event
from nonebot.adapters.onebot.v11.event import GroupMessageEvent as OB11GroupEvent
from pydantic import BaseModel
from konabot.common.permsys import require_permission
class Config(BaseModel):
bilifetch_enabled_groups: list[int] = []
@ -19,11 +21,7 @@ pattern = (
)
def _rule(msg: UniMsg, evt: Event) -> bool:
if isinstance(evt, OB11GroupEvent):
if evt.group_id not in config.bilifetch_enabled_groups:
return False
def _rule(msg: UniMsg) -> bool:
to_search = msg.exclude(Reply, Reference).dump(json=True)
to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
@ -31,11 +29,11 @@ def _rule(msg: UniMsg, evt: Event) -> bool:
return True
matcher_fix = on_message(rule=_rule)
matcher_fix = on_message(rule=Rule(_rule) & require_permission("bilifetch"))
@matcher_fix.handle()
async def _(event: Event):
from nonebot_plugin_analysis_bilibili import handle_analysis
await handle_analysis(event)

View File

@ -70,7 +70,7 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
await target.send_message(res)
return
env = TextHandlerEnvironment(is_trusted=False)
env = TextHandlerEnvironment(is_trusted=False, event=evt)
results = await runner.run_pipeline(res, istream or None, env)
# 检查是否有错误

View File

@ -7,11 +7,13 @@ from string import whitespace
from typing import cast
from loguru import logger
from nonebot.adapters import Event
@dataclass
class TextHandlerEnvironment:
is_trusted: bool
event: Event | None = None
buffers: dict[str, str] = field(default_factory=dict)
@ -287,7 +289,7 @@ class PipelineRunner:
env: TextHandlerEnvironment | None = None,
) -> list[TextHandleResult]:
if env is None:
env = TextHandlerEnvironment(is_trusted=False, buffers={})
env = TextHandlerEnvironment(is_trusted=False, event=None, buffers={})
results: list[TextHandleResult] = []

View File

@ -1,36 +1,51 @@
from typing import Any, cast
from konabot.common.llm import get_llm
from konabot.plugins.handle_text.base import TextHandler, TextHandlerEnvironment, TextHandleResult
from konabot.common.permsys import perm_manager
from konabot.plugins.handle_text.base import (
TextHandler,
TextHandlerEnvironment,
TextHandleResult,
)
class THQwen(TextHandler):
name = "qwen"
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
pm = perm_manager()
if env.event is None or not pm.check_has_permission(env.event, "textfx.qwen"):
return TextHandleResult(
code=1,
ostream="这里暂未开启 AI 功能",
)
llm = get_llm()
messages = []
if istream is not None:
messages.append({
"role": "user",
"content": istream
})
messages.append({"role": "user", "content": istream})
if len(args) > 0:
message = ' '.join(args)
messages.append({
"role": "user",
"content": message,
})
message = " ".join(args)
messages.append(
{
"role": "user",
"content": message,
}
)
if len(messages) == 0:
return TextHandleResult(
code=1,
ostream="使用方法qwen <提示词>",
)
messages = [{
"role": "system",
"content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答"
}] + messages
messages = [
{
"role": "system",
"content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答",
}
] + messages
result = await llm.chat(cast(Any, messages))
content = result.content
if content is None:

View File

@ -2,22 +2,39 @@ import datetime
from math import ceil
from nonebot import get_plugin_config
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query,
Subcommand, SubcommandResult, UniMessage,
on_alconna)
from nonebot.adapters import Event
from nonebot_plugin_alconna import (
Alconna,
Args,
Image,
Option,
Query,
Subcommand,
SubcommandResult,
UniMessage,
on_alconna,
)
from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import download_image_bytes
from konabot.common.permsys import DepPermManager, require_permission
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list,
get_puzzle_info_message,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager,
get_today_date,
puzzle_manager)
from konabot.plugins.kona_ph.core.message import (
get_puzzle_description,
get_puzzle_hint_list,
get_puzzle_info_message,
get_submission_message,
)
from konabot.plugins.kona_ph.core.storage import (
Puzzle,
PuzzleHint,
PuzzleManager,
get_today_date,
puzzle_manager,
)
from konabot.plugins.poster.service import broadcast
PUZZLE_PAGE_SIZE = 10
@ -32,19 +49,17 @@ class PuzzleConfig(BaseModel):
config = get_plugin_config(PuzzleConfig)
def is_puzzle_manager(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target)
def is_puzzle_admin(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_admin
def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle:
async def check_puzzle(
manager: PuzzleManager,
perm: DepPermManager,
raw_id: str,
event: Event,
target: DepLongTaskTarget,
) -> Puzzle:
if raw_id not in manager.puzzle_data:
raise BotExceptionMessage("没有这个谜题")
puzzle = manager.puzzle_data[raw_id]
if is_puzzle_admin(target):
if await perm.check_has_permission(event, "konaph.admin"):
return puzzle
if target.target_id != puzzle.author_id:
raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
@ -60,7 +75,9 @@ def create_admin_commands():
Subcommand("unready", Args["raw_id", str], dest="unready"),
Subcommand("info", Args["raw_id", str], dest="info"),
Subcommand("my", Args["page?", int], dest="my"),
Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"),
Subcommand(
"all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"
),
Subcommand("pin", Args["raw_id?", str], dest="pin"),
Subcommand("unpin", dest="unpin"),
Subcommand(
@ -115,11 +132,11 @@ def create_admin_commands():
dest="hint",
),
),
rule=is_puzzle_manager,
rule=require_permission("konaph.manager"),
)
@cmd_admin.assign("$main")
async def _(target: DepLongTaskTarget):
async def _(target: DepLongTaskTarget, pm: DepPermManager, event: Event):
msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
msg = msg.text("konaph create - 创建一个新的谜题\n")
msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
@ -132,7 +149,7 @@ def create_admin_commands():
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
if is_puzzle_admin(target):
if await pm.check_has_permission(event, "konaph.admin"):
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
msg = msg.text("konaph pin - 查看当前置顶谜题\n")
msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
@ -145,48 +162,54 @@ def create_admin_commands():
async def _(target: DepLongTaskTarget):
async with puzzle_manager() as manager:
puzzle = manager.admin_create_puzzle(target.target_id)
await target.send_message(UniMessage.text(
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
f"- 输入 `konaph my` 查看你创建的谜题\n"
f"- 输入 `konaph modify` 查看更改谜题的方法"
))
await target.send_message(
UniMessage.text(
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
f"- 输入 `konaph my` 查看你创建的谜题\n"
f"- 输入 `konaph modify` 查看更改谜题的方法"
)
)
@cmd_admin.assign("ready")
async def _(raw_id: str, target: DepLongTaskTarget):
async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
if p.ready:
return await target.send_message(UniMessage.text(
"题目早就准备好啦!"
))
return await target.send_message(UniMessage.text("题目早就准备好啦!"))
p.ready = True
await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经准备就绪!"
))
await target.send_message(
UniMessage.text(f"谜题「{p.title}」已经准备就绪!")
)
@cmd_admin.assign("unready")
async def _(raw_id: str, target: DepLongTaskTarget):
async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
if not p.ready:
return await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经是未取消状态了!"
))
return await target.send_message(
UniMessage.text(f"谜题「{p.title}」已经是未取消状态了!")
)
if manager.is_puzzle_published(p.raw_id):
return await target.send_message(UniMessage.text(
"已发布的谜题不能取消准备状态!"
))
return await target.send_message(
UniMessage.text("已发布的谜题不能取消准备状态!")
)
p.ready = False
await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经取消准备!"
))
await target.send_message(
UniMessage.text(f"谜题「{p.title}」已经取消准备!")
)
@cmd_admin.assign("info")
async def _(raw_id: str, target: DepLongTaskTarget):
async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
await target.send_message(get_puzzle_info_message(manager, p))
@cmd_admin.assign("my")
@ -194,15 +217,15 @@ def create_admin_commands():
async with puzzle_manager() as manager:
puzzles = manager.get_puzzles_of_user(target.target_id)
if len(puzzles) == 0:
return await target.send_message(UniMessage.text(
"你没有谜题哦,使用 `konaph create` 创建一个吧!"
))
return await target.send_message(
UniMessage.text("你没有谜题哦,使用 `konaph create` 创建一个吧!")
)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text(
f"页数只有 1 ~ {count_pages} 啦!"
))
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
return await target.send_message(
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
)
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
message = UniMessage.text("==== 我的谜题 ====\n\n")
for p in puzzles:
message = message.text("- ")
@ -220,11 +243,15 @@ def create_admin_commands():
await target.send_message(message)
@cmd_admin.assign("all")
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async def _(
target: DepLongTaskTarget,
event: Event,
perm: DepPermManager,
ready: Query[bool] = Query("all.ready"),
page: int = 1,
):
if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
async with puzzle_manager() as manager:
puzzles = [*manager.puzzle_data.values()]
if ready.available:
@ -232,10 +259,10 @@ def create_admin_commands():
puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text(
f"页数只有 1 ~ {count_pages} 啦!"
))
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
return await target.send_message(
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
)
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
message = UniMessage.text("==== 所有谜题 ====\n\n")
for p in puzzles:
message = message.text("- ")
@ -253,32 +280,30 @@ def create_admin_commands():
await target.send_message(message)
@cmd_admin.assign("pin")
async def _(target: DepLongTaskTarget, raw_id: str = ""):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async def _(
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str = ""
):
if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
async with puzzle_manager() as manager:
if raw_id == "":
if manager.puzzle_pinned:
return await target.send_message(UniMessage.text(
f"被 Pin 的谜题 ID = {manager.puzzle_pinned}"
))
return await target.send_message(
UniMessage.text(f"被 Pin 的谜题 ID = {manager.puzzle_pinned}")
)
return await target.send_message("没有置顶谜题")
if raw_id not in manager.unpublished_puzzles:
return await target.send_message(UniMessage.text(
"这个谜题已经发布了,或者还没准备好,或者不存在"
))
return await target.send_message(
UniMessage.text("这个谜题已经发布了,或者还没准备好,或者不存在")
)
manager.admin_pin_puzzle(raw_id)
return await target.send_message(f"已置顶谜题 {raw_id}")
@cmd_admin.assign("unpin")
async def _(target: DepLongTaskTarget):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async def _(target: DepLongTaskTarget, event: Event, perm: DepPermManager):
if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
async with puzzle_manager() as manager:
manager.admin_pin_puzzle("")
return await target.send_message("已取消所有置顶")
@ -286,6 +311,8 @@ def create_admin_commands():
@cmd_admin.assign("modify")
async def _(
target: DepLongTaskTarget,
event: Event,
perm: DepPermManager,
raw_id: str = "",
title: str | None = None,
description: str | None = None,
@ -306,7 +333,7 @@ def create_admin_commands():
image_manager = get_image_manager()
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
if title is not None:
p.title = title
if description is not None:
@ -329,11 +356,14 @@ def create_admin_commands():
return await target.send_message("修改好啦!看看效果:\n\n" + info2)
@cmd_admin.assign("publish")
async def _(target: DepLongTaskTarget, raw_id: str | None = None):
if not is_puzzle_admin(target):
return await target.send_message(UniMessage.text(
"你没有权限使用该指令"
))
async def _(
target: DepLongTaskTarget,
event: Event,
perm: DepPermManager,
raw_id: str | None = None,
):
if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
today = get_today_date()
async with puzzle_manager() as manager:
if today in manager.daily_puzzle_of_date:
@ -348,46 +378,64 @@ def create_admin_commands():
return await target.send_message("Ok!")
@cmd_admin.assign("preview")
async def _(target: DepLongTaskTarget, raw_id: str):
async def _(
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
return await target.send_message(get_puzzle_description(p))
@cmd_admin.assign("get-submits")
async def _(target: DepLongTaskTarget, raw_id: str):
async def _(
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
):
async with puzzle_manager() as manager:
puzzle = manager.puzzle_data.get(raw_id)
if puzzle is None:
return await target.send_message("没有这个谜题")
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id:
if (
not perm.check_has_permission(event, "konaph.admin")
and target.target_id != puzzle.author_id
):
return await target.send_message("你没有权限预览这个谜题")
msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n")
submits = manager.submissions.get(raw_id, {})
for uid, ls in submits.items():
s = ', '.join((i.flag for i in ls))
s = ", ".join((i.flag for i in ls))
msg = msg.text(f"- {get_username(uid)}{s}\n")
return await target.send_message(msg)
@cmd_admin.assign("test")
async def _(target: DepLongTaskTarget, raw_id: str, submission: str):
async def _(
target: DepLongTaskTarget,
raw_id: str,
submission: str,
event: Event,
perm: DepPermManager,
):
"""
测试一道谜题的回答,并给出结果
"""
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
result = p.check_submission(submission)
msg = get_submission_message(p, result)
return await target.send_message("[测试提交] " + msg)
@cmd_admin.assign("subcommands.hint")
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")):
async def _(
target: DepLongTaskTarget,
subcommands: Query[SubcommandResult] = Query("subcommands.hint"),
):
if len(subcommands.result.subcommands) > 0:
return
return await target.send_message(
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n")
.text(
"- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n"
)
.text("- konaph hint modify <id> <hint_id>\n")
.text(" - --pattern <pattern>\n - 更改匹配规则\n")
.text(" - --message <message>\n - 更改提示文本\n")
@ -402,9 +450,11 @@ def create_admin_commands():
raw_id: str,
pattern: str,
message: str,
event: Event,
perm: DepPermManager,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
p.hints[p.hint_id_max + 1] = PuzzleHint(
pattern=pattern,
message=message,
@ -416,9 +466,11 @@ def create_admin_commands():
async def _(
target: DepLongTaskTarget,
raw_id: str,
event: Event,
perm: DepPermManager,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
await target.send_message(get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.modify")
@ -426,12 +478,14 @@ def create_admin_commands():
target: DepLongTaskTarget,
raw_id: str,
hint_id: int,
event: Event,
perm: DepPermManager,
pattern: str | None = None,
message: str | None = None,
is_checkpoint: bool | None = None,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
if hint_id not in p.hints:
raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
@ -450,9 +504,11 @@ def create_admin_commands():
target: DepLongTaskTarget,
raw_id: str,
hint_id: int,
event: Event,
perm: DepPermManager,
):
async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id)
p = await check_puzzle(manager, perm, raw_id, event, target)
if hint_id not in p.hints:
raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
@ -460,5 +516,4 @@ def create_admin_commands():
del p.hints[hint_id]
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
return cmd_admin

View File

@ -1,57 +0,0 @@
import asyncio
import mcstatus
from nonebot import on_command
from nonebot.adapters import Event
from nonebot_plugin_alconna import UniMessage
from konabot.common.nb.is_admin import is_admin
from mcstatus.responses import JavaStatusResponse
cmd = on_command("宾几人", aliases=set(("宾人数", "mcbingo")), rule=is_admin)
def parse_status(motd: str) -> str:
if "[PRE-GAME]" in motd:
return "[✨ 空闲]"
if "[IN-GAME]" in motd:
return "[🕜 游戏中]"
if "[POST-GAME]" in motd:
return "[🕜 游戏中]"
return "[✨ 开放]"
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
if isinstance(status, JavaStatusResponse):
motd = status.motd.to_plain()
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
st = parse_status(motd)
players_sample = status.players.sample or []
players_sample_suffix = ""
if len(players_sample) > 0:
player_list = [s.name for s in players_sample]
players_sample_suffix = " (" + ", ".join(player_list) + ")"
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
else:
return f"{name}: 好像没开"
@cmd.handle()
async def _(evt: Event):
servers = (
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
)
responses = await asyncio.gather(
*map(lambda s: s[0].async_status(), servers),
return_exceptions=True,
)
messages = "\n".join((
dump_server_status(n, r)
for n, r in zip(map(lambda s: s[1], servers), responses)
))
await UniMessage.text(messages).finish(evt, at_sender=False)

View File

@ -0,0 +1,131 @@
import asyncio
import datetime
from typing import Literal
import mcstatus
from nonebot import on_command
from nonebot.adapters import Event
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from mcstatus.responses import JavaStatusResponse
from nonebot_plugin_apscheduler import scheduler
from konabot.common.permsys import DepPermManager, require_permission
from konabot.plugins.minecraft_servers.simpfun_server import SimpfunServer
cmd = on_command(
"宾几人",
aliases=set(("宾人数", "mcbingo")),
rule=require_permission("minecraft.bingo.check"),
)
def parse_status(motd: str) -> str:
if "[PRE-GAME]" in motd:
return "[✨ 空闲]"
if "[IN-GAME]" in motd:
return "[🕜 游戏中]"
if "[POST-GAME]" in motd:
return "[🕜 游戏中]"
return "[✨ 开放]"
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
if isinstance(status, JavaStatusResponse):
motd = status.motd.to_plain()
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
st = parse_status(motd)
players_sample = status.players.sample or []
players_sample_suffix = ""
if len(players_sample) > 0:
player_list = [s.name for s in players_sample]
players_sample_suffix = " (" + ", ".join(player_list) + ")"
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
else:
return f"{name}: 好像没开"
@cmd.handle()
async def _(evt: Event, pm: DepPermManager):
servers = (
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
)
responses = await asyncio.gather(
*map(lambda s: s[0].async_status(), servers),
return_exceptions=True,
)
messages = "\n".join(
(
dump_server_status(n, r)
for n, r in zip(map(lambda s: s[1], servers), responses)
)
)
if await pm.check_has_permission(evt, "minecraft.bingo.manipulate"):
messages += "\n\n---\n\n你可以使用 bingoman start 开启小帕的 bingo 服,用 bingoman stop 关闭小帕的 bingo 服"
await UniMessage.text(messages).finish(evt, at_sender=False)
cmd_bingo_manipulate = on_alconna(
Alconna("bingoman", Args["action", str]),
aliases=("宾服务器", "bingo服"),
rule=require_permission("minecraft.bingo.manipulate"),
)
actions: dict[str, Literal["start", "stop", "restart", "kill"]] = {
"up": "start",
"down": "stop",
"start": "start",
"stop": "stop",
"开机": "start",
"关机": "stop",
"restart": "restart",
"kill": "kill",
"重启": "restart",
}
@cmd_bingo_manipulate.handle()
async def _(action: str, event: Event):
server = SimpfunServer.new() # 使用默认配置管理服务器
a = actions.get(action.lower().strip())
if a is None:
await UniMessage.text(f"操作 {action} 不存在").send(event, at_sender=True)
return
resp = await server.power(a)
if resp.code == 200:
await UniMessage.text("好了").send(event, at_sender=True)
else:
await UniMessage.text(f"不好:{resp}").send(event, at_sender=True)
@scheduler.scheduled_job("cron", hour="4,23")
async def _():
server = SimpfunServer.new()
today = datetime.datetime.now()
# 获取服务器当前状态,重试多次以保证不会误判服务器未开启
server_up = False
server_players = 0
for _ in range(3):
mcs = mcstatus.JavaServer("play.simpfun.cn", 11495)
try:
resp = await mcs.async_status()
server_up = True
server_players = resp.players.online
except Exception:
pass
if today.weekday() == 5 and today.hour < 12:
# 每周六开机一天,保证可以让服务器不被自动销毁
if not server_up:
await server.power("start")
else:
# 每用一个自然日都会计费,所以要赶在这一天结束之前关服
# 平时如果没人,也自动关上
if server_up and server_players == 0:
await server.power("stop")

View File

@ -0,0 +1,90 @@
from dataclasses import dataclass
import datetime
from typing import Literal
import aiohttp
from pydantic import BaseModel
class SimpfunServerConfig(BaseModel):
plugin_simpfun_api_key: str = ""
plugin_simpfun_base_url: str = "https://api.simpfun.cn"
plugin_simpfun_instance_id: int = 0
def get_config():
from nonebot import get_plugin_config
return get_plugin_config(SimpfunServerConfig)
class PowerManageResult(BaseModel):
code: int
status: bool
msg: str
class SimpfunServerDetailUtilization(BaseModel):
memory_bytes: int
cpu_absolute: float
disk_bytes: int
network_rx_bytes: int
network_tx_bytes: int
uptime: float
disk_last_check_time: datetime.datetime
class SimpfunServerDetailData(BaseModel):
id: int
name: str
is_pro: bool
status: str
"运行中的话,是 running"
is_suspended: bool
utilization: SimpfunServerDetailUtilization
class SimpfunServerDetailResp(BaseModel):
code: int
data: SimpfunServerDetailData
@dataclass
class SimpfunServer:
instance_id: int
api_key: str
base_url: str
async def power(
self, action: Literal["start", "stop", "restart", "kill"]
) -> PowerManageResult:
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
async with aiohttp.ClientSession(
headers={"Authorization": self.api_key}
) as session:
async with session.get(url, params={"action": action}) as resp:
resp.raise_for_status()
return PowerManageResult.model_validate_json(await resp.read())
async def detail(self) -> SimpfunServerDetailResp:
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
async with aiohttp.ClientSession(
headers={"Authorization": self.api_key}
) as session:
async with session.get(url) as resp:
resp.raise_for_status()
return SimpfunServerDetailResp.model_validate_json(await resp.read())
@staticmethod
def new(config: SimpfunServerConfig | None = None):
if config is None:
config = get_config()
return SimpfunServer(
instance_id=config.plugin_simpfun_instance_id,
api_key=config.plugin_simpfun_api_key,
base_url=config.plugin_simpfun_base_url,
)