Compare commits
26 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 09c9d44798 | |||
| 0c4206f461 | |||
| 9fb8fd90dc | |||
| 8c4fa2b5e4 | |||
| fb2c3f1ce2 | |||
| 265415e727 | |||
| 06555b2225 | |||
| f6fd25a41d | |||
| 9f6c70bf0f | |||
| 1c01e49d5d | |||
| 48c719bc33 | |||
| 6bc9f94e83 | |||
| deab2d7b2b | |||
| 2a6abbe0d4 | |||
| 30bdc50024 | |||
| be8b1b9999 | |||
| 43d0a09de2 | |||
| 6e0082c1c9 | |||
| 3b8b060c5b | |||
| 8cfe58c7dd | |||
| f997bf945a | |||
| 0dbe164703 | |||
| 818f2b64ec | |||
| a855c69f61 | |||
| 90ee296f55 | |||
| 915f186955 |
@ -1,4 +1,5 @@
|
|||||||
/.env
|
/.env
|
||||||
/.git
|
/.git
|
||||||
|
/data
|
||||||
|
|
||||||
__pycache__
|
__pycache__
|
||||||
@ -26,6 +26,14 @@ steps:
|
|||||||
volumes:
|
volumes:
|
||||||
- name: docker-socket
|
- name: docker-socket
|
||||||
path: /var/run/docker.sock
|
path: /var/run/docker.sock
|
||||||
|
- name: 在容器中测试插件加载
|
||||||
|
image: docker:dind
|
||||||
|
privileged: true
|
||||||
|
volumes:
|
||||||
|
- name: docker-socket
|
||||||
|
path: /var/run/docker.sock
|
||||||
|
commands:
|
||||||
|
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
|
||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
- name: docker-socket
|
- name: docker-socket
|
||||||
|
|||||||
4
.env.test
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
ENVIRONMENT=test
|
||||||
|
ENABLE_CONSOLE=false
|
||||||
|
ENABLE_QQ=false
|
||||||
|
ENABLE_DISCORD=false
|
||||||
1
.gitignore
vendored
@ -1,3 +1,4 @@
|
|||||||
/.env
|
/.env
|
||||||
|
/data
|
||||||
|
|
||||||
__pycache__
|
__pycache__
|
||||||
@ -4,5 +4,11 @@ WORKDIR /app
|
|||||||
COPY requirements.txt ./
|
COPY requirements.txt ./
|
||||||
RUN pip install -r requirements.txt --no-deps
|
RUN pip install -r requirements.txt --no-deps
|
||||||
|
|
||||||
COPY . .
|
COPY bot.py pyproject.toml .env.prod .env.test ./
|
||||||
|
COPY assets ./assets
|
||||||
|
COPY scripts ./scripts
|
||||||
|
COPY konabot ./konabot
|
||||||
|
|
||||||
|
ENV PYTHONPATH=/app
|
||||||
|
|
||||||
CMD [ "python", "bot.py" ]
|
CMD [ "python", "bot.py" ]
|
||||||
|
|||||||
BIN
assets/fonts/LXGWWenKai-Regular.ttf
Normal file
|
Before Width: | Height: | Size: 9.2 KiB After Width: | Height: | Size: 9.2 KiB |
|
Before Width: | Height: | Size: 10 KiB After Width: | Height: | Size: 10 KiB |
|
Before Width: | Height: | Size: 8.7 KiB After Width: | Height: | Size: 8.7 KiB |
|
Before Width: | Height: | Size: 11 KiB After Width: | Height: | Size: 11 KiB |
|
Before Width: | Height: | Size: 10 KiB After Width: | Height: | Size: 10 KiB |
|
Before Width: | Height: | Size: 8.7 KiB After Width: | Height: | Size: 8.7 KiB |
|
Before Width: | Height: | Size: 11 KiB After Width: | Height: | Size: 11 KiB |
|
Before Width: | Height: | Size: 9.3 KiB After Width: | Height: | Size: 9.3 KiB |
|
Before Width: | Height: | Size: 10 KiB After Width: | Height: | Size: 10 KiB |
|
Before Width: | Height: | Size: 8.7 KiB After Width: | Height: | Size: 8.7 KiB |
|
Before Width: | Height: | Size: 11 KiB After Width: | Height: | Size: 11 KiB |
|
Before Width: | Height: | Size: 9.3 KiB After Width: | Height: | Size: 9.3 KiB |
BIN
assets/img/dice/stick.png
Normal file
|
After Width: | Height: | Size: 80 KiB |
|
Before Width: | Height: | Size: 9.0 KiB After Width: | Height: | Size: 9.0 KiB |
BIN
assets/img/meme/dss.png
Normal file
|
After Width: | Height: | Size: 172 KiB |
|
Before Width: | Height: | Size: 219 KiB After Width: | Height: | Size: 219 KiB |
BIN
assets/img/meme/mnksay.jpg
Normal file
|
After Width: | Height: | Size: 69 KiB |
|
Before Width: | Height: | Size: 272 KiB After Width: | Height: | Size: 272 KiB |
BIN
assets/img/meme/suanleba.png
Normal file
|
After Width: | Height: | Size: 364 KiB |
0
konabot/__init__.py
Normal file
4
konabot/common/path.py
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
ASSETS_PATH = Path(__file__).resolve().parent.parent.parent / "assets"
|
||||||
|
FONTS_PATH = ASSETS_PATH / "fonts"
|
||||||
@ -3,8 +3,7 @@ from io import BytesIO
|
|||||||
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
|
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
|
||||||
on_alconna)
|
on_alconna)
|
||||||
|
|
||||||
from konabot.plugins.memepack.drawing.geimao import draw_geimao
|
from konabot.plugins.memepack.drawing.saying import draw_geimao, draw_mnk, draw_pt, draw_suan
|
||||||
from konabot.plugins.memepack.drawing.pt import draw_pt
|
|
||||||
|
|
||||||
geimao = on_alconna(Alconna(
|
geimao = on_alconna(Alconna(
|
||||||
"给猫说",
|
"给猫说",
|
||||||
@ -36,3 +35,51 @@ async def _(saying: list[str]):
|
|||||||
img.save(img_bytes, format="PNG")
|
img.save(img_bytes, format="PNG")
|
||||||
|
|
||||||
await pt.send(await UniMessage().image(raw=img_bytes).export())
|
await pt.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
|
||||||
|
|
||||||
|
mnk = on_alconna(Alconna(
|
||||||
|
"re:小?黑白子?说",
|
||||||
|
Args["saying", MultiVar(str, '+'), Field(
|
||||||
|
missing_tips=lambda: "你没有写黑白子说了什么"
|
||||||
|
)]
|
||||||
|
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"})
|
||||||
|
|
||||||
|
@mnk.handle()
|
||||||
|
async def _(saying: list[str]):
|
||||||
|
img = await draw_mnk("\n".join(saying))
|
||||||
|
img_bytes = BytesIO()
|
||||||
|
img.save(img_bytes, format="PNG")
|
||||||
|
|
||||||
|
await pt.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
|
||||||
|
|
||||||
|
suan = on_alconna(Alconna(
|
||||||
|
"小蒜说",
|
||||||
|
Args["saying", MultiVar(str, '+'), Field(
|
||||||
|
missing_tips=lambda: "你没有写小蒜说了什么"
|
||||||
|
)]
|
||||||
|
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
|
||||||
|
|
||||||
|
@suan.handle()
|
||||||
|
async def _(saying: list[str]):
|
||||||
|
img = await draw_suan("\n".join(saying))
|
||||||
|
img_bytes = BytesIO()
|
||||||
|
img.save(img_bytes, format="PNG")
|
||||||
|
|
||||||
|
await pt.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
|
||||||
|
|
||||||
|
dsuan = on_alconna(Alconna(
|
||||||
|
"大蒜说",
|
||||||
|
Args["saying", MultiVar(str, '+'), Field(
|
||||||
|
missing_tips=lambda: "你没有写大蒜说了什么"
|
||||||
|
)]
|
||||||
|
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
|
||||||
|
|
||||||
|
@dsuan.handle()
|
||||||
|
async def _(saying: list[str]):
|
||||||
|
img = await draw_suan("\n".join(saying), True)
|
||||||
|
img_bytes = BytesIO()
|
||||||
|
img.save(img_bytes, format="PNG")
|
||||||
|
|
||||||
|
await pt.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
|||||||
@ -1,8 +1,8 @@
|
|||||||
from imagetext_py import EmojiOptions, FontDB
|
from imagetext_py import EmojiOptions, FontDB
|
||||||
|
|
||||||
from .path import ASSETS
|
from konabot.common.path import FONTS_PATH
|
||||||
|
|
||||||
FontDB.LoadFromDir(str(ASSETS))
|
FontDB.LoadFromDir(str(FONTS_PATH))
|
||||||
|
|
||||||
FontDB.SetDefaultEmojiOptions(EmojiOptions(
|
FontDB.SetDefaultEmojiOptions(EmojiOptions(
|
||||||
parse_shortcodes=False,
|
parse_shortcodes=False,
|
||||||
@ -10,3 +10,4 @@ FontDB.SetDefaultEmojiOptions(EmojiOptions(
|
|||||||
|
|
||||||
HARMONYOS_SANS_SC_BLACK = FontDB.Query("HarmonyOS_Sans_SC_Black")
|
HARMONYOS_SANS_SC_BLACK = FontDB.Query("HarmonyOS_Sans_SC_Black")
|
||||||
HARMONYOS_SANS_SC_REGULAR = FontDB.Query("HarmonyOS_Sans_SC_Regular")
|
HARMONYOS_SANS_SC_REGULAR = FontDB.Query("HarmonyOS_Sans_SC_Regular")
|
||||||
|
LXGWWENKAI_REGULAR = FontDB.Query("LXGWWenKai-Regular")
|
||||||
|
|||||||
@ -1,3 +0,0 @@
|
|||||||
from pathlib import Path
|
|
||||||
|
|
||||||
ASSETS = Path(__file__).parent.parent.parent / "assets"
|
|
||||||
@ -1,29 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
from typing import Any, cast
|
|
||||||
|
|
||||||
import imagetext_py
|
|
||||||
import PIL.Image
|
|
||||||
|
|
||||||
from .base.fonts import HARMONYOS_SANS_SC_BLACK
|
|
||||||
from .base.path import ASSETS
|
|
||||||
|
|
||||||
geimao_image = PIL.Image.open(ASSETS / "geimao.jpg").convert("RGBA")
|
|
||||||
|
|
||||||
|
|
||||||
def _draw_geimao(saying: str):
|
|
||||||
img = geimao_image.copy()
|
|
||||||
with imagetext_py.Writer(img) as iw:
|
|
||||||
iw.draw_text_wrapped(
|
|
||||||
saying, 960, 50, 00.5, 0, 1920, 240, HARMONYOS_SANS_SC_BLACK,
|
|
||||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
|
||||||
0.8,
|
|
||||||
imagetext_py.TextAlign.Center,
|
|
||||||
cast(Any, 30.0),
|
|
||||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
|
|
||||||
draw_emojis=True,
|
|
||||||
)
|
|
||||||
return img
|
|
||||||
|
|
||||||
|
|
||||||
async def draw_geimao(saying: str):
|
|
||||||
return await asyncio.to_thread(_draw_geimao, saying)
|
|
||||||
@ -1,26 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
|
|
||||||
import imagetext_py
|
|
||||||
import PIL.Image
|
|
||||||
|
|
||||||
from .base.fonts import HARMONYOS_SANS_SC_REGULAR
|
|
||||||
from .base.path import ASSETS
|
|
||||||
|
|
||||||
pt_image = PIL.Image.open(ASSETS / "ptsay.png").convert("RGBA")
|
|
||||||
|
|
||||||
|
|
||||||
def _draw_pt(saying: str):
|
|
||||||
img = pt_image.copy()
|
|
||||||
with imagetext_py.Writer(img) as iw:
|
|
||||||
iw.draw_text_wrapped(
|
|
||||||
saying, 259, 278, 0.5, 0.5, 360, 48, HARMONYOS_SANS_SC_REGULAR,
|
|
||||||
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
|
||||||
1.0,
|
|
||||||
imagetext_py.TextAlign.Center,
|
|
||||||
draw_emojis=True,
|
|
||||||
)
|
|
||||||
return img
|
|
||||||
|
|
||||||
|
|
||||||
async def draw_pt(saying: str):
|
|
||||||
return await asyncio.to_thread(_draw_pt, saying)
|
|
||||||
90
konabot/plugins/memepack/drawing/saying.py
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
import asyncio
|
||||||
|
from typing import Any, cast
|
||||||
|
|
||||||
|
import imagetext_py
|
||||||
|
import PIL.Image
|
||||||
|
|
||||||
|
from konabot.common.path import ASSETS_PATH
|
||||||
|
|
||||||
|
from .base.fonts import HARMONYOS_SANS_SC_BLACK, HARMONYOS_SANS_SC_REGULAR, LXGWWENKAI_REGULAR
|
||||||
|
|
||||||
|
geimao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "geimao.jpg").convert("RGBA")
|
||||||
|
pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("RGBA")
|
||||||
|
mnk_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "mnksay.jpg").convert("RGBA")
|
||||||
|
dasuan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "dss.png").convert("RGBA")
|
||||||
|
suan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "suanleba.png").convert("RGBA")
|
||||||
|
|
||||||
|
|
||||||
|
def _draw_geimao(saying: str):
|
||||||
|
img = geimao_image.copy()
|
||||||
|
with imagetext_py.Writer(img) as iw:
|
||||||
|
iw.draw_text_wrapped(
|
||||||
|
saying, 960, 50, 0.5, 0, 1920, 240, HARMONYOS_SANS_SC_BLACK,
|
||||||
|
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||||
|
0.8,
|
||||||
|
imagetext_py.TextAlign.Center,
|
||||||
|
cast(Any, 30.0),
|
||||||
|
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
|
||||||
|
draw_emojis=True,
|
||||||
|
)
|
||||||
|
return img
|
||||||
|
|
||||||
|
|
||||||
|
async def draw_geimao(saying: str):
|
||||||
|
return await asyncio.to_thread(_draw_geimao, saying)
|
||||||
|
|
||||||
|
|
||||||
|
def _draw_pt(saying: str):
|
||||||
|
img = pt_image.copy()
|
||||||
|
with imagetext_py.Writer(img) as iw:
|
||||||
|
iw.draw_text_wrapped(
|
||||||
|
saying, 259, 278, 0.5, 0.5, 360, 48, HARMONYOS_SANS_SC_REGULAR,
|
||||||
|
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||||
|
1.0,
|
||||||
|
imagetext_py.TextAlign.Center,
|
||||||
|
draw_emojis=True,
|
||||||
|
)
|
||||||
|
return img
|
||||||
|
|
||||||
|
|
||||||
|
async def draw_pt(saying: str):
|
||||||
|
return await asyncio.to_thread(_draw_pt, saying)
|
||||||
|
|
||||||
|
|
||||||
|
def _draw_mnk(saying: str):
|
||||||
|
img = mnk_image.copy()
|
||||||
|
with imagetext_py.Writer(img) as iw:
|
||||||
|
iw.draw_text_wrapped(
|
||||||
|
saying, 540, 25, 0.5, 0, 1080, 120, HARMONYOS_SANS_SC_BLACK,
|
||||||
|
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||||
|
0.8,
|
||||||
|
imagetext_py.TextAlign.Center,
|
||||||
|
cast(Any, 15.0),
|
||||||
|
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
|
||||||
|
draw_emojis=True,
|
||||||
|
)
|
||||||
|
return img
|
||||||
|
|
||||||
|
|
||||||
|
async def draw_mnk(saying: str):
|
||||||
|
return await asyncio.to_thread(_draw_mnk, saying)
|
||||||
|
|
||||||
|
|
||||||
|
def _draw_suan(saying: str, dasuan: bool = False):
|
||||||
|
if dasuan:
|
||||||
|
img = dasuan_image.copy()
|
||||||
|
else:
|
||||||
|
img = suan_image.copy()
|
||||||
|
with imagetext_py.Writer(img) as iw:
|
||||||
|
iw.draw_text_wrapped(
|
||||||
|
saying, 1020, 290, 0.5, 0.5, 400, 48, LXGWWENKAI_REGULAR,
|
||||||
|
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
|
||||||
|
1.0,
|
||||||
|
imagetext_py.TextAlign.Center,
|
||||||
|
draw_emojis=True,
|
||||||
|
)
|
||||||
|
return img
|
||||||
|
|
||||||
|
|
||||||
|
async def draw_suan(saying: str, dasuan: bool = False):
|
||||||
|
return await asyncio.to_thread(_draw_suan, saying, dasuan)
|
||||||
@ -1,11 +1,11 @@
|
|||||||
from typing import Optional
|
from typing import Optional, Union
|
||||||
from nonebot.adapters import Event as BaseEvent
|
from nonebot.adapters import Event as BaseEvent
|
||||||
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
|
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
|
||||||
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
||||||
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
|
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
|
||||||
|
|
||||||
from konabot.plugins.roll_dice.roll_dice import generate_dice_image
|
from konabot.plugins.roll_dice.roll_dice import generate_dice_image
|
||||||
from konabot.plugins.roll_dice.roll_number import get_random_number, roll_number
|
from konabot.plugins.roll_dice.roll_number import get_random_number, get_random_number_string, roll_number
|
||||||
|
|
||||||
evt = on_alconna(Alconna(
|
evt = on_alconna(Alconna(
|
||||||
"摇数字"
|
"摇数字"
|
||||||
@ -22,21 +22,26 @@ async def _(event: BaseEvent):
|
|||||||
|
|
||||||
evt = on_alconna(Alconna(
|
evt = on_alconna(Alconna(
|
||||||
"摇骰子",
|
"摇骰子",
|
||||||
Args["f1?", int]["f2?", int]
|
Args["f1?", str]["f2?", str]
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
|
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
|
||||||
|
|
||||||
@evt.handle()
|
@evt.handle()
|
||||||
async def _(event: BaseEvent, f1: Optional[int] = None, f2: Optional[int] = None):
|
async def _(event: BaseEvent, f1: Optional[str] = None, f2: Optional[str] = None):
|
||||||
# if isinstance(event, DiscordMessageEvent):
|
# if isinstance(event, DiscordMessageEvent):
|
||||||
# await evt.send(await UniMessage().text("```\n" + roll_dice() + "\n```").export())
|
# await evt.send(await UniMessage().text("```\n" + roll_dice() + "\n```").export())
|
||||||
# elif isinstance(event, ConsoleMessageEvent):
|
# elif isinstance(event, ConsoleMessageEvent):
|
||||||
number = 0
|
number = ""
|
||||||
if(f1 is not None and f2 is not None):
|
if(f1 is not None and f2 is not None):
|
||||||
number = get_random_number(f1, f2)
|
number = get_random_number_string(f1, f2)
|
||||||
elif f1 is not None:
|
elif f1 is not None:
|
||||||
number = get_random_number(1, f1)
|
if(float(f1) > 1):
|
||||||
|
number = get_random_number_string("1", f1)
|
||||||
|
elif (float(f1) > 0):
|
||||||
|
number = get_random_number_string("0", f1)
|
||||||
|
else:
|
||||||
|
number = get_random_number_string(f1, "0")
|
||||||
else:
|
else:
|
||||||
number = get_random_number()
|
number = get_random_number_string()
|
||||||
await evt.send(await UniMessage().image(raw=await generate_dice_image(number)).export())
|
await evt.send(await UniMessage().image(raw=await generate_dice_image(number)).export())
|
||||||
# else:
|
# else:
|
||||||
# await evt.send(await UniMessage().text(roll_dice(wide=True)).export())
|
# await evt.send(await UniMessage().text(roll_dice(wide=True)).export())
|
||||||
|
|||||||
@ -1,9 +1,11 @@
|
|||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
|
||||||
import cv2
|
import cv2
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from PIL import Image, ImageDraw, ImageFont
|
from PIL import Image, ImageDraw, ImageFont
|
||||||
|
|
||||||
from konabot.plugins.roll_dice.base.path import ASSETS
|
from konabot.common.path import ASSETS_PATH, FONTS_PATH
|
||||||
|
|
||||||
|
|
||||||
def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0)):
|
def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0)):
|
||||||
"""
|
"""
|
||||||
@ -13,15 +15,7 @@ def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0
|
|||||||
temp_image = Image.new('RGB', (1, 1), (255, 255, 255))
|
temp_image = Image.new('RGB', (1, 1), (255, 255, 255))
|
||||||
temp_draw = ImageDraw.Draw(temp_image)
|
temp_draw = ImageDraw.Draw(temp_image)
|
||||||
|
|
||||||
font = ImageFont.truetype(ASSETS / "montserrat.otf", font_size)
|
font = ImageFont.truetype(FONTS_PATH / "montserrat.otf", font_size)
|
||||||
# try:
|
|
||||||
# font = ImageFont.truetype(ASSETS / "montserrat.otf", font_size)
|
|
||||||
# except:
|
|
||||||
# try:
|
|
||||||
# font = ImageFont.truetype("arial.ttf", font_size)
|
|
||||||
# except:
|
|
||||||
# # 如果系统字体不可用,使用默认字体
|
|
||||||
# font = ImageFont.load_default()
|
|
||||||
|
|
||||||
# 获取文本边界框
|
# 获取文本边界框
|
||||||
bbox = temp_draw.textbbox((0, 0), text, font=font)
|
bbox = temp_draw.textbbox((0, 0), text, font=font)
|
||||||
@ -158,27 +152,208 @@ def precise_blend_with_perspective(background, foreground, corners):
|
|||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
async def generate_dice_image(number: int) -> BytesIO:
|
def draw_line_bresenham(image, x0, y0, x1, y1, color):
|
||||||
|
"""使用Bresenham算法画线,避免间隙"""
|
||||||
|
dx = abs(x1 - x0)
|
||||||
|
dy = abs(y1 - y0)
|
||||||
|
sx = 1 if x0 < x1 else -1
|
||||||
|
sy = 1 if y0 < y1 else -1
|
||||||
|
err = dx - dy
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if 0 <= x0 < image.shape[1] and 0 <= y0 < image.shape[0]:
|
||||||
|
image[y0, x0] = color
|
||||||
|
|
||||||
|
if x0 == x1 and y0 == y1:
|
||||||
|
break
|
||||||
|
|
||||||
|
e2 = 2 * err
|
||||||
|
if e2 > -dy:
|
||||||
|
err -= dy
|
||||||
|
x0 += sx
|
||||||
|
if e2 < dx:
|
||||||
|
err += dx
|
||||||
|
y0 += sy
|
||||||
|
|
||||||
|
def slice_and_stretch(image, slice_lines, direction):
|
||||||
|
'''
|
||||||
|
image: 图像
|
||||||
|
slice_lines: 切割线(两个点的列表),一般是倾斜45度的直线
|
||||||
|
direction: 移动方向向量(二元数组)
|
||||||
|
'''
|
||||||
|
# 获取图片的尺寸
|
||||||
|
height, width = image.shape[:2]
|
||||||
|
# 创建一个由移动方向扩充后,更大的图片
|
||||||
|
new_width = int(width + abs(direction[0]))
|
||||||
|
new_height = int(height + abs(direction[1]))
|
||||||
|
new_image = np.zeros((new_height, new_width, 4), dtype=image.dtype)
|
||||||
|
# 先把图片放在新图的和方向相反的一侧
|
||||||
|
offset_x = int(abs(min(0, direction[0])))
|
||||||
|
offset_y = int(abs(min(0, direction[1])))
|
||||||
|
new_image[offset_y:offset_y+height, offset_x:offset_x+width] = image
|
||||||
|
# 切割线也跟着偏移
|
||||||
|
slice_lines = [(x + offset_x, y + offset_y) for (x, y) in slice_lines]
|
||||||
|
# 复制切割线经过的像素,沿着方向移动,实现类似拖尾的效果
|
||||||
|
apply_trail_effect_vectorized(new_image, slice_lines, direction)
|
||||||
|
apply_stroke_vectorized(new_image, slice_lines, direction)
|
||||||
|
|
||||||
|
|
||||||
|
return new_image, offset_x, offset_y
|
||||||
|
|
||||||
|
def apply_trail_effect_vectorized(new_image, slice_lines, direction):
|
||||||
|
"""向量化实现拖尾效果"""
|
||||||
|
height, width = new_image.shape[:2]
|
||||||
|
|
||||||
|
# 创建坐标网格
|
||||||
|
y_coords, x_coords = np.mgrid[0:height, 0:width]
|
||||||
|
|
||||||
|
# 向量化计算点到直线的距离
|
||||||
|
line_vec = np.array([slice_lines[1][0] - slice_lines[0][0],
|
||||||
|
slice_lines[1][1] - slice_lines[0][1]])
|
||||||
|
point_vecs = np.stack([x_coords - slice_lines[0][0],
|
||||||
|
y_coords - slice_lines[0][1]], axis=-1)
|
||||||
|
|
||||||
|
# 计算叉积(有向距离)
|
||||||
|
cross_products = (line_vec[0] * point_vecs[:, :, 1] -
|
||||||
|
line_vec[1] * point_vecs[:, :, 0])
|
||||||
|
|
||||||
|
# 选择直线右侧的像素 (d1 > 0)
|
||||||
|
mask = cross_products > 0
|
||||||
|
|
||||||
|
# 计算目标位置
|
||||||
|
target_x = (x_coords + direction[0]).astype(int)
|
||||||
|
target_y = (y_coords + direction[1]).astype(int)
|
||||||
|
|
||||||
|
# 创建有效位置掩码
|
||||||
|
valid_mask = mask & (target_x >= 0) & (target_x < width) & \
|
||||||
|
(target_y >= 0) & (target_y < height)
|
||||||
|
|
||||||
|
# 批量复制像素
|
||||||
|
new_image[target_y[valid_mask], target_x[valid_mask]] = \
|
||||||
|
new_image[y_coords[valid_mask], x_coords[valid_mask]]
|
||||||
|
|
||||||
|
def apply_stroke_vectorized(new_image, slice_lines, direction):
|
||||||
|
"""使用向量化操作优化笔画效果"""
|
||||||
|
height, width = new_image.shape[:2]
|
||||||
|
|
||||||
|
# 1. 找到所有非透明像素
|
||||||
|
non_transparent = np.where(new_image[:, :, 3] > 0)
|
||||||
|
if len(non_transparent[0]) == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
y_coords, x_coords = non_transparent
|
||||||
|
|
||||||
|
# 2. 向量化计算点到直线的距离
|
||||||
|
line_vec = np.array([slice_lines[1][0] - slice_lines[0][0],
|
||||||
|
slice_lines[1][1] - slice_lines[0][1]])
|
||||||
|
point_vecs = np.column_stack([x_coords - slice_lines[0][0],
|
||||||
|
y_coords - slice_lines[0][1]])
|
||||||
|
|
||||||
|
# 计算叉积(距离)
|
||||||
|
cross_products = (line_vec[0] * point_vecs[:, 1] -
|
||||||
|
line_vec[1] * point_vecs[:, 0])
|
||||||
|
|
||||||
|
# 3. 选择靠近直线的像素
|
||||||
|
mask = np.abs(cross_products) < 1.0
|
||||||
|
selected_y = y_coords[mask]
|
||||||
|
selected_x = x_coords[mask]
|
||||||
|
selected_pixels = new_image[selected_y, selected_x]
|
||||||
|
|
||||||
|
if len(selected_x) == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
# 4. 预计算采样点
|
||||||
|
length = np.sqrt(direction[0]**2 + direction[1]**2)
|
||||||
|
if length == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
# 创建采样偏移
|
||||||
|
dx_dy = np.array([(dx, dy) for dx in [-0.5, 0, 0.5]
|
||||||
|
for dy in [-0.5, 0, 0.5]])
|
||||||
|
|
||||||
|
# 5. 批量计算目标位置
|
||||||
|
steps = max(1, int(length * 2))
|
||||||
|
alpha = 0.7
|
||||||
|
|
||||||
|
for k in range(1, steps + 1):
|
||||||
|
# 对所有选中的像素批量计算新位置
|
||||||
|
scale = k / steps
|
||||||
|
|
||||||
|
# 为每个像素和每个采样点计算目标位置
|
||||||
|
for dx, dy in dx_dy:
|
||||||
|
target_x = np.round(selected_x + dx + direction[0] * scale).astype(int)
|
||||||
|
target_y = np.round(selected_y + dy + direction[1] * scale).astype(int)
|
||||||
|
|
||||||
|
# 创建有效位置掩码
|
||||||
|
valid_mask = (target_x >= 0) & (target_x < width) & \
|
||||||
|
(target_y >= 0) & (target_y < height)
|
||||||
|
|
||||||
|
if np.any(valid_mask):
|
||||||
|
valid_target_x = target_x[valid_mask]
|
||||||
|
valid_target_y = target_y[valid_mask]
|
||||||
|
valid_source_idx = np.where(valid_mask)[0]
|
||||||
|
|
||||||
|
# 批量混合像素
|
||||||
|
source_pixels = selected_pixels[valid_source_idx]
|
||||||
|
target_pixels = new_image[valid_target_y, valid_target_x]
|
||||||
|
|
||||||
|
new_image[valid_target_y, valid_target_x] = (
|
||||||
|
alpha * source_pixels + (1 - alpha) * target_pixels
|
||||||
|
)
|
||||||
|
|
||||||
|
async def generate_dice_image(number: str) -> BytesIO:
|
||||||
# 将文本转换为带透明背景的图像
|
# 将文本转换为带透明背景的图像
|
||||||
text = str(number)
|
text = number
|
||||||
|
|
||||||
|
# 如果文本太长,直接返回金箍棒
|
||||||
|
if(len(text) > 50):
|
||||||
|
output = BytesIO()
|
||||||
|
push_image = Image.open(ASSETS_PATH / "img" / "dice" / "stick.png")
|
||||||
|
push_image.save(output,format='PNG')
|
||||||
|
output.seek(0)
|
||||||
|
return output
|
||||||
|
|
||||||
text_image = text_to_transparent_image(
|
text_image = text_to_transparent_image(
|
||||||
text,
|
text,
|
||||||
font_size=60,
|
font_size=60,
|
||||||
text_color=(0, 0, 0) # 黑色文字
|
text_color=(0, 0, 0) # 黑色文字
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# 获取长宽比
|
||||||
|
height, width = text_image.shape[:2]
|
||||||
|
aspect_ratio = width / height
|
||||||
|
|
||||||
|
# 根据长宽比设置拉伸系数
|
||||||
|
stretch_k = 1
|
||||||
|
if aspect_ratio > 1:
|
||||||
|
stretch_k = aspect_ratio
|
||||||
|
|
||||||
|
# 骰子的方向
|
||||||
|
up_direction = (51 - 16, 5 - 30) # 右上角点 - 左上角点
|
||||||
|
|
||||||
|
move_distance = (up_direction[0] * (stretch_k - 1), up_direction[1] * (stretch_k - 1))
|
||||||
|
|
||||||
|
# 加载背景图像,保留透明通道
|
||||||
|
background = cv2.imread(str(ASSETS_PATH / "img" / "dice" / "template.png"), cv2.IMREAD_UNCHANGED)
|
||||||
|
assert background is not None
|
||||||
|
|
||||||
|
height, width = background.shape[:2]
|
||||||
|
|
||||||
|
background, offset_x, offset_y = slice_and_stretch(background,
|
||||||
|
[(10,10),(0,0)],
|
||||||
|
move_distance)
|
||||||
|
|
||||||
# 定义3D变换的四个角点(透视效果)
|
# 定义3D变换的四个角点(透视效果)
|
||||||
# 顺序: [左上, 右上, 右下, 左下]
|
# 顺序: [左上, 右上, 右下, 左下]
|
||||||
corners = np.array([
|
corners = np.array([
|
||||||
[16, 30], # 左上
|
[16, 30], # 左上
|
||||||
[51, 5], # 右上(上移,创建透视)
|
[51 + move_distance[0], 5 + move_distance[1]], # 右上(上移,创建透视)
|
||||||
[88, 33], # 右下
|
[88 + move_distance[0], 33 + move_distance[1]], # 右下
|
||||||
[49, 62] # 左下(下移)
|
[49, 62] # 左下(下移)
|
||||||
], dtype=np.float32)
|
], dtype=np.float32)
|
||||||
|
corners[:, 0] += offset_x
|
||||||
|
corners[:, 1] += offset_y
|
||||||
|
|
||||||
# 加载背景图像,保留透明通道
|
|
||||||
background = cv2.imread(str(ASSETS / "template.png"), cv2.IMREAD_UNCHANGED)
|
|
||||||
|
|
||||||
|
|
||||||
# 对文本图像进行3D变换(保持透明通道)
|
# 对文本图像进行3D变换(保持透明通道)
|
||||||
transformed_text, transform_matrix = perspective_transform(text_image, background, corners)
|
transformed_text, transform_matrix = perspective_transform(text_image, background, corners)
|
||||||
@ -189,9 +364,29 @@ async def generate_dice_image(number: int) -> BytesIO:
|
|||||||
|
|
||||||
pil_final = Image.fromarray(final_image_simple)
|
pil_final = Image.fromarray(final_image_simple)
|
||||||
# 导入一系列图像
|
# 导入一系列图像
|
||||||
images: list[Image.Image] = [Image.open(ASSETS / f"{i}.png") for i in range(1, 12)]
|
images: list[Image.Image] = [Image.open(ASSETS_PATH / "img" / "dice" / f"{i}.png") for i in range(1, 12)]
|
||||||
images.append(pil_final)
|
images.append(pil_final)
|
||||||
frame_durations = [100] * (len(images) - 1) + [100000]
|
frame_durations = [100] * (len(images) - 1) + [100000]
|
||||||
|
# 将导入的图像尺寸扩展为和 pil_final 相同的大小,随帧数进行扩展,然后不放大的情况下放在最中间
|
||||||
|
if(aspect_ratio > 1):
|
||||||
|
target_size = pil_final.size
|
||||||
|
for i in range(len(images) - 1):
|
||||||
|
k = i / (len(images) - 1)
|
||||||
|
now_distance = (move_distance[0] * k, move_distance[1] * k)
|
||||||
|
img = np.array(images[i])
|
||||||
|
img, _, _ = slice_and_stretch(img,
|
||||||
|
[(10,10),(0,0)],
|
||||||
|
now_distance)
|
||||||
|
# 只扩展边界,图像本身不放大
|
||||||
|
img_width, img_height = img.shape[1], img.shape[0]
|
||||||
|
new_img = Image.new("RGBA", target_size, (0, 0, 0, 0))
|
||||||
|
this_offset_x = (target_size[0] - img_width) // 2
|
||||||
|
this_offset_y = (target_size[1] - img_height) // 2
|
||||||
|
# new_img.paste(img, (this_offset_x, this_offset_y))
|
||||||
|
new_img.paste(Image.fromarray(img), (this_offset_x, this_offset_y))
|
||||||
|
images[i] = new_img
|
||||||
|
|
||||||
|
|
||||||
# 保存为BytesIO对象
|
# 保存为BytesIO对象
|
||||||
output = BytesIO()
|
output = BytesIO()
|
||||||
images[0].save(output,
|
images[0].save(output,
|
||||||
@ -200,4 +395,6 @@ async def generate_dice_image(number: int) -> BytesIO:
|
|||||||
duration=frame_durations,
|
duration=frame_durations,
|
||||||
format='GIF',
|
format='GIF',
|
||||||
loop=1)
|
loop=1)
|
||||||
|
output.seek(0)
|
||||||
|
# pil_final.save(output, format='PNG')
|
||||||
return output
|
return output
|
||||||
@ -42,6 +42,24 @@ def get_random_number(min: int = 1, max: int = 6) -> int:
|
|||||||
import random
|
import random
|
||||||
return random.randint(min, max)
|
return random.randint(min, max)
|
||||||
|
|
||||||
|
def get_random_number_string(min_value: str = "1", max_value: str = "6") -> str:
|
||||||
|
import random
|
||||||
|
|
||||||
|
# 先判断二者是不是整数
|
||||||
|
if (float(min_value).is_integer()
|
||||||
|
and float(max_value).is_integer()
|
||||||
|
and "." not in min_value
|
||||||
|
and "." not in max_value):
|
||||||
|
return str(random.randint(int(float(min_value)), int(float(max_value))))
|
||||||
|
|
||||||
|
# 根据传入小数的位数,决定保留几位小数
|
||||||
|
if "." in str(min_value) or "." in str(max_value):
|
||||||
|
decimal_places = max(len(str(min_value).split(".")[1]) if "." in str(min_value) else 0,
|
||||||
|
len(str(max_value).split(".")[1]) if "." in str(max_value) else 0)
|
||||||
|
return str(round(random.uniform(float(min_value), float(max_value)), decimal_places))
|
||||||
|
|
||||||
|
# 如果没有小数点,很可能二者都是指数表示或均为 inf,直接返回随机小数
|
||||||
|
return str(random.uniform(float(min_value), float(max_value)))
|
||||||
def roll_number(wide: bool = False) -> str:
|
def roll_number(wide: bool = False) -> str:
|
||||||
raw = number_arts[get_random_number()]
|
raw = number_arts[get_random_number()]
|
||||||
if wide:
|
if wide:
|
||||||
|
|||||||
207
konabot/plugins/simple_notify/__init__.py
Normal file
@ -0,0 +1,207 @@
|
|||||||
|
import asyncio
|
||||||
|
import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Literal, cast
|
||||||
|
|
||||||
|
import nonebot
|
||||||
|
from loguru import logger
|
||||||
|
from nonebot import on_message
|
||||||
|
from nonebot.adapters import Event
|
||||||
|
from nonebot.adapters.console import Bot as ConsoleBot
|
||||||
|
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
|
||||||
|
from nonebot.adapters.discord import Bot as DiscordBot
|
||||||
|
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
||||||
|
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
|
||||||
|
from nonebot.adapters.onebot.v11.event import \
|
||||||
|
GroupMessageEvent as OnebotV11GroupMessageEvent
|
||||||
|
from nonebot.adapters.onebot.v11.event import \
|
||||||
|
MessageEvent as OnebotV11MessageEvent
|
||||||
|
from nonebot_plugin_alconna import UniMessage, UniMsg
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from konabot.plugins.simple_notify.parse_time import get_target_time
|
||||||
|
|
||||||
|
evt = on_message()
|
||||||
|
|
||||||
|
(Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True)
|
||||||
|
DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "notify.json"
|
||||||
|
DATA_FILE_LOCK = asyncio.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
class Notify(BaseModel):
|
||||||
|
platform: Literal["console", "qq", "discord"]
|
||||||
|
|
||||||
|
target: str
|
||||||
|
"需要接受通知的个体"
|
||||||
|
|
||||||
|
target_env: str | None
|
||||||
|
"在哪里进行通知,如果是 None 代表私聊通知"
|
||||||
|
|
||||||
|
notify_time: datetime.datetime
|
||||||
|
notify_msg: str
|
||||||
|
|
||||||
|
def get_str(self):
|
||||||
|
return f"{self.target}-{self.target_env}-{self.platform}-{self.notify_time}"
|
||||||
|
|
||||||
|
|
||||||
|
class NotifyConfigFile(BaseModel):
|
||||||
|
version: int = 1
|
||||||
|
notifies: list[Notify] = []
|
||||||
|
unsent: list[Notify] = []
|
||||||
|
|
||||||
|
|
||||||
|
def load_notify_config() -> NotifyConfigFile:
|
||||||
|
if not DATA_FILE_PATH.exists():
|
||||||
|
return NotifyConfigFile()
|
||||||
|
try:
|
||||||
|
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text())
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"在解析 Notify 时遇到问题:{e}")
|
||||||
|
return NotifyConfigFile()
|
||||||
|
|
||||||
|
|
||||||
|
def save_notify_config(config: NotifyConfigFile):
|
||||||
|
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4))
|
||||||
|
|
||||||
|
|
||||||
|
async def notify_now(notify: Notify):
|
||||||
|
if notify.platform == 'console':
|
||||||
|
bot = [b for b in nonebot.get_bots().values() if isinstance(b, ConsoleBot)]
|
||||||
|
if len(bot) != 1:
|
||||||
|
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
|
||||||
|
return False
|
||||||
|
bot = bot[0]
|
||||||
|
await bot.send_private_message(notify.target, f"代办通知:{notify.notify_msg}")
|
||||||
|
elif notify.platform == 'discord':
|
||||||
|
bot = [b for b in nonebot.get_bots().values() if isinstance(b, DiscordBot)]
|
||||||
|
if len(bot) != 1:
|
||||||
|
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
|
||||||
|
return False
|
||||||
|
bot = bot[0]
|
||||||
|
channel = await bot.create_DM(recipient_id=int(notify.target))
|
||||||
|
await bot.send_to(channel.id, f"代办通知:{notify.notify_msg}")
|
||||||
|
elif notify.platform == 'qq':
|
||||||
|
bot = [b for b in nonebot.get_bots().values() if isinstance(b, OnebotV11Bot)]
|
||||||
|
if len(bot) != 1:
|
||||||
|
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
|
||||||
|
return False
|
||||||
|
bot = bot[0]
|
||||||
|
if notify.target_env is None:
|
||||||
|
await bot.send_private_msg(
|
||||||
|
user_id=int(notify.target),
|
||||||
|
message=f"代办通知:{notify.notify_msg}",
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await bot.send_group_msg(
|
||||||
|
group_id=int(notify.target_env),
|
||||||
|
message=cast(Any,
|
||||||
|
await UniMessage().at(notify.target).text(f" 代办通知:{notify.notify_msg}").export()
|
||||||
|
),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warning(f"提醒未成功发送出去:{notify}")
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
async def create_notify_task(notify: Notify, fail2remove: bool = True):
|
||||||
|
async def mission():
|
||||||
|
begin_time = datetime.datetime.now()
|
||||||
|
if begin_time < notify.notify_time:
|
||||||
|
await asyncio.sleep((notify.notify_time - begin_time).total_seconds())
|
||||||
|
res = await notify_now(notify)
|
||||||
|
if fail2remove or res:
|
||||||
|
await DATA_FILE_LOCK.acquire()
|
||||||
|
cfg = load_notify_config()
|
||||||
|
cfg.notifies = [n for n in cfg.notifies if n.get_str() != notify.get_str()]
|
||||||
|
if not res:
|
||||||
|
cfg.unsent.append(notify)
|
||||||
|
save_notify_config(cfg)
|
||||||
|
DATA_FILE_LOCK.release()
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
return asyncio.create_task(mission())
|
||||||
|
|
||||||
|
|
||||||
|
@evt.handle()
|
||||||
|
async def _(msg: UniMsg, mEvt: Event):
|
||||||
|
if mEvt.get_user_id() in nonebot.get_bots():
|
||||||
|
return
|
||||||
|
|
||||||
|
text = msg.extract_plain_text()
|
||||||
|
if "提醒我" not in text:
|
||||||
|
return
|
||||||
|
|
||||||
|
segments = text.split("提醒我", maxsplit=1)
|
||||||
|
if len(segments) != 2:
|
||||||
|
return
|
||||||
|
|
||||||
|
notify_time, notify_text = segments
|
||||||
|
target_time = get_target_time(notify_time)
|
||||||
|
if target_time is None:
|
||||||
|
logger.info(f"无法从 {notify_time} 中解析出时间")
|
||||||
|
return
|
||||||
|
if not notify_text:
|
||||||
|
return
|
||||||
|
|
||||||
|
await DATA_FILE_LOCK.acquire()
|
||||||
|
cfg = load_notify_config()
|
||||||
|
|
||||||
|
if isinstance(mEvt, ConsoleMessageEvent):
|
||||||
|
platform = "console"
|
||||||
|
target = mEvt.get_user_id()
|
||||||
|
target_env = None
|
||||||
|
elif isinstance(mEvt, OnebotV11MessageEvent):
|
||||||
|
platform = "qq"
|
||||||
|
target = mEvt.get_user_id()
|
||||||
|
if isinstance(mEvt, OnebotV11GroupMessageEvent):
|
||||||
|
target_env = str(mEvt.group_id)
|
||||||
|
else:
|
||||||
|
target_env = None
|
||||||
|
elif isinstance(mEvt, DiscordMessageEvent):
|
||||||
|
platform = "discord"
|
||||||
|
target = mEvt.get_user_id()
|
||||||
|
target_env = None
|
||||||
|
else:
|
||||||
|
logger.warning(f"Notify 遇到不支持的平台:{type(mEvt).__name__}")
|
||||||
|
return
|
||||||
|
|
||||||
|
notify = Notify(
|
||||||
|
platform=platform,
|
||||||
|
target=target,
|
||||||
|
target_env=target_env,
|
||||||
|
notify_time=target_time,
|
||||||
|
notify_msg=notify_text,
|
||||||
|
)
|
||||||
|
await create_notify_task(notify)
|
||||||
|
|
||||||
|
cfg.notifies.append(notify)
|
||||||
|
save_notify_config(cfg)
|
||||||
|
DATA_FILE_LOCK.release()
|
||||||
|
|
||||||
|
await evt.send(await UniMessage().at(mEvt.get_user_id()).text(
|
||||||
|
f" 了解啦!将会在 {notify.notify_time} 提醒你哦~").export())
|
||||||
|
|
||||||
|
|
||||||
|
driver = nonebot.get_driver()
|
||||||
|
|
||||||
|
NOTIFIED_FLAG = {
|
||||||
|
"task_added": False,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@driver.on_bot_connect
|
||||||
|
async def _():
|
||||||
|
if NOTIFIED_FLAG["task_added"]:
|
||||||
|
return
|
||||||
|
|
||||||
|
NOTIFIED_FLAG["task_added"] = True
|
||||||
|
|
||||||
|
await DATA_FILE_LOCK.acquire()
|
||||||
|
tasks = []
|
||||||
|
cfg = load_notify_config()
|
||||||
|
for notify in cfg.notifies:
|
||||||
|
tasks.append(create_notify_task(notify, fail2remove=False))
|
||||||
|
DATA_FILE_LOCK.release()
|
||||||
|
|
||||||
|
await asyncio.gather(*tasks)
|
||||||
358
konabot/plugins/simple_notify/parse_time.py
Normal file
@ -0,0 +1,358 @@
|
|||||||
|
import datetime
|
||||||
|
import re
|
||||||
|
from typing import Optional, Dict, List, Callable, Tuple
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
# --- 常量与正则表达式定义 (Constants and Regex Definitions) ---
|
||||||
|
|
||||||
|
# 数字模式,兼容中文和阿拉伯数字
|
||||||
|
P_NUM = r"(\d+|[零一两二三四五六七八九十]+)"
|
||||||
|
|
||||||
|
# 预编译的正则表达式
|
||||||
|
PATTERNS = {
|
||||||
|
# 相对时间, e.g., "5分钟后"
|
||||||
|
"DELTA": re.compile(
|
||||||
|
r"^"
|
||||||
|
r"((?P<days>" + P_NUM + r") ?天)?"
|
||||||
|
r"((?P<hours>" + P_NUM + r") ?个?小?时)?"
|
||||||
|
r"((?P<minutes>" + P_NUM + r") ?分钟?)?"
|
||||||
|
r"((?P<seconds>" + P_NUM + r") ?秒钟?)?"
|
||||||
|
r" ?后 ?$"
|
||||||
|
),
|
||||||
|
# 绝对时间
|
||||||
|
"YEAR": re.compile(r"(" + P_NUM + r") ?年"),
|
||||||
|
"MONTH": re.compile(r"(" + P_NUM + r") ?月"),
|
||||||
|
"DAY": re.compile(r"(" + P_NUM + r") ?[日号]"),
|
||||||
|
"HOUR": re.compile(r"(" + P_NUM + r") ?[点时](半)?钟?"),
|
||||||
|
"MINUTE": re.compile(r"(" + P_NUM + r") ?分(钟)?"),
|
||||||
|
"SECOND": re.compile(r"(" + P_NUM + r") ?秒(钟)?"),
|
||||||
|
"HMS_COLON": re.compile(r"(\d{1,2})[::](\d{1,2})([::](\d{1,2}))?"),
|
||||||
|
"PM": re.compile(r"(下午|PM|晚上)"),
|
||||||
|
# 相对日期
|
||||||
|
"TOMORROW": re.compile(r"明天"),
|
||||||
|
"DAY_AFTER_TOMORROW": re.compile(r"后天"),
|
||||||
|
"TODAY": re.compile(r"今天"),
|
||||||
|
}
|
||||||
|
|
||||||
|
# 中文数字到阿拉伯数字的映射
|
||||||
|
CHINESE_TO_ARABIC_MAP: Dict[str, int] = {
|
||||||
|
'零': 0, '一': 1, '二': 2, '三': 3, '四': 4,
|
||||||
|
'五': 5, '六': 6, '七': 7, '八': 8, '九': 9, '十': 10
|
||||||
|
}
|
||||||
|
|
||||||
|
# --- 核心工具函数 (Core Utility Functions) ---
|
||||||
|
|
||||||
|
def parse_number(s: str) -> int:
|
||||||
|
"""
|
||||||
|
将包含中文或阿拉伯数字的字符串解析为整数。
|
||||||
|
例如: "五" -> 5, "十五" -> 15, "二十三" -> 23, "12" -> 12。
|
||||||
|
返回 -1 表示解析失败。
|
||||||
|
"""
|
||||||
|
if not s:
|
||||||
|
return -1
|
||||||
|
|
||||||
|
s = s.strip().replace("两", "二")
|
||||||
|
|
||||||
|
if s.isdigit():
|
||||||
|
return int(s)
|
||||||
|
|
||||||
|
if s in CHINESE_TO_ARABIC_MAP:
|
||||||
|
return CHINESE_TO_ARABIC_MAP[s]
|
||||||
|
|
||||||
|
# 处理 "十" 在不同位置的情况
|
||||||
|
if s.startswith('十'):
|
||||||
|
if len(s) == 1:
|
||||||
|
return 10
|
||||||
|
num = CHINESE_TO_ARABIC_MAP.get(s[1])
|
||||||
|
return 10 + num if num is not None else -1
|
||||||
|
|
||||||
|
if s.endswith('十'):
|
||||||
|
if len(s) == 2:
|
||||||
|
num = CHINESE_TO_ARABIC_MAP.get(s[0])
|
||||||
|
return 10 * num if num is not None else -1
|
||||||
|
|
||||||
|
if '十' in s:
|
||||||
|
parts = s.split('十')
|
||||||
|
if len(parts) == 2:
|
||||||
|
left = CHINESE_TO_ARABIC_MAP.get(parts[0])
|
||||||
|
right = CHINESE_TO_ARABIC_MAP.get(parts[1])
|
||||||
|
if left is not None and right is not None:
|
||||||
|
return left * 10 + right
|
||||||
|
|
||||||
|
return -1
|
||||||
|
|
||||||
|
|
||||||
|
# --- 时间解析器类 (Time Parser Class) ---
|
||||||
|
|
||||||
|
class TimeParser:
|
||||||
|
"""
|
||||||
|
一个用于解析自然语言时间描述的类。
|
||||||
|
"""
|
||||||
|
def __init__(self, content: str):
|
||||||
|
self.original_content: str = content
|
||||||
|
self.content_to_parse: str = self._preprocess(content)
|
||||||
|
self.now: datetime.datetime = datetime.datetime.now()
|
||||||
|
# 将 t 作为结果构建器,初始化为今天的午夜
|
||||||
|
self.t: datetime.datetime = self.now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||||
|
self.is_pm_specified: bool = False
|
||||||
|
self.is_date_specified: bool = False
|
||||||
|
self.is_time_specified: bool = False
|
||||||
|
|
||||||
|
def _preprocess(self, content: str) -> str:
|
||||||
|
"""预处理字符串,移除不相关字符。"""
|
||||||
|
content = re.sub(r"\s+", "", content)
|
||||||
|
content = re.sub(r"[,,\.。::、]", "", content)
|
||||||
|
return content
|
||||||
|
|
||||||
|
def _consume_match(self, match: re.Match) -> str:
|
||||||
|
"""从待解析字符串中移除已匹配的部分。"""
|
||||||
|
self.content_to_parse = self.content_to_parse.replace(match.group(0), "", 1)
|
||||||
|
return match.group(0)
|
||||||
|
|
||||||
|
def parse(self) -> Optional[datetime.datetime]:
|
||||||
|
"""
|
||||||
|
主解析方法。
|
||||||
|
首先尝试解析相对时间(如“5分钟后”),失败则尝试解析绝对时间。
|
||||||
|
"""
|
||||||
|
logger.debug(f"🎉 开始解析: '{self.original_content}' -> 清洗后: '{self.content_to_parse}'")
|
||||||
|
if not self.content_to_parse:
|
||||||
|
logger.debug("❌ 内容为空,无法解析。")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 1. 尝试相对时间解析
|
||||||
|
if (target_time := self._parse_relative_time()) is not None:
|
||||||
|
return target_time
|
||||||
|
|
||||||
|
# 2. 尝试绝对时间解析
|
||||||
|
if (target_time := self._parse_absolute_time()) is not None:
|
||||||
|
return target_time
|
||||||
|
|
||||||
|
logger.debug(f"❌ 所有解析模式均未匹配成功。")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _parse_relative_time(self) -> Optional[datetime.datetime]:
|
||||||
|
"""解析 'X天X小时X分钟后' 这种格式。"""
|
||||||
|
if match := PATTERNS["DELTA"].match(self.content_to_parse):
|
||||||
|
logger.debug("⏳ 匹配到相对时间模式 (DELTA)。")
|
||||||
|
try:
|
||||||
|
delta_parts = {
|
||||||
|
"days": parse_number(match.group("days") or "0"),
|
||||||
|
"hours": parse_number(match.group("hours") or "0"),
|
||||||
|
"minutes": parse_number(match.group("minutes") or "0"),
|
||||||
|
"seconds": parse_number(match.group("seconds") or "0"),
|
||||||
|
}
|
||||||
|
|
||||||
|
# 检查是否有无效的数字解析
|
||||||
|
if any(v < 0 for v in delta_parts.values()):
|
||||||
|
logger.debug(f"❌ 解析时间片段为数字时失败: {delta_parts}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
delta = datetime.timedelta(**delta_parts)
|
||||||
|
if delta.total_seconds() == 0:
|
||||||
|
logger.debug("❌ 解析出的时间增量为0。")
|
||||||
|
return None
|
||||||
|
|
||||||
|
target_time = self.now + delta
|
||||||
|
logger.debug(f"✅ 相对时间解析成功 -> {target_time}")
|
||||||
|
return target_time
|
||||||
|
except (ValueError, TypeError) as e:
|
||||||
|
logger.debug(f"❌ 解析相对时间时出错: {e}", exc_info=True)
|
||||||
|
return None
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _parse_absolute_time(self) -> Optional[datetime.datetime]:
|
||||||
|
"""解析一个指定的日期和时间。"""
|
||||||
|
logger.debug(f"🎯 启动绝对时间解析,基准时间: {self.t}")
|
||||||
|
|
||||||
|
# 定义解析步骤和顺序
|
||||||
|
# (pattern_key, handler_method)
|
||||||
|
parsing_steps: List[Tuple[str, Callable[[re.Match], bool]]] = [
|
||||||
|
("TOMORROW", self._handle_tomorrow),
|
||||||
|
("DAY_AFTER_TOMORROW", self._handle_day_after_tomorrow),
|
||||||
|
("TODAY", self._handle_today),
|
||||||
|
("YEAR", self._handle_year),
|
||||||
|
("MONTH", self._handle_month),
|
||||||
|
("DAY", self._handle_day),
|
||||||
|
("HMS_COLON", self._handle_hms_colon),
|
||||||
|
("PM", self._handle_pm),
|
||||||
|
("HOUR", self._handle_hour),
|
||||||
|
("MINUTE", self._handle_minute),
|
||||||
|
("SECOND", self._handle_second),
|
||||||
|
]
|
||||||
|
|
||||||
|
for key, handler in parsing_steps:
|
||||||
|
if match := PATTERNS[key].search(self.content_to_parse):
|
||||||
|
if not handler(match):
|
||||||
|
# 如果任何一个处理器返回False,说明解析失败
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 移除无意义的上午关键词
|
||||||
|
self.content_to_parse = self.content_to_parse.replace("上午", "").replace("AM", "").replace("凌晨", "")
|
||||||
|
|
||||||
|
# 如果解析后还有剩余字符,说明有无法识别的部分
|
||||||
|
if self.content_to_parse.strip():
|
||||||
|
logger.debug(f"❌ 匹配失败,存在未解析的残留内容: '{self.content_to_parse.strip()}'")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 最终调整和检查
|
||||||
|
return self._finalize_datetime()
|
||||||
|
|
||||||
|
# --- Handler Methods for Absolute Time Parsing ---
|
||||||
|
|
||||||
|
def _handle_tomorrow(self, match: re.Match) -> bool:
|
||||||
|
self.t += datetime.timedelta(days=1)
|
||||||
|
self.is_date_specified = True
|
||||||
|
logger.debug(f"📅 匹配到 '明天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _handle_day_after_tomorrow(self, match: re.Match) -> bool:
|
||||||
|
self.t += datetime.timedelta(days=2)
|
||||||
|
self.is_date_specified = True
|
||||||
|
logger.debug(f"📅 匹配到 '后天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _handle_today(self, match: re.Match) -> bool:
|
||||||
|
self.is_date_specified = True
|
||||||
|
logger.debug(f"📅 匹配到 '今天', 日期基准不变, 消耗: '{self._consume_match(match)}'")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _handle_year(self, match: re.Match) -> bool:
|
||||||
|
year = parse_number(match.group(1))
|
||||||
|
if year < 0: return False
|
||||||
|
if year < 100: year += 2000 # 处理 "25年" -> 2025
|
||||||
|
if year < self.now.year:
|
||||||
|
logger.debug(f"❌ 指定的年份 {year} 已过去。")
|
||||||
|
return False
|
||||||
|
self.t = self.t.replace(year=year)
|
||||||
|
self.is_date_specified = True
|
||||||
|
logger.debug(f"Y| 年份更新 -> {self.t.year}, 消耗: '{self._consume_match(match)}'")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _handle_month(self, match: re.Match) -> bool:
|
||||||
|
month = parse_number(match.group(1))
|
||||||
|
if not (1 <= month <= 12):
|
||||||
|
logger.debug(f"❌ 无效的月份: {month}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 如果设置的月份在当前月份之前,且没有指定年份,则年份加一
|
||||||
|
if month < self.t.month and not self.is_date_specified:
|
||||||
|
self.t = self.t.replace(year=self.t.year + 1)
|
||||||
|
logger.debug(f"💡 月份小于当前月份,年份自动进位 -> {self.t.year}")
|
||||||
|
|
||||||
|
self.t = self.t.replace(month=month)
|
||||||
|
self.is_date_specified = True
|
||||||
|
logger.debug(f"M| 月份更新 -> {self.t.month}, 消耗: '{self._consume_match(match)}'")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _handle_day(self, match: re.Match) -> bool:
|
||||||
|
day = parse_number(match.group(1))
|
||||||
|
if not (1 <= day <= 31):
|
||||||
|
logger.debug(f"❌ 无效的日期: {day}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 如果日期小于当前日期,且只指定了日,则月份加一
|
||||||
|
if day < self.t.day and not self.is_date_specified:
|
||||||
|
if self.t.month == 12:
|
||||||
|
self.t = self.t.replace(year=self.t.year + 1, month=1)
|
||||||
|
else:
|
||||||
|
self.t = self.t.replace(month=self.t.month + 1)
|
||||||
|
logger.debug(f"💡 日期小于当前日期,月份自动进位 -> {self.t.year}-{self.t.month}")
|
||||||
|
|
||||||
|
self.t = self.t.replace(day=day)
|
||||||
|
self.is_date_specified = True
|
||||||
|
logger.debug(f"D| 日期更新 -> {self.t.day}, 消耗: '{self._consume_match(match)}'")
|
||||||
|
return True
|
||||||
|
except ValueError:
|
||||||
|
logger.debug(f"❌ 日期 {day} 对于月份 {self.t.month} 无效 (例如2月30号)。")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _handle_hms_colon(self, match: re.Match) -> bool:
|
||||||
|
h = int(match.group(1))
|
||||||
|
m = int(match.group(2))
|
||||||
|
s_str = match.group(4) # group(3) is with colon, group(4) is the number
|
||||||
|
s = int(s_str) if s_str else 0
|
||||||
|
if not (0 <= h <= 23 and 0 <= m <= 59 and 0 <= s <= 59):
|
||||||
|
logger.debug(f"❌ 无效的时间格式: H={h}, M={m}, S={s}")
|
||||||
|
return False
|
||||||
|
self.t = self.t.replace(hour=h, minute=m, second=s)
|
||||||
|
self.is_time_specified = True
|
||||||
|
logger.debug(f"T| 时分秒(冒号格式)更新 -> {self.t.time()}, 消耗: '{self._consume_match(match)}'")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _handle_pm(self, match: re.Match) -> bool:
|
||||||
|
self.is_pm_specified = True
|
||||||
|
logger.debug(f"PM| 匹配到下午/晚上, 消耗: '{self._consume_match(match)}'")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _handle_hour(self, match: re.Match) -> bool:
|
||||||
|
hour = parse_number(match.group(1))
|
||||||
|
has_half = match.group(2) == '半'
|
||||||
|
if not (0 <= hour <= 23):
|
||||||
|
logger.debug(f"❌ 无效的小时: {hour}")
|
||||||
|
return False
|
||||||
|
minute = 30 if has_half else self.t.minute
|
||||||
|
self.t = self.t.replace(hour=hour, minute=minute)
|
||||||
|
self.is_time_specified = True
|
||||||
|
logger.debug(f"H| 小时更新 -> {self.t.hour}{':30' if has_half else ''}, 消耗: '{self._consume_match(match)}'")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _handle_minute(self, match: re.Match) -> bool:
|
||||||
|
minute = parse_number(match.group(1))
|
||||||
|
if not (0 <= minute <= 59):
|
||||||
|
logger.debug(f"❌ 无效的分钟: {minute}")
|
||||||
|
return False
|
||||||
|
self.t = self.t.replace(minute=minute)
|
||||||
|
self.is_time_specified = True
|
||||||
|
logger.debug(f"M| 分钟更新 -> {self.t.minute}, 消耗: '{self._consume_match(match)}'")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _handle_second(self, match: re.Match) -> bool:
|
||||||
|
second = parse_number(match.group(1))
|
||||||
|
if not (0 <= second <= 59):
|
||||||
|
logger.debug(f"❌ 无效的秒: {second}")
|
||||||
|
return False
|
||||||
|
self.t = self.t.replace(second=second)
|
||||||
|
self.is_time_specified = True
|
||||||
|
logger.debug(f"S| 秒更新 -> {self.t.second}, 消耗: '{self._consume_match(match)}'")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _finalize_datetime(self) -> Optional[datetime.datetime]:
|
||||||
|
"""对解析出的时间进行最后的调整和检查。"""
|
||||||
|
# 处理下午/晚上
|
||||||
|
if self.is_pm_specified and self.t.hour < 12:
|
||||||
|
self.t = self.t.replace(hour=self.t.hour + 12)
|
||||||
|
logger.debug(f"💡 根据 PM 标识,小时调整为 -> {self.t.hour}")
|
||||||
|
|
||||||
|
# 如果没有指定任何时间或日期部分,则认为解析无效
|
||||||
|
if not self.is_date_specified and not self.is_time_specified:
|
||||||
|
logger.debug("❌ 未能从输入中解析出任何有效的日期或时间部分。")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 如果最终计算出的时间点在当前时间之前,自动往后推
|
||||||
|
# 例如:现在是 15:00,说 "14点",应该是指明天的14点
|
||||||
|
if self.t < self.now:
|
||||||
|
# 只有在明确指定了时间的情况下,才自动加一天
|
||||||
|
# 如果只指定了一个过去的日期(如“去年5月1号”),则不应该调整
|
||||||
|
if self.is_time_specified:
|
||||||
|
self.t += datetime.timedelta(days=1)
|
||||||
|
logger.debug(f"🔁 目标时间已过,自动调整为明天 -> {self.t}")
|
||||||
|
|
||||||
|
logger.debug(f"✅ 解析成功,最终时间: {self.t}")
|
||||||
|
return self.t
|
||||||
|
|
||||||
|
# --- 公共接口 (Public Interface) ---
|
||||||
|
|
||||||
|
def get_target_time(content: str) -> Optional[datetime.datetime]:
|
||||||
|
"""
|
||||||
|
高级接口,用于将自然语言时间描述转换为 datetime 对象。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
content: 包含时间信息的字符串。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
一个 datetime 对象,如果解析失败则返回 None。
|
||||||
|
"""
|
||||||
|
parser = TimeParser(content)
|
||||||
|
return parser.parse()
|
||||||
268
konabot/plugins/ytpgif/__init__.py
Normal file
@ -0,0 +1,268 @@
|
|||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from PIL import Image, ImageSequence
|
||||||
|
from nonebot.adapters import Event as BaseEvent
|
||||||
|
from nonebot.plugin import PluginMetadata
|
||||||
|
from nonebot_plugin_alconna import (
|
||||||
|
Alconna,
|
||||||
|
Args,
|
||||||
|
Field,
|
||||||
|
UniMessage,
|
||||||
|
on_alconna,
|
||||||
|
)
|
||||||
|
|
||||||
|
__plugin_meta__ = PluginMetadata(
|
||||||
|
name="ytpgif",
|
||||||
|
description="生成来回镜像翻转的仿 YTPMV 动图。",
|
||||||
|
usage="ytpgif [倍速=1.0] (倍速范围:0.1~20.0)",
|
||||||
|
type="application",
|
||||||
|
config=None,
|
||||||
|
homepage=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 参数定义
|
||||||
|
BASE_SEGMENT_DURATION = 0.25
|
||||||
|
BASE_INTERVAL = 0.25
|
||||||
|
MAX_SIZE = 256
|
||||||
|
MIN_SPEED = 0.1
|
||||||
|
MAX_SPEED = 20.0
|
||||||
|
MAX_FRAMES_PER_SEGMENT = 500
|
||||||
|
|
||||||
|
# 提示语
|
||||||
|
SPEED_TIPS = f"倍速必须是 {MIN_SPEED} 到 {MAX_SPEED} 之间的数字"
|
||||||
|
|
||||||
|
|
||||||
|
# 定义命令 + 参数校验
|
||||||
|
ytpgif_cmd = on_alconna(
|
||||||
|
Alconna(
|
||||||
|
"ytpgif",
|
||||||
|
Args[
|
||||||
|
"speed?",
|
||||||
|
float,
|
||||||
|
Field(
|
||||||
|
default=1.0,
|
||||||
|
unmatch_tips=lambda x: f"“{x}”不是有效数值。{SPEED_TIPS}",
|
||||||
|
),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
use_cmd_start=True,
|
||||||
|
use_cmd_sep=False,
|
||||||
|
skip_for_unmatch=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_image_url(event: BaseEvent) -> Optional[str]:
|
||||||
|
"""从事件中提取图片 URL,支持直接消息和回复"""
|
||||||
|
msg = event.get_message()
|
||||||
|
for seg in msg:
|
||||||
|
if seg.type == "image" and seg.data.get("url"):
|
||||||
|
return str(seg.data["url"])
|
||||||
|
|
||||||
|
if hasattr(event, "reply") and (reply := event.reply):
|
||||||
|
reply_msg = reply.message
|
||||||
|
for seg in reply_msg:
|
||||||
|
if seg.type == "image" and seg.data.get("url"):
|
||||||
|
return str(seg.data["url"])
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def download_image(url: str) -> bytes:
|
||||||
|
import httpx
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
resp = await client.get(url, timeout=10)
|
||||||
|
resp.raise_for_status()
|
||||||
|
return resp.content
|
||||||
|
|
||||||
|
|
||||||
|
def resize_frame(frame: Image.Image) -> Image.Image:
|
||||||
|
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
|
||||||
|
w, h = frame.size
|
||||||
|
if w <= MAX_SIZE and h <= MAX_SIZE:
|
||||||
|
return frame
|
||||||
|
|
||||||
|
scale = MAX_SIZE / max(w, h)
|
||||||
|
new_w = int(w * scale)
|
||||||
|
new_h = int(h * scale)
|
||||||
|
return frame.resize((new_w, new_h), Image.Resampling.LANCZOS)
|
||||||
|
|
||||||
|
|
||||||
|
@ytpgif_cmd.handle()
|
||||||
|
async def handle_ytpgif(event: BaseEvent, speed: float = 1.0):
|
||||||
|
# === 校验 speed 范围 ===
|
||||||
|
if not (MIN_SPEED <= speed <= MAX_SPEED):
|
||||||
|
await ytpgif_cmd.send(
|
||||||
|
await UniMessage.text(f"❌ {SPEED_TIPS}").export()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
img_url = await get_image_url(event)
|
||||||
|
if not img_url:
|
||||||
|
await ytpgif_cmd.send(
|
||||||
|
await UniMessage.text(
|
||||||
|
"请发送一张图片或回复一张图片来生成镜像动图。"
|
||||||
|
).export()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
image_data = await download_image(img_url)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[YTPGIF] 下载失败: {e}")
|
||||||
|
await ytpgif_cmd.send(
|
||||||
|
await UniMessage.text("❌ 图片下载失败,请重试。").export()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
input_path = output_path = None
|
||||||
|
try:
|
||||||
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_in:
|
||||||
|
tmp_in.write(image_data)
|
||||||
|
input_path = tmp_in.name
|
||||||
|
|
||||||
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".gif") as tmp_out:
|
||||||
|
output_path = tmp_out.name
|
||||||
|
|
||||||
|
with Image.open(input_path) as src_img:
|
||||||
|
# === 判断是否为动图 ===
|
||||||
|
try:
|
||||||
|
n_frames = getattr(src_img, "n_frames", 1)
|
||||||
|
is_animated = n_frames > 1
|
||||||
|
except Exception:
|
||||||
|
is_animated = False
|
||||||
|
|
||||||
|
output_frames = []
|
||||||
|
output_durations_ms = []
|
||||||
|
|
||||||
|
if is_animated:
|
||||||
|
# === 动图模式:截取正向 + 镜像两段 ===
|
||||||
|
frames_with_duration = []
|
||||||
|
palette = src_img.getpalette()
|
||||||
|
|
||||||
|
for idx in range(n_frames):
|
||||||
|
src_img.seek(idx)
|
||||||
|
frame = src_img.copy()
|
||||||
|
# 检查是否需要透明通道
|
||||||
|
has_alpha = (
|
||||||
|
frame.mode in ("RGBA", "LA")
|
||||||
|
or (frame.mode == "P" and "transparency" in frame.info)
|
||||||
|
)
|
||||||
|
if has_alpha:
|
||||||
|
frame = frame.convert("RGBA")
|
||||||
|
else:
|
||||||
|
frame = frame.convert("RGB")
|
||||||
|
resized_frame = resize_frame(frame)
|
||||||
|
|
||||||
|
# 若原图有调色板,尝试保留(可选)
|
||||||
|
if palette and resized_frame.mode == "P":
|
||||||
|
try:
|
||||||
|
resized_frame.putpalette(palette)
|
||||||
|
except Exception: # noqa
|
||||||
|
pass
|
||||||
|
|
||||||
|
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
|
||||||
|
dur_sec = max(0.01, ms / 1000.0)
|
||||||
|
frames_with_duration.append((resized_frame, dur_sec))
|
||||||
|
|
||||||
|
max_dur = BASE_SEGMENT_DURATION * speed
|
||||||
|
accumulated = 0.0
|
||||||
|
frame_count = 0
|
||||||
|
|
||||||
|
# 正向段
|
||||||
|
for img, dur in frames_with_duration:
|
||||||
|
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
|
||||||
|
break
|
||||||
|
output_frames.append(img)
|
||||||
|
output_durations_ms.append(int(dur * 1000))
|
||||||
|
accumulated += dur
|
||||||
|
frame_count += 1
|
||||||
|
|
||||||
|
if frame_count == 0:
|
||||||
|
await ytpgif_cmd.send(
|
||||||
|
await UniMessage.text("动图帧太短,无法生成有效片段。").export()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# 镜像段(从头开始)
|
||||||
|
accumulated = 0.0
|
||||||
|
frame_count = 0
|
||||||
|
for img, dur in frames_with_duration:
|
||||||
|
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
|
||||||
|
break
|
||||||
|
flipped = img.transpose(Image.FLIP_LEFT_RIGHT)
|
||||||
|
output_frames.append(flipped)
|
||||||
|
output_durations_ms.append(int(dur * 1000))
|
||||||
|
accumulated += dur
|
||||||
|
frame_count += 1
|
||||||
|
|
||||||
|
else:
|
||||||
|
# === 静态图模式:制作翻转动画 ===
|
||||||
|
raw_frame = src_img.convert("RGBA")
|
||||||
|
resized_frame = resize_frame(raw_frame)
|
||||||
|
|
||||||
|
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
|
||||||
|
duration_ms = int(interval_sec * 1000)
|
||||||
|
|
||||||
|
frame1 = resized_frame
|
||||||
|
frame2 = resized_frame.transpose(Image.FLIP_LEFT_RIGHT)
|
||||||
|
|
||||||
|
output_frames = [frame1, frame2]
|
||||||
|
output_durations_ms = [duration_ms, duration_ms]
|
||||||
|
|
||||||
|
if len(output_frames) < 1:
|
||||||
|
await ytpgif_cmd.send(
|
||||||
|
await UniMessage.text("未能生成任何帧。").export()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
|
||||||
|
need_transparency = False
|
||||||
|
for frame in output_frames:
|
||||||
|
if frame.mode == "RGBA":
|
||||||
|
alpha_channel = frame.getchannel("A")
|
||||||
|
if any(pix < 255 for pix in alpha_channel.getdata()):
|
||||||
|
need_transparency = True
|
||||||
|
break
|
||||||
|
elif frame.mode == "P" and "transparency" in frame.info:
|
||||||
|
need_transparency = True
|
||||||
|
break
|
||||||
|
|
||||||
|
# 如果不需要透明,则统一转为 RGB 避免调色板污染
|
||||||
|
if not need_transparency:
|
||||||
|
output_frames = [f.convert("RGB") for f in output_frames]
|
||||||
|
|
||||||
|
# 构建保存参数
|
||||||
|
save_kwargs = {
|
||||||
|
"save_all": True,
|
||||||
|
"append_images": output_frames[1:],
|
||||||
|
"format": "GIF",
|
||||||
|
"loop": 0, # 无限循环
|
||||||
|
"duration": output_durations_ms,
|
||||||
|
"disposal": 2, # 清除到背景色,避免残留
|
||||||
|
"optimize": False, # 关闭抖动(等效 -dither none)
|
||||||
|
}
|
||||||
|
|
||||||
|
# 只有真正需要透明时才启用 transparency
|
||||||
|
if need_transparency:
|
||||||
|
save_kwargs["transparency"] = 0
|
||||||
|
|
||||||
|
output_frames[0].save(output_path, **save_kwargs)
|
||||||
|
|
||||||
|
# 发送结果
|
||||||
|
with open(output_path, "rb") as f:
|
||||||
|
result_image = UniMessage.image(raw=f.read())
|
||||||
|
await ytpgif_cmd.send(await result_image.export())
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[YTPGIF] 处理失败: {e}")
|
||||||
|
await ytpgif_cmd.send(
|
||||||
|
await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export()
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
for path in filter(None, [input_path, output_path]):
|
||||||
|
if os.path.exists(path):
|
||||||
|
try:
|
||||||
|
os.unlink(path)
|
||||||
|
except: # noqa
|
||||||
|
pass
|
||||||
0
scripts/__init__.py
Normal file
21
scripts/test_plugin_load.py
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import nonebot
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
nonebot.init()
|
||||||
|
nonebot.load_plugins("konabot/plugins")
|
||||||
|
|
||||||
|
plugins = nonebot.get_loaded_plugins()
|
||||||
|
len_requires = len(
|
||||||
|
[f for f in (
|
||||||
|
Path(__file__).parent.parent / "konabot" / "plugins"
|
||||||
|
).iterdir() if f.is_dir() and (f / "__init__.py").exists()]
|
||||||
|
)
|
||||||
|
|
||||||
|
plugins = [p for p in plugins if p.module.__name__.startswith("konabot.plugins")]
|
||||||
|
|
||||||
|
logger.info(f"已经加载的插件数量 {len(plugins)}")
|
||||||
|
logger.info(f"期待加载的插件数量 {len_requires}")
|
||||||
|
|
||||||
|
assert len(plugins) == len_requires
|
||||||