Files
konabot/konabot/common/ptimeparse/__init__.py
passthem e09de9eeb6
All checks were successful
continuous-integration/drone/push Build is passing
更改使用 uv 而非 poetry 管理 Docker 内部依赖
2025-11-10 04:41:05 +08:00

654 lines
22 KiB
Python

import re
import datetime
from typing import Tuple, Optional, Dict, Any
from .err import MultipleSpecificationException, TokenUnhandledException
class Parser:
    """Parse Chinese and ISO-style date/time expressions into a ``datetime``.

    The input is consumed left to right by a fixed list of token parsers
    (absolute date, relative date, relative time, day period, clock time).
    Every parser records what it recognised in a shared ``ctx`` dict and
    returns the text that is left; ``_apply_context`` then combines the
    collected pieces with the reference time ``self.now``.
    """

    # Chinese numeral tables used by ``digest_chinese_number``.
    # "两" is the colloquial form of 2 and is treated as an ordinary digit.
    _CN_DIGITS = {
        "零": 0, "一": 1, "二": 2, "三": 3, "四": 4,
        "五": 5, "六": 6, "七": 7, "八": 8, "九": 9, "两": 2,
    }
    _CN_UNITS = {"十": 10, "百": 100, "千": 1000}
    _CN_NUMERAL_CHARS = "零一二三四五六七八九两十百千万亿"
    # Direction words meaning "from now on"; anything else ("前") means "ago".
    _FORWARD = ("后", "以后", "之后")

    def __init__(self, now: Optional[datetime.datetime] = None):
        """Create a parser.

        Args:
            now: Reference time for relative expressions. Defaults to
                ``datetime.datetime.now()`` when omitted or falsy.
        """
        self.now = now or datetime.datetime.now()

    # ------------------------------------------------------------------
    # Numeral helpers
    # ------------------------------------------------------------------
    def digest_chinese_number(self, text: str) -> Tuple[str, int]:
        """Consume a leading Chinese numeral from ``text``.

        Args:
            text: Input that may start with a Chinese numeral.

        Returns:
            ``(rest, value)`` where ``rest`` is the text after the numeral.
            When ``text`` does not start with a numeral the result is
            ``(text, 0)``; callers that must tell "零" apart from "no number"
            should compare ``rest`` with ``text``.
        """
        if not text:
            return text, 0
        end = 0
        while end < len(text) and text[end] in self._CN_NUMERAL_CHARS:
            end += 1
        if end == 0:
            return text, 0
        return text[end:], self._chinese_to_int(text[:end])

    def _chinese_to_int(self, s: str) -> int:
        """Convert a string made only of Chinese numeral characters to an int."""
        if not s or s == "零":
            return 0
        # Split on the largest unit first so "一亿二千万" groups correctly.
        for big, factor in (("亿", 100000000), ("万", 10000)):
            if big in s:
                high, low = s.split(big, 1)
                return self._chinese_to_int(high) * factor + self._chinese_to_int(low)
        total = 0
        pending = 0
        for ch in s:
            if ch == "零":
                # Placeholder zero ("一百零五"): carries no value of its own.
                continue
            if ch in self._CN_DIGITS:
                pending = self._CN_DIGITS[ch]
            elif ch in self._CN_UNITS:
                # A bare unit such as "十" means one of that unit.
                total += (pending or 1) * self._CN_UNITS[ch]
                pending = 0
        return total + pending

    def _to_count(self, num_str: str) -> int:
        """Convert a captured count (ASCII digits or Chinese numeral) to an int.

        Returns 0 when the string is not entirely one kind of number, so
        callers can reject it with a single ``<= 0`` check.
        """
        if num_str.isdigit():
            return int(num_str)
        rest, value = self.digest_chinese_number(num_str)
        return 0 if rest else value

    @staticmethod
    def _make_date(year: int, month: int, day: int) -> Optional[datetime.date]:
        """Build a ``date`` or return ``None`` when the values are out of range."""
        try:
            return datetime.date(year, month, day)
        except ValueError:
            return None

    def _signed_delta(self, direction: str, **amount: float) -> datetime.timedelta:
        """Build a ``timedelta`` that is negated unless ``direction`` points forward."""
        delta = datetime.timedelta(**amount)
        return delta if direction in self._FORWARD else -delta

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------
    def parse(self, text: str) -> datetime.datetime:
        """Parse ``text`` into a ``datetime`` relative to ``self.now``.

        Raises:
            TokenUnhandledException: If the input is empty or part of it
                could not be consumed by any token parser.
            MultipleSpecificationException: If a day period or the minutes
                are specified more than once.
        """
        text = text.strip()
        if not text:
            raise TokenUnhandledException("Empty input")
        ctx = {
            "date": None,
            "time": None,
            "relative_delta": None,
            "am_pm": None,
            "period_word": None,
            "has_time": False,
            "has_date": False,
            "ambiguous_hour": False,
            "is_24hour": False,
            "has_relative_date": False,
        }
        rest = self._parse_all(text, ctx)
        if rest.strip():
            raise TokenUnhandledException(f"Unparsed tokens: {rest.strip()}")
        return self._apply_context(ctx)

    def _parse_all(self, text: str, ctx: Dict[str, Any]) -> str:
        """Run the token parsers repeatedly until none of them consumes text.

        After every successful parser the scan restarts from the first
        parser, so tokens may appear in any order. Returns the unconsumed
        remainder.
        """
        rest = text.lstrip()
        while True:
            for parser in [
                self._parse_absolute_date,
                self._parse_relative_date,
                self._parse_relative_time,
                self._parse_period,
                self._parse_time,
            ]:
                new_rest = parser(rest, ctx)
                if new_rest != rest:
                    rest = new_rest.lstrip()
                    break
            else:
                # No parser made progress in a full pass.
                break
        return rest

    def _add_delta(self, ctx, delta):
        """Accumulate ``delta`` into ``ctx["relative_delta"]``."""
        if ctx["relative_delta"] is None:
            ctx["relative_delta"] = delta
        else:
            ctx["relative_delta"] += delta

    # ------------------------------------------------------------------
    # Token parsers: each returns the remaining text, or ``text`` unchanged
    # ------------------------------------------------------------------
    def _parse_absolute_date(self, text: str, ctx: Dict[str, Any]) -> str:
        """Consume a leading absolute date.

        Recognised forms: ``YYYY-MM-DDTHH:MM``, ``YYYY-MM-DD``,
        ``YYYY/MM/DD``, ``YYYY年M月D日`` (or ``号``), ``M月D日`` and the same
        with Chinese numerals. Forms without a year use ``self.now.year``.
        A matching pattern with out-of-range values is left unconsumed.
        """
        text = text.lstrip()

        m = re.match(r"^(\d{4})-(\d{1,2})-(\d{1,2})T(\d{1,2}):(\d{2})", text)
        if m:
            y, mth, d, h, minute = map(int, m.groups())
            date = self._make_date(y, mth, d)
            if date is None or not (0 <= h <= 23 and 0 <= minute <= 59):
                return text
            ctx["date"] = date
            ctx["time"] = datetime.time(h, minute)
            ctx["has_date"] = True
            ctx["has_time"] = True
            ctx["is_24hour"] = True
            return text[m.end():]

        for pattern in (
            r"^(\d{4})-(\d{1,2})-(\d{1,2})",
            r"^(\d{4})/(\d{1,2})/(\d{1,2})",
            r"^(\d{4})年(\d{1,2})月(\d{1,2})[日号]",
        ):
            m = re.match(pattern, text)
            if m:
                y, mth, d = map(int, m.groups())
                date = self._make_date(y, mth, d)
                if date is None:
                    return text
                ctx["date"] = date
                ctx["has_date"] = True
                return text[m.end():]

        m = re.match(r"^(\d{1,2})月(\d{1,2})[日号]", text)
        if m:
            mth, d = map(int, m.groups())
            date = self._make_date(self.now.year, mth, d)
            if date is None:
                return text
            ctx["date"] = date
            ctx["has_date"] = True
            return text[m.end():]

        # Only Chinese numerals may appear here; a wildcard would turn
        # phrases such as "下个月三号" into January.
        m = re.match(
            r"^([零一二三四五六七八九十两]{1,3})月([零一二三四五六七八九十两]{1,3})[日号]",
            text,
        )
        if m:
            m_str, d_str = m.groups()
            _, mth = self.digest_chinese_number(m_str)
            _, d = self.digest_chinese_number(d_str)
            if mth == 0:
                mth = 1
            if d == 0:
                d = 1
            date = self._make_date(self.now.year, mth, d)
            if date is None:
                return text
            ctx["date"] = date
            ctx["has_date"] = True
            return text[m.end():]
        return text

    def _parse_relative_date(self, text: str, ctx: Dict[str, Any]) -> str:
        """Consume a leading day-relative expression.

        Handles "today" variants (which may also set the AM/PM period),
        fixed words such as "明天"/"前天", "N天后/前" and "本/上/下周X".
        """
        text = text.lstrip()
        # Longer variants come first so "今晚上" is not cut short as "今晚".
        today_variants = [
            ("今晚上", "PM"),
            ("今晚", "PM"),
            ("今早", "AM"),
            ("今天早上", "AM"),
            ("今天早晨", "AM"),
            ("今天上午", "AM"),
            ("今天下午", "PM"),
            ("今天晚上", "PM"),
            ("今天", None),
        ]
        for variant, period in today_variants:
            if text.startswith(variant):
                self._add_delta(ctx, datetime.timedelta(days=0))
                ctx["has_relative_date"] = True
                if period is not None and ctx["am_pm"] is None:
                    ctx["am_pm"] = period
                    ctx["period_word"] = variant
                return text[len(variant):]

        mapping = {
            "明天": 1,
            "后天": 2,
            "大后天": 3,
            "昨天": -1,
            "前天": -2,
            "大前天": -3,
        }
        for word, days in mapping.items():
            if text.startswith(word):
                self._add_delta(ctx, datetime.timedelta(days=days))
                ctx["has_relative_date"] = True
                return text[len(word):]

        m = re.match(r"^(\d+|[零一二三四五六七八九十两]+)天(后|前|以后|之后)", text)
        if m:
            num_str, direction = m.groups()
            if num_str.isdigit():
                n = int(num_str)
            else:
                _, n = self.digest_chinese_number(num_str)
            self._add_delta(ctx, self._signed_delta(direction, days=n))
            ctx["has_relative_date"] = True
            return text[m.end():]

        m = re.match(r"^(本|上|下)周([一二三四五六日])", text)
        if m:
            scope, day = m.groups()
            weekday_map = {"一": 0, "二": 1, "三": 2, "四": 3, "五": 4, "六": 5, "日": 6}
            offset = weekday_map[day] - self.now.weekday()
            if scope == "上":
                offset -= 7
            elif scope != "本":
                offset += 7
            self._add_delta(ctx, datetime.timedelta(days=offset))
            ctx["has_relative_date"] = True
            return text[m.end():]
        return text

    def _parse_period(self, text: str, ctx: Dict[str, Any]) -> str:
        """Consume a leading day-period word and record it as AM or PM.

        Raises:
            MultipleSpecificationException: If a period was already recorded.
        """
        text = text.lstrip()
        # Insertion order matters: two-character words precede their
        # one-character prefixes ("早上" before "早").
        period_mapping = {
            "上午": "AM",
            "早晨": "AM",
            "早上": "AM",
            "早": "AM",
            "中午": "PM",
            "下午": "PM",
            "晚上": "PM",
            "晚": "PM",
            "凌晨": "AM",
        }
        for word, tag in period_mapping.items():
            if text.startswith(word):
                if ctx["am_pm"] is not None:
                    raise MultipleSpecificationException("Multiple periods")
                ctx["am_pm"] = tag
                ctx["period_word"] = word
                return text[len(word):]
        return text

    def _parse_time(self, text: str, ctx: Dict[str, Any]) -> str:
        """Consume a leading clock time (hour plus optional minutes).

        Accepts ``H:MM``, "<number>点" / "<number>时" with ASCII or Chinese
        numerals, and a bare number when a day period is already known.
        Minutes may follow as "半", "一刻", "过一刻", "N分" or a bare number.
        ``ctx`` is only modified when a time was actually recognised.

        Raises:
            MultipleSpecificationException: If minutes are given twice.
        """
        if ctx["has_time"]:
            return text
        text = text.lstrip()

        # 1. H:MM pattern
        m = re.match(r"^(\d{1,2}):(\d{2})", text)
        if m:
            h, minute = int(m.group(1)), int(m.group(2))
            if 0 <= h <= 23 and 0 <= minute <= 59:
                ctx["time"] = datetime.time(h, minute)
                ctx["has_time"] = True
                ctx["ambiguous_hour"] = 1 <= h <= 12
                ctx["is_24hour"] = h > 12 or h == 0
                return text[m.end():]

        # 2. Hour part
        hour = None
        rest_after_hour = text
        is_24hour_format = False

        # Chinese numeral + 点/时. The length check makes sure a numeral was
        # really consumed; the returned value alone cannot tell "零" from
        # "no number".
        temp_rest, num = self.digest_chinese_number(text)
        if len(temp_rest) < len(text):
            after_number = temp_rest.lstrip()
            if after_number.startswith("点"):
                hour = num
                rest_after_hour = after_number[1:]
            elif after_number.startswith("时"):
                hour = num
                is_24hour_format = True
                rest_after_hour = after_number[1:]

        if hour is None:
            m = re.match(r"^(\d{1,2})\s*([点时])", text)
            if m:
                hour = int(m.group(1))
                is_24hour_format = m.group(2) == "时"
                rest_after_hour = text[m.end():]

        # A bare number counts as an hour only after a period word ("下午3").
        if hour is None and ctx.get("am_pm") is not None:
            temp_rest, num = self.digest_chinese_number(text)
            if len(temp_rest) < len(text) and 0 <= num <= 23:
                hour = num
                rest_after_hour = temp_rest.lstrip()
            else:
                m = re.match(r"^(\d{1,2})", text)
                if m and 0 <= int(m.group(1)) <= 23:
                    hour = int(m.group(1))
                    rest_after_hour = text[m.end():].lstrip()

        if hour is None or not (0 <= hour <= 23):
            return text

        # 3. Minutes
        rest = rest_after_hour.lstrip()
        minute = 0
        minute_spec_count = 0
        if rest.startswith("钟"):
            rest = rest[1:].lstrip()
        has_zheng = False
        if rest.startswith("整"):
            has_zheng = True
            rest = rest[1:].lstrip()
        if rest.startswith("半"):
            minute = 30
            minute_spec_count += 1
            rest = rest[1:].lstrip()
            if rest.startswith("钟"):
                rest = rest[1:].lstrip()
            if rest.startswith("整"):
                rest = rest[1:].lstrip()
        if rest.startswith("一刻"):
            minute = 15
            minute_spec_count += 1
            rest = rest[2:].lstrip()
            if rest.startswith("钟"):
                rest = rest[1:].lstrip()
        if rest.startswith("过一刻"):
            minute = 15
            minute_spec_count += 1
            rest = rest[3:].lstrip()
            if rest.startswith("钟"):
                rest = rest[1:].lstrip()
        m = re.match(r"^(\d+|[零一二三四五六七八九十]+)分", rest)
        if m:
            minute_spec_count += 1
            m_str = m.group(1)
            if m_str.isdigit():
                minute = int(m_str)
            else:
                _, minute = self.digest_chinese_number(m_str)
            rest = rest[m.end():].lstrip()
        if minute_spec_count == 0:
            # Bare minutes without "分", e.g. "三点十五" or "3点15".
            temp_rest, num = self.digest_chinese_number(rest)
            if 0 < num <= 59:
                minute = num
                minute_spec_count += 1
                rest = temp_rest.lstrip()
            else:
                m = re.match(r"^(\d{1,2})", rest)
                if m:
                    m_val = int(m.group(1))
                    if 0 <= m_val <= 59:
                        minute = m_val
                        minute_spec_count += 1
                        rest = rest[m.end():].lstrip()
        if has_zheng and minute_spec_count == 0:
            minute_spec_count = 1
        if minute_spec_count > 1:
            raise MultipleSpecificationException("Multiple minute specifications")
        if not (0 <= minute <= 59):
            return text

        # Hours 13-23 are always 24-hour, even with "点"
        if hour >= 13:
            is_24hour_format = True
        ctx["time"] = datetime.time(hour, minute)
        ctx["has_time"] = True
        ctx["ambiguous_hour"] = 1 <= hour <= 12 and not is_24hour_format
        ctx["is_24hour"] = is_24hour_format
        return rest

    def _parse_relative_time(self, text: str, ctx: Dict[str, Any]) -> str:
        """Consume a leading "N hours/minutes/seconds ago/later" expression.

        Supports "半小时", "N个半(小时)", "N(个)小时", "N分(钟)" and "N秒",
        each followed by 后/前/以后/之后. Counts that are zero or not a valid
        number leave the text unconsumed.
        """
        text = text.lstrip()

        # Half an hour
        m = re.match(r"^(半)(?:个)?小时?(后|前|以后|之后)", text)
        if m:
            self._add_delta(ctx, self._signed_delta(m.group(2), hours=0.5))
            return text[m.end():]

        # N and a half hours (this also covers "一个半小时")
        m = re.match(r"^([0-9零一二三四五六七八九十两]+)个半(?:小时?)?(后|前|以后|之后)", text)
        if m:
            num_str, direction = m.groups()
            base_hours = self._to_count(num_str)
            if base_hours <= 0:
                return text
            self._add_delta(ctx, self._signed_delta(direction, hours=base_hours + 0.5))
            return text[m.end():]

        # Whole hours / minutes / seconds share one shape; only the unit differs.
        unit_patterns = (
            (r"^([0-9零一二三四五六七八九十两]+)(?:个)?小时?(后|前|以后|之后)", "hours"),
            (r"^([0-9零一二三四五六七八九十两]+)分钟?(后|前|以后|之后)", "minutes"),
            (r"^([0-9零一二三四五六七八九十两]+)秒(后|前|以后|之后)", "seconds"),
        )
        for pattern, unit in unit_patterns:
            m = re.match(pattern, text)
            if not m:
                continue
            num_str, direction = m.groups()
            amount = self._to_count(num_str)
            if amount <= 0:
                return text
            self._add_delta(ctx, self._signed_delta(direction, **{unit: amount}))
            return text[m.end():]
        return text

    # ------------------------------------------------------------------
    # Combining the collected pieces
    # ------------------------------------------------------------------
    def _apply_context(self, ctx: Dict[str, Any]) -> datetime.datetime:
        """Combine everything recorded in ``ctx`` with ``self.now``.

        The relative delta is applied first, then an absolute date replaces
        year/month/day, then the clock time is applied after resolving the
        12-hour ambiguity. Without a clock time, a date-only input is set to
        midnight.
        """
        result = self.now
        has_date = ctx["has_date"]
        has_time = ctx["has_time"]
        has_relative_date = ctx["has_relative_date"]

        if ctx["relative_delta"] is not None:
            result = result + ctx["relative_delta"]
        if has_date:
            result = result.replace(
                year=ctx["date"].year,
                month=ctx["date"].month,
                day=ctx["date"].day,
            )

        if not has_time:
            if has_date or has_relative_date:
                result = result.replace(hour=0, minute=0, second=0, microsecond=0)
            return result

        h = ctx["time"].hour
        m = ctx["time"].minute
        if ctx["is_24hour"]:
            # "10 时" -> 10:00, no conversion
            pass
        elif ctx["am_pm"] == "AM":
            if h == 12:
                h = 0
        elif ctx["am_pm"] == "PM":
            if h == 12:
                # "晚上12点" means midnight at the end of that day;
                # other PM words ("中午12点") keep 12 as noon.
                if ctx.get("period_word") in ("晚上", "晚"):
                    h = 0
                    result += datetime.timedelta(days=1)
            elif 1 <= h <= 11:
                h += 12
        elif has_relative_date:
            # No period but an explicit day: "明天五点" -> 05:00
            if h == 12:
                h = 0
        else:
            # No period and no day: take the AM reading unless it is
            # already in the past, in which case use the PM reading.
            am_hour = 0 if h == 12 else h
            candidate_am = result.replace(hour=am_hour, minute=m, second=0, microsecond=0)
            if candidate_am < self.now and h != 12:
                h += 12
        if h > 23:
            h = h % 24
        return result.replace(hour=h, minute=m, second=0, microsecond=0)
def parse(text: str) -> datetime.datetime:
    """Parse ``text`` with a fresh ``Parser`` that uses its default reference time.

    Convenience wrapper around ``Parser().parse``; any exception raised by
    that method propagates unchanged.
    """
    default_parser = Parser()
    return default_parser.parse(text)