Skip to content

Commit f42e9aa

Browse files
maksymlumenclaude
andcommitted
fix: wait for leadership status to be known before starting reconciliation
Replicas now wait until they're certain they won't become leader (20s timeout) before starting the reconciliation loop. This prevents the race condition where a pod would start reconciling with IsLeader=false and then become leader. - Leader: starts reconciliation immediately after OnStartedLeading - Replica: starts reconciliation after 20s timeout confirms non-leader status 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 4c28379 commit f42e9aa

2 files changed

Lines changed: 44 additions & 23 deletions

File tree

cli/run.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,11 +205,16 @@ var runCmd = &cobra.Command{
205205
}
206206

207207
leaderElected := make(chan struct{})
208+
leadershipKnown := make(chan struct{})
208209

209210
leaderCallbacks := leaderelection.LeaderCallbacks{
210211
OnStartedLeading: func(ctx context.Context) {
211212
l.Info("became leader for status updates")
212213
isLeader.Store(true)
214+
// Leadership status is now known - we are the leader
215+
if !isClosed(leadershipKnown) {
216+
close(leadershipKnown)
217+
}
213218
},
214219
OnStoppedLeading: func() {
215220
l.Info("stopped leading for status updates")
@@ -221,6 +226,20 @@ var runCmd = &cobra.Command{
221226
return
222227
}
223228
close(leaderElected)
229+
// Start a goroutine to close leadershipKnown after timeout if we don't become leader
230+
go func() {
231+
select {
232+
case <-leadershipKnown:
233+
// Already closed by OnStartedLeading
234+
return
235+
case <-time.After(20 * time.Second):
236+
// We're not going to become leader - we're a replica
237+
l.Info("leadership election timeout, proceeding as replica")
238+
if !isClosed(leadershipKnown) {
239+
close(leadershipKnown)
240+
}
241+
}
242+
}()
224243
},
225244
}
226245

@@ -498,10 +517,10 @@ var runCmd = &cobra.Command{
498517
})
499518

500519
wg.Go(func() error {
501-
<-leaderElected
520+
<-leadershipKnown
502521

503-
// start manager
504-
l.Info("starting kubebuilder operator")
522+
// start manager (leadership status is now known - either leader or replica)
523+
l.Info("starting kubebuilder operator", "isLeader", isLeader.Load())
505524
defer func() {
506525
l.Info("kubebuilder operator stopped")
507526
}()

internal/controller/tinysignal_controller.go

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,26 @@ limitations under the License.
1717
package controller
1818

1919
import (
20-
"context"
21-
"github.com/tiny-systems/module/internal/scheduler"
22-
"github.com/tiny-systems/module/internal/scheduler/runner"
23-
"github.com/tiny-systems/module/module"
24-
"github.com/tiny-systems/module/pkg/utils"
25-
"k8s.io/apimachinery/pkg/api/errors"
26-
"k8s.io/apimachinery/pkg/types"
27-
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
28-
"sigs.k8s.io/controller-runtime/pkg/reconcile"
29-
"sync"
30-
"sync/atomic"
31-
"time"
32-
33-
"k8s.io/apimachinery/pkg/runtime"
34-
ctrl "sigs.k8s.io/controller-runtime"
35-
"sigs.k8s.io/controller-runtime/pkg/client"
36-
"sigs.k8s.io/controller-runtime/pkg/log"
37-
38-
operatorv1alpha1 "github.com/tiny-systems/module/api/v1alpha1"
20+
"context"
21+
"sync"
22+
"sync/atomic"
23+
"time"
24+
25+
"github.com/tiny-systems/module/internal/scheduler"
26+
"github.com/tiny-systems/module/internal/scheduler/runner"
27+
"github.com/tiny-systems/module/module"
28+
"github.com/tiny-systems/module/pkg/utils"
29+
"k8s.io/apimachinery/pkg/api/errors"
30+
"k8s.io/apimachinery/pkg/types"
31+
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
32+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
33+
34+
"k8s.io/apimachinery/pkg/runtime"
35+
ctrl "sigs.k8s.io/controller-runtime"
36+
"sigs.k8s.io/controller-runtime/pkg/client"
37+
"sigs.k8s.io/controller-runtime/pkg/log"
38+
39+
operatorv1alpha1 "github.com/tiny-systems/module/api/v1alpha1"
3940
)
4041

4142
// TinySignalReconciler reconciles a TinySignal object
@@ -139,6 +140,7 @@ func (r *TinySignalReconciler) Reconcile(ctx context.Context, req ctrl.Request)
139140

140141
_, isRunning := r.runningProcesses[req.NamespacedName]
141142
lastProcessedNonce := signal.Status.ProcessedNonce
143+
142144
currentNonce := signal.Spec.Nonce
143145

144146
specHasChanged := currentNonce != lastProcessedNonce
@@ -328,7 +330,7 @@ func (r *TinySignalReconciler) Reconcile(ctx context.Context, req ctrl.Request)
328330

329331
return
330332
}
331-
}(processCtx, req.NamespacedName, signal.Spec, signal.Spec.Nonce)
333+
}(processCtx, req.NamespacedName, signal.Spec, currentNonce)
332334
}
333335

334336
return ctrl.Result{}, nil

0 commit comments

Comments
 (0)