diff --git a/python/ray/dashboard/modules/node/node_head.py b/python/ray/dashboard/modules/node/node_head.py index a5a2392d887..ef5728cf942 100644 --- a/python/ray/dashboard/modules/node/node_head.py +++ b/python/ray/dashboard/modules/node/node_head.py @@ -27,11 +27,6 @@ DEBUG_AUTOSCALING_STATUS, env_integer, ) -from ray.autoscaler._private.util import ( - LoadMetricsSummary, - get_per_node_breakdown_as_dict, - parse_usage, -) from ray.core.generated import gcs_pb2, node_manager_pb2, node_manager_pb2_grpc from ray.dashboard.consts import ( DASHBOARD_AGENT_ADDR_IP_PREFIX, @@ -45,28 +40,6 @@ from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes from ray.dashboard.utils import async_loop_forever -from ray.util.insight import ( - create_http_insight_client, -) -from ray._private.test_utils import get_resource_usage -from ray.dashboard.modules.insight.insight_prompt import PROMPT_TEMPLATE -from flow_insight import ( - BatchNodePhysicalStatsEvent, - BatchNodePhysicalStats, - NodePhysicalStats, - NodeResourceUsage, - DeviceInfo, - BatchServicePhysicalStatsEvent, - ServicePhysicalStats, - ServiceState, - MemoryInfo, - DeviceType, - Service, - ServicePhysicalStatsRecord, - NodeMemoryInfo, - MetaInfoRegisterEvent, -) - logger = logging.getLogger(__name__) @@ -210,20 +183,6 @@ def get_internal_states(self): "module_lifetime_s": time.time() - self._module_start_time, } - def _to_service_state(self, state: str) -> ServiceState: - if state == "ALIVE": - return ServiceState.RUNNING - elif ( - state == "DEPENDENCIES_UNREADY" - or state == "PENDING_CREATION" - or state == "RESTARTING" - ): - return ServiceState.WAITING - elif state == "DEAD": - return ServiceState.TERMINATED - else: - return ServiceState.UNKNOWN - @async_loop_forever(10) async def _emit_node_physical_stats(self): insight_server_address = await self.gcs_client.async_internal_kv_get( @@ -233,6 +192,46 @@ async def _emit_node_physical_stats(self): ) if insight_server_address is None: return + + # flow/insight is optional; only import heavy deps if actually enabled. + from ray.util.insight import create_http_insight_client, is_flow_insight_enabled + + if not is_flow_insight_enabled(): + return + + from ray._private.test_utils import get_resource_usage + from ray.dashboard.modules.insight.insight_prompt import PROMPT_TEMPLATE + + from flow_insight import ( + BatchNodePhysicalStats, + BatchNodePhysicalStatsEvent, + BatchServicePhysicalStatsEvent, + DeviceInfo, + DeviceType, + MemoryInfo, + MetaInfoRegisterEvent, + NodeMemoryInfo, + NodePhysicalStats, + NodeResourceUsage, + Service, + ServicePhysicalStats, + ServicePhysicalStatsRecord, + ServiceState, + ) + + def to_service_state(state: str): + if state == "ALIVE": + return ServiceState.RUNNING + elif ( + state == "DEPENDENCIES_UNREADY" + or state == "PENDING_CREATION" + or state == "RESTARTING" + ): + return ServiceState.WAITING + elif state == "DEAD": + return ServiceState.TERMINATED + else: + return ServiceState.UNKNOWN self._insight_server_address = insight_server_address.decode() if self._insight_client is None: @@ -308,7 +307,7 @@ async def _emit_node_physical_stats(self): service_stats = ServicePhysicalStats( node_id=node_id, pid=actor_pid, - state=self._to_service_state(actor_info.get("state", "UNKNOWN")), + state=to_service_state(actor_info.get("state", "UNKNOWN")), required_resources=actor_info.get("requiredResources", {}), placement_id=actor_info.get("placementGroupId", None), cpu_percent=0, @@ -532,6 +531,7 @@ async def get_nodes_logical_resources(self) -> dict: if is_autoscaler_v2(): from ray.autoscaler.v2.schema import Stats from ray.autoscaler.v2.sdk import ClusterStatusParser + from ray.autoscaler._private.util import parse_usage try: # here we have a sync request @@ -580,11 +580,21 @@ async def get_nodes_logical_resources(self) -> dict: return {} status_dict = json.loads(status_string) + # Legacy autoscaler load metrics parsing is relatively heavy; import only + # when needed (this code path is hit only for the legacy autoscaler). + from ray.autoscaler._private.util import ( + LoadMetricsSummary, + get_per_node_breakdown_as_dict, + ) + lm_summary_dict = status_dict.get("load_metrics_report") - if lm_summary_dict: - lm_summary = LoadMetricsSummary(**lm_summary_dict) + lm_summary = LoadMetricsSummary(**lm_summary_dict) if lm_summary_dict else None - node_logical_resources = get_per_node_breakdown_as_dict(lm_summary) + node_logical_resources = ( + get_per_node_breakdown_as_dict(lm_summary) + if lm_summary and lm_summary.usage_by_node + else {} + ) return node_logical_resources if error is None else {} @routes.get("/nodes") diff --git a/python/ray/dashboard/modules/node/tests/test_node.py b/python/ray/dashboard/modules/node/tests/test_node.py index 65a15ba37d7..91ec879ba9b 100644 --- a/python/ray/dashboard/modules/node/tests/test_node.py +++ b/python/ray/dashboard/modules/node/tests/test_node.py @@ -5,6 +5,7 @@ import threading import time import traceback +import types from datetime import datetime, timedelta import pytest @@ -317,5 +318,68 @@ def _check_worker_pids(): wait_for_condition(_check_worker_pids, timeout=20) +@pytest.mark.asyncio +async def test_node_head_get_nodes_logical_resources_autoscaler_v2_smoke(monkeypatch): + """Smoke test autoscaler v2 branch for NameError regressions (e.g. parse_usage).""" + from ray.dashboard.modules.node.node_head import NodeHead + from ray.dashboard.subprocesses.module import SubprocessModuleConfig + + # Force autoscaler v2 branch. + import ray.autoscaler.v2.utils as autoscaler_v2_utils + + monkeypatch.setattr(autoscaler_v2_utils, "is_autoscaler_v2", lambda: True) + + # Stub parser to avoid depending on real reply schema. + import ray.autoscaler.v2.sdk as autoscaler_v2_sdk + + class _Usage: + def __init__(self, resource_name, used, total): + self.resource_name = resource_name + self.used = used + self.total = total + + class _Node: + def __init__(self, node_id): + self.node_id = node_id + self.resource_usage = types.SimpleNamespace( + usage=[_Usage("CPU", 1.0, 4.0)] + ) + + dummy_status = types.SimpleNamespace(active_nodes=[_Node("n1")], idle_nodes=[]) + monkeypatch.setattr( + autoscaler_v2_sdk.ClusterStatusParser, + "from_get_cluster_status_reply", + staticmethod(lambda reply, stats=None: dummy_status), + ) + + # Ensure parse_usage exists and returns an iterable of strings. + import ray.autoscaler._private.util as autoscaler_util + + monkeypatch.setattr(autoscaler_util, "parse_usage", lambda d, verbose=True: ["ok"]) + + class _StubGcsClient: + async def async_get_cluster_status(self): + return object() + + config = SubprocessModuleConfig( + cluster_id_hex="00" * 28, + gcs_address="127.0.0.1:6379", + session_name="session", + temp_dir="/tmp", + session_dir="/tmp/session", + logging_level="INFO", + logging_format="%(message)s", + log_dir="/tmp", + logging_filename="dashboard.log", + logging_rotate_bytes=1024 * 1024, + logging_rotate_backup_count=1, + socket_dir="/tmp", + ) + head = NodeHead(config) + head._gcs_client = _StubGcsClient() + + result = await head.get_nodes_logical_resources() + assert result == {"n1": "ok"} + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__]))