import nonebot nonebot.init() import asyncio import pytest from konabot.plugins.handle_text.__init__ import ( _get_textfx_user_key, _textfx_running_users, TEXTFX_MAX_RUNTIME_SECONDS, ) from konabot.plugins.handle_text.base import PipelineRunner class DummyEvent: def __init__(self, self_id=None, user_id=None, group_id=None, session_id=None): self.self_id = self_id self.user_id = user_id self.group_id = group_id self._session_id = session_id def get_session_id(self): if self._session_id is None: raise RuntimeError('no session') return self._session_id def test_textfx_user_key_group(): evt = DummyEvent(self_id='123', user_id='456', group_id='789') assert _get_textfx_user_key(evt) == '123:789:456' def test_textfx_user_key_private(): evt = DummyEvent(self_id='123', user_id='456') assert _get_textfx_user_key(evt) == '123:private:456' def test_textfx_user_key_session_fallback(): evt = DummyEvent(session_id='console:alice') assert _get_textfx_user_key(evt) == 'session:console:alice' @pytest.mark.asyncio async def test_textfx_timeout_limit(): """测试脚本执行超时限制""" runner = PipelineRunner.get_runner() # 创建一个会超时的脚本(while true 会触发迭代限制,但我们用 sleep 模拟长时间运行) # 由于实际超时是 60 秒,我们不能真的等那么久,所以这个测试验证超时机制存在 script = "echo start" parsed = runner.parse_pipeline(script) assert not isinstance(parsed, str), "脚本解析应该成功" # 验证 TEXTFX_MAX_RUNTIME_SECONDS 常量存在且合理 assert TEXTFX_MAX_RUNTIME_SECONDS == 60 @pytest.mark.asyncio async def test_textfx_concurrent_limit(): """测试同一用户并发执行限制""" user_key = "test:group:user123" # 清理可能的残留状态 _textfx_running_users.discard(user_key) # 模拟第一个脚本正在运行 assert user_key not in _textfx_running_users _textfx_running_users.add(user_key) # 验证用户已被标记为运行中 assert user_key in _textfx_running_users # 清理 _textfx_running_users.discard(user_key) assert user_key not in _textfx_running_users