Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions packages/jumpstarter-driver-pyserial/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,108 @@ export:
| check_present | Check if the serial port exists during exporter initialization, disable if you are connecting to a dynamically created port (i.e. USB from your DUT) | bool | no | True |
| cps | Characters per second throttling limit. When set, data transmission will be throttled to simulate slow typing. Useful for devices that can't handle fast input | float | no | None |

## NVDemuxSerial Driver

The `NVDemuxSerial` driver provides serial access to NVIDIA Tegra demultiplexed UART channels using the [nv_tcu_demuxer](https://docs.nvidia.com/jetson/archives/r38.2.1/DeveloperGuide/AT/JetsonLinuxDevelopmentTools/TegraCombinedUART.html) tool. It automatically handles device reconnection when the target device restarts.

The nv_tcu_demuxer tool can be obtained from the NVIDIA Jetson BSP, at this path: `Linux_for_Tegra/tools/demuxer/nv_tcu_demuxer`.

### Multi-Instance Support

Multiple driver instances can share a single demuxer process by specifying different target channels. This allows simultaneous access to multiple UART channels (CCPLEX, BPMP, SCE, etc.) from the same physical device.

### Configuration

#### Single channel example:

```yaml
export:
ccplex:
type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial
config:
demuxer_path: "/opt/nvidia/nv_tcu_demuxer"
# device defaults to auto-detect NVIDIA Tegra On-Platform Operator
# chip defaults to T264 (Thor), use T234 for Orin
```

#### Multiple channels example:

```yaml
export:
ccplex:
type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial
config:
demuxer_path: "/opt/nvidia/nv_tcu_demuxer"
target: "CCPLEX: 0"
chip: "T264"

bpmp:
type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial
config:
demuxer_path: "/opt/nvidia/nv_tcu_demuxer"
target: "BPMP: 1"
chip: "T264"

sce:
type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial
config:
demuxer_path: "/opt/nvidia/nv_tcu_demuxer"
target: "SCE: 2"
chip: "T264"
```

### Config parameters

| Parameter | Description | Type | Required | Default |
| -------------- | ----------------------------------------------------------------------------------------------- | ----- | -------- | ------------------------------------------------------------------------- |
| demuxer_path | Path to the `nv_tcu_demuxer` binary | str | yes | |
| device | Device path or glob pattern for auto-detection | str | no | `/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01` |
| target | Target channel to extract from demuxer output | str | no | `CCPLEX: 0` |
| chip | Chip type for demuxer (`T234` for Orin, `T264` for Thor) | str | no | `T264` |
| baudrate | Baud rate for the serial connection | int | no | 115200 |
| cps | Characters per second throttling limit | float | no | None |
| timeout | Timeout in seconds waiting for demuxer to detect pts | float | no | 10.0 |
| poll_interval | Interval in seconds to poll for device reappearance after disconnect | float | no | 1.0 |

### Device Auto-Detection

The `device` parameter supports glob patterns for automatic device discovery:

```yaml
# Auto-detect any NVIDIA Tegra On-Platform Operator device (default)
device: "/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01"

# Specific serial number
device: "/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_ABC123-if01"

# Direct device path (no glob)
device: "/dev/ttyUSB0"
```

### Auto-Recovery

When the target device restarts (e.g., power cycle), the serial device disappears and the demuxer exits. The driver automatically:

1. Detects the device disconnection
2. Polls for the device to reappear
3. Restarts the demuxer with the new device
4. Discovers the new pts path (which changes on each restart)

Active connections will receive errors when the device disconnects. Clients should reconnect, and the driver will wait for the device to be available again.

### Configuration Validation / Limitations

When using multiple driver instances, all instances must have compatible configurations:

- **demuxer_path**: Must be identical across all instances
- **device**: Must be identical across all instances
- **chip**: Must be identical across all instances
- **target**: Must be unique for each instance (no duplicates allowed)

If these requirements are not met, the driver will raise a `ValueError` during initialization.



## CLI Commands

The pyserial driver provides two CLI commands for interacting with serial ports:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Optional

from anyio import sleep
from anyio._backends._asyncio import StreamReaderWrapper, StreamWriterWrapper
from serial_asyncio import open_serial_connection

from ..driver import AsyncSerial
from .manager import DemuxerManager
from jumpstarter.driver import Driver, exportstream

# Default glob pattern for NVIDIA Tegra On-Platform Operator devices
NV_DEVICE_PATTERN = "/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01"


@dataclass(kw_only=True)
class NVDemuxSerial(Driver):
"""Serial driver for NVIDIA TCU demultiplexed UART channels.

This driver wraps the nv_tcu_demuxer tool to extract a specific demultiplexed
UART channel (like CCPLEX) from a multiplexed serial device. Multiple driver
instances can share the same demuxer process by specifying different targets.

Args:
demuxer_path: Path to the nv_tcu_demuxer binary
device: Device path or glob pattern for auto-detection.
Default: /dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01
target: Target channel to extract (e.g., "CCPLEX: 0", "BPMP: 1")
chip: Chip type for demuxer (T234 for Orin, T264 for Thor)
baudrate: Baud rate for the serial connection
cps: Characters per second throttling (optional)
timeout: Timeout waiting for demuxer to detect pts
poll_interval: Interval to poll for device reappearance after disconnect

Note:
Multiple instances can be created with different targets. All instances
must use the same demuxer_path, device, and chip configuration.
"""

demuxer_path: str
device: str = field(default=NV_DEVICE_PATTERN)
target: str = field(default="CCPLEX: 0")
chip: str = field(default="T264")
baudrate: int = field(default=115200)
cps: Optional[float] = field(default=None)
timeout: float = field(default=10.0)
poll_interval: float = field(default=1.0)

# Internal state (not init params)
_registered: bool = field(init=False, default=False)

def __post_init__(self):
if hasattr(super(), "__post_init__"):
super().__post_init__()

# Register with the DemuxerManager
manager = DemuxerManager.get_instance()
try:
manager.register_driver(
driver_id=str(self.uuid),
demuxer_path=self.demuxer_path,
device=self.device,
chip=self.chip,
target=self.target,
poll_interval=self.poll_interval,
)
self._registered = True
except ValueError as e:
self.logger.error("Failed to register with DemuxerManager: %s", e)
raise


@classmethod
def client(cls) -> str:
return "jumpstarter_driver_pyserial.client.PySerialClient"

def close(self):
"""Unregister from the DemuxerManager."""
if self._registered:
manager = DemuxerManager.get_instance()
manager.unregister_driver(str(self.uuid))
self._registered = False

super().close()

@exportstream
@asynccontextmanager
async def connect(self):
"""Connect to the demultiplexed serial port.

Waits for the demuxer to be ready (device connected and pts path discovered)
before opening the serial connection.
"""
# Poll for pts path until available or timeout
manager = DemuxerManager.get_instance()
pts_start = time.monotonic()
pts_path = manager.get_pts_path(str(self.uuid))
while not pts_path:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come ready is already set but pts is still not available?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hehe, that was not obvious for Claude either, I had to add this.

Because the manager can be ready (it stays up, ready to launch, start, restart the muxer), but the PTS could not be ready (the muxer just went offline, so no ports...) :)

This became important when I started powering off... trying to connect
ir power cycling and trying to connect too quickly, now it just wait until the muxer is up again and the PTS is ready :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You were actually right, I was confused by previous implementation, the ready event, and this was redundant, in the end I removed the callback and just kept the polling mechanism of this loop.

elapsed = time.monotonic() - pts_start
if elapsed >= self.timeout:
raise TimeoutError(
f"Timeout waiting for demuxer to become ready (device pattern: {self.device})"
)
await sleep(0.1)
pts_path = manager.get_pts_path(str(self.uuid))

cps_info = f", cps: {self.cps}" if self.cps is not None else ""
self.logger.info("Connecting to %s at %s, baudrate: %d%s", self.target, pts_path, self.baudrate, cps_info)

reader, writer = await open_serial_connection(url=pts_path, baudrate=self.baudrate, limit=1)
writer.transport.set_write_buffer_limits(high=4096, low=0)
async with AsyncSerial(
reader=StreamReaderWrapper(reader),
writer=StreamWriterWrapper(writer),
cps=self.cps,
) as stream:
yield stream
self.logger.info("Disconnected from %s", pts_path)
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""Tests for NVDemuxSerial driver."""

import tempfile
from unittest.mock import MagicMock, patch

from .driver import NVDemuxSerial


def test_nvdemux_registration():
"""Test that driver registers with DemuxerManager on init."""
with tempfile.NamedTemporaryFile() as device_file:
with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class:
mock_manager = MagicMock()
mock_manager_class.get_instance.return_value = mock_manager

driver = NVDemuxSerial(
demuxer_path="/usr/bin/demuxer",
device=device_file.name,
target="CCPLEX: 0",
chip="T264",
timeout=0.1,
)

try:
# Verify driver registered with manager
mock_manager.register_driver.assert_called_once()
call_kwargs = mock_manager.register_driver.call_args[1]
assert call_kwargs["driver_id"] == str(driver.uuid)
assert call_kwargs["demuxer_path"] == "/usr/bin/demuxer"
assert call_kwargs["device"] == device_file.name
assert call_kwargs["chip"] == "T264"
assert call_kwargs["target"] == "CCPLEX: 0"
finally:
driver.close()


def test_nvdemux_gets_pts_from_manager():
"""Test that connect() gets pts path from manager."""
with tempfile.NamedTemporaryFile() as device_file:
with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class:
mock_manager = MagicMock()
mock_manager_class.get_instance.return_value = mock_manager
mock_manager.get_pts_path.return_value = "/dev/pts/5"

driver = NVDemuxSerial(
demuxer_path="/usr/bin/demuxer",
device=device_file.name,
target="CCPLEX: 0",
timeout=0.1,
)

try:
# Should call get_pts_path when checking pts availability
# (We can't test connect() easily without mocking serial, but we can test the logic)
pts_path = mock_manager.get_pts_path(str(driver.uuid))
assert pts_path == "/dev/pts/5"
finally:
driver.close()


def test_nvdemux_unregisters_on_close():
"""Test that driver unregisters from manager on close."""
with tempfile.NamedTemporaryFile() as device_file:
with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class:
mock_manager = MagicMock()
mock_manager_class.get_instance.return_value = mock_manager

driver = NVDemuxSerial(
demuxer_path="/usr/bin/demuxer",
device=device_file.name,
target="CCPLEX: 0",
timeout=0.1,
)

driver_id = str(driver.uuid)
driver.close()

# Verify driver unregistered
mock_manager.unregister_driver.assert_called_once_with(driver_id)


def test_nvdemux_default_values():
"""Test default parameter values."""
with tempfile.NamedTemporaryFile() as device_file:
with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class:
mock_manager = MagicMock()
mock_manager_class.get_instance.return_value = mock_manager

driver = NVDemuxSerial(
demuxer_path="/usr/bin/demuxer",
device=device_file.name,
timeout=0.1,
)

try:
# Check defaults
assert driver.chip == "T264"
assert driver.target == "CCPLEX: 0"
assert driver.baudrate == 115200
assert driver.poll_interval == 1.0
finally:
driver.close()


def test_nvdemux_registration_error_propagates():
"""Test that registration errors are propagated."""
with tempfile.NamedTemporaryFile() as device_file:
with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class:
mock_manager = MagicMock()
mock_manager_class.get_instance.return_value = mock_manager
mock_manager.register_driver.side_effect = ValueError("Config mismatch")

try:
_driver = NVDemuxSerial(
demuxer_path="/usr/bin/demuxer",
device=device_file.name,
target="CCPLEX: 0",
timeout=0.1,
)
raise AssertionError("Should have raised ValueError")
except ValueError as e:
assert "Config mismatch" in str(e)


def test_nvdemux_client_class():
"""Test that NVDemuxSerial uses PySerialClient."""
assert NVDemuxSerial.client() == "jumpstarter_driver_pyserial.client.PySerialClient"
Loading
Loading