102 lines
3.8 KiB
Python
102 lines
3.8 KiB
Python
from math import ceil
|
||
from loguru import logger
|
||
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
|
||
from konabot.common.nb.qq_broadcast import qq_broadcast
|
||
from konabot.plugins.kona_ph.core.storage import get_today_date
|
||
from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager
|
||
from konabot.common.longtask import DepLongTaskTarget
|
||
|
||
from nonebot_plugin_apscheduler import scheduler
|
||
|
||
|
||
# Invoked once at import time. Presumably registers the admin-side puzzle
# commands defined in konabot.plugins.kona_ph.manager — confirm there.
create_admin_commands()
|
||
|
||
|
||
async def is_play_group(target: DepLongTaskTarget):
    """Matcher rule: decide whether puzzle commands are accepted from *target*.

    Returns ``True`` when ``target.channel_id`` is listed in
    ``config.plugin_puzzle_playgroup``, or when ``target.target_id`` is
    contained in ``target.channel_id``; otherwise ``False``.
    """
    channel = target.channel_id
    # NOTE(review): the second test is a containment (`in`) check, not an
    # equality check — presumably meant to admit private/direct chats; confirm
    # against how DepLongTaskTarget fills channel_id.
    return (
        channel in config.plugin_puzzle_playgroup
        or target.target_id in channel
    )
|
||
|
||
|
||
# Matcher for the "提交答案 <flag>" command; gated by the `is_play_group` rule.
cmd_submit = on_alconna(
    Alconna("提交答案", Args["flag", str]),
    rule=is_play_group,
)
|
||
|
||
@cmd_submit.handle()
async def _(flag: str, target: DepLongTaskTarget):
    """Submit *flag* for the sender and reply with the manager's verdict.

    The submission is recorded under ``target.target_id``; whatever
    ``manager.submit`` returns is rendered via ``get_unimessage()`` and sent
    back to the same target.
    """
    async with puzzle_manager() as manager:
        outcome = manager.submit(target.target_id, flag)
        reply = outcome.get_unimessage()
        await target.send_message(reply)
|
||
|
||
|
||
# Matcher for natural-language "what is today's puzzle?" questions (the
# command is given as a `re:`-prefixed pattern); gated by `is_play_group`.
cmd_query = on_alconna(
    Alconna(r"re:(?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?"),
    rule=is_play_group,
)
|
||
|
||
@cmd_query.handle()
async def _(target: DepLongTaskTarget):
    """Reply with today's puzzle, or with a notice when there is none."""
    async with puzzle_manager() as manager:
        today_puzzle = manager.get_today_puzzle()
        if today_puzzle is not None:
            await target.send_message(today_puzzle.get_unimessage())
            return
        # No puzzle is available for today.
        return await target.send_message("今天无题,改日再来吧!")
|
||
|
||
|
||
# Matcher for "历史题目 [page] [index_id]"; the handler supplies defaults for
# both arguments. Gated by the `is_play_group` rule.
cmd_history = on_alconna(
    Alconna(
        "历史题目",
        Args["page?", int],
        Args["index_id?", str],
    ),
    rule=is_play_group,
)
|
||
|
||
@cmd_history.handle()
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
    """Show one past puzzle, or a paginated list of all dated puzzles.

    Args:
        target: Where the reply is sent.
        index_id: Optional puzzle id, with or without a leading ``#``. When
            given, that single puzzle is shown; its answer is appended unless
            it is the puzzle assigned to today.
        page: 1-based page of the history list; used only when ``index_id``
            is empty.
    """
    async with puzzle_manager() as manager:
        today = get_today_date()

        if index_id:
            index_id = index_id.removeprefix("#")
            # Check membership before any lookup. A bare "#" becomes "" here,
            # which used to equal the `.get(today, "")` default on days
            # without a puzzle and then crash on `daily_puzzle[""]`.
            if index_id not in manager.daily_puzzle:
                return await target.send_message("没有这道题哦")

            puzzle = manager.puzzle_data[manager.daily_puzzle[index_id].raw_id]
            msg = puzzle.get_unimessage()
            # Today's puzzle is shown without its answer; older ones get it.
            if index_id != manager.daily_puzzle_of_date.get(today, ""):
                msg = msg.text(f"\n\n------\n\n题解:{puzzle.flag}")
            return await target.send_message(msg)

        # (date, daily puzzle id) pairs, newest first.
        dated_ids = sorted(
            manager.daily_puzzle_of_date.items(),
            key=lambda pair: pair[0],
            reverse=True,
        )
        count_pages = ceil(len(dated_ids) / PUZZLE_PAGE_SIZE)
        if count_pages == 0:
            # Without this guard the range message below reads "1 ~ 0".
            return await target.send_message("还没有历史题目哦")
        if page <= 0 or page > count_pages:
            return await target.send_message(UniMessage.text(
                f"页数只有 1 ~ {count_pages} 啦!"
            ))

        msg = UniMessage.text("====== 历史题目清单 ======\n\n")
        start = (page - 1) * PUZZLE_PAGE_SIZE
        # Resolve puzzle records only for the entries on the requested page.
        for d, daily_id in dated_ids[start:start + PUZZLE_PAGE_SIZE]:
            info = manager.daily_puzzle[daily_id]
            p = manager.puzzle_data[info.raw_id]
            msg = msg.text(
                f"- [#{p.index_id}: {len(info.success_users)}/{len(info.tried_users)}]"
                f" {p.title} ({d})"
            )
            msg = msg.text("\n")
        msg = msg.text(f"\n==== 第 {page} 页,共 {count_pages} 页 ====")
        await target.send_message(msg)
|
||
|
||
|
||
@scheduler.scheduled_job("cron", hour="8")
async def _():
    """Scheduled job (cron, hour 8): broadcast today's puzzle, if any.

    The puzzle is sent to every group in ``config.plugin_puzzle_playgroup``
    via ``qq_broadcast``; when there is no puzzle the run is only logged.
    """
    async with puzzle_manager() as manager:
        puzzle = manager.get_today_puzzle()
        if puzzle is None:
            logger.info("自动任务:没有找到题目,跳过")
            return
        logger.info(f"找到了题目 {puzzle.raw_id},发送")
        await qq_broadcast(
            config.plugin_puzzle_playgroup,
            puzzle.get_unimessage(),
        )
|
||
|