Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0ca901e7b1 | |||
| d096f43d38 | |||
| 38ae3d1c74 | |||
| a0483d1d5c | |||
| ae83b66908 | |||
| 6abeb05a18 | |||
| 9b0a0368fa |
9
konabot/common/nb/exc.py
Normal file
9
konabot/common/nb/exc.py
Normal file
@ -0,0 +1,9 @@
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
|
||||
|
||||
class BotExceptionMessage(Exception):
|
||||
def __init__(self, msg: UniMessage | str) -> None:
|
||||
super().__init__()
|
||||
if isinstance(msg, str):
|
||||
msg = UniMessage().text(msg)
|
||||
self.msg = msg
|
||||
@ -1,17 +1,23 @@
|
||||
from io import BytesIO
|
||||
from typing import Annotated
|
||||
|
||||
import httpx
|
||||
import PIL.Image
|
||||
from loguru import logger
|
||||
import nonebot
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.adapters import Bot, Event, Message
|
||||
from nonebot.adapters.discord import Bot as DiscordBot
|
||||
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
|
||||
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
|
||||
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
|
||||
import nonebot.params
|
||||
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
|
||||
from PIL import UnidentifiedImageError
|
||||
from returns.result import Failure, Result, Success
|
||||
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
|
||||
|
||||
async def download_image_bytes(url: str) -> Result[bytes, str]:
|
||||
# if "/matcha/cache/" in url:
|
||||
@ -133,3 +139,21 @@ async def extract_image_from_message(
|
||||
else:
|
||||
return Failure("暂时不支持在这里中通过引用的方式获取图片")
|
||||
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
|
||||
|
||||
|
||||
async def _ext_img(
|
||||
evt: Event,
|
||||
bot: Bot,
|
||||
matcher: Matcher,
|
||||
) -> PIL.Image.Image | None:
|
||||
match await extract_image_from_message(evt.get_message(), evt, bot):
|
||||
case Success(img):
|
||||
return img
|
||||
case Failure(err):
|
||||
# raise BotExceptionMessage(err)
|
||||
await matcher.send(await UniMessage().text(err).export())
|
||||
return None
|
||||
assert False
|
||||
|
||||
|
||||
PIL_Image = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)]
|
||||
|
||||
16
konabot/common/nb/match_keyword.py
Normal file
16
konabot/common/nb/match_keyword.py
Normal file
@ -0,0 +1,16 @@
|
||||
import re
|
||||
|
||||
from nonebot_plugin_alconna import Text, UniMsg
|
||||
|
||||
|
||||
def match_keyword(*patterns: str | re.Pattern):
|
||||
async def _matcher(msg: UniMsg):
|
||||
text = msg.get(Text).extract_plain_text().strip()
|
||||
for pattern in patterns:
|
||||
if isinstance(pattern, str) and text == pattern:
|
||||
return True
|
||||
if isinstance(pattern, re.Pattern) and re.match(pattern, text):
|
||||
return True
|
||||
return False
|
||||
|
||||
return _matcher
|
||||
13
konabot/common/nb/reply_image.py
Normal file
13
konabot/common/nb/reply_image.py
Normal file
@ -0,0 +1,13 @@
|
||||
from io import BytesIO
|
||||
|
||||
import PIL
|
||||
import PIL.Image
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
|
||||
|
||||
async def reply_image(matcher: type[Matcher], bot: Bot, img: PIL.Image.Image):
|
||||
data = BytesIO()
|
||||
img.save(data, "PNG")
|
||||
await matcher.send(await UniMessage().image(raw=data).export(bot))
|
||||
59
konabot/docs/user/giftool.txt
Normal file
59
konabot/docs/user/giftool.txt
Normal file
@ -0,0 +1,59 @@
|
||||
指令介绍
|
||||
giftool - 对 GIF 动图进行裁剪、抽帧等处理
|
||||
|
||||
格式
|
||||
giftool [图片] [选项]
|
||||
|
||||
示例
|
||||
回复一张 GIF 并发送:
|
||||
`giftool --ss 1.5 -t 2.0`
|
||||
从 1.5 秒处开始,截取 2 秒长度的片段。
|
||||
|
||||
`giftool [图片] --ss 0:10 -to 0:15`
|
||||
截取从 10 秒到 15 秒之间的片段(支持 MM:SS 或 HH:MM:SS 格式)。
|
||||
|
||||
`giftool [图片] --frames:v 10`
|
||||
将整张 GIF 均匀抽帧,最终保留 10 帧。
|
||||
|
||||
`giftool [图片] --ss 2 --frames:v 5`
|
||||
从第 2 秒开始截取,并将结果抽帧为 5 帧。
|
||||
|
||||
参数说明
|
||||
图片(必需)
|
||||
- 必须是 GIF 动图。
|
||||
- 支持直接附带图片,或回复一条含 GIF 的消息后使用指令。
|
||||
|
||||
--ss <时间戳>(可选)
|
||||
- 指定开始时间(单位:秒),可使用以下格式:
|
||||
• 纯数字(如 `1.5` 表示 1.5 秒)
|
||||
• 分秒格式(如 `1:30` 表示 1 分 30 秒)
|
||||
• 时分秒格式(如 `0:1:30` 表示 1 分 30 秒)
|
||||
- 默认从开头开始(0 秒)。
|
||||
|
||||
-t <持续时间>(可选)
|
||||
- 指定截取的持续时间(单位:秒),格式同 --ss。
|
||||
- 与 --ss 配合使用:截取 [ss, ss + t] 区间。
|
||||
- 不能与 --to 同时使用。
|
||||
|
||||
--to <时间戳>(可选)
|
||||
- 指定结束时间(单位:秒),格式同 --ss。
|
||||
- 与 --ss 配合使用:截取 [ss, to] 区间。
|
||||
- 不能与 -t 同时使用。
|
||||
|
||||
--frames:v <帧数>(可选)
|
||||
- 对截取后的片段进行均匀抽帧,保留指定数量的帧。
|
||||
- 帧数必须为正整数(> 0)。
|
||||
- 若原始帧数 ≤ 指定帧数,则保留全部帧。
|
||||
|
||||
--s <速度>(可选)
|
||||
- 调整 gif 图的速度
|
||||
|
||||
使用方式
|
||||
1. 发送指令前,请确保:
|
||||
- 消息中附带一张 GIF 动图,或
|
||||
- 回复一条包含 GIF 动图的消息后再发送指令。
|
||||
2. 插件会自动:
|
||||
- 解析 GIF 的每一帧及其持续时间(duration)
|
||||
- 根据时间参数转换为帧索引进行裁剪
|
||||
- 如指定抽帧,则对裁剪后的片段均匀采样
|
||||
- 生成新的 GIF 并保持原始循环设置(loop=0)
|
||||
5
konabot/docs/user/黑白.txt
Normal file
5
konabot/docs/user/黑白.txt
Normal file
@ -0,0 +1,5 @@
|
||||
指令介绍
|
||||
黑白 - 将图片经过一个黑白滤镜的处理
|
||||
|
||||
示例
|
||||
引用一个带有图片的消息,或者消息本身携带图片,然后发送「黑白」即可
|
||||
45
konabot/plugins/errman.py
Normal file
45
konabot/plugins/errman.py
Normal file
@ -0,0 +1,45 @@
|
||||
from typing import Any
|
||||
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.message import run_postprocessor
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from returns.primitives.exceptions import UnwrapFailedError
|
||||
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
|
||||
|
||||
@run_postprocessor
|
||||
async def _(bot: Bot, matcher: Matcher, exc: BotExceptionMessage | AssertionError | UnwrapFailedError):
|
||||
if isinstance(exc, BotExceptionMessage):
|
||||
msg = exc.msg
|
||||
await matcher.send(await msg.export(bot))
|
||||
if isinstance(exc, AssertionError):
|
||||
if exc.args:
|
||||
err_msg = exc.args[0]
|
||||
|
||||
err_msg_res: UniMessage
|
||||
if isinstance(err_msg, str):
|
||||
err_msg_res = UniMessage().text(err_msg)
|
||||
elif isinstance(err_msg, UniMessage):
|
||||
err_msg_res = err_msg
|
||||
else:
|
||||
return
|
||||
|
||||
await matcher.send(await err_msg_res.export(bot))
|
||||
if isinstance(exc, UnwrapFailedError):
|
||||
obj = exc.halted_container
|
||||
try:
|
||||
failure: Any = obj.failure()
|
||||
|
||||
err_msg_res: UniMessage
|
||||
if isinstance(failure, str):
|
||||
err_msg_res = UniMessage().text(failure)
|
||||
elif isinstance(failure, UniMessage):
|
||||
err_msg_res = failure
|
||||
else:
|
||||
return
|
||||
|
||||
await matcher.send(await err_msg_res.export(bot))
|
||||
except:
|
||||
pass
|
||||
160
konabot/plugins/image_process/__init__.py
Normal file
160
konabot/plugins/image_process/__init__.py
Normal file
@ -0,0 +1,160 @@
|
||||
import re
|
||||
from io import BytesIO
|
||||
|
||||
from nonebot import on_message
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage,
|
||||
on_alconna)
|
||||
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
from konabot.common.nb.extract_image import PIL_Image
|
||||
from konabot.common.nb.match_keyword import match_keyword
|
||||
from konabot.common.nb.reply_image import reply_image
|
||||
|
||||
cmd_black_white = on_message(rule=match_keyword("黑白"))
|
||||
|
||||
|
||||
@cmd_black_white.handle()
|
||||
async def _(img: PIL_Image, bot: Bot):
|
||||
await reply_image(cmd_black_white, bot, img.convert("LA"))
|
||||
|
||||
|
||||
def parse_timestamp(tx: str) -> float | None:
|
||||
res = 0.0
|
||||
for component in tx.split(":"):
|
||||
res *= 60
|
||||
if not re.match(r"^\d+(\.\d+)?$", component):
|
||||
return
|
||||
res += float(component)
|
||||
return res
|
||||
|
||||
|
||||
cmd_giftool = on_alconna(Alconna(
|
||||
"giftool",
|
||||
Args["img", Image | None],
|
||||
Option("--ss", Args["start_point", str]),
|
||||
Option("--frames:v", Args["frame_count", int]),
|
||||
Option("-t", Args["length", str]),
|
||||
Option("-to", Args["end_point", str]),
|
||||
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
|
||||
))
|
||||
|
||||
|
||||
@cmd_giftool.handle()
|
||||
async def _(
|
||||
image: PIL_Image,
|
||||
start_point: str | None = None,
|
||||
frame_count: int | None = None,
|
||||
length: str | None = None,
|
||||
speed_factor: float = 1.0,
|
||||
end_point: str | None = None,
|
||||
):
|
||||
ss: None | float = None
|
||||
if start_point:
|
||||
ss = parse_timestamp(start_point)
|
||||
if ss is None:
|
||||
raise BotExceptionMessage("--ss 的格式不满足条件")
|
||||
|
||||
t: None | float = None
|
||||
if length:
|
||||
t = parse_timestamp(length)
|
||||
if t is None:
|
||||
raise BotExceptionMessage("-t 的格式不满足条件")
|
||||
|
||||
to: None | float = None
|
||||
if end_point:
|
||||
to = parse_timestamp(end_point)
|
||||
if to is None:
|
||||
raise BotExceptionMessage("-to 的格式不满足条件")
|
||||
|
||||
if to is not None and ss is not None and to <= ss:
|
||||
raise BotExceptionMessage("错误:出点时间小于入点")
|
||||
if frame_count is not None and frame_count <= 0:
|
||||
raise BotExceptionMessage("错误:帧数量应该大于 0")
|
||||
if speed_factor <= 0:
|
||||
raise BotExceptionMessage("错误:--speed 必须大于 0")
|
||||
|
||||
if not getattr(image, "is_animated", False):
|
||||
raise BotExceptionMessage("错误:输入的不是动图(GIF)")
|
||||
|
||||
frames = []
|
||||
durations = []
|
||||
total_duration = 0.0
|
||||
|
||||
try:
|
||||
for i in range(getattr(image, "n_frames")):
|
||||
image.seek(i)
|
||||
frames.append(image.copy())
|
||||
duration = image.info.get("duration", 100) # 单位:毫秒
|
||||
durations.append(duration)
|
||||
total_duration += duration / 1000.0 # 转为秒
|
||||
except EOFError:
|
||||
pass
|
||||
|
||||
if not frames:
|
||||
raise BotExceptionMessage("错误:读取 GIF 帧失败")
|
||||
|
||||
def time_to_frame_index(target_time: float) -> int:
|
||||
if target_time <= 0:
|
||||
return 0
|
||||
cum = 0.0
|
||||
for idx, dur in enumerate(durations):
|
||||
cum += dur / 1000.0
|
||||
if cum >= target_time:
|
||||
return min(idx, len(frames) - 1)
|
||||
return len(frames) - 1
|
||||
start_frame = 0
|
||||
end_frame = len(frames) - 1
|
||||
if ss is not None:
|
||||
start_frame = time_to_frame_index(ss)
|
||||
if to is not None:
|
||||
end_frame = time_to_frame_index(to)
|
||||
if end_frame < start_frame:
|
||||
end_frame = start_frame
|
||||
elif t is not None:
|
||||
end_time = (ss or 0.0) + t
|
||||
end_frame = time_to_frame_index(end_time)
|
||||
if end_frame < start_frame:
|
||||
end_frame = start_frame
|
||||
|
||||
start_frame = max(0, start_frame)
|
||||
end_frame = min(len(frames) - 1, end_frame)
|
||||
selected_frames = frames[start_frame : end_frame + 1]
|
||||
selected_durations = durations[start_frame : end_frame + 1]
|
||||
|
||||
if frame_count is not None and frame_count > 0:
|
||||
if frame_count >= len(selected_frames):
|
||||
pass
|
||||
else:
|
||||
step = len(selected_frames) / frame_count
|
||||
sampled_frames = []
|
||||
sampled_durations = []
|
||||
for i in range(frame_count):
|
||||
idx = int(i * step)
|
||||
sampled_frames.append(selected_frames[idx])
|
||||
sampled_durations.append(
|
||||
sum(selected_durations) // len(selected_durations)
|
||||
)
|
||||
selected_frames = sampled_frames
|
||||
selected_durations = sampled_durations
|
||||
|
||||
output_img = BytesIO()
|
||||
|
||||
adjusted_durations = [
|
||||
max(10, int(dur / speed_factor)) for dur in selected_durations
|
||||
]
|
||||
|
||||
if selected_frames:
|
||||
selected_frames[0].save(
|
||||
output_img,
|
||||
format="GIF",
|
||||
save_all=True,
|
||||
append_images=selected_frames[1:],
|
||||
duration=adjusted_durations,
|
||||
loop=0,
|
||||
)
|
||||
else:
|
||||
raise BotExceptionMessage("错误:没有可输出的帧")
|
||||
output_img.seek(0)
|
||||
|
||||
await cmd_giftool.send(await UniMessage().image(raw=output_img).export())
|
||||
@ -1,17 +1,11 @@
|
||||
from io import BytesIO
|
||||
from typing import Optional
|
||||
|
||||
from PIL import Image
|
||||
from nonebot.adapters import Event as BaseEvent
|
||||
from loguru import logger
|
||||
from nonebot.adapters import Bot as BaseBot
|
||||
from nonebot.adapters import Event as BaseEvent
|
||||
from nonebot.plugin import PluginMetadata
|
||||
from nonebot_plugin_alconna import (
|
||||
Alconna,
|
||||
Args,
|
||||
Field,
|
||||
UniMessage,
|
||||
on_alconna,
|
||||
)
|
||||
from nonebot_plugin_alconna import Alconna, Args, Field, UniMessage, on_alconna
|
||||
from PIL import Image
|
||||
from returns.result import Failure, Success
|
||||
|
||||
from konabot.common.nb.extract_image import extract_image_from_message
|
||||
@ -56,29 +50,6 @@ ytpgif_cmd = on_alconna(
|
||||
)
|
||||
|
||||
|
||||
async def get_image_url(event: BaseEvent) -> Optional[str]:
|
||||
"""从事件中提取图片 URL,支持直接消息和回复"""
|
||||
msg = event.get_message()
|
||||
for seg in msg:
|
||||
if seg.type == "image" and seg.data.get("url"):
|
||||
return str(seg.data["url"])
|
||||
|
||||
if hasattr(event, "reply") and (reply := getattr(event, "reply")):
|
||||
reply_msg = reply.message
|
||||
for seg in reply_msg:
|
||||
if seg.type == "image" and seg.data.get("url"):
|
||||
return str(seg.data["url"])
|
||||
return None
|
||||
|
||||
|
||||
async def download_image(url: str) -> bytes:
|
||||
import httpx
|
||||
async with httpx.AsyncClient() as client:
|
||||
resp = await client.get(url, timeout=10)
|
||||
resp.raise_for_status()
|
||||
return resp.content
|
||||
|
||||
|
||||
def resize_frame(frame: Image.Image) -> Image.Image:
|
||||
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
|
||||
w, h = frame.size
|
||||
@ -117,16 +88,17 @@ async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0):
|
||||
try:
|
||||
n_frames = getattr(src_img, "n_frames", 1)
|
||||
is_animated = n_frames > 1
|
||||
logger.debug(f"收到的动图的运动状态:{is_animated} 帧数量:{n_frames}")
|
||||
except Exception:
|
||||
is_animated = False
|
||||
n_frames = 1
|
||||
n_frames = 1
|
||||
|
||||
output_frames = []
|
||||
output_durations_ms = []
|
||||
|
||||
if is_animated:
|
||||
# === 动图模式:截取正向 + 镜像两段 ===
|
||||
frames_with_duration = []
|
||||
frames_with_duration: list[tuple[Image.Image, float]] = []
|
||||
palette = src_img.getpalette()
|
||||
|
||||
for idx in range(n_frames):
|
||||
@ -148,6 +120,7 @@ async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0):
|
||||
try:
|
||||
resized_frame.putpalette(palette)
|
||||
except Exception: # noqa
|
||||
logger.debug("色板应用失败")
|
||||
pass
|
||||
|
||||
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
|
||||
|
||||
@ -8,9 +8,12 @@ nonebot.load_plugins("konabot/plugins")
|
||||
|
||||
plugins = nonebot.get_loaded_plugins()
|
||||
len_requires = len(
|
||||
[f for f in (
|
||||
Path(__file__).parent.parent / "konabot" / "plugins"
|
||||
).iterdir() if f.is_dir() and (f / "__init__.py").exists()]
|
||||
[
|
||||
f
|
||||
for f in (Path(__file__).parent.parent / "konabot" / "plugins").iterdir()
|
||||
if (f.is_dir() and (f / "__init__.py").exists())
|
||||
or ((not f.is_dir()) and f.suffix == ".py")
|
||||
]
|
||||
)
|
||||
|
||||
plugins = [p for p in plugins if p.module.__name__.startswith("konabot.plugins")]
|
||||
|
||||
Reference in New Issue
Block a user