Files
konabot/konabot/plugins/mc_count_player/__init__.py

58 lines
1.8 KiB
Python

import asyncio
import mcstatus
from nonebot import on_command
from nonebot.adapters import Event
from nonebot_plugin_alconna import UniMessage
from konabot.common.nb.is_admin import is_admin
from mcstatus.responses import JavaStatusResponse
cmd = on_command("宾几人", aliases=set(("宾人数", "mcbingo")), rule=is_admin)
def parse_status(motd: str) -> str:
if "[PRE-GAME]" in motd:
return "[✨ 空闲]"
if "[IN-GAME]" in motd:
return "[🕜 游戏中]"
if "[POST-GAME]" in motd:
return "[🕜 游戏中]"
return "[✨ 开放]"
def dump_server_status(name: str, status: JavaStatusResponse | BaseException) -> str:
if isinstance(status, JavaStatusResponse):
motd = status.motd.to_plain()
# Bingo Status: [PRE-GAME], [IN-GAME], [POST-GAME]
st = parse_status(motd)
players_sample = status.players.sample or []
players_sample_suffix = ""
if len(players_sample) > 0:
player_list = [s.name for s in players_sample]
players_sample_suffix = " (" + ", ".join(player_list) + ")"
return f"{name}: {st} {status.players.online} 人在线{players_sample_suffix}"
else:
return f"{name}: 好像没开"
@cmd.handle()
async def _(evt: Event):
servers = (
(mcstatus.JavaServer("play.simpfun.cn", 11495), "小帕 Bingo"),
(mcstatus.JavaServer("bingo.mujica.tech"), "坏枪 Bingo"),
(mcstatus.JavaServer("mc.mujica.tech", 11456), "齿轮盛宴"),
)
responses = await asyncio.gather(
*map(lambda s: s[0].async_status(), servers),
return_exceptions=True,
)
messages = "\n".join((
dump_server_status(n, r)
for n, r in zip(map(lambda s: s[1], servers), responses)
))
await UniMessage.text(messages).finish(evt, at_sender=False)