From 87221e1c849d0646e142364643949072ac2602cd Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Wed, 14 Jan 2026 17:47:36 +0100 Subject: [PATCH 1/9] Add nvdemux support to serial --- .../jumpstarter-driver-pyserial/README.md | 80 +++++ .../nvdemux/__init__.py | 0 .../nvdemux/driver.py | 276 ++++++++++++++++++ .../nvdemux/driver_test.py | 276 ++++++++++++++++++ 4 files changed, 632 insertions(+) create mode 100644 packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/__init__.py create mode 100644 packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py create mode 100644 packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py diff --git a/packages/jumpstarter-driver-pyserial/README.md b/packages/jumpstarter-driver-pyserial/README.md index eeaf4e1a9..d0b7cbf4d 100644 --- a/packages/jumpstarter-driver-pyserial/README.md +++ b/packages/jumpstarter-driver-pyserial/README.md @@ -33,6 +33,86 @@ export: | check_present | Check if the serial port exists during exporter initialization, disable if you are connecting to a dynamically created port (i.e. USB from your DUT) | bool | no | True | | cps | Characters per second throttling limit. When set, data transmission will be throttled to simulate slow typing. Useful for devices that can't handle fast input | float | no | None | +## NVDemuxSerial Driver + +The `NVDemuxSerial` driver provides serial access to NVIDIA Tegra demultiplexed UART channels using the [nv_tcu_demuxer](https://docs.nvidia.com/jetson/archives/r38.2.1/DeveloperGuide/AT/JetsonLinuxDevelopmentTools/TegraCombinedUART.html) tool. It automatically handles device reconnection when the target device restarts. + +> **⚠️ Important**: This driver currently supports only a single demuxed port per exporter. + +### Configuration + +Example configuration: + +```yaml +export: + ccplex: + type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial + config: + demuxer_path: "/opt/nvidia/nv_tcu_demuxer" + # device defaults to auto-detect NVIDIA Tegra On-Platform Operator + # chip defaults to T264 (Thor), use T234 for Orin +``` + +### Config parameters + +| Parameter | Description | Type | Required | Default | +| -------------- | ----------------------------------------------------------------------------------------------- | ----- | -------- | ------------------------------------------------------------------------- | +| demuxer_path | Path to the `nv_tcu_demuxer` binary | str | yes | | +| device | Device path or glob pattern for auto-detection | str | no | `/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01` | +| target | Target channel to extract from demuxer output | str | no | `CCPLEX: 0` | +| chip | Chip type for demuxer (`T234` for Orin, `T264` for Thor) | str | no | `T264` | +| baudrate | Baud rate for the serial connection | int | no | 115200 | +| cps | Characters per second throttling limit | float | no | None | +| timeout | Timeout in seconds waiting for demuxer to detect pts | float | no | 10.0 | +| poll_interval | Interval in seconds to poll for device reappearance after disconnect | float | no | 1.0 | + +### Device Auto-Detection + +The `device` parameter supports glob patterns for automatic device discovery: + +```yaml +# Auto-detect any NVIDIA Tegra On-Platform Operator device (default) +device: "/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01" + +# Specific serial number +device: "/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_ABC123-if01" + +# Direct device path (no glob) +device: "/dev/ttyUSB0" +``` + +### Auto-Recovery + +When the target device restarts (e.g., power cycle), the serial device disappears and the demuxer exits. The driver automatically: + +1. Detects the device disconnection +2. Polls for the device to reappear +3. Restarts the demuxer with the new device +4. Discovers the new pts path (which changes on each restart) + +Active connections will receive errors when the device disconnects. Clients should reconnect, and the driver will wait for the device to be available again. + +### Limitations + +**Single Port Per Exporter**: The driver can only manage one demuxed channel at a time. The following configuration will **NOT work**: + +```yaml +# ❌ This will NOT work - both drivers will conflict +export: + ccplex: + type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial + config: + demuxer_path: "/opt/nvidia/nv_tcu_demuxer" + target: "CCPLEX: 0" + bpmp: + type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial + config: + demuxer_path: "/opt/nvidia/nv_tcu_demuxer" + target: "BPMP: 1" # This will conflict with the CCPLEX instance +``` + +To access multiple channels, use separate exporters or modify the driver to support multiple channels. + ## CLI Commands The pyserial driver provides two CLI commands for interacting with serial ports: diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/__init__.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py new file mode 100644 index 000000000..fa77603cb --- /dev/null +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py @@ -0,0 +1,276 @@ +import glob +import os +import subprocess +import threading +import time +from contextlib import asynccontextmanager +from dataclasses import dataclass, field +from typing import Optional + +from anyio import sleep +from anyio._backends._asyncio import StreamReaderWrapper, StreamWriterWrapper +from serial_asyncio import open_serial_connection + +from ..driver import AsyncSerial +from jumpstarter.driver import Driver, exportstream + +# Default glob pattern for NVIDIA Tegra On-Platform Operator devices +NV_DEVICE_PATTERN = "/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01" + + +def _has_glob_chars(path: str) -> bool: + """Check if path contains glob wildcard characters.""" + return any(c in path for c in ("*", "?", "[")) + + +def _resolve_device(pattern: str, logger) -> str | None: + """Resolve a device path or glob pattern to an actual device path. + + Returns None if no device found. + """ + if _has_glob_chars(pattern): + matches = sorted(glob.glob(pattern)) + if not matches: + return None + if len(matches) > 1: + logger.warning("Multiple devices match pattern '%s': %s. Using first: %s", pattern, matches, matches[0]) + return matches[0] + else: + # Direct path - check if exists + if os.path.exists(pattern): + return pattern + return None + + +@dataclass(kw_only=True) +class NVDemuxSerial(Driver): + """Serial driver for NVIDIA TCU demultiplexed UART channels. + + This driver wraps the nv_tcu_demuxer tool to extract a specific demultiplexed + UART channel (like CCPLEX) from a multiplexed serial device. It automatically + handles device reconnection when the target device restarts. + + Args: + demuxer_path: Path to the nv_tcu_demuxer binary + device: Device path or glob pattern for auto-detection. + Default: /dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01 + target: Target channel to extract (e.g., "CCPLEX: 0") + chip: Chip type for demuxer (T234 for Orin, T264 for Thor) + baudrate: Baud rate for the serial connection + cps: Characters per second throttling (optional) + timeout: Timeout waiting for demuxer to detect pts + poll_interval: Interval to poll for device reappearance after disconnect + """ + + demuxer_path: str + device: str = field(default=NV_DEVICE_PATTERN) + target: str = field(default="CCPLEX: 0") + chip: str = field(default="T264") + baudrate: int = field(default=115200) + cps: Optional[float] = field(default=None) + timeout: float = field(default=10.0) + poll_interval: float = field(default=1.0) + + # Internal state (not init params) + _ready: threading.Event = field(init=False, default_factory=threading.Event) + _shutdown: threading.Event = field(init=False, default_factory=threading.Event) + _lock: threading.Lock = field(init=False, default_factory=threading.Lock) + _pts_path: Optional[str] = field(init=False, default=None) + _process: Optional[subprocess.Popen] = field(init=False, default=None) + _monitor_thread: Optional[threading.Thread] = field(init=False, default=None) + + def __post_init__(self): + if hasattr(super(), "__post_init__"): + super().__post_init__() + + # Start the monitor thread + self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True, name="NVDemuxSerial-monitor") + self._monitor_thread.start() + + # Wait for initial ready state (with timeout) + if not self._ready.wait(timeout=self.timeout): + self.logger.warning("Timeout waiting for demuxer to become ready during initialization") + + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_pyserial.client.PySerialClient" + + def _monitor_loop(self): + """Background thread that manages demuxer lifecycle and auto-recovery.""" + while not self._shutdown.is_set(): + try: + self._run_demuxer_cycle() + except Exception as e: + self.logger.error("Error in demuxer monitor loop: %s", e) + # Clear ready state on error + with self._lock: + self._pts_path = None + self._ready.clear() + # Wait before retrying + if self._shutdown.wait(timeout=self.poll_interval): + break + + def _wait_for_device(self) -> str | None: + """Wait for device to appear. Returns resolved device path or None if shutdown.""" + while not self._shutdown.is_set(): + resolved_device = _resolve_device(self.device, self.logger) + if resolved_device: + self.logger.info("Found device: %s", resolved_device) + return resolved_device + self.logger.debug("Device not found, polling... (pattern: %s)", self.device) + if self._shutdown.wait(timeout=self.poll_interval): + return None + return None + + def _start_demuxer_process(self, device: str) -> bool: + """Start the demuxer process. Returns True on success.""" + cmd = [self.demuxer_path, "-m", self.chip, "-d", device] + self.logger.info("Starting demuxer: %s", " ".join(cmd)) + + try: + self._process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, # Line buffered + ) + return True + except (FileNotFoundError, PermissionError) as e: + self.logger.error("Failed to start demuxer: %s", e) + return False + + def _parse_pts_from_line(self, line: str) -> str | None: + """Parse a line for pts path matching the target. Returns pts path or None.""" + if self.target not in line: + return None + + parts = line.split("\t") + if len(parts) < 2: + return None + + # Find the pts path (starts with /dev/) that's paired with our target + for i, part in enumerate(parts): + if self.target in part: + for j, other_part in enumerate(parts): + if i != j and other_part.startswith("/dev/"): + return other_part + return None + + def _read_demuxer_output(self): + """Read demuxer stdout and parse for pts path.""" + pts_found = False + try: + for line in iter(self._process.stdout.readline, ""): + if self._shutdown.is_set(): + break + + line = line.strip() + if not line: + continue + + self.logger.debug("Demuxer output: %s", line) + + if not pts_found: + pts_path = self._parse_pts_from_line(line) + if pts_path: + self.logger.info("Found pts path for target '%s': %s", self.target, pts_path) + with self._lock: + self._pts_path = pts_path + self._ready.set() + pts_found = True + except Exception as e: + self.logger.error("Error reading demuxer output: %s", e) + + def _cleanup_demuxer_process(self): + """Clean up the demuxer process and clear ready state.""" + if self._process: + try: + self._process.wait(timeout=1.0) + except subprocess.TimeoutExpired: + self._process.kill() + self._process.wait() + + exit_code = self._process.returncode + self.logger.info("Demuxer process exited with code %s", exit_code) + self._process = None + + with self._lock: + self._pts_path = None + self._ready.clear() + + def _run_demuxer_cycle(self): + """Run one cycle of: find device -> start demuxer -> monitor until exit.""" + resolved_device = self._wait_for_device() + if not resolved_device or self._shutdown.is_set(): + return + + if not self._start_demuxer_process(resolved_device): + self._shutdown.wait(timeout=self.poll_interval) + return + + self._read_demuxer_output() + self._cleanup_demuxer_process() + + if not self._shutdown.is_set(): + self.logger.info("Device disconnected, will poll for reconnection...") + + def close(self): + """Stop the demuxer and monitor thread.""" + self._shutdown.set() + + # Terminate demuxer process if running + if self._process: + self.logger.info("Terminating demuxer process...") + try: + self._process.terminate() + self._process.wait(timeout=5.0) + except subprocess.TimeoutExpired: + self._process.kill() + self._process.wait() + self._process = None + + # Wait for monitor thread to exit + if self._monitor_thread and self._monitor_thread.is_alive(): + self._monitor_thread.join(timeout=2.0) + + super().close() + + @exportstream + @asynccontextmanager + async def connect(self): + """Connect to the demultiplexed serial port. + + Waits for the demuxer to be ready (device connected and pts path discovered) + before opening the serial connection. + """ + # Wait for ready state + start_time = time.monotonic() + while not self._ready.is_set(): + elapsed = time.monotonic() - start_time + if elapsed >= self.timeout: + raise TimeoutError( + f"Timeout waiting for demuxer to become ready (device pattern: {self.device})" + ) + # Use a short sleep to allow checking ready state + await sleep(0.1) + + # Get the current pts path + with self._lock: + pts_path = self._pts_path + + if not pts_path: + raise RuntimeError("Demuxer ready but no pts path available") + + cps_info = f", cps: {self.cps}" if self.cps is not None else "" + self.logger.info("Connecting to %s, baudrate: %d%s", pts_path, self.baudrate, cps_info) + + reader, writer = await open_serial_connection(url=pts_path, baudrate=self.baudrate, limit=1) + writer.transport.set_write_buffer_limits(high=4096, low=0) + async with AsyncSerial( + reader=StreamReaderWrapper(reader), + writer=StreamWriterWrapper(writer), + cps=self.cps, + ) as stream: + yield stream + self.logger.info("Disconnected from %s", pts_path) diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py new file mode 100644 index 000000000..16fed12f0 --- /dev/null +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py @@ -0,0 +1,276 @@ +import os +import tempfile +import time +from unittest.mock import MagicMock, patch + +from .driver import NVDemuxSerial, _has_glob_chars, _resolve_device + + +def test_has_glob_chars(): + """Test glob character detection.""" + assert _has_glob_chars("/dev/ttyUSB*") is True + assert _has_glob_chars("/dev/serial/by-id/usb-NVIDIA_*-if01") is True + assert _has_glob_chars("/dev/tty[0-9]") is True + assert _has_glob_chars("/dev/ttyUSB?") is True + assert _has_glob_chars("/dev/ttyUSB0") is False + assert _has_glob_chars("/dev/serial/by-id/usb-NVIDIA_ABC123-if01") is False + + +def test_resolve_device_direct_path(): + """Test device resolution with direct path.""" + with tempfile.NamedTemporaryFile() as f: + logger = MagicMock() + result = _resolve_device(f.name, logger) + assert result == f.name + + +def test_resolve_device_nonexistent(): + """Test device resolution with non-existent path.""" + logger = MagicMock() + result = _resolve_device("/dev/nonexistent_device_12345", logger) + assert result is None + + +def test_resolve_device_glob_pattern(): + """Test device resolution with glob pattern.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Create some test files + for name in ["device_001", "device_002", "device_003"]: + open(os.path.join(tmpdir, name), "w").close() + + logger = MagicMock() + pattern = os.path.join(tmpdir, "device_*") + result = _resolve_device(pattern, logger) + + # Should return first match (sorted) + assert result == os.path.join(tmpdir, "device_001") + + +def test_resolve_device_glob_multiple_warns(): + """Test that multiple glob matches logs a warning.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Create multiple test files + for name in ["device_001", "device_002"]: + open(os.path.join(tmpdir, name), "w").close() + + logger = MagicMock() + pattern = os.path.join(tmpdir, "device_*") + _resolve_device(pattern, logger) + + # Should have logged a warning about multiple matches + logger.warning.assert_called_once() + assert "Multiple devices" in logger.warning.call_args[0][0] + + +def test_resolve_device_glob_no_match(): + """Test device resolution with glob pattern that matches nothing.""" + logger = MagicMock() + result = _resolve_device("/dev/nonexistent_pattern_*", logger) + assert result is None + + +class MockPopen: + """Mock subprocess.Popen for testing NVDemuxSerial.""" + + def __init__(self, stdout_lines, returncode=0, delay_per_line=0.01, block_after_lines=False): + self.stdout_lines = stdout_lines + self.returncode = returncode + self.delay_per_line = delay_per_line + self.block_after_lines = block_after_lines + self._line_index = 0 + self.stdout = self + self.stderr = MagicMock() + self._terminated = False + + def readline(self): + if self._terminated: + return "" + if self._line_index >= len(self.stdout_lines): + if self.block_after_lines: + # Block until terminated (simulates long-running process) + while not self._terminated: + time.sleep(0.01) + return "" + time.sleep(self.delay_per_line) + line = self.stdout_lines[self._line_index] + self._line_index += 1 + return line + "\n" + + def wait(self, timeout=None): + return self.returncode + + def terminate(self): + self._terminated = True + + def kill(self): + self._terminated = True + + +def test_nvdemux_parse_pts_from_line(): + """Test pts path parsing from demuxer output lines.""" + with tempfile.NamedTemporaryFile() as device_file: + # Create a mock demuxer that outputs pts info + stdout_lines = [ + "Starting demuxer...", + "/dev/pts/5\tCCPLEX: 0", + "/dev/pts/6\tBPMP: 1", + ] + + with patch("jumpstarter_driver_pyserial.nvdemux.driver.subprocess.Popen") as mock_popen: + mock_popen.return_value = MockPopen(stdout_lines) + + driver = NVDemuxSerial( + demuxer_path="/usr/bin/fake_demuxer", + device=device_file.name, + target="CCPLEX: 0", + timeout=2.0, + ) + + try: + # Wait for pts to be discovered + assert driver._ready.wait(timeout=2.0), "Ready event not set" + assert driver._pts_path == "/dev/pts/5" + finally: + driver.close() + + +def test_nvdemux_different_target(): + """Test selecting a different target channel.""" + with tempfile.NamedTemporaryFile() as device_file: + stdout_lines = [ + "/dev/pts/5\tCCPLEX: 0", + "/dev/pts/6\tBPMP: 1", + "/dev/pts/7\tSCE: 2", + ] + + with patch("jumpstarter_driver_pyserial.nvdemux.driver.subprocess.Popen") as mock_popen: + mock_popen.return_value = MockPopen(stdout_lines) + + driver = NVDemuxSerial( + demuxer_path="/usr/bin/fake_demuxer", + device=device_file.name, + target="BPMP: 1", + timeout=2.0, + ) + + try: + assert driver._ready.wait(timeout=2.0), "Ready event not set" + assert driver._pts_path == "/dev/pts/6" + finally: + driver.close() + + +def test_nvdemux_timeout_no_target(): + """Test timeout when target is never found.""" + with tempfile.NamedTemporaryFile() as device_file: + # Output that doesn't contain our target, and block after to prevent restart loops + stdout_lines = [ + "/dev/pts/5\tOTHER: 0", + "/dev/pts/6\tANOTHER: 1", + ] + + with patch("jumpstarter_driver_pyserial.nvdemux.driver.subprocess.Popen") as mock_popen: + # Use block_after_lines=True to prevent the monitor thread from looping + mock_proc = MockPopen(stdout_lines, block_after_lines=True) + mock_popen.return_value = mock_proc + + driver = NVDemuxSerial( + demuxer_path="/usr/bin/fake_demuxer", + device=device_file.name, + target="CCPLEX: 0", + timeout=0.5, + ) + + try: + # Should timeout since target is never found + assert not driver._ready.is_set() + assert driver._pts_path is None + finally: + driver.close() + + +def test_nvdemux_demuxer_args(): + """Test that demuxer is called with correct arguments.""" + with tempfile.NamedTemporaryFile() as device_file: + stdout_lines = ["/dev/pts/5\tCCPLEX: 0"] + + with patch("jumpstarter_driver_pyserial.nvdemux.driver.subprocess.Popen") as mock_popen: + mock_popen.return_value = MockPopen(stdout_lines) + + driver = NVDemuxSerial( + demuxer_path="/opt/nvidia/demuxer", + device=device_file.name, + chip="T234", + target="CCPLEX: 0", + timeout=2.0, + ) + + try: + driver._ready.wait(timeout=2.0) + + # Check that Popen was called with correct args + mock_popen.assert_called() + call_args = mock_popen.call_args + cmd = call_args[0][0] + + assert cmd[0] == "/opt/nvidia/demuxer" + assert "-m" in cmd + assert cmd[cmd.index("-m") + 1] == "T234" + assert "-d" in cmd + assert cmd[cmd.index("-d") + 1] == device_file.name + finally: + driver.close() + + +def test_nvdemux_default_values(): + """Test default parameter values.""" + with tempfile.NamedTemporaryFile() as device_file: + stdout_lines = ["/dev/pts/5\tCCPLEX: 0"] + + with patch("jumpstarter_driver_pyserial.nvdemux.driver.subprocess.Popen") as mock_popen: + mock_popen.return_value = MockPopen(stdout_lines) + + driver = NVDemuxSerial( + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + timeout=2.0, + ) + + try: + # Check defaults + assert driver.chip == "T264" + assert driver.target == "CCPLEX: 0" + assert driver.baudrate == 115200 + assert driver.poll_interval == 1.0 + finally: + driver.close() + + +def test_nvdemux_close_terminates_process(): + """Test that close() terminates the demuxer process.""" + with tempfile.NamedTemporaryFile() as device_file: + # Long-running output simulation + stdout_lines = ["/dev/pts/5\tCCPLEX: 0"] + ["keep alive"] * 100 + + with patch("jumpstarter_driver_pyserial.nvdemux.driver.subprocess.Popen") as mock_popen: + mock_proc = MockPopen(stdout_lines, delay_per_line=0.1) + mock_popen.return_value = mock_proc + + driver = NVDemuxSerial( + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + timeout=2.0, + ) + + # Wait for ready + driver._ready.wait(timeout=2.0) + + # Close should terminate the process + driver.close() + + assert mock_proc._terminated + + +def test_nvdemux_client_class(): + """Test that NVDemuxSerial uses PySerialClient.""" + assert NVDemuxSerial.client() == "jumpstarter_driver_pyserial.client.PySerialClient" From 9a424014109a65993331ec32a3e6058ae921a374 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Wed, 14 Jan 2026 17:58:58 +0100 Subject: [PATCH 2/9] nvdemux: support multiple instances --- .../jumpstarter-driver-pyserial/README.md | 58 ++- .../nvdemux/driver.py | 217 ++------ .../nvdemux/driver_test.py | 300 ++++------- .../nvdemux/manager.py | 395 ++++++++++++++ .../nvdemux/manager_test.py | 484 ++++++++++++++++++ 5 files changed, 1063 insertions(+), 391 deletions(-) create mode 100644 packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py create mode 100644 packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py diff --git a/packages/jumpstarter-driver-pyserial/README.md b/packages/jumpstarter-driver-pyserial/README.md index d0b7cbf4d..9d3ee0152 100644 --- a/packages/jumpstarter-driver-pyserial/README.md +++ b/packages/jumpstarter-driver-pyserial/README.md @@ -37,11 +37,13 @@ export: The `NVDemuxSerial` driver provides serial access to NVIDIA Tegra demultiplexed UART channels using the [nv_tcu_demuxer](https://docs.nvidia.com/jetson/archives/r38.2.1/DeveloperGuide/AT/JetsonLinuxDevelopmentTools/TegraCombinedUART.html) tool. It automatically handles device reconnection when the target device restarts. -> **⚠️ Important**: This driver currently supports only a single demuxed port per exporter. +### Multi-Instance Support + +Multiple driver instances can share a single demuxer process by specifying different target channels. This allows simultaneous access to multiple UART channels (CCPLEX, BPMP, SCE, etc.) from the same physical device. ### Configuration -Example configuration: +#### Single channel example: ```yaml export: @@ -53,6 +55,32 @@ export: # chip defaults to T264 (Thor), use T234 for Orin ``` +#### Multiple channels example: + +```yaml +export: + ccplex: + type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial + config: + demuxer_path: "/opt/nvidia/nv_tcu_demuxer" + target: "CCPLEX: 0" + chip: "T264" + + bpmp: + type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial + config: + demuxer_path: "/opt/nvidia/nv_tcu_demuxer" + target: "BPMP: 1" + chip: "T264" + + sce: + type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial + config: + demuxer_path: "/opt/nvidia/nv_tcu_demuxer" + target: "SCE: 2" + chip: "T264" +``` + ### Config parameters | Parameter | Description | Type | Required | Default | @@ -92,26 +120,18 @@ When the target device restarts (e.g., power cycle), the serial device disappear Active connections will receive errors when the device disconnects. Clients should reconnect, and the driver will wait for the device to be available again. -### Limitations +### Configuration Validation / Limitations -**Single Port Per Exporter**: The driver can only manage one demuxed channel at a time. The following configuration will **NOT work**: +When using multiple driver instances, all instances must have compatible configurations: + +- **demuxer_path**: Must be identical across all instances +- **device**: Must be identical across all instances +- **chip**: Must be identical across all instances +- **target**: Must be unique for each instance (no duplicates allowed) + +If these requirements are not met, the driver will raise a `ValueError` during initialization. -```yaml -# ❌ This will NOT work - both drivers will conflict -export: - ccplex: - type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial - config: - demuxer_path: "/opt/nvidia/nv_tcu_demuxer" - target: "CCPLEX: 0" - bpmp: - type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial - config: - demuxer_path: "/opt/nvidia/nv_tcu_demuxer" - target: "BPMP: 1" # This will conflict with the CCPLEX instance -``` -To access multiple channels, use separate exporters or modify the driver to support multiple channels. ## CLI Commands diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py index fa77603cb..f8570e1e0 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py @@ -1,6 +1,3 @@ -import glob -import os -import subprocess import threading import time from contextlib import asynccontextmanager @@ -12,54 +9,35 @@ from serial_asyncio import open_serial_connection from ..driver import AsyncSerial +from .manager import DemuxerManager from jumpstarter.driver import Driver, exportstream # Default glob pattern for NVIDIA Tegra On-Platform Operator devices NV_DEVICE_PATTERN = "/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01" -def _has_glob_chars(path: str) -> bool: - """Check if path contains glob wildcard characters.""" - return any(c in path for c in ("*", "?", "[")) - - -def _resolve_device(pattern: str, logger) -> str | None: - """Resolve a device path or glob pattern to an actual device path. - - Returns None if no device found. - """ - if _has_glob_chars(pattern): - matches = sorted(glob.glob(pattern)) - if not matches: - return None - if len(matches) > 1: - logger.warning("Multiple devices match pattern '%s': %s. Using first: %s", pattern, matches, matches[0]) - return matches[0] - else: - # Direct path - check if exists - if os.path.exists(pattern): - return pattern - return None - - @dataclass(kw_only=True) class NVDemuxSerial(Driver): """Serial driver for NVIDIA TCU demultiplexed UART channels. This driver wraps the nv_tcu_demuxer tool to extract a specific demultiplexed - UART channel (like CCPLEX) from a multiplexed serial device. It automatically - handles device reconnection when the target device restarts. + UART channel (like CCPLEX) from a multiplexed serial device. Multiple driver + instances can share the same demuxer process by specifying different targets. Args: demuxer_path: Path to the nv_tcu_demuxer binary device: Device path or glob pattern for auto-detection. Default: /dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01 - target: Target channel to extract (e.g., "CCPLEX: 0") + target: Target channel to extract (e.g., "CCPLEX: 0", "BPMP: 1") chip: Chip type for demuxer (T234 for Orin, T264 for Thor) baudrate: Baud rate for the serial connection cps: Characters per second throttling (optional) timeout: Timeout waiting for demuxer to detect pts poll_interval: Interval to poll for device reappearance after disconnect + + Note: + Multiple instances can be created with different targets. All instances + must use the same demuxer_path, device, and chip configuration. """ demuxer_path: str @@ -73,19 +51,28 @@ class NVDemuxSerial(Driver): # Internal state (not init params) _ready: threading.Event = field(init=False, default_factory=threading.Event) - _shutdown: threading.Event = field(init=False, default_factory=threading.Event) - _lock: threading.Lock = field(init=False, default_factory=threading.Lock) - _pts_path: Optional[str] = field(init=False, default=None) - _process: Optional[subprocess.Popen] = field(init=False, default=None) - _monitor_thread: Optional[threading.Thread] = field(init=False, default=None) + _registered: bool = field(init=False, default=False) def __post_init__(self): if hasattr(super(), "__post_init__"): super().__post_init__() - # Start the monitor thread - self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True, name="NVDemuxSerial-monitor") - self._monitor_thread.start() + # Register with the DemuxerManager + manager = DemuxerManager.get_instance() + try: + manager.register_driver( + driver_id=str(self.uuid), + demuxer_path=self.demuxer_path, + device=self.device, + chip=self.chip, + target=self.target, + callback=self._on_target_ready, + poll_interval=self.poll_interval, + ) + self._registered = True + except ValueError as e: + self.logger.error("Failed to register with DemuxerManager: %s", e) + raise # Wait for initial ready state (with timeout) if not self._ready.wait(timeout=self.timeout): @@ -95,144 +82,22 @@ def __post_init__(self): def client(cls) -> str: return "jumpstarter_driver_pyserial.client.PySerialClient" - def _monitor_loop(self): - """Background thread that manages demuxer lifecycle and auto-recovery.""" - while not self._shutdown.is_set(): - try: - self._run_demuxer_cycle() - except Exception as e: - self.logger.error("Error in demuxer monitor loop: %s", e) - # Clear ready state on error - with self._lock: - self._pts_path = None - self._ready.clear() - # Wait before retrying - if self._shutdown.wait(timeout=self.poll_interval): - break - - def _wait_for_device(self) -> str | None: - """Wait for device to appear. Returns resolved device path or None if shutdown.""" - while not self._shutdown.is_set(): - resolved_device = _resolve_device(self.device, self.logger) - if resolved_device: - self.logger.info("Found device: %s", resolved_device) - return resolved_device - self.logger.debug("Device not found, polling... (pattern: %s)", self.device) - if self._shutdown.wait(timeout=self.poll_interval): - return None - return None - - def _start_demuxer_process(self, device: str) -> bool: - """Start the demuxer process. Returns True on success.""" - cmd = [self.demuxer_path, "-m", self.chip, "-d", device] - self.logger.info("Starting demuxer: %s", " ".join(cmd)) - - try: - self._process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - bufsize=1, # Line buffered - ) - return True - except (FileNotFoundError, PermissionError) as e: - self.logger.error("Failed to start demuxer: %s", e) - return False - - def _parse_pts_from_line(self, line: str) -> str | None: - """Parse a line for pts path matching the target. Returns pts path or None.""" - if self.target not in line: - return None - - parts = line.split("\t") - if len(parts) < 2: - return None + def _on_target_ready(self, target: str, pts_path: str): + """Callback invoked by DemuxerManager when target becomes ready. - # Find the pts path (starts with /dev/) that's paired with our target - for i, part in enumerate(parts): - if self.target in part: - for j, other_part in enumerate(parts): - if i != j and other_part.startswith("/dev/"): - return other_part - return None - - def _read_demuxer_output(self): - """Read demuxer stdout and parse for pts path.""" - pts_found = False - try: - for line in iter(self._process.stdout.readline, ""): - if self._shutdown.is_set(): - break - - line = line.strip() - if not line: - continue - - self.logger.debug("Demuxer output: %s", line) - - if not pts_found: - pts_path = self._parse_pts_from_line(line) - if pts_path: - self.logger.info("Found pts path for target '%s': %s", self.target, pts_path) - with self._lock: - self._pts_path = pts_path - self._ready.set() - pts_found = True - except Exception as e: - self.logger.error("Error reading demuxer output: %s", e) - - def _cleanup_demuxer_process(self): - """Clean up the demuxer process and clear ready state.""" - if self._process: - try: - self._process.wait(timeout=1.0) - except subprocess.TimeoutExpired: - self._process.kill() - self._process.wait() - - exit_code = self._process.returncode - self.logger.info("Demuxer process exited with code %s", exit_code) - self._process = None - - with self._lock: - self._pts_path = None - self._ready.clear() - - def _run_demuxer_cycle(self): - """Run one cycle of: find device -> start demuxer -> monitor until exit.""" - resolved_device = self._wait_for_device() - if not resolved_device or self._shutdown.is_set(): - return - - if not self._start_demuxer_process(resolved_device): - self._shutdown.wait(timeout=self.poll_interval) - return - - self._read_demuxer_output() - self._cleanup_demuxer_process() - - if not self._shutdown.is_set(): - self.logger.info("Device disconnected, will poll for reconnection...") + Args: + target: The target channel that became ready + pts_path: The pts path for this target + """ + self.logger.info("Target '%s' ready with pts path: %s", target, pts_path) + self._ready.set() def close(self): - """Stop the demuxer and monitor thread.""" - self._shutdown.set() - - # Terminate demuxer process if running - if self._process: - self.logger.info("Terminating demuxer process...") - try: - self._process.terminate() - self._process.wait(timeout=5.0) - except subprocess.TimeoutExpired: - self._process.kill() - self._process.wait() - self._process = None - - # Wait for monitor thread to exit - if self._monitor_thread and self._monitor_thread.is_alive(): - self._monitor_thread.join(timeout=2.0) + """Unregister from the DemuxerManager.""" + if self._registered: + manager = DemuxerManager.get_instance() + manager.unregister_driver(str(self.uuid)) + self._registered = False super().close() @@ -255,9 +120,9 @@ async def connect(self): # Use a short sleep to allow checking ready state await sleep(0.1) - # Get the current pts path - with self._lock: - pts_path = self._pts_path + # Get the current pts path from manager + manager = DemuxerManager.get_instance() + pts_path = manager.get_pts_path(str(self.uuid)) if not pts_path: raise RuntimeError("Demuxer ready but no pts path available") diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py index 16fed12f0..1ad5d68c7 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py @@ -1,223 +1,137 @@ -import os +"""Tests for NVDemuxSerial driver.""" + import tempfile import time from unittest.mock import MagicMock, patch -from .driver import NVDemuxSerial, _has_glob_chars, _resolve_device - - -def test_has_glob_chars(): - """Test glob character detection.""" - assert _has_glob_chars("/dev/ttyUSB*") is True - assert _has_glob_chars("/dev/serial/by-id/usb-NVIDIA_*-if01") is True - assert _has_glob_chars("/dev/tty[0-9]") is True - assert _has_glob_chars("/dev/ttyUSB?") is True - assert _has_glob_chars("/dev/ttyUSB0") is False - assert _has_glob_chars("/dev/serial/by-id/usb-NVIDIA_ABC123-if01") is False - - -def test_resolve_device_direct_path(): - """Test device resolution with direct path.""" - with tempfile.NamedTemporaryFile() as f: - logger = MagicMock() - result = _resolve_device(f.name, logger) - assert result == f.name - - -def test_resolve_device_nonexistent(): - """Test device resolution with non-existent path.""" - logger = MagicMock() - result = _resolve_device("/dev/nonexistent_device_12345", logger) - assert result is None - - -def test_resolve_device_glob_pattern(): - """Test device resolution with glob pattern.""" - with tempfile.TemporaryDirectory() as tmpdir: - # Create some test files - for name in ["device_001", "device_002", "device_003"]: - open(os.path.join(tmpdir, name), "w").close() - - logger = MagicMock() - pattern = os.path.join(tmpdir, "device_*") - result = _resolve_device(pattern, logger) - - # Should return first match (sorted) - assert result == os.path.join(tmpdir, "device_001") - - -def test_resolve_device_glob_multiple_warns(): - """Test that multiple glob matches logs a warning.""" - with tempfile.TemporaryDirectory() as tmpdir: - # Create multiple test files - for name in ["device_001", "device_002"]: - open(os.path.join(tmpdir, name), "w").close() - - logger = MagicMock() - pattern = os.path.join(tmpdir, "device_*") - _resolve_device(pattern, logger) - - # Should have logged a warning about multiple matches - logger.warning.assert_called_once() - assert "Multiple devices" in logger.warning.call_args[0][0] +from .driver import NVDemuxSerial -def test_resolve_device_glob_no_match(): - """Test device resolution with glob pattern that matches nothing.""" - logger = MagicMock() - result = _resolve_device("/dev/nonexistent_pattern_*", logger) - assert result is None - - -class MockPopen: - """Mock subprocess.Popen for testing NVDemuxSerial.""" - - def __init__(self, stdout_lines, returncode=0, delay_per_line=0.01, block_after_lines=False): - self.stdout_lines = stdout_lines - self.returncode = returncode - self.delay_per_line = delay_per_line - self.block_after_lines = block_after_lines - self._line_index = 0 - self.stdout = self - self.stderr = MagicMock() - self._terminated = False - - def readline(self): - if self._terminated: - return "" - if self._line_index >= len(self.stdout_lines): - if self.block_after_lines: - # Block until terminated (simulates long-running process) - while not self._terminated: - time.sleep(0.01) - return "" - time.sleep(self.delay_per_line) - line = self.stdout_lines[self._line_index] - self._line_index += 1 - return line + "\n" - - def wait(self, timeout=None): - return self.returncode - - def terminate(self): - self._terminated = True - - def kill(self): - self._terminated = True - - -def test_nvdemux_parse_pts_from_line(): - """Test pts path parsing from demuxer output lines.""" +def test_nvdemux_registration(): + """Test that driver registers with DemuxerManager on init.""" with tempfile.NamedTemporaryFile() as device_file: - # Create a mock demuxer that outputs pts info - stdout_lines = [ - "Starting demuxer...", - "/dev/pts/5\tCCPLEX: 0", - "/dev/pts/6\tBPMP: 1", - ] - - with patch("jumpstarter_driver_pyserial.nvdemux.driver.subprocess.Popen") as mock_popen: - mock_popen.return_value = MockPopen(stdout_lines) + with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: + mock_manager = MagicMock() + mock_manager_class.get_instance.return_value = mock_manager driver = NVDemuxSerial( - demuxer_path="/usr/bin/fake_demuxer", + demuxer_path="/usr/bin/demuxer", device=device_file.name, target="CCPLEX: 0", - timeout=2.0, + chip="T264", + timeout=0.1, ) try: - # Wait for pts to be discovered - assert driver._ready.wait(timeout=2.0), "Ready event not set" - assert driver._pts_path == "/dev/pts/5" + # Verify driver registered with manager + mock_manager.register_driver.assert_called_once() + call_kwargs = mock_manager.register_driver.call_args[1] + assert call_kwargs["driver_id"] == str(driver.uuid) + assert call_kwargs["demuxer_path"] == "/usr/bin/demuxer" + assert call_kwargs["device"] == device_file.name + assert call_kwargs["chip"] == "T264" + assert call_kwargs["target"] == "CCPLEX: 0" finally: driver.close() -def test_nvdemux_different_target(): - """Test selecting a different target channel.""" +def test_nvdemux_callback_sets_ready(): + """Test that callback from manager sets the ready event.""" with tempfile.NamedTemporaryFile() as device_file: - stdout_lines = [ - "/dev/pts/5\tCCPLEX: 0", - "/dev/pts/6\tBPMP: 1", - "/dev/pts/7\tSCE: 2", - ] - - with patch("jumpstarter_driver_pyserial.nvdemux.driver.subprocess.Popen") as mock_popen: - mock_popen.return_value = MockPopen(stdout_lines) + with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: + mock_manager = MagicMock() + mock_manager_class.get_instance.return_value = mock_manager driver = NVDemuxSerial( - demuxer_path="/usr/bin/fake_demuxer", + demuxer_path="/usr/bin/demuxer", device=device_file.name, - target="BPMP: 1", - timeout=2.0, + target="CCPLEX: 0", + timeout=0.1, ) try: - assert driver._ready.wait(timeout=2.0), "Ready event not set" - assert driver._pts_path == "/dev/pts/6" + # Get the callback that was registered + callback = mock_manager.register_driver.call_args[1]["callback"] + + # Initially not ready + assert not driver._ready.is_set() + + # Call the callback + callback("CCPLEX: 0", "/dev/pts/5") + + # Should now be ready + assert driver._ready.is_set() finally: driver.close() -def test_nvdemux_timeout_no_target(): - """Test timeout when target is never found.""" +def test_nvdemux_gets_pts_from_manager(): + """Test that connect() gets pts path from manager.""" with tempfile.NamedTemporaryFile() as device_file: - # Output that doesn't contain our target, and block after to prevent restart loops - stdout_lines = [ - "/dev/pts/5\tOTHER: 0", - "/dev/pts/6\tANOTHER: 1", - ] - - with patch("jumpstarter_driver_pyserial.nvdemux.driver.subprocess.Popen") as mock_popen: - # Use block_after_lines=True to prevent the monitor thread from looping - mock_proc = MockPopen(stdout_lines, block_after_lines=True) - mock_popen.return_value = mock_proc + with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: + mock_manager = MagicMock() + mock_manager_class.get_instance.return_value = mock_manager + mock_manager.get_pts_path.return_value = "/dev/pts/5" driver = NVDemuxSerial( - demuxer_path="/usr/bin/fake_demuxer", + demuxer_path="/usr/bin/demuxer", device=device_file.name, target="CCPLEX: 0", - timeout=0.5, + timeout=0.1, ) try: - # Should timeout since target is never found - assert not driver._ready.is_set() - assert driver._pts_path is None + # Trigger callback to set ready + callback = mock_manager.register_driver.call_args[1]["callback"] + callback("CCPLEX: 0", "/dev/pts/5") + + # Should call get_pts_path when checking pts availability + # (We can't test connect() easily without mocking serial, but we can test the logic) + pts_path = mock_manager.get_pts_path(str(driver.uuid)) + assert pts_path == "/dev/pts/5" finally: driver.close() -def test_nvdemux_demuxer_args(): - """Test that demuxer is called with correct arguments.""" +def test_nvdemux_unregisters_on_close(): + """Test that driver unregisters from manager on close.""" with tempfile.NamedTemporaryFile() as device_file: - stdout_lines = ["/dev/pts/5\tCCPLEX: 0"] + with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: + mock_manager = MagicMock() + mock_manager_class.get_instance.return_value = mock_manager - with patch("jumpstarter_driver_pyserial.nvdemux.driver.subprocess.Popen") as mock_popen: - mock_popen.return_value = MockPopen(stdout_lines) + driver = NVDemuxSerial( + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + target="CCPLEX: 0", + timeout=0.1, + ) + + driver_id = str(driver.uuid) + driver.close() + + # Verify driver unregistered + mock_manager.unregister_driver.assert_called_once_with(driver_id) + + +def test_nvdemux_timeout_no_callback(): + """Test timeout when callback is never invoked.""" + with tempfile.NamedTemporaryFile() as device_file: + with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: + mock_manager = MagicMock() + mock_manager_class.get_instance.return_value = mock_manager driver = NVDemuxSerial( - demuxer_path="/opt/nvidia/demuxer", + demuxer_path="/usr/bin/demuxer", device=device_file.name, - chip="T234", target="CCPLEX: 0", - timeout=2.0, + timeout=0.1, ) try: - driver._ready.wait(timeout=2.0) - - # Check that Popen was called with correct args - mock_popen.assert_called() - call_args = mock_popen.call_args - cmd = call_args[0][0] - - assert cmd[0] == "/opt/nvidia/demuxer" - assert "-m" in cmd - assert cmd[cmd.index("-m") + 1] == "T234" - assert "-d" in cmd - assert cmd[cmd.index("-d") + 1] == device_file.name + # Callback is never invoked, so ready should not be set + time.sleep(0.2) + assert not driver._ready.is_set() finally: driver.close() @@ -225,15 +139,14 @@ def test_nvdemux_demuxer_args(): def test_nvdemux_default_values(): """Test default parameter values.""" with tempfile.NamedTemporaryFile() as device_file: - stdout_lines = ["/dev/pts/5\tCCPLEX: 0"] - - with patch("jumpstarter_driver_pyserial.nvdemux.driver.subprocess.Popen") as mock_popen: - mock_popen.return_value = MockPopen(stdout_lines) + with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: + mock_manager = MagicMock() + mock_manager_class.get_instance.return_value = mock_manager driver = NVDemuxSerial( demuxer_path="/usr/bin/demuxer", device=device_file.name, - timeout=2.0, + timeout=0.1, ) try: @@ -246,29 +159,24 @@ def test_nvdemux_default_values(): driver.close() -def test_nvdemux_close_terminates_process(): - """Test that close() terminates the demuxer process.""" +def test_nvdemux_registration_error_propagates(): + """Test that registration errors are propagated.""" with tempfile.NamedTemporaryFile() as device_file: - # Long-running output simulation - stdout_lines = ["/dev/pts/5\tCCPLEX: 0"] + ["keep alive"] * 100 - - with patch("jumpstarter_driver_pyserial.nvdemux.driver.subprocess.Popen") as mock_popen: - mock_proc = MockPopen(stdout_lines, delay_per_line=0.1) - mock_popen.return_value = mock_proc - - driver = NVDemuxSerial( - demuxer_path="/usr/bin/demuxer", - device=device_file.name, - timeout=2.0, - ) - - # Wait for ready - driver._ready.wait(timeout=2.0) + with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: + mock_manager = MagicMock() + mock_manager_class.get_instance.return_value = mock_manager + mock_manager.register_driver.side_effect = ValueError("Config mismatch") - # Close should terminate the process - driver.close() - - assert mock_proc._terminated + try: + _driver = NVDemuxSerial( + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + target="CCPLEX: 0", + timeout=0.1, + ) + raise AssertionError("Should have raised ValueError") + except ValueError as e: + assert "Config mismatch" in str(e) def test_nvdemux_client_class(): diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py new file mode 100644 index 000000000..888b02c52 --- /dev/null +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py @@ -0,0 +1,395 @@ +"""Singleton manager for NVIDIA TCU demuxer process. + +Manages a single shared demuxer process that can be accessed by multiple +NVDemuxSerial driver instances. Handles process lifecycle, device reconnection, +and distributes pts paths to registered drivers. +""" + +import glob +import logging +import os +import subprocess +import threading +from dataclasses import dataclass +from typing import Callable, Optional + +logger = logging.getLogger(__name__) + + +def _has_glob_chars(path: str) -> bool: + """Check if path contains glob wildcard characters.""" + return any(c in path for c in ("*", "?", "[")) + + +def _resolve_device(pattern: str) -> str | None: + """Resolve a device path or glob pattern to an actual device path. + + Returns None if no device found. + """ + if _has_glob_chars(pattern): + matches = sorted(glob.glob(pattern)) + if not matches: + return None + if len(matches) > 1: + logger.warning("Multiple devices match pattern '%s': %s. Using first: %s", pattern, matches, matches[0]) + return matches[0] + else: + # Direct path - check if exists + if os.path.exists(pattern): + return pattern + return None + + +@dataclass +class DriverInfo: + """Information about a registered driver.""" + + driver_id: str + target: str + callback: Callable[[str, str], None] # (target, pts_path) -> None + + +class DemuxerManager: + """Singleton manager for the NVIDIA TCU demuxer process. + + Manages a single shared demuxer process and distributes pts paths to + multiple driver instances based on their target channels. + """ + + _instance: Optional["DemuxerManager"] = None + _instance_lock = threading.Lock() + + def __init__(self): + """Private constructor. Use get_instance() instead.""" + self._lock = threading.Lock() + self._drivers: dict[str, DriverInfo] = {} + self._pts_map: dict[str, str] = {} # target -> pts_path + self._ready_targets: set[str] = set() + self._process: Optional[subprocess.Popen] = None + self._monitor_thread: Optional[threading.Thread] = None + self._shutdown = threading.Event() + + # Process configuration (must be same for all drivers) + self._demuxer_path: Optional[str] = None + self._device: Optional[str] = None + self._chip: Optional[str] = None + self._poll_interval: float = 1.0 + + @classmethod + def get_instance(cls) -> "DemuxerManager": + """Get the singleton instance of DemuxerManager.""" + if cls._instance is None: + with cls._instance_lock: + if cls._instance is None: + cls._instance = cls() + return cls._instance + + @classmethod + def reset_instance(cls): + """Reset the singleton instance. Used for testing.""" + with cls._instance_lock: + if cls._instance is not None: + cls._instance._cleanup() + cls._instance = None + + def _validate_config(self, demuxer_path: str, device: str, chip: str, target: str): + """Validate driver configuration against existing drivers. + + Raises: + ValueError: If configuration doesn't match or target is duplicate + """ + if self._demuxer_path != demuxer_path: + raise ValueError(f"Demuxer path mismatch: existing={self._demuxer_path}, new={demuxer_path}") + if self._device != device: + raise ValueError(f"Device mismatch: existing={self._device}, new={device}") + if self._chip != chip: + raise ValueError(f"Chip mismatch: existing={self._chip}, new={chip}") + + # Check for duplicate target + for existing_driver in self._drivers.values(): + if existing_driver.target == target: + raise ValueError(f"Target '{target}' already registered by another driver") + + def _notify_if_ready(self, target: str, callback: Callable[[str, str], None]): + """Notify driver immediately if target is already ready.""" + if target in self._ready_targets: + pts_path = self._pts_map.get(target) + if pts_path: + try: + callback(target, pts_path) + except Exception as e: + logger.error("Error in driver callback: %s", e) + + def register_driver( + self, + driver_id: str, + demuxer_path: str, + device: str, + chip: str, + target: str, + callback: Callable[[str, str], None], + poll_interval: float = 1.0, + ) -> None: + """Register a driver instance with the manager. + + Args: + driver_id: Unique identifier for the driver + demuxer_path: Path to nv_tcu_demuxer binary + device: Device path or glob pattern + chip: Chip type (T234 or T264) + target: Target channel (e.g., "CCPLEX: 0") + callback: Function to call when target becomes ready + poll_interval: Polling interval for device reconnection + + Raises: + ValueError: If configuration doesn't match existing process + """ + with self._lock: + # Validate configuration matches existing process + if self._drivers: + self._validate_config(demuxer_path, device, chip, target) + else: + # First driver - set process configuration + self._demuxer_path = demuxer_path + self._device = device + self._chip = chip + self._poll_interval = poll_interval + + # Register the driver + driver_info = DriverInfo(driver_id=driver_id, target=target, callback=callback) + self._drivers[driver_id] = driver_info + + logger.info("Registered driver %s for target '%s'", driver_id, target) + + # If target is already ready, notify immediately + self._notify_if_ready(target, callback) + + # Start monitor thread if this is the first driver + if len(self._drivers) == 1: + self._start_monitor() + + def unregister_driver(self, driver_id: str) -> None: + """Unregister a driver instance. + + Args: + driver_id: Unique identifier for the driver + """ + with self._lock: + if driver_id in self._drivers: + target = self._drivers[driver_id].target + del self._drivers[driver_id] + logger.info("Unregistered driver %s (target: %s)", driver_id, target) + + # Stop monitor thread if this was the last driver + if not self._drivers: + self._stop_monitor() + + def get_pts_path(self, driver_id: str) -> str | None: + """Get the pts path for a registered driver. + + Args: + driver_id: Unique identifier for the driver + + Returns: + The pts path or None if not available + """ + with self._lock: + if driver_id not in self._drivers: + return None + target = self._drivers[driver_id].target + return self._pts_map.get(target) + + def is_ready(self, target: str) -> bool: + """Check if a target is ready. + + Args: + target: Target channel to check + + Returns: + True if the target is ready + """ + with self._lock: + return target in self._ready_targets + + def _start_monitor(self): + """Start the monitor thread.""" + self._shutdown.clear() + self._monitor_thread = threading.Thread( + target=self._monitor_loop, daemon=True, name="DemuxerManager-monitor" + ) + self._monitor_thread.start() + logger.info("Started demuxer monitor thread") + + def _stop_monitor(self): + """Stop the monitor thread.""" + self._shutdown.set() + + # Terminate process if running + if self._process: + logger.info("Terminating demuxer process...") + try: + self._process.terminate() + self._process.wait(timeout=5.0) + except subprocess.TimeoutExpired: + self._process.kill() + self._process.wait() + self._process = None + + # Wait for monitor thread to exit + if self._monitor_thread and self._monitor_thread.is_alive(): + self._monitor_thread.join(timeout=2.0) + self._monitor_thread = None + + logger.info("Stopped demuxer monitor thread") + + def _cleanup(self): + """Clean up resources.""" + self._stop_monitor() + self._drivers.clear() + self._pts_map.clear() + self._ready_targets.clear() + + def _monitor_loop(self): + """Background thread that manages demuxer lifecycle and auto-recovery.""" + while not self._shutdown.is_set(): + try: + self._run_demuxer_cycle() + except Exception as e: + logger.error("Error in demuxer monitor loop: %s", e) + # Clear ready state on error + with self._lock: + self._pts_map.clear() + self._ready_targets.clear() + # Wait before retrying + if self._shutdown.wait(timeout=self._poll_interval): + break + + def _run_demuxer_cycle(self): + """Run one cycle of: find device -> start demuxer -> monitor until exit.""" + # Wait for device to appear + resolved_device = self._wait_for_device() + if not resolved_device or self._shutdown.is_set(): + return + + # Start demuxer process + if not self._start_demuxer_process(resolved_device): + self._shutdown.wait(timeout=self._poll_interval) + return + + # Read and parse demuxer output + self._read_demuxer_output() + + # Cleanup process + self._cleanup_demuxer_process() + + if not self._shutdown.is_set(): + logger.info("Device disconnected, will poll for reconnection...") + + def _wait_for_device(self) -> str | None: + """Wait for device to appear. Returns resolved device path or None if shutdown.""" + while not self._shutdown.is_set(): + resolved_device = _resolve_device(self._device) + if resolved_device: + logger.info("Found device: %s", resolved_device) + return resolved_device + logger.debug("Device not found, polling... (pattern: %s)", self._device) + if self._shutdown.wait(timeout=self._poll_interval): + return None + return None + + def _start_demuxer_process(self, device: str) -> bool: + """Start the demuxer process. Returns True on success.""" + cmd = [self._demuxer_path, "-m", self._chip, "-d", device] + logger.info("Starting demuxer: %s", " ".join(cmd)) + + try: + self._process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, # Line buffered + ) + return True + except (FileNotFoundError, PermissionError) as e: + logger.error("Failed to start demuxer: %s", e) + return False + + def _parse_demuxer_line(self, line: str) -> tuple[str | None, str | None]: + """Parse a demuxer output line to extract pts path and target. + + Returns: + Tuple of (pts_path, target) or (None, None) if not found + """ + parts = line.split("\t") + if len(parts) < 2: + return None, None + + pts_path = None + target = None + + for part in parts: + if part.startswith("/dev/"): + pts_path = part + elif ":" in part: # Targets have format "NAME: N" + target = part + + return pts_path, target + + def _read_demuxer_output(self): + """Read demuxer stdout and parse all pts paths.""" + try: + for line in iter(self._process.stdout.readline, ""): + if self._shutdown.is_set(): + break + + line = line.strip() + if not line: + continue + + logger.debug("Demuxer output: %s", line) + + # Parse line format: "\t" + pts_path, target = self._parse_demuxer_line(line) + + if pts_path and target: + logger.info("Found pts path for target '%s': %s", target, pts_path) + + # Update state immediately for this target + with self._lock: + self._pts_map[target] = pts_path + self._ready_targets.add(target) + + # Notify driver for this specific target + for driver_id, driver_info in self._drivers.items(): + if driver_info.target == target: + try: + driver_info.callback(target, pts_path) + except Exception as e: + logger.error("Error in driver %s callback: %s", driver_id, e) + break # Only one driver per target + + except Exception as e: + logger.error("Error reading demuxer output: %s", e) + + # Clear state when process ends + with self._lock: + self._pts_map.clear() + self._ready_targets.clear() + + def _cleanup_demuxer_process(self): + """Clean up the demuxer process and clear ready state.""" + if self._process: + try: + self._process.wait(timeout=1.0) + except subprocess.TimeoutExpired: + self._process.kill() + self._process.wait() + + exit_code = self._process.returncode + logger.info("Demuxer process exited with code %s", exit_code) + self._process = None + + with self._lock: + self._pts_map.clear() + self._ready_targets.clear() diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py new file mode 100644 index 000000000..941aae71a --- /dev/null +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py @@ -0,0 +1,484 @@ +"""Tests for DemuxerManager.""" + +import os +import tempfile +import time +from unittest.mock import MagicMock, patch + +from .manager import DemuxerManager, _has_glob_chars, _resolve_device + + +def test_has_glob_chars(): + """Test glob character detection.""" + assert _has_glob_chars("/dev/ttyUSB*") is True + assert _has_glob_chars("/dev/serial/by-id/usb-NVIDIA_*-if01") is True + assert _has_glob_chars("/dev/tty[0-9]") is True + assert _has_glob_chars("/dev/ttyUSB?") is True + assert _has_glob_chars("/dev/ttyUSB0") is False + assert _has_glob_chars("/dev/serial/by-id/usb-NVIDIA_ABC123-if01") is False + + +def test_resolve_device_direct_path(): + """Test device resolution with direct path.""" + with tempfile.NamedTemporaryFile() as f: + result = _resolve_device(f.name) + assert result == f.name + + +def test_resolve_device_nonexistent(): + """Test device resolution with non-existent path.""" + result = _resolve_device("/dev/nonexistent_device_12345") + assert result is None + + +def test_resolve_device_glob_pattern(): + """Test device resolution with glob pattern.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Create some test files + for name in ["device_001", "device_002", "device_003"]: + open(os.path.join(tmpdir, name), "w").close() + + pattern = os.path.join(tmpdir, "device_*") + result = _resolve_device(pattern) + + # Should return first match (sorted) + assert result == os.path.join(tmpdir, "device_001") + + +def test_resolve_device_glob_no_match(): + """Test device resolution with glob pattern that matches nothing.""" + result = _resolve_device("/dev/nonexistent_pattern_*") + assert result is None + + +class MockPopen: + """Mock subprocess.Popen for testing.""" + + def __init__(self, stdout_lines, returncode=0, delay_per_line=0.01, block_after_lines=False): + self.stdout_lines = stdout_lines + self.returncode = returncode + self.delay_per_line = delay_per_line + self.block_after_lines = block_after_lines + self._line_index = 0 + self.stdout = self + self.stderr = MagicMock() + self._terminated = False + + def readline(self): + if self._terminated: + return "" + if self._line_index >= len(self.stdout_lines): + if self.block_after_lines: + # Block until terminated (simulates long-running process) + while not self._terminated: + time.sleep(0.01) + return "" + time.sleep(self.delay_per_line) + line = self.stdout_lines[self._line_index] + self._line_index += 1 + return line + "\n" + + def wait(self, timeout=None): + return self.returncode + + def terminate(self): + self._terminated = True + + def kill(self): + self._terminated = True + + +def test_manager_singleton(): + """Test that DemuxerManager is a singleton.""" + # Reset singleton for testing + DemuxerManager.reset_instance() + + manager1 = DemuxerManager.get_instance() + manager2 = DemuxerManager.get_instance() + + assert manager1 is manager2 + + # Cleanup + DemuxerManager.reset_instance() + + +def test_single_driver_registration(): + """Test registering a single driver.""" + DemuxerManager.reset_instance() + + with tempfile.NamedTemporaryFile() as device_file: + stdout_lines = [ + "/dev/pts/5\tCCPLEX: 0", + "/dev/pts/6\tBPMP: 1", + ] + + with patch("jumpstarter_driver_pyserial.nvdemux.manager.subprocess.Popen") as mock_popen: + mock_popen.return_value = MockPopen(stdout_lines, block_after_lines=True) + + manager = DemuxerManager.get_instance() + callback = MagicMock() + + manager.register_driver( + driver_id="driver1", + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + chip="T264", + target="CCPLEX: 0", + callback=callback, + ) + + # Wait for callback + time.sleep(0.5) + + # Verify callback was called + callback.assert_called_once_with("CCPLEX: 0", "/dev/pts/5") + + # Verify pts path is available + pts_path = manager.get_pts_path("driver1") + assert pts_path == "/dev/pts/5" + + # Cleanup + manager.unregister_driver("driver1") + DemuxerManager.reset_instance() + + +def test_multiple_drivers_single_process(): + """Test that multiple drivers share a single demuxer process.""" + DemuxerManager.reset_instance() + + with tempfile.NamedTemporaryFile() as device_file: + stdout_lines = [ + "/dev/pts/5\tCCPLEX: 0", + "/dev/pts/6\tBPMP: 1", + "/dev/pts/7\tSCE: 2", + ] + + with patch("jumpstarter_driver_pyserial.nvdemux.manager.subprocess.Popen") as mock_popen: + mock_popen.return_value = MockPopen(stdout_lines, block_after_lines=True) + + manager = DemuxerManager.get_instance() + callback1 = MagicMock() + callback2 = MagicMock() + callback3 = MagicMock() + + # Register three drivers + manager.register_driver( + driver_id="driver1", + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + chip="T264", + target="CCPLEX: 0", + callback=callback1, + ) + + manager.register_driver( + driver_id="driver2", + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + chip="T264", + target="BPMP: 1", + callback=callback2, + ) + + manager.register_driver( + driver_id="driver3", + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + chip="T264", + target="SCE: 2", + callback=callback3, + ) + + # Wait for callbacks + time.sleep(0.5) + + # Verify all callbacks were called + callback1.assert_called_once_with("CCPLEX: 0", "/dev/pts/5") + callback2.assert_called_once_with("BPMP: 1", "/dev/pts/6") + callback3.assert_called_once_with("SCE: 2", "/dev/pts/7") + + # Verify process was only started once + assert mock_popen.call_count == 1 + + # Verify all pts paths are available + assert manager.get_pts_path("driver1") == "/dev/pts/5" + assert manager.get_pts_path("driver2") == "/dev/pts/6" + assert manager.get_pts_path("driver3") == "/dev/pts/7" + + # Cleanup + manager.unregister_driver("driver1") + manager.unregister_driver("driver2") + manager.unregister_driver("driver3") + DemuxerManager.reset_instance() + + +def test_config_validation_demuxer_path_mismatch(): + """Test that mismatched demuxer_path raises error.""" + DemuxerManager.reset_instance() + + with tempfile.NamedTemporaryFile() as device_file: + stdout_lines = ["/dev/pts/5\tCCPLEX: 0"] + + with patch("jumpstarter_driver_pyserial.nvdemux.manager.subprocess.Popen") as mock_popen: + mock_popen.return_value = MockPopen(stdout_lines, block_after_lines=True) + + manager = DemuxerManager.get_instance() + + # Register first driver + manager.register_driver( + driver_id="driver1", + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + chip="T264", + target="CCPLEX: 0", + callback=MagicMock(), + ) + + # Try to register second driver with different demuxer_path + try: + manager.register_driver( + driver_id="driver2", + demuxer_path="/opt/nvidia/demuxer", # Different path + device=device_file.name, + chip="T264", + target="BPMP: 1", + callback=MagicMock(), + ) + raise AssertionError("Should have raised ValueError") + except ValueError as e: + assert "Demuxer path mismatch" in str(e) + + # Cleanup + manager.unregister_driver("driver1") + DemuxerManager.reset_instance() + + +def test_config_validation_device_mismatch(): + """Test that mismatched device raises error.""" + DemuxerManager.reset_instance() + + with tempfile.NamedTemporaryFile() as device_file1, tempfile.NamedTemporaryFile() as device_file2: + stdout_lines = ["/dev/pts/5\tCCPLEX: 0"] + + with patch("jumpstarter_driver_pyserial.nvdemux.manager.subprocess.Popen") as mock_popen: + mock_popen.return_value = MockPopen(stdout_lines, block_after_lines=True) + + manager = DemuxerManager.get_instance() + + # Register first driver + manager.register_driver( + driver_id="driver1", + demuxer_path="/usr/bin/demuxer", + device=device_file1.name, + chip="T264", + target="CCPLEX: 0", + callback=MagicMock(), + ) + + # Try to register second driver with different device + try: + manager.register_driver( + driver_id="driver2", + demuxer_path="/usr/bin/demuxer", + device=device_file2.name, # Different device + chip="T264", + target="BPMP: 1", + callback=MagicMock(), + ) + raise AssertionError("Should have raised ValueError") + except ValueError as e: + assert "Device mismatch" in str(e) + + # Cleanup + manager.unregister_driver("driver1") + DemuxerManager.reset_instance() + + +def test_config_validation_chip_mismatch(): + """Test that mismatched chip raises error.""" + DemuxerManager.reset_instance() + + with tempfile.NamedTemporaryFile() as device_file: + stdout_lines = ["/dev/pts/5\tCCPLEX: 0"] + + with patch("jumpstarter_driver_pyserial.nvdemux.manager.subprocess.Popen") as mock_popen: + mock_popen.return_value = MockPopen(stdout_lines, block_after_lines=True) + + manager = DemuxerManager.get_instance() + + # Register first driver + manager.register_driver( + driver_id="driver1", + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + chip="T264", + target="CCPLEX: 0", + callback=MagicMock(), + ) + + # Try to register second driver with different chip + try: + manager.register_driver( + driver_id="driver2", + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + chip="T234", # Different chip + target="BPMP: 1", + callback=MagicMock(), + ) + raise AssertionError("Should have raised ValueError") + except ValueError as e: + assert "Chip mismatch" in str(e) + + # Cleanup + manager.unregister_driver("driver1") + DemuxerManager.reset_instance() + + +def test_duplicate_target_rejected(): + """Test that duplicate target registration is rejected.""" + DemuxerManager.reset_instance() + + with tempfile.NamedTemporaryFile() as device_file: + stdout_lines = ["/dev/pts/5\tCCPLEX: 0"] + + with patch("jumpstarter_driver_pyserial.nvdemux.manager.subprocess.Popen") as mock_popen: + mock_popen.return_value = MockPopen(stdout_lines, block_after_lines=True) + + manager = DemuxerManager.get_instance() + + # Register first driver + manager.register_driver( + driver_id="driver1", + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + chip="T264", + target="CCPLEX: 0", + callback=MagicMock(), + ) + + # Try to register second driver with same target + try: + manager.register_driver( + driver_id="driver2", + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + chip="T264", + target="CCPLEX: 0", # Same target + callback=MagicMock(), + ) + raise AssertionError("Should have raised ValueError") + except ValueError as e: + assert "already registered" in str(e) + + # Cleanup + manager.unregister_driver("driver1") + DemuxerManager.reset_instance() + + +def test_reference_counting(): + """Test that process starts/stops based on driver registration.""" + DemuxerManager.reset_instance() + + with tempfile.NamedTemporaryFile() as device_file: + stdout_lines = [ + "/dev/pts/5\tCCPLEX: 0", + "/dev/pts/6\tBPMP: 1", + ] + + with patch("jumpstarter_driver_pyserial.nvdemux.manager.subprocess.Popen") as mock_popen: + mock_proc = MockPopen(stdout_lines, block_after_lines=True) + mock_popen.return_value = mock_proc + + manager = DemuxerManager.get_instance() + + # Initially no process should be running + assert manager._process is None + + # Register first driver - process should start + manager.register_driver( + driver_id="driver1", + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + chip="T264", + target="CCPLEX: 0", + callback=MagicMock(), + ) + + time.sleep(0.2) + assert mock_popen.call_count == 1 + + # Register second driver - process should NOT restart + manager.register_driver( + driver_id="driver2", + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + chip="T264", + target="BPMP: 1", + callback=MagicMock(), + ) + + time.sleep(0.2) + assert mock_popen.call_count == 1 # Still just one call + + # Unregister first driver - process should continue + manager.unregister_driver("driver1") + time.sleep(0.2) + assert not mock_proc._terminated + + # Unregister second driver - process should stop + manager.unregister_driver("driver2") + time.sleep(0.2) + assert mock_proc._terminated + + # Cleanup + DemuxerManager.reset_instance() + + +def test_immediate_notification_for_ready_target(): + """Test that drivers are notified immediately if target is already ready.""" + DemuxerManager.reset_instance() + + with tempfile.NamedTemporaryFile() as device_file: + stdout_lines = [ + "/dev/pts/5\tCCPLEX: 0", + "/dev/pts/6\tBPMP: 1", + ] + + with patch("jumpstarter_driver_pyserial.nvdemux.manager.subprocess.Popen") as mock_popen: + mock_popen.return_value = MockPopen(stdout_lines, block_after_lines=True) + + manager = DemuxerManager.get_instance() + + # Register first driver + callback1 = MagicMock() + manager.register_driver( + driver_id="driver1", + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + chip="T264", + target="CCPLEX: 0", + callback=callback1, + ) + + time.sleep(0.5) + callback1.assert_called_once() + + # Register second driver for already-ready target - should be notified immediately + callback2 = MagicMock() + manager.register_driver( + driver_id="driver2", + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + chip="T264", + target="BPMP: 1", + callback=callback2, + ) + + # Should be called immediately without waiting + callback2.assert_called_once_with("BPMP: 1", "/dev/pts/6") + + # Cleanup + manager.unregister_driver("driver1") + manager.unregister_driver("driver2") + DemuxerManager.reset_instance() From 8bf0a2ecaed85abf136c8d3879df9d858673618d Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Wed, 14 Jan 2026 18:07:10 +0100 Subject: [PATCH 3/9] nvdemux: add instructions to obtain nv_tcu_demuxer Co-authored-by: Claude --- packages/jumpstarter-driver-pyserial/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/jumpstarter-driver-pyserial/README.md b/packages/jumpstarter-driver-pyserial/README.md index 9d3ee0152..cb9939087 100644 --- a/packages/jumpstarter-driver-pyserial/README.md +++ b/packages/jumpstarter-driver-pyserial/README.md @@ -37,6 +37,8 @@ export: The `NVDemuxSerial` driver provides serial access to NVIDIA Tegra demultiplexed UART channels using the [nv_tcu_demuxer](https://docs.nvidia.com/jetson/archives/r38.2.1/DeveloperGuide/AT/JetsonLinuxDevelopmentTools/TegraCombinedUART.html) tool. It automatically handles device reconnection when the target device restarts. +The nv_tcu_demuxer tool can be obtained from the NVIDIA Jetson BSP, at this path: `Linux_for_Tegra/tools/demuxer/nv_tcu_demuxer`. + ### Multi-Instance Support Multiple driver instances can share a single demuxer process by specifying different target channels. This allows simultaneous access to multiple UART channels (CCPLEX, BPMP, SCE, etc.) from the same physical device. From 52b2eb22c147b43a886d314ce6276d9975933a4b Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Thu, 15 Jan 2026 12:16:46 +0100 Subject: [PATCH 4/9] nvdemux: avoid callbacks within lock Co-authored-by: GPT 5.2 Codex --- .../nvdemux/manager.py | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py index 888b02c52..c1ddbb88a 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py @@ -110,15 +110,13 @@ def _validate_config(self, demuxer_path: str, device: str, chip: str, target: st if existing_driver.target == target: raise ValueError(f"Target '{target}' already registered by another driver") - def _notify_if_ready(self, target: str, callback: Callable[[str, str], None]): - """Notify driver immediately if target is already ready.""" + def _get_ready_callback(self, target: str, callback: Callable[[str, str], None]) -> tuple[str, str] | None: + """Return callback args if target is already ready.""" if target in self._ready_targets: pts_path = self._pts_map.get(target) if pts_path: - try: - callback(target, pts_path) - except Exception as e: - logger.error("Error in driver callback: %s", e) + return target, pts_path + return None def register_driver( self, @@ -144,6 +142,7 @@ def register_driver( Raises: ValueError: If configuration doesn't match existing process """ + notify_args: tuple[str, str] | None = None with self._lock: # Validate configuration matches existing process if self._drivers: @@ -162,12 +161,19 @@ def register_driver( logger.info("Registered driver %s for target '%s'", driver_id, target) # If target is already ready, notify immediately - self._notify_if_ready(target, callback) + notify_args = self._get_ready_callback(target, callback) # Start monitor thread if this is the first driver if len(self._drivers) == 1: self._start_monitor() + if notify_args: + # Invoke callbacks outside the lock to avoid deadlocks/reentrancy. + try: + callback(*notify_args) + except Exception as e: + logger.error("Error in driver callback: %s", e) + def unregister_driver(self, driver_id: str) -> None: """Unregister a driver instance. @@ -355,20 +361,24 @@ def _read_demuxer_output(self): if pts_path and target: logger.info("Found pts path for target '%s': %s", target, pts_path) - # Update state immediately for this target + callback_to_invoke = None with self._lock: self._pts_map[target] = pts_path self._ready_targets.add(target) - # Notify driver for this specific target - for driver_id, driver_info in self._drivers.items(): + # Find driver callback for this specific target + for driver_info in self._drivers.values(): if driver_info.target == target: - try: - driver_info.callback(target, pts_path) - except Exception as e: - logger.error("Error in driver %s callback: %s", driver_id, e) + callback_to_invoke = driver_info.callback break # Only one driver per target + if callback_to_invoke: + # Invoke callbacks outside the lock to avoid deadlocks/reentrancy. + try: + callback_to_invoke(target, pts_path) + except Exception as e: + logger.error("Error in driver callback: %s", e) + except Exception as e: logger.error("Error reading demuxer output: %s", e) From e7c405670fb76ef4f78a0ebdec666158e36e6f91 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Date: Thu, 15 Jan 2026 09:45:31 -0500 Subject: [PATCH 5/9] nvdemux: Do not stop the monitor and retry after reboots --- .../jumpstarter_driver_pyserial/nvdemux/driver.py | 12 ++++++++---- .../jumpstarter_driver_pyserial/nvdemux/manager.py | 8 +++----- .../nvdemux/manager_test.py | 4 ++-- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py index f8570e1e0..db2da288b 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py @@ -120,12 +120,16 @@ async def connect(self): # Use a short sleep to allow checking ready state await sleep(0.1) - # Get the current pts path from manager + # Get the current pts path from manager (retry until timeout) manager = DemuxerManager.get_instance() + pts_start = time.monotonic() pts_path = manager.get_pts_path(str(self.uuid)) - - if not pts_path: - raise RuntimeError("Demuxer ready but no pts path available") + while not pts_path: + elapsed = time.monotonic() - pts_start + if elapsed >= self.timeout: + raise TimeoutError("Demuxer ready but no pts path available after retrying") + await sleep(self.poll_interval) + pts_path = manager.get_pts_path(str(self.uuid)) cps_info = f", cps: {self.cps}" if self.cps is not None else "" self.logger.info("Connecting to %s, baudrate: %d%s", pts_path, self.baudrate, cps_info) diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py index c1ddbb88a..890d95082 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py @@ -163,8 +163,8 @@ def register_driver( # If target is already ready, notify immediately notify_args = self._get_ready_callback(target, callback) - # Start monitor thread if this is the first driver - if len(self._drivers) == 1: + # Start monitor thread only once + if not self._monitor_thread or not self._monitor_thread.is_alive(): self._start_monitor() if notify_args: @@ -186,9 +186,7 @@ def unregister_driver(self, driver_id: str) -> None: del self._drivers[driver_id] logger.info("Unregistered driver %s (target: %s)", driver_id, target) - # Stop monitor thread if this was the last driver - if not self._drivers: - self._stop_monitor() + # Keep monitor running even if no drivers remain def get_pts_path(self, driver_id: str) -> str | None: """Get the pts path for a registered driver. diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py index 941aae71a..d40b58bc6 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py @@ -426,10 +426,10 @@ def test_reference_counting(): time.sleep(0.2) assert not mock_proc._terminated - # Unregister second driver - process should stop + # Unregister second driver - process should still continue (monitor stays running) manager.unregister_driver("driver2") time.sleep(0.2) - assert mock_proc._terminated + assert not mock_proc._terminated # Cleanup DemuxerManager.reset_instance() From b544d27bfe84e6ea8398485119cc2be453c4b5e8 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Date: Fri, 16 Jan 2026 07:25:16 -0500 Subject: [PATCH 6/9] nvdemux: improvements * Do not wait for ready during driver init * Be more defensive on demuxer process cleanup * Move many log messages to debug * Wait for poll_interval when restarting demuxer always to avoid busy loops --- .../nvdemux/driver.py | 3 - .../nvdemux/manager.py | 238 +++++++++++++++--- .../nvdemux/manager_test.py | 27 +- 3 files changed, 228 insertions(+), 40 deletions(-) diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py index db2da288b..e1b98f9b9 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py @@ -74,9 +74,6 @@ def __post_init__(self): self.logger.error("Failed to register with DemuxerManager: %s", e) raise - # Wait for initial ready state (with timeout) - if not self._ready.wait(timeout=self.timeout): - self.logger.warning("Timeout waiting for demuxer to become ready during initialization") @classmethod def client(cls) -> str: diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py index 890d95082..4f51c3c12 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py @@ -5,16 +5,50 @@ and distributes pts paths to registered drivers. """ +import atexit +import ctypes import glob import logging import os +import signal import subprocess +import sys import threading from dataclasses import dataclass from typing import Callable, Optional logger = logging.getLogger(__name__) +# Platform detection +_IS_LINUX = sys.platform.startswith("linux") + + +def _get_preexec_fn() -> Callable[[], None] | None: + """Get platform-specific preexec_fn for subprocess. + + On Linux, returns a function that sets PR_SET_PDEATHSIG to SIGTERM, + ensuring the subprocess receives SIGTERM when the parent process dies. + This works even if the parent is killed with SIGKILL. + + On other platforms, returns None. + """ + if not _IS_LINUX: + return None + + def set_pdeathsig(): + """Set parent death signal to SIGTERM via prctl.""" + try: + libc = ctypes.CDLL("libc.so.6", use_errno=True) + PR_SET_PDEATHSIG = 1 + result = libc.prctl(PR_SET_PDEATHSIG, signal.SIGTERM, 0, 0, 0) + if result != 0: + errno = ctypes.get_errno() + logger.warning("prctl(PR_SET_PDEATHSIG) failed with errno %d", errno) + except Exception as e: + logger.warning("Failed to set parent death signal: %s", e) + + return set_pdeathsig + def _has_glob_chars(path: str) -> bool: """Check if path contains glob wildcard characters.""" @@ -58,6 +92,9 @@ class DemuxerManager: _instance: Optional["DemuxerManager"] = None _instance_lock = threading.Lock() + _signal_handlers_installed = False + _original_sigterm_handler: signal.Handlers | None = None + _original_sigint_handler: signal.Handlers | None = None def __init__(self): """Private constructor. Use get_instance() instead.""" @@ -68,6 +105,7 @@ def __init__(self): self._process: Optional[subprocess.Popen] = None self._monitor_thread: Optional[threading.Thread] = None self._shutdown = threading.Event() + self._cleanup_done = False # Process configuration (must be same for all drivers) self._demuxer_path: Optional[str] = None @@ -75,6 +113,13 @@ def __init__(self): self._chip: Optional[str] = None self._poll_interval: float = 1.0 + # Register atexit handler for cleanup on normal exit + atexit.register(self._atexit_cleanup) + logger.debug("Registered atexit handler for demuxer cleanup") + + # Install signal handlers (only once globally) + self._install_signal_handlers() + @classmethod def get_instance(cls) -> "DemuxerManager": """Get the singleton instance of DemuxerManager.""" @@ -89,9 +134,61 @@ def reset_instance(cls): """Reset the singleton instance. Used for testing.""" with cls._instance_lock: if cls._instance is not None: + # Reset cleanup_done flag before cleanup to allow cleanup to run + cls._instance._cleanup_done = False cls._instance._cleanup() cls._instance = None + def _atexit_cleanup(self): + """Cleanup handler called on normal program exit via atexit.""" + if self._cleanup_done: + return + logger.debug("atexit cleanup triggered") + self._cleanup() + + def _install_signal_handlers(self): + """Install signal handlers for SIGTERM and SIGINT. + + Handlers ensure cleanup is performed before the process terminates. + Only installs handlers once globally, and preserves original handlers. + """ + cls = type(self) + if cls._signal_handlers_installed: + return + + def make_handler(sig: signal.Signals) -> Callable[[int, any], None]: + """Create a signal handler that cleans up and re-raises the signal.""" + + def handler(signum: int, frame): + logger.debug("Signal %s received, cleaning up demuxer process", sig.name) + # Cleanup the demuxer process + if cls._instance is not None: + cls._instance._cleanup() + + # Restore original handler and re-raise signal + if sig == signal.SIGTERM and cls._original_sigterm_handler is not None: + signal.signal(signal.SIGTERM, cls._original_sigterm_handler) + elif sig == signal.SIGINT and cls._original_sigint_handler is not None: + signal.signal(signal.SIGINT, cls._original_sigint_handler) + + # Re-raise the signal to allow normal termination + os.kill(os.getpid(), signum) + + return handler + + try: + # Only install signal handlers from the main thread + if threading.current_thread() is not threading.main_thread(): + logger.debug("Not installing signal handlers from non-main thread") + return + + cls._original_sigterm_handler = signal.signal(signal.SIGTERM, make_handler(signal.SIGTERM)) + cls._original_sigint_handler = signal.signal(signal.SIGINT, make_handler(signal.SIGINT)) + cls._signal_handlers_installed = True + logger.debug("Installed signal handlers for SIGTERM and SIGINT") + except Exception as e: + logger.warning("Failed to install signal handlers: %s", e) + def _validate_config(self, demuxer_path: str, device: str, chip: str, target: str): """Validate driver configuration against existing drivers. @@ -158,7 +255,7 @@ def register_driver( driver_info = DriverInfo(driver_id=driver_id, target=target, callback=callback) self._drivers[driver_id] = driver_info - logger.info("Registered driver %s for target '%s'", driver_id, target) + logger.debug("Registered driver %s for target '%s'", driver_id, target) # If target is already ready, notify immediately notify_args = self._get_ready_callback(target, callback) @@ -184,7 +281,7 @@ def unregister_driver(self, driver_id: str) -> None: if driver_id in self._drivers: target = self._drivers[driver_id].target del self._drivers[driver_id] - logger.info("Unregistered driver %s (target: %s)", driver_id, target) + logger.debug("Unregistered driver %s (target: %s)", driver_id, target) # Keep monitor running even if no drivers remain @@ -222,36 +319,72 @@ def _start_monitor(self): target=self._monitor_loop, daemon=True, name="DemuxerManager-monitor" ) self._monitor_thread.start() - logger.info("Started demuxer monitor thread") + logger.debug("Started demuxer monitor thread") def _stop_monitor(self): - """Stop the monitor thread.""" + """Stop the monitor thread. + + This method is idempotent - safe to call multiple times. + """ self._shutdown.set() # Terminate process if running - if self._process: - logger.info("Terminating demuxer process...") + process = self._process + if process is not None: + logger.debug("Terminating demuxer process (PID %s)...", process.pid) try: - self._process.terminate() - self._process.wait(timeout=5.0) - except subprocess.TimeoutExpired: - self._process.kill() - self._process.wait() - self._process = None + # First try graceful termination + process.terminate() + try: + process.wait(timeout=5.0) + logger.debug("Demuxer process terminated gracefully") + except subprocess.TimeoutExpired: + # Force kill if it doesn't respond + logger.warning("Demuxer process did not terminate, killing...") + process.kill() + process.wait(timeout=2.0) + logger.debug("Demuxer process killed") + except ProcessLookupError: + # Process already dead + logger.debug("Demuxer process already exited") + except Exception as e: + logger.error("Error terminating demuxer process: %s", e) + finally: + self._process = None # Wait for monitor thread to exit - if self._monitor_thread and self._monitor_thread.is_alive(): - self._monitor_thread.join(timeout=2.0) + monitor_thread = self._monitor_thread + if monitor_thread is not None and monitor_thread.is_alive(): + # Don't join if we're being called from the monitor thread itself + if threading.current_thread() is not monitor_thread: + monitor_thread.join(timeout=2.0) + if monitor_thread.is_alive(): + logger.warning("Monitor thread did not exit within timeout") self._monitor_thread = None - logger.info("Stopped demuxer monitor thread") + logger.debug("Stopped demuxer monitor") def _cleanup(self): - """Clean up resources.""" + """Clean up resources. + + This method is idempotent - safe to call multiple times. + Ensures the demuxer process is terminated on program exit. + """ + if self._cleanup_done: + logger.debug("Cleanup already done, skipping") + return + + logger.debug("Cleaning up DemuxerManager resources") + self._cleanup_done = True + self._stop_monitor() - self._drivers.clear() - self._pts_map.clear() - self._ready_targets.clear() + + with self._lock: + self._drivers.clear() + self._pts_map.clear() + self._ready_targets.clear() + + logger.info("DemuxerManager cleanup complete") def _monitor_loop(self): """Background thread that manages demuxer lifecycle and auto-recovery.""" @@ -264,9 +397,9 @@ def _monitor_loop(self): with self._lock: self._pts_map.clear() self._ready_targets.clear() - # Wait before retrying - if self._shutdown.wait(timeout=self._poll_interval): - break + # Always wait for poll interval before retrying + if self._shutdown.wait(timeout=self._poll_interval): + break def _run_demuxer_cycle(self): """Run one cycle of: find device -> start demuxer -> monitor until exit.""" @@ -280,9 +413,15 @@ def _run_demuxer_cycle(self): self._shutdown.wait(timeout=self._poll_interval) return - # Read and parse demuxer output + # Read and parse demuxer output (stdout and stderr concurrently) + stderr_thread = threading.Thread(target=self._read_demuxer_stderr, daemon=True) + stderr_thread.start() + self._read_demuxer_output() + # Wait for stderr thread to finish + stderr_thread.join(timeout=1.0) + # Cleanup process self._cleanup_demuxer_process() @@ -294,7 +433,7 @@ def _wait_for_device(self) -> str | None: while not self._shutdown.is_set(): resolved_device = _resolve_device(self._device) if resolved_device: - logger.info("Found device: %s", resolved_device) + logger.debug("Found device: %s", resolved_device) return resolved_device logger.debug("Device not found, polling... (pattern: %s)", self._device) if self._shutdown.wait(timeout=self._poll_interval): @@ -302,9 +441,16 @@ def _wait_for_device(self) -> str | None: return None def _start_demuxer_process(self, device: str) -> bool: - """Start the demuxer process. Returns True on success.""" + """Start the demuxer process. Returns True on success. + + On Linux, uses prctl(PR_SET_PDEATHSIG) to ensure the subprocess + receives SIGTERM when the parent dies (including kill -9). + """ cmd = [self._demuxer_path, "-m", self._chip, "-d", device] - logger.info("Starting demuxer: %s", " ".join(cmd)) + logger.debug("Starting demuxer: %s", " ".join(cmd)) + + # Get platform-specific preexec_fn (Linux: set parent death signal) + preexec_fn = _get_preexec_fn() try: self._process = subprocess.Popen( @@ -313,7 +459,9 @@ def _start_demuxer_process(self, device: str) -> bool: stderr=subprocess.PIPE, text=True, bufsize=1, # Line buffered + preexec_fn=preexec_fn, ) + logger.debug("Demuxer process started with PID %d", self._process.pid) return True except (FileNotFoundError, PermissionError) as e: logger.error("Failed to start demuxer: %s", e) @@ -329,17 +477,37 @@ def _parse_demuxer_line(self, line: str) -> tuple[str | None, str | None]: if len(parts) < 2: return None, None - pts_path = None - target = None - - for part in parts: - if part.startswith("/dev/"): - pts_path = part - elif ":" in part: # Targets have format "NAME: N" - target = part + # First part is the pts path, second part is the target name + pts_path = parts[0].strip() if parts[0].startswith("/dev/") else None + target = parts[1].strip() if len(parts) >= 2 else None return pts_path, target + def _read_demuxer_stderr(self): + """Read demuxer stderr and check for catastrophic errors.""" + try: + for line in iter(self._process.stderr.readline, ""): + if self._shutdown.is_set(): + break + + line = line.strip() + if not line: + continue + + logger.warning("Demuxer stderr: %s", line) + + # Check for catastrophic file lock error + if "ERROR: unable to obtain file lock" in line: + logger.critical( + "Demuxer file lock error detected. Another instance may be running. " + "Terminating exporter to prevent conflicts." + ) + # Force immediate process termination + os._exit(1) + + except Exception as e: + logger.error("Error reading demuxer stderr: %s", e) + def _read_demuxer_output(self): """Read demuxer stdout and parse all pts paths.""" try: @@ -357,7 +525,7 @@ def _read_demuxer_output(self): pts_path, target = self._parse_demuxer_line(line) if pts_path and target: - logger.info("Found pts path for target '%s': %s", target, pts_path) + logger.debug("Found pts path for target '%s': %s", target, pts_path) callback_to_invoke = None with self._lock: diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py index d40b58bc6..b0107d988 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py @@ -51,18 +51,41 @@ def test_resolve_device_glob_no_match(): assert result is None +class MockStderr: + """Mock stderr file-like object for testing.""" + + def __init__(self, stderr_lines=None, terminated_callback=None): + self.stderr_lines = stderr_lines or [] + self._line_index = 0 + self._terminated_callback = terminated_callback + + def readline(self): + if self._terminated_callback and self._terminated_callback(): + return "" + if self._line_index >= len(self.stderr_lines): + return "" + line = self.stderr_lines[self._line_index] + self._line_index += 1 + return line + "\n" + + class MockPopen: """Mock subprocess.Popen for testing.""" - def __init__(self, stdout_lines, returncode=0, delay_per_line=0.01, block_after_lines=False): + _next_pid = 1000 + + def __init__(self, stdout_lines, returncode=0, delay_per_line=0.01, block_after_lines=False, stderr_lines=None): self.stdout_lines = stdout_lines self.returncode = returncode self.delay_per_line = delay_per_line self.block_after_lines = block_after_lines self._line_index = 0 self.stdout = self - self.stderr = MagicMock() + self.stderr = MockStderr(stderr_lines=stderr_lines, terminated_callback=lambda: self._terminated) self._terminated = False + # Assign a mock PID + self.pid = MockPopen._next_pid + MockPopen._next_pid += 1 def readline(self): if self._terminated: From 45fa1a1763d65f250eb795396e81439a01187d3f Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Date: Fri, 16 Jan 2026 09:21:48 -0500 Subject: [PATCH 7/9] nvdemux: user anyio.Event --- .../nvdemux/driver.py | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py index e1b98f9b9..ae1e99971 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py @@ -1,10 +1,9 @@ -import threading import time from contextlib import asynccontextmanager from dataclasses import dataclass, field from typing import Optional -from anyio import sleep +from anyio import Event, fail_after, sleep from anyio._backends._asyncio import StreamReaderWrapper, StreamWriterWrapper from serial_asyncio import open_serial_connection @@ -50,7 +49,7 @@ class NVDemuxSerial(Driver): poll_interval: float = field(default=1.0) # Internal state (not init params) - _ready: threading.Event = field(init=False, default_factory=threading.Event) + _ready: Event = field(init=False, default_factory=Event) _registered: bool = field(init=False, default=False) def __post_init__(self): @@ -106,16 +105,14 @@ async def connect(self): Waits for the demuxer to be ready (device connected and pts path discovered) before opening the serial connection. """ - # Wait for ready state - start_time = time.monotonic() - while not self._ready.is_set(): - elapsed = time.monotonic() - start_time - if elapsed >= self.timeout: - raise TimeoutError( - f"Timeout waiting for demuxer to become ready (device pattern: {self.device})" - ) - # Use a short sleep to allow checking ready state - await sleep(0.1) + # Wait for ready state with timeout + try: + with fail_after(self.timeout): + await self._ready.wait() + except TimeoutError: + raise TimeoutError( + f"Timeout waiting for demuxer to become ready (device pattern: {self.device})" + ) from None # Get the current pts path from manager (retry until timeout) manager = DemuxerManager.get_instance() From cef414c7c71c0f368e2dc50f0b87d0932fb02eb8 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Date: Fri, 16 Jan 2026 09:41:34 -0500 Subject: [PATCH 8/9] nvdemux: remove manager callback altogether --- .../nvdemux/driver.py | 29 ++-------- .../nvdemux/driver_test.py | 57 ------------------- .../nvdemux/manager.py | 38 +------------ .../nvdemux/manager_test.py | 48 +++------------- 4 files changed, 15 insertions(+), 157 deletions(-) diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py index ae1e99971..124a365d4 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py @@ -3,7 +3,7 @@ from dataclasses import dataclass, field from typing import Optional -from anyio import Event, fail_after, sleep +from anyio import sleep from anyio._backends._asyncio import StreamReaderWrapper, StreamWriterWrapper from serial_asyncio import open_serial_connection @@ -49,7 +49,6 @@ class NVDemuxSerial(Driver): poll_interval: float = field(default=1.0) # Internal state (not init params) - _ready: Event = field(init=False, default_factory=Event) _registered: bool = field(init=False, default=False) def __post_init__(self): @@ -65,7 +64,6 @@ def __post_init__(self): device=self.device, chip=self.chip, target=self.target, - callback=self._on_target_ready, poll_interval=self.poll_interval, ) self._registered = True @@ -78,16 +76,6 @@ def __post_init__(self): def client(cls) -> str: return "jumpstarter_driver_pyserial.client.PySerialClient" - def _on_target_ready(self, target: str, pts_path: str): - """Callback invoked by DemuxerManager when target becomes ready. - - Args: - target: The target channel that became ready - pts_path: The pts path for this target - """ - self.logger.info("Target '%s' ready with pts path: %s", target, pts_path) - self._ready.set() - def close(self): """Unregister from the DemuxerManager.""" if self._registered: @@ -105,23 +93,16 @@ async def connect(self): Waits for the demuxer to be ready (device connected and pts path discovered) before opening the serial connection. """ - # Wait for ready state with timeout - try: - with fail_after(self.timeout): - await self._ready.wait() - except TimeoutError: - raise TimeoutError( - f"Timeout waiting for demuxer to become ready (device pattern: {self.device})" - ) from None - - # Get the current pts path from manager (retry until timeout) + # Poll for pts path until available or timeout manager = DemuxerManager.get_instance() pts_start = time.monotonic() pts_path = manager.get_pts_path(str(self.uuid)) while not pts_path: elapsed = time.monotonic() - pts_start if elapsed >= self.timeout: - raise TimeoutError("Demuxer ready but no pts path available after retrying") + raise TimeoutError( + f"Timeout waiting for demuxer to become ready (device pattern: {self.device})" + ) await sleep(self.poll_interval) pts_path = manager.get_pts_path(str(self.uuid)) diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py index 1ad5d68c7..21ddb0799 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py @@ -1,7 +1,6 @@ """Tests for NVDemuxSerial driver.""" import tempfile -import time from unittest.mock import MagicMock, patch from .driver import NVDemuxSerial @@ -35,36 +34,6 @@ def test_nvdemux_registration(): driver.close() -def test_nvdemux_callback_sets_ready(): - """Test that callback from manager sets the ready event.""" - with tempfile.NamedTemporaryFile() as device_file: - with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: - mock_manager = MagicMock() - mock_manager_class.get_instance.return_value = mock_manager - - driver = NVDemuxSerial( - demuxer_path="/usr/bin/demuxer", - device=device_file.name, - target="CCPLEX: 0", - timeout=0.1, - ) - - try: - # Get the callback that was registered - callback = mock_manager.register_driver.call_args[1]["callback"] - - # Initially not ready - assert not driver._ready.is_set() - - # Call the callback - callback("CCPLEX: 0", "/dev/pts/5") - - # Should now be ready - assert driver._ready.is_set() - finally: - driver.close() - - def test_nvdemux_gets_pts_from_manager(): """Test that connect() gets pts path from manager.""" with tempfile.NamedTemporaryFile() as device_file: @@ -81,10 +50,6 @@ def test_nvdemux_gets_pts_from_manager(): ) try: - # Trigger callback to set ready - callback = mock_manager.register_driver.call_args[1]["callback"] - callback("CCPLEX: 0", "/dev/pts/5") - # Should call get_pts_path when checking pts availability # (We can't test connect() easily without mocking serial, but we can test the logic) pts_path = mock_manager.get_pts_path(str(driver.uuid)) @@ -114,28 +79,6 @@ def test_nvdemux_unregisters_on_close(): mock_manager.unregister_driver.assert_called_once_with(driver_id) -def test_nvdemux_timeout_no_callback(): - """Test timeout when callback is never invoked.""" - with tempfile.NamedTemporaryFile() as device_file: - with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: - mock_manager = MagicMock() - mock_manager_class.get_instance.return_value = mock_manager - - driver = NVDemuxSerial( - demuxer_path="/usr/bin/demuxer", - device=device_file.name, - target="CCPLEX: 0", - timeout=0.1, - ) - - try: - # Callback is never invoked, so ready should not be set - time.sleep(0.2) - assert not driver._ready.is_set() - finally: - driver.close() - - def test_nvdemux_default_values(): """Test default parameter values.""" with tempfile.NamedTemporaryFile() as device_file: diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py index 4f51c3c12..0cb313962 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py @@ -80,7 +80,6 @@ class DriverInfo: driver_id: str target: str - callback: Callable[[str, str], None] # (target, pts_path) -> None class DemuxerManager: @@ -207,14 +206,6 @@ def _validate_config(self, demuxer_path: str, device: str, chip: str, target: st if existing_driver.target == target: raise ValueError(f"Target '{target}' already registered by another driver") - def _get_ready_callback(self, target: str, callback: Callable[[str, str], None]) -> tuple[str, str] | None: - """Return callback args if target is already ready.""" - if target in self._ready_targets: - pts_path = self._pts_map.get(target) - if pts_path: - return target, pts_path - return None - def register_driver( self, driver_id: str, @@ -222,7 +213,6 @@ def register_driver( device: str, chip: str, target: str, - callback: Callable[[str, str], None], poll_interval: float = 1.0, ) -> None: """Register a driver instance with the manager. @@ -233,13 +223,11 @@ def register_driver( device: Device path or glob pattern chip: Chip type (T234 or T264) target: Target channel (e.g., "CCPLEX: 0") - callback: Function to call when target becomes ready poll_interval: Polling interval for device reconnection Raises: ValueError: If configuration doesn't match existing process """ - notify_args: tuple[str, str] | None = None with self._lock: # Validate configuration matches existing process if self._drivers: @@ -252,25 +240,15 @@ def register_driver( self._poll_interval = poll_interval # Register the driver - driver_info = DriverInfo(driver_id=driver_id, target=target, callback=callback) + driver_info = DriverInfo(driver_id=driver_id, target=target) self._drivers[driver_id] = driver_info logger.debug("Registered driver %s for target '%s'", driver_id, target) - # If target is already ready, notify immediately - notify_args = self._get_ready_callback(target, callback) - # Start monitor thread only once if not self._monitor_thread or not self._monitor_thread.is_alive(): self._start_monitor() - if notify_args: - # Invoke callbacks outside the lock to avoid deadlocks/reentrancy. - try: - callback(*notify_args) - except Exception as e: - logger.error("Error in driver callback: %s", e) - def unregister_driver(self, driver_id: str) -> None: """Unregister a driver instance. @@ -527,24 +505,10 @@ def _read_demuxer_output(self): if pts_path and target: logger.debug("Found pts path for target '%s': %s", target, pts_path) - callback_to_invoke = None with self._lock: self._pts_map[target] = pts_path self._ready_targets.add(target) - # Find driver callback for this specific target - for driver_info in self._drivers.values(): - if driver_info.target == target: - callback_to_invoke = driver_info.callback - break # Only one driver per target - - if callback_to_invoke: - # Invoke callbacks outside the lock to avoid deadlocks/reentrancy. - try: - callback_to_invoke(target, pts_path) - except Exception as e: - logger.error("Error in driver callback: %s", e) - except Exception as e: logger.error("Error reading demuxer output: %s", e) diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py index b0107d988..cc3a9e727 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py @@ -3,7 +3,7 @@ import os import tempfile import time -from unittest.mock import MagicMock, patch +from unittest.mock import patch from .manager import DemuxerManager, _has_glob_chars, _resolve_device @@ -139,7 +139,6 @@ def test_single_driver_registration(): mock_popen.return_value = MockPopen(stdout_lines, block_after_lines=True) manager = DemuxerManager.get_instance() - callback = MagicMock() manager.register_driver( driver_id="driver1", @@ -147,15 +146,11 @@ def test_single_driver_registration(): device=device_file.name, chip="T264", target="CCPLEX: 0", - callback=callback, ) - # Wait for callback + # Wait for demuxer to process output time.sleep(0.5) - # Verify callback was called - callback.assert_called_once_with("CCPLEX: 0", "/dev/pts/5") - # Verify pts path is available pts_path = manager.get_pts_path("driver1") assert pts_path == "/dev/pts/5" @@ -180,9 +175,6 @@ def test_multiple_drivers_single_process(): mock_popen.return_value = MockPopen(stdout_lines, block_after_lines=True) manager = DemuxerManager.get_instance() - callback1 = MagicMock() - callback2 = MagicMock() - callback3 = MagicMock() # Register three drivers manager.register_driver( @@ -191,7 +183,6 @@ def test_multiple_drivers_single_process(): device=device_file.name, chip="T264", target="CCPLEX: 0", - callback=callback1, ) manager.register_driver( @@ -200,7 +191,6 @@ def test_multiple_drivers_single_process(): device=device_file.name, chip="T264", target="BPMP: 1", - callback=callback2, ) manager.register_driver( @@ -209,17 +199,11 @@ def test_multiple_drivers_single_process(): device=device_file.name, chip="T264", target="SCE: 2", - callback=callback3, ) - # Wait for callbacks + # Wait for demuxer to process output time.sleep(0.5) - # Verify all callbacks were called - callback1.assert_called_once_with("CCPLEX: 0", "/dev/pts/5") - callback2.assert_called_once_with("BPMP: 1", "/dev/pts/6") - callback3.assert_called_once_with("SCE: 2", "/dev/pts/7") - # Verify process was only started once assert mock_popen.call_count == 1 @@ -254,7 +238,6 @@ def test_config_validation_demuxer_path_mismatch(): device=device_file.name, chip="T264", target="CCPLEX: 0", - callback=MagicMock(), ) # Try to register second driver with different demuxer_path @@ -265,7 +248,6 @@ def test_config_validation_demuxer_path_mismatch(): device=device_file.name, chip="T264", target="BPMP: 1", - callback=MagicMock(), ) raise AssertionError("Should have raised ValueError") except ValueError as e: @@ -295,7 +277,6 @@ def test_config_validation_device_mismatch(): device=device_file1.name, chip="T264", target="CCPLEX: 0", - callback=MagicMock(), ) # Try to register second driver with different device @@ -306,7 +287,6 @@ def test_config_validation_device_mismatch(): device=device_file2.name, # Different device chip="T264", target="BPMP: 1", - callback=MagicMock(), ) raise AssertionError("Should have raised ValueError") except ValueError as e: @@ -336,7 +316,6 @@ def test_config_validation_chip_mismatch(): device=device_file.name, chip="T264", target="CCPLEX: 0", - callback=MagicMock(), ) # Try to register second driver with different chip @@ -347,7 +326,6 @@ def test_config_validation_chip_mismatch(): device=device_file.name, chip="T234", # Different chip target="BPMP: 1", - callback=MagicMock(), ) raise AssertionError("Should have raised ValueError") except ValueError as e: @@ -377,7 +355,6 @@ def test_duplicate_target_rejected(): device=device_file.name, chip="T264", target="CCPLEX: 0", - callback=MagicMock(), ) # Try to register second driver with same target @@ -388,7 +365,6 @@ def test_duplicate_target_rejected(): device=device_file.name, chip="T264", target="CCPLEX: 0", # Same target - callback=MagicMock(), ) raise AssertionError("Should have raised ValueError") except ValueError as e: @@ -425,7 +401,6 @@ def test_reference_counting(): device=device_file.name, chip="T264", target="CCPLEX: 0", - callback=MagicMock(), ) time.sleep(0.2) @@ -438,7 +413,6 @@ def test_reference_counting(): device=device_file.name, chip="T264", target="BPMP: 1", - callback=MagicMock(), ) time.sleep(0.2) @@ -458,8 +432,8 @@ def test_reference_counting(): DemuxerManager.reset_instance() -def test_immediate_notification_for_ready_target(): - """Test that drivers are notified immediately if target is already ready.""" +def test_pts_path_available_for_ready_target(): + """Test that pts path is available for already-ready targets.""" DemuxerManager.reset_instance() with tempfile.NamedTemporaryFile() as device_file: @@ -474,32 +448,28 @@ def test_immediate_notification_for_ready_target(): manager = DemuxerManager.get_instance() # Register first driver - callback1 = MagicMock() manager.register_driver( driver_id="driver1", demuxer_path="/usr/bin/demuxer", device=device_file.name, chip="T264", target="CCPLEX: 0", - callback=callback1, ) time.sleep(0.5) - callback1.assert_called_once() + assert manager.get_pts_path("driver1") == "/dev/pts/5" - # Register second driver for already-ready target - should be notified immediately - callback2 = MagicMock() + # Register second driver for already-ready target manager.register_driver( driver_id="driver2", demuxer_path="/usr/bin/demuxer", device=device_file.name, chip="T264", target="BPMP: 1", - callback=callback2, ) - # Should be called immediately without waiting - callback2.assert_called_once_with("BPMP: 1", "/dev/pts/6") + # pts path should be immediately available + assert manager.get_pts_path("driver2") == "/dev/pts/6" # Cleanup manager.unregister_driver("driver1") From d95d5c876f01c85a868ebf4abe77fc38acf7b2ce Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Date: Fri, 16 Jan 2026 09:46:35 -0500 Subject: [PATCH 9/9] nvdemux: poll quicker for the device --- .../jumpstarter_driver_pyserial/nvdemux/driver.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py index 124a365d4..4746a7b9a 100644 --- a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py @@ -103,11 +103,11 @@ async def connect(self): raise TimeoutError( f"Timeout waiting for demuxer to become ready (device pattern: {self.device})" ) - await sleep(self.poll_interval) + await sleep(0.1) pts_path = manager.get_pts_path(str(self.uuid)) cps_info = f", cps: {self.cps}" if self.cps is not None else "" - self.logger.info("Connecting to %s, baudrate: %d%s", pts_path, self.baudrate, cps_info) + self.logger.info("Connecting to %s at %s, baudrate: %d%s", self.target, pts_path, self.baudrate, cps_info) reader, writer = await open_serial_connection(url=pts_path, baudrate=self.baudrate, limit=1) writer.transport.set_write_buffer_limits(high=4096, low=0)