Merged
23 changes: 13 additions & 10 deletions contributing/BACKENDS.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ git clone https://github.com/dstackai/gpuhunt.git

- **Offline providers** offer static machine configurations that are not frequently updated.
`gpuhunt` collects offline providers' instance offers on an hourly basis.
Examples: `aws`, `gcp`, `azure`, etc.
- **Online providers** offer dynamic machine configurations that are available at the very moment
when you fetch configurations (e.g., GPU marketplaces).
`gpuhunt` collects online providers' instance offers each time a `dstack` user provisions a new instance.
Examples: `tensordock`, `vastai`, etc.

### 1.3. Create the provider class

Create the provider class file under `src/gpuhunt/providers`.

Make sure your class extends the [`AbstractProvider`](https://github.com/dstackai/gpuhunt/blob/main/src/gpuhunt/providers/__init__.py)
base class. See its docstrings for descriptions of the methods that your class should implement.
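As a rough illustration of the shape such a class takes (the class name, field names, and `get` signature below are assumptions for the sketch, not the exact `AbstractProvider` interface — check the base class docstrings for the real one):

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class RawCatalogItem:
    """Stand-in for gpuhunt's catalog item model (fields are illustrative)."""

    instance_name: str
    location: str
    price: float  # USD per hour
    cpu: int
    memory: float  # GiB
    gpu_count: int
    gpu_name: Optional[str]
    gpu_memory: Optional[float]  # GiB per GPU
    spot: bool


class ExampleProvider:
    """Sketch of a provider class; the real one would extend AbstractProvider."""

    NAME = "example"

    def get(self, query_filter=None, balance_resources: bool = True) -> List[RawCatalogItem]:
        # A real provider would call the cloud's API here;
        # this sketch returns a single static offer.
        return [
            RawCatalogItem(
                instance_name="gpu-1x-a100",
                location="us-east-1",
                price=1.10,
                cpu=12,
                memory=85.0,
                gpu_count=1,
                gpu_name="A100",
                gpu_memory=40.0,
                spot=False,
            )
        ]
```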
Expand Down Expand Up @@ -69,13 +69,13 @@ Refer to examples: [test_datacrunch.py](https://github.com/dstackai/gpuhunt/blob

### 1.6. Submit a pull request

Once the cloud provider is added, submit a pull request.

> Anything unclear? Ask questions on the [Discord server](https://discord.gg/u8SmfwPpMd).

## 2. Integrate the cloud provider to dstackai/dstack

Once the provider is added to `gpuhunt`, we can proceed with implementing
the corresponding backend with `dstack`. Follow the steps below.

### 2.1. Determine if you will implement a VM-based or a container-based backend
Expand Down Expand Up @@ -124,10 +124,10 @@ Then add these models to `AnyBackendConfig*` unions in [`src/dstack/_internal/co

The script also generates `*BackendStoredConfig` that extends `*BackendConfig` to be able to store extra parameters in the DB. By the same logic, it generates `*Config` that extends `*BackendStoredConfig` with creds and uses it as the main `Backend` and `Compute` config instead of using `*BackendConfigWithCreds` directly.
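The resulting inheritance chain can be sketched with plain dataclasses (the `Example*` names and fields are illustrative stand-ins, not the actual generated models):

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ExampleBackendConfig:
    # Settings a user may supply when configuring the backend (no credentials).
    regions: Optional[List[str]] = None


@dataclass
class ExampleCreds:
    api_key: str = ""


@dataclass
class ExampleBackendConfigWithCreds(ExampleBackendConfig):
    # The API-facing config: user settings plus credentials.
    creds: Optional[ExampleCreds] = None


@dataclass
class ExampleBackendStoredConfig(ExampleBackendConfig):
    # Extends the base config with extra parameters persisted in the DB.
    some_cached_value: Optional[str] = None


@dataclass
class ExampleConfig(ExampleBackendStoredConfig):
    # The main config used by Backend/Compute: stored config + creds.
    creds: Optional[ExampleCreds] = None
```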

Refer to examples:
[datacrunch](https://github.com/dstackai/dstack/blob/master/src/dstack/_internal/core/backends/datacrunch/models.py),
[aws](https://github.com/dstackai/dstack/blob/master/src/dstack/_internal/core/backends/aws/models.py),
[gcp](https://github.com/dstackai/dstack/blob/master/src/dstack/_internal/core/backends/gcp/models.py),
[azure](https://github.com/dstackai/dstack/blob/master/src/dstack/_internal/core/backends/models.py), etc.

### 2.7. Implement the backend compute class
Expand All @@ -147,8 +147,8 @@ Go to `configurator.py` and implement custom `Configurator` logic. At minimum, y
You may also need to validate other config parameters if there are any.
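A minimal sketch of that validation step (the exception class and method signature are simplified stand-ins, not the real configurator interface):

```python
from types import SimpleNamespace


class BackendInvalidCredentialsError(Exception):
    """Stand-in for the error a configurator raises on bad credentials."""


class ExampleConfigurator:
    TYPE = "example"

    def validate_config(self, config, default_creds_enabled: bool) -> None:
        # At minimum, validate the credentials; a real configurator would
        # make an authenticated API call here and translate auth failures
        # into a backend-specific error.
        api_key = getattr(getattr(config, "creds", None), "api_key", None)
        if not api_key:
            raise BackendInvalidCredentialsError("Invalid or missing API key")
```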

Refer to examples: [datacrunch](https://github.com/dstackai/dstack/blob/master/src/dstack/_internal/core/backends/datacrunch/configurator.py),
[aws](https://github.com/dstackai/dstack/blob/master/src/dstack/_internal/core/backends/aws/configurator.py),
[gcp](https://github.com/dstackai/dstack/blob/master/src/dstack/_internal/core/backends/gcp/configurator.py),
[azure](https://github.com/dstackai/dstack/blob/master/src/dstack/_internal/core/backends/azure/configurator.py), etc.

Register configurator by appending it to `_CONFIGURATOR_CLASSES` in [`src/dstack/_internal/core/backends/configurators.py`](https://github.com/dstackai/dstack/blob/master/src/dstack/_internal/core/backends/configurators.py).
Expand Down Expand Up @@ -181,6 +181,9 @@ The agent controls the VM and starts Docker containers for users' jobs.
Since `dstack` controls the entire VM, VM-based backends can support more features,
such as blocks, instance volumes, privileged containers, and reusable instances.

Note that all VM-based backend `Compute` classes should subclass the `ComputeWithPrivilegedSupport` mixin, as the `dstack-shim` agent provides this functionality out of the box.

To support a VM-based backend, `dstack` expects the following:

- An API for creating and terminating VMs
16 changes: 16 additions & 0 deletions runner/internal/executor/executor.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/dstackai/dstack/runner/internal/schemas"
"github.com/dstackai/dstack/runner/internal/types"
"github.com/prometheus/procfs"
"golang.org/x/sys/unix"
)

// TODO: Tune these parameters for optimal experience/performance
Expand Down Expand Up @@ -518,6 +519,21 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error

cmd.Env = envMap.Render()

// Configure process resource limits
// TODO: Make rlimits customizable in the run configuration. Currently, we only set max locked memory
// to unlimited to fix the issue with InfiniBand/RDMA: "Cannot allocate memory".
// See: https://github.com/ofiwg/libfabric/issues/6437
// See: https://github.com/openucx/ucx/issues/8229
// Note: we already set RLIMIT_MEMLOCK to unlimited in the shim if we've detected IB devices
// (see configureHpcNetworkingIfAvailable() function), but, as it's on the shim side, it only works
// with VM-based backends.
rlimitMemlock := unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY}
// TODO: Check if we have CAP_SYS_RESOURCE. In container environments, even root usually doesn't have
// this capability.
if err := unix.Setrlimit(unix.RLIMIT_MEMLOCK, &rlimitMemlock); err != nil {
log.Error(ctx, "Failed to set resource limits", "err", err)
}

log.Trace(ctx, "Starting exec", "cmd", cmd.String(), "working_dir", cmd.Dir, "env", cmd.Env)

ptm, err := startCommand(cmd)
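For reference, the RLIMIT_MEMLOCK adjustment this Go code performs can be sketched in Python with the standard `resource` module (Linux-only; as the TODO above notes, it can fail without CAP_SYS_RESOURCE, so the sketch reports failure instead of raising):

```python
import resource


def raise_memlock_limit() -> bool:
    """Try to set the max locked-memory limit (RLIMIT_MEMLOCK) to unlimited.

    Returns True on success, False if the kernel refused, e.g. because the
    process lacks CAP_SYS_RESOURCE in a container environment.
    """
    unlimited = (resource.RLIM_INFINITY, resource.RLIM_INFINITY)
    try:
        resource.setrlimit(resource.RLIMIT_MEMLOCK, unlimited)
    except (ValueError, OSError):
        # ValueError: raising the soft limit above the hard limit without
        # privilege; OSError (EPERM): raising the hard limit was not permitted.
        return False
    return True
```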
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/backends/aws/compute.py
Expand Up @@ -24,6 +24,7 @@
ComputeWithMultinodeSupport,
ComputeWithPlacementGroupSupport,
ComputeWithPrivateGatewaySupport,
ComputeWithPrivilegedSupport,
ComputeWithReservationSupport,
ComputeWithVolumeSupport,
generate_unique_gateway_instance_name,
Expand Down Expand Up @@ -90,6 +91,7 @@ def _ec2client_cache_methodkey(self, ec2_client, *args, **kwargs):
class AWSCompute(
ComputeWithAllOffersCached,
ComputeWithCreateInstanceSupport,
ComputeWithPrivilegedSupport,
ComputeWithMultinodeSupport,
ComputeWithReservationSupport,
ComputeWithPlacementGroupSupport,
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/backends/azure/compute.py
Expand Up @@ -43,6 +43,7 @@
ComputeWithCreateInstanceSupport,
ComputeWithGatewaySupport,
ComputeWithMultinodeSupport,
ComputeWithPrivilegedSupport,
generate_unique_gateway_instance_name,
generate_unique_instance_name,
get_gateway_user_data,
Expand Down Expand Up @@ -78,6 +79,7 @@
class AzureCompute(
ComputeWithAllOffersCached,
ComputeWithCreateInstanceSupport,
ComputeWithPrivilegedSupport,
ComputeWithMultinodeSupport,
ComputeWithGatewaySupport,
Compute,
9 changes: 9 additions & 0 deletions src/dstack/_internal/core/backends/base/compute.py
Expand Up @@ -320,6 +320,15 @@ def _restrict_instance_offer_az_to_volumes_az(
]


class ComputeWithPrivilegedSupport:
"""
Must be subclassed to support runs with `privileged: true`.
All VM-based Computes (that is, Computes that use the shim) should subclass this mixin.
"""

pass


class ComputeWithMultinodeSupport:
"""
Must be subclassed to support multinode tasks and cluster fleets.
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/backends/cloudrift/compute.py
Expand Up @@ -4,6 +4,7 @@
Compute,
ComputeWithAllOffersCached,
ComputeWithCreateInstanceSupport,
ComputeWithPrivilegedSupport,
get_shim_commands,
)
from dstack._internal.core.backends.base.offers import get_catalog_offers
Expand All @@ -27,6 +28,7 @@
class CloudRiftCompute(
ComputeWithAllOffersCached,
ComputeWithCreateInstanceSupport,
ComputeWithPrivilegedSupport,
Compute,
):
def __init__(self, config: CloudRiftConfig):
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/backends/cudo/compute.py
Expand Up @@ -6,6 +6,7 @@
from dstack._internal.core.backends.base.compute import (
ComputeWithCreateInstanceSupport,
ComputeWithFilteredOffersCached,
ComputeWithPrivilegedSupport,
generate_unique_instance_name,
get_shim_commands,
)
Expand All @@ -32,6 +33,7 @@
class CudoCompute(
ComputeWithFilteredOffersCached,
ComputeWithCreateInstanceSupport,
ComputeWithPrivilegedSupport,
Compute,
):
def __init__(self, config: CudoConfig):
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/backends/datacrunch/compute.py
Expand Up @@ -8,6 +8,7 @@
from dstack._internal.core.backends.base.compute import (
ComputeWithAllOffersCached,
ComputeWithCreateInstanceSupport,
ComputeWithPrivilegedSupport,
generate_unique_instance_name,
get_shim_commands,
)
Expand Down Expand Up @@ -39,6 +40,7 @@
class DataCrunchCompute(
ComputeWithAllOffersCached,
ComputeWithCreateInstanceSupport,
ComputeWithPrivilegedSupport,
Compute,
):
def __init__(self, config: DataCrunchConfig):
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dstack._internal.core.backends.base.compute import (
ComputeWithAllOffersCached,
ComputeWithCreateInstanceSupport,
ComputeWithPrivilegedSupport,
generate_unique_instance_name,
get_user_data,
)
Expand Down Expand Up @@ -40,6 +41,7 @@
class BaseDigitalOceanCompute(
ComputeWithAllOffersCached,
ComputeWithCreateInstanceSupport,
ComputeWithPrivilegedSupport,
Compute,
):
def __init__(self, config: BaseDigitalOceanConfig, api_url: str, type: BackendType):
5 changes: 5 additions & 0 deletions src/dstack/_internal/core/backends/features.py
Expand Up @@ -4,6 +4,7 @@
ComputeWithMultinodeSupport,
ComputeWithPlacementGroupSupport,
ComputeWithPrivateGatewaySupport,
ComputeWithPrivilegedSupport,
ComputeWithReservationSupport,
ComputeWithVolumeSupport,
)
Expand Down Expand Up @@ -38,6 +39,10 @@ def _get_backends_with_compute_feature(
configurator_classes=_configurator_classes,
compute_feature_class=ComputeWithCreateInstanceSupport,
)
BACKENDS_WITH_PRIVILEGED_SUPPORT = _get_backends_with_compute_feature(
configurator_classes=_configurator_classes,
compute_feature_class=ComputeWithPrivilegedSupport,
)
BACKENDS_WITH_MULTINODE_SUPPORT = [BackendType.REMOTE] + _get_backends_with_compute_feature(
configurator_classes=_configurator_classes,
compute_feature_class=ComputeWithMultinodeSupport,
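The marker-mixin pattern behind `_get_backends_with_compute_feature` can be sketched as follows (simplified stand-ins, not the real dstack classes):

```python
class Compute:
    """Stand-in base class for all backend Computes."""


class ComputeWithPrivilegedSupport:
    """Marker mixin: subclass to support runs with `privileged: true`."""


class ExampleVMCompute(ComputeWithPrivilegedSupport, Compute):
    # A VM-based Compute: the shim provides privileged support out of the box.
    pass


class ExampleContainerCompute(Compute):
    # A container-based Compute without privileged support.
    pass


def backends_with_privileged_support(compute_classes):
    # Feature detection by mixin subclassing, mirroring how the
    # BACKENDS_WITH_* lists are built from the Compute classes.
    return [
        cls.__name__
        for cls in compute_classes
        if issubclass(cls, ComputeWithPrivilegedSupport)
    ]
```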
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/backends/gcp/compute.py
Expand Up @@ -23,6 +23,7 @@
ComputeWithMultinodeSupport,
ComputeWithPlacementGroupSupport,
ComputeWithPrivateGatewaySupport,
ComputeWithPrivilegedSupport,
ComputeWithVolumeSupport,
generate_unique_gateway_instance_name,
generate_unique_instance_name,
Expand Down Expand Up @@ -90,6 +91,7 @@ class GCPVolumeDiskBackendData(CoreModel):
class GCPCompute(
ComputeWithAllOffersCached,
ComputeWithCreateInstanceSupport,
ComputeWithPrivilegedSupport,
ComputeWithMultinodeSupport,
ComputeWithPlacementGroupSupport,
ComputeWithGatewaySupport,
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/backends/hotaisle/compute.py
Expand Up @@ -11,6 +11,7 @@
Compute,
ComputeWithAllOffersCached,
ComputeWithCreateInstanceSupport,
ComputeWithPrivilegedSupport,
get_shim_commands,
)
from dstack._internal.core.backends.base.offers import get_catalog_offers
Expand Down Expand Up @@ -47,6 +48,7 @@
class HotAisleCompute(
ComputeWithAllOffersCached,
ComputeWithCreateInstanceSupport,
ComputeWithPrivilegedSupport,
Compute,
):
def __init__(self, config: HotAisleConfig):
54 changes: 47 additions & 7 deletions src/dstack/_internal/core/backends/kubernetes/compute.py
Expand Up @@ -11,6 +11,8 @@
Compute,
ComputeWithFilteredOffersCached,
ComputeWithGatewaySupport,
ComputeWithMultinodeSupport,
ComputeWithPrivilegedSupport,
generate_unique_gateway_instance_name,
generate_unique_instance_name_for_job,
get_docker_commands,
Expand Down Expand Up @@ -60,10 +62,14 @@
NVIDIA_GPU_NAME_TO_GPU_INFO = {gpu.name: gpu for gpu in KNOWN_NVIDIA_GPUS}
NVIDIA_GPU_NAMES = NVIDIA_GPU_NAME_TO_GPU_INFO.keys()

DUMMY_REGION = "-"


class KubernetesCompute(
ComputeWithFilteredOffersCached,
ComputeWithPrivilegedSupport,
ComputeWithGatewaySupport,
ComputeWithMultinodeSupport,
Compute,
):
def __init__(self, config: KubernetesConfig):
Expand Down Expand Up @@ -118,7 +124,7 @@ def get_offers_by_requirements(
),
),
price=0,
region="-",
region=DUMMY_REGION,
availability=InstanceAvailability.AVAILABLE,
instance_runtime=InstanceRuntime.RUNNER,
)
Expand Down Expand Up @@ -282,6 +288,13 @@ def run_job(
# TODO(#1535): support non-root images properly
run_as_user=0,
run_as_group=0,
privileged=job.job_spec.privileged,
capabilities=client.V1Capabilities(
add=[
# Allow to increase hard resource limits, see getrlimit(2)
"SYS_RESOURCE",
],
),
),
resources=client.V1ResourceRequirements(
requests=resources_requests,
Expand All @@ -300,7 +313,7 @@ def run_job(
namespace=self.config.namespace,
body=pod,
)
service = call_api_method(
call_api_method(
self.api.create_namespaced_service,
client.V1Service,
namespace=self.config.namespace,
Expand All @@ -313,14 +326,16 @@ def run_job(
),
),
)
service_ip = get_value(service, ".spec.cluster_ip", str, required=True)
return JobProvisioningData(
backend=instance_offer.backend,
instance_type=instance_offer.instance,
instance_id=instance_name,
hostname=service_ip,
# Although we can already get Service's ClusterIP from the `V1Service` object returned
# by the `create_namespaced_service` method, we still need PodIP for multinode runs.
# We'll update both hostname and internal_ip once the pod is assigned to the node.
hostname=None,
internal_ip=None,
region="local",
region=instance_offer.region,
price=instance_offer.price,
username="root",
ssh_port=DSTACK_RUNNER_SSH_PORT,
Expand All @@ -333,6 +348,30 @@ def run_job(
backend_data=None,
)

def update_provisioning_data(
self,
provisioning_data: JobProvisioningData,
project_ssh_public_key: str,
project_ssh_private_key: str,
):
pod = call_api_method(
self.api.read_namespaced_pod,
client.V1Pod,
name=provisioning_data.instance_id,
namespace=self.config.namespace,
)
pod_ip = get_value(pod, ".status.pod_ip", str)
if not pod_ip:
return
provisioning_data.internal_ip = pod_ip
service = call_api_method(
self.api.read_namespaced_service,
client.V1Service,
name=_get_pod_service_name(provisioning_data.instance_id),
namespace=self.config.namespace,
)
provisioning_data.hostname = get_value(service, ".spec.cluster_ip", str, required=True)

def terminate_instance(
self, instance_id: str, region: str, backend_data: Optional[str] = None
):
Expand Down Expand Up @@ -438,16 +477,17 @@ def create_gateway(
namespace=self.config.namespace,
service_name=_get_pod_service_name(instance_name),
)
region = DUMMY_REGION
if hostname is None:
self.terminate_instance(instance_name, region="-")
self.terminate_instance(instance_name, region=region)
raise ComputeError(
"Failed to get gateway hostname. "
"Ensure the Kubernetes cluster supports Load Balancer services."
)
return GatewayProvisioningData(
instance_id=instance_name,
ip_address=hostname,
region="-",
region=region,
)

def terminate_gateway(
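The security context this change sets on job pods can be sketched as a plain manifest fragment (illustrative only — the real code builds it with the `kubernetes` client's `V1SecurityContext` and `V1Capabilities` models):

```python
def build_security_context(privileged: bool) -> dict:
    # Mirrors the securityContext the Kubernetes backend sets on job pods:
    # root user/group (non-root images are not yet supported, see #1535),
    # optional privileged mode, and the SYS_RESOURCE capability so the
    # runner can raise hard resource limits (see getrlimit(2)).
    return {
        "runAsUser": 0,
        "runAsGroup": 0,
        "privileged": privileged,
        "capabilities": {"add": ["SYS_RESOURCE"]},
    }
```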