import datetime
import re
from math import ceil

from loguru import logger
from nonebot import on_message
from nonebot_plugin_alconna import (
    Alconna,
    Args,
    UniMessage,
    UniMsg,
    on_alconna,
)
from nonebot_plugin_apscheduler import scheduler

from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.plugins.kona_ph.core.message import (
    get_daily_report,
    get_daily_report_v2,
    get_puzzle_description,
    get_submission_message,
)
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import (
    PUZZLE_PAGE_SIZE,
    config,
    create_admin_commands,
    puzzle_manager,
)

# Called at import time, before the player-facing commands below are declared.
create_admin_commands()

# Submission messages: the keyword "提交" followed by one of 答案 / 题解 /
# flag / Flag, then the submitted text (surrounding whitespace is dropped).
# The named group must be spelled "(?P<submission>...)": the handler reads
# it back with match.group("submission").
_SUBMIT_PATTERN = re.compile(
    r"^提交(?:答案|题解|[fF]lag)\s*(?P<submission>.+?)\s*$"
)


async def is_play_group(target: DepLongTaskTarget) -> bool:
    """Rule: accept private chats and channels listed in the play-group config.

    Returns:
        True when ``target.is_private_chat`` is truthy or when
        ``target.channel_id`` is in ``config.plugin_puzzle_playgroup``;
        False otherwise.
    """
    return (
        bool(target.is_private_chat)
        or target.channel_id in config.plugin_puzzle_playgroup
    )


cmd_submit = on_message(rule=is_play_group)


@cmd_submit.handle()
async def _(msg: UniMsg, target: DepLongTaskTarget):
    """Handle an answer submission sent as a plain-text message.

    Messages whose plain text does not match the submission pattern are
    ignored. Otherwise the captured text is passed to ``manager.submit``
    together with ``target.target_id`` and the outcome is sent back.
    """
    txt = msg.extract_plain_text().strip()
    match = _SUBMIT_PATTERN.match(txt)
    if match is None:
        return

    submission: str = match.group("submission")
    async with puzzle_manager() as manager:
        result = manager.submit(target.target_id, submission)
        if isinstance(result, str):
            # A plain string result is forwarded verbatim
            # (presumably a rejection/notice text -- confirm in manager.submit).
            await target.send_message(result)
        else:
            await target.send_message(get_submission_message(
                daily_puzzle_info=result.info,
                submission=result.submission,
                puzzle=result.puzzle,
            ))


cmd_query = on_alconna(Alconna(
    r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?)|今日谜?题目?)"
), rule=is_play_group)


@cmd_query.handle()
async def _(target: DepLongTaskTarget):
    """Reply with the description of today's puzzle, or a notice if none."""
    async with puzzle_manager() as manager:
        p = manager.get_today_puzzle()
        if p is None:
            return await target.send_message("今天无题,改日再来吧!")
        await target.send_message(get_puzzle_description(p))


cmd_query_submission = on_alconna(Alconna(
    "今日答题情况"
), rule=is_play_group)


@cmd_query_submission.handle()
async def _(target: DepLongTaskTarget):
    """Reply with the report built by ``get_daily_report_v2``.

    The channel id is passed along as an integer only when it consists
    solely of digits; otherwise ``None`` is passed.
    """
    gid = None
    if re.match(r"^\d+$", target.channel_id):
        gid = int(target.channel_id)
    async with puzzle_manager() as manager:
        await target.send_message(get_daily_report_v2(manager, gid))


cmd_history = on_alconna(Alconna(
    "历史题目",
    Args["page?", int],
    Args["index_id?", str],
), rule=is_play_group)


@cmd_history.handle()
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
    """Show one past puzzle (by index id) or a paginated list of all of them.

    Args:
        target: Where the reply is sent.
        index_id: Optional puzzle index; a leading ``#`` is stripped. When
            given, that puzzle's description is sent and ``page`` is ignored.
        page: 1-based page of the history list, ``PUZZLE_PAGE_SIZE`` entries
            per page, ordered by date descending.
    """
    async with puzzle_manager() as manager:
        today = get_today_date()

        if index_id:
            index_id = index_id.removeprefix("#")
            if index_id not in manager.daily_puzzle:
                return await target.send_message("没有这道题哦")
            puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
            # with_answer is False only for the puzzle scheduled for today.
            msg = get_puzzle_description(
                puzzle,
                with_answer=(
                    index_id != manager.daily_puzzle_of_date.get(today, "")
                ),
            )
            return await target.send_message(msg)

        msg = UniMessage.text("====== 历史题目清单 ======\n\n")
        puzzles = [
            (manager.puzzle_data[manager.daily_puzzle[i].raw_id], d)
            for d, i in manager.daily_puzzle_of_date.items()
        ]
        puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True)

        # Always expose at least one page: with an empty history the old
        # computation gave 0 pages and rejected the default page=1 with the
        # self-contradictory range "1 ~ 0".
        count_pages = max(1, ceil(len(puzzles) / PUZZLE_PAGE_SIZE))
        if not 1 <= page <= count_pages:
            return await target.send_message(UniMessage.text(
                f"页数只有 1 ~ {count_pages} 啦!"
            ))

        start = (page - 1) * PUZZLE_PAGE_SIZE
        puzzles = puzzles[start: start + PUZZLE_PAGE_SIZE]
        for p, d in puzzles:
            info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]]
            # Entry format: [#index: successful users / users who tried] title (date)
            msg = msg.text(
                f"- [#{p.index_id}: {len(info.success_users)}/{len(info.tried_users)}]"
                f" {p.title} ({d})"
            )
            msg = msg.text("\n")
        msg = msg.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
        await target.send_message(msg)


@scheduler.scheduled_job("cron", hour="8")
async def _():
    """Cron job (hour 8): broadcast yesterday's report, then today's puzzle.

    Both messages go to every id in ``config.plugin_puzzle_playgroup``.
    The report is skipped when ``get_daily_report`` returns ``None``; the
    puzzle broadcast is skipped (and logged) when there is no puzzle today.
    """
    async with puzzle_manager() as manager:
        yesterday = get_today_date() - datetime.timedelta(days=1)
        msg2 = get_daily_report(manager, yesterday)
        if msg2 is not None:
            await qq_broadcast(config.plugin_puzzle_playgroup, msg2)

        puzzle = manager.get_today_puzzle()
        if puzzle is not None:
            logger.info(f"找到了题目 {puzzle.raw_id},发送")
            await qq_broadcast(
                config.plugin_puzzle_playgroup,
                get_puzzle_description(puzzle),
            )
        else:
            logger.info("自动任务:没有找到题目,跳过")