From 08aad9f8b8ed0ae18abd877cac9d67de0d03b552 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 24 Oct 2025 20:54:13 +0500 Subject: [PATCH 1/2] Switch to nebius sdk 0.3 --- pyproject.toml | 3 +- .../_internal/core/backends/nebius/compute.py | 4 +- .../core/backends/nebius/resources.py | 45 ++++++++++--------- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5cceee68bf..6ec6a2339e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -189,8 +189,7 @@ oci = [ "dstack[server]", ] nebius = [ - # 0.2.73 breaks sdk backward compatibility: https://github.com/dstackai/dstack/issues/3171 - "nebius>=0.2.40,<=0.2.72; python_version >= '3.10'", + "nebius>=0.3.0,<0.4; python_version >= '3.10'", "dstack[server]", ] all = [ diff --git a/src/dstack/_internal/core/backends/nebius/compute.py b/src/dstack/_internal/core/backends/nebius/compute.py index 70a41dedd5..b41c31832e 100644 --- a/src/dstack/_internal/core/backends/nebius/compute.py +++ b/src/dstack/_internal/core/backends/nebius/compute.py @@ -379,9 +379,7 @@ def _wait_for_instance(sdk: SDK, op: SDKOperation[Operation]) -> None: op.status(), ) time.sleep(WAIT_FOR_INSTANCE_UPDATE_INTERVAL) - resources.LOOP.await_( - op.update(per_retry_timeout=resources.REQUEST_TIMEOUT, metadata=resources.REQUEST_MD) - ) + resources.LOOP.await_(op.update(per_retry_timeout=resources.REQUEST_TIMEOUT)) def _supported_instances(offer: InstanceOffer) -> bool: diff --git a/src/dstack/_internal/core/backends/nebius/resources.py b/src/dstack/_internal/core/backends/nebius/resources.py index 7e1eb9e2bc..90e732d2d6 100644 --- a/src/dstack/_internal/core/backends/nebius/resources.py +++ b/src/dstack/_internal/core/backends/nebius/resources.py @@ -8,7 +8,6 @@ from tempfile import NamedTemporaryFile from typing import Dict, Optional -from nebius.aio.authorization.options import options_to_metadata from nebius.aio.operation import Operation as SDKOperation from nebius.aio.service_error import RequestError, StatusCode from nebius.aio.token.renewable import OPTION_RENEW_REQUEST_TIMEOUT, OPTION_RENEW_SYNCHRONOUS @@ -66,13 +65,11 @@ LOOP = DaemonEventLoop() # Pass a timeout to all methods to avoid infinite waiting REQUEST_TIMEOUT = 10 -# Pass REQUEST_MD to all methods to avoid infinite retries in case of invalid credentials -REQUEST_MD = options_to_metadata( - { - OPTION_RENEW_SYNCHRONOUS: "true", - OPTION_RENEW_REQUEST_TIMEOUT: "5", - } -) +# Pass REQUEST_AUTH_OPTIONS to all methods to avoid infinite retries in case of invalid credentials +REQUEST_AUTH_OPTIONS = { + OPTION_RENEW_SYNCHRONOUS: "true", + OPTION_RENEW_REQUEST_TIMEOUT: "5", +} # disables log messages about errors such as invalid creds or expired timeouts logging.getLogger("nebius").setLevel(logging.CRITICAL) @@ -120,7 +117,7 @@ def wait_for_operation( if time.monotonic() + interval > deadline: raise TimeoutError(f"Operation {op.id} wait timeout") time.sleep(interval) - LOOP.await_(op.update(per_retry_timeout=REQUEST_TIMEOUT, metadata=REQUEST_MD)) + LOOP.await_(op.update(per_retry_timeout=REQUEST_TIMEOUT)) def get_region_to_project_id_map( @@ -156,7 +153,9 @@ def validate_regions(configured: set[str], available: set[str]) -> None: def list_tenant_projects(sdk: SDK) -> Sequence[Container]: tenants = LOOP.await_( TenantServiceClient(sdk).list( - ListTenantsRequest(), per_retry_timeout=REQUEST_TIMEOUT, metadata=REQUEST_MD + ListTenantsRequest(), + per_retry_timeout=REQUEST_TIMEOUT, + auth_options=REQUEST_AUTH_OPTIONS, ) ) if len(tenants.items) != 1: @@ -166,7 +165,7 @@ def list_tenant_projects(sdk: SDK) -> Sequence[Container]: ProjectServiceClient(sdk).list( ListProjectsRequest(parent_id=tenant_id, page_size=999), per_retry_timeout=REQUEST_TIMEOUT, - metadata=REQUEST_MD, + auth_options=REQUEST_AUTH_OPTIONS, ) ) return projects.items @@ -240,7 +239,7 @@ def get_default_subnet(sdk: SDK, project_id: str) -> Subnet: SubnetServiceClient(sdk).list( ListSubnetsRequest(parent_id=project_id, page_size=999), per_retry_timeout=REQUEST_TIMEOUT, - metadata=REQUEST_MD, + auth_options=REQUEST_AUTH_OPTIONS, ) ) for subnet in subnets.items: @@ -267,14 +266,18 @@ def create_disk( ) with wrap_capacity_errors(): return LOOP.await_( - client.create(request, per_retry_timeout=REQUEST_TIMEOUT, metadata=REQUEST_MD) + client.create( + request, per_retry_timeout=REQUEST_TIMEOUT, auth_options=REQUEST_AUTH_OPTIONS + ) ) def delete_disk(sdk: SDK, disk_id: str) -> None: LOOP.await_( DiskServiceClient(sdk).delete( - DeleteDiskRequest(id=disk_id), per_retry_timeout=REQUEST_TIMEOUT, metadata=REQUEST_MD + DeleteDiskRequest(id=disk_id), + per_retry_timeout=REQUEST_TIMEOUT, + auth_options=REQUEST_AUTH_OPTIONS, ) ) @@ -325,7 +328,9 @@ def create_instance( ) with wrap_capacity_errors(): return LOOP.await_( - client.create(request, per_retry_timeout=REQUEST_TIMEOUT, metadata=REQUEST_MD) + client.create( + request, per_retry_timeout=REQUEST_TIMEOUT, auth_options=REQUEST_AUTH_OPTIONS + ) ) @@ -334,7 +339,7 @@ def get_instance(sdk: SDK, instance_id: str) -> Instance: InstanceServiceClient(sdk).get( GetInstanceRequest(id=instance_id), per_retry_timeout=REQUEST_TIMEOUT, - metadata=REQUEST_MD, + auth_options=REQUEST_AUTH_OPTIONS, ) ) @@ -344,7 +349,7 @@ def delete_instance(sdk: SDK, instance_id: str) -> SDKOperation[Operation]: InstanceServiceClient(sdk).delete( DeleteInstanceRequest(id=instance_id), per_retry_timeout=REQUEST_TIMEOUT, - metadata=REQUEST_MD, + auth_options=REQUEST_AUTH_OPTIONS, ) ) @@ -358,17 +363,17 @@ def create_cluster(sdk: SDK, name: str, project_id: str, fabric: str) -> SDKOper spec=GpuClusterSpec(infiniband_fabric=fabric), ), per_retry_timeout=REQUEST_TIMEOUT, - metadata=REQUEST_MD, + auth_options=REQUEST_AUTH_OPTIONS, ) ) def delete_cluster(sdk: SDK, cluster_id: str) -> None: - return LOOP.await_( + LOOP.await_( GpuClusterServiceClient(sdk).delete( DeleteGpuClusterRequest(id=cluster_id), per_retry_timeout=REQUEST_TIMEOUT, - metadata=REQUEST_MD, + auth_options=REQUEST_AUTH_OPTIONS, ) ) From 38ccc6bf8da0b0ad9160d5805b1bfe6800f65d95 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 29 Oct 2025 13:21:08 +0500 Subject: [PATCH 2/2] Pass auth_options to op.update --- pyproject.toml | 2 +- src/dstack/_internal/core/backends/nebius/compute.py | 7 ++++++- src/dstack/_internal/core/backends/nebius/resources.py | 4 +++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index fa66b28094..de9c2d45c5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -190,7 +190,7 @@ oci = [ "dstack[server]", ] nebius = [ - "nebius>=0.3.0,<0.4; python_version >= '3.10'", + "nebius>=0.3.4,<0.4; python_version >= '3.10'", "dstack[server]", ] all = [ diff --git a/src/dstack/_internal/core/backends/nebius/compute.py b/src/dstack/_internal/core/backends/nebius/compute.py index b41c31832e..06bae5b6fc 100644 --- a/src/dstack/_internal/core/backends/nebius/compute.py +++ b/src/dstack/_internal/core/backends/nebius/compute.py @@ -379,7 +379,12 @@ def _wait_for_instance(sdk: SDK, op: SDKOperation[Operation]) -> None: op.status(), ) time.sleep(WAIT_FOR_INSTANCE_UPDATE_INTERVAL) - resources.LOOP.await_(op.update(per_retry_timeout=resources.REQUEST_TIMEOUT)) + resources.LOOP.await_( + op.update( + per_retry_timeout=resources.REQUEST_TIMEOUT, + auth_options=resources.REQUEST_AUTH_OPTIONS, + ) + ) def _supported_instances(offer: InstanceOffer) -> bool: diff --git a/src/dstack/_internal/core/backends/nebius/resources.py b/src/dstack/_internal/core/backends/nebius/resources.py index 90e732d2d6..7fc4206300 100644 --- a/src/dstack/_internal/core/backends/nebius/resources.py +++ b/src/dstack/_internal/core/backends/nebius/resources.py @@ -117,7 +117,9 @@ def wait_for_operation( if time.monotonic() + interval > deadline: raise TimeoutError(f"Operation {op.id} wait timeout") time.sleep(interval) - LOOP.await_(op.update(per_retry_timeout=REQUEST_TIMEOUT)) + LOOP.await_( + op.update(per_retry_timeout=REQUEST_TIMEOUT, auth_options=REQUEST_AUTH_OPTIONS) + ) def get_region_to_project_id_map(