Compare commits

..

9 Commits

Author SHA1 Message Date
bc6263ec31 Merge pull request '添加安安展示' (#27) from tnot/konabot:添加安安展示 into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #27
2025-10-21 22:07:19 +08:00
bc9d025836 修好了bug的安安展示 2025-10-21 22:02:41 +08:00
b552aacf89 添加安安展示 2025-10-21 21:33:32 +08:00
f9a0249772 优化 giftool 的截取逻辑
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-21 18:31:14 +08:00
c94db33b11 更新 ptimeparse 到 0.2.0
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 22:45:34 +08:00
67382a0c0a 在我写的模块采用更安全的 asyncio 锁写法
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 20:27:18 +08:00
fd4c9302c2 async with lock
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 20:24:47 +08:00
f30ad0cb7d 判定部分优化
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 18:48:10 +08:00
f7afe48680 精度修复
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 18:36:27 +08:00
10 changed files with 370 additions and 196 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 841 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 821 KiB

View File

@ -254,11 +254,10 @@ def _save_longtask_data(data: LongTaskModuleData):
@asynccontextmanager @asynccontextmanager
async def longtask_data(): async def longtask_data():
await longtask_lock.acquire() async with longtask_lock:
data = _load_longtask_data() data = _load_longtask_data()
yield data yield data
_save_longtask_data(data) _save_longtask_data(data)
longtask_lock.release()
async def create_longtask( async def create_longtask(

View File

@ -157,24 +157,24 @@ class IdiomGame:
""" """
跳过当前成语,选择下一个成语 跳过当前成语,选择下一个成语
""" """
await self.lock.acquire() async with self.lock:
self._skip_idiom_async(buff_score) self._skip_idiom_async()
self.lock.release() self.add_buff_score(buff_score)
return self.last_idiom return self.last_idiom
def _skip_idiom_async(self, buff_score: int = -100) -> str: def _skip_idiom_async(self) -> str:
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS) self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_char = self.last_idiom[-1] self.last_char = self.last_idiom[-1]
self.add_buff_score(buff_score) if not self.is_nextable(self.last_char):
self._skip_idiom_async()
return self.last_idiom return self.last_idiom
async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState: async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
""" """
用户发送成语 用户发送成语
""" """
await self.lock.acquire() async with self.lock:
state = self._verify_idiom(idiom, user_id) state = self._verify_idiom(idiom, user_id)
self.lock.release()
return state return state
def is_nextable(self, last_char: str) -> bool: def is_nextable(self, last_char: str) -> bool:
@ -203,10 +203,12 @@ class IdiomGame:
return TryVerifyState.VERIFIED_BUT_NO_NEXT return TryVerifyState.VERIFIED_BUT_NO_NEXT
return TryVerifyState.VERIFIED return TryVerifyState.VERIFIED
def get_user_score(self, user_id: str) -> int: def get_user_score(self, user_id: str) -> float:
if user_id not in self.score_board: if user_id not in self.score_board:
return 0 return 0
return self.score_board[user_id]["score"] # 避免浮点数精度问题导致过长
handled_score = round(self.score_board[user_id]["score"], 1)
return handled_score
def add_score(self, user_id: str, score: int): def add_score(self, user_id: str, score: int):
if user_id not in self.score_board: if user_id not in self.score_board:
@ -377,7 +379,7 @@ async def end_game(event: BaseEvent, group_id: str):
result_text += ( result_text += (
f"{i + 1}. " f"{i + 1}. "
+ UniMessage().at(user_id) + UniMessage().at(user_id)
+ f": {info['score'] + instance.get_all_buff_score()}\n" + f": {round(info['score'] + instance.get_all_buff_score(), 1)}\n"
) )
await evt.send(await result_text.export()) await evt.send(await result_text.export())
instance.clear_score_board() instance.clear_score_board()

View File

@ -1,10 +1,10 @@
import re import re
from io import BytesIO from io import BytesIO
import PIL.Image
from nonebot import on_message from nonebot import on_message
from nonebot.adapters import Bot from nonebot.adapters import Bot
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage, from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna
on_alconna)
from konabot.common.nb.exc import BotExceptionMessage from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import PIL_Image from konabot.common.nb.extract_image import PIL_Image
@ -29,7 +29,8 @@ def parse_timestamp(tx: str) -> float | None:
return res return res
cmd_giftool = on_alconna(Alconna( cmd_giftool = on_alconna(
Alconna(
"giftool", "giftool",
Args["img", Image | None], Args["img", Image | None],
Option("--ss", Args["start_point", str]), Option("--ss", Args["start_point", str]),
@ -37,7 +38,8 @@ cmd_giftool = on_alconna(Alconna(
Option("-t", Args["length", str]), Option("-t", Args["length", str]),
Option("-to", Args["end_point", str]), Option("-to", Args["end_point", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]), Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
)) )
)
@cmd_giftool.handle() @cmd_giftool.handle()
@ -80,81 +82,66 @@ async def _(
if not getattr(image, "is_animated", False): if not getattr(image, "is_animated", False):
raise BotExceptionMessage("错误输入的不是动图GIF") raise BotExceptionMessage("错误输入的不是动图GIF")
frames = [] ##
durations = [] # 从这里开始,采样整个 GIF 图
total_duration = 0.0 frames: list[PIL.Image.Image] = []
durations: list[float] = []
try: try:
for i in range(getattr(image, "n_frames")): for i in range(getattr(image, "n_frames")):
image.seek(i) image.seek(i)
frames.append(image.copy()) frames.append(image.copy())
duration = image.info.get("duration", 100) # 单位:毫秒 duration = image.info.get("duration", 100) / 1000
durations.append(duration) durations.append(duration)
total_duration += duration / 1000.0 # 转为秒
except EOFError: except EOFError:
pass pass
if not frames: if not frames:
raise BotExceptionMessage("错误:读取 GIF 帧失败") raise BotExceptionMessage("错误:读取 GIF 帧失败")
# 采样结束
def time_to_frame_index(target_time: float) -> int: ##
if target_time <= 0: # 根据开始、结束时间或者帧数量来裁取 GIF 图
return 0
cum = 0.0
for idx, dur in enumerate(durations):
cum += dur / 1000.0
if cum >= target_time:
return min(idx, len(frames) - 1)
return len(frames) - 1
start_frame = 0
end_frame = len(frames) - 1
if ss is not None:
start_frame = time_to_frame_index(ss)
if to is not None:
end_frame = time_to_frame_index(to)
if end_frame < start_frame:
end_frame = start_frame
elif t is not None:
end_time = (ss or 0.0) + t
end_frame = time_to_frame_index(end_time)
if end_frame < start_frame:
end_frame = start_frame
start_frame = max(0, start_frame) begin_time = ss or 0
end_frame = min(len(frames) - 1, end_frame) end_time = sum(durations)
selected_frames = frames[start_frame : end_frame + 1] end_time = min(begin_time + (t or end_time), to or end_time, end_time)
selected_durations = durations[start_frame : end_frame + 1]
if frame_count is not None and frame_count > 0: accumulated = 0.0
if frame_count >= len(selected_frames): status = 0
pass
else:
step = len(selected_frames) / frame_count
sampled_frames = []
sampled_durations = []
for i in range(frame_count):
idx = int(i * step)
sampled_frames.append(selected_frames[idx])
sampled_durations.append(
sum(selected_durations) // len(selected_durations)
)
selected_frames = sampled_frames
selected_durations = sampled_durations
output_img = BytesIO() sel_frames: list[PIL.Image.Image] = []
sel_durations: list[float] = []
adjusted_durations = [ for i in range(len(frames)):
dur / speed_factor for dur in selected_durations frame = frames[i]
] duration = durations[i]
if status == 0:
if accumulated + duration > begin_time:
status = 1
sel_frames.append(frame)
sel_durations.append(accumulated + duration - begin_time)
elif status == 1:
if accumulated + duration > end_time:
sel_frames.append(frame)
sel_durations.append(end_time - accumulated)
break
sel_frames.append(frame)
sel_durations.append(duration)
accumulated += duration
##
# 加速!
sel_durations = [dur / speed_factor * 1000 for dur in durations]
rframes = [] rframes = []
rdur = [] rdur = []
acc_mod_20 = 0 acc_mod_20 = 0
for i in range(len(selected_frames)): for i in range(len(sel_frames)):
fr = selected_frames[i] fr = sel_frames[i]
du: float = adjusted_durations[i] du = round(sel_durations[i])
if du >= 20: if du >= 20:
rframes.append(fr) rframes.append(fr)
@ -170,10 +157,12 @@ async def _(
if acc_mod_20 >= 20: if acc_mod_20 >= 20:
acc_mod_20 = 0 acc_mod_20 = 0
if len(rframes) == 1 and len(selected_frames) > 1: if len(rframes) == 1 and len(sel_frames) > 1:
rframes.append(selected_frames[max(2, len(selected_frames) // 2)]) rframes.append(sel_frames[max(2, len(sel_frames) // 2)])
rdur.append(20) rdur.append(20)
##
# 收尾:看看透明度这块
transparency_flag = False transparency_flag = False
for f in rframes: for f in rframes:
if f.mode == "RGBA": if f.mode == "RGBA":
@ -186,12 +175,13 @@ async def _(
tf = {} tf = {}
if transparency_flag: if transparency_flag:
tf['transparency'] = 0 tf["transparency"] = 0
if is_rev: if is_rev:
rframes = rframes[::-1] rframes = rframes[::-1]
rdur = rdur[::-1] rdur = rdur[::-1]
output_img = BytesIO()
if rframes: if rframes:
rframes[0].save( rframes[0].save(
output_img, output_img,

View File

@ -2,25 +2,52 @@ from io import BytesIO
from typing import Iterable, cast from typing import Iterable, cast
from nonebot import on_message from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, Field, Image, MultiVar, Option, Text, from nonebot_plugin_alconna import (
UniMessage, UniMsg, on_alconna) Alconna,
Args,
Field,
Image,
MultiVar,
Option,
Text,
UniMessage,
UniMsg,
on_alconna,
)
from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message
from konabot.plugins.memepack.drawing.display import draw_cao_display, draw_snaur_display from konabot.plugins.memepack.drawing.display import (
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten, draw_cao_display,
draw_geimao, draw_mnk, draw_snaur_display,
draw_pt, draw_suan) draw_anan_display,
)
from konabot.plugins.memepack.drawing.saying import (
draw_cute_ten,
draw_geimao,
draw_mnk,
draw_pt,
draw_suan,
)
from nonebot.adapters import Bot, Event from nonebot.adapters import Bot, Event
from returns.result import Success, Failure from returns.result import Success, Failure
geimao = on_alconna(Alconna( geimao = on_alconna(
Alconna(
"给猫说", "给猫说",
Args["saying", MultiVar(str, '+'), Field( Args[
missing_tips=lambda: "你没有写给猫说了什么" "saying",
)] MultiVar(str, "+"),
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"给猫哈"}) Field(missing_tips=lambda: "你没有写给猫说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"给猫哈"},
)
@geimao.handle() @geimao.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -31,12 +58,21 @@ async def _(saying: list[str]):
await geimao.send(await UniMessage().image(raw=img_bytes).export()) await geimao.send(await UniMessage().image(raw=img_bytes).export())
pt = on_alconna(Alconna( pt = on_alconna(
Alconna(
"pt说", "pt说",
Args["saying", MultiVar(str, '+'), Field( Args[
missing_tips=lambda: "你没有写小帕说了什么" "saying",
)] MultiVar(str, "+"),
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"小帕说"}) Field(missing_tips=lambda: "你没有写小帕说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"小帕说"},
)
@pt.handle() @pt.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -47,12 +83,21 @@ async def _(saying: list[str]):
await pt.send(await UniMessage().image(raw=img_bytes).export()) await pt.send(await UniMessage().image(raw=img_bytes).export())
mnk = on_alconna(Alconna( mnk = on_alconna(
Alconna(
"re:小?黑白子?说", "re:小?黑白子?说",
Args["saying", MultiVar(str, '+'), Field( Args[
missing_tips=lambda: "你没有写黑白子说了什么" "saying",
)] MultiVar(str, "+"),
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"}) Field(missing_tips=lambda: "你没有写黑白子说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"mnk说"},
)
@mnk.handle() @mnk.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -63,12 +108,21 @@ async def _(saying: list[str]):
await mnk.send(await UniMessage().image(raw=img_bytes).export()) await mnk.send(await UniMessage().image(raw=img_bytes).export())
suan = on_alconna(Alconna( suan = on_alconna(
Alconna(
"小蒜说", "小蒜说",
Args["saying", MultiVar(str, '+'), Field( Args[
missing_tips=lambda: "你没有写小蒜说了什么" "saying",
)] MultiVar(str, "+"),
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set()) Field(missing_tips=lambda: "你没有写小蒜说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@suan.handle() @suan.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -79,12 +133,21 @@ async def _(saying: list[str]):
await suan.send(await UniMessage().image(raw=img_bytes).export()) await suan.send(await UniMessage().image(raw=img_bytes).export())
dsuan = on_alconna(Alconna( dsuan = on_alconna(
Alconna(
"大蒜说", "大蒜说",
Args["saying", MultiVar(str, '+'), Field( Args[
missing_tips=lambda: "你没有写大蒜说了什么" "saying",
)] MultiVar(str, "+"),
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set()) Field(missing_tips=lambda: "你没有写大蒜说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@dsuan.handle() @dsuan.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -95,12 +158,21 @@ async def _(saying: list[str]):
await dsuan.send(await UniMessage().image(raw=img_bytes).export()) await dsuan.send(await UniMessage().image(raw=img_bytes).export())
cutecat = on_alconna(Alconna( cutecat = on_alconna(
Alconna(
"乖猫说", "乖猫说",
Args["saying", MultiVar(str, '+'), Field( Args[
missing_tips=lambda: "你没有写十猫说了什么" "saying",
)] MultiVar(str, "+"),
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"十猫说"}) Field(missing_tips=lambda: "你没有写十猫说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"十猫说"},
)
@cutecat.handle() @cutecat.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -113,13 +185,14 @@ async def _(saying: list[str]):
cao_display_cmd = on_message() cao_display_cmd = on_message()
@cao_display_cmd.handle() @cao_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot): async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False flag = False
for text in cast(Iterable[Text], msg.get(Text)): for text in cast(Iterable[Text], msg.get(Text)):
if text.text.strip() == "小槽展示": if text.text.strip() == "小槽展示":
flag = True flag = True
elif text.text.strip() == '': elif text.text.strip() == "":
continue continue
else: else:
return return
@ -135,28 +208,70 @@ async def _(msg: UniMsg, evt: Event, bot: Bot):
await cao_display_cmd.send( await cao_display_cmd.send(
await UniMessage() await UniMessage()
.at(user_id=evt.get_user_id()) .at(user_id=evt.get_user_id())
.text(' ') .text(" ")
.text(err) .text(err)
.export() .export()
) )
snaur_display_cmd = on_alconna(Alconna( snaur_display_cmd = on_alconna(
Alconna(
"卵总展示", "卵总展示",
Option("--whiteness", Args["whiteness", float], alias=["-w"]), Option("--whiteness", Args["whiteness", float], alias=["-w"]),
Option("--black-level", Args["black_level", float], alias=["-b"]), Option("--black-level", Args["black_level", float], alias=["-b"]),
Option("--opacity", Args["opacity", float], alias=["-o"]), Option("--opacity", Args["opacity", float], alias=["-o"]),
Option("--saturation", Args["saturation", float], alias=["-s"]), Option("--saturation", Args["saturation", float], alias=["-s"]),
Args["image", Image | None], Args["image", Image | None],
)) )
)
@snaur_display_cmd.handle() @snaur_display_cmd.handle()
async def _(img: PIL_Image, whiteness: float = 0.0, black_level: float = 0.2, async def _(
opacity: float = 0.8, saturation: float = 0.85): img: PIL_Image,
whiteness: float = 0.0,
black_level: float = 0.2,
opacity: float = 0.8,
saturation: float = 0.85,
):
img_processed = await draw_snaur_display( img_processed = await draw_snaur_display(
img, whiteness, black_level, opacity, saturation, img,
whiteness,
black_level,
opacity,
saturation,
) )
img_data = BytesIO() img_data = BytesIO()
img_processed.save(img_data, "PNG") img_processed.save(img_data, "PNG")
await snaur_display_cmd.send(await UniMessage().image(raw=img_data).export()) await snaur_display_cmd.send(await UniMessage().image(raw=img_data).export())
anan_display_cmd = on_message()
@anan_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False
for text in cast(Iterable[Text], msg.get(Text)):
stripped = text.text.strip()
if stripped == "安安展示":
flag = True
elif stripped == "":
continue
else:
return
if not flag:
return
match await extract_image_from_message(evt.get_message(), evt, bot):
case Success(img):
img_handled = await draw_anan_display(img)
img_bytes = BytesIO()
img_handled.save(img_bytes, format="PNG")
await anan_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
case Failure(err):
await anan_display_cmd.send(
await UniMessage()
.at(user_id=evt.get_user_id())
.text(" ")
.text(err)
.export()
)

View File

@ -27,6 +27,15 @@ SNAUR_QUAD_POINTS = np.float32(cast(Any, [
[106, 1280], [106, 1280],
])) ]))
anan_image_base = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "anan_base.png")
anan_image_top = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "anan_top.png")
ANAN_QUAD_POINTS = np.float32([
[157, 585],
[793, 599],
[781, 908],
[160, 908]
])
def _draw_cao_display(image: PIL.Image.Image): def _draw_cao_display(image: PIL.Image.Image):
src = np.array(image.convert("RGB")) src = np.array(image.convert("RGB"))
h, w = src.shape[:2] h, w = src.shape[:2]
@ -139,3 +148,52 @@ async def draw_snaur_display(
opacity, saturation, opacity, saturation,
) )
def _draw_anan_display(image: PIL.Image.Image) -> PIL.Image.Image:
src = np.array(image.convert("RGBA"))
h, w = src.shape[:2]
src_points = np.float32([
[0, 0],
[w, 0],
[w, h],
[0, h]
])
dst_points = ANAN_QUAD_POINTS
M = cv2.getPerspectiveTransform(src_points, dst_points)
output_w, output_h = anan_image_top.size
src_rgb = cv2.cvtColor(src, cv2.COLOR_RGBA2RGB) if src.shape[2] == 4 else src
warped_rgb = cv2.warpPerspective(
src_rgb,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
mask = np.zeros((h, w), dtype=np.uint8)
mask[:, :] = 255
warped_mask = cv2.warpPerspective(
mask,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=0
)
warped_rgba = cv2.cvtColor(warped_rgb, cv2.COLOR_RGB2RGBA)
warped_rgba[:, :, 3] = warped_mask
warped_pil = PIL.Image.fromarray(warped_rgba, 'RGBA')
result = PIL.Image.alpha_composite(anan_image_base, warped_pil)
result = PIL.Image.alpha_composite(result, anan_image_top)
return result
async def draw_anan_display(image: PIL.Image.Image) -> PIL.Image.Image:
return await asyncio.to_thread(_draw_anan_display, image)

View File

@ -15,10 +15,10 @@ from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord import Bot as DiscordBot from nonebot.adapters.discord import Bot as DiscordBot
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
from nonebot.adapters.onebot.v11.event import \ from nonebot.adapters.onebot.v11.event import (
GroupMessageEvent as OnebotV11GroupMessageEvent GroupMessageEvent as OnebotV11GroupMessageEvent,
from nonebot.adapters.onebot.v11.event import \ )
MessageEvent as OnebotV11MessageEvent from nonebot.adapters.onebot.v11.event import MessageEvent as OnebotV11MessageEvent
from nonebot_plugin_alconna import UniMessage, UniMsg from nonebot_plugin_alconna import UniMessage, UniMsg
from pydantic import BaseModel from pydantic import BaseModel
@ -68,14 +68,14 @@ def save_notify_config(config: NotifyConfigFile):
async def notify_now(notify: Notify): async def notify_now(notify: Notify):
if notify.platform == 'console': if notify.platform == "console":
bot = [b for b in nonebot.get_bots().values() if isinstance(b, ConsoleBot)] bot = [b for b in nonebot.get_bots().values() if isinstance(b, ConsoleBot)]
if len(bot) != 1: if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}") logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
return False return False
bot = bot[0] bot = bot[0]
await bot.send_private_message(notify.target, f"代办通知:{notify.notify_msg}") await bot.send_private_message(notify.target, f"代办通知:{notify.notify_msg}")
elif notify.platform == 'discord': elif notify.platform == "discord":
bot = [b for b in nonebot.get_bots().values() if isinstance(b, DiscordBot)] bot = [b for b in nonebot.get_bots().values() if isinstance(b, DiscordBot)]
if len(bot) != 1: if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}") logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
@ -83,7 +83,7 @@ async def notify_now(notify: Notify):
bot = bot[0] bot = bot[0]
channel = await bot.create_DM(recipient_id=int(notify.target)) channel = await bot.create_DM(recipient_id=int(notify.target))
await bot.send_to(channel.id, f"代办通知:{notify.notify_msg}") await bot.send_to(channel.id, f"代办通知:{notify.notify_msg}")
elif notify.platform == 'qq': elif notify.platform == "qq":
bot = [b for b in nonebot.get_bots().values() if isinstance(b, OnebotV11Bot)] bot = [b for b in nonebot.get_bots().values() if isinstance(b, OnebotV11Bot)]
if len(bot) != 1: if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}") logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
@ -92,17 +92,22 @@ async def notify_now(notify: Notify):
if notify.target_env is None: if notify.target_env is None:
await bot.send_private_msg( await bot.send_private_msg(
user_id=int(notify.target), user_id=int(notify.target),
message=cast(Any, await UniMessage.text(f"代办通知:{notify.notify_msg}").export( message=cast(
Any,
await UniMessage.text(f"代办通知:{notify.notify_msg}").export(
bot=bot, bot=bot,
)), ),
),
) )
else: else:
await bot.send_group_msg( await bot.send_group_msg(
group_id=int(notify.target_env), group_id=int(notify.target_env),
message=cast(Any, message=cast(
await UniMessage().at( Any,
notify.target await UniMessage()
).text(f" 代办通知:{notify.notify_msg}").export(bot=bot) .at(notify.target)
.text(f" 代办通知:{notify.notify_msg}")
.export(bot=bot),
), ),
) )
else: else:
@ -127,15 +132,17 @@ def create_notify_task(notify: Notify, fail2remove: bool = True):
) )
res = await notify_now(notify) res = await notify_now(notify)
if fail2remove or res: if fail2remove or res:
await DATA_FILE_LOCK.acquire() async with DATA_FILE_LOCK:
cfg = load_notify_config() cfg = load_notify_config()
cfg.notifies = [n for n in cfg.notifies if n.get_str() != notify.get_str()] cfg.notifies = [
n for n in cfg.notifies if n.get_str() != notify.get_str()
]
if not res: if not res:
cfg.unsent.append(notify) cfg.unsent.append(notify)
save_notify_config(cfg) save_notify_config(cfg)
DATA_FILE_LOCK.release()
else: else:
pass pass
return asynkio.create_task(mission()) return asynkio.create_task(mission())
@ -155,7 +162,8 @@ async def _(msg: UniMsg, mEvt: Event):
notify_time, notify_text = segments notify_time, notify_text = segments
# target_time = get_target_time(notify_time) # target_time = get_target_time(notify_time)
try: try:
target_time = ptimeparse.parse(notify_time) # target_time = ptimeparse.parse(notify_time)
target_time = ptimeparse.Parser().parse(notify_time)
logger.info(f"{notify_time} 解析出了时间:{target_time}") logger.info(f"{notify_time} 解析出了时间:{target_time}")
except Exception: except Exception:
logger.info(f"无法从 {notify_time} 中解析出时间") logger.info(f"无法从 {notify_time} 中解析出时间")
@ -201,8 +209,12 @@ async def _(msg: UniMsg, mEvt: Event):
save_notify_config(cfg) save_notify_config(cfg)
DATA_FILE_LOCK.release() DATA_FILE_LOCK.release()
await evt.send(await UniMessage().at(mEvt.get_user_id()).text( await evt.send(
f" 了解啦!将会在 {notify.notify_time} 提醒你哦~").export()) await UniMessage()
.at(mEvt.get_user_id())
.text(f" 了解啦!将会在 {notify.notify_time} 提醒你哦~")
.export()
)
logger.info(f"创建了一条于 {notify.notify_time} 的代办提醒") logger.info(f"创建了一条于 {notify.notify_time} 的代办提醒")
@ -253,8 +265,8 @@ async def _():
logger.info("所有的代办提醒 Task 都已经退出了") logger.info("所有的代办提醒 Task 都已经退出了")
for sig in (signal.SIGINT, signal.SIGTERM): for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, functools.partial( loop.add_signal_handler(
asynkio.create_task, shutdown(sig) sig, functools.partial(asynkio.create_task, shutdown(sig))
)) )
await asynkio.gather(*ASYNK_TASKS) await asynkio.gather(*ASYNK_TASKS)

8
poetry.lock generated
View File

@ -2402,14 +2402,14 @@ reference = "mirrors"
[[package]] [[package]]
name = "ptimeparse" name = "ptimeparse"
version = "0.1.2" version = "0.2.0"
description = "一个用于解析中文的时间表达的库" description = "一个用于解析中文的时间表达的库"
optional = false optional = false
python-versions = ">=3.9" python-versions = ">=3.9"
groups = ["main"] groups = ["main"]
files = [ files = [
{file = "ptimeparse-0.1.2-py3-none-any.whl", hash = "sha256:0eea791396e53b63330fadb40d9f0a2e6272bd5467246f10d1d6971bc606edff"}, {file = "ptimeparse-0.2.0-py3-none-any.whl", hash = "sha256:57055f8fd99fb69e19deac3b8a5c7ac91af86c7ac09781632e9abf318df0d6d2"},
{file = "ptimeparse-0.1.2.tar.gz", hash = "sha256:658be90a3cc2994c09c4ea2f276d257e7eb84bc330be79950baefe32b19779a2"}, {file = "ptimeparse-0.2.0.tar.gz", hash = "sha256:867c265f2e157fe4d793d20fe9c449b8ede5c855f336d7e6b2eb78551e622766"},
] ]
[package.source] [package.source]
@ -3807,4 +3807,4 @@ reference = "mirrors"
[metadata] [metadata]
lock-version = "2.1" lock-version = "2.1"
python-versions = ">=3.12,<4.0" python-versions = ">=3.12,<4.0"
content-hash = "d6a325b769fb3ed207c1a8891e65ea20bae20166fa281d6fa620faf54ad15bd8" content-hash = "02530953efe65da1a788845cd43f8856be62db5bfb59de691cad813f57bab25e"

View File

@ -2,9 +2,7 @@
name = "konabot" name = "konabot"
version = "0.1.0" version = "0.1.0"
description = "在 MTTU 内部使用的 bot" description = "在 MTTU 内部使用的 bot"
authors = [ authors = [{ name = "passthem", email = "Passthem183@gmail.com" }]
{name = "passthem",email = "Passthem183@gmail.com"}
]
readme = "README.md" readme = "README.md"
requires-python = ">=3.12,<4.0" requires-python = ">=3.12,<4.0"
dependencies = [ dependencies = [
@ -22,7 +20,7 @@ dependencies = [
"imagetext-py (>=2.2.0,<3.0.0)", "imagetext-py (>=2.2.0,<3.0.0)",
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)", "opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
"returns (>=0.26.0,<0.27.0)", "returns (>=0.26.0,<0.27.0)",
"ptimeparse (>=0.1.1,<0.2.0)", "ptimeparse (>=0.1.1,<1.0.0)",
"skia-python (>=138.0,<139.0)", "skia-python (>=138.0,<139.0)",
"nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)", "nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)",
"qrcode (>=8.2,<9.0)", "qrcode (>=8.2,<9.0)",