Skip to content

Commit 850416d

Browse files
tpoxaclaude
andcommitted
fix: durable signal delivery preserves source port for edge config eval
v0.4.0 had a real bug exposed by in-cluster validation: durable-port persist captures msg.Data BEFORE MsgHandler runs edge config evaluation. The signal stored the source's raw output (e.g. an http response), but the target port (e.g. kv:store) expected an already-mapped target shape (e.g. StoreRequest with primary key populated by edge expressions). Reconciler delivery would then unmarshal raw source bytes as target type and the component would reject with structural errors ("primary key not found in document"), the reconciler would treat as transient, retry forever, and signals would accumulate. Fix: - TinySignalReconciler now sends msg.From = signal.Spec.From (the original source port) when the spec carries it, instead of always FromSignal. This makes MsgHandler treat the message as a regular edge delivery and run edge config evaluation, mapping source's raw payload into the target port's expected shape. - To prevent infinite re-persist (msg.From != FromSignal would normally retrigger the durable-port persist branch), reconciler marks the delivery context with utils.WithDurableDelivery, and scheduler.Handle checks this flag in addition to From. Externally-injected signals (send_signal MCP, ticker fire) keep their existing behavior: Spec.From is empty, msg.From defaults to FromSignal, MsgHandler treats data as already-typed. New test in scheduler_durable_test.go covers the durable-delivery-context-skips-persist case so future regressions can't reintroduce the loop. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a131c38 commit 850416d

4 files changed

Lines changed: 84 additions & 8 deletions

File tree

internal/controller/tinysignal_controller.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ func (r *TinySignalReconciler) Reconcile(ctx context.Context, req ctrl.Request)
128128
deliveryCtx, cancel := context.WithTimeout(context.Background(), deliveryTimeout)
129129
defer cancel()
130130
deliveryCtx = utils.WithLeader(deliveryCtx, true)
131+
// Mark this delivery so scheduler.Handle skips the durable-port persist
132+
// step (we ARE the reconciler delivering a persisted signal — re-persisting
133+
// would be an infinite loop).
134+
deliveryCtx = utils.WithDurableDelivery(deliveryCtx)
131135

132136
// Inject a remote span context if the signal carries a TraceID so
133137
// the resulting trace stitches under the caller-supplied root.
@@ -146,9 +150,19 @@ func (r *TinySignalReconciler) Reconcile(ctx context.Context, req ctrl.Request)
146150
}
147151
}
148152

149-
l.Info("delivering signal", "targetPort", targetPort)
153+
// For durable-port-persisted signals, signal.Spec.From carries the
154+
// original source port so MsgHandler runs edge config evaluation
155+
// (target's "from this source" port-config maps source output → target
156+
// input shape). For externally-injected signals (send_signal MCP),
157+
// Spec.From is empty and we use FromSignal — MsgHandler treats the
158+
// payload as already-typed in that case (existing legacy behavior).
159+
deliveryFrom := runner.FromSignal
160+
if signal.Spec.From != "" {
161+
deliveryFrom = signal.Spec.From
162+
}
163+
l.Info("delivering signal", "targetPort", targetPort, "from", deliveryFrom)
150164
_, deliverErr := r.Scheduler.Handle(deliveryCtx, &runner.Msg{
151-
From: runner.FromSignal,
165+
From: deliveryFrom,
152166
To: targetPort,
153167
Data: signal.Spec.Data,
154168
EdgeID: signal.Spec.EdgeID,

internal/scheduler/scheduler.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -218,10 +218,18 @@ func (s *Schedule) Handle(ctx context.Context, msg *runner.Msg) (any, error) {
218218
}
219219

220220
// Durable-port intake. If the target port is marked Durable AND this
221-
// message did not arrive from the TinySignal reconciler (which would
222-
// already be a delivery of a persisted signal), persist a TinySignal
223-
// CRD and ack immediately. The reconciler will deliver it later.
224-
if msg.From != runner.FromSignal {
221+
// is NOT a reconciler-driven delivery of an already-persisted signal,
222+
// persist a TinySignal CRD with the source's raw payload and ack
223+
// immediately. The reconciler later replays the signal back through
224+
// scheduler.Handle with utils.WithDurableDelivery(ctx), which skips
225+
// this branch and lets MsgHandler evaluate the edge config (mapping
226+
// source's output shape into target's expected port shape).
227+
//
228+
// Externally-injected signals (send_signal MCP, ticker fire) arrive
229+
// with msg.From == runner.FromSignal; we let those through without
230+
// persisting (they're already targeting a port and the existing
231+
// signal-path treats data as already-typed).
232+
if !utils.IsDurableDelivery(ctx) && msg.From != runner.FromSignal {
225233
if p, ok := instance.GetPort(port); ok && p.Durable && s.manager != nil {
226234
name := signalname.For(nodeName, port, msg.Data, msg.EdgeID, "")
227235
if err := s.manager.PersistDurableSignal(ctx, name, nodeName, instance.Node().Namespace, port, msg.Data, msg.EdgeID, msg.From); err != nil {

internal/scheduler/scheduler_durable_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/tiny-systems/module/internal/scheduler/runner"
2020
"github.com/tiny-systems/module/module"
2121
"github.com/tiny-systems/module/pkg/state"
22+
"github.com/tiny-systems/module/pkg/utils"
2223
metricnoop "go.opentelemetry.io/otel/metric/noop"
2324
tracenoop "go.opentelemetry.io/otel/trace/noop"
2425
"k8s.io/apimachinery/pkg/runtime"
@@ -281,6 +282,41 @@ func TestDurablePort_SignalDeliveryDoesNotRePersist(t *testing.T) {
281282
}
282283
}
283284

285+
// TestDurablePort_DeliveryContextSkipsPersist verifies the "don't loop forever"
286+
// guard: when the reconciler replays a persisted signal back through
287+
// scheduler.Handle with utils.WithDurableDelivery(ctx), the durable-port
288+
// branch must skip and dispatch normally — otherwise we'd re-persist the
289+
// same signal infinitely.
290+
func TestDurablePort_DeliveryContextSkipsPersist(t *testing.T) {
291+
mgr := &recordingManager{}
292+
s := newDurableTestSchedule(t, mgr)
293+
294+
cmp := &durableSinkComponent{name: "dsink"}
295+
if err := s.Install(cmp); err != nil {
296+
t.Fatalf("Install: %v", err)
297+
}
298+
installNode(t, s, "dsink-1", "dsink")
299+
300+
// Use a non-FromSignal From (mimics real durable-port persist) but
301+
// mark the context as durable delivery (mimics reconciler replay).
302+
ctx := utils.WithDurableDelivery(context.Background())
303+
_, err := s.Handle(ctx, &runner.Msg{
304+
From: "source-node:source-port",
305+
To: "dsink-1:in",
306+
Data: []byte(`{}`),
307+
})
308+
if err != nil {
309+
t.Fatalf("Handle: %v", err)
310+
}
311+
312+
if calls := mgr.recordedCalls(); len(calls) != 0 {
313+
t.Errorf("durable-delivery context should suppress persist; got %d calls", len(calls))
314+
}
315+
if cmp.hits() != 1 {
316+
t.Errorf("expected Component.Handle to be invoked once; got %d", cmp.hits())
317+
}
318+
}
319+
284320
// TestDurablePort_NonDurablePortStillFastPaths verifies that ports without
285321
// Durable=true preserve the existing blocking-I/O fast path: no persist
286322
// call, Handle invoked synchronously.

pkg/utils/ctx.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,28 @@ import "context"
55
type contextKey string
66

77
const (
8-
leaderKey contextKey = "leader"
9-
sourceNodeKey contextKey = "sourceNode"
8+
leaderKey contextKey = "leader"
9+
sourceNodeKey contextKey = "sourceNode"
10+
durableDeliveryKey contextKey = "durableDelivery"
1011
)
1112

13+
// WithDurableDelivery marks the context as a TinySignalReconciler-driven
14+
// delivery of a previously-persisted durable-port signal. Scheduler.Handle
15+
// uses this to skip the durable-port persist step and instead route the
16+
// message through normal MsgHandler dispatch (so edge config evaluation
17+
// runs at delivery time, mapping the source's raw output into the target
18+
// port's expected shape).
19+
func WithDurableDelivery(ctx context.Context) context.Context {
20+
return context.WithValue(ctx, durableDeliveryKey, true)
21+
}
22+
23+
// IsDurableDelivery reports whether the current call is the reconciler
24+
// delivering an already-persisted durable signal.
25+
func IsDurableDelivery(ctx context.Context) bool {
26+
v, _ := ctx.Value(durableDeliveryKey).(bool)
27+
return v
28+
}
29+
1230
func WithLeader(ctx context.Context, isLeader bool) context.Context {
1331
return context.WithValue(ctx, leaderKey, isLeader)
1432
}

0 commit comments

Comments
 (0)