Compare commits

..

15 Commits

Author SHA1 Message Date
af566888ab fix2
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 15:24:49 +08:00
e72bc283f8 调整 giftool
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 15:18:07 +08:00
c9d58e7498 修改文档
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 13:45:34 +08:00
627a48da1c 添加安全限制 2025-10-12 13:40:40 +08:00
87be1916ee 添加 Shadertool(谁需要??????)
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-12 13:36:54 +08:00
0ca901e7b1 添加 giftool
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 12:47:52 +08:00
d096f43d38 添加 giftool 2025-10-12 12:40:33 +08:00
38ae3d1c74 补充黑白的 man 2025-10-12 12:04:19 +08:00
a0483d1d5c 修复断言逻辑
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 11:52:41 +08:00
ae83b66908 添加图像黑白
Some checks failed
continuous-integration/drone/push Build is failing
2025-10-12 11:50:15 +08:00
6abeb05a18 去除未使用的函数 2025-10-12 11:02:51 +08:00
9b0a0368fa 修改 YTPGIF 的功能问题
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 10:55:44 +08:00
4eac493de4 更改令人费解的 requirements.txt 方案
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-09 20:25:34 +08:00
b4e400b626 调整 ytpgif 使用共用方法读取图片
Some checks failed
continuous-integration/drone/push Build is failing
2025-10-09 19:56:16 +08:00
c35ee57976 优化时间读取逻辑 2025-10-09 19:55:53 +08:00
22 changed files with 893 additions and 689 deletions

24
.vscode/launch.json vendored
View File

@ -1,24 +0,0 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "运行 Bot 并调试(自动重载)",
"type": "debugpy",
"request": "launch",
"module": "watchfiles",
"args": [
"bot.main"
],
"console": "integratedTerminal",
"justMyCode": true,
"env": {
"PYTHONPATH": "${workspaceFolder}"
},
"cwd": "${workspaceFolder}",
"presentation": {
"hidden": false,
"group": "bot"
}
}
]
}

30
.vscode/tasks.json vendored
View File

@ -1,30 +0,0 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "Poetry: Export requirements.txt (Production)",
"type": "shell",
"command": "poetry export -f requirements.txt --output requirements.txt --without-hashes",
"group": "build",
"presentation": {
"reveal": "always",
"panel": "new"
},
"problemMatcher": [],
"detail": "导出生产环境依赖到 requirements.txt"
},
{
"label": "Bot: Run with Auto-reload",
"type": "shell",
"command": "poetry run watchfiles bot.main",
"group": "build",
"isBackground": true,
"presentation": {
"reveal": "always",
"panel": "new"
},
"problemMatcher": [],
"detail": "运行 bot 并启用自动重载功能"
}
]
}

View File

@ -1,8 +1,41 @@
FROM python:3.13-slim
# copied from https://www.martinrichards.me/post/python_poetry_docker/
FROM python:3.13-slim AS base
ENV VIRTUAL_ENV=/app/.venv \
PATH="/app/.venv/bin:$PATH"
RUN apt-get update && \
apt-get install -y --no-install-recommends \
libfontconfig1 \
libgl1 \
libegl1 \
libglvnd0 \
mesa-vulkan-drivers \
&& rm -rf /var/lib/apt/lists/*
FROM base AS builder
ENV POETRY_NO_INTERACTION=1 \
POETRY_VIRTUALENVS_IN_PROJECT=1 \
POETRY_VIRTUALENVS_CREATE=1 \
POETRY_CACHE_DIR=/tmp/poetry_cache
WORKDIR /app
RUN pip install poetry
COPY pyproject.toml poetry.lock ./
RUN python -m poetry install --no-root && rm -rf $POETRY_CACHE_DIR
FROM base AS runtime
COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
WORKDIR /app
COPY requirements.txt ./
RUN pip install -r requirements.txt --no-deps
COPY bot.py pyproject.toml .env.prod .env.test ./
COPY assets ./assets

View File

@ -65,10 +65,10 @@ code .
### 运行
你可以在 VSCode 的「运行与调试」窗口,启动 `运行 Bot 并调试(自动重载)` 任务来启动 Bot也可以使用命令行手动启动 Bot
使用命令行手动启动 Bot
```bash
poetry run watchfiles bot.main
poetry run watchfiles bot.main konabot
```
如果你不希望自动重载,只是想运行 Bot可以直接运行

9
konabot/common/nb/exc.py Normal file
View File

@ -0,0 +1,9 @@
from nonebot_plugin_alconna import UniMessage
class BotExceptionMessage(Exception):
def __init__(self, msg: UniMessage | str) -> None:
super().__init__()
if isinstance(msg, str):
msg = UniMessage().text(msg)
self.msg = msg

View File

@ -1,17 +1,23 @@
from io import BytesIO
from typing import Annotated
import httpx
import PIL.Image
from loguru import logger
import nonebot
from nonebot.matcher import Matcher
from nonebot.adapters import Bot, Event, Message
from nonebot.adapters.discord import Bot as DiscordBot
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
import nonebot.params
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
from PIL import UnidentifiedImageError
from returns.result import Failure, Result, Success
from konabot.common.nb.exc import BotExceptionMessage
async def download_image_bytes(url: str) -> Result[bytes, str]:
# if "/matcha/cache/" in url:
@ -133,3 +139,21 @@ async def extract_image_from_message(
else:
return Failure("暂时不支持在这里中通过引用的方式获取图片")
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
async def _ext_img(
evt: Event,
bot: Bot,
matcher: Matcher,
) -> PIL.Image.Image | None:
match await extract_image_from_message(evt.get_message(), evt, bot):
case Success(img):
return img
case Failure(err):
# raise BotExceptionMessage(err)
await matcher.send(await UniMessage().text(err).export())
return None
assert False
PIL_Image = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)]

View File

@ -0,0 +1,16 @@
import re
from nonebot_plugin_alconna import Text, UniMsg
def match_keyword(*patterns: str | re.Pattern):
async def _matcher(msg: UniMsg):
text = msg.get(Text).extract_plain_text().strip()
for pattern in patterns:
if isinstance(pattern, str) and text == pattern:
return True
if isinstance(pattern, re.Pattern) and re.match(pattern, text):
return True
return False
return _matcher

View File

@ -0,0 +1,13 @@
from io import BytesIO
import PIL
import PIL.Image
from nonebot.adapters import Bot
from nonebot.matcher import Matcher
from nonebot_plugin_alconna import UniMessage
async def reply_image(matcher: type[Matcher], bot: Bot, img: PIL.Image.Image):
data = BytesIO()
img.save(data, "PNG")
await matcher.send(await UniMessage().image(raw=data).export(bot))

View File

@ -0,0 +1,59 @@
指令介绍
giftool - 对 GIF 动图进行裁剪、抽帧等处理
格式
giftool [图片] [选项]
示例
回复一张 GIF 并发送:
`giftool --ss 1.5 -t 2.0`
从 1.5 秒处开始,截取 2 秒长度的片段。
`giftool [图片] --ss 0:10 -to 0:15`
截取从 10 秒到 15 秒之间的片段(支持 MM:SS 或 HH:MM:SS 格式)。
`giftool [图片] --frames:v 10`
将整张 GIF 均匀抽帧,最终保留 10 帧。
`giftool [图片] --ss 2 --frames:v 5`
从第 2 秒开始截取,并将结果抽帧为 5 帧。
参数说明
图片(必需)
- 必须是 GIF 动图。
- 支持直接附带图片,或回复一条含 GIF 的消息后使用指令。
--ss <时间戳>(可选)
- 指定开始时间(单位:秒),可使用以下格式:
• 纯数字(如 `1.5` 表示 1.5 秒)
• 分秒格式(如 `1:30` 表示 1 分 30 秒)
• 时分秒格式(如 `0:1:30` 表示 1 分 30 秒)
- 默认从开头开始0 秒)。
-t <持续时间>(可选)
- 指定截取的持续时间(单位:秒),格式同 --ss。
- 与 --ss 配合使用:截取 [ss, ss + t] 区间。
- 不能与 --to 同时使用。
--to <时间戳>(可选)
- 指定结束时间(单位:秒),格式同 --ss。
- 与 --ss 配合使用:截取 [ss, to] 区间。
- 不能与 -t 同时使用。
--frames:v <帧数>(可选)
- 对截取后的片段进行均匀抽帧,保留指定数量的帧。
- 帧数必须为正整数(> 0
- 若原始帧数 ≤ 指定帧数,则保留全部帧。
--s <速度>(可选)
- 调整 gif 图的速度
使用方式
1. 发送指令前,请确保:
- 消息中附带一张 GIF 动图,或
- 回复一条包含 GIF 动图的消息后再发送指令。
2. 插件会自动:
- 解析 GIF 的每一帧及其持续时间duration
- 根据时间参数转换为帧索引进行裁剪
- 如指定抽帧,则对裁剪后的片段均匀采样
- 生成新的 GIF 并保持原始循环设置loop=0

View File

@ -0,0 +1,47 @@
指令介绍
shadertool - 使用 SkSLSkia Shader Language代码实时渲染并生成 GIF 动画
格式
shadertool [选项] <SkSL 代码>
示例
shadertool """
uniform float u_time;
uniform float2 u_resolution;
half4 main(float2 coord) {
return half4(
1.0,
sin((coord.y / u_resolution.y + u_time) * 3.1415926 * 2) * 0.5 + 0.5,
coord.x / u_resolution.x,
1.0
);
}
"""
参数说明
SkSL 代码(必填)
- 类型:字符串(建议用英文双引号包裹)
- 内容:符合 SkSL 语法的片段着色器代码,必须包含 `void main()` 函数,并为 `sk_FragColor` 赋值。
- 注意:插件会自动去除代码首尾的单引号或双引号,便于命令行输入。
--width <整数>(可选)
- 默认值320
- 作用:输出 GIF 的宽度(像素),必须大于 0。
--height <整数>(可选)
- 默认值180
- 作用:输出 GIF 的高度(像素),必须大于 0。
--duration <浮点数>(可选)
- 默认值1.0
- 作用:动画总时长(秒),必须大于 0。
- 限制:`duration × fps` 必须 ≥ 1 且 ≤ 100即至少 1 帧,最多 100 帧)。
--fps <浮点数>(可选)
- 默认值15.0
- 作用:每秒帧数,控制动画流畅度,必须大于 0。
- 常见值10低配流畅、15默认、24/30电影/视频级)。
使用方式
直接在群聊或私聊中发送 `shadertool` 指令,附上合法的 SkSL 代码即可。

View File

@ -0,0 +1,5 @@
指令介绍
黑白 - 将图片经过一个黑白滤镜的处理
示例
引用一个带有图片的消息,或者消息本身携带图片,然后发送「黑白」即可

45
konabot/plugins/errman.py Normal file
View File

@ -0,0 +1,45 @@
from typing import Any
from nonebot.adapters import Bot
from nonebot.matcher import Matcher
from nonebot.message import run_postprocessor
from nonebot_plugin_alconna import UniMessage
from returns.primitives.exceptions import UnwrapFailedError
from konabot.common.nb.exc import BotExceptionMessage
@run_postprocessor
async def _(bot: Bot, matcher: Matcher, exc: BotExceptionMessage | AssertionError | UnwrapFailedError):
if isinstance(exc, BotExceptionMessage):
msg = exc.msg
await matcher.send(await msg.export(bot))
if isinstance(exc, AssertionError):
if exc.args:
err_msg = exc.args[0]
err_msg_res: UniMessage
if isinstance(err_msg, str):
err_msg_res = UniMessage().text(err_msg)
elif isinstance(err_msg, UniMessage):
err_msg_res = err_msg
else:
return
await matcher.send(await err_msg_res.export(bot))
if isinstance(exc, UnwrapFailedError):
obj = exc.halted_container
try:
failure: Any = obj.failure()
err_msg_res: UniMessage
if isinstance(failure, str):
err_msg_res = UniMessage().text(failure)
elif isinstance(failure, UniMessage):
err_msg_res = failure
else:
return
await matcher.send(await err_msg_res.export(bot))
except:
pass

View File

@ -0,0 +1,204 @@
import re
from io import BytesIO
from nonebot import on_message
from nonebot.adapters import Bot
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage,
on_alconna)
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import PIL_Image
from konabot.common.nb.match_keyword import match_keyword
from konabot.common.nb.reply_image import reply_image
cmd_black_white = on_message(rule=match_keyword("黑白"))
@cmd_black_white.handle()
async def _(img: PIL_Image, bot: Bot):
await reply_image(cmd_black_white, bot, img.convert("LA"))
def parse_timestamp(tx: str) -> float | None:
res = 0.0
for component in tx.split(":"):
res *= 60
if not re.match(r"^\d+(\.\d+)?$", component):
return
res += float(component)
return res
cmd_giftool = on_alconna(Alconna(
"giftool",
Args["img", Image | None],
Option("--ss", Args["start_point", str]),
Option("--frames:v", Args["frame_count", int]),
Option("-t", Args["length", str]),
Option("-to", Args["end_point", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
))
@cmd_giftool.handle()
async def _(
image: PIL_Image,
start_point: str | None = None,
frame_count: int | None = None,
length: str | None = None,
speed_factor: float = 1.0,
end_point: str | None = None,
):
ss: None | float = None
if start_point:
ss = parse_timestamp(start_point)
if ss is None:
raise BotExceptionMessage("--ss 的格式不满足条件")
t: None | float = None
if length:
t = parse_timestamp(length)
if t is None:
raise BotExceptionMessage("-t 的格式不满足条件")
to: None | float = None
if end_point:
to = parse_timestamp(end_point)
if to is None:
raise BotExceptionMessage("-to 的格式不满足条件")
if to is not None and ss is not None and to <= ss:
raise BotExceptionMessage("错误:出点时间小于入点")
if frame_count is not None and frame_count <= 0:
raise BotExceptionMessage("错误:帧数量应该大于 0")
if speed_factor <= 0:
raise BotExceptionMessage("错误:--speed 必须大于 0")
if not getattr(image, "is_animated", False):
raise BotExceptionMessage("错误输入的不是动图GIF")
frames = []
durations = []
total_duration = 0.0
try:
for i in range(getattr(image, "n_frames")):
image.seek(i)
frames.append(image.copy())
duration = image.info.get("duration", 100) # 单位:毫秒
durations.append(duration)
total_duration += duration / 1000.0 # 转为秒
except EOFError:
pass
if not frames:
raise BotExceptionMessage("错误:读取 GIF 帧失败")
def time_to_frame_index(target_time: float) -> int:
if target_time <= 0:
return 0
cum = 0.0
for idx, dur in enumerate(durations):
cum += dur / 1000.0
if cum >= target_time:
return min(idx, len(frames) - 1)
return len(frames) - 1
start_frame = 0
end_frame = len(frames) - 1
if ss is not None:
start_frame = time_to_frame_index(ss)
if to is not None:
end_frame = time_to_frame_index(to)
if end_frame < start_frame:
end_frame = start_frame
elif t is not None:
end_time = (ss or 0.0) + t
end_frame = time_to_frame_index(end_time)
if end_frame < start_frame:
end_frame = start_frame
start_frame = max(0, start_frame)
end_frame = min(len(frames) - 1, end_frame)
selected_frames = frames[start_frame : end_frame + 1]
selected_durations = durations[start_frame : end_frame + 1]
if frame_count is not None and frame_count > 0:
if frame_count >= len(selected_frames):
pass
else:
step = len(selected_frames) / frame_count
sampled_frames = []
sampled_durations = []
for i in range(frame_count):
idx = int(i * step)
sampled_frames.append(selected_frames[idx])
sampled_durations.append(
sum(selected_durations) // len(selected_durations)
)
selected_frames = sampled_frames
selected_durations = sampled_durations
output_img = BytesIO()
adjusted_durations = [
dur / speed_factor for dur in selected_durations
]
rframes = []
rdur = []
acc_mod_20 = 0
for i in range(len(selected_frames)):
fr = selected_frames[i]
du: float = adjusted_durations[i]
if du >= 20:
rframes.append(fr)
rdur.append(int(du))
acc_mod_20 = 0
else:
if acc_mod_20 == 0:
rframes.append(fr)
rdur.append(20)
acc_mod_20 += du
else:
acc_mod_20 += du
if acc_mod_20 >= 20:
acc_mod_20 = 0
if len(rframes) == 1 and len(selected_frames) > 1:
rframes.append(selected_frames[max(2, len(selected_frames) // 2)])
rdur.append(20)
transparency_flag = False
for f in rframes:
if f.mode == "RGBA":
if any(pix < 255 for pix in f.getchannel("A").getdata()):
transparency_flag = True
break
elif f.mode == "P" and "transparency" in f.info:
transparency_flag = True
break
tf = {}
if transparency_flag:
tf['transparency'] = 0
if rframes:
rframes[0].save(
output_img,
format="GIF",
save_all=True,
append_images=rframes[1:],
duration=rdur,
loop=0,
optimize=False,
disposal=2,
**tf,
)
else:
raise BotExceptionMessage("错误:没有可输出的帧")
output_img.seek(0)
await cmd_giftool.send(await UniMessage().image(raw=output_img).export())

View File

@ -4,6 +4,7 @@ from pathlib import Path
from typing import Any, Literal, cast
import nonebot
import ptimeparse
from loguru import logger
from nonebot import on_message
from nonebot.adapters import Event
@ -19,8 +20,6 @@ from nonebot.adapters.onebot.v11.event import \
from nonebot_plugin_alconna import UniMessage, UniMsg
from pydantic import BaseModel
from konabot.plugins.simple_notify.parse_time import get_target_time
evt = on_message()
(Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True)
@ -141,10 +140,15 @@ async def _(msg: UniMsg, mEvt: Event):
return
notify_time, notify_text = segments
target_time = get_target_time(notify_time)
if target_time is None:
# target_time = get_target_time(notify_time)
try:
target_time = ptimeparse.parse(notify_time)
except Exception:
logger.info(f"无法从 {notify_time} 中解析出时间")
return
# if target_time is None:
# logger.info(f"无法从 {notify_time} 中解析出时间")
# return
if not notify_text:
return

View File

@ -1,358 +0,0 @@
import datetime
import re
from typing import Optional, Dict, List, Callable, Tuple
from loguru import logger
# --- 常量与正则表达式定义 (Constants and Regex Definitions) ---
# 数字模式,兼容中文和阿拉伯数字
P_NUM = r"(\d+|[零一两二三四五六七八九十]+)"
# 预编译的正则表达式
PATTERNS = {
# 相对时间, e.g., "5分钟后"
"DELTA": re.compile(
r"^"
r"((?P<days>" + P_NUM + r") ?天)?"
r"((?P<hours>" + P_NUM + r") ?个?小?时)?"
r"((?P<minutes>" + P_NUM + r") ?分钟?)?"
r"((?P<seconds>" + P_NUM + r") ?秒钟?)?"
r" ?后 ?$"
),
# 绝对时间
"YEAR": re.compile(r"(" + P_NUM + r") ?年"),
"MONTH": re.compile(r"(" + P_NUM + r") ?月"),
"DAY": re.compile(r"(" + P_NUM + r") ?[日号]"),
"HOUR": re.compile(r"(" + P_NUM + r") ?[点时](半)?钟?"),
"MINUTE": re.compile(r"(" + P_NUM + r") ?分(钟)?"),
"SECOND": re.compile(r"(" + P_NUM + r") ?秒(钟)?"),
"HMS_COLON": re.compile(r"(\d{1,2})[:](\d{1,2})([:](\d{1,2}))?"),
"PM": re.compile(r"(下午|PM|晚上)"),
# 相对日期
"TOMORROW": re.compile(r"明天"),
"DAY_AFTER_TOMORROW": re.compile(r"后天"),
"TODAY": re.compile(r"今天"),
}
# 中文数字到阿拉伯数字的映射
CHINESE_TO_ARABIC_MAP: Dict[str, int] = {
'': 0, '': 1, '': 2, '': 3, '': 4,
'': 5, '': 6, '': 7, '': 8, '': 9, '': 10
}
# --- 核心工具函数 (Core Utility Functions) ---
def parse_number(s: str) -> int:
"""
将包含中文或阿拉伯数字的字符串解析为整数。
例如: "" -> 5, "十五" -> 15, "二十三" -> 23, "12" -> 12。
返回 -1 表示解析失败。
"""
if not s:
return -1
s = s.strip().replace("", "")
if s.isdigit():
return int(s)
if s in CHINESE_TO_ARABIC_MAP:
return CHINESE_TO_ARABIC_MAP[s]
# 处理 "十" 在不同位置的情况
if s.startswith(''):
if len(s) == 1:
return 10
num = CHINESE_TO_ARABIC_MAP.get(s[1])
return 10 + num if num is not None else -1
if s.endswith(''):
if len(s) == 2:
num = CHINESE_TO_ARABIC_MAP.get(s[0])
return 10 * num if num is not None else -1
if '' in s:
parts = s.split('')
if len(parts) == 2:
left = CHINESE_TO_ARABIC_MAP.get(parts[0])
right = CHINESE_TO_ARABIC_MAP.get(parts[1])
if left is not None and right is not None:
return left * 10 + right
return -1
# --- 时间解析器类 (Time Parser Class) ---
class TimeParser:
"""
一个用于解析自然语言时间描述的类。
"""
def __init__(self, content: str):
self.original_content: str = content
self.content_to_parse: str = self._preprocess(content)
self.now: datetime.datetime = datetime.datetime.now()
# 将 t 作为结果构建器,初始化为今天的午夜
self.t: datetime.datetime = self.now.replace(hour=0, minute=0, second=0, microsecond=0)
self.is_pm_specified: bool = False
self.is_date_specified: bool = False
self.is_time_specified: bool = False
def _preprocess(self, content: str) -> str:
"""预处理字符串,移除不相关字符。"""
content = re.sub(r"\s+", "", content)
content = re.sub(r"[,\.。::、]", "", content)
return content
def _consume_match(self, match: re.Match) -> str:
"""从待解析字符串中移除已匹配的部分。"""
self.content_to_parse = self.content_to_parse.replace(match.group(0), "", 1)
return match.group(0)
def parse(self) -> Optional[datetime.datetime]:
"""
主解析方法。
首先尝试解析相对时间如“5分钟后”失败则尝试解析绝对时间。
"""
logger.debug(f"🎉 开始解析: '{self.original_content}' -> 清洗后: '{self.content_to_parse}'")
if not self.content_to_parse:
logger.debug("❌ 内容为空,无法解析。")
return None
# 1. 尝试相对时间解析
if (target_time := self._parse_relative_time()) is not None:
return target_time
# 2. 尝试绝对时间解析
if (target_time := self._parse_absolute_time()) is not None:
return target_time
logger.debug(f"❌ 所有解析模式均未匹配成功。")
return None
def _parse_relative_time(self) -> Optional[datetime.datetime]:
"""解析 'X天X小时X分钟后' 这种格式。"""
if match := PATTERNS["DELTA"].match(self.content_to_parse):
logger.debug("⏳ 匹配到相对时间模式 (DELTA)。")
try:
delta_parts = {
"days": parse_number(match.group("days") or "0"),
"hours": parse_number(match.group("hours") or "0"),
"minutes": parse_number(match.group("minutes") or "0"),
"seconds": parse_number(match.group("seconds") or "0"),
}
# 检查是否有无效的数字解析
if any(v < 0 for v in delta_parts.values()):
logger.debug(f"❌ 解析时间片段为数字时失败: {delta_parts}")
return None
delta = datetime.timedelta(**delta_parts)
if delta.total_seconds() == 0:
logger.debug("❌ 解析出的时间增量为0。")
return None
target_time = self.now + delta
logger.debug(f"✅ 相对时间解析成功 -> {target_time}")
return target_time
except (ValueError, TypeError) as e:
logger.debug(f"❌ 解析相对时间时出错: {e}", exc_info=True)
return None
return None
def _parse_absolute_time(self) -> Optional[datetime.datetime]:
"""解析一个指定的日期和时间。"""
logger.debug(f"🎯 启动绝对时间解析,基准时间: {self.t}")
# 定义解析步骤和顺序
# (pattern_key, handler_method)
parsing_steps: List[Tuple[str, Callable[[re.Match], bool]]] = [
("TOMORROW", self._handle_tomorrow),
("DAY_AFTER_TOMORROW", self._handle_day_after_tomorrow),
("TODAY", self._handle_today),
("YEAR", self._handle_year),
("MONTH", self._handle_month),
("DAY", self._handle_day),
("HMS_COLON", self._handle_hms_colon),
("PM", self._handle_pm),
("HOUR", self._handle_hour),
("MINUTE", self._handle_minute),
("SECOND", self._handle_second),
]
for key, handler in parsing_steps:
if match := PATTERNS[key].search(self.content_to_parse):
if not handler(match):
# 如果任何一个处理器返回False说明解析失败
return None
# 移除无意义的上午关键词
self.content_to_parse = self.content_to_parse.replace("上午", "").replace("AM", "").replace("凌晨", "")
# 如果解析后还有剩余字符,说明有无法识别的部分
if self.content_to_parse.strip():
logger.debug(f"❌ 匹配失败,存在未解析的残留内容: '{self.content_to_parse.strip()}'")
return None
# 最终调整和检查
return self._finalize_datetime()
# --- Handler Methods for Absolute Time Parsing ---
def _handle_tomorrow(self, match: re.Match) -> bool:
self.t += datetime.timedelta(days=1)
self.is_date_specified = True
logger.debug(f"📅 匹配到 '明天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_day_after_tomorrow(self, match: re.Match) -> bool:
self.t += datetime.timedelta(days=2)
self.is_date_specified = True
logger.debug(f"📅 匹配到 '后天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_today(self, match: re.Match) -> bool:
self.is_date_specified = True
logger.debug(f"📅 匹配到 '今天', 日期基准不变, 消耗: '{self._consume_match(match)}'")
return True
def _handle_year(self, match: re.Match) -> bool:
year = parse_number(match.group(1))
if year < 0: return False
if year < 100: year += 2000 # 处理 "25年" -> 2025
if year < self.now.year:
logger.debug(f"❌ 指定的年份 {year} 已过去。")
return False
self.t = self.t.replace(year=year)
self.is_date_specified = True
logger.debug(f"Y| 年份更新 -> {self.t.year}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_month(self, match: re.Match) -> bool:
month = parse_number(match.group(1))
if not (1 <= month <= 12):
logger.debug(f"❌ 无效的月份: {month}")
return False
# 如果设置的月份在当前月份之前,且没有指定年份,则年份加一
if month < self.t.month and not self.is_date_specified:
self.t = self.t.replace(year=self.t.year + 1)
logger.debug(f"💡 月份小于当前月份,年份自动进位 -> {self.t.year}")
self.t = self.t.replace(month=month)
self.is_date_specified = True
logger.debug(f"M| 月份更新 -> {self.t.month}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_day(self, match: re.Match) -> bool:
day = parse_number(match.group(1))
if not (1 <= day <= 31):
logger.debug(f"❌ 无效的日期: {day}")
return False
try:
# 如果日期小于当前日期,且只指定了日,则月份加一
if day < self.t.day and not self.is_date_specified:
if self.t.month == 12:
self.t = self.t.replace(year=self.t.year + 1, month=1)
else:
self.t = self.t.replace(month=self.t.month + 1)
logger.debug(f"💡 日期小于当前日期,月份自动进位 -> {self.t.year}-{self.t.month}")
self.t = self.t.replace(day=day)
self.is_date_specified = True
logger.debug(f"D| 日期更新 -> {self.t.day}, 消耗: '{self._consume_match(match)}'")
return True
except ValueError:
logger.debug(f"❌ 日期 {day} 对于月份 {self.t.month} 无效 (例如2月30号)。")
return False
def _handle_hms_colon(self, match: re.Match) -> bool:
h = int(match.group(1))
m = int(match.group(2))
s_str = match.group(4) # group(3) is with colon, group(4) is the number
s = int(s_str) if s_str else 0
if not (0 <= h <= 23 and 0 <= m <= 59 and 0 <= s <= 59):
logger.debug(f"❌ 无效的时间格式: H={h}, M={m}, S={s}")
return False
self.t = self.t.replace(hour=h, minute=m, second=s)
self.is_time_specified = True
logger.debug(f"T| 时分秒(冒号格式)更新 -> {self.t.time()}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_pm(self, match: re.Match) -> bool:
self.is_pm_specified = True
logger.debug(f"PM| 匹配到下午/晚上, 消耗: '{self._consume_match(match)}'")
return True
def _handle_hour(self, match: re.Match) -> bool:
hour = parse_number(match.group(1))
has_half = match.group(2) == ''
if not (0 <= hour <= 23):
logger.debug(f"❌ 无效的小时: {hour}")
return False
minute = 30 if has_half else self.t.minute
self.t = self.t.replace(hour=hour, minute=minute)
self.is_time_specified = True
logger.debug(f"H| 小时更新 -> {self.t.hour}{':30' if has_half else ''}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_minute(self, match: re.Match) -> bool:
minute = parse_number(match.group(1))
if not (0 <= minute <= 59):
logger.debug(f"❌ 无效的分钟: {minute}")
return False
self.t = self.t.replace(minute=minute)
self.is_time_specified = True
logger.debug(f"M| 分钟更新 -> {self.t.minute}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_second(self, match: re.Match) -> bool:
second = parse_number(match.group(1))
if not (0 <= second <= 59):
logger.debug(f"❌ 无效的秒: {second}")
return False
self.t = self.t.replace(second=second)
self.is_time_specified = True
logger.debug(f"S| 秒更新 -> {self.t.second}, 消耗: '{self._consume_match(match)}'")
return True
def _finalize_datetime(self) -> Optional[datetime.datetime]:
"""对解析出的时间进行最后的调整和检查。"""
# 处理下午/晚上
if self.is_pm_specified and self.t.hour < 12:
self.t = self.t.replace(hour=self.t.hour + 12)
logger.debug(f"💡 根据 PM 标识,小时调整为 -> {self.t.hour}")
# 如果没有指定任何时间或日期部分,则认为解析无效
if not self.is_date_specified and not self.is_time_specified:
logger.debug("❌ 未能从输入中解析出任何有效的日期或时间部分。")
return None
# 如果最终计算出的时间点在当前时间之前,自动往后推
# 例如:现在是 15:00说 "14点"应该是指明天的14点
if self.t < self.now:
# 只有在明确指定了时间的情况下,才自动加一天
# 如果只指定了一个过去的日期如“去年5月1号”则不应该调整
if self.is_time_specified:
self.t += datetime.timedelta(days=1)
logger.debug(f"🔁 目标时间已过,自动调整为明天 -> {self.t}")
logger.debug(f"✅ 解析成功,最终时间: {self.t}")
return self.t
# --- 公共接口 (Public Interface) ---
def get_target_time(content: str) -> Optional[datetime.datetime]:
"""
高级接口,用于将自然语言时间描述转换为 datetime 对象。
Args:
content: 包含时间信息的字符串。
Returns:
一个 datetime 对象,如果解析失败则返回 None。
"""
parser = TimeParser(content)
return parser.parse()

View File

@ -0,0 +1,43 @@
from nonebot_plugin_alconna import Alconna, Args, Option, UniMessage, on_alconna
from konabot.common.nb.exc import BotExceptionMessage
from konabot.plugins.sksl.run_sksl import render_sksl_shader_to_gif
cmd_run_sksl = on_alconna(Alconna(
"shadertool",
Option("--width", Args["width_", int]),
Option("--height", Args["height_", int]),
Option("--duration", Args["duration_", float]),
Option("--fps", Args["fps_", float]),
Args["code", str],
))
@cmd_run_sksl.handle()
async def _(
code: str,
width_: int = 320,
height_: int = 180,
duration_: float = 1.0,
fps_: float = 15.0,
):
if width_ <= 0 or height_ <= 0:
raise BotExceptionMessage("长宽应该大于 0")
if duration_ <= 0:
raise BotExceptionMessage("渲染时长应该大于 0")
if fps_ <= 0:
raise BotExceptionMessage("渲染帧率应该大于 0")
if fps_ * duration_ < 1:
raise BotExceptionMessage("时长太短或帧率太小,没有帧被渲染")
if fps_ * duration_ > 100:
raise BotExceptionMessage("太多帧啦!试着缩短一点时间吧!")
if width_ > 640 or height_ > 640:
raise BotExceptionMessage("最大支持 640x640 啦!不要太大啦!")
code = code.strip("\"").strip("'")
try:
res = await render_sksl_shader_to_gif(code, width_, height_, duration_, fps_)
await cmd_run_sksl.send(await UniMessage().image(raw=res).export())
except (ValueError, RuntimeError) as e:
await cmd_run_sksl.send(await UniMessage().text(f"渲染时遇到了问题:\n\n{e}").export())

View File

@ -0,0 +1,155 @@
import asyncio
import io
import struct
from loguru import logger
import numpy as np
import skia
from PIL import Image
def _pack_uniforms(uniforms_dict, width, height, time_val):
"""
根据常见的教学用 uniform 布局打包字节数据
假设 SkSL 中 uniform 顺序为: float u_time; float2 u_resolution;
内存布局: [u_time(4B)][u_res_x(4B)][u_res_y(4B)] (总共 12 字节,无填充)
注意:为匹配 skia.RuntimeEffect 的紧凑布局,已移除 float 和 float2 之间的 4 字节填充。
"""
# u_time (float) - 4 bytes
time_bytes = struct.pack('f', time_val)
# u_resolution (vec2/float2) - 8 bytes
res_bytes = struct.pack('ff', float(width), float(height))
# 移除填充字节,使用紧凑布局
return time_bytes + res_bytes
def _render_sksl_shader_to_gif(
sksl_code: str,
width: int = 256,
height: int = 256,
duration: float = 2.0,
fps: float = 15,
) -> io.BytesIO:
"""
渲染 SkSL 着色器动画为 GIF适配 skia-python >= 138
"""
logger.info(f"开始编译\n{sksl_code}")
runtime_effect = skia.RuntimeEffect.MakeForShader(sksl_code)
if runtime_effect is None:
# SkSL 编译失败时,尝试获取错误信息(如果 skia 版本支持)
error_message = ""
# 注意skia-python 的错误信息捕获可能因版本而异
# 尝试检查编译错误是否在日志中,但最直接的是抛出已知错误
raise ValueError("SkSL 编译失败,请检查语法")
# --- 修复: 移除对不存在的 uniformSize() 的调用,直接使用硬编码的 12 字节尺寸 ---
# float (4 bytes) + float2 (8 bytes) = 12 bytes (基于 _pack_uniforms 函数的紧凑布局)
EXPECTED_UNIFORM_SIZE = 12
# 创建 CPU 后端 Surface
surface = skia.Surface(width, height)
frames = []
total_frames = int(duration * fps)
for frame in range(total_frames):
time_val = frame / fps
# 打包 uniform 数据
uniform_bytes = _pack_uniforms(None, width, height, time_val)
# [检查] 确保打包后的字节数与期望值匹配
if len(uniform_bytes) != EXPECTED_UNIFORM_SIZE:
raise ValueError(
f"Uniform 数据大小不匹配!期望 {EXPECTED_UNIFORM_SIZE} 字节,实际 {len(uniform_bytes)} 字节。请检查 _pack_uniforms 函数。"
)
uniform_data = skia.Data.MakeWithCopy(uniform_bytes)
# 创建着色器
try:
# makeShader 的参数: uniform_data, children_shaders, child_count
shader = runtime_effect.makeShader(uniform_data, None, 0)
if shader is None:
# 如果 SkSL 语法正确但 uniform 匹配失败makeShader 会返回 None
raise RuntimeError("着色器创建返回 None请检查 SkSL 语法和 uniform 数据匹配。")
except Exception as e:
raise ValueError(f"着色器创建失败: {e}")
# 绘制
canvas = surface.getCanvas()
canvas.clear(skia.Color(0, 0, 0, 255))
paint = skia.Paint()
paint.setShader(shader)
canvas.drawRect(skia.Rect.MakeWH(width, height), paint)
# --- 修复 peekPixels() 错误:改用 readPixels() 将数据复制到缓冲区 ---
image = surface.makeImageSnapshot()
# 1. 准备目标 ImageInfo (通常是 kBGRA_8888_ColorType)
target_info = skia.ImageInfo.Make(
image.width(),
image.height(),
skia.ColorType.kBGRA_8888_ColorType, # 目标格式,匹配 Skia 常见的输出格式
skia.AlphaType.kPremul_AlphaType
)
# 2. 创建一个用于接收像素数据的 bytearray 缓冲区
pixel_data = bytearray(image.width() * image.height() * 4) # 4 bytes per pixel (BGRA)
# 3. 将图像数据复制到缓冲区
success = image.readPixels(target_info, pixel_data, target_info.minRowBytes())
if not success:
raise RuntimeError("无法通过 readPixels() 获取图像像素")
# 4. 转换 bytearray 到 NumPy 数组 (BGRA 顺序)
img_array = np.frombuffer(pixel_data, dtype=np.uint8).reshape((height, width, 4))
# 5. BGRA 转换成 RGB 顺序 (交换 R 和 B 通道)
# [B, G, R, A] -> [R, G, B] (丢弃 A 通道)
rgb_array = img_array[:, :, [2, 1, 0]]
# 6. 创建 PIL Image
pil_img = Image.fromarray(rgb_array)
frames.append(pil_img)
# ------------------------------------------------------------------
# 生成 GIF
gif_buffer = io.BytesIO()
# 计算每帧的毫秒延迟
frame_duration_ms = int(1000 / fps)
frames[0].save(
gif_buffer,
format='GIF',
save_all=True,
append_images=frames[1:],
duration=frame_duration_ms,
loop=0, # 0 表示无限循环
optimize=True
)
gif_buffer.seek(0)
return gif_buffer
async def render_sksl_shader_to_gif(
sksl_code: str,
width: int = 256,
height: int = 256,
duration: float = 2.0,
fps: float = 15,
) -> io.BytesIO:
return await asyncio.to_thread(
_render_sksl_shader_to_gif,
sksl_code,
width,
height,
duration,
fps,
)

View File

@ -1,17 +1,14 @@
import os
import tempfile
from typing import Optional
from io import BytesIO
from PIL import Image, ImageSequence
from loguru import logger
from nonebot.adapters import Bot as BaseBot
from nonebot.adapters import Event as BaseEvent
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import (
Alconna,
Args,
Field,
UniMessage,
on_alconna,
)
from nonebot_plugin_alconna import Alconna, Args, Field, UniMessage, on_alconna
from PIL import Image
from returns.result import Failure, Success
from konabot.common.nb.extract_image import extract_image_from_message
__plugin_meta__ = PluginMetadata(
name="ytpgif",
@ -53,29 +50,6 @@ ytpgif_cmd = on_alconna(
)
async def get_image_url(event: BaseEvent) -> Optional[str]:
"""从事件中提取图片 URL支持直接消息和回复"""
msg = event.get_message()
for seg in msg:
if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"])
if hasattr(event, "reply") and (reply := event.reply):
reply_msg = reply.message
for seg in reply_msg:
if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"])
return None
async def download_image(url: str) -> bytes:
import httpx
async with httpx.AsyncClient() as client:
resp = await client.get(url, timeout=10)
resp.raise_for_status()
return resp.content
def resize_frame(frame: Image.Image) -> Image.Image:
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
w, h = frame.size
@ -89,7 +63,7 @@ def resize_frame(frame: Image.Image) -> Image.Image:
@ytpgif_cmd.handle()
async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0):
# === 校验 speed 范围 ===
if not (MIN_SPEED <= speed <= MAX_SPEED):
await ytpgif_cmd.send(
@ -97,172 +71,150 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
)
return
img_url = await get_image_url(event)
if not img_url:
await ytpgif_cmd.send(
await UniMessage.text(
"请发送一张图片或回复一张图片来生成镜像动图。"
).export()
)
return
match await extract_image_from_message(event.get_message(), event, bot):
case Success(img):
src_img = img
case Failure(msg):
await ytpgif_cmd.send(
await UniMessage.text(msg).export()
)
return
case _:
return
try:
image_data = await download_image(img_url)
except Exception as e:
print(f"[YTPGIF] 下载失败: {e}")
await ytpgif_cmd.send(
await UniMessage.text("❌ 图片下载失败,请重试。").export()
)
return
try:
n_frames = getattr(src_img, "n_frames", 1)
is_animated = n_frames > 1
logger.debug(f"收到的动图的运动状态:{is_animated} 帧数量:{n_frames}")
except Exception:
is_animated = False
n_frames = 1
input_path = output_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_in:
tmp_in.write(image_data)
input_path = tmp_in.name
output_frames = []
output_durations_ms = []
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_out:
output_path = tmp_out.name
if is_animated:
# === 动图模式:截取正向 + 镜像两段 ===
frames_with_duration: list[tuple[Image.Image, float]] = []
palette = src_img.getpalette()
with Image.open(input_path) as src_img:
# === 判断是否为动图 ===
try:
n_frames = getattr(src_img, "n_frames", 1)
is_animated = n_frames > 1
except Exception:
is_animated = False
for idx in range(n_frames):
src_img.seek(idx)
frame = src_img.copy()
# 检查是否需要透明通道
has_alpha = (
frame.mode in ("RGBA", "LA")
or (frame.mode == "P" and "transparency" in frame.info)
)
if has_alpha:
frame = frame.convert("RGBA")
else:
frame = frame.convert("RGB")
resized_frame = resize_frame(frame)
output_frames = []
output_durations_ms = []
# 若原图有调色板,尝试保留(可选)
if palette and resized_frame.mode == "P":
try:
resized_frame.putpalette(palette)
except Exception: # noqa
logger.debug("色板应用失败")
pass
if is_animated:
# === 动图模式:截取正向 + 镜像两段 ===
frames_with_duration = []
palette = src_img.getpalette()
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
dur_sec = max(0.01, ms / 1000.0)
frames_with_duration.append((resized_frame, dur_sec))
for idx in range(n_frames):
src_img.seek(idx)
frame = src_img.copy()
# 检查是否需要透明通道
has_alpha = (
frame.mode in ("RGBA", "LA")
or (frame.mode == "P" and "transparency" in frame.info)
)
if has_alpha:
frame = frame.convert("RGBA")
else:
frame = frame.convert("RGB")
resized_frame = resize_frame(frame)
max_dur = BASE_SEGMENT_DURATION * speed
accumulated = 0.0
frame_count = 0
# 若原图有调色板,尝试保留(可选)
if palette and resized_frame.mode == "P":
try:
resized_frame.putpalette(palette)
except Exception: # noqa
pass
# 正向段
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
output_frames.append(img)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
dur_sec = max(0.01, ms / 1000.0)
frames_with_duration.append((resized_frame, dur_sec))
max_dur = BASE_SEGMENT_DURATION * speed
accumulated = 0.0
frame_count = 0
# 正向段
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
output_frames.append(img)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
if frame_count == 0:
await ytpgif_cmd.send(
await UniMessage.text("动图帧太短,无法生成有效片段。").export()
)
return
# 镜像段(从头开始)
accumulated = 0.0
frame_count = 0
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
flipped = img.transpose(Image.FLIP_LEFT_RIGHT)
output_frames.append(flipped)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
else:
# === 静态图模式:制作翻转动画 ===
raw_frame = src_img.convert("RGBA")
resized_frame = resize_frame(raw_frame)
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
duration_ms = int(interval_sec * 1000)
frame1 = resized_frame
frame2 = resized_frame.transpose(Image.FLIP_LEFT_RIGHT)
output_frames = [frame1, frame2]
output_durations_ms = [duration_ms, duration_ms]
if len(output_frames) < 1:
if frame_count == 0:
await ytpgif_cmd.send(
await UniMessage.text("未能生成任何帧").export()
await UniMessage.text("动图帧太短,无法生成有效片段").export()
)
return
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
need_transparency = False
for frame in output_frames:
if frame.mode == "RGBA":
alpha_channel = frame.getchannel("A")
if any(pix < 255 for pix in alpha_channel.getdata()):
need_transparency = True
break
elif frame.mode == "P" and "transparency" in frame.info:
# 镜像段(从头开始)
accumulated = 0.0
frame_count = 0
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
flipped = img.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
output_frames.append(flipped)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
else:
# === 静态图模式:制作翻转动画 ===
raw_frame = src_img.convert("RGBA")
resized_frame = resize_frame(raw_frame)
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
duration_ms = int(interval_sec * 1000)
frame1 = resized_frame
frame2 = resized_frame.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
output_frames = [frame1, frame2]
output_durations_ms = [duration_ms, duration_ms]
if len(output_frames) < 1:
await ytpgif_cmd.send(
await UniMessage.text("未能生成任何帧。").export()
)
return
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
need_transparency = False
for frame in output_frames:
if frame.mode == "RGBA":
alpha_channel = frame.getchannel("A")
if any(pix < 255 for pix in alpha_channel.getdata()):
need_transparency = True
break
elif frame.mode == "P" and "transparency" in frame.info:
need_transparency = True
break
# 如果不需要透明,则统一转为 RGB 避免调色板污染
if not need_transparency:
output_frames = [f.convert("RGB") for f in output_frames]
# 如果不需要透明,则统一转为 RGB 避免调色板污染
if not need_transparency:
output_frames = [f.convert("RGB") for f in output_frames]
# 构建保存参数
save_kwargs = {
"save_all": True,
"append_images": output_frames[1:],
"format": "GIF",
"loop": 0, # 无限循环
"duration": output_durations_ms,
"disposal": 2, # 清除到背景色,避免残留
"optimize": False, # 关闭抖动(等效 -dither none
}
# 构建保存参数
save_kwargs = {
"save_all": True,
"append_images": output_frames[1:],
"format": "GIF",
"loop": 0, # 无限循环
"duration": output_durations_ms,
"disposal": 2, # 清除到背景色,避免残留
"optimize": False, # 关闭抖动(等效 -dither none
}
# 只有真正需要透明时才启用 transparency
if need_transparency:
save_kwargs["transparency"] = 0
# 只有真正需要透明时才启用 transparency
if need_transparency:
save_kwargs["transparency"] = 0
output_frames[0].save(output_path, **save_kwargs)
# 发送结果
with open(output_path, "rb") as f:
result_image = UniMessage.image(raw=f.read())
bio = BytesIO()
output_frames[0].save(bio, **save_kwargs)
result_image = UniMessage.image(raw=bio)
await ytpgif_cmd.send(await result_image.export())
except Exception as e:
print(f"[YTPGIF] 处理失败: {e}")
await ytpgif_cmd.send(
await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export()
)
finally:
for path in filter(None, [input_path, output_path]):
if os.path.exists(path):
try:
os.unlink(path)
except: # noqa
pass
)

81
poetry.lock generated
View File

@ -2062,6 +2062,38 @@ files = [
{file = "propcache-0.3.2.tar.gz", hash = "sha256:20d7d62e4e7ef05f221e0db2856b979540686342e7dd9973b815599c7057e168"},
]
[[package]]
name = "ptimeparse"
version = "0.1.2"
description = "一个用于解析中文的时间表达的库"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "ptimeparse-0.1.2-py3-none-any.whl", hash = "sha256:0eea791396e53b63330fadb40d9f0a2e6272bd5467246f10d1d6971bc606edff"},
{file = "ptimeparse-0.1.2.tar.gz", hash = "sha256:658be90a3cc2994c09c4ea2f276d257e7eb84bc330be79950baefe32b19779a2"},
]
[package.source]
type = "legacy"
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple"
reference = "pt-gitea-pypi"
[[package]]
name = "pybind11"
version = "3.0.1"
description = "Seamless operability between C++11 and Python"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "pybind11-3.0.1-py3-none-any.whl", hash = "sha256:aa8f0aa6e0a94d3b64adfc38f560f33f15e589be2175e103c0a33c6bce55ee89"},
{file = "pybind11-3.0.1.tar.gz", hash = "sha256:9c0f40056a016da59bab516efb523089139fcc6f2ba7e4930854c61efb932051"},
]
[package.extras]
global = ["pybind11-global (==3.0.1)"]
[[package]]
name = "pycares"
version = "4.11.0"
@ -2498,6 +2530,53 @@ pygments = ">=2.13.0,<3.0.0"
[package.extras]
jupyter = ["ipywidgets (>=7.5.1,<9)"]
[[package]]
name = "skia-python"
version = "138.0"
description = "Skia python binding"
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "skia_python-138.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:a8bcf4f8d6ea08bdf718ed257f9a9c722f643223f9b93230fd4c15ac5c82029f"},
{file = "skia_python-138.0-cp310-cp310-macosx_11_0_x86_64.whl", hash = "sha256:1b7f24a5b306f57d75b8733658189dd61ce9114a84b5d119b3b1fb09cbc991cb"},
{file = "skia_python-138.0-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:2c4164013355bcf255651ac8a0ca519fd6095b6e58fa5e7a057919bec158411d"},
{file = "skia_python-138.0-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:63a7f9dc8d5fdda969f35d7f8f3a774029093468a9c06771ccce9ab967b275ca"},
{file = "skia_python-138.0-cp310-cp310-win_amd64.whl", hash = "sha256:740104424e9c94a70a767adcdbfa2bce131c93a82fa00a50432406dc04bac7e8"},
{file = "skia_python-138.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:6e369105b2811b6c32d7f15624202609deeb428164ee395c715df01221c847f5"},
{file = "skia_python-138.0-cp311-cp311-macosx_11_0_x86_64.whl", hash = "sha256:55a2ae437d1c6dc46d9bbc2e3e0b4b5642022dcebc0634591ae1c8acea52a413"},
{file = "skia_python-138.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:e67c9bcac953fbeb31b38474a4e566371e5719d4e27032686359cbe602819af8"},
{file = "skia_python-138.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:4e9fb097849604e290be28a6b4cc29cb0079b0233e599c2f6b2ec635e47169d2"},
{file = "skia_python-138.0-cp311-cp311-win_amd64.whl", hash = "sha256:df5a0dd52e1038423f2e1c03677cba5ea974d215adec10763dd855f4a6ca0cfc"},
{file = "skia_python-138.0-cp311-cp311-win_arm64.whl", hash = "sha256:054ca430003d52468974cf69d98719e61f188630709a3706a868efc16e817970"},
{file = "skia_python-138.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:6b83d7b8101038ab0fcb920b1a91c872615e45b0ac19fe0c66b55d3ad0d61970"},
{file = "skia_python-138.0-cp312-cp312-macosx_11_0_x86_64.whl", hash = "sha256:f22060f513b2fc258b4269e631128ca68889107bc4aa65255119f3d9c1c32993"},
{file = "skia_python-138.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:02b0c2146d25f45224c2c156dd8b859e89265ee5894c3fa8f456c6c41b27122d"},
{file = "skia_python-138.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:fb9e980a61defcb6ca19d73c97ea23d1aea633a82e9d1dead6ed259c3856baaf"},
{file = "skia_python-138.0-cp312-cp312-win_amd64.whl", hash = "sha256:625bb95dc225ea2257f1d1f7b0aee203290b91d909c8f5feaa7cd428f13bce22"},
{file = "skia_python-138.0-cp312-cp312-win_arm64.whl", hash = "sha256:e495143edba2a7cdf59e83723f00c9a247e5baeda4981fc6587803ed46c5ed2a"},
{file = "skia_python-138.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:1433b5a0bd4ca1c1d050541f54c25d27d55a129894dbd827d4e440697090ff83"},
{file = "skia_python-138.0-cp313-cp313-macosx_11_0_x86_64.whl", hash = "sha256:3f053128aa6344fa991c2485f29f40bda77b5f46467a43e778a34ec8f5b3615a"},
{file = "skia_python-138.0-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:878212d9576d065a45fd143c57d412bc0496c6594ecfcd9cc2cd93b4fd943cb4"},
{file = "skia_python-138.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:a6b3e58756a9fa8af3714edc05aeafc7922090f5a276c4be11337f252921dbe8"},
{file = "skia_python-138.0-cp313-cp313-win_amd64.whl", hash = "sha256:f2d596d5807fafef6bc43440f4df28f71db189f9e2cfb8613224713916837e3c"},
{file = "skia_python-138.0-cp313-cp313-win_arm64.whl", hash = "sha256:20285bf4ee41da754b842c80076fc4a240cb3801f4a1cbbb50b26a5dca1ebc39"},
{file = "skia_python-138.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:ff8dcb98b19e85d565c1d0accf8919e4b8a35d2a121d2d6f1c21fea655c85884"},
{file = "skia_python-138.0-cp38-cp38-macosx_11_0_x86_64.whl", hash = "sha256:ff3e692214b27490e38caf6ed123d82d2ff762da4561473a5a34fab1cb5dcd14"},
{file = "skia_python-138.0-cp38-cp38-manylinux_2_28_aarch64.whl", hash = "sha256:db3388f4e6e8f63c9e2b6cb0add0c2e483cbed783558b10eb7d94540f75db9b6"},
{file = "skia_python-138.0-cp38-cp38-manylinux_2_28_x86_64.whl", hash = "sha256:1dd9306b2025e296ad1d0e2c3038e3221ade1d48dfd0a20ce926f8116603b909"},
{file = "skia_python-138.0-cp38-cp38-win_amd64.whl", hash = "sha256:9ae5de84629dba4c751b334a8c43b04e9b7fe2a65df26992770648aa5e131b0c"},
{file = "skia_python-138.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:736e6618f246444bff2ff73b9efe8fd7159d9048c85ecdffb23290702a5c9099"},
{file = "skia_python-138.0-cp39-cp39-macosx_11_0_x86_64.whl", hash = "sha256:ab438dfe40d1be3da6fdd94890862781dc4e545a0bfa169a8e9dcc25f3bb0afd"},
{file = "skia_python-138.0-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:2f5d842930d1e385aa7911e0705d4751d8e4aa550389baf88d40673672f180b1"},
{file = "skia_python-138.0-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:d17ad82d6094f9c7bfc25b6525e1080bf3715abcc848be042c786b365f51a48c"},
{file = "skia_python-138.0-cp39-cp39-win_amd64.whl", hash = "sha256:0f6136827a269a4a5294d92919bc6fad008da7d7e74256666d81506f863a6ff9"},
]
[package.dependencies]
numpy = "*"
pybind11 = ">=2.6"
[[package]]
name = "sniffio"
version = "1.3.1"
@ -3181,4 +3260,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.12,<4.0"
content-hash = "927913b9030d1f6c126bb2d12eab7307dc6297f259c7c62e3033706457d27ce0"
content-hash = "0c7709438b8eb3d468b775417b8ef642cd7a8c1031805fdc71fec32c48346e54"

View File

@ -22,9 +22,18 @@ dependencies = [
"imagetext-py (>=2.2.0,<3.0.0)",
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
"returns (>=0.26.0,<0.27.0)",
"ptimeparse (>=0.1.1,<0.2.0)",
"skia-python (>=138.0,<139.0)",
]
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"
[[tool.poetry.source]]
name = "pt-gitea-pypi"
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple/"
priority = "supplemental"
[tool.poetry.dependencies]
ptimeparse = {source = "pt-gitea-pypi"}

View File

@ -1,84 +0,0 @@
aio-mc-rcon==3.4.1 ; python_version >= "3.12" and python_version < "4.0"
aiodns==3.5.0 ; python_version >= "3.12" and python_version < "4.0"
aiohappyeyeballs==2.6.1 ; python_version >= "3.12" and python_version < "4.0"
aiohttp==3.12.15 ; python_version >= "3.12" and python_version < "4.0"
aiosignal==1.4.0 ; python_version >= "3.12" and python_version < "4.0"
annotated-types==0.7.0 ; python_version >= "3.12" and python_version < "4.0"
anyio==4.11.0 ; python_version >= "3.12" and python_version < "4.0"
apscheduler==3.11.0 ; python_version >= "3.12" and python_version < "4.0"
arclet-alconna-tools==0.7.11 ; python_version >= "3.12" and python_version < "4.0"
arclet-alconna==1.8.40 ; python_version >= "3.12" and python_version < "4.0"
attrs==25.3.0 ; python_version >= "3.12" and python_version < "4.0"
beautifulsoup4==4.13.5 ; python_version >= "3.12" and python_version < "4.0"
brotli==1.1.0 ; python_version >= "3.12" and python_version < "4.0" and platform_python_implementation == "CPython"
brotlicffi==1.1.0.0 ; python_version >= "3.12" and python_version < "4.0" and platform_python_implementation != "CPython"
certifi==2025.8.3 ; python_version >= "3.12" and python_version < "4.0"
cffi==2.0.0 ; python_version >= "3.12" and python_version < "4.0"
charset-normalizer==3.4.3 ; python_version >= "3.12" and python_version < "4.0"
click==8.3.0 ; python_version >= "3.12" and python_version < "4.0"
colorama==0.4.6 ; python_version >= "3.12" and python_version < "4.0" and (sys_platform == "win32" or platform_system == "Windows")
exceptiongroup==1.3.0 ; python_version >= "3.12" and python_version < "4.0"
fastapi==0.117.1 ; python_version >= "3.12" and python_version < "4.0"
frozenlist==1.7.0 ; python_version >= "3.12" and python_version < "4.0"
h11==0.16.0 ; python_version >= "3.12" and python_version < "4.0"
h2==4.3.0 ; python_version >= "3.12" and python_version < "4.0"
hpack==4.1.0 ; python_version >= "3.12" and python_version < "4.0"
httpcore==1.0.9 ; python_version >= "3.12" and python_version < "4.0"
httptools==0.6.4 ; python_version >= "3.12" and python_version < "4.0"
httpx==0.28.1 ; python_version >= "3.12" and python_version < "4.0"
hyperframe==6.1.0 ; python_version >= "3.12" and python_version < "4.0"
idna==3.10 ; python_version >= "3.12" and python_version < "4.0"
imagetext-py==2.2.0 ; python_version >= "3.12" and python_version < "4.0"
importlib-metadata==8.7.0 ; python_version >= "3.12" and python_version < "4.0"
linkify-it-py==2.0.3 ; python_version >= "3.12" and python_version < "4.0"
loguru==0.7.3 ; python_version >= "3.12" and python_version < "4.0"
lxml==6.0.2 ; python_version >= "3.12" and python_version < "4.0"
markdown-it-py==4.0.0 ; python_version >= "3.12" and python_version < "4.0"
mdit-py-plugins==0.5.0 ; python_version >= "3.12" and python_version < "4.0"
mdurl==0.1.2 ; python_version >= "3.12" and python_version < "4.0"
msgpack==1.1.1 ; python_version >= "3.12" and python_version < "4.0"
multidict==6.6.4 ; python_version >= "3.12" and python_version < "4.0"
nepattern==0.7.7 ; python_version >= "3.12" and python_version < "4.0"
nonebot-adapter-console==0.9.0 ; python_version >= "3.12" and python_version < "4.0"
nonebot-adapter-discord==0.1.8 ; python_version >= "3.12" and python_version < "4.0"
nonebot-adapter-minecraft==1.5.2 ; python_version >= "3.12" and python_version < "4.0"
nonebot-adapter-onebot==2.4.6 ; python_version >= "3.12" and python_version < "4.0"
nonebot-plugin-alconna==0.59.4 ; python_version >= "3.12" and python_version < "4.0"
nonebot-plugin-apscheduler==0.5.0 ; python_version >= "3.12" and python_version < "4.0"
nonebot-plugin-waiter==0.8.1 ; python_version >= "3.12" and python_version < "4.0"
nonebot2==2.4.3 ; python_version >= "3.12" and python_version < "4.0"
nonechat==0.6.1 ; python_version >= "3.12" and python_version < "4.0"
numpy==2.2.6 ; python_version >= "3.12" and python_version < "4.0"
opencv-python-headless==4.12.0.88 ; python_version >= "3.12" and python_version < "4.0"
pillow==11.3.0 ; python_version >= "3.12" and python_version < "4.0"
platformdirs==4.4.0 ; python_version >= "3.12" and python_version < "4.0"
propcache==0.3.2 ; python_version >= "3.12" and python_version < "4.0"
pycares==4.11.0 ; python_version >= "3.12" and python_version < "4.0"
pycparser==2.23 ; python_version >= "3.12" and python_version < "4.0" and implementation_name != "PyPy"
pydantic-core==2.33.2 ; python_version >= "3.12" and python_version < "4.0"
pydantic==2.11.9 ; python_version >= "3.12" and python_version < "4.0"
pygments==2.19.2 ; python_version >= "3.12" and python_version < "4.0"
pygtrie==2.5.0 ; python_version >= "3.12" and python_version < "4.0"
python-dotenv==1.1.1 ; python_version >= "3.12" and python_version < "4.0"
pyyaml==6.0.3 ; python_version >= "3.12" and python_version < "4.0"
requests==2.32.5 ; python_version >= "3.12" and python_version < "4.0"
returns==0.26.0 ; python_version >= "3.12" and python_version < "4.0"
rich==14.1.0 ; python_version >= "3.12" and python_version < "4.0"
sniffio==1.3.1 ; python_version >= "3.12" and python_version < "4.0"
soupsieve==2.8 ; python_version >= "3.12" and python_version < "4.0"
starlette==0.48.0 ; python_version >= "3.12" and python_version < "4.0"
tarina==0.6.8 ; python_version >= "3.12" and python_version < "4.0"
textual==3.7.1 ; python_version >= "3.12" and python_version < "4.0"
typing-extensions==4.15.0 ; python_version >= "3.12" and python_version < "4.0"
typing-inspection==0.4.1 ; python_version >= "3.12" and python_version < "4.0"
tzdata==2025.2 ; python_version >= "3.12" and python_version < "4.0" and platform_system == "Windows"
tzlocal==5.3.1 ; python_version >= "3.12" and python_version < "4.0"
uc-micro-py==1.0.3 ; python_version >= "3.12" and python_version < "4.0"
urllib3==2.5.0 ; python_version >= "3.12" and python_version < "4.0"
uvicorn==0.37.0 ; python_version >= "3.12" and python_version < "4.0"
uvloop==0.21.0 ; python_version >= "3.12" and python_version < "4.0" and sys_platform != "win32" and sys_platform != "cygwin" and platform_python_implementation != "PyPy"
watchfiles==1.1.0 ; python_version >= "3.12" and python_version < "4.0"
websockets==15.0.1 ; python_version >= "3.12" and python_version < "4.0"
win32-setctime==1.2.0 ; python_version >= "3.12" and python_version < "4.0" and sys_platform == "win32"
yarl==1.20.1 ; python_version >= "3.12" and python_version < "4.0"
zipp==3.23.0 ; python_version >= "3.12" and python_version < "4.0"

View File

@ -8,9 +8,12 @@ nonebot.load_plugins("konabot/plugins")
plugins = nonebot.get_loaded_plugins()
len_requires = len(
[f for f in (
Path(__file__).parent.parent / "konabot" / "plugins"
).iterdir() if f.is_dir() and (f / "__init__.py").exists()]
[
f
for f in (Path(__file__).parent.parent / "konabot" / "plugins").iterdir()
if (f.is_dir() and (f / "__init__.py").exists())
or ((not f.is_dir()) and f.suffix == ".py")
]
)
plugins = [p for p in plugins if p.module.__name__.startswith("konabot.plugins")]