搞小槽的说话
This commit is contained in:
135
konabot/common/nb/extract_image.py
Normal file
135
konabot/common/nb/extract_image.py
Normal file
@ -0,0 +1,135 @@
|
||||
from io import BytesIO
|
||||
|
||||
import httpx
|
||||
import PIL.Image
|
||||
from loguru import logger
|
||||
from nonebot.adapters import Bot, Message, Event
|
||||
from nonebot.adapters.discord import Bot as DiscordBot
|
||||
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
|
||||
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
|
||||
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
|
||||
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
|
||||
from PIL import UnidentifiedImageError
|
||||
from returns.result import Failure, Result, Success
|
||||
|
||||
|
||||
async def download_image_bytes(url: str) -> Result[bytes, str]:
|
||||
# if "/matcha/cache/" in url:
|
||||
# url = url.replace('127.0.0.1', '10.126.126.101')
|
||||
logger.debug(f"开始从 {url} 下载图片")
|
||||
async with httpx.AsyncClient() as c:
|
||||
try:
|
||||
response = await c.get(url)
|
||||
except (httpx.ConnectError, httpx.RemoteProtocolError) as e:
|
||||
return Failure(f"HTTPX 模块下载图片时出错:{e}")
|
||||
except httpx.ConnectTimeout:
|
||||
return Failure("下载图片失败了,网络超时了qwq")
|
||||
if response.status_code != 200:
|
||||
return Failure("无法下载图片,可能存在网络问题需要排查")
|
||||
return Success(response.content)
|
||||
|
||||
|
||||
def bytes_to_pil(raw_data: bytes | BytesIO) -> Result[PIL.Image.Image, str]:
|
||||
try:
|
||||
if not isinstance(raw_data, BytesIO):
|
||||
img_pil = PIL.Image.open(BytesIO(raw_data))
|
||||
else:
|
||||
img_pil = PIL.Image.open(raw_data)
|
||||
img_pil.verify()
|
||||
if not isinstance(raw_data, BytesIO):
|
||||
img = PIL.Image.open(BytesIO(raw_data))
|
||||
else:
|
||||
raw_data.seek(0)
|
||||
img = PIL.Image.open(raw_data)
|
||||
return Success(img)
|
||||
except UnidentifiedImageError:
|
||||
return Failure("图像无法读取,可能是格式不支持orz")
|
||||
except IOError:
|
||||
return Failure("图像无法读取,可能是网络存在问题orz")
|
||||
|
||||
|
||||
async def unimsg_img_to_pil(image: Image) -> Result[PIL.Image.Image, str]:
|
||||
if image.url is not None:
|
||||
raw_result = await download_image_bytes(image.url)
|
||||
elif image.raw is not None:
|
||||
raw_result = Success(image.raw)
|
||||
else:
|
||||
return Failure("由于一些内部问题,下载图片失败了orz")
|
||||
|
||||
return raw_result.bind(bytes_to_pil)
|
||||
|
||||
|
||||
async def extract_image_from_qq_message(
|
||||
msg: OnebotV11Message,
|
||||
evt: OnebotV11MessageEvent,
|
||||
bot: OnebotV11Bot,
|
||||
allow_reply: bool = True,
|
||||
) -> Result[PIL.Image.Image, str]:
|
||||
if allow_reply and (reply := evt.reply) is not None:
|
||||
return await extract_image_from_qq_message(
|
||||
reply.message,
|
||||
evt,
|
||||
bot,
|
||||
False,
|
||||
)
|
||||
for seg in msg:
|
||||
if seg.type == "reply" and allow_reply:
|
||||
msgid = seg.data.get("id")
|
||||
if msgid is None:
|
||||
return Failure("消息可能太久远,无法读取到消息原文")
|
||||
try:
|
||||
msg2 = await bot.get_msg(message_id=msgid)
|
||||
except Exception as e:
|
||||
logger.warning(f"获取消息内容时出错:{e}")
|
||||
return Failure("消息可能太久远,无法读取到消息原文")
|
||||
msg2_data = msg2.get("message")
|
||||
if msg2_data is None:
|
||||
return Failure("消息可能太久远,无法读取到消息原文")
|
||||
logger.debug("发现消息引用,递归一层")
|
||||
return await extract_image_from_qq_message(
|
||||
msg=OnebotV11Message(msg2_data),
|
||||
evt=evt,
|
||||
bot=bot,
|
||||
allow_reply=False,
|
||||
)
|
||||
if seg.type == "image":
|
||||
url = seg.data.get("url")
|
||||
if url is None:
|
||||
return Failure("无法下载图片,可能有一些网络问题")
|
||||
data = await download_image_bytes(url)
|
||||
return data.bind(bytes_to_pil)
|
||||
|
||||
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
|
||||
|
||||
|
||||
async def extract_image_from_message(
|
||||
msg: Message,
|
||||
evt: Event,
|
||||
bot: Bot,
|
||||
allow_reply: bool = True,
|
||||
) -> Result[PIL.Image.Image, str]:
|
||||
if (
|
||||
isinstance(bot, OnebotV11Bot)
|
||||
and isinstance(msg, OnebotV11Message)
|
||||
and isinstance(evt, OnebotV11MessageEvent)
|
||||
):
|
||||
# 看起来 UniMessage 在这方面能力似乎不足,因此用 QQ 的
|
||||
logger.debug('获取图片的路径 Fallback 到 QQ 模块')
|
||||
return await extract_image_from_qq_message(msg, evt, bot, allow_reply)
|
||||
|
||||
for seg in UniMessage.of(msg, bot):
|
||||
logger.info(seg)
|
||||
if isinstance(seg, Image):
|
||||
return await unimsg_img_to_pil(seg)
|
||||
elif isinstance(seg, Reply) and allow_reply:
|
||||
msg2 = seg.msg
|
||||
logger.debug(f"深入搜索引用的消息:{msg2}")
|
||||
if msg2 is None or isinstance(msg2, str):
|
||||
continue
|
||||
return await extract_image_from_message(msg2, evt, bot, False)
|
||||
elif isinstance(seg, RefNode) and allow_reply:
|
||||
if isinstance(bot, DiscordBot):
|
||||
return Failure("暂时不支持在 Discord 中通过引用的方式获取图片")
|
||||
else:
|
||||
return Failure("暂时不支持在这里中通过引用的方式获取图片")
|
||||
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
|
||||
Reference in New Issue
Block a user