Compare commits

..

9 Commits

Author SHA1 Message Date
392c699b33 移动 poster 模块到 common
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-09 14:40:27 +08:00
72e21cd9aa 添加多字符喵对一些符号的响应 2026-03-09 13:46:56 +08:00
f3389ff2b9 添加服务器管理相关,以及 cronjob
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-08 03:34:14 +08:00
e59d3c2e4b 哎哟喂这个文件怎么没交
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-08 00:40:11 +08:00
31d19b7ec0 我没辙了直接把测试打包进去吧
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-07 18:41:59 +08:00
c2f677911d 添加一些权限目标
Some checks failed
continuous-integration/drone/push Build is failing
2026-03-07 18:36:51 +08:00
f5b81319f8 konaph 接入权限系统
Some checks failed
continuous-integration/drone/push Build is failing
2026-03-07 18:15:28 +08:00
870e2383d8 为 Drone 提供单元测试目录 2026-03-07 18:15:16 +08:00
7e8fa45f36 Merge pull request '权限系统' (#55) from feature/permsystem into master
Some checks failed
continuous-integration/drone/push Build is failing
Reviewed-on: #55
2026-03-07 17:55:27 +08:00
20 changed files with 572 additions and 296 deletions

View File

@ -39,7 +39,7 @@ steps:
commands: commands:
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py - docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py - docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov-report term-missing:skip-covered - docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov=./konabot/ --cov-report term-missing:skip-covered
- name: 发送构建结果到 ntfy - name: 发送构建结果到 ntfy
image: parrazam/drone-ntfy image: parrazam/drone-ntfy
when: when:

View File

@ -61,6 +61,7 @@ COPY bot.py pyproject.toml .env.prod .env.test ./
COPY assets ./assets COPY assets ./assets
COPY scripts ./scripts COPY scripts ./scripts
COPY konabot ./konabot COPY konabot ./konabot
COPY tests ./tests
ENV PYTHONPATH=/app ENV PYTHONPATH=/app

View File

@ -0,0 +1,11 @@
"""
Subscribe 模块,用于向一些订阅的频道广播消息
"""
from .service import broadcast as broadcast
from .service import dep_poster_service as dep_poster_service
from .service import DepPosterService as DepPosterService
from .service import PosterService as PosterService
from .subscribe_info import PosterInfo as PosterInfo
from .subscribe_info import POSTER_INFO_DATA as POSTER_INFO_DATA
from .subscribe_info import register_poster_info as register_poster_info

View File

@ -6,7 +6,8 @@ from pydantic import BaseModel, ValidationError
from konabot.common.longtask import LongTaskTarget from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult from konabot.common.pager import PagerQuery, PagerResult
from konabot.common.path import DATA_PATH from konabot.common.path import DATA_PATH
from konabot.plugins.poster.repository import IPosterRepo
from .repository import IPosterRepo
class ChannelData(BaseModel): class ChannelData(BaseModel):
@ -18,9 +19,9 @@ class PosterData(BaseModel):
def is_the_same_target(target1: LongTaskTarget, target2: LongTaskTarget) -> bool: def is_the_same_target(target1: LongTaskTarget, target2: LongTaskTarget) -> bool:
if (target1.is_private_chat and not target2.is_private_chat): if target1.is_private_chat and not target2.is_private_chat:
return False return False
if (target2.is_private_chat and not target1.is_private_chat): if target2.is_private_chat and not target1.is_private_chat:
return False return False
if target1.platform != target2.platform: if target1.platform != target2.platform:
return False return False
@ -58,7 +59,9 @@ class LocalPosterRepo(IPosterRepo):
len1 = len(self.data.channels[channel].targets) len1 = len(self.data.channels[channel].targets)
return len0 != len1 return len0 != len1
async def get_subscribed_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]: async def get_subscribed_channels(
self, target: LongTaskTarget, pager: PagerQuery
) -> PagerResult[str]:
channels: list[str] = [] channels: list[str] = []
for channel_id, channel in self.data.channels.items(): for channel_id, channel in self.data.channels.items():
for t in channel.targets: for t in channel.targets:
@ -95,7 +98,9 @@ async def local_poster_data():
data = PosterData() data = PosterData()
else: else:
try: try:
data = PosterData.model_validate_json(LOCAL_POSTER_DATA_PATH.read_text()) data = PosterData.model_validate_json(
LOCAL_POSTER_DATA_PATH.read_text()
)
except ValidationError: except ValidationError:
data = PosterData() data = PosterData()
yield data yield data
@ -109,4 +114,3 @@ async def local_poster():
DepLocalPosterRepo = Annotated[LocalPosterRepo, Depends(local_poster)] DepLocalPosterRepo = Annotated[LocalPosterRepo, Depends(local_poster)]

View File

@ -4,9 +4,10 @@ from nonebot.params import Depends
from nonebot_plugin_alconna import UniMessage from nonebot_plugin_alconna import UniMessage
from konabot.common.longtask import LongTaskTarget from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult from konabot.common.pager import PagerQuery, PagerResult
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
from konabot.plugins.poster.repo_local_data import local_poster from .subscribe_info import POSTER_INFO_DATA
from konabot.plugins.poster.repository import IPosterRepo from .repo_local_data import local_poster
from .repository import IPosterRepo
class PosterService: class PosterService:
@ -27,7 +28,9 @@ class PosterService:
channel = self.parse_channel_id(channel) channel = self.parse_channel_id(channel)
return await self.repo.remove_channel_target(channel, target) return await self.repo.remove_channel_target(channel, target)
async def broadcast(self, channel: str, message: UniMessage[Any] | str) -> list[LongTaskTarget]: async def broadcast(
self, channel: str, message: UniMessage[Any] | str
) -> list[LongTaskTarget]:
channel = self.parse_channel_id(channel) channel = self.parse_channel_id(channel)
targets = await self.repo.get_channel_targets(channel) targets = await self.repo.get_channel_targets(channel)
for target in targets: for target in targets:
@ -35,7 +38,9 @@ class PosterService:
await target.send_message(message, at=False) await target.send_message(message, at=False)
return targets return targets
async def get_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]: async def get_channels(
self, target: LongTaskTarget, pager: PagerQuery
) -> PagerResult[str]:
return await self.repo.get_subscribed_channels(target, pager) return await self.repo.get_subscribed_channels(target, pager)
async def fix_data(self): async def fix_data(self):
@ -56,4 +61,3 @@ async def broadcast(channel: str, message: UniMessage[Any] | str):
DepPosterService = Annotated[PosterService, Depends(dep_poster_service)] DepPosterService = Annotated[PosterService, Depends(dep_poster_service)]

View File

@ -4,7 +4,7 @@ from dataclasses import dataclass, field
@dataclass @dataclass
class PosterInfo: class PosterInfo:
aliases: set[str] = field(default_factory=set) aliases: set[str] = field(default_factory=set)
description: str = field(default='') description: str = field(default="")
POSTER_INFO_DATA: dict[str, PosterInfo] = {} POSTER_INFO_DATA: dict[str, PosterInfo] = {}
@ -12,4 +12,3 @@ POSTER_INFO_DATA: dict[str, PosterInfo] = {}
def register_poster_info(channel: str, info: PosterInfo): def register_poster_info(channel: str, info: PosterInfo):
POSTER_INFO_DATA[channel] = info POSTER_INFO_DATA[channel] = info

View File

@ -1,12 +1,14 @@
import re import re
from nonebot import get_plugin_config, on_message from nonebot import get_plugin_config, on_message
from nonebot.rule import Rule
from nonebot_plugin_alconna import Reference, Reply, UniMsg from nonebot_plugin_alconna import Reference, Reply, UniMsg
from nonebot.adapters import Event from nonebot.adapters import Event
from nonebot.adapters.onebot.v11.event import GroupMessageEvent as OB11GroupEvent
from pydantic import BaseModel from pydantic import BaseModel
from konabot.common.permsys import require_permission
class Config(BaseModel): class Config(BaseModel):
bilifetch_enabled_groups: list[int] = [] bilifetch_enabled_groups: list[int] = []
@ -19,11 +21,7 @@ pattern = (
) )
def _rule(msg: UniMsg, evt: Event) -> bool: def _rule(msg: UniMsg) -> bool:
if isinstance(evt, OB11GroupEvent):
if evt.group_id not in config.bilifetch_enabled_groups:
return False
to_search = msg.exclude(Reply, Reference).dump(json=True) to_search = msg.exclude(Reply, Reference).dump(json=True)
to_search2 = msg.exclude(Reply, Reference).extract_plain_text() to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
if not re.search(pattern, to_search) and not re.search(pattern, to_search2): if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
@ -31,11 +29,11 @@ def _rule(msg: UniMsg, evt: Event) -> bool:
return True return True
matcher_fix = on_message(rule=_rule) matcher_fix = on_message(rule=Rule(_rule) & require_permission("bilifetch"))
@matcher_fix.handle() @matcher_fix.handle()
async def _(event: Event): async def _(event: Event):
from nonebot_plugin_analysis_bilibili import handle_analysis from nonebot_plugin_analysis_bilibili import handle_analysis
await handle_analysis(event) await handle_analysis(event)

View File

@ -70,7 +70,7 @@ async def _(msg: UniMsg, evt: Event, bot: Bot, target: DepLongTaskTarget):
await target.send_message(res) await target.send_message(res)
return return
env = TextHandlerEnvironment(is_trusted=False) env = TextHandlerEnvironment(is_trusted=False, event=evt)
results = await runner.run_pipeline(res, istream or None, env) results = await runner.run_pipeline(res, istream or None, env)
# 检查是否有错误 # 检查是否有错误

View File

@ -7,11 +7,13 @@ from string import whitespace
from typing import cast from typing import cast
from loguru import logger from loguru import logger
from nonebot.adapters import Event
@dataclass @dataclass
class TextHandlerEnvironment: class TextHandlerEnvironment:
is_trusted: bool is_trusted: bool
event: Event | None = None
buffers: dict[str, str] = field(default_factory=dict) buffers: dict[str, str] = field(default_factory=dict)
@ -287,7 +289,7 @@ class PipelineRunner:
env: TextHandlerEnvironment | None = None, env: TextHandlerEnvironment | None = None,
) -> list[TextHandleResult]: ) -> list[TextHandleResult]:
if env is None: if env is None:
env = TextHandlerEnvironment(is_trusted=False, buffers={}) env = TextHandlerEnvironment(is_trusted=False, event=None, buffers={})
results: list[TextHandleResult] = [] results: list[TextHandleResult] = []

View File

@ -1,36 +1,51 @@
from typing import Any, cast from typing import Any, cast
from konabot.common.llm import get_llm from konabot.common.llm import get_llm
from konabot.plugins.handle_text.base import TextHandler, TextHandlerEnvironment, TextHandleResult from konabot.common.permsys import perm_manager
from konabot.plugins.handle_text.base import (
TextHandler,
TextHandlerEnvironment,
TextHandleResult,
)
class THQwen(TextHandler): class THQwen(TextHandler):
name = "qwen" name = "qwen"
async def handle(self, env: TextHandlerEnvironment, istream: str | None, args: list[str]) -> TextHandleResult: async def handle(
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
pm = perm_manager()
if env.event is None or not pm.check_has_permission(env.event, "textfx.qwen"):
return TextHandleResult(
code=1,
ostream="这里暂未开启 AI 功能",
)
llm = get_llm() llm = get_llm()
messages = [] messages = []
if istream is not None: if istream is not None:
messages.append({ messages.append({"role": "user", "content": istream})
"role": "user",
"content": istream
})
if len(args) > 0: if len(args) > 0:
message = ' '.join(args) message = " ".join(args)
messages.append({ messages.append(
"role": "user", {
"content": message, "role": "user",
}) "content": message,
}
)
if len(messages) == 0: if len(messages) == 0:
return TextHandleResult( return TextHandleResult(
code=1, code=1,
ostream="使用方法qwen <提示词>", ostream="使用方法qwen <提示词>",
) )
messages = [{ messages = [
"role": "system", {
"content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答" "role": "system",
}] + messages "content": "除非用户要求,请尽可能短点回答。另外,当前环境不支持 Markdown 语法,如果可以,请使用纯文本回答",
}
] + messages
result = await llm.chat(cast(Any, messages)) result = await llm.chat(cast(Any, messages))
content = result.content content = result.content
if content is None: if content is None:

View File

@ -6,29 +6,34 @@ from loguru import logger
from nonebot import on_message from nonebot import on_message
import nonebot import nonebot
from nonebot.rule import to_me from nonebot.rule import to_me
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg, from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna
on_alconna)
from nonebot_plugin_apscheduler import scheduler from nonebot_plugin_apscheduler import scheduler
from konabot.common import username from konabot.common import username
from konabot.common.longtask import DepLongTaskTarget from konabot.common.longtask import DepLongTaskTarget
from konabot.common.pager import PagerQuery from konabot.common.pager import PagerQuery
from konabot.plugins.kona_ph.core.message import (get_daily_report, from konabot.plugins.kona_ph.core.message import (
get_daily_report_v2, get_daily_report,
get_puzzle_description, get_daily_report_v2,
get_submission_message) get_puzzle_description,
get_submission_message,
)
from konabot.plugins.kona_ph.core.storage import get_today_date from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE, from konabot.plugins.kona_ph.manager import (
create_admin_commands, PUZZLE_PAGE_SIZE,
puzzle_manager) create_admin_commands,
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info puzzle_manager,
from konabot.plugins.poster.service import broadcast )
from konabot.common.subscribe import PosterInfo, register_poster_info, broadcast
create_admin_commands() create_admin_commands()
register_poster_info("每日谜题", info=PosterInfo( register_poster_info(
aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"}, "每日谜题",
description="此方 BOT 每日谜题推送", info=PosterInfo(
)) aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"},
description="此方 BOT 每日谜题推送",
),
)
cmd_submit = on_message(rule=to_me()) cmd_submit = on_message(rule=to_me())
@ -44,16 +49,22 @@ async def _(msg: UniMsg, target: DepLongTaskTarget):
if isinstance(result, str): if isinstance(result, str):
await target.send_message(result) await target.send_message(result)
else: else:
await target.send_message(get_submission_message( await target.send_message(
daily_puzzle_info=result.info, get_submission_message(
submission=result.submission, daily_puzzle_info=result.info,
puzzle=result.puzzle, submission=result.submission,
)) puzzle=result.puzzle,
)
)
cmd_query = on_alconna(Alconna( cmd_query = on_alconna(
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?)|今日谜?题目?)" Alconna(
), rule=to_me()) r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?)|今日谜?题目?)"
),
rule=to_me(),
)
@cmd_query.handle() @cmd_query.handle()
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget):
@ -64,9 +75,8 @@ async def _(target: DepLongTaskTarget):
await target.send_message(get_puzzle_description(p)) await target.send_message(get_puzzle_description(p))
cmd_query_submission = on_alconna(Alconna( cmd_query_submission = on_alconna(Alconna("今日答题情况"), rule=to_me())
"今日答题情况"
), rule=to_me())
@cmd_query_submission.handle() @cmd_query_submission.handle()
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget):
@ -77,11 +87,15 @@ async def _(target: DepLongTaskTarget):
await target.send_message(get_daily_report_v2(manager, gid)) await target.send_message(get_daily_report_v2(manager, gid))
cmd_history = on_alconna(Alconna( cmd_history = on_alconna(
"re:历史(题目|谜题)", Alconna(
Args["page?", int], "re:历史(题目|谜题)",
Args["index_id?", str], Args["page?", int],
), rule=to_me()) Args["index_id?", str],
),
rule=to_me(),
)
@cmd_history.handle() @cmd_history.handle()
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1): async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
@ -105,10 +119,10 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True) puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages: if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text( return await target.send_message(
f"页数只有 1 ~ {count_pages} 啦!" UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
)) )
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
for p, d in puzzles: for p, d in puzzles:
info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]] info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]]
msg = msg.text( msg = msg.text(
@ -120,22 +134,26 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
await target.send_message(msg) await target.send_message(msg)
cmd_leadboard = on_alconna(Alconna( cmd_leadboard = on_alconna(
"re:此方(解谜|谜题)排行榜", Alconna(
Args["page?", int], "re:此方(解谜|谜题)排行榜",
)) Args["page?", int],
)
)
@cmd_leadboard.handle() @cmd_leadboard.handle()
async def _(target: DepLongTaskTarget, page: int = 1): async def _(target: DepLongTaskTarget, page: int = 1):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
result = manager.get_leadboard(PagerQuery(page, 10)) result = manager.get_leadboard(PagerQuery(page, 10))
await target.send_message(result.to_unimessage( await target.send_message(
title="此方解谜排行榜", result.to_unimessage(
formatter=lambda data: ( title="此方解谜排行榜",
f"{data[1]} 已完成 | " formatter=lambda data: (
f"{username.get_username(data[0])}" f"{data[1]} 已完成 | {username.get_username(data[0])}"
),
) )
)) )
@scheduler.scheduled_job("cron", hour="8") @scheduler.scheduled_job("cron", hour="8")
@ -155,4 +173,3 @@ async def _():
driver = nonebot.get_driver() driver = nonebot.get_driver()

View File

@ -1,50 +1,54 @@
import datetime import datetime
from math import ceil from math import ceil
from nonebot import get_plugin_config from nonebot.adapters import Event
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, Query, from nonebot_plugin_alconna import (
Subcommand, SubcommandResult, UniMessage, Alconna,
on_alconna) Args,
from pydantic import BaseModel Image,
Option,
Query,
Subcommand,
SubcommandResult,
UniMessage,
on_alconna,
)
from konabot.common.longtask import DepLongTaskTarget from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.exc import BotExceptionMessage from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import download_image_bytes from konabot.common.nb.extract_image import download_image_bytes
from konabot.common.permsys import DepPermManager, require_permission
from konabot.common.username import get_username from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.image import get_image_manager from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list, from konabot.plugins.kona_ph.core.message import (
get_puzzle_info_message, get_puzzle_description,
get_submission_message) get_puzzle_hint_list,
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager, get_puzzle_info_message,
get_today_date, get_submission_message,
puzzle_manager) )
from konabot.plugins.poster.service import broadcast from konabot.plugins.kona_ph.core.storage import (
Puzzle,
PuzzleHint,
PuzzleManager,
get_today_date,
puzzle_manager,
)
from konabot.common.subscribe import broadcast
PUZZLE_PAGE_SIZE = 10 PUZZLE_PAGE_SIZE = 10
class PuzzleConfig(BaseModel): async def check_puzzle(
plugin_puzzle_manager: list[str] = [] manager: PuzzleManager,
plugin_puzzle_admin: list[str] = [] perm: DepPermManager,
plugin_puzzle_playgroup: list[str] = [] raw_id: str,
event: Event,
target: DepLongTaskTarget,
config = get_plugin_config(PuzzleConfig) ) -> Puzzle:
def is_puzzle_manager(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_manager or is_puzzle_admin(target)
def is_puzzle_admin(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_admin
def check_puzzle(manager: PuzzleManager, target: DepLongTaskTarget, raw_id: str) -> Puzzle:
if raw_id not in manager.puzzle_data: if raw_id not in manager.puzzle_data:
raise BotExceptionMessage("没有这个谜题") raise BotExceptionMessage("没有这个谜题")
puzzle = manager.puzzle_data[raw_id] puzzle = manager.puzzle_data[raw_id]
if is_puzzle_admin(target): if await perm.check_has_permission(event, "konaph.admin"):
return puzzle return puzzle
if target.target_id != puzzle.author_id: if target.target_id != puzzle.author_id:
raise BotExceptionMessage("你没有权限查看或编辑这个谜题") raise BotExceptionMessage("你没有权限查看或编辑这个谜题")
@ -60,7 +64,9 @@ def create_admin_commands():
Subcommand("unready", Args["raw_id", str], dest="unready"), Subcommand("unready", Args["raw_id", str], dest="unready"),
Subcommand("info", Args["raw_id", str], dest="info"), Subcommand("info", Args["raw_id", str], dest="info"),
Subcommand("my", Args["page?", int], dest="my"), Subcommand("my", Args["page?", int], dest="my"),
Subcommand("all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"), Subcommand(
"all", Option("--ready", alias=["-r"]), Args["page?", int], dest="all"
),
Subcommand("pin", Args["raw_id?", str], dest="pin"), Subcommand("pin", Args["raw_id?", str], dest="pin"),
Subcommand("unpin", dest="unpin"), Subcommand("unpin", dest="unpin"),
Subcommand( Subcommand(
@ -115,11 +121,11 @@ def create_admin_commands():
dest="hint", dest="hint",
), ),
), ),
rule=is_puzzle_manager, rule=require_permission("konaph.manager"),
) )
@cmd_admin.assign("$main") @cmd_admin.assign("$main")
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget, pm: DepPermManager, event: Event):
msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n") msg = UniMessage.text("==== [KonaPH] 指令一览 ====\n\n")
msg = msg.text("konaph create - 创建一个新的谜题\n") msg = msg.text("konaph create - 创建一个新的谜题\n")
msg = msg.text("konaph ready <id> - 准备好一道谜题\n") msg = msg.text("konaph ready <id> - 准备好一道谜题\n")
@ -132,7 +138,7 @@ def create_admin_commands():
msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n") msg = msg.text("konaph test <id> <answer> - 尝试提交一个答案,看回答的效果\n")
msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n") msg = msg.text("konaph hint - 查看如何编辑题目的中间答案\n")
if is_puzzle_admin(target): if await pm.check_has_permission(event, "konaph.admin"):
msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n") msg = msg.text("konaph all [--ready] <page?> - 查看所有谜题\n")
msg = msg.text("konaph pin - 查看当前置顶谜题\n") msg = msg.text("konaph pin - 查看当前置顶谜题\n")
msg = msg.text("konaph pin <id> - 置顶一个谜题\n") msg = msg.text("konaph pin <id> - 置顶一个谜题\n")
@ -145,48 +151,54 @@ def create_admin_commands():
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
puzzle = manager.admin_create_puzzle(target.target_id) puzzle = manager.admin_create_puzzle(target.target_id)
await target.send_message(UniMessage.text( await target.send_message(
f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n" UniMessage.text(
f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n" f"✨ 创建好啦!谜题 ID 为 {puzzle.raw_id}\n\n"
f"- 输入 `konaph my` 查看你创建的谜题\n" f"- 输入 `konaph info {puzzle.raw_id}` 获得谜题的信息\n"
f"- 输入 `konaph modify` 查看更改谜题的方法" f"- 输入 `konaph my` 查看你创建的谜题\n"
)) f"- 输入 `konaph modify` 查看更改谜题的方法"
)
)
@cmd_admin.assign("ready") @cmd_admin.assign("ready")
async def _(raw_id: str, target: DepLongTaskTarget): async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if p.ready: if p.ready:
return await target.send_message(UniMessage.text( return await target.send_message(UniMessage.text("题目早就准备好啦!"))
"题目早就准备好啦!"
))
p.ready = True p.ready = True
await target.send_message(UniMessage.text( await target.send_message(
f"谜题「{p.title}」已经准备就绪!" UniMessage.text(f"谜题「{p.title}」已经准备就绪!")
)) )
@cmd_admin.assign("unready") @cmd_admin.assign("unready")
async def _(raw_id: str, target: DepLongTaskTarget): async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if not p.ready: if not p.ready:
return await target.send_message(UniMessage.text( return await target.send_message(
f"谜题「{p.title}」已经是未取消状态了!" UniMessage.text(f"谜题「{p.title}」已经是未取消状态了!")
)) )
if manager.is_puzzle_published(p.raw_id): if manager.is_puzzle_published(p.raw_id):
return await target.send_message(UniMessage.text( return await target.send_message(
"已发布的谜题不能取消准备状态!" UniMessage.text("已发布的谜题不能取消准备状态!")
)) )
p.ready = False p.ready = False
await target.send_message(UniMessage.text( await target.send_message(
f"谜题「{p.title}」已经取消准备!" UniMessage.text(f"谜题「{p.title}」已经取消准备!")
)) )
@cmd_admin.assign("info") @cmd_admin.assign("info")
async def _(raw_id: str, target: DepLongTaskTarget): async def _(
raw_id: str, target: DepLongTaskTarget, event: Event, perm: DepPermManager
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
await target.send_message(get_puzzle_info_message(manager, p)) await target.send_message(get_puzzle_info_message(manager, p))
@cmd_admin.assign("my") @cmd_admin.assign("my")
@ -194,15 +206,15 @@ def create_admin_commands():
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
puzzles = manager.get_puzzles_of_user(target.target_id) puzzles = manager.get_puzzles_of_user(target.target_id)
if len(puzzles) == 0: if len(puzzles) == 0:
return await target.send_message(UniMessage.text( return await target.send_message(
"你没有谜题哦,使用 `konaph create` 创建一个吧!" UniMessage.text("你没有谜题哦,使用 `konaph create` 创建一个吧!")
)) )
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages: if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text( return await target.send_message(
f"页数只有 1 ~ {count_pages} 啦!" UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
)) )
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
message = UniMessage.text("==== 我的谜题 ====\n\n") message = UniMessage.text("==== 我的谜题 ====\n\n")
for p in puzzles: for p in puzzles:
message = message.text("- ") message = message.text("- ")
@ -220,11 +232,15 @@ def create_admin_commands():
await target.send_message(message) await target.send_message(message)
@cmd_admin.assign("all") @cmd_admin.assign("all")
async def _(target: DepLongTaskTarget, ready: Query[bool] = Query("all.ready"), page: int = 1): async def _(
if not is_puzzle_admin(target): target: DepLongTaskTarget,
return await target.send_message(UniMessage.text( event: Event,
"你没有权限使用该指令" perm: DepPermManager,
)) ready: Query[bool] = Query("all.ready"),
page: int = 1,
):
if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
puzzles = [*manager.puzzle_data.values()] puzzles = [*manager.puzzle_data.values()]
if ready.available: if ready.available:
@ -232,10 +248,10 @@ def create_admin_commands():
puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True) puzzles = sorted(puzzles, key=lambda p: p.created_at, reverse=True)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE) count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages: if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text( return await target.send_message(
f"页数只有 1 ~ {count_pages} 啦!" UniMessage.text(f"页数只有 1 ~ {count_pages} 啦!")
)) )
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE] puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE : page * PUZZLE_PAGE_SIZE]
message = UniMessage.text("==== 所有谜题 ====\n\n") message = UniMessage.text("==== 所有谜题 ====\n\n")
for p in puzzles: for p in puzzles:
message = message.text("- ") message = message.text("- ")
@ -253,32 +269,30 @@ def create_admin_commands():
await target.send_message(message) await target.send_message(message)
@cmd_admin.assign("pin") @cmd_admin.assign("pin")
async def _(target: DepLongTaskTarget, raw_id: str = ""): async def _(
if not is_puzzle_admin(target): target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str = ""
return await target.send_message(UniMessage.text( ):
"你没有权限使用该指令" if not perm.check_has_permission(event, "konaph.admin"):
)) return await target.send_message(UniMessage.text("你没有权限使用该指令"))
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
if raw_id == "": if raw_id == "":
if manager.puzzle_pinned: if manager.puzzle_pinned:
return await target.send_message(UniMessage.text( return await target.send_message(
f"被 Pin 的谜题 ID = {manager.puzzle_pinned}" UniMessage.text(f"被 Pin 的谜题 ID = {manager.puzzle_pinned}")
)) )
return await target.send_message("没有置顶谜题") return await target.send_message("没有置顶谜题")
if raw_id not in manager.unpublished_puzzles: if raw_id not in manager.unpublished_puzzles:
return await target.send_message(UniMessage.text( return await target.send_message(
"这个谜题已经发布了,或者还没准备好,或者不存在" UniMessage.text("这个谜题已经发布了,或者还没准备好,或者不存在")
)) )
manager.admin_pin_puzzle(raw_id) manager.admin_pin_puzzle(raw_id)
return await target.send_message(f"已置顶谜题 {raw_id}") return await target.send_message(f"已置顶谜题 {raw_id}")
@cmd_admin.assign("unpin") @cmd_admin.assign("unpin")
async def _(target: DepLongTaskTarget): async def _(target: DepLongTaskTarget, event: Event, perm: DepPermManager):
if not is_puzzle_admin(target): if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text( return await target.send_message(UniMessage.text("你没有权限使用该指令"))
"你没有权限使用该指令"
))
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
manager.admin_pin_puzzle("") manager.admin_pin_puzzle("")
return await target.send_message("已取消所有置顶") return await target.send_message("已取消所有置顶")
@ -286,6 +300,8 @@ def create_admin_commands():
@cmd_admin.assign("modify") @cmd_admin.assign("modify")
async def _( async def _(
target: DepLongTaskTarget, target: DepLongTaskTarget,
event: Event,
perm: DepPermManager,
raw_id: str = "", raw_id: str = "",
title: str | None = None, title: str | None = None,
description: str | None = None, description: str | None = None,
@ -306,7 +322,7 @@ def create_admin_commands():
image_manager = get_image_manager() image_manager = get_image_manager()
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if title is not None: if title is not None:
p.title = title p.title = title
if description is not None: if description is not None:
@ -329,11 +345,14 @@ def create_admin_commands():
return await target.send_message("修改好啦!看看效果:\n\n" + info2) return await target.send_message("修改好啦!看看效果:\n\n" + info2)
@cmd_admin.assign("publish") @cmd_admin.assign("publish")
async def _(target: DepLongTaskTarget, raw_id: str | None = None): async def _(
if not is_puzzle_admin(target): target: DepLongTaskTarget,
return await target.send_message(UniMessage.text( event: Event,
"你没有权限使用该指令" perm: DepPermManager,
)) raw_id: str | None = None,
):
if not perm.check_has_permission(event, "konaph.admin"):
return await target.send_message(UniMessage.text("你没有权限使用该指令"))
today = get_today_date() today = get_today_date()
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
if today in manager.daily_puzzle_of_date: if today in manager.daily_puzzle_of_date:
@ -348,46 +367,64 @@ def create_admin_commands():
return await target.send_message("Ok!") return await target.send_message("Ok!")
@cmd_admin.assign("preview") @cmd_admin.assign("preview")
async def _(target: DepLongTaskTarget, raw_id: str): async def _(
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
return await target.send_message(get_puzzle_description(p)) return await target.send_message(get_puzzle_description(p))
@cmd_admin.assign("get-submits") @cmd_admin.assign("get-submits")
async def _(target: DepLongTaskTarget, raw_id: str): async def _(
target: DepLongTaskTarget, event: Event, perm: DepPermManager, raw_id: str
):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
puzzle = manager.puzzle_data.get(raw_id) puzzle = manager.puzzle_data.get(raw_id)
if puzzle is None: if puzzle is None:
return await target.send_message("没有这个谜题") return await target.send_message("没有这个谜题")
if not is_puzzle_admin(target) and target.target_id != puzzle.author_id: if (
not perm.check_has_permission(event, "konaph.admin")
and target.target_id != puzzle.author_id
):
return await target.send_message("你没有权限预览这个谜题") return await target.send_message("你没有权限预览这个谜题")
msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n") msg = UniMessage.text(f"==== {puzzle.title} 提交记录 ====\n\n")
submits = manager.submissions.get(raw_id, {}) submits = manager.submissions.get(raw_id, {})
for uid, ls in submits.items(): for uid, ls in submits.items():
s = ', '.join((i.flag for i in ls)) s = ", ".join((i.flag for i in ls))
msg = msg.text(f"- {get_username(uid)}{s}\n") msg = msg.text(f"- {get_username(uid)}{s}\n")
return await target.send_message(msg) return await target.send_message(msg)
@cmd_admin.assign("test") @cmd_admin.assign("test")
async def _(target: DepLongTaskTarget, raw_id: str, submission: str): async def _(
target: DepLongTaskTarget,
raw_id: str,
submission: str,
event: Event,
perm: DepPermManager,
):
""" """
测试一道谜题的回答,并给出结果 测试一道谜题的回答,并给出结果
""" """
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
result = p.check_submission(submission) result = p.check_submission(submission)
msg = get_submission_message(p, result) msg = get_submission_message(p, result)
return await target.send_message("[测试提交] " + msg) return await target.send_message("[测试提交] " + msg)
@cmd_admin.assign("subcommands.hint") @cmd_admin.assign("subcommands.hint")
async def _(target: DepLongTaskTarget, subcommands: Query[SubcommandResult] = Query("subcommands.hint")): async def _(
target: DepLongTaskTarget,
subcommands: Query[SubcommandResult] = Query("subcommands.hint"),
):
if len(subcommands.result.subcommands) > 0: if len(subcommands.result.subcommands) > 0:
return return
return await target.send_message( return await target.send_message(
UniMessage.text("==== 提示/中间答案编辑器 ====\n\n") UniMessage.text("==== 提示/中间答案编辑器 ====\n\n")
.text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n") .text("- konaph hint list <id>\n - 查看某道题的所有提示 / 中间答案\n")
.text("- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n") .text(
"- konaph hint add <id> <pattern> <hint>\n - 添加一个提示 / 中间答案\n"
)
.text("- konaph hint modify <id> <hint_id>\n") .text("- konaph hint modify <id> <hint_id>\n")
.text(" - --pattern <pattern>\n - 更改匹配规则\n") .text(" - --pattern <pattern>\n - 更改匹配规则\n")
.text(" - --message <message>\n - 更改提示文本\n") .text(" - --message <message>\n - 更改提示文本\n")
@ -402,9 +439,11 @@ def create_admin_commands():
raw_id: str, raw_id: str,
pattern: str, pattern: str,
message: str, message: str,
event: Event,
perm: DepPermManager,
): ):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
p.hints[p.hint_id_max + 1] = PuzzleHint( p.hints[p.hint_id_max + 1] = PuzzleHint(
pattern=pattern, pattern=pattern,
message=message, message=message,
@ -416,9 +455,11 @@ def create_admin_commands():
async def _( async def _(
target: DepLongTaskTarget, target: DepLongTaskTarget,
raw_id: str, raw_id: str,
event: Event,
perm: DepPermManager,
): ):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
await target.send_message(get_puzzle_hint_list(p)) await target.send_message(get_puzzle_hint_list(p))
@cmd_admin.assign("subcommands.hint.modify") @cmd_admin.assign("subcommands.hint.modify")
@ -426,12 +467,14 @@ def create_admin_commands():
target: DepLongTaskTarget, target: DepLongTaskTarget,
raw_id: str, raw_id: str,
hint_id: int, hint_id: int,
event: Event,
perm: DepPermManager,
pattern: str | None = None, pattern: str | None = None,
message: str | None = None, message: str | None = None,
is_checkpoint: bool | None = None, is_checkpoint: bool | None = None,
): ):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if hint_id not in p.hints: if hint_id not in p.hints:
raise BotExceptionMessage( raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单" f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
@ -450,9 +493,11 @@ def create_admin_commands():
target: DepLongTaskTarget, target: DepLongTaskTarget,
raw_id: str, raw_id: str,
hint_id: int, hint_id: int,
event: Event,
perm: DepPermManager,
): ):
async with puzzle_manager() as manager: async with puzzle_manager() as manager:
p = check_puzzle(manager, target, raw_id) p = await check_puzzle(manager, perm, raw_id, event, target)
if hint_id not in p.hints: if hint_id not in p.hints:
raise BotExceptionMessage( raise BotExceptionMessage(
f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单" f"没有这个 hint_id。请使用 konaph hint list {raw_id} 了解 hint 清单"
@ -460,5 +505,4 @@ def create_admin_commands():
del p.hints[hint_id] del p.hints[hint_id]
await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p)) await target.send_message("删除成功!\n\n" + get_puzzle_hint_list(p))
return cmd_admin return cmd_admin

View File

@ -1,57 +0,0 @@
import asyncio
import mcstatus
from nonebot import on_command
from nonebot.adapters import Event
from nonebot_plugin_alconna import UniMessage
from konabot.common.nb.is_admin import is_admin
from mcstatus.responses import JavaStatusResponse
cmd = on_command("宾几人", aliases=set(("宾人数", "mcbingo")), rule=is_admin)
def parse_status(motd: str) -> str:
if "[PRE-GAME]" in motd:
return "[✨ 空闲]"
if "[IN-GAME]" in motd:
return "[🕜 游戏中]"
if "[POST-GAME]" in motd:
return "[🕜 游戏中]"
return "[✨ 开放]"
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
if isinstance(status, JavaStatusResponse):
motd = status.motd.to_plain()
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
st = parse_status(motd)
players_sample = status.players.sample or []
players_sample_suffix = ""
if len(players_sample) > 0:
player_list = [s.name for s in players_sample]
players_sample_suffix = " (" + ", ".join(player_list) + ")"
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
else:
return f"{name}: 好像没开"
@cmd.handle()
async def _(evt: Event):
servers = (
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
)
responses = await asyncio.gather(
*map(lambda s: s[0].async_status(), servers),
return_exceptions=True,
)
messages = "\n".join((
dump_server_status(n, r)
for n, r in zip(map(lambda s: s[1], servers), responses)
))
await UniMessage.text(messages).finish(evt, at_sender=False)

View File

@ -0,0 +1,131 @@
import asyncio
import datetime
from typing import Literal
import mcstatus
from nonebot import on_command
from nonebot.adapters import Event
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from mcstatus.responses import JavaStatusResponse
from nonebot_plugin_apscheduler import scheduler
from konabot.common.permsys import DepPermManager, require_permission
from konabot.plugins.minecraft_servers.simpfun_server import SimpfunServer
cmd = on_command(
"宾几人",
aliases=set(("宾人数", "mcbingo")),
rule=require_permission("minecraft.bingo.check"),
)
def parse_status(motd: str) -> str:
if "[PRE-GAME]" in motd:
return "[✨ 空闲]"
if "[IN-GAME]" in motd:
return "[🕜 游戏中]"
if "[POST-GAME]" in motd:
return "[🕜 游戏中]"
return "[✨ 开放]"
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
if isinstance(status, JavaStatusResponse):
motd = status.motd.to_plain()
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
st = parse_status(motd)
players_sample = status.players.sample or []
players_sample_suffix = ""
if len(players_sample) > 0:
player_list = [s.name for s in players_sample]
players_sample_suffix = " (" + ", ".join(player_list) + ")"
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
else:
return f"{name}: 好像没开"
@cmd.handle()
async def _(evt: Event, pm: DepPermManager):
servers = (
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
)
responses = await asyncio.gather(
*map(lambda s: s[0].async_status(), servers),
return_exceptions=True,
)
messages = "\n".join(
(
dump_server_status(n, r)
for n, r in zip(map(lambda s: s[1], servers), responses)
)
)
if await pm.check_has_permission(evt, "minecraft.bingo.manipulate"):
messages += "\n\n---\n\n你可以使用 bingoman start 开启小帕的 bingo 服,用 bingoman stop 关闭小帕的 bingo 服"
await UniMessage.text(messages).finish(evt, at_sender=False)
cmd_bingo_manipulate = on_alconna(
Alconna("bingoman", Args["action", str]),
aliases=("宾服务器", "bingo服"),
rule=require_permission("minecraft.bingo.manipulate"),
)
actions: dict[str, Literal["start", "stop", "restart", "kill"]] = {
"up": "start",
"down": "stop",
"start": "start",
"stop": "stop",
"开机": "start",
"关机": "stop",
"restart": "restart",
"kill": "kill",
"重启": "restart",
}
@cmd_bingo_manipulate.handle()
async def _(action: str, event: Event):
server = SimpfunServer.new() # 使用默认配置管理服务器
a = actions.get(action.lower().strip())
if a is None:
await UniMessage.text(f"操作 {action} 不存在").send(event, at_sender=True)
return
resp = await server.power(a)
if resp.code == 200:
await UniMessage.text("好了").send(event, at_sender=True)
else:
await UniMessage.text(f"不好:{resp}").send(event, at_sender=True)
@scheduler.scheduled_job("cron", hour="4,23")
async def _():
server = SimpfunServer.new()
today = datetime.datetime.now()
# 获取服务器当前状态,重试多次以保证不会误判服务器未开启
server_up = False
server_players = 0
for _ in range(3):
mcs = mcstatus.JavaServer("play.simpfun.cn", 11495)
try:
resp = await mcs.async_status()
server_up = True
server_players = resp.players.online
except Exception:
pass
if today.weekday() == 5 and today.hour < 12:
# 每周六开机一天,保证可以让服务器不被自动销毁
if not server_up:
await server.power("start")
else:
# 每用一个自然日都会计费,所以要赶在这一天结束之前关服
# 平时如果没人,也自动关上
if server_up and server_players == 0:
await server.power("stop")

View File

@ -0,0 +1,90 @@
from dataclasses import dataclass
import datetime
from typing import Literal
import aiohttp
from pydantic import BaseModel
class SimpfunServerConfig(BaseModel):
plugin_simpfun_api_key: str = ""
plugin_simpfun_base_url: str = "https://api.simpfun.cn"
plugin_simpfun_instance_id: int = 0
def get_config():
from nonebot import get_plugin_config
return get_plugin_config(SimpfunServerConfig)
class PowerManageResult(BaseModel):
code: int
status: bool
msg: str
class SimpfunServerDetailUtilization(BaseModel):
memory_bytes: int
cpu_absolute: float
disk_bytes: int
network_rx_bytes: int
network_tx_bytes: int
uptime: float
disk_last_check_time: datetime.datetime
class SimpfunServerDetailData(BaseModel):
id: int
name: str
is_pro: bool
status: str
"运行中的话,是 running"
is_suspended: bool
utilization: SimpfunServerDetailUtilization
class SimpfunServerDetailResp(BaseModel):
code: int
data: SimpfunServerDetailData
@dataclass
class SimpfunServer:
instance_id: int
api_key: str
base_url: str
async def power(
self, action: Literal["start", "stop", "restart", "kill"]
) -> PowerManageResult:
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
async with aiohttp.ClientSession(
headers={"Authorization": self.api_key}
) as session:
async with session.get(url, params={"action": action}) as resp:
resp.raise_for_status()
return PowerManageResult.model_validate_json(await resp.read())
async def detail(self) -> SimpfunServerDetailResp:
url = f"{self.base_url}/api/ins/{self.instance_id}/power"
async with aiohttp.ClientSession(
headers={"Authorization": self.api_key}
) as session:
async with session.get(url) as resp:
resp.raise_for_status()
return SimpfunServerDetailResp.model_validate_json(await resp.read())
@staticmethod
def new(config: SimpfunServerConfig | None = None):
if config is None:
config = get_config()
return SimpfunServer(
instance_id=config.plugin_simpfun_instance_id,
api_key=config.plugin_simpfun_api_key,
base_url=config.plugin_simpfun_base_url,
)

View File

@ -6,6 +6,7 @@ from konabot.common.nb.match_keyword import match_keyword
evt_nya = on_message(rule=match_keyword("")) evt_nya = on_message(rule=match_keyword(""))
@evt_nya.handle() @evt_nya.handle()
async def _(): async def _():
await evt_nya.send(await UniMessage().text("").export()) await evt_nya.send(await UniMessage().text("").export())
@ -25,8 +26,9 @@ NYA_SYMBOL_MAPPING = {
"~": "~", "~": "~",
"": "", "": "",
" ": " ", " ": " ",
"\n": "\n",
} }
NYA_SYMBOL_KEEP = "—¹₁²₂³₃⁴₄⁵₅⁶₆⁷₇⁸₈⁹₉⁰₀\n"
NYA_SYMBOL_MAPPING.update((k, k) for k in NYA_SYMBOL_KEEP)
async def has_nya(msg: UniMsg) -> bool: async def has_nya(msg: UniMsg) -> bool:
@ -49,10 +51,10 @@ async def has_nya(msg: UniMsg) -> bool:
evt_nya_v2 = on_message(rule=has_nya) evt_nya_v2 = on_message(rule=has_nya)
@evt_nya_v2.handle() @evt_nya_v2.handle()
async def _(msg: UniMsg, evt: Event): async def _(msg: UniMsg, evt: Event):
text = msg.extract_plain_text() text = msg.extract_plain_text()
await UniMessage.text(''.join( await UniMessage.text("".join((NYA_SYMBOL_MAPPING.get(c, "") for c in text))).send(
(NYA_SYMBOL_MAPPING.get(c, '') for c in text) evt
)).send(evt) )

View File

@ -3,14 +3,15 @@ from nonebot_plugin_alconna import Alconna, Args, on_alconna
from konabot.common.longtask import DepLongTaskTarget from konabot.common.longtask import DepLongTaskTarget
from konabot.common.pager import PagerQuery from konabot.common.pager import PagerQuery
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA from konabot.common.subscribe import POSTER_INFO_DATA, dep_poster_service
from konabot.plugins.poster.service import dep_poster_service
cmd_subscribe = on_alconna(Alconna( cmd_subscribe = on_alconna(
"订阅", Alconna(
Args["channel", str], "订阅",
)) Args["channel", str],
)
)
@cmd_subscribe.handle() @cmd_subscribe.handle()
@ -23,10 +24,12 @@ async def _(target: DepLongTaskTarget, channel: str):
await target.send_message(f"已经订阅过「{channel}」了") await target.send_message(f"已经订阅过「{channel}」了")
cmd_list = on_alconna(Alconna( cmd_list = on_alconna(
"re:(?:查询|我的|获取)订阅(列表)?", Alconna(
Args["page?", int], "re:(?:查询|我的|获取)订阅(列表)?",
)) Args["page?", int],
)
)
def better_channel_message(channel_id: str) -> str: def better_channel_message(channel_id: str) -> str:
@ -39,17 +42,24 @@ def better_channel_message(channel_id: str) -> str:
@cmd_list.handle() @cmd_list.handle()
async def _(target: DepLongTaskTarget, page: int = 1): async def _(target: DepLongTaskTarget, page: int = 1):
async with dep_poster_service() as service: async with dep_poster_service() as service:
result = await service.get_channels(target, PagerQuery( result = await service.get_channels(
page_index=page, target,
page_size=10, PagerQuery(
)) page_index=page,
await target.send_message(result.to_unimessage(title="订阅列表", formatter=better_channel_message)) page_size=10,
),
)
await target.send_message(
result.to_unimessage(title="订阅列表", formatter=better_channel_message)
)
cmd_list_available = on_alconna(Alconna( cmd_list_available = on_alconna(
"re:(查询)?可用订阅(列表)?", Alconna(
Args["page?", int], "re:(查询)?可用订阅(列表)?",
)) Args["page?", int],
)
)
@cmd_list_available.handle() @cmd_list_available.handle()
@ -58,13 +68,17 @@ async def _(target: DepLongTaskTarget, page: int = 1):
page_index=page, page_index=page,
page_size=10, page_size=10,
).apply(sorted(POSTER_INFO_DATA.keys())) ).apply(sorted(POSTER_INFO_DATA.keys()))
await target.send_message(result.to_unimessage(title="可用订阅列表", formatter=better_channel_message)) await target.send_message(
result.to_unimessage(title="可用订阅列表", formatter=better_channel_message)
)
cmd_unsubscribe = on_alconna(Alconna( cmd_unsubscribe = on_alconna(
"取消订阅", Alconna(
Args["channel", str], "取消订阅",
)) Args["channel", str],
)
)
@cmd_unsubscribe.handle() @cmd_unsubscribe.handle()
@ -79,6 +93,7 @@ async def _(target: DepLongTaskTarget, channel: str):
driver = nonebot.get_driver() driver = nonebot.get_driver()
@driver.on_startup @driver.on_startup
async def _(): async def _():
async with dep_poster_service() as service: async with dep_poster_service() as service:

View File

@ -4,8 +4,7 @@ from nonebot.internal.adapter.event import Event
from nonebot_plugin_alconna import UniMessage from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_apscheduler import scheduler from nonebot_plugin_apscheduler import scheduler
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info from konabot.common.subscribe import PosterInfo, register_poster_info, broadcast
from konabot.plugins.poster.service import broadcast
register_poster_info( register_poster_info(
"二十四节气", "二十四节气",
@ -98,4 +97,3 @@ async def _(event: Event):
msg = UniMessage.text(f"现在的节气是{date.term}") msg = UniMessage.text(f"现在的节气是{date.term}")
await msg.send(event) await msg.send(event)

View File

@ -1,20 +1,23 @@
import asyncio import asyncio
from nonebot import get_driver from nonebot import get_driver
from nonebot_plugin_alconna import UniMessage from nonebot_plugin_alconna import UniMessage
from konabot.plugins.poster.poster_info import register_poster_info, PosterInfo from konabot.common.subscribe import register_poster_info, PosterInfo, broadcast
from konabot.plugins.poster.service import broadcast
CHANNEL_STARTUP = "启动通知" CHANNEL_STARTUP = "启动通知"
register_poster_info(CHANNEL_STARTUP, PosterInfo( register_poster_info(
aliases=set(), CHANNEL_STARTUP,
description="当 Bot 重启时告知", PosterInfo(
)) aliases=set(),
description="当 Bot 重启时告知",
),
)
driver = get_driver() driver = get_driver()
@driver.on_startup @driver.on_startup
async def _(): async def _():
# 要尽量保证接受讯息的服务存在 # 要尽量保证接受讯息的服务存在
@ -30,4 +33,3 @@ async def _():
await broadcast(CHANNEL_STARTUP, UniMessage.text("此方 BOT 重启好了")) await broadcast(CHANNEL_STARTUP, UniMessage.text("此方 BOT 重启好了"))
asyncio.create_task(task()) asyncio.create_task(task())