This repository was archived by the owner on Jan 23, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 21
Nvdemux serial support #798
Merged
Merged
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
87221e1
Add nvdemux support to serial
mangelajo 9a42401
nvdemux: support multiple instances
mangelajo 8bf0a2e
nvdemux: add instructions to obtain nv_tcu_demuxer
mangelajo 52b2eb2
nvdemux: avoid callbacks within lock
mangelajo e7c4056
nvdemux: Do not stop the monitor and retry after reboots
mangelajo b544d27
nvdemux: improvements
mangelajo 45fa1a1
nvdemux: user anyio.Event
mangelajo cef414c
nvdemux: remove manager callback altogether
mangelajo d95d5c8
nvdemux: poll quicker for the device
mangelajo 856392e
Merge branch 'main' into nvdemux-serial
NickCao File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
120 changes: 120 additions & 0 deletions
120
packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,120 @@ | ||
| import time | ||
| from contextlib import asynccontextmanager | ||
| from dataclasses import dataclass, field | ||
| from typing import Optional | ||
|
|
||
| from anyio import sleep | ||
| from anyio._backends._asyncio import StreamReaderWrapper, StreamWriterWrapper | ||
| from serial_asyncio import open_serial_connection | ||
|
|
||
| from ..driver import AsyncSerial | ||
| from .manager import DemuxerManager | ||
| from jumpstarter.driver import Driver, exportstream | ||
|
|
||
| # Default glob pattern for NVIDIA Tegra On-Platform Operator devices | ||
| NV_DEVICE_PATTERN = "/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01" | ||
|
|
||
|
|
||
| @dataclass(kw_only=True) | ||
| class NVDemuxSerial(Driver): | ||
| """Serial driver for NVIDIA TCU demultiplexed UART channels. | ||
|
|
||
| This driver wraps the nv_tcu_demuxer tool to extract a specific demultiplexed | ||
| UART channel (like CCPLEX) from a multiplexed serial device. Multiple driver | ||
| instances can share the same demuxer process by specifying different targets. | ||
|
|
||
| Args: | ||
| demuxer_path: Path to the nv_tcu_demuxer binary | ||
| device: Device path or glob pattern for auto-detection. | ||
| Default: /dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01 | ||
| target: Target channel to extract (e.g., "CCPLEX: 0", "BPMP: 1") | ||
| chip: Chip type for demuxer (T234 for Orin, T264 for Thor) | ||
| baudrate: Baud rate for the serial connection | ||
| cps: Characters per second throttling (optional) | ||
| timeout: Timeout waiting for demuxer to detect pts | ||
| poll_interval: Interval to poll for device reappearance after disconnect | ||
|
|
||
| Note: | ||
| Multiple instances can be created with different targets. All instances | ||
| must use the same demuxer_path, device, and chip configuration. | ||
| """ | ||
|
|
||
| demuxer_path: str | ||
| device: str = field(default=NV_DEVICE_PATTERN) | ||
| target: str = field(default="CCPLEX: 0") | ||
| chip: str = field(default="T264") | ||
| baudrate: int = field(default=115200) | ||
| cps: Optional[float] = field(default=None) | ||
| timeout: float = field(default=10.0) | ||
| poll_interval: float = field(default=1.0) | ||
|
|
||
| # Internal state (not init params) | ||
| _registered: bool = field(init=False, default=False) | ||
|
|
||
| def __post_init__(self): | ||
| if hasattr(super(), "__post_init__"): | ||
| super().__post_init__() | ||
|
|
||
| # Register with the DemuxerManager | ||
| manager = DemuxerManager.get_instance() | ||
| try: | ||
| manager.register_driver( | ||
| driver_id=str(self.uuid), | ||
| demuxer_path=self.demuxer_path, | ||
| device=self.device, | ||
| chip=self.chip, | ||
| target=self.target, | ||
| poll_interval=self.poll_interval, | ||
| ) | ||
| self._registered = True | ||
| except ValueError as e: | ||
| self.logger.error("Failed to register with DemuxerManager: %s", e) | ||
| raise | ||
|
|
||
|
|
||
| @classmethod | ||
| def client(cls) -> str: | ||
| return "jumpstarter_driver_pyserial.client.PySerialClient" | ||
|
|
||
| def close(self): | ||
| """Unregister from the DemuxerManager.""" | ||
| if self._registered: | ||
| manager = DemuxerManager.get_instance() | ||
| manager.unregister_driver(str(self.uuid)) | ||
| self._registered = False | ||
|
|
||
| super().close() | ||
|
|
||
| @exportstream | ||
| @asynccontextmanager | ||
| async def connect(self): | ||
| """Connect to the demultiplexed serial port. | ||
|
|
||
| Waits for the demuxer to be ready (device connected and pts path discovered) | ||
| before opening the serial connection. | ||
| """ | ||
| # Poll for pts path until available or timeout | ||
| manager = DemuxerManager.get_instance() | ||
| pts_start = time.monotonic() | ||
| pts_path = manager.get_pts_path(str(self.uuid)) | ||
| while not pts_path: | ||
| elapsed = time.monotonic() - pts_start | ||
| if elapsed >= self.timeout: | ||
| raise TimeoutError( | ||
| f"Timeout waiting for demuxer to become ready (device pattern: {self.device})" | ||
| ) | ||
| await sleep(0.1) | ||
| pts_path = manager.get_pts_path(str(self.uuid)) | ||
|
|
||
| cps_info = f", cps: {self.cps}" if self.cps is not None else "" | ||
| self.logger.info("Connecting to %s at %s, baudrate: %d%s", self.target, pts_path, self.baudrate, cps_info) | ||
|
|
||
| reader, writer = await open_serial_connection(url=pts_path, baudrate=self.baudrate, limit=1) | ||
| writer.transport.set_write_buffer_limits(high=4096, low=0) | ||
| async with AsyncSerial( | ||
| reader=StreamReaderWrapper(reader), | ||
| writer=StreamWriterWrapper(writer), | ||
| cps=self.cps, | ||
| ) as stream: | ||
| yield stream | ||
| self.logger.info("Disconnected from %s", pts_path) | ||
127 changes: 127 additions & 0 deletions
127
packages/jumpstarter-driver-pyserial/jumpstarter_driver_pyserial/nvdemux/driver_test.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,127 @@ | ||
| """Tests for NVDemuxSerial driver.""" | ||
|
|
||
| import tempfile | ||
| from unittest.mock import MagicMock, patch | ||
|
|
||
| from .driver import NVDemuxSerial | ||
|
|
||
|
|
||
| def test_nvdemux_registration(): | ||
| """Test that driver registers with DemuxerManager on init.""" | ||
| with tempfile.NamedTemporaryFile() as device_file: | ||
| with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: | ||
| mock_manager = MagicMock() | ||
| mock_manager_class.get_instance.return_value = mock_manager | ||
|
|
||
| driver = NVDemuxSerial( | ||
| demuxer_path="/usr/bin/demuxer", | ||
| device=device_file.name, | ||
| target="CCPLEX: 0", | ||
| chip="T264", | ||
| timeout=0.1, | ||
| ) | ||
|
|
||
| try: | ||
| # Verify driver registered with manager | ||
| mock_manager.register_driver.assert_called_once() | ||
| call_kwargs = mock_manager.register_driver.call_args[1] | ||
| assert call_kwargs["driver_id"] == str(driver.uuid) | ||
| assert call_kwargs["demuxer_path"] == "/usr/bin/demuxer" | ||
| assert call_kwargs["device"] == device_file.name | ||
| assert call_kwargs["chip"] == "T264" | ||
| assert call_kwargs["target"] == "CCPLEX: 0" | ||
| finally: | ||
| driver.close() | ||
|
|
||
|
|
||
| def test_nvdemux_gets_pts_from_manager(): | ||
| """Test that connect() gets pts path from manager.""" | ||
| with tempfile.NamedTemporaryFile() as device_file: | ||
| with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: | ||
| mock_manager = MagicMock() | ||
| mock_manager_class.get_instance.return_value = mock_manager | ||
| mock_manager.get_pts_path.return_value = "/dev/pts/5" | ||
|
|
||
| driver = NVDemuxSerial( | ||
| demuxer_path="/usr/bin/demuxer", | ||
| device=device_file.name, | ||
| target="CCPLEX: 0", | ||
| timeout=0.1, | ||
| ) | ||
|
|
||
| try: | ||
| # Should call get_pts_path when checking pts availability | ||
| # (We can't test connect() easily without mocking serial, but we can test the logic) | ||
| pts_path = mock_manager.get_pts_path(str(driver.uuid)) | ||
| assert pts_path == "/dev/pts/5" | ||
| finally: | ||
| driver.close() | ||
|
|
||
|
|
||
| def test_nvdemux_unregisters_on_close(): | ||
| """Test that driver unregisters from manager on close.""" | ||
| with tempfile.NamedTemporaryFile() as device_file: | ||
| with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: | ||
| mock_manager = MagicMock() | ||
| mock_manager_class.get_instance.return_value = mock_manager | ||
|
|
||
| driver = NVDemuxSerial( | ||
| demuxer_path="/usr/bin/demuxer", | ||
| device=device_file.name, | ||
| target="CCPLEX: 0", | ||
| timeout=0.1, | ||
| ) | ||
|
|
||
| driver_id = str(driver.uuid) | ||
| driver.close() | ||
|
|
||
| # Verify driver unregistered | ||
| mock_manager.unregister_driver.assert_called_once_with(driver_id) | ||
|
|
||
|
|
||
| def test_nvdemux_default_values(): | ||
| """Test default parameter values.""" | ||
| with tempfile.NamedTemporaryFile() as device_file: | ||
| with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: | ||
| mock_manager = MagicMock() | ||
| mock_manager_class.get_instance.return_value = mock_manager | ||
|
|
||
| driver = NVDemuxSerial( | ||
| demuxer_path="/usr/bin/demuxer", | ||
| device=device_file.name, | ||
| timeout=0.1, | ||
| ) | ||
|
|
||
| try: | ||
| # Check defaults | ||
| assert driver.chip == "T264" | ||
| assert driver.target == "CCPLEX: 0" | ||
| assert driver.baudrate == 115200 | ||
| assert driver.poll_interval == 1.0 | ||
| finally: | ||
| driver.close() | ||
|
|
||
|
|
||
| def test_nvdemux_registration_error_propagates(): | ||
| """Test that registration errors are propagated.""" | ||
| with tempfile.NamedTemporaryFile() as device_file: | ||
| with patch("jumpstarter_driver_pyserial.nvdemux.driver.DemuxerManager") as mock_manager_class: | ||
| mock_manager = MagicMock() | ||
| mock_manager_class.get_instance.return_value = mock_manager | ||
| mock_manager.register_driver.side_effect = ValueError("Config mismatch") | ||
|
|
||
| try: | ||
| _driver = NVDemuxSerial( | ||
| demuxer_path="/usr/bin/demuxer", | ||
| device=device_file.name, | ||
| target="CCPLEX: 0", | ||
| timeout=0.1, | ||
| ) | ||
| raise AssertionError("Should have raised ValueError") | ||
| except ValueError as e: | ||
| assert "Config mismatch" in str(e) | ||
|
|
||
|
|
||
| def test_nvdemux_client_class(): | ||
| """Test that NVDemuxSerial uses PySerialClient.""" | ||
| assert NVDemuxSerial.client() == "jumpstarter_driver_pyserial.client.PySerialClient" |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How come ready is already set but pts is still not available?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hehe, that was not obvious for Claude either, I had to add this.
Because the manager can be ready (it stays up, ready to launch, start, restart the muxer), but the PTS could not be ready (the muxer just went offline, so no ports...) :)
This became important when I started powering off... trying to connect
ir power cycling and trying to connect too quickly, now it just wait until the muxer is up again and the PTS is ready :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You were actually right, I was confused by previous implementation, the ready event, and this was redundant, in the end I removed the callback and just kept the polling mechanism of this loop.