[TRTLLM-11872][perf] Multi-threading async media loading and optimizing video frame decoding in trtllm-serve#13034
yechank-nvidia wants to merge 5 commits into NVIDIA:main
Conversation
📝 Walkthrough

The changes significantly refactor the async media loading utilities to improve performance through global session reuse and concurrent operations, while offloading CPU-bound work to thread executors. Video frame extraction is reworked to use forward scanning with optimized sampling, and modality retrieval now gathers results concurrently.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Caller as Caller
    participant Tracker as MultimodalDataTracker
    participant EventLoop as Event Loop
    participant Gather as asyncio.gather()
    participant RemoteA as Remote Server (Audio)
    participant RemoteV as Remote Server (Video)
    participant ThreadPool as Thread Pool Executor
    Caller->>Tracker: retrieve_all_async()
    Tracker->>EventLoop: Collect all coroutines<br/>(data + embeddings)<br/>across modalities
    Tracker->>Gather: asyncio.gather(*all_coroutines)
    par Concurrent Fetch
        Gather->>RemoteA: fetch audio_1
        Gather->>RemoteV: fetch video_1
        Gather->>RemoteA: fetch audio_2
        Gather->>RemoteV: fetch video_2
    end
    par CPU-Bound Offload
        RemoteA->>ThreadPool: decode audio
        RemoteV->>ThreadPool: extract frames
    end
    ThreadPool-->>Gather: decoded results
    Gather-->>Tracker: all results
    Tracker->>Tracker: Regroup by modality
    Tracker-->>Caller: {modality: {data, embeddings}}
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 1
🧹 Nitpick comments (5)
tensorrt_llm/inputs/utils.py (2)
34-45: Consider adding a cleanup function for the global session.

The global aiohttp.ClientSession is created lazily, but there is no corresponding close_aiohttp_session() function for graceful shutdown. While connections are cleaned up on process exit, long-running servers that reload modules or undergo hot restarts may leak connections.

♻️ Proposed addition for session cleanup
```diff
 async def _get_aiohttp_session() -> aiohttp.ClientSession:
     """Return the shared aiohttp.ClientSession, creating it on first call."""
     global _global_aiohttp_session
     if _global_aiohttp_session is None or _global_aiohttp_session.closed:
         _global_aiohttp_session = aiohttp.ClientSession()
     return _global_aiohttp_session
+
+
+async def _close_aiohttp_session() -> None:
+    """Close the shared aiohttp.ClientSession if open."""
+    global _global_aiohttp_session
+    if _global_aiohttp_session is not None and not _global_aiohttp_session.closed:
+        await _global_aiohttp_session.close()
+        _global_aiohttp_session = None
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tensorrt_llm/inputs/utils.py` around lines 34 - 45, Add a cleanup function to close the lazily-created shared aiohttp session to avoid leaking connections: implement an async close_aiohttp_session() that checks the module-level _global_aiohttp_session (and its .closed flag), awaits its .close() if open, and sets _global_aiohttp_session to None; call or expose this function for graceful shutdowns and document using it alongside _get_aiohttp_session().
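A runnable sketch of the lazy-singleton-plus-cleanup pattern this comment describes, using a stand-in Session class so it runs without aiohttp installed; the names get_session/close_session are illustrative, not the actual module's:

```python
import asyncio

# Stand-in for aiohttp.ClientSession: just tracks a closed flag.
class Session:
    def __init__(self):
        self.closed = False
    async def close(self):
        self.closed = True

_global_session: "Session | None" = None

async def get_session() -> Session:
    # Lazily create the shared session; recreate it if it was closed.
    global _global_session
    if _global_session is None or _global_session.closed:
        _global_session = Session()
    return _global_session

async def close_session() -> None:
    # Graceful shutdown: close the session if open, then reset the global.
    global _global_session
    if _global_session is not None and not _global_session.closed:
        await _global_session.close()
    _global_session = None

async def demo():
    a = await get_session()
    b = await get_session()
    assert a is b            # reused, not recreated per request
    await close_session()
    c = await get_session()  # fresh session after cleanup
    assert c is not a

asyncio.run(demo())
```

The `.closed` check in the getter also guards against reusing a session that was closed elsewhere.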
533-534: Add strict=True to zip() for defensive programming.

While the lengths are guaranteed equal by construction (both derived from the same asyncio.gather call), using strict=True would catch any future bugs that might break this invariant.

♻️ Proposed fix
```diff
-        for modality, result in zip(modality_keys, results):
+        for modality, result in zip(modality_keys, results, strict=True):
             out[modality].append(result)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tensorrt_llm/inputs/utils.py` around lines 533 - 534, The loop pairing modality_keys and results should use defensive zip checking: change the zip call in the block that iterates "for modality, result in zip(modality_keys, results):" to use zip(..., strict=True) so mismatched lengths raise immediately; update any tests or callers if they rely on silent truncation. Ensure this change is applied where modality_keys and results are assembled in tensorrt_llm/inputs/utils.py so the strictness guards the invariant.

tests/unittest/inputs/test_async_media_loading.py (3)
239-261: Timing-based test may be flaky under CI load.

The concurrency test relies on timing (0.15 s delay with 0.08 s tolerance). While the approach is sound and the values are reasonable, heavily loaded CI runners might occasionally cause flakiness. Consider increasing the tolerance slightly (e.g., 0.12 s) or adding pytest.mark.flaky if flakiness is observed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unittest/inputs/test_async_media_loading.py` around lines 239 - 261, The timing-based concurrency assertion in the test uses DELAY = 0.15 and TOLERANCE = 0.08 which can be flaky on loaded CI runners; update the test to increase the tolerance (e.g., set TOLERANCE = 0.12) or mark the test flaky (e.g., add pytest.mark.flaky) so failures under high load are tolerated; specifically change the TOLERANCE constant referenced next to DELAY and keep the rest of the test (the _slow coroutine and the call to tracker.retrieve_all_async()) unchanged.
114-120: Temp file not cleaned up after test.

Using delete=False without explicit cleanup leaves test artifacts on disk. Consider using pytest's tmp_path fixture or adding explicit cleanup.

♻️ Proposed fix using tmp_path fixture
```diff
     @pytest.mark.asyncio
-    async def test_load_audio_from_file(self):
-        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
-            path = _make_audio_file(f.name)
+    async def test_load_audio_from_file(self, tmp_path):
+        path = _make_audio_file(str(tmp_path / "test.wav"))
         result = await async_load_audio(path)
         audio_array, sample_rate = result
         assert isinstance(audio_array, np.ndarray)
         assert sample_rate == 16000
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unittest/inputs/test_async_media_loading.py` around lines 114 - 120, The test test_load_audio_from_file currently creates a temp file with tempfile.NamedTemporaryFile(delete=False) and never removes it; either switch the test to use pytest's tmp_path fixture to create a disposable path and write the WAV via _make_audio_file there (use tmp_path / "test.wav") or ensure explicit cleanup by removing the created file (os.remove(path)) in a finally/teardown after calling async_load_audio; update references to tempfile.NamedTemporaryFile, _make_audio_file, and async_load_audio accordingly so no temp artifacts remain.
134-135: Same temp file cleanup issue.

This test also uses delete=False without explicit cleanup. Apply the same tmp_path fixture pattern suggested above.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unittest/inputs/test_async_media_loading.py` around lines 134 - 135, The test currently creates a temp WAV with tempfile.NamedTemporaryFile(..., delete=False) and never removes it; replace that pattern with the pytest tmp_path fixture: create a Path under tmp_path (e.g., tmp_path / "test.wav"), pass its str to _make_audio_file (or write the needed contents there), and use that path for the test so pytest will manage cleanup; update the test function signature to accept tmp_path and remove the tempfile.NamedTemporaryFile usage in test_async_media_loading.py around the _make_audio_file call.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/unittest/inputs/test_async_media_loading.py`:
- Line 17: Remove the unused import "defaultdict" from the top-level imports
(the line containing "from collections import defaultdict"); update any imports
only if actually referenced elsewhere (e.g., keep other collections imports
intact) so the unused symbol defaultdict is deleted to satisfy the
linter/pre-commit checks.
📒 Files selected for processing (2)

- tensorrt_llm/inputs/utils.py
- tests/unittest/inputs/test_async_media_loading.py
/bot run

PR_Github #43226 [ run ] triggered by Bot. Commit:

PR_Github #43226 [ run ] completed with state
2ez4bz left a comment:
Approving to unblock, but please consider the comments 🙏
Signed-off-by: yechank <161688079+yechank-nvidia@users.noreply.github.com>
Summary

Two performance fixes in tensorrt_llm/inputs/utils.py for trtllm-serve multimodal API requests.

Fix 1: Truly Async Media Loading

async_load_image / async_load_video / async_load_audio were blocking the event loop despite being async. CPU-bound work (PIL, cv2, soundfile) now runs in a thread pool via run_in_executor. Additional improvements:

- A global aiohttp.ClientSession singleton reuses TCP connections instead of creating a new session per request
- retrieve_all_async now launches a single asyncio.gather across all modalities simultaneously instead of gathering per-modality sequentially
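The offloading half of this fix can be sketched with the standard run_in_executor pattern. This is a minimal illustration, not the actual trtllm-serve code: decode_image here is a trivial stand-in for the real PIL/cv2 work.

```python
import asyncio

# Stand-in for CPU-bound decoding (e.g. PIL); just consumes CPU and
# returns a value derived from the input bytes.
def decode_image(data: bytes) -> int:
    return sum(data)

async def async_load_image(data: bytes) -> int:
    loop = asyncio.get_running_loop()
    # run_in_executor(None, ...) uses the loop's default
    # ThreadPoolExecutor, so the event loop stays free to serve
    # other requests while the decode runs.
    return await loop.run_in_executor(None, decode_image, data)

print(asyncio.run(async_load_image(b"\x01\x02\x03")))  # → 6
```

The key property is that `await loop.run_in_executor(...)` yields control back to the event loop until the thread finishes, which is what makes the coroutine genuinely non-blocking.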
Fix 2: Faster Video Frame Decoding

Before: a CAP_PROP_POS_FRAMES seek per frame — each seek decodes from the nearest keyframe to the target frame. This gets worse as GOP size grows (real H.264 videos: 1–8 s keyframe intervals).

After: a single forward scan with grab() — no per-frame seek. Image.fromarray + ToTensor replaced with a direct numpy.transpose → torch.from_numpy (7.3× faster for tensor conversion).

Results
Baseline: original per-frame seek + PIL conversion, 32 frames extracted, H.264.
Single request latency
Server throughput (8 concurrent workers)
Short videos benefit most — seek overhead dominates decode time for short clips and is eliminated entirely by the sequential scan.
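The forward-scan sampling from Fix 2 can be sketched as follows. A stand-in Capture class replaces cv2.VideoCapture here so the sketch runs anywhere; the real API's grab() advances one frame without decoding and retrieve() decodes the frame just grabbed, which is the behavior modeled below. sample_frames and its signature are illustrative, not the PR's actual function.

```python
# Stand-in for cv2.VideoCapture over a clip of `total` frames.
class Capture:
    def __init__(self, total: int):
        self.frames = list(range(total))
        self.pos = 0

    def grab(self) -> bool:
        # Advance one frame without decoding it (cheap).
        if self.pos >= len(self.frames):
            return False
        self.pos += 1
        return True

    def retrieve(self):
        # Decode the frame the last grab() advanced past (expensive).
        return True, self.frames[self.pos - 1]

def sample_frames(cap: Capture, total: int, num_frames: int) -> list:
    # Evenly spaced target indices, e.g. 4 frames from a 100-frame clip.
    targets = {int(i * total / num_frames) for i in range(num_frames)}
    out = []
    for idx in range(total):
        if not cap.grab():        # single forward scan: no seeks at all
            break
        if idx in targets:        # only decode the sampled frames
            ok, frame = cap.retrieve()
            if ok:
                out.append(frame)
    return out

print(sample_frames(Capture(100), 100, 4))  # → [0, 25, 50, 75]
```

Because every non-sampled frame is only grabbed, never retrieved, the scan pays decode cost for just the sampled frames while avoiding the keyframe-to-target re-decoding that each seek would trigger.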
Files Changed

- tensorrt_llm/inputs/utils.py
- tests/unittest/inputs/test_async_media_loading.py (13 new tests)

Summary by CodeRabbit
New Features
Bug Fixes
Tests