Compare commits

...

12 Commits

Author SHA1 Message Date
8edb999050 添加巨大多东西
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-07 16:07:48 +08:00
109a81923f 添加 man 指令 2025-10-07 15:54:16 +08:00
91687fb8c3 Merge pull request 'feature-更多更多说' (#22) from feature-更多更多说 into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #22
2025-10-03 17:40:52 +08:00
f889381cce 排序 import 2025-10-03 17:39:49 +08:00
1256055c9d 补充依赖 2025-10-03 17:38:02 +08:00
40f35a474e 搞小槽的说话 2025-10-03 17:37:43 +08:00
6b01acfa8c 十猫 2025-10-03 14:23:01 +08:00
09c9d44798 Merge pull request 'Feature: 好多好多的说' (#21) from feature-新说 into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #21
2025-10-03 13:52:31 +08:00
0c4206f461 好多好多的说 2025-10-03 13:49:36 +08:00
9fb8fd90dc 修复类型注解
Some checks are pending
continuous-integration/drone/push Build is running
2025-10-02 12:06:20 +08:00
8c4fa2b5e4 Merge pull request 'fix: 透明底正常生成;静动图分离完成' (#18) from tnot/konabot:fix--修复部分Bug into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #18
2025-10-01 19:24:23 +08:00
fb2c3f1ce2 fix: 透明底正常生成;静动图分离完成 2025-10-01 11:54:54 +08:00
34 changed files with 959 additions and 2089 deletions

Binary file not shown.

BIN
assets/img/meme/caoimg1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 227 KiB

BIN
assets/img/meme/dss.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 172 KiB

BIN
assets/img/meme/mnksay.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 69 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 364 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 614 KiB

View File

@ -0,0 +1,135 @@
from io import BytesIO
import httpx
import PIL.Image
from loguru import logger
from nonebot.adapters import Bot, Event, Message
from nonebot.adapters.discord import Bot as DiscordBot
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
from PIL import UnidentifiedImageError
from returns.result import Failure, Result, Success
async def download_image_bytes(url: str) -> Result[bytes, str]:
# if "/matcha/cache/" in url:
# url = url.replace('127.0.0.1', '10.126.126.101')
logger.debug(f"开始从 {url} 下载图片")
async with httpx.AsyncClient() as c:
try:
response = await c.get(url)
except (httpx.ConnectError, httpx.RemoteProtocolError) as e:
return Failure(f"HTTPX 模块下载图片时出错:{e}")
except httpx.ConnectTimeout:
return Failure("下载图片失败了网络超时了qwq")
if response.status_code != 200:
return Failure("无法下载图片,可能存在网络问题需要排查")
return Success(response.content)
def bytes_to_pil(raw_data: bytes | BytesIO) -> Result[PIL.Image.Image, str]:
try:
if not isinstance(raw_data, BytesIO):
img_pil = PIL.Image.open(BytesIO(raw_data))
else:
img_pil = PIL.Image.open(raw_data)
img_pil.verify()
if not isinstance(raw_data, BytesIO):
img = PIL.Image.open(BytesIO(raw_data))
else:
raw_data.seek(0)
img = PIL.Image.open(raw_data)
return Success(img)
except UnidentifiedImageError:
return Failure("图像无法读取可能是格式不支持orz")
except IOError:
return Failure("图像无法读取可能是网络存在问题orz")
async def unimsg_img_to_pil(image: Image) -> Result[PIL.Image.Image, str]:
if image.url is not None:
raw_result = await download_image_bytes(image.url)
elif image.raw is not None:
raw_result = Success(image.raw)
else:
return Failure("由于一些内部问题下载图片失败了orz")
return raw_result.bind(bytes_to_pil)
async def extract_image_from_qq_message(
msg: OnebotV11Message,
evt: OnebotV11MessageEvent,
bot: OnebotV11Bot,
allow_reply: bool = True,
) -> Result[PIL.Image.Image, str]:
if allow_reply and (reply := evt.reply) is not None:
return await extract_image_from_qq_message(
reply.message,
evt,
bot,
False,
)
for seg in msg:
if seg.type == "reply" and allow_reply:
msgid = seg.data.get("id")
if msgid is None:
return Failure("消息可能太久远,无法读取到消息原文")
try:
msg2 = await bot.get_msg(message_id=msgid)
except Exception as e:
logger.warning(f"获取消息内容时出错:{e}")
return Failure("消息可能太久远,无法读取到消息原文")
msg2_data = msg2.get("message")
if msg2_data is None:
return Failure("消息可能太久远,无法读取到消息原文")
logger.debug("发现消息引用,递归一层")
return await extract_image_from_qq_message(
msg=OnebotV11Message(msg2_data),
evt=evt,
bot=bot,
allow_reply=False,
)
if seg.type == "image":
url = seg.data.get("url")
if url is None:
return Failure("无法下载图片,可能有一些网络问题")
data = await download_image_bytes(url)
return data.bind(bytes_to_pil)
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
async def extract_image_from_message(
msg: Message,
evt: Event,
bot: Bot,
allow_reply: bool = True,
) -> Result[PIL.Image.Image, str]:
if (
isinstance(bot, OnebotV11Bot)
and isinstance(msg, OnebotV11Message)
and isinstance(evt, OnebotV11MessageEvent)
):
# 看起来 UniMessage 在这方面能力似乎不足,因此用 QQ 的
logger.debug('获取图片的路径 Fallback 到 QQ 模块')
return await extract_image_from_qq_message(msg, evt, bot, allow_reply)
for seg in UniMessage.of(msg, bot):
logger.info(seg)
if isinstance(seg, Image):
return await unimsg_img_to_pil(seg)
elif isinstance(seg, Reply) and allow_reply:
msg2 = seg.msg
logger.debug(f"深入搜索引用的消息:{msg2}")
if msg2 is None or isinstance(msg2, str):
continue
return await extract_image_from_message(msg2, evt, bot, False)
elif isinstance(seg, RefNode) and allow_reply:
if isinstance(bot, DiscordBot):
return Failure("暂时不支持在 Discord 中通过引用的方式获取图片")
else:
return Failure("暂时不支持在这里中通过引用的方式获取图片")
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")

View File

@ -0,0 +1,34 @@
from nonebot import get_plugin_config
import nonebot
import nonebot.adapters
import nonebot.adapters.console
import nonebot.adapters.discord
import nonebot.adapters.onebot
from pydantic import BaseModel
class IsAdminConfig(BaseModel):
admin_qq_group: list[int] = []
admin_qq_account: list[int] = []
admin_discord_channel: list[int] = []
admin_discord_account: list[int] = []
cfg = get_plugin_config(IsAdminConfig)
def is_admin(event: nonebot.adapters.Event):
if isinstance(event, nonebot.adapters.onebot.v11.MessageEvent):
if event.user_id in cfg.admin_qq_account:
return True
if isinstance(event, nonebot.adapters.onebot.v11.GroupMessageEvent):
if event.group_id in cfg.admin_qq_group:
return True
if isinstance(event, nonebot.adapters.discord.event.MessageEvent):
if event.channel_id in cfg.admin_discord_channel:
return True
if event.user_id in cfg.admin_discord_account:
return True
if isinstance(event, nonebot.adapters.console.event.Event):
return True
return False

View File

@ -2,3 +2,11 @@ from pathlib import Path
ASSETS_PATH = Path(__file__).resolve().parent.parent.parent / "assets"
FONTS_PATH = ASSETS_PATH / "fonts"
SRC_PATH = Path(__file__).resolve().parent.parent
DOCS_PATH = SRC_PATH / "docs"
DOCS_PATH_MAN1 = DOCS_PATH / "user"
DOCS_PATH_MAN3 = DOCS_PATH / "lib"
DOCS_PATH_MAN7 = DOCS_PATH / "concepts"
DOCS_PATH_MAN8 = DOCS_PATH / "sys"

40
konabot/docs/README.md Normal file
View File

@ -0,0 +1,40 @@
# 此方 Bot 的文档系统
此方 Bot 使用类 Linux 的 `man` 指令来管理文档。文档一般建议使用纯文本书写,带有相对良好的格式。
## 文件夹摆放规则
`docs` 目录下,有若干文档可以拿来阅读和输出。每个子文件夹里,文档文件使用名字不含空格的 txt 文件书写,其他后缀名的文件将会被忽略。所以,如果你希望有些文件只在代码库中可阅读,你可以使用 `.md` 格式。
### 1 - user
`docs/user` 目录下的文档是直接会给用户进行检索的文档,在直接使用 `man` 指令时,会搜索该文件夹的全部文件,以知晓所有有文档的指令。
### 3 - lib
`docs/lib` 目录下的文档主要给该项目的维护者进行阅读和使用,讲述的是本项目内置的一些函数的功能讲解(一般以便利为主要目的)以及一些项目安排上的要求。一般不会列举,除非用户指定要求列举该范围。
### 7 - concepts
`docs/concepts` 用来摆放任何的概念。任何的。一般不会列举,除非用户指定要求列举该范围。
### 8 - sys
`docs/sys` 用于摆放仅 MTTU 群可以使用的文档集合。在 MTTU 群内,该目录下的文档也会被索引,否则文档将不可阅读。
## 书写规范
无特殊要求,因为当用户进行 `man` 的时候,会将文档内的内容原封不动地展示出来。但是,你仍然可以模仿 Linux 下的 `man` 指令的格式进行书写。
```
指令介绍
man - 用于展示此方 BOT 使用手册的指令
格式
man [文档类型] <指令>
示例
`man` 查看所有有文档的指令清单
`man 喵` 查看指令「喵」的使用说明
……
```

View File

View File

@ -0,0 +1,45 @@
指令介绍
is_admin - 用于判断当前事件是否来自管理员的内部权限校验函数
格式
from konabot.common.nb.is_admin import is_admin
from nonebot import on
from nonebot.adapters import Event
from loguru import logger
@on().handle()
async def _(event: Event):
if is_admin(event):
logger.info("管理员发送了消息")
说明
is_admin 是 Bot 内部用于权限控制的核心函数根据事件来源QQ、Discord、控制台及插件配置判断触发事件的用户或群组是否具有管理员权限。
支持的适配器与判定逻辑:
• OneBot V11QQ
- 若用户 ID 在配置项 admin_qq_account 中,则视为管理员
- 若为群聊消息,且群 ID 在配置项 admin_qq_group 中,则视为管理员
• Discord
- 若频道 ID 在配置项 admin_discord_channel 中,则视为管理员
- 若用户 ID 在配置项 admin_discord_account 中,则视为管理员
• Console控制台
- 所有控制台输入均默认视为管理员操作,自动返回 True
配置项(位于插件配置中)
ADMIN_QQ_GROUP: list[int]
允许的管理员 QQ 群 ID 列表
ADMIN_QQ_ACCOUNT: list[int]
允许的管理员 QQ 账号 ID 列表
ADMIN_DISCORD_CHANNEL: list[int]
允许的管理员 Discord 频道 ID 列表
ADMIN_DISCORD_ACCOUNT: list[int]
允许的管理员 Discord 用户 ID 列表
注意事项
- 若未在配置文件中设置任何管理员 ID该函数对所有非控制台事件返回 False
- 控制台事件始终拥有管理员权限,便于本地调试与运维

View File

1
konabot/docs/sys/out.txt Normal file
View File

@ -0,0 +1 @@
MAN what can I say!

20
konabot/docs/user/man.txt Normal file
View File

@ -0,0 +1,20 @@
指令介绍
man - 用于展示此方 BOT 使用手册的指令
格式
man 文档类型
man [文档类型] <指令>
示例
`man` 查看所有有文档的指令清单
`man 3` 列举所有可读文档的库函数清单
`man 喵` 查看指令「喵」的使用说明
`man 8 out` 查看管理员指令「out」的使用说明
文档类型
文档类型用来区分同一指令在不同场景下的情景。你可以使用数字编号进行筛选。分为这些种类:
- 1 用户态指令,用于日常使用的指令
- 3 库函数指令,用于 Bot 开发用的函数查询
- 7 概念指令,用于概念解释
- 8 系统指令,仅管理员可用

View File

@ -0,0 +1,21 @@
指令介绍
openssl - 用于生成指定长度的加密安全随机数据
格式
openssl rand <模式> <字节数>
示例
`openssl rand -hex 16` 生成 16 字节的十六进制随机数
`openssl rand -base64 32` 生成 32 字节并以 Base64 编码输出的随机数据
说明
该指令使用 Python 的 secrets 模块生成加密安全的随机字节,并支持以十六进制(-hex或 Base64-base64格式输出。
参数说明
模式mode
- -hex :以十六进制字符串形式输出随机数据
- -base64 :以 Base64 编码字符串形式输出随机数据
字节数num
- 必须为正整数
- 最大支持 256 字节

View File

@ -0,0 +1,41 @@
指令介绍
ytpgif - 生成来回镜像翻转的仿 YTPMV 动图
格式
ytpgif [倍速]
示例
`ytpgif`
使用默认倍速1.0)处理你发送或回复的图片,生成镜像动图。
`ytpgif 2.5`
以 2.5 倍速处理图片,生成更快节奏的镜像动图。
回复一张图片并发送 `ytpgif 0.5`
以慢速0.5 倍)生成镜像动图。
参数说明
倍速(可选)
- 类型:浮点数
- 默认值1.0
- 有效范围0.1 20.0
- 作用:
• 对于静态图:控制镜像切换的快慢(值越大,切换越快)。
• 对于动图:控制截取原始动图正向和反向片段的时长(值越大,截取的片段越长)。
使用方式
发送指令前,请确保:
- 直接在消息中附带一张图片,或
- 回复一条包含图片的消息后再发送指令。
插件会自动:
- 下载并识别图片(支持静态图和 GIF 动图)
- 自动缩放至最大边长不超过 256 像素(保持宽高比)
- 静态图 → 生成“原图↔镜像”循环动图
- 动图 → 截取开头一段正向播放 + 同一段镜像翻转播放,拼接成新动图
- 保留透明通道(如原图含透明),否则转为 RGB 避免颜色异常
注意事项
- 图片过大、格式损坏或网络问题可能导致处理失败。
- 动图帧数过多或单帧过短可能无法生成有效输出。
- 输出 GIF 最大单段帧数限制为 500 帧,以防资源耗尽。

View File

@ -0,0 +1,2 @@
指令介绍
喵 - 你发喵,此方就会回复喵

View File

@ -0,0 +1,7 @@
指令介绍
摇数字 - 生成一个随机数字并发送
示例
`摇数字` 随机生成一个 1-6 的数字
该指令不接受任何参数,直接调用即可。

View File

@ -0,0 +1,22 @@
指令介绍
摇骰子 - 用于生成随机数并以骰子图像形式展示的指令
格式
摇骰子 [最小值] [最大值]
示例
`摇骰子` 随机生成一个 1-6 的数字,并显示对应的骰子图像
`摇骰子 10` 生成 1 到 10 之间的随机整数
`摇骰子 0.5` 生成 0 到 0.5 之间的随机小数
`摇骰子 -5 5` 生成 -5 到 5 之间的随机数
说明
该指令支持以下几种调用方式:
- 不带参数:使用默认范围生成随机数
- 仅指定一个参数 f1
- 若 f1 > 1则生成 [1, f1] 范围内的随机数
- 若 0 < f1 ≤ 1则生成 [0, f1] 范围内的随机数
- 若 f1 ≤ 0则生成 [f1, 0] 范围内的随机数
- 指定两个参数 f1 和 f2生成 [f1, f2] 范围内的随机数(顺序无关,内部会自动处理大小)
返回结果将以骰子样式的图像形式展示生成的随机数值。

View File

@ -0,0 +1,13 @@
指令介绍
雷达回波 - 用于获取指定地区的天气雷达回波图像
格式
雷达回波 <地区>
示例
`雷达回波 华南` 获取华南地区的天气雷达回波图
`雷达回波 全国` 获取全国的天气雷达回波图
说明
该指令通过查询中国气象局 https://www.nmc.cn/publish/radar/chinaall.html ,获取指定地区的实时天气雷达回波图像。
支持的地区有:全国 华北 东北 华东 华中 华南 西南 西北。

View File

@ -0,0 +1,104 @@
from curses.ascii import isdigit
from pathlib import Path
import nonebot
import nonebot.adapters
import nonebot.adapters.discord
import nonebot.rule
from nonebot import on_command
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from konabot.common.nb.is_admin import is_admin
from konabot.common.path import DOCS_PATH_MAN1, DOCS_PATH_MAN3, DOCS_PATH_MAN7, DOCS_PATH_MAN8
def search_man(section: int) -> dict[tuple[int, str], Path]:
base_path = {
1: DOCS_PATH_MAN1,
3: DOCS_PATH_MAN3,
7: DOCS_PATH_MAN7,
8: DOCS_PATH_MAN8,
}.get(section, DOCS_PATH_MAN1)
res: dict[tuple[int, str], Path] = {}
for fp in base_path.iterdir():
if fp.suffix != '.txt':
continue
name = fp.name.lower().removesuffix('.txt')
res[(section, name)] = fp
return res
man = on_alconna(Alconna(
'man',
Args['section', int | None],
Args['doc', str | None],
))
@man.handle()
async def _(
section: int | None,
doc: str | None,
event: nonebot.adapters.Event,
):
if doc is not None and section is None and all(isdigit(c) for c in doc):
section = int(doc)
doc = None
if section is not None and section not in {1, 3, 7, 8}:
await man.send(
UniMessage().text(f"你所指定的文档类型 {section} 不在可用范围内")
)
return
if doc is None:
# 检索模式
if section is None:
section_set = {1}
else:
section_set = {section}
if 1 in section_set and is_admin(event):
section_set.add(8)
mans: list[str] = []
for section in section_set:
mans += [f"{n}({s})" for s, n in search_man(section).keys()]
mans.sort()
await man.send(UniMessage().text(
(
"★此方 BOT 使用帮助★\n"
"使用 man <指令名> 查询某个指令的名字\n\n"
"可供查询的指令清单:"
)
+ ", ".join(mans)
+ "\n\n例如,使用 man man 来查询 man 指令的使用方法"
))
else:
# 查阅模式
if section is None:
section_set = {1}
else:
section_set = {section}
if 1 in section_set and is_admin(event):
section_set.add(8)
if 8 in section_set and not is_admin(event):
await man.send(UniMessage().text("你没有查看该指令类型的权限"))
return
mans_dict: dict[tuple[int, str], Path] = {}
for section in section_set:
mans_dict: dict[tuple[int, str], Path] = {**mans_dict, **search_man(section)}
mans_dict_2 = {key[1]: val for key, val in mans_dict.items()}
mans_fp = mans_dict_2.get(doc.lower())
if mans_fp is None:
await man.send(UniMessage().text("你所检索的指令不存在"))
return
mans_msg = mans_fp.read_text('utf-8', 'replace')
if isinstance(event, nonebot.adapters.discord.event.MessageEvent):
mans_msg = f'```\n{mans_msg}\n```'
await man.send(UniMessage().text(mans_msg))
help_deprecated = on_command('help', rule=nonebot.rule.to_me())
@help_deprecated.handle()
async def _():
await help_deprecated.send('你可以使用 man 指令来查询此方 BOT 的帮助')

View File

@ -1,10 +1,19 @@
from io import BytesIO
from typing import Iterable, cast
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
on_alconna)
from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, Text,
UniMessage, UniMsg, on_alconna)
from konabot.plugins.memepack.drawing.geimao import draw_geimao
from konabot.plugins.memepack.drawing.pt import draw_pt
from konabot.common.nb.extract_image import extract_image_from_message
from konabot.plugins.memepack.drawing.display import draw_cao_display
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten,
draw_geimao, draw_mnk,
draw_pt, draw_suan)
from nonebot.adapters import Bot, Event
from returns.result import Success, Failure
geimao = on_alconna(Alconna(
"给猫说",
@ -36,3 +45,97 @@ async def _(saying: list[str]):
img.save(img_bytes, format="PNG")
await pt.send(await UniMessage().image(raw=img_bytes).export())
mnk = on_alconna(Alconna(
"re:小?黑白子?说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写黑白子说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"})
@mnk.handle()
async def _(saying: list[str]):
img = await draw_mnk("\n".join(saying))
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await mnk.send(await UniMessage().image(raw=img_bytes).export())
suan = on_alconna(Alconna(
"小蒜说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写小蒜说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
@suan.handle()
async def _(saying: list[str]):
img = await draw_suan("\n".join(saying))
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await suan.send(await UniMessage().image(raw=img_bytes).export())
dsuan = on_alconna(Alconna(
"大蒜说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写大蒜说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
@dsuan.handle()
async def _(saying: list[str]):
img = await draw_suan("\n".join(saying), True)
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await dsuan.send(await UniMessage().image(raw=img_bytes).export())
cutecat = on_alconna(Alconna(
"乖猫说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写十猫说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"十猫说"})
@cutecat.handle()
async def _(saying: list[str]):
img = await draw_cute_ten("\n".join(saying))
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await cutecat.send(await UniMessage().image(raw=img_bytes).export())
cao_display_cmd = on_message()
@cao_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False
for text in cast(Iterable[Text], msg.get(Text)):
if text.text.strip() == "小槽展示":
flag = True
elif text.text.strip() == '':
continue
else:
return
if not flag:
return
match await extract_image_from_message(evt.get_message(), evt, bot):
case Success(img):
img_handled = await draw_cao_display(img)
img_bytes = BytesIO()
img_handled.save(img_bytes, format="PNG")
await cao_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
case Failure(err):
await cao_display_cmd.send(
await UniMessage()
.at(user_id=evt.get_user_id())
.text(' ')
.text(err)
.export()
)

View File

@ -10,3 +10,4 @@ FontDB.SetDefaultEmojiOptions(EmojiOptions(
HARMONYOS_SANS_SC_BLACK = FontDB.Query("HarmonyOS_Sans_SC_Black")
HARMONYOS_SANS_SC_REGULAR = FontDB.Query("HarmonyOS_Sans_SC_Regular")
LXGWWENKAI_REGULAR = FontDB.Query("LXGWWenKai-Regular")

View File

@ -0,0 +1,45 @@
import asyncio
from typing import Any, cast
import cv2
import numpy as np
import PIL.Image
from konabot.common.path import ASSETS_PATH
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
CAO_QUAD_POINTS = np.float32(cast(Any, [
[392, 540],
[577, 557],
[567, 707],
[381, 687],
]))
def _draw_cao_display(image: PIL.Image.Image):
src = np.array(image.convert("RGB"))
h, w = src.shape[:2]
src_points = np.float32(cast(Any, [
[0, 0],
[w, 0],
[w, h],
[0, h]
]))
dst_points = CAO_QUAD_POINTS
M = cv2.getPerspectiveTransform(cast(Any, src_points), cast(Any, dst_points))
output_size = cao_image.size
output_w, output_h = output_size
warped = cv2.warpPerspective(
src,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
result = PIL.Image.fromarray(warped, 'RGB').convert('RGBA')
result = PIL.Image.alpha_composite(result, cao_image)
return result
async def draw_cao_display(image: PIL.Image.Image):
return await asyncio.to_thread(_draw_cao_display, image)

View File

@ -1,30 +0,0 @@
import asyncio
from typing import Any, cast
import imagetext_py
import PIL.Image
from konabot.common.path import ASSETS_PATH
from .base.fonts import HARMONYOS_SANS_SC_BLACK
geimao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "geimao.jpg").convert("RGBA")
def _draw_geimao(saying: str):
img = geimao_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 960, 50, 00.5, 0, 1920, 240, HARMONYOS_SANS_SC_BLACK,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
0.8,
imagetext_py.TextAlign.Center,
cast(Any, 30.0),
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
draw_emojis=True,
)
return img
async def draw_geimao(saying: str):
return await asyncio.to_thread(_draw_geimao, saying)

View File

@ -1,27 +0,0 @@
import asyncio
import imagetext_py
import PIL.Image
from konabot.common.path import ASSETS_PATH
from .base.fonts import HARMONYOS_SANS_SC_REGULAR
pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("RGBA")
def _draw_pt(saying: str):
img = pt_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 259, 278, 0.5, 0.5, 360, 48, HARMONYOS_SANS_SC_REGULAR,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
1.0,
imagetext_py.TextAlign.Center,
draw_emojis=True,
)
return img
async def draw_pt(saying: str):
return await asyncio.to_thread(_draw_pt, saying)

View File

@ -0,0 +1,108 @@
import asyncio
from typing import Any, cast
import imagetext_py
import PIL.Image
from konabot.common.path import ASSETS_PATH
from .base.fonts import HARMONYOS_SANS_SC_BLACK, HARMONYOS_SANS_SC_REGULAR, LXGWWENKAI_REGULAR
geimao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "geimao.jpg").convert("RGBA")
pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("RGBA")
mnk_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "mnksay.jpg").convert("RGBA")
dasuan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "dss.png").convert("RGBA")
suan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "suanleba.png").convert("RGBA")
cute_ten_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "tententen.png").convert("RGBA")
def _draw_geimao(saying: str):
img = geimao_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 960, 50, 0.5, 0, 1920, 240, HARMONYOS_SANS_SC_BLACK,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
0.8,
imagetext_py.TextAlign.Center,
cast(Any, 30.0),
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
draw_emojis=True,
)
return img
async def draw_geimao(saying: str):
return await asyncio.to_thread(_draw_geimao, saying)
def _draw_pt(saying: str):
img = pt_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 259, 278, 0.5, 0.5, 360, 48, HARMONYOS_SANS_SC_REGULAR,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
1.0,
imagetext_py.TextAlign.Center,
draw_emojis=True,
)
return img
async def draw_pt(saying: str):
return await asyncio.to_thread(_draw_pt, saying)
def _draw_mnk(saying: str):
img = mnk_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 540, 25, 0.5, 0, 1080, 120, HARMONYOS_SANS_SC_BLACK,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
0.8,
imagetext_py.TextAlign.Center,
cast(Any, 15.0),
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
draw_emojis=True,
)
return img
async def draw_mnk(saying: str):
return await asyncio.to_thread(_draw_mnk, saying)
def _draw_suan(saying: str, dasuan: bool = False):
if dasuan:
img = dasuan_image.copy()
else:
img = suan_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 1020, 290, 0.5, 0.5, 400, 48, LXGWWENKAI_REGULAR,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
1.0,
imagetext_py.TextAlign.Center,
draw_emojis=True,
)
return img
async def draw_suan(saying: str, dasuan: bool = False):
return await asyncio.to_thread(_draw_suan, saying, dasuan)
def _draw_cute_ten(saying: str):
img = cute_ten_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 390, 479, 0.5, 0.5, 760, 96, LXGWWENKAI_REGULAR,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
1.0,
imagetext_py.TextAlign.Center,
draw_emojis=True,
)
return img
async def draw_cute_ten(saying: str):
return await asyncio.to_thread(_draw_cute_ten, saying)

View File

@ -332,9 +332,10 @@ async def generate_dice_image(number: str) -> BytesIO:
up_direction = (51 - 16, 5 - 30) # 右上角点 - 左上角点
move_distance = (up_direction[0] * (stretch_k - 1), up_direction[1] * (stretch_k - 1))
# 加载背景图像,保留透明通道
background = cv2.imread(ASSETS_PATH / "img" / "dice" / "template.png", cv2.IMREAD_UNCHANGED)
background = cv2.imread(str(ASSETS_PATH / "img" / "dice" / "template.png"), cv2.IMREAD_UNCHANGED)
assert background is not None
height, width = background.shape[:2]
@ -352,7 +353,7 @@ async def generate_dice_image(number: str) -> BytesIO:
], dtype=np.float32)
corners[:, 0] += offset_x
corners[:, 1] += offset_y
# 对文本图像进行3D变换保持透明通道
transformed_text, transform_matrix = perspective_transform(text_image, background, corners)

View File

@ -45,7 +45,7 @@ class Notify(BaseModel):
class NotifyConfigFile(BaseModel):
version: int = 1
version: int = 2
notifies: list[Notify] = []
unsent: list[Notify] = []
@ -89,13 +89,17 @@ async def notify_now(notify: Notify):
if notify.target_env is None:
await bot.send_private_msg(
user_id=int(notify.target),
message=f"代办通知:{notify.notify_msg}",
message=cast(Any, await UniMessage.text(f"代办通知:{notify.notify_msg}").export(
bot=bot,
)),
)
else:
await bot.send_group_msg(
group_id=int(notify.target_env),
message=cast(Any,
await UniMessage().at(notify.target).text(f" 代办通知:{notify.notify_msg}").export()
await UniMessage().at(
notify.target
).text(f" 代办通知:{notify.notify_msg}").export(bot=bot)
),
)
else:
@ -197,11 +201,15 @@ async def _():
NOTIFIED_FLAG["task_added"] = True
await asyncio.sleep(10)
await DATA_FILE_LOCK.acquire()
tasks = []
cfg = load_notify_config()
for notify in cfg.notifies:
tasks.append(create_notify_task(notify, fail2remove=False))
if cfg.version == 1:
cfg.version = 2
else:
for notify in cfg.notifies:
tasks.append(create_notify_task(notify, fail2remove=False))
DATA_FILE_LOCK.release()
await asyncio.gather(*tasks)

View File

@ -15,14 +15,14 @@ from nonebot_plugin_alconna import (
__plugin_meta__ = PluginMetadata(
name="ytpgif",
description="生成来回镜像翻转的动图:动图按时间分段播放,静态图高频翻转(带参数范围保护)",
usage="/ytpgif [倍速=1.0] 倍速范围0.120.0",
description="生成来回镜像翻转的仿 YTPMV 动图。",
usage="ytpgif [倍速=1.0] 倍速范围0.120.0",
type="application",
config=None,
homepage=None,
)
# 参数定义(带硬性限制)
# 参数定义
BASE_SEGMENT_DURATION = 0.25
BASE_INTERVAL = 0.25
MAX_SIZE = 256
@ -54,12 +54,15 @@ ytpgif_cmd = on_alconna(
async def get_image_url(event: BaseEvent) -> Optional[str]:
"""从事件中提取图片 URL支持直接消息和回复"""
msg = event.get_message()
for seg in msg:
if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"])
if hasattr(event, "reply") and (reply := event.reply):
for seg in reply.message:
reply_msg = reply.message
for seg in reply_msg:
if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"])
return None
@ -74,9 +77,11 @@ async def download_image(url: str) -> bytes:
def resize_frame(frame: Image.Image) -> Image.Image:
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
w, h = frame.size
if w <= MAX_SIZE and h <= MAX_SIZE:
return frame
scale = MAX_SIZE / max(w, h)
new_w = int(w * scale)
new_h = int(h * scale)
@ -96,14 +101,15 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
if not img_url:
await ytpgif_cmd.send(
await UniMessage.text(
"请发送一张图片并使用 /ytpgif或回复一张图片来生成镜像动图。"
"请发送一张图片或回复一张图片来生成镜像动图。"
).export()
)
return
try:
image_data = await download_image(img_url)
except Exception:
except Exception as e:
print(f"[YTPGIF] 下载失败: {e}")
await ytpgif_cmd.send(
await UniMessage.text("❌ 图片下载失败,请重试。").export()
)
@ -119,22 +125,47 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
output_path = tmp_out.name
with Image.open(input_path) as src_img:
is_animated = getattr(src_img, "is_animated", False) or src_img.n_frames > 1
# === 判断是否为动图 ===
try:
n_frames = getattr(src_img, "n_frames", 1)
is_animated = n_frames > 1
except Exception:
is_animated = False
output_frames = []
output_durations_ms = []
if is_animated:
# === 动图模式:播放两段,每段最多 BASE_SEGMENT_DURATION * speed 秒,且帧数 ≤ 100 ===
# === 动图模式:截取正向 + 镜像两段 ===
frames_with_duration = []
for frame in ImageSequence.Iterator(src_img):
rgb_frame = frame.convert("RGB")
resized_frame = resize_frame(rgb_frame)
palette = src_img.getpalette()
for idx in range(n_frames):
src_img.seek(idx)
frame = src_img.copy()
# 检查是否需要透明通道
has_alpha = (
frame.mode in ("RGBA", "LA")
or (frame.mode == "P" and "transparency" in frame.info)
)
if has_alpha:
frame = frame.convert("RGBA")
else:
frame = frame.convert("RGB")
resized_frame = resize_frame(frame)
# 若原图有调色板,尝试保留(可选)
if palette and resized_frame.mode == "P":
try:
resized_frame.putpalette(palette)
except Exception: # noqa
pass
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
dur_sec = max(0.01, ms / 1000.0) # 至少 10ms
dur_sec = max(0.01, ms / 1000.0)
frames_with_duration.append((resized_frame, dur_sec))
max_dur = BASE_SEGMENT_DURATION * speed # 每段最大播放时间
max_dur = BASE_SEGMENT_DURATION * speed
accumulated = 0.0
frame_count = 0
@ -167,10 +198,10 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
else:
# === 静态图模式:制作翻转动画 ===
raw_frame = src_img.convert("RGB")
raw_frame = src_img.convert("RGBA")
resized_frame = resize_frame(raw_frame)
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed)) # 限制在 25ms ~ 2.5s
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
duration_ms = int(interval_sec * 1000)
frame1 = resized_frame
@ -185,16 +216,38 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
)
return
# 保存 GIF
output_frames[0].save(
output_path,
save_all=True,
append_images=output_frames[1:],
format="GIF",
loop=0,
duration=output_durations_ms,
disposal=2,
)
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
need_transparency = False
for frame in output_frames:
if frame.mode == "RGBA":
alpha_channel = frame.getchannel("A")
if any(pix < 255 for pix in alpha_channel.getdata()):
need_transparency = True
break
elif frame.mode == "P" and "transparency" in frame.info:
need_transparency = True
break
# 如果不需要透明,则统一转为 RGB 避免调色板污染
if not need_transparency:
output_frames = [f.convert("RGB") for f in output_frames]
# 构建保存参数
save_kwargs = {
"save_all": True,
"append_images": output_frames[1:],
"format": "GIF",
"loop": 0, # 无限循环
"duration": output_durations_ms,
"disposal": 2, # 清除到背景色,避免残留
"optimize": False, # 关闭抖动(等效 -dither none
}
# 只有真正需要透明时才启用 transparency
if need_transparency:
save_kwargs["transparency"] = 0
output_frames[0].save(output_path, **save_kwargs)
# 发送结果
with open(output_path, "rb") as f:
@ -204,9 +257,12 @@ async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
except Exception as e:
print(f"[YTPGIF] 处理失败: {e}")
await ytpgif_cmd.send(
await UniMessage.text("❌ 处理失败,可能是图片格式不支持文件过大。").export()
await UniMessage.text("❌ 处理失败,可能是图片格式不支持文件损坏或过大。").export()
)
finally:
for path in filter(None, [input_path, output_path]):
if os.path.exists(path):
os.unlink(path)
try:
os.unlink(path)
except: # noqa
pass

21
poetry.lock generated
View File

@ -2460,6 +2460,25 @@ urllib3 = ">=1.21.1,<3"
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "returns"
version = "0.26.0"
description = "Make your functions return something meaningful, typed, and safe!"
optional = false
python-versions = "<4.0,>=3.10"
groups = ["main"]
files = [
{file = "returns-0.26.0-py3-none-any.whl", hash = "sha256:7cae94c730d6c56ffd9d0f583f7a2c0b32cfe17d141837150c8e6cff3eb30d71"},
{file = "returns-0.26.0.tar.gz", hash = "sha256:180320e0f6e9ea9845330ccfc020f542330f05b7250941d9b9b7c00203fcc3da"},
]
[package.dependencies]
typing-extensions = ">=4.0,<5.0"
[package.extras]
check-laws = ["hypothesis (>=6.136,<7.0)", "pytest (>=8.0,<9.0)"]
compatible-mypy = ["mypy (>=1.12,<1.18)"]
[[package]]
name = "rich"
version = "14.1.0"
@ -3162,4 +3181,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.12,<4.0"
content-hash = "673703a789248d0f7369999c364352eb12f8bb5830a8b4b6918f8bab6425a763"
content-hash = "927913b9030d1f6c126bb2d12eab7307dc6297f259c7c62e3033706457d27ce0"

View File

@ -21,6 +21,7 @@ dependencies = [
"pillow (>=11.3.0,<12.0.0)",
"imagetext-py (>=2.2.0,<3.0.0)",
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
"returns (>=0.26.0,<0.27.0)",
]

File diff suppressed because it is too large Load Diff