From a05cd81ebf897a5ef752d557f70b209e21508aa5 Mon Sep 17 00:00:00 2001 From: daiping8 Date: Tue, 20 Jan 2026 11:41:52 +0800 Subject: [PATCH 1/2] Refactor NodeHead module by cleaning up imports and moving service state conversion logic inline. Optimize flow insight dependency imports to load only when enabled. This improves code clarity and performance. Change-Id: I6f2ab36769aad55df6914d18345e8ab572e31fd7 --- .../ray/dashboard/modules/node/node_head.py | 94 ++++++++++--------- 1 file changed, 50 insertions(+), 44 deletions(-) diff --git a/python/ray/dashboard/modules/node/node_head.py b/python/ray/dashboard/modules/node/node_head.py index a5a2392d887..29d410a3b9c 100644 --- a/python/ray/dashboard/modules/node/node_head.py +++ b/python/ray/dashboard/modules/node/node_head.py @@ -27,11 +27,6 @@ DEBUG_AUTOSCALING_STATUS, env_integer, ) -from ray.autoscaler._private.util import ( - LoadMetricsSummary, - get_per_node_breakdown_as_dict, - parse_usage, -) from ray.core.generated import gcs_pb2, node_manager_pb2, node_manager_pb2_grpc from ray.dashboard.consts import ( DASHBOARD_AGENT_ADDR_IP_PREFIX, @@ -45,28 +40,6 @@ from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes from ray.dashboard.utils import async_loop_forever -from ray.util.insight import ( - create_http_insight_client, -) -from ray._private.test_utils import get_resource_usage -from ray.dashboard.modules.insight.insight_prompt import PROMPT_TEMPLATE -from flow_insight import ( - BatchNodePhysicalStatsEvent, - BatchNodePhysicalStats, - NodePhysicalStats, - NodeResourceUsage, - DeviceInfo, - BatchServicePhysicalStatsEvent, - ServicePhysicalStats, - ServiceState, - MemoryInfo, - DeviceType, - Service, - ServicePhysicalStatsRecord, - NodeMemoryInfo, - MetaInfoRegisterEvent, -) - logger = logging.getLogger(__name__) @@ -210,20 +183,6 @@ def get_internal_states(self): "module_lifetime_s": time.time() - self._module_start_time, } - def _to_service_state(self, state: str) -> ServiceState: - if state == "ALIVE": - return ServiceState.RUNNING - elif ( - state == "DEPENDENCIES_UNREADY" - or state == "PENDING_CREATION" - or state == "RESTARTING" - ): - return ServiceState.WAITING - elif state == "DEAD": - return ServiceState.TERMINATED - else: - return ServiceState.UNKNOWN - @async_loop_forever(10) async def _emit_node_physical_stats(self): insight_server_address = await self.gcs_client.async_internal_kv_get( @@ -233,6 +192,46 @@ async def _emit_node_physical_stats(self): ) if insight_server_address is None: return + + # flow/insight is optional; only import heavy deps if actually enabled. + from ray.util.insight import create_http_insight_client, is_flow_insight_enabled + + if not is_flow_insight_enabled(): + return + + from ray._private.test_utils import get_resource_usage + from ray.dashboard.modules.insight.insight_prompt import PROMPT_TEMPLATE + + from flow_insight import ( + BatchNodePhysicalStats, + BatchNodePhysicalStatsEvent, + BatchServicePhysicalStatsEvent, + DeviceInfo, + DeviceType, + MemoryInfo, + MetaInfoRegisterEvent, + NodeMemoryInfo, + NodePhysicalStats, + NodeResourceUsage, + Service, + ServicePhysicalStats, + ServicePhysicalStatsRecord, + ServiceState, + ) + + def to_service_state(state: str): + if state == "ALIVE": + return ServiceState.RUNNING + elif ( + state == "DEPENDENCIES_UNREADY" + or state == "PENDING_CREATION" + or state == "RESTARTING" + ): + return ServiceState.WAITING + elif state == "DEAD": + return ServiceState.TERMINATED + else: + return ServiceState.UNKNOWN self._insight_server_address = insight_server_address.decode() if self._insight_client is None: @@ -308,7 +307,7 @@ async def _emit_node_physical_stats(self): service_stats = ServicePhysicalStats( node_id=node_id, pid=actor_pid, - state=self._to_service_state(actor_info.get("state", "UNKNOWN")), + state=to_service_state(actor_info.get("state", "UNKNOWN")), required_resources=actor_info.get("requiredResources", {}), placement_id=actor_info.get("placementGroupId", None), cpu_percent=0, @@ -532,6 +531,7 @@ async def get_nodes_logical_resources(self) -> dict: if is_autoscaler_v2(): from ray.autoscaler.v2.schema import Stats from ray.autoscaler.v2.sdk import ClusterStatusParser + from ray.autoscaler._private.util import parse_usage try: # here we have a sync request @@ -580,9 +580,15 @@ async def get_nodes_logical_resources(self) -> dict: return {} status_dict = json.loads(status_string) + # Legacy autoscaler load metrics parsing is relatively heavy; import only + # when needed (this code path is hit only for the legacy autoscaler). + from ray.autoscaler._private.util import ( + LoadMetricsSummary, + get_per_node_breakdown_as_dict, + ) + lm_summary_dict = status_dict.get("load_metrics_report") - if lm_summary_dict: - lm_summary = LoadMetricsSummary(**lm_summary_dict) + lm_summary = LoadMetricsSummary(**lm_summary_dict) if lm_summary_dict else None node_logical_resources = get_per_node_breakdown_as_dict(lm_summary) return node_logical_resources if error is None else {} From 4bf8f19dd81d8e66cbad5b938f2f0f2df75681ed Mon Sep 17 00:00:00 2001 From: daiping8 Date: Tue, 20 Jan 2026 14:09:16 +0800 Subject: [PATCH 2/2] Enhance NodeHead module to handle load metrics more robustly by ensuring logical resources are only retrieved when usage data is available. Added a smoke test for the autoscaler v2 branch to prevent NameError regressions. This improves stability and test coverage. Change-Id: I7a588cbcb6ebb347b5b89c0416e28f1e266443bc --- .../ray/dashboard/modules/node/node_head.py | 6 +- .../dashboard/modules/node/tests/test_node.py | 64 +++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/python/ray/dashboard/modules/node/node_head.py b/python/ray/dashboard/modules/node/node_head.py index 29d410a3b9c..ef5728cf942 100644 --- a/python/ray/dashboard/modules/node/node_head.py +++ b/python/ray/dashboard/modules/node/node_head.py @@ -590,7 +590,11 @@ async def get_nodes_logical_resources(self) -> dict: lm_summary_dict = status_dict.get("load_metrics_report") lm_summary = LoadMetricsSummary(**lm_summary_dict) if lm_summary_dict else None - node_logical_resources = get_per_node_breakdown_as_dict(lm_summary) + node_logical_resources = ( + get_per_node_breakdown_as_dict(lm_summary) + if lm_summary and lm_summary.usage_by_node + else {} + ) return node_logical_resources if error is None else {} @routes.get("/nodes") diff --git a/python/ray/dashboard/modules/node/tests/test_node.py b/python/ray/dashboard/modules/node/tests/test_node.py index 65a15ba37d7..91ec879ba9b 100644 --- a/python/ray/dashboard/modules/node/tests/test_node.py +++ b/python/ray/dashboard/modules/node/tests/test_node.py @@ -5,6 +5,7 @@ import threading import time import traceback +import types from datetime import datetime, timedelta import pytest @@ -317,5 +318,68 @@ def _check_worker_pids(): wait_for_condition(_check_worker_pids, timeout=20) +@pytest.mark.asyncio +async def test_node_head_get_nodes_logical_resources_autoscaler_v2_smoke(monkeypatch): + """Smoke test autoscaler v2 branch for NameError regressions (e.g. parse_usage).""" + from ray.dashboard.modules.node.node_head import NodeHead + from ray.dashboard.subprocesses.module import SubprocessModuleConfig + + # Force autoscaler v2 branch. + import ray.autoscaler.v2.utils as autoscaler_v2_utils + + monkeypatch.setattr(autoscaler_v2_utils, "is_autoscaler_v2", lambda: True) + + # Stub parser to avoid depending on real reply schema. + import ray.autoscaler.v2.sdk as autoscaler_v2_sdk + + class _Usage: + def __init__(self, resource_name, used, total): + self.resource_name = resource_name + self.used = used + self.total = total + + class _Node: + def __init__(self, node_id): + self.node_id = node_id + self.resource_usage = types.SimpleNamespace( + usage=[_Usage("CPU", 1.0, 4.0)] + ) + + dummy_status = types.SimpleNamespace(active_nodes=[_Node("n1")], idle_nodes=[]) + monkeypatch.setattr( + autoscaler_v2_sdk.ClusterStatusParser, + "from_get_cluster_status_reply", + staticmethod(lambda reply, stats=None: dummy_status), + ) + + # Ensure parse_usage exists and returns an iterable of strings. + import ray.autoscaler._private.util as autoscaler_util + + monkeypatch.setattr(autoscaler_util, "parse_usage", lambda d, verbose=True: ["ok"]) + + class _StubGcsClient: + async def async_get_cluster_status(self): + return object() + + config = SubprocessModuleConfig( + cluster_id_hex="00" * 28, + gcs_address="127.0.0.1:6379", + session_name="session", + temp_dir="/tmp", + session_dir="/tmp/session", + logging_level="INFO", + logging_format="%(message)s", + log_dir="/tmp", + logging_filename="dashboard.log", + logging_rotate_bytes=1024 * 1024, + logging_rotate_backup_count=1, + socket_dir="/tmp", + ) + head = NodeHead(config) + head._gcs_client = _StubGcsClient() + + result = await head.get_nodes_logical_resources() + assert result == {"n1": "ok"} + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__]))