from io import BytesIO from typing import Iterable, cast from loguru import logger from nonebot import on_message from nonebot_plugin_alconna import ( Alconna, Args, Field, Image, MultiVar, Option, Text, UniMessage, UniMsg, on_alconna, ) from playwright.async_api import ConsoleMessage, Page from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message from konabot.common.web_render import konaweb from konabot.common.web_render.core import WebRenderer from konabot.common.web_render.host_images import host_tempdir from konabot.plugins.memepack.drawing.display import ( draw_cao_display, draw_snaur_display, draw_anan_display, ) from konabot.plugins.memepack.drawing.saying import ( draw_cute_ten, draw_geimao, draw_kiosay, draw_mnk, draw_pt, draw_suan, ) from konabot.plugins.memepack.drawing.watermark import draw_doubao_watermark from nonebot.adapters import Bot, Event from returns.result import Success, Failure geimao = on_alconna( Alconna( "给猫说", Args[ "saying", MultiVar(str, "+"), Field(missing_tips=lambda: "你没有写给猫说了什么"), ], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"给猫哈"}, ) @geimao.handle() async def _(saying: list[str]): img = await draw_geimao("\n".join(saying)) img_bytes = BytesIO() img.save(img_bytes, format="PNG") await geimao.send(await UniMessage().image(raw=img_bytes).export()) pt = on_alconna( Alconna( "pt说", Args[ "saying", MultiVar(str, "+"), Field(missing_tips=lambda: "你没有写小帕说了什么"), ], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"小帕说"}, ) @pt.handle() async def _(saying: list[str]): img = await draw_pt("\n".join(saying)) img_bytes = BytesIO() img.save(img_bytes, format="PNG") await pt.send(await UniMessage().image(raw=img_bytes).export()) mnk = on_alconna( Alconna( "re:小?黑白子?说", Args[ "saying", MultiVar(str, "+"), Field(missing_tips=lambda: "你没有写黑白子说了什么"), ], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"}, ) @mnk.handle() async def _(saying: list[str]): img = await draw_mnk("\n".join(saying)) img_bytes = BytesIO() img.save(img_bytes, format="PNG") await mnk.send(await UniMessage().image(raw=img_bytes).export()) suan = on_alconna( Alconna( "小蒜说", Args[ "saying", MultiVar(str, "+"), Field(missing_tips=lambda: "你没有写小蒜说了什么"), ], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set(), ) @suan.handle() async def _(saying: list[str]): img = await draw_suan("\n".join(saying)) img_bytes = BytesIO() img.save(img_bytes, format="PNG") await suan.send(await UniMessage().image(raw=img_bytes).export()) dsuan = on_alconna( Alconna( "大蒜说", Args[ "saying", MultiVar(str, "+"), Field(missing_tips=lambda: "你没有写大蒜说了什么"), ], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set(), ) @dsuan.handle() async def _(saying: list[str]): img = await draw_suan("\n".join(saying), True) img_bytes = BytesIO() img.save(img_bytes, format="PNG") await dsuan.send(await UniMessage().image(raw=img_bytes).export()) cutecat = on_alconna( Alconna( "乖猫说", Args[ "saying", MultiVar(str, "+"), Field(missing_tips=lambda: "你没有写十猫说了什么"), ], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"十猫说"}, ) @cutecat.handle() async def _(saying: list[str]): img = await draw_cute_ten("\n".join(saying)) img_bytes = BytesIO() img.save(img_bytes, format="PNG") await cutecat.send(await UniMessage().image(raw=img_bytes).export()) cao_display_cmd = on_message() @cao_display_cmd.handle() async def _(msg: UniMsg, evt: Event, bot: Bot): flag = False for text in cast(Iterable[Text], msg.get(Text)): if text.text.strip() == "小槽展示": flag = True elif text.text.strip() == "": continue else: return if not flag: return match await extract_image_from_message(evt.get_message(), evt, bot): case Success(img): img_handled = await draw_cao_display(img) img_bytes = BytesIO() img_handled.save(img_bytes, format="PNG") await cao_display_cmd.send(await UniMessage().image(raw=img_bytes).export()) case Failure(err): await cao_display_cmd.send( await UniMessage() .at(user_id=evt.get_user_id()) .text(" ") .text(err) .export() ) snaur_display_cmd = on_alconna( Alconna( "卵总展示", Option("--whiteness", Args["whiteness", float], alias=["-w"]), Option("--black-level", Args["black_level", float], alias=["-b"]), Option("--opacity", Args["opacity", float], alias=["-o"]), Option("--saturation", Args["saturation", float], alias=["-s"]), Args["image", Image | None], ) ) @snaur_display_cmd.handle() async def _( img: PIL_Image, whiteness: float = 0.0, black_level: float = 0.2, opacity: float = 0.8, saturation: float = 0.85, ): img_processed = await draw_snaur_display( img, whiteness, black_level, opacity, saturation, ) img_data = BytesIO() img_processed.save(img_data, "PNG") await snaur_display_cmd.send(await UniMessage().image(raw=img_data).export()) anan_display_cmd = on_message() @anan_display_cmd.handle() async def _(msg: UniMsg, evt: Event, bot: Bot): flag = False for text in cast(Iterable[Text], msg.get(Text)): stripped = text.text.strip() if stripped == "安安展示": flag = True elif stripped == "": continue else: return if not flag: return match await extract_image_from_message(evt.get_message(), evt, bot): case Success(img): img_handled = await draw_anan_display(img) img_bytes = BytesIO() img_handled.save(img_bytes, format="PNG") await anan_display_cmd.send(await UniMessage().image(raw=img_bytes).export()) case Failure(err): await anan_display_cmd.send( await UniMessage() .at(user_id=evt.get_user_id()) .text(" ") .text(err) .export() ) kiosay = on_alconna( Alconna( "西多说", Args[ "saying", MultiVar(str, "+"), Field(missing_tips=lambda: "你没有写西多说了什么"), ], ), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set(), ) @kiosay.handle() async def _(saying: list[str]): img = await draw_kiosay("\n".join(saying)) img_bytes = BytesIO() img.save(img_bytes, format="PNG") await kiosay.send(await UniMessage().image(raw=img_bytes).export()) quote_cmd = on_alconna(Alconna( "名人名言", Args["quote", str], Args["author", str], Args["image?", Image | None], ), aliases={"quote"}) @quote_cmd.handle() async def _(quote: str, author: str, img: PIL_Image): async with host_tempdir() as tempdir: img_path = tempdir.path / "image.png" img_url = tempdir.url_of(img_path) img.save(img_path) async def page_function(page: Page): async def on_console(msg: ConsoleMessage): logger.debug(f"WEB CONSOLE {msg.text}") page.on('console', on_console) await page.locator('input[name=image]').fill(img_url) await page.locator('input[name=quote]').fill(quote) await page.locator('input[name=author]').fill(author) await page.wait_for_timeout(500) await page.wait_for_load_state('networkidle') await page.wait_for_timeout(500) out = await WebRenderer.render( konaweb('makequote'), target='#main', other_function=page_function, ) await quote_cmd.send(await UniMessage().image(raw=out).export()) doubao_cmd = on_alconna(Alconna( "豆包水印", Args["image?", Image | None], )) @doubao_cmd.handle() async def _(img: PIL_Image): result = await draw_doubao_watermark(img) result_bytes = BytesIO() result.save(result_bytes, format="PNG") await doubao_cmd.send(await UniMessage().image(raw=result_bytes).export())