Files
konabot/konabot/plugins/kona_ph/__init__.py

136 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import re
from math import ceil
from loguru import logger
from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
on_alconna)
from nonebot_plugin_apscheduler import scheduler
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.plugins.kona_ph.core.message import (get_daily_report,
get_daily_report_v2,
get_puzzle_description,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE, config,
create_admin_commands,
puzzle_manager)
create_admin_commands()
async def is_play_group(target: DepLongTaskTarget):
if target.is_private_chat:
return True
if target.channel_id in config.plugin_puzzle_playgroup:
return True
return False
cmd_submit = on_message(rule=is_play_group)
@cmd_submit.handle()
async def _(msg: UniMsg, target: DepLongTaskTarget):
txt = msg.extract_plain_text().strip()
if match := re.match(r"^提交(?:答案|题解|[fF]lag)\s*(?P<submission>.+?)\s*$", txt):
submission: str = match.group("submission")
async with puzzle_manager() as manager:
result = manager.submit(target.target_id, submission)
if isinstance(result, str):
await target.send_message(result)
else:
await target.send_message(get_submission_message(
daily_puzzle_info=result.info,
submission=result.submission,
puzzle=result.puzzle,
))
cmd_query = on_alconna(Alconna(
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?)|今日谜?题目?)"
), rule=is_play_group)
@cmd_query.handle()
async def _(target: DepLongTaskTarget):
async with puzzle_manager() as manager:
p = manager.get_today_puzzle()
if p is None:
return await target.send_message("今天无题,改日再来吧!")
await target.send_message(get_puzzle_description(p))
cmd_query_submission = on_alconna(Alconna(
"今日答题情况"
), rule=is_play_group)
@cmd_query_submission.handle()
async def _(target: DepLongTaskTarget):
gid = None
if re.match(r"^\d+$", target.channel_id):
gid = int(target.channel_id)
async with puzzle_manager() as manager:
await target.send_message(get_daily_report_v2(manager, gid))
cmd_history = on_alconna(Alconna(
"历史题目",
Args["page?", int],
Args["index_id?", str],
), rule=is_play_group)
@cmd_history.handle()
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
async with puzzle_manager() as manager:
today = get_today_date()
if index_id:
index_id = index_id.removeprefix("#")
if index_id not in manager.daily_puzzle:
return await target.send_message("没有这道题哦")
puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
msg = get_puzzle_description(
puzzle,
with_answer=(index_id != manager.daily_puzzle_of_date.get(today, "")),
)
return await target.send_message(msg)
msg = UniMessage.text("====== 历史题目清单 ======\n\n")
puzzles = [
(manager.puzzle_data[manager.daily_puzzle[i].raw_id], d)
for d, i in manager.daily_puzzle_of_date.items()
]
puzzles = sorted(puzzles, key=lambda u: u[1], reverse=True)
count_pages = ceil(len(puzzles) / PUZZLE_PAGE_SIZE)
if page <= 0 or page > count_pages:
return await target.send_message(UniMessage.text(
f"页数只有 1 ~ {count_pages} 啦!"
))
puzzles = puzzles[(page - 1) * PUZZLE_PAGE_SIZE: page * PUZZLE_PAGE_SIZE]
for p, d in puzzles:
info = manager.daily_puzzle[manager.daily_puzzle_of_date[d]]
msg = msg.text(
f"- [#{p.index_id}: {len(info.success_users)}/{len(info.tried_users)}]"
f" {p.title} ({d})"
)
msg = msg.text("\n")
msg = msg.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
await target.send_message(msg)
@scheduler.scheduled_job("cron", hour="8")
async def _():
async with puzzle_manager() as manager:
yesterday = get_today_date() - datetime.timedelta(days=1)
msg2 = get_daily_report(manager, yesterday)
if msg2 is not None:
await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
puzzle = manager.get_today_puzzle()
if puzzle is not None:
logger.info(f"找到了题目 {puzzle.raw_id},发送")
await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(puzzle))
else:
logger.info("自动任务:没有找到题目,跳过")