Compare commits

...

4 Commits

Author SHA1 Message Date
bc6263ec31 Merge pull request '添加安安展示' (#27) from tnot/konabot:添加安安展示 into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #27
2025-10-21 22:07:19 +08:00
bc9d025836 修好了bug的安安展示 2025-10-21 22:02:41 +08:00
b552aacf89 添加安安展示 2025-10-21 21:33:32 +08:00
f9a0249772 优化 giftool 的截取逻辑
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-21 18:31:14 +08:00
5 changed files with 290 additions and 127 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 841 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 821 KiB

View File

@ -1,10 +1,10 @@
import re import re
from io import BytesIO from io import BytesIO
import PIL.Image
from nonebot import on_message from nonebot import on_message
from nonebot.adapters import Bot from nonebot.adapters import Bot
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage, from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna
on_alconna)
from konabot.common.nb.exc import BotExceptionMessage from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import PIL_Image from konabot.common.nb.extract_image import PIL_Image
@ -29,15 +29,17 @@ def parse_timestamp(tx: str) -> float | None:
return res return res
cmd_giftool = on_alconna(Alconna( cmd_giftool = on_alconna(
"giftool", Alconna(
Args["img", Image | None], "giftool",
Option("--ss", Args["start_point", str]), Args["img", Image | None],
Option("--frames:v", Args["frame_count", int]), Option("--ss", Args["start_point", str]),
Option("-t", Args["length", str]), Option("--frames:v", Args["frame_count", int]),
Option("-to", Args["end_point", str]), Option("-t", Args["length", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]), Option("-to", Args["end_point", str]),
)) Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
)
)
@cmd_giftool.handle() @cmd_giftool.handle()
@ -80,81 +82,66 @@ async def _(
if not getattr(image, "is_animated", False): if not getattr(image, "is_animated", False):
raise BotExceptionMessage("错误输入的不是动图GIF") raise BotExceptionMessage("错误输入的不是动图GIF")
frames = [] ##
durations = [] # 从这里开始,采样整个 GIF 图
total_duration = 0.0 frames: list[PIL.Image.Image] = []
durations: list[float] = []
try: try:
for i in range(getattr(image, "n_frames")): for i in range(getattr(image, "n_frames")):
image.seek(i) image.seek(i)
frames.append(image.copy()) frames.append(image.copy())
duration = image.info.get("duration", 100) # 单位:毫秒 duration = image.info.get("duration", 100) / 1000
durations.append(duration) durations.append(duration)
total_duration += duration / 1000.0 # 转为秒
except EOFError: except EOFError:
pass pass
if not frames: if not frames:
raise BotExceptionMessage("错误:读取 GIF 帧失败") raise BotExceptionMessage("错误:读取 GIF 帧失败")
# 采样结束
def time_to_frame_index(target_time: float) -> int: ##
if target_time <= 0: # 根据开始、结束时间或者帧数量来裁取 GIF 图
return 0
cum = 0.0
for idx, dur in enumerate(durations):
cum += dur / 1000.0
if cum >= target_time:
return min(idx, len(frames) - 1)
return len(frames) - 1
start_frame = 0
end_frame = len(frames) - 1
if ss is not None:
start_frame = time_to_frame_index(ss)
if to is not None:
end_frame = time_to_frame_index(to)
if end_frame < start_frame:
end_frame = start_frame
elif t is not None:
end_time = (ss or 0.0) + t
end_frame = time_to_frame_index(end_time)
if end_frame < start_frame:
end_frame = start_frame
start_frame = max(0, start_frame) begin_time = ss or 0
end_frame = min(len(frames) - 1, end_frame) end_time = sum(durations)
selected_frames = frames[start_frame : end_frame + 1] end_time = min(begin_time + (t or end_time), to or end_time, end_time)
selected_durations = durations[start_frame : end_frame + 1]
if frame_count is not None and frame_count > 0: accumulated = 0.0
if frame_count >= len(selected_frames): status = 0
pass
else:
step = len(selected_frames) / frame_count
sampled_frames = []
sampled_durations = []
for i in range(frame_count):
idx = int(i * step)
sampled_frames.append(selected_frames[idx])
sampled_durations.append(
sum(selected_durations) // len(selected_durations)
)
selected_frames = sampled_frames
selected_durations = sampled_durations
output_img = BytesIO() sel_frames: list[PIL.Image.Image] = []
sel_durations: list[float] = []
adjusted_durations = [ for i in range(len(frames)):
dur / speed_factor for dur in selected_durations frame = frames[i]
] duration = durations[i]
if status == 0:
if accumulated + duration > begin_time:
status = 1
sel_frames.append(frame)
sel_durations.append(accumulated + duration - begin_time)
elif status == 1:
if accumulated + duration > end_time:
sel_frames.append(frame)
sel_durations.append(end_time - accumulated)
break
sel_frames.append(frame)
sel_durations.append(duration)
accumulated += duration
##
# 加速!
sel_durations = [dur / speed_factor * 1000 for dur in durations]
rframes = [] rframes = []
rdur = [] rdur = []
acc_mod_20 = 0 acc_mod_20 = 0
for i in range(len(selected_frames)): for i in range(len(sel_frames)):
fr = selected_frames[i] fr = sel_frames[i]
du: float = adjusted_durations[i] du = round(sel_durations[i])
if du >= 20: if du >= 20:
rframes.append(fr) rframes.append(fr)
@ -170,10 +157,12 @@ async def _(
if acc_mod_20 >= 20: if acc_mod_20 >= 20:
acc_mod_20 = 0 acc_mod_20 = 0
if len(rframes) == 1 and len(selected_frames) > 1: if len(rframes) == 1 and len(sel_frames) > 1:
rframes.append(selected_frames[max(2, len(selected_frames) // 2)]) rframes.append(sel_frames[max(2, len(sel_frames) // 2)])
rdur.append(20) rdur.append(20)
##
# 收尾:看看透明度这块
transparency_flag = False transparency_flag = False
for f in rframes: for f in rframes:
if f.mode == "RGBA": if f.mode == "RGBA":
@ -186,12 +175,13 @@ async def _(
tf = {} tf = {}
if transparency_flag: if transparency_flag:
tf['transparency'] = 0 tf["transparency"] = 0
if is_rev: if is_rev:
rframes = rframes[::-1] rframes = rframes[::-1]
rdur = rdur[::-1] rdur = rdur[::-1]
output_img = BytesIO()
if rframes: if rframes:
rframes[0].save( rframes[0].save(
output_img, output_img,

View File

@ -2,25 +2,52 @@ from io import BytesIO
from typing import Iterable, cast from typing import Iterable, cast
from nonebot import on_message from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, Field, Image, MultiVar, Option, Text, from nonebot_plugin_alconna import (
UniMessage, UniMsg, on_alconna) Alconna,
Args,
Field,
Image,
MultiVar,
Option,
Text,
UniMessage,
UniMsg,
on_alconna,
)
from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message
from konabot.plugins.memepack.drawing.display import draw_cao_display, draw_snaur_display from konabot.plugins.memepack.drawing.display import (
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten, draw_cao_display,
draw_geimao, draw_mnk, draw_snaur_display,
draw_pt, draw_suan) draw_anan_display,
)
from konabot.plugins.memepack.drawing.saying import (
draw_cute_ten,
draw_geimao,
draw_mnk,
draw_pt,
draw_suan,
)
from nonebot.adapters import Bot, Event from nonebot.adapters import Bot, Event
from returns.result import Success, Failure from returns.result import Success, Failure
geimao = on_alconna(Alconna( geimao = on_alconna(
"给猫说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "给猫说",
missing_tips=lambda: "你没有写给猫说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"给猫哈"}) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写给猫说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"给猫哈"},
)
@geimao.handle() @geimao.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -31,12 +58,21 @@ async def _(saying: list[str]):
await geimao.send(await UniMessage().image(raw=img_bytes).export()) await geimao.send(await UniMessage().image(raw=img_bytes).export())
pt = on_alconna(Alconna( pt = on_alconna(
"pt说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "pt说",
missing_tips=lambda: "你没有写小帕说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"小帕说"}) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写小帕说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"小帕说"},
)
@pt.handle() @pt.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -47,12 +83,21 @@ async def _(saying: list[str]):
await pt.send(await UniMessage().image(raw=img_bytes).export()) await pt.send(await UniMessage().image(raw=img_bytes).export())
mnk = on_alconna(Alconna( mnk = on_alconna(
"re:小?黑白子?说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "re:小?黑白子?说",
missing_tips=lambda: "你没有写黑白子说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"}) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写黑白子说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"mnk说"},
)
@mnk.handle() @mnk.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -63,12 +108,21 @@ async def _(saying: list[str]):
await mnk.send(await UniMessage().image(raw=img_bytes).export()) await mnk.send(await UniMessage().image(raw=img_bytes).export())
suan = on_alconna(Alconna( suan = on_alconna(
"小蒜说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "小蒜说",
missing_tips=lambda: "你没有写小蒜说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set()) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写小蒜说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@suan.handle() @suan.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -79,12 +133,21 @@ async def _(saying: list[str]):
await suan.send(await UniMessage().image(raw=img_bytes).export()) await suan.send(await UniMessage().image(raw=img_bytes).export())
dsuan = on_alconna(Alconna( dsuan = on_alconna(
"大蒜说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "大蒜说",
missing_tips=lambda: "你没有写大蒜说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set()) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写大蒜说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@dsuan.handle() @dsuan.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -95,12 +158,21 @@ async def _(saying: list[str]):
await dsuan.send(await UniMessage().image(raw=img_bytes).export()) await dsuan.send(await UniMessage().image(raw=img_bytes).export())
cutecat = on_alconna(Alconna( cutecat = on_alconna(
"乖猫说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "乖猫说",
missing_tips=lambda: "你没有写十猫说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"十猫说"}) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写十猫说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"十猫说"},
)
@cutecat.handle() @cutecat.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -113,13 +185,14 @@ async def _(saying: list[str]):
cao_display_cmd = on_message() cao_display_cmd = on_message()
@cao_display_cmd.handle() @cao_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot): async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False flag = False
for text in cast(Iterable[Text], msg.get(Text)): for text in cast(Iterable[Text], msg.get(Text)):
if text.text.strip() == "小槽展示": if text.text.strip() == "小槽展示":
flag = True flag = True
elif text.text.strip() == '': elif text.text.strip() == "":
continue continue
else: else:
return return
@ -134,29 +207,71 @@ async def _(msg: UniMsg, evt: Event, bot: Bot):
case Failure(err): case Failure(err):
await cao_display_cmd.send( await cao_display_cmd.send(
await UniMessage() await UniMessage()
.at(user_id=evt.get_user_id()) .at(user_id=evt.get_user_id())
.text(' ') .text(" ")
.text(err) .text(err)
.export() .export()
) )
snaur_display_cmd = on_alconna(Alconna( snaur_display_cmd = on_alconna(
"卵总展示", Alconna(
Option("--whiteness", Args["whiteness", float], alias=["-w"]), "卵总展示",
Option("--black-level", Args["black_level", float], alias=["-b"]), Option("--whiteness", Args["whiteness", float], alias=["-w"]),
Option("--opacity", Args["opacity", float], alias=["-o"]), Option("--black-level", Args["black_level", float], alias=["-b"]),
Option("--saturation", Args["saturation", float], alias=["-s"]), Option("--opacity", Args["opacity", float], alias=["-o"]),
Args["image", Image | None], Option("--saturation", Args["saturation", float], alias=["-s"]),
)) Args["image", Image | None],
)
)
@snaur_display_cmd.handle() @snaur_display_cmd.handle()
async def _(img: PIL_Image, whiteness: float = 0.0, black_level: float = 0.2, async def _(
opacity: float = 0.8, saturation: float = 0.85): img: PIL_Image,
whiteness: float = 0.0,
black_level: float = 0.2,
opacity: float = 0.8,
saturation: float = 0.85,
):
img_processed = await draw_snaur_display( img_processed = await draw_snaur_display(
img, whiteness, black_level, opacity, saturation, img,
whiteness,
black_level,
opacity,
saturation,
) )
img_data = BytesIO() img_data = BytesIO()
img_processed.save(img_data, "PNG") img_processed.save(img_data, "PNG")
await snaur_display_cmd.send(await UniMessage().image(raw=img_data).export()) await snaur_display_cmd.send(await UniMessage().image(raw=img_data).export())
anan_display_cmd = on_message()
@anan_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False
for text in cast(Iterable[Text], msg.get(Text)):
stripped = text.text.strip()
if stripped == "安安展示":
flag = True
elif stripped == "":
continue
else:
return
if not flag:
return
match await extract_image_from_message(evt.get_message(), evt, bot):
case Success(img):
img_handled = await draw_anan_display(img)
img_bytes = BytesIO()
img_handled.save(img_bytes, format="PNG")
await anan_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
case Failure(err):
await anan_display_cmd.send(
await UniMessage()
.at(user_id=evt.get_user_id())
.text(" ")
.text(err)
.export()
)

View File

@ -27,6 +27,15 @@ SNAUR_QUAD_POINTS = np.float32(cast(Any, [
[106, 1280], [106, 1280],
])) ]))
anan_image_base = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "anan_base.png")
anan_image_top = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "anan_top.png")
ANAN_QUAD_POINTS = np.float32([
[157, 585],
[793, 599],
[781, 908],
[160, 908]
])
def _draw_cao_display(image: PIL.Image.Image): def _draw_cao_display(image: PIL.Image.Image):
src = np.array(image.convert("RGB")) src = np.array(image.convert("RGB"))
h, w = src.shape[:2] h, w = src.shape[:2]
@ -139,3 +148,52 @@ async def draw_snaur_display(
opacity, saturation, opacity, saturation,
) )
def _draw_anan_display(image: PIL.Image.Image) -> PIL.Image.Image:
src = np.array(image.convert("RGBA"))
h, w = src.shape[:2]
src_points = np.float32([
[0, 0],
[w, 0],
[w, h],
[0, h]
])
dst_points = ANAN_QUAD_POINTS
M = cv2.getPerspectiveTransform(src_points, dst_points)
output_w, output_h = anan_image_top.size
src_rgb = cv2.cvtColor(src, cv2.COLOR_RGBA2RGB) if src.shape[2] == 4 else src
warped_rgb = cv2.warpPerspective(
src_rgb,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
mask = np.zeros((h, w), dtype=np.uint8)
mask[:, :] = 255
warped_mask = cv2.warpPerspective(
mask,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=0
)
warped_rgba = cv2.cvtColor(warped_rgb, cv2.COLOR_RGB2RGBA)
warped_rgba[:, :, 3] = warped_mask
warped_pil = PIL.Image.fromarray(warped_rgba, 'RGBA')
result = PIL.Image.alpha_composite(anan_image_base, warped_pil)
result = PIL.Image.alpha_composite(result, anan_image_top)
return result
async def draw_anan_display(image: PIL.Image.Image) -> PIL.Image.Image:
return await asyncio.to_thread(_draw_anan_display, image)