Compare commits

..

57 Commits

Author SHA1 Message Date
2d688a6ed6 new
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-14 12:43:25 +08:00
e9aac52200 chengyu update 2025-10-14 01:23:49 +00:00
4305548ab5 submodule
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 22:53:44 +08:00
99382a3bf5 Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-13 22:48:17 +08:00
92e43785bf submodule 2025-10-13 22:46:30 +08:00
fc5b11c5e8 调整 notify 的强制退出
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 22:16:50 +08:00
0ec66988fa 更新投票存储位置
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-13 22:05:21 +08:00
e5c3081c22 Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-13 22:02:44 +08:00
14b356120a 成语接龙 2025-10-13 22:02:33 +08:00
a208302cb9 添加依赖
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 21:35:44 +08:00
01ffa451bb Merge pull request '投票功能和二维码生成(从 testpilot 移植)' (#26) from wzq02/konabot:master into master
Some checks failed
continuous-integration/drone/push Build is failing
Reviewed-on: #26
2025-10-13 21:33:03 +08:00
2b6c2e84bd Merge branch 'master' into master 2025-10-13 21:31:40 +08:00
4f0a9af2dc 成语接龙
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-13 21:10:18 +08:00
4a4aa6b243 Add submodule: THUOCL 2025-10-13 21:10:05 +08:00
4c8625ae02 小完善(添加对应的 man) 2025-10-13 21:08:32 +08:00
c5f820a1f9 投票功能和二维码生成(从 testpilot 移植) 2025-10-13 20:49:56 +08:00
a3dd2dbbda 添加更加宽松的匹配规则
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 18:28:32 +08:00
8d4f74dafe 添加 Bilibili 视频解析的插件
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 18:12:39 +08:00
7c1bac64c9 修复在 log 文件中没有空格的问题
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-13 17:03:39 +08:00
e09fa13d0f 修复 Notify 的通知信息
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 16:55:50 +08:00
990a622cf6 添加一些日志用于调试 Notify 功能
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 11:48:22 +08:00
6144563d4d 添加 giftool 倒放选项 2025-10-13 11:34:06 +08:00
a6413c9809 添加报错和日志
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 21:52:35 +08:00
af566888ab fix2
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 15:24:49 +08:00
e72bc283f8 调整 giftool
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 15:18:07 +08:00
c9d58e7498 修改文档
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 13:45:34 +08:00
627a48da1c 添加安全限制 2025-10-12 13:40:40 +08:00
87be1916ee 添加 Shadertool(谁需要??????)
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-12 13:36:54 +08:00
0ca901e7b1 添加 giftool
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 12:47:52 +08:00
d096f43d38 添加 giftool 2025-10-12 12:40:33 +08:00
38ae3d1c74 补充黑白的 man 2025-10-12 12:04:19 +08:00
a0483d1d5c 修复断言逻辑
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 11:52:41 +08:00
ae83b66908 添加图像黑白
Some checks failed
continuous-integration/drone/push Build is failing
2025-10-12 11:50:15 +08:00
6abeb05a18 去除未使用的函数 2025-10-12 11:02:51 +08:00
9b0a0368fa 修改 YTPGIF 的功能问题
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 10:55:44 +08:00
4eac493de4 更改令人费解的 requirements.txt 方案
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-09 20:25:34 +08:00
b4e400b626 调整 ytpgif 使用共用方法读取图片
Some checks failed
continuous-integration/drone/push Build is failing
2025-10-09 19:56:16 +08:00
c35ee57976 优化时间读取逻辑 2025-10-09 19:55:53 +08:00
8edb999050 添加巨大多东西
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-07 16:07:48 +08:00
109a81923f 添加 man 指令 2025-10-07 15:54:16 +08:00
91687fb8c3 Merge pull request 'feature-更多更多说' (#22) from feature-更多更多说 into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #22
2025-10-03 17:40:52 +08:00
f889381cce 排序 import 2025-10-03 17:39:49 +08:00
1256055c9d 补充依赖 2025-10-03 17:38:02 +08:00
40f35a474e 搞小槽的说话 2025-10-03 17:37:43 +08:00
6b01acfa8c 十猫 2025-10-03 14:23:01 +08:00
09c9d44798 Merge pull request 'Feature: 好多好多的说' (#21) from feature-新说 into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #21
2025-10-03 13:52:31 +08:00
0c4206f461 好多好多的说 2025-10-03 13:49:36 +08:00
9fb8fd90dc 修复类型注解
Some checks are pending
continuous-integration/drone/push Build is running
2025-10-02 12:06:20 +08:00
8c4fa2b5e4 Merge pull request 'fix: 透明底正常生成;静动图分离完成' (#18) from tnot/konabot:fix--修复部分Bug into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #18
2025-10-01 19:24:23 +08:00
fb2c3f1ce2 fix: 透明底正常生成;静动图分离完成 2025-10-01 11:54:54 +08:00
265415e727 Merge pull request 'feat: ytpgif' (#16) from tnot/konabot:feat--ytpgif into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #16
2025-09-30 22:38:15 +08:00
06555b2225 feat: ytpgif 2025-09-30 22:24:16 +08:00
f6fd25a41d fix: 添加 Notify 任务创建限制
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 19:08:30 +08:00
9f6c70bf0f 合并后的部分骰子路径修复
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 15:21:53 +08:00
1c01e49d5d Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot 2025-09-30 15:18:16 +08:00
48c719bc33 全新骰子 2025-09-30 15:13:59 +08:00
6bc9f94e83 拷贝环境变量
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-09-30 02:45:01 +08:00
72 changed files with 703526 additions and 2517 deletions

View File

@ -10,6 +10,10 @@ trigger:
- master
steps:
- name: submodules
image: alpine/git
commands:
- git submodule update --init --recursive
- name: 构建 Docker 镜像
image: plugins/docker:latest
privileged: true
@ -50,6 +54,10 @@ trigger:
- tag
steps:
- name: submodules
image: alpine/git
commands:
- git submodule update --init --recursive
- name: 构建并推送 Release Docker 镜像
image: plugins/docker:latest
privileged: true

3
.gitmodules vendored Normal file
View File

@ -0,0 +1,3 @@
[submodule "assets/lexicon/THUOCL"]
path = assets/lexicon/THUOCL
url = https://github.com/thunlp/THUOCL.git

24
.vscode/launch.json vendored
View File

@ -1,24 +0,0 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "运行 Bot 并调试(自动重载)",
"type": "debugpy",
"request": "launch",
"module": "watchfiles",
"args": [
"bot.main"
],
"console": "integratedTerminal",
"justMyCode": true,
"env": {
"PYTHONPATH": "${workspaceFolder}"
},
"cwd": "${workspaceFolder}",
"presentation": {
"hidden": false,
"group": "bot"
}
}
]
}

30
.vscode/tasks.json vendored
View File

@ -1,30 +0,0 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "Poetry: Export requirements.txt (Production)",
"type": "shell",
"command": "poetry export -f requirements.txt --output requirements.txt --without-hashes",
"group": "build",
"presentation": {
"reveal": "always",
"panel": "new"
},
"problemMatcher": [],
"detail": "导出生产环境依赖到 requirements.txt"
},
{
"label": "Bot: Run with Auto-reload",
"type": "shell",
"command": "poetry run watchfiles bot.main",
"group": "build",
"isBackground": true,
"presentation": {
"reveal": "always",
"panel": "new"
},
"problemMatcher": [],
"detail": "运行 bot 并启用自动重载功能"
}
]
}

View File

@ -1,10 +1,43 @@
FROM python:3.13-slim
# copied from https://www.martinrichards.me/post/python_poetry_docker/
FROM python:3.13-slim AS base
ENV VIRTUAL_ENV=/app/.venv \
PATH="/app/.venv/bin:$PATH"
RUN apt-get update && \
apt-get install -y --no-install-recommends \
libfontconfig1 \
libgl1 \
libegl1 \
libglvnd0 \
mesa-vulkan-drivers \
&& rm -rf /var/lib/apt/lists/*
FROM base AS builder
ENV POETRY_NO_INTERACTION=1 \
POETRY_VIRTUALENVS_IN_PROJECT=1 \
POETRY_VIRTUALENVS_CREATE=1 \
POETRY_CACHE_DIR=/tmp/poetry_cache
WORKDIR /app
COPY requirements.txt ./
RUN pip install -r requirements.txt --no-deps
COPY bot.py pyproject.toml ./
RUN pip install poetry
COPY pyproject.toml poetry.lock ./
RUN python -m poetry install --no-root && rm -rf $POETRY_CACHE_DIR
FROM base AS runtime
COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
WORKDIR /app
COPY bot.py pyproject.toml .env.prod .env.test ./
COPY assets ./assets
COPY scripts ./scripts
COPY konabot ./konabot

View File

@ -65,10 +65,10 @@ code .
### 运行
你可以在 VSCode 的「运行与调试」窗口,启动 `运行 Bot 并调试(自动重载)` 任务来启动 Bot也可以使用命令行手动启动 Bot
使用命令行手动启动 Bot
```bash
poetry run watchfiles bot.main
poetry run watchfiles bot.main konabot
```
如果你不希望自动重载,只是想运行 Bot可以直接运行

Binary file not shown.

BIN
assets/img/dice/stick.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 80 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

BIN
assets/img/meme/caoimg1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 227 KiB

BIN
assets/img/meme/dss.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 172 KiB

BIN
assets/img/meme/mnksay.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 69 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 364 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 614 KiB

1
assets/json/poll.json Normal file
View File

@ -0,0 +1 @@
{"poll": {"0": {"create": 1760357553, "expiry": 1760443953, "options": {"0": "此方bot", "1": "testpilot", "2": "小镜bot", "3": "可怜bot"}, "polldata": {}, "qq": "2975499623", "title": "我~是~谁~"}}}

1
assets/lexicon/THUOCL Submodule

Submodule assets/lexicon/THUOCL added at a30ce79d89

1
assets/lexicon/ci.json Normal file

File diff suppressed because one or more lines are too long

360393
assets/lexicon/common.txt Normal file

File diff suppressed because it is too large Load Diff

339847
assets/lexicon/idiom.json Normal file

File diff suppressed because it is too large Load Diff

10
bot.py
View File

@ -7,6 +7,10 @@ from nonebot.adapters.discord import Adapter as DiscordAdapter
from nonebot.adapters.minecraft import Adapter as MinecraftAdapter
from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter
from konabot.common.log import init_logger
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.path import LOG_PATH
dotenv.load_dotenv()
env = os.environ.get("ENVIRONMENT", "prod")
env_enable_console = os.environ.get("ENABLE_CONSOLE", "none")
@ -14,7 +18,12 @@ env_enable_qq = os.environ.get("ENABLE_QQ", "none")
env_enable_discord = os.environ.get("ENABLE_DISCORD", "none")
env_enable_minecraft = os.environ.get("ENABLE_MINECRAFT", "none")
def main():
init_logger(LOG_PATH, [
BotExceptionMessage,
])
nonebot.init()
driver = nonebot.get_driver()
@ -33,6 +42,7 @@ def main():
# nonebot.load_builtin_plugin("echo")
nonebot.load_plugins("konabot/plugins")
nonebot.load_plugin("nonebot_plugin_analysis_bilibili")
nonebot.run()

79
konabot/common/log.py Normal file
View File

@ -0,0 +1,79 @@
import sys
from pathlib import Path
from typing import TYPE_CHECKING, List, Type
from loguru import logger
if TYPE_CHECKING:
from loguru import Record
def file_exception_filter(
record: "Record",
ignored_exceptions: tuple[Type[Exception], ...]
) -> bool:
"""
一个自定义的 Loguru 过滤器函数。
如果日志记录包含异常信息,并且该异常的类型在 ignored_exceptions 中,则返回 False忽略
否则,返回 True允许记录
"""
exception_info = record.get("exception")
if exception_info:
exception_type = exception_info[0]
if exception_type and issubclass(exception_type, ignored_exceptions):
return False
return True
def init_logger(
log_dir: Path,
ignored_exceptions: List[Type[Exception]]
) -> None:
"""
配置全局 Loguru Logger。
Args:
log_dir (Path): 存放日志文件的文件夹路径,会自动创建。
ignored_exceptions (List[Type[Exception]]): 在 WARNING 级别文件日志中需要忽略的异常类型列表。
"""
ignored_exceptions_tuple = tuple(ignored_exceptions)
logger.remove()
log_dir.mkdir(parents=True, exist_ok=True)
logger.add(
sys.stderr,
level="INFO",
colorize=True,
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
)
info_log_path = log_dir / "log.log"
logger.add(
str(info_log_path),
level="INFO",
rotation="10 MB",
retention="7 days",
enqueue=True,
backtrace=False,
diagnose=False,
)
warning_error_log_path = log_dir / "error.log"
logger.add(
str(warning_error_log_path),
level="WARNING",
rotation="10 MB",
compression="zip",
enqueue=True,
filter=lambda record: file_exception_filter(record, ignored_exceptions_tuple),
backtrace=True,
diagnose=True,
)
logger.info("Loguru Logger 初始化完成!")
logger.info(f"控制台日志级别: INFO")

9
konabot/common/nb/exc.py Normal file
View File

@ -0,0 +1,9 @@
from nonebot_plugin_alconna import UniMessage
class BotExceptionMessage(Exception):
def __init__(self, msg: UniMessage | str) -> None:
super().__init__()
if isinstance(msg, str):
msg = UniMessage().text(msg)
self.msg = msg

View File

@ -0,0 +1,159 @@
from io import BytesIO
from typing import Annotated
import httpx
import PIL.Image
from loguru import logger
import nonebot
from nonebot.matcher import Matcher
from nonebot.adapters import Bot, Event, Message
from nonebot.adapters.discord import Bot as DiscordBot
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
from nonebot.adapters.onebot.v11 import Message as OnebotV11Message
from nonebot.adapters.onebot.v11 import MessageEvent as OnebotV11MessageEvent
import nonebot.params
from nonebot_plugin_alconna import Image, RefNode, Reply, UniMessage
from PIL import UnidentifiedImageError
from returns.result import Failure, Result, Success
from konabot.common.nb.exc import BotExceptionMessage
async def download_image_bytes(url: str) -> Result[bytes, str]:
# if "/matcha/cache/" in url:
# url = url.replace('127.0.0.1', '10.126.126.101')
logger.debug(f"开始从 {url} 下载图片")
async with httpx.AsyncClient() as c:
try:
response = await c.get(url)
except (httpx.ConnectError, httpx.RemoteProtocolError) as e:
return Failure(f"HTTPX 模块下载图片时出错:{e}")
except httpx.ConnectTimeout:
return Failure("下载图片失败了网络超时了qwq")
if response.status_code != 200:
return Failure("无法下载图片,可能存在网络问题需要排查")
return Success(response.content)
def bytes_to_pil(raw_data: bytes | BytesIO) -> Result[PIL.Image.Image, str]:
try:
if not isinstance(raw_data, BytesIO):
img_pil = PIL.Image.open(BytesIO(raw_data))
else:
img_pil = PIL.Image.open(raw_data)
img_pil.verify()
if not isinstance(raw_data, BytesIO):
img = PIL.Image.open(BytesIO(raw_data))
else:
raw_data.seek(0)
img = PIL.Image.open(raw_data)
return Success(img)
except UnidentifiedImageError:
return Failure("图像无法读取可能是格式不支持orz")
except IOError:
return Failure("图像无法读取可能是网络存在问题orz")
async def unimsg_img_to_pil(image: Image) -> Result[PIL.Image.Image, str]:
if image.url is not None:
raw_result = await download_image_bytes(image.url)
elif image.raw is not None:
raw_result = Success(image.raw)
else:
return Failure("由于一些内部问题下载图片失败了orz")
return raw_result.bind(bytes_to_pil)
async def extract_image_from_qq_message(
msg: OnebotV11Message,
evt: OnebotV11MessageEvent,
bot: OnebotV11Bot,
allow_reply: bool = True,
) -> Result[PIL.Image.Image, str]:
if allow_reply and (reply := evt.reply) is not None:
return await extract_image_from_qq_message(
reply.message,
evt,
bot,
False,
)
for seg in msg:
if seg.type == "reply" and allow_reply:
msgid = seg.data.get("id")
if msgid is None:
return Failure("消息可能太久远,无法读取到消息原文")
try:
msg2 = await bot.get_msg(message_id=msgid)
except Exception as e:
logger.warning(f"获取消息内容时出错:{e}")
return Failure("消息可能太久远,无法读取到消息原文")
msg2_data = msg2.get("message")
if msg2_data is None:
return Failure("消息可能太久远,无法读取到消息原文")
logger.debug("发现消息引用,递归一层")
return await extract_image_from_qq_message(
msg=OnebotV11Message(msg2_data),
evt=evt,
bot=bot,
allow_reply=False,
)
if seg.type == "image":
url = seg.data.get("url")
if url is None:
return Failure("无法下载图片,可能有一些网络问题")
data = await download_image_bytes(url)
return data.bind(bytes_to_pil)
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
async def extract_image_from_message(
msg: Message,
evt: Event,
bot: Bot,
allow_reply: bool = True,
) -> Result[PIL.Image.Image, str]:
if (
isinstance(bot, OnebotV11Bot)
and isinstance(msg, OnebotV11Message)
and isinstance(evt, OnebotV11MessageEvent)
):
# 看起来 UniMessage 在这方面能力似乎不足,因此用 QQ 的
logger.debug('获取图片的路径 Fallback 到 QQ 模块')
return await extract_image_from_qq_message(msg, evt, bot, allow_reply)
for seg in UniMessage.of(msg, bot):
logger.info(seg)
if isinstance(seg, Image):
return await unimsg_img_to_pil(seg)
elif isinstance(seg, Reply) and allow_reply:
msg2 = seg.msg
logger.debug(f"深入搜索引用的消息:{msg2}")
if msg2 is None or isinstance(msg2, str):
continue
return await extract_image_from_message(msg2, evt, bot, False)
elif isinstance(seg, RefNode) and allow_reply:
if isinstance(bot, DiscordBot):
return Failure("暂时不支持在 Discord 中通过引用的方式获取图片")
else:
return Failure("暂时不支持在这里中通过引用的方式获取图片")
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
async def _ext_img(
evt: Event,
bot: Bot,
matcher: Matcher,
) -> PIL.Image.Image | None:
match await extract_image_from_message(evt.get_message(), evt, bot):
case Success(img):
return img
case Failure(err):
# raise BotExceptionMessage(err)
await matcher.send(await UniMessage().text(err).export())
return None
assert False
PIL_Image = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)]

View File

@ -0,0 +1,34 @@
from nonebot import get_plugin_config
import nonebot
import nonebot.adapters
import nonebot.adapters.console
import nonebot.adapters.discord
import nonebot.adapters.onebot
from pydantic import BaseModel
class IsAdminConfig(BaseModel):
admin_qq_group: list[int] = []
admin_qq_account: list[int] = []
admin_discord_channel: list[int] = []
admin_discord_account: list[int] = []
cfg = get_plugin_config(IsAdminConfig)
def is_admin(event: nonebot.adapters.Event):
if isinstance(event, nonebot.adapters.onebot.v11.MessageEvent):
if event.user_id in cfg.admin_qq_account:
return True
if isinstance(event, nonebot.adapters.onebot.v11.GroupMessageEvent):
if event.group_id in cfg.admin_qq_group:
return True
if isinstance(event, nonebot.adapters.discord.event.MessageEvent):
if event.channel_id in cfg.admin_discord_channel:
return True
if event.user_id in cfg.admin_discord_account:
return True
if isinstance(event, nonebot.adapters.console.event.Event):
return True
return False

View File

@ -0,0 +1,16 @@
import re
from nonebot_plugin_alconna import Text, UniMsg
def match_keyword(*patterns: str | re.Pattern):
async def _matcher(msg: UniMsg):
text = msg.get(Text).extract_plain_text().strip()
for pattern in patterns:
if isinstance(pattern, str) and text == pattern:
return True
if isinstance(pattern, re.Pattern) and re.match(pattern, text):
return True
return False
return _matcher

View File

@ -0,0 +1,13 @@
from io import BytesIO
import PIL
import PIL.Image
from nonebot.adapters import Bot
from nonebot.matcher import Matcher
from nonebot_plugin_alconna import UniMessage
async def reply_image(matcher: type[Matcher], bot: Bot, img: PIL.Image.Image):
data = BytesIO()
img.save(data, "PNG")
await matcher.send(await UniMessage().image(raw=data).export(bot))

View File

@ -2,3 +2,20 @@ from pathlib import Path
ASSETS_PATH = Path(__file__).resolve().parent.parent.parent / "assets"
FONTS_PATH = ASSETS_PATH / "fonts"
SRC_PATH = Path(__file__).resolve().parent.parent
DATA_PATH = SRC_PATH.parent / "data"
LOG_PATH = DATA_PATH / "logs"
DOCS_PATH = SRC_PATH / "docs"
DOCS_PATH_MAN1 = DOCS_PATH / "user"
DOCS_PATH_MAN3 = DOCS_PATH / "lib"
DOCS_PATH_MAN7 = DOCS_PATH / "concepts"
DOCS_PATH_MAN8 = DOCS_PATH / "sys"
if not DATA_PATH.exists():
DATA_PATH.mkdir()
if not LOG_PATH.exists():
LOG_PATH.mkdir()

40
konabot/docs/README.md Normal file
View File

@ -0,0 +1,40 @@
# 此方 Bot 的文档系统
此方 Bot 使用类 Linux 的 `man` 指令来管理文档。文档一般建议使用纯文本书写,带有相对良好的格式。
## 文件夹摆放规则
`docs` 目录下,有若干文档可以拿来阅读和输出。每个子文件夹里,文档文件使用名字不含空格的 txt 文件书写,其他后缀名的文件将会被忽略。所以,如果你希望有些文件只在代码库中可阅读,你可以使用 `.md` 格式。
### 1 - user
`docs/user` 目录下的文档是直接会给用户进行检索的文档,在直接使用 `man` 指令时,会搜索该文件夹的全部文件,以知晓所有有文档的指令。
### 3 - lib
`docs/lib` 目录下的文档主要给该项目的维护者进行阅读和使用,讲述的是本项目内置的一些函数的功能讲解(一般以便利为主要目的)以及一些项目安排上的要求。一般不会列举,除非用户指定要求列举该范围。
### 7 - concepts
`docs/concepts` 用来摆放任何的概念。任何的。一般不会列举,除非用户指定要求列举该范围。
### 8 - sys
`docs/sys` 用于摆放仅 MTTU 群可以使用的文档集合。在 MTTU 群内,该目录下的文档也会被索引,否则文档将不可阅读。
## 书写规范
无特殊要求,因为当用户进行 `man` 的时候,会将文档内的内容原封不动地展示出来。但是,你仍然可以模仿 Linux 下的 `man` 指令的格式进行书写。
```
指令介绍
man - 用于展示此方 BOT 使用手册的指令
格式
man [文档类型] <指令>
示例
`man` 查看所有有文档的指令清单
`man 喵` 查看指令「喵」的使用说明
……
```

View File

View File

@ -0,0 +1,45 @@
指令介绍
is_admin - 用于判断当前事件是否来自管理员的内部权限校验函数
格式
from konabot.common.nb.is_admin import is_admin
from nonebot import on
from nonebot.adapters import Event
from loguru import logger
@on().handle()
async def _(event: Event):
if is_admin(event):
logger.info("管理员发送了消息")
说明
is_admin 是 Bot 内部用于权限控制的核心函数根据事件来源QQ、Discord、控制台及插件配置判断触发事件的用户或群组是否具有管理员权限。
支持的适配器与判定逻辑:
• OneBot V11QQ
- 若用户 ID 在配置项 admin_qq_account 中,则视为管理员
- 若为群聊消息,且群 ID 在配置项 admin_qq_group 中,则视为管理员
• Discord
- 若频道 ID 在配置项 admin_discord_channel 中,则视为管理员
- 若用户 ID 在配置项 admin_discord_account 中,则视为管理员
• Console控制台
- 所有控制台输入均默认视为管理员操作,自动返回 True
配置项(位于插件配置中)
ADMIN_QQ_GROUP: list[int]
允许的管理员 QQ 群 ID 列表
ADMIN_QQ_ACCOUNT: list[int]
允许的管理员 QQ 账号 ID 列表
ADMIN_DISCORD_CHANNEL: list[int]
允许的管理员 Discord 频道 ID 列表
ADMIN_DISCORD_ACCOUNT: list[int]
允许的管理员 Discord 用户 ID 列表
注意事项
- 若未在配置文件中设置任何管理员 ID该函数对所有非控制台事件返回 False
- 控制台事件始终拥有管理员权限,便于本地调试与运维

View File

1
konabot/docs/sys/out.txt Normal file
View File

@ -0,0 +1 @@
MAN what can I say!

View File

@ -0,0 +1,59 @@
指令介绍
giftool - 对 GIF 动图进行裁剪、抽帧等处理
格式
giftool [图片] [选项]
示例
回复一张 GIF 并发送:
`giftool --ss 1.5 -t 2.0`
从 1.5 秒处开始,截取 2 秒长度的片段。
`giftool [图片] --ss 0:10 -to 0:15`
截取从 10 秒到 15 秒之间的片段(支持 MM:SS 或 HH:MM:SS 格式)。
`giftool [图片] --frames:v 10`
将整张 GIF 均匀抽帧,最终保留 10 帧。
`giftool [图片] --ss 2 --frames:v 5`
从第 2 秒开始截取,并将结果抽帧为 5 帧。
参数说明
图片(必需)
- 必须是 GIF 动图。
- 支持直接附带图片,或回复一条含 GIF 的消息后使用指令。
--ss <时间戳>(可选)
- 指定开始时间(单位:秒),可使用以下格式:
• 纯数字(如 `1.5` 表示 1.5 秒)
• 分秒格式(如 `1:30` 表示 1 分 30 秒)
• 时分秒格式(如 `0:1:30` 表示 1 分 30 秒)
- 默认从开头开始0 秒)。
-t <持续时间>(可选)
- 指定截取的持续时间(单位:秒),格式同 --ss。
- 与 --ss 配合使用:截取 [ss, ss + t] 区间。
- 不能与 --to 同时使用。
--to <时间戳>(可选)
- 指定结束时间(单位:秒),格式同 --ss。
- 与 --ss 配合使用:截取 [ss, to] 区间。
- 不能与 -t 同时使用。
--frames:v <帧数>(可选)
- 对截取后的片段进行均匀抽帧,保留指定数量的帧。
- 帧数必须为正整数(> 0
- 若原始帧数 ≤ 指定帧数,则保留全部帧。
--s <速度>(可选)
- 调整 gif 图的速度。若为负数,则代表倒放
使用方式
1. 发送指令前,请确保:
- 消息中附带一张 GIF 动图,或
- 回复一条包含 GIF 动图的消息后再发送指令。
2. 插件会自动:
- 解析 GIF 的每一帧及其持续时间duration
- 根据时间参数转换为帧索引进行裁剪
- 如指定抽帧,则对裁剪后的片段均匀采样
- 生成新的 GIF 并保持原始循环设置loop=0

20
konabot/docs/user/man.txt Normal file
View File

@ -0,0 +1,20 @@
指令介绍
man - 用于展示此方 BOT 使用手册的指令
格式
man 文档类型
man [文档类型] <指令>
示例
`man` 查看所有有文档的指令清单
`man 3` 列举所有可读文档的库函数清单
`man 喵` 查看指令「喵」的使用说明
`man 8 out` 查看管理员指令「out」的使用说明
文档类型
文档类型用来区分同一指令在不同场景下的情景。你可以使用数字编号进行筛选。分为这些种类:
- 1 用户态指令,用于日常使用的指令
- 3 库函数指令,用于 Bot 开发用的函数查询
- 7 概念指令,用于概念解释
- 8 系统指令,仅管理员可用

View File

@ -0,0 +1,21 @@
指令介绍
openssl - 用于生成指定长度的加密安全随机数据
格式
openssl rand <模式> <字节数>
示例
`openssl rand -hex 16` 生成 16 字节的十六进制随机数
`openssl rand -base64 32` 生成 32 字节并以 Base64 编码输出的随机数据
说明
该指令使用 Python 的 secrets 模块生成加密安全的随机字节,并支持以十六进制(-hex或 Base64-base64格式输出。
参数说明
模式mode
- -hex :以十六进制字符串形式输出随机数据
- -base64 :以 Base64 编码字符串形式输出随机数据
字节数num
- 必须为正整数
- 最大支持 256 字节

View File

@ -0,0 +1,47 @@
指令介绍
shadertool - 使用 SkSLSkia Shader Language代码实时渲染并生成 GIF 动画
格式
shadertool [选项] <SkSL 代码>
示例
shadertool """
uniform float u_time;
uniform float2 u_resolution;
half4 main(float2 coord) {
return half4(
1.0,
sin((coord.y / u_resolution.y + u_time) * 3.1415926 * 2) * 0.5 + 0.5,
coord.x / u_resolution.x,
1.0
);
}
"""
参数说明
SkSL 代码(必填)
- 类型:字符串(建议用英文双引号包裹)
- 内容:符合 SkSL 语法的片段着色器代码,必须包含 `void main()` 函数,并为 `sk_FragColor` 赋值。
- 注意:插件会自动去除代码首尾的单引号或双引号,便于命令行输入。
--width <整数>(可选)
- 默认值320
- 作用:输出 GIF 的宽度(像素),必须大于 0。
--height <整数>(可选)
- 默认值180
- 作用:输出 GIF 的高度(像素),必须大于 0。
--duration <浮点数>(可选)
- 默认值1.0
- 作用:动画总时长(秒),必须大于 0。
- 限制:`duration × fps` 必须 ≥ 1 且 ≤ 100即至少 1 帧,最多 100 帧)。
--fps <浮点数>(可选)
- 默认值15.0
- 作用:每秒帧数,控制动画流畅度,必须大于 0。
- 常见值10低配流畅、15默认、24/30电影/视频级)。
使用方式
直接在群聊或私聊中发送 `shadertool` 指令,附上合法的 SkSL 代码即可。

View File

@ -0,0 +1,41 @@
指令介绍
ytpgif - 生成来回镜像翻转的仿 YTPMV 动图
格式
ytpgif [倍速]
示例
`ytpgif`
使用默认倍速1.0)处理你发送或回复的图片,生成镜像动图。
`ytpgif 2.5`
以 2.5 倍速处理图片,生成更快节奏的镜像动图。
回复一张图片并发送 `ytpgif 0.5`
以慢速0.5 倍)生成镜像动图。
参数说明
倍速(可选)
- 类型:浮点数
- 默认值1.0
- 有效范围0.1 20.0
- 作用:
• 对于静态图:控制镜像切换的快慢(值越大,切换越快)。
• 对于动图:控制截取原始动图正向和反向片段的时长(值越大,截取的片段越长)。
使用方式
发送指令前,请确保:
- 直接在消息中附带一张图片,或
- 回复一条包含图片的消息后再发送指令。
插件会自动:
- 下载并识别图片(支持静态图和 GIF 动图)
- 自动缩放至最大边长不超过 256 像素(保持宽高比)
- 静态图 → 生成“原图↔镜像”循环动图
- 动图 → 截取开头一段正向播放 + 同一段镜像翻转播放,拼接成新动图
- 保留透明通道(如原图含透明),否则转为 RGB 避免颜色异常
注意事项
- 图片过大、格式损坏或网络问题可能导致处理失败。
- 动图帧数过多或单帧过短可能无法生成有效输出。
- 输出 GIF 最大单段帧数限制为 500 帧,以防资源耗尽。

View File

@ -0,0 +1,11 @@
指令介绍
发起投票 - 发起一个投票
格式
发起投票 <投票标题> <选项1> <选项2> ...
示例
`发起投票 这是一个投票 A B C` 发起标题为“这是一个投票”选项为“A”、“B”、“C”的投票
说明
投票各个选项之间用空格分隔选项数量为2-15项。投票的默认有效期为24小时。

View File

@ -0,0 +1,2 @@
指令介绍
喵 - 你发喵,此方就会回复喵

View File

@ -0,0 +1,12 @@
指令介绍
投票 - 参与已发起的投票
格式
投票 <投票ID/标题> <选项文本>
示例
`投票 1 A` 在ID为1的投票中投给“A”
`投票 这是一个投票 B` 在标题为“这是一个投票”的投票中投给“B”
说明
目前不支持单人多投,每个人只能投一项。

View File

@ -0,0 +1,7 @@
指令介绍
摇数字 - 生成一个随机数字并发送
示例
`摇数字` 随机生成一个 1-6 的数字
该指令不接受任何参数,直接调用即可。

View File

@ -0,0 +1,22 @@
指令介绍
摇骰子 - 用于生成随机数并以骰子图像形式展示的指令
格式
摇骰子 [最小值] [最大值]
示例
`摇骰子` 随机生成一个 1-6 的数字,并显示对应的骰子图像
`摇骰子 10` 生成 1 到 10 之间的随机整数
`摇骰子 0.5` 生成 0 到 0.5 之间的随机小数
`摇骰子 -5 5` 生成 -5 到 5 之间的随机数
说明
该指令支持以下几种调用方式:
- 不带参数:使用默认范围生成随机数
- 仅指定一个参数 f1
- 若 f1 > 1则生成 [1, f1] 范围内的随机数
- 若 0 < f1 ≤ 1则生成 [0, f1] 范围内的随机数
- 若 f1 ≤ 0则生成 [f1, 0] 范围内的随机数
- 指定两个参数 f1 和 f2生成 [f1, f2] 范围内的随机数(顺序无关,内部会自动处理大小)
返回结果将以骰子样式的图像形式展示生成的随机数值。

View File

@ -0,0 +1,12 @@
指令介绍
查看投票 - 查看已发起的投票
格式
查看投票 <投票ID或标题>
示例
`查看投票 1` 查看ID为1的投票
`查看投票 这是一个投票` 查看标题为“这是一个投票”的投票
说明
投票在进行时,使用此命令可以看到投票的各个选项;投票结束后,则可以看到各项的票数。

View File

@ -0,0 +1,8 @@
指令介绍
生成二维码 - 将文本内容转换为二维码
格式
生成二维码 <文本内容>
示例
`生成二维码 嗨嗨嗨` 生成扫描结果为“嗨嗨嗨”的二维码图片

View File

@ -0,0 +1,13 @@
指令介绍
雷达回波 - 用于获取指定地区的天气雷达回波图像
格式
雷达回波 <地区>
示例
`雷达回波 华南` 获取华南地区的天气雷达回波图
`雷达回波 全国` 获取全国的天气雷达回波图
说明
该指令通过查询中国气象局 https://www.nmc.cn/publish/radar/chinaall.html ,获取指定地区的实时天气雷达回波图像。
支持的地区有:全国 华北 东北 华东 华中 华南 西南 西北。

View File

@ -0,0 +1,5 @@
指令介绍
黑白 - 将图片经过一个黑白滤镜的处理
示例
引用一个带有图片的消息,或者消息本身携带图片,然后发送「黑白」即可

View File

@ -0,0 +1,25 @@
import re
from loguru import logger
from nonebot import on_message
from nonebot_plugin_alconna import Reference, Reply, UniMsg
from nonebot.adapters import Event
matcher_fix = on_message()
pattern = (
r"^(?:(?:av|cv)\d+|BV[a-zA-Z0-9]{10})|"
r"(?:b23\.tv|bili(?:22|23|33|2233)\.cn|\.bilibili\.com|QQ小程序(?:&amp;#93;|&#93;|\])哔哩哔哩).{0,500}"
)
@matcher_fix.handle()
async def _(msg: UniMsg, event: Event):
to_search = msg.exclude(Reply, Reference).dump(json=True)
if not re.search(pattern, to_search):
return
logger.info("检测到有 Bilibili 相关的消息,直接进行一个调用")
_module = __import__("nonebot_plugin_analysis_bilibili")
await _module.handle_analysis(event)

45
konabot/plugins/errman.py Normal file
View File

@ -0,0 +1,45 @@
from typing import Any
from nonebot.adapters import Bot
from nonebot.matcher import Matcher
from nonebot.message import run_postprocessor
from nonebot_plugin_alconna import UniMessage
from returns.primitives.exceptions import UnwrapFailedError
from konabot.common.nb.exc import BotExceptionMessage
@run_postprocessor
async def _(bot: Bot, matcher: Matcher, exc: BotExceptionMessage | AssertionError | UnwrapFailedError):
if isinstance(exc, BotExceptionMessage):
msg = exc.msg
await matcher.send(await msg.export(bot))
if isinstance(exc, AssertionError):
if exc.args:
err_msg = exc.args[0]
err_msg_res: UniMessage
if isinstance(err_msg, str):
err_msg_res = UniMessage().text(err_msg)
elif isinstance(err_msg, UniMessage):
err_msg_res = err_msg
else:
return
await matcher.send(await err_msg_res.export(bot))
if isinstance(exc, UnwrapFailedError):
obj = exc.halted_container
try:
failure: Any = obj.failure()
err_msg_res: UniMessage
if isinstance(failure, str):
err_msg_res = UniMessage().text(failure)
elif isinstance(failure, UniMessage):
err_msg_res = failure
else:
return
await matcher.send(await err_msg_res.export(bot))
except:
pass

View File

@ -0,0 +1,69 @@
import qrcode
# from pyzbar.pyzbar import decode
# from PIL import Image
import requests
from io import BytesIO
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
on_alconna)
from nonebot_plugin_alconna.uniseg import UniMsg, At, Reply
async def download_img(url):
resp = requests.get(url.replace("https://multimedia.nt.qq","http://multimedia.nt.qq")) # bim获取QQ的图片时避免SSLv3报错
img_bytes = BytesIO()
with open(img_bytes,"wb") as f:
f.write(resp.content)
return img_bytes
def genqr(data):
qr = qrcode.QRCode(version=1,error_correction=qrcode.constants.ERROR_CORRECT_L,box_size=8,border=4)
qr.add_data(data)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
return img_bytes
"""
async def recqr(url):
im_path = "assets/img/qrcode/2.jpg"
data = await download_img(url)
img = Image.open(im_path)
decoded_objects = decode(img)
data = ""
for obj in decoded_objects:
data += obj.data.decode('utf-8')
return data
"""
gqrc = on_alconna(Alconna(
"genqr",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "请输入你要转换为二维码的文字!"
)],
# UniMessage[]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"生成二维码","genqrcode"})
@gqrc.handle()
async def _(saying: list):
"""
img = await draw_pt("\n".join(saying))
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await pt.send(await UniMessage().image(raw=img_bytes).export())
# print(saying)
# 二维码识别
if type(saying[0]) == 'image':
data = await recqr(saying[0].data['url'])
if data == "":
await gqrc.send("二维码图片解析失败!")
else:
await gqrc.send(recqr(saying[0].data['url']))
# 二维码生成
else:
"""
# genqr("\n".join(saying))
await gqrc.send(await UniMessage().image(raw=genqr("\n".join(saying))).export())

View File

@ -0,0 +1,229 @@
import base64
import secrets
import json
from typing import Literal
from nonebot import on_message
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import (Alconna, Args, Field, Subcommand,
UniMessage, UniMsg, on_alconna)
from konabot.common.path import ASSETS_PATH
ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语
IDIOM_FIRST_CHAR = {} # 成语首字字典
INITED = False
def init_lexicon():
global ALL_WORDS, ALL_IDIOMS, IDIOM_FIRST_CHAR
# 成语大表
with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f:
ALL_IDIOMS_INFOS = json.load(f)
# 词语大表
with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f:
ALL_WORDS = json.load(f)
COMMON_WORDS = []
# 读取 COMMON 词语大表
with open(ASSETS_PATH / "lexicon" / "common.txt", "r", encoding="utf-8") as f:
for line in f:
word = line.strip()
if len(word) == 4:
COMMON_WORDS.append(word)
# 读取 THUOCL 成语库
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8") as f:
THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f]
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
THUOCL_WORDS = []
import os
for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"):
if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt":
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8") as f:
for line in f:
word = line.lstrip().split(" ")[0].strip()
if len(word) == 4:
THUOCL_WORDS.append(word)
# 只有成语的大表
ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
ALL_IDIOMS = list(set(ALL_IDIOMS)) # 去重
# 其他四字词语表,仅表示可以有这个词
ALL_WORDS = [word for word in ALL_WORDS if len(word) == 4] + THUOCL_WORDS + COMMON_WORDS
ALL_WORDS = list(set(ALL_WORDS)) # 去重
# 根据成语大表,划分出成语首字字典
IDIOM_FIRST_CHAR = {}
for idiom in ALL_IDIOMS + ALL_WORDS:
if idiom[0] not in IDIOM_FIRST_CHAR:
IDIOM_FIRST_CHAR[idiom[0]] = []
IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
NOW_PLAYING = False
SCORE_BOARD = {}
LAST_CHAR = ""
USER_NAME_CACHE = {} # 缓存用户名称,避免多次获取
REMAIN_PLAYING_TIMES = 1
LAST_PLAY_DATE = ""
LOCK = False
ALL_BUFF_SCORE = 0 # 全体分数
import datetime
def be_able_to_play():
global REMAIN_PLAYING_TIMES, LAST_PLAY_DATE
if(LAST_PLAY_DATE != datetime.date.today()):
LAST_PLAY_DATE = datetime.date.today()
REMAIN_PLAYING_TIMES = 1
if(REMAIN_PLAYING_TIMES > 0):
REMAIN_PLAYING_TIMES -= 1
return True
return False
evt = on_alconna(Alconna(
"我要玩成语接龙"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def play_game(event: BaseEvent, force = False):
global NOW_PLAYING, LAST_CHAR, INITED
if NOW_PLAYING:
await evt.send(await UniMessage().text("当前已有成语接龙游戏在进行中,请稍后再试!").export())
return
if not be_able_to_play() and not force:
await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export())
return
if not INITED:
init_lexicon()
INITED = True
NOW_PLAYING = True
await evt.send(await UniMessage().text("你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!").export())
# 选择一个随机成语
idiom = secrets.choice(ALL_IDIOMS)
LAST_CHAR = idiom[-1]
# 发布成语
await evt.send(await UniMessage().text(f"第一个成语:「{idiom}」,请接!").export())
evt = on_alconna(Alconna(
"老子就是要玩成语接龙!!!"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def force_play_game(event: BaseEvent):
await play_game(event, force=True)
evt = on_alconna(Alconna(
"不玩了"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent):
global NOW_PLAYING, SCORE_BOARD, LAST_CHAR, ALL_BUFF_SCORE
if NOW_PLAYING:
NOW_PLAYING = False
# 发送好吧狗图片
# 打开好吧狗本地文件
with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f:
img_data = f.read()
await evt.send(await UniMessage().image(raw=img_data).export())
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
if len(SCORE_BOARD) == 0:
result_text += "无人得分!"
else:
# 按分数排序,名字用 at 的方式
sorted_score = sorted(SCORE_BOARD.items(), key=lambda x: x[1]["score"], reverse=True)
for i, (user_id, info) in enumerate(sorted_score):
result_text += f"{i+1}. " + UniMessage().at(user_id) + f": {info['score'] + ALL_BUFF_SCORE}\n"
await evt.send(await result_text.export())
# 重置分数板
SCORE_BOARD = {}
LAST_CHAR = ""
else:
await evt.send(await UniMessage().text("当前没有成语接龙游戏在进行中!").export())
# 跳过
evt = on_alconna(Alconna(
"跳过成语"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent):
global NOW_PLAYING, LAST_CHAR, ALL_BUFF_SCORE
if not NOW_PLAYING:
return
await evt.send(await UniMessage().text("你们太菜了全部扣100分").export())
ALL_BUFF_SCORE -= 100
# 选择下一个成语
idiom = secrets.choice(ALL_IDIOMS)
LAST_CHAR = idiom[-1]
await evt.send(await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export())
# 直接读取消息
evt = on_message()
@evt.handle()
async def _(event: BaseEvent, msg: UniMsg):
global NOW_PLAYING, LAST_CHAR, LOCK
if not NOW_PLAYING:
return
user_idiom = msg.extract_plain_text().strip()
if(user_idiom[0] != LAST_CHAR):
return
if LOCK:
return
LOCK = True
await handle_send_info(event, msg)
LOCK = False
async def handle_send_info(event: BaseEvent, msg: UniMsg):
global NOW_PLAYING, LAST_CHAR, SCORE_BOARD, ALL_BUFF_SCORE
user_idiom = msg.extract_plain_text().strip()
if(user_idiom not in ALL_IDIOMS and user_idiom not in ALL_WORDS):
# 扣0.1分
if isinstance(event, DiscordMessageEvent):
user_id = str(event.author.id)
user_name = str(event.author.name)
else:
user_id = str(event.get_user_id())
user_name = str(event.get_user_id())
if user_id not in SCORE_BOARD:
SCORE_BOARD[user_id] = {
"name": user_name,
"score": 0
}
SCORE_BOARD[user_id]["score"] -= 0.1
await evt.send(await UniMessage().at(user_id).text("接不上!这个不一样!你被扣了 0.1 分!").export())
return
# 成功接上
if isinstance(event, DiscordMessageEvent):
user_id = str(event.author.id)
user_name = str(event.author.name)
else:
user_id = str(event.get_user_id())
user_name = str(event.get_user_id())
if user_id not in SCORE_BOARD:
SCORE_BOARD[user_id] = {
"name": user_name,
"score": 0
}
SCORE_BOARD[user_id]["score"] += 1
# at 指定玩家
await evt.send(await UniMessage().at(user_id).text(f"接对了!你有 {SCORE_BOARD[user_id]['score'] + ALL_BUFF_SCORE} 分!").export())
LAST_CHAR = user_idiom[-1]
await evt.send(await UniMessage().text(f"下一个成语请以「{LAST_CHAR}」开头!").export())

View File

@ -0,0 +1,3 @@
from pathlib import Path
ASSETS = Path(__file__).parent.parent / "assets"

View File

@ -0,0 +1,211 @@
import re
from io import BytesIO
from nonebot import on_message
from nonebot.adapters import Bot
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage,
on_alconna)
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import PIL_Image
from konabot.common.nb.match_keyword import match_keyword
from konabot.common.nb.reply_image import reply_image
cmd_black_white = on_message(rule=match_keyword("黑白"))
@cmd_black_white.handle()
async def _(img: PIL_Image, bot: Bot):
await reply_image(cmd_black_white, bot, img.convert("LA"))
def parse_timestamp(tx: str) -> float | None:
res = 0.0
for component in tx.split(":"):
res *= 60
if not re.match(r"^\d+(\.\d+)?$", component):
return
res += float(component)
return res
cmd_giftool = on_alconna(Alconna(
"giftool",
Args["img", Image | None],
Option("--ss", Args["start_point", str]),
Option("--frames:v", Args["frame_count", int]),
Option("-t", Args["length", str]),
Option("-to", Args["end_point", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
))
@cmd_giftool.handle()
async def _(
image: PIL_Image,
start_point: str | None = None,
frame_count: int | None = None,
length: str | None = None,
speed_factor: float = 1.0,
end_point: str | None = None,
):
ss: None | float = None
if start_point:
ss = parse_timestamp(start_point)
if ss is None:
raise BotExceptionMessage("--ss 的格式不满足条件")
t: None | float = None
if length:
t = parse_timestamp(length)
if t is None:
raise BotExceptionMessage("-t 的格式不满足条件")
to: None | float = None
if end_point:
to = parse_timestamp(end_point)
if to is None:
raise BotExceptionMessage("-to 的格式不满足条件")
if to is not None and ss is not None and to <= ss:
raise BotExceptionMessage("错误:出点时间小于入点")
if frame_count is not None and frame_count <= 0:
raise BotExceptionMessage("错误:帧数量应该大于 0")
if speed_factor == 0:
raise BotExceptionMessage("错误:速度不能为 0")
is_rev = speed_factor < 0
speed_factor = abs(speed_factor)
if not getattr(image, "is_animated", False):
raise BotExceptionMessage("错误输入的不是动图GIF")
frames = []
durations = []
total_duration = 0.0
try:
for i in range(getattr(image, "n_frames")):
image.seek(i)
frames.append(image.copy())
duration = image.info.get("duration", 100) # 单位:毫秒
durations.append(duration)
total_duration += duration / 1000.0 # 转为秒
except EOFError:
pass
if not frames:
raise BotExceptionMessage("错误:读取 GIF 帧失败")
def time_to_frame_index(target_time: float) -> int:
if target_time <= 0:
return 0
cum = 0.0
for idx, dur in enumerate(durations):
cum += dur / 1000.0
if cum >= target_time:
return min(idx, len(frames) - 1)
return len(frames) - 1
start_frame = 0
end_frame = len(frames) - 1
if ss is not None:
start_frame = time_to_frame_index(ss)
if to is not None:
end_frame = time_to_frame_index(to)
if end_frame < start_frame:
end_frame = start_frame
elif t is not None:
end_time = (ss or 0.0) + t
end_frame = time_to_frame_index(end_time)
if end_frame < start_frame:
end_frame = start_frame
start_frame = max(0, start_frame)
end_frame = min(len(frames) - 1, end_frame)
selected_frames = frames[start_frame : end_frame + 1]
selected_durations = durations[start_frame : end_frame + 1]
if frame_count is not None and frame_count > 0:
if frame_count >= len(selected_frames):
pass
else:
step = len(selected_frames) / frame_count
sampled_frames = []
sampled_durations = []
for i in range(frame_count):
idx = int(i * step)
sampled_frames.append(selected_frames[idx])
sampled_durations.append(
sum(selected_durations) // len(selected_durations)
)
selected_frames = sampled_frames
selected_durations = sampled_durations
output_img = BytesIO()
adjusted_durations = [
dur / speed_factor for dur in selected_durations
]
rframes = []
rdur = []
acc_mod_20 = 0
for i in range(len(selected_frames)):
fr = selected_frames[i]
du: float = adjusted_durations[i]
if du >= 20:
rframes.append(fr)
rdur.append(int(du))
acc_mod_20 = 0
else:
if acc_mod_20 == 0:
rframes.append(fr)
rdur.append(20)
acc_mod_20 += du
else:
acc_mod_20 += du
if acc_mod_20 >= 20:
acc_mod_20 = 0
if len(rframes) == 1 and len(selected_frames) > 1:
rframes.append(selected_frames[max(2, len(selected_frames) // 2)])
rdur.append(20)
transparency_flag = False
for f in rframes:
if f.mode == "RGBA":
if any(pix < 255 for pix in f.getchannel("A").getdata()):
transparency_flag = True
break
elif f.mode == "P" and "transparency" in f.info:
transparency_flag = True
break
tf = {}
if transparency_flag:
tf['transparency'] = 0
if is_rev:
rframes = rframes[::-1]
rdur = rdur[::-1]
if rframes:
rframes[0].save(
output_img,
format="GIF",
save_all=True,
append_images=rframes[1:],
duration=rdur,
loop=0,
optimize=False,
disposal=2,
**tf,
)
else:
raise BotExceptionMessage("错误:没有可输出的帧")
output_img.seek(0)
await cmd_giftool.send(await UniMessage().image(raw=output_img).export())

View File

@ -0,0 +1,104 @@
from curses.ascii import isdigit
from pathlib import Path
import nonebot
import nonebot.adapters
import nonebot.adapters.discord
import nonebot.rule
from nonebot import on_command
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from konabot.common.nb.is_admin import is_admin
from konabot.common.path import DOCS_PATH_MAN1, DOCS_PATH_MAN3, DOCS_PATH_MAN7, DOCS_PATH_MAN8
def search_man(section: int) -> dict[tuple[int, str], Path]:
base_path = {
1: DOCS_PATH_MAN1,
3: DOCS_PATH_MAN3,
7: DOCS_PATH_MAN7,
8: DOCS_PATH_MAN8,
}.get(section, DOCS_PATH_MAN1)
res: dict[tuple[int, str], Path] = {}
for fp in base_path.iterdir():
if fp.suffix != '.txt':
continue
name = fp.name.lower().removesuffix('.txt')
res[(section, name)] = fp
return res
man = on_alconna(Alconna(
'man',
Args['section', int | None],
Args['doc', str | None],
))
@man.handle()
async def _(
section: int | None,
doc: str | None,
event: nonebot.adapters.Event,
):
if doc is not None and section is None and all(isdigit(c) for c in doc):
section = int(doc)
doc = None
if section is not None and section not in {1, 3, 7, 8}:
await man.send(
UniMessage().text(f"你所指定的文档类型 {section} 不在可用范围内")
)
return
if doc is None:
# 检索模式
if section is None:
section_set = {1}
else:
section_set = {section}
if 1 in section_set and is_admin(event):
section_set.add(8)
mans: list[str] = []
for section in section_set:
mans += [f"{n}({s})" for s, n in search_man(section).keys()]
mans.sort()
await man.send(UniMessage().text(
(
"★此方 BOT 使用帮助★\n"
"使用 man <指令名> 查询某个指令的名字\n\n"
"可供查询的指令清单:"
)
+ ", ".join(mans)
+ "\n\n例如,使用 man man 来查询 man 指令的使用方法"
))
else:
# 查阅模式
if section is None:
section_set = {1}
else:
section_set = {section}
if 1 in section_set and is_admin(event):
section_set.add(8)
if 8 in section_set and not is_admin(event):
await man.send(UniMessage().text("你没有查看该指令类型的权限"))
return
mans_dict: dict[tuple[int, str], Path] = {}
for section in section_set:
mans_dict: dict[tuple[int, str], Path] = {**mans_dict, **search_man(section)}
mans_dict_2 = {key[1]: val for key, val in mans_dict.items()}
mans_fp = mans_dict_2.get(doc.lower())
if mans_fp is None:
await man.send(UniMessage().text("你所检索的指令不存在"))
return
mans_msg = mans_fp.read_text('utf-8', 'replace')
if isinstance(event, nonebot.adapters.discord.event.MessageEvent):
mans_msg = f'```\n{mans_msg}\n```'
await man.send(UniMessage().text(mans_msg))
help_deprecated = on_command('help', rule=nonebot.rule.to_me())
@help_deprecated.handle()
async def _():
await help_deprecated.send('你可以使用 man 指令来查询此方 BOT 的帮助')

View File

@ -1,10 +1,19 @@
from io import BytesIO
from typing import Iterable, cast
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
on_alconna)
from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, Text,
UniMessage, UniMsg, on_alconna)
from konabot.plugins.memepack.drawing.geimao import draw_geimao
from konabot.plugins.memepack.drawing.pt import draw_pt
from konabot.common.nb.extract_image import extract_image_from_message
from konabot.plugins.memepack.drawing.display import draw_cao_display
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten,
draw_geimao, draw_mnk,
draw_pt, draw_suan)
from nonebot.adapters import Bot, Event
from returns.result import Success, Failure
geimao = on_alconna(Alconna(
"给猫说",
@ -36,3 +45,97 @@ async def _(saying: list[str]):
img.save(img_bytes, format="PNG")
await pt.send(await UniMessage().image(raw=img_bytes).export())
mnk = on_alconna(Alconna(
"re:小?黑白子?说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写黑白子说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"})
@mnk.handle()
async def _(saying: list[str]):
img = await draw_mnk("\n".join(saying))
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await mnk.send(await UniMessage().image(raw=img_bytes).export())
suan = on_alconna(Alconna(
"小蒜说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写小蒜说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
@suan.handle()
async def _(saying: list[str]):
img = await draw_suan("\n".join(saying))
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await suan.send(await UniMessage().image(raw=img_bytes).export())
dsuan = on_alconna(Alconna(
"大蒜说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写大蒜说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
@dsuan.handle()
async def _(saying: list[str]):
img = await draw_suan("\n".join(saying), True)
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await dsuan.send(await UniMessage().image(raw=img_bytes).export())
cutecat = on_alconna(Alconna(
"乖猫说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写十猫说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"十猫说"})
@cutecat.handle()
async def _(saying: list[str]):
img = await draw_cute_ten("\n".join(saying))
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await cutecat.send(await UniMessage().image(raw=img_bytes).export())
cao_display_cmd = on_message()
@cao_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False
for text in cast(Iterable[Text], msg.get(Text)):
if text.text.strip() == "小槽展示":
flag = True
elif text.text.strip() == '':
continue
else:
return
if not flag:
return
match await extract_image_from_message(evt.get_message(), evt, bot):
case Success(img):
img_handled = await draw_cao_display(img)
img_bytes = BytesIO()
img_handled.save(img_bytes, format="PNG")
await cao_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
case Failure(err):
await cao_display_cmd.send(
await UniMessage()
.at(user_id=evt.get_user_id())
.text(' ')
.text(err)
.export()
)

View File

@ -10,3 +10,4 @@ FontDB.SetDefaultEmojiOptions(EmojiOptions(
HARMONYOS_SANS_SC_BLACK = FontDB.Query("HarmonyOS_Sans_SC_Black")
HARMONYOS_SANS_SC_REGULAR = FontDB.Query("HarmonyOS_Sans_SC_Regular")
LXGWWENKAI_REGULAR = FontDB.Query("LXGWWenKai-Regular")

View File

@ -0,0 +1,45 @@
import asyncio
from typing import Any, cast
import cv2
import numpy as np
import PIL.Image
from konabot.common.path import ASSETS_PATH
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
CAO_QUAD_POINTS = np.float32(cast(Any, [
[392, 540],
[577, 557],
[567, 707],
[381, 687],
]))
def _draw_cao_display(image: PIL.Image.Image):
src = np.array(image.convert("RGB"))
h, w = src.shape[:2]
src_points = np.float32(cast(Any, [
[0, 0],
[w, 0],
[w, h],
[0, h]
]))
dst_points = CAO_QUAD_POINTS
M = cv2.getPerspectiveTransform(cast(Any, src_points), cast(Any, dst_points))
output_size = cao_image.size
output_w, output_h = output_size
warped = cv2.warpPerspective(
src,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
result = PIL.Image.fromarray(warped, 'RGB').convert('RGBA')
result = PIL.Image.alpha_composite(result, cao_image)
return result
async def draw_cao_display(image: PIL.Image.Image):
return await asyncio.to_thread(_draw_cao_display, image)

View File

@ -1,30 +0,0 @@
import asyncio
from typing import Any, cast
import imagetext_py
import PIL.Image
from konabot.common.path import ASSETS_PATH
from .base.fonts import HARMONYOS_SANS_SC_BLACK
geimao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "geimao.jpg").convert("RGBA")
def _draw_geimao(saying: str):
img = geimao_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 960, 50, 00.5, 0, 1920, 240, HARMONYOS_SANS_SC_BLACK,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
0.8,
imagetext_py.TextAlign.Center,
cast(Any, 30.0),
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
draw_emojis=True,
)
return img
async def draw_geimao(saying: str):
return await asyncio.to_thread(_draw_geimao, saying)

View File

@ -1,27 +0,0 @@
import asyncio
import imagetext_py
import PIL.Image
from konabot.common.path import ASSETS_PATH
from .base.fonts import HARMONYOS_SANS_SC_REGULAR
pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("RGBA")
def _draw_pt(saying: str):
img = pt_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 259, 278, 0.5, 0.5, 360, 48, HARMONYOS_SANS_SC_REGULAR,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
1.0,
imagetext_py.TextAlign.Center,
draw_emojis=True,
)
return img
async def draw_pt(saying: str):
return await asyncio.to_thread(_draw_pt, saying)

View File

@ -0,0 +1,108 @@
import asyncio
from typing import Any, cast
import imagetext_py
import PIL.Image
from konabot.common.path import ASSETS_PATH
from .base.fonts import HARMONYOS_SANS_SC_BLACK, HARMONYOS_SANS_SC_REGULAR, LXGWWENKAI_REGULAR
geimao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "geimao.jpg").convert("RGBA")
pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("RGBA")
mnk_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "mnksay.jpg").convert("RGBA")
dasuan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "dss.png").convert("RGBA")
suan_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "suanleba.png").convert("RGBA")
cute_ten_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "tententen.png").convert("RGBA")
def _draw_geimao(saying: str):
img = geimao_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 960, 50, 0.5, 0, 1920, 240, HARMONYOS_SANS_SC_BLACK,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
0.8,
imagetext_py.TextAlign.Center,
cast(Any, 30.0),
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
draw_emojis=True,
)
return img
async def draw_geimao(saying: str):
return await asyncio.to_thread(_draw_geimao, saying)
def _draw_pt(saying: str):
img = pt_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 259, 278, 0.5, 0.5, 360, 48, HARMONYOS_SANS_SC_REGULAR,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
1.0,
imagetext_py.TextAlign.Center,
draw_emojis=True,
)
return img
async def draw_pt(saying: str):
return await asyncio.to_thread(_draw_pt, saying)
def _draw_mnk(saying: str):
img = mnk_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 540, 25, 0.5, 0, 1080, 120, HARMONYOS_SANS_SC_BLACK,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
0.8,
imagetext_py.TextAlign.Center,
cast(Any, 15.0),
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("FFFFFFFF")),
draw_emojis=True,
)
return img
async def draw_mnk(saying: str):
return await asyncio.to_thread(_draw_mnk, saying)
def _draw_suan(saying: str, dasuan: bool = False):
if dasuan:
img = dasuan_image.copy()
else:
img = suan_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 1020, 290, 0.5, 0.5, 400, 48, LXGWWENKAI_REGULAR,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
1.0,
imagetext_py.TextAlign.Center,
draw_emojis=True,
)
return img
async def draw_suan(saying: str, dasuan: bool = False):
return await asyncio.to_thread(_draw_suan, saying, dasuan)
def _draw_cute_ten(saying: str):
img = cute_ten_image.copy()
with imagetext_py.Writer(img) as iw:
iw.draw_text_wrapped(
saying, 390, 479, 0.5, 0.5, 760, 96, LXGWWENKAI_REGULAR,
imagetext_py.Paint.Color(imagetext_py.Color.from_hex("000000FF")),
1.0,
imagetext_py.TextAlign.Center,
draw_emojis=True,
)
return img
async def draw_cute_ten(saying: str):
return await asyncio.to_thread(_draw_cute_ten, saying)

View File

@ -0,0 +1,166 @@
import json, time
from nonebot_plugin_alconna import Alconna, Args, Field, MultiVar, on_alconna
from nonebot.adapters.onebot.v11 import Event
from konabot.common.path import ASSETS_PATH, DATA_PATH
POLL_TEMPLATE_FILE = ASSETS_PATH / "json" / "poll.json"
POLL_DATA_FILE = DATA_PATH / "poll.json"
if not POLL_DATA_FILE.exists():
POLL_DATA_FILE.write_bytes(POLL_TEMPLATE_FILE.read_bytes())
poll_list = json.loads(POLL_DATA_FILE.read_text())['poll']
async def createpoll(title,qqid,options):
polllength = len(poll_list)
pollid = str(polllength)
poll_create = int(time.time())
poll_expiry = poll_create + 24*3600
polljson = {"title":title,"qq":qqid,"create":poll_create,"expiry":poll_expiry,"options":options,"polldata":{}}
poll_list[pollid] = polljson
writeback()
return pollid
def getpolldata(pollid_or_title):
# 初始化“被指定的投票项目”
thepoll = {}
polnum = -1
# 判断是ID还是标题
if str.isdigit(pollid_or_title):
if pollid_or_title in poll_list:
thepoll = poll_list[pollid_or_title]
polnum = pollid_or_title
else:
return [{},-1]
else:
for i in poll_list:
if poll_list[i]["title"] == pollid_or_title:
thepoll = poll_list[i]
polnum = i
break
if polnum == -1:
return [{},-1]
return [thepoll,polnum]
def writeback():
# file = open(poll_json_path,"w",encoding="utf-8")
# json.dump({'poll':poll_list},file,ensure_ascii=False,sort_keys=True)
POLL_DATA_FILE.write_text(json.dumps({
'poll': poll_list,
}, ensure_ascii=False, sort_keys=True))
async def pollvote(polnum,optionnum,qqnum):
optiond = poll_list[polnum]["polldata"]
if optionnum not in optiond:
poll_list[polnum]["polldata"][optionnum] = []
poll_list[polnum]["polldata"][optionnum].append(qqnum)
writeback()
return
poll = on_alconna(Alconna(
"poll",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "参数错误。用法:发起投票 <投票标题> <选项1> <选项2> ..."
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"发起投票","createpoll"})
@poll.handle()
async def _(saying: list, event: Event):
if (len(saying) < 3):
await poll.send("请提供至少两个投票选项!")
elif (len(saying) < 17):
title = saying[0]
saying.remove(title)
options = {}
for i in saying:
options[saying.index(i)] = i
qqid = event.get_user_id()
result = await createpoll(title,qqid,options)
await poll.send("已创建投票。回复 查看投票 "+str(result)+" 查看该投票。")
else:
await poll.send("投票选项太多了请减少到15个选项以内。")
viewpoll = on_alconna(Alconna(
"viewpoll",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "请指定投票ID或标题。用法查看投票 <投票ID或标题>"
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"查看投票"})
@viewpoll.handle()
async def _(saying: list):
# 参数投票ID或者标题
# pollid_or_title = params[0]
polldata = getpolldata(saying[0])
# 被指定的投票项目
thepoll = polldata[0]
polnum = polldata[1]
if polnum == -1:
await viewpoll.send("该投票不存在!")
else:
# 检查投票是否已结束
pollended = 0
if time.time() > thepoll["expiry"]:
pollended = 1
# 回复内容
reply = "投票:"+thepoll["title"]+" [ID: "+str(polnum)+"]"
# 如果投票已结束
if pollended:
for i in thepoll["options"]:
reply += "\n"
# 检查该选项是否有人投票
if i in thepoll["polldata"]:
reply += "["+str(len(thepoll["polldata"][i]))+" 票]"
else:
reply += "[0 票]"
reply += " "+thepoll["options"][i]
reply += "\n\n此投票已结束。"
else:
for i in thepoll["options"]:
reply += "\n"
reply += "- "+thepoll["options"][i]
# reply += "\n\n小提示向bot私聊发送 /viewpoll "+str(polnum)+" 可查看已投票数哦!"
reply += "\n\n发送 投票 "+str(polnum)+" <选项文本> 即可参与投票!"
await viewpoll.send(reply)
vote = on_alconna(Alconna(
"vote",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "参数错误。用法:投票 <投票ID/标题> <选项文本>"
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"投票","参与投票"})
@vote.handle()
async def _(saying: list, event: Event):
if (len(saying) < 2):
await vote.send("请指定投给哪一项!")
else:
polldata = getpolldata(saying[0])
# 被指定的投票项目
thepoll = polldata[0]
polnum = polldata[1]
if polnum == -1:
await viewpoll.finish("没有找到这个投票!")
# thepolldata = thepoll["polldata"]
# 查找对应的投票项
optionnum = -1
for i in thepoll["options"]:
if saying[1] == thepoll["options"][i]:
optionnum = i
break
if optionnum == -1:
reply = "此投票里面没有这一项!可用的选项有:"
for i in thepoll["options"]:
reply += "\n"
reply += "- "+thepoll["options"][i]
await viewpoll.send(reply)
# 检查是否符合投票条件该qq号是否已参与过投票、投票是否过期
elif time.time() > thepoll["expiry"]:
await viewpoll.send("此投票已经结束!请发送 查看投票 "+polnum+" 查看结果。")
elif str(event.get_user_id()) in str(thepoll["polldata"]):
await viewpoll.send("你已参与过此投票!请在投票结束后发送 查看投票 "+polnum+" 查看结果。")
# 写入项目
else:
await pollvote(polnum,optionnum,event.get_user_id())
await viewpoll.send("投票成功!你投给了 "+saying[1])

View File

@ -1,11 +1,11 @@
from typing import Optional
from typing import Optional, Union
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from konabot.plugins.roll_dice.roll_dice import generate_dice_image
from konabot.plugins.roll_dice.roll_number import get_random_number, roll_number
from konabot.plugins.roll_dice.roll_number import get_random_number, get_random_number_string, roll_number
evt = on_alconna(Alconna(
"摇数字"
@ -22,21 +22,26 @@ async def _(event: BaseEvent):
evt = on_alconna(Alconna(
"摇骰子",
Args["f1?", int]["f2?", int]
Args["f1?", str]["f2?", str]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, f1: Optional[int] = None, f2: Optional[int] = None):
async def _(event: BaseEvent, f1: Optional[str] = None, f2: Optional[str] = None):
# if isinstance(event, DiscordMessageEvent):
# await evt.send(await UniMessage().text("```\n" + roll_dice() + "\n```").export())
# elif isinstance(event, ConsoleMessageEvent):
number = 0
number = ""
if(f1 is not None and f2 is not None):
number = get_random_number(f1, f2)
number = get_random_number_string(f1, f2)
elif f1 is not None:
number = get_random_number(1, f1)
if(float(f1) > 1):
number = get_random_number_string("1", f1)
elif (float(f1) > 0):
number = get_random_number_string("0", f1)
else:
number = get_random_number_string(f1, "0")
else:
number = get_random_number()
number = get_random_number_string()
await evt.send(await UniMessage().image(raw=await generate_dice_image(number)).export())
# else:
# await evt.send(await UniMessage().text(roll_dice(wide=True)).export())

View File

@ -152,27 +152,208 @@ def precise_blend_with_perspective(background, foreground, corners):
return result
async def generate_dice_image(number: int) -> BytesIO:
def draw_line_bresenham(image, x0, y0, x1, y1, color):
"""使用Bresenham算法画线避免间隙"""
dx = abs(x1 - x0)
dy = abs(y1 - y0)
sx = 1 if x0 < x1 else -1
sy = 1 if y0 < y1 else -1
err = dx - dy
while True:
if 0 <= x0 < image.shape[1] and 0 <= y0 < image.shape[0]:
image[y0, x0] = color
if x0 == x1 and y0 == y1:
break
e2 = 2 * err
if e2 > -dy:
err -= dy
x0 += sx
if e2 < dx:
err += dx
y0 += sy
def slice_and_stretch(image, slice_lines, direction):
'''
image: 图像
slice_lines: 切割线两个点的列表一般是倾斜45度的直线
direction: 移动方向向量(二元数组)
'''
# 获取图片的尺寸
height, width = image.shape[:2]
# 创建一个由移动方向扩充后,更大的图片
new_width = int(width + abs(direction[0]))
new_height = int(height + abs(direction[1]))
new_image = np.zeros((new_height, new_width, 4), dtype=image.dtype)
# 先把图片放在新图的和方向相反的一侧
offset_x = int(abs(min(0, direction[0])))
offset_y = int(abs(min(0, direction[1])))
new_image[offset_y:offset_y+height, offset_x:offset_x+width] = image
# 切割线也跟着偏移
slice_lines = [(x + offset_x, y + offset_y) for (x, y) in slice_lines]
# 复制切割线经过的像素,沿着方向移动,实现类似拖尾的效果
apply_trail_effect_vectorized(new_image, slice_lines, direction)
apply_stroke_vectorized(new_image, slice_lines, direction)
return new_image, offset_x, offset_y
def apply_trail_effect_vectorized(new_image, slice_lines, direction):
"""向量化实现拖尾效果"""
height, width = new_image.shape[:2]
# 创建坐标网格
y_coords, x_coords = np.mgrid[0:height, 0:width]
# 向量化计算点到直线的距离
line_vec = np.array([slice_lines[1][0] - slice_lines[0][0],
slice_lines[1][1] - slice_lines[0][1]])
point_vecs = np.stack([x_coords - slice_lines[0][0],
y_coords - slice_lines[0][1]], axis=-1)
# 计算叉积(有向距离)
cross_products = (line_vec[0] * point_vecs[:, :, 1] -
line_vec[1] * point_vecs[:, :, 0])
# 选择直线右侧的像素 (d1 > 0)
mask = cross_products > 0
# 计算目标位置
target_x = (x_coords + direction[0]).astype(int)
target_y = (y_coords + direction[1]).astype(int)
# 创建有效位置掩码
valid_mask = mask & (target_x >= 0) & (target_x < width) & \
(target_y >= 0) & (target_y < height)
# 批量复制像素
new_image[target_y[valid_mask], target_x[valid_mask]] = \
new_image[y_coords[valid_mask], x_coords[valid_mask]]
def apply_stroke_vectorized(new_image, slice_lines, direction):
"""使用向量化操作优化笔画效果"""
height, width = new_image.shape[:2]
# 1. 找到所有非透明像素
non_transparent = np.where(new_image[:, :, 3] > 0)
if len(non_transparent[0]) == 0:
return
y_coords, x_coords = non_transparent
# 2. 向量化计算点到直线的距离
line_vec = np.array([slice_lines[1][0] - slice_lines[0][0],
slice_lines[1][1] - slice_lines[0][1]])
point_vecs = np.column_stack([x_coords - slice_lines[0][0],
y_coords - slice_lines[0][1]])
# 计算叉积(距离)
cross_products = (line_vec[0] * point_vecs[:, 1] -
line_vec[1] * point_vecs[:, 0])
# 3. 选择靠近直线的像素
mask = np.abs(cross_products) < 1.0
selected_y = y_coords[mask]
selected_x = x_coords[mask]
selected_pixels = new_image[selected_y, selected_x]
if len(selected_x) == 0:
return
# 4. 预计算采样点
length = np.sqrt(direction[0]**2 + direction[1]**2)
if length == 0:
return
# 创建采样偏移
dx_dy = np.array([(dx, dy) for dx in [-0.5, 0, 0.5]
for dy in [-0.5, 0, 0.5]])
# 5. 批量计算目标位置
steps = max(1, int(length * 2))
alpha = 0.7
for k in range(1, steps + 1):
# 对所有选中的像素批量计算新位置
scale = k / steps
# 为每个像素和每个采样点计算目标位置
for dx, dy in dx_dy:
target_x = np.round(selected_x + dx + direction[0] * scale).astype(int)
target_y = np.round(selected_y + dy + direction[1] * scale).astype(int)
# 创建有效位置掩码
valid_mask = (target_x >= 0) & (target_x < width) & \
(target_y >= 0) & (target_y < height)
if np.any(valid_mask):
valid_target_x = target_x[valid_mask]
valid_target_y = target_y[valid_mask]
valid_source_idx = np.where(valid_mask)[0]
# 批量混合像素
source_pixels = selected_pixels[valid_source_idx]
target_pixels = new_image[valid_target_y, valid_target_x]
new_image[valid_target_y, valid_target_x] = (
alpha * source_pixels + (1 - alpha) * target_pixels
)
async def generate_dice_image(number: str) -> BytesIO:
# 将文本转换为带透明背景的图像
text = str(number)
text = number
# 如果文本太长,直接返回金箍棒
if(len(text) > 50):
output = BytesIO()
push_image = Image.open(ASSETS_PATH / "img" / "dice" / "stick.png")
push_image.save(output,format='GIF')
output.seek(0)
return output
text_image = text_to_transparent_image(
text,
font_size=60,
text_color=(0, 0, 0) # 黑色文字
)
# 获取长宽比
height, width = text_image.shape[:2]
aspect_ratio = width / height
# 根据长宽比设置拉伸系数
stretch_k = 1
if aspect_ratio > 1:
stretch_k = aspect_ratio
# 骰子的方向
up_direction = (51 - 16, 5 - 30) # 右上角点 - 左上角点
move_distance = (up_direction[0] * (stretch_k - 1), up_direction[1] * (stretch_k - 1))
# 加载背景图像,保留透明通道
background = cv2.imread(str(ASSETS_PATH / "img" / "dice" / "template.png"), cv2.IMREAD_UNCHANGED)
assert background is not None
height, width = background.shape[:2]
background, offset_x, offset_y = slice_and_stretch(background,
[(10,10),(0,0)],
move_distance)
# 定义3D变换的四个角点透视效果
# 顺序: [左上, 右上, 右下, 左下]
corners = np.array([
[16, 30], # 左上
[51, 5], # 右上(上移,创建透视)
[88, 33], # 右下
[51 + move_distance[0], 5 + move_distance[1]], # 右上(上移,创建透视)
[88 + move_distance[0], 33 + move_distance[1]], # 右下
[49, 62] # 左下(下移)
], dtype=np.float32)
corners[:, 0] += offset_x
corners[:, 1] += offset_y
# 加载背景图像,保留透明通道
background = cv2.imread(str(ASSETS_PATH / "img" / "dice" / "template.png"), cv2.IMREAD_UNCHANGED)
# 对文本图像进行3D变换保持透明通道
transformed_text, transform_matrix = perspective_transform(text_image, background, corners)
@ -186,6 +367,26 @@ async def generate_dice_image(number: int) -> BytesIO:
images: list[Image.Image] = [Image.open(ASSETS_PATH / "img" / "dice" / f"{i}.png") for i in range(1, 12)]
images.append(pil_final)
frame_durations = [100] * (len(images) - 1) + [100000]
# 将导入的图像尺寸扩展为和 pil_final 相同的大小,随帧数进行扩展,然后不放大的情况下放在最中间
if(aspect_ratio > 1):
target_size = pil_final.size
for i in range(len(images) - 1):
k = i / (len(images) - 1)
now_distance = (move_distance[0] * k, move_distance[1] * k)
img = np.array(images[i])
img, _, _ = slice_and_stretch(img,
[(10,10),(0,0)],
now_distance)
# 只扩展边界,图像本身不放大
img_width, img_height = img.shape[1], img.shape[0]
new_img = Image.new("RGBA", target_size, (0, 0, 0, 0))
this_offset_x = (target_size[0] - img_width) // 2
this_offset_y = (target_size[1] - img_height) // 2
# new_img.paste(img, (this_offset_x, this_offset_y))
new_img.paste(Image.fromarray(img), (this_offset_x, this_offset_y))
images[i] = new_img
# 保存为BytesIO对象
output = BytesIO()
images[0].save(output,
@ -194,4 +395,6 @@ async def generate_dice_image(number: int) -> BytesIO:
duration=frame_durations,
format='GIF',
loop=1)
output.seek(0)
# pil_final.save(output, format='PNG')
return output

View File

@ -42,6 +42,24 @@ def get_random_number(min: int = 1, max: int = 6) -> int:
import random
return random.randint(min, max)
def get_random_number_string(min_value: str = "1", max_value: str = "6") -> str:
import random
# 先判断二者是不是整数
if (float(min_value).is_integer()
and float(max_value).is_integer()
and "." not in min_value
and "." not in max_value):
return str(random.randint(int(float(min_value)), int(float(max_value))))
# 根据传入小数的位数,决定保留几位小数
if "." in str(min_value) or "." in str(max_value):
decimal_places = max(len(str(min_value).split(".")[1]) if "." in str(min_value) else 0,
len(str(max_value).split(".")[1]) if "." in str(max_value) else 0)
return str(round(random.uniform(float(min_value), float(max_value)), decimal_places))
# 如果没有小数点,很可能二者都是指数表示或均为 inf直接返回随机小数
return str(random.uniform(float(min_value), float(max_value)))
def roll_number(wide: bool = False) -> str:
raw = number_arts[get_random_number()]
if wide:

View File

@ -1,9 +1,12 @@
import asyncio
import asyncio as asynkio
import datetime
import functools
from pathlib import Path
from typing import Any, Literal, cast
import signal
import nonebot
import ptimeparse
from loguru import logger
from nonebot import on_message
from nonebot.adapters import Event
@ -19,13 +22,13 @@ from nonebot.adapters.onebot.v11.event import \
from nonebot_plugin_alconna import UniMessage, UniMsg
from pydantic import BaseModel
from konabot.plugins.simple_notify.parse_time import get_target_time
evt = on_message()
(Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True)
DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "notify.json"
DATA_FILE_LOCK = asyncio.Lock()
DATA_FILE_LOCK = asynkio.Lock()
ASYNK_TASKS: set[asynkio.Task[Any]] = set()
class Notify(BaseModel):
@ -45,7 +48,7 @@ class Notify(BaseModel):
class NotifyConfigFile(BaseModel):
version: int = 1
version: int = 2
notifies: list[Notify] = []
unsent: list[Notify] = []
@ -89,13 +92,17 @@ async def notify_now(notify: Notify):
if notify.target_env is None:
await bot.send_private_msg(
user_id=int(notify.target),
message=f"代办通知:{notify.notify_msg}",
message=cast(Any, await UniMessage.text(f"代办通知:{notify.notify_msg}").export(
bot=bot,
)),
)
else:
await bot.send_group_msg(
group_id=int(notify.target_env),
message=cast(Any,
await UniMessage().at(notify.target).text(f" 代办通知:{notify.notify_msg}").export()
await UniMessage().at(
notify.target
).text(f" 代办通知:{notify.notify_msg}").export(bot=bot)
),
)
else:
@ -104,11 +111,20 @@ async def notify_now(notify: Notify):
return True
async def create_notify_task(notify: Notify, fail2remove: bool = True):
def create_notify_task(notify: Notify, fail2remove: bool = True):
async def mission():
begin_time = datetime.datetime.now()
if begin_time < notify.notify_time:
await asyncio.sleep((notify.notify_time - begin_time).total_seconds())
try:
await asynkio.sleep((notify.notify_time - begin_time).total_seconds())
except asynkio.CancelledError:
logger.debug("代办提醒被信号中止,任务退出")
return
else:
logger.warning(
f"期望在 {notify.notify_time} 在平台 {notify.platform} {notify.target_env}"
f" {notify.target} 的代办通知 {notify.notify_msg} 已经超时,将会直接通知!"
)
res = await notify_now(notify)
if fail2remove or res:
await DATA_FILE_LOCK.acquire()
@ -120,7 +136,7 @@ async def create_notify_task(notify: Notify, fail2remove: bool = True):
DATA_FILE_LOCK.release()
else:
pass
return asyncio.create_task(mission())
return asynkio.create_task(mission())
@evt.handle()
@ -137,16 +153,22 @@ async def _(msg: UniMsg, mEvt: Event):
return
notify_time, notify_text = segments
target_time = get_target_time(notify_time)
if target_time is None:
# target_time = get_target_time(notify_time)
try:
target_time = ptimeparse.parse(notify_time)
logger.info(f"{notify_time} 解析出了时间:{target_time}")
except Exception:
logger.info(f"无法从 {notify_time} 中解析出时间")
return
# if target_time is None:
# logger.info(f"无法从 {notify_time} 中解析出时间")
# return
if not notify_text:
return
await DATA_FILE_LOCK.acquire()
cfg = load_notify_config()
if isinstance(mEvt, ConsoleMessageEvent):
platform = "console"
target = mEvt.get_user_id()
@ -173,7 +195,7 @@ async def _(msg: UniMsg, mEvt: Event):
notify_time=target_time,
notify_msg=notify_text,
)
await create_notify_task(notify)
create_notify_task(notify)
cfg.notifies.append(notify)
save_notify_config(cfg)
@ -181,18 +203,58 @@ async def _(msg: UniMsg, mEvt: Event):
await evt.send(await UniMessage().at(mEvt.get_user_id()).text(
f" 了解啦!将会在 {notify.notify_time} 提醒你哦~").export())
logger.info(f"创建了一条于 {notify.notify_time} 的代办提醒")
driver = nonebot.get_driver()
NOTIFIED_FLAG = {
"task_added": False,
}
@driver.on_bot_connect
async def _():
if NOTIFIED_FLAG["task_added"]:
return
NOTIFIED_FLAG["task_added"] = True
DELTA = 2
logger.info(f"第一次探测到 Bot 连接,等待 {DELTA} 秒后开始通知")
await asynkio.sleep(DELTA)
await DATA_FILE_LOCK.acquire()
tasks = []
# tasks: set[asynkio.Task[Any]] = set()
cfg = load_notify_config()
for notify in cfg.notifies:
tasks.append(create_notify_task(notify, fail2remove=False))
if cfg.version == 1:
logger.info("将配置文件的版本升级为 2")
cfg.version = 2
else:
counter = 0
for notify in [*cfg.notifies]:
task = create_notify_task(notify, fail2remove=False)
ASYNK_TASKS.add(task)
task.add_done_callback(lambda self: ASYNK_TASKS.remove(self))
counter += 1
logger.info(f"成功创建了 {counter} 条代办事项")
save_notify_config(cfg)
DATA_FILE_LOCK.release()
await asyncio.gather(*tasks)
loop = asynkio.get_running_loop()
# 解决 asynk task 没有被 cancel 的问题
async def shutdown(sig: signal.Signals):
logger.info(f"收到 {sig.name} 指令,正在关闭所有的东西")
for task in ASYNK_TASKS:
task.cancel()
await asynkio.gather(*ASYNK_TASKS, return_exceptions=True)
logger.info("所有的代办提醒 Task 都已经退出了")
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, functools.partial(
asynkio.create_task, shutdown(sig)
))
await asynkio.gather(*ASYNK_TASKS)

View File

@ -1,358 +0,0 @@
import datetime
import re
from typing import Optional, Dict, List, Callable, Tuple
from loguru import logger
# --- 常量与正则表达式定义 (Constants and Regex Definitions) ---
# 数字模式,兼容中文和阿拉伯数字
P_NUM = r"(\d+|[零一两二三四五六七八九十]+)"
# 预编译的正则表达式
PATTERNS = {
# 相对时间, e.g., "5分钟后"
"DELTA": re.compile(
r"^"
r"((?P<days>" + P_NUM + r") ?天)?"
r"((?P<hours>" + P_NUM + r") ?个?小?时)?"
r"((?P<minutes>" + P_NUM + r") ?分钟?)?"
r"((?P<seconds>" + P_NUM + r") ?秒钟?)?"
r" ?后 ?$"
),
# 绝对时间
"YEAR": re.compile(r"(" + P_NUM + r") ?年"),
"MONTH": re.compile(r"(" + P_NUM + r") ?月"),
"DAY": re.compile(r"(" + P_NUM + r") ?[日号]"),
"HOUR": re.compile(r"(" + P_NUM + r") ?[点时](半)?钟?"),
"MINUTE": re.compile(r"(" + P_NUM + r") ?分(钟)?"),
"SECOND": re.compile(r"(" + P_NUM + r") ?秒(钟)?"),
"HMS_COLON": re.compile(r"(\d{1,2})[:](\d{1,2})([:](\d{1,2}))?"),
"PM": re.compile(r"(下午|PM|晚上)"),
# 相对日期
"TOMORROW": re.compile(r"明天"),
"DAY_AFTER_TOMORROW": re.compile(r"后天"),
"TODAY": re.compile(r"今天"),
}
# 中文数字到阿拉伯数字的映射
CHINESE_TO_ARABIC_MAP: Dict[str, int] = {
'': 0, '': 1, '': 2, '': 3, '': 4,
'': 5, '': 6, '': 7, '': 8, '': 9, '': 10
}
# --- 核心工具函数 (Core Utility Functions) ---
def parse_number(s: str) -> int:
"""
将包含中文或阿拉伯数字的字符串解析为整数。
例如: "" -> 5, "十五" -> 15, "二十三" -> 23, "12" -> 12。
返回 -1 表示解析失败。
"""
if not s:
return -1
s = s.strip().replace("", "")
if s.isdigit():
return int(s)
if s in CHINESE_TO_ARABIC_MAP:
return CHINESE_TO_ARABIC_MAP[s]
# 处理 "十" 在不同位置的情况
if s.startswith(''):
if len(s) == 1:
return 10
num = CHINESE_TO_ARABIC_MAP.get(s[1])
return 10 + num if num is not None else -1
if s.endswith(''):
if len(s) == 2:
num = CHINESE_TO_ARABIC_MAP.get(s[0])
return 10 * num if num is not None else -1
if '' in s:
parts = s.split('')
if len(parts) == 2:
left = CHINESE_TO_ARABIC_MAP.get(parts[0])
right = CHINESE_TO_ARABIC_MAP.get(parts[1])
if left is not None and right is not None:
return left * 10 + right
return -1
# --- 时间解析器类 (Time Parser Class) ---
class TimeParser:
"""
一个用于解析自然语言时间描述的类。
"""
def __init__(self, content: str):
self.original_content: str = content
self.content_to_parse: str = self._preprocess(content)
self.now: datetime.datetime = datetime.datetime.now()
# 将 t 作为结果构建器,初始化为今天的午夜
self.t: datetime.datetime = self.now.replace(hour=0, minute=0, second=0, microsecond=0)
self.is_pm_specified: bool = False
self.is_date_specified: bool = False
self.is_time_specified: bool = False
def _preprocess(self, content: str) -> str:
"""预处理字符串,移除不相关字符。"""
content = re.sub(r"\s+", "", content)
content = re.sub(r"[,\.。::、]", "", content)
return content
def _consume_match(self, match: re.Match) -> str:
"""从待解析字符串中移除已匹配的部分。"""
self.content_to_parse = self.content_to_parse.replace(match.group(0), "", 1)
return match.group(0)
def parse(self) -> Optional[datetime.datetime]:
"""
主解析方法。
首先尝试解析相对时间如“5分钟后”失败则尝试解析绝对时间。
"""
logger.debug(f"🎉 开始解析: '{self.original_content}' -> 清洗后: '{self.content_to_parse}'")
if not self.content_to_parse:
logger.debug("❌ 内容为空,无法解析。")
return None
# 1. 尝试相对时间解析
if (target_time := self._parse_relative_time()) is not None:
return target_time
# 2. 尝试绝对时间解析
if (target_time := self._parse_absolute_time()) is not None:
return target_time
logger.debug(f"❌ 所有解析模式均未匹配成功。")
return None
def _parse_relative_time(self) -> Optional[datetime.datetime]:
"""解析 'X天X小时X分钟后' 这种格式。"""
if match := PATTERNS["DELTA"].match(self.content_to_parse):
logger.debug("⏳ 匹配到相对时间模式 (DELTA)。")
try:
delta_parts = {
"days": parse_number(match.group("days") or "0"),
"hours": parse_number(match.group("hours") or "0"),
"minutes": parse_number(match.group("minutes") or "0"),
"seconds": parse_number(match.group("seconds") or "0"),
}
# 检查是否有无效的数字解析
if any(v < 0 for v in delta_parts.values()):
logger.debug(f"❌ 解析时间片段为数字时失败: {delta_parts}")
return None
delta = datetime.timedelta(**delta_parts)
if delta.total_seconds() == 0:
logger.debug("❌ 解析出的时间增量为0。")
return None
target_time = self.now + delta
logger.debug(f"✅ 相对时间解析成功 -> {target_time}")
return target_time
except (ValueError, TypeError) as e:
logger.debug(f"❌ 解析相对时间时出错: {e}", exc_info=True)
return None
return None
def _parse_absolute_time(self) -> Optional[datetime.datetime]:
"""解析一个指定的日期和时间。"""
logger.debug(f"🎯 启动绝对时间解析,基准时间: {self.t}")
# 定义解析步骤和顺序
# (pattern_key, handler_method)
parsing_steps: List[Tuple[str, Callable[[re.Match], bool]]] = [
("TOMORROW", self._handle_tomorrow),
("DAY_AFTER_TOMORROW", self._handle_day_after_tomorrow),
("TODAY", self._handle_today),
("YEAR", self._handle_year),
("MONTH", self._handle_month),
("DAY", self._handle_day),
("HMS_COLON", self._handle_hms_colon),
("PM", self._handle_pm),
("HOUR", self._handle_hour),
("MINUTE", self._handle_minute),
("SECOND", self._handle_second),
]
for key, handler in parsing_steps:
if match := PATTERNS[key].search(self.content_to_parse):
if not handler(match):
# 如果任何一个处理器返回False说明解析失败
return None
# 移除无意义的上午关键词
self.content_to_parse = self.content_to_parse.replace("上午", "").replace("AM", "").replace("凌晨", "")
# 如果解析后还有剩余字符,说明有无法识别的部分
if self.content_to_parse.strip():
logger.debug(f"❌ 匹配失败,存在未解析的残留内容: '{self.content_to_parse.strip()}'")
return None
# 最终调整和检查
return self._finalize_datetime()
# --- Handler Methods for Absolute Time Parsing ---
def _handle_tomorrow(self, match: re.Match) -> bool:
self.t += datetime.timedelta(days=1)
self.is_date_specified = True
logger.debug(f"📅 匹配到 '明天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_day_after_tomorrow(self, match: re.Match) -> bool:
self.t += datetime.timedelta(days=2)
self.is_date_specified = True
logger.debug(f"📅 匹配到 '后天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_today(self, match: re.Match) -> bool:
self.is_date_specified = True
logger.debug(f"📅 匹配到 '今天', 日期基准不变, 消耗: '{self._consume_match(match)}'")
return True
def _handle_year(self, match: re.Match) -> bool:
year = parse_number(match.group(1))
if year < 0: return False
if year < 100: year += 2000 # 处理 "25年" -> 2025
if year < self.now.year:
logger.debug(f"❌ 指定的年份 {year} 已过去。")
return False
self.t = self.t.replace(year=year)
self.is_date_specified = True
logger.debug(f"Y| 年份更新 -> {self.t.year}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_month(self, match: re.Match) -> bool:
month = parse_number(match.group(1))
if not (1 <= month <= 12):
logger.debug(f"❌ 无效的月份: {month}")
return False
# 如果设置的月份在当前月份之前,且没有指定年份,则年份加一
if month < self.t.month and not self.is_date_specified:
self.t = self.t.replace(year=self.t.year + 1)
logger.debug(f"💡 月份小于当前月份,年份自动进位 -> {self.t.year}")
self.t = self.t.replace(month=month)
self.is_date_specified = True
logger.debug(f"M| 月份更新 -> {self.t.month}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_day(self, match: re.Match) -> bool:
day = parse_number(match.group(1))
if not (1 <= day <= 31):
logger.debug(f"❌ 无效的日期: {day}")
return False
try:
# 如果日期小于当前日期,且只指定了日,则月份加一
if day < self.t.day and not self.is_date_specified:
if self.t.month == 12:
self.t = self.t.replace(year=self.t.year + 1, month=1)
else:
self.t = self.t.replace(month=self.t.month + 1)
logger.debug(f"💡 日期小于当前日期,月份自动进位 -> {self.t.year}-{self.t.month}")
self.t = self.t.replace(day=day)
self.is_date_specified = True
logger.debug(f"D| 日期更新 -> {self.t.day}, 消耗: '{self._consume_match(match)}'")
return True
except ValueError:
logger.debug(f"❌ 日期 {day} 对于月份 {self.t.month} 无效 (例如2月30号)。")
return False
def _handle_hms_colon(self, match: re.Match) -> bool:
h = int(match.group(1))
m = int(match.group(2))
s_str = match.group(4) # group(3) is with colon, group(4) is the number
s = int(s_str) if s_str else 0
if not (0 <= h <= 23 and 0 <= m <= 59 and 0 <= s <= 59):
logger.debug(f"❌ 无效的时间格式: H={h}, M={m}, S={s}")
return False
self.t = self.t.replace(hour=h, minute=m, second=s)
self.is_time_specified = True
logger.debug(f"T| 时分秒(冒号格式)更新 -> {self.t.time()}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_pm(self, match: re.Match) -> bool:
self.is_pm_specified = True
logger.debug(f"PM| 匹配到下午/晚上, 消耗: '{self._consume_match(match)}'")
return True
def _handle_hour(self, match: re.Match) -> bool:
hour = parse_number(match.group(1))
has_half = match.group(2) == ''
if not (0 <= hour <= 23):
logger.debug(f"❌ 无效的小时: {hour}")
return False
minute = 30 if has_half else self.t.minute
self.t = self.t.replace(hour=hour, minute=minute)
self.is_time_specified = True
logger.debug(f"H| 小时更新 -> {self.t.hour}{':30' if has_half else ''}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_minute(self, match: re.Match) -> bool:
minute = parse_number(match.group(1))
if not (0 <= minute <= 59):
logger.debug(f"❌ 无效的分钟: {minute}")
return False
self.t = self.t.replace(minute=minute)
self.is_time_specified = True
logger.debug(f"M| 分钟更新 -> {self.t.minute}, 消耗: '{self._consume_match(match)}'")
return True
def _handle_second(self, match: re.Match) -> bool:
second = parse_number(match.group(1))
if not (0 <= second <= 59):
logger.debug(f"❌ 无效的秒: {second}")
return False
self.t = self.t.replace(second=second)
self.is_time_specified = True
logger.debug(f"S| 秒更新 -> {self.t.second}, 消耗: '{self._consume_match(match)}'")
return True
def _finalize_datetime(self) -> Optional[datetime.datetime]:
"""对解析出的时间进行最后的调整和检查。"""
# 处理下午/晚上
if self.is_pm_specified and self.t.hour < 12:
self.t = self.t.replace(hour=self.t.hour + 12)
logger.debug(f"💡 根据 PM 标识,小时调整为 -> {self.t.hour}")
# 如果没有指定任何时间或日期部分,则认为解析无效
if not self.is_date_specified and not self.is_time_specified:
logger.debug("❌ 未能从输入中解析出任何有效的日期或时间部分。")
return None
# 如果最终计算出的时间点在当前时间之前,自动往后推
# 例如:现在是 15:00说 "14点"应该是指明天的14点
if self.t < self.now:
# 只有在明确指定了时间的情况下,才自动加一天
# 如果只指定了一个过去的日期如“去年5月1号”则不应该调整
if self.is_time_specified:
self.t += datetime.timedelta(days=1)
logger.debug(f"🔁 目标时间已过,自动调整为明天 -> {self.t}")
logger.debug(f"✅ 解析成功,最终时间: {self.t}")
return self.t
# --- 公共接口 (Public Interface) ---
def get_target_time(content: str) -> Optional[datetime.datetime]:
"""
高级接口,用于将自然语言时间描述转换为 datetime 对象。
Args:
content: 包含时间信息的字符串。
Returns:
一个 datetime 对象,如果解析失败则返回 None。
"""
parser = TimeParser(content)
return parser.parse()

View File

@ -0,0 +1,43 @@
from nonebot_plugin_alconna import Alconna, Args, Option, UniMessage, on_alconna
from konabot.common.nb.exc import BotExceptionMessage
from konabot.plugins.sksl.run_sksl import render_sksl_shader_to_gif
cmd_run_sksl = on_alconna(Alconna(
"shadertool",
Option("--width", Args["width_", int]),
Option("--height", Args["height_", int]),
Option("--duration", Args["duration_", float]),
Option("--fps", Args["fps_", float]),
Args["code", str],
))
@cmd_run_sksl.handle()
async def _(
code: str,
width_: int = 320,
height_: int = 180,
duration_: float = 1.0,
fps_: float = 15.0,
):
if width_ <= 0 or height_ <= 0:
raise BotExceptionMessage("长宽应该大于 0")
if duration_ <= 0:
raise BotExceptionMessage("渲染时长应该大于 0")
if fps_ <= 0:
raise BotExceptionMessage("渲染帧率应该大于 0")
if fps_ * duration_ < 1:
raise BotExceptionMessage("时长太短或帧率太小,没有帧被渲染")
if fps_ * duration_ > 100:
raise BotExceptionMessage("太多帧啦!试着缩短一点时间吧!")
if width_ > 640 or height_ > 640:
raise BotExceptionMessage("最大支持 640x640 啦!不要太大啦!")
code = code.strip("\"").strip("'")
try:
res = await render_sksl_shader_to_gif(code, width_, height_, duration_, fps_)
await cmd_run_sksl.send(await UniMessage().image(raw=res).export())
except (ValueError, RuntimeError) as e:
await cmd_run_sksl.send(await UniMessage().text(f"渲染时遇到了问题:\n\n{e}").export())

View File

@ -0,0 +1,155 @@
import asyncio
import io
import struct
from loguru import logger
import numpy as np
import skia
from PIL import Image
def _pack_uniforms(uniforms_dict, width, height, time_val):
"""
根据常见的教学用 uniform 布局打包字节数据
假设 SkSL 中 uniform 顺序为: float u_time; float2 u_resolution;
内存布局: [u_time(4B)][u_res_x(4B)][u_res_y(4B)] (总共 12 字节,无填充)
注意:为匹配 skia.RuntimeEffect 的紧凑布局,已移除 float 和 float2 之间的 4 字节填充。
"""
# u_time (float) - 4 bytes
time_bytes = struct.pack('f', time_val)
# u_resolution (vec2/float2) - 8 bytes
res_bytes = struct.pack('ff', float(width), float(height))
# 移除填充字节,使用紧凑布局
return time_bytes + res_bytes
def _render_sksl_shader_to_gif(
sksl_code: str,
width: int = 256,
height: int = 256,
duration: float = 2.0,
fps: float = 15,
) -> io.BytesIO:
"""
渲染 SkSL 着色器动画为 GIF适配 skia-python >= 138
"""
logger.info(f"开始编译\n{sksl_code}")
runtime_effect = skia.RuntimeEffect.MakeForShader(sksl_code)
if runtime_effect is None:
# SkSL 编译失败时,尝试获取错误信息(如果 skia 版本支持)
error_message = ""
# 注意skia-python 的错误信息捕获可能因版本而异
# 尝试检查编译错误是否在日志中,但最直接的是抛出已知错误
raise ValueError("SkSL 编译失败,请检查语法")
# --- 修复: 移除对不存在的 uniformSize() 的调用,直接使用硬编码的 12 字节尺寸 ---
# float (4 bytes) + float2 (8 bytes) = 12 bytes (基于 _pack_uniforms 函数的紧凑布局)
EXPECTED_UNIFORM_SIZE = 12
# 创建 CPU 后端 Surface
surface = skia.Surface(width, height)
frames = []
total_frames = int(duration * fps)
for frame in range(total_frames):
time_val = frame / fps
# 打包 uniform 数据
uniform_bytes = _pack_uniforms(None, width, height, time_val)
# [检查] 确保打包后的字节数与期望值匹配
if len(uniform_bytes) != EXPECTED_UNIFORM_SIZE:
raise ValueError(
f"Uniform 数据大小不匹配!期望 {EXPECTED_UNIFORM_SIZE} 字节,实际 {len(uniform_bytes)} 字节。请检查 _pack_uniforms 函数。"
)
uniform_data = skia.Data.MakeWithCopy(uniform_bytes)
# 创建着色器
try:
# makeShader 的参数: uniform_data, children_shaders, child_count
shader = runtime_effect.makeShader(uniform_data, None, 0)
if shader is None:
# 如果 SkSL 语法正确但 uniform 匹配失败makeShader 会返回 None
raise RuntimeError("着色器创建返回 None请检查 SkSL 语法和 uniform 数据匹配。")
except Exception as e:
raise ValueError(f"着色器创建失败: {e}")
# 绘制
canvas = surface.getCanvas()
canvas.clear(skia.Color(0, 0, 0, 255))
paint = skia.Paint()
paint.setShader(shader)
canvas.drawRect(skia.Rect.MakeWH(width, height), paint)
# --- 修复 peekPixels() 错误:改用 readPixels() 将数据复制到缓冲区 ---
image = surface.makeImageSnapshot()
# 1. 准备目标 ImageInfo (通常是 kBGRA_8888_ColorType)
target_info = skia.ImageInfo.Make(
image.width(),
image.height(),
skia.ColorType.kBGRA_8888_ColorType, # 目标格式,匹配 Skia 常见的输出格式
skia.AlphaType.kPremul_AlphaType
)
# 2. 创建一个用于接收像素数据的 bytearray 缓冲区
pixel_data = bytearray(image.width() * image.height() * 4) # 4 bytes per pixel (BGRA)
# 3. 将图像数据复制到缓冲区
success = image.readPixels(target_info, pixel_data, target_info.minRowBytes())
if not success:
raise RuntimeError("无法通过 readPixels() 获取图像像素")
# 4. 转换 bytearray 到 NumPy 数组 (BGRA 顺序)
img_array = np.frombuffer(pixel_data, dtype=np.uint8).reshape((height, width, 4))
# 5. BGRA 转换成 RGB 顺序 (交换 R 和 B 通道)
# [B, G, R, A] -> [R, G, B] (丢弃 A 通道)
rgb_array = img_array[:, :, [2, 1, 0]]
# 6. 创建 PIL Image
pil_img = Image.fromarray(rgb_array)
frames.append(pil_img)
# ------------------------------------------------------------------
# 生成 GIF
gif_buffer = io.BytesIO()
# 计算每帧的毫秒延迟
frame_duration_ms = int(1000 / fps)
frames[0].save(
gif_buffer,
format='GIF',
save_all=True,
append_images=frames[1:],
duration=frame_duration_ms,
loop=0, # 0 表示无限循环
optimize=True
)
gif_buffer.seek(0)
return gif_buffer
async def render_sksl_shader_to_gif(
sksl_code: str,
width: int = 256,
height: int = 256,
duration: float = 2.0,
fps: float = 15,
) -> io.BytesIO:
return await asyncio.to_thread(
_render_sksl_shader_to_gif,
sksl_code,
width,
height,
duration,
fps,
)

View File

@ -0,0 +1,220 @@
from io import BytesIO
from loguru import logger
from nonebot.adapters import Bot as BaseBot
from nonebot.adapters import Event as BaseEvent
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Args, Field, UniMessage, on_alconna
from PIL import Image
from returns.result import Failure, Success
from konabot.common.nb.extract_image import extract_image_from_message
__plugin_meta__ = PluginMetadata(
name="ytpgif",
description="生成来回镜像翻转的仿 YTPMV 动图。",
usage="ytpgif [倍速=1.0] 倍速范围0.120.0",
type="application",
config=None,
homepage=None,
)
# 参数定义
BASE_SEGMENT_DURATION = 0.25
BASE_INTERVAL = 0.25
MAX_SIZE = 256
MIN_SPEED = 0.1
MAX_SPEED = 20.0
MAX_FRAMES_PER_SEGMENT = 500
# 提示语
SPEED_TIPS = f"倍速必须是 {MIN_SPEED}{MAX_SPEED} 之间的数字"
# 定义命令 + 参数校验
ytpgif_cmd = on_alconna(
Alconna(
"ytpgif",
Args[
"speed?",
float,
Field(
default=1.0,
unmatch_tips=lambda x: f"{x}”不是有效数值。{SPEED_TIPS}",
),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
)
def resize_frame(frame: Image.Image) -> Image.Image:
"""缩放图像,保持宽高比,不超过 MAX_SIZE"""
w, h = frame.size
if w <= MAX_SIZE and h <= MAX_SIZE:
return frame
scale = MAX_SIZE / max(w, h)
new_w = int(w * scale)
new_h = int(h * scale)
return frame.resize((new_w, new_h), Image.Resampling.LANCZOS)
@ytpgif_cmd.handle()
async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0):
# === 校验 speed 范围 ===
if not (MIN_SPEED <= speed <= MAX_SPEED):
await ytpgif_cmd.send(
await UniMessage.text(f"{SPEED_TIPS}").export()
)
return
match await extract_image_from_message(event.get_message(), event, bot):
case Success(img):
src_img = img
case Failure(msg):
await ytpgif_cmd.send(
await UniMessage.text(msg).export()
)
return
case _:
return
try:
try:
n_frames = getattr(src_img, "n_frames", 1)
is_animated = n_frames > 1
logger.debug(f"收到的动图的运动状态:{is_animated} 帧数量:{n_frames}")
except Exception:
is_animated = False
n_frames = 1
output_frames = []
output_durations_ms = []
if is_animated:
# === 动图模式:截取正向 + 镜像两段 ===
frames_with_duration: list[tuple[Image.Image, float]] = []
palette = src_img.getpalette()
for idx in range(n_frames):
src_img.seek(idx)
frame = src_img.copy()
# 检查是否需要透明通道
has_alpha = (
frame.mode in ("RGBA", "LA")
or (frame.mode == "P" and "transparency" in frame.info)
)
if has_alpha:
frame = frame.convert("RGBA")
else:
frame = frame.convert("RGB")
resized_frame = resize_frame(frame)
# 若原图有调色板,尝试保留(可选)
if palette and resized_frame.mode == "P":
try:
resized_frame.putpalette(palette)
except Exception: # noqa
logger.debug("色板应用失败")
pass
ms = frame.info.get("duration", int(BASE_SEGMENT_DURATION * 1000))
dur_sec = max(0.01, ms / 1000.0)
frames_with_duration.append((resized_frame, dur_sec))
max_dur = BASE_SEGMENT_DURATION * speed
accumulated = 0.0
frame_count = 0
# 正向段
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
output_frames.append(img)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
if frame_count == 0:
await ytpgif_cmd.send(
await UniMessage.text("动图帧太短,无法生成有效片段。").export()
)
return
# 镜像段(从头开始)
accumulated = 0.0
frame_count = 0
for img, dur in frames_with_duration:
if accumulated + dur > max_dur or frame_count >= MAX_FRAMES_PER_SEGMENT:
break
flipped = img.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
output_frames.append(flipped)
output_durations_ms.append(int(dur * 1000))
accumulated += dur
frame_count += 1
else:
# === 静态图模式:制作翻转动画 ===
raw_frame = src_img.convert("RGBA")
resized_frame = resize_frame(raw_frame)
interval_sec = max(0.025, min(2.5, BASE_INTERVAL / speed))
duration_ms = int(interval_sec * 1000)
frame1 = resized_frame
frame2 = resized_frame.transpose(Image.Transpose.FLIP_LEFT_RIGHT)
output_frames = [frame1, frame2]
output_durations_ms = [duration_ms, duration_ms]
if len(output_frames) < 1:
await ytpgif_cmd.send(
await UniMessage.text("未能生成任何帧。").export()
)
return
# === 🔐 关键修复:防止无透明图的颜色被当成透明 ===
need_transparency = False
for frame in output_frames:
if frame.mode == "RGBA":
alpha_channel = frame.getchannel("A")
if any(pix < 255 for pix in alpha_channel.getdata()):
need_transparency = True
break
elif frame.mode == "P" and "transparency" in frame.info:
need_transparency = True
break
# 如果不需要透明,则统一转为 RGB 避免调色板污染
if not need_transparency:
output_frames = [f.convert("RGB") for f in output_frames]
# 构建保存参数
save_kwargs = {
"save_all": True,
"append_images": output_frames[1:],
"format": "GIF",
"loop": 0, # 无限循环
"duration": output_durations_ms,
"disposal": 2, # 清除到背景色,避免残留
"optimize": False, # 关闭抖动(等效 -dither none
}
# 只有真正需要透明时才启用 transparency
if need_transparency:
save_kwargs["transparency"] = 0
bio = BytesIO()
output_frames[0].save(bio, **save_kwargs)
result_image = UniMessage.image(raw=bio)
await ytpgif_cmd.send(await result_image.export())
except Exception as e:
print(f"[YTPGIF] 处理失败: {e}")
await ytpgif_cmd.send(
await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export()
)

661
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -21,9 +21,26 @@ dependencies = [
"pillow (>=11.3.0,<12.0.0)",
"imagetext-py (>=2.2.0,<3.0.0)",
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
"returns (>=0.26.0,<0.27.0)",
"ptimeparse (>=0.1.1,<0.2.0)",
"skia-python (>=138.0,<139.0)",
"nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)",
"qrcode (>=8.2,<9.0)",
]
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"
[[tool.poetry.source]]
name = "pt-gitea-pypi"
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple/"
priority = "supplemental"
[[tool.poetry.source]]
name = "mirrors"
url = "https://pypi.tuna.tsinghua.edu.cn/simple/"
priority = "primary"
[tool.poetry.dependencies]
ptimeparse = {source = "pt-gitea-pypi"}

File diff suppressed because it is too large Load Diff

View File

@ -8,9 +8,12 @@ nonebot.load_plugins("konabot/plugins")
plugins = nonebot.get_loaded_plugins()
len_requires = len(
[f for f in (
Path(__file__).parent.parent / "konabot" / "plugins"
).iterdir() if f.is_dir() and (f / "__init__.py").exists()]
[
f
for f in (Path(__file__).parent.parent / "konabot" / "plugins").iterdir()
if (f.is_dir() and (f / "__init__.py").exists())
or ((not f.is_dir()) and f.suffix == ".py")
]
)
plugins = [p for p in plugins if p.module.__name__.startswith("konabot.plugins")]