Compare commits

...

6 Commits

Author SHA1 Message Date
d37c4870d8 Merge branch 'master' into feature/sugar 2026-03-18 17:38:59 +08:00
23b9f101b3 语法糖 2026-03-18 17:29:42 +08:00
8c1651ad3d 忘记 await 相关权限了,导致永远判 True
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-18 16:29:36 +08:00
ff60642c62 Merge pull request 'feat: add TRPG roll command' (#59) from pi-agent/konabot:feat/trpg-roll into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #59
Reviewed-by: 钟晓帕 <Passthem183@gmail.com>
2026-03-14 02:19:15 +08:00
69b5908445 refactor: narrow trpg roll message matching 2026-03-14 02:17:20 +08:00
a542ed1fd9 feat: add TRPG roll command 2026-03-14 02:02:41 +08:00
12 changed files with 589 additions and 3 deletions

View File

@ -1,3 +1,5 @@
{
"python.REPL.enableREPLSmartSend": false
"python.REPL.enableREPLSmartSend": false,
"python-envs.defaultEnvManager": "ms-python.python:poetry",
"python-envs.defaultPackageManager": "ms-python.python:poetry"
}

View File

@ -16,6 +16,7 @@
- `konabot/common/permsys/__init__.py`
- 暴露 `PermManager``DepPermManager``require_permission`
- 负责数据库初始化、启动迁移、超级管理员默认授权
- 提供 `register_default_allow_permission()` 用于注册“启动时默认放行”的权限键
- `konabot/common/permsys/entity.py`
- 定义 `PermEntity`
- 将事件转换为可查询的实体链
@ -134,6 +135,14 @@ PermEntity("ob11", "user", str(account)), "*", True
也就是说,配置中的超级管理员会直接拥有全部权限。
此外,模块也支持插件在导入阶段通过 `register_default_allow_permission("some.key")` 注册默认放行的权限键;这些键会在启动时被写入到:
```python
PermEntity("sys", "global", "global"), "some.key", True
```
这适合“默认所有人可用,但仍希望后续能被权限系统单独关闭”的功能。
这属于启动时自动灌入的保底策略,不依赖手工授权命令。
## 在插件中使用

View File

@ -14,6 +14,7 @@ from konabot.common.permsys.repo import PermRepo
db = DatabaseManager(DATA_PATH / "perm.sqlite3")
_default_allow_permissions: set[str] = set()
_EntityLike = Event | PermEntity | list[PermEntity]
@ -91,6 +92,10 @@ def create_startup(): # pragma: no cover
await pm.update_permission(
PermEntity("ob11", "user", str(account)), "*", True
)
for key in _default_allow_permissions:
await pm.update_permission(
PermEntity("sys", "global", "global"), key, True
)
@driver.on_shutdown
async def _():
@ -103,6 +108,10 @@ def create_startup(): # pragma: no cover
DepPermManager = Annotated[PermManager, Depends(perm_manager)]
def register_default_allow_permission(key: str):
_default_allow_permissions.add(key)
def require_permission(perm: str) -> Rule: # pragma: no cover
async def check_permission(event: Event, pm: DepPermManager) -> bool:
return await pm.check_has_permission(event, perm)

View File

@ -0,0 +1,53 @@
**roll** - 面向跑团的文本骰子指令
## 用法
`roll 表达式`
支持常见骰子写法:
- `roll 3d6`
- `roll d20+5`
- `roll 2d8+1d4+3`
- `roll d%`
- `roll 4dF`
## 说明
- `NdM` 表示掷 N 个 M 面骰,例如 `3d6`
- `d20` 等价于 `1d20`
- `d%` 表示百分骰,范围 1 到 100
- `dF` 表示 Fate/Fudge 骰,单骰结果为 -1、0、+1
- 支持用 `+`、`-` 连接多个项,也支持常数修正
## 返回格式
会返回总结果,以及每一项的明细。
例如:
- `roll 3d6`
可能返回:
- `3d6 = 11`
- `+3d6=[2, 4, 5]`
- `roll d20+5`
可能返回:
- `d20+5 = 19`
- `+1d20=[14] +5=5`
## 限制
为防止刷屏和滥用,当前实现会限制:
- 单项最多 100 个骰子
- 单个骰子最多 1000 面
- 一次表达式最多 20 项
- 一次表达式最多实际掷 200 个骰子
- 结果过长时会直接拒绝
## 权限
需要 `trpg.roll` 权限。
默认启动时会给系统全局授予允许,因此通常所有人都能用;如有需要可再用权限系统单独关闭。

View File

@ -15,10 +15,12 @@ class THQwen(TextHandler):
self, env: TextHandlerEnvironment, istream: str | None, args: list[str]
) -> TextHandleResult:
pm = perm_manager()
if env.event is None or not pm.check_has_permission(env.event, "textfx.qwen"):
if env.event is None or not await pm.check_has_permission(
env.event, "textfx.qwen"
):
return TextHandleResult(
code=1,
ostream="这里暂未开启 AI 功能",
ostream="你或当前环境没有使用 qwen 的权限。如有疑问请联系管理员",
)
llm = get_llm()

View File

@ -0,0 +1,210 @@
import copy
import re
from pathlib import Path
import nonebot
from nonebot import on_command
from nonebot.adapters import Bot, Event, Message
from nonebot.log import logger
from nonebot.message import handle_event
from nonebot.params import CommandArg
from konabot.common.database import DatabaseManager
from konabot.common.longtask import DepLongTaskTarget
ROOT_PATH = Path(__file__).resolve().parent
cmd = on_command(cmd="语法糖", aliases={"", "sugar"}, block=True)
db_manager = DatabaseManager()
driver = nonebot.get_driver()
@driver.on_startup
async def register_startup_hook():
await init_db()
@driver.on_shutdown
async def register_shutdown_hook():
await db_manager.close_all_connections()
async def init_db():
await db_manager.execute_by_sql_file(ROOT_PATH / "sql" / "create_table.sql")
table_info = await db_manager.query("PRAGMA table_info(syntactic_sugar)")
columns = {str(row.get("name")) for row in table_info}
if "channel_id" not in columns:
await db_manager.execute(
"ALTER TABLE syntactic_sugar ADD COLUMN channel_id VARCHAR(255) NOT NULL DEFAULT ''"
)
await db_manager.execute("DROP INDEX IF EXISTS idx_syntactic_sugar_name_belong_to")
await db_manager.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target "
"ON syntactic_sugar(name, channel_id, belong_to)"
)
def _extract_reply_plain_text(evt: Event) -> str:
reply = getattr(evt, "reply", None)
if reply is None:
return ""
reply_message = getattr(reply, "message", None)
if reply_message is None:
return ""
extract_plain_text = getattr(reply_message, "extract_plain_text", None)
if callable(extract_plain_text):
return extract_plain_text().strip()
return str(reply_message).strip()
def _split_variables(tokens: list[str]) -> tuple[list[str], dict[str, str]]:
positional: list[str] = []
named: dict[str, str] = {}
for token in tokens:
if "=" in token:
key, value = token.split("=", 1)
key = key.strip()
if key:
named[key] = value
continue
positional.append(token)
return positional, named
def _render_template(content: str, positional: list[str], named: dict[str, str]) -> str:
def replace(match: re.Match[str]) -> str:
key = match.group(1).strip()
if key.isdigit():
idx = int(key) - 1
if 0 <= idx < len(positional):
return positional[idx]
return match.group(0)
return named.get(key, match.group(0))
return re.sub(r"\{([^{}]+)\}", replace, content)
async def _store_sugar(name: str, content: str, belong_to: str, channel_id: str):
await db_manager.execute_by_sql_file(
ROOT_PATH / "sql" / "insert_sugar.sql",
(name, content, belong_to, channel_id),
)
async def _delete_sugar(name: str, belong_to: str, channel_id: str):
await db_manager.execute(
"DELETE FROM syntactic_sugar WHERE name = ? AND belong_to = ? AND channel_id = ?",
(name, belong_to, channel_id),
)
async def _find_sugar(name: str, belong_to: str, channel_id: str) -> str | None:
rows = await db_manager.query(
(
"SELECT content FROM syntactic_sugar "
"WHERE name = ? AND channel_id = ? "
"ORDER BY CASE WHEN belong_to = ? THEN 0 ELSE 1 END, id ASC "
"LIMIT 1"
),
(name, channel_id, belong_to),
)
if not rows:
return None
return rows[0].get("content")
async def _reinject_command(bot: Bot, evt: Event, command_text: str) -> bool:
depth = int(getattr(evt, "_syntactic_sugar_depth", 0))
if depth >= 3:
return False
try:
cloned_evt = copy.deepcopy(evt)
except Exception:
logger.exception("语法糖克隆事件失败")
return False
message = getattr(cloned_evt, "message", None)
if message is None:
return False
try:
msg_obj = type(message)(command_text)
except Exception:
msg_obj = command_text
setattr(cloned_evt, "message", msg_obj)
if hasattr(cloned_evt, "original_message"):
setattr(cloned_evt, "original_message", msg_obj)
if hasattr(cloned_evt, "raw_message"):
setattr(cloned_evt, "raw_message", command_text)
setattr(cloned_evt, "_syntactic_sugar_depth", depth + 1)
try:
await handle_event(bot, cloned_evt)
except Exception:
logger.exception("语法糖回注事件失败")
return False
return True
@cmd.handle()
async def _(bot: Bot, evt: Event, target: DepLongTaskTarget, args: Message = CommandArg()):
raw = args.extract_plain_text().strip()
if not raw:
return
tokens = raw.split()
action = tokens[0]
target_id = target.target_id
channel_id = target.channel_id
if action == "存入":
if len(tokens) < 2:
await cmd.finish("请提供要存入的名称")
name = tokens[1].strip()
content = " ".join(tokens[2:]).strip()
if not content:
content = _extract_reply_plain_text(evt)
if not content:
await cmd.finish("请提供要存入的内容")
await _store_sugar(name, content, target_id, channel_id)
await cmd.finish(f"糖已存入:「{name}」!")
if action == "删除":
if len(tokens) < 2:
await cmd.finish("请提供要删除的名称")
name = tokens[1].strip()
await _delete_sugar(name, target_id, channel_id)
await cmd.finish(f"已删除糖:「{name}」!")
if action == "查看":
if len(tokens) < 2:
await cmd.finish("请提供要查看的名称")
name = tokens[1].strip()
content = await _find_sugar(name, target_id, channel_id)
if content is None:
await cmd.finish(f"没有糖:「{name}")
await cmd.finish(f"糖的内容:「{content}")
name = action
content = await _find_sugar(name, target_id, channel_id)
if content is None:
await cmd.finish(f"没有糖:「{name}")
positional, named = _split_variables(tokens[1:])
rendered = _render_template(content, positional, named)
ok = await _reinject_command(bot, evt, rendered)
if not ok:
await cmd.finish(f"糖的展开结果:「{rendered}")

View File

@ -0,0 +1,12 @@
-- 创建语法糖表
CREATE TABLE IF NOT EXISTS syntactic_sugar (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
belong_to VARCHAR(255) NOT NULL,
channel_id VARCHAR(255) NOT NULL DEFAULT ''
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_syntactic_sugar_name_channel_target
ON syntactic_sugar(name, channel_id, belong_to);

View File

@ -0,0 +1,5 @@
-- 插入语法糖,如果同一用户下名称已存在则更新内容
INSERT INTO syntactic_sugar (name, content, belong_to, channel_id)
VALUES (?, ?, ?, ?)
ON CONFLICT(name, channel_id, belong_to) DO UPDATE SET
content = excluded.content;

View File

@ -0,0 +1,35 @@
import re
import nonebot
from nonebot.adapters import Event
from nonebot_plugin_alconna import UniMessage, UniMsg
from konabot.common.nb import match_keyword
from konabot.common.permsys import register_default_allow_permission, require_permission
from konabot.plugins.trpg_roll.core import RollError, roll_expression
PERMISSION_KEY = "trpg.roll"
register_default_allow_permission(PERMISSION_KEY)
matcher = nonebot.on_message(
rule=match_keyword.match_keyword(re.compile(r"^roll(?:\s+.+)?$", re.I))
& require_permission(PERMISSION_KEY),
)
@matcher.handle()
async def _(event: Event, msg: UniMsg):
text = msg.extract_plain_text().strip()
expr = text[4:].strip()
if not expr:
await UniMessage.text("用法roll 3d6 / roll d20+5 / roll 2d8+1d4+3 / roll 4dF").send(event)
return
try:
result = roll_expression(expr)
await UniMessage.text(result.format()).send(event)
except RollError as e:
await UniMessage.text(str(e)).send(event)

View File

@ -0,0 +1,143 @@
import random
import re
from dataclasses import dataclass
MAX_DICE_COUNT = 100
MAX_DICE_FACES = 1000
MAX_TERM_COUNT = 20
MAX_TOTAL_ROLLS = 200
MAX_EXPRESSION_LENGTH = 200
MAX_MESSAGE_LENGTH = 1200
_TOKEN_RE = re.compile(r"([+-]?)(\d*d(?:%|[fF]|\d+)|\d+)")
_DICE_RE = re.compile(r"(?i)(\d*)d(%|f|\d+)")
# 常见跑团表达式示例3d6、d20+5、2d8+1d4+3、d%、4dF
class RollError(ValueError):
pass
@dataclass(slots=True)
class RollTermResult:
sign: int
source: str
detail: str
value: int
@dataclass(slots=True)
class RollResult:
expression: str
total: int
terms: list[RollTermResult]
def format(self) -> str:
parts = [f"{term.source}={term.detail}" for term in self.terms]
detail = " ".join(parts)
text = f"{self.expression} = {self.total}"
if detail:
text += f"\n{detail}"
if len(text) > MAX_MESSAGE_LENGTH:
raise RollError("结果过长,请减少骰子数量或简化表达式")
return text
def _parse_single_term(raw: str, sign: int, rng: random.Random) -> RollTermResult:
dice_match = _DICE_RE.fullmatch(raw)
if dice_match:
count_text, faces_text = dice_match.groups()
count = int(count_text) if count_text else 1
if count <= 0:
raise RollError("骰子个数必须大于 0")
if count > MAX_DICE_COUNT:
raise RollError(f"单项最多只能掷 {MAX_DICE_COUNT} 个骰子")
if faces_text == "%":
faces = 100
rolls = [rng.randint(1, 100) for _ in range(count)]
elif faces_text.lower() == "f":
rolls = [rng.choice((-1, 0, 1)) for _ in range(count)]
total = sum(rolls) * sign
signed = "+" if sign > 0 else "-"
return RollTermResult(
sign=sign,
source=f"{signed}{count}dF",
detail="[" + ", ".join(f"{v:+d}" for v in rolls) + "]",
value=total,
)
else:
faces = int(faces_text)
if faces <= 0:
raise RollError("骰子面数必须大于 0")
if faces > MAX_DICE_FACES:
raise RollError(f"骰子面数不能超过 {MAX_DICE_FACES}")
rolls = [rng.randint(1, faces) for _ in range(count)]
total = sum(rolls) * sign
signed = "+" if sign > 0 else "-"
return RollTermResult(
sign=sign,
source=f"{signed}{count}d{faces_text.upper()}",
detail="[" + ", ".join(map(str, rolls)) + "]",
value=total,
)
value = int(raw) * sign
signed = "+" if sign > 0 else "-"
return RollTermResult(sign=sign, source=f"{signed}{raw}", detail=str(abs(value)), value=value)
def roll_expression(expr: str, rng: random.Random | None = None) -> RollResult:
expr = expr.strip().replace(" ", "")
if not expr:
raise RollError("请提供要掷的表达式,例如 roll 3d6 或 roll d20+5")
if len(expr) > MAX_EXPRESSION_LENGTH:
raise RollError("表达式太长了")
matches = list(_TOKEN_RE.finditer(expr))
if not matches:
raise RollError("无法解析表达式,请使用如 3d6、d20+5、2d8+1d4+3、4dF 这样的格式")
rebuilt = "".join(m.group(0) for m in matches)
if rebuilt != expr:
raise RollError("表达式中含有无法识别的内容")
if len(matches) > MAX_TERM_COUNT:
raise RollError(f"表达式项数不能超过 {MAX_TERM_COUNT}")
total_rolls = 0
for m in matches:
token = m.group(2)
dice_match = _DICE_RE.fullmatch(token)
if dice_match:
count_text, faces_text = dice_match.groups()
count = int(count_text) if count_text else 1
if count <= 0:
raise RollError("骰子个数必须大于 0")
if count > MAX_DICE_COUNT:
raise RollError(f"单项最多只能掷 {MAX_DICE_COUNT} 个骰子")
if faces_text not in {"%", "f", "F"}:
faces = int(faces_text)
if faces <= 0:
raise RollError("骰子面数必须大于 0")
if faces > MAX_DICE_FACES:
raise RollError(f"骰子面数不能超过 {MAX_DICE_FACES}")
total_rolls += count
if total_rolls > MAX_TOTAL_ROLLS:
raise RollError(f"一次最多只能实际掷 {MAX_TOTAL_ROLLS} 个骰子")
rng = rng or random.Random()
terms: list[RollTermResult] = []
total = 0
for idx, match in enumerate(matches):
sign_text, raw = match.groups()
sign = -1 if sign_text == "-" else 1
if idx == 0 and sign_text == "":
sign = 1
term = _parse_single_term(raw, sign, rng)
terms.append(term)
total += term.value
return RollResult(expression=expr, total=total, terms=terms)

View File

@ -0,0 +1,40 @@
from contextlib import asynccontextmanager
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
from konabot.common.database import DatabaseManager
from konabot.common.permsys import PermManager, register_default_allow_permission
from konabot.common.permsys.entity import PermEntity
from konabot.common.permsys.migrates import execute_migration
@asynccontextmanager
async def tempdb():
with TemporaryDirectory() as _tempdir:
tempdir = Path(_tempdir)
db = DatabaseManager(tempdir / "perm.sqlite3")
yield db
await db.close_all_connections()
@pytest.mark.asyncio
async def test_register_default_allow_permission_records_key():
register_default_allow_permission("test.default.allow")
async with tempdb() as db:
async with db.get_conn() as conn:
await execute_migration(conn)
pm = PermManager(db)
await pm.update_permission(
PermEntity("sys", "global", "global"),
"test.default.allow",
True,
)
assert await pm.check_has_permission(
[PermEntity("dummy", "user", "1"), PermEntity("sys", "global", "global")],
"test.default.allow.sub",
)

66
tests/test_trpg_roll.py Normal file
View File

@ -0,0 +1,66 @@
import random
import pytest
from konabot.plugins.trpg_roll.core import RollError, roll_expression
class FakeRandom:
def __init__(self, randint_values: list[int] | None = None, choice_values: list[int] | None = None):
self._randint_values = list(randint_values or [])
self._choice_values = list(choice_values or [])
def randint(self, _a: int, _b: int) -> int:
assert self._randint_values
return self._randint_values.pop(0)
def choice(self, _seq):
assert self._choice_values
return self._choice_values.pop(0)
def test_roll_expression_basic():
rng = FakeRandom(randint_values=[2, 4, 5])
result = roll_expression("3d6", rng=rng)
assert result.total == 11
assert result.format() == "3d6 = 11\n+3d6=[2, 4, 5]"
def test_roll_expression_multiple_terms():
rng = FakeRandom(randint_values=[14, 3, 1])
result = roll_expression("d20+1d4-2", rng=rng)
assert result.total == 15
assert result.format() == "d20+1d4-2 = 15\n+1d20=[14] +1d4=[3] -2=2"
def test_roll_expression_df():
rng = FakeRandom(choice_values=[-1, 0, 1, 1])
result = roll_expression("4dF", rng=rng)
assert result.total == 1
assert result.format() == "4dF = 1\n+4dF=[-1, +0, +1, +1]"
@pytest.mark.parametrize(
("expr", "message"),
[
("", "请提供要掷的表达式"),
("abc", "无法解析表达式"),
("1d0", "骰子面数必须大于 0"),
("0d6", "骰子个数必须大于 0"),
("101d6", "单项最多只能掷 100 个骰子"),
("1d1001", "骰子面数不能超过 1000"),
("201d1", "单项最多只能掷 100 个骰子"),
("1d6*2", "表达式中含有无法识别的内容"),
],
)
def test_roll_expression_invalid(expr: str, message: str):
with pytest.raises(RollError, match=message):
roll_expression(expr, rng=random.Random(0))
def test_roll_expression_total_roll_limit():
with pytest.raises(RollError, match="一次最多只能实际掷 200 个骰子"):
roll_expression("100d6+100d6+1d6", rng=random.Random(0))