Compare commits

...

7 Commits

Author SHA1 Message Date
265415e727 Merge pull request 'feat: ytpgif' (#16) from tnot/konabot:feat--ytpgif into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #16
2025-09-30 22:38:15 +08:00
06555b2225 feat: ytpgif 2025-09-30 22:24:16 +08:00
f6fd25a41d fix: 添加 Notify 任务创建限制
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 19:08:30 +08:00
9f6c70bf0f 合并后的部分骰子路径修复
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 15:21:53 +08:00
1c01e49d5d Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot 2025-09-30 15:18:16 +08:00
48c719bc33 全新骰子 2025-09-30 15:13:59 +08:00
6bc9f94e83 拷贝环境变量
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 02:45:01 +08:00
7 changed files with 462 additions and 16 deletions

View File

@ -4,7 +4,7 @@ WORKDIR /app
COPY requirements.txt ./ COPY requirements.txt ./
RUN pip install -r requirements.txt --no-deps RUN pip install -r requirements.txt --no-deps
COPY bot.py pyproject.toml ./ COPY bot.py pyproject.toml .env.prod .env.test ./
COPY assets ./assets COPY assets ./assets
COPY scripts ./scripts COPY scripts ./scripts
COPY konabot ./konabot COPY konabot ./konabot

BIN
assets/img/dice/stick.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 80 KiB

View File

@ -1,11 +1,11 @@
from typing import Optional from typing import Optional, Union
from nonebot.adapters import Event as BaseEvent from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from konabot.plugins.roll_dice.roll_dice import generate_dice_image from konabot.plugins.roll_dice.roll_dice import generate_dice_image
from konabot.plugins.roll_dice.roll_number import get_random_number, roll_number from konabot.plugins.roll_dice.roll_number import get_random_number, get_random_number_string, roll_number
evt = on_alconna(Alconna( evt = on_alconna(Alconna(
"摇数字" "摇数字"
@ -22,21 +22,26 @@ async def _(event: BaseEvent):
evt = on_alconna(Alconna( evt = on_alconna(Alconna(
"摇骰子", "摇骰子",
Args["f1?", int]["f2?", int] Args["f1?", str]["f2?", str]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True) ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle() @evt.handle()
async def _(event: BaseEvent, f1: Optional[int] = None, f2: Optional[int] = None): async def _(event: BaseEvent, f1: Optional[str] = None, f2: Optional[str] = None):
# if isinstance(event, DiscordMessageEvent): # if isinstance(event, DiscordMessageEvent):
# await evt.send(await UniMessage().text("```\n" + roll_dice() + "\n```").export()) # await evt.send(await UniMessage().text("```\n" + roll_dice() + "\n```").export())
# elif isinstance(event, ConsoleMessageEvent): # elif isinstance(event, ConsoleMessageEvent):
number = 0 number = ""
if(f1 is not None and f2 is not None): if(f1 is not None and f2 is not None):
number = get_random_number(f1, f2) number = get_random_number_string(f1, f2)
elif f1 is not None: elif f1 is not None:
number = get_random_number(1, f1) if(float(f1) > 1):
number = get_random_number_string("1", f1)
elif (float(f1) > 0):
number = get_random_number_string("0", f1)
else:
number = get_random_number_string(f1, "0")
else: else:
number = get_random_number() number = get_random_number_string()
await evt.send(await UniMessage().image(raw=await generate_dice_image(number)).export()) await evt.send(await UniMessage().image(raw=await generate_dice_image(number)).export())
# else: # else:
# await evt.send(await UniMessage().text(roll_dice(wide=True)).export()) # await evt.send(await UniMessage().text(roll_dice(wide=True)).export())

View File

@ -152,26 +152,206 @@ def precise_blend_with_perspective(background, foreground, corners):
return result return result
async def generate_dice_image(number: int) -> BytesIO: def draw_line_bresenham(image, x0, y0, x1, y1, color):
"""使用Bresenham算法画线避免间隙"""
dx = abs(x1 - x0)
dy = abs(y1 - y0)
sx = 1 if x0 < x1 else -1
sy = 1 if y0 < y1 else -1
err = dx - dy
while True:
if 0 <= x0 < image.shape[1] and 0 <= y0 < image.shape[0]:
image[y0, x0] = color
if x0 == x1 and y0 == y1:
break
e2 = 2 * err
if e2 > -dy:
err -= dy
x0 += sx
if e2 < dx:
err += dx
y0 += sy
def slice_and_stretch(image, slice_lines, direction):
'''
image: 图像
slice_lines: 切割线两个点的列表一般是倾斜45度的直线
direction: 移动方向向量(二元数组)
'''
# 获取图片的尺寸
height, width = image.shape[:2]
# 创建一个由移动方向扩充后,更大的图片
new_width = int(width + abs(direction[0]))
new_height = int(height + abs(direction[1]))
new_image = np.zeros((new_height, new_width, 4), dtype=image.dtype)
# 先把图片放在新图的和方向相反的一侧
offset_x = int(abs(min(0, direction[0])))
offset_y = int(abs(min(0, direction[1])))
new_image[offset_y:offset_y+height, offset_x:offset_x+width] = image
# 切割线也跟着偏移
slice_lines = [(x + offset_x, y + offset_y) for (x, y) in slice_lines]
# 复制切割线经过的像素,沿着方向移动,实现类似拖尾的效果
apply_trail_effect_vectorized(new_image, slice_lines, direction)
apply_stroke_vectorized(new_image, slice_lines, direction)
return new_image, offset_x, offset_y
def apply_trail_effect_vectorized(new_image, slice_lines, direction):
"""向量化实现拖尾效果"""
height, width = new_image.shape[:2]
# 创建坐标网格
y_coords, x_coords = np.mgrid[0:height, 0:width]
# 向量化计算点到直线的距离
line_vec = np.array([slice_lines[1][0] - slice_lines[0][0],
slice_lines[1][1] - slice_lines[0][1]])
point_vecs = np.stack([x_coords - slice_lines[0][0],
y_coords - slice_lines[0][1]], axis=-1)
# 计算叉积(有向距离)
cross_products = (line_vec[0] * point_vecs[:, :, 1] -
line_vec[1] * point_vecs[:, :, 0])
# 选择直线右侧的像素 (d1 > 0)
mask = cross_products > 0
# 计算目标位置
target_x = (x_coords + direction[0]).astype(int)
target_y = (y_coords + direction[1]).astype(int)
# 创建有效位置掩码
valid_mask = mask & (target_x >= 0) & (target_x < width) & \
(target_y >= 0) & (target_y < height)
# 批量复制像素
new_image[target_y[valid_mask], target_x[valid_mask]] = \
new_image[y_coords[valid_mask], x_coords[valid_mask]]
def apply_stroke_vectorized(new_image, slice_lines, direction):
"""使用向量化操作优化笔画效果"""
height, width = new_image.shape[:2]
# 1. 找到所有非透明像素
non_transparent = np.where(new_image[:, :, 3] > 0)
if len(non_transparent[0]) == 0:
return
y_coords, x_coords = non_transparent
# 2. 向量化计算点到直线的距离
line_vec = np.array([slice_lines[1][0] - slice_lines[0][0],
slice_lines[1][1] - slice_lines[0][1]])
point_vecs = np.column_stack([x_coords - slice_lines[0][0],
y_coords - slice_lines[0][1]])
# 计算叉积(距离)
cross_products = (line_vec[0] * point_vecs[:, 1] -
line_vec[1] * point_vecs[:, 0])
# 3. 选择靠近直线的像素
mask = np.abs(cross_products) < 1.0
selected_y = y_coords[mask]
selected_x = x_coords[mask]
selected_pixels = new_image[selected_y, selected_x]
if len(selected_x) == 0:
return
# 4. 预计算采样点
length = np.sqrt(direction[0]**2 + direction[1]**2)
if length == 0:
return
# 创建采样偏移
dx_dy = np.array([(dx, dy) for dx in [-0.5, 0, 0.5]
for dy in [-0.5, 0, 0.5]])
# 5. 批量计算目标位置
steps = max(1, int(length * 2))
alpha = 0.7
for k in range(1, steps + 1):
# 对所有选中的像素批量计算新位置
scale = k / steps
# 为每个像素和每个采样点计算目标位置
for dx, dy in dx_dy:
target_x = np.round(selected_x + dx + direction[0] * scale).astype(int)
target_y = np.round(selected_y + dy + direction[1] * scale).astype(int)
# 创建有效位置掩码
valid_mask = (target_x >= 0) & (target_x < width) & \
(target_y >= 0) & (target_y < height)
if np.any(valid_mask):
valid_target_x = target_x[valid_mask]
valid_target_y = target_y[valid_mask]
valid_source_idx = np.where(valid_mask)[0]
# 批量混合像素
source_pixels = selected_pixels[valid_source_idx]
target_pixels = new_image[valid_target_y, valid_target_x]
new_image[valid_target_y, valid_target_x] = (
alpha * source_pixels + (1 - alpha) * target_pixels
)
async def generate_dice_image(number: str) -> BytesIO:
# 将文本转换为带透明背景的图像 # 将文本转换为带透明背景的图像
text = str(number) text = number
# 如果文本太长,直接返回金箍棒
if(len(text) > 50):
output = BytesIO()
push_image = Image.open(ASSETS_PATH / "img" / "dice" / "stick.png")
push_image.save(output,format='PNG')
output.seek(0)
return output
text_image = text_to_transparent_image( text_image = text_to_transparent_image(
text, text,
font_size=60, font_size=60,
text_color=(0, 0, 0) # 黑色文字 text_color=(0, 0, 0) # 黑色文字
) )
# 获取长宽比
height, width = text_image.shape[:2]
aspect_ratio = width / height
# 根据长宽比设置拉伸系数
stretch_k = 1
if aspect_ratio > 1:
stretch_k = aspect_ratio
# 骰子的方向
up_direction = (51 - 16, 5 - 30) # 右上角点 - 左上角点
move_distance = (up_direction[0] * (stretch_k - 1), up_direction[1] * (stretch_k - 1))
# 加载背景图像,保留透明通道
background = cv2.imread(ASSETS_PATH / "img" / "dice" / "template.png", cv2.IMREAD_UNCHANGED)
height, width = background.shape[:2]
background, offset_x, offset_y = slice_and_stretch(background,
[(10,10),(0,0)],
move_distance)
# 定义3D变换的四个角点透视效果 # 定义3D变换的四个角点透视效果
# 顺序: [左上, 右上, 右下, 左下] # 顺序: [左上, 右上, 右下, 左下]
corners = np.array([ corners = np.array([
[16, 30], # 左上 [16, 30], # 左上
[51, 5], # 右上(上移,创建透视) [51 + move_distance[0], 5 + move_distance[1]], # 右上(上移,创建透视)
[88, 33], # 右下 [88 + move_distance[0], 33 + move_distance[1]], # 右下
[49, 62] # 左下(下移) [49, 62] # 左下(下移)
], dtype=np.float32) ], dtype=np.float32)
corners[:, 0] += offset_x
# 加载背景图像,保留透明通道 corners[:, 1] += offset_y
background = cv2.imread(str(ASSETS_PATH / "img" / "dice" / "template.png"), cv2.IMREAD_UNCHANGED)
# 对文本图像进行3D变换保持透明通道 # 对文本图像进行3D变换保持透明通道
@ -186,6 +366,26 @@ async def generate_dice_image(number: int) -> BytesIO:
images: list[Image.Image] = [Image.open(ASSETS_PATH / "img" / "dice" / f"{i}.png") for i in range(1, 12)] images: list[Image.Image] = [Image.open(ASSETS_PATH / "img" / "dice" / f"{i}.png") for i in range(1, 12)]
images.append(pil_final) images.append(pil_final)
frame_durations = [100] * (len(images) - 1) + [100000] frame_durations = [100] * (len(images) - 1) + [100000]
# 将导入的图像尺寸扩展为和 pil_final 相同的大小,随帧数进行扩展,然后不放大的情况下放在最中间
if(aspect_ratio > 1):
target_size = pil_final.size
for i in range(len(images) - 1):
k = i / (len(images) - 1)
now_distance = (move_distance[0] * k, move_distance[1] * k)
img = np.array(images[i])
img, _, _ = slice_and_stretch(img,
[(10,10),(0,0)],
now_distance)
# 只扩展边界,图像本身不放大
img_width, img_height = img.shape[1], img.shape[0]
new_img = Image.new("RGBA", target_size, (0, 0, 0, 0))
this_offset_x = (target_size[0] - img_width) // 2
this_offset_y = (target_size[1] - img_height) // 2
# new_img.paste(img, (this_offset_x, this_offset_y))
new_img.paste(Image.fromarray(img), (this_offset_x, this_offset_y))
images[i] = new_img
# 保存为BytesIO对象 # 保存为BytesIO对象
output = BytesIO() output = BytesIO()
images[0].save(output, images[0].save(output,
@ -194,4 +394,6 @@ async def generate_dice_image(number: int) -> BytesIO:
duration=frame_durations, duration=frame_durations,
format='GIF', format='GIF',
loop=1) loop=1)
output.seek(0)
# pil_final.save(output, format='PNG')
return output return output

View File

@ -42,6 +42,24 @@ def get_random_number(min: int = 1, max: int = 6) -> int:
import random import random
return random.randint(min, max) return random.randint(min, max)
def get_random_number_string(min_value: str = "1", max_value: str = "6") -> str:
import random
# 先判断二者是不是整数
if (float(min_value).is_integer()
and float(max_value).is_integer()
and "." not in min_value
and "." not in max_value):
return str(random.randint(int(float(min_value)), int(float(max_value))))
# 根据传入小数的位数,决定保留几位小数
if "." in str(min_value) or "." in str(max_value):
decimal_places = max(len(str(min_value).split(".")[1]) if "." in str(min_value) else 0,
len(str(max_value).split(".")[1]) if "." in str(max_value) else 0)
return str(round(random.uniform(float(min_value), float(max_value)), decimal_places))
# 如果没有小数点,很可能二者都是指数表示或均为 inf直接返回随机小数
return str(random.uniform(float(min_value), float(max_value)))
def roll_number(wide: bool = False) -> str: def roll_number(wide: bool = False) -> str:
raw = number_arts[get_random_number()] raw = number_arts[get_random_number()]
if wide: if wide:

View File

@ -185,9 +185,18 @@ async def _(msg: UniMsg, mEvt: Event):
driver = nonebot.get_driver() driver = nonebot.get_driver()
NOTIFIED_FLAG = {
"task_added": False,
}
@driver.on_bot_connect @driver.on_bot_connect
async def _(): async def _():
if NOTIFIED_FLAG["task_added"]:
return
NOTIFIED_FLAG["task_added"] = True
await DATA_FILE_LOCK.acquire() await DATA_FILE_LOCK.acquire()
tasks = [] tasks = []
cfg = load_notify_config() cfg = load_notify_config()

View File

@ -0,0 +1,212 @@
import os
import tempfile
from typing import Optional
from PIL import Image, ImageSequence
from nonebot.adapters import Event as BaseEvent
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import (
Alconna,
Args,
Field,
UniMessage,
on_alconna,
)
__plugin_meta__ = PluginMetadata(
name="ytpgif",
description="生成来回镜像翻转的动图:动图按时间分段播放,静态图高频翻转(带参数范围保护)",
usage="/ytpgif [倍速=1.0] 倍速范围0.120.0",
type="application",
config=None,
homepage=None,
)
# 参数定义(带硬性限制)
BASE_SEGMENT_DURATION = 0.25
BASE_INTERVAL = 0.25
MAX_SIZE = 256
MIN_SPEED = 0.1
MAX_SPEED = 20.0
MAX_FRAMES_PER_SEGMENT = 500
# 提示语
SPEED_TIPS = f"倍速必须是 {MIN_SPEED}{MAX_SPEED} 之间的数字"
# 定义命令 + 参数校验
ytpgif_cmd = on_alconna(
Alconna(
"ytpgif",
Args[
"speed?",
float,
Field(
default=1.0,
unmatch_tips=lambda x: f"{x}”不是有效数值。{SPEED_TIPS}",
),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
)
async def get_image_url(event: BaseEvent) -> Optional[str]:
msg = event.get_message()
for seg in msg:
if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"])
if hasattr(event, "reply") and (reply := event.reply):
for seg in reply.message:
if seg.type == "image" and seg.data.get("url"):
return str(seg.data["url"])
return None
async def download_image(url: str) -> bytes:
import httpx
async with httpx.AsyncClient() as client:
resp = await client.get(url, timeout=10)
resp.raise_for_status()
return resp.content
def resize_frame(frame: Image.Image) -> Image.Image:
w, h = frame.size
if w <= MAX_SIZE and h <= MAX_SIZE:
return frame
scale = MAX_SIZE / max(w, h)
new_w = int(w * scale)
new_h = int(h * scale)
return frame.resize((new_w, new_h), Image.Resampling.LANCZOS)
@ytpgif_cmd.handle()
async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
# === 校验 speed 范围 ===
if not (MIN_SPEED <= speed <= MAX_SPEED):
await ytpgif_cmd.send(
await UniMessage.text(f"{SPEED_TIPS}").export()
)
return
img_url = await get_image_url(event)
if not img_url:
await ytpgif_cmd.send(
await UniMessage.text(
"请发送一张图片并使用 /ytpgif或回复一张图片来生成镜像动图。"
).export()
)
return
try:
image_data = await download_image(img_url)
except Exception:
await ytpgif_cmd.send(
await UniMessage.text("❌ 图片下载失败,请重试。").export()
)
return
input_path = output_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_in:
tmp_in.write(image_data)
input_path = tmp_in.name
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_out:
output_path = tmp_out.name
with Image.open(input_path) as src_img:
is_animated = getattr(src_img, "is_animated", False) or src_img.n_frames > 1
output_frames = []
output_durations_ms = []
if is_animated:
# === 动图模式:播放两段,每段最多 BASE_SEGMENT_DURATION * speed 秒,且帧数 ≤ 100 ===
frames_with_duration = []
for frame in ImageSequence.Iterator(src_img):
rgb_frame = frame.convert("RGB")
resized_frame = resize_frame(rgb_frame)
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
dur_sec = max(0.01, ms / 1000.0) # 至少 10ms
frames_with_duration.append((resized_frame, dur_sec))
max_dur = BASE_SEGMENT_DURATION * speed # 每段最大播放时间
accumulated = 0.0
frame_count = 0
# 正向段
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
output_frames.append(img)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
if frame_count == 0:
await ytpgif_cmd.send(
await UniMessage.text("动图帧太短,无法生成有效片段。").export()
)
return
# 镜像段(从头开始)
accumulated = 0.0
frame_count = 0
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
flipped = img.transpose(Image.FLIP_LEFT_RIGHT)
output_frames.append(flipped)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
else:
# === 静态图模式:制作翻转动画 ===
raw_frame = src_img.convert("RGB")
resized_frame = resize_frame(raw_frame)
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed)) # 限制在 25ms ~ 2.5s
duration_ms = int(interval_sec * 1000)
frame1 = resized_frame
frame2 = resized_frame.transpose(Image.FLIP_LEFT_RIGHT)
output_frames = [frame1, frame2]
output_durations_ms = [duration_ms, duration_ms]
if len(output_frames) < 1:
await ytpgif_cmd.send(
await UniMessage.text("未能生成任何帧。").export()
)
return
# 保存 GIF
output_frames[0].save(
output_path,
save_all=True,
append_images=output_frames[1:],
format="GIF",
loop=0,
duration=output_durations_ms,
disposal=2,
)
# 发送结果
with open(output_path, "rb") as f:
result_image = UniMessage.image(raw=f.read())
await ytpgif_cmd.send(await result_image.export())
except Exception as e:
print(f"[YTPGIF] 处理失败: {e}")
await ytpgif_cmd.send(
await UniMessage.text("❌ 处理失败,可能是图片格式不支持或文件过大。").export()
)
finally:
for path in filter(None, [input_path, output_path]):
if os.path.exists(path):
os.unlink(path)