Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions helm/temporal-worker-controller/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,15 @@ rules:
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- temporal.io
resources:
- temporalconnections/finalizers
verbs:
- update
- apiGroups:
- temporal.io
resources:
Expand Down
251 changes: 250 additions & 1 deletion internal/controller/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
"fmt"
"time"

"github.com/go-logr/logr"
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/controller/clientpool"
"github.com/temporalio/temporal-worker-controller/internal/k8s"
"github.com/temporalio/temporal-worker-controller/internal/temporal"
"go.temporal.io/api/serviceerror"
sdkclient "go.temporal.io/sdk/client"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -25,6 +28,7 @@
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -38,6 +42,12 @@
// TODO(jlegrone): add this everywhere
deployOwnerKey = ".metadata.controller"
buildIDLabel = "temporal.io/build-id"

// finalizerName is the finalizer added to TemporalWorkerDeployment and TemporalConnection
// resources to prevent deletion before cleanup actions are taken. On TWD resources, it
// ensures Temporal server-side versioning data is cleaned up. On TemporalConnection
// resources, it prevents deletion while any TWD still references the connection.
finalizerName = "temporal.io/delete-protection"
)

// getAPIKeySecretName extracts the secret name from a SecretKeySelector
Expand Down Expand Up @@ -95,7 +105,8 @@
//+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/finalizers,verbs=update
//+kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch
//+kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch;update;patch

Check failure on line 108 in internal/controller/worker_controller.go

View workflow job for this annotation

GitHub Actions / golangci

comment-spacings: no space between comment delimiter and comment text (revive)
//+kubebuilder:rbac:groups=temporal.io,resources=temporalconnections/finalizers,verbs=update

Check failure on line 109 in internal/controller/worker_controller.go

View workflow job for this annotation

GitHub Actions / golangci

comment-spacings: no space between comment delimiter and comment text (revive)
//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps,resources=deployments/scale,verbs=update
Expand Down Expand Up @@ -126,6 +137,41 @@
return ctrl.Result{}, client.IgnoreNotFound(err)
}

// Handle deletion: clean up Temporal server-side versioning data before allowing
// the CRD to be deleted. Without this, stale build ID routing persists in Temporal
// and prevents unversioned workers from picking up tasks on the same task queue.
if !workerDeploy.DeletionTimestamp.IsZero() {
if controllerutil.ContainsFinalizer(&workerDeploy, finalizerName) {
l.Info("TemporalWorkerDeployment is being deleted, running cleanup")
if err := r.handleDeletion(ctx, l, &workerDeploy); err != nil {
l.Error(err, "failed to clean up Temporal server-side deployment data")
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

// Remove our finalizer from the TemporalConnection if no other TWDs reference it.
if err := r.removeConnectionFinalizerIfUnused(ctx, l, &workerDeploy); err != nil {
l.Error(err, "failed to remove finalizer from TemporalConnection")
return ctrl.Result{}, err
}

// Cleanup succeeded, remove the finalizer so K8s can delete the resource
controllerutil.RemoveFinalizer(&workerDeploy, finalizerName)
if err := r.Update(ctx, &workerDeploy); err != nil {
return ctrl.Result{}, err
}
l.Info("Temporal server-side cleanup complete, finalizer removed")
}
return ctrl.Result{}, nil
}

// Ensure finalizer is present on non-deleted resources
if !controllerutil.ContainsFinalizer(&workerDeploy, finalizerName) {
controllerutil.AddFinalizer(&workerDeploy, finalizerName)
if err := r.Update(ctx, &workerDeploy); err != nil {
return ctrl.Result{}, err
}
}

// TODO(jlegrone): Set defaults via webhook rather than manually
if err := workerDeploy.Default(ctx, &workerDeploy); err != nil {
l.Error(err, "TemporalWorkerDeployment defaulter failed")
Expand Down Expand Up @@ -157,6 +203,13 @@
return ctrl.Result{}, err
}

// Ensure our finalizer is on the TemporalConnection so it cannot be deleted
// while this TWD still references it. This guarantees the connection is available
// during TWD deletion cleanup.
if err := r.ensureConnectionFinalizer(ctx, l, &temporalConnection); err != nil {
return ctrl.Result{}, err
}

// Get the Auth Mode and Secret Name
authMode, secretName, err := resolveAuthSecretName(&temporalConnection)
if err != nil {
Expand Down Expand Up @@ -296,6 +349,134 @@
}, nil
}

// handleDeletion cleans up Temporal server-side deployment versioning data when
// a TemporalWorkerDeployment CRD is deleted. This prevents stale build ID routing
// from blocking unversioned workers on the same task queue.
//
// The cleanup sequence:
// 1. Set the current version to "unversioned" (empty BuildID) so new tasks route to unversioned workers
// 2. Delete all non-current/non-ramping versions (drained/inactive ones)
// 3. The deployment itself will be garbage collected by Temporal once all versions are removed
func (r *TemporalWorkerDeploymentReconciler) handleDeletion(
ctx context.Context,
l logr.Logger,
workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
) error {
// Resolve Temporal connection.
// The TemporalConnection is guaranteed to exist because we hold a finalizer on it
// that prevents deletion while any TWD references it.
var temporalConnection temporaliov1alpha1.TemporalConnection
if err := r.Get(ctx, types.NamespacedName{
Name: workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name,
Namespace: workerDeploy.Namespace,
}, &temporalConnection); err != nil {
return fmt.Errorf("unable to fetch TemporalConnection: %w", err)
}

authMode, secretName, err := resolveAuthSecretName(&temporalConnection)
if err != nil {
return fmt.Errorf("unable to resolve auth secret name: %w", err)
}

temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{
HostPort: temporalConnection.Spec.HostPort,
Namespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace,
SecretName: secretName,
AuthMode: authMode,
})
if !ok {
clientOpts, key, clientAuth, err := r.TemporalClientPool.ParseClientSecret(ctx, secretName, authMode, clientpool.NewClientOptions{
K8sNamespace: workerDeploy.Namespace,
TemporalNamespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace,
Spec: temporalConnection.Spec,
Identity: getControllerIdentity(),
})
if err != nil {
return fmt.Errorf("unable to parse Temporal auth secret: %w", err)
}
c, err := r.TemporalClientPool.DialAndUpsertClient(*clientOpts, *key, *clientAuth)
if err != nil {
return fmt.Errorf("unable to create TemporalClient: %w", err)
}
temporalClient = c
}

workerDeploymentName := k8s.ComputeWorkerDeploymentName(workerDeploy)
deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(workerDeploymentName)

// Describe the deployment to get current state
resp, err := deploymentHandler.Describe(ctx, sdkclient.WorkerDeploymentDescribeOptions{})
if err != nil {
var notFound *serviceerror.NotFound
if errors.As(err, &notFound) {
l.Info("Worker Deployment not found on Temporal server, nothing to clean up")
return nil
}
return fmt.Errorf("unable to describe worker deployment: %w", err)
}

routingConfig := resp.Info.RoutingConfig

// Step 1: Set current version to unversioned (empty BuildID) so tasks route to unversioned workers.
// This is the critical step that unblocks task dispatch.
if routingConfig.CurrentVersion != nil {
l.Info("Setting current version to unversioned", "previousBuildID", routingConfig.CurrentVersion.BuildID)
if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
BuildID: "", // empty = unversioned
ConflictToken: resp.ConflictToken,
Identity: getControllerIdentity(),
IgnoreMissingTaskQueues: true,
}); err != nil {
return fmt.Errorf("unable to set current version to unversioned: %w", err)
}
l.Info("Successfully set current version to unversioned")
} else {
l.Info("No current version set, skipping unversioned redirect")
}

// Step 2: If there's a ramping version, clear it.
if routingConfig.RampingVersion != nil {
l.Info("Clearing ramping version", "buildID", routingConfig.RampingVersion.BuildID)
if _, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{
BuildID: "",
Percentage: 0,
Identity: getControllerIdentity(),
}); err != nil {
l.Info("Failed to clear ramping version (may have been cleared by SetCurrentVersion)", "error", err)
}
} else {
l.Info("No ramping version set, skipping clear ramping version")
}
Comment thread
anujagrawal380 marked this conversation as resolved.

// Step 3: Delete versions that are eligible. Versions that are still draining
// are force-deleted with SkipDrainage since the TWD is being removed entirely.
for _, version := range resp.Info.VersionSummaries {
buildID := version.Version.BuildID
l.Info("Deleting worker deployment version", "buildID", buildID)
if _, err := deploymentHandler.DeleteVersion(ctx, sdkclient.WorkerDeploymentDeleteVersionOptions{
BuildID: buildID,
SkipDrainage: true,
Identity: getControllerIdentity(),
}); err != nil {
// Log but don't fail -- the version may still have pollers or be current.
// Temporal will garbage collect it eventually.
l.Info("Could not delete version (will be garbage collected)", "buildID", buildID, "error", err)
}
}

// Step 4: Attempt to delete the deployment itself. This only succeeds if all versions are gone.
l.Info("Attempting to delete worker deployment from Temporal server", "name", workerDeploymentName)
if _, err := temporalClient.WorkerDeploymentClient().Delete(ctx, sdkclient.WorkerDeploymentDeleteOptions{
Name: workerDeploymentName,
Identity: getControllerIdentity(),
}); err != nil {
// Non-fatal: deployment will be garbage collected once all versions drain.
l.Info("Could not delete worker deployment (will be garbage collected)", "name", workerDeploymentName, "error", err)
}

return nil
}

// setCondition sets a condition on the TemporalWorkerDeployment status.
func (r *TemporalWorkerDeploymentReconciler) setCondition(
workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
Expand Down Expand Up @@ -326,6 +507,74 @@
_ = r.Status().Update(ctx, workerDeploy)
}

// ensureConnectionFinalizer adds our finalizer to the TemporalConnection so it
// cannot be deleted while this TWD still needs it for cleanup.
//
// It is a no-op when the finalizer is already present. If the connection is
// already being deleted and does not carry the finalizer, an error is returned
// without calling the API server: Kubernetes rejects adding new finalizers to an
// object that has a deletion timestamp, so the update could only fail.
// Any error from the update itself is wrapped and returned.
func (r *TemporalWorkerDeploymentReconciler) ensureConnectionFinalizer(
	ctx context.Context,
	l logr.Logger,
	tc *temporaliov1alpha1.TemporalConnection,
) error {
	if controllerutil.ContainsFinalizer(tc, finalizerName) {
		return nil
	}

	// Too late to protect a connection that is already terminating.
	if !tc.GetDeletionTimestamp().IsZero() {
		return fmt.Errorf("unable to add finalizer to TemporalConnection %q: connection is being deleted", tc.Name)
	}

	l.Info("Adding finalizer to TemporalConnection", "connection", tc.Name)
	controllerutil.AddFinalizer(tc, finalizerName)
	if err := r.Update(ctx, tc); err != nil {
		return fmt.Errorf("unable to add finalizer to TemporalConnection %q: %w", tc.Name, err)
	}
	return nil
}

// removeConnectionFinalizerIfUnused removes our finalizer from the TemporalConnection
// if no other TWDs (besides the one being deleted) still need it.
//
// A TWD in the same namespace counts as still needing the connection when it
// references the same connection name and is either not being deleted, or is
// being deleted but still carries our finalizer (its cleanup has not run yet).
// A terminating TWD that has already dropped our finalizer is ignored: the
// reconciler performs no further cleanup for it, so counting it would leave the
// connection finalizer in place with nothing left to remove it.
//
// It returns nil when the finalizer is kept, when the connection no longer
// exists, or when the finalizer was removed successfully. List, get, and update
// failures are wrapped and returned.
func (r *TemporalWorkerDeploymentReconciler) removeConnectionFinalizerIfUnused(
	ctx context.Context,
	l logr.Logger,
	deletingTWD *temporaliov1alpha1.TemporalWorkerDeployment,
) error {
	connectionName := deletingTWD.Spec.WorkerOptions.TemporalConnectionRef.Name

	// List all TWDs in the same namespace
	var twds temporaliov1alpha1.TemporalWorkerDeploymentList
	if err := r.List(ctx, &twds, client.InNamespace(deletingTWD.Namespace)); err != nil {
		return fmt.Errorf("unable to list TWDs: %w", err)
	}

	// Check if any other TWD (not the one being deleted) still needs this connection
	for i := range twds.Items {
		twd := &twds.Items[i]
		if twd.Name == deletingTWD.Name {
			continue
		}
		if twd.Spec.WorkerOptions.TemporalConnectionRef.Name != connectionName {
			continue
		}
		// Already cleaned up and merely waiting on other finalizers: not a live reference.
		if !twd.DeletionTimestamp.IsZero() && !controllerutil.ContainsFinalizer(twd, finalizerName) {
			continue
		}
		l.Info("TemporalConnection still referenced by another TWD, keeping finalizer",
			"connection", connectionName, "referencedBy", twd.Name)
		return nil
	}

	// No other TWDs need this connection, remove the finalizer
	var tc temporaliov1alpha1.TemporalConnection
	if err := r.Get(ctx, types.NamespacedName{
		Name:      connectionName,
		Namespace: deletingTWD.Namespace,
	}, &tc); err != nil {
		if apierrors.IsNotFound(err) {
			return nil // already gone
		}
		return fmt.Errorf("unable to fetch TemporalConnection %q: %w", connectionName, err)
	}

	if !controllerutil.ContainsFinalizer(&tc, finalizerName) {
		return nil
	}

	l.Info("Removing finalizer from TemporalConnection", "connection", connectionName)
	controllerutil.RemoveFinalizer(&tc, finalizerName)
	if err := r.Update(ctx, &tc); err != nil {
		return fmt.Errorf("unable to remove finalizer from TemporalConnection %q: %w", connectionName, err)
	}

	return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &appsv1.Deployment{}, deployOwnerKey, func(rawObj client.Object) []string {
Expand Down
Loading
Loading