Compare commits
7 Commits
feature/pe
...
f3389ff2b9
| Author | SHA1 | Date | |
|---|---|---|---|
|
f3389ff2b9
|
|||
|
e59d3c2e4b
|
|||
|
31d19b7ec0
|
|||
|
c2f677911d
|
|||
|
f5b81319f8
|
|||
|
870e2383d8
|
|||
| 7e8fa45f36 |
@ -39,7 +39,7 @@ steps:
|
||||
commands:
|
||||
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
|
||||
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py
|
||||
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov-report term-missing:skip-covered
|
||||
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov=./konabot/ --cov-report term-missing:skip-covered
|
||||
- name: 发送构建结果到 ntfy
|
||||
image: parrazam/drone-ntfy
|
||||
when:
|
||||
|
||||
@ -61,6 +61,7 @@ COPY bot.py pyproject.toml .env.prod .env.test ./
|
||||
COPY assets ./assets
|
||||
COPY scripts ./scripts
|
||||
COPY konabot ./konabot
|
||||
COPY tests ./tests
|
||||
|
||||
ENV PYTHONPATH=/app
|
||||
|
||||
|
||||
@ -1,12 +1,14 @@
|
||||
import re
|
||||
|
||||
from nonebot import get_plugin_config, on_message
|
||||
from nonebot.rule import Rule
|
||||
from nonebot_plugin_alconna import Reference, Reply, UniMsg
|
||||
|
||||
from nonebot.adapters import Event
|
||||
from nonebot.adapters.onebot.v11.event import GroupMessageEvent as OB11GroupEvent
|
||||
from pydantic import BaseModel
|
||||
|
||||
from konabot.common.permsys import require_permission
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
bilifetch_enabled_groups: list[int] = []
|
||||
@ -19,11 +21,7 @@ pattern = (
|
||||
)
|
||||
|
||||
|
||||
def _rule(msg: UniMsg, evt: Event) -> bool:
|
||||
if isinstance(evt, OB11GroupEvent):
|
||||
if evt.group_id not in config.bilifetch_enabled_groups:
|
||||
return False
|
||||
|
||||
def _rule(msg: UniMsg) -> bool:
|
||||
to_search = msg.exclude(Reply, Reference).dump(json=True)
|
||||
to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
|
||||
if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
|
||||
@ -31,11 +29,11 @@ def _rule(msg: UniMsg, evt: Event) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
matcher_fix = on_message(rule=_rule)
|
||||
matcher_fix = on_message(rule=Rule(_rule) & require_permission("bilifetch"))
|
||||
|
||||
|
||||
@matcher_fix.handle()
|
||||
async def _(event: Event):
|
||||
from nonebot_plugin_analysis_bilibili import handle_analysis
|
||||
|
||||
await handle_analysis(event)
|
||||
|
||||
|
||||
@ -70,7 +70,7 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
|
||||
await target.send_message(res)
|
||||
return
|
||||
|
||||
env = TextHandlerEnvironment(is_trusted=False)
|
||||
env = TextHandlerEnvironment(is_trusted=False, event=evt)
|
||||
results = await runner.run_pipeline(res, istream or None, env)
|
||||
|
||||
# 检查是否有错误
|
||||
|
||||
@ -7,11 +7,13 @@ from string import whitespace
|
||||
from typing import cast
|
||||
|
||||
from loguru import logger
|
||||
from nonebot.adapters import Event
|
||||
|
||||
|
||||
@dataclass
|
||||
class TextHandlerEnvironment:
|
||||
is_trusted: bool
|
||||
event: Event | None = None
|
||||
buffers: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
|
||||
@ -287,7 +289,7 @@ class PipelineRunner:
|
||||
env: TextHandlerEnvironment | None = None,
|
||||
) -> list[TextHandleResult]:
|
||||
if env is None:
|
||||
env = TextHandlerEnvironment(is_trusted=False, buffers={})
|
||||
env = TextHandlerEnvironment(is_trusted=False, event=None, buffers={})
|
||||
|
||||
results: list[TextHandleResult] = []
|
||||
|
||||
|
||||
@ -1,36 +1,51 @@
|
||||
from typing import Any, cast
|
||||
from konabot.common.llm import get_llm
|
||||
from konabot.plugins.handle_text.base import TextHandler, TextHandlerEnvironment, TextHandleResult
|
||||
from konabot.common.permsys import perm_manager
|
||||
from konabot.plugins.handle_text.base import (
|
||||
TextHandler,
|
||||
TextHandlerEnvironment,
|
||||
TextHandleResult,
|
||||
)
|
||||
|
||||
|
||||
class THQwen(TextHandler):
|
||||
name = "qwen"
|
||||
|
||||
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult:
|
||||
async def handle(
|
||||
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
|
||||
) -> TextHandleResult:
|
||||
pm = perm_manager()
|
||||
if env.event is None or not pm.check_has_permission(env.event, "textfx.qwen"):
|
||||
return TextHandleResult(
|
||||
code=1,
|
||||
ostream="这里暂未开启 AI 功能",
|
||||
)
|
||||
|
||||
llm = get_llm()
|
||||
messages = []
|
||||
|
||||
if istream is not None:
|
||||
messages.append({
|
||||
"role": "user",
|
||||
"content": istream
|
||||
})
|
||||
messages.append({"role": "user", "content": istream})
|
||||
if len(args) > 0:
|
||||
message = ' '.join(args)
|
||||
messages.append({
|
||||
"role": "user",
|
||||
"content": message,
|
||||
})
|
||||
message = " ".join(args)
|
||||
messages.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": message,
|
||||
}
|
||||
)
|
||||
if len(messages) == 0:
|
||||
return TextHandleResult(
|
||||
code=1,
|
||||
ostream="使用方法:qwen <提示词>",
|
||||
)
|
||||
|
||||
messages = [{
|
||||
"role": "system",
|
||||
"content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答"
|
||||
}] + messages
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答",
|
||||
}
|
||||
] + messages
|
||||
result = await llm.chat(cast(Any, messages))
|
||||
content = result.content
|
||||
if content is None:
|
||||
|
||||
@ -2,22 +2,39 @@ import datetime
|
||||
from math import ceil
|
||||
|
||||
from nonebot import get_plugin_config
|
||||
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query,
|
||||
Subcommand, SubcommandResult, UniMessage,
|
||||
on_alconna)
|
||||
from nonebot.adapters import Event
|
||||
from nonebot_plugin_alconna import (
|
||||
Alconna,
|
||||
Args,
|
||||
Image,
|
||||
Option,
|
||||
Query,
|
||||
Subcommand,
|
||||
SubcommandResult,
|
||||
UniMessage,
|
||||
on_alconna,
|
||||
)
|
||||
from pydantic import BaseModel
|
||||
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
from konabot.common.nb.extract_image import download_image_bytes
|
||||
from konabot.common.permsys import DepPermManager, require_permission
|
||||
from konabot.common.username import get_username
|
||||
from konabot.plugins.kona_ph.core.image import get_image_manager
|
||||
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list,
|
||||
get_puzzle_info_message,
|
||||
get_submission_message)
|
||||
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager,
|
||||
get_today_date,
|
||||
puzzle_manager)
|
||||
from konabot.plugins.kona_ph.core.message import (
|
||||
get_puzzle_description,
|
||||
get_puzzle_hint_list,
|
||||
get_puzzle_info_message,
|
||||
get_submission_message,
|
||||
)
|
||||
from konabot.plugins.kona_ph.core.storage import (
|
||||
Puzzle,
|
||||
PuzzleHint,
|
||||
PuzzleManager,
|
||||
get_today_date,
|
||||
puzzle_manager,
|
||||
)
|
||||
from konabot.plugins.poster.service import broadcast
|
||||
|
||||
PUZZLE_PAGE_SIZE = 10
|
||||
@ -32,19 +49,17 @@ class PuzzleConfig(BaseModel):
|
||||
config = get_plugin_config(PuzzleConfig)
|
||||
|
||||
|
||||
def is_puzzle_manager(target: DepLongTaskTarget):
|
||||
return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target)
|
||||
|
||||
|
||||
def is_puzzle_admin(target: DepLongTaskTarget):
|
||||
return target.target_id in config.plugin_puzzle_admin
|
||||
|
||||
|
||||
def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle:
|
||||
async def check_puzzle(
|
||||
manager: PuzzleManager,
|
||||
perm: DepPermManager,
|
||||
raw_id: str,
|
||||
event: Event,
|
||||
target: DepLongTaskTarget,
|
||||
) -> Puzzle:
|
||||
if raw_id not in manager.puzzle_data:
|
||||
raise BotExceptionMessage("没有这个谜题")
|
||||
puzzle = manager.puzzle_data[raw_id]
|
||||
if is_puzzle_admin(target):
|
||||
if await perm.check_has_permission(event, "konaph.admin"):
|
||||
return puzzle
|
||||
if target.target_id != puzzle.author_id:
|
||||
raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
|
||||
@ -60,7 +75,9 @@ def create_admin_commands():
|
||||
Subcommand("unready", Args["raw_id", str], dest="unready"),
|
||||
Subcommand("info", Args["raw_id", str], dest="info"),
|
||||
Subcommand("my", Args["page?", int], dest="my"),
|
||||
Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"),
|
||||
Subcommand(
|
||||
"all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"
|
||||
),
|
||||
Subcommand("pin", Args["raw_id?", str], dest="pin"),
|
||||
Subcommand("unpin", dest="unpin"),
|
||||
Subcommand(
|
||||
@ -115,11 +132,11 @@ def create_admin_commands():
|
||||
dest="hint",
|
||||
),
|
||||
),
|
||||
rule=is_puzzle_manager,
|
||||
rule=require_permission("konaph.manager"),
|
||||
)
|
||||
|
||||
@cmd_admin.assign("$main")
|
||||
async def _(target: DepLongTaskTarget):
|
||||
async def _(target: DepLongTaskTarget, pm: DepPermManager, event: Event):
|
||||
msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
|
||||
msg = msg.text("konaph create - 创建一个新的谜题\n")
|
||||
msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
|
||||
@ -132,7 +149,7 @@ def create_admin_commands():
|
||||
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
|
||||
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
|
||||
|
||||
if is_puzzle_admin(target):
|
||||
if await pm.check_has_permission(event, "konaph.admin"):
|
||||
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
|
||||
msg = msg.text("konaph pin - 查看当前置顶谜题\n")
|
||||
msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
|
||||
@ -145,48 +162,54 @@ def create_admin_commands():
|
||||
async def _(target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
puzzle = manager.admin_create_puzzle(target.target_id)
|
||||
await target.send_message(UniMessage.text(
|
||||
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
|
||||
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
|
||||
f"- 输入 `konaph my` 查看你创建的谜题\n"
|
||||
f"- 输入 `konaph modify` 查看更改谜题的方法"
|
||||
))
|
||||
await target.send_message(
|
||||
UniMessage.text(
|
||||
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
|
||||
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
|
||||
f"- 输入 `konaph my` 查看你创建的谜题\n"
|
||||
f"- 输入 `konaph modify` 查看更改谜题的方法"
|
||||
)
|
||||
)
|
||||
|
||||
@cmd_admin.assign("ready")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async def _(
|
||||
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
if p.ready:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"题目早就准备好啦!"
|
||||
))
|
||||
return await target.send_message(UniMessage.text("题目早就准备好啦!"))
|
||||
p.ready = True
|
||||
await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经准备就绪!"
|
||||
))
|
||||
await target.send_message(
|
||||
UniMessage.text(f"谜题「{p.title}」已经准备就绪!")
|
||||
)
|
||||
|
||||
@cmd_admin.assign("unready")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async def _(
|
||||
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
if not p.ready:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经是未取消状态了!"
|
||||
))
|
||||
return await target.send_message(
|
||||
UniMessage.text(f"谜题「{p.title}」已经是未取消状态了!")
|
||||
)
|
||||
if manager.is_puzzle_published(p.raw_id):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"已发布的谜题不能取消准备状态!"
|
||||
))
|
||||
return await target.send_message(
|
||||
UniMessage.text("已发布的谜题不能取消准备状态!")
|
||||
)
|
||||
|
||||
p.ready = False
|
||||
await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经取消准备!"
|
||||
))
|
||||
await target.send_message(
|
||||
UniMessage.text(f"谜题「{p.title}」已经取消准备!")
|
||||
)
|
||||
|
||||
@cmd_admin.assign("info")
|
||||
async def _(raw_id: str, target: DepLongTaskTarget):
|
||||
async def _(
|
||||
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
await target.send_message(get_puzzle_info_message(manager, p))
|
||||
|
||||
@cmd_admin.assign("my")
|
||||
@ -194,15 +217,15 @@ def create_admin_commands():
|
||||
async with puzzle_manager() as manager:
|
||||
puzzles = manager.get_puzzles_of_user(target.target_id)
|
||||
if len(puzzles) == 0:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有谜题哦,使用 `konaph create` 创建一个吧!"
|
||||
))
|
||||
return await target.send_message(
|
||||
UniMessage.text("你没有谜题哦,使用 `konaph create` 创建一个吧!")
|
||||
)
|
||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||
if page <= 0 or page > count_pages:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"页数只有 1 ~ {count_pages} 啦!"
|
||||
))
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
||||
return await target.send_message(
|
||||
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
|
||||
)
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
|
||||
message = UniMessage.text("==== 我的谜题 ====\n\n")
|
||||
for p in puzzles:
|
||||
message = message.text("- ")
|
||||
@ -220,11 +243,15 @@ def create_admin_commands():
|
||||
await target.send_message(message)
|
||||
|
||||
@cmd_admin.assign("all")
|
||||
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有权限使用该指令"
|
||||
))
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
ready: Query[bool] = Query("all.ready"),
|
||||
page: int = 1,
|
||||
):
|
||||
if not perm.check_has_permission(event, "konaph.admin"):
|
||||
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||||
async with puzzle_manager() as manager:
|
||||
puzzles = [*manager.puzzle_data.values()]
|
||||
if ready.available:
|
||||
@ -232,10 +259,10 @@ def create_admin_commands():
|
||||
puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
|
||||
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
|
||||
if page <= 0 or page > count_pages:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"页数只有 1 ~ {count_pages} 啦!"
|
||||
))
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
|
||||
return await target.send_message(
|
||||
UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
|
||||
)
|
||||
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
|
||||
message = UniMessage.text("==== 所有谜题 ====\n\n")
|
||||
for p in puzzles:
|
||||
message = message.text("- ")
|
||||
@ -253,32 +280,30 @@ def create_admin_commands():
|
||||
await target.send_message(message)
|
||||
|
||||
@cmd_admin.assign("pin")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str = ""):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有权限使用该指令"
|
||||
))
|
||||
async def _(
|
||||
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str = ""
|
||||
):
|
||||
if not perm.check_has_permission(event, "konaph.admin"):
|
||||
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||||
|
||||
async with puzzle_manager() as manager:
|
||||
if raw_id == "":
|
||||
if manager.puzzle_pinned:
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"被 Pin 的谜题 ID = {manager.puzzle_pinned}"
|
||||
))
|
||||
return await target.send_message(
|
||||
UniMessage.text(f"被 Pin 的谜题 ID = {manager.puzzle_pinned}")
|
||||
)
|
||||
return await target.send_message("没有置顶谜题")
|
||||
if raw_id not in manager.unpublished_puzzles:
|
||||
return await target.send_message(UniMessage.text(
|
||||
"这个谜题已经发布了,或者还没准备好,或者不存在"
|
||||
))
|
||||
return await target.send_message(
|
||||
UniMessage.text("这个谜题已经发布了,或者还没准备好,或者不存在")
|
||||
)
|
||||
manager.admin_pin_puzzle(raw_id)
|
||||
return await target.send_message(f"已置顶谜题 {raw_id}")
|
||||
|
||||
@cmd_admin.assign("unpin")
|
||||
async def _(target: DepLongTaskTarget):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有权限使用该指令"
|
||||
))
|
||||
async def _(target: DepLongTaskTarget, event: Event, perm: DepPermManager):
|
||||
if not perm.check_has_permission(event, "konaph.admin"):
|
||||
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||||
async with puzzle_manager() as manager:
|
||||
manager.admin_pin_puzzle("")
|
||||
return await target.send_message("已取消所有置顶")
|
||||
@ -286,6 +311,8 @@ def create_admin_commands():
|
||||
@cmd_admin.assign("modify")
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
raw_id: str = "",
|
||||
title: str | None = None,
|
||||
description: str | None = None,
|
||||
@ -306,7 +333,7 @@ def create_admin_commands():
|
||||
image_manager = get_image_manager()
|
||||
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
if title is not None:
|
||||
p.title = title
|
||||
if description is not None:
|
||||
@ -329,11 +356,14 @@ def create_admin_commands():
|
||||
return await target.send_message("修改好啦!看看效果:\n\n" + info2)
|
||||
|
||||
@cmd_admin.assign("publish")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str | None = None):
|
||||
if not is_puzzle_admin(target):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"你没有权限使用该指令"
|
||||
))
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
raw_id: str | None = None,
|
||||
):
|
||||
if not perm.check_has_permission(event, "konaph.admin"):
|
||||
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
|
||||
today = get_today_date()
|
||||
async with puzzle_manager() as manager:
|
||||
if today in manager.daily_puzzle_of_date:
|
||||
@ -348,46 +378,64 @@ def create_admin_commands():
|
||||
return await target.send_message("Ok!")
|
||||
|
||||
@cmd_admin.assign("preview")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str):
|
||||
async def _(
|
||||
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
return await target.send_message(get_puzzle_description(p))
|
||||
|
||||
@cmd_admin.assign("get-submits")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str):
|
||||
async def _(
|
||||
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
puzzle = manager.puzzle_data.get(raw_id)
|
||||
if puzzle is None:
|
||||
return await target.send_message("没有这个谜题")
|
||||
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id:
|
||||
if (
|
||||
not perm.check_has_permission(event, "konaph.admin")
|
||||
and target.target_id != puzzle.author_id
|
||||
):
|
||||
return await target.send_message("你没有权限预览这个谜题")
|
||||
|
||||
msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n")
|
||||
submits = manager.submissions.get(raw_id, {})
|
||||
for uid, ls in submits.items():
|
||||
s = ', '.join((i.flag for i in ls))
|
||||
s = ", ".join((i.flag for i in ls))
|
||||
msg = msg.text(f"- {get_username(uid)}:{s}\n")
|
||||
return await target.send_message(msg)
|
||||
|
||||
@cmd_admin.assign("test")
|
||||
async def _(target: DepLongTaskTarget, raw_id: str, submission: str):
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
raw_id: str,
|
||||
submission: str,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
):
|
||||
"""
|
||||
测试一道谜题的回答,并给出结果
|
||||
"""
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
result = p.check_submission(submission)
|
||||
msg = get_submission_message(p, result)
|
||||
return await target.send_message("[测试提交] " + msg)
|
||||
|
||||
@cmd_admin.assign("subcommands.hint")
|
||||
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")):
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
subcommands: Query[SubcommandResult] = Query("subcommands.hint"),
|
||||
):
|
||||
if len(subcommands.result.subcommands) > 0:
|
||||
return
|
||||
return await target.send_message(
|
||||
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
|
||||
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
|
||||
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n")
|
||||
.text(
|
||||
"- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n"
|
||||
)
|
||||
.text("- konaph hint modify <id> <hint_id>\n")
|
||||
.text(" - --pattern <pattern>\n - 更改匹配规则\n")
|
||||
.text(" - --message <message>\n - 更改提示文本\n")
|
||||
@ -402,9 +450,11 @@ def create_admin_commands():
|
||||
raw_id: str,
|
||||
pattern: str,
|
||||
message: str,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
p.hints[p.hint_id_max + 1] = PuzzleHint(
|
||||
pattern=pattern,
|
||||
message=message,
|
||||
@ -416,9 +466,11 @@ def create_admin_commands():
|
||||
async def _(
|
||||
target: DepLongTaskTarget,
|
||||
raw_id: str,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
await target.send_message(get_puzzle_hint_list(p))
|
||||
|
||||
@cmd_admin.assign("subcommands.hint.modify")
|
||||
@ -426,12 +478,14 @@ def create_admin_commands():
|
||||
target: DepLongTaskTarget,
|
||||
raw_id: str,
|
||||
hint_id: int,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
pattern: str | None = None,
|
||||
message: str | None = None,
|
||||
is_checkpoint: bool | None = None,
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
if hint_id not in p.hints:
|
||||
raise BotExceptionMessage(
|
||||
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
|
||||
@ -450,9 +504,11 @@ def create_admin_commands():
|
||||
target: DepLongTaskTarget,
|
||||
raw_id: str,
|
||||
hint_id: int,
|
||||
event: Event,
|
||||
perm: DepPermManager,
|
||||
):
|
||||
async with puzzle_manager() as manager:
|
||||
p = check_puzzle(manager, target, raw_id)
|
||||
p = await check_puzzle(manager, perm, raw_id, event, target)
|
||||
if hint_id not in p.hints:
|
||||
raise BotExceptionMessage(
|
||||
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
|
||||
@ -460,5 +516,4 @@ def create_admin_commands():
|
||||
del p.hints[hint_id]
|
||||
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
|
||||
|
||||
|
||||
return cmd_admin
|
||||
|
||||
@ -1,57 +0,0 @@
|
||||
import asyncio
|
||||
import mcstatus
|
||||
|
||||
from nonebot import on_command
|
||||
from nonebot.adapters import Event
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from konabot.common.nb.is_admin import is_admin
|
||||
from mcstatus.responses import JavaStatusResponse
|
||||
|
||||
|
||||
cmd = on_command("宾几人", aliases=set(("宾人数", "mcbingo")), rule=is_admin)
|
||||
|
||||
|
||||
def parse_status(motd: str) -> str:
|
||||
if "[PRE-GAME]" in motd:
|
||||
return "[✨ 空闲]"
|
||||
if "[IN-GAME]" in motd:
|
||||
return "[🕜 游戏中]"
|
||||
if "[POST-GAME]" in motd:
|
||||
return "[🕜 游戏中]"
|
||||
return "[✨ 开放]"
|
||||
|
||||
|
||||
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
|
||||
if isinstance(status, JavaStatusResponse):
|
||||
motd = status.motd.to_plain()
|
||||
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
|
||||
st = parse_status(motd)
|
||||
players_sample = status.players.sample or []
|
||||
players_sample_suffix = ""
|
||||
if len(players_sample) > 0:
|
||||
player_list = [s.name for s in players_sample]
|
||||
players_sample_suffix = " (" + ", ".join(player_list) + ")"
|
||||
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
|
||||
else:
|
||||
return f"{name}: 好像没开"
|
||||
|
||||
|
||||
@cmd.handle()
|
||||
async def _(evt: Event):
|
||||
servers = (
|
||||
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
|
||||
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
|
||||
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
|
||||
)
|
||||
|
||||
responses = await asyncio.gather(
|
||||
*map(lambda s: s[0].async_status(), servers),
|
||||
return_exceptions=True,
|
||||
)
|
||||
messages = "\n".join((
|
||||
dump_server_status(n, r)
|
||||
for n, r in zip(map(lambda s: s[1], servers), responses)
|
||||
))
|
||||
|
||||
await UniMessage.text(messages).finish(evt, at_sender=False)
|
||||
|
||||
131
konabot/plugins/minecraft_servers/__init__.py
Normal file
131
konabot/plugins/minecraft_servers/__init__.py
Normal file
@ -0,0 +1,131 @@
|
||||
import asyncio
|
||||
import datetime
|
||||
from typing import Literal
|
||||
import mcstatus
|
||||
|
||||
from nonebot import on_command
|
||||
from nonebot.adapters import Event
|
||||
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
|
||||
from mcstatus.responses import JavaStatusResponse
|
||||
from nonebot_plugin_apscheduler import scheduler
|
||||
|
||||
from konabot.common.permsys import DepPermManager, require_permission
|
||||
from konabot.plugins.minecraft_servers.simpfun_server import SimpfunServer
|
||||
|
||||
|
||||
cmd = on_command(
|
||||
"宾几人",
|
||||
aliases=set(("宾人数", "mcbingo")),
|
||||
rule=require_permission("minecraft.bingo.check"),
|
||||
)
|
||||
|
||||
|
||||
def parse_status(motd: str) -> str:
|
||||
if "[PRE-GAME]" in motd:
|
||||
return "[✨ 空闲]"
|
||||
if "[IN-GAME]" in motd:
|
||||
return "[🕜 游戏中]"
|
||||
if "[POST-GAME]" in motd:
|
||||
return "[🕜 游戏中]"
|
||||
return "[✨ 开放]"
|
||||
|
||||
|
||||
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
|
||||
if isinstance(status, JavaStatusResponse):
|
||||
motd = status.motd.to_plain()
|
||||
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
|
||||
st = parse_status(motd)
|
||||
players_sample = status.players.sample or []
|
||||
players_sample_suffix = ""
|
||||
if len(players_sample) > 0:
|
||||
player_list = [s.name for s in players_sample]
|
||||
players_sample_suffix = " (" + ", ".join(player_list) + ")"
|
||||
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
|
||||
else:
|
||||
return f"{name}: 好像没开"
|
||||
|
||||
|
||||
@cmd.handle()
|
||||
async def _(evt: Event, pm: DepPermManager):
|
||||
servers = (
|
||||
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
|
||||
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
|
||||
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
|
||||
)
|
||||
|
||||
responses = await asyncio.gather(
|
||||
*map(lambda s: s[0].async_status(), servers),
|
||||
return_exceptions=True,
|
||||
)
|
||||
messages = "\n".join(
|
||||
(
|
||||
dump_server_status(n, r)
|
||||
for n, r in zip(map(lambda s: s[1], servers), responses)
|
||||
)
|
||||
)
|
||||
|
||||
if await pm.check_has_permission(evt, "minecraft.bingo.manipulate"):
|
||||
messages += "\n\n---\n\n你可以使用 bingoman start 开启小帕的 bingo 服,用 bingoman stop 关闭小帕的 bingo 服"
|
||||
|
||||
await UniMessage.text(messages).finish(evt, at_sender=False)
|
||||
|
||||
|
||||
cmd_bingo_manipulate = on_alconna(
|
||||
Alconna("bingoman", Args["action", str]),
|
||||
aliases=("宾服务器", "bingo服"),
|
||||
rule=require_permission("minecraft.bingo.manipulate"),
|
||||
)
|
||||
|
||||
actions: dict[str, Literal["start", "stop", "restart", "kill"]] = {
|
||||
"up": "start",
|
||||
"down": "stop",
|
||||
"start": "start",
|
||||
"stop": "stop",
|
||||
"开机": "start",
|
||||
"关机": "stop",
|
||||
"restart": "restart",
|
||||
"kill": "kill",
|
||||
"重启": "restart",
|
||||
}
|
||||
|
||||
|
||||
@cmd_bingo_manipulate.handle()
|
||||
async def _(action: str, event: Event):
|
||||
server = SimpfunServer.new() # 使用默认配置管理服务器
|
||||
a = actions.get(action.lower().strip())
|
||||
if a is None:
|
||||
await UniMessage.text(f"操作 {action} 不存在").send(event, at_sender=True)
|
||||
return
|
||||
resp = await server.power(a)
|
||||
if resp.code == 200:
|
||||
await UniMessage.text("好了").send(event, at_sender=True)
|
||||
else:
|
||||
await UniMessage.text(f"不好:{resp}").send(event, at_sender=True)
|
||||
|
||||
|
||||
@scheduler.scheduled_job("cron", hour="4,23")
|
||||
async def _():
|
||||
server = SimpfunServer.new()
|
||||
today = datetime.datetime.now()
|
||||
|
||||
# 获取服务器当前状态,重试多次以保证不会误判服务器未开启
|
||||
server_up = False
|
||||
server_players = 0
|
||||
for _ in range(3):
|
||||
mcs = mcstatus.JavaServer("play.simpfun.cn", 11495)
|
||||
try:
|
||||
resp = await mcs.async_status()
|
||||
server_up = True
|
||||
server_players = resp.players.online
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if today.weekday() == 5 and today.hour < 12:
|
||||
# 每周六开机一天,保证可以让服务器不被自动销毁
|
||||
if not server_up:
|
||||
await server.power("start")
|
||||
else:
|
||||
# 每用一个自然日都会计费,所以要赶在这一天结束之前关服
|
||||
# 平时如果没人,也自动关上
|
||||
if server_up and server_players == 0:
|
||||
await server.power("stop")
|
||||
90
konabot/plugins/minecraft_servers/simpfun_server.py
Normal file
90
konabot/plugins/minecraft_servers/simpfun_server.py
Normal file
@ -0,0 +1,90 @@
|
||||
from dataclasses import dataclass
|
||||
import datetime
|
||||
from typing import Literal
|
||||
|
||||
import aiohttp
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class SimpfunServerConfig(BaseModel):
|
||||
plugin_simpfun_api_key: str = ""
|
||||
plugin_simpfun_base_url: str = "https://api.simpfun.cn"
|
||||
plugin_simpfun_instance_id: int = 0
|
||||
|
||||
|
||||
def get_config():
|
||||
from nonebot import get_plugin_config
|
||||
|
||||
return get_plugin_config(SimpfunServerConfig)
|
||||
|
||||
|
||||
class PowerManageResult(BaseModel):
|
||||
code: int
|
||||
status: bool
|
||||
msg: str
|
||||
|
||||
|
||||
class SimpfunServerDetailUtilization(BaseModel):
|
||||
memory_bytes: int
|
||||
cpu_absolute: float
|
||||
disk_bytes: int
|
||||
network_rx_bytes: int
|
||||
network_tx_bytes: int
|
||||
uptime: float
|
||||
disk_last_check_time: datetime.datetime
|
||||
|
||||
|
||||
class SimpfunServerDetailData(BaseModel):
|
||||
id: int
|
||||
name: str
|
||||
is_pro: bool
|
||||
|
||||
status: str
|
||||
"运行中的话,是 running"
|
||||
|
||||
is_suspended: bool
|
||||
utilization: SimpfunServerDetailUtilization
|
||||
|
||||
|
||||
class SimpfunServerDetailResp(BaseModel):
|
||||
code: int
|
||||
data: SimpfunServerDetailData
|
||||
|
||||
|
||||
@dataclass
|
||||
class SimpfunServer:
|
||||
instance_id: int
|
||||
api_key: str
|
||||
base_url: str
|
||||
|
||||
async def power(
|
||||
self, action: Literal["start", "stop", "restart", "kill"]
|
||||
) -> PowerManageResult:
|
||||
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
|
||||
|
||||
async with aiohttp.ClientSession(
|
||||
headers={"Authorization": self.api_key}
|
||||
) as session:
|
||||
async with session.get(url, params={"action": action}) as resp:
|
||||
resp.raise_for_status()
|
||||
return PowerManageResult.model_validate_json(await resp.read())
|
||||
|
||||
async def detail(self) -> SimpfunServerDetailResp:
|
||||
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
|
||||
|
||||
async with aiohttp.ClientSession(
|
||||
headers={"Authorization": self.api_key}
|
||||
) as session:
|
||||
async with session.get(url) as resp:
|
||||
resp.raise_for_status()
|
||||
return SimpfunServerDetailResp.model_validate_json(await resp.read())
|
||||
|
||||
@staticmethod
|
||||
def new(config: SimpfunServerConfig | None = None):
|
||||
if config is None:
|
||||
config = get_config()
|
||||
return SimpfunServer(
|
||||
instance_id=config.plugin_simpfun_instance_id,
|
||||
api_key=config.plugin_simpfun_api_key,
|
||||
base_url=config.plugin_simpfun_base_url,
|
||||
)
|
||||
Reference in New Issue
Block a user