Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 55 additions & 45 deletions python/ray/dashboard/modules/node/node_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@
DEBUG_AUTOSCALING_STATUS,
env_integer,
)
from ray.autoscaler._private.util import (
LoadMetricsSummary,
get_per_node_breakdown_as_dict,
parse_usage,
)
from ray.core.generated import gcs_pb2, node_manager_pb2, node_manager_pb2_grpc
from ray.dashboard.consts import (
DASHBOARD_AGENT_ADDR_IP_PREFIX,
Expand All @@ -45,28 +40,6 @@
from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes
from ray.dashboard.utils import async_loop_forever

from ray.util.insight import (
create_http_insight_client,
)
from ray._private.test_utils import get_resource_usage
from ray.dashboard.modules.insight.insight_prompt import PROMPT_TEMPLATE
from flow_insight import (
BatchNodePhysicalStatsEvent,
BatchNodePhysicalStats,
NodePhysicalStats,
NodeResourceUsage,
DeviceInfo,
BatchServicePhysicalStatsEvent,
ServicePhysicalStats,
ServiceState,
MemoryInfo,
DeviceType,
Service,
ServicePhysicalStatsRecord,
NodeMemoryInfo,
MetaInfoRegisterEvent,
)

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -210,20 +183,6 @@ def get_internal_states(self):
"module_lifetime_s": time.time() - self._module_start_time,
}

def _to_service_state(self, state: str) -> ServiceState:
if state == "ALIVE":
return ServiceState.RUNNING
elif (
state == "DEPENDENCIES_UNREADY"
or state == "PENDING_CREATION"
or state == "RESTARTING"
):
return ServiceState.WAITING
elif state == "DEAD":
return ServiceState.TERMINATED
else:
return ServiceState.UNKNOWN

@async_loop_forever(10)
async def _emit_node_physical_stats(self):
insight_server_address = await self.gcs_client.async_internal_kv_get(
Expand All @@ -233,6 +192,46 @@ async def _emit_node_physical_stats(self):
)
if insight_server_address is None:
return

# flow/insight is optional; only import heavy deps if actually enabled.
from ray.util.insight import create_http_insight_client, is_flow_insight_enabled

if not is_flow_insight_enabled():
return

from ray._private.test_utils import get_resource_usage
from ray.dashboard.modules.insight.insight_prompt import PROMPT_TEMPLATE

from flow_insight import (
BatchNodePhysicalStats,
BatchNodePhysicalStatsEvent,
BatchServicePhysicalStatsEvent,
DeviceInfo,
DeviceType,
MemoryInfo,
MetaInfoRegisterEvent,
NodeMemoryInfo,
NodePhysicalStats,
NodeResourceUsage,
Service,
ServicePhysicalStats,
ServicePhysicalStatsRecord,
ServiceState,
)

def to_service_state(state: str):
if state == "ALIVE":
return ServiceState.RUNNING
elif (
state == "DEPENDENCIES_UNREADY"
or state == "PENDING_CREATION"
or state == "RESTARTING"
):
return ServiceState.WAITING
elif state == "DEAD":
return ServiceState.TERMINATED
else:
return ServiceState.UNKNOWN
self._insight_server_address = insight_server_address.decode()

if self._insight_client is None:
Expand Down Expand Up @@ -308,7 +307,7 @@ async def _emit_node_physical_stats(self):
service_stats = ServicePhysicalStats(
node_id=node_id,
pid=actor_pid,
state=self._to_service_state(actor_info.get("state", "UNKNOWN")),
state=to_service_state(actor_info.get("state", "UNKNOWN")),
required_resources=actor_info.get("requiredResources", {}),
placement_id=actor_info.get("placementGroupId", None),
cpu_percent=0,
Expand Down Expand Up @@ -532,6 +531,7 @@ async def get_nodes_logical_resources(self) -> dict:
if is_autoscaler_v2():
from ray.autoscaler.v2.schema import Stats
from ray.autoscaler.v2.sdk import ClusterStatusParser
from ray.autoscaler._private.util import parse_usage

try:
# here we have a sync request
Expand Down Expand Up @@ -580,11 +580,21 @@ async def get_nodes_logical_resources(self) -> dict:
return {}
status_dict = json.loads(status_string)

# Legacy autoscaler load metrics parsing is relatively heavy; import only
# when needed (this code path is hit only for the legacy autoscaler).
from ray.autoscaler._private.util import (
LoadMetricsSummary,
get_per_node_breakdown_as_dict,
)

lm_summary_dict = status_dict.get("load_metrics_report")
if lm_summary_dict:
lm_summary = LoadMetricsSummary(**lm_summary_dict)
lm_summary = LoadMetricsSummary(**lm_summary_dict) if lm_summary_dict else None

node_logical_resources = get_per_node_breakdown_as_dict(lm_summary)
node_logical_resources = (
get_per_node_breakdown_as_dict(lm_summary)
if lm_summary and lm_summary.usage_by_node
else {}
)
return node_logical_resources if error is None else {}

@routes.get("/nodes")
Expand Down
64 changes: 64 additions & 0 deletions python/ray/dashboard/modules/node/tests/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import threading
import time
import traceback
import types
from datetime import datetime, timedelta

import pytest
Expand Down Expand Up @@ -317,5 +318,68 @@ def _check_worker_pids():
wait_for_condition(_check_worker_pids, timeout=20)


@pytest.mark.asyncio
async def test_node_head_get_nodes_logical_resources_autoscaler_v2_smoke(monkeypatch):
"""Smoke test autoscaler v2 branch for NameError regressions (e.g. parse_usage)."""
from ray.dashboard.modules.node.node_head import NodeHead
from ray.dashboard.subprocesses.module import SubprocessModuleConfig

# Force autoscaler v2 branch.
import ray.autoscaler.v2.utils as autoscaler_v2_utils

monkeypatch.setattr(autoscaler_v2_utils, "is_autoscaler_v2", lambda: True)

# Stub parser to avoid depending on real reply schema.
import ray.autoscaler.v2.sdk as autoscaler_v2_sdk

class _Usage:
def __init__(self, resource_name, used, total):
self.resource_name = resource_name
self.used = used
self.total = total

class _Node:
def __init__(self, node_id):
self.node_id = node_id
self.resource_usage = types.SimpleNamespace(
usage=[_Usage("CPU", 1.0, 4.0)]
)

dummy_status = types.SimpleNamespace(active_nodes=[_Node("n1")], idle_nodes=[])
monkeypatch.setattr(
autoscaler_v2_sdk.ClusterStatusParser,
"from_get_cluster_status_reply",
staticmethod(lambda reply, stats=None: dummy_status),
)

# Ensure parse_usage exists and returns an iterable of strings.
import ray.autoscaler._private.util as autoscaler_util

monkeypatch.setattr(autoscaler_util, "parse_usage", lambda d, verbose=True: ["ok"])

class _StubGcsClient:
async def async_get_cluster_status(self):
return object()

config = SubprocessModuleConfig(
cluster_id_hex="00" * 28,
gcs_address="127.0.0.1:6379",
session_name="session",
temp_dir="/tmp",
session_dir="/tmp/session",
logging_level="INFO",
logging_format="%(message)s",
log_dir="/tmp",
logging_filename="dashboard.log",
logging_rotate_bytes=1024 * 1024,
logging_rotate_backup_count=1,
socket_dir="/tmp",
)
head = NodeHead(config)
head._gcs_client = _StubGcsClient()

result = await head.get_nodes_logical_resources()
assert result == {"n1": "ok"}

if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))