Skip to content

Commit 5922bd2

Browse files
committed
STAC-24228: Address comment
1 parent 237e139 commit 5922bd2

1 file changed

Lines changed: 40 additions & 72 deletions

File tree

internal/clients/k8s/client.go

Lines changed: 40 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,11 @@ type RestoreLockInfo struct {
175175

176176
// DeploymentUpdateFunc is a function that modifies a deployment.
177177
// It receives a fresh copy of the deployment and should apply the desired changes.
178-
type DeploymentUpdateFunc func(dep *appsv1.Deployment)
178+
type DeploymentUpdateFunc func(dep *appsv1.Deployment) error
179179

180180
// StatefulSetUpdateFunc is a function that modifies a statefulset.
181181
// It receives a fresh copy of the statefulset and should apply the desired changes.
182-
type StatefulSetUpdateFunc func(sts *appsv1.StatefulSet)
182+
type StatefulSetUpdateFunc func(sts *appsv1.StatefulSet) error
183183

184184
// updateDeploymentWithRetry fetches a fresh copy of the deployment and applies the update function,
185185
// retrying on conflict errors (when resource version has changed).
@@ -191,7 +191,10 @@ func updateDeploymentWithRetry(ctx context.Context, client kubernetes.Interface,
191191
return err
192192
}
193193
// Apply changes
194-
updateFn(dep)
194+
err = updateFn(dep)
195+
if err != nil {
196+
return err
197+
}
195198
// Update
196199
_, err = client.AppsV1().Deployments(namespace).Update(ctx, dep, metav1.UpdateOptions{})
197200
return err
@@ -208,7 +211,10 @@ func updateStatefulSetWithRetry(ctx context.Context, client kubernetes.Interface
208211
return err
209212
}
210213
// Apply changes
211-
updateFn(sts)
214+
err = updateFn(sts)
215+
if err != nil {
216+
return err
217+
}
212218
// Update
213219
_, err = client.AppsV1().StatefulSets(namespace).Update(ctx, sts, metav1.UpdateOptions{})
214220
return err
@@ -222,28 +228,16 @@ func updateStatefulSetWithRetry(ctx context.Context, client kubernetes.Interface
222228
func scaleDownDeployment(ctx context.Context, client kubernetes.Interface, namespace, name string) (int32, error) {
223229
var originalReplicas int32
224230

225-
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
226-
dep, err := client.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
227-
if err != nil {
228-
return err
229-
}
230-
231+
err := updateDeploymentWithRetry(ctx, client, namespace, name, func(dep *appsv1.Deployment) error {
231232
if dep.Spec.Replicas != nil {
232233
originalReplicas = *dep.Spec.Replicas
233234
}
234-
235-
// Only update if not already at 0
236-
if originalReplicas > 0 {
237-
if dep.Annotations == nil {
238-
dep.Annotations = make(map[string]string)
239-
}
240-
dep.Annotations[PreRestoreReplicasAnnotation] = fmt.Sprintf("%d", originalReplicas)
241-
zero := int32(0)
242-
dep.Spec.Replicas = &zero
243-
244-
_, err = client.AppsV1().Deployments(namespace).Update(ctx, dep, metav1.UpdateOptions{})
245-
return err
235+
if dep.Annotations == nil {
236+
dep.Annotations = make(map[string]string)
246237
}
238+
dep.Annotations[PreRestoreReplicasAnnotation] = fmt.Sprintf("%d", originalReplicas)
239+
zero := int32(0)
240+
dep.Spec.Replicas = &zero
247241
return nil
248242
})
249243

@@ -257,28 +251,16 @@ func scaleDownDeployment(ctx context.Context, client kubernetes.Interface, names
257251
func scaleDownStatefulSet(ctx context.Context, client kubernetes.Interface, namespace, name string) (int32, error) {
258252
var originalReplicas int32
259253

260-
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
261-
sts, err := client.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
262-
if err != nil {
263-
return err
264-
}
265-
254+
err := updateStatefulSetWithRetry(ctx, client, namespace, name, func(sts *appsv1.StatefulSet) error {
266255
if sts.Spec.Replicas != nil {
267256
originalReplicas = *sts.Spec.Replicas
268257
}
269-
270-
// Only update if not already at 0
271-
if originalReplicas > 0 {
272-
if sts.Annotations == nil {
273-
sts.Annotations = make(map[string]string)
274-
}
275-
sts.Annotations[PreRestoreReplicasAnnotation] = fmt.Sprintf("%d", originalReplicas)
276-
zero := int32(0)
277-
sts.Spec.Replicas = &zero
278-
279-
_, err = client.AppsV1().StatefulSets(namespace).Update(ctx, sts, metav1.UpdateOptions{})
280-
return err
258+
if sts.Annotations == nil {
259+
sts.Annotations = make(map[string]string)
281260
}
261+
sts.Annotations[PreRestoreReplicasAnnotation] = fmt.Sprintf("%d", originalReplicas)
262+
zero := int32(0)
263+
sts.Spec.Replicas = &zero
282264
return nil
283265
})
284266

@@ -291,14 +273,9 @@ func scaleDownStatefulSet(ctx context.Context, client kubernetes.Interface, name
291273
//nolint:dupl // Deployment and StatefulSet are different K8s types requiring separate implementations
292274
func scaleUpDeploymentFromAnnotation(ctx context.Context, client kubernetes.Interface, namespace, name string) (int32, bool, error) {
293275
var scaledTo int32
294-
var found bool
295-
296-
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
297-
dep, err := client.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
298-
if err != nil {
299-
return err
300-
}
276+
found := true
301277

278+
err := updateDeploymentWithRetry(ctx, client, namespace, name, func(dep *appsv1.Deployment) error {
302279
if dep.Annotations == nil {
303280
found = false
304281
return nil
@@ -315,15 +292,11 @@ func scaleUpDeploymentFromAnnotation(ctx context.Context, client kubernetes.Inte
315292
return fmt.Errorf("failed to parse replicas annotation: %w", err)
316293
}
317294

318-
dep.Spec.Replicas = &originalReplicas
319295
delete(dep.Annotations, PreRestoreReplicasAnnotation)
296+
dep.Spec.Replicas = &originalReplicas
297+
scaledTo = originalReplicas
320298

321-
_, err = client.AppsV1().Deployments(namespace).Update(ctx, dep, metav1.UpdateOptions{})
322-
if err == nil {
323-
scaledTo = originalReplicas
324-
found = true
325-
}
326-
return err
299+
return nil
327300
})
328301

329302
return scaledTo, found, err
@@ -335,14 +308,9 @@ func scaleUpDeploymentFromAnnotation(ctx context.Context, client kubernetes.Inte
335308
//nolint:dupl // Deployment and StatefulSet are different K8s types requiring separate implementations
336309
func scaleUpStatefulSetFromAnnotation(ctx context.Context, client kubernetes.Interface, namespace, name string) (int32, bool, error) {
337310
var scaledTo int32
338-
var found bool
339-
340-
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
341-
sts, err := client.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
342-
if err != nil {
343-
return err
344-
}
311+
found := true
345312

313+
err := updateStatefulSetWithRetry(ctx, client, namespace, name, func(sts *appsv1.StatefulSet) error {
346314
if sts.Annotations == nil {
347315
found = false
348316
return nil
@@ -359,15 +327,11 @@ func scaleUpStatefulSetFromAnnotation(ctx context.Context, client kubernetes.Int
359327
return fmt.Errorf("failed to parse replicas annotation: %w", err)
360328
}
361329

362-
sts.Spec.Replicas = &originalReplicas
363330
delete(sts.Annotations, PreRestoreReplicasAnnotation)
331+
sts.Spec.Replicas = &originalReplicas
332+
scaledTo = originalReplicas
364333

365-
_, err = client.AppsV1().StatefulSets(namespace).Update(ctx, sts, metav1.UpdateOptions{})
366-
if err == nil {
367-
scaledTo = originalReplicas
368-
found = true
369-
}
370-
return err
334+
return nil
371335
})
372336

373337
return scaledTo, found, err
@@ -570,12 +534,13 @@ func (c *Client) SetRestoreLock(namespace, labelSelector, datastore, startedAt s
570534
}
571535

572536
for _, dep := range deployments.Items {
573-
err := updateDeploymentWithRetry(ctx, c.clientset, namespace, dep.Name, func(d *appsv1.Deployment) {
537+
err := updateDeploymentWithRetry(ctx, c.clientset, namespace, dep.Name, func(d *appsv1.Deployment) error {
574538
if d.Annotations == nil {
575539
d.Annotations = make(map[string]string)
576540
}
577541
d.Annotations[RestoreInProgressAnnotation] = datastore
578542
d.Annotations[RestoreStartedAtAnnotation] = startedAt
543+
return nil
579544
})
580545
if err != nil {
581546
return fmt.Errorf("failed to set restore lock on deployment %s: %w", dep.Name, err)
@@ -591,12 +556,13 @@ func (c *Client) SetRestoreLock(namespace, labelSelector, datastore, startedAt s
591556
}
592557

593558
for _, sts := range statefulSets.Items {
594-
err := updateStatefulSetWithRetry(ctx, c.clientset, namespace, sts.Name, func(s *appsv1.StatefulSet) {
559+
err := updateStatefulSetWithRetry(ctx, c.clientset, namespace, sts.Name, func(s *appsv1.StatefulSet) error {
595560
if s.Annotations == nil {
596561
s.Annotations = make(map[string]string)
597562
}
598563
s.Annotations[RestoreInProgressAnnotation] = datastore
599564
s.Annotations[RestoreStartedAtAnnotation] = startedAt
565+
return nil
600566
})
601567
if err != nil {
602568
return fmt.Errorf("failed to set restore lock on statefulset %s: %w", sts.Name, err)
@@ -640,10 +606,11 @@ func (c *Client) ClearRestoreLock(namespace, labelSelector string) error {
640606
continue
641607
}
642608

643-
err := updateDeploymentWithRetry(ctx, c.clientset, namespace, dep.Name, func(d *appsv1.Deployment) {
609+
err := updateDeploymentWithRetry(ctx, c.clientset, namespace, dep.Name, func(d *appsv1.Deployment) error {
644610
if d.Annotations != nil {
645611
removeRestoreLockAnnotations(d.Annotations)
646612
}
613+
return nil
647614
})
648615
if err != nil {
649616
return fmt.Errorf("failed to clear restore lock on deployment %s: %w", dep.Name, err)
@@ -663,10 +630,11 @@ func (c *Client) ClearRestoreLock(namespace, labelSelector string) error {
663630
continue
664631
}
665632

666-
err := updateStatefulSetWithRetry(ctx, c.clientset, namespace, sts.Name, func(s *appsv1.StatefulSet) {
633+
err := updateStatefulSetWithRetry(ctx, c.clientset, namespace, sts.Name, func(s *appsv1.StatefulSet) error {
667634
if s.Annotations != nil {
668635
removeRestoreLockAnnotations(s.Annotations)
669636
}
637+
return nil
670638
})
671639
if err != nil {
672640
return fmt.Errorf("failed to clear restore lock on statefulset %s: %w", sts.Name, err)

0 commit comments

Comments
 (0)