diff --git a/packages/jumpstarter-driver-pyserial/README.md b/packages/jumpstarter-driver-pyserial/README.md index 3ebdaf044..eeaf4e1a9 100644 --- a/packages/jumpstarter-driver-pyserial/README.md +++ b/packages/jumpstarter-driver-pyserial/README.md @@ -33,6 +33,58 @@ export: | check_present | Check if the serial port exists during exporter initialization, disable if you are connecting to a dynamically created port (i.e. USB from your DUT) | bool | no | True | | cps | Characters per second throttling limit. When set, data transmission will be throttled to simulate slow typing. Useful for devices that can't handle fast input | float | no | None | +## CLI Commands + +The pyserial driver provides two CLI commands for interacting with serial ports: + +### start_console + +Start an interactive serial console with direct terminal access. + +```bash +j serial start-console +``` + +Exit the console by pressing CTRL+B three times. + +### pipe + +Pipe serial port data to stdout or a file. Automatically detects if stdin is piped and enables bidirectional mode. + +When stdin is used, commands are sent until EOF, then continues monitoring serial output until Ctrl+C. + +```bash +# Log serial output to stdout +j serial pipe + +# Log serial output to a file +j serial pipe -o serial.log + +# Send command to serial, then continue monitoring output +echo "hello" | j serial pipe + +# Send commands from file, then continue monitoring output +cat commands.txt | j serial pipe -o serial.log + +# Force bidirectional mode (interactive) +j serial pipe -i + +# Append to log file instead of overwriting +j serial pipe -o serial.log -a + +# Disable stdin input even when piped +cat data.txt | j serial pipe --no-input +``` + +#### Options + +- `-o, --output FILE`: Write serial output to a file instead of stdout +- `-i, --input`: Force enable stdin to serial port (auto-detected if piped) +- `--no-input`: Disable stdin to serial port, even if stdin is piped +- `-a, --append`: Append to output file instead of overwriting + +Exit with Ctrl+C. + ## API Reference ```{eval-rst} diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/cli_test.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/cli_test.py new file mode 100644 index 000000000..37288bfd0 --- /dev/null +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/cli_test.py @@ -0,0 +1,347 @@ +""" +CLI tests for PySerial driver. + +Tests the Click CLI interface including the pipe command. +""" + +from unittest.mock import AsyncMock, patch + +import pytest +from anyio import BrokenResourceError, EndOfStream +from click.testing import CliRunner + +from .driver import PySerial +from jumpstarter.common.utils import serve + + +@pytest.fixture +def anyio_backend(): + return "asyncio" + + +@pytest.fixture +def pyserial_client(): + """Fixture to create a PySerial client with loop:// URL for testing.""" + instance = PySerial(url="loop://") + with serve(instance) as client: + yield client + + +def test_pipe_command_append_requires_output(pyserial_client): + """Test that --append requires --output.""" + runner = CliRunner() + cli = pyserial_client.cli() + + # Mock the portal to prevent actual execution + with patch.object(pyserial_client, "portal"): + result = runner.invoke(cli, ["pipe", "--append"]) + assert result.exit_code != 0 + assert "--append requires --output" in result.output + + +def test_pipe_command_input_and_no_input_conflict(pyserial_client): + """Test that --input and --no-input cannot be used together.""" + runner = CliRunner() + cli = pyserial_client.cli() + + # Mock the portal to prevent actual execution + with patch.object(pyserial_client, "portal"): + result = runner.invoke(cli, ["pipe", "--input", "--no-input"]) + assert result.exit_code != 0 + assert "Cannot use both --input and --no-input" in result.output + + +def test_pipe_command_with_output_file(pyserial_client): + """Test pipe command with output file option.""" + runner = CliRunner() + cli = pyserial_client.cli() + + with runner.isolated_filesystem(): + # Mock the portal.call to prevent actual execution + with patch.object(pyserial_client.portal, "call") as mock_call: + mock_call.side_effect = KeyboardInterrupt # Simulate Ctrl+C to exit + + # Use --no-input to explicitly disable input detection + runner.invoke(cli, ["pipe", "-o", "test.log", "--no-input"]) + + # Should have attempted to call _pipe_serial + assert mock_call.called + # Check the arguments passed + args = mock_call.call_args[0] + assert args[1] == "test.log" # output file + assert args[2] is False # input_enabled + assert args[3] is False # append + + +def test_pipe_command_with_append(pyserial_client): + """Test pipe command with append option.""" + runner = CliRunner() + cli = pyserial_client.cli() + + with runner.isolated_filesystem(): + with patch.object(pyserial_client.portal, "call") as mock_call: + mock_call.side_effect = KeyboardInterrupt + + runner.invoke(cli, ["pipe", "-o", "test.log", "-a"]) + + assert mock_call.called + args = mock_call.call_args[0] + assert args[1] == "test.log" # output file + assert args[3] is True # append + + +def test_pipe_command_with_input_flag(pyserial_client): + """Test pipe command with --input flag.""" + runner = CliRunner() + cli = pyserial_client.cli() + + with patch.object(pyserial_client.portal, "call") as mock_call: + mock_call.side_effect = KeyboardInterrupt + + runner.invoke(cli, ["pipe", "-i"]) + + assert mock_call.called + args = mock_call.call_args[0] + assert args[2] is True # input_enabled + + +def test_pipe_command_with_no_input_flag(pyserial_client): + """Test pipe command with --no-input flag.""" + runner = CliRunner() + cli = pyserial_client.cli() + + with patch.object(pyserial_client.portal, "call") as mock_call: + mock_call.side_effect = KeyboardInterrupt + + runner.invoke(cli, ["pipe", "--no-input"]) + + assert mock_call.called + args = mock_call.call_args[0] + assert args[2] is False # input_enabled + + +def test_pipe_command_stdin_auto_detection(pyserial_client): + """Test that pipe command auto-detects piped stdin with CliRunner.""" + runner = CliRunner() + cli = pyserial_client.cli() + + # CliRunner doesn't provide a TTY by default, so stdin.isatty() returns False + # This simulates the behavior when stdin is piped + with patch.object(pyserial_client.portal, "call") as mock_call: + mock_call.side_effect = KeyboardInterrupt + + runner.invoke(cli, ["pipe"]) + + assert mock_call.called + args = mock_call.call_args[0] + # Should auto-enable input when stdin is not a TTY (CliRunner default behavior) + assert args[2] is True # input_enabled + + +def test_pipe_command_no_auto_detection_with_no_input_flag(pyserial_client): + """Test that pipe command doesn't enable input with --no-input flag.""" + runner = CliRunner() + cli = pyserial_client.cli() + + with patch.object(pyserial_client.portal, "call") as mock_call: + mock_call.side_effect = KeyboardInterrupt + + runner.invoke(cli, ["pipe", "--no-input"]) + + assert mock_call.called + args = mock_call.call_args[0] + # Should NOT enable input when --no-input is specified + assert args[2] is False # input_enabled + + +def test_pipe_command_status_messages(pyserial_client): + """Test that pipe command prints appropriate status messages.""" + runner = CliRunner() + cli = pyserial_client.cli() + + with patch.object(pyserial_client.portal, "call") as mock_call: + mock_call.side_effect = KeyboardInterrupt + + # Test read-only mode (with --no-input flag) + result = runner.invoke(cli, ["pipe", "--no-input"]) + assert "Reading from serial port" in result.output + assert "Ctrl+C to exit" in result.output + + # Test bidirectional mode (CliRunner stdin is not a TTY, so it auto-detects) + result = runner.invoke(cli, ["pipe"]) + assert "Bidirectional mode" in result.output or "auto-detected" in result.output + + +def test_pipe_command_with_file_and_input(pyserial_client): + """Test pipe command with both file output and input.""" + runner = CliRunner() + cli = pyserial_client.cli() + + with runner.isolated_filesystem(): + with patch.object(pyserial_client.portal, "call") as mock_call: + mock_call.side_effect = KeyboardInterrupt + + with patch("sys.stdin.isatty", return_value=False): + runner.invoke(cli, ["pipe", "-o", "test.log"]) + + assert mock_call.called + args = mock_call.call_args[0] + assert args[1] == "test.log" # output file + assert args[2] is True # input_enabled (auto-detected) + assert args[3] is False # append + + +def test_pipe_command_keyboard_interrupt_handling(pyserial_client): + """Test that pipe command handles KeyboardInterrupt gracefully.""" + runner = CliRunner() + cli = pyserial_client.cli() + + with patch.object(pyserial_client.portal, "call") as mock_call: + mock_call.side_effect = KeyboardInterrupt + + result = runner.invoke(cli, ["pipe"]) + + # Should exit cleanly after KeyboardInterrupt + assert "Stopped" in result.output or result.exit_code == 0 + + +def test_pipe_command_mode_descriptions(pyserial_client): + """Test that pipe command shows correct mode descriptions.""" + runner = CliRunner() + cli = pyserial_client.cli() + + with patch.object(pyserial_client.portal, "call") as mock_call: + mock_call.side_effect = KeyboardInterrupt + + # Test auto-detected mode (CliRunner stdin is not a TTY) + result = runner.invoke(cli, ["pipe"]) + assert "auto-detected" in result.output.lower() + + # Test forced input mode + result = runner.invoke(cli, ["pipe", "-i"]) + assert "forced input" in result.output.lower() or "bidirectional" in result.output.lower() + + # Test read-only mode (with --no-input flag) + result = runner.invoke(cli, ["pipe", "--no-input"]) + assert "read-only" in result.output.lower() + + +def test_start_console_command_structure(pyserial_client): + """Test that start-console command has the correct structure.""" + cli = pyserial_client.cli() + + # Click converts underscores to hyphens in command names + cmd_name = "start-console" if "start-console" in cli.commands else "start_console" + console_cmd = cli.commands[cmd_name] + + assert console_cmd is not None + assert hasattr(console_cmd, "callback") + + +def test_cli_base_command(pyserial_client): + """Test that base CLI command works.""" + runner = CliRunner() + cli = pyserial_client.cli() + + result = runner.invoke(cli, ["--help"]) + assert result.exit_code == 0 + assert "Serial port client" in result.output or "Commands:" in result.output + + +@pytest.mark.anyio +async def test_serial_to_output_handles_end_of_stream(pyserial_client): + """Test that _serial_to_output handles EndOfStream gracefully.""" + # Create a mock stream that raises EndOfStream + mock_stream = AsyncMock() + mock_stream.receive.side_effect = EndOfStream + + # Capture the click output + from click.testing import CliRunner + + runner = CliRunner() + with runner.isolated_filesystem(): + # Call _serial_to_output directly + await pyserial_client._serial_to_output(mock_stream, None, False) + + # The method should have caught EndOfStream and not raised it + # (we're testing that it doesn't propagate) + + +@pytest.mark.anyio +async def test_serial_to_output_handles_broken_resource(pyserial_client): + """Test that _serial_to_output handles BrokenResourceError gracefully.""" + # Create a mock stream that raises BrokenResourceError + mock_stream = AsyncMock() + mock_stream.receive.side_effect = BrokenResourceError + + # Capture the click output + from click.testing import CliRunner + + runner = CliRunner() + with runner.isolated_filesystem(): + # Call _serial_to_output directly + await pyserial_client._serial_to_output(mock_stream, None, False) + + # The method should have caught BrokenResourceError and not raised it + # (we're testing that it doesn't propagate) + + +@pytest.mark.anyio +async def test_serial_to_output_end_of_stream_with_file(pyserial_client): + """Test that _serial_to_output handles EndOfStream when writing to file.""" + # Create a mock stream that raises EndOfStream + mock_stream = AsyncMock() + mock_stream.receive.side_effect = EndOfStream + + from click.testing import CliRunner + + runner = CliRunner() + with runner.isolated_filesystem(): + # Call _serial_to_output with a file output + await pyserial_client._serial_to_output(mock_stream, "test.log", False) + + # The file should have been created (even if empty) + import os + + assert os.path.exists("test.log") + + +@pytest.mark.anyio +async def test_serial_to_output_broken_resource_with_file(pyserial_client): + """Test that _serial_to_output handles BrokenResourceError when writing to file.""" + # Create a mock stream that raises BrokenResourceError + mock_stream = AsyncMock() + mock_stream.receive.side_effect = BrokenResourceError + + from click.testing import CliRunner + + runner = CliRunner() + with runner.isolated_filesystem(): + # Call _serial_to_output with a file output + await pyserial_client._serial_to_output(mock_stream, "test.log", False) + + # The file should have been created (even if empty) + import os + + assert os.path.exists("test.log") + + +@pytest.mark.anyio +async def test_serial_to_output_receives_data_then_end_of_stream(pyserial_client): + """Test that _serial_to_output successfully receives data before EndOfStream.""" + # Create a mock stream that returns data then raises EndOfStream + mock_stream = AsyncMock() + mock_stream.receive.side_effect = [b"Hello", b"World", EndOfStream] + + from click.testing import CliRunner + + runner = CliRunner() + with runner.isolated_filesystem(): + # Call _serial_to_output with a file output + await pyserial_client._serial_to_output(mock_stream, "test.log", False) + + # Verify the file contains the data + with open("test.log", "rb") as f: + content = f.read() + assert content == b"HelloWorld" + diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/client.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/client.py index 7a4f643eb..9fc25eb0e 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/client.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/client.py @@ -1,6 +1,10 @@ +import sys from contextlib import contextmanager +from typing import Optional import click +from anyio import BrokenResourceError, EndOfStream, create_task_group, open_file, sleep +from anyio.streams.file import FileReadStream from jumpstarter_driver_network.adapters import PexpectAdapter from pexpect.fdpexpect import fdspawn @@ -36,7 +40,71 @@ def pexpect(self): with PexpectAdapter(client=self) as adapter: yield adapter - def cli(self): + async def _pipe_serial( + self, + output_file: Optional[str] = None, + input_enabled: bool = False, + append: bool = False, + ): + """ + Pipe serial port data to stdout or a file, optionally reading from stdin. + + Args: + output_file: Path to output file. If None, writes to stdout. + input_enabled: If True, also pipe stdin to serial port. + append: If True, append to file instead of overwriting. + """ + async with self.stream_async(method="connect") as stream: + async with create_task_group() as tg: + # Output task: serial -> file/stdout + tg.start_soon(self._serial_to_output, stream, output_file, append) + + # Input task: stdin -> serial (optional) + if input_enabled: + tg.start_soon(self._stdin_to_serial, stream) + + # Keep running until interrupted (Ctrl+C) + # When input is enabled, this continues even after stdin EOF + while True: + await sleep(1) + + async def _serial_to_output(self, stream, output_file: Optional[str], append: bool): + """Read from serial and write to file or stdout.""" + try: + if output_file: + mode = "ab" if append else "wb" + async with await open_file(output_file, mode) as f: + while True: + data = await stream.receive() + await f.write(data) + await f.flush() + else: + while True: + data = await stream.receive() + sys.stdout.buffer.write(data) + sys.stdout.buffer.flush() + except EndOfStream: + click.echo("\nSerial connection closed normally (end of stream).", err=True) + except BrokenResourceError: + click.echo( + "\nSerial connection lost (broken resource). The connection may have been interrupted.", err=True + ) + + async def _stdin_to_serial(self, stream): + """Read from stdin and write to serial. Returns when stdin reaches EOF.""" + stdin = FileReadStream(sys.stdin.buffer) + try: + while True: + data = await stdin.receive(max_bytes=1024) + if not data: + # EOF on stdin, just stop reading but keep serial output running + return + await stream.send(data) + except EndOfStream: + # EOF on stdin, just stop reading but keep serial output running + return + + def cli(self): # noqa: C901 @driver_click_group(self) def base(): """Serial port client""" @@ -49,4 +117,98 @@ def start_console(): console = Console(serial_client=self) console.run() + @base.command() + @click.option( + "-o", "--output", + type=click.Path(), + default=None, + help="Output file path. If not specified, writes to stdout.", + ) + @click.option( + "-i", "--input", + "input_flag", + is_flag=True, + default=None, + help="Force enable stdin to serial port. Auto-detected if stdin is piped.", + ) + @click.option( + "--no-input", + is_flag=True, + default=False, + help="Disable stdin to serial port, even if stdin is piped.", + ) + @click.option( + "-a", "--append", + is_flag=True, + default=False, + help="Append to output file instead of overwriting.", + ) + def pipe(output, input_flag, no_input, append): # noqa: C901 + """Pipe serial port data to stdout or file. + + By default, reads from the serial port and writes to stdout. + Automatically detects if stdin is piped and enables bidirectional mode. + + When stdin is used, commands are sent until EOF, then continues + monitoring serial output until Ctrl+C. + + Use -o/--output to write to a file instead. + Use -i/--input to force enable stdin to serial (auto-detected). + Use --no-input to disable stdin even when piped. + + Exit with Ctrl+C. + + Examples: + + j serial pipe # Log serial output to stdout + + j serial pipe -o serial.log # Log serial output to a file + + echo "hello" | j serial pipe # Send to serial, continue monitoring + + cat commands.txt | j serial pipe -o serial.log # Send commands, log output + """ + if append and not output: + raise click.UsageError("--append requires --output") + + if input_flag and no_input: + raise click.UsageError("Cannot use both --input and --no-input") + + # Auto-detect stdin: if it's not a TTY (i.e., piped or redirected), enable input + stdin_is_piped = not sys.stdin.isatty() + + # Determine if input should be enabled + if no_input: + input_enabled = False + elif input_flag: + input_enabled = True + else: + input_enabled = stdin_is_piped + + # Show appropriate status message + if input_enabled and stdin_is_piped and not input_flag: + mode_desc = "auto-detected piped stdin" + elif input_enabled and input_flag: + mode_desc = "forced input mode" + elif input_enabled: + mode_desc = "input enabled" + else: + mode_desc = "read-only" + + if not output and not input_enabled: + click.echo(f"Reading from serial port ({mode_desc})... (Ctrl+C to exit)", err=True) + elif not output and input_enabled: + msg = f"Bidirectional mode ({mode_desc}): stdin→serial, serial→stdout (Ctrl+C to exit)" + click.echo(msg, err=True) + elif output and not input_enabled: + click.echo(f"Logging serial output to {output} ({mode_desc})... (Ctrl+C to exit)", err=True) + else: + msg = f"Bidirectional mode ({mode_desc}) with logging to {output}... (Ctrl+C to exit)" + click.echo(msg, err=True) + + try: + self.portal.call(self._pipe_serial, output, input_enabled, append) + except KeyboardInterrupt: + click.echo("\nStopped.", err=True) + return base