Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion internal/controllers/allocator/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"slices"

"github.com/weka/go-weka-observability/instrumentation"
weka "github.com/weka/weka-k8s-api/api/v1alpha1"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -104,6 +105,9 @@ const maxNodeSample = 3
// computes the top-numDrives capacity sum per node, and returns the maximum.
// This represents the worst-case (most memory) capacity a single drive container could manage.
func computeMaxNodeDriveCapacity(ctx context.Context, k8sClient client.Client, nodeSelector map[string]string, numDrives int) (int, error) {
ctx, logger, end := instrumentation.GetLogSpan(ctx, "computeMaxNodeDriveCapacity", "nodeSelector", nodeSelector, "numDrives", numDrives)
defer end()

kubeService := kubernetes.NewKubeService(k8sClient)
nodes, err := kubeService.GetNodes(ctx, nodeSelector)
if err != nil {
Expand All @@ -113,7 +117,7 @@ func computeMaxNodeDriveCapacity(ctx context.Context, k8sClient client.Client, n
sampleSize := min(len(nodes), maxNodeSample)

maxCapacity := 0
for i := 0; i < sampleSize; i++ {
for i := range sampleSize {
node := nodes[i]
drivesStr, ok := node.Annotations[consts.AnnotationWekaDrives]
if !ok || drivesStr == "" {
Expand All @@ -140,6 +144,8 @@ func computeMaxNodeDriveCapacity(ctx context.Context, k8sClient client.Client, n
maxCapacity = max(maxCapacity, nodeSum)
}

logger.Info("Computed max node drive capacity", "maxCapacityGiB", maxCapacity)

return maxCapacity, nil
}

Expand Down
306 changes: 159 additions & 147 deletions internal/controllers/allocator/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@
DataServicesHugepagesOffset int
}

func BuildDynamicTemplate(config *v1alpha1.WekaConfig) ClusterTemplate {
hgSize := "2Mi"

// BuildDynamicTemplate builds cluster layout (structure) from config, setting defaults for container
// counts and cores. Does not compute hugepages - use GetEnrichedTemplate for full ClusterTemplate
// with hugepages calculated.
func BuildDynamicTemplate(config *v1alpha1.WekaConfig) ClusterLayout {

Check failure on line 60 in internal/controllers/allocator/templates.go

View workflow job for this annotation

GitHub Actions / build

undefined: v1alpha1.WekaConfig
if config.DriveCores == 0 {
config.DriveCores = 1
}
Expand Down Expand Up @@ -97,51 +98,6 @@
config.DataServicesCores = 1
}

if config.DriveHugepages == 0 {
if config.NumDrives > 0 {
config.DriveHugepages = 1400*config.DriveCores + 200*config.NumDrives
} else {
config.DriveHugepages = 1600 * config.DriveCores

}
}

if config.DriveHugepagesOffset == 0 {
if config.NumDrives > 0 {
config.DriveHugepagesOffset = 200 * config.NumDrives
} else {
config.DriveHugepagesOffset = 200 * config.DriveCores
}
}

if config.ComputeHugepagesOffset == 0 {
config.ComputeHugepagesOffset = 200
}

if config.S3FrontendHugepages == 0 {
config.S3FrontendHugepages = 1400 * config.S3Cores
}

if config.NfsFrontendHugepages == 0 {
config.NfsFrontendHugepages = 1400 * config.NfsCores
}

if config.S3FrontendHugepagesOffset == 0 {
config.S3FrontendHugepagesOffset = 200
}

if config.NfsFrontendHugepagesOffset == 0 {
config.NfsFrontendHugepagesOffset = 200
}

if config.DataServicesHugepages == 0 {
config.DataServicesHugepages = 1536 // 1.5GB default
}

if config.DataServicesHugepagesOffset == 0 {
config.DataServicesHugepagesOffset = 200
}

if config.EnvoyCores == 0 {
config.EnvoyCores = 1
}
Expand All @@ -159,116 +115,86 @@
}
}

if config.ComputeHugepages == 0 {
var totalRawCapacityGiB int
if config.ContainerCapacity > 0 {
totalRawCapacityGiB = *config.DriveContainers * config.ContainerCapacity
} else if config.NumDrives > 0 && config.DriveCapacity > 0 {
totalRawCapacityGiB = *config.DriveContainers * config.NumDrives * config.DriveCapacity
}

capacityBased := 0
if *config.ComputeContainers > 0 && totalRawCapacityGiB > 0 {
tlcCapGiB, qlcCapGiB := v1alpha1.GetTlcQlcCapacity(totalRawCapacityGiB, config.DriveTypesRatio)
return ClusterLayout{
DriveCores: config.DriveCores,
DriveExtraCores: config.DriveExtraCores,
ComputeCores: config.ComputeCores,
ComputeExtraCores: config.ComputeExtraCores,
ComputeContainers: *config.ComputeContainers,
DriveContainers: *config.DriveContainers,
S3Containers: config.S3Containers,
S3Cores: config.S3Cores,
S3ExtraCores: config.S3ExtraCores,
NfsContainers: config.NfsContainers,
NumDrives: config.NumDrives,
DriveCapacity: config.DriveCapacity,
ContainerCapacity: config.ContainerCapacity,
DriveTypesRatio: config.DriveTypesRatio,
EnvoyCores: config.EnvoyCores,
NfsCores: config.NfsCores,
NfsExtraCores: config.NfsExtraCores,
DataServicesContainers: config.DataServicesContainers,
DataServicesCores: config.DataServicesCores,
DataServicesExtraCores: config.DataServicesExtraCores,
}

hugepagesTlcRatio := globalconfig.Config.DriveSharing.HugepagesTlcRatio
if hugepagesTlcRatio == 0 {
hugepagesTlcRatio = 1000
}
hugepagesQlcRatio := globalconfig.Config.DriveSharing.HugepagesQlcRatio
if hugepagesQlcRatio == 0 {
hugepagesQlcRatio = 6000
}
}

// Compute cluster-wide hugepages in MiB from TLC and QLC capacities
// Formula: (tlcGiB * 1024 / tlcRatio) + (qlcGiB * 1024 / qlcRatio)
// The *1024 converts GiB capacity to MiB hugepages before applying ratio
clusterHugepagesMiB := 0
if hugepagesTlcRatio > 0 {
clusterHugepagesMiB += tlcCapGiB * 1024 / hugepagesTlcRatio
}
if hugepagesQlcRatio > 0 {
clusterHugepagesMiB += qlcCapGiB * 1024 / hugepagesQlcRatio
}
// ComputeCapacityBasedHugepages calculates compute hugepages using TLC/QLC-aware capacity ratios.
func ComputeCapacityBasedHugepages(totalRawCapacityGiB, computeContainers, computeCores int, driveTypesRatio *v1alpha1.DriveTypesRatio) int {
capacityBased := 0
if computeContainers > 0 && totalRawCapacityGiB > 0 {
tlcCapGiB, qlcCapGiB := v1alpha1.GetTlcQlcCapacity(totalRawCapacityGiB, driveTypesRatio)

// Per-container capacity hugepages
capacityBased = clusterHugepagesMiB / *config.ComputeContainers
hugepagesTlcRatio := globalconfig.Config.DriveSharing.HugepagesTlcRatio
if hugepagesTlcRatio == 0 {
hugepagesTlcRatio = 1000
}
hugepagesQlcRatio := globalconfig.Config.DriveSharing.HugepagesQlcRatio
if hugepagesQlcRatio == 0 {
hugepagesQlcRatio = 6000
}

perCoreComponent := 1700 * config.ComputeCores
config.ComputeHugepages = capacityBased + perCoreComponent

minHugepages := 3000 * config.ComputeCores
config.ComputeHugepages = max(config.ComputeHugepages, minHugepages)
// Must be devidable by 2, ceil-ing up to nearest even number if not:
if config.ComputeHugepages%2 != 0 {
config.ComputeHugepages++
// Compute cluster-wide hugepages in MiB from TLC and QLC capacities
// Formula: (tlcGiB * 1024 / tlcRatio) + (qlcGiB * 1024 / qlcRatio)
clusterHugepagesMiB := 0
if hugepagesTlcRatio > 0 {
clusterHugepagesMiB += tlcCapGiB * 1024 / hugepagesTlcRatio
}
if hugepagesQlcRatio > 0 {
clusterHugepagesMiB += qlcCapGiB * 1024 / hugepagesQlcRatio
}
}

return ClusterTemplate{
ClusterLayout: ClusterLayout{
DriveCores: config.DriveCores,
DriveExtraCores: config.DriveExtraCores,
ComputeCores: config.ComputeCores,
ComputeExtraCores: config.ComputeExtraCores,
ComputeContainers: *config.ComputeContainers,
DriveContainers: *config.DriveContainers,
S3Containers: config.S3Containers,
S3Cores: config.S3Cores,
S3ExtraCores: config.S3ExtraCores,
NfsContainers: config.NfsContainers,
NumDrives: config.NumDrives,
DriveCapacity: config.DriveCapacity,
ContainerCapacity: config.ContainerCapacity,
DriveTypesRatio: config.DriveTypesRatio,
EnvoyCores: config.EnvoyCores,
NfsCores: config.NfsCores,
NfsExtraCores: config.NfsExtraCores,
DataServicesContainers: config.DataServicesContainers,
DataServicesCores: config.DataServicesCores,
DataServicesExtraCores: config.DataServicesExtraCores,
},
DriveHugepages: config.DriveHugepages,
DriveHugepagesOffset: config.DriveHugepagesOffset,
ComputeHugepages: config.ComputeHugepages,
ComputeHugepagesOffset: config.ComputeHugepagesOffset,
S3FrontendHugepages: config.S3FrontendHugepages,
S3FrontendHugepagesOffset: config.S3FrontendHugepagesOffset,
HugePageSize: hgSize,
NfsFrontendHugepages: config.NfsFrontendHugepages,
NfsFrontendHugepagesOffset: config.NfsFrontendHugepagesOffset,
DataServicesHugepages: config.DataServicesHugepages,
DataServicesHugepagesOffset: config.DataServicesHugepagesOffset,
capacityBased = clusterHugepagesMiB / computeContainers
}

}

// computeHugepagesForCompute calculates compute hugepages from cores and raw capacity.
func computeHugepagesForCompute(computeCores, totalRawCapacityGiB, computeContainers int) int {
capacityComponent := 0
if computeContainers > 0 && totalRawCapacityGiB > 0 {
capacityComponent = totalRawCapacityGiB / computeContainers
}
perCoreComponent := 1700 * computeCores
minHugepages := 3000 * computeCores
return max(capacityComponent+perCoreComponent, minHugepages)
hugepages := max(capacityBased+perCoreComponent, minHugepages)

// Must be divisible by 2, ceiling up to nearest even number if not
if hugepages%2 != 0 {
hugepages++
}

return hugepages
}

// GetTemplateByName returns the ClusterLayout (no hugepages) for the named template.
// Use GetEnrichedTemplate when hugepages fields are needed.
func GetTemplateByName(name string, cluster v1alpha1.WekaCluster) (ClusterLayout, bool) {
if name == "dynamic" {
return BuildDynamicTemplate(cluster.Spec.Dynamic).ClusterLayout, true
return BuildDynamicTemplate(cluster.Spec.Dynamic), true
}

template, ok := WekaClusterTemplates[name]
return template.ClusterLayout, ok
}

// GetEnrichedTemplate returns a full ClusterTemplate (including hugepages) for the named template.
// For dynamic templates, it enriches compute hugepages with actual drive capacity from node
// annotations when the spec doesn't provide capacity info (traditional drive mode).
// For dynamic templates, it computes all hugepages defaults for fields not explicitly set by the user.
// ComputeHugepages is capacity-based when drive capacity is available, otherwise falls back to minimum.
// For traditional mode without explicit capacity, it reads drive capacity from node annotations.
func GetEnrichedTemplate(ctx context.Context, k8sClient client.Client, name string, cluster v1alpha1.WekaCluster) (*ClusterTemplate, error) {
if name != "dynamic" {
template, ok := WekaClusterTemplates[name]
Expand All @@ -281,25 +207,111 @@
return nil, nil
}

tmpl := BuildDynamicTemplate(cluster.Spec.Dynamic)
layout := BuildDynamicTemplate(cluster.Spec.Dynamic)
spec := cluster.Spec.Dynamic

// Enrich compute hugepages with actual drive capacity from nodes
// when the spec doesn't provide capacity info (traditional drive mode)
if tmpl.ContainerCapacity == 0 && tmpl.DriveCapacity == 0 &&
tmpl.ComputeHugepages == 3000*tmpl.ComputeCores {
// Initialize template with layout and set hugepages defaults for fields not explicitly set by user
tmpl := &ClusterTemplate{
ClusterLayout: layout,
HugePageSize: "2Mi",
}

maxNodeCap, err := computeMaxNodeDriveCapacity(ctx, k8sClient, cluster.Spec.NodeSelector, tmpl.NumDrives)
if err != nil {
return nil, fmt.Errorf("failed to compute node drive capacity: %w", err)
// Drive hugepages
if spec.DriveHugepages == 0 {
if layout.NumDrives > 0 {
tmpl.DriveHugepages = 1400*layout.DriveCores + 200*layout.NumDrives
} else {
tmpl.DriveHugepages = 1600 * layout.DriveCores
}
if maxNodeCap > 0 {
totalRawCapacity := tmpl.DriveContainers * maxNodeCap
tmpl.ComputeHugepages = computeHugepagesForCompute(
tmpl.ComputeCores, totalRawCapacity, tmpl.ComputeContainers)
} else {
tmpl.DriveHugepages = spec.DriveHugepages
}

if spec.DriveHugepagesOffset == 0 {
if layout.NumDrives > 0 {
tmpl.DriveHugepagesOffset = 200 * layout.NumDrives
} else {
tmpl.DriveHugepagesOffset = 200 * layout.DriveCores
}
} else {
tmpl.DriveHugepagesOffset = spec.DriveHugepagesOffset
}

// Compute hugepages (capacity-based)
if spec.ComputeHugepages == 0 {
var totalRawCapacityGiB int
if layout.ContainerCapacity > 0 {
// Drive-sharing mode: capacity per container × number of drive containers
totalRawCapacityGiB = layout.DriveContainers * layout.ContainerCapacity
} else if layout.NumDrives > 0 && layout.DriveCapacity > 0 {
// Traditional mode with explicit drive capacity in spec
totalRawCapacityGiB = layout.DriveContainers * layout.NumDrives * layout.DriveCapacity
} else if layout.NumDrives > 0 {
// Traditional mode without capacity in spec: read from node annotations
maxNodeCap, err := computeMaxNodeDriveCapacity(ctx, k8sClient, cluster.Spec.NodeSelector, layout.NumDrives)
if err != nil {
return nil, fmt.Errorf("failed to compute node drive capacity: %w", err)
}
totalRawCapacityGiB = layout.DriveContainers * maxNodeCap
}

if totalRawCapacityGiB > 0 {
tmpl.ComputeHugepages = ComputeCapacityBasedHugepages(
totalRawCapacityGiB, layout.ComputeContainers, layout.ComputeCores, layout.DriveTypesRatio)
} else {
// Fallback minimum when capacity is unknown
tmpl.ComputeHugepages = 3000 * layout.ComputeCores
}
} else {
tmpl.ComputeHugepages = spec.ComputeHugepages
}

if spec.ComputeHugepagesOffset == 0 {
tmpl.ComputeHugepagesOffset = 200
} else {
tmpl.ComputeHugepagesOffset = spec.ComputeHugepagesOffset
}

// S3 frontend hugepages
if spec.S3FrontendHugepages == 0 {
tmpl.S3FrontendHugepages = 1400 * layout.S3Cores
} else {
tmpl.S3FrontendHugepages = spec.S3FrontendHugepages
}

if spec.S3FrontendHugepagesOffset == 0 {
tmpl.S3FrontendHugepagesOffset = 200
} else {
tmpl.S3FrontendHugepagesOffset = spec.S3FrontendHugepagesOffset
}

// NFS frontend hugepages
if spec.NfsFrontendHugepages == 0 {
tmpl.NfsFrontendHugepages = 1400 * layout.NfsCores
} else {
tmpl.NfsFrontendHugepages = spec.NfsFrontendHugepages
}

if spec.NfsFrontendHugepagesOffset == 0 {
tmpl.NfsFrontendHugepagesOffset = 200
} else {
tmpl.NfsFrontendHugepagesOffset = spec.NfsFrontendHugepagesOffset
}

// Data services hugepages
if spec.DataServicesHugepages == 0 {
tmpl.DataServicesHugepages = 1536 // 1.5GB default
} else {
tmpl.DataServicesHugepages = spec.DataServicesHugepages
}

if spec.DataServicesHugepagesOffset == 0 {
tmpl.DataServicesHugepagesOffset = 200
} else {
tmpl.DataServicesHugepagesOffset = spec.DataServicesHugepagesOffset
}

return &tmpl, nil
return tmpl, nil
}

var WekaClusterTemplates = map[string]ClusterTemplate{
Expand Down
Loading
Loading