Skip to content

Commit 4e13b89

Browse files
jlegroneclaude
andcommitted
Fix race conditions and improve finalizer reliability
- Replace stale list iteration with proper polling using fresh queries - Add timeout and context cancellation handling with configurable constants - Implement field selector optimization with backward compatibility fallback - Replace brittle time.Sleep with condition-based polling in integration tests - Add comprehensive edge case tests for context cancellation and partial cleanup failures 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 7a7754d commit 4e13b89

3 files changed

Lines changed: 276 additions & 48 deletions

File tree

internal/controller/finalizer_test.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package controller
66

77
import (
88
"context"
9+
"strings"
910
"testing"
1011

1112
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
@@ -239,3 +240,163 @@ func TestHandleDeletion(t *testing.T) {
239240
// by checking if the finalizer was removed (which we can't easily do since the resource is deleted)
240241
// Instead, we'll verify that the deletion handling completed without error, which means cleanup was successful
241242
}
243+
244+
func TestCleanupWithContextCancellation(t *testing.T) {
245+
// Create a context that will be cancelled during cleanup
246+
ctx, cancel := context.WithCancel(context.Background())
247+
248+
// Create a TemporalWorkerDeployment using test helpers
249+
workerDeploy := testhelpers.ModifyObj(testhelpers.MakeTWDWithName("test-worker", "default"), func(twd *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment {
250+
twd.UID = "worker-uid-123"
251+
return twd
252+
})
253+
254+
// Create fake client using test helpers
255+
client := testhelpers.SetupFakeClient()
256+
257+
reconciler := &TemporalWorkerDeploymentReconciler{
258+
Client: client,
259+
Scheme: testhelpers.SetupTestScheme(),
260+
}
261+
262+
// Create a test logger using testlogr
263+
logger := testlogr.New(t)
264+
265+
// Cancel the context immediately to simulate cancellation during cleanup
266+
cancel()
267+
268+
// Test cleanup with cancelled context - should handle gracefully
269+
err := reconciler.cleanupManagedResources(ctx, logger, workerDeploy)
270+
if err == nil {
271+
t.Error("Expected error when context is cancelled during cleanup")
272+
}
273+
274+
// Error should indicate context cancellation
275+
if ctx.Err() != context.Canceled {
276+
t.Error("Context should be cancelled")
277+
}
278+
}
279+
280+
func TestWaitForOwnedDeploymentsTimeout(t *testing.T) {
281+
ctx := context.Background()
282+
283+
// Create a TemporalWorkerDeployment using test helpers
284+
workerDeploy := testhelpers.ModifyObj(testhelpers.MakeTWDWithName("test-worker", "default"), func(twd *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment {
285+
twd.UID = "worker-uid-123"
286+
return twd
287+
})
288+
289+
// Create a deployment that won't be deleted (simulate stuck deletion)
290+
persistentDeployment := &appsv1.Deployment{
291+
ObjectMeta: metav1.ObjectMeta{
292+
Name: "persistent-deployment",
293+
Namespace: "default",
294+
OwnerReferences: []metav1.OwnerReference{
295+
{
296+
APIVersion: apiGVStr,
297+
Kind: "TemporalWorkerDeployment",
298+
Name: "test-worker",
299+
UID: "worker-uid-123",
300+
},
301+
},
302+
},
303+
}
304+
305+
// Create fake client with the deployment that won't be deleted
306+
client := testhelpers.SetupFakeClient(persistentDeployment)
307+
308+
reconciler := &TemporalWorkerDeploymentReconciler{
309+
Client: client,
310+
Scheme: testhelpers.SetupTestScheme(),
311+
}
312+
313+
// Create a test logger using testlogr
314+
logger := testlogr.New(t)
315+
316+
// Test with a very short timeout to simulate timeout condition
317+
// This will use the actual waitForOwnedDeploymentsToBeDeleted method which has built-in timeout
318+
err := reconciler.waitForOwnedDeploymentsToBeDeleted(ctx, logger, workerDeploy)
319+
320+
// Should timeout waiting for deployments to be deleted
321+
if err == nil {
322+
t.Error("Expected timeout error when deployments don't get deleted")
323+
}
324+
325+
// Error message should indicate timeout
326+
if err != nil && !contains(err.Error(), "timeout") {
327+
t.Errorf("Expected timeout error, got: %v", err)
328+
}
329+
}
330+
331+
func TestPartialCleanupFailure(t *testing.T) {
332+
ctx := context.Background()
333+
334+
// Create a TemporalWorkerDeployment using test helpers
335+
workerDeploy := testhelpers.ModifyObj(testhelpers.MakeTWDWithName("test-worker", "default"), func(twd *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment {
336+
twd.UID = "worker-uid-123"
337+
return twd
338+
})
339+
340+
// Create multiple deployments owned by the worker deployment
341+
deployment1 := &appsv1.Deployment{
342+
ObjectMeta: metav1.ObjectMeta{
343+
Name: "deployment-1",
344+
Namespace: "default",
345+
OwnerReferences: []metav1.OwnerReference{
346+
{
347+
APIVersion: apiGVStr,
348+
Kind: "TemporalWorkerDeployment",
349+
Name: "test-worker",
350+
UID: "worker-uid-123",
351+
},
352+
},
353+
},
354+
}
355+
356+
deployment2 := &appsv1.Deployment{
357+
ObjectMeta: metav1.ObjectMeta{
358+
Name: "deployment-2",
359+
Namespace: "default",
360+
OwnerReferences: []metav1.OwnerReference{
361+
{
362+
APIVersion: apiGVStr,
363+
Kind: "TemporalWorkerDeployment",
364+
Name: "test-worker",
365+
UID: "worker-uid-123",
366+
},
367+
},
368+
},
369+
}
370+
371+
// Create fake client with multiple deployments
372+
client := testhelpers.SetupFakeClient(deployment1, deployment2)
373+
374+
reconciler := &TemporalWorkerDeploymentReconciler{
375+
Client: client,
376+
Scheme: testhelpers.SetupTestScheme(),
377+
}
378+
379+
// Create a test logger using testlogr
380+
logger := testlogr.New(t)
381+
382+
// Delete one deployment manually to simulate partial cleanup
383+
err := client.Delete(ctx, deployment1)
384+
if err != nil {
385+
t.Fatalf("Failed to delete deployment1: %v", err)
386+
}
387+
388+
// Now test cleanup - it should handle the mixed state gracefully
389+
// (one deployment already deleted, one still exists)
390+
err = reconciler.cleanupManagedResources(ctx, logger, workerDeploy)
391+
392+
// This should eventually succeed as the cleanup logic should handle
393+
// deployments that are already deleted gracefully
394+
if err != nil && !contains(err.Error(), "timeout") {
395+
t.Errorf("Cleanup should handle partial cleanup gracefully, got error: %v", err)
396+
}
397+
}
398+
399+
// Helper function to check if a string contains a substring
400+
func contains(s, substr string) bool {
401+
return strings.Contains(s, substr)
402+
}

internal/controller/worker_controller.go

Lines changed: 77 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
appsv1 "k8s.io/api/apps/v1"
1818
apierrors "k8s.io/apimachinery/pkg/api/errors"
1919
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20+
"k8s.io/apimachinery/pkg/fields"
2021
"k8s.io/apimachinery/pkg/runtime"
2122
"k8s.io/apimachinery/pkg/types"
2223
ctrl "sigs.k8s.io/controller-runtime"
@@ -36,6 +37,10 @@ const (
3637
buildIDLabel = "temporal.io/build-id"
3738
// TemporalWorkerDeploymentFinalizer is the finalizer used to ensure proper cleanup of resources
3839
TemporalWorkerDeploymentFinalizer = "temporal.io/temporal-worker-deployment-finalizer"
40+
41+
// Cleanup timeout and polling constants
42+
cleanupTimeout = 2 * time.Minute
43+
cleanupPollInterval = 5 * time.Second
3944
)
4045

4146
// TemporalWorkerDeploymentReconciler reconciles a TemporalWorkerDeployment object
@@ -236,18 +241,29 @@ func (r *TemporalWorkerDeploymentReconciler) handleDeletion(ctx context.Context,
236241
func (r *TemporalWorkerDeploymentReconciler) cleanupManagedResources(ctx context.Context, l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment) error {
237242
l.Info("Cleaning up managed resources")
238243

239-
// List all deployments owned by this TemporalWorkerDeployment
240-
deploymentList := &appsv1.DeploymentList{}
244+
// Try to use field selector for efficient querying of owned deployments
245+
// Fall back to listing all deployments if field selector is not available (e.g., in tests)
241246
listOpts := &client.ListOptions{
242-
Namespace: workerDeploy.Namespace,
247+
Namespace: workerDeploy.Namespace,
248+
FieldSelector: fields.OneTermEqualSelector(deployOwnerKey, workerDeploy.Name),
243249
}
244250

245-
if err := r.List(ctx, deploymentList, listOpts); err != nil {
246-
return fmt.Errorf("failed to list deployments: %w", err)
251+
deploymentList := &appsv1.DeploymentList{}
252+
err := r.List(ctx, deploymentList, listOpts)
253+
if err != nil {
254+
// If field selector fails (common in tests), fall back to listing all deployments
255+
l.Info("Field selector not available, falling back to listing all deployments", "error", err.Error())
256+
listOpts = &client.ListOptions{
257+
Namespace: workerDeploy.Namespace,
258+
}
259+
if err := r.List(ctx, deploymentList, listOpts); err != nil {
260+
return fmt.Errorf("failed to list deployments: %w", err)
261+
}
247262
}
248263

249-
// Filter deployments owned by this TemporalWorkerDeployment and delete them
264+
// Delete all owned deployments
250265
for _, deployment := range deploymentList.Items {
266+
// Check ownership for all deployments when not using field selector
251267
if r.isOwnedByWorkerDeployment(&deployment, workerDeploy) {
252268
l.Info("Deleting managed deployment", "deployment", deployment.Name)
253269
if err := r.Delete(ctx, &deployment); err != nil && !apierrors.IsNotFound(err) {
@@ -256,29 +272,64 @@ func (r *TemporalWorkerDeploymentReconciler) cleanupManagedResources(ctx context
256272
}
257273
}
258274

259-
// Wait for all owned deployments to be deleted
260-
for _, deployment := range deploymentList.Items {
261-
if r.isOwnedByWorkerDeployment(&deployment, workerDeploy) {
262-
// Check if deployment still exists
263-
currentDeployment := &appsv1.Deployment{}
264-
err := r.Get(ctx, types.NamespacedName{
265-
Namespace: deployment.Namespace,
266-
Name: deployment.Name,
267-
}, currentDeployment)
268-
269-
if err == nil {
270-
// Deployment still exists, requeue to wait for deletion
271-
l.Info("Waiting for deployment to be deleted", "deployment", deployment.Name)
272-
return fmt.Errorf("still waiting for deployment %s to be deleted", deployment.Name)
273-
} else if !apierrors.IsNotFound(err) {
274-
return fmt.Errorf("failed to check deployment status %s: %w", deployment.Name, err)
275+
// Wait for all owned deployments to be deleted with proper polling
276+
return r.waitForOwnedDeploymentsToBeDeleted(ctx, l, workerDeploy)
277+
}
278+
279+
// waitForOwnedDeploymentsToBeDeleted waits for all owned deployments to be deleted with proper polling and timeout
280+
func (r *TemporalWorkerDeploymentReconciler) waitForOwnedDeploymentsToBeDeleted(ctx context.Context, l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment) error {
281+
// Create a timeout context for cleanup operations
282+
cleanupCtx, cancel := context.WithTimeout(ctx, cleanupTimeout)
283+
defer cancel()
284+
285+
ticker := time.NewTicker(cleanupPollInterval)
286+
defer ticker.Stop()
287+
288+
l.Info("Waiting for owned deployments to be deleted", "timeout", cleanupTimeout)
289+
290+
for {
291+
select {
292+
case <-cleanupCtx.Done():
293+
if cleanupCtx.Err() == context.DeadlineExceeded {
294+
return fmt.Errorf("timeout waiting for deployments to be deleted after %v", cleanupTimeout)
295+
}
296+
return fmt.Errorf("context cancelled while waiting for deployments to be deleted: %w", cleanupCtx.Err())
297+
298+
case <-ticker.C:
299+
// Try to use field selector for efficient querying, with fallback
300+
listOpts := &client.ListOptions{
301+
Namespace: workerDeploy.Namespace,
302+
FieldSelector: fields.OneTermEqualSelector(deployOwnerKey, workerDeploy.Name),
303+
}
304+
305+
deploymentList := &appsv1.DeploymentList{}
306+
err := r.List(cleanupCtx, deploymentList, listOpts)
307+
if err != nil {
308+
// If field selector fails (common in tests), fall back to listing all deployments
309+
listOpts = &client.ListOptions{
310+
Namespace: workerDeploy.Namespace,
311+
}
312+
if err := r.List(cleanupCtx, deploymentList, listOpts); err != nil {
313+
return fmt.Errorf("failed to list deployments during cleanup: %w", err)
314+
}
315+
}
316+
317+
// Check if any owned deployments still exist
318+
hasOwnedDeployments := false
319+
for _, deployment := range deploymentList.Items {
320+
if r.isOwnedByWorkerDeployment(&deployment, workerDeploy) {
321+
hasOwnedDeployments = true
322+
l.Info("Still waiting for deployment to be deleted", "deployment", deployment.Name)
323+
break
324+
}
325+
}
326+
327+
if !hasOwnedDeployments {
328+
l.Info("All owned deployments have been deleted")
329+
return nil
275330
}
276-
// IsNotFound error means deployment was successfully deleted
277331
}
278332
}
279-
280-
l.Info("All managed resources have been cleaned up")
281-
return nil
282333
}
283334

284335
// isOwnedByWorkerDeployment checks if a deployment is owned by the given TemporalWorkerDeployment

internal/tests/internal/integration_test.go

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,24 @@ const (
2626
testDrainageRefreshInterval = time.Second
2727
)
2828

29+
// waitForCondition polls a condition function until it returns true or timeout is reached
30+
func waitForCondition(condition func() bool, timeout, interval time.Duration) bool {
31+
deadline := time.After(timeout)
32+
ticker := time.NewTicker(interval)
33+
defer ticker.Stop()
34+
35+
for {
36+
select {
37+
case <-deadline:
38+
return false
39+
case <-ticker.C:
40+
if condition() {
41+
return true
42+
}
43+
}
44+
}
45+
}
46+
2947
type testEnv struct {
3048
k8sClient client.Client
3149
ts *temporaltest.TestServer
@@ -349,19 +367,24 @@ func testDeploymentDeletionProtection(t *testing.T, k8sClient client.Client, ts
349367
// Direct deletion failed as expected due to owner reference protection
350368
t.Logf("Direct deletion failed as expected: %v", err)
351369
} else {
352-
// If deletion succeeded, verify the controller recreates it
353-
time.Sleep(5 * time.Second) // Give controller time to recreate
370+
// If deletion succeeded, verify the controller recreates it with proper polling
371+
eventuallyRecreated := func() bool {
372+
var recreatedDeployment appsv1.Deployment
373+
err := k8sClient.Get(ctx, types.NamespacedName{
374+
Name: expectedDeploymentName,
375+
Namespace: twd.Namespace,
376+
}, &recreatedDeployment)
377+
378+
if err != nil {
379+
return false // Deployment not found yet
380+
}
354381

355-
var recreatedDeployment appsv1.Deployment
356-
err = k8sClient.Get(ctx, types.NamespacedName{
357-
Name: expectedDeploymentName,
358-
Namespace: twd.Namespace,
359-
}, &recreatedDeployment)
382+
// Check if it's a new deployment (different UID)
383+
return recreatedDeployment.UID != originalUID
384+
}
360385

361-
if err != nil {
362-
assert.Fail(t, "Controller should have recreated the deployment after direct deletion", "Error: %v", err)
363-
} else {
364-
assert.NotEqual(t, originalUID, recreatedDeployment.UID, "Deployment should have been recreated with new UID")
386+
if !waitForCondition(eventuallyRecreated, 30*time.Second, 1*time.Second) {
387+
assert.Fail(t, "Controller should have recreated the deployment after direct deletion within 30 seconds")
365388
}
366389
}
367390

@@ -371,22 +394,15 @@ func testDeploymentDeletionProtection(t *testing.T, k8sClient client.Client, ts
371394
require.NoError(t, err, "failed to delete TemporalWorkerDeployment")
372395

373396
// Wait for the deployment to be cleaned up
374-
deadline := time.Now().Add(30 * time.Second)
375-
deploymentDeleted := false
376-
for time.Now().Before(deadline) {
397+
eventuallyDeleted := func() bool {
377398
var checkDeployment appsv1.Deployment
378-
err = k8sClient.Get(ctx, types.NamespacedName{
399+
err := k8sClient.Get(ctx, types.NamespacedName{
379400
Name: expectedDeploymentName,
380401
Namespace: twd.Namespace,
381402
}, &checkDeployment)
382-
if err != nil {
383-
if client.IgnoreNotFound(err) == nil {
384-
deploymentDeleted = true
385-
break
386-
}
387-
}
388-
time.Sleep(1 * time.Second)
403+
return client.IgnoreNotFound(err) == nil // Returns true if deployment is not found (deleted)
389404
}
390405

406+
deploymentDeleted := waitForCondition(eventuallyDeleted, 30*time.Second, 1*time.Second)
391407
assert.True(t, deploymentDeleted, "Controller should have cleaned up the deployment when TWD was deleted")
392408
}

0 commit comments

Comments
 (0)