Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions charts/weka-operator/templates/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,18 @@ spec:
value: "{{ .Values.hugepagesQlcRatio }}"
- name: SSD_PROXY_HUGEPAGES_OFFSET_MIB
value: "{{ .Values.driveSharing.ssdProxy.hugepagesOffsetMiB }}"
- name: HUGEPAGES_UPDATE_COMPUTE
value: "{{ .Values.hugepagesUpdate.compute }}"
- name: HUGEPAGES_UPDATE_DRIVE
value: "{{ .Values.hugepagesUpdate.drive }}"
- name: HUGEPAGES_UPDATE_S3
value: "{{ .Values.hugepagesUpdate.s3 }}"
- name: HUGEPAGES_UPDATE_NFS
value: "{{ .Values.hugepagesUpdate.nfs }}"
- name: HUGEPAGES_UPDATE_DATA_SERVICES
value: "{{ .Values.hugepagesUpdate.dataServices }}"
- name: COMPUTE_MAX_HUGEPAGES_MIB
value: "{{ .Values.computeMaxHugepagesMiB }}"
- name: PORT_ALLOCATION_STARTING_PORT
value: "{{ .Values.portAllocation.startingPort }}"
- name: LOG_FORMAT
Expand Down
6 changes: 6 additions & 0 deletions charts/weka-operator/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
"required": ["tlc", "qlc"]
}
}
},
"computeMaxHugepagesMiB": {
"type": "integer",
"minimum": 0,
"multipleOf": 2,
"description": "Maximum hugepages (MiB) for compute containers. Must be even. 0 = no cap."
}
}
}
15 changes: 15 additions & 0 deletions charts/weka-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,21 @@ driveSharing:
hugepagesTlcRatio: 1000
hugepagesQlcRatio: 6000

# Hugepages update propagation to existing containers.
# When enabled per container type, the operator recalculates and propagates
# hugepages values to existing containers during HandleSpecUpdates.
# All disabled by default.
hugepagesUpdate:
compute: false
drive: false
s3: false
nfs: false
dataServices: false

# Maximum hugepages (MiB) for compute containers. 0 = no cap.
# Applied during both initial container creation and hugepages updates.
computeMaxHugepagesMiB: 360000

# Port allocation configuration
portAllocation:
# Starting port for Weka container port allocation.
Expand Down
51 changes: 43 additions & 8 deletions internal/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,31 @@ type NfsConfig struct {
NotifyPort int
}

// HugepagesUpdateConfig holds per-role feature flags controlling whether the
// operator recalculates and propagates hugepages values to existing containers.
type HugepagesUpdateConfig struct {
	Compute      bool
	Drive        bool
	S3           bool
	Nfs          bool
	DataServices bool
}

// IsEnabledForRole reports whether hugepages update propagation is enabled for
// the given container role name. Unknown roles are always treated as disabled.
func (h *HugepagesUpdateConfig) IsEnabledForRole(role string) bool {
	enabled := false
	switch role {
	case "compute":
		enabled = h.Compute
	case "drive":
		enabled = h.Drive
	case "s3":
		enabled = h.S3
	case "nfs":
		enabled = h.Nfs
	case "data-services":
		enabled = h.DataServices
	}
	return enabled
}

type DriveSharingConfig struct {
DriveTypesRatio v1alpha1.DriveTypesRatio
MaxVirtualDrivesPerCore int
Expand Down Expand Up @@ -231,14 +256,16 @@ var Config struct {
EvictedPodCleanupEnabled bool
EvictedPodCleanupInterval time.Duration

BuilderImages BuilderImagesConfig
Csi EmbeddedCsiSettings
SyslogPackage string
Proxy string
PriorityClasses PriorityClasses
Nfs NfsConfig
DriveSharing DriveSharingConfig
PortAllocation PortAllocationConfig
BuilderImages BuilderImagesConfig
Csi EmbeddedCsiSettings
SyslogPackage string
Proxy string
PriorityClasses PriorityClasses
Nfs NfsConfig
DriveSharing DriveSharingConfig
PortAllocation PortAllocationConfig
HugepagesUpdate HugepagesUpdateConfig
ComputeMaxHugepagesMiB int
}

type NodeAgentRequestsTimeouts struct {
Expand Down Expand Up @@ -460,6 +487,14 @@ func ConfigureEnv(ctx context.Context) {
// Port allocation configuration
Config.PortAllocation.StartingPort = getIntEnvOrDefault("PORT_ALLOCATION_STARTING_PORT", 35000)

// Hugepages update propagation configuration
Config.HugepagesUpdate.Compute = getBoolEnvOrDefault("HUGEPAGES_UPDATE_COMPUTE", false)
Config.HugepagesUpdate.Drive = getBoolEnvOrDefault("HUGEPAGES_UPDATE_DRIVE", false)
Config.HugepagesUpdate.S3 = getBoolEnvOrDefault("HUGEPAGES_UPDATE_S3", false)
Config.HugepagesUpdate.Nfs = getBoolEnvOrDefault("HUGEPAGES_UPDATE_NFS", false)
Config.HugepagesUpdate.DataServices = getBoolEnvOrDefault("HUGEPAGES_UPDATE_DATA_SERVICES", false)
Config.ComputeMaxHugepagesMiB = getIntEnvOrDefault("COMPUTE_MAX_HUGEPAGES_MIB", 360000)

// Evicted pod cleanup configuration
Config.EvictedPodCleanupEnabled = getBoolEnvOrDefault("EVICTED_POD_CLEANUP_ENABLED", true)
Config.EvictedPodCleanupInterval = getDurationEnvOrDefault("EVICTED_POD_CLEANUP_INTERVAL", 2*time.Minute)
Expand Down
80 changes: 68 additions & 12 deletions internal/controllers/allocator/hugepages.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (
globalconfig "github.com/weka/weka-operator/internal/config"
)

func calculateDriveHugepages(template ClusterTemplate) int {
// CalculateDriveHugepages returns the hugepages budget (MiB) for a drive
// container. With an explicit drive count (NumDrives > 0) it allocates
// 1400 MiB per drive core plus 200 MiB per drive; otherwise it uses a flat
// 1600 MiB per drive core.
func CalculateDriveHugepages(template ClusterTemplate) int {
	if template.NumDrives <= 0 {
		return 1600 * template.Cores.Drive
	}
	return 1400*template.Cores.Drive + 200*template.NumDrives
}

func calculateDriveHugepagesOffset(template ClusterTemplate) int {
func CalculateDriveHugepagesOffset(template ClusterTemplate) int {
if template.NumDrives > 0 {
return 200 * template.NumDrives
} else {
Expand All @@ -29,7 +29,7 @@ func calculateDriveHugepagesOffset(template ClusterTemplate) int {
}

// Compute hugepages (capacity-based)
func calculateDynamicComputeHugepages(ctx context.Context, k8sClient client.Client, template ClusterTemplate, cluster *weka.WekaCluster) (hp int, err error) {
func calculateDynamicComputeHugepages(ctx context.Context, k8sClient client.Client, template ClusterTemplate, cluster *weka.WekaCluster, containers []*weka.WekaContainer) (hp int, err error) {
var totalRawCapacityGiB int

if template.ContainerCapacity > 0 {
Expand All @@ -39,15 +39,22 @@ func calculateDynamicComputeHugepages(ctx context.Context, k8sClient client.Clie
// Drive-sharing mode with explicit drive count and capacity
totalRawCapacityGiB = template.NumDrives * template.DriveCapacity * template.Containers.Drive
} else if template.Containers.Drive > 0 {
// Traditional mode without capacity in spec: read from node annotations
driveNodeSelector := cluster.GetNodeSelectorForRole(weka.WekaContainerModeDrive)

maxNodeCap, err := computeMaxNodeDriveCapacity(ctx, k8sClient, driveNodeSelector, template.NumDrives)
if err != nil {
return 0, fmt.Errorf("failed to compute node drive capacity: %w", err)
// Traditional mode without capacity in spec: try ready-cluster path first, fall back to node annotations
readyClusterCap, readyErr := computeMaxNodeDriveCapacityForReadyCluster(containers, template.Containers.Drive, template.NumDrives)
if readyErr == nil {
// Ready-cluster path succeeded: result is already the total capacity
totalRawCapacityGiB = readyClusterCap
} else {
// Fall back to node-annotation sampling
driveNodeSelector := cluster.GetNodeSelectorForRole(weka.WekaContainerModeDrive)

maxNodeCap, err := computeMaxNodeDriveCapacityForInitCluster(ctx, k8sClient, driveNodeSelector, template.Containers.Drive, template.NumDrives)
if err != nil {
return 0, fmt.Errorf("failed to compute node drive capacity: %w", err)
}

totalRawCapacityGiB = template.Containers.Drive * maxNodeCap
}

totalRawCapacityGiB = template.Containers.Drive * maxNodeCap
} else {
return 0, errors.New("either containerCapacity or numDrives must be specified for dynamic template")
}
Expand Down Expand Up @@ -104,9 +111,58 @@ func ComputeCapacityBasedHugepages(ctx context.Context, totalRawCapacityGiB, com
hugepages++
}

logger.Info("Calculated compute hugepages",
// Apply max cap if configured (must be even — validated at Helm level)
maxMiB := globalconfig.Config.ComputeMaxHugepagesMiB
if maxMiB > 0 && hugepages > maxMiB {
hugepages = maxMiB
}

logger.Debug("Calculated compute hugepages",
"totalRawCapacityGiB", totalRawCapacityGiB,
"hugepages", hugepages)

return hugepages
}

// minSampledDriveContainers is the minimum number of fully-reporting drive
// containers required before the sampled average is considered trustworthy.
const minSampledDriveContainers = 5

// ComputeTotalCapacityFromContainers estimates the cluster's total raw drive
// capacity (GiB) from the AddedDrives status of running drive containers.
//
// A container is sampled only when every drive it reports is ACTIVE with a
// non-zero size, and its drive count equals numDrives — partial reports would
// otherwise drag the average down. The per-container average of the sampled
// set is extrapolated to numDriveContainers. Returns an error when fewer than
// minSampledDriveContainers usable containers are found.
func ComputeTotalCapacityFromContainers(containers []*weka.WekaContainer, numDriveContainers, numDrives int) (int, error) {
	var sampledCapacityBytes int64
	sampledContainers := 0
	for _, c := range containers {
		if !c.IsDriveContainer() || len(c.Status.AddedDrives) == 0 {
			continue
		}
		containerBytes := int64(0)
		usableDrives := 0
		allDrivesUsable := true
		for _, drive := range c.Status.AddedDrives {
			// A zero size or non-ACTIVE status means this container's report
			// is incomplete; skip the whole container rather than undercount.
			if drive.SizeBytes == 0 || drive.Status != "ACTIVE" {
				allDrivesUsable = false
				break
			}
			containerBytes += drive.SizeBytes
			usableDrives++
		}
		if !allDrivesUsable {
			continue
		}
		// Only count containers with the expected number of drives, to avoid
		// underestimating capacity due to some containers not reporting all
		// drive sizes yet.
		if usableDrives != numDrives {
			continue
		}

		sampledCapacityBytes += containerBytes
		sampledContainers++
	}
	if sampledContainers < minSampledDriveContainers {
		return 0, fmt.Errorf("not enough drive containers with valid AddedDrives capacity (found %d, need at least %d)", sampledContainers, minSampledDriveContainers)
	}
	avgPerContainer := sampledCapacityBytes / int64(sampledContainers)
	totalRawBytes := avgPerContainer * int64(numDriveContainers)
	return int(totalRawBytes / (1024 * 1024 * 1024)), nil
}

// computeMaxNodeDriveCapacityForReadyCluster estimates the cluster's total raw
// drive capacity (GiB) for a cluster that is already Ready. Unlike the
// init-cluster path, it does not fetch nodes via nodeSelector and read capacity
// annotations: real capacity info is taken directly from the running drive
// containers' status. Returns an error when too few containers report usable
// drive data; the caller then falls back to node-annotation sampling.
func computeMaxNodeDriveCapacityForReadyCluster(containers []*weka.WekaContainer, numDriveContainers, numDrives int) (int, error) {
	return ComputeTotalCapacityFromContainers(containers, numDriveContainers, numDrives)
}
134 changes: 134 additions & 0 deletions internal/controllers/allocator/hugepages_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package allocator

import (
"context"
"testing"

globalconfig "github.com/weka/weka-operator/internal/config"
)

// TestComputeCapacityBasedHugepages_MaxCap verifies that the configured
// ComputeMaxHugepagesMiB cap clamps the capacity-based hugepages result only
// when the uncapped value exceeds it (0 means no cap).
func TestComputeCapacityBasedHugepages_MaxCap(t *testing.T) {
	// Known scenario: containerCapacity=5000 GiB, 6 drive containers,
	// 6 compute containers, 1 compute core each. All-TLC:
	// 30000*1024/1000 = 30720 cluster MiB, /6 = 5120, +1700 = 6820.
	const (
		totalRawCapacityGiB = 5000 * 6 // 30000
		computeContainers   = 6
		computeCores        = 1
	)

	cases := []struct {
		name   string
		maxCap int
		want   int
	}{
		{name: "no cap (0)", maxCap: 0, want: 6820},
		{name: "cap above result", maxCap: 500000, want: 6820},
		{name: "cap below result, even", maxCap: 5000, want: 5000},
		{name: "cap exactly equals result", maxCap: 6820, want: 6820},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			saved := globalconfig.Config.ComputeMaxHugepagesMiB
			t.Cleanup(func() { globalconfig.Config.ComputeMaxHugepagesMiB = saved })
			globalconfig.Config.ComputeMaxHugepagesMiB = tc.maxCap

			got := ComputeCapacityBasedHugepages(
				context.Background(), totalRawCapacityGiB, computeContainers, computeCores, nil,
			)
			if got != tc.want {
				t.Errorf("ComputeCapacityBasedHugepages() = %d, want %d", got, tc.want)
			}
		})
	}
}

func TestCalculateDriveHugepages(t *testing.T) {
tests := []struct {
name string
numDrives int
driveCores int
expected int
}{
{
name: "traditional mode (NumDrives > 0)",
numDrives: 4,
driveCores: 2,
expected: 1400*2 + 200*4, // 3600
},
{
name: "drive-sharing mode (NumDrives == 0)",
numDrives: 0,
driveCores: 2,
expected: 1600 * 2, // 3200
},
{
name: "traditional, single core, single drive",
numDrives: 1,
driveCores: 1,
expected: 1400 + 200, // 1600
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
template := ClusterTemplate{
NumDrives: tt.numDrives,
Cores: IntPerWekaRole{Drive: tt.driveCores},
}
got := CalculateDriveHugepages(template)
if got != tt.expected {
t.Errorf("expected %d, got %d", tt.expected, got)
}
})
}
}

func TestCalculateDriveHugepagesOffset(t *testing.T) {
tests := []struct {
name string
numDrives int
driveCores int
expected int
}{
{
name: "traditional mode (NumDrives > 0)",
numDrives: 4,
driveCores: 2,
expected: 200 * 4, // 800
},
{
name: "drive-sharing mode (NumDrives == 0)",
numDrives: 0,
driveCores: 2,
expected: 200 * 2, // 400
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
template := ClusterTemplate{
NumDrives: tt.numDrives,
Cores: IntPerWekaRole{Drive: tt.driveCores},
}
got := CalculateDriveHugepagesOffset(template)
if got != tt.expected {
t.Errorf("expected %d, got %d", tt.expected, got)
}
})
}
}
Loading
Loading