Files
konabot/konabot/common/subscribe/service.py
passthem 392c699b33
All checks were successful
continuous-integration/drone/push Build is passing
移动 poster 模块到 common
2026-03-09 14:40:27 +08:00

64 lines
2.2 KiB
Python

from contextlib import asynccontextmanager
from typing import Annotated, Any
from nonebot.params import Depends
from nonebot_plugin_alconna import UniMessage
from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult
from .subscribe_info import POSTER_INFO_DATA
from .repo_local_data import local_poster
from .repository import IPosterRepo
class PosterService:
def __init__(self, repo: IPosterRepo) -> None:
self.repo = repo
def parse_channel_id(self, channel: str):
for cid, cinfo in POSTER_INFO_DATA.items():
if channel in cinfo.aliases:
return cid
return channel
async def subscribe(self, channel: str, target: LongTaskTarget) -> bool:
channel = self.parse_channel_id(channel)
return await self.repo.add_channel_target(channel, target)
async def unsubscribe(self, channel: str, target: LongTaskTarget) -> bool:
channel = self.parse_channel_id(channel)
return await self.repo.remove_channel_target(channel, target)
async def broadcast(
self, channel: str, message: UniMessage[Any] | str
) -> list[LongTaskTarget]:
channel = self.parse_channel_id(channel)
targets = await self.repo.get_channel_targets(channel)
for target in targets:
# 因为是订阅消息,就不要 At 对方了
await target.send_message(message, at=False)
return targets
async def get_channels(
self, target: LongTaskTarget, pager: PagerQuery
) -> PagerResult[str]:
return await self.repo.get_subscribed_channels(target, pager)
async def fix_data(self):
for cid, cinfo in POSTER_INFO_DATA.items():
for alias in cinfo.aliases:
await self.repo.merge_channel(alias, cid)
@asynccontextmanager
async def dep_poster_service():
async with local_poster() as repo:
yield PosterService(repo)
async def broadcast(channel: str, message: UniMessage[Any] | str):
async with dep_poster_service() as service:
return await service.broadcast(channel, message)
DepPosterService = Annotated[PosterService, Depends(dep_poster_service)]