
Add RCCL RAS and Inspector plugins to CVS cluster-mon#116

Open
nileshnegi wants to merge 24 commits into main from users/nileshnegi/add-rcclras-inspector-support

Conversation

@nileshnegi

Motivation

RCCL jobs often suffer from opaque failure modes: hangs from communicator deadlocks, performance loss from degraded network links, cascading failures when a single node becomes unresponsive, etc. Users have no unified way to observe RCCL's internal state during a live job — they resort to ad-hoc NCCL_DEBUG log analysis after the fact, losing critical temporal context.

Technical Details

CVS cluster-mon can now observe live RCCL jobs in real time through two complementary channels:

Health monitoring via rcclras: Connects directly to the rcclras TCP service embedded in every RCCL process. When a rank dies, hangs, or loses connectivity, the dashboard reflects it within one poll cycle — no log parsing, no application-level timeout required.
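The health-poll path can be sketched as a small asyncio TCP exchange. This is a minimal illustration only: the wire format here (`STATUS\n` answered by one text line) is an invented stand-in, not the real rcclras protocol, and the mock server plays the role of the embedded service.

```python
import asyncio

# Hypothetical sketch of the health-poll cycle: a mock rcclras-style TCP
# service and a client that sends one STATUS query per poll. The command
# and reply format are assumptions for illustration only.

async def mock_rcclras_server(reader, writer):
    line = await reader.readline()
    if line.strip() == b"STATUS":
        writer.write(b"JOB RUNNING ranks=8 dead=0\n")
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def poll_status(host: str, port: int, timeout: float = 5.0) -> str:
    reader, writer = await asyncio.open_connection(host, port)
    try:
        writer.write(b"STATUS\n")
        await writer.drain()
        reply = await asyncio.wait_for(reader.readline(), timeout)
        return reply.decode().strip()
    finally:
        writer.close()
        await writer.wait_closed()

async def main() -> str:
    server = await asyncio.start_server(mock_rcclras_server, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    try:
        return await poll_status("127.0.0.1", port)
    finally:
        server.close()
        await server.wait_closed()

print(asyncio.run(main()))  # prints the mock status line
```

The per-poll timeout is what lets the dashboard flag a dead or hung rank within one cycle: a rank that stops answering simply fails the `wait_for` instead of blocking the collector.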

Performance monitoring via the RCCL Inspector plugin: Reads JSONL files produced by the RCCL Inspector profiler plugin (ext-profiler/inspector/) to surface per-collective bus bandwidth, algorithm bandwidth, and latency — broken down by rank. Identifies stragglers (the slowest rank relative to peers) automatically.

These two features are complementary:

| Capability | rcclras | Inspector |
| --- | --- | --- |
| Rank alive / dead / missing | Yes | No |
| Dead peer detection | Yes | No |
| Per-collective bus bandwidth (GB/s) | No | Yes |
| Per-collective latency (µs) | No | Yes |
| Straggler rank identification | No | Yes |
| Message size per collective | No | Yes |
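Straggler identification of the kind the Inspector channel performs can be sketched as a simple peer-relative comparison over JSONL records. The field names below (`rank`, `coll`, `busbw_gbps`) are assumptions for illustration; the real Inspector plugin's schema may differ.

```python
import json
from statistics import mean

# Sketch of straggler detection over Inspector-style JSONL records.
# Field names are illustrative assumptions, not the plugin's schema.

SAMPLE_JSONL = """\
{"rank": 0, "coll": "AllReduce", "busbw_gbps": 310.2}
{"rank": 1, "coll": "AllReduce", "busbw_gbps": 308.9}
{"rank": 2, "coll": "AllReduce", "busbw_gbps": 121.4}
{"rank": 3, "coll": "AllReduce", "busbw_gbps": 309.5}
"""

def find_straggler(jsonl_text: str, threshold: float = 0.8):
    """Return the rank whose bus bandwidth falls below `threshold` times
    the mean of its peers, or None if every rank is within tolerance."""
    records = [json.loads(line) for line in jsonl_text.splitlines() if line]
    by_rank = {r["rank"]: r["busbw_gbps"] for r in records}
    for rank, bw in by_rank.items():
        peers = [v for k, v in by_rank.items() if k != rank]
        if peers and bw < threshold * mean(peers):
            return rank
    return None

print(find_straggler(SAMPLE_JSONL))  # rank 2 lags far behind its peers
```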

More info available at: https://github.com/ROCm/cvs/blob/users/nileshnegi/add-rcclras-inspector-support/cvs/monitors/cluster-mon/backend/app/collectors/cvs_rccl_monitoring_technical_report.md

Test Plan

Test Result

Submission Checklist

… migration

- backend/app/collectors/base.py: BaseCollector ABC with collect_timeout,
  asyncio.wait_for enforcement, supervisor-compatible run(), CollectorState,
  CollectorResult dataclass
- backend/app/core/config.py: Complete Settings with _YamlSource, StorageConfig,
  RCCLConfig, backward-compat properties for existing main.py
- backend/tests/: Initial test suite for BaseCollector and Settings
- Log debug message instead of silently swallowing exception in
  _update_node_status_via_app_state
- Document shared timestamp design decision in latest_metrics
- Fix env var leak in test_settings_env_nested_delimiter (use monkeypatch)
- Remove conditional assertion in timeout test (unconditional assert)
- backend/app/core/ssh_port_forward.py: _run_bridge() bidirectional
  socketpair bridge (no ephemeral port allocation, no TOCTOU race)
- backend/app/core/jump_host_pssh.py: Add _exec_lock to serialize
  concurrent exec() calls; fix exec_async() to use asyncio.to_thread;
  remove dead _make_proxy(); add open_port_forward() async ctx manager
- backend/app/core/cvs_parallel_ssh_reliable.py: Add _pf_clients pool,
  _get_pf_transport(), open_port_forward(), update destroy_clients()
…tate/lifespan

- GPUMetricsCollector, NICMetricsCollector now inherit BaseCollector
- Both have name, poll_interval, collect_timeout, critical class attrs
- Both implement collect() returning CollectorResult
- NIC collector sub-methods now use await ssh_manager.exec_async()
- AppState gains collectors, collector_tasks, collector_results, probe_requested
- lifespan() uses REGISTERED_COLLECTORS + supervised _start_collector_task()
- broadcast_metrics() uses per-client asyncio.wait_for (5s timeout)
- periodic_host_probe() uses probe_requested event instead of fixed sleep
- reload_configuration() updated to new Settings + collector pattern
- Added tests/test_collectors.py with 6 tests for new collector interfaces
- Remove password partial logging (security: never log credentials)
- Protect unreachable_hosts/reachable_hosts mutations with _exec_lock
  to prevent race conditions in concurrent ThreadPoolExecutor workers
- Verify bare except: -> except Exception: consistency
- lifespan() initializes redis.asyncio client (degrades silently if unavailable)
- GET /api/collectors/status with per-collector state + overall_status
  (healthy/degraded/critical based on critical flag per collector)
- docker-compose.yml: add redis:7-alpine service with AOF persistence,
  healthcheck, depends_on for cluster-mon service
- Tests for _compute_overall_status logic (healthy/degraded/critical cases)
- backend/app/models/rccl_models.py: RCCLSnapshot, RCCLJobState, NCCLFunction,
  RCCLCommunicator, RCCLRank, RCCLPeer, RCCLJobSummary, RCCLEvent, RCCLMarker
- backend/app/collectors/rccl_data_store.py: Redis Stream-based ring buffer
  (atomic XADD+MAXLEN; replaces LPUSH+LTRIM pattern; time-range queries via IDs)
- backend/app/collectors/rccl_ras_client.py: Async ncclras TCP client with
  version guards for SET FORMAT (prot 3+) and MONITOR (prot 4+)
- backend/tests/mock_ncclras_server.py: Mock ncclras TCP server for unit tests
- backend/tests/test_rccl_ras_client.py: Protocol client tests
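The `_exec_lock` fix for the reachable/unreachable host lists boils down to serializing mutations of shared state across thread-pool workers. The class below is an illustrative stand-in, not the PR's code, but it demonstrates the pattern.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Sketch of the race-condition fix: mutations of the shared
# reachable/unreachable host sets are serialized with a lock so
# concurrent ThreadPoolExecutor workers cannot interleave updates.
# Names here are illustrative, not the actual jump_host_pssh.py code.

class HostTracker:
    def __init__(self):
        self._lock = threading.Lock()
        self.reachable: set[str] = set()
        self.unreachable: set[str] = set()

    def mark(self, host: str, ok: bool) -> None:
        with self._lock:                 # one worker mutates at a time
            if ok:
                self.unreachable.discard(host)
                self.reachable.add(host)
            else:
                self.reachable.discard(host)
                self.unreachable.add(host)

tracker = HostTracker()
hosts = [f"node{i}" for i in range(100)]
with ThreadPoolExecutor(max_workers=16) as pool:
    for h in hosts:
        pool.submit(tracker.mark, h, ok=(int(h[4:]) % 2 == 0))

print(len(tracker.reachable), len(tracker.unreachable))  # 50 50
```

Without the lock, two workers marking the same host in opposite directions could leave it in both sets (or neither), which is exactly the inconsistency the commit guards against.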
…Phase 1)

- backend/app/collectors/rccl_collector.py: RCCLCollector(BaseCollector),
  _pick_leader() via node_health_status, collect() with port-forward +
  ncclras protocol, NO_JOB/UNREACHABLE/ERROR/HEALTHY/DEGRADED state machine,
  Phase 1 text parser stub (requires fixture capture before full impl)
- backend/app/api/rccl_endpoints.py: GET /api/rccl/status, /communicators,
  /communicators/{hash}, /events, POST /api/rccl/markers
- backend/app/main.py: RCCLCollector in REGISTERED_COLLECTORS, rccl_data_store
  init, latest_rccl_snapshot, rccl_websocket_clients, /ws/rccl WebSocket
- backend/app/api/__init__.py: Register rccl_endpoints router
- main.py: Move Redis init before collector startup (correct ordering)
- main.py: Replace deprecated asyncio.get_event_loop() with get_running_loop()
- main.py: Remove dead collect_metrics_loop() function
- jump_host_pssh.py: Use RLock instead of Lock to prevent _exec_lock deadlock
  when _execute_on_node() acquires lock inside exec() which already holds it
- rccl_data_store.py: Fix b'data' -> 'data' key (decode_responses=True)
- api/config.py: Replace simple_config import with app.core.config.settings
- core/simple_config.py: Deleted (replaced by completed config.py)
…cket queues

- jump_host_pssh.py: Replace RLock with separate _exec_lock (Lock) and
  _hosts_lock (Lock). _execute_on_node() uses _hosts_lock for unreachable/
  reachable list mutations. exec() uses _exec_lock. No deadlock.
- gpu_collector.py: All on-demand methods now use exec_async() instead of
  sync exec() — prevents event loop blocking from API handlers
- main.py: ConnectionManager with per-client bounded asyncio.Queue and
  dedicated send tasks. broadcast_metrics() and broadcast_rccl() are
  non-blocking (put_nowait). Slow clients are auto-disconnected.
- rccl_collector.py: Calls broadcast_rccl() after storing snapshot
- rccl_endpoints.py: Use RCCLMarker Pydantic model for /markers validation
- api/config.py: Stop writing jump host password to cluster.yaml on disk
- main.py: Replace backward-compat property usage with nested access;
  track intermediate _restart tasks; remove dead collect_metrics_loop()
- jump_host_pssh.py: Verify jump_transport liveness in open_port_forward
- rccl_data_store.py: Use config values for Redis maxlen instead of hardcoded
- cvs_parallel_ssh_reliable.py: Replace print() with logger.debug()
- tests: Convert asyncio.run() test to @pytest.mark.asyncio
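The non-blocking broadcast pattern described above (per-client bounded queue, `put_nowait`, slow clients auto-disconnected) can be sketched as follows. Names are illustrative; this is not the actual `ConnectionManager` in `main.py`.

```python
import asyncio

# Sketch of non-blocking fan-out: each client gets a bounded queue;
# broadcast() uses put_nowait(), and a client whose queue is full
# (a slow consumer) is dropped instead of stalling the broadcaster.

class ConnectionManager:
    def __init__(self, max_queue_size: int = 2):
        self.queues: dict[str, asyncio.Queue] = {}
        self.max_queue_size = max_queue_size

    def connect(self, client_id: str) -> asyncio.Queue:
        q = asyncio.Queue(maxsize=self.max_queue_size)
        self.queues[client_id] = q
        return q

    def broadcast(self, message: str) -> list[str]:
        dropped = []
        for client_id, q in list(self.queues.items()):
            try:
                q.put_nowait(message)    # never blocks the broadcaster
            except asyncio.QueueFull:
                dropped.append(client_id)
                del self.queues[client_id]
        return dropped

async def main() -> list[str]:
    mgr = ConnectionManager(max_queue_size=2)
    mgr.connect("fast")
    mgr.connect("slow")          # this client never drains its queue
    fast_q = mgr.queues["fast"]
    for i in range(3):
        mgr.broadcast(f"metrics-{i}")
        while not fast_q.empty():        # fast client keeps up
            fast_q.get_nowait()
    return sorted(mgr.queues)

print(asyncio.run(main()))  # only the fast client survives
```

The design choice is that a stalled WebSocket client costs at most one full queue, never a blocked broadcast loop.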
…ages

- src/pages/RCCLHealthPage.tsx: Job state banner, communicator grid with
  health badges, per-rank table, dead peers display
- src/pages/RCCLTopologyPage.tsx: Peer mesh grid with live/dead status,
  node/GPU summary cards
- src/pages/RCCLTimelinePage.tsx: Event timeline with time range selector,
  type filtering, training marker display
- src/services/api.ts: RCCL API methods (status, communicators, events)
- src/App.tsx: Routes for rccl-health, rccl-topology, rccl-timeline
- src/components/Layout/Sidebar.tsx: Nav entries for the 3 RCCL pages

RCCL's RAS tool binary is rcclras (not ncclras, which is NVIDIA's).
Updated all comments, docstrings, class names, and file names:
- rccl_ras_client.py: all docstrings
- rccl_collector.py: all docstrings and TODOs
- mock_ncclras_server.py → mock_rcclras_server.py, MockRcclRasServer
- test_rccl_ras_client.py: imports and docstring
- RCCLHealthPage.tsx: subtitle text
- backend/app/collectors/rccl_text_parser.py: Regex-based parser for
  rcclras VERBOSE STATUS output (RCCL v2.28.3). Parses RCCL/HIP/driver
  versions, job summary, communicator groups with RUNNING/INCOMPLETE
  status, error sections with missing rank details and comm hashes.
- backend/tests/test_rccl_text_parser.py: 20 tests against real captured
  output from a live 8-GPU MI300X job (healthy + degraded + connection reset)
- backend/tests/fixtures/: 3 real fixture files captured from rcclras -v
- rccl_collector.py: _parse_text_response() now delegates to RCCLTextParser
- main.py: Add asyncio.Lock to reload_configuration() preventing
  concurrent reload races; add SSH scaling ceiling comment; add
  logger.debug to ConnectionManager._sender exception handler;
  fix ConnectionManager to use max_queue_size parameter
- gpu_collector.py: Remove %%% debug markers and dead _OLD method;
  replace datetime.utcnow() with datetime.now(timezone.utc)
- nic_collector.py: Replace datetime.utcnow() with datetime.now(timezone.utc)
- jump_host_pssh.py: Remove password length from log output
- cvs_parallel_ssh_reliable.py: Replace print() with logger calls
- tests/test_websocket_manager.py: Tests for ConnectionManager
  (broadcast, connect, slow client disconnection, cleanup)
- tests/test_reload.py: Tests for reload config diff detection
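The `datetime.utcnow()` replacement applied in the collectors is worth a one-line illustration: `utcnow()` returns a naive timestamp (and is deprecated since Python 3.12), while `datetime.now(timezone.utc)` returns a timezone-aware one.

```python
from datetime import datetime, timezone

# utcnow() yields a naive datetime (tzinfo is None); the replacement
# datetime.now(timezone.utc) carries explicit UTC tzinfo.
naive = datetime.utcnow()
aware = datetime.now(timezone.utc)

print(naive.tzinfo, aware.tzinfo)  # None UTC
```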
@nileshnegi nileshnegi requested a review from cijohnson April 6, 2026 15:24
