Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0acffea86d | |||
| 3e395f8a35 | |||
| 312e203bbe |
@ -51,6 +51,10 @@ class LongTaskTarget(BaseModel):
|
||||
target_id: str
|
||||
"沟通对象的 ID"
|
||||
|
||||
@property
|
||||
def is_private_chat(self):
|
||||
return self.channel_id.startswith(QQ_PRIVATE_CHAT_CHANNEL_PREFIX)
|
||||
|
||||
async def send_message(self, msg: UniMessage | str, at: bool = True) -> bool:
|
||||
try:
|
||||
bot = nonebot.get_bot(self.self_id)
|
||||
|
||||
54
konabot/common/username.py
Normal file
54
konabot/common/username.py
Normal file
@ -0,0 +1,54 @@
|
||||
import re
|
||||
import nonebot
|
||||
|
||||
from nonebot.adapters.onebot.v11 import Bot as OBBot
|
||||
|
||||
|
||||
class UsernameManager:
|
||||
grouped_data: dict[int, dict[int, str]]
|
||||
individual_data: dict[int, str]
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.grouped_data = {}
|
||||
self.individual_data = {}
|
||||
|
||||
async def update(self):
|
||||
for bot in nonebot.get_bots().values():
|
||||
if isinstance(bot, OBBot):
|
||||
for user in await bot.get_friend_list():
|
||||
uid = user["user_id"]
|
||||
nickname = user["nickname"]
|
||||
self.individual_data[uid] = nickname
|
||||
for group in await bot.get_group_list():
|
||||
gid = group["group_id"]
|
||||
for member in await bot.get_group_member_list(group_id=gid):
|
||||
uid = member["user_id"]
|
||||
card = member.get("card", "")
|
||||
nickname = member.get("nickname", "")
|
||||
if card:
|
||||
self.grouped_data.setdefault(gid, {})[uid] = card
|
||||
if nickname:
|
||||
self.individual_data[uid] = nickname
|
||||
|
||||
def get(self, qqid: int, groupid: int | None = None) -> str:
|
||||
if groupid is not None and groupid in self.grouped_data:
|
||||
n = self.grouped_data[groupid].get(qqid)
|
||||
if n is not None:
|
||||
return n
|
||||
if qqid in self.individual_data:
|
||||
return self.individual_data[qqid]
|
||||
return str(qqid)
|
||||
|
||||
|
||||
manager = UsernameManager()
|
||||
|
||||
def get_username(qqid: int | str, group: int | str | None = None):
|
||||
if isinstance(group, str):
|
||||
group = None if not re.match(r"^\d+$", group) else int(group)
|
||||
if isinstance(qqid, str):
|
||||
if re.match(r"^\d+$", qqid):
|
||||
qqid = int(qqid)
|
||||
else:
|
||||
return qqid
|
||||
return manager.get(qqid, group)
|
||||
|
||||
@ -1,7 +1,10 @@
|
||||
from functools import reduce
|
||||
from math import ceil
|
||||
import re
|
||||
from loguru import logger
|
||||
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
|
||||
from konabot.common.nb.qq_broadcast import qq_broadcast
|
||||
from konabot.common.username import get_username
|
||||
from konabot.plugins.kona_ph.core.storage import get_today_date
|
||||
from konabot.plugins.kona_ph.manager import PUZZLE_PAGE_SIZE, create_admin_commands, config, puzzle_manager
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
@ -13,9 +16,9 @@ create_admin_commands()
|
||||
|
||||
|
||||
async def is_play_group(target: DepLongTaskTarget):
|
||||
if target.channel_id in config.plugin_puzzle_playgroup:
|
||||
if target.is_private_chat:
|
||||
return True
|
||||
if target.target_id in target.channel_id:
|
||||
if target.channel_id in config.plugin_puzzle_playgroup:
|
||||
return True
|
||||
return False
|
||||
|
||||
@ -33,7 +36,7 @@ async def _(flag: str, target: DepLongTaskTarget):
|
||||
|
||||
|
||||
cmd_query = on_alconna(Alconna(
|
||||
r"re:(?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?"
|
||||
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\??)?)|今日谜?题目?)"
|
||||
), rule=is_play_group)
|
||||
|
||||
@cmd_query.handle()
|
||||
@ -45,6 +48,52 @@ async def _(target: DepLongTaskTarget):
|
||||
await target.send_message(p.get_unimessage())
|
||||
|
||||
|
||||
cmd_query_submission = on_alconna(Alconna(
|
||||
"今日答题情况"
|
||||
), rule=is_play_group)
|
||||
|
||||
@cmd_query_submission.handle()
|
||||
async def _(target: DepLongTaskTarget):
|
||||
async with puzzle_manager() as manager:
|
||||
p = manager.get_today_puzzle()
|
||||
if p is None:
|
||||
return await target.send_message("今天无题")
|
||||
msg = UniMessage.text("==== 今日答题情况 ====\n\n")
|
||||
|
||||
subcount = len(reduce(
|
||||
lambda x, y: x + y,
|
||||
manager.submissions.get(p.raw_id, {}).values(),
|
||||
[],
|
||||
))
|
||||
info = manager.daily_puzzle[p.index_id]
|
||||
|
||||
msg = msg.text(
|
||||
f"总体情况:答对 {len(info.success_users)} / "
|
||||
f"参与 {len(info.tried_users)} / "
|
||||
f"提交 {subcount}\n"
|
||||
)
|
||||
|
||||
success_users = sorted(list(info.success_users.items()), key=lambda v: v[1])
|
||||
gid = None
|
||||
if re.match(r"^\d+$", target.channel_id):
|
||||
gid = int(target.channel_id)
|
||||
for u, d in success_users:
|
||||
uname = u
|
||||
if re.match(r"^\d+$", u):
|
||||
uname = get_username(int(u), gid)
|
||||
t = d.strftime("%H:%M")
|
||||
tries = len(manager.submissions[p.raw_id][u])
|
||||
msg = msg.text(f"\n- {uname} [Solved at {t} in {tries} times]")
|
||||
for u in info.tried_users - set(info.success_users.keys()):
|
||||
uname = u
|
||||
if re.match(r"^\d+$", u):
|
||||
uname = get_username(int(u), gid)
|
||||
tries = len(manager.submissions[p.raw_id][u])
|
||||
msg = msg.text(f"\n- {uname} [Unsolved in {tries} times]")
|
||||
|
||||
await target.send_message(msg)
|
||||
|
||||
|
||||
cmd_history = on_alconna(Alconna(
|
||||
"历史题目",
|
||||
Args["page?", int],
|
||||
@ -92,6 +141,10 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
|
||||
@scheduler.scheduled_job("cron", hour="8")
|
||||
async def _():
|
||||
async with puzzle_manager() as manager:
|
||||
msg2 = manager.get_report_yesterday()
|
||||
if msg2 is not None:
|
||||
await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
|
||||
|
||||
puzzle = manager.get_today_puzzle()
|
||||
if puzzle is not None:
|
||||
logger.info(f"找到了题目 {puzzle.raw_id},发送")
|
||||
@ -99,3 +152,4 @@ async def _():
|
||||
else:
|
||||
logger.info("自动任务:没有找到题目,跳过")
|
||||
|
||||
|
||||
|
||||
@ -12,6 +12,7 @@ from nonebot_plugin_alconna import UniMessage
|
||||
from pydantic import BaseModel, Field, ValidationError
|
||||
|
||||
from konabot.common.path import DATA_PATH
|
||||
from konabot.common.username import get_username
|
||||
|
||||
|
||||
KONAPH_BASE = DATA_PATH / "KonaPH"
|
||||
@ -37,9 +38,6 @@ class Puzzle(BaseModel):
|
||||
flag: str
|
||||
|
||||
ready: bool = False
|
||||
published: bool = False
|
||||
pinned: bool = False
|
||||
|
||||
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
|
||||
|
||||
def get_image_path(self) -> Path:
|
||||
@ -52,7 +50,7 @@ class Puzzle(BaseModel):
|
||||
if self.img_name:
|
||||
result = result.text("\n\n").image(raw=self.get_image_path().read_bytes())
|
||||
|
||||
result = result.text("\n\n出题者:").at(self.author_id)
|
||||
result = result.text(f"\n\n出题者:{get_username(self.author_id)}")
|
||||
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
|
||||
|
||||
return result
|
||||
@ -111,68 +109,81 @@ class PuzzleManager(BaseModel):
|
||||
daily_puzzle_of_date: dict[datetime.date, str] = {}
|
||||
|
||||
puzzle_pinned: str = ""
|
||||
unpublished_puzzles: set[str] = set()
|
||||
unready_puzzles: set[str] = set()
|
||||
published_puzzles: set[str] = set()
|
||||
|
||||
index_id_counter: int = 1
|
||||
submissions: dict[str, dict[str, list[PuzzleSubmission]]] = {}
|
||||
last_pubish_date: datetime.date = Field(
|
||||
default_factory=lambda: get_today_date() - datetime.timedelta(days=1)
|
||||
)
|
||||
last_checked_date: datetime.date = Field(
|
||||
default_factory=lambda: get_today_date() - datetime.timedelta(days=1)
|
||||
)
|
||||
|
||||
@property
|
||||
def last_publish_date(self):
|
||||
return max(self.daily_puzzle_of_date.keys())
|
||||
|
||||
@property
|
||||
def unpublished_puzzles(self):
|
||||
return set((
|
||||
p.raw_id for p in self.puzzle_data.values()
|
||||
if not self.is_puzzle_published(p.raw_id) and p.ready
|
||||
))
|
||||
|
||||
@property
|
||||
def unready_puzzles(self):
|
||||
return set((
|
||||
p.raw_id for p in self.puzzle_data.values()
|
||||
if not self.is_puzzle_published(p.raw_id) and not p.ready
|
||||
))
|
||||
|
||||
@property
|
||||
def published_puzzles(self):
|
||||
return set((
|
||||
p.raw_id for p in self.puzzle_data.values()
|
||||
if self.is_puzzle_published(p.raw_id)
|
||||
))
|
||||
|
||||
def is_puzzle_published(self, raw_id: str):
|
||||
return raw_id in [i.raw_id for i in self.daily_puzzle.values()]
|
||||
|
||||
def publish_puzzle(self, raw_id: str):
|
||||
assert raw_id in self.puzzle_data
|
||||
|
||||
self.unpublished_puzzles -= set(raw_id)
|
||||
self.unready_puzzles -= set(raw_id)
|
||||
today = get_today_date()
|
||||
|
||||
p = self.puzzle_data[raw_id]
|
||||
p.index_id = str(self.index_id_counter)
|
||||
p.ready = True
|
||||
p.published = True
|
||||
p.pinned = False
|
||||
self.puzzle_pinned = ""
|
||||
self.last_pubish_date = get_today_date()
|
||||
self.last_checked_date = self.last_pubish_date
|
||||
self.last_checked_date = today
|
||||
self.daily_puzzle[p.index_id] = DailyPuzzleInfo(
|
||||
raw_id=raw_id,
|
||||
time=self.last_pubish_date,
|
||||
time=today,
|
||||
)
|
||||
self.daily_puzzle_of_date[self.last_pubish_date] = p.index_id
|
||||
self.published_puzzles.add(raw_id)
|
||||
self.daily_puzzle_of_date[today] = p.index_id
|
||||
|
||||
self.index_id_counter += 1
|
||||
|
||||
def admin_mark_ready(self, raw_id: str, ready: bool = True):
|
||||
if raw_id not in self.puzzle_data:
|
||||
return
|
||||
if ready:
|
||||
self.unready_puzzles -= set(raw_id)
|
||||
if raw_id not in self.published_puzzles:
|
||||
self.unpublished_puzzles.add(raw_id)
|
||||
p = self.puzzle_data[raw_id]
|
||||
p.ready = True
|
||||
p.published = raw_id in self.published_puzzles
|
||||
else:
|
||||
self.unready_puzzles.add(raw_id)
|
||||
self.unpublished_puzzles -= set(raw_id)
|
||||
p = self.puzzle_data[raw_id]
|
||||
p.ready = False
|
||||
p.published = False
|
||||
# if p.raw_id == self.puzzle_pinned:
|
||||
# self.puzzle_pinned = ""
|
||||
def fix(self):
|
||||
# 尝试修复今日的数据
|
||||
for p in self.puzzle_data.values():
|
||||
if self.is_puzzle_published(p.raw_id):
|
||||
p.ready = True
|
||||
|
||||
if self.puzzle_pinned not in self.unpublished_puzzles:
|
||||
self.puzzle_pinned = ""
|
||||
|
||||
# 撤回重复发布的题
|
||||
already_published: set[str] = set()
|
||||
for date in list(self.daily_puzzle_of_date.keys()):
|
||||
index_id = self.daily_puzzle_of_date[date]
|
||||
info = self.daily_puzzle[index_id]
|
||||
if info.raw_id in already_published:
|
||||
del self.daily_puzzle[index_id]
|
||||
del self.daily_puzzle_of_date[date]
|
||||
else:
|
||||
already_published.add(info.raw_id)
|
||||
|
||||
def admin_pin_puzzle(self, raw_id: str):
|
||||
if self.puzzle_pinned:
|
||||
p = self.puzzle_data.get(self.puzzle_pinned)
|
||||
if p is not None:
|
||||
p.pinned = False
|
||||
if raw_id in self.puzzle_data:
|
||||
p = self.puzzle_data[raw_id]
|
||||
p.pinned = True
|
||||
self.puzzle_pinned = raw_id
|
||||
else:
|
||||
self.puzzle_pinned = ""
|
||||
@ -252,9 +263,7 @@ class PuzzleManager(BaseModel):
|
||||
author_id=user,
|
||||
flag="konaph{this_is_a_flag}",
|
||||
ready=False,
|
||||
published=False,
|
||||
)
|
||||
self.unready_puzzles.add(p.raw_id)
|
||||
self.puzzle_data[p.raw_id] = p
|
||||
return p
|
||||
|
||||
@ -286,9 +295,9 @@ class PuzzleManager(BaseModel):
|
||||
us = us[:5]
|
||||
for u, _ in us:
|
||||
m = self.submissions[puzzle.raw_id][u][-1]
|
||||
message = message.text("- ").at(u).text(f" 于 {m.time.strftime('%H:%M')}")
|
||||
message = message.text(f"- {get_username(u)} 于 {m.time.strftime('%H:%M')}")
|
||||
|
||||
message = message.text("\n\n出题者:").at(puzzle.author_id)
|
||||
message = message.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
|
||||
return message
|
||||
|
||||
|
||||
|
||||
@ -8,7 +8,7 @@ from pydantic import BaseModel
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.nb.extract_image import download_image_bytes
|
||||
from konabot.common.nb.qq_broadcast import qq_broadcast
|
||||
from konabot.plugins.kona_ph.core.storage import Puzzle, get_today_date, puzzle_manager
|
||||
from konabot.plugins.kona_ph.core.storage import Puzzle, PuzzleManager, get_today_date, puzzle_manager
|
||||
|
||||
PUZZLE_PAGE_SIZE = 10
|
||||
|
||||
@ -30,12 +30,12 @@ def is_puzzle_admin(target: DepLongTaskTarget):
|
||||
return target.target_id in config.plugin_puzzle_admin
|
||||
|
||||
|
||||
def get_puzzle_info_message(puzzle: Puzzle) -> UniMessage[Any]:
|
||||
status = "✅ 已准备,待发布" if puzzle.ready and not puzzle.published else \
|
||||
(f"🟢 已发布: #{puzzle.index_id}" if puzzle.published else "⚙️ 未准备")
|
||||
def get_puzzle_info_message(manager: PuzzleManager, puzzle: Puzzle) -> UniMessage[Any]:
|
||||
status = "✅ 已准备,待发布" if puzzle.ready and not manager.is_puzzle_published(puzzle.raw_id) else \
|
||||
(f"🟢 已发布: #{puzzle.index_id}" if manager.is_puzzle_published(puzzle.raw_id) else "⚙️ 未准备")
|
||||
|
||||
status_suffix = ""
|
||||
if puzzle.pinned:
|
||||
if puzzle.raw_id == manager.puzzle_pinned:
|
||||
status_suffix += " | 📌 已被管理员置顶"
|
||||
|
||||
msg = UniMessage.text(
|
||||
@ -130,7 +130,7 @@ def create_admin_commands():
|
||||
return await target.send_message(UniMessage.text(
|
||||
"题目早就准备好啦!"
|
||||
))
|
||||
manager.admin_mark_ready(raw_id, True)
|
||||
p.ready = True
|
||||
await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经准备就绪!"
|
||||
))
|
||||
@ -151,12 +151,12 @@ def create_admin_commands():
|
||||
return await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经是未取消状态了!"
|
||||
))
|
||||
if p.published:
|
||||
if manager.is_puzzle_published(p.raw_id):
|
||||
return await target.send_message(UniMessage.text(
|
||||
"已发布的谜题不能取消准备状态!"
|
||||
))
|
||||
|
||||
manager.admin_mark_ready(raw_id, False)
|
||||
p.ready = False
|
||||
await target.send_message(UniMessage.text(
|
||||
f"谜题「{p.title}」已经取消准备!"
|
||||
))
|
||||
@ -174,7 +174,7 @@ def create_admin_commands():
|
||||
"这不是你的题,你没有权限查看详细信息!"
|
||||
))
|
||||
|
||||
await target.send_message(get_puzzle_info_message(p))
|
||||
await target.send_message(get_puzzle_info_message(manager, p))
|
||||
|
||||
@cmd_admin.assign("my")
|
||||
async def _(target: DepLongTaskTarget, page: int = 1):
|
||||
@ -193,9 +193,9 @@ def create_admin_commands():
|
||||
message = UniMessage.text("==== 我的谜题 ====\n\n")
|
||||
for p in puzzles:
|
||||
message = message.text("- ")
|
||||
if p.pinned:
|
||||
if manager.puzzle_pinned == p.raw_id:
|
||||
message = message.text("[📌]")
|
||||
if p.published:
|
||||
if manager.is_puzzle_published(p.raw_id):
|
||||
message = message.text(f"[#{p.index_id}] ")
|
||||
elif p.ready:
|
||||
message = message.text("[✅] ")
|
||||
@ -224,9 +224,9 @@ def create_admin_commands():
|
||||
message = UniMessage.text("==== 所有谜题 ====\n\n")
|
||||
for p in puzzles:
|
||||
message = message.text("- ")
|
||||
if p.pinned:
|
||||
if p.raw_id == manager.puzzle_pinned:
|
||||
message = message.text("[📌]")
|
||||
if p.published:
|
||||
if manager.is_puzzle_published(p.raw_id):
|
||||
message = message.text(f"[#{p.index_id}] ")
|
||||
elif p.ready:
|
||||
message = message.text("[✅] ")
|
||||
@ -307,7 +307,7 @@ def create_admin_commands():
|
||||
elif remove_image.available:
|
||||
p.remove_image()
|
||||
|
||||
info2 = get_puzzle_info_message(p)
|
||||
info2 = get_puzzle_info_message(manager, p)
|
||||
|
||||
return await target.send_message("修改好啦!看看效果:\n\n" + info2)
|
||||
|
||||
|
||||
@ -4,10 +4,13 @@ from typing import cast
|
||||
|
||||
from loguru import logger
|
||||
from nonebot import get_bot, on_request
|
||||
import nonebot
|
||||
from nonebot.adapters.onebot.v11.event import FriendRequestEvent
|
||||
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
|
||||
from nonebot_plugin_apscheduler import scheduler
|
||||
|
||||
from konabot.common.nb.is_admin import cfg as adminConfig
|
||||
from konabot.common.username import manager
|
||||
|
||||
add_request = on_request()
|
||||
|
||||
@ -23,3 +26,15 @@ async def _(req: FriendRequestEvent):
|
||||
await req.approve(bot)
|
||||
logger.info(f"已经自动同意 {req.user_id} 的好友请求")
|
||||
|
||||
@scheduler.scheduled_job("cron", minute="*/5")
|
||||
async def _():
|
||||
logger.info("尝试更新群成员信息")
|
||||
await manager.update()
|
||||
|
||||
driver = nonebot.get_driver()
|
||||
|
||||
@driver.on_bot_connect
|
||||
async def _():
|
||||
logger.info("有 Bot 连接,5 秒后试着更新群成员信息")
|
||||
await asyncio.sleep(5)
|
||||
await manager.update()
|
||||
Reference in New Issue
Block a user