Add RCCL RAS and Inspector plugins to CVS cluster-mon #116
Open
nileshnegi wants to merge 24 commits into main
… migration
- backend/app/collectors/base.py: BaseCollector ABC with collect_timeout, asyncio.wait_for enforcement, supervisor-compatible run(), CollectorState, CollectorResult dataclass
- backend/app/core/config.py: Complete Settings with _YamlSource, StorageConfig, RCCLConfig, backward-compat properties for existing main.py
- backend/tests/: Initial test suite for BaseCollector and Settings

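As a rough illustration of the timeout enforcement this commit describes, here is a minimal sketch of a collector base class that wraps `collect()` in `asyncio.wait_for`. The `CollectorResult` fields, the `collect_once()` helper, and `SlowCollector` are assumptions made for the example; they are not taken from the PR's `base.py`.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass
class CollectorResult:
    # Field names are illustrative; the real dataclass may differ.
    collector: str
    ok: bool
    data: dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class BaseCollector(ABC):
    name: str = "base"
    poll_interval: float = 10.0   # seconds between polls
    collect_timeout: float = 5.0  # hard ceiling for one collect() call
    critical: bool = False

    @abstractmethod
    async def collect(self) -> CollectorResult:
        """Gather one round of metrics."""

    async def collect_once(self) -> CollectorResult:
        # Hypothetical helper: bound collect() so one hung collector
        # cannot stall whatever loop is supervising it.
        try:
            return await asyncio.wait_for(self.collect(), timeout=self.collect_timeout)
        except asyncio.TimeoutError:
            return CollectorResult(self.name, ok=False, error="timeout")
        except Exception as exc:
            return CollectorResult(self.name, ok=False, error=str(exc))


class SlowCollector(BaseCollector):
    """Example subclass whose collect() always exceeds its timeout."""
    name = "slow"
    collect_timeout = 0.05

    async def collect(self) -> CollectorResult:
        await asyncio.sleep(1)
        return CollectorResult(self.name, ok=True)
```

Returning a failed `CollectorResult` instead of raising keeps the timeout visible to callers as ordinary data, which is one plausible way to feed a status endpoint.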
- Log debug message instead of silently swallowing exception in _update_node_status_via_app_state
- Document shared timestamp design decision in latest_metrics
- Fix env var leak in test_settings_env_nested_delimiter (use monkeypatch)
- Remove conditional assertion in timeout test (unconditional assert)

- backend/app/core/ssh_port_forward.py: _run_bridge() bidirectional socketpair bridge (no ephemeral port allocation, no TOCTOU race)
- backend/app/core/jump_host_pssh.py: Add _exec_lock to serialize concurrent exec() calls; fix exec_async() to use asyncio.to_thread; remove dead _make_proxy(); add open_port_forward() async ctx manager
- backend/app/core/cvs_parallel_ssh_reliable.py: Add _pf_clients pool, _get_pf_transport(), open_port_forward(), update destroy_clients()

…tate/lifespan
- GPUMetricsCollector, NICMetricsCollector now inherit BaseCollector
- Both have name, poll_interval, collect_timeout, critical class attrs
- Both implement collect() returning CollectorResult
- NIC collector sub-methods now use await ssh_manager.exec_async()
- AppState gains collectors, collector_tasks, collector_results, probe_requested
- lifespan() uses REGISTERED_COLLECTORS + supervised _start_collector_task()
- broadcast_metrics() uses per-client asyncio.wait_for (5s timeout)
- periodic_host_probe() uses probe_requested event instead of fixed sleep
- reload_configuration() updated to new Settings + collector pattern
- Added tests/test_collectors.py with 6 tests for new collector interfaces

- Remove password partial logging (security: never log credentials)
- Protect unreachable_hosts/reachable_hosts mutations with _exec_lock to prevent race conditions in concurrent ThreadPoolExecutor workers
- Verify bare except: -> except Exception: consistency

- lifespan() initializes redis.asyncio client (degrades silently if unavailable)
- GET /api/collectors/status with per-collector state + overall_status (healthy/degraded/critical based on critical flag per collector)
- docker-compose.yml: add redis:7-alpine service with AOF persistence, healthcheck, depends_on for cluster-mon service
- Tests for _compute_overall_status logic (healthy/degraded/critical cases)

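The healthy/degraded/critical roll-up could look roughly like the sketch below. The exact rules inside `_compute_overall_status` are not shown in this commit message, so the mapping here is an assumption: any failing collector marked critical yields "critical", any other failure yields "degraded". `CollectorStatus` is a hypothetical stand-in for the per-collector state.

```python
from dataclasses import dataclass


@dataclass
class CollectorStatus:
    # Hypothetical per-collector record used only for this example.
    name: str
    ok: bool
    critical: bool


def compute_overall_status(statuses: list[CollectorStatus]) -> str:
    """Roll per-collector health up into one overall label."""
    failing = [s for s in statuses if not s.ok]
    if any(s.critical for s in failing):
        return "critical"   # a collector flagged critical is failing
    if failing:
        return "degraded"   # only non-critical collectors are failing
    return "healthy"
```

Under these assumed rules, a failing RCCL collector with `critical=False` would leave the dashboard "degraded" rather than "critical".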
- backend/app/models/rccl_models.py: RCCLSnapshot, RCCLJobState, NCCLFunction, RCCLCommunicator, RCCLRank, RCCLPeer, RCCLJobSummary, RCCLEvent, RCCLMarker
- backend/app/collectors/rccl_data_store.py: Redis Stream-based ring buffer (atomic XADD+MAXLEN; replaces LPUSH+LTRIM pattern; time-range queries via IDs)
- backend/app/collectors/rccl_ras_client.py: Async ncclras TCP client with version guards for SET FORMAT (prot 3+) and MONITOR (prot 4+)
- backend/tests/mock_ncclras_server.py: Mock ncclras TCP server for unit tests
- backend/tests/test_rccl_ras_client.py: Protocol client tests

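To make the Redis Stream ring-buffer idea concrete, the following sketch appends snapshots with a length cap and reads them back by time range. It assumes a `redis.asyncio.Redis` client created with `decode_responses=True` is passed in; the stream key, the function names, and the single `data` field layout are hypothetical and may not match `rccl_data_store.py`.

```python
import json

STREAM_KEY = "rccl:snapshots"  # hypothetical key name


async def push_snapshot(client, snapshot: dict, maxlen: int = 1000) -> str:
    # XADD with MAXLEN appends and trims in a single command, so there is
    # no window between "push" and "trim" as with LPUSH followed by LTRIM.
    return await client.xadd(
        STREAM_KEY,
        {"data": json.dumps(snapshot)},
        maxlen=maxlen,
        approximate=False,  # exact trimming; approximate=True is cheaper
    )


async def query_range(client, start_ms: int, end_ms: int) -> list[dict]:
    # Auto-generated stream IDs begin with a millisecond timestamp, so a
    # time window maps directly onto an XRANGE over IDs.
    entries = await client.xrange(STREAM_KEY, min=str(start_ms), max=str(end_ms))
    return [json.loads(fields["data"]) for _entry_id, fields in entries]
```

With `decode_responses=True` the field key comes back as the string `"data"` rather than `b"data"`, which is why the lookup uses a plain string.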
…Phase 1)
- backend/app/collectors/rccl_collector.py: RCCLCollector(BaseCollector),
_pick_leader() via node_health_status, collect() with port-forward +
ncclras protocol, NO_JOB/UNREACHABLE/ERROR/HEALTHY/DEGRADED state machine,
Phase 1 text parser stub (requires fixture capture before full impl)
- backend/app/api/rccl_endpoints.py: GET /api/rccl/status, /communicators,
/communicators/{hash}, /events, POST /api/rccl/markers
- backend/app/main.py: RCCLCollector in REGISTERED_COLLECTORS, rccl_data_store
init, latest_rccl_snapshot, rccl_websocket_clients, /ws/rccl WebSocket
- backend/app/api/__init__.py: Register rccl_endpoints router

- main.py: Move Redis init before collector startup (correct ordering)
- main.py: Replace deprecated asyncio.get_event_loop() with get_running_loop()
- main.py: Remove dead collect_metrics_loop() function
- jump_host_pssh.py: Use RLock instead of Lock to prevent _exec_lock deadlock when _execute_on_node() acquires lock inside exec() which already holds it
- rccl_data_store.py: Fix b'data' -> 'data' key (decode_responses=True)
- api/config.py: Replace simple_config import with app.core.config.settings
- core/simple_config.py: Deleted (replaced by completed config.py)

…cket queues
- jump_host_pssh.py: Replace RLock with separate _exec_lock (Lock) and _hosts_lock (Lock). _execute_on_node() uses _hosts_lock for unreachable/reachable list mutations. exec() uses _exec_lock. No deadlock.
- gpu_collector.py: All on-demand methods now use exec_async() instead of sync exec(); this prevents event loop blocking from API handlers
- main.py: ConnectionManager with per-client bounded asyncio.Queue and dedicated send tasks. broadcast_metrics() and broadcast_rccl() are non-blocking (put_nowait). Slow clients are auto-disconnected.
- rccl_collector.py: Calls broadcast_rccl() after storing snapshot

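The per-client bounded queue pattern in this commit can be sketched as follows. This is a simplified stand-in, not the PR's `ConnectionManager`: it assumes each client object exposes an async `send_text()` method (as Starlette WebSockets do), and the method names `connect`, `disconnect`, and `broadcast` are chosen for the example.

```python
import asyncio


class ConnectionManager:
    def __init__(self, max_queue_size: int = 8):
        self.max_queue_size = max_queue_size
        self._queues: dict[object, asyncio.Queue] = {}
        self._tasks: dict[object, asyncio.Task] = {}

    def connect(self, client) -> None:
        # Must be called from inside a running event loop.
        queue: asyncio.Queue = asyncio.Queue(maxsize=self.max_queue_size)
        self._queues[client] = queue
        self._tasks[client] = asyncio.create_task(self._sender(client, queue))

    def disconnect(self, client) -> None:
        self._queues.pop(client, None)
        task = self._tasks.pop(client, None)
        if task is not None:
            task.cancel()

    async def _sender(self, client, queue: asyncio.Queue) -> None:
        # One dedicated task per client drains that client's queue, so a
        # slow socket only ever delays its own messages.
        while True:
            message = await queue.get()
            await client.send_text(message)

    def broadcast(self, message: str) -> None:
        # Never awaits: a full queue means the client is too slow, and it
        # is dropped instead of blocking the broadcaster.
        for client, queue in list(self._queues.items()):
            try:
                queue.put_nowait(message)
            except asyncio.QueueFull:
                self.disconnect(client)
```

Because `broadcast()` is synchronous and uses `put_nowait`, a collector can call it right after storing a snapshot without risking a stall on a stuck client.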
- rccl_endpoints.py: Use RCCLMarker Pydantic model for /markers validation
- api/config.py: Stop writing jump host password to cluster.yaml on disk
- main.py: Replace backward-compat property usage with nested access; track intermediate _restart tasks; remove dead collect_metrics_loop()
- jump_host_pssh.py: Verify jump_transport liveness in open_port_forward
- rccl_data_store.py: Use config values for Redis maxlen instead of hardcoded
- cvs_parallel_ssh_reliable.py: Replace print() with logger.debug()
- tests: Convert asyncio.run() test to @pytest.mark.asyncio

…ages
- src/pages/RCCLHealthPage.tsx: Job state banner, communicator grid with health badges, per-rank table, dead peers display
- src/pages/RCCLTopologyPage.tsx: Peer mesh grid with live/dead status, node/GPU summary cards
- src/pages/RCCLTimelinePage.tsx: Event timeline with time range selector, type filtering, training marker display
- src/services/api.ts: RCCL API methods (status, communicators, events)
- src/App.tsx: Routes for rccl-health, rccl-topology, rccl-timeline
- src/components/Layout/Sidebar.tsx: Nav entries for the 3 RCCL pages

RCCL's RAS tool binary is rcclras (not ncclras, which is NVIDIA's). Updated all comments, docstrings, class names, and file names:
- rccl_ras_client.py: all docstrings
- rccl_collector.py: all docstrings and TODOs
- mock_ncclras_server.py → mock_rcclras_server.py, MockRcclRasServer
- test_rccl_ras_client.py: imports and docstring
- RCCLHealthPage.tsx: subtitle text

- backend/app/collectors/rccl_text_parser.py: Regex-based parser for rcclras VERBOSE STATUS output (RCCL v2.28.3). Parses RCCL/HIP/driver versions, job summary, communicator groups with RUNNING/INCOMPLETE status, error sections with missing rank details and comm hashes.
- backend/tests/test_rccl_text_parser.py: 20 tests against real captured output from a live 8-GPU MI300X job (healthy + degraded + connection reset)
- backend/tests/fixtures/: 3 real fixture files captured from rcclras -v
- rccl_collector.py: _parse_text_response() now delegates to RCCLTextParser

- main.py: Add asyncio.Lock to reload_configuration() preventing concurrent reload races; add SSH scaling ceiling comment; add logger.debug to ConnectionManager._sender exception handler; fix ConnectionManager to use max_queue_size parameter
- gpu_collector.py: Remove %%% debug markers and dead _OLD method; replace datetime.utcnow() with datetime.now(timezone.utc)
- nic_collector.py: Replace datetime.utcnow() with datetime.now(timezone.utc)
- jump_host_pssh.py: Remove password length from log output
- cvs_parallel_ssh_reliable.py: Replace print() with logger calls
- tests/test_websocket_manager.py: Tests for ConnectionManager (broadcast, connect, slow client disconnection, cleanup)
- tests/test_reload.py: Tests for reload config diff detection

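For context on the `datetime.utcnow()` replacement: `utcnow()` returns a naive datetime with no timezone attached and is deprecated as of Python 3.12, whereas `datetime.now(timezone.utc)` returns an aware one whose ISO string carries an explicit offset. A minimal sketch follows; the helper name `utc_timestamp` is hypothetical and not part of the PR.

```python
from datetime import datetime, timezone


def utc_timestamp() -> str:
    # Aware datetime: the serialized form ends in "+00:00", so consumers
    # do not have to guess which timezone the collector was using.
    return datetime.now(timezone.utc).isoformat()
```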
Motivation
RCCL jobs often suffer from opaque failure modes: hangs from communicator deadlocks, performance loss from degraded network links, cascading failures when a single node becomes unresponsive, etc. Users have no unified way to observe RCCL's internal state during a live job — they resort to ad-hoc NCCL_DEBUG log analysis after the fact, losing critical temporal context.
Technical Details
CVS cluster-mon can now monitor live RCCL jobs in real time across two complementary channels:
- Health monitoring via rcclras: Connects directly to the rcclras TCP service embedded in every RCCL process. When a rank dies, hangs, or loses connectivity, the dashboard reflects it within one poll cycle; no log parsing and no application-level timeout are required.
- Performance monitoring via the RCCL Inspector plugin: Reads JSONL files produced by the RCCL Inspector profiler plugin (ext-profiler/inspector/) to surface per-collective bus bandwidth, algorithm bandwidth, and latency, broken down by rank. Identifies stragglers (the slowest rank relative to peers) automatically.
These two features are complementary:
More info available at: https://github.com/ROCm/cvs/blob/users/nileshnegi/add-rcclras-inspector-support/cvs/monitors/cluster-mon/backend/app/collectors/cvs_rccl_monitoring_technical_report.md
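The straggler identification mentioned under Technical Details (the slowest rank relative to peers) might be approximated as in the sketch below. The JSONL field names `rank` and `latency_us`, the function name, and the 1.5x threshold are all assumptions made for illustration; the actual Inspector output schema and the collector's heuristic are not given in this description.

```python
import json
from collections import defaultdict
from statistics import median
from typing import Iterable, Optional


def find_straggler(jsonl_lines: Iterable[str], threshold: float = 1.5) -> Optional[int]:
    """Return the rank whose mean latency is at least `threshold` times
    the median of its peers' mean latencies, or None if no rank stands out."""
    by_rank: dict[int, list[float]] = defaultdict(list)
    for line in jsonl_lines:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        # Assumed schema: one record per collective per rank.
        by_rank[record["rank"]].append(record["latency_us"])

    if len(by_rank) < 2:
        return None  # nothing to compare against

    mean_latency = {rank: sum(v) / len(v) for rank, v in by_rank.items()}
    slowest = max(mean_latency, key=mean_latency.get)
    peers = [v for rank, v in mean_latency.items() if rank != slowest]
    baseline = median(peers)
    if baseline > 0 and mean_latency[slowest] / baseline >= threshold:
        return slowest
    return None
```

Comparing against the peer median rather than the overall mean keeps a single very slow rank from inflating its own baseline.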
Test Plan
Test Result
Submission Checklist