Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 91687fb8c3 | |||
| f889381cce | |||
| 1256055c9d | |||
| 40f35a474e | |||
| 6b01acfa8c | |||
| 09c9d44798 | |||
| 0c4206f461 | |||
| 9fb8fd90dc | |||
| 8c4fa2b5e4 | |||
| fb2c3f1ce2 | |||
| 265415e727 | |||
| 06555b2225 | |||
| f6fd25a41d | |||
| 9f6c70bf0f | |||
| 1c01e49d5d | |||
| 48c719bc33 | |||
| 6bc9f94e83 |
@ -4,7 +4,7 @@ WORKDIR /app
|
|||||||
COPY requirements.txt ./
|
COPY requirements.txt ./
|
||||||
RUN pip install -r requirements.txt --no-deps
|
RUN pip install -r requirements.txt --no-deps
|
||||||
|
|
||||||
COPY bot.py pyproject.toml ./
|
COPY bot.py pyproject.toml .env.prod .env.test ./
|
||||||
COPY assets ./assets
|
COPY assets ./assets
|
||||||
COPY scripts ./scripts
|
COPY scripts ./scripts
|
||||||
COPY konabot ./konabot
|
COPY konabot ./konabot
|
||||||
|
|||||||
BIN
assets/fonts/LXGWWenKai-Regular.ttf
Normal file
BIN
assets/fonts/LXGWWenKai-Regular.ttf
Normal file
Binary file not shown.
BIN
assets/img/dice/stick.png
Normal file
BIN
assets/img/dice/stick.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 80 KiB |
BIN
assets/img/meme/caoimg1.png
Normal file
BIN
assets/img/meme/caoimg1.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 227 KiB |
BIN
assets/img/meme/dss.png
Normal file
BIN
assets/img/meme/dss.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 172 KiB |
BIN
assets/img/meme/mnksay.jpg
Normal file
BIN
assets/img/meme/mnksay.jpg
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 69 KiB |
BIN
assets/img/meme/suanleba.png
Normal file
BIN
assets/img/meme/suanleba.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 364 KiB |
BIN
assets/img/meme/tententen.png
Normal file
BIN
assets/img/meme/tententen.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 614 KiB |
135
konabot/common/nb/extract_image.py
Normal file
135
konabot/common/nb/extract_image.py
Normal file
@ -0,0 +1,135 @@
|
|||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
import PIL.Image
|
||||||
|
from loguru import logger
|
||||||
|
from nonebot.adapters import Bot, Event, Message
|
||||||
|
from nonebot.adapters.discord import Bot as DiscordBot
|
||||||
|
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
|
||||||
|
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
|
||||||
|
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
|
||||||
|
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
|
||||||
|
from PIL import UnidentifiedImageError
|
||||||
|
from returns.result import Failure, Result, Success
|
||||||
|
|
||||||
|
|
||||||
|
async def download_image_bytes(url: str) -> Result[bytes, str]:
|
||||||
|
# if "/matcha/cache/" in url:
|
||||||
|
# url = url.replace('127.0.0.1', '10.126.126.101')
|
||||||
|
logger.debug(f"开始从 {url} 下载图片")
|
||||||
|
async with httpx.AsyncClient() as c:
|
||||||
|
try:
|
||||||
|
response = await c.get(url)
|
||||||
|
except (httpx.ConnectError, httpx.RemoteProtocolError) as e:
|
||||||
|
return Failure(f"HTTPX 模块下载图片时出错:{e}")
|
||||||
|
except httpx.ConnectTimeout:
|
||||||
|
return Failure("下载图片失败了,网络超时了qwq")
|
||||||
|
if response.status_code != 200:
|
||||||
|
return Failure("无法下载图片,可能存在网络问题需要排查")
|
||||||
|
return Success(response.content)
|
||||||
|
|
||||||
|
|
||||||
|
def bytes_to_pil(raw_data: bytes | BytesIO) -> Result[PIL.Image.Image, str]:
|
||||||
|
try:
|
||||||
|
if not isinstance(raw_data, BytesIO):
|
||||||
|
img_pil = PIL.Image.open(BytesIO(raw_data))
|
||||||
|
else:
|
||||||
|
img_pil = PIL.Image.open(raw_data)
|
||||||
|
img_pil.verify()
|
||||||
|
if not isinstance(raw_data, BytesIO):
|
||||||
|
img = PIL.Image.open(BytesIO(raw_data))
|
||||||
|
else:
|
||||||
|
raw_data.seek(0)
|
||||||
|
img = PIL.Image.open(raw_data)
|
||||||
|
return Success(img)
|
||||||
|
except UnidentifiedImageError:
|
||||||
|
return Failure("图像无法读取,可能是格式不支持orz")
|
||||||
|
except IOError:
|
||||||
|
return Failure("图像无法读取,可能是网络存在问题orz")
|
||||||
|
|
||||||
|
|
||||||
|
async def unimsg_img_to_pil(image: Image) -> Result[PIL.Image.Image, str]:
|
||||||
|
if image.url is not None:
|
||||||
|
raw_result = await download_image_bytes(image.url)
|
||||||
|
elif image.raw is not None:
|
||||||
|
raw_result = Success(image.raw)
|
||||||
|
else:
|
||||||
|
return Failure("由于一些内部问题,下载图片失败了orz")
|
||||||
|
|
||||||
|
return raw_result.bind(bytes_to_pil)
|
||||||
|
|
||||||
|
|
||||||
|
async def extract_image_from_qq_message(
|
||||||
|
msg: OnebotV11Message,
|
||||||
|
evt: OnebotV11MessageEvent,
|
||||||
|
bot: OnebotV11Bot,
|
||||||
|
allow_reply: bool = True,
|
||||||
|
) -> Result[PIL.Image.Image, str]:
|
||||||
|
if allow_reply and (reply := evt.reply) is not None:
|
||||||
|
return await extract_image_from_qq_message(
|
||||||
|
reply.message,
|
||||||
|
evt,
|
||||||
|
bot,
|
||||||
|
False,
|
||||||
|
)
|
||||||
|
for seg in msg:
|
||||||
|
if seg.type == "reply" and allow_reply:
|
||||||
|
msgid = seg.data.get("id")
|
||||||
|
if msgid is None:
|
||||||
|
return Failure("消息可能太久远,无法读取到消息原文")
|
||||||
|
try:
|
||||||
|
msg2 = await bot.get_msg(message_id=msgid)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"获取消息内容时出错:{e}")
|
||||||
|
return Failure("消息可能太久远,无法读取到消息原文")
|
||||||
|
msg2_data = msg2.get("message")
|
||||||
|
if msg2_data is None:
|
||||||
|
return Failure("消息可能太久远,无法读取到消息原文")
|
||||||
|
logger.debug("发现消息引用,递归一层")
|
||||||
|
return await extract_image_from_qq_message(
|
||||||
|
msg=OnebotV11Message(msg2_data),
|
||||||
|
evt=evt,
|
||||||
|
bot=bot,
|
||||||
|
allow_reply=False,
|
||||||
|
)
|
||||||
|
if seg.type == "image":
|
||||||
|
url = seg.data.get("url")
|
||||||
|
if url is None:
|
||||||
|
return Failure("无法下载图片,可能有一些网络问题")
|
||||||
|
data = await download_image_bytes(url)
|
||||||
|
return data.bind(bytes_to_pil)
|
||||||
|
|
||||||
|
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
|
||||||
|
|
||||||
|
|
||||||
|
async def extract_image_from_message(
|
||||||
|
msg: Message,
|
||||||
|
evt: Event,
|
||||||
|
bot: Bot,
|
||||||
|
allow_reply: bool = True,
|
||||||
|
) -> Result[PIL.Image.Image, str]:
|
||||||
|
if (
|
||||||
|
isinstance(bot, OnebotV11Bot)
|
||||||
|
and isinstance(msg, OnebotV11Message)
|
||||||
|
and isinstance(evt, OnebotV11MessageEvent)
|
||||||
|
):
|
||||||
|
# 看起来 UniMessage 在这方面能力似乎不足,因此用 QQ 的
|
||||||
|
logger.debug('获取图片的路径 Fallback 到 QQ 模块')
|
||||||
|
return await extract_image_from_qq_message(msg, evt, bot, allow_reply)
|
||||||
|
|
||||||
|
for seg in UniMessage.of(msg, bot):
|
||||||
|
logger.info(seg)
|
||||||
|
if isinstance(seg, Image):
|
||||||
|
return await unimsg_img_to_pil(seg)
|
||||||
|
elif isinstance(seg, Reply) and allow_reply:
|
||||||
|
msg2 = seg.msg
|
||||||
|
logger.debug(f"深入搜索引用的消息:{msg2}")
|
||||||
|
if msg2 is None or isinstance(msg2, str):
|
||||||
|
continue
|
||||||
|
return await extract_image_from_message(msg2, evt, bot, False)
|
||||||
|
elif isinstance(seg, RefNode) and allow_reply:
|
||||||
|
if isinstance(bot, DiscordBot):
|
||||||
|
return Failure("暂时不支持在 Discord 中通过引用的方式获取图片")
|
||||||
|
else:
|
||||||
|
return Failure("暂时不支持在这里中通过引用的方式获取图片")
|
||||||
|
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
|
||||||
@ -1,10 +1,19 @@
|
|||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
from typing import Iterable, cast
|
||||||
|
|
||||||
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
|
from nonebot import on_message
|
||||||
on_alconna)
|
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, Text,
|
||||||
|
UniMessage, UniMsg, on_alconna)
|
||||||
|
|
||||||
from konabot.plugins.memepack.drawing.geimao import draw_geimao
|
from konabot.common.nb.extract_image import extract_image_from_message
|
||||||
from konabot.plugins.memepack.drawing.pt import draw_pt
|
from konabot.plugins.memepack.drawing.display import draw_cao_display
|
||||||
|
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten,
|
||||||
|
draw_geimao, draw_mnk,
|
||||||
|
draw_pt, draw_suan)
|
||||||
|
|
||||||
|
from nonebot.adapters import Bot, Event
|
||||||
|
|
||||||
|
from returns.result import Success, Failure
|
||||||
|
|
||||||
geimao = on_alconna(Alconna(
|
geimao = on_alconna(Alconna(
|
||||||
"给猫说",
|
"给猫说",
|
||||||
@ -36,3 +45,97 @@ async def _(saying: list[str]):
|
|||||||
img.save(img_bytes, format="PNG")
|
img.save(img_bytes, format="PNG")
|
||||||
|
|
||||||
await pt.send(await UniMessage().image(raw=img_bytes).export())
|
await pt.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
|
||||||
|
|
||||||
|
mnk = on_alconna(Alconna(
|
||||||
|
"re:小?黑白子?说",
|
||||||
|
Args["saying", MultiVar(str, '+'), Field(
|
||||||
|
missing_tips=lambda: "你没有写黑白子说了什么"
|
||||||
|
)]
|
||||||
|
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"})
|
||||||
|
|
||||||
|
@mnk.handle()
|
||||||
|
async def _(saying: list[str]):
|
||||||
|
img = await draw_mnk("\n".join(saying))
|
||||||
|
img_bytes = BytesIO()
|
||||||
|
img.save(img_bytes, format="PNG")
|
||||||
|
|
||||||
|
await mnk.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
|
||||||
|
|
||||||
|
suan = on_alconna(Alconna(
|
||||||
|
"小蒜说",
|
||||||
|
Args["saying", MultiVar(str, '+'), Field(
|
||||||
|
missing_tips=lambda: "你没有写小蒜说了什么"
|
||||||
|
)]
|
||||||
|
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
|
||||||
|
|
||||||
|
@suan.handle()
|
||||||
|
async def _(saying: list[str]):
|
||||||
|
img = await draw_suan("\n".join(saying))
|
||||||
|
img_bytes = BytesIO()
|
||||||
|
img.save(img_bytes, format="PNG")
|
||||||
|
|
||||||
|
await suan.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
|
||||||
|
|
||||||
|
dsuan = on_alconna(Alconna(
|
||||||
|
"大蒜说",
|
||||||
|
Args["saying", MultiVar(str, '+'), Field(
|
||||||
|
missing_tips=lambda: "你没有写大蒜说了什么"
|
||||||
|
)]
|
||||||
|
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
|
||||||
|
|
||||||
|
@dsuan.handle()
|
||||||
|
async def _(saying: list[str]):
|
||||||
|
img = await draw_suan("\n".join(saying), True)
|
||||||
|
img_bytes = BytesIO()
|
||||||
|
img.save(img_bytes, format="PNG")
|
||||||
|
|
||||||
|
await dsuan.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
|
||||||
|
|
||||||
|
cutecat = on_alconna(Alconna(
|
||||||
|
"乖猫说",
|
||||||
|
Args["saying", MultiVar(str, '+'), Field(
|
||||||
|
missing_tips=lambda: "你没有写十猫说了什么"
|
||||||
|
)]
|
||||||
|
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"十猫说"})
|
||||||
|
|
||||||
|
@cutecat.handle()
|
||||||
|
async def _(saying: list[str]):
|
||||||
|
img = await draw_cute_ten("\n".join(saying))
|
||||||
|
img_bytes = BytesIO()
|
||||||
|
img.save(img_bytes, format="PNG")
|
||||||
|
|
||||||
|
await cutecat.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
|
||||||
|
|
||||||
|
cao_display_cmd = on_message()
|
||||||
|
|
||||||
|
@cao_display_cmd.handle()
|
||||||
|
async def _(msg: UniMsg, evt: Event, bot: Bot):
|
||||||
|
flag = False
|
||||||
|
for text in cast(Iterable[Text], msg.get(Text)):
|
||||||
|
if text.text.strip() == "小槽展示":
|
||||||
|
flag = True
|
||||||
|
elif text.text.strip() == '':
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
if not flag:
|
||||||
|
return
|
||||||
|
match await extract_image_from_message(evt.get_message(), evt, bot):
|
||||||
|
case Success(img):
|
||||||
|
img_handled = await draw_cao_display(img)
|
||||||
|
img_bytes = BytesIO()
|
||||||
|
img_handled.save(img_bytes, format="PNG")
|
||||||
|
await cao_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
case Failure(err):
|
||||||
|
await cao_display_cmd.send(
|
||||||
|
await UniMessage()
|
||||||
|
.at(user_id=evt.get_user_id())
|
||||||
|
.text(' ')
|
||||||
|
.text(err)
|
||||||
|
.export()
|
||||||
|
)
|
||||||
|
|||||||
@ -10,3 +10,4 @@ FontDB.SetDefaultEmojiOptions(EmojiOptions(
|
|||||||
|
|
||||||
HARMONYOS_SANS_SC_BLACK = FontDB.Query("HarmonyOS_Sans_SC_Black")
|
HARMONYOS_SANS_SC_BLACK = FontDB.Query("HarmonyOS_Sans_SC_Black")
|
||||||
HARMONYOS_SANS_SC_REGULAR = FontDB.Query("HarmonyOS_Sans_SC_Regular")
|
HARMONYOS_SANS_SC_REGULAR = FontDB.Query("HarmonyOS_Sans_SC_Regular")
|
||||||
|
LXGWWENKAI_REGULAR = FontDB.Query("LXGWWenKai-Regular")
|
||||||
|
|||||||
45
konabot/plugins/memepack/drawing/display.py
Normal file
45
konabot/plugins/memepack/drawing/display.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
import asyncio
|
||||||
|
from typing import Any, cast
|
||||||
|
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
import PIL.Image
|
||||||
|
|
||||||
|
from konabot.common.path import ASSETS_PATH
|
||||||
|
|
||||||
|
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
|
||||||
|
CAO_QUAD_POINTS = np.float32(cast(Any, [
|
||||||
|
[392, 540],
|
||||||
|
[577, 557],
|
||||||
|
[567, 707],
|
||||||
|
[381, 687],
|
||||||
|
]))
|
||||||
|
|
||||||
|
def _draw_cao_display(image: PIL.Image.Image):
|
||||||
|
src = np.array(image.convert("RGB"))
|
||||||
|
h, w = src.shape[:2]
|
||||||
|
src_points = np.float32(cast(Any, [
|
||||||
|
[0, 0],
|
||||||
|
[w, 0],
|
||||||
|
[w, h],
|
||||||
|
[0, h]
|
||||||
|
]))
|
||||||
|
dst_points = CAO_QUAD_POINTS
|
||||||
|
M = cv2.getPerspectiveTransform(cast(Any, src_points), cast(Any, dst_points))
|
||||||
|
output_size = cao_image.size
|
||||||
|
output_w, output_h = output_size
|
||||||
|
warped = cv2.warpPerspective(
|
||||||
|
src,
|
||||||
|
M,
|
||||||
|
(output_w, output_h),
|
||||||
|
flags=cv2.INTER_LINEAR,
|
||||||
|
borderMode=cv2.BORDER_CONSTANT,
|
||||||
|
borderValue=(0, 0, 0)
|
||||||
|
)
|
||||||
|
result = PIL.Image.fromarray(warped, 'RGB').convert('RGBA')
|
||||||
|
result = PIL.Image.alpha_composite(result, cao_image)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
async def draw_cao_display(image: PIL.Image.Image):
|
||||||
|
return await asyncio.to_thread(_draw_cao_display, image)
|
||||||
@ -1,30 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
from typing import Any, cast
|
|
||||||
|
|
||||||
import imagetext_py
|
|
||||||
import PIL.Image
|
|
||||||
|
|
||||||
from konabot.common.path import ASSETS_PATH
|
|
||||||
|
|
||||||
from .base.fonts import HARMONYOS_SANS_SC_BLACK
|
|
||||||
|
|
||||||
geimao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "geimao.jpg").convert("RGBA")
|
|
||||||
|
|
||||||
|
|
||||||
def _draw_geimao(saying: str):
|
|
||||||
img = geimao_image.copy()
|
|
||||||
with imagetext_py.Writer(img) as iw:
|
|
||||||
iw.draw_text_wrapped(
|
|
||||||
saying, 960, 50, 00.5, 0, 1920, 240, HARMONYOS_SANS_SC_BLACK,
|
|
||||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
|
||||||
0.8,
|
|
||||||
imagetext_py.TextAlign.Center,
|
|
||||||
cast(Any, 30.0),
|
|
||||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
|
|
||||||
draw_emojis=True,
|
|
||||||
)
|
|
||||||
return img
|
|
||||||
|
|
||||||
|
|
||||||
async def draw_geimao(saying: str):
|
|
||||||
return await asyncio.to_thread(_draw_geimao, saying)
|
|
||||||
@ -1,27 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
|
|
||||||
import imagetext_py
|
|
||||||
import PIL.Image
|
|
||||||
|
|
||||||
from konabot.common.path import ASSETS_PATH
|
|
||||||
|
|
||||||
from .base.fonts import HARMONYOS_SANS_SC_REGULAR
|
|
||||||
|
|
||||||
pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("RGBA")
|
|
||||||
|
|
||||||
|
|
||||||
def _draw_pt(saying: str):
|
|
||||||
img = pt_image.copy()
|
|
||||||
with imagetext_py.Writer(img) as iw:
|
|
||||||
iw.draw_text_wrapped(
|
|
||||||
saying, 259, 278, 0.5, 0.5, 360, 48, HARMONYOS_SANS_SC_REGULAR,
|
|
||||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
|
||||||
1.0,
|
|
||||||
imagetext_py.TextAlign.Center,
|
|
||||||
draw_emojis=True,
|
|
||||||
)
|
|
||||||
return img
|
|
||||||
|
|
||||||
|
|
||||||
async def draw_pt(saying: str):
|
|
||||||
return await asyncio.to_thread(_draw_pt, saying)
|
|
||||||
108
konabot/plugins/memepack/drawing/saying.py
Normal file
108
konabot/plugins/memepack/drawing/saying.py
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
import asyncio
|
||||||
|
from typing import Any, cast
|
||||||
|
|
||||||
|
import imagetext_py
|
||||||
|
import PIL.Image
|
||||||
|
|
||||||
|
from konabot.common.path import ASSETS_PATH
|
||||||
|
|
||||||
|
from .base.fonts import HARMONYOS_SANS_SC_BLACK, HARMONYOS_SANS_SC_REGULAR, LXGWWENKAI_REGULAR
|
||||||
|
|
||||||
|
geimao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "geimao.jpg").convert("RGBA")
|
||||||
|
pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("RGBA")
|
||||||
|
mnk_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "mnksay.jpg").convert("RGBA")
|
||||||
|
dasuan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "dss.png").convert("RGBA")
|
||||||
|
suan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "suanleba.png").convert("RGBA")
|
||||||
|
cute_ten_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "tententen.png").convert("RGBA")
|
||||||
|
|
||||||
|
|
||||||
|
def _draw_geimao(saying: str):
|
||||||
|
img = geimao_image.copy()
|
||||||
|
with imagetext_py.Writer(img) as iw:
|
||||||
|
iw.draw_text_wrapped(
|
||||||
|
saying, 960, 50, 0.5, 0, 1920, 240, HARMONYOS_SANS_SC_BLACK,
|
||||||
|
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||||
|
0.8,
|
||||||
|
imagetext_py.TextAlign.Center,
|
||||||
|
cast(Any, 30.0),
|
||||||
|
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
|
||||||
|
draw_emojis=True,
|
||||||
|
)
|
||||||
|
return img
|
||||||
|
|
||||||
|
|
||||||
|
async def draw_geimao(saying: str):
|
||||||
|
return await asyncio.to_thread(_draw_geimao, saying)
|
||||||
|
|
||||||
|
|
||||||
|
def _draw_pt(saying: str):
|
||||||
|
img = pt_image.copy()
|
||||||
|
with imagetext_py.Writer(img) as iw:
|
||||||
|
iw.draw_text_wrapped(
|
||||||
|
saying, 259, 278, 0.5, 0.5, 360, 48, HARMONYOS_SANS_SC_REGULAR,
|
||||||
|
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||||
|
1.0,
|
||||||
|
imagetext_py.TextAlign.Center,
|
||||||
|
draw_emojis=True,
|
||||||
|
)
|
||||||
|
return img
|
||||||
|
|
||||||
|
|
||||||
|
async def draw_pt(saying: str):
|
||||||
|
return await asyncio.to_thread(_draw_pt, saying)
|
||||||
|
|
||||||
|
|
||||||
|
def _draw_mnk(saying: str):
|
||||||
|
img = mnk_image.copy()
|
||||||
|
with imagetext_py.Writer(img) as iw:
|
||||||
|
iw.draw_text_wrapped(
|
||||||
|
saying, 540, 25, 0.5, 0, 1080, 120, HARMONYOS_SANS_SC_BLACK,
|
||||||
|
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||||
|
0.8,
|
||||||
|
imagetext_py.TextAlign.Center,
|
||||||
|
cast(Any, 15.0),
|
||||||
|
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
|
||||||
|
draw_emojis=True,
|
||||||
|
)
|
||||||
|
return img
|
||||||
|
|
||||||
|
|
||||||
|
async def draw_mnk(saying: str):
|
||||||
|
return await asyncio.to_thread(_draw_mnk, saying)
|
||||||
|
|
||||||
|
|
||||||
|
def _draw_suan(saying: str, dasuan: bool = False):
|
||||||
|
if dasuan:
|
||||||
|
img = dasuan_image.copy()
|
||||||
|
else:
|
||||||
|
img = suan_image.copy()
|
||||||
|
with imagetext_py.Writer(img) as iw:
|
||||||
|
iw.draw_text_wrapped(
|
||||||
|
saying, 1020, 290, 0.5, 0.5, 400, 48, LXGWWENKAI_REGULAR,
|
||||||
|
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||||
|
1.0,
|
||||||
|
imagetext_py.TextAlign.Center,
|
||||||
|
draw_emojis=True,
|
||||||
|
)
|
||||||
|
return img
|
||||||
|
|
||||||
|
|
||||||
|
async def draw_suan(saying: str, dasuan: bool = False):
|
||||||
|
return await asyncio.to_thread(_draw_suan, saying, dasuan)
|
||||||
|
|
||||||
|
|
||||||
|
def _draw_cute_ten(saying: str):
|
||||||
|
img = cute_ten_image.copy()
|
||||||
|
with imagetext_py.Writer(img) as iw:
|
||||||
|
iw.draw_text_wrapped(
|
||||||
|
saying, 390, 479, 0.5, 0.5, 760, 96, LXGWWENKAI_REGULAR,
|
||||||
|
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||||
|
1.0,
|
||||||
|
imagetext_py.TextAlign.Center,
|
||||||
|
draw_emojis=True,
|
||||||
|
)
|
||||||
|
return img
|
||||||
|
|
||||||
|
|
||||||
|
async def draw_cute_ten(saying: str):
|
||||||
|
return await asyncio.to_thread(_draw_cute_ten, saying)
|
||||||
@ -1,11 +1,11 @@
|
|||||||
from typing import Optional
|
from typing import Optional, Union
|
||||||
from nonebot.adapters import Event as BaseEvent
|
from nonebot.adapters import Event as BaseEvent
|
||||||
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
|
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
|
||||||
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
||||||
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
|
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
|
||||||
|
|
||||||
from konabot.plugins.roll_dice.roll_dice import generate_dice_image
|
from konabot.plugins.roll_dice.roll_dice import generate_dice_image
|
||||||
from konabot.plugins.roll_dice.roll_number import get_random_number, roll_number
|
from konabot.plugins.roll_dice.roll_number import get_random_number, get_random_number_string, roll_number
|
||||||
|
|
||||||
evt = on_alconna(Alconna(
|
evt = on_alconna(Alconna(
|
||||||
"摇数字"
|
"摇数字"
|
||||||
@ -22,21 +22,26 @@ async def _(event: BaseEvent):
|
|||||||
|
|
||||||
evt = on_alconna(Alconna(
|
evt = on_alconna(Alconna(
|
||||||
"摇骰子",
|
"摇骰子",
|
||||||
Args["f1?", int]["f2?", int]
|
Args["f1?", str]["f2?", str]
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
|
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
|
||||||
|
|
||||||
@evt.handle()
|
@evt.handle()
|
||||||
async def _(event: BaseEvent, f1: Optional[int] = None, f2: Optional[int] = None):
|
async def _(event: BaseEvent, f1: Optional[str] = None, f2: Optional[str] = None):
|
||||||
# if isinstance(event, DiscordMessageEvent):
|
# if isinstance(event, DiscordMessageEvent):
|
||||||
# await evt.send(await UniMessage().text("```\n" + roll_dice() + "\n```").export())
|
# await evt.send(await UniMessage().text("```\n" + roll_dice() + "\n```").export())
|
||||||
# elif isinstance(event, ConsoleMessageEvent):
|
# elif isinstance(event, ConsoleMessageEvent):
|
||||||
number = 0
|
number = ""
|
||||||
if(f1 is not None and f2 is not None):
|
if(f1 is not None and f2 is not None):
|
||||||
number = get_random_number(f1, f2)
|
number = get_random_number_string(f1, f2)
|
||||||
elif f1 is not None:
|
elif f1 is not None:
|
||||||
number = get_random_number(1, f1)
|
if(float(f1) > 1):
|
||||||
|
number = get_random_number_string("1", f1)
|
||||||
|
elif (float(f1) > 0):
|
||||||
|
number = get_random_number_string("0", f1)
|
||||||
|
else:
|
||||||
|
number = get_random_number_string(f1, "0")
|
||||||
else:
|
else:
|
||||||
number = get_random_number()
|
number = get_random_number_string()
|
||||||
await evt.send(await UniMessage().image(raw=await generate_dice_image(number)).export())
|
await evt.send(await UniMessage().image(raw=await generate_dice_image(number)).export())
|
||||||
# else:
|
# else:
|
||||||
# await evt.send(await UniMessage().text(roll_dice(wide=True)).export())
|
# await evt.send(await UniMessage().text(roll_dice(wide=True)).export())
|
||||||
|
|||||||
@ -152,27 +152,208 @@ def precise_blend_with_perspective(background, foreground, corners):
|
|||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
async def generate_dice_image(number: int) -> BytesIO:
|
def draw_line_bresenham(image, x0, y0, x1, y1, color):
|
||||||
|
"""使用Bresenham算法画线,避免间隙"""
|
||||||
|
dx = abs(x1 - x0)
|
||||||
|
dy = abs(y1 - y0)
|
||||||
|
sx = 1 if x0 < x1 else -1
|
||||||
|
sy = 1 if y0 < y1 else -1
|
||||||
|
err = dx - dy
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if 0 <= x0 < image.shape[1] and 0 <= y0 < image.shape[0]:
|
||||||
|
image[y0, x0] = color
|
||||||
|
|
||||||
|
if x0 == x1 and y0 == y1:
|
||||||
|
break
|
||||||
|
|
||||||
|
e2 = 2 * err
|
||||||
|
if e2 > -dy:
|
||||||
|
err -= dy
|
||||||
|
x0 += sx
|
||||||
|
if e2 < dx:
|
||||||
|
err += dx
|
||||||
|
y0 += sy
|
||||||
|
|
||||||
|
def slice_and_stretch(image, slice_lines, direction):
|
||||||
|
'''
|
||||||
|
image: 图像
|
||||||
|
slice_lines: 切割线(两个点的列表),一般是倾斜45度的直线
|
||||||
|
direction: 移动方向向量(二元数组)
|
||||||
|
'''
|
||||||
|
# 获取图片的尺寸
|
||||||
|
height, width = image.shape[:2]
|
||||||
|
# 创建一个由移动方向扩充后,更大的图片
|
||||||
|
new_width = int(width + abs(direction[0]))
|
||||||
|
new_height = int(height + abs(direction[1]))
|
||||||
|
new_image = np.zeros((new_height, new_width, 4), dtype=image.dtype)
|
||||||
|
# 先把图片放在新图的和方向相反的一侧
|
||||||
|
offset_x = int(abs(min(0, direction[0])))
|
||||||
|
offset_y = int(abs(min(0, direction[1])))
|
||||||
|
new_image[offset_y:offset_y+height, offset_x:offset_x+width] = image
|
||||||
|
# 切割线也跟着偏移
|
||||||
|
slice_lines = [(x + offset_x, y + offset_y) for (x, y) in slice_lines]
|
||||||
|
# 复制切割线经过的像素,沿着方向移动,实现类似拖尾的效果
|
||||||
|
apply_trail_effect_vectorized(new_image, slice_lines, direction)
|
||||||
|
apply_stroke_vectorized(new_image, slice_lines, direction)
|
||||||
|
|
||||||
|
|
||||||
|
return new_image, offset_x, offset_y
|
||||||
|
|
||||||
|
def apply_trail_effect_vectorized(new_image, slice_lines, direction):
|
||||||
|
"""向量化实现拖尾效果"""
|
||||||
|
height, width = new_image.shape[:2]
|
||||||
|
|
||||||
|
# 创建坐标网格
|
||||||
|
y_coords, x_coords = np.mgrid[0:height, 0:width]
|
||||||
|
|
||||||
|
# 向量化计算点到直线的距离
|
||||||
|
line_vec = np.array([slice_lines[1][0] - slice_lines[0][0],
|
||||||
|
slice_lines[1][1] - slice_lines[0][1]])
|
||||||
|
point_vecs = np.stack([x_coords - slice_lines[0][0],
|
||||||
|
y_coords - slice_lines[0][1]], axis=-1)
|
||||||
|
|
||||||
|
# 计算叉积(有向距离)
|
||||||
|
cross_products = (line_vec[0] * point_vecs[:, :, 1] -
|
||||||
|
line_vec[1] * point_vecs[:, :, 0])
|
||||||
|
|
||||||
|
# 选择直线右侧的像素 (d1 > 0)
|
||||||
|
mask = cross_products > 0
|
||||||
|
|
||||||
|
# 计算目标位置
|
||||||
|
target_x = (x_coords + direction[0]).astype(int)
|
||||||
|
target_y = (y_coords + direction[1]).astype(int)
|
||||||
|
|
||||||
|
# 创建有效位置掩码
|
||||||
|
valid_mask = mask & (target_x >= 0) & (target_x < width) & \
|
||||||
|
(target_y >= 0) & (target_y < height)
|
||||||
|
|
||||||
|
# 批量复制像素
|
||||||
|
new_image[target_y[valid_mask], target_x[valid_mask]] = \
|
||||||
|
new_image[y_coords[valid_mask], x_coords[valid_mask]]
|
||||||
|
|
||||||
|
def apply_stroke_vectorized(new_image, slice_lines, direction):
|
||||||
|
"""使用向量化操作优化笔画效果"""
|
||||||
|
height, width = new_image.shape[:2]
|
||||||
|
|
||||||
|
# 1. 找到所有非透明像素
|
||||||
|
non_transparent = np.where(new_image[:, :, 3] > 0)
|
||||||
|
if len(non_transparent[0]) == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
y_coords, x_coords = non_transparent
|
||||||
|
|
||||||
|
# 2. 向量化计算点到直线的距离
|
||||||
|
line_vec = np.array([slice_lines[1][0] - slice_lines[0][0],
|
||||||
|
slice_lines[1][1] - slice_lines[0][1]])
|
||||||
|
point_vecs = np.column_stack([x_coords - slice_lines[0][0],
|
||||||
|
y_coords - slice_lines[0][1]])
|
||||||
|
|
||||||
|
# 计算叉积(距离)
|
||||||
|
cross_products = (line_vec[0] * point_vecs[:, 1] -
|
||||||
|
line_vec[1] * point_vecs[:, 0])
|
||||||
|
|
||||||
|
# 3. 选择靠近直线的像素
|
||||||
|
mask = np.abs(cross_products) < 1.0
|
||||||
|
selected_y = y_coords[mask]
|
||||||
|
selected_x = x_coords[mask]
|
||||||
|
selected_pixels = new_image[selected_y, selected_x]
|
||||||
|
|
||||||
|
if len(selected_x) == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
# 4. 预计算采样点
|
||||||
|
length = np.sqrt(direction[0]**2 + direction[1]**2)
|
||||||
|
if length == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
# 创建采样偏移
|
||||||
|
dx_dy = np.array([(dx, dy) for dx in [-0.5, 0, 0.5]
|
||||||
|
for dy in [-0.5, 0, 0.5]])
|
||||||
|
|
||||||
|
# 5. 批量计算目标位置
|
||||||
|
steps = max(1, int(length * 2))
|
||||||
|
alpha = 0.7
|
||||||
|
|
||||||
|
for k in range(1, steps + 1):
|
||||||
|
# 对所有选中的像素批量计算新位置
|
||||||
|
scale = k / steps
|
||||||
|
|
||||||
|
# 为每个像素和每个采样点计算目标位置
|
||||||
|
for dx, dy in dx_dy:
|
||||||
|
target_x = np.round(selected_x + dx + direction[0] * scale).astype(int)
|
||||||
|
target_y = np.round(selected_y + dy + direction[1] * scale).astype(int)
|
||||||
|
|
||||||
|
# 创建有效位置掩码
|
||||||
|
valid_mask = (target_x >= 0) & (target_x < width) & \
|
||||||
|
(target_y >= 0) & (target_y < height)
|
||||||
|
|
||||||
|
if np.any(valid_mask):
|
||||||
|
valid_target_x = target_x[valid_mask]
|
||||||
|
valid_target_y = target_y[valid_mask]
|
||||||
|
valid_source_idx = np.where(valid_mask)[0]
|
||||||
|
|
||||||
|
# 批量混合像素
|
||||||
|
source_pixels = selected_pixels[valid_source_idx]
|
||||||
|
target_pixels = new_image[valid_target_y, valid_target_x]
|
||||||
|
|
||||||
|
new_image[valid_target_y, valid_target_x] = (
|
||||||
|
alpha * source_pixels + (1 - alpha) * target_pixels
|
||||||
|
)
|
||||||
|
|
||||||
|
async def generate_dice_image(number: str) -> BytesIO:
|
||||||
# 将文本转换为带透明背景的图像
|
# 将文本转换为带透明背景的图像
|
||||||
text = str(number)
|
text = number
|
||||||
|
|
||||||
|
# 如果文本太长,直接返回金箍棒
|
||||||
|
if(len(text) > 50):
|
||||||
|
output = BytesIO()
|
||||||
|
push_image = Image.open(ASSETS_PATH / "img" / "dice" / "stick.png")
|
||||||
|
push_image.save(output,format='PNG')
|
||||||
|
output.seek(0)
|
||||||
|
return output
|
||||||
|
|
||||||
text_image = text_to_transparent_image(
|
text_image = text_to_transparent_image(
|
||||||
text,
|
text,
|
||||||
font_size=60,
|
font_size=60,
|
||||||
text_color=(0, 0, 0) # 黑色文字
|
text_color=(0, 0, 0) # 黑色文字
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# 获取长宽比
|
||||||
|
height, width = text_image.shape[:2]
|
||||||
|
aspect_ratio = width / height
|
||||||
|
|
||||||
|
# 根据长宽比设置拉伸系数
|
||||||
|
stretch_k = 1
|
||||||
|
if aspect_ratio > 1:
|
||||||
|
stretch_k = aspect_ratio
|
||||||
|
|
||||||
|
# 骰子的方向
|
||||||
|
up_direction = (51 - 16, 5 - 30) # 右上角点 - 左上角点
|
||||||
|
|
||||||
|
move_distance = (up_direction[0] * (stretch_k - 1), up_direction[1] * (stretch_k - 1))
|
||||||
|
|
||||||
|
# 加载背景图像,保留透明通道
|
||||||
|
background = cv2.imread(str(ASSETS_PATH / "img" / "dice" / "template.png"), cv2.IMREAD_UNCHANGED)
|
||||||
|
assert background is not None
|
||||||
|
|
||||||
|
height, width = background.shape[:2]
|
||||||
|
|
||||||
|
background, offset_x, offset_y = slice_and_stretch(background,
|
||||||
|
[(10,10),(0,0)],
|
||||||
|
move_distance)
|
||||||
|
|
||||||
# 定义3D变换的四个角点(透视效果)
|
# 定义3D变换的四个角点(透视效果)
|
||||||
# 顺序: [左上, 右上, 右下, 左下]
|
# 顺序: [左上, 右上, 右下, 左下]
|
||||||
corners = np.array([
|
corners = np.array([
|
||||||
[16, 30], # 左上
|
[16, 30], # 左上
|
||||||
[51, 5], # 右上(上移,创建透视)
|
[51 + move_distance[0], 5 + move_distance[1]], # 右上(上移,创建透视)
|
||||||
[88, 33], # 右下
|
[88 + move_distance[0], 33 + move_distance[1]], # 右下
|
||||||
[49, 62] # 左下(下移)
|
[49, 62] # 左下(下移)
|
||||||
], dtype=np.float32)
|
], dtype=np.float32)
|
||||||
|
corners[:, 0] += offset_x
|
||||||
|
corners[:, 1] += offset_y
|
||||||
|
|
||||||
# 加载背景图像,保留透明通道
|
|
||||||
background = cv2.imread(str(ASSETS_PATH / "img" / "dice" / "template.png"), cv2.IMREAD_UNCHANGED)
|
|
||||||
|
|
||||||
|
|
||||||
# 对文本图像进行3D变换(保持透明通道)
|
# 对文本图像进行3D变换(保持透明通道)
|
||||||
transformed_text, transform_matrix = perspective_transform(text_image, background, corners)
|
transformed_text, transform_matrix = perspective_transform(text_image, background, corners)
|
||||||
@ -186,6 +367,26 @@ async def generate_dice_image(number: int) -> BytesIO:
|
|||||||
images: list[Image.Image] = [Image.open(ASSETS_PATH / "img" / "dice" / f"{i}.png") for i in range(1, 12)]
|
images: list[Image.Image] = [Image.open(ASSETS_PATH / "img" / "dice" / f"{i}.png") for i in range(1, 12)]
|
||||||
images.append(pil_final)
|
images.append(pil_final)
|
||||||
frame_durations = [100] * (len(images) - 1) + [100000]
|
frame_durations = [100] * (len(images) - 1) + [100000]
|
||||||
|
# 将导入的图像尺寸扩展为和 pil_final 相同的大小,随帧数进行扩展,然后不放大的情况下放在最中间
|
||||||
|
if(aspect_ratio > 1):
|
||||||
|
target_size = pil_final.size
|
||||||
|
for i in range(len(images) - 1):
|
||||||
|
k = i / (len(images) - 1)
|
||||||
|
now_distance = (move_distance[0] * k, move_distance[1] * k)
|
||||||
|
img = np.array(images[i])
|
||||||
|
img, _, _ = slice_and_stretch(img,
|
||||||
|
[(10,10),(0,0)],
|
||||||
|
now_distance)
|
||||||
|
# 只扩展边界,图像本身不放大
|
||||||
|
img_width, img_height = img.shape[1], img.shape[0]
|
||||||
|
new_img = Image.new("RGBA", target_size, (0, 0, 0, 0))
|
||||||
|
this_offset_x = (target_size[0] - img_width) // 2
|
||||||
|
this_offset_y = (target_size[1] - img_height) // 2
|
||||||
|
# new_img.paste(img, (this_offset_x, this_offset_y))
|
||||||
|
new_img.paste(Image.fromarray(img), (this_offset_x, this_offset_y))
|
||||||
|
images[i] = new_img
|
||||||
|
|
||||||
|
|
||||||
# 保存为BytesIO对象
|
# 保存为BytesIO对象
|
||||||
output = BytesIO()
|
output = BytesIO()
|
||||||
images[0].save(output,
|
images[0].save(output,
|
||||||
@ -194,4 +395,6 @@ async def generate_dice_image(number: int) -> BytesIO:
|
|||||||
duration=frame_durations,
|
duration=frame_durations,
|
||||||
format='GIF',
|
format='GIF',
|
||||||
loop=1)
|
loop=1)
|
||||||
|
output.seek(0)
|
||||||
|
# pil_final.save(output, format='PNG')
|
||||||
return output
|
return output
|
||||||
@ -42,6 +42,24 @@ def get_random_number(min: int = 1, max: int = 6) -> int:
|
|||||||
import random
|
import random
|
||||||
return random.randint(min, max)
|
return random.randint(min, max)
|
||||||
|
|
||||||
|
def get_random_number_string(min_value: str = "1", max_value: str = "6") -> str:
|
||||||
|
import random
|
||||||
|
|
||||||
|
# 先判断二者是不是整数
|
||||||
|
if (float(min_value).is_integer()
|
||||||
|
and float(max_value).is_integer()
|
||||||
|
and "." not in min_value
|
||||||
|
and "." not in max_value):
|
||||||
|
return str(random.randint(int(float(min_value)), int(float(max_value))))
|
||||||
|
|
||||||
|
# 根据传入小数的位数,决定保留几位小数
|
||||||
|
if "." in str(min_value) or "." in str(max_value):
|
||||||
|
decimal_places = max(len(str(min_value).split(".")[1]) if "." in str(min_value) else 0,
|
||||||
|
len(str(max_value).split(".")[1]) if "." in str(max_value) else 0)
|
||||||
|
return str(round(random.uniform(float(min_value), float(max_value)), decimal_places))
|
||||||
|
|
||||||
|
# 如果没有小数点,很可能二者都是指数表示或均为 inf,直接返回随机小数
|
||||||
|
return str(random.uniform(float(min_value), float(max_value)))
|
||||||
def roll_number(wide: bool = False) -> str:
|
def roll_number(wide: bool = False) -> str:
|
||||||
raw = number_arts[get_random_number()]
|
raw = number_arts[get_random_number()]
|
||||||
if wide:
|
if wide:
|
||||||
|
|||||||
@ -45,7 +45,7 @@ class Notify(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
class NotifyConfigFile(BaseModel):
|
class NotifyConfigFile(BaseModel):
|
||||||
version: int = 1
|
version: int = 2
|
||||||
notifies: list[Notify] = []
|
notifies: list[Notify] = []
|
||||||
unsent: list[Notify] = []
|
unsent: list[Notify] = []
|
||||||
|
|
||||||
@ -89,13 +89,17 @@ async def notify_now(notify: Notify):
|
|||||||
if notify.target_env is None:
|
if notify.target_env is None:
|
||||||
await bot.send_private_msg(
|
await bot.send_private_msg(
|
||||||
user_id=int(notify.target),
|
user_id=int(notify.target),
|
||||||
message=f"代办通知:{notify.notify_msg}",
|
message=cast(Any, await UniMessage.text(f"代办通知:{notify.notify_msg}").export(
|
||||||
|
bot=bot,
|
||||||
|
)),
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
await bot.send_group_msg(
|
await bot.send_group_msg(
|
||||||
group_id=int(notify.target_env),
|
group_id=int(notify.target_env),
|
||||||
message=cast(Any,
|
message=cast(Any,
|
||||||
await UniMessage().at(notify.target).text(f" 代办通知:{notify.notify_msg}").export()
|
await UniMessage().at(
|
||||||
|
notify.target
|
||||||
|
).text(f" 代办通知:{notify.notify_msg}").export(bot=bot)
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
@ -185,14 +189,27 @@ async def _(msg: UniMsg, mEvt: Event):
|
|||||||
|
|
||||||
driver = nonebot.get_driver()
|
driver = nonebot.get_driver()
|
||||||
|
|
||||||
|
NOTIFIED_FLAG = {
|
||||||
|
"task_added": False,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@driver.on_bot_connect
|
@driver.on_bot_connect
|
||||||
async def _():
|
async def _():
|
||||||
|
if NOTIFIED_FLAG["task_added"]:
|
||||||
|
return
|
||||||
|
|
||||||
|
NOTIFIED_FLAG["task_added"] = True
|
||||||
|
|
||||||
|
await asyncio.sleep(10)
|
||||||
await DATA_FILE_LOCK.acquire()
|
await DATA_FILE_LOCK.acquire()
|
||||||
tasks = []
|
tasks = []
|
||||||
cfg = load_notify_config()
|
cfg = load_notify_config()
|
||||||
for notify in cfg.notifies:
|
if cfg.version == 1:
|
||||||
tasks.append(create_notify_task(notify, fail2remove=False))
|
cfg.version = 2
|
||||||
|
else:
|
||||||
|
for notify in cfg.notifies:
|
||||||
|
tasks.append(create_notify_task(notify, fail2remove=False))
|
||||||
DATA_FILE_LOCK.release()
|
DATA_FILE_LOCK.release()
|
||||||
|
|
||||||
await asyncio.gather(*tasks)
|
await asyncio.gather(*tasks)
|
||||||
|
|||||||
268
konabot/plugins/ytpgif/__init__.py
Normal file
268
konabot/plugins/ytpgif/__init__.py
Normal file
@ -0,0 +1,268 @@
|
|||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from PIL import Image, ImageSequence
|
||||||
|
from nonebot.adapters import Event as BaseEvent
|
||||||
|
from nonebot.plugin import PluginMetadata
|
||||||
|
from nonebot_plugin_alconna import (
|
||||||
|
Alconna,
|
||||||
|
Args,
|
||||||
|
Field,
|
||||||
|
UniMessage,
|
||||||
|
on_alconna,
|
||||||
|
)
|
||||||
|
|
||||||
|
__plugin_meta__ = PluginMetadata(
|
||||||
|
name="ytpgif",
|
||||||
|
description="生成来回镜像翻转的仿 YTPMV 动图。",
|
||||||
|
usage="ytpgif [倍速=1.0] (倍速范围:0.1~20.0)",
|
||||||
|
type="application",
|
||||||
|
config=None,
|
||||||
|
homepage=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 参数定义
|
||||||
|
BASE_SEGMENT_DURATION = 0.25
|
||||||
|
BASE_INTERVAL = 0.25
|
||||||
|
MAX_SIZE = 256
|
||||||
|
MIN_SPEED = 0.1
|
||||||
|
MAX_SPEED = 20.0
|
||||||
|
MAX_FRAMES_PER_SEGMENT = 500
|
||||||
|
|
||||||
|
# 提示语
|
||||||
|
SPEED_TIPS = f"倍速必须是 {MIN_SPEED} 到 {MAX_SPEED} 之间的数字"
|
||||||
|
|
||||||
|
|
||||||
|
# 定义命令 + 参数校验
|
||||||
|
ytpgif_cmd = on_alconna(
|
||||||
|
Alconna(
|
||||||
|
"ytpgif",
|
||||||
|
Args[
|
||||||
|
"speed?",
|
||||||
|
float,
|
||||||
|
Field(
|
||||||
|
default=1.0,
|
||||||
|
unmatch_tips=lambda x: f"“{x}”不是有效数值。{SPEED_TIPS}",
|
||||||
|
),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
use_cmd_start=True,
|
||||||
|
use_cmd_sep=False,
|
||||||
|
skip_for_unmatch=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_image_url(event: BaseEvent) -> Optional[str]:
|
||||||
|
"""从事件中提取图片 URL,支持直接消息和回复"""
|
||||||
|
msg = event.get_message()
|
||||||
|
for seg in msg:
|
||||||
|
if seg.type == "image" and seg.data.get("url"):
|
||||||
|
return str(seg.data["url"])
|
||||||
|
|
||||||
|
if hasattr(event, "reply") and (reply := event.reply):
|
||||||
|
reply_msg = reply.message
|
||||||
|
for seg in reply_msg:
|
||||||
|
if seg.type == "image" and seg.data.get("url"):
|
||||||
|
return str(seg.data["url"])
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def download_image(url: str) -> bytes:
|
||||||
|
import httpx
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
resp = await client.get(url, timeout=10)
|
||||||
|
resp.raise_for_status()
|
||||||
|
return resp.content
|
||||||
|
|
||||||
|
|
||||||
|
def resize_frame(frame: Image.Image) -> Image.Image:
|
||||||
|
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
|
||||||
|
w, h = frame.size
|
||||||
|
if w <= MAX_SIZE and h <= MAX_SIZE:
|
||||||
|
return frame
|
||||||
|
|
||||||
|
scale = MAX_SIZE / max(w, h)
|
||||||
|
new_w = int(w * scale)
|
||||||
|
new_h = int(h * scale)
|
||||||
|
return frame.resize((new_w, new_h), Image.Resampling.LANCZOS)
|
||||||
|
|
||||||
|
|
||||||
|
@ytpgif_cmd.handle()
|
||||||
|
async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
|
||||||
|
# === 校验 speed 范围 ===
|
||||||
|
if not (MIN_SPEED <= speed <= MAX_SPEED):
|
||||||
|
await ytpgif_cmd.send(
|
||||||
|
await UniMessage.text(f"❌ {SPEED_TIPS}").export()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
img_url = await get_image_url(event)
|
||||||
|
if not img_url:
|
||||||
|
await ytpgif_cmd.send(
|
||||||
|
await UniMessage.text(
|
||||||
|
"请发送一张图片或回复一张图片来生成镜像动图。"
|
||||||
|
).export()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
image_data = await download_image(img_url)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[YTPGIF] 下载失败: {e}")
|
||||||
|
await ytpgif_cmd.send(
|
||||||
|
await UniMessage.text("❌ 图片下载失败,请重试。").export()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
input_path = output_path = None
|
||||||
|
try:
|
||||||
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_in:
|
||||||
|
tmp_in.write(image_data)
|
||||||
|
input_path = tmp_in.name
|
||||||
|
|
||||||
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_out:
|
||||||
|
output_path = tmp_out.name
|
||||||
|
|
||||||
|
with Image.open(input_path) as src_img:
|
||||||
|
# === 判断是否为动图 ===
|
||||||
|
try:
|
||||||
|
n_frames = getattr(src_img, "n_frames", 1)
|
||||||
|
is_animated = n_frames > 1
|
||||||
|
except Exception:
|
||||||
|
is_animated = False
|
||||||
|
|
||||||
|
output_frames = []
|
||||||
|
output_durations_ms = []
|
||||||
|
|
||||||
|
if is_animated:
|
||||||
|
# === 动图模式:截取正向 + 镜像两段 ===
|
||||||
|
frames_with_duration = []
|
||||||
|
palette = src_img.getpalette()
|
||||||
|
|
||||||
|
for idx in range(n_frames):
|
||||||
|
src_img.seek(idx)
|
||||||
|
frame = src_img.copy()
|
||||||
|
# 检查是否需要透明通道
|
||||||
|
has_alpha = (
|
||||||
|
frame.mode in ("RGBA", "LA")
|
||||||
|
or (frame.mode == "P" and "transparency" in frame.info)
|
||||||
|
)
|
||||||
|
if has_alpha:
|
||||||
|
frame = frame.convert("RGBA")
|
||||||
|
else:
|
||||||
|
frame = frame.convert("RGB")
|
||||||
|
resized_frame = resize_frame(frame)
|
||||||
|
|
||||||
|
# 若原图有调色板,尝试保留(可选)
|
||||||
|
if palette and resized_frame.mode == "P":
|
||||||
|
try:
|
||||||
|
resized_frame.putpalette(palette)
|
||||||
|
except Exception: # noqa
|
||||||
|
pass
|
||||||
|
|
||||||
|
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
|
||||||
|
dur_sec = max(0.01, ms / 1000.0)
|
||||||
|
frames_with_duration.append((resized_frame, dur_sec))
|
||||||
|
|
||||||
|
max_dur = BASE_SEGMENT_DURATION * speed
|
||||||
|
accumulated = 0.0
|
||||||
|
frame_count = 0
|
||||||
|
|
||||||
|
# 正向段
|
||||||
|
for img, dur in frames_with_duration:
|
||||||
|
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
|
||||||
|
break
|
||||||
|
output_frames.append(img)
|
||||||
|
output_durations_ms.append(int(dur * 1000))
|
||||||
|
accumulated += dur
|
||||||
|
frame_count += 1
|
||||||
|
|
||||||
|
if frame_count == 0:
|
||||||
|
await ytpgif_cmd.send(
|
||||||
|
await UniMessage.text("动图帧太短,无法生成有效片段。").export()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# 镜像段(从头开始)
|
||||||
|
accumulated = 0.0
|
||||||
|
frame_count = 0
|
||||||
|
for img, dur in frames_with_duration:
|
||||||
|
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
|
||||||
|
break
|
||||||
|
flipped = img.transpose(Image.FLIP_LEFT_RIGHT)
|
||||||
|
output_frames.append(flipped)
|
||||||
|
output_durations_ms.append(int(dur * 1000))
|
||||||
|
accumulated += dur
|
||||||
|
frame_count += 1
|
||||||
|
|
||||||
|
else:
|
||||||
|
# === 静态图模式:制作翻转动画 ===
|
||||||
|
raw_frame = src_img.convert("RGBA")
|
||||||
|
resized_frame = resize_frame(raw_frame)
|
||||||
|
|
||||||
|
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
|
||||||
|
duration_ms = int(interval_sec * 1000)
|
||||||
|
|
||||||
|
frame1 = resized_frame
|
||||||
|
frame2 = resized_frame.transpose(Image.FLIP_LEFT_RIGHT)
|
||||||
|
|
||||||
|
output_frames = [frame1, frame2]
|
||||||
|
output_durations_ms = [duration_ms, duration_ms]
|
||||||
|
|
||||||
|
if len(output_frames) < 1:
|
||||||
|
await ytpgif_cmd.send(
|
||||||
|
await UniMessage.text("未能生成任何帧。").export()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
|
||||||
|
need_transparency = False
|
||||||
|
for frame in output_frames:
|
||||||
|
if frame.mode == "RGBA":
|
||||||
|
alpha_channel = frame.getchannel("A")
|
||||||
|
if any(pix < 255 for pix in alpha_channel.getdata()):
|
||||||
|
need_transparency = True
|
||||||
|
break
|
||||||
|
elif frame.mode == "P" and "transparency" in frame.info:
|
||||||
|
need_transparency = True
|
||||||
|
break
|
||||||
|
|
||||||
|
# 如果不需要透明,则统一转为 RGB 避免调色板污染
|
||||||
|
if not need_transparency:
|
||||||
|
output_frames = [f.convert("RGB") for f in output_frames]
|
||||||
|
|
||||||
|
# 构建保存参数
|
||||||
|
save_kwargs = {
|
||||||
|
"save_all": True,
|
||||||
|
"append_images": output_frames[1:],
|
||||||
|
"format": "GIF",
|
||||||
|
"loop": 0, # 无限循环
|
||||||
|
"duration": output_durations_ms,
|
||||||
|
"disposal": 2, # 清除到背景色,避免残留
|
||||||
|
"optimize": False, # 关闭抖动(等效 -dither none)
|
||||||
|
}
|
||||||
|
|
||||||
|
# 只有真正需要透明时才启用 transparency
|
||||||
|
if need_transparency:
|
||||||
|
save_kwargs["transparency"] = 0
|
||||||
|
|
||||||
|
output_frames[0].save(output_path, **save_kwargs)
|
||||||
|
|
||||||
|
# 发送结果
|
||||||
|
with open(output_path, "rb") as f:
|
||||||
|
result_image = UniMessage.image(raw=f.read())
|
||||||
|
await ytpgif_cmd.send(await result_image.export())
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[YTPGIF] 处理失败: {e}")
|
||||||
|
await ytpgif_cmd.send(
|
||||||
|
await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export()
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
for path in filter(None, [input_path, output_path]):
|
||||||
|
if os.path.exists(path):
|
||||||
|
try:
|
||||||
|
os.unlink(path)
|
||||||
|
except: # noqa
|
||||||
|
pass
|
||||||
21
poetry.lock
generated
21
poetry.lock
generated
@ -2460,6 +2460,25 @@ urllib3 = ">=1.21.1,<3"
|
|||||||
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
|
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
|
||||||
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
|
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "returns"
|
||||||
|
version = "0.26.0"
|
||||||
|
description = "Make your functions return something meaningful, typed, and safe!"
|
||||||
|
optional = false
|
||||||
|
python-versions = "<4.0,>=3.10"
|
||||||
|
groups = ["main"]
|
||||||
|
files = [
|
||||||
|
{file = "returns-0.26.0-py3-none-any.whl", hash = "sha256:7cae94c730d6c56ffd9d0f583f7a2c0b32cfe17d141837150c8e6cff3eb30d71"},
|
||||||
|
{file = "returns-0.26.0.tar.gz", hash = "sha256:180320e0f6e9ea9845330ccfc020f542330f05b7250941d9b9b7c00203fcc3da"},
|
||||||
|
]
|
||||||
|
|
||||||
|
[package.dependencies]
|
||||||
|
typing-extensions = ">=4.0,<5.0"
|
||||||
|
|
||||||
|
[package.extras]
|
||||||
|
check-laws = ["hypothesis (>=6.136,<7.0)", "pytest (>=8.0,<9.0)"]
|
||||||
|
compatible-mypy = ["mypy (>=1.12,<1.18)"]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rich"
|
name = "rich"
|
||||||
version = "14.1.0"
|
version = "14.1.0"
|
||||||
@ -3162,4 +3181,4 @@ type = ["pytest-mypy"]
|
|||||||
[metadata]
|
[metadata]
|
||||||
lock-version = "2.1"
|
lock-version = "2.1"
|
||||||
python-versions = ">=3.12,<4.0"
|
python-versions = ">=3.12,<4.0"
|
||||||
content-hash = "673703a789248d0f7369999c364352eb12f8bb5830a8b4b6918f8bab6425a763"
|
content-hash = "927913b9030d1f6c126bb2d12eab7307dc6297f259c7c62e3033706457d27ce0"
|
||||||
|
|||||||
@ -21,6 +21,7 @@ dependencies = [
|
|||||||
"pillow (>=11.3.0,<12.0.0)",
|
"pillow (>=11.3.0,<12.0.0)",
|
||||||
"imagetext-py (>=2.2.0,<3.0.0)",
|
"imagetext-py (>=2.2.0,<3.0.0)",
|
||||||
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
|
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
|
||||||
|
"returns (>=0.26.0,<0.27.0)",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
2076
requirements.txt
2076
requirements.txt
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user