40 changes: 40 additions & 0 deletions examples/flux/lammps-train-gpus.yaml
@@ -0,0 +1,40 @@
+# This was developed on AWS g4dn.xlarge (Tesla T4 GPUs)
+# eksctl create cluster --config-file ./eks-config-gpu.yaml
+# eksctl delete cluster --config-file ./eks-config-gpu.yaml --wait
+# apiVersion: eksctl.io/v1alpha5
+# kind: ClusterConfig
+# metadata:
+#   name: gpu-cluster
+#   region: us-east-2
+
+# nodeGroups:
+#   - name: hpsf-gpu-workers
+#     instanceType: g4dn.xlarge
+#     minSize: 2
+#     maxSize: 2
+#     desiredCapacity: 2
+apiVersion: trainer.kubeflow.org/v1alpha1
+kind: TrainJob
+metadata:
+  name: lammps-flux
+spec:
+  # Reference the pre-defined runtime by name
+  runtimeRef:
+    name: flux-runtime
+  trainer:
+    numNodes: 2
+    numProcPerNode: 1
+    image: ghcr.io/flux-framework/tutorials:gpu-lammps-hwloc
+    # You do not need to prefix the command with "flux run" here; it is wrapped automatically
+    command: ["lmp_gpu", "-k", "on", "g", "8", "-sf", "kk", "-pk", "kokkos", "cuda/aware", "off", "newton", "on", "neigh", "half", "-v", "x", "8", "-v", "y", "8", "-v", "z", "16", "-in", "in.reaxff.hns", "-nocite"]
+    resourcesPerNode:
+      limits:
+        nvidia.com/gpu: "1"
+      requests:
+        nvidia.com/gpu: "1"
+    env:
+      - name: OMPI_MCA_btl
+        value: tcp,self
+      # This matches the view (operating system and version) of the initContainer that installs Flux
+      - name: FLUX_VIEW_IMAGE
+        value: ghcr.io/converged-computing/flux-view-ubuntu:tag-jammy
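For reference: with this spec (2 nodes, 1 process per node, 1 GPU per node), the flags derived in flux.go below come out as -N 2 -n 2 -g 1, so the wrapped command would look roughly like this sketch (the exact wrapper form is assumed here, not taken verbatim from the template):

flux run -N 2 -n 2 -g 1 lmp_gpu -k on g 8 -sf kk -pk kokkos cuda/aware off newton on neigh half -v x 8 -v y 8 -v z 16 -in in.reaxff.hns -nocite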
4 changes: 4 additions & 0 deletions pkg/constants/constants.go
@@ -119,6 +119,10 @@ const (
 	// Path for the Flux curve certificate
 	FluxCurveVolumePath = "/curve"
 
+	// Ensure MPI has access to the host's full shared memory
+	FluxMemoryVolumeName = "shared-memory"
+	FluxMemoryVolumePath = "/dev/shm"
+
 	// emptyDir volume used for the complete spack view software
 	FluxSpackViewVolumeName = "spack-install"
28 changes: 21 additions & 7 deletions pkg/runtime/framework/plugins/flux/flux.go
@@ -202,6 +202,7 @@ func (f *Flux) EnforceMLPolicy(info *runtime.Info, trainJob *trainer.TrainJob) error {
 				*corev1ac.VolumeMount().WithName(constants.FluxSpackViewVolumeName).WithMountPath(constants.FluxSpackViewVolumePath),
 				*corev1ac.VolumeMount().WithName(configMapName).WithMountPath(constants.FluxConfigVolumeName).WithReadOnly(true),
 				*corev1ac.VolumeMount().WithName(constants.FluxCurveVolumeName).WithMountPath(constants.FluxCurveVolumePath).WithReadOnly(true),
+				*corev1ac.VolumeMount().WithName(constants.FluxMemoryVolumeName).WithMountPath(constants.FluxMemoryVolumePath),
 			)
 		}
 	}
@@ -296,6 +297,10 @@ func getViewVolumes(configMapName string) []corev1ac.VolumeApplyConfiguration {
 	spackInstallAC := corev1ac.Volume().
 		WithName(constants.FluxSpackViewVolumeName).
 		WithEmptyDir(corev1ac.EmptyDirVolumeSource())
+	memoryVolumeAC := corev1ac.Volume().
+		WithName(constants.FluxMemoryVolumeName).
+		WithEmptyDir(corev1ac.EmptyDirVolumeSource().
+			WithMedium(corev1.StorageMediumMemory))
 	fluxVolumeAC := corev1ac.Volume().
 		WithEmptyDir(corev1ac.EmptyDirVolumeSource()).
 		WithName(constants.FluxInstallVolumeName)
@@ -306,7 +311,7 @@ func getViewVolumes(configMapName string) []corev1ac.VolumeApplyConfiguration {
 			WithName(configMapName).
 			WithDefaultMode(0755),
 	)
-	return []corev1ac.VolumeApplyConfiguration{*spackInstallAC, *fluxVolumeAC, *cmAC}
+	return []corev1ac.VolumeApplyConfiguration{*spackInstallAC, *fluxVolumeAC, *cmAC, *memoryVolumeAC}
 }
 
 // buildInitScriptConfigMap creates a ConfigMapApplyConfiguration to support server-side Apply
@@ -403,25 +408,34 @@ func (f *Flux) generateFluxEntrypoint(trainJob *trainer.TrainJob, info *runtime.Info) string {
 	// Derive number of tasks
 	// This may not technically be the number of processes per node,
 	// but that is all the TrainJob can currently represent.
-	var tasks string
+	var tasks int32
+	var flags string
 
 	nodes := *trainJob.Spec.Trainer.NumNodes
 	if trainJob.Spec.Trainer.NumProcPerNode != nil {
-		tasks = fmt.Sprintf("-N %d -n %d", nodes, *trainJob.Spec.Trainer.NumProcPerNode*nodes)
+		tasks = *trainJob.Spec.Trainer.NumProcPerNode
 	} else {
-		tasks = fmt.Sprintf("-N %d -n %d", nodes, *info.RuntimePolicy.MLPolicySource.Flux.NumProcPerNode*nodes)
+		tasks = *info.RuntimePolicy.MLPolicySource.Flux.NumProcPerNode
 	}
Member:

@vsoch Do you want to add validation for numProcPerNode in the followup PR?

Contributor Author:

I think I had it before (in the first PR) but was asked to remove it because it's part of the API as an annotation?

Contributor Author:

What we do need to think about is a case that we have for the Flux Operator (that may not be dealt with well here). Often there is a desire to specify the number of nodes but just say "discover and use all of the cores that are found", and then you would do:

flux submit -N 16 --exclusive

Note that I don't have any -n specified. For the Flux Operator, I allowed this case when the user set tasks to 0. Is there any way we can support something similar? Is the only way some special env var that flags and triggers the condition (regardless of what the -n is), since the validation of >1 is in the spec?

Member:

> I think I had it before (in the first PR) but was asked to remove it because it's part of the API as an annotation?

Please open a dedicated PR to introduce this kubebuilder validation; we need to ensure that numProcPerNode >= 1.

> Note that I don't have any -n specified. For the Flux Operator, I allowed this case when the user set tasks to 0. Is there any way we can support something similar?

If Flux can dynamically discover all available devices on the node, we can rely on this functionality when numProcPerNode is omitted.

Contributor Author:

Excellent! I did not know that was an option. Thank you!

+	flags = fmt.Sprintf("-N %d -n %d", nodes, tasks*nodes)
+
 	// Derive number of GPUs from resources. In Flux, -g is --gpus-per-task
 	resourcesPerNode := ptr.Deref(runtime.ExtractResourcePerNodeFromRuntime(info), corev1.ResourceRequirements{})
 	if jobTrainer := trainJob.Spec.Trainer; jobTrainer != nil && jobTrainer.ResourcesPerNode != nil {
 		resourcesPerNode = ptr.Deref(jobTrainer.ResourcesPerNode, corev1.ResourceRequirements{})
 	}
 	gpus := runtime.GetNumGPUPerNode(&resourcesPerNode)
 
+	// The resource (R) file for the cluster may or may not include GPUs, e.g.:
+	// flux R encode --hosts=${hosts} --cores=0-1 --gpu=0
+	coreSpec := generateRange(int32(tasks), 0)
+	Rspec := fmt.Sprintf("--cores=%s", coreSpec)
 	if gpus > 0 {
-		tasks = fmt.Sprintf("%s -g %d", tasks, gpus)
+		flags = fmt.Sprintf("%s -g %d", flags, gpus)
+		gpuSpec := generateRange(int32(gpus), 0)
+		Rspec = fmt.Sprintf("%s --gpu=%s", Rspec, gpuSpec)
 	}

-	return fmt.Sprintf(entrypointTemplate, mainHost, tasks)
+	return fmt.Sprintf(entrypointTemplate, Rspec, mainHost, flags)
 }
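To make the new pieces concrete: for 2 nodes with 2 processes per node and 1 GPU per node, the values would come out roughly as below (a sketch, assuming generateRange(2, 0) yields "0-1" and generateRange(1, 0) yields "0", consistent with the comment above):

flags="-N 2 -n 4 -g 1"
Rspec="--cores=0-1 --gpu=0"

These fill the placeholders in the entrypoint template change below.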

// generateInitEntrypoint generates the flux entrypoint to prepare flux
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/framework/plugins/flux/templates/entrypoint.sh
@@ -69,7 +69,7 @@ chown -R ${fluxuid} ${curvepath}

 # Generate host resources
 hosts=$(cat ${configroot}/etc/flux/system/hostlist)
-flux R encode --hosts=${hosts} --local > /tmp/R
+flux R encode --hosts=${hosts} %s > /tmp/R
 mv /tmp/R ${configroot}/etc/flux/system/R
 
 # Put the state directory in /var/lib on shared view
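After template substitution, the rendered line for the worked example above would read (only %s is filled in by Go; ${hosts} stays a shell variable):

flux R encode --hosts=${hosts} --cores=0-1 --gpu=0 > /tmp/R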