diff --git a/services/core/models/src/nmp/core/models/controllers/backends/backends.py b/services/core/models/src/nmp/core/models/controllers/backends/backends.py index 65a83f5e5e..a679f34069 100644 --- a/services/core/models/src/nmp/core/models/controllers/backends/backends.py +++ b/services/core/models/src/nmp/core/models/controllers/backends/backends.py @@ -105,11 +105,20 @@ async def update_model_deployment( ... @abstractmethod - async def get_model_deployment_status(self, deployment: ModelDeployment) -> DeploymentStatusUpdate: + async def get_model_deployment_status( + self, + deployment: ModelDeployment, + config: Optional[ModelDeploymentConfig] = None, + model_entity: Optional[ModelEntity] = None, + ) -> DeploymentStatusUpdate: """Get the current status of a model deployment. Args: deployment: The ModelDeployment object to check + config: The ModelDeploymentConfig for this deployment. Some backends + need it to advance creation (e.g. the k8s vLLM path emits the + serving Deployment once the weight-puller Job completes). + model_entity: Optional Model entity from Entity Store. 
Returns: DeploymentStatusUpdate with the current deployment status diff --git a/services/core/models/src/nmp/core/models/controllers/backends/docker/backend.py b/services/core/models/src/nmp/core/models/controllers/backends/docker/backend.py index 8c04e4b031..64f0f95346 100644 --- a/services/core/models/src/nmp/core/models/controllers/backends/docker/backend.py +++ b/services/core/models/src/nmp/core/models/controllers/backends/docker/backend.py @@ -217,7 +217,12 @@ async def update_model_deployment( return delete_result return await self.create_model_deployment(deployment, config, model_entity) - async def get_model_deployment_status(self, deployment: ModelDeployment) -> DeploymentStatusUpdate: + async def get_model_deployment_status( + self, + deployment: ModelDeployment, + config: Optional[ModelDeploymentConfig] = None, + model_entity: Optional[ModelEntity] = None, + ) -> DeploymentStatusUpdate: """Get the status of a Docker model deployment. While the deployment is still progressing through the creation diff --git a/services/core/models/src/nmp/core/models/controllers/backends/docker/creation_reconciler.py b/services/core/models/src/nmp/core/models/controllers/backends/docker/creation_reconciler.py index c8a5f674cd..265f40ccb0 100644 --- a/services/core/models/src/nmp/core/models/controllers/backends/docker/creation_reconciler.py +++ b/services/core/models/src/nmp/core/models/controllers/backends/docker/creation_reconciler.py @@ -36,14 +36,27 @@ from nmp.core.models.app import ModelWeightsType, get_model_weights_type, is_multi_llm_image, parse_model_name_revision from nmp.core.models.app.constants import MODEL_MANAGED_BY_LABEL, MODEL_MANAGED_BY_MODELS_CONTROLLER from nmp.core.models.app.utils import _get_k8s_safe_name +from nmp.core.models.controllers.backends import vllm_compiler from nmp.core.models.controllers.backends.backends import DeploymentStatusUpdate -from nmp.core.models.controllers.backends.common import DeploymentConfigView, deployment_config_view 
-from nmp.core.models.controllers.backends.docker import vllm_compiler +from nmp.core.models.controllers.backends.common import deployment_config_view from nmp.core.models.controllers.backends.docker.config import ( MODELS_DOCKER_NIM_MULTI_GPU_SHM_SIZE, MODELS_DOCKER_NIM_MULTI_GPU_SHM_SIZE_PER_GPU, DockerBackendConfig, ) +from nmp.core.models.controllers.backends.engine import ( + ENGINE_HEALTH_PATHS, + ENGINE_LABEL, + ENGINE_NIM, + ENGINE_VLLM, + HEALTH_PATH_LABEL, +) +from nmp.core.models.controllers.backends.engine import ( + config_engine as _config_engine, +) +from nmp.core.models.controllers.backends.engine import ( + resolve_health_path as _resolve_health_path, +) from requests.exceptions import ConnectionError as RequestsConnectionError from requests.exceptions import ReadTimeout from tenacity import before_sleep_log, retry, stop_after_attempt, wait_exponential @@ -64,44 +77,6 @@ NGC_IMAGE_REGISTRY = os.getenv("NGC_IMAGE_REGISTRY", "nvcr.io") NGC_IMAGE_REGISTRY_USER_NAME = os.getenv("NGC_IMAGE_REGISTRY_USER_NAME", "$oauthtoken") -ENGINE_NIM = "nim" -ENGINE_VLLM = "vllm" -ENGINE_GENERIC = "generic" - -# Docker label recording the engine, read back at status time to pick the health probe. -ENGINE_LABEL = "nmp.nvidia.com/engine" - -# Docker label recording the resolved readiness-probe path, read back at status -# time. Stamped at create so status polling doesn't need the deployment config. -HEALTH_PATH_LABEL = "nmp.nvidia.com/health-path" - -# Per-engine readiness probe paths (relative to the container host URL). -ENGINE_HEALTH_PATHS: dict[str, str] = { - ENGINE_NIM: "/v1/health/ready", - ENGINE_VLLM: "/health", -} - - -def _config_engine(config: Any) -> str: - """Return the engine discriminant as a lowercase string (defaults to nim).""" - engine = getattr(config, "engine", None) - if engine is None: - return ENGINE_NIM - # engine may be an enum or a plain string depending on the SDK model. 
# Engine discriminants accepted on a deployment config.
ENGINE_NIM = "nim"
ENGINE_VLLM = "vllm"
ENGINE_GENERIC = "generic"

# Label key under which the engine is recorded on the emitted objects
# (docker container label, k8s object/pod label); read back at status time.
ENGINE_LABEL = "nmp.nvidia.com/engine"

# Label key under which the resolved readiness-probe path is recorded at create
# time, so that status polling can work without the deployment config.
HEALTH_PATH_LABEL = "nmp.nvidia.com/health-path"

# Standard readiness-probe path per engine, relative to the host URL.
# ``generic`` deliberately has no entry (see ``resolve_health_path``).
ENGINE_HEALTH_PATHS: dict[str, str] = {
    ENGINE_NIM: "/v1/health/ready",
    ENGINE_VLLM: "/health",
}


def config_engine(config: Any) -> str:
    """Return the lowercase engine name of ``config``.

    Args:
        config: Any object; only its optional ``engine`` attribute is read.
            The attribute may be an enum-like object (its ``value`` is used)
            or a plain string.

    Returns:
        The lowercased engine string, or ``ENGINE_NIM`` when ``config`` has no
        ``engine`` attribute or the attribute is ``None``.
    """
    raw_engine = getattr(config, "engine", None)
    if raw_engine is not None:
        # Enum members expose ``.value``; plain strings fall through unchanged.
        return str(getattr(raw_engine, "value", raw_engine)).lower()
    return ENGINE_NIM


def resolve_health_path(engine: str, view: DeploymentConfigView) -> str:
    """Pick the readiness-probe path for a deployment.

    Args:
        engine: Lowercase engine name (as returned by ``config_engine``).
        view: Deployment config view; only its optional ``health_check_path``
            attribute is read.

    Returns:
        ``view.health_check_path`` when it is set to a truthy value; otherwise
        the engine's entry in ``ENGINE_HEALTH_PATHS``. Engines without an entry
        (e.g. ``generic``) fall back to the NIM path.
    """
    engine_default = ENGINE_HEALTH_PATHS.get(engine, ENGINE_HEALTH_PATHS[ENGINE_NIM])
    override = getattr(view, "health_check_path", None)
    return override or engine_default
nmp.core.models.controllers.backends.k8s_nim_operator.nimservice_compiler import ( compile_nimcache, @@ -62,6 +72,9 @@ class K8sNimOperatorServiceBackend(ServiceBackend): def __init__(self, nmp_sdk, config, huggingface_model_puller: str): self._k8s_client: k8s_client.ApiClient | None = None self._dynamic_client: DynamicClient | None = None + self._core_v1: k8s_client.CoreV1Api | None = None + self._apps_v1: k8s_client.AppsV1Api | None = None + self._batch_v1: k8s_client.BatchV1Api | None = None self._k8s_namespace: str | None = None self._backend_config: K8sNimOperatorConfig | None = None self._huggingface_model_puller = huggingface_model_puller @@ -85,6 +98,9 @@ def init(self) -> None: self._k8s_client = k8s_client.ApiClient() self._dynamic_client = DynamicClient(self._k8s_client) + self._core_v1 = k8s_client.CoreV1Api(self._k8s_client) + self._apps_v1 = k8s_client.AppsV1Api(self._k8s_client) + self._batch_v1 = k8s_client.BatchV1Api(self._k8s_client) self._k8s_namespace = self._get_current_namespace() logger.info(f"Models controller will deploy models to namespace: {self._k8s_namespace}") @@ -554,7 +570,23 @@ async def _create_nimcache(self, nimcache) -> None: async def create_model_deployment( self, deployment: ModelDeployment, config: ModelDeploymentConfig, model_entity: Optional[ModelEntity] = None ) -> DeploymentStatusUpdate: - """Create a new model deployment via NIM Operator.""" + """Create a new model deployment. + + Dispatches on ``config.engine``: the vLLM path emits native Kubernetes + objects directly (no operator); the NIM path emits NIMService/NIMCache CRs + for the operator to reconcile (unchanged). 
def _vllm_objects_exist(self, resource_name: str) -> bool:
    """Return True if directly-emitted vLLM objects exist for this deployment.

    Used only as a fallback when no config is available to read the engine
    from. The serving Deployment is checked first, because the puller Job is
    deleted once the Deployment is created and so is not a reliable marker on
    its own; the puller Job is then checked for the pre-Deployment phase.

    Any lookup failure (including 404) is treated as "not (yet) a vLLM
    deployment", which routes the caller to the NIMService status path.
    Failures other than 404 are logged at debug level instead of being
    dropped silently.

    Args:
        resource_name: Kubernetes resource name of the deployment.

    Returns:
        True when the Deployment or the puller Job carries the vLLM engine
        label; False otherwise.
    """
    # Function-scope import so the label key comes from the shared constant
    # rather than a second copy of the literal "nmp.nvidia.com/engine".
    from nmp.core.models.controllers.backends.engine import ENGINE_LABEL

    def _has_vllm_engine_label(obj) -> bool:
        labels = getattr(getattr(obj, "metadata", None), "labels", None)
        return isinstance(labels, dict) and labels.get(ENGINE_LABEL) == ENGINE_VLLM

    def _note_lookup_failure(kind: str, error: Exception) -> None:
        # 404 is the expected "object absent" answer; anything else is worth a trace.
        if getattr(error, "status", None) != 404:
            logger.debug("vLLM marker lookup for %s %s failed: %s", kind, resource_name, error)

    try:
        dep = self._apps_v1.read_namespaced_deployment(name=resource_name, namespace=self._k8s_namespace)
    except Exception as e:
        _note_lookup_failure("Deployment", e)
    else:
        if _has_vllm_engine_label(dep):
            return True

    try:
        job = self._batch_v1.read_namespaced_job(
            name=vk8s.pull_job_name(resource_name), namespace=self._k8s_namespace
        )
    except Exception as e:
        _note_lookup_failure("puller Job", e)
        return False
    return _has_vllm_engine_label(job)
+ """ + logger.info( + f"Creating vLLM deployment: {deployment.workspace}/{deployment.name} (version: {deployment.entity_version})" + ) + try: + resource_name = self._get_resource_name(deployment) + view = deployment_config_view(config) + model_repo, source_tag = self._vllm_model_source(model_entity, view) + disk_size = view.disk_size or self._backend_config.default_pvc_size + + pvc = vk8s.compile_pvc( + resource_name=resource_name, + workspace=deployment.workspace, + name=deployment.name, + engine=ENGINE_VLLM, + disk_size=disk_size, + storage_class=self._backend_config.default_storage_class, + model_source=source_tag, + namespace=self._k8s_namespace, + annotations=self._backend_config.default_annotations, + ) + job = vk8s.compile_puller_job( + resource_name=resource_name, + workspace=deployment.workspace, + name=deployment.name, + engine=ENGINE_VLLM, + image=self._huggingface_model_puller, + args=["download", model_repo, "--local-dir", vk8s.MODEL_STORE_PATH], + env={"HF_ENDPOINT": self._remote_files_hf_url(), "HF_TOKEN": "service:models"}, + gpu=view.gpu, + namespace=self._k8s_namespace, + service_account_name=self._backend_config.service_account_name, + image_pull_secret=self._backend_config.huggingface_model_puller_image_pull_secret, + # Engine-specific uid/gid: vLLM uses 2000/0 (its image's user). A + # future NIM raw-object path must pass NIM's own uid/gid here, not + # these -- see the FUTURE note in vllm_k8s_compiler.py. 
async def _get_vllm_status(
    self,
    deployment: ModelDeployment,
    resource_name: str,
    config: Optional[ModelDeploymentConfig],
    model_entity: Optional[ModelEntity],
) -> DeploymentStatusUpdate:
    """Drive the vLLM phased lifecycle and project its status.

    Reads the serving Deployment and, while it does not exist yet, the puller
    Job. When the Job has completed and no Deployment exists, this advances
    creation (phase P3) by emitting the Deployment + Service.

    Args:
        deployment: The deployment being polled.
        resource_name: Kubernetes resource name of the deployment.
        config: Deployment config; required to compile the serving spec at P3.
            When it is ``None`` and P3 is due, the method reports PENDING
            instead of creating anything.
        model_entity: Optional model entity, passed through to the serving
            object compiler.

    Returns:
        The current ``DeploymentStatusUpdate``: READY/PENDING/ERROR projected
        from the Deployment or Job, or LOST when neither the Job nor the PVC
        exists.

    Raises:
        kubernetes.client.exceptions.ApiException: For any non-404 API error.
    """
    # NOTE(review): declared async but only makes synchronous client calls --
    # confirm the caller tolerates the blocking I/O.

    def _waiting_for_config(reason: str) -> DeploymentStatusUpdate:
        # P3 is due but cannot run without a config; stay PENDING until a poll
        # that carries one arrives.
        logger.warning(
            "vLLM puller Job for %s %s but no config provided; cannot create serving Deployment",
            resource_name,
            reason,
        )
        return DeploymentStatusUpdate(status="PENDING", status_message="Waiting to start vLLM server", host_url=None)

    # The serving Deployment is the source of truth once it exists. It is
    # created at P3 together with the deletion of the puller Job (to release
    # the RWO volume), so a present Deployment means "past the pull phase":
    # project its readiness and do NOT consult the (now-absent) Job.
    try:
        self._apps_v1.read_namespaced_deployment(name=resource_name, namespace=self._k8s_namespace)
    except k8s_client.exceptions.ApiException as e:
        if e.status != 404:
            raise
    else:
        return self._project_deployment_readiness(resource_name)

    # No Deployment yet: still in the pull phase. Consult the puller Job.
    job_name = vk8s.pull_job_name(resource_name)
    try:
        job = self._batch_v1.read_namespaced_job(name=job_name, namespace=self._k8s_namespace)
    except k8s_client.exceptions.ApiException as e:
        if e.status != 404:
            raise
        # Job absent. Either (a) the transient P3 window after a *succeeded*
        # puller Job was deleted to release the RWO volume (the PVC still
        # exists and holds the weights), or (b) genuine drift (PVC gone too).
        if not self._pvc_exists(resource_name):
            return DeploymentStatusUpdate(
                status="LOST",
                status_message="Weight-puller Job and PVC not found; resources may have been deleted externally.",
                host_url=None,
            )
        if config is None:
            # The PVC is still there, so this is not LOST; it is the same
            # "cannot advance without a config" situation as below.
            return _waiting_for_config("is gone (PVC present)")
        return self._create_vllm_serving_objects(deployment, resource_name, config, model_entity)

    job_status = job.status
    if job_status and job_status.failed and job_status.failed >= 1 and not (job_status.succeeded or 0):
        # Attach the puller pod's logs (best-effort) to make the failure actionable.
        pod_name = self._find_job_pod_name(job_name)
        logs = self._fetch_pod_logs(pod_name) if pod_name else ""
        return DeploymentStatusUpdate(
            status="ERROR",
            status_message="Model weight download failed.",
            error_details={"reason": "weight_pull_failed", "job": job_name, "error_stack": logs or None},
            host_url=None,
        )

    job_complete = bool(job_status and job_status.succeeded and job_status.succeeded >= 1)
    if not job_complete:
        return DeploymentStatusUpdate(status="PENDING", status_message="Downloading model weights", host_url=None)

    # Job complete and no Deployment yet: phase P3. The config is needed to
    # compile the serving spec (the controller threads it through).
    if config is None:
        return _waiting_for_config("complete")
    return self._create_vllm_serving_objects(deployment, resource_name, config, model_entity)
+ """ + # Release the RWO volume from the completed puller before the server needs + # it. Idempotent: if already deleted on a prior poll, _delete_puller_job + # treats NotFound as done. + if not self._delete_puller_job(resource_name): + return DeploymentStatusUpdate( + status="PENDING", + status_message="Releasing model weights volume", + host_url=self._get_host_url(resource_name), + ) + + view = deployment_config_view(config) + + engine = ENGINE_VLLM + health_path = resolve_health_path(engine, view) + image_name, image_tag = vllm_compiler.resolve_vllm_image( + view, self._backend_config.default_vllm_image, self._backend_config.default_vllm_image_tag + ) + args = vllm_compiler.compile_vllm_args(view, model_entity) + env = vllm_compiler.compile_vllm_env_vars(view) + + startup_grace = self._backend_config.default_startup_probe_grace_period_seconds or 600 + + init_containers, sidecar_containers = self._build_lora_containers(deployment, view, model_entity) + + dep_obj = vk8s.compile_deployment( + resource_name=resource_name, + workspace=deployment.workspace, + name=deployment.name, + engine=engine, + image=f"{image_name}:{image_tag}", + args=args, + health_path=health_path, + env=env, + gpu=view.gpu, + namespace=self._k8s_namespace, + service_account_name=self._backend_config.service_account_name, + user_id=self._backend_config.default_vllm_user_id, + group_id=self._backend_config.default_vllm_group_id, + shared_memory_size_limit=self._backend_config.default_shared_memory_size_limit, + startup_grace_seconds=startup_grace, + init_containers=init_containers, + sidecar_containers=sidecar_containers, + ) + svc_obj = vk8s.compile_service( + resource_name=resource_name, + workspace=deployment.workspace, + name=deployment.name, + engine=engine, + namespace=self._k8s_namespace, + ) + + try: + created_dep = self._apps_v1.create_namespaced_deployment(namespace=self._k8s_namespace, body=dep_obj) + logger.info(f"Created vLLM Deployment {resource_name} in {self._k8s_namespace}") + 
def _delete_puller_job(self, resource_name: str) -> bool:
    """Delete the puller Job and report whether its pod is gone.

    The Job is deleted with ``Background`` propagation so that its pod is
    removed as well, which frees the ReadWriteOnce volume attachment for the
    server pod. Idempotent: a Job that is already absent (404) is not an
    error.

    Args:
        resource_name: Kubernetes resource name of the deployment.

    Returns:
        True when no pod with the Job's ``job-name`` label remains, or when
        the pod listing itself fails (best-effort: the failure is logged and
        the caller proceeds). False while a puller pod still exists; the
        caller should retry on the next poll.

    Raises:
        kubernetes.client.exceptions.ApiException: If deleting the Job fails
            with anything other than 404.
    """
    job_name = vk8s.pull_job_name(resource_name)
    try:
        self._batch_v1.delete_namespaced_job(
            name=job_name,
            namespace=self._k8s_namespace,
            propagation_policy="Background",
        )
    except k8s_client.exceptions.ApiException as e:
        if e.status != 404:
            raise
    else:
        logger.info(f"Deleted puller Job {job_name} to release the model-weights volume")

    # The volume stays attached until the pod object is gone, so confirm.
    try:
        pods = self._core_v1.list_namespaced_pod(
            namespace=self._k8s_namespace, label_selector=f"job-name={job_name}"
        )
    except Exception as e:
        # Deliberately best-effort: do not stall creation when the check itself
        # cannot run, but leave a trace -- if the pod is in fact still present,
        # the server pod may hit a Multi-Attach error on the RWO volume.
        logger.warning(f"Could not confirm puller pod removal for Job {job_name}; assuming released: {e}")
        return True
    return not pods.items
def _set_owner_reference_on_pvc(self, resource_name: str, owner_ref: k8s_client.V1OwnerReference) -> None:
    """Make the model-weights PVC owned by the serving Deployment (best-effort).

    Only the PVC is patched here: the puller Job is deleted before the
    Deployment is created (to release the RWO volume), and the Service
    receives its ownerReference at create time.

    Args:
        resource_name: Kubernetes resource name of the deployment; the PVC
            name is derived from it.
        owner_ref: Owner reference pointing at the serving Deployment.

    A failing patch is logged as a warning and otherwise ignored.
    """
    # Convert the client model into the plain structure the patch body needs.
    serialized_ref = self._k8s_client.sanitize_for_serialization(owner_ref)
    body = {"metadata": {"ownerReferences": [serialized_ref]}}
    try:
        claim_name = vk8s.pvc_name(resource_name)
        self._core_v1.patch_namespaced_persistent_volume_claim(
            name=claim_name,
            namespace=self._k8s_namespace,
            body=body,
        )
    except Exception as exc:
        logger.warning(f"Failed to set ownerReference on PVC for {resource_name}: {exc}")
def _existing_model_source(self, resource_name: str) -> str | None:
    """Read the model-source tag recorded for an existing vLLM deployment.

    The puller Job is consulted first. It is deleted once the serving
    Deployment is created, so the model-weights PVC is consulted as a
    fallback; without it, a source change could never be detected after the
    pull phase.

    Args:
        resource_name: Kubernetes resource name of the deployment.

    Returns:
        The recorded source tag, or ``None`` when neither object exists, the
        lookup fails, or no tag is recorded.
    """

    def _recorded_source(obj) -> str | None:
        metadata = getattr(obj, "metadata", None)
        annotations = (metadata.annotations or {}) if metadata else {}
        return annotations.get(vk8s.MODEL_SOURCE_ANNOTATION)

    # NOTE(review): assumes the PVC is stamped under the same annotation key as
    # the Job -- confirm against vk8s.compile_pvc. If it is not, the PVC lookup
    # simply yields None and behavior matches the Job-only lookup.
    candidates = (
        (self._batch_v1.read_namespaced_job, vk8s.pull_job_name(resource_name)),
        (self._core_v1.read_namespaced_persistent_volume_claim, vk8s.pvc_name(resource_name)),
    )
    for read_fn, obj_name in candidates:
        try:
            obj = read_fn(name=obj_name, namespace=self._k8s_namespace)
        except k8s_client.exceptions.ApiException:
            # Absent (or unreadable) object: try the next candidate.
            continue
        source = _recorded_source(obj)
        if source is not None:
            return source
    return None
+ async def get_model_deployment_status( + self, + deployment: ModelDeployment, + config: Optional[ModelDeploymentConfig] = None, + model_entity: Optional[ModelEntity] = None, + ) -> DeploymentStatusUpdate: + """Get the current status of a model deployment. In addition to the NIMService/pod status, this method enforces: - PENDING timeout: if the deployment has been alive longer than ``pending_timeout_seconds`` (from config) and is still PENDING, transition to ERROR with diagnostic information. - Crash loop detection is handled inside ``_get_pod_status_from_deployment``. + + For the vLLM path, ``config`` is required to advance creation (emit the + serving Deployment + Service once the weight-puller Job completes). """ logger.debug( - f"Checking NIMService status: {deployment.workspace}/{deployment.name} " + f"Checking deployment status: {deployment.workspace}/{deployment.name} " f"(version: {deployment.entity_version})" ) try: resource_name = self._get_resource_name(deployment) - result = self._get_nimservice_status(resource_name) + # Prefer the engine from the config when the controller provides it; + # fall back to detecting the vLLM path by the presence of its raw + # puller Job (e.g. orphan reconciliation paths that lack a config). + if config is not None: + is_vllm = config_engine(config) == ENGINE_VLLM + else: + is_vllm = self._vllm_objects_exist(resource_name) + + if is_vllm: + result = await self._get_vllm_status(deployment, resource_name, config, model_entity) + else: + result = self._get_nimservice_status(resource_name) if result.status == "PENDING": elapsed = deployment_elapsed_seconds(deployment) @@ -832,55 +1412,92 @@ async def get_model_deployment_status(self, deployment: ModelDeployment) -> Depl host_url=None, ) + def _delete_one(self, delete_fn, kind: str, obj_name: str) -> Optional[str]: + """Delete a single namespaced object by name, tolerating "already gone". 
+ + Teardown deletes every resource type by name (no engine detection): this is + idempotent and self-heals partial-deletion states. A 404 (object absent) is + success. Any other failure is logged concisely (no stack trace) and + returned as a short error string so the caller can aggregate and surface it + (we must NOT mark a deployment DELETED if cluster resources may remain). + """ + try: + delete_fn(name=obj_name, namespace=self._k8s_namespace) + logger.info(f"Deleted {kind} {self._k8s_namespace}/{obj_name}") + return None + except (k8s_client.exceptions.ApiException, k8s_dynamic_exceptions.NotFoundError) as e: + # NotFound (typed status 404 or dynamic NotFoundError) -> already gone. + if isinstance(e, k8s_dynamic_exceptions.NotFoundError) or getattr(e, "status", None) == 404: + logger.debug(f"{kind} {obj_name} not found, already deleted") + return None + return self._classify_delete_error(e, kind, obj_name) + except Exception as e: + return self._classify_delete_error(e, kind, obj_name) + + @staticmethod + def _classify_delete_error(e: Exception, kind: str, obj_name: str) -> str: + """Concise, human-readable delete failure (no stack trace) for aggregation.""" + status = getattr(e, "status", None) + is_forbidden = status == 403 or isinstance(e, k8s_dynamic_exceptions.ForbiddenError) + if is_forbidden: + # With the models ServiceAccount RBAC in place this should not happen; + # if it does, the SA is missing delete on this resource type. + msg = f"forbidden to delete {kind} {obj_name} (ServiceAccount lacks RBAC)" + logger.error(msg) + return msg + msg = f"error deleting {kind} {obj_name}: {status or type(e).__name__}" + logger.warning(msg) + return msg + def _delete_resources_by_model_deployment_id(self, workspace: str, name: str) -> DeploymentStatusUpdate: - """Delete NIMService and NIMCache for the given model deployment (by workspace/name).""" + """Delete every resource a deployment could own, by name (engine-agnostic). 
+ + Delete has only workspace/name (no config/engine -- it is also called for + orphan reconciliation), so we attempt deletion of BOTH the operator path + (NIMService/NIMCache CRs) and the directly-emitted vLLM path (Deployment/ + Service/Job/PVC). Each delete is independent and 404-tolerant; one + resource's failure never aborts the others. Real (non-404) failures are + aggregated and surfaced as ERROR so we never report DELETED while cluster + resources may remain. + """ nimservice_name = get_deployment_resource_name(workspace, name) nimcache_name = get_nimcache_resource_name(workspace, name) - try: - nimservice_api = self._dynamic_client.resources.get( - api_version=NIMSERVICE_API_VERSION, - kind="NIMService", - ) - - try: - nimservice_api.delete( - name=nimservice_name, - namespace=self._k8s_namespace, - ) - logger.info(f"Successfully deleted NIMService {self._k8s_namespace}/{nimservice_name}") - except k8s_dynamic_exceptions.NotFoundError: - logger.info(f"NIMService {nimservice_name} not found, may have been already deleted") + errors: list[str] = [] - # Try to delete associated NIMCache if it exists + # Operator path: NIMService / NIMCache CRs (via the dynamic client). 
+ for api_version, kind, cr_name in ( + (NIMSERVICE_API_VERSION, "NIMService", nimservice_name), + (NIMCACHE_API_VERSION, "NIMCache", nimcache_name), + ): try: - nimcache_api = self._dynamic_client.resources.get( - api_version=NIMCACHE_API_VERSION, - kind="NIMCache", - ) - nimcache_api.delete( - name=nimcache_name, - namespace=self._k8s_namespace, - ) - logger.info(f"Successfully deleted NIMCache {self._k8s_namespace}/{nimcache_name}") - except k8s_dynamic_exceptions.NotFoundError: - logger.debug(f"No NIMCache found for {nimcache_name}, skipping cleanup") + cr_api = self._dynamic_client.resources.get(api_version=api_version, kind=kind) except Exception as e: - logger.warning(f"Error deleting NIMCache {nimcache_name}: {e}") - - return DeploymentStatusUpdate( - status="DELETED", - status_message="NIMService deletion initiated successfully", - host_url=None, + errors.append(f"error resolving {kind} API: {e}") + continue + err = self._delete_one( + lambda name, namespace, _api=cr_api: _api.delete(name=name, namespace=namespace), + kind, + cr_name, ) + if err: + errors.append(err) - except Exception as e: - logger.exception(f"Failed to delete NIMService {nimservice_name}") + # Directly-emitted vLLM path: Deployment / Service / puller Job / PVC. 
+ errors.extend(self._delete_vllm_resources(nimservice_name)) + + if errors: + summary = "; ".join(errors) return DeploymentStatusUpdate( status="ERROR", - status_message=f"Failed to delete deployment {nimservice_name} due to a service backend error", - error_details={"error": str(e), "error_type": type(e).__name__}, + status_message=f"Failed to fully delete deployment {workspace}/{name}: {summary}", + error_details={"errors": errors}, host_url=None, ) + return DeploymentStatusUpdate( + status="DELETED", + status_message="Deployment deletion initiated successfully", + host_url=None, + ) async def delete_model_deployment(self, workspace: str, name: str) -> DeploymentStatusUpdate: """Delete a NIM Operator model deployment by workspace and name (model deployment ID).""" @@ -888,27 +1505,47 @@ async def delete_model_deployment(self, workspace: str, name: str) -> Deployment return self._delete_resources_by_model_deployment_id(workspace, name) async def list_managed_deployment_names(self) -> list[str]: - """List deployment names (workspace/name) the backend currently manages via NIMService labels.""" + """List deployment names (workspace/name) the backend manages. + + Unions the operator path (NIMServices) and the directly-emitted vLLM path + (raw Deployments), both labelled by the same managed-by + workspace/name + labels, for orphan reconciliation. + """ + label_selector = f"{MODEL_MANAGED_BY_LABEL}={MODEL_MANAGED_BY_MODELS_CONTROLLER}" + seen: set[str] = set() + + # Operator path: NIMServices. 
try: nimservice_api = self._dynamic_client.resources.get( api_version=NIMSERVICE_API_VERSION, kind="NIMService", ) - result = nimservice_api.get( - namespace=self._k8s_namespace, - label_selector=f"{MODEL_MANAGED_BY_LABEL}={MODEL_MANAGED_BY_MODELS_CONTROLLER}", - ) + result = nimservice_api.get(namespace=self._k8s_namespace, label_selector=label_selector) + for item in getattr(result, "items", None) or []: + labels = getattr(getattr(item, "metadata", None), "labels", None) or {} + if isinstance(labels, dict): + workspace = labels.get("nmp.nvidia.com/deployment-workspace") + name = labels.get("nmp.nvidia.com/deployment-name") + if workspace and name: + seen.add(f"{workspace}/{name}") + except k8s_dynamic_exceptions.ForbiddenError: + # No RBAC for the NIM CRDs (e.g. a vLLM-only deployment). Not an error. + logger.debug("No access to NIMServices for orphan reconciliation; skipping NIM path") except Exception as e: logger.warning(f"Failed to list NIMServices for orphan reconciliation: {e}") - return [] - items = getattr(result, "items", None) or [] - seen: set[str] = set() - for item in items: - labels = getattr(getattr(item, "metadata", None), "labels", None) or {} - if isinstance(labels, dict): - workspace = labels.get("nmp.nvidia.com/deployment-workspace") - name = labels.get("nmp.nvidia.com/deployment-name") + # vLLM path: directly-emitted Deployments. 
+ try: + deployments = self._apps_v1.list_namespaced_deployment( + namespace=self._k8s_namespace, label_selector=label_selector + ) + for dep in deployments.items: + labels = (dep.metadata.labels or {}) if dep.metadata else {} + workspace = labels.get(vk8s.DEPLOYMENT_WORKSPACE_LABEL) + name = labels.get(vk8s.DEPLOYMENT_NAME_LABEL) if workspace and name: seen.add(f"{workspace}/{name}") + except Exception as e: + logger.warning(f"Failed to list vLLM Deployments for orphan reconciliation: {e}") + return sorted(seen) diff --git a/services/core/models/src/nmp/core/models/controllers/backends/k8s_nim_operator/config.py b/services/core/models/src/nmp/core/models/controllers/backends/k8s_nim_operator/config.py index 5d6badd62b..5d83f81d96 100644 --- a/services/core/models/src/nmp/core/models/controllers/backends/k8s_nim_operator/config.py +++ b/services/core/models/src/nmp/core/models/controllers/backends/k8s_nim_operator/config.py @@ -36,7 +36,13 @@ class K8sNimOperatorConfig(BaseModel): description="PEFT refresh interval in seconds (only used when lora_enabled is true)", ) - # Security context + # Security context for the NIM (operator) path: applied to the NIMService / + # NIMCache CRs. NOTE: securityContext uid/gid is engine-specific -- NIM images + # expect different values (operator default 1000/2000) than the vLLM image + # (2000/0, see default_vllm_*). When NIM is migrated onto the raw-object + # compilers (vllm_k8s_compiler), keep passing THESE fields for the NIM path -- + # do not reuse default_vllm_user_id/_group_id. See the FUTURE note in + # vllm_k8s_compiler.py. default_user_id: Optional[int] = Field( default=None, description="Default user ID for NIM containers (security context)", @@ -46,6 +52,26 @@ class K8sNimOperatorConfig(BaseModel): description="Default group ID for NIM containers (security context)", ) + # Security context for the directly-emitted vLLM path (puller Job + server + # Deployment). 
Defaults match the user the upstream vllm/vllm-openai image + # ships ("vllm", uid 2000, gid 0): a non-root uid that HAS an /etc/passwd + # entry, so torch/inductor's getpass.getuser() (pwd.getpwuid) does not crash. + # gid 0 (root group) is the image's group and is the standard + # arbitrary-uid-friendly group. The puller writes weights under this uid/gid + # so the server can read them. + default_vllm_user_id: Optional[int] = Field( + default=2000, + description="Default user ID for vLLM puller + server pods (security context). " + "Defaults to 2000 to match the upstream vLLM image's 'vllm' user, which has an " + "/etc/passwd entry (avoids torch getpwuid crashes from an unknown uid).", + ) + default_vllm_group_id: Optional[int] = Field( + default=0, + description="Default group ID / fsGroup for vLLM puller + server pods. Defaults to 0 " + "(root group) to match the upstream vLLM image and keep weights readable across the " + "puller and server pods.", + ) + # Files service configuration files_auth_secret: str = Field( default="nemo-models-files-token", @@ -82,6 +108,16 @@ class K8sNimOperatorConfig(BaseModel): description="Default NIMService image tag (used if not specified in deployment config)", ) + # vLLM image configuration (vLLM engine on k8s; raw-object emission path) + default_vllm_image: str = Field( + default="vllm/vllm-openai", + description="Default vLLM server image repository (used if not specified in deployment config)", + ) + default_vllm_image_tag: str = Field( + default="v0.22.1", + description="Default vLLM server image tag (used if not specified in deployment config)", + ) + # NIM runtime configuration nim_guided_decoding_backend: str = Field( default="outlines", @@ -94,6 +130,25 @@ class K8sNimOperatorConfig(BaseModel): description="Kubernetes namespace for NIM deployments (defaults to controller's namespace if not set)", ) + # ServiceAccount for directly-emitted workloads (vLLM Deployment pods + weight + # puller Job). 
A single shared models ServiceAccount is used; the platform Helm + # chart is responsible for creating it and granting any required RBAC/SCC. + # If not set, pods run under the namespace's default ServiceAccount. + service_account_name: Optional[str] = Field( + default=None, + description="ServiceAccount name for directly-emitted vLLM Deployment pods and the weight-puller Job. " + "If not set, the namespace default ServiceAccount is used.", + ) + + # Shared memory (/dev/shm) for directly-emitted vLLM Deployment pods. vLLM uses + # /dev/shm for tensor-parallel NCCL communication. If not set, the dshm emptyDir + # is mounted with no explicit size limit (uses the node default). + default_shared_memory_size_limit: Optional[str] = Field( + default=None, + description="Shared memory (/dev/shm) size limit for vLLM Deployment pods (e.g. '8Gi'). " + "If not set, the emptyDir uses the node default size.", + ) + # Default Kubernetes configuration for all NIM deployments default_resources: Optional[Dict[str, Any]] = Field( default=None, diff --git a/services/core/models/src/nmp/core/models/controllers/backends/k8s_nim_operator/vllm_k8s_compiler.py b/services/core/models/src/nmp/core/models/controllers/backends/k8s_nim_operator/vllm_k8s_compiler.py new file mode 100644 index 0000000000..d8f268b998 --- /dev/null +++ b/services/core/models/src/nmp/core/models/controllers/backends/k8s_nim_operator/vllm_k8s_compiler.py @@ -0,0 +1,428 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Engine-agnostic compiler for directly-emitted Kubernetes objects. + +For the k8s service backend's non-operator path, this module compiles the four +native Kubernetes objects a model deployment needs: + +* ``V1PersistentVolumeClaim`` -- holds the model weights. +* ``V1Job`` -- weight puller (populates the PVC, then exits 0). 
+* ``V1Deployment`` -- the inference server (mounts the PVC, serves the model). +* ``V1Service`` -- ClusterIP exposing the server port for IGW routing. + +The builders are intentionally **engine-agnostic**: every value the +k8s-nim-operator used to hardcode or derive (image, command/args, env, +securityContext, probes, resources, shared memory, service account, labels) is a +parameter the caller supplies. The vLLM path passes vLLM's values (via the shared +``vllm_compiler``); when NIM migrates onto this emission path it will pass NIM's +values through the same builders. Keep this module free of engine-specific logic. + +These functions are pure (no Kubernetes I/O); the backend applies the returned +objects via the typed Kubernetes API clients. + +FUTURE / NIM migration (dropping k8s-nim-operator -- see the Deployments Plugin +RFC): + When NIM is cut over to emit these raw objects instead of NIMService/NIMCache + CRs, route the NIM path through these same builders -- but DO NOT reuse vLLM's + values. The footgun is the securityContext ``user_id`` / ``group_id`` params: + they are engine-specific on purpose. The vLLM path passes + ``default_vllm_user_id`` / ``default_vllm_group_id`` (2000/0) because that is + the user the ``vllm/vllm-openai`` image ships with an ``/etc/passwd`` entry + (an arbitrary uid like 1000 crashes torch/inductor's ``getpass.getuser()``). + NIM images expect the operator's historical 1000/2000. So the NIM path must + pass its own uid/gid (e.g. the existing ``default_user_id`` / + ``default_group_id`` config, defaulting to the NIM-appropriate values) -- NOT + the ``default_vllm_*`` fields. Same reasoning applies to image, args/command + (NIM is env-configured; vLLM is arg-configured), and env. Pick per engine at + the call site; never hardcode either engine's value in this module. 
+""" + +from logging import getLogger +from typing import Optional + +from kubernetes import client as k8s_client +from nmp.core.models.app.constants import MODEL_MANAGED_BY_LABEL, MODEL_MANAGED_BY_MODELS_CONTROLLER + +logger = getLogger(__name__) + +# Label keys (shared with the operator path for orphan reconciliation / listing). +DEPLOYMENT_WORKSPACE_LABEL = "nmp.nvidia.com/deployment-workspace" +DEPLOYMENT_NAME_LABEL = "nmp.nvidia.com/deployment-name" +# Records the resolved model source on the PVC + Job so update can detect a change +# and decide whether to re-pull weights (see backend re-pull policy). This is an +# ANNOTATION, not a label: the value contains characters such as '/', '@' +# and ':' and is therefore not a valid label value. +MODEL_SOURCE_ANNOTATION = "nmp.nvidia.com/model-source" + +# Records the resolved readiness-probe path so status reads can recover it. +# ANNOTATION, not a label: the value (e.g. "/health") contains '/'. +HEALTH_PATH_ANNOTATION = "nmp.nvidia.com/health-path" + +# In-pod paths. +MODEL_STORE_PATH = "/model-store" +SCRATCH_PATH = "/scratch" +DSHM_PATH = "/dev/shm" + +# Resource-name suffixes derived from the deployment resource name. +PVC_SUFFIX = "-pvc" +PULL_JOB_SUFFIX = "-pull" + +# Defaults mirroring the k8s-nim-operator. 
+DEFAULT_BACKOFF_LIMIT = 5 +DEFAULT_TTL_SECONDS_AFTER_FINISHED = 600 +DEFAULT_USER_ID = 1000 +DEFAULT_GROUP_ID = 2000 +SERVER_PORT_NAME = "api" + + +def pvc_name(resource_name: str) -> str: + """PVC name derived from the deployment resource name.""" + return f"{resource_name}{PVC_SUFFIX}" + + +def pull_job_name(resource_name: str) -> str: + """Weight-puller Job name derived from the deployment resource name.""" + return f"{resource_name}{PULL_JOB_SUFFIX}" + + +def common_labels( + workspace: str, + name: str, + engine: str, + *, + extra: Optional[dict[str, str]] = None, +) -> dict[str, str]: + """Labels stamped on every emitted object for management + orphan listing.""" + labels = { + MODEL_MANAGED_BY_LABEL: MODEL_MANAGED_BY_MODELS_CONTROLLER, + DEPLOYMENT_WORKSPACE_LABEL: workspace, + DEPLOYMENT_NAME_LABEL: name, + "nmp.nvidia.com/engine": engine, + } + if extra: + labels.update(extra) + return labels + + +def _merge_annotations( + base: Optional[dict[str, str]], + model_source: Optional[str], +) -> Optional[dict[str, str]]: + """Merge caller annotations with the model-source annotation (re-pull marker).""" + annotations = dict(base) if base else {} + if model_source: + annotations[MODEL_SOURCE_ANNOTATION] = model_source + return annotations or None + + +def _gpu_resources(gpu: int) -> Optional[k8s_client.V1ResourceRequirements]: + """GPU resource requirements (requests == limits). None when gpu == 0.""" + if gpu < 1: + return None + quantity = {"nvidia.com/gpu": str(gpu)} + return k8s_client.V1ResourceRequirements(requests=dict(quantity), limits=dict(quantity)) + + +def _pod_security_context( + user_id: Optional[int], + group_id: Optional[int], +) -> Optional[k8s_client.V1PodSecurityContext]: + """Pod securityContext from explicitly-configured uid/gid only. + + Returns ``None`` when neither is set, so the pod runs as the container image's + default user. We intentionally do NOT force the operator's 1000/2000 default: + some images (e.g. 
vLLM) lack an ``/etc/passwd`` entry for uid 1000, which makes + libraries that call ``getpass.getuser()`` (torch inductor) crash with + ``getpwuid(): uid not found``. NIM can opt into a uid/gid via config. + """ + if user_id is None and group_id is None: + return None + return k8s_client.V1PodSecurityContext( + run_as_user=user_id, + run_as_group=group_id, + fs_group=group_id, + ) + + +def compile_pvc( + *, + resource_name: str, + workspace: str, + name: str, + engine: str, + disk_size: str, + storage_class: Optional[str] = None, + access_modes: Optional[list[str]] = None, + model_source: Optional[str] = None, + namespace: Optional[str] = None, + annotations: Optional[dict[str, str]] = None, +) -> k8s_client.V1PersistentVolumeClaim: + """Compile the model-weights PVC. + + ``access_modes`` defaults to ``["ReadWriteOnce"]`` (single-pod; the puller and + server co-locate). ``model_source`` is stamped as an annotation so the + backend's update path can detect a weight-source change and decide whether to + re-pull. 
+ """ + return k8s_client.V1PersistentVolumeClaim( + metadata=k8s_client.V1ObjectMeta( + name=pvc_name(resource_name), + namespace=namespace, + labels=common_labels(workspace, name, engine), + annotations=_merge_annotations(annotations, model_source), + ), + spec=k8s_client.V1PersistentVolumeClaimSpec( + access_modes=access_modes or ["ReadWriteOnce"], + resources=k8s_client.V1VolumeResourceRequirements(requests={"storage": disk_size}), + storage_class_name=storage_class, + ), + ) + + +def compile_puller_job( + *, + resource_name: str, + workspace: str, + name: str, + engine: str, + image: str, + args: list[str], + env: Optional[dict[str, str]] = None, + gpu: int = 0, + namespace: Optional[str] = None, + service_account_name: Optional[str] = None, + image_pull_secret: Optional[str] = None, + user_id: Optional[int] = None, + group_id: Optional[int] = None, + model_source: Optional[str] = None, + backoff_limit: int = DEFAULT_BACKOFF_LIMIT, + ttl_seconds_after_finished: int = DEFAULT_TTL_SECONDS_AFTER_FINISHED, + annotations: Optional[dict[str, str]] = None, +) -> k8s_client.V1Job: + """Compile the weight-puller Job. + + Mirrors the docker puller: a single container running the puller image's + entrypoint with ``args`` (e.g. ``["download", "<repo_id>", "--local-dir", + "/model-store"]``), mounting the PVC at ``/model-store``. ``args`` is passed as + the container ``args`` (appended to the image ENTRYPOINT) -- NOT ``command``, + which would override the entrypoint and try to exec the first token as a + binary. The puller requests the same ``gpu`` as the server -- not for compute, + but to pin it into GPU topology so the shared RWO PVC binds where the server + can mount it (correct across any StorageClass ``volumeBindingMode``). 
+ """ + labels = common_labels(workspace, name, engine) + job_annotations = _merge_annotations(annotations, model_source) + + env_list = [k8s_client.V1EnvVar(name=k, value=str(v)) for k, v in (env or {}).items()] + + container = k8s_client.V1Container( + name="weight-puller", + image=image, + args=args, + env=env_list or None, + resources=_gpu_resources(gpu), + security_context=k8s_client.V1SecurityContext( + allow_privilege_escalation=False, + run_as_non_root=True, + run_as_user=user_id if user_id is not None else DEFAULT_USER_ID, + run_as_group=group_id if group_id is not None else DEFAULT_GROUP_ID, + capabilities=k8s_client.V1Capabilities(drop=["ALL"]), + ), + volume_mounts=[ + k8s_client.V1VolumeMount(name="model-store", mount_path=MODEL_STORE_PATH), + ], + ) + + # The puller writes to a freshly-provisioned PVC, so it needs fsGroup to own + # the volume's filesystem (without it, a non-root puller can't create files at + # the PVC root -> PermissionError on /model-store). Default to 1000/2000; the + # puller image (huggingface-cli) has a passwd entry for these. 
+ puller_security_context = k8s_client.V1PodSecurityContext( + run_as_user=user_id if user_id is not None else DEFAULT_USER_ID, + run_as_group=group_id if group_id is not None else DEFAULT_GROUP_ID, + fs_group=group_id if group_id is not None else DEFAULT_GROUP_ID, + ) + pod_spec = k8s_client.V1PodSpec( + restart_policy="Never", + service_account_name=service_account_name, + security_context=puller_security_context, + image_pull_secrets=([k8s_client.V1LocalObjectReference(name=image_pull_secret)] if image_pull_secret else None), + containers=[container], + volumes=[ + k8s_client.V1Volume( + name="model-store", + persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource( + claim_name=pvc_name(resource_name), + ), + ), + ], + ) + + return k8s_client.V1Job( + metadata=k8s_client.V1ObjectMeta( + name=pull_job_name(resource_name), + namespace=namespace, + labels=labels, + annotations=job_annotations, + ), + spec=k8s_client.V1JobSpec( + backoff_limit=backoff_limit, + ttl_seconds_after_finished=ttl_seconds_after_finished, + template=k8s_client.V1PodTemplateSpec( + metadata=k8s_client.V1ObjectMeta(labels=labels), + spec=pod_spec, + ), + ), + ) + + +def _probe(health_path: str, port: int, *, failure_threshold: int, period_seconds: int = 10) -> k8s_client.V1Probe: + return k8s_client.V1Probe( + http_get=k8s_client.V1HTTPGetAction(path=health_path, port=port), + period_seconds=period_seconds, + timeout_seconds=5, + failure_threshold=failure_threshold, + ) + + +def compile_deployment( + *, + resource_name: str, + workspace: str, + name: str, + engine: str, + image: str, + args: list[str], + health_path: str, + port: int = 8000, + env: Optional[dict[str, str]] = None, + gpu: int = 0, + namespace: Optional[str] = None, + service_account_name: Optional[str] = None, + image_pull_secret: Optional[str] = None, + user_id: Optional[int] = None, + group_id: Optional[int] = None, + shared_memory_size_limit: Optional[str] = None, + startup_grace_seconds: int = 600, + 
init_containers: Optional[list[k8s_client.V1Container]] = None, + sidecar_containers: Optional[list[k8s_client.V1Container]] = None, + extra_labels: Optional[dict[str, str]] = None, +) -> k8s_client.V1Deployment: + """Compile the inference-server Deployment. + + ``args`` is the server arg vector (e.g. from ``compile_vllm_args``), appended + to the image's entrypoint; ``command`` is intentionally left unset so the + upstream image entrypoint (``vllm serve``) runs. ``health_path`` drives the + startup/readiness probes. A ``dshm`` emptyDir is always mounted at + ``/dev/shm`` (vLLM uses it for tensor-parallel NCCL); ``scratch`` is mounted + for the LoRA cache dir. + """ + selector_labels = {"app": resource_name} + pod_labels = { + **selector_labels, + **common_labels(workspace, name, engine), + } + if extra_labels: + pod_labels.update(extra_labels) + # health path is recorded as an annotation, not a label: the value (e.g. + # "/health") contains '/', which is invalid in a k8s label value. + health_annotations = {HEALTH_PATH_ANNOTATION: health_path} + + env_list = [k8s_client.V1EnvVar(name=k, value=str(v)) for k, v in (env or {}).items()] + period = 10 + failure_threshold = max(1, -(-startup_grace_seconds // period)) # ceil + + volume_mounts = [ + k8s_client.V1VolumeMount(name="model-store", mount_path=MODEL_STORE_PATH, read_only=True), + k8s_client.V1VolumeMount(name="scratch", mount_path=SCRATCH_PATH), + k8s_client.V1VolumeMount(name="dshm", mount_path=DSHM_PATH), + ] + + container = k8s_client.V1Container( + name=f"{resource_name}-ctr", + image=image, + args=args or None, + env=env_list or None, + ports=[k8s_client.V1ContainerPort(container_port=port, name=SERVER_PORT_NAME)], + resources=_gpu_resources(gpu), + startup_probe=_probe(health_path, port, failure_threshold=failure_threshold, period_seconds=period), + readiness_probe=_probe(health_path, port, failure_threshold=3, period_seconds=period), + volume_mounts=volume_mounts, + ) + + containers = [container] + if 
sidecar_containers: + containers.extend(sidecar_containers) + + volumes = [ + k8s_client.V1Volume( + name="model-store", + persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource( + claim_name=pvc_name(resource_name), + read_only=True, + ), + ), + k8s_client.V1Volume(name="scratch", empty_dir=k8s_client.V1EmptyDirVolumeSource()), + k8s_client.V1Volume( + name="dshm", + empty_dir=k8s_client.V1EmptyDirVolumeSource(medium="Memory", size_limit=shared_memory_size_limit), + ), + ] + + pod_spec = k8s_client.V1PodSpec( + service_account_name=service_account_name, + security_context=_pod_security_context(user_id, group_id), + image_pull_secrets=([k8s_client.V1LocalObjectReference(name=image_pull_secret)] if image_pull_secret else None), + init_containers=init_containers or None, + containers=containers, + volumes=volumes, + ) + + return k8s_client.V1Deployment( + metadata=k8s_client.V1ObjectMeta( + name=resource_name, + namespace=namespace, + labels=common_labels(workspace, name, engine), + annotations=dict(health_annotations), + ), + spec=k8s_client.V1DeploymentSpec( + replicas=1, + selector=k8s_client.V1LabelSelector(match_labels=selector_labels), + template=k8s_client.V1PodTemplateSpec( + metadata=k8s_client.V1ObjectMeta(labels=pod_labels, annotations=dict(health_annotations)), + spec=pod_spec, + ), + ), + ) + + +def compile_service( + *, + resource_name: str, + workspace: str, + name: str, + engine: str, + port: int = 8000, + namespace: Optional[str] = None, +) -> k8s_client.V1Service: + """Compile the ClusterIP Service exposing the server port for IGW routing.""" + return k8s_client.V1Service( + metadata=k8s_client.V1ObjectMeta( + name=resource_name, + namespace=namespace, + labels=common_labels(workspace, name, engine), + ), + spec=k8s_client.V1ServiceSpec( + type="ClusterIP", + selector={"app": resource_name}, + ports=[ + k8s_client.V1ServicePort( + name=SERVER_PORT_NAME, + port=port, + target_port=SERVER_PORT_NAME, + protocol="TCP", + ), + ], + ), + ) 
diff --git a/services/core/models/src/nmp/core/models/controllers/backends/none_backend.py b/services/core/models/src/nmp/core/models/controllers/backends/none_backend.py index d77cd319eb..2b7bd8a144 100644 --- a/services/core/models/src/nmp/core/models/controllers/backends/none_backend.py +++ b/services/core/models/src/nmp/core/models/controllers/backends/none_backend.py @@ -34,7 +34,12 @@ async def update_model_deployment( """Update a model deployment.""" raise NotImplementedError("NoneServiceBackend does not support deployments") - async def get_model_deployment_status(self, deployment: ModelDeployment) -> DeploymentStatusUpdate: + async def get_model_deployment_status( + self, + deployment: ModelDeployment, + config: Optional[ModelDeploymentConfig] = None, + model_entity: Optional[ModelEntity] = None, + ) -> DeploymentStatusUpdate: """Get the status of a model deployment.""" return DeploymentStatusUpdate( status="UNKNOWN", diff --git a/services/core/models/src/nmp/core/models/controllers/backends/docker/vllm_compiler.py b/services/core/models/src/nmp/core/models/controllers/backends/vllm_compiler.py similarity index 93% rename from services/core/models/src/nmp/core/models/controllers/backends/docker/vllm_compiler.py rename to services/core/models/src/nmp/core/models/controllers/backends/vllm_compiler.py index ad86b6bc79..26b42acaea 100644 --- a/services/core/models/src/nmp/core/models/controllers/backends/docker/vllm_compiler.py +++ b/services/core/models/src/nmp/core/models/controllers/backends/vllm_compiler.py @@ -1,13 +1,16 @@ # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -"""vLLM compiler for the docker service backend. +"""Backend-agnostic vLLM compiler. Turns a ``ModelDeploymentConfig`` whose ``engine`` is ``vllm`` into the image, -``vllm serve`` argument vector, and environment variables for a ``docker run`` -container. 
The shared creation pipeline (GPU/port/volume allocation, the weight -puller, container start) is reused from ``DockerDeploymentCreationReconciler``; -this module only produces the vLLM-specific resource shape. +``vllm serve`` argument vector, and environment variables for a vLLM server. + +These functions take a :class:`DeploymentConfigView` and a ``ModelEntity`` and +return plain data (arg vectors, env dicts, image tuples, TP sizing) -- they are +NOT specific to any service backend. The docker backend renders the result into +a ``docker run`` container; the k8s backend renders it into native Kubernetes +objects. Keep this module free of backend-specific imports so both can reuse it. """ from logging import getLogger diff --git a/services/core/models/src/nmp/core/models/controllers/deployment_reconciler.py b/services/core/models/src/nmp/core/models/controllers/deployment_reconciler.py index e1d0df48d8..6ba84b797c 100644 --- a/services/core/models/src/nmp/core/models/controllers/deployment_reconciler.py +++ b/services/core/models/src/nmp/core/models/controllers/deployment_reconciler.py @@ -198,8 +198,12 @@ async def reconcile_deployments(self, deployment_contexts: list[ModelContext]) - existing_provider=ctx.model_provider, ) case "PENDING" | "READY" | "UNKNOWN": - # Check status and handle drift/backend issues - status_update = await backend.get_model_deployment_status(deployment) + # Check status and handle drift/backend issues. Pass the + # config + entity so backends that advance creation in the + # status path (k8s vLLM) can compile the serving objects. 
+ status_update = await backend.get_model_deployment_status( + deployment, ctx.model_deployment_config, ctx.model_entity + ) if status_update.status == "LOST": # Drift detected - attempt recovery diff --git a/services/core/models/tests/unit/controllers/backends/docker/test_vllm_compiler.py b/services/core/models/tests/unit/controllers/backends/test_vllm_compiler.py similarity index 98% rename from services/core/models/tests/unit/controllers/backends/docker/test_vllm_compiler.py rename to services/core/models/tests/unit/controllers/backends/test_vllm_compiler.py index 5528bc95d7..48d8eb55e7 100644 --- a/services/core/models/tests/unit/controllers/backends/docker/test_vllm_compiler.py +++ b/services/core/models/tests/unit/controllers/backends/test_vllm_compiler.py @@ -1,12 +1,12 @@ # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -"""Unit tests for the docker backend vLLM compiler.""" +"""Unit tests for the backend-agnostic vLLM compiler.""" from types import SimpleNamespace +from nmp.core.models.controllers.backends import vllm_compiler from nmp.core.models.controllers.backends.common import DeploymentConfigView -from nmp.core.models.controllers.backends.docker import vllm_compiler def _view(**kwargs) -> DeploymentConfigView: diff --git a/services/core/models/tests/unit/controllers/backends/test_vllm_k8s_compiler.py b/services/core/models/tests/unit/controllers/backends/test_vllm_k8s_compiler.py new file mode 100644 index 0000000000..87d97f3d50 --- /dev/null +++ b/services/core/models/tests/unit/controllers/backends/test_vllm_k8s_compiler.py @@ -0,0 +1,307 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for the engine-agnostic k8s object compiler (vLLM raw-object path).""" + +from nmp.core.models.app.constants import MODEL_MANAGED_BY_LABEL, MODEL_MANAGED_BY_MODELS_CONTROLLER +from nmp.core.models.controllers.backends.k8s_nim_operator import vllm_k8s_compiler as c + +# --------------------------------------------------------------------------- +# Naming + labels +# --------------------------------------------------------------------------- + + +def test_resource_name_suffixes(): + assert c.pvc_name("md-default-qwen") == "md-default-qwen-pvc" + assert c.pull_job_name("md-default-qwen") == "md-default-qwen-pull" + + +def test_common_labels(): + labels = c.common_labels("default", "qwen", "vllm") + assert labels[MODEL_MANAGED_BY_LABEL] == MODEL_MANAGED_BY_MODELS_CONTROLLER + assert labels[c.DEPLOYMENT_WORKSPACE_LABEL] == "default" + assert labels[c.DEPLOYMENT_NAME_LABEL] == "qwen" + assert labels["nmp.nvidia.com/engine"] == "vllm" + + +# --------------------------------------------------------------------------- +# PVC +# --------------------------------------------------------------------------- + + +def test_compile_pvc_basic(): + pvc = c.compile_pvc( + resource_name="md-default-qwen", + workspace="default", + name="qwen", + engine="vllm", + disk_size="50Gi", + namespace="nemo", + ) + assert pvc.metadata.name == "md-default-qwen-pvc" + assert pvc.metadata.namespace == "nemo" + assert pvc.spec.access_modes == ["ReadWriteOnce"] + assert pvc.spec.resources.requests["storage"] == "50Gi" + assert pvc.spec.storage_class_name is None + + +def test_compile_pvc_storage_class_and_model_source(): + pvc = c.compile_pvc( + resource_name="md-default-qwen", + workspace="default", + name="qwen", + engine="vllm", + disk_size="100Gi", + storage_class="fast-ssd", + model_source="default/qwen@main", + ) + assert pvc.spec.storage_class_name == "fast-ssd" + # model source is an annotation (its value contains '/' and '@', invalid for 
labels). + assert pvc.metadata.annotations[c.MODEL_SOURCE_ANNOTATION] == "default/qwen@main" + + +def test_compile_pvc_custom_access_modes(): + pvc = c.compile_pvc( + resource_name="r", + workspace="w", + name="n", + engine="vllm", + disk_size="10Gi", + access_modes=["ReadWriteMany"], + ) + assert pvc.spec.access_modes == ["ReadWriteMany"] + + +# --------------------------------------------------------------------------- +# Puller Job +# --------------------------------------------------------------------------- + + +def test_compile_puller_job_basic(): + job = c.compile_puller_job( + resource_name="md-default-qwen", + workspace="default", + name="qwen", + engine="vllm", + image="hf-cli:25.10", + args=["download", "default/qwen", "--local-dir", "/model-store"], + env={"HF_ENDPOINT": "http://files/apis/files/v2/hf", "HF_TOKEN": "service:models"}, + gpu=2, + namespace="nemo", + service_account_name="nemo-models-sa", + image_pull_secret="nvcrimagepullsecret", + model_source="default/qwen@main", + ) + assert job.metadata.name == "md-default-qwen-pull" + assert job.spec.backoff_limit == c.DEFAULT_BACKOFF_LIMIT + assert job.spec.ttl_seconds_after_finished == c.DEFAULT_TTL_SECONDS_AFTER_FINISHED + + pod = job.spec.template.spec + assert pod.restart_policy == "Never" + assert pod.service_account_name == "nemo-models-sa" + assert pod.image_pull_secrets[0].name == "nvcrimagepullsecret" + + ctr = pod.containers[0] + assert ctr.args == ["download", "default/qwen", "--local-dir", "/model-store"] + assert ctr.command is None + env = {e.name: e.value for e in ctr.env} + assert env["HF_ENDPOINT"] == "http://files/apis/files/v2/hf" + assert env["HF_TOKEN"] == "service:models" + # GPU request pins the puller into GPU topology for PVC binding. + assert ctr.resources.requests["nvidia.com/gpu"] == "2" + assert ctr.resources.limits["nvidia.com/gpu"] == "2" + assert ctr.volume_mounts[0].mount_path == "/model-store" + # Job annotation carries the model source for the re-pull policy. 
+ assert job.metadata.annotations[c.MODEL_SOURCE_ANNOTATION] == "default/qwen@main" + + +def test_compile_puller_job_cpu_only_no_gpu_request(): + job = c.compile_puller_job( + resource_name="r", + workspace="w", + name="n", + engine="vllm", + image="hf-cli", + args=["download", "w/n", "--local-dir", "/model-store"], + gpu=0, + ) + assert job.spec.template.spec.containers[0].resources is None + + +def test_compile_puller_job_no_image_pull_secret(): + job = c.compile_puller_job( + resource_name="r", + workspace="w", + name="n", + engine="vllm", + image="hf-cli", + args=["download"], + ) + assert job.spec.template.spec.image_pull_secrets is None + + +# --------------------------------------------------------------------------- +# Deployment +# --------------------------------------------------------------------------- + + +def test_compile_deployment_basic(): + dep = c.compile_deployment( + resource_name="md-default-qwen", + workspace="default", + name="qwen", + engine="vllm", + image="vllm/vllm-openai:v0.22.1", + args=["/model-store", "--served-model-name", "default/qwen"], + health_path="/health", + gpu=2, + namespace="nemo", + service_account_name="nemo-models-sa", + ) + assert dep.metadata.name == "md-default-qwen" + assert dep.spec.replicas == 1 + assert dep.spec.selector.match_labels == {"app": "md-default-qwen"} + + pod = dep.spec.template.spec + assert pod.service_account_name == "nemo-models-sa" + ctr = pod.containers[0] + # command unset -> image entrypoint (vllm serve) runs; args appended. + assert ctr.command is None + assert ctr.args == ["/model-store", "--served-model-name", "default/qwen"] + assert ctr.ports[0].container_port == 8000 + assert ctr.resources.limits["nvidia.com/gpu"] == "2" + assert ctr.startup_probe.http_get.path == "/health" + assert ctr.readiness_probe.http_get.path == "/health" + + # PVC mounted read-only at /model-store; scratch + dshm present. 
+ mounts = {m.name: m for m in ctr.volume_mounts} + assert mounts["model-store"].mount_path == "/model-store" + assert mounts["model-store"].read_only is True + assert mounts["scratch"].mount_path == "/scratch" + assert mounts["dshm"].mount_path == "/dev/shm" + + vols = {v.name: v for v in pod.volumes} + assert vols["model-store"].persistent_volume_claim.claim_name == "md-default-qwen-pvc" + assert vols["dshm"].empty_dir.medium == "Memory" + + # health-path stamped as an annotation (its value contains '/', invalid for labels). + assert dep.spec.template.metadata.annotations[c.HEALTH_PATH_ANNOTATION] == "/health" + assert dep.metadata.annotations[c.HEALTH_PATH_ANNOTATION] == "/health" + + +def test_compile_deployment_cpu_only_no_gpu(): + dep = c.compile_deployment( + resource_name="r", + workspace="w", + name="n", + engine="vllm", + image="img", + args=["/model-store"], + health_path="/health", + gpu=0, + ) + assert dep.spec.template.spec.containers[0].resources is None + + +def test_compile_deployment_startup_grace_to_failure_threshold(): + dep = c.compile_deployment( + resource_name="r", + workspace="w", + name="n", + engine="vllm", + image="img", + args=[], + health_path="/health", + startup_grace_seconds=600, + ) + # ceil(600 / 10) == 60 + assert dep.spec.template.spec.containers[0].startup_probe.failure_threshold == 60 + + +def test_compile_deployment_shared_memory_size_limit(): + dep = c.compile_deployment( + resource_name="r", + workspace="w", + name="n", + engine="vllm", + image="img", + args=[], + health_path="/health", + shared_memory_size_limit="8Gi", + ) + vols = {v.name: v for v in dep.spec.template.spec.volumes} + assert vols["dshm"].empty_dir.size_limit == "8Gi" + + +def test_compile_deployment_security_context_set_when_uid_gid_given(): + """When uid/gid are provided, the server pod gets that securityContext.""" + dep = c.compile_deployment( + resource_name="r", + workspace="w", + name="n", + engine="vllm", + image="img", + args=[], + 
health_path="/health", + user_id=2000, + group_id=0, + ) + sc = dep.spec.template.spec.security_context + assert sc.run_as_user == 2000 + assert sc.run_as_group == 0 + assert sc.fs_group == 0 + + +def test_compile_deployment_no_security_context_when_uid_gid_unset(): + """No uid/gid -> no forced securityContext (runs as the image's default user).""" + dep = c.compile_deployment( + resource_name="r", + workspace="w", + name="n", + engine="vllm", + image="img", + args=[], + health_path="/health", + ) + assert dep.spec.template.spec.security_context is None + + +def test_compile_deployment_sidecars_and_init_containers(): + from kubernetes import client as k8s_client + + sidecar = k8s_client.V1Container(name="lora-sidecar", image="nmp-api") + init = k8s_client.V1Container(name="lora-cache-init", image="busybox") + dep = c.compile_deployment( + resource_name="r", + workspace="w", + name="n", + engine="vllm", + image="img", + args=[], + health_path="/health", + init_containers=[init], + sidecar_containers=[sidecar], + ) + pod = dep.spec.template.spec + assert pod.init_containers[0].name == "lora-cache-init" + assert [ctr.name for ctr in pod.containers] == ["r-ctr", "lora-sidecar"] + + +# --------------------------------------------------------------------------- +# Service +# --------------------------------------------------------------------------- + + +def test_compile_service_basic(): + svc = c.compile_service( + resource_name="md-default-qwen", + workspace="default", + name="qwen", + engine="vllm", + namespace="nemo", + ) + assert svc.spec.type == "ClusterIP" + assert svc.spec.selector == {"app": "md-default-qwen"} + assert svc.spec.ports[0].port == 8000 + assert svc.spec.ports[0].target_port == c.SERVER_PORT_NAME + assert svc.metadata.labels[MODEL_MANAGED_BY_LABEL] == MODEL_MANAGED_BY_MODELS_CONTROLLER diff --git a/services/core/models/tests/unit/controllers/test_backend_config_fields.py b/services/core/models/tests/unit/controllers/test_backend_config_fields.py 
index ad03c4d383..da07d7d967 100644 --- a/services/core/models/tests/unit/controllers/test_backend_config_fields.py +++ b/services/core/models/tests/unit/controllers/test_backend_config_fields.py @@ -595,3 +595,60 @@ def test_default_annotations_applied_to_nimservice_metadata_and_spec(sample_depl assert nimservice.metadata["annotations"] == {"prometheus.io/scrape": "true", "custom/key": "value"} assert nimservice.spec.annotations == {"prometheus.io/scrape": "true", "custom/key": "value"} + + +# --------------------------------------------------------------------------- +# vLLM-on-k8s config fields (raw-object emission path) +# --------------------------------------------------------------------------- + + +def test_default_vllm_image_default_value(): + """default_vllm_image / _tag fall back to the upstream vLLM image.""" + backend_config = K8sNimOperatorConfig() + assert backend_config.default_vllm_image == "vllm/vllm-openai" + assert backend_config.default_vllm_image_tag == "v0.22.1" + + +def test_default_vllm_image_override(): + """default_vllm_image / _tag can be repointed at a mirror.""" + backend_config = K8sNimOperatorConfig( + default_vllm_image="my-registry/vllm-openai", + default_vllm_image_tag="v0.99.0", + ) + assert backend_config.default_vllm_image == "my-registry/vllm-openai" + assert backend_config.default_vllm_image_tag == "v0.99.0" + + +def test_service_account_name_defaults_to_none(): + """service_account_name defaults to None (namespace default ServiceAccount).""" + assert K8sNimOperatorConfig().service_account_name is None + + +def test_service_account_name_override(): + """service_account_name can be set to a shared models ServiceAccount.""" + backend_config = K8sNimOperatorConfig(service_account_name="nemo-models-sa") + assert backend_config.service_account_name == "nemo-models-sa" + + +def test_default_shared_memory_size_limit_defaults_to_none(): + """default_shared_memory_size_limit defaults to None (node default /dev/shm).""" + assert 
K8sNimOperatorConfig().default_shared_memory_size_limit is None + + +def test_default_shared_memory_size_limit_override(): + """default_shared_memory_size_limit can be set for vLLM tensor-parallel NCCL.""" + backend_config = K8sNimOperatorConfig(default_shared_memory_size_limit="8Gi") + assert backend_config.default_shared_memory_size_limit == "8Gi" + + +def test_default_vllm_uid_gid_match_image_user(): + """vLLM uid/gid default to the upstream image's 'vllm' user (2000) / root group (0).""" + backend_config = K8sNimOperatorConfig() + assert backend_config.default_vllm_user_id == 2000 + assert backend_config.default_vllm_group_id == 0 + + +def test_default_vllm_uid_gid_override(): + backend_config = K8sNimOperatorConfig(default_vllm_user_id=1234, default_vllm_group_id=5678) + assert backend_config.default_vllm_user_id == 1234 + assert backend_config.default_vllm_group_id == 5678 diff --git a/services/core/models/tests/unit/controllers/test_deployment_reconciler.py b/services/core/models/tests/unit/controllers/test_deployment_reconciler.py index 12fe2bffa8..b0bb3885e4 100644 --- a/services/core/models/tests/unit/controllers/test_deployment_reconciler.py +++ b/services/core/models/tests/unit/controllers/test_deployment_reconciler.py @@ -276,8 +276,8 @@ async def test_reconcile_deployments_with_created_status(reconciler, mock_backen # Config is already in context so no retrieve call happens during reconciliation mock_backend.create_model_deployment.assert_called_once_with(created_deployment, mock_deployment_config, None) - # Verify backend.get_status was called for PENDING deployment - mock_backend.get_model_deployment_status.assert_called_once_with(pending_deployment) + # Verify backend.get_status was called for PENDING deployment with config + entity + mock_backend.get_model_deployment_status.assert_called_once_with(pending_deployment, None, None) # Verify SDK update was called twice (once for each deployment) assert 
reconciler._models_sdk.inference.deployments.update_status.call_count == 2 diff --git a/services/core/models/tests/unit/controllers/test_k8s_nim_operator_backend.py b/services/core/models/tests/unit/controllers/test_k8s_nim_operator_backend.py index 0ce34c6845..4ee8b1f66b 100644 --- a/services/core/models/tests/unit/controllers/test_k8s_nim_operator_backend.py +++ b/services/core/models/tests/unit/controllers/test_k8s_nim_operator_backend.py @@ -5,6 +5,7 @@ import contextlib from datetime import datetime, timedelta, timezone +from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -460,6 +461,77 @@ async def test_k8s_backend_delete_model_deployment_without_secret(k8s_backend, s assert status_update.status == "DELETED" +@pytest.mark.asyncio +async def test_delete_attempts_all_resource_types_and_tolerates_404(k8s_backend, sample_deployment): + """Delete attempts CRs + raw vLLM objects by name; 404s are success -> DELETED.""" + k8s_backend._k8s_namespace = "default" + k8s_backend._dynamic_client = MagicMock() + k8s_backend._core_v1 = MagicMock() + k8s_backend._apps_v1 = MagicMock() + k8s_backend._batch_v1 = MagicMock() + cr_api = MagicMock() + cr_api.delete.side_effect = k8s_dynamic_exceptions.NotFoundError(MagicMock(status=404)) + k8s_backend._dynamic_client.resources.get.return_value = cr_api + notfound = k8s_client.exceptions.ApiException(status=404) + k8s_backend._apps_v1.delete_namespaced_deployment.side_effect = notfound + k8s_backend._core_v1.delete_namespaced_service.side_effect = notfound + k8s_backend._batch_v1.delete_namespaced_job.side_effect = notfound + k8s_backend._core_v1.delete_namespaced_persistent_volume_claim.side_effect = notfound + + result = await k8s_backend.delete_model_deployment("default", "qwen") + + assert result.status == "DELETED" + assert k8s_backend._dynamic_client.resources.get.call_count == 2 + k8s_backend._apps_v1.delete_namespaced_deployment.assert_called_once() + 
k8s_backend._core_v1.delete_namespaced_service.assert_called_once() + k8s_backend._batch_v1.delete_namespaced_job.assert_called_once() + k8s_backend._core_v1.delete_namespaced_persistent_volume_claim.assert_called_once() + + +@pytest.mark.asyncio +async def test_delete_real_failure_surfaces_error_but_attempts_all(k8s_backend, sample_deployment): + """A non-404 delete failure -> ERROR (not DELETED), and other deletes still run.""" + k8s_backend._k8s_namespace = "default" + k8s_backend._dynamic_client = MagicMock() + k8s_backend._core_v1 = MagicMock() + k8s_backend._apps_v1 = MagicMock() + k8s_backend._batch_v1 = MagicMock() + cr_api = MagicMock() + cr_api.delete.side_effect = k8s_dynamic_exceptions.NotFoundError(MagicMock(status=404)) + k8s_backend._dynamic_client.resources.get.return_value = cr_api + k8s_backend._apps_v1.delete_namespaced_deployment.side_effect = k8s_client.exceptions.ApiException(status=500) + notfound = k8s_client.exceptions.ApiException(status=404) + k8s_backend._core_v1.delete_namespaced_service.side_effect = notfound + k8s_backend._batch_v1.delete_namespaced_job.side_effect = notfound + k8s_backend._core_v1.delete_namespaced_persistent_volume_claim.side_effect = notfound + + result = await k8s_backend.delete_model_deployment("default", "qwen") + + assert result.status == "ERROR" + k8s_backend._core_v1.delete_namespaced_service.assert_called_once() + k8s_backend._core_v1.delete_namespaced_persistent_volume_claim.assert_called_once() + + +@pytest.mark.asyncio +async def test_delete_forbidden_cr_does_not_block_vllm_cleanup(k8s_backend, sample_deployment): + """A 403 deleting a NIMService still lets the raw vLLM objects be deleted (and surfaces ERROR).""" + k8s_backend._k8s_namespace = "default" + k8s_backend._dynamic_client = MagicMock() + k8s_backend._core_v1 = MagicMock() + k8s_backend._apps_v1 = MagicMock() + k8s_backend._batch_v1 = MagicMock() + cr_api = MagicMock() + cr_api.delete.side_effect = 
k8s_dynamic_exceptions.ForbiddenError(MagicMock(status=403)) + k8s_backend._dynamic_client.resources.get.return_value = cr_api + + result = await k8s_backend.delete_model_deployment("default", "qwen") + + assert result.status == "ERROR" + assert "forbidden" in result.status_message.lower() + k8s_backend._apps_v1.delete_namespaced_deployment.assert_called_once() + k8s_backend._core_v1.delete_namespaced_persistent_volume_claim.assert_called_once() + + def test_k8s_backend_initialization(mock_nmp_sdk, mock_k8s_config): """Test K8s NIM Operator backend initializes correctly with custom namespace config.""" config = {"namespace": "nim-system"} @@ -1614,3 +1686,324 @@ async def test_delete_model_deployment_by_id_calls_delete_resources(mock_nmp_sdk result = await backend.delete_model_deployment("my-ws", "my-name") mock_delete.assert_called_once_with("my-ws", "my-name") assert result.status == "DELETED" + + +# =========================================================================== +# vLLM path (native Kubernetes objects, no operator) +# =========================================================================== + + +def _vllm_config(*, gpu: int = 1, lora_enabled: bool = False): + """A minimal vLLM ModelDeploymentConfig-like object for dispatch/compile.""" + return SimpleNamespace( + engine="vllm", + model_spec=SimpleNamespace( + model_type=None, + model_namespace="default", + model_name="qwen", + model_revision=None, + chat_template=None, + tool_call_config=None, + lora_enabled=lora_enabled, + ), + executor_config=SimpleNamespace( + gpu=gpu, + disk_size="50Gi", + image_name=None, + image_tag=None, + health_check_path=None, + additional_envs=None, + additional_args=[], + k8s_nim_operator_config=None, + override_config=None, + ), + ) + + +def _vllm_backend(k8s_backend): + """Wire a k8s_backend with mocked typed clients for the vLLM path.""" + k8s_backend._k8s_namespace = "nemo" + k8s_backend._backend_config = K8sNimOperatorConfig() + k8s_backend._k8s_client = MagicMock() 
+ k8s_backend._core_v1 = MagicMock() + k8s_backend._apps_v1 = MagicMock() + k8s_backend._batch_v1 = MagicMock() + return k8s_backend + + +def _api_exception(status: int): + return k8s_client.exceptions.ApiException(status=status) + + +@pytest.mark.asyncio +async def test_vllm_create_emits_pvc_and_job_only(k8s_backend, sample_deployment): + """vLLM create (phase P0) emits the PVC + puller Job, not the Deployment/Service.""" + backend = _vllm_backend(k8s_backend) + config = _vllm_config(gpu=2) + + with patch.object(backend, "_resolve_model_source", return_value=("default", "qwen", None)): + result = await backend.create_model_deployment(sample_deployment, config, None) + + assert result.status == "PENDING" + backend._core_v1.create_namespaced_persistent_volume_claim.assert_called_once() + backend._batch_v1.create_namespaced_job.assert_called_once() + # Deployment + Service are NOT created at P0. + backend._apps_v1.create_namespaced_deployment.assert_not_called() + backend._core_v1.create_namespaced_service.assert_not_called() + + # The puller Job requests the same GPU as the server (topology pin). 
+ job = backend._batch_v1.create_namespaced_job.call_args.kwargs["body"] + assert job.spec.template.spec.containers[0].resources.requests["nvidia.com/gpu"] == "2" + + +@pytest.mark.asyncio +async def test_generic_engine_rejected_on_k8s(k8s_backend, sample_deployment): + """The generic engine is explicitly unsupported on the k8s backend.""" + backend = _vllm_backend(k8s_backend) + config = _vllm_config() + config.engine = "generic" + result = await backend.create_model_deployment(sample_deployment, config, None) + assert result.status == "ERROR" + assert "generic" in result.status_message.lower() + + +@pytest.mark.asyncio +async def test_vllm_status_job_running_is_pending(k8s_backend, sample_deployment): + """While the puller Job is running, status is PENDING.""" + backend = _vllm_backend(k8s_backend) + config = _vllm_config() + job = MagicMock() + job.status.failed = None + job.status.succeeded = None + backend._batch_v1.read_namespaced_job.return_value = job + # No serving Deployment yet (still in pull phase). + backend._apps_v1.read_namespaced_deployment.side_effect = _api_exception(404) + + result = await backend.get_model_deployment_status(sample_deployment, config, None) + assert result.status == "PENDING" + assert "weights" in result.status_message.lower() + backend._apps_v1.create_namespaced_deployment.assert_not_called() + + +@pytest.mark.asyncio +async def test_vllm_status_job_failed_is_error(k8s_backend, sample_deployment): + """A failed puller Job surfaces as ERROR.""" + backend = _vllm_backend(k8s_backend) + config = _vllm_config() + job = MagicMock() + job.status.failed = 5 + job.status.succeeded = None + backend._batch_v1.read_namespaced_job.return_value = job + backend._core_v1.list_namespaced_pod.return_value = MagicMock(items=[]) + # No serving Deployment yet (failed during pull phase). 
+ backend._apps_v1.read_namespaced_deployment.side_effect = _api_exception(404) + + result = await backend.get_model_deployment_status(sample_deployment, config, None) + assert result.status == "ERROR" + assert result.error_details["reason"] == "weight_pull_failed" + + +@pytest.mark.asyncio +async def test_vllm_status_job_complete_creates_deployment(k8s_backend, sample_deployment): + """When the Job completes (phase P3), the Deployment + Service are created.""" + backend = _vllm_backend(k8s_backend) + config = _vllm_config(gpu=1) + + job = MagicMock() + job.status.failed = None + job.status.succeeded = 1 + backend._batch_v1.read_namespaced_job.return_value = job + # Deployment does not exist yet -> triggers P3 creation. + backend._apps_v1.read_namespaced_deployment.side_effect = _api_exception(404) + # After the puller Job is deleted, no puller pod remains (volume released). + backend._core_v1.list_namespaced_pod.return_value = MagicMock(items=[]) + created_dep = MagicMock() + created_dep.metadata.name = backend._get_resource_name(sample_deployment) + created_dep.metadata.uid = "dep-uid" + backend._apps_v1.create_namespaced_deployment.return_value = created_dep + + result = await backend.get_model_deployment_status(sample_deployment, config, None) + + assert result.status == "PENDING" + # Puller Job deleted (release RWO volume) before the Deployment is created. + backend._batch_v1.delete_namespaced_job.assert_called_once() + backend._apps_v1.create_namespaced_deployment.assert_called_once() + backend._core_v1.create_namespaced_service.assert_called_once() + # ownerRef patched onto the PVC so it cascades with the Deployment (Job is gone). 
+ backend._core_v1.patch_namespaced_persistent_volume_claim.assert_called_once() + backend._batch_v1.patch_namespaced_job.assert_not_called() + + +@pytest.mark.asyncio +async def test_vllm_status_p3_waits_for_puller_pod_to_release_volume(k8s_backend, sample_deployment): + """At P3, if the puller pod is still present, defer Deployment creation (RWO release).""" + backend = _vllm_backend(k8s_backend) + config = _vllm_config(gpu=1) + + job = MagicMock() + job.status.failed = None + job.status.succeeded = 1 + backend._batch_v1.read_namespaced_job.return_value = job + backend._apps_v1.read_namespaced_deployment.side_effect = _api_exception(404) + # Puller pod still terminating -> volume not yet released. + backend._core_v1.list_namespaced_pod.return_value = MagicMock(items=[MagicMock()]) + + result = await backend.get_model_deployment_status(sample_deployment, config, None) + + assert result.status == "PENDING" + backend._batch_v1.delete_namespaced_job.assert_called_once() + # Deployment is NOT created until the puller pod is gone. 
+ backend._apps_v1.create_namespaced_deployment.assert_not_called() + + +@pytest.mark.asyncio +async def test_vllm_status_job_complete_with_lora_wires_sidecar(k8s_backend, sample_deployment): + """At P3 with LoRA enabled, the Deployment gets the cache-init + adapter sidecar.""" + backend = _vllm_backend(k8s_backend) + config = _vllm_config(gpu=1, lora_enabled=True) + + job = MagicMock() + job.status.failed = None + job.status.succeeded = 1 + backend._batch_v1.read_namespaced_job.return_value = job + backend._apps_v1.read_namespaced_deployment.side_effect = _api_exception(404) + backend._core_v1.list_namespaced_pod.return_value = MagicMock(items=[]) + created_dep = MagicMock() + created_dep.metadata.name = backend._get_resource_name(sample_deployment) + created_dep.metadata.uid = "dep-uid" + backend._apps_v1.create_namespaced_deployment.return_value = created_dep + + platform_cfg = MagicMock() + platform_cfg.image_pull_secrets = [] + platform_cfg.image_registry = "my-registry" + platform_cfg.image_tag = "local" + platform_cfg.to_shared_envvars.return_value = {"NMP_SHARED": "1"} + with patch(f"{_K8S_BACKEND_MODULE}.get_platform_config", return_value=platform_cfg): + result = await backend.get_model_deployment_status(sample_deployment, config, None) + + assert result.status == "PENDING" + dep_obj = backend._apps_v1.create_namespaced_deployment.call_args.kwargs["body"] + pod = dep_obj.spec.template.spec + assert pod.init_containers[0].name == "lora-cache-init" + sidecar = next(ctr for ctr in pod.containers if ctr.name == "lora-sidecar") + env = {e.name: e.value for e in sidecar.env} + assert env["NIM_PEFT_SOURCE"] == "/scratch/loras" + assert env["VLLM_LORA_BASE_MODEL_OVERRIDE"] == "/model-store" + assert env["NMP_SHARED"] == "1" + + +@pytest.mark.asyncio +async def test_vllm_status_job_absent_pvc_present_resumes_p3_not_lost(k8s_backend, sample_deployment): + """Job deleted (RWO release) + PVC present + no Deployment -> resume P3, not LOST.""" + backend = 
_vllm_backend(k8s_backend) + config = _vllm_config(gpu=1) + + backend._apps_v1.read_namespaced_deployment.side_effect = _api_exception(404) + backend._batch_v1.read_namespaced_job.side_effect = _api_exception(404) # Job already deleted + # PVC still present -> we're mid-P3, not orphaned. + backend._core_v1.read_namespaced_persistent_volume_claim.return_value = MagicMock() + backend._core_v1.list_namespaced_pod.return_value = MagicMock(items=[]) + created_dep = MagicMock() + created_dep.metadata.name = backend._get_resource_name(sample_deployment) + created_dep.metadata.uid = "dep-uid" + backend._apps_v1.create_namespaced_deployment.return_value = created_dep + + result = await backend.get_model_deployment_status(sample_deployment, config, None) + + assert result.status == "PENDING" + assert result.status != "LOST" + backend._apps_v1.create_namespaced_deployment.assert_called_once() + + +@pytest.mark.asyncio +async def test_vllm_status_job_and_pvc_absent_is_lost(k8s_backend, sample_deployment): + """Both Job and PVC gone + no Deployment -> genuine drift -> LOST.""" + backend = _vllm_backend(k8s_backend) + config = _vllm_config(gpu=1) + + backend._apps_v1.read_namespaced_deployment.side_effect = _api_exception(404) + backend._batch_v1.read_namespaced_job.side_effect = _api_exception(404) + backend._core_v1.read_namespaced_persistent_volume_claim.side_effect = _api_exception(404) + + result = await backend.get_model_deployment_status(sample_deployment, config, None) + assert result.status == "LOST" + + +@pytest.mark.asyncio +async def test_vllm_status_deployment_ready_is_ready(k8s_backend, sample_deployment): + """A ready serving Deployment maps to READY + host_url.""" + backend = _vllm_backend(k8s_backend) + config = _vllm_config() + + job = MagicMock() + job.status.failed = None + job.status.succeeded = 1 + backend._batch_v1.read_namespaced_job.return_value = job + + dep = MagicMock() + dep.status.ready_replicas = 1 + 
backend._apps_v1.read_namespaced_deployment.return_value = dep + + result = await backend.get_model_deployment_status(sample_deployment, config, None) + assert result.status == "READY" + assert result.host_url is not None + + +@pytest.mark.asyncio +async def test_vllm_update_unchanged_source_does_not_repull(k8s_backend, sample_deployment): + """Unchanged model source: no Job re-create, no resource deletion.""" + backend = _vllm_backend(k8s_backend) + config = _vllm_config() + + existing_job = MagicMock() + existing_job.metadata.labels = {"nmp.nvidia.com/engine": "vllm"} + existing_job.metadata.annotations = {"nmp.nvidia.com/model-source": "default/qwen"} + backend._batch_v1.read_namespaced_job.return_value = existing_job + + with patch.object(backend, "_resolve_model_source", return_value=("default", "qwen", None)): + with patch.object(backend, "_delete_vllm_resources") as mock_delete: + result = await backend.update_model_deployment(sample_deployment, config, None) + + mock_delete.assert_not_called() + backend._batch_v1.create_namespaced_job.assert_not_called() + assert result.status == "PENDING" + + +@pytest.mark.asyncio +async def test_vllm_update_changed_source_repulls(k8s_backend, sample_deployment): + """Changed model source: delete resources and re-run the phased create.""" + backend = _vllm_backend(k8s_backend) + config = _vllm_config() + + existing_job = MagicMock() + existing_job.metadata.labels = {"nmp.nvidia.com/engine": "vllm"} + existing_job.metadata.annotations = {"nmp.nvidia.com/model-source": "default/old-model"} + backend._batch_v1.read_namespaced_job.return_value = existing_job + + with patch.object(backend, "_resolve_model_source", return_value=("default", "qwen", "v2")): + with patch.object(backend, "_delete_vllm_resources") as mock_delete: + result = await backend.update_model_deployment(sample_deployment, config, None) + + mock_delete.assert_called_once() + # Re-pull: a new PVC + Job are created. 
+ backend._core_v1.create_namespaced_persistent_volume_claim.assert_called_once() + backend._batch_v1.create_namespaced_job.assert_called_once() + assert result.status == "PENDING" + + +@pytest.mark.asyncio +async def test_vllm_list_managed_unions_deployments(k8s_backend): + """list_managed_deployment_names unions NIMServices and raw vLLM Deployments.""" + backend = _vllm_backend(k8s_backend) + backend._dynamic_client = MagicMock() + backend._dynamic_client.resources.get.return_value.get.return_value = MagicMock(items=[]) + + dep = MagicMock() + dep.metadata.labels = { + "nmp.nvidia.com/deployment-workspace": "default", + "nmp.nvidia.com/deployment-name": "qwen", + } + backend._apps_v1.list_namespaced_deployment.return_value = MagicMock(items=[dep]) + + names = await backend.list_managed_deployment_names() + assert "default/qwen" in names