diff --git a/charts/weka-operator/resources/weka_runtime.py b/charts/weka-operator/resources/weka_runtime.py index 7ecadc666..650d6193f 100644 --- a/charts/weka-operator/resources/weka_runtime.py +++ b/charts/weka-operator/resources/weka_runtime.py @@ -33,6 +33,7 @@ class Disk: path: str is_mounted: bool serial_id: Optional[str] + capacity_gib: Optional[int] = None MODE = os.environ.get("MODE") @@ -454,12 +455,24 @@ def has_mountpoint(device_info: dict) -> bool: if has_mountpoint(child): return True return False + + async def get_capacity_gib(device_path: str) -> int: + """Get the capacity of the device in GiB.""" + cmd = f"blockdev --getsize64 {device_path}" + stdout, stderr, ec = await run_command(cmd, capture_stdout=True) + if ec == 0: + size_bytes = int(stdout.decode().strip()) + return size_bytes // (1024 ** 3) # Convert to GiB + else: + raise Exception(f"Failed to get capacity for {device_path}: {stderr.decode()}") for device in data.get("blockdevices", []): if device.get("type") == "disk": is_mounted = has_mountpoint(device) serial_id = device.get("serial") device_path = device["name"] + capacity_gib = await get_capacity_gib(device_path) + if not serial_id: logging.warning(f"lsblk did not return serial for {device_path}. Using fallback.") serial_id = await get_serial_id_fallback(device_path) @@ -469,7 +482,7 @@ def has_mountpoint(device_info: dict) -> bool: serial_id = await get_serial_id_cos_specific(device_name) logging.info(f"Found drive: {device_path}, mounted: {is_mounted}, serial: {serial_id}") - disks.append(Disk(path=device_path, is_mounted=is_mounted, serial_id=serial_id)) + disks.append(Disk(path=device_path, is_mounted=is_mounted, serial_id=serial_id, capacity_gib=capacity_gib)) return disks diff --git a/internal/controllers/allocator/allocator.go b/internal/controllers/allocator/allocator.go index a39ff6f15..011c375b8 100644 --- a/internal/controllers/allocator/allocator.go +++ b/internal/controllers/allocator/allocator.go @@ -111,7 +111,8 @@ type Allocator interface { } type AllocatorNodeInfo struct { - AvailableDrives []string + // AvailableDrives contains available (non-blocked) drives for non-proxy mode. + AvailableDrives []domain.DriveEntry // SharedDrives contains shared drive information for drive sharing mode (proxy mode) // Empty if node doesn't have shared drives or is using non-proxy mode SharedDrives []domain.SharedDriveInfo diff --git a/internal/controllers/allocator/container_allocator.go b/internal/controllers/allocator/container_allocator.go index 25ecec54d..903d466c7 100644 --- a/internal/controllers/allocator/container_allocator.go +++ b/internal/controllers/allocator/container_allocator.go @@ -184,11 +184,11 @@ func (a *ContainerResourceAllocator) getAvailableDrivesFromStatus(ctx context.Co allDrives := nodeInfo.AvailableDrives logger.Debug("Found drives on node", "total", len(allDrives)) - // Filter out allocated drives + // Filter out allocated drives (keyed by serial) availableDrives := []string{} for _, drive := range allDrives { - if !allocatedDrives[drive] { - availableDrives = append(availableDrives, drive) + if !allocatedDrives[drive.Serial] { + availableDrives = append(availableDrives, drive.Serial) } } diff --git a/internal/controllers/allocator/node_info.go b/internal/controllers/allocator/node_info.go index 46506984f..bb1bdc0cc 100644 --- a/internal/controllers/allocator/node_info.go +++ b/internal/controllers/allocator/node_info.go @@ -12,6 +12,7 @@ import ( "github.com/weka/weka-operator/internal/consts" "github.com/weka/weka-operator/internal/pkg/domain" + "github.com/weka/weka-operator/internal/services/kubernetes" ) type NodeInfoGetter func(ctx context.Context, nodeName weka.NodeName) (*AllocatorNodeInfo, error) @@ -46,17 +47,19 @@ func NewK8sNodeInfoGetter(k8sClient client.Client) NodeInfoGetter { return } - availableDrives := []string{} - allDrives := []string{} - err = json.Unmarshal([]byte(allDrivesStr), &allDrives) + // Parse as new []DriveEntry format only — old []string format is an error + // (sign-drives will convert old format on its next run) + var allEntries []domain.DriveEntry + err = json.Unmarshal([]byte(allDrivesStr), &allEntries) if err != nil { - err = fmt.Errorf("failed to unmarshal weka-drives: %v", err) + err = fmt.Errorf("failed to unmarshal weka-drives as DriveEntry format (old format pending migration): %v", err) return } - for _, drive := range allDrives { - if !slices.Contains(blockedDriveSerials, drive) { - availableDrives = append(availableDrives, drive) + availableDrives := make([]domain.DriveEntry, 0, len(allEntries)) + for _, entry := range allEntries { + if !slices.Contains(blockedDriveSerials, entry.Serial) { + availableDrives = append(availableDrives, entry) } } @@ -64,7 +67,7 @@ func NewK8sNodeInfoGetter(k8sClient client.Client) NodeInfoGetter { } else { // No exclusive drives annotation - set empty list // This is expected in drive-sharing/proxy mode where we only use shared drives - nodeInfo.AvailableDrives = []string{} + nodeInfo.AvailableDrives = []domain.DriveEntry{} } var sharedDrives []domain.SharedDriveInfo @@ -95,6 +98,51 @@ func NewK8sNodeInfoGetter(k8sClient client.Client) NodeInfoGetter { } } +const maxNodeSample = 3 + +// computeMaxNodeDriveCapacity samples up to maxNodeSample nodes matching the selector, +// computes the top-numDrives capacity sum per node, and returns the maximum. +// This represents the worst-case (most memory) capacity a single drive container could manage. +func computeMaxNodeDriveCapacity(ctx context.Context, k8sClient client.Client, nodeSelector map[string]string, numDrives int) (int, error) { + kubeService := kubernetes.NewKubeService(k8sClient) + nodes, err := kubeService.GetNodes(ctx, nodeSelector) + if err != nil { + return 0, err + } + + sampleSize := min(len(nodes), maxNodeSample) + + maxCapacity := 0 + for i := 0; i < sampleSize; i++ { + node := nodes[i] + drivesStr, ok := node.Annotations[consts.AnnotationWekaDrives] + if !ok || drivesStr == "" { + continue + } + entries, _, err := domain.ParseDriveEntries(drivesStr) + if err != nil { + continue + } + + // Sort capacities descending, take top numDrives + capacities := make([]int, 0, len(entries)) + for _, e := range entries { + if e.CapacityGiB > 0 { + capacities = append(capacities, e.CapacityGiB) + } + } + slices.SortFunc(capacities, func(a, b int) int { return b - a }) // descending + + nodeSum := 0 + for j := 0; j < min(numDrives, len(capacities)); j++ { + nodeSum += capacities[j] + } + maxCapacity = max(maxCapacity, nodeSum) + } + + return maxCapacity, nil +} + // filterBlockedSharedDrives removes blocked drives from the list // blockedUUIDs is a list of virtual UUIDs that are blocked (via shared drive annotation or drive serials) func filterBlockedSharedDrives(drives []domain.SharedDriveInfo, blockedDrivePhysicalUUIDs, blockedDriveSerials []string) []domain.SharedDriveInfo { diff --git a/internal/controllers/allocator/templates.go b/internal/controllers/allocator/templates.go index 6635495b8..4fd78a1f8 100644 --- a/internal/controllers/allocator/templates.go +++ b/internal/controllers/allocator/templates.go @@ -1,30 +1,45 @@ package allocator import ( + "context" + "fmt" + "github.com/weka/weka-k8s-api/api/v1alpha1" + "sigs.k8s.io/controller-runtime/pkg/client" globalconfig "github.com/weka/weka-operator/internal/config" "github.com/weka/weka-operator/pkg/util" ) +// ClusterLayout contains container counts, cores, and drive configuration fields. +// It is returned by GetTemplateByName for callers that don't need hugepages. +type ClusterLayout struct { + DriveCores int + DriveExtraCores int + ComputeCores int + ComputeExtraCores int + EnvoyCores int + S3Cores int + S3ExtraCores int + NfsCores int + NfsExtraCores int + ComputeContainers int + DriveContainers int + S3Containers int + NfsContainers int + NumDrives int + DriveCapacity int + ContainerCapacity int + DriveTypesRatio *v1alpha1.DriveTypesRatio + DataServicesCores int + DataServicesExtraCores int + DataServicesContainers int +} + +// ClusterTemplate embeds ClusterLayout and adds hugepages fields. +// It is returned by GetEnrichedTemplate for the container creation path. type ClusterTemplate struct { - DriveCores int - DriveExtraCores int - ComputeCores int - ComputeExtraCores int - EnvoyCores int - S3Cores int - S3ExtraCores int - NfsCores int - NfsExtraCores int - ComputeContainers int - DriveContainers int - S3Containers int - NfsContainers int - NumDrives int - DriveCapacity int - ContainerCapacity int - DriveTypesRatio *v1alpha1.DriveTypesRatio + ClusterLayout DriveHugepages int DriveHugepagesOffset int ComputeHugepages int @@ -35,9 +50,6 @@ type ClusterTemplate struct { S3FrontendHugepagesOffset int NfsFrontendHugepages int NfsFrontendHugepagesOffset int - DataServicesCores int - DataServicesExtraCores int - DataServicesContainers int DataServicesHugepages int DataServicesHugepagesOffset int } @@ -195,20 +207,28 @@ func BuildDynamicTemplate(config *v1alpha1.WekaConfig) ClusterTemplate { } return ClusterTemplate{ - DriveCores: config.DriveCores, - DriveExtraCores: config.DriveExtraCores, - ComputeCores: config.ComputeCores, - ComputeExtraCores: config.ComputeExtraCores, - ComputeContainers: *config.ComputeContainers, - DriveContainers: *config.DriveContainers, - S3Containers: config.S3Containers, - S3Cores: config.S3Cores, - S3ExtraCores: config.S3ExtraCores, - NfsContainers: config.NfsContainers, - NumDrives: config.NumDrives, - DriveCapacity: config.DriveCapacity, - ContainerCapacity: config.ContainerCapacity, - DriveTypesRatio: config.DriveTypesRatio, + ClusterLayout: ClusterLayout{ + DriveCores: config.DriveCores, + DriveExtraCores: config.DriveExtraCores, + ComputeCores: config.ComputeCores, + ComputeExtraCores: config.ComputeExtraCores, + ComputeContainers: *config.ComputeContainers, + DriveContainers: *config.DriveContainers, + S3Containers: config.S3Containers, + S3Cores: config.S3Cores, + S3ExtraCores: config.S3ExtraCores, + NfsContainers: config.NfsContainers, + NumDrives: config.NumDrives, + DriveCapacity: config.DriveCapacity, + ContainerCapacity: config.ContainerCapacity, + DriveTypesRatio: config.DriveTypesRatio, + EnvoyCores: config.EnvoyCores, + NfsCores: config.NfsCores, + NfsExtraCores: config.NfsExtraCores, + DataServicesContainers: config.DataServicesContainers, + DataServicesCores: config.DataServicesCores, + DataServicesExtraCores: config.DataServicesExtraCores, + }, DriveHugepages: config.DriveHugepages, DriveHugepagesOffset: config.DriveHugepagesOffset, ComputeHugepages: config.ComputeHugepages, @@ -216,81 +236,132 @@ func BuildDynamicTemplate(config *v1alpha1.WekaConfig) ClusterTemplate { S3FrontendHugepages: config.S3FrontendHugepages, S3FrontendHugepagesOffset: config.S3FrontendHugepagesOffset, HugePageSize: hgSize, - EnvoyCores: config.EnvoyCores, - NfsCores: config.NfsCores, - NfsExtraCores: config.NfsExtraCores, NfsFrontendHugepages: config.NfsFrontendHugepages, NfsFrontendHugepagesOffset: config.NfsFrontendHugepagesOffset, - DataServicesContainers: config.DataServicesContainers, - DataServicesCores: config.DataServicesCores, - DataServicesExtraCores: config.DataServicesExtraCores, DataServicesHugepages: config.DataServicesHugepages, DataServicesHugepagesOffset: config.DataServicesHugepagesOffset, } } -func GetTemplateByName(name string, cluster v1alpha1.WekaCluster) (ClusterTemplate, bool) { +// computeHugepagesForCompute calculates compute hugepages from cores and raw capacity. +func computeHugepagesForCompute(computeCores, totalRawCapacityGiB, computeContainers int) int { + capacityComponent := 0 + if computeContainers > 0 && totalRawCapacityGiB > 0 { + capacityComponent = totalRawCapacityGiB / computeContainers + } + perCoreComponent := 1700 * computeCores + minHugepages := 3000 * computeCores + return max(capacityComponent+perCoreComponent, minHugepages) +} + +// GetTemplateByName returns the ClusterLayout (no hugepages) for the named template. +// Use GetEnrichedTemplate when hugepages fields are needed. +func GetTemplateByName(name string, cluster v1alpha1.WekaCluster) (ClusterLayout, bool) { if name == "dynamic" { - return BuildDynamicTemplate(cluster.Spec.Dynamic), true + return BuildDynamicTemplate(cluster.Spec.Dynamic).ClusterLayout, true } template, ok := WekaClusterTemplates[name] - return template, ok + return template.ClusterLayout, ok +} + +// GetEnrichedTemplate returns a full ClusterTemplate (including hugepages) for the named template. +// For dynamic templates, it enriches compute hugepages with actual drive capacity from node +// annotations when the spec doesn't provide capacity info (traditional drive mode). +func GetEnrichedTemplate(ctx context.Context, k8sClient client.Client, name string, cluster v1alpha1.WekaCluster) (*ClusterTemplate, error) { + if name != "dynamic" { + template, ok := WekaClusterTemplates[name] + if !ok { + return nil, nil + } + return &template, nil + } + if cluster.Spec.Dynamic == nil { + return nil, nil + } + + tmpl := BuildDynamicTemplate(cluster.Spec.Dynamic) + + // Enrich compute hugepages with actual drive capacity from nodes + // when the spec doesn't provide capacity info (traditional drive mode) + if tmpl.ContainerCapacity == 0 && tmpl.DriveCapacity == 0 && + tmpl.ComputeHugepages == 3000*tmpl.ComputeCores { + + maxNodeCap, err := computeMaxNodeDriveCapacity(ctx, k8sClient, cluster.Spec.NodeSelector, tmpl.NumDrives) + if err != nil { + return nil, fmt.Errorf("failed to compute node drive capacity: %w", err) + } + if maxNodeCap > 0 { + totalRawCapacity := tmpl.DriveContainers * maxNodeCap + tmpl.ComputeHugepages = computeHugepagesForCompute( + tmpl.ComputeCores, totalRawCapacity, tmpl.ComputeContainers) + } + } + + return &tmpl, nil } var WekaClusterTemplates = map[string]ClusterTemplate{ "small_s3": { - DriveCores: 1, - ComputeCores: 1, - ComputeContainers: 6, - DriveContainers: 6, - S3Containers: 6, - S3Cores: 1, - S3ExtraCores: 1, - NumDrives: 1, + ClusterLayout: ClusterLayout{ + DriveCores: 1, + ComputeCores: 1, + ComputeContainers: 6, + DriveContainers: 6, + S3Containers: 6, + S3Cores: 1, + S3ExtraCores: 1, + NumDrives: 1, + EnvoyCores: 1, + }, DriveHugepages: 1500, ComputeHugepages: 3000, S3FrontendHugepages: 1400, HugePageSize: "2Mi", - EnvoyCores: 1, }, "small": { - DriveCores: 1, - ComputeCores: 1, - ComputeContainers: 6, - DriveContainers: 6, - NumDrives: 1, - DriveHugepages: 1500, - ComputeHugepages: 3000, - HugePageSize: "2Mi", - EnvoyCores: 1, + ClusterLayout: ClusterLayout{ + DriveCores: 1, + ComputeCores: 1, + ComputeContainers: 6, + DriveContainers: 6, + NumDrives: 1, + EnvoyCores: 1, + }, + DriveHugepages: 1500, + ComputeHugepages: 3000, + HugePageSize: "2Mi", }, "large": { - DriveCores: 1, - ComputeCores: 1, - ComputeContainers: 20, - DriveContainers: 20, - NumDrives: 1, - DriveHugepages: 1500, - ComputeHugepages: 3000, - HugePageSize: "2Mi", - EnvoyCores: 2, - S3Containers: 5, - S3Cores: 1, - S3ExtraCores: 2, + ClusterLayout: ClusterLayout{ + DriveCores: 1, + ComputeCores: 1, + ComputeContainers: 20, + DriveContainers: 20, + NumDrives: 1, + EnvoyCores: 2, + S3Containers: 5, + S3Cores: 1, + S3ExtraCores: 2, + }, + DriveHugepages: 1500, + ComputeHugepages: 3000, + HugePageSize: "2Mi", }, "small_nfs": { - DriveCores: 1, - ComputeCores: 1, - ComputeContainers: 6, - DriveContainers: 6, - NfsContainers: 2, - NumDrives: 1, + ClusterLayout: ClusterLayout{ + DriveCores: 1, + ComputeCores: 1, + ComputeContainers: 6, + DriveContainers: 6, + NfsContainers: 2, + NumDrives: 1, + EnvoyCores: 1, + }, DriveHugepages: 1500, ComputeHugepages: 3000, NfsFrontendHugepages: 1200, HugePageSize: "2Mi", - EnvoyCores: 1, }, } diff --git a/internal/controllers/allocator/templates_test.go b/internal/controllers/allocator/templates_test.go index 799af4493..9d8db7d98 100644 --- a/internal/controllers/allocator/templates_test.go +++ b/internal/controllers/allocator/templates_test.go @@ -1,10 +1,18 @@ package allocator import ( + "context" + "encoding/json" "testing" weka "github.com/weka/weka-k8s-api/api/v1alpha1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "github.com/weka/weka-operator/internal/consts" + "github.com/weka/weka-operator/internal/pkg/domain" "github.com/weka/weka-operator/pkg/util" ) @@ -120,3 +128,185 @@ func TestBuildDynamicTemplate_ComputeHugepages(t *testing.T) { }) } } + +func makeNode(name string, drives []domain.DriveEntry, labels map[string]string) *v1.Node { + annotations := map[string]string{} + if drives != nil { + b, _ := json.Marshal(drives) + annotations[consts.AnnotationWekaDrives] = string(b) + } + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: labels, + Annotations: annotations, + }, + } +} + +func TestGetEnrichedTemplate_EnrichesFromNodeDrives(t *testing.T) { + labels := map[string]string{"weka.io/role": "server"} + drives := []domain.DriveEntry{ + {Serial: "sn1", CapacityGiB: 3000}, + {Serial: "sn2", CapacityGiB: 4000}, + } + node := makeNode("node1", drives, labels) + + scheme := runtime.NewScheme() + _ = v1.AddToScheme(scheme) + k8sClient := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(node).Build() + + cluster := weka.WekaCluster{ + Spec: weka.WekaClusterSpec{ + Template: "dynamic", + NodeSelector: labels, + Dynamic: &weka.WekaConfig{ + ComputeCores: 1, + NumDrives: 2, // takes top 2 drives per node → 3000+4000 = 7000 per drive container + // No ContainerCapacity/DriveCapacity → traditional mode + }, + }, + } + + tmpl, err := GetEnrichedTemplate(context.Background(), k8sClient, "dynamic", cluster) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if tmpl == nil { + t.Fatal("expected template to be found") + } + + // totalRawCapacity = driveContainers(6) * maxNodeCap(7000) = 42000 + // hugepages = max(42000/6 + 1700*1, 3000*1) = max(7000+1700, 3000) = 8700 + if tmpl.ComputeHugepages != 8700 { + t.Errorf("expected enriched ComputeHugepages=8700, got %d", tmpl.ComputeHugepages) + } +} + +func TestGetEnrichedTemplate_SkipsEnrichmentWhenContainerCapacitySet(t *testing.T) { + labels := map[string]string{"weka.io/role": "server"} + drives := []domain.DriveEntry{ + {Serial: "sn1", CapacityGiB: 5000}, + } + node := makeNode("node1", drives, labels) + + scheme := runtime.NewScheme() + _ = v1.AddToScheme(scheme) + k8sClient := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(node).Build() + + cluster := weka.WekaCluster{ + Spec: weka.WekaClusterSpec{ + Template: "dynamic", + NodeSelector: labels, + Dynamic: &weka.WekaConfig{ + ComputeCores: 1, + ContainerCapacity: 2000, // capacity set → no enrichment + }, + }, + } + + tmpl, err := GetEnrichedTemplate(context.Background(), k8sClient, "dynamic", cluster) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if tmpl == nil { + t.Fatal("expected template to be found") + } + + // With ContainerCapacity=2000, driveContainers=6, computeContainers=6, all TLC: + // totalRaw=12000GiB, tlcMiB=12000*1024/1000=12288, /6=2048 + 1700 = 3748 + if tmpl.ComputeHugepages != 3748 { + t.Errorf("expected ComputeHugepages=3748 (from spec capacity), got %d", tmpl.ComputeHugepages) + } +} + +func TestGetEnrichedTemplate_SkipsEnrichmentWhenUserOverridesHugepages(t *testing.T) { + labels := map[string]string{"weka.io/role": "server"} + drives := []domain.DriveEntry{ + {Serial: "sn1", CapacityGiB: 5000}, + } + node := makeNode("node1", drives, labels) + + scheme := runtime.NewScheme() + _ = v1.AddToScheme(scheme) + k8sClient := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(node).Build() + + cluster := weka.WekaCluster{ + Spec: weka.WekaClusterSpec{ + Template: "dynamic", + NodeSelector: labels, + Dynamic: &weka.WekaConfig{ + ComputeCores: 1, + NumDrives: 1, + ComputeHugepages: 9999, // user override + }, + }, + } + + tmpl, err := GetEnrichedTemplate(context.Background(), k8sClient, "dynamic", cluster) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if tmpl == nil { + t.Fatal("expected template to be found") + } + + if tmpl.ComputeHugepages != 9999 { + t.Errorf("expected user override ComputeHugepages=9999, got %d", tmpl.ComputeHugepages) + } +} + +func TestGetEnrichedTemplate_StaticTemplateUnchanged(t *testing.T) { + scheme := runtime.NewScheme() + _ = v1.AddToScheme(scheme) + k8sClient := fakeclient.NewClientBuilder().WithScheme(scheme).Build() + + cluster := weka.WekaCluster{ + Spec: weka.WekaClusterSpec{ + Template: "small", + }, + } + + tmpl, err := GetEnrichedTemplate(context.Background(), k8sClient, "small", cluster) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if tmpl == nil { + t.Fatal("expected template to be found") + } + + expected := WekaClusterTemplates["small"] + if tmpl.ComputeHugepages != expected.ComputeHugepages { + t.Errorf("expected ComputeHugepages=%d, got %d", expected.ComputeHugepages, tmpl.ComputeHugepages) + } +} + +func TestGetEnrichedTemplate_NoNodesGracefulFallback(t *testing.T) { + scheme := runtime.NewScheme() + _ = v1.AddToScheme(scheme) + k8sClient := fakeclient.NewClientBuilder().WithScheme(scheme).Build() // no nodes + + cluster := weka.WekaCluster{ + Spec: weka.WekaClusterSpec{ + Template: "dynamic", + NodeSelector: map[string]string{"weka.io/role": "server"}, + Dynamic: &weka.WekaConfig{ + ComputeCores: 1, + NumDrives: 1, + }, + }, + } + + tmpl, err := GetEnrichedTemplate(context.Background(), k8sClient, "dynamic", cluster) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if tmpl == nil { + t.Fatal("expected template to be found") + } + + // No nodes → no enrichment → default minimum + if tmpl.ComputeHugepages != 3000 { + t.Errorf("expected fallback ComputeHugepages=3000, got %d", tmpl.ComputeHugepages) + } +} diff --git a/internal/controllers/operations/block_drives.go b/internal/controllers/operations/block_drives.go index 53f80c3ad..7f3179cb4 100644 --- a/internal/controllers/operations/block_drives.go +++ b/internal/controllers/operations/block_drives.go @@ -138,7 +138,8 @@ func (o *BlockDrivesOperation) UnblockDrives(ctx context.Context) error { allDrives := []string{} if allDrivesStr, ok := node.Annotations[consts.AnnotationWekaDrives]; ok { - json.Unmarshal([]byte(allDrivesStr), &allDrives) + entries, _, _ := domain.ParseDriveEntries(allDrivesStr) + allDrives = domain.DriveEntrySerials(entries) } logger.Debug("Available drives", "drives", allDrives) @@ -217,7 +218,8 @@ func (o *BlockDrivesOperation) BlockDrives(ctx context.Context) error { allDrives := []string{} if allDrivesStr, ok := node.Annotations[consts.AnnotationWekaDrives]; ok { - json.Unmarshal([]byte(allDrivesStr), &allDrives) + entries, _, _ := domain.ParseDriveEntries(allDrivesStr) + allDrives = domain.DriveEntrySerials(entries) } logger.Debug("Available drives", "drives", allDrives) diff --git a/internal/controllers/operations/discover_drives.go b/internal/controllers/operations/discover_drives.go index dd6431b96..dc84aaaf7 100644 --- a/internal/controllers/operations/discover_drives.go +++ b/internal/controllers/operations/discover_drives.go @@ -44,9 +44,10 @@ type DiscoverDrivesOperation struct { } type DriveRawInfo struct { - SerialId string `json:"serial_id"` - Path string `json:"path"` - IsMounted bool `json:"is_mounted"` + SerialId string `json:"serial_id"` + Path string `json:"path"` + IsMounted bool `json:"is_mounted"` + CapacityGiB int `json:"capacity_gib"` } type DriveNodeResults struct { diff --git a/internal/controllers/operations/sign_drives.go b/internal/controllers/operations/sign_drives.go index 8dac6365e..fbb78f22b 100644 --- a/internal/controllers/operations/sign_drives.go +++ b/internal/controllers/operations/sign_drives.go @@ -172,6 +172,19 @@ func (o *SignDrivesOperation) EnsureContainers(ctx context.Context) error { // if data exists and not force - skip if !o.force { + // Detect old annotation format and invalidate hash to trigger re-run + if drivesStr, ok := node.Annotations[consts.AnnotationWekaDrives]; ok && drivesStr != "" { + _, isOldFormat, _ := domain.ParseDriveEntries(drivesStr) + if isOldFormat { + // Clear hash so sign-drives re-runs and writes the new format + delete(node.Annotations, consts.AnnotationSignDrivesHash) + if err := o.client.Update(ctx, &node); err != nil { + return fmt.Errorf("failed to clear sign-drives hash for format migration on node %s: %w", node.Name, err) + } + return lifecycle.NewWaitError(fmt.Errorf("detected old weka-drives annotation format on node %s, cleared hash to trigger migration, waiting for re-run", node.Name)) + } + } + targetHash := domain.CalculateNodeDriveSignHash(&node) if node.Annotations[consts.AnnotationSignDrivesHash] == targetHash { skip += 1 @@ -390,11 +403,11 @@ func getAlreadySignedDrives(node *v1.Node) []string { return alreadySignedDrives } - // Regular drives (non-proxy mode) + // Regular drives (non-proxy mode) — handles both old []string and new []DriveEntry formats if drivesStr, ok := node.Annotations[consts.AnnotationWekaDrives]; ok && drivesStr != "" { - var drives []string - if err := json.Unmarshal([]byte(drivesStr), &drives); err == nil { - alreadySignedDrives = append(alreadySignedDrives, drives...) + entries, _, err := domain.ParseDriveEntries(drivesStr) + if err == nil { + alreadySignedDrives = append(alreadySignedDrives, domain.DriveEntrySerials(entries)...) } } diff --git a/internal/controllers/operations/sign_drives_test.go b/internal/controllers/operations/sign_drives_test.go new file mode 100644 index 000000000..7913ae8ed --- /dev/null +++ b/internal/controllers/operations/sign_drives_test.go @@ -0,0 +1,120 @@ +package operations + +import ( + "encoding/json" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/weka/weka-operator/internal/consts" + "github.com/weka/weka-operator/internal/pkg/domain" +) + +func TestGetAlreadySignedDrives_NewFormat(t *testing.T) { + entries := []domain.DriveEntry{ + {Serial: "SERIAL1", CapacityGiB: 500}, + {Serial: "SERIAL2", CapacityGiB: 1000}, + } + data, _ := json.Marshal(entries) + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + consts.AnnotationWekaDrives: string(data), + }, + }, + } + + drives := getAlreadySignedDrives(node) + if len(drives) != 2 { + t.Fatalf("expected 2 drives, got %d", len(drives)) + } + if drives[0] != "SERIAL1" || drives[1] != "SERIAL2" { + t.Errorf("unexpected drives: %v", drives) + } +} + +func TestGetAlreadySignedDrives_OldFormat(t *testing.T) { + serials := []string{"OLD1", "OLD2", "OLD3"} + data, _ := json.Marshal(serials) + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + consts.AnnotationWekaDrives: string(data), + }, + }, + } + + drives := getAlreadySignedDrives(node) + if len(drives) != 3 { + t.Fatalf("expected 3 drives, got %d", len(drives)) + } + for i, serial := range serials { + if drives[i] != serial { + t.Errorf("drive %d: expected %q, got %q", i, serial, drives[i]) + } + } +} + +func TestGetAlreadySignedDrives_SharedDrives(t *testing.T) { + sharedDrives := []domain.SharedDriveInfo{ + {PhysicalUUID: "uuid1", Serial: "SHARED1", CapacityGiB: 100, Type: "TLC"}, + {PhysicalUUID: "uuid2", Serial: "SHARED2", CapacityGiB: 200, Type: "QLC"}, + } + data, _ := json.Marshal(sharedDrives) + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + consts.AnnotationSharedDrives: string(data), + }, + }, + } + + drives := getAlreadySignedDrives(node) + if len(drives) != 2 { + t.Fatalf("expected 2 drives, got %d", len(drives)) + } + if drives[0] != "SHARED1" || drives[1] != "SHARED2" { + t.Errorf("unexpected drives: %v", drives) + } +} + +func TestGetAlreadySignedDrives_BothAnnotations(t *testing.T) { + regularEntries := []domain.DriveEntry{ + {Serial: "REG1", CapacityGiB: 500}, + } + regularData, _ := json.Marshal(regularEntries) + + sharedDrives := []domain.SharedDriveInfo{ + {PhysicalUUID: "uuid1", Serial: "SHARED1", CapacityGiB: 100, Type: "TLC"}, + } + sharedData, _ := json.Marshal(sharedDrives) + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + consts.AnnotationWekaDrives: string(regularData), + consts.AnnotationSharedDrives: string(sharedData), + }, + }, + } + + drives := getAlreadySignedDrives(node) + if len(drives) != 2 { + t.Fatalf("expected 2 drives, got %d", len(drives)) + } +} + +func TestGetAlreadySignedDrives_NoAnnotations(t *testing.T) { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{}, + } + + drives := getAlreadySignedDrives(node) + if len(drives) != 0 { + t.Fatalf("expected 0 drives, got %d", len(drives)) + } +} diff --git a/internal/controllers/wekacluster/steps_cluster_creation.go b/internal/controllers/wekacluster/steps_cluster_creation.go index b802b6d40..31bcbe3fc 100644 --- a/internal/controllers/wekacluster/steps_cluster_creation.go +++ b/internal/controllers/wekacluster/steps_cluster_creation.go @@ -259,8 +259,12 @@ func (r *wekaClusterReconcilerLoop) EnsureWekaContainers(ctx context.Context) er return err } - template, ok := allocator.GetTemplateByName(cluster.Spec.Template, *cluster) - if !ok { + template, err := allocator.GetEnrichedTemplate(ctx, r.Manager.GetClient(), cluster.Spec.Template, *cluster) + if err != nil { + logger.Error(err, "Failed to get template with node capacity") + return err + } + if template == nil { keys := make([]string, 0, len(allocator.WekaClusterTemplates)) for k := range allocator.WekaClusterTemplates { keys = append(keys, k) @@ -271,7 +275,7 @@ func (r *wekaClusterReconcilerLoop) EnsureWekaContainers(ctx context.Context) er } //newContainersLimit := config.Consts.NewContainersLimit - missingContainers, err := BuildMissingContainers(ctx, cluster, template, r.containers) + missingContainers, err := BuildMissingContainers(ctx, cluster, *template, r.containers) if err != nil { logger.Error(err, "Failed to create missing containers") return err diff --git a/internal/controllers/wekacontainer/funcs_oneoff.go b/internal/controllers/wekacontainer/funcs_oneoff.go index 55fc3db54..2f0e455f7 100644 --- a/internal/controllers/wekacontainer/funcs_oneoff.go +++ b/internal/controllers/wekacontainer/funcs_oneoff.go @@ -142,18 +142,26 @@ func (r *containerReconcilerLoop) updateNodeAnnotations(ctx context.Context) err } // Update weka.io/weka-drives annotation (regular mode) - previousDrives := []string{} newDrivesFound := 0 - if existingDrivesStr, ok := node.Annotations[consts.AnnotationWekaDrives]; ok { - _ = json.Unmarshal([]byte(existingDrivesStr), &previousDrives) + + // Build a map from raw drives for capacity lookup + rawDriveCapacity := make(map[string]int) + for _, raw := range opResult.RawDrives { + if raw.SerialId != "" { + rawDriveCapacity[raw.SerialId] = raw.CapacityGiB + } } - seenDrives := make(map[string]bool) - for _, drive := range previousDrives { - if drive == "" { - continue // clean bad records of empty serial ids + // Parse existing annotation (handles both old []string and new []DriveEntry formats) + seenDrives := make(map[string]domain.DriveEntry) + if existingDrivesStr, ok := node.Annotations[consts.AnnotationWekaDrives]; ok && existingDrivesStr != "" { + existingEntries, _, _ := domain.ParseDriveEntries(existingDrivesStr) + for _, entry := range existingEntries { + if entry.Serial == "" { + continue // clean bad records of empty serial ids + } + seenDrives[entry.Serial] = entry } - seenDrives[drive] = true } complete := func() error { @@ -168,16 +176,17 @@ func (r *containerReconcilerLoop) updateNodeAnnotations(ctx context.Context) err if _, ok := seenDrives[drive.SerialId]; !ok { newDrivesFound++ } - seenDrives[drive.SerialId] = true + capacity := rawDriveCapacity[drive.SerialId] + seenDrives[drive.SerialId] = domain.DriveEntry{Serial: drive.SerialId, CapacityGiB: capacity} } if newDrivesFound == 0 { logger.Info("No new drives found") } - updatedDrivesList := []string{} - for drive := range seenDrives { - updatedDrivesList = append(updatedDrivesList, drive) + updatedDrivesList := make([]domain.DriveEntry, 0, len(seenDrives)) + for _, entry := range seenDrives { + updatedDrivesList = append(updatedDrivesList, entry) } newDrivesStr, err := json.Marshal(updatedDrivesList) if err != nil { @@ -213,8 +222,8 @@ func (r *containerReconcilerLoop) updateNodeAnnotations(ctx context.Context) err } availableDrives := 0 - for _, drive := range updatedDrivesList { - if !slices.Contains(blockedDrives, drive) { + for _, entry := range updatedDrivesList { + if !slices.Contains(blockedDrives, entry.Serial) { availableDrives++ } } diff --git a/internal/pkg/domain/drives.go b/internal/pkg/domain/drives.go index 8c3b6a260..5f9f52ab9 100644 --- a/internal/pkg/domain/drives.go +++ b/internal/pkg/domain/drives.go @@ -1,5 +1,54 @@ package domain +import "encoding/json" + +// DriveEntry represents a drive in the weka.io/weka-drives annotation (non-proxy mode). +type DriveEntry struct { + Serial string `json:"serial"` + CapacityGiB int `json:"capacity_gib"` +} + +// ParseDriveEntries parses the weka.io/weka-drives annotation value, handling both +// the new []DriveEntry format and the old []string format for backward compatibility. +// Returns (entries, isOldFormat, error). +func ParseDriveEntries(annotation string) ([]DriveEntry, bool, error) { + if annotation == "" { + return nil, false, nil + } + + // Try new format first: []DriveEntry + var entries []DriveEntry + if err := json.Unmarshal([]byte(annotation), &entries); err == nil { + // Verify it's actually the new format by checking that we didn't get zero-value structs + // from a plain string array. A plain ["serial"] would fail unmarshal into []DriveEntry, + // so if we got here it's genuinely the new format. + return entries, false, nil + } + + // Fallback: try old format []string + var serials []string + if err := json.Unmarshal([]byte(annotation), &serials); err != nil { + return nil, false, err + } + + entries = make([]DriveEntry, 0, len(serials)) + for _, s := range serials { + if s != "" { + entries = append(entries, DriveEntry{Serial: s, CapacityGiB: 0}) + } + } + return entries, true, nil +} + +// DriveEntrySerials extracts serial strings from a slice of DriveEntry. +func DriveEntrySerials(entries []DriveEntry) []string { + serials := make([]string, 0, len(entries)) + for _, e := range entries { + serials = append(serials, e.Serial) + } + return serials +} + // SharedDriveInfo represents a signed drive for proxy mode // This matches the format returned by weka_runtime.py sign_device_path_for_proxy() type SharedDriveInfo struct { diff --git a/internal/pkg/domain/drives_test.go b/internal/pkg/domain/drives_test.go new file mode 100644 index 000000000..fa8ebe210 --- /dev/null +++ b/internal/pkg/domain/drives_test.go @@ -0,0 +1,122 @@ +package domain + +import ( + "encoding/json" + "testing" +) + +func TestParseDriveEntries_NewFormat(t *testing.T) { + entries := []DriveEntry{ + {Serial: "SERIAL1", CapacityGiB: 500}, + {Serial: "SERIAL2", CapacityGiB: 1000}, + } + data, _ := json.Marshal(entries) + + result, isOld, err := ParseDriveEntries(string(data)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if isOld { + t.Fatal("expected isOldFormat=false for new format") + } + if len(result) != 2 { + t.Fatalf("expected 2 entries, got %d", len(result)) + } + if result[0].Serial != "SERIAL1" || result[0].CapacityGiB != 500 { + t.Errorf("unexpected first entry: %+v", result[0]) + } + if result[1].Serial != "SERIAL2" || result[1].CapacityGiB != 1000 { + t.Errorf("unexpected second entry: %+v", result[1]) + } +} + +func TestParseDriveEntries_OldFormat(t *testing.T) { + serials := []string{"SERIAL1", "SERIAL2", "SERIAL3"} + data, _ := json.Marshal(serials) + + result, isOld, err := ParseDriveEntries(string(data)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !isOld { + t.Fatal("expected isOldFormat=true for old format") + } + if len(result) != 3 { + t.Fatalf("expected 3 entries, got %d", len(result)) + } + for i, entry := range result { + if entry.Serial != serials[i] { + t.Errorf("entry %d: expected serial %q, got %q", i, serials[i], entry.Serial) + } + if entry.CapacityGiB != 0 { + t.Errorf("entry %d: expected CapacityGiB=0, got %d", i, entry.CapacityGiB) + } + } +} + +func TestParseDriveEntries_OldFormatSkipsEmpty(t *testing.T) { + serials := []string{"SERIAL1", "", "SERIAL3"} + data, _ := json.Marshal(serials) + + result, isOld, err := ParseDriveEntries(string(data)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !isOld { + t.Fatal("expected isOldFormat=true") + } + if len(result) != 2 { + t.Fatalf("expected 2 entries (empty skipped), got %d", len(result)) + } +} + +func TestParseDriveEntries_Empty(t *testing.T) { + result, isOld, err := ParseDriveEntries("") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if isOld { + t.Fatal("expected isOldFormat=false for empty") + } + if result != nil { + t.Fatalf("expected nil result for empty, got %+v", result) + } +} + +func TestParseDriveEntries_InvalidJSON(t *testing.T) { + _, _, err := ParseDriveEntries("not-json") + if err == nil { + t.Fatal("expected error for invalid JSON") + } +} + +func TestParseDriveEntries_EmptyArray(t *testing.T) { + result, isOld, err := ParseDriveEntries("[]") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if isOld { + t.Fatal("expected isOldFormat=false for empty array") + } + if len(result) != 0 { + t.Fatalf("expected 0 entries, got %d", len(result)) + } +} + +func TestDriveEntrySerials(t *testing.T) { + entries := []DriveEntry{ + {Serial: "A", CapacityGiB: 100}, + {Serial: "B", CapacityGiB: 200}, + } + serials := DriveEntrySerials(entries) + if len(serials) != 2 || serials[0] != "A" || serials[1] != "B" { + t.Errorf("unexpected serials: %v", serials) + } +} + +func TestDriveEntrySerials_Empty(t *testing.T) { + serials := DriveEntrySerials(nil) + if len(serials) != 0 { + t.Errorf("expected empty, got %v", serials) + } +}