Files
konabot/tests/test_textfx_shell.py
pi-agent 9bac2b8cdf fix: support empty string literals in textfx
- Fix tokenizer to emit empty string token when closing quote on empty buffer
- Add force parameter to flush_word() to handle empty quoted strings
- Add test case for echo "" and echo ''
2026-03-18 19:23:42 +08:00

226 lines
8.2 KiB
Python

import pytest
import nonebot
nonebot.init()
from konabot.plugins.handle_text.base import IfNode, PipelineRunner, TextHandlerEnvironment, WhileNode
from konabot.plugins.handle_text.handlers.encoding_handlers import THReverse
from konabot.plugins.handle_text.handlers.unix_handlers import (
THCat,
THEcho,
THFalse,
THRm,
THTest,
THTrue,
)
from konabot.plugins.handle_text.handlers.whitespace_handlers import THTrim
@pytest.fixture
def runner() -> PipelineRunner:
    """Provide a fresh PipelineRunner with the handlers used by this module.

    One instance of each handler class listed below is created and
    registered, in the listed order.
    """
    pipeline_runner = PipelineRunner()
    handler_classes = (
        THEcho,
        THCat,
        THRm,
        THTrue,
        THFalse,
        THTest,
        THReverse,
        THTrim,
    )
    # Instantiate each handler right before registering it.
    for handler_cls in handler_classes:
        pipeline_runner.register(handler_cls())
    return pipeline_runner
def test_parse_pipeline_shell_ops(runner: PipelineRunner):
    """Parse a line mixing `|`, `&&`, `||`, `;` and `>` and inspect the tree.

    Expects two statements: the first with three chains whose leading
    pipeline is `echo` followed by `reverse`, the second redirecting to `out`.
    """
    script = 'echo hello | reverse && test a = a || echo no; echo done > out'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)
    assert len(tree.statements) == 2

    first, second = tree.statements
    control_nodes = (IfNode, WhileNode)
    # Neither statement may be a control-flow node.
    assert not isinstance(first, control_nodes)
    assert not isinstance(second, control_nodes)

    assert len(first.chains) == 3
    leading_commands = first.chains[0].pipeline.commands
    assert leading_commands[0].name == 'echo'
    assert leading_commands[1].name == 'reverse'

    redirected_command = second.chains[0].pipeline.commands[0]
    assert redirected_command.redirects[0].target == 'out'
def test_parse_if_statement(runner: PipelineRunner):
    """An `if ...; then ...; else ...; fi` line parses into a single IfNode.

    The node must carry an else body and exactly one statement in its
    then body.
    """
    script = 'if test a = a; then echo yes; else echo no; fi'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)
    assert len(tree.statements) == 1

    if_node = tree.statements[0]
    assert isinstance(if_node, IfNode)
    assert if_node.else_body is not None
    assert len(if_node.then_body.statements) == 1
def test_parse_while_statement(runner: PipelineRunner):
    """A `while ...; do ...; done` line parses into a single WhileNode.

    The loop body must contain exactly one statement.
    """
    script = 'while false; do echo yes; done'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)
    assert len(tree.statements) == 1

    while_node = tree.statements[0]
    assert isinstance(while_node, WhileNode)
    assert len(while_node.body.statements) == 1
@pytest.mark.asyncio
async def test_pipeline_pipe(runner: PipelineRunner):
    """`echo hello | reverse` yields one result with code 0 and 'olleh'."""
    script = 'echo hello | reverse'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    env = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, env)

    assert len(outcomes) == 1
    only = outcomes[0]
    assert only.code == 0
    assert only.ostream == 'olleh'
@pytest.mark.asyncio
async def test_redirect_and_cat(runner: PipelineRunner):
    """`>` stores echo's output in buffer `a`, and `cat a` reads it back."""
    script = 'echo hello > a; cat a'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    # Keep a handle on the environment so its buffers can be inspected.
    environment = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, environment)

    assert environment.buffers['a'] == 'hello'
    last = outcomes[-1]
    assert last.ostream == 'hello'
@pytest.mark.asyncio
async def test_append_redirect(runner: PipelineRunner):
    """`>>` appends to buffer `a`: 'hello' then 'world' gives 'helloworld'."""
    script = 'echo hello > a; echo world >> a; cat a'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    # Keep a handle on the environment so its buffers can be inspected.
    environment = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, environment)

    expected = 'helloworld'
    assert environment.buffers['a'] == expected
    assert outcomes[-1].ostream == expected
@pytest.mark.asyncio
async def test_and_or_short_circuit(runner: PipelineRunner):
    """`test a = b && echo bad || echo ok` yields one result: code 0, 'ok'."""
    script = 'test a = b && echo bad || echo ok'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    env = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, env)

    assert len(outcomes) == 1
    only = outcomes[0]
    assert only.code == 0
    assert only.ostream == 'ok'
@pytest.mark.asyncio
async def test_test_bracket_alias(runner: PipelineRunner):
    """`[ 2 -gt 1 ] && echo yes` succeeds with output 'yes'."""
    script = '[ 2 -gt 1 ] && echo yes'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    env = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, env)

    head = outcomes[0]
    assert head.code == 0
    assert head.ostream == 'yes'
@pytest.mark.asyncio
async def test_test_string_ops(runner: PipelineRunner):
    """`test -n abc && echo yes; test -z abc || echo no` outputs yes, no."""
    script = 'test -n abc && echo yes; test -z abc || echo no'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    env = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, env)

    outputs = [outcome.ostream for outcome in outcomes]
    assert outputs == ['yes', 'no']
@pytest.mark.asyncio
async def test_quote_and_trim(runner: PipelineRunner):
    """A quoted, space-padded echo argument piped to `trim` loses its padding."""
    script = 'echo " hello world " | trim'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    env = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, env)

    trimmed = outcomes[0].ostream
    assert trimmed == 'hello world'
@pytest.mark.asyncio
async def test_if_then_else(runner: PipelineRunner):
    """With the condition `test a = b`, the else branch output 'no' is expected."""
    script = 'if test a = b; then echo yes; else echo no; fi'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    env = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, env)

    head = outcomes[0]
    assert head.code == 0
    assert head.ostream == 'no'
@pytest.mark.asyncio
async def test_if_then_without_else(runner: PipelineRunner):
    """An `if` without `else` outputs 'yes' for the condition `test a = a`."""
    script = 'if test a = a; then echo yes; fi'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    env = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, env)

    produced = outcomes[0].ostream
    assert produced == 'yes'
@pytest.mark.asyncio
async def test_nested_if(runner: PipelineRunner):
    """An `if` nested in a then-branch is expected to output 'y' (inner else)."""
    script = (
        'if test a = a; then if test b = c; then echo x; else echo y; fi; else echo z; fi'
    )
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    env = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, env)

    produced = outcomes[0].ostream
    assert produced == 'y'
@pytest.mark.asyncio
async def test_negate_pipeline(runner: PipelineRunner):
    """`! test a = b && echo ok` is expected to output 'ok'."""
    script = '! test a = b && echo ok'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    env = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, env)

    produced = outcomes[0].ostream
    assert produced == 'ok'
@pytest.mark.asyncio
async def test_true_false(runner: PipelineRunner):
    """`true && echo yes; false || echo no` outputs 'yes' then 'no'."""
    script = 'true && echo yes; false || echo no'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    env = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, env)

    outputs = [outcome.ostream for outcome in outcomes]
    assert outputs == ['yes', 'no']
@pytest.mark.asyncio
async def test_while_false_noop(runner: PipelineRunner):
    """`while false; do ...; done` gives code 0 and an output of None."""
    script = 'while false; do echo yes; done'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    env = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, env)

    head = outcomes[0]
    assert head.code == 0
    assert head.ostream is None
@pytest.mark.asyncio
async def test_while_limit_guard(runner: PipelineRunner):
    """A `while true` loop must end with code 2 and an iteration-limit message."""
    script = 'while true; do echo yes; done'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    env = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, env)

    guarded = outcomes[0]
    assert guarded.code == 2
    # The output may be None, so fall back to '' before the substring check.
    message = guarded.ostream or ''
    assert 'while 循环超过最大迭代次数限制' in message
@pytest.mark.asyncio
async def test_while_with_immediate_break_condition(runner: PipelineRunner):
    """`while ! false; do false; done` is expected to finish with code 1.

    NOTE(review): presumably the failing body ends the loop — confirm
    against the runner implementation.
    """
    script = 'while ! false; do false; done'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    env = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, env)

    exit_code = outcomes[0].code
    assert exit_code == 1
@pytest.mark.asyncio
async def test_while_body_can_use_if(runner: PipelineRunner):
    """A `while` body containing an `if` statement runs and ends with code 1."""
    script = 'while ! false; do if true; then false; fi; done'
    tree = runner.parse_pipeline(script)
    # The test requires a parsed tree rather than a plain string result.
    assert not isinstance(tree, str)

    env = TextHandlerEnvironment(False)
    outcomes = await runner.run_pipeline(tree, None, env)

    exit_code = outcomes[0].code
    assert exit_code == 1
@pytest.mark.asyncio
async def test_echo_empty_string(runner: PipelineRunner):
    """Test echoing an empty string literal."""
    scripts = (
        'echo ""',  # empty double-quoted string
        "echo ''",  # empty single-quoted string
    )
    for script in scripts:
        tree = runner.parse_pipeline(script)
        # The test requires a parsed tree rather than a plain string result.
        assert not isinstance(tree, str)

        env = TextHandlerEnvironment(False)
        outcomes = await runner.run_pipeline(tree, None, env)

        head = outcomes[0]
        assert head.code == 0
        assert head.ostream == ''