import pytest import nonebot nonebot.init() from konabot.plugins.handle_text.base import IfNode, PipelineRunner, TextHandlerEnvironment, WhileNode from konabot.plugins.handle_text.handlers.encoding_handlers import THReverse from konabot.plugins.handle_text.handlers.unix_handlers import ( THCat, THEcho, THFalse, THRm, THTest, THTrue, ) from konabot.plugins.handle_text.handlers.whitespace_handlers import THTrim @pytest.fixture def runner() -> PipelineRunner: runner = PipelineRunner() runner.register(THEcho()) runner.register(THCat()) runner.register(THRm()) runner.register(THTrue()) runner.register(THFalse()) runner.register(THTest()) runner.register(THReverse()) runner.register(THTrim()) return runner def test_parse_pipeline_shell_ops(runner: PipelineRunner): parsed = runner.parse_pipeline('echo hello | reverse && test a = a || echo no; echo done > out') assert not isinstance(parsed, str) assert len(parsed.statements) == 2 first = parsed.statements[0] second = parsed.statements[1] assert not isinstance(first, IfNode) assert not isinstance(first, WhileNode) assert not isinstance(second, IfNode) assert not isinstance(second, WhileNode) assert len(first.chains) == 3 assert first.chains[0].pipeline.commands[0].name == 'echo' assert first.chains[0].pipeline.commands[1].name == 'reverse' assert second.chains[0].pipeline.commands[0].redirects[0].target == 'out' def test_parse_if_statement(runner: PipelineRunner): parsed = runner.parse_pipeline('if test a = a; then echo yes; else echo no; fi') assert not isinstance(parsed, str) assert len(parsed.statements) == 1 stmt = parsed.statements[0] assert isinstance(stmt, IfNode) assert stmt.else_body is not None assert len(stmt.then_body.statements) == 1 def test_parse_while_statement(runner: PipelineRunner): parsed = runner.parse_pipeline('while false; do echo yes; done') assert not isinstance(parsed, str) assert len(parsed.statements) == 1 stmt = parsed.statements[0] assert isinstance(stmt, WhileNode) assert len(stmt.body.statements) == 1 @pytest.mark.asyncio async def test_pipeline_pipe(runner: PipelineRunner): parsed = runner.parse_pipeline('echo hello | reverse') assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert len(results) == 1 assert results[0].code == 0 assert results[0].ostream == 'olleh' @pytest.mark.asyncio async def test_redirect_and_cat(runner: PipelineRunner): parsed = runner.parse_pipeline('echo hello > a; cat a') assert not isinstance(parsed, str) env = TextHandlerEnvironment(False) results = await runner.run_pipeline(parsed, None, env) assert env.buffers['a'] == 'hello' assert results[-1].ostream == 'hello' @pytest.mark.asyncio async def test_append_redirect(runner: PipelineRunner): parsed = runner.parse_pipeline('echo hello > a; echo world >> a; cat a') assert not isinstance(parsed, str) env = TextHandlerEnvironment(False) results = await runner.run_pipeline(parsed, None, env) assert env.buffers['a'] == 'helloworld' assert results[-1].ostream == 'helloworld' @pytest.mark.asyncio async def test_and_or_short_circuit(runner: PipelineRunner): parsed = runner.parse_pipeline('test a = b && echo bad || echo ok') assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert len(results) == 1 assert results[0].code == 0 assert results[0].ostream == 'ok' @pytest.mark.asyncio async def test_test_bracket_alias(runner: PipelineRunner): parsed = runner.parse_pipeline('[ 2 -gt 1 ] && echo yes') assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert results[0].code == 0 assert results[0].ostream == 'yes' @pytest.mark.asyncio async def test_test_string_ops(runner: PipelineRunner): parsed = runner.parse_pipeline('test -n abc && echo yes; test -z abc || echo no') assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert [r.ostream for r in results] == ['yes', 'no'] @pytest.mark.asyncio async def test_quote_and_trim(runner: PipelineRunner): parsed = runner.parse_pipeline('echo " hello world " | trim') assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert results[0].ostream == 'hello world' @pytest.mark.asyncio async def test_if_then_else(runner: PipelineRunner): parsed = runner.parse_pipeline('if test a = b; then echo yes; else echo no; fi') assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert results[0].code == 0 assert results[0].ostream == 'no' @pytest.mark.asyncio async def test_if_then_without_else(runner: PipelineRunner): parsed = runner.parse_pipeline('if test a = a; then echo yes; fi') assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert results[0].ostream == 'yes' @pytest.mark.asyncio async def test_nested_if(runner: PipelineRunner): parsed = runner.parse_pipeline( 'if test a = a; then if test b = c; then echo x; else echo y; fi; else echo z; fi' ) assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert results[0].ostream == 'y' @pytest.mark.asyncio async def test_negate_pipeline(runner: PipelineRunner): parsed = runner.parse_pipeline('! test a = b && echo ok') assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert results[0].ostream == 'ok' @pytest.mark.asyncio async def test_true_false(runner: PipelineRunner): parsed = runner.parse_pipeline('true && echo yes; false || echo no') assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert [r.ostream for r in results] == ['yes', 'no'] @pytest.mark.asyncio async def test_while_false_noop(runner: PipelineRunner): parsed = runner.parse_pipeline('while false; do echo yes; done') assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert results[0].code == 0 assert results[0].ostream is None @pytest.mark.asyncio async def test_while_limit_guard(runner: PipelineRunner): parsed = runner.parse_pipeline('while true; do echo yes; done') assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert results[0].code == 2 assert 'while 循环超过最大迭代次数限制' in (results[0].ostream or '') @pytest.mark.asyncio async def test_while_with_immediate_break_condition(runner: PipelineRunner): parsed = runner.parse_pipeline('while ! false; do false; done') assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert results[0].code == 1 @pytest.mark.asyncio async def test_while_body_can_use_if(runner: PipelineRunner): parsed = runner.parse_pipeline('while ! false; do if true; then false; fi; done') assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert results[0].code == 1 @pytest.mark.asyncio async def test_echo_empty_string(runner: PipelineRunner): """测试 echo 空字符串""" # 双引号空字符串 parsed = runner.parse_pipeline('echo ""') assert not isinstance(parsed, str) results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False)) assert results[0].code == 0 assert results[0].ostream == '' # 单引号空字符串 parsed2 = runner.parse_pipeline("echo ''") assert not isinstance(parsed2, str) results2 = await runner.run_pipeline(parsed2, None, TextHandlerEnvironment(False)) assert results2[0].code == 0 assert results2[0].ostream == ''