Compare commits

...

4 Commits

Author SHA1 Message Date
bc6263ec31 Merge pull request '添加安安展示' (#27) from tnot/konabot:添加安安展示 into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #27
2025-10-21 22:07:19 +08:00
bc9d025836 修好了bug的安安展示 2025-10-21 22:02:41 +08:00
b552aacf89 添加安安展示 2025-10-21 21:33:32 +08:00
f9a0249772 优化 giftool 的截取逻辑
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-21 18:31:14 +08:00
5 changed files with 290 additions and 127 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 841 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 821 KiB

View File

@ -1,10 +1,10 @@
import re
from io import BytesIO
import PIL.Image
from nonebot import on_message
from nonebot.adapters import Bot
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage,
on_alconna)
from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import PIL_Image
@ -29,15 +29,17 @@ def parse_timestamp(tx: str) -> float | None:
return res
cmd_giftool = on_alconna(Alconna(
"giftool",
Args["img", Image | None],
Option("--ss", Args["start_point", str]),
Option("--frames:v", Args["frame_count", int]),
Option("-t", Args["length", str]),
Option("-to", Args["end_point", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
))
cmd_giftool = on_alconna(
Alconna(
"giftool",
Args["img", Image | None],
Option("--ss", Args["start_point", str]),
Option("--frames:v", Args["frame_count", int]),
Option("-t", Args["length", str]),
Option("-to", Args["end_point", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
)
)
@cmd_giftool.handle()
@ -80,81 +82,66 @@ async def _(
if not getattr(image, "is_animated", False):
raise BotExceptionMessage("错误输入的不是动图GIF")
frames = []
durations = []
total_duration = 0.0
##
# 从这里开始,采样整个 GIF 图
frames: list[PIL.Image.Image] = []
durations: list[float] = []
try:
for i in range(getattr(image, "n_frames")):
image.seek(i)
frames.append(image.copy())
duration = image.info.get("duration", 100) # 单位:毫秒
duration = image.info.get("duration", 100) / 1000
durations.append(duration)
total_duration += duration / 1000.0 # 转为秒
except EOFError:
pass
if not frames:
raise BotExceptionMessage("错误:读取 GIF 帧失败")
# 采样结束
def time_to_frame_index(target_time: float) -> int:
if target_time <= 0:
return 0
cum = 0.0
for idx, dur in enumerate(durations):
cum += dur / 1000.0
if cum >= target_time:
return min(idx, len(frames) - 1)
return len(frames) - 1
start_frame = 0
end_frame = len(frames) - 1
if ss is not None:
start_frame = time_to_frame_index(ss)
if to is not None:
end_frame = time_to_frame_index(to)
if end_frame < start_frame:
end_frame = start_frame
elif t is not None:
end_time = (ss or 0.0) + t
end_frame = time_to_frame_index(end_time)
if end_frame < start_frame:
end_frame = start_frame
##
# 根据开始、结束时间或者帧数量来裁取 GIF 图
start_frame = max(0, start_frame)
end_frame = min(len(frames) - 1, end_frame)
selected_frames = frames[start_frame : end_frame + 1]
selected_durations = durations[start_frame : end_frame + 1]
begin_time = ss or 0
end_time = sum(durations)
end_time = min(begin_time + (t or end_time), to or end_time, end_time)
if frame_count is not None and frame_count > 0:
if frame_count >= len(selected_frames):
pass
else:
step = len(selected_frames) / frame_count
sampled_frames = []
sampled_durations = []
for i in range(frame_count):
idx = int(i * step)
sampled_frames.append(selected_frames[idx])
sampled_durations.append(
sum(selected_durations) // len(selected_durations)
)
selected_frames = sampled_frames
selected_durations = sampled_durations
accumulated = 0.0
status = 0
output_img = BytesIO()
sel_frames: list[PIL.Image.Image] = []
sel_durations: list[float] = []
adjusted_durations = [
dur / speed_factor for dur in selected_durations
]
for i in range(len(frames)):
frame = frames[i]
duration = durations[i]
if status == 0:
if accumulated + duration > begin_time:
status = 1
sel_frames.append(frame)
sel_durations.append(accumulated + duration - begin_time)
elif status == 1:
if accumulated + duration > end_time:
sel_frames.append(frame)
sel_durations.append(end_time - accumulated)
break
sel_frames.append(frame)
sel_durations.append(duration)
accumulated += duration
##
# 加速!
sel_durations = [dur / speed_factor * 1000 for dur in durations]
rframes = []
rdur = []
acc_mod_20 = 0
for i in range(len(selected_frames)):
fr = selected_frames[i]
du: float = adjusted_durations[i]
for i in range(len(sel_frames)):
fr = sel_frames[i]
du = round(sel_durations[i])
if du >= 20:
rframes.append(fr)
@ -170,10 +157,12 @@ async def _(
if acc_mod_20 >= 20:
acc_mod_20 = 0
if len(rframes) == 1 and len(selected_frames) > 1:
rframes.append(selected_frames[max(2, len(selected_frames) // 2)])
if len(rframes) == 1 and len(sel_frames) > 1:
rframes.append(sel_frames[max(2, len(sel_frames) // 2)])
rdur.append(20)
##
# 收尾:看看透明度这块
transparency_flag = False
for f in rframes:
if f.mode == "RGBA":
@ -186,12 +175,13 @@ async def _(
tf = {}
if transparency_flag:
tf['transparency'] = 0
tf["transparency"] = 0
if is_rev:
rframes = rframes[::-1]
rdur = rdur[::-1]
output_img = BytesIO()
if rframes:
rframes[0].save(
output_img,

View File

@ -2,25 +2,52 @@ from io import BytesIO
from typing import Iterable, cast
from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, Field, Image, MultiVar, Option, Text,
UniMessage, UniMsg, on_alconna)
from nonebot_plugin_alconna import (
Alconna,
Args,
Field,
Image,
MultiVar,
Option,
Text,
UniMessage,
UniMsg,
on_alconna,
)
from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message
from konabot.plugins.memepack.drawing.display import draw_cao_display, draw_snaur_display
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten,
draw_geimao, draw_mnk,
draw_pt, draw_suan)
from konabot.plugins.memepack.drawing.display import (
draw_cao_display,
draw_snaur_display,
draw_anan_display,
)
from konabot.plugins.memepack.drawing.saying import (
draw_cute_ten,
draw_geimao,
draw_mnk,
draw_pt,
draw_suan,
)
from nonebot.adapters import Bot, Event
from returns.result import Success, Failure
geimao = on_alconna(Alconna(
"给猫说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写给猫说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"给猫哈"})
geimao = on_alconna(
Alconna(
"给猫说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写给猫说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"给猫哈"},
)
@geimao.handle()
async def _(saying: list[str]):
@ -31,12 +58,21 @@ async def _(saying: list[str]):
await geimao.send(await UniMessage().image(raw=img_bytes).export())
pt = on_alconna(Alconna(
"pt说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写小帕说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"小帕说"})
pt = on_alconna(
Alconna(
"pt说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写小帕说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"小帕说"},
)
@pt.handle()
async def _(saying: list[str]):
@ -47,12 +83,21 @@ async def _(saying: list[str]):
await pt.send(await UniMessage().image(raw=img_bytes).export())
mnk = on_alconna(Alconna(
"re:小?黑白子?说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写黑白子说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"})
mnk = on_alconna(
Alconna(
"re:小?黑白子?说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写黑白子说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"mnk说"},
)
@mnk.handle()
async def _(saying: list[str]):
@ -63,12 +108,21 @@ async def _(saying: list[str]):
await mnk.send(await UniMessage().image(raw=img_bytes).export())
suan = on_alconna(Alconna(
"小蒜说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写小蒜说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
suan = on_alconna(
Alconna(
"小蒜说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写小蒜说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@suan.handle()
async def _(saying: list[str]):
@ -79,12 +133,21 @@ async def _(saying: list[str]):
await suan.send(await UniMessage().image(raw=img_bytes).export())
dsuan = on_alconna(Alconna(
"大蒜说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写大蒜说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
dsuan = on_alconna(
Alconna(
"大蒜说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写大蒜说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@dsuan.handle()
async def _(saying: list[str]):
@ -95,12 +158,21 @@ async def _(saying: list[str]):
await dsuan.send(await UniMessage().image(raw=img_bytes).export())
cutecat = on_alconna(Alconna(
"乖猫说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写十猫说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"十猫说"})
cutecat = on_alconna(
Alconna(
"乖猫说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写十猫说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"十猫说"},
)
@cutecat.handle()
async def _(saying: list[str]):
@ -113,13 +185,14 @@ async def _(saying: list[str]):
cao_display_cmd = on_message()
@cao_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False
for text in cast(Iterable[Text], msg.get(Text)):
if text.text.strip() == "小槽展示":
flag = True
elif text.text.strip() == '':
elif text.text.strip() == "":
continue
else:
return
@ -134,29 +207,71 @@ async def _(msg: UniMsg, evt: Event, bot: Bot):
case Failure(err):
await cao_display_cmd.send(
await UniMessage()
.at(user_id=evt.get_user_id())
.text(' ')
.text(err)
.export()
.at(user_id=evt.get_user_id())
.text(" ")
.text(err)
.export()
)
snaur_display_cmd = on_alconna(Alconna(
"卵总展示",
Option("--whiteness", Args["whiteness", float], alias=["-w"]),
Option("--black-level", Args["black_level", float], alias=["-b"]),
Option("--opacity", Args["opacity", float], alias=["-o"]),
Option("--saturation", Args["saturation", float], alias=["-s"]),
Args["image", Image | None],
))
snaur_display_cmd = on_alconna(
Alconna(
"卵总展示",
Option("--whiteness", Args["whiteness", float], alias=["-w"]),
Option("--black-level", Args["black_level", float], alias=["-b"]),
Option("--opacity", Args["opacity", float], alias=["-o"]),
Option("--saturation", Args["saturation", float], alias=["-s"]),
Args["image", Image | None],
)
)
@snaur_display_cmd.handle()
async def _(img: PIL_Image, whiteness: float = 0.0, black_level: float = 0.2,
opacity: float = 0.8, saturation: float = 0.85):
async def _(
img: PIL_Image,
whiteness: float = 0.0,
black_level: float = 0.2,
opacity: float = 0.8,
saturation: float = 0.85,
):
img_processed = await draw_snaur_display(
img, whiteness, black_level, opacity, saturation,
img,
whiteness,
black_level,
opacity,
saturation,
)
img_data = BytesIO()
img_processed.save(img_data, "PNG")
await snaur_display_cmd.send(await UniMessage().image(raw=img_data).export())
anan_display_cmd = on_message()
@anan_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False
for text in cast(Iterable[Text], msg.get(Text)):
stripped = text.text.strip()
if stripped == "安安展示":
flag = True
elif stripped == "":
continue
else:
return
if not flag:
return
match await extract_image_from_message(evt.get_message(), evt, bot):
case Success(img):
img_handled = await draw_anan_display(img)
img_bytes = BytesIO()
img_handled.save(img_bytes, format="PNG")
await anan_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
case Failure(err):
await anan_display_cmd.send(
await UniMessage()
.at(user_id=evt.get_user_id())
.text(" ")
.text(err)
.export()
)

View File

@ -27,6 +27,15 @@ SNAUR_QUAD_POINTS = np.float32(cast(Any, [
[106, 1280],
]))
anan_image_base = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "anan_base.png")
anan_image_top = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "anan_top.png")
ANAN_QUAD_POINTS = np.float32([
[157, 585],
[793, 599],
[781, 908],
[160, 908]
])
def _draw_cao_display(image: PIL.Image.Image):
src = np.array(image.convert("RGB"))
h, w = src.shape[:2]
@ -139,3 +148,52 @@ async def draw_snaur_display(
opacity, saturation,
)
def _draw_anan_display(image: PIL.Image.Image) -> PIL.Image.Image:
src = np.array(image.convert("RGBA"))
h, w = src.shape[:2]
src_points = np.float32([
[0, 0],
[w, 0],
[w, h],
[0, h]
])
dst_points = ANAN_QUAD_POINTS
M = cv2.getPerspectiveTransform(src_points, dst_points)
output_w, output_h = anan_image_top.size
src_rgb = cv2.cvtColor(src, cv2.COLOR_RGBA2RGB) if src.shape[2] == 4 else src
warped_rgb = cv2.warpPerspective(
src_rgb,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
mask = np.zeros((h, w), dtype=np.uint8)
mask[:, :] = 255
warped_mask = cv2.warpPerspective(
mask,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=0
)
warped_rgba = cv2.cvtColor(warped_rgb, cv2.COLOR_RGB2RGBA)
warped_rgba[:, :, 3] = warped_mask
warped_pil = PIL.Image.fromarray(warped_rgba, 'RGBA')
result = PIL.Image.alpha_composite(anan_image_base, warped_pil)
result = PIL.Image.alpha_composite(result, anan_image_top)
return result
async def draw_anan_display(image: PIL.Image.Image) -> PIL.Image.Image:
return await asyncio.to_thread(_draw_anan_display, image)