diff --git a/go.mod b/go.mod index 49ca598..d31edb8 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,15 @@ module github.com/GoCodeAlone/workflow-plugin-eventbus go 1.26.0 -require google.golang.org/protobuf v1.36.11 +require ( + github.com/nats-io/nats.go v1.51.0 + google.golang.org/protobuf v1.36.11 +) + +require ( + github.com/klauspost/compress v1.18.5 // indirect + github.com/nats-io/nkeys v0.4.15 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.49.0 // indirect + golang.org/x/sys v0.42.0 // indirect +) diff --git a/go.sum b/go.sum index 296be18..a10f4ef 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,16 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= +github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= +github.com/nats-io/nats.go v1.51.0 h1:ByW84XTz6W03GSSsygsZcA+xgKK8vPGaa/FCAAEHnAI= +github.com/nats-io/nats.go v1.51.0/go.mod h1:26HypzazeOkyO3/mqd1zZd53STJN0EjCYF9Uy2ZOBno= +github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4= +github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4= +golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= 
diff --git a/providers/conformance_test.go b/providers/conformance_test.go new file mode 100644 index 0000000..9bf1172 --- /dev/null +++ b/providers/conformance_test.go @@ -0,0 +1,427 @@ +// Package providers_test contains conformance tests that exercise the complete +// Provider interface lifecycle from provision through teardown. +// +// NATS × DigitalOcean App Platform tests are gated behind INTEGRATION_NATS_DO=1 +// and skipped cleanly when the variable is absent — no panic, no setup overhead. +// The runtime phases (publish → consume → ack → drain → teardown) require a +// reachable NATS server; set NATS_URL to override the default localhost:4222. +package providers_test + +import ( + "os" + "strings" + "testing" + "time" + + natsclient "github.com/nats-io/nats.go" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" + natsprovider "github.com/GoCodeAlone/workflow-plugin-eventbus/providers/nats" + "google.golang.org/protobuf/types/known/durationpb" +) + +// integrationKey gates NATS × DigitalOcean App Platform conformance tests. +const integrationKey = "INTEGRATION_NATS_DO" + +// skipUnlessNATSDO skips the calling test when INTEGRATION_NATS_DO is not set. +func skipUnlessNATSDO(t *testing.T) { + t.Helper() + if os.Getenv(integrationKey) == "" { + t.Skipf("set %s=1 to run NATS × DigitalOcean App Platform conformance tests", integrationKey) + } +} + +// natsServerURI returns the NATS server URI for integration tests. +// NATS_URL env var overrides the default; falls back to localhost:4222. +func natsServerURI() string { + if u := os.Getenv("NATS_URL"); u != "" { + return u + } + return natsclient.DefaultURL // "nats://127.0.0.1:4222" +} + +// provisionedState simulates IaC state after a NATS cluster has been provisioned +// on DigitalOcean App Platform (the "uri" output is populated by the realizer). 
+var provisionedState = iac.State{ + Outputs: map[string]iac.Output{ + "uri": {Value: "nats://nats.internal:4222", Sensitive: true}, + }, +} + +// bmwClusterCfg returns the ClusterConfig for the BMW pilot NATS cluster. +func bmwClusterCfg() *eventbusv1.ClusterConfig { + return &eventbusv1.ClusterConfig{ + Version: "2.10", + Replicas: 3, + Jetstream: &eventbusv1.JetStreamConfig{ + Enabled: true, + MaxStorageBytes: 10 * 1024 * 1024 * 1024, // 10 GiB + }, + } +} + +// bmwStreams returns representative BMW pilot JetStream stream configs. +func bmwStreams() []*eventbusv1.StreamConfig { + return []*eventbusv1.StreamConfig{ + { + Name: "BMW_FULFILLMENT", + Subjects: []string{"fulfillment.>"}, + RetentionPolicy: eventbusv1.RetentionPolicy_RETENTION_POLICY_LIMITS, + NumReplicas: 3, + MaxBytes: 1 << 30, // 1 GiB + }, + { + Name: "BMW_ORDERS", + Subjects: []string{"orders.created", "orders.updated"}, + RetentionPolicy: eventbusv1.RetentionPolicy_RETENTION_POLICY_WORKQUEUE, + NumReplicas: 3, + MaxAge: durationpb.New(7 * 24 * 60 * 60 * 1e9), // 7 days in nanoseconds + }, + } +} + +// TestNATSConformance_DOApp exercises the complete NATS × DigitalOcean App +// Platform Provider lifecycle across nine phases: +// +// 1. provision — Resources() emits valid IaC cluster declarations +// 2. stream — StreamResources() emits valid nats.stream_create resources +// 3. connect — ConnectionString() derives the correct broker URI from state +// 4. probe — Probe() returns a HealthCheck without panicking +// 5. publish — connect to NATS server, create JetStream stream, publish message +// 6. consume — subscribe and fetch the published message +// 7. ack — acknowledge the fetched message +// 8. drain — drain the subscription +// 9. teardown — delete the stream, assert it is gone +// +// All subtests require INTEGRATION_NATS_DO=1. +// Runtime phases (5–9) additionally require a reachable NATS server (NATS_URL +// or localhost:4222 by default). 
+func TestNATSConformance_DOApp(t *testing.T) { + skipUnlessNATSDO(t) + + // p is declared as the interface — conformance tests must not rely on + // internals of the concrete *provider type. + var p providers.Provider = natsprovider.New() + + // ── 1. provision ───────────────────────────────────────────────────────── + t.Run("provision", func(t *testing.T) { + resources, err := p.Resources(bmwClusterCfg(), providers.TargetDigitalOceanApp) + if err != nil { + t.Fatalf("Resources() error: %v", err) + } + + // Locate resources by Kind rather than index so the test is resilient to + // ordering changes in resourcesForDOApp. + var cs, st *iac.Resource + for i := range resources { + switch resources[i].Kind { + case "infra.container_service": + cs = &resources[i] + case "infra.storage": + st = &resources[i] + } + } + if cs == nil { + t.Fatal("provision: no infra.container_service resource emitted") + } + if st == nil { + t.Fatal("provision: no infra.storage resource emitted (JetStream requires it)") + } + + // ── container service ──────────────────────────────────────────────── + if !strings.HasPrefix(cs.Properties["image"], "docker.io/library/nats") { + t.Errorf("image = %q, want prefix %q", cs.Properties["image"], "docker.io/library/nats") + } + if !strings.Contains(cs.Properties["internal_ports"], "4222") { + t.Errorf("internal_ports = %q, want client port 4222", cs.Properties["internal_ports"]) + } + if cs.Properties["storage_ref"] == "" { + t.Error("storage_ref is empty; JetStream requires a volume reference on the container resource") + } + if cs.Labels["deploy_target"] != string(providers.TargetDigitalOceanApp) { + t.Errorf("deploy_target label = %q, want %q", + cs.Labels["deploy_target"], string(providers.TargetDigitalOceanApp)) + } + + // ── jetstream storage volume ───────────────────────────────────────── + if st.Properties["storage_size_bytes"] == "" { + t.Error("infra.storage: storage_size_bytes property is empty") + } + }) + + // ── 2. 
stream ───────────────────────────────────────────────────────────── + t.Run("stream", func(t *testing.T) { + resources, err := p.StreamResources(bmwStreams(), provisionedState) + if err != nil { + t.Fatalf("StreamResources() error: %v", err) + } + if len(resources) != 2 { + t.Fatalf("StreamResources() returned %d resources, want 2", len(resources)) + } + + for _, r := range resources { + if r.Kind != "nats.stream_create" { + t.Errorf("stream %q Kind = %q, want %q", r.Name, r.Kind, "nats.stream_create") + } + if r.Properties["server_uri"] != "nats://nats.internal:4222" { + t.Errorf("stream %q server_uri = %q, want %q", + r.Name, r.Properties["server_uri"], "nats://nats.internal:4222") + } + rp := r.Properties["retention_policy"] + switch rp { + case "limits", "workqueue", "interest": + // valid NATS-native values + default: + t.Errorf("stream %q retention_policy = %q, want NATS-native value (limits/workqueue/interest)", r.Name, rp) + } + nr := r.Properties["num_replicas"] + if nr == "" || nr == "0" { + t.Errorf("stream %q num_replicas = %q, want ≥1", r.Name, nr) + } + } + + // BMW_ORDERS has max_age=7d — verify it is emitted. + var ordersResource *iac.Resource + for i := range resources { + if resources[i].Name == "BMW_ORDERS" { + ordersResource = &resources[i] + break + } + } + if ordersResource == nil { + t.Fatal("BMW_ORDERS resource not found in StreamResources output") + } + if ordersResource.Properties["max_age"] == "" { + t.Error("BMW_ORDERS: max_age property is empty when MaxAge=7d is configured") + } + }) + + // ── 3. 
connect ──────────────────────────────────────────────────────────── + t.Run("connect", func(t *testing.T) { + uri, err := p.ConnectionString(provisionedState, "") + if err != nil { + t.Fatalf("ConnectionString() error: %v", err) + } + if !strings.HasPrefix(uri, "nats://") { + t.Errorf("ConnectionString() = %q, want nats:// scheme", uri) + } + if !strings.Contains(uri, ":4222") { + t.Errorf("ConnectionString() = %q, want NATS client port 4222", uri) + } + }) + + // ── 3a. connect / env-override ──────────────────────────────────────────── + t.Run("connect/env-override", func(t *testing.T) { + envState := iac.State{ + Outputs: map[string]iac.Output{ + "uri": {Value: "nats://nats.internal:4222", Sensitive: true}, + "uri.prod": {Value: "nats://nats-prod.internal:4222", Sensitive: true}, + }, + } + uri, err := p.ConnectionString(envState, "prod") + if err != nil { + t.Fatalf("ConnectionString(env=prod) error: %v", err) + } + if uri != "nats://nats-prod.internal:4222" { + t.Errorf("ConnectionString(env=prod) = %q, want env-specific URI %q", + uri, "nats://nats-prod.internal:4222") + } + }) + + // ── 3b. connect / missing state ─────────────────────────────────────────── + t.Run("connect/missing-state", func(t *testing.T) { + _, err := p.ConnectionString(iac.State{}, "") + if err == nil { + t.Error("ConnectionString() with empty state: expected error, got nil") + } + }) + + // ── 4. probe ────────────────────────────────────────────────────────────── + t.Run("probe", func(t *testing.T) { + const uri = "nats://nats.internal:4222" + hc := p.Probe(uri) + + // URI must be echoed back regardless of reachability. + if hc.URI != uri { + t.Errorf("Probe().URI = %q, want %q", hc.URI, uri) + } + // Status must be a recognised value. 
+ switch hc.Status { + case providers.HealthStatusHealthy, providers.HealthStatusDegraded, providers.HealthStatusUnreachable: + // all valid + default: + t.Errorf("Probe().Status = %q, want one of healthy/degraded/unreachable", hc.Status) + } + }) + + // ── 4a. probe / empty URI ───────────────────────────────────────────────── + t.Run("probe/empty-uri", func(t *testing.T) { + hc := p.Probe("") + if hc.Status != providers.HealthStatusUnreachable { + t.Errorf("Probe(\"\").Status = %q, want %q", hc.Status, providers.HealthStatusUnreachable) + } + if hc.Err == nil { + t.Error("Probe(\"\").Err should be non-nil for empty URI") + } + }) + + // ── Runtime phases 5–9: publish → consume → ack → drain → teardown ─────── + // + // These phases connect to a live NATS server (NATS_URL or localhost:4222) + // and verify end-to-end message flow through a JetStream stream. + // Shared mutable state is captured in the parent scope; each subtest nil-guards + // its dependencies so a partial failure produces a SKIP rather than a panic. + // nc is closed via the parent test's Cleanup so it outlives all subtests. + + const ( + conformanceStream = "CONFORMANCE_TEST" + conformanceSubject = "conformance.test" + conformanceConsumer = "conformance-consumer" + conformancePayload = "conformance-check" + ) + + var ( + nc *natsclient.Conn + js natsclient.JetStreamContext + sub *natsclient.Subscription + fetchedMsg *natsclient.Msg + ) + + // Register nc close on the parent test — nc must outlive all subtests. + t.Cleanup(func() { + if nc != nil { + nc.Close() + } + }) + + // ── 5. 
publish ──────────────────────────────────────────────────────────── + t.Run("publish", func(t *testing.T) { + var err error + nc, err = natsclient.Connect(natsServerURI()) + if err != nil { + t.Fatalf("nats.Connect(%q) error: %v — ensure a NATS server is running (NATS_URL overrides default)", natsServerURI(), err) + } + + js, err = nc.JetStream() + if err != nil { + t.Fatalf("nc.JetStream() error: %v", err) + } + + // Clean up any leftover stream from a previous interrupted run. + _ = js.DeleteStream(conformanceStream) + + _, err = js.AddStream(&natsclient.StreamConfig{ + Name: conformanceStream, + Subjects: []string{conformanceSubject}, + Retention: natsclient.LimitsPolicy, + Replicas: 1, + }) + if err != nil { + t.Fatalf("js.AddStream(%q) error: %v", conformanceStream, err) + } + + pubAck, err := js.Publish(conformanceSubject, []byte(conformancePayload)) + if err != nil { + t.Fatalf("js.Publish(%q) error: %v", conformanceSubject, err) + } + if pubAck.Stream != conformanceStream { + t.Errorf("PubAck.Stream = %q, want %q", pubAck.Stream, conformanceStream) + } + if pubAck.Sequence != 1 { + t.Errorf("PubAck.Sequence = %d, want 1 (first message in stream)", pubAck.Sequence) + } + }) + + // ── 6. consume ──────────────────────────────────────────────────────────── + t.Run("consume", func(t *testing.T) { + if js == nil { + t.Skip("skipping: publish phase did not establish a JetStream context") + } + var err error + sub, err = js.SubscribeSync(conformanceSubject, natsclient.Durable(conformanceConsumer)) + if err != nil { + t.Fatalf("js.SubscribeSync(%q) error: %v", conformanceSubject, err) + } + fetchedMsg, err = sub.NextMsg(2 * time.Second) + if err != nil { + t.Fatalf("sub.NextMsg() error: %v", err) + } + if string(fetchedMsg.Data) != conformancePayload { + t.Errorf("message payload = %q, want %q", string(fetchedMsg.Data), conformancePayload) + } + }) + + // ── 7. 
ack ──────────────────────────────────────────────────────────────── + t.Run("ack", func(t *testing.T) { + if fetchedMsg == nil { + t.Skip("skipping: consume phase did not produce a message") + } + if err := fetchedMsg.Ack(); err != nil { + t.Fatalf("msg.Ack() error: %v", err) + } + }) + + // ── 8. drain ────────────────────────────────────────────────────────────── + t.Run("drain", func(t *testing.T) { + if sub == nil { + t.Skip("skipping: consume phase did not produce a subscription") + } + if err := sub.Drain(); err != nil { + t.Fatalf("sub.Drain() error: %v", err) + } + }) + + // ── 9. teardown ─────────────────────────────────────────────────────────── + t.Run("teardown", func(t *testing.T) { + if js == nil { + t.Skip("skipping: publish phase did not establish a JetStream context") + } + if err := js.DeleteStream(conformanceStream); err != nil { + t.Fatalf("js.DeleteStream(%q) error: %v", conformanceStream, err) + } + // Confirm the stream is gone — StreamInfo should return an error. + _, err := js.StreamInfo(conformanceStream) + if err == nil { + t.Errorf("js.StreamInfo(%q) succeeded after deletion; stream was not torn down", conformanceStream) + } + }) +} + +// TestNATSConformance_StubTargets asserts that every not-yet-activated deploy +// target (ECS, EKS, Kubernetes, SelfHosted) returns a non-nil error from +// Resources() with a message that mentions "not implemented". No infrastructure +// is required — this test always runs. 
+func TestNATSConformance_StubTargets(t *testing.T) { + var p providers.Provider = natsprovider.New() + cfg := &eventbusv1.ClusterConfig{Version: "2.10", Replicas: 1} + + cases := []struct { + target providers.DeployTarget + mention string // substring expected in the error message + }{ + {providers.TargetAWSECS, "aws.ecs"}, + {providers.TargetAWSEKS, "aws.eks"}, + {providers.TargetKubernetes, "kubernetes"}, + {providers.TargetSelfHosted, "self_hosted"}, + } + + for _, tc := range cases { + t.Run(string(tc.target), func(t *testing.T) { + res, err := p.Resources(cfg, tc.target) + if err == nil { + t.Fatalf("Resources(cfg, %q) returned nil error, want stub error", tc.target) + } + if res != nil { + t.Errorf("Resources(cfg, %q) returned non-nil resources with error: %v", tc.target, res) + } + if !strings.Contains(err.Error(), "not implemented") { + t.Errorf("error for %q does not contain 'not implemented': %v", tc.target, err) + } + if !strings.Contains(err.Error(), tc.mention) { + t.Errorf("error for %q does not mention %q: %v", tc.target, tc.mention, err) + } + }) + } +} diff --git a/providers/nats/deploy_aws_ecs.go b/providers/nats/deploy_aws_ecs.go new file mode 100644 index 0000000..e53c9a0 --- /dev/null +++ b/providers/nats/deploy_aws_ecs.go @@ -0,0 +1,28 @@ +package nats + +import ( + "fmt" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" +) + +// resourcesForAWSECS is the deploy-target stub for aws.ecs. +// +// Per BMW pilot manifest: NATS on AWS ECS is built into the plugin but not +// activated for the pilot — NATS × digitalocean.app_platform is the only +// active path. This stub ensures that config referencing +// deploy_target: aws.ecs fails fast with a clear, actionable error rather +// than falling through to a silent no-op or a confusing generic message. 
+// +// Full implementation (infra.ecs.task_definition + infra.ecs.service + +// infra.efs.file_system for JetStream persistence) lands when a downstream +// consumer activates this target. +func resourcesForAWSECS(_ *eventbusv1.ClusterConfig) ([]iac.Resource, error) { + return nil, fmt.Errorf( + "nats: deploy target %q is not implemented for the pilot; "+ + "only %q is active — activate aws.ecs by implementing deploy_aws_ecs.go", + providers.TargetAWSECS, providers.TargetDigitalOceanApp, + ) +} diff --git a/providers/nats/deploy_aws_eks.go b/providers/nats/deploy_aws_eks.go new file mode 100644 index 0000000..a0d7016 --- /dev/null +++ b/providers/nats/deploy_aws_eks.go @@ -0,0 +1,26 @@ +package nats + +import ( + "fmt" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" +) + +// resourcesForAWSEKS is the deploy-target stub for aws.eks. +// +// Per BMW pilot manifest: NATS on AWS EKS is built into the plugin but not +// activated for the pilot. This stub ensures config referencing +// deploy_target: aws.eks fails fast with a clear, actionable error. +// +// Full implementation (infra.k8s.statefulset + infra.k8s.service + +// infra.k8s.persistent_volume_claim via EBS for JetStream persistence) +// lands when a downstream consumer activates this target. 
+func resourcesForAWSEKS(_ *eventbusv1.ClusterConfig) ([]iac.Resource, error) { + return nil, fmt.Errorf( + "nats: deploy target %q is not implemented for the pilot; "+ + "only %q is active — activate aws.eks by implementing deploy_aws_eks.go", + providers.TargetAWSEKS, providers.TargetDigitalOceanApp, + ) +} diff --git a/providers/nats/deploy_digitalocean_app.go b/providers/nats/deploy_digitalocean_app.go new file mode 100644 index 0000000..6982a96 --- /dev/null +++ b/providers/nats/deploy_digitalocean_app.go @@ -0,0 +1,173 @@ +package nats + +import ( + "fmt" + "strings" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" +) + +// natsClientPort is the standard NATS client connection port. +const natsClientPort = "4222" + +// natsMonitorPort is the NATS HTTP monitoring endpoint port. +const natsMonitorPort = "8222" + +// natsClusterPort is the NATS cluster routing port (inter-node, multi-replica). +const natsClusterPort = "6222" + +// natsImage is the canonical Docker Hub NATS server image prefix. +const natsImage = "docker.io/library/nats" + +// jetStreamStorageDir is the in-container path used for JetStream file storage. +const jetStreamStorageDir = "/data" + +// natsJetStreamStorageName is the canonical name for the JetStream infra.storage +// resource emitted alongside infra.container_service when JetStream is enabled. +// The container service references this name via the storage_ref property so the +// workflow engine can inject Spaces credentials as env vars at provisioning time. +const natsJetStreamStorageName = "nats-jetstream" + +// resourcesForDOApp emits the IaC resource declarations required to run a NATS +// server (optionally with JetStream) on DigitalOcean App Platform. +// +// Emitted resources: +// - infra.container_service — the NATS server process (always). 
+// - infra.storage — a DigitalOcean Spaces bucket for JetStream +// persistence (emitted only when JetStream is enabled). The container_service +// references the bucket by name via its storage_ref property; the workflow +// engine uses this edge to inject Spaces credentials as env vars so the +// NATS server can access the bucket at runtime. +// +// The infra.container_service Properties are consumed by workflow-plugin-digitalocean +// (infra.container_service resource driver). String-encoded values follow the +// canonical key schema expected by the driver's buildAppSpec helper: +// +// image – Docker Hub image reference including tag. +// instance_count – number of replicas (string-encoded int32). +// run_command – NATS server flags (JetStream, storage dir, monitoring, cluster). +// internal_ports – comma-separated list of exposed container ports. +// storage_ref – name of the infra.storage resource to link (JetStream only). +// +// The infra.storage Properties are consumed by workflow-plugin-digitalocean +// (SpacesDriver). Relevant keys: +// +// storage_size_bytes – optional maximum storage hint (from JetStreamConfig). +// +// Returns an error if cfg.Version is "latest" (unpinned versions are rejected +// to ensure reproducible deployments). +func resourcesForDOApp(cfg *eventbusv1.ClusterConfig) ([]iac.Resource, error) { + version := cfg.GetVersion() + if version == "" { + version = defaultVersion + } + if strings.EqualFold(version, "latest") { + return nil, fmt.Errorf("nats: Version %q is not allowed; specify a pinned version tag (e.g. 
%q)", version, defaultVersion) + } + + replicas := cfg.GetReplicas() + if replicas <= 0 { + replicas = 1 + } + + image := fmt.Sprintf("%s:%s", natsImage, version) + runCmd := buildRunCommand(cfg) + ports := buildInternalPorts() + + svc := iac.Resource{ + Kind: "infra.container_service", + Name: "nats", + Properties: map[string]string{ + "image": image, + "instance_count": fmt.Sprintf("%d", replicas), + "run_command": runCmd, + "internal_ports": ports, + }, + Labels: map[string]string{ + "provider": "nats", + "deploy_target": string(providers.TargetDigitalOceanApp), + }, + } + + resources := []iac.Resource{svc} + + // Emit a DigitalOcean Spaces bucket as the JetStream backing store when + // JetStream is enabled. The bucket is realised by workflow-plugin-digitalocean's + // SpacesDriver (infra.storage resource kind). The container_service carries a + // storage_ref property pointing at the bucket name so the workflow engine can + // wire Spaces credentials into the container's env at provisioning time. + js := cfg.GetJetstream() + if js != nil && js.GetEnabled() { + // Link the container service to the storage resource explicitly. + resources[0].Properties["storage_ref"] = natsJetStreamStorageName + + vol := buildJetStreamVolume(js) + resources = append(resources, vol) + } + + return resources, nil +} + +// buildJetStreamVolume constructs the infra.storage (DO Spaces) resource that +// backs JetStream persistence for the DO App Platform deploy target. +func buildJetStreamVolume(js *eventbusv1.JetStreamConfig) iac.Resource { + props := map[string]string{} + if js.GetMaxStorageBytes() > 0 { + props["storage_size_bytes"] = fmt.Sprintf("%d", js.GetMaxStorageBytes()) + } + + return iac.Resource{ + Kind: "infra.storage", + Name: natsJetStreamStorageName, + Properties: props, + Labels: map[string]string{ + "provider": "nats", + "purpose": "jetstream", + }, + } +} + +// buildRunCommand constructs the NATS server command-line flags from ClusterConfig. 
+// +// - HTTP monitoring is always enabled on natsMonitorPort. +// - When JetStream is enabled, -js and -sd flags are added together with +// optional storage and memory limits. +// - Cluster routing (--cluster) is always enabled so replicas can be added +// without a run_command change (zero-config scale-up). +func buildRunCommand(cfg *eventbusv1.ClusterConfig) string { + var flags []string + + // Enable HTTP monitoring on the standard monitoring port. + flags = append(flags, fmt.Sprintf("-m %s", natsMonitorPort)) + + js := cfg.GetJetstream() + if js != nil && js.GetEnabled() { + flags = append(flags, "-js") + flags = append(flags, fmt.Sprintf("-sd %s", jetStreamStorageDir)) + + if js.GetMaxStorageBytes() > 0 { + flags = append(flags, fmt.Sprintf("-ms %d", js.GetMaxStorageBytes())) + } + if js.GetMaxMemoryBytes() > 0 { + flags = append(flags, fmt.Sprintf("-mm %d", js.GetMaxMemoryBytes())) + } + } + + // Always enable cluster routing so instances can join a cluster without a + // run_command change (matching the always-exposed port 6222). + flags = append(flags, fmt.Sprintf("--cluster nats://0.0.0.0:%s", natsClusterPort)) + + return strings.Join(flags, " ") +} + +// buildInternalPorts returns the comma-separated list of container ports to +// expose as internal (non-public) App Platform ports. +// +// - 4222 — NATS client connections (always required). +// - 8222 — HTTP monitoring endpoint (health checks, JetStream stats). +// - 6222 — NATS cluster routing (always exposed; matches always-on --cluster flag). 
+func buildInternalPorts() string { + return strings.Join([]string{natsClientPort, natsMonitorPort, natsClusterPort}, ",") +} diff --git a/providers/nats/deploy_digitalocean_app_test.go b/providers/nats/deploy_digitalocean_app_test.go new file mode 100644 index 0000000..95b6b0c --- /dev/null +++ b/providers/nats/deploy_digitalocean_app_test.go @@ -0,0 +1,396 @@ +package nats_test + +import ( + "strings" + "testing" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers/nats" +) + +// TestDOApp_EmitsContainerService asserts that Resources for TargetDigitalOceanApp +// emits at least one infra.container_service resource. +func TestDOApp_EmitsContainerService(t *testing.T) { + p := nats.New() + cfg := &eventbusv1.ClusterConfig{ + Provider: "nats", + DeployTarget: string(providers.TargetDigitalOceanApp), + Version: "2.10", + Replicas: 2, + } + resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp) + if err != nil { + t.Fatalf("Resources() error: %v", err) + } + if len(resources) == 0 { + t.Fatal("Resources() returned empty slice") + } + svc := findResourceByKind(resources, "infra.container_service") + if svc == nil { + t.Errorf("no infra.container_service resource emitted; kinds: %v", resourceKindList(resources)) + } +} + +// TestDOApp_NATSImage asserts the NATS Docker image includes the configured version. 
+func TestDOApp_NATSImage(t *testing.T) { + p := nats.New() + cfg := &eventbusv1.ClusterConfig{Version: "2.10", Replicas: 1} + resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp) + if err != nil { + t.Fatalf("Resources() error: %v", err) + } + svc := findResourceByKind(resources, "infra.container_service") + if svc == nil { + t.Fatal("no infra.container_service resource emitted") + } + img := svc.Properties["image"] + if img == "" { + t.Error("image property is empty") + } + if !strings.Contains(img, "2.10") { + t.Errorf("image %q does not contain version 2.10", img) + } +} + +// TestDOApp_DefaultVersion asserts that an empty version defaults to a non-empty image tag. +func TestDOApp_DefaultVersion(t *testing.T) { + p := nats.New() + cfg := &eventbusv1.ClusterConfig{Replicas: 1} + resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp) + if err != nil { + t.Fatalf("Resources() error: %v", err) + } + svc := findResourceByKind(resources, "infra.container_service") + if svc == nil { + t.Fatal("no infra.container_service resource emitted") + } + if svc.Properties["image"] == "" { + t.Error("image property is empty for default version") + } +} + +// TestDOApp_Replicas asserts that ClusterConfig.Replicas maps to instance_count. +func TestDOApp_Replicas(t *testing.T) { + p := nats.New() + cfg := &eventbusv1.ClusterConfig{Version: "2.10", Replicas: 3} + resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp) + if err != nil { + t.Fatalf("Resources() error: %v", err) + } + svc := findResourceByKind(resources, "infra.container_service") + if svc == nil { + t.Fatal("no infra.container_service resource emitted") + } + if svc.Properties["instance_count"] != "3" { + t.Errorf("instance_count = %q, want %q", svc.Properties["instance_count"], "3") + } +} + +// TestDOApp_JetStreamEnabled asserts that enabling JetStream adds -js to run_command. 
+func TestDOApp_JetStreamEnabled(t *testing.T) { + p := nats.New() + cfg := &eventbusv1.ClusterConfig{ + Version: "2.10", + Replicas: 1, + Jetstream: &eventbusv1.JetStreamConfig{ + Enabled: true, + MaxStorageBytes: 10737418240, + }, + } + resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp) + if err != nil { + t.Fatalf("Resources() error: %v", err) + } + svc := findResourceByKind(resources, "infra.container_service") + if svc == nil { + t.Fatal("no infra.container_service resource emitted") + } + runCmd := svc.Properties["run_command"] + if !strings.Contains(runCmd, "-js") { + t.Errorf("run_command %q does not contain JetStream flag -js", runCmd) + } +} + +// TestDOApp_JetStreamDisabled asserts -js is absent when JetStream is not enabled. +func TestDOApp_JetStreamDisabled(t *testing.T) { + p := nats.New() + cfg := &eventbusv1.ClusterConfig{Version: "2.10", Replicas: 1} + resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp) + if err != nil { + t.Fatalf("Resources() error: %v", err) + } + svc := findResourceByKind(resources, "infra.container_service") + if svc == nil { + t.Fatal("no infra.container_service resource emitted") + } + if strings.Contains(svc.Properties["run_command"], "-js") { + t.Errorf("run_command %q should not contain -js when JetStream is disabled", svc.Properties["run_command"]) + } +} + +// TestDOApp_ClientPort asserts NATS client port 4222 appears in internal_ports. 
+func TestDOApp_ClientPort(t *testing.T) { + p := nats.New() + cfg := &eventbusv1.ClusterConfig{Version: "2.10", Replicas: 1} + resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp) + if err != nil { + t.Fatalf("Resources() error: %v", err) + } + svc := findResourceByKind(resources, "infra.container_service") + if svc == nil { + t.Fatal("no infra.container_service resource emitted") + } + if !strings.Contains(svc.Properties["internal_ports"], "4222") { + t.Errorf("internal_ports %q does not contain NATS client port 4222", svc.Properties["internal_ports"]) + } +} + +// TestDOApp_MonitorPort asserts NATS monitoring port 8222 appears in internal_ports. +func TestDOApp_MonitorPort(t *testing.T) { + p := nats.New() + cfg := &eventbusv1.ClusterConfig{Version: "2.10", Replicas: 1} + resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp) + if err != nil { + t.Fatalf("Resources() error: %v", err) + } + svc := findResourceByKind(resources, "infra.container_service") + if svc == nil { + t.Fatal("no infra.container_service resource emitted") + } + if !strings.Contains(svc.Properties["internal_ports"], "8222") { + t.Errorf("internal_ports %q does not contain monitoring port 8222", svc.Properties["internal_ports"]) + } +} + +// TestDOApp_ClusterPort asserts NATS cluster routing port 6222 appears in +// internal_ports unconditionally (regardless of replica count). 
+func TestDOApp_ClusterPort(t *testing.T) {
+	tests := []struct {
+		name     string
+		replicas int32
+	}{
+		{"single-replica", 1},
+		{"multi-replica", 3},
+	}
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			// natsprovider is the aliased import of providers/nats in this file.
+			p := natsprovider.New()
+			cfg := &eventbusv1.ClusterConfig{Version: "2.10", Replicas: tc.replicas}
+			resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp)
+			if err != nil {
+				t.Fatalf("Resources() error: %v", err)
+			}
+			svc := findResourceByKind(resources, "infra.container_service")
+			if svc == nil {
+				t.Fatal("no infra.container_service resource emitted")
+			}
+			if !strings.Contains(svc.Properties["internal_ports"], "6222") {
+				t.Errorf("internal_ports %q does not contain cluster routing port 6222", svc.Properties["internal_ports"])
+			}
+		})
+	}
+}
+
+// TestDOApp_Labels asserts provider and deploy_target labels are set correctly
+// on the container service, regardless of whether DeployTarget is set in cfg.
+func TestDOApp_Labels(t *testing.T) {
+	p := natsprovider.New()
+	cfg := &eventbusv1.ClusterConfig{Version: "2.10", Replicas: 1}
+	resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp)
+	if err != nil {
+		t.Fatalf("Resources() error: %v", err)
+	}
+	svc := findResourceByKind(resources, "infra.container_service")
+	if svc == nil {
+		t.Fatal("no infra.container_service resource emitted")
+	}
+	if svc.Labels["provider"] != "nats" {
+		t.Errorf("label provider = %q, want %q", svc.Labels["provider"], "nats")
+	}
+	if svc.Labels["deploy_target"] != string(providers.TargetDigitalOceanApp) {
+		t.Errorf("label deploy_target = %q, want %q", svc.Labels["deploy_target"], string(providers.TargetDigitalOceanApp))
+	}
+}
+
+// TestDOApp_JetStreamVolume asserts that enabling JetStream emits an infra.storage
+// resource (DigitalOcean Spaces bucket) as the JetStream backing store.
+func TestDOApp_JetStreamVolume(t *testing.T) {
+	// natsprovider is the aliased import of providers/nats in this file.
+	p := natsprovider.New()
+	cfg := &eventbusv1.ClusterConfig{
+		Version:  "2.10",
+		Replicas: 1,
+		Jetstream: &eventbusv1.JetStreamConfig{
+			Enabled:         true,
+			MaxStorageBytes: 10737418240,
+		},
+	}
+	resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp)
+	if err != nil {
+		t.Fatalf("Resources() error: %v", err)
+	}
+	vol := findResourceByKind(resources, "infra.storage")
+	if vol == nil {
+		t.Fatalf("no infra.storage resource emitted when JetStream is enabled; kinds: %v", resourceKindList(resources))
+	}
+	if vol.Labels["purpose"] != "jetstream" {
+		t.Errorf("infra.storage label purpose = %q, want %q", vol.Labels["purpose"], "jetstream")
+	}
+}
+
+// TestDOApp_JetStreamVolumeAbsent asserts no infra.storage is emitted when
+// JetStream is not enabled.
+func TestDOApp_JetStreamVolumeAbsent(t *testing.T) {
+	p := natsprovider.New()
+	cfg := &eventbusv1.ClusterConfig{Version: "2.10", Replicas: 1}
+	resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp)
+	if err != nil {
+		t.Fatalf("Resources() error: %v", err)
+	}
+	if vol := findResourceByKind(resources, "infra.storage"); vol != nil {
+		t.Error("infra.storage resource should not be emitted when JetStream is disabled")
+	}
+}
+
+// TestDOApp_JetStreamVolumeStorageSizeProperty asserts the infra.storage resource
+// carries the storage_size_bytes property when MaxStorageBytes is set.
+func TestDOApp_JetStreamVolumeStorageSizeProperty(t *testing.T) {
+	// natsprovider is the aliased import of providers/nats in this file.
+	p := natsprovider.New()
+	cfg := &eventbusv1.ClusterConfig{
+		Version:  "2.10",
+		Replicas: 1,
+		Jetstream: &eventbusv1.JetStreamConfig{
+			Enabled:         true,
+			MaxStorageBytes: 53687091200, // 50 GB
+		},
+	}
+	resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp)
+	if err != nil {
+		t.Fatalf("Resources() error: %v", err)
+	}
+	vol := findResourceByKind(resources, "infra.storage")
+	if vol == nil {
+		t.Fatal("no infra.storage resource emitted")
+	}
+	if vol.Properties["storage_size_bytes"] != "53687091200" {
+		t.Errorf("storage_size_bytes = %q, want %q", vol.Properties["storage_size_bytes"], "53687091200")
+	}
+}
+
+// TestDOApp_ClusterFlagAlwaysPresent asserts --cluster appears in run_command
+// for both single-replica and multi-replica deployments (always-on for zero-config
+// scale-up, matching the always-exposed port 6222).
+func TestDOApp_ClusterFlagAlwaysPresent(t *testing.T) {
+	tests := []struct {
+		name     string
+		replicas int32
+	}{
+		{"single-replica", 1},
+		{"multi-replica", 3},
+	}
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			p := natsprovider.New()
+			cfg := &eventbusv1.ClusterConfig{Version: "2.10", Replicas: tc.replicas}
+			resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp)
+			if err != nil {
+				t.Fatalf("Resources() error: %v", err)
+			}
+			svc := findResourceByKind(resources, "infra.container_service")
+			if svc == nil {
+				t.Fatal("no infra.container_service resource emitted")
+			}
+			if !strings.Contains(svc.Properties["run_command"], "--cluster") {
+				t.Errorf("run_command %q does not contain --cluster for %s deployment", svc.Properties["run_command"], tc.name)
+			}
+		})
+	}
+}
+
+// TestDOApp_LatestVersionRejected asserts that Version "latest" (and case
+// variants) returns a non-nil error — unpinned tags are not allowed.
+func TestDOApp_LatestVersionRejected(t *testing.T) {
+	variants := []string{"latest", "LATEST", "Latest"}
+	for _, v := range variants {
+		t.Run(v, func(t *testing.T) {
+			// natsprovider is the aliased import of providers/nats in this file.
+			p := natsprovider.New()
+			cfg := &eventbusv1.ClusterConfig{Version: v, Replicas: 1}
+			_, err := p.Resources(cfg, providers.TargetDigitalOceanApp)
+			if err == nil {
+				t.Errorf("expected error for Version %q, got nil", v)
+			}
+		})
+	}
+}
+
+// TestDOApp_StorageRefLinksContainerToVolume asserts that when JetStream is
+// enabled the container_service carries a storage_ref property whose value
+// matches the name of the emitted infra.storage resource.
+func TestDOApp_StorageRefLinksContainerToVolume(t *testing.T) {
+	p := natsprovider.New()
+	cfg := &eventbusv1.ClusterConfig{
+		Version:  "2.10",
+		Replicas: 1,
+		Jetstream: &eventbusv1.JetStreamConfig{
+			Enabled:         true,
+			MaxStorageBytes: 10737418240,
+		},
+	}
+	resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp)
+	if err != nil {
+		t.Fatalf("Resources() error: %v", err)
+	}
+	svc := findResourceByKind(resources, "infra.container_service")
+	if svc == nil {
+		t.Fatal("no infra.container_service resource emitted")
+	}
+	vol := findResourceByKind(resources, "infra.storage")
+	if vol == nil {
+		t.Fatal("no infra.storage resource emitted")
+	}
+	storageRef := svc.Properties["storage_ref"]
+	if storageRef == "" {
+		t.Error("infra.container_service missing storage_ref property when JetStream is enabled")
+	}
+	if storageRef != vol.Name {
+		t.Errorf("storage_ref %q does not match infra.storage name %q", storageRef, vol.Name)
+	}
+}
+
+// TestDOApp_StorageRefAbsentWithoutJetStream asserts no storage_ref appears
+// on the container service when JetStream is not enabled.
+func TestDOApp_StorageRefAbsentWithoutJetStream(t *testing.T) {
+	// natsprovider is the aliased import of providers/nats in this file.
+	p := natsprovider.New()
+	cfg := &eventbusv1.ClusterConfig{Version: "2.10", Replicas: 1}
+	resources, err := p.Resources(cfg, providers.TargetDigitalOceanApp)
+	if err != nil {
+		t.Fatalf("Resources() error: %v", err)
+	}
+	svc := findResourceByKind(resources, "infra.container_service")
+	if svc == nil {
+		t.Fatal("no infra.container_service resource emitted")
+	}
+	if ref := svc.Properties["storage_ref"]; ref != "" {
+		t.Errorf("storage_ref should be absent when JetStream is disabled, got %q", ref)
+	}
+}
+
+// ── helpers ─────────────────────────────────────────────────────────────────
+
+func findResourceByKind(resources []iac.Resource, kind string) *iac.Resource {
+	for i := range resources {
+		if resources[i].Kind == kind {
+			return &resources[i]
+		}
+	}
+	return nil
+}
+
+func resourceKindList(resources []iac.Resource) []string {
+	kinds := make([]string, len(resources))
+	for i, r := range resources {
+		kinds[i] = r.Kind
+	}
+	return kinds
+}
diff --git a/providers/nats/deploy_kubernetes.go b/providers/nats/deploy_kubernetes.go
new file mode 100644
index 0000000..1c092ec
--- /dev/null
+++ b/providers/nats/deploy_kubernetes.go
+package nats
+
+import (
+	"fmt"
+
+	eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen"
+	"github.com/GoCodeAlone/workflow-plugin-eventbus/iac"
+	"github.com/GoCodeAlone/workflow-plugin-eventbus/providers"
+)
+
+// resourcesForKubernetes is the deploy-target stub for kubernetes.
+//
+// Per BMW pilot manifest: NATS on generic Kubernetes is built into the plugin
+// but not activated for the pilot. This stub ensures config referencing
+// deploy_target: kubernetes fails fast with a clear, actionable error.
+//
+// Full implementation (infra.k8s.statefulset + infra.k8s.service +
+// infra.k8s.persistent_volume_claim for JetStream persistence, with optional
+// NATS Operator CRDs) lands when a downstream consumer activates this target.
+func resourcesForKubernetes(_ *eventbusv1.ClusterConfig) ([]iac.Resource, error) {
+	return nil, fmt.Errorf(
+		"nats: deploy target %q is not implemented for the pilot; "+
+			"only %q is active — activate kubernetes by implementing deploy_kubernetes.go",
+		providers.TargetKubernetes, providers.TargetDigitalOceanApp,
+	)
+}
diff --git a/providers/nats/deploy_stubs_test.go b/providers/nats/deploy_stubs_test.go
new file mode 100644
index 0000000..c58511a
--- /dev/null
+++ b/providers/nats/deploy_stubs_test.go
+package nats_test
+
+import (
+	"strings"
+	"testing"
+
+	eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen"
+	"github.com/GoCodeAlone/workflow-plugin-eventbus/providers"
+	"github.com/GoCodeAlone/workflow-plugin-eventbus/providers/nats"
+)
+
+// minimalCfg returns the smallest valid ClusterConfig (pinned version, one replica) for stub tests.
+func minimalCfg() *eventbusv1.ClusterConfig {
+	return &eventbusv1.ClusterConfig{Version: "2.10", Replicas: 1}
+}
+
+// TestNATSStub_ErrorBehavior asserts that every not-yet-activated deploy target
+// (ECS, EKS, Kubernetes) and every default-arm target (SelfHosted) satisfy the
+// stub contract:
+//
+// 1. Resources() returns a non-nil error.
+// 2. The resource slice is nil (not empty).
+// 3. The error contains "not implemented".
+// 4. The error mentions the target name so callers can diagnose config mistakes.
+func TestNATSStub_ErrorBehavior(t *testing.T) {
+	cases := []struct {
+		target  providers.DeployTarget
+		mention string // expected substring in error message
+	}{
+		// Explicit stub files (Task 20).
+		{providers.TargetAWSECS, "aws.ecs"},
+		{providers.TargetAWSEKS, "aws.eks"},
+		{providers.TargetKubernetes, "kubernetes"},
+		// Default arm — no dedicated stub file; falls through to the default branch.
+		{providers.TargetSelfHosted, "self_hosted"},
+	}
+
+	p := nats.New()
+	for _, tc := range cases {
+		t.Run(string(tc.target), func(t *testing.T) {
+			res, err := p.Resources(minimalCfg(), tc.target)
+
+			if err == nil {
+				t.Fatalf("expected error for target %q, got nil", tc.target)
+			}
+			if res != nil {
+				t.Errorf("expected nil resource slice for target %q, got %v", tc.target, res)
+			}
+			if !strings.Contains(err.Error(), "not implemented") {
+				t.Errorf("error for %q does not contain 'not implemented': %v", tc.target, err)
+			}
+			if !strings.Contains(err.Error(), tc.mention) {
+				t.Errorf("error for %q does not mention %q: %v", tc.target, tc.mention, err)
+			}
+		})
+	}
+}
diff --git a/providers/nats/nats.go b/providers/nats/nats.go
new file mode 100644
index 0000000..66845d1
--- /dev/null
+++ b/providers/nats/nats.go
+// Package nats provides the NATS event-bus Provider implementation.
+// It emits typed IaC resource declarations for supported deploy targets.
+//
+// Activated targets for the BMW pilot:
+//   - digitalocean.app_platform (TargetDigitalOceanApp) — fully implemented.
+//
+// Stub targets (not yet activated, return ErrNotImplemented-style errors):
+//   - aws.ecs — deploy_aws_ecs.go
+//   - aws.eks — deploy_aws_eks.go
+//   - kubernetes — deploy_kubernetes.go
+package nats
+
+import (
+	"errors"
+	"fmt"
+	"strings"
+
+	eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen"
+	"github.com/GoCodeAlone/workflow-plugin-eventbus/iac"
+	"github.com/GoCodeAlone/workflow-plugin-eventbus/providers"
+)
+
+// defaultVersion is the NATS server version used when ClusterConfig.Version is empty.
+const defaultVersion = "2.10"
+
+// provider is the stateless NATS Provider implementation; its zero value is ready to use.
+type provider struct{}
+
+// New returns a fully-initialised NATS Provider.
+func New() providers.Provider {
+	return &provider{}
+}
+
+// Name implements providers.Provider.
+func (p *provider) Name() string { return "nats" }
+
+// Resources implements providers.Provider.
+// It dispatches to the deploy-target–specific builder and returns the list of
+// IaC resource declarations required to provision a NATS cluster on the target.
+func (p *provider) Resources(cfg *eventbusv1.ClusterConfig, target providers.DeployTarget) ([]iac.Resource, error) {
+	if err := providers.ValidateProviderTarget("nats", target); err != nil {
+		return nil, err
+	}
+	switch target {
+	case providers.TargetDigitalOceanApp:
+		return resourcesForDOApp(cfg)
+	case providers.TargetAWSECS:
+		return resourcesForAWSECS(cfg)
+	case providers.TargetAWSEKS:
+		return resourcesForAWSEKS(cfg)
+	case providers.TargetKubernetes:
+		return resourcesForKubernetes(cfg)
+	default:
+		// TargetSelfHosted and any future recognised-but-unimplemented targets
+		// fall through here with a target-specific "not implemented" error.
+		return nil, fmt.Errorf(
+			"nats: deploy target %q is not implemented for the pilot; "+
+				"only %q is active — add a deploy_%s.go stub to activate this target",
+			target, providers.TargetDigitalOceanApp,
+			strings.ReplaceAll(string(target), ".", "_"),
+		)
+	}
+}
+
+// ConnectionString implements providers.Provider.
+// It derives the broker connection URI from provisioned state.
+//
+// Lookup order:
+// 1. "uri.<env>" — environment-specific override (e.g. "uri.prod").
+// 2. "uri" — base URI set by the IaC engine after provisioning.
+//
+// For DigitalOcean App Platform the value is typically
+// "nats://nats.internal:4222" (DO's internal service DNS).
+// Returns an error when no such output is present and non-empty.
+func (p *provider) ConnectionString(state iac.State, env string) (string, error) {
+	// Prefer the env-scoped key (e.g. "uri.prod") when present and non-empty.
+	if env != "" {
+		if uri, ok := state.Output("uri." + env); ok && uri != "" {
+			return uri, nil
+		}
+	}
+	uri, ok := state.Output("uri")
+	if !ok || uri == "" {
+		return "", errors.New("nats: ConnectionString: 'uri' output not found in state; ensure the cluster has been provisioned")
+	}
+	return uri, nil
+}
+
+// retentionPolicyString converts a RetentionPolicy proto enum to the
+// NATS-native lowercase string value expected by the JetStream API.
+func retentionPolicyString(rp eventbusv1.RetentionPolicy) string {
+	switch rp {
+	case eventbusv1.RetentionPolicy_RETENTION_POLICY_INTEREST:
+		return "interest"
+	case eventbusv1.RetentionPolicy_RETENTION_POLICY_WORKQUEUE:
+		return "workqueue"
+	default: // UNSPECIFIED, LIMITS, and any unrecognised value map to the NATS default
+		return "limits"
+	}
+}
+
+// StreamResources implements providers.Provider.
+// It returns one nats.stream_create IaC resource for each StreamConfig.
+// These resources are consumed at provisioning time to declare JetStream streams
+// against the already-provisioned NATS cluster.
+//
+// Validation rules (fail-fast, return error):
+// - stream name must be non-empty
+// - at least one subject filter must be provided
+//
+// The "server_uri" property is populated from state when available so the
+// downstream realiser knows which server to configure the stream on.
+// Nil entries in streams are silently skipped.
+func (p *provider) StreamResources(streams []*eventbusv1.StreamConfig, state iac.State) ([]iac.Resource, error) {
+	if len(streams) == 0 {
+		return nil, nil
+	}
+
+	// Best-effort: derive server URI from state for the stream resources.
+	serverURI, _ := state.Output("uri")
+
+	resources := make([]iac.Resource, 0, len(streams))
+	for i, s := range streams {
+		if s == nil {
+			continue
+		}
+		if s.GetName() == "" {
+			return nil, fmt.Errorf("nats: StreamResources: stream at index %d has an empty name", i)
+		}
+		if len(s.GetSubjects()) == 0 {
+			return nil, fmt.Errorf("nats: StreamResources: stream %q has no subjects; at least one subject filter is required", s.GetName())
+		}
+		numReplicas := s.GetNumReplicas()
+		if numReplicas <= 0 {
+			numReplicas = 1
+		}
+		props := map[string]string{
+			"name":             s.GetName(),
+			"subjects":         strings.Join(s.GetSubjects(), ","),
+			"retention_policy": retentionPolicyString(s.GetRetentionPolicy()),
+			"num_replicas":     fmt.Sprintf("%d", numReplicas),
+			"max_bytes":        fmt.Sprintf("%d", s.GetMaxBytes()),
+		}
+		if serverURI != "" {
+			props["server_uri"] = serverURI
+		}
+		if d := s.GetMaxAge(); d != nil && d.IsValid() {
+			props["max_age"] = d.AsDuration().String()
+		}
+		if d := s.GetAckWait(); d != nil && d.IsValid() {
+			props["ack_wait"] = d.AsDuration().String()
+		}
+		resources = append(resources, iac.Resource{
+			Kind:       "nats.stream_create",
+			Name:       s.GetName(),
+			Properties: props,
+			Labels: map[string]string{
+				"provider": "nats",
+			},
+		})
+	}
+	return resources, nil
+}
+
+// Probe implements providers.Provider.
+// It reports a conservative health status for the given URI. This
+// implementation is network-free: it performs no I/O of any kind and returns
+// HealthStatusUnreachable for every URI (empty or not), so callers must not
+// read an Unreachable status from this stub as proof the cluster is down.
+//
+// A real health probe (HTTP GET /healthz on monitoring port 8222) can replace
+// this stub now that the nats.go client dependency is present in go.mod.
+func (p *provider) Probe(uri string) providers.HealthCheck {
+	if uri == "" {
+		return providers.HealthCheck{
+			URI:    uri,
+			Status: providers.HealthStatusUnreachable,
+			Err:    errors.New("nats: Probe: URI is empty"),
+		}
+	}
+	// Conservative stub — no network I/O is performed; a real probe can now
+	// be built on the approved nats.go client dependency.
+	return providers.HealthCheck{
+		URI:    uri,
+		Status: providers.HealthStatusUnreachable,
+		Err:    errors.New("nats: Probe: network probe not yet implemented; verify cluster status via the DigitalOcean console"),
+	}
+}
diff --git a/providers/nats/nats_test.go b/providers/nats/nats_test.go
new file mode 100644
index 0000000..a1c298a
--- /dev/null
+++ b/providers/nats/nats_test.go
+package nats_test
+
+import (
+	"strings"
+	"testing"
+
+	eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen"
+	"github.com/GoCodeAlone/workflow-plugin-eventbus/iac"
+	"github.com/GoCodeAlone/workflow-plugin-eventbus/providers"
+	"github.com/GoCodeAlone/workflow-plugin-eventbus/providers/nats"
+	"google.golang.org/protobuf/types/known/durationpb"
+)
+
+// TestNATSProvider_Name asserts the provider reports the correct identifier.
+func TestNATSProvider_Name(t *testing.T) {
+	p := nats.New()
+	if got := p.Name(); got != "nats" {
+		t.Errorf("Name() = %q, want %q", got, "nats")
+	}
+}
+
+// TestNATSProvider_UnsupportedTarget asserts that Resources returns an error
+// for a target not supported by the NATS provider.
+func TestNATSProvider_UnsupportedTarget(t *testing.T) {
+	p := nats.New()
+	_, err := p.Resources(&eventbusv1.ClusterConfig{Version: "2.10"}, providers.TargetAWSKinesis)
+	if err == nil {
+		t.Fatal("expected error for unsupported target aws.kinesis, got nil")
+	}
+}
+
+// ── ConnectionString ─────────────────────────────────────────────────────────
+
+// TestNATSProvider_ConnectionString_ErrorsWithoutURI asserts ConnectionString
+// returns an error when neither a uri nor an env-specific uri is in state.
+func TestNATSProvider_ConnectionString_ErrorsWithoutURI(t *testing.T) {
+	p := nats.New()
+	_, err := p.ConnectionString(iac.State{Outputs: map[string]iac.Output{}}, "prod")
+	if err == nil {
+		t.Fatal("expected error when uri is absent from state, got nil")
+	}
+}
+
+// TestNATSProvider_ConnectionString_DOAppPlatformFormat asserts ConnectionString
+// returns the DO App Platform internal DNS URI for a provisioned NATS service.
+func TestNATSProvider_ConnectionString_DOAppPlatformFormat(t *testing.T) {
+	p := nats.New()
+	state := iac.State{Outputs: map[string]iac.Output{
+		"uri": {Value: "nats://nats.internal:4222", Sensitive: true},
+	}}
+	got, err := p.ConnectionString(state, "staging")
+	if err != nil {
+		t.Fatalf("ConnectionString() error: %v", err)
+	}
+	if got != "nats://nats.internal:4222" {
+		t.Errorf("ConnectionString() = %q, want %q", got, "nats://nats.internal:4222")
+	}
+}
+
+// TestNATSProvider_ConnectionString_EnvOverride asserts that an env-specific
+// output (uri.<env>) takes precedence over the base "uri" key.
+func TestNATSProvider_ConnectionString_EnvOverride(t *testing.T) {
+	p := nats.New()
+	state := iac.State{Outputs: map[string]iac.Output{
+		"uri":      {Value: "nats://nats.internal:4222", Sensitive: true},
+		"uri.prod": {Value: "nats://nats-prod.internal:4222", Sensitive: true},
+	}}
+	got, err := p.ConnectionString(state, "prod")
+	if err != nil {
+		t.Fatalf("ConnectionString() error: %v", err)
+	}
+	if got != "nats://nats-prod.internal:4222" {
+		t.Errorf("ConnectionString(env=prod) = %q, want env-specific URI %q", got, "nats://nats-prod.internal:4222")
+	}
+}
+
+// TestNATSProvider_ConnectionString_FallsBackToBaseURI asserts that when no
+// env-specific output exists, the base "uri" output is returned.
+func TestNATSProvider_ConnectionString_FallsBackToBaseURI(t *testing.T) {
+	p := nats.New()
+	state := iac.State{Outputs: map[string]iac.Output{
+		"uri": {Value: "nats://nats.internal:4222", Sensitive: true},
+	}}
+	got, err := p.ConnectionString(state, "staging")
+	if err != nil {
+		t.Fatalf("ConnectionString() error: %v", err)
+	}
+	if got != "nats://nats.internal:4222" {
+		t.Errorf("ConnectionString(env=staging) = %q, want base URI", got)
+	}
+}
+
+// ── StreamResources ──────────────────────────────────────────────────────────
+
+// TestNATSProvider_StreamResources_NilStreamList asserts StreamResources with
+// a nil slice returns an empty list without error.
+func TestNATSProvider_StreamResources_NilStreamList(t *testing.T) {
+	p := nats.New()
+	res, err := p.StreamResources(nil, iac.State{})
+	if err != nil {
+		t.Fatalf("StreamResources(nil) error: %v", err)
+	}
+	if len(res) != 0 {
+		t.Errorf("StreamResources(nil) returned %d resources, want 0", len(res))
+	}
+}
+
+// TestNATSProvider_StreamResources_EmitsStreamCreate asserts that each
+// StreamConfig produces one nats.stream_create resource.
+func TestNATSProvider_StreamResources_EmitsStreamCreate(t *testing.T) {
+	p := nats.New()
+	streams := []*eventbusv1.StreamConfig{
+		{Name: "BMW_FULFILLMENT", Subjects: []string{"fulfillment.>"}},
+	}
+	res, err := p.StreamResources(streams, iac.State{})
+	if err != nil {
+		t.Fatalf("StreamResources() error: %v", err)
+	}
+	if len(res) != 1 {
+		t.Fatalf("StreamResources() returned %d resources, want 1", len(res))
+	}
+	if res[0].Kind != "nats.stream_create" {
+		t.Errorf("resource Kind = %q, want %q", res[0].Kind, "nats.stream_create")
+	}
+}
+
+// TestNATSProvider_StreamResources_ResourceName asserts the resource Name
+// matches the StreamConfig name.
+func TestNATSProvider_StreamResources_ResourceName(t *testing.T) {
+	p := nats.New()
+	streams := []*eventbusv1.StreamConfig{{Name: "BMW_FULFILLMENT", Subjects: []string{"fulfillment.>"}}}
+	res, err := p.StreamResources(streams, iac.State{})
+	if err != nil {
+		t.Fatalf("StreamResources() error: %v", err)
+	}
+	if res[0].Name != "BMW_FULFILLMENT" {
+		t.Errorf("resource Name = %q, want %q", res[0].Name, "BMW_FULFILLMENT")
+	}
+}
+
+// TestNATSProvider_StreamResources_Subjects asserts subjects are captured in
+// the resource properties.
+func TestNATSProvider_StreamResources_Subjects(t *testing.T) {
+	p := nats.New()
+	streams := []*eventbusv1.StreamConfig{
+		{Name: "S", Subjects: []string{"fulfillment.>", "order.>"}},
+	}
+	res, err := p.StreamResources(streams, iac.State{})
+	if err != nil {
+		t.Fatalf("StreamResources() error: %v", err)
+	}
+	subjects := res[0].Properties["subjects"]
+	if !strings.Contains(subjects, "fulfillment.>") || !strings.Contains(subjects, "order.>") {
+		t.Errorf("subjects property %q missing expected subjects", subjects)
+	}
+}
+
+// TestNATSProvider_StreamResources_RetentionPolicy asserts the retention_policy
+// property is emitted as the NATS-native lowercase value, not the proto enum name.
+func TestNATSProvider_StreamResources_RetentionPolicy(t *testing.T) {
+	cases := []struct {
+		rp   eventbusv1.RetentionPolicy
+		want string
+	}{
+		{eventbusv1.RetentionPolicy_RETENTION_POLICY_WORKQUEUE, "workqueue"},
+		{eventbusv1.RetentionPolicy_RETENTION_POLICY_INTEREST, "interest"},
+		{eventbusv1.RetentionPolicy_RETENTION_POLICY_LIMITS, "limits"},
+		{eventbusv1.RetentionPolicy_RETENTION_POLICY_UNSPECIFIED, "limits"}, // UNSPECIFIED falls back to the NATS default "limits"
+	}
+	p := nats.New()
+	for _, tc := range cases {
+		t.Run(tc.want, func(t *testing.T) {
+			streams := []*eventbusv1.StreamConfig{
+				{Name: "S", Subjects: []string{"s.>"}, RetentionPolicy: tc.rp},
+			}
+			res, err := p.StreamResources(streams, iac.State{})
+			if err != nil {
+				t.Fatalf("StreamResources() error: %v", err)
+			}
+			if got := res[0].Properties["retention_policy"]; got != tc.want {
+				t.Errorf("retention_policy = %q, want NATS-native %q", got, tc.want)
+			}
+		})
+	}
+}
+
+// TestNATSProvider_StreamResources_EmptyNameErrors asserts StreamResources returns
+// an error when a StreamConfig has an empty name.
+func TestNATSProvider_StreamResources_EmptyNameErrors(t *testing.T) {
+	p := nats.New()
+	streams := []*eventbusv1.StreamConfig{{Name: "", Subjects: []string{"s.>"}}}
+	_, err := p.StreamResources(streams, iac.State{})
+	if err == nil {
+		t.Fatal("StreamResources() with empty name: expected error, got nil")
+	}
+}
+
+// TestNATSProvider_StreamResources_EmptySubjectsErrors asserts StreamResources
+// returns an error when a StreamConfig has no subjects.
+func TestNATSProvider_StreamResources_EmptySubjectsErrors(t *testing.T) {
+	p := nats.New()
+	streams := []*eventbusv1.StreamConfig{{Name: "S", Subjects: nil}}
+	_, err := p.StreamResources(streams, iac.State{})
+	if err == nil {
+		t.Fatal("StreamResources() with nil subjects: expected error, got nil")
+	}
+}
+
+// TestNATSProvider_StreamResources_DefaultNumReplicas asserts that an unset
+// NumReplicas (zero value) is emitted as "1", not "0".
+func TestNATSProvider_StreamResources_DefaultNumReplicas(t *testing.T) {
+	p := nats.New()
+	streams := []*eventbusv1.StreamConfig{{Name: "S", Subjects: []string{"s.>"}}}
+	res, err := p.StreamResources(streams, iac.State{})
+	if err != nil {
+		t.Fatalf("StreamResources() error: %v", err)
+	}
+	if got := res[0].Properties["num_replicas"]; got != "1" {
+		t.Errorf("num_replicas = %q, want %q (default)", got, "1")
+	}
+}
+
+// TestNATSProvider_StreamResources_MultipleStreams asserts one resource is
+// emitted per StreamConfig.
+func TestNATSProvider_StreamResources_MultipleStreams(t *testing.T) {
+	p := nats.New()
+	streams := []*eventbusv1.StreamConfig{
+		{Name: "A", Subjects: []string{"a.>"}},
+		{Name: "B", Subjects: []string{"b.>"}},
+		{Name: "C", Subjects: []string{"c.>"}},
+	}
+	res, err := p.StreamResources(streams, iac.State{})
+	if err != nil {
+		t.Fatalf("StreamResources() error: %v", err)
+	}
+	if len(res) != 3 {
+		t.Errorf("StreamResources() returned %d resources, want 3", len(res))
+	}
+}
+
+// TestNATSProvider_StreamResources_ServerURIFromState asserts that when state
+// contains a "uri" output, each stream resource carries a server_uri property.
+func TestNATSProvider_StreamResources_ServerURIFromState(t *testing.T) {
+	p := nats.New()
+	state := iac.State{Outputs: map[string]iac.Output{
+		"uri": {Value: "nats://nats.internal:4222", Sensitive: true},
+	}}
+	streams := []*eventbusv1.StreamConfig{{Name: "S", Subjects: []string{"s.>"}}}
+	res, err := p.StreamResources(streams, state)
+	if err != nil {
+		t.Fatalf("StreamResources() error: %v", err)
+	}
+	if res[0].Properties["server_uri"] != "nats://nats.internal:4222" {
+		t.Errorf("server_uri = %q, want %q", res[0].Properties["server_uri"], "nats://nats.internal:4222")
+	}
+}
+
+// TestNATSProvider_StreamResources_MaxAge asserts max_age duration is stored
+// when set on the StreamConfig.
+func TestNATSProvider_StreamResources_MaxAge(t *testing.T) {
+	p := nats.New()
+	streams := []*eventbusv1.StreamConfig{
+		{
+			Name:     "S",
+			Subjects: []string{"s.>"},
+			MaxAge:   durationpb.New(168 * 60 * 60 * 1e9), // 168h as a nanosecond count (converts to time.Duration)
+		},
+	}
+	res, err := p.StreamResources(streams, iac.State{})
+	if err != nil {
+		t.Fatalf("StreamResources() error: %v", err)
+	}
+	if res[0].Properties["max_age"] == "" {
+		t.Error("max_age property is empty when MaxAge is set")
+	}
+}
+
+// TestNATSProvider_StreamResources_NilSkipped asserts nil entries in the
+// stream slice are skipped without error.
+func TestNATSProvider_StreamResources_NilSkipped(t *testing.T) {
+	p := nats.New()
+	streams := []*eventbusv1.StreamConfig{
+		{Name: "A", Subjects: []string{"a.>"}},
+		nil,
+		{Name: "B", Subjects: []string{"b.>"}},
+	}
+	res, err := p.StreamResources(streams, iac.State{})
+	if err != nil {
+		t.Fatalf("StreamResources() error: %v", err)
+	}
+	if len(res) != 2 {
+		t.Errorf("StreamResources() returned %d resources, want 2 (nil skipped)", len(res))
+	}
+}
+
+// ── Probe ────────────────────────────────────────────────────────────────────
+
+// TestNATSProvider_Probe_UnreachableOnEmptyURI asserts Probe returns
+// HealthStatusUnreachable for an empty URI (not a real broker).
+func TestNATSProvider_Probe_UnreachableOnEmptyURI(t *testing.T) {
+	p := nats.New()
+	hc := p.Probe("")
+	if hc.Status != providers.HealthStatusUnreachable {
+		t.Errorf("Probe() status = %q, want %q", hc.Status, providers.HealthStatusUnreachable)
+	}
+	if hc.Err == nil {
+		t.Error("Probe() Err should be non-nil for empty URI")
+	}
+}