Compare commits

...

65 Commits

Author SHA1 Message Date
e0c55545ec 添加此方提醒的 CURD 和 ntfy 联动
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 05:08:54 +08:00
164305e81f 调整 man 2025-10-24 02:27:56 +08:00
96679033f3 不再有 fortune
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 02:21:56 +08:00
afda0680ec 调整衰减函数
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:59:41 +08:00
021133954e 调整 man 默认范围
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:33:59 +08:00
7baa04dbc2 添加罗文提示 2025-10-24 01:33:01 +08:00
e55bdbdf4a 怪话不可为空!!!
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:27:40 +08:00
a30c7b8093 添加怪话过滤功能
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:21:54 +08:00
3da2c2266f 说怪话 bot
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-24 01:08:46 +08:00
96e3c3fe17 让 Onebot private channel 也有 ID 2025-10-24 00:46:05 +08:00
851c9eb3c7 修复程序退出耗时太久的问题 2025-10-24 00:01:13 +08:00
11269b2a5a 在罗文被念错时提醒他 2025-10-23 23:32:56 +08:00
875e0efc2f Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-23 22:12:41 +08:00
4f43312663 升级 ptimeparse 2025-10-23 22:12:34 +08:00
b2f4768573 优化判定与计分规则
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-21 23:52:05 +08:00
bc6263ec31 Merge pull request '添加安安展示' (#27) from tnot/konabot:添加安安展示 into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #27
2025-10-21 22:07:19 +08:00
bc9d025836 修好了bug的安安展示 2025-10-21 22:02:41 +08:00
b552aacf89 添加安安展示 2025-10-21 21:33:32 +08:00
f9a0249772 优化 giftool 的截取逻辑
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-21 18:31:14 +08:00
c94db33b11 更新 ptimeparse 到 0.2.0
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 22:45:34 +08:00
67382a0c0a 在我写的模块采用更安全的 asyncio 锁写法
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 20:27:18 +08:00
fd4c9302c2 async with lock
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 20:24:47 +08:00
f30ad0cb7d 判定部分优化
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 18:48:10 +08:00
f7afe48680 精度修复
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 18:36:27 +08:00
b42385f780 修复成语接龙
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 18:24:03 +08:00
6cae38dea9 提升 LongTask 的健壮性
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 16:54:59 +08:00
8594b59783 修复 LongTask 在 Discord 和控制台无法正确返回是否顺利完成任务的问题 2025-10-19 16:51:22 +08:00
f768c91430 完善 LongTask 模块
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 16:47:50 +08:00
a65cb118cc 接入我写的模块来获得群上下文
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 04:51:49 +08:00
75c6bbd23f Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot 2025-10-19 04:45:26 +08:00
aaf0a75d65 添加若干有用的小模块 2025-10-19 04:45:15 +08:00
8f560ce1ba 新成语接龙
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 01:25:34 +08:00
9f3f79f51d 自动同意小团体的好友请求
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-17 01:04:34 +08:00
92048aeff7 让 wzq 东西在 wzq 群不可用 2025-10-17 00:54:14 +08:00
81aac10665 添加文档并修复问题
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-16 23:27:42 +08:00
3ce230adfe 优化卵总展示光影
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-16 22:43:54 +08:00
4f885554ca 添加卵总展示
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-16 22:29:07 +08:00
7ebcb8add4 Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-16 19:13:51 +08:00
e18cc82792 修复 av/bv 号无法直接被筛选读取的问题 2025-10-16 19:13:36 +08:00
eb28cd0a0c 更正 Giftool 错误的文档 2025-10-16 18:44:22 +08:00
2d688a6ed6 new
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-14 12:43:25 +08:00
e9aac52200 chengyu update 2025-10-14 01:23:49 +00:00
4305548ab5 submodule
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 22:53:44 +08:00
99382a3bf5 Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-13 22:48:17 +08:00
92e43785bf submodule 2025-10-13 22:46:30 +08:00
fc5b11c5e8 调整 notify 的强制退出
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 22:16:50 +08:00
0ec66988fa 更新投票存储位置
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-13 22:05:21 +08:00
e5c3081c22 Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-13 22:02:44 +08:00
14b356120a 成语接龙 2025-10-13 22:02:33 +08:00
a208302cb9 添加依赖
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 21:35:44 +08:00
01ffa451bb Merge pull request '投票功能和二维码生成(从 testpilot 移植)' (#26) from wzq02/konabot:master into master
Some checks failed
continuous-integration/drone/push Build is failing
Reviewed-on: #26
2025-10-13 21:33:03 +08:00
2b6c2e84bd Merge branch 'master' into master 2025-10-13 21:31:40 +08:00
4f0a9af2dc 成语接龙
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-13 21:10:18 +08:00
4a4aa6b243 Add submodule: THUOCL 2025-10-13 21:10:05 +08:00
4c8625ae02 小完善(添加对应的 man) 2025-10-13 21:08:32 +08:00
c5f820a1f9 投票功能和二维码生成(从 testpilot 移植) 2025-10-13 20:49:56 +08:00
a3dd2dbbda 添加更加宽松的匹配规则
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 18:28:32 +08:00
8d4f74dafe 添加 Bilibili 视频解析的插件
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 18:12:39 +08:00
7c1bac64c9 修复在 log 文件中没有空格的问题
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-13 17:03:39 +08:00
e09fa13d0f 修复 Notify 的通知信息
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 16:55:50 +08:00
990a622cf6 添加一些日志用于调试 Notify 功能
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 11:48:22 +08:00
6144563d4d 添加 giftool 倒放选项 2025-10-13 11:34:06 +08:00
a6413c9809 添加报错和日志
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 21:52:35 +08:00
af566888ab fix2
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 15:24:49 +08:00
e72bc283f8 调整 giftool
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-12 15:18:07 +08:00
45 changed files with 703076 additions and 287 deletions

View File

@ -10,6 +10,10 @@ trigger:
- master
steps:
- name: submodules
image: alpine/git
commands:
- git submodule update --init --recursive
- name: 构建 Docker 镜像
image: plugins/docker:latest
privileged: true
@ -50,6 +54,10 @@ trigger:
- tag
steps:
- name: submodules
image: alpine/git
commands:
- git submodule update --init --recursive
- name: 构建并推送 Release Docker 镜像
image: plugins/docker:latest
privileged: true

3
.gitmodules vendored Normal file
View File

@ -0,0 +1,3 @@
[submodule "assets/lexicon/THUOCL"]
path = assets/lexicon/THUOCL
url = https://github.com/thunlp/THUOCL.git

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 841 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 821 KiB

BIN
assets/img/meme/snaur_1_base.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

BIN
assets/img/meme/snaur_1_top.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1008 KiB

1
assets/json/poll.json Normal file
View File

@ -0,0 +1 @@
{"poll": {"0": {"create": 1760357553, "expiry": 1760443953, "options": {"0": "此方bot", "1": "testpilot", "2": "小镜bot", "3": "可怜bot"}, "polldata": {}, "qq": "2975499623", "title": "我~是~谁~"}}}

1
assets/lexicon/THUOCL Submodule

Submodule assets/lexicon/THUOCL added at a30ce79d89

1
assets/lexicon/ci.json Normal file

File diff suppressed because one or more lines are too long

360393
assets/lexicon/common.txt Normal file

File diff suppressed because it is too large Load Diff

339847
assets/lexicon/idiom.json Normal file

File diff suppressed because it is too large Load Diff

14
bot.py
View File

@ -7,6 +7,10 @@ from nonebot.adapters.discord import Adapter as DiscordAdapter
from nonebot.adapters.minecraft import Adapter as MinecraftAdapter
from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter
from konabot.common.log import init_logger
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.path import LOG_PATH
dotenv.load_dotenv()
env = os.environ.get("ENVIRONMENT", "prod")
env_enable_console = os.environ.get("ENABLE_CONSOLE", "none")
@ -14,7 +18,16 @@ env_enable_qq = os.environ.get("ENABLE_QQ", "none")
env_enable_discord = os.environ.get("ENABLE_DISCORD", "none")
env_enable_minecraft = os.environ.get("ENABLE_MINECRAFT", "none")
def main():
if env.upper() == 'DEBUG' or env.upper() == 'DEV':
console_log_level = 'DEBUG'
else:
console_log_level = 'INFO'
init_logger(LOG_PATH, [
BotExceptionMessage,
], console_log_level=console_log_level)
nonebot.init()
driver = nonebot.get_driver()
@ -33,6 +46,7 @@ def main():
# nonebot.load_builtin_plugin("echo")
nonebot.load_plugins("konabot/plugins")
nonebot.load_plugin("nonebot_plugin_analysis_bilibili")
nonebot.run()

View File

@ -0,0 +1,36 @@
import asyncio
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Generic, TypeVar
from pydantic import BaseModel, ValidationError
T = TypeVar("T", bound=BaseModel)
class DataManager(Generic[T]):
def __init__(self, cls: type[T], fp: Path) -> None:
self.cls = cls
self.fp = fp
self._aio_lock = asyncio.Lock()
self._data: T | None = None
def load(self) -> T:
if not self.fp.exists():
return self.cls()
try:
return self.cls.model_validate_json(self.fp.read_text())
except ValidationError:
return self.cls()
def save(self, data: T):
self.fp.write_text(data.model_dump_json())
@asynccontextmanager
async def get_data(self):
await self._aio_lock.acquire()
self._data = self.load()
yield self._data
self.save(self._data)
self._data = None
self._aio_lock.release()

80
konabot/common/log.py Normal file
View File

@ -0,0 +1,80 @@
import sys
from pathlib import Path
from typing import TYPE_CHECKING, List, Type
from loguru import logger
if TYPE_CHECKING:
from loguru import Record
def file_exception_filter(
record: "Record",
ignored_exceptions: tuple[Type[Exception], ...]
) -> bool:
"""
一个自定义的 Loguru 过滤器函数。
如果日志记录包含异常信息,并且该异常的类型在 ignored_exceptions 中,则返回 False忽略
否则,返回 True允许记录
"""
exception_info = record.get("exception")
if exception_info:
exception_type = exception_info[0]
if exception_type and issubclass(exception_type, ignored_exceptions):
return False
return True
def init_logger(
log_dir: Path,
ignored_exceptions: List[Type[Exception]],
console_log_level: str = "INFO",
) -> None:
"""
配置全局 Loguru Logger。
Args:
log_dir (Path): 存放日志文件的文件夹路径,会自动创建。
ignored_exceptions (List[Type[Exception]]): 在 WARNING 级别文件日志中需要忽略的异常类型列表。
"""
ignored_exceptions_tuple = tuple(ignored_exceptions)
logger.remove()
log_dir.mkdir(parents=True, exist_ok=True)
logger.add(
sys.stderr,
level=console_log_level,
colorize=True,
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
)
info_log_path = log_dir / "log.log"
logger.add(
str(info_log_path),
level="INFO",
rotation="10 MB",
retention="7 days",
enqueue=True,
backtrace=False,
diagnose=False,
)
warning_error_log_path = log_dir / "error.log"
logger.add(
str(warning_error_log_path),
level="WARNING",
rotation="10 MB",
compression="zip",
enqueue=True,
filter=lambda record: file_exception_filter(record, ignored_exceptions_tuple),
backtrace=True,
diagnose=True,
)
logger.info("Loguru Logger 初始化完成!")
logger.info(f"控制台日志级别: {console_log_level}")

295
konabot/common/longtask.py Normal file
View File

@ -0,0 +1,295 @@
from __future__ import annotations
from contextlib import asynccontextmanager
import datetime
import json
from typing import Annotated, Any, Callable, Coroutine, cast
import asyncio as asynkio
import uuid
from loguru import logger
import nonebot
from nonebot.params import Depends
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters import Bot as BaseBot
from nonebot.adapters.onebot.v11 import Bot as OBBot
from nonebot.adapters.onebot.v11 import GroupMessageEvent as OBGroupMessageEvent
from nonebot.adapters.onebot.v11 import PrivateMessageEvent as OBPrivateMessageEvent
from nonebot.adapters.console import Bot as ConsoleBot
from nonebot.adapters.console import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord import MessageEvent as DCMessageEvent
from nonebot.adapters.discord import Bot as DCBot
from nonebot_plugin_alconna import UniMessage
from pydantic import BaseModel, ValidationError
from .path import DATA_PATH
LONGTASK_DATA_DIR = DATA_PATH / "longtasks.json"
QQ_PRIVATE_CHAT_CHANNEL_PREFIX = "_CHANNEL_QQ_PRIVATE_"
class LongTaskTarget(BaseModel):
"""
用于定义长期任务的目标沟通对象,一般通过 DepLongTaskTarget 依赖注入获取:
```python
@cmd.handle()
async def _(target: DepLongTaskTarget):
...
```
"""
platform: str
"沟通对象所在的平台"
self_id: str
"进行沟通的对象自己的 ID"
channel_id: str
"沟通对象所在的群或者 Discord Channel。若为空则代表是私聊"
target_id: str
"沟通对象的 ID"
async def send_message(self, msg: UniMessage, at: bool = True) -> bool:
try:
bot = nonebot.get_bot(self.self_id)
except KeyError:
logger.warning(f"试图访问了不存在的 Bot。ID={self.self_id}")
return False
if self.platform == "qq":
if not isinstance(bot, OBBot):
logger.warning(
f"编号对应的平台并非期望的平台 ID={self.self_id} PLATFORM={
self.platform
} BOT_CLASS={bot.__class__.__name__}"
)
return False
if self.channel_id.startswith(QQ_PRIVATE_CHAT_CHANNEL_PREFIX) or not self.channel_id.strip():
# 私聊模式
await bot.send_private_msg(
user_id=int(self.target_id),
message=cast(Any, await msg.export(bot)),
auto_escape=False,
)
return True
else:
if at:
msg = UniMessage().at(self.target_id).text(" ") + msg
await bot.send_group_msg(
group_id=int(self.channel_id),
message=cast(Any, await msg.export(bot)),
auto_escape=False,
)
return True
if self.platform == "console":
if not isinstance(bot, ConsoleBot):
logger.warning(
f"编号对应的平台并非期望的平台 ID={self.self_id} PLATFORM={
self.platform
} BOT_CLASS={bot.__class__.__name__}"
)
return False
await bot.send_message(self.channel_id, cast(Any, await msg.export()))
return True
if self.platform == "discord":
if not isinstance(bot, DCBot):
logger.warning(
f"编号对应的平台并非期望的平台 ID={self.self_id} PLATFORM={
self.platform
} BOT_CLASS={bot.__class__.__name__}"
)
return False
await bot.send_to(
channel_id=int(self.channel_id),
message=cast(
Any, await (UniMessage().at(self.target_id) + msg).export()
),
tts=False,
)
return True
logger.warning(f"没有一个平台是期望的平台 PLATFORM={self.platform}")
return False
class LongTask(BaseModel):
uuid: str
data_json: str
target: LongTaskTarget
callback: str
deadline: datetime.datetime
_aio_task: asynkio.Task | None = None
async def run(self):
now = datetime.datetime.now()
if self.deadline < now:
await self._run_task()
return
await asynkio.sleep((self.deadline - now).total_seconds())
async with longtask_data() as data:
if self.uuid not in data.to_handle[self.callback]:
return
await self._run_task()
async def _run_task(self):
hdl = registered_long_task_handler.get(self.callback, None)
if hdl is None:
logger.warning(
f"Callback {self.callback} 未曾被注册,但是被期待调用,已忽略"
)
async with longtask_data() as datafile:
del datafile.to_handle[self.callback][self.uuid]
datafile.unhandled.setdefault(self.callback, []).append(self)
return
success = False
try:
await hdl(self)
success = True
except Exception as e:
logger.exception(e)
async with longtask_data() as datafile:
del datafile.to_handle[self.callback][self.uuid]
if not success:
datafile.unhandled.setdefault(self.callback, []).append(self)
logger.info(
f"LongTask 执行失败 UUID={self.uuid} callback={self.callback}"
)
else:
logger.info(
f"LongTask 工作完成 UUID={self.uuid} callback={self.callback}"
)
def clean(self):
self._aio_task = None
@property
def data(self):
return json.loads(self.data_json)
async def start(self):
self._aio_task = asynkio.Task(self.run())
self._aio_task.add_done_callback(lambda _: self.clean())
class LongTaskModuleData(BaseModel):
to_handle: dict[str, dict[str, LongTask]]
unhandled: dict[str, list[LongTask]]
async def get_long_task_target(event: BaseEvent, bot: BaseBot) -> LongTaskTarget | None:
if isinstance(event, OBGroupMessageEvent):
return LongTaskTarget(
platform="qq",
self_id=str(event.self_id),
channel_id=str(event.group_id),
target_id=str(event.user_id),
)
if isinstance(event, OBPrivateMessageEvent):
return LongTaskTarget(
platform="qq",
self_id=str(event.self_id),
channel_id=f"{QQ_PRIVATE_CHAT_CHANNEL_PREFIX}{event.self_id}",
target_id=str(event.user_id),
)
if isinstance(event, ConsoleMessageEvent):
return LongTaskTarget(
platform="console",
self_id=str(event.self_id),
channel_id=str(event.channel.id),
target_id=str(event.user.id),
)
if isinstance(event, DCMessageEvent):
self_id = ""
if isinstance(bot, DCBot):
self_id = str(bot.self_id)
return LongTaskTarget(
platform="discord",
self_id=self_id,
channel_id=str(event.channel_id),
target_id=str(event.user_id),
)
_TaskHandler = Callable[[LongTask], Coroutine[Any, Any, Any]]
registered_long_task_handler: dict[str, _TaskHandler] = {}
longtask_lock = asynkio.Lock()
def handle_long_task(callback_id: str):
def _decorator(func: _TaskHandler):
assert callback_id not in registered_long_task_handler, (
"有长任务的 ID 出现冲突,请换个名字!"
)
registered_long_task_handler[callback_id] = func
return func
return _decorator
def _load_longtask_data() -> LongTaskModuleData:
try:
txt = LONGTASK_DATA_DIR.read_text()
return LongTaskModuleData.model_validate_json(txt)
except (FileNotFoundError, ValidationError) as e:
logger.info(f"取得 LongTask 数据时出现问题:{e}")
return LongTaskModuleData(
to_handle={},
unhandled={},
)
def _save_longtask_data(data: LongTaskModuleData):
LONGTASK_DATA_DIR.write_text(data.model_dump_json())
@asynccontextmanager
async def longtask_data():
async with longtask_lock:
data = _load_longtask_data()
yield data
_save_longtask_data(data)
async def create_longtask(
handler: str,
data: dict[str, Any],
target: LongTaskTarget,
deadline: datetime.datetime,
):
task = LongTask(
uuid=str(uuid.uuid4()),
data_json=json.dumps(data),
target=target,
callback=handler,
deadline=deadline,
)
logger.info(f"创建了新的 LongTask UUID={task.uuid} CALLBACK={task.callback}")
await task.start()
async with longtask_data() as d:
d.to_handle.setdefault(handler, {})[task.uuid] = task
return task
async def init_longtask():
counter = 0
req: set[str] = set()
async with longtask_data() as data:
for v in data.to_handle.values():
for t in v.values():
await t.start()
counter += 1
req.add(t.callback)
logger.info(f"LongTask 启动了任务 数量={counter} 期望的门类=[{','.join(req)}]")
DepLongTaskTarget = Annotated[LongTaskTarget, Depends(get_long_task_target)]

View File

@ -0,0 +1,34 @@
from typing import cast
from nonebot import get_bot, get_plugin_config, logger
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
from nonebot.rule import Rule
from pydantic import BaseModel
class WZQConflictConfig(BaseModel):
wzq_bot_qq: int = 0
config = get_plugin_config(WZQConflictConfig)
async def no_wzqbot(evt: BaseEvent):
if config.wzq_bot_qq <= 0:
return True
if not isinstance(evt, GroupMessageEvent):
return True
gid = evt.group_id
sid = evt.self_id
bot = cast(OnebotBot, get_bot(str(sid)))
members = await bot.get_group_member_list(group_id=gid)
members = set((m.get("user_id", -1) for m in members))
if config.wzq_bot_qq in members:
return False
return True
no_wzqbot_rule = Rule(no_wzqbot)

View File

@ -4,9 +4,18 @@ ASSETS_PATH = Path(__file__).resolve().parent.parent.parent / "assets"
FONTS_PATH = ASSETS_PATH / "fonts"
SRC_PATH = Path(__file__).resolve().parent.parent
DATA_PATH = SRC_PATH.parent / "data"
LOG_PATH = DATA_PATH / "logs"
DOCS_PATH = SRC_PATH / "docs"
DOCS_PATH_MAN1 = DOCS_PATH / "user"
DOCS_PATH_MAN3 = DOCS_PATH / "lib"
DOCS_PATH_MAN7 = DOCS_PATH / "concepts"
DOCS_PATH_MAN8 = DOCS_PATH / "sys"
if not DATA_PATH.exists():
DATA_PATH.mkdir()
if not LOG_PATH.exists():
LOG_PATH.mkdir()

View File

@ -0,0 +1,2 @@
关于罗文和洛温:
AdoreLowen 希望和洛温阿特金森区分,所以最好就不要叫他洛温了!此方 BOT 会在一些群提醒叫错了的人。

View File

@ -5,17 +5,17 @@
giftool [图片] [选项]
示例
回复一张 GIF 并发送:
`giftool --ss 1.5 -t 2.0`
回复一张 GIF 并发送:
`giftool --ss 1.5 -t 2.0`
从 1.5 秒处开始,截取 2 秒长度的片段。
`giftool [图片] --ss 0:10 -to 0:15`
`giftool [图片] --ss 0:10 -to 0:15`
截取从 10 秒到 15 秒之间的片段(支持 MM:SS 或 HH:MM:SS 格式)。
`giftool [图片] --frames:v 10`
`giftool [图片] --frames:v 10`
将整张 GIF 均匀抽帧,最终保留 10 帧。
`giftool [图片] --ss 2 --frames:v 5`
`giftool [图片] --ss 2 --frames:v 5`
从第 2 秒开始截取,并将结果抽帧为 5 帧。
参数说明
@ -45,8 +45,8 @@
- 帧数必须为正整数(> 0
- 若原始帧数 ≤ 指定帧数,则保留全部帧。
--s <速度>(可选)
- 调整 gif 图的速度
--speed <速度>(可选)
- 调整 gif 图的速度。若为负数,则代表倒放
使用方式
1. 发送指令前,请确保:

View File

@ -0,0 +1,15 @@
指令介绍
ntfy - 配置使用 ntfy 来更好地为你通知此方 BOT 代办
指令示例
`ntfy 创建`
创建一个随机的 ntfy 订阅主题来提醒代办,此方 Bot 将会给你使用指引。你可以前往 https://ntfy.sh/ 官网下载 ntfy APP或者使用网页版 ntfy。
`ntfy 创建 kagami-notice`
创建一个名字含有 kagami-notice 的 ntfy 订阅主题
`ntfy 删除`
清除并不再使用 ntfy 向你通知
另见
提醒我(1) 查询提醒(1) 删除提醒(1)

View File

@ -0,0 +1,8 @@
指令介绍
删除提醒 - 删除在`查询提醒(1)`中查到的提醒
指令示例
`删除提醒 1` 在查询提醒后,删除编号为 1 的提醒
另见
提醒我(1) 查询提醒(1) ntfy(1)

View File

@ -0,0 +1,20 @@
指令介绍
卵总展示 - 让卵总举起你的图片
格式
<引用图片> 卵总展示 [选项]
卵总展示 [选项] <图片>
选项
`--whiteness <number>` 白度
将原图进行指数变换,以调整它的白的程度,默认为 0.0
`--black-level <number>` 黑色等级
将原图减淡,数值越大越淡,范围 0.0-1.0,默认 0.2
`--opacity <number>` 不透明度
将你的图片叠放在图片上的不透明度,默认为 0.8
`--saturation <number>` 饱和度
调整原图的饱和度,应该要大于 0.0,默认为 0.85

View File

@ -0,0 +1,11 @@
指令介绍
发起投票 - 发起一个投票
格式
发起投票 <投票标题> <选项1> <选项2> ...
示例
`发起投票 这是一个投票 A B C` 发起标题为“这是一个投票”选项为“A”、“B”、“C”的投票
说明
投票各个选项之间用空格分隔选项数量为2-15项。投票的默认有效期为24小时。

View File

@ -0,0 +1,12 @@
指令介绍
投票 - 参与已发起的投票
格式
投票 <投票ID/标题> <选项文本>
示例
`投票 1 A` 在ID为1的投票中投给“A”
`投票 这是一个投票 B` 在标题为“这是一个投票”的投票中投给“B”
说明
目前不支持单人多投,每个人只能投一项。

View File

@ -0,0 +1,15 @@
指令介绍
提醒我 - 在指定的时间提醒人事项的工具
使用示例
`下午五点提醒我吃饭`
创建一个下午五点的提醒,提醒你吃饭
`两分钟后提醒我睡觉`
创建一个相对于现在推迟 2 分钟的提醒,提醒你睡觉
`2026年4月25日20点整提醒我生日快乐`
创建一个指定日期和时间的提醒
另见
查询提醒(1) 删除提醒(1) ntfy(1)

View File

@ -0,0 +1,12 @@
指令介绍
查看投票 - 查看已发起的投票
格式
查看投票 <投票ID或标题>
示例
`查看投票 1` 查看ID为1的投票
`查看投票 这是一个投票` 查看标题为“这是一个投票”的投票
说明
投票在进行时,使用此命令可以看到投票的各个选项;投票结束后,则可以看到各项的票数。

View File

@ -0,0 +1,9 @@
指令介绍
查询提醒 - 查询已经创建的提醒
指令格式
`查询提醒` 查询提醒
`查询提醒 2` 查询第二页提醒
另见
提醒我(1) 删除提醒(1) ntfy(1)

View File

@ -0,0 +1,8 @@
指令介绍
生成二维码 - 将文本内容转换为二维码
格式
生成二维码 <文本内容>
示例
`生成二维码 嗨嗨嗨` 生成扫描结果为“嗨嗨嗨”的二维码图片

View File

@ -0,0 +1,25 @@
import asyncio
import random
from typing import cast
from loguru import logger
from nonebot import get_bot, on_request
from nonebot.adapters.onebot.v11.event import FriendRequestEvent
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
from konabot.common.nb.is_admin import cfg as adminConfig
add_request = on_request()
@add_request.handle()
async def _(req: FriendRequestEvent):
bot = cast(OnebotBot, get_bot(str(req.self_id)))
ok_member_ls: set[int] = set()
for group in adminConfig.admin_qq_group:
members = await bot.get_group_member_list(group_id=group)
ok_member_ls |= cast(set[int], set((m.get("user_id") for m in members)))
if req.user_id in ok_member_ls:
await asyncio.sleep(random.randint(5, 10))
await req.approve(bot)
logger.info(f"已经自动同意 {req.user_id} 的好友请求")

View File

@ -0,0 +1,39 @@
import re
from nonebot import on_message
from nonebot_plugin_alconna import Reference, Reply, UniMsg
from nonebot.adapters import Event
matcher_fix = on_message()
pattern = (
r"^(?:(?:av|cv)\d+|BV[a-zA-Z0-9]{10})|"
r"(?:b23\.tv|bili(?:22|23|33|2233)\.cn|\.bilibili\.com|QQ小程序(?:&amp;#93;|&#93;|\])哔哩哔哩).{0,500}"
)
@matcher_fix.handle()
async def _(msg: UniMsg, event: Event):
to_search = msg.exclude(Reply, Reference).dump(json=True)
to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
return
from nonebot_plugin_analysis_bilibili import handle_analysis
await handle_analysis(event)
# b_url: str
# b_page: str | None
# b_time: str | None
#
# from nonebot_plugin_analysis_bilibili.analysis_bilibili import extract as bilibili_extract
#
# b_url, b_page, b_time = bilibili_extract(to_search)
# if b_url is None:
# return
#
# await matcher_fix.send(await UniMessage().text(b_url).export())

View File

@ -0,0 +1,71 @@
import qrcode
# from pyzbar.pyzbar import decode
# from PIL import Image
import requests
from io import BytesIO
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
on_alconna)
from nonebot_plugin_alconna.uniseg import UniMsg, At, Reply
from konabot.common.nb.wzq_conflict import no_wzqbot_rule
async def download_img(url):
resp = requests.get(url.replace("https://multimedia.nt.qq","http://multimedia.nt.qq")) # bim获取QQ的图片时避免SSLv3报错
img_bytes = BytesIO()
with open(img_bytes,"wb") as f:
f.write(resp.content)
return img_bytes
def genqr(data):
qr = qrcode.QRCode(version=1,error_correction=qrcode.constants.ERROR_CORRECT_L,box_size=8,border=4)
qr.add_data(data)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
return img_bytes
"""
async def recqr(url):
im_path = "assets/img/qrcode/2.jpg"
data = await download_img(url)
img = Image.open(im_path)
decoded_objects = decode(img)
data = ""
for obj in decoded_objects:
data += obj.data.decode('utf-8')
return data
"""
gqrc = on_alconna(Alconna(
"genqr",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "请输入你要转换为二维码的文字!"
)],
# UniMessage[]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"生成二维码","genqrcode"}, rule=no_wzqbot_rule)
@gqrc.handle()
async def _(saying: list):
"""
img = await draw_pt("\n".join(saying))
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await pt.send(await UniMessage().image(raw=img_bytes).export())
# print(saying)
# 二维码识别
if type(saying[0]) == 'image':
data = await recqr(saying[0].data['url'])
if data == "":
await gqrc.send("二维码图片解析失败!")
else:
await gqrc.send(recqr(saying[0].data['url']))
# 二维码生成
else:
"""
# genqr("\n".join(saying))
await gqrc.send(await UniMessage().image(raw=genqr("\n".join(saying))).export())

View File

@ -0,0 +1,548 @@
import asyncio as asynkio
import datetime
import json
import secrets
from enum import Enum
from pathlib import Path
from typing import Optional
from loguru import logger
from nonebot import on_message
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import (
Alconna,
Args,
UniMessage,
UniMsg,
on_alconna,
)
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
DATA_FILE_PATH = (
Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json"
)
def load_banned_ids() -> list[str]:
if not DATA_FILE_PATH.exists():
return []
try:
return json.loads(DATA_FILE_PATH.read_text())
except Exception as e:
logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}")
return []
def is_idiom_game_banned(group_id: str) -> bool:
banned_ids = load_banned_ids()
return group_id in banned_ids
def add_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id not in banned_ids:
banned_ids.append(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
def remove_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id in banned_ids:
banned_ids.remove(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
class TryStartState(Enum):
STARTED = 0
ALREADY_PLAYING = 1
NO_REMAINING_TIMES = 2
class TryStopState(Enum):
STOPPED = 0
NOT_PLAYING = 1
class TryVerifyState(Enum):
VERIFIED = 0
VERIFIED_AND_REAL = 1
NOT_IDIOM = 2
WRONG_FIRST_CHAR = 3
VERIFIED_BUT_NO_NEXT = 4
VERIFIED_GAME_END = 5
class IdiomGame:
ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语
INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例
IDIOM_FIRST_CHAR = {} # 所有成语包括词语的首字字典
AVALIABLE_IDIOM_FIRST_CHAR = {} # 真正有效的成语首字字典
__inited = False
def __init__(self, group_id: str):
# 初始化一局游戏
self.group_id = ""
self.now_playing = False
self.score_board = {}
self.last_idiom = ""
self.last_char = ""
self.remain_playing_times = 3
self.last_play_date = ""
self.all_buff_score = 0
self.lock = asynkio.Lock()
self.remain_rounds = 0 # 剩余回合数
IdiomGame.INSTANCE_LIST[group_id] = self
def be_able_to_play(self) -> bool:
if self.last_play_date != datetime.date.today():
self.last_play_date = datetime.date.today()
self.remain_playing_times = 1
if self.remain_playing_times > 0:
self.remain_playing_times -= 1
return True
return False
def choose_start_idiom(self) -> str:
"""
随机选择一个成语作为起始成语
"""
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_char = self.last_idiom[-1]
if not self.is_nextable(self.last_char):
self.choose_start_idiom()
return self.last_idiom
@classmethod
def try_start_game(cls, group_id: str, force: bool = False) -> TryStartState:
cls.init_lexicon()
if not cls.INSTANCE_LIST.get(group_id):
cls(group_id)
instance = cls.INSTANCE_LIST[group_id]
if instance.now_playing:
return TryStartState.ALREADY_PLAYING
if not instance.be_able_to_play() and not force:
return TryStartState.NO_REMAINING_TIMES
instance.now_playing = True
return TryStartState.STARTED
def start_game(self, rounds: int = 100):
self.now_playing = True
self.remain_rounds = rounds
self.choose_start_idiom()
@classmethod
def try_stop_game(cls, group_id: str) -> TryStopState:
if not cls.INSTANCE_LIST.get(group_id):
return TryStopState.NOT_PLAYING
instance = cls.INSTANCE_LIST[group_id]
if not instance.now_playing:
return TryStopState.NOT_PLAYING
instance.now_playing = False
return TryStopState.STOPPED
def clear_score_board(self):
self.score_board = {}
self.last_char = ""
def get_score_board(self) -> dict:
return self.score_board
def get_all_buff_score(self) -> int:
return self.all_buff_score
async def skip_idiom(self, buff_score: int = -100) -> str:
"""
跳过当前成语,选择下一个成语
"""
async with self.lock:
self._skip_idiom_async()
self.add_buff_score(buff_score)
return self.last_idiom
def _skip_idiom_async(self) -> str:
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_char = self.last_idiom[-1]
if not self.is_nextable(self.last_char):
self._skip_idiom_async()
return self.last_idiom
async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
"""
用户发送成语
"""
async with self.lock:
state = self._verify_idiom(idiom, user_id)
return state
def is_nextable(self, last_char: str) -> bool:
"""
判断是否有成语可以接
"""
return last_char in IdiomGame.AVALIABLE_IDIOM_FIRST_CHAR
def _verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
# 新成语的首字应与上一条成语的尾字相同
if idiom[0] != self.last_char:
return TryVerifyState.WRONG_FIRST_CHAR
if idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS:
self.add_score(user_id, -0.1)
return TryVerifyState.NOT_IDIOM
self.last_idiom = idiom
self.last_char = idiom[-1]
self.add_score(user_id, 1)
if idiom in IdiomGame.ALL_IDIOMS:
self.add_score(user_id, 4) # 再加 4 分
self.remain_rounds -= 1
if self.remain_rounds <= 0:
self.now_playing = False
return TryVerifyState.VERIFIED_GAME_END
if not self.is_nextable(self.last_char):
# 没有成语可以接了,自动跳过
self._skip_idiom_async()
return TryVerifyState.VERIFIED_BUT_NO_NEXT
if idiom in IdiomGame.ALL_IDIOMS:
return TryVerifyState.VERIFIED_AND_REAL # 真实成语
return TryVerifyState.VERIFIED
def get_user_score(self, user_id: str) -> float:
if user_id not in self.score_board:
return 0
# 避免浮点数精度问题导致过长
handled_score = round(self.score_board[user_id]["score"], 1)
return handled_score
def add_score(self, user_id: str, score: int):
if user_id not in self.score_board:
self.score_board[user_id] = {"name": user_id, "score": 0}
self.score_board[user_id]["score"] += score
def add_buff_score(self, score: int):
self.all_buff_score += score
def get_playing_state(self) -> bool:
return self.now_playing
def get_last_char(self) -> str:
return self.last_char
@classmethod
def random_idiom_starting_with(cls, first_char: str) -> Optional[str]:
cls.init_lexicon()
if first_char not in cls.IDIOM_FIRST_CHAR:
return None
return secrets.choice(cls.IDIOM_FIRST_CHAR[first_char])
@classmethod
def init_lexicon(cls):
if cls.__inited:
return
cls.__inited = True
# 成语大表
with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f:
ALL_IDIOMS_INFOS = json.load(f)
# 词语大表
with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f:
cls.ALL_WORDS = json.load(f)
COMMON_WORDS = []
# 读取 COMMON 词语大表
with open(ASSETS_PATH / "lexicon" / "common.txt", "r", encoding="utf-8") as f:
for line in f:
word = line.strip()
if len(word) == 4:
COMMON_WORDS.append(word)
# 读取 THUOCL 成语库
with open(
ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt",
"r",
encoding="utf-8",
) as f:
THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f]
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
THUOCL_WORDS = []
import os
for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"):
if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt":
with open(
ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename,
"r",
encoding="utf-8",
) as f:
for line in f:
word = line.lstrip().split(" ")[0].strip()
if len(word) == 4:
THUOCL_WORDS.append(word)
# 只有成语的大表
cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重
# 其他四字词语表,仅表示可以有这个词
cls.ALL_WORDS = (
[word for word in cls.ALL_WORDS if len(word) == 4]
+ THUOCL_WORDS
+ COMMON_WORDS
)
cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重
# 根据成语大表,划分出成语首字字典
for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS:
if idiom[0] not in cls.IDIOM_FIRST_CHAR:
cls.IDIOM_FIRST_CHAR[idiom[0]] = []
cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
# 根据真正的成语大表,划分出有效成语首字字典
for idiom in cls.ALL_IDIOMS:
if idiom[0] not in cls.AVALIABLE_IDIOM_FIRST_CHAR:
cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]] = []
cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
evt = on_alconna(
Alconna(
"我要玩成语接龙",
Args["rounds?", int],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def play_game(
event: BaseEvent,
target: DepLongTaskTarget,
force=False,
rounds: Optional[int] = 100,
):
# group_id = str(event.get_session_id())
group_id = target.channel_id
if is_idiom_game_banned(group_id):
await evt.send(
await UniMessage().text("本群已被禁止使用成语接龙功能!").export()
)
return
rounds = rounds or 0
if rounds <= 0:
await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export())
return
state = IdiomGame.try_start_game(group_id, force)
if state == TryStartState.ALREADY_PLAYING:
await evt.send(
await UniMessage()
.text("当前已有成语接龙游戏在进行中,请稍后再试!")
.export()
)
return
if state == TryStartState.NO_REMAINING_TIMES:
await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export())
return
await evt.send(
await UniMessage()
.text(
"你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!"
)
.export()
)
instance = IdiomGame.INSTANCE_LIST[group_id]
instance.start_game(rounds)
# 发布成语
await evt.send(
await UniMessage()
.text(f"第一个成语:「{instance.last_idiom}」,请接!")
.export()
)
evt = on_alconna(
Alconna(
"老子就是要玩成语接龙!!!",
Args["rounds?", int],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def force_play_game(
event: BaseEvent, target: DepLongTaskTarget, rounds: Optional[int] = 100
):
await play_game(event, target, force=True, rounds=rounds)
async def end_game(event: BaseEvent, group_id: str):
instance = IdiomGame.INSTANCE_LIST[group_id]
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
score_board = instance.get_score_board()
if len(score_board) == 0:
result_text += "无人得分!"
else:
# 按分数排序,名字用 at 的方式
sorted_score = sorted(
score_board.items(), key=lambda x: x[1]["score"], reverse=True
)
for i, (user_id, info) in enumerate(sorted_score):
result_text += (
f"{i + 1}. "
+ UniMessage().at(user_id)
+ f": {round(info['score'] + instance.get_all_buff_score(), 1)}\n"
)
await evt.send(await result_text.export())
instance.clear_score_board()
evt = on_alconna(
Alconna("不玩了"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
state = IdiomGame.try_stop_game(group_id)
if state == TryStopState.STOPPED:
# 发送好吧狗图片
# 打开好吧狗本地文件
with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f:
img_data = f.read()
await evt.send(await UniMessage().image(raw=img_data).export())
await end_game(event, group_id)
else:
await evt.send(
await UniMessage().text("当前没有成语接龙游戏在进行中!").export()
)
# 跳过
evt = on_alconna(
Alconna("跳过成语"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
)
@evt.handle()
async def _(target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
avaliable_idiom = IdiomGame.random_idiom_starting_with(instance.get_last_char())
await evt.send(await UniMessage().text(f"你们太菜了全部扣100分明明还可以接「{avaliable_idiom}」的!").export())
idiom = await instance.skip_idiom(-100)
await evt.send(
await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export()
)
def get_user_info(event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
user_id = str(event.author.id)
user_name = str(event.author.name)
else:
user_id = str(event.get_user_id())
user_name = str(event.get_user_id())
return user_id, user_name
# 直接读取消息
evt = on_message()
@evt.handle()
async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
user_idiom = msg.extract_plain_text().strip()
user_id, user_name = get_user_info(event)
state = await instance.try_verify_idiom(user_idiom, user_id)
if state == TryVerifyState.WRONG_FIRST_CHAR:
return
if state == TryVerifyState.NOT_IDIOM:
await evt.send(
await UniMessage()
.at(user_id)
.text(" 接不上!这个不一样!你被扣了 0.1 分!")
.export()
)
return
if state == TryVerifyState.VERIFIED:
await evt.send(
await UniMessage()
.at(user_id)
.text(f" 接上了,喜提 1 分!你有 {instance.get_user_score(user_id)} 分!")
.export()
)
elif state == TryVerifyState.VERIFIED_AND_REAL:
await evt.send(
await UniMessage()
.at(user_id)
.text(f" 接上了,这是个真实成语,喜提 5 分!你有 {instance.get_user_score(user_id)} 分!")
.export()
)
if state == TryVerifyState.VERIFIED_GAME_END:
await evt.send(await UniMessage().text("全部回合结束!").export())
await end_game(event, group_id)
return
if state == TryVerifyState.VERIFIED_BUT_NO_NEXT:
await evt.send(
await UniMessage()
.text("但是,这是条死路!你们全部都要扣 100 分!")
.export()
)
await evt.send(
await UniMessage().text(f"重新抽取成语「{instance.last_idiom}").export()
)
await evt.send(
await UniMessage()
.text(f"下一个成语请以「{instance.get_last_char()}」开头!")
.export()
)
evt = on_alconna(
Alconna("禁止成语接龙"),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
add_banned_id(group_id)
await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export())
evt = on_alconna(
Alconna("开启成语接龙"),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
remove_banned_id(group_id)
await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export())

View File

@ -0,0 +1,3 @@
from pathlib import Path
ASSETS = Path(__file__).parent.parent / "assets"

View File

@ -1,10 +1,10 @@
import re
from io import BytesIO
import PIL.Image
from nonebot import on_message
from nonebot.adapters import Bot
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage,
on_alconna)
from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import PIL_Image
@ -29,15 +29,17 @@ def parse_timestamp(tx: str) -> float | None:
return res
cmd_giftool = on_alconna(Alconna(
"giftool",
Args["img", Image | None],
Option("--ss", Args["start_point", str]),
Option("--frames:v", Args["frame_count", int]),
Option("-t", Args["length", str]),
Option("-to", Args["end_point", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
))
cmd_giftool = on_alconna(
Alconna(
"giftool",
Args["img", Image | None],
Option("--ss", Args["start_point", str]),
Option("--frames:v", Args["frame_count", int]),
Option("-t", Args["length", str]),
Option("-to", Args["end_point", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
)
)
@cmd_giftool.handle()
@ -71,87 +73,126 @@ async def _(
raise BotExceptionMessage("错误:出点时间小于入点")
if frame_count is not None and frame_count <= 0:
raise BotExceptionMessage("错误:帧数量应该大于 0")
if speed_factor <= 0:
raise BotExceptionMessage("错误:--speed 必须大于 0")
if speed_factor == 0:
raise BotExceptionMessage("错误:速度不能为 0")
is_rev = speed_factor < 0
speed_factor = abs(speed_factor)
if not getattr(image, "is_animated", False):
raise BotExceptionMessage("错误输入的不是动图GIF")
frames = []
durations = []
total_duration = 0.0
##
# 从这里开始,采样整个 GIF 图
frames: list[PIL.Image.Image] = []
durations: list[float] = []
try:
for i in range(getattr(image, "n_frames")):
image.seek(i)
frames.append(image.copy())
duration = image.info.get("duration", 100) # 单位:毫秒
duration = image.info.get("duration", 100) / 1000
durations.append(duration)
total_duration += duration / 1000.0 # 转为秒
except EOFError:
pass
if not frames:
raise BotExceptionMessage("错误:读取 GIF 帧失败")
# 采样结束
def time_to_frame_index(target_time: float) -> int:
if target_time <= 0:
return 0
cum = 0.0
for idx, dur in enumerate(durations):
cum += dur / 1000.0
if cum >= target_time:
return min(idx, len(frames) - 1)
return len(frames) - 1
start_frame = 0
end_frame = len(frames) - 1
if ss is not None:
start_frame = time_to_frame_index(ss)
if to is not None:
end_frame = time_to_frame_index(to)
if end_frame < start_frame:
end_frame = start_frame
elif t is not None:
end_time = (ss or 0.0) + t
end_frame = time_to_frame_index(end_time)
if end_frame < start_frame:
end_frame = start_frame
##
# 根据开始、结束时间或者帧数量来裁取 GIF 图
start_frame = max(0, start_frame)
end_frame = min(len(frames) - 1, end_frame)
selected_frames = frames[start_frame : end_frame + 1]
selected_durations = durations[start_frame : end_frame + 1]
begin_time = ss or 0
end_time = sum(durations)
end_time = min(begin_time + (t or end_time), to or end_time, end_time)
if frame_count is not None and frame_count > 0:
if frame_count >= len(selected_frames):
pass
accumulated = 0.0
status = 0
sel_frames: list[PIL.Image.Image] = []
sel_durations: list[float] = []
for i in range(len(frames)):
frame = frames[i]
duration = durations[i]
if status == 0:
if accumulated + duration > begin_time:
status = 1
sel_frames.append(frame)
sel_durations.append(accumulated + duration - begin_time)
elif status == 1:
if accumulated + duration > end_time:
sel_frames.append(frame)
sel_durations.append(end_time - accumulated)
break
sel_frames.append(frame)
sel_durations.append(duration)
accumulated += duration
##
# 加速!
sel_durations = [dur / speed_factor * 1000 for dur in durations]
rframes = []
rdur = []
acc_mod_20 = 0
for i in range(len(sel_frames)):
fr = sel_frames[i]
du = round(sel_durations[i])
if du >= 20:
rframes.append(fr)
rdur.append(int(du))
acc_mod_20 = 0
else:
step = len(selected_frames) / frame_count
sampled_frames = []
sampled_durations = []
for i in range(frame_count):
idx = int(i * step)
sampled_frames.append(selected_frames[idx])
sampled_durations.append(
sum(selected_durations) // len(selected_durations)
)
selected_frames = sampled_frames
selected_durations = sampled_durations
if acc_mod_20 == 0:
rframes.append(fr)
rdur.append(20)
acc_mod_20 += du
else:
acc_mod_20 += du
if acc_mod_20 >= 20:
acc_mod_20 = 0
if len(rframes) == 1 and len(sel_frames) > 1:
rframes.append(sel_frames[max(2, len(sel_frames) // 2)])
rdur.append(20)
##
# 收尾:看看透明度这块
transparency_flag = False
for f in rframes:
if f.mode == "RGBA":
if any(pix < 255 for pix in f.getchannel("A").getdata()):
transparency_flag = True
break
elif f.mode == "P" and "transparency" in f.info:
transparency_flag = True
break
tf = {}
if transparency_flag:
tf["transparency"] = 0
if is_rev:
rframes = rframes[::-1]
rdur = rdur[::-1]
output_img = BytesIO()
adjusted_durations = [
max(10, int(dur / speed_factor)) for dur in selected_durations
]
if selected_frames:
selected_frames[0].save(
if rframes:
rframes[0].save(
output_img,
format="GIF",
save_all=True,
append_images=selected_frames[1:],
duration=adjusted_durations,
append_images=rframes[1:],
duration=rdur,
loop=0,
optimize=False,
disposal=2,
**tf,
)
else:
raise BotExceptionMessage("错误:没有可输出的帧")

View File

@ -0,0 +1,47 @@
import asyncio
# import datetime
from loguru import logger
import nonebot
# from nonebot_plugin_alconna import UniMessage
from konabot.common.longtask import (
# DepLongTaskTarget,
# LongTask,
# create_longtask,
# handle_long_task,
init_longtask,
)
driver = nonebot.get_driver()
INIT_FLAG = {"flag": False}
@driver.on_bot_connect
async def _():
if INIT_FLAG["flag"]:
return
INIT_FLAG["flag"] = True
logger.info("有 Bot 连接,等待 5 秒后初始化 LongTask 模块")
await asyncio.sleep(5)
await init_longtask()
logger.info("LongTask 初始化完成")
# cmd1 = nonebot.on_command("test114")
#
#
# @handle_long_task("test_callback_001")
# async def _(lt: LongTask):
# await lt.target.send_message(UniMessage().text("Hello, world!"), at=True)
#
#
# @cmd1.handle()
# async def _(target: DepLongTaskTarget):
# await create_longtask(
# handler="test_callback_001",
# data={},
# target=target,
# deadline=datetime.datetime.now() + datetime.timedelta(seconds=20),
# )

View File

@ -53,7 +53,7 @@ async def _(
if doc is None:
# 检索模式
if section is None:
section_set = {1}
section_set = {1, 7}
else:
section_set = {section}
if 1 in section_set and is_admin(event):
@ -75,7 +75,7 @@ async def _(
else:
# 查阅模式
if section is None:
section_set = {1}
section_set = {1, 7}
else:
section_set = {section}
if 1 in section_set and is_admin(event):

View File

@ -2,25 +2,52 @@ from io import BytesIO
from typing import Iterable, cast
from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, Text,
UniMessage, UniMsg, on_alconna)
from nonebot_plugin_alconna import (
Alconna,
Args,
Field,
Image,
MultiVar,
Option,
Text,
UniMessage,
UniMsg,
on_alconna,
)
from konabot.common.nb.extract_image import extract_image_from_message
from konabot.plugins.memepack.drawing.display import draw_cao_display
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten,
draw_geimao, draw_mnk,
draw_pt, draw_suan)
from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message
from konabot.plugins.memepack.drawing.display import (
draw_cao_display,
draw_snaur_display,
draw_anan_display,
)
from konabot.plugins.memepack.drawing.saying import (
draw_cute_ten,
draw_geimao,
draw_mnk,
draw_pt,
draw_suan,
)
from nonebot.adapters import Bot, Event
from returns.result import Success, Failure
geimao = on_alconna(Alconna(
"给猫说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写给猫说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"给猫哈"})
geimao = on_alconna(
Alconna(
"给猫说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写给猫说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"给猫哈"},
)
@geimao.handle()
async def _(saying: list[str]):
@ -31,12 +58,21 @@ async def _(saying: list[str]):
await geimao.send(await UniMessage().image(raw=img_bytes).export())
pt = on_alconna(Alconna(
"pt说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写小帕说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"小帕说"})
pt = on_alconna(
Alconna(
"pt说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写小帕说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"小帕说"},
)
@pt.handle()
async def _(saying: list[str]):
@ -47,12 +83,21 @@ async def _(saying: list[str]):
await pt.send(await UniMessage().image(raw=img_bytes).export())
mnk = on_alconna(Alconna(
"re:小?黑白子?说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写黑白子说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"})
mnk = on_alconna(
Alconna(
"re:小?黑白子?说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写黑白子说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"mnk说"},
)
@mnk.handle()
async def _(saying: list[str]):
@ -63,12 +108,21 @@ async def _(saying: list[str]):
await mnk.send(await UniMessage().image(raw=img_bytes).export())
suan = on_alconna(Alconna(
"小蒜说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写小蒜说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
suan = on_alconna(
Alconna(
"小蒜说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写小蒜说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@suan.handle()
async def _(saying: list[str]):
@ -79,12 +133,21 @@ async def _(saying: list[str]):
await suan.send(await UniMessage().image(raw=img_bytes).export())
dsuan = on_alconna(Alconna(
"大蒜说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写大蒜说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
dsuan = on_alconna(
Alconna(
"大蒜说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写大蒜说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@dsuan.handle()
async def _(saying: list[str]):
@ -95,12 +158,21 @@ async def _(saying: list[str]):
await dsuan.send(await UniMessage().image(raw=img_bytes).export())
cutecat = on_alconna(Alconna(
"乖猫说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写十猫说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"十猫说"})
cutecat = on_alconna(
Alconna(
"乖猫说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写十猫说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"十猫说"},
)
@cutecat.handle()
async def _(saying: list[str]):
@ -113,13 +185,14 @@ async def _(saying: list[str]):
cao_display_cmd = on_message()
@cao_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False
for text in cast(Iterable[Text], msg.get(Text)):
if text.text.strip() == "小槽展示":
flag = True
elif text.text.strip() == '':
elif text.text.strip() == "":
continue
else:
return
@ -134,8 +207,71 @@ async def _(msg: UniMsg, evt: Event, bot: Bot):
case Failure(err):
await cao_display_cmd.send(
await UniMessage()
.at(user_id=evt.get_user_id())
.text(' ')
.text(err)
.export()
.at(user_id=evt.get_user_id())
.text(" ")
.text(err)
.export()
)
snaur_display_cmd = on_alconna(
Alconna(
"卵总展示",
Option("--whiteness", Args["whiteness", float], alias=["-w"]),
Option("--black-level", Args["black_level", float], alias=["-b"]),
Option("--opacity", Args["opacity", float], alias=["-o"]),
Option("--saturation", Args["saturation", float], alias=["-s"]),
Args["image", Image | None],
)
)
@snaur_display_cmd.handle()
async def _(
img: PIL_Image,
whiteness: float = 0.0,
black_level: float = 0.2,
opacity: float = 0.8,
saturation: float = 0.85,
):
img_processed = await draw_snaur_display(
img,
whiteness,
black_level,
opacity,
saturation,
)
img_data = BytesIO()
img_processed.save(img_data, "PNG")
await snaur_display_cmd.send(await UniMessage().image(raw=img_data).export())
anan_display_cmd = on_message()
@anan_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False
for text in cast(Iterable[Text], msg.get(Text)):
stripped = text.text.strip()
if stripped == "安安展示":
flag = True
elif stripped == "":
continue
else:
return
if not flag:
return
match await extract_image_from_message(evt.get_message(), evt, bot):
case Success(img):
img_handled = await draw_anan_display(img)
img_bytes = BytesIO()
img_handled.save(img_bytes, format="PNG")
await anan_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
case Failure(err):
await anan_display_cmd.send(
await UniMessage()
.at(user_id=evt.get_user_id())
.text(" ")
.text(err)
.export()
)

View File

@ -4,10 +4,12 @@ from typing import Any, cast
import cv2
import numpy as np
import PIL.Image
import PIL.ImageChops
import PIL.ImageEnhance
from konabot.common.path import ASSETS_PATH
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
CAO_QUAD_POINTS = np.float32(cast(Any, [
[392, 540],
[577, 557],
@ -15,6 +17,25 @@ CAO_QUAD_POINTS = np.float32(cast(Any, [
[381, 687],
]))
snaur_image_base = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "snaur_1_base.png")
snaur_image_top = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "snaur_1_top.png")
SNAUR_RATIO = (1 / 2) ** .5
SNAUR_QUAD_POINTS = np.float32(cast(Any, [
[0, 466 ],
[673, 471 ],
[640, 1196],
[106, 1280],
]))
anan_image_base = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "anan_base.png")
anan_image_top = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "anan_top.png")
ANAN_QUAD_POINTS = np.float32([
[157, 585],
[793, 599],
[781, 908],
[160, 908]
])
def _draw_cao_display(image: PIL.Image.Image):
src = np.array(image.convert("RGB"))
h, w = src.shape[:2]
@ -43,3 +64,136 @@ def _draw_cao_display(image: PIL.Image.Image):
async def draw_cao_display(image: PIL.Image.Image):
return await asyncio.to_thread(_draw_cao_display, image)
def _draw_snaur_display(
image : PIL.Image.Image,
whiteness : float = 0.0 ,
black_level: float = 0.2 ,
opacity : float = 0.8 ,
saturation : float = 0.85 ,
):
src = np.array(image.convert("RGBA"))
_h, _w = src.shape[:2]
if _w / _h < SNAUR_RATIO:
_w_target = _w
_h_target = int(_w / SNAUR_RATIO)
else:
_w_target = int(_h * SNAUR_RATIO)
_h_target = _h
x_center = _w / 2
y_center = _h / 2
x1 = int(x_center - _w_target / 2)
x2 = int(x_center + _w_target / 2)
y1 = int(y_center - _h_target / 2)
y2 = int(y_center + _h_target / 2)
src = src[y1:y2, x1:x2, :]
h, w = src.shape[:2]
src_points = np.float32(cast(Any, [
[0, 0],
[w, 0],
[w, h],
[0, h],
]))
dst_points = SNAUR_QUAD_POINTS
M = cv2.getPerspectiveTransform(cast(Any, src_points), cast(Any, dst_points))
output_size = snaur_image_top.size
output_w, output_h = output_size
warped = cv2.warpPerspective(
src,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
result = PIL.Image.fromarray(warped, 'RGBA')
r, g, b, a = result.split()
a = a.point(lambda p: int(p * opacity))
f2 = lambda p: int(
((p / 255) ** (2 ** whiteness)) * 255 * (1 - black_level)
+ 255 * black_level
)
r = r.point(f2)
g = g.point(f2)
b = b.point(f2)
result = PIL.Image.merge('RGBA', (r, g, b, a))
enhancer = PIL.ImageEnhance.Color(result)
result = enhancer.enhance(saturation)
result = PIL.ImageChops.multiply(result, snaur_image_base)
result = PIL.Image.alpha_composite(snaur_image_base, result)
result = PIL.Image.alpha_composite(result, snaur_image_top)
return result
async def draw_snaur_display(
image : PIL.Image.Image,
whiteness : float = 0.0 ,
black_level: float = 0.2 ,
opacity : float = 0.8 ,
saturation : float = 0.85 ,
) -> PIL.Image.Image:
return await asyncio.to_thread(
_draw_snaur_display, image, whiteness, black_level,
opacity, saturation,
)
def _draw_anan_display(image: PIL.Image.Image) -> PIL.Image.Image:
src = np.array(image.convert("RGBA"))
h, w = src.shape[:2]
src_points = np.float32([
[0, 0],
[w, 0],
[w, h],
[0, h]
])
dst_points = ANAN_QUAD_POINTS
M = cv2.getPerspectiveTransform(src_points, dst_points)
output_w, output_h = anan_image_top.size
src_rgb = cv2.cvtColor(src, cv2.COLOR_RGBA2RGB) if src.shape[2] == 4 else src
warped_rgb = cv2.warpPerspective(
src_rgb,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
mask = np.zeros((h, w), dtype=np.uint8)
mask[:, :] = 255
warped_mask = cv2.warpPerspective(
mask,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=0
)
warped_rgba = cv2.cvtColor(warped_rgb, cv2.COLOR_RGB2RGBA)
warped_rgba[:, :, 3] = warped_mask
warped_pil = PIL.Image.fromarray(warped_rgba, 'RGBA')
result = PIL.Image.alpha_composite(anan_image_base, warped_pil)
result = PIL.Image.alpha_composite(result, anan_image_top)
return result
async def draw_anan_display(image: PIL.Image.Image) -> PIL.Image.Image:
return await asyncio.to_thread(_draw_anan_display, image)

View File

@ -0,0 +1,44 @@
import nonebot
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
from nonebot_plugin_alconna import UniMsg, UniMessage
from pydantic import BaseModel
class NoLuowenConfig(BaseModel):
plugin_noluowen_qqid: int = -1
plugin_noluowen_enable_group: list[int] = []
config = nonebot.get_plugin_config(NoLuowenConfig)
async def is_luowen_mentioned(evt: GroupMessageEvent, msg: UniMsg) -> bool:
if config.plugin_noluowen_qqid <= 0:
return False
if evt.user_id == config.plugin_noluowen_qqid:
return False
if evt.group_id not in config.plugin_noluowen_enable_group:
return False
txt = msg.extract_plain_text()
if "洛温" not in txt:
return False
if "罗文" in txt:
return False
if "阿特金森" in txt:
return False
return True
evt_luowen_mentioned = nonebot.on_message(rule=is_luowen_mentioned)
@evt_luowen_mentioned.handle()
async def _(evt: GroupMessageEvent, bot: Bot):
msg = (
UniMessage()
.reply(str(evt.message_id))
.at(str(config.plugin_noluowen_qqid))
.text(" 好像有人念错了你的 ID")
)
await evt_luowen_mentioned.send(await msg.export(bot=bot))

View File

@ -0,0 +1,168 @@
import json, time
from nonebot.rule import Rule
from nonebot_plugin_alconna import Alconna, Args, Field, MultiVar, on_alconna
from nonebot.adapters.onebot.v11 import Event
from konabot.common.nb.wzq_conflict import no_wzqbot_rule
from konabot.common.path import ASSETS_PATH, DATA_PATH
POLL_TEMPLATE_FILE = ASSETS_PATH / "json" / "poll.json"
POLL_DATA_FILE = DATA_PATH / "poll.json"
if not POLL_DATA_FILE.exists():
POLL_DATA_FILE.write_bytes(POLL_TEMPLATE_FILE.read_bytes())
poll_list = json.loads(POLL_DATA_FILE.read_text())['poll']
async def createpoll(title,qqid,options):
polllength = len(poll_list)
pollid = str(polllength)
poll_create = int(time.time())
poll_expiry = poll_create + 24*3600
polljson = {"title":title,"qq":qqid,"create":poll_create,"expiry":poll_expiry,"options":options,"polldata":{}}
poll_list[pollid] = polljson
writeback()
return pollid
def getpolldata(pollid_or_title):
# 初始化“被指定的投票项目”
thepoll = {}
polnum = -1
# 判断是ID还是标题
if str.isdigit(pollid_or_title):
if pollid_or_title in poll_list:
thepoll = poll_list[pollid_or_title]
polnum = pollid_or_title
else:
return [{},-1]
else:
for i in poll_list:
if poll_list[i]["title"] == pollid_or_title:
thepoll = poll_list[i]
polnum = i
break
if polnum == -1:
return [{},-1]
return [thepoll,polnum]
def writeback():
# file = open(poll_json_path,"w",encoding="utf-8")
# json.dump({'poll':poll_list},file,ensure_ascii=False,sort_keys=True)
POLL_DATA_FILE.write_text(json.dumps({
'poll': poll_list,
}, ensure_ascii=False, sort_keys=True))
async def pollvote(polnum,optionnum,qqnum):
optiond = poll_list[polnum]["polldata"]
if optionnum not in optiond:
poll_list[polnum]["polldata"][optionnum] = []
poll_list[polnum]["polldata"][optionnum].append(qqnum)
writeback()
return
poll = on_alconna(Alconna(
"poll",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "参数错误。用法:发起投票 <投票标题> <选项1> <选项2> ..."
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"发起投票","createpoll"}, rule=no_wzqbot_rule)
@poll.handle()
async def _(saying: list, event: Event):
if (len(saying) < 3):
await poll.send("请提供至少两个投票选项!")
elif (len(saying) < 17):
title = saying[0]
saying.remove(title)
options = {}
for i in saying:
options[saying.index(i)] = i
qqid = event.get_user_id()
result = await createpoll(title,qqid,options)
await poll.send("已创建投票。回复 查看投票 "+str(result)+" 查看该投票。")
else:
await poll.send("投票选项太多了请减少到15个选项以内。")
viewpoll = on_alconna(Alconna(
"viewpoll",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "请指定投票ID或标题。用法查看投票 <投票ID或标题>"
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"查看投票"}, rule=no_wzqbot_rule)
@viewpoll.handle()
async def _(saying: list):
# 参数投票ID或者标题
# pollid_or_title = params[0]
polldata = getpolldata(saying[0])
# 被指定的投票项目
thepoll = polldata[0]
polnum = polldata[1]
if polnum == -1:
await viewpoll.send("该投票不存在!")
else:
# 检查投票是否已结束
pollended = 0
if time.time() > thepoll["expiry"]:
pollended = 1
# 回复内容
reply = "投票:"+thepoll["title"]+" [ID: "+str(polnum)+"]"
# 如果投票已结束
if pollended:
for i in thepoll["options"]:
reply += "\n"
# 检查该选项是否有人投票
if i in thepoll["polldata"]:
reply += "["+str(len(thepoll["polldata"][i]))+" 票]"
else:
reply += "[0 票]"
reply += " "+thepoll["options"][i]
reply += "\n\n此投票已结束。"
else:
for i in thepoll["options"]:
reply += "\n"
reply += "- "+thepoll["options"][i]
# reply += "\n\n小提示向bot私聊发送 /viewpoll "+str(polnum)+" 可查看已投票数哦!"
reply += "\n\n发送 投票 "+str(polnum)+" <选项文本> 即可参与投票!"
await viewpoll.send(reply)
vote = on_alconna(Alconna(
"vote",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "参数错误。用法:投票 <投票ID/标题> <选项文本>"
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"投票","参与投票"}, rule=no_wzqbot_rule)
@vote.handle()
async def _(saying: list, event: Event):
if (len(saying) < 2):
await vote.send("请指定投给哪一项!")
else:
polldata = getpolldata(saying[0])
# 被指定的投票项目
thepoll = polldata[0]
polnum = polldata[1]
if polnum == -1:
await viewpoll.finish("没有找到这个投票!")
# thepolldata = thepoll["polldata"]
# 查找对应的投票项
optionnum = -1
for i in thepoll["options"]:
if saying[1] == thepoll["options"][i]:
optionnum = i
break
if optionnum == -1:
reply = "此投票里面没有这一项!可用的选项有:"
for i in thepoll["options"]:
reply += "\n"
reply += "- "+thepoll["options"][i]
await viewpoll.send(reply)
# 检查是否符合投票条件该qq号是否已参与过投票、投票是否过期
elif time.time() > thepoll["expiry"]:
await viewpoll.send("此投票已经结束!请发送 查看投票 "+polnum+" 查看结果。")
elif str(event.get_user_id()) in str(thepoll["polldata"]):
await viewpoll.send("你已参与过此投票!请在投票结束后发送 查看投票 "+polnum+" 查看结果。")
# 写入项目
else:
await pollvote(polnum,optionnum,event.get_user_id())
await viewpoll.send("投票成功!你投给了 "+saying[1])

View File

@ -309,7 +309,7 @@ async def generate_dice_image(number: str) -> BytesIO:
if(len(text) > 50):
output = BytesIO()
push_image = Image.open(ASSETS_PATH / "img" / "dice" / "stick.png")
push_image.save(output,format='PNG')
push_image.save(output,format='GIF')
output.seek(0)
return output

View File

@ -1,30 +1,39 @@
import asyncio
import aiohttp
import asyncio as asynkio
import datetime
from math import ceil
from pathlib import Path
from typing import Any, Literal, cast
from typing import Any, Literal
import nanoid
import nonebot
import ptimeparse
from loguru import logger
from nonebot import on_message
from nonebot.adapters import Event
from nonebot.adapters.console import Bot as ConsoleBot
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord import Bot as DiscordBot
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
from nonebot.adapters.onebot.v11.event import \
GroupMessageEvent as OnebotV11GroupMessageEvent
from nonebot.adapters.onebot.v11.event import \
MessageEvent as OnebotV11MessageEvent
from nonebot_plugin_alconna import UniMessage, UniMsg
from nonebot import get_plugin_config, on_message
from nonebot.adapters import Bot, Event
from nonebot.adapters.onebot.v11 import Bot as OBBot
from nonebot.adapters.console import Bot as CBot
from nonebot.adapters.discord import Bot as DCBot
from nonebot_plugin_alconna import Alconna, Args, Subcommand, UniMessage, UniMsg, on_alconna
from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget, LongTask, LongTaskTarget, create_longtask, handle_long_task, longtask_data
evt = on_message()
(Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True)
DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "notify.json"
DATA_FILE_LOCK = asyncio.Lock()
DATA_FILE_LOCK = asynkio.Lock()
ASYNK_TASKS: set[asynkio.Task[Any]] = set()
LONG_TASK_NAME = "TASK_SIMPLE_NOTIFY"
PAGE_SIZE = 6
FMT_STRING = "%Y年%m月%d%H:%M:%S"
class NotifyMessage(BaseModel):
message: str
class Notify(BaseModel):
@ -39,14 +48,64 @@ class Notify(BaseModel):
notify_time: datetime.datetime
notify_msg: str
def get_str(self):
return f"{self.target}-{self.target_env}-{self.platform}-{self.notify_time}"
class NotifyConfigFile(BaseModel):
version: int = 2
notifies: list[Notify] = []
unsent: list[Notify] = []
notify_channels: dict[str, str] = {}
class NotifyPluginConfig(BaseModel):
plugin_notify_enable_ntfy: bool = False
plugin_notify_base_url: str = ""
plugin_notify_access_token: str = ""
plugin_notify_prefix: str = "kona-notice-"
config = get_plugin_config(NotifyPluginConfig)
async def send_notify_to_ntfy_instance(msg: str, channel: str):
if not config.plugin_notify_enable_ntfy:
return
url = f"{config.plugin_notify_base_url}/{channel}"
async with aiohttp.ClientSession() as session:
session.headers["Authorization"] = f"Bearer {config.plugin_notify_access_token}"
session.headers["Title"] = "🔔 此方 BOT 提醒"
async with session.post(url, data=msg) as response:
logger.info(f"访问 {url} 的结果是 {response.status}")
def _get_bot_of(_type: type[Bot]):
for bot in nonebot.get_bots().values():
if isinstance(bot, _type):
return bot.self_id
return ""
def get_target_from_notify(notify: Notify) -> LongTaskTarget:
if notify.platform == "console":
return LongTaskTarget(
platform="console",
self_id=_get_bot_of(CBot),
channel_id=notify.target_env or "",
target_id=notify.target,
)
if notify.platform == "discord":
return LongTaskTarget(
platform="discord",
self_id=_get_bot_of(DCBot),
channel_id=notify.target_env or "",
target_id=notify.target,
)
return LongTaskTarget(
platform="qq",
self_id=_get_bot_of(OBBot),
channel_id=notify.target_env or "",
target_id=notify.target,
)
def load_notify_config() -> NotifyConfigFile:
@ -63,71 +122,8 @@ def save_notify_config(config: NotifyConfigFile):
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4))
async def notify_now(notify: Notify):
if notify.platform == 'console':
bot = [b for b in nonebot.get_bots().values() if isinstance(b, ConsoleBot)]
if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
return False
bot = bot[0]
await bot.send_private_message(notify.target, f"代办通知:{notify.notify_msg}")
elif notify.platform == 'discord':
bot = [b for b in nonebot.get_bots().values() if isinstance(b, DiscordBot)]
if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
return False
bot = bot[0]
channel = await bot.create_DM(recipient_id=int(notify.target))
await bot.send_to(channel.id, f"代办通知:{notify.notify_msg}")
elif notify.platform == 'qq':
bot = [b for b in nonebot.get_bots().values() if isinstance(b, OnebotV11Bot)]
if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
return False
bot = bot[0]
if notify.target_env is None:
await bot.send_private_msg(
user_id=int(notify.target),
message=cast(Any, await UniMessage.text(f"代办通知:{notify.notify_msg}").export(
bot=bot,
)),
)
else:
await bot.send_group_msg(
group_id=int(notify.target_env),
message=cast(Any,
await UniMessage().at(
notify.target
).text(f" 代办通知:{notify.notify_msg}").export(bot=bot)
),
)
else:
logger.warning(f"提醒未成功发送出去:{notify}")
return False
return True
async def create_notify_task(notify: Notify, fail2remove: bool = True):
async def mission():
begin_time = datetime.datetime.now()
if begin_time < notify.notify_time:
await asyncio.sleep((notify.notify_time - begin_time).total_seconds())
res = await notify_now(notify)
if fail2remove or res:
await DATA_FILE_LOCK.acquire()
cfg = load_notify_config()
cfg.notifies = [n for n in cfg.notifies if n.get_str() != notify.get_str()]
if not res:
cfg.unsent.append(notify)
save_notify_config(cfg)
DATA_FILE_LOCK.release()
else:
pass
return asyncio.create_task(mission())
@evt.handle()
async def _(msg: UniMsg, mEvt: Event):
async def _(msg: UniMsg, mEvt: Event, target: DepLongTaskTarget):
if mEvt.get_user_id() in nonebot.get_bots():
return
@ -140,55 +136,26 @@ async def _(msg: UniMsg, mEvt: Event):
return
notify_time, notify_text = segments
# target_time = get_target_time(notify_time)
try:
target_time = ptimeparse.parse(notify_time)
target_time = ptimeparse.Parser().parse(notify_time)
logger.info(f"{notify_time} 解析出了时间:{target_time}")
except Exception:
logger.info(f"无法从 {notify_time} 中解析出时间")
return
# if target_time is None:
# logger.info(f"无法从 {notify_time} 中解析出时间")
# return
if not notify_text:
return
await DATA_FILE_LOCK.acquire()
cfg = load_notify_config()
if isinstance(mEvt, ConsoleMessageEvent):
platform = "console"
target = mEvt.get_user_id()
target_env = None
elif isinstance(mEvt, OnebotV11MessageEvent):
platform = "qq"
target = mEvt.get_user_id()
if isinstance(mEvt, OnebotV11GroupMessageEvent):
target_env = str(mEvt.group_id)
else:
target_env = None
elif isinstance(mEvt, DiscordMessageEvent):
platform = "discord"
target = mEvt.get_user_id()
target_env = None
else:
logger.warning(f"Notify 遇到不支持的平台:{type(mEvt).__name__}")
return
notify = Notify(
platform=platform,
target=target,
target_env=target_env,
notify_time=target_time,
notify_msg=notify_text,
await create_longtask(
LONG_TASK_NAME,
{ "message": notify_text },
target,
target_time,
)
await create_notify_task(notify)
cfg.notifies.append(notify)
save_notify_config(cfg)
DATA_FILE_LOCK.release()
await evt.send(await UniMessage().at(mEvt.get_user_id()).text(
f" 了解啦!将会在 {notify.notify_time} 提醒你哦~").export())
await target.send_message(
UniMessage().text(f"了解啦!将会在 {target_time.strftime(FMT_STRING)} 提醒你哦~")
)
logger.info(f"创建了一条于 {target_time} 的代办提醒")
driver = nonebot.get_driver()
@ -205,15 +172,158 @@ async def _():
NOTIFIED_FLAG["task_added"] = True
await asyncio.sleep(10)
DELTA = 2
logger.info(f"第一次探测到 Bot 连接,等待 {DELTA} 秒后开始通知")
await asynkio.sleep(DELTA)
await DATA_FILE_LOCK.acquire()
tasks = []
cfg = load_notify_config()
if cfg.version == 1:
logger.info("将配置文件的版本升级为 2")
cfg.version = 2
else:
for notify in cfg.notifies:
tasks.append(create_notify_task(notify, fail2remove=False))
for notify in [*cfg.notifies]:
await create_longtask(
handler=LONG_TASK_NAME,
data={ "message": notify.notify_msg },
target=get_target_from_notify(notify),
deadline=notify.notify_time,
)
cfg.notifies = []
save_notify_config(cfg)
DATA_FILE_LOCK.release()
await asyncio.gather(*tasks)
@handle_long_task("TASK_SIMPLE_NOTIFY")
async def _(task: LongTask):
message = task.data["message"]
await task.target.send_message(
UniMessage().text(f"代办提醒:{message}")
)
async with DATA_FILE_LOCK:
data = load_notify_config()
if (chan := data.notify_channels.get(task.target.target_id)) is not None:
await send_notify_to_ntfy_instance(message, chan)
save_notify_config(data)
USER_CHECKOUT_TASK_CACHE: dict[str, dict[str, str]] = {}
cmd_check_notify_list = on_alconna(Alconna(
"re:(?:我有哪些|查询)(?:提醒|代办)",
Args["page", int, 1]
))
@cmd_check_notify_list.handle()
async def _(page: int, target: DepLongTaskTarget):
if page <= 0:
await target.send_message(UniMessage().text("页数应该大于 0 吧"))
return
async with longtask_data() as data:
tasks = data.to_handle.get(LONG_TASK_NAME, {}).values()
tasks = [t for t in tasks if t.target.target_id == target.target_id]
tasks = sorted(tasks, key=lambda t: t.deadline)
pages = ceil(len(tasks) / PAGE_SIZE)
if page > pages:
await target.send_message(UniMessage().text(f"最多也就 {pages} 页啦!"))
tasks = tasks[(page - 1) * PAGE_SIZE: page * PAGE_SIZE]
message = "你可以输入「删除提醒 序号」来删除一个提醒\n====== 代办清单 ======\n\n"
to_cache = {}
if len(tasks) == 0:
message += "空空如也\n"
else:
for i, task in enumerate(tasks):
to_cache[str(i + 1)] = task.uuid
message += f"{i + 1}) {task.data['message']}{task.deadline.strftime(FMT_STRING)}\n"
message += f"\n==== 第 {page} 页,共 {pages} 页 ===="
USER_CHECKOUT_TASK_CACHE[target.target_id] = to_cache
await target.send_message(UniMessage().text(message))
cmd_remove_task = on_alconna(Alconna(
"re:删除(?:提醒|代办)",
Args["checker", str],
))
@cmd_remove_task.handle()
async def _(checker: str, target: DepLongTaskTarget):
if target.target_id not in USER_CHECKOUT_TASK_CACHE:
await target.send_message(UniMessage().text(
"先用「查询提醒」来查询你有哪些提醒吧"
))
return
if checker not in USER_CHECKOUT_TASK_CACHE[target.target_id]:
await target.send_message(UniMessage().text(
"没有这个任务哦,请检查一下吧"
))
uuid = USER_CHECKOUT_TASK_CACHE[target.target_id][checker]
async with longtask_data() as data:
if uuid not in data.to_handle[LONG_TASK_NAME]:
await target.send_message(UniMessage().text(
"似乎这个提醒已经发出去了,或者已经被删除"
))
return
_msg = data.to_handle[LONG_TASK_NAME][uuid].data["message"]
del data.to_handle[LONG_TASK_NAME][uuid]
await target.send_message(UniMessage().text(
f"成功取消了提醒:{_msg}"
))
cmd_notify_channel = on_alconna(Alconna(
"ntfy",
Subcommand("删除", dest="delete"),
Subcommand("创建", Args["notify_id?", str], dest="create"),
), rule=lambda: config.plugin_notify_enable_ntfy)
@cmd_notify_channel.assign("$main")
async def _(target: DepLongTaskTarget):
await target.send_message(UniMessage.text(
"配置 ntfy 通知:\n\n"
"- ntfy 创建: 启用 ntfy 通知,并为你随机生成一个通知渠道\n"
"- ntfy 删除:禁用 ntfy 通知\n"
))
@cmd_notify_channel.assign("create")
async def _(target: DepLongTaskTarget, notify_id: str = ""):
if notify_id == "":
notify_id = nanoid.generate(
alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz-",
size=16,
)
channel_name = f"{config.plugin_notify_prefix}{notify_id}"
async with DATA_FILE_LOCK:
data = load_notify_config()
data.notify_channels[target.target_id] = channel_name
save_notify_config(data)
await target.send_message(UniMessage.text(
f"了解!将会在 {channel_name} 为你提醒!\n"
"\n"
"食用教程:在你的手机端 / 网页端 ntfy 点击「订阅主题」,选择「使用其他服务器」,"
f"服务器填写 {config.plugin_notify_base_url} ,主题名填写 {channel_name}\n"
f"最后点击订阅,就能看到我给你发的消息啦!"
))
await send_notify_to_ntfy_instance(
"如果你看到这条消息,说明你已经成功订阅主题!此方 BOT 将会在这里提醒你你的代办!",
channel_name,
)
@cmd_notify_channel.assign("delete")
async def _(target: DepLongTaskTarget):
async with DATA_FILE_LOCK:
data = load_notify_config()
del data.notify_channels[target.target_id]
save_notify_config(data)
await target.send_message(UniMessage.text("ok."))

588
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -2,28 +2,29 @@
name = "konabot"
version = "0.1.0"
description = "在 MTTU 内部使用的 bot"
authors = [
{name = "passthem",email = "Passthem183@gmail.com"}
]
authors = [{ name = "passthem", email = "Passthem183@gmail.com" }]
readme = "README.md"
requires-python = ">=3.12,<4.0"
dependencies = [
"nonebot2[all] (>=2.4.3,<3.0.0)",
"nonebot-adapter-onebot (>=2.4.6,<3.0.0)",
"nonebot-adapter-console (>=0.9.0,<0.10.0)",
"nonebot-adapter-discord (>=0.1.8,<0.2.0)",
"nonebot-adapter-minecraft (>=1.5.2,<2.0.0)",
"nonebot-plugin-alconna (>=0.59.4,<0.60.0)",
"nonebot-plugin-apscheduler (>=0.5.0,<0.6.0)",
"requests (>=2.32.5,<3.0.0)",
"beautifulsoup4 (>=4.13.5,<5.0.0)",
"lxml (>=6.0.2,<7.0.0)",
"pillow (>=11.3.0,<12.0.0)",
"imagetext-py (>=2.2.0,<3.0.0)",
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
"returns (>=0.26.0,<0.27.0)",
"ptimeparse (>=0.1.1,<0.2.0)",
"skia-python (>=138.0,<139.0)",
"nonebot2[all] (>=2.4.3,<3.0.0)",
"nonebot-adapter-onebot (>=2.4.6,<3.0.0)",
"nonebot-adapter-console (>=0.9.0,<0.10.0)",
"nonebot-adapter-discord (>=0.1.8,<0.2.0)",
"nonebot-adapter-minecraft (>=1.5.2,<2.0.0)",
"nonebot-plugin-alconna (>=0.59.4,<0.60.0)",
"nonebot-plugin-apscheduler (>=0.5.0,<0.6.0)",
"requests (>=2.32.5,<3.0.0)",
"beautifulsoup4 (>=4.13.5,<5.0.0)",
"lxml (>=6.0.2,<7.0.0)",
"pillow (>=11.3.0,<12.0.0)",
"imagetext-py (>=2.2.0,<3.0.0)",
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
"returns (>=0.26.0,<0.27.0)",
"skia-python (>=138.0,<139.0)",
"nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)",
"qrcode (>=8.2,<9.0)",
"ptimeparse (>=0.2.1,<0.3.0)",
"nanoid (>=2.0.0,<3.0.0)",
]
[build-system]
@ -35,5 +36,9 @@ name = "pt-gitea-pypi"
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple/"
priority = "supplemental"
[[tool.poetry.source]]
name = "mirrors"
url = "https://pypi.tuna.tsinghua.edu.cn/simple/"
priority = "primary"
[tool.poetry.dependencies]
ptimeparse = {source = "pt-gitea-pypi"}