forked from mttu-developers/konabot
63 lines
1.8 KiB
Python
63 lines
1.8 KiB
Python
from typing import Iterable
|
|
import nonebot
|
|
from nonebot.adapters import Event
|
|
|
|
from konabot.common.database import DatabaseManager
|
|
from konabot.common.path import DATA_PATH
|
|
from konabot.common.permsys.entity import PermEntity, get_entity_chain
|
|
from konabot.common.permsys.migrates import execute_migration
|
|
from konabot.common.permsys.repo import PermRepo
|
|
|
|
|
|
driver = nonebot.get_driver()
|
|
|
|
db = DatabaseManager(DATA_PATH / "perm.sqlite3")
|
|
|
|
|
|
class PermManager:
|
|
def __init__(self, db: DatabaseManager) -> None:
|
|
self.db = db
|
|
|
|
async def check_has_permission(
|
|
self, entities: Event | PermEntity | list[PermEntity], key: str
|
|
) -> bool:
|
|
if isinstance(entities, Event):
|
|
entities = await get_entity_chain(entities)
|
|
if isinstance(entities, PermEntity):
|
|
entities = [entities]
|
|
|
|
key_split = key.split(".")
|
|
keys = [".".join(key_split[: i + 1]) for i in range(len(key_split))][::-1]
|
|
|
|
async with self.db.get_conn() as conn:
|
|
repo = PermRepo(conn)
|
|
# for entity in entities:
|
|
# for k in keys:
|
|
# perm = await repo.get_perm_info(entity, k)
|
|
# if perm is not None:
|
|
# return perm
|
|
data = await repo.get_perm_info_batch(entities, keys)
|
|
for entity in entities:
|
|
for k in keys:
|
|
p = data.get((entity, k))
|
|
if p is not None:
|
|
return p
|
|
return False
|
|
|
|
|
|
def perm_manager(_db: DatabaseManager | None = None) -> PermManager:
|
|
if _db is None:
|
|
_db = db
|
|
return PermManager(_db)
|
|
|
|
|
|
@driver.on_startup
|
|
async def _():
|
|
async with db.get_conn() as conn:
|
|
await execute_migration(conn)
|
|
|
|
|
|
@driver.on_shutdown
|
|
async def _():
|
|
await db.close_all_connections()
|