diff --git a/konabot/common/longtask.py b/konabot/common/longtask.py index 5f37391..72e42b7 100644 --- a/konabot/common/longtask.py +++ b/konabot/common/longtask.py @@ -51,6 +51,10 @@ class LongTaskTarget(BaseModel): target_id: str "沟通对象的 ID" + @property + def is_private_chat(self): + return self.channel_id.startswith(QQ_PRIVATE_CHAT_CHANNEL_PREFIX) + async def send_message(self, msg: UniMessage | str, at: bool = True) -> bool: try: bot = nonebot.get_bot(self.self_id) diff --git a/konabot/plugins/kona_ph/__init__.py b/konabot/plugins/kona_ph/__init__.py index f0e9f78..f3e4a9f 100644 --- a/konabot/plugins/kona_ph/__init__.py +++ b/konabot/plugins/kona_ph/__init__.py @@ -13,9 +13,9 @@ create_admin_commands() async def is_play_group(target: DepLongTaskTarget): - if target.channel_id in config.plugin_puzzle_playgroup: + if target.is_private_chat: return True - if target.target_id in target.channel_id: + if target.channel_id in config.plugin_puzzle_playgroup: return True return False @@ -33,7 +33,7 @@ async def _(flag: str, target: DepLongTaskTarget): cmd_query = on_alconna(Alconna( - r"re:(?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?" + r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?)|今日谜?题目?)" ), rule=is_play_group) @cmd_query.handle() diff --git a/konabot/plugins/kona_ph/core/storage.py b/konabot/plugins/kona_ph/core/storage.py index e4d23a3..467e546 100644 --- a/konabot/plugins/kona_ph/core/storage.py +++ b/konabot/plugins/kona_ph/core/storage.py @@ -37,9 +37,6 @@ class Puzzle(BaseModel): flag: str ready: bool = False - published: bool = False - pinned: bool = False - created_at: datetime.datetime = Field(default_factory=datetime.datetime.now) def get_image_path(self) -> Path: @@ -111,68 +108,81 @@ class PuzzleManager(BaseModel): daily_puzzle_of_date: dict[datetime.date, str] = {} puzzle_pinned: str = "" - unpublished_puzzles: set[str] = set() - unready_puzzles: set[str] = set() - published_puzzles: set[str] = set() index_id_counter: int = 1 submissions: dict[str, dict[str, list[PuzzleSubmission]]] = {} - last_pubish_date: datetime.date = Field( - default_factory=lambda: get_today_date() - datetime.timedelta(days=1) - ) last_checked_date: datetime.date = Field( default_factory=lambda: get_today_date() - datetime.timedelta(days=1) ) + @property + def last_publish_date(self): + return max(self.daily_puzzle_of_date.keys()) + + @property + def unpublished_puzzles(self): + return set(( + p.raw_id for p in self.puzzle_data.values() + if not self.is_puzzle_published(p.raw_id) and p.ready + )) + + @property + def unready_puzzles(self): + return set(( + p.raw_id for p in self.puzzle_data.values() + if not self.is_puzzle_published(p.raw_id) and not p.ready + )) + + @property + def published_puzzles(self): + return set(( + p.raw_id for p in self.puzzle_data.values() + if self.is_puzzle_published(p.raw_id) + )) + + def is_puzzle_published(self, raw_id: str): + return raw_id in [i.raw_id for i in self.daily_puzzle.values()] + def publish_puzzle(self, raw_id: str): assert raw_id in self.puzzle_data - self.unpublished_puzzles -= set(raw_id) - self.unready_puzzles -= set(raw_id) + today = get_today_date() + p = self.puzzle_data[raw_id] p.index_id = str(self.index_id_counter) p.ready = True - p.published = True - p.pinned = False self.puzzle_pinned = "" - self.last_pubish_date = get_today_date() - self.last_checked_date = self.last_pubish_date + self.last_checked_date = today self.daily_puzzle[p.index_id] = DailyPuzzleInfo( raw_id=raw_id, - time=self.last_pubish_date, + time=today, ) - self.daily_puzzle_of_date[self.last_pubish_date] = p.index_id - self.published_puzzles.add(raw_id) + self.daily_puzzle_of_date[today] = p.index_id self.index_id_counter += 1 - def admin_mark_ready(self, raw_id: str, ready: bool = True): - if raw_id not in self.puzzle_data: - return - if ready: - self.unready_puzzles -= set(raw_id) - if raw_id not in self.published_puzzles: - self.unpublished_puzzles.add(raw_id) - p = self.puzzle_data[raw_id] - p.ready = True - p.published = raw_id in self.published_puzzles - else: - self.unready_puzzles.add(raw_id) - self.unpublished_puzzles -= set(raw_id) - p = self.puzzle_data[raw_id] - p.ready = False - p.published = False - # if p.raw_id == self.puzzle_pinned: - # self.puzzle_pinned = "" + def fix(self): + # 尝试修复今日的数据 + for p in self.puzzle_data.values(): + if self.is_puzzle_published(p.raw_id): + p.ready = True + + if self.puzzle_pinned not in self.unpublished_puzzles: + self.puzzle_pinned = "" + + # 撤回重复发布的题 + already_published: set[str] = set() + for date in list(self.daily_puzzle_of_date.keys()): + index_id = self.daily_puzzle_of_date[date] + info = self.daily_puzzle[index_id] + if info.raw_id in already_published: + del self.daily_puzzle[index_id] + del self.daily_puzzle_of_date[date] + else: + already_published.add(info.raw_id) def admin_pin_puzzle(self, raw_id: str): - if self.puzzle_pinned: - p = self.puzzle_data.get(self.puzzle_pinned) - if p is not None: - p.pinned = False if raw_id in self.puzzle_data: - p = self.puzzle_data[raw_id] - p.pinned = True self.puzzle_pinned = raw_id else: self.puzzle_pinned = "" @@ -252,9 +262,7 @@ class PuzzleManager(BaseModel): author_id=user, flag="konaph{this_is_a_flag}", ready=False, - published=False, ) - self.unready_puzzles.add(p.raw_id) self.puzzle_data[p.raw_id] = p return p diff --git a/konabot/plugins/kona_ph/manager.py b/konabot/plugins/kona_ph/manager.py index 9de32be..a6ba642 100644 --- a/konabot/plugins/kona_ph/manager.py +++ b/konabot/plugins/kona_ph/manager.py @@ -8,7 +8,7 @@ from pydantic import BaseModel from konabot.common.longtask import DepLongTaskTarget from konabot.common.nb.extract_image import download_image_bytes from konabot.common.nb.qq_broadcast import qq_broadcast -from konabot.plugins.kona_ph.core.storage import Puzzle, get_today_date, puzzle_manager +from konabot.plugins.kona_ph.core.storage import Puzzle, PuzzleManager, get_today_date, puzzle_manager PUZZLE_PAGE_SIZE = 10 @@ -30,12 +30,12 @@ def is_puzzle_admin(target: DepLongTaskTarget): return target.target_id in config.plugin_puzzle_admin -def get_puzzle_info_message(puzzle: Puzzle) -> UniMessage[Any]: - status = "✅ 已准备,待发布" if puzzle.ready and not puzzle.published else \ - (f"🟢 已发布: #{puzzle.index_id}" if puzzle.published else "⚙️ 未准备") +def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]: + status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \ + (f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备") status_suffix = "" - if puzzle.pinned: + if puzzle.raw_id == manager.puzzle_pinned: status_suffix += " | 📌 已被管理员置顶" msg = UniMessage.text( @@ -130,7 +130,7 @@ def create_admin_commands(): return await target.send_message(UniMessage.text( "题目早就准备好啦!" )) - manager.admin_mark_ready(raw_id, True) + p.ready = True await target.send_message(UniMessage.text( f"谜题「{p.title}」已经准备就绪!" )) @@ -151,12 +151,12 @@ def create_admin_commands(): return await target.send_message(UniMessage.text( f"谜题「{p.title}」已经是未取消状态了!" )) - if p.published: + if manager.is_puzzle_published(p.raw_id): return await target.send_message(UniMessage.text( "已发布的谜题不能取消准备状态!" )) - manager.admin_mark_ready(raw_id, False) + p.ready = False await target.send_message(UniMessage.text( f"谜题「{p.title}」已经取消准备!" )) @@ -174,7 +174,7 @@ def create_admin_commands(): "这不是你的题,你没有权限查看详细信息!" )) - await target.send_message(get_puzzle_info_message(p)) + await target.send_message(get_puzzle_info_message(manager, p)) @cmd_admin.assign("my") async def _(target: DepLongTaskTarget, page: int = 1): @@ -193,9 +193,9 @@ def create_admin_commands(): message = UniMessage.text("==== 我的谜题 ====\n\n") for p in puzzles: message = message.text("- ") - if p.pinned: + if manager.puzzle_pinned == p.raw_id: message = message.text("[📌]") - if p.published: + if manager.is_puzzle_published(p.raw_id): message = message.text(f"[#{p.index_id}] ") elif p.ready: message = message.text("[✅] ") @@ -224,9 +224,9 @@ def create_admin_commands(): message = UniMessage.text("==== 所有谜题 ====\n\n") for p in puzzles: message = message.text("- ") - if p.pinned: + if p.raw_id == manager.puzzle_pinned: message = message.text("[📌]") - if p.published: + if manager.is_puzzle_published(p.raw_id): message = message.text(f"[#{p.index_id}] ") elif p.ready: message = message.text("[✅] ") @@ -307,7 +307,7 @@ def create_admin_commands(): elif remove_image.available: p.remove_image() - info2 = get_puzzle_info_message(p) + info2 = get_puzzle_info_message(manager, p) return await target.send_message("修改好啦!看看效果:\n\n" + info2)