Add auto-deploy feature to StreamWise dashboard #313
Open: James-QiuHaoran wants to merge 41 commits into main from hqiu/auto-deploy
41 commits
e971662
Refactor: move model provisioning policies to streamwise/model_provis…
3c324f7
Fix model_provisioner __init__.py to support Docker layout
8f9bc0b
Add auto-deploy feature to StreamWise dashboard
124d7cb
Fix: ensure model_provisioner is importable regardless of working dir…
38997c3
Fix: include model_provisioner and simulator in Docker image
c8f5f2a
Add pandas and tabulate to StreamWise requirements
fc1abc9
Add fallback for GPUs that do not support MIG
James-QiuHaoran 367760b
Move run_allocator to async call
James-QiuHaoran 81ec0a9
Mock k8s API
James-QiuHaoran 32461f8
Add GPU count check
James-QiuHaoran 28e322e
Restore cwd-relative default for simulator data loading
Copilot 0bd57d9
Fix auto-deploy confirm handling for add_pod error statuses
Copilot efcbf80
Merge branch 'hqiu/refactor-model-provisioner' into hqiu/auto-deploy
goiri bc173a2
Fix the data path
1cd5940
Validate budget
d01ee1c
Better error message
0c3f95e
Merge branch 'main' into hqiu/auto-deploy
goiri 14c2aee
Auto-populate GPU budget from cluster state in auto-deploy UI
7e33454
Document GPU Spot node setup: toleration patch and node labels
3a16c29
Fix: default GPU budget to 0, let cluster state populate values
2075db8
Fix allocator budget: OTHERS=2 without MIG, co-locate HF_VAE
b239abc
Update test to reflect MIG_AVAILABLE=False behavior
4713651
Fix per-type GPU budget overflow by trimming excess replicas
485f579
Add skills on demo end-to-end
9d1637b
Fixed lint and comments
8886baf
Fixed data path
cb82b02
Move auto-deploy to standalone page and deploy app container
1ee2801
Fix data dir path: use _HERE instead of _REPO_ROOT for container compat
5d38cf8
Fix allocator: round budget up to server multiples before calling all…
6ce6736
Add 'Delete All' button for wrapper pods on main page
eb3b992
Redesign GPU budget UI: dynamic rows from cluster state
9452ff3
Exclude streamwise and app pods from 'Delete All Wrappers'
b514fbc
Format GPUs Needed as '30 H100s' instead of JSON
d2ae170
Use services.json friendly names and uppercase GPU types in auto-depl…
35af577
Hide co-located containers (gpu=0) from auto-deploy results table
e64f091
Fix: allocate GPU for VAE when HF disaggregation is enabled
9de7648
Replace GPU count text box with range slider (8-100)
19b539d
Left TODOs
03d7d7b
Remove Skills and README changes
9e45542
Fix tests
df2b243
Update auto deploy webpage icon to robot
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,368 @@ | ||
| """ | ||
| Bridge between the model provisioner's allocator output and StreamWise pod deployment. | ||
|
|
||
| Translates ModelAllocation results (abstract Model enum + GPU counts) into concrete | ||
| container deployment parameters compatible with pod_manager.add_pod(). | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import sys | ||
| import os | ||
|
|
||
| # Ensure the directory containing this file is on sys.path so model_provisioner is importable. | ||
| _HERE = os.path.dirname(os.path.abspath(__file__)) | ||
| if _HERE not in sys.path: | ||
| sys.path.insert(0, _HERE) | ||
| _REPO_ROOT = os.path.dirname(_HERE) | ||
| if _REPO_ROOT not in sys.path: | ||
| sys.path.insert(0, _REPO_ROOT) | ||
| import model_provisioner # noqa: E402, F401 — adds simulator/ to sys.path | ||
|
|
||
| from dataclasses import dataclass | ||
| from typing import Optional | ||
|
|
||
| from sim_types import GPUType | ||
| from sim_types import Model | ||
| from sim_types import Result | ||
|
|
||
| from auto_model_allocator import AutoModelAllocator | ||
| from container_config import COLOCATED_CONTAINERS | ||
| from container_config import CONTAINER_RESOURCES | ||
| from container_config import GPU_TYPE_TO_POD_STR | ||
| from container_config import MIG_AVAILABLE | ||
| from container_config import MIG_CAPABLE_GPU_TYPES | ||
| from container_config import MIG_CONTAINERS | ||
| from data_loading import load_latency_data | ||
| from model_provisioner.policies import STREAMWISE_POLICY | ||
| from streamwise_apps import STREAMWISE_APPS | ||
| from workflows import WORKFLOWS | ||
|
|
||
|
|
||
| # Mapping from simulator Model enum to concrete container names used by pod_manager. | ||
| # Some Model entries map to multiple containers (e.g., OTHERS -> kokoro + yolo). | ||
| MODEL_TO_CONTAINERS: dict[Model, list[str]] = { | ||
| Model.GEMMA: ["gemma"], | ||
| Model.FLUX: ["flux"], | ||
| Model.HF: ["hunyuanframepackf1"], | ||
| Model.HF_VAE: ["hunyuanframepackvae"], | ||
| Model.FT: ["fantasytalking"], | ||
| Model.FT_VAE: [], # FT_VAE is handled within fantasytalking container | ||
| Model.UPSCALER: ["realesrgan"], | ||
| Model.OTHERS: ["kokoro", "yolo"], | ||
| } | ||
|
|
||
|
|
||
| def get_mig_profile(container_name: str, gpu_type: GPUType) -> Optional[str]: | ||
| """Return a MIG profile only when MIG is available and the GPU type supports it.""" | ||
| if not MIG_AVAILABLE: | ||
| return None | ||
| if gpu_type not in MIG_CAPABLE_GPU_TYPES: | ||
| return None | ||
| return MIG_CONTAINERS.get(container_name) | ||
|
|
||
|
|
||
| # Mapping from StreamWise app name to simulator workflow key | ||
| APP_TO_WORKFLOW: dict[str, str] = { | ||
| "streamcast": "podcast", | ||
| "streampersona": "slide", | ||
| "streamchat": "chat", | ||
| "streamshort": "short", | ||
| "streammovie": "movie", | ||
| "streamanimate": "story", | ||
| "streamlecture": "lecture", | ||
| "streamdub": "dubbing", | ||
| "streamedit": "editing", | ||
| } | ||
|
|
||
| # Ensure allocator knows about all StreamWise apps (catch drift early). | ||
| assert set(APP_TO_WORKFLOW.keys()) == set(STREAMWISE_APPS), ( | ||
| f"APP_TO_WORKFLOW keys {set(APP_TO_WORKFLOW.keys())} != STREAMWISE_APPS {set(STREAMWISE_APPS)}" | ||
| ) | ||
|
|
||
|
|
||
| @dataclass | ||
| class DeploymentSpec: | ||
| """A single container deployment specification.""" | ||
| container_name: str | ||
| cpu: int | ||
| memory_gib: int | ||
| ephemeral_storage_gib: int | ||
| gpu: int | ||
| gpu_type: Optional[str] | ||
| mig_profile: Optional[str] | ||
|
|
||
|
|
||
| @dataclass | ||
| class DeploymentPlan: | ||
| """Complete deployment plan produced by the auto-allocator.""" | ||
| specs: list[DeploymentSpec] | ||
| result: Result | ||
| workflow_name: str | ||
| gpu_budget: dict[str, int] | ||
|
|
||
|
|
||
| def _get_data_dir() -> str: | ||
| """Get the path to the simulator data directory.""" | ||
| default_path = os.path.join(_REPO_ROOT, "simulator", "data") | ||
| return os.getenv("SIMULATOR_DATA_DIR", default_path) | ||
|
|
||
|
|
||
| # Reverse mapping from pod gpu_type string to GPUType enum | ||
| _POD_STR_TO_GPU_TYPE: dict[str, GPUType] = {v: k for k, v in GPU_TYPE_TO_POD_STR.items()} | ||
|
|
||
|
|
||
| def _calc_actual_gpus_per_type(specs: list['DeploymentSpec']) -> dict[GPUType, int]: | ||
| """Calculate actual GPUs needed per GPUType from deployment specs.""" | ||
| result: dict[GPUType, int] = {} | ||
| for spec in specs: | ||
| if spec.mig_profile: | ||
| continue | ||
| gpu_type = _POD_STR_TO_GPU_TYPE.get(spec.gpu_type or "") | ||
| if gpu_type is not None: | ||
| result[gpu_type] = result.get(gpu_type, 0) + spec.gpu | ||
| return result | ||
|
|
||
|
|
||
| def _trim_specs_for_type( | ||
| specs: list['DeploymentSpec'], gpu_type_str: str, excess: int | ||
| ) -> list['DeploymentSpec']: | ||
| """ | ||
| Remove replicas from specs to reduce GPU usage on a specific type by `excess` GPUs. | ||
|
|
||
| Prefers removing replicas of the most-replicated scalable container (typically | ||
| realesrgan/upscaler) to minimize impact on pipeline throughput. | ||
| """ | ||
| # Count replicas per container on this GPU type (only scalable ones) | ||
| from collections import Counter | ||
| type_counts: Counter[str] = Counter() | ||
| for spec in specs: | ||
| if spec.gpu_type == gpu_type_str and spec.gpu > 0 and spec.container_name not in COLOCATED_CONTAINERS: | ||
| type_counts[spec.container_name] += 1 | ||
|
|
||
| # Prefer trimming containers with most replicas (least impact per removal) | ||
| trimmed = 0 | ||
| result_specs = list(specs) | ||
| for container_name, _count in type_counts.most_common(): | ||
| if trimmed >= excess: | ||
| break | ||
| # Remove replicas from the end of the list | ||
| for i in range(len(result_specs) - 1, -1, -1): | ||
| if trimmed >= excess: | ||
| break | ||
| spec = result_specs[i] | ||
| if (spec.container_name == container_name | ||
| and spec.gpu_type == gpu_type_str | ||
| and spec.gpu > 0): | ||
| trimmed += spec.gpu | ||
| result_specs.pop(i) | ||
| return result_specs | ||
|
|
||
|
|
||
| def get_available_workflows() -> list[str]: | ||
| """Return list of available workflow names for the UI.""" | ||
| return list(APP_TO_WORKFLOW.keys()) | ||
|
|
||
|
|
||
| def get_available_gpu_types() -> list[str]: | ||
| """Return list of available GPU type strings for the UI.""" | ||
| return [gpu_type.value for gpu_type in GPUType] | ||
|
|
||
|
|
||
| def run_allocator( | ||
| gpu_budget: dict[str, int], | ||
| workflow_name: str, | ||
| ) -> DeploymentPlan: | ||
| """ | ||
| Run the greedy model allocator and return a deployment plan. | ||
|
|
||
| Args: | ||
| gpu_budget: GPU counts keyed by GPU type string (e.g., {"A100": 8, "H100": 0}). | ||
| workflow_name: StreamWise app name (e.g., "streamcast"). | ||
|
|
||
| Returns: | ||
| DeploymentPlan with concrete container deployment specs. | ||
|
|
||
| Raises: | ||
| ValueError: If workflow_name or a GPU type is invalid, or if the total GPU budget is below 8. | ||
| """ | ||
| # Validate workflow | ||
| workflow_key = APP_TO_WORKFLOW.get(workflow_name) | ||
| if workflow_key is None: | ||
| raise ValueError( | ||
| f"Unknown workflow '{workflow_name}'. " | ||
| f"Available: {list(APP_TO_WORKFLOW.keys())}") | ||
|
|
||
| workflow = WORKFLOWS[workflow_key] | ||
|
|
||
| # Parse GPU budget into GPUType enum | ||
| num_gpus: dict[GPUType, int] = {} | ||
| for gpu_str, count in gpu_budget.items(): | ||
| try: | ||
| gpu_type = GPUType(gpu_str) | ||
| except ValueError as exc: | ||
| raise ValueError( | ||
| f"Unknown GPU type '{gpu_str}'. " | ||
| f"Available: {[g.value for g in GPUType]}") from exc | ||
| if count > 0: | ||
| num_gpus[gpu_type] = count | ||
|
|
||
| if not num_gpus or sum(num_gpus.values()) < 8: | ||
| raise ValueError("Total GPU budget must be at least 8 GPUs.") | ||
|
|
||
| # The allocator requires GPU counts to be multiples of NUM_GPUS_PER_SERVER (8). | ||
| # Round up for the allocator, then trim back to the real budget afterward. | ||
| import math | ||
| from constants import NUM_GPUS_PER_SERVER | ||
| allocator_gpus: dict[GPUType, int] = {} | ||
| for gpu_type, count in num_gpus.items(): | ||
| server_size = NUM_GPUS_PER_SERVER[gpu_type] | ||
| allocator_gpus[gpu_type] = math.ceil(count / server_size) * server_size | ||
|
|
||
| # Load latency data and run allocator | ||
| data_dir = _get_data_dir() | ||
| latency_data = load_latency_data(data_dir=data_dir) | ||
|
|
||
| allocator = AutoModelAllocator( | ||
| workflow=workflow, | ||
| latency_data=latency_data, | ||
| policy=STREAMWISE_POLICY, | ||
| ) | ||
|
|
||
| result = allocator.allocate(num_gpus=allocator_gpus, verbose=False) | ||
|
|
||
| # Convert result to deployment specs | ||
| specs = result_to_deployment_specs(result) | ||
|
|
||
| # Trim deployment specs back to the user's actual budget. | ||
| # Also handles MIG-unavailable overflow (e.g., OTHERS allocates 1 GPU | ||
| # but kokoro+yolo each need a full GPU = 2). | ||
| actual_per_type = _calc_actual_gpus_per_type(specs) | ||
| for gpu_type, budget_count in num_gpus.items(): | ||
| actual = actual_per_type.get(gpu_type, 0) | ||
| if actual <= budget_count: | ||
| continue | ||
| excess = actual - budget_count | ||
| gpu_type_str = GPU_TYPE_TO_POD_STR[gpu_type] | ||
| specs = _trim_specs_for_type(specs, gpu_type_str, excess) | ||
|
|
||
| return DeploymentPlan( | ||
| specs=specs, | ||
| result=result, | ||
| workflow_name=workflow_name, | ||
| gpu_budget=gpu_budget, | ||
| ) | ||
|
|
||
|
|
||
| def result_to_deployment_specs(result: Result) -> list[DeploymentSpec]: | ||
| """ | ||
| Convert an allocator Result into a list of DeploymentSpec objects. | ||
|
|
||
| Each ModelAllocation with replicas > 0 is mapped to one or more container deployments. | ||
| When MIG is unavailable, containers that would normally use MIG slices get 1 full GPU instead. | ||
| """ | ||
| specs: list[DeploymentSpec] = [] | ||
|
|
||
| for gpu_type, model_dict in result.models.items(): | ||
| gpu_type_str = GPU_TYPE_TO_POD_STR[gpu_type] | ||
|
|
||
| for model, allocations in model_dict.items(): | ||
| containers = MODEL_TO_CONTAINERS.get(model, []) | ||
| if not containers: | ||
| continue | ||
|
|
||
| for allocation in allocations: | ||
| if allocation.replicas <= 0: | ||
| continue | ||
|
|
||
| for container_name in containers: | ||
| resources = CONTAINER_RESOURCES.get(container_name, (4, 16, 16)) | ||
| cpu, memory_gib, ephemeral_storage_gib = resources | ||
|
|
||
| mig_profile: Optional[str] = None | ||
| # Co-locate VAE only when disaggregation is disabled | ||
| # TODO: make disaggregation a configuration exposed to the users | ||
| is_colocated = ( | ||
| container_name in COLOCATED_CONTAINERS | ||
| and not STREAMWISE_POLICY.disaggregation.get(Model.HF, False) | ||
| ) | ||
| if is_colocated: | ||
| gpu_count = 0 | ||
| elif MIG_AVAILABLE and container_name in MIG_CONTAINERS: | ||
| mig_profile = MIG_CONTAINERS[container_name] | ||
| gpu_count = 1 | ||
| elif container_name in MIG_CONTAINERS: | ||
| # MIG not available: use 1 full GPU instead of a MIG slice | ||
| gpu_count = 1 | ||
| else: | ||
| gpu_count = allocation.devices | ||
|
|
||
| for _ in range(allocation.replicas): | ||
| specs.append(DeploymentSpec( | ||
| container_name=container_name, | ||
| cpu=cpu, | ||
| memory_gib=memory_gib, | ||
| ephemeral_storage_gib=ephemeral_storage_gib, | ||
| gpu=gpu_count, | ||
| gpu_type=gpu_type_str, | ||
| mig_profile=mig_profile, | ||
| )) | ||
|
|
||
| return specs | ||
|
|
||
|
|
||
| def deployment_plan_to_json(plan: DeploymentPlan) -> dict: | ||
| """Serialize a DeploymentPlan to a JSON-friendly dict.""" | ||
| # Calculate actual GPUs used by the deployment specs (may differ from allocator | ||
| # when MIG is unavailable and services fall back to full GPUs). | ||
| actual_gpus: dict[str, int] = {} | ||
| for spec in plan.specs: | ||
| if spec.mig_profile: | ||
| continue # MIG slices don't count against full GPU budget | ||
| gpu_type_key = spec.gpu_type or "unknown" | ||
| actual_gpus[gpu_type_key] = actual_gpus.get(gpu_type_key, 0) + spec.gpu | ||
|
|
||
| total_budget = sum(plan.gpu_budget.values()) | ||
| total_actual = sum(actual_gpus.values()) | ||
| budget_exceeded = total_actual > total_budget | ||
|
|
||
| warnings: list[str] = [] | ||
| if budget_exceeded: | ||
| mig_hint = ( | ||
| " Enable MIG to fit lightweight services (kokoro, yolo, realesrgan) " | ||
| "on shared GPU slices." | ||
| ) if not MIG_AVAILABLE else "" | ||
| warnings.append( | ||
| f"Deployment requires {total_actual} full GPUs but budget is " | ||
| f"{total_budget}.{mig_hint}" | ||
| ) | ||
|
|
||
| return { | ||
| "workflow_name": plan.workflow_name, | ||
| "gpu_budget": plan.gpu_budget, | ||
| "metrics": { | ||
| "total_time_s": round(plan.result.total_time_s, 2), | ||
| "ttff_s": round(plan.result.ttff_s, 2), | ||
| "cost": round(plan.result.cost, 4), | ||
| "gpus_used": { | ||
| gpu_type.value: count | ||
| for gpu_type, count in plan.result.gpus_used.items() | ||
| }, | ||
| "actual_gpus_needed": actual_gpus, | ||
| "budget_exceeded": budget_exceeded, | ||
| }, | ||
| "warnings": warnings, | ||
| "mig_available": MIG_AVAILABLE, | ||
| "specs": [ | ||
| { | ||
| "container_name": spec.container_name, | ||
| "cpu": spec.cpu, | ||
| "memory_gib": spec.memory_gib, | ||
| "ephemeral_storage_gib": spec.ephemeral_storage_gib, | ||
| "gpu": spec.gpu, | ||
| "gpu_type": spec.gpu_type, | ||
| "mig_profile": spec.mig_profile, | ||
| } | ||
| for spec in plan.specs | ||
| ], | ||
| } | ||
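
The round-up-then-trim step in `run_allocator` may be easier to follow in isolation. The following is a minimal sketch, not the PR's code: `Spec`, `round_up_to_servers`, and `trim_excess` are hypothetical stand-ins for `DeploymentSpec`, the `math.ceil` rounding, and `_trim_specs_for_type`, and the sketch assumes a server size of 8 and ignores the `COLOCATED_CONTAINERS` and MIG checks that the real module applies.

```python
import math
from collections import Counter
from dataclasses import dataclass


@dataclass
class Spec:
    """Hypothetical stand-in for DeploymentSpec, with only the fields used here."""
    container_name: str
    gpu: int
    gpu_type: str


def round_up_to_servers(count: int, server_size: int = 8) -> int:
    """Round a GPU count up to the next multiple of server_size (assumed to be 8)."""
    return math.ceil(count / server_size) * server_size


def trim_excess(specs: list[Spec], gpu_type: str, excess: int) -> list[Spec]:
    """Drop replicas on gpu_type, most-replicated container first, until
    at least `excess` GPUs have been freed."""
    counts = Counter(
        s.container_name for s in specs if s.gpu_type == gpu_type and s.gpu > 0
    )
    kept = list(specs)
    trimmed = 0
    for name, _ in counts.most_common():
        # Walk backwards so that pop(i) does not shift indices still to be visited.
        for i in range(len(kept) - 1, -1, -1):
            if trimmed >= excess:
                return kept
            spec = kept[i]
            if spec.container_name == name and spec.gpu_type == gpu_type and spec.gpu > 0:
                trimmed += spec.gpu
                kept.pop(i)
    return kept


# A user budget of 10 GPUs is rounded up to 16 for the allocator.
user_budget = 10
allocator_budget = round_up_to_servers(user_budget)

# Suppose the allocator's plan needs 13 GPUs: trim the 3 that exceed the real budget.
plan = (
    [Spec("flux", 4, "H100"), Spec("gemma", 1, "H100")]
    + [Spec("realesrgan", 1, "H100") for _ in range(8)]
)
needed = sum(s.gpu for s in plan)
trimmed_plan = trim_excess(plan, "H100", needed - user_budget)
print(allocator_budget, needed, sum(s.gpu for s in trimmed_plan))
```

Because replicas are removed whole, a multi-GPU replica can free more GPUs than `excess` asks for, so the trimmed plan may end up below the budget rather than exactly at it.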