Skip to content

Commit 5518821

Browse files
committed
wire: make WaitForReply actually work; export FromSignal
WaitForReply set the x-reply-inbox header but never msg.Reply — and the receive side (transport.handleIncoming) responds via m.RespondMsg, which publishes to the native Reply field and never reads the header. Every WaitForReply call timed out even on successful delivery. Set msg.Reply to the inbox; the header stays for observability. Also export wire.FromSignal and alias runner.FromSignal to it, so external publishers (the platform's publish-signal path) stop hardcoding the "signal" literal that the runner's raw-payload check depends on. Publisher-side only — deployed modules already reply natively, no module re-tags needed.
1 parent 3e46b5d commit 5518821

2 files changed

Lines changed: 24 additions & 4 deletions

File tree

internal/scheduler/runner/runner.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/tiny-systems/module/pkg/resource"
2828
"github.com/tiny-systems/module/pkg/schema"
2929
"github.com/tiny-systems/module/pkg/utils"
30+
"github.com/tiny-systems/module/pkg/wire"
3031
"go.opentelemetry.io/otel/attribute"
3132
"go.opentelemetry.io/otel/codes"
3233
"go.opentelemetry.io/otel/metric"
@@ -72,7 +73,7 @@ type Runner struct {
7273
reconcileDebouncer *ReconcileDebouncer
7374

7475
// pendingNodeUpdaters accumulates metadata update functions during debounce window
75-
pendingNodeUpdaters []func(*v1alpha1.TinyNode) error
76+
pendingNodeUpdaters []func(*v1alpha1.TinyNode) error
7677
pendingNodeUpdatersLock *sync.Mutex
7778

7879
// activeEdges tracks busy edge timestamps for the single gauge callback
@@ -113,7 +114,9 @@ type snapshotRefresher interface {
113114
}
114115

115116
const (
116-
FromSignal = "signal"
117+
// FromSignal aliases wire.FromSignal so external publishers and
118+
// the runner's signal check can never drift apart.
119+
FromSignal = wire.FromSignal
117120
)
118121

119122
func NewRunner(component m.Component) *Runner {
@@ -1139,8 +1142,8 @@ func (c *Runner) clearGauge(edgeID string) {
11391142

11401143
// retryState holds the live retry counter and latest transient error for one edge.
11411144
type retryState struct {
1142-
count int64
1143-
lastErr string
1145+
count int64
1146+
lastErr string
11441147
}
11451148

11461149
func (c *Runner) ensureRetryGauge() {

pkg/wire/publish.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,16 @@ const (
4141

4242
const subjectPrefix = "tinymodule"
4343

44+
// FromSignal is the Options.From value that marks a message as an
45+
// external signal (dashboard Send, MCP send_signal, debug). The
46+
// runner unmarshals the raw payload into the port struct ONLY for
47+
// signal-originated messages — any other From (including empty)
48+
// makes it fall back to edge-config evaluation or the port's default
49+
// configuration, silently discarding the payload. Mirrors
50+
// runner.FromSignal, which lives in internal/ and can't be imported
51+
// by external publishers.
52+
const FromSignal = "signal"
53+
4454
// SubjectFor returns the business-hop NATS subject for a given
4555
// module. Senders publish here for edge-driven hops; the module's
4656
// queue-group consumer load-balances among pods.
@@ -134,6 +144,13 @@ func Publish(ctx context.Context, nc *nats.Conn, targetNode, port string, data [
134144
}
135145
defer func() { _ = sub.Unsubscribe() }()
136146
msg.Header.Set(HeaderReplyInbox, inbox)
147+
// The receive side (transport.handleIncoming) replies with
148+
// m.RespondMsg, which publishes to the NATS-native Reply field —
149+
// it never reads the x-reply-inbox header. Without msg.Reply set
150+
// here every WaitForReply call timed out even on successful
151+
// delivery. Keep the header for observability, but the native
152+
// Reply field is what makes the round-trip work.
153+
msg.Reply = inbox
137154

138155
if err := nc.PublishMsg(msg); err != nil {
139156
return nil, fmt.Errorf("publish: %w", err)

0 commit comments

Comments
 (0)