113 lines
3.2 KiB
Python
113 lines
3.2 KiB
Python
from typing import Annotated
|
|
from nonebot.adapters import Event
|
|
from nonebot.params import Depends
|
|
from nonebot_plugin_alconna import Alconna, Args, Subcommand, UniMessage, on_alconna
|
|
from konabot.common.pager import PagerQuery
|
|
from konabot.common.permsys import DepPermManager, require_permission
|
|
from konabot.common.permsys.entity import PermEntity, get_entity_chain_of_entity
|
|
|
|
|
|
# Sub-commands of ``konaperm``. Each one addresses a permission entity through
# the (platform, entity_type, external_id) string triple.

# ``list``: the trailing ``page?`` argument is presumably optional -- the
# handler falls back to page 1 when it is not supplied.
_list_subcommand = Subcommand(
    "list",
    Args["platform", str],
    Args["entity_type", str],
    Args["external_id", str],
    Args["page?", int],
)

# ``get``: look up a single permission key for the entity.
_get_subcommand = Subcommand(
    "get",
    Args["platform", str],
    Args["entity_type", str],
    Args["external_id", str],
    Args["perm", str],
)

# ``set``: ``val`` is taken as a raw string and interpreted by the handler.
_set_subcommand = Subcommand(
    "set",
    Args["platform", str],
    Args["entity_type", str],
    Args["external_id", str],
    Args["perm", str],
    Args["val", str],
)

# Matcher for the whole command; its rule is built by require_permission("admin").
cmd = on_alconna(
    Alconna("konaperm", _list_subcommand, _get_subcommand, _set_subcommand),
    rule=require_permission("admin"),
)
|
|
|
|
|
|
async def _get_perm_entity_chain(platform: str, entity_type: str, external_id: str):
    """Return the entity chain of the entity identified by the three arguments.

    This is the dependency behind ``_DepEntityChain``. The parameter names
    mirror the ``Args`` declared on ``cmd`` -- presumably that is how the
    values get injected, so keep them in sync (TODO confirm).
    """
    entity = PermEntity(platform, entity_type, external_id)
    return get_entity_chain_of_entity(entity)
|
|
|
|
|
|
# Handler parameter type: a list of PermEntity resolved through
# _get_perm_entity_chain via Depends. The handlers in this module treat
# element 0 as the entity the command targets.
_DepEntityChain = Annotated[list[PermEntity], Depends(_get_perm_entity_chain)]
|
|
|
|
|
|
def make_formatter(parent: PermEntity):
    """Create a one-line formatter for permission entries listed for *parent*.

    The returned callable takes an ``(entity, permission_key, allowed)`` tuple
    and renders an ALLOW/DENY mark, then -- only when ``entity`` differs from
    *parent* -- a note naming the entity the entry comes from, then the key.
    """
    # Looked up by the boolean flag; a non-bool flag raises KeyError.
    marks = {True: "[✅ ALLOW] ", False: "[❌ DENY] "}

    def _formatter(row: tuple[PermEntity, str, bool]):
        source, key, allowed = row[0], row[1], row[2]
        mark = marks[allowed]
        # Entries set directly on ``parent`` carry no origin note.
        origin = (
            f"[继承自 {source.platform}.{source.entity_type}.{source.external_id}] "
            if parent != source
            else ""
        )
        return f"{mark}{origin}{key}"

    return _formatter
|
|
|
|
|
|
@cmd.assign("list")
async def list_permission(
    pm: DepPermManager,
    ec: _DepEntityChain,
    event: Event,
    page: int = 1,
):
    """Handle ``konaperm list``: send one page of permission entries.

    ``page`` defaults to 1. The literal 10 passed to ``PagerQuery`` is
    presumably the page size (TODO confirm against PagerQuery).
    """
    listing = await pm.list_permission(ec, PagerQuery(page, 10))
    # Format relative to ec[0] so entries coming from other entities in the
    # chain are marked as inherited.
    await listing.to_unimessage(make_formatter(ec[0])).send(event)
|
|
|
|
|
|
@cmd.assign("get")
async def get_permission(
    pm: DepPermManager,
    ec: _DepEntityChain,
    perm: str,
    event: Event,
):
    """Handle ``konaperm get``: report the effective value of ``perm`` for ec[0].

    Replies that there is no record when the lookup returns ``None``.
    Otherwise replies with the resolved value, prefixed by an inheritance
    note when the deciding record belongs to a different entity or a
    different permission key than the one asked for.
    """
    record = await pm.check_has_permission_info(ec, perm)

    target = ec[0]
    label = f"{target.platform}.{target.entity_type}.{target.external_id}"

    if record is None:
        text = f"对象 {label} 无 {perm} 权限记录"
    else:
        source, key, allowed = record
        prefix = (
            f"继承自 {source.platform}.{source.entity_type}.{source.external_id} 对 {key} 的设置,"
            if target != source or key != perm
            else ""
        )
        text = f"{prefix}对象 {label} 对 {perm} 的权限为 {allowed}"

    await UniMessage.text(text).send(event)
|
|
|
|
|
|
@cmd.assign("set")
async def set_permission(
    pm: DepPermManager,
    ec: _DepEntityChain,
    perm: str,
    val: str,
    event: Event,
):
    """Handle ``konaperm set``: update ``perm`` on ec[0], then report the result.

    ``val`` is matched case-insensitively:

    * ``y`` / ``yes`` / ``allow`` / ``true`` / ``t``  -> ``True``
    * ``n`` / ``no`` / ``deny`` / ``false`` / ``f``   -> ``False``
    * ``null`` / ``none``                             -> ``None``
      (presumably clears the entity's own record -- confirm against
      ``update_permission``)

    Any other value is rejected with an explanatory message and nothing is
    changed. Previously such input was silently ignored and the handler still
    replied with the current permission, which looked like a successful update.
    """
    normalized = val.lower()
    if normalized in ("y", "yes", "allow", "true", "t"):
        state = True
    elif normalized in ("n", "no", "deny", "false", "f"):
        state = False
    elif normalized in ("null", "none"):
        state = None
    else:
        await UniMessage.text(
            f"无法识别的权限值 {val},可用值:"
            "y/yes/allow/true/t、n/no/deny/false/f、null/none"
        ).send(event)
        return

    await pm.update_permission(ec[0], perm, state)
    # Echo the effective permission so the caller sees the outcome.
    await get_permission(pm, ec, perm, event)
|