Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0ca901e7b1 | |||
| d096f43d38 | |||
| 38ae3d1c74 | |||
| a0483d1d5c | |||
| ae83b66908 | |||
| 6abeb05a18 | |||
| 9b0a0368fa | |||
| 4eac493de4 | |||
| b4e400b626 | |||
| c35ee57976 |
24
.vscode/launch.json
vendored
24
.vscode/launch.json
vendored
@ -1,24 +0,0 @@
|
||||
{
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "运行 Bot 并调试(自动重载)",
|
||||
"type": "debugpy",
|
||||
"request": "launch",
|
||||
"module": "watchfiles",
|
||||
"args": [
|
||||
"bot.main"
|
||||
],
|
||||
"console": "integratedTerminal",
|
||||
"justMyCode": true,
|
||||
"env": {
|
||||
"PYTHONPATH": "${workspaceFolder}"
|
||||
},
|
||||
"cwd": "${workspaceFolder}",
|
||||
"presentation": {
|
||||
"hidden": false,
|
||||
"group": "bot"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
30
.vscode/tasks.json
vendored
30
.vscode/tasks.json
vendored
@ -1,30 +0,0 @@
|
||||
{
|
||||
"version": "2.0.0",
|
||||
"tasks": [
|
||||
{
|
||||
"label": "Poetry: Export requirements.txt (Production)",
|
||||
"type": "shell",
|
||||
"command": "poetry export -f requirements.txt --output requirements.txt --without-hashes",
|
||||
"group": "build",
|
||||
"presentation": {
|
||||
"reveal": "always",
|
||||
"panel": "new"
|
||||
},
|
||||
"problemMatcher": [],
|
||||
"detail": "导出生产环境依赖到 requirements.txt"
|
||||
},
|
||||
{
|
||||
"label": "Bot: Run with Auto-reload",
|
||||
"type": "shell",
|
||||
"command": "poetry run watchfiles bot.main",
|
||||
"group": "build",
|
||||
"isBackground": true,
|
||||
"presentation": {
|
||||
"reveal": "always",
|
||||
"panel": "new"
|
||||
},
|
||||
"problemMatcher": [],
|
||||
"detail": "运行 bot 并启用自动重载功能"
|
||||
}
|
||||
]
|
||||
}
|
||||
30
Dockerfile
30
Dockerfile
@ -1,8 +1,32 @@
|
||||
FROM python:3.13-slim
|
||||
# copied from https://www.martinrichards.me/post/python_poetry_docker/
|
||||
FROM python:3.13-slim AS base
|
||||
|
||||
ENV VIRTUAL_ENV=/app/.venv \
|
||||
PATH="/app/.venv/bin:$PATH"
|
||||
|
||||
|
||||
|
||||
FROM base AS builder
|
||||
|
||||
ENV POETRY_NO_INTERACTION=1 \
|
||||
POETRY_VIRTUALENVS_IN_PROJECT=1 \
|
||||
POETRY_VIRTUALENVS_CREATE=1 \
|
||||
POETRY_CACHE_DIR=/tmp/poetry_cache
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
RUN pip install poetry
|
||||
|
||||
COPY pyproject.toml poetry.lock ./
|
||||
RUN python -m poetry install --no-root && rm -rf $POETRY_CACHE_DIR
|
||||
|
||||
|
||||
|
||||
FROM base AS runtime
|
||||
|
||||
COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
|
||||
|
||||
WORKDIR /app
|
||||
COPY requirements.txt ./
|
||||
RUN pip install -r requirements.txt --no-deps
|
||||
|
||||
COPY bot.py pyproject.toml .env.prod .env.test ./
|
||||
COPY assets ./assets
|
||||
|
||||
@ -65,10 +65,10 @@ code .
|
||||
|
||||
### 运行
|
||||
|
||||
你可以在 VSCode 的「运行与调试」窗口,启动 `运行 Bot 并调试(自动重载)` 任务来启动 Bot,也可以使用命令行手动启动 Bot:
|
||||
使用命令行手动启动 Bot:
|
||||
|
||||
```bash
|
||||
poetry run watchfiles bot.main
|
||||
poetry run watchfiles bot.main konabot
|
||||
```
|
||||
|
||||
如果你不希望自动重载,只是想运行 Bot,可以直接运行:
|
||||
|
||||
9
konabot/common/nb/exc.py
Normal file
9
konabot/common/nb/exc.py
Normal file
@ -0,0 +1,9 @@
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
|
||||
|
||||
class BotExceptionMessage(Exception):
|
||||
def __init__(self, msg: UniMessage | str) -> None:
|
||||
super().__init__()
|
||||
if isinstance(msg, str):
|
||||
msg = UniMessage().text(msg)
|
||||
self.msg = msg
|
||||
@ -1,17 +1,23 @@
|
||||
from io import BytesIO
|
||||
from typing import Annotated
|
||||
|
||||
import httpx
|
||||
import PIL.Image
|
||||
from loguru import logger
|
||||
import nonebot
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.adapters import Bot, Event, Message
|
||||
from nonebot.adapters.discord import Bot as DiscordBot
|
||||
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
|
||||
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
|
||||
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
|
||||
import nonebot.params
|
||||
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
|
||||
from PIL import UnidentifiedImageError
|
||||
from returns.result import Failure, Result, Success
|
||||
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
|
||||
|
||||
async def download_image_bytes(url: str) -> Result[bytes, str]:
|
||||
# if "/matcha/cache/" in url:
|
||||
@ -133,3 +139,21 @@ async def extract_image_from_message(
|
||||
else:
|
||||
return Failure("暂时不支持在这里中通过引用的方式获取图片")
|
||||
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
|
||||
|
||||
|
||||
async def _ext_img(
|
||||
evt: Event,
|
||||
bot: Bot,
|
||||
matcher: Matcher,
|
||||
) -> PIL.Image.Image | None:
|
||||
match await extract_image_from_message(evt.get_message(), evt, bot):
|
||||
case Success(img):
|
||||
return img
|
||||
case Failure(err):
|
||||
# raise BotExceptionMessage(err)
|
||||
await matcher.send(await UniMessage().text(err).export())
|
||||
return None
|
||||
assert False
|
||||
|
||||
|
||||
PIL_Image = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)]
|
||||
|
||||
16
konabot/common/nb/match_keyword.py
Normal file
16
konabot/common/nb/match_keyword.py
Normal file
@ -0,0 +1,16 @@
|
||||
import re
|
||||
|
||||
from nonebot_plugin_alconna import Text, UniMsg
|
||||
|
||||
|
||||
def match_keyword(*patterns: str | re.Pattern):
|
||||
async def _matcher(msg: UniMsg):
|
||||
text = msg.get(Text).extract_plain_text().strip()
|
||||
for pattern in patterns:
|
||||
if isinstance(pattern, str) and text == pattern:
|
||||
return True
|
||||
if isinstance(pattern, re.Pattern) and re.match(pattern, text):
|
||||
return True
|
||||
return False
|
||||
|
||||
return _matcher
|
||||
13
konabot/common/nb/reply_image.py
Normal file
13
konabot/common/nb/reply_image.py
Normal file
@ -0,0 +1,13 @@
|
||||
from io import BytesIO
|
||||
|
||||
import PIL
|
||||
import PIL.Image
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
|
||||
|
||||
async def reply_image(matcher: type[Matcher], bot: Bot, img: PIL.Image.Image):
|
||||
data = BytesIO()
|
||||
img.save(data, "PNG")
|
||||
await matcher.send(await UniMessage().image(raw=data).export(bot))
|
||||
59
konabot/docs/user/giftool.txt
Normal file
59
konabot/docs/user/giftool.txt
Normal file
@ -0,0 +1,59 @@
|
||||
指令介绍
|
||||
giftool - 对 GIF 动图进行裁剪、抽帧等处理
|
||||
|
||||
格式
|
||||
giftool [图片] [选项]
|
||||
|
||||
示例
|
||||
回复一张 GIF 并发送:
|
||||
`giftool --ss 1.5 -t 2.0`
|
||||
从 1.5 秒处开始,截取 2 秒长度的片段。
|
||||
|
||||
`giftool [图片] --ss 0:10 -to 0:15`
|
||||
截取从 10 秒到 15 秒之间的片段(支持 MM:SS 或 HH:MM:SS 格式)。
|
||||
|
||||
`giftool [图片] --frames:v 10`
|
||||
将整张 GIF 均匀抽帧,最终保留 10 帧。
|
||||
|
||||
`giftool [图片] --ss 2 --frames:v 5`
|
||||
从第 2 秒开始截取,并将结果抽帧为 5 帧。
|
||||
|
||||
参数说明
|
||||
图片(必需)
|
||||
- 必须是 GIF 动图。
|
||||
- 支持直接附带图片,或回复一条含 GIF 的消息后使用指令。
|
||||
|
||||
--ss <时间戳>(可选)
|
||||
- 指定开始时间(单位:秒),可使用以下格式:
|
||||
• 纯数字(如 `1.5` 表示 1.5 秒)
|
||||
• 分秒格式(如 `1:30` 表示 1 分 30 秒)
|
||||
• 时分秒格式(如 `0:1:30` 表示 1 分 30 秒)
|
||||
- 默认从开头开始(0 秒)。
|
||||
|
||||
-t <持续时间>(可选)
|
||||
- 指定截取的持续时间(单位:秒),格式同 --ss。
|
||||
- 与 --ss 配合使用:截取 [ss, ss + t] 区间。
|
||||
- 不能与 --to 同时使用。
|
||||
|
||||
--to <时间戳>(可选)
|
||||
- 指定结束时间(单位:秒),格式同 --ss。
|
||||
- 与 --ss 配合使用:截取 [ss, to] 区间。
|
||||
- 不能与 -t 同时使用。
|
||||
|
||||
--frames:v <帧数>(可选)
|
||||
- 对截取后的片段进行均匀抽帧,保留指定数量的帧。
|
||||
- 帧数必须为正整数(> 0)。
|
||||
- 若原始帧数 ≤ 指定帧数,则保留全部帧。
|
||||
|
||||
--s <速度>(可选)
|
||||
- 调整 gif 图的速度
|
||||
|
||||
使用方式
|
||||
1. 发送指令前,请确保:
|
||||
- 消息中附带一张 GIF 动图,或
|
||||
- 回复一条包含 GIF 动图的消息后再发送指令。
|
||||
2. 插件会自动:
|
||||
- 解析 GIF 的每一帧及其持续时间(duration)
|
||||
- 根据时间参数转换为帧索引进行裁剪
|
||||
- 如指定抽帧,则对裁剪后的片段均匀采样
|
||||
- 生成新的 GIF 并保持原始循环设置(loop=0)
|
||||
5
konabot/docs/user/黑白.txt
Normal file
5
konabot/docs/user/黑白.txt
Normal file
@ -0,0 +1,5 @@
|
||||
指令介绍
|
||||
黑白 - 将图片经过一个黑白滤镜的处理
|
||||
|
||||
示例
|
||||
引用一个带有图片的消息,或者消息本身携带图片,然后发送「黑白」即可
|
||||
45
konabot/plugins/errman.py
Normal file
45
konabot/plugins/errman.py
Normal file
@ -0,0 +1,45 @@
|
||||
from typing import Any
|
||||
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.message import run_postprocessor
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from returns.primitives.exceptions import UnwrapFailedError
|
||||
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
|
||||
|
||||
@run_postprocessor
|
||||
async def _(bot: Bot, matcher: Matcher, exc: BotExceptionMessage | AssertionError | UnwrapFailedError):
|
||||
if isinstance(exc, BotExceptionMessage):
|
||||
msg = exc.msg
|
||||
await matcher.send(await msg.export(bot))
|
||||
if isinstance(exc, AssertionError):
|
||||
if exc.args:
|
||||
err_msg = exc.args[0]
|
||||
|
||||
err_msg_res: UniMessage
|
||||
if isinstance(err_msg, str):
|
||||
err_msg_res = UniMessage().text(err_msg)
|
||||
elif isinstance(err_msg, UniMessage):
|
||||
err_msg_res = err_msg
|
||||
else:
|
||||
return
|
||||
|
||||
await matcher.send(await err_msg_res.export(bot))
|
||||
if isinstance(exc, UnwrapFailedError):
|
||||
obj = exc.halted_container
|
||||
try:
|
||||
failure: Any = obj.failure()
|
||||
|
||||
err_msg_res: UniMessage
|
||||
if isinstance(failure, str):
|
||||
err_msg_res = UniMessage().text(failure)
|
||||
elif isinstance(failure, UniMessage):
|
||||
err_msg_res = failure
|
||||
else:
|
||||
return
|
||||
|
||||
await matcher.send(await err_msg_res.export(bot))
|
||||
except:
|
||||
pass
|
||||
160
konabot/plugins/image_process/__init__.py
Normal file
160
konabot/plugins/image_process/__init__.py
Normal file
@ -0,0 +1,160 @@
|
||||
import re
|
||||
from io import BytesIO
|
||||
|
||||
from nonebot import on_message
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage,
|
||||
on_alconna)
|
||||
|
||||
from konabot.common.nb.exc import BotExceptionMessage
|
||||
from konabot.common.nb.extract_image import PIL_Image
|
||||
from konabot.common.nb.match_keyword import match_keyword
|
||||
from konabot.common.nb.reply_image import reply_image
|
||||
|
||||
cmd_black_white = on_message(rule=match_keyword("黑白"))
|
||||
|
||||
|
||||
@cmd_black_white.handle()
|
||||
async def _(img: PIL_Image, bot: Bot):
|
||||
await reply_image(cmd_black_white, bot, img.convert("LA"))
|
||||
|
||||
|
||||
def parse_timestamp(tx: str) -> float | None:
|
||||
res = 0.0
|
||||
for component in tx.split(":"):
|
||||
res *= 60
|
||||
if not re.match(r"^\d+(\.\d+)?$", component):
|
||||
return
|
||||
res += float(component)
|
||||
return res
|
||||
|
||||
|
||||
cmd_giftool = on_alconna(Alconna(
|
||||
"giftool",
|
||||
Args["img", Image | None],
|
||||
Option("--ss", Args["start_point", str]),
|
||||
Option("--frames:v", Args["frame_count", int]),
|
||||
Option("-t", Args["length", str]),
|
||||
Option("-to", Args["end_point", str]),
|
||||
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
|
||||
))
|
||||
|
||||
|
||||
@cmd_giftool.handle()
|
||||
async def _(
|
||||
image: PIL_Image,
|
||||
start_point: str | None = None,
|
||||
frame_count: int | None = None,
|
||||
length: str | None = None,
|
||||
speed_factor: float = 1.0,
|
||||
end_point: str | None = None,
|
||||
):
|
||||
ss: None | float = None
|
||||
if start_point:
|
||||
ss = parse_timestamp(start_point)
|
||||
if ss is None:
|
||||
raise BotExceptionMessage("--ss 的格式不满足条件")
|
||||
|
||||
t: None | float = None
|
||||
if length:
|
||||
t = parse_timestamp(length)
|
||||
if t is None:
|
||||
raise BotExceptionMessage("-t 的格式不满足条件")
|
||||
|
||||
to: None | float = None
|
||||
if end_point:
|
||||
to = parse_timestamp(end_point)
|
||||
if to is None:
|
||||
raise BotExceptionMessage("-to 的格式不满足条件")
|
||||
|
||||
if to is not None and ss is not None and to <= ss:
|
||||
raise BotExceptionMessage("错误:出点时间小于入点")
|
||||
if frame_count is not None and frame_count <= 0:
|
||||
raise BotExceptionMessage("错误:帧数量应该大于 0")
|
||||
if speed_factor <= 0:
|
||||
raise BotExceptionMessage("错误:--speed 必须大于 0")
|
||||
|
||||
if not getattr(image, "is_animated", False):
|
||||
raise BotExceptionMessage("错误:输入的不是动图(GIF)")
|
||||
|
||||
frames = []
|
||||
durations = []
|
||||
total_duration = 0.0
|
||||
|
||||
try:
|
||||
for i in range(getattr(image, "n_frames")):
|
||||
image.seek(i)
|
||||
frames.append(image.copy())
|
||||
duration = image.info.get("duration", 100) # 单位:毫秒
|
||||
durations.append(duration)
|
||||
total_duration += duration / 1000.0 # 转为秒
|
||||
except EOFError:
|
||||
pass
|
||||
|
||||
if not frames:
|
||||
raise BotExceptionMessage("错误:读取 GIF 帧失败")
|
||||
|
||||
def time_to_frame_index(target_time: float) -> int:
|
||||
if target_time <= 0:
|
||||
return 0
|
||||
cum = 0.0
|
||||
for idx, dur in enumerate(durations):
|
||||
cum += dur / 1000.0
|
||||
if cum >= target_time:
|
||||
return min(idx, len(frames) - 1)
|
||||
return len(frames) - 1
|
||||
start_frame = 0
|
||||
end_frame = len(frames) - 1
|
||||
if ss is not None:
|
||||
start_frame = time_to_frame_index(ss)
|
||||
if to is not None:
|
||||
end_frame = time_to_frame_index(to)
|
||||
if end_frame < start_frame:
|
||||
end_frame = start_frame
|
||||
elif t is not None:
|
||||
end_time = (ss or 0.0) + t
|
||||
end_frame = time_to_frame_index(end_time)
|
||||
if end_frame < start_frame:
|
||||
end_frame = start_frame
|
||||
|
||||
start_frame = max(0, start_frame)
|
||||
end_frame = min(len(frames) - 1, end_frame)
|
||||
selected_frames = frames[start_frame : end_frame + 1]
|
||||
selected_durations = durations[start_frame : end_frame + 1]
|
||||
|
||||
if frame_count is not None and frame_count > 0:
|
||||
if frame_count >= len(selected_frames):
|
||||
pass
|
||||
else:
|
||||
step = len(selected_frames) / frame_count
|
||||
sampled_frames = []
|
||||
sampled_durations = []
|
||||
for i in range(frame_count):
|
||||
idx = int(i * step)
|
||||
sampled_frames.append(selected_frames[idx])
|
||||
sampled_durations.append(
|
||||
sum(selected_durations) // len(selected_durations)
|
||||
)
|
||||
selected_frames = sampled_frames
|
||||
selected_durations = sampled_durations
|
||||
|
||||
output_img = BytesIO()
|
||||
|
||||
adjusted_durations = [
|
||||
max(10, int(dur / speed_factor)) for dur in selected_durations
|
||||
]
|
||||
|
||||
if selected_frames:
|
||||
selected_frames[0].save(
|
||||
output_img,
|
||||
format="GIF",
|
||||
save_all=True,
|
||||
append_images=selected_frames[1:],
|
||||
duration=adjusted_durations,
|
||||
loop=0,
|
||||
)
|
||||
else:
|
||||
raise BotExceptionMessage("错误:没有可输出的帧")
|
||||
output_img.seek(0)
|
||||
|
||||
await cmd_giftool.send(await UniMessage().image(raw=output_img).export())
|
||||
@ -4,6 +4,7 @@ from pathlib import Path
|
||||
from typing import Any, Literal, cast
|
||||
|
||||
import nonebot
|
||||
import ptimeparse
|
||||
from loguru import logger
|
||||
from nonebot import on_message
|
||||
from nonebot.adapters import Event
|
||||
@ -19,8 +20,6 @@ from nonebot.adapters.onebot.v11.event import \
|
||||
from nonebot_plugin_alconna import UniMessage, UniMsg
|
||||
from pydantic import BaseModel
|
||||
|
||||
from konabot.plugins.simple_notify.parse_time import get_target_time
|
||||
|
||||
evt = on_message()
|
||||
|
||||
(Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True)
|
||||
@ -141,10 +140,15 @@ async def _(msg: UniMsg, mEvt: Event):
|
||||
return
|
||||
|
||||
notify_time, notify_text = segments
|
||||
target_time = get_target_time(notify_time)
|
||||
if target_time is None:
|
||||
# target_time = get_target_time(notify_time)
|
||||
try:
|
||||
target_time = ptimeparse.parse(notify_time)
|
||||
except Exception:
|
||||
logger.info(f"无法从 {notify_time} 中解析出时间")
|
||||
return
|
||||
# if target_time is None:
|
||||
# logger.info(f"无法从 {notify_time} 中解析出时间")
|
||||
# return
|
||||
if not notify_text:
|
||||
return
|
||||
|
||||
|
||||
@ -1,358 +0,0 @@
|
||||
import datetime
|
||||
import re
|
||||
from typing import Optional, Dict, List, Callable, Tuple
|
||||
|
||||
from loguru import logger
|
||||
|
||||
# --- 常量与正则表达式定义 (Constants and Regex Definitions) ---
|
||||
|
||||
# 数字模式,兼容中文和阿拉伯数字
|
||||
P_NUM = r"(\d+|[零一两二三四五六七八九十]+)"
|
||||
|
||||
# 预编译的正则表达式
|
||||
PATTERNS = {
|
||||
# 相对时间, e.g., "5分钟后"
|
||||
"DELTA": re.compile(
|
||||
r"^"
|
||||
r"((?P<days>" + P_NUM + r") ?天)?"
|
||||
r"((?P<hours>" + P_NUM + r") ?个?小?时)?"
|
||||
r"((?P<minutes>" + P_NUM + r") ?分钟?)?"
|
||||
r"((?P<seconds>" + P_NUM + r") ?秒钟?)?"
|
||||
r" ?后 ?$"
|
||||
),
|
||||
# 绝对时间
|
||||
"YEAR": re.compile(r"(" + P_NUM + r") ?年"),
|
||||
"MONTH": re.compile(r"(" + P_NUM + r") ?月"),
|
||||
"DAY": re.compile(r"(" + P_NUM + r") ?[日号]"),
|
||||
"HOUR": re.compile(r"(" + P_NUM + r") ?[点时](半)?钟?"),
|
||||
"MINUTE": re.compile(r"(" + P_NUM + r") ?分(钟)?"),
|
||||
"SECOND": re.compile(r"(" + P_NUM + r") ?秒(钟)?"),
|
||||
"HMS_COLON": re.compile(r"(\d{1,2})[::](\d{1,2})([::](\d{1,2}))?"),
|
||||
"PM": re.compile(r"(下午|PM|晚上)"),
|
||||
# 相对日期
|
||||
"TOMORROW": re.compile(r"明天"),
|
||||
"DAY_AFTER_TOMORROW": re.compile(r"后天"),
|
||||
"TODAY": re.compile(r"今天"),
|
||||
}
|
||||
|
||||
# 中文数字到阿拉伯数字的映射
|
||||
CHINESE_TO_ARABIC_MAP: Dict[str, int] = {
|
||||
'零': 0, '一': 1, '二': 2, '三': 3, '四': 4,
|
||||
'五': 5, '六': 6, '七': 7, '八': 8, '九': 9, '十': 10
|
||||
}
|
||||
|
||||
# --- 核心工具函数 (Core Utility Functions) ---
|
||||
|
||||
def parse_number(s: str) -> int:
|
||||
"""
|
||||
将包含中文或阿拉伯数字的字符串解析为整数。
|
||||
例如: "五" -> 5, "十五" -> 15, "二十三" -> 23, "12" -> 12。
|
||||
返回 -1 表示解析失败。
|
||||
"""
|
||||
if not s:
|
||||
return -1
|
||||
|
||||
s = s.strip().replace("两", "二")
|
||||
|
||||
if s.isdigit():
|
||||
return int(s)
|
||||
|
||||
if s in CHINESE_TO_ARABIC_MAP:
|
||||
return CHINESE_TO_ARABIC_MAP[s]
|
||||
|
||||
# 处理 "十" 在不同位置的情况
|
||||
if s.startswith('十'):
|
||||
if len(s) == 1:
|
||||
return 10
|
||||
num = CHINESE_TO_ARABIC_MAP.get(s[1])
|
||||
return 10 + num if num is not None else -1
|
||||
|
||||
if s.endswith('十'):
|
||||
if len(s) == 2:
|
||||
num = CHINESE_TO_ARABIC_MAP.get(s[0])
|
||||
return 10 * num if num is not None else -1
|
||||
|
||||
if '十' in s:
|
||||
parts = s.split('十')
|
||||
if len(parts) == 2:
|
||||
left = CHINESE_TO_ARABIC_MAP.get(parts[0])
|
||||
right = CHINESE_TO_ARABIC_MAP.get(parts[1])
|
||||
if left is not None and right is not None:
|
||||
return left * 10 + right
|
||||
|
||||
return -1
|
||||
|
||||
|
||||
# --- 时间解析器类 (Time Parser Class) ---
|
||||
|
||||
class TimeParser:
|
||||
"""
|
||||
一个用于解析自然语言时间描述的类。
|
||||
"""
|
||||
def __init__(self, content: str):
|
||||
self.original_content: str = content
|
||||
self.content_to_parse: str = self._preprocess(content)
|
||||
self.now: datetime.datetime = datetime.datetime.now()
|
||||
# 将 t 作为结果构建器,初始化为今天的午夜
|
||||
self.t: datetime.datetime = self.now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
self.is_pm_specified: bool = False
|
||||
self.is_date_specified: bool = False
|
||||
self.is_time_specified: bool = False
|
||||
|
||||
def _preprocess(self, content: str) -> str:
|
||||
"""预处理字符串,移除不相关字符。"""
|
||||
content = re.sub(r"\s+", "", content)
|
||||
content = re.sub(r"[,,\.。::、]", "", content)
|
||||
return content
|
||||
|
||||
def _consume_match(self, match: re.Match) -> str:
|
||||
"""从待解析字符串中移除已匹配的部分。"""
|
||||
self.content_to_parse = self.content_to_parse.replace(match.group(0), "", 1)
|
||||
return match.group(0)
|
||||
|
||||
def parse(self) -> Optional[datetime.datetime]:
|
||||
"""
|
||||
主解析方法。
|
||||
首先尝试解析相对时间(如“5分钟后”),失败则尝试解析绝对时间。
|
||||
"""
|
||||
logger.debug(f"🎉 开始解析: '{self.original_content}' -> 清洗后: '{self.content_to_parse}'")
|
||||
if not self.content_to_parse:
|
||||
logger.debug("❌ 内容为空,无法解析。")
|
||||
return None
|
||||
|
||||
# 1. 尝试相对时间解析
|
||||
if (target_time := self._parse_relative_time()) is not None:
|
||||
return target_time
|
||||
|
||||
# 2. 尝试绝对时间解析
|
||||
if (target_time := self._parse_absolute_time()) is not None:
|
||||
return target_time
|
||||
|
||||
logger.debug(f"❌ 所有解析模式均未匹配成功。")
|
||||
return None
|
||||
|
||||
def _parse_relative_time(self) -> Optional[datetime.datetime]:
|
||||
"""解析 'X天X小时X分钟后' 这种格式。"""
|
||||
if match := PATTERNS["DELTA"].match(self.content_to_parse):
|
||||
logger.debug("⏳ 匹配到相对时间模式 (DELTA)。")
|
||||
try:
|
||||
delta_parts = {
|
||||
"days": parse_number(match.group("days") or "0"),
|
||||
"hours": parse_number(match.group("hours") or "0"),
|
||||
"minutes": parse_number(match.group("minutes") or "0"),
|
||||
"seconds": parse_number(match.group("seconds") or "0"),
|
||||
}
|
||||
|
||||
# 检查是否有无效的数字解析
|
||||
if any(v < 0 for v in delta_parts.values()):
|
||||
logger.debug(f"❌ 解析时间片段为数字时失败: {delta_parts}")
|
||||
return None
|
||||
|
||||
delta = datetime.timedelta(**delta_parts)
|
||||
if delta.total_seconds() == 0:
|
||||
logger.debug("❌ 解析出的时间增量为0。")
|
||||
return None
|
||||
|
||||
target_time = self.now + delta
|
||||
logger.debug(f"✅ 相对时间解析成功 -> {target_time}")
|
||||
return target_time
|
||||
except (ValueError, TypeError) as e:
|
||||
logger.debug(f"❌ 解析相对时间时出错: {e}", exc_info=True)
|
||||
return None
|
||||
return None
|
||||
|
||||
def _parse_absolute_time(self) -> Optional[datetime.datetime]:
|
||||
"""解析一个指定的日期和时间。"""
|
||||
logger.debug(f"🎯 启动绝对时间解析,基准时间: {self.t}")
|
||||
|
||||
# 定义解析步骤和顺序
|
||||
# (pattern_key, handler_method)
|
||||
parsing_steps: List[Tuple[str, Callable[[re.Match], bool]]] = [
|
||||
("TOMORROW", self._handle_tomorrow),
|
||||
("DAY_AFTER_TOMORROW", self._handle_day_after_tomorrow),
|
||||
("TODAY", self._handle_today),
|
||||
("YEAR", self._handle_year),
|
||||
("MONTH", self._handle_month),
|
||||
("DAY", self._handle_day),
|
||||
("HMS_COLON", self._handle_hms_colon),
|
||||
("PM", self._handle_pm),
|
||||
("HOUR", self._handle_hour),
|
||||
("MINUTE", self._handle_minute),
|
||||
("SECOND", self._handle_second),
|
||||
]
|
||||
|
||||
for key, handler in parsing_steps:
|
||||
if match := PATTERNS[key].search(self.content_to_parse):
|
||||
if not handler(match):
|
||||
# 如果任何一个处理器返回False,说明解析失败
|
||||
return None
|
||||
|
||||
# 移除无意义的上午关键词
|
||||
self.content_to_parse = self.content_to_parse.replace("上午", "").replace("AM", "").replace("凌晨", "")
|
||||
|
||||
# 如果解析后还有剩余字符,说明有无法识别的部分
|
||||
if self.content_to_parse.strip():
|
||||
logger.debug(f"❌ 匹配失败,存在未解析的残留内容: '{self.content_to_parse.strip()}'")
|
||||
return None
|
||||
|
||||
# 最终调整和检查
|
||||
return self._finalize_datetime()
|
||||
|
||||
# --- Handler Methods for Absolute Time Parsing ---
|
||||
|
||||
def _handle_tomorrow(self, match: re.Match) -> bool:
|
||||
self.t += datetime.timedelta(days=1)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"📅 匹配到 '明天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_day_after_tomorrow(self, match: re.Match) -> bool:
|
||||
self.t += datetime.timedelta(days=2)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"📅 匹配到 '后天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_today(self, match: re.Match) -> bool:
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"📅 匹配到 '今天', 日期基准不变, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_year(self, match: re.Match) -> bool:
|
||||
year = parse_number(match.group(1))
|
||||
if year < 0: return False
|
||||
if year < 100: year += 2000 # 处理 "25年" -> 2025
|
||||
if year < self.now.year:
|
||||
logger.debug(f"❌ 指定的年份 {year} 已过去。")
|
||||
return False
|
||||
self.t = self.t.replace(year=year)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"Y| 年份更新 -> {self.t.year}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_month(self, match: re.Match) -> bool:
|
||||
month = parse_number(match.group(1))
|
||||
if not (1 <= month <= 12):
|
||||
logger.debug(f"❌ 无效的月份: {month}")
|
||||
return False
|
||||
|
||||
# 如果设置的月份在当前月份之前,且没有指定年份,则年份加一
|
||||
if month < self.t.month and not self.is_date_specified:
|
||||
self.t = self.t.replace(year=self.t.year + 1)
|
||||
logger.debug(f"💡 月份小于当前月份,年份自动进位 -> {self.t.year}")
|
||||
|
||||
self.t = self.t.replace(month=month)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"M| 月份更新 -> {self.t.month}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_day(self, match: re.Match) -> bool:
|
||||
day = parse_number(match.group(1))
|
||||
if not (1 <= day <= 31):
|
||||
logger.debug(f"❌ 无效的日期: {day}")
|
||||
return False
|
||||
|
||||
try:
|
||||
# 如果日期小于当前日期,且只指定了日,则月份加一
|
||||
if day < self.t.day and not self.is_date_specified:
|
||||
if self.t.month == 12:
|
||||
self.t = self.t.replace(year=self.t.year + 1, month=1)
|
||||
else:
|
||||
self.t = self.t.replace(month=self.t.month + 1)
|
||||
logger.debug(f"💡 日期小于当前日期,月份自动进位 -> {self.t.year}-{self.t.month}")
|
||||
|
||||
self.t = self.t.replace(day=day)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"D| 日期更新 -> {self.t.day}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
except ValueError:
|
||||
logger.debug(f"❌ 日期 {day} 对于月份 {self.t.month} 无效 (例如2月30号)。")
|
||||
return False
|
||||
|
||||
def _handle_hms_colon(self, match: re.Match) -> bool:
|
||||
h = int(match.group(1))
|
||||
m = int(match.group(2))
|
||||
s_str = match.group(4) # group(3) is with colon, group(4) is the number
|
||||
s = int(s_str) if s_str else 0
|
||||
if not (0 <= h <= 23 and 0 <= m <= 59 and 0 <= s <= 59):
|
||||
logger.debug(f"❌ 无效的时间格式: H={h}, M={m}, S={s}")
|
||||
return False
|
||||
self.t = self.t.replace(hour=h, minute=m, second=s)
|
||||
self.is_time_specified = True
|
||||
logger.debug(f"T| 时分秒(冒号格式)更新 -> {self.t.time()}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_pm(self, match: re.Match) -> bool:
|
||||
self.is_pm_specified = True
|
||||
logger.debug(f"PM| 匹配到下午/晚上, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_hour(self, match: re.Match) -> bool:
|
||||
hour = parse_number(match.group(1))
|
||||
has_half = match.group(2) == '半'
|
||||
if not (0 <= hour <= 23):
|
||||
logger.debug(f"❌ 无效的小时: {hour}")
|
||||
return False
|
||||
minute = 30 if has_half else self.t.minute
|
||||
self.t = self.t.replace(hour=hour, minute=minute)
|
||||
self.is_time_specified = True
|
||||
logger.debug(f"H| 小时更新 -> {self.t.hour}{':30' if has_half else ''}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_minute(self, match: re.Match) -> bool:
|
||||
minute = parse_number(match.group(1))
|
||||
if not (0 <= minute <= 59):
|
||||
logger.debug(f"❌ 无效的分钟: {minute}")
|
||||
return False
|
||||
self.t = self.t.replace(minute=minute)
|
||||
self.is_time_specified = True
|
||||
logger.debug(f"M| 分钟更新 -> {self.t.minute}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_second(self, match: re.Match) -> bool:
|
||||
second = parse_number(match.group(1))
|
||||
if not (0 <= second <= 59):
|
||||
logger.debug(f"❌ 无效的秒: {second}")
|
||||
return False
|
||||
self.t = self.t.replace(second=second)
|
||||
self.is_time_specified = True
|
||||
logger.debug(f"S| 秒更新 -> {self.t.second}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _finalize_datetime(self) -> Optional[datetime.datetime]:
|
||||
"""对解析出的时间进行最后的调整和检查。"""
|
||||
# 处理下午/晚上
|
||||
if self.is_pm_specified and self.t.hour < 12:
|
||||
self.t = self.t.replace(hour=self.t.hour + 12)
|
||||
logger.debug(f"💡 根据 PM 标识,小时调整为 -> {self.t.hour}")
|
||||
|
||||
# 如果没有指定任何时间或日期部分,则认为解析无效
|
||||
if not self.is_date_specified and not self.is_time_specified:
|
||||
logger.debug("❌ 未能从输入中解析出任何有效的日期或时间部分。")
|
||||
return None
|
||||
|
||||
# 如果最终计算出的时间点在当前时间之前,自动往后推
|
||||
# 例如:现在是 15:00,说 "14点",应该是指明天的14点
|
||||
if self.t < self.now:
|
||||
# 只有在明确指定了时间的情况下,才自动加一天
|
||||
# 如果只指定了一个过去的日期(如“去年5月1号”),则不应该调整
|
||||
if self.is_time_specified:
|
||||
self.t += datetime.timedelta(days=1)
|
||||
logger.debug(f"🔁 目标时间已过,自动调整为明天 -> {self.t}")
|
||||
|
||||
logger.debug(f"✅ 解析成功,最终时间: {self.t}")
|
||||
return self.t
|
||||
|
||||
# --- 公共接口 (Public Interface) ---
|
||||
|
||||
def get_target_time(content: str) -> Optional[datetime.datetime]:
|
||||
"""
|
||||
高级接口,用于将自然语言时间描述转换为 datetime 对象。
|
||||
|
||||
Args:
|
||||
content: 包含时间信息的字符串。
|
||||
|
||||
Returns:
|
||||
一个 datetime 对象,如果解析失败则返回 None。
|
||||
"""
|
||||
parser = TimeParser(content)
|
||||
return parser.parse()
|
||||
@ -1,17 +1,14 @@
|
||||
import os
|
||||
import tempfile
|
||||
from typing import Optional
|
||||
from io import BytesIO
|
||||
|
||||
from PIL import Image, ImageSequence
|
||||
from loguru import logger
|
||||
from nonebot.adapters import Bot as BaseBot
|
||||
from nonebot.adapters import Event as BaseEvent
|
||||
from nonebot.plugin import PluginMetadata
|
||||
from nonebot_plugin_alconna import (
|
||||
Alconna,
|
||||
Args,
|
||||
Field,
|
||||
UniMessage,
|
||||
on_alconna,
|
||||
)
|
||||
from nonebot_plugin_alconna import Alconna, Args, Field, UniMessage, on_alconna
|
||||
from PIL import Image
|
||||
from returns.result import Failure, Success
|
||||
|
||||
from konabot.common.nb.extract_image import extract_image_from_message
|
||||
|
||||
__plugin_meta__ = PluginMetadata(
|
||||
name="ytpgif",
|
||||
@ -53,29 +50,6 @@ ytpgif_cmd = on_alconna(
|
||||
)
|
||||
|
||||
|
||||
async def get_image_url(event: BaseEvent) -> Optional[str]:
|
||||
"""从事件中提取图片 URL,支持直接消息和回复"""
|
||||
msg = event.get_message()
|
||||
for seg in msg:
|
||||
if seg.type == "image" and seg.data.get("url"):
|
||||
return str(seg.data["url"])
|
||||
|
||||
if hasattr(event, "reply") and (reply := event.reply):
|
||||
reply_msg = reply.message
|
||||
for seg in reply_msg:
|
||||
if seg.type == "image" and seg.data.get("url"):
|
||||
return str(seg.data["url"])
|
||||
return None
|
||||
|
||||
|
||||
async def download_image(url: str) -> bytes:
|
||||
import httpx
|
||||
async with httpx.AsyncClient() as client:
|
||||
resp = await client.get(url, timeout=10)
|
||||
resp.raise_for_status()
|
||||
return resp.content
|
||||
|
||||
|
||||
def resize_frame(frame: Image.Image) -> Image.Image:
|
||||
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
|
||||
w, h = frame.size
|
||||
@ -89,7 +63,7 @@ def resize_frame(frame: Image.Image) -> Image.Image:
|
||||
|
||||
|
||||
@ytpgif_cmd.handle()
|
||||
async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
|
||||
async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0):
|
||||
# === 校验 speed 范围 ===
|
||||
if not (MIN_SPEED <= speed <= MAX_SPEED):
|
||||
await ytpgif_cmd.send(
|
||||
@ -97,172 +71,150 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
|
||||
)
|
||||
return
|
||||
|
||||
img_url = await get_image_url(event)
|
||||
if not img_url:
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text(
|
||||
"请发送一张图片或回复一张图片来生成镜像动图。"
|
||||
).export()
|
||||
)
|
||||
return
|
||||
match await extract_image_from_message(event.get_message(), event, bot):
|
||||
case Success(img):
|
||||
src_img = img
|
||||
|
||||
case Failure(msg):
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text(msg).export()
|
||||
)
|
||||
return
|
||||
|
||||
case _:
|
||||
return
|
||||
|
||||
try:
|
||||
image_data = await download_image(img_url)
|
||||
except Exception as e:
|
||||
print(f"[YTPGIF] 下载失败: {e}")
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text("❌ 图片下载失败,请重试。").export()
|
||||
)
|
||||
return
|
||||
try:
|
||||
n_frames = getattr(src_img, "n_frames", 1)
|
||||
is_animated = n_frames > 1
|
||||
logger.debug(f"收到的动图的运动状态:{is_animated} 帧数量:{n_frames}")
|
||||
except Exception:
|
||||
is_animated = False
|
||||
n_frames = 1
|
||||
|
||||
input_path = output_path = None
|
||||
try:
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_in:
|
||||
tmp_in.write(image_data)
|
||||
input_path = tmp_in.name
|
||||
output_frames = []
|
||||
output_durations_ms = []
|
||||
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_out:
|
||||
output_path = tmp_out.name
|
||||
if is_animated:
|
||||
# === 动图模式:截取正向 + 镜像两段 ===
|
||||
frames_with_duration: list[tuple[Image.Image, float]] = []
|
||||
palette = src_img.getpalette()
|
||||
|
||||
with Image.open(input_path) as src_img:
|
||||
# === 判断是否为动图 ===
|
||||
try:
|
||||
n_frames = getattr(src_img, "n_frames", 1)
|
||||
is_animated = n_frames > 1
|
||||
except Exception:
|
||||
is_animated = False
|
||||
for idx in range(n_frames):
|
||||
src_img.seek(idx)
|
||||
frame = src_img.copy()
|
||||
# 检查是否需要透明通道
|
||||
has_alpha = (
|
||||
frame.mode in ("RGBA", "LA")
|
||||
or (frame.mode == "P" and "transparency" in frame.info)
|
||||
)
|
||||
if has_alpha:
|
||||
frame = frame.convert("RGBA")
|
||||
else:
|
||||
frame = frame.convert("RGB")
|
||||
resized_frame = resize_frame(frame)
|
||||
|
||||
output_frames = []
|
||||
output_durations_ms = []
|
||||
# 若原图有调色板,尝试保留(可选)
|
||||
if palette and resized_frame.mode == "P":
|
||||
try:
|
||||
resized_frame.putpalette(palette)
|
||||
except Exception: # noqa
|
||||
logger.debug("色板应用失败")
|
||||
pass
|
||||
|
||||
if is_animated:
|
||||
# === 动图模式:截取正向 + 镜像两段 ===
|
||||
frames_with_duration = []
|
||||
palette = src_img.getpalette()
|
||||
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
|
||||
dur_sec = max(0.01, ms / 1000.0)
|
||||
frames_with_duration.append((resized_frame, dur_sec))
|
||||
|
||||
for idx in range(n_frames):
|
||||
src_img.seek(idx)
|
||||
frame = src_img.copy()
|
||||
# 检查是否需要透明通道
|
||||
has_alpha = (
|
||||
frame.mode in ("RGBA", "LA")
|
||||
or (frame.mode == "P" and "transparency" in frame.info)
|
||||
)
|
||||
if has_alpha:
|
||||
frame = frame.convert("RGBA")
|
||||
else:
|
||||
frame = frame.convert("RGB")
|
||||
resized_frame = resize_frame(frame)
|
||||
max_dur = BASE_SEGMENT_DURATION * speed
|
||||
accumulated = 0.0
|
||||
frame_count = 0
|
||||
|
||||
# 若原图有调色板,尝试保留(可选)
|
||||
if palette and resized_frame.mode == "P":
|
||||
try:
|
||||
resized_frame.putpalette(palette)
|
||||
except Exception: # noqa
|
||||
pass
|
||||
# 正向段
|
||||
for img, dur in frames_with_duration:
|
||||
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
|
||||
break
|
||||
output_frames.append(img)
|
||||
output_durations_ms.append(int(dur * 1000))
|
||||
accumulated += dur
|
||||
frame_count += 1
|
||||
|
||||
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
|
||||
dur_sec = max(0.01, ms / 1000.0)
|
||||
frames_with_duration.append((resized_frame, dur_sec))
|
||||
|
||||
max_dur = BASE_SEGMENT_DURATION * speed
|
||||
accumulated = 0.0
|
||||
frame_count = 0
|
||||
|
||||
# 正向段
|
||||
for img, dur in frames_with_duration:
|
||||
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
|
||||
break
|
||||
output_frames.append(img)
|
||||
output_durations_ms.append(int(dur * 1000))
|
||||
accumulated += dur
|
||||
frame_count += 1
|
||||
|
||||
if frame_count == 0:
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text("动图帧太短,无法生成有效片段。").export()
|
||||
)
|
||||
return
|
||||
|
||||
# 镜像段(从头开始)
|
||||
accumulated = 0.0
|
||||
frame_count = 0
|
||||
for img, dur in frames_with_duration:
|
||||
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
|
||||
break
|
||||
flipped = img.transpose(Image.FLIP_LEFT_RIGHT)
|
||||
output_frames.append(flipped)
|
||||
output_durations_ms.append(int(dur * 1000))
|
||||
accumulated += dur
|
||||
frame_count += 1
|
||||
|
||||
else:
|
||||
# === 静态图模式:制作翻转动画 ===
|
||||
raw_frame = src_img.convert("RGBA")
|
||||
resized_frame = resize_frame(raw_frame)
|
||||
|
||||
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
|
||||
duration_ms = int(interval_sec * 1000)
|
||||
|
||||
frame1 = resized_frame
|
||||
frame2 = resized_frame.transpose(Image.FLIP_LEFT_RIGHT)
|
||||
|
||||
output_frames = [frame1, frame2]
|
||||
output_durations_ms = [duration_ms, duration_ms]
|
||||
|
||||
if len(output_frames) < 1:
|
||||
if frame_count == 0:
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text("未能生成任何帧。").export()
|
||||
await UniMessage.text("动图帧太短,无法生成有效片段。").export()
|
||||
)
|
||||
return
|
||||
|
||||
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
|
||||
need_transparency = False
|
||||
for frame in output_frames:
|
||||
if frame.mode == "RGBA":
|
||||
alpha_channel = frame.getchannel("A")
|
||||
if any(pix < 255 for pix in alpha_channel.getdata()):
|
||||
need_transparency = True
|
||||
break
|
||||
elif frame.mode == "P" and "transparency" in frame.info:
|
||||
# 镜像段(从头开始)
|
||||
accumulated = 0.0
|
||||
frame_count = 0
|
||||
for img, dur in frames_with_duration:
|
||||
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
|
||||
break
|
||||
flipped = img.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
|
||||
output_frames.append(flipped)
|
||||
output_durations_ms.append(int(dur * 1000))
|
||||
accumulated += dur
|
||||
frame_count += 1
|
||||
|
||||
else:
|
||||
# === 静态图模式:制作翻转动画 ===
|
||||
raw_frame = src_img.convert("RGBA")
|
||||
resized_frame = resize_frame(raw_frame)
|
||||
|
||||
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
|
||||
duration_ms = int(interval_sec * 1000)
|
||||
|
||||
frame1 = resized_frame
|
||||
frame2 = resized_frame.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
|
||||
|
||||
output_frames = [frame1, frame2]
|
||||
output_durations_ms = [duration_ms, duration_ms]
|
||||
|
||||
if len(output_frames) < 1:
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text("未能生成任何帧。").export()
|
||||
)
|
||||
return
|
||||
|
||||
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
|
||||
need_transparency = False
|
||||
for frame in output_frames:
|
||||
if frame.mode == "RGBA":
|
||||
alpha_channel = frame.getchannel("A")
|
||||
if any(pix < 255 for pix in alpha_channel.getdata()):
|
||||
need_transparency = True
|
||||
break
|
||||
elif frame.mode == "P" and "transparency" in frame.info:
|
||||
need_transparency = True
|
||||
break
|
||||
|
||||
# 如果不需要透明,则统一转为 RGB 避免调色板污染
|
||||
if not need_transparency:
|
||||
output_frames = [f.convert("RGB") for f in output_frames]
|
||||
# 如果不需要透明,则统一转为 RGB 避免调色板污染
|
||||
if not need_transparency:
|
||||
output_frames = [f.convert("RGB") for f in output_frames]
|
||||
|
||||
# 构建保存参数
|
||||
save_kwargs = {
|
||||
"save_all": True,
|
||||
"append_images": output_frames[1:],
|
||||
"format": "GIF",
|
||||
"loop": 0, # 无限循环
|
||||
"duration": output_durations_ms,
|
||||
"disposal": 2, # 清除到背景色,避免残留
|
||||
"optimize": False, # 关闭抖动(等效 -dither none)
|
||||
}
|
||||
# 构建保存参数
|
||||
save_kwargs = {
|
||||
"save_all": True,
|
||||
"append_images": output_frames[1:],
|
||||
"format": "GIF",
|
||||
"loop": 0, # 无限循环
|
||||
"duration": output_durations_ms,
|
||||
"disposal": 2, # 清除到背景色,避免残留
|
||||
"optimize": False, # 关闭抖动(等效 -dither none)
|
||||
}
|
||||
|
||||
# 只有真正需要透明时才启用 transparency
|
||||
if need_transparency:
|
||||
save_kwargs["transparency"] = 0
|
||||
# 只有真正需要透明时才启用 transparency
|
||||
if need_transparency:
|
||||
save_kwargs["transparency"] = 0
|
||||
|
||||
output_frames[0].save(output_path, **save_kwargs)
|
||||
|
||||
# 发送结果
|
||||
with open(output_path, "rb") as f:
|
||||
result_image = UniMessage.image(raw=f.read())
|
||||
bio = BytesIO()
|
||||
output_frames[0].save(bio, **save_kwargs)
|
||||
result_image = UniMessage.image(raw=bio)
|
||||
await ytpgif_cmd.send(await result_image.export())
|
||||
|
||||
except Exception as e:
|
||||
print(f"[YTPGIF] 处理失败: {e}")
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export()
|
||||
)
|
||||
finally:
|
||||
for path in filter(None, [input_path, output_path]):
|
||||
if os.path.exists(path):
|
||||
try:
|
||||
os.unlink(path)
|
||||
except: # noqa
|
||||
pass
|
||||
)
|
||||
19
poetry.lock
generated
19
poetry.lock
generated
@ -2062,6 +2062,23 @@ files = [
|
||||
{file = "propcache-0.3.2.tar.gz", hash = "sha256:20d7d62e4e7ef05f221e0db2856b979540686342e7dd9973b815599c7057e168"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ptimeparse"
|
||||
version = "0.1.2"
|
||||
description = "一个用于解析中文的时间表达的库"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "ptimeparse-0.1.2-py3-none-any.whl", hash = "sha256:0eea791396e53b63330fadb40d9f0a2e6272bd5467246f10d1d6971bc606edff"},
|
||||
{file = "ptimeparse-0.1.2.tar.gz", hash = "sha256:658be90a3cc2994c09c4ea2f276d257e7eb84bc330be79950baefe32b19779a2"},
|
||||
]
|
||||
|
||||
[package.source]
|
||||
type = "legacy"
|
||||
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple"
|
||||
reference = "pt-gitea-pypi"
|
||||
|
||||
[[package]]
|
||||
name = "pycares"
|
||||
version = "4.11.0"
|
||||
@ -3181,4 +3198,4 @@ type = ["pytest-mypy"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.12,<4.0"
|
||||
content-hash = "927913b9030d1f6c126bb2d12eab7307dc6297f259c7c62e3033706457d27ce0"
|
||||
content-hash = "b4c3d28f7572c57e867d126ce0c64787ae608b114e66b8de06147caf13e049dd"
|
||||
|
||||
@ -22,9 +22,17 @@ dependencies = [
|
||||
"imagetext-py (>=2.2.0,<3.0.0)",
|
||||
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
|
||||
"returns (>=0.26.0,<0.27.0)",
|
||||
"ptimeparse (>=0.1.1,<0.2.0)",
|
||||
]
|
||||
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core>=2.0.0,<3.0.0"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[[tool.poetry.source]]
|
||||
name = "pt-gitea-pypi"
|
||||
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple/"
|
||||
priority = "supplemental"
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
ptimeparse = {source = "pt-gitea-pypi"}
|
||||
|
||||
@ -1,84 +0,0 @@
|
||||
aio-mc-rcon==3.4.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
aiodns==3.5.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
aiohappyeyeballs==2.6.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
aiohttp==3.12.15 ; python_version >= "3.12" and python_version < "4.0"
|
||||
aiosignal==1.4.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
annotated-types==0.7.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
anyio==4.11.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
apscheduler==3.11.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
arclet-alconna-tools==0.7.11 ; python_version >= "3.12" and python_version < "4.0"
|
||||
arclet-alconna==1.8.40 ; python_version >= "3.12" and python_version < "4.0"
|
||||
attrs==25.3.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
beautifulsoup4==4.13.5 ; python_version >= "3.12" and python_version < "4.0"
|
||||
brotli==1.1.0 ; python_version >= "3.12" and python_version < "4.0" and platform_python_implementation == "CPython"
|
||||
brotlicffi==1.1.0.0 ; python_version >= "3.12" and python_version < "4.0" and platform_python_implementation != "CPython"
|
||||
certifi==2025.8.3 ; python_version >= "3.12" and python_version < "4.0"
|
||||
cffi==2.0.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
charset-normalizer==3.4.3 ; python_version >= "3.12" and python_version < "4.0"
|
||||
click==8.3.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
colorama==0.4.6 ; python_version >= "3.12" and python_version < "4.0" and (sys_platform == "win32" or platform_system == "Windows")
|
||||
exceptiongroup==1.3.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
fastapi==0.117.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
frozenlist==1.7.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
h11==0.16.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
h2==4.3.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
hpack==4.1.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
httpcore==1.0.9 ; python_version >= "3.12" and python_version < "4.0"
|
||||
httptools==0.6.4 ; python_version >= "3.12" and python_version < "4.0"
|
||||
httpx==0.28.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
hyperframe==6.1.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
idna==3.10 ; python_version >= "3.12" and python_version < "4.0"
|
||||
imagetext-py==2.2.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
importlib-metadata==8.7.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
linkify-it-py==2.0.3 ; python_version >= "3.12" and python_version < "4.0"
|
||||
loguru==0.7.3 ; python_version >= "3.12" and python_version < "4.0"
|
||||
lxml==6.0.2 ; python_version >= "3.12" and python_version < "4.0"
|
||||
markdown-it-py==4.0.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
mdit-py-plugins==0.5.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
mdurl==0.1.2 ; python_version >= "3.12" and python_version < "4.0"
|
||||
msgpack==1.1.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
multidict==6.6.4 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nepattern==0.7.7 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot-adapter-console==0.9.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot-adapter-discord==0.1.8 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot-adapter-minecraft==1.5.2 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot-adapter-onebot==2.4.6 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot-plugin-alconna==0.59.4 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot-plugin-apscheduler==0.5.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot-plugin-waiter==0.8.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot2==2.4.3 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonechat==0.6.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
numpy==2.2.6 ; python_version >= "3.12" and python_version < "4.0"
|
||||
opencv-python-headless==4.12.0.88 ; python_version >= "3.12" and python_version < "4.0"
|
||||
pillow==11.3.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
platformdirs==4.4.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
propcache==0.3.2 ; python_version >= "3.12" and python_version < "4.0"
|
||||
pycares==4.11.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
pycparser==2.23 ; python_version >= "3.12" and python_version < "4.0" and implementation_name != "PyPy"
|
||||
pydantic-core==2.33.2 ; python_version >= "3.12" and python_version < "4.0"
|
||||
pydantic==2.11.9 ; python_version >= "3.12" and python_version < "4.0"
|
||||
pygments==2.19.2 ; python_version >= "3.12" and python_version < "4.0"
|
||||
pygtrie==2.5.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
python-dotenv==1.1.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
pyyaml==6.0.3 ; python_version >= "3.12" and python_version < "4.0"
|
||||
requests==2.32.5 ; python_version >= "3.12" and python_version < "4.0"
|
||||
returns==0.26.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
rich==14.1.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
sniffio==1.3.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
soupsieve==2.8 ; python_version >= "3.12" and python_version < "4.0"
|
||||
starlette==0.48.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
tarina==0.6.8 ; python_version >= "3.12" and python_version < "4.0"
|
||||
textual==3.7.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
typing-extensions==4.15.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
typing-inspection==0.4.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
tzdata==2025.2 ; python_version >= "3.12" and python_version < "4.0" and platform_system == "Windows"
|
||||
tzlocal==5.3.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
uc-micro-py==1.0.3 ; python_version >= "3.12" and python_version < "4.0"
|
||||
urllib3==2.5.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
uvicorn==0.37.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
uvloop==0.21.0 ; python_version >= "3.12" and python_version < "4.0" and sys_platform != "win32" and sys_platform != "cygwin" and platform_python_implementation != "PyPy"
|
||||
watchfiles==1.1.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
websockets==15.0.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
win32-setctime==1.2.0 ; python_version >= "3.12" and python_version < "4.0" and sys_platform == "win32"
|
||||
yarl==1.20.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
zipp==3.23.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
@ -8,9 +8,12 @@ nonebot.load_plugins("konabot/plugins")
|
||||
|
||||
plugins = nonebot.get_loaded_plugins()
|
||||
len_requires = len(
|
||||
[f for f in (
|
||||
Path(__file__).parent.parent / "konabot" / "plugins"
|
||||
).iterdir() if f.is_dir() and (f / "__init__.py").exists()]
|
||||
[
|
||||
f
|
||||
for f in (Path(__file__).parent.parent / "konabot" / "plugins").iterdir()
|
||||
if (f.is_dir() and (f / "__init__.py").exists())
|
||||
or ((not f.is_dir()) and f.suffix == ".py")
|
||||
]
|
||||
)
|
||||
|
||||
plugins = [p for p in plugins if p.module.__name__.startswith("konabot.plugins")]
|
||||
|
||||
Reference in New Issue
Block a user