Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion charts/weka-operator/resources/weka_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Disk:
path: str
is_mounted: bool
serial_id: Optional[str]
capacity_gib: Optional[int] = None


MODE = os.environ.get("MODE")
Expand Down Expand Up @@ -454,12 +455,24 @@ def has_mountpoint(device_info: dict) -> bool:
if has_mountpoint(child):
return True
return False

async def get_capacity_gib(device_path: str) -> int:
"""Get the capacity of the device in GiB."""
cmd = f"blockdev --getsize64 {device_path}"
stdout, stderr, ec = await run_command(cmd, capture_stdout=True)
if ec == 0:
size_bytes = int(stdout.decode().strip())
return size_bytes // (1024 ** 3) # Convert to GiB
else:
raise Exception(f"Failed to get capacity for {device_path}: {stderr.decode()}")

for device in data.get("blockdevices", []):
if device.get("type") == "disk":
is_mounted = has_mountpoint(device)
serial_id = device.get("serial")
device_path = device["name"]
capacity_gib = await get_capacity_gib(device_path)

if not serial_id:
logging.warning(f"lsblk did not return serial for {device_path}. Using fallback.")
serial_id = await get_serial_id_fallback(device_path)
Expand All @@ -469,7 +482,7 @@ def has_mountpoint(device_info: dict) -> bool:
serial_id = await get_serial_id_cos_specific(device_name)

logging.info(f"Found drive: {device_path}, mounted: {is_mounted}, serial: {serial_id}")
disks.append(Disk(path=device_path, is_mounted=is_mounted, serial_id=serial_id))
disks.append(Disk(path=device_path, is_mounted=is_mounted, serial_id=serial_id, capacity_gib=capacity_gib))

return disks

Expand Down
3 changes: 2 additions & 1 deletion internal/controllers/allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ type Allocator interface {
}

type AllocatorNodeInfo struct {
AvailableDrives []string
// AvailableDrives contains available (non-blocked) drives for non-proxy mode.
AvailableDrives []domain.DriveEntry
// SharedDrives contains shared drive information for drive sharing mode (proxy mode)
// Empty if node doesn't have shared drives or is using non-proxy mode
SharedDrives []domain.SharedDriveInfo
Expand Down
6 changes: 3 additions & 3 deletions internal/controllers/allocator/container_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ func (a *ContainerResourceAllocator) getAvailableDrivesFromStatus(ctx context.Co
allDrives := nodeInfo.AvailableDrives
logger.Debug("Found drives on node", "total", len(allDrives))

// Filter out allocated drives
// Filter out allocated drives (keyed by serial)
availableDrives := []string{}
for _, drive := range allDrives {
if !allocatedDrives[drive] {
availableDrives = append(availableDrives, drive)
if !allocatedDrives[drive.Serial] {
availableDrives = append(availableDrives, drive.Serial)
}
}

Expand Down
64 changes: 56 additions & 8 deletions internal/controllers/allocator/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/weka/weka-operator/internal/consts"
"github.com/weka/weka-operator/internal/pkg/domain"
"github.com/weka/weka-operator/internal/services/kubernetes"
)

type NodeInfoGetter func(ctx context.Context, nodeName weka.NodeName) (*AllocatorNodeInfo, error)
Expand Down Expand Up @@ -46,25 +47,27 @@ func NewK8sNodeInfoGetter(k8sClient client.Client) NodeInfoGetter {
return
}

availableDrives := []string{}
allDrives := []string{}
err = json.Unmarshal([]byte(allDrivesStr), &allDrives)
// Parse as new []DriveEntry format only — old []string format is an error
// (sign-drives will convert old format on its next run)
var allEntries []domain.DriveEntry
err = json.Unmarshal([]byte(allDrivesStr), &allEntries)
if err != nil {
err = fmt.Errorf("failed to unmarshal weka-drives: %v", err)
err = fmt.Errorf("failed to unmarshal weka-drives as DriveEntry format (old format pending migration): %v", err)
return
}

for _, drive := range allDrives {
if !slices.Contains(blockedDriveSerials, drive) {
availableDrives = append(availableDrives, drive)
availableDrives := make([]domain.DriveEntry, 0, len(allEntries))
for _, entry := range allEntries {
if !slices.Contains(blockedDriveSerials, entry.Serial) {
availableDrives = append(availableDrives, entry)
}
}

nodeInfo.AvailableDrives = availableDrives
} else {
// No exclusive drives annotation - set empty list
// This is expected in drive-sharing/proxy mode where we only use shared drives
nodeInfo.AvailableDrives = []string{}
nodeInfo.AvailableDrives = []domain.DriveEntry{}
}

var sharedDrives []domain.SharedDriveInfo
Expand Down Expand Up @@ -95,6 +98,51 @@ func NewK8sNodeInfoGetter(k8sClient client.Client) NodeInfoGetter {
}
}

const maxNodeSample = 3

// computeMaxNodeDriveCapacity samples up to maxNodeSample nodes matching the selector,
// computes the top-numDrives capacity sum per node, and returns the maximum.
// This represents the worst-case (most memory) capacity a single drive container could manage.
func computeMaxNodeDriveCapacity(ctx context.Context, k8sClient client.Client, nodeSelector map[string]string, numDrives int) (int, error) {
kubeService := kubernetes.NewKubeService(k8sClient)
nodes, err := kubeService.GetNodes(ctx, nodeSelector)
if err != nil {
return 0, err
}

sampleSize := min(len(nodes), maxNodeSample)

maxCapacity := 0
for i := 0; i < sampleSize; i++ {
node := nodes[i]
drivesStr, ok := node.Annotations[consts.AnnotationWekaDrives]
if !ok || drivesStr == "" {
continue
}
entries, _, err := domain.ParseDriveEntries(drivesStr)
if err != nil {
continue
}

// Sort capacities descending, take top numDrives
capacities := make([]int, 0, len(entries))
for _, e := range entries {
if e.CapacityGiB > 0 {
capacities = append(capacities, e.CapacityGiB)
}
}
slices.SortFunc(capacities, func(a, b int) int { return b - a }) // descending

nodeSum := 0
for j := 0; j < min(numDrives, len(capacities)); j++ {
nodeSum += capacities[j]
}
maxCapacity = max(maxCapacity, nodeSum)
}

return maxCapacity, nil
}

// filterBlockedSharedDrives removes blocked drives from the list
// blockedUUIDs is a list of virtual UUIDs that are blocked (via shared drive annotation or drive serials)
func filterBlockedSharedDrives(drives []domain.SharedDriveInfo, blockedDrivePhysicalUUIDs, blockedDriveSerials []string) []domain.SharedDriveInfo {
Expand Down
Loading
Loading