Compare commits

...

5 Commits

Author SHA1 Message Date
91687fb8c3 Merge pull request 'feature-更多更多说' (#22) from feature-更多更多说 into master
Reviewed-on: mttu-developers/konabot#22
2025-10-03 17:40:52 +08:00
f889381cce 排序 import 2025-10-03 17:39:49 +08:00
1256055c9d 补充依赖 2025-10-03 17:38:02 +08:00
40f35a474e 搞小槽的说话 2025-10-03 17:37:43 +08:00
6b01acfa8c 十猫 2025-10-03 14:23:01 +08:00
10 changed files with 378 additions and 2004 deletions

BIN
assets/img/meme/caoimg1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 227 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 614 KiB

View File

@ -0,0 +1,135 @@
from io import BytesIO
import httpx
import PIL.Image
from loguru import logger
from nonebot.adapters import Bot, Event, Message
from nonebot.adapters.discord import Bot as DiscordBot
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
from PIL import UnidentifiedImageError
from returns.result import Failure, Result, Success
async def download_image_bytes(url: str) -> Result[bytes, str]:
# if "/matcha/cache/" in url:
# url = url.replace('127.0.0.1', '10.126.126.101')
logger.debug(f"开始从 {url} 下载图片")
async with httpx.AsyncClient() as c:
try:
response = await c.get(url)
except (httpx.ConnectError, httpx.RemoteProtocolError) as e:
return Failure(f"HTTPX 模块下载图片时出错:{e}")
except httpx.ConnectTimeout:
return Failure("下载图片失败了网络超时了qwq")
if response.status_code != 200:
return Failure("无法下载图片,可能存在网络问题需要排查")
return Success(response.content)
def bytes_to_pil(raw_data: bytes | BytesIO) -> Result[PIL.Image.Image, str]:
try:
if not isinstance(raw_data, BytesIO):
img_pil = PIL.Image.open(BytesIO(raw_data))
else:
img_pil = PIL.Image.open(raw_data)
img_pil.verify()
if not isinstance(raw_data, BytesIO):
img = PIL.Image.open(BytesIO(raw_data))
else:
raw_data.seek(0)
img = PIL.Image.open(raw_data)
return Success(img)
except UnidentifiedImageError:
return Failure("图像无法读取可能是格式不支持orz")
except IOError:
return Failure("图像无法读取可能是网络存在问题orz")
async def unimsg_img_to_pil(image: Image) -> Result[PIL.Image.Image, str]:
if image.url is not None:
raw_result = await download_image_bytes(image.url)
elif image.raw is not None:
raw_result = Success(image.raw)
else:
return Failure("由于一些内部问题下载图片失败了orz")
return raw_result.bind(bytes_to_pil)
async def extract_image_from_qq_message(
msg: OnebotV11Message,
evt: OnebotV11MessageEvent,
bot: OnebotV11Bot,
allow_reply: bool = True,
) -> Result[PIL.Image.Image, str]:
if allow_reply and (reply := evt.reply) is not None:
return await extract_image_from_qq_message(
reply.message,
evt,
bot,
False,
)
for seg in msg:
if seg.type == "reply" and allow_reply:
msgid = seg.data.get("id")
if msgid is None:
return Failure("消息可能太久远,无法读取到消息原文")
try:
msg2 = await bot.get_msg(message_id=msgid)
except Exception as e:
logger.warning(f"获取消息内容时出错:{e}")
return Failure("消息可能太久远,无法读取到消息原文")
msg2_data = msg2.get("message")
if msg2_data is None:
return Failure("消息可能太久远,无法读取到消息原文")
logger.debug("发现消息引用,递归一层")
return await extract_image_from_qq_message(
msg=OnebotV11Message(msg2_data),
evt=evt,
bot=bot,
allow_reply=False,
)
if seg.type == "image":
url = seg.data.get("url")
if url is None:
return Failure("无法下载图片,可能有一些网络问题")
data = await download_image_bytes(url)
return data.bind(bytes_to_pil)
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
async def extract_image_from_message(
msg: Message,
evt: Event,
bot: Bot,
allow_reply: bool = True,
) -> Result[PIL.Image.Image, str]:
if (
isinstance(bot, OnebotV11Bot)
and isinstance(msg, OnebotV11Message)
and isinstance(evt, OnebotV11MessageEvent)
):
# 看起来 UniMessage 在这方面能力似乎不足,因此用 QQ 的
logger.debug('获取图片的路径 Fallback 到 QQ 模块')
return await extract_image_from_qq_message(msg, evt, bot, allow_reply)
for seg in UniMessage.of(msg, bot):
logger.info(seg)
if isinstance(seg, Image):
return await unimsg_img_to_pil(seg)
elif isinstance(seg, Reply) and allow_reply:
msg2 = seg.msg
logger.debug(f"深入搜索引用的消息:{msg2}")
if msg2 is None or isinstance(msg2, str):
continue
return await extract_image_from_message(msg2, evt, bot, False)
elif isinstance(seg, RefNode) and allow_reply:
if isinstance(bot, DiscordBot):
return Failure("暂时不支持在 Discord 中通过引用的方式获取图片")
else:
return Failure("暂时不支持在这里中通过引用的方式获取图片")
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")

View File

@ -1,9 +1,19 @@
from io import BytesIO
from typing import Iterable, cast
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
on_alconna)
from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, Text,
UniMessage, UniMsg, on_alconna)
from konabot.plugins.memepack.drawing.saying import draw_geimao, draw_mnk, draw_pt, draw_suan
from konabot.common.nb.extract_image import extract_image_from_message
from konabot.plugins.memepack.drawing.display import draw_cao_display
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten,
draw_geimao, draw_mnk,
draw_pt, draw_suan)
from nonebot.adapters import Bot, Event
from returns.result import Success, Failure
geimao = on_alconna(Alconna(
"给猫说",
@ -50,7 +60,7 @@ async def _(saying: list[str]):
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await pt.send(await UniMessage().image(raw=img_bytes).export())
await mnk.send(await UniMessage().image(raw=img_bytes).export())
suan = on_alconna(Alconna(
@ -66,7 +76,7 @@ async def _(saying: list[str]):
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await pt.send(await UniMessage().image(raw=img_bytes).export())
await suan.send(await UniMessage().image(raw=img_bytes).export())
dsuan = on_alconna(Alconna(
@ -82,4 +92,50 @@ async def _(saying: list[str]):
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await pt.send(await UniMessage().image(raw=img_bytes).export())
await dsuan.send(await UniMessage().image(raw=img_bytes).export())
cutecat = on_alconna(Alconna(
"乖猫说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写十猫说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"十猫说"})
@cutecat.handle()
async def _(saying: list[str]):
img = await draw_cute_ten("\n".join(saying))
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await cutecat.send(await UniMessage().image(raw=img_bytes).export())
cao_display_cmd = on_message()
@cao_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False
for text in cast(Iterable[Text], msg.get(Text)):
if text.text.strip() == "小槽展示":
flag = True
elif text.text.strip() == '':
continue
else:
return
if not flag:
return
match await extract_image_from_message(evt.get_message(), evt, bot):
case Success(img):
img_handled = await draw_cao_display(img)
img_bytes = BytesIO()
img_handled.save(img_bytes, format="PNG")
await cao_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
case Failure(err):
await cao_display_cmd.send(
await UniMessage()
.at(user_id=evt.get_user_id())
.text(' ')
.text(err)
.export()
)

View File

@ -0,0 +1,45 @@
import asyncio
from typing import Any, cast
import cv2
import numpy as np
import PIL.Image
from konabot.common.path import ASSETS_PATH
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
CAO_QUAD_POINTS = np.float32(cast(Any, [
[392, 540],
[577, 557],
[567, 707],
[381, 687],
]))
def _draw_cao_display(image: PIL.Image.Image):
src = np.array(image.convert("RGB"))
h, w = src.shape[:2]
src_points = np.float32(cast(Any, [
[0, 0],
[w, 0],
[w, h],
[0, h]
]))
dst_points = CAO_QUAD_POINTS
M = cv2.getPerspectiveTransform(cast(Any, src_points), cast(Any, dst_points))
output_size = cao_image.size
output_w, output_h = output_size
warped = cv2.warpPerspective(
src,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
result = PIL.Image.fromarray(warped, 'RGB').convert('RGBA')
result = PIL.Image.alpha_composite(result, cao_image)
return result
async def draw_cao_display(image: PIL.Image.Image):
return await asyncio.to_thread(_draw_cao_display, image)

View File

@ -13,6 +13,7 @@ pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("R
mnk_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "mnksay.jpg").convert("RGBA")
dasuan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "dss.png").convert("RGBA")
suan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "suanleba.png").convert("RGBA")
cute_ten_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "tententen.png").convert("RGBA")
def _draw_geimao(saying: str):
@ -88,3 +89,20 @@ def _draw_suan(saying: str, dasuan: bool = False):
async def draw_suan(saying: str, dasuan: bool = False):
return await asyncio.to_thread(_draw_suan, saying, dasuan)
def _draw_cute_ten(saying: str):
img = cute_ten_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 390, 479, 0.5, 0.5, 760, 96, LXGWWENKAI_REGULAR,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
1.0,
imagetext_py.TextAlign.Center,
draw_emojis=True,
)
return img
async def draw_cute_ten(saying: str):
return await asyncio.to_thread(_draw_cute_ten, saying)

View File

@ -45,7 +45,7 @@ class Notify(BaseModel):
class NotifyConfigFile(BaseModel):
version: int = 1
version: int = 2
notifies: list[Notify] = []
unsent: list[Notify] = []
@ -89,13 +89,17 @@ async def notify_now(notify: Notify):
if notify.target_env is None:
await bot.send_private_msg(
user_id=int(notify.target),
message=f"代办通知:{notify.notify_msg}",
message=cast(Any, await UniMessage.text(f"代办通知:{notify.notify_msg}").export(
bot=bot,
)),
)
else:
await bot.send_group_msg(
group_id=int(notify.target_env),
message=cast(Any,
await UniMessage().at(notify.target).text(f" 代办通知:{notify.notify_msg}").export()
await UniMessage().at(
notify.target
).text(f" 代办通知:{notify.notify_msg}").export(bot=bot)
),
)
else:
@ -197,11 +201,15 @@ async def _():
NOTIFIED_FLAG["task_added"] = True
await asyncio.sleep(10)
await DATA_FILE_LOCK.acquire()
tasks = []
cfg = load_notify_config()
for notify in cfg.notifies:
tasks.append(create_notify_task(notify, fail2remove=False))
if cfg.version == 1:
cfg.version = 2
else:
for notify in cfg.notifies:
tasks.append(create_notify_task(notify, fail2remove=False))
DATA_FILE_LOCK.release()
await asyncio.gather(*tasks)

21
poetry.lock generated
View File

@ -2460,6 +2460,25 @@ urllib3 = ">=1.21.1,<3"
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "returns"
version = "0.26.0"
description = "Make your functions return something meaningful, typed, and safe!"
optional = false
python-versions = "<4.0,>=3.10"
groups = ["main"]
files = [
{file = "returns-0.26.0-py3-none-any.whl", hash = "sha256:7cae94c730d6c56ffd9d0f583f7a2c0b32cfe17d141837150c8e6cff3eb30d71"},
{file = "returns-0.26.0.tar.gz", hash = "sha256:180320e0f6e9ea9845330ccfc020f542330f05b7250941d9b9b7c00203fcc3da"},
]
[package.dependencies]
typing-extensions = ">=4.0,<5.0"
[package.extras]
check-laws = ["hypothesis (>=6.136,<7.0)", "pytest (>=8.0,<9.0)"]
compatible-mypy = ["mypy (>=1.12,<1.18)"]
[[package]]
name = "rich"
version = "14.1.0"
@ -3162,4 +3181,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.12,<4.0"
content-hash = "673703a789248d0f7369999c364352eb12f8bb5830a8b4b6918f8bab6425a763"
content-hash = "927913b9030d1f6c126bb2d12eab7307dc6297f259c7c62e3033706457d27ce0"

View File

@ -21,6 +21,7 @@ dependencies = [
"pillow (>=11.3.0,<12.0.0)",
"imagetext-py (>=2.2.0,<3.0.0)",
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
"returns (>=0.26.0,<0.27.0)",
]

File diff suppressed because it is too large Load Diff