feat: evolve textfx into a mini shell
This commit is contained in:
207
tests/test_textfx_shell.py
Normal file
207
tests/test_textfx_shell.py
Normal file
@ -0,0 +1,207 @@
|
||||
import pytest
|
||||
import nonebot
|
||||
|
||||
nonebot.init()
|
||||
|
||||
from konabot.plugins.handle_text.base import IfNode, PipelineRunner, TextHandlerEnvironment, WhileNode
|
||||
from konabot.plugins.handle_text.handlers.encoding_handlers import THReverse
|
||||
from konabot.plugins.handle_text.handlers.unix_handlers import (
|
||||
THCat,
|
||||
THEcho,
|
||||
THFalse,
|
||||
THRm,
|
||||
THTest,
|
||||
THTrue,
|
||||
)
|
||||
from konabot.plugins.handle_text.handlers.whitespace_handlers import THTrim
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def runner() -> PipelineRunner:
|
||||
runner = PipelineRunner()
|
||||
runner.register(THEcho())
|
||||
runner.register(THCat())
|
||||
runner.register(THRm())
|
||||
runner.register(THTrue())
|
||||
runner.register(THFalse())
|
||||
runner.register(THTest())
|
||||
runner.register(THReverse())
|
||||
runner.register(THTrim())
|
||||
return runner
|
||||
|
||||
|
||||
def test_parse_pipeline_shell_ops(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo hello | reverse && test a = a || echo no; echo done > out')
|
||||
assert not isinstance(parsed, str)
|
||||
assert len(parsed.statements) == 2
|
||||
first = parsed.statements[0]
|
||||
second = parsed.statements[1]
|
||||
assert not isinstance(first, IfNode)
|
||||
assert not isinstance(first, WhileNode)
|
||||
assert not isinstance(second, IfNode)
|
||||
assert not isinstance(second, WhileNode)
|
||||
assert len(first.chains) == 3
|
||||
assert first.chains[0].pipeline.commands[0].name == 'echo'
|
||||
assert first.chains[0].pipeline.commands[1].name == 'reverse'
|
||||
assert second.chains[0].pipeline.commands[0].redirects[0].target == 'out'
|
||||
|
||||
|
||||
def test_parse_if_statement(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('if test a = a; then echo yes; else echo no; fi')
|
||||
assert not isinstance(parsed, str)
|
||||
assert len(parsed.statements) == 1
|
||||
stmt = parsed.statements[0]
|
||||
assert isinstance(stmt, IfNode)
|
||||
assert stmt.else_body is not None
|
||||
assert len(stmt.then_body.statements) == 1
|
||||
|
||||
|
||||
def test_parse_while_statement(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while false; do echo yes; done')
|
||||
assert not isinstance(parsed, str)
|
||||
assert len(parsed.statements) == 1
|
||||
stmt = parsed.statements[0]
|
||||
assert isinstance(stmt, WhileNode)
|
||||
assert len(stmt.body.statements) == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pipeline_pipe(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo hello | reverse')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert len(results) == 1
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream == 'olleh'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_redirect_and_cat(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo hello > a; cat a')
|
||||
assert not isinstance(parsed, str)
|
||||
env = TextHandlerEnvironment(False)
|
||||
results = await runner.run_pipeline(parsed, None, env)
|
||||
assert env.buffers['a'] == 'hello'
|
||||
assert results[-1].ostream == 'hello'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_append_redirect(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo hello > a; echo world >> a; cat a')
|
||||
assert not isinstance(parsed, str)
|
||||
env = TextHandlerEnvironment(False)
|
||||
results = await runner.run_pipeline(parsed, None, env)
|
||||
assert env.buffers['a'] == 'helloworld'
|
||||
assert results[-1].ostream == 'helloworld'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_and_or_short_circuit(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('test a = b && echo bad || echo ok')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert len(results) == 1
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream == 'ok'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_test_bracket_alias(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('[ 2 -gt 1 ] && echo yes')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream == 'yes'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_test_string_ops(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('test -n abc && echo yes; test -z abc || echo no')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert [r.ostream for r in results] == ['yes', 'no']
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_quote_and_trim(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('echo " hello world " | trim')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].ostream == 'hello world'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_if_then_else(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('if test a = b; then echo yes; else echo no; fi')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream == 'no'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_if_then_without_else(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('if test a = a; then echo yes; fi')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].ostream == 'yes'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_nested_if(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline(
|
||||
'if test a = a; then if test b = c; then echo x; else echo y; fi; else echo z; fi'
|
||||
)
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].ostream == 'y'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_negate_pipeline(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('! test a = b && echo ok')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].ostream == 'ok'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_true_false(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('true && echo yes; false || echo no')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert [r.ostream for r in results] == ['yes', 'no']
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_while_false_noop(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while false; do echo yes; done')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 0
|
||||
assert results[0].ostream is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_while_limit_guard(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while true; do echo yes; done')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 2
|
||||
assert 'while 循环超过最大迭代次数限制' in (results[0].ostream or '')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_while_with_immediate_break_condition(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while ! false; do false; done')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_while_body_can_use_if(runner: PipelineRunner):
|
||||
parsed = runner.parse_pipeline('while ! false; do if true; then false; fi; done')
|
||||
assert not isinstance(parsed, str)
|
||||
results = await runner.run_pipeline(parsed, None, TextHandlerEnvironment(False))
|
||||
assert results[0].code == 1
|
||||
Reference in New Issue
Block a user