Files
ptimeparse/ptimeparse/__init__.py
2025-10-09 18:59:33 +08:00

636 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import re
from dataclasses import dataclass
from typing import Literal
from ptimeparse.err import (MultipleSpecificationException,
TokenUnhandledException)
@dataclass
class Parser:
now: datetime.datetime
timedelta: datetime.timedelta
hour_delta_triggered: bool = False
minute_delta_triggered: bool = False
second_delta_triggered: bool = False
ampm_specification: Literal["AM", "PM", None, "ABSOLUTE"] = None
ampm_ismid: bool = False
hour_specification: int | None = None
minute_specification: int | None = None
time_spec_day_delta: int = 0
@property
def time_delta_triggered(self):
return self.hour_delta_triggered or self.minute_delta_triggered or self.second_delta_triggered
def __init__(self, now: datetime.datetime | None = None):
self.now = datetime.datetime.now() if now is None else now
self.CN_NUM = {
"": 0,
"": 1,
"": 2,
"": 2,
"": 3,
"": 4,
"": 5,
"": 6,
"": 7,
"": 8,
"": 9,
"": 10,
"": 100,
"": 1000,
}
self.CN_UNIT = {"": 1_0000, "亿": 1_0000_0000, "": 1_0000_0000_0000}
def clear_state(self):
self.timedelta = datetime.timedelta()
self.hour_delta_triggered = False
self.minute_delta_triggered = False
self.second_delta_triggered = False
self.ampm_specification = None
self.ampm_ismid = False
self.hour_specification = None
self.minute_specification = None
self.time_spec_day_delta = 0
def clean(self, content: str) -> str:
return re.sub(r"[ \t的]", "", content)
def parse(self, content: str) -> datetime.datetime:
self.clear_state()
content = self.clean(content)
content = self.digest_relative_date(content)
content = self.digest_weekday_relative(content)
content = self.digest_delta(content)
content = self.digest_date(content)
content = self.digest_early_late_hour(content)
content = self.digest_ampm_specific(content)
content = self.digest_time(content)
content = self.digest_ke(content)
if len(content) != 0:
raise TokenUnhandledException(content)
return self.build()
def digest_relative_date(self, content: str) -> str:
"""
处理明天、昨天、今天、后天、大后天、前天、大前天 等相对日期。
返回剩余未处理字符串。
"""
# 注意:这些词必须完整匹配开头,避免误匹配(如“明天”不能匹配“明天早上”中的“明”)
relative_days = {
"今天": 0,
"明天": 1,
"后天": 2,
"大后天": 3,
"昨日": -1,
"昨天": -1,
"前天": -2,
"大前天": -3,
}
for word, delta_days in relative_days.items():
if content.startswith(word):
# 如果已经设置了时间偏移(如 3 小时后),则冲突
if self.time_delta_triggered:
raise MultipleSpecificationException()
# 如果已经通过其他方式设置了 day delta如 digest_date 中),这里也应检查
# 为简化,我们直接设置
self.time_spec_day_delta = delta_days
return content[len(word):]
return content
def digest_timedelta(self, content: str) -> str:
"""
解析形如 "3天", "2小时", "1星期", "5个月" 等时间增量。
支持中文数字和阿拉伯数字。
返回未处理的剩余字符串。
"""
if content.startswith(""):
# "半"通常指"半小时"
remaining = content[1:]
# 检查是否有"小时"、"时"等
if remaining.startswith(("小时", "")):
if self.hour_delta_triggered:
raise MultipleSpecificationException()
self.hour_delta_triggered = True
self.timedelta = datetime.timedelta(minutes=30)
return remaining[len("小时") if remaining.startswith("小时") else len(""):]
elif remaining.startswith(("分钟", "")):
if self.minute_delta_triggered:
raise MultipleSpecificationException()
self.minute_delta_triggered = True
self.timedelta = datetime.timedelta(minutes=30)
return remaining[len("分钟") if remaining.startswith("分钟") else len(""):]
else:
# 默认为半小时
if self.hour_delta_triggered:
raise MultipleSpecificationException()
self.hour_delta_triggered = True
self.timedelta = datetime.timedelta(minutes=30)
return remaining
# 定义时间单位映射(注意:月需特殊处理)
unit_patterns = [
(r"(秒钟|秒)", "second"),
(r"(分钟|分)", "minute"),
(r"(时|小时|点)", "hour"),
(r"半(时|小时|点)", "hour+30"),
(r"(天|日)", "day"),
(r"(星期|周)", "week"),
(r"(月)", "month"), # 特殊按30天处理
]
remaining = content
delta_kwargs = {
"days": 0,
"seconds": 0,
"minutes": 0,
"hours": 0,
"weeks": 0,
}
month_count = 0 # 单独记录月,最后转为天
while True:
matched = False
for pattern, unit_type in unit_patterns:
m = re.match(rf"^([零一二两三四五六七八九十百千万亿兆]*|\d+)?个?({pattern})", remaining)
if m:
num_str = m.group(1)
if num_str is None or num_str == "":
num = 1 # 默认为1如“明天”实际是“1天后”
else:
# 尝试解析数字(中文或阿拉伯)
_, num = self.digest_chinese_number(num_str)
if num is None:
try:
num = int(num_str)
except ValueError:
continue # 无效数字,跳过
# 设置标志位,防止后续时间规格冲突
if unit_type == "hour":
if self.hour_delta_triggered:
raise MultipleSpecificationException()
self.hour_delta_triggered = True
elif unit_type == "hour+30":
if self.hour_delta_triggered:
raise MultipleSpecificationException()
self.hour_delta_triggered = True
if self.minute_delta_triggered:
raise MultipleSpecificationException()
self.minute_delta_triggered = True
elif unit_type == "minute":
if self.minute_delta_triggered:
raise MultipleSpecificationException()
self.minute_delta_triggered = True
elif unit_type == "second":
if self.second_delta_triggered:
raise MultipleSpecificationException()
self.second_delta_triggered = True
# 累加到对应单位
if unit_type == "second":
delta_kwargs["seconds"] += num
elif unit_type == "minute":
delta_kwargs["minutes"] += num
elif unit_type == "hour":
delta_kwargs["hours"] += num
elif unit_type == "day":
delta_kwargs["days"] += num
elif unit_type == "week":
delta_kwargs["weeks"] += num
elif unit_type == "month":
month_count += num
elif unit_type == "hour+30":
delta_kwargs["hours"] += num
delta_kwargs["minutes"] += 30
# 更新剩余字符串
remaining = remaining[len(m.group(0)):]
matched = True
break
if not matched:
break
# 处理“月” → 按30天/月估算(简单处理)
if month_count > 0:
delta_kwargs["days"] += month_count * 30
# 构建 timedelta
self.timedelta = datetime.timedelta(
days=delta_kwargs["days"],
seconds=delta_kwargs["seconds"],
minutes=delta_kwargs["minutes"],
hours=delta_kwargs["hours"],
weeks=delta_kwargs["weeks"]
)
return remaining
def digest_delta(self, content: str) -> str:
if "" in content:
c1, _ = content.split("", 1)
c1 = self.digest_timedelta(c1)
if c1 != "":
raise TokenUnhandledException(c1)
return c1
if "" in content:
c1, _ = content.split("", 1)
c1 = self.digest_timedelta(c1)
self.timedelta = -self.timedelta
if c1 != "":
raise TokenUnhandledException(c1)
return c1
return content
def digest_date(self, content: str) -> str:
# 1. 尝试 ISO 格式: 2025-10-09T15:30 或 2025-10-09
iso_match = re.match(r"^(\d{4})-(\d{1,2})-(\d{1,2})(?:T(\d{1,2}):(\d{1,2}))?", content)
if iso_match:
year, month, day = int(iso_match.group(1)), int(iso_match.group(2)), int(iso_match.group(3))
try:
target_date = datetime.date(year, month, day)
self.time_spec_day_delta = (target_date - self.now.date()).days
remaining = content[len(iso_match.group(0)):]
if iso_match.group(4): # 有时间
hour = int(iso_match.group(4))
minute = int(iso_match.group(5)) if iso_match.group(5) else 0
self.hour_specification = hour
self.minute_specification = minute
return remaining
except ValueError:
pass
# 2. 尝试 YYYY年MM月DD日
full_date_match = re.match(r"^(\d{4})年(\d{1,2})月(\d{1,2})日", content)
if full_date_match:
year, month, day = map(int, full_date_match.groups())
try:
target_date = datetime.date(year, month, day)
self.time_spec_day_delta = (target_date - self.now.date()).days
return content[len(full_date_match.group(0)):]
except ValueError:
pass
# 3. 尝试 MM月DD日默认今年
md_match = re.match(r"^(\d{1,2})月(\d{1,2})日", content)
if md_match:
month, day = map(int, md_match.groups())
year = self.now.year
try:
target_date = datetime.date(year, month, day)
self.time_spec_day_delta = (target_date - self.now.date()).days
return content[len(md_match.group(0)):]
except ValueError:
pass
# 4. 尝试 YYYY/MM/DD
slash_full = re.match(r"^(\d{4})/(\d{1,2})/(\d{1,2})", content)
if slash_full:
year, month, day = map(int, slash_full.groups())
try:
target_date = datetime.date(year, month, day)
self.time_spec_day_delta = (target_date - self.now.date()).days
return content[len(slash_full.group(0)):]
except ValueError:
pass
# 5. 尝试 MM/DD
slash_md = re.match(r"^(\d{1,2})/(\d{1,2})", content)
if slash_md:
month, day = map(int, slash_md.groups())
year = self.now.year
try:
target_date = datetime.date(year, month, day)
self.time_spec_day_delta = (target_date - self.now.date()).days
return content[len(slash_md.group(0)):]
except ValueError:
pass
# 6. 中文月日:十月九日
cn_md_match = re.match(r"^([一二三四五六七八九十]+)月([一二三四五六七八九十]+)日", content)
if cn_md_match:
month_str, day_str = cn_md_match.groups()
_, month_num = self.digest_chinese_number(month_str + "")
_, day_num = self.digest_chinese_number(day_str + "")
if month_num is not None and day_num is not None:
year = self.now.year
try:
target_date = datetime.date(year, month_num, day_num)
self.time_spec_day_delta = (target_date - self.now.date()).days
return content[len(cn_md_match.group(0)):]
except ValueError:
pass
return content
def digest_time(self, content: str) -> str:
content = self.digest_single_hour(content)
return content
def _chinese_to_int_final(self, cn_str: str) -> int:
result = 0
num_section = 0
for char in cn_str:
if char in self.CN_NUM:
val = self.CN_NUM[char]
if val <= 9:
num_section = val
elif val >= 10:
if num_section == 0 and val == 10:
num_section = 1
result += num_section * val
num_section = 0
result += num_section
return result
def digest_chinese_number(self, content: str) -> tuple[str, int | None]:
"""
识别字符串开头的中文数字并将其转换为整数。
处理范围零到九千九百九十九万九千九百九十九亿九千九百九十九万九千九百九十九约10^16
"""
CN_CHARS = "".join(self.CN_NUM.keys()) + "".join(self.CN_UNIT.keys())
m = re.match(f"^([{CN_CHARS}]+)", content)
if m is None:
return content, None
cn_num_str = m.group(1)
if not cn_num_str:
return content, None
remaining_content = content[len(cn_num_str) :]
if cn_num_str == "":
return remaining_content, 0
if cn_num_str == "":
return remaining_content, 1
if cn_num_str in self.CN_NUM and self.CN_NUM[cn_num_str] <= 9:
return remaining_content, self.CN_NUM[cn_num_str]
pattern = re.compile(r"([^万亿兆]*)([万亿兆]?)")
parts = pattern.findall(cn_num_str)
parts.reverse()
current_unit = 1
total_num = 0
for num_str, unit_char in parts:
if not num_str and not unit_char:
continue
if unit_char in self.CN_UNIT:
current_unit = self.CN_UNIT[unit_char]
if num_str:
section_num = self._chinese_to_int_final(num_str)
total_num += section_num * current_unit
if unit_char in self.CN_UNIT:
pass
elif not unit_char:
current_unit = 1
return remaining_content, total_num
def digest_number(self, content: str) -> tuple[str, int | None]:
c1, num = self.digest_chinese_number(content)
if num is not None:
return c1, num
m = re.match(r"^(\d+)(.+)$", content)
if m is not None:
return m.group(2), int(m.group(1))
return content, None
def digest_ampm_specific(self, content: str) -> str:
am_patterns = ["凌晨", "早上", "上午", "早晨", ""]
pm_patterns = ["中午", "下午", "晚上", "傍晚", ""]
for pat in am_patterns:
if content.startswith(pat):
self.ampm_specification = "AM"
return content[len(pat):]
for pat in pm_patterns:
if content.startswith(pat):
self.ampm_specification = "PM"
if pat == '中午':
self.ampm_ismid = True
return content[len(pat):]
return content
def digest_single_hour(self, content: str) -> str:
c1, num = self.digest_number(content)
if num is None:
return content
if self.time_delta_triggered:
raise MultipleSpecificationException()
self.hour_specification = num
if c1.startswith(""):
c1 = c1[1:]
elif c1.startswith(""):
c1 = c1[1:]
if self.ampm_specification is None:
self.ampm_specification = "ABSOLUTE"
else:
return content
if c1.startswith(''):
c1 = c1[1:]
if c1.startswith(''):
c1 = c1[1:]
self.minute_specification = 0
elif c1.startswith(''):
c1 = c1[1:]
self.minute_specification = 30
if c1.startswith(''):
c1 = c1[1:]
return c1
def digest_ke(self, content: str) -> str:
for pat in ("一刻", "过一刻"):
if content.startswith(pat):
if self.minute_specification is not None:
raise MultipleSpecificationException()
self.minute_specification = 15
return content[len(pat):]
for pat in ("两刻", "过两刻"):
if content.startswith(pat):
if self.minute_specification is not None:
raise MultipleSpecificationException()
self.minute_specification = 30
return content[len(pat):]
for pat in ("三刻", "过三刻"):
if content.startswith(pat):
if self.minute_specification is not None:
raise MultipleSpecificationException()
self.minute_specification = 45
return content[len(pat):]
return content
def digest_early_late_hour(self, content: str) -> str:
if not (content.startswith("") or content.startswith("")):
return content
if self.time_delta_triggered:
raise MultipleSpecificationException()
if self.hour_specification is not None:
raise MultipleSpecificationException()
if self.ampm_specification not in (None, "ABSOLUTE"):
raise MultipleSpecificationException()
prefix = "" if content.startswith("") else ""
rest = content[1:]
remaining, num = self.digest_number(rest)
if num is None:
return content
if not (0 <= num <= 12):
return content
if prefix == "":
self.ampm_specification = "AM"
hour = num
if hour == 12:
hour = 0
else:
self.ampm_specification = "PM"
if num == 12:
hour = 0
else:
hour = num
self.hour_specification = hour
self.minute_specification = 0
return remaining
def _find_weekday(self, week_offset: int, target_weekday: int) -> datetime.datetime:
"""
计算相对周的目标星期几。
:param week_offset: 0=本周, 1=下周, -1=上周
:param target_weekday: Monday=0, Sunday=6 (与 datetime.weekday() 一致)
:return: 对应的 datetime时间部分设为 00:00:00
"""
# 本周一的日期(假设周一为每周开始)
today = self.now.date()
days_since_monday = today.weekday() # Monday is 0
this_monday = today - datetime.timedelta(days=days_since_monday)
# 目标周一
target_monday = this_monday + datetime.timedelta(weeks=week_offset)
# 目标星期几
target_date = target_monday + datetime.timedelta(days=target_weekday)
# 返回 datetime时间归零与“明天”行为一致
return datetime.datetime.combine(target_date, datetime.time.min)
def digest_weekday_relative(self, content: str) -> str:
"""
支持:本周五、下周三、上周一、这周五、下周一 等
返回剩余字符串。
"""
# 星期映射支持星期一、周1、周五 等)
weekday_map = {
"": 0, "1": 0,
"": 1, "2": 1,
"": 2, "3": 2,
"": 3, "4": 3,
"": 4, "5": 4,
"": 5, "6": 5,
"": 6, "": 6, "7": 6,
}
# 周偏移映射
week_offset_map = {
"本周": 0,
"这周": 0,
"下周": 1,
"下下周": 2, # 可选扩展
"上周": -1,
"上上周": -2, # 可选扩展
}
# 尝试匹配 [周标识][星期限定]
for week_key, week_offset in week_offset_map.items():
if content.startswith(week_key):
rest = content[len(week_key):]
if rest.startswith("星期"):
rest = rest[2:]
elif rest.startswith(""):
rest = rest[1:]
if rest and (c := rest[0]) in weekday_map:
target_wd = weekday_map[c]
rest = rest[1:]
else:
continue
if self.time_delta_triggered or self.time_spec_day_delta != 0:
raise MultipleSpecificationException()
target_dt = self._find_weekday(week_offset, target_wd)
# 设置 day delta 相对于 now 的 00:00
base_date = self.now.replace(hour=0, minute=0, second=0, microsecond=0).date()
delta_days = (target_dt.date() - base_date).days
self.time_spec_day_delta = delta_days
return rest
return content
def build(self) -> datetime.datetime:
t = self.now
if not self.time_delta_triggered:
t = t.replace(hour=0, minute=0, second=0, microsecond=0)
if self.hour_specification is not None:
hour = self.hour_specification
if self.ampm_specification == "AM":
if hour == 12:
hour = 0
elif self.ampm_specification == "PM":
if hour != 12:
hour += 12
elif self.ampm_ismid:
hour = 12
else:
hour = 0
self.time_spec_day_delta += 1
elif self.ampm_specification is None:
if hour < self.now.hour and hour < 13:
hour += 12
t = t.replace(hour=hour)
if self.minute_specification is not None:
t = t.replace(minute=self.minute_specification)
t += datetime.timedelta(days=self.time_spec_day_delta)
t += self.timedelta
return t
def parse(content: str) -> datetime.datetime:
    """Parse *content* with a fresh ``Parser`` anchored at the current time.

    Module-level shortcut for ``Parser().parse(content)``; any exception
    raised by ``Parser.parse`` propagates unchanged.
    """
    parser = Parser()
    return parser.parse(content)