From 3e5c1941c8842c3326c7f90c18cbb4cf16cada4c Mon Sep 17 00:00:00 2001 From: passthem Date: Fri, 21 Nov 2025 06:03:28 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=20ptimeparse=20=E6=A8=A1?= =?UTF-8?q?=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- QWEN.md | 187 +++++ konabot/common/ptimeparse/__init__.py | 707 ++-------------- konabot/common/ptimeparse/chinese_number.py | 133 +++ konabot/common/ptimeparse/expression.py | 63 ++ konabot/common/ptimeparse/lexer.py | 225 ++++++ konabot/common/ptimeparse/parser.py | 846 ++++++++++++++++++++ konabot/common/ptimeparse/ptime_ast.py | 72 ++ konabot/common/ptimeparse/ptime_token.py | 95 +++ konabot/common/ptimeparse/semantic.py | 369 +++++++++ konabot/plugins/simple_notify/__init__.py | 5 +- scripts/watch_filter.py | 1 + 11 files changed, 2058 insertions(+), 645 deletions(-) create mode 100644 QWEN.md create mode 100644 konabot/common/ptimeparse/chinese_number.py create mode 100644 konabot/common/ptimeparse/expression.py create mode 100644 konabot/common/ptimeparse/lexer.py create mode 100644 konabot/common/ptimeparse/parser.py create mode 100644 konabot/common/ptimeparse/ptime_ast.py create mode 100644 konabot/common/ptimeparse/ptime_token.py create mode 100644 konabot/common/ptimeparse/semantic.py diff --git a/QWEN.md b/QWEN.md new file mode 100644 index 0000000..5def88b --- /dev/null +++ b/QWEN.md @@ -0,0 +1,187 @@ +# Konabot Project Context + +## Project Overview + +Konabot is a multi-platform chatbot built using the NoneBot2 framework, primarily used within MTTU (likely an organization or community). The bot supports multiple adapters including Discord, QQ (via Onebot), Minecraft, and Console interfaces. 
+ +### Key Features +- Multi-platform support (Discord, QQ, Minecraft, Console) +- Rich plugin ecosystem with over 20 built-in plugins +- Asynchronous database system with connection pooling (SQLite-based) +- Advanced image processing capabilities +- Integration with external services like Bilibili analysis +- Support for Large Language Models (LLM) +- Web rendering capabilities for advanced image generation + +## Technology Stack + +- **Framework**: NoneBot2 +- **Language**: Python 3.12+ +- **Dependency Management**: Poetry +- **Database**: SQLite with aiosqlite for async operations +- **Build System**: Just (task runner) +- **Containerization**: Docker +- **CI/CD**: Drone CI +- **Testing**: Pytest + +## Project Structure + +``` +konabot/ +├── bot.py # Main entry point +├── pyproject.toml # Project dependencies and metadata +├── justfile # Task definitions +├── Dockerfile # Container build definition +├── .drone.yml # CI/CD pipeline configuration +├── konabot/ # Main source code +│ ├── common/ # Shared utilities and modules +│ │ ├── database/ # Async database manager with connection pooling +│ │ ├── llm/ # Large Language Model integration +│ │ ├── web_render/ # Web-based image rendering +│ │ └── ... # Other utilities +│ ├── plugins/ # Plugin modules (core functionality) +│ │ ├── air_conditioner/ +│ │ ├── bilibili_fetch/ +│ │ ├── gen_qrcode/ +│ │ ├── hanzi/ +│ │ ├── idiomgame/ +│ │ ├── image_process/ +│ │ ├── roll_dice/ +│ │ ├── weather/ +│ │ └── ... (20+ plugins) +│ └── test/ +├── tests/ # Test suite +├── scripts/ # Utility scripts +├── docs/ # Documentation +├── assets/ # Static assets +└── data/ # Runtime data storage +``` + +## Development Environment Setup + +### Prerequisites +- Python 3.12+ +- Git +- Poetry (installed via pipx) + +### Installation Steps +1. Clone the repository: + ```bash + git clone https://gitea.service.jazzwhom.top/Passthem/konabot.git + cd konabot + ``` +2. Install dependencies: + ```bash + poetry install + ``` +3. 
Configure environment: + - Copy `.env.example` to `.env` + - Modify settings as needed for your platform adapters + +### Platform Adapters Configuration +- **Discord**: Set `ENABLE_DISCORD=true` and configure bot token +- **QQ (Onebot)**: Set `ENABLE_QQ=true` and configure connection +- **Console**: Enabled by default, disable with `ENABLE_CONSOLE=false` +- **Minecraft**: Set `ENABLE_MINECRAFT=true` + +## Building and Running + +### Development +- Auto-reload development mode: + ```bash + poetry run just watch + ``` +- Manual start: + ```bash + poetry run python bot.py + ``` + +### Production +- Docker container build and run: + ```bash + docker build -t konabot . + docker run konabot + ``` + +## Testing + +Run the test suite with: +```bash +poetry run pytest +``` + +Tests are located in the `tests/` directory and focus primarily on core functionality like the database manager. + +## Database System + +The project implements a custom asynchronous database manager (`konabot/common/database/__init__.py`) with these features: +- Connection pooling for performance +- Parameterized queries for security +- SQL file execution support +- Support for both string and Path objects for file paths +- Automatic resource management + +Example usage: +```python +from konabot.common.database import DatabaseManager + +db = DatabaseManager() +results = await db.query("SELECT * FROM users WHERE age > ?", (18,)) +await db.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("John", "john@example.com")) +``` + +## Plugin Architecture + +Plugins are organized in `konabot/plugins/` and follow the NoneBot2 plugin structure. 
Each plugin typically consists of: +- `__init__.py`: Main plugin logic using Alconna command parser +- Supporting modules for specific functionality + +Popular plugins include: +- `roll_dice`: Dice rolling with image generation +- `weather`: Weather radar image fetching +- `bilibili_fetch`: Bilibili video analysis +- `image_process`: Image manipulation tools +- `markdown`: Markdown rendering + +## CI/CD Pipeline + +Drone CI is configured with two pipelines: +1. **Nightly builds**: Triggered on pushes to master branch +2. **Release builds**: Triggered on git tags + +Both pipelines: +- Build Docker images +- Test plugin loading +- Verify Playwright functionality +- Send notifications via ntfy + +## Development Conventions + +- Use Poetry for dependency management +- Follow NoneBot2 plugin development patterns +- Write async code for database operations +- Use Alconna for command parsing +- Organize SQL queries in separate files when complex +- Write tests for core functionality +- Document features in the `docs/` directory + +## Common Development Tasks + +1. **Add a new plugin**: + - Create a new directory in `konabot/plugins/` + - Implement functionality in `__init__.py` + - Use Alconna for command definition + +2. **Database operations**: + - Use the `DatabaseManager` class + - Always parameterize queries + - Store complex SQL in separate `.sql` files + +3. **Image processing**: + - Leverage existing utilities in `image_process` plugin + - Use Pillow and Skia-Python for advanced graphics + +4. 
**Testing**: + - Add tests to the `tests/` directory + - Use pytest with async support + - Mock external services when needed \ No newline at end of file diff --git a/konabot/common/ptimeparse/__init__.py b/konabot/common/ptimeparse/__init__.py index c7aee27..0bb1cac 100644 --- a/konabot/common/ptimeparse/__init__.py +++ b/konabot/common/ptimeparse/__init__.py @@ -1,653 +1,74 @@ -import re -import datetime -from typing import Tuple, Optional, Dict, Any +""" +Professional time parsing module for Chinese and English time expressions. -from .err import MultipleSpecificationException, TokenUnhandledException +This module provides a robust parser for natural language time expressions, +supporting both Chinese and English formats with proper whitespace handling. +""" + +import datetime +from typing import Optional + +from .expression import TimeExpression +from .err import TokenUnhandledException, MultipleSpecificationException + + +def parse(text: str, now: Optional[datetime.datetime] = None) -> datetime.datetime: + """ + Parse a time expression and return a datetime object. + + Args: + text: The time expression to parse + now: The reference time (defaults to current time) + + Returns: + A datetime object representing the parsed time + + Raises: + TokenUnhandledException: If the input cannot be parsed + """ + return TimeExpression.parse(text, now) class Parser: + """ + Parser for time expressions with backward compatibility. 
+ + Maintains the original interface: + >>> parser = Parser() + >>> result = parser.parse("10分钟后") + """ + def __init__(self, now: Optional[datetime.datetime] = None): self.now = now or datetime.datetime.now() - - def digest_chinese_number(self, text: str) -> Tuple[str, int]: - if not text: - return text, 0 - # Handle "两" at start - if text.startswith("两"): - next_char = text[1] if len(text) > 1 else '' - if not next_char or next_char in "十百千万亿": - return text[1:], 2 - s = "零一二三四五六七八九" - digits = {c: i for i, c in enumerate(s)} - i = 0 - while i < len(text) and text[i] in s + "十百千万亿": - i += 1 - if i == 0: - return text, 0 - num_str = text[:i] - rest = text[i:] - - def parse(s): - if not s: - return 0 - if s == "零": - return 0 - if "亿" in s: - a, b = s.split("亿", 1) - return parse(a) * 100000000 + parse(b) - if "万" in s: - a, b = s.split("万", 1) - return parse(a) * 10000 + parse(b) - n = 0 - t = 0 - for c in s: - if c == "零": - continue - if c in digits: - t = digits[c] - elif c == "十": - if t == 0: - t = 1 - n += t * 10 - t = 0 - elif c == "百": - if t == 0: - t = 1 - n += t * 100 - t = 0 - elif c == "千": - if t == 0: - t = 1 - n += t * 1000 - t = 0 - n += t - return n - - return rest, parse(num_str) - + def parse(self, text: str) -> datetime.datetime: - text = text.strip() - if not text: - raise TokenUnhandledException("Empty input") - - ctx = { - "date": None, - "time": None, - "relative_delta": None, - "am_pm": None, - "period_word": None, - "has_time": False, - "has_date": False, - "ambiguous_hour": False, - "is_24hour": False, - "has_relative_date": False, - } - - rest = self._parse_all(text, ctx) - if rest.strip(): - raise TokenUnhandledException(f"Unparsed tokens: {rest.strip()}") - - return self._apply_context(ctx) - - def _parse_all(self, text: str, ctx: Dict[str, Any]) -> str: - rest = text.lstrip() - while True: - for parser in [ - self._parse_absolute_date, - self._parse_relative_date, - self._parse_relative_time, - self._parse_period, - 
self._parse_time, - ]: - new_rest = parser(rest, ctx) - if new_rest != rest: - rest = new_rest.lstrip() - break - else: - break - return rest - - def _add_delta(self, ctx, delta): - if ctx["relative_delta"] is None: - ctx["relative_delta"] = delta - else: - ctx["relative_delta"] += delta - - def _parse_absolute_date(self, text: str, ctx: Dict[str, Any]) -> str: - text = text.lstrip() - m = re.match(r"^(\d{4})-(\d{1,2})-(\d{1,2})T(\d{1,2}):(\d{2})", text) - if m: - y, mth, d, h, minute = map(int, m.groups()) - ctx["date"] = datetime.date(y, mth, d) - ctx["time"] = datetime.time(h, minute) - ctx["has_date"] = True - ctx["has_time"] = True - ctx["is_24hour"] = True - return text[m.end():] - m = re.match(r"^(\d{4})-(\d{1,2})-(\d{1,2})", text) - if m: - y, mth, d = map(int, m.groups()) - ctx["date"] = datetime.date(y, mth, d) - ctx["has_date"] = True - return text[m.end():] - m = re.match(r"^(\d{4})/(\d{1,2})/(\d{1,2})", text) - if m: - y, mth, d = map(int, m.groups()) - ctx["date"] = datetime.date(y, mth, d) - ctx["has_date"] = True - return text[m.end():] - m = re.match(r"^(\d{4})年(\d{1,2})月(\d{1,2})[日号]", text) - if m: - y, mth, d = map(int, m.groups()) - ctx["date"] = datetime.date(y, mth, d) - ctx["has_date"] = True - return text[m.end():] - m = re.match(r"^(\d{1,2})月(\d{1,2})[日号]", text) - if m: - mth, d = map(int, m.groups()) - ctx["date"] = datetime.date(self.now.year, mth, d) - ctx["has_date"] = True - return text[m.end():] - m = re.match(r"^(.{1,3})月(.{1,3})[日号]", text) - if m: - m_str, d_str = m.groups() - _, mth = self.digest_chinese_number(m_str) - _, d = self.digest_chinese_number(d_str) - if mth == 0: - mth = 1 - if d == 0: - d = 1 - ctx["date"] = datetime.date(self.now.year, mth, d) - ctx["has_date"] = True - return text[m.end():] - return text - - def _parse_relative_date(self, text: str, ctx: Dict[str, Any]) -> str: - text = text.lstrip() + """ + Parse a time expression and return a datetime object. 
+ This maintains backward compatibility with the original interface. - # Handle "今天", "今晚", "今早", etc. - today_variants = [ - ("今晚上", "PM"), - ("今晚", "PM"), - ("今早", "AM"), - ("今天早上", "AM"), - ("今天早晨", "AM"), - ("今天上午", "AM"), - ("今天下午", "PM"), - ("今天晚上", "PM"), - ("今天", None), - ] - for variant, period in today_variants: - if text.startswith(variant): - self._add_delta(ctx, datetime.timedelta(days=0)) - ctx["has_relative_date"] = True - rest = text[len(variant):] - if period is not None and ctx["am_pm"] is None: - ctx["am_pm"] = period - ctx["period_word"] = variant - return rest - - mapping = { - "明天": 1, - "后天": 2, - "大后天": 3, - "昨天": -1, - "前天": -2, - "大前天": -3, - } - for word, days in mapping.items(): - if text.startswith(word): - self._add_delta(ctx, datetime.timedelta(days=days)) - ctx["has_relative_date"] = True - return text[len(word):] - m = re.match(r"^(\d+|[零一二三四五六七八九十两]+)天(后|前|以后|之后)", text) - if m: - num_str, direction = m.groups() - if num_str.isdigit(): - n = int(num_str) - else: - _, n = self.digest_chinese_number(num_str) - days = n if direction in ("后", "以后", "之后") else -n - self._add_delta(ctx, datetime.timedelta(days=days)) - ctx["has_relative_date"] = True - return text[m.end():] - m = re.match(r"^(本|上|下)周([一二三四五六日])", text) - if m: - scope, day = m.groups() - weekday_map = {"一": 0, "二": 1, "三": 2, "四": 3, "五": 4, "六": 5, "日": 6} - target = weekday_map[day] - current = self.now.weekday() - if scope == "本": - delta = target - current - elif scope == "上": - delta = target - current - 7 - else: - delta = target - current + 7 - self._add_delta(ctx, datetime.timedelta(days=delta)) - ctx["has_relative_date"] = True - return text[m.end():] - return text - - def _parse_period(self, text: str, ctx: Dict[str, Any]) -> str: - text = text.lstrip() - period_mapping = { - "上午": "AM", - "早晨": "AM", - "早上": "AM", - "早": "AM", - "中午": "PM", - "下午": "PM", - "晚上": "PM", - "晚": "PM", - "凌晨": "AM", - } - for word, tag in period_mapping.items(): - if 
text.startswith(word): - if ctx["am_pm"] is not None: - raise MultipleSpecificationException("Multiple periods") - ctx["am_pm"] = tag - ctx["period_word"] = word - return text[len(word):] - return text - - def _parse_time(self, text: str, ctx: Dict[str, Any]) -> str: - if ctx["has_time"]: - return text - text = text.lstrip() - - # 1. H:MM pattern - m = re.match(r"^(\d{1,2}):(\d{2})", text) - if m: - h, minute = int(m.group(1)), int(m.group(2)) - if 0 <= h <= 23 and 0 <= minute <= 59: - ctx["time"] = datetime.time(h, minute) - ctx["has_time"] = True - ctx["ambiguous_hour"] = 1 <= h <= 12 - ctx["is_24hour"] = h > 12 or h == 0 - return text[m.end():] - - # 2. Parse hour part - hour = None - rest_after_hour = text - is_24hour_format = False - - # Try Chinese number + 点/时 - temp_rest, num = self.digest_chinese_number(text) - if num >= 0: - temp_rest_stripped = temp_rest.lstrip() - if temp_rest_stripped.startswith("点"): - hour = num - is_24hour_format = False - rest_after_hour = temp_rest_stripped[1:] - elif temp_rest_stripped.startswith("时"): - hour = num - is_24hour_format = True - rest_after_hour = temp_rest_stripped[1:] - - if hour is None: - m = re.match(r"^(\d{1,2})\s*([点时])", text) - if m: - hour = int(m.group(1)) - is_24hour_format = m.group(2) == "时" - rest_after_hour = text[m.end():] - - if hour is None: - if ctx.get("am_pm") is not None: - temp_rest, num = self.digest_chinese_number(text) - if 0 <= num <= 23: - hour = num - is_24hour_format = False - rest_after_hour = temp_rest.lstrip() - else: - m = re.match(r"^(\d{1,2})", text) - if m: - h_val = int(m.group(1)) - if 0 <= h_val <= 23: - hour = h_val - is_24hour_format = False - rest_after_hour = text[m.end():].lstrip() - - if hour is None: - return text - - if not (0 <= hour <= 23): - return text - - # Parse minutes - rest = rest_after_hour.lstrip() - minute = 0 - minute_spec_count = 0 - - if rest.startswith("钟"): - rest = rest[1:].lstrip() - - has_zheng = False - if rest.startswith("整"): - has_zheng = True - 
rest = rest[1:].lstrip() - - if rest.startswith("半"): - minute = 30 - minute_spec_count += 1 - rest = rest[1:].lstrip() - if rest.startswith("钟"): - rest = rest[1:].lstrip() - if rest.startswith("整"): - rest = rest[1:].lstrip() - - if rest.startswith("一刻"): - minute = 15 - minute_spec_count += 1 - rest = rest[2:].lstrip() - if rest.startswith("钟"): - rest = rest[1:].lstrip() - - if rest.startswith("过一刻"): - minute = 15 - minute_spec_count += 1 - rest = rest[3:].lstrip() - if rest.startswith("钟"): - rest = rest[1:].lstrip() - - m = re.match(r"^(\d+|[零一二三四五六七八九十]+)分", rest) - if m: - minute_spec_count += 1 - m_str = m.group(1) - if m_str.isdigit(): - minute = int(m_str) - else: - _, minute = self.digest_chinese_number(m_str) - rest = rest[m.end():].lstrip() - - if minute_spec_count == 0: - temp_rest, num = self.digest_chinese_number(rest) - if num > 0 and num <= 59: - minute = num - minute_spec_count += 1 - rest = temp_rest.lstrip() - else: - m = re.match(r"^(\d{1,2})", rest) - if m: - m_val = int(m.group(1)) - if 0 <= m_val <= 59: - minute = m_val - minute_spec_count += 1 - rest = rest[m.end():].lstrip() - - if has_zheng and minute_spec_count == 0: - minute_spec_count = 1 - - if minute_spec_count > 1: - raise MultipleSpecificationException("Multiple minute specifications") - - if not (0 <= minute <= 59): - return text - - # Hours 13-23 are always 24-hour, even with "点" - if hour >= 13: - is_24hour_format = True - - ctx["time"] = datetime.time(hour, minute) - ctx["has_time"] = True - ctx["ambiguous_hour"] = 1 <= hour <= 12 and not is_24hour_format - ctx["is_24hour"] = is_24hour_format - - return rest - - def _parse_relative_time(self, text: str, ctx: Dict[str, Any]) -> str: - text = text.lstrip() + Args: + text: The time expression to parse + + Returns: + A datetime object representing the parsed time + + Raises: + TokenUnhandledException: If the input cannot be parsed + """ + return TimeExpression.parse(text, self.now) + + def digest_chinese_number(self, text: str) 
-> tuple[str, int]: + """ + Parse a Chinese number from the beginning of text and return the rest and the parsed number. - # 半小时 - m = re.match(r"^(半)(?:个)?小时?(后|前|以后|之后)", text) - if m: - direction = m.group(2) - hours = 0.5 - delta = datetime.timedelta( - hours=hours if direction in ("后", "以后", "之后") else -hours - ) - self._add_delta(ctx, delta) - return text[m.end():] + This matches the interface of the original digest_chinese_number method. - # X个半 - m = re.match(r"^([0-9零一二三四五六七八九十两]+)个半(?:小时?)?(后|前|以后|之后)", text) - if m: - num_str, direction = m.groups() - if num_str.isdigit(): - base_hours = int(num_str) - else: - _, base_hours = self.digest_chinese_number(num_str) - if base_hours == 0 and num_str != "零": - return text - if base_hours <= 0: - return text - hours = base_hours + 0.5 - delta = datetime.timedelta( - hours=hours if direction in ("后", "以后", "之后") else -hours - ) - self._add_delta(ctx, delta) - return text[m.end():] - - # 一个半 - m = re.match(r"^(一个半)小时?(后|前|以后|之后)", text) - if m: - direction = m.group(2) - hours = 1.5 - delta = datetime.timedelta( - hours=hours if direction in ("后", "以后", "之后") else -hours - ) - self._add_delta(ctx, delta) - return text[m.end():] - - # X小时 - m = re.match(r"^([0-9零一二三四五六七八九十两]+)(?:个)?小时?(后|前|以后|之后)", text) - if m: - num_str, direction = m.groups() - if num_str.isdigit(): - hours = int(num_str) - else: - _, hours = self.digest_chinese_number(num_str) - if hours == 0 and num_str != "零": - return text - if hours <= 0: - return text - delta = datetime.timedelta( - hours=hours if direction in ("后", "以后", "之后") else -hours - ) - self._add_delta(ctx, delta) - return text[m.end():] - - m = re.match(r"^([0-9零一二三四五六七八九十两]+)(?:个)?小时(后|前)", text) - if m: - num_str, direction = m.groups() - if num_str.isdigit(): - hours = int(num_str) - else: - _, hours = self.digest_chinese_number(num_str) - if hours == 0 and num_str != "零": - return text - if hours <= 0: - return text - delta = datetime.timedelta( - hours=hours if direction == 
"后" else -hours - ) - self._add_delta(ctx, delta) - return text[m.end():] - - # X分钟 - m = re.match(r"^([0-9零一二三四五六七八九十两]+)分钟?(后|前|以后|之后)", text) - if m: - num_str, direction = m.groups() - if num_str.isdigit(): - minutes = int(num_str) - else: - _, minutes = self.digest_chinese_number(num_str) - if minutes == 0 and num_str != "零": - return text - if minutes <= 0: - return text - delta = datetime.timedelta( - minutes=minutes if direction in ("后", "以后", "之后") else -minutes - ) - self._add_delta(ctx, delta) - return text[m.end():] - - m = re.match(r"^([0-9零一二三四五六七八九十两]+)分(后|前|以后|之后)", text) - if m: - num_str, direction = m.groups() - if num_str.isdigit(): - minutes = int(num_str) - else: - _, minutes = self.digest_chinese_number(num_str) - if minutes == 0 and num_str != "零": - return text - if minutes <= 0: - return text - delta = datetime.timedelta( - minutes=minutes if direction in ("后", "以后", "之后") else -minutes - ) - self._add_delta(ctx, delta) - return text[m.end():] - - m = re.match(r"^([0-9零一二三四五六七八九十两]+)分钟?(后|前)", text) - if m: - num_str, direction = m.groups() - if num_str.isdigit(): - minutes = int(num_str) - else: - _, minutes = self.digest_chinese_number(num_str) - if minutes == 0 and num_str != "零": - return text - if minutes <= 0: - return text - delta = datetime.timedelta( - minutes=minutes if direction == "后" else -minutes - ) - self._add_delta(ctx, delta) - return text[m.end():] - - m = re.match(r"^([0-9零一二三四五六七八九十两]+)分(后|前)", text) - if m: - num_str, direction = m.groups() - if num_str.isdigit(): - minutes = int(num_str) - else: - _, minutes = self.digest_chinese_number(num_str) - if minutes == 0 and num_str != "零": - return text - if minutes <= 0: - return text - delta = datetime.timedelta( - minutes=minutes if direction == "后" else -minutes - ) - self._add_delta(ctx, delta) - return text[m.end():] - - # === 秒级支持 === - m = re.match(r"^([0-9零一二三四五六七八九十两]+)秒(后|前|以后|之后)", text) - if m: - num_str, direction = m.groups() - if num_str.isdigit(): - seconds 
= int(num_str) - else: - _, seconds = self.digest_chinese_number(num_str) - if seconds == 0 and num_str != "零": - return text - if seconds <= 0: - return text - delta = datetime.timedelta( - seconds=seconds if direction in ("后", "以后", "之后") else -seconds - ) - self._add_delta(ctx, delta) - return text[m.end():] - - m = re.match(r"^([0-9零一二三四五六七八九十两]+)秒(后|前)", text) - if m: - num_str, direction = m.groups() - if num_str.isdigit(): - seconds = int(num_str) - else: - _, seconds = self.digest_chinese_number(num_str) - if seconds == 0 and num_str != "零": - return text - if seconds <= 0: - return text - delta = datetime.timedelta( - seconds=seconds if direction == "后" else -seconds - ) - self._add_delta(ctx, delta) - return text[m.end():] - - return text - - def _apply_context(self, ctx: Dict[str, Any]) -> datetime.datetime: - result = self.now - has_date = ctx["has_date"] - has_time = ctx["has_time"] - has_delta = ctx["relative_delta"] is not None - has_relative_date = ctx["has_relative_date"] - - if has_delta: - result = result + ctx["relative_delta"] - - if has_date: - result = result.replace( - year=ctx["date"].year, - month=ctx["date"].month, - day=ctx["date"].day, - ) - - if has_time: - h = ctx["time"].hour - m = ctx["time"].minute - - if ctx["is_24hour"]: - # "10 时" → 10:00, no conversion - pass - - elif ctx["am_pm"] == "AM": - if h == 12: - h = 0 - - elif ctx["am_pm"] == "PM": - if h == 12: - if ctx.get("period_word") in ("晚上", "晚"): - h = 0 - result += datetime.timedelta(days=1) - else: - h = 12 - elif 1 <= h <= 11: - h += 12 - - else: - # No period and not 24-hour (i.e., "点" format) - if ctx["has_relative_date"]: - # "明天五点" → 05:00 AM - if h == 12: - h = 0 - # keep h as AM hour (1-11 unchanged) - else: - # Infer from current time - am_hour = 0 if h == 12 else h - candidate_am = result.replace(hour=am_hour, minute=m, second=0, microsecond=0) - if candidate_am < self.now: - # AM time is in the past, so use PM - if h == 12: - h = 12 - else: - h += 12 - # else: 
class ChineseNumberParser:
    """Parser for Chinese numerals written with 零-九, 两, 十, 百, 千, 万 and 亿.

    ``digest`` peels a numeral off the front of a longer string, while
    ``parse`` converts a string that consists only of a numeral.
    """

    # Plain digit characters that may appear inside a numeral.
    _DIGIT_CHARS = "零一二三四五六七八九"
    # Multiplier characters that may appear inside a numeral.
    _UNIT_CHARS = "十百千万亿"
    # Time units that may directly follow a standalone "两" (e.g. "两小时").
    # "分" also covers "分钟" because only the prefix is tested.
    _TIME_UNITS = ("小时", "时", "分", "秒")

    def __init__(self):
        self.digits = {"零": 0, "一": 1, "二": 2, "三": 3, "四": 4,
                       "五": 5, "六": 6, "七": 7, "八": 8, "九": 9}
        self.units = {"十": 10, "百": 100, "千": 1000, "万": 10000, "亿": 100000000}

    def digest(self, text: str) -> Tuple[str, int]:
        """
        Parse a Chinese number from the beginning of text and return the rest
        and the parsed number.

        Args:
            text: Text that may start with a Chinese number

        Returns:
            Tuple of (remaining_text, parsed_number). When ``text`` does not
            start with a numeral the text is returned unchanged together
            with 0.
        """
        if not text:
            return text, 0

        if text.startswith("两"):
            after = text[1:]
            if not after:
                # Just "两" by itself.
                return "", 2
            if after.startswith(self._TIME_UNITS):
                # Leave the time unit in place for the caller to consume.
                return after, 2
            if after[0].isspace():
                # "两" + whitespace + time unit: drop the whitespace so the
                # remainder always starts at the unit, whatever its length.
                stripped = after.lstrip()
                if stripped.startswith(self._TIME_UNITS):
                    return stripped, 2
            # Anything else (e.g. "两百") is left to the generic scan below.

        length = self._numeral_length(text)
        if length == 0:
            return text, 0
        return text[length:], self.parse(text[:length])

    def _numeral_length(self, text: str) -> int:
        """Return how many leading characters of ``text`` form a numeral.

        "两" is accepted only when it is immediately followed by a unit
        character (十百千万亿), so "两百" and "一万两千" are consumed in full
        while an unrelated "两" stays in the remainder.
        """
        numeral_chars = self._DIGIT_CHARS + self._UNIT_CHARS
        total = len(text)
        index = 0
        while index < total:
            char = text[index]
            if char in numeral_chars:
                index += 1
            elif (char == "两" and index + 1 < total
                    and text[index + 1] in self._UNIT_CHARS):
                # Consume "两" together with the unit that follows it.
                index += 2
            else:
                break
        return index

    def parse(self, text: str) -> int:
        """
        Parse a Chinese number string and return its integer value.

        Characters that are neither digits, "两" nor units are ignored.

        Args:
            text: Chinese number string

        Returns:
            Integer value of the Chinese number (0 for an empty string)
        """
        if not text or text == "零":
            return 0
        if text == "两":
            return 2
        if text == "十":
            return 10

        # Split on the largest unit first; each side is itself a numeral.
        for big_unit, factor in (("亿", 100000000), ("万", 10000)):
            if big_unit in text:
                high, low = text.split(big_unit, 1)
                return self.parse(high) * factor + self.parse(low)

        result = 0
        temp = 0  # digit waiting for its unit
        for char in text:
            if char == "零":
                continue
            if char == "两":
                temp = 2
            elif char in self.digits:
                temp = self.digits[char]
            elif char in self.units:
                unit = self.units[char]
                if unit == 10 and temp == 0:
                    # A bare "十" means ten, as in "十三".
                    temp = 1
                result += temp * unit
                temp = 0

        return result + temp
class TimeExpression:
    """Main class for parsing time expressions.

    Construction parses the text into an AST right away; ``evaluate`` then
    turns that AST into a ``datetime``. Failures in either stage are reported
    as ``TokenUnhandledException`` with the original error chained as the
    cause.
    """

    def __init__(self, text: str, now: Optional[datetime.datetime] = None):
        """
        Build the parsing pipeline for ``text`` and parse it.

        Args:
            text: The time expression to parse (surrounding whitespace is
                stripped)
            now: The reference time (defaults to current time)

        Raises:
            TokenUnhandledException: If the input is empty or cannot be parsed
        """
        self.text = text.strip()
        self.now = now or datetime.datetime.now()

        if not self.text:
            raise TokenUnhandledException("Empty input")

        # Initialize components. The lexer is only stored here; this class
        # drives parsing through ``self.parser``.
        self.lexer = Lexer(self.text, self.now)
        self.parser = Parser(self.text, self.now)
        self.semantic_analyzer = SemanticAnalyzer(self.now)

        # Parse the expression
        self.ast = self._parse()

    def _parse(self) -> TimeExpressionNode:
        """Parse the time expression and return the AST."""
        try:
            return self.parser.parse()
        except Exception as e:
            # Chain the cause so the underlying parser error stays visible.
            raise TokenUnhandledException(
                f"Failed to parse '{self.text}': {e!s}"
            ) from e

    def evaluate(self) -> datetime.datetime:
        """Evaluate the time expression and return the datetime."""
        try:
            return self.semantic_analyzer.evaluate(self.ast)
        except Exception as e:
            # Chain the cause so the underlying evaluation error stays visible.
            raise TokenUnhandledException(
                f"Failed to evaluate '{self.text}': {e!s}"
            ) from e

    @classmethod
    def parse(cls, text: str, now: Optional[datetime.datetime] = None) -> datetime.datetime:
        """
        Parse a time expression and return a datetime object.

        Args:
            text: The time expression to parse
            now: The reference time (defaults to current time)

        Returns:
            A datetime object representing the parsed time

        Raises:
            TokenUnhandledException: If the input cannot be parsed
        """
        return cls(text, now).evaluate()
# konabot/common/ptimeparse/lexer.py

import re
from typing import Iterator, Optional
import datetime

from .ptime_token import Token, TokenType
from .chinese_number import ChineseNumberParser


class Lexer:
    """Lexical analyzer for time expressions.

    The input is scanned left to right. At each position the first entry of
    ``token_patterns`` that matches wins, so the ORDER of that list is part
    of the grammar. Characters that match nothing (and are not a Chinese
    number) are silently skipped.
    """

    # "Today + period" phrases that are re-tagged as a plain AM/PM period
    # token instead of RELATIVE_TODAY.
    _TODAY_AM_PHRASES = ("今早上", "今天早上", "今天早晨", "今天上午")
    _TODAY_PM_PHRASES = ("今晚上", "今天下午", "今天晚上")

    def __init__(self, text: str, now: Optional[datetime.datetime] = None):
        """Create a lexer over ``text``.

        Args:
            text: The expression to tokenize.
            now: Reference time, stored on the instance. Defaults to
                ``datetime.datetime.now()``.
        """
        self.text = text
        self.pos = 0
        self.current_char = self.text[self.pos] if self.text else None
        self.now = now or datetime.datetime.now()
        self.chinese_parser = ChineseNumberParser()

        # Define token patterns (first match wins).
        self.token_patterns = [
            # Whitespace (normally skipped before pattern matching runs)
            (r'^\s+', TokenType.WHITESPACE),

            # Time separators
            (r'^:', TokenType.TIME_SEPARATOR),
            (r'^点', TokenType.TIME_SEPARATOR),
            (r'^时', TokenType.TIME_SEPARATOR),
            (r'^分', TokenType.TIME_SEPARATOR),
            (r'^秒', TokenType.TIME_SEPARATOR),

            # Special time markers
            (r'^半', TokenType.HALF),
            (r'^一刻', TokenType.QUARTER),
            (r'^整', TokenType.ZHENG),
            (r'^钟', TokenType.ZHONG),

            # Period indicators (must come before relative time patterns to avoid conflicts)
            (r'^(上午|早晨|早上|清晨|早(?!\d))', TokenType.PERIOD_AM),
            (r'^(中午|下午|晚上|晚(?!\d)|凌晨|午夜)', TokenType.PERIOD_PM),

            # Week scope (more specific patterns first)
            (r'^本周', TokenType.WEEK_SCOPE_CURRENT),
            (r'^上周', TokenType.WEEK_SCOPE_LAST),
            (r'^下周', TokenType.WEEK_SCOPE_NEXT),

            # Relative days. These must precede the relative-direction
            # patterns below: otherwise the bare "后" / "前" alternatives
            # consume the first character of "后天" / "前天" and the
            # day-after-tomorrow / day-before-yesterday tokens can never be
            # produced.
            (r'^明天', TokenType.RELATIVE_TOMORROW),
            (r'^后天', TokenType.RELATIVE_DAY_AFTER_TOMORROW),
            (r'^大后天', TokenType.RELATIVE_THREE_DAYS_AFTER_TOMORROW),
            (r'^昨天', TokenType.RELATIVE_YESTERDAY),
            (r'^前天', TokenType.RELATIVE_DAY_BEFORE_YESTERDAY),
            (r'^大前天', TokenType.RELATIVE_THREE_DAYS_BEFORE_YESTERDAY),

            # Relative directions
            (r'^(后|以后|之后)', TokenType.RELATIVE_DIRECTION_FORWARD),
            (r'^(前|以前|之前)', TokenType.RELATIVE_DIRECTION_BACKWARD),

            # Extended relative time
            (r'^明年', TokenType.RELATIVE_NEXT),
            (r'^去年', TokenType.RELATIVE_LAST),
            (r'^今年', TokenType.RELATIVE_THIS),
            (r'^下(?![午年月周])', TokenType.RELATIVE_NEXT),
            (r'^(上|去)(?![午年月周])', TokenType.RELATIVE_LAST),
            (r'^这', TokenType.RELATIVE_THIS),
            (r'^本(?![周月年])', TokenType.RELATIVE_THIS),  # Match "本" but not "本周", "本月", "本年"

            # Week scope (fallback for standalone terms)
            (r'^本', TokenType.WEEK_SCOPE_CURRENT),
            (r'^上', TokenType.WEEK_SCOPE_LAST),
            # NOTE(review): identical to the RELATIVE_NEXT pattern above, so
            # this entry can never match. Left in place to keep the table
            # unchanged.
            (r'^下(?![午年月周])', TokenType.WEEK_SCOPE_NEXT),

            # Week days
            (r'^周一', TokenType.WEEKDAY_MONDAY),
            (r'^周二', TokenType.WEEKDAY_TUESDAY),
            (r'^周三', TokenType.WEEKDAY_WEDNESDAY),
            (r'^周四', TokenType.WEEKDAY_THURSDAY),
            (r'^周五', TokenType.WEEKDAY_FRIDAY),
            (r'^周六', TokenType.WEEKDAY_SATURDAY),
            (r'^周日', TokenType.WEEKDAY_SUNDAY),

            # Student-friendly time expressions ("早8", "晚10"): only when
            # directly followed by an ASCII/Unicode decimal digit.
            (r'^早(?=\d)', TokenType.EARLY_MORNING),
            (r'^晚(?=\d)', TokenType.LATE_NIGHT),

            # Relative today variants (longer phrases before their prefixes)
            (r'^今晚上', TokenType.RELATIVE_TODAY),
            (r'^今晚', TokenType.RELATIVE_TODAY),
            # "今早上" is listed among the AM phrases but previously had no
            # pattern, so "今早" always matched first and the AM re-tagging
            # for this phrase was dead code.
            (r'^今早上', TokenType.RELATIVE_TODAY),
            (r'^今早', TokenType.RELATIVE_TODAY),
            (r'^今天早上', TokenType.RELATIVE_TODAY),
            (r'^今天早晨', TokenType.RELATIVE_TODAY),
            (r'^今天上午', TokenType.RELATIVE_TODAY),
            (r'^今天下午', TokenType.RELATIVE_TODAY),
            (r'^今天晚上', TokenType.RELATIVE_TODAY),
            (r'^今天', TokenType.RELATIVE_TODAY),

            # Digits
            (r'^\d+', TokenType.INTEGER),

            # Time units
            (r'^年(?![月日号])', TokenType.YEAR),
            (r'^月(?![日号])', TokenType.MONTH),
            (r'^[日号](?![月年])', TokenType.DAY),
            (r'^天', TokenType.DAY),
            (r'^周', TokenType.WEEK),
            (r'^小时', TokenType.HOUR),
            # NOTE(review): "分" and "秒" are already claimed by the
            # TIME_SEPARATOR patterns at the top of the table, so the two
            # entries below never match ("分钟" lexes as TIME_SEPARATOR +
            # ZHONG). The parser handles that token sequence, so the order
            # is deliberately left unchanged.
            (r'^分钟', TokenType.MINUTE),
            (r'^秒', TokenType.SECOND),

            # Date separators (fallback patterns)
            (r'^年', TokenType.DATE_SEPARATOR),
            (r'^月', TokenType.DATE_SEPARATOR),
            (r'^[日号]', TokenType.DATE_SEPARATOR),
            (r'^[-/]', TokenType.DATE_SEPARATOR),
        ]

        # Compile once per lexer instead of going through re.match's pattern
        # lookup for every pattern at every input position.
        self._compiled_patterns = [
            (re.compile(pattern), token_type)
            for pattern, token_type in self.token_patterns
        ]

    def advance(self):
        """Advance the position pointer and set the current character."""
        self.pos += 1
        if self.pos >= len(self.text):
            self.current_char = None
        else:
            self.current_char = self.text[self.pos]

    def _advance_by(self, count: int) -> None:
        """Advance the position pointer by ``count`` characters."""
        for _ in range(count):
            self.advance()

    def skip_whitespace(self):
        """Skip whitespace characters."""
        while self.current_char is not None and self.current_char.isspace():
            self.advance()

    def integer(self) -> int:
        """Parse a run of digits at the current position and return its value."""
        digits = []
        while self.current_char is not None and self.current_char.isdigit():
            digits.append(self.current_char)
            self.advance()
        return int(''.join(digits))

    def chinese_number(self) -> int:
        """Parse a Chinese number at the current position.

        Tries successively shorter prefixes of the remaining text, longest
        first, and accepts the first one from which ``digest`` consumes at
        least one character. The consumed characters are skipped.

        Returns:
            The parsed value, or 0 when no prefix could be parsed.
        """
        for length in range(len(self.text) - self.pos, 0, -1):
            prefix = self.text[self.pos:self.pos + length]
            try:
                # digest returns the unparsed remainder and the parsed value.
                remaining, value = self.chinese_parser.digest(prefix)
            except ValueError:
                continue
            consumed_length = len(prefix) - len(remaining)
            if consumed_length > 0:
                self._advance_by(consumed_length)
                return value
        # If no Chinese number found, just return 0
        return 0

    def get_next_token(self) -> Token:
        """Return the next token, or an EOF token at the end of the input."""
        while self.current_char is not None:
            if self.current_char.isspace():
                self.skip_whitespace()
                continue

            position = self.pos
            text_remaining = self.text[position:]
            for regex, token_type in self._compiled_patterns:
                match = regex.match(text_remaining)
                if match is None:
                    continue

                value = match.group(0)
                self._advance_by(len(value))

                # Special handling for some tokens
                if token_type == TokenType.INTEGER:
                    value = int(value)
                elif token_type == TokenType.RELATIVE_TODAY:
                    if value in self._TODAY_AM_PHRASES:
                        token_type = TokenType.PERIOD_AM
                    elif value in self._TODAY_PM_PHRASES:
                        token_type = TokenType.PERIOD_PM

                return Token(token_type, value, position)

            # No pattern matched: try a Chinese number.
            # NOTE(review): a number that parses to 0 is consumed but not
            # emitted as a token (same as before); confirm whether "零"
            # should produce a CHINESE_NUMBER token.
            chinese_value = self.chinese_number()
            if chinese_value > 0:
                return Token(TokenType.CHINESE_NUMBER, chinese_value, position)

            # If nothing matches, skip the character and continue
            self.advance()

        # End of input
        return Token(TokenType.EOF, None, self.pos)

    def tokenize(self) -> Iterator[Token]:
        """Yield every token of the input, ending with a single EOF token."""
        while True:
            token = self.get_next_token()
            yield token
            if token.type == TokenType.EOF:
                break
# konabot/common/ptimeparse/parser.py

from typing import Iterator, Optional, List
import datetime

from .ptime_token import Token, TokenType
from .ptime_ast import (
    ASTNode, NumberNode, DateNode, TimeNode,
    RelativeDateNode, RelativeTimeNode, WeekdayNode, TimeExpressionNode
)
from .lexer import Lexer


class ParserError(Exception):
    """Exception raised for parser errors."""
    pass


class Parser:
    """Parser for time expressions that builds an AST.

    The whole input is tokenized up front. ``parse_time_expression`` then
    walks the token list and, at each position, tries the sub-parsers in a
    fixed order; a sub-parser that fails raises ``ParserError`` and the
    position is rewound. Tokens that no sub-parser accepts are skipped.
    """

    _NUMBER_TYPES = (TokenType.INTEGER, TokenType.CHINESE_NUMBER)
    _DIRECTION_TYPES = (
        TokenType.RELATIVE_DIRECTION_FORWARD,
        TokenType.RELATIVE_DIRECTION_BACKWARD,
    )

    # Fixed relative-day words and their day offsets.
    _FIXED_DAY_OFFSETS = {
        TokenType.RELATIVE_TODAY: 0,
        TokenType.RELATIVE_TOMORROW: 1,
        TokenType.RELATIVE_DAY_AFTER_TOMORROW: 2,
        TokenType.RELATIVE_THREE_DAYS_AFTER_TOMORROW: 3,
        TokenType.RELATIVE_YESTERDAY: -1,
        TokenType.RELATIVE_DAY_BEFORE_YESTERDAY: -2,
        TokenType.RELATIVE_THREE_DAYS_BEFORE_YESTERDAY: -3,
    }

    # this / next / last -> offset in units of the following year/month/week.
    _RELATIVE_SHIFT = {
        TokenType.RELATIVE_THIS: 0,
        TokenType.RELATIVE_NEXT: 1,
        TokenType.RELATIVE_LAST: -1,
    }

    # Single-token year words, keyed by the token type the lexer gives them.
    _YEAR_WORDS = {
        TokenType.RELATIVE_THIS: "今年",
        TokenType.RELATIVE_NEXT: "明年",
        TokenType.RELATIVE_LAST: "去年",
    }

    _WEEK_SCOPES = {
        TokenType.WEEK_SCOPE_CURRENT: "current",
        TokenType.WEEK_SCOPE_LAST: "last",
        TokenType.WEEK_SCOPE_NEXT: "next",
    }

    # 0 = Monday ... 6 = Sunday
    _WEEKDAY_INDEX = {
        TokenType.WEEKDAY_MONDAY: 0,
        TokenType.WEEKDAY_TUESDAY: 1,
        TokenType.WEEKDAY_WEDNESDAY: 2,
        TokenType.WEEKDAY_THURSDAY: 3,
        TokenType.WEEKDAY_FRIDAY: 4,
        TokenType.WEEKDAY_SATURDAY: 5,
        TokenType.WEEKDAY_SUNDAY: 6,
    }

    # Token types at which parse_time_expression attempts each sub-parser.
    _RELATIVE_DATE_STARTS = (
        TokenType.RELATIVE_TODAY, TokenType.RELATIVE_TOMORROW,
        TokenType.RELATIVE_DAY_AFTER_TOMORROW, TokenType.RELATIVE_THREE_DAYS_AFTER_TOMORROW,
        TokenType.RELATIVE_YESTERDAY, TokenType.RELATIVE_DAY_BEFORE_YESTERDAY,
        TokenType.RELATIVE_THREE_DAYS_BEFORE_YESTERDAY,
        TokenType.INTEGER, TokenType.CHINESE_NUMBER,  # "X年后", "X天前", ...
        TokenType.RELATIVE_NEXT, TokenType.RELATIVE_LAST, TokenType.RELATIVE_THIS,
    )
    _RELATIVE_TIME_STARTS = (
        TokenType.INTEGER, TokenType.CHINESE_NUMBER,
        TokenType.HALF, TokenType.QUARTER,
        TokenType.RELATIVE_DIRECTION_FORWARD, TokenType.RELATIVE_DIRECTION_BACKWARD,
    )
    # NOTE(review): EARLY_MORNING / LATE_NIGHT are handled inside parse_time
    # but are not listed here, so those branches are only reached after a
    # PERIOD_AM / PERIOD_PM token. Kept as in the original dispatch.
    _TIME_STARTS = (
        TokenType.INTEGER, TokenType.CHINESE_NUMBER, TokenType.TIME_SEPARATOR,
        TokenType.PERIOD_AM, TokenType.PERIOD_PM,
    )
    _WEEKDAY_STARTS = (
        TokenType.WEEK_SCOPE_CURRENT, TokenType.WEEK_SCOPE_LAST, TokenType.WEEK_SCOPE_NEXT,
        TokenType.WEEKDAY_MONDAY, TokenType.WEEKDAY_TUESDAY, TokenType.WEEKDAY_WEDNESDAY,
        TokenType.WEEKDAY_THURSDAY, TokenType.WEEKDAY_FRIDAY, TokenType.WEEKDAY_SATURDAY,
        TokenType.WEEKDAY_SUNDAY,
    )

    def __init__(self, text: str, now: Optional[datetime.datetime] = None):
        """Tokenize ``text`` and prepare to parse it.

        Args:
            text: The expression to parse.
            now: Reference time, forwarded to the lexer and stored on the
                instance. Defaults to ``datetime.datetime.now()``.
        """
        self.lexer = Lexer(text, now)
        self.tokens: List[Token] = list(self.lexer.tokenize())
        self.pos = 0
        self.now = now or datetime.datetime.now()

    # ------------------------------------------------------------------
    # Token-stream primitives
    # ------------------------------------------------------------------

    @property
    def current_token(self) -> Token:
        """Get the current token (a synthetic EOF token past the end)."""
        if self.pos < len(self.tokens):
            return self.tokens[self.pos]
        return Token(TokenType.EOF, None, len(self.tokens))

    def eat(self, token_type: TokenType) -> Token:
        """Consume and return the current token if it has the expected type.

        Raises:
            ParserError: If the current token has a different type.
        """
        token = self.current_token
        if token.type != token_type:
            raise ParserError(
                f"Expected token {token_type}, got {self.current_token.type} "
                f"at position {self.current_token.position}"
            )
        self.pos += 1
        return token

    def peek(self, offset: int = 1) -> Token:
        """Look ahead ``offset`` tokens without consuming anything."""
        next_pos = self.pos + offset
        if next_pos < len(self.tokens):
            return self.tokens[next_pos]
        return Token(TokenType.EOF, None, len(self.tokens))

    def _take(self) -> Token:
        """Consume and return the current token, whatever its type."""
        token = self.current_token
        self.pos += 1
        return token

    def _skip_if(self, token_type: TokenType) -> None:
        """Consume the current token only if it has the given type."""
        if self.current_token.type == token_type:
            self.pos += 1

    def _eat_direction(self) -> int:
        """Consume an optional 前/后 token; return -1 for backward, else 1."""
        kind = self.current_token.type
        if kind == TokenType.RELATIVE_DIRECTION_FORWARD:
            self.pos += 1
        elif kind == TokenType.RELATIVE_DIRECTION_BACKWARD:
            self.pos += 1
            return -1
        return 1

    def _attempt(self, parse_fn):
        """Run a sub-parser; on ParserError rewind and return None."""
        start = self.pos
        try:
            return parse_fn()
        except ParserError:
            self.pos = start
            return None

    # ------------------------------------------------------------------
    # Lookahead helpers
    # ------------------------------------------------------------------

    def _looks_like_month_day(self) -> bool:
        """True if the upcoming tokens look like "M月D" (e.g. "6月20日")."""
        p = self.pos
        if p + 2 >= len(self.tokens):
            return False
        first, sep, second = self.tokens[p], self.tokens[p + 1], self.tokens[p + 2]
        return (
            first.type in self._NUMBER_TYPES
            and sep.type in (TokenType.DATE_SEPARATOR, TokenType.MONTH)
            and sep.value == '月'
            and second.type in self._NUMBER_TYPES
        )

    def _looks_like_year_first_date(self) -> bool:
        """True if the upcoming tokens look like "Y-M-", "Y/M/" or "Y年M月"."""
        p = self.pos
        if p + 4 >= len(self.tokens):
            return False
        year, sep1, month, sep2 = self.tokens[p:p + 4]
        return (
            year.type in self._NUMBER_TYPES
            and sep1.type in (TokenType.DATE_SEPARATOR, TokenType.YEAR)
            and sep1.value in ('-', '/', '年')
            and month.type in self._NUMBER_TYPES
            and sep2.type in (TokenType.DATE_SEPARATOR, TokenType.MONTH)
            and sep2.value in ('-', '/', '月')
        )

    # ------------------------------------------------------------------
    # Sub-parsers
    # ------------------------------------------------------------------

    def parse_number(self) -> NumberNode:
        """Parse a number (integer or Chinese number).

        Raises:
            ParserError: If the current token is not a number.
        """
        token = self.current_token
        if token.type not in self._NUMBER_TYPES:
            raise ParserError(
                f"Expected number, got {token.type} at position {token.position}"
            )
        self.pos += 1
        return NumberNode(value=token.value)

    def parse_date(self) -> DateNode:
        """Parse an absolute date.

        Accepted shapes, tried in order: ``YYYY-MM-DD`` / ``YYYY/MM/DD``,
        ``YYYY年MM月DD[日号]`` and ``MM月DD[日号]`` (no year).

        Raises:
            ParserError: If none of the shapes matches at the current
                position. No token is consumed in that case.
        """
        first = self.current_token
        sep1, second, sep2, third = (
            self.peek(), self.peek(2), self.peek(3), self.peek(4)
        )
        slash_or_dash = ('-', '/')

        # YYYY-MM-DD or YYYY/MM/DD
        if (first.type == TokenType.INTEGER
                and sep1.type == TokenType.DATE_SEPARATOR and sep1.value in slash_or_dash
                and second.type == TokenType.INTEGER
                and sep2.type == TokenType.DATE_SEPARATOR and sep2.value in slash_or_dash
                and third.type == TokenType.INTEGER):
            self.pos += 5
            return DateNode(
                year=NumberNode(value=first.value),
                month=NumberNode(value=second.value),
                day=NumberNode(value=third.value),
            )

        # YYYY年MM月DD[日号] (年/月 may be lexed as a unit or as a separator)
        if (first.type == TokenType.INTEGER
                and sep1.type in (TokenType.DATE_SEPARATOR, TokenType.YEAR)
                and second.type == TokenType.INTEGER
                and sep2.type in (TokenType.DATE_SEPARATOR, TokenType.MONTH)
                and third.type == TokenType.INTEGER):
            self.pos += 5
            # Optional 日 or 号
            if self.current_token.type in (TokenType.DATE_SEPARATOR, TokenType.DAY):
                self.pos += 1
            return DateNode(
                year=NumberNode(value=first.value),
                month=NumberNode(value=second.value),
                day=NumberNode(value=third.value),
            )

        # MM月DD[日号] without a year; both numbers may be Chinese numerals.
        if (first.type in self._NUMBER_TYPES
                and sep1.type in (TokenType.DATE_SEPARATOR, TokenType.MONTH)
                and sep1.value == '月'
                and second.type in self._NUMBER_TYPES):
            self.pos += 3
            # Optional 日 or 号
            if self.current_token.type in (TokenType.DATE_SEPARATOR, TokenType.DAY):
                self.pos += 1
            return DateNode(
                year=None,
                month=NumberNode(value=first.value),
                day=NumberNode(value=second.value),
            )

        raise ParserError(
            f"Unable to parse date at position {self.current_token.position}"
        )

    def parse_time(self) -> TimeNode:
        """Parse an absolute time of day.

        Accepted shapes: ``HH:MM[:SS]`` (always 24-hour), or an optional
        AM/PM period word followed by ``X点/时[Y[分]][半|一刻|整][钟]``, a
        bare hour number, or a 早/晚 prefix followed by an hour number.

        Raises:
            ParserError: If no time can be parsed. Tokens consumed before
                the failure are NOT restored; callers rewind ``self.pos``.
        """
        minute_node = None
        second_node = None
        is_24hour = False
        period = None

        # HH:MM[:SS]
        if (self.current_token.type == TokenType.INTEGER
                and self.peek().type == TokenType.TIME_SEPARATOR
                and self.peek().value == ':'):
            hour_token = self.eat(TokenType.INTEGER)
            self.eat(TokenType.TIME_SEPARATOR)  # :
            minute_token = self.eat(TokenType.INTEGER)

            # Optional :SS
            if (self.current_token.type == TokenType.TIME_SEPARATOR
                    and self.peek().type == TokenType.INTEGER):
                self.eat(TokenType.TIME_SEPARATOR)  # :
                second_node = NumberNode(value=self.eat(TokenType.INTEGER).value)

            return TimeNode(
                hour=NumberNode(value=hour_token.value),
                minute=NumberNode(value=minute_token.value),
                second=second_node,
                is_24hour=True,  # HH:MM is always interpreted as 24-hour
                period=None,
            )

        # Optional leading period word (上午 / 下午 / ...)
        if self.current_token.type == TokenType.PERIOD_AM:
            period = "AM"
            self.pos += 1
        elif self.current_token.type == TokenType.PERIOD_PM:
            period = "PM"
            self.pos += 1

        kind = self.current_token.type

        if kind == TokenType.EARLY_MORNING:
            # "早8" -> 08:00
            self.eat(TokenType.EARLY_MORNING)
            is_24hour = True
            period = "AM"
            if self.current_token.type not in self._NUMBER_TYPES:
                raise ParserError(
                    f"Expected number after '早', got {self.current_token.type} "
                    f"at position {self.current_token.position}"
                )
            hour_node = NumberNode(value=self._take().value)
            # An hour above 12 is already unambiguous; drop the AM hint.
            if hour_node.value > 12:
                is_24hour = True
                period = None

        elif kind == TokenType.LATE_NIGHT:
            # "晚10" -> 22:00
            self.eat(TokenType.LATE_NIGHT)
            is_24hour = True
            period = "PM"
            if self.current_token.type not in self._NUMBER_TYPES:
                raise ParserError(
                    f"Expected number after '晚', got {self.current_token.type} "
                    f"at position {self.current_token.position}"
                )
            hour_node = NumberNode(value=self._take().value)
            # Shift to 24-hour format.
            # NOTE(review): 12 becomes 24 here; confirm how the semantic
            # analyzer treats hour == 24.
            if hour_node.value <= 12:
                hour_node.value += 12
                is_24hour = True
                period = None

        elif kind in self._NUMBER_TYPES:
            # Regular "X点Y分" style, or a bare hour.
            hour_node = NumberNode(value=self._take().value)

            if self.current_token.type == TokenType.TIME_SEPARATOR:
                separator = self._take().value
                if separator == '点':
                    is_24hour = False
                elif separator == '时':
                    is_24hour = True

                # Optional minutes, with optional 分
                if self.current_token.type in self._NUMBER_TYPES:
                    minute_token = self._take()
                    if (self.current_token.type == TokenType.TIME_SEPARATOR
                            and self.current_token.value == '分'):
                        self.pos += 1
                    minute_node = NumberNode(value=minute_token.value)

                # Special markers: 半 (30), 一刻 (15), 整 (on the hour)
                marker = self.current_token.type
                if marker == TokenType.HALF:
                    self.pos += 1
                    minute_node = NumberNode(value=30)
                elif marker == TokenType.QUARTER:
                    self.pos += 1
                    minute_node = NumberNode(value=15)
                elif marker == TokenType.ZHENG:
                    self.pos += 1
                    if minute_node is None:
                        minute_node = NumberNode(value=0)

                # Optional 钟
                self._skip_if(TokenType.ZHONG)
            else:
                # No separator: treat the number as an hour-only time.
                is_24hour = False

        else:
            raise ParserError(
                f"Unable to parse time at position {self.current_token.position}"
            )

        return TimeNode(
            hour=hour_node,
            minute=minute_node,
            second=second_node,
            is_24hour=is_24hour,
            period=period,
        )

    def parse_relative_date(self) -> RelativeDateNode:
        """Parse a relative date.

        Handles the fixed day words (今天, 明天, 后天, ...), year/month/week
        shifts (明年, 下个月, 上周, ...), and "<number><unit>[前|后]" offsets.

        Two results use non-obvious encodings that are left for the
        semantic analyzer to decode:

        * "明年X月" stores ``X - 100`` in ``months`` (absolute-month marker).
        * "下个月X号" stores the day-of-month ``X`` in ``weeks``.

        Raises:
            ParserError: If the tokens look like an absolute date or do not
                form a relative date. ``self.pos`` is restored when the
                failure happens after the initial lookahead.
        """
        years = 0
        months = 0
        weeks = 0
        days = 0

        fixed_offset = self._FIXED_DAY_OFFSETS.get(self.current_token.type)
        if fixed_offset is not None:
            self.pos += 1
            return RelativeDateNode(years=0, months=0, weeks=0, days=fixed_offset)

        # "6月20日" and friends belong to parse_date.
        if self._looks_like_month_day():
            raise ParserError("Looks like absolute date format")

        original_pos = self.pos
        try:
            token = self.current_token
            following = self.peek()
            shift = self._RELATIVE_SHIFT.get(token.type)

            if shift is not None and following.type == TokenType.YEAR:
                # this/next/last + 年
                self.pos += 2
                years = shift

            elif shift is not None and token.value == self._YEAR_WORDS[token.type]:
                # 今年 / 明年 / 去年 lexed as a single token
                self.pos += 1
                years = shift
                if (token.type == TokenType.RELATIVE_NEXT
                        and self.current_token.type in self._NUMBER_TYPES
                        and self.peek().type == TokenType.MONTH):
                    month_node = self.parse_number()
                    self.eat(TokenType.MONTH)
                    # HACK: negative offset marks an absolute month; decoded
                    # during semantic analysis.
                    months = month_node.value - 100

            elif shift is not None and following.type == TokenType.MONTH:
                # 这个月 / 下个月 / 上个月
                self.pos += 2
                months = shift
                if (token.type == TokenType.RELATIVE_NEXT
                        and self.current_token.type in self._NUMBER_TYPES
                        and self.peek().type == TokenType.DAY):
                    # "下个月五号": a specific day in the target month.
                    day_node = self.parse_number()
                    self.eat(TokenType.DAY)
                    # HACK: the target day-of-month travels in ``weeks``;
                    # decoded during semantic analysis.
                    weeks = day_node.value

            elif (token.type in (TokenType.RELATIVE_NEXT, TokenType.RELATIVE_LAST)
                    and following.type == TokenType.WEEK):
                # 下周 / 上周 (when lexed as two tokens)
                self.pos += 2
                weeks = shift

            elif token.type in self._NUMBER_TYPES:
                # "<number><unit>[前|后]"
                if self._looks_like_year_first_date():
                    raise ParserError("Looks like absolute date format")

                amount = self.parse_number().value
                unit = self.current_token.type
                if unit == TokenType.YEAR:
                    years = amount
                elif unit == TokenType.MONTH:
                    months = amount
                elif unit == TokenType.WEEK:
                    weeks = amount
                elif unit == TokenType.DAY:
                    days = amount
                else:
                    raise ParserError(
                        f"Expected time unit, got {self.current_token.type} "
                        f"at position {self.current_token.position}"
                    )
                self.pos += 1  # consume the unit token

                # NOTE(review): placed inside the number branch; the
                # flattened source does not show whether it applied to the
                # other branches as well.
                # 后 keeps the values positive, 前 negates them.
                if self._eat_direction() < 0:
                    years = -years
                    months = -months
                    weeks = -weeks
                    days = -days

            # NOTE(review): when no branch matches (e.g. a lone "这"), an
            # all-zero node is returned without consuming any token. This
            # mirrors the original behavior.

        except ParserError:
            # Reset position if parsing failed
            self.pos = original_pos
            raise ParserError(
                f"Expected relative date, got {self.current_token.type} "
                f"at position {self.current_token.position}"
            )

        return RelativeDateNode(years=years, months=months, weeks=weeks, days=days)

    def parse_weekday(self) -> WeekdayNode:
        """Parse a weekday with an optional week scope (本周 / 上周 / 下周).

        The weekday is either a dedicated weekday token or a Chinese number
        in 1..7 (1 = Monday).

        Raises:
            ParserError: If no weekday follows, or the number is outside
                1..7. A consumed scope token is not restored.
        """
        scope = "current"
        scope_name = self._WEEK_SCOPES.get(self.current_token.type)
        if scope_name is not None:
            self.pos += 1
            scope = scope_name

        token = self.current_token
        if token.type == TokenType.CHINESE_NUMBER:
            weekday_num = token.value
            if not 1 <= weekday_num <= 7:
                raise ParserError(
                    f"Invalid weekday number: {weekday_num} "
                    f"at position {self.current_token.position}"
                )
            self.pos += 1
            return WeekdayNode(weekday=weekday_num - 1, scope=scope)

        if token.type in self._WEEKDAY_INDEX:
            self.pos += 1
            return WeekdayNode(weekday=self._WEEKDAY_INDEX[token.type], scope=scope)

        raise ParserError(
            f"Expected weekday, got {self.current_token.type} "
            f"at position {self.current_token.position}"
        )

    def parse_relative_time(self) -> RelativeTimeNode:
        """Parse a sequence of relative time offsets ("2小时30分钟后", "半小时前").

        Stops at the first token that cannot continue the sequence. An
        all-zero result means nothing was recognized; callers are expected
        to rewind ``self.pos`` in that case.
        """
        hours = 0.0
        minutes = 0.0
        seconds = 0.0

        while self.current_token.type in self._RELATIVE_TIME_STARTS:
            kind = self.current_token.type

            # 半[小时][前|后]
            if kind == TokenType.HALF:
                self.pos += 1
                self._skip_if(TokenType.HOUR)
                hours += 0.5
                if self._eat_direction() < 0:
                    hours = -hours
                continue

            # 一刻[钟][前|后] (15 minutes)
            if kind == TokenType.QUARTER:
                self.pos += 1
                self._skip_if(TokenType.ZHONG)
                minutes += 15
                if self._eat_direction() < 0:
                    minutes = -minutes
                continue

            # A bare direction token with nothing before it ends the run.
            if kind not in self._NUMBER_TYPES:
                break

            number_value = self.parse_number().value
            unit = None
            follower = self.current_token

            if follower.type == TokenType.HOUR:
                self.pos += 1
                unit = "hour"
            elif follower.type == TokenType.MINUTE:
                self.pos += 1
                unit = "minute"
            elif follower.type == TokenType.SECOND:
                self.pos += 1
                unit = "second"
            elif follower.type == TokenType.TIME_SEPARATOR:
                # "X点", "X分", "X秒"
                self.pos += 1
                if follower.value == "点":
                    unit = "hour"
                    self._skip_if(TokenType.ZHONG)
                    # "X点" without 前/后 is most likely an absolute time.
                    # NOTE(review): the consumed tokens are not pushed
                    # back; the caller only rewinds when the overall
                    # result is zero.
                    if self.current_token.type not in self._DIRECTION_TYPES:
                        break
                elif follower.value == "分":
                    unit = "minute"
                    self._skip_if(TokenType.ZHONG)
                elif follower.value == "秒":
                    unit = "second"
            elif follower.type in self._DIRECTION_TYPES:
                # A number directly followed by 前/后 is taken as hours.
                unit = "hour"
            else:
                # No unit and no direction: not a relative time. The number
                # stays consumed (see the note above).
                break

            direction = self._eat_direction()

            if unit == "hour":
                hours += number_value * direction
            elif unit == "minute":
                minutes += number_value * direction
            elif unit == "second":
                seconds += number_value * direction

        return RelativeTimeNode(hours=hours, minutes=minutes, seconds=seconds)

    def parse_time_expression(self) -> TimeExpressionNode:
        """Parse a complete time expression.

        At each position the sub-parsers are tried in this order: absolute
        date, relative date, relative time, absolute time, weekday. Each
        kind is accepted at most once; tokens nobody accepts are skipped.
        """
        date_node = None
        time_node = None
        relative_date_node = None
        relative_time_node = None
        weekday_node = None

        while self.current_token.type != TokenType.EOF:
            # Failed attempts rewind the position, so the current token
            # type is stable for the whole iteration.
            kind = self.current_token.type

            # Absolute dates take precedence.
            if kind in self._NUMBER_TYPES and date_node is None:
                node = self._attempt(self.parse_date)
                if node is not None:
                    date_node = node
                    continue

            if kind in self._RELATIVE_DATE_STARTS and relative_date_node is None:
                node = self._attempt(self.parse_relative_date)
                if node is not None:
                    relative_date_node = node
                    continue

            if kind in self._RELATIVE_TIME_STARTS and relative_time_node is None:
                start = self.pos
                try:
                    relative_time_node = self.parse_relative_time()
                except ParserError:
                    self.pos = start
                else:
                    if (relative_time_node.hours != 0
                            or relative_time_node.minutes != 0
                            or relative_time_node.seconds != 0):
                        continue
                    # Nothing recognized: rewind and let parse_time try.
                    # NOTE(review): the all-zero node is kept, so relative
                    # time is not attempted again later in the input
                    # (original behavior).
                    self.pos = start

            if kind in self._TIME_STARTS and time_node is None:
                node = self._attempt(self.parse_time)
                if node is not None:
                    time_node = node
                    continue

            if kind in self._WEEKDAY_STARTS and weekday_node is None:
                node = self._attempt(self.parse_weekday)
                if node is not None:
                    weekday_node = node
                    continue

            # Nothing could use this token: skip it.
            self.pos += 1

        return TimeExpressionNode(
            date=date_node,
            time=time_node,
            relative_date=relative_date_node,
            relative_time=relative_time_node,
            weekday=weekday_node,
        )

    def parse(self) -> TimeExpressionNode:
        """Parse the complete time expression and return the AST."""
        return self.parse_time_expression()
@dataclass
class ASTNode(ABC):
    """Common base class shared by every node of the time-expression AST."""


@dataclass
class NumberNode(ASTNode):
    """Leaf node wrapping a single integer."""

    value: int


@dataclass
class DateNode(ASTNode):
    """Absolute calendar date; a part that was not written is ``None``."""

    year: Optional[ASTNode]
    month: Optional[ASTNode]
    day: Optional[ASTNode]


@dataclass
class TimeNode(ASTNode):
    """Absolute clock time; a part that was not written is ``None``."""

    hour: Optional[ASTNode]
    minute: Optional[ASTNode]
    second: Optional[ASTNode]
    is_24hour: bool = False
    period: Optional[str] = None  # "AM" or "PM"; None when no period was given


@dataclass
class RelativeDateNode(ASTNode):
    """Date offset expressed in years, months, weeks and days."""

    years: int = 0
    months: int = 0
    weeks: int = 0
    days: int = 0


@dataclass
class RelativeTimeNode(ASTNode):
    """Time-of-day offset expressed in hours, minutes and seconds."""

    hours: float = 0.0
    minutes: float = 0.0
    seconds: float = 0.0


@dataclass
class WeekdayNode(ASTNode):
    """A day of the week together with the week it belongs to."""

    weekday: int  # 0 = Monday ... 6 = Sunday
    scope: str  # "current", "last" or "next"


@dataclass
class TimeExpressionNode(ASTNode):
    """Root node: every component is optional and defaults to ``None``."""

    date: Optional[DateNode] = None
    time: Optional[TimeNode] = None
    relative_date: Optional[RelativeDateNode] = None
    relative_time: Optional[RelativeTimeNode] = None
    weekday: Optional[WeekdayNode] = None
class TokenType(Enum):
    """Every kind of token the lexer can emit; each value equals its name."""

    # Numeric literals
    INTEGER = "INTEGER"
    CHINESE_NUMBER = "CHINESE_NUMBER"

    # Units of time
    YEAR = "YEAR"
    MONTH = "MONTH"
    DAY = "DAY"
    WEEK = "WEEK"
    HOUR = "HOUR"
    MINUTE = "MINUTE"
    SECOND = "SECOND"

    # Separators between date parts: -, /, 年, 月, 日, 号
    DATE_SEPARATOR = "DATE_SEPARATOR"

    # Separators between time parts: :, 点, 时, 分, 秒
    TIME_SEPARATOR = "TIME_SEPARATOR"

    # Part-of-day markers
    PERIOD_AM = "PERIOD_AM"  # 上午, 早上, 早晨, etc.
    PERIOD_PM = "PERIOD_PM"  # 下午, 晚上, 中午, etc.

    # Relative day words and directions
    RELATIVE_TODAY = "RELATIVE_TODAY"  # 今天, 今晚, 今早, etc.
    RELATIVE_TOMORROW = "RELATIVE_TOMORROW"  # 明天
    RELATIVE_DAY_AFTER_TOMORROW = "RELATIVE_DAY_AFTER_TOMORROW"  # 后天
    RELATIVE_THREE_DAYS_AFTER_TOMORROW = "RELATIVE_THREE_DAYS_AFTER_TOMORROW"  # 大后天
    RELATIVE_YESTERDAY = "RELATIVE_YESTERDAY"  # 昨天
    RELATIVE_DAY_BEFORE_YESTERDAY = "RELATIVE_DAY_BEFORE_YESTERDAY"  # 前天
    RELATIVE_THREE_DAYS_BEFORE_YESTERDAY = "RELATIVE_THREE_DAYS_BEFORE_YESTERDAY"  # 大前天
    RELATIVE_DIRECTION_FORWARD = "RELATIVE_DIRECTION_FORWARD"  # 后, 以后, 之后
    RELATIVE_DIRECTION_BACKWARD = "RELATIVE_DIRECTION_BACKWARD"  # 前, 以前, 之前

    # Next / last / this qualifiers
    RELATIVE_NEXT = "RELATIVE_NEXT"  # 下
    RELATIVE_LAST = "RELATIVE_LAST"  # 上, 去
    RELATIVE_THIS = "RELATIVE_THIS"  # 这, 本

    # Days of the week
    WEEKDAY_MONDAY = "WEEKDAY_MONDAY"
    WEEKDAY_TUESDAY = "WEEKDAY_TUESDAY"
    WEEKDAY_WEDNESDAY = "WEEKDAY_WEDNESDAY"
    WEEKDAY_THURSDAY = "WEEKDAY_THURSDAY"
    WEEKDAY_FRIDAY = "WEEKDAY_FRIDAY"
    WEEKDAY_SATURDAY = "WEEKDAY_SATURDAY"
    WEEKDAY_SUNDAY = "WEEKDAY_SUNDAY"

    # Which week a weekday refers to
    WEEK_SCOPE_CURRENT = "WEEK_SCOPE_CURRENT"  # 本
    WEEK_SCOPE_LAST = "WEEK_SCOPE_LAST"  # 上
    WEEK_SCOPE_NEXT = "WEEK_SCOPE_NEXT"  # 下

    # Special clock markers
    HALF = "HALF"  # 半
    QUARTER = "QUARTER"  # 一刻
    ZHENG = "ZHENG"  # 整
    ZHONG = "ZHONG"  # 钟

    # Colloquial "morning N" / "evening N" forms
    EARLY_MORNING = "EARLY_MORNING"  # 早X
    LATE_NIGHT = "LATE_NIGHT"  # 晚X

    # Whitespace
    WHITESPACE = "WHITESPACE"

    # End of input
    EOF = "EOF"


@dataclass
class Token:
    """One token produced by the lexer: its kind, its value and its position."""

    type: TokenType
    value: Union[str, int]
    position: int

    def __str__(self):
        return f"Token({self.type.value}, {self.value!r}, {self.position})"

    def __repr__(self):
        return str(self)
class SemanticAnalyzer:
    """Evaluate time-expression AST nodes into concrete date/time values.

    Every evaluation is relative to ``self.now``, the reference instant
    captured when the analyzer is constructed.
    """

    def __init__(self, now: Optional[datetime.datetime] = None):
        """Anchor the analyzer at ``now`` (defaults to ``datetime.now()``)."""
        self.now = now or datetime.datetime.now()

    @staticmethod
    def _clamp_day(year: int, month: int, day: int) -> int:
        """Return ``day`` limited to the last valid day of ``year``/``month``."""
        return min(day, calendar.monthrange(year, month)[1])

    @staticmethod
    def _at_midnight(moment: datetime.datetime) -> datetime.datetime:
        """Return ``moment`` with its time-of-day reset to 00:00:00."""
        return moment.replace(hour=0, minute=0, second=0, microsecond=0)

    def evaluate_number(self, node: NumberNode) -> int:
        """Return the integer stored in a number node."""
        return node.value

    def evaluate_date(self, node: DateNode) -> datetime.date:
        """Build a date from a date node.

        A missing year defaults to the year of ``self.now``; a missing
        month or day defaults to 1.

        Raises:
            ValueError: if the parts do not form a valid calendar date.
        """
        year = self.now.year
        month = 1
        day = 1

        if node.year is not None:
            year = self.evaluate_number(node.year)
        if node.month is not None:
            month = self.evaluate_number(node.month)
        if node.day is not None:
            day = self.evaluate_number(node.day)

        return datetime.date(year, month, day)

    def evaluate_time(self, node: TimeNode) -> datetime.time:
        """Build a 24-hour ``datetime.time`` from a time node.

        Missing parts default to 0.  Unless the node is flagged as 24-hour,
        an "AM" period maps 12 to 0 and a "PM" period adds 12 to every hour
        of at most 12 except 12 itself.

        Raises:
            TokenUnhandledException: if hour, minute or second is out of range.
        """
        hour = 0
        minute = 0
        second = 0

        if node.hour is not None:
            hour = self.evaluate_number(node.hour)
        if node.minute is not None:
            minute = self.evaluate_number(node.minute)
        if node.second is not None:
            second = self.evaluate_number(node.second)

        if not node.is_24hour and node.period is not None:
            if node.period == "AM":
                if hour == 12:
                    hour = 0
            elif node.period == "PM":
                if hour != 12 and hour <= 12:
                    hour += 12

        if not (0 <= hour <= 23):
            raise TokenUnhandledException(f"Invalid hour: {hour}")
        if not (0 <= minute <= 59):
            raise TokenUnhandledException(f"Invalid minute: {minute}")
        if not (0 <= second <= 59):
            raise TokenUnhandledException(f"Invalid second: {second}")

        return datetime.time(hour, minute, second)

    def evaluate_relative_date(self, node: RelativeDateNode) -> datetime.timedelta:
        """Return the offset from ``self.now`` described by a relative date node.

        Two sentinel encodings are honoured:

        * ``months != 0`` together with ``1 <= weeks <= 31``: ``weeks`` is a
          target day-of-month inside the shifted month (e.g. "the 5th of
          next month"), clamped to the length of that month.
        * ``months < 0``: ``months + 100`` is an absolute month number; it is
          applied only when it lies in 1..12.

        Otherwise years, months, weeks and days are plain offsets.  Whenever
        the month changes, the day is clamped to the target month's length.
        """
        result = self.now

        # Sentinel: "weeks" carries a day-of-month for "<month offset> + day".
        if 0 < node.weeks <= 31 and node.months != 0:
            total_months = result.month + node.months - 1
            new_year = result.year + total_months // 12
            new_month = total_months % 12 + 1
            target_day = self._clamp_day(new_year, new_month, node.weeks)
            result = result.replace(year=new_year, month=new_month, day=target_day)
            return result - self.now

        if node.years != 0:
            new_year = result.year + node.years
            try:
                result = result.replace(year=new_year)
            except ValueError:
                # Feb 29 does not exist in the target year: fall back to Feb 28.
                result = result.replace(year=new_year, month=2, day=28)

        if node.months < 0:
            # Sentinel: absolute month encoded as (month - 100).
            absolute_month = node.months + 100
            if 1 <= absolute_month <= 12:
                # Clamp the day; replacing e.g. Jan 31 with month=2 would
                # otherwise raise ValueError.
                new_day = self._clamp_day(result.year, absolute_month, result.day)
                result = result.replace(month=absolute_month, day=new_day)
        elif node.months > 0:
            total_months = result.month + node.months - 1
            new_year = result.year + total_months // 12
            new_month = total_months % 12 + 1
            new_day = self._clamp_day(new_year, new_month, result.day)
            result = result.replace(year=new_year, month=new_month, day=new_day)

        if node.weeks != 0 or node.days != 0:
            result = result + datetime.timedelta(days=node.weeks * 7 + node.days)

        return result - self.now

    def evaluate_relative_time(self, node: RelativeTimeNode) -> datetime.timedelta:
        """Convert an hours/minutes/seconds offset into a ``timedelta``."""
        total_seconds = node.hours * 3600 + node.minutes * 60 + node.seconds
        return datetime.timedelta(seconds=total_seconds)

    def evaluate_weekday(self, node: WeekdayNode) -> datetime.timedelta:
        """Return the day offset from ``self.now`` to the requested weekday.

        Scope "last" targets the previous week and "next" the following
        week; any other scope targets the week that contains ``self.now``.
        """
        week_shift = {"last": -7, "next": 7}.get(node.scope, 0)
        delta = node.weekday - self.now.weekday() + week_shift
        return datetime.timedelta(days=delta)

    def infer_smart_time(self, hour: int, minute: int = 0, second: int = 0,
                         base_time: Optional[datetime.datetime] = None) -> datetime.datetime:
        """Guess which occurrence of an ambiguous clock time the user means.

        For example:
        - If now is 14:30 and the user says 3 o'clock, the result is 15:00.
        - If now is 14:30 and the user says 1 o'clock, the result is 01:00
          on the next day.
        - If now is 8:00 and the user says 3 o'clock, the result is 15:00.
        - If now is 8:00 and the user says 9 o'clock, the result is 09:00.

        Args:
            hour: Hour as written, 0-23.
            minute: Minute, 0-59.
            second: Second, 0-59.
            base_time: Reference instant; defaults to ``self.now``.

        Returns:
            The inferred datetime with microseconds cleared.

        Raises:
            ValueError: if the parts do not form a valid time of day.
        """
        now = base_time if base_time is not None else self.now
        one_day = datetime.timedelta(days=1)

        def at(clock_hour: int) -> datetime.datetime:
            return now.replace(hour=clock_hour, minute=minute, second=second, microsecond=0)

        if hour == 12:
            # From noon onwards, "12" is read as the coming midnight.
            if now.hour >= 12:
                return at(0) + one_day
            # Before noon: an upcoming 00:mm today wins, otherwise noon today.
            midnight = at(0)
            return midnight if midnight > now else at(12)

        if 1 <= hour <= 11:
            am_candidate = at(hour)
            pm_candidate = at(hour + 12)

            if now.hour >= 12:
                # The AM reading is already behind us today.  Use the PM
                # reading while it is still ahead; otherwise the next
                # occurrence is the AM reading tomorrow.  (Previously some
                # evening cases returned the already-past PM time.)
                if pm_candidate > now:
                    return pm_candidate
                return am_candidate + one_day

            # Morning: a very early hour (1-2) at least six hours behind the
            # clock is read as tomorrow's early morning.
            if hour <= 2 and now.hour - hour >= 6:
                return am_candidate + one_day
            # Otherwise prefer AM while it is still ahead, else PM today.
            return am_candidate if am_candidate > now else pm_candidate

        # 0 and 13-23 are unambiguous: today if still ahead, else tomorrow.
        candidate = at(hour)
        if candidate <= now:
            candidate += one_day
        return candidate

    def _apply_time(self, result: datetime.datetime,
                    node: TimeExpressionNode) -> datetime.datetime:
        """Return ``result`` with the clock time of ``node.time`` applied."""
        clock = self.evaluate_time(node.time)

        if node.time.is_24hour:
            return result.replace(hour=clock.hour, minute=clock.minute,
                                  second=clock.second, microsecond=0)

        if node.time.period is not None:
            # evaluate_time() has already folded the period into a 24-hour
            # value, so only the "12 o'clock at night" case needs care: an
            # exact 12:00:00 with a PM period means 00:00 of the next day.
            if node.time.period == "PM" and clock == datetime.time(12, 0, 0):
                return self._at_midnight(result) + datetime.timedelta(days=1)
            return result.replace(hour=clock.hour, minute=clock.minute,
                                  second=clock.second, microsecond=0)

        if node.date is not None or node.relative_date is not None:
            # With an explicit date the written hour is taken literally.
            return result.replace(hour=clock.hour, minute=clock.minute,
                                  second=clock.second, microsecond=0)

        return self.infer_smart_time(clock.hour, clock.minute, clock.second,
                                     base_time=result)

    def evaluate(self, node: TimeExpressionNode) -> datetime.datetime:
        """Evaluate a complete time expression to a ``datetime``.

        Components are applied to ``self.now`` in this order: relative date,
        weekday, relative time, absolute date, time of day.  A date-like
        component with no accompanying time resets the clock to 00:00:00
        (for relative dates only when the offset has no year or month part).
        """
        result = self.now

        if node.relative_date is not None:
            result = result + self.evaluate_relative_date(node.relative_date)
            if (node.date is None and node.time is None and node.weekday is None
                    and node.relative_date.years == 0
                    and node.relative_date.months == 0):
                result = self._at_midnight(result)

        if node.weekday is not None:
            result = result + self.evaluate_weekday(node.weekday)
            if node.date is None and node.time is None:
                result = self._at_midnight(result)

        if node.relative_time is not None:
            result = result + self.evaluate_relative_time(node.relative_time)

        if node.date is not None:
            date = self.evaluate_date(node.date)
            result = result.replace(year=date.year, month=date.month, day=date.day)
            if node.time is None:
                result = self._at_midnight(result)

        if node.time is not None:
            result = self._apply_time(result, node)

        return result
asynkio from math import ceil from pathlib import Path from typing import Any +import datetime import nanoid import nonebot @@ -13,7 +14,7 @@ from nonebot_plugin_alconna import Alconna, Args, Subcommand, UniMessage, UniMsg from pydantic import BaseModel from konabot.common.longtask import DepLongTaskTarget, LongTask, create_longtask, handle_long_task, longtask_data -from konabot.common.ptimeparse import Parser +from konabot.common.ptimeparse import parse evt = on_message() @@ -84,7 +85,7 @@ async def _(msg: UniMsg, mEvt: Event, target: DepLongTaskTarget): notify_time, notify_text = segments try: - target_time = Parser().parse(notify_time) + target_time = parse(notify_time) logger.info(f"从 {notify_time} 解析出了时间:{target_time}") except Exception: logger.info(f"无法从 {notify_time} 中解析出时间") diff --git a/scripts/watch_filter.py b/scripts/watch_filter.py index dff78eb..d02dd1c 100644 --- a/scripts/watch_filter.py +++ b/scripts/watch_filter.py @@ -12,4 +12,5 @@ def filter(change: Change, path: str) -> bool: return False if Path(path).absolute().is_relative_to((base / ".git").absolute()): return False + print(path) return True