Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8edb999050 | |||
| 109a81923f | |||
| 91687fb8c3 | |||
| f889381cce | |||
| 1256055c9d | |||
| 40f35a474e | |||
| 6b01acfa8c | |||
| 09c9d44798 | |||
| 0c4206f461 | |||
| 9fb8fd90dc | |||
| 8c4fa2b5e4 | |||
| fb2c3f1ce2 | |||
| 265415e727 | |||
| 06555b2225 |
BIN
assets/fonts/LXGWWenKai-Regular.ttf
Normal file
BIN
assets/fonts/LXGWWenKai-Regular.ttf
Normal file
Binary file not shown.
BIN
assets/img/meme/caoimg1.png
Normal file
BIN
assets/img/meme/caoimg1.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 227 KiB |
BIN
assets/img/meme/dss.png
Normal file
BIN
assets/img/meme/dss.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 172 KiB |
BIN
assets/img/meme/mnksay.jpg
Normal file
BIN
assets/img/meme/mnksay.jpg
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 69 KiB |
BIN
assets/img/meme/suanleba.png
Normal file
BIN
assets/img/meme/suanleba.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 364 KiB |
BIN
assets/img/meme/tententen.png
Normal file
BIN
assets/img/meme/tententen.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 614 KiB |
135
konabot/common/nb/extract_image.py
Normal file
135
konabot/common/nb/extract_image.py
Normal file
@ -0,0 +1,135 @@
|
||||
from io import BytesIO
|
||||
|
||||
import httpx
|
||||
import PIL.Image
|
||||
from loguru import logger
|
||||
from nonebot.adapters import Bot, Event, Message
|
||||
from nonebot.adapters.discord import Bot as DiscordBot
|
||||
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
|
||||
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
|
||||
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
|
||||
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
|
||||
from PIL import UnidentifiedImageError
|
||||
from returns.result import Failure, Result, Success
|
||||
|
||||
|
||||
async def download_image_bytes(url: str) -> Result[bytes, str]:
|
||||
# if "/matcha/cache/" in url:
|
||||
# url = url.replace('127.0.0.1', '10.126.126.101')
|
||||
logger.debug(f"开始从 {url} 下载图片")
|
||||
async with httpx.AsyncClient() as c:
|
||||
try:
|
||||
response = await c.get(url)
|
||||
except (httpx.ConnectError, httpx.RemoteProtocolError) as e:
|
||||
return Failure(f"HTTPX 模块下载图片时出错:{e}")
|
||||
except httpx.ConnectTimeout:
|
||||
return Failure("下载图片失败了,网络超时了qwq")
|
||||
if response.status_code != 200:
|
||||
return Failure("无法下载图片,可能存在网络问题需要排查")
|
||||
return Success(response.content)
|
||||
|
||||
|
||||
def bytes_to_pil(raw_data: bytes | BytesIO) -> Result[PIL.Image.Image, str]:
|
||||
try:
|
||||
if not isinstance(raw_data, BytesIO):
|
||||
img_pil = PIL.Image.open(BytesIO(raw_data))
|
||||
else:
|
||||
img_pil = PIL.Image.open(raw_data)
|
||||
img_pil.verify()
|
||||
if not isinstance(raw_data, BytesIO):
|
||||
img = PIL.Image.open(BytesIO(raw_data))
|
||||
else:
|
||||
raw_data.seek(0)
|
||||
img = PIL.Image.open(raw_data)
|
||||
return Success(img)
|
||||
except UnidentifiedImageError:
|
||||
return Failure("图像无法读取,可能是格式不支持orz")
|
||||
except IOError:
|
||||
return Failure("图像无法读取,可能是网络存在问题orz")
|
||||
|
||||
|
||||
async def unimsg_img_to_pil(image: Image) -> Result[PIL.Image.Image, str]:
|
||||
if image.url is not None:
|
||||
raw_result = await download_image_bytes(image.url)
|
||||
elif image.raw is not None:
|
||||
raw_result = Success(image.raw)
|
||||
else:
|
||||
return Failure("由于一些内部问题,下载图片失败了orz")
|
||||
|
||||
return raw_result.bind(bytes_to_pil)
|
||||
|
||||
|
||||
async def extract_image_from_qq_message(
|
||||
msg: OnebotV11Message,
|
||||
evt: OnebotV11MessageEvent,
|
||||
bot: OnebotV11Bot,
|
||||
allow_reply: bool = True,
|
||||
) -> Result[PIL.Image.Image, str]:
|
||||
if allow_reply and (reply := evt.reply) is not None:
|
||||
return await extract_image_from_qq_message(
|
||||
reply.message,
|
||||
evt,
|
||||
bot,
|
||||
False,
|
||||
)
|
||||
for seg in msg:
|
||||
if seg.type == "reply" and allow_reply:
|
||||
msgid = seg.data.get("id")
|
||||
if msgid is None:
|
||||
return Failure("消息可能太久远,无法读取到消息原文")
|
||||
try:
|
||||
msg2 = await bot.get_msg(message_id=msgid)
|
||||
except Exception as e:
|
||||
logger.warning(f"获取消息内容时出错:{e}")
|
||||
return Failure("消息可能太久远,无法读取到消息原文")
|
||||
msg2_data = msg2.get("message")
|
||||
if msg2_data is None:
|
||||
return Failure("消息可能太久远,无法读取到消息原文")
|
||||
logger.debug("发现消息引用,递归一层")
|
||||
return await extract_image_from_qq_message(
|
||||
msg=OnebotV11Message(msg2_data),
|
||||
evt=evt,
|
||||
bot=bot,
|
||||
allow_reply=False,
|
||||
)
|
||||
if seg.type == "image":
|
||||
url = seg.data.get("url")
|
||||
if url is None:
|
||||
return Failure("无法下载图片,可能有一些网络问题")
|
||||
data = await download_image_bytes(url)
|
||||
return data.bind(bytes_to_pil)
|
||||
|
||||
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
|
||||
|
||||
|
||||
async def extract_image_from_message(
|
||||
msg: Message,
|
||||
evt: Event,
|
||||
bot: Bot,
|
||||
allow_reply: bool = True,
|
||||
) -> Result[PIL.Image.Image, str]:
|
||||
if (
|
||||
isinstance(bot, OnebotV11Bot)
|
||||
and isinstance(msg, OnebotV11Message)
|
||||
and isinstance(evt, OnebotV11MessageEvent)
|
||||
):
|
||||
# 看起来 UniMessage 在这方面能力似乎不足,因此用 QQ 的
|
||||
logger.debug('获取图片的路径 Fallback 到 QQ 模块')
|
||||
return await extract_image_from_qq_message(msg, evt, bot, allow_reply)
|
||||
|
||||
for seg in UniMessage.of(msg, bot):
|
||||
logger.info(seg)
|
||||
if isinstance(seg, Image):
|
||||
return await unimsg_img_to_pil(seg)
|
||||
elif isinstance(seg, Reply) and allow_reply:
|
||||
msg2 = seg.msg
|
||||
logger.debug(f"深入搜索引用的消息:{msg2}")
|
||||
if msg2 is None or isinstance(msg2, str):
|
||||
continue
|
||||
return await extract_image_from_message(msg2, evt, bot, False)
|
||||
elif isinstance(seg, RefNode) and allow_reply:
|
||||
if isinstance(bot, DiscordBot):
|
||||
return Failure("暂时不支持在 Discord 中通过引用的方式获取图片")
|
||||
else:
|
||||
return Failure("暂时不支持在这里中通过引用的方式获取图片")
|
||||
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
|
||||
34
konabot/common/nb/is_admin.py
Normal file
34
konabot/common/nb/is_admin.py
Normal file
@ -0,0 +1,34 @@
|
||||
from nonebot import get_plugin_config
|
||||
import nonebot
|
||||
import nonebot.adapters
|
||||
import nonebot.adapters.console
|
||||
import nonebot.adapters.discord
|
||||
import nonebot.adapters.onebot
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class IsAdminConfig(BaseModel):
|
||||
admin_qq_group: list[int] = []
|
||||
admin_qq_account: list[int] = []
|
||||
admin_discord_channel: list[int] = []
|
||||
admin_discord_account: list[int] = []
|
||||
|
||||
cfg = get_plugin_config(IsAdminConfig)
|
||||
|
||||
|
||||
def is_admin(event: nonebot.adapters.Event):
|
||||
if isinstance(event, nonebot.adapters.onebot.v11.MessageEvent):
|
||||
if event.user_id in cfg.admin_qq_account:
|
||||
return True
|
||||
if isinstance(event, nonebot.adapters.onebot.v11.GroupMessageEvent):
|
||||
if event.group_id in cfg.admin_qq_group:
|
||||
return True
|
||||
if isinstance(event, nonebot.adapters.discord.event.MessageEvent):
|
||||
if event.channel_id in cfg.admin_discord_channel:
|
||||
return True
|
||||
if event.user_id in cfg.admin_discord_account:
|
||||
return True
|
||||
if isinstance(event, nonebot.adapters.console.event.Event):
|
||||
return True
|
||||
|
||||
return False
|
||||
@ -2,3 +2,11 @@ from pathlib import Path
|
||||
|
||||
ASSETS_PATH = Path(__file__).resolve().parent.parent.parent / "assets"
|
||||
FONTS_PATH = ASSETS_PATH / "fonts"
|
||||
|
||||
SRC_PATH = Path(__file__).resolve().parent.parent
|
||||
|
||||
DOCS_PATH = SRC_PATH / "docs"
|
||||
DOCS_PATH_MAN1 = DOCS_PATH / "user"
|
||||
DOCS_PATH_MAN3 = DOCS_PATH / "lib"
|
||||
DOCS_PATH_MAN7 = DOCS_PATH / "concepts"
|
||||
DOCS_PATH_MAN8 = DOCS_PATH / "sys"
|
||||
|
||||
40
konabot/docs/README.md
Normal file
40
konabot/docs/README.md
Normal file
@ -0,0 +1,40 @@
|
||||
# 此方 Bot 的文档系统
|
||||
|
||||
此方 Bot 使用类 Linux 的 `man` 指令来管理文档。文档一般建议使用纯文本书写,带有相对良好的格式。
|
||||
|
||||
## 文件夹摆放规则
|
||||
|
||||
`docs` 目录下,有若干文档可以拿来阅读和输出。每个子文件夹里,文档文件使用名字不含空格的 txt 文件书写,其他后缀名的文件将会被忽略。所以,如果你希望有些文件只在代码库中可阅读,你可以使用 `.md` 格式。
|
||||
|
||||
### 1 - user
|
||||
|
||||
`docs/user` 目录下的文档是直接会给用户进行检索的文档,在直接使用 `man` 指令时,会搜索该文件夹的全部文件,以知晓所有有文档的指令。
|
||||
|
||||
### 3 - lib
|
||||
|
||||
`docs/lib` 目录下的文档主要给该项目的维护者进行阅读和使用,讲述的是本项目内置的一些函数的功能讲解(一般以便利为主要目的)以及一些项目安排上的要求。一般不会列举,除非用户指定要求列举该范围。
|
||||
|
||||
### 7 - concepts
|
||||
|
||||
`docs/concepts` 用来摆放任何的概念。任何的。一般不会列举,除非用户指定要求列举该范围。
|
||||
|
||||
### 8 - sys
|
||||
|
||||
`docs/sys` 用于摆放仅 MTTU 群可以使用的文档集合。在 MTTU 群内,该目录下的文档也会被索引,否则文档将不可阅读。
|
||||
|
||||
## 书写规范
|
||||
|
||||
无特殊要求,因为当用户进行 `man` 的时候,会将文档内的内容原封不动地展示出来。但是,你仍然可以模仿 Linux 下的 `man` 指令的格式进行书写。
|
||||
|
||||
```
|
||||
指令介绍
|
||||
man - 用于展示此方 BOT 使用手册的指令
|
||||
|
||||
格式
|
||||
man [文档类型] <指令>
|
||||
|
||||
示例
|
||||
`man` 查看所有有文档的指令清单
|
||||
`man 喵` 查看指令「喵」的使用说明
|
||||
……
|
||||
```
|
||||
0
konabot/docs/concepts/占位.md
Normal file
0
konabot/docs/concepts/占位.md
Normal file
45
konabot/docs/lib/is_admin.txt
Normal file
45
konabot/docs/lib/is_admin.txt
Normal file
@ -0,0 +1,45 @@
|
||||
指令介绍
|
||||
is_admin - 用于判断当前事件是否来自管理员的内部权限校验函数
|
||||
|
||||
格式
|
||||
from konabot.common.nb.is_admin import is_admin
|
||||
from nonebot import on
|
||||
from nonebot.adapters import Event
|
||||
from loguru import logger
|
||||
|
||||
@on().handle()
|
||||
async def _(event: Event):
|
||||
if is_admin(event):
|
||||
logger.info("管理员发送了消息")
|
||||
|
||||
说明
|
||||
is_admin 是 Bot 内部用于权限控制的核心函数,根据事件来源(QQ、Discord、控制台)及插件配置,判断触发事件的用户或群组是否具有管理员权限。
|
||||
|
||||
支持的适配器与判定逻辑:
|
||||
• OneBot V11(QQ)
|
||||
- 若用户 ID 在配置项 admin_qq_account 中,则视为管理员
|
||||
- 若为群聊消息,且群 ID 在配置项 admin_qq_group 中,则视为管理员
|
||||
|
||||
• Discord
|
||||
- 若频道 ID 在配置项 admin_discord_channel 中,则视为管理员
|
||||
- 若用户 ID 在配置项 admin_discord_account 中,则视为管理员
|
||||
|
||||
• Console(控制台)
|
||||
- 所有控制台输入均默认视为管理员操作,自动返回 True
|
||||
|
||||
配置项(位于插件配置中)
|
||||
ADMIN_QQ_GROUP: list[int]
|
||||
允许的管理员 QQ 群 ID 列表
|
||||
|
||||
ADMIN_QQ_ACCOUNT: list[int]
|
||||
允许的管理员 QQ 账号 ID 列表
|
||||
|
||||
ADMIN_DISCORD_CHANNEL: list[int]
|
||||
允许的管理员 Discord 频道 ID 列表
|
||||
|
||||
ADMIN_DISCORD_ACCOUNT: list[int]
|
||||
允许的管理员 Discord 用户 ID 列表
|
||||
|
||||
注意事项
|
||||
- 若未在配置文件中设置任何管理员 ID,该函数对所有非控制台事件返回 False
|
||||
- 控制台事件始终拥有管理员权限,便于本地调试与运维
|
||||
0
konabot/docs/lib/占位.md
Normal file
0
konabot/docs/lib/占位.md
Normal file
1
konabot/docs/sys/out.txt
Normal file
1
konabot/docs/sys/out.txt
Normal file
@ -0,0 +1 @@
|
||||
MAN what can I say!
|
||||
20
konabot/docs/user/man.txt
Normal file
20
konabot/docs/user/man.txt
Normal file
@ -0,0 +1,20 @@
|
||||
指令介绍
|
||||
man - 用于展示此方 BOT 使用手册的指令
|
||||
|
||||
格式
|
||||
man 文档类型
|
||||
man [文档类型] <指令>
|
||||
|
||||
示例
|
||||
`man` 查看所有有文档的指令清单
|
||||
`man 3` 列举所有可读文档的库函数清单
|
||||
`man 喵` 查看指令「喵」的使用说明
|
||||
`man 8 out` 查看管理员指令「out」的使用说明
|
||||
|
||||
文档类型
|
||||
文档类型用来区分同一指令在不同场景下的情景。你可以使用数字编号进行筛选。分为这些种类:
|
||||
|
||||
- 1 用户态指令,用于日常使用的指令
|
||||
- 3 库函数指令,用于 Bot 开发用的函数查询
|
||||
- 7 概念指令,用于概念解释
|
||||
- 8 系统指令,仅管理员可用
|
||||
21
konabot/docs/user/openssl.txt
Normal file
21
konabot/docs/user/openssl.txt
Normal file
@ -0,0 +1,21 @@
|
||||
指令介绍
|
||||
openssl - 用于生成指定长度的加密安全随机数据
|
||||
|
||||
格式
|
||||
openssl rand <模式> <字节数>
|
||||
|
||||
示例
|
||||
`openssl rand -hex 16` 生成 16 字节的十六进制随机数
|
||||
`openssl rand -base64 32` 生成 32 字节并以 Base64 编码输出的随机数据
|
||||
|
||||
说明
|
||||
该指令使用 Python 的 secrets 模块生成加密安全的随机字节,并支持以十六进制(-hex)或 Base64(-base64)格式输出。
|
||||
|
||||
参数说明
|
||||
模式(mode)
|
||||
- -hex :以十六进制字符串形式输出随机数据
|
||||
- -base64 :以 Base64 编码字符串形式输出随机数据
|
||||
|
||||
字节数(num)
|
||||
- 必须为正整数
|
||||
- 最大支持 256 字节
|
||||
41
konabot/docs/user/ytpgif.txt
Normal file
41
konabot/docs/user/ytpgif.txt
Normal file
@ -0,0 +1,41 @@
|
||||
指令介绍
|
||||
ytpgif - 生成来回镜像翻转的仿 YTPMV 动图
|
||||
|
||||
格式
|
||||
ytpgif [倍速]
|
||||
|
||||
示例
|
||||
`ytpgif`
|
||||
使用默认倍速(1.0)处理你发送或回复的图片,生成镜像动图。
|
||||
|
||||
`ytpgif 2.5`
|
||||
以 2.5 倍速处理图片,生成更快节奏的镜像动图。
|
||||
|
||||
回复一张图片并发送 `ytpgif 0.5`
|
||||
以慢速(0.5 倍)生成镜像动图。
|
||||
|
||||
参数说明
|
||||
倍速(可选)
|
||||
- 类型:浮点数
|
||||
- 默认值:1.0
|
||||
- 有效范围:0.1 ~ 20.0
|
||||
- 作用:
|
||||
• 对于静态图:控制镜像切换的快慢(值越大,切换越快)。
|
||||
• 对于动图:控制截取原始动图正向和反向片段的时长(值越大,截取的片段越长)。
|
||||
|
||||
使用方式
|
||||
发送指令前,请确保:
|
||||
- 直接在消息中附带一张图片,或
|
||||
- 回复一条包含图片的消息后再发送指令。
|
||||
|
||||
插件会自动:
|
||||
- 下载并识别图片(支持静态图和 GIF 动图)
|
||||
- 自动缩放至最大边长不超过 256 像素(保持宽高比)
|
||||
- 静态图 → 生成“原图↔镜像”循环动图
|
||||
- 动图 → 截取开头一段正向播放 + 同一段镜像翻转播放,拼接成新动图
|
||||
- 保留透明通道(如原图含透明),否则转为 RGB 避免颜色异常
|
||||
|
||||
注意事项
|
||||
- 图片过大、格式损坏或网络问题可能导致处理失败。
|
||||
- 动图帧数过多或单帧过短可能无法生成有效输出。
|
||||
- 输出 GIF 最大单段帧数限制为 500 帧,以防资源耗尽。
|
||||
2
konabot/docs/user/喵.txt
Normal file
2
konabot/docs/user/喵.txt
Normal file
@ -0,0 +1,2 @@
|
||||
指令介绍
|
||||
喵 - 你发喵,此方就会回复喵
|
||||
7
konabot/docs/user/摇数字.txt
Normal file
7
konabot/docs/user/摇数字.txt
Normal file
@ -0,0 +1,7 @@
|
||||
指令介绍
|
||||
摇数字 - 生成一个随机数字并发送
|
||||
|
||||
示例
|
||||
`摇数字` 随机生成一个 1-6 的数字
|
||||
|
||||
该指令不接受任何参数,直接调用即可。
|
||||
22
konabot/docs/user/摇骰子.txt
Normal file
22
konabot/docs/user/摇骰子.txt
Normal file
@ -0,0 +1,22 @@
|
||||
指令介绍
|
||||
摇骰子 - 用于生成随机数并以骰子图像形式展示的指令
|
||||
|
||||
格式
|
||||
摇骰子 [最小值] [最大值]
|
||||
|
||||
示例
|
||||
`摇骰子` 随机生成一个 1-6 的数字,并显示对应的骰子图像
|
||||
`摇骰子 10` 生成 1 到 10 之间的随机整数
|
||||
`摇骰子 0.5` 生成 0 到 0.5 之间的随机小数
|
||||
`摇骰子 -5 5` 生成 -5 到 5 之间的随机数
|
||||
|
||||
说明
|
||||
该指令支持以下几种调用方式:
|
||||
- 不带参数:使用默认范围生成随机数
|
||||
- 仅指定一个参数 f1:
|
||||
- 若 f1 > 1,则生成 [1, f1] 范围内的随机数
|
||||
- 若 0 < f1 ≤ 1,则生成 [0, f1] 范围内的随机数
|
||||
- 若 f1 ≤ 0,则生成 [f1, 0] 范围内的随机数
|
||||
- 指定两个参数 f1 和 f2:生成 [f1, f2] 范围内的随机数(顺序无关,内部会自动处理大小)
|
||||
|
||||
返回结果将以骰子样式的图像形式展示生成的随机数值。
|
||||
13
konabot/docs/user/雷达回波.txt
Normal file
13
konabot/docs/user/雷达回波.txt
Normal file
@ -0,0 +1,13 @@
|
||||
指令介绍
|
||||
雷达回波 - 用于获取指定地区的天气雷达回波图像
|
||||
|
||||
格式
|
||||
雷达回波 <地区>
|
||||
|
||||
示例
|
||||
`雷达回波 华南` 获取华南地区的天气雷达回波图
|
||||
`雷达回波 全国` 获取全国的天气雷达回波图
|
||||
|
||||
说明
|
||||
该指令通过查询中国气象局 https://www.nmc.cn/publish/radar/chinaall.html ,获取指定地区的实时天气雷达回波图像。
|
||||
支持的地区有:全国 华北 东北 华东 华中 华南 西南 西北。
|
||||
104
konabot/plugins/man/__init__.py
Normal file
104
konabot/plugins/man/__init__.py
Normal file
@ -0,0 +1,104 @@
|
||||
from curses.ascii import isdigit
|
||||
from pathlib import Path
|
||||
|
||||
import nonebot
|
||||
import nonebot.adapters
|
||||
import nonebot.adapters.discord
|
||||
import nonebot.rule
|
||||
from nonebot import on_command
|
||||
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
|
||||
|
||||
from konabot.common.nb.is_admin import is_admin
|
||||
from konabot.common.path import DOCS_PATH_MAN1, DOCS_PATH_MAN3, DOCS_PATH_MAN7, DOCS_PATH_MAN8
|
||||
|
||||
def search_man(section: int) -> dict[tuple[int, str], Path]:
|
||||
base_path = {
|
||||
1: DOCS_PATH_MAN1,
|
||||
3: DOCS_PATH_MAN3,
|
||||
7: DOCS_PATH_MAN7,
|
||||
8: DOCS_PATH_MAN8,
|
||||
}.get(section, DOCS_PATH_MAN1)
|
||||
|
||||
res: dict[tuple[int, str], Path] = {}
|
||||
for fp in base_path.iterdir():
|
||||
if fp.suffix != '.txt':
|
||||
continue
|
||||
name = fp.name.lower().removesuffix('.txt')
|
||||
res[(section, name)] = fp
|
||||
return res
|
||||
|
||||
|
||||
man = on_alconna(Alconna(
|
||||
'man',
|
||||
Args['section', int | None],
|
||||
Args['doc', str | None],
|
||||
))
|
||||
|
||||
@man.handle()
|
||||
async def _(
|
||||
section: int | None,
|
||||
doc: str | None,
|
||||
event: nonebot.adapters.Event,
|
||||
):
|
||||
if doc is not None and section is None and all(isdigit(c) for c in doc):
|
||||
section = int(doc)
|
||||
doc = None
|
||||
|
||||
if section is not None and section not in {1, 3, 7, 8}:
|
||||
await man.send(
|
||||
UniMessage().text(f"你所指定的文档类型 {section} 不在可用范围内")
|
||||
)
|
||||
return
|
||||
|
||||
if doc is None:
|
||||
# 检索模式
|
||||
if section is None:
|
||||
section_set = {1}
|
||||
else:
|
||||
section_set = {section}
|
||||
if 1 in section_set and is_admin(event):
|
||||
section_set.add(8)
|
||||
mans: list[str] = []
|
||||
for section in section_set:
|
||||
mans += [f"{n}({s})" for s, n in search_man(section).keys()]
|
||||
mans.sort()
|
||||
|
||||
await man.send(UniMessage().text(
|
||||
(
|
||||
"★此方 BOT 使用帮助★\n"
|
||||
"使用 man <指令名> 查询某个指令的名字\n\n"
|
||||
"可供查询的指令清单:"
|
||||
)
|
||||
+ ", ".join(mans)
|
||||
+ "\n\n例如,使用 man man 来查询 man 指令的使用方法"
|
||||
))
|
||||
else:
|
||||
# 查阅模式
|
||||
if section is None:
|
||||
section_set = {1}
|
||||
else:
|
||||
section_set = {section}
|
||||
if 1 in section_set and is_admin(event):
|
||||
section_set.add(8)
|
||||
if 8 in section_set and not is_admin(event):
|
||||
await man.send(UniMessage().text("你没有查看该指令类型的权限"))
|
||||
return
|
||||
mans_dict: dict[tuple[int, str], Path] = {}
|
||||
for section in section_set:
|
||||
mans_dict: dict[tuple[int, str], Path] = {**mans_dict, **search_man(section)}
|
||||
mans_dict_2 = {key[1]: val for key, val in mans_dict.items()}
|
||||
mans_fp = mans_dict_2.get(doc.lower())
|
||||
if mans_fp is None:
|
||||
await man.send(UniMessage().text("你所检索的指令不存在"))
|
||||
return
|
||||
mans_msg = mans_fp.read_text('utf-8', 'replace')
|
||||
if isinstance(event, nonebot.adapters.discord.event.MessageEvent):
|
||||
mans_msg = f'```\n{mans_msg}\n```'
|
||||
await man.send(UniMessage().text(mans_msg))
|
||||
|
||||
|
||||
help_deprecated = on_command('help', rule=nonebot.rule.to_me())
|
||||
|
||||
@help_deprecated.handle()
|
||||
async def _():
|
||||
await help_deprecated.send('你可以使用 man 指令来查询此方 BOT 的帮助')
|
||||
@ -1,10 +1,19 @@
|
||||
from io import BytesIO
|
||||
from typing import Iterable, cast
|
||||
|
||||
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
|
||||
on_alconna)
|
||||
from nonebot import on_message
|
||||
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, Text,
|
||||
UniMessage, UniMsg, on_alconna)
|
||||
|
||||
from konabot.plugins.memepack.drawing.geimao import draw_geimao
|
||||
from konabot.plugins.memepack.drawing.pt import draw_pt
|
||||
from konabot.common.nb.extract_image import extract_image_from_message
|
||||
from konabot.plugins.memepack.drawing.display import draw_cao_display
|
||||
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten,
|
||||
draw_geimao, draw_mnk,
|
||||
draw_pt, draw_suan)
|
||||
|
||||
from nonebot.adapters import Bot, Event
|
||||
|
||||
from returns.result import Success, Failure
|
||||
|
||||
geimao = on_alconna(Alconna(
|
||||
"给猫说",
|
||||
@ -36,3 +45,97 @@ async def _(saying: list[str]):
|
||||
img.save(img_bytes, format="PNG")
|
||||
|
||||
await pt.send(await UniMessage().image(raw=img_bytes).export())
|
||||
|
||||
|
||||
mnk = on_alconna(Alconna(
|
||||
"re:小?黑白子?说",
|
||||
Args["saying", MultiVar(str, '+'), Field(
|
||||
missing_tips=lambda: "你没有写黑白子说了什么"
|
||||
)]
|
||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"})
|
||||
|
||||
@mnk.handle()
|
||||
async def _(saying: list[str]):
|
||||
img = await draw_mnk("\n".join(saying))
|
||||
img_bytes = BytesIO()
|
||||
img.save(img_bytes, format="PNG")
|
||||
|
||||
await mnk.send(await UniMessage().image(raw=img_bytes).export())
|
||||
|
||||
|
||||
suan = on_alconna(Alconna(
|
||||
"小蒜说",
|
||||
Args["saying", MultiVar(str, '+'), Field(
|
||||
missing_tips=lambda: "你没有写小蒜说了什么"
|
||||
)]
|
||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
|
||||
|
||||
@suan.handle()
|
||||
async def _(saying: list[str]):
|
||||
img = await draw_suan("\n".join(saying))
|
||||
img_bytes = BytesIO()
|
||||
img.save(img_bytes, format="PNG")
|
||||
|
||||
await suan.send(await UniMessage().image(raw=img_bytes).export())
|
||||
|
||||
|
||||
dsuan = on_alconna(Alconna(
|
||||
"大蒜说",
|
||||
Args["saying", MultiVar(str, '+'), Field(
|
||||
missing_tips=lambda: "你没有写大蒜说了什么"
|
||||
)]
|
||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
|
||||
|
||||
@dsuan.handle()
|
||||
async def _(saying: list[str]):
|
||||
img = await draw_suan("\n".join(saying), True)
|
||||
img_bytes = BytesIO()
|
||||
img.save(img_bytes, format="PNG")
|
||||
|
||||
await dsuan.send(await UniMessage().image(raw=img_bytes).export())
|
||||
|
||||
|
||||
cutecat = on_alconna(Alconna(
|
||||
"乖猫说",
|
||||
Args["saying", MultiVar(str, '+'), Field(
|
||||
missing_tips=lambda: "你没有写十猫说了什么"
|
||||
)]
|
||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"十猫说"})
|
||||
|
||||
@cutecat.handle()
|
||||
async def _(saying: list[str]):
|
||||
img = await draw_cute_ten("\n".join(saying))
|
||||
img_bytes = BytesIO()
|
||||
img.save(img_bytes, format="PNG")
|
||||
|
||||
await cutecat.send(await UniMessage().image(raw=img_bytes).export())
|
||||
|
||||
|
||||
cao_display_cmd = on_message()
|
||||
|
||||
@cao_display_cmd.handle()
|
||||
async def _(msg: UniMsg, evt: Event, bot: Bot):
|
||||
flag = False
|
||||
for text in cast(Iterable[Text], msg.get(Text)):
|
||||
if text.text.strip() == "小槽展示":
|
||||
flag = True
|
||||
elif text.text.strip() == '':
|
||||
continue
|
||||
else:
|
||||
return
|
||||
if not flag:
|
||||
return
|
||||
match await extract_image_from_message(evt.get_message(), evt, bot):
|
||||
case Success(img):
|
||||
img_handled = await draw_cao_display(img)
|
||||
img_bytes = BytesIO()
|
||||
img_handled.save(img_bytes, format="PNG")
|
||||
await cao_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
|
||||
case Failure(err):
|
||||
await cao_display_cmd.send(
|
||||
await UniMessage()
|
||||
.at(user_id=evt.get_user_id())
|
||||
.text(' ')
|
||||
.text(err)
|
||||
.export()
|
||||
)
|
||||
|
||||
@ -10,3 +10,4 @@ FontDB.SetDefaultEmojiOptions(EmojiOptions(
|
||||
|
||||
HARMONYOS_SANS_SC_BLACK = FontDB.Query("HarmonyOS_Sans_SC_Black")
|
||||
HARMONYOS_SANS_SC_REGULAR = FontDB.Query("HarmonyOS_Sans_SC_Regular")
|
||||
LXGWWENKAI_REGULAR = FontDB.Query("LXGWWenKai-Regular")
|
||||
|
||||
45
konabot/plugins/memepack/drawing/display.py
Normal file
45
konabot/plugins/memepack/drawing/display.py
Normal file
@ -0,0 +1,45 @@
|
||||
import asyncio
|
||||
from typing import Any, cast
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import PIL.Image
|
||||
|
||||
from konabot.common.path import ASSETS_PATH
|
||||
|
||||
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
|
||||
CAO_QUAD_POINTS = np.float32(cast(Any, [
|
||||
[392, 540],
|
||||
[577, 557],
|
||||
[567, 707],
|
||||
[381, 687],
|
||||
]))
|
||||
|
||||
def _draw_cao_display(image: PIL.Image.Image):
|
||||
src = np.array(image.convert("RGB"))
|
||||
h, w = src.shape[:2]
|
||||
src_points = np.float32(cast(Any, [
|
||||
[0, 0],
|
||||
[w, 0],
|
||||
[w, h],
|
||||
[0, h]
|
||||
]))
|
||||
dst_points = CAO_QUAD_POINTS
|
||||
M = cv2.getPerspectiveTransform(cast(Any, src_points), cast(Any, dst_points))
|
||||
output_size = cao_image.size
|
||||
output_w, output_h = output_size
|
||||
warped = cv2.warpPerspective(
|
||||
src,
|
||||
M,
|
||||
(output_w, output_h),
|
||||
flags=cv2.INTER_LINEAR,
|
||||
borderMode=cv2.BORDER_CONSTANT,
|
||||
borderValue=(0, 0, 0)
|
||||
)
|
||||
result = PIL.Image.fromarray(warped, 'RGB').convert('RGBA')
|
||||
result = PIL.Image.alpha_composite(result, cao_image)
|
||||
return result
|
||||
|
||||
|
||||
async def draw_cao_display(image: PIL.Image.Image):
|
||||
return await asyncio.to_thread(_draw_cao_display, image)
|
||||
@ -1,30 +0,0 @@
|
||||
import asyncio
|
||||
from typing import Any, cast
|
||||
|
||||
import imagetext_py
|
||||
import PIL.Image
|
||||
|
||||
from konabot.common.path import ASSETS_PATH
|
||||
|
||||
from .base.fonts import HARMONYOS_SANS_SC_BLACK
|
||||
|
||||
geimao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "geimao.jpg").convert("RGBA")
|
||||
|
||||
|
||||
def _draw_geimao(saying: str):
|
||||
img = geimao_image.copy()
|
||||
with imagetext_py.Writer(img) as iw:
|
||||
iw.draw_text_wrapped(
|
||||
saying, 960, 50, 00.5, 0, 1920, 240, HARMONYOS_SANS_SC_BLACK,
|
||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||
0.8,
|
||||
imagetext_py.TextAlign.Center,
|
||||
cast(Any, 30.0),
|
||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
|
||||
draw_emojis=True,
|
||||
)
|
||||
return img
|
||||
|
||||
|
||||
async def draw_geimao(saying: str):
|
||||
return await asyncio.to_thread(_draw_geimao, saying)
|
||||
@ -1,27 +0,0 @@
|
||||
import asyncio
|
||||
|
||||
import imagetext_py
|
||||
import PIL.Image
|
||||
|
||||
from konabot.common.path import ASSETS_PATH
|
||||
|
||||
from .base.fonts import HARMONYOS_SANS_SC_REGULAR
|
||||
|
||||
pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("RGBA")
|
||||
|
||||
|
||||
def _draw_pt(saying: str):
|
||||
img = pt_image.copy()
|
||||
with imagetext_py.Writer(img) as iw:
|
||||
iw.draw_text_wrapped(
|
||||
saying, 259, 278, 0.5, 0.5, 360, 48, HARMONYOS_SANS_SC_REGULAR,
|
||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||
1.0,
|
||||
imagetext_py.TextAlign.Center,
|
||||
draw_emojis=True,
|
||||
)
|
||||
return img
|
||||
|
||||
|
||||
async def draw_pt(saying: str):
|
||||
return await asyncio.to_thread(_draw_pt, saying)
|
||||
108
konabot/plugins/memepack/drawing/saying.py
Normal file
108
konabot/plugins/memepack/drawing/saying.py
Normal file
@ -0,0 +1,108 @@
|
||||
import asyncio
|
||||
from typing import Any, cast
|
||||
|
||||
import imagetext_py
|
||||
import PIL.Image
|
||||
|
||||
from konabot.common.path import ASSETS_PATH
|
||||
|
||||
from .base.fonts import HARMONYOS_SANS_SC_BLACK, HARMONYOS_SANS_SC_REGULAR, LXGWWENKAI_REGULAR
|
||||
|
||||
geimao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "geimao.jpg").convert("RGBA")
|
||||
pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("RGBA")
|
||||
mnk_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "mnksay.jpg").convert("RGBA")
|
||||
dasuan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "dss.png").convert("RGBA")
|
||||
suan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "suanleba.png").convert("RGBA")
|
||||
cute_ten_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "tententen.png").convert("RGBA")
|
||||
|
||||
|
||||
def _draw_geimao(saying: str):
|
||||
img = geimao_image.copy()
|
||||
with imagetext_py.Writer(img) as iw:
|
||||
iw.draw_text_wrapped(
|
||||
saying, 960, 50, 0.5, 0, 1920, 240, HARMONYOS_SANS_SC_BLACK,
|
||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||
0.8,
|
||||
imagetext_py.TextAlign.Center,
|
||||
cast(Any, 30.0),
|
||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
|
||||
draw_emojis=True,
|
||||
)
|
||||
return img
|
||||
|
||||
|
||||
async def draw_geimao(saying: str):
|
||||
return await asyncio.to_thread(_draw_geimao, saying)
|
||||
|
||||
|
||||
def _draw_pt(saying: str):
|
||||
img = pt_image.copy()
|
||||
with imagetext_py.Writer(img) as iw:
|
||||
iw.draw_text_wrapped(
|
||||
saying, 259, 278, 0.5, 0.5, 360, 48, HARMONYOS_SANS_SC_REGULAR,
|
||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||
1.0,
|
||||
imagetext_py.TextAlign.Center,
|
||||
draw_emojis=True,
|
||||
)
|
||||
return img
|
||||
|
||||
|
||||
async def draw_pt(saying: str):
|
||||
return await asyncio.to_thread(_draw_pt, saying)
|
||||
|
||||
|
||||
def _draw_mnk(saying: str):
|
||||
img = mnk_image.copy()
|
||||
with imagetext_py.Writer(img) as iw:
|
||||
iw.draw_text_wrapped(
|
||||
saying, 540, 25, 0.5, 0, 1080, 120, HARMONYOS_SANS_SC_BLACK,
|
||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||
0.8,
|
||||
imagetext_py.TextAlign.Center,
|
||||
cast(Any, 15.0),
|
||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
|
||||
draw_emojis=True,
|
||||
)
|
||||
return img
|
||||
|
||||
|
||||
async def draw_mnk(saying: str):
|
||||
return await asyncio.to_thread(_draw_mnk, saying)
|
||||
|
||||
|
||||
def _draw_suan(saying: str, dasuan: bool = False):
|
||||
if dasuan:
|
||||
img = dasuan_image.copy()
|
||||
else:
|
||||
img = suan_image.copy()
|
||||
with imagetext_py.Writer(img) as iw:
|
||||
iw.draw_text_wrapped(
|
||||
saying, 1020, 290, 0.5, 0.5, 400, 48, LXGWWENKAI_REGULAR,
|
||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||
1.0,
|
||||
imagetext_py.TextAlign.Center,
|
||||
draw_emojis=True,
|
||||
)
|
||||
return img
|
||||
|
||||
|
||||
async def draw_suan(saying: str, dasuan: bool = False):
|
||||
return await asyncio.to_thread(_draw_suan, saying, dasuan)
|
||||
|
||||
|
||||
def _draw_cute_ten(saying: str):
|
||||
img = cute_ten_image.copy()
|
||||
with imagetext_py.Writer(img) as iw:
|
||||
iw.draw_text_wrapped(
|
||||
saying, 390, 479, 0.5, 0.5, 760, 96, LXGWWENKAI_REGULAR,
|
||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||
1.0,
|
||||
imagetext_py.TextAlign.Center,
|
||||
draw_emojis=True,
|
||||
)
|
||||
return img
|
||||
|
||||
|
||||
async def draw_cute_ten(saying: str):
|
||||
return await asyncio.to_thread(_draw_cute_ten, saying)
|
||||
@ -332,9 +332,10 @@ async def generate_dice_image(number: str) -> BytesIO:
|
||||
up_direction = (51 - 16, 5 - 30) # 右上角点 - 左上角点
|
||||
|
||||
move_distance = (up_direction[0] * (stretch_k - 1), up_direction[1] * (stretch_k - 1))
|
||||
|
||||
|
||||
# 加载背景图像,保留透明通道
|
||||
background = cv2.imread(ASSETS_PATH / "img" / "dice" / "template.png", cv2.IMREAD_UNCHANGED)
|
||||
background = cv2.imread(str(ASSETS_PATH / "img" / "dice" / "template.png"), cv2.IMREAD_UNCHANGED)
|
||||
assert background is not None
|
||||
|
||||
height, width = background.shape[:2]
|
||||
|
||||
@ -352,7 +353,7 @@ async def generate_dice_image(number: str) -> BytesIO:
|
||||
], dtype=np.float32)
|
||||
corners[:, 0] += offset_x
|
||||
corners[:, 1] += offset_y
|
||||
|
||||
|
||||
|
||||
# 对文本图像进行3D变换(保持透明通道)
|
||||
transformed_text, transform_matrix = perspective_transform(text_image, background, corners)
|
||||
|
||||
@ -45,7 +45,7 @@ class Notify(BaseModel):
|
||||
|
||||
|
||||
class NotifyConfigFile(BaseModel):
|
||||
version: int = 1
|
||||
version: int = 2
|
||||
notifies: list[Notify] = []
|
||||
unsent: list[Notify] = []
|
||||
|
||||
@ -89,13 +89,17 @@ async def notify_now(notify: Notify):
|
||||
if notify.target_env is None:
|
||||
await bot.send_private_msg(
|
||||
user_id=int(notify.target),
|
||||
message=f"代办通知:{notify.notify_msg}",
|
||||
message=cast(Any, await UniMessage.text(f"代办通知:{notify.notify_msg}").export(
|
||||
bot=bot,
|
||||
)),
|
||||
)
|
||||
else:
|
||||
await bot.send_group_msg(
|
||||
group_id=int(notify.target_env),
|
||||
message=cast(Any,
|
||||
await UniMessage().at(notify.target).text(f" 代办通知:{notify.notify_msg}").export()
|
||||
await UniMessage().at(
|
||||
notify.target
|
||||
).text(f" 代办通知:{notify.notify_msg}").export(bot=bot)
|
||||
),
|
||||
)
|
||||
else:
|
||||
@ -197,11 +201,15 @@ async def _():
|
||||
|
||||
NOTIFIED_FLAG["task_added"] = True
|
||||
|
||||
await asyncio.sleep(10)
|
||||
await DATA_FILE_LOCK.acquire()
|
||||
tasks = []
|
||||
cfg = load_notify_config()
|
||||
for notify in cfg.notifies:
|
||||
tasks.append(create_notify_task(notify, fail2remove=False))
|
||||
if cfg.version == 1:
|
||||
cfg.version = 2
|
||||
else:
|
||||
for notify in cfg.notifies:
|
||||
tasks.append(create_notify_task(notify, fail2remove=False))
|
||||
DATA_FILE_LOCK.release()
|
||||
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
268
konabot/plugins/ytpgif/__init__.py
Normal file
268
konabot/plugins/ytpgif/__init__.py
Normal file
@ -0,0 +1,268 @@
|
||||
import os
|
||||
import tempfile
|
||||
from typing import Optional
|
||||
|
||||
from PIL import Image, ImageSequence
|
||||
from nonebot.adapters import Event as BaseEvent
|
||||
from nonebot.plugin import PluginMetadata
|
||||
from nonebot_plugin_alconna import (
|
||||
Alconna,
|
||||
Args,
|
||||
Field,
|
||||
UniMessage,
|
||||
on_alconna,
|
||||
)
|
||||
|
||||
__plugin_meta__ = PluginMetadata(
|
||||
name="ytpgif",
|
||||
description="生成来回镜像翻转的仿 YTPMV 动图。",
|
||||
usage="ytpgif [倍速=1.0] (倍速范围:0.1~20.0)",
|
||||
type="application",
|
||||
config=None,
|
||||
homepage=None,
|
||||
)
|
||||
|
||||
# 参数定义
|
||||
BASE_SEGMENT_DURATION = 0.25
|
||||
BASE_INTERVAL = 0.25
|
||||
MAX_SIZE = 256
|
||||
MIN_SPEED = 0.1
|
||||
MAX_SPEED = 20.0
|
||||
MAX_FRAMES_PER_SEGMENT = 500
|
||||
|
||||
# 提示语
|
||||
SPEED_TIPS = f"倍速必须是 {MIN_SPEED} 到 {MAX_SPEED} 之间的数字"
|
||||
|
||||
|
||||
# 定义命令 + 参数校验
|
||||
ytpgif_cmd = on_alconna(
|
||||
Alconna(
|
||||
"ytpgif",
|
||||
Args[
|
||||
"speed?",
|
||||
float,
|
||||
Field(
|
||||
default=1.0,
|
||||
unmatch_tips=lambda x: f"“{x}”不是有效数值。{SPEED_TIPS}",
|
||||
),
|
||||
],
|
||||
),
|
||||
use_cmd_start=True,
|
||||
use_cmd_sep=False,
|
||||
skip_for_unmatch=False,
|
||||
)
|
||||
|
||||
|
||||
async def get_image_url(event: BaseEvent) -> Optional[str]:
|
||||
"""从事件中提取图片 URL,支持直接消息和回复"""
|
||||
msg = event.get_message()
|
||||
for seg in msg:
|
||||
if seg.type == "image" and seg.data.get("url"):
|
||||
return str(seg.data["url"])
|
||||
|
||||
if hasattr(event, "reply") and (reply := event.reply):
|
||||
reply_msg = reply.message
|
||||
for seg in reply_msg:
|
||||
if seg.type == "image" and seg.data.get("url"):
|
||||
return str(seg.data["url"])
|
||||
return None
|
||||
|
||||
|
||||
async def download_image(url: str) -> bytes:
|
||||
import httpx
|
||||
async with httpx.AsyncClient() as client:
|
||||
resp = await client.get(url, timeout=10)
|
||||
resp.raise_for_status()
|
||||
return resp.content
|
||||
|
||||
|
||||
def resize_frame(frame: Image.Image) -> Image.Image:
|
||||
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
|
||||
w, h = frame.size
|
||||
if w <= MAX_SIZE and h <= MAX_SIZE:
|
||||
return frame
|
||||
|
||||
scale = MAX_SIZE / max(w, h)
|
||||
new_w = int(w * scale)
|
||||
new_h = int(h * scale)
|
||||
return frame.resize((new_w, new_h), Image.Resampling.LANCZOS)
|
||||
|
||||
|
||||
@ytpgif_cmd.handle()
|
||||
async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
|
||||
# === 校验 speed 范围 ===
|
||||
if not (MIN_SPEED <= speed <= MAX_SPEED):
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text(f"❌ {SPEED_TIPS}").export()
|
||||
)
|
||||
return
|
||||
|
||||
img_url = await get_image_url(event)
|
||||
if not img_url:
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text(
|
||||
"请发送一张图片或回复一张图片来生成镜像动图。"
|
||||
).export()
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
image_data = await download_image(img_url)
|
||||
except Exception as e:
|
||||
print(f"[YTPGIF] 下载失败: {e}")
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text("❌ 图片下载失败,请重试。").export()
|
||||
)
|
||||
return
|
||||
|
||||
input_path = output_path = None
|
||||
try:
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_in:
|
||||
tmp_in.write(image_data)
|
||||
input_path = tmp_in.name
|
||||
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_out:
|
||||
output_path = tmp_out.name
|
||||
|
||||
with Image.open(input_path) as src_img:
|
||||
# === 判断是否为动图 ===
|
||||
try:
|
||||
n_frames = getattr(src_img, "n_frames", 1)
|
||||
is_animated = n_frames > 1
|
||||
except Exception:
|
||||
is_animated = False
|
||||
|
||||
output_frames = []
|
||||
output_durations_ms = []
|
||||
|
||||
if is_animated:
|
||||
# === 动图模式:截取正向 + 镜像两段 ===
|
||||
frames_with_duration = []
|
||||
palette = src_img.getpalette()
|
||||
|
||||
for idx in range(n_frames):
|
||||
src_img.seek(idx)
|
||||
frame = src_img.copy()
|
||||
# 检查是否需要透明通道
|
||||
has_alpha = (
|
||||
frame.mode in ("RGBA", "LA")
|
||||
or (frame.mode == "P" and "transparency" in frame.info)
|
||||
)
|
||||
if has_alpha:
|
||||
frame = frame.convert("RGBA")
|
||||
else:
|
||||
frame = frame.convert("RGB")
|
||||
resized_frame = resize_frame(frame)
|
||||
|
||||
# 若原图有调色板,尝试保留(可选)
|
||||
if palette and resized_frame.mode == "P":
|
||||
try:
|
||||
resized_frame.putpalette(palette)
|
||||
except Exception: # noqa
|
||||
pass
|
||||
|
||||
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
|
||||
dur_sec = max(0.01, ms / 1000.0)
|
||||
frames_with_duration.append((resized_frame, dur_sec))
|
||||
|
||||
max_dur = BASE_SEGMENT_DURATION * speed
|
||||
accumulated = 0.0
|
||||
frame_count = 0
|
||||
|
||||
# 正向段
|
||||
for img, dur in frames_with_duration:
|
||||
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
|
||||
break
|
||||
output_frames.append(img)
|
||||
output_durations_ms.append(int(dur * 1000))
|
||||
accumulated += dur
|
||||
frame_count += 1
|
||||
|
||||
if frame_count == 0:
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text("动图帧太短,无法生成有效片段。").export()
|
||||
)
|
||||
return
|
||||
|
||||
# 镜像段(从头开始)
|
||||
accumulated = 0.0
|
||||
frame_count = 0
|
||||
for img, dur in frames_with_duration:
|
||||
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
|
||||
break
|
||||
flipped = img.transpose(Image.FLIP_LEFT_RIGHT)
|
||||
output_frames.append(flipped)
|
||||
output_durations_ms.append(int(dur * 1000))
|
||||
accumulated += dur
|
||||
frame_count += 1
|
||||
|
||||
else:
|
||||
# === 静态图模式:制作翻转动画 ===
|
||||
raw_frame = src_img.convert("RGBA")
|
||||
resized_frame = resize_frame(raw_frame)
|
||||
|
||||
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
|
||||
duration_ms = int(interval_sec * 1000)
|
||||
|
||||
frame1 = resized_frame
|
||||
frame2 = resized_frame.transpose(Image.FLIP_LEFT_RIGHT)
|
||||
|
||||
output_frames = [frame1, frame2]
|
||||
output_durations_ms = [duration_ms, duration_ms]
|
||||
|
||||
if len(output_frames) < 1:
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text("未能生成任何帧。").export()
|
||||
)
|
||||
return
|
||||
|
||||
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
|
||||
need_transparency = False
|
||||
for frame in output_frames:
|
||||
if frame.mode == "RGBA":
|
||||
alpha_channel = frame.getchannel("A")
|
||||
if any(pix < 255 for pix in alpha_channel.getdata()):
|
||||
need_transparency = True
|
||||
break
|
||||
elif frame.mode == "P" and "transparency" in frame.info:
|
||||
need_transparency = True
|
||||
break
|
||||
|
||||
# 如果不需要透明,则统一转为 RGB 避免调色板污染
|
||||
if not need_transparency:
|
||||
output_frames = [f.convert("RGB") for f in output_frames]
|
||||
|
||||
# 构建保存参数
|
||||
save_kwargs = {
|
||||
"save_all": True,
|
||||
"append_images": output_frames[1:],
|
||||
"format": "GIF",
|
||||
"loop": 0, # 无限循环
|
||||
"duration": output_durations_ms,
|
||||
"disposal": 2, # 清除到背景色,避免残留
|
||||
"optimize": False, # 关闭抖动(等效 -dither none)
|
||||
}
|
||||
|
||||
# 只有真正需要透明时才启用 transparency
|
||||
if need_transparency:
|
||||
save_kwargs["transparency"] = 0
|
||||
|
||||
output_frames[0].save(output_path, **save_kwargs)
|
||||
|
||||
# 发送结果
|
||||
with open(output_path, "rb") as f:
|
||||
result_image = UniMessage.image(raw=f.read())
|
||||
await ytpgif_cmd.send(await result_image.export())
|
||||
|
||||
except Exception as e:
|
||||
print(f"[YTPGIF] 处理失败: {e}")
|
||||
await ytpgif_cmd.send(
|
||||
await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export()
|
||||
)
|
||||
finally:
|
||||
for path in filter(None, [input_path, output_path]):
|
||||
if os.path.exists(path):
|
||||
try:
|
||||
os.unlink(path)
|
||||
except: # noqa
|
||||
pass
|
||||
21
poetry.lock
generated
21
poetry.lock
generated
@ -2460,6 +2460,25 @@ urllib3 = ">=1.21.1,<3"
|
||||
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
|
||||
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
|
||||
|
||||
[[package]]
|
||||
name = "returns"
|
||||
version = "0.26.0"
|
||||
description = "Make your functions return something meaningful, typed, and safe!"
|
||||
optional = false
|
||||
python-versions = "<4.0,>=3.10"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "returns-0.26.0-py3-none-any.whl", hash = "sha256:7cae94c730d6c56ffd9d0f583f7a2c0b32cfe17d141837150c8e6cff3eb30d71"},
|
||||
{file = "returns-0.26.0.tar.gz", hash = "sha256:180320e0f6e9ea9845330ccfc020f542330f05b7250941d9b9b7c00203fcc3da"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
typing-extensions = ">=4.0,<5.0"
|
||||
|
||||
[package.extras]
|
||||
check-laws = ["hypothesis (>=6.136,<7.0)", "pytest (>=8.0,<9.0)"]
|
||||
compatible-mypy = ["mypy (>=1.12,<1.18)"]
|
||||
|
||||
[[package]]
|
||||
name = "rich"
|
||||
version = "14.1.0"
|
||||
@ -3162,4 +3181,4 @@ type = ["pytest-mypy"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.12,<4.0"
|
||||
content-hash = "673703a789248d0f7369999c364352eb12f8bb5830a8b4b6918f8bab6425a763"
|
||||
content-hash = "927913b9030d1f6c126bb2d12eab7307dc6297f259c7c62e3033706457d27ce0"
|
||||
|
||||
@ -21,6 +21,7 @@ dependencies = [
|
||||
"pillow (>=11.3.0,<12.0.0)",
|
||||
"imagetext-py (>=2.2.0,<3.0.0)",
|
||||
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
|
||||
"returns (>=0.26.0,<0.27.0)",
|
||||
]
|
||||
|
||||
|
||||
|
||||
2076
requirements.txt
2076
requirements.txt
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user