Skip to content

Commit fd3939b

Browse files
committed
adds persistent signals
adds retries
1 parent 049c5c9 commit fd3939b

11 files changed

Lines changed: 184 additions & 100 deletions

File tree

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,15 @@ make manifests
7676
### Create new api
7777
```shell
7878
kubebuilder create api --group operator --version v1alpha1 --kind TinySignal
79-
8079
```
8180

8281
**NOTE:** Run `make --help` for more information on all potential `make` targets
8382

8483
More information can be found via the [Kubebuilder Documentation](https://book.kubebuilder.io/introduction.html)
8584

85+
### Module development
86+
tbd
87+
8688
## License
8789

8890
Copyright 2023.

api/v1alpha1/tinynode_types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ const (
2626
//ProjectIDLabel project ID k8s label
2727
ProjectIDLabel = "tinysystems.io/project-id"
2828

29+
NodeNameLabel = "tinysystems.io/node-name"
30+
2931
//ModuleNameMajorLabel module major version label
3032
ModuleNameMajorLabel = "tinysystems.io/module-version-major"
3133
//ModuleVersionLabel module exact version label
@@ -40,6 +42,7 @@ const (
4042
ComponentPosYAnnotation = "tinysystems.io/component-pos-y"
4143
ComponentPosSpinAnnotation = "tinysystems.io/component-pos-spin"
4244

45+
SignalNonceAnnotation = "tinysystems.io/signal-nonce"
4346
NodeLabelAnnotation = "tinysystems.io/node-label"
4447
NodeCommentAnnotation = "tinysystems.io/node-comment"
4548
SuggestedHttpPortAnnotation = "tinysystems.io/suggested-http-port"

cli/run.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cli
33
import (
44
"context"
55
"fmt"
6+
"github.com/cenkalti/backoff/v4"
67
"github.com/go-logr/zerologr"
78
"github.com/rs/zerolog/log"
89
"github.com/spf13/cobra"
@@ -182,8 +183,12 @@ var runCmd = &cobra.Command{
182183
return scheduler.Handle(ctx, msg)
183184
}
184185

185-
// gRPC call
186-
return pool.Handler(ctx, msg)
186+
// gRPC call with retries
187+
188+
return backoff.Retry(func() error {
189+
return pool.Handler(ctx, msg)
190+
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
191+
187192
}).
188193
SetLogger(l).
189194
SetMeter(meter).

internal/client/pool.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"context"
5+
"fmt"
56
"github.com/go-logr/logr"
67
cmap "github.com/orcaman/concurrent-map/v2"
78
"github.com/tiny-systems/module/internal/scheduler/runner"
@@ -58,8 +59,7 @@ func (p *AddressPool) Handler(ctx context.Context, msg *runner.Msg) error {
5859

5960
addr, ok := p.addressTable.Get(moduleName)
6061
if !ok {
61-
p.log.Error(err, "module address is unknown", "name", moduleName)
62-
return err
62+
return fmt.Errorf("%s module address is unknown", moduleName)
6363
}
6464
client, err := p.getClient(ctx, addr)
6565
if err != nil {
@@ -73,6 +73,7 @@ func (p *AddressPool) Handler(ctx context.Context, msg *runner.Msg) error {
7373
EdgeID: msg.EdgeID,
7474
To: msg.To,
7575
})
76+
7677
return err
7778
}
7879

internal/controller/tinynode_controller.go

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package controller
1818

1919
import (
2020
"context"
21+
"fmt"
2122
operatorv1alpha1 "github.com/tiny-systems/module/api/v1alpha1"
2223
"github.com/tiny-systems/module/internal/scheduler"
2324
"github.com/tiny-systems/module/module"
@@ -64,7 +65,6 @@ type TinyNodeReconciler struct {
6465
func (r *TinyNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
6566
l := log.FromContext(ctx)
6667

67-
//l.Info("reconcile", "tinynode", req.Name)
6868
m, _, err := module.ParseFullName(req.Name)
6969
if err != nil {
7070
l.Error(err, "node has invalid name", "name", req.Name)
@@ -76,22 +76,49 @@ func (r *TinyNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
7676
return reconcile.Result{}, nil
7777
}
7878

79+
l.Info("reconcile", "tinynode", req.Name)
80+
7981
node := &operatorv1alpha1.TinyNode{}
8082

8183
if err = r.Get(context.Background(), req.NamespacedName, node); err != nil {
8284
l.Error(err, "get tinynode error")
8385
if errors.IsNotFound(err) {
8486
// Object not found, return. Created objects are automatically garbage collected.
8587
// For additional cleanup logic use finalizers.
88+
// delete signals
89+
90+
_ = r.DeleteAllOf(context.Background(), &operatorv1alpha1.TinySignal{}, client.InNamespace(req.Namespace), client.MatchingLabels{
91+
operatorv1alpha1.NodeNameLabel: req.Name,
92+
})
93+
8694
if err = r.Scheduler.Destroy(req.Name); err != nil {
87-
l.Error(err, "destroy error")
8895
return reconcile.Result{}, err
8996
}
9097
return reconcile.Result{}, nil
9198
}
9299
// Error reading the object - requeue the request.
93100
return reconcile.Result{}, err
94101
}
102+
originNode := node.DeepCopy()
103+
104+
// select all signals for this node
105+
106+
signalList := &operatorv1alpha1.TinySignalList{}
107+
108+
selector, err := v1.LabelSelectorAsSelector(&v1.LabelSelector{
109+
MatchLabels: map[string]string{
110+
operatorv1alpha1.NodeNameLabel: node.Name,
111+
},
112+
})
113+
if err != nil {
114+
return reconcile.Result{}, fmt.Errorf("build signal selector error: %s", err)
115+
}
116+
117+
if err = r.List(ctx, signalList, client.MatchingLabelsSelector{
118+
Selector: selector,
119+
}, client.InNamespace(node.Namespace)); err != nil {
120+
return reconcile.Result{}, fmt.Errorf("signal list error: %v", err)
121+
}
95122

96123
status := &node.Status
97124

@@ -109,7 +136,7 @@ func (r *TinyNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
109136
status.LastUpdateTime = &t
110137
// upsert in scheduler
111138
// todo add app level context
112-
err = r.Scheduler.Update(context.Background(), node)
139+
err = r.Scheduler.Update(context.Background(), node, signalList)
113140
if err != nil {
114141
l.Error(err, "scheduler upsert error")
115142
status.Error = true
@@ -118,7 +145,8 @@ func (r *TinyNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
118145

119146
node.Status = *status
120147

121-
err = r.Status().Update(context.Background(), node)
148+
err = r.Status().Patch(context.Background(), node, client.MergeFrom(originNode))
149+
//err = r.Status().Update(context.Background(), node)
122150
if err != nil {
123151
l.Error(err, "status update error")
124152
return reconcile.Result{}, err
@@ -135,15 +163,43 @@ func (r *TinyNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
135163
return ctrl.NewControllerManagedBy(mgr).
136164
For(&operatorv1alpha1.TinyNode{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
137165
Watches(&operatorv1alpha1.TinySignal{}, handler.TypedFuncs[client.Object, reconcile.Request]{
138-
DeleteFunc: func(ctx context.Context, e event.TypedDeleteEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
166+
167+
CreateFunc: func(ctx context.Context, e event.TypedCreateEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
139168
signal, ok := e.Object.(*operatorv1alpha1.TinySignal)
140169
if !ok {
141170
return
142171
}
143-
if signal.Spec.Port != module.ReconcilePort {
144-
// do not reconcile if signal was used as a way to send data
172+
if signal.Spec.Port == module.ReconcilePort {
173+
// reconcile by signal only applies after deletion of the signal
174+
return
175+
}
176+
q.Add(reconcile.Request{
177+
NamespacedName: types.NamespacedName{
178+
Name: signal.Spec.Node,
179+
Namespace: signal.Namespace,
180+
},
181+
})
182+
},
183+
UpdateFunc: func(ctx context.Context, e event.TypedUpdateEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
184+
signal, ok := e.ObjectNew.(*operatorv1alpha1.TinySignal)
185+
if !ok {
186+
return
187+
}
188+
// we do not update reconcile signals, only data ports
189+
q.Add(reconcile.Request{
190+
NamespacedName: types.NamespacedName{
191+
Name: signal.Spec.Node,
192+
Namespace: signal.Namespace,
193+
},
194+
})
195+
},
196+
// when tinySignal deleted signal with reconcile port - reconcile
197+
DeleteFunc: func(ctx context.Context, e event.TypedDeleteEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
198+
signal, ok := e.Object.(*operatorv1alpha1.TinySignal)
199+
if !ok {
145200
return
146201
}
202+
// we reconcile if any relates signal deleted
147203
q.Add(reconcile.Request{
148204
NamespacedName: types.NamespacedName{
149205
Name: signal.Spec.Node,

internal/controller/tinysignal_controller.go

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@ package controller
1919
import (
2020
"context"
2121
"github.com/tiny-systems/module/internal/scheduler"
22-
"github.com/tiny-systems/module/internal/scheduler/runner"
2322
"github.com/tiny-systems/module/module"
24-
"github.com/tiny-systems/module/pkg/utils"
25-
"k8s.io/apimachinery/pkg/api/errors"
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2624
"sigs.k8s.io/controller-runtime/pkg/reconcile"
25+
"strings"
2726

2827
"k8s.io/apimachinery/pkg/runtime"
2928
ctrl "sigs.k8s.io/controller-runtime"
@@ -56,7 +55,6 @@ type TinySignalReconciler struct {
5655
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile
5756
func (r *TinySignalReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
5857
l := log.FromContext(ctx)
59-
l.Info("reconcile", "tinysignal", req.Name)
6058
// tiny signal names after name of node it's signaling to
6159

6260
// to avoid making many queries to Kubernetes API we check name itself against current module
@@ -70,35 +68,21 @@ func (r *TinySignalReconciler) Reconcile(ctx context.Context, req ctrl.Request)
7068
return reconcile.Result{}, nil
7169
}
7270

73-
signal := &operatorv1alpha1.TinySignal{}
74-
err = r.Get(context.Background(), req.NamespacedName, signal)
75-
76-
if err != nil {
77-
if errors.IsNotFound(err) {
78-
// delete signal if related node not found
79-
return reconcile.Result{}, nil
80-
}
81-
return reconcile.Result{}, err
82-
}
83-
84-
if signal.Spec.Port == module.ReconcilePort {
85-
// we do not use signals to reconcile directly
86-
_ = r.Delete(ctx, signal)
87-
return ctrl.Result{}, nil
71+
// @todo const
72+
if !strings.HasSuffix(req.Name, "-reconcile") {
73+
return reconcile.Result{}, nil
8874
}
8975

90-
// todo add app level context
91-
if err = r.Scheduler.HandleInternal(context.Background(), &runner.Msg{
92-
EdgeID: utils.GetPortFullName(signal.Spec.Node, signal.Spec.Port),
93-
To: utils.GetPortFullName(signal.Spec.Node, signal.Spec.Port),
94-
From: "signal",
95-
Data: signal.Spec.Data,
96-
}); err != nil {
97-
l.Error(err, "invoke error")
76+
signal := &operatorv1alpha1.TinySignal{
77+
ObjectMeta: metav1.ObjectMeta{
78+
Name: req.Name,
79+
Namespace: req.Namespace,
80+
},
9881
}
9982

10083
_ = r.Delete(ctx, signal)
10184
return ctrl.Result{}, nil
85+
10286
}
10387

10488
// SetupWithManager sets up the controller with the Manager.

internal/resource/manager.go

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -376,16 +376,43 @@ func (m Manager) DisclosePort(ctx context.Context, port int) error {
376376
}
377377

378378
func (m Manager) CreateClusterNodeSignal(ctx context.Context, node v1alpha1.TinyNode, port string, data []byte) error {
379-
signal := &v1alpha1.TinySignal{
380-
Spec: v1alpha1.TinySignalSpec{
381-
Node: node.Name,
382-
Port: port,
383-
Data: data,
384-
},
379+
380+
signal := &v1alpha1.TinySignal{}
381+
name := fmt.Sprintf("%s-%s", node.Name, strings.ReplaceAll(port, "_", ""))
382+
383+
err := m.client.Get(ctx, client.ObjectKey{Namespace: node.Namespace, Name: name}, signal)
384+
if err != nil && !errors.IsNotFound(err) {
385+
return err
386+
}
387+
388+
newSignal := signal.DeepCopy()
389+
390+
newSignal.Namespace = node.Namespace
391+
newSignal.Name = name
392+
393+
newSignal.Labels = map[string]string{
394+
v1alpha1.NodeNameLabel: node.Name,
395+
}
396+
397+
newSignal.Spec = v1alpha1.TinySignalSpec{
398+
Node: node.Name,
399+
Port: port,
400+
Data: data,
385401
}
386-
signal.Namespace = node.Namespace
387-
signal.GenerateName = fmt.Sprintf("%s-%s-", node.Name, strings.ReplaceAll(port, "_", ""))
388-
return m.client.Create(ctx, signal)
402+
403+
if errors.IsNotFound(err) {
404+
if err := m.client.Create(ctx, newSignal); err != nil {
405+
return err
406+
}
407+
return nil
408+
}
409+
410+
err = m.client.Patch(ctx, newSignal, client.MergeFrom(signal))
411+
if err != nil && !errors.IsNotFound(err) {
412+
return err
413+
}
414+
415+
return nil
389416
}
390417

391418
func (m Manager) Start(ctx context.Context) error {

internal/scheduler/runner/msg.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,6 @@ type Msg struct {
99
// recipient of this message in a format node:port
1010
To string `json:"to"`
1111
Data []byte `json:"data"`
12+
13+
Nonce string `json:"nonce"`
1214
}

0 commit comments

Comments
 (0)