Compare commits

..

7 Commits

Author SHA1 Message Date
0ca901e7b1 添加 giftool
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 12:47:52 +08:00
d096f43d38 添加 giftool 2025-10-12 12:40:33 +08:00
38ae3d1c74 补充黑白的 man 2025-10-12 12:04:19 +08:00
a0483d1d5c 修复断言逻辑
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 11:52:41 +08:00
ae83b66908 添加图像黑白
Some checks failed
continuous-integration/drone/push Build is failing
2025-10-12 11:50:15 +08:00
6abeb05a18 去除未使用的函数 2025-10-12 11:02:51 +08:00
9b0a0368fa 修改 YTPGIF 的功能问题
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 10:55:44 +08:00
10 changed files with 345 additions and 38 deletions

9
konabot/common/nb/exc.py Normal file
View File

@ -0,0 +1,9 @@
from nonebot_plugin_alconna import UniMessage
class BotExceptionMessage(Exception):
def __init__(self, msg: UniMessage | str) -> None:
super().__init__()
if isinstance(msg, str):
msg = UniMessage().text(msg)
self.msg = msg

View File

@ -1,17 +1,23 @@
from io import BytesIO
from typing import Annotated
import httpx
import PIL.Image
from loguru import logger
import nonebot
from nonebot.matcher import Matcher
from nonebot.adapters import Bot, Event, Message
from nonebot.adapters.discord import Bot as DiscordBot
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
import nonebot.params
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
from PIL import UnidentifiedImageError
from returns.result import Failure, Result, Success
from konabot.common.nb.exc import BotExceptionMessage
async def download_image_bytes(url: str) -> Result[bytes, str]:
# if "/matcha/cache/" in url:
@ -133,3 +139,21 @@ async def extract_image_from_message(
else:
return Failure("暂时不支持在这里中通过引用的方式获取图片")
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
async def _ext_img(
evt: Event,
bot: Bot,
matcher: Matcher,
) -> PIL.Image.Image | None:
match await extract_image_from_message(evt.get_message(), evt, bot):
case Success(img):
return img
case Failure(err):
# raise BotExceptionMessage(err)
await matcher.send(await UniMessage().text(err).export())
return None
assert False
PIL_Image = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)]

View File

@ -0,0 +1,16 @@
import re
from nonebot_plugin_alconna import Text, UniMsg
def match_keyword(*patterns: str | re.Pattern):
async def _matcher(msg: UniMsg):
text = msg.get(Text).extract_plain_text().strip()
for pattern in patterns:
if isinstance(pattern, str) and text == pattern:
return True
if isinstance(pattern, re.Pattern) and re.match(pattern, text):
return True
return False
return _matcher

View File

@ -0,0 +1,13 @@
from io import BytesIO
import PIL
import PIL.Image
from nonebot.adapters import Bot
from nonebot.matcher import Matcher
from nonebot_plugin_alconna import UniMessage
async def reply_image(matcher: type[Matcher], bot: Bot, img: PIL.Image.Image):
data = BytesIO()
img.save(data, "PNG")
await matcher.send(await UniMessage().image(raw=data).export(bot))

View File

@ -0,0 +1,59 @@
指令介绍
giftool - 对 GIF 动图进行裁剪、抽帧等处理
格式
giftool [图片] [选项]
示例
回复一张 GIF 并发送:
`giftool --ss 1.5 -t 2.0`
从 1.5 秒处开始,截取 2 秒长度的片段。
`giftool [图片] --ss 0:10 -to 0:15`
截取从 10 秒到 15 秒之间的片段(支持 MM:SS 或 HH:MM:SS 格式)。
`giftool [图片] --frames:v 10`
将整张 GIF 均匀抽帧,最终保留 10 帧。
`giftool [图片] --ss 2 --frames:v 5`
从第 2 秒开始截取,并将结果抽帧为 5 帧。
参数说明
图片(必需)
- 必须是 GIF 动图。
- 支持直接附带图片,或回复一条含 GIF 的消息后使用指令。
--ss <时间戳>(可选)
- 指定开始时间(单位:秒),可使用以下格式:
• 纯数字(如 `1.5` 表示 1.5 秒)
• 分秒格式(如 `1:30` 表示 1 分 30 秒)
• 时分秒格式(如 `0:1:30` 表示 1 分 30 秒)
- 默认从开头开始0 秒)。
-t <持续时间>(可选)
- 指定截取的持续时间(单位:秒),格式同 --ss。
- 与 --ss 配合使用:截取 [ss, ss + t] 区间。
- 不能与 --to 同时使用。
--to <时间戳>(可选)
- 指定结束时间(单位:秒),格式同 --ss。
- 与 --ss 配合使用:截取 [ss, to] 区间。
- 不能与 -t 同时使用。
--frames:v <帧数>(可选)
- 对截取后的片段进行均匀抽帧,保留指定数量的帧。
- 帧数必须为正整数(> 0
- 若原始帧数 ≤ 指定帧数,则保留全部帧。
--s <速度>(可选)
- 调整 gif 图的速度
使用方式
1. 发送指令前,请确保:
- 消息中附带一张 GIF 动图,或
- 回复一条包含 GIF 动图的消息后再发送指令。
2. 插件会自动:
- 解析 GIF 的每一帧及其持续时间duration
- 根据时间参数转换为帧索引进行裁剪
- 如指定抽帧,则对裁剪后的片段均匀采样
- 生成新的 GIF 并保持原始循环设置loop=0

View File

@ -0,0 +1,5 @@
指令介绍
黑白 - 将图片经过一个黑白滤镜的处理
示例
引用一个带有图片的消息,或者消息本身携带图片,然后发送「黑白」即可

45
konabot/plugins/errman.py Normal file
View File

@ -0,0 +1,45 @@
from typing import Any
from nonebot.adapters import Bot
from nonebot.matcher import Matcher
from nonebot.message import run_postprocessor
from nonebot_plugin_alconna import UniMessage
from returns.primitives.exceptions import UnwrapFailedError
from konabot.common.nb.exc import BotExceptionMessage
@run_postprocessor
async def _(bot: Bot, matcher: Matcher, exc: BotExceptionMessage | AssertionError | UnwrapFailedError):
if isinstance(exc, BotExceptionMessage):
msg = exc.msg
await matcher.send(await msg.export(bot))
if isinstance(exc, AssertionError):
if exc.args:
err_msg = exc.args[0]
err_msg_res: UniMessage
if isinstance(err_msg, str):
err_msg_res = UniMessage().text(err_msg)
elif isinstance(err_msg, UniMessage):
err_msg_res = err_msg
else:
return
await matcher.send(await err_msg_res.export(bot))
if isinstance(exc, UnwrapFailedError):
obj = exc.halted_container
try:
failure: Any = obj.failure()
err_msg_res: UniMessage
if isinstance(failure, str):
err_msg_res = UniMessage().text(failure)
elif isinstance(failure, UniMessage):
err_msg_res = failure
else:
return
await matcher.send(await err_msg_res.export(bot))
except:
pass

View File

@ -0,0 +1,160 @@
import re
from io import BytesIO
from nonebot import on_message
from nonebot.adapters import Bot
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage,
on_alconna)
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import PIL_Image
from konabot.common.nb.match_keyword import match_keyword
from konabot.common.nb.reply_image import reply_image
cmd_black_white = on_message(rule=match_keyword("黑白"))
@cmd_black_white.handle()
async def _(img: PIL_Image, bot: Bot):
await reply_image(cmd_black_white, bot, img.convert("LA"))
def parse_timestamp(tx: str) -> float | None:
res = 0.0
for component in tx.split(":"):
res *= 60
if not re.match(r"^\d+(\.\d+)?$", component):
return
res += float(component)
return res
cmd_giftool = on_alconna(Alconna(
"giftool",
Args["img", Image | None],
Option("--ss", Args["start_point", str]),
Option("--frames:v", Args["frame_count", int]),
Option("-t", Args["length", str]),
Option("-to", Args["end_point", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
))
@cmd_giftool.handle()
async def _(
image: PIL_Image,
start_point: str | None = None,
frame_count: int | None = None,
length: str | None = None,
speed_factor: float = 1.0,
end_point: str | None = None,
):
ss: None | float = None
if start_point:
ss = parse_timestamp(start_point)
if ss is None:
raise BotExceptionMessage("--ss 的格式不满足条件")
t: None | float = None
if length:
t = parse_timestamp(length)
if t is None:
raise BotExceptionMessage("-t 的格式不满足条件")
to: None | float = None
if end_point:
to = parse_timestamp(end_point)
if to is None:
raise BotExceptionMessage("-to 的格式不满足条件")
if to is not None and ss is not None and to <= ss:
raise BotExceptionMessage("错误:出点时间小于入点")
if frame_count is not None and frame_count <= 0:
raise BotExceptionMessage("错误:帧数量应该大于 0")
if speed_factor <= 0:
raise BotExceptionMessage("错误:--speed 必须大于 0")
if not getattr(image, "is_animated", False):
raise BotExceptionMessage("错误输入的不是动图GIF")
frames = []
durations = []
total_duration = 0.0
try:
for i in range(getattr(image, "n_frames")):
image.seek(i)
frames.append(image.copy())
duration = image.info.get("duration", 100) # 单位:毫秒
durations.append(duration)
total_duration += duration / 1000.0 # 转为秒
except EOFError:
pass
if not frames:
raise BotExceptionMessage("错误:读取 GIF 帧失败")
def time_to_frame_index(target_time: float) -> int:
if target_time <= 0:
return 0
cum = 0.0
for idx, dur in enumerate(durations):
cum += dur / 1000.0
if cum >= target_time:
return min(idx, len(frames) - 1)
return len(frames) - 1
start_frame = 0
end_frame = len(frames) - 1
if ss is not None:
start_frame = time_to_frame_index(ss)
if to is not None:
end_frame = time_to_frame_index(to)
if end_frame < start_frame:
end_frame = start_frame
elif t is not None:
end_time = (ss or 0.0) + t
end_frame = time_to_frame_index(end_time)
if end_frame < start_frame:
end_frame = start_frame
start_frame = max(0, start_frame)
end_frame = min(len(frames) - 1, end_frame)
selected_frames = frames[start_frame : end_frame + 1]
selected_durations = durations[start_frame : end_frame + 1]
if frame_count is not None and frame_count > 0:
if frame_count >= len(selected_frames):
pass
else:
step = len(selected_frames) / frame_count
sampled_frames = []
sampled_durations = []
for i in range(frame_count):
idx = int(i * step)
sampled_frames.append(selected_frames[idx])
sampled_durations.append(
sum(selected_durations) // len(selected_durations)
)
selected_frames = sampled_frames
selected_durations = sampled_durations
output_img = BytesIO()
adjusted_durations = [
max(10, int(dur / speed_factor)) for dur in selected_durations
]
if selected_frames:
selected_frames[0].save(
output_img,
format="GIF",
save_all=True,
append_images=selected_frames[1:],
duration=adjusted_durations,
loop=0,
)
else:
raise BotExceptionMessage("错误:没有可输出的帧")
output_img.seek(0)
await cmd_giftool.send(await UniMessage().image(raw=output_img).export())

View File

@ -1,17 +1,11 @@
from io import BytesIO
from typing import Optional
from PIL import Image
from nonebot.adapters import Event as BaseEvent
from loguru import logger
from nonebot.adapters import Bot as BaseBot
from nonebot.adapters import Event as BaseEvent
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import (
Alconna,
Args,
Field,
UniMessage,
on_alconna,
)
from nonebot_plugin_alconna import Alconna, Args, Field, UniMessage, on_alconna
from PIL import Image
from returns.result import Failure, Success
from konabot.common.nb.extract_image import extract_image_from_message
@ -56,29 +50,6 @@ ytpgif_cmd = on_alconna(
)
async def get_image_url(event: BaseEvent) -> Optional[str]:
"""从事件中提取图片 URL支持直接消息和回复"""
msg = event.get_message()
for seg in msg:
if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"])
if hasattr(event, "reply") and (reply := getattr(event, "reply")):
reply_msg = reply.message
for seg in reply_msg:
if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"])
return None
async def download_image(url: str) -> bytes:
import httpx
async with httpx.AsyncClient() as client:
resp = await client.get(url, timeout=10)
resp.raise_for_status()
return resp.content
def resize_frame(frame: Image.Image) -> Image.Image:
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
w, h = frame.size
@ -117,16 +88,17 @@ async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0):
try:
n_frames = getattr(src_img, "n_frames", 1)
is_animated = n_frames > 1
logger.debug(f"收到的动图的运动状态:{is_animated} 帧数量:{n_frames}")
except Exception:
is_animated = False
n_frames = 1
n_frames = 1
output_frames = []
output_durations_ms = []
if is_animated:
# === 动图模式:截取正向 + 镜像两段 ===
frames_with_duration = []
frames_with_duration: list[tuple[Image.Image, float]] = []
palette = src_img.getpalette()
for idx in range(n_frames):
@ -148,6 +120,7 @@ async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0):
try:
resized_frame.putpalette(palette)
except Exception: # noqa
logger.debug("色板应用失败")
pass
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))

View File

@ -8,9 +8,12 @@ nonebot.load_plugins("konabot/plugins")
plugins = nonebot.get_loaded_plugins()
len_requires = len(
[f for f in (
Path(__file__).parent.parent / "konabot" / "plugins"
).iterdir() if f.is_dir() and (f / "__init__.py").exists()]
[
f
for f in (Path(__file__).parent.parent / "konabot" / "plugins").iterdir()
if (f.is_dir() and (f / "__init__.py").exists())
or ((not f.is_dir()) and f.suffix == ".py")
]
)
plugins = [p for p in plugins if p.module.__name__.startswith("konabot.plugins")]