diff --git a/deployment/setup_image.sh b/deployment/setup_image.sh index e8585bc1..1efeca94 100644 --- a/deployment/setup_image.sh +++ b/deployment/setup_image.sh @@ -90,6 +90,8 @@ if [[ "$IMAGE_NAME" == "streamwise" ]]; then cp "$APP_DIR"/*.bash "$IMAGE_DIR/docker_files/" [[ -d "$APP_DIR/static" ]] && cp -R "$APP_DIR/static" "$IMAGE_DIR/docker_files/" [[ -d "$APP_DIR/templates" ]] && cp -R "$APP_DIR/templates" "$IMAGE_DIR/docker_files/" + [[ -d "$APP_DIR/model_provisioner" ]] && cp -R "$APP_DIR/model_provisioner" "$IMAGE_DIR/docker_files/" + [[ -d "$MAIN_DIR/simulator" ]] && cp -R "$MAIN_DIR/simulator" "$IMAGE_DIR/docker_files/" cp "$MAIN_DIR/services.json" "$IMAGE_DIR/docker_files/" # Certs directory (empty by default; populated with --certfile/--keyfile for embedded HTTPS) diff --git a/deployment/streamwise/Dockerfile b/deployment/streamwise/Dockerfile index f7cef49c..a0c05e94 100644 --- a/deployment/streamwise/Dockerfile +++ b/deployment/streamwise/Dockerfile @@ -21,6 +21,8 @@ COPY *.sh . COPY *.json . COPY templates ./templates COPY static ./static +COPY model_provisioner ./model_provisioner +COPY simulator ./simulator # TLS certificates (optional β€” populated by setup_image.sh --certfile/--keyfile or mounted at runtime) RUN mkdir -p /certs diff --git a/streamwise/allocator_bridge.py b/streamwise/allocator_bridge.py new file mode 100644 index 00000000..6f892668 --- /dev/null +++ b/streamwise/allocator_bridge.py @@ -0,0 +1,368 @@ +""" +Bridge between the model provisioner's allocator output and StreamWise pod deployment. + +Translates ModelAllocation results (abstract Model enum + GPU counts) into concrete +container deployment parameters compatible with pod_manager.add_pod(). +""" + +from __future__ import annotations + +import sys +import os + +# Ensure the directory containing this file is on sys.path so model_provisioner is importable. +_HERE = os.path.dirname(os.path.abspath(__file__)) +if _HERE not in sys.path: + sys.path.insert(0, _HERE) +_REPO_ROOT = os.path.dirname(_HERE) +if _REPO_ROOT not in sys.path: + sys.path.insert(0, _REPO_ROOT) +import model_provisioner # noqa: E402, F401 β€” adds simulator/ to sys.path + +from dataclasses import dataclass +from typing import Optional + +from sim_types import GPUType +from sim_types import Model +from sim_types import Result + +from auto_model_allocator import AutoModelAllocator +from container_config import COLOCATED_CONTAINERS +from container_config import CONTAINER_RESOURCES +from container_config import GPU_TYPE_TO_POD_STR +from container_config import MIG_AVAILABLE +from container_config import MIG_CAPABLE_GPU_TYPES +from container_config import MIG_CONTAINERS +from data_loading import load_latency_data +from model_provisioner.policies import STREAMWISE_POLICY +from streamwise_apps import STREAMWISE_APPS +from workflows import WORKFLOWS + + +# Mapping from simulator Model enum to concrete container names used by pod_manager. +# Some Model entries map to multiple containers (e.g., OTHERS -> kokoro + yolo). +MODEL_TO_CONTAINERS: dict[Model, list[str]] = { + Model.GEMMA: ["gemma"], + Model.FLUX: ["flux"], + Model.HF: ["hunyuanframepackf1"], + Model.HF_VAE: ["hunyuanframepackvae"], + Model.FT: ["fantasytalking"], + Model.FT_VAE: [], # FT_VAE is handled within fantasytalking container + Model.UPSCALER: ["realesrgan"], + Model.OTHERS: ["kokoro", "yolo"], +} + + +def get_mig_profile(container_name: str, gpu_type: GPUType) -> Optional[str]: + """Return a MIG profile only when MIG is available and the GPU type supports it.""" + if not MIG_AVAILABLE: + return None + if gpu_type not in MIG_CAPABLE_GPU_TYPES: + return None + return MIG_CONTAINERS.get(container_name) + + +# Mapping from StreamWise app name to simulator workflow key +APP_TO_WORKFLOW: dict[str, str] = { + "streamcast": "podcast", + "streampersona": "slide", + "streamchat": "chat", + "streamshort": "short", + "streammovie": "movie", + "streamanimate": "story", + "streamlecture": "lecture", + "streamdub": "dubbing", + "streamedit": "editing", +} + +# Ensure allocator knows about all StreamWise apps (catch drift early). +assert set(APP_TO_WORKFLOW.keys()) == set(STREAMWISE_APPS), ( + f"APP_TO_WORKFLOW keys {set(APP_TO_WORKFLOW.keys())} != STREAMWISE_APPS {set(STREAMWISE_APPS)}" +) + + +@dataclass +class DeploymentSpec: + """A single container deployment specification.""" + container_name: str + cpu: int + memory_gib: int + ephemeral_storage_gib: int + gpu: int + gpu_type: Optional[str] + mig_profile: Optional[str] + + +@dataclass +class DeploymentPlan: + """Complete deployment plan produced by the auto-allocator.""" + specs: list[DeploymentSpec] + result: Result + workflow_name: str + gpu_budget: dict[str, int] + + +def _get_data_dir() -> str: + """Get the path to the simulator data directory.""" + default_path = os.path.join(_REPO_ROOT, "simulator", "data") + return os.getenv("SIMULATOR_DATA_DIR", default_path) + + +# Reverse mapping from pod gpu_type string to GPUType enum +_POD_STR_TO_GPU_TYPE: dict[str, GPUType] = {v: k for k, v in GPU_TYPE_TO_POD_STR.items()} + + +def _calc_actual_gpus_per_type(specs: list['DeploymentSpec']) -> dict[GPUType, int]: + """Calculate actual GPUs needed per GPUType from deployment specs.""" + result: dict[GPUType, int] = {} + for spec in specs: + if spec.mig_profile: + continue + gpu_type = _POD_STR_TO_GPU_TYPE.get(spec.gpu_type or "") + if gpu_type is not None: + result[gpu_type] = result.get(gpu_type, 0) + spec.gpu + return result + + +def _trim_specs_for_type( + specs: list['DeploymentSpec'], gpu_type_str: str, excess: int +) -> list['DeploymentSpec']: + """ + Remove replicas from specs to reduce GPU usage on a specific type by `excess` GPUs. + + Prefers removing replicas of the most-replicated scalable container (typically + realesrgan/upscaler) to minimize impact on pipeline throughput. + """ + # Count replicas per container on this GPU type (only scalable ones) + from collections import Counter + type_counts: Counter[str] = Counter() + for spec in specs: + if spec.gpu_type == gpu_type_str and spec.gpu > 0 and spec.container_name not in COLOCATED_CONTAINERS: + type_counts[spec.container_name] += 1 + + # Prefer trimming containers with most replicas (least impact per removal) + trimmed = 0 + result_specs = list(specs) + for container_name, _count in type_counts.most_common(): + if trimmed >= excess: + break + # Remove replicas from the end of the list + for i in range(len(result_specs) - 1, -1, -1): + if trimmed >= excess: + break + spec = result_specs[i] + if (spec.container_name == container_name + and spec.gpu_type == gpu_type_str + and spec.gpu > 0): + trimmed += spec.gpu + result_specs.pop(i) + return result_specs + + +def get_available_workflows() -> list[str]: + """Return list of available workflow names for the UI.""" + return list(APP_TO_WORKFLOW.keys()) + + +def get_available_gpu_types() -> list[str]: + """Return list of available GPU type strings for the UI.""" + return [gpu_type.value for gpu_type in GPUType] + + +def run_allocator( + gpu_budget: dict[str, int], + workflow_name: str, +) -> DeploymentPlan: + """ + Run the greedy model allocator and return a deployment plan. + + Args: + gpu_budget: GPU counts keyed by GPU type string (e.g., {"A100": 8, "H100": 0}). + workflow_name: StreamWise app name (e.g., "streamcast"). + + Returns: + DeploymentPlan with concrete container deployment specs. + + Raises: + ValueError: If workflow_name or GPU types are invalid. + """ + # Validate workflow + workflow_key = APP_TO_WORKFLOW.get(workflow_name) + if workflow_key is None: + raise ValueError( + f"Unknown workflow '{workflow_name}'. " + f"Available: {list(APP_TO_WORKFLOW.keys())}") + + workflow = WORKFLOWS[workflow_key] + + # Parse GPU budget into GPUType enum + num_gpus: dict[GPUType, int] = {} + for gpu_str, count in gpu_budget.items(): + try: + gpu_type = GPUType(gpu_str) + except ValueError: + raise ValueError( + f"Unknown GPU type '{gpu_str}'. " + f"Available: {[g.value for g in GPUType]}") + if count > 0: + num_gpus[gpu_type] = count + + if not num_gpus or sum(num_gpus.values()) < 8: + raise ValueError("Total GPU budget must be at least 8 GPUs.") + + # The allocator requires GPU counts to be multiples of NUM_GPUS_PER_SERVER (8). + # Round up for the allocator, then trim back to the real budget afterward. + import math + from constants import NUM_GPUS_PER_SERVER + allocator_gpus: dict[GPUType, int] = {} + for gpu_type, count in num_gpus.items(): + server_size = NUM_GPUS_PER_SERVER[gpu_type] + allocator_gpus[gpu_type] = math.ceil(count / server_size) * server_size + + # Load latency data and run allocator + data_dir = _get_data_dir() + latency_data = load_latency_data(data_dir=data_dir) + + allocator = AutoModelAllocator( + workflow=workflow, + latency_data=latency_data, + policy=STREAMWISE_POLICY, + ) + + result = allocator.allocate(num_gpus=allocator_gpus, verbose=False) + + # Convert result to deployment specs + specs = result_to_deployment_specs(result) + + # Trim deployment specs back to the user's actual budget. + # Also handles MIG-unavailable overflow (e.g., OTHERS allocates 1 GPU + # but kokoro+yolo each need a full GPU = 2). + actual_per_type = _calc_actual_gpus_per_type(specs) + for gpu_type, budget_count in num_gpus.items(): + actual = actual_per_type.get(gpu_type, 0) + if actual <= budget_count: + continue + excess = actual - budget_count + gpu_type_str = GPU_TYPE_TO_POD_STR[gpu_type] + specs = _trim_specs_for_type(specs, gpu_type_str, excess) + + return DeploymentPlan( + specs=specs, + result=result, + workflow_name=workflow_name, + gpu_budget=gpu_budget, + ) + + +def result_to_deployment_specs(result: Result) -> list[DeploymentSpec]: + """ + Convert an allocator Result into a list of DeploymentSpec objects. + + Each ModelAllocation with replicas > 0 is mapped to one or more container deployments. + When MIG is unavailable, containers that would normally use MIG slices get 1 full GPU instead. + """ + specs: list[DeploymentSpec] = [] + + for gpu_type, model_dict in result.models.items(): + gpu_type_str = GPU_TYPE_TO_POD_STR[gpu_type] + + for model, allocations in model_dict.items(): + containers = MODEL_TO_CONTAINERS.get(model, []) + if not containers: + continue + + for allocation in allocations: + if allocation.replicas <= 0: + continue + + for container_name in containers: + resources = CONTAINER_RESOURCES.get(container_name, (4, 16, 16)) + cpu, memory_gib, ephemeral_storage_gib = resources + + mig_profile: Optional[str] = None + # Co-locate VAE only when disaggregation is disabled + # TODO: make disaggregation a configuration exposed to the users + is_colocated = ( + container_name in COLOCATED_CONTAINERS + and not STREAMWISE_POLICY.disaggregation.get(Model.HF, False) + ) + if is_colocated: + gpu_count = 0 + elif MIG_AVAILABLE and container_name in MIG_CONTAINERS: + mig_profile = MIG_CONTAINERS[container_name] + gpu_count = 1 + elif container_name in MIG_CONTAINERS: + # MIG not available: use 1 full GPU instead of a MIG slice + gpu_count = 1 + else: + gpu_count = allocation.devices + + for _ in range(allocation.replicas): + specs.append(DeploymentSpec( + container_name=container_name, + cpu=cpu, + memory_gib=memory_gib, + ephemeral_storage_gib=ephemeral_storage_gib, + gpu=gpu_count, + gpu_type=gpu_type_str, + mig_profile=mig_profile, + )) + + return specs + + +def deployment_plan_to_json(plan: DeploymentPlan) -> dict: + """Serialize a DeploymentPlan to a JSON-friendly dict.""" + # Calculate actual GPUs used by the deployment specs (may differ from allocator + # when MIG is unavailable and services fall back to full GPUs). + actual_gpus: dict[str, int] = {} + for spec in plan.specs: + if spec.mig_profile: + continue # MIG slices don't count against full GPU budget + gpu_type_key = spec.gpu_type or "unknown" + actual_gpus[gpu_type_key] = actual_gpus.get(gpu_type_key, 0) + spec.gpu + + total_budget = sum(plan.gpu_budget.values()) + total_actual = sum(actual_gpus.values()) + budget_exceeded = total_actual > total_budget + + warnings: list[str] = [] + if budget_exceeded: + mig_hint = ( + "Enable MIG to fit lightweight services (kokoro, yolo, realesrgan) " + "on shared GPU slices." + ) if not MIG_AVAILABLE else "" + warnings.append( + f"Deployment requires {total_actual} full GPUs but budget is " + f"{total_budget}. {mig_hint}" + ) + + return { + "workflow_name": plan.workflow_name, + "gpu_budget": plan.gpu_budget, + "metrics": { + "total_time_s": round(plan.result.total_time_s, 2), + "ttff_s": round(plan.result.ttff_s, 2), + "cost": round(plan.result.cost, 4), + "gpus_used": { + gpu_type.value: count + for gpu_type, count in plan.result.gpus_used.items() + }, + "actual_gpus_needed": actual_gpus, + "budget_exceeded": budget_exceeded, + }, + "warnings": warnings, + "mig_available": MIG_AVAILABLE, + "specs": [ + { + "container_name": spec.container_name, + "cpu": spec.cpu, + "memory_gib": spec.memory_gib, + "ephemeral_storage_gib": spec.ephemeral_storage_gib, + "gpu": spec.gpu, + "gpu_type": spec.gpu_type, + "mig_profile": spec.mig_profile, + } + for spec in plan.specs + ], + } diff --git a/streamwise/container_config.py b/streamwise/container_config.py new file mode 100644 index 00000000..a9da9ced --- /dev/null +++ b/streamwise/container_config.py @@ -0,0 +1,70 @@ +""" +Shared container deployment configuration for StreamWise. + +Central source of truth for container resource defaults, MIG profiles, +GPU type mappings, and related deployment constants. Both allocator_bridge +and streamwise.py import from here to avoid duplication. +""" + +from __future__ import annotations + +import sys +import os + +_HERE = os.path.dirname(os.path.abspath(__file__)) +if _HERE not in sys.path: + sys.path.insert(0, _HERE) + +_REPO_ROOT = os.path.dirname(_HERE) +if _REPO_ROOT not in sys.path: + sys.path.insert(0, _REPO_ROOT) + +# model_provisioner import adds simulator/ to sys.path +import model_provisioner # noqa: E402, F401 + +from sim_types import GPUType # noqa: E402 + + +# Default CPU/memory/storage for each container when deployed via auto-deploy. +# Format: (cpu_cores, memory_gib, ephemeral_storage_gib) +# Keep in sync with the Helm values in deployment/helm/values.yaml. +CONTAINER_RESOURCES: dict[str, tuple[int, int, int]] = { + "gemma": (16, 192, 64), + "flux": (12, 128, 64), + "hunyuanframepackf1": (24, 128, 64), + "hunyuanframepackvae": (4, 32, 16), + "fantasytalking": (12, 192, 64), + "realesrgan": (4, 32, 16), + "kokoro": (2, 8, 16), + "yolo": (4, 8, 16), +} + +# GPU type string used by pod_manager (lowercase). +GPU_TYPE_TO_POD_STR: dict[GPUType, str] = { + GPUType.A100: "a100", + GPUType.H100: "h100", + GPUType.H200: "h200", + GPUType.GB200: "gb200", +} + +# MIG is only supported by pod_manager on these GPU types. +MIG_CAPABLE_GPU_TYPES: frozenset[GPUType] = frozenset({GPUType.A100, GPUType.H100}) + +# Containers that prefer a MIG slice when the selected GPU type supports MIG. +# When MIG is available on the cluster, these services use a MIG slice (shared GPU). +# When MIG is NOT available, they fall back to 1 full GPU each and the extra GPUs +# are counted against the budget (with a warning if exceeded). +MIG_CONTAINERS: dict[str, str] = { + "kokoro": "1g.10gb", + "yolo": "1g.10gb", + "realesrgan": "1g.10gb", +} + +# Containers that are co-located with their parent model (sharing GPUs on the same server). +# The allocator counts their GPUs as part of the parent model's allocation, so they should +# deploy with gpu=0 to avoid double-counting. +COLOCATED_CONTAINERS: frozenset[str] = frozenset({"hunyuanframepackvae"}) + +# Whether MIG is actually configured on the cluster. +# When False, MIG_CONTAINERS entries fall back to full GPUs. +MIG_AVAILABLE: bool = False diff --git a/streamwise/requirements.txt b/streamwise/requirements.txt index b5695973..468a2136 100644 --- a/streamwise/requirements.txt +++ b/streamwise/requirements.txt @@ -5,3 +5,5 @@ aiohttp numpy scipy colorlog +pandas +tabulate diff --git a/streamwise/streamwise.py b/streamwise/streamwise.py index 1c63eacf..7c0b4de6 100644 --- a/streamwise/streamwise.py +++ b/streamwise/streamwise.py @@ -34,6 +34,9 @@ import pod_manager import node_manager import job_manager +import allocator_bridge +from container_config import CONTAINER_RESOURCES +from container_config import MIG_CONTAINERS from service_manager import get_services from service_manager import get_service_timestamps @@ -219,7 +222,8 @@ async def index() -> QuartReturn: svc["load_balancer"] = await get_lb_pod(pod_name) app_svcs = [svc for svc in svcs if svc.get("container_name") in STREAMWISE_APPS] - wrapper_svcs = [svc for svc in svcs if svc.get("container_name") not in STREAMWISE_APPS] + _system_containers = set(STREAMWISE_APPS) | {"streamwise"} + wrapper_svcs = [svc for svc in svcs if svc.get("container_name") not in _system_containers] return await render_template( "index.html", @@ -545,6 +549,12 @@ async def add_pod(service_name: str) -> str: ) +@route("/auto_deploy", methods=["GET"]) +async def auto_deploy_page() -> str: + """Render the standalone auto-deploy page. (TODO: enable customization after auto-deploy plan is generated)""" + return await render_template("auto_deploy.html") + + @route("/api/pod/", methods=["DELETE"]) async def api_remove_pod(pod_name: str) -> QuartReturn: """API interface to remove a pod by name.""" @@ -557,6 +567,31 @@ async def api_remove_pod(pod_name: str) -> QuartReturn: k8s_cluster=k8s_cluster) +@route("/api/pods/wrappers", methods=["DELETE"]) +async def api_delete_all_wrappers() -> QuartReturn: + """Delete all wrapper pods (non-app, non-system pods) in the namespace.""" + svcs = await get_services(namespace=NAMESPACE, k8s_cluster=k8s_cluster) + # Exclude app pods and the streamwise management pod itself + excluded = set(STREAMWISE_APPS) | {"streamwise"} + wrapper_pods = [ + svc["pod_name"] for svc in svcs + if svc.get("container_name") not in excluded and svc.get("pod_name") + ] + deleted = 0 + errors: list[str] = [] + for pod_name in wrapper_pods: + try: + await pod_manager.remove_pod( + pod_name, namespace=NAMESPACE, k8s_cluster=k8s_cluster) + deleted += 1 + except Exception as e: + errors.append(f"{pod_name}: {e}") + result: dict[str, object] = {"deleted": deleted, "total": len(wrapper_pods)} + if errors: + result["errors"] = errors + return jsonify(result), HTTPStatus.OK + + @route("/api/services", methods=["GET"]) async def api_get_services() -> QuartReturn: """API interface to get the list of services.""" @@ -594,25 +629,29 @@ async def api_add_service( ) -> QuartReturn: """API interface to add pods for all services.""" try: - # CPU, memory GiB, ephemeral storage GiB, GPU count, GPU type - # Keep in sync with the helm values - container_dict: dict[str, tuple[int, int, int, Union[int, str]]] = { - "podcasttranscript": (1, 4, 16, 0), - "slidetranscript": (1, 4, 16, 0), - "gemma": (16, 192, 64, min(2, max_gpus)), - # "hunyuanframepackf1": (32, 192, 64, min(2, max_gpus)), - "hunyuanframepackf1": (24, 128, 64, min(2, max_gpus)), - "hunyuanframepackvae": (4, 32, 16, 1), - # "flux": (16, 192, 64, min(2, max_gpus)), - "flux": (12, 128, 64, min(2, max_gpus)), - "fluxkontext": (12, 128, 64, 1), - # "fantasytalking": (16, 256, 64, min(2, max_gpus)), - "fantasytalking": (12, 192, 64, min(2, max_gpus)), - "realesrgan": (4, 32, 16, "1g.10gb"), - "yolo": (4, 8, 16, "1g.10gb"), - "kokoro": (2, 8, 16, "1g.10gb"), - "whisper": (2, 8, 16, 1), - } + # Build container_dict from shared constants (CONTAINER_RESOURCES + MIG_CONTAINERS). + # Format: container_name -> (cpu, memory_gib, ephemeral_storage_gib, gpu_info) + # gpu_info is either an int (GPU count) or a MIG profile string. + container_dict: dict[str, tuple[int, int, int, Union[int, str]]] = {} + + # Services not in CONTAINER_RESOURCES (CPU-only or extra services) + container_dict["podcasttranscript"] = (1, 4, 16, 0) + container_dict["slidetranscript"] = (1, 4, 16, 0) + + for name, (cpu, mem, storage) in CONTAINER_RESOURCES.items(): + if name in MIG_CONTAINERS: + container_dict[name] = (cpu, mem, storage, MIG_CONTAINERS[name]) + else: + container_dict[name] = (cpu, mem, storage, min(2, max_gpus)) + + # Additional services not covered by CONTAINER_RESOURCES + container_dict["fluxkontext"] = (12, 128, 64, 1) + container_dict["whisper"] = (2, 8, 16, 1) + + # hunyuanframepackvae uses exactly 1 GPU (not scaled by max_gpus) + cpu, mem, storage = CONTAINER_RESOURCES["hunyuanframepackvae"] + container_dict["hunyuanframepackvae"] = (cpu, mem, storage, 1) + for container_name, (cpu, mem_gib, sotrage_gib, gpu_info) in container_dict.items(): num_gpus, mig_profile = parse_gpu_info(gpu_info) await pod_manager.add_pod( @@ -726,6 +765,247 @@ async def api_add_pod() -> QuartReturn: return jsonify({"error": str(ex)}), HTTPStatus.INTERNAL_SERVER_ERROR +@route("/api/auto_deploy", methods=["POST"]) +async def api_auto_deploy() -> QuartReturn: + """Run the model allocator to produce an optimized deployment plan. + + Expects JSON body: + { + "gpu_budget": {"A100": 8, "H100": 0, ...}, + "workflow": "streamcast" + } + + Returns the deployment plan with estimated metrics and per-container specs. + """ + try: + data = await request.get_json() + if not data: + return jsonify({"error": "Request body must be JSON"}), HTTPStatus.BAD_REQUEST + + gpu_budget = data.get("gpu_budget") + workflow_name = data.get("workflow") + + if not gpu_budget or not isinstance(gpu_budget, dict): + return jsonify({"error": "Missing or invalid 'gpu_budget' field"}), HTTPStatus.BAD_REQUEST + for gpu_type_name, count in gpu_budget.items(): + if isinstance(count, bool) or not isinstance(count, int) or count < 0: + return ( + jsonify( + { + "error": ( + "Invalid 'gpu_budget' field: each GPU type count must be a " + "non-negative integer" + ) + } + ), + HTTPStatus.BAD_REQUEST, + ) + if not workflow_name or not isinstance(workflow_name, str): + return jsonify({"error": "Missing or invalid 'workflow' field"}), HTTPStatus.BAD_REQUEST + + plan = await asyncio.to_thread( + allocator_bridge.run_allocator, + gpu_budget=gpu_budget, + workflow_name=workflow_name, + ) + result_json = allocator_bridge.deployment_plan_to_json(plan) + + # Enrich specs with friendly names from services.json and uppercase GPU types + for spec in result_json.get("specs", []): + spec["friendly_name"] = await get_friendly_container_name(spec["container_name"]) + if spec.get("gpu_type"): + spec["gpu_type"] = spec["gpu_type"].upper() + + return jsonify(result_json), HTTPStatus.OK + + except ValueError as ve: + return jsonify({"error": str(ve)}), HTTPStatus.BAD_REQUEST + except AssertionError as ae: + msg = str(ae) if str(ae) else ( + "GPU budget too small. Each GPU type must have at least 8 GPUs " + "(one full server). Use a single GPU type with 8+ GPUs, or " + "ensure each type has at least 8." + ) + return jsonify({"error": msg}), HTTPStatus.BAD_REQUEST + except Exception as ex: + logging.exception("Error in auto_deploy: %s", ex) + return jsonify({"error": str(ex)}), HTTPStatus.INTERNAL_SERVER_ERROR + + +@route("/api/auto_deploy/confirm", methods=["POST"]) +async def api_auto_deploy_confirm() -> QuartReturn: + """Execute a deployment plan produced by /api/auto_deploy. + + Expects JSON body: + { + "specs": [...], + "workflow": "streamcast" (optional: also deploys the application container) + } + + Deploys all model wrapper containers in the plan, plus the application + container if a workflow name is provided. + """ + try: + data = await request.get_json() + if not data: + return jsonify({"error": "Request body must be JSON"}), HTTPStatus.BAD_REQUEST + + specs = data.get("specs") + if not specs or not isinstance(specs, list): + return jsonify({"error": "Missing or invalid 'specs' field"}), HTTPStatus.BAD_REQUEST + + workflow = data.get("workflow") + + deployed: List[str] = [] + errors: List[str] = [] + + for spec in specs: + container_name = spec.get("container_name") + if not container_name: + errors.append("Spec missing 'container_name'") + continue + + try: + add_pod_result = await pod_manager.add_pod( + container_name=container_name, + cpu=int(spec.get("cpu", 4)), + memory_gib=int(spec.get("memory_gib", 16)), + ephemeral_storage_gib=int(spec.get("ephemeral_storage_gib", 16)), + gpu=int(spec.get("gpu", 0)), + gpu_type=spec.get("gpu_type"), + mig_profile=spec.get("mig_profile"), + namespace=NAMESPACE, + k8s_cluster=k8s_cluster, + ) + + status_code = HTTPStatus.OK + if isinstance(add_pod_result, tuple) and len(add_pod_result) >= 2: + status_value = add_pod_result[1] + if isinstance(status_value, HTTPStatus): + status_code = status_value + elif isinstance(status_value, int): + status_code = HTTPStatus(status_value) + + if status_code >= HTTPStatus.BAD_REQUEST: + msg = f"Failed to deploy '{container_name}' (status={int(status_code)})" + logging.error(msg) + errors.append(msg) + else: + deployed.append(container_name) + except Exception as pod_ex: + msg = f"Failed to deploy '{container_name}': {pod_ex}" + logging.error(msg) + errors.append(msg) + + # Also deploy the application container if workflow is specified + if workflow and workflow in STREAMWISE_APPS: + try: + add_pod_result = await pod_manager.add_pod( + container_name=workflow, + cpu=4, + memory_gib=16, + ephemeral_storage_gib=16, + gpu=0, + gpu_type=None, + mig_profile=None, + namespace=NAMESPACE, + k8s_cluster=k8s_cluster, + ) + status_code = HTTPStatus.OK + if isinstance(add_pod_result, tuple) and len(add_pod_result) >= 2: + status_value = add_pod_result[1] + if isinstance(status_value, HTTPStatus): + status_code = status_value + elif isinstance(status_value, int): + status_code = HTTPStatus(status_value) + if status_code >= HTTPStatus.BAD_REQUEST: + msg = f"Failed to deploy app '{workflow}' (status={int(status_code)})" + logging.error(msg) + errors.append(msg) + else: + deployed.append(workflow) + except Exception as app_ex: + msg = f"Failed to deploy app '{workflow}': {app_ex}" + logging.error(msg) + errors.append(msg) + + total_deployed = len(deployed) + total_specs = len(specs) + (1 if workflow and workflow in STREAMWISE_APPS else 0) + status = HTTPStatus.OK if not errors else HTTPStatus.MULTI_STATUS + return jsonify({ + "deployed": deployed, + "errors": errors, + "message": f"Deployed {total_deployed}/{total_specs} containers.", + }), status + + except Exception as ex: + logging.exception("Error in auto_deploy/confirm: %s", ex) + return jsonify({"error": str(ex)}), HTTPStatus.INTERNAL_SERVER_ERROR + + +@route("/api/auto_deploy/workflows", methods=["GET"]) +async def api_auto_deploy_workflows() -> QuartReturn: + """Return available workflows and GPU types for the auto-deploy UI.""" + return jsonify({ + "workflows": allocator_bridge.get_available_workflows(), + "gpu_types": allocator_bridge.get_available_gpu_types(), + }), HTTPStatus.OK + + +@route("/api/auto_deploy/cluster_gpus", methods=["GET"]) +async def api_auto_deploy_cluster_gpus() -> QuartReturn: + """Return aggregated GPU counts by type from the current cluster. + + Inspects all ready nodes and sums up allocatable GPUs grouped by the + nvidia.com/gpu.product label (mapped to canonical names like A100, H100, etc.). + """ + try: + nodes = await get_k8s_nodes(k8s_cluster) + gpu_counts: dict[str, int] = {} + for node in nodes: + if not node.get("is_ready"): + continue + gpu_model = node.get("gpu_model", "N/A") + if gpu_model == "N/A": + continue + gpu_count = node.get("allocatable_resources", {}).get("gpu", 0) + if isinstance(gpu_count, str): + try: + gpu_count = int(gpu_count) + except ValueError: + continue + if gpu_count <= 0: + continue + # Map gpu_model label to canonical type name + canonical = _gpu_label_to_canonical(gpu_model) + gpu_counts[canonical] = gpu_counts.get(canonical, 0) + gpu_count + return jsonify({"gpu_budget": gpu_counts}), HTTPStatus.OK + except Exception as ex: + logging.exception("Error in cluster_gpus: %s", ex) + return jsonify({"error": str(ex)}), HTTPStatus.INTERNAL_SERVER_ERROR + + +def _gpu_label_to_canonical(gpu_model: str) -> str: + """Map a GPU product label to a canonical type name for the allocator.""" + model_upper = gpu_model.upper() + if "H100" in model_upper: + return "H100" + elif "H200" in model_upper: + return "H200" + elif "A100" in model_upper: + return "A100" + elif "GB200" in model_upper: + return "GB200" + elif "GB300" in model_upper: + return "GB300" + elif "V100" in model_upper: + return "V100" + elif "A10" in model_upper: + return "A10" + # Fallback: return as-is + return gpu_model + + @route("/api/node/", methods=["DELETE"]) async def api_remove_node(node_name: str) -> QuartReturn: return await node_manager.remove_node( diff --git a/streamwise/templates/auto_deploy.html b/streamwise/templates/auto_deploy.html new file mode 100644 index 00000000..b477d3ef --- /dev/null +++ b/streamwise/templates/auto_deploy.html @@ -0,0 +1,303 @@ + + + + + Auto Deploy - StreamWise + + + + + + + +
+ 🏠 +
+ +
+

πŸ€– Auto Deploy

+

+ Specify your GPU budget and workflow. The optimizer will determine the best model allocation + and deploy all model wrappers plus the application container. +

+ +
+
+ + πŸ’° GPU Budget + +
+ +
+ +
+ + 🎬 Workflow + +
+ + +
+
+ +
+ +
+
+ + + + +
+ + + + + + diff --git a/streamwise/templates/index.html b/streamwise/templates/index.html index 7fc18a6c..c9913f55 100644 --- a/streamwise/templates/index.html +++ b/streamwise/templates/index.html @@ -244,6 +244,18 @@

πŸ“„πŸ“½οΈ StreamWise Cluster Manager πŸ”‰πŸŽ¬

{% endmacro %} +

πŸ€– Auto Deploy

+
+ + πŸ€– Auto Deploy + + Optimize and deploy all services automatically +
+

🎯 Applications

🌐 Wrappers Add Wrapper {% endif %} + {% if wrapper_svcs and wrapper_svcs | length > 0 %} + + {% endif %}
{% if wrapper_svcs %} {{ svc_table(wrapper_svcs, 'rtgen-table') }} @@ -574,5 +595,28 @@

πŸ«› Pods

}); }); + diff --git a/tests/streamwise/test_allocator_bridge.py b/tests/streamwise/test_allocator_bridge.py new file mode 100644 index 00000000..53364282 --- /dev/null +++ b/tests/streamwise/test_allocator_bridge.py @@ -0,0 +1,287 @@ +""" +Tests for streamwise/allocator_bridge.py. + +Covers: +- Model-to-container name mapping. +- Result to deployment specs conversion. +- run_allocator end-to-end (with real latency data). +- Error handling for invalid inputs. +""" + +from __future__ import annotations + +import sys +import os + +import pytest + +# Add current path and simulator/ permanently so lazy imports +# (e.g. GreedyAllocator via auto_model_allocator) resolve at test time. +sys.path.append(os.getcwd()) +sys.path[:0] = [os.path.join(os.getcwd(), "simulator")] + +from tests.test_utils import temp_sys_path + +with temp_sys_path("streamwise", "simulator"): + from allocator_bridge import ( + MODEL_TO_CONTAINERS, + CONTAINER_RESOURCES, + GPU_TYPE_TO_POD_STR, + APP_TO_WORKFLOW, + DeploymentSpec, + DeploymentPlan, + get_available_workflows, + get_available_gpu_types, + result_to_deployment_specs, + deployment_plan_to_json, + run_allocator, + ) + from sim_types import GPUType, Model, Result + from models import ( + GemmaModelAllocation, + FluxModelAllocation, + HFModelAllocation, + HFVAEModelAllocation, + FTModelAllocation, + OthersModelAllocation, + UpscalerModelAllocation, + ) + + +# --------------------------------------------------------------------------- +# Mapping correctness +# --------------------------------------------------------------------------- + +def test_model_to_containers_covers_all_models() -> None: + """Every Model enum value must have a mapping entry.""" + for model in Model: + assert model in MODEL_TO_CONTAINERS, f"Missing mapping for {model}" + + +def test_container_resources_covers_all_mapped_containers() -> None: + """Every container referenced in MODEL_TO_CONTAINERS must have resource defaults.""" + for model, containers in MODEL_TO_CONTAINERS.items(): + for container in containers: + assert container in CONTAINER_RESOURCES, ( + f"Missing CONTAINER_RESOURCES for '{container}' (from {model})") + + +def test_gpu_type_to_pod_str_covers_all_gpu_types() -> None: + """Every GPUType enum value must have a pod string mapping.""" + for gpu_type in GPUType: + assert gpu_type in GPU_TYPE_TO_POD_STR + + +def test_app_to_workflow_has_expected_entries() -> None: + """Key StreamWise apps should map to workflows.""" + assert "streamcast" in APP_TO_WORKFLOW + assert "streampersona" in APP_TO_WORKFLOW + assert "streamchat" in APP_TO_WORKFLOW + + +# --------------------------------------------------------------------------- +# Utility functions +# --------------------------------------------------------------------------- + +def test_get_available_workflows() -> None: + workflows = get_available_workflows() + assert isinstance(workflows, list) + assert "streamcast" in workflows + assert len(workflows) >= 5 + + +def test_get_available_gpu_types() -> None: + gpu_types = get_available_gpu_types() + assert isinstance(gpu_types, list) + assert "A100" in gpu_types + assert "H100" in gpu_types + + +# --------------------------------------------------------------------------- +# result_to_deployment_specs +# --------------------------------------------------------------------------- + +def test_result_to_deployment_specs_basic() -> None: + """A simple result with one active allocation maps to the right container.""" + models = { + GPUType.A100: { + Model.GEMMA: [GemmaModelAllocation(gpu_type=GPUType.A100, devices=1, replicas=1)], + Model.FLUX: [FluxModelAllocation(gpu_type=GPUType.A100, devices=2, replicas=1)], + Model.HF: [HFModelAllocation(gpu_type=GPUType.A100, devices=2, replicas=2)], + Model.HF_VAE: [HFVAEModelAllocation(gpu_type=GPUType.A100, devices=1, replicas=1)], + Model.FT: [FTModelAllocation(gpu_type=GPUType.A100, devices=1, replicas=0)], + Model.FT_VAE: [], + Model.UPSCALER: [UpscalerModelAllocation(gpu_type=GPUType.A100, devices=1, replicas=0)], + Model.OTHERS: [OthersModelAllocation(gpu_type=GPUType.A100, devices=1, replicas=1)], + } + } + result = Result( + total_time_s=100.0, + ttff_s=10.0, + cost=1.0, + gpus_used={GPUType.A100: 8}, + gpus_total={GPUType.A100: 8}, + models=models, + ) + + specs = result_to_deployment_specs(result) + assert isinstance(specs, list) + assert len(specs) > 0 + + container_names = [s.container_name for s in specs] + assert "gemma" in container_names + assert "flux" in container_names + assert "hunyuanframepackf1" in container_names # HF model + assert "hunyuanframepackvae" in container_names # HF_VAE model + + # OTHERS maps to kokoro + yolo + assert "kokoro" in container_names + assert "yolo" in container_names + + # Check GPU type mapping + gemma_spec = next(s for s in specs if s.container_name == "gemma") + assert gemma_spec.gpu_type == "a100" + assert gemma_spec.gpu == 1 + + # Without MIG, kokoro gets no mig_profile (full GPU) + kokoro_spec = next(s for s in specs if s.container_name == "kokoro") + assert kokoro_spec.mig_profile is None + assert kokoro_spec.gpu == 1 + + # With disaggregation=True for HF, VAE runs on its own GPU + vae_spec = next(s for s in specs if s.container_name == "hunyuanframepackvae") + assert vae_spec.gpu == 1 + + +def test_result_to_deployment_specs_skips_zero_replicas() -> None: + """Allocations with zero replicas should not produce deployment specs.""" + models = { + GPUType.A100: { + Model.GEMMA: [GemmaModelAllocation(gpu_type=GPUType.A100, devices=1, replicas=0)], + Model.FLUX: [FluxModelAllocation(gpu_type=GPUType.A100, devices=1, replicas=0)], + Model.HF: [HFModelAllocation(gpu_type=GPUType.A100, devices=1, replicas=0)], + Model.HF_VAE: [], + Model.FT: [], + Model.FT_VAE: [], + Model.UPSCALER: [], + Model.OTHERS: [], + } + } + result = Result( + total_time_s=0.0, + ttff_s=0.0, + cost=0.0, + gpus_used={GPUType.A100: 0}, + gpus_total={GPUType.A100: 8}, + models=models, + ) + specs = result_to_deployment_specs(result) + assert specs == [] + + +def test_result_to_deployment_specs_multiple_replicas() -> None: + """Multiple replicas should produce multiple deployment specs for same container.""" + models = { + GPUType.H100: { + Model.GEMMA: [GemmaModelAllocation(gpu_type=GPUType.H100, devices=1, replicas=1)], + Model.FLUX: [FluxModelAllocation(gpu_type=GPUType.H100, devices=1, replicas=1)], + Model.HF: [HFModelAllocation(gpu_type=GPUType.H100, devices=2, replicas=3)], + Model.HF_VAE: [], + Model.FT: [], + Model.FT_VAE: [], + Model.UPSCALER: [], + Model.OTHERS: [], + } + } + result = Result( + total_time_s=50.0, + ttff_s=5.0, + cost=0.5, + gpus_used={GPUType.H100: 8}, + gpus_total={GPUType.H100: 16}, + models=models, + ) + specs = result_to_deployment_specs(result) + hf_specs = [s for s in specs if s.container_name == "hunyuanframepackf1"] + assert len(hf_specs) == 3 # 3 replicas + for spec in hf_specs: + assert spec.gpu == 2 + assert spec.gpu_type == "h100" + + +# --------------------------------------------------------------------------- +# deployment_plan_to_json +# --------------------------------------------------------------------------- + +def test_deployment_plan_to_json() -> None: + """Serialization should produce all expected keys.""" + result = Result( + total_time_s=100.0, + ttff_s=10.0, + cost=1.5, + gpus_used={GPUType.A100: 8}, + gpus_total={GPUType.A100: 8}, + models={}, + ) + plan = DeploymentPlan( + specs=[ + DeploymentSpec( + container_name="gemma", cpu=16, memory_gib=192, + ephemeral_storage_gib=64, gpu=2, gpu_type="a100", mig_profile=None) + ], + result=result, + workflow_name="streamcast", + gpu_budget={"A100": 8}, + ) + data = deployment_plan_to_json(plan) + assert data["workflow_name"] == "streamcast" + assert data["gpu_budget"] == {"A100": 8} + assert data["metrics"]["total_time_s"] == 100.0 + assert data["metrics"]["ttff_s"] == 10.0 + assert len(data["specs"]) == 1 + assert data["specs"][0]["container_name"] == "gemma" + + +# --------------------------------------------------------------------------- +# run_allocator (integration with real data) +# --------------------------------------------------------------------------- + +def test_run_allocator_streamcast_8_a100() -> None: + """Run allocator for StreamCast with 8 A100s β€” should produce a valid plan.""" + plan = run_allocator( + gpu_budget={"A100": 8}, + workflow_name="streamcast", + ) + assert isinstance(plan, DeploymentPlan) + assert len(plan.specs) > 0 + assert plan.result.total_time_s > 0 + assert plan.result.ttff_s > 0 + assert plan.workflow_name == "streamcast" + + +def test_run_allocator_streamchat_8_h100() -> None: + """Run allocator for StreamChat with 8 H100s.""" + plan = run_allocator( + gpu_budget={"H100": 8}, + workflow_name="streamchat", + ) + assert isinstance(plan, DeploymentPlan) + assert len(plan.specs) > 0 + + +def test_run_allocator_invalid_workflow() -> None: + """Unknown workflow name raises ValueError.""" + with pytest.raises(ValueError, match="Unknown workflow"): + run_allocator(gpu_budget={"A100": 8}, workflow_name="nonexistent") + + +def test_run_allocator_invalid_gpu_type() -> None: + """Unknown GPU type raises ValueError.""" + with pytest.raises(ValueError, match="Unknown GPU type"): + run_allocator(gpu_budget={"RTX4090": 8}, workflow_name="streamcast") + + +def test_run_allocator_insufficient_gpus() -> None: + """Too few GPUs raises ValueError.""" + with pytest.raises(ValueError, match="at least 8"): + run_allocator(gpu_budget={"A100": 4}, workflow_name="streamcast") diff --git a/tests/streamwise/test_streamwise.py b/tests/streamwise/test_streamwise.py index 6ec7d09c..41df2012 100644 --- a/tests/streamwise/test_streamwise.py +++ b/tests/streamwise/test_streamwise.py @@ -730,3 +730,76 @@ def test_set_verify_ssl_true() -> None: assert http_session_manager.VERIFY_SSL is True finally: http_session_manager.set_verify_ssl(original) + + +@pytest.mark.asyncio +async def test_api_cluster_gpus_aggregates_by_type() -> None: + """The cluster_gpus endpoint aggregates GPU counts by canonical type name.""" + mock_nodes = [ + { + "node_name": "h100-node-0", + "is_ready": True, + "gpu_model": "NVIDIA-H100-80GB-HBM3", + "allocatable_resources": {"gpu": "8"}, + }, + { + "node_name": "h100-node-1", + "is_ready": True, + "gpu_model": "NVIDIA-H100-80GB-HBM3", + "allocatable_resources": {"gpu": "8"}, + }, + { + "node_name": "a100-node-0", + "is_ready": True, + "gpu_model": "NVIDIA A100-SXM4-80GB", + "allocatable_resources": {"gpu": "8"}, + }, + { + "node_name": "cpu-node", + "is_ready": True, + "gpu_model": "N/A", + "allocatable_resources": {"gpu": "0"}, + }, + ] + client = _get_client() + with patch("streamwise.streamwise.get_k8s_nodes", new=AsyncMock(return_value=mock_nodes)): + response = await client.get("/api/auto_deploy/cluster_gpus") + assert response.status_code == HTTPStatus.OK + data = await response.get_json() + assert data["gpu_budget"] == {"H100": 16, "A100": 8} + + +@pytest.mark.asyncio +async def test_api_cluster_gpus_skips_not_ready_nodes() -> None: + """The cluster_gpus endpoint skips nodes that are not ready.""" + mock_nodes = [ + { + "node_name": "h100-node-0", + "is_ready": True, + "gpu_model": "NVIDIA-H100-80GB-HBM3", + "allocatable_resources": {"gpu": "8"}, + }, + { + "node_name": "h100-node-1", + "is_ready": False, + "gpu_model": "NVIDIA-H100-80GB-HBM3", + "allocatable_resources": {"gpu": "8"}, + }, + ] + client = _get_client() + with patch("streamwise.streamwise.get_k8s_nodes", new=AsyncMock(return_value=mock_nodes)): + response = await client.get("/api/auto_deploy/cluster_gpus") + assert response.status_code == HTTPStatus.OK + data = await response.get_json() + assert data["gpu_budget"] == {"H100": 8} + + +@pytest.mark.asyncio +async def test_api_cluster_gpus_empty_cluster() -> None: + """The cluster_gpus endpoint returns empty budget for a cluster with no GPU nodes.""" + client = _get_client() + with patch("streamwise.streamwise.get_k8s_nodes", new=AsyncMock(return_value=[])): + response = await client.get("/api/auto_deploy/cluster_gpus") + assert response.status_code == HTTPStatus.OK + data = await response.get_json() + assert data["gpu_budget"] == {} diff --git a/tests/streamwise/test_streamwise_auto_deploy.py b/tests/streamwise/test_streamwise_auto_deploy.py new file mode 100644 index 00000000..7c6b630e --- /dev/null +++ b/tests/streamwise/test_streamwise_auto_deploy.py @@ -0,0 +1,254 @@ +""" +Tests for the auto-deploy API endpoints in streamwise.py. + +Covers: +- POST /api/auto_deploy β€” returns optimized plan. +- POST /api/auto_deploy/confirm β€” deploys the plan. +- GET /api/auto_deploy/workflows β€” lists available options. +- Error cases (missing fields, invalid inputs). +""" + +from __future__ import annotations + +import sys + +import pytest + +from http import HTTPStatus +from unittest.mock import patch + +from tests.test_utils import temp_sys_path +from tests.k8s_mock import K8sMock + +mock_k8s = K8sMock() + +mock_modules = {} +mock_modules.update(mock_k8s.get_sub_modules()) + +import streamwise.http_session_manager # noqa: F401 β€” registers the streamwise package + +# Permanently inject K8s mocks into sys.modules (not via context manager) +# so that simulator modules loaded alongside streamwise remain importable +# after setup completes. +_original_modules = {} +for mod_name, mock_mod in mock_modules.items(): + _original_modules[mod_name] = sys.modules.get(mod_name) + sys.modules[mod_name] = mock_mod + +with temp_sys_path("streamwise"): + from streamwise import streamwise as sw + + +def _get_client(): # type: ignore[no-untyped-def] + app = sw.app + return app.test_client() + + +@pytest.fixture(scope="function", autouse=True) +def setup_k8s_cluster() -> None: + sw.k8s_cluster = "unittest" + sw.use_https = False + + +# --------------------------------------------------------------------------- +# GET /api/auto_deploy/workflows +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_auto_deploy_workflows() -> None: + """Should return available workflows and GPU types.""" + client = _get_client() + response = await client.get("/api/auto_deploy/workflows") + assert response.status_code == HTTPStatus.OK + data = await response.get_json() + assert "workflows" in data + assert "gpu_types" in data + assert "streamcast" in data["workflows"] + assert "A100" in data["gpu_types"] + + +# --------------------------------------------------------------------------- +# POST /api/auto_deploy +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_auto_deploy_success() -> None: + """Valid request returns an optimized deployment plan.""" + fake_json = { + "workflow_name": "streamcast", + "gpu_budget": {"A100": 8}, + "metrics": {"total_time_s": 3.5, "ttff_s": 1.0, "cost": 12.0, "gpus_used": {"A100": 3}}, + "specs": [ + {"container_name": "gemma", "cpu": 4, "memory_gib": 16, + "ephemeral_storage_gib": 10, "gpu": 1, "gpu_type": "A100", "mig_profile": None}, + {"container_name": "flux", "cpu": 4, "memory_gib": 16, + "ephemeral_storage_gib": 10, "gpu": 2, "gpu_type": "A100", "mig_profile": None}, + ], + } + # Patch on the actual module object that streamwise.py holds a reference to. + with patch.object(sw.allocator_bridge, "run_allocator") as mock_alloc, \ + patch.object(sw.allocator_bridge, "deployment_plan_to_json", return_value=fake_json): + mock_alloc.return_value = "fake_plan" + client = _get_client() + response = await client.post( + "/api/auto_deploy", + json={ + "gpu_budget": {"A100": 8}, + "workflow": "streamcast", + }, + ) + assert response.status_code == HTTPStatus.OK + data = await response.get_json() + assert "specs" in data + assert "metrics" in data + assert len(data["specs"]) == 2 + assert data["metrics"]["total_time_s"] == 3.5 + + +@pytest.mark.asyncio +async def test_auto_deploy_missing_gpu_budget() -> None: + """Missing gpu_budget field returns 400.""" + client = _get_client() + response = await client.post( + "/api/auto_deploy", + json={"workflow": "streamcast"}, + ) + assert response.status_code == HTTPStatus.BAD_REQUEST + + +@pytest.mark.asyncio +async def test_auto_deploy_missing_workflow() -> None: + """Missing workflow field returns 400.""" + client = _get_client() + response = await client.post( + "/api/auto_deploy", + json={"gpu_budget": {"A100": 8}}, + ) + assert response.status_code == HTTPStatus.BAD_REQUEST + + +@pytest.mark.asyncio +async def test_auto_deploy_invalid_workflow() -> None: + """Invalid workflow name returns 400.""" + client = _get_client() + response = await client.post( + "/api/auto_deploy", + json={ + "gpu_budget": {"A100": 8}, + "workflow": "nonexistent", + }, + ) + assert response.status_code == HTTPStatus.BAD_REQUEST + data = await response.get_json() + assert "error" in data + + +@pytest.mark.asyncio +async def test_auto_deploy_insufficient_gpus() -> None: + """Too few GPUs returns 400.""" + client = _get_client() + response = await client.post( + "/api/auto_deploy", + json={ + "gpu_budget": {"A100": 2}, + "workflow": "streamcast", + }, + ) + assert response.status_code == HTTPStatus.BAD_REQUEST + + +@pytest.mark.asyncio +async def test_auto_deploy_no_json_body() -> None: + """No JSON body returns 400.""" + client = _get_client() + response = await client.post("/api/auto_deploy") + assert response.status_code == HTTPStatus.BAD_REQUEST + + +# --------------------------------------------------------------------------- +# POST /api/auto_deploy/confirm +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_auto_deploy_confirm_success() -> None: + """Valid confirm request deploys containers.""" + client = _get_client() + specs = [ + { + "container_name": "gemma", + "cpu": 16, + "memory_gib": 192, + "ephemeral_storage_gib": 64, + "gpu": 2, + "gpu_type": "a100", + "mig_profile": None, + }, + { + "container_name": "flux", + "cpu": 12, + "memory_gib": 128, + "ephemeral_storage_gib": 64, + "gpu": 2, + "gpu_type": "a100", + "mig_profile": None, + }, + ] + with patch.object(sw.pod_manager, "add_pod") as mock_add_pod: + response = await client.post( + "/api/auto_deploy/confirm", + json={"specs": specs}, + ) + # Should succeed without invoking the real pod_manager.add_pod flow + assert response.status_code in (HTTPStatus.OK, HTTPStatus.MULTI_STATUS) + data = await response.get_json() + assert "deployed" in data + assert "message" in data + assert mock_add_pod.call_count == len(specs) + + +@pytest.mark.asyncio +async def test_auto_deploy_confirm_missing_specs() -> None: + """Missing specs returns 400.""" + client = _get_client() + response = await client.post( + "/api/auto_deploy/confirm", + json={}, + ) + assert response.status_code == HTTPStatus.BAD_REQUEST + + +@pytest.mark.asyncio +async def test_auto_deploy_confirm_tracks_add_pod_status_failures() -> None: + """Non-2xx add_pod return statuses are surfaced as deployment errors.""" + client = _get_client() + specs = [ + {"container_name": "gemma", "gpu": 2, "gpu_type": "a100"}, + {"container_name": "flux", "gpu": 2, "gpu_type": "a100"}, + ] + with patch.object( + sw.pod_manager, + "add_pod", + side_effect=[ + (None, HTTPStatus.OK), + (None, HTTPStatus.BAD_REQUEST), + ], + ): + response = await client.post("/api/auto_deploy/confirm", json={"specs": specs}) + + assert response.status_code == HTTPStatus.MULTI_STATUS + data = await response.get_json() + assert data["deployed"] == ["gemma"] + assert len(data["errors"]) == 1 + assert "flux" in data["errors"][0] + assert "status=400" in data["errors"][0] + + +@pytest.mark.asyncio +async def test_auto_deploy_confirm_empty_specs() -> None: + """Empty specs list returns 400.""" + client = _get_client() + response = await client.post( + "/api/auto_deploy/confirm", + json={"specs": []}, + ) + assert response.status_code == HTTPStatus.BAD_REQUEST