Skip to content

Commit e4470ca

Browse files
committed
adds leader election
adds response support
1 parent 345f394 commit e4470ca

19 files changed

Lines changed: 338 additions & 99 deletions

api/v1alpha1/tinynode_types.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,9 @@ const (
4242
ComponentPosYAnnotation = "tinysystems.io/component-pos-y"
4343
ComponentPosSpinAnnotation = "tinysystems.io/component-pos-spin"
4444

45-
SignalNonceAnnotation = "tinysystems.io/signal-nonce"
46-
NodeLabelAnnotation = "tinysystems.io/node-label"
47-
NodeCommentAnnotation = "tinysystems.io/node-comment"
48-
SuggestedHttpPortAnnotation = "tinysystems.io/suggested-http-port"
45+
SignalNonceAnnotation = "tinysystems.io/signal-nonce"
46+
NodeLabelAnnotation = "tinysystems.io/node-label"
47+
NodeCommentAnnotation = "tinysystems.io/node-comment"
4948

5049
IngressHostNameSuffixAnnotation = "tinysystems.io/ingress-hostname-suffix"
5150
)

cli/run.go

Lines changed: 164 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package cli
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"github.com/cenkalti/backoff/v4"
78
"github.com/go-logr/zerologr"
9+
"github.com/goccy/go-json"
810
"github.com/rs/zerolog/log"
911
"github.com/spf13/cobra"
1012
"github.com/tiny-systems/module/api/v1alpha1"
@@ -21,16 +23,22 @@ import (
2123
"go.opentelemetry.io/otel"
2224
"golang.org/x/sync/errgroup"
2325
"k8s.io/apimachinery/pkg/runtime"
26+
"k8s.io/client-go/kubernetes"
2427
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
2528
"k8s.io/client-go/rest"
2629
"k8s.io/client-go/tools/clientcmd"
30+
"k8s.io/client-go/tools/leaderelection"
31+
"k8s.io/client-go/tools/leaderelection/resourcelock"
2732
"net"
2833
"os"
34+
"reflect"
2935
ctrl "sigs.k8s.io/controller-runtime"
3036
"sigs.k8s.io/controller-runtime/pkg/cache"
3137
"sigs.k8s.io/controller-runtime/pkg/healthz"
3238
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
3339
"strings"
40+
"sync/atomic"
41+
"time"
3442
)
3543

3644
// override by ldflags
@@ -63,6 +71,7 @@ var runCmd = &cobra.Command{
6371
)
6472
if err != nil {
6573
l.Error(err, "configure opentelemetry error")
74+
os.Exit(1)
6675
}
6776

6877
// Send buffered spans and free resources.
@@ -86,15 +95,18 @@ var runCmd = &cobra.Command{
8695

8796
if name == "" {
8897
l.Error(ErrInvalidModuleName, "module name is empty")
98+
os.Exit(1)
8999
return
90100
}
91101
if version == "" {
92102
l.Error(ErrInvalidModuleVersion, "module name is empty")
103+
os.Exit(1)
93104
return
94105
}
95106

96107
if strings.HasPrefix(version, "v") {
97108
l.Error(ErrInvalidModuleVersion, "version should not start with v prefix")
109+
os.Exit(1)
98110
return
99111
}
100112

@@ -119,11 +131,13 @@ var runCmd = &cobra.Command{
119131
config, err = rest.InClusterConfig()
120132
if err != nil {
121133
l.Error(err, "unable to get kubeconfig")
134+
os.Exit(1)
122135
return
123136
}
124137
}
125138
if config == nil {
126139
l.Error(fmt.Errorf("config is nil"), "unable to create kubeconfig")
140+
os.Exit(1)
127141
return
128142
}
129143

@@ -144,15 +158,125 @@ var runCmd = &cobra.Command{
144158
})
145159
if err != nil {
146160
l.Error(err, "unable to create manager")
161+
os.Exit(1)
147162
return
148163
}
149164

150-
// run all systems
165+
// custom leader elector
166+
coreClient, err := kubernetes.NewForConfig(config)
167+
if err != nil {
168+
l.Error(err, "unable to create core kubernetes client for leader election")
169+
os.Exit(1)
170+
}
171+
172+
podName := os.Getenv("HOSTNAME")
173+
if podName == "" {
174+
l.Error(err, "HOSTNAME environment variable not set, required for leader identity")
175+
os.Exit(1)
176+
}
177+
178+
isLeader := &atomic.Bool{}
179+
isLeader.Store(false)
180+
181+
lock, err := resourcelock.New(
182+
resourcelock.LeasesResourceLock,
183+
namespace,
184+
fmt.Sprintf("%s-lock", name),
185+
nil,
186+
coreClient.CoordinationV1(), // Coordination client backing the Lease lock (no event recorder is configured)
187+
resourcelock.ResourceLockConfig{
188+
Identity: podName,
189+
},
190+
)
191+
if err != nil {
192+
l.Error(err, "unable to create resource lock for custom leader election")
193+
os.Exit(1)
194+
}
195+
196+
leaderElected := make(chan struct{})
197+
198+
leaderCallbacks := leaderelection.LeaderCallbacks{
199+
OnStartedLeading: func(ctx context.Context) {
200+
l.Info("became leader for status updates")
201+
isLeader.Store(true)
202+
},
203+
OnStoppedLeading: func() {
204+
l.Info("stopped leading for status updates")
205+
isLeader.Store(false)
206+
},
207+
OnNewLeader: func(identity string) {
208+
l.Info("new leader elected for status updates", "leader", identity)
209+
if isClosed(leaderElected) {
210+
return
211+
}
212+
close(leaderElected)
213+
},
214+
}
215+
216+
elector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
217+
Lock: lock,
218+
LeaseDuration: 5 * time.Second,
219+
RenewDeadline: 3 * time.Second,
220+
RetryPeriod: 2 * time.Second,
221+
Callbacks: leaderCallbacks,
222+
ReleaseOnCancel: true,
223+
Name: fmt.Sprintf("%s-leader-elector", name),
224+
})
225+
if err != nil {
226+
l.Error(err, "unable to create leader elector for status updates")
227+
os.Exit(1)
228+
}
229+
151230
cmdCtx, cancel := context.WithCancelCause(cmd.Context())
152231
defer cancel(nil)
153232

154233
wg, ctx := errgroup.WithContext(cmdCtx)
155234

235+
// Start the custom leader elector in a goroutine
236+
237+
wg.Go(func() error {
238+
239+
leLogger := l.WithName("custom-leader-elector-loop")
240+
241+
backoffDuration := 5 * time.Second // Initial backoff duration
242+
maxBackoffDuration := 5 * time.Minute // Maximum backoff duration
243+
244+
for {
245+
leLogger.Info("starting custom leader election process")
246+
247+
elector.Run(ctx)
248+
// If we reach here, elector.Run() has exited.
249+
250+
select {
251+
case <-ctx.Done():
252+
leLogger.Info("context cancelled, will not retry leader election", "reason", context.Cause(ctx))
253+
254+
if err := context.Cause(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
255+
return err
256+
}
257+
return nil
258+
default:
259+
// The context is not done, so elector.Run() likely exited due to losing the lease
260+
// or an unrecoverable error during renewal (like the API being unreachable).
261+
leLogger.Error(nil, "custom leader election ended unexpectedly, will retry after backoff", "current_backoff", backoffDuration)
262+
263+
// Wait for the backoff period, but also listen for context cancellation.
264+
select {
265+
case <-time.After(backoffDuration):
266+
// Exponential backoff
267+
backoffDuration *= 2
268+
if backoffDuration > maxBackoffDuration {
269+
backoffDuration = maxBackoffDuration
270+
}
271+
case <-ctx.Done():
272+
leLogger.Info("context cancelled during backoff, stopping leader election retries", "reason", context.Cause(ctx))
273+
return nil
274+
}
275+
}
276+
}
277+
278+
})
279+
156280
listenAddr := make(chan string)
157281
defer close(listenAddr)
158282

@@ -170,10 +294,10 @@ var runCmd = &cobra.Command{
170294
)
171295

172296
//
173-
scheduler = sch.New(func(ctx context.Context, msg *runner.Msg) error {
297+
scheduler = sch.New(func(ctx context.Context, msg *runner.Msg) (any, error) {
174298
m, _, err := m.ParseFullName(msg.To)
175299
if err != nil {
176-
return fmt.Errorf("parse destination error: %v", err)
300+
return nil, fmt.Errorf("parse destination error: %v", err)
177301
}
178302

179303
if m == moduleInfo.GetNameSanitised() {
@@ -182,11 +306,26 @@ var runCmd = &cobra.Command{
182306
}
183307

184308
// gRPC call with retries
309+
var resp []byte
310+
311+
err = backoff.Retry(func() error {
312+
resp, err = pool.Handler(ctx, msg)
313+
return err
185314

186-
return backoff.Retry(func() error {
187-
return pool.Handler(ctx, msg)
188315
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
316+
if err != nil {
317+
return nil, err
318+
}
319+
320+
if msg.Resp == nil {
321+
return resp, nil
322+
}
189323

324+
respData := reflect.New(reflect.TypeOf(msg.Resp)).Elem()
325+
if err = json.Unmarshal(resp, respData.Addr().Interface()); err != nil {
326+
return nil, err
327+
}
328+
return respData.Interface(), err
190329
}).
191330
SetLogger(l).
192331
SetMeter(meter).
@@ -240,6 +379,7 @@ var runCmd = &cobra.Command{
240379
Scheme: mgr.GetScheme(),
241380
Scheduler: scheduler,
242381
Module: moduleInfo,
382+
IsLeader: isLeader,
243383
}
244384

245385
if err = nodeController.SetupWithManager(mgr); err != nil {
@@ -252,6 +392,7 @@ var runCmd = &cobra.Command{
252392
Scheme: mgr.GetScheme(),
253393
Module: moduleInfo,
254394
ClientPool: pool,
395+
IsLeader: isLeader,
255396
}
256397

257398
if err = moduleController.SetupWithManager(mgr); err != nil {
@@ -260,9 +401,10 @@ var runCmd = &cobra.Command{
260401
}
261402

262403
if err = (&controller.TinyTrackerReconciler{
263-
Client: mgr.GetClient(),
264-
Scheme: mgr.GetScheme(),
265-
Manager: trackManager,
404+
Client: mgr.GetClient(),
405+
Scheme: mgr.GetScheme(),
406+
Manager: trackManager,
407+
IsLeader: isLeader,
266408
//
267409
}).SetupWithManager(mgr); err != nil {
268410
l.Error(err, "unable to create tinytracker controller")
@@ -274,6 +416,7 @@ var runCmd = &cobra.Command{
274416
Scheme: mgr.GetScheme(),
275417
Scheduler: scheduler,
276418
Module: moduleInfo,
419+
IsLeader: isLeader,
277420
}).SetupWithManager(mgr); err != nil {
278421
l.Error(err, "unable to create tinysignal controller")
279422
return
@@ -319,9 +462,10 @@ var runCmd = &cobra.Command{
319462
return nil
320463
})
321464

322-
// kubebuilder start
323-
324465
wg.Go(func() error {
466+
<-leaderElected
467+
468+
// start manager
325469
l.Info("starting kubebuilder operator")
326470
defer func() {
327471
l.Info("kubebuilder operator stopped")
@@ -333,7 +477,7 @@ var runCmd = &cobra.Command{
333477
return nil
334478
})
335479

336-
// grpc client start
480+
// start grpc client pool
337481

338482
wg.Go(func() error {
339483
l.Info("starting gRPC client pool")
@@ -380,3 +524,12 @@ var runCmd = &cobra.Command{
380524
l.Info("all done")
381525
},
382526
}
527+
528+
func isClosed(ch <-chan struct{}) bool {
529+
select {
530+
case <-ch:
531+
return true
532+
default:
533+
}
534+
return false
535+
}

internal/client/pool.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,30 +51,33 @@ func (p *AddressPool) SetLogger(l logr.Logger) *AddressPool {
5151
return p
5252
}
5353

54-
func (p *AddressPool) Handler(ctx context.Context, msg *runner.Msg) error {
54+
func (p *AddressPool) Handler(ctx context.Context, msg *runner.Msg) ([]byte, error) {
5555
moduleName, _, err := module2.ParseFullName(msg.To)
5656
if err != nil {
57-
return err
57+
return nil, err
5858
}
5959

6060
addr, ok := p.addressTable.Get(moduleName)
6161
if !ok {
62-
return fmt.Errorf("%s module address is unknown", moduleName)
62+
return nil, fmt.Errorf("%s module address is unknown", moduleName)
6363
}
6464
client, err := p.getClient(ctx, addr)
6565
if err != nil {
6666
p.log.Error(err, "unable to get client", "addr", addr)
6767
}
6868

6969
// sending request using gRPC
70-
_, err = client.Message(ctx, &module.MessageRequest{
70+
resp, err := client.Message(ctx, &module.MessageRequest{
7171
From: msg.From,
7272
Payload: msg.Data,
7373
EdgeID: msg.EdgeID,
7474
To: msg.To,
7575
})
76+
if err != nil {
77+
return nil, err
78+
}
7679

77-
return err
80+
return resp.Data, nil
7881
}
7982

8083
func (p *AddressPool) Start(ctx context.Context) error {

internal/controller/tinymodule_controller.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/tiny-systems/module/registry"
2424
"k8s.io/apimachinery/pkg/api/errors"
2525
"sigs.k8s.io/controller-runtime/pkg/reconcile"
26+
"sync/atomic"
2627

2728
"k8s.io/apimachinery/pkg/runtime"
2829
ctrl "sigs.k8s.io/controller-runtime"
@@ -38,6 +39,7 @@ type TinyModuleReconciler struct {
3839
Scheme *runtime.Scheme
3940
Module module.Info
4041
ClientPool clientpool.Pool
42+
IsLeader *atomic.Bool
4143
}
4244

4345
//+kubebuilder:rbac:groups=operator.tinysystems.io,resources=tinymodules,verbs=get;list;watch;create;update;patch;delete;deletecollection
@@ -54,6 +56,11 @@ type TinyModuleReconciler struct {
5456
// For more details, check Reconcile and its Result here:
5557
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile
5658
func (r *TinyModuleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
59+
60+
if !r.IsLeader.Load() {
61+
return reconcile.Result{}, nil
62+
}
63+
// Only the leader proceeds past this point; non-leader replicas return early above without reconciling.
5764
l := log.FromContext(ctx)
5865

5966
instance := &operatorv1alpha1.TinyModule{}

0 commit comments

Comments
 (0)