Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a0483d1d5c | |||
| ae83b66908 | |||
| 6abeb05a18 |
9
konabot/common/nb/exc.py
Normal file
9
konabot/common/nb/exc.py
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
from nonebot_plugin_alconna import UniMessage
|
||||||
|
|
||||||
|
|
||||||
|
class BotExceptionMessage(Exception):
|
||||||
|
def __init__(self, msg: UniMessage | str) -> None:
|
||||||
|
super().__init__()
|
||||||
|
if isinstance(msg, str):
|
||||||
|
msg = UniMessage().text(msg)
|
||||||
|
self.msg = msg
|
||||||
@ -1,17 +1,23 @@
|
|||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
from typing import Annotated
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
import PIL.Image
|
import PIL.Image
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
import nonebot
|
||||||
|
from nonebot.matcher import Matcher
|
||||||
from nonebot.adapters import Bot, Event, Message
|
from nonebot.adapters import Bot, Event, Message
|
||||||
from nonebot.adapters.discord import Bot as DiscordBot
|
from nonebot.adapters.discord import Bot as DiscordBot
|
||||||
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
|
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
|
||||||
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
|
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
|
||||||
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
|
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
|
||||||
|
import nonebot.params
|
||||||
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
|
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
|
||||||
from PIL import UnidentifiedImageError
|
from PIL import UnidentifiedImageError
|
||||||
from returns.result import Failure, Result, Success
|
from returns.result import Failure, Result, Success
|
||||||
|
|
||||||
|
from konabot.common.nb.exc import BotExceptionMessage
|
||||||
|
|
||||||
|
|
||||||
async def download_image_bytes(url: str) -> Result[bytes, str]:
|
async def download_image_bytes(url: str) -> Result[bytes, str]:
|
||||||
# if "/matcha/cache/" in url:
|
# if "/matcha/cache/" in url:
|
||||||
@ -133,3 +139,21 @@ async def extract_image_from_message(
|
|||||||
else:
|
else:
|
||||||
return Failure("暂时不支持在这里中通过引用的方式获取图片")
|
return Failure("暂时不支持在这里中通过引用的方式获取图片")
|
||||||
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
|
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
|
||||||
|
|
||||||
|
|
||||||
|
async def _ext_img(
|
||||||
|
evt: Event,
|
||||||
|
bot: Bot,
|
||||||
|
matcher: Matcher,
|
||||||
|
) -> PIL.Image.Image | None:
|
||||||
|
match await extract_image_from_message(evt.get_message(), evt, bot):
|
||||||
|
case Success(img):
|
||||||
|
return img
|
||||||
|
case Failure(err):
|
||||||
|
# raise BotExceptionMessage(err)
|
||||||
|
await matcher.send(await UniMessage().text(err).export())
|
||||||
|
return None
|
||||||
|
assert False
|
||||||
|
|
||||||
|
|
||||||
|
PIL_Image = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)]
|
||||||
|
|||||||
16
konabot/common/nb/match_keyword.py
Normal file
16
konabot/common/nb/match_keyword.py
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
import re
|
||||||
|
|
||||||
|
from nonebot_plugin_alconna import Text, UniMsg
|
||||||
|
|
||||||
|
|
||||||
|
def match_keyword(*patterns: str | re.Pattern):
|
||||||
|
async def _matcher(msg: UniMsg):
|
||||||
|
text = msg.get(Text).extract_plain_text().strip()
|
||||||
|
for pattern in patterns:
|
||||||
|
if isinstance(pattern, str) and text == pattern:
|
||||||
|
return True
|
||||||
|
if isinstance(pattern, re.Pattern) and re.match(pattern, text):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
return _matcher
|
||||||
13
konabot/common/nb/reply_image.py
Normal file
13
konabot/common/nb/reply_image.py
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
import PIL
|
||||||
|
import PIL.Image
|
||||||
|
from nonebot.adapters import Bot
|
||||||
|
from nonebot.matcher import Matcher
|
||||||
|
from nonebot_plugin_alconna import UniMessage
|
||||||
|
|
||||||
|
|
||||||
|
async def reply_image(matcher: type[Matcher], bot: Bot, img: PIL.Image.Image):
|
||||||
|
data = BytesIO()
|
||||||
|
img.save(data, "PNG")
|
||||||
|
await matcher.send(await UniMessage().image(raw=data).export(bot))
|
||||||
45
konabot/plugins/errman.py
Normal file
45
konabot/plugins/errman.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from nonebot.adapters import Bot
|
||||||
|
from nonebot.matcher import Matcher
|
||||||
|
from nonebot.message import run_postprocessor
|
||||||
|
from nonebot_plugin_alconna import UniMessage
|
||||||
|
from returns.primitives.exceptions import UnwrapFailedError
|
||||||
|
|
||||||
|
from konabot.common.nb.exc import BotExceptionMessage
|
||||||
|
|
||||||
|
|
||||||
|
@run_postprocessor
|
||||||
|
async def _(bot: Bot, matcher: Matcher, exc: BotExceptionMessage | AssertionError | UnwrapFailedError):
|
||||||
|
if isinstance(exc, BotExceptionMessage):
|
||||||
|
msg = exc.msg
|
||||||
|
await matcher.send(await msg.export(bot))
|
||||||
|
if isinstance(exc, AssertionError):
|
||||||
|
if exc.args:
|
||||||
|
err_msg = exc.args[0]
|
||||||
|
|
||||||
|
err_msg_res: UniMessage
|
||||||
|
if isinstance(err_msg, str):
|
||||||
|
err_msg_res = UniMessage().text(err_msg)
|
||||||
|
elif isinstance(err_msg, UniMessage):
|
||||||
|
err_msg_res = err_msg
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
|
await matcher.send(await err_msg_res.export(bot))
|
||||||
|
if isinstance(exc, UnwrapFailedError):
|
||||||
|
obj = exc.halted_container
|
||||||
|
try:
|
||||||
|
failure: Any = obj.failure()
|
||||||
|
|
||||||
|
err_msg_res: UniMessage
|
||||||
|
if isinstance(failure, str):
|
||||||
|
err_msg_res = UniMessage().text(failure)
|
||||||
|
elif isinstance(failure, UniMessage):
|
||||||
|
err_msg_res = failure
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
|
await matcher.send(await err_msg_res.export(bot))
|
||||||
|
except:
|
||||||
|
pass
|
||||||
13
konabot/plugins/image_process/__init__.py
Normal file
13
konabot/plugins/image_process/__init__.py
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
from nonebot import on_message
|
||||||
|
from nonebot.adapters import Bot
|
||||||
|
|
||||||
|
from konabot.common.nb.extract_image import PIL_Image
|
||||||
|
from konabot.common.nb.match_keyword import match_keyword
|
||||||
|
from konabot.common.nb.reply_image import reply_image
|
||||||
|
|
||||||
|
cmd_black_white = on_message(rule=match_keyword("黑白"))
|
||||||
|
|
||||||
|
|
||||||
|
@cmd_black_white.handle()
|
||||||
|
async def _(img: PIL_Image, bot: Bot):
|
||||||
|
await reply_image(cmd_black_white, bot, img.convert("LA"))
|
||||||
@ -1,18 +1,11 @@
|
|||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from PIL import Image
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from nonebot.adapters import Event as BaseEvent
|
|
||||||
from nonebot.adapters import Bot as BaseBot
|
from nonebot.adapters import Bot as BaseBot
|
||||||
|
from nonebot.adapters import Event as BaseEvent
|
||||||
from nonebot.plugin import PluginMetadata
|
from nonebot.plugin import PluginMetadata
|
||||||
from nonebot_plugin_alconna import (
|
from nonebot_plugin_alconna import Alconna, Args, Field, UniMessage, on_alconna
|
||||||
Alconna,
|
from PIL import Image
|
||||||
Args,
|
|
||||||
Field,
|
|
||||||
UniMessage,
|
|
||||||
on_alconna,
|
|
||||||
)
|
|
||||||
from returns.result import Failure, Success
|
from returns.result import Failure, Success
|
||||||
|
|
||||||
from konabot.common.nb.extract_image import extract_image_from_message
|
from konabot.common.nb.extract_image import extract_image_from_message
|
||||||
@ -57,29 +50,6 @@ ytpgif_cmd = on_alconna(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def get_image_url(event: BaseEvent) -> Optional[str]:
|
|
||||||
"""从事件中提取图片 URL,支持直接消息和回复"""
|
|
||||||
msg = event.get_message()
|
|
||||||
for seg in msg:
|
|
||||||
if seg.type == "image" and seg.data.get("url"):
|
|
||||||
return str(seg.data["url"])
|
|
||||||
|
|
||||||
if hasattr(event, "reply") and (reply := getattr(event, "reply")):
|
|
||||||
reply_msg = reply.message
|
|
||||||
for seg in reply_msg:
|
|
||||||
if seg.type == "image" and seg.data.get("url"):
|
|
||||||
return str(seg.data["url"])
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
async def download_image(url: str) -> bytes:
|
|
||||||
import httpx
|
|
||||||
async with httpx.AsyncClient() as client:
|
|
||||||
resp = await client.get(url, timeout=10)
|
|
||||||
resp.raise_for_status()
|
|
||||||
return resp.content
|
|
||||||
|
|
||||||
|
|
||||||
def resize_frame(frame: Image.Image) -> Image.Image:
|
def resize_frame(frame: Image.Image) -> Image.Image:
|
||||||
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
|
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
|
||||||
w, h = frame.size
|
w, h = frame.size
|
||||||
|
|||||||
@ -8,9 +8,12 @@ nonebot.load_plugins("konabot/plugins")
|
|||||||
|
|
||||||
plugins = nonebot.get_loaded_plugins()
|
plugins = nonebot.get_loaded_plugins()
|
||||||
len_requires = len(
|
len_requires = len(
|
||||||
[f for f in (
|
[
|
||||||
Path(__file__).parent.parent / "konabot" / "plugins"
|
f
|
||||||
).iterdir() if f.is_dir() and (f / "__init__.py").exists()]
|
for f in (Path(__file__).parent.parent / "konabot" / "plugins").iterdir()
|
||||||
|
if (f.is_dir() and (f / "__init__.py").exists())
|
||||||
|
or ((not f.is_dir()) and f.suffix == ".py")
|
||||||
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
plugins = [p for p in plugins if p.module.__name__.startswith("konabot.plugins")]
|
plugins = [p for p in plugins if p.module.__name__.startswith("konabot.plugins")]
|
||||||
|
|||||||
Reference in New Issue
Block a user