Skip to content

Commit 8955c12

Browse files
committed
cli: select transport via TINY_NATS_TRANSPORT (core default, jetstream opt-in)
Defines transport.Transport interface — both NATS (core) and JetStream (durable) satisfy it. cli/run.go picks at boot based on TINY_NATS_TRANSPORT env, defaulting to 'core' for backward compatibility. Setting 'jetstream' ensures the module-edges stream on startup and binds the JS-backed sender/receiver. No change for existing deployments — env unset = today's core req/reply. Flip to durable wire on a per-cluster basis by setting TINY_NATS_TRANSPORT=jetstream in operator.release_values.
1 parent e8002fa commit 8955c12

2 files changed

Lines changed: 29 additions & 4 deletions

File tree

cli/run.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -352,12 +352,27 @@ var runCmd = &cobra.Command{
352352

353353
// NATS transport selector. When TINY_NATS_URL is set we route
354354
// cross-module messages through NATS subjects instead of the
355-
// gRPC AddressPool. Same runner.Handler contract — scheduler
356-
// doesn't notice. Unset = legacy gRPC path.
355+
// gRPC AddressPool. TINY_NATS_TRANSPORT picks between:
356+
// "core" — core req/reply, ephemeral, fast (default)
357+
// "jetstream" — durable WorkQueue stream, AckWait pod-death
358+
// recovery, single-shot on handler error
359+
// Unset TINY_NATS_URL = legacy gRPC path, no change.
357360
natsRt := connectNATS(ctx, l)
358-
var natsTransport *transport.NATS
361+
var natsTransport transport.Transport
359362
if natsRt != nil {
360-
natsTransport = transport.NewNATS(natsRt.NC, moduleInfo.GetNameSanitised(), l)
363+
moduleName := moduleInfo.GetNameSanitised()
364+
switch os.Getenv("TINY_NATS_TRANSPORT") {
365+
case "jetstream":
366+
if err := transport.EnsureEdgeStream(ctx, natsRt.JS); err != nil {
367+
l.Error(err, "ensure edge stream")
368+
os.Exit(1)
369+
}
370+
natsTransport = transport.NewJetStream(natsRt.JS, natsRt.NC, moduleName, l)
371+
l.Info("transport: jetstream-backed durable wire")
372+
default:
373+
natsTransport = transport.NewNATS(natsRt.NC, moduleName, l)
374+
l.Info("transport: core req/reply")
375+
}
361376
}
362377

363378
//

internal/transport/nats.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,16 @@ func SubjectFor(moduleName string) string {
5858
return fmt.Sprintf("%s.%s.msg", subjectPrefix, moduleName)
5959
}
6060

61+
// Transport is the cross-module wire contract that both the core
62+
// req/reply NATS transport (this file) and the JetStream-backed
63+
// transport (jetstream.go) satisfy. cli/run.go picks one at boot
64+
// based on TINY_NATS_TRANSPORT and binds it to the scheduler — the
65+
// rest of the SDK doesn't care which substrate carries the bytes.
66+
type Transport interface {
67+
Handler(ctx context.Context, msg *runner.Msg) ([]byte, error)
68+
StartReceiver(ctx context.Context, handler runner.Handler) error
69+
}
70+
6171
// NATS holds the cross-module wire backed by NATS core request/reply.
6272
// Implements both sender (Handler) and receiver (StartReceiver) so a
6373
// single instance replaces both AddressPool and the gRPC server.

0 commit comments

Comments
 (0)