Compare commits
41 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 021133954e | |||
| 7baa04dbc2 | |||
| e55bdbdf4a | |||
| a30c7b8093 | |||
| 3da2c2266f | |||
| 96e3c3fe17 | |||
| 851c9eb3c7 | |||
| 11269b2a5a | |||
| 875e0efc2f | |||
| 4f43312663 | |||
| b2f4768573 | |||
| bc6263ec31 | |||
| bc9d025836 | |||
| b552aacf89 | |||
| f9a0249772 | |||
| c94db33b11 | |||
| 67382a0c0a | |||
| fd4c9302c2 | |||
| f30ad0cb7d | |||
| f7afe48680 | |||
| b42385f780 | |||
| 6cae38dea9 | |||
| 8594b59783 | |||
| f768c91430 | |||
| a65cb118cc | |||
| 75c6bbd23f | |||
| aaf0a75d65 | |||
| 8f560ce1ba | |||
| 9f3f79f51d | |||
| 92048aeff7 | |||
| 81aac10665 | |||
| 3ce230adfe | |||
| 4f885554ca | |||
| 7ebcb8add4 | |||
| e18cc82792 | |||
| eb28cd0a0c | |||
| 2d688a6ed6 | |||
| e9aac52200 | |||
| 4305548ab5 | |||
| 99382a3bf5 | |||
| 92e43785bf |
@ -10,6 +10,10 @@ trigger:
|
|||||||
- master
|
- master
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
|
- name: submodules
|
||||||
|
image: alpine/git
|
||||||
|
commands:
|
||||||
|
- git submodule update --init --recursive
|
||||||
- name: 构建 Docker 镜像
|
- name: 构建 Docker 镜像
|
||||||
image: plugins/docker:latest
|
image: plugins/docker:latest
|
||||||
privileged: true
|
privileged: true
|
||||||
@ -50,6 +54,10 @@ trigger:
|
|||||||
- tag
|
- tag
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
|
- name: submodules
|
||||||
|
image: alpine/git
|
||||||
|
commands:
|
||||||
|
- git submodule update --init --recursive
|
||||||
- name: 构建并推送 Release Docker 镜像
|
- name: 构建并推送 Release Docker 镜像
|
||||||
image: plugins/docker:latest
|
image: plugins/docker:latest
|
||||||
privileged: true
|
privileged: true
|
||||||
|
|||||||
BIN
assets/img/meme/anan_base.png
Normal file
BIN
assets/img/meme/anan_base.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 841 KiB |
BIN
assets/img/meme/anan_top.png
Normal file
BIN
assets/img/meme/anan_top.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 821 KiB |
BIN
assets/img/meme/snaur_1_base.png
Executable file
BIN
assets/img/meme/snaur_1_base.png
Executable file
Binary file not shown.
|
After Width: | Height: | Size: 1.1 MiB |
BIN
assets/img/meme/snaur_1_top.png
Executable file
BIN
assets/img/meme/snaur_1_top.png
Executable file
Binary file not shown.
|
After Width: | Height: | Size: 1008 KiB |
6
bot.py
6
bot.py
@ -20,9 +20,13 @@ env_enable_minecraft = os.environ.get("ENABLE_MINECRAFT", "none")
|
|||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
if env.upper() == 'DEBUG' or env.upper() == 'DEV':
|
||||||
|
console_log_level = 'DEBUG'
|
||||||
|
else:
|
||||||
|
console_log_level = 'INFO'
|
||||||
init_logger(LOG_PATH, [
|
init_logger(LOG_PATH, [
|
||||||
BotExceptionMessage,
|
BotExceptionMessage,
|
||||||
])
|
], console_log_level=console_log_level)
|
||||||
|
|
||||||
nonebot.init()
|
nonebot.init()
|
||||||
|
|
||||||
|
|||||||
36
konabot/common/data_man.py
Normal file
36
konabot/common/data_man.py
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
import asyncio
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Generic, TypeVar
|
||||||
|
|
||||||
|
from pydantic import BaseModel, ValidationError
|
||||||
|
|
||||||
|
T = TypeVar("T", bound=BaseModel)
|
||||||
|
|
||||||
|
|
||||||
|
class DataManager(Generic[T]):
|
||||||
|
def __init__(self, cls: type[T], fp: Path) -> None:
|
||||||
|
self.cls = cls
|
||||||
|
self.fp = fp
|
||||||
|
self._aio_lock = asyncio.Lock()
|
||||||
|
self._data: T | None = None
|
||||||
|
|
||||||
|
def load(self) -> T:
|
||||||
|
if not self.fp.exists():
|
||||||
|
return self.cls()
|
||||||
|
try:
|
||||||
|
return self.cls.model_validate_json(self.fp.read_text())
|
||||||
|
except ValidationError:
|
||||||
|
return self.cls()
|
||||||
|
|
||||||
|
def save(self, data: T):
|
||||||
|
self.fp.write_text(data.model_dump_json())
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def get_data(self):
|
||||||
|
await self._aio_lock.acquire()
|
||||||
|
self._data = self.load()
|
||||||
|
yield self._data
|
||||||
|
self.save(self._data)
|
||||||
|
self._data = None
|
||||||
|
self._aio_lock.release()
|
||||||
@ -18,7 +18,7 @@ def file_exception_filter(
|
|||||||
否则,返回 True(允许记录)。
|
否则,返回 True(允许记录)。
|
||||||
"""
|
"""
|
||||||
exception_info = record.get("exception")
|
exception_info = record.get("exception")
|
||||||
|
|
||||||
if exception_info:
|
if exception_info:
|
||||||
exception_type = exception_info[0]
|
exception_type = exception_info[0]
|
||||||
|
|
||||||
@ -29,8 +29,9 @@ def file_exception_filter(
|
|||||||
|
|
||||||
|
|
||||||
def init_logger(
|
def init_logger(
|
||||||
log_dir: Path,
|
log_dir: Path,
|
||||||
ignored_exceptions: List[Type[Exception]]
|
ignored_exceptions: List[Type[Exception]],
|
||||||
|
console_log_level: str = "INFO",
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
配置全局 Loguru Logger。
|
配置全局 Loguru Logger。
|
||||||
@ -47,7 +48,7 @@ def init_logger(
|
|||||||
|
|
||||||
logger.add(
|
logger.add(
|
||||||
sys.stderr,
|
sys.stderr,
|
||||||
level="INFO",
|
level=console_log_level,
|
||||||
colorize=True,
|
colorize=True,
|
||||||
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
|
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
|
||||||
)
|
)
|
||||||
@ -76,4 +77,4 @@ def init_logger(
|
|||||||
)
|
)
|
||||||
|
|
||||||
logger.info("Loguru Logger 初始化完成!")
|
logger.info("Loguru Logger 初始化完成!")
|
||||||
logger.info(f"控制台日志级别: INFO")
|
logger.info(f"控制台日志级别: {console_log_level}")
|
||||||
|
|||||||
301
konabot/common/longtask.py
Normal file
301
konabot/common/longtask.py
Normal file
@ -0,0 +1,301 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
import datetime
|
||||||
|
import json
|
||||||
|
from typing import Annotated, Any, Callable, Coroutine, cast
|
||||||
|
import asyncio as asynkio
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
import nonebot
|
||||||
|
from nonebot.params import Depends
|
||||||
|
from nonebot.adapters import Event as BaseEvent
|
||||||
|
from nonebot.adapters import Bot as BaseBot
|
||||||
|
from nonebot.adapters.onebot.v11 import Bot as OBBot
|
||||||
|
from nonebot.adapters.onebot.v11 import GroupMessageEvent as OBGroupMessageEvent
|
||||||
|
from nonebot.adapters.onebot.v11 import PrivateMessageEvent as OBPrivateMessageEvent
|
||||||
|
from nonebot.adapters.console import Bot as ConsoleBot
|
||||||
|
from nonebot.adapters.console import MessageEvent as ConsoleMessageEvent
|
||||||
|
from nonebot.adapters.discord import MessageEvent as DCMessageEvent
|
||||||
|
from nonebot.adapters.discord import Bot as DCBot
|
||||||
|
from nonebot_plugin_alconna import UniMessage
|
||||||
|
from pydantic import BaseModel, ValidationError
|
||||||
|
|
||||||
|
from .path import DATA_PATH
|
||||||
|
|
||||||
|
LONGTASK_DATA_DIR = DATA_PATH / "longtasks.json"
|
||||||
|
QQ_PRIVATE_CHAT_CHANNEL_PREFIX = "_CHANNEL_QQ_PRIVATE_"
|
||||||
|
|
||||||
|
|
||||||
|
class LongTaskTarget(BaseModel):
|
||||||
|
"""
|
||||||
|
用于定义长期任务的目标沟通对象,一般通过 DepLongTaskTarget 依赖注入获取:
|
||||||
|
|
||||||
|
```python
|
||||||
|
@cmd.handle()
|
||||||
|
async def _(target: DepLongTaskTarget):
|
||||||
|
...
|
||||||
|
```
|
||||||
|
"""
|
||||||
|
|
||||||
|
platform: str
|
||||||
|
"沟通对象所在的平台"
|
||||||
|
|
||||||
|
self_id: str
|
||||||
|
"进行沟通的对象自己的 ID"
|
||||||
|
|
||||||
|
channel_id: str
|
||||||
|
"沟通对象所在的群或者 Discord Channel。若为空则代表是私聊"
|
||||||
|
|
||||||
|
target_id: str
|
||||||
|
"沟通对象的 ID"
|
||||||
|
|
||||||
|
async def send_message(self, msg: UniMessage, at: bool = True) -> bool:
|
||||||
|
try:
|
||||||
|
bot = nonebot.get_bot(self.self_id)
|
||||||
|
except KeyError:
|
||||||
|
logger.warning(f"试图访问了不存在的 Bot。ID={self.self_id}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
if self.platform == "qq":
|
||||||
|
if not isinstance(bot, OBBot):
|
||||||
|
logger.warning(
|
||||||
|
f"编号对应的平台并非期望的平台 ID={self.self_id} PLATFORM={
|
||||||
|
self.platform
|
||||||
|
} BOT_CLASS={bot.__class__.__name__}"
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
if self.channel_id.startswith(QQ_PRIVATE_CHAT_CHANNEL_PREFIX):
|
||||||
|
# 私聊模式
|
||||||
|
await bot.send_private_msg(
|
||||||
|
user_id=int(self.target_id),
|
||||||
|
message=cast(Any, await msg.export(bot)),
|
||||||
|
auto_escape=False,
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
if at:
|
||||||
|
msg = UniMessage().at(self.target_id).text(" ") + msg
|
||||||
|
await bot.send_group_msg(
|
||||||
|
group_id=int(self.channel_id),
|
||||||
|
message=cast(Any, await msg.export(bot)),
|
||||||
|
auto_escape=False,
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
if self.platform == "console":
|
||||||
|
if not isinstance(bot, ConsoleBot):
|
||||||
|
logger.warning(
|
||||||
|
f"编号对应的平台并非期望的平台 ID={self.self_id} PLATFORM={
|
||||||
|
self.platform
|
||||||
|
} BOT_CLASS={bot.__class__.__name__}"
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
await bot.send_message(self.channel_id, cast(Any, await msg.export()))
|
||||||
|
return True
|
||||||
|
if self.platform == "discord":
|
||||||
|
if not isinstance(bot, DCBot):
|
||||||
|
logger.warning(
|
||||||
|
f"编号对应的平台并非期望的平台 ID={self.self_id} PLATFORM={
|
||||||
|
self.platform
|
||||||
|
} BOT_CLASS={bot.__class__.__name__}"
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
await bot.send_to(
|
||||||
|
channel_id=int(self.channel_id),
|
||||||
|
message=cast(
|
||||||
|
Any, await (UniMessage().at(self.target_id) + msg).export()
|
||||||
|
),
|
||||||
|
tts=False,
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
logger.warning(f"没有一个平台是期望的平台 PLATFORM={self.platform}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class LongTask(BaseModel):
|
||||||
|
uuid: str
|
||||||
|
data_json: str
|
||||||
|
target: LongTaskTarget
|
||||||
|
callback: str
|
||||||
|
deadline: datetime.datetime
|
||||||
|
canceled: bool = False
|
||||||
|
|
||||||
|
_aio_task: asynkio.Task | None = None
|
||||||
|
|
||||||
|
async def run(self):
|
||||||
|
now = datetime.datetime.now()
|
||||||
|
if self.deadline < now and not self.canceled:
|
||||||
|
await self._run_task()
|
||||||
|
return
|
||||||
|
await asynkio.sleep((self.deadline - now).total_seconds())
|
||||||
|
if self.canceled:
|
||||||
|
return
|
||||||
|
await self._run_task()
|
||||||
|
|
||||||
|
async def _run_task(self):
|
||||||
|
hdl = registered_long_task_handler.get(self.callback, None)
|
||||||
|
if hdl is None:
|
||||||
|
logger.warning(
|
||||||
|
f"Callback {self.callback} 未曾被注册,但是被期待调用,已忽略"
|
||||||
|
)
|
||||||
|
async with longtask_data() as datafile:
|
||||||
|
datafile.to_handle[self.callback] = [
|
||||||
|
t
|
||||||
|
for t in datafile.to_handle.get(self.callback, [])
|
||||||
|
if t.uuid != self.uuid
|
||||||
|
]
|
||||||
|
datafile.unhandled.setdefault(self.callback, []).append(self)
|
||||||
|
|
||||||
|
return
|
||||||
|
success = False
|
||||||
|
try:
|
||||||
|
await hdl(self)
|
||||||
|
success = True
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception(e)
|
||||||
|
async with longtask_data() as datafile:
|
||||||
|
datafile.to_handle[self.callback] = [
|
||||||
|
t for t in datafile.to_handle[self.callback] if t.uuid != self.uuid
|
||||||
|
]
|
||||||
|
if not success:
|
||||||
|
datafile.unhandled.setdefault(self.callback, []).append(self)
|
||||||
|
logger.info(
|
||||||
|
f"LongTask 执行失败 UUID={self.uuid} callback={self.callback}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.info(
|
||||||
|
f"LongTask 工作完成 UUID={self.uuid} callback={self.callback}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def clean(self):
|
||||||
|
self._aio_task = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def data(self):
|
||||||
|
return json.loads(self.data_json)
|
||||||
|
|
||||||
|
async def start(self):
|
||||||
|
self._aio_task = asynkio.Task(self.run())
|
||||||
|
self._aio_task.add_done_callback(lambda _: self.clean())
|
||||||
|
|
||||||
|
|
||||||
|
class LongTaskModuleData(BaseModel):
|
||||||
|
to_handle: dict[str, list[LongTask]]
|
||||||
|
unhandled: dict[str, list[LongTask]]
|
||||||
|
|
||||||
|
|
||||||
|
async def get_long_task_target(event: BaseEvent, bot: BaseBot) -> LongTaskTarget | None:
|
||||||
|
if isinstance(event, OBGroupMessageEvent):
|
||||||
|
return LongTaskTarget(
|
||||||
|
platform="qq",
|
||||||
|
self_id=str(event.self_id),
|
||||||
|
channel_id=str(event.group_id),
|
||||||
|
target_id=str(event.user_id),
|
||||||
|
)
|
||||||
|
if isinstance(event, OBPrivateMessageEvent):
|
||||||
|
return LongTaskTarget(
|
||||||
|
platform="qq",
|
||||||
|
self_id=str(event.self_id),
|
||||||
|
channel_id=f"{QQ_PRIVATE_CHAT_CHANNEL_PREFIX}{event.self_id}",
|
||||||
|
target_id=str(event.user_id),
|
||||||
|
)
|
||||||
|
if isinstance(event, ConsoleMessageEvent):
|
||||||
|
return LongTaskTarget(
|
||||||
|
platform="console",
|
||||||
|
self_id=str(event.self_id),
|
||||||
|
channel_id=str(event.channel.id),
|
||||||
|
target_id=str(event.user.id),
|
||||||
|
)
|
||||||
|
if isinstance(event, DCMessageEvent):
|
||||||
|
self_id = ""
|
||||||
|
if isinstance(bot, DCBot):
|
||||||
|
self_id = str(bot.self_id)
|
||||||
|
return LongTaskTarget(
|
||||||
|
platform="discord",
|
||||||
|
self_id=self_id,
|
||||||
|
channel_id=str(event.channel_id),
|
||||||
|
target_id=str(event.user_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
_TaskHandler = Callable[[LongTask], Coroutine[Any, Any, Any]]
|
||||||
|
|
||||||
|
|
||||||
|
registered_long_task_handler: dict[str, _TaskHandler] = {}
|
||||||
|
longtask_lock = asynkio.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
def handle_long_task(callback_id: str):
|
||||||
|
def _decorator(func: _TaskHandler):
|
||||||
|
assert callback_id not in registered_long_task_handler, (
|
||||||
|
"有长任务的 ID 出现冲突,请换个名字!"
|
||||||
|
)
|
||||||
|
registered_long_task_handler[callback_id] = func
|
||||||
|
return func
|
||||||
|
|
||||||
|
return _decorator
|
||||||
|
|
||||||
|
|
||||||
|
def _load_longtask_data() -> LongTaskModuleData:
|
||||||
|
try:
|
||||||
|
txt = LONGTASK_DATA_DIR.read_text()
|
||||||
|
return LongTaskModuleData.model_validate_json(txt)
|
||||||
|
except (FileNotFoundError, ValidationError) as e:
|
||||||
|
logger.info(f"取得 LongTask 数据时出现问题:{e}")
|
||||||
|
return LongTaskModuleData(
|
||||||
|
to_handle={},
|
||||||
|
unhandled={},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _save_longtask_data(data: LongTaskModuleData):
|
||||||
|
LONGTASK_DATA_DIR.write_text(data.model_dump_json())
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def longtask_data():
|
||||||
|
async with longtask_lock:
|
||||||
|
data = _load_longtask_data()
|
||||||
|
yield data
|
||||||
|
_save_longtask_data(data)
|
||||||
|
|
||||||
|
|
||||||
|
async def create_longtask(
|
||||||
|
handler: str,
|
||||||
|
data: dict[str, Any],
|
||||||
|
target: LongTaskTarget,
|
||||||
|
deadline: datetime.datetime,
|
||||||
|
):
|
||||||
|
task = LongTask(
|
||||||
|
uuid=str(uuid.uuid4()),
|
||||||
|
data_json=json.dumps(data),
|
||||||
|
target=target,
|
||||||
|
callback=handler,
|
||||||
|
deadline=deadline,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(f"创建了新的 LongTask UUID={task.uuid} CALLBACK={task.callback}")
|
||||||
|
await task.start()
|
||||||
|
|
||||||
|
async with longtask_data() as d:
|
||||||
|
d.to_handle.setdefault(handler, []).append(task)
|
||||||
|
|
||||||
|
return task
|
||||||
|
|
||||||
|
|
||||||
|
async def init_longtask():
|
||||||
|
counter = 0
|
||||||
|
req: set[str] = set()
|
||||||
|
|
||||||
|
async with longtask_data() as data:
|
||||||
|
for v in data.to_handle.values():
|
||||||
|
for t in v:
|
||||||
|
await t.start()
|
||||||
|
counter += 1
|
||||||
|
req.add(t.callback)
|
||||||
|
|
||||||
|
logger.info(f"LongTask 启动了任务 数量={counter} 期望的门类=[{','.join(req)}]")
|
||||||
|
|
||||||
|
|
||||||
|
DepLongTaskTarget = Annotated[LongTaskTarget, Depends(get_long_task_target)]
|
||||||
34
konabot/common/nb/wzq_conflict.py
Normal file
34
konabot/common/nb/wzq_conflict.py
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
from typing import cast
|
||||||
|
from nonebot import get_bot, get_plugin_config, logger
|
||||||
|
from nonebot.adapters import Event as BaseEvent
|
||||||
|
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
|
||||||
|
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
|
||||||
|
from nonebot.rule import Rule
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
|
||||||
|
class WZQConflictConfig(BaseModel):
|
||||||
|
wzq_bot_qq: int = 0
|
||||||
|
|
||||||
|
config = get_plugin_config(WZQConflictConfig)
|
||||||
|
|
||||||
|
|
||||||
|
async def no_wzqbot(evt: BaseEvent):
|
||||||
|
if config.wzq_bot_qq <= 0:
|
||||||
|
return True
|
||||||
|
if not isinstance(evt, GroupMessageEvent):
|
||||||
|
return True
|
||||||
|
gid = evt.group_id
|
||||||
|
sid = evt.self_id
|
||||||
|
bot = cast(OnebotBot, get_bot(str(sid)))
|
||||||
|
|
||||||
|
members = await bot.get_group_member_list(group_id=gid)
|
||||||
|
|
||||||
|
members = set((m.get("user_id", -1) for m in members))
|
||||||
|
if config.wzq_bot_qq in members:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
no_wzqbot_rule = Rule(no_wzqbot)
|
||||||
|
|
||||||
2
konabot/docs/concepts/罗文.txt
Normal file
2
konabot/docs/concepts/罗文.txt
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
关于罗文和洛温:
|
||||||
|
AdoreLowen 希望和洛温阿特金森区分,所以最好就不要叫他洛温了!此方 BOT 会在一些群提醒叫错了的人。
|
||||||
9
konabot/docs/sys/怪话过滤.txt
Normal file
9
konabot/docs/sys/怪话过滤.txt
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
指令介绍
|
||||||
|
怪话过滤 - 去除含有关键词的怪话
|
||||||
|
|
||||||
|
使用方法
|
||||||
|
`怪话过滤 说的道理`
|
||||||
|
去除所有含有“说的道理”的怪话
|
||||||
|
|
||||||
|
另见
|
||||||
|
怪话(1)
|
||||||
@ -45,7 +45,7 @@
|
|||||||
- 帧数必须为正整数(> 0)。
|
- 帧数必须为正整数(> 0)。
|
||||||
- 若原始帧数 ≤ 指定帧数,则保留全部帧。
|
- 若原始帧数 ≤ 指定帧数,则保留全部帧。
|
||||||
|
|
||||||
--s <速度>(可选)
|
--speed <速度>(可选)
|
||||||
- 调整 gif 图的速度。若为负数,则代表倒放
|
- 调整 gif 图的速度。若为负数,则代表倒放
|
||||||
|
|
||||||
使用方式
|
使用方式
|
||||||
|
|||||||
20
konabot/docs/user/卵总展示.txt
Normal file
20
konabot/docs/user/卵总展示.txt
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
指令介绍
|
||||||
|
卵总展示 - 让卵总举起你的图片
|
||||||
|
|
||||||
|
格式
|
||||||
|
<引用图片> 卵总展示 [选项]
|
||||||
|
卵总展示 [选项] <图片>
|
||||||
|
|
||||||
|
选项
|
||||||
|
`--whiteness <number>` 白度
|
||||||
|
将原图进行指数变换,以调整它的白的程度,默认为 0.0
|
||||||
|
|
||||||
|
`--black-level <number>` 黑色等级
|
||||||
|
将原图减淡,数值越大越淡,范围 0.0-1.0,默认 0.2
|
||||||
|
|
||||||
|
`--opacity <number>` 不透明度
|
||||||
|
将你的图片叠放在图片上的不透明度,默认为 0.8
|
||||||
|
|
||||||
|
`--saturation <number>` 饱和度
|
||||||
|
调整原图的饱和度,应该要大于 0.0,默认为 0.85
|
||||||
|
|
||||||
12
konabot/docs/user/怪话.txt
Normal file
12
konabot/docs/user/怪话.txt
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
指令介绍
|
||||||
|
说点怪话/说些怪话 - 让 BOT 学群友胡言乱语
|
||||||
|
|
||||||
|
适用范围
|
||||||
|
为保证安全,只有少数授权的群聊可以使用该指令
|
||||||
|
|
||||||
|
使用方法
|
||||||
|
`说点怪话 今天吃什么`
|
||||||
|
期待 Bot 会回答你什么吧
|
||||||
|
|
||||||
|
`说些怪话 明天不想上体育课`
|
||||||
|
Bot 会回复你三句怪话
|
||||||
25
konabot/plugins/auto_accept.py
Normal file
25
konabot/plugins/auto_accept.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
import asyncio
|
||||||
|
import random
|
||||||
|
from typing import cast
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
from nonebot import get_bot, on_request
|
||||||
|
from nonebot.adapters.onebot.v11.event import FriendRequestEvent
|
||||||
|
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
|
||||||
|
|
||||||
|
from konabot.common.nb.is_admin import cfg as adminConfig
|
||||||
|
|
||||||
|
add_request = on_request()
|
||||||
|
|
||||||
|
@add_request.handle()
|
||||||
|
async def _(req: FriendRequestEvent):
|
||||||
|
bot = cast(OnebotBot, get_bot(str(req.self_id)))
|
||||||
|
ok_member_ls: set[int] = set()
|
||||||
|
for group in adminConfig.admin_qq_group:
|
||||||
|
members = await bot.get_group_member_list(group_id=group)
|
||||||
|
ok_member_ls |= cast(set[int], set((m.get("user_id") for m in members)))
|
||||||
|
if req.user_id in ok_member_ls:
|
||||||
|
await asyncio.sleep(random.randint(5, 10))
|
||||||
|
await req.approve(bot)
|
||||||
|
logger.info(f"已经自动同意 {req.user_id} 的好友请求")
|
||||||
|
|
||||||
39
konabot/plugins/bilibili_fetch/__init__.py
Normal file
39
konabot/plugins/bilibili_fetch/__init__.py
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
import re
|
||||||
|
|
||||||
|
from nonebot import on_message
|
||||||
|
from nonebot_plugin_alconna import Reference, Reply, UniMsg
|
||||||
|
|
||||||
|
from nonebot.adapters import Event
|
||||||
|
|
||||||
|
|
||||||
|
matcher_fix = on_message()
|
||||||
|
|
||||||
|
pattern = (
|
||||||
|
r"^(?:(?:av|cv)\d+|BV[a-zA-Z0-9]{10})|"
|
||||||
|
r"(?:b23\.tv|bili(?:22|23|33|2233)\.cn|\.bilibili\.com|QQ小程序(?:&#93;|]|\])哔哩哔哩).{0,500}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@matcher_fix.handle()
|
||||||
|
async def _(msg: UniMsg, event: Event):
|
||||||
|
to_search = msg.exclude(Reply, Reference).dump(json=True)
|
||||||
|
to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
|
||||||
|
if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
|
||||||
|
return
|
||||||
|
|
||||||
|
from nonebot_plugin_analysis_bilibili import handle_analysis
|
||||||
|
|
||||||
|
await handle_analysis(event)
|
||||||
|
|
||||||
|
# b_url: str
|
||||||
|
# b_page: str | None
|
||||||
|
# b_time: str | None
|
||||||
|
#
|
||||||
|
# from nonebot_plugin_analysis_bilibili.analysis_bilibili import extract as bilibili_extract
|
||||||
|
#
|
||||||
|
# b_url, b_page, b_time = bilibili_extract(to_search)
|
||||||
|
# if b_url is None:
|
||||||
|
# return
|
||||||
|
#
|
||||||
|
# await matcher_fix.send(await UniMessage().text(b_url).export())
|
||||||
|
|
||||||
@ -1,25 +0,0 @@
|
|||||||
import re
|
|
||||||
from loguru import logger
|
|
||||||
from nonebot import on_message
|
|
||||||
from nonebot_plugin_alconna import Reference, Reply, UniMsg
|
|
||||||
|
|
||||||
from nonebot.adapters import Event
|
|
||||||
|
|
||||||
|
|
||||||
matcher_fix = on_message()
|
|
||||||
|
|
||||||
pattern = (
|
|
||||||
r"^(?:(?:av|cv)\d+|BV[a-zA-Z0-9]{10})|"
|
|
||||||
r"(?:b23\.tv|bili(?:22|23|33|2233)\.cn|\.bilibili\.com|QQ小程序(?:&#93;|]|\])哔哩哔哩).{0,500}"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@matcher_fix.handle()
|
|
||||||
async def _(msg: UniMsg, event: Event):
|
|
||||||
to_search = msg.exclude(Reply, Reference).dump(json=True)
|
|
||||||
if not re.search(pattern, to_search):
|
|
||||||
return
|
|
||||||
logger.info("检测到有 Bilibili 相关的消息,直接进行一个调用")
|
|
||||||
_module = __import__("nonebot_plugin_analysis_bilibili")
|
|
||||||
await _module.handle_analysis(event)
|
|
||||||
|
|
||||||
68
konabot/plugins/fortune/__init__.py
Normal file
68
konabot/plugins/fortune/__init__.py
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
import asyncio
|
||||||
|
from nonebot import get_plugin_config, on_command, on_message
|
||||||
|
from nonebot.adapters import Bot
|
||||||
|
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
|
||||||
|
from nonebot_plugin_alconna import Alconna, Args, UniMessage, UniMsg, on_alconna
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from konabot.common.nb.is_admin import is_admin
|
||||||
|
from konabot.common.path import DATA_PATH
|
||||||
|
|
||||||
|
from .random_text_record import RandomTextManager
|
||||||
|
|
||||||
|
|
||||||
|
class FortuneConfig(BaseModel):
|
||||||
|
plugin_fortune_collect_groups: list[int] = []
|
||||||
|
|
||||||
|
|
||||||
|
fortune_wtf = RandomTextManager(DATA_PATH / "fortune_wtf.txt")
|
||||||
|
fortune_insert_lock = asyncio.Lock()
|
||||||
|
fortune_config = get_plugin_config(FortuneConfig)
|
||||||
|
|
||||||
|
|
||||||
|
async def is_collect_target(evt: GroupMessageEvent) -> bool:
|
||||||
|
if evt.group_id not in fortune_config.plugin_fortune_collect_groups:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
evt_collector = on_message(rule=is_collect_target)
|
||||||
|
|
||||||
|
@evt_collector.handle()
|
||||||
|
async def _(msg: UniMsg):
|
||||||
|
txt = msg.extract_plain_text()
|
||||||
|
if len(txt) > 50 or not txt.strip():
|
||||||
|
return
|
||||||
|
if txt.startswith("说点怪话") or txt.startswith("说些怪话") or txt.startswith("怪话过滤"):
|
||||||
|
return
|
||||||
|
async with fortune_insert_lock:
|
||||||
|
fortune_wtf.insert(txt)
|
||||||
|
|
||||||
|
|
||||||
|
cmd_guaihua = on_command("说点怪话", rule=is_collect_target)
|
||||||
|
|
||||||
|
@cmd_guaihua.handle()
|
||||||
|
async def _(bot: Bot):
|
||||||
|
await cmd_guaihua.send(await UniMessage().text(fortune_wtf.choice()).export(bot))
|
||||||
|
|
||||||
|
|
||||||
|
cmd_guaihuas = on_command("说些怪话", rule=is_collect_target)
|
||||||
|
|
||||||
|
@cmd_guaihuas.handle()
|
||||||
|
async def _(bot: Bot):
|
||||||
|
for _ in range(3):
|
||||||
|
await cmd_guaihuas.send(await UniMessage().text(fortune_wtf.choice()).export(bot))
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
cmd_filter_guaihua = on_alconna(Alconna(
|
||||||
|
"怪话过滤",
|
||||||
|
Args["keyword", str],
|
||||||
|
), rule=is_admin)
|
||||||
|
|
||||||
|
@cmd_filter_guaihua.handle()
|
||||||
|
async def _(keyword: str, bot: Bot):
|
||||||
|
async with fortune_insert_lock:
|
||||||
|
c = fortune_wtf.filter_out(keyword)
|
||||||
|
await cmd_filter_guaihua.send(await UniMessage().text(f"删除了 {c} 条怪话").export(bot))
|
||||||
|
|
||||||
70
konabot/plugins/fortune/random_text_record.py
Normal file
70
konabot/plugins/fortune/random_text_record.py
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
import base64
|
||||||
|
import random
|
||||||
|
import time
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
class RandomTextManager:
|
||||||
|
_cache: list[tuple[float, str]]
|
||||||
|
|
||||||
|
def __init__(self, fp: Path) -> None:
|
||||||
|
self.fp = fp
|
||||||
|
self._cache = []
|
||||||
|
if not self.fp.exists():
|
||||||
|
self.fp.touch()
|
||||||
|
else:
|
||||||
|
self.load()
|
||||||
|
|
||||||
|
def load(self):
|
||||||
|
self._cache = []
|
||||||
|
with self.fp.open("r") as f:
|
||||||
|
for line in f.readlines():
|
||||||
|
if not line.strip():
|
||||||
|
continue
|
||||||
|
if "|" not in line:
|
||||||
|
continue
|
||||||
|
ts, cn = line.split("|")
|
||||||
|
try:
|
||||||
|
ts = float(ts)
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
self._cache.append((ts, base64.b64decode(cn).decode("utf-8")))
|
||||||
|
|
||||||
|
def save(self):
|
||||||
|
lines = [
|
||||||
|
str(ts) + "|" + base64.b64encode(cn.encode("utf-8")).decode()
|
||||||
|
for ts, cn in self._cache
|
||||||
|
]
|
||||||
|
with self.fp.open("w") as f:
|
||||||
|
f.writelines(lines)
|
||||||
|
|
||||||
|
def insert(self, text: str, timestamp: float | None = None):
|
||||||
|
if timestamp is None:
|
||||||
|
timestamp = time.time()
|
||||||
|
with self.fp.open("a") as f:
|
||||||
|
f.write(str(timestamp) + "|" + base64.b64encode(text.encode("utf-8")).decode() + "\n")
|
||||||
|
self._cache.append((timestamp, text))
|
||||||
|
|
||||||
|
def choice(self, now: float | None = None):
|
||||||
|
contents: list[str] = []
|
||||||
|
weights: list[float] = []
|
||||||
|
|
||||||
|
if now is None:
|
||||||
|
now = time.time()
|
||||||
|
|
||||||
|
for ts, cn in self._cache:
|
||||||
|
contents.append(cn)
|
||||||
|
weights.append((abs(now - ts) + 0.01) ** (-1))
|
||||||
|
|
||||||
|
return random.choices(contents, weights)[0]
|
||||||
|
|
||||||
|
def filter_out(self, keyword: str):
|
||||||
|
len1 = len(self._cache)
|
||||||
|
self._cache = [
|
||||||
|
(ts, cn) for ts, cn in self._cache
|
||||||
|
if keyword not in cn
|
||||||
|
]
|
||||||
|
self.save()
|
||||||
|
return len1 - len(self._cache)
|
||||||
|
|
||||||
@ -8,6 +8,8 @@ from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
|
|||||||
on_alconna)
|
on_alconna)
|
||||||
from nonebot_plugin_alconna.uniseg import UniMsg, At, Reply
|
from nonebot_plugin_alconna.uniseg import UniMsg, At, Reply
|
||||||
|
|
||||||
|
from konabot.common.nb.wzq_conflict import no_wzqbot_rule
|
||||||
|
|
||||||
async def download_img(url):
|
async def download_img(url):
|
||||||
resp = requests.get(url.replace("https://multimedia.nt.qq","http://multimedia.nt.qq")) # bim获取QQ的图片时避免SSLv3报错
|
resp = requests.get(url.replace("https://multimedia.nt.qq","http://multimedia.nt.qq")) # bim获取QQ的图片时避免SSLv3报错
|
||||||
img_bytes = BytesIO()
|
img_bytes = BytesIO()
|
||||||
@ -42,7 +44,7 @@ gqrc = on_alconna(Alconna(
|
|||||||
missing_tips=lambda: "请输入你要转换为二维码的文字!"
|
missing_tips=lambda: "请输入你要转换为二维码的文字!"
|
||||||
)],
|
)],
|
||||||
# UniMessage[]
|
# UniMessage[]
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"生成二维码","genqrcode"})
|
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"生成二维码","genqrcode"}, rule=no_wzqbot_rule)
|
||||||
|
|
||||||
@gqrc.handle()
|
@gqrc.handle()
|
||||||
async def _(saying: list):
|
async def _(saying: list):
|
||||||
@ -66,4 +68,4 @@ async def _(saying: list):
|
|||||||
else:
|
else:
|
||||||
"""
|
"""
|
||||||
# genqr("\n".join(saying))
|
# genqr("\n".join(saying))
|
||||||
await gqrc.send(await UniMessage().image(raw=genqr("\n".join(saying))).export())
|
await gqrc.send(await UniMessage().image(raw=genqr("\n".join(saying))).export())
|
||||||
|
|||||||
@ -1,177 +1,548 @@
|
|||||||
import base64
|
import asyncio as asynkio
|
||||||
import secrets
|
import datetime
|
||||||
import json
|
import json
|
||||||
from typing import Literal
|
import secrets
|
||||||
|
from enum import Enum
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
from nonebot import on_message
|
from nonebot import on_message
|
||||||
from nonebot.adapters import Event as BaseEvent
|
from nonebot.adapters import Event as BaseEvent
|
||||||
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
|
|
||||||
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
||||||
from nonebot_plugin_alconna import (Alconna, Args, Field, Subcommand,
|
from nonebot_plugin_alconna import (
|
||||||
UniMessage, UniMsg, on_alconna)
|
Alconna,
|
||||||
|
Args,
|
||||||
|
UniMessage,
|
||||||
|
UniMsg,
|
||||||
|
on_alconna,
|
||||||
|
)
|
||||||
|
|
||||||
|
from konabot.common.longtask import DepLongTaskTarget
|
||||||
from konabot.common.path import ASSETS_PATH
|
from konabot.common.path import ASSETS_PATH
|
||||||
|
|
||||||
ALL_WORDS = [] # 所有四字词语
|
DATA_FILE_PATH = (
|
||||||
ALL_IDIOMS = [] # 所有成语
|
Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json"
|
||||||
IDIOM_FIRST_CHAR = {} # 成语首字字典
|
)
|
||||||
|
|
||||||
INITED = False
|
|
||||||
|
|
||||||
def init_lexicon():
|
def load_banned_ids() -> list[str]:
|
||||||
global ALL_WORDS, ALL_IDIOMS, IDIOM_FIRST_CHAR
|
if not DATA_FILE_PATH.exists():
|
||||||
# 成语大表
|
return []
|
||||||
with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f:
|
try:
|
||||||
ALL_IDIOMS_INFOS = json.load(f)
|
return json.loads(DATA_FILE_PATH.read_text())
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}")
|
||||||
|
return []
|
||||||
|
|
||||||
# 词语大表
|
|
||||||
with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f:
|
def is_idiom_game_banned(group_id: str) -> bool:
|
||||||
ALL_WORDS = json.load(f)
|
banned_ids = load_banned_ids()
|
||||||
|
return group_id in banned_ids
|
||||||
|
|
||||||
|
|
||||||
|
def add_banned_id(group_id: str):
|
||||||
|
banned_ids = load_banned_ids()
|
||||||
|
if group_id not in banned_ids:
|
||||||
|
banned_ids.append(group_id)
|
||||||
|
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
|
||||||
|
|
||||||
|
|
||||||
|
def remove_banned_id(group_id: str):
|
||||||
|
banned_ids = load_banned_ids()
|
||||||
|
if group_id in banned_ids:
|
||||||
|
banned_ids.remove(group_id)
|
||||||
|
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
|
||||||
|
|
||||||
|
|
||||||
|
class TryStartState(Enum):
|
||||||
|
STARTED = 0
|
||||||
|
ALREADY_PLAYING = 1
|
||||||
|
NO_REMAINING_TIMES = 2
|
||||||
|
|
||||||
|
|
||||||
|
class TryStopState(Enum):
|
||||||
|
STOPPED = 0
|
||||||
|
NOT_PLAYING = 1
|
||||||
|
|
||||||
|
|
||||||
|
class TryVerifyState(Enum):
|
||||||
|
VERIFIED = 0
|
||||||
|
VERIFIED_AND_REAL = 1
|
||||||
|
NOT_IDIOM = 2
|
||||||
|
WRONG_FIRST_CHAR = 3
|
||||||
|
VERIFIED_BUT_NO_NEXT = 4
|
||||||
|
VERIFIED_GAME_END = 5
|
||||||
|
|
||||||
|
|
||||||
|
class IdiomGame:
|
||||||
|
ALL_WORDS = [] # 所有四字词语
|
||||||
|
ALL_IDIOMS = [] # 所有成语
|
||||||
|
INSTANCE_LIST: dict[str, "IdiomGame"] = {} # 群号对应的游戏实例
|
||||||
|
IDIOM_FIRST_CHAR = {} # 所有成语包括词语的首字字典
|
||||||
|
AVALIABLE_IDIOM_FIRST_CHAR = {} # 真正有效的成语首字字典
|
||||||
|
|
||||||
|
__inited = False
|
||||||
|
|
||||||
|
def __init__(self, group_id: str):
|
||||||
|
# 初始化一局游戏
|
||||||
|
self.group_id = ""
|
||||||
|
self.now_playing = False
|
||||||
|
self.score_board = {}
|
||||||
|
self.last_idiom = ""
|
||||||
|
self.last_char = ""
|
||||||
|
self.remain_playing_times = 3
|
||||||
|
self.last_play_date = ""
|
||||||
|
self.all_buff_score = 0
|
||||||
|
self.lock = asynkio.Lock()
|
||||||
|
self.remain_rounds = 0 # 剩余回合数
|
||||||
|
IdiomGame.INSTANCE_LIST[group_id] = self
|
||||||
|
|
||||||
|
def be_able_to_play(self) -> bool:
|
||||||
|
if self.last_play_date != datetime.date.today():
|
||||||
|
self.last_play_date = datetime.date.today()
|
||||||
|
self.remain_playing_times = 1
|
||||||
|
if self.remain_playing_times > 0:
|
||||||
|
self.remain_playing_times -= 1
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def choose_start_idiom(self) -> str:
|
||||||
|
"""
|
||||||
|
随机选择一个成语作为起始成语
|
||||||
|
"""
|
||||||
|
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
|
||||||
|
self.last_char = self.last_idiom[-1]
|
||||||
|
if not self.is_nextable(self.last_char):
|
||||||
|
self.choose_start_idiom()
|
||||||
|
return self.last_idiom
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def try_start_game(cls, group_id: str, force: bool = False) -> TryStartState:
|
||||||
|
cls.init_lexicon()
|
||||||
|
if not cls.INSTANCE_LIST.get(group_id):
|
||||||
|
cls(group_id)
|
||||||
|
instance = cls.INSTANCE_LIST[group_id]
|
||||||
|
if instance.now_playing:
|
||||||
|
return TryStartState.ALREADY_PLAYING
|
||||||
|
if not instance.be_able_to_play() and not force:
|
||||||
|
return TryStartState.NO_REMAINING_TIMES
|
||||||
|
instance.now_playing = True
|
||||||
|
return TryStartState.STARTED
|
||||||
|
|
||||||
|
def start_game(self, rounds: int = 100):
|
||||||
|
self.now_playing = True
|
||||||
|
self.remain_rounds = rounds
|
||||||
|
self.choose_start_idiom()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def try_stop_game(cls, group_id: str) -> TryStopState:
|
||||||
|
if not cls.INSTANCE_LIST.get(group_id):
|
||||||
|
return TryStopState.NOT_PLAYING
|
||||||
|
instance = cls.INSTANCE_LIST[group_id]
|
||||||
|
if not instance.now_playing:
|
||||||
|
return TryStopState.NOT_PLAYING
|
||||||
|
instance.now_playing = False
|
||||||
|
return TryStopState.STOPPED
|
||||||
|
|
||||||
|
def clear_score_board(self):
|
||||||
|
self.score_board = {}
|
||||||
|
self.last_char = ""
|
||||||
|
|
||||||
|
def get_score_board(self) -> dict:
|
||||||
|
return self.score_board
|
||||||
|
|
||||||
|
def get_all_buff_score(self) -> int:
|
||||||
|
return self.all_buff_score
|
||||||
|
|
||||||
|
async def skip_idiom(self, buff_score: int = -100) -> str:
|
||||||
|
"""
|
||||||
|
跳过当前成语,选择下一个成语
|
||||||
|
"""
|
||||||
|
async with self.lock:
|
||||||
|
self._skip_idiom_async()
|
||||||
|
self.add_buff_score(buff_score)
|
||||||
|
return self.last_idiom
|
||||||
|
|
||||||
|
def _skip_idiom_async(self) -> str:
|
||||||
|
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
|
||||||
|
self.last_char = self.last_idiom[-1]
|
||||||
|
if not self.is_nextable(self.last_char):
|
||||||
|
self._skip_idiom_async()
|
||||||
|
return self.last_idiom
|
||||||
|
|
||||||
|
async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
|
||||||
|
"""
|
||||||
|
用户发送成语
|
||||||
|
"""
|
||||||
|
async with self.lock:
|
||||||
|
state = self._verify_idiom(idiom, user_id)
|
||||||
|
return state
|
||||||
|
|
||||||
|
def is_nextable(self, last_char: str) -> bool:
|
||||||
|
"""
|
||||||
|
判断是否有成语可以接
|
||||||
|
"""
|
||||||
|
return last_char in IdiomGame.AVALIABLE_IDIOM_FIRST_CHAR
|
||||||
|
|
||||||
|
def _verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
|
||||||
|
# 新成语的首字应与上一条成语的尾字相同
|
||||||
|
if idiom[0] != self.last_char:
|
||||||
|
return TryVerifyState.WRONG_FIRST_CHAR
|
||||||
|
if idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS:
|
||||||
|
self.add_score(user_id, -0.1)
|
||||||
|
return TryVerifyState.NOT_IDIOM
|
||||||
|
self.last_idiom = idiom
|
||||||
|
self.last_char = idiom[-1]
|
||||||
|
self.add_score(user_id, 1)
|
||||||
|
if idiom in IdiomGame.ALL_IDIOMS:
|
||||||
|
self.add_score(user_id, 4) # 再加 4 分
|
||||||
|
self.remain_rounds -= 1
|
||||||
|
if self.remain_rounds <= 0:
|
||||||
|
self.now_playing = False
|
||||||
|
return TryVerifyState.VERIFIED_GAME_END
|
||||||
|
if not self.is_nextable(self.last_char):
|
||||||
|
# 没有成语可以接了,自动跳过
|
||||||
|
self._skip_idiom_async()
|
||||||
|
return TryVerifyState.VERIFIED_BUT_NO_NEXT
|
||||||
|
if idiom in IdiomGame.ALL_IDIOMS:
|
||||||
|
return TryVerifyState.VERIFIED_AND_REAL # 真实成语
|
||||||
|
return TryVerifyState.VERIFIED
|
||||||
|
|
||||||
|
def get_user_score(self, user_id: str) -> float:
|
||||||
|
if user_id not in self.score_board:
|
||||||
|
return 0
|
||||||
|
# 避免浮点数精度问题导致过长
|
||||||
|
handled_score = round(self.score_board[user_id]["score"], 1)
|
||||||
|
return handled_score
|
||||||
|
|
||||||
|
def add_score(self, user_id: str, score: int):
|
||||||
|
if user_id not in self.score_board:
|
||||||
|
self.score_board[user_id] = {"name": user_id, "score": 0}
|
||||||
|
self.score_board[user_id]["score"] += score
|
||||||
|
|
||||||
|
def add_buff_score(self, score: int):
|
||||||
|
self.all_buff_score += score
|
||||||
|
|
||||||
|
def get_playing_state(self) -> bool:
|
||||||
|
return self.now_playing
|
||||||
|
|
||||||
|
def get_last_char(self) -> str:
|
||||||
|
return self.last_char
|
||||||
|
|
||||||
COMMON_WORDS = []
|
@classmethod
|
||||||
# 读取 COMMON 词语大表
|
def random_idiom_starting_with(cls, first_char: str) -> Optional[str]:
|
||||||
with open(ASSETS_PATH / "lexicon" / "common.txt", "r", encoding="utf-8") as f:
|
cls.init_lexicon()
|
||||||
for line in f:
|
if first_char not in cls.IDIOM_FIRST_CHAR:
|
||||||
word = line.strip()
|
return None
|
||||||
if len(word) == 4:
|
return secrets.choice(cls.IDIOM_FIRST_CHAR[first_char])
|
||||||
COMMON_WORDS.append(word)
|
|
||||||
|
|
||||||
# 读取 THUOCL 成语库
|
@classmethod
|
||||||
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8") as f:
|
def init_lexicon(cls):
|
||||||
THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f]
|
if cls.__inited:
|
||||||
|
return
|
||||||
|
cls.__inited = True
|
||||||
|
|
||||||
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
|
# 成语大表
|
||||||
THUOCL_WORDS = []
|
with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f:
|
||||||
import os
|
ALL_IDIOMS_INFOS = json.load(f)
|
||||||
for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"):
|
|
||||||
if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt":
|
# 词语大表
|
||||||
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8") as f:
|
with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f:
|
||||||
for line in f:
|
cls.ALL_WORDS = json.load(f)
|
||||||
word = line.lstrip().split(" ")[0].strip()
|
|
||||||
if len(word) == 4:
|
COMMON_WORDS = []
|
||||||
THUOCL_WORDS.append(word)
|
# 读取 COMMON 词语大表
|
||||||
|
with open(ASSETS_PATH / "lexicon" / "common.txt", "r", encoding="utf-8") as f:
|
||||||
|
for line in f:
|
||||||
|
word = line.strip()
|
||||||
|
if len(word) == 4:
|
||||||
|
COMMON_WORDS.append(word)
|
||||||
|
|
||||||
|
# 读取 THUOCL 成语库
|
||||||
|
with open(
|
||||||
|
ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt",
|
||||||
|
"r",
|
||||||
|
encoding="utf-8",
|
||||||
|
) as f:
|
||||||
|
THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f]
|
||||||
|
|
||||||
|
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
|
||||||
|
THUOCL_WORDS = []
|
||||||
|
import os
|
||||||
|
|
||||||
|
for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"):
|
||||||
|
if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt":
|
||||||
|
with open(
|
||||||
|
ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename,
|
||||||
|
"r",
|
||||||
|
encoding="utf-8",
|
||||||
|
) as f:
|
||||||
|
for line in f:
|
||||||
|
word = line.lstrip().split(" ")[0].strip()
|
||||||
|
if len(word) == 4:
|
||||||
|
THUOCL_WORDS.append(word)
|
||||||
|
|
||||||
|
# 只有成语的大表
|
||||||
|
cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
|
||||||
|
cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重
|
||||||
|
|
||||||
|
# 其他四字词语表,仅表示可以有这个词
|
||||||
|
cls.ALL_WORDS = (
|
||||||
|
[word for word in cls.ALL_WORDS if len(word) == 4]
|
||||||
|
+ THUOCL_WORDS
|
||||||
|
+ COMMON_WORDS
|
||||||
|
)
|
||||||
|
cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重
|
||||||
|
|
||||||
|
# 根据成语大表,划分出成语首字字典
|
||||||
|
for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS:
|
||||||
|
if idiom[0] not in cls.IDIOM_FIRST_CHAR:
|
||||||
|
cls.IDIOM_FIRST_CHAR[idiom[0]] = []
|
||||||
|
cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
|
||||||
|
|
||||||
|
# 根据真正的成语大表,划分出有效成语首字字典
|
||||||
|
for idiom in cls.ALL_IDIOMS:
|
||||||
|
if idiom[0] not in cls.AVALIABLE_IDIOM_FIRST_CHAR:
|
||||||
|
cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]] = []
|
||||||
|
cls.AVALIABLE_IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
|
||||||
|
|
||||||
|
|
||||||
# 只有成语的大表
|
evt = on_alconna(
|
||||||
ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
|
Alconna(
|
||||||
ALL_IDIOMS = list(set(ALL_IDIOMS)) # 去重
|
"我要玩成语接龙",
|
||||||
|
Args["rounds?", int],
|
||||||
|
),
|
||||||
|
use_cmd_start=True,
|
||||||
|
use_cmd_sep=False,
|
||||||
|
skip_for_unmatch=True,
|
||||||
|
)
|
||||||
|
|
||||||
# 其他四字词语表,仅表示可以有这个词
|
|
||||||
ALL_WORDS = [word for word in ALL_WORDS if len(word) == 4] + THUOCL_WORDS + COMMON_WORDS
|
|
||||||
ALL_WORDS = list(set(ALL_WORDS)) # 去重
|
|
||||||
|
|
||||||
# 根据成语大表,划分出成语首字字典
|
|
||||||
IDIOM_FIRST_CHAR = {}
|
|
||||||
for idiom in ALL_IDIOMS + ALL_WORDS:
|
|
||||||
if idiom[0] not in IDIOM_FIRST_CHAR:
|
|
||||||
IDIOM_FIRST_CHAR[idiom[0]] = []
|
|
||||||
IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
|
|
||||||
|
|
||||||
NOW_PLAYING = False
|
|
||||||
|
|
||||||
SCORE_BOARD = {}
|
|
||||||
|
|
||||||
LAST_CHAR = ""
|
|
||||||
|
|
||||||
USER_NAME_CACHE = {} # 缓存用户名称,避免多次获取
|
|
||||||
|
|
||||||
evt = on_alconna(Alconna(
|
|
||||||
"我要玩成语接龙"
|
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
|
|
||||||
|
|
||||||
@evt.handle()
|
@evt.handle()
|
||||||
async def _(event: BaseEvent):
|
async def play_game(
|
||||||
global NOW_PLAYING, LAST_CHAR, INITED
|
event: BaseEvent,
|
||||||
if not INITED:
|
target: DepLongTaskTarget,
|
||||||
init_lexicon()
|
force=False,
|
||||||
INITED = True
|
rounds: Optional[int] = 100,
|
||||||
if NOW_PLAYING:
|
):
|
||||||
await evt.send(await UniMessage().text("当前已有成语接龙游戏在进行中,请稍后再试!").export())
|
# group_id = str(event.get_session_id())
|
||||||
|
group_id = target.channel_id
|
||||||
|
if is_idiom_game_banned(group_id):
|
||||||
|
await evt.send(
|
||||||
|
await UniMessage().text("本群已被禁止使用成语接龙功能!").export()
|
||||||
|
)
|
||||||
return
|
return
|
||||||
NOW_PLAYING = True
|
rounds = rounds or 0
|
||||||
await evt.send(await UniMessage().text("你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!").export())
|
if rounds <= 0:
|
||||||
# 选择一个随机成语
|
await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export())
|
||||||
idiom = secrets.choice(ALL_IDIOMS)
|
return
|
||||||
LAST_CHAR = idiom[-1]
|
state = IdiomGame.try_start_game(group_id, force)
|
||||||
|
if state == TryStartState.ALREADY_PLAYING:
|
||||||
|
await evt.send(
|
||||||
|
await UniMessage()
|
||||||
|
.text("当前已有成语接龙游戏在进行中,请稍后再试!")
|
||||||
|
.export()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
if state == TryStartState.NO_REMAINING_TIMES:
|
||||||
|
await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export())
|
||||||
|
return
|
||||||
|
await evt.send(
|
||||||
|
await UniMessage()
|
||||||
|
.text(
|
||||||
|
"你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!"
|
||||||
|
)
|
||||||
|
.export()
|
||||||
|
)
|
||||||
|
instance = IdiomGame.INSTANCE_LIST[group_id]
|
||||||
|
instance.start_game(rounds)
|
||||||
# 发布成语
|
# 发布成语
|
||||||
await evt.send(await UniMessage().text(f"第一个成语:「{idiom}」,请接!").export())
|
await evt.send(
|
||||||
|
await UniMessage()
|
||||||
|
.text(f"第一个成语:「{instance.last_idiom}」,请接!")
|
||||||
|
.export()
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
evt = on_alconna(
|
||||||
|
Alconna(
|
||||||
|
"老子就是要玩成语接龙!!!",
|
||||||
|
Args["rounds?", int],
|
||||||
|
),
|
||||||
|
use_cmd_start=True,
|
||||||
|
use_cmd_sep=False,
|
||||||
|
skip_for_unmatch=True,
|
||||||
|
)
|
||||||
|
|
||||||
evt = on_alconna(Alconna(
|
|
||||||
"不玩了"
|
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
|
|
||||||
|
|
||||||
@evt.handle()
|
@evt.handle()
|
||||||
async def _(event: BaseEvent):
|
async def force_play_game(
|
||||||
global NOW_PLAYING, SCORE_BOARD, LAST_CHAR
|
event: BaseEvent, target: DepLongTaskTarget, rounds: Optional[int] = 100
|
||||||
if NOW_PLAYING:
|
):
|
||||||
NOW_PLAYING = False
|
await play_game(event, target, force=True, rounds=rounds)
|
||||||
|
|
||||||
|
|
||||||
|
async def end_game(event: BaseEvent, group_id: str):
|
||||||
|
instance = IdiomGame.INSTANCE_LIST[group_id]
|
||||||
|
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
|
||||||
|
score_board = instance.get_score_board()
|
||||||
|
if len(score_board) == 0:
|
||||||
|
result_text += "无人得分!"
|
||||||
|
else:
|
||||||
|
# 按分数排序,名字用 at 的方式
|
||||||
|
sorted_score = sorted(
|
||||||
|
score_board.items(), key=lambda x: x[1]["score"], reverse=True
|
||||||
|
)
|
||||||
|
for i, (user_id, info) in enumerate(sorted_score):
|
||||||
|
result_text += (
|
||||||
|
f"{i + 1}. "
|
||||||
|
+ UniMessage().at(user_id)
|
||||||
|
+ f": {round(info['score'] + instance.get_all_buff_score(), 1)} 分\n"
|
||||||
|
)
|
||||||
|
await evt.send(await result_text.export())
|
||||||
|
instance.clear_score_board()
|
||||||
|
|
||||||
|
|
||||||
|
evt = on_alconna(
|
||||||
|
Alconna("不玩了"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@evt.handle()
|
||||||
|
async def _(event: BaseEvent, target: DepLongTaskTarget):
|
||||||
|
# group_id = str(event.get_session_id())
|
||||||
|
group_id = target.channel_id
|
||||||
|
state = IdiomGame.try_stop_game(group_id)
|
||||||
|
if state == TryStopState.STOPPED:
|
||||||
# 发送好吧狗图片
|
# 发送好吧狗图片
|
||||||
# 打开好吧狗本地文件
|
# 打开好吧狗本地文件
|
||||||
with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f:
|
with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f:
|
||||||
img_data = f.read()
|
img_data = f.read()
|
||||||
await evt.send(await UniMessage().image(raw=img_data).export())
|
await evt.send(await UniMessage().image(raw=img_data).export())
|
||||||
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
|
await end_game(event, group_id)
|
||||||
if len(SCORE_BOARD) == 0:
|
|
||||||
result_text += "无人得分!"
|
|
||||||
else:
|
|
||||||
# 按分数排序,名字用 at 的方式
|
|
||||||
sorted_score = sorted(SCORE_BOARD.items(), key=lambda x: x[1]["score"], reverse=True)
|
|
||||||
for i, (user_id, info) in enumerate(sorted_score):
|
|
||||||
result_text += f"{i+1}. " + UniMessage().at(user_id) + f": {info['score']} 分\n"
|
|
||||||
await evt.send(await result_text.export())
|
|
||||||
# 重置分数板
|
|
||||||
SCORE_BOARD = {}
|
|
||||||
LAST_CHAR = ""
|
|
||||||
else:
|
else:
|
||||||
await evt.send(await UniMessage().text("当前没有成语接龙游戏在进行中!").export())
|
await evt.send(
|
||||||
|
await UniMessage().text("当前没有成语接龙游戏在进行中!").export()
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# 跳过
|
# 跳过
|
||||||
evt = on_alconna(Alconna(
|
evt = on_alconna(
|
||||||
"跳过成语"
|
Alconna("跳过成语"), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
|
)
|
||||||
|
|
||||||
|
|
||||||
@evt.handle()
|
@evt.handle()
|
||||||
async def _(event: BaseEvent):
|
async def _(target: DepLongTaskTarget):
|
||||||
global NOW_PLAYING, LAST_CHAR
|
# group_id = str(event.get_session_id())
|
||||||
if not NOW_PLAYING:
|
group_id = target.channel_id
|
||||||
|
instance = IdiomGame.INSTANCE_LIST.get(group_id)
|
||||||
|
if not instance or not instance.get_playing_state():
|
||||||
return
|
return
|
||||||
await evt.send(await UniMessage().text("你们太菜了!全部扣100分!").export())
|
avaliable_idiom = IdiomGame.random_idiom_starting_with(instance.get_last_char())
|
||||||
for user_id in SCORE_BOARD:
|
await evt.send(await UniMessage().text(f"你们太菜了,全部扣100分!明明还可以接「{avaliable_idiom}」的!").export())
|
||||||
SCORE_BOARD[user_id]["score"] -= 100
|
idiom = await instance.skip_idiom(-100)
|
||||||
# 选择下一个成语
|
await evt.send(
|
||||||
idiom = secrets.choice(ALL_IDIOMS)
|
await UniMessage().text(f"重新开始,下一个成语是「{idiom}」").export()
|
||||||
LAST_CHAR = idiom[-1]
|
)
|
||||||
await evt.send(await UniMessage().text(f"重新开始,下一个成语是「{idiom}」").export())
|
|
||||||
|
|
||||||
# 直接读取消息
|
|
||||||
evt = on_message()
|
|
||||||
|
|
||||||
@evt.handle()
|
def get_user_info(event: BaseEvent):
|
||||||
async def _(event: BaseEvent, msg: UniMsg):
|
|
||||||
global NOW_PLAYING, LAST_CHAR, SCORE_BOARD
|
|
||||||
if not NOW_PLAYING:
|
|
||||||
return
|
|
||||||
user_idiom = msg.extract_plain_text().strip()
|
|
||||||
if(user_idiom[0] != LAST_CHAR):
|
|
||||||
return
|
|
||||||
if(user_idiom not in ALL_IDIOMS and user_idiom not in ALL_WORDS):
|
|
||||||
await evt.send(await UniMessage().text("接不上!这个不一样!").export())
|
|
||||||
return
|
|
||||||
# 成功接上
|
|
||||||
if isinstance(event, DiscordMessageEvent):
|
if isinstance(event, DiscordMessageEvent):
|
||||||
user_id = str(event.author.id)
|
user_id = str(event.author.id)
|
||||||
user_name = str(event.author.name)
|
user_name = str(event.author.name)
|
||||||
else:
|
else:
|
||||||
user_id = str(event.get_user_id())
|
user_id = str(event.get_user_id())
|
||||||
user_name = str(event.get_user_id())
|
user_name = str(event.get_user_id())
|
||||||
|
return user_id, user_name
|
||||||
|
|
||||||
if user_id not in SCORE_BOARD:
|
|
||||||
SCORE_BOARD[user_id] = {
|
# 直接读取消息
|
||||||
"name": user_name,
|
evt = on_message()
|
||||||
"score": 0
|
|
||||||
}
|
|
||||||
SCORE_BOARD[user_id]["score"] += 1
|
@evt.handle()
|
||||||
# at 指定玩家
|
async def _(event: BaseEvent, msg: UniMsg, target: DepLongTaskTarget):
|
||||||
await evt.send(await UniMessage().at(user_id).text(f"接对了!你有 {SCORE_BOARD[user_id]['score']} 分!").export())
|
# group_id = str(event.get_session_id())
|
||||||
LAST_CHAR = user_idiom[-1]
|
group_id = target.channel_id
|
||||||
await evt.send(await UniMessage().text(f"下一个成语请以「{LAST_CHAR}」开头!").export())
|
instance = IdiomGame.INSTANCE_LIST.get(group_id)
|
||||||
|
if not instance or not instance.get_playing_state():
|
||||||
|
return
|
||||||
|
user_idiom = msg.extract_plain_text().strip()
|
||||||
|
user_id, user_name = get_user_info(event)
|
||||||
|
state = await instance.try_verify_idiom(user_idiom, user_id)
|
||||||
|
if state == TryVerifyState.WRONG_FIRST_CHAR:
|
||||||
|
return
|
||||||
|
if state == TryVerifyState.NOT_IDIOM:
|
||||||
|
await evt.send(
|
||||||
|
await UniMessage()
|
||||||
|
.at(user_id)
|
||||||
|
.text(" 接不上!这个不一样!你被扣了 0.1 分!")
|
||||||
|
.export()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
if state == TryVerifyState.VERIFIED:
|
||||||
|
await evt.send(
|
||||||
|
await UniMessage()
|
||||||
|
.at(user_id)
|
||||||
|
.text(f" 接上了,喜提 1 分!你有 {instance.get_user_score(user_id)} 分!")
|
||||||
|
.export()
|
||||||
|
)
|
||||||
|
elif state == TryVerifyState.VERIFIED_AND_REAL:
|
||||||
|
await evt.send(
|
||||||
|
await UniMessage()
|
||||||
|
.at(user_id)
|
||||||
|
.text(f" 接上了,这是个真实成语,喜提 5 分!你有 {instance.get_user_score(user_id)} 分!")
|
||||||
|
.export()
|
||||||
|
)
|
||||||
|
if state == TryVerifyState.VERIFIED_GAME_END:
|
||||||
|
await evt.send(await UniMessage().text("全部回合结束!").export())
|
||||||
|
await end_game(event, group_id)
|
||||||
|
return
|
||||||
|
if state == TryVerifyState.VERIFIED_BUT_NO_NEXT:
|
||||||
|
await evt.send(
|
||||||
|
await UniMessage()
|
||||||
|
.text("但是,这是条死路!你们全部都要扣 100 分!")
|
||||||
|
.export()
|
||||||
|
)
|
||||||
|
await evt.send(
|
||||||
|
await UniMessage().text(f"重新抽取成语「{instance.last_idiom}」").export()
|
||||||
|
)
|
||||||
|
await evt.send(
|
||||||
|
await UniMessage()
|
||||||
|
.text(f"下一个成语请以「{instance.get_last_char()}」开头!")
|
||||||
|
.export()
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
evt = on_alconna(
|
||||||
|
Alconna("禁止成语接龙"),
|
||||||
|
use_cmd_start=True,
|
||||||
|
use_cmd_sep=False,
|
||||||
|
skip_for_unmatch=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@evt.handle()
|
||||||
|
async def _(event: BaseEvent, target: DepLongTaskTarget):
|
||||||
|
# group_id = str(event.get_session_id())
|
||||||
|
group_id = target.channel_id
|
||||||
|
add_banned_id(group_id)
|
||||||
|
await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export())
|
||||||
|
|
||||||
|
|
||||||
|
evt = on_alconna(
|
||||||
|
Alconna("开启成语接龙"),
|
||||||
|
use_cmd_start=True,
|
||||||
|
use_cmd_sep=False,
|
||||||
|
skip_for_unmatch=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@evt.handle()
|
||||||
|
async def _(event: BaseEvent, target: DepLongTaskTarget):
|
||||||
|
# group_id = str(event.get_session_id())
|
||||||
|
group_id = target.channel_id
|
||||||
|
remove_banned_id(group_id)
|
||||||
|
await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export())
|
||||||
|
|||||||
@ -1,10 +1,10 @@
|
|||||||
import re
|
import re
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
|
||||||
|
import PIL.Image
|
||||||
from nonebot import on_message
|
from nonebot import on_message
|
||||||
from nonebot.adapters import Bot
|
from nonebot.adapters import Bot
|
||||||
from nonebot_plugin_alconna import (Alconna, Args, Image, Option, UniMessage,
|
from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna
|
||||||
on_alconna)
|
|
||||||
|
|
||||||
from konabot.common.nb.exc import BotExceptionMessage
|
from konabot.common.nb.exc import BotExceptionMessage
|
||||||
from konabot.common.nb.extract_image import PIL_Image
|
from konabot.common.nb.extract_image import PIL_Image
|
||||||
@ -29,15 +29,17 @@ def parse_timestamp(tx: str) -> float | None:
|
|||||||
return res
|
return res
|
||||||
|
|
||||||
|
|
||||||
cmd_giftool = on_alconna(Alconna(
|
cmd_giftool = on_alconna(
|
||||||
"giftool",
|
Alconna(
|
||||||
Args["img", Image | None],
|
"giftool",
|
||||||
Option("--ss", Args["start_point", str]),
|
Args["img", Image | None],
|
||||||
Option("--frames:v", Args["frame_count", int]),
|
Option("--ss", Args["start_point", str]),
|
||||||
Option("-t", Args["length", str]),
|
Option("--frames:v", Args["frame_count", int]),
|
||||||
Option("-to", Args["end_point", str]),
|
Option("-t", Args["length", str]),
|
||||||
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
|
Option("-to", Args["end_point", str]),
|
||||||
))
|
Option("--speed", Args["speed_factor", float], default=1.0, alias=["-s"]),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@cmd_giftool.handle()
|
@cmd_giftool.handle()
|
||||||
@ -80,81 +82,66 @@ async def _(
|
|||||||
if not getattr(image, "is_animated", False):
|
if not getattr(image, "is_animated", False):
|
||||||
raise BotExceptionMessage("错误:输入的不是动图(GIF)")
|
raise BotExceptionMessage("错误:输入的不是动图(GIF)")
|
||||||
|
|
||||||
frames = []
|
##
|
||||||
durations = []
|
# 从这里开始,采样整个 GIF 图
|
||||||
total_duration = 0.0
|
frames: list[PIL.Image.Image] = []
|
||||||
|
durations: list[float] = []
|
||||||
try:
|
try:
|
||||||
for i in range(getattr(image, "n_frames")):
|
for i in range(getattr(image, "n_frames")):
|
||||||
image.seek(i)
|
image.seek(i)
|
||||||
frames.append(image.copy())
|
frames.append(image.copy())
|
||||||
duration = image.info.get("duration", 100) # 单位:毫秒
|
duration = image.info.get("duration", 100) / 1000
|
||||||
durations.append(duration)
|
durations.append(duration)
|
||||||
total_duration += duration / 1000.0 # 转为秒
|
|
||||||
except EOFError:
|
except EOFError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if not frames:
|
if not frames:
|
||||||
raise BotExceptionMessage("错误:读取 GIF 帧失败")
|
raise BotExceptionMessage("错误:读取 GIF 帧失败")
|
||||||
|
# 采样结束
|
||||||
|
|
||||||
def time_to_frame_index(target_time: float) -> int:
|
##
|
||||||
if target_time <= 0:
|
# 根据开始、结束时间或者帧数量来裁取 GIF 图
|
||||||
return 0
|
|
||||||
cum = 0.0
|
|
||||||
for idx, dur in enumerate(durations):
|
|
||||||
cum += dur / 1000.0
|
|
||||||
if cum >= target_time:
|
|
||||||
return min(idx, len(frames) - 1)
|
|
||||||
return len(frames) - 1
|
|
||||||
start_frame = 0
|
|
||||||
end_frame = len(frames) - 1
|
|
||||||
if ss is not None:
|
|
||||||
start_frame = time_to_frame_index(ss)
|
|
||||||
if to is not None:
|
|
||||||
end_frame = time_to_frame_index(to)
|
|
||||||
if end_frame < start_frame:
|
|
||||||
end_frame = start_frame
|
|
||||||
elif t is not None:
|
|
||||||
end_time = (ss or 0.0) + t
|
|
||||||
end_frame = time_to_frame_index(end_time)
|
|
||||||
if end_frame < start_frame:
|
|
||||||
end_frame = start_frame
|
|
||||||
|
|
||||||
start_frame = max(0, start_frame)
|
begin_time = ss or 0
|
||||||
end_frame = min(len(frames) - 1, end_frame)
|
end_time = sum(durations)
|
||||||
selected_frames = frames[start_frame : end_frame + 1]
|
end_time = min(begin_time + (t or end_time), to or end_time, end_time)
|
||||||
selected_durations = durations[start_frame : end_frame + 1]
|
|
||||||
|
|
||||||
if frame_count is not None and frame_count > 0:
|
accumulated = 0.0
|
||||||
if frame_count >= len(selected_frames):
|
status = 0
|
||||||
pass
|
|
||||||
else:
|
|
||||||
step = len(selected_frames) / frame_count
|
|
||||||
sampled_frames = []
|
|
||||||
sampled_durations = []
|
|
||||||
for i in range(frame_count):
|
|
||||||
idx = int(i * step)
|
|
||||||
sampled_frames.append(selected_frames[idx])
|
|
||||||
sampled_durations.append(
|
|
||||||
sum(selected_durations) // len(selected_durations)
|
|
||||||
)
|
|
||||||
selected_frames = sampled_frames
|
|
||||||
selected_durations = sampled_durations
|
|
||||||
|
|
||||||
output_img = BytesIO()
|
sel_frames: list[PIL.Image.Image] = []
|
||||||
|
sel_durations: list[float] = []
|
||||||
|
|
||||||
adjusted_durations = [
|
for i in range(len(frames)):
|
||||||
dur / speed_factor for dur in selected_durations
|
frame = frames[i]
|
||||||
]
|
duration = durations[i]
|
||||||
|
|
||||||
|
if status == 0:
|
||||||
|
if accumulated + duration > begin_time:
|
||||||
|
status = 1
|
||||||
|
sel_frames.append(frame)
|
||||||
|
sel_durations.append(accumulated + duration - begin_time)
|
||||||
|
elif status == 1:
|
||||||
|
if accumulated + duration > end_time:
|
||||||
|
sel_frames.append(frame)
|
||||||
|
sel_durations.append(end_time - accumulated)
|
||||||
|
break
|
||||||
|
sel_frames.append(frame)
|
||||||
|
sel_durations.append(duration)
|
||||||
|
|
||||||
|
accumulated += duration
|
||||||
|
|
||||||
|
##
|
||||||
|
# 加速!
|
||||||
|
sel_durations = [dur / speed_factor * 1000 for dur in durations]
|
||||||
|
|
||||||
rframes = []
|
rframes = []
|
||||||
rdur = []
|
rdur = []
|
||||||
|
|
||||||
acc_mod_20 = 0
|
acc_mod_20 = 0
|
||||||
|
|
||||||
for i in range(len(selected_frames)):
|
for i in range(len(sel_frames)):
|
||||||
fr = selected_frames[i]
|
fr = sel_frames[i]
|
||||||
du: float = adjusted_durations[i]
|
du = round(sel_durations[i])
|
||||||
|
|
||||||
if du >= 20:
|
if du >= 20:
|
||||||
rframes.append(fr)
|
rframes.append(fr)
|
||||||
@ -170,10 +157,12 @@ async def _(
|
|||||||
if acc_mod_20 >= 20:
|
if acc_mod_20 >= 20:
|
||||||
acc_mod_20 = 0
|
acc_mod_20 = 0
|
||||||
|
|
||||||
if len(rframes) == 1 and len(selected_frames) > 1:
|
if len(rframes) == 1 and len(sel_frames) > 1:
|
||||||
rframes.append(selected_frames[max(2, len(selected_frames) // 2)])
|
rframes.append(sel_frames[max(2, len(sel_frames) // 2)])
|
||||||
rdur.append(20)
|
rdur.append(20)
|
||||||
|
|
||||||
|
##
|
||||||
|
# 收尾:看看透明度这块
|
||||||
transparency_flag = False
|
transparency_flag = False
|
||||||
for f in rframes:
|
for f in rframes:
|
||||||
if f.mode == "RGBA":
|
if f.mode == "RGBA":
|
||||||
@ -186,12 +175,13 @@ async def _(
|
|||||||
|
|
||||||
tf = {}
|
tf = {}
|
||||||
if transparency_flag:
|
if transparency_flag:
|
||||||
tf['transparency'] = 0
|
tf["transparency"] = 0
|
||||||
|
|
||||||
if is_rev:
|
if is_rev:
|
||||||
rframes = rframes[::-1]
|
rframes = rframes[::-1]
|
||||||
rdur = rdur[::-1]
|
rdur = rdur[::-1]
|
||||||
|
|
||||||
|
output_img = BytesIO()
|
||||||
if rframes:
|
if rframes:
|
||||||
rframes[0].save(
|
rframes[0].save(
|
||||||
output_img,
|
output_img,
|
||||||
|
|||||||
47
konabot/plugins/longtask_core.py
Normal file
47
konabot/plugins/longtask_core.py
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
|
# import datetime
|
||||||
|
from loguru import logger
|
||||||
|
import nonebot
|
||||||
|
|
||||||
|
# from nonebot_plugin_alconna import UniMessage
|
||||||
|
from konabot.common.longtask import (
|
||||||
|
# DepLongTaskTarget,
|
||||||
|
# LongTask,
|
||||||
|
# create_longtask,
|
||||||
|
# handle_long_task,
|
||||||
|
init_longtask,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
driver = nonebot.get_driver()
|
||||||
|
INIT_FLAG = {"flag": False}
|
||||||
|
|
||||||
|
|
||||||
|
@driver.on_bot_connect
|
||||||
|
async def _():
|
||||||
|
if INIT_FLAG["flag"]:
|
||||||
|
return
|
||||||
|
INIT_FLAG["flag"] = True
|
||||||
|
logger.info("有 Bot 连接,等待 5 秒后初始化 LongTask 模块")
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
await init_longtask()
|
||||||
|
logger.info("LongTask 初始化完成")
|
||||||
|
|
||||||
|
|
||||||
|
# cmd1 = nonebot.on_command("test114")
|
||||||
|
#
|
||||||
|
#
|
||||||
|
# @handle_long_task("test_callback_001")
|
||||||
|
# async def _(lt: LongTask):
|
||||||
|
# await lt.target.send_message(UniMessage().text("Hello, world!"), at=True)
|
||||||
|
#
|
||||||
|
#
|
||||||
|
# @cmd1.handle()
|
||||||
|
# async def _(target: DepLongTaskTarget):
|
||||||
|
# await create_longtask(
|
||||||
|
# handler="test_callback_001",
|
||||||
|
# data={},
|
||||||
|
# target=target,
|
||||||
|
# deadline=datetime.datetime.now() + datetime.timedelta(seconds=20),
|
||||||
|
# )
|
||||||
@ -53,7 +53,7 @@ async def _(
|
|||||||
if doc is None:
|
if doc is None:
|
||||||
# 检索模式
|
# 检索模式
|
||||||
if section is None:
|
if section is None:
|
||||||
section_set = {1}
|
section_set = {1, 7}
|
||||||
else:
|
else:
|
||||||
section_set = {section}
|
section_set = {section}
|
||||||
if 1 in section_set and is_admin(event):
|
if 1 in section_set and is_admin(event):
|
||||||
|
|||||||
@ -2,25 +2,52 @@ from io import BytesIO
|
|||||||
from typing import Iterable, cast
|
from typing import Iterable, cast
|
||||||
|
|
||||||
from nonebot import on_message
|
from nonebot import on_message
|
||||||
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, Text,
|
from nonebot_plugin_alconna import (
|
||||||
UniMessage, UniMsg, on_alconna)
|
Alconna,
|
||||||
|
Args,
|
||||||
|
Field,
|
||||||
|
Image,
|
||||||
|
MultiVar,
|
||||||
|
Option,
|
||||||
|
Text,
|
||||||
|
UniMessage,
|
||||||
|
UniMsg,
|
||||||
|
on_alconna,
|
||||||
|
)
|
||||||
|
|
||||||
from konabot.common.nb.extract_image import extract_image_from_message
|
from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message
|
||||||
from konabot.plugins.memepack.drawing.display import draw_cao_display
|
from konabot.plugins.memepack.drawing.display import (
|
||||||
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten,
|
draw_cao_display,
|
||||||
draw_geimao, draw_mnk,
|
draw_snaur_display,
|
||||||
draw_pt, draw_suan)
|
draw_anan_display,
|
||||||
|
)
|
||||||
|
from konabot.plugins.memepack.drawing.saying import (
|
||||||
|
draw_cute_ten,
|
||||||
|
draw_geimao,
|
||||||
|
draw_mnk,
|
||||||
|
draw_pt,
|
||||||
|
draw_suan,
|
||||||
|
)
|
||||||
|
|
||||||
from nonebot.adapters import Bot, Event
|
from nonebot.adapters import Bot, Event
|
||||||
|
|
||||||
from returns.result import Success, Failure
|
from returns.result import Success, Failure
|
||||||
|
|
||||||
geimao = on_alconna(Alconna(
|
geimao = on_alconna(
|
||||||
"给猫说",
|
Alconna(
|
||||||
Args["saying", MultiVar(str, '+'), Field(
|
"给猫说",
|
||||||
missing_tips=lambda: "你没有写给猫说了什么"
|
Args[
|
||||||
)]
|
"saying",
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"给猫哈"})
|
MultiVar(str, "+"),
|
||||||
|
Field(missing_tips=lambda: "你没有写给猫说了什么"),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
use_cmd_start=True,
|
||||||
|
use_cmd_sep=False,
|
||||||
|
skip_for_unmatch=False,
|
||||||
|
aliases={"给猫哈"},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@geimao.handle()
|
@geimao.handle()
|
||||||
async def _(saying: list[str]):
|
async def _(saying: list[str]):
|
||||||
@ -31,12 +58,21 @@ async def _(saying: list[str]):
|
|||||||
await geimao.send(await UniMessage().image(raw=img_bytes).export())
|
await geimao.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
|
||||||
|
|
||||||
pt = on_alconna(Alconna(
|
pt = on_alconna(
|
||||||
"pt说",
|
Alconna(
|
||||||
Args["saying", MultiVar(str, '+'), Field(
|
"pt说",
|
||||||
missing_tips=lambda: "你没有写小帕说了什么"
|
Args[
|
||||||
)]
|
"saying",
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"小帕说"})
|
MultiVar(str, "+"),
|
||||||
|
Field(missing_tips=lambda: "你没有写小帕说了什么"),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
use_cmd_start=True,
|
||||||
|
use_cmd_sep=False,
|
||||||
|
skip_for_unmatch=False,
|
||||||
|
aliases={"小帕说"},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pt.handle()
|
@pt.handle()
|
||||||
async def _(saying: list[str]):
|
async def _(saying: list[str]):
|
||||||
@ -47,12 +83,21 @@ async def _(saying: list[str]):
|
|||||||
await pt.send(await UniMessage().image(raw=img_bytes).export())
|
await pt.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
|
||||||
|
|
||||||
mnk = on_alconna(Alconna(
|
mnk = on_alconna(
|
||||||
"re:小?黑白子?说",
|
Alconna(
|
||||||
Args["saying", MultiVar(str, '+'), Field(
|
"re:小?黑白子?说",
|
||||||
missing_tips=lambda: "你没有写黑白子说了什么"
|
Args[
|
||||||
)]
|
"saying",
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"mnk说"})
|
MultiVar(str, "+"),
|
||||||
|
Field(missing_tips=lambda: "你没有写黑白子说了什么"),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
use_cmd_start=True,
|
||||||
|
use_cmd_sep=False,
|
||||||
|
skip_for_unmatch=False,
|
||||||
|
aliases={"mnk说"},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@mnk.handle()
|
@mnk.handle()
|
||||||
async def _(saying: list[str]):
|
async def _(saying: list[str]):
|
||||||
@ -63,12 +108,21 @@ async def _(saying: list[str]):
|
|||||||
await mnk.send(await UniMessage().image(raw=img_bytes).export())
|
await mnk.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
|
||||||
|
|
||||||
suan = on_alconna(Alconna(
|
suan = on_alconna(
|
||||||
"小蒜说",
|
Alconna(
|
||||||
Args["saying", MultiVar(str, '+'), Field(
|
"小蒜说",
|
||||||
missing_tips=lambda: "你没有写小蒜说了什么"
|
Args[
|
||||||
)]
|
"saying",
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
|
MultiVar(str, "+"),
|
||||||
|
Field(missing_tips=lambda: "你没有写小蒜说了什么"),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
use_cmd_start=True,
|
||||||
|
use_cmd_sep=False,
|
||||||
|
skip_for_unmatch=False,
|
||||||
|
aliases=set(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@suan.handle()
|
@suan.handle()
|
||||||
async def _(saying: list[str]):
|
async def _(saying: list[str]):
|
||||||
@ -79,12 +133,21 @@ async def _(saying: list[str]):
|
|||||||
await suan.send(await UniMessage().image(raw=img_bytes).export())
|
await suan.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
|
||||||
|
|
||||||
dsuan = on_alconna(Alconna(
|
dsuan = on_alconna(
|
||||||
"大蒜说",
|
Alconna(
|
||||||
Args["saying", MultiVar(str, '+'), Field(
|
"大蒜说",
|
||||||
missing_tips=lambda: "你没有写大蒜说了什么"
|
Args[
|
||||||
)]
|
"saying",
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases=set())
|
MultiVar(str, "+"),
|
||||||
|
Field(missing_tips=lambda: "你没有写大蒜说了什么"),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
use_cmd_start=True,
|
||||||
|
use_cmd_sep=False,
|
||||||
|
skip_for_unmatch=False,
|
||||||
|
aliases=set(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@dsuan.handle()
|
@dsuan.handle()
|
||||||
async def _(saying: list[str]):
|
async def _(saying: list[str]):
|
||||||
@ -95,12 +158,21 @@ async def _(saying: list[str]):
|
|||||||
await dsuan.send(await UniMessage().image(raw=img_bytes).export())
|
await dsuan.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
|
||||||
|
|
||||||
cutecat = on_alconna(Alconna(
|
cutecat = on_alconna(
|
||||||
"乖猫说",
|
Alconna(
|
||||||
Args["saying", MultiVar(str, '+'), Field(
|
"乖猫说",
|
||||||
missing_tips=lambda: "你没有写十猫说了什么"
|
Args[
|
||||||
)]
|
"saying",
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"十猫说"})
|
MultiVar(str, "+"),
|
||||||
|
Field(missing_tips=lambda: "你没有写十猫说了什么"),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
use_cmd_start=True,
|
||||||
|
use_cmd_sep=False,
|
||||||
|
skip_for_unmatch=False,
|
||||||
|
aliases={"十猫说"},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@cutecat.handle()
|
@cutecat.handle()
|
||||||
async def _(saying: list[str]):
|
async def _(saying: list[str]):
|
||||||
@ -113,13 +185,14 @@ async def _(saying: list[str]):
|
|||||||
|
|
||||||
cao_display_cmd = on_message()
|
cao_display_cmd = on_message()
|
||||||
|
|
||||||
|
|
||||||
@cao_display_cmd.handle()
|
@cao_display_cmd.handle()
|
||||||
async def _(msg: UniMsg, evt: Event, bot: Bot):
|
async def _(msg: UniMsg, evt: Event, bot: Bot):
|
||||||
flag = False
|
flag = False
|
||||||
for text in cast(Iterable[Text], msg.get(Text)):
|
for text in cast(Iterable[Text], msg.get(Text)):
|
||||||
if text.text.strip() == "小槽展示":
|
if text.text.strip() == "小槽展示":
|
||||||
flag = True
|
flag = True
|
||||||
elif text.text.strip() == '':
|
elif text.text.strip() == "":
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
@ -134,8 +207,71 @@ async def _(msg: UniMsg, evt: Event, bot: Bot):
|
|||||||
case Failure(err):
|
case Failure(err):
|
||||||
await cao_display_cmd.send(
|
await cao_display_cmd.send(
|
||||||
await UniMessage()
|
await UniMessage()
|
||||||
.at(user_id=evt.get_user_id())
|
.at(user_id=evt.get_user_id())
|
||||||
.text(' ')
|
.text(" ")
|
||||||
.text(err)
|
.text(err)
|
||||||
.export()
|
.export()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
snaur_display_cmd = on_alconna(
|
||||||
|
Alconna(
|
||||||
|
"卵总展示",
|
||||||
|
Option("--whiteness", Args["whiteness", float], alias=["-w"]),
|
||||||
|
Option("--black-level", Args["black_level", float], alias=["-b"]),
|
||||||
|
Option("--opacity", Args["opacity", float], alias=["-o"]),
|
||||||
|
Option("--saturation", Args["saturation", float], alias=["-s"]),
|
||||||
|
Args["image", Image | None],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@snaur_display_cmd.handle()
|
||||||
|
async def _(
|
||||||
|
img: PIL_Image,
|
||||||
|
whiteness: float = 0.0,
|
||||||
|
black_level: float = 0.2,
|
||||||
|
opacity: float = 0.8,
|
||||||
|
saturation: float = 0.85,
|
||||||
|
):
|
||||||
|
img_processed = await draw_snaur_display(
|
||||||
|
img,
|
||||||
|
whiteness,
|
||||||
|
black_level,
|
||||||
|
opacity,
|
||||||
|
saturation,
|
||||||
|
)
|
||||||
|
img_data = BytesIO()
|
||||||
|
img_processed.save(img_data, "PNG")
|
||||||
|
await snaur_display_cmd.send(await UniMessage().image(raw=img_data).export())
|
||||||
|
|
||||||
|
anan_display_cmd = on_message()
|
||||||
|
@anan_display_cmd.handle()
|
||||||
|
async def _(msg: UniMsg, evt: Event, bot: Bot):
|
||||||
|
flag = False
|
||||||
|
for text in cast(Iterable[Text], msg.get(Text)):
|
||||||
|
stripped = text.text.strip()
|
||||||
|
if stripped == "安安展示":
|
||||||
|
flag = True
|
||||||
|
elif stripped == "":
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
if not flag:
|
||||||
|
return
|
||||||
|
|
||||||
|
match await extract_image_from_message(evt.get_message(), evt, bot):
|
||||||
|
case Success(img):
|
||||||
|
img_handled = await draw_anan_display(img)
|
||||||
|
img_bytes = BytesIO()
|
||||||
|
img_handled.save(img_bytes, format="PNG")
|
||||||
|
await anan_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
|
||||||
|
case Failure(err):
|
||||||
|
await anan_display_cmd.send(
|
||||||
|
await UniMessage()
|
||||||
|
.at(user_id=evt.get_user_id())
|
||||||
|
.text(" ")
|
||||||
|
.text(err)
|
||||||
|
.export()
|
||||||
|
)
|
||||||
|
|
||||||
|
|||||||
@ -4,10 +4,12 @@ from typing import Any, cast
|
|||||||
import cv2
|
import cv2
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import PIL.Image
|
import PIL.Image
|
||||||
|
import PIL.ImageChops
|
||||||
|
import PIL.ImageEnhance
|
||||||
|
|
||||||
from konabot.common.path import ASSETS_PATH
|
from konabot.common.path import ASSETS_PATH
|
||||||
|
|
||||||
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
|
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
|
||||||
CAO_QUAD_POINTS = np.float32(cast(Any, [
|
CAO_QUAD_POINTS = np.float32(cast(Any, [
|
||||||
[392, 540],
|
[392, 540],
|
||||||
[577, 557],
|
[577, 557],
|
||||||
@ -15,6 +17,25 @@ CAO_QUAD_POINTS = np.float32(cast(Any, [
|
|||||||
[381, 687],
|
[381, 687],
|
||||||
]))
|
]))
|
||||||
|
|
||||||
|
snaur_image_base = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "snaur_1_base.png")
|
||||||
|
snaur_image_top = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "snaur_1_top.png")
|
||||||
|
SNAUR_RATIO = (1 / 2) ** .5
|
||||||
|
SNAUR_QUAD_POINTS = np.float32(cast(Any, [
|
||||||
|
[0, 466 ],
|
||||||
|
[673, 471 ],
|
||||||
|
[640, 1196],
|
||||||
|
[106, 1280],
|
||||||
|
]))
|
||||||
|
|
||||||
|
anan_image_base = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "anan_base.png")
|
||||||
|
anan_image_top = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "anan_top.png")
|
||||||
|
ANAN_QUAD_POINTS = np.float32([
|
||||||
|
[157, 585],
|
||||||
|
[793, 599],
|
||||||
|
[781, 908],
|
||||||
|
[160, 908]
|
||||||
|
])
|
||||||
|
|
||||||
def _draw_cao_display(image: PIL.Image.Image):
|
def _draw_cao_display(image: PIL.Image.Image):
|
||||||
src = np.array(image.convert("RGB"))
|
src = np.array(image.convert("RGB"))
|
||||||
h, w = src.shape[:2]
|
h, w = src.shape[:2]
|
||||||
@ -43,3 +64,136 @@ def _draw_cao_display(image: PIL.Image.Image):
|
|||||||
|
|
||||||
async def draw_cao_display(image: PIL.Image.Image):
|
async def draw_cao_display(image: PIL.Image.Image):
|
||||||
return await asyncio.to_thread(_draw_cao_display, image)
|
return await asyncio.to_thread(_draw_cao_display, image)
|
||||||
|
|
||||||
|
|
||||||
|
def _draw_snaur_display(
|
||||||
|
image : PIL.Image.Image,
|
||||||
|
whiteness : float = 0.0 ,
|
||||||
|
black_level: float = 0.2 ,
|
||||||
|
opacity : float = 0.8 ,
|
||||||
|
saturation : float = 0.85 ,
|
||||||
|
):
|
||||||
|
src = np.array(image.convert("RGBA"))
|
||||||
|
_h, _w = src.shape[:2]
|
||||||
|
|
||||||
|
if _w / _h < SNAUR_RATIO:
|
||||||
|
_w_target = _w
|
||||||
|
_h_target = int(_w / SNAUR_RATIO)
|
||||||
|
else:
|
||||||
|
_w_target = int(_h * SNAUR_RATIO)
|
||||||
|
_h_target = _h
|
||||||
|
|
||||||
|
x_center = _w / 2
|
||||||
|
y_center = _h / 2
|
||||||
|
|
||||||
|
x1 = int(x_center - _w_target / 2)
|
||||||
|
x2 = int(x_center + _w_target / 2)
|
||||||
|
y1 = int(y_center - _h_target / 2)
|
||||||
|
y2 = int(y_center + _h_target / 2)
|
||||||
|
|
||||||
|
src = src[y1:y2, x1:x2, :]
|
||||||
|
|
||||||
|
h, w = src.shape[:2]
|
||||||
|
src_points = np.float32(cast(Any, [
|
||||||
|
[0, 0],
|
||||||
|
[w, 0],
|
||||||
|
[w, h],
|
||||||
|
[0, h],
|
||||||
|
]))
|
||||||
|
dst_points = SNAUR_QUAD_POINTS
|
||||||
|
M = cv2.getPerspectiveTransform(cast(Any, src_points), cast(Any, dst_points))
|
||||||
|
output_size = snaur_image_top.size
|
||||||
|
output_w, output_h = output_size
|
||||||
|
warped = cv2.warpPerspective(
|
||||||
|
src,
|
||||||
|
M,
|
||||||
|
(output_w, output_h),
|
||||||
|
flags=cv2.INTER_LINEAR,
|
||||||
|
borderMode=cv2.BORDER_CONSTANT,
|
||||||
|
borderValue=(0, 0, 0)
|
||||||
|
)
|
||||||
|
|
||||||
|
result = PIL.Image.fromarray(warped, 'RGBA')
|
||||||
|
|
||||||
|
r, g, b, a = result.split()
|
||||||
|
a = a.point(lambda p: int(p * opacity))
|
||||||
|
f2 = lambda p: int(
|
||||||
|
((p / 255) ** (2 ** whiteness)) * 255 * (1 - black_level)
|
||||||
|
+ 255 * black_level
|
||||||
|
)
|
||||||
|
r = r.point(f2)
|
||||||
|
g = g.point(f2)
|
||||||
|
b = b.point(f2)
|
||||||
|
result = PIL.Image.merge('RGBA', (r, g, b, a))
|
||||||
|
|
||||||
|
enhancer = PIL.ImageEnhance.Color(result)
|
||||||
|
result = enhancer.enhance(saturation)
|
||||||
|
|
||||||
|
result = PIL.ImageChops.multiply(result, snaur_image_base)
|
||||||
|
|
||||||
|
result = PIL.Image.alpha_composite(snaur_image_base, result)
|
||||||
|
result = PIL.Image.alpha_composite(result, snaur_image_top)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
async def draw_snaur_display(
|
||||||
|
image : PIL.Image.Image,
|
||||||
|
whiteness : float = 0.0 ,
|
||||||
|
black_level: float = 0.2 ,
|
||||||
|
opacity : float = 0.8 ,
|
||||||
|
saturation : float = 0.85 ,
|
||||||
|
) -> PIL.Image.Image:
|
||||||
|
return await asyncio.to_thread(
|
||||||
|
_draw_snaur_display, image, whiteness, black_level,
|
||||||
|
opacity, saturation,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _draw_anan_display(image: PIL.Image.Image) -> PIL.Image.Image:
|
||||||
|
src = np.array(image.convert("RGBA"))
|
||||||
|
h, w = src.shape[:2]
|
||||||
|
|
||||||
|
src_points = np.float32([
|
||||||
|
[0, 0],
|
||||||
|
[w, 0],
|
||||||
|
[w, h],
|
||||||
|
[0, h]
|
||||||
|
])
|
||||||
|
dst_points = ANAN_QUAD_POINTS
|
||||||
|
|
||||||
|
M = cv2.getPerspectiveTransform(src_points, dst_points)
|
||||||
|
output_w, output_h = anan_image_top.size
|
||||||
|
|
||||||
|
src_rgb = cv2.cvtColor(src, cv2.COLOR_RGBA2RGB) if src.shape[2] == 4 else src
|
||||||
|
warped_rgb = cv2.warpPerspective(
|
||||||
|
src_rgb,
|
||||||
|
M,
|
||||||
|
(output_w, output_h),
|
||||||
|
flags=cv2.INTER_LINEAR,
|
||||||
|
borderMode=cv2.BORDER_CONSTANT,
|
||||||
|
borderValue=(0, 0, 0)
|
||||||
|
)
|
||||||
|
|
||||||
|
mask = np.zeros((h, w), dtype=np.uint8)
|
||||||
|
mask[:, :] = 255
|
||||||
|
warped_mask = cv2.warpPerspective(
|
||||||
|
mask,
|
||||||
|
M,
|
||||||
|
(output_w, output_h),
|
||||||
|
flags=cv2.INTER_LINEAR,
|
||||||
|
borderMode=cv2.BORDER_CONSTANT,
|
||||||
|
borderValue=0
|
||||||
|
)
|
||||||
|
|
||||||
|
warped_rgba = cv2.cvtColor(warped_rgb, cv2.COLOR_RGB2RGBA)
|
||||||
|
warped_rgba[:, :, 3] = warped_mask
|
||||||
|
|
||||||
|
warped_pil = PIL.Image.fromarray(warped_rgba, 'RGBA')
|
||||||
|
|
||||||
|
result = PIL.Image.alpha_composite(anan_image_base, warped_pil)
|
||||||
|
result = PIL.Image.alpha_composite(result, anan_image_top)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
async def draw_anan_display(image: PIL.Image.Image) -> PIL.Image.Image:
|
||||||
|
return await asyncio.to_thread(_draw_anan_display, image)
|
||||||
44
konabot/plugins/no_luowen.py
Normal file
44
konabot/plugins/no_luowen.py
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
import nonebot
|
||||||
|
|
||||||
|
from nonebot.adapters.onebot.v11.bot import Bot
|
||||||
|
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
|
||||||
|
from nonebot_plugin_alconna import UniMsg, UniMessage
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
|
||||||
|
class NoLuowenConfig(BaseModel):
|
||||||
|
plugin_noluowen_qqid: int = -1
|
||||||
|
plugin_noluowen_enable_group: list[int] = []
|
||||||
|
|
||||||
|
config = nonebot.get_plugin_config(NoLuowenConfig)
|
||||||
|
|
||||||
|
|
||||||
|
async def is_luowen_mentioned(evt: GroupMessageEvent, msg: UniMsg) -> bool:
|
||||||
|
if config.plugin_noluowen_qqid <= 0:
|
||||||
|
return False
|
||||||
|
if evt.user_id == config.plugin_noluowen_qqid:
|
||||||
|
return False
|
||||||
|
if evt.group_id not in config.plugin_noluowen_enable_group:
|
||||||
|
return False
|
||||||
|
txt = msg.extract_plain_text()
|
||||||
|
if "洛温" not in txt:
|
||||||
|
return False
|
||||||
|
if "罗文" in txt:
|
||||||
|
return False
|
||||||
|
if "阿特金森" in txt:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
evt_luowen_mentioned = nonebot.on_message(rule=is_luowen_mentioned)
|
||||||
|
|
||||||
|
|
||||||
|
@evt_luowen_mentioned.handle()
|
||||||
|
async def _(evt: GroupMessageEvent, bot: Bot):
|
||||||
|
msg = (
|
||||||
|
UniMessage()
|
||||||
|
.reply(str(evt.message_id))
|
||||||
|
.at(str(config.plugin_noluowen_qqid))
|
||||||
|
.text(" 好像有人念错了你的 ID")
|
||||||
|
)
|
||||||
|
await evt_luowen_mentioned.send(await msg.export(bot=bot))
|
||||||
|
|
||||||
@ -1,8 +1,10 @@
|
|||||||
import json, time
|
import json, time
|
||||||
|
|
||||||
|
from nonebot.rule import Rule
|
||||||
from nonebot_plugin_alconna import Alconna, Args, Field, MultiVar, on_alconna
|
from nonebot_plugin_alconna import Alconna, Args, Field, MultiVar, on_alconna
|
||||||
from nonebot.adapters.onebot.v11 import Event
|
from nonebot.adapters.onebot.v11 import Event
|
||||||
|
|
||||||
|
from konabot.common.nb.wzq_conflict import no_wzqbot_rule
|
||||||
from konabot.common.path import ASSETS_PATH, DATA_PATH
|
from konabot.common.path import ASSETS_PATH, DATA_PATH
|
||||||
|
|
||||||
|
|
||||||
@ -66,7 +68,7 @@ poll = on_alconna(Alconna(
|
|||||||
Args["saying", MultiVar(str, '+'), Field(
|
Args["saying", MultiVar(str, '+'), Field(
|
||||||
missing_tips=lambda: "参数错误。用法:发起投票 <投票标题> <选项1> <选项2> ..."
|
missing_tips=lambda: "参数错误。用法:发起投票 <投票标题> <选项1> <选项2> ..."
|
||||||
)],
|
)],
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"发起投票","createpoll"})
|
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"发起投票","createpoll"}, rule=no_wzqbot_rule)
|
||||||
@poll.handle()
|
@poll.handle()
|
||||||
async def _(saying: list, event: Event):
|
async def _(saying: list, event: Event):
|
||||||
if (len(saying) < 3):
|
if (len(saying) < 3):
|
||||||
@ -88,7 +90,7 @@ viewpoll = on_alconna(Alconna(
|
|||||||
Args["saying", MultiVar(str, '+'), Field(
|
Args["saying", MultiVar(str, '+'), Field(
|
||||||
missing_tips=lambda: "请指定投票ID或标题!。用法:查看投票 <投票ID或标题>"
|
missing_tips=lambda: "请指定投票ID或标题!。用法:查看投票 <投票ID或标题>"
|
||||||
)],
|
)],
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"查看投票"})
|
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"查看投票"}, rule=no_wzqbot_rule)
|
||||||
@viewpoll.handle()
|
@viewpoll.handle()
|
||||||
async def _(saying: list):
|
async def _(saying: list):
|
||||||
# 参数,投票ID或者标题
|
# 参数,投票ID或者标题
|
||||||
@ -130,7 +132,7 @@ vote = on_alconna(Alconna(
|
|||||||
Args["saying", MultiVar(str, '+'), Field(
|
Args["saying", MultiVar(str, '+'), Field(
|
||||||
missing_tips=lambda: "参数错误。用法:投票 <投票ID/标题> <选项文本>"
|
missing_tips=lambda: "参数错误。用法:投票 <投票ID/标题> <选项文本>"
|
||||||
)],
|
)],
|
||||||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"投票","参与投票"})
|
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"投票","参与投票"}, rule=no_wzqbot_rule)
|
||||||
@vote.handle()
|
@vote.handle()
|
||||||
async def _(saying: list, event: Event):
|
async def _(saying: list, event: Event):
|
||||||
if (len(saying) < 2):
|
if (len(saying) < 2):
|
||||||
|
|||||||
@ -1,10 +1,8 @@
|
|||||||
import asyncio as asynkio
|
import asyncio as asynkio
|
||||||
import datetime
|
import datetime
|
||||||
import functools
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Literal, cast
|
from typing import Any, Literal, cast
|
||||||
|
|
||||||
import signal
|
|
||||||
import nonebot
|
import nonebot
|
||||||
import ptimeparse
|
import ptimeparse
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
@ -15,10 +13,10 @@ from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
|
|||||||
from nonebot.adapters.discord import Bot as DiscordBot
|
from nonebot.adapters.discord import Bot as DiscordBot
|
||||||
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
||||||
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
|
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
|
||||||
from nonebot.adapters.onebot.v11.event import \
|
from nonebot.adapters.onebot.v11.event import (
|
||||||
GroupMessageEvent as OnebotV11GroupMessageEvent
|
GroupMessageEvent as OnebotV11GroupMessageEvent,
|
||||||
from nonebot.adapters.onebot.v11.event import \
|
)
|
||||||
MessageEvent as OnebotV11MessageEvent
|
from nonebot.adapters.onebot.v11.event import MessageEvent as OnebotV11MessageEvent
|
||||||
from nonebot_plugin_alconna import UniMessage, UniMsg
|
from nonebot_plugin_alconna import UniMessage, UniMsg
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
@ -68,14 +66,14 @@ def save_notify_config(config: NotifyConfigFile):
|
|||||||
|
|
||||||
|
|
||||||
async def notify_now(notify: Notify):
|
async def notify_now(notify: Notify):
|
||||||
if notify.platform == 'console':
|
if notify.platform == "console":
|
||||||
bot = [b for b in nonebot.get_bots().values() if isinstance(b, ConsoleBot)]
|
bot = [b for b in nonebot.get_bots().values() if isinstance(b, ConsoleBot)]
|
||||||
if len(bot) != 1:
|
if len(bot) != 1:
|
||||||
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
|
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
|
||||||
return False
|
return False
|
||||||
bot = bot[0]
|
bot = bot[0]
|
||||||
await bot.send_private_message(notify.target, f"代办通知:{notify.notify_msg}")
|
await bot.send_private_message(notify.target, f"代办通知:{notify.notify_msg}")
|
||||||
elif notify.platform == 'discord':
|
elif notify.platform == "discord":
|
||||||
bot = [b for b in nonebot.get_bots().values() if isinstance(b, DiscordBot)]
|
bot = [b for b in nonebot.get_bots().values() if isinstance(b, DiscordBot)]
|
||||||
if len(bot) != 1:
|
if len(bot) != 1:
|
||||||
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
|
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
|
||||||
@ -83,7 +81,7 @@ async def notify_now(notify: Notify):
|
|||||||
bot = bot[0]
|
bot = bot[0]
|
||||||
channel = await bot.create_DM(recipient_id=int(notify.target))
|
channel = await bot.create_DM(recipient_id=int(notify.target))
|
||||||
await bot.send_to(channel.id, f"代办通知:{notify.notify_msg}")
|
await bot.send_to(channel.id, f"代办通知:{notify.notify_msg}")
|
||||||
elif notify.platform == 'qq':
|
elif notify.platform == "qq":
|
||||||
bot = [b for b in nonebot.get_bots().values() if isinstance(b, OnebotV11Bot)]
|
bot = [b for b in nonebot.get_bots().values() if isinstance(b, OnebotV11Bot)]
|
||||||
if len(bot) != 1:
|
if len(bot) != 1:
|
||||||
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
|
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
|
||||||
@ -92,17 +90,22 @@ async def notify_now(notify: Notify):
|
|||||||
if notify.target_env is None:
|
if notify.target_env is None:
|
||||||
await bot.send_private_msg(
|
await bot.send_private_msg(
|
||||||
user_id=int(notify.target),
|
user_id=int(notify.target),
|
||||||
message=cast(Any, await UniMessage.text(f"代办通知:{notify.notify_msg}").export(
|
message=cast(
|
||||||
bot=bot,
|
Any,
|
||||||
)),
|
await UniMessage.text(f"代办通知:{notify.notify_msg}").export(
|
||||||
|
bot=bot,
|
||||||
|
),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
await bot.send_group_msg(
|
await bot.send_group_msg(
|
||||||
group_id=int(notify.target_env),
|
group_id=int(notify.target_env),
|
||||||
message=cast(Any,
|
message=cast(
|
||||||
await UniMessage().at(
|
Any,
|
||||||
notify.target
|
await UniMessage()
|
||||||
).text(f" 代办通知:{notify.notify_msg}").export(bot=bot)
|
.at(notify.target)
|
||||||
|
.text(f" 代办通知:{notify.notify_msg}")
|
||||||
|
.export(bot=bot),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
@ -118,7 +121,9 @@ def create_notify_task(notify: Notify, fail2remove: bool = True):
|
|||||||
try:
|
try:
|
||||||
await asynkio.sleep((notify.notify_time - begin_time).total_seconds())
|
await asynkio.sleep((notify.notify_time - begin_time).total_seconds())
|
||||||
except asynkio.CancelledError:
|
except asynkio.CancelledError:
|
||||||
logger.debug("代办提醒被信号中止,任务退出")
|
logger.debug(
|
||||||
|
f"代办提醒被信号中止,任务退出 NOTIFY={notify.notify_msg} TIME={notify.notify_time}"
|
||||||
|
)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
@ -127,15 +132,17 @@ def create_notify_task(notify: Notify, fail2remove: bool = True):
|
|||||||
)
|
)
|
||||||
res = await notify_now(notify)
|
res = await notify_now(notify)
|
||||||
if fail2remove or res:
|
if fail2remove or res:
|
||||||
await DATA_FILE_LOCK.acquire()
|
async with DATA_FILE_LOCK:
|
||||||
cfg = load_notify_config()
|
cfg = load_notify_config()
|
||||||
cfg.notifies = [n for n in cfg.notifies if n.get_str() != notify.get_str()]
|
cfg.notifies = [
|
||||||
if not res:
|
n for n in cfg.notifies if n.get_str() != notify.get_str()
|
||||||
cfg.unsent.append(notify)
|
]
|
||||||
save_notify_config(cfg)
|
if not res:
|
||||||
DATA_FILE_LOCK.release()
|
cfg.unsent.append(notify)
|
||||||
|
save_notify_config(cfg)
|
||||||
else:
|
else:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
return asynkio.create_task(mission())
|
return asynkio.create_task(mission())
|
||||||
|
|
||||||
|
|
||||||
@ -155,7 +162,8 @@ async def _(msg: UniMsg, mEvt: Event):
|
|||||||
notify_time, notify_text = segments
|
notify_time, notify_text = segments
|
||||||
# target_time = get_target_time(notify_time)
|
# target_time = get_target_time(notify_time)
|
||||||
try:
|
try:
|
||||||
target_time = ptimeparse.parse(notify_time)
|
# target_time = ptimeparse.parse(notify_time)
|
||||||
|
target_time = ptimeparse.Parser().parse(notify_time)
|
||||||
logger.info(f"从 {notify_time} 解析出了时间:{target_time}")
|
logger.info(f"从 {notify_time} 解析出了时间:{target_time}")
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.info(f"无法从 {notify_time} 中解析出时间")
|
logger.info(f"无法从 {notify_time} 中解析出时间")
|
||||||
@ -201,8 +209,12 @@ async def _(msg: UniMsg, mEvt: Event):
|
|||||||
save_notify_config(cfg)
|
save_notify_config(cfg)
|
||||||
DATA_FILE_LOCK.release()
|
DATA_FILE_LOCK.release()
|
||||||
|
|
||||||
await evt.send(await UniMessage().at(mEvt.get_user_id()).text(
|
await evt.send(
|
||||||
f" 了解啦!将会在 {notify.notify_time} 提醒你哦~").export())
|
await UniMessage()
|
||||||
|
.at(mEvt.get_user_id())
|
||||||
|
.text(f" 了解啦!将会在 {notify.notify_time} 提醒你哦~")
|
||||||
|
.export()
|
||||||
|
)
|
||||||
logger.info(f"创建了一条于 {notify.notify_time} 的代办提醒")
|
logger.info(f"创建了一条于 {notify.notify_time} 的代办提醒")
|
||||||
|
|
||||||
|
|
||||||
@ -242,19 +254,3 @@ async def _():
|
|||||||
save_notify_config(cfg)
|
save_notify_config(cfg)
|
||||||
DATA_FILE_LOCK.release()
|
DATA_FILE_LOCK.release()
|
||||||
|
|
||||||
loop = asynkio.get_running_loop()
|
|
||||||
|
|
||||||
# 解决 asynk task 没有被 cancel 的问题
|
|
||||||
async def shutdown(sig: signal.Signals):
|
|
||||||
logger.info(f"收到 {sig.name} 指令,正在关闭所有的东西")
|
|
||||||
for task in ASYNK_TASKS:
|
|
||||||
task.cancel()
|
|
||||||
await asynkio.gather(*ASYNK_TASKS, return_exceptions=True)
|
|
||||||
logger.info("所有的代办提醒 Task 都已经退出了")
|
|
||||||
|
|
||||||
for sig in (signal.SIGINT, signal.SIGTERM):
|
|
||||||
loop.add_signal_handler(sig, functools.partial(
|
|
||||||
asynkio.create_task, shutdown(sig)
|
|
||||||
))
|
|
||||||
|
|
||||||
await asynkio.gather(*ASYNK_TASKS)
|
|
||||||
|
|||||||
492
poetry.lock
generated
492
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@ -2,30 +2,28 @@
|
|||||||
name = "konabot"
|
name = "konabot"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
description = "在 MTTU 内部使用的 bot"
|
description = "在 MTTU 内部使用的 bot"
|
||||||
authors = [
|
authors = [{ name = "passthem", email = "Passthem183@gmail.com" }]
|
||||||
{name = "passthem",email = "Passthem183@gmail.com"}
|
|
||||||
]
|
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.12,<4.0"
|
requires-python = ">=3.12,<4.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"nonebot2[all] (>=2.4.3,<3.0.0)",
|
"nonebot2[all] (>=2.4.3,<3.0.0)",
|
||||||
"nonebot-adapter-onebot (>=2.4.6,<3.0.0)",
|
"nonebot-adapter-onebot (>=2.4.6,<3.0.0)",
|
||||||
"nonebot-adapter-console (>=0.9.0,<0.10.0)",
|
"nonebot-adapter-console (>=0.9.0,<0.10.0)",
|
||||||
"nonebot-adapter-discord (>=0.1.8,<0.2.0)",
|
"nonebot-adapter-discord (>=0.1.8,<0.2.0)",
|
||||||
"nonebot-adapter-minecraft (>=1.5.2,<2.0.0)",
|
"nonebot-adapter-minecraft (>=1.5.2,<2.0.0)",
|
||||||
"nonebot-plugin-alconna (>=0.59.4,<0.60.0)",
|
"nonebot-plugin-alconna (>=0.59.4,<0.60.0)",
|
||||||
"nonebot-plugin-apscheduler (>=0.5.0,<0.6.0)",
|
"nonebot-plugin-apscheduler (>=0.5.0,<0.6.0)",
|
||||||
"requests (>=2.32.5,<3.0.0)",
|
"requests (>=2.32.5,<3.0.0)",
|
||||||
"beautifulsoup4 (>=4.13.5,<5.0.0)",
|
"beautifulsoup4 (>=4.13.5,<5.0.0)",
|
||||||
"lxml (>=6.0.2,<7.0.0)",
|
"lxml (>=6.0.2,<7.0.0)",
|
||||||
"pillow (>=11.3.0,<12.0.0)",
|
"pillow (>=11.3.0,<12.0.0)",
|
||||||
"imagetext-py (>=2.2.0,<3.0.0)",
|
"imagetext-py (>=2.2.0,<3.0.0)",
|
||||||
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
|
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
|
||||||
"returns (>=0.26.0,<0.27.0)",
|
"returns (>=0.26.0,<0.27.0)",
|
||||||
"ptimeparse (>=0.1.1,<0.2.0)",
|
"skia-python (>=138.0,<139.0)",
|
||||||
"skia-python (>=138.0,<139.0)",
|
"nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)",
|
||||||
"nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)",
|
"qrcode (>=8.2,<9.0)",
|
||||||
"qrcode (>=8.2,<9.0)",
|
"ptimeparse (>=0.2.1,<0.3.0)",
|
||||||
]
|
]
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
@ -37,5 +35,9 @@ name = "pt-gitea-pypi"
|
|||||||
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple/"
|
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple/"
|
||||||
priority = "supplemental"
|
priority = "supplemental"
|
||||||
|
|
||||||
|
[[tool.poetry.source]]
|
||||||
|
name = "mirrors"
|
||||||
|
url = "https://pypi.tuna.tsinghua.edu.cn/simple/"
|
||||||
|
priority = "primary"
|
||||||
|
|
||||||
[tool.poetry.dependencies]
|
[tool.poetry.dependencies]
|
||||||
ptimeparse = {source = "pt-gitea-pypi"}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user