diff --git a/packages/jumpstarter-driver-pyserial/README.md b/packages/jumpstarter-driver-pyserial/README.md index eeaf4e1a9..cb9939087 100644 --- a/packages/jumpstarter-driver-pyserial/README.md +++ b/packages/jumpstarter-driver-pyserial/README.md @@ -33,6 +33,108 @@ export: | check_present | Check if the serial port exists during exporter initialization, disable if you are connecting to a dynamically created port (i.e. USB from your DUT) | bool | no | True | | cps | Characters per second throttling limit. When set, data transmission will be throttled to simulate slow typing. Useful for devices that can't handle fast input | float | no | None | +## NVDemuxSerial Driver + +The `NVDemuxSerial` driver provides serial access to NVIDIA Tegra demultiplexed UART channels using the [nv_tcu_demuxer](https://docs.nvidia.com/jetson/archives/r38.2.1/DeveloperGuide/AT/JetsonLinuxDevelopmentTools/TegraCombinedUART.html) tool. It automatically handles device reconnection when the target device restarts. + +The nv_tcu_demuxer tool can be obtained from the NVIDIA Jetson BSP, at this path: `Linux_for_Tegra/tools/demuxer/nv_tcu_demuxer`. + +### Multi-Instance Support + +Multiple driver instances can share a single demuxer process by specifying different target channels. This allows simultaneous access to multiple UART channels (CCPLEX, BPMP, SCE, etc.) from the same physical device. 
+ +### Configuration + +#### Single channel example: + +```yaml +export: + ccplex: + type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial + config: + demuxer_path: "/opt/nvidia/nv_tcu_demuxer" + # device defaults to auto-detect NVIDIA Tegra On-Platform Operator + # chip defaults to T264 (Thor), use T234 for Orin +``` + +#### Multiple channels example: + +```yaml +export: + ccplex: + type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial + config: + demuxer_path: "/opt/nvidia/nv_tcu_demuxer" + target: "CCPLEX: 0" + chip: "T264" + + bpmp: + type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial + config: + demuxer_path: "/opt/nvidia/nv_tcu_demuxer" + target: "BPMP: 1" + chip: "T264" + + sce: + type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial + config: + demuxer_path: "/opt/nvidia/nv_tcu_demuxer" + target: "SCE: 2" + chip: "T264" +``` + +### Config parameters + +| Parameter | Description | Type | Required | Default | +| -------------- | ----------------------------------------------------------------------------------------------- | ----- | -------- | ------------------------------------------------------------------------- | +| demuxer_path | Path to the `nv_tcu_demuxer` binary | str | yes | | +| device | Device path or glob pattern for auto-detection | str | no | `/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01` | +| target | Target channel to extract from demuxer output | str | no | `CCPLEX: 0` | +| chip | Chip type for demuxer (`T234` for Orin, `T264` for Thor) | str | no | `T264` | +| baudrate | Baud rate for the serial connection | int | no | 115200 | +| cps | Characters per second throttling limit | float | no | None | +| timeout | Timeout in seconds waiting for demuxer to detect pts | float | no | 10.0 | +| poll_interval | Interval in seconds to poll for device reappearance after disconnect | float | no | 1.0 | + +### Device Auto-Detection + +The `device` parameter supports glob patterns for automatic 
device discovery: + +```yaml +# Auto-detect any NVIDIA Tegra On-Platform Operator device (default) +device: "/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01" + +# Specific serial number +device: "/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_ABC123-if01" + +# Direct device path (no glob) +device: "/dev/ttyUSB0" +``` + +### Auto-Recovery + +When the target device restarts (e.g., power cycle), the serial device disappears and the demuxer exits. The driver automatically: + +1. Detects the device disconnection +2. Polls for the device to reappear +3. Restarts the demuxer with the new device +4. Discovers the new pts path (which changes on each restart) + +Active connections will receive errors when the device disconnects. Clients should reconnect, and the driver will wait for the device to be available again. + +### Configuration Validation / Limitations + +When using multiple driver instances, all instances must have compatible configurations: + +- **demuxer_path**: Must be identical across all instances +- **device**: Must be identical across all instances +- **chip**: Must be identical across all instances +- **target**: Must be unique for each instance (no duplicates allowed) + +If these requirements are not met, the driver will raise a `ValueError` during initialization. 
+ + + ## CLI Commands The pyserial driver provides two CLI commands for interacting with serial ports: diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/__init__.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py new file mode 100644 index 000000000..4746a7b9a --- /dev/null +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py @@ -0,0 +1,120 @@ +import time +from contextlib import asynccontextmanager +from dataclasses import dataclass, field +from typing import Optional + +from anyio import sleep +from anyio._backends._asyncio import StreamReaderWrapper, StreamWriterWrapper +from serial_asyncio import open_serial_connection + +from ..driver import AsyncSerial +from .manager import DemuxerManager +from jumpstarter.driver import Driver, exportstream + +# Default glob pattern for NVIDIA Tegra On-Platform Operator devices +NV_DEVICE_PATTERN = "/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01" + + +@dataclass(kw_only=True) +class NVDemuxSerial(Driver): + """Serial driver for NVIDIA TCU demultiplexed UART channels. + + This driver wraps the nv_tcu_demuxer tool to extract a specific demultiplexed + UART channel (like CCPLEX) from a multiplexed serial device. Multiple driver + instances can share the same demuxer process by specifying different targets. + + Args: + demuxer_path: Path to the nv_tcu_demuxer binary + device: Device path or glob pattern for auto-detection. 
+ Default: /dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01 + target: Target channel to extract (e.g., "CCPLEX: 0", "BPMP: 1") + chip: Chip type for demuxer (T234 for Orin, T264 for Thor) + baudrate: Baud rate for the serial connection + cps: Characters per second throttling (optional) + timeout: Timeout waiting for demuxer to detect pts + poll_interval: Interval to poll for device reappearance after disconnect + + Note: + Multiple instances can be created with different targets. All instances + must use the same demuxer_path, device, and chip configuration. + """ + + demuxer_path: str + device: str = field(default=NV_DEVICE_PATTERN) + target: str = field(default="CCPLEX: 0") + chip: str = field(default="T264") + baudrate: int = field(default=115200) + cps: Optional[float] = field(default=None) + timeout: float = field(default=10.0) + poll_interval: float = field(default=1.0) + + # Internal state (not init params) + _registered: bool = field(init=False, default=False) + + def __post_init__(self): + if hasattr(super(), "__post_init__"): + super().__post_init__() + + # Register with the DemuxerManager + manager = DemuxerManager.get_instance() + try: + manager.register_driver( + driver_id=str(self.uuid), + demuxer_path=self.demuxer_path, + device=self.device, + chip=self.chip, + target=self.target, + poll_interval=self.poll_interval, + ) + self._registered = True + except ValueError as e: + self.logger.error("Failed to register with DemuxerManager: %s", e) + raise + + + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_pyserial.client.PySerialClient" + + def close(self): + """Unregister from the DemuxerManager.""" + if self._registered: + manager = DemuxerManager.get_instance() + manager.unregister_driver(str(self.uuid)) + self._registered = False + + super().close() + + @exportstream + @asynccontextmanager + async def connect(self): + """Connect to the demultiplexed serial port. 
+ + Waits for the demuxer to be ready (device connected and pts path discovered) + before opening the serial connection. + """ + # Poll for pts path until available or timeout + manager = DemuxerManager.get_instance() + pts_start = time.monotonic() + pts_path = manager.get_pts_path(str(self.uuid)) + while not pts_path: + elapsed = time.monotonic() - pts_start + if elapsed >= self.timeout: + raise TimeoutError( + f"Timeout waiting for demuxer to become ready (device pattern: {self.device})" + ) + await sleep(0.1) + pts_path = manager.get_pts_path(str(self.uuid)) + + cps_info = f", cps: {self.cps}" if self.cps is not None else "" + self.logger.info("Connecting to %s at %s, baudrate: %d%s", self.target, pts_path, self.baudrate, cps_info) + + reader, writer = await open_serial_connection(url=pts_path, baudrate=self.baudrate, limit=1) + writer.transport.set_write_buffer_limits(high=4096, low=0) + async with AsyncSerial( + reader=StreamReaderWrapper(reader), + writer=StreamWriterWrapper(writer), + cps=self.cps, + ) as stream: + yield stream + self.logger.info("Disconnected from %s", pts_path) diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py new file mode 100644 index 000000000..21ddb0799 --- /dev/null +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py @@ -0,0 +1,127 @@ +"""Tests for NVDemuxSerial driver.""" + +import tempfile +from unittest.mock import MagicMock, patch + +from .driver import NVDemuxSerial + + +def test_nvdemux_registration(): + """Test that driver registers with DemuxerManager on init.""" + with tempfile.NamedTemporaryFile() as device_file: + with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: + mock_manager = MagicMock() + mock_manager_class.get_instance.return_value = mock_manager + + driver = NVDemuxSerial( + 
demuxer_path="/usr/bin/demuxer", + device=device_file.name, + target="CCPLEX: 0", + chip="T264", + timeout=0.1, + ) + + try: + # Verify driver registered with manager + mock_manager.register_driver.assert_called_once() + call_kwargs = mock_manager.register_driver.call_args[1] + assert call_kwargs["driver_id"] == str(driver.uuid) + assert call_kwargs["demuxer_path"] == "/usr/bin/demuxer" + assert call_kwargs["device"] == device_file.name + assert call_kwargs["chip"] == "T264" + assert call_kwargs["target"] == "CCPLEX: 0" + finally: + driver.close() + + +def test_nvdemux_gets_pts_from_manager(): + """Test that connect() gets pts path from manager.""" + with tempfile.NamedTemporaryFile() as device_file: + with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: + mock_manager = MagicMock() + mock_manager_class.get_instance.return_value = mock_manager + mock_manager.get_pts_path.return_value = "/dev/pts/5" + + driver = NVDemuxSerial( + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + target="CCPLEX: 0", + timeout=0.1, + ) + + try: + # Should call get_pts_path when checking pts availability + # (We can't test connect() easily without mocking serial, but we can test the logic) + pts_path = mock_manager.get_pts_path(str(driver.uuid)) + assert pts_path == "/dev/pts/5" + finally: + driver.close() + + +def test_nvdemux_unregisters_on_close(): + """Test that driver unregisters from manager on close.""" + with tempfile.NamedTemporaryFile() as device_file: + with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: + mock_manager = MagicMock() + mock_manager_class.get_instance.return_value = mock_manager + + driver = NVDemuxSerial( + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + target="CCPLEX: 0", + timeout=0.1, + ) + + driver_id = str(driver.uuid) + driver.close() + + # Verify driver unregistered + mock_manager.unregister_driver.assert_called_once_with(driver_id) + + +def 
test_nvdemux_default_values(): + """Test default parameter values.""" + with tempfile.NamedTemporaryFile() as device_file: + with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: + mock_manager = MagicMock() + mock_manager_class.get_instance.return_value = mock_manager + + driver = NVDemuxSerial( + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + timeout=0.1, + ) + + try: + # Check defaults + assert driver.chip == "T264" + assert driver.target == "CCPLEX: 0" + assert driver.baudrate == 115200 + assert driver.poll_interval == 1.0 + finally: + driver.close() + + +def test_nvdemux_registration_error_propagates(): + """Test that registration errors are propagated.""" + with tempfile.NamedTemporaryFile() as device_file: + with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: + mock_manager = MagicMock() + mock_manager_class.get_instance.return_value = mock_manager + mock_manager.register_driver.side_effect = ValueError("Config mismatch") + + try: + _driver = NVDemuxSerial( + demuxer_path="/usr/bin/demuxer", + device=device_file.name, + target="CCPLEX: 0", + timeout=0.1, + ) + raise AssertionError("Should have raised ValueError") + except ValueError as e: + assert "Config mismatch" in str(e) + + +def test_nvdemux_client_class(): + """Test that NVDemuxSerial uses PySerialClient.""" + assert NVDemuxSerial.client() == "jumpstarter_driver_pyserial.client.PySerialClient" diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py new file mode 100644 index 000000000..0cb313962 --- /dev/null +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager.py @@ -0,0 +1,535 @@ +"""Singleton manager for NVIDIA TCU demuxer process. + +Manages a single shared demuxer process that can be accessed by multiple +NVDemuxSerial driver instances. 
Handles process lifecycle, device reconnection, +and distributes pts paths to registered drivers. +""" + +import atexit +import ctypes +import glob +import logging +import os +import signal +import subprocess +import sys +import threading +from dataclasses import dataclass +from typing import Callable, Optional + +logger = logging.getLogger(__name__) + +# Platform detection +_IS_LINUX = sys.platform.startswith("linux") + + +def _get_preexec_fn() -> Callable[[], None] | None: + """Get platform-specific preexec_fn for subprocess. + + On Linux, returns a function that sets PR_SET_PDEATHSIG to SIGTERM, + ensuring the subprocess receives SIGTERM when the parent process dies. + This works even if the parent is killed with SIGKILL. + + On other platforms, returns None. + """ + if not _IS_LINUX: + return None + + def set_pdeathsig(): + """Set parent death signal to SIGTERM via prctl.""" + try: + libc = ctypes.CDLL("libc.so.6", use_errno=True) + PR_SET_PDEATHSIG = 1 + result = libc.prctl(PR_SET_PDEATHSIG, signal.SIGTERM, 0, 0, 0) + if result != 0: + errno = ctypes.get_errno() + logger.warning("prctl(PR_SET_PDEATHSIG) failed with errno %d", errno) + except Exception as e: + logger.warning("Failed to set parent death signal: %s", e) + + return set_pdeathsig + + +def _has_glob_chars(path: str) -> bool: + """Check if path contains glob wildcard characters.""" + return any(c in path for c in ("*", "?", "[")) + + +def _resolve_device(pattern: str) -> str | None: + """Resolve a device path or glob pattern to an actual device path. + + Returns None if no device found. + """ + if _has_glob_chars(pattern): + matches = sorted(glob.glob(pattern)) + if not matches: + return None + if len(matches) > 1: + logger.warning("Multiple devices match pattern '%s': %s. 
Using first: %s", pattern, matches, matches[0]) + return matches[0] + else: + # Direct path - check if exists + if os.path.exists(pattern): + return pattern + return None + + +@dataclass +class DriverInfo: + """Information about a registered driver.""" + + driver_id: str + target: str + + +class DemuxerManager: + """Singleton manager for the NVIDIA TCU demuxer process. + + Manages a single shared demuxer process and distributes pts paths to + multiple driver instances based on their target channels. + """ + + _instance: Optional["DemuxerManager"] = None + _instance_lock = threading.Lock() + _signal_handlers_installed = False + _original_sigterm_handler: signal.Handlers | None = None + _original_sigint_handler: signal.Handlers | None = None + + def __init__(self): + """Private constructor. Use get_instance() instead.""" + self._lock = threading.Lock() + self._drivers: dict[str, DriverInfo] = {} + self._pts_map: dict[str, str] = {} # target -> pts_path + self._ready_targets: set[str] = set() + self._process: Optional[subprocess.Popen] = None + self._monitor_thread: Optional[threading.Thread] = None + self._shutdown = threading.Event() + self._cleanup_done = False + + # Process configuration (must be same for all drivers) + self._demuxer_path: Optional[str] = None + self._device: Optional[str] = None + self._chip: Optional[str] = None + self._poll_interval: float = 1.0 + + # Register atexit handler for cleanup on normal exit + atexit.register(self._atexit_cleanup) + logger.debug("Registered atexit handler for demuxer cleanup") + + # Install signal handlers (only once globally) + self._install_signal_handlers() + + @classmethod + def get_instance(cls) -> "DemuxerManager": + """Get the singleton instance of DemuxerManager.""" + if cls._instance is None: + with cls._instance_lock: + if cls._instance is None: + cls._instance = cls() + return cls._instance + + @classmethod + def reset_instance(cls): + """Reset the singleton instance. 
Used for testing.""" + with cls._instance_lock: + if cls._instance is not None: + # Reset cleanup_done flag before cleanup to allow cleanup to run + cls._instance._cleanup_done = False + cls._instance._cleanup() + cls._instance = None + + def _atexit_cleanup(self): + """Cleanup handler called on normal program exit via atexit.""" + if self._cleanup_done: + return + logger.debug("atexit cleanup triggered") + self._cleanup() + + def _install_signal_handlers(self): + """Install signal handlers for SIGTERM and SIGINT. + + Handlers ensure cleanup is performed before the process terminates. + Only installs handlers once globally, and preserves original handlers. + """ + cls = type(self) + if cls._signal_handlers_installed: + return + + def make_handler(sig: signal.Signals) -> Callable[[int, any], None]: + """Create a signal handler that cleans up and re-raises the signal.""" + + def handler(signum: int, frame): + logger.debug("Signal %s received, cleaning up demuxer process", sig.name) + # Cleanup the demuxer process + if cls._instance is not None: + cls._instance._cleanup() + + # Restore original handler and re-raise signal + if sig == signal.SIGTERM and cls._original_sigterm_handler is not None: + signal.signal(signal.SIGTERM, cls._original_sigterm_handler) + elif sig == signal.SIGINT and cls._original_sigint_handler is not None: + signal.signal(signal.SIGINT, cls._original_sigint_handler) + + # Re-raise the signal to allow normal termination + os.kill(os.getpid(), signum) + + return handler + + try: + # Only install signal handlers from the main thread + if threading.current_thread() is not threading.main_thread(): + logger.debug("Not installing signal handlers from non-main thread") + return + + cls._original_sigterm_handler = signal.signal(signal.SIGTERM, make_handler(signal.SIGTERM)) + cls._original_sigint_handler = signal.signal(signal.SIGINT, make_handler(signal.SIGINT)) + cls._signal_handlers_installed = True + logger.debug("Installed signal handlers for SIGTERM 
and SIGINT") + except Exception as e: + logger.warning("Failed to install signal handlers: %s", e) + + def _validate_config(self, demuxer_path: str, device: str, chip: str, target: str): + """Validate driver configuration against existing drivers. + + Raises: + ValueError: If configuration doesn't match or target is duplicate + """ + if self._demuxer_path != demuxer_path: + raise ValueError(f"Demuxer path mismatch: existing={self._demuxer_path}, new={demuxer_path}") + if self._device != device: + raise ValueError(f"Device mismatch: existing={self._device}, new={device}") + if self._chip != chip: + raise ValueError(f"Chip mismatch: existing={self._chip}, new={chip}") + + # Check for duplicate target + for existing_driver in self._drivers.values(): + if existing_driver.target == target: + raise ValueError(f"Target '{target}' already registered by another driver") + + def register_driver( + self, + driver_id: str, + demuxer_path: str, + device: str, + chip: str, + target: str, + poll_interval: float = 1.0, + ) -> None: + """Register a driver instance with the manager. 
+ + Args: + driver_id: Unique identifier for the driver + demuxer_path: Path to nv_tcu_demuxer binary + device: Device path or glob pattern + chip: Chip type (T234 or T264) + target: Target channel (e.g., "CCPLEX: 0") + poll_interval: Polling interval for device reconnection + + Raises: + ValueError: If configuration doesn't match existing process + """ + with self._lock: + # Validate configuration matches existing process + if self._drivers: + self._validate_config(demuxer_path, device, chip, target) + else: + # First driver - set process configuration + self._demuxer_path = demuxer_path + self._device = device + self._chip = chip + self._poll_interval = poll_interval + + # Register the driver + driver_info = DriverInfo(driver_id=driver_id, target=target) + self._drivers[driver_id] = driver_info + + logger.debug("Registered driver %s for target '%s'", driver_id, target) + + # Start monitor thread only once + if not self._monitor_thread or not self._monitor_thread.is_alive(): + self._start_monitor() + + def unregister_driver(self, driver_id: str) -> None: + """Unregister a driver instance. + + Args: + driver_id: Unique identifier for the driver + """ + with self._lock: + if driver_id in self._drivers: + target = self._drivers[driver_id].target + del self._drivers[driver_id] + logger.debug("Unregistered driver %s (target: %s)", driver_id, target) + + # Keep monitor running even if no drivers remain + + def get_pts_path(self, driver_id: str) -> str | None: + """Get the pts path for a registered driver. + + Args: + driver_id: Unique identifier for the driver + + Returns: + The pts path or None if not available + """ + with self._lock: + if driver_id not in self._drivers: + return None + target = self._drivers[driver_id].target + return self._pts_map.get(target) + + def is_ready(self, target: str) -> bool: + """Check if a target is ready. 
+ + Args: + target: Target channel to check + + Returns: + True if the target is ready + """ + with self._lock: + return target in self._ready_targets + + def _start_monitor(self): + """Start the monitor thread.""" + self._shutdown.clear() + self._monitor_thread = threading.Thread( + target=self._monitor_loop, daemon=True, name="DemuxerManager-monitor" + ) + self._monitor_thread.start() + logger.debug("Started demuxer monitor thread") + + def _stop_monitor(self): + """Stop the monitor thread. + + This method is idempotent - safe to call multiple times. + """ + self._shutdown.set() + + # Terminate process if running + process = self._process + if process is not None: + logger.debug("Terminating demuxer process (PID %s)...", process.pid) + try: + # First try graceful termination + process.terminate() + try: + process.wait(timeout=5.0) + logger.debug("Demuxer process terminated gracefully") + except subprocess.TimeoutExpired: + # Force kill if it doesn't respond + logger.warning("Demuxer process did not terminate, killing...") + process.kill() + process.wait(timeout=2.0) + logger.debug("Demuxer process killed") + except ProcessLookupError: + # Process already dead + logger.debug("Demuxer process already exited") + except Exception as e: + logger.error("Error terminating demuxer process: %s", e) + finally: + self._process = None + + # Wait for monitor thread to exit + monitor_thread = self._monitor_thread + if monitor_thread is not None and monitor_thread.is_alive(): + # Don't join if we're being called from the monitor thread itself + if threading.current_thread() is not monitor_thread: + monitor_thread.join(timeout=2.0) + if monitor_thread.is_alive(): + logger.warning("Monitor thread did not exit within timeout") + self._monitor_thread = None + + logger.debug("Stopped demuxer monitor") + + def _cleanup(self): + """Clean up resources. + + This method is idempotent - safe to call multiple times. + Ensures the demuxer process is terminated on program exit. 
+ """ + if self._cleanup_done: + logger.debug("Cleanup already done, skipping") + return + + logger.debug("Cleaning up DemuxerManager resources") + self._cleanup_done = True + + self._stop_monitor() + + with self._lock: + self._drivers.clear() + self._pts_map.clear() + self._ready_targets.clear() + + logger.info("DemuxerManager cleanup complete") + + def _monitor_loop(self): + """Background thread that manages demuxer lifecycle and auto-recovery.""" + while not self._shutdown.is_set(): + try: + self._run_demuxer_cycle() + except Exception as e: + logger.error("Error in demuxer monitor loop: %s", e) + # Clear ready state on error + with self._lock: + self._pts_map.clear() + self._ready_targets.clear() + # Always wait for poll interval before retrying + if self._shutdown.wait(timeout=self._poll_interval): + break + + def _run_demuxer_cycle(self): + """Run one cycle of: find device -> start demuxer -> monitor until exit.""" + # Wait for device to appear + resolved_device = self._wait_for_device() + if not resolved_device or self._shutdown.is_set(): + return + + # Start demuxer process + if not self._start_demuxer_process(resolved_device): + self._shutdown.wait(timeout=self._poll_interval) + return + + # Read and parse demuxer output (stdout and stderr concurrently) + stderr_thread = threading.Thread(target=self._read_demuxer_stderr, daemon=True) + stderr_thread.start() + + self._read_demuxer_output() + + # Wait for stderr thread to finish + stderr_thread.join(timeout=1.0) + + # Cleanup process + self._cleanup_demuxer_process() + + if not self._shutdown.is_set(): + logger.info("Device disconnected, will poll for reconnection...") + + def _wait_for_device(self) -> str | None: + """Wait for device to appear. 
Returns resolved device path or None if shutdown.""" + while not self._shutdown.is_set(): + resolved_device = _resolve_device(self._device) + if resolved_device: + logger.debug("Found device: %s", resolved_device) + return resolved_device + logger.debug("Device not found, polling... (pattern: %s)", self._device) + if self._shutdown.wait(timeout=self._poll_interval): + return None + return None + + def _start_demuxer_process(self, device: str) -> bool: + """Start the demuxer process. Returns True on success. + + On Linux, uses prctl(PR_SET_PDEATHSIG) to ensure the subprocess + receives SIGTERM when the parent dies (including kill -9). + """ + cmd = [self._demuxer_path, "-m", self._chip, "-d", device] + logger.debug("Starting demuxer: %s", " ".join(cmd)) + + # Get platform-specific preexec_fn (Linux: set parent death signal) + preexec_fn = _get_preexec_fn() + + try: + self._process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, # Line buffered + preexec_fn=preexec_fn, + ) + logger.debug("Demuxer process started with PID %d", self._process.pid) + return True + except (FileNotFoundError, PermissionError) as e: + logger.error("Failed to start demuxer: %s", e) + return False + + def _parse_demuxer_line(self, line: str) -> tuple[str | None, str | None]: + """Parse a demuxer output line to extract pts path and target. 
+ + Returns: + Tuple of (pts_path, target) or (None, None) if not found + """ + parts = line.split("\t") + if len(parts) < 2: + return None, None + + # First part is the pts path, second part is the target name + pts_path = parts[0].strip() if parts[0].startswith("/dev/") else None + target = parts[1].strip() if len(parts) >= 2 else None + + return pts_path, target + + def _read_demuxer_stderr(self): + """Read demuxer stderr and check for catastrophic errors.""" + try: + for line in iter(self._process.stderr.readline, ""): + if self._shutdown.is_set(): + break + + line = line.strip() + if not line: + continue + + logger.warning("Demuxer stderr: %s", line) + + # Check for catastrophic file lock error + if "ERROR: unable to obtain file lock" in line: + logger.critical( + "Demuxer file lock error detected. Another instance may be running. " + "Terminating exporter to prevent conflicts." + ) + # Force immediate process termination + os._exit(1) + + except Exception as e: + logger.error("Error reading demuxer stderr: %s", e) + + def _read_demuxer_output(self): + """Read demuxer stdout and parse all pts paths.""" + try: + for line in iter(self._process.stdout.readline, ""): + if self._shutdown.is_set(): + break + + line = line.strip() + if not line: + continue + + logger.debug("Demuxer output: %s", line) + + # Parse line format: "\t" + pts_path, target = self._parse_demuxer_line(line) + + if pts_path and target: + logger.debug("Found pts path for target '%s': %s", target, pts_path) + + with self._lock: + self._pts_map[target] = pts_path + self._ready_targets.add(target) + + except Exception as e: + logger.error("Error reading demuxer output: %s", e) + + # Clear state when process ends + with self._lock: + self._pts_map.clear() + self._ready_targets.clear() + + def _cleanup_demuxer_process(self): + """Clean up the demuxer process and clear ready state.""" + if self._process: + try: + self._process.wait(timeout=1.0) + except subprocess.TimeoutExpired: + 
self._process.kill() + self._process.wait() + + exit_code = self._process.returncode + logger.info("Demuxer process exited with code %s", exit_code) + self._process = None + + with self._lock: + self._pts_map.clear() + self._ready_targets.clear() diff --git a/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py new file mode 100644 index 000000000..cc3a9e727 --- /dev/null +++ b/packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/manager_test.py @@ -0,0 +1,477 @@ +"""Tests for DemuxerManager.""" + +import os +import tempfile +import time +from unittest.mock import patch + +from .manager import DemuxerManager, _has_glob_chars, _resolve_device + + +def test_has_glob_chars(): + """Test glob character detection.""" + assert _has_glob_chars("/dev/ttyUSB*") is True + assert _has_glob_chars("/dev/serial/by-id/usb-NVIDIA_*-if01") is True + assert _has_glob_chars("/dev/tty[0-9]") is True + assert _has_glob_chars("/dev/ttyUSB?") is True + assert _has_glob_chars("/dev/ttyUSB0") is False + assert _has_glob_chars("/dev/serial/by-id/usb-NVIDIA_ABC123-if01") is False + + +def test_resolve_device_direct_path(): + """Test device resolution with direct path.""" + with tempfile.NamedTemporaryFile() as f: + result = _resolve_device(f.name) + assert result == f.name + + +def test_resolve_device_nonexistent(): + """Test device resolution with non-existent path.""" + result = _resolve_device("/dev/nonexistent_device_12345") + assert result is None + + +def test_resolve_device_glob_pattern(): + """Test device resolution with glob pattern.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Create some test files + for name in ["device_001", "device_002", "device_003"]: + open(os.path.join(tmpdir, name), "w").close() + + pattern = os.path.join(tmpdir, "device_*") + result = _resolve_device(pattern) + + # Should return first match (sorted) + 
class MockStderr:
    """Scripted stand-in for a process's stderr stream.

    ``readline()`` returns the scripted lines one at a time, each with a
    trailing newline, and returns ``""`` once they are used up or as soon as
    the optional ``terminated_callback`` reports a true value.
    """

    def __init__(self, stderr_lines=None, terminated_callback=None):
        self._terminated_callback = terminated_callback
        self._line_index = 0
        self.stderr_lines = stderr_lines or []

    def readline(self):
        """Return the next scripted line, or ``""`` when finished or terminated."""
        is_terminated = self._terminated_callback
        if is_terminated and is_terminated():
            return ""

        position = self._line_index
        if position >= len(self.stderr_lines):
            return ""

        self._line_index = position + 1
        return self.stderr_lines[position] + "\n"


class MockPopen:
    """Stand-in for ``subprocess.Popen`` that replays scripted stdout lines.

    The instance acts as its own ``stdout`` stream, exposes a ``MockStderr`` as
    ``stderr``, and takes its ``pid`` from a class-wide counter so every mock
    process gets a distinct one.
    """

    _next_pid = 1000

    def __init__(self, stdout_lines, returncode=0, delay_per_line=0.01, block_after_lines=False, stderr_lines=None):
        self._terminated = False
        self._line_index = 0
        self.stdout_lines = stdout_lines
        self.returncode = returncode
        self.delay_per_line = delay_per_line
        self.block_after_lines = block_after_lines

        # The object is its own stdout: readline() below is the stream read.
        self.stdout = self
        # stderr reports EOF as soon as this process is marked terminated.
        self.stderr = MockStderr(stderr_lines=stderr_lines, terminated_callback=lambda: self._terminated)

        # Hand out the next mock PID and advance the shared counter.
        self.pid = MockPopen._next_pid
        MockPopen._next_pid += 1

    def readline(self):
        """Return the next stdout line after ``delay_per_line`` seconds, or ``""`` at end of stream."""
        if self._terminated:
            return ""

        if self._line_index < len(self.stdout_lines):
            time.sleep(self.delay_per_line)
            current = self.stdout_lines[self._line_index]
            self._line_index += 1
            return current + "\n"

        if self.block_after_lines:
            # Simulate a long-running process: hold EOF back until terminated.
            while not self._terminated:
                time.sleep(0.01)
        return ""

    def wait(self, timeout=None):
        """Return the configured return code at once; ``timeout`` is ignored."""
        return self.returncode

    def terminate(self):
        """Mark the process as terminated so pending and future reads return ``""``."""
        self._terminated = True

    def kill(self):
        """Mark the process as terminated, exactly like ``terminate()``."""
        self._terminated = True
def test_single_driver_registration():
    """Register one driver and check that the pts path for its target is reported."""
    DemuxerManager.reset_instance()

    popen_path = "jumpstarter_driver_pyserial.nvdemux.manager.subprocess.Popen"
    demuxer_lines = ["/dev/pts/5\tCCPLEX: 0", "/dev/pts/6\tBPMP: 1"]

    with tempfile.NamedTemporaryFile() as device_file, patch(popen_path) as mock_popen:
        # The mock emits both mappings and then blocks until it is terminated.
        mock_popen.return_value = MockPopen(demuxer_lines, block_after_lines=True)
        manager = DemuxerManager.get_instance()

        manager.register_driver(
            target="CCPLEX: 0",
            chip="T264",
            device=device_file.name,
            demuxer_path="/usr/bin/demuxer",
            driver_id="driver1",
        )

        # Leave time for the mock demuxer output to be consumed.
        time.sleep(0.5)

        assert manager.get_pts_path("driver1") == "/dev/pts/5"

        # Tear down so the next test starts from a fresh singleton.
        manager.unregister_driver("driver1")
        DemuxerManager.reset_instance()
def test_config_validation_demuxer_path_mismatch():
    """A second driver that names a different demuxer_path is refused with ValueError."""
    DemuxerManager.reset_instance()

    popen_path = "jumpstarter_driver_pyserial.nvdemux.manager.subprocess.Popen"

    with tempfile.NamedTemporaryFile() as device_file, patch(popen_path) as mock_popen:
        mock_popen.return_value = MockPopen(["/dev/pts/5\tCCPLEX: 0"], block_after_lines=True)
        manager = DemuxerManager.get_instance()

        def register(driver_id, demuxer_path, target):
            manager.register_driver(
                driver_id=driver_id,
                demuxer_path=demuxer_path,
                device=device_file.name,
                chip="T264",
                target=target,
            )

        # The first registration fixes the demuxer binary for the manager.
        register("driver1", "/usr/bin/demuxer", "CCPLEX: 0")

        # Same device and chip, different binary: must be rejected.
        try:
            register("driver2", "/opt/nvidia/demuxer", "BPMP: 1")
        except ValueError as err:
            assert "Demuxer path mismatch" in str(err)
        else:
            raise AssertionError("Should have raised ValueError")

        # Tear down so the next test starts from a fresh singleton.
        manager.unregister_driver("driver1")
        DemuxerManager.reset_instance()
def test_config_validation_chip_mismatch():
    """A second driver that names a different chip is refused with ValueError."""
    DemuxerManager.reset_instance()

    popen_path = "jumpstarter_driver_pyserial.nvdemux.manager.subprocess.Popen"

    with tempfile.NamedTemporaryFile() as device_file, patch(popen_path) as mock_popen:
        mock_popen.return_value = MockPopen(["/dev/pts/5\tCCPLEX: 0"], block_after_lines=True)
        manager = DemuxerManager.get_instance()

        def register(driver_id, chip, target):
            manager.register_driver(
                driver_id=driver_id,
                demuxer_path="/usr/bin/demuxer",
                device=device_file.name,
                chip=chip,
                target=target,
            )

        # The first registration fixes the chip type for the manager.
        register("driver1", "T264", "CCPLEX: 0")

        # Same binary and device, different chip: must be rejected.
        try:
            register("driver2", "T234", "BPMP: 1")
        except ValueError as err:
            assert "Chip mismatch" in str(err)
        else:
            raise AssertionError("Should have raised ValueError")

        # Tear down so the next test starts from a fresh singleton.
        manager.unregister_driver("driver1")
        DemuxerManager.reset_instance()
def test_reference_counting():
    """The demuxer is launched once and is not terminated when drivers unregister.

    Checks that no process exists before the first registration, that a second
    registration does not launch another process, and that the mock process is
    still not terminated after one, and then both, drivers have unregistered.
    """
    DemuxerManager.reset_instance()

    popen_path = "jumpstarter_driver_pyserial.nvdemux.manager.subprocess.Popen"
    demuxer_lines = ["/dev/pts/5\tCCPLEX: 0", "/dev/pts/6\tBPMP: 1"]

    with tempfile.NamedTemporaryFile() as device_file, patch(popen_path) as mock_popen:
        mock_proc = MockPopen(demuxer_lines, block_after_lines=True)
        mock_popen.return_value = mock_proc
        manager = DemuxerManager.get_instance()

        def register(driver_id, target):
            manager.register_driver(
                driver_id=driver_id,
                demuxer_path="/usr/bin/demuxer",
                device=device_file.name,
                chip="T264",
                target=target,
            )

        # Nothing is running before anyone registers.
        assert manager._process is None

        # First registration launches the demuxer.
        register("driver1", "CCPLEX: 0")
        time.sleep(0.2)
        assert mock_popen.call_count == 1

        # Second registration reuses it: still a single Popen call.
        register("driver2", "BPMP: 1")
        time.sleep(0.2)
        assert mock_popen.call_count == 1

        # Dropping one driver leaves the process alone.
        manager.unregister_driver("driver1")
        time.sleep(0.2)
        assert not mock_proc._terminated

        # Dropping the last driver also leaves it alone.
        manager.unregister_driver("driver2")
        time.sleep(0.2)
        assert not mock_proc._terminated

        # Tear down so the next test starts from a fresh singleton.
        DemuxerManager.reset_instance()


def test_pts_path_available_for_ready_target():
    """A driver registered for an already-announced target gets its pts path without waiting."""
    DemuxerManager.reset_instance()

    popen_path = "jumpstarter_driver_pyserial.nvdemux.manager.subprocess.Popen"
    demuxer_lines = ["/dev/pts/5\tCCPLEX: 0", "/dev/pts/6\tBPMP: 1"]

    with tempfile.NamedTemporaryFile() as device_file, patch(popen_path) as mock_popen:
        mock_popen.return_value = MockPopen(demuxer_lines, block_after_lines=True)
        manager = DemuxerManager.get_instance()

        def register(driver_id, target):
            manager.register_driver(
                driver_id=driver_id,
                demuxer_path="/usr/bin/demuxer",
                device=device_file.name,
                chip="T264",
                target=target,
            )

        register("driver1", "CCPLEX: 0")

        # Leave time for both mappings to be read from the mock demuxer.
        time.sleep(0.5)
        assert manager.get_pts_path("driver1") == "/dev/pts/5"

        # The BPMP mapping was announced above, so no further sleep is needed.
        register("driver2", "BPMP: 1")
        assert manager.get_pts_path("driver2") == "/dev/pts/6"

        # Tear down so the next test starts from a fresh singleton.
        manager.unregister_driver("driver1")
        manager.unregister_driver("driver2")
        DemuxerManager.reset_instance()