Compare commits

...

1 Commits

Author SHA1 Message Date
f9a0249772 优化 giftool 的截取逻辑
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-21 18:31:14 +08:00
2 changed files with 201 additions and 127 deletions

View File

@ -1,10 +1,10 @@
import re import re
from io import BytesIO from io import BytesIO
import PIL.Image
from nonebot import on_message from nonebot import on_message
from nonebot.adapters import Bot from nonebot.adapters import Bot
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage, from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna
on_alconna)
from konabot.common.nb.exc import BotExceptionMessage from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import PIL_Image from konabot.common.nb.extract_image import PIL_Image
@ -29,15 +29,17 @@ def parse_timestamp(tx: str) -> float | None:
return res return res
cmd_giftool = on_alconna(Alconna( cmd_giftool = on_alconna(
"giftool", Alconna(
Args["img", Image | None], "giftool",
Option("--ss", Args["start_point", str]), Args["img", Image | None],
Option("--frames:v", Args["frame_count", int]), Option("--ss", Args["start_point", str]),
Option("-t", Args["length", str]), Option("--frames:v", Args["frame_count", int]),
Option("-to", Args["end_point", str]), Option("-t", Args["length", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]), Option("-to", Args["end_point", str]),
)) Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
)
)
@cmd_giftool.handle() @cmd_giftool.handle()
@ -80,81 +82,66 @@ async def _(
if not getattr(image, "is_animated", False): if not getattr(image, "is_animated", False):
raise BotExceptionMessage("错误输入的不是动图GIF") raise BotExceptionMessage("错误输入的不是动图GIF")
frames = [] ##
durations = [] # 从这里开始,采样整个 GIF 图
total_duration = 0.0 frames: list[PIL.Image.Image] = []
durations: list[float] = []
try: try:
for i in range(getattr(image, "n_frames")): for i in range(getattr(image, "n_frames")):
image.seek(i) image.seek(i)
frames.append(image.copy()) frames.append(image.copy())
duration = image.info.get("duration", 100) # 单位:毫秒 duration = image.info.get("duration", 100) / 1000
durations.append(duration) durations.append(duration)
total_duration += duration / 1000.0 # 转为秒
except EOFError: except EOFError:
pass pass
if not frames: if not frames:
raise BotExceptionMessage("错误:读取 GIF 帧失败") raise BotExceptionMessage("错误:读取 GIF 帧失败")
# 采样结束
def time_to_frame_index(target_time: float) -> int: ##
if target_time <= 0: # 根据开始、结束时间或者帧数量来裁取 GIF 图
return 0
cum = 0.0
for idx, dur in enumerate(durations):
cum += dur / 1000.0
if cum >= target_time:
return min(idx, len(frames) - 1)
return len(frames) - 1
start_frame = 0
end_frame = len(frames) - 1
if ss is not None:
start_frame = time_to_frame_index(ss)
if to is not None:
end_frame = time_to_frame_index(to)
if end_frame < start_frame:
end_frame = start_frame
elif t is not None:
end_time = (ss or 0.0) + t
end_frame = time_to_frame_index(end_time)
if end_frame < start_frame:
end_frame = start_frame
start_frame = max(0, start_frame) begin_time = ss or 0
end_frame = min(len(frames) - 1, end_frame) end_time = sum(durations)
selected_frames = frames[start_frame : end_frame + 1] end_time = min(begin_time + (t or end_time), to or end_time, end_time)
selected_durations = durations[start_frame : end_frame + 1]
if frame_count is not None and frame_count > 0: accumulated = 0.0
if frame_count >= len(selected_frames): status = 0
pass
else:
step = len(selected_frames) / frame_count
sampled_frames = []
sampled_durations = []
for i in range(frame_count):
idx = int(i * step)
sampled_frames.append(selected_frames[idx])
sampled_durations.append(
sum(selected_durations) // len(selected_durations)
)
selected_frames = sampled_frames
selected_durations = sampled_durations
output_img = BytesIO() sel_frames: list[PIL.Image.Image] = []
sel_durations: list[float] = []
adjusted_durations = [ for i in range(len(frames)):
dur / speed_factor for dur in selected_durations frame = frames[i]
] duration = durations[i]
if status == 0:
if accumulated + duration > begin_time:
status = 1
sel_frames.append(frame)
sel_durations.append(accumulated + duration - begin_time)
elif status == 1:
if accumulated + duration > end_time:
sel_frames.append(frame)
sel_durations.append(end_time - accumulated)
break
sel_frames.append(frame)
sel_durations.append(duration)
accumulated += duration
##
# 加速!
sel_durations = [dur / speed_factor * 1000 for dur in durations]
rframes = [] rframes = []
rdur = [] rdur = []
acc_mod_20 = 0 acc_mod_20 = 0
for i in range(len(selected_frames)): for i in range(len(sel_frames)):
fr = selected_frames[i] fr = sel_frames[i]
du: float = adjusted_durations[i] du = round(sel_durations[i])
if du >= 20: if du >= 20:
rframes.append(fr) rframes.append(fr)
@ -170,10 +157,12 @@ async def _(
if acc_mod_20 >= 20: if acc_mod_20 >= 20:
acc_mod_20 = 0 acc_mod_20 = 0
if len(rframes) == 1 and len(selected_frames) > 1: if len(rframes) == 1 and len(sel_frames) > 1:
rframes.append(selected_frames[max(2, len(selected_frames) // 2)]) rframes.append(sel_frames[max(2, len(sel_frames) // 2)])
rdur.append(20) rdur.append(20)
##
# 收尾:看看透明度这块
transparency_flag = False transparency_flag = False
for f in rframes: for f in rframes:
if f.mode == "RGBA": if f.mode == "RGBA":
@ -186,12 +175,13 @@ async def _(
tf = {} tf = {}
if transparency_flag: if transparency_flag:
tf['transparency'] = 0 tf["transparency"] = 0
if is_rev: if is_rev:
rframes = rframes[::-1] rframes = rframes[::-1]
rdur = rdur[::-1] rdur = rdur[::-1]
output_img = BytesIO()
if rframes: if rframes:
rframes[0].save( rframes[0].save(
output_img, output_img,

View File

@ -2,25 +2,51 @@ from io import BytesIO
from typing import Iterable, cast from typing import Iterable, cast
from nonebot import on_message from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, Field, Image, MultiVar, Option, Text, from nonebot_plugin_alconna import (
UniMessage, UniMsg, on_alconna) Alconna,
Args,
Field,
Image,
MultiVar,
Option,
Text,
UniMessage,
UniMsg,
on_alconna,
)
from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message
from konabot.plugins.memepack.drawing.display import draw_cao_display, draw_snaur_display from konabot.plugins.memepack.drawing.display import (
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten, draw_cao_display,
draw_geimao, draw_mnk, draw_snaur_display,
draw_pt, draw_suan) )
from konabot.plugins.memepack.drawing.saying import (
draw_cute_ten,
draw_geimao,
draw_mnk,
draw_pt,
draw_suan,
)
from nonebot.adapters import Bot, Event from nonebot.adapters import Bot, Event
from returns.result import Success, Failure from returns.result import Success, Failure
geimao = on_alconna(Alconna( geimao = on_alconna(
"给猫说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "给猫说",
missing_tips=lambda: "你没有写给猫说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"给猫哈"}) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写给猫说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"给猫哈"},
)
@geimao.handle() @geimao.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -31,12 +57,21 @@ async def _(saying: list[str]):
await geimao.send(await UniMessage().image(raw=img_bytes).export()) await geimao.send(await UniMessage().image(raw=img_bytes).export())
pt = on_alconna(Alconna( pt = on_alconna(
"pt说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "pt说",
missing_tips=lambda: "你没有写小帕说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"小帕说"}) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写小帕说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"小帕说"},
)
@pt.handle() @pt.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -47,12 +82,21 @@ async def _(saying: list[str]):
await pt.send(await UniMessage().image(raw=img_bytes).export()) await pt.send(await UniMessage().image(raw=img_bytes).export())
mnk = on_alconna(Alconna( mnk = on_alconna(
"re:小?黑白子?说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "re:小?黑白子?说",
missing_tips=lambda: "你没有写黑白子说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"}) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写黑白子说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"mnk说"},
)
@mnk.handle() @mnk.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -63,12 +107,21 @@ async def _(saying: list[str]):
await mnk.send(await UniMessage().image(raw=img_bytes).export()) await mnk.send(await UniMessage().image(raw=img_bytes).export())
suan = on_alconna(Alconna( suan = on_alconna(
"小蒜说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "小蒜说",
missing_tips=lambda: "你没有写小蒜说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set()) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写小蒜说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@suan.handle() @suan.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -79,12 +132,21 @@ async def _(saying: list[str]):
await suan.send(await UniMessage().image(raw=img_bytes).export()) await suan.send(await UniMessage().image(raw=img_bytes).export())
dsuan = on_alconna(Alconna( dsuan = on_alconna(
"大蒜说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "大蒜说",
missing_tips=lambda: "你没有写大蒜说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set()) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写大蒜说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@dsuan.handle() @dsuan.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -95,12 +157,21 @@ async def _(saying: list[str]):
await dsuan.send(await UniMessage().image(raw=img_bytes).export()) await dsuan.send(await UniMessage().image(raw=img_bytes).export())
cutecat = on_alconna(Alconna( cutecat = on_alconna(
"乖猫说", Alconna(
Args["saying", MultiVar(str, '+'), Field( "乖猫说",
missing_tips=lambda: "你没有写十猫说了什么" Args[
)] "saying",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"十猫说"}) MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写十猫说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"十猫说"},
)
@cutecat.handle() @cutecat.handle()
async def _(saying: list[str]): async def _(saying: list[str]):
@ -113,13 +184,14 @@ async def _(saying: list[str]):
cao_display_cmd = on_message() cao_display_cmd = on_message()
@cao_display_cmd.handle() @cao_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot): async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False flag = False
for text in cast(Iterable[Text], msg.get(Text)): for text in cast(Iterable[Text], msg.get(Text)):
if text.text.strip() == "小槽展示": if text.text.strip() == "小槽展示":
flag = True flag = True
elif text.text.strip() == '': elif text.text.strip() == "":
continue continue
else: else:
return return
@ -134,27 +206,39 @@ async def _(msg: UniMsg, evt: Event, bot: Bot):
case Failure(err): case Failure(err):
await cao_display_cmd.send( await cao_display_cmd.send(
await UniMessage() await UniMessage()
.at(user_id=evt.get_user_id()) .at(user_id=evt.get_user_id())
.text(' ') .text(" ")
.text(err) .text(err)
.export() .export()
) )
snaur_display_cmd = on_alconna(Alconna( snaur_display_cmd = on_alconna(
"卵总展示", Alconna(
Option("--whiteness", Args["whiteness", float], alias=["-w"]), "卵总展示",
Option("--black-level", Args["black_level", float], alias=["-b"]), Option("--whiteness", Args["whiteness", float], alias=["-w"]),
Option("--opacity", Args["opacity", float], alias=["-o"]), Option("--black-level", Args["black_level", float], alias=["-b"]),
Option("--saturation", Args["saturation", float], alias=["-s"]), Option("--opacity", Args["opacity", float], alias=["-o"]),
Args["image", Image | None], Option("--saturation", Args["saturation", float], alias=["-s"]),
)) Args["image", Image | None],
)
)
@snaur_display_cmd.handle() @snaur_display_cmd.handle()
async def _(img: PIL_Image, whiteness: float = 0.0, black_level: float = 0.2, async def _(
opacity: float = 0.8, saturation: float = 0.85): img: PIL_Image,
whiteness: float = 0.0,
black_level: float = 0.2,
opacity: float = 0.8,
saturation: float = 0.85,
):
img_processed = await draw_snaur_display( img_processed = await draw_snaur_display(
img, whiteness, black_level, opacity, saturation, img,
whiteness,
black_level,
opacity,
saturation,
) )
img_data = BytesIO() img_data = BytesIO()
img_processed.save(img_data, "PNG") img_processed.save(img_data, "PNG")