Compare commits

...

4 Commits

Author SHA1 Message Date
abb864ec70 修复单元测试问题 2026-03-07 17:51:49 +08:00
b38dde1b70 修正若干拼写错误,增强相关逻辑 2026-03-07 17:50:35 +08:00
8f40572a38 修复拼写错误并完成文档 2026-03-07 17:43:37 +08:00
230705f689 完成权限系统 2026-03-07 17:35:59 +08:00
8 changed files with 695 additions and 33 deletions

View File

@ -30,7 +30,7 @@ steps:
volumes: volumes:
- name: docker-socket - name: docker-socket
path: /var/run/docker.sock path: /var/run/docker.sock
- name: 在容器中测试插件加载 - name: 在容器中进行若干测试
image: docker:dind image: docker:dind
privileged: true privileged: true
volumes: volumes:
@ -38,14 +38,8 @@ steps:
path: /var/run/docker.sock path: /var/run/docker.sock
commands: commands:
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py - docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
- name: 在容器中测试 Playwright 工作正常
image: docker:dind
privileged: true
volumes:
- name: docker-socket
path: /var/run/docker.sock
commands:
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py - docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python -m pytest --cov-report term-missing:skip-covered
- name: 发送构建结果到 ntfy - name: 发送构建结果到 ntfy
image: parrazam/drone-ntfy image: parrazam/drone-ntfy
when: when:

5
bot.py
View File

@ -7,7 +7,6 @@ from nonebot.adapters.discord import Adapter as DiscordAdapter
from nonebot.adapters.minecraft import Adapter as MinecraftAdapter from nonebot.adapters.minecraft import Adapter as MinecraftAdapter
from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter
from konabot.common import permsys
from konabot.common.log import init_logger from konabot.common.log import init_logger
from konabot.common.nb.exc import BotExceptionMessage from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.path import LOG_PATH from konabot.common.path import LOG_PATH
@ -57,11 +56,13 @@ def main():
nonebot.load_plugins("konabot/plugins") nonebot.load_plugins("konabot/plugins")
nonebot.load_plugin("nonebot_plugin_analysis_bilibili") nonebot.load_plugin("nonebot_plugin_analysis_bilibili")
from konabot.common import permsys
permsys.create_startup() permsys.create_startup()
# 注册关闭钩子 # 注册关闭钩子
@driver.on_shutdown @driver.on_shutdown
async def shutdown_handler(): async def _():
# 关闭全局数据库管理器 # 关闭全局数据库管理器
db_manager = get_global_db_manager() db_manager = get_global_db_manager()
await db_manager.close_all_connections() await db_manager.close_all_connections()

235
docs/permsys.md Normal file
View File

@ -0,0 +1,235 @@
# 权限系统 `konabot.common.permsys`
本文档面向维护者,说明 `konabot/common/permsys` 模块的职责、数据模型、权限解析规则,以及在插件中接入的推荐方式。
## 模块目标
`permsys` 提供了一套简单的、可继承的权限系统,用于回答两个问题:
1. 某个事件对应的主体是谁。
2. 该主体是否拥有某项权限。
它适合处理 bot 内部的功能开关、管理权限、平台级授权等场景。
当前模块由以下几部分组成:
- `konabot/common/permsys/__init__.py`
- 暴露 `PermManager``DepPermManager``require_permission`
- 负责数据库初始化、启动迁移、超级管理员默认授权
- `konabot/common/permsys/entity.py`
- 定义 `PermEntity`
- 将事件转换为可查询的实体链
- `konabot/common/permsys/repo.py`
- 封装 SQLite 读写
- `konabot/common/permsys/migrates/`
- 存放迁移 SQL
- `konabot/common/permsys/sql/`
- 存放查询与更新 SQL
## 核心概念
### 1. `PermEntity`
`PermEntity` 是权限系统中的最小主体标识:
```python
PermEntity(platform: str, entity_type: str, external_id: str)
```
示例:
- `PermEntity("sys", "global", "global")`
- `PermEntity("ob11", "group", "123456")`
- `PermEntity("ob11", "user", "987654")`
其中:
- `platform` 表示来源平台,如 `sys``ob11``discord`
- `entity_type` 表示主体类型,如 `global``group``user`
- `external_id` 表示平台侧的外部标识
### 2. 实体链
权限判断不是只看单个实体,而是看一条“实体链”。
`get_entity_chain_of_entity()` 为例,传入一个具体实体时,返回的链为:
```python
[
PermEntity(platform, entity_type, external_id),
PermEntity(platform, "global", "global"),
PermEntity("sys", "global", "global"),
]
```
这意味着权限会优先读取更具体的主体,再回退到平台全局,最后回退到系统全局。
`get_entity_chain(event)` 则会根据事件类型自动构造链。例如:
- OneBot V11 群消息:用户 -> 群 -> 平台全局 -> 系统全局
- OneBot V11 私聊:用户 -> 平台全局 -> 系统全局
- Discord 频道消息:用户/频道/服务器 -> 平台全局 -> 系统全局
- Console控制台用户/频道 -> 平台全局 -> 系统全局
注意:当前 `entity.py` 中的具体链顺序与字段命名应以实现为准;修改这里时要评估现有权限继承是否会被破坏。
### 3. 权限键
权限键使用点分结构,例如:
- `admin`
- `plugin.weather`
- `plugin.weather.use`
检查时会自动做前缀回退。以 `plugin.weather.use` 为例,查询顺序是:
1. `plugin.weather.use`
2. `plugin.weather`
3. `plugin`
4. `*`
因此,`*` 可以看作兜底总权限。
## 权限解析规则
`PermManager.check_has_permission_info()` 的逻辑可以概括为:
1. 先把输入转换成实体链。
2. 对权限键做逐级回退,同时追加 `*`
3. 在数据库中批量查出链上所有实体、所有候选键的显式记录。
4. 按“实体越具体越优先、权限键越具体越优先”的顺序,返回第一条命中的记录。
若没有任何显式记录:
- `check_has_permission_info()` 返回 `None`
- `check_has_permission()` 返回 `False`
这表示本系统默认是“未授权即拒绝”。
## 数据存储
模块使用 SQLite默认数据库文件位于
- `data/perm.sqlite3`
启动时会执行迁移:
- `create_startup()` 在 NoneBot 启动事件中调用 `execute_migration()`
权限值支持三态:
- `True`:显式允许
- `False`:显式拒绝
- `None`:删除/清空该层的显式设置,让判断重新回退到继承链
`repo.py` 中的 `update_perm_info()` 会将这个三态直接写入数据库。
## 超级管理员注入
在启动阶段,`create_startup()` 会读取 `konabot.common.nb.is_admin.cfg.admin_qq_account`,并为这些 QQ 账号写入:
```python
PermEntity("ob11", "user", str(account)), "*", True
```
也就是说,配置中的超级管理员会直接拥有全部权限。
这属于启动时自动灌入的保底策略,不依赖手工授权命令。
## 在插件中使用
### 1. 直接做权限检查
```python
from konabot.common.permsys import DepPermManager
async def handler(pm: DepPermManager, event):
ok = await pm.check_has_permission(event, "plugin.example.use")
if not ok:
return
```
适合需要在处理流程中动态决定权限键的场景。
### 2. 挂到 Rule 上做准入控制
```python
from nonebot_plugin_alconna import Alconna, on_alconna
from konabot.common.permsys import require_permission
cmd = on_alconna(
Alconna("example"),
rule=require_permission("plugin.example.use"),
)
```
适合命令入口明确、未通过时直接拦截的场景。
### 3. 更新权限
```python
from konabot.common.permsys import DepPermManager
from konabot.common.permsys.entity import PermEntity
await pm.update_permission(
PermEntity("ob11", "group", "123456"),
"plugin.example.use",
True,
)
```
建议只在专门的管理插件中开放写权限,避免普通功能插件到处分散改表。
## `perm_manage` 插件与本模块的关系
`konabot/plugins/perm_manage/__init__.py` 是本模块当前的管理入口,提供:
- `konaperm list`:列出实体链上已有的显式权限记录
- `konaperm get`:查看某个权限最终命中的记录
- `konaperm set`:写入 allow/deny/null
这个插件本身使用 `require_permission("admin")` 保护,因此只有拥有 `admin` 权限的主体才能管理权限。
## 接入建议
### 权限键命名
建议使用稳定、可扩展的分层键名:
- 推荐:`plugin.xxx``plugin.xxx.action`
- 不推荐:含糊的单词或临时字符串
这样才能利用前缀回退机制做批量授权。
### 输入安全
虽然这个项目偏内部使用,但权限键、实体类型、外部 ID 仍然应视为不可信输入:
- 不要把聊天输入直接拼到 SQL 中
- 不要让任意用户可随意构造高权限写入
- 对可写命令至少做权限保护和必要校验
### 改动兼容性
以下改动都可能影响全局权限行为,修改前应充分评估:
- 更改实体链顺序
- 更改默认兜底键 `*` 的语义
- 更改 `None` 的处理方式
- 更改启动时超级管理员注入逻辑
## 调试建议
- 先用 `konaperm get ...` 确认某个权限最终命中了哪一层
- 再用 `konaperm list ...` 查看该实体链上有哪些显式记录
- 若表现异常,检查是否是更上层实体或更宽泛权限键提前命中
## 相关文件
- `konabot/common/permsys/__init__.py`
- `konabot/common/permsys/entity.py`
- `konabot/common/permsys/repo.py`
- `konabot/plugins/perm_manage/__init__.py`

View File

@ -1,7 +1,11 @@
from typing import Annotated
import nonebot import nonebot
from nonebot.adapters import Event from nonebot.adapters import Event
from nonebot.params import Depends
from nonebot.rule import Rule
from konabot.common.database import DatabaseManager from konabot.common.database import DatabaseManager
from konabot.common.pager import PagerQuery
from konabot.common.path import DATA_PATH from konabot.common.path import DATA_PATH
from konabot.common.permsys.entity import PermEntity, get_entity_chain from konabot.common.permsys.entity import PermEntity, get_entity_chain
from konabot.common.permsys.migrates import execute_migration from konabot.common.permsys.migrates import execute_migration
@ -11,18 +15,23 @@ from konabot.common.permsys.repo import PermRepo
db = DatabaseManager(DATA_PATH / "perm.sqlite3") db = DatabaseManager(DATA_PATH / "perm.sqlite3")
_EntityLike = Event | PermEntity | list[PermEntity]
async def _to_entity_chain(el: _EntityLike):
if isinstance(el, Event):
return await get_entity_chain(el) # pragma: no cover
if isinstance(el, PermEntity):
return [el]
return el
class PermManager: class PermManager:
def __init__(self, db: DatabaseManager) -> None: def __init__(self, db: DatabaseManager) -> None:
self.db = db self.db = db
async def check_has_permission( async def check_has_permission_info(self, entities: _EntityLike, key: str):
self, entities: Event | PermEntity | list[PermEntity], key: str entities = await _to_entity_chain(entities)
) -> bool:
if isinstance(entities, Event):
entities = await get_entity_chain(entities) # pragma: no cover
if isinstance(entities, PermEntity):
entities = [entities]
key = key.removesuffix("*").removesuffix(".") key = key.removesuffix("*").removesuffix(".")
key_split = key.split(".") key_split = key.split(".")
key_split = [s for s in key_split if len(s) > 0] key_split = [s for s in key_split if len(s) > 0]
@ -32,24 +41,31 @@ class PermManager:
async with self.db.get_conn() as conn: async with self.db.get_conn() as conn:
repo = PermRepo(conn) repo = PermRepo(conn)
# for entity in entities:
# for k in keys:
# perm = await repo.get_perm_info(entity, k)
# if perm is not None:
# return perm
data = await repo.get_perm_info_batch(entities, keys) data = await repo.get_perm_info_batch(entities, keys)
for entity in entities: for entity in entities:
for k in keys: for k in keys:
p = data.get((entity, k)) p = data.get((entity, k))
if p is not None: if p is not None:
return p return (entity, k, p)
return False return None
async def check_has_permission(self, entities: _EntityLike, key: str) -> bool:
res = await self.check_has_permission_info(entities, key)
if res is None:
return False
return res[2]
async def update_permission(self, entity: PermEntity, key: str, perm: bool | None): async def update_permission(self, entity: PermEntity, key: str, perm: bool | None):
async with self.db.get_conn() as conn: async with self.db.get_conn() as conn:
repo = PermRepo(conn) repo = PermRepo(conn)
await repo.update_perm_info(entity, key, perm) await repo.update_perm_info(entity, key, perm)
async def list_permission(self, entities: _EntityLike, query: PagerQuery):
entities = await _to_entity_chain(entities)
async with self.db.get_conn() as conn:
repo = PermRepo(conn)
return await repo.list_perm_info_batch(entities, query)
def perm_manager(_db: DatabaseManager | None = None) -> PermManager: # pragma: no cover def perm_manager(_db: DatabaseManager | None = None) -> PermManager: # pragma: no cover
if _db is None: if _db is None:
@ -58,13 +74,35 @@ def perm_manager(_db: DatabaseManager | None = None) -> PermManager: # pragma:
def create_startup(): # pragma: no cover def create_startup(): # pragma: no cover
from konabot.common.nb.is_admin import cfg
driver = nonebot.get_driver() driver = nonebot.get_driver()
@driver.on_startup @driver.on_startup
async def _(): async def _():
async with db.get_conn() as conn: async with db.get_conn() as conn:
await execute_migration(conn) await execute_migration(conn)
pm = perm_manager(db)
for account in cfg.admin_qq_account:
# ^ 这里的是超级管理员!!用环境变量定义的。
# 咕嘿嘿嘿!!!夺取全部权限!!!
await pm.update_permission(
PermEntity("ob11", "user", str(account)), "*", True
)
@driver.on_shutdown @driver.on_shutdown
async def _(): async def _():
await db.close_all_connections() try:
await db.close_all_connections()
except Exception:
pass
DepPermManager = Annotated[PermManager, Depends(perm_manager)]
def require_permission(perm: str) -> Rule: # pragma: no cover
async def check_permission(event: Event, pm: DepPermManager) -> bool:
return await pm.check_has_permission(event, perm)
return Rule(check_permission)

View File

@ -21,6 +21,14 @@ class PermEntity:
external_id: str external_id: str
def get_entity_chain_of_entity(entity: PermEntity) -> list[PermEntity]:
return [
PermEntity("sys", "global", "global"),
PermEntity(entity.platform, "global", "global"),
entity,
][::-1]
async def get_entity_chain(event: Event) -> list[PermEntity]: # pragma: no cover async def get_entity_chain(event: Event) -> list[PermEntity]: # pragma: no cover
entities = [PermEntity("sys", "global", "global")] entities = [PermEntity("sys", "global", "global")]
@ -38,7 +46,7 @@ async def get_entity_chain(event: Event) -> list[PermEntity]: # pragma: no cove
entities.append(PermEntity("discord", "global", "global")) entities.append(PermEntity("discord", "global", "global"))
if isinstance(event, DiscordGMEvent): if isinstance(event, DiscordGMEvent):
entities.append(PermEntity("discord", "guilt", str(event.guild_id))) entities.append(PermEntity("discord", "guild", str(event.guild_id)))
entities.append(PermEntity("discord", "channel", str(event.channel_id))) entities.append(PermEntity("discord", "channel", str(event.channel_id)))
entities.append(PermEntity("discord", "user", str(event.user_id))) entities.append(PermEntity("discord", "user", str(event.user_id)))

View File

@ -1,8 +1,11 @@
from dataclasses import dataclass from dataclasses import dataclass
import math
from pathlib import Path from pathlib import Path
import aiosqlite import aiosqlite
from konabot.common.pager import PagerQuery, PagerResult
from .entity import PermEntity from .entity import PermEntity
@ -118,7 +121,7 @@ class PermRepo:
async def get_entity_id_batch( async def get_entity_id_batch(
self, entities: list[PermEntity] self, entities: list[PermEntity]
) -> dict[PermEntity, int]: ) -> dict[PermEntity, int]:
"""批量获取 Entity 的 eneity_id """批量获取 Entity 的 entity_id
Args: Args:
entities: PermEntity 列表 entities: PermEntity 列表
@ -127,11 +130,15 @@ class PermRepo:
字典,键为 PermEntity值为对应的 ID 字典,键为 PermEntity值为对应的 ID
""" """
for entity in entities: # for entity in entities:
await self.conn.execute( # await self.conn.execute(
s("create_entity.sql"), # s("create_entity.sql"),
(entity.platform, entity.entity_type, entity.external_id), # (entity.platform, entity.entity_type, entity.external_id),
) # )
await self.conn.executemany(
s("create_entity.sql"),
[(e.platform, e.entity_type, e.external_id) for e in entities],
)
await self.conn.commit() await self.conn.commit()
val_placeholders = ", ".join(["(?, ?, ?)"] * len(entities)) val_placeholders = ", ".join(["(?, ?, ?)"] * len(entities))
params = [] params = []
@ -178,3 +185,58 @@ class PermRepo:
rows = await cursor.fetchall() rows = await cursor.fetchall()
return {(entity_ids[row[0]], row[1]): bool(row[2]) for row in rows} return {(entity_ids[row[0]], row[1]): bool(row[2]) for row in rows}
async def list_perm_info_batch(
self, entities: list[PermEntity], pager: PagerQuery
) -> PagerResult[tuple[PermEntity, str, bool]]:
"""批量获取某个实体的权限信息
Args:
entities: PermEntity 列表
pager: PagerQuery 对象,即分页要求
Returns:
字典,键是 PermEntity值是权限条目和布尔的元组过滤掉所有空值
"""
entity_to_id = await self.get_entity_id_batch(entities)
id_to_entity = {v: k for k, v in entity_to_id.items()}
ordered_ids = [entity_to_id[e] for e in entities if e in entity_to_id]
placeholders = ", ".join("?" * len(ordered_ids))
order_by_cases = " ".join([f"WHEN ? THEN {i}" for i in range(len(ordered_ids))])
pagecount_sql = f"SELECT COUNT(*) FROM perm_info WHERE entity_id IN ({placeholders}) AND value IS NOT NULL;"
count_cursor = await self.conn.execute(pagecount_sql, tuple(ordered_ids))
total_count = (await count_cursor.fetchone() or (0,))[0]
sql = f"""
SELECT entity_id, config_key, value
FROM perm_info
WHERE entity_id IN ({placeholders})
AND value IS NOT NULL
ORDER BY
(CASE entity_id {order_by_cases} END) ASC,
config_key ASC
LIMIT ?
OFFSET ?;
"""
params = (
tuple(ordered_ids)
+ tuple(ordered_ids)
+ (
pager.page_size,
(pager.page_index - 1) * pager.page_size,
)
)
cursor = await self.conn.execute(sql, params)
rows = await cursor.fetchall()
# return {entity_ids[row[0]]: (row[1], bool(row[2])) for row in rows}
return PagerResult(
data=[(id_to_entity[row[0]], row[1], row[2]) for row in rows],
success=True,
message="",
page_count=math.ceil(total_count / pager.page_size),
query=pager,
)

View File

@ -0,0 +1,212 @@
# 指令介绍
`konaperm` - 用于查看和修改 Bot 内部权限系统记录的管理员指令
## 权限要求
只有拥有 `admin` 权限的主体才能使用本指令。
## 格式
```text
konaperm list <platform> <entity_type> <external_id> [page]
konaperm get <platform> <entity_type> <external_id> <perm>
konaperm set <platform> <entity_type> <external_id> <perm> <val>
```
## 子命令说明
### `list`
列出指定对象及其继承链上的显式权限记录,按分页输出。
参数:
- `platform` 平台名,如 `ob11`、`discord`、`sys`
- `entity_type` 对象类型,如 `user`、`group`、`global`
- `external_id` 平台侧对象 ID全局对象通常写 `global`
- `page` 页码,可省略,默认 `1`
### `get`
查询某个对象对指定权限的最终判断结果,并说明它是从哪一层继承来的。
参数:
- `platform`
- `entity_type`
- `external_id`
- `perm` 权限键,如 `admin`、`plugin.xxx.use`
### `set`
为指定对象写入显式权限。
参数:
- `platform`
- `entity_type`
- `external_id`
- `perm` 权限键
- `val` 设置值
`val` 支持以下写法:
- 允许:`y` `yes` `allow` `true` `t`
- 拒绝:`n` `no` `deny` `false` `f`
- 清除:`null` `none`
其中:
- 允许 表示显式授予该权限
- 拒绝 表示显式禁止该权限
- 清除 表示删除该层的显式设置,重新回退到继承链判断
## 对象格式
本指令操作的对象由三段组成:
```text
<platform>.<entity_type>.<external_id>
```
例如:
- `ob11.user.123456789`
- `ob11.group.987654321`
- `sys.global.global`
## 当前支持的 `PermEntity` 值
以下内容按当前实现整理,便于手工查询和设置权限。
### `sys`
- `sys.global.global`
这是系统总兜底对象。
### `ob11`
- `ob11.global.global`
- `ob11.group.<group_id>`
- `ob11.user.<user_id>`
常见场景:
- 给整个 OneBot V11 平台统一授权:`ob11.global.global`
- 给某个 QQ 群授权:`ob11.group.群号`
- 给某个 QQ 用户授权:`ob11.user.QQ号`
### `discord`
- `discord.global.global`
- `discord.guild.<guild_id>`
- `discord.channel.<channel_id>`
- `discord.user.<user_id>`
常见场景:
- 给整个 Discord 平台统一授权:`discord.global.global`
- 给某个服务器授权:`discord.guild.服务器ID`
- 给某个频道授权:`discord.channel.频道ID`
- 给某个用户授权:`discord.user.用户ID`
### `minecraft`
- `minecraft.global.global`
- `minecraft.server.<server_name>`
- `minecraft.player.<player_uuid_hex>`
常见场景:
- 给整个 Minecraft 平台统一授权:`minecraft.global.global`
- 给某个服务器授权:`minecraft.server.服务器名`
- 给某个玩家授权:`minecraft.player.玩家UUID的hex`
### `console`
- `console.global.global`
- `console.channel.<channel_id>`
- `console.user.<user_id>`
### 快速参考
```text
sys.global.global
ob11.global.global
ob11.group.<group_id>
ob11.user.<user_id>
discord.global.global
discord.guild.<guild_id>
discord.channel.<channel_id>
discord.user.<user_id>
minecraft.global.global
minecraft.server.<server_name>
minecraft.player.<player_uuid_hex>
console.global.global
console.channel.<channel_id>
console.user.<user_id>
```
## 权限继承
权限不是只看当前对象,还会按继承链回退。
例如对 `ob11.user.123456` 查询时,通常会从更具体的对象一路回退到:
1. 当前用户
2. 平台全局对象
3. 系统全局对象
权限键本身也支持逐级回退。比如查询 `plugin.demo.use` 时,可能依次命中:
1. `plugin.demo.use`
2. `plugin.demo`
3. `plugin`
4. `*`
所以 `get` 返回的结果可能来自更宽泛的权限键,或更上层的继承对象。
## 示例
```text
konaperm list ob11 user 123456
```
查看 `ob11.user.123456` 及其继承链上的权限记录第一页。
```text
konaperm get ob11 user 123456 admin
```
查看该用户最终是否拥有 `admin` 权限,以及命中来源。
```text
konaperm set ob11 user 123456 admin allow
```
显式授予该用户 `admin` 权限。
```text
konaperm set ob11 user 123456 admin deny
```
显式拒绝该用户 `admin` 权限。
```text
konaperm set ob11 user 123456 admin none
```
删除该用户这一层对 `admin` 的显式设置,恢复继承判断。
## 注意事项
- 这是系统级管理指令,误操作可能直接影响其他插件的权限控制。
- `list` 只列出显式记录;没有显示出来不代表最终一定无权限,可能是从上层继承。
- `get` 显示的是最终命中的结果,比 `list` 更适合排查“为什么有/没有某个权限”。
- 对 `admin` 或 `*` 这类高影响权限做修改前,建议先确认对象是否写对。

View File

@ -0,0 +1,112 @@
from typing import Annotated
from nonebot.adapters import Event
from nonebot.params import Depends
from nonebot_plugin_alconna import Alconna, Args, Subcommand, UniMessage, on_alconna
from konabot.common.pager import PagerQuery
from konabot.common.permsys import DepPermManager, require_permission
from konabot.common.permsys.entity import PermEntity, get_entity_chain_of_entity
cmd = on_alconna(
Alconna(
"konaperm",
Subcommand(
"list",
Args["platform", str],
Args["entity_type", str],
Args["external_id", str],
Args["page?", int],
),
Subcommand(
"get",
Args["platform", str],
Args["entity_type", str],
Args["external_id", str],
Args["perm", str],
),
Subcommand(
"set",
Args["platform", str],
Args["entity_type", str],
Args["external_id", str],
Args["perm", str],
Args["val", str],
),
),
rule=require_permission("admin"),
)
async def _get_perm_entity_chain(platform: str, entity_type: str, external_id: str):
return get_entity_chain_of_entity(PermEntity(platform, entity_type, external_id))
_DepEntityChain = Annotated[list[PermEntity], Depends(_get_perm_entity_chain)]
def make_formatter(parent: PermEntity):
def _formatter(d: tuple[PermEntity, str, bool]):
permmark = {True: "[✅ ALLOW] ", False: "[❌ DENY] "}[d[2]]
inheritmark = ""
if parent != d[0]:
inheritmark = (
f"[继承自 {d[0].platform}.{d[0].entity_type}.{d[0].external_id}] "
)
return f"{permmark}{inheritmark}{d[1]}"
return _formatter
@cmd.assign("list")
async def list_permission(
pm: DepPermManager,
ec: _DepEntityChain,
event: Event,
page: int = 1,
):
pq = PagerQuery(page, 10)
data = await pm.list_permission(ec, pq)
msg = data.to_unimessage(make_formatter(ec[0]))
await msg.send(event)
@cmd.assign("get")
async def get_permission(
pm: DepPermManager,
ec: _DepEntityChain,
perm: str,
event: Event,
):
data = await pm.check_has_permission_info(ec, perm)
obj_s = f"{ec[0].platform}.{ec[0].entity_type}.{ec[0].external_id}"
if data is None:
await UniMessage.text(f"对象 {obj_s}{perm} 权限记录").send(event)
return
pe, k, p = data
inheritmark = ""
if ec[0] != pe or k != perm:
inheritmark = (
f"继承自 {pe.platform}.{pe.entity_type}.{pe.external_id}{k} 的设置,"
)
await UniMessage.text(f"{inheritmark}对象 {obj_s}{perm} 的权限为 {p}").send(
event
)
@cmd.assign("set")
async def set_permission(
pm: DepPermManager,
ec: _DepEntityChain,
perm: str,
val: str,
event: Event,
):
if any(i == val.lower() for i in ("y", "yes", "allow", "true", "t")):
await pm.update_permission(ec[0], perm, True)
elif any(i == val.lower() for i in ("n", "no", "deny", "false", "f")):
await pm.update_permission(ec[0], perm, False)
elif any(i == val.lower() for i in ("null", "none")):
await pm.update_permission(ec[0], perm, None)
await get_permission(pm, ec, perm, event)