Compare commits

...

53 Commits

Author SHA1 Message Date
18785f034b 调整依赖,不再在运行时安装依赖
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 21:50:16 +08:00
7ba1a92623 解决依赖问题,容器体积什么的以后再修
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-24 20:40:05 +08:00
f6670eb672 先推上去看看有缺什么依赖
Some checks failed
continuous-integration/drone/push Build is failing
2025-10-24 20:18:42 +08:00
eb32c1af9a new
Some checks failed
continuous-integration/drone/push Build is failing
2025-10-24 19:39:06 +08:00
e0c55545ec 添加此方提醒的 CURD 和 ntfy 联动
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 05:08:54 +08:00
164305e81f 调整 man 2025-10-24 02:27:56 +08:00
96679033f3 不再有 fortune
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 02:21:56 +08:00
afda0680ec 调整衰减函数
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:59:41 +08:00
021133954e 调整 man 默认范围
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:33:59 +08:00
7baa04dbc2 添加罗文提示 2025-10-24 01:33:01 +08:00
e55bdbdf4a 怪话不可为空!!!
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:27:40 +08:00
a30c7b8093 添加怪话过滤功能
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-24 01:21:54 +08:00
3da2c2266f 说怪话 bot
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-24 01:08:46 +08:00
96e3c3fe17 让 Onebot private channel 也有 ID 2025-10-24 00:46:05 +08:00
851c9eb3c7 修复程序退出耗时太久的问题 2025-10-24 00:01:13 +08:00
11269b2a5a 在罗文被念错时提醒他 2025-10-23 23:32:56 +08:00
875e0efc2f Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-23 22:12:41 +08:00
4f43312663 升级 ptimeparse 2025-10-23 22:12:34 +08:00
b2f4768573 优化判定与计分规则
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-21 23:52:05 +08:00
bc6263ec31 Merge pull request '添加安安展示' (#27) from tnot/konabot:添加安安展示 into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #27
2025-10-21 22:07:19 +08:00
bc9d025836 修好了bug的安安展示 2025-10-21 22:02:41 +08:00
b552aacf89 添加安安展示 2025-10-21 21:33:32 +08:00
f9a0249772 优化 giftool 的截取逻辑
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-21 18:31:14 +08:00
c94db33b11 更新 ptimeparse 到 0.2.0
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 22:45:34 +08:00
67382a0c0a 在我写的模块采用更安全的 asyncio 锁写法
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 20:27:18 +08:00
fd4c9302c2 async with lock
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 20:24:47 +08:00
f30ad0cb7d 判定部分优化
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 18:48:10 +08:00
f7afe48680 精度修复
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 18:36:27 +08:00
b42385f780 修复成语接龙
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 18:24:03 +08:00
6cae38dea9 提升 LongTask 的健壮性
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 16:54:59 +08:00
8594b59783 修复 LongTask 在 Discord 和控制台无法正确返回是否顺利完成任务的问题 2025-10-19 16:51:22 +08:00
f768c91430 完善 LongTask 模块
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-19 16:47:50 +08:00
a65cb118cc 接入我写的模块来获得群上下文
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 04:51:49 +08:00
75c6bbd23f Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot 2025-10-19 04:45:26 +08:00
aaf0a75d65 添加若干有用的小模块 2025-10-19 04:45:15 +08:00
8f560ce1ba 新成语接龙
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 01:25:34 +08:00
9f3f79f51d 自动同意小团体的好友请求
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-17 01:04:34 +08:00
92048aeff7 让 wzq 东西在 wzq 群不可用 2025-10-17 00:54:14 +08:00
81aac10665 添加文档并修复问题
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-16 23:27:42 +08:00
3ce230adfe 优化卵总展示光影
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-16 22:43:54 +08:00
4f885554ca 添加卵总展示
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-16 22:29:07 +08:00
7ebcb8add4 Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-16 19:13:51 +08:00
e18cc82792 修复 av/bv 号无法直接被筛选读取的问题 2025-10-16 19:13:36 +08:00
eb28cd0a0c 更正 Giftool 错误的文档 2025-10-16 18:44:22 +08:00
2d688a6ed6 new
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-14 12:43:25 +08:00
e9aac52200 chengyu update 2025-10-14 01:23:49 +00:00
4305548ab5 submodule
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 22:53:44 +08:00
99382a3bf5 Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-13 22:48:17 +08:00
92e43785bf submodule 2025-10-13 22:46:30 +08:00
fc5b11c5e8 调整 notify 的强制退出
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-13 22:16:50 +08:00
0ec66988fa 更新投票存储位置
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-13 22:05:21 +08:00
e5c3081c22 Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot
All checks were successful
continuous-integration/drone/push Build is passing
2025-10-13 22:02:44 +08:00
14b356120a 成语接龙 2025-10-13 22:02:33 +08:00
39 changed files with 363262 additions and 475 deletions

View File

@ -10,6 +10,10 @@ trigger:
- master
steps:
- name: submodules
image: alpine/git
commands:
- git submodule update --init --recursive
- name: 构建 Docker 镜像
image: plugins/docker:latest
privileged: true
@ -50,6 +54,10 @@ trigger:
- tag
steps:
- name: submodules
image: alpine/git
commands:
- git submodule update --init --recursive
- name: 构建并推送 Release Docker 镜像
image: plugins/docker:latest
privileged: true

View File

@ -1,22 +1,31 @@
# copied from https://www.martinrichards.me/post/python_poetry_docker/
FROM python:3.13-slim AS base
ENV VIRTUAL_ENV=/app/.venv \
PATH="/app/.venv/bin:$PATH"
PATH="/app/.venv/bin:$PATH" \
PLAYWRIGHT_BROWSERS_PATH=0
# 安装所有都需要的底层依赖
RUN apt-get update && \
apt-get install -y --no-install-recommends \
libfontconfig1 \
libgl1 \
libegl1 \
libglvnd0 \
mesa-vulkan-drivers \
libfontconfig1 libgl1 libegl1 libglvnd0 mesa-vulkan-drivers at-spi2-common fontconfig \
libasound2-data libavahi-client3 libavahi-common-data libavahi-common3 libdatrie1 \
libfontenc1 libfribidi0 libgraphite2-3 libharfbuzz0b libice6 libpixman-1-0 \
libsm6 libthai-data libthai0 libunwind8 libxaw7 libxcb-render0 libxfont2 libxi6 \
libxkbfile1 libxmu6 libxpm4 libxrender1 libxt6t64 x11-common x11-xkb-utils \
xfonts-encodings xfonts-utils xkb-data xserver-common libnspr4 libatk1.0-0t64 \
libatk-bridge2.0-0t64 libatspi2.0-0t64 libxcomposite1 libxdamage1 libxfixes3 \
libxkbcommon0 libasound2t64 libnss3 \
&& rm -rf /var/lib/apt/lists/*
FROM base AS builder
# 安装构建依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential cmake git \
&& rm -rf /var/lib/apt/lists/*
ENV POETRY_NO_INTERACTION=1 \
POETRY_VIRTUALENVS_IN_PROJECT=1 \
POETRY_VIRTUALENVS_CREATE=1 \
@ -24,7 +33,7 @@ ENV POETRY_NO_INTERACTION=1 \
WORKDIR /app
RUN pip install poetry
RUN pip install --no-cache-dir poetry
COPY pyproject.toml poetry.lock ./
RUN python -m poetry install --no-root && rm -rf $POETRY_CACHE_DIR
@ -37,6 +46,8 @@ COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
WORKDIR /app
RUN python -m playwright install chromium
COPY bot.py pyproject.toml .env.prod .env.test ./
COPY assets ./assets
COPY scripts ./scripts

BIN
assets/img/dog/haha_dog.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 841 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 821 KiB

BIN
assets/img/meme/snaur_1_base.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

BIN
assets/img/meme/snaur_1_top.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1008 KiB

360393
assets/lexicon/common.txt Normal file

File diff suppressed because it is too large Load Diff

6
bot.py
View File

@ -20,9 +20,13 @@ env_enable_minecraft = os.environ.get("ENABLE_MINECRAFT", "none")
def main():
if env.upper() == 'DEBUG' or env.upper() == 'DEV':
console_log_level = 'DEBUG'
else:
console_log_level = 'INFO'
init_logger(LOG_PATH, [
BotExceptionMessage,
])
], console_log_level=console_log_level)
nonebot.init()

View File

@ -0,0 +1,36 @@
import asyncio
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Generic, TypeVar
from pydantic import BaseModel, ValidationError
T = TypeVar("T", bound=BaseModel)
class DataManager(Generic[T]):
def __init__(self, cls: type[T], fp: Path) -> None:
self.cls = cls
self.fp = fp
self._aio_lock = asyncio.Lock()
self._data: T | None = None
def load(self) -> T:
if not self.fp.exists():
return self.cls()
try:
return self.cls.model_validate_json(self.fp.read_text())
except ValidationError:
return self.cls()
def save(self, data: T):
self.fp.write_text(data.model_dump_json())
@asynccontextmanager
async def get_data(self):
await self._aio_lock.acquire()
self._data = self.load()
yield self._data
self.save(self._data)
self._data = None
self._aio_lock.release()

View File

@ -18,7 +18,7 @@ def file_exception_filter(
否则,返回 True允许记录
"""
exception_info = record.get("exception")
if exception_info:
exception_type = exception_info[0]
@ -29,8 +29,9 @@ def file_exception_filter(
def init_logger(
log_dir: Path,
ignored_exceptions: List[Type[Exception]]
log_dir: Path,
ignored_exceptions: List[Type[Exception]],
console_log_level: str = "INFO",
) -> None:
"""
配置全局 Loguru Logger。
@ -47,7 +48,7 @@ def init_logger(
logger.add(
sys.stderr,
level="INFO",
level=console_log_level,
colorize=True,
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
)
@ -76,4 +77,4 @@ def init_logger(
)
logger.info("Loguru Logger 初始化完成!")
logger.info(f"控制台日志级别: INFO")
logger.info(f"控制台日志级别: {console_log_level}")

295
konabot/common/longtask.py Normal file
View File

@ -0,0 +1,295 @@
from __future__ import annotations
from contextlib import asynccontextmanager
import datetime
import json
from typing import Annotated, Any, Callable, Coroutine, cast
import asyncio as asynkio
import uuid
from loguru import logger
import nonebot
from nonebot.params import Depends
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters import Bot as BaseBot
from nonebot.adapters.onebot.v11 import Bot as OBBot
from nonebot.adapters.onebot.v11 import GroupMessageEvent as OBGroupMessageEvent
from nonebot.adapters.onebot.v11 import PrivateMessageEvent as OBPrivateMessageEvent
from nonebot.adapters.console import Bot as ConsoleBot
from nonebot.adapters.console import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord import MessageEvent as DCMessageEvent
from nonebot.adapters.discord import Bot as DCBot
from nonebot_plugin_alconna import UniMessage
from pydantic import BaseModel, ValidationError
from .path import DATA_PATH
LONGTASK_DATA_DIR = DATA_PATH / "longtasks.json"
QQ_PRIVATE_CHAT_CHANNEL_PREFIX = "_CHANNEL_QQ_PRIVATE_"
class LongTaskTarget(BaseModel):
"""
用于定义长期任务的目标沟通对象,一般通过 DepLongTaskTarget 依赖注入获取:
```python
@cmd.handle()
async def _(target: DepLongTaskTarget):
...
```
"""
platform: str
"沟通对象所在的平台"
self_id: str
"进行沟通的对象自己的 ID"
channel_id: str
"沟通对象所在的群或者 Discord Channel。若为空则代表是私聊"
target_id: str
"沟通对象的 ID"
async def send_message(self, msg: UniMessage, at: bool = True) -> bool:
try:
bot = nonebot.get_bot(self.self_id)
except KeyError:
logger.warning(f"试图访问了不存在的 Bot。ID={self.self_id}")
return False
if self.platform == "qq":
if not isinstance(bot, OBBot):
logger.warning(
f"编号对应的平台并非期望的平台 ID={self.self_id} PLATFORM={
self.platform
} BOT_CLASS={bot.__class__.__name__}"
)
return False
if self.channel_id.startswith(QQ_PRIVATE_CHAT_CHANNEL_PREFIX) or not self.channel_id.strip():
# 私聊模式
await bot.send_private_msg(
user_id=int(self.target_id),
message=cast(Any, await msg.export(bot)),
auto_escape=False,
)
return True
else:
if at:
msg = UniMessage().at(self.target_id).text(" ") + msg
await bot.send_group_msg(
group_id=int(self.channel_id),
message=cast(Any, await msg.export(bot)),
auto_escape=False,
)
return True
if self.platform == "console":
if not isinstance(bot, ConsoleBot):
logger.warning(
f"编号对应的平台并非期望的平台 ID={self.self_id} PLATFORM={
self.platform
} BOT_CLASS={bot.__class__.__name__}"
)
return False
await bot.send_message(self.channel_id, cast(Any, await msg.export()))
return True
if self.platform == "discord":
if not isinstance(bot, DCBot):
logger.warning(
f"编号对应的平台并非期望的平台 ID={self.self_id} PLATFORM={
self.platform
} BOT_CLASS={bot.__class__.__name__}"
)
return False
await bot.send_to(
channel_id=int(self.channel_id),
message=cast(
Any, await (UniMessage().at(self.target_id) + msg).export()
),
tts=False,
)
return True
logger.warning(f"没有一个平台是期望的平台 PLATFORM={self.platform}")
return False
class LongTask(BaseModel):
uuid: str
data_json: str
target: LongTaskTarget
callback: str
deadline: datetime.datetime
_aio_task: asynkio.Task | None = None
async def run(self):
now = datetime.datetime.now()
if self.deadline < now:
await self._run_task()
return
await asynkio.sleep((self.deadline - now).total_seconds())
async with longtask_data() as data:
if self.uuid not in data.to_handle[self.callback]:
return
await self._run_task()
async def _run_task(self):
hdl = registered_long_task_handler.get(self.callback, None)
if hdl is None:
logger.warning(
f"Callback {self.callback} 未曾被注册,但是被期待调用,已忽略"
)
async with longtask_data() as datafile:
del datafile.to_handle[self.callback][self.uuid]
datafile.unhandled.setdefault(self.callback, []).append(self)
return
success = False
try:
await hdl(self)
success = True
except Exception as e:
logger.exception(e)
async with longtask_data() as datafile:
del datafile.to_handle[self.callback][self.uuid]
if not success:
datafile.unhandled.setdefault(self.callback, []).append(self)
logger.info(
f"LongTask 执行失败 UUID={self.uuid} callback={self.callback}"
)
else:
logger.info(
f"LongTask 工作完成 UUID={self.uuid} callback={self.callback}"
)
def clean(self):
self._aio_task = None
@property
def data(self):
return json.loads(self.data_json)
async def start(self):
self._aio_task = asynkio.Task(self.run())
self._aio_task.add_done_callback(lambda _: self.clean())
class LongTaskModuleData(BaseModel):
to_handle: dict[str, dict[str, LongTask]]
unhandled: dict[str, list[LongTask]]
async def get_long_task_target(event: BaseEvent, bot: BaseBot) -> LongTaskTarget | None:
if isinstance(event, OBGroupMessageEvent):
return LongTaskTarget(
platform="qq",
self_id=str(event.self_id),
channel_id=str(event.group_id),
target_id=str(event.user_id),
)
if isinstance(event, OBPrivateMessageEvent):
return LongTaskTarget(
platform="qq",
self_id=str(event.self_id),
channel_id=f"{QQ_PRIVATE_CHAT_CHANNEL_PREFIX}{event.self_id}",
target_id=str(event.user_id),
)
if isinstance(event, ConsoleMessageEvent):
return LongTaskTarget(
platform="console",
self_id=str(event.self_id),
channel_id=str(event.channel.id),
target_id=str(event.user.id),
)
if isinstance(event, DCMessageEvent):
self_id = ""
if isinstance(bot, DCBot):
self_id = str(bot.self_id)
return LongTaskTarget(
platform="discord",
self_id=self_id,
channel_id=str(event.channel_id),
target_id=str(event.user_id),
)
_TaskHandler = Callable[[LongTask], Coroutine[Any, Any, Any]]
registered_long_task_handler: dict[str, _TaskHandler] = {}
longtask_lock = asynkio.Lock()
def handle_long_task(callback_id: str):
def _decorator(func: _TaskHandler):
assert callback_id not in registered_long_task_handler, (
"有长任务的 ID 出现冲突,请换个名字!"
)
registered_long_task_handler[callback_id] = func
return func
return _decorator
def _load_longtask_data() -> LongTaskModuleData:
try:
txt = LONGTASK_DATA_DIR.read_text()
return LongTaskModuleData.model_validate_json(txt)
except (FileNotFoundError, ValidationError) as e:
logger.info(f"取得 LongTask 数据时出现问题:{e}")
return LongTaskModuleData(
to_handle={},
unhandled={},
)
def _save_longtask_data(data: LongTaskModuleData):
LONGTASK_DATA_DIR.write_text(data.model_dump_json())
@asynccontextmanager
async def longtask_data():
async with longtask_lock:
data = _load_longtask_data()
yield data
_save_longtask_data(data)
async def create_longtask(
handler: str,
data: dict[str, Any],
target: LongTaskTarget,
deadline: datetime.datetime,
):
task = LongTask(
uuid=str(uuid.uuid4()),
data_json=json.dumps(data),
target=target,
callback=handler,
deadline=deadline,
)
logger.info(f"创建了新的 LongTask UUID={task.uuid} CALLBACK={task.callback}")
await task.start()
async with longtask_data() as d:
d.to_handle.setdefault(handler, {})[task.uuid] = task
return task
async def init_longtask():
counter = 0
req: set[str] = set()
async with longtask_data() as data:
for v in data.to_handle.values():
for t in v.values():
await t.start()
counter += 1
req.add(t.callback)
logger.info(f"LongTask 启动了任务 数量={counter} 期望的门类=[{','.join(req)}]")
DepLongTaskTarget = Annotated[LongTaskTarget, Depends(get_long_task_target)]

View File

@ -0,0 +1,34 @@
from typing import cast
from nonebot import get_bot, get_plugin_config, logger
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
from nonebot.rule import Rule
from pydantic import BaseModel
class WZQConflictConfig(BaseModel):
wzq_bot_qq: int = 0
config = get_plugin_config(WZQConflictConfig)
async def no_wzqbot(evt: BaseEvent):
if config.wzq_bot_qq <= 0:
return True
if not isinstance(evt, GroupMessageEvent):
return True
gid = evt.group_id
sid = evt.self_id
bot = cast(OnebotBot, get_bot(str(sid)))
members = await bot.get_group_member_list(group_id=gid)
members = set((m.get("user_id", -1) for m in members))
if config.wzq_bot_qq in members:
return False
return True
no_wzqbot_rule = Rule(no_wzqbot)

View File

@ -12,3 +12,10 @@ DOCS_PATH_MAN1 = DOCS_PATH / "user"
DOCS_PATH_MAN3 = DOCS_PATH / "lib"
DOCS_PATH_MAN7 = DOCS_PATH / "concepts"
DOCS_PATH_MAN8 = DOCS_PATH / "sys"
if not DATA_PATH.exists():
DATA_PATH.mkdir()
if not LOG_PATH.exists():
LOG_PATH.mkdir()

View File

@ -0,0 +1,86 @@
import asyncio
import queue
from loguru import logger
from playwright.async_api import async_playwright, Browser
class WebRenderer:
browser_pool: queue.Queue["WebRendererInstance"] = queue.Queue()
@classmethod
async def render(cls, url: str, target: str, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
'''
访问指定URL并返回截图
:param url: 目标URL
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
'''
logger.debug(f"Requesting render for {url} targeting {target} with timeout {timeout}")
if cls.browser_pool.empty():
instance = await WebRendererInstance.create()
cls.browser_pool.put(instance)
instance = cls.browser_pool.get()
cls.browser_pool.put(instance)
logger.debug(f"Using WebRendererInstance {id(instance)} to render {url} targeting {target}")
return await instance.render(url, target, params=params, other_function=other_function, timeout=timeout)
class WebRendererInstance:
def __init__(self):
self.playwright = None
self.browser: Browser = None
self.lock: asyncio.Lock = None
@classmethod
async def create(cls) -> "WebRendererInstance":
instance = cls()
instance.playwright = await async_playwright().start()
instance.browser = await instance.playwright.chromium.launch(headless=True)
instance.lock = asyncio.Lock()
return instance
async def render(self, url: str, target: str, index: int = 0, params: dict = {}, other_function: callable = None, timeout: int = 30) -> bytes:
'''
访问指定URL并返回截图
:param url: 目标URL
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param index: 如果目标是一个列表,指定要截图的元素索引
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
'''
async with self.lock:
context = await self.browser.new_context()
page = await context.new_page()
logger.debug(f"Navigating to {url} with timeout {timeout}")
try:
url_with_params = url + ("?" + "&".join(f"{k}={v}" for k, v in params.items()) if params else "")
await page.goto(url_with_params, timeout=timeout * 1000, wait_until="load")
logger.debug(f"Page loaded successfully")
# 等待目标元素出现
await page.wait_for_selector(target, timeout=timeout * 1000)
logger.debug(f"Target element '{target}' found, taking screenshot")
if other_function:
await other_function(page)
elements = await page.query_selector_all(target)
if not elements:
raise Exception(f"Target element '{target}' not found on the page.")
if index >= len(elements):
raise Exception(f"Index {index} out of range for elements matching '{target}'.")
element = elements[index]
screenshot = await element.screenshot()
logger.debug(f"Screenshot taken successfully")
return screenshot
finally:
await page.close()
await context.close()
async def close(self):
await self.browser.close()
await self.playwright.stop()

View File

@ -0,0 +1,2 @@
关于罗文和洛温:
AdoreLowen 希望和洛温阿特金森区分,所以最好就不要叫他洛温了!此方 BOT 会在一些群提醒叫错了的人。

View File

@ -45,7 +45,7 @@
- 帧数必须为正整数(> 0
- 若原始帧数 ≤ 指定帧数,则保留全部帧。
--s <速度>(可选)
--speed <速度>(可选)
- 调整 gif 图的速度。若为负数,则代表倒放
使用方式

View File

@ -0,0 +1,15 @@
指令介绍
ntfy - 配置使用 ntfy 来更好地为你通知此方 BOT 代办
指令示例
`ntfy 创建`
创建一个随机的 ntfy 订阅主题来提醒代办,此方 Bot 将会给你使用指引。你可以前往 https://ntfy.sh/ 官网下载 ntfy APP或者使用网页版 ntfy。
`ntfy 创建 kagami-notice`
创建一个名字含有 kagami-notice 的 ntfy 订阅主题
`ntfy 删除`
清除并不再使用 ntfy 向你通知
另见
提醒我(1) 查询提醒(1) 删除提醒(1)

View File

@ -0,0 +1,8 @@
指令介绍
删除提醒 - 删除在`查询提醒(1)`中查到的提醒
指令示例
`删除提醒 1` 在查询提醒后,删除编号为 1 的提醒
另见
提醒我(1) 查询提醒(1) ntfy(1)

View File

@ -0,0 +1,20 @@
指令介绍
卵总展示 - 让卵总举起你的图片
格式
<引用图片> 卵总展示 [选项]
卵总展示 [选项] <图片>
选项
`--whiteness <number>` 白度
将原图进行指数变换,以调整它的白的程度,默认为 0.0
`--black-level <number>` 黑色等级
将原图减淡,数值越大越淡,范围 0.0-1.0,默认 0.2
`--opacity <number>` 不透明度
将你的图片叠放在图片上的不透明度,默认为 0.8
`--saturation <number>` 饱和度
调整原图的饱和度,应该要大于 0.0,默认为 0.85

View File

@ -0,0 +1,15 @@
指令介绍
提醒我 - 在指定的时间提醒人事项的工具
使用示例
`下午五点提醒我吃饭`
创建一个下午五点的提醒,提醒你吃饭
`两分钟后提醒我睡觉`
创建一个相对于现在推迟 2 分钟的提醒,提醒你睡觉
`2026年4月25日20点整提醒我生日快乐`
创建一个指定日期和时间的提醒
另见
查询提醒(1) 删除提醒(1) ntfy(1)

View File

@ -0,0 +1,9 @@
指令介绍
查询提醒 - 查询已经创建的提醒
指令格式
`查询提醒` 查询提醒
`查询提醒 2` 查询第二页提醒
另见
提醒我(1) 删除提醒(1) ntfy(1)

View File

@ -0,0 +1,25 @@
import asyncio
import random
from typing import cast
from loguru import logger
from nonebot import get_bot, on_request
from nonebot.adapters.onebot.v11.event import FriendRequestEvent
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
from konabot.common.nb.is_admin import cfg as adminConfig
add_request = on_request()
@add_request.handle()
async def _(req: FriendRequestEvent):
bot = cast(OnebotBot, get_bot(str(req.self_id)))
ok_member_ls: set[int] = set()
for group in adminConfig.admin_qq_group:
members = await bot.get_group_member_list(group_id=group)
ok_member_ls |= cast(set[int], set((m.get("user_id") for m in members)))
if req.user_id in ok_member_ls:
await asyncio.sleep(random.randint(5, 10))
await req.approve(bot)
logger.info(f"已经自动同意 {req.user_id} 的好友请求")

View File

@ -0,0 +1,39 @@
import re
from nonebot import on_message
from nonebot_plugin_alconna import Reference, Reply, UniMsg
from nonebot.adapters import Event
matcher_fix = on_message()
pattern = (
r"^(?:(?:av|cv)\d+|BV[a-zA-Z0-9]{10})|"
r"(?:b23\.tv|bili(?:22|23|33|2233)\.cn|\.bilibili\.com|QQ小程序(?:&amp;#93;|&#93;|\])哔哩哔哩).{0,500}"
)
@matcher_fix.handle()
async def _(msg: UniMsg, event: Event):
to_search = msg.exclude(Reply, Reference).dump(json=True)
to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
return
from nonebot_plugin_analysis_bilibili import handle_analysis
await handle_analysis(event)
# b_url: str
# b_page: str | None
# b_time: str | None
#
# from nonebot_plugin_analysis_bilibili.analysis_bilibili import extract as bilibili_extract
#
# b_url, b_page, b_time = bilibili_extract(to_search)
# if b_url is None:
# return
#
# await matcher_fix.send(await UniMessage().text(b_url).export())

View File

@ -1,25 +0,0 @@
import re
from loguru import logger
from nonebot import on_message
from nonebot_plugin_alconna import Reference, Reply, UniMsg
from nonebot.adapters import Event
matcher_fix = on_message()
pattern = (
r"^(?:(?:av|cv)\d+|BV[a-zA-Z0-9]{10})|"
r"(?:b23\.tv|bili(?:22|23|33|2233)\.cn|\.bilibili\.com|QQ小程序(?:&amp;#93;|&#93;|\])哔哩哔哩).{0,500}"
)
@matcher_fix.handle()
async def _(msg: UniMsg, event: Event):
to_search = msg.exclude(Reply, Reference).dump(json=True)
if not re.search(pattern, to_search):
return
logger.info("检测到有 Bilibili 相关的消息,直接进行一个调用")
_module = __import__("nonebot_plugin_analysis_bilibili")
await _module.handle_analysis(event)

View File

@ -8,6 +8,8 @@ from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
on_alconna)
from nonebot_plugin_alconna.uniseg import UniMsg, At, Reply
from konabot.common.nb.wzq_conflict import no_wzqbot_rule
async def download_img(url):
resp = requests.get(url.replace("https://multimedia.nt.qq","http://multimedia.nt.qq")) # bim获取QQ的图片时避免SSLv3报错
img_bytes = BytesIO()
@ -42,7 +44,7 @@ gqrc = on_alconna(Alconna(
missing_tips=lambda: "请输入你要转换为二维码的文字!"
)],
# UniMessage[]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"生成二维码","genqrcode"})
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"生成二维码","genqrcode"}, rule=no_wzqbot_rule)
@gqrc.handle()
async def _(saying: list):
@ -66,4 +68,4 @@ async def _(saying: list):
else:
"""
# genqr("\n".join(saying))
await gqrc.send(await UniMessage().image(raw=genqr("\n".join(saying))).export())
await gqrc.send(await UniMessage().image(raw=genqr("\n".join(saying))).export())

View File

@ -0,0 +1,217 @@
import random
from typing import Optional
import opencc
from nonebot import on_message
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import (
Alconna,
Args,
UniMessage,
UniMsg,
on_alconna,
)
convert_type = ["","","","","",""]
compiled_str = "|".join([f"{a}{mid}{b}" for mid in ["","",""] for a in convert_type for b in convert_type if a != b])
def hanzi_to_abbr(hanzi: str) -> str:
mapping = {
"": "s",
"": "s",
"": "t",
"": "t",
"": "hk",
"": "jp",
}
return mapping.get(hanzi, "")
def check_valid_convert_type(convert_type: str) -> bool:
avaliable_set = ["s2t","t2s","s2tw","tw2s","s2hk","hk2s","s2twp","tw2sp","t2tw","hk2t","t2hk","t2jp","jp2t","tw2t"]
if convert_type in avaliable_set:
return True
return False
def convert(source, src_abbr, dst_abbr):
convert_type_key = f"{src_abbr}2{dst_abbr}"
if not check_valid_convert_type(convert_type_key):
# 先转为繁体,再转为目标
converter = opencc.OpenCC(f"{src_abbr}2t.json")
source = converter.convert(source)
src_abbr = "t"
converter = opencc.OpenCC(f"{src_abbr}2{dst_abbr}.json")
converted = converter.convert(source)
return converted
evt = on_alconna(
Alconna(
f"re:({compiled_str})",
Args["source?", str],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(msg: UniMsg, event: BaseEvent, source: Optional[str] = None):
if isinstance(event, DiscordMessageEvent):
content = event.get_message().extract_plain_text()
else:
content = event.get_message().extract_plain_text()
prefix = content.split()[0]
to_convert = ""
# 如果回复了消息,则转换回复的内容
if(source is None):
if event.reply:
to_convert = event.reply.message.extract_plain_text()
if not to_convert:
return
else:
return
else:
to_convert = source
parts = []
if "" in prefix:
parts = prefix.split("")
elif "" in prefix:
parts = prefix.split("")
elif "" in prefix:
parts = prefix.split("")
if len(parts) != 2:
notice = "转换格式错误,请使用“简转繁”、“繁转简”等格式。"
await evt.send(await UniMessage().text(notice).export())
return
src, dst = parts
src_abbr = hanzi_to_abbr(src)
dst_abbr = hanzi_to_abbr(dst)
if not src_abbr or not dst_abbr:
notice = "不支持的转换类型,请使用“简”、“繁”、“正”、“港”、“日”等。"
if src_abbr:
notice = convert(notice, "s", src_abbr)
await evt.send(await UniMessage().text(notice).export())
return
converted = convert(to_convert, src_abbr, dst_abbr)
converted_prefix = convert("转换结果", "s", dst_abbr)
await evt.send(await UniMessage().text(f"{converted_prefix}{converted}").export())
shuo = ["",""]
full_name_type = ["简体","簡體","繁體","繁体","正體","正体","港話","港话","日文"]
combined_list = [f"{a}{b}" for a in shuo for b in full_name_type]
compiled_str_2 = "|".join(combined_list)
evt = on_alconna(
Alconna(
f"re:({compiled_str_2})",
Args["source?", str]
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(msg: UniMsg, event: BaseEvent, source: Optional[str] = None):
if isinstance(event, DiscordMessageEvent):
content = event.get_message().extract_plain_text()
else:
content = event.get_message().extract_plain_text()
prefix = content.split()[0]
to_convert = ""
# 如果回复了消息,则转换回复的内容
if(source is None):
if event.reply:
to_convert = event.reply.message.extract_plain_text()
if not to_convert:
return
else:
return
else:
to_convert = source
# 获取目标转换类型
dst = ""
match prefix:
case "说简体" | "說簡體" | "说簡體" | "說简体":
dst = ""
case "說繁體" | "说繁体" | "說繁体" | "说繁體":
dst = ""
case "說正體" | "说正体" | "說正体" | "说正體":
dst = ""
case "說港話" | "说港话" | "說港话" | "说港話":
dst = ""
case "說日文" | "说日文":
dst = ""
dst_abbr = hanzi_to_abbr(dst)
if not dst_abbr:
notice = "不支持的转换类型,请使用“简体”、“繁體”、“正體”、“港話”、“日文”等。"
await evt.send(await UniMessage().text(notice).export())
return
# 循环,将源语言一次次转换为目标语言
current_text = to_convert
for src_abbr in ["s","hk","jp","tw","t"]:
if src_abbr != dst_abbr:
current_text = convert(current_text, src_abbr, dst_abbr)
converted_prefix = convert("转换结果", "s", dst_abbr)
await evt.send(await UniMessage().text(f"{converted_prefix}{current_text}").export())
def random_char(char: str) -> str:
dst_abbr = random.choice(["s","t","hk","jp","tw"])
for src_abbr in ["s","hk","jp","tw","t"]:
if src_abbr != dst_abbr:
char = convert(char, src_abbr, dst_abbr)
return char
def random_string(text: str) -> str:
final_text = ""
for char in text:
final_text += random_char(char)
return final_text
random_match = ["混乱字形","混亂字形","乱数字形","亂數字形","ランダム字形"]
evt = on_alconna(
Alconna(
f"re:({'|'.join(random_match)})",
Args["source?", str]
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(msg: UniMsg, event: BaseEvent, source: Optional[str] = None):
if isinstance(event, DiscordMessageEvent):
content = event.get_message().extract_plain_text()
else:
content = event.get_message().extract_plain_text()
prefix = content.split()[0]
to_convert = ""
# 如果回复了消息,则转换回复的内容
if(source is None):
if event.reply:
to_convert = event.reply.message.extract_plain_text()
if not to_convert:
return
else:
return
else:
to_convert = source
final_text = ""
final_text = random_string(to_convert)
converted_prefix = convert(random_string("转换结果"), "s", "s")
await evt.send(await UniMessage().text(f"{converted_prefix}{final_text}").export())

View File

@ -1,169 +1,565 @@
import base64
import secrets
import asyncio as asynkio
import datetime
import json
from typing import Literal
import secrets
from enum import Enum
from pathlib import Path
from typing import Optional
from loguru import logger
from nonebot import on_message
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import (Alconna, Args, Field, Subcommand,
UniMessage, UniMsg, on_alconna)
from nonebot_plugin_alconna import (
Alconna,
Args,
UniMessage,
UniMsg,
on_alconna,
)
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语
IDIOM_FIRST_CHAR = {} # 成语首字字典
INITED = False
def init_lexicon():
global ALL_WORDS, ALL_IDIOMS, IDIOM_FIRST_CHAR
# 成语大表
with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f:
ALL_IDIOMS_INFOS = json.load(f)
# 词语大表
with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f:
ALL_WORDS = json.load(f)
# 读取 THUOCL 成语库
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8") as f:
THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f]
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
THUOCL_WORDS = []
import os
for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"):
if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt":
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8") as f:
for line in f:
word = line.lstrip().split(" ")[0].strip()
if len(word) == 4:
THUOCL_WORDS.append(word)
DATA_FILE_PATH = (
Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json"
)
# 只有成语的大表
ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
ALL_IDIOMS = list(set(ALL_IDIOMS)) # 去重
def load_banned_ids() -> list[str]:
if not DATA_FILE_PATH.exists():
return []
try:
return json.loads(DATA_FILE_PATH.read_text())
except Exception as e:
logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}")
return []
# 其他四字词语表,仅表示可以有这个词
ALL_WORDS = [word for word in ALL_WORDS if len(word) == 4] + THUOCL_WORDS
ALL_WORDS = list(set(ALL_WORDS)) # 去重
# 根据成语大表,划分出成语首字字典
IDIOM_FIRST_CHAR = {}
for idiom in ALL_IDIOMS + ALL_WORDS:
if idiom[0] not in IDIOM_FIRST_CHAR:
IDIOM_FIRST_CHAR[idiom[0]] = []
IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
def is_idiom_game_banned(group_id: str) -> bool:
banned_ids = load_banned_ids()
return group_id in banned_ids
NOW_PLAYING = False
SCORE_BOARD = {}
def add_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id not in banned_ids:
banned_ids.append(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
LAST_CHAR = ""
USER_NAME_CACHE = {} # 缓存用户名称,避免多次获取
def remove_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id in banned_ids:
banned_ids.remove(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
class TryStartState(Enum):
STARTED = 0
ALREADY_PLAYING = 1
NO_REMAINING_TIMES = 2
class TryStopState(Enum):
STOPPED = 0
NOT_PLAYING = 1
class TryVerifyState(Enum):
VERIFIED = 0
VERIFIED_AND_REAL = 1
NOT_IDIOM = 2
WRONG_FIRST_CHAR = 3
BUT_NO_NEXT = 4
GAME_END = 5
class IdiomGame:
ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语
INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例
IDIOM_FIRST_CHAR = {} # 所有成语包括词语的首字字典
AVALIABLE_IDIOM_FIRST_CHAR = {} # 真正有效的成语首字字典
__inited = False
def __init__(self, group_id: str):
# 初始化一局游戏
self.group_id = ""
self.now_playing = False
self.score_board = {}
self.last_idiom = ""
self.last_char = ""
self.remain_playing_times = 3
self.last_play_date = ""
self.all_buff_score = 0
self.lock = asynkio.Lock()
self.remain_rounds = 0 # 剩余回合数
IdiomGame.INSTANCE_LIST[group_id] = self
def be_able_to_play(self) -> bool:
if self.last_play_date != datetime.date.today():
self.last_play_date = datetime.date.today()
self.remain_playing_times = 1
if self.remain_playing_times > 0:
self.remain_playing_times -= 1
return True
return False
def choose_start_idiom(self) -> str:
"""
随机选择一个成语作为起始成语
"""
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_char = self.last_idiom[-1]
if not self.is_nextable(self.last_char):
self.choose_start_idiom()
return self.last_idiom
@classmethod
def try_start_game(cls, group_id: str, force: bool = False) -> TryStartState:
cls.init_lexicon()
if not cls.INSTANCE_LIST.get(group_id):
cls(group_id)
instance = cls.INSTANCE_LIST[group_id]
if instance.now_playing:
return TryStartState.ALREADY_PLAYING
if not instance.be_able_to_play() and not force:
return TryStartState.NO_REMAINING_TIMES
instance.now_playing = True
return TryStartState.STARTED
def start_game(self, rounds: int = 100):
self.now_playing = True
self.remain_rounds = rounds
self.choose_start_idiom()
@classmethod
def try_stop_game(cls, group_id: str) -> TryStopState:
if not cls.INSTANCE_LIST.get(group_id):
return TryStopState.NOT_PLAYING
instance = cls.INSTANCE_LIST[group_id]
if not instance.now_playing:
return TryStopState.NOT_PLAYING
instance.now_playing = False
return TryStopState.STOPPED
def clear_score_board(self):
self.score_board = {}
self.last_char = ""
def get_score_board(self) -> dict:
return self.score_board
def get_all_buff_score(self) -> int:
return self.all_buff_score
async def skip_idiom(self, buff_score: int = -100) -> str:
"""
跳过当前成语,选择下一个成语
"""
async with self.lock:
self._skip_idiom_async()
self.add_buff_score(buff_score)
return self.last_idiom
def _skip_idiom_async(self) -> str:
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_char = self.last_idiom[-1]
if not self.is_nextable(self.last_char):
self._skip_idiom_async()
return self.last_idiom
async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
"""
用户发送成语
"""
async with self.lock:
state = self._verify_idiom(idiom, user_id)
return state
def is_nextable(self, last_char: str) -> bool:
"""
判断是否有成语可以接
"""
return last_char in IdiomGame.AVALIABLE_IDIOM_FIRST_CHAR
def _verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]:
state = []
# 新成语的首字应与上一条成语的尾字相同
if idiom[0] != self.last_char:
state.append(TryVerifyState.WRONG_FIRST_CHAR)
return state
if idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS:
self.add_score(user_id, -0.1)
state.append(TryVerifyState.NOT_IDIOM)
return state
# 成语合法,更新状态
state.append(TryVerifyState.VERIFIED)
self.last_idiom = idiom
self.last_char = idiom[-1]
self.add_score(user_id, 1)
if idiom in IdiomGame.ALL_IDIOMS:
state.append(TryVerifyState.VERIFIED_AND_REAL)
self.add_score(user_id, 4) # 再加 4 分
self.remain_rounds -= 1
if self.remain_rounds <= 0:
self.now_playing = False
state.append(TryVerifyState.GAME_END)
if not self.is_nextable(self.last_char):
# 没有成语可以接了,自动跳过
self._skip_idiom_async()
state.append(TryVerifyState.BUT_NO_NEXT)
return state
def get_user_score(self, user_id: str) -> float:
if user_id not in self.score_board:
return 0
# 避免浮点数精度问题导致过长
handled_score = round(self.score_board[user_id]["score"], 1)
return handled_score
def add_score(self, user_id: str, score: int):
if user_id not in self.score_board:
self.score_board[user_id] = {"name": user_id, "score": 0}
self.score_board[user_id]["score"] += score
def add_buff_score(self, score: int):
self.all_buff_score += score
def get_playing_state(self) -> bool:
return self.now_playing
def get_last_char(self) -> str:
return self.last_char
@classmethod
def random_idiom_starting_with(cls, first_char: str) -> Optional[str]:
cls.init_lexicon()
if first_char not in cls.AVALIABLE_IDIOM_FIRST_CHAR:
return None
return secrets.choice(cls.AVALIABLE_IDIOM_FIRST_CHAR[first_char])
@classmethod
def init_lexicon(cls):
if cls.__inited:
return
cls.__inited = True
# 成语大表
with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f:
ALL_IDIOMS_INFOS = json.load(f)
# 词语大表
with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f:
jsonData = json.load(f)
cls.ALL_WORDS = [item["ci"] for item in jsonData]
logger.debug(f"Loaded {len(cls.ALL_WORDS)} words from ci.json")
logger.debug(f"Sample words: {cls.ALL_WORDS[:5]}")
COMMON_WORDS = []
# 读取 COMMON 词语大表
with open(ASSETS_PATH / "lexicon" / "common.txt", "r", encoding="utf-8") as f:
for line in f:
word = line.strip()
if len(word) == 4:
COMMON_WORDS.append(word)
logger.debug(f"Loaded {len(COMMON_WORDS)} common words from common.txt")
logger.debug(f"Sample common words: {COMMON_WORDS[:5]}")
# 读取 THUOCL 成语库
with open(
ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt",
"r",
encoding="utf-8",
) as f:
THUOCL_IDIOMS = [line.split(" ")[0].split("\t")[0].strip() for line in f]
logger.debug(f"Loaded {len(THUOCL_IDIOMS)} idioms from THUOCL_chengyu.txt")
logger.debug(f"Sample idioms: {THUOCL_IDIOMS[:5]}")
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
THUOCL_WORDS = []
import os
for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"):
if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt":
with open(
ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename,
"r",
encoding="utf-8",
) as f:
for line in f:
word = line.lstrip().split(" ")[0].split("\t")[0].strip()
if len(word) == 4:
THUOCL_WORDS.append(word)
logger.debug(f"Loaded {len(THUOCL_WORDS)} words from THUOCL txt files")
logger.debug(f"Sample words: {THUOCL_WORDS[:5]}")
# 只有成语的大表
cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重
# 其他四字词语表,仅表示可以有这个词
cls.ALL_WORDS = (
[word for word in cls.ALL_WORDS if len(word) == 4]
+ THUOCL_WORDS
+ COMMON_WORDS
)
cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重
# 根据成语大表,划分出成语首字字典
for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS:
if idiom[0] not in cls.IDIOM_FIRST_CHAR:
cls.IDIOM_FIRST_CHAR[idiom[0]] = []
cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
# 根据真正的成语大表,划分出有效成语首字字典
for idiom in cls.ALL_IDIOMS:
if idiom[0] not in cls.AVALIABLE_IDIOM_FIRST_CHAR:
cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]] = []
cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
evt = on_alconna(
Alconna(
"我要玩成语接龙",
Args["rounds?", int],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
evt = on_alconna(Alconna(
"我要玩成语接龙"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent):
global NOW_PLAYING, LAST_CHAR, INITED
if not INITED:
init_lexicon()
INITED = True
if NOW_PLAYING:
await evt.send(await UniMessage().text("当前已有成语接龙游戏在进行中,请稍后再试!").export())
async def play_game(
event: BaseEvent,
target: DepLongTaskTarget,
force=False,
rounds: Optional[int] = 100,
):
# group_id = str(event.get_session_id())
group_id = target.channel_id
if is_idiom_game_banned(group_id):
await evt.send(
await UniMessage().text("本群已被禁止使用成语接龙功能!").export()
)
return
NOW_PLAYING = True
await evt.send(await UniMessage().text("你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!").export())
# 选择一个随机成语
idiom = secrets.choice(ALL_IDIOMS)
LAST_CHAR = idiom[-1]
rounds = rounds or 0
if rounds <= 0:
await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export())
return
state = IdiomGame.try_start_game(group_id, force)
if state == TryStartState.ALREADY_PLAYING:
await evt.send(
await UniMessage()
.text("当前已有成语接龙游戏在进行中,请稍后再试!")
.export()
)
return
if state == TryStartState.NO_REMAINING_TIMES:
await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export())
return
await evt.send(
await UniMessage()
.text(
"你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!"
)
.export()
)
instance = IdiomGame.INSTANCE_LIST[group_id]
instance.start_game(rounds)
# 发布成语
await evt.send(await UniMessage().text(f"第一个成语:「{idiom}」,请接!").export())
await evt.send(
await UniMessage()
.text(f"第一个成语:「{instance.last_idiom}」,请接!")
.export()
)
evt = on_alconna(
Alconna(
"老子就是要玩成语接龙!!!",
Args["rounds?", int],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
evt = on_alconna(Alconna(
"不玩了"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent):
global NOW_PLAYING, SCORE_BOARD, LAST_CHAR
if NOW_PLAYING:
NOW_PLAYING = False
async def force_play_game(
event: BaseEvent, target: DepLongTaskTarget, rounds: Optional[int] = 100
):
await play_game(event, target, force=True, rounds=rounds)
async def end_game(event: BaseEvent, group_id: str):
instance = IdiomGame.INSTANCE_LIST[group_id]
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
score_board = instance.get_score_board()
if len(score_board) == 0:
result_text += "无人得分!"
else:
# 按分数排序,名字用 at 的方式
sorted_score = sorted(
score_board.items(), key=lambda x: x[1]["score"], reverse=True
)
for i, (user_id, info) in enumerate(sorted_score):
result_text += (
f"{i + 1}. "
+ UniMessage().at(user_id)
+ f": {round(info['score'] + instance.get_all_buff_score(), 1)}\n"
)
await evt.send(await result_text.export())
instance.clear_score_board()
evt = on_alconna(
Alconna("不玩了"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
state = IdiomGame.try_stop_game(group_id)
if state == TryStopState.STOPPED:
# 发送好吧狗图片
# 打开好吧狗本地文件
with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f:
img_data = f.read()
await evt.send(await UniMessage().image(raw=img_data).export())
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
if len(SCORE_BOARD) == 0:
result_text += "无人得分!"
else:
# 按分数排序,名字用 at 的方式
sorted_score = sorted(SCORE_BOARD.items(), key=lambda x: x[1]["score"], reverse=True)
for i, (user_id, info) in enumerate(sorted_score):
result_text += f"{i+1}. " + UniMessage().at(user_id) + f" - {info['score']}\n"
await evt.send(await result_text.export())
# 重置分数板
SCORE_BOARD = {}
LAST_CHAR = ""
await end_game(event, group_id)
else:
await evt.send(await UniMessage().text("当前没有成语接龙游戏在进行中!").export())
await evt.send(
await UniMessage().text("当前没有成语接龙游戏在进行中!").export()
)
# 跳过
evt = on_alconna(Alconna(
"跳过成语"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
evt = on_alconna(
Alconna("跳过成语"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
)
@evt.handle()
async def _(event: BaseEvent):
global NOW_PLAYING, LAST_CHAR
if not NOW_PLAYING:
async def _(target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
await evt.send(await UniMessage().text("你们太菜了全部扣100分").export())
for user_id in SCORE_BOARD:
SCORE_BOARD[user_id]["score"] -= 100
# 选择下一个成语
idiom = secrets.choice(ALL_IDIOMS)
LAST_CHAR = idiom[-1]
await evt.send(await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export())
avaliable_idiom = IdiomGame.random_idiom_starting_with(instance.get_last_char())
# 发送哈哈狗图片
with open(ASSETS_PATH / "img" / "dog" / "haha_dog.jpg", "rb") as f:
img_data = f.read()
await evt.send(await UniMessage().image(raw=img_data).export())
await evt.send(await UniMessage().text(f"你们太菜了全部扣100分明明还可以接「{avaliable_idiom}」的!").export())
idiom = await instance.skip_idiom(-100)
await evt.send(
await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export()
)
# 直接读取消息
evt = on_message()
@evt.handle()
async def _(event: BaseEvent, msg: UniMsg):
global NOW_PLAYING, LAST_CHAR, SCORE_BOARD
if not NOW_PLAYING:
return
user_idiom = msg.extract_plain_text().strip()
if(user_idiom[0] != LAST_CHAR):
return
if(user_idiom not in ALL_IDIOMS and user_idiom not in ALL_WORDS):
await evt.send(await UniMessage().text("接不上!这个不一样!").export())
return
# 成功接上
def get_user_info(event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
user_id = str(event.author.id)
user_name = str(event.author.name)
else:
user_id = str(event.get_user_id())
user_name = str(event.get_user_id())
return user_id, user_name
if user_id not in SCORE_BOARD:
SCORE_BOARD[user_id] = {
"name": user_name,
"score": 0
}
SCORE_BOARD[user_id]["score"] += 1
# at 指定玩家
await evt.send(await UniMessage().at(user_id).text(f"接对了!你有 {SCORE_BOARD[user_id]['score']} 分!").export())
LAST_CHAR = user_idiom[-1]
await evt.send(await UniMessage().text(f"下一个成语请以「{LAST_CHAR}」开头!").export())
# 直接读取消息
evt = on_message()
@evt.handle()
async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
user_idiom = msg.extract_plain_text().strip()
user_id, user_name = get_user_info(event)
state = await instance.try_verify_idiom(user_idiom, user_id)
if TryVerifyState.WRONG_FIRST_CHAR in state:
return
if TryVerifyState.NOT_IDIOM in state:
await evt.send(
await UniMessage()
.at(user_id)
.text(" 接不上!这个不一样!你被扣了 0.1 分!")
.export()
)
return
if TryVerifyState.VERIFIED_AND_REAL in state:
await evt.send(
await UniMessage()
.at(user_id)
.text(f" 接上了,这是个真实成语,喜提 5 分!你有 {instance.get_user_score(user_id)} 分!")
.export()
)
elif TryVerifyState.VERIFIED in state:
await evt.send(
await UniMessage()
.at(user_id)
.text(f" 接上了,喜提 1 分!你有 {instance.get_user_score(user_id)} 分!")
.export()
)
if TryVerifyState.GAME_END in state:
await evt.send(await UniMessage().text("全部回合结束!").export())
await end_game(event, group_id)
return
if TryVerifyState.BUT_NO_NEXT in state:
await evt.send(
await UniMessage()
.text("但是,这是条死路!你们全部都要扣 100 分!")
.export()
)
await evt.send(
await UniMessage().text(f"重新抽取成语「{instance.last_idiom}").export()
)
await evt.send(
await UniMessage()
.text(f"下一个成语请以「{instance.get_last_char()}」开头!")
.export()
)
evt = on_alconna(
Alconna("禁止成语接龙"),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
add_banned_id(group_id)
await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export())
evt = on_alconna(
Alconna("开启成语接龙"),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
# group_id = str(event.get_session_id())
group_id = target.channel_id
remove_banned_id(group_id)
await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export())

View File

@ -1,10 +1,10 @@
import re
from io import BytesIO
import PIL.Image
from nonebot import on_message
from nonebot.adapters import Bot
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage,
on_alconna)
from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import PIL_Image
@ -29,15 +29,17 @@ def parse_timestamp(tx: str) -> float | None:
return res
cmd_giftool = on_alconna(Alconna(
"giftool",
Args["img", Image | None],
Option("--ss", Args["start_point", str]),
Option("--frames:v", Args["frame_count", int]),
Option("-t", Args["length", str]),
Option("-to", Args["end_point", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
))
cmd_giftool = on_alconna(
Alconna(
"giftool",
Args["img", Image | None],
Option("--ss", Args["start_point", str]),
Option("--frames:v", Args["frame_count", int]),
Option("-t", Args["length", str]),
Option("-to", Args["end_point", str]),
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
)
)
@cmd_giftool.handle()
@ -80,81 +82,66 @@ async def _(
if not getattr(image, "is_animated", False):
raise BotExceptionMessage("错误输入的不是动图GIF")
frames = []
durations = []
total_duration = 0.0
##
# 从这里开始,采样整个 GIF 图
frames: list[PIL.Image.Image] = []
durations: list[float] = []
try:
for i in range(getattr(image, "n_frames")):
image.seek(i)
frames.append(image.copy())
duration = image.info.get("duration", 100) # 单位:毫秒
duration = image.info.get("duration", 100) / 1000
durations.append(duration)
total_duration += duration / 1000.0 # 转为秒
except EOFError:
pass
if not frames:
raise BotExceptionMessage("错误:读取 GIF 帧失败")
# 采样结束
def time_to_frame_index(target_time: float) -> int:
if target_time <= 0:
return 0
cum = 0.0
for idx, dur in enumerate(durations):
cum += dur / 1000.0
if cum >= target_time:
return min(idx, len(frames) - 1)
return len(frames) - 1
start_frame = 0
end_frame = len(frames) - 1
if ss is not None:
start_frame = time_to_frame_index(ss)
if to is not None:
end_frame = time_to_frame_index(to)
if end_frame < start_frame:
end_frame = start_frame
elif t is not None:
end_time = (ss or 0.0) + t
end_frame = time_to_frame_index(end_time)
if end_frame < start_frame:
end_frame = start_frame
##
# 根据开始、结束时间或者帧数量来裁取 GIF 图
start_frame = max(0, start_frame)
end_frame = min(len(frames) - 1, end_frame)
selected_frames = frames[start_frame : end_frame + 1]
selected_durations = durations[start_frame : end_frame + 1]
begin_time = ss or 0
end_time = sum(durations)
end_time = min(begin_time + (t or end_time), to or end_time, end_time)
if frame_count is not None and frame_count > 0:
if frame_count >= len(selected_frames):
pass
else:
step = len(selected_frames) / frame_count
sampled_frames = []
sampled_durations = []
for i in range(frame_count):
idx = int(i * step)
sampled_frames.append(selected_frames[idx])
sampled_durations.append(
sum(selected_durations) // len(selected_durations)
)
selected_frames = sampled_frames
selected_durations = sampled_durations
accumulated = 0.0
status = 0
output_img = BytesIO()
sel_frames: list[PIL.Image.Image] = []
sel_durations: list[float] = []
adjusted_durations = [
dur / speed_factor for dur in selected_durations
]
for i in range(len(frames)):
frame = frames[i]
duration = durations[i]
if status == 0:
if accumulated + duration > begin_time:
status = 1
sel_frames.append(frame)
sel_durations.append(accumulated + duration - begin_time)
elif status == 1:
if accumulated + duration > end_time:
sel_frames.append(frame)
sel_durations.append(end_time - accumulated)
break
sel_frames.append(frame)
sel_durations.append(duration)
accumulated += duration
##
# 加速!
sel_durations = [dur / speed_factor * 1000 for dur in durations]
rframes = []
rdur = []
acc_mod_20 = 0
for i in range(len(selected_frames)):
fr = selected_frames[i]
du: float = adjusted_durations[i]
for i in range(len(sel_frames)):
fr = sel_frames[i]
du = round(sel_durations[i])
if du >= 20:
rframes.append(fr)
@ -170,10 +157,12 @@ async def _(
if acc_mod_20 >= 20:
acc_mod_20 = 0
if len(rframes) == 1 and len(selected_frames) > 1:
rframes.append(selected_frames[max(2, len(selected_frames) // 2)])
if len(rframes) == 1 and len(sel_frames) > 1:
rframes.append(sel_frames[max(2, len(sel_frames) // 2)])
rdur.append(20)
##
# 收尾:看看透明度这块
transparency_flag = False
for f in rframes:
if f.mode == "RGBA":
@ -186,12 +175,13 @@ async def _(
tf = {}
if transparency_flag:
tf['transparency'] = 0
tf["transparency"] = 0
if is_rev:
rframes = rframes[::-1]
rdur = rdur[::-1]
output_img = BytesIO()
if rframes:
rframes[0].save(
output_img,

View File

@ -0,0 +1,47 @@
import asyncio
# import datetime
from loguru import logger
import nonebot
# from nonebot_plugin_alconna import UniMessage
from konabot.common.longtask import (
# DepLongTaskTarget,
# LongTask,
# create_longtask,
# handle_long_task,
init_longtask,
)
driver = nonebot.get_driver()
INIT_FLAG = {"flag": False}
@driver.on_bot_connect
async def _():
if INIT_FLAG["flag"]:
return
INIT_FLAG["flag"] = True
logger.info("有 Bot 连接,等待 5 秒后初始化 LongTask 模块")
await asyncio.sleep(5)
await init_longtask()
logger.info("LongTask 初始化完成")
# cmd1 = nonebot.on_command("test114")
#
#
# @handle_long_task("test_callback_001")
# async def _(lt: LongTask):
# await lt.target.send_message(UniMessage().text("Hello, world!"), at=True)
#
#
# @cmd1.handle()
# async def _(target: DepLongTaskTarget):
# await create_longtask(
# handler="test_callback_001",
# data={},
# target=target,
# deadline=datetime.datetime.now() + datetime.timedelta(seconds=20),
# )

View File

@ -53,7 +53,7 @@ async def _(
if doc is None:
# 检索模式
if section is None:
section_set = {1}
section_set = {1, 7}
else:
section_set = {section}
if 1 in section_set and is_admin(event):
@ -75,7 +75,7 @@ async def _(
else:
# 查阅模式
if section is None:
section_set = {1}
section_set = {1, 7}
else:
section_set = {section}
if 1 in section_set and is_admin(event):

View File

@ -2,25 +2,52 @@ from io import BytesIO
from typing import Iterable, cast
from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, Text,
UniMessage, UniMsg, on_alconna)
from nonebot_plugin_alconna import (
Alconna,
Args,
Field,
Image,
MultiVar,
Option,
Text,
UniMessage,
UniMsg,
on_alconna,
)
from konabot.common.nb.extract_image import extract_image_from_message
from konabot.plugins.memepack.drawing.display import draw_cao_display
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten,
draw_geimao, draw_mnk,
draw_pt, draw_suan)
from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message
from konabot.plugins.memepack.drawing.display import (
draw_cao_display,
draw_snaur_display,
draw_anan_display,
)
from konabot.plugins.memepack.drawing.saying import (
draw_cute_ten,
draw_geimao,
draw_mnk,
draw_pt,
draw_suan,
)
from nonebot.adapters import Bot, Event
from returns.result import Success, Failure
geimao = on_alconna(Alconna(
"给猫说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写给猫说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"给猫哈"})
geimao = on_alconna(
Alconna(
"给猫说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写给猫说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"给猫哈"},
)
@geimao.handle()
async def _(saying: list[str]):
@ -31,12 +58,21 @@ async def _(saying: list[str]):
await geimao.send(await UniMessage().image(raw=img_bytes).export())
pt = on_alconna(Alconna(
"pt说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写小帕说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"小帕说"})
pt = on_alconna(
Alconna(
"pt说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写小帕说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"小帕说"},
)
@pt.handle()
async def _(saying: list[str]):
@ -47,12 +83,21 @@ async def _(saying: list[str]):
await pt.send(await UniMessage().image(raw=img_bytes).export())
mnk = on_alconna(Alconna(
"re:小?黑白子?说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写黑白子说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"})
mnk = on_alconna(
Alconna(
"re:小?黑白子?说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写黑白子说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"mnk说"},
)
@mnk.handle()
async def _(saying: list[str]):
@ -63,12 +108,21 @@ async def _(saying: list[str]):
await mnk.send(await UniMessage().image(raw=img_bytes).export())
suan = on_alconna(Alconna(
"小蒜说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写小蒜说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
suan = on_alconna(
Alconna(
"小蒜说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写小蒜说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@suan.handle()
async def _(saying: list[str]):
@ -79,12 +133,21 @@ async def _(saying: list[str]):
await suan.send(await UniMessage().image(raw=img_bytes).export())
dsuan = on_alconna(Alconna(
"大蒜说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写大蒜说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
dsuan = on_alconna(
Alconna(
"大蒜说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写大蒜说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases=set(),
)
@dsuan.handle()
async def _(saying: list[str]):
@ -95,12 +158,21 @@ async def _(saying: list[str]):
await dsuan.send(await UniMessage().image(raw=img_bytes).export())
cutecat = on_alconna(Alconna(
"乖猫说",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "你没有写十猫说了什么"
)]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"十猫说"})
cutecat = on_alconna(
Alconna(
"乖猫说",
Args[
"saying",
MultiVar(str, "+"),
Field(missing_tips=lambda: "你没有写十猫说了什么"),
],
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=False,
aliases={"十猫说"},
)
@cutecat.handle()
async def _(saying: list[str]):
@ -113,13 +185,14 @@ async def _(saying: list[str]):
cao_display_cmd = on_message()
@cao_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False
for text in cast(Iterable[Text], msg.get(Text)):
if text.text.strip() == "小槽展示":
flag = True
elif text.text.strip() == '':
elif text.text.strip() == "":
continue
else:
return
@ -134,8 +207,71 @@ async def _(msg: UniMsg, evt: Event, bot: Bot):
case Failure(err):
await cao_display_cmd.send(
await UniMessage()
.at(user_id=evt.get_user_id())
.text(' ')
.text(err)
.export()
.at(user_id=evt.get_user_id())
.text(" ")
.text(err)
.export()
)
snaur_display_cmd = on_alconna(
Alconna(
"卵总展示",
Option("--whiteness", Args["whiteness", float], alias=["-w"]),
Option("--black-level", Args["black_level", float], alias=["-b"]),
Option("--opacity", Args["opacity", float], alias=["-o"]),
Option("--saturation", Args["saturation", float], alias=["-s"]),
Args["image", Image | None],
)
)
@snaur_display_cmd.handle()
async def _(
img: PIL_Image,
whiteness: float = 0.0,
black_level: float = 0.2,
opacity: float = 0.8,
saturation: float = 0.85,
):
img_processed = await draw_snaur_display(
img,
whiteness,
black_level,
opacity,
saturation,
)
img_data = BytesIO()
img_processed.save(img_data, "PNG")
await snaur_display_cmd.send(await UniMessage().image(raw=img_data).export())
anan_display_cmd = on_message()
@anan_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
flag = False
for text in cast(Iterable[Text], msg.get(Text)):
stripped = text.text.strip()
if stripped == "安安展示":
flag = True
elif stripped == "":
continue
else:
return
if not flag:
return
match await extract_image_from_message(evt.get_message(), evt, bot):
case Success(img):
img_handled = await draw_anan_display(img)
img_bytes = BytesIO()
img_handled.save(img_bytes, format="PNG")
await anan_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
case Failure(err):
await anan_display_cmd.send(
await UniMessage()
.at(user_id=evt.get_user_id())
.text(" ")
.text(err)
.export()
)

View File

@ -4,10 +4,12 @@ from typing import Any, cast
import cv2
import numpy as np
import PIL.Image
import PIL.ImageChops
import PIL.ImageEnhance
from konabot.common.path import ASSETS_PATH
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
CAO_QUAD_POINTS = np.float32(cast(Any, [
[392, 540],
[577, 557],
@ -15,6 +17,25 @@ CAO_QUAD_POINTS = np.float32(cast(Any, [
[381, 687],
]))
snaur_image_base = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "snaur_1_base.png")
snaur_image_top = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "snaur_1_top.png")
SNAUR_RATIO = (1 / 2) ** .5
SNAUR_QUAD_POINTS = np.float32(cast(Any, [
[0, 466 ],
[673, 471 ],
[640, 1196],
[106, 1280],
]))
anan_image_base = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "anan_base.png")
anan_image_top = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "anan_top.png")
ANAN_QUAD_POINTS = np.float32([
[157, 585],
[793, 599],
[781, 908],
[160, 908]
])
def _draw_cao_display(image: PIL.Image.Image):
src = np.array(image.convert("RGB"))
h, w = src.shape[:2]
@ -43,3 +64,136 @@ def _draw_cao_display(image: PIL.Image.Image):
async def draw_cao_display(image: PIL.Image.Image):
return await asyncio.to_thread(_draw_cao_display, image)
def _draw_snaur_display(
image : PIL.Image.Image,
whiteness : float = 0.0 ,
black_level: float = 0.2 ,
opacity : float = 0.8 ,
saturation : float = 0.85 ,
):
src = np.array(image.convert("RGBA"))
_h, _w = src.shape[:2]
if _w / _h < SNAUR_RATIO:
_w_target = _w
_h_target = int(_w / SNAUR_RATIO)
else:
_w_target = int(_h * SNAUR_RATIO)
_h_target = _h
x_center = _w / 2
y_center = _h / 2
x1 = int(x_center - _w_target / 2)
x2 = int(x_center + _w_target / 2)
y1 = int(y_center - _h_target / 2)
y2 = int(y_center + _h_target / 2)
src = src[y1:y2, x1:x2, :]
h, w = src.shape[:2]
src_points = np.float32(cast(Any, [
[0, 0],
[w, 0],
[w, h],
[0, h],
]))
dst_points = SNAUR_QUAD_POINTS
M = cv2.getPerspectiveTransform(cast(Any, src_points), cast(Any, dst_points))
output_size = snaur_image_top.size
output_w, output_h = output_size
warped = cv2.warpPerspective(
src,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
result = PIL.Image.fromarray(warped, 'RGBA')
r, g, b, a = result.split()
a = a.point(lambda p: int(p * opacity))
f2 = lambda p: int(
((p / 255) ** (2 ** whiteness)) * 255 * (1 - black_level)
+ 255 * black_level
)
r = r.point(f2)
g = g.point(f2)
b = b.point(f2)
result = PIL.Image.merge('RGBA', (r, g, b, a))
enhancer = PIL.ImageEnhance.Color(result)
result = enhancer.enhance(saturation)
result = PIL.ImageChops.multiply(result, snaur_image_base)
result = PIL.Image.alpha_composite(snaur_image_base, result)
result = PIL.Image.alpha_composite(result, snaur_image_top)
return result
async def draw_snaur_display(
image : PIL.Image.Image,
whiteness : float = 0.0 ,
black_level: float = 0.2 ,
opacity : float = 0.8 ,
saturation : float = 0.85 ,
) -> PIL.Image.Image:
return await asyncio.to_thread(
_draw_snaur_display, image, whiteness, black_level,
opacity, saturation,
)
def _draw_anan_display(image: PIL.Image.Image) -> PIL.Image.Image:
src = np.array(image.convert("RGBA"))
h, w = src.shape[:2]
src_points = np.float32([
[0, 0],
[w, 0],
[w, h],
[0, h]
])
dst_points = ANAN_QUAD_POINTS
M = cv2.getPerspectiveTransform(src_points, dst_points)
output_w, output_h = anan_image_top.size
src_rgb = cv2.cvtColor(src, cv2.COLOR_RGBA2RGB) if src.shape[2] == 4 else src
warped_rgb = cv2.warpPerspective(
src_rgb,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
mask = np.zeros((h, w), dtype=np.uint8)
mask[:, :] = 255
warped_mask = cv2.warpPerspective(
mask,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=0
)
warped_rgba = cv2.cvtColor(warped_rgb, cv2.COLOR_RGB2RGBA)
warped_rgba[:, :, 3] = warped_mask
warped_pil = PIL.Image.fromarray(warped_rgba, 'RGBA')
result = PIL.Image.alpha_composite(anan_image_base, warped_pil)
result = PIL.Image.alpha_composite(result, anan_image_top)
return result
async def draw_anan_display(image: PIL.Image.Image) -> PIL.Image.Image:
return await asyncio.to_thread(_draw_anan_display, image)

View File

@ -0,0 +1,44 @@
import nonebot
from nonebot.adapters.onebot.v11.bot import Bot
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
from nonebot_plugin_alconna import UniMsg, UniMessage
from pydantic import BaseModel
class NoLuowenConfig(BaseModel):
plugin_noluowen_qqid: int = -1
plugin_noluowen_enable_group: list[int] = []
config = nonebot.get_plugin_config(NoLuowenConfig)
async def is_luowen_mentioned(evt: GroupMessageEvent, msg: UniMsg) -> bool:
if config.plugin_noluowen_qqid <= 0:
return False
if evt.user_id == config.plugin_noluowen_qqid:
return False
if evt.group_id not in config.plugin_noluowen_enable_group:
return False
txt = msg.extract_plain_text()
if "洛温" not in txt:
return False
if "罗文" in txt:
return False
if "阿特金森" in txt:
return False
return True
evt_luowen_mentioned = nonebot.on_message(rule=is_luowen_mentioned)
@evt_luowen_mentioned.handle()
async def _(evt: GroupMessageEvent, bot: Bot):
msg = (
UniMessage()
.reply(str(evt.message_id))
.at(str(config.plugin_noluowen_qqid))
.text(" 好像有人念错了你的 ID")
)
await evt_luowen_mentioned.send(await msg.export(bot=bot))

View File

@ -1,16 +1,21 @@
import json, time
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
on_alconna)
from nonebot_plugin_alconna.uniseg import UniMsg, At, Reply
from nonebot.rule import Rule
from nonebot_plugin_alconna import Alconna, Args, Field, MultiVar, on_alconna
from nonebot.adapters.onebot.v11 import Event
poll_json_path = "assets/json/poll.json"
from konabot.common.nb.wzq_conflict import no_wzqbot_rule
from konabot.common.path import ASSETS_PATH, DATA_PATH
poll_file = open(poll_json_path,"r",encoding="utf-8")
poll_list_raw = poll_file.read()
poll_file.close()
poll_list = json.loads(poll_list_raw)['poll']
POLL_TEMPLATE_FILE = ASSETS_PATH / "json" / "poll.json"
POLL_DATA_FILE = DATA_PATH / "poll.json"
if not POLL_DATA_FILE.exists():
POLL_DATA_FILE.write_bytes(POLL_TEMPLATE_FILE.read_bytes())
poll_list = json.loads(POLL_DATA_FILE.read_text())['poll']
async def createpoll(title,qqid,options):
polllength = len(poll_list)
@ -44,8 +49,11 @@ def getpolldata(pollid_or_title):
return [thepoll,polnum]
def writeback():
file = open(poll_json_path,"w",encoding="utf-8")
json.dump({'poll':poll_list},file,ensure_ascii=False,sort_keys=True)
# file = open(poll_json_path,"w",encoding="utf-8")
# json.dump({'poll':poll_list},file,ensure_ascii=False,sort_keys=True)
POLL_DATA_FILE.write_text(json.dumps({
'poll': poll_list,
}, ensure_ascii=False, sort_keys=True))
async def pollvote(polnum,optionnum,qqnum):
optiond = poll_list[polnum]["polldata"]
@ -60,7 +68,7 @@ poll = on_alconna(Alconna(
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "参数错误。用法:发起投票 <投票标题> <选项1> <选项2> ..."
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"发起投票","createpoll"})
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"发起投票","createpoll"}, rule=no_wzqbot_rule)
@poll.handle()
async def _(saying: list, event: Event):
if (len(saying) < 3):
@ -82,7 +90,7 @@ viewpoll = on_alconna(Alconna(
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "请指定投票ID或标题。用法查看投票 <投票ID或标题>"
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"查看投票"})
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"查看投票"}, rule=no_wzqbot_rule)
@viewpoll.handle()
async def _(saying: list):
# 参数投票ID或者标题
@ -124,7 +132,7 @@ vote = on_alconna(Alconna(
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "参数错误。用法:投票 <投票ID/标题> <选项文本>"
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"投票","参与投票"})
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"投票","参与投票"}, rule=no_wzqbot_rule)
@vote.handle()
async def _(saying: list, event: Event):
if (len(saying) < 2):
@ -157,4 +165,4 @@ async def _(saying: list, event: Event):
# 写入项目
else:
await pollvote(polnum,optionnum,event.get_user_id())
await viewpoll.send("投票成功!你投给了 "+saying[1])
await viewpoll.send("投票成功!你投给了 "+saying[1])

View File

@ -1,30 +1,39 @@
import asyncio
import aiohttp
import asyncio as asynkio
import datetime
from math import ceil
from pathlib import Path
from typing import Any, Literal, cast
from typing import Any, Literal
import nanoid
import nonebot
import ptimeparse
from loguru import logger
from nonebot import on_message
from nonebot.adapters import Event
from nonebot.adapters.console import Bot as ConsoleBot
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord import Bot as DiscordBot
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
from nonebot.adapters.onebot.v11.event import \
GroupMessageEvent as OnebotV11GroupMessageEvent
from nonebot.adapters.onebot.v11.event import \
MessageEvent as OnebotV11MessageEvent
from nonebot_plugin_alconna import UniMessage, UniMsg
from nonebot import get_plugin_config, on_message
from nonebot.adapters import Bot, Event
from nonebot.adapters.onebot.v11 import Bot as OBBot
from nonebot.adapters.console import Bot as CBot
from nonebot.adapters.discord import Bot as DCBot
from nonebot_plugin_alconna import Alconna, Args, Subcommand, UniMessage, UniMsg, on_alconna
from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget, LongTask, LongTaskTarget, create_longtask, handle_long_task, longtask_data
evt = on_message()
(Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True)
DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "notify.json"
DATA_FILE_LOCK = asyncio.Lock()
DATA_FILE_LOCK = asynkio.Lock()
ASYNK_TASKS: set[asynkio.Task[Any]] = set()
LONG_TASK_NAME = "TASK_SIMPLE_NOTIFY"
PAGE_SIZE = 6
FMT_STRING = "%Y年%m月%d%H:%M:%S"
class NotifyMessage(BaseModel):
message: str
class Notify(BaseModel):
@ -39,14 +48,64 @@ class Notify(BaseModel):
notify_time: datetime.datetime
notify_msg: str
def get_str(self):
return f"{self.target}-{self.target_env}-{self.platform}-{self.notify_time}"
class NotifyConfigFile(BaseModel):
version: int = 2
notifies: list[Notify] = []
unsent: list[Notify] = []
notify_channels: dict[str, str] = {}
class NotifyPluginConfig(BaseModel):
plugin_notify_enable_ntfy: bool = False
plugin_notify_base_url: str = ""
plugin_notify_access_token: str = ""
plugin_notify_prefix: str = "kona-notice-"
config = get_plugin_config(NotifyPluginConfig)
async def send_notify_to_ntfy_instance(msg: str, channel: str):
if not config.plugin_notify_enable_ntfy:
return
url = f"{config.plugin_notify_base_url}/{channel}"
async with aiohttp.ClientSession() as session:
session.headers["Authorization"] = f"Bearer {config.plugin_notify_access_token}"
session.headers["Title"] = "🔔 此方 BOT 提醒"
async with session.post(url, data=msg) as response:
logger.info(f"访问 {url} 的结果是 {response.status}")
def _get_bot_of(_type: type[Bot]):
for bot in nonebot.get_bots().values():
if isinstance(bot, _type):
return bot.self_id
return ""
def get_target_from_notify(notify: Notify) -> LongTaskTarget:
if notify.platform == "console":
return LongTaskTarget(
platform="console",
self_id=_get_bot_of(CBot),
channel_id=notify.target_env or "",
target_id=notify.target,
)
if notify.platform == "discord":
return LongTaskTarget(
platform="discord",
self_id=_get_bot_of(DCBot),
channel_id=notify.target_env or "",
target_id=notify.target,
)
return LongTaskTarget(
platform="qq",
self_id=_get_bot_of(OBBot),
channel_id=notify.target_env or "",
target_id=notify.target,
)
def load_notify_config() -> NotifyConfigFile:
@ -63,76 +122,8 @@ def save_notify_config(config: NotifyConfigFile):
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4))
async def notify_now(notify: Notify):
if notify.platform == 'console':
bot = [b for b in nonebot.get_bots().values() if isinstance(b, ConsoleBot)]
if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
return False
bot = bot[0]
await bot.send_private_message(notify.target, f"代办通知:{notify.notify_msg}")
elif notify.platform == 'discord':
bot = [b for b in nonebot.get_bots().values() if isinstance(b, DiscordBot)]
if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
return False
bot = bot[0]
channel = await bot.create_DM(recipient_id=int(notify.target))
await bot.send_to(channel.id, f"代办通知:{notify.notify_msg}")
elif notify.platform == 'qq':
bot = [b for b in nonebot.get_bots().values() if isinstance(b, OnebotV11Bot)]
if len(bot) != 1:
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
return False
bot = bot[0]
if notify.target_env is None:
await bot.send_private_msg(
user_id=int(notify.target),
message=cast(Any, await UniMessage.text(f"代办通知:{notify.notify_msg}").export(
bot=bot,
)),
)
else:
await bot.send_group_msg(
group_id=int(notify.target_env),
message=cast(Any,
await UniMessage().at(
notify.target
).text(f" 代办通知:{notify.notify_msg}").export(bot=bot)
),
)
else:
logger.warning(f"提醒未成功发送出去:{notify}")
return False
return True
def create_notify_task(notify: Notify, fail2remove: bool = True):
async def mission():
begin_time = datetime.datetime.now()
if begin_time < notify.notify_time:
await asyncio.sleep((notify.notify_time - begin_time).total_seconds())
else:
logger.warning(
f"期望在 {notify.notify_time} 在平台 {notify.platform} {notify.target_env}"
f" {notify.target} 的代办通知 {notify.notify_msg} 已经超时,将会直接通知!"
)
res = await notify_now(notify)
if fail2remove or res:
await DATA_FILE_LOCK.acquire()
cfg = load_notify_config()
cfg.notifies = [n for n in cfg.notifies if n.get_str() != notify.get_str()]
if not res:
cfg.unsent.append(notify)
save_notify_config(cfg)
DATA_FILE_LOCK.release()
else:
pass
return asyncio.create_task(mission())
@evt.handle()
async def _(msg: UniMsg, mEvt: Event):
async def _(msg: UniMsg, mEvt: Event, target: DepLongTaskTarget):
if mEvt.get_user_id() in nonebot.get_bots():
return
@ -145,57 +136,26 @@ async def _(msg: UniMsg, mEvt: Event):
return
notify_time, notify_text = segments
# target_time = get_target_time(notify_time)
try:
target_time = ptimeparse.parse(notify_time)
target_time = ptimeparse.Parser().parse(notify_time)
logger.info(f"{notify_time} 解析出了时间:{target_time}")
except Exception:
logger.info(f"无法从 {notify_time} 中解析出时间")
return
# if target_time is None:
# logger.info(f"无法从 {notify_time} 中解析出时间")
# return
if not notify_text:
return
await DATA_FILE_LOCK.acquire()
cfg = load_notify_config()
if isinstance(mEvt, ConsoleMessageEvent):
platform = "console"
target = mEvt.get_user_id()
target_env = None
elif isinstance(mEvt, OnebotV11MessageEvent):
platform = "qq"
target = mEvt.get_user_id()
if isinstance(mEvt, OnebotV11GroupMessageEvent):
target_env = str(mEvt.group_id)
else:
target_env = None
elif isinstance(mEvt, DiscordMessageEvent):
platform = "discord"
target = mEvt.get_user_id()
target_env = None
else:
logger.warning(f"Notify 遇到不支持的平台:{type(mEvt).__name__}")
return
notify = Notify(
platform=platform,
target=target,
target_env=target_env,
notify_time=target_time,
notify_msg=notify_text,
await create_longtask(
LONG_TASK_NAME,
{ "message": notify_text },
target,
target_time,
)
create_notify_task(notify)
cfg.notifies.append(notify)
save_notify_config(cfg)
DATA_FILE_LOCK.release()
await evt.send(await UniMessage().at(mEvt.get_user_id()).text(
f" 了解啦!将会在 {notify.notify_time} 提醒你哦~").export())
logger.info(f"创建了一条于 {notify.notify_time} 的代办提醒")
await target.send_message(
UniMessage().text(f"了解啦!将会在 {target_time.strftime(FMT_STRING)} 提醒你哦~")
)
logger.info(f"创建了一条于 {target_time} 的代办提醒")
driver = nonebot.get_driver()
@ -214,24 +174,156 @@ async def _():
DELTA = 2
logger.info(f"第一次探测到 Bot 连接,等待 {DELTA} 秒后开始通知")
await asyncio.sleep(DELTA)
await asynkio.sleep(DELTA)
await DATA_FILE_LOCK.acquire()
tasks: set[asyncio.Task[Any]] = set()
cfg = load_notify_config()
if cfg.version == 1:
logger.info("将配置文件的版本升级为 2")
cfg.version = 2
else:
counter = 0
for notify in [*cfg.notifies]:
task = create_notify_task(notify, fail2remove=False)
tasks.add(task)
task.add_done_callback(lambda self: tasks.remove(self))
counter += 1
logger.info(f"成功创建了 {counter} 条代办事项")
await create_longtask(
handler=LONG_TASK_NAME,
data={ "message": notify.notify_msg },
target=get_target_from_notify(notify),
deadline=notify.notify_time,
)
cfg.notifies = []
save_notify_config(cfg)
DATA_FILE_LOCK.release()
await asyncio.gather(*tasks)
@handle_long_task("TASK_SIMPLE_NOTIFY")
async def _(task: LongTask):
message = task.data["message"]
await task.target.send_message(
UniMessage().text(f"代办提醒:{message}")
)
async with DATA_FILE_LOCK:
data = load_notify_config()
if (chan := data.notify_channels.get(task.target.target_id)) is not None:
await send_notify_to_ntfy_instance(message, chan)
save_notify_config(data)
USER_CHECKOUT_TASK_CACHE: dict[str, dict[str, str]] = {}
cmd_check_notify_list = on_alconna(Alconna(
"re:(?:我有哪些|查询)(?:提醒|代办)",
Args["page", int, 1]
))
@cmd_check_notify_list.handle()
async def _(page: int, target: DepLongTaskTarget):
if page <= 0:
await target.send_message(UniMessage().text("页数应该大于 0 吧"))
return
async with longtask_data() as data:
tasks = data.to_handle.get(LONG_TASK_NAME, {}).values()
tasks = [t for t in tasks if t.target.target_id == target.target_id]
tasks = sorted(tasks, key=lambda t: t.deadline)
pages = ceil(len(tasks) / PAGE_SIZE)
if page > pages:
await target.send_message(UniMessage().text(f"最多也就 {pages} 页啦!"))
tasks = tasks[(page - 1) * PAGE_SIZE: page * PAGE_SIZE]
message = "你可以输入「删除提醒 序号」来删除一个提醒\n====== 代办清单 ======\n\n"
to_cache = {}
if len(tasks) == 0:
message += "空空如也\n"
else:
for i, task in enumerate(tasks):
to_cache[str(i + 1)] = task.uuid
message += f"{i + 1}) {task.data['message']}{task.deadline.strftime(FMT_STRING)}\n"
message += f"\n==== 第 {page} 页,共 {pages} 页 ===="
USER_CHECKOUT_TASK_CACHE[target.target_id] = to_cache
await target.send_message(UniMessage().text(message))
cmd_remove_task = on_alconna(Alconna(
"re:删除(?:提醒|代办)",
Args["checker", str],
))
@cmd_remove_task.handle()
async def _(checker: str, target: DepLongTaskTarget):
if target.target_id not in USER_CHECKOUT_TASK_CACHE:
await target.send_message(UniMessage().text(
"先用「查询提醒」来查询你有哪些提醒吧"
))
return
if checker not in USER_CHECKOUT_TASK_CACHE[target.target_id]:
await target.send_message(UniMessage().text(
"没有这个任务哦,请检查一下吧"
))
uuid = USER_CHECKOUT_TASK_CACHE[target.target_id][checker]
async with longtask_data() as data:
if uuid not in data.to_handle[LONG_TASK_NAME]:
await target.send_message(UniMessage().text(
"似乎这个提醒已经发出去了,或者已经被删除"
))
return
_msg = data.to_handle[LONG_TASK_NAME][uuid].data["message"]
del data.to_handle[LONG_TASK_NAME][uuid]
await target.send_message(UniMessage().text(
f"成功取消了提醒:{_msg}"
))
cmd_notify_channel = on_alconna(Alconna(
"ntfy",
Subcommand("删除", dest="delete"),
Subcommand("创建", Args["notify_id?", str], dest="create"),
), rule=lambda: config.plugin_notify_enable_ntfy)
@cmd_notify_channel.assign("$main")
async def _(target: DepLongTaskTarget):
await target.send_message(UniMessage.text(
"配置 ntfy 通知:\n\n"
"- ntfy 创建: 启用 ntfy 通知,并为你随机生成一个通知渠道\n"
"- ntfy 删除:禁用 ntfy 通知\n"
))
@cmd_notify_channel.assign("create")
async def _(target: DepLongTaskTarget, notify_id: str = ""):
if notify_id == "":
notify_id = nanoid.generate(
alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz-",
size=16,
)
channel_name = f"{config.plugin_notify_prefix}{notify_id}"
async with DATA_FILE_LOCK:
data = load_notify_config()
data.notify_channels[target.target_id] = channel_name
save_notify_config(data)
await target.send_message(UniMessage.text(
f"了解!将会在 {channel_name} 为你提醒!\n"
"\n"
"食用教程:在你的手机端 / 网页端 ntfy 点击「订阅主题」,选择「使用其他服务器」,"
f"服务器填写 {config.plugin_notify_base_url} ,主题名填写 {channel_name}\n"
f"最后点击订阅,就能看到我给你发的消息啦!"
))
await send_notify_to_ntfy_instance(
"如果你看到这条消息,说明你已经成功订阅主题!此方 BOT 将会在这里提醒你你的代办!",
channel_name,
)
@cmd_notify_channel.assign("delete")
async def _(target: DepLongTaskTarget):
async with DATA_FILE_LOCK:
data = load_notify_config()
del data.notify_channels[target.target_id]
save_notify_config(data)
await target.send_message(UniMessage.text("ok."))

View File

@ -0,0 +1,80 @@
from typing import Optional
from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna
from konabot.common.web_render import WebRenderer
from nonebot.adapters import Event as BaseEvent
from playwright.async_api import Page
async def continue_handle(page: Page, content: str) -> None:
# 这里可以添加一些预处理逻辑
# 找到 id 为 input 的 textarea 元素
textarea = await page.query_selector("#input")
if textarea:
# 在 textarea 中输入内容
await textarea.fill(content)
# 找到 id 为 submit-btn 的按钮元素
submit_button = await page.query_selector("#submit-btn")
if submit_button:
# 点击按钮提交
await submit_button.click()
evt = on_alconna(
Alconna(
f"生成喜报",
Args["content?", str]
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(msg: UniMsg, event: BaseEvent, content: Optional[str] = ""):
screenshot = await WebRenderer.render(
url="https://witnessbot.mxowl.com/services/congratulations/",
target="#main-canvas",
other_function=lambda page: continue_handle(page, content),
timeout=30
)
await evt.send(
await UniMessage().image(raw=screenshot).export()
)
async def beibao_continue_handle(page: Page, content: str) -> None:
# 这里可以添加一些预处理逻辑
# 找到 id 为 input 的 textarea 元素
textarea = await page.query_selector("#input")
if textarea:
# 在 textarea 中输入内容
await textarea.fill(content)
# 找到 class 为 btn btn-outline-primaryfor属性为 mode-2 的标签元素
mode_radio = await page.query_selector("label.btn.btn-outline-primary[for='mode-2']")
if mode_radio:
# 点击选择悲报模式
await mode_radio.click()
# 找到 id 为 submit-btn 的按钮元素
submit_button = await page.query_selector("#submit-btn")
if submit_button:
# 点击按钮提交
await submit_button.click()
evt = on_alconna(
Alconna(
f"生成悲报",
Args["content?", str]
),
use_cmd_start=True,
use_cmd_sep=False,
skip_for_unmatch=True,
)
@evt.handle()
async def _(msg: UniMsg, event: BaseEvent, content: Optional[str] = ""):
screenshot = await WebRenderer.render(
url="https://witnessbot.mxowl.com/services/congratulations/",
target="#main-canvas",
other_function=lambda page: beibao_continue_handle(page, content),
timeout=30
)
await evt.send(
await UniMessage().image(raw=screenshot).export()
)

663
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -2,30 +2,31 @@
name = "konabot"
version = "0.1.0"
description = "在 MTTU 内部使用的 bot"
authors = [
{name = "passthem",email = "Passthem183@gmail.com"}
]
authors = [{ name = "passthem", email = "Passthem183@gmail.com" }]
readme = "README.md"
requires-python = ">=3.12,<4.0"
dependencies = [
"nonebot2[all] (>=2.4.3,<3.0.0)",
"nonebot-adapter-onebot (>=2.4.6,<3.0.0)",
"nonebot-adapter-console (>=0.9.0,<0.10.0)",
"nonebot-adapter-discord (>=0.1.8,<0.2.0)",
"nonebot-adapter-minecraft (>=1.5.2,<2.0.0)",
"nonebot-plugin-alconna (>=0.59.4,<0.60.0)",
"nonebot-plugin-apscheduler (>=0.5.0,<0.6.0)",
"requests (>=2.32.5,<3.0.0)",
"beautifulsoup4 (>=4.13.5,<5.0.0)",
"lxml (>=6.0.2,<7.0.0)",
"pillow (>=11.3.0,<12.0.0)",
"imagetext-py (>=2.2.0,<3.0.0)",
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
"returns (>=0.26.0,<0.27.0)",
"ptimeparse (>=0.1.1,<0.2.0)",
"skia-python (>=138.0,<139.0)",
"nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)",
"qrcode (>=8.2,<9.0)",
"nonebot2[all] (>=2.4.3,<3.0.0)",
"nonebot-adapter-onebot (>=2.4.6,<3.0.0)",
"nonebot-adapter-console (>=0.9.0,<0.10.0)",
"nonebot-adapter-discord (>=0.1.8,<0.2.0)",
"nonebot-adapter-minecraft (>=1.5.2,<2.0.0)",
"nonebot-plugin-alconna (>=0.59.4,<0.60.0)",
"nonebot-plugin-apscheduler (>=0.5.0,<0.6.0)",
"requests (>=2.32.5,<3.0.0)",
"beautifulsoup4 (>=4.13.5,<5.0.0)",
"lxml (>=6.0.2,<7.0.0)",
"pillow (>=11.3.0,<12.0.0)",
"imagetext-py (>=2.2.0,<3.0.0)",
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
"returns (>=0.26.0,<0.27.0)",
"skia-python (>=138.0,<139.0)",
"nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)",
"qrcode (>=8.2,<9.0)",
"ptimeparse (>=0.2.1,<0.3.0)",
"nanoid (>=2.0.0,<3.0.0)",
"opencc (>=1.1.9,<2.0.0)",
"playwright (>=1.55.0,<2.0.0)",
]
[build-system]
@ -37,5 +38,9 @@ name = "pt-gitea-pypi"
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple/"
priority = "supplemental"
[[tool.poetry.source]]
name = "mirrors"
url = "https://pypi.tuna.tsinghua.edu.cn/simple/"
priority = "primary"
[tool.poetry.dependencies]
ptimeparse = {source = "pt-gitea-pypi"}