Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4eac493de4 | |||
| b4e400b626 | |||
| c35ee57976 |
24
.vscode/launch.json
vendored
24
.vscode/launch.json
vendored
@ -1,24 +0,0 @@
|
||||
{
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "运行 Bot 并调试(自动重载)",
|
||||
"type": "debugpy",
|
||||
"request": "launch",
|
||||
"module": "watchfiles",
|
||||
"args": [
|
||||
"bot.main"
|
||||
],
|
||||
"console": "integratedTerminal",
|
||||
"justMyCode": true,
|
||||
"env": {
|
||||
"PYTHONPATH": "${workspaceFolder}"
|
||||
},
|
||||
"cwd": "${workspaceFolder}",
|
||||
"presentation": {
|
||||
"hidden": false,
|
||||
"group": "bot"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
30
.vscode/tasks.json
vendored
30
.vscode/tasks.json
vendored
@ -1,30 +0,0 @@
|
||||
{
|
||||
"version": "2.0.0",
|
||||
"tasks": [
|
||||
{
|
||||
"label": "Poetry: Export requirements.txt (Production)",
|
||||
"type": "shell",
|
||||
"command": "poetry export -f requirements.txt --output requirements.txt --without-hashes",
|
||||
"group": "build",
|
||||
"presentation": {
|
||||
"reveal": "always",
|
||||
"panel": "new"
|
||||
},
|
||||
"problemMatcher": [],
|
||||
"detail": "导出生产环境依赖到 requirements.txt"
|
||||
},
|
||||
{
|
||||
"label": "Bot: Run with Auto-reload",
|
||||
"type": "shell",
|
||||
"command": "poetry run watchfiles bot.main",
|
||||
"group": "build",
|
||||
"isBackground": true,
|
||||
"presentation": {
|
||||
"reveal": "always",
|
||||
"panel": "new"
|
||||
},
|
||||
"problemMatcher": [],
|
||||
"detail": "运行 bot 并启用自动重载功能"
|
||||
}
|
||||
]
|
||||
}
|
||||
30
Dockerfile
30
Dockerfile
@ -1,8 +1,32 @@
|
||||
FROM python:3.13-slim
|
||||
# copied from https://www.martinrichards.me/post/python_poetry_docker/
|
||||
FROM python:3.13-slim AS base
|
||||
|
||||
ENV VIRTUAL_ENV=/app/.venv \
|
||||
PATH="/app/.venv/bin:$PATH"
|
||||
|
||||
|
||||
|
||||
FROM base AS builder
|
||||
|
||||
ENV POETRY_NO_INTERACTION=1 \
|
||||
POETRY_VIRTUALENVS_IN_PROJECT=1 \
|
||||
POETRY_VIRTUALENVS_CREATE=1 \
|
||||
POETRY_CACHE_DIR=/tmp/poetry_cache
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
RUN pip install poetry
|
||||
|
||||
COPY pyproject.toml poetry.lock ./
|
||||
RUN python -m poetry install --no-root && rm -rf $POETRY_CACHE_DIR
|
||||
|
||||
|
||||
|
||||
FROM base AS runtime
|
||||
|
||||
COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
|
||||
|
||||
WORKDIR /app
|
||||
COPY requirements.txt ./
|
||||
RUN pip install -r requirements.txt --no-deps
|
||||
|
||||
COPY bot.py pyproject.toml .env.prod .env.test ./
|
||||
COPY assets ./assets
|
||||
|
||||
@ -65,10 +65,10 @@ code .
|
||||
|
||||
### 运行
|
||||
|
||||
你可以在 VSCode 的「运行与调试」窗口,启动 `运行 Bot 并调试(自动重载)` 任务来启动 Bot,也可以使用命令行手动启动 Bot:
|
||||
使用命令行手动启动 Bot:
|
||||
|
||||
```bash
|
||||
poetry run watchfiles bot.main
|
||||
poetry run watchfiles bot.main konabot
|
||||
```
|
||||
|
||||
如果你不希望自动重载,只是想运行 Bot,可以直接运行:
|
||||
|
||||
@ -4,6 +4,7 @@ from pathlib import Path
|
||||
from typing import Any, Literal, cast
|
||||
|
||||
import nonebot
|
||||
import ptimeparse
|
||||
from loguru import logger
|
||||
from nonebot import on_message
|
||||
from nonebot.adapters import Event
|
||||
@ -19,8 +20,6 @@ from nonebot.adapters.onebot.v11.event import \
|
||||
from nonebot_plugin_alconna import UniMessage, UniMsg
|
||||
from pydantic import BaseModel
|
||||
|
||||
from konabot.plugins.simple_notify.parse_time import get_target_time
|
||||
|
||||
evt = on_message()
|
||||
|
||||
(Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True)
|
||||
@ -141,10 +140,15 @@ async def _(msg: UniMsg, mEvt: Event):
|
||||
return
|
||||
|
||||
notify_time, notify_text = segments
|
||||
target_time = get_target_time(notify_time)
|
||||
if target_time is None:
|
||||
# target_time = get_target_time(notify_time)
|
||||
try:
|
||||
target_time = ptimeparse.parse(notify_time)
|
||||
except Exception:
|
||||
logger.info(f"无法从 {notify_time} 中解析出时间")
|
||||
return
|
||||
# if target_time is None:
|
||||
# logger.info(f"无法从 {notify_time} 中解析出时间")
|
||||
# return
|
||||
if not notify_text:
|
||||
return
|
||||
|
||||
|
||||
@ -1,358 +0,0 @@
|
||||
import datetime
|
||||
import re
|
||||
from typing import Optional, Dict, List, Callable, Tuple
|
||||
|
||||
from loguru import logger
|
||||
|
||||
# --- 常量与正则表达式定义 (Constants and Regex Definitions) ---
|
||||
|
||||
# 数字模式,兼容中文和阿拉伯数字
|
||||
P_NUM = r"(\d+|[零一两二三四五六七八九十]+)"
|
||||
|
||||
# 预编译的正则表达式
|
||||
PATTERNS = {
|
||||
# 相对时间, e.g., "5分钟后"
|
||||
"DELTA": re.compile(
|
||||
r"^"
|
||||
r"((?P<days>" + P_NUM + r") ?天)?"
|
||||
r"((?P<hours>" + P_NUM + r") ?个?小?时)?"
|
||||
r"((?P<minutes>" + P_NUM + r") ?分钟?)?"
|
||||
r"((?P<seconds>" + P_NUM + r") ?秒钟?)?"
|
||||
r" ?后 ?$"
|
||||
),
|
||||
# 绝对时间
|
||||
"YEAR": re.compile(r"(" + P_NUM + r") ?年"),
|
||||
"MONTH": re.compile(r"(" + P_NUM + r") ?月"),
|
||||
"DAY": re.compile(r"(" + P_NUM + r") ?[日号]"),
|
||||
"HOUR": re.compile(r"(" + P_NUM + r") ?[点时](半)?钟?"),
|
||||
"MINUTE": re.compile(r"(" + P_NUM + r") ?分(钟)?"),
|
||||
"SECOND": re.compile(r"(" + P_NUM + r") ?秒(钟)?"),
|
||||
"HMS_COLON": re.compile(r"(\d{1,2})[::](\d{1,2})([::](\d{1,2}))?"),
|
||||
"PM": re.compile(r"(下午|PM|晚上)"),
|
||||
# 相对日期
|
||||
"TOMORROW": re.compile(r"明天"),
|
||||
"DAY_AFTER_TOMORROW": re.compile(r"后天"),
|
||||
"TODAY": re.compile(r"今天"),
|
||||
}
|
||||
|
||||
# 中文数字到阿拉伯数字的映射
|
||||
CHINESE_TO_ARABIC_MAP: Dict[str, int] = {
|
||||
'零': 0, '一': 1, '二': 2, '三': 3, '四': 4,
|
||||
'五': 5, '六': 6, '七': 7, '八': 8, '九': 9, '十': 10
|
||||
}
|
||||
|
||||
# --- 核心工具函数 (Core Utility Functions) ---
|
||||
|
||||
def parse_number(s: str) -> int:
|
||||
"""
|
||||
将包含中文或阿拉伯数字的字符串解析为整数。
|
||||
例如: "五" -> 5, "十五" -> 15, "二十三" -> 23, "12" -> 12。
|
||||
返回 -1 表示解析失败。
|
||||
"""
|
||||
if not s:
|
||||
return -1
|
||||
|
||||
s = s.strip().replace("两", "二")
|
||||
|
||||
if s.isdigit():
|
||||
return int(s)
|
||||
|
||||
if s in CHINESE_TO_ARABIC_MAP:
|
||||
return CHINESE_TO_ARABIC_MAP[s]
|
||||
|
||||
# 处理 "十" 在不同位置的情况
|
||||
if s.startswith('十'):
|
||||
if len(s) == 1:
|
||||
return 10
|
||||
num = CHINESE_TO_ARABIC_MAP.get(s[1])
|
||||
return 10 + num if num is not None else -1
|
||||
|
||||
if s.endswith('十'):
|
||||
if len(s) == 2:
|
||||
num = CHINESE_TO_ARABIC_MAP.get(s[0])
|
||||
return 10 * num if num is not None else -1
|
||||
|
||||
if '十' in s:
|
||||
parts = s.split('十')
|
||||
if len(parts) == 2:
|
||||
left = CHINESE_TO_ARABIC_MAP.get(parts[0])
|
||||
right = CHINESE_TO_ARABIC_MAP.get(parts[1])
|
||||
if left is not None and right is not None:
|
||||
return left * 10 + right
|
||||
|
||||
return -1
|
||||
|
||||
|
||||
# --- 时间解析器类 (Time Parser Class) ---
|
||||
|
||||
class TimeParser:
|
||||
"""
|
||||
一个用于解析自然语言时间描述的类。
|
||||
"""
|
||||
def __init__(self, content: str):
|
||||
self.original_content: str = content
|
||||
self.content_to_parse: str = self._preprocess(content)
|
||||
self.now: datetime.datetime = datetime.datetime.now()
|
||||
# 将 t 作为结果构建器,初始化为今天的午夜
|
||||
self.t: datetime.datetime = self.now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
self.is_pm_specified: bool = False
|
||||
self.is_date_specified: bool = False
|
||||
self.is_time_specified: bool = False
|
||||
|
||||
def _preprocess(self, content: str) -> str:
|
||||
"""预处理字符串,移除不相关字符。"""
|
||||
content = re.sub(r"\s+", "", content)
|
||||
content = re.sub(r"[,,\.。::、]", "", content)
|
||||
return content
|
||||
|
||||
def _consume_match(self, match: re.Match) -> str:
|
||||
"""从待解析字符串中移除已匹配的部分。"""
|
||||
self.content_to_parse = self.content_to_parse.replace(match.group(0), "", 1)
|
||||
return match.group(0)
|
||||
|
||||
def parse(self) -> Optional[datetime.datetime]:
|
||||
"""
|
||||
主解析方法。
|
||||
首先尝试解析相对时间(如“5分钟后”),失败则尝试解析绝对时间。
|
||||
"""
|
||||
logger.debug(f"🎉 开始解析: '{self.original_content}' -> 清洗后: '{self.content_to_parse}'")
|
||||
if not self.content_to_parse:
|
||||
logger.debug("❌ 内容为空,无法解析。")
|
||||
return None
|
||||
|
||||
# 1. 尝试相对时间解析
|
||||
if (target_time := self._parse_relative_time()) is not None:
|
||||
return target_time
|
||||
|
||||
# 2. 尝试绝对时间解析
|
||||
if (target_time := self._parse_absolute_time()) is not None:
|
||||
return target_time
|
||||
|
||||
logger.debug(f"❌ 所有解析模式均未匹配成功。")
|
||||
return None
|
||||
|
||||
def _parse_relative_time(self) -> Optional[datetime.datetime]:
|
||||
"""解析 'X天X小时X分钟后' 这种格式。"""
|
||||
if match := PATTERNS["DELTA"].match(self.content_to_parse):
|
||||
logger.debug("⏳ 匹配到相对时间模式 (DELTA)。")
|
||||
try:
|
||||
delta_parts = {
|
||||
"days": parse_number(match.group("days") or "0"),
|
||||
"hours": parse_number(match.group("hours") or "0"),
|
||||
"minutes": parse_number(match.group("minutes") or "0"),
|
||||
"seconds": parse_number(match.group("seconds") or "0"),
|
||||
}
|
||||
|
||||
# 检查是否有无效的数字解析
|
||||
if any(v < 0 for v in delta_parts.values()):
|
||||
logger.debug(f"❌ 解析时间片段为数字时失败: {delta_parts}")
|
||||
return None
|
||||
|
||||
delta = datetime.timedelta(**delta_parts)
|
||||
if delta.total_seconds() == 0:
|
||||
logger.debug("❌ 解析出的时间增量为0。")
|
||||
return None
|
||||
|
||||
target_time = self.now + delta
|
||||
logger.debug(f"✅ 相对时间解析成功 -> {target_time}")
|
||||
return target_time
|
||||
except (ValueError, TypeError) as e:
|
||||
logger.debug(f"❌ 解析相对时间时出错: {e}", exc_info=True)
|
||||
return None
|
||||
return None
|
||||
|
||||
def _parse_absolute_time(self) -> Optional[datetime.datetime]:
|
||||
"""解析一个指定的日期和时间。"""
|
||||
logger.debug(f"🎯 启动绝对时间解析,基准时间: {self.t}")
|
||||
|
||||
# 定义解析步骤和顺序
|
||||
# (pattern_key, handler_method)
|
||||
parsing_steps: List[Tuple[str, Callable[[re.Match], bool]]] = [
|
||||
("TOMORROW", self._handle_tomorrow),
|
||||
("DAY_AFTER_TOMORROW", self._handle_day_after_tomorrow),
|
||||
("TODAY", self._handle_today),
|
||||
("YEAR", self._handle_year),
|
||||
("MONTH", self._handle_month),
|
||||
("DAY", self._handle_day),
|
||||
("HMS_COLON", self._handle_hms_colon),
|
||||
("PM", self._handle_pm),
|
||||
("HOUR", self._handle_hour),
|
||||
("MINUTE", self._handle_minute),
|
||||
("SECOND", self._handle_second),
|
||||
]
|
||||
|
||||
for key, handler in parsing_steps:
|
||||
if match := PATTERNS[key].search(self.content_to_parse):
|
||||
if not handler(match):
|
||||
# 如果任何一个处理器返回False,说明解析失败
|
||||
return None
|
||||
|
||||
# 移除无意义的上午关键词
|
||||
self.content_to_parse = self.content_to_parse.replace("上午", "").replace("AM", "").replace("凌晨", "")
|
||||
|
||||
# 如果解析后还有剩余字符,说明有无法识别的部分
|
||||
if self.content_to_parse.strip():
|
||||
logger.debug(f"❌ 匹配失败,存在未解析的残留内容: '{self.content_to_parse.strip()}'")
|
||||
return None
|
||||
|
||||
# 最终调整和检查
|
||||
return self._finalize_datetime()
|
||||
|
||||
# --- Handler Methods for Absolute Time Parsing ---
|
||||
|
||||
def _handle_tomorrow(self, match: re.Match) -> bool:
|
||||
self.t += datetime.timedelta(days=1)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"📅 匹配到 '明天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_day_after_tomorrow(self, match: re.Match) -> bool:
|
||||
self.t += datetime.timedelta(days=2)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"📅 匹配到 '后天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_today(self, match: re.Match) -> bool:
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"📅 匹配到 '今天', 日期基准不变, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_year(self, match: re.Match) -> bool:
|
||||
year = parse_number(match.group(1))
|
||||
if year < 0: return False
|
||||
if year < 100: year += 2000 # 处理 "25年" -> 2025
|
||||
if year < self.now.year:
|
||||
logger.debug(f"❌ 指定的年份 {year} 已过去。")
|
||||
return False
|
||||
self.t = self.t.replace(year=year)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"Y| 年份更新 -> {self.t.year}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_month(self, match: re.Match) -> bool:
|
||||
month = parse_number(match.group(1))
|
||||
if not (1 <= month <= 12):
|
||||
logger.debug(f"❌ 无效的月份: {month}")
|
||||
return False
|
||||
|
||||
# 如果设置的月份在当前月份之前,且没有指定年份,则年份加一
|
||||
if month < self.t.month and not self.is_date_specified:
|
||||
self.t = self.t.replace(year=self.t.year + 1)
|
||||
logger.debug(f"💡 月份小于当前月份,年份自动进位 -> {self.t.year}")
|
||||
|
||||
self.t = self.t.replace(month=month)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"M| 月份更新 -> {self.t.month}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_day(self, match: re.Match) -> bool:
|
||||
day = parse_number(match.group(1))
|
||||
if not (1 <= day <= 31):
|
||||
logger.debug(f"❌ 无效的日期: {day}")
|
||||
return False
|
||||
|
||||
try:
|
||||
# 如果日期小于当前日期,且只指定了日,则月份加一
|
||||
if day < self.t.day and not self.is_date_specified:
|
||||
if self.t.month == 12:
|
||||
self.t = self.t.replace(year=self.t.year + 1, month=1)
|
||||
else:
|
||||
self.t = self.t.replace(month=self.t.month + 1)
|
||||
logger.debug(f"💡 日期小于当前日期,月份自动进位 -> {self.t.year}-{self.t.month}")
|
||||
|
||||
self.t = self.t.replace(day=day)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"D| 日期更新 -> {self.t.day}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
except ValueError:
|
||||
logger.debug(f"❌ 日期 {day} 对于月份 {self.t.month} 无效 (例如2月30号)。")
|
||||
return False
|
||||
|
||||
def _handle_hms_colon(self, match: re.Match) -> bool:
|
||||
h = int(match.group(1))
|
||||
m = int(match.group(2))
|
||||
s_str = match.group(4) # group(3) is with colon, group(4) is the number
|
||||
s = int(s_str) if s_str else 0
|
||||
if not (0 <= h <= 23 and 0 <= m <= 59 and 0 <= s <= 59):
|
||||
logger.debug(f"❌ 无效的时间格式: H={h}, M={m}, S={s}")
|
||||
return False
|
||||
self.t = self.t.replace(hour=h, minute=m, second=s)
|
||||
self.is_time_specified = True
|
||||
logger.debug(f"T| 时分秒(冒号格式)更新 -> {self.t.time()}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_pm(self, match: re.Match) -> bool:
|
||||
self.is_pm_specified = True
|
||||
logger.debug(f"PM| 匹配到下午/晚上, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_hour(self, match: re.Match) -> bool:
|
||||
hour = parse_number(match.group(1))
|
||||
has_half = match.group(2) == '半'
|
||||
if not (0 <= hour <= 23):
|
||||
logger.debug(f"❌ 无效的小时: {hour}")
|
||||
return False
|
||||
minute = 30 if has_half else self.t.minute
|
||||
self.t = self.t.replace(hour=hour, minute=minute)
|
||||
self.is_time_specified = True
|
||||
logger.debug(f"H| 小时更新 -> {self.t.hour}{':30' if has_half else ''}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_minute(self, match: re.Match) -> bool:
|
||||
minute = parse_number(match.group(1))
|
||||
if not (0 <= minute <= 59):
|
||||
logger.debug(f"❌ 无效的分钟: {minute}")
|
||||
return False
|
||||
self.t = self.t.replace(minute=minute)
|
||||
self.is_time_specified = True
|
||||
logger.debug(f"M| 分钟更新 -> {self.t.minute}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_second(self, match: re.Match) -> bool:
|
||||
second = parse_number(match.group(1))
|
||||
if not (0 <= second <= 59):
|
||||
logger.debug(f"❌ 无效的秒: {second}")
|
||||
return False
|
||||
self.t = self.t.replace(second=second)
|
||||
self.is_time_specified = True
|
||||
logger.debug(f"S| 秒更新 -> {self.t.second}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _finalize_datetime(self) -> Optional[datetime.datetime]:
|
||||
"""对解析出的时间进行最后的调整和检查。"""
|
||||
# 处理下午/晚上
|
||||
if self.is_pm_specified and self.t.hour < 12:
|
||||
self.t = self.t.replace(hour=self.t.hour + 12)
|
||||
logger.debug(f"💡 根据 PM 标识,小时调整为 -> {self.t.hour}")
|
||||
|
||||
# 如果没有指定任何时间或日期部分,则认为解析无效
|
||||
if not self.is_date_specified and not self.is_time_specified:
|
||||
logger.debug("❌ 未能从输入中解析出任何有效的日期或时间部分。")
|
||||
return None
|
||||
|
||||
# 如果最终计算出的时间点在当前时间之前,自动往后推
|
||||
# 例如:现在是 15:00,说 "14点",应该是指明天的14点
|
||||
if self.t < self.now:
|
||||
# 只有在明确指定了时间的情况下,才自动加一天
|
||||
# 如果只指定了一个过去的日期(如“去年5月1号”),则不应该调整
|
||||
if self.is_time_specified:
|
||||
self.t += datetime.timedelta(days=1)
|
||||
logger.debug(f"🔁 目标时间已过,自动调整为明天 -> {self.t}")
|
||||
|
||||
logger.debug(f"✅ 解析成功,最终时间: {self.t}")
|
||||
return self.t
|
||||
|
||||
# --- 公共接口 (Public Interface) ---
|
||||
|
||||
def get_target_time(content: str) -> Optional[datetime.datetime]:
|
||||
"""
|
||||
高级接口,用于将自然语言时间描述转换为 datetime 对象。
|
||||
|
||||
Args:
|
||||
content: 包含时间信息的字符串。
|
||||
|
||||
Returns:
|
||||
一个 datetime 对象,如果解析失败则返回 None。
|
||||
"""
|
||||
parser = TimeParser(content)
|
||||
return parser.parse()
|
||||
@ -1,9 +1,9 @@
|
||||
import os
|
||||
import tempfile
|
||||
from io import BytesIO
|
||||
from typing import Optional
|
||||
|
||||
from PIL import Image, ImageSequence
|
||||
from PIL import Image
|
||||
from nonebot.adapters import Event as BaseEvent
|
||||
from nonebot.adapters import Bot as BaseBot
|
||||
from nonebot.plugin import PluginMetadata
|
||||
from nonebot_plugin_alconna import (
|
||||
Alconna,
|
||||
@ -12,6 +12,9 @@ from nonebot_plugin_alconna import (
|
||||
UniMessage,
|
||||
on_alconna,
|
||||
)
|
||||
from returns.result import Failure, Success
|
||||
|
||||
from konabot.common.nb.extract_image import extract_image_from_message
|
||||
|
||||
__plugin_meta__ = PluginMetadata(
|
||||
name="ytpgif",
|
||||
@ -60,7 +63,7 @@ async def get_image_url(event: BaseEvent) -> Optional[str]:
|
||||
if seg.type == "image" and seg.data.get("url"):
|
||||
return str(seg.data["url"])
|
||||
|
||||
if hasattr(event, "reply") and (reply := event.reply):
|
||||
if hasattr(event, "reply") and (reply := getattr(event, "reply")):
|
||||
reply_msg = reply.message
|
||||
for seg in reply_msg:
|
||||
if seg.type == "image" and seg.data.get("url"):
|
||||
@ -89,7 +92,7 @@ def resize_frame(frame: Image.Image) -> Image.Image:
|
||||
|
||||
|
||||
@ytpgif_cmd.handle()
|
||||
async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
|
||||
async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0):
|
||||
# === 校验 speed 范围 ===
|
||||
if not (MIN_SPEED <= speed <= MAX_SPEED):
|
||||
await ytpgif_cmd.send(
|
||||
@ -97,172 +100,148 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
|
||||
)
|
||||
return
|
||||
|
||||
img_url = await get_image_url(event)
|
||||
if not img_url:
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text(
|
||||
"请发送一张图片或回复一张图片来生成镜像动图。"
|
||||
).export()
|
||||
)
|
||||
return
|
||||
match await extract_image_from_message(event.get_message(), event, bot):
|
||||
case Success(img):
|
||||
src_img = img
|
||||
|
||||
case Failure(msg):
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text(msg).export()
|
||||
)
|
||||
return
|
||||
|
||||
case _:
|
||||
return
|
||||
|
||||
try:
|
||||
image_data = await download_image(img_url)
|
||||
except Exception as e:
|
||||
print(f"[YTPGIF] 下载失败: {e}")
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text("❌ 图片下载失败,请重试。").export()
|
||||
)
|
||||
return
|
||||
try:
|
||||
n_frames = getattr(src_img, "n_frames", 1)
|
||||
is_animated = n_frames > 1
|
||||
except Exception:
|
||||
is_animated = False
|
||||
n_frames = 1
|
||||
|
||||
input_path = output_path = None
|
||||
try:
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_in:
|
||||
tmp_in.write(image_data)
|
||||
input_path = tmp_in.name
|
||||
output_frames = []
|
||||
output_durations_ms = []
|
||||
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_out:
|
||||
output_path = tmp_out.name
|
||||
if is_animated:
|
||||
# === 动图模式:截取正向 + 镜像两段 ===
|
||||
frames_with_duration = []
|
||||
palette = src_img.getpalette()
|
||||
|
||||
with Image.open(input_path) as src_img:
|
||||
# === 判断是否为动图 ===
|
||||
try:
|
||||
n_frames = getattr(src_img, "n_frames", 1)
|
||||
is_animated = n_frames > 1
|
||||
except Exception:
|
||||
is_animated = False
|
||||
for idx in range(n_frames):
|
||||
src_img.seek(idx)
|
||||
frame = src_img.copy()
|
||||
# 检查是否需要透明通道
|
||||
has_alpha = (
|
||||
frame.mode in ("RGBA", "LA")
|
||||
or (frame.mode == "P" and "transparency" in frame.info)
|
||||
)
|
||||
if has_alpha:
|
||||
frame = frame.convert("RGBA")
|
||||
else:
|
||||
frame = frame.convert("RGB")
|
||||
resized_frame = resize_frame(frame)
|
||||
|
||||
output_frames = []
|
||||
output_durations_ms = []
|
||||
# 若原图有调色板,尝试保留(可选)
|
||||
if palette and resized_frame.mode == "P":
|
||||
try:
|
||||
resized_frame.putpalette(palette)
|
||||
except Exception: # noqa
|
||||
pass
|
||||
|
||||
if is_animated:
|
||||
# === 动图模式:截取正向 + 镜像两段 ===
|
||||
frames_with_duration = []
|
||||
palette = src_img.getpalette()
|
||||
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
|
||||
dur_sec = max(0.01, ms / 1000.0)
|
||||
frames_with_duration.append((resized_frame, dur_sec))
|
||||
|
||||
for idx in range(n_frames):
|
||||
src_img.seek(idx)
|
||||
frame = src_img.copy()
|
||||
# 检查是否需要透明通道
|
||||
has_alpha = (
|
||||
frame.mode in ("RGBA", "LA")
|
||||
or (frame.mode == "P" and "transparency" in frame.info)
|
||||
)
|
||||
if has_alpha:
|
||||
frame = frame.convert("RGBA")
|
||||
else:
|
||||
frame = frame.convert("RGB")
|
||||
resized_frame = resize_frame(frame)
|
||||
max_dur = BASE_SEGMENT_DURATION * speed
|
||||
accumulated = 0.0
|
||||
frame_count = 0
|
||||
|
||||
# 若原图有调色板,尝试保留(可选)
|
||||
if palette and resized_frame.mode == "P":
|
||||
try:
|
||||
resized_frame.putpalette(palette)
|
||||
except Exception: # noqa
|
||||
pass
|
||||
# 正向段
|
||||
for img, dur in frames_with_duration:
|
||||
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
|
||||
break
|
||||
output_frames.append(img)
|
||||
output_durations_ms.append(int(dur * 1000))
|
||||
accumulated += dur
|
||||
frame_count += 1
|
||||
|
||||
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
|
||||
dur_sec = max(0.01, ms / 1000.0)
|
||||
frames_with_duration.append((resized_frame, dur_sec))
|
||||
|
||||
max_dur = BASE_SEGMENT_DURATION * speed
|
||||
accumulated = 0.0
|
||||
frame_count = 0
|
||||
|
||||
# 正向段
|
||||
for img, dur in frames_with_duration:
|
||||
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
|
||||
break
|
||||
output_frames.append(img)
|
||||
output_durations_ms.append(int(dur * 1000))
|
||||
accumulated += dur
|
||||
frame_count += 1
|
||||
|
||||
if frame_count == 0:
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text("动图帧太短,无法生成有效片段。").export()
|
||||
)
|
||||
return
|
||||
|
||||
# 镜像段(从头开始)
|
||||
accumulated = 0.0
|
||||
frame_count = 0
|
||||
for img, dur in frames_with_duration:
|
||||
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
|
||||
break
|
||||
flipped = img.transpose(Image.FLIP_LEFT_RIGHT)
|
||||
output_frames.append(flipped)
|
||||
output_durations_ms.append(int(dur * 1000))
|
||||
accumulated += dur
|
||||
frame_count += 1
|
||||
|
||||
else:
|
||||
# === 静态图模式:制作翻转动画 ===
|
||||
raw_frame = src_img.convert("RGBA")
|
||||
resized_frame = resize_frame(raw_frame)
|
||||
|
||||
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
|
||||
duration_ms = int(interval_sec * 1000)
|
||||
|
||||
frame1 = resized_frame
|
||||
frame2 = resized_frame.transpose(Image.FLIP_LEFT_RIGHT)
|
||||
|
||||
output_frames = [frame1, frame2]
|
||||
output_durations_ms = [duration_ms, duration_ms]
|
||||
|
||||
if len(output_frames) < 1:
|
||||
if frame_count == 0:
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text("未能生成任何帧。").export()
|
||||
await UniMessage.text("动图帧太短,无法生成有效片段。").export()
|
||||
)
|
||||
return
|
||||
|
||||
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
|
||||
need_transparency = False
|
||||
for frame in output_frames:
|
||||
if frame.mode == "RGBA":
|
||||
alpha_channel = frame.getchannel("A")
|
||||
if any(pix < 255 for pix in alpha_channel.getdata()):
|
||||
need_transparency = True
|
||||
break
|
||||
elif frame.mode == "P" and "transparency" in frame.info:
|
||||
# 镜像段(从头开始)
|
||||
accumulated = 0.0
|
||||
frame_count = 0
|
||||
for img, dur in frames_with_duration:
|
||||
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
|
||||
break
|
||||
flipped = img.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
|
||||
output_frames.append(flipped)
|
||||
output_durations_ms.append(int(dur * 1000))
|
||||
accumulated += dur
|
||||
frame_count += 1
|
||||
|
||||
else:
|
||||
# === 静态图模式:制作翻转动画 ===
|
||||
raw_frame = src_img.convert("RGBA")
|
||||
resized_frame = resize_frame(raw_frame)
|
||||
|
||||
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
|
||||
duration_ms = int(interval_sec * 1000)
|
||||
|
||||
frame1 = resized_frame
|
||||
frame2 = resized_frame.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
|
||||
|
||||
output_frames = [frame1, frame2]
|
||||
output_durations_ms = [duration_ms, duration_ms]
|
||||
|
||||
if len(output_frames) < 1:
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text("未能生成任何帧。").export()
|
||||
)
|
||||
return
|
||||
|
||||
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
|
||||
need_transparency = False
|
||||
for frame in output_frames:
|
||||
if frame.mode == "RGBA":
|
||||
alpha_channel = frame.getchannel("A")
|
||||
if any(pix < 255 for pix in alpha_channel.getdata()):
|
||||
need_transparency = True
|
||||
break
|
||||
elif frame.mode == "P" and "transparency" in frame.info:
|
||||
need_transparency = True
|
||||
break
|
||||
|
||||
# 如果不需要透明,则统一转为 RGB 避免调色板污染
|
||||
if not need_transparency:
|
||||
output_frames = [f.convert("RGB") for f in output_frames]
|
||||
# 如果不需要透明,则统一转为 RGB 避免调色板污染
|
||||
if not need_transparency:
|
||||
output_frames = [f.convert("RGB") for f in output_frames]
|
||||
|
||||
# 构建保存参数
|
||||
save_kwargs = {
|
||||
"save_all": True,
|
||||
"append_images": output_frames[1:],
|
||||
"format": "GIF",
|
||||
"loop": 0, # 无限循环
|
||||
"duration": output_durations_ms,
|
||||
"disposal": 2, # 清除到背景色,避免残留
|
||||
"optimize": False, # 关闭抖动(等效 -dither none)
|
||||
}
|
||||
# 构建保存参数
|
||||
save_kwargs = {
|
||||
"save_all": True,
|
||||
"append_images": output_frames[1:],
|
||||
"format": "GIF",
|
||||
"loop": 0, # 无限循环
|
||||
"duration": output_durations_ms,
|
||||
"disposal": 2, # 清除到背景色,避免残留
|
||||
"optimize": False, # 关闭抖动(等效 -dither none)
|
||||
}
|
||||
|
||||
# 只有真正需要透明时才启用 transparency
|
||||
if need_transparency:
|
||||
save_kwargs["transparency"] = 0
|
||||
# 只有真正需要透明时才启用 transparency
|
||||
if need_transparency:
|
||||
save_kwargs["transparency"] = 0
|
||||
|
||||
output_frames[0].save(output_path, **save_kwargs)
|
||||
|
||||
# 发送结果
|
||||
with open(output_path, "rb") as f:
|
||||
result_image = UniMessage.image(raw=f.read())
|
||||
bio = BytesIO()
|
||||
output_frames[0].save(bio, **save_kwargs)
|
||||
result_image = UniMessage.image(raw=bio)
|
||||
await ytpgif_cmd.send(await result_image.export())
|
||||
|
||||
except Exception as e:
|
||||
print(f"[YTPGIF] 处理失败: {e}")
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export()
|
||||
)
|
||||
finally:
|
||||
for path in filter(None, [input_path, output_path]):
|
||||
if os.path.exists(path):
|
||||
try:
|
||||
os.unlink(path)
|
||||
except: # noqa
|
||||
pass
|
||||
)
|
||||
19
poetry.lock
generated
19
poetry.lock
generated
@ -2062,6 +2062,23 @@ files = [
|
||||
{file = "propcache-0.3.2.tar.gz", hash = "sha256:20d7d62e4e7ef05f221e0db2856b979540686342e7dd9973b815599c7057e168"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ptimeparse"
|
||||
version = "0.1.2"
|
||||
description = "一个用于解析中文的时间表达的库"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "ptimeparse-0.1.2-py3-none-any.whl", hash = "sha256:0eea791396e53b63330fadb40d9f0a2e6272bd5467246f10d1d6971bc606edff"},
|
||||
{file = "ptimeparse-0.1.2.tar.gz", hash = "sha256:658be90a3cc2994c09c4ea2f276d257e7eb84bc330be79950baefe32b19779a2"},
|
||||
]
|
||||
|
||||
[package.source]
|
||||
type = "legacy"
|
||||
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple"
|
||||
reference = "pt-gitea-pypi"
|
||||
|
||||
[[package]]
|
||||
name = "pycares"
|
||||
version = "4.11.0"
|
||||
@ -3181,4 +3198,4 @@ type = ["pytest-mypy"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.12,<4.0"
|
||||
content-hash = "927913b9030d1f6c126bb2d12eab7307dc6297f259c7c62e3033706457d27ce0"
|
||||
content-hash = "b4c3d28f7572c57e867d126ce0c64787ae608b114e66b8de06147caf13e049dd"
|
||||
|
||||
@ -22,9 +22,17 @@ dependencies = [
|
||||
"imagetext-py (>=2.2.0,<3.0.0)",
|
||||
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
|
||||
"returns (>=0.26.0,<0.27.0)",
|
||||
"ptimeparse (>=0.1.1,<0.2.0)",
|
||||
]
|
||||
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core>=2.0.0,<3.0.0"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[[tool.poetry.source]]
|
||||
name = "pt-gitea-pypi"
|
||||
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple/"
|
||||
priority = "supplemental"
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
ptimeparse = {source = "pt-gitea-pypi"}
|
||||
|
||||
@ -1,84 +0,0 @@
|
||||
aio-mc-rcon==3.4.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
aiodns==3.5.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
aiohappyeyeballs==2.6.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
aiohttp==3.12.15 ; python_version >= "3.12" and python_version < "4.0"
|
||||
aiosignal==1.4.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
annotated-types==0.7.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
anyio==4.11.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
apscheduler==3.11.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
arclet-alconna-tools==0.7.11 ; python_version >= "3.12" and python_version < "4.0"
|
||||
arclet-alconna==1.8.40 ; python_version >= "3.12" and python_version < "4.0"
|
||||
attrs==25.3.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
beautifulsoup4==4.13.5 ; python_version >= "3.12" and python_version < "4.0"
|
||||
brotli==1.1.0 ; python_version >= "3.12" and python_version < "4.0" and platform_python_implementation == "CPython"
|
||||
brotlicffi==1.1.0.0 ; python_version >= "3.12" and python_version < "4.0" and platform_python_implementation != "CPython"
|
||||
certifi==2025.8.3 ; python_version >= "3.12" and python_version < "4.0"
|
||||
cffi==2.0.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
charset-normalizer==3.4.3 ; python_version >= "3.12" and python_version < "4.0"
|
||||
click==8.3.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
colorama==0.4.6 ; python_version >= "3.12" and python_version < "4.0" and (sys_platform == "win32" or platform_system == "Windows")
|
||||
exceptiongroup==1.3.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
fastapi==0.117.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
frozenlist==1.7.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
h11==0.16.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
h2==4.3.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
hpack==4.1.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
httpcore==1.0.9 ; python_version >= "3.12" and python_version < "4.0"
|
||||
httptools==0.6.4 ; python_version >= "3.12" and python_version < "4.0"
|
||||
httpx==0.28.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
hyperframe==6.1.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
idna==3.10 ; python_version >= "3.12" and python_version < "4.0"
|
||||
imagetext-py==2.2.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
importlib-metadata==8.7.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
linkify-it-py==2.0.3 ; python_version >= "3.12" and python_version < "4.0"
|
||||
loguru==0.7.3 ; python_version >= "3.12" and python_version < "4.0"
|
||||
lxml==6.0.2 ; python_version >= "3.12" and python_version < "4.0"
|
||||
markdown-it-py==4.0.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
mdit-py-plugins==0.5.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
mdurl==0.1.2 ; python_version >= "3.12" and python_version < "4.0"
|
||||
msgpack==1.1.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
multidict==6.6.4 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nepattern==0.7.7 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot-adapter-console==0.9.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot-adapter-discord==0.1.8 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot-adapter-minecraft==1.5.2 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot-adapter-onebot==2.4.6 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot-plugin-alconna==0.59.4 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot-plugin-apscheduler==0.5.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot-plugin-waiter==0.8.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonebot2==2.4.3 ; python_version >= "3.12" and python_version < "4.0"
|
||||
nonechat==0.6.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
numpy==2.2.6 ; python_version >= "3.12" and python_version < "4.0"
|
||||
opencv-python-headless==4.12.0.88 ; python_version >= "3.12" and python_version < "4.0"
|
||||
pillow==11.3.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
platformdirs==4.4.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
propcache==0.3.2 ; python_version >= "3.12" and python_version < "4.0"
|
||||
pycares==4.11.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
pycparser==2.23 ; python_version >= "3.12" and python_version < "4.0" and implementation_name != "PyPy"
|
||||
pydantic-core==2.33.2 ; python_version >= "3.12" and python_version < "4.0"
|
||||
pydantic==2.11.9 ; python_version >= "3.12" and python_version < "4.0"
|
||||
pygments==2.19.2 ; python_version >= "3.12" and python_version < "4.0"
|
||||
pygtrie==2.5.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
python-dotenv==1.1.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
pyyaml==6.0.3 ; python_version >= "3.12" and python_version < "4.0"
|
||||
requests==2.32.5 ; python_version >= "3.12" and python_version < "4.0"
|
||||
returns==0.26.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
rich==14.1.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
sniffio==1.3.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
soupsieve==2.8 ; python_version >= "3.12" and python_version < "4.0"
|
||||
starlette==0.48.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
tarina==0.6.8 ; python_version >= "3.12" and python_version < "4.0"
|
||||
textual==3.7.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
typing-extensions==4.15.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
typing-inspection==0.4.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
tzdata==2025.2 ; python_version >= "3.12" and python_version < "4.0" and platform_system == "Windows"
|
||||
tzlocal==5.3.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
uc-micro-py==1.0.3 ; python_version >= "3.12" and python_version < "4.0"
|
||||
urllib3==2.5.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
uvicorn==0.37.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
uvloop==0.21.0 ; python_version >= "3.12" and python_version < "4.0" and sys_platform != "win32" and sys_platform != "cygwin" and platform_python_implementation != "PyPy"
|
||||
watchfiles==1.1.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
websockets==15.0.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
win32-setctime==1.2.0 ; python_version >= "3.12" and python_version < "4.0" and sys_platform == "win32"
|
||||
yarl==1.20.1 ; python_version >= "3.12" and python_version < "4.0"
|
||||
zipp==3.23.0 ; python_version >= "3.12" and python_version < "4.0"
|
||||
Reference in New Issue
Block a user