forked from mttu-developers/konabot
Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| af566888ab | |||
| e72bc283f8 | |||
| c9d58e7498 | |||
| 627a48da1c | |||
| 87be1916ee | |||
| 0ca901e7b1 | |||
| d096f43d38 | |||
| 38ae3d1c74 | |||
| a0483d1d5c | |||
| ae83b66908 | |||
| 6abeb05a18 |
@ -4,6 +4,15 @@ FROM python:3.13-slim AS base
|
||||
ENV VIRTUAL_ENV=/app/.venv \
|
||||
PATH="/app/.venv/bin:$PATH"
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y --no-install-recommends \
|
||||
libfontconfig1 \
|
||||
libgl1 \
|
||||
libegl1 \
|
||||
libglvnd0 \
|
||||
mesa-vulkan-drivers \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
|
||||
|
||||
FROM base AS builder
|
||||
|
||||
9
konabot/common/nb/exc.py
Normal file
9
konabot/common/nb/exc.py
Normal file
@ -0,0 +1,9 @@
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
|
||||
|
||||
class BotExceptionMessage(Exception):
|
||||
def __init__(self, msg: UniMessage | str) -> None:
|
||||
super().__init__()
|
||||
if isinstance(msg, str):
|
||||
msg = UniMessage().text(msg)
|
||||
self.msg = msg
|
||||
@ -1,17 +1,23 @@
|
||||
from io import BytesIO
|
||||
from typing import Annotated
|
||||
|
||||
import httpx
|
||||
import PIL.Image
|
||||
from loguru import logger
|
||||
import nonebot
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.adapters import Bot, Event, Message
|
||||
from nonebot.adapters.discord import Bot as DiscordBot
|
||||
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
|
||||
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
|
||||
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
|
||||
import nonebot.params
|
||||
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
|
||||
from PIL import UnidentifiedImageError
|
||||
from returns.result import Failure, Result, Success
|
||||
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
|
||||
|
||||
async def download_image_bytes(url: str) -> Result[bytes, str]:
|
||||
# if "/matcha/cache/" in url:
|
||||
@ -133,3 +139,21 @@ async def extract_image_from_message(
|
||||
else:
|
||||
return Failure("暂时不支持在这里中通过引用的方式获取图片")
|
||||
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
|
||||
|
||||
|
||||
async def _ext_img(
|
||||
evt: Event,
|
||||
bot: Bot,
|
||||
matcher: Matcher,
|
||||
) -> PIL.Image.Image | None:
|
||||
match await extract_image_from_message(evt.get_message(), evt, bot):
|
||||
case Success(img):
|
||||
return img
|
||||
case Failure(err):
|
||||
# raise BotExceptionMessage(err)
|
||||
await matcher.send(await UniMessage().text(err).export())
|
||||
return None
|
||||
assert False
|
||||
|
||||
|
||||
PIL_Image = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)]
|
||||
|
||||
16
konabot/common/nb/match_keyword.py
Normal file
16
konabot/common/nb/match_keyword.py
Normal file
@ -0,0 +1,16 @@
|
||||
import re
|
||||
|
||||
from nonebot_plugin_alconna import Text, UniMsg
|
||||
|
||||
|
||||
def match_keyword(*patterns: str | re.Pattern):
|
||||
async def _matcher(msg: UniMsg):
|
||||
text = msg.get(Text).extract_plain_text().strip()
|
||||
for pattern in patterns:
|
||||
if isinstance(pattern, str) and text == pattern:
|
||||
return True
|
||||
if isinstance(pattern, re.Pattern) and re.match(pattern, text):
|
||||
return True
|
||||
return False
|
||||
|
||||
return _matcher
|
||||
13
konabot/common/nb/reply_image.py
Normal file
13
konabot/common/nb/reply_image.py
Normal file
@ -0,0 +1,13 @@
|
||||
from io import BytesIO
|
||||
|
||||
import PIL
|
||||
import PIL.Image
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
|
||||
|
||||
async def reply_image(matcher: type[Matcher], bot: Bot, img: PIL.Image.Image):
|
||||
data = BytesIO()
|
||||
img.save(data, "PNG")
|
||||
await matcher.send(await UniMessage().image(raw=data).export(bot))
|
||||
59
konabot/docs/user/giftool.txt
Normal file
59
konabot/docs/user/giftool.txt
Normal file
@ -0,0 +1,59 @@
|
||||
指令介绍
|
||||
giftool - 对 GIF 动图进行裁剪、抽帧等处理
|
||||
|
||||
格式
|
||||
giftool [图片] [选项]
|
||||
|
||||
示例
|
||||
回复一张 GIF 并发送:
|
||||
`giftool --ss 1.5 -t 2.0`
|
||||
从 1.5 秒处开始,截取 2 秒长度的片段。
|
||||
|
||||
`giftool [图片] --ss 0:10 -to 0:15`
|
||||
截取从 10 秒到 15 秒之间的片段(支持 MM:SS 或 HH:MM:SS 格式)。
|
||||
|
||||
`giftool [图片] --frames:v 10`
|
||||
将整张 GIF 均匀抽帧,最终保留 10 帧。
|
||||
|
||||
`giftool [图片] --ss 2 --frames:v 5`
|
||||
从第 2 秒开始截取,并将结果抽帧为 5 帧。
|
||||
|
||||
参数说明
|
||||
图片(必需)
|
||||
- 必须是 GIF 动图。
|
||||
- 支持直接附带图片,或回复一条含 GIF 的消息后使用指令。
|
||||
|
||||
--ss <时间戳>(可选)
|
||||
- 指定开始时间(单位:秒),可使用以下格式:
|
||||
• 纯数字(如 `1.5` 表示 1.5 秒)
|
||||
• 分秒格式(如 `1:30` 表示 1 分 30 秒)
|
||||
• 时分秒格式(如 `0:1:30` 表示 1 分 30 秒)
|
||||
- 默认从开头开始(0 秒)。
|
||||
|
||||
-t <持续时间>(可选)
|
||||
- 指定截取的持续时间(单位:秒),格式同 --ss。
|
||||
- 与 --ss 配合使用:截取 [ss, ss + t] 区间。
|
||||
- 不能与 --to 同时使用。
|
||||
|
||||
--to <时间戳>(可选)
|
||||
- 指定结束时间(单位:秒),格式同 --ss。
|
||||
- 与 --ss 配合使用:截取 [ss, to] 区间。
|
||||
- 不能与 -t 同时使用。
|
||||
|
||||
--frames:v <帧数>(可选)
|
||||
- 对截取后的片段进行均匀抽帧,保留指定数量的帧。
|
||||
- 帧数必须为正整数(> 0)。
|
||||
- 若原始帧数 ≤ 指定帧数,则保留全部帧。
|
||||
|
||||
--s <速度>(可选)
|
||||
- 调整 gif 图的速度
|
||||
|
||||
使用方式
|
||||
1. 发送指令前,请确保:
|
||||
- 消息中附带一张 GIF 动图,或
|
||||
- 回复一条包含 GIF 动图的消息后再发送指令。
|
||||
2. 插件会自动:
|
||||
- 解析 GIF 的每一帧及其持续时间(duration)
|
||||
- 根据时间参数转换为帧索引进行裁剪
|
||||
- 如指定抽帧,则对裁剪后的片段均匀采样
|
||||
- 生成新的 GIF 并保持原始循环设置(loop=0)
|
||||
47
konabot/docs/user/shadertool.txt
Normal file
47
konabot/docs/user/shadertool.txt
Normal file
@ -0,0 +1,47 @@
|
||||
指令介绍
|
||||
shadertool - 使用 SkSL(Skia Shader Language)代码实时渲染并生成 GIF 动画
|
||||
|
||||
格式
|
||||
shadertool [选项] <SkSL 代码>
|
||||
|
||||
示例
|
||||
shadertool """
|
||||
uniform float u_time;
|
||||
uniform float2 u_resolution;
|
||||
|
||||
half4 main(float2 coord) {
|
||||
return half4(
|
||||
1.0,
|
||||
sin((coord.y / u_resolution.y + u_time) * 3.1415926 * 2) * 0.5 + 0.5,
|
||||
coord.x / u_resolution.x,
|
||||
1.0
|
||||
);
|
||||
}
|
||||
"""
|
||||
|
||||
参数说明
|
||||
SkSL 代码(必填)
|
||||
- 类型:字符串(建议用英文双引号包裹)
|
||||
- 内容:符合 SkSL 语法的片段着色器代码,必须包含 `void main()` 函数,并为 `sk_FragColor` 赋值。
|
||||
- 注意:插件会自动去除代码首尾的单引号或双引号,便于命令行输入。
|
||||
|
||||
--width <整数>(可选)
|
||||
- 默认值:320
|
||||
- 作用:输出 GIF 的宽度(像素),必须大于 0。
|
||||
|
||||
--height <整数>(可选)
|
||||
- 默认值:180
|
||||
- 作用:输出 GIF 的高度(像素),必须大于 0。
|
||||
|
||||
--duration <浮点数>(可选)
|
||||
- 默认值:1.0
|
||||
- 作用:动画总时长(秒),必须大于 0。
|
||||
- 限制:`duration × fps` 必须 ≥ 1 且 ≤ 100(即至少 1 帧,最多 100 帧)。
|
||||
|
||||
--fps <浮点数>(可选)
|
||||
- 默认值:15.0
|
||||
- 作用:每秒帧数,控制动画流畅度,必须大于 0。
|
||||
- 常见值:10(低配流畅)、15(默认)、24/30(电影/视频级)。
|
||||
|
||||
使用方式
|
||||
直接在群聊或私聊中发送 `shadertool` 指令,附上合法的 SkSL 代码即可。
|
||||
5
konabot/docs/user/黑白.txt
Normal file
5
konabot/docs/user/黑白.txt
Normal file
@ -0,0 +1,5 @@
|
||||
指令介绍
|
||||
黑白 - 将图片经过一个黑白滤镜的处理
|
||||
|
||||
示例
|
||||
引用一个带有图片的消息,或者消息本身携带图片,然后发送「黑白」即可
|
||||
45
konabot/plugins/errman.py
Normal file
45
konabot/plugins/errman.py
Normal file
@ -0,0 +1,45 @@
|
||||
from typing import Any
|
||||
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.message import run_postprocessor
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from returns.primitives.exceptions import UnwrapFailedError
|
||||
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
|
||||
|
||||
@run_postprocessor
|
||||
async def _(bot: Bot, matcher: Matcher, exc: BotExceptionMessage | AssertionError | UnwrapFailedError):
|
||||
if isinstance(exc, BotExceptionMessage):
|
||||
msg = exc.msg
|
||||
await matcher.send(await msg.export(bot))
|
||||
if isinstance(exc, AssertionError):
|
||||
if exc.args:
|
||||
err_msg = exc.args[0]
|
||||
|
||||
err_msg_res: UniMessage
|
||||
if isinstance(err_msg, str):
|
||||
err_msg_res = UniMessage().text(err_msg)
|
||||
elif isinstance(err_msg, UniMessage):
|
||||
err_msg_res = err_msg
|
||||
else:
|
||||
return
|
||||
|
||||
await matcher.send(await err_msg_res.export(bot))
|
||||
if isinstance(exc, UnwrapFailedError):
|
||||
obj = exc.halted_container
|
||||
try:
|
||||
failure: Any = obj.failure()
|
||||
|
||||
err_msg_res: UniMessage
|
||||
if isinstance(failure, str):
|
||||
err_msg_res = UniMessage().text(failure)
|
||||
elif isinstance(failure, UniMessage):
|
||||
err_msg_res = failure
|
||||
else:
|
||||
return
|
||||
|
||||
await matcher.send(await err_msg_res.export(bot))
|
||||
except:
|
||||
pass
|
||||
204
konabot/plugins/image_process/__init__.py
Normal file
204
konabot/plugins/image_process/__init__.py
Normal file
@ -0,0 +1,204 @@
|
||||
import re
|
||||
from io import BytesIO
|
||||
|
||||
from nonebot import on_message
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage,
|
||||
on_alconna)
|
||||
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
from konabot.common.nb.extract_image import PIL_Image
|
||||
from konabot.common.nb.match_keyword import match_keyword
|
||||
from konabot.common.nb.reply_image import reply_image
|
||||
|
||||
cmd_black_white = on_message(rule=match_keyword("黑白"))
|
||||
|
||||
|
||||
@cmd_black_white.handle()
|
||||
async def _(img: PIL_Image, bot: Bot):
|
||||
await reply_image(cmd_black_white, bot, img.convert("LA"))
|
||||
|
||||
|
||||
def parse_timestamp(tx: str) -> float | None:
|
||||
res = 0.0
|
||||
for component in tx.split(":"):
|
||||
res *= 60
|
||||
if not re.match(r"^\d+(\.\d+)?$", component):
|
||||
return
|
||||
res += float(component)
|
||||
return res
|
||||
|
||||
|
||||
cmd_giftool = on_alconna(Alconna(
|
||||
"giftool",
|
||||
Args["img", Image | None],
|
||||
Option("--ss", Args["start_point", str]),
|
||||
Option("--frames:v", Args["frame_count", int]),
|
||||
Option("-t", Args["length", str]),
|
||||
Option("-to", Args["end_point", str]),
|
||||
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
|
||||
))
|
||||
|
||||
|
||||
@cmd_giftool.handle()
|
||||
async def _(
|
||||
image: PIL_Image,
|
||||
start_point: str | None = None,
|
||||
frame_count: int | None = None,
|
||||
length: str | None = None,
|
||||
speed_factor: float = 1.0,
|
||||
end_point: str | None = None,
|
||||
):
|
||||
ss: None | float = None
|
||||
if start_point:
|
||||
ss = parse_timestamp(start_point)
|
||||
if ss is None:
|
||||
raise BotExceptionMessage("--ss 的格式不满足条件")
|
||||
|
||||
t: None | float = None
|
||||
if length:
|
||||
t = parse_timestamp(length)
|
||||
if t is None:
|
||||
raise BotExceptionMessage("-t 的格式不满足条件")
|
||||
|
||||
to: None | float = None
|
||||
if end_point:
|
||||
to = parse_timestamp(end_point)
|
||||
if to is None:
|
||||
raise BotExceptionMessage("-to 的格式不满足条件")
|
||||
|
||||
if to is not None and ss is not None and to <= ss:
|
||||
raise BotExceptionMessage("错误:出点时间小于入点")
|
||||
if frame_count is not None and frame_count <= 0:
|
||||
raise BotExceptionMessage("错误:帧数量应该大于 0")
|
||||
if speed_factor <= 0:
|
||||
raise BotExceptionMessage("错误:--speed 必须大于 0")
|
||||
|
||||
if not getattr(image, "is_animated", False):
|
||||
raise BotExceptionMessage("错误:输入的不是动图(GIF)")
|
||||
|
||||
frames = []
|
||||
durations = []
|
||||
total_duration = 0.0
|
||||
|
||||
try:
|
||||
for i in range(getattr(image, "n_frames")):
|
||||
image.seek(i)
|
||||
frames.append(image.copy())
|
||||
duration = image.info.get("duration", 100) # 单位:毫秒
|
||||
durations.append(duration)
|
||||
total_duration += duration / 1000.0 # 转为秒
|
||||
except EOFError:
|
||||
pass
|
||||
|
||||
if not frames:
|
||||
raise BotExceptionMessage("错误:读取 GIF 帧失败")
|
||||
|
||||
def time_to_frame_index(target_time: float) -> int:
|
||||
if target_time <= 0:
|
||||
return 0
|
||||
cum = 0.0
|
||||
for idx, dur in enumerate(durations):
|
||||
cum += dur / 1000.0
|
||||
if cum >= target_time:
|
||||
return min(idx, len(frames) - 1)
|
||||
return len(frames) - 1
|
||||
start_frame = 0
|
||||
end_frame = len(frames) - 1
|
||||
if ss is not None:
|
||||
start_frame = time_to_frame_index(ss)
|
||||
if to is not None:
|
||||
end_frame = time_to_frame_index(to)
|
||||
if end_frame < start_frame:
|
||||
end_frame = start_frame
|
||||
elif t is not None:
|
||||
end_time = (ss or 0.0) + t
|
||||
end_frame = time_to_frame_index(end_time)
|
||||
if end_frame < start_frame:
|
||||
end_frame = start_frame
|
||||
|
||||
start_frame = max(0, start_frame)
|
||||
end_frame = min(len(frames) - 1, end_frame)
|
||||
selected_frames = frames[start_frame : end_frame + 1]
|
||||
selected_durations = durations[start_frame : end_frame + 1]
|
||||
|
||||
if frame_count is not None and frame_count > 0:
|
||||
if frame_count >= len(selected_frames):
|
||||
pass
|
||||
else:
|
||||
step = len(selected_frames) / frame_count
|
||||
sampled_frames = []
|
||||
sampled_durations = []
|
||||
for i in range(frame_count):
|
||||
idx = int(i * step)
|
||||
sampled_frames.append(selected_frames[idx])
|
||||
sampled_durations.append(
|
||||
sum(selected_durations) // len(selected_durations)
|
||||
)
|
||||
selected_frames = sampled_frames
|
||||
selected_durations = sampled_durations
|
||||
|
||||
output_img = BytesIO()
|
||||
|
||||
adjusted_durations = [
|
||||
dur / speed_factor for dur in selected_durations
|
||||
]
|
||||
|
||||
rframes = []
|
||||
rdur = []
|
||||
|
||||
acc_mod_20 = 0
|
||||
|
||||
for i in range(len(selected_frames)):
|
||||
fr = selected_frames[i]
|
||||
du: float = adjusted_durations[i]
|
||||
|
||||
if du >= 20:
|
||||
rframes.append(fr)
|
||||
rdur.append(int(du))
|
||||
acc_mod_20 = 0
|
||||
else:
|
||||
if acc_mod_20 == 0:
|
||||
rframes.append(fr)
|
||||
rdur.append(20)
|
||||
acc_mod_20 += du
|
||||
else:
|
||||
acc_mod_20 += du
|
||||
if acc_mod_20 >= 20:
|
||||
acc_mod_20 = 0
|
||||
|
||||
if len(rframes) == 1 and len(selected_frames) > 1:
|
||||
rframes.append(selected_frames[max(2, len(selected_frames) // 2)])
|
||||
rdur.append(20)
|
||||
|
||||
transparency_flag = False
|
||||
for f in rframes:
|
||||
if f.mode == "RGBA":
|
||||
if any(pix < 255 for pix in f.getchannel("A").getdata()):
|
||||
transparency_flag = True
|
||||
break
|
||||
elif f.mode == "P" and "transparency" in f.info:
|
||||
transparency_flag = True
|
||||
break
|
||||
|
||||
tf = {}
|
||||
if transparency_flag:
|
||||
tf['transparency'] = 0
|
||||
|
||||
if rframes:
|
||||
rframes[0].save(
|
||||
output_img,
|
||||
format="GIF",
|
||||
save_all=True,
|
||||
append_images=rframes[1:],
|
||||
duration=rdur,
|
||||
loop=0,
|
||||
optimize=False,
|
||||
disposal=2,
|
||||
**tf,
|
||||
)
|
||||
else:
|
||||
raise BotExceptionMessage("错误:没有可输出的帧")
|
||||
output_img.seek(0)
|
||||
|
||||
await cmd_giftool.send(await UniMessage().image(raw=output_img).export())
|
||||
43
konabot/plugins/sksl/__init__.py
Normal file
43
konabot/plugins/sksl/__init__.py
Normal file
@ -0,0 +1,43 @@
|
||||
from nonebot_plugin_alconna import Alconna, Args, Option, UniMessage, on_alconna
|
||||
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
from konabot.plugins.sksl.run_sksl import render_sksl_shader_to_gif
|
||||
|
||||
|
||||
cmd_run_sksl = on_alconna(Alconna(
|
||||
"shadertool",
|
||||
Option("--width", Args["width_", int]),
|
||||
Option("--height", Args["height_", int]),
|
||||
Option("--duration", Args["duration_", float]),
|
||||
Option("--fps", Args["fps_", float]),
|
||||
Args["code", str],
|
||||
))
|
||||
|
||||
@cmd_run_sksl.handle()
|
||||
async def _(
|
||||
code: str,
|
||||
width_: int = 320,
|
||||
height_: int = 180,
|
||||
duration_: float = 1.0,
|
||||
fps_: float = 15.0,
|
||||
):
|
||||
if width_ <= 0 or height_ <= 0:
|
||||
raise BotExceptionMessage("长宽应该大于 0")
|
||||
if duration_ <= 0:
|
||||
raise BotExceptionMessage("渲染时长应该大于 0")
|
||||
if fps_ <= 0:
|
||||
raise BotExceptionMessage("渲染帧率应该大于 0")
|
||||
if fps_ * duration_ < 1:
|
||||
raise BotExceptionMessage("时长太短或帧率太小,没有帧被渲染")
|
||||
if fps_ * duration_ > 100:
|
||||
raise BotExceptionMessage("太多帧啦!试着缩短一点时间吧!")
|
||||
if width_ > 640 or height_ > 640:
|
||||
raise BotExceptionMessage("最大支持 640x640 啦!不要太大啦!")
|
||||
|
||||
code = code.strip("\"").strip("'")
|
||||
|
||||
try:
|
||||
res = await render_sksl_shader_to_gif(code, width_, height_, duration_, fps_)
|
||||
await cmd_run_sksl.send(await UniMessage().image(raw=res).export())
|
||||
except (ValueError, RuntimeError) as e:
|
||||
await cmd_run_sksl.send(await UniMessage().text(f"渲染时遇到了问题:\n\n{e}").export())
|
||||
155
konabot/plugins/sksl/run_sksl.py
Normal file
155
konabot/plugins/sksl/run_sksl.py
Normal file
@ -0,0 +1,155 @@
|
||||
import asyncio
|
||||
import io
|
||||
import struct
|
||||
|
||||
from loguru import logger
|
||||
import numpy as np
|
||||
import skia
|
||||
from PIL import Image
|
||||
|
||||
|
||||
def _pack_uniforms(uniforms_dict, width, height, time_val):
|
||||
"""
|
||||
根据常见的教学用 uniform 布局打包字节数据
|
||||
假设 SkSL 中 uniform 顺序为: float u_time; float2 u_resolution;
|
||||
内存布局: [u_time(4B)][u_res_x(4B)][u_res_y(4B)] (总共 12 字节,无填充)
|
||||
注意:为匹配 skia.RuntimeEffect 的紧凑布局,已移除 float 和 float2 之间的 4 字节填充。
|
||||
"""
|
||||
# u_time (float) - 4 bytes
|
||||
time_bytes = struct.pack('f', time_val)
|
||||
|
||||
# u_resolution (vec2/float2) - 8 bytes
|
||||
res_bytes = struct.pack('ff', float(width), float(height))
|
||||
|
||||
# 移除填充字节,使用紧凑布局
|
||||
return time_bytes + res_bytes
|
||||
|
||||
|
||||
def _render_sksl_shader_to_gif(
|
||||
sksl_code: str,
|
||||
width: int = 256,
|
||||
height: int = 256,
|
||||
duration: float = 2.0,
|
||||
fps: float = 15,
|
||||
) -> io.BytesIO:
|
||||
"""
|
||||
渲染 SkSL 着色器动画为 GIF(适配 skia-python >= 138)
|
||||
"""
|
||||
|
||||
logger.info(f"开始编译\n{sksl_code}")
|
||||
|
||||
runtime_effect = skia.RuntimeEffect.MakeForShader(sksl_code)
|
||||
if runtime_effect is None:
|
||||
# SkSL 编译失败时,尝试获取错误信息(如果 skia 版本支持)
|
||||
error_message = ""
|
||||
# 注意:skia-python 的错误信息捕获可能因版本而异
|
||||
|
||||
# 尝试检查编译错误是否在日志中,但最直接的是抛出已知错误
|
||||
raise ValueError("SkSL 编译失败,请检查语法")
|
||||
|
||||
# --- 修复: 移除对不存在的 uniformSize() 的调用,直接使用硬编码的 12 字节尺寸 ---
|
||||
# float (4 bytes) + float2 (8 bytes) = 12 bytes (基于 _pack_uniforms 函数的紧凑布局)
|
||||
EXPECTED_UNIFORM_SIZE = 12
|
||||
|
||||
# 创建 CPU 后端 Surface
|
||||
surface = skia.Surface(width, height)
|
||||
|
||||
frames = []
|
||||
total_frames = int(duration * fps)
|
||||
|
||||
for frame in range(total_frames):
|
||||
time_val = frame / fps
|
||||
|
||||
# 打包 uniform 数据
|
||||
uniform_bytes = _pack_uniforms(None, width, height, time_val)
|
||||
|
||||
# [检查] 确保打包后的字节数与期望值匹配
|
||||
if len(uniform_bytes) != EXPECTED_UNIFORM_SIZE:
|
||||
raise ValueError(
|
||||
f"Uniform 数据大小不匹配!期望 {EXPECTED_UNIFORM_SIZE} 字节,实际 {len(uniform_bytes)} 字节。请检查 _pack_uniforms 函数。"
|
||||
)
|
||||
|
||||
uniform_data = skia.Data.MakeWithCopy(uniform_bytes)
|
||||
|
||||
# 创建着色器
|
||||
try:
|
||||
# makeShader 的参数: uniform_data, children_shaders, child_count
|
||||
shader = runtime_effect.makeShader(uniform_data, None, 0)
|
||||
if shader is None:
|
||||
# 如果 SkSL 语法正确但 uniform 匹配失败,makeShader 会返回 None
|
||||
raise RuntimeError("着色器创建返回 None!请检查 SkSL 语法和 uniform 数据匹配。")
|
||||
except Exception as e:
|
||||
raise ValueError(f"着色器创建失败: {e}")
|
||||
|
||||
# 绘制
|
||||
canvas = surface.getCanvas()
|
||||
canvas.clear(skia.Color(0, 0, 0, 255))
|
||||
|
||||
paint = skia.Paint()
|
||||
paint.setShader(shader)
|
||||
canvas.drawRect(skia.Rect.MakeWH(width, height), paint)
|
||||
|
||||
# --- 修复 peekPixels() 错误:改用 readPixels() 将数据复制到缓冲区 ---
|
||||
image = surface.makeImageSnapshot()
|
||||
|
||||
# 1. 准备目标 ImageInfo (通常是 kBGRA_8888_ColorType)
|
||||
target_info = skia.ImageInfo.Make(
|
||||
image.width(),
|
||||
image.height(),
|
||||
skia.ColorType.kBGRA_8888_ColorType, # 目标格式,匹配 Skia 常见的输出格式
|
||||
skia.AlphaType.kPremul_AlphaType
|
||||
)
|
||||
|
||||
# 2. 创建一个用于接收像素数据的 bytearray 缓冲区
|
||||
pixel_data = bytearray(image.width() * image.height() * 4) # 4 bytes per pixel (BGRA)
|
||||
|
||||
# 3. 将图像数据复制到缓冲区
|
||||
success = image.readPixels(target_info, pixel_data, target_info.minRowBytes())
|
||||
|
||||
if not success:
|
||||
raise RuntimeError("无法通过 readPixels() 获取图像像素")
|
||||
|
||||
# 4. 转换 bytearray 到 NumPy 数组 (BGRA 顺序)
|
||||
img_array = np.frombuffer(pixel_data, dtype=np.uint8).reshape((height, width, 4))
|
||||
|
||||
# 5. BGRA 转换成 RGB 顺序 (交换 R 和 B 通道)
|
||||
# [B, G, R, A] -> [R, G, B] (丢弃 A 通道)
|
||||
rgb_array = img_array[:, :, [2, 1, 0]]
|
||||
|
||||
# 6. 创建 PIL Image
|
||||
pil_img = Image.fromarray(rgb_array)
|
||||
frames.append(pil_img)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
# 生成 GIF
|
||||
gif_buffer = io.BytesIO()
|
||||
# 计算每帧的毫秒延迟
|
||||
frame_duration_ms = int(1000 / fps)
|
||||
frames[0].save(
|
||||
gif_buffer,
|
||||
format='GIF',
|
||||
save_all=True,
|
||||
append_images=frames[1:],
|
||||
duration=frame_duration_ms,
|
||||
loop=0, # 0 表示无限循环
|
||||
optimize=True
|
||||
)
|
||||
gif_buffer.seek(0)
|
||||
return gif_buffer
|
||||
|
||||
|
||||
async def render_sksl_shader_to_gif(
|
||||
sksl_code: str,
|
||||
width: int = 256,
|
||||
height: int = 256,
|
||||
duration: float = 2.0,
|
||||
fps: float = 15,
|
||||
) -> io.BytesIO:
|
||||
return await asyncio.to_thread(
|
||||
_render_sksl_shader_to_gif,
|
||||
sksl_code,
|
||||
width,
|
||||
height,
|
||||
duration,
|
||||
fps,
|
||||
)
|
||||
@ -1,18 +1,11 @@
|
||||
from io import BytesIO
|
||||
from typing import Optional
|
||||
|
||||
from PIL import Image
|
||||
from loguru import logger
|
||||
from nonebot.adapters import Event as BaseEvent
|
||||
from nonebot.adapters import Bot as BaseBot
|
||||
from nonebot.adapters import Event as BaseEvent
|
||||
from nonebot.plugin import PluginMetadata
|
||||
from nonebot_plugin_alconna import (
|
||||
Alconna,
|
||||
Args,
|
||||
Field,
|
||||
UniMessage,
|
||||
on_alconna,
|
||||
)
|
||||
from nonebot_plugin_alconna import Alconna, Args, Field, UniMessage, on_alconna
|
||||
from PIL import Image
|
||||
from returns.result import Failure, Success
|
||||
|
||||
from konabot.common.nb.extract_image import extract_image_from_message
|
||||
@ -57,29 +50,6 @@ ytpgif_cmd = on_alconna(
|
||||
)
|
||||
|
||||
|
||||
async def get_image_url(event: BaseEvent) -> Optional[str]:
|
||||
"""从事件中提取图片 URL,支持直接消息和回复"""
|
||||
msg = event.get_message()
|
||||
for seg in msg:
|
||||
if seg.type == "image" and seg.data.get("url"):
|
||||
return str(seg.data["url"])
|
||||
|
||||
if hasattr(event, "reply") and (reply := getattr(event, "reply")):
|
||||
reply_msg = reply.message
|
||||
for seg in reply_msg:
|
||||
if seg.type == "image" and seg.data.get("url"):
|
||||
return str(seg.data["url"])
|
||||
return None
|
||||
|
||||
|
||||
async def download_image(url: str) -> bytes:
|
||||
import httpx
|
||||
async with httpx.AsyncClient() as client:
|
||||
resp = await client.get(url, timeout=10)
|
||||
resp.raise_for_status()
|
||||
return resp.content
|
||||
|
||||
|
||||
def resize_frame(frame: Image.Image) -> Image.Image:
|
||||
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
|
||||
w, h = frame.size
|
||||
|
||||
64
poetry.lock
generated
64
poetry.lock
generated
@ -2079,6 +2079,21 @@ type = "legacy"
|
||||
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple"
|
||||
reference = "pt-gitea-pypi"
|
||||
|
||||
[[package]]
|
||||
name = "pybind11"
|
||||
version = "3.0.1"
|
||||
description = "Seamless operability between C++11 and Python"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "pybind11-3.0.1-py3-none-any.whl", hash = "sha256:aa8f0aa6e0a94d3b64adfc38f560f33f15e589be2175e103c0a33c6bce55ee89"},
|
||||
{file = "pybind11-3.0.1.tar.gz", hash = "sha256:9c0f40056a016da59bab516efb523089139fcc6f2ba7e4930854c61efb932051"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
global = ["pybind11-global (==3.0.1)"]
|
||||
|
||||
[[package]]
|
||||
name = "pycares"
|
||||
version = "4.11.0"
|
||||
@ -2515,6 +2530,53 @@ pygments = ">=2.13.0,<3.0.0"
|
||||
[package.extras]
|
||||
jupyter = ["ipywidgets (>=7.5.1,<9)"]
|
||||
|
||||
[[package]]
|
||||
name = "skia-python"
|
||||
version = "138.0"
|
||||
description = "Skia python binding"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "skia_python-138.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:a8bcf4f8d6ea08bdf718ed257f9a9c722f643223f9b93230fd4c15ac5c82029f"},
|
||||
{file = "skia_python-138.0-cp310-cp310-macosx_11_0_x86_64.whl", hash = "sha256:1b7f24a5b306f57d75b8733658189dd61ce9114a84b5d119b3b1fb09cbc991cb"},
|
||||
{file = "skia_python-138.0-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:2c4164013355bcf255651ac8a0ca519fd6095b6e58fa5e7a057919bec158411d"},
|
||||
{file = "skia_python-138.0-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:63a7f9dc8d5fdda969f35d7f8f3a774029093468a9c06771ccce9ab967b275ca"},
|
||||
{file = "skia_python-138.0-cp310-cp310-win_amd64.whl", hash = "sha256:740104424e9c94a70a767adcdbfa2bce131c93a82fa00a50432406dc04bac7e8"},
|
||||
{file = "skia_python-138.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:6e369105b2811b6c32d7f15624202609deeb428164ee395c715df01221c847f5"},
|
||||
{file = "skia_python-138.0-cp311-cp311-macosx_11_0_x86_64.whl", hash = "sha256:55a2ae437d1c6dc46d9bbc2e3e0b4b5642022dcebc0634591ae1c8acea52a413"},
|
||||
{file = "skia_python-138.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:e67c9bcac953fbeb31b38474a4e566371e5719d4e27032686359cbe602819af8"},
|
||||
{file = "skia_python-138.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:4e9fb097849604e290be28a6b4cc29cb0079b0233e599c2f6b2ec635e47169d2"},
|
||||
{file = "skia_python-138.0-cp311-cp311-win_amd64.whl", hash = "sha256:df5a0dd52e1038423f2e1c03677cba5ea974d215adec10763dd855f4a6ca0cfc"},
|
||||
{file = "skia_python-138.0-cp311-cp311-win_arm64.whl", hash = "sha256:054ca430003d52468974cf69d98719e61f188630709a3706a868efc16e817970"},
|
||||
{file = "skia_python-138.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:6b83d7b8101038ab0fcb920b1a91c872615e45b0ac19fe0c66b55d3ad0d61970"},
|
||||
{file = "skia_python-138.0-cp312-cp312-macosx_11_0_x86_64.whl", hash = "sha256:f22060f513b2fc258b4269e631128ca68889107bc4aa65255119f3d9c1c32993"},
|
||||
{file = "skia_python-138.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:02b0c2146d25f45224c2c156dd8b859e89265ee5894c3fa8f456c6c41b27122d"},
|
||||
{file = "skia_python-138.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:fb9e980a61defcb6ca19d73c97ea23d1aea633a82e9d1dead6ed259c3856baaf"},
|
||||
{file = "skia_python-138.0-cp312-cp312-win_amd64.whl", hash = "sha256:625bb95dc225ea2257f1d1f7b0aee203290b91d909c8f5feaa7cd428f13bce22"},
|
||||
{file = "skia_python-138.0-cp312-cp312-win_arm64.whl", hash = "sha256:e495143edba2a7cdf59e83723f00c9a247e5baeda4981fc6587803ed46c5ed2a"},
|
||||
{file = "skia_python-138.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:1433b5a0bd4ca1c1d050541f54c25d27d55a129894dbd827d4e440697090ff83"},
|
||||
{file = "skia_python-138.0-cp313-cp313-macosx_11_0_x86_64.whl", hash = "sha256:3f053128aa6344fa991c2485f29f40bda77b5f46467a43e778a34ec8f5b3615a"},
|
||||
{file = "skia_python-138.0-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:878212d9576d065a45fd143c57d412bc0496c6594ecfcd9cc2cd93b4fd943cb4"},
|
||||
{file = "skia_python-138.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:a6b3e58756a9fa8af3714edc05aeafc7922090f5a276c4be11337f252921dbe8"},
|
||||
{file = "skia_python-138.0-cp313-cp313-win_amd64.whl", hash = "sha256:f2d596d5807fafef6bc43440f4df28f71db189f9e2cfb8613224713916837e3c"},
|
||||
{file = "skia_python-138.0-cp313-cp313-win_arm64.whl", hash = "sha256:20285bf4ee41da754b842c80076fc4a240cb3801f4a1cbbb50b26a5dca1ebc39"},
|
||||
{file = "skia_python-138.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:ff8dcb98b19e85d565c1d0accf8919e4b8a35d2a121d2d6f1c21fea655c85884"},
|
||||
{file = "skia_python-138.0-cp38-cp38-macosx_11_0_x86_64.whl", hash = "sha256:ff3e692214b27490e38caf6ed123d82d2ff762da4561473a5a34fab1cb5dcd14"},
|
||||
{file = "skia_python-138.0-cp38-cp38-manylinux_2_28_aarch64.whl", hash = "sha256:db3388f4e6e8f63c9e2b6cb0add0c2e483cbed783558b10eb7d94540f75db9b6"},
|
||||
{file = "skia_python-138.0-cp38-cp38-manylinux_2_28_x86_64.whl", hash = "sha256:1dd9306b2025e296ad1d0e2c3038e3221ade1d48dfd0a20ce926f8116603b909"},
|
||||
{file = "skia_python-138.0-cp38-cp38-win_amd64.whl", hash = "sha256:9ae5de84629dba4c751b334a8c43b04e9b7fe2a65df26992770648aa5e131b0c"},
|
||||
{file = "skia_python-138.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:736e6618f246444bff2ff73b9efe8fd7159d9048c85ecdffb23290702a5c9099"},
|
||||
{file = "skia_python-138.0-cp39-cp39-macosx_11_0_x86_64.whl", hash = "sha256:ab438dfe40d1be3da6fdd94890862781dc4e545a0bfa169a8e9dcc25f3bb0afd"},
|
||||
{file = "skia_python-138.0-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:2f5d842930d1e385aa7911e0705d4751d8e4aa550389baf88d40673672f180b1"},
|
||||
{file = "skia_python-138.0-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:d17ad82d6094f9c7bfc25b6525e1080bf3715abcc848be042c786b365f51a48c"},
|
||||
{file = "skia_python-138.0-cp39-cp39-win_amd64.whl", hash = "sha256:0f6136827a269a4a5294d92919bc6fad008da7d7e74256666d81506f863a6ff9"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
numpy = "*"
|
||||
pybind11 = ">=2.6"
|
||||
|
||||
[[package]]
|
||||
name = "sniffio"
|
||||
version = "1.3.1"
|
||||
@ -3198,4 +3260,4 @@ type = ["pytest-mypy"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.12,<4.0"
|
||||
content-hash = "b4c3d28f7572c57e867d126ce0c64787ae608b114e66b8de06147caf13e049dd"
|
||||
content-hash = "0c7709438b8eb3d468b775417b8ef642cd7a8c1031805fdc71fec32c48346e54"
|
||||
|
||||
@ -23,6 +23,7 @@ dependencies = [
|
||||
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
|
||||
"returns (>=0.26.0,<0.27.0)",
|
||||
"ptimeparse (>=0.1.1,<0.2.0)",
|
||||
"skia-python (>=138.0,<139.0)",
|
||||
]
|
||||
|
||||
[build-system]
|
||||
|
||||
@ -8,9 +8,12 @@ nonebot.load_plugins("konabot/plugins")
|
||||
|
||||
plugins = nonebot.get_loaded_plugins()
|
||||
len_requires = len(
|
||||
[f for f in (
|
||||
Path(__file__).parent.parent / "konabot" / "plugins"
|
||||
).iterdir() if f.is_dir() and (f / "__init__.py").exists()]
|
||||
[
|
||||
f
|
||||
for f in (Path(__file__).parent.parent / "konabot" / "plugins").iterdir()
|
||||
if (f.is_dir() and (f / "__init__.py").exists())
|
||||
or ((not f.is_dir()) and f.suffix == ".py")
|
||||
]
|
||||
)
|
||||
|
||||
plugins = [p for p in plugins if p.module.__name__.startswith("konabot.plugins")]
|
||||
|
||||
Reference in New Issue
Block a user