Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| deab2d7b2b | |||
| 2a6abbe0d4 | |||
| 30bdc50024 | |||
| be8b1b9999 | |||
| 43d0a09de2 | |||
| 6e0082c1c9 | |||
| 3b8b060c5b | |||
| 8cfe58c7dd | |||
| f997bf945a | |||
| 0dbe164703 | |||
| 818f2b64ec | |||
| a855c69f61 | |||
| 90ee296f55 | |||
| 915f186955 | |||
| a279e9b510 |
@ -1,4 +1,5 @@
|
||||
/.env
|
||||
/.git
|
||||
/data
|
||||
|
||||
__pycache__
|
||||
@ -26,6 +26,14 @@ steps:
|
||||
volumes:
|
||||
- name: docker-socket
|
||||
path: /var/run/docker.sock
|
||||
- name: 在容器中测试插件加载
|
||||
image: docker:dind
|
||||
privileged: true
|
||||
volumes:
|
||||
- name: docker-socket
|
||||
path: /var/run/docker.sock
|
||||
commands:
|
||||
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
|
||||
|
||||
volumes:
|
||||
- name: docker-socket
|
||||
|
||||
4
.env.test
Normal file
@ -0,0 +1,4 @@
|
||||
ENVIRONMENT=test
|
||||
ENABLE_CONSOLE=false
|
||||
ENABLE_QQ=false
|
||||
ENABLE_DISCORD=false
|
||||
1
.gitignore
vendored
@ -1,3 +1,4 @@
|
||||
/.env
|
||||
/data
|
||||
|
||||
__pycache__
|
||||
@ -4,5 +4,11 @@ WORKDIR /app
|
||||
COPY requirements.txt ./
|
||||
RUN pip install -r requirements.txt --no-deps
|
||||
|
||||
COPY . .
|
||||
COPY bot.py pyproject.toml ./
|
||||
COPY assets ./assets
|
||||
COPY scripts ./scripts
|
||||
COPY konabot ./konabot
|
||||
|
||||
ENV PYTHONPATH=/app
|
||||
|
||||
CMD [ "python", "bot.py" ]
|
||||
|
||||
|
Before Width: | Height: | Size: 9.2 KiB After Width: | Height: | Size: 9.2 KiB |
|
Before Width: | Height: | Size: 10 KiB After Width: | Height: | Size: 10 KiB |
|
Before Width: | Height: | Size: 8.7 KiB After Width: | Height: | Size: 8.7 KiB |
|
Before Width: | Height: | Size: 11 KiB After Width: | Height: | Size: 11 KiB |
|
Before Width: | Height: | Size: 10 KiB After Width: | Height: | Size: 10 KiB |
|
Before Width: | Height: | Size: 8.7 KiB After Width: | Height: | Size: 8.7 KiB |
|
Before Width: | Height: | Size: 11 KiB After Width: | Height: | Size: 11 KiB |
|
Before Width: | Height: | Size: 9.3 KiB After Width: | Height: | Size: 9.3 KiB |
|
Before Width: | Height: | Size: 10 KiB After Width: | Height: | Size: 10 KiB |
|
Before Width: | Height: | Size: 8.7 KiB After Width: | Height: | Size: 8.7 KiB |
|
Before Width: | Height: | Size: 11 KiB After Width: | Height: | Size: 11 KiB |
|
Before Width: | Height: | Size: 9.3 KiB After Width: | Height: | Size: 9.3 KiB |
|
Before Width: | Height: | Size: 9.0 KiB After Width: | Height: | Size: 9.0 KiB |
|
Before Width: | Height: | Size: 219 KiB After Width: | Height: | Size: 219 KiB |
|
Before Width: | Height: | Size: 272 KiB After Width: | Height: | Size: 272 KiB |
0
konabot/__init__.py
Normal file
4
konabot/common/path.py
Normal file
@ -0,0 +1,4 @@
|
||||
from pathlib import Path
|
||||
|
||||
ASSETS_PATH = Path(__file__).resolve().parent.parent.parent / "assets"
|
||||
FONTS_PATH = ASSETS_PATH / "fonts"
|
||||
@ -1,8 +1,8 @@
|
||||
from imagetext_py import EmojiOptions, FontDB
|
||||
|
||||
from .path import ASSETS
|
||||
from konabot.common.path import FONTS_PATH
|
||||
|
||||
FontDB.LoadFromDir(str(ASSETS))
|
||||
FontDB.LoadFromDir(str(FONTS_PATH))
|
||||
|
||||
FontDB.SetDefaultEmojiOptions(EmojiOptions(
|
||||
parse_shortcodes=False,
|
||||
|
||||
@ -1,3 +0,0 @@
|
||||
from pathlib import Path
|
||||
|
||||
ASSETS = Path(__file__).parent.parent.parent / "assets"
|
||||
@ -4,10 +4,11 @@ from typing import Any, cast
|
||||
import imagetext_py
|
||||
import PIL.Image
|
||||
|
||||
from .base.fonts import HARMONYOS_SANS_SC_BLACK
|
||||
from .base.path import ASSETS
|
||||
from konabot.common.path import ASSETS_PATH
|
||||
|
||||
geimao_image = PIL.Image.open(ASSETS / "geimao.jpg").convert("RGBA")
|
||||
from .base.fonts import HARMONYOS_SANS_SC_BLACK
|
||||
|
||||
geimao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "geimao.jpg").convert("RGBA")
|
||||
|
||||
|
||||
def _draw_geimao(saying: str):
|
||||
|
||||
@ -3,10 +3,11 @@ import asyncio
|
||||
import imagetext_py
|
||||
import PIL.Image
|
||||
|
||||
from .base.fonts import HARMONYOS_SANS_SC_REGULAR
|
||||
from .base.path import ASSETS
|
||||
from konabot.common.path import ASSETS_PATH
|
||||
|
||||
pt_image = PIL.Image.open(ASSETS / "ptsay.png").convert("RGBA")
|
||||
from .base.fonts import HARMONYOS_SANS_SC_REGULAR
|
||||
|
||||
pt_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "ptsay.png").convert("RGBA")
|
||||
|
||||
|
||||
def _draw_pt(saying: str):
|
||||
|
||||
@ -1,9 +1,11 @@
|
||||
from io import BytesIO
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
from konabot.plugins.roll_dice.base.path import ASSETS
|
||||
from konabot.common.path import ASSETS_PATH, FONTS_PATH
|
||||
|
||||
|
||||
def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0)):
|
||||
"""
|
||||
@ -13,15 +15,7 @@ def text_to_transparent_image(text, font_size=40, padding=0, text_color=(0, 0, 0
|
||||
temp_image = Image.new('RGB', (1, 1), (255, 255, 255))
|
||||
temp_draw = ImageDraw.Draw(temp_image)
|
||||
|
||||
font = ImageFont.truetype(ASSETS / "montserrat.otf", font_size)
|
||||
# try:
|
||||
# font = ImageFont.truetype(ASSETS / "montserrat.otf", font_size)
|
||||
# except:
|
||||
# try:
|
||||
# font = ImageFont.truetype("arial.ttf", font_size)
|
||||
# except:
|
||||
# # 如果系统字体不可用,使用默认字体
|
||||
# font = ImageFont.load_default()
|
||||
font = ImageFont.truetype(FONTS_PATH / "montserrat.otf", font_size)
|
||||
|
||||
# 获取文本边界框
|
||||
bbox = temp_draw.textbbox((0, 0), text, font=font)
|
||||
@ -177,7 +171,7 @@ async def generate_dice_image(number: int) -> BytesIO:
|
||||
], dtype=np.float32)
|
||||
|
||||
# 加载背景图像,保留透明通道
|
||||
background = cv2.imread(str(ASSETS / "template.png"), cv2.IMREAD_UNCHANGED)
|
||||
background = cv2.imread(str(ASSETS_PATH / "img" / "dice" / "template.png"), cv2.IMREAD_UNCHANGED)
|
||||
|
||||
|
||||
# 对文本图像进行3D变换(保持透明通道)
|
||||
@ -189,7 +183,7 @@ async def generate_dice_image(number: int) -> BytesIO:
|
||||
|
||||
pil_final = Image.fromarray(final_image_simple)
|
||||
# 导入一系列图像
|
||||
images: list[Image.Image] = [Image.open(ASSETS / f"{i}.png") for i in range(1, 12)]
|
||||
images: list[Image.Image] = [Image.open(ASSETS_PATH / "img" / "dice" / f"{i}.png") for i in range(1, 12)]
|
||||
images.append(pil_final)
|
||||
frame_durations = [100] * (len(images) - 1) + [100000]
|
||||
# 保存为BytesIO对象
|
||||
|
||||
198
konabot/plugins/simple_notify/__init__.py
Normal file
@ -0,0 +1,198 @@
|
||||
import asyncio
|
||||
import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Literal, cast
|
||||
|
||||
import nonebot
|
||||
from loguru import logger
|
||||
from nonebot import on_message
|
||||
from nonebot.adapters import Event
|
||||
from nonebot.adapters.console import Bot as ConsoleBot
|
||||
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
|
||||
from nonebot.adapters.discord import Bot as DiscordBot
|
||||
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
||||
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot
|
||||
from nonebot.adapters.onebot.v11.event import \
|
||||
GroupMessageEvent as OnebotV11GroupMessageEvent
|
||||
from nonebot.adapters.onebot.v11.event import \
|
||||
MessageEvent as OnebotV11MessageEvent
|
||||
from nonebot_plugin_alconna import UniMessage, UniMsg
|
||||
from pydantic import BaseModel
|
||||
|
||||
from konabot.plugins.simple_notify.parse_time import get_target_time
|
||||
|
||||
evt = on_message()
|
||||
|
||||
(Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True)
|
||||
DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "notify.json"
|
||||
DATA_FILE_LOCK = asyncio.Lock()
|
||||
|
||||
|
||||
class Notify(BaseModel):
|
||||
platform: Literal["console", "qq", "discord"]
|
||||
|
||||
target: str
|
||||
"需要接受通知的个体"
|
||||
|
||||
target_env: str | None
|
||||
"在哪里进行通知,如果是 None 代表私聊通知"
|
||||
|
||||
notify_time: datetime.datetime
|
||||
notify_msg: str
|
||||
|
||||
def get_str(self):
|
||||
return f"{self.target}-{self.target_env}-{self.platform}-{self.notify_time}"
|
||||
|
||||
|
||||
class NotifyConfigFile(BaseModel):
|
||||
version: int = 1
|
||||
notifies: list[Notify] = []
|
||||
unsent: list[Notify] = []
|
||||
|
||||
|
||||
def load_notify_config() -> NotifyConfigFile:
|
||||
if not DATA_FILE_PATH.exists():
|
||||
return NotifyConfigFile()
|
||||
try:
|
||||
return NotifyConfigFile.model_validate_json(DATA_FILE_PATH.read_text())
|
||||
except Exception as e:
|
||||
logger.warning(f"在解析 Notify 时遇到问题:{e}")
|
||||
return NotifyConfigFile()
|
||||
|
||||
|
||||
def save_notify_config(config: NotifyConfigFile):
|
||||
DATA_FILE_PATH.write_text(config.model_dump_json(indent=4))
|
||||
|
||||
|
||||
async def notify_now(notify: Notify):
|
||||
if notify.platform == 'console':
|
||||
bot = [b for b in nonebot.get_bots().values() if isinstance(b, ConsoleBot)]
|
||||
if len(bot) != 1:
|
||||
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
|
||||
return False
|
||||
bot = bot[0]
|
||||
await bot.send_private_message(notify.target, f"代办通知:{notify.notify_msg}")
|
||||
elif notify.platform == 'discord':
|
||||
bot = [b for b in nonebot.get_bots().values() if isinstance(b, DiscordBot)]
|
||||
if len(bot) != 1:
|
||||
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
|
||||
return False
|
||||
bot = bot[0]
|
||||
channel = await bot.create_DM(recipient_id=int(notify.target))
|
||||
await bot.send_to(channel.id, f"代办通知:{notify.notify_msg}")
|
||||
elif notify.platform == 'qq':
|
||||
bot = [b for b in nonebot.get_bots().values() if isinstance(b, OnebotV11Bot)]
|
||||
if len(bot) != 1:
|
||||
logger.warning(f"提醒未成功发送出去:{nonebot.get_bots()} {notify}")
|
||||
return False
|
||||
bot = bot[0]
|
||||
if notify.target_env is None:
|
||||
await bot.send_private_msg(
|
||||
user_id=int(notify.target),
|
||||
message=f"代办通知:{notify.notify_msg}",
|
||||
)
|
||||
else:
|
||||
await bot.send_group_msg(
|
||||
group_id=int(notify.target_env),
|
||||
message=cast(Any,
|
||||
await UniMessage().at(notify.target).text(f" 代办通知:{notify.notify_msg}").export()
|
||||
),
|
||||
)
|
||||
else:
|
||||
logger.warning(f"提醒未成功发送出去:{notify}")
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
async def create_notify_task(notify: Notify, fail2remove: bool = True):
|
||||
async def mission():
|
||||
begin_time = datetime.datetime.now()
|
||||
if begin_time < notify.notify_time:
|
||||
await asyncio.sleep((notify.notify_time - begin_time).total_seconds())
|
||||
res = await notify_now(notify)
|
||||
if fail2remove or res:
|
||||
await DATA_FILE_LOCK.acquire()
|
||||
cfg = load_notify_config()
|
||||
cfg.notifies = [n for n in cfg.notifies if n.get_str() != notify.get_str()]
|
||||
if not res:
|
||||
cfg.unsent.append(notify)
|
||||
save_notify_config(cfg)
|
||||
DATA_FILE_LOCK.release()
|
||||
else:
|
||||
pass
|
||||
return asyncio.create_task(mission())
|
||||
|
||||
|
||||
@evt.handle()
|
||||
async def _(msg: UniMsg, mEvt: Event):
|
||||
if mEvt.get_user_id() in nonebot.get_bots():
|
||||
return
|
||||
|
||||
text = msg.extract_plain_text()
|
||||
if "提醒我" not in text:
|
||||
return
|
||||
|
||||
segments = text.split("提醒我", maxsplit=1)
|
||||
if len(segments) != 2:
|
||||
return
|
||||
|
||||
notify_time, notify_text = segments
|
||||
target_time = get_target_time(notify_time)
|
||||
if target_time is None:
|
||||
logger.info(f"无法从 {notify_time} 中解析出时间")
|
||||
return
|
||||
if not notify_text:
|
||||
return
|
||||
|
||||
await DATA_FILE_LOCK.acquire()
|
||||
cfg = load_notify_config()
|
||||
|
||||
if isinstance(mEvt, ConsoleMessageEvent):
|
||||
platform = "console"
|
||||
target = mEvt.get_user_id()
|
||||
target_env = None
|
||||
elif isinstance(mEvt, OnebotV11MessageEvent):
|
||||
platform = "qq"
|
||||
target = mEvt.get_user_id()
|
||||
if isinstance(mEvt, OnebotV11GroupMessageEvent):
|
||||
target_env = str(mEvt.group_id)
|
||||
else:
|
||||
target_env = None
|
||||
elif isinstance(mEvt, DiscordMessageEvent):
|
||||
platform = "discord"
|
||||
target = mEvt.get_user_id()
|
||||
target_env = None
|
||||
else:
|
||||
logger.warning(f"Notify 遇到不支持的平台:{type(mEvt).__name__}")
|
||||
return
|
||||
|
||||
notify = Notify(
|
||||
platform=platform,
|
||||
target=target,
|
||||
target_env=target_env,
|
||||
notify_time=target_time,
|
||||
notify_msg=notify_text,
|
||||
)
|
||||
await create_notify_task(notify)
|
||||
|
||||
cfg.notifies.append(notify)
|
||||
save_notify_config(cfg)
|
||||
DATA_FILE_LOCK.release()
|
||||
|
||||
await evt.send(await UniMessage().at(mEvt.get_user_id()).text(
|
||||
f" 了解啦!将会在 {notify.notify_time} 提醒你哦~").export())
|
||||
|
||||
|
||||
driver = nonebot.get_driver()
|
||||
|
||||
|
||||
@driver.on_bot_connect
|
||||
async def _():
|
||||
await DATA_FILE_LOCK.acquire()
|
||||
tasks = []
|
||||
cfg = load_notify_config()
|
||||
for notify in cfg.notifies:
|
||||
tasks.append(create_notify_task(notify, fail2remove=False))
|
||||
DATA_FILE_LOCK.release()
|
||||
|
||||
await asyncio.gather(*tasks)
|
||||
358
konabot/plugins/simple_notify/parse_time.py
Normal file
@ -0,0 +1,358 @@
|
||||
import datetime
|
||||
import re
|
||||
from typing import Optional, Dict, List, Callable, Tuple
|
||||
|
||||
from loguru import logger
|
||||
|
||||
# --- 常量与正则表达式定义 (Constants and Regex Definitions) ---
|
||||
|
||||
# 数字模式,兼容中文和阿拉伯数字
|
||||
P_NUM = r"(\d+|[零一两二三四五六七八九十]+)"
|
||||
|
||||
# 预编译的正则表达式
|
||||
PATTERNS = {
|
||||
# 相对时间, e.g., "5分钟后"
|
||||
"DELTA": re.compile(
|
||||
r"^"
|
||||
r"((?P<days>" + P_NUM + r") ?天)?"
|
||||
r"((?P<hours>" + P_NUM + r") ?个?小?时)?"
|
||||
r"((?P<minutes>" + P_NUM + r") ?分钟?)?"
|
||||
r"((?P<seconds>" + P_NUM + r") ?秒钟?)?"
|
||||
r" ?后 ?$"
|
||||
),
|
||||
# 绝对时间
|
||||
"YEAR": re.compile(r"(" + P_NUM + r") ?年"),
|
||||
"MONTH": re.compile(r"(" + P_NUM + r") ?月"),
|
||||
"DAY": re.compile(r"(" + P_NUM + r") ?[日号]"),
|
||||
"HOUR": re.compile(r"(" + P_NUM + r") ?[点时](半)?钟?"),
|
||||
"MINUTE": re.compile(r"(" + P_NUM + r") ?分(钟)?"),
|
||||
"SECOND": re.compile(r"(" + P_NUM + r") ?秒(钟)?"),
|
||||
"HMS_COLON": re.compile(r"(\d{1,2})[::](\d{1,2})([::](\d{1,2}))?"),
|
||||
"PM": re.compile(r"(下午|PM|晚上)"),
|
||||
# 相对日期
|
||||
"TOMORROW": re.compile(r"明天"),
|
||||
"DAY_AFTER_TOMORROW": re.compile(r"后天"),
|
||||
"TODAY": re.compile(r"今天"),
|
||||
}
|
||||
|
||||
# 中文数字到阿拉伯数字的映射
|
||||
CHINESE_TO_ARABIC_MAP: Dict[str, int] = {
|
||||
'零': 0, '一': 1, '二': 2, '三': 3, '四': 4,
|
||||
'五': 5, '六': 6, '七': 7, '八': 8, '九': 9, '十': 10
|
||||
}
|
||||
|
||||
# --- 核心工具函数 (Core Utility Functions) ---
|
||||
|
||||
def parse_number(s: str) -> int:
|
||||
"""
|
||||
将包含中文或阿拉伯数字的字符串解析为整数。
|
||||
例如: "五" -> 5, "十五" -> 15, "二十三" -> 23, "12" -> 12。
|
||||
返回 -1 表示解析失败。
|
||||
"""
|
||||
if not s:
|
||||
return -1
|
||||
|
||||
s = s.strip().replace("两", "二")
|
||||
|
||||
if s.isdigit():
|
||||
return int(s)
|
||||
|
||||
if s in CHINESE_TO_ARABIC_MAP:
|
||||
return CHINESE_TO_ARABIC_MAP[s]
|
||||
|
||||
# 处理 "十" 在不同位置的情况
|
||||
if s.startswith('十'):
|
||||
if len(s) == 1:
|
||||
return 10
|
||||
num = CHINESE_TO_ARABIC_MAP.get(s[1])
|
||||
return 10 + num if num is not None else -1
|
||||
|
||||
if s.endswith('十'):
|
||||
if len(s) == 2:
|
||||
num = CHINESE_TO_ARABIC_MAP.get(s[0])
|
||||
return 10 * num if num is not None else -1
|
||||
|
||||
if '十' in s:
|
||||
parts = s.split('十')
|
||||
if len(parts) == 2:
|
||||
left = CHINESE_TO_ARABIC_MAP.get(parts[0])
|
||||
right = CHINESE_TO_ARABIC_MAP.get(parts[1])
|
||||
if left is not None and right is not None:
|
||||
return left * 10 + right
|
||||
|
||||
return -1
|
||||
|
||||
|
||||
# --- 时间解析器类 (Time Parser Class) ---
|
||||
|
||||
class TimeParser:
|
||||
"""
|
||||
一个用于解析自然语言时间描述的类。
|
||||
"""
|
||||
def __init__(self, content: str):
|
||||
self.original_content: str = content
|
||||
self.content_to_parse: str = self._preprocess(content)
|
||||
self.now: datetime.datetime = datetime.datetime.now()
|
||||
# 将 t 作为结果构建器,初始化为今天的午夜
|
||||
self.t: datetime.datetime = self.now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
self.is_pm_specified: bool = False
|
||||
self.is_date_specified: bool = False
|
||||
self.is_time_specified: bool = False
|
||||
|
||||
def _preprocess(self, content: str) -> str:
|
||||
"""预处理字符串,移除不相关字符。"""
|
||||
content = re.sub(r"\s+", "", content)
|
||||
content = re.sub(r"[,,\.。::、]", "", content)
|
||||
return content
|
||||
|
||||
def _consume_match(self, match: re.Match) -> str:
|
||||
"""从待解析字符串中移除已匹配的部分。"""
|
||||
self.content_to_parse = self.content_to_parse.replace(match.group(0), "", 1)
|
||||
return match.group(0)
|
||||
|
||||
def parse(self) -> Optional[datetime.datetime]:
|
||||
"""
|
||||
主解析方法。
|
||||
首先尝试解析相对时间(如“5分钟后”),失败则尝试解析绝对时间。
|
||||
"""
|
||||
logger.debug(f"🎉 开始解析: '{self.original_content}' -> 清洗后: '{self.content_to_parse}'")
|
||||
if not self.content_to_parse:
|
||||
logger.debug("❌ 内容为空,无法解析。")
|
||||
return None
|
||||
|
||||
# 1. 尝试相对时间解析
|
||||
if (target_time := self._parse_relative_time()) is not None:
|
||||
return target_time
|
||||
|
||||
# 2. 尝试绝对时间解析
|
||||
if (target_time := self._parse_absolute_time()) is not None:
|
||||
return target_time
|
||||
|
||||
logger.debug(f"❌ 所有解析模式均未匹配成功。")
|
||||
return None
|
||||
|
||||
def _parse_relative_time(self) -> Optional[datetime.datetime]:
|
||||
"""解析 'X天X小时X分钟后' 这种格式。"""
|
||||
if match := PATTERNS["DELTA"].match(self.content_to_parse):
|
||||
logger.debug("⏳ 匹配到相对时间模式 (DELTA)。")
|
||||
try:
|
||||
delta_parts = {
|
||||
"days": parse_number(match.group("days") or "0"),
|
||||
"hours": parse_number(match.group("hours") or "0"),
|
||||
"minutes": parse_number(match.group("minutes") or "0"),
|
||||
"seconds": parse_number(match.group("seconds") or "0"),
|
||||
}
|
||||
|
||||
# 检查是否有无效的数字解析
|
||||
if any(v < 0 for v in delta_parts.values()):
|
||||
logger.debug(f"❌ 解析时间片段为数字时失败: {delta_parts}")
|
||||
return None
|
||||
|
||||
delta = datetime.timedelta(**delta_parts)
|
||||
if delta.total_seconds() == 0:
|
||||
logger.debug("❌ 解析出的时间增量为0。")
|
||||
return None
|
||||
|
||||
target_time = self.now + delta
|
||||
logger.debug(f"✅ 相对时间解析成功 -> {target_time}")
|
||||
return target_time
|
||||
except (ValueError, TypeError) as e:
|
||||
logger.debug(f"❌ 解析相对时间时出错: {e}", exc_info=True)
|
||||
return None
|
||||
return None
|
||||
|
||||
def _parse_absolute_time(self) -> Optional[datetime.datetime]:
|
||||
"""解析一个指定的日期和时间。"""
|
||||
logger.debug(f"🎯 启动绝对时间解析,基准时间: {self.t}")
|
||||
|
||||
# 定义解析步骤和顺序
|
||||
# (pattern_key, handler_method)
|
||||
parsing_steps: List[Tuple[str, Callable[[re.Match], bool]]] = [
|
||||
("TOMORROW", self._handle_tomorrow),
|
||||
("DAY_AFTER_TOMORROW", self._handle_day_after_tomorrow),
|
||||
("TODAY", self._handle_today),
|
||||
("YEAR", self._handle_year),
|
||||
("MONTH", self._handle_month),
|
||||
("DAY", self._handle_day),
|
||||
("HMS_COLON", self._handle_hms_colon),
|
||||
("PM", self._handle_pm),
|
||||
("HOUR", self._handle_hour),
|
||||
("MINUTE", self._handle_minute),
|
||||
("SECOND", self._handle_second),
|
||||
]
|
||||
|
||||
for key, handler in parsing_steps:
|
||||
if match := PATTERNS[key].search(self.content_to_parse):
|
||||
if not handler(match):
|
||||
# 如果任何一个处理器返回False,说明解析失败
|
||||
return None
|
||||
|
||||
# 移除无意义的上午关键词
|
||||
self.content_to_parse = self.content_to_parse.replace("上午", "").replace("AM", "").replace("凌晨", "")
|
||||
|
||||
# 如果解析后还有剩余字符,说明有无法识别的部分
|
||||
if self.content_to_parse.strip():
|
||||
logger.debug(f"❌ 匹配失败,存在未解析的残留内容: '{self.content_to_parse.strip()}'")
|
||||
return None
|
||||
|
||||
# 最终调整和检查
|
||||
return self._finalize_datetime()
|
||||
|
||||
# --- Handler Methods for Absolute Time Parsing ---
|
||||
|
||||
def _handle_tomorrow(self, match: re.Match) -> bool:
|
||||
self.t += datetime.timedelta(days=1)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"📅 匹配到 '明天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_day_after_tomorrow(self, match: re.Match) -> bool:
|
||||
self.t += datetime.timedelta(days=2)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"📅 匹配到 '后天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_today(self, match: re.Match) -> bool:
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"📅 匹配到 '今天', 日期基准不变, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_year(self, match: re.Match) -> bool:
|
||||
year = parse_number(match.group(1))
|
||||
if year < 0: return False
|
||||
if year < 100: year += 2000 # 处理 "25年" -> 2025
|
||||
if year < self.now.year:
|
||||
logger.debug(f"❌ 指定的年份 {year} 已过去。")
|
||||
return False
|
||||
self.t = self.t.replace(year=year)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"Y| 年份更新 -> {self.t.year}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_month(self, match: re.Match) -> bool:
|
||||
month = parse_number(match.group(1))
|
||||
if not (1 <= month <= 12):
|
||||
logger.debug(f"❌ 无效的月份: {month}")
|
||||
return False
|
||||
|
||||
# 如果设置的月份在当前月份之前,且没有指定年份,则年份加一
|
||||
if month < self.t.month and not self.is_date_specified:
|
||||
self.t = self.t.replace(year=self.t.year + 1)
|
||||
logger.debug(f"💡 月份小于当前月份,年份自动进位 -> {self.t.year}")
|
||||
|
||||
self.t = self.t.replace(month=month)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"M| 月份更新 -> {self.t.month}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_day(self, match: re.Match) -> bool:
|
||||
day = parse_number(match.group(1))
|
||||
if not (1 <= day <= 31):
|
||||
logger.debug(f"❌ 无效的日期: {day}")
|
||||
return False
|
||||
|
||||
try:
|
||||
# 如果日期小于当前日期,且只指定了日,则月份加一
|
||||
if day < self.t.day and not self.is_date_specified:
|
||||
if self.t.month == 12:
|
||||
self.t = self.t.replace(year=self.t.year + 1, month=1)
|
||||
else:
|
||||
self.t = self.t.replace(month=self.t.month + 1)
|
||||
logger.debug(f"💡 日期小于当前日期,月份自动进位 -> {self.t.year}-{self.t.month}")
|
||||
|
||||
self.t = self.t.replace(day=day)
|
||||
self.is_date_specified = True
|
||||
logger.debug(f"D| 日期更新 -> {self.t.day}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
except ValueError:
|
||||
logger.debug(f"❌ 日期 {day} 对于月份 {self.t.month} 无效 (例如2月30号)。")
|
||||
return False
|
||||
|
||||
def _handle_hms_colon(self, match: re.Match) -> bool:
|
||||
h = int(match.group(1))
|
||||
m = int(match.group(2))
|
||||
s_str = match.group(4) # group(3) is with colon, group(4) is the number
|
||||
s = int(s_str) if s_str else 0
|
||||
if not (0 <= h <= 23 and 0 <= m <= 59 and 0 <= s <= 59):
|
||||
logger.debug(f"❌ 无效的时间格式: H={h}, M={m}, S={s}")
|
||||
return False
|
||||
self.t = self.t.replace(hour=h, minute=m, second=s)
|
||||
self.is_time_specified = True
|
||||
logger.debug(f"T| 时分秒(冒号格式)更新 -> {self.t.time()}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_pm(self, match: re.Match) -> bool:
|
||||
self.is_pm_specified = True
|
||||
logger.debug(f"PM| 匹配到下午/晚上, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_hour(self, match: re.Match) -> bool:
|
||||
hour = parse_number(match.group(1))
|
||||
has_half = match.group(2) == '半'
|
||||
if not (0 <= hour <= 23):
|
||||
logger.debug(f"❌ 无效的小时: {hour}")
|
||||
return False
|
||||
minute = 30 if has_half else self.t.minute
|
||||
self.t = self.t.replace(hour=hour, minute=minute)
|
||||
self.is_time_specified = True
|
||||
logger.debug(f"H| 小时更新 -> {self.t.hour}{':30' if has_half else ''}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_minute(self, match: re.Match) -> bool:
|
||||
minute = parse_number(match.group(1))
|
||||
if not (0 <= minute <= 59):
|
||||
logger.debug(f"❌ 无效的分钟: {minute}")
|
||||
return False
|
||||
self.t = self.t.replace(minute=minute)
|
||||
self.is_time_specified = True
|
||||
logger.debug(f"M| 分钟更新 -> {self.t.minute}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _handle_second(self, match: re.Match) -> bool:
|
||||
second = parse_number(match.group(1))
|
||||
if not (0 <= second <= 59):
|
||||
logger.debug(f"❌ 无效的秒: {second}")
|
||||
return False
|
||||
self.t = self.t.replace(second=second)
|
||||
self.is_time_specified = True
|
||||
logger.debug(f"S| 秒更新 -> {self.t.second}, 消耗: '{self._consume_match(match)}'")
|
||||
return True
|
||||
|
||||
def _finalize_datetime(self) -> Optional[datetime.datetime]:
|
||||
"""对解析出的时间进行最后的调整和检查。"""
|
||||
# 处理下午/晚上
|
||||
if self.is_pm_specified and self.t.hour < 12:
|
||||
self.t = self.t.replace(hour=self.t.hour + 12)
|
||||
logger.debug(f"💡 根据 PM 标识,小时调整为 -> {self.t.hour}")
|
||||
|
||||
# 如果没有指定任何时间或日期部分,则认为解析无效
|
||||
if not self.is_date_specified and not self.is_time_specified:
|
||||
logger.debug("❌ 未能从输入中解析出任何有效的日期或时间部分。")
|
||||
return None
|
||||
|
||||
# 如果最终计算出的时间点在当前时间之前,自动往后推
|
||||
# 例如:现在是 15:00,说 "14点",应该是指明天的14点
|
||||
if self.t < self.now:
|
||||
# 只有在明确指定了时间的情况下,才自动加一天
|
||||
# 如果只指定了一个过去的日期(如“去年5月1号”),则不应该调整
|
||||
if self.is_time_specified:
|
||||
self.t += datetime.timedelta(days=1)
|
||||
logger.debug(f"🔁 目标时间已过,自动调整为明天 -> {self.t}")
|
||||
|
||||
logger.debug(f"✅ 解析成功,最终时间: {self.t}")
|
||||
return self.t
|
||||
|
||||
# --- 公共接口 (Public Interface) ---
|
||||
|
||||
def get_target_time(content: str) -> Optional[datetime.datetime]:
|
||||
"""
|
||||
高级接口,用于将自然语言时间描述转换为 datetime 对象。
|
||||
|
||||
Args:
|
||||
content: 包含时间信息的字符串。
|
||||
|
||||
Returns:
|
||||
一个 datetime 对象,如果解析失败则返回 None。
|
||||
"""
|
||||
parser = TimeParser(content)
|
||||
return parser.parse()
|
||||
18
poetry.lock
generated
@ -1793,20 +1793,20 @@ files = [
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opencv-python"
|
||||
name = "opencv-python-headless"
|
||||
version = "4.12.0.88"
|
||||
description = "Wrapper package for OpenCV python bindings."
|
||||
optional = false
|
||||
python-versions = ">=3.6"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "opencv-python-4.12.0.88.tar.gz", hash = "sha256:8b738389cede219405f6f3880b851efa3415ccd674752219377353f017d2994d"},
|
||||
{file = "opencv_python-4.12.0.88-cp37-abi3-macosx_13_0_arm64.whl", hash = "sha256:f9a1f08883257b95a5764bf517a32d75aec325319c8ed0f89739a57fae9e92a5"},
|
||||
{file = "opencv_python-4.12.0.88-cp37-abi3-macosx_13_0_x86_64.whl", hash = "sha256:812eb116ad2b4de43ee116fcd8991c3a687f099ada0b04e68f64899c09448e81"},
|
||||
{file = "opencv_python-4.12.0.88-cp37-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:51fd981c7df6af3e8f70b1556696b05224c4e6b6777bdd2a46b3d4fb09de1a92"},
|
||||
{file = "opencv_python-4.12.0.88-cp37-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:092c16da4c5a163a818f120c22c5e4a2f96e0db4f24e659c701f1fe629a690f9"},
|
||||
{file = "opencv_python-4.12.0.88-cp37-abi3-win32.whl", hash = "sha256:ff554d3f725b39878ac6a2e1fa232ec509c36130927afc18a1719ebf4fbf4357"},
|
||||
{file = "opencv_python-4.12.0.88-cp37-abi3-win_amd64.whl", hash = "sha256:d98edb20aa932fd8ebd276a72627dad9dc097695b3d435a4257557bbb49a79d2"},
|
||||
{file = "opencv-python-headless-4.12.0.88.tar.gz", hash = "sha256:cfdc017ddf2e59b6c2f53bc12d74b6b0be7ded4ec59083ea70763921af2b6c09"},
|
||||
{file = "opencv_python_headless-4.12.0.88-cp37-abi3-macosx_13_0_arm64.whl", hash = "sha256:1e58d664809b3350c1123484dd441e1667cd7bed3086db1b9ea1b6f6cb20b50e"},
|
||||
{file = "opencv_python_headless-4.12.0.88-cp37-abi3-macosx_13_0_x86_64.whl", hash = "sha256:365bb2e486b50feffc2d07a405b953a8f3e8eaa63865bc650034e5c71e7a5154"},
|
||||
{file = "opencv_python_headless-4.12.0.88-cp37-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:aeb4b13ecb8b4a0beb2668ea07928160ea7c2cd2d9b5ef571bbee6bafe9cc8d0"},
|
||||
{file = "opencv_python_headless-4.12.0.88-cp37-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:236c8df54a90f4d02076e6f9c1cc763d794542e886c576a6fee46ec8ff75a7a9"},
|
||||
{file = "opencv_python_headless-4.12.0.88-cp37-abi3-win32.whl", hash = "sha256:fde2cf5c51e4def5f2132d78e0c08f9c14783cd67356922182c6845b9af87dbd"},
|
||||
{file = "opencv_python_headless-4.12.0.88-cp37-abi3-win_amd64.whl", hash = "sha256:86b413bdd6c6bf497832e346cd5371995de148e579b9774f8eba686dee3f5528"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@ -3162,4 +3162,4 @@ type = ["pytest-mypy"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.12,<4.0"
|
||||
content-hash = "1968dbf21fc397693f6d77f371cbadecb97e287a5b07918c9ac7d730d09e614a"
|
||||
content-hash = "673703a789248d0f7369999c364352eb12f8bb5830a8b4b6918f8bab6425a763"
|
||||
|
||||
@ -20,7 +20,7 @@ dependencies = [
|
||||
"lxml (>=6.0.2,<7.0.0)",
|
||||
"pillow (>=11.3.0,<12.0.0)",
|
||||
"imagetext-py (>=2.2.0,<3.0.0)",
|
||||
"opencv-python (>=4.12.0.88,<5.0.0.0)",
|
||||
"opencv-python-headless (>=4.12.0.88,<5.0.0.0)",
|
||||
]
|
||||
|
||||
|
||||
|
||||
@ -1055,14 +1055,14 @@ numpy==2.2.6 ; python_version >= "3.12" and python_version < "4.0" \
|
||||
--hash=sha256:fd83c01228a688733f1ded5201c678f0c53ecc1006ffbc404db9f7a899ac6249 \
|
||||
--hash=sha256:fe27749d33bb772c80dcd84ae7e8df2adc920ae8297400dabec45f0dedb3f6de \
|
||||
--hash=sha256:fee4236c876c4e8369388054d02d0e9bb84821feb1a64dd59e137e6511a551f8
|
||||
opencv-python==4.12.0.88 ; python_version >= "3.12" and python_version < "4.0" \
|
||||
--hash=sha256:092c16da4c5a163a818f120c22c5e4a2f96e0db4f24e659c701f1fe629a690f9 \
|
||||
--hash=sha256:51fd981c7df6af3e8f70b1556696b05224c4e6b6777bdd2a46b3d4fb09de1a92 \
|
||||
--hash=sha256:812eb116ad2b4de43ee116fcd8991c3a687f099ada0b04e68f64899c09448e81 \
|
||||
--hash=sha256:8b738389cede219405f6f3880b851efa3415ccd674752219377353f017d2994d \
|
||||
--hash=sha256:d98edb20aa932fd8ebd276a72627dad9dc097695b3d435a4257557bbb49a79d2 \
|
||||
--hash=sha256:f9a1f08883257b95a5764bf517a32d75aec325319c8ed0f89739a57fae9e92a5 \
|
||||
--hash=sha256:ff554d3f725b39878ac6a2e1fa232ec509c36130927afc18a1719ebf4fbf4357
|
||||
opencv-python-headless==4.12.0.88 ; python_version >= "3.12" and python_version < "4.0" \
|
||||
--hash=sha256:1e58d664809b3350c1123484dd441e1667cd7bed3086db1b9ea1b6f6cb20b50e \
|
||||
--hash=sha256:236c8df54a90f4d02076e6f9c1cc763d794542e886c576a6fee46ec8ff75a7a9 \
|
||||
--hash=sha256:365bb2e486b50feffc2d07a405b953a8f3e8eaa63865bc650034e5c71e7a5154 \
|
||||
--hash=sha256:86b413bdd6c6bf497832e346cd5371995de148e579b9774f8eba686dee3f5528 \
|
||||
--hash=sha256:aeb4b13ecb8b4a0beb2668ea07928160ea7c2cd2d9b5ef571bbee6bafe9cc8d0 \
|
||||
--hash=sha256:cfdc017ddf2e59b6c2f53bc12d74b6b0be7ded4ec59083ea70763921af2b6c09 \
|
||||
--hash=sha256:fde2cf5c51e4def5f2132d78e0c08f9c14783cd67356922182c6845b9af87dbd
|
||||
pillow==11.3.0 ; python_version >= "3.12" and python_version < "4.0" \
|
||||
--hash=sha256:023f6d2d11784a465f09fd09a34b150ea4672e85fb3d05931d89f373ab14abb2 \
|
||||
--hash=sha256:02a723e6bf909e7cea0dac1b0e0310be9d7650cd66222a5f1c571455c0a45214 \
|
||||
|
||||
0
scripts/__init__.py
Normal file
21
scripts/test_plugin_load.py
Normal file
@ -0,0 +1,21 @@
|
||||
from pathlib import Path
|
||||
|
||||
import nonebot
|
||||
from loguru import logger
|
||||
|
||||
nonebot.init()
|
||||
nonebot.load_plugins("konabot/plugins")
|
||||
|
||||
plugins = nonebot.get_loaded_plugins()
|
||||
len_requires = len(
|
||||
[f for f in (
|
||||
Path(__file__).parent.parent / "konabot" / "plugins"
|
||||
).iterdir() if f.is_dir() and (f / "__init__.py").exists()]
|
||||
)
|
||||
|
||||
plugins = [p for p in plugins if p.module.__name__.startswith("konabot.plugins")]
|
||||
|
||||
logger.info(f"已经加载的插件数量 {len(plugins)}")
|
||||
logger.info(f"期待加载的插件数量 {len_requires}")
|
||||
|
||||
assert len(plugins) == len_requires
|
||||