From 0dbe164703f3288165fbf3fb8305369d07ad8ca6 Mon Sep 17 00:00:00 2001 From: passthem Date: Tue, 30 Sep 2025 01:59:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=88=E8=B0=83=E6=95=B4=E5=88=B0=E4=B8=80?= =?UTF-8?q?=E4=B8=AA=E5=8F=AF=E7=94=A8=E7=9A=84=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- konabot/plugins/simple_notify/__init__.py | 152 +-------- konabot/plugins/simple_notify/parse_time.py | 358 ++++++++++++++++++++ 2 files changed, 364 insertions(+), 146 deletions(-) create mode 100644 konabot/plugins/simple_notify/parse_time.py diff --git a/konabot/plugins/simple_notify/__init__.py b/konabot/plugins/simple_notify/__init__.py index 3dba19a..220ebf6 100644 --- a/konabot/plugins/simple_notify/__init__.py +++ b/konabot/plugins/simple_notify/__init__.py @@ -1,6 +1,5 @@ import asyncio import datetime -import re from pathlib import Path from typing import Any, Literal, cast @@ -20,149 +19,7 @@ from nonebot.adapters.onebot.v11.event import \ from nonebot_plugin_alconna import UniMessage, UniMsg from pydantic import BaseModel -PATTERN_DELTA_HMS = re.compile(r"^((\d+|[零一两二三四五六七八九十]+) ?天)?((\d+|[零一两二三四五六七八九十]+) ?个?小?时)?((\d+|[零一两二三四五六七八九十]+) ?分钟?)?((\d+|[零一两二三四五六七八九十]+) ?秒钟?)? 
?后 ?$") - -PATTERN_DATE_SPECIFY = re.compile(r"(\d{1,2}|[零一二三四五六七八九十]+) ?[日号]") -PATTERN_MONTH_SPECIFY = re.compile(r"(\d{1,2}|[零一二三四五六七八九十]+) ?月") -PATTERN_YEAR_SPECIFY = re.compile(r"(\d|[零一二三四五六七八九十]+) ?年") -PATTERN_HOUR_SPECIFY = re.compile(r"(\d|[零一二三四五六七八九十]+) ?[点时](半?)钟?") -PATTERN_MINUTE_SPECIFY = re.compile(r"(\d|[零一二三四五六七八九十]+) ?分(钟)?") -PATTERN_SECOND_SPECIFY = re.compile(r"(\d|[零一二三四五六七八九十]+) ?秒(钟)?") -PATTERN_HMS_SPECIFY = re.compile(r"\d\d[::]\d\d([::]\d\d)?") -PATTERN_PM_SPECIFY = re.compile(r"(下午|PM|晚上)") - - -def parse_chinese_or_digit(s: str) -> int: - if set(s) <= set("0123456789"): - return int(s) - - s = s.replace("两", "二") - - chinese_to_arabic = { - '零': 0, '一': 1, '二': 2, '三': 3, '四': 4, - '五': 5, '六': 6, '七': 7, '八': 8, '九': 9, - '十': 10 - } - - if s in chinese_to_arabic: - return chinese_to_arabic[s] - - if len(s) == 2 and s[0] == '十': - if s[1] not in chinese_to_arabic: - return -1 - return 10 + chinese_to_arabic.get(s[1], 0) - elif len(s) == 2 and s[1] == '十': - if s[0] not in chinese_to_arabic: - return -1 - return 10 * chinese_to_arabic.get(s[0], 0) - elif len(s) == 3 and s[1] == '十': - if s[0] not in chinese_to_arabic or s[2] not in chinese_to_arabic: - return -1 - return 10 * chinese_to_arabic.get(s[0], 0) + chinese_to_arabic.get(s[2], 0) - - try: - return int(s) - except ValueError: - return -1 - - -def get_target_time(content: str) -> datetime.datetime | None: - if match := re.match(PATTERN_DELTA_HMS, content.strip()): - days = parse_chinese_or_digit(match.group(2) or "0") - hours = parse_chinese_or_digit(match.group(4) or "0") - minutes = parse_chinese_or_digit(match.group(6) or "0") - seconds = parse_chinese_or_digit(match.group(8) or "0") - return datetime.datetime.now() + datetime.timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds) - - t = datetime.datetime.now() - content_to_match = content - if "明天" in content_to_match: - content_to_match = "".join(content_to_match.split("明天")) - t += 
datetime.timedelta(days=1) - elif "后天" in content_to_match: - content_to_match = "".join(content_to_match.split("后天")) - t += datetime.timedelta(days=2) - elif "今天" in content_to_match: - content_to_match = "".join(content_to_match.split("今天")) - - if match1 := re.match(PATTERN_DATE_SPECIFY, content_to_match): - content_to_match = "".join(content_to_match.split(match1.group(0))) - day = parse_chinese_or_digit(match1.group(1)) - if day <= 0 or day > 31: - return - if day < t.day: - if t.month == 12: - t = t.replace(year=t.year + 1, month=1, day=day) - else: - t = t.replace(month=t.month + 1, day=day) - else: - t = t.replace(day=day) - if match2 := re.match(PATTERN_MONTH_SPECIFY, content_to_match): - content_to_match = "".join(content_to_match.split(match2.group(0))) - month = parse_chinese_or_digit(match2.group(1)) - if month <= 0 or month > 12: - return - if month < t.month: - t = t.replace(year=t.year + 1, month=month) - else: - t = t.replace(month=month) - if match3 := re.match(PATTERN_YEAR_SPECIFY, content_to_match): - content_to_match = "".join(content_to_match.split(match3.group(0))) - year = parse_chinese_or_digit(match3.group(1)) - if year < 100: - year += 2000 - if year < t.year: - return - t = t.replace(year=year) - if match4 := re.match(PATTERN_HOUR_SPECIFY, content_to_match): - content_to_match = "".join(content_to_match.split(match4.group(0))) - hour = parse_chinese_or_digit(match4.group(1)) - if hour < 0 or hour > 23: - return - t = t.replace(hour=hour, minute=0, second=0) - if match4.group(2) != None: - t = t.replace(minute=30) - if match5 := re.match(PATTERN_MINUTE_SPECIFY, content_to_match): - content_to_match = "".join(content_to_match.split(match5.group(0))) - minute = parse_chinese_or_digit(match5.group(1)) - if minute < 0 or minute > 59: - return - t = t.replace(minute=minute, second=0) - if match6 := re.match(PATTERN_SECOND_SPECIFY, content_to_match): - content_to_match = "".join(content_to_match.split(match6.group(0))) - second = 
parse_chinese_or_digit(match6.group(1)) - if second < 0 or second > 59: - return - t = t.replace(second=second) - if match7 := re.match(PATTERN_HMS_SPECIFY, content_to_match): - content_to_match = "".join(content_to_match.split(match7.group(0))) - hms = match7.group(0).replace(":", ":").split(":") - if len(hms) >= 2: - hour = int(hms[0]) - minute = int(hms[1]) - if hour < 0 or hour > 23 or minute < 0 or minute > 59: - return - t = t.replace(hour=hour, minute=minute) - if len(hms) == 3: - second = int(hms[2]) - if second < 0 or second > 59: - return - t = t.replace(second=second) - - content_to_match = content_to_match.replace("上午", "").replace("AM", "").replace("凌晨", "") - if match8 := re.match(PATTERN_PM_SPECIFY, content_to_match): - content_to_match = "".join(content_to_match.split(match8.group(0))) - if t.hour < 12: - t = t.replace(hour=t.hour + 12) - if t.hour == 12: - t += datetime.timedelta(hours=12) - - if len(content_to_match.strip()) != 0: - return - if t < datetime.datetime.now(): - t += datetime.timedelta(days=1) - return t - +from konabot.plugins.simple_notify.parse_time import get_target_time evt = on_message() @@ -268,11 +125,14 @@ async def create_notify_task(notify: Notify, fail2remove: bool = True): @evt.handle() async def _(msg: UniMsg, mEvt: Event): + if mEvt.get_user_id() in nonebot.get_bots(): + return + text = msg.extract_plain_text() if "提醒我" not in text: return - segments = text.split("提醒我") + segments = text.split("提醒我", maxsplit=1) if len(segments) != 2: return @@ -318,7 +178,7 @@ async def _(msg: UniMsg, mEvt: Event): cfg.notifies.append(notify) save_notify_config(cfg) DATA_FILE_LOCK.release() - + await evt.send(await UniMessage().at(mEvt.get_user_id()).text( f" 了解啦!将会在 {notify.notify_time} 提醒你哦~").export()) diff --git a/konabot/plugins/simple_notify/parse_time.py b/konabot/plugins/simple_notify/parse_time.py new file mode 100644 index 0000000..51d51a8 --- /dev/null +++ b/konabot/plugins/simple_notify/parse_time.py @@ -0,0 +1,358 @@ 
"""Natural-language time parsing for the simple_notify plugin.

Turns short (mostly Chinese) expressions such as "5分钟后", "明天下午三点半"
or "10月1日 14:30" into :class:`datetime.datetime` objects.  The only entry
point other modules need is :func:`get_target_time`.
"""

import calendar
import datetime
import re
from typing import Optional, Dict, List, Callable, Tuple

try:
    from loguru import logger
except ImportError:  # pragma: no cover - only hit outside the bot's environment
    # Keep this pure-parsing module importable (e.g. in unit tests) when the
    # bot's runtime dependencies are not installed.
    import logging

    logger = logging.getLogger(__name__)

# --- Constants and regex definitions ---

# A number written either with decimal digits or with Chinese numerals.
P_NUM = r"(\d+|[零一两二三四五六七八九十]+)"

# Pre-compiled patterns.  In the absolute patterns group(1) is always the number.
PATTERNS = {
    # Relative time, e.g. "5分钟后"; the named groups are read by
    # TimeParser._parse_relative_time.
    "DELTA": re.compile(
        r"^"
        r"((?P<days>" + P_NUM + r") ?天)?"
        r"((?P<hours>" + P_NUM + r") ?个?小?时)?"
        r"((?P<minutes>" + P_NUM + r") ?分钟?)?"
        r"((?P<seconds>" + P_NUM + r") ?秒钟?)?"
        r" ?后 ?$"
    ),
    # Absolute time
    "YEAR": re.compile(r"(" + P_NUM + r") ?年"),
    "MONTH": re.compile(r"(" + P_NUM + r") ?月"),
    "DAY": re.compile(r"(" + P_NUM + r") ?[日号]"),
    # P_NUM contributes a nested group, so the "半" marker is named instead of
    # being addressed by a fragile positional index.
    "HOUR": re.compile(r"(" + P_NUM + r") ?[点时](?P<half>半)?钟?"),
    "MINUTE": re.compile(r"(" + P_NUM + r") ?分(钟)?"),
    "SECOND": re.compile(r"(" + P_NUM + r") ?秒(钟)?"),
    # Accepts both the ASCII and the full-width colon.
    "HMS_COLON": re.compile(r"(\d{1,2})[:\uff1a](\d{1,2})([:\uff1a](\d{1,2}))?"),
    "PM": re.compile(r"(下午|PM|晚上)"),
    # Relative dates
    "TOMORROW": re.compile(r"明天"),
    "DAY_AFTER_TOMORROW": re.compile(r"后天"),
    "TODAY": re.compile(r"今天"),
}

# Chinese numeral -> value.
CHINESE_TO_ARABIC_MAP: Dict[str, int] = {
    '零': 0, '一': 1, '二': 2, '三': 3, '四': 4,
    '五': 5, '六': 6, '七': 7, '八': 8, '九': 9, '十': 10
}


# --- Core utility functions ---

def parse_number(s: str) -> int:
    """Parse a string of decimal digits or Chinese numerals into an int.

    Supported forms: decimal digits ("12"), a single numeral ("五"), the
    "tens" forms ("十", "十五", "二十", "二十三") and positional digit
    strings without "十" ("零五" -> 5, "二零二五" -> 2025).  "两" is treated
    as "二".

    Args:
        s: The text to parse; surrounding whitespace is ignored.

    Returns:
        The parsed non-negative integer, or -1 when *s* is empty or not in
        one of the supported forms (e.g. "十一二").
    """
    if not s:
        return -1

    s = s.strip().replace("两", "二")
    if not s:
        return -1

    # isdecimal() (unlike isdigit()) only accepts characters int() can parse.
    if s.isdecimal():
        return int(s)

    if any(ch not in CHINESE_TO_ARABIC_MAP for ch in s):
        return -1

    if '十' not in s:
        # Positional reading: every character is a single digit 0-9.
        return int("".join(str(CHINESE_TO_ARABIC_MAP[ch]) for ch in s))

    tens, _, units = s.partition('十')
    # At most one digit on each side of a single "十"; anything else
    # ("十一二", "十十", "二三十") is malformed rather than silently truncated.
    if '十' in units or len(tens) > 1 or len(units) > 1:
        return -1

    tens_value = CHINESE_TO_ARABIC_MAP[tens] if tens else 1
    units_value = CHINESE_TO_ARABIC_MAP[units] if units else 0
    return tens_value * 10 + units_value


# --- Time parser class ---

class TimeParser:
    """Parse one natural-language time description.

    The parser first tries a relative form ("X天X小时X分钟后").  Otherwise it
    consumes absolute components (date words, year/month/day, clock time)
    from the text one by one, updating ``self.t``; any text left over makes
    the parse fail.
    """

    def __init__(self, content: str, now: Optional[datetime.datetime] = None):
        """Prepare a parser for *content*.

        Args:
            content: The raw time description.
            now: Reference "current time".  Defaults to
                ``datetime.datetime.now()``; pass a fixed value to get
                deterministic results.
        """
        self.original_content: str = content
        self.content_to_parse: str = self._preprocess(content)
        self.now: datetime.datetime = now if now is not None else datetime.datetime.now()
        # t is the result under construction; it starts at today's midnight.
        self.t: datetime.datetime = self.now.replace(hour=0, minute=0, second=0, microsecond=0)
        self.is_pm_specified: bool = False
        # True when the PM marker was specifically "晚上" (matters for "晚上12点").
        self.is_night_specified: bool = False
        self.is_date_specified: bool = False
        self.is_time_specified: bool = False
        # Which calendar fields the user pinned explicitly; used to decide how
        # a moment that is already past gets rolled forward.
        self.is_year_specified: bool = False
        self.is_month_specified: bool = False
        self.is_day_specified: bool = False

    def _preprocess(self, content: str) -> str:
        """Strip whitespace and separator punctuation from *content*.

        Colons are deliberately kept: they are needed by the "HH:MM[:SS]"
        pattern and stray ones are discarded after parsing instead.
        """
        content = re.sub(r"\s+", "", content)
        content = re.sub(r"[,\uff0c\.。、]", "", content)
        return content

    def _consume_match(self, match: re.Match) -> str:
        """Remove the matched span from the remaining text and return it.

        *match* must have been produced against the current
        ``self.content_to_parse``.
        """
        start, end = match.span()
        self.content_to_parse = self.content_to_parse[:start] + self.content_to_parse[end:]
        return match.group(0)

    def _replace_year_month(self, year: int, month: int) -> datetime.datetime:
        """Return ``self.t`` moved to *year*/*month*, clamping the day.

        The day of month is clamped to the target month's length so that
        e.g. moving Jan 31 to February does not raise.

        Raises:
            ValueError: If *year* is outside the range datetime supports.
        """
        last_day = calendar.monthrange(year, month)[1]
        return self.t.replace(year=year, month=month, day=min(self.t.day, last_day))

    def parse(self) -> Optional[datetime.datetime]:
        """Parse the content given to the constructor.

        Tries the relative form first and falls back to absolute parsing.

        Returns:
            The target datetime, or None when the text cannot be understood.
        """
        logger.debug(f"🎉 开始解析: '{self.original_content}' -> 清洗后: '{self.content_to_parse}'")
        if not self.content_to_parse:
            logger.debug("❌ 内容为空,无法解析。")
            return None

        # 1. Relative time
        if (target_time := self._parse_relative_time()) is not None:
            return target_time

        # 2. Absolute time
        if (target_time := self._parse_absolute_time()) is not None:
            return target_time

        logger.debug("❌ 所有解析模式均未匹配成功。")
        return None

    def _parse_relative_time(self) -> Optional[datetime.datetime]:
        """Parse the "X天X小时X分钟X秒后" form relative to ``self.now``.

        Returns:
            ``self.now`` plus the parsed offset, or None when the text is not
            in this form, a number is malformed, the offset is zero, or the
            result does not fit in a datetime.
        """
        match = PATTERNS["DELTA"].match(self.content_to_parse)
        if match is None:
            return None

        logger.debug("⏳ 匹配到相对时间模式 (DELTA)。")
        delta_parts = {
            "days": parse_number(match.group("days") or "0"),
            "hours": parse_number(match.group("hours") or "0"),
            "minutes": parse_number(match.group("minutes") or "0"),
            "seconds": parse_number(match.group("seconds") or "0"),
        }

        # parse_number signals failure with -1.
        if any(v < 0 for v in delta_parts.values()):
            logger.debug(f"❌ 解析时间片段为数字时失败: {delta_parts}")
            return None

        try:
            delta = datetime.timedelta(**delta_parts)
            if delta.total_seconds() == 0:
                logger.debug("❌ 解析出的时间增量为0。")
                return None
            target_time = self.now + delta
        except (ValueError, TypeError, OverflowError) as e:
            # Huge user-supplied numbers overflow timedelta/datetime with
            # OverflowError.  No keyword arguments are passed to the logger:
            # loguru would treat them as str.format() arguments.
            logger.debug(f"❌ 解析相对时间时出错: {e!r}")
            return None

        logger.debug(f"✅ 相对时间解析成功 -> {target_time}")
        return target_time

    def _parse_absolute_time(self) -> Optional[datetime.datetime]:
        """Parse an explicitly specified date and/or clock time.

        Returns:
            The resolved datetime, or None when a component is invalid or
            unrecognised text remains.
        """
        logger.debug(f"🎯 启动绝对时间解析,基准时间: {self.t}")

        # (pattern key, handler) pairs, applied in this order.  Each handler
        # consumes its match and returns False to abort the whole parse.
        parsing_steps: List[Tuple[str, Callable[[re.Match], bool]]] = [
            ("TOMORROW", self._handle_tomorrow),
            ("DAY_AFTER_TOMORROW", self._handle_day_after_tomorrow),
            ("TODAY", self._handle_today),
            ("YEAR", self._handle_year),
            ("MONTH", self._handle_month),
            ("DAY", self._handle_day),
            ("HMS_COLON", self._handle_hms_colon),
            ("PM", self._handle_pm),
            ("HOUR", self._handle_hour),
            ("MINUTE", self._handle_minute),
            ("SECOND", self._handle_second),
        ]

        for key, handler in parsing_steps:
            match = PATTERNS[key].search(self.content_to_parse)
            if match is not None and not handler(match):
                return None

        # Morning markers carry no information (hours are already 0-11), and
        # stray colons are separator noise once "HH:MM" has been consumed.
        leftovers = self.content_to_parse
        for token in ("上午", "AM", "凌晨"):
            leftovers = leftovers.replace(token, "")
        leftovers = re.sub(r"[:\uff1a]", "", leftovers)
        self.content_to_parse = leftovers

        # Anything still left is text we do not understand.
        if self.content_to_parse.strip():
            logger.debug(f"❌ 匹配失败,存在未解析的残留内容: '{self.content_to_parse.strip()}'")
            return None

        return self._finalize_datetime()

    # --- Handler methods for absolute time parsing ---

    def _handle_tomorrow(self, match: re.Match) -> bool:
        """Handle "明天": move the date one day forward."""
        self.t += datetime.timedelta(days=1)
        self.is_date_specified = True
        consumed = self._consume_match(match)
        logger.debug(f"📅 匹配到 '明天' -> {self.t.date()}, 消耗: '{consumed}'")
        return True

    def _handle_day_after_tomorrow(self, match: re.Match) -> bool:
        """Handle "后天": move the date two days forward."""
        self.t += datetime.timedelta(days=2)
        self.is_date_specified = True
        consumed = self._consume_match(match)
        logger.debug(f"📅 匹配到 '后天' -> {self.t.date()}, 消耗: '{consumed}'")
        return True

    def _handle_today(self, match: re.Match) -> bool:
        """Handle "今天": the date stays as is but counts as specified."""
        self.is_date_specified = True
        consumed = self._consume_match(match)
        logger.debug(f"📅 匹配到 '今天', 日期基准不变, 消耗: '{consumed}'")
        return True

    def _handle_year(self, match: re.Match) -> bool:
        """Handle "N年"; two-digit years mean 20NN, past years are rejected."""
        year = parse_number(match.group(1))
        if year < 0:
            return False
        if year < 100:
            year += 2000  # "25年" -> 2025
        if year < self.now.year:
            logger.debug(f"❌ 指定的年份 {year} 已过去。")
            return False
        try:
            # Clamped replace: a plain replace(year=...) raises on Feb 29.
            self.t = self._replace_year_month(year, self.t.month)
        except ValueError:
            logger.debug(f"❌ 无效的年份: {year}")
            return False
        self.is_date_specified = True
        self.is_year_specified = True
        consumed = self._consume_match(match)
        logger.debug(f"Y| 年份更新 -> {self.t.year}, 消耗: '{consumed}'")
        return True

    def _handle_month(self, match: re.Match) -> bool:
        """Handle "N月".

        Only sets the month; rolling into next year when the result is
        already past is done in _finalize_datetime.
        """
        month = parse_number(match.group(1))
        if not (1 <= month <= 12):
            logger.debug(f"❌ 无效的月份: {month}")
            return False

        try:
            # Clamped replace: a plain replace(month=...) raises when today
            # is e.g. the 31st and the target month is shorter.
            self.t = self._replace_year_month(self.t.year, month)
        except ValueError:
            logger.debug(f"❌ 无效的月份: {month}")
            return False
        self.is_date_specified = True
        self.is_month_specified = True
        consumed = self._consume_match(match)
        logger.debug(f"M| 月份更新 -> {self.t.month}, 消耗: '{consumed}'")
        return True

    def _handle_day(self, match: re.Match) -> bool:
        """Handle "N日" / "N号".

        Only sets the day; rolling into next month when the result is
        already past is done in _finalize_datetime.
        """
        day = parse_number(match.group(1))
        if not (1 <= day <= 31):
            logger.debug(f"❌ 无效的日期: {day}")
            return False

        try:
            self.t = self.t.replace(day=day)
        except ValueError:
            logger.debug(f"❌ 日期 {day} 对于月份 {self.t.month} 无效 (例如2月30号)。")
            return False
        self.is_date_specified = True
        self.is_day_specified = True
        consumed = self._consume_match(match)
        logger.debug(f"D| 日期更新 -> {self.t.day}, 消耗: '{consumed}'")
        return True

    def _handle_hms_colon(self, match: re.Match) -> bool:
        """Handle "HH:MM" / "HH:MM:SS"."""
        h = int(match.group(1))
        m = int(match.group(2))
        s_str = match.group(4)  # group(3) includes the colon, group(4) is the number
        s = int(s_str) if s_str else 0
        if not (0 <= h <= 23 and 0 <= m <= 59 and 0 <= s <= 59):
            logger.debug(f"❌ 无效的时间格式: H={h}, M={m}, S={s}")
            return False
        self.t = self.t.replace(hour=h, minute=m, second=s)
        self.is_time_specified = True
        consumed = self._consume_match(match)
        logger.debug(f"T| 时分秒(冒号格式)更新 -> {self.t.time()}, 消耗: '{consumed}'")
        return True

    def _handle_pm(self, match: re.Match) -> bool:
        """Handle "下午" / "PM" / "晚上"; the shift is applied when finalizing."""
        self.is_pm_specified = True
        consumed = self._consume_match(match)
        if consumed == "晚上":
            self.is_night_specified = True
        logger.debug(f"PM| 匹配到下午/晚上, 消耗: '{consumed}'")
        return True

    def _handle_hour(self, match: re.Match) -> bool:
        """Handle "N点" / "N时", optionally followed by "半" (half past)."""
        hour = parse_number(match.group(1))
        has_half = match.group("half") is not None
        if not (0 <= hour <= 23):
            logger.debug(f"❌ 无效的小时: {hour}")
            return False
        minute = 30 if has_half else self.t.minute
        self.t = self.t.replace(hour=hour, minute=minute)
        self.is_time_specified = True
        consumed = self._consume_match(match)
        logger.debug(f"H| 小时更新 -> {self.t.hour}{':30' if has_half else ''}, 消耗: '{consumed}'")
        return True

    def _handle_minute(self, match: re.Match) -> bool:
        """Handle "N分" / "N分钟"."""
        minute = parse_number(match.group(1))
        if not (0 <= minute <= 59):
            logger.debug(f"❌ 无效的分钟: {minute}")
            return False
        self.t = self.t.replace(minute=minute)
        self.is_time_specified = True
        consumed = self._consume_match(match)
        logger.debug(f"M| 分钟更新 -> {self.t.minute}, 消耗: '{consumed}'")
        return True

    def _handle_second(self, match: re.Match) -> bool:
        """Handle "N秒" / "N秒钟"."""
        second = parse_number(match.group(1))
        if not (0 <= second <= 59):
            logger.debug(f"❌ 无效的秒: {second}")
            return False
        self.t = self.t.replace(second=second)
        self.is_time_specified = True
        consumed = self._consume_match(match)
        logger.debug(f"S| 秒更新 -> {self.t.second}, 消耗: '{consumed}'")
        return True

    def _roll_forward(self) -> None:
        """Move a past ``self.t`` forward by the smallest unit not pinned.

        * explicit year: left untouched (the user named that exact date);
        * month given without a year: next year in which the date exists;
        * only a day of month given: next month in which that day exists;
        * otherwise (time only, or a relative day word): one day later.
        """
        if self.is_year_specified:
            return

        if self.is_month_specified:
            # Up to 8 years covers the longest gap between leap years.
            for offset in range(1, 9):
                try:
                    self.t = self.t.replace(year=self.t.year + offset)
                except ValueError:
                    continue
                logger.debug(f"🔁 目标时间已过,自动调整为下一年 -> {self.t}")
                return
            return

        if self.is_day_specified:
            for offset in range(1, 13):
                year_carry, month_index = divmod(self.t.month - 1 + offset, 12)
                try:
                    self.t = self.t.replace(year=self.t.year + year_carry, month=month_index + 1)
                except ValueError:
                    # That month has no such day (e.g. the 31st); try the next.
                    continue
                logger.debug(f"🔁 目标时间已过,自动调整为下个月 -> {self.t}")
                return
            return

        self.t += datetime.timedelta(days=1)
        logger.debug(f"🔁 目标时间已过,自动调整为明天 -> {self.t}")

    def _finalize_datetime(self) -> Optional[datetime.datetime]:
        """Apply the PM shift, validate, and roll past moments forward.

        Returns:
            The final datetime, or None when neither a date nor a time
            component was recognised.
        """
        # Afternoon / evening: 1-11 o'clock become 13-23.
        if self.is_pm_specified and self.t.hour < 12:
            self.t = self.t.replace(hour=self.t.hour + 12)
            logger.debug(f"💡 根据 PM 标识,小时调整为 -> {self.t.hour}")
        elif self.is_night_specified and self.t.hour == 12:
            # "晚上12点" is midnight at the end of that day, not noon.
            self.t += datetime.timedelta(hours=12)
            logger.debug(f"💡 根据 PM 标识,小时调整为 -> {self.t.hour}")

        # Nothing recognisable was specified.
        if not self.is_date_specified and not self.is_time_specified:
            logger.debug("❌ 未能从输入中解析出任何有效的日期或时间部分。")
            return None

        # With a clock time the exact moment must not be past; a date-only
        # request is past only when the date itself is before today.
        if self.is_time_specified:
            is_past = self.t < self.now
        else:
            is_past = self.t.date() < self.now.date()
        if is_past:
            self._roll_forward()

        logger.debug(f"✅ 解析成功,最终时间: {self.t}")
        return self.t


# --- Public interface ---

def get_target_time(content: str, now: Optional[datetime.datetime] = None) -> Optional[datetime.datetime]:
    """Convert a natural-language time description into a datetime.

    Args:
        content: Text containing the time description.
        now: Reference "current time"; defaults to the real current time.

    Returns:
        The target datetime, or None when *content* cannot be parsed.
    """
    parser = TimeParser(content, now)
    return parser.parse()