From 331dba3fc8cb57d36375f1ee874168e8f50c0f7f Mon Sep 17 00:00:00 2001 From: Tyler Bray Date: Fri, 12 Jun 2026 14:48:23 -0700 Subject: [PATCH] feat(deployments): add reconcile controller and prerequisite DAG (#758) Introduce DeploymentsController with deployment/volume reconcilers, prerequisite gating, drift recovery, and orphan cleanup on top of the 755 plugin scaffold. Stacks on PR #280 (AIRCORE-755). AIRCORE-758 Signed-off-by: Tyler Bray --- plugins/nemo-deployments/README.md | 43 +- plugins/nemo-deployments/pyproject.toml | 3 + .../src/nemo_deployments_plugin/config.py | 28 + .../src/nemo_deployments_plugin/controller.py | 284 +++++++++++ .../src/nemo_deployments_plugin/entities.py | 36 +- .../reconciler/deployment_reconciler.py | 331 ++++++++++++ .../reconciler/drift_recovery.py | 72 +++ .../reconciler/listing.py | 74 +++ .../reconciler/orphan_cleanup.py | 38 ++ .../reconciler/prerequisite.py | 82 +++ .../reconciler/volume_mounts.py | 54 ++ .../reconciler/volume_reconciler.py | 93 ++++ .../src/nemo_deployments_plugin/types.py | 10 + .../integration/test_reconcile_docker.py | 20 + .../tests/unit/reconciler/conftest.py | 113 +++++ .../tests/unit/reconciler/test_controller.py | 283 +++++++++++ .../reconciler/test_deployment_reconciler.py | 478 ++++++++++++++++++ .../unit/reconciler/test_drift_recovery.py | 61 +++ .../tests/unit/reconciler/test_listing.py | 45 ++ .../unit/reconciler/test_prerequisite.py | 82 +++ .../unit/reconciler/test_volume_mounts.py | 39 ++ .../unit/reconciler/test_volume_reconciler.py | 45 ++ .../tests/unit/test_config.py | 30 +- .../tests/unit/test_service_startup.py | 6 + 24 files changed, 2335 insertions(+), 15 deletions(-) create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/controller.py create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/reconciler/deployment_reconciler.py create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/reconciler/drift_recovery.py create mode 100644 
plugins/nemo-deployments/src/nemo_deployments_plugin/reconciler/listing.py create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/reconciler/orphan_cleanup.py create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/reconciler/prerequisite.py create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/reconciler/volume_mounts.py create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/reconciler/volume_reconciler.py create mode 100644 plugins/nemo-deployments/tests/integration/test_reconcile_docker.py create mode 100644 plugins/nemo-deployments/tests/unit/reconciler/conftest.py create mode 100644 plugins/nemo-deployments/tests/unit/reconciler/test_controller.py create mode 100644 plugins/nemo-deployments/tests/unit/reconciler/test_deployment_reconciler.py create mode 100644 plugins/nemo-deployments/tests/unit/reconciler/test_drift_recovery.py create mode 100644 plugins/nemo-deployments/tests/unit/reconciler/test_listing.py create mode 100644 plugins/nemo-deployments/tests/unit/reconciler/test_prerequisite.py create mode 100644 plugins/nemo-deployments/tests/unit/reconciler/test_volume_mounts.py create mode 100644 plugins/nemo-deployments/tests/unit/reconciler/test_volume_reconciler.py diff --git a/plugins/nemo-deployments/README.md b/plugins/nemo-deployments/README.md index 7623249ac5..25e3e2e3b9 100644 --- a/plugins/nemo-deployments/README.md +++ b/plugins/nemo-deployments/README.md @@ -1,16 +1,39 @@ # NeMo Deployments Plugin Substrate-agnostic deployment lifecycle for the NeMo Platform. This plugin provides -entity schemas, CRUD APIs, a `DeploymentBackend` ABC, and an executor registry. +entity schemas, CRUD APIs, a `DeploymentBackend` ABC, an executor registry, and a +background reconcile controller (`DeploymentsController`). -**Scope (this ticket):** scaffold only — entity types, v1 CRUD routes, backend contract, -and executor registry. 
Docker/K8s backends and the reconcile controller land in follow-on -tickets (756–758). +**Scope:** entity types, v1 CRUD routes, backend contract, executor registry, and the +reconcile controller (758). Docker/K8s backends land in follow-on tickets (756–757). ## Prerequisites - NeMo Platform workspace bootstrapped (`make bootstrap`, `nemo setup`) - Plugin enabled in root `pyproject.toml` (`enabled-plugins` includes `deployments`) +- At least one executor backend registered for live reconciliation (756+) + +## Controller + +Register `DeploymentsController` via the `nemo.controllers` entry point. The controller: + +- Paginates non-terminal deployment/volume lists (no 100-item cap) +- Reconciles volumes before deployments (puller→server ordering) +- Gates deployment create on mounted volumes reaching `BOUND` +- Writes status via the service-principal entity client +- Tracks list health separately: `_deployments_list_ok` and `_volumes_list_ok`; `is_healthy` is true only when **both** succeed +- Runs orphan substrate cleanup on a configurable interval (skipped when deployment list fails) + +Per-config drift backoff overrides live on `DeploymentConfig.driftRecovery` (`maxAttempts`, `baseDelaySeconds`, `maxDelaySeconds`); unset fields fall back to `DeploymentsConfig.controller`. 
+ +## Deferred (follow-on tickets) + +| Item | Why deferred | +|------|----------------| +| Volume delete → `RELEASED` | Volume DELETE API removes the entity immediately; no `DELETING` state or `list_managed_volume_names` on the backend ABC yet | +| Volume orphan cleanup | Requires backend support to list substrate volumes without entities | +| Docker/K8s E2E | AIRCORE-756/757 — `BACKEND_CLASSES` empty until backends register | +| Per-volume executor routing | No `Volume.executor` field in 755; volumes use `default_executor` | ## API base path @@ -20,14 +43,16 @@ Cross-workspace bulk queries use the entity-store sentinel workspace ``-``: ``GET /apis/deployments/v1/workspaces/-/deployments?status_in=pending,starting`` -## Next steps - -- **756 / 757:** Docker and Kubernetes `DeploymentBackend` implementations -- **758:** Reconcile controller wiring status writes and backend lifecycle - ## Tests ```bash uv sync uv run pytest plugins/nemo-deployments/tests/unit -v ``` + +## Next steps + +- **[AIRCORE-756](https://linear.app/nvidia/issue/AIRCORE-756):** Docker `DeploymentBackend` — unblocks reconciler E2E and volume orphan cleanup +- **[AIRCORE-757](https://linear.app/nvidia/issue/AIRCORE-757):** Kubernetes `DeploymentBackend` +- **[AIRCORE-759](https://linear.app/nvidia/issue/AIRCORE-759):** Models/agents adoption projecting from plugin `Deployment` status +- **755 scaffold:** entity CRUD and executor registry ([PR #280](https://github.com/NVIDIA-NeMo/nemo-platform/pull/280)) diff --git a/plugins/nemo-deployments/pyproject.toml b/plugins/nemo-deployments/pyproject.toml index c4f9f8482e..e9d1f932bc 100644 --- a/plugins/nemo-deployments/pyproject.toml +++ b/plugins/nemo-deployments/pyproject.toml @@ -14,6 +14,9 @@ dependencies = [ [project.entry-points."nemo.services"] deployments = "nemo_deployments_plugin.service:DeploymentsService" +[project.entry-points."nemo.controllers"] +deployments = "nemo_deployments_plugin.controller:DeploymentsController" + [build-system] 
class ControllerConfig(BaseModel):
    """Configuration for the deployments reconcile controller."""

    # How often the reconcile loop runs; must be strictly positive.
    interval_seconds: int = Field(
        default=5,
        gt=0,
        description="Reconciliation loop interval in seconds.",
    )

    # Controller-wide drift recovery defaults.
    drift_recovery_max_attempts: int = Field(
        default=5,
        ge=0,
        description="Max drift recovery attempts before FAILED.",
    )
    drift_recovery_base_delay_seconds: int = Field(
        default=5,
        ge=0,
        description="Base delay for drift recovery backoff.",
    )
    drift_recovery_max_delay_seconds: int = Field(
        default=300,
        ge=0,
        description="Max delay cap for drift recovery backoff.",
    )

    # Orphan cleanup cadence, counted in reconcile cycles.
    orphan_cleanup_every_n_cycles: int = Field(
        default=6,
        ge=0,
        description="Run orphan substrate cleanup every N reconcile cycles (0 disables).",
    )

    @model_validator(mode="after")
    def _validate_backoff(self) -> ControllerConfig:
        # The backoff base may equal the cap but must never exceed it.
        base_delay = self.drift_recovery_base_delay_seconds
        delay_cap = self.drift_recovery_max_delay_seconds
        if base_delay <= delay_cap:
            return self
        raise ValueError("drift_recovery_base_delay_seconds must not exceed drift_recovery_max_delay_seconds")
@@ -29,6 +53,10 @@ class DeploymentsConfig(NemoConfig): default=None, description="Fallback executor when Deployment.executor is unset.", ) + controller: ControllerConfig = Field( + default_factory=ControllerConfig, + description="Deployment reconciler controller settings.", + ) port_range_start: int = Field(default=9000, description="Default Docker port range start.") port_range_end: int = Field(default=9100, description="Default Docker port range end.") diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/controller.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/controller.py new file mode 100644 index 0000000000..baebb71428 --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/controller.py @@ -0,0 +1,284 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Deployments reconcile controller — drives Deployment and Volume state machines.""" + +from __future__ import annotations + +import logging +from typing import ClassVar + +from nemo_deployments_plugin.backends.registry import ExecutorRegistry +from nemo_deployments_plugin.config import ControllerConfig, DeploymentsConfig +from nemo_deployments_plugin.entities import Deployment, DeploymentConfig, Volume +from nemo_deployments_plugin.reconciler.deployment_reconciler import DeploymentReconciler +from nemo_deployments_plugin.reconciler.listing import get_deployment_for_config_name, list_all_pages +from nemo_deployments_plugin.reconciler.orphan_cleanup import reconcile_orphans +from nemo_deployments_plugin.reconciler.volume_reconciler import VolumeReconciler +from nemo_deployments_plugin.types import NON_TERMINAL_DEPLOYMENT_STATUSES, NON_TERMINAL_VOLUME_STATUSES +from nemo_platform_plugin.controller import NemoController +from nemo_platform_plugin.entity_client import NemoEntitiesClient, NemoEntityConflictError +from nemo_platform_plugin.filter_ops import 
class DeploymentsController(NemoController):
    """Reconciles deployments and volumes against registered executor backends.

    Each ``reconcile()`` cycle lists non-terminal deployments and volumes,
    loads the configs and cross-references they need, reconciles every
    volume and then every deployment, and periodically runs orphan cleanup.
    List failures are recorded in ``_deployments_list_ok`` and
    ``_volumes_list_ok``, which together drive ``is_healthy``.
    """

    name = "deployments"
    dependencies: ClassVar[list[str]] = ["entities"]

    def __init__(self) -> None:
        # Collaborators are created in on_startup(); None means "not started".
        self._entities: NemoEntitiesClient | None = None
        self._registry: ExecutorRegistry | None = None
        self._controller_config: ControllerConfig | None = None
        self._deployment_reconciler: DeploymentReconciler | None = None
        self._volume_reconciler: VolumeReconciler | None = None
        self._interval_seconds: float = 5.0
        self._cycle_count: int = 0
        self._deployments_list_ok: bool = True
        self._volumes_list_ok: bool = True

    @property
    def is_healthy(self) -> bool:
        """Return True only when both the last deployment and volume listings succeeded."""
        return self._deployments_list_ok and self._volumes_list_ok

    @property
    def interval_seconds(self) -> float:
        """Return the reconcile loop interval in seconds."""
        return self._interval_seconds

    @property
    def entities(self) -> NemoEntitiesClient:
        """Return the entity client.

        Raises:
            RuntimeError: If accessed before ``on_startup()``.
        """
        if self._entities is None:
            raise RuntimeError("DeploymentsController.entities accessed before on_startup()")
        return self._entities

    @property
    def controller_config(self) -> ControllerConfig:
        """Return the controller settings.

        Raises:
            RuntimeError: If accessed before ``on_startup()``.
        """
        if self._controller_config is None:
            raise RuntimeError("DeploymentsController.controller_config accessed before on_startup()")
        return self._controller_config

    async def on_startup(self) -> None:
        """Load config, build the entity client, executor registry and reconcilers."""
        from nemo_deployments_plugin.backends.registry import ExecutorRegistry, ExecutorSpec
        from nemo_platform.resources.entities import AsyncEntitiesResource
        from nemo_platform_plugin.entity_client import NemoEntitiesClient
        from nemo_platform_plugin.sdk_provider import get_async_platform_sdk

        config = DeploymentsConfig.get()
        self._controller_config = config.controller
        self._interval_seconds = float(config.controller.interval_seconds)

        sdk = get_async_platform_sdk(as_service="deployments", internal=True)
        entities_api = AsyncEntitiesResource(sdk)
        self._entities = NemoEntitiesClient(entities_api)

        specs = [ExecutorSpec(name=e.name, backend=e.backend, config=e.config) for e in config.executors]
        if specs:
            registry = ExecutorRegistry.from_config(
                sdk,
                specs,
                default_executor=config.default_executor,
            )
        else:
            # No executors configured: use an empty registry.
            registry = ExecutorRegistry.empty()
        self._registry = registry

        self._deployment_reconciler = DeploymentReconciler(self.entities, registry, config.controller)
        self._volume_reconciler = VolumeReconciler(self.entities, registry)
        logger.info("DeploymentsController started.")

    async def on_shutdown(self) -> None:
        """Shut down all registered backends, if a registry was created."""
        if self._registry is not None:
            self._registry.shutdown_all()
        logger.info("DeploymentsController shut down.")

    async def reconcile(self) -> None:
        """Run one reconcile cycle over all non-terminal volumes and deployments.

        Raises:
            RuntimeError: If called before ``on_startup()``.
        """
        self._cycle_count += 1
        deployment_reconciler = self._deployment_reconciler
        volume_reconciler = self._volume_reconciler
        registry = self._registry
        # Explicit check instead of ``assert``: asserts are stripped under
        # ``python -O`` and would surface later as an opaque AttributeError.
        if deployment_reconciler is None or volume_reconciler is None or registry is None:
            raise RuntimeError("DeploymentsController.reconcile() called before on_startup()")

        deployments = await self._list_deployments()
        volumes = await self._list_volumes()
        configs = await self._load_configs(deployments)
        deployment_reconciler.set_config_cache(configs)

        by_name, by_config = _index_deployments(deployments)
        volumes_by_name = _index_volumes(volumes)
        await self._ensure_volume_refs_loaded(configs, volumes_by_name)
        await self._ensure_prerequisite_refs_loaded(configs, by_name, by_config)

        # Volumes are reconciled before the deployments that may mount them.
        for volume in volumes:
            try:
                await volume_reconciler.reconcile_one(volume)
            except NemoEntityConflictError:
                logger.debug("Optimistic lock conflict on volume '%s' — retry next cycle.", volume.name)
            except Exception:
                # One bad volume must not abort the rest of the cycle.
                logger.exception("Error reconciling volume %s/%s", volume.workspace, volume.name)

        for deployment in deployments:
            try:
                await deployment_reconciler.reconcile_one(
                    deployment,
                    deployments_by_config=by_config,
                    deployments_by_name=by_name,
                    volumes_by_name=volumes_by_name,
                )
            except NemoEntityConflictError:
                logger.debug("Optimistic lock conflict on deployment '%s' — retry next cycle.", deployment.name)
            except Exception:
                # One bad deployment must not abort the rest of the cycle.
                logger.exception("Error reconciling deployment %s/%s", deployment.workspace, deployment.name)

        orphan_every = self.controller_config.orphan_cleanup_every_n_cycles
        # Skip cleanup when the deployment list failed: an empty list would
        # make every substrate resource look like an orphan.
        if orphan_every > 0 and self._cycle_count % orphan_every == 0 and self._deployments_list_ok:
            known_ids = {f"{d.workspace}/{d.name}" for d in deployments}
            await reconcile_orphans(registry.all_backends(), known_ids)

    async def list_objects(self) -> list:
        """Not supported; this controller overrides ``reconcile()`` directly."""
        raise NotImplementedError("DeploymentsController uses reconcile() override")

    async def reconcile_one(self, obj: object) -> None:
        """Not supported; this controller overrides ``reconcile()`` directly."""
        raise NotImplementedError("DeploymentsController uses reconcile() override")

    async def _list_deployments(self) -> list[Deployment]:
        """List deployments in a non-terminal status; return ``[]`` on failure.

        Records the outcome in ``_deployments_list_ok``.
        """
        try:
            deployments = await list_all_pages(
                self.entities,
                Deployment,
                filter_operation=ComparisonOperation(
                    operator=FilterOperator.IN,
                    field="status",
                    value=list(NON_TERMINAL_DEPLOYMENT_STATUSES),
                ),
            )
            self._deployments_list_ok = True
            return deployments
        except Exception:
            logger.exception("Failed to list non-terminal deployments")
            self._deployments_list_ok = False
            return []

    async def _list_volumes(self) -> list[Volume]:
        """List volumes in a non-terminal status; return ``[]`` on failure.

        Records the outcome in ``_volumes_list_ok``.
        """
        try:
            volumes = await list_all_pages(
                self.entities,
                Volume,
                filter_operation=ComparisonOperation(
                    operator=FilterOperator.IN,
                    field="status",
                    value=list(NON_TERMINAL_VOLUME_STATUSES),
                ),
            )
            self._volumes_list_ok = True
            return volumes
        except Exception:
            logger.exception("Failed to list non-terminal volumes")
            self._volumes_list_ok = False
            return []

    async def _load_configs(self, deployments: list[Deployment]) -> dict[tuple[str, str], DeploymentConfig]:
        """Fetch each distinct DeploymentConfig referenced by ``deployments``.

        Returns a mapping keyed by ``(workspace, config_name)``. Configs that
        fail to load are logged and left out of the mapping.
        """
        configs: dict[tuple[str, str], DeploymentConfig] = {}
        for deployment in deployments:
            key = (deployment.workspace, deployment.deployment_config_name)
            if key in configs:
                continue
            try:
                configs[key] = await self.entities.get(
                    DeploymentConfig,
                    name=deployment.deployment_config_name,
                    workspace=deployment.workspace,
                )
            except Exception:
                logger.warning(
                    "Failed to load DeploymentConfig '%s' in workspace '%s'",
                    deployment.deployment_config_name,
                    deployment.workspace,
                    exc_info=True,
                )
        return configs

    async def _ensure_volume_refs_loaded(
        self,
        configs: dict[tuple[str, str], DeploymentConfig],
        volumes_by_name: dict[tuple[str, str], Volume],
    ) -> None:
        """Add volumes mounted by ``configs`` that are missing from ``volumes_by_name``.

        Mutates ``volumes_by_name`` in place. Volumes that cannot be fetched
        are skipped with a debug log.
        """
        from nemo_deployments_plugin.reconciler.volume_mounts import collect_volume_mount_names

        for (workspace, _config_name), config in configs.items():
            for mount_name in collect_volume_mount_names(config):
                key = (workspace, mount_name)
                if key in volumes_by_name:
                    continue
                try:
                    volumes_by_name[key] = await self.entities.get(
                        Volume,
                        name=mount_name,
                        workspace=workspace,
                    )
                except Exception:
                    logger.debug(
                        "Volume '%s' not yet available in workspace '%s'",
                        mount_name,
                        workspace,
                    )

    async def _ensure_prerequisite_refs_loaded(
        self,
        configs: dict[tuple[str, str], DeploymentConfig],
        by_name: dict[tuple[str, str], Deployment],
        by_config: dict[tuple[str, str], Deployment],
    ) -> None:
        """Load prerequisite deployments from entity store (including terminal states).

        Mutates ``by_name`` and ``by_config`` in place. Prerequisites that
        cannot be found or fetched are skipped with a debug log.
        """
        from nemo_deployments_plugin.validation import prerequisite_names

        for (workspace, _config_name), config in configs.items():
            for prereq_config_name in prerequisite_names(config.prerequisites):
                key_by_config = (workspace, prereq_config_name)
                # Already indexed, either by config name or as a deployment
                # whose own name equals the prerequisite name.
                if key_by_config in by_config or key_by_config in by_name:
                    continue
                try:
                    dep = await get_deployment_for_config_name(
                        self.entities,
                        workspace=workspace,
                        config_name=prereq_config_name,
                    )
                except Exception:
                    dep = None
                if dep is None:
                    logger.debug(
                        "Prerequisite deployment '%s' not yet available in workspace '%s'",
                        prereq_config_name,
                        workspace,
                    )
                    continue
                by_name[(workspace, dep.name)] = dep
                by_config[(workspace, dep.deployment_config_name)] = dep
                if dep.name != prereq_config_name:
                    # Also index under the requested name so the lookup hits.
                    by_config[key_by_config] = dep
dep.name != prereq_config_name: + by_config[key_by_config] = dep + + +def _index_volumes(volumes: list[Volume]) -> dict[tuple[str, str], Volume]: + return {(volume.workspace, volume.name): volume for volume in volumes} + + +def _index_deployments( + deployments: list[Deployment], +) -> tuple[dict[tuple[str, str], Deployment], dict[tuple[str, str], Deployment]]: + by_name: dict[tuple[str, str], Deployment] = {} + by_config: dict[tuple[str, str], Deployment] = {} + for deployment in deployments: + by_name[(deployment.workspace, deployment.name)] = deployment + config_key = (deployment.workspace, deployment.deployment_config_name) + existing = by_config.get(config_key) + if existing is None or deployment.name == deployment.deployment_config_name: + by_config[config_key] = deployment + elif existing.name != deployment.deployment_config_name: + logger.warning( + "Multiple deployments share DeploymentConfig '%s' in workspace '%s'; prerequisite lookup uses '%s'", + deployment.deployment_config_name, + deployment.workspace, + by_config[config_key].name, + ) + return by_name, by_config diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/entities.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/entities.py index e914bcf3a4..84be48fe71 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/entities.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/entities.py @@ -18,11 +18,12 @@ DesiredState, DriftRecoveryAction, Endpoint, + PrerequisiteCondition, RestartPolicy, VolumeStatus, ) from nemo_platform_plugin.entity import NemoEntity -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator class EnvVar(BaseModel): @@ -178,13 +179,42 @@ class VolumeBackendConfig(BaseModel): class DriftRecoveryPolicy(BaseModel): action: DriftRecoveryAction = "recreate" + max_attempts: int | None = Field( + default=None, + description="Override controller drift_recovery_max_attempts when set.", + ) + 
class DriftRecoveryPolicy(BaseModel):
    # Per-config drift recovery settings. Each numeric field is an optional
    # override; None means "use the controller-level setting".
    # Documented with comments rather than a class docstring so the
    # generated schema description is not changed.
    action: DriftRecoveryAction = "recreate"
    # Overrides are bounded with ge=0, the same bound ControllerConfig puts
    # on the settings they replace, so a negative value is rejected here
    # instead of silently disabling recovery or backoff.
    max_attempts: int | None = Field(
        default=None,
        ge=0,
        description="Override controller drift_recovery_max_attempts when set.",
    )
    base_delay_seconds: int | None = Field(
        default=None,
        ge=0,
        description="Override controller drift_recovery_base_delay_seconds when set.",
    )
    max_delay_seconds: int | None = Field(
        default=None,
        ge=0,
        description="Override controller drift_recovery_max_delay_seconds when set.",
    )

    @model_validator(mode="after")
    def _validate_delays(self) -> DriftRecoveryPolicy:
        # Only checkable when both delays are overridden on this policy.
        if (
            self.base_delay_seconds is not None
            and self.max_delay_seconds is not None
            and self.base_delay_seconds > self.max_delay_seconds
        ):
            raise ValueError("base_delay_seconds must not exceed max_delay_seconds")
        return self
+# SPDX-License-Identifier: Apache-2.0 + +"""Deployment state machine reconciliation against DeploymentBackend.""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone + +from nemo_deployments_plugin.backends.base import BackendStatusUpdate, DeploymentBackend +from nemo_deployments_plugin.backends.registry import ExecutorNotFoundError, ExecutorRegistry +from nemo_deployments_plugin.config import ControllerConfig +from nemo_deployments_plugin.constants import MANAGED_BY_LABEL +from nemo_deployments_plugin.entities import Deployment, DeploymentConfig, StatusEvent, Volume +from nemo_deployments_plugin.reconciler.drift_recovery import DriftRecoveryCache, DriftRecoveryLimits, RecoveryAction +from nemo_deployments_plugin.reconciler.prerequisite import PrerequisiteResult, prerequisites_met +from nemo_deployments_plugin.reconciler.volume_mounts import VolumeMountResult, volume_mounts_ready +from nemo_platform_plugin.entity_client import NemoEntitiesClient, NemoEntityConflictError, NemoEntityNotFoundError + +logger = logging.getLogger(__name__) + + +def deployment_id(deployment: Deployment) -> str: + return f"{deployment.workspace}/{deployment.name}" + + +class DeploymentReconciler: + """Reconciles Deployment entities with substrate backends.""" + + def __init__( + self, + entities: NemoEntitiesClient, + registry: ExecutorRegistry, + controller_config: ControllerConfig, + ) -> None: + self._entities = entities + self._registry = registry + self._controller_config = controller_config + self._drift_cache = DriftRecoveryCache() + self._config_cache: dict[tuple[str, str], DeploymentConfig] = {} + + def set_config_cache(self, configs: dict[tuple[str, str], DeploymentConfig]) -> None: + self._config_cache = configs + + def _drift_limits(self, config: DeploymentConfig) -> DriftRecoveryLimits: + policy = config.drift_recovery + ctrl = self._controller_config + return DriftRecoveryLimits( + max_attempts=policy.max_attempts if 
policy.max_attempts is not None else ctrl.drift_recovery_max_attempts, + base_delay_seconds=( + policy.base_delay_seconds + if policy.base_delay_seconds is not None + else ctrl.drift_recovery_base_delay_seconds + ), + max_delay_seconds=( + policy.max_delay_seconds + if policy.max_delay_seconds is not None + else ctrl.drift_recovery_max_delay_seconds + ), + ) + + def resolve_backend(self, deployment: Deployment) -> DeploymentBackend: + return self._registry.resolve(deployment.executor) + + async def _resolve_backend_or_fail(self, deployment: Deployment) -> DeploymentBackend | None: + try: + return self.resolve_backend(deployment) + except ExecutorNotFoundError as exc: + await self._project_failure(deployment, f"No executor available: {exc}") + return None + + def _try_resolve_backend(self, deployment: Deployment) -> DeploymentBackend | None: + try: + return self.resolve_backend(deployment) + except ExecutorNotFoundError: + return None + + async def reconcile_one( + self, + deployment: Deployment, + *, + deployments_by_config: dict[tuple[str, str], Deployment], + deployments_by_name: dict[tuple[str, str], Deployment], + volumes_by_name: dict[tuple[str, str], Volume], + ) -> None: + if deployment.desired_state == "STOPPED" or deployment.status == "DELETING": + await self._reconcile_delete(deployment) + return + + config_key = (deployment.workspace, deployment.deployment_config_name) + config = self._config_cache.get(config_key) + if config is None: + try: + config = await self._entities.get( + DeploymentConfig, + name=deployment.deployment_config_name, + workspace=deployment.workspace, + ) + self._config_cache[config_key] = config + except NemoEntityNotFoundError: + await self._project_failure( + deployment, f"DeploymentConfig '{deployment.deployment_config_name}' not found" + ) + return + + if deployment.status == "PENDING": + prereq = prerequisites_met( + deployment, + config, + deployments_by_config=deployments_by_config, + deployments_by_name=deployments_by_name, 
+ ) + if not prereq.met: + if _prerequisite_failed(prereq, deployments_by_config, deployments_by_name, deployment): + await self._project_failure(deployment, prereq.reason) + return + await self._project_pending(deployment, prereq.reason) + return + + mount_result = volume_mounts_ready(config, deployment.workspace, volumes_by_name) + if not mount_result.ready: + if mount_result.blocking_volume and _volume_mount_failed( + mount_result, volumes_by_name, deployment.workspace + ): + await self._project_failure(deployment, mount_result.reason) + return + await self._project_pending(deployment, mount_result.reason) + return + + backend = await self._resolve_backend_or_fail(deployment) + if backend is None: + return + + if deployment.status == "PENDING": + await self._reconcile_create(deployment, config, backend) + return + + if deployment.status in ("STARTING", "READY", "LOST"): + status_update = await backend.read_status(workspace=deployment.workspace, name=deployment.name) + if status_update.status == "LOST": + await self._handle_drift(deployment, config, backend) + return + if status_update.status in ("READY", "SUCCEEDED"): + self._drift_cache.remove(deployment_id(deployment)) + await self._project_status(deployment, status_update) + return + + async def _reconcile_create( + self, + deployment: Deployment, + config: DeploymentConfig, + backend: DeploymentBackend, + ) -> None: + dep_id = deployment_id(deployment) + labels = {**config.labels, "managed-by": MANAGED_BY_LABEL} + try: + status_update = await backend.create_deployment( + workspace=deployment.workspace, + name=deployment.name, + config_name=config.name, + labels=labels, + backend_config=config.backend_config.model_dump(by_alias=True, exclude_none=True), + ) + logger.info("Created deployment %s: %s", dep_id, status_update.status) + await self._project_status(deployment, status_update) + except Exception as exc: + logger.exception("Failed to create deployment %s", dep_id) + await 
self._project_failure(deployment, f"Failed to create deployment: {exc}") + + async def _reconcile_delete(self, deployment: Deployment) -> None: + dep_id = deployment_id(deployment) + self._drift_cache.remove(dep_id) + backend = self._try_resolve_backend(deployment) + if deployment.status != "DELETING": + await self._project_status( + deployment, + BackendStatusUpdate(status="DELETING", status_message="Stopping deployment"), + ) + + if backend is not None: + try: + await backend.delete_deployment(deployment.workspace, deployment.name) + except Exception: + logger.warning("Backend delete failed for %s", dep_id, exc_info=True) + else: + logger.warning("No executor for delete of %s — removing entity only", dep_id) + + try: + await self._entities.delete(Deployment, name=deployment.name, workspace=deployment.workspace) + logger.info("Deleted deployment entity %s", dep_id) + except NemoEntityNotFoundError: + logger.debug("Deployment entity %s already deleted", dep_id) + except NemoEntityConflictError: + raise + except Exception: + logger.exception("Failed to delete deployment entity %s", dep_id) + + async def _handle_drift( + self, + deployment: Deployment, + config: DeploymentConfig, + backend: DeploymentBackend, + ) -> None: + dep_id = deployment_id(deployment) + if config.drift_recovery.action == "ignore": + await self._project_status( + deployment, + BackendStatusUpdate(status="LOST", status_message="Substrate resource lost (drift recovery ignored)"), + ) + return + + if config.restart_policy != "Always": + await self._project_failure(deployment, "Substrate resource lost for non-Always deployment") + return + + cache = self._drift_cache + limits = self._drift_limits(config) + cache.add(dep_id) + match cache.should_recover(dep_id, limits): + case RecoveryAction.EXHAUSTED: + attempts = cache.get_attempts(dep_id) + await self._project_failure( + deployment, + f"Drift recovery failed after {attempts} attempts. 
Manual intervention required.", + ) + return + case RecoveryAction.BACKOFF: + logger.debug("Drift recovery for %s in backoff period", dep_id) + return + case RecoveryAction.PROCEED: + pass + + attempt = cache.add_attempt(dep_id) + logger.info( + "Drift recovery for %s (attempt %d/%d)", + dep_id, + attempt, + limits.max_attempts, + ) + labels = {**config.labels, "managed-by": MANAGED_BY_LABEL} + try: + status_update = await backend.create_deployment( + workspace=deployment.workspace, + name=deployment.name, + config_name=config.name, + labels=labels, + backend_config=config.backend_config.model_dump(by_alias=True, exclude_none=True), + ) + message = ( + f"Recovering deployment — backend resources recreated " + f"(attempt {attempt}/{limits.max_attempts}). {status_update.status_message}" + ) + status_update = status_update.model_copy(update={"status_message": message}) + await self._project_status(deployment, status_update) + except Exception as exc: + logger.exception("Drift recovery failed for %s", dep_id) + await self._project_status( + deployment, + BackendStatusUpdate( + status="LOST", + status_message=(f"Recovery attempt {attempt}/{limits.max_attempts} failed: {exc}. 
Will retry."), + ), + ) + + async def _project_pending(self, deployment: Deployment, message: str) -> None: + if deployment.status == "PENDING" and deployment.status_message == message: + return + await self._project_status( + deployment, + BackendStatusUpdate(status="PENDING", status_message=message), + ) + + async def _project_failure(self, deployment: Deployment, message: str) -> None: + await self._project_status( + deployment, + BackendStatusUpdate(status="FAILED", status_message=message), + ) + + async def _project_status(self, deployment: Deployment, update: BackendStatusUpdate) -> None: + if ( + deployment.status == update.status + and deployment.status_message == update.status_message + and deployment.endpoints == update.endpoints + and deployment.exit_code == update.exit_code + and deployment.error_details == update.error_details + ): + return + + deployment.status = update.status + deployment.status_message = update.status_message + deployment.endpoints = update.endpoints + deployment.exit_code = update.exit_code + deployment.error_details = update.error_details + deployment.status_history.append( + StatusEvent( + status=update.status, + message=update.status_message, + timestamp=datetime.now(timezone.utc).isoformat(), + ) + ) + await self._save(deployment) + + async def _save(self, deployment: Deployment) -> None: + await self._entities.update(deployment) + + +def _prerequisite_failed( + result: PrerequisiteResult, + deployments_by_config: dict[tuple[str, str], Deployment], + deployments_by_name: dict[tuple[str, str], Deployment], + deployment: Deployment, +) -> bool: + if result.blocking_prerequisite is None: + return "failed" in result.reason.lower() + workspace = deployment.workspace + target = deployments_by_config.get((workspace, result.blocking_prerequisite)) + if target is None: + target = deployments_by_name.get((workspace, result.blocking_prerequisite)) + return target is not None and target.status == "FAILED" + + +def _volume_mount_failed( + 
class RecoveryAction(Enum):
    """Result of checking whether drift recovery should proceed."""

    PROCEED = auto()
    BACKOFF = auto()
    EXHAUSTED = auto()


@dataclass(frozen=True)
class DriftRecoveryLimits:
    """Bounds applied to drift recovery retries."""

    # Attempts allowed before recovery is reported as EXHAUSTED.
    max_attempts: int
    # Backoff after the first attempt; doubled for each further attempt.
    base_delay_seconds: int
    # Upper bound on the computed backoff.
    max_delay_seconds: int


@dataclass
class DriftRecoveryState:
    """Per-deployment retry bookkeeping."""

    attempts: int = 0
    last_attempt_at: datetime | None = None


class DriftRecoveryCache:
    """Tracks drift recovery attempts with exponential backoff."""

    def __init__(self) -> None:
        self._states: dict[str, DriftRecoveryState] = {}

    def add(self, deployment_id: str) -> None:
        """Start tracking ``deployment_id``; existing state is left untouched."""
        self._states.setdefault(deployment_id, DriftRecoveryState())

    def remove(self, deployment_id: str) -> None:
        """Forget ``deployment_id``; a no-op when it is not tracked."""
        self._states.pop(deployment_id, None)

    def should_recover(self, deployment_id: str, limits: DriftRecoveryLimits) -> RecoveryAction:
        """Decide whether a recovery attempt may run now.

        Returns:
            ``EXHAUSTED`` once the recorded attempts reach ``limits.max_attempts``,
            ``BACKOFF`` while the delay since the last attempt has not elapsed,
            ``PROCEED`` otherwise.
        """
        state = self._states.get(deployment_id)
        attempts = state.attempts if state is not None else 0
        # Check the budget before anything else so an untracked deployment
        # still respects a limit of zero (or less) attempts.
        if attempts >= limits.max_attempts:
            return RecoveryAction.EXHAUSTED
        if state is None or state.last_attempt_at is None:
            return RecoveryAction.PROCEED

        # base * 2^(attempts - 1), capped at max_delay_seconds.
        backoff_seconds = min(
            limits.base_delay_seconds * (2 ** max(attempts - 1, 0)),
            limits.max_delay_seconds,
        )
        elapsed = (datetime.now(timezone.utc) - state.last_attempt_at).total_seconds()
        if elapsed < backoff_seconds:
            return RecoveryAction.BACKOFF
        return RecoveryAction.PROCEED

    def add_attempt(self, deployment_id: str) -> int:
        """Record an attempt made now and return the new attempt count."""
        state = self._states.setdefault(deployment_id, DriftRecoveryState())
        state.attempts += 1
        state.last_attempt_at = datetime.now(timezone.utc)
        return state.attempts

    def get_attempts(self, deployment_id: str) -> int:
        """Return the recorded attempt count, or 0 when untracked."""
        state = self._states.get(deployment_id)
        return state.attempts if state else 0
DEFAULT_LIST_PAGE_SIZE = 100


async def list_all_pages(
    entities: NemoEntitiesClient,
    entity_type: type[EntityT],
    *,
    workspace: str = "-",
    page_size: int = DEFAULT_LIST_PAGE_SIZE,
    filter_operation: ComparisonOperation | None = None,
) -> list[EntityT]:
    """Collect every page of an entity list query into one list.

    Pages are requested starting at 1 until the response carries no pagination
    info or the requested page reaches ``pagination.total_pages``.
    """
    gathered: list[EntityT] = []
    page_number = 0
    more_pages = True
    while more_pages:
        page_number += 1
        response = await entities.list(
            entity_type,
            workspace=workspace,
            page=page_number,
            page_size=page_size,
            filter_operation=filter_operation,
        )
        gathered.extend(response.data)
        paging = response.pagination
        more_pages = paging is not None and page_number < paging.total_pages
    return gathered


async def get_deployment_for_config_name(
    entities: NemoEntitiesClient,
    *,
    workspace: str,
    config_name: str,
) -> Deployment | None:
    """Find a Deployment created from the DeploymentConfig named ``config_name``.

    The lookup first filters deployments on ``deployment_config_name`` and takes
    the first match. Failing that, it fetches the deployment whose entity name
    equals ``config_name`` and accepts it only when its config name matches too.
    No status filter is applied by this function.
    """
    by_config_name = ComparisonOperation(
        operator=FilterOperator.EQ,
        field="deployment_config_name",
        value=config_name,
    )
    matches = await list_all_pages(
        entities,
        Deployment,
        workspace=workspace,
        filter_operation=by_config_name,
    )
    if matches:
        return matches[0]

    try:
        candidate = await entities.get(Deployment, name=config_name, workspace=workspace)
    except NemoEntityNotFoundError:
        return None

    return candidate if candidate.deployment_config_name == config_name else None
logger = logging.getLogger(__name__)


async def reconcile_orphans(
    backends: list[DeploymentBackend],
    known_deployment_ids: set[str],
) -> None:
    """Remove backend deployments whose id is absent from ``known_deployment_ids``.

    Ids reported by a backend are expected in ``workspace/name`` form; anything
    else is logged and skipped. Listing and deletion failures are logged and do
    not stop the sweep over the remaining backends or orphans.
    """
    for backend in backends:
        try:
            managed = await backend.list_managed_deployment_names()
        except Exception:
            logger.warning("Failed to list managed deployments for orphan cleanup", exc_info=True)
            continue

        for deployment_id in set(managed) - known_deployment_ids:
            # Split on the first "/" only, so the name part may itself contain slashes.
            workspace, separator, name = deployment_id.partition("/")
            if not (separator and workspace and name):
                logger.warning("Invalid deployment id from backend: %r, skipping", deployment_id)
                continue
            try:
                logger.info("Deleting orphan deployment %s", deployment_id)
                await backend.delete_deployment(workspace, name)
            except Exception:
                logger.warning("Failed to delete orphan %s", deployment_id, exc_info=True)
SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Prerequisite DAG evaluation for deployment startup gating.""" + +from __future__ import annotations + +from dataclasses import dataclass + +from nemo_deployments_plugin.entities import Deployment, DeploymentConfig, Prerequisite + + +@dataclass(frozen=True) +class PrerequisiteResult: + met: bool + reason: str = "" + blocking_prerequisite: str | None = None + + +def _find_prerequisite_deployment( + prerequisite: Prerequisite, + deployment: Deployment, + deployments_by_config: dict[tuple[str, str], Deployment], + deployments_by_name: dict[tuple[str, str], Deployment], +) -> Deployment | None: + """Resolve the Deployment entity for a prerequisite DeploymentConfig name.""" + workspace = deployment.workspace + key_by_config = (workspace, prerequisite.deployment_name) + target = deployments_by_config.get(key_by_config) + if target is not None: + return target + key_by_name = (workspace, prerequisite.deployment_name) + target = deployments_by_name.get(key_by_name) + if target is not None and target.deployment_config_name == prerequisite.deployment_name: + return target + return None + + +def _condition_met(prerequisite: Prerequisite, target: Deployment) -> bool: + if prerequisite.condition == "ready": + return target.status == "READY" + return target.status == "SUCCEEDED" and target.exit_code == 0 + + +def prerequisites_met( + deployment: Deployment, + config: DeploymentConfig, + *, + deployments_by_config: dict[tuple[str, str], Deployment], + deployments_by_name: dict[tuple[str, str], Deployment], +) -> PrerequisiteResult: + """Evaluate DeploymentConfig.prerequisites against current deployment states.""" + if not config.prerequisites: + return PrerequisiteResult(met=True) + + for prerequisite in config.prerequisites: + target = _find_prerequisite_deployment( + prerequisite, + deployment, + deployments_by_config, + 
deployments_by_name, + ) + if target is None: + return PrerequisiteResult( + met=False, + reason=f"Waiting for prerequisite deployment '{prerequisite.deployment_name}'", + blocking_prerequisite=prerequisite.deployment_name, + ) + if target.status == "FAILED": + return PrerequisiteResult( + met=False, + reason=f"Prerequisite '{prerequisite.deployment_name}' failed", + blocking_prerequisite=prerequisite.deployment_name, + ) + if not _condition_met(prerequisite, target): + return PrerequisiteResult( + met=False, + reason=f"Waiting for prerequisite '{prerequisite.deployment_name}' ({prerequisite.condition})", + blocking_prerequisite=prerequisite.deployment_name, + ) + + return PrerequisiteResult(met=True) diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/reconciler/volume_mounts.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/reconciler/volume_mounts.py new file mode 100644 index 0000000000..e7ffe2b479 --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/reconciler/volume_mounts.py @@ -0,0 +1,54 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
@dataclass(frozen=True)
class VolumeMountResult:
    """Outcome of checking the volumes a deployment mounts."""

    # True when every referenced volume is BOUND.
    ready: bool
    # Explanation of what is blocking; empty when nothing is.
    reason: str = ""
    # Name of the first volume that is not usable yet, if any.
    blocking_volume: str | None = None


def collect_volume_mount_names(config: DeploymentConfig) -> set[str]:
    """Gather the distinct volume names mounted at config, container and init-container level."""
    top_level = {mount.name for mount in config.volume_mounts}
    per_container = {
        mount.name
        for container in [*config.containers, *config.init_containers]
        for mount in container.volume_mounts
    }
    return top_level | per_container


def volume_mounts_ready(
    config: DeploymentConfig,
    workspace: str,
    volumes_by_name: dict[tuple[str, str], Volume],
) -> VolumeMountResult:
    """Report the first mounted volume (in name order) that is missing, FAILED or not BOUND."""
    for volume_name in sorted(collect_volume_mount_names(config)):
        found = volumes_by_name.get((workspace, volume_name))
        if found is None:
            reason = f"Waiting for volume '{volume_name}'"
        elif found.status == "FAILED":
            reason = f"Volume '{volume_name}' failed"
        elif found.status != "BOUND":
            reason = f"Waiting for volume '{volume_name}' to reach BOUND (currently {found.status})"
        else:
            continue
        return VolumeMountResult(ready=False, reason=reason, blocking_volume=volume_name)
    return VolumeMountResult(ready=True)
logger = logging.getLogger(__name__)


class VolumeReconciler:
    """Reconciles Volume entities with substrate volume resources."""

    def __init__(self, entities: NemoEntitiesClient, registry: ExecutorRegistry) -> None:
        self._entities = entities
        self._registry = registry

    def resolve_backend(self, volume: Volume) -> DeploymentBackend:
        """Return the backend used for ``volume``.

        The volume is not consulted: the registry is asked to resolve ``None``
        (presumably its default executor — confirm against ``ExecutorRegistry``).

        Raises:
            ExecutorNotFoundError: Propagated from the registry.
        """
        return self._registry.resolve(None)

    async def reconcile_one(self, volume: Volume) -> None:
        """Advance one volume: create it when PENDING, refresh it when BOUND.

        Volumes in any other status are left alone. A missing executor is
        recorded on the volume as FAILED rather than raised.
        """
        try:
            # Go through resolve_backend so an override of it is honoured here too.
            backend = self.resolve_backend(volume)
        except ExecutorNotFoundError as exc:
            await self._project_status(
                volume,
                VolumeStatusUpdate(status="FAILED", status_message=f"No executor available: {exc}"),
            )
            return

        if volume.status == "PENDING":
            await self._reconcile_create(volume, backend)
        elif volume.status == "BOUND":
            await self._reconcile_read(volume, backend)

    async def _reconcile_create(self, volume: Volume, backend: DeploymentBackend) -> None:
        """Create the backend volume and record the status the backend reports."""
        backend_config = volume.backend_config.model_dump(by_alias=True, exclude_none=True)
        # Only the backend call is guarded. If persisting the result fails, the
        # error propagates instead of being reported as a failed create.
        try:
            update = await backend.create_volume(
                workspace=volume.workspace,
                name=volume.name,
                size=volume.size,
                access_modes=list(volume.access_modes),
                backend_config=backend_config,
            )
        except NemoEntityConflictError:
            raise
        except Exception as exc:
            logger.exception("Failed to create volume %s/%s", volume.workspace, volume.name)
            await self._project_status(
                volume,
                VolumeStatusUpdate(status="FAILED", status_message=f"Failed to create volume: {exc}"),
            )
            return

        await self._project_status(volume, update)
        logger.info("Volume %s/%s created: %s", volume.workspace, volume.name, update.status)

    async def _reconcile_read(self, volume: Volume, backend: DeploymentBackend) -> None:
        """Refresh the volume's status from the backend."""
        try:
            update = await backend.read_volume_status(workspace=volume.workspace, name=volume.name)
        except NemoEntityConflictError:
            raise
        except Exception as exc:
            logger.exception("Failed to read volume status %s/%s", volume.workspace, volume.name)
            # NOTE(review): reconcile_one does nothing for FAILED volumes, so a
            # transient read error is sticky here — confirm that is intended.
            update = VolumeStatusUpdate(status="FAILED", status_message=f"Failed to read volume status: {exc}")
        await self._project_status(volume, update)

    async def _project_status(self, volume: Volume, update: VolumeStatusUpdate) -> None:
        """Copy the update onto the entity and persist it, skipping no-op updates."""
        if (
            volume.status == update.status
            and volume.status_message == update.status_message
            and volume.error_details == update.error_details
        ):
            return
        volume.status = update.status
        volume.status_message = update.status_message
        volume.error_details = update.error_details
        await self._save(volume)

    async def _save(self, volume: Volume) -> None:
        """Persist the entity; store errors (including conflicts) propagate to the caller."""
        await self._entities.update(volume)
@pytest.mark.asyncio
async def test_puller_server_prerequisite_chain() -> None:
    """Volume → puller (OnFailure) → server (Always + prerequisite) end-to-end."""
    # Placeholder only: the module-level ``pytestmark`` skips every test in this
    # file, so this raise is not reached until that skip is removed.
    raise NotImplementedError("Enable when AIRCORE-756 lands")
class MockDeploymentBackend(DeploymentBackend):
    """Stub backend that records calls and replays canned status updates."""

    def __init__(
        self,
        sdk: AsyncNeMoPlatform | None = None,
        config: dict[str, Any] | None = None,
        *,
        create_status: BackendStatusUpdate | None = None,
        read_status: BackendStatusUpdate | None = None,
        delete_status: BackendStatusUpdate | None = None,
        managed_names: list[str] | None = None,
        volume_create_status: VolumeStatusUpdate | None = None,
    ) -> None:
        # Fill in a default for every canned response the caller left out.
        if not create_status:
            create_status = BackendStatusUpdate(status="STARTING", status_message="created")
        if not read_status:
            read_status = BackendStatusUpdate(status="READY", status_message="running")
        if not delete_status:
            delete_status = BackendStatusUpdate(status="SUCCEEDED", status_message="deleted")
        if not volume_create_status:
            volume_create_status = VolumeStatusUpdate(status="BOUND")

        self.create_status = create_status
        self.read_status_result = read_status
        self.delete_status = delete_status
        self.managed_names = [*managed_names] if managed_names else []
        self.volume_create_status = volume_create_status

        # Call logs that tests assert against.
        self.create_calls: list[dict[str, Any]] = []
        self.read_calls: list[tuple[str, str]] = []
        self.delete_calls: list[tuple[str, str]] = []

        super().__init__(sdk if sdk else AsyncMock(), config if config else {})

    def shutdown(self) -> None:
        """Nothing to release."""
        return None

    async def create_deployment(self, **kwargs: Any) -> BackendStatusUpdate:
        """Log the keyword arguments and answer with ``create_status``."""
        self.create_calls.append(kwargs)
        return self.create_status

    async def read_status(self, *, workspace: str, name: str) -> BackendStatusUpdate:
        """Log ``(workspace, name)`` and answer with ``read_status_result``."""
        self.read_calls.append((workspace, name))
        return self.read_status_result

    async def delete_deployment(self, workspace: str, name: str) -> BackendStatusUpdate:
        """Log ``(workspace, name)`` and answer with ``delete_status``."""
        self.delete_calls.append((workspace, name))
        return self.delete_status

    async def list_managed_deployment_names(self) -> list[str]:
        """Return a copy of ``managed_names``."""
        return [*self.managed_names]

    async def get_logs(self, **kwargs: Any) -> LogResult:
        """Always return an empty log result."""
        return LogResult(lines=[])

    async def create_volume(self, **kwargs: Any) -> VolumeStatusUpdate:
        """Answer with ``volume_create_status``; the arguments are ignored."""
        return self.volume_create_status

    async def read_volume_status(self, *, workspace: str, name: str) -> VolumeStatusUpdate:
        """Always report the volume as BOUND."""
        return VolumeStatusUpdate(status="BOUND")

    async def delete_volume(self, workspace: str, name: str) -> VolumeStatusUpdate:
        """Always report the volume as RELEASED."""
        return VolumeStatusUpdate(status="RELEASED")
@pytest.mark.asyncio
async def test_controller_reconcile_runs_volumes_then_deployments() -> None:
    """One reconcile pass hands the listed volume and deployment to their reconcilers."""
    ctrl = DeploymentsController()
    deployment = make_deployment()
    volume = make_volume()

    # The first list() call answers with the deployment, the second with the volume.
    entity_client = AsyncMock()
    entity_client.list.side_effect = [list_response([deployment]), list_response([volume])]
    entity_client.get.return_value = AsyncMock()
    entity_client.update = AsyncMock(side_effect=lambda e: e)

    deployment_reconciler = AsyncMock()
    deployment_reconciler.set_config_cache = lambda configs: None
    volume_reconciler = AsyncMock()

    ctrl._entities = entity_client
    ctrl._registry = _stub_registry()
    ctrl._controller_config = ControllerConfig(orphan_cleanup_every_n_cycles=0)
    ctrl._deployment_reconciler = deployment_reconciler
    ctrl._volume_reconciler = volume_reconciler

    await ctrl.reconcile()

    volume_reconciler.reconcile_one.assert_awaited_once_with(volume)
    deployment_reconciler.reconcile_one.assert_awaited_once()
    assert "volumes_by_name" in deployment_reconciler.reconcile_one.await_args.kwargs
@pytest.mark.asyncio
@patch("nemo_deployments_plugin.controller.reconcile_orphans")
async def test_orphan_cleanup_skipped_when_deployments_list_fails(mock_orphans: AsyncMock) -> None:
    """Orphan cleanup is not awaited in a cycle where every list() call raises."""
    failing_store = AsyncMock()
    failing_store.list.side_effect = RuntimeError("list failed")

    deployment_reconciler = AsyncMock()
    deployment_reconciler.set_config_cache = lambda configs: None

    ctrl = DeploymentsController()
    ctrl._entities = failing_store
    ctrl._registry = _stub_registry()
    # Cleanup is configured for every cycle, so only the list failure can suppress it.
    ctrl._controller_config = ControllerConfig(orphan_cleanup_every_n_cycles=1)
    ctrl._deployment_reconciler = deployment_reconciler
    ctrl._volume_reconciler = AsyncMock()
    ctrl._cycle_count = 0

    await ctrl.reconcile()

    mock_orphans.assert_not_awaited()
@pytest.mark.asyncio
async def test_controller_swallows_volume_conflict() -> None:
    """A conflict raised by the volume reconciler does not escape reconcile()."""
    volume = make_volume()

    entity_client = AsyncMock()
    entity_client.list.side_effect = [list_response([]), list_response([volume])]

    conflicting_reconciler = AsyncMock()
    conflicting_reconciler.reconcile_one.side_effect = NemoEntityConflictError("conflict")

    deployment_reconciler = AsyncMock()
    deployment_reconciler.set_config_cache = lambda configs: None

    ctrl = DeploymentsController()
    ctrl._entities = entity_client
    ctrl._registry = _stub_registry()
    ctrl._controller_config = ControllerConfig(orphan_cleanup_every_n_cycles=0)
    ctrl._deployment_reconciler = deployment_reconciler
    ctrl._volume_reconciler = conflicting_reconciler

    # Passing means this await completes without raising.
    await ctrl.reconcile()
@pytest.mark.asyncio
async def test_controller_fetches_prerequisite_when_deployment_name_differs_from_config() -> None:
    """Prerequisite references DeploymentConfig name; entity name may differ."""
    ctrl = DeploymentsController()

    # Finished prerequisite whose entity name differs from its config name.
    puller = make_deployment("puller-run-1")
    puller.deployment_config_name = "puller"
    puller.status = "SUCCEEDED"
    puller.exit_code = 0

    # Dependent deployment still waiting to be created.
    server = make_deployment("server")
    server.deployment_config_name = "server"
    server.status = "PENDING"
    server_cfg = make_deployment_config("server")
    server_cfg.prerequisites = [Prerequisite(deployment_name="puller", condition="succeeded")]

    async def mock_get(entity_cls, name: str, workspace: str = "default", **_kwargs: object):
        if entity_cls is not DeploymentConfig:
            raise AssertionError(f"unexpected get: {entity_cls} {name}")
        return server_cfg if name == "server" else make_deployment_config(name, workspace)

    entity_client = AsyncMock()
    entity_client.list.side_effect = [
        list_response([server]),
        list_response([]),
        list_response([puller]),
    ]
    entity_client.get.side_effect = mock_get

    deployment_reconciler = AsyncMock()
    deployment_reconciler.set_config_cache = lambda configs: None

    ctrl._entities = entity_client
    ctrl._registry = _stub_registry()
    ctrl._controller_config = ControllerConfig(orphan_cleanup_every_n_cycles=0)
    ctrl._deployment_reconciler = deployment_reconciler
    ctrl._volume_reconciler = AsyncMock()

    await ctrl.reconcile()

    by_config = deployment_reconciler.reconcile_one.await_args.kwargs["deployments_by_config"]
    puller_key = ("default", "puller")
    assert puller_key in by_config
    assert by_config[puller_key].name == "puller-run-1"
test_prerequisite_gating_stays_pending( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, +) -> None: + dep = make_deployment("server") + dep.deployment_config_name = "server" + cfg = make_deployment_config("server") + cfg.prerequisites = [Prerequisite(deployment_name="puller", condition="succeeded")] + deployment_reconciler.set_config_cache({("default", "server"): cfg}) + + await deployment_reconciler.reconcile_one( + dep, deployments_by_config={}, deployments_by_name={}, volumes_by_name=NO_VOLUMES + ) + + assert dep.status == "PENDING" + assert "Waiting" in dep.status_message + assert mock_backend.create_calls == [] + + +@pytest.mark.asyncio +async def test_prerequisite_failure_marks_parent_failed( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, +) -> None: + puller = make_deployment("puller") + puller.deployment_config_name = "puller" + puller.status = "FAILED" + server = make_deployment("server") + server.deployment_config_name = "server" + cfg = make_deployment_config("server") + cfg.prerequisites = [Prerequisite(deployment_name="puller")] + deployment_reconciler.set_config_cache({("default", "server"): cfg}) + by_name = {("default", "puller"): puller} + by_config = {("default", "puller"): puller} + + await deployment_reconciler.reconcile_one( + server, + deployments_by_config=by_config, + deployments_by_name=by_name, + volumes_by_name=NO_VOLUMES, + ) + + assert server.status == "FAILED" + assert mock_backend.create_calls == [] + + +@pytest.mark.asyncio +async def test_ready_monitoring( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, +) -> None: + dep = make_deployment() + dep.status = "READY" + cfg = make_deployment_config() + deployment_reconciler.set_config_cache({("default", "cfg1"): cfg}) + mock_backend.read_status_result = BackendStatusUpdate(status="READY", status_message="healthy") + + await deployment_reconciler.reconcile_one( + dep, 
deployments_by_config={}, deployments_by_name={}, volumes_by_name=NO_VOLUMES + ) + + assert mock_backend.read_calls == [("default", "dep1")] + assert dep.status == "READY" + + +@pytest.mark.asyncio +async def test_on_failure_succeeded_terminal( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, +) -> None: + dep = make_deployment() + dep.status = "STARTING" + cfg = make_deployment_config() + cfg.restart_policy = "OnFailure" + deployment_reconciler.set_config_cache({("default", "cfg1"): cfg}) + mock_backend.read_status_result = BackendStatusUpdate( + status="SUCCEEDED", + status_message="completed", + exit_code=0, + ) + + await deployment_reconciler.reconcile_one( + dep, deployments_by_config={}, deployments_by_name={}, volumes_by_name=NO_VOLUMES + ) + + assert dep.status == "SUCCEEDED" + assert dep.exit_code == 0 + + +@pytest.mark.asyncio +async def test_desired_stopped_deletes( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, + mock_entities: AsyncMock, +) -> None: + dep = make_deployment() + dep.desired_state = "STOPPED" + cfg = make_deployment_config() + deployment_reconciler.set_config_cache({("default", "cfg1"): cfg}) + + await deployment_reconciler.reconcile_one( + dep, deployments_by_config={}, deployments_by_name={}, volumes_by_name=NO_VOLUMES + ) + + assert mock_backend.delete_calls == [("default", "dep1")] + mock_entities.delete.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_delete_proceeds_when_config_missing( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, + mock_entities: AsyncMock, +) -> None: + dep = make_deployment() + dep.desired_state = "STOPPED" + mock_entities.get.side_effect = NemoEntityNotFoundError("missing") + + await deployment_reconciler.reconcile_one( + dep, deployments_by_config={}, deployments_by_name={}, volumes_by_name=NO_VOLUMES + ) + + assert mock_backend.delete_calls == [("default", "dep1")] + 
mock_entities.delete.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_drift_recovery_recreate( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, +) -> None: + dep = make_deployment() + dep.status = "LOST" + cfg = make_deployment_config() + cfg.restart_policy = "Always" + deployment_reconciler.set_config_cache({("default", "cfg1"): cfg}) + mock_backend.read_status_result = BackendStatusUpdate(status="LOST", status_message="missing") + mock_backend.create_status = BackendStatusUpdate(status="STARTING", status_message="recreated") + + dep.status = "LOST" + await deployment_reconciler.reconcile_one( + dep, deployments_by_config={}, deployments_by_name={}, volumes_by_name=NO_VOLUMES + ) + + assert len(mock_backend.create_calls) == 1 + assert dep.status == "STARTING" + assert "Recovering" in dep.status_message + + +@pytest.mark.asyncio +async def test_drift_recovery_exhausted( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, +) -> None: + dep = make_deployment() + dep.status = "LOST" + cfg = make_deployment_config() + cfg.restart_policy = "Always" + deployment_reconciler.set_config_cache({("default", "cfg1"): cfg}) + deployment_reconciler._drift_cache.add_attempt("default/dep1") + deployment_reconciler._drift_cache.add_attempt("default/dep1") + deployment_reconciler._drift_cache.add_attempt("default/dep1") + + mock_backend.read_status_result = BackendStatusUpdate(status="LOST") + dep.status = "LOST" + await deployment_reconciler.reconcile_one( + dep, deployments_by_config={}, deployments_by_name={}, volumes_by_name=NO_VOLUMES + ) + + assert dep.status == "FAILED" + assert "Drift recovery failed" in dep.status_message + + +@pytest.mark.asyncio +async def test_conflict_propagates_from_save( + deployment_reconciler: DeploymentReconciler, + mock_entities: AsyncMock, +) -> None: + dep = make_deployment() + cfg = make_deployment_config() + deployment_reconciler.set_config_cache({("default", 
"cfg1"): cfg}) + mock_entities.update.side_effect = NemoEntityConflictError("conflict") + + with pytest.raises(NemoEntityConflictError): + await deployment_reconciler.reconcile_one( + dep, + deployments_by_config={}, + deployments_by_name={}, + volumes_by_name=NO_VOLUMES, + ) + + +@pytest.mark.asyncio +async def test_missing_config_marks_failed( + deployment_reconciler: DeploymentReconciler, + mock_entities: AsyncMock, +) -> None: + dep = make_deployment() + mock_entities.get.side_effect = NemoEntityNotFoundError("missing") + + await deployment_reconciler.reconcile_one( + dep, deployments_by_config={}, deployments_by_name={}, volumes_by_name=NO_VOLUMES + ) + + assert dep.status == "FAILED" + assert "DeploymentConfig" in dep.status_message + + +@pytest.mark.asyncio +async def test_executor_not_found_marks_failed( + deployment_reconciler: DeploymentReconciler, + mock_entities: AsyncMock, +) -> None: + from nemo_deployments_plugin.backends.registry import ExecutorRegistry + + empty_registry = ExecutorRegistry({}, default_executor=None) + reconciler = DeploymentReconciler(mock_entities, empty_registry, deployment_reconciler._controller_config) + dep = make_deployment() + cfg = make_deployment_config() + reconciler.set_config_cache({("default", "cfg1"): cfg}) + + await reconciler.reconcile_one( + dep, + deployments_by_config={}, + deployments_by_name={}, + volumes_by_name=NO_VOLUMES, + ) + + assert dep.status == "FAILED" + assert "executor" in dep.status_message.lower() + + +@pytest.mark.asyncio +async def test_orphan_cleanup_deletes_unknown( + mock_backend: MockDeploymentBackend, +) -> None: + mock_backend.managed_names = ["default/orphan", "default/known"] + await reconcile_orphans([mock_backend], {"default/known"}) + assert mock_backend.delete_calls == [("default", "orphan")] + + +@pytest.mark.asyncio +async def test_orphan_cleanup_skips_invalid_ids( + mock_backend: MockDeploymentBackend, +) -> None: + mock_backend.managed_names = ["default/valid", "/invalid", "ws/", 
"default/orphan"] + await reconcile_orphans([mock_backend], {"default/valid"}) + assert mock_backend.delete_calls == [("default", "orphan")] + + +@pytest.mark.asyncio +async def test_on_failure_failed_exit( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, +) -> None: + dep = make_deployment() + dep.status = "STARTING" + cfg = make_deployment_config() + cfg.restart_policy = "OnFailure" + deployment_reconciler.set_config_cache({("default", "cfg1"): cfg}) + mock_backend.read_status_result = BackendStatusUpdate( + status="FAILED", + status_message="exit 1", + exit_code=1, + ) + + await deployment_reconciler.reconcile_one( + dep, + deployments_by_config={}, + deployments_by_name={}, + volumes_by_name=NO_VOLUMES, + ) + + assert dep.status == "FAILED" + assert dep.exit_code == 1 + + +@pytest.mark.asyncio +async def test_drift_recovery_ignore( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, +) -> None: + dep = make_deployment() + dep.status = "LOST" + cfg = make_deployment_config() + cfg.restart_policy = "Always" + cfg.drift_recovery.action = "ignore" + deployment_reconciler.set_config_cache({("default", "cfg1"): cfg}) + mock_backend.read_status_result = BackendStatusUpdate(status="LOST", status_message="gone") + + await deployment_reconciler.reconcile_one( + dep, + deployments_by_config={}, + deployments_by_name={}, + volumes_by_name=NO_VOLUMES, + ) + + assert dep.status == "LOST" + assert "ignored" in dep.status_message.lower() + assert mock_backend.create_calls == [] + + +@pytest.mark.asyncio +async def test_drift_recovery_backoff_skips_recreate( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, +) -> None: + dep = make_deployment() + dep.status = "LOST" + cfg = make_deployment_config() + cfg.restart_policy = "Always" + cfg.drift_recovery.base_delay_seconds = 3600 + deployment_reconciler.set_config_cache({("default", "cfg1"): cfg}) + 
deployment_reconciler._drift_cache.add_attempt("default/dep1") + mock_backend.read_status_result = BackendStatusUpdate(status="LOST") + + await deployment_reconciler.reconcile_one( + dep, + deployments_by_config={}, + deployments_by_name={}, + volumes_by_name=NO_VOLUMES, + ) + + assert mock_backend.create_calls == [] + assert dep.status == "LOST" + + +@pytest.mark.asyncio +async def test_volume_mount_gating( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, +) -> None: + from helpers import make_volume + from nemo_deployments_plugin.entities import VolumeMount + + dep = make_deployment() + cfg = make_deployment_config() + cfg.volume_mounts = [VolumeMount(name="data", mountPath="/data")] + deployment_reconciler.set_config_cache({("default", "cfg1"): cfg}) + vol = make_volume("data") + + await deployment_reconciler.reconcile_one( + dep, + deployments_by_config={}, + deployments_by_name={}, + volumes_by_name={("default", "data"): vol}, + ) + + assert dep.status == "PENDING" + assert "volume" in dep.status_message.lower() + assert mock_backend.create_calls == [] + + +@pytest.mark.asyncio +async def test_prerequisite_ready_through_reconciler( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, +) -> None: + worker = make_deployment("worker") + worker.deployment_config_name = "worker" + worker.status = "READY" + server = make_deployment("server") + server.deployment_config_name = "server" + cfg = make_deployment_config("server") + cfg.prerequisites = [Prerequisite(deployment_name="worker", condition="ready")] + deployment_reconciler.set_config_cache({("default", "server"): cfg}) + by_name = {("default", "worker"): worker} + by_config = {("default", "worker"): worker} + + await deployment_reconciler.reconcile_one( + server, + deployments_by_config=by_config, + deployments_by_name=by_name, + volumes_by_name=NO_VOLUMES, + ) + + assert server.status == "STARTING" + assert len(mock_backend.create_calls) == 
1 + + +@pytest.mark.asyncio +async def test_drift_recovery_create_failure_stays_lost_and_backoffs( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, +) -> None: + dep = make_deployment() + dep.status = "LOST" + cfg = make_deployment_config() + cfg.restart_policy = "Always" + deployment_reconciler.set_config_cache({("default", "cfg1"): cfg}) + mock_backend.read_status_result = BackendStatusUpdate(status="LOST", status_message="missing") + + async def failing_create(**kwargs: object) -> BackendStatusUpdate: + mock_backend.create_calls.append(kwargs) + raise RuntimeError("create failed") + + mock_backend.create_deployment = failing_create # type: ignore[method-assign] + + await deployment_reconciler.reconcile_one( + dep, deployments_by_config={}, deployments_by_name={}, volumes_by_name=NO_VOLUMES + ) + assert dep.status == "LOST" + assert "Will retry" in dep.status_message + assert len(mock_backend.create_calls) == 1 + + await deployment_reconciler.reconcile_one( + dep, deployments_by_config={}, deployments_by_name={}, volumes_by_name=NO_VOLUMES + ) + assert dep.status == "LOST" + assert dep.status != "FAILED" + assert len(mock_backend.create_calls) == 1 + + +@pytest.mark.asyncio +async def test_succeeded_prerequisite_in_index_allows_create( + deployment_reconciler: DeploymentReconciler, + mock_backend: MockDeploymentBackend, +) -> None: + puller = make_deployment("puller") + puller.deployment_config_name = "puller" + puller.status = "SUCCEEDED" + puller.exit_code = 0 + server = make_deployment("server") + server.deployment_config_name = "server" + cfg = make_deployment_config("server") + cfg.prerequisites = [Prerequisite(deployment_name="puller", condition="succeeded")] + deployment_reconciler.set_config_cache({("default", "server"): cfg}) + by_name = {("default", "puller"): puller} + by_config = {("default", "puller"): puller} + + await deployment_reconciler.reconcile_one( + server, + deployments_by_config=by_config, + 
deployments_by_name=by_name, + volumes_by_name=NO_VOLUMES, + ) + + assert server.status == "STARTING" + assert len(mock_backend.create_calls) == 1 diff --git a/plugins/nemo-deployments/tests/unit/reconciler/test_drift_recovery.py b/plugins/nemo-deployments/tests/unit/reconciler/test_drift_recovery.py new file mode 100644 index 0000000000..34af17a9e4 --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/reconciler/test_drift_recovery.py @@ -0,0 +1,61 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone + +from nemo_deployments_plugin.reconciler.drift_recovery import DriftRecoveryCache, DriftRecoveryLimits, RecoveryAction + +LIMITS = DriftRecoveryLimits(max_attempts=3, base_delay_seconds=1, max_delay_seconds=10) + + +def test_should_recover_proceed_when_untracked() -> None: + cache = DriftRecoveryCache() + assert cache.should_recover("ws/dep", LIMITS) == RecoveryAction.PROCEED + + +def test_should_recover_exhausted_at_max_attempts() -> None: + cache = DriftRecoveryCache() + cache.add_attempt("ws/dep") + cache.add_attempt("ws/dep") + cache.add_attempt("ws/dep") + assert cache.should_recover("ws/dep", LIMITS) == RecoveryAction.EXHAUSTED + + +def test_should_recover_backoff() -> None: + cache = DriftRecoveryCache() + cache.add_attempt("ws/dep") + assert ( + cache.should_recover( + "ws/dep", DriftRecoveryLimits(max_attempts=5, base_delay_seconds=60, max_delay_seconds=300) + ) + == RecoveryAction.BACKOFF + ) + + +def test_first_backoff_uses_base_delay_not_doubled() -> None: + cache = DriftRecoveryCache() + cache.add_attempt("ws/dep") + limits = DriftRecoveryLimits(max_attempts=5, base_delay_seconds=10, max_delay_seconds=300) + state = cache._states["ws/dep"] + assert state.attempts == 1 + assert cache.should_recover("ws/dep", limits) == RecoveryAction.BACKOFF + state.last_attempt_at = 
datetime.now(timezone.utc) - timedelta(seconds=9) + assert cache.should_recover("ws/dep", limits) == RecoveryAction.BACKOFF + state.last_attempt_at = datetime.now(timezone.utc) - timedelta(seconds=11) + assert cache.should_recover("ws/dep", limits) == RecoveryAction.PROCEED + + +def test_should_recover_proceed_after_backoff() -> None: + cache = DriftRecoveryCache() + cache.add_attempt("ws/dep") + cache._states["ws/dep"].last_attempt_at = datetime.now(timezone.utc) - timedelta(seconds=10) + assert cache.should_recover("ws/dep", LIMITS) == RecoveryAction.PROCEED + + +def test_remove_clears_state() -> None: + cache = DriftRecoveryCache() + cache.add_attempt("ws/dep") + cache.remove("ws/dep") + assert cache.get_attempts("ws/dep") == 0 diff --git a/plugins/nemo-deployments/tests/unit/reconciler/test_listing.py b/plugins/nemo-deployments/tests/unit/reconciler/test_listing.py new file mode 100644 index 0000000000..80308a73ac --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/reconciler/test_listing.py @@ -0,0 +1,45 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from unittest.mock import AsyncMock + +import pytest +from nemo_deployments_plugin.entities import Deployment +from nemo_deployments_plugin.reconciler.listing import list_all_pages +from nemo_platform_plugin.entity_client import NemoPaginationInfo +from nemo_platform_plugin.filter_ops import ComparisonOperation, FilterOperator + + +def _page(items: list[Deployment], *, page: int, total_pages: int) -> AsyncMock: + resp = AsyncMock() + resp.data = items + resp.pagination = NemoPaginationInfo( + page=page, + page_size=100, + current_page_size=len(items), + total_pages=total_pages, + total_results=len(items) if total_pages == 1 else 150, + ) + return resp + + +@pytest.mark.asyncio +async def test_list_all_pages_fetches_multiple_pages() -> None: + entities = AsyncMock() + entities.list.side_effect = [ + _page([Deployment(name="a", workspace="w", deployment_config_name="c")], page=1, total_pages=2), + _page([Deployment(name="b", workspace="w", deployment_config_name="c")], page=2, total_pages=2), + ] + result = await list_all_pages( + entities, + Deployment, + filter_operation=ComparisonOperation( + operator=FilterOperator.IN, + field="status", + value=["PENDING"], + ), + ) + assert [d.name for d in result] == ["a", "b"] + assert entities.list.await_count == 2 diff --git a/plugins/nemo-deployments/tests/unit/reconciler/test_prerequisite.py b/plugins/nemo-deployments/tests/unit/reconciler/test_prerequisite.py new file mode 100644 index 0000000000..0ff131479a --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/reconciler/test_prerequisite.py @@ -0,0 +1,82 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from helpers import make_deployment, make_deployment_config +from nemo_deployments_plugin.entities import Prerequisite +from nemo_deployments_plugin.reconciler.prerequisite import prerequisites_met + + +def test_prerequisites_met_when_empty() -> None: + dep = make_deployment() + cfg = make_deployment_config() + result = prerequisites_met(dep, cfg, deployments_by_config={}, deployments_by_name={}) + assert result.met is True + + +def test_prerequisites_waiting_for_missing() -> None: + dep = make_deployment("server") + cfg = make_deployment_config("server") + cfg.prerequisites = [Prerequisite(deployment_name="puller", condition="succeeded")] + result = prerequisites_met(dep, cfg, deployments_by_config={}, deployments_by_name={}) + assert result.met is False + assert result.blocking_prerequisite == "puller" + assert "Waiting" in result.reason + + +def test_prerequisites_succeeded_condition() -> None: + puller = make_deployment("puller") + puller.deployment_config_name = "puller" + puller.status = "SUCCEEDED" + puller.exit_code = 0 + server = make_deployment("server") + cfg = make_deployment_config("server") + cfg.prerequisites = [Prerequisite(deployment_name="puller", condition="succeeded")] + by_name = {("default", "puller"): puller} + by_config = {("default", "puller"): puller} + result = prerequisites_met(server, cfg, deployments_by_config=by_config, deployments_by_name=by_name) + assert result.met is True + + +def test_prerequisites_ready_condition() -> None: + dep = make_deployment("worker") + dep.status = "READY" + server = make_deployment("server") + cfg = make_deployment_config("server") + cfg.prerequisites = [Prerequisite(deployment_name="worker", condition="ready")] + by_name = {("default", "worker"): dep} + by_config = {("default", "worker"): dep} + result = prerequisites_met(server, cfg, deployments_by_config=by_config, deployments_by_name=by_name) + assert result.met is True + 
+ +def test_prerequisites_failed_propagation() -> None: + puller = make_deployment("puller") + puller.status = "FAILED" + server = make_deployment("server") + cfg = make_deployment_config("server") + cfg.prerequisites = [Prerequisite(deployment_name="puller")] + by_name = {("default", "puller"): puller} + by_config = {("default", "puller"): puller} + result = prerequisites_met(server, cfg, deployments_by_config=by_config, deployments_by_name=by_name) + assert result.met is False + assert "failed" in result.reason.lower() + + +def test_prerequisites_prefer_config_index_over_name_collision() -> None: + """deployment_name is a DeploymentConfig name; do not match unrelated deployment names.""" + puller = make_deployment("puller") + puller.deployment_config_name = "puller" + puller.status = "SUCCEEDED" + puller.exit_code = 0 + collision = make_deployment("puller") + collision.deployment_config_name = "other-config" + collision.status = "PENDING" + server = make_deployment("server") + cfg = make_deployment_config("server") + cfg.prerequisites = [Prerequisite(deployment_name="puller", condition="succeeded")] + by_name = {("default", "puller"): collision} + by_config = {("default", "puller"): puller} + result = prerequisites_met(server, cfg, deployments_by_config=by_config, deployments_by_name=by_name) + assert result.met is True diff --git a/plugins/nemo-deployments/tests/unit/reconciler/test_volume_mounts.py b/plugins/nemo-deployments/tests/unit/reconciler/test_volume_mounts.py new file mode 100644 index 0000000000..e016dec67a --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/reconciler/test_volume_mounts.py @@ -0,0 +1,39 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from helpers import make_deployment_config, make_volume +from nemo_deployments_plugin.entities import Container, VolumeMount +from nemo_deployments_plugin.reconciler.volume_mounts import collect_volume_mount_names, volume_mounts_ready + + +def test_collect_volume_mount_names_from_config_and_containers() -> None: + cfg = make_deployment_config() + cfg.volume_mounts = [VolumeMount(name="data", mountPath="/data")] + cfg.containers = [ + Container( + name="main", + image="nginx", + volumeMounts=[VolumeMount(name="cache", mountPath="/cache")], + ) + ] + assert collect_volume_mount_names(cfg) == {"cache", "data"} + + +def test_volume_mounts_ready_when_all_bound() -> None: + cfg = make_deployment_config() + cfg.volume_mounts = [VolumeMount(name="data", mountPath="/data")] + vol = make_volume("data") + vol.status = "BOUND" + result = volume_mounts_ready(cfg, "default", {("default", "data"): vol}) + assert result.ready is True + + +def test_volume_mounts_wait_when_pending() -> None: + cfg = make_deployment_config() + cfg.volume_mounts = [VolumeMount(name="data", mountPath="/data")] + vol = make_volume("data") + result = volume_mounts_ready(cfg, "default", {("default", "data"): vol}) + assert result.ready is False + assert "BOUND" in result.reason diff --git a/plugins/nemo-deployments/tests/unit/reconciler/test_volume_reconciler.py b/plugins/nemo-deployments/tests/unit/reconciler/test_volume_reconciler.py new file mode 100644 index 0000000000..54b3310976 --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/reconciler/test_volume_reconciler.py @@ -0,0 +1,45 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from unittest.mock import AsyncMock + +import pytest +from helpers import make_volume +from nemo_deployments_plugin.backends.base import VolumeStatusUpdate +from nemo_deployments_plugin.reconciler.volume_reconciler import VolumeReconciler +from reconciler.conftest import MockDeploymentBackend + + +@pytest.mark.asyncio +async def test_pending_volume_becomes_bound( + volume_reconciler: VolumeReconciler, + mock_backend: MockDeploymentBackend, + mock_entities: AsyncMock, +) -> None: + vol = make_volume() + mock_backend.volume_create_status = VolumeStatusUpdate(status="BOUND", status_message="ready") + + await volume_reconciler.reconcile_one(vol) + + assert vol.status == "BOUND" + mock_entities.update.assert_awaited() + + +@pytest.mark.asyncio +async def test_volume_create_failure( + volume_reconciler: VolumeReconciler, + mock_backend: MockDeploymentBackend, +) -> None: + vol = make_volume() + + async def fail(**kwargs: object) -> VolumeStatusUpdate: + raise RuntimeError("docker unavailable") + + mock_backend.create_volume = fail # type: ignore[method-assign] + + await volume_reconciler.reconcile_one(vol) + + assert vol.status == "FAILED" + assert "docker unavailable" in vol.status_message diff --git a/plugins/nemo-deployments/tests/unit/test_config.py b/plugins/nemo-deployments/tests/unit/test_config.py index 9f25478d37..c48a5c43e8 100644 --- a/plugins/nemo-deployments/tests/unit/test_config.py +++ b/plugins/nemo-deployments/tests/unit/test_config.py @@ -1,12 +1,36 @@ # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 -from __future__ import annotations - import pytest -from nemo_deployments_plugin.config import DeploymentsConfig +from nemo_deployments_plugin.config import ControllerConfig, DeploymentsConfig def test_config_rejects_inverted_port_range() -> None: with pytest.raises(ValueError, match="port_range_end"): DeploymentsConfig.model_validate({"port_range_start": 9100, "port_range_end": 9000}) + + +def test_controller_config_defaults() -> None: + cfg = DeploymentsConfig() + assert cfg.controller.interval_seconds == 5 + assert cfg.controller.drift_recovery_max_attempts == 5 + + +def test_controller_config_custom_orphan_interval() -> None: + cfg = ControllerConfig(orphan_cleanup_every_n_cycles=12) + assert cfg.orphan_cleanup_every_n_cycles == 12 + + +def test_controller_config_rejects_non_positive_interval() -> None: + with pytest.raises(ValueError): + ControllerConfig(interval_seconds=0) + + +def test_controller_config_rejects_inverted_backoff() -> None: + with pytest.raises(ValueError, match="drift_recovery_base_delay_seconds"): + ControllerConfig(drift_recovery_base_delay_seconds=60, drift_recovery_max_delay_seconds=5) + + +def test_controller_config_allows_zero_orphan_interval_to_disable() -> None: + cfg = ControllerConfig(orphan_cleanup_every_n_cycles=0) + assert cfg.orphan_cleanup_every_n_cycles == 0 diff --git a/plugins/nemo-deployments/tests/unit/test_service_startup.py b/plugins/nemo-deployments/tests/unit/test_service_startup.py index 13318daa4d..bd8ed1a996 100644 --- a/plugins/nemo-deployments/tests/unit/test_service_startup.py +++ b/plugins/nemo-deployments/tests/unit/test_service_startup.py @@ -28,3 +28,9 @@ def test_service_mounts_core_routes() -> None: def test_service_name_matches_entry_point() -> None: assert DeploymentsService.name == "deployments" + + +def test_controller_entry_point() -> None: + from nemo_deployments_plugin.controller import DeploymentsController + + assert DeploymentsController.name == "deployments"