Compare commits

...

43 Commits

Author SHA1 Message Date
40be5ce335 fx 完善
All checks were successful
continuous-integration/drone/push Build is passing
2025-12-02 14:45:07 +08:00
8e6131473d fximage 2025-12-02 12:17:11 +08:00
78bda5fc0a 添加云盾
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-28 16:59:58 +08:00
97658a6c56 补充文档
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-28 16:54:29 +08:00
3fedc685a9 没有人需要的提取首字母功能 2025-11-28 16:51:16 +08:00
d1a3e44c45 调整日志等级和内容
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-26 13:09:37 +08:00
f637778173 完成排行榜部分
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-26 13:02:26 +08:00
145bfedf67 Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-25 14:29:46 +08:00
61b9d733a5 添加阿里绿网云盾 API 2025-11-25 14:29:26 +08:00
ae59c20e2f 添加对 pyrightconfig 的 ignore,方便使用其他 IDE 的人使用 config 文件指定虚拟环境位置等
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-24 18:36:06 +08:00
0b7d21aeb0 新增一条示例,以便处理几百年的(
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-22 01:22:42 +08:00
d6ede3e6cd 标准化时间的解析
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-21 16:56:00 +08:00
07ace8e6e9 就是加个 Y 的事情(
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-21 16:19:19 +08:00
6f08c22b5b LLM 胜利了!!!!!!
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-21 16:13:38 +08:00
3e5c1941c8 重构 ptimeparse 模块
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-21 06:03:28 +08:00
f6e7dfcd93 空调最高峰,空调数据库挂载再优化
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-19 16:24:24 +08:00
1233677eea 将成语接龙还原为内存存储,空调优化为部分内存存储且具有过期期限,避免频繁数据库查询
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-19 11:04:13 +08:00
00bdb90e3c Merge pull request '为此方 Bot 接入数据库' (#49) from database into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #49
2025-11-19 00:52:01 +08:00
988965451b 坏坏 AI 怎么把 diff 文件交上去了 2025-11-19 00:47:24 +08:00
f6fadb7226 Qwen 说让我再改点东西所以改了,强化了数据库相关的事情 2025-11-19 00:44:44 +08:00
0d540eea4c 我拿 AI 改坏枪代码! 2025-11-18 23:55:31 +08:00
f21da657db database 接入 2025-11-18 19:36:05 +08:00
a8a7b62f76 修复 playwright 在不同源的版本不同导致的问题
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-18 02:22:35 +08:00
789500842c wocao1
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-15 20:31:38 +08:00
2f22f11d57 调整 Gif 图渲染策略
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-15 20:16:42 +08:00
eff25435e3 让 MAN 使用坏枪的 Markdown 处理器
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-11 01:47:10 +08:00
df28fad697 调整信息 2025-11-11 01:24:29 +08:00
561f6981aa 答题必须 At bot
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-11 01:20:24 +08:00
2632215af9 补充 MAN
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-11 01:06:11 +08:00
bfde559892 添加一个可供管理的订阅制模块,并且接入 KonaPH 2025-11-11 00:53:17 +08:00
857f8c5955 Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot 2025-11-10 22:12:33 +08:00
500053e630 更稳定的 MarkDown 和 LaTeX 生成!
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-10 21:59:45 +08:00
30cfb4cadd 添加 Justfile 相关库,简化项目启动流程 2025-11-10 21:23:41 +08:00
e2f99af73b 将浏览器依赖放在最最前面安装,以保证依赖更新时,尽可能不用重装浏览器
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-10 05:00:51 +08:00
e09de9eeb6 更改使用 uv 而非 poetry 管理 Docker 内部依赖
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-10 04:41:05 +08:00
4a3b49ce79 德摩根律(
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-09 23:47:26 +08:00
03900f4416 成语接龙接入 LLM 和 MarkDown、LaTeX 接入
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-09 23:12:04 +08:00
62f4195e46 Merge pull request '让豆包水印使用相对大小' (#47) from feature/doubao-watermark into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #47
2025-11-07 21:17:16 +08:00
751297e3bc Merge branch 'master' into feature/doubao-watermark 2025-11-07 21:17:09 +08:00
b450998f3f 让豆包水印使用相对大小 2025-11-07 21:15:19 +08:00
ae6297b98d Merge pull request '添加豆包水印' (#46) from feature/doubao-watermark into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #46
2025-11-07 19:18:41 +08:00
dacae29054 添加豆包水印 2025-11-07 19:18:24 +08:00
8acb546c6a Merge pull request '让浏览器等久一点' (#42) from feature/konaweb into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #42
2025-11-07 02:41:49 +08:00
103 changed files with 7484 additions and 1919 deletions

View File

@ -38,6 +38,14 @@ steps:
path: /var/run/docker.sock
commands:
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_plugin_load.py
- name: 在容器中测试 Playwright 工作正常
image: docker:dind
privileged: true
volumes:
- name: docker-socket
path: /var/run/docker.sock
commands:
- docker run --rm gitea.service.jazzwhom.top/mttu-developers/konabot:nightly-${DRONE_COMMIT_SHA} python scripts/test_playwright.py
- name: 发送构建结果到 ntfy
image: parrazam/drone-ntfy
when:

View File

@ -1,4 +1,4 @@
ENVIRONMENT=dev
PORT=21333
DATABASE_PATH="./data/database.db"
ENABLE_CONSOLE=true

9
.gitignore vendored
View File

@ -1,4 +1,11 @@
# 基本的数据文件,以及环境用文件
/.env
/data
/pyrightconfig.json
/pyrightconfig.toml
__pycache__
# 缓存文件
__pycache__
# 可能会偶然生成的 diff 文件
/*.diff

View File

@ -2,7 +2,7 @@ FROM python:3.13-slim AS base
ENV VIRTUAL_ENV=/app/.venv \
PATH="/app/.venv/bin:$PATH" \
PLAYWRIGHT_BROWSERS_PATH=0
PLAYWRIGHT_BROWSERS_PATH=/usr/lib/pw-browsers
# 安装所有都需要的底层依赖
RUN apt-get update && \
@ -19,7 +19,6 @@ RUN apt-get update && \
&& rm -rf /var/lib/apt/lists/*
FROM base AS builder
# 安装构建依赖
@ -27,17 +26,12 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential cmake git \
&& rm -rf /var/lib/apt/lists/*
ENV POETRY_NO_INTERACTION=1 \
POETRY_VIRTUALENVS_IN_PROJECT=1 \
POETRY_VIRTUALENVS_CREATE=1 \
POETRY_CACHE_DIR=/tmp/poetry_cache
RUN pip install --no-cache-dir uv
WORKDIR /app
RUN pip install --no-cache-dir poetry
COPY pyproject.toml poetry.lock ./
RUN python -m poetry install --no-root && rm -rf $POETRY_CACHE_DIR
RUN uv sync --no-install-project

187
QWEN.md Normal file
View File

@ -0,0 +1,187 @@
# Konabot Project Context
## Project Overview
Konabot is a multi-platform chatbot built using the NoneBot2 framework, primarily used within MTTU (likely an organization or community). The bot supports multiple adapters including Discord, QQ (via Onebot), Minecraft, and Console interfaces.
### Key Features
- Multi-platform support (Discord, QQ, Minecraft, Console)
- Rich plugin ecosystem with over 20 built-in plugins
- Asynchronous database system with connection pooling (SQLite-based)
- Advanced image processing capabilities
- Integration with external services like Bilibili analysis
- Support for Large Language Models (LLM)
- Web rendering capabilities for advanced image generation
## Technology Stack
- **Framework**: NoneBot2
- **Language**: Python 3.12+
- **Dependency Management**: Poetry
- **Database**: SQLite with aiosqlite for async operations
- **Build System**: Just (task runner)
- **Containerization**: Docker
- **CI/CD**: Drone CI
- **Testing**: Pytest
## Project Structure
```
konabot/
├── bot.py # Main entry point
├── pyproject.toml # Project dependencies and metadata
├── justfile # Task definitions
├── Dockerfile # Container build definition
├── .drone.yml # CI/CD pipeline configuration
├── konabot/ # Main source code
│ ├── common/ # Shared utilities and modules
│ │ ├── database/ # Async database manager with connection pooling
│ │ ├── llm/ # Large Language Model integration
│ │ ├── web_render/ # Web-based image rendering
│ │ └── ... # Other utilities
│ ├── plugins/ # Plugin modules (core functionality)
│ │ ├── air_conditioner/
│ │ ├── bilibili_fetch/
│ │ ├── gen_qrcode/
│ │ ├── hanzi/
│ │ ├── idiomgame/
│ │ ├── image_process/
│ │ ├── roll_dice/
│ │ ├── weather/
│ │ └── ... (20+ plugins)
│ └── test/
├── tests/ # Test suite
├── scripts/ # Utility scripts
├── docs/ # Documentation
├── assets/ # Static assets
└── data/ # Runtime data storage
```
## Development Environment Setup
### Prerequisites
- Python 3.12+
- Git
- Poetry (installed via pipx)
### Installation Steps
1. Clone the repository:
```bash
git clone https://gitea.service.jazzwhom.top/Passthem/konabot.git
cd konabot
```
2. Install dependencies:
```bash
poetry install
```
3. Configure environment:
- Copy `.env.example` to `.env`
- Modify settings as needed for your platform adapters
### Platform Adapters Configuration
- **Discord**: Set `ENABLE_DISCORD=true` and configure bot token
- **QQ (Onebot)**: Set `ENABLE_QQ=true` and configure connection
- **Console**: Enabled by default, disable with `ENABLE_CONSOLE=false`
- **Minecraft**: Set `ENABLE_MINECRAFT=true`
## Building and Running
### Development
- Auto-reload development mode:
```bash
poetry run just watch
```
- Manual start:
```bash
poetry run python bot.py
```
### Production
- Docker container build and run:
```bash
docker build -t konabot .
docker run konabot
```
## Testing
Run the test suite with:
```bash
poetry run pytest
```
Tests are located in the `tests/` directory and focus primarily on core functionality like the database manager.
## Database System
The project implements a custom asynchronous database manager (`konabot/common/database/__init__.py`) with these features:
- Connection pooling for performance
- Parameterized queries for security
- SQL file execution support
- Support for both string and Path objects for file paths
- Automatic resource management
Example usage:
```python
from konabot.common.database import DatabaseManager
db = DatabaseManager()
results = await db.query("SELECT * FROM users WHERE age > ?", (18,))
await db.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("John", "john@example.com"))
```
## Plugin Architecture
Plugins are organized in `konabot/plugins/` and follow the NoneBot2 plugin structure. Each plugin typically consists of:
- `__init__.py`: Main plugin logic using Alconna command parser
- Supporting modules for specific functionality
Popular plugins include:
- `roll_dice`: Dice rolling with image generation
- `weather`: Weather radar image fetching
- `bilibili_fetch`: Bilibili video analysis
- `image_process`: Image manipulation tools
- `markdown`: Markdown rendering
## CI/CD Pipeline
Drone CI is configured with two pipelines:
1. **Nightly builds**: Triggered on pushes to master branch
2. **Release builds**: Triggered on git tags
Both pipelines:
- Build Docker images
- Test plugin loading
- Verify Playwright functionality
- Send notifications via ntfy
## Development Conventions
- Use Poetry for dependency management
- Follow NoneBot2 plugin development patterns
- Write async code for database operations
- Use Alconna for command parsing
- Organize SQL queries in separate files when complex
- Write tests for core functionality
- Document features in the `docs/` directory
## Common Development Tasks
1. **Add a new plugin**:
- Create a new directory in `konabot/plugins/`
- Implement functionality in `__init__.py`
- Use Alconna for command definition
2. **Database operations**:
- Use the `DatabaseManager` class
- Always parameterize queries
- Store complex SQL in separate `.sql` files
3. **Image processing**:
- Leverage existing utilities in `image_process` plugin
- Use Pillow and Skia-Python for advanced graphics
4. **Testing**:
- Add tests to the `tests/` directory
- Use pytest with async support
- Mock external services when needed

View File

@ -71,12 +71,16 @@ code .
详见[konabot-web 配置文档](/docs/konabot-web.md)
#### 数据库配置
本项目使用SQLite作为数据库默认数据库文件位于`./data/database.db`。可以通过设置`DATABASE_PATH`环境变量来指定其他位置。
### 运行
使用命令行手动启动 Bot
```bash
poetry run watchfiles bot.main . --filter scripts.watch_filter.filter
poetry run just watch
```
如果你不希望自动重载,只是想运行 Bot可以直接运行
@ -91,3 +95,7 @@ poetry run python bot.py
- [事件响应器](https://nonebot.dev/docs/tutorial/matcher)
- [事件处理](https://nonebot.dev/docs/tutorial/handler)
- [Alconna 插件](https://nonebot.dev/docs/best-practice/alconna/)
## 数据库模块
本项目的数据库模块已更新为异步实现,使用连接池来提高性能,并支持现代的`pathlib.Path`参数类型。详细使用方法请参考[数据库使用文档](/docs/database.md)。

BIN
assets/img/meme/doubao.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.0 KiB

9
bot.py
View File

@ -10,6 +10,8 @@ from nonebot.adapters.onebot.v11 import Adapter as OnebotAdapter
from konabot.common.log import init_logger
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.path import LOG_PATH
from konabot.common.database import get_global_db_manager
dotenv.load_dotenv()
env = os.environ.get("ENVIRONMENT", "prod")
@ -48,6 +50,13 @@ def main():
nonebot.load_plugins("konabot/plugins")
nonebot.load_plugin("nonebot_plugin_analysis_bilibili")
# 注册关闭钩子
@driver.on_shutdown
async def shutdown_handler():
# 关闭全局数据库管理器
db_manager = get_global_db_manager()
await db_manager.close_all_connections()
nonebot.run()
if __name__ == "__main__":

223
docs/database.md Normal file
View File

@ -0,0 +1,223 @@
# 数据库系统使用文档
本文档详细介绍了本项目中使用的异步数据库系统,包括其架构设计、使用方法和最佳实践。
## 系统概述
本项目的数据库系统基于 `aiosqlite` 库构建,提供了异步的 SQLite 数据库访问接口。系统主要特性包括:
1. **异步操作**:完全支持异步/await模式适配NoneBot2框架
2. **连接池**:内置连接池机制,提高数据库访问性能
3. **参数化查询**支持安全的参数化查询防止SQL注入
4. **SQL文件支持**可以直接执行SQL文件中的脚本
5. **类型支持**:支持 `pathlib.Path``str` 类型的路径参数
## 核心类和方法
### DatabaseManager 类
`DatabaseManager` 是数据库操作的核心类,提供了以下主要方法:
#### 初始化
```python
from konabot.common.database import DatabaseManager
from pathlib import Path
# 使用默认数据库路径
db = DatabaseManager()
# 指定了义数据库路径
db = DatabaseManager("./data/myapp.db")
db = DatabaseManager(Path("./data/myapp.db"))
```
#### 查询操作
```python
# 执行查询语句并返回结果
results = await db.query("SELECT * FROM users WHERE age > ?", (18,))
# 从SQL文件执行查询
results = await db.query_by_sql_file("./sql/get_users.sql", (18,))
```
#### 执行操作
```python
# 执行非查询语句
await db.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("张三", "zhangsan@example.com"))
# 执行SQL脚本不带参数
await db.execute_script("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE
);
INSERT INTO users (name, email) VALUES ('测试用户', 'test@example.com');
""")
# 从SQL文件执行非查询语句
await db.execute_by_sql_file("./sql/create_tables.sql")
# 带参数执行SQL文件
await db.execute_by_sql_file("./sql/insert_user.sql", ("张三", "zhangsan@example.com"))
# 执行多条语句(每条语句使用相同参数)
await db.execute_many("INSERT INTO users (name, email) VALUES (?, ?)", [
("张三", "zhangsan@example.com"),
("李四", "lisi@example.com"),
("王五", "wangwu@example.com")
])
# 从SQL文件执行多条语句每条语句使用相同参数
await db.execute_many_values_by_sql_file("./sql/batch_insert.sql", [
("张三", "zhangsan@example.com"),
("李四", "lisi@example.com")
])
```
## SQL文件处理机制
### 单语句SQL文件
```sql
-- insert_user.sql
INSERT INTO users (name, email) VALUES (?, ?);
```
```python
# 使用方式
await db.execute_by_sql_file("./sql/insert_user.sql", ("张三", "zhangsan@example.com"))
```
### 多语句SQL文件
```sql
-- setup.sql
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE
);
CREATE TABLE IF NOT EXISTS profiles (
user_id INTEGER,
age INTEGER,
FOREIGN KEY (user_id) REFERENCES users(id)
);
```
```python
# 使用方式
await db.execute_by_sql_file("./sql/setup.sql")
```
### 多语句带不同参数的SQL文件
```sql
-- batch_operations.sql
INSERT INTO users (name, email) VALUES (?, ?);
INSERT INTO profiles (user_id, age) VALUES (?, ?);
```
```python
# 使用方式
await db.execute_by_sql_file("./sql/batch_operations.sql", [
("张三", "zhangsan@example.com"), # 第一条语句的参数
(1, 25) # 第二条语句的参数
])
```
## 最佳实践
### 1. 数据库表设计
```sql
-- 推荐的表设计实践
CREATE TABLE IF NOT EXISTS example_table (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
```
### 2. SQL文件组织
建议按照功能模块组织SQL文件
```
plugin/
├── sql/
│ ├── create_tables.sql
│ ├── insert_data.sql
│ ├── update_data.sql
│ └── query_data.sql
└── __init__.py
```
### 3. 错误处理
```python
try:
results = await db.query("SELECT * FROM users WHERE id = ?", (user_id,))
except Exception as e:
logger.error(f"数据库查询失败: {e}")
# 处理错误情况
```
### 4. 连接管理
```python
# 在应用启动时初始化
db_manager = DatabaseManager()
# 在应用关闭时清理连接
async def shutdown():
await db_manager.close_all_connections()
```
## 高级特性
### 连接池配置
```python
class DatabaseManager:
def __init__(self, db_path: Optional[Union[str, Path]] = None):
# 连接池大小配置
self._pool_size = 5 # 可根据需要调整
```
### 事务支持
```python
# 通过execute方法的自动提交机制支持事务
await db.execute("BEGIN TRANSACTION")
try:
await db.execute("INSERT INTO users (name) VALUES (?)", ("张三",))
await db.execute("INSERT INTO profiles (user_id, age) VALUES (?, ?)", (1, 25))
await db.execute("COMMIT")
except Exception:
await db.execute("ROLLBACK")
raise
```
## 注意事项
1. **异步环境**:所有数据库操作都必须在异步环境中执行
2. **参数安全**始终使用参数化查询避免SQL注入
3. **资源管理**:确保在应用关闭时调用 `close_all_connections()`
4. **SQL解析**:使用 `sqlparse` 库准确解析SQL语句正确处理包含分号的字符串和注释
5. **错误处理**:适当处理数据库操作可能抛出的异常
## 常见问题
### Q: 如何处理数据库约束错误?
A: 确保SQL语句中的字段名正确引用特别是保留字需要使用双引号包围
```sql
CREATE TABLE air_conditioner (
id VARCHAR(128) PRIMARY KEY,
"on" BOOLEAN NOT NULL, -- 使用双引号包围保留字
temperature REAL NOT NULL
);
```
### Q: 如何处理多个语句和参数的匹配?
A: 当SQL文件包含多个语句时参数应该是参数列表每个语句对应一个参数元组
```python
await db.execute_by_sql_file("./sql/batch.sql", [
("参数1", "参数2"), # 第一个语句的参数
("参数3", "参数4") # 第二个语句的参数
])
```
通过遵循这些指南和最佳实践,您可以充分利用本项目的异步数据库系统,构建高性能、安全的数据库应用。

4
justfile Normal file
View File

@ -0,0 +1,4 @@
watch:
poetry run watchfiles bot.main . --filter scripts.watch_filter.filter

View File

@ -0,0 +1,90 @@
import asyncio
import json
from alibabacloud_green20220302.client import Client as AlibabaGreenClient
from alibabacloud_green20220302.models import TextModerationPlusRequest
from alibabacloud_tea_openapi.models import Config as AlibabaTeaConfig
from loguru import logger
from pydantic import BaseModel
import nonebot
class AlibabaGreenPluginConfig(BaseModel):
module_aligreen_enable: bool = False
module_aligreen_access_key_id: str = ""
module_aligreen_access_key_secret: str = ""
module_aligreen_region_id: str = "cn-shenzhen"
module_aligreen_endpoint: str = "green-cip.cn-shenzhen.aliyuncs.com"
module_aligreen_service: str = "llm_query_moderation"
class AlibabaGreen:
_client: AlibabaGreenClient | None = None
_config: AlibabaGreenPluginConfig | None = None
@staticmethod
def get_client() -> AlibabaGreenClient:
assert AlibabaGreen._client is not None
return AlibabaGreen._client
@staticmethod
def get_config() -> AlibabaGreenPluginConfig:
assert AlibabaGreen._config is not None
return AlibabaGreen._config
@staticmethod
def init():
config = nonebot.get_plugin_config(AlibabaGreenPluginConfig)
AlibabaGreen._config = config
if not config.module_aligreen_enable:
logger.info("该环境未启用阿里内容审查,跳过初始化")
return
AlibabaGreen._client = AlibabaGreenClient(AlibabaTeaConfig(
access_key_id=config.module_aligreen_access_key_id,
access_key_secret=config.module_aligreen_access_key_secret,
connect_timeout=10000,
read_timeout=3000,
region_id=config.module_aligreen_region_id,
endpoint=config.module_aligreen_endpoint,
))
@staticmethod
def _detect_sync(content: str) -> bool:
if not AlibabaGreen.get_config().module_aligreen_enable:
logger.debug("该环境未启用阿里内容审查,直接跳过")
return True
client = AlibabaGreen.get_client()
try:
response = client.text_moderation_plus(TextModerationPlusRequest(
service=AlibabaGreen.get_config().module_aligreen_service,
service_parameters=json.dumps({
"content": content,
}),
))
if response.status_code == 200:
result = response.body
logger.info(f"检测违规内容 API 调用成功:{result}")
risk_level: str = result.data.risk_level or "none"
if risk_level == "high":
return False
return True
logger.error(f"检测违规内容 API 调用失败:{response}")
return True
except Exception as e:
logger.error("检测违规内容 API 调用失败")
logger.exception(e)
return True
@staticmethod
async def detect(content: str) -> bool:
return await asyncio.to_thread(AlibabaGreen._detect_sync, content)
driver = nonebot.get_driver()
@driver.on_startup
async def _():
AlibabaGreen.init()

View File

@ -0,0 +1,218 @@
import os
import asyncio
import sqlparse
from pathlib import Path
from typing import List, Dict, Any, Optional, Union, TYPE_CHECKING
import aiosqlite
if TYPE_CHECKING:
from . import DatabaseManager
# 全局数据库管理器实例
_global_db_manager: Optional['DatabaseManager'] = None
def get_global_db_manager() -> 'DatabaseManager':
"""获取全局数据库管理器实例"""
global _global_db_manager
if _global_db_manager is None:
from . import DatabaseManager
_global_db_manager = DatabaseManager()
return _global_db_manager
def close_global_db_manager() -> None:
"""关闭全局数据库管理器实例"""
global _global_db_manager
if _global_db_manager is not None:
# 注意这个函数应该在async环境中调用close_all_connections
_global_db_manager = None
class DatabaseManager:
"""异步数据库管理器"""
def __init__(self, db_path: Optional[Union[str, Path]] = None, pool_size: int = 5):
"""
初始化数据库管理器
Args:
db_path: 数据库文件路径支持str和Path类型
pool_size: 连接池大小
"""
if db_path is None:
self.db_path = os.environ.get("DATABASE_PATH", "./data/database.db")
else:
self.db_path = str(db_path) if isinstance(db_path, Path) else db_path
# 连接池
self._connection_pool = []
self._pool_size = pool_size
self._lock = asyncio.Lock()
self._in_use = set() # 跟踪正在使用的连接
async def _get_connection(self) -> aiosqlite.Connection:
"""从连接池获取连接"""
async with self._lock:
# 尝试从池中获取现有连接
while self._connection_pool:
conn = self._connection_pool.pop()
# 检查连接是否仍然有效
try:
await conn.execute("SELECT 1")
self._in_use.add(conn)
return conn
except:
# 连接已失效,关闭它
try:
await conn.close()
except:
pass
# 如果连接池为空,创建新连接
conn = await aiosqlite.connect(self.db_path)
await conn.execute("PRAGMA foreign_keys = ON")
self._in_use.add(conn)
return conn
async def _return_connection(self, conn: aiosqlite.Connection) -> None:
"""将连接返回到连接池"""
async with self._lock:
self._in_use.discard(conn)
if len(self._connection_pool) < self._pool_size:
self._connection_pool.append(conn)
else:
# 池已满,直接关闭连接
try:
await conn.close()
except:
pass
async def query(
self, query: str, params: Optional[tuple] = None
) -> List[Dict[str, Any]]:
"""执行查询语句并返回结果"""
conn = await self._get_connection()
try:
cursor = await conn.execute(query, params or ())
columns = [description[0] for description in cursor.description]
rows = await cursor.fetchall()
results = [dict(zip(columns, row)) for row in rows]
await cursor.close()
return results
except Exception as e:
# 记录错误但重新抛出,让调用者处理
raise Exception(f"数据库查询失败: {str(e)}") from e
finally:
await self._return_connection(conn)
async def query_by_sql_file(
self, file_path: Union[str, Path], params: Optional[tuple] = None
) -> List[Dict[str, Any]]:
"""从 SQL 文件中读取查询语句并执行"""
path = str(file_path) if isinstance(file_path, Path) else file_path
with open(path, "r", encoding="utf-8") as f:
query = f.read()
return await self.query(query, params)
async def execute(self, command: str, params: Optional[tuple] = None) -> None:
"""执行非查询语句"""
conn = await self._get_connection()
try:
await conn.execute(command, params or ())
await conn.commit()
except Exception as e:
# 记录错误但重新抛出,让调用者处理
raise Exception(f"数据库执行失败: {str(e)}") from e
finally:
await self._return_connection(conn)
async def execute_script(self, script: str) -> None:
"""执行SQL脚本"""
conn = await self._get_connection()
try:
await conn.executescript(script)
await conn.commit()
except Exception as e:
# 记录错误但重新抛出,让调用者处理
raise Exception(f"数据库脚本执行失败: {str(e)}") from e
finally:
await self._return_connection(conn)
def _parse_sql_statements(self, script: str) -> List[str]:
"""解析SQL脚本分割成独立的语句"""
# 使用sqlparse库更准确地分割SQL语句
parsed = sqlparse.split(script)
statements = []
for statement in parsed:
statement = statement.strip()
if statement:
statements.append(statement)
return statements
async def execute_by_sql_file(
self, file_path: Union[str, Path], params: Optional[Union[tuple, List[tuple]]] = None
) -> None:
"""从 SQL 文件中读取非查询语句并执行"""
path = str(file_path) if isinstance(file_path, Path) else file_path
with open(path, "r", encoding="utf-8") as f:
script = f.read()
# 如果有参数且是元组使用execute执行整个脚本
if params is not None and isinstance(params, tuple):
await self.execute(script, params)
# 如果有参数且是列表,分别执行每个语句
elif params is not None and isinstance(params, list):
# 使用sqlparse准确分割SQL语句
statements = self._parse_sql_statements(script)
if len(statements) != len(params):
raise ValueError(f"语句数量({len(statements)})与参数组数量({len(params)})不匹配")
for statement, stmt_params in zip(statements, params):
if statement:
await self.execute(statement, stmt_params)
# 如果无参数使用executescript
else:
await self.execute_script(script)
async def execute_many(self, command: str, seq_of_params: List[tuple]) -> None:
"""执行多条非查询语句"""
conn = await self._get_connection()
try:
await conn.executemany(command, seq_of_params)
await conn.commit()
except Exception as e:
# 记录错误但重新抛出,让调用者处理
raise Exception(f"数据库批量执行失败: {str(e)}") from e
finally:
await self._return_connection(conn)
async def execute_many_values_by_sql_file(
self, file_path: Union[str, Path], seq_of_params: List[tuple]
) -> None:
"""从 SQL 文件中读取一条语句,但是被不同值同时执行"""
path = str(file_path) if isinstance(file_path, Path) else file_path
with open(path, "r", encoding="utf-8") as f:
command = f.read()
await self.execute_many(command, seq_of_params)
async def close_all_connections(self) -> None:
"""关闭所有连接"""
async with self._lock:
# 关闭池中的连接
for conn in self._connection_pool:
try:
await conn.close()
except:
pass
self._connection_pool.clear()
# 关闭正在使用的连接
for conn in self._in_use.copy():
try:
await conn.close()
except:
pass
self._in_use.clear()

View File

@ -59,6 +59,9 @@ def get_llm(llm_model: str | None = None):
if llm_model is None:
llm_model = llm_config.default_llm
if llm_model not in llm_config.llms:
raise NotImplementedError("LLM 未配置,该功能无法使用")
if llm_config.default_llm in llm_config.llms:
logger.warning(f"[LLM] 需求的 LLM 不存在,回退到默认模型 REQUIRED={llm_model}")
return llm_config.llms[llm_config.default_llm]
raise NotImplementedError("[LLM] LLM 未配置,该功能无法使用")
return llm_config.llms[llm_model]

View File

@ -1,4 +1,5 @@
from io import BytesIO
from pathlib import Path
from typing import Annotated
import httpx
@ -19,15 +20,21 @@ from PIL import UnidentifiedImageError
from pydantic import BaseModel
from returns.result import Failure, Result, Success
from konabot.common.path import ASSETS_PATH
discordConfig = nonebot.get_plugin_config(DiscordConfig)
class ExtractImageConfig(BaseModel):
module_extract_image_no_download: bool = False
"要不要算了,不下载了,直接爆炸算了,适用于一些比较奇怪的网络环境,无法从协议端下载文件"
"""
要不要算了,不下载了,直接爆炸算了,
适用于一些比较奇怪的网络环境,无法从协议端下载文件
"""
module_extract_image_target: str = './assets/img/other/boom.jpg'
"""
使用哪个图片呢
"""
module_config = nonebot.get_plugin_config(ExtractImageConfig)
@ -37,7 +44,7 @@ async def download_image_bytes(url: str, proxy: str | None = None) -> Result[byt
# if "/matcha/cache/" in url:
# url = url.replace('127.0.0.1', '10.126.126.101')
if module_config.module_extract_image_no_download:
return Success((ASSETS_PATH / "img" / "other" / "boom.jpg").read_bytes())
return Success(Path(module_config.module_extract_image_target).read_bytes())
logger.debug(f"开始从 {url} 下载图片")
async with httpx.AsyncClient(proxy=proxy) as c:
try:
@ -70,15 +77,22 @@ def bytes_to_pil(raw_data: bytes | BytesIO) -> Result[PIL.Image.Image, str]:
return Failure("图像无法读取可能是网络存在问题orz")
async def unimsg_img_to_pil(image: Image) -> Result[PIL.Image.Image, str]:
async def unimsg_img_to_bytes(image: Image) -> Result[bytes, str]:
if image.url is not None:
raw_result = await download_image_bytes(image.url)
elif image.raw is not None:
raw_result = Success(image.raw)
if isinstance(image.raw, bytes):
raw_result = Success(image.raw)
else:
raw_result = Success(image.raw.getvalue())
else:
return Failure("由于一些内部问题下载图片失败了orz")
return raw_result.bind(bytes_to_pil)
return raw_result
async def unimsg_img_to_pil(image: Image) -> Result[PIL.Image.Image, str]:
return (await unimsg_img_to_bytes(image)).bind(bytes_to_pil)
async def extract_image_from_qq_message(
@ -86,7 +100,7 @@ async def extract_image_from_qq_message(
evt: OnebotV11MessageEvent,
bot: OnebotV11Bot,
allow_reply: bool = True,
) -> Result[PIL.Image.Image, str]:
) -> Result[bytes, str]:
if allow_reply and (reply := evt.reply) is not None:
return await extract_image_from_qq_message(
reply.message,
@ -118,18 +132,17 @@ async def extract_image_from_qq_message(
url = seg.data.get("url")
if url is None:
return Failure("无法下载图片,可能有一些网络问题")
data = await download_image_bytes(url)
return data.bind(bytes_to_pil)
return await download_image_bytes(url)
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
async def extract_image_from_message(
async def extract_image_data_from_message(
msg: Message,
evt: Event,
bot: Bot,
allow_reply: bool = True,
) -> Result[PIL.Image.Image, str]:
) -> Result[bytes, str]:
if (
isinstance(bot, OnebotV11Bot)
and isinstance(msg, OnebotV11Message)
@ -145,18 +158,18 @@ async def extract_image_from_message(
if "image/" not in a.content_type:
continue
url = a.proxy_url
return (await download_image_bytes(url, discordConfig.discord_proxy)).bind(bytes_to_pil)
return await download_image_bytes(url, discordConfig.discord_proxy)
for seg in UniMessage.of(msg, bot):
logger.info(seg)
if isinstance(seg, Image):
return await unimsg_img_to_pil(seg)
return await unimsg_img_to_bytes(seg)
elif isinstance(seg, Reply) and allow_reply:
msg2 = seg.msg
logger.debug(f"深入搜索引用的消息:{msg2}")
if msg2 is None or isinstance(msg2, str):
continue
return await extract_image_from_message(msg2, evt, bot, False)
return await extract_image_data_from_message(msg2, evt, bot, False)
elif isinstance(seg, RefNode) and allow_reply:
if isinstance(bot, DiscordBot):
return Failure("暂时不支持在 Discord 中通过引用的方式获取图片")
@ -165,12 +178,12 @@ async def extract_image_from_message(
return Failure("请在消息中包含图片,或者引用一个含有图片的消息")
async def _ext_img(
async def _ext_img_data(
evt: Event,
bot: Bot,
matcher: Matcher,
) -> PIL.Image.Image | None:
match await extract_image_from_message(evt.get_message(), evt, bot):
) -> bytes | None:
match await extract_image_data_from_message(evt.get_message(), evt, bot):
case Success(img):
return img
case Failure(err):
@ -180,4 +193,20 @@ async def _ext_img(
assert False
PIL_Image = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)]
async def _ext_img(
evt: Event,
bot: Bot,
matcher: Matcher,
) -> PIL.Image.Image | None:
r = await _ext_img_data(evt, bot, matcher)
if r:
match bytes_to_pil(r):
case Success(img):
return img
case Failure(msg):
await matcher.send(await UniMessage.text(msg).export())
return None
DepImageBytes = Annotated[bytes, nonebot.params.Depends(_ext_img_data)]
DepPILImage = Annotated[PIL.Image.Image, nonebot.params.Depends(_ext_img)]

76
konabot/common/pager.py Normal file
View File

@ -0,0 +1,76 @@
from dataclasses import dataclass
from math import ceil
from typing import Any, Callable
from nonebot_plugin_alconna import UniMessage
@dataclass
class PagerQuery:
page_index: int
page_size: int
def apply[T](self, ls: list[T]) -> "PagerResult[T]":
if self.page_size <= 0:
return PagerResult(
success=False,
message="每页元素数量应该大于 0",
data=[],
page_count=-1,
query=self,
)
page_count = ceil(len(ls) / self.page_size)
if self.page_index <= 0 or self.page_size <= 0:
return PagerResult(
success=False,
message="页数必须大于 0",
data=[],
page_count=page_count,
query=self,
)
data = ls[(self.page_index - 1) * self.page_size: self.page_index * self.page_size]
if len(data) > 0:
return PagerResult(
success=True,
message="",
data=data,
page_count=page_count,
query=self,
)
return PagerResult(
success=False,
message="指定的页数超过最大页数",
data=data,
page_count=page_count,
query=self,
)
@dataclass
class PagerResult[T]:
data: list[T]
success: bool
message: str
page_count: int
query: PagerQuery
def to_unimessage(
self,
formatter: Callable[[T], str | UniMessage[Any]] = str,
title: str = '查询结果',
list_indicator: str = '- ',
) -> UniMessage[Any]:
msg = UniMessage.text(f'===== {title} =====\n\n')
if not self.success:
msg = msg.text(f'⚠️ {self.message}\n')
else:
for obj in self.data:
msg = msg.text(list_indicator)
msg += formatter(obj)
msg += '\n'
msg = msg.text(f'\n===== 第 {self.query.page_index} 页,共 {self.page_count} 页 =====')
return msg

View File

@ -0,0 +1,3 @@
# 已废弃
坏枪用简单的 LLM + 提示词工程,完成了这 200 块的 `qwen3-coder-plus` 都搞不定的 nb 功能

View File

@ -0,0 +1,58 @@
"""
Professional time parsing module for Chinese and English time expressions.
This module provides a robust parser for natural language time expressions,
supporting both Chinese and English formats with proper whitespace handling.
"""
import datetime
from typing import Optional
from .expression import TimeExpression
def parse(text: str, now: Optional[datetime.datetime] = None) -> datetime.datetime:
"""
Parse a time expression and return a datetime object.
Args:
text: The time expression to parse
now: The reference time (defaults to current time)
Returns:
A datetime object representing the parsed time
Raises:
TokenUnhandledException: If the input cannot be parsed
"""
return TimeExpression.parse(text, now)
class Parser:
"""
Parser for time expressions with backward compatibility.
Maintains the original interface:
>>> parser = Parser()
>>> result = parser.parse("10分钟后")
"""
def __init__(self, now: Optional[datetime.datetime] = None):
self.now = now or datetime.datetime.now()
def parse(self, text: str) -> datetime.datetime:
"""
Parse a time expression and return a datetime object.
This maintains backward compatibility with the original interface.
Args:
text: The time expression to parse
Returns:
A datetime object representing the parsed time
Raises:
TokenUnhandledException: If the input cannot be parsed
"""
return TimeExpression.parse(text, self.now)

View File

@ -0,0 +1,133 @@
"""
Chinese number parser for the time expression parser.
"""
import re
from typing import Tuple
class ChineseNumberParser:
"""Parser for Chinese numbers."""
def __init__(self):
self.digits = {"": 0, "": 1, "": 2, "": 3, "": 4,
"": 5, "": 6, "": 7, "": 8, "": 9}
self.units = {"": 10, "": 100, "": 1000, "": 10000, "亿": 100000000}
def digest(self, text: str) -> Tuple[str, int]:
"""
Parse a Chinese number from the beginning of text and return the rest and the parsed number.
Args:
text: Text that may start with a Chinese number
Returns:
Tuple of (remaining_text, parsed_number)
"""
if not text:
return text, 0
# Handle "两" at start
if text.startswith(""):
# Check if "两" is followed by a time unit
# Look ahead to see if we have a valid pattern like "两小时", "两分钟", etc.
if len(text) >= 2:
# Check for time units that start with the second character
time_units = ["小时", "分钟", ""]
for unit in time_units:
if text[1:].startswith(unit):
# Return the text starting from the time unit, not after it
# The parser will handle the time unit in the next step
return text[1:], 2
# Check for single character time units
next_char = text[1]
if next_char in "时分秒":
return text[1:], 2
# Check for Chinese number units
if next_char in "十百千万亿":
# This will be handled by the normal parsing below
pass
# If "两" is at the end of string, treat it as standalone
elif len(text) == 1:
return "", 2
# Also accept "两" followed by whitespace and then time units
elif next_char.isspace():
# Check if after whitespace we have time units
rest_after_space = text[2:].lstrip()
for unit in time_units:
if rest_after_space.startswith(unit):
# Return the text starting from the time unit
space_len = len(text[2:]) - len(rest_after_space)
return text[2+space_len:], 2
# Check single character time units after whitespace
if rest_after_space and rest_after_space[0] in "时分秒":
return text[2:], 2
else:
# Just "两" by itself
return "", 2
s = "零一二三四五六七八九"
i = 0
while i < len(text) and text[i] in s + "十百千万亿":
i += 1
if i == 0:
return text, 0
num_str = text[:i]
rest = text[i:]
return rest, self.parse(num_str)
def parse(self, text: str) -> int:
"""
Parse a Chinese number string and return its integer value.
Args:
text: Chinese number string
Returns:
Integer value of the Chinese number
"""
if not text:
return 0
if text == "":
return 0
if text == "":
return 2
# Handle special case for "十"
if text == "":
return 10
# Handle numbers with "亿"
if "亿" in text:
parts = text.split("亿", 1)
a, b = parts[0], parts[1]
return self.parse(a) * 100000000 + self.parse(b)
# Handle numbers with "万"
if "" in text:
parts = text.split("", 1)
a, b = parts[0], parts[1]
return self.parse(a) * 10000 + self.parse(b)
# Handle remaining numbers
result = 0
temp = 0
for char in text:
if char == "":
continue
elif char == "":
temp = 2
elif char in self.digits:
temp = self.digits[char]
elif char in self.units:
unit = self.units[char]
if unit == 10 and temp == 0:
# Special case for numbers like "十三"
temp = 1
result += temp * unit
temp = 0
result += temp
return result

View File

@ -0,0 +1,11 @@
class PTimeParseException(Exception):
...
class TokenUnhandledException(PTimeParseException):
...
class MultipleSpecificationException(PTimeParseException):
...
class OutOfRangeSpecificationException(PTimeParseException):
...

View File

@ -0,0 +1,63 @@
"""
Main time expression parser class that integrates all components.
"""
import datetime
from typing import Optional
from .lexer import Lexer
from .parser import Parser
from .semantic import SemanticAnalyzer
from .ptime_ast import TimeExpressionNode
from .err import TokenUnhandledException
class TimeExpression:
"""Main class for parsing time expressions."""
def __init__(self, text: str, now: Optional[datetime.datetime] = None):
self.text = text.strip()
self.now = now or datetime.datetime.now()
if not self.text:
raise TokenUnhandledException("Empty input")
# Initialize components
self.lexer = Lexer(self.text, self.now)
self.parser = Parser(self.text, self.now)
self.semantic_analyzer = SemanticAnalyzer(self.now)
# Parse the expression
self.ast = self._parse()
def _parse(self) -> TimeExpressionNode:
"""Parse the time expression and return the AST."""
try:
return self.parser.parse()
except Exception as e:
raise TokenUnhandledException(f"Failed to parse '{self.text}': {str(e)}")
def evaluate(self) -> datetime.datetime:
"""Evaluate the time expression and return the datetime."""
try:
return self.semantic_analyzer.evaluate(self.ast)
except Exception as e:
raise TokenUnhandledException(f"Failed to evaluate '{self.text}': {str(e)}")
@classmethod
def parse(cls, text: str, now: Optional[datetime.datetime] = None) -> datetime.datetime:
"""
Parse a time expression and return a datetime object.
Args:
text: The time expression to parse
now: The reference time (defaults to current time)
Returns:
A datetime object representing the parsed time
Raises:
TokenUnhandledException: If the input cannot be parsed
"""
expression = cls(text, now)
return expression.evaluate()

View File

@ -0,0 +1,225 @@
"""
Lexical analyzer for time expressions.
"""
import re
from typing import Iterator, Optional
import datetime
from .ptime_token import Token, TokenType
from .chinese_number import ChineseNumberParser
class Lexer:
"""Lexical analyzer for time expressions."""
def __init__(self, text: str, now: Optional[datetime.datetime] = None):
self.text = text
self.pos = 0
self.current_char = self.text[self.pos] if self.text else None
self.now = now or datetime.datetime.now()
self.chinese_parser = ChineseNumberParser()
# Define token patterns
self.token_patterns = [
# Whitespace
(r'^\s+', TokenType.WHITESPACE),
# Time separators
(r'^:', TokenType.TIME_SEPARATOR),
(r'^点', TokenType.TIME_SEPARATOR),
(r'^时', TokenType.TIME_SEPARATOR),
(r'^分', TokenType.TIME_SEPARATOR),
(r'^秒', TokenType.TIME_SEPARATOR),
# Special time markers
(r'^半', TokenType.HALF),
(r'^一刻', TokenType.QUARTER),
(r'^整', TokenType.ZHENG),
(r'^钟', TokenType.ZHONG),
# Period indicators (must come before relative time patterns to avoid conflicts)
(r'^(上午|早晨|早上|清晨|早(?!\d))', TokenType.PERIOD_AM),
(r'^(中午|下午|晚上|晚(?!\d)|凌晨|午夜)', TokenType.PERIOD_PM),
# Week scope (more specific patterns first)
(r'^本周', TokenType.WEEK_SCOPE_CURRENT),
(r'^上周', TokenType.WEEK_SCOPE_LAST),
(r'^下周', TokenType.WEEK_SCOPE_NEXT),
# Relative directions
(r'^(后|以后|之后)', TokenType.RELATIVE_DIRECTION_FORWARD),
(r'^(前|以前|之前)', TokenType.RELATIVE_DIRECTION_BACKWARD),
# Extended relative time
(r'^明年', TokenType.RELATIVE_NEXT),
(r'^去年', TokenType.RELATIVE_LAST),
(r'^今年', TokenType.RELATIVE_THIS),
(r'^下(?![午年月周])', TokenType.RELATIVE_NEXT),
(r'^(上|去)(?![午年月周])', TokenType.RELATIVE_LAST),
(r'^这', TokenType.RELATIVE_THIS),
(r'^本(?![周月年])', TokenType.RELATIVE_THIS), # Match "本" but not "本周", "本月", "本年"
# Week scope (fallback for standalone terms)
(r'^本', TokenType.WEEK_SCOPE_CURRENT),
(r'^上', TokenType.WEEK_SCOPE_LAST),
(r'^下(?![午年月周])', TokenType.WEEK_SCOPE_NEXT),
# Week days (order matters - longer patterns first)
(r'^周一', TokenType.WEEKDAY_MONDAY),
(r'^周二', TokenType.WEEKDAY_TUESDAY),
(r'^周三', TokenType.WEEKDAY_WEDNESDAY),
(r'^周四', TokenType.WEEKDAY_THURSDAY),
(r'^周五', TokenType.WEEKDAY_FRIDAY),
(r'^周六', TokenType.WEEKDAY_SATURDAY),
(r'^周日', TokenType.WEEKDAY_SUNDAY),
# Single character weekdays should be matched after numbers
# (r'^一', TokenType.WEEKDAY_MONDAY),
# (r'^二', TokenType.WEEKDAY_TUESDAY),
# (r'^三', TokenType.WEEKDAY_WEDNESDAY),
# (r'^四', TokenType.WEEKDAY_THURSDAY),
# (r'^五', TokenType.WEEKDAY_FRIDAY),
# (r'^六', TokenType.WEEKDAY_SATURDAY),
# (r'^日', TokenType.WEEKDAY_SUNDAY),
# Student-friendly time expressions
(r'^早(?=\d)', TokenType.EARLY_MORNING),
(r'^晚(?=\d)', TokenType.LATE_NIGHT),
# Relative today variants
(r'^今晚上', TokenType.RELATIVE_TODAY),
(r'^今晚', TokenType.RELATIVE_TODAY),
(r'^今早', TokenType.RELATIVE_TODAY),
(r'^今天早上', TokenType.RELATIVE_TODAY),
(r'^今天早晨', TokenType.RELATIVE_TODAY),
(r'^今天上午', TokenType.RELATIVE_TODAY),
(r'^今天下午', TokenType.RELATIVE_TODAY),
(r'^今天晚上', TokenType.RELATIVE_TODAY),
(r'^今天', TokenType.RELATIVE_TODAY),
# Relative days
(r'^明天', TokenType.RELATIVE_TOMORROW),
(r'^后天', TokenType.RELATIVE_DAY_AFTER_TOMORROW),
(r'^大后天', TokenType.RELATIVE_THREE_DAYS_AFTER_TOMORROW),
(r'^昨天', TokenType.RELATIVE_YESTERDAY),
(r'^前天', TokenType.RELATIVE_DAY_BEFORE_YESTERDAY),
(r'^大前天', TokenType.RELATIVE_THREE_DAYS_BEFORE_YESTERDAY),
# Digits
(r'^\d+', TokenType.INTEGER),
# Time units (must come after date separators to avoid conflicts)
(r'^年(?![月日号])', TokenType.YEAR),
(r'^月(?![日号])', TokenType.MONTH),
(r'^[日号](?![月年])', TokenType.DAY),
(r'^天', TokenType.DAY),
(r'^周', TokenType.WEEK),
(r'^小时', TokenType.HOUR),
(r'^分钟', TokenType.MINUTE),
(r'^秒', TokenType.SECOND),
# Date separators (fallback patterns)
(r'^年', TokenType.DATE_SEPARATOR),
(r'^月', TokenType.DATE_SEPARATOR),
(r'^[日号]', TokenType.DATE_SEPARATOR),
(r'^[-/]', TokenType.DATE_SEPARATOR),
]
def advance(self):
"""Advance the position pointer and set the current character."""
self.pos += 1
if self.pos >= len(self.text):
self.current_char = None
else:
self.current_char = self.text[self.pos]
def skip_whitespace(self):
"""Skip whitespace characters."""
while self.current_char is not None and self.current_char.isspace():
self.advance()
def integer(self) -> int:
"""Parse an integer from the input."""
result = ''
while self.current_char is not None and self.current_char.isdigit():
result += self.current_char
self.advance()
return int(result)
def chinese_number(self) -> int:
"""Parse a Chinese number from the input."""
# Find the longest prefix that can be parsed as a Chinese number
for i in range(len(self.text) - self.pos, 0, -1):
prefix = self.text[self.pos:self.pos + i]
try:
# Use digest to get both the remaining text and the parsed value
remaining, value = self.chinese_parser.digest(prefix)
# Check if we actually consumed part of the prefix
consumed_length = len(prefix) - len(remaining)
if consumed_length > 0:
# Advance position by the length of the consumed text
for _ in range(consumed_length):
self.advance()
return value
except ValueError:
continue
# If no Chinese number found, just return 0
return 0
def get_next_token(self) -> Token:
"""Lexical analyzer that breaks the sentence into tokens."""
while self.current_char is not None:
# Skip whitespace
if self.current_char.isspace():
self.skip_whitespace()
continue
# Try to match each pattern
text_remaining = self.text[self.pos:]
for pattern, token_type in self.token_patterns:
match = re.match(pattern, text_remaining)
if match:
value = match.group(0)
position = self.pos
# Advance position
for _ in range(len(value)):
self.advance()
# Special handling for some tokens
if token_type == TokenType.INTEGER:
value = int(value)
elif token_type == TokenType.RELATIVE_TODAY and value in [
"今早上", "今天早上", "今天早晨", "今天上午"
]:
token_type = TokenType.PERIOD_AM
elif token_type == TokenType.RELATIVE_TODAY and value in [
"今晚上", "今天下午", "今天晚上"
]:
token_type = TokenType.PERIOD_PM
return Token(token_type, value, position)
# Try to parse Chinese numbers
chinese_start_pos = self.pos
try:
chinese_value = self.chinese_number()
if chinese_value > 0:
# We successfully parsed a Chinese number
return Token(TokenType.CHINESE_NUMBER, chinese_value, chinese_start_pos)
except ValueError:
pass
# If no pattern matches, skip the character and continue
self.advance()
# End of file
return Token(TokenType.EOF, None, self.pos)
def tokenize(self) -> Iterator[Token]:
"""Generate all tokens from the input."""
while True:
token = self.get_next_token()
yield token
if token.type == TokenType.EOF:
break

View File

@ -0,0 +1,846 @@
"""
Parser for time expressions that builds an Abstract Syntax Tree (AST).
"""
from typing import Iterator, Optional, List
import datetime
from .ptime_token import Token, TokenType
from .ptime_ast import (
ASTNode, NumberNode, DateNode, TimeNode,
RelativeDateNode, RelativeTimeNode, WeekdayNode, TimeExpressionNode
)
from .lexer import Lexer
class ParserError(Exception):
"""Exception raised for parser errors."""
pass
class Parser:
"""Parser for time expressions that builds an AST."""
def __init__(self, text: str, now: Optional[datetime.datetime] = None):
self.lexer = Lexer(text, now)
self.tokens: List[Token] = list(self.lexer.tokenize())
self.pos = 0
self.now = now or datetime.datetime.now()
@property
def current_token(self) -> Token:
"""Get the current token."""
if self.pos < len(self.tokens):
return self.tokens[self.pos]
return Token(TokenType.EOF, None, len(self.tokens))
def eat(self, token_type: TokenType) -> Token:
"""Consume a token of the expected type."""
if self.current_token.type == token_type:
token = self.current_token
self.pos += 1
return token
else:
raise ParserError(
f"Expected token {token_type}, got {self.current_token.type} "
f"at position {self.current_token.position}"
)
def peek(self, offset: int = 1) -> Token:
"""Look ahead at the next token without consuming it."""
next_pos = self.pos + offset
if next_pos < len(self.tokens):
return self.tokens[next_pos]
return Token(TokenType.EOF, None, len(self.tokens))
def parse_number(self) -> NumberNode:
"""Parse a number (integer or Chinese number)."""
token = self.current_token
if token.type == TokenType.INTEGER:
self.eat(TokenType.INTEGER)
return NumberNode(value=token.value)
elif token.type == TokenType.CHINESE_NUMBER:
self.eat(TokenType.CHINESE_NUMBER)
return NumberNode(value=token.value)
else:
raise ParserError(
f"Expected number, got {token.type} at position {token.position}"
)
def parse_date(self) -> DateNode:
"""Parse a date specification."""
year_node = None
month_node = None
day_node = None
# Try YYYY-MM-DD or YYYY/MM/DD format
if (self.current_token.type == TokenType.INTEGER and
self.peek().type == TokenType.DATE_SEPARATOR and
self.peek().value in ['-', '/'] and
self.peek(2).type == TokenType.INTEGER and
self.peek(3).type == TokenType.DATE_SEPARATOR and
self.peek(3).value in ['-', '/'] and
self.peek(4).type == TokenType.INTEGER):
year_token = self.current_token
self.eat(TokenType.INTEGER)
separator1 = self.eat(TokenType.DATE_SEPARATOR).value
month_token = self.current_token
self.eat(TokenType.INTEGER)
separator2 = self.eat(TokenType.DATE_SEPARATOR).value
day_token = self.current_token
self.eat(TokenType.INTEGER)
year_node = NumberNode(value=year_token.value)
month_node = NumberNode(value=month_token.value)
day_node = NumberNode(value=day_token.value)
return DateNode(year=year_node, month=month_node, day=day_node)
# Try YYYY年MM月DD[日号] format
if (self.current_token.type == TokenType.INTEGER and
self.peek().type in [TokenType.DATE_SEPARATOR, TokenType.YEAR] and
self.peek(2).type == TokenType.INTEGER and
self.peek(3).type in [TokenType.DATE_SEPARATOR, TokenType.MONTH] and
self.peek(4).type == TokenType.INTEGER):
year_token = self.current_token
self.eat(TokenType.INTEGER)
self.eat(self.current_token.type) # 年 (could be DATE_SEPARATOR or YEAR)
month_token = self.current_token
self.eat(TokenType.INTEGER)
self.eat(self.current_token.type) # 月 (could be DATE_SEPARATOR or MONTH)
day_token = self.current_token
self.eat(TokenType.INTEGER)
# Optional 日 or 号
if self.current_token.type in [TokenType.DATE_SEPARATOR, TokenType.DAY]:
self.eat(self.current_token.type)
year_node = NumberNode(value=year_token.value)
month_node = NumberNode(value=month_token.value)
day_node = NumberNode(value=day_token.value)
return DateNode(year=year_node, month=month_node, day=day_node)
# Try MM月DD[日号] format (without year)
if (self.current_token.type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER] and
self.peek().type in [TokenType.DATE_SEPARATOR, TokenType.MONTH] and
self.peek().value == '' and
self.peek(2).type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER]):
month_token = self.current_token
self.eat(month_token.type)
self.eat(self.current_token.type) # 月 (could be DATE_SEPARATOR or MONTH)
day_token = self.current_token
self.eat(day_token.type)
# Optional 日 or 号
if self.current_token.type in [TokenType.DATE_SEPARATOR, TokenType.DAY]:
self.eat(self.current_token.type)
month_node = NumberNode(value=month_token.value)
day_node = NumberNode(value=day_token.value)
return DateNode(year=None, month=month_node, day=day_node)
# Try Chinese MM月DD[日号] format
if (self.current_token.type == TokenType.CHINESE_NUMBER and
self.peek().type == TokenType.DATE_SEPARATOR and
self.peek().value == '' and
self.peek(2).type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER]):
month_token = self.current_token
self.eat(TokenType.CHINESE_NUMBER)
self.eat(TokenType.DATE_SEPARATOR) # 月
day_token = self.current_token
self.eat(day_token.type)
# Optional 日 or 号
if self.current_token.type == TokenType.DATE_SEPARATOR:
self.eat(TokenType.DATE_SEPARATOR)
month_node = NumberNode(value=month_token.value)
day_node = NumberNode(value=day_token.value)
return DateNode(year=None, month=month_node, day=day_node)
raise ParserError(
f"Unable to parse date at position {self.current_token.position}"
)
def parse_time(self) -> TimeNode:
"""Parse a time specification."""
hour_node = None
minute_node = None
second_node = None
is_24hour = False
period = None
# Try HH:MM format
if (self.current_token.type == TokenType.INTEGER and
self.peek().type == TokenType.TIME_SEPARATOR and
self.peek().value == ':'):
hour_token = self.current_token
self.eat(TokenType.INTEGER)
self.eat(TokenType.TIME_SEPARATOR) # :
minute_token = self.current_token
self.eat(TokenType.INTEGER)
hour_node = NumberNode(value=hour_token.value)
minute_node = NumberNode(value=minute_token.value)
is_24hour = True # HH:MM is always interpreted as 24-hour
# Optional :SS
if (self.current_token.type == TokenType.TIME_SEPARATOR and
self.peek().type == TokenType.INTEGER):
self.eat(TokenType.TIME_SEPARATOR) # :
second_token = self.current_token
self.eat(TokenType.INTEGER)
second_node = NumberNode(value=second_token.value)
return TimeNode(
hour=hour_node,
minute=minute_node,
second=second_node,
is_24hour=is_24hour,
period=period
)
# Try Chinese time format (X点X分)
# First check for period indicators
period = None
if self.current_token.type in [TokenType.PERIOD_AM, TokenType.PERIOD_PM]:
if self.current_token.type == TokenType.PERIOD_AM:
period = "AM"
else:
period = "PM"
self.eat(self.current_token.type)
if self.current_token.type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER, TokenType.EARLY_MORNING, TokenType.LATE_NIGHT]:
if self.current_token.type == TokenType.EARLY_MORNING:
self.eat(TokenType.EARLY_MORNING)
is_24hour = True
period = "AM"
# Expect a number next
if self.current_token.type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER]:
hour_token = self.current_token
self.eat(hour_token.type)
hour_node = NumberNode(value=hour_token.value)
# "早八" should be interpreted as 08:00
# If hour is greater than 12, treat as 24-hour
if hour_node.value > 12:
is_24hour = True
period = None
else:
raise ParserError(
f"Expected number after '', got {self.current_token.type} "
f"at position {self.current_token.position}"
)
elif self.current_token.type == TokenType.LATE_NIGHT:
self.eat(TokenType.LATE_NIGHT)
is_24hour = True
period = "PM"
# Expect a number next
if self.current_token.type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER]:
hour_token = self.current_token
self.eat(hour_token.type)
hour_node = NumberNode(value=hour_token.value)
# "晚十" should be interpreted as 22:00
# Adjust hour to 24-hour format
if hour_node.value <= 12:
hour_node.value += 12
is_24hour = True
period = None
else:
raise ParserError(
f"Expected number after '', got {self.current_token.type} "
f"at position {self.current_token.position}"
)
else:
# Regular time parsing
hour_token = self.current_token
self.eat(hour_token.type)
# Check for 点 or 时
if self.current_token.type == TokenType.TIME_SEPARATOR:
separator = self.current_token.value
self.eat(TokenType.TIME_SEPARATOR)
if separator == '':
is_24hour = False
elif separator == '':
is_24hour = True
hour_node = NumberNode(value=hour_token.value)
# Optional minutes
if self.current_token.type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER]:
minute_token = self.current_token
self.eat(minute_token.type)
# Optional 分
if self.current_token.type == TokenType.TIME_SEPARATOR and \
self.current_token.value == '':
self.eat(TokenType.TIME_SEPARATOR)
minute_node = NumberNode(value=minute_token.value)
# Handle special markers
if self.current_token.type == TokenType.HALF:
self.eat(TokenType.HALF)
minute_node = NumberNode(value=30)
elif self.current_token.type == TokenType.QUARTER:
self.eat(TokenType.QUARTER)
minute_node = NumberNode(value=15)
elif self.current_token.type == TokenType.ZHENG:
self.eat(TokenType.ZHENG)
if minute_node is None:
minute_node = NumberNode(value=0)
# Optional 钟
if self.current_token.type == TokenType.ZHONG:
self.eat(TokenType.ZHONG)
else:
# If no separator, treat as hour-only time (like "三点")
hour_node = NumberNode(value=hour_token.value)
is_24hour = False
return TimeNode(
hour=hour_node,
minute=minute_node,
second=second_node,
is_24hour=is_24hour,
period=period
)
raise ParserError(
f"Unable to parse time at position {self.current_token.position}"
)
def parse_relative_date(self) -> RelativeDateNode:
"""Parse a relative date specification."""
years = 0
months = 0
weeks = 0
days = 0
# Handle today variants
if self.current_token.type == TokenType.RELATIVE_TODAY:
self.eat(TokenType.RELATIVE_TODAY)
days = 0
elif self.current_token.type == TokenType.RELATIVE_TOMORROW:
self.eat(TokenType.RELATIVE_TOMORROW)
days = 1
elif self.current_token.type == TokenType.RELATIVE_DAY_AFTER_TOMORROW:
self.eat(TokenType.RELATIVE_DAY_AFTER_TOMORROW)
days = 2
elif self.current_token.type == TokenType.RELATIVE_THREE_DAYS_AFTER_TOMORROW:
self.eat(TokenType.RELATIVE_THREE_DAYS_AFTER_TOMORROW)
days = 3
elif self.current_token.type == TokenType.RELATIVE_YESTERDAY:
self.eat(TokenType.RELATIVE_YESTERDAY)
days = -1
elif self.current_token.type == TokenType.RELATIVE_DAY_BEFORE_YESTERDAY:
self.eat(TokenType.RELATIVE_DAY_BEFORE_YESTERDAY)
days = -2
elif self.current_token.type == TokenType.RELATIVE_THREE_DAYS_BEFORE_YESTERDAY:
self.eat(TokenType.RELATIVE_THREE_DAYS_BEFORE_YESTERDAY)
days = -3
else:
# Check if this looks like an absolute date pattern before processing
# Look ahead to see if this matches absolute date patterns
is_likely_absolute_date = False
# Check for MM月DD[日号] patterns (like "6月20日")
if (self.pos + 2 < len(self.tokens) and
self.tokens[self.pos].type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER] and
self.tokens[self.pos + 1].type in [TokenType.DATE_SEPARATOR, TokenType.MONTH] and
self.tokens[self.pos + 1].value == '' and
self.tokens[self.pos + 2].type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER]):
is_likely_absolute_date = True
if is_likely_absolute_date:
# This looks like an absolute date, skip relative date parsing
raise ParserError("Looks like absolute date format")
# Try to parse extended relative time expressions
# Handle patterns like "明年", "去年", "下个月", "上个月", etc.
original_pos = self.pos
try:
# Check for "今年", "明年", "去年"
if self.current_token.type == TokenType.RELATIVE_THIS and self.peek().type == TokenType.YEAR:
self.eat(TokenType.RELATIVE_THIS)
self.eat(TokenType.YEAR)
years = 0 # Current year
elif self.current_token.type == TokenType.RELATIVE_NEXT and self.peek().type == TokenType.YEAR:
self.eat(TokenType.RELATIVE_NEXT)
self.eat(TokenType.YEAR)
years = 1 # Next year
elif self.current_token.type == TokenType.RELATIVE_LAST and self.peek().type == TokenType.YEAR:
self.eat(TokenType.RELATIVE_LAST)
self.eat(TokenType.YEAR)
years = -1 # Last year
elif self.current_token.type == TokenType.RELATIVE_NEXT and self.current_token.value == "明年":
self.eat(TokenType.RELATIVE_NEXT)
years = 1 # Next year
# Check if there's a month after "明年"
if (self.current_token.type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER] and
self.peek().type == TokenType.MONTH):
# Parse the month
month_node = self.parse_number()
self.eat(TokenType.MONTH) # Eat the "月" token
# Store the month in the months field as a special marker
# We'll handle this in semantic analysis
months = month_node.value - 100 # Use negative offset to indicate absolute month
elif self.current_token.type == TokenType.RELATIVE_LAST and self.current_token.value == "去年":
self.eat(TokenType.RELATIVE_LAST)
years = -1 # Last year
elif self.current_token.type == TokenType.RELATIVE_THIS and self.current_token.value == "今年":
self.eat(TokenType.RELATIVE_THIS)
years = 0 # Current year
# Check for "这个月", "下个月", "上个月"
elif self.current_token.type == TokenType.RELATIVE_THIS and self.peek().type == TokenType.MONTH:
self.eat(TokenType.RELATIVE_THIS)
self.eat(TokenType.MONTH)
months = 0 # Current month
elif self.current_token.type == TokenType.RELATIVE_NEXT and self.peek().type == TokenType.MONTH:
self.eat(TokenType.RELATIVE_NEXT)
self.eat(TokenType.MONTH)
months = 1 # Next month
# Handle patterns like "下个月五号"
if (self.current_token.type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER] and
self.peek().type == TokenType.DAY):
# Parse the day
day_node = self.parse_number()
self.eat(TokenType.DAY) # Eat the "号" token
# Instead of adding days to the current date, we should set a specific day in the target month
# We'll handle this in semantic analysis by setting a flag or special value
days = 0 # Reset days - we'll handle the day differently
# Use a special marker to indicate we want a specific day in the target month
# For now, we'll just store the target day in the weeks field as a temporary solution
weeks = day_node.value # This is a hack - we'll fix this in semantic analysis
elif self.current_token.type == TokenType.RELATIVE_LAST and self.peek().type == TokenType.MONTH:
self.eat(TokenType.RELATIVE_LAST)
self.eat(TokenType.MONTH)
months = -1 # Last month
# Check for "下周", "上周"
elif self.current_token.type == TokenType.RELATIVE_NEXT and self.peek().type == TokenType.WEEK:
self.eat(TokenType.RELATIVE_NEXT)
self.eat(TokenType.WEEK)
weeks = 1 # Next week
elif self.current_token.type == TokenType.RELATIVE_LAST and self.peek().type == TokenType.WEEK:
self.eat(TokenType.RELATIVE_LAST)
self.eat(TokenType.WEEK)
weeks = -1 # Last week
# Handle more complex patterns like "X年后", "X个月后", etc.
elif self.current_token.type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER]:
# Check if this is likely an absolute date format (e.g., "2025年11月21日")
# If the next token after the number is a date separator or date unit,
# and the number looks like a year (4 digits) or the pattern continues,
# it might be an absolute date. In that case, skip relative date parsing.
# Look ahead to see if this matches absolute date patterns
lookahead_pos = self.pos
is_likely_absolute_date = False
# Check for YYYY-MM-DD or YYYY/MM/DD patterns
if (lookahead_pos + 4 < len(self.tokens) and
self.tokens[lookahead_pos].type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER] and
self.tokens[lookahead_pos + 1].type in [TokenType.DATE_SEPARATOR, TokenType.YEAR] and
self.tokens[lookahead_pos + 1].value in ['-', '/', ''] and
self.tokens[lookahead_pos + 2].type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER] and
self.tokens[lookahead_pos + 3].type in [TokenType.DATE_SEPARATOR, TokenType.MONTH] and
self.tokens[lookahead_pos + 3].value in ['-', '/', '']):
is_likely_absolute_date = True
# Check for YYYY年MM月DD patterns
if (lookahead_pos + 4 < len(self.tokens) and
self.tokens[lookahead_pos].type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER] and
self.tokens[lookahead_pos + 1].type in [TokenType.DATE_SEPARATOR, TokenType.YEAR] and
self.tokens[lookahead_pos + 1].value == '' and
self.tokens[lookahead_pos + 2].type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER] and
self.tokens[lookahead_pos + 3].type in [TokenType.DATE_SEPARATOR, TokenType.MONTH] and
self.tokens[lookahead_pos + 3].value == ''):
is_likely_absolute_date = True
# Check for MM月DD[日号] patterns (like "6月20日")
if (self.pos + 2 < len(self.tokens) and
self.tokens[self.pos].type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER] and
self.tokens[self.pos + 1].type in [TokenType.DATE_SEPARATOR, TokenType.MONTH] and
self.tokens[self.pos + 1].value == '' and
self.tokens[self.pos + 2].type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER]):
is_likely_absolute_date = True
if is_likely_absolute_date:
# This looks like an absolute date, skip relative date parsing
raise ParserError("Looks like absolute date format")
print(f"DEBUG: Parsing complex relative date pattern")
# Parse the number
number_node = self.parse_number()
number_value = number_node.value
print(f"DEBUG: Parsed number: {number_value}")
# Check the unit
if self.current_token.type == TokenType.YEAR:
self.eat(TokenType.YEAR)
years = number_value
print(f"DEBUG: Set years to {years}")
elif self.current_token.type == TokenType.MONTH:
self.eat(TokenType.MONTH)
months = number_value
print(f"DEBUG: Set months to {months}")
elif self.current_token.type == TokenType.WEEK:
self.eat(TokenType.WEEK)
weeks = number_value
print(f"DEBUG: Set weeks to {weeks}")
elif self.current_token.type == TokenType.DAY:
self.eat(TokenType.DAY)
days = number_value
print(f"DEBUG: Set days to {days}")
else:
print(f"DEBUG: Unexpected token type: {self.current_token.type}")
raise ParserError(
f"Expected time unit, got {self.current_token.type} "
f"at position {self.current_token.position}"
)
# Check direction (前/后)
if self.current_token.type == TokenType.RELATIVE_DIRECTION_FORWARD:
self.eat(TokenType.RELATIVE_DIRECTION_FORWARD)
print(f"DEBUG: Forward direction, values are already positive")
# Values are already positive
elif self.current_token.type == TokenType.RELATIVE_DIRECTION_BACKWARD:
self.eat(TokenType.RELATIVE_DIRECTION_BACKWARD)
print(f"DEBUG: Backward direction, negating values")
years = -years
months = -months
weeks = -weeks
days = -days
except ParserError:
# Reset position if parsing failed
self.pos = original_pos
raise ParserError(
f"Expected relative date, got {self.current_token.type} "
f"at position {self.current_token.position}"
)
return RelativeDateNode(years=years, months=months, weeks=weeks, days=days)
def parse_weekday(self) -> WeekdayNode:
"""Parse a weekday specification."""
# Parse week scope (本, 上, 下)
scope = "current"
if self.current_token.type == TokenType.WEEK_SCOPE_CURRENT:
self.eat(TokenType.WEEK_SCOPE_CURRENT)
scope = "current"
elif self.current_token.type == TokenType.WEEK_SCOPE_LAST:
self.eat(TokenType.WEEK_SCOPE_LAST)
scope = "last"
elif self.current_token.type == TokenType.WEEK_SCOPE_NEXT:
self.eat(TokenType.WEEK_SCOPE_NEXT)
scope = "next"
# Parse weekday
weekday_map = {
TokenType.WEEKDAY_MONDAY: 0,
TokenType.WEEKDAY_TUESDAY: 1,
TokenType.WEEKDAY_WEDNESDAY: 2,
TokenType.WEEKDAY_THURSDAY: 3,
TokenType.WEEKDAY_FRIDAY: 4,
TokenType.WEEKDAY_SATURDAY: 5,
TokenType.WEEKDAY_SUNDAY: 6,
# Handle Chinese numbers (1=Monday, 2=Tuesday, etc.)
TokenType.CHINESE_NUMBER: lambda x: x - 1 if 1 <= x <= 7 else None,
}
if self.current_token.type in weekday_map:
if self.current_token.type == TokenType.CHINESE_NUMBER:
# Handle numeric weekday (1=Monday, 2=Tuesday, etc.)
weekday_num = self.current_token.value
if 1 <= weekday_num <= 7:
weekday = weekday_num - 1 # Convert to 0-based index
self.eat(TokenType.CHINESE_NUMBER)
return WeekdayNode(weekday=weekday, scope=scope)
else:
raise ParserError(
f"Invalid weekday number: {weekday_num} "
f"at position {self.current_token.position}"
)
else:
weekday = weekday_map[self.current_token.type]
self.eat(self.current_token.type)
return WeekdayNode(weekday=weekday, scope=scope)
raise ParserError(
f"Expected weekday, got {self.current_token.type} "
f"at position {self.current_token.position}"
)
def parse_relative_time(self) -> RelativeTimeNode:
"""Parse a relative time specification."""
hours = 0.0
minutes = 0.0
seconds = 0.0
def parse_relative_time(self) -> RelativeTimeNode:
"""Parse a relative time specification."""
hours = 0.0
minutes = 0.0
seconds = 0.0
# Parse sequences of relative time expressions
while self.current_token.type in [
TokenType.INTEGER, TokenType.CHINESE_NUMBER,
TokenType.HALF, TokenType.QUARTER
] or (self.current_token.type == TokenType.RELATIVE_DIRECTION_FORWARD or
self.current_token.type == TokenType.RELATIVE_DIRECTION_BACKWARD):
# Handle 半小时
if (self.current_token.type == TokenType.HALF):
self.eat(TokenType.HALF)
# Optional 个
if (self.current_token.type == TokenType.INTEGER and
self.current_token.value == ""):
self.eat(TokenType.INTEGER)
# Optional 小时
if self.current_token.type == TokenType.HOUR:
self.eat(TokenType.HOUR)
hours += 0.5
# Check for direction
if self.current_token.type == TokenType.RELATIVE_DIRECTION_FORWARD:
self.eat(TokenType.RELATIVE_DIRECTION_FORWARD)
elif self.current_token.type == TokenType.RELATIVE_DIRECTION_BACKWARD:
self.eat(TokenType.RELATIVE_DIRECTION_BACKWARD)
hours = -hours
continue
# Handle 一刻钟 (15 minutes)
if self.current_token.type == TokenType.QUARTER:
self.eat(TokenType.QUARTER)
# Optional 钟
if self.current_token.type == TokenType.ZHONG:
self.eat(TokenType.ZHONG)
minutes += 15
# Check for direction
if self.current_token.type == TokenType.RELATIVE_DIRECTION_FORWARD:
self.eat(TokenType.RELATIVE_DIRECTION_FORWARD)
elif self.current_token.type == TokenType.RELATIVE_DIRECTION_BACKWARD:
self.eat(TokenType.RELATIVE_DIRECTION_BACKWARD)
minutes = -minutes
continue
# Parse number if we have one
if self.current_token.type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER]:
number_node = self.parse_number()
number_value = number_node.value
# Determine unit and direction
unit = None
direction = 1 # Forward by default
# Check for unit
if self.current_token.type == TokenType.HOUR:
self.eat(TokenType.HOUR)
# Optional 个
if (self.current_token.type == TokenType.INTEGER and
self.current_token.value == ""):
self.eat(TokenType.INTEGER)
unit = "hour"
elif self.current_token.type == TokenType.MINUTE:
self.eat(TokenType.MINUTE)
unit = "minute"
elif self.current_token.type == TokenType.SECOND:
self.eat(TokenType.SECOND)
unit = "second"
elif self.current_token.type == TokenType.TIME_SEPARATOR:
# Handle "X点", "X分", "X秒" format
sep_value = self.current_token.value
self.eat(TokenType.TIME_SEPARATOR)
if sep_value == "":
unit = "hour"
# Optional 钟
if self.current_token.type == TokenType.ZHONG:
self.eat(TokenType.ZHONG)
# If we have "X点" without a direction, this is likely an absolute time
# Check if there's a direction after
if not (self.current_token.type == TokenType.RELATIVE_DIRECTION_FORWARD or
self.current_token.type == TokenType.RELATIVE_DIRECTION_BACKWARD):
# This is probably an absolute time, not relative time
# Push back the number and break
break
elif sep_value == "":
unit = "minute"
# Optional 钟
if self.current_token.type == TokenType.ZHONG:
self.eat(TokenType.ZHONG)
elif sep_value == "":
unit = "second"
else:
# If no unit specified, but we have a number followed by a direction,
# assume it's hours
if (self.current_token.type == TokenType.RELATIVE_DIRECTION_FORWARD or
self.current_token.type == TokenType.RELATIVE_DIRECTION_BACKWARD):
unit = "hour"
else:
# If no unit and no direction, this might not be a relative time expression
# Push the number back and break
# We can't easily push back, so let's break
break
# Check for direction (后/前)
if self.current_token.type == TokenType.RELATIVE_DIRECTION_FORWARD:
self.eat(TokenType.RELATIVE_DIRECTION_FORWARD)
direction = 1
elif self.current_token.type == TokenType.RELATIVE_DIRECTION_BACKWARD:
self.eat(TokenType.RELATIVE_DIRECTION_BACKWARD)
direction = -1
# Apply the value based on unit
if unit == "hour":
hours += number_value * direction
elif unit == "minute":
minutes += number_value * direction
elif unit == "second":
seconds += number_value * direction
continue
# If we still haven't handled the current token, break
break
return RelativeTimeNode(hours=hours, minutes=minutes, seconds=seconds)
def parse_time_expression(self) -> TimeExpressionNode:
"""Parse a complete time expression."""
date_node = None
time_node = None
relative_date_node = None
relative_time_node = None
weekday_node = None
# Parse different parts of the expression
while self.current_token.type != TokenType.EOF:
# Try to parse date first (absolute dates should take precedence)
if self.current_token.type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER]:
if date_node is None:
original_pos = self.pos
try:
date_node = self.parse_date()
continue
except ParserError:
# Reset position if parsing failed
self.pos = original_pos
pass
# Try to parse relative date
if self.current_token.type in [
TokenType.RELATIVE_TODAY, TokenType.RELATIVE_TOMORROW,
TokenType.RELATIVE_DAY_AFTER_TOMORROW, TokenType.RELATIVE_THREE_DAYS_AFTER_TOMORROW,
TokenType.RELATIVE_YESTERDAY, TokenType.RELATIVE_DAY_BEFORE_YESTERDAY,
TokenType.RELATIVE_THREE_DAYS_BEFORE_YESTERDAY,
TokenType.INTEGER, TokenType.CHINESE_NUMBER, # For patterns like "X年后", "X个月后", etc.
TokenType.RELATIVE_NEXT, TokenType.RELATIVE_LAST, TokenType.RELATIVE_THIS
]:
if relative_date_node is None:
original_pos = self.pos
try:
relative_date_node = self.parse_relative_date()
continue
except ParserError:
# Reset position if parsing failed
self.pos = original_pos
pass
# Try to parse relative time first (since it can have numbers)
if self.current_token.type in [
TokenType.INTEGER, TokenType.CHINESE_NUMBER,
TokenType.HALF, TokenType.QUARTER,
TokenType.RELATIVE_DIRECTION_FORWARD, TokenType.RELATIVE_DIRECTION_BACKWARD
]:
if relative_time_node is None:
original_pos = self.pos
try:
relative_time_node = self.parse_relative_time()
# Only continue if we actually parsed some relative time
if relative_time_node.hours != 0 or relative_time_node.minutes != 0 or relative_time_node.seconds != 0:
continue
else:
# If we didn't parse any relative time, reset position
self.pos = original_pos
except ParserError:
# Reset position if parsing failed
self.pos = original_pos
pass
# Try to parse time
if self.current_token.type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER, TokenType.TIME_SEPARATOR, TokenType.PERIOD_AM, TokenType.PERIOD_PM]:
if time_node is None:
original_pos = self.pos
try:
time_node = self.parse_time()
continue
except ParserError:
# Reset position if parsing failed
self.pos = original_pos
pass
# Try to parse time
if self.current_token.type in [TokenType.INTEGER, TokenType.CHINESE_NUMBER, TokenType.TIME_SEPARATOR, TokenType.PERIOD_AM, TokenType.PERIOD_PM]:
if time_node is None:
original_pos = self.pos
try:
time_node = self.parse_time()
continue
except ParserError:
# Reset position if parsing failed
self.pos = original_pos
pass
# Try to parse weekday
if self.current_token.type in [
TokenType.WEEK_SCOPE_CURRENT, TokenType.WEEK_SCOPE_LAST, TokenType.WEEK_SCOPE_NEXT,
TokenType.WEEKDAY_MONDAY, TokenType.WEEKDAY_TUESDAY, TokenType.WEEKDAY_WEDNESDAY,
TokenType.WEEKDAY_THURSDAY, TokenType.WEEKDAY_FRIDAY, TokenType.WEEKDAY_SATURDAY,
TokenType.WEEKDAY_SUNDAY
]:
if weekday_node is None:
original_pos = self.pos
try:
weekday_node = self.parse_weekday()
continue
except ParserError:
# Reset position if parsing failed
self.pos = original_pos
pass
# If we get here and couldn't parse anything, skip the token
self.pos += 1
return TimeExpressionNode(
date=date_node,
time=time_node,
relative_date=relative_date_node,
relative_time=relative_time_node,
weekday=weekday_node
)
def parse(self) -> TimeExpressionNode:
"""Parse the complete time expression and return the AST."""
return self.parse_time_expression()

View File

@ -0,0 +1,71 @@
"""
Abstract Syntax Tree (AST) nodes for the time expression parser.
"""
from abc import ABC
from typing import Optional
from dataclasses import dataclass
@dataclass
class ASTNode(ABC):
"""Base class for all AST nodes."""
pass
@dataclass
class NumberNode(ASTNode):
"""Represents a numeric value."""
value: int
@dataclass
class DateNode(ASTNode):
"""Represents a date specification."""
year: Optional[ASTNode]
month: Optional[ASTNode]
day: Optional[ASTNode]
@dataclass
class TimeNode(ASTNode):
"""Represents a time specification."""
hour: Optional[ASTNode]
minute: Optional[ASTNode]
second: Optional[ASTNode]
is_24hour: bool = False
period: Optional[str] = None # AM or PM
@dataclass
class RelativeDateNode(ASTNode):
"""Represents a relative date specification."""
years: int = 0
months: int = 0
weeks: int = 0
days: int = 0
@dataclass
class RelativeTimeNode(ASTNode):
"""Represents a relative time specification."""
hours: float = 0.0
minutes: float = 0.0
seconds: float = 0.0
@dataclass
class WeekdayNode(ASTNode):
"""Represents a weekday specification."""
weekday: int # 0=Monday, 6=Sunday
scope: str # current, last, next
@dataclass
class TimeExpressionNode(ASTNode):
"""Represents a complete time expression."""
date: Optional[DateNode] = None
time: Optional[TimeNode] = None
relative_date: Optional[RelativeDateNode] = None
relative_time: Optional[RelativeTimeNode] = None
weekday: Optional[WeekdayNode] = None

View File

@ -0,0 +1,95 @@
"""
Token definitions for the time parser.
"""
from enum import Enum
from typing import Union
from dataclasses import dataclass
class TokenType(Enum):
"""Types of tokens recognized by the lexer."""
# Numbers
INTEGER = "INTEGER"
CHINESE_NUMBER = "CHINESE_NUMBER"
# Time units
YEAR = "YEAR"
MONTH = "MONTH"
DAY = "DAY"
WEEK = "WEEK"
HOUR = "HOUR"
MINUTE = "MINUTE"
SECOND = "SECOND"
# Date separators
DATE_SEPARATOR = "DATE_SEPARATOR" # -, /, 年, 月, 日, 号
# Time separators
TIME_SEPARATOR = "TIME_SEPARATOR" # :, 点, 时, 分, 秒
# Period indicators
PERIOD_AM = "PERIOD_AM" # 上午, 早上, 早晨, etc.
PERIOD_PM = "PERIOD_PM" # 下午, 晚上, 中午, etc.
# Relative time
RELATIVE_TODAY = "RELATIVE_TODAY" # 今天, 今晚, 今早, etc.
RELATIVE_TOMORROW = "RELATIVE_TOMORROW" # 明天
RELATIVE_DAY_AFTER_TOMORROW = "RELATIVE_DAY_AFTER_TOMORROW" # 后天
RELATIVE_THREE_DAYS_AFTER_TOMORROW = "RELATIVE_THREE_DAYS_AFTER_TOMORROW" # 大后天
RELATIVE_YESTERDAY = "RELATIVE_YESTERDAY" # 昨天
RELATIVE_DAY_BEFORE_YESTERDAY = "RELATIVE_DAY_BEFORE_YESTERDAY" # 前天
RELATIVE_THREE_DAYS_BEFORE_YESTERDAY = "RELATIVE_THREE_DAYS_BEFORE_YESTERDAY" # 大前天
RELATIVE_DIRECTION_FORWARD = "RELATIVE_DIRECTION_FORWARD" # 后, 以后, 之后
RELATIVE_DIRECTION_BACKWARD = "RELATIVE_DIRECTION_BACKWARD" # 前, 以前, 之前
# Extended relative time
RELATIVE_NEXT = "RELATIVE_NEXT" # 下
RELATIVE_LAST = "RELATIVE_LAST" # 上, 去
RELATIVE_THIS = "RELATIVE_THIS" # 这, 本
# Week days
WEEKDAY_MONDAY = "WEEKDAY_MONDAY"
WEEKDAY_TUESDAY = "WEEKDAY_TUESDAY"
WEEKDAY_WEDNESDAY = "WEEKDAY_WEDNESDAY"
WEEKDAY_THURSDAY = "WEEKDAY_THURSDAY"
WEEKDAY_FRIDAY = "WEEKDAY_FRIDAY"
WEEKDAY_SATURDAY = "WEEKDAY_SATURDAY"
WEEKDAY_SUNDAY = "WEEKDAY_SUNDAY"
# Week scope
WEEK_SCOPE_CURRENT = "WEEK_SCOPE_CURRENT" # 本
WEEK_SCOPE_LAST = "WEEK_SCOPE_LAST" # 上
WEEK_SCOPE_NEXT = "WEEK_SCOPE_NEXT" # 下
# Special time markers
HALF = "HALF" # 半
QUARTER = "QUARTER" # 一刻
ZHENG = "ZHENG" # 整
ZHONG = "ZHONG" # 钟
# Student-friendly time expressions
EARLY_MORNING = "EARLY_MORNING" # 早X
LATE_NIGHT = "LATE_NIGHT" # 晚X
# Whitespace
WHITESPACE = "WHITESPACE"
# End of input
EOF = "EOF"
@dataclass
class Token:
"""Represents a single token from the lexer."""
type: TokenType
value: Union[str, int]
position: int
def __str__(self):
return f"Token({self.type.value}, {repr(self.value)}, {self.position})"
def __repr__(self):
return self.__str__()

View File

@ -0,0 +1,369 @@
"""
Semantic analyzer for time expressions that evaluates the AST and produces datetime objects.
"""
import datetime
import calendar
from typing import Optional
from .ptime_ast import (
TimeExpressionNode, DateNode, TimeNode,
RelativeDateNode, RelativeTimeNode, WeekdayNode, NumberNode
)
from .err import TokenUnhandledException
class SemanticAnalyzer:
"""Semantic analyzer that evaluates time expression ASTs."""
def __init__(self, now: Optional[datetime.datetime] = None):
self.now = now or datetime.datetime.now()
def evaluate_number(self, node: NumberNode) -> int:
"""Evaluate a number node."""
return node.value
def evaluate_date(self, node: DateNode) -> datetime.date:
"""Evaluate a date node."""
year = self.now.year
month = 1
day = 1
if node.year is not None:
year = self.evaluate_number(node.year)
if node.month is not None:
month = self.evaluate_number(node.month)
if node.day is not None:
day = self.evaluate_number(node.day)
return datetime.date(year, month, day)
def evaluate_time(self, node: TimeNode) -> datetime.time:
"""Evaluate a time node."""
hour = 0
minute = 0
second = 0
if node.hour is not None:
hour = self.evaluate_number(node.hour)
if node.minute is not None:
minute = self.evaluate_number(node.minute)
if node.second is not None:
second = self.evaluate_number(node.second)
# Handle 24-hour vs 12-hour format
if not node.is_24hour and node.period is not None:
if node.period == "AM":
if hour == 12:
hour = 0
elif node.period == "PM":
if hour != 12 and hour <= 12:
hour += 12
# Validate time values
if not (0 <= hour <= 23):
raise TokenUnhandledException(f"Invalid hour: {hour}")
if not (0 <= minute <= 59):
raise TokenUnhandledException(f"Invalid minute: {minute}")
if not (0 <= second <= 59):
raise TokenUnhandledException(f"Invalid second: {second}")
return datetime.time(hour, minute, second)
def evaluate_relative_date(self, node: RelativeDateNode) -> datetime.timedelta:
"""Evaluate a relative date node."""
# Start with current time
result = self.now
# Special case: If weeks contains a target day (hacky way to pass target day info)
# This is for patterns like "下个月五号"
if node.weeks > 0 and node.weeks <= 31: # Valid day range
target_day = node.weeks
# Calculate the target month
if node.months != 0:
# Handle month arithmetic carefully
total_months = result.month + node.months - 1
new_year = result.year + total_months // 12
new_month = total_months % 12 + 1
# Handle day overflow (e.g., Jan 31 + 1 month = Feb 28/29)
max_day_in_target_month = calendar.monthrange(new_year, new_month)[1]
target_day = min(target_day, max_day_in_target_month)
try:
result = result.replace(year=new_year, month=new_month, day=target_day)
except ValueError:
# Handle edge cases
result = result.replace(year=new_year, month=new_month, day=max_day_in_target_month)
# Return the difference between the new date and the original date
return result - self.now
# Apply years
if node.years != 0:
# Handle year arithmetic carefully due to leap years
new_year = result.year + node.years
try:
result = result.replace(year=new_year)
except ValueError:
# Handle leap year edge case (Feb 29 -> Feb 28)
result = result.replace(year=new_year, month=2, day=28)
# Apply months
if node.months != 0:
# Check if this is a special marker for absolute month (negative offset)
if node.months < 0:
# This is an absolute month specification (e.g., from "明年五月")
absolute_month = node.months + 100
if 1 <= absolute_month <= 12:
result = result.replace(year=result.year, month=absolute_month, day=result.day)
else:
# Handle month arithmetic carefully
total_months = result.month + node.months - 1
new_year = result.year + total_months // 12
new_month = total_months % 12 + 1
# Handle day overflow (e.g., Jan 31 + 1 month = Feb 28/29)
new_day = min(result.day, calendar.monthrange(new_year, new_month)[1])
result = result.replace(year=new_year, month=new_month, day=new_day)
# Apply weeks and days
if node.weeks != 0 or node.days != 0:
delta_days = node.weeks * 7 + node.days
result = result + datetime.timedelta(days=delta_days)
return result - self.now
def evaluate_relative_time(self, node: RelativeTimeNode) -> datetime.timedelta:
"""Evaluate a relative time node."""
# Convert all values to seconds for precise calculation
total_seconds = (
node.hours * 3600 +
node.minutes * 60 +
node.seconds
)
return datetime.timedelta(seconds=total_seconds)
def evaluate_weekday(self, node: WeekdayNode) -> datetime.timedelta:
"""Evaluate a weekday node."""
current_weekday = self.now.weekday() # 0=Monday, 6=Sunday
target_weekday = node.weekday
if node.scope == "current":
delta = target_weekday - current_weekday
elif node.scope == "last":
delta = target_weekday - current_weekday - 7
elif node.scope == "next":
delta = target_weekday - current_weekday + 7
else:
delta = target_weekday - current_weekday
return datetime.timedelta(days=delta)
def infer_smart_time(self, hour: int, minute: int = 0, second: int = 0, base_time: Optional[datetime.datetime] = None) -> datetime.datetime:
"""
Smart time inference based on current time.
For example:
- If now is 14:30 and user says "3点", interpret as 15:00
- If now is 14:30 and user says "1点", interpret as next day 01:00
- If now is 8:00 and user says "3点", interpret as 15:00
- If now is 8:00 and user says "9点", interpret as 09:00
"""
# Use base_time if provided, otherwise use self.now
now = base_time if base_time is not None else self.now
# Handle 24-hour format directly (13-23)
if 13 <= hour <= 23:
candidate = now.replace(hour=hour, minute=minute, second=second, microsecond=0)
if candidate <= now:
candidate += datetime.timedelta(days=1)
return candidate
# Handle 12 (noon/midnight)
if hour == 12:
# For 12 specifically, we need to be more careful
# Try noon first
noon_candidate = now.replace(hour=12, minute=minute, second=second, microsecond=0)
midnight_candidate = now.replace(hour=0, minute=minute, second=second, microsecond=0)
# Special case: If it's afternoon or evening, "十二点" likely means next day midnight
if now.hour >= 12:
result = midnight_candidate + datetime.timedelta(days=1)
return result
# If noon is in the future and closer than midnight, use it
if noon_candidate > now and (midnight_candidate <= now or noon_candidate < midnight_candidate):
return noon_candidate
# If midnight is in the future, use it
elif midnight_candidate > now:
return midnight_candidate
# Both are in the past, use the closer one
elif noon_candidate > midnight_candidate:
return noon_candidate
# Otherwise use midnight next day
else:
result = midnight_candidate + datetime.timedelta(days=1)
return result
# Handle 1-11 (12-hour format)
if 1 <= hour <= 11:
# Calculate 12-hour format candidates
pm_hour = hour + 12
pm_candidate = now.replace(hour=pm_hour, minute=minute, second=second, microsecond=0)
am_candidate = now.replace(hour=hour, minute=minute, second=second, microsecond=0)
# Special case: If it's afternoon (12:00-18:00) and the hour is 1-6,
# user might mean either PM today or AM tomorrow.
# But if PM is in the future, that's more likely what they mean.
if 12 <= now.hour <= 18 and 1 <= hour <= 6:
if pm_candidate > now:
return pm_candidate
else:
# PM is in the past, so use AM tomorrow
result = am_candidate + datetime.timedelta(days=1)
return result
# Special case: If it's late evening (after 22:00) and user specifies early morning hours (1-5),
# user likely means next day early morning
if now.hour >= 22 and 1 <= hour <= 5:
result = am_candidate + datetime.timedelta(days=1)
return result
# Special case: In the morning (0-12:00)
if now.hour < 12:
# In the morning, for hours 1-11, generally prefer AM interpretation
# unless it's a very early hour that's much earlier than current time
# Only push to next day for very early hours (1-2) that are significantly earlier
if hour <= 2 and hour < now.hour and now.hour - hour >= 6:
# Very early morning hour that's significantly earlier, use next day
result = am_candidate + datetime.timedelta(days=1)
return result
else:
# For morning, generally prefer AM if it's in the future
if am_candidate > now:
return am_candidate
# If PM is in the future, use it
elif pm_candidate > now:
return pm_candidate
# Both are in the past, prefer AM if it's closer
elif am_candidate > pm_candidate:
return am_candidate
# Otherwise use PM next day
else:
result = pm_candidate + datetime.timedelta(days=1)
return result
else:
# General case: choose the one that's in the future and closer
if pm_candidate > now and (am_candidate <= now or pm_candidate < am_candidate):
return pm_candidate
elif am_candidate > now:
return am_candidate
# Both are in the past, use the closer one
elif pm_candidate > am_candidate:
return pm_candidate
# Otherwise use AM next day
else:
result = am_candidate + datetime.timedelta(days=1)
return result
# Handle 0 (midnight)
if hour == 0:
candidate = now.replace(hour=0, minute=minute, second=second, microsecond=0)
if candidate <= now:
candidate += datetime.timedelta(days=1)
return candidate
# Default case (should not happen with valid input)
candidate = now.replace(hour=hour, minute=minute, second=second, microsecond=0)
if candidate <= now:
candidate += datetime.timedelta(days=1)
return candidate
def evaluate(self, node: TimeExpressionNode) -> datetime.datetime:
"""Evaluate a complete time expression node."""
result = self.now
# Apply relative date (should set time to 00:00:00 for dates)
if node.relative_date is not None:
delta = self.evaluate_relative_date(node.relative_date)
result = result + delta
# For relative dates like "今天", "明天", set time to 00:00:00
# But only for cases where we're dealing with days, not years/months
if (node.date is None and node.time is None and node.weekday is None and
node.relative_date.years == 0 and node.relative_date.months == 0):
result = result.replace(hour=0, minute=0, second=0, microsecond=0)
# Apply weekday
if node.weekday is not None:
delta = self.evaluate_weekday(node.weekday)
result = result + delta
# For weekdays, set time to 00:00:00
if node.date is None and node.time is None:
result = result.replace(hour=0, minute=0, second=0, microsecond=0)
# Apply relative time
if node.relative_time is not None:
delta = self.evaluate_relative_time(node.relative_time)
result = result + delta
# Apply absolute date
if node.date is not None:
date = self.evaluate_date(node.date)
result = result.replace(year=date.year, month=date.month, day=date.day)
# For absolute dates without time, set time to 00:00:00
if node.time is None:
result = result.replace(hour=0, minute=0, second=0, microsecond=0)
# Apply time
if node.time is not None:
time = self.evaluate_time(node.time)
# Handle explicit period or student-friendly expressions
if node.time.is_24hour or node.time.period is not None:
# Handle explicit period
if not node.time.is_24hour and node.time.period is not None:
hour = time.hour
minute = time.minute
second = time.second
if node.time.period == "AM":
if hour == 12:
hour = 0
elif node.time.period == "PM":
# Special case: "晚上十二点" should be interpreted as next day 00:00
if hour == 12 and minute == 0 and second == 0:
# Move to next day at 00:00:00
result = result.replace(hour=0, minute=0, second=0, microsecond=0) + datetime.timedelta(days=1)
# Skip the general replacement since we've already handled it
skip_general_replacement = True
else:
# For other PM times, convert to 24-hour format
if hour != 12 and hour <= 12:
hour += 12
# Validate hour
if not (0 <= hour <= 23):
raise TokenUnhandledException(f"Invalid hour: {hour}")
# Only do general replacement if we haven't handled it specially
if not locals().get('skip_general_replacement', False):
result = result.replace(hour=hour, minute=minute, second=second, microsecond=0)
else:
# Already in 24-hour format
result = result.replace(hour=time.hour, minute=time.minute, second=time.second, microsecond=0)
else:
# Use smart time inference for regular times
# But if we have an explicit date, treat the time as 24-hour format
if node.date is not None or node.relative_date is not None:
# For explicit dates, treat time as 24-hour format
result = result.replace(hour=time.hour, minute=time.minute or 0, second=time.second or 0, microsecond=0)
else:
# Use smart time inference for regular times
smart_time = self.infer_smart_time(time.hour, time.minute, time.second, base_time=result)
result = smart_time
return result

View File

@ -5,6 +5,7 @@ from pydantic import BaseModel
class Config(BaseModel):
module_web_render_weburl: str = "localhost:5173"
module_web_render_instance: str = ""
module_web_render_playwright_ws: str = ""
def get_instance_baseurl(self):
if self.module_web_render_instance:

View File

@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
import asyncio
import queue
from typing import Any, Callable, Coroutine
from typing import Any, Callable, Coroutine, Generic, TypeVar
from loguru import logger
from playwright.async_api import (
Page,
@ -8,9 +9,14 @@ from playwright.async_api import (
async_playwright,
Browser,
BrowserContext,
Error as PlaywrightError,
)
from .config import web_render_config
from playwright.async_api import ConsoleMessage, Page
T = TypeVar("T")
TFunction = Callable[[T], Coroutine[Any, Any, Any]]
PageFunction = Callable[[Page], Coroutine[Any, Any, Any]]
@ -22,23 +28,17 @@ class WebRenderer:
@classmethod
async def get_browser_instance(cls) -> "WebRendererInstance":
if cls.browser_pool.empty():
instance = await WebRendererInstance.create()
if web_render_config.module_web_render_playwright_ws:
instance = await RemotePlaywrightInstance.create(
web_render_config.module_web_render_playwright_ws
)
else:
instance = await LocalPlaywrightInstance.create()
cls.browser_pool.put(instance)
instance = cls.browser_pool.get()
cls.browser_pool.put(instance)
return instance
@classmethod
async def get_browser_context(cls) -> BrowserContext:
instance = await cls.get_browser_instance()
if id(instance) not in cls.context_pool:
context = await instance.browser.new_context()
cls.context_pool[id(instance)] = context
logger.debug(
f"Created new persistent browser context for WebRendererInstance {id(instance)}"
)
return cls.context_pool[id(instance)]
@classmethod
async def render(
cls,
@ -67,49 +67,6 @@ class WebRenderer:
url, target, params=params, other_function=other_function, timeout=timeout
)
@classmethod
async def render_persistent_page(
cls,
page_id: str,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
"""
使用长期挂载的页面访问指定URL并返回截图
:param page_id: 页面唯一标识符
:param url: 目标URL
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
"""
logger.debug(
f"Requesting persistent render for page_id {page_id} at {url} targeting {target} with timeout {timeout}"
)
instance = await cls.get_browser_instance()
if page_id not in cls.page_pool:
context = await cls.get_browser_context()
page = await context.new_page()
cls.page_pool[page_id] = page
logger.debug(
f"Created new persistent page for page_id {page_id} using WebRendererInstance {id(instance)}"
)
page = cls.page_pool[page_id]
return await instance.render_with_page(
page,
url,
target,
params=params,
other_function=other_function,
timeout=timeout,
)
@classmethod
async def render_file(
cls,
@ -142,6 +99,75 @@ class WebRenderer:
timeout=timeout,
)
@classmethod
async def render_with_persistent_page(
cls,
page_id: str,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
"""
使用长期挂载的页面进行渲染
:param page_id: 页面唯一标识符
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
"""
instance = await cls.get_browser_instance()
logger.debug(
f"Using WebRendererInstance {id(instance)} to render with persistent page {page_id} targeting {target}"
)
return await instance.render_with_persistent_page(
page_id,
url,
target,
params=params,
other_function=other_function,
timeout=timeout,
)
@classmethod
async def get_persistent_page(cls, page_id: str, url: str) -> Page:
"""
获取长期挂载的页面,如果不存在则创建一个新的页面并存储
"""
if page_id in cls.page_pool:
return cls.page_pool[page_id]
async def on_console(msg: ConsoleMessage):
logger.debug(f"WEB CONSOLE {msg.text}")
instance = await cls.get_browser_instance()
if isinstance(instance, RemotePlaywrightInstance):
context = await instance.browser.new_context()
page = await context.new_page()
await page.goto(url)
cls.page_pool[page_id] = page
logger.debug(f"Created new persistent page for page_id {page_id}, navigated to {url}")
page.on('console', on_console)
return page
elif isinstance(instance, LocalPlaywrightInstance):
context = await instance.browser.new_context()
page = await context.new_page()
await page.goto(url)
cls.page_pool[page_id] = page
logger.debug(f"Created new persistent page for page_id {page_id}, navigated to {url}")
page.on('console', on_console)
return page
else:
raise NotImplementedError("Unsupported WebRendererInstance type")
@classmethod
async def close_persistent_page(cls, page_id: str) -> None:
"""
@ -156,38 +182,56 @@ class WebRenderer:
logger.debug(f"Closed and removed persistent page for page_id {page_id}")
class WebRendererInstance:
def __init__(self):
self._playwright: Playwright | None = None
self._browser: Browser | None = None
class WebRendererInstance(ABC, Generic[T]):
@abstractmethod
async def render(
self,
url: str,
target: str,
index: int = 0,
params: dict[str, Any] | None = None,
other_function: TFunction | None = None,
timeout: int = 30,
) -> bytes: ...
@abstractmethod
async def render_file(
self,
file_path: str,
target: str,
index: int = 0,
params: dict[str, Any] | None = None,
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes: ...
@abstractmethod
async def render_with_persistent_page(
self,
page_id: str,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes: ...
class PlaywrightInstance(WebRendererInstance[Page]):
def __init__(self) -> None:
super().__init__()
self.lock = asyncio.Lock()
@property
def playwright(self) -> Playwright:
assert self._playwright is not None
return self._playwright
@property
def browser(self) -> Browser:
assert self._browser is not None
return self._browser
async def init(self):
self._playwright = await async_playwright().start()
self._browser = await self.playwright.chromium.launch(headless=True)
@classmethod
async def create(cls) -> "WebRendererInstance":
instance = cls()
await instance.init()
return instance
@abstractmethod
def browser(self) -> Browser: ...
async def render(
self,
url: str,
target: str,
index: int = 0,
params: dict = {},
params: dict[str, Any] | None = None,
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
@ -207,42 +251,41 @@ class WebRendererInstance:
context = await self.browser.new_context()
page = await context.new_page()
screenshot = await self.inner_render(
page, url, target, index, params, other_function, timeout
page, url, target, index, params or {}, other_function, timeout
)
await page.close()
await context.close()
return screenshot
async def render_with_page(
self,
page: Page,
url: str,
target: str,
index: int = 0,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
async with self.lock:
screenshot = await self.inner_render(
page, url, target, index, params, other_function, timeout
)
return screenshot
async def render_file(
self,
file_path: str,
target: str,
index: int = 0,
params: dict = {},
params: dict[str, Any] | None = None,
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
file_path = "file:///" + str(file_path).replace("\\", "/")
return await self.render(
file_path, target, index, params, other_function, timeout
file_path, target, index, params or {}, other_function, timeout
)
async def render_with_persistent_page(
self,
page_id: str,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
page = await WebRenderer.get_persistent_page(page_id, url)
screenshot = await self.inner_render(
page, url, target, 0, params, other_function, timeout
)
return screenshot
async def inner_render(
self,
page: Page,
@ -276,6 +319,85 @@ class WebRendererInstance:
logger.debug("Screenshot taken successfully")
return screenshot
class LocalPlaywrightInstance(PlaywrightInstance):
def __init__(self):
self._playwright: Playwright | None = None
self._browser: Browser | None = None
super().__init__()
@property
def playwright(self) -> Playwright:
assert self._playwright is not None
return self._playwright
@property
def browser(self) -> Browser:
assert self._browser is not None
return self._browser
async def init(self):
self._playwright = await async_playwright().start()
self._browser = await self.playwright.chromium.launch(headless=True)
@classmethod
async def create(cls) -> "WebRendererInstance":
instance = cls()
await instance.init()
return instance
async def close(self):
await self.browser.close()
await self.playwright.stop()
class RemotePlaywrightInstance(PlaywrightInstance):
def __init__(self, ws_endpoint: str) -> None:
self._playwright: Playwright | None = None
self._browser: Browser | None = None
self._ws_endpoint = ws_endpoint
super().__init__()
@property
def playwright(self) -> Playwright:
assert self._playwright is not None, "Playwright must be initialized by calling init()."
return self._playwright
@property
def browser(self) -> Browser:
assert self._browser is not None, "Browser must be connected by calling init()."
return self._browser
async def init(self):
logger.info(f"尝试连接远程 Playwright 服务器: {self._ws_endpoint}")
self._playwright = await async_playwright().start()
try:
self._browser = await self.playwright.chromium.connect(
self._ws_endpoint
)
logger.info("成功连接到远程 Playwright 服务器。")
except PlaywrightError as e:
await self.playwright.stop()
raise ConnectionError(
f"无法连接到远程 Playwright 服务器 ({self._ws_endpoint}){e}"
) from e
@classmethod
async def create(cls, ws_endpoint: str) -> "RemotePlaywrightInstance":
"""
创建并初始化远程 Playwright 实例的工厂方法。
"""
instance = cls(ws_endpoint)
await instance.init()
return instance
async def close(self):
"""
断开与远程浏览器的连接并停止本地 Playwright 实例。
"""
if self._browser:
await self.browser.close()
if self._playwright:
await self.playwright.stop()
print("已断开远程连接,本地 Playwright 实例已停止。")

View File

@ -1,4 +1,4 @@
关于「中间答案」或者「提示」
# 关于「中间答案」或者「提示」
在 KonaPH 中,当有人发送「提交答案 答案」时,会检查答案是否符合你设置的中间答案的 pattern。这个 pattern 可以有两种方式:

View File

@ -1,2 +1,3 @@
关于罗文和洛温
AdoreLowen 希望和洛温阿特金森区分,所以最好就不要叫他洛温了!此方 BOT 会在一些群提醒叫错了的人。
# 关于罗文和洛温
AdoreLowen 希望和洛温阿特金森区分,所以最好就不要叫他洛温了!此方 BOT 会在一些群提醒叫错了的人。

View File

@ -1,45 +1,48 @@
指令介绍
is_admin - 用于判断当前事件是否来自管理员的内部权限校验函数
# 指令介绍
格式
from konabot.common.nb.is_admin import is_admin
from nonebot import on
from nonebot.adapters import Event
from loguru import logger
`is_admin` - 用于判断当前事件是否来自管理员的内部权限校验函数
@on().handle()
async def _(event: Event):
if is_admin(event):
logger.info("管理员发送了消息")
# 格式
说明
is_admin 是 Bot 内部用于权限控制的核心函数根据事件来源QQ、Discord、控制台及插件配置判断触发事件的用户或群组是否具有管理员权限。
```python
from konabot.common.nb.is_admin import is_admin
from nonebot import on
from nonebot.adapters import Event
from loguru import logger
@on().handle()
async def _(event: Event):
if is_admin(event):
logger.info("管理员发送了消息")
```
# 说明
is_admin 是 Bot 内部用于权限控制的核心函数根据事件来源QQ、Discord、控制台及插件配置判断触发事件的用户或群组是否具有管理员权限。
支持的适配器与判定逻辑:
• OneBot V11QQ
- 若用户 ID 在配置项 admin_qq_account 中,则视为管理员
- 若为群聊消息,且群 ID 在配置项 admin_qq_group 中,则视为管理员
• Discord
- 若频道 ID 在配置项 admin_discord_channel 中,则视为管理员
- 若用户 ID 在配置项 admin_discord_account 中,则视为管理员
- OneBot V11QQ
- 若用户 ID 在配置项 admin_qq_account 中,则视为管理员
- 若为群聊消息,且群 ID 在配置项 admin_qq_group 中,则视为管理员
- Discord
- 若频道 ID 在配置项 admin_discord_channel 中,则视为管理员
- 若用户 ID 在配置项 admin_discord_account 中,则视为管理员
- Console控制台
- 所有控制台输入均默认视为管理员操作,自动返回 True
• Console控制台
- 所有控制台输入均默认视为管理员操作,自动返回 True
# 配置项(位于插件配置中
配置项(位于插件配置中)
ADMIN_QQ_GROUP: list[int]
允许的管理员 QQ 群 ID 列表
- `ADMIN_QQ_GROUP`: `list[int]`
- 允许的管理员 QQ 群 ID 列表
- `ADMIN_QQ_ACCOUNT`: `list[int]`
- 允许的管理员 QQ 账号 ID 列表
- `ADMIN_DISCORD_CHANNEL`: `list[int]`
- 允许的管理员 Discord 频道 ID 列表
- `ADMIN_DISCORD_ACCOUNT`: `list[int]`
- 允许的管理员 Discord 用户 ID 列表
ADMIN_QQ_ACCOUNT: list[int]
允许的管理员 QQ 账号 ID 列表
# 注意事项
ADMIN_DISCORD_CHANNEL: list[int]
允许的管理员 Discord 频道 ID 列表
ADMIN_DISCORD_ACCOUNT: list[int]
允许的管理员 Discord 用户 ID 列表
注意事项
- 若未在配置文件中设置任何管理员 ID该函数对所有非控制台事件返回 False
- 控制台事件始终拥有管理员权限,便于本地调试与运维
- 若未在配置文件中设置任何管理员 ID该函数对所有非控制台事件返回 False
- 控制台事件始终拥有管理员权限,便于本地调试与运维

View File

@ -1,4 +1,5 @@
指令介绍
konaph - KonaBot 的 PuzzleHunt 管理工具
# 指令介绍
`konaph` - KonaBot 的 PuzzleHunt 管理工具
详细介绍请直接输入 konaph 获取使用指引(该指令权限仅对部分人开放。如果你有权限的话才有响应。建议在此方 BOT 私聊使用该指令。)

51
konabot/docs/user/fx.txt Normal file
View File

@ -0,0 +1,51 @@
## 指令介绍
`fx` - 用于对图片应用各种滤镜效果的指令
## 格式
```
fx [滤镜名称] <参数1> <参数2> ...
```
## 示例
- `fx 模糊`
- `fx 阈值 150`
- `fx 缩放 2.0`
- `fx 色彩 1.8`
- `fx 色键 rgb(0,255,0) 50`
## 可用滤镜列表
### 基础滤镜
* ```fx 模糊 <半径=10>```
* ```fx 马赛克 <像素大小=10>```
* ```fx 轮廓```
* ```fx 锐化```
* ```fx 边缘增强```
* ```fx 浮雕```
* ```fx 查找边缘```
* ```fx 平滑```
### 色彩处理滤镜
* ```fx 反色```
* ```fx 黑白```
* ```fx 阈值 <阈值=128>```
* ```fx 对比度 <因子=1.5>```
* ```fx 亮度 <因子=1.5>```
* ```fx 色彩 <因子=1.5>```
* ```fx 色调 <颜色="rgb(255,0,0)">```
### 几何变换滤镜
* ```fx 缩放 <比例=1.5>```
* ```fx 波纹 <振幅=5> <波长=20>```
### 特殊效果滤镜
* ```fx 色键 <目标颜色="rgb(255,0,0)"> <容差=60>```
## 颜色名称支持
- **基本颜色**:红、绿、蓝、黄、紫、黑、白、橙、粉、灰、青、靛、棕
- **修饰词**:浅、深、亮、暗(可组合使用,如`浅红`、`深蓝`
- **RGB格式**`rgb(255,0,0)`、`rgb(0,255,0)`、`(255,0,0)` 等
- **HEX格式**`#66ccff`等

View File

@ -1,59 +1,83 @@
指令介绍
giftool - 对 GIF 动图进行裁剪、抽帧等处理
# giftool - 对 GIF 动图进行裁剪、抽帧等处理
格式
giftool [图片] [选项]
## 格式
示例
回复一张 GIF 并发送:
`giftool --ss 1.5 -t 2.0`
从 1.5 秒处开始,截取 2 秒长度的片段。
```bash
giftool [图片] [选项]
```
`giftool [图片] --ss 0:10 -to 0:15`
截取从 10 秒到 15 秒之间的片段(支持 MM:SS 或 HH:MM:SS 格式)。
## 示例
`giftool [图片] --frames:v 10`
将整张 GIF 均匀抽帧,最终保留 10 帧。
- **回复一张 GIF 并发送:**
`giftool [图片] --ss 2 --frames:v 5`
从第 2 秒开始截取,并将结果抽帧为 5 帧。
```bash
giftool --ss 1.5 -t 2.0
```
参数说明
图片(必需)
- 必须是 GIF 动图。
- 支持直接附带图片,或回复一条含 GIF 的消息后使用指令。
从 1.5 秒处开始,截取 2 秒长度的片段。
--ss <时间戳>(可选)
- 指定开始时间(单位:秒),可使用以下格式:
• 纯数字(如 `1.5` 表示 1.5 秒)
• 分秒格式(如 `1:30` 表示 1 分 30 秒)
• 时分秒格式(如 `0:1:30` 表示 1 分 30 秒)
- 默认从开头开始0 秒)。
- ```bash
giftool [图片] --ss 0:10 -to 0:15
```
-t <持续时间>(可选)
- 指定截取的持续时间(单位:秒),格式同 --ss。
- 与 --ss 配合使用:截取 [ss, ss + t] 区间。
- 不能与 --to 同时使用。
截取从 10 秒到 15 秒之间的片段(支持 `MM:SS` 或 `HH:MM:SS` 格式)。
--to <时间戳>(可选)
- 指定结束时间(单位:秒),格式同 --ss。
- 与 --ss 配合使用:截取 [ss, to] 区间。
- 不能与 -t 同时使用。
- ```bash
giftool [图片] --frames:v 10
```
--frames:v <帧数>(可选)
- 对截取后的片段进行均匀抽帧,保留指定数量的帧。
- 帧数必须为正整数(> 0
- 若原始帧数 ≤ 指定帧数,则保留全部帧。
将整张 GIF 均匀抽帧,最终保留 10 帧。
--speed <速度>(可选)
- 调整 gif 图的速度。若为负数,则代表倒放
- ```bash
giftool [图片] --ss 2 --frames:v 5
```
使用方式
1. 发送指令前,请确保:
- 消息中附带一张 GIF 动图,或
- 回复一条包含 GIF 动图的消息后再发送指令。
2. 插件会自动:
- 解析 GIF 的每一帧及其持续时间duration
- 根据时间参数转换为帧索引进行裁剪
- 如指定抽帧,则对裁剪后的片段均匀采样
- 生成新的 GIF 并保持原始循环设置loop=0
从第 2 秒开始截取,并将结果抽帧为 5 帧。
## 参数说明
### 图片(必需)
- 必须是 GIF 动图。
- 支持直接附带图片,或回复一条含 GIF 的消息后使用指令。
### `--ss <时间戳>`(可选)
- 指定开始时间(单位:秒),可使用以下格式:
- 纯数字(如 `1.5` 表示 1.5 秒)
- 分秒格式(如 `1:30` 表示 1 分 30 秒)
- 时分秒格式(如 `0:1:30` 表示 1 分 30 秒)
- 默认从开头开始0 秒)。
### `-t <持续时间>`(可选)
- 指定截取的持续时间(单位:秒),格式同 `--ss`。
- 与 `--ss` 配合使用:截取 `[ss, ss + t]` 区间。
- **不能与 `--to` 同时使用。**
### `--to <时间戳>`(可选)
- 指定结束时间(单位:秒),格式同 `--ss`。
- 与 `--ss` 配合使用:截取 `[ss, to]` 区间。
- **不能与 `-t` 同时使用。**
### `--frames:v <帧数>`(可选)
- 对截取后的片段进行均匀抽帧,保留指定数量的帧。
- 帧数必须为正整数(> 0
- 若原始帧数 ≤ 指定帧数,则保留全部帧。
### `--speed <速度>`(可选)
- 调整 GIF 图的速度。若为负数,则代表倒放。
## 使用方式
1. 发送指令前,请确保:
- 消息中附带一张 GIF 动图,**或**
- 回复一条包含 GIF 动图的消息后再发送指令。
2. 插件会自动:
- 解析 GIF 的每一帧及其持续时间duration
- 根据时间参数转换为帧索引进行裁剪
- 如指定抽帧,则对裁剪后的片段均匀采样
- 生成新的 GIF 并保持原始循环设置(`loop=0`

View File

@ -1,20 +1,33 @@
指令介绍
man - 用于展示此方 BOT 使用手册的指令
# 指令介绍
格式
man 文档类型
man [文档类型] <指令>
`man` - 用于展示此方 BOT 使用手册的指令
示例
`man` 查看所有有文档的指令清单
`man 3` 列举所有可读文档的库函数清单
`man 喵` 查看指令「喵」的使用说明
`man 8 out` 查看管理员指令「out」的使用说明
## 格式
文档类型
文档类型用来区分同一指令在不同场景下的情景。你可以使用数字编号进行筛选。分为这些种类:
```
man 文档类型
man [文档类型] <指令>
```
- 1 用户态指令,用于日常使用的指令
- 3 库函数指令,用于 Bot 开发用的函数查询
- 7 概念指令,用于概念解释
- 8 系统指令,仅管理员可用
## 示例
- ``man``
查看所有有文档的指令清单
- ``man 3``
列举所有可读文档的库函数清单
- ``man 喵``
查看指令「喵」的使用说明
- ``man 8 out``
查看管理员指令「out」的使用说明
## 文档类型
文档类型用来区分同一指令在不同场景下的情景。你可以使用数字编号进行筛选。分为以下种类:
- **1** 用户态指令:用于日常使用的指令
- **3** 库函数指令:用于 Bot 开发用的函数查询
- **7** 概念指令:用于概念解释
- **8** 系统指令:仅管理员可用

View File

@ -1,15 +1,16 @@
指令介绍
ntfy - 配置使用 ntfy 来更好地为你通知此方 BOT 代办
## 指令介绍
**`ntfy`** - 配置使用 [ntfy](https://ntfy.sh/) 来更好地为你通知此方 BOT 代办事项。
指令示例
`ntfy 创建`
创建一个随机的 ntfy 订阅主题来提醒代办,此方 Bot 将会给你使用指引。你可以前往 https://ntfy.sh/ 官网下载 ntfy APP或者使用网页版 ntfy。
## 指令示例
`ntfy 创建 kagami-notice`
创建一个名字含有 kagami-notice 的 ntfy 订阅主题
- **`ntfy 创建`**
创建一个随机的 ntfy 订阅主题来提醒代办。此方 Bot 将会给你使用指引。你可以前往 [https://ntfy.sh/](https://ntfy.sh/) 官网下载 ntfy APP或者使用网页版 ntfy。
`ntfy 删除`
清除并不再使用 ntfy 向你通知
- **`ntfy 创建 kagami-notice`**
创建一个名称包含 `kagami-notice` 的 ntfy 订阅主题。
另见
提醒我(1) 查询提醒(1) 删除提醒(1)
- **`ntfy 删除`**
清除配置,不再使用 ntfy 向你发送通知。
## 另见
[`提醒我(1)`](#) [`查询提醒(1)`](#) [`删除提醒(1)`](#)

View File

@ -1,21 +1,39 @@
指令介绍
openssl - 用于生成指定长度的加密安全随机数据
# 指令介绍
格式
openssl rand <模式> <字节数>
`openssl rand` — 用于生成指定长度的加密安全随机数据。
示例
`openssl rand -hex 16` 生成 16 字节的十六进制随机数
`openssl rand -base64 32` 生成 32 字节并以 Base64 编码输出的随机数据
## 格式
说明
该指令使用 Python 的 secrets 模块生成加密安全的随机字节,并支持以十六进制(-hex或 Base64-base64格式输出。
```bash
openssl rand <模式> <字节数>
```
参数说明
模式mode
- -hex :以十六进制字符串形式输出随机数据
- -base64 :以 Base64 编码字符串形式输出随机数据
## 示例
字节数num
- 必须为正整数
- 最大支持 256 字节
- ```bash
openssl rand -hex 16
```
生成 16 字节的十六进制随机数。
- ```bash
openssl rand -base64 32
```
生成 32 字节并以 Base64 编码输出的随机数据。
## 说明
该指令使用 Python 的 `secrets` 模块生成加密安全的随机字节,并支持以以下格式输出:
- 十六进制(`-hex`
- Base64 编码(`-base64`
## 参数说明
### 模式mode
- `-hex`:以十六进制字符串形式输出随机数据
- `-base64`:以 Base64 编码字符串形式输出随机数据
### 字节数num
- 必须为正整数
- 最大支持 256 字节

View File

@ -1,47 +1,55 @@
指令介绍
shadertool - 使用 SkSLSkia Shader Language代码实时渲染并生成 GIF 动画
# 指令介绍
`shadertool` - 使用 SkSLSkia Shader Language代码实时渲染并生成 GIF 动画
格式
shadertool [选项] <SkSL 代码>
## 格式
```bash
shadertool [选项] <SkSL 代码>
```
示例
shadertool """
uniform float u_time;
uniform float2 u_resolution;
## 示例
```bash
shadertool """
uniform float u_time;
uniform float2 u_resolution;
half4 main(float2 coord) {
return half4(
1.0,
sin((coord.y / u_resolution.y + u_time) * 3.1415926 * 2) * 0.5 + 0.5,
coord.x / u_resolution.x,
1.0
);
}
"""
half4 main(float2 coord) {
return half4(
1.0,
sin((coord.y / u_resolution.y + u_time) * 3.1415926 * 2) * 0.5 + 0.5,
coord.x / u_resolution.x,
1.0
);
}
"""
```
参数说明
SkSL 代码(必填)
- 类型:字符串(建议用英文双引号包裹)
- 内容:符合 SkSL 语法的片段着色器代码,必须包含 `void main()` 函数,并为 `sk_FragColor` 赋值。
- 注意:插件会自动去除代码首尾的单引号或双引号,便于命令行输入。
## 参数说明
--width <整数>(可选)
- 默认值320
- 作用:输出 GIF 的宽度(像素),必须大于 0
### SkSL 代码(必填)
- **类型**:字符串(建议用英文双引号包裹)
- **内容**:符合 SkSL 语法的片段着色器代码,必须包含 `main` 函数,并返回 `half4` 类型的颜色值
- **注意**:插件会自动去除代码首尾的单引号或双引号,便于命令行输入。
--height <整数>(可选)
- 默认值180
- 作用:输出 GIF 的度(像素),必须大于 0。
### `--width <整数>`(可选)
- **默认值**`320`
- **作用**:输出 GIF 的度(像素),必须大于 0。
--duration <浮点数>(可选)
- 默认值1.0
- 作用:动画总时长(秒),必须大于 0。
- 限制:`duration × fps` 必须 ≥ 1 且 ≤ 100即至少 1 帧,最多 100 帧)。
### `--height <整数>`(可选)
- **默认值**`180`
- **作用**:输出 GIF 的高度(像素),必须大于 0。
--fps <浮点数>(可选)
- 默认值15.0
- 作用:每秒帧数,控制动画流畅度,必须大于 0。
- 常见值10低配流畅、15默认、24/30电影/视频级)。
### `--duration <浮点数>`(可选)
- **默认值**`1.0`
- **作用**:动画总时长(秒),必须大于 0。
- **限制**`duration × fps` 必须 ≥ 1 且 ≤ 100即至少 1 帧,最多 100 帧)。
使用方式
直接在群聊或私聊中发送 `shadertool` 指令,附上合法的 SkSL 代码即可。
### `--fps <浮点数>`(可选)
- **默认值**`15.0`
- **作用**:每秒帧数,控制动画流畅度,必须大于 0。
- **常见值**
- `10`:低配流畅
- `15`:默认
- `24` / `30`:电影/视频级流畅度
## 使用方式
直接在群聊或私聊中发送 `shadertool` 指令,附上合法的 SkSL 代码即可。

View File

@ -0,0 +1,24 @@
# tqszm
引用一条消息,让此方帮你提取首字母。
例子:
```
John: 11-28 16:50:37
谁来总结一下今天的工作?
Jack: 11-28 16:50:55
[引用John的消息] tqszm
此方Bot: 11-28 16:50:56
slzjyxjtdgz
```
或者,你也可以直接以正常指令的方式调用:
```
提取首字母 中山大学
> zsdx
```

View File

@ -1,41 +1,72 @@
指令介绍
ytpgif - 生成来回镜像翻转的仿 YTPMV 动图
# `ytpgif` 指令说明
格式
ytpgif [倍速]
## 功能简介
`ytpgif` 用于生成来回镜像翻转的仿 YTPMVYouTube Poop Music Video风格动图。
示例
`ytpgif`
使用默认倍速1.0)处理你发送或回复的图片,生成镜像动图。
---
`ytpgif 2.5`
以 2.5 倍速处理图片,生成更快节奏的镜像动图。
## 命令格式
```bash
ytpgif [倍速]
```
回复一张图片并发送 `ytpgif 0.5`
以慢速0.5 倍)生成镜像动图。
---
参数说明
倍速(可选)
- 类型:浮点数
- 默认值1.0
- 有效范围0.1 20.0
- 作用:
• 对于静态图:控制镜像切换的快慢(值越大,切换越快)。
• 对于动图:控制截取原始动图正向和反向片段的时长(值越大,截取的片段越长)。
## 使用示例
使用方式
发送指令前,请确保:
- 直接在消息中附带一张图片,或
- 回复一条包含图片的消息后再发送指令。
- **默认倍速**
```bash
ytpgif
```
使用默认倍速1.0)处理你发送或回复的图片,生成镜像动图。
插件会自动:
- 下载并识别图片(支持静态图和 GIF 动图)
- 自动缩放至最大边长不超过 256 像素(保持宽高比)
- 静态图 → 生成“原图↔镜像”循环动图
- 动图 → 截取开头一段正向播放 + 同一段镜像翻转播放,拼接成新动图
- 保留透明通道(如原图含透明),否则转为 RGB 避免颜色异常
- **指定倍速(较快)**
```bash
ytpgif 2.5
```
以 2.5 倍速处理图片,生成节奏更快的镜像动图
注意事项
- 图片过大、格式损坏或网络问题可能导致处理失败。
- 动图帧数过多或单帧过短可能无法生成有效输出。
- 输出 GIF 最大单段帧数限制为 500 帧,以防资源耗尽。
- **指定倍速(较慢)**
回复一张图片并发送:
```bash
ytpgif 0.5
```
以 0.5 倍速生成慢节奏的镜像动图。
---
## 参数说明
### `倍速`(可选)
- **类型**:浮点数
- **默认值**`1.0`
- **有效范围**`0.1 20.0`
#### 作用:
- **静态图**:控制“原图 ↔ 镜像”切换的速度(值越大,切换越快)。
- **GIF 动图**:控制截取原始动图正向与反向片段的时长(值越大,截取的片段越长)。
---
## 使用方式
在发送指令前,请确保满足以下任一条件:
- 在消息中**直接附带一张图片**,或
- **回复一条包含图片的消息**后再发送指令。
插件将自动执行以下操作:
1. 下载并识别图片(支持静态图和 GIF 动图)。
2. 自动缩放图像,**最大边长不超过 256 像素**(保持宽高比)。
3. 根据图片类型处理:
- **静态图** → 生成“原图 ↔ 镜像”循环动图。
- **GIF 动图** → 截取开头一段正向播放 + 同一段镜像翻转播放,拼接成新动图。
4. **保留透明通道**(若原图含透明),否则转为 RGB 模式以避免颜色异常。
---
## 注意事项
⚠️ 以下情况可能导致处理失败或效果不佳:
- 图片过大、格式损坏或网络问题;
- 动图帧数过多或单帧持续时间过短;
- 输出 GIF 单段帧数超过 **500 帧**(系统将自动限制以防资源耗尽)。

View File

@ -1,8 +1,9 @@
指令介绍
删除提醒 - 删除在`查询提醒(1)`中查到的提醒
## 指令介绍
**删除提醒** - 删除在 [`查询提醒(1)`](查询提醒(1)) 中查到的提醒
指令示例
`删除提醒 1` 在查询提醒后,删除编号为 1 的提醒
## 指令示例
`删除提醒 1`
在查询提醒后,删除编号为 1 的提醒
另见
提醒我(1) 查询提醒(1) ntfy(1)
## 另见
[`提醒我(1)`](提醒我(1)) [`查询提醒(1)`](查询提醒(1)) [`ntfy(1)`](ntfy(1))

View File

@ -1,20 +1,24 @@
指令介绍
卵总展示 - 让卵总举起你的图片
# 指令介绍
格式
<引用图片> 卵总展示 [选项]
卵总展示 [选项] <图片>
**卵总展示** - 让卵总举起你的图片
选项
`--whiteness <number>` 白度
将原图进行指数变换,以调整它的白的程度,默认为 0.0
## 格式
`--black-level <number>` 黑色等级
将原图减淡,数值越大越淡,范围 0.0-1.0,默认 0.2
```
<引用图片> 卵总展示 [选项]
卵总展示 [选项] <图片>
```
`--opacity <number>` 不透明度
将你的图片叠放在图片上的不透明度,默认为 0.8
## 选项
`--saturation <number>` 饱和度
调整原图的饱和度,应该要大于 0.0,默认为 0.85
- `--whiteness <number>` **白度**
将原图进行指数变换,以调整它的白的程度,默认为 `0.0`。
- `--black-level <number>` **黑色等级**
将原图减淡,数值越大越淡,范围 `0.01.0`,默认为 `0.2`。
- `--opacity <number>` **不透明度**
将你的图片叠放在图片上的不透明度,默认为 `0.8`。
- `--saturation <number>` **饱和度**
调整原图的饱和度,应大于 `0.0`,默认为 `0.85`。

View File

@ -1,11 +1,16 @@
指令介绍
发起投票 - 发起一个投票
### 指令介绍
**发起投票** - 发起一个投票
格式
发起投票 <投票标题> <选项1> <选项2> ...
### 格式
```
发起投票 <投票标题> <选项1> <选项2> ...
```
示例
`发起投票 这是一个投票 A B C` 发起标题为“这是一个投票”选项为“A”、“B”、“C”的投票
### 示例
`发起投票 这是一个投票 A B C`
发起标题为“这是一个投票”选项为“A”、“B”、“C”的投票。
说明
投票各个选项之间用空格分隔选项数量为2-15项。投票的默认有效期为24小时
### 说明
- 投票各个选项之间用空格分隔。
- 选项数量必须为 **2 到 15 项**。
- 投票的默认有效期为 **24 小时**。

View File

@ -1,2 +1,3 @@
指令介绍
喵 - 你发喵,此方就会回复喵
# 指令介绍
喵 - 你发喵,此方就会回复喵

View File

@ -1,12 +1,16 @@
指令介绍
投票 - 参与已发起的投票
## 指令介绍
**投票** - 参与已发起的投票
格式
投票 <投票ID/标题> <选项文本>
## 格式
```
投票 <投票ID/标题> <选项文本>
```
示例
`投票 1 A` 在ID为1的投票中投给“A”
`投票 这是一个投票 B` 在标题为“这是一个投票”的投票中,投给“B
## 示例
- `投票 1 A`
在 ID 为 1 的投票中,投给 “A
- `投票 这是一个投票 B`
在标题为 “这是一个投票” 的投票中,投给 “B”
说明
目前不支持单人多投,每个人只能投一项。
## 说明
目前不支持单人多投,每个人只能投一项。

View File

@ -1,15 +1,18 @@
指令介绍
提醒我 - 在指定的时间提醒人事项的工具
## 指令介绍
使用示例
`下午五点提醒我吃饭`
创建一个下午五点的提醒,提醒你吃饭
**提醒我** - 在指定的时间提醒人事项的工具
`两分钟后提醒我睡觉`
创建一个相对于现在推迟 2 分钟的提醒,提醒你睡觉
## 使用示例
`2026年4月25日20点整提醒我生日快乐`
创建一个指定日期和时间的提醒
- `下午五点提醒我吃饭`
创建一个下午五点的提醒,提醒你吃饭
另见
查询提醒(1) 删除提醒(1) ntfy(1)
- `两分钟后提醒我睡觉`
创建一个相对于现在推迟 2 分钟的提醒,提醒你睡觉
- `2026年4月25日20点整提醒我生日快乐`
创建一个指定日期和时间的提醒
## 另见
[`查询提醒(1)`](查询提醒) [`删除提醒(1)`](删除提醒) [`ntfy(1)`](ntfy)

View File

@ -1,7 +1,13 @@
指令介绍
摇数字 - 生成一个随机数字并发送
## 指令介绍
示例
`摇数字` 随机生成一个 1-6 的数字
**摇数字** - 生成一个随机数字并发送
该指令不接受任何参数,直接调用即可。
### 示例
```
摇数字
```
随机生成一个 1-6 的数字。
> 该指令不接受任何参数,直接调用即可。

View File

@ -1,22 +1,33 @@
指令介绍
摇骰子 - 用于生成随机数并以骰子图像形式展示的指令
# 指令介绍
格式
摇骰子 [最小值] [最大值]
**摇骰子** - 用于生成随机数并以骰子图像形式展示的指令
示例
`摇骰子` 随机生成一个 1-6 的数字,并显示对应的骰子图像
`摇骰子 10` 生成 1 到 10 之间的随机整数
`摇骰子 0.5` 生成 0 到 0.5 之间的随机小数
`摇骰子 -5 5` 生成 -5 到 5 之间的随机数
## 格式
说明
该指令支持以下几种调用方式:
- 不带参数:使用默认范围生成随机数
- 仅指定一个参数 f1
- 若 f1 > 1则生成 [1, f1] 范围内的随机数
- 若 0 < f1 ≤ 1则生成 [0, f1] 范围内的随机数
- 若 f1 ≤ 0则生成 [f1, 0] 范围内的随机数
- 指定两个参数 f1 和 f2生成 [f1, f2] 范围内的随机数(顺序无关,内部会自动处理大小)
```
摇骰子 [最小值] [最大值]
```
## 示例
- `摇骰子`
随机生成一个 16 的数字,并显示对应的骰子图像
- `摇骰子 10`
生成 1 到 10 之间的随机整数
- `摇骰子 0.5`
生成 0 到 0.5 之间的随机小数
- `摇骰子 -5 5`
生成 -5 到 5 之间的随机数
## 说明
该指令支持以下几种调用方式:
- **不带参数**使用默认范围16生成随机数
- **仅指定一个参数 `f1`**
- 若 `f1 > 1`,则生成 `[1, f1]` 范围内的随机数
- 若 `0 < f1 ≤ 1`,则生成 `[0, f1]` 范围内的随机数
- 若 `f1 ≤ 0`,则生成 `[f1, 0]` 范围内的随机数
- **指定两个参数 `f1` 和 `f2`**:生成 `[f1, f2]` 范围内的随机数(顺序无关,内部会自动处理大小)
返回结果将以骰子样式的图像形式展示生成的随机数值。

View File

@ -1,12 +1,22 @@
指令介绍
查看投票 - 查看已发起的投票
# 指令介绍
格式
查看投票 <投票ID或标题>
**查看投票** - 查看已发起的投票
示例
`查看投票 1` 查看ID为1的投票
`查看投票 这是一个投票` 查看标题为“这是一个投票”的投票
## 格式
说明
投票在进行时,使用此命令可以看到投票的各个选项;投票结束后,则可以看到各项的票数。
```
查看投票 <投票ID或标题>
```
## 示例
- `查看投票 1`
查看 ID 为 1 的投票
- `查看投票 这是一个投票`
查看标题为“这是一个投票”的投票
## 说明
- 投票进行中时,使用此命令可查看投票的各个选项;
- 投票结束后,可查看各选项的最终票数。

View File

@ -1,9 +1,9 @@
指令介绍
查询提醒 - 查询已经创建的提醒
# 指令介绍
**查询提醒** - 查询已经创建的提醒
指令格式
`查询提醒` 查询提醒
`查询提醒 2` 查询第二页提醒
## 指令格式
- `查询提醒`查询提醒
- `查询提醒 2`查询第二页提醒
另见
提醒我(1) 删除提醒(1) ntfy(1)
## 另见
[提醒我(1)]()[删除提醒(1)]()[ntfy(1)]()

View File

@ -1,8 +1,17 @@
指令介绍
生成二维码 - 将文本内容转换为二维码
## 指令介绍
格式
生成二维码 <文本内容>
**生成二维码** - 将文本内容转换为二维码
示例
`生成二维码 嗨嗨嗨` 生成扫描结果为“嗨嗨嗨”的二维码图片
### 格式
```
生成二维码 <文本内容>
```
### 示例
```
生成二维码 嗨嗨嗨
```
生成扫描结果为“嗨嗨嗨”的二维码图片

View File

@ -0,0 +1,30 @@
# 指令介绍
**订阅** - 收听此方 BOT 的自动消息发送。
---
## 格式
- `订阅 <频道名称>`
- `取消订阅 <频道名称>`
- `查询订阅 [页码]`
- `可用订阅 [页码]`
---
## 示例
- **`订阅 此方谜题`**
在当前的聊天上下文订阅「此方谜题」频道。此后会每天推送此方谜题(由 konaph(8) 管理的)。
- 如果你是私聊,则能够每天发送此方谜题到你的私聊;
- 如果在群聊中使用该指令,则会每天发送题目到这个群里面。
- **`取消订阅 此方谜题`**
取消订阅「此方谜题」频道。
- **`查询订阅`**
查询当前聊天上下文订阅的所有频道。
- **`可用订阅 2`**
查询所有可用的订阅的第二页。

View File

@ -1,13 +1,20 @@
指令介绍
雷达回波 - 用于获取指定地区的天气雷达回波图像
# 指令介绍
格式
雷达回波 <地区>
**雷达回波** - 用于获取指定地区的天气雷达回波图像。
示例
`雷达回波 华南` 获取华南地区的天气雷达回波图
`雷达回波 全国` 获取全国的天气雷达回波图
## 格式
说明
该指令通过查询中国气象局 https://www.nmc.cn/publish/radar/chinaall.html ,获取指定地区的实时天气雷达回波图像。
支持的地区有:全国 华北 东北 华东 华中 华南 西南 西北。
```
雷达回波 <地区>
```
## 示例
- `雷达回波 华南`:获取华南地区的天气雷达回波图
- `雷达回波 全国`:获取全国的天气雷达回波图
## 说明
该指令通过查询中国气象局 [https://www.nmc.cn/publish/radar/chinaall.html](https://www.nmc.cn/publish/radar/chinaall.html),获取指定地区的实时天气雷达回波图像。
支持的地区有:**全国**、**华北**、**东北**、**华东**、**华中**、**华南**、**西南**、**西北**。

View File

@ -1,5 +1,7 @@
指令介绍
黑白 - 将图片经过一个黑白滤镜的处理
## 指令介绍
示例
引用一个带有图片的消息,或者消息本身携带图片,然后发送「黑白」即可
**黑白** - 将图片经过一个黑白滤镜的处理
## 示例
引用一个带有图片的消息,或者消息本身携带图片,然后发送「黑白」即可

View File

@ -1,22 +1,29 @@
from io import BytesIO
from typing import Optional, Union
import cv2
import nonebot
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import Alconna, AlconnaMatcher, Args, UniMessage, on_alconna
from PIL import Image
import numpy as np
from konabot.common.database import DatabaseManager
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
from konabot.common.web_render import WebRenderer
from konabot.plugins.air_conditioner.ac import AirConditioner, CrashType, generate_ac_image, wiggle_transform
from pathlib import Path
import random
import math
def get_ac(id: str) -> AirConditioner:
ac = AirConditioner.air_conditioners.get(id)
ROOT_PATH = Path(__file__).resolve().parent
# 创建全局数据库管理器实例
db_manager = DatabaseManager()
async def get_ac(id: str) -> AirConditioner:
ac = await AirConditioner.get_ac(id)
if ac is None:
ac = AirConditioner(id)
return ac
@ -43,14 +50,32 @@ async def send_ac_image(event: type[AlconnaMatcher], ac: AirConditioner):
ac_image = await generate_ac_image(ac)
await event.send(await UniMessage().image(raw=ac_image).export())
driver = nonebot.get_driver()
@driver.on_startup
async def register_startup_hook():
"""注册启动时需要执行的函数"""
# 初始化数据库表
await db_manager.execute_by_sql_file(
Path(__file__).resolve().parent / "sql" / "create_table.sql"
)
@driver.on_shutdown
async def register_shutdown_hook():
"""注册关闭时需要执行的函数"""
# 关闭所有数据库连接
await db_manager.close_all_connections()
evt = on_alconna(Alconna(
"群空调"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
async def _(target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
ac = await get_ac(id)
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
@ -58,10 +83,10 @@ evt = on_alconna(Alconna(
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
async def _(target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
ac.on = True
ac = await get_ac(id)
await ac.update_ac(state=True)
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
@ -69,10 +94,10 @@ evt = on_alconna(Alconna(
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
async def _(target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
ac.on = False
ac = await get_ac(id)
await ac.update_ac(state=False)
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
@ -81,31 +106,29 @@ evt = on_alconna(Alconna(
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
async def _(target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
if temp is None:
temp = 1
if temp <= 0:
return
id = target.channel_id
ac = get_ac(id)
ac = await get_ac(id)
if not ac.on or ac.burnt == True or ac.frozen == True:
await send_ac_image(evt, ac)
return
ac.temperature += temp
if ac.temperature > 40:
# 根据温度随机出是否爆炸40度开始呈指数增长
possibility = -math.e ** ((40-ac.temperature) / 50) + 1
if random.random() < possibility:
# 打开爆炸图片
with open(ASSETS_PATH / "img" / "other" / "boom.jpg", "rb") as f:
output = BytesIO()
# 爆炸抖动
frames = wiggle_transform(np.array(Image.open(f)), intensity=5)
pil_frames = [Image.fromarray(frame) for frame in frames]
pil_frames[0].save(output, format="GIF", save_all=True, append_images=pil_frames[1:], loop=0, duration=35, disposal=2)
output.seek(0)
await evt.send(await UniMessage().image(raw=output).export())
ac.broke_ac(CrashType.BURNT)
await evt.send("太热啦,空调炸了!")
return
await ac.update_ac(temperature_delta=temp)
if ac.burnt:
# 打开爆炸图片
with open(ASSETS_PATH / "img" / "other" / "boom.jpg", "rb") as f:
output = BytesIO()
# 爆炸抖动
frames = wiggle_transform(np.array(Image.open(f)), intensity=5)
pil_frames = [Image.fromarray(frame) for frame in frames]
pil_frames[0].save(output, format="GIF", save_all=True, append_images=pil_frames[1:], loop=0, duration=35, disposal=2)
output.seek(0)
await evt.send(await UniMessage().image(raw=output).export())
await evt.send("太热啦,空调炸了!")
return
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
@ -114,20 +137,17 @@ evt = on_alconna(Alconna(
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
async def _(target: DepLongTaskTarget, temp: Optional[Union[int, float]] = 1):
if temp is None:
temp = 1
if temp <= 0:
return
id = target.channel_id
ac = get_ac(id)
ac = await get_ac(id)
if not ac.on or ac.burnt == True or ac.frozen == True:
await send_ac_image(evt, ac)
return
ac.temperature -= temp
if ac.temperature < 0:
# 根据温度随机出是否冻结0度开始呈指数增长
possibility = -math.e ** (ac.temperature / 50) + 1
if random.random() < possibility:
ac.broke_ac(CrashType.FROZEN)
await ac.update_ac(temperature_delta=-temp)
await send_ac_image(evt, ac)
evt = on_alconna(Alconna(
@ -135,21 +155,34 @@ evt = on_alconna(Alconna(
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
async def _(target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
ac.change_ac()
ac = await get_ac(id)
await ac.change_ac()
await send_ac_image(evt, ac)
async def query_number_ranking(id: str) -> tuple[int, int]:
result = await db_manager.query_by_sql_file(
ROOT_PATH / "sql" / "query_crash_and_rank.sql",
(id,id)
)
if len(result) == 0:
return 0, 0
else:
# 将字典转换为值的元组
values = list(result[0].values())
return values[0], values[1]
evt = on_alconna(Alconna(
"空调炸炸排行榜",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent, target: DepLongTaskTarget):
async def _(target: DepLongTaskTarget):
id = target.channel_id
ac = get_ac(id)
number, ranking = ac.get_crashes_and_ranking()
# ac = get_ac(id)
# number, ranking = ac.get_crashes_and_ranking()
number, ranking = await query_number_ranking(id)
params = {
"number": number,
"ranking": ranking
@ -159,4 +192,37 @@ async def _(event: BaseEvent, target: DepLongTaskTarget):
target=".box",
params=params
)
await evt.send(await UniMessage().image(raw=image).export())
await evt.send(await UniMessage().image(raw=image).export())
evt = on_alconna(Alconna(
"空调最高峰",
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(target: DepLongTaskTarget):
result = await db_manager.query_by_sql_file(
ROOT_PATH / "sql" / "query_peak.sql"
)
if len(result) == 0:
await evt.send("没有空调记录!")
return
max_temp = result[0].get("max")
min_temp = result[0].get("min")
his_max = result[0].get("his_max")
his_min = result[0].get("his_min")
# 再从内存里的空调池中获取最高温度和最低温度
for ac in AirConditioner.InstancesPool.values():
if ac.on and not ac.burnt and not ac.frozen:
if max_temp is None or min_temp is None:
max_temp = ac.temperature
min_temp = ac.temperature
max_temp = max(max_temp, ac.temperature)
min_temp = min(min_temp, ac.temperature)
if max_temp is None or min_temp is None:
await evt.send(f"目前全部空调都被炸掉了!")
else:
await evt.send(f"全球在线空调最高温度为 {'%.1f' % max_temp}°C最低温度为 {'%.1f' % min_temp}°C")
if his_max is None or his_min is None:
pass
else:
await evt.send(f"历史最高温度为 {'%.1f' % his_max}°C最低温度为 {'%.1f' % his_min}°C\n(要进入历史记录,温度需至少保持 5 分钟)")

View File

@ -1,20 +1,193 @@
import asyncio
from enum import Enum
from io import BytesIO
import math
from pathlib import Path
import random
import signal
import time
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from nonebot import logger
from konabot.common.database import DatabaseManager
from konabot.common.path import ASSETS_PATH, FONTS_PATH
from konabot.common.path import DATA_PATH
import nonebot
import json
ROOT_PATH = Path(__file__).resolve().parent
# 创建全局数据库管理器实例
db_manager = DatabaseManager()
class CrashType(Enum):
BURNT = 0
FROZEN = 1
driver = nonebot.get_driver()
@driver.on_startup
async def register_startup_hook():
await ac_manager.start_auto_save()
@driver.on_shutdown
async def register_shutdown_hook():
"""注册关闭时需要执行的函数"""
# 停止自动保存任务
if ac_manager:
await ac_manager.stop_auto_save()
class AirConditionerManager:
def __init__(self, save_interval: int = 300): # 默认5分钟保存一次
self.save_interval = save_interval
self._save_task = None
self._running = False
async def start_auto_save(self):
"""启动自动保存任务"""
self._running = True
self._save_task = asyncio.create_task(self._auto_save_loop())
logger.info(f"自动保存任务已启动,间隔: {self.save_interval}")
async def stop_auto_save(self):
"""停止自动保存任务"""
if self._save_task:
self._running = False
self._save_task.cancel()
try:
await self._save_task
except asyncio.CancelledError:
pass
logger.info("自动保存任务已停止")
else:
logger.warning("没有正在运行的自动保存任务")
async def _auto_save_loop(self):
"""自动保存循环"""
while self._running:
try:
await asyncio.sleep(self.save_interval)
await self.save_all_instances()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"定时保存失败: {e}")
async def save_all_instances(self):
save_time = time.time()
to_remove = []
"""保存所有实例到数据库"""
for ac_id, ac_instance in AirConditioner.InstancesPool.items():
try:
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "update_ac.sql",
[(ac_instance.on, ac_instance.temperature,
ac_instance.burnt, ac_instance.frozen, ac_id),(ac_id,)]
)
if(save_time - ac_instance.instance_get_time >= 300): # 5 分钟
to_remove.append(ac_id)
except Exception as e:
logger.error(f"保存空调 {ac_id} 失败: {e}")
logger.info(f"定时保存完成,共保存 {len(AirConditioner.InstancesPool)} 个空调实例")
# 删除时间过长实例
for ac_id in to_remove:
del AirConditioner.InstancesPool[ac_id]
logger.info(f"清理长期不活跃的空调实例完成,目前池内共有 {len(AirConditioner.InstancesPool)} 个实例")
ac_manager = AirConditionerManager(save_interval=300) # 5分钟
class AirConditioner:
air_conditioners: dict[str, "AirConditioner"] = {}
InstancesPool: dict[str, 'AirConditioner'] = {}
@classmethod
async def refresh_ac(cls, id: str):
cls.InstancesPool[id].instance_get_time = time.time()
@classmethod
async def storage_ac(cls, id: str, ac: 'AirConditioner'):
cls.InstancesPool[id] = ac
@classmethod
async def get_ac(cls, id: str) -> 'AirConditioner':
if(id in cls.InstancesPool):
await cls.refresh_ac(id)
return cls.InstancesPool[id]
# 如果没有,那么从数据库重新实例化一个 AC 出来
result = await db_manager.query_by_sql_file(ROOT_PATH / "sql" / "query_ac.sql", (id,))
if len(result) == 0:
ac = await cls.create_ac(id)
return ac
ac_data = result[0]
ac = AirConditioner(id)
ac.on = bool(ac_data["on"])
ac.temperature = float(ac_data["temperature"])
ac.burnt = bool(ac_data["burnt"])
ac.frozen = bool(ac_data["frozen"])
await cls.storage_ac(id, ac)
return ac
@classmethod
async def create_ac(cls, id: str) -> 'AirConditioner':
ac = AirConditioner(id)
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "insert_ac.sql",
[(id, ac.on, ac.temperature, ac.burnt, ac.frozen),(id,)]
)
await cls.storage_ac(id, ac)
return ac
async def change_ac_temp(self, temperature_delta: float) -> None:
'''
改变空调的温度
:param temperature_delta: float 温度变化量
'''
changed_temp = self.temperature + temperature_delta
random_poss = random.random()
if temperature_delta < 0 and changed_temp < 0:
# 根据温度随机出是否冻结0度开始呈指数增长
possibility = -math.e ** (changed_temp / 50) + 1
if random_poss < possibility:
await self.broke_ac(CrashType.FROZEN)
elif temperature_delta > 0 and changed_temp > 40:
# 根据温度随机出是否烧坏40度开始呈指数增长
possibility = -math.e ** ((40-changed_temp) / 50) + 1
if random_poss < possibility:
await self.broke_ac(CrashType.BURNT)
self.temperature = changed_temp
async def update_ac(self, state: bool = None, temperature_delta: float = None, burnt: bool = None, frozen: bool = None) -> 'AirConditioner':
if state is not None:
self.on = state
if temperature_delta is not None:
await self.change_ac_temp(temperature_delta)
if burnt is not None:
self.burnt = burnt
if frozen is not None:
self.frozen = frozen
# await db_manager.execute_by_sql_file(
# ROOT_PATH / "sql" / "update_ac.sql",
# (self.on, self.temperature, self.burnt, self.frozen, self.id)
# )
return self
async def change_ac(self) -> 'AirConditioner':
self.on = False
self.temperature = 24
self.burnt = False
self.frozen = False
# await db_manager.execute_by_sql_file(
# ROOT_PATH / "sql" / "update_ac.sql",
# (self.on, self.temperature, self.burnt, self.frozen, self.id)
# )
return self
def __init__(self, id: str) -> None:
self.id = id
@ -22,45 +195,42 @@ class AirConditioner:
self.temperature = 24 # 默认温度
self.burnt = False
self.frozen = False
AirConditioner.air_conditioners[id] = self
def change_ac(self):
self.burnt = False
self.frozen = False
self.on = False
self.temperature = 24 # 重置为默认温度
self.instance_get_time = time.time()
def broke_ac(self, crash_type: CrashType):
async def broke_ac(self, crash_type: CrashType):
'''
让空调坏掉,并保存数据
让空调坏掉
:param crash_type: CrashType 枚举,表示空调坏掉的类型
'''
match crash_type:
case CrashType.BURNT:
self.burnt = True
await self.update_ac(burnt=True)
case CrashType.FROZEN:
self.frozen = True
self.save_crash_data(crash_type)
await self.update_ac(frozen=True)
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "insert_crash.sql",
(self.id, crash_type.value)
)
def save_crash_data(self, crash_type: CrashType):
'''
如果空调爆炸了,就往本地的 ac_crash_data.json 里该 id 的记录加一
'''
data_file = DATA_PATH / "ac_crash_data.json"
crash_data = {}
if data_file.exists():
with open(data_file, "r", encoding="utf-8") as f:
crash_data = json.load(f)
if self.id not in crash_data:
crash_data[self.id] = {"burnt": 0, "frozen": 0}
match crash_type:
case CrashType.BURNT:
crash_data[self.id]["burnt"] += 1
case CrashType.FROZEN:
crash_data[self.id]["frozen"] += 1
with open(data_file, "w", encoding="utf-8") as f:
json.dump(crash_data, f, ensure_ascii=False, indent=4)
# def save_crash_data(self, crash_type: CrashType):
# '''
# 如果空调爆炸了,就往本地的 ac_crash_data.json 里该 id 的记录加一
# '''
# data_file = DATA_PATH / "ac_crash_data.json"
# crash_data = {}
# if data_file.exists():
# with open(data_file, "r", encoding="utf-8") as f:
# crash_data = json.load(f)
# if self.id not in crash_data:
# crash_data[self.id] = {"burnt": 0, "frozen": 0}
# match crash_type:
# case CrashType.BURNT:
# crash_data[self.id]["burnt"] += 1
# case CrashType.FROZEN:
# crash_data[self.id]["frozen"] += 1
# with open(data_file, "w", encoding="utf-8") as f:
# json.dump(crash_data, f, ensure_ascii=False, indent=4)
def get_crashes_and_ranking(self) -> tuple[int, int]:
'''

View File

@ -0,0 +1,26 @@
-- 创建所有表
CREATE TABLE IF NOT EXISTS air_conditioner (
id VARCHAR(128) PRIMARY KEY,
"on" BOOLEAN NOT NULL,
temperature REAL NOT NULL,
burnt BOOLEAN NOT NULL,
frozen BOOLEAN NOT NULL
);
CREATE TABLE IF NOT EXISTS air_conditioner_log (
log_id INTEGER PRIMARY KEY AUTOINCREMENT,
log_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
id VARCHAR(128),
"on" BOOLEAN NOT NULL,
temperature REAL NOT NULL,
burnt BOOLEAN NOT NULL,
frozen BOOLEAN NOT NULL,
FOREIGN KEY (id) REFERENCES air_conditioner(id)
);
CREATE TABLE IF NOT EXISTS air_conditioner_crash_log (
id VARCHAR(128) NOT NULL,
crash_type INT NOT NULL,
timestamp DATETIME NOT NULL,
FOREIGN KEY (id) REFERENCES air_conditioner(id)
);

View File

@ -0,0 +1,8 @@
-- 插入一台新空调
INSERT INTO air_conditioner (id, "on", temperature, burnt, frozen)
VALUES (?, ?, ?, ?, ?);
-- 使用返回的数据插入日志
INSERT INTO air_conditioner_log (id, "on", temperature, burnt, frozen)
SELECT id, "on", temperature, burnt, frozen
FROM air_conditioner
WHERE id = ?;

View File

@ -0,0 +1,3 @@
-- 插入一条空调爆炸记录
INSERT INTO air_conditioner_crash_log (id, crash_type, timestamp)
VALUES (?, ?, CURRENT_TIMESTAMP);

View File

@ -0,0 +1,4 @@
-- 查询空调状态
SELECT *
FROM air_conditioner
WHERE id = ?;

View File

@ -0,0 +1,23 @@
-- 从 air_conditioner_crash_log 表中获取指定 id 损坏的次数以及损坏次数的排名
SELECT crash_count, crash_rank
FROM (
SELECT id,
COUNT(*) AS crash_count,
RANK() OVER (ORDER BY COUNT(*) DESC) AS crash_rank
FROM air_conditioner_crash_log
GROUP BY id
) AS ranked_data
WHERE id = ?
-- 如果该 id 没有损坏记录,则返回 0 次损坏和对应的最后一名
UNION
SELECT 0 AS crash_count,
(SELECT COUNT(DISTINCT id) + 1 FROM air_conditioner_crash_log) AS crash_rank
FROM (
SELECT DISTINCT id
FROM air_conditioner_crash_log
) AS ranked_data
WHERE NOT EXISTS (
SELECT 1
FROM air_conditioner_crash_log
WHERE id = ?
);

View File

@ -0,0 +1,13 @@
-- 查询目前所有空调中的最高温度与最低温度与历史最高低温
SELECT
(SELECT MAX(temperature) FROM air_conditioner
WHERE "on" = TRUE AND NOT frozen AND NOT burnt) AS max,
(SELECT MIN(temperature) FROM air_conditioner
WHERE "on" = TRUE AND NOT frozen AND NOT burnt) AS min,
(SELECT MAX(temperature) FROM air_conditioner_log
WHERE "on" = TRUE AND NOT frozen AND NOT burnt) AS his_max,
(SELECT MIN(temperature) FROM air_conditioner_log
WHERE "on" = TRUE AND NOT frozen AND NOT burnt) AS his_min;

View File

@ -0,0 +1,10 @@
-- 更新空调状态
UPDATE air_conditioner
SET "on" = ?, temperature = ?, burnt = ?, frozen = ?
WHERE id = ?;
-- 插入日志记录(从更新后的数据获取)
INSERT INTO air_conditioner_log (id, "on", temperature, burnt, frozen)
SELECT id, "on", temperature, burnt, frozen
FROM air_conditioner
WHERE id = ?;

View File

@ -6,34 +6,25 @@ from nonebot_plugin_alconna import Reference, Reply, UniMsg
from nonebot.adapters import Event
matcher_fix = on_message()
pattern = (
r"^(?:(?:av|cv)\d+|BV[a-zA-Z0-9]{10})|"
r"(?:b23\.tv|bili(?:22|23|33|2233)\.cn|\.bilibili\.com|QQ小程序(?:&amp;#93;|&#93;|\])哔哩哔哩).{0,500}"
)
@matcher_fix.handle()
async def _(msg: UniMsg, event: Event):
def _rule(msg: UniMsg):
to_search = msg.exclude(Reply, Reference).dump(json=True)
to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
return
return False
return True
matcher_fix = on_message(rule=_rule)
@matcher_fix.handle()
async def _(event: Event):
from nonebot_plugin_analysis_bilibili import handle_analysis
await handle_analysis(event)
# b_url: str
# b_page: str | None
# b_time: str | None
#
# from nonebot_plugin_analysis_bilibili.analysis_bilibili import extract as bilibili_extract
#
# b_url, b_page, b_time = bilibili_extract(to_search)
# if b_url is None:
# return
#
# await matcher_fix.send(await UniMessage().text(b_url).export())

View File

@ -0,0 +1,58 @@
from io import BytesIO
from inspect import signature
from konabot.common.nb.extract_image import DepPILImage
from nonebot.adapters import Event as BaseEvent
from nonebot import on_message
from nonebot_plugin_alconna import (
UniMessage,
UniMsg
)
from konabot.plugins.fx_process.fx_manager import ImageFilterManager
def is_fx_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
txt = msg.extract_plain_text()
if "fx" not in txt[:3]:
return False
return True
fx_on = on_message(rule=is_fx_mentioned)
@fx_on.handle()
async def _(msg: UniMsg, event: BaseEvent, img: DepPILImage):
args = msg.extract_plain_text().split()
if len(args) < 2:
return
filter_name = args[1]
filter_func = ImageFilterManager.get_filter(filter_name)
if not filter_func:
return
# 获取函数最大参数数量
sig = signature(filter_func)
max_params = len(sig.parameters) - 1 # 减去第一个参数 image
# 从 args 提取参数,并转换为适当类型
func_args = []
for i in range(2, min(len(args), max_params + 2)):
# 尝试将参数转换为函数签名中对应的类型
param = list(sig.parameters.values())[i - 1]
param_type = param.annotation
arg_value = args[i]
try:
if param_type is float:
converted_value = float(arg_value)
elif param_type is int:
converted_value = int(arg_value)
else:
converted_value = arg_value
except Exception:
converted_value = arg_value
func_args.append(converted_value)
# 应用滤镜
out_img = filter_func(img, *func_args)
output = BytesIO()
out_img.save(output, format="PNG")
await fx_on.send(await UniMessage().image(raw=output).export())

View File

@ -0,0 +1,50 @@
from typing import Optional
from PIL import ImageColor
class ColorHandle:
color_name_map = {
"": (255, 0, 0),
"绿": (0, 255, 0),
"": (0, 0, 255),
"": (255, 255, 0),
"": (128, 0, 128),
"": (0, 0, 0),
"": (255, 255, 255),
"": (255, 165, 0),
"": (255, 192, 203),
"": (128, 128, 128),
"": (0, 255, 255),
"": (75, 0, 130),
"": (165, 42, 42),
"": (200, 200, 200),
"": (50, 50, 50),
"": (255, 255, 224),
"": (47, 79, 79),
}
@staticmethod
def set_or_blend_color(ori_color: Optional[tuple], target_color: tuple) -> tuple:
# 如果没有指定初始颜色,返回目标颜色
if ori_color is None:
return target_color
# 混合颜色,取平均值
blended_color = tuple((o + t) // 2 for o, t in zip(ori_color, target_color))
return blended_color
@staticmethod
def parse_color(color_str: str) -> tuple:
# 如果是纯括号,则加上前缀 rgb
if color_str.startswith('(') and color_str.endswith(')'):
color_str = 'rgb' + color_str
try:
return ImageColor.getrgb(color_str)
except ValueError:
pass
base_color = None
color_str = color_str.replace('', '')
for name, rgb in ColorHandle.color_name_map.items():
if name in color_str:
base_color = ColorHandle.set_or_blend_color(base_color, rgb)
if base_color is not None:
return base_color
return (255, 255, 255) # 默认白色

View File

@ -0,0 +1,167 @@
from PIL import Image, ImageFilter
from PIL import ImageEnhance
from konabot.plugins.fx_process.color_handle import ColorHandle
import math
class ImageFilterImplement:
@staticmethod
def apply_blur(image: Image.Image, radius: float = 10) -> Image.Image:
return image.filter(ImageFilter.GaussianBlur(radius))
# 马赛克
@staticmethod
def apply_mosaic(image: Image.Image, pixel_size: int = 10) -> Image.Image:
if pixel_size <= 0:
pixel_size = 1
# 缩小图像
small_image = image.resize(
(image.width // pixel_size, image.height // pixel_size),
Image.Resampling.NEAREST
)
# 放大图像
return small_image.resize(image.size, Image.Resampling.NEAREST)
@staticmethod
def apply_contour(image: Image.Image) -> Image.Image:
return image.filter(ImageFilter.CONTOUR)
@staticmethod
def apply_sharpen(image: Image.Image) -> Image.Image:
return image.filter(ImageFilter.SHARPEN)
@staticmethod
def apply_edge_enhance(image: Image.Image) -> Image.Image:
return image.filter(ImageFilter.EDGE_ENHANCE)
@staticmethod
def apply_emboss(image: Image.Image) -> Image.Image:
return image.filter(ImageFilter.EMBOSS)
@staticmethod
def apply_find_edges(image: Image.Image) -> Image.Image:
return image.filter(ImageFilter.FIND_EDGES)
@staticmethod
def apply_smooth(image: Image.Image) -> Image.Image:
return image.filter(ImageFilter.SMOOTH)
# 反色
@staticmethod
def apply_invert(image: Image.Image) -> Image.Image:
# 确保图像是RGBA模式保留透明度通道
if image.mode != 'RGBA':
image = image.convert('RGBA')
r, g, b, a = image.split()
r = r.point(lambda i: 255 - i)
g = g.point(lambda i: 255 - i)
b = b.point(lambda i: 255 - i)
return Image.merge('RGBA', (r, g, b, a))
# 黑白灰度
@staticmethod
def apply_black_white(image: Image.Image) -> Image.Image:
# 保留透明度通道
if image.mode != 'RGBA':
image = image.convert('RGBA')
r, g, b, a = image.split()
gray = Image.merge('RGB', (r, g, b)).convert('L')
return Image.merge('RGBA', (gray, gray, gray, a))
# 阈值
@staticmethod
def apply_threshold(image: Image.Image, threshold: int = 128) -> Image.Image:
# 保留透明度通道
if image.mode != 'RGBA':
image = image.convert('RGBA')
r, g, b, a = image.split()
gray = Image.merge('RGB', (r, g, b)).convert('L')
bw = gray.point(lambda x: 255 if x >= threshold else 0, '1')
return Image.merge('RGBA', (bw.convert('L'), bw.convert('L'), bw.convert('L'), a))
# 对比度
@staticmethod
def apply_contrast(image: Image.Image, factor: float = 1.5) -> Image.Image:
enhancer = ImageEnhance.Contrast(image)
return enhancer.enhance(factor)
# 亮度
@staticmethod
def apply_brightness(image: Image.Image, factor: float = 1.5) -> Image.Image:
enhancer = ImageEnhance.Brightness(image)
return enhancer.enhance(factor)
# 色彩
@staticmethod
def apply_color(image: Image.Image, factor: float = 1.5) -> Image.Image:
enhancer = ImageEnhance.Color(image)
return enhancer.enhance(factor)
# 三色调
@staticmethod
def apply_to_color(image: Image.Image, color: str = 'rgb(255,0,0)') -> Image.Image:
if image.mode != 'RGBA':
image = image.convert('RGBA')
# 转为灰度图
gray = image.convert('L')
# 获取目标颜色的RGB值
rgb_color = ColorHandle.parse_color(color)
# 高光默认为白色,阴影默认为黑色
highlight = (255, 255, 255)
shadow = (0, 0, 0)
# 创建新的图像
new_image = Image.new('RGBA', image.size)
width, height = image.size
for x in range(width):
for y in range(height):
lum = gray.getpixel((x, y))
# 计算新颜色
new_r = int((rgb_color[0] * lum + shadow[0] * (highlight[0] - lum)) / 255)
new_g = int((rgb_color[1] * lum + shadow[1] * (highlight[1] - lum)) / 255)
new_b = int((rgb_color[2] * lum + shadow[2] * (highlight[2] - lum)) / 255)
a = image.getpixel((x, y))[3] # 保留原图的透明度
new_image.putpixel((x, y), (new_r, new_g, new_b, a))
return new_image
# 缩放
@staticmethod
def apply_resize(image: Image.Image, scale: float = 1.5) -> Image.Image:
if scale <= 0:
scale = 1.0
new_size = (int(image.width * scale), int(image.height * scale))
return image.resize(new_size, Image.Resampling.LANCZOS)
# 波纹
@staticmethod
def apply_wave(image: Image.Image, amplitude: float = 5, wavelength: float = 20) -> Image.Image:
width, height = image.size
new_image = Image.new('RGBA', (width, height))
for x in range(width):
for y in range(height):
offset_x = int(amplitude * math.sin(2 * math.pi * y / wavelength))
offset_y = int(amplitude * math.cos(2 * math.pi * x / wavelength))
new_x = x + offset_x
new_y = y + offset_y
if 0 <= new_x < width and 0 <= new_y < height:
new_image.putpixel((x, y), image.getpixel((new_x, new_y)))
else:
new_image.putpixel((x, y), (0, 0, 0, 0)) # 透明像素
return new_image
def apply_color_key(image: Image.Image, target_color: str = 'rgb(255,0,0)', tolerance: int = 60) -> Image.Image:
if image.mode != 'RGBA':
image = image.convert('RGBA')
target_rgb = ColorHandle.parse_color(target_color)
width, height = image.size
new_image = Image.new('RGBA', (width, height))
for x in range(width):
for y in range(height):
r, g, b, a = image.getpixel((x, y))
# 计算与目标颜色的距离
distance = math.sqrt((r - target_rgb[0]) ** 2 + (g - target_rgb[1]) ** 2 + (b - target_rgb[2]) ** 2)
if distance <= tolerance:
new_image.putpixel((x, y), (r, g, b, 0)) # 设置为透明
else:
new_image.putpixel((x, y), (r, g, b, a)) # 保留原像素
return new_image

View File

@ -0,0 +1,28 @@
from typing import Optional
from konabot.plugins.fx_process.fx_handle import ImageFilterImplement
class ImageFilterManager:
filter_map = {
"模糊": ImageFilterImplement.apply_blur,
"马赛克": ImageFilterImplement.apply_mosaic,
"轮廓": ImageFilterImplement.apply_contour,
"锐化": ImageFilterImplement.apply_sharpen,
"边缘增强": ImageFilterImplement.apply_edge_enhance,
"浮雕": ImageFilterImplement.apply_emboss,
"查找边缘": ImageFilterImplement.apply_find_edges,
"平滑": ImageFilterImplement.apply_smooth,
"反色": ImageFilterImplement.apply_invert,
"黑白": ImageFilterImplement.apply_black_white,
"阈值": ImageFilterImplement.apply_threshold,
"对比度": ImageFilterImplement.apply_contrast,
"亮度": ImageFilterImplement.apply_brightness,
"色彩": ImageFilterImplement.apply_color,
"色调": ImageFilterImplement.apply_to_color,
"缩放": ImageFilterImplement.apply_resize,
"波纹": ImageFilterImplement.apply_wave,
"色键": ImageFilterImplement.apply_color_key,
}
@classmethod
def get_filter(cls, name: str) -> Optional[callable]:
return cls.filter_map.get(name)

View File

@ -8,6 +8,7 @@ from typing import Optional
from loguru import logger
from nonebot import on_message
import nonebot
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import (
@ -18,13 +19,22 @@ from nonebot_plugin_alconna import (
on_alconna,
)
from konabot.common.database import DatabaseManager
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
from konabot.common.llm import get_llm
ROOT_PATH = Path(__file__).resolve().parent
DATA_DIR = Path(__file__).parent.parent.parent.parent / "data"
DATA_FILE_PATH = (
Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json"
DATA_DIR / "idiom_banned.json"
)
# 创建全局数据库管理器实例
db_manager = DatabaseManager()
def load_banned_ids() -> list[str]:
if not DATA_FILE_PATH.exists():
@ -55,6 +65,21 @@ def remove_banned_id(group_id: str):
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4), "utf-8")
driver = nonebot.get_driver()
@driver.on_startup
async def register_startup_hook():
"""注册启动时需要执行的函数"""
await IdiomGame.init_lexicon()
@driver.on_shutdown
async def register_shutdown_hook():
"""注册关闭时需要执行的函数"""
# 关闭所有数据库连接
await db_manager.close_all_connections()
class TryStartState(Enum):
STARTED = 0
ALREADY_PLAYING = 1
@ -75,6 +100,32 @@ class TryVerifyState(Enum):
BUT_NO_NEXT = 5
GAME_END = 6
class IdiomGameLLM:
@classmethod
async def verify_idiom_with_llm(cls, idiom: str) -> bool:
if len(idiom) != 4:
return False
llm = get_llm()
system_prompt = "请判断用户的输入是否为一个合理的成语或者这四个字在中文环境下是否说得通。如果是请回答「T」否则回答「F」。请注意即使这个词不是成语如果说得通也就是能念起来很通顺你也该输出「T」。请不要包含任何解释也不要包含任何标点符号。"
message = await llm.chat([{"role": "system", "content": system_prompt}, {"role": "user", "content": idiom}])
answer = message.content
logger.info(f"LLM 对成语 {idiom} 的判断结果是 {answer}")
if answer == "T":
await cls.storage_idiom(idiom)
return answer == "T"
@classmethod
async def storage_idiom(cls, idiom: str):
# 将 idiom 存入数据库
# await db_manager.execute_by_sql_file(
# ROOT_PATH / "sql" / "insert_custom_word.sql",
# (idiom,)
# )
# 将 idiom 存入本地文件以备后续分析
with open(DATA_DIR / "idiom_llm_storage.txt", "a", encoding="utf-8") as f:
f.write(idiom + "\n")
IdiomGame.append_into_word_list(idiom)
class IdiomGame:
ALL_WORDS = [] # 所有四字词语
@ -101,6 +152,21 @@ class IdiomGame:
self.idiom_history: list[list[str]] = [] # 成语使用历史记录,多个数组以存储不同成语链
IdiomGame.INSTANCE_LIST[group_id] = self
@classmethod
async def append_into_word_list(cls, word: str):
'''
将一个新词加入到词语列表中
'''
if word not in cls.ALL_WORDS:
cls.ALL_WORDS.append(word)
if word[0] not in cls.IDIOM_FIRST_CHAR:
cls.IDIOM_FIRST_CHAR[word[0]] = []
cls.IDIOM_FIRST_CHAR[word[0]].append(word)
# await db_manager.execute_by_sql_file(
# ROOT_PATH / "sql" / "insert_custom_word.sql",
# (word,)
# )
def be_able_to_play(self) -> bool:
if self.last_play_date != datetime.date.today():
self.last_play_date = datetime.date.today()
@ -110,21 +176,29 @@ class IdiomGame:
return True
return False
def choose_start_idiom(self) -> str:
@staticmethod
async def random_idiom() -> str:
# result = await db_manager.query_by_sql_file(
# ROOT_PATH / "sql" / "random_choose_idiom.sql"
# )
# return result[0]["idiom"]
return secrets.choice(IdiomGame.ALL_IDIOMS)
async def choose_start_idiom(self) -> str:
"""
随机选择一个成语作为起始成语
"""
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_idiom = await IdiomGame.random_idiom()
self.last_char = self.last_idiom[-1]
if not self.is_nextable(self.last_char):
self.choose_start_idiom()
if not await self.is_nextable(self.last_char):
await self.choose_start_idiom()
else:
self.add_history_idiom(self.last_idiom, new_chain=True)
return self.last_idiom
@classmethod
def try_start_game(cls, group_id: str, force: bool = False) -> TryStartState:
cls.init_lexicon()
async def try_start_game(cls, group_id: str, force: bool = False) -> TryStartState:
await cls.init_lexicon()
if not cls.INSTANCE_LIST.get(group_id):
cls(group_id)
instance = cls.INSTANCE_LIST[group_id]
@ -135,10 +209,10 @@ class IdiomGame:
instance.now_playing = True
return TryStartState.STARTED
def start_game(self, rounds: int = 100):
async def start_game(self, rounds: int = 100):
self.now_playing = True
self.remain_rounds = rounds
self.choose_start_idiom()
await self.choose_start_idiom()
@classmethod
def try_stop_game(cls, group_id: str) -> TryStopState:
@ -168,33 +242,38 @@ class IdiomGame:
跳过当前成语,选择下一个成语
"""
async with self.lock:
self._skip_idiom_async()
await self._skip_idiom_async()
self.add_buff_score(buff_score)
return self.last_idiom
def _skip_idiom_async(self) -> str:
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
async def _skip_idiom_async(self) -> str:
self.last_idiom = await IdiomGame.random_idiom()
self.last_char = self.last_idiom[-1]
if not self.is_nextable(self.last_char):
self._skip_idiom_async()
if not await self.is_nextable(self.last_char):
await self._skip_idiom_async()
else:
self.add_history_idiom(self.last_idiom, new_chain=True)
return self.last_idiom
async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
async def try_verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]:
"""
用户发送成语
"""
async with self.lock:
state = self._verify_idiom(idiom, user_id)
state = await self._verify_idiom(idiom, user_id)
return state
def is_nextable(self, last_char: str) -> bool:
async def is_nextable(self, last_char: str) -> bool:
"""
判断是否有成语可以接
"""
# result = await db_manager.query_by_sql_file(
# ROOT_PATH / "sql" / "is_nextable.sql",
# (last_char,)
# )
# return result[0]["DEED"] == 1
return last_char in IdiomGame.AVALIABLE_IDIOM_FIRST_CHAR
def add_already_idiom(self, idiom: str):
if idiom in self.already_idioms:
self.already_idioms[idiom] += 1
@ -218,16 +297,31 @@ class IdiomGame:
result.append(" -> ".join(chain))
return result
def _verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]:
async def _verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]:
state = []
# 新成语的首字应与上一条成语的尾字相同
if idiom[0] != self.last_char:
state.append(TryVerifyState.WRONG_FIRST_CHAR)
return state
# 成语是否存在
# result = await db_manager.query_by_sql_file(
# ROOT_PATH / "sql" / "query_idiom.sql",
# (idiom, idiom, idiom)
# )
# status_result = result[0]["status"]
# if status_result == -1:
if idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS:
self.add_score(user_id, -0.1)
state.append(TryVerifyState.NOT_IDIOM)
return state
logger.info(f"用户 {user_id} 发送了未知词语 {idiom},正在使用 LLM 进行验证")
try:
if not await IdiomGameLLM.verify_idiom_with_llm(idiom):
self.add_score(user_id, -0.1)
state.append(TryVerifyState.NOT_IDIOM)
return state
except Exception as e:
logger.error(f"LLM 验证成语 {idiom} 时出现错误:{e}")
self.add_score(user_id, -0.1)
state.append(TryVerifyState.NOT_IDIOM)
return state
# 成语合法,更新状态
self.add_history_idiom(idiom)
score_k = 0.5 ** self.get_already_used_num(idiom) # 每被使用过一次,得分减半
@ -238,6 +332,7 @@ class IdiomGame:
self.last_idiom = idiom
self.last_char = idiom[-1]
self.add_score(user_id, 1 * score_k) # 先加 1 分
# if status_result == 1:
if idiom in IdiomGame.ALL_IDIOMS:
state.append(TryVerifyState.VERIFIED_AND_REAL)
self.add_score(user_id, 4 * score_k) # 再加 4 分
@ -245,9 +340,9 @@ class IdiomGame:
if self.remain_rounds <= 0:
self.now_playing = False
state.append(TryVerifyState.GAME_END)
if not self.is_nextable(self.last_char):
if not await self.is_nextable(self.last_char):
# 没有成语可以接了,自动跳过
self._skip_idiom_async()
await self._skip_idiom_async()
self.add_buff_score(-100)
state.append(TryVerifyState.BUT_NO_NEXT)
return state
@ -274,16 +369,27 @@ class IdiomGame:
return self.last_char
@classmethod
def random_idiom_starting_with(cls, first_char: str) -> Optional[str]:
cls.init_lexicon()
async def random_idiom_starting_with(cls, first_char: str) -> Optional[str]:
# await cls.init_lexicon()
# result = await db_manager.query_by_sql_file(
# ROOT_PATH / "sql" / "query_idiom_start_with.sql",
# (first_char,)
# )
# if len(result) == 0:
# return None
# return result[0]["idiom"]
await cls.init_lexicon()
if first_char not in cls.AVALIABLE_IDIOM_FIRST_CHAR:
return None
return secrets.choice(cls.AVALIABLE_IDIOM_FIRST_CHAR[first_char])
@classmethod
def init_lexicon(cls):
async def init_lexicon(cls):
if cls.__inited:
return
# await db_manager.execute_by_sql_file(
# ROOT_PATH / "sql" / "create_table.sql"
# ) # 确保数据库初始化
cls.__inited = True
# 成语大表
@ -291,11 +397,12 @@ class IdiomGame:
ALL_IDIOMS_INFOS = json.load(f)
# 词语大表
ALL_WORDS = []
with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f:
jsonData = json.load(f)
cls.ALL_WORDS = [item["ci"] for item in jsonData]
logger.debug(f"Loaded {len(cls.ALL_WORDS)} words from ci.json")
logger.debug(f"Sample words: {cls.ALL_WORDS[:5]}")
ALL_WORDS = [item["ci"] for item in jsonData]
logger.debug(f"Loaded {len(ALL_WORDS)} words from ci.json")
logger.debug(f"Sample words: {ALL_WORDS[:5]}")
COMMON_WORDS = []
# 读取 COMMON 词语大表
@ -335,17 +442,47 @@ class IdiomGame:
logger.debug(f"Loaded {len(THUOCL_WORDS)} words from THUOCL txt files")
logger.debug(f"Sample words: {THUOCL_WORDS[:5]}")
# 读取本地的 idiom_llm_storage.txt 文件,补充词语表
LOCAL_LLM_WORDS = []
if (DATA_DIR / "idiom_llm_storage.txt").exists():
with open(DATA_DIR / "idiom_llm_storage.txt", "r", encoding="utf-8") as f:
for line in f:
word = line.strip()
if len(word) == 4:
LOCAL_LLM_WORDS.append(word)
logger.debug(f"Loaded additional {len(LOCAL_LLM_WORDS)} words from idiom_llm_storage.txt")
# 只有成语的大表
cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重
ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
ALL_IDIOMS = list(set(ALL_IDIOMS)) # 去重
# 批量插入数据库
# await db_manager.execute_many_values_by_sql_file(
# ROOT_PATH / "sql" / "insert_idiom.sql",
# [(idiom,) for idiom in ALL_IDIOMS]
# )
# 其他四字词语表,仅表示可以有这个词
cls.ALL_WORDS = (
[word for word in cls.ALL_WORDS if len(word) == 4]
ALL_WORDS = (
[word for word in ALL_WORDS if len(word) == 4]
+ THUOCL_WORDS
+ COMMON_WORDS
)
cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重
cls.ALL_WORDS = ALL_WORDS + LOCAL_LLM_WORDS
cls.ALL_IDIOMS = ALL_IDIOMS
# 插入数据库
# await db_manager.execute_many_values_by_sql_file(
# ROOT_PATH / "sql" / "insert_word.sql",
# [(word,) for word in ALL_WORDS]
# )
# 自定义词语 LOCAL_LLM_WORDS 插入数据库,兼容用
# await db_manager.execute_many_values_by_sql_file(
# ROOT_PATH / "sql" / "insert_custom_word.sql",
# [(word,) for word in LOCAL_LLM_WORDS]
# )
# 根据成语大表,划分出成语首字字典
for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS:
@ -389,7 +526,7 @@ async def play_game(
if rounds <= 0:
await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export())
return
state = IdiomGame.try_start_game(group_id, force)
state = await IdiomGame.try_start_game(group_id, force)
if state == TryStartState.ALREADY_PLAYING:
await evt.send(
await UniMessage()
@ -408,7 +545,7 @@ async def play_game(
.export()
)
instance = IdiomGame.INSTANCE_LIST[group_id]
instance.start_game(rounds)
await instance.start_game(rounds)
# 发布成语
await evt.send(
await UniMessage()
@ -460,7 +597,9 @@ async def end_game(event: BaseEvent, group_id: str):
for line in history_lines:
result_text += line + "\n"
await evt.send(await result_text.export())
instance.clear_score_board()
# instance.clear_score_board()
# 将实例删除
del IdiomGame.INSTANCE_LIST[group_id]
evt = on_alconna(
@ -499,7 +638,7 @@ async def _(target: DepLongTaskTarget):
instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
avaliable_idiom = IdiomGame.random_idiom_starting_with(instance.get_last_char())
avaliable_idiom = await IdiomGame.random_idiom_starting_with(instance.get_last_char())
# 发送哈哈狗图片
with open(ASSETS_PATH / "img" / "dog" / "haha_dog.jpg", "rb") as f:
img_data = f.read()

View File

@ -0,0 +1,15 @@
-- 创建成语大表
CREATE TABLE IF NOT EXISTS all_idioms (
id INTEGER PRIMARY KEY AUTOINCREMENT,
idiom VARCHAR(128) NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS all_words (
id INTEGER PRIMARY KEY AUTOINCREMENT,
word VARCHAR(128) NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS custom_words (
id INTEGER PRIMARY KEY AUTOINCREMENT,
word VARCHAR(128) NOT NULL UNIQUE
);

View File

@ -0,0 +1,3 @@
-- 插入自定义词
INSERT OR IGNORE INTO custom_words (word)
VALUES (?);

View File

@ -0,0 +1,3 @@
-- 插入成语大表,避免重复插入
INSERT OR IGNORE INTO all_idioms (idiom)
VALUES (?);

View File

@ -0,0 +1,3 @@
-- 插入词
INSERT OR IGNORE INTO all_words (word)
VALUES (?);

View File

@ -0,0 +1,5 @@
-- 查询是否有以 xx 开头的成语,有则返回真,否则假
SELECT EXISTS(
SELECT 1 FROM all_idioms
WHERE idiom LIKE ? || '%'
) AS DEED;

View File

@ -0,0 +1,7 @@
-- 查询成语是否在 all_idioms 中,如果存在则返回 1否则再判断是否在 custom_words 或 all_words 中,存在则返回 0否则返回 -1
SELECT
CASE
WHEN EXISTS (SELECT 1 FROM all_idioms WHERE idiom = ?) THEN 1
WHEN EXISTS (SELECT 1 FROM custom_words WHERE word = ?) OR EXISTS (SELECT 1 FROM all_words WHERE word = ?) THEN 0
ELSE -1
END AS status;

View File

@ -0,0 +1,4 @@
-- 查询以 xx 开头的成语,随机打乱后只取第一个
SELECT idiom FROM all_idioms
WHERE idiom LIKE ? || '%'
ORDER BY RANDOM() LIMIT 1;

View File

@ -0,0 +1,2 @@
-- 随机从 all_idioms 表中选择一个成语
SELECT idiom FROM all_idioms ORDER BY RANDOM() LIMIT 1;

View File

@ -1,24 +1,32 @@
import re
from io import BytesIO
from typing import Any
import PIL
import PIL.Image
import cv2
import imageio.v3 as iio
from nonebot import on_message
from nonebot.adapters import Bot
from nonebot_plugin_alconna import Alconna, Args, Image, Option, UniMessage, on_alconna
import numpy
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import PIL_Image
from konabot.common.nb.extract_image import DepImageBytes, DepPILImage
from konabot.common.nb.match_keyword import match_keyword
from konabot.common.nb.reply_image import reply_image
# 保持不变
cmd_black_white = on_message(rule=match_keyword("黑白"))
@cmd_black_white.handle()
async def _(img: PIL_Image, bot: Bot):
async def _(img: DepPILImage, bot: Bot):
# 保持不变
await reply_image(cmd_black_white, bot, img.convert("LA"))
# 保持不变
def parse_timestamp(tx: str) -> float | None:
res = 0.0
for component in tx.split(":"):
@ -29,6 +37,7 @@ def parse_timestamp(tx: str) -> float | None:
return res
# 保持不变
cmd_giftool = on_alconna(
Alconna(
"giftool",
@ -44,7 +53,7 @@ cmd_giftool = on_alconna(
@cmd_giftool.handle()
async def _(
image: PIL_Image,
image: DepImageBytes,
start_point: str | None = None,
frame_count: int | None = None,
length: str | None = None,
@ -79,28 +88,24 @@ async def _(
is_rev = speed_factor < 0
speed_factor = abs(speed_factor)
if not getattr(image, "is_animated", False):
raise BotExceptionMessage("错误输入的不是动图GIF")
##
# 从这里开始,采样整个 GIF 图
frames: list[PIL.Image.Image] = []
durations: list[float] = []
try:
for i in range(getattr(image, "n_frames")):
image.seek(i)
frames.append(image.copy())
duration = image.info.get("duration", 100) / 1000
durations.append(duration)
except EOFError:
pass
if not frames:
reader = iio.imread(BytesIO(image), extension=".gif", index=None)
np_frames = list(reader)
_pil = PIL.Image.open(BytesIO(image))
durations: list[float] = []
while True:
try:
duration = _pil.info.get('duration', 20)
durations.append(max(duration, 20) / 1000)
_pil.seek(_pil.tell() + 1)
except EOFError:
break
except Exception:
raise BotExceptionMessage("错误:读取 GIF 帧失败")
# 采样结束
##
# 根据开始、结束时间或者帧数量来裁取 GIF 图
begin_time = ss or 0
end_time = sum(durations)
end_time = min(begin_time + (t or end_time), to or end_time, end_time)
@ -108,94 +113,95 @@ async def _(
accumulated = 0.0
status = 0
sel_frames: list[PIL.Image.Image] = []
sel_np_frames: list[numpy.ndarray[Any, Any]] = []
sel_durations: list[float] = []
for i in range(len(frames)):
frame = frames[i]
for i in range(len(np_frames)):
frame = np_frames[i]
duration = durations[i]
if status == 0:
if accumulated + duration > begin_time:
status = 1
sel_frames.append(frame)
sel_durations.append(accumulated + duration - begin_time)
sel_np_frames.append(frame)
sel_durations.append(accumulated + duration - begin_time)
elif accumulated + duration == begin_time:
status = 1
elif status == 1:
if accumulated + duration > end_time:
sel_frames.append(frame)
sel_durations.append(end_time - accumulated)
if accumulated + duration >= end_time:
included_duration = end_time - accumulated
if included_duration > 0:
sel_np_frames.append(frame)
sel_durations.append(included_duration)
break
sel_frames.append(frame)
sel_np_frames.append(frame)
sel_durations.append(duration)
accumulated += duration
##
# 加速!
sel_durations = [dur / speed_factor * 1000 for dur in durations]
if not sel_np_frames:
raise BotExceptionMessage("错误:裁取 GIF 帧失败(可能时间设置错误)")
rframes = []
rdur = []
rdur_ms_unprocessed = [dur / speed_factor * 1000 for dur in sel_durations]
rframes: list[numpy.ndarray] = []
rdur_ms: list[int] = []
acc_mod_20 = 0
for i in range(len(sel_frames)):
fr = sel_frames[i]
du = round(sel_durations[i])
for i in range(len(sel_np_frames)):
fr = sel_np_frames[i]
du = rdur_ms_unprocessed[i]
if du >= 20:
rframes.append(fr)
rdur.append(int(du))
rdur_ms.append(int(round(du)))
acc_mod_20 = 0
else:
if acc_mod_20 == 0:
rframes.append(fr)
rdur.append(20)
rdur_ms.append(20)
acc_mod_20 += du
else:
acc_mod_20 += du
if acc_mod_20 >= 20:
acc_mod_20 = 0
if len(rframes) == 1 and len(sel_frames) > 1:
rframes.append(sel_frames[max(2, len(sel_frames) // 2)])
rdur.append(20)
##
# 收尾:看看透明度这块
transparency_flag = False
for f in rframes:
if f.mode == "RGBA":
if any(pix < 255 for pix in f.getchannel("A").getdata()):
transparency_flag = True
break
elif f.mode == "P" and "transparency" in f.info:
transparency_flag = True
break
tf = {}
if transparency_flag:
tf["transparency"] = 0
if len(rframes) == 1 and len(sel_np_frames) > 1:
middle_index = max(2, len(sel_np_frames) // 2)
rframes.append(sel_np_frames[middle_index])
rdur_ms.append(20)
if is_rev:
rframes = rframes[::-1]
rdur = rdur[::-1]
rdur_ms = rdur_ms[::-1]
output_img = BytesIO()
if rframes:
rframes[0].save(
output_img,
format="GIF",
save_all=True,
append_images=rframes[1:],
duration=rdur,
loop=0,
optimize=False,
disposal=2,
**tf,
)
do_transparent = any((f.shape[2] == 4 for f in rframes))
if do_transparent:
rframes = [(
f
if f.shape[2] == 4
else cv2.cvtColor(f, cv2.COLOR_RGB2RGBA)
) for f in rframes]
kwargs = { "transparency": 0, "disposal": 2, "mode": "RGBA" }
else:
kwargs = {}
try:
iio.imwrite(
output_img,
rframes,
extension=".gif",
duration=rdur_ms,
loop=0,
optimize=True,
plugin="pillow",
**kwargs,
)
except Exception as e:
raise BotExceptionMessage(f"错误:写入 GIF 失败: {e}")
else:
raise BotExceptionMessage("错误:没有可输出的帧")
output_img.seek(0)
await cmd_giftool.send(await UniMessage().image(raw=output_img).export())

View File

@ -4,33 +4,34 @@ from math import ceil
from loguru import logger
from nonebot import on_message
import nonebot
from nonebot.rule import to_me
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
on_alconna)
from nonebot_plugin_apscheduler import scheduler
from konabot.common import username
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.common.pager import PagerQuery
from konabot.plugins.kona_ph.core.message import (get_daily_report,
get_daily_report_v2,
get_puzzle_description,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE, config,
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE,
create_admin_commands,
puzzle_manager)
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info
from konabot.plugins.poster.service import broadcast
create_admin_commands()
register_poster_info("每日谜题", info=PosterInfo(
aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"},
description="此方 BOT 每日谜题推送",
))
async def is_play_group(target: DepLongTaskTarget):
if target.is_private_chat:
return True
if target.channel_id in config.plugin_puzzle_playgroup:
return True
return False
cmd_submit = on_message(rule=is_play_group)
cmd_submit = on_message(rule=to_me())
@cmd_submit.handle()
@ -52,7 +53,7 @@ async def _(msg: UniMsg, target: DepLongTaskTarget):
cmd_query = on_alconna(Alconna(
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?)|今日谜?题目?)"
), rule=is_play_group)
), rule=to_me())
@cmd_query.handle()
async def _(target: DepLongTaskTarget):
@ -65,7 +66,7 @@ async def _(target: DepLongTaskTarget):
cmd_query_submission = on_alconna(Alconna(
"今日答题情况"
), rule=is_play_group)
), rule=to_me())
@cmd_query_submission.handle()
async def _(target: DepLongTaskTarget):
@ -77,10 +78,10 @@ async def _(target: DepLongTaskTarget):
cmd_history = on_alconna(Alconna(
"历史题目",
"re:历史(题目|谜题)",
Args["page?", int],
Args["index_id?", str],
), rule=is_play_group)
), rule=to_me())
@cmd_history.handle()
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
@ -119,17 +120,39 @@ async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
await target.send_message(msg)
cmd_leadboard = on_alconna(Alconna(
"re:此方(解谜|谜题)排行榜",
Args["page?", int],
))
@cmd_leadboard.handle()
async def _(target: DepLongTaskTarget, page: int = 1):
async with puzzle_manager() as manager:
result = manager.get_leadboard(PagerQuery(page, 10))
await target.send_message(result.to_unimessage(
title="此方解谜排行榜",
formatter=lambda data: (
f"{data[1]} 已完成 | "
f"{username.get_username(data[0])}"
)
))
@scheduler.scheduled_job("cron", hour="8")
async def _():
async with puzzle_manager() as manager:
yesterday = get_today_date() - datetime.timedelta(days=1)
msg2 = get_daily_report(manager, yesterday)
if msg2 is not None:
await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
await broadcast("每日谜题", msg2)
puzzle = manager.get_today_puzzle()
if puzzle is not None:
logger.info(f"找到了题目 {puzzle.raw_id},发送")
await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(puzzle))
await broadcast("每日谜题", get_puzzle_description(puzzle))
else:
logger.info("自动任务:没有找到题目,跳过")
driver = nonebot.get_driver()

View File

@ -17,7 +17,7 @@ class PuzzleImageManager:
21,
)
img_name = f"{id}{suffix}"
(KONAPH_IMAGE_BASE / img_name).write_bytes(data)
_ = (KONAPH_IMAGE_BASE / img_name).write_bytes(data)
return img_name
def remove_puzzle_image(self, img_name: str):

View File

@ -33,11 +33,11 @@ def get_puzzle_description(puzzle: Puzzle, with_answer: bool = False) -> UniMess
)
result = result.text(f"\n\n出题者:{get_username(puzzle.author_id)}")
if with_answer:
result = result.text(f"\n\n题目答案:{puzzle.flag}")
else:
result = result.text("\n\n输入「提交答案 答案」来提交你的解答")
result = result.text("\n\nAt 我或者私聊我「提交答案 答案」来提交你的解答")
return result

View File

@ -1,5 +1,6 @@
import asyncio
import datetime
import functools
import random
import re
from contextlib import asynccontextmanager
@ -7,6 +8,7 @@ from contextlib import asynccontextmanager
import nanoid
from pydantic import BaseModel, Field, ValidationError
from konabot.common.pager import PagerQuery
from konabot.plugins.kona_ph.core.path import KONAPH_DATA_JSON
@ -112,6 +114,10 @@ class PuzzleManager(BaseModel):
index_id_counter: int = 1
submissions: dict[str, dict[str, list[PuzzleSubmission]]] = {}
"""
类型:{ [raw_id: str]: { [user_id: str]: PuzzleSubmission[] } }
"""
last_checked_date: datetime.date = Field(
default_factory=lambda: get_today_date() - datetime.timedelta(days=1)
)
@ -235,6 +241,20 @@ class PuzzleManager(BaseModel):
if p.author_id == user
], key=lambda p: p.created_at, reverse=True)
def get_leadboard(self, pager: PagerQuery):
return pager.apply(sorted([
(user, sum((
len([
s for s in sl.get(user, [])
if s.success
]) for sl in self.submissions.values()
)))
for user in functools.reduce(
lambda x, y: x | y,
(set(sl.keys()) for sl in self.submissions.values()),
)
], key=lambda t: t[1], reverse=True))
lock = asyncio.Lock()

View File

@ -10,7 +10,6 @@ from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.exc import BotExceptionMessage
from konabot.common.nb.extract_image import download_image_bytes
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.common.username import get_username
from konabot.plugins.kona_ph.core.image import get_image_manager
from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_puzzle_hint_list,
@ -19,6 +18,7 @@ from konabot.plugins.kona_ph.core.message import (get_puzzle_description, get_pu
from konabot.plugins.kona_ph.core.storage import (Puzzle, PuzzleHint, PuzzleManager,
get_today_date,
puzzle_manager)
from konabot.plugins.poster.service import broadcast
PUZZLE_PAGE_SIZE = 10
@ -344,7 +344,7 @@ def create_admin_commands():
p = manager.get_today_puzzle(strong=True)
if p is None:
return await target.send_message("上架失败了orz可能是没题了")
await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(p))
await broadcast("每日谜题", get_puzzle_description(p))
return await target.send_message("Ok!")
@cmd_admin.assign("preview")

View File

@ -2,13 +2,13 @@ from pathlib import Path
import nonebot
import nonebot.adapters
import nonebot.adapters.discord
import nonebot.rule
from nonebot import on_command
from nonebot_plugin_alconna import Alconna, Args, UniMessage, on_alconna
from konabot.common.nb.is_admin import is_admin
from konabot.common.path import DOCS_PATH_MAN1, DOCS_PATH_MAN3, DOCS_PATH_MAN7, DOCS_PATH_MAN8
from konabot.plugins.markdown.core import MarkDownCore
def search_man(section: int) -> dict[tuple[int, str], Path]:
base_path = {
@ -64,7 +64,7 @@ async def _(
for section in section_set:
mans += [f"{n}({s})" for s, n in search_man(section).keys()]
mans.sort()
await man.send(UniMessage().text(
(
"★此方 BOT 使用帮助★\n"
@ -94,9 +94,9 @@ async def _(
await man.send(UniMessage().text("你所检索的指令不存在"))
return
mans_msg = mans_fp.read_text('utf-8', 'replace')
if isinstance(event, nonebot.adapters.discord.event.MessageEvent):
mans_msg = f'```\n{mans_msg}\n```'
await man.send(UniMessage().text(mans_msg))
# await man.send(UniMessage().text(mans_msg))
img = await MarkDownCore.render_markdown(mans_msg)
await man.send(UniMessage.image(raw=img))
help_deprecated = on_command('help', rule=nonebot.rule.to_me())

View File

@ -0,0 +1,72 @@
from loguru import logger
import nonebot
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import (
UniMessage,
UniMsg
)
from konabot.plugins.markdown.core import MarkDownCore
def is_markdown_mentioned(msg: UniMsg) -> bool:
txt = msg.extract_plain_text()
if "markdown" not in txt[:8] and "md" not in txt[:2]:
return False
return True
evt = nonebot.on_message(rule=is_markdown_mentioned)
@evt.handle()
async def _(msg: UniMsg, event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
content = msg.extract_plain_text()
else:
content = msg.extract_plain_text()
logger.debug(f"Received markdown command with content: {content}")
if "md" in content[:2]:
message = content.replace("md", "", 1).strip()
else:
message = content.replace("markdown", "", 1).strip()
# 如果回复了消息,则转换回复的内容
if(len(message) == 0):
if event.reply:
message = event.reply.message.extract_plain_text()
else:
return
logger.debug(f"Markdown content to render: {message}")
out = await MarkDownCore.render_markdown(message, theme="dark")
await evt.send(await UniMessage().image(raw=out).export())
def is_latex_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
txt = msg.extract_plain_text()
if "latex" not in txt[:5]:
return False
return True
evt = nonebot.on_message(rule=is_latex_mentioned)
@evt.handle()
async def _(msg: UniMsg, event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
content = msg.extract_plain_text()
else:
content = msg.extract_plain_text()
logger.debug(f"Received markdown command with content: {content}")
message = content.replace("latex", "", 1).strip()
# 如果回复了消息,则转换回复的内容
if(len(message) == 0):
if event.reply:
message = event.reply.message.extract_plain_text()
else:
return
logger.debug(f"Latex content to render: {message}")
out = await MarkDownCore.render_latex(message, theme="dark")
await evt.send(await UniMessage().image(raw=out).export())

View File

@ -0,0 +1,57 @@
from loguru import logger
from playwright.async_api import ConsoleMessage, Page
from konabot.common.web_render import konaweb
from konabot.common.web_render.core import WebRenderer
class MarkDownCore:
@staticmethod
async def render_markdown(markdown_text: str, theme: str = "dark", params: dict = {}) -> bytes:
async def page_function(page: Page):
await page.emulate_media(color_scheme=theme)
await page.locator('textarea[name=content]').fill(markdown_text)
await page.locator('#button').click()
# 等待 checkState 函数加载完成
await page.wait_for_function("typeof checkState === 'function'", timeout=1000)
# 访问 checkState 函数,确保渲染完成
await page.wait_for_function("checkState() === true", timeout=1000)
out = await WebRenderer.render_with_persistent_page(
"markdown_renderer",
konaweb('markdown'),
target='#main',
other_function=page_function,
params=params
)
return out
@staticmethod
async def render_latex(text: str, theme: str = "dark") -> bytes:
params = {
"size": "2.5em",
}
async def page_function(page: Page):
await page.emulate_media(color_scheme=theme)
page.wait_for_selector('textarea[name=content]')
await page.locator('textarea[name=content]').fill(f"$$ {text} $$")
page.wait_for_selector('#button')
await page.locator('#button').click()
# 等待 checkState 函数加载完成
await page.wait_for_function("typeof checkState === 'function'", timeout=2000)
# 访问 checkState 函数,确保渲染完成
await page.wait_for_function("checkState() === true", timeout=10000)
out = await WebRenderer.render_with_persistent_page(
"latex_renderer",
konaweb('latex'),
target='#main',
other_function=page_function,
params=params
)
return out

View File

@ -17,7 +17,8 @@ from nonebot_plugin_alconna import (
)
from playwright.async_api import ConsoleMessage, Page
from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message
from konabot.common.nb.match_keyword import match_keyword
from konabot.common.nb.extract_image import DepPILImage
from konabot.common.web_render import konaweb
from konabot.common.web_render.core import WebRenderer
from konabot.common.web_render.host_images import host_tempdir
@ -34,10 +35,8 @@ from konabot.plugins.memepack.drawing.saying import (
draw_pt,
draw_suan,
)
from konabot.plugins.memepack.drawing.watermark import draw_doubao_watermark
from nonebot.adapters import Bot, Event
from returns.result import Success, Failure
geimao = on_alconna(
Alconna(
@ -189,11 +188,11 @@ async def _(saying: list[str]):
await cutecat.send(await UniMessage().image(raw=img_bytes).export())
cao_display_cmd = on_message()
cao_display_cmd = on_message(rule=match_keyword("小槽展示"))
@cao_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
async def _(msg: UniMsg, img: DepPILImage):
flag = False
for text in cast(Iterable[Text], msg.get(Text)):
if text.text.strip() == "小槽展示":
@ -204,20 +203,10 @@ async def _(msg: UniMsg, evt: Event, bot: Bot):
return
if not flag:
return
match await extract_image_from_message(evt.get_message(), evt, bot):
case Success(img):
img_handled = await draw_cao_display(img)
img_bytes = BytesIO()
img_handled.save(img_bytes, format="PNG")
await cao_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
case Failure(err):
await cao_display_cmd.send(
await UniMessage()
.at(user_id=evt.get_user_id())
.text(" ")
.text(err)
.export()
)
img_handled = await draw_cao_display(img)
img_bytes = BytesIO()
img_handled.save(img_bytes, format="PNG")
await cao_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
snaur_display_cmd = on_alconna(
@ -234,7 +223,7 @@ snaur_display_cmd = on_alconna(
@snaur_display_cmd.handle()
async def _(
img: PIL_Image,
img: DepPILImage,
whiteness: float = 0.0,
black_level: float = 0.2,
opacity: float = 0.8,
@ -251,9 +240,9 @@ async def _(
img_processed.save(img_data, "PNG")
await snaur_display_cmd.send(await UniMessage().image(raw=img_data).export())
anan_display_cmd = on_message()
anan_display_cmd = on_message(rule=match_keyword("安安展示"))
@anan_display_cmd.handle()
async def _(msg: UniMsg, evt: Event, bot: Bot):
async def _(msg: UniMsg, img: DepPILImage):
flag = False
for text in cast(Iterable[Text], msg.get(Text)):
stripped = text.text.strip()
@ -266,20 +255,10 @@ async def _(msg: UniMsg, evt: Event, bot: Bot):
if not flag:
return
match await extract_image_from_message(evt.get_message(), evt, bot):
case Success(img):
img_handled = await draw_anan_display(img)
img_bytes = BytesIO()
img_handled.save(img_bytes, format="PNG")
await anan_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
case Failure(err):
await anan_display_cmd.send(
await UniMessage()
.at(user_id=evt.get_user_id())
.text(" ")
.text(err)
.export()
)
img_handled = await draw_anan_display(img)
img_bytes = BytesIO()
img_handled.save(img_bytes, format="PNG")
await anan_display_cmd.send(await UniMessage().image(raw=img_bytes).export())
kiosay = on_alconna(
@ -315,7 +294,7 @@ quote_cmd = on_alconna(Alconna(
), aliases={"quote"})
@quote_cmd.handle()
async def _(quote: str, author: str, img: PIL_Image):
async def _(quote: str, author: str, img: DepPILImage):
async with host_tempdir() as tempdir:
img_path = tempdir.path / "image.png"
img_url = tempdir.url_of(img_path)
@ -342,3 +321,16 @@ async def _(quote: str, author: str, img: PIL_Image):
)
await quote_cmd.send(await UniMessage().image(raw=out).export())
doubao_cmd = on_alconna(Alconna(
"豆包水印",
Args["image?", Image | None],
))
@doubao_cmd.handle()
async def _(img: DepPILImage):
result = await draw_doubao_watermark(img)
result_bytes = BytesIO()
result.save(result_bytes, format="PNG")
await doubao_cmd.send(await UniMessage().image(raw=result_bytes).export())

View File

@ -0,0 +1,20 @@
import PIL
import PIL.Image
from konabot.common.path import ASSETS_PATH
from konabot.common.utils.to_async import make_async
doubao_watermark = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "doubao.png").convert("RGBA")
@make_async
def draw_doubao_watermark(base: PIL.Image.Image) -> PIL.Image.Image:
base = base.copy().convert("RGBA")
w = base.size[0] / 768 * 140
h = base.size[0] / 768 * 40
x = base.size[0] / 768 * 160
y = base.size[0] / 768 * 60
w, h, x, y = map(int, (w, h, x, y))
base.alpha_composite(doubao_watermark.resize((w, h)), (base.size[0] - x, base.size[1] - y))
return base

View File

@ -1,9 +1,10 @@
from nonebot import on_message
from nonebot_plugin_alconna import UniMessage, UniMsg
from nonebot_plugin_alconna import UniMessage
evt = on_message()
from konabot.common.nb.match_keyword import match_keyword
evt = on_message(rule=match_keyword(""))
@evt.handle()
async def _(msg: UniMsg):
if msg.extract_plain_text() == "":
await evt.send(await UniMessage().text("").export())
async def _():
await evt.send(await UniMessage().text("").export())

View File

@ -0,0 +1,85 @@
import nonebot
from nonebot_plugin_alconna import Alconna, Args, on_alconna
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.pager import PagerQuery
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
from konabot.plugins.poster.service import dep_poster_service
cmd_subscribe = on_alconna(Alconna(
"订阅",
Args["channel", str],
))
@cmd_subscribe.handle()
async def _(target: DepLongTaskTarget, channel: str):
async with dep_poster_service() as service:
result = await service.subscribe(channel, target)
if result:
await target.send_message(f"已订阅「{channel}")
else:
await target.send_message(f"已经订阅过「{channel}」了")
cmd_list = on_alconna(Alconna(
"re:(?:查询|我的|获取)订阅(列表)?",
Args["page?", int],
))
def better_channel_message(channel_id: str) -> str:
if channel_id not in POSTER_INFO_DATA:
return channel_id
data = POSTER_INFO_DATA[channel_id]
return f"{channel_id}{data.description}"
@cmd_list.handle()
async def _(target: DepLongTaskTarget, page: int = 1):
async with dep_poster_service() as service:
result = await service.get_channels(target, PagerQuery(
page_index=page,
page_size=10,
))
await target.send_message(result.to_unimessage(title="订阅列表", formatter=better_channel_message))
cmd_list_available = on_alconna(Alconna(
"re:(查询)?可用订阅(列表)?",
Args["page?", int],
))
@cmd_list_available.handle()
async def _(target: DepLongTaskTarget, page: int = 1):
result = PagerQuery(
page_index=page,
page_size=10,
).apply(sorted(POSTER_INFO_DATA.keys()))
await target.send_message(result.to_unimessage(title="可用订阅列表", formatter=better_channel_message))
cmd_unsubscribe = on_alconna(Alconna(
"取消订阅",
Args["channel", str],
))
@cmd_unsubscribe.handle()
async def _(target: DepLongTaskTarget, channel: str):
async with dep_poster_service() as service:
result = await service.subscribe(channel, target)
if result:
await target.send_message(f"已取消订阅「{channel}")
else:
await target.send_message(f"这里没有订阅过「{channel}")
driver = nonebot.get_driver()
@driver.on_startup
async def _():
async with dep_poster_service() as service:
await service.fix_data()

View File

@ -0,0 +1,15 @@
from dataclasses import dataclass, field
@dataclass
class PosterInfo:
aliases: set[str] = field(default_factory=set)
description: str = field(default='')
POSTER_INFO_DATA: dict[str, PosterInfo] = {}
def register_poster_info(channel: str, info: PosterInfo):
POSTER_INFO_DATA[channel] = info

View File

@ -0,0 +1,112 @@
import asyncio
from contextlib import asynccontextmanager
from typing import Annotated
from nonebot.params import Depends
from pydantic import BaseModel, ValidationError
from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult
from konabot.common.path import DATA_PATH
from konabot.plugins.poster.repository import IPosterRepo
class ChannelData(BaseModel):
targets: list[LongTaskTarget] = []
class PosterData(BaseModel):
channels: dict[str, ChannelData] = {}
def is_the_same_target(target1: LongTaskTarget, target2: LongTaskTarget) -> bool:
if (target1.is_private_chat and not target2.is_private_chat):
return False
if (target2.is_private_chat and not target1.is_private_chat):
return False
if target1.platform != target2.platform:
return False
# 如果是群聊,则要求 channel_id 相同
if not target1.is_private_chat:
return target1.channel_id == target2.channel_id
return target1.target_id == target2.target_id
class LocalPosterRepo(IPosterRepo):
def __init__(self, data: PosterData) -> None:
self.data = data
super().__init__()
async def get_channel_targets(self, channel: str) -> list[LongTaskTarget]:
if channel not in self.data.channels:
self.data.channels[channel] = ChannelData()
return self.data.channels[channel].targets
async def add_channel_target(self, channel: str, target: LongTaskTarget) -> bool:
targets = await self.get_channel_targets(channel)
for t in targets:
if is_the_same_target(t, target):
return False
targets.append(target)
return True
async def remove_channel_target(self, channel: str, target: LongTaskTarget) -> bool:
targets = await self.get_channel_targets(channel)
len0 = len(targets)
self.data.channels[channel].targets = [
t for t in targets if not is_the_same_target(t, target)
]
len1 = len(self.data.channels[channel].targets)
return len0 != len1
async def get_subscribed_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
channels: list[str] = []
for channel_id, channel in self.data.channels.items():
for t in channel.targets:
if is_the_same_target(target, t):
channels.append(channel_id)
break
channels = sorted(channels)
return pager.apply(channels)
async def merge_channel(self, from_channel: str, to_channel: str) -> None:
channel_from = await self.get_channel_targets(from_channel)
channel_to = await self.get_channel_targets(to_channel)
for t1 in channel_from:
flag = True
for t2 in channel_to:
if is_the_same_target(t1, t2):
flag = False
break
if flag:
channel_to.append(t1)
del self.data.channels[from_channel]
LOCAL_POSTER_DATA_LOCK = asyncio.Lock()
LOCAL_POSTER_DATA_PATH = DATA_PATH / "module_poster_data.json"
@asynccontextmanager
async def local_poster_data():
async with LOCAL_POSTER_DATA_LOCK:
if not LOCAL_POSTER_DATA_PATH.exists():
data = PosterData()
else:
try:
data = PosterData.model_validate_json(LOCAL_POSTER_DATA_PATH.read_text())
except ValidationError:
data = PosterData()
yield data
LOCAL_POSTER_DATA_PATH.write_text(data.model_dump_json())
@asynccontextmanager
async def local_poster():
async with local_poster_data() as data:
yield LocalPosterRepo(data)
DepLocalPosterRepo = Annotated[LocalPosterRepo, Depends(local_poster)]

View File

@ -0,0 +1,37 @@
from abc import ABC, abstractmethod
from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult
class IPosterRepo(ABC):
@abstractmethod
async def get_channel_targets(self, channel: str) -> list[LongTaskTarget]:
"""
获取广播通道的所有广播对象
"""
@abstractmethod
async def add_channel_target(self, channel: str, target: LongTaskTarget) -> bool:
"""
向广播通道添加一个广播目标。若目标已存在,则返回 False
"""
@abstractmethod
async def remove_channel_target(self, channel: str, target: LongTaskTarget) -> bool:
"""
移除一个广播通道的目标。若目标不存在,则返回 False
"""
@abstractmethod
async def get_subscribed_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
"""
获得一个目标已经订阅了的广播通道
"""
@abstractmethod
async def merge_channel(self, from_channel: str, to_channel: str) -> None:
"""
合并两个 Channel 为一个,并移除另一个
"""

View File

@ -0,0 +1,59 @@
from contextlib import asynccontextmanager
from typing import Annotated, Any
from nonebot.params import Depends
from nonebot_plugin_alconna import UniMessage
from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
from konabot.plugins.poster.repo_local_data import local_poster
from konabot.plugins.poster.repository import IPosterRepo
class PosterService:
def __init__(self, repo: IPosterRepo) -> None:
self.repo = repo
def parse_channel_id(self, channel: str):
for cid, cinfo in POSTER_INFO_DATA.items():
if channel in cinfo.aliases:
return cid
return channel
async def subscribe(self, channel: str, target: LongTaskTarget) -> bool:
channel = self.parse_channel_id(channel)
return await self.repo.add_channel_target(channel, target)
async def unsubscribe(self, channel: str, target: LongTaskTarget) -> bool:
channel = self.parse_channel_id(channel)
return await self.repo.remove_channel_target(channel, target)
async def broadcast(self, channel: str, message: UniMessage[Any] | str) -> list[LongTaskTarget]:
channel = self.parse_channel_id(channel)
targets = await self.repo.get_channel_targets(channel)
for target in targets:
# 因为是订阅消息,就不要 At 对方了
await target.send_message(message, at=False)
return targets
async def get_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
return await self.repo.get_subscribed_channels(target, pager)
async def fix_data(self):
for cid, cinfo in POSTER_INFO_DATA.items():
for alias in cinfo.aliases:
await self.repo.merge_channel(alias, cid)
@asynccontextmanager
async def dep_poster_service():
async with local_poster() as repo:
yield PosterService(repo)
async def broadcast(channel: str, message: UniMessage[Any] | str):
async with dep_poster_service() as service:
return await service.broadcast(channel, message)
DepPosterService = Annotated[PosterService, Depends(dep_poster_service)]

View File

@ -1,3 +1,4 @@
import re
import aiohttp
import asyncio as asynkio
from math import ceil
@ -6,7 +7,6 @@ from typing import Any
import nanoid
import nonebot
import ptimeparse
from loguru import logger
from nonebot import get_plugin_config, on_message
from nonebot.adapters import Event
@ -14,8 +14,10 @@ from nonebot_plugin_alconna import Alconna, Args, Subcommand, UniMessage, UniMsg
from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget, LongTask, create_longtask, handle_long_task, longtask_data
from konabot.common.nb.match_keyword import match_keyword
from konabot.plugins.simple_notify.ask_llm import ask_ai
evt = on_message()
evt = on_message(rule=match_keyword(re.compile("^.+提醒我.+$")))
(Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True)
DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "notify.json"
@ -75,21 +77,12 @@ async def _(msg: UniMsg, mEvt: Event, target: DepLongTaskTarget):
return
text = msg.extract_plain_text()
if "提醒我" not in text:
return
segments = text.split("提醒我", maxsplit=1)
if len(segments) != 2:
return
notify_time, notify_text = segments
try:
target_time = ptimeparse.Parser().parse(notify_time)
logger.info(f"{notify_time} 解析出了时间:{target_time}")
except Exception:
logger.info(f"无法从 {notify_time} 中解析出时间")
return
if not notify_text:
target_time, notify_text = await ask_ai(text)
if target_time is None:
return
await create_longtask(

View File

@ -0,0 +1,158 @@
import datetime
import json
import re
from loguru import logger
from konabot.common.apis.ali_content_safety import AlibabaGreen
from konabot.common.llm import get_llm
SYSTEM_PROMPT = """你是一个专门解析提醒请求的助手。请分析用户输入识别其中是否包含提醒信息并输出标准化的JSON格式结果。
输入格式通常是:"xxxx提醒我yyyy",其中:
- xxxx 是用户提供的时间信息
- yyyy 是提醒内容。有些时候用户会有一些需求。你可以在合理的范围进行衍生。
输出要求:
- 必须是有效的JSON对象
- 包含以下字段:
* datetime: 如果是绝对时间填入ISO 8601格式的日期时间字符串否则为null
* datetime_delta: 如果是相对时间填入ISO 8601持续时间格式否则为null
* datetime_delta_minus: 如果时间偏移量是负数,则此项为 true否则为 false
* content: 提醒内容的字符串
* is_notice: 布尔值,表示这是否是真正的提醒请求
时间处理规则:
- 绝对时间示例:如果 xxxx 输入了非常明确的时间点,如"2024年12月25日" → 转换为具体datetime
- 相对时间示例:如果 xxxx 没有输入非常明确的时间点,如"10分钟后""2小时后""3天后" → 转换为datetime_delta
- 如果用户输入了需要计算的时间,你需要计算出正确的结果,如"10分钟后的8分钟前" → 转换为 “PT2M”
- zzzz 是系统提供的时间,每句话肯定都有,这不是你判断相对或绝对时间的依据,需严格按照 xxx 来判断
- datetime和datetime_delta有且仅有一个不为null
时间格式要求:
- datetime: "YYYY-MM-DDTHH:MM:SS" (ISO 8601)
- datetime_delta: "PxYxMxDTxHxMxS" 格式 (如"PT1H30M"表示1小时30分钟"P3DT4H"表示三天四小时,"P5MT2M"表示五个月两分钟)
判断标准:
- is_notice=true: 明确包含时间+提醒内容的请求
- is_notice=false: 闲聊、疑问句、或不符合提醒格式的内容
示例:
用户:"明天下午2点提醒我开会"
输出:{"datetime": "2024-01-16T14:00:00", "datetime_delta": null,
"datetime_delta_minus": false, "content": "开会", "is_notice": true}
用户:"5分钟后提醒我关火"
输出:{"datetime": null, "datetime_delta": "PT5M", "datetime_delta_minus": false, "content": "关火", "is_notice": true}
用户:"5分钟前提醒我关火"
输出:{"datetime": null, "datetime_delta": "PT5M", "datetime_delta_minus": true, "content": "关火", "is_notice": true}
用户:"五百年后提醒我关火"
输出:{"datetime": null, "datetime_delta": "P500Y", "datetime_delta_minus": false, "content": "关火", "is_notice": true}
用户:"昨天提醒我关火"
输出:{"datetime": null, "datetime_delta": "P1D", "datetime_delta_minus": true, "content": "关火", "is_notice": true}
用户:"什么是提醒功能?"
输出:{"datetime": null, "datetime_delta": null, "datetime_delta_minus": false, "content": "", "is_notice": false}
用户:"过一会会,用可爱的语气提醒我该睡觉了"
输出:{"datetime": null, "datetime_delta": "PT10M", "datetime_delta_minus": false, "content": "呼呼!该睡觉了哦!ヾ(•ω•`)o", "is_notice": true}
请严格按照上述格式输出JSON不要添加任何其他文字说明。现在是 DATETIME"""
pt_pattern = re.compile(
r"^P"
r"((?P<year>\d+)Y)?"
r"((?P<month>\d+)M)?"
r"((?P<day>\d+)D)?"
r"(T((?P<hour>\d+)H)?"
r"((?P<minute>\d+)M)?"
r"((?P<second>\d+)S)?)?$"
)
def tryint(s: str | None):
if s:
if re.match(r"^\d+$", s):
return int(s)
return 0
async def ask_ai(expression: str, now: datetime.datetime | None = None) -> tuple[datetime.datetime | None, str]:
if now is None:
now = datetime.datetime.now()
prompt = SYSTEM_PROMPT.replace("DATETIME", f"{now}, 星期 {now.weekday() + 1}")
is_safe = await AlibabaGreen.detect(expression)
if not is_safe:
logger.info(f"提醒功能:消息被阿里绿网拦截 message={expression}")
return None, ""
llm = get_llm("qwen3-max")
message = await llm.chat([
{ "role": "system", "content": prompt },
{ "role": "user", "content": expression },
])
result = message.content
if result is None:
return (None, "")
try:
data = json.loads(result)
except json.JSONDecodeError:
logger.info(f"提醒功能:解析 AI 返回值时出现问题 raw={result}")
return (None, "")
datetime_absolute = data.get("datetime", None)
datetime_delta = data.get("datetime_delta", None)
is_minus = data.get("datetime_delta_minus", False)
content = data.get("content", "")
is_notice = data.get("is_notice", False)
if not is_notice:
return (None, "")
if datetime_absolute:
try:
res = datetime.datetime.strptime(datetime_absolute, "%Y-%m-%dT%H:%M:%S"), content
logger.info(f"提醒功能:使用绝对时间解析 AI 返回值 raw={result} target={res[0]}")
return res
except ValueError:
pass
if datetime_delta and (match := pt_pattern.match(datetime_delta)):
years = tryint(match.group("year"))
months = tryint(match.group("month"))
days = tryint(match.group("day"))
hours = tryint(match.group("hour"))
minutes = tryint(match.group("minute"))
seconds = tryint(match.group("second"))
dt = datetime.timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)
if is_minus:
dt = -dt
if is_minus:
now2 = now.replace(year=now.year - years)
m = now2.month
if (months - m) >= 0:
neg_months = -(m - months - 1)
neg_years = (neg_months + 11) // 12
target_month = 12 - ((neg_months - 1) % 12)
now2 = now2.replace(year=now2.year - neg_years, month=target_month)
else:
now2 = now2.replace(month=m - months)
else:
now2 = now.replace(year=now.year + years)
m = now2.month
now2 = now2.replace(
year=now2.year + (m + months - 1) // 12,
month=(m + months - 1) % 12 + 1
)
logger.info(f"提醒功能:使用相对时间解析 AI 返回值 raw={result} target={now2+dt}")
return (now2 + dt, content)
logger.warning(f"提醒功能:解析 AI 返回值时没有找到解析方法 raw={result}")
return (None, "")

View File

@ -0,0 +1,71 @@
"""
提取首字母
...谁需要啊!!!
"""
import functools
from loguru import logger
from nonebot import on_message
from nonebot.rule import KeywordsRule, Rule, ToMeRule
from nonebot.adapters.onebot.v11.bot import Bot as OBBot
from nonebot.adapters.onebot.v11.event import MessageEvent as OBMessageEvent
from nonebot.adapters.onebot.v11.message import MessageSegment as OBMessageSegment
from nonebot_plugin_alconna import Text, UniMsg
from nonebot_plugin_alconna.uniseg.adapters.onebot11.builder import Onebot11MessageBuilder
from pypinyin import pinyin, Style as PinyinStyle
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.apis.ali_content_safety import AlibabaGreen
keywords = ("szmtq", "tqszm", "提取首字母", "首字母提取", )
cmd_tqszm = on_message(rule=Rule(ToMeRule(), KeywordsRule(*keywords)))
@cmd_tqszm.handle()
async def _(target: DepLongTaskTarget, msg: UniMsg, evt: OBMessageEvent | None = None, bot: OBBot | None = None):
texts = ""
command_occurred = False
is_reply_mode = False
if evt is not None and bot is not None:
reply = await Onebot11MessageBuilder().extract_reply(evt, bot)
if reply is not None:
is_reply_mode = True
for seg in reply.data.get('msg', []):
if isinstance(seg, OBMessageSegment) and seg.type == 'text':
segtext: str = seg.data.get('text', '')
texts += segtext
if is_reply_mode:
for seg in msg:
if isinstance(seg, Text):
if all(seg.text.strip() != k for k in keywords):
return
else:
for seg in msg:
if isinstance(seg, Text):
if command_occurred:
texts += seg.text
continue
if any(seg.text.startswith(w) for w in keywords):
command_occurred = True
texts += next(
seg.text.removeprefix(w) for w in keywords
if seg.text.startswith(w)
).removeprefix(" ")
result = pinyin(texts, style=PinyinStyle.FIRST_LETTER, errors=lambda x: x)
print(result)
result_text = functools.reduce(lambda x, y: x + y, [
p[0] for p in result if len(p) > 0
], "")
if not await AlibabaGreen.detect(result_text):
logger.info(f"对首字母序列的检测未通过安全测试 RAW={texts} FORMATTED={result_text}")
await target.send_message(result_text, at=False)

View File

@ -1,14 +1,12 @@
from io import BytesIO
from loguru import logger
from nonebot.adapters import Bot as BaseBot
from nonebot.adapters import Event as BaseEvent
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Args, Field, UniMessage, on_alconna
from PIL import Image
from returns.result import Failure, Success
import nonebot_plugin_alconna
from konabot.common.nb.extract_image import extract_image_from_message
from konabot.common.nb.extract_image import DepPILImage
__plugin_meta__ = PluginMetadata(
name="ytpgif",
@ -43,6 +41,7 @@ ytpgif_cmd = on_alconna(
unmatch_tips=lambda x: f"{x}”不是有效数值。{SPEED_TIPS}",
),
],
Args["image?", nonebot_plugin_alconna.Image | None],
),
use_cmd_start=True,
use_cmd_sep=False,
@ -63,7 +62,7 @@ def resize_frame(frame: Image.Image) -> Image.Image:
@ytpgif_cmd.handle()
async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0):
async def handle_ytpgif(src_img: DepPILImage, speed: float = 1.0):
# === 校验 speed 范围 ===
if not (MIN_SPEED <= speed <= MAX_SPEED):
await ytpgif_cmd.send(
@ -71,19 +70,6 @@ async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0):
)
return
match await extract_image_from_message(event.get_message(), event, bot):
case Success(img):
src_img = img
case Failure(msg):
await ytpgif_cmd.send(
await UniMessage.text(msg).export()
)
return
case _:
return
try:
try:
n_frames = getattr(src_img, "n_frames", 1)
@ -217,4 +203,4 @@ async def handle_ytpgif(event: BaseEvent, bot: BaseBot, speed: float = 1.0):
print(f"[YTPGIF] 处理失败: {e}")
await ytpgif_cmd.send(
await UniMessage.text("❌ 处理失败,可能是图片格式不支持、文件损坏或过大。").export()
)
)

3145
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -3,7 +3,6 @@ name = "konabot"
version = "0.1.0"
description = "在 MTTU 内部使用的 bot"
authors = [{ name = "passthem", email = "Passthem183@gmail.com" }]
readme = "README.md"
requires-python = ">=3.12,<4.0"
dependencies = [
"nonebot2[all] (>=2.4.3,<3.0.0)",
@ -23,25 +22,35 @@ dependencies = [
"skia-python (>=138.0,<139.0)",
"nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)",
"qrcode (>=8.2,<9.0)",
"ptimeparse (>=0.2.1,<0.3.0)",
"nanoid (>=2.0.0,<3.0.0)",
"opencc (>=1.1.9,<2.0.0)",
"playwright (>=1.55.0,<2.0.0)",
"openai (>=2.7.1,<3.0.0)",
"imageio (>=2.37.2,<3.0.0)",
"aiosqlite (>=0.20.0,<1.0.0)",
"sqlparse (>=0.5.0,<1.0.0)",
"alibabacloud-green20220302 (>=3.0.1,<4.0.0)",
"pypinyin (>=0.55.0,<0.56.0)",
]
[tool.poetry]
package-mode = false
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"
[[tool.poetry.source]]
name = "pt-gitea-pypi"
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple/"
priority = "supplemental"
[[tool.poetry.source]]
name = "mirrors"
url = "https://pypi.tuna.tsinghua.edu.cn/simple/"
priority = "primary"
[tool.poetry.dependencies]
[dependency-groups]
dev = [
"rust-just (>=1.43.0,<2.0.0)",
"pytest (>=9.0.1,<10.0.0)",
"pytest-asyncio (>=1.3.0,<2.0.0)"
]

View File

@ -0,0 +1,15 @@
import playwright.sync_api
def main():
with playwright.sync_api.sync_playwright() as p:
browser = p.chromium.launch()
page = browser.new_page()
page.goto("https://www.baidu.com")
print("Playwright + Chromium 环境正常")
browser.close()
if __name__ == "__main__":
main()

Some files were not shown because too many files have changed in this diff Show More