Compare commits

...

9 Commits

Author SHA1 Message Date
561f6981aa 答题必须 At bot
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-11 01:20:24 +08:00
2632215af9 补充 MAN
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-11 01:06:11 +08:00
bfde559892 添加一个可供管理的订阅制模块,并且接入 KonaPH 2025-11-11 00:53:17 +08:00
857f8c5955 Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot 2025-11-10 22:12:33 +08:00
500053e630 更稳定的 MarkDown 和 LaTeX 生成!
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-10 21:59:45 +08:00
30cfb4cadd 添加 Justfile 相关库,简化项目启动流程 2025-11-10 21:23:41 +08:00
e2f99af73b 将浏览器依赖放在最最前面安装,以保证依赖更新时,尽可能不用重装浏览器
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-10 05:00:51 +08:00
e09de9eeb6 更改使用 uv 而非 poetry 管理 Docker 内部依赖
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-10 04:41:05 +08:00
4a3b49ce79 德摩根律(
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-09 23:47:26 +08:00
21 changed files with 2844 additions and 1330 deletions

View File

@ -2,7 +2,7 @@ FROM python:3.13-slim AS base
ENV VIRTUAL_ENV=/app/.venv \
PATH="/app/.venv/bin:$PATH" \
PLAYWRIGHT_BROWSERS_PATH=0
PLAYWRIGHT_BROWSERS_PATH=/usr/lib/pw-browsers
# 安装所有都需要的底层依赖
RUN apt-get update && \
@ -18,6 +18,10 @@ RUN apt-get update && \
fonts-noto-color-emoji \
&& rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir playwright \
&& python -m playwright install chromium \
&& pip uninstall -y playwright
FROM base AS builder
@ -27,17 +31,12 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential cmake git \
&& rm -rf /var/lib/apt/lists/*
ENV POETRY_NO_INTERACTION=1 \
POETRY_VIRTUALENVS_IN_PROJECT=1 \
POETRY_VIRTUALENVS_CREATE=1 \
POETRY_CACHE_DIR=/tmp/poetry_cache
RUN pip install --no-cache-dir uv
WORKDIR /app
RUN pip install --no-cache-dir poetry
COPY pyproject.toml poetry.lock ./
RUN python -m poetry install --no-root && rm -rf $POETRY_CACHE_DIR
RUN uv sync --no-install-project
@ -47,8 +46,6 @@ COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
WORKDIR /app
RUN python -m playwright install chromium
COPY bot.py pyproject.toml .env.prod .env.test ./
COPY assets ./assets
COPY scripts ./scripts

View File

@ -76,7 +76,7 @@ code .
使用命令行手动启动 Bot
```bash
poetry run watchfiles bot.main . --filter scripts.watch_filter.filter
poetry run just watch
```
如果你不希望自动重载,只是想运行 Bot可以直接运行

4
justfile Normal file
View File

@ -0,0 +1,4 @@
watch:
poetry run watchfiles bot.main . --filter scripts.watch_filter.filter

76
konabot/common/pager.py Normal file
View File

@ -0,0 +1,76 @@
from dataclasses import dataclass
from math import ceil
from typing import Any, Callable
from nonebot_plugin_alconna import UniMessage
@dataclass
class PagerQuery:
page_index: int
page_size: int
def apply[T](self, ls: list[T]) -> "PagerResult[T]":
if self.page_size <= 0:
return PagerResult(
success=False,
message="每页元素数量应该大于 0",
data=[],
page_count=-1,
query=self,
)
page_count = ceil(len(ls) / self.page_size)
if self.page_index <= 0 or self.page_size <= 0:
return PagerResult(
success=False,
message="页数必须大于 0",
data=[],
page_count=page_count,
query=self,
)
data = ls[(self.page_index - 1) * self.page_size: self.page_index * self.page_size]
if len(data) > 0:
return PagerResult(
success=True,
message="",
data=data,
page_count=page_count,
query=self,
)
return PagerResult(
success=False,
message="指定的页数超过最大页数",
data=data,
page_count=page_count,
query=self,
)
@dataclass
class PagerResult[T]:
data: list[T]
success: bool
message: str
page_count: int
query: PagerQuery
def to_unimessage(
self,
formatter: Callable[[T], str | UniMessage[Any]] = str,
title: str = '查询结果',
list_indicator: str = '- ',
) -> UniMessage[Any]:
msg = UniMessage.text(f'===== {title} =====\n\n')
if not self.success:
msg = msg.text(f'⚠️ {self.message}\n')
else:
for obj in self.data:
msg = msg.text(list_indicator)
msg += formatter(obj)
msg += '\n'
msg = msg.text(f'\n===== 第 {self.query.page_index} 页,共 {self.page_count} 页 =====')
return msg

View File

@ -0,0 +1,653 @@
import re
import datetime
from typing import Tuple, Optional, Dict, Any
from .err import MultipleSpecificationException, TokenUnhandledException
class Parser:
def __init__(self, now: Optional[datetime.datetime] = None):
self.now = now or datetime.datetime.now()
def digest_chinese_number(self, text: str) -> Tuple[str, int]:
if not text:
return text, 0
# Handle "两" at start
if text.startswith(""):
next_char = text[1] if len(text) > 1 else ''
if not next_char or next_char in "十百千万亿":
return text[1:], 2
s = "零一二三四五六七八九"
digits = {c: i for i, c in enumerate(s)}
i = 0
while i < len(text) and text[i] in s + "十百千万亿":
i += 1
if i == 0:
return text, 0
num_str = text[:i]
rest = text[i:]
def parse(s):
if not s:
return 0
if s == "":
return 0
if "亿" in s:
a, b = s.split("亿", 1)
return parse(a) * 100000000 + parse(b)
if "" in s:
a, b = s.split("", 1)
return parse(a) * 10000 + parse(b)
n = 0
t = 0
for c in s:
if c == "":
continue
if c in digits:
t = digits[c]
elif c == "":
if t == 0:
t = 1
n += t * 10
t = 0
elif c == "":
if t == 0:
t = 1
n += t * 100
t = 0
elif c == "":
if t == 0:
t = 1
n += t * 1000
t = 0
n += t
return n
return rest, parse(num_str)
def parse(self, text: str) -> datetime.datetime:
text = text.strip()
if not text:
raise TokenUnhandledException("Empty input")
ctx = {
"date": None,
"time": None,
"relative_delta": None,
"am_pm": None,
"period_word": None,
"has_time": False,
"has_date": False,
"ambiguous_hour": False,
"is_24hour": False,
"has_relative_date": False,
}
rest = self._parse_all(text, ctx)
if rest.strip():
raise TokenUnhandledException(f"Unparsed tokens: {rest.strip()}")
return self._apply_context(ctx)
def _parse_all(self, text: str, ctx: Dict[str, Any]) -> str:
rest = text.lstrip()
while True:
for parser in [
self._parse_absolute_date,
self._parse_relative_date,
self._parse_relative_time,
self._parse_period,
self._parse_time,
]:
new_rest = parser(rest, ctx)
if new_rest != rest:
rest = new_rest.lstrip()
break
else:
break
return rest
def _add_delta(self, ctx, delta):
if ctx["relative_delta"] is None:
ctx["relative_delta"] = delta
else:
ctx["relative_delta"] += delta
def _parse_absolute_date(self, text: str, ctx: Dict[str, Any]) -> str:
text = text.lstrip()
m = re.match(r"^(\d{4})-(\d{1,2})-(\d{1,2})T(\d{1,2}):(\d{2})", text)
if m:
y, mth, d, h, minute = map(int, m.groups())
ctx["date"] = datetime.date(y, mth, d)
ctx["time"] = datetime.time(h, minute)
ctx["has_date"] = True
ctx["has_time"] = True
ctx["is_24hour"] = True
return text[m.end():]
m = re.match(r"^(\d{4})-(\d{1,2})-(\d{1,2})", text)
if m:
y, mth, d = map(int, m.groups())
ctx["date"] = datetime.date(y, mth, d)
ctx["has_date"] = True
return text[m.end():]
m = re.match(r"^(\d{4})/(\d{1,2})/(\d{1,2})", text)
if m:
y, mth, d = map(int, m.groups())
ctx["date"] = datetime.date(y, mth, d)
ctx["has_date"] = True
return text[m.end():]
m = re.match(r"^(\d{4})年(\d{1,2})月(\d{1,2})[日号]", text)
if m:
y, mth, d = map(int, m.groups())
ctx["date"] = datetime.date(y, mth, d)
ctx["has_date"] = True
return text[m.end():]
m = re.match(r"^(\d{1,2})月(\d{1,2})[日号]", text)
if m:
mth, d = map(int, m.groups())
ctx["date"] = datetime.date(self.now.year, mth, d)
ctx["has_date"] = True
return text[m.end():]
m = re.match(r"^(.{1,3})月(.{1,3})[日号]", text)
if m:
m_str, d_str = m.groups()
_, mth = self.digest_chinese_number(m_str)
_, d = self.digest_chinese_number(d_str)
if mth == 0:
mth = 1
if d == 0:
d = 1
ctx["date"] = datetime.date(self.now.year, mth, d)
ctx["has_date"] = True
return text[m.end():]
return text
def _parse_relative_date(self, text: str, ctx: Dict[str, Any]) -> str:
text = text.lstrip()
# Handle "今天", "今晚", "今早", etc.
today_variants = [
("今晚上", "PM"),
("今晚", "PM"),
("今早", "AM"),
("今天早上", "AM"),
("今天早晨", "AM"),
("今天上午", "AM"),
("今天下午", "PM"),
("今天晚上", "PM"),
("今天", None),
]
for variant, period in today_variants:
if text.startswith(variant):
self._add_delta(ctx, datetime.timedelta(days=0))
ctx["has_relative_date"] = True
rest = text[len(variant):]
if period is not None and ctx["am_pm"] is None:
ctx["am_pm"] = period
ctx["period_word"] = variant
return rest
mapping = {
"明天": 1,
"后天": 2,
"大后天": 3,
"昨天": -1,
"前天": -2,
"大前天": -3,
}
for word, days in mapping.items():
if text.startswith(word):
self._add_delta(ctx, datetime.timedelta(days=days))
ctx["has_relative_date"] = True
return text[len(word):]
m = re.match(r"^(\d+|[零一二三四五六七八九十两]+)天(后|前|以后|之后)", text)
if m:
num_str, direction = m.groups()
if num_str.isdigit():
n = int(num_str)
else:
_, n = self.digest_chinese_number(num_str)
days = n if direction in ("", "以后", "之后") else -n
self._add_delta(ctx, datetime.timedelta(days=days))
ctx["has_relative_date"] = True
return text[m.end():]
m = re.match(r"^(本|上|下)周([一二三四五六日])", text)
if m:
scope, day = m.groups()
weekday_map = {"": 0, "": 1, "": 2, "": 3, "": 4, "": 5, "": 6}
target = weekday_map[day]
current = self.now.weekday()
if scope == "":
delta = target - current
elif scope == "":
delta = target - current - 7
else:
delta = target - current + 7
self._add_delta(ctx, datetime.timedelta(days=delta))
ctx["has_relative_date"] = True
return text[m.end():]
return text
def _parse_period(self, text: str, ctx: Dict[str, Any]) -> str:
text = text.lstrip()
period_mapping = {
"上午": "AM",
"早晨": "AM",
"早上": "AM",
"": "AM",
"中午": "PM",
"下午": "PM",
"晚上": "PM",
"": "PM",
"凌晨": "AM",
}
for word, tag in period_mapping.items():
if text.startswith(word):
if ctx["am_pm"] is not None:
raise MultipleSpecificationException("Multiple periods")
ctx["am_pm"] = tag
ctx["period_word"] = word
return text[len(word):]
return text
def _parse_time(self, text: str, ctx: Dict[str, Any]) -> str:
if ctx["has_time"]:
return text
text = text.lstrip()
# 1. H:MM pattern
m = re.match(r"^(\d{1,2}):(\d{2})", text)
if m:
h, minute = int(m.group(1)), int(m.group(2))
if 0 <= h <= 23 and 0 <= minute <= 59:
ctx["time"] = datetime.time(h, minute)
ctx["has_time"] = True
ctx["ambiguous_hour"] = 1 <= h <= 12
ctx["is_24hour"] = h > 12 or h == 0
return text[m.end():]
# 2. Parse hour part
hour = None
rest_after_hour = text
is_24hour_format = False
# Try Chinese number + 点/时
temp_rest, num = self.digest_chinese_number(text)
if num >= 0:
temp_rest_stripped = temp_rest.lstrip()
if temp_rest_stripped.startswith(""):
hour = num
is_24hour_format = False
rest_after_hour = temp_rest_stripped[1:]
elif temp_rest_stripped.startswith(""):
hour = num
is_24hour_format = True
rest_after_hour = temp_rest_stripped[1:]
if hour is None:
m = re.match(r"^(\d{1,2})\s*([点时])", text)
if m:
hour = int(m.group(1))
is_24hour_format = m.group(2) == ""
rest_after_hour = text[m.end():]
if hour is None:
if ctx.get("am_pm") is not None:
temp_rest, num = self.digest_chinese_number(text)
if 0 <= num <= 23:
hour = num
is_24hour_format = False
rest_after_hour = temp_rest.lstrip()
else:
m = re.match(r"^(\d{1,2})", text)
if m:
h_val = int(m.group(1))
if 0 <= h_val <= 23:
hour = h_val
is_24hour_format = False
rest_after_hour = text[m.end():].lstrip()
if hour is None:
return text
if not (0 <= hour <= 23):
return text
# Parse minutes
rest = rest_after_hour.lstrip()
minute = 0
minute_spec_count = 0
if rest.startswith(""):
rest = rest[1:].lstrip()
has_zheng = False
if rest.startswith(""):
has_zheng = True
rest = rest[1:].lstrip()
if rest.startswith(""):
minute = 30
minute_spec_count += 1
rest = rest[1:].lstrip()
if rest.startswith(""):
rest = rest[1:].lstrip()
if rest.startswith(""):
rest = rest[1:].lstrip()
if rest.startswith("一刻"):
minute = 15
minute_spec_count += 1
rest = rest[2:].lstrip()
if rest.startswith(""):
rest = rest[1:].lstrip()
if rest.startswith("过一刻"):
minute = 15
minute_spec_count += 1
rest = rest[3:].lstrip()
if rest.startswith(""):
rest = rest[1:].lstrip()
m = re.match(r"^(\d+|[零一二三四五六七八九十]+)分", rest)
if m:
minute_spec_count += 1
m_str = m.group(1)
if m_str.isdigit():
minute = int(m_str)
else:
_, minute = self.digest_chinese_number(m_str)
rest = rest[m.end():].lstrip()
if minute_spec_count == 0:
temp_rest, num = self.digest_chinese_number(rest)
if num > 0 and num <= 59:
minute = num
minute_spec_count += 1
rest = temp_rest.lstrip()
else:
m = re.match(r"^(\d{1,2})", rest)
if m:
m_val = int(m.group(1))
if 0 <= m_val <= 59:
minute = m_val
minute_spec_count += 1
rest = rest[m.end():].lstrip()
if has_zheng and minute_spec_count == 0:
minute_spec_count = 1
if minute_spec_count > 1:
raise MultipleSpecificationException("Multiple minute specifications")
if not (0 <= minute <= 59):
return text
# Hours 13-23 are always 24-hour, even with "点"
if hour >= 13:
is_24hour_format = True
ctx["time"] = datetime.time(hour, minute)
ctx["has_time"] = True
ctx["ambiguous_hour"] = 1 <= hour <= 12 and not is_24hour_format
ctx["is_24hour"] = is_24hour_format
return rest
def _parse_relative_time(self, text: str, ctx: Dict[str, Any]) -> str:
text = text.lstrip()
# 半小时
m = re.match(r"^(半)(?:个)?小时?(后|前|以后|之后)", text)
if m:
direction = m.group(2)
hours = 0.5
delta = datetime.timedelta(
hours=hours if direction in ("", "以后", "之后") else -hours
)
self._add_delta(ctx, delta)
return text[m.end():]
# X个半
m = re.match(r"^([0-9零一二三四五六七八九十两]+)个半(?:小时?)?(后|前|以后|之后)", text)
if m:
num_str, direction = m.groups()
if num_str.isdigit():
base_hours = int(num_str)
else:
_, base_hours = self.digest_chinese_number(num_str)
if base_hours == 0 and num_str != "":
return text
if base_hours <= 0:
return text
hours = base_hours + 0.5
delta = datetime.timedelta(
hours=hours if direction in ("", "以后", "之后") else -hours
)
self._add_delta(ctx, delta)
return text[m.end():]
# 一个半
m = re.match(r"^(一个半)小时?(后|前|以后|之后)", text)
if m:
direction = m.group(2)
hours = 1.5
delta = datetime.timedelta(
hours=hours if direction in ("", "以后", "之后") else -hours
)
self._add_delta(ctx, delta)
return text[m.end():]
# X小时
m = re.match(r"^([0-9零一二三四五六七八九十两]+)(?:个)?小时?(后|前|以后|之后)", text)
if m:
num_str, direction = m.groups()
if num_str.isdigit():
hours = int(num_str)
else:
_, hours = self.digest_chinese_number(num_str)
if hours == 0 and num_str != "":
return text
if hours <= 0:
return text
delta = datetime.timedelta(
hours=hours if direction in ("", "以后", "之后") else -hours
)
self._add_delta(ctx, delta)
return text[m.end():]
m = re.match(r"^([0-9零一二三四五六七八九十两]+)(?:个)?小时(后|前)", text)
if m:
num_str, direction = m.groups()
if num_str.isdigit():
hours = int(num_str)
else:
_, hours = self.digest_chinese_number(num_str)
if hours == 0 and num_str != "":
return text
if hours <= 0:
return text
delta = datetime.timedelta(
hours=hours if direction == "" else -hours
)
self._add_delta(ctx, delta)
return text[m.end():]
# X分钟
m = re.match(r"^([0-9零一二三四五六七八九十两]+)分钟?(后|前|以后|之后)", text)
if m:
num_str, direction = m.groups()
if num_str.isdigit():
minutes = int(num_str)
else:
_, minutes = self.digest_chinese_number(num_str)
if minutes == 0 and num_str != "":
return text
if minutes <= 0:
return text
delta = datetime.timedelta(
minutes=minutes if direction in ("", "以后", "之后") else -minutes
)
self._add_delta(ctx, delta)
return text[m.end():]
m = re.match(r"^([0-9零一二三四五六七八九十两]+)分(后|前|以后|之后)", text)
if m:
num_str, direction = m.groups()
if num_str.isdigit():
minutes = int(num_str)
else:
_, minutes = self.digest_chinese_number(num_str)
if minutes == 0 and num_str != "":
return text
if minutes <= 0:
return text
delta = datetime.timedelta(
minutes=minutes if direction in ("", "以后", "之后") else -minutes
)
self._add_delta(ctx, delta)
return text[m.end():]
m = re.match(r"^([0-9零一二三四五六七八九十两]+)分钟?(后|前)", text)
if m:
num_str, direction = m.groups()
if num_str.isdigit():
minutes = int(num_str)
else:
_, minutes = self.digest_chinese_number(num_str)
if minutes == 0 and num_str != "":
return text
if minutes <= 0:
return text
delta = datetime.timedelta(
minutes=minutes if direction == "" else -minutes
)
self._add_delta(ctx, delta)
return text[m.end():]
m = re.match(r"^([0-9零一二三四五六七八九十两]+)分(后|前)", text)
if m:
num_str, direction = m.groups()
if num_str.isdigit():
minutes = int(num_str)
else:
_, minutes = self.digest_chinese_number(num_str)
if minutes == 0 and num_str != "":
return text
if minutes <= 0:
return text
delta = datetime.timedelta(
minutes=minutes if direction == "" else -minutes
)
self._add_delta(ctx, delta)
return text[m.end():]
# === 秒级支持 ===
m = re.match(r"^([0-9零一二三四五六七八九十两]+)秒(后|前|以后|之后)", text)
if m:
num_str, direction = m.groups()
if num_str.isdigit():
seconds = int(num_str)
else:
_, seconds = self.digest_chinese_number(num_str)
if seconds == 0 and num_str != "":
return text
if seconds <= 0:
return text
delta = datetime.timedelta(
seconds=seconds if direction in ("", "以后", "之后") else -seconds
)
self._add_delta(ctx, delta)
return text[m.end():]
m = re.match(r"^([0-9零一二三四五六七八九十两]+)秒(后|前)", text)
if m:
num_str, direction = m.groups()
if num_str.isdigit():
seconds = int(num_str)
else:
_, seconds = self.digest_chinese_number(num_str)
if seconds == 0 and num_str != "":
return text
if seconds <= 0:
return text
delta = datetime.timedelta(
seconds=seconds if direction == "" else -seconds
)
self._add_delta(ctx, delta)
return text[m.end():]
return text
def _apply_context(self, ctx: Dict[str, Any]) -> datetime.datetime:
result = self.now
has_date = ctx["has_date"]
has_time = ctx["has_time"]
has_delta = ctx["relative_delta"] is not None
has_relative_date = ctx["has_relative_date"]
if has_delta:
result = result + ctx["relative_delta"]
if has_date:
result = result.replace(
year=ctx["date"].year,
month=ctx["date"].month,
day=ctx["date"].day,
)
if has_time:
h = ctx["time"].hour
m = ctx["time"].minute
if ctx["is_24hour"]:
# "10 时" → 10:00, no conversion
pass
elif ctx["am_pm"] == "AM":
if h == 12:
h = 0
elif ctx["am_pm"] == "PM":
if h == 12:
if ctx.get("period_word") in ("晚上", ""):
h = 0
result += datetime.timedelta(days=1)
else:
h = 12
elif 1 <= h <= 11:
h += 12
else:
# No period and not 24-hour (i.e., "点" format)
if ctx["has_relative_date"]:
# "明天五点" → 05:00 AM
if h == 12:
h = 0
# keep h as AM hour (1-11 unchanged)
else:
# Infer from current time
am_hour = 0 if h == 12 else h
candidate_am = result.replace(hour=am_hour, minute=m, second=0, microsecond=0)
if candidate_am < self.now:
# AM time is in the past, so use PM
if h == 12:
h = 12
else:
h += 12
# else: keep as AM (h unchanged)
if h > 23:
h = h % 24
result = result.replace(hour=h, minute=m, second=0, microsecond=0)
else:
if has_date or (has_relative_date and not has_time):
result = result.replace(hour=0, minute=0, second=0, microsecond=0)
return result
def parse(text: str) -> datetime.datetime:
return Parser().parse(text)

View File

@ -0,0 +1,11 @@
class PTimeParseException(Exception):
...
class TokenUnhandledException(PTimeParseException):
...
class MultipleSpecificationException(PTimeParseException):
...
class OutOfRangeSpecificationException(PTimeParseException):
...

View File

@ -5,6 +5,7 @@ from pydantic import BaseModel
class Config(BaseModel):
module_web_render_weburl: str = "localhost:5173"
module_web_render_instance: str = ""
module_web_render_playwright_ws: str = ""
def get_instance_baseurl(self):
if self.module_web_render_instance:

View File

@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
import asyncio
import queue
from typing import Any, Callable, Coroutine
from typing import Any, Callable, Coroutine, Generic, TypeVar
from loguru import logger
from playwright.async_api import (
Page,
@ -8,9 +9,14 @@ from playwright.async_api import (
async_playwright,
Browser,
BrowserContext,
Error as PlaywrightError,
)
from .config import web_render_config
from playwright.async_api import ConsoleMessage, Page
T = TypeVar("T")
TFunction = Callable[[T], Coroutine[Any, Any, Any]]
PageFunction = Callable[[Page], Coroutine[Any, Any, Any]]
@ -22,23 +28,17 @@ class WebRenderer:
@classmethod
async def get_browser_instance(cls) -> "WebRendererInstance":
if cls.browser_pool.empty():
instance = await WebRendererInstance.create()
if web_render_config.module_web_render_playwright_ws:
instance = await RemotePlaywrightInstance.create(
web_render_config.module_web_render_playwright_ws
)
else:
instance = await LocalPlaywrightInstance.create()
cls.browser_pool.put(instance)
instance = cls.browser_pool.get()
cls.browser_pool.put(instance)
return instance
@classmethod
async def get_browser_context(cls) -> BrowserContext:
instance = await cls.get_browser_instance()
if id(instance) not in cls.context_pool:
context = await instance.browser.new_context()
cls.context_pool[id(instance)] = context
logger.debug(
f"Created new persistent browser context for WebRendererInstance {id(instance)}"
)
return cls.context_pool[id(instance)]
@classmethod
async def render(
cls,
@ -67,49 +67,6 @@ class WebRenderer:
url, target, params=params, other_function=other_function, timeout=timeout
)
@classmethod
async def render_persistent_page(
cls,
page_id: str,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
"""
使用长期挂载的页面访问指定URL并返回截图
:param page_id: 页面唯一标识符
:param url: 目标URL
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
"""
logger.debug(
f"Requesting persistent render for page_id {page_id} at {url} targeting {target} with timeout {timeout}"
)
instance = await cls.get_browser_instance()
if page_id not in cls.page_pool:
context = await cls.get_browser_context()
page = await context.new_page()
cls.page_pool[page_id] = page
logger.debug(
f"Created new persistent page for page_id {page_id} using WebRendererInstance {id(instance)}"
)
page = cls.page_pool[page_id]
return await instance.render_with_page(
page,
url,
target,
params=params,
other_function=other_function,
timeout=timeout,
)
@classmethod
async def render_file(
cls,
@ -142,6 +99,75 @@ class WebRenderer:
timeout=timeout,
)
@classmethod
async def render_with_persistent_page(
cls,
page_id: str,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
"""
使用长期挂载的页面进行渲染
:param page_id: 页面唯一标识符
:param target: 渲染目标,如 ".box""#main" 等CSS选择器
:param timeout: 页面加载超时时间,单位秒
:param params: URL键值对参数
:param other_function: 其他自定义操作函数接受page参数
:return: 截图的字节数据
"""
instance = await cls.get_browser_instance()
logger.debug(
f"Using WebRendererInstance {id(instance)} to render with persistent page {page_id} targeting {target}"
)
return await instance.render_with_persistent_page(
page_id,
url,
target,
params=params,
other_function=other_function,
timeout=timeout,
)
@classmethod
async def get_persistent_page(cls, page_id: str, url: str) -> Page:
"""
获取长期挂载的页面,如果不存在则创建一个新的页面并存储
"""
if page_id in cls.page_pool:
return cls.page_pool[page_id]
async def on_console(msg: ConsoleMessage):
logger.debug(f"WEB CONSOLE {msg.text}")
instance = await cls.get_browser_instance()
if isinstance(instance, RemotePlaywrightInstance):
context = await instance.browser.new_context()
page = await context.new_page()
await page.goto(url)
cls.page_pool[page_id] = page
logger.debug(f"Created new persistent page for page_id {page_id}, navigated to {url}")
page.on('console', on_console)
return page
elif isinstance(instance, LocalPlaywrightInstance):
context = await instance.browser.new_context()
page = await context.new_page()
await page.goto(url)
cls.page_pool[page_id] = page
logger.debug(f"Created new persistent page for page_id {page_id}, navigated to {url}")
page.on('console', on_console)
return page
else:
raise NotImplementedError("Unsupported WebRendererInstance type")
@classmethod
async def close_persistent_page(cls, page_id: str) -> None:
"""
@ -156,38 +182,56 @@ class WebRenderer:
logger.debug(f"Closed and removed persistent page for page_id {page_id}")
class WebRendererInstance:
def __init__(self):
self._playwright: Playwright | None = None
self._browser: Browser | None = None
class WebRendererInstance(ABC, Generic[T]):
@abstractmethod
async def render(
self,
url: str,
target: str,
index: int = 0,
params: dict[str, Any] | None = None,
other_function: TFunction | None = None,
timeout: int = 30,
) -> bytes: ...
@abstractmethod
async def render_file(
self,
file_path: str,
target: str,
index: int = 0,
params: dict[str, Any] | None = None,
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes: ...
@abstractmethod
async def render_with_persistent_page(
self,
page_id: str,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes: ...
class PlaywrightInstance(WebRendererInstance[Page]):
def __init__(self) -> None:
super().__init__()
self.lock = asyncio.Lock()
@property
def playwright(self) -> Playwright:
assert self._playwright is not None
return self._playwright
@property
def browser(self) -> Browser:
assert self._browser is not None
return self._browser
async def init(self):
self._playwright = await async_playwright().start()
self._browser = await self.playwright.chromium.launch(headless=True)
@classmethod
async def create(cls) -> "WebRendererInstance":
instance = cls()
await instance.init()
return instance
@abstractmethod
def browser(self) -> Browser: ...
async def render(
self,
url: str,
target: str,
index: int = 0,
params: dict = {},
params: dict[str, Any] | None = None,
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
@ -207,42 +251,41 @@ class WebRendererInstance:
context = await self.browser.new_context()
page = await context.new_page()
screenshot = await self.inner_render(
page, url, target, index, params, other_function, timeout
page, url, target, index, params or {}, other_function, timeout
)
await page.close()
await context.close()
return screenshot
async def render_with_page(
self,
page: Page,
url: str,
target: str,
index: int = 0,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
async with self.lock:
screenshot = await self.inner_render(
page, url, target, index, params, other_function, timeout
)
return screenshot
async def render_file(
self,
file_path: str,
target: str,
index: int = 0,
params: dict = {},
params: dict[str, Any] | None = None,
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
file_path = "file:///" + str(file_path).replace("\\", "/")
return await self.render(
file_path, target, index, params, other_function, timeout
file_path, target, index, params or {}, other_function, timeout
)
async def render_with_persistent_page(
self,
page_id: str,
url: str,
target: str,
params: dict = {},
other_function: PageFunction | None = None,
timeout: int = 30,
) -> bytes:
page = await WebRenderer.get_persistent_page(page_id, url)
screenshot = await self.inner_render(
page, url, target, 0, params, other_function, timeout
)
return screenshot
async def inner_render(
self,
page: Page,
@ -276,6 +319,85 @@ class WebRendererInstance:
logger.debug("Screenshot taken successfully")
return screenshot
class LocalPlaywrightInstance(PlaywrightInstance):
def __init__(self):
self._playwright: Playwright | None = None
self._browser: Browser | None = None
super().__init__()
@property
def playwright(self) -> Playwright:
assert self._playwright is not None
return self._playwright
@property
def browser(self) -> Browser:
assert self._browser is not None
return self._browser
async def init(self):
self._playwright = await async_playwright().start()
self._browser = await self.playwright.chromium.launch(headless=True)
@classmethod
async def create(cls) -> "WebRendererInstance":
instance = cls()
await instance.init()
return instance
async def close(self):
await self.browser.close()
await self.playwright.stop()
class RemotePlaywrightInstance(PlaywrightInstance):
def __init__(self, ws_endpoint: str) -> None:
self._playwright: Playwright | None = None
self._browser: Browser | None = None
self._ws_endpoint = ws_endpoint
super().__init__()
@property
def playwright(self) -> Playwright:
assert self._playwright is not None, "Playwright must be initialized by calling init()."
return self._playwright
@property
def browser(self) -> Browser:
assert self._browser is not None, "Browser must be connected by calling init()."
return self._browser
async def init(self):
logger.info(f"尝试连接远程 Playwright 服务器: {self._ws_endpoint}")
self._playwright = await async_playwright().start()
try:
self._browser = await self.playwright.chromium.connect(
self._ws_endpoint
)
logger.info("成功连接到远程 Playwright 服务器。")
except PlaywrightError as e:
await self.playwright.stop()
raise ConnectionError(
f"无法连接到远程 Playwright 服务器 ({self._ws_endpoint}){e}"
) from e
@classmethod
async def create(cls, ws_endpoint: str) -> "RemotePlaywrightInstance":
"""
创建并初始化远程 Playwright 实例的工厂方法。
"""
instance = cls(ws_endpoint)
await instance.init()
return instance
async def close(self):
"""
断开与远程浏览器的连接并停止本地 Playwright 实例。
"""
if self._browser:
await self.browser.close()
if self._playwright:
await self.playwright.stop()
print("已断开远程连接,本地 Playwright 实例已停止。")

View File

@ -0,0 +1,22 @@
指令介绍
订阅 - 收听此方 BOT 的自动消息发送
格式
订阅 <频道名称>
取消订阅 <频道名称>
查询订阅 [页码]
可用订阅 [页码]
示例
`订阅 此方谜题`
在当前的聊天上下文订阅「此方谜题」频道。此后会每天推送此方谜题(由 konaph(8) 管理的)。
如果你是私聊,则能够每天发送此方谜题到你的私聊;如果在群聊中使用该指令,则会每天发送题目到这个群里面。
`取消订阅 此方谜题`
取消订阅「此方谜题」频道。
`查询订阅`
查询当前聊天上下文订阅的所有频道。
`可用订阅 2`
查询所有可用的订阅的第二页

View File

@ -4,33 +4,32 @@ from math import ceil
from loguru import logger
from nonebot import on_message
import nonebot
from nonebot.rule import to_me
from nonebot_plugin_alconna import (Alconna, Args, UniMessage, UniMsg,
on_alconna)
from nonebot_plugin_apscheduler import scheduler
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.nb.qq_broadcast import qq_broadcast
from konabot.plugins.kona_ph.core.message import (get_daily_report,
get_daily_report_v2,
get_puzzle_description,
get_submission_message)
from konabot.plugins.kona_ph.core.storage import get_today_date
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE, config,
from konabot.plugins.kona_ph.manager import (PUZZLE_PAGE_SIZE,
create_admin_commands,
puzzle_manager)
from konabot.plugins.poster.poster_info import PosterInfo, register_poster_info
from konabot.plugins.poster.service import broadcast
create_admin_commands()
register_poster_info("每日谜题", info=PosterInfo(
aliases={"konaph", "kona_ph", "KonaPH", "此方谜题", "KONAPH"},
description="此方 BOT 每日谜题推送",
))
async def is_play_group(target: DepLongTaskTarget):
if target.is_private_chat:
return True
if target.channel_id in config.plugin_puzzle_playgroup:
return True
return False
cmd_submit = on_message(rule=is_play_group)
cmd_submit = on_message(rule=to_me())
@cmd_submit.handle()
@ -52,7 +51,7 @@ async def _(msg: UniMsg, target: DepLongTaskTarget):
cmd_query = on_alconna(Alconna(
r"re:(?:((?:(?:所以|话)说?)?今天的题目是什么[啊呀哇呢]?(?:\?)?)|今日谜?题目?)"
), rule=is_play_group)
), rule=to_me())
@cmd_query.handle()
async def _(target: DepLongTaskTarget):
@ -65,7 +64,7 @@ async def _(target: DepLongTaskTarget):
cmd_query_submission = on_alconna(Alconna(
"今日答题情况"
), rule=is_play_group)
), rule=to_me())
@cmd_query_submission.handle()
async def _(target: DepLongTaskTarget):
@ -80,7 +79,7 @@ cmd_history = on_alconna(Alconna(
"历史题目",
Args["page?", int],
Args["index_id?", str],
), rule=is_play_group)
), rule=to_me())
@cmd_history.handle()
async def _(target: DepLongTaskTarget, index_id: str = "", page: int = 1):
@ -125,11 +124,15 @@ async def _():
yesterday = get_today_date() - datetime.timedelta(days=1)
msg2 = get_daily_report(manager, yesterday)
if msg2 is not None:
await qq_broadcast(config.plugin_puzzle_playgroup, msg2)
await broadcast("每日谜题", msg2)
puzzle = manager.get_today_puzzle()
if puzzle is not None:
logger.info(f"找到了题目 {puzzle.raw_id},发送")
await qq_broadcast(config.plugin_puzzle_playgroup, get_puzzle_description(puzzle))
await broadcast("每日谜题", get_puzzle_description(puzzle))
else:
logger.info("自动任务:没有找到题目,跳过")
driver = nonebot.get_driver()

View File

@ -7,15 +7,11 @@ from nonebot_plugin_alconna import (
UniMsg
)
from playwright.async_api import ConsoleMessage, Page
from konabot.common.web_render import konaweb
from konabot.common.web_render.core import WebRenderer
from konabot.plugins.markdown.core import MarkDownCore
def is_markdown_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
def is_markdown_mentioned(msg: UniMsg) -> bool:
txt = msg.extract_plain_text()
if "markdown" not in txt[:10] or "md" not in txt[:3]:
if "markdown" not in txt[:8] and "md" not in txt[:2]:
return False
return True
@ -29,7 +25,7 @@ async def _(msg: UniMsg, event: BaseEvent):
content = msg.extract_plain_text()
logger.debug(f"Received markdown command with content: {content}")
if "md" in content[:3]:
if "md" in content[:2]:
message = content.replace("md", "", 1).strip()
else:
message = content.replace("markdown", "", 1).strip()
@ -48,7 +44,7 @@ async def _(msg: UniMsg, event: BaseEvent):
def is_latex_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
txt = msg.extract_plain_text()
if "latex" not in txt[:8]:
if "latex" not in txt[:5]:
return False
return True

View File

@ -8,19 +8,18 @@ class MarkDownCore:
@staticmethod
async def render_markdown(markdown_text: str, theme: str = "dark", params: dict = {}) -> bytes:
async def page_function(page: Page):
async def on_console(msg: ConsoleMessage):
logger.debug(f"WEB CONSOLE {msg.text}")
await page.emulate_media(color_scheme=theme)
page.on('console', on_console)
await page.locator('textarea[name=content]').fill(markdown_text)
await page.wait_for_timeout(200)
await page.locator('#button').click()
await page.wait_for_timeout(200)
# 等待 checkState 函数加载完成
await page.wait_for_function("typeof checkState === 'function'", timeout=1000)
# 访问 checkState 函数,确保渲染完成
await page.wait_for_function("checkState() === true", timeout=1000)
out = await WebRenderer.render(
out = await WebRenderer.render_with_persistent_page(
"markdown_renderer",
konaweb('markdown'),
target='#main',
other_function=page_function,
@ -32,22 +31,23 @@ class MarkDownCore:
@staticmethod
async def render_latex(text: str, theme: str = "dark") -> bytes:
params = {
"size": "2em",
"size": "2.5em",
}
async def page_function(page: Page):
async def on_console(msg: ConsoleMessage):
logger.debug(f"WEB CONSOLE {msg.text}")
await page.emulate_media(color_scheme=theme)
page.on('console', on_console)
page.wait_for_selector('textarea[name=content]')
await page.locator('textarea[name=content]').fill(f"$$ {text} $$")
await page.wait_for_timeout(200)
page.wait_for_selector('#button')
await page.locator('#button').click()
await page.wait_for_timeout(200)
# 等待 checkState 函数加载完成
await page.wait_for_function("typeof checkState === 'function'", timeout=2000)
# 访问 checkState 函数,确保渲染完成
await page.wait_for_function("checkState() === true", timeout=10000)
out = await WebRenderer.render(
out = await WebRenderer.render_with_persistent_page(
"latex_renderer",
konaweb('latex'),
target='#main',
other_function=page_function,

View File

@ -0,0 +1,85 @@
import nonebot
from nonebot_plugin_alconna import Alconna, Args, on_alconna
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.pager import PagerQuery
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
from konabot.plugins.poster.service import dep_poster_service
cmd_subscribe = on_alconna(Alconna(
"订阅",
Args["channel", str],
))
@cmd_subscribe.handle()
async def _(target: DepLongTaskTarget, channel: str):
async with dep_poster_service() as service:
result = await service.subscribe(channel, target)
if result:
await target.send_message(f"已订阅「{channel}")
else:
await target.send_message(f"已经订阅过「{channel}」了")
cmd_list = on_alconna(Alconna(
"re:(?:查询|我的|获取)订阅(列表)?",
Args["page?", int],
))
def better_channel_message(channel_id: str) -> str:
if channel_id not in POSTER_INFO_DATA:
return channel_id
data = POSTER_INFO_DATA[channel_id]
return f"{channel_id}{data.description}"
@cmd_list.handle()
async def _(target: DepLongTaskTarget, page: int = 1):
async with dep_poster_service() as service:
result = await service.get_channels(target, PagerQuery(
page_index=page,
page_size=10,
))
await target.send_message(result.to_unimessage(title="订阅列表", formatter=better_channel_message))
cmd_list_available = on_alconna(Alconna(
"re:(查询)?可用订阅(列表)?",
Args["page?", int],
))
@cmd_list_available.handle()
async def _(target: DepLongTaskTarget, page: int = 1):
result = PagerQuery(
page_index=page,
page_size=10,
).apply(sorted(POSTER_INFO_DATA.keys()))
await target.send_message(result.to_unimessage(title="可用订阅列表", formatter=better_channel_message))
cmd_unsubscribe = on_alconna(Alconna(
"取消订阅",
Args["channel", str],
))
@cmd_unsubscribe.handle()
async def _(target: DepLongTaskTarget, channel: str):
async with dep_poster_service() as service:
result = await service.subscribe(channel, target)
if result:
await target.send_message(f"已取消订阅「{channel}")
else:
await target.send_message(f"这里没有订阅过「{channel}")
driver = nonebot.get_driver()
@driver.on_startup
async def _():
async with dep_poster_service() as service:
await service.fix_data()

View File

@ -0,0 +1,15 @@
from dataclasses import dataclass, field
@dataclass
class PosterInfo:
aliases: set[str] = field(default_factory=set)
description: str = field(default='')
POSTER_INFO_DATA: dict[str, PosterInfo] = {}
def register_poster_info(channel: str, info: PosterInfo):
POSTER_INFO_DATA[channel] = info

View File

@ -0,0 +1,112 @@
import asyncio
from contextlib import asynccontextmanager
from typing import Annotated
from nonebot.params import Depends
from pydantic import BaseModel, ValidationError
from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult
from konabot.common.path import DATA_PATH
from konabot.plugins.poster.repository import IPosterRepo
class ChannelData(BaseModel):
targets: list[LongTaskTarget] = []
class PosterData(BaseModel):
channels: dict[str, ChannelData] = {}
def is_the_same_target(target1: LongTaskTarget, target2: LongTaskTarget) -> bool:
if (target1.is_private_chat and not target2.is_private_chat):
return False
if (target2.is_private_chat and not target1.is_private_chat):
return False
if target1.platform != target2.platform:
return False
# 如果是群聊,则要求 channel_id 相同
if not target1.is_private_chat:
return target1.channel_id == target2.channel_id
return target1.target_id == target2.target_id
class LocalPosterRepo(IPosterRepo):
def __init__(self, data: PosterData) -> None:
self.data = data
super().__init__()
async def get_channel_targets(self, channel: str) -> list[LongTaskTarget]:
if channel not in self.data.channels:
self.data.channels[channel] = ChannelData()
return self.data.channels[channel].targets
async def add_channel_target(self, channel: str, target: LongTaskTarget) -> bool:
targets = await self.get_channel_targets(channel)
for t in targets:
if is_the_same_target(t, target):
return False
targets.append(target)
return True
async def remove_channel_target(self, channel: str, target: LongTaskTarget) -> bool:
targets = await self.get_channel_targets(channel)
len0 = len(targets)
self.data.channels[channel].targets = [
t for t in targets if not is_the_same_target(t, target)
]
len1 = len(self.data.channels[channel].targets)
return len0 != len1
async def get_subscribed_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
channels: list[str] = []
for channel_id, channel in self.data.channels.items():
for t in channel.targets:
if is_the_same_target(target, t):
channels.append(channel_id)
break
channels = sorted(channels)
return pager.apply(channels)
async def merge_channel(self, from_channel: str, to_channel: str) -> None:
channel_from = await self.get_channel_targets(from_channel)
channel_to = await self.get_channel_targets(to_channel)
for t1 in channel_from:
flag = True
for t2 in channel_to:
if is_the_same_target(t1, t2):
flag = False
break
if flag:
channel_to.append(t1)
del self.data.channels[from_channel]
LOCAL_POSTER_DATA_LOCK = asyncio.Lock()
LOCAL_POSTER_DATA_PATH = DATA_PATH / "module_poster_data.json"
@asynccontextmanager
async def local_poster_data():
async with LOCAL_POSTER_DATA_LOCK:
if not LOCAL_POSTER_DATA_PATH.exists():
data = PosterData()
else:
try:
data = PosterData.model_validate_json(LOCAL_POSTER_DATA_PATH.read_text())
except ValidationError:
data = PosterData()
yield data
LOCAL_POSTER_DATA_PATH.write_text(data.model_dump_json())
@asynccontextmanager
async def local_poster():
async with local_poster_data() as data:
yield LocalPosterRepo(data)
DepLocalPosterRepo = Annotated[LocalPosterRepo, Depends(local_poster)]

View File

@ -0,0 +1,37 @@
from abc import ABC, abstractmethod
from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult
class IPosterRepo(ABC):
@abstractmethod
async def get_channel_targets(self, channel: str) -> list[LongTaskTarget]:
"""
获取广播通道的所有广播对象
"""
@abstractmethod
async def add_channel_target(self, channel: str, target: LongTaskTarget) -> bool:
"""
向广播通道添加一个广播目标。若目标已存在,则返回 False
"""
@abstractmethod
async def remove_channel_target(self, channel: str, target: LongTaskTarget) -> bool:
"""
移除一个广播通道的目标。若目标不存在,则返回 False
"""
@abstractmethod
async def get_subscribed_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
"""
获得一个目标已经订阅了的广播通道
"""
@abstractmethod
async def merge_channel(self, from_channel: str, to_channel: str) -> None:
"""
合并两个 Channel 为一个,并移除另一个
"""

View File

@ -0,0 +1,59 @@
from contextlib import asynccontextmanager
from typing import Annotated, Any
from nonebot.params import Depends
from nonebot_plugin_alconna import UniMessage
from konabot.common.longtask import LongTaskTarget
from konabot.common.pager import PagerQuery, PagerResult
from konabot.plugins.poster.poster_info import POSTER_INFO_DATA
from konabot.plugins.poster.repo_local_data import local_poster
from konabot.plugins.poster.repository import IPosterRepo
class PosterService:
def __init__(self, repo: IPosterRepo) -> None:
self.repo = repo
def parse_channel_id(self, channel: str):
for cid, cinfo in POSTER_INFO_DATA.items():
if channel in cinfo.aliases:
return cid
return channel
async def subscribe(self, channel: str, target: LongTaskTarget) -> bool:
channel = self.parse_channel_id(channel)
return await self.repo.add_channel_target(channel, target)
async def unsubscribe(self, channel: str, target: LongTaskTarget) -> bool:
channel = self.parse_channel_id(channel)
return await self.repo.remove_channel_target(channel, target)
async def broadcast(self, channel: str, message: UniMessage[Any] | str) -> list[LongTaskTarget]:
channel = self.parse_channel_id(channel)
targets = await self.repo.get_channel_targets(channel)
for target in targets:
# 因为是订阅消息,就不要 At 对方了
await target.send_message(message, at=False)
return targets
async def get_channels(self, target: LongTaskTarget, pager: PagerQuery) -> PagerResult[str]:
return await self.repo.get_subscribed_channels(target, pager)
async def fix_data(self):
for cid, cinfo in POSTER_INFO_DATA.items():
for alias in cinfo.aliases:
await self.repo.merge_channel(alias, cid)
@asynccontextmanager
async def dep_poster_service():
async with local_poster() as repo:
yield PosterService(repo)
async def broadcast(channel: str, message: UniMessage[Any] | str):
async with dep_poster_service() as service:
return await service.broadcast(channel, message)
DepPosterService = Annotated[PosterService, Depends(dep_poster_service)]

View File

@ -6,7 +6,6 @@ from typing import Any
import nanoid
import nonebot
import ptimeparse
from loguru import logger
from nonebot import get_plugin_config, on_message
from nonebot.adapters import Event
@ -14,6 +13,7 @@ from nonebot_plugin_alconna import Alconna, Args, Subcommand, UniMessage, UniMsg
from pydantic import BaseModel
from konabot.common.longtask import DepLongTaskTarget, LongTask, create_longtask, handle_long_task, longtask_data
from konabot.common.ptimeparse import Parser
evt = on_message()
@ -84,7 +84,7 @@ async def _(msg: UniMsg, mEvt: Event, target: DepLongTaskTarget):
notify_time, notify_text = segments
try:
target_time = ptimeparse.Parser().parse(notify_time)
target_time = Parser().parse(notify_time)
logger.info(f"{notify_time} 解析出了时间:{target_time}")
except Exception:
logger.info(f"无法从 {notify_time} 中解析出时间")

2657
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -3,7 +3,6 @@ name = "konabot"
version = "0.1.0"
description = "在 MTTU 内部使用的 bot"
authors = [{ name = "passthem", email = "Passthem183@gmail.com" }]
readme = "README.md"
requires-python = ">=3.12,<4.0"
dependencies = [
"nonebot2[all] (>=2.4.3,<3.0.0)",
@ -23,25 +22,28 @@ dependencies = [
"skia-python (>=138.0,<139.0)",
"nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)",
"qrcode (>=8.2,<9.0)",
"ptimeparse (>=0.2.1,<0.3.0)",
"nanoid (>=2.0.0,<3.0.0)",
"opencc (>=1.1.9,<2.0.0)",
"playwright (>=1.55.0,<2.0.0)",
"openai (>=2.7.1,<3.0.0)",
]
[tool.poetry]
package-mode = false
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"
[[tool.poetry.source]]
name = "pt-gitea-pypi"
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple/"
priority = "supplemental"
[[tool.poetry.source]]
name = "mirrors"
url = "https://pypi.tuna.tsinghua.edu.cn/simple/"
priority = "primary"
[tool.poetry.dependencies]
[dependency-groups]
dev = [
"rust-just (>=1.43.0,<2.0.0)"
]

View File

@ -8,6 +8,8 @@ base = Path(__file__).parent.parent.absolute()
def filter(change: Change, path: str) -> bool:
if "__pycache__" in path:
return False
if Path(path).absolute().is_relative_to(base / "data"):
if Path(path).absolute().is_relative_to((base / "data").absolute()):
return False
if Path(path).absolute().is_relative_to((base / ".git").absolute()):
return False
return True