358 lines
14 KiB
Python
358 lines
14 KiB
Python
import datetime
|
||
import re
|
||
from typing import Optional, Dict, List, Callable, Tuple
|
||
|
||
from loguru import logger
|
||
|
||
# --- 常量与正则表达式定义 (Constants and Regex Definitions) ---
|
||
|
||
# Number pattern accepting either Arabic digits or Chinese numerals.
# NOTE: this fragment is itself ONE capturing group, so patterns that embed it
# must not wrap it in a second pair of parentheses (that would shift the
# numbering of every group that follows).
P_NUM = r"(\d+|[零一两二三四五六七八九十]+)"

# Precompiled regular expressions, keyed by the component they recognise.
# For every absolute-time pattern built from P_NUM, group(1) is the number.
PATTERNS = {
    # Relative offset, e.g. "5分钟后" ("in 5 minutes"). The whole string must
    # match; each unit is optional and is exposed through a named group.
    "DELTA": re.compile(
        r"^"
        r"((?P<days>" + P_NUM + r") ?天)?"
        r"((?P<hours>" + P_NUM + r") ?个?小?时)?"
        r"((?P<minutes>" + P_NUM + r") ?分钟?)?"
        r"((?P<seconds>" + P_NUM + r") ?秒钟?)?"
        r" ?后 ?$"
    ),
    # Absolute date/time components.
    "YEAR": re.compile(P_NUM + r" ?年"),
    "MONTH": re.compile(P_NUM + r" ?月"),
    "DAY": re.compile(P_NUM + r" ?[日号]"),
    # group(2) is "半" when a half hour was given ("三点半"), otherwise None.
    "HOUR": re.compile(P_NUM + r" ?[点时](半)?钟?"),
    "MINUTE": re.compile(P_NUM + r" ?分(钟)?"),
    "SECOND": re.compile(P_NUM + r" ?秒(钟)?"),
    # "HH:MM" or "HH:MM:SS"; accepts the ASCII colon and the full-width colon
    # (U+FF1A). group(1)=hour, group(2)=minute, group(4)=second (optional).
    "HMS_COLON": re.compile(
        r"(\d{1,2})[:\uff1a](\d{1,2})([:\uff1a](\d{1,2}))?"
    ),
    "PM": re.compile(r"(下午|PM|晚上)"),
    # Relative day words.
    "TOMORROW": re.compile(r"明天"),
    "DAY_AFTER_TOMORROW": re.compile(r"后天"),
    "TODAY": re.compile(r"今天"),
}
|
||
|
||
# Mapping from a single Chinese numeral character to its integer value.
CHINESE_TO_ARABIC_MAP: Dict[str, int] = {
    '零': 0, '一': 1, '二': 2, '三': 3, '四': 4,
    '五': 5, '六': 6, '七': 7, '八': 8, '九': 9, '十': 10,
}


# --- Core Utility Functions ---

def parse_number(s: str) -> int:
    """Parse a string of Arabic digits or Chinese numerals into an integer.

    Supported forms:
      * decimal digits:                 "12"      -> 12
      * a single Chinese numeral:       "五"      -> 5   ("两" is read as "二")
      * tens notation with one "十":    "十五"    -> 15, "二十" -> 20,
                                        "二十三"  -> 23
      * digit-by-digit notation:        "二零二五" -> 2025

    Args:
        s: The text to parse. Surrounding whitespace is ignored.

    Returns:
        The parsed non-negative integer, or -1 when the text is empty or is
        not a well-formed number in one of the forms above.
    """
    if not s:
        return -1

    s = s.strip().replace("两", "二")
    if not s:
        return -1

    # isdecimal() (unlike isdigit()) only accepts characters int() can parse,
    # so inputs such as a superscript two cannot raise ValueError here.
    if s.isdecimal():
        return int(s)

    if s in CHINESE_TO_ARABIC_MAP:
        return CHINESE_TO_ARABIC_MAP[s]

    if '十' in s:
        # Tens notation: [tens digit] 十 [units digit], each side optional
        # and at most one character long; a second "十" is malformed.
        tens_text, _, units_text = s.partition('十')
        if '十' in units_text:
            return -1
        tens = CHINESE_TO_ARABIC_MAP.get(tens_text) if tens_text else 1
        units = CHINESE_TO_ARABIC_MAP.get(units_text) if units_text else 0
        if tens is None or units is None:
            return -1
        # "零" is not a valid digit on either side of "十".
        if (tens_text and tens == 0) or (units_text and units == 0):
            return -1
        return tens * 10 + units

    # Digit-by-digit notation, e.g. a year spelled out as "二零二五".
    value = 0
    for ch in s:
        digit = CHINESE_TO_ARABIC_MAP.get(ch)
        if digit is None:
            return -1
        value = value * 10 + digit
    return value
|
||
|
||
|
||
# --- 时间解析器类 (Time Parser Class) ---
|
||
|
||
class TimeParser:
    """Parse a natural-language (Chinese) time description into a datetime.

    Two input forms are tried, in this order:

    * a relative offset such as "5分钟后", matched by ``PATTERNS["DELTA"]``;
    * an absolute description assembled from date words ("明天"), explicit
      year/month/day parts, "HH:MM[:SS]" clock times, hour/minute/second
      parts and a PM marker.

    The instance is stateful: absolute parsing consumes ``content_to_parse``
    piece by piece while building the result in ``self.t``.

    Args:
        content: The raw text to parse. ``None`` or an empty string yields a
            parser whose ``parse()`` returns ``None``.
        now: Reference "current time". Defaults to ``datetime.datetime.now()``;
            pass an explicit value for reproducible results.
    """

    def __init__(self, content: str, now: Optional[datetime.datetime] = None):
        self.original_content: str = content
        self.content_to_parse: str = self._preprocess(content)
        self.now: datetime.datetime = (
            now if now is not None else datetime.datetime.now()
        )
        # `t` is the result under construction; it starts at midnight of the
        # reference day.
        self.t: datetime.datetime = self.now.replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        self.is_pm_specified: bool = False
        self.is_date_specified: bool = False
        self.is_time_specified: bool = False

    def _preprocess(self, content: str) -> str:
        """Strip whitespace and punctuation that carries no time information.

        Colons are kept only when they sit between two digits, so clock times
        such as "12:30" survive for the HMS_COLON pattern while decorative
        colons ("明天:8点") are dropped. Full-width colons are normalised to
        the ASCII colon first.

        Returns:
            The cleaned string ("" for empty or ``None`` input).
        """
        if not content:
            return ""
        content = re.sub(r"\s+", "", content)
        content = content.replace("\uff1a", ":")
        # ASCII comma, full-width comma, ASCII period, ideographic full stop,
        # ideographic (enumeration) comma.
        content = re.sub(r"[,\uff0c.\u3002\u3001]", "", content)
        # Remove every colon that is not a digit:digit separator.
        content = re.sub(r"(?<!\d):|:(?!\d)", "", content)
        return content

    def _consume_match(self, match: re.Match) -> str:
        """Remove the matched text from the pending input and return it."""
        self.content_to_parse = self.content_to_parse.replace(match.group(0), "", 1)
        return match.group(0)

    def parse(self) -> Optional[datetime.datetime]:
        """Parse the content given at construction time.

        Relative parsing ("5分钟后") is attempted first; if it does not
        apply, absolute parsing is attempted.

        Returns:
            The resolved datetime, or ``None`` when the text is empty or
            cannot be fully understood.
        """
        logger.debug(f"🎉 开始解析: '{self.original_content}' -> 清洗后: '{self.content_to_parse}'")
        if not self.content_to_parse:
            logger.debug("❌ 内容为空,无法解析。")
            return None

        # 1. Relative offset.
        if (target_time := self._parse_relative_time()) is not None:
            return target_time

        # 2. Absolute date/time.
        if (target_time := self._parse_absolute_time()) is not None:
            return target_time

        logger.debug("❌ 所有解析模式均未匹配成功。")
        return None

    def _parse_relative_time(self) -> Optional[datetime.datetime]:
        """Parse an 'X天X小时X分钟X秒后' style offset relative to ``self.now``.

        Returns:
            ``self.now`` plus the offset, or ``None`` when the text is not a
            relative offset, a number is malformed, the offset is zero, or
            the result falls outside the range datetime can represent.
        """
        match = PATTERNS["DELTA"].match(self.content_to_parse)
        if match is None:
            return None

        logger.debug("⏳ 匹配到相对时间模式 (DELTA)。")
        delta_parts = {
            unit: parse_number(match.group(unit) or "0")
            for unit in ("days", "hours", "minutes", "seconds")
        }

        # parse_number signals failure with a negative value.
        if any(v < 0 for v in delta_parts.values()):
            logger.debug(f"❌ 解析时间片段为数字时失败: {delta_parts}")
            return None

        try:
            delta = datetime.timedelta(**delta_parts)
            if delta.total_seconds() == 0:
                logger.debug("❌ 解析出的时间增量为0。")
                return None
            target_time = self.now + delta
        except (ValueError, TypeError, OverflowError) as e:
            # OverflowError: the offset or the resulting date is out of range.
            # loguru attaches the traceback through opt(exception=True).
            logger.opt(exception=True).debug(f"❌ 解析相对时间时出错: {e}")
            return None

        logger.debug(f"✅ 相对时间解析成功 -> {target_time}")
        return target_time

    def _parse_absolute_time(self) -> Optional[datetime.datetime]:
        """Parse an explicit date and/or time description.

        Each pattern is searched once, in a fixed order; its handler updates
        ``self.t`` and consumes the matched text. Any text left over at the
        end (other than AM markers) makes the parse fail.
        """
        logger.debug(f"🎯 启动绝对时间解析,基准时间: {self.t}")

        # (pattern key, handler) pairs; the order matters because handlers
        # consume text and some consult flags set by earlier handlers.
        parsing_steps: List[Tuple[str, Callable[[re.Match], bool]]] = [
            ("TOMORROW", self._handle_tomorrow),
            ("DAY_AFTER_TOMORROW", self._handle_day_after_tomorrow),
            ("TODAY", self._handle_today),
            ("YEAR", self._handle_year),
            ("MONTH", self._handle_month),
            ("DAY", self._handle_day),
            ("HMS_COLON", self._handle_hms_colon),
            ("PM", self._handle_pm),
            ("HOUR", self._handle_hour),
            ("MINUTE", self._handle_minute),
            ("SECOND", self._handle_second),
        ]

        for key, handler in parsing_steps:
            match = PATTERNS[key].search(self.content_to_parse)
            # A handler returning False means its component was invalid.
            if match is not None and not handler(match):
                return None

        # AM markers do not change the result; drop them before the residue check.
        for am_word in ("上午", "AM", "凌晨"):
            self.content_to_parse = self.content_to_parse.replace(am_word, "")

        # Anything still unconsumed is text the parser did not understand.
        if self.content_to_parse.strip():
            logger.debug(f"❌ 匹配失败,存在未解析的残留内容: '{self.content_to_parse.strip()}'")
            return None

        return self._finalize_datetime()

    # --- Handler Methods for Absolute Time Parsing ---

    def _replace_date(self, year: int, month: int) -> None:
        """Move ``self.t`` to ``year``/``month``, clamping the day if needed.

        The current day of ``self.t`` may not exist in the target month
        (e.g. the 31st, or Feb 29 in a non-leap year); the day is then
        lowered to the last valid day.

        Raises:
            ValueError: if the year/month is invalid even for day 28.
        """
        day = self.t.day
        while True:
            try:
                self.t = self.t.replace(year=year, month=month, day=day)
                return
            except ValueError:
                if day <= 28:
                    raise
                day -= 1

    def _handle_tomorrow(self, match: re.Match) -> bool:
        """Handle '明天': move the date one day forward."""
        self.t += datetime.timedelta(days=1)
        self.is_date_specified = True
        logger.debug(f"📅 匹配到 '明天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
        return True

    def _handle_day_after_tomorrow(self, match: re.Match) -> bool:
        """Handle '后天': move the date two days forward."""
        self.t += datetime.timedelta(days=2)
        self.is_date_specified = True
        logger.debug(f"📅 匹配到 '后天' -> {self.t.date()}, 消耗: '{self._consume_match(match)}'")
        return True

    def _handle_today(self, match: re.Match) -> bool:
        """Handle '今天': keep the date, but record that a date was given."""
        self.is_date_specified = True
        logger.debug(f"📅 匹配到 '今天', 日期基准不变, 消耗: '{self._consume_match(match)}'")
        return True

    def _handle_year(self, match: re.Match) -> bool:
        """Handle 'N年'. Two-digit years are read as 20NN; past years fail."""
        year = parse_number(match.group(1))
        if year < 0:
            return False
        if year < 100:
            year += 2000  # "25年" -> 2025
        if year < self.now.year:
            logger.debug(f"❌ 指定的年份 {year} 已过去。")
            return False
        try:
            self._replace_date(year, self.t.month)
        except ValueError:
            # e.g. a year beyond datetime.MAXYEAR
            logger.debug(f"❌ 无效的年份: {year}")
            return False
        self.is_date_specified = True
        logger.debug(f"Y| 年份更新 -> {self.t.year}, 消耗: '{self._consume_match(match)}'")
        return True

    def _handle_month(self, match: re.Match) -> bool:
        """Handle 'N月'.

        When no date component was given before and the month is earlier
        than the current one, the year is advanced by one.
        """
        month = parse_number(match.group(1))
        if not (1 <= month <= 12):
            logger.debug(f"❌ 无效的月份: {month}")
            return False

        year = self.t.year
        rolled = month < self.t.month and not self.is_date_specified
        if rolled:
            year += 1

        try:
            # The day is clamped so that e.g. Jan 31 + "2月" cannot raise; a
            # following day component overrides it anyway.
            self._replace_date(year, month)
        except ValueError:
            logger.debug(f"❌ 无效的月份: {month}")
            return False

        if rolled:
            logger.debug(f"💡 月份小于当前月份,年份自动进位 -> {self.t.year}")
        self.is_date_specified = True
        logger.debug(f"M| 月份更新 -> {self.t.month}, 消耗: '{self._consume_match(match)}'")
        return True

    def _handle_day(self, match: re.Match) -> bool:
        """Handle 'N日' / 'N号'.

        When only a day was given and it is earlier than today's day, the
        date rolls forward to the next month that contains that day.
        """
        day = parse_number(match.group(1))
        if not (1 <= day <= 31):
            logger.debug(f"❌ 无效的日期: {day}")
            return False

        year, month = self.t.year, self.t.month
        rolled = day < self.t.day and not self.is_date_specified
        # A rolled day is at most 30, so only February can lack it; one extra
        # month is always enough.
        attempts = 2 if rolled else 1
        for _ in range(attempts):
            if rolled:
                year, month = (year + 1, 1) if month == 12 else (year, month + 1)
            try:
                # Month and day are set together so today's day (e.g. the
                # 31st) is never combined with a shorter month.
                candidate = self.t.replace(year=year, month=month, day=day)
            except ValueError:
                continue
            self.t = candidate
            break
        else:
            logger.debug(f"❌ 日期 {day} 对于月份 {month} 无效 (例如2月30号)。")
            return False

        if rolled:
            logger.debug(f"💡 日期小于当前日期,月份自动进位 -> {self.t.year}-{self.t.month}")
        self.is_date_specified = True
        logger.debug(f"D| 日期更新 -> {self.t.day}, 消耗: '{self._consume_match(match)}'")
        return True

    def _handle_hms_colon(self, match: re.Match) -> bool:
        """Handle 'HH:MM' or 'HH:MM:SS' clock notation."""
        h = int(match.group(1))
        m = int(match.group(2))
        # group(3) includes the colon; group(4) is the bare seconds number.
        s_str = match.group(4)
        s = int(s_str) if s_str else 0
        if not (0 <= h <= 23 and 0 <= m <= 59 and 0 <= s <= 59):
            logger.debug(f"❌ 无效的时间格式: H={h}, M={m}, S={s}")
            return False
        self.t = self.t.replace(hour=h, minute=m, second=s)
        self.is_time_specified = True
        logger.debug(f"T| 时分秒(冒号格式)更新 -> {self.t.time()}, 消耗: '{self._consume_match(match)}'")
        return True

    def _handle_pm(self, match: re.Match) -> bool:
        """Record a PM marker; the hour is shifted in ``_finalize_datetime``."""
        self.is_pm_specified = True
        logger.debug(f"PM| 匹配到下午/晚上, 消耗: '{self._consume_match(match)}'")
        return True

    def _handle_hour(self, match: re.Match) -> bool:
        """Handle 'N点' / 'N时', optionally followed by '半' (half past)."""
        hour = parse_number(match.group(1))
        # "半" can only occur in the matched text through the optional
        # half-hour part, so this check does not depend on group numbering.
        has_half = "半" in match.group(0)
        if not (0 <= hour <= 23):
            logger.debug(f"❌ 无效的小时: {hour}")
            return False
        minute = 30 if has_half else self.t.minute
        self.t = self.t.replace(hour=hour, minute=minute)
        self.is_time_specified = True
        logger.debug(f"H| 小时更新 -> {self.t.hour}{':30' if has_half else ''}, 消耗: '{self._consume_match(match)}'")
        return True

    def _handle_minute(self, match: re.Match) -> bool:
        """Handle 'N分' / 'N分钟'."""
        minute = parse_number(match.group(1))
        if not (0 <= minute <= 59):
            logger.debug(f"❌ 无效的分钟: {minute}")
            return False
        self.t = self.t.replace(minute=minute)
        self.is_time_specified = True
        logger.debug(f"M| 分钟更新 -> {self.t.minute}, 消耗: '{self._consume_match(match)}'")
        return True

    def _handle_second(self, match: re.Match) -> bool:
        """Handle 'N秒' / 'N秒钟'."""
        second = parse_number(match.group(1))
        if not (0 <= second <= 59):
            logger.debug(f"❌ 无效的秒: {second}")
            return False
        self.t = self.t.replace(second=second)
        self.is_time_specified = True
        logger.debug(f"S| 秒更新 -> {self.t.second}, 消耗: '{self._consume_match(match)}'")
        return True

    def _finalize_datetime(self) -> Optional[datetime.datetime]:
        """Apply the PM shift and the roll-to-tomorrow rule, then return ``self.t``.

        Returns:
            The final datetime, or ``None`` when neither a date nor a time
            component was recognised (or the result cannot be represented).
        """
        # Afternoon/evening marker: shift hours 0-11 into 12-23.
        if self.is_pm_specified and self.t.hour < 12:
            self.t = self.t.replace(hour=self.t.hour + 12)
            logger.debug(f"💡 根据 PM 标识,小时调整为 -> {self.t.hour}")

        # Nothing recognised at all -> not a valid time description.
        if not self.is_date_specified and not self.is_time_specified:
            logger.debug("❌ 未能从输入中解析出任何有效的日期或时间部分。")
            return None

        # A clock time that has already passed is pushed one day forward
        # (it is 15:00 and the user says "14点" -> 14:00 the next day).
        # A date-only result in the past is deliberately left untouched.
        if self.t < self.now and self.is_time_specified:
            try:
                self.t += datetime.timedelta(days=1)
            except OverflowError:
                logger.debug("❌ 自动调整后的时间超出可表示范围。")
                return None
            logger.debug(f"🔁 目标时间已过,自动调整为明天 -> {self.t}")

        logger.debug(f"✅ 解析成功,最终时间: {self.t}")
        return self.t
|
||
|
||
# --- 公共接口 (Public Interface) ---
|
||
|
||
def get_target_time(content: str) -> Optional[datetime.datetime]:
    """Convert a natural-language time description into a datetime.

    Convenience wrapper that builds a ``TimeParser`` for ``content`` and
    runs it once.

    Args:
        content: Text containing the time description.

    Returns:
        The parsed datetime, or ``None`` if parsing fails.
    """
    return TimeParser(content).parse()