Compare commits

..

3 Commits

Author SHA1 Message Date
0acffea86d 添加排行榜,优化 UX
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 12:45:29 +08:00
3e395f8a35 更少的量,更解耦的数据,更健壮的系统
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 11:56:03 +08:00
312e203bbe 忘记把这个答题情况通知加上了
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-26 04:07:26 +08:00
6 changed files with 200 additions and 64 deletions

View File

@ -51,6 +51,10 @@ class LongTaskTarget(BaseModel):
target_id: str
"沟通对象的 ID"
@property
def is_private_chat(self):
return self.channel_id.startswith(QQ_PRIVATE_CHAT_CHANNEL_PREFIX)
async def send_message(self, msg: UniMessage | str, at: bool = True) -> bool:
try:
bot = nonebot.get_bot(self.self_id)

View File

@ -0,0 +1,54 @@
import re
import nonebot
from nonebot.adapters.onebot.v11 import Bot as OBBot
class UsernameManager:
grouped_data: dict[int, dict[int, str]]
individual_data: dict[int, str]
def __init__(self) -> None:
self.grouped_data = {}
self.individual_data = {}
async def update(self):
for bot in nonebot.get_bots().values():
if isinstance(bot, OBBot):
for user in await bot.get_friend_list():
uid = user["user_id"]
nickname = user["nickname"]
self.individual_data[uid] = nickname
for group in await bot.get_group_list():
gid = group["group_id"]
for member in await bot.get_group_member_list(group_id=gid):
uid = member["user_id"]
card = member.get("card", "")
nickname = member.get("nickname", "")
if card:
self.grouped_data.setdefault(gid, {})[uid] = card
if nickname:
self.individual_data[uid] = nickname
def get(self, qqid: int, groupid: int | None = None) -> str:
if groupid is not None and groupid in self.grouped_data:
n = self.grouped_data[groupid].get(qqid)
if n is not None:
return n
if qqid in self.individual_data:
return self.individual_data[qqid]
return str(qqid)
manager = UsernameManager()
def get_username(qqid: int | str, group: int | str | None = None):
if isinstance(group, str):
group = None if not re.match(r"^\d+$", group) else int(group)
if isinstance(qqid, str):
if re.match(r"^\d+$", qqid):
qqid = int(qqid)
else:
return qqid
return manager.get(qqid, group)

View File

@ -1,7 +1,10 @@
from functools import reduce
from math import ceil
import re
from loguru import logger
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager
from konabot.common.longtask import DepLongTaskTarget
@ -13,9 +16,9 @@ create_admin_commands()
async def is_play_group(target: DepLongTaskTarget):
if target.channel_id in config.plugin_puzzle_playgroup:
if target.is_private_chat:
return True
if target.target_id in target.channel_id:
if target.channel_id in config.plugin_puzzle_playgroup:
return True
return False
@ -33,7 +36,7 @@ async def _(flag: str, target: DepLongTaskTarget):
cmd_query = on_alconna(Alconna(
r"re:(?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?"
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?)|今日谜?题目?)"
), rule=is_play_group)
@cmd_query.handle()
@ -45,6 +48,52 @@ async def _(target: DepLongTaskTarget):
await target.send_message(p.get_unimessage())
cmd_query_submission = on_alconna(Alconna(
"今日答题情况"
), rule=is_play_group)
@cmd_query_submission.handle()
async def _(target: DepLongTaskTarget):
async with puzzle_manager() as manager:
p = manager.get_today_puzzle()
if p is None:
return await target.send_message("今天无题")
msg = UniMessage.text("==== 今日答题情况 ====\n\n")
subcount = len(reduce(
lambda x, y: x + y,
manager.submissions.get(p.raw_id, {}).values(),
[],
))
info = manager.daily_puzzle[p.index_id]
msg = msg.text(
f"总体情况:答对 {len(info.success_users)} / "
f"参与 {len(info.tried_users)} / "
f"提交 {subcount}\n"
)
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
gid = None
if re.match(r"^\d+$", target.channel_id):
gid = int(target.channel_id)
for u, d in success_users:
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
t = d.strftime("%H:%M")
tries = len(manager.submissions[p.raw_id][u])
msg = msg.text(f"\n- {uname} [Solved at {t} in {tries} times]")
for u in info.tried_users - set(info.success_users.keys()):
uname = u
if re.match(r"^\d+$", u):
uname = get_username(int(u), gid)
tries = len(manager.submissions[p.raw_id][u])
msg = msg.text(f"\n- {uname} [Unsolved in {tries} times]")
await target.send_message(msg)
cmd_history = on_alconna(Alconna(
"历史题目",
Args["page?", int],
@ -92,6 +141,10 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
@scheduler.scheduled_job("cron", hour="8")
async def _():
async with puzzle_manager() as manager:
msg2 = manager.get_report_yesterday()
if msg2 is not None:
await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
puzzle = manager.get_today_puzzle()
if puzzle is not None:
logger.info(f"找到了题目 {puzzle.raw_id},发送")
@ -99,3 +152,4 @@ async def _():
else:
logger.info("自动任务:没有找到题目,跳过")

View File

@ -12,6 +12,7 @@ from nonebot_plugin_alconna import UniMessage
from pydantic import BaseModel, Field, ValidationError
from konabot.common.path import DATA_PATH
from konabot.common.username import get_username
KONAPH_BASE = DATA_PATH / "KonaPH"
@ -37,9 +38,6 @@ class Puzzle(BaseModel):
flag: str
ready: bool = False
published: bool = False
pinned: bool = False
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
def get_image_path(self) -> Path:
@ -52,7 +50,7 @@ class Puzzle(BaseModel):
if self.img_name:
result = result.text("\n\n").image(raw=self.get_image_path().read_bytes())
result = result.text("\n\n出题者:").at(self.author_id)
result = result.text(f"\n\n出题者:{get_username(self.author_id)}")
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
return result
@ -111,68 +109,81 @@ class PuzzleManager(BaseModel):
daily_puzzle_of_date: dict[datetime.date, str] = {}
puzzle_pinned: str = ""
unpublished_puzzles: set[str] = set()
unready_puzzles: set[str] = set()
published_puzzles: set[str] = set()
index_id_counter: int = 1
submissions: dict[str, dict[str, list[PuzzleSubmission]]] = {}
last_pubish_date: datetime.date = Field(
default_factory=lambda: get_today_date() - datetime.timedelta(days=1)
)
last_checked_date: datetime.date = Field(
default_factory=lambda: get_today_date() - datetime.timedelta(days=1)
)
@property
def last_publish_date(self):
return max(self.daily_puzzle_of_date.keys())
@property
def unpublished_puzzles(self):
return set((
p.raw_id for p in self.puzzle_data.values()
if not self.is_puzzle_published(p.raw_id) and p.ready
))
@property
def unready_puzzles(self):
return set((
p.raw_id for p in self.puzzle_data.values()
if not self.is_puzzle_published(p.raw_id) and not p.ready
))
@property
def published_puzzles(self):
return set((
p.raw_id for p in self.puzzle_data.values()
if self.is_puzzle_published(p.raw_id)
))
def is_puzzle_published(self, raw_id: str):
return raw_id in [i.raw_id for i in self.daily_puzzle.values()]
def publish_puzzle(self, raw_id: str):
assert raw_id in self.puzzle_data
self.unpublished_puzzles -= set(raw_id)
self.unready_puzzles -= set(raw_id)
today = get_today_date()
p = self.puzzle_data[raw_id]
p.index_id = str(self.index_id_counter)
p.ready = True
p.published = True
p.pinned = False
self.puzzle_pinned = ""
self.last_pubish_date = get_today_date()
self.last_checked_date = self.last_pubish_date
self.last_checked_date = today
self.daily_puzzle[p.index_id] = DailyPuzzleInfo(
raw_id=raw_id,
time=self.last_pubish_date,
time=today,
)
self.daily_puzzle_of_date[self.last_pubish_date] = p.index_id
self.published_puzzles.add(raw_id)
self.daily_puzzle_of_date[today] = p.index_id
self.index_id_counter += 1
def admin_mark_ready(self, raw_id: str, ready: bool = True):
if raw_id not in self.puzzle_data:
return
if ready:
self.unready_puzzles -= set(raw_id)
if raw_id not in self.published_puzzles:
self.unpublished_puzzles.add(raw_id)
p = self.puzzle_data[raw_id]
p.ready = True
p.published = raw_id in self.published_puzzles
else:
self.unready_puzzles.add(raw_id)
self.unpublished_puzzles -= set(raw_id)
p = self.puzzle_data[raw_id]
p.ready = False
p.published = False
# if p.raw_id == self.puzzle_pinned:
# self.puzzle_pinned = ""
def fix(self):
# 尝试修复今日的数据
for p in self.puzzle_data.values():
if self.is_puzzle_published(p.raw_id):
p.ready = True
if self.puzzle_pinned not in self.unpublished_puzzles:
self.puzzle_pinned = ""
# 撤回重复发布的题
already_published: set[str] = set()
for date in list(self.daily_puzzle_of_date.keys()):
index_id = self.daily_puzzle_of_date[date]
info = self.daily_puzzle[index_id]
if info.raw_id in already_published:
del self.daily_puzzle[index_id]
del self.daily_puzzle_of_date[date]
else:
already_published.add(info.raw_id)
def admin_pin_puzzle(self, raw_id: str):
if self.puzzle_pinned:
p = self.puzzle_data.get(self.puzzle_pinned)
if p is not None:
p.pinned = False
if raw_id in self.puzzle_data:
p = self.puzzle_data[raw_id]
p.pinned = True
self.puzzle_pinned = raw_id
else:
self.puzzle_pinned = ""
@ -252,9 +263,7 @@ class PuzzleManager(BaseModel):
author_id=user,
flag="konaph{this_is_a_flag}",
ready=False,
published=False,
)
self.unready_puzzles.add(p.raw_id)
self.puzzle_data[p.raw_id] = p
return p
@ -286,9 +295,9 @@ class PuzzleManager(BaseModel):
us = us[:5]
for u, _ in us:
m = self.submissions[puzzle.raw_id][u][-1]
message = message.text("- ").at(u).text(f"{m.time.strftime('%H:%M')}")
message = message.text(f"- {get_username(u)}{m.time.strftime('%H:%M')}")
message = message.text("\n\n出题者:").at(puzzle.author_id)
message = message.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
return message

View File

@ -8,7 +8,7 @@ from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.extract_image import download_image_bytes
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.plugins.kona_ph.core.storage import Puzzle, get_today_date, puzzle_manager
from konabot.plugins.kona_ph.core.storage import Puzzle, PuzzleManager, get_today_date, puzzle_manager
PUZZLE_PAGE_SIZE = 10
@ -30,12 +30,12 @@ def is_puzzle_admin(target: DepLongTaskTarget):
return target.target_id in config.plugin_puzzle_admin
def get_puzzle_info_message(puzzle: Puzzle) -> UniMessage[Any]:
status = "✅ 已准备,待发布" if puzzle.ready and not puzzle.published else \
(f"🟢 已发布: #{puzzle.index_id}" if puzzle.published else "⚙️ 未准备")
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
status_suffix = ""
if puzzle.pinned:
if puzzle.raw_id == manager.puzzle_pinned:
status_suffix += " | 📌 已被管理员置顶"
msg = UniMessage.text(
@ -130,7 +130,7 @@ def create_admin_commands():
return await target.send_message(UniMessage.text(
"题目早就准备好啦!"
))
manager.admin_mark_ready(raw_id, True)
p.ready = True
await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经准备就绪!"
))
@ -151,12 +151,12 @@ def create_admin_commands():
return await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经是未取消状态了!"
))
if p.published:
if manager.is_puzzle_published(p.raw_id):
return await target.send_message(UniMessage.text(
"已发布的谜题不能取消准备状态!"
))
manager.admin_mark_ready(raw_id, False)
p.ready = False
await target.send_message(UniMessage.text(
f"谜题「{p.title}」已经取消准备!"
))
@ -174,7 +174,7 @@ def create_admin_commands():
"这不是你的题,你没有权限查看详细信息!"
))
await target.send_message(get_puzzle_info_message(p))
await target.send_message(get_puzzle_info_message(manager, p))
@cmd_admin.assign("my")
async def _(target: DepLongTaskTarget, page: int = 1):
@ -193,9 +193,9 @@ def create_admin_commands():
message = UniMessage.text("==== 我的谜题 ====\n\n")
for p in puzzles:
message = message.text("- ")
if p.pinned:
if manager.puzzle_pinned == p.raw_id:
message = message.text("[📌]")
if p.published:
if manager.is_puzzle_published(p.raw_id):
message = message.text(f"[#{p.index_id}] ")
elif p.ready:
message = message.text("[✅] ")
@ -224,9 +224,9 @@ def create_admin_commands():
message = UniMessage.text("==== 所有谜题 ====\n\n")
for p in puzzles:
message = message.text("- ")
if p.pinned:
if p.raw_id == manager.puzzle_pinned:
message = message.text("[📌]")
if p.published:
if manager.is_puzzle_published(p.raw_id):
message = message.text(f"[#{p.index_id}] ")
elif p.ready:
message = message.text("[✅] ")
@ -307,7 +307,7 @@ def create_admin_commands():
elif remove_image.available:
p.remove_image()
info2 = get_puzzle_info_message(p)
info2 = get_puzzle_info_message(manager, p)
return await target.send_message("修改好啦!看看效果:\n\n" + info2)

View File

@ -4,10 +4,13 @@ from typing import cast
from loguru import logger
from nonebot import get_bot, on_request
import nonebot
from nonebot.adapters.onebot.v11.event import FriendRequestEvent
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
from nonebot_plugin_apscheduler import scheduler
from konabot.common.nb.is_admin import cfg as adminConfig
from konabot.common.username import manager
add_request = on_request()
@ -23,3 +26,15 @@ async def _(req: FriendRequestEvent):
await req.approve(bot)
logger.info(f"已经自动同意 {req.user_id} 的好友请求")
@scheduler.scheduled_job("cron", minute="*/5")
async def _():
logger.info("尝试更新群成员信息")
await manager.update()
driver = nonebot.get_driver()
@driver.on_bot_connect
async def _():
logger.info("有 Bot 连接5 秒后试着更新群成员信息")
await asyncio.sleep(5)
await manager.update()