- Fix tokenizer to emit empty string token when closing quote on empty buffer - Add force parameter to flush_word() to handle empty quoted strings - Add test case for echo "" and echo ''
226 lines
8.2 KiB
Python
226 lines
8.2 KiB
Python
import pytest
|
|
import nonebot
|
|
|
|
nonebot.init()
|
|
|
|
from konabot.plugins.handle_text.base import IfNode, PipelineRunner, TextHandlerEnvironment, WhileNode
|
|
from konabot.plugins.handle_text.handlers.encoding_handlers import THReverse
|
|
from konabot.plugins.handle_text.handlers.unix_handlers import (
|
|
THCat,
|
|
THEcho,
|
|
THFalse,
|
|
THRm,
|
|
THTest,
|
|
THTrue,
|
|
)
|
|
from konabot.plugins.handle_text.handlers.whitespace_handlers import THTrim
|
|
|
|
|
|
@pytest.fixture
|
|
def runner() -> PipelineRunner:
|
|
runner = PipelineRunner()
|
|
runner.register(THEcho())
|
|
runner.register(THCat())
|
|
runner.register(THRm())
|
|
runner.register(THTrue())
|
|
runner.register(THFalse())
|
|
runner.register(THTest())
|
|
runner.register(THReverse())
|
|
runner.register(THTrim())
|
|
return runner
|
|
|
|
|
|
def test_parse_pipeline_shell_ops(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('echo hello | reverse && test a = a || echo no; echo done > out')
|
|
assert not isinstance(parsed, str)
|
|
assert len(parsed.statements) == 2
|
|
first = parsed.statements[0]
|
|
second = parsed.statements[1]
|
|
assert not isinstance(first, IfNode)
|
|
assert not isinstance(first, WhileNode)
|
|
assert not isinstance(second, IfNode)
|
|
assert not isinstance(second, WhileNode)
|
|
assert len(first.chains) == 3
|
|
assert first.chains[0].pipeline.commands[0].name == 'echo'
|
|
assert first.chains[0].pipeline.commands[1].name == 'reverse'
|
|
assert second.chains[0].pipeline.commands[0].redirects[0].target == 'out'
|
|
|
|
|
|
def test_parse_if_statement(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('if test a = a; then echo yes; else echo no; fi')
|
|
assert not isinstance(parsed, str)
|
|
assert len(parsed.statements) == 1
|
|
stmt = parsed.statements[0]
|
|
assert isinstance(stmt, IfNode)
|
|
assert stmt.else_body is not None
|
|
assert len(stmt.then_body.statements) == 1
|
|
|
|
|
|
def test_parse_while_statement(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('while false; do echo yes; done')
|
|
assert not isinstance(parsed, str)
|
|
assert len(parsed.statements) == 1
|
|
stmt = parsed.statements[0]
|
|
assert isinstance(stmt, WhileNode)
|
|
assert len(stmt.body.statements) == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pipeline_pipe(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('echo hello | reverse')
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert len(results) == 1
|
|
assert results[0].code == 0
|
|
assert results[0].ostream == 'olleh'
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_redirect_and_cat(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('echo hello > a; cat a')
|
|
assert not isinstance(parsed, str)
|
|
env = TextHandlerEnvironment(False)
|
|
results = await runner.run_pipeline(parsed, None, env)
|
|
assert env.buffers['a'] == 'hello'
|
|
assert results[-1].ostream == 'hello'
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_append_redirect(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('echo hello > a; echo world >> a; cat a')
|
|
assert not isinstance(parsed, str)
|
|
env = TextHandlerEnvironment(False)
|
|
results = await runner.run_pipeline(parsed, None, env)
|
|
assert env.buffers['a'] == 'helloworld'
|
|
assert results[-1].ostream == 'helloworld'
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_and_or_short_circuit(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('test a = b && echo bad || echo ok')
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert len(results) == 1
|
|
assert results[0].code == 0
|
|
assert results[0].ostream == 'ok'
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_test_bracket_alias(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('[ 2 -gt 1 ] && echo yes')
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert results[0].code == 0
|
|
assert results[0].ostream == 'yes'
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_test_string_ops(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('test -n abc && echo yes; test -z abc || echo no')
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert [r.ostream for r in results] == ['yes', 'no']
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_quote_and_trim(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('echo " hello world " | trim')
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert results[0].ostream == 'hello world'
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_if_then_else(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('if test a = b; then echo yes; else echo no; fi')
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert results[0].code == 0
|
|
assert results[0].ostream == 'no'
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_if_then_without_else(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('if test a = a; then echo yes; fi')
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert results[0].ostream == 'yes'
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_nested_if(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline(
|
|
'if test a = a; then if test b = c; then echo x; else echo y; fi; else echo z; fi'
|
|
)
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert results[0].ostream == 'y'
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_negate_pipeline(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('! test a = b && echo ok')
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert results[0].ostream == 'ok'
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_true_false(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('true && echo yes; false || echo no')
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert [r.ostream for r in results] == ['yes', 'no']
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_while_false_noop(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('while false; do echo yes; done')
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert results[0].code == 0
|
|
assert results[0].ostream is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_while_limit_guard(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('while true; do echo yes; done')
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert results[0].code == 2
|
|
assert 'while 循环超过最大迭代次数限制' in (results[0].ostream or '')
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_while_with_immediate_break_condition(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('while ! false; do false; done')
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert results[0].code == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_while_body_can_use_if(runner: PipelineRunner):
|
|
parsed = runner.parse_pipeline('while ! false; do if true; then false; fi; done')
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert results[0].code == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_echo_empty_string(runner: PipelineRunner):
|
|
"""测试 echo 空字符串"""
|
|
# 双引号空字符串
|
|
parsed = runner.parse_pipeline('echo ""')
|
|
assert not isinstance(parsed, str)
|
|
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
|
assert results[0].code == 0
|
|
assert results[0].ostream == ''
|
|
|
|
# 单引号空字符串
|
|
parsed2 = runner.parse_pipeline("echo ''")
|
|
assert not isinstance(parsed2, str)
|
|
results2 = await runner.run_pipeline(parsed2, None, TextHandlerEnvironment(False))
|
|
assert results2[0].code == 0
|
|
assert results2[0].ostream == ''
|