From bde8488f9cbbfc92895b483c3c05c73838d24ff7 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sun, 3 May 2026 17:32:09 -0400 Subject: [PATCH 1/8] feat: scaffold workflow-plugin-eventbus repo + proto contract - go.mod: module github.com/GoCodeAlone/workflow-plugin-eventbus, go 1.26 - proto/eventbus.proto: full proto contract (ClusterConfig, JetStream/Kafka/KinesisConfig, ResourceLimits, StreamConfig, ConsumerConfig, Publish/Consume/Ack Request+Response, Message) - gen/eventbus.pb.go: generated Go bindings (protoc-gen-go v1.36.11) - plugin.json: module/step/trigger type capabilities manifest - plugin.contracts.json: strict_proto contracts for all 3 modules, 3 steps, 1 trigger - Makefile: proto-gen, build, test, vet targets (GOWORK=off) - .github/workflows/ci.yml: mirrors workflow-plugin-audit-chain (test + wfctl-strict-contracts) - .github/workflows/release.yml: GoReleaser v2 on v* tag - .goreleaser.yaml: cross-platform builds (linux/darwin/windows, amd64/arm64) - cmd/workflow-plugin-eventbus/main.go: scaffold entrypoint (TODO: wire sdk.Serve in Task 17) wfctl strict-contracts: OK workflow-plugin-eventbus v0.1.0 go build ./...: exit 0 Co-Authored-By: Claude Sonnet 4.6 --- .github/workflows/ci.yml | 31 + .github/workflows/release.yml | 24 + .goreleaser.yaml | 35 + Makefile | 19 + README.md | 100 +++ cmd/workflow-plugin-eventbus/main.go | 11 + gen/eventbus.pb.go | 1185 ++++++++++++++++++++++++++ go.mod | 5 + go.sum | 4 + plugin.contracts.json | 51 ++ plugin.json | 58 ++ proto/eventbus.proto | 178 ++++ 12 files changed, 1701 insertions(+) create mode 100644 .github/workflows/ci.yml create mode 100644 .github/workflows/release.yml create mode 100644 .goreleaser.yaml create mode 100644 Makefile create mode 100644 README.md create mode 100644 cmd/workflow-plugin-eventbus/main.go create mode 100644 gen/eventbus.pb.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 plugin.contracts.json create mode 100644 plugin.json create mode 100644 
proto/eventbus.proto diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..44b39e9 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,31 @@ +name: CI +on: + push: + branches: [main] + pull_request: + branches: [main] +jobs: + test: + runs-on: ubuntu-latest + permissions: + contents: read + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-go@v6 + with: + go-version-file: go.mod + - run: go build ./... + - run: go test ./... -v -race -count=1 + - run: go vet ./... + + wfctl-strict-contracts: + runs-on: ubuntu-latest + permissions: + contents: read + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-go@v6 + with: + go-version-file: go.mod + - name: Validate strict plugin contracts + run: go run github.com/GoCodeAlone/workflow/cmd/wfctl@v0.20.1 plugin validate --file plugin.json --strict-contracts diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..e8c1e92 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,24 @@ +name: Release +on: + push: + tags: + - 'v*' +permissions: + contents: write +jobs: + release: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + with: + fetch-depth: 0 + - uses: actions/setup-go@v6 + with: + go-version-file: go.mod + - uses: goreleaser/goreleaser-action@v7 + with: + distribution: goreleaser + version: '~> v2' + args: release --clean + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.goreleaser.yaml b/.goreleaser.yaml new file mode 100644 index 0000000..299b24f --- /dev/null +++ b/.goreleaser.yaml @@ -0,0 +1,35 @@ +version: 2 + +before: + hooks: + - go mod tidy + - "sed -i.bak 's/\"version\": \".*\"/\"version\": \"{{ .Version }}\"/' plugin.json && rm -f plugin.json.bak" + +builds: + - id: workflow-plugin-eventbus + main: ./cmd/workflow-plugin-eventbus + binary: workflow-plugin-eventbus + env: + - CGO_ENABLED=0 + goos: + - linux + - darwin + - windows + goarch: + - amd64 + - 
arm64 + ldflags: + - -s -w -X main.version={{.Version}} + +archives: + - formats: [tar.gz] + name_template: "{{ .ProjectName }}-{{ .Os }}-{{ .Arch }}" + files: + - plugin.json + - LICENSE + +checksum: + name_template: "checksums.txt" + +changelog: + sort: asc diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3c0dd86 --- /dev/null +++ b/Makefile @@ -0,0 +1,19 @@ +.PHONY: proto-gen build test vet + +# Regenerate Go bindings from proto/eventbus.proto. +# Requires: protoc + protoc-gen-go (go install google.golang.org/protobuf/cmd/protoc-gen-go@latest) +proto-gen: + protoc \ + --proto_path=proto \ + --go_out=gen \ + --go_opt=paths=source_relative \ + proto/eventbus.proto + +build: + GOWORK=off go build ./... + +test: + GOWORK=off go test ./... -v -race -count=1 + +vet: + GOWORK=off go vet ./... diff --git a/README.md b/README.md new file mode 100644 index 0000000..0ce21e8 --- /dev/null +++ b/README.md @@ -0,0 +1,100 @@ +# workflow-plugin-eventbus + +**Status: pre-pilot scaffold** — Provider implementations are in progress (BMW E2E fulfillment pilot, PR 5). + +A [workflow](https://github.com/GoCodeAlone/workflow) external plugin that provisions durable event-bus clusters as IaC and exposes typed pipeline steps for publish/consume operations. 
+ +## Providers + +| Provider | Deploy targets | +|---|---| +| `nats` | DO App Platform, AWS ECS/EKS, Kubernetes StatefulSet | +| `kafka` | DO Managed Kafka, AWS MSK, Kubernetes (Strimzi) | +| `kinesis` | AWS (Kinesis Data Streams) | + +## Usage + +### Declare a cluster + +```yaml +modules: + - name: my-events + type: infra.eventbus + config: + provider: nats + deploy_target: digitalocean.app_platform + version: "2.10" + replicas: 2 + jetstream: + enabled: true + max_storage_bytes: 53687091200 # 50 GB +``` + +### Declare streams and consumers + +```yaml + - name: my-stream + type: infra.eventbus.stream + config: + name: MY_EVENTS + subjects: ["events.>"] + retention_policy: limits + max_bytes: 10737418240 # 10 GB + + - name: my-consumer + type: infra.eventbus.consumer + config: + stream_name: MY_EVENTS + name: my-handler + filter_subject: "events.>" + ack_policy: explicit + max_deliver: 5 +``` + +### Publish from a pipeline step + +```yaml +steps: + - name: publish + type: step.eventbus.publish + config: + subject: events.created + payload: '{{ toJson .input }}' +``` + +### Subscribe trigger + +```yaml +my-handler: + trigger: + type: trigger.eventbus.subscribe + config: + stream_name: MY_EVENTS + name: my-handler + filter_subject: "events.>" + ack_policy: explicit + steps: + - name: ack + type: step.eventbus.ack + config: + ack_token: '{{ .nats.message.ack_token }}' +``` + +## Development + +```sh +# Regenerate proto bindings after editing proto/eventbus.proto +make proto-gen + +# Build +make build + +# Test +make test +``` + +## Planned providers + +- `nats` — NATS JetStream (in progress) +- `kafka` — stub (in progress) +- `kinesis` — stub (in progress) diff --git a/cmd/workflow-plugin-eventbus/main.go b/cmd/workflow-plugin-eventbus/main.go new file mode 100644 index 0000000..c82665c --- /dev/null +++ b/cmd/workflow-plugin-eventbus/main.go @@ -0,0 +1,11 @@ +// Command workflow-plugin-eventbus is a workflow engine external plugin that +// provisions durable 
event-bus clusters (NATS / Kafka / Kinesis) as IaC and +// exposes typed pipeline steps for publish / consume operations. +// +// Status: pre-pilot scaffold — provider implementations are in progress. +package main + +func main() { + // TODO(Task 17): wire sdk.Serve(plugin.New()) once Provider interface is implemented. + panic("workflow-plugin-eventbus: not yet implemented — scaffold only") +} diff --git a/gen/eventbus.pb.go b/gen/eventbus.pb.go new file mode 100644 index 0000000..e188221 --- /dev/null +++ b/gen/eventbus.pb.go @@ -0,0 +1,1185 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.11 +// protoc v7.34.1 +// source: eventbus.proto + +package eventbusv1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + durationpb "google.golang.org/protobuf/types/known/durationpb" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// ClusterConfig is the typed config for infra.eventbus module. +// provider and deploy_target select the Provider × DeployTarget combination. +type ClusterConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // provider selects the message-broker backend: "nats" | "kafka" | "kinesis". + Provider string `protobuf:"bytes,1,opt,name=provider,proto3" json:"provider,omitempty"` + // deploy_target selects the deployment platform: + // "digitalocean.app_platform" | "aws.ecs" | "aws.eks" | "kubernetes". + DeployTarget string `protobuf:"bytes,2,opt,name=deploy_target,json=deployTarget,proto3" json:"deploy_target,omitempty"` + // version is the broker version to deploy (e.g. "2.10" for NATS). 
+ Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"` + // replicas is the number of broker replicas. + Replicas int32 `protobuf:"varint,4,opt,name=replicas,proto3" json:"replicas,omitempty"` + // jetstream holds NATS JetStream-specific configuration; ignored for other providers. + Jetstream *JetStreamConfig `protobuf:"bytes,5,opt,name=jetstream,proto3" json:"jetstream,omitempty"` + // kafka holds Kafka-specific cluster configuration; ignored for other providers. + Kafka *KafkaConfig `protobuf:"bytes,6,opt,name=kafka,proto3" json:"kafka,omitempty"` + // kinesis holds Kinesis-specific configuration; ignored for other providers. + Kinesis *KinesisConfig `protobuf:"bytes,7,opt,name=kinesis,proto3" json:"kinesis,omitempty"` + // limits sets compute/storage resource limits for the cluster containers. + Limits *ResourceLimits `protobuf:"bytes,8,opt,name=limits,proto3" json:"limits,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ClusterConfig) Reset() { + *x = ClusterConfig{} + mi := &file_eventbus_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ClusterConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ClusterConfig) ProtoMessage() {} + +func (x *ClusterConfig) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ClusterConfig.ProtoReflect.Descriptor instead. 
+func (*ClusterConfig) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{0} +} + +func (x *ClusterConfig) GetProvider() string { + if x != nil { + return x.Provider + } + return "" +} + +func (x *ClusterConfig) GetDeployTarget() string { + if x != nil { + return x.DeployTarget + } + return "" +} + +func (x *ClusterConfig) GetVersion() string { + if x != nil { + return x.Version + } + return "" +} + +func (x *ClusterConfig) GetReplicas() int32 { + if x != nil { + return x.Replicas + } + return 0 +} + +func (x *ClusterConfig) GetJetstream() *JetStreamConfig { + if x != nil { + return x.Jetstream + } + return nil +} + +func (x *ClusterConfig) GetKafka() *KafkaConfig { + if x != nil { + return x.Kafka + } + return nil +} + +func (x *ClusterConfig) GetKinesis() *KinesisConfig { + if x != nil { + return x.Kinesis + } + return nil +} + +func (x *ClusterConfig) GetLimits() *ResourceLimits { + if x != nil { + return x.Limits + } + return nil +} + +// JetStreamConfig holds NATS JetStream-specific settings. +type JetStreamConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // enabled activates JetStream on the NATS cluster (required for durability). + Enabled bool `protobuf:"varint,1,opt,name=enabled,proto3" json:"enabled,omitempty"` + // max_storage_bytes caps total on-disk storage for all streams (0 = unlimited). + MaxStorageBytes int64 `protobuf:"varint,2,opt,name=max_storage_bytes,json=maxStorageBytes,proto3" json:"max_storage_bytes,omitempty"` + // max_memory_bytes caps in-memory storage (0 = unlimited). + MaxMemoryBytes int64 `protobuf:"varint,3,opt,name=max_memory_bytes,json=maxMemoryBytes,proto3" json:"max_memory_bytes,omitempty"` + // max_age is the default maximum age for messages across all streams. 
+ MaxAge *durationpb.Duration `protobuf:"bytes,4,opt,name=max_age,json=maxAge,proto3" json:"max_age,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *JetStreamConfig) Reset() { + *x = JetStreamConfig{} + mi := &file_eventbus_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *JetStreamConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*JetStreamConfig) ProtoMessage() {} + +func (x *JetStreamConfig) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use JetStreamConfig.ProtoReflect.Descriptor instead. +func (*JetStreamConfig) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{1} +} + +func (x *JetStreamConfig) GetEnabled() bool { + if x != nil { + return x.Enabled + } + return false +} + +func (x *JetStreamConfig) GetMaxStorageBytes() int64 { + if x != nil { + return x.MaxStorageBytes + } + return 0 +} + +func (x *JetStreamConfig) GetMaxMemoryBytes() int64 { + if x != nil { + return x.MaxMemoryBytes + } + return 0 +} + +func (x *JetStreamConfig) GetMaxAge() *durationpb.Duration { + if x != nil { + return x.MaxAge + } + return nil +} + +// KafkaConfig holds Kafka-specific cluster settings. +type KafkaConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // bootstrap_servers is the initial broker address list (comma-separated). + // Used when connecting to an existing Kafka cluster rather than provisioning one. 
+ BootstrapServers string `protobuf:"bytes,1,opt,name=bootstrap_servers,json=bootstrapServers,proto3" json:"bootstrap_servers,omitempty"` + // security_protocol specifies the protocol for client-broker communication: + // "PLAINTEXT" | "SSL" | "SASL_PLAINTEXT" | "SASL_SSL". + SecurityProtocol string `protobuf:"bytes,2,opt,name=security_protocol,json=securityProtocol,proto3" json:"security_protocol,omitempty"` + // sasl_mechanism for SASL authentication: "PLAIN" | "SCRAM-SHA-256" | "SCRAM-SHA-512". + SaslMechanism string `protobuf:"bytes,3,opt,name=sasl_mechanism,json=saslMechanism,proto3" json:"sasl_mechanism,omitempty"` + // default_replication_factor for new topics (0 = use broker default). + DefaultReplicationFactor int32 `protobuf:"varint,4,opt,name=default_replication_factor,json=defaultReplicationFactor,proto3" json:"default_replication_factor,omitempty"` + // min_insync_replicas is the minimum number of in-sync replicas required for acks. + MinInsyncReplicas int32 `protobuf:"varint,5,opt,name=min_insync_replicas,json=minInsyncReplicas,proto3" json:"min_insync_replicas,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *KafkaConfig) Reset() { + *x = KafkaConfig{} + mi := &file_eventbus_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *KafkaConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*KafkaConfig) ProtoMessage() {} + +func (x *KafkaConfig) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use KafkaConfig.ProtoReflect.Descriptor instead. 
+func (*KafkaConfig) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{2} +} + +func (x *KafkaConfig) GetBootstrapServers() string { + if x != nil { + return x.BootstrapServers + } + return "" +} + +func (x *KafkaConfig) GetSecurityProtocol() string { + if x != nil { + return x.SecurityProtocol + } + return "" +} + +func (x *KafkaConfig) GetSaslMechanism() string { + if x != nil { + return x.SaslMechanism + } + return "" +} + +func (x *KafkaConfig) GetDefaultReplicationFactor() int32 { + if x != nil { + return x.DefaultReplicationFactor + } + return 0 +} + +func (x *KafkaConfig) GetMinInsyncReplicas() int32 { + if x != nil { + return x.MinInsyncReplicas + } + return 0 +} + +// KinesisConfig holds AWS Kinesis Data Streams-specific settings. +type KinesisConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // region is the AWS region for the Kinesis stream (e.g. "us-east-1"). + Region string `protobuf:"bytes,1,opt,name=region,proto3" json:"region,omitempty"` + // shard_count is the initial number of shards (throughput units). + ShardCount int32 `protobuf:"varint,2,opt,name=shard_count,json=shardCount,proto3" json:"shard_count,omitempty"` + // retention_period_hours is the message retention window (24–8760 hours). 
+ RetentionPeriodHours int32 `protobuf:"varint,3,opt,name=retention_period_hours,json=retentionPeriodHours,proto3" json:"retention_period_hours,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *KinesisConfig) Reset() { + *x = KinesisConfig{} + mi := &file_eventbus_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *KinesisConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*KinesisConfig) ProtoMessage() {} + +func (x *KinesisConfig) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use KinesisConfig.ProtoReflect.Descriptor instead. +func (*KinesisConfig) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{3} +} + +func (x *KinesisConfig) GetRegion() string { + if x != nil { + return x.Region + } + return "" +} + +func (x *KinesisConfig) GetShardCount() int32 { + if x != nil { + return x.ShardCount + } + return 0 +} + +func (x *KinesisConfig) GetRetentionPeriodHours() int32 { + if x != nil { + return x.RetentionPeriodHours + } + return 0 +} + +// ResourceLimits constrains compute and storage for cluster containers. +type ResourceLimits struct { + state protoimpl.MessageState `protogen:"open.v1"` + // cpu is the CPU limit in Kubernetes notation (e.g. "500m", "2"). + Cpu string `protobuf:"bytes,1,opt,name=cpu,proto3" json:"cpu,omitempty"` + // memory is the memory limit (e.g. "512Mi", "2Gi"). + Memory string `protobuf:"bytes,2,opt,name=memory,proto3" json:"memory,omitempty"` + // storage is the persistent-volume size (e.g. "10Gi"). 
+ Storage string `protobuf:"bytes,3,opt,name=storage,proto3" json:"storage,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ResourceLimits) Reset() { + *x = ResourceLimits{} + mi := &file_eventbus_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ResourceLimits) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ResourceLimits) ProtoMessage() {} + +func (x *ResourceLimits) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ResourceLimits.ProtoReflect.Descriptor instead. +func (*ResourceLimits) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{4} +} + +func (x *ResourceLimits) GetCpu() string { + if x != nil { + return x.Cpu + } + return "" +} + +func (x *ResourceLimits) GetMemory() string { + if x != nil { + return x.Memory + } + return "" +} + +func (x *ResourceLimits) GetStorage() string { + if x != nil { + return x.Storage + } + return "" +} + +// StreamConfig is the typed config for infra.eventbus.stream module. +type StreamConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // name is the stream name (e.g. "BMW_FULFILLMENT"). + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // subjects lists the NATS subject filter patterns bound to this stream. + Subjects []string `protobuf:"bytes,2,rep,name=subjects,proto3" json:"subjects,omitempty"` + // retention_policy controls message retention: "limits" | "interest" | "workqueue". 
+ RetentionPolicy string `protobuf:"bytes,3,opt,name=retention_policy,json=retentionPolicy,proto3" json:"retention_policy,omitempty"` + // num_replicas is the number of stream replicas within the cluster. + NumReplicas int32 `protobuf:"varint,4,opt,name=num_replicas,json=numReplicas,proto3" json:"num_replicas,omitempty"` + // max_bytes caps total on-disk storage for this stream (0 = unlimited). + MaxBytes int64 `protobuf:"varint,5,opt,name=max_bytes,json=maxBytes,proto3" json:"max_bytes,omitempty"` + // max_age is the maximum age for messages in this stream. + MaxAge *durationpb.Duration `protobuf:"bytes,6,opt,name=max_age,json=maxAge,proto3" json:"max_age,omitempty"` + // ack_wait is the default acknowledgement deadline for consumers on this stream. + AckWait *durationpb.Duration `protobuf:"bytes,7,opt,name=ack_wait,json=ackWait,proto3" json:"ack_wait,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StreamConfig) Reset() { + *x = StreamConfig{} + mi := &file_eventbus_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StreamConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamConfig) ProtoMessage() {} + +func (x *StreamConfig) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamConfig.ProtoReflect.Descriptor instead. 
+func (*StreamConfig) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{5} +} + +func (x *StreamConfig) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *StreamConfig) GetSubjects() []string { + if x != nil { + return x.Subjects + } + return nil +} + +func (x *StreamConfig) GetRetentionPolicy() string { + if x != nil { + return x.RetentionPolicy + } + return "" +} + +func (x *StreamConfig) GetNumReplicas() int32 { + if x != nil { + return x.NumReplicas + } + return 0 +} + +func (x *StreamConfig) GetMaxBytes() int64 { + if x != nil { + return x.MaxBytes + } + return 0 +} + +func (x *StreamConfig) GetMaxAge() *durationpb.Duration { + if x != nil { + return x.MaxAge + } + return nil +} + +func (x *StreamConfig) GetAckWait() *durationpb.Duration { + if x != nil { + return x.AckWait + } + return nil +} + +// ConsumerConfig is the typed config for infra.eventbus.consumer module +// and trigger.eventbus.subscribe. +type ConsumerConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // name is the durable consumer name. + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // stream_name is the stream this consumer attaches to. + StreamName string `protobuf:"bytes,2,opt,name=stream_name,json=streamName,proto3" json:"stream_name,omitempty"` + // filter_subject narrows delivery to messages matching this subject pattern. + FilterSubject string `protobuf:"bytes,3,opt,name=filter_subject,json=filterSubject,proto3" json:"filter_subject,omitempty"` + // deliver_policy controls which messages are delivered on first attach: + // "all" | "last" | "new" | "by_start_sequence" | "by_start_time". + DeliverPolicy string `protobuf:"bytes,4,opt,name=deliver_policy,json=deliverPolicy,proto3" json:"deliver_policy,omitempty"` + // ack_policy controls how acknowledgements are required: + // "explicit" | "none" | "all". 
+ AckPolicy string `protobuf:"bytes,5,opt,name=ack_policy,json=ackPolicy,proto3" json:"ack_policy,omitempty"` + // max_deliver is the maximum number of delivery attempts before NACKing (0 = unlimited). + MaxDeliver int32 `protobuf:"varint,6,opt,name=max_deliver,json=maxDeliver,proto3" json:"max_deliver,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ConsumerConfig) Reset() { + *x = ConsumerConfig{} + mi := &file_eventbus_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ConsumerConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ConsumerConfig) ProtoMessage() {} + +func (x *ConsumerConfig) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ConsumerConfig.ProtoReflect.Descriptor instead. +func (*ConsumerConfig) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{6} +} + +func (x *ConsumerConfig) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *ConsumerConfig) GetStreamName() string { + if x != nil { + return x.StreamName + } + return "" +} + +func (x *ConsumerConfig) GetFilterSubject() string { + if x != nil { + return x.FilterSubject + } + return "" +} + +func (x *ConsumerConfig) GetDeliverPolicy() string { + if x != nil { + return x.DeliverPolicy + } + return "" +} + +func (x *ConsumerConfig) GetAckPolicy() string { + if x != nil { + return x.AckPolicy + } + return "" +} + +func (x *ConsumerConfig) GetMaxDeliver() int32 { + if x != nil { + return x.MaxDeliver + } + return 0 +} + +// PublishRequest is the input for step.eventbus.publish. 
+type PublishRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // subject is the NATS subject (or Kafka topic) to publish to. + Subject string `protobuf:"bytes,1,opt,name=subject,proto3" json:"subject,omitempty"` + // payload is the raw message bytes. + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + // headers are optional key/value metadata attached to the message. + Headers map[string]string `protobuf:"bytes,3,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // correlation_id is an opaque identifier for request/reply correlation or tracing. + CorrelationId string `protobuf:"bytes,4,opt,name=correlation_id,json=correlationId,proto3" json:"correlation_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PublishRequest) Reset() { + *x = PublishRequest{} + mi := &file_eventbus_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PublishRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishRequest) ProtoMessage() {} + +func (x *PublishRequest) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishRequest.ProtoReflect.Descriptor instead. 
+func (*PublishRequest) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{7} +} + +func (x *PublishRequest) GetSubject() string { + if x != nil { + return x.Subject + } + return "" +} + +func (x *PublishRequest) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +func (x *PublishRequest) GetHeaders() map[string]string { + if x != nil { + return x.Headers + } + return nil +} + +func (x *PublishRequest) GetCorrelationId() string { + if x != nil { + return x.CorrelationId + } + return "" +} + +// PublishResponse is the output from step.eventbus.publish. +type PublishResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // sequence is the JetStream stream sequence number; empty for non-JetStream brokers. + Sequence string `protobuf:"bytes,1,opt,name=sequence,proto3" json:"sequence,omitempty"` + // acked_at is the timestamp when the broker acknowledged the message (RFC3339). + AckedAt string `protobuf:"bytes,2,opt,name=acked_at,json=ackedAt,proto3" json:"acked_at,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PublishResponse) Reset() { + *x = PublishResponse{} + mi := &file_eventbus_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PublishResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishResponse) ProtoMessage() {} + +func (x *PublishResponse) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. 
+func (*PublishResponse) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{8} +} + +func (x *PublishResponse) GetSequence() string { + if x != nil { + return x.Sequence + } + return "" +} + +func (x *PublishResponse) GetAckedAt() string { + if x != nil { + return x.AckedAt + } + return "" +} + +// ConsumeRequest is the input for step.eventbus.consume. +type ConsumeRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // consumer is the durable consumer name to pull from. + Consumer string `protobuf:"bytes,1,opt,name=consumer,proto3" json:"consumer,omitempty"` + // batch_size is the maximum number of messages to return in one call. + BatchSize int32 `protobuf:"varint,2,opt,name=batch_size,json=batchSize,proto3" json:"batch_size,omitempty"` + // max_wait is the maximum time to block waiting for messages. + MaxWait *durationpb.Duration `protobuf:"bytes,3,opt,name=max_wait,json=maxWait,proto3" json:"max_wait,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ConsumeRequest) Reset() { + *x = ConsumeRequest{} + mi := &file_eventbus_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ConsumeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ConsumeRequest) ProtoMessage() {} + +func (x *ConsumeRequest) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ConsumeRequest.ProtoReflect.Descriptor instead. 
+func (*ConsumeRequest) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{9} +} + +func (x *ConsumeRequest) GetConsumer() string { + if x != nil { + return x.Consumer + } + return "" +} + +func (x *ConsumeRequest) GetBatchSize() int32 { + if x != nil { + return x.BatchSize + } + return 0 +} + +func (x *ConsumeRequest) GetMaxWait() *durationpb.Duration { + if x != nil { + return x.MaxWait + } + return nil +} + +// ConsumeResponse is the output from step.eventbus.consume. +type ConsumeResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // messages contains the delivered messages (up to batch_size). + Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ConsumeResponse) Reset() { + *x = ConsumeResponse{} + mi := &file_eventbus_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ConsumeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ConsumeResponse) ProtoMessage() {} + +func (x *ConsumeResponse) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ConsumeResponse.ProtoReflect.Descriptor instead. +func (*ConsumeResponse) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{10} +} + +func (x *ConsumeResponse) GetMessages() []*Message { + if x != nil { + return x.Messages + } + return nil +} + +// Message represents a single delivered event-bus message. +type Message struct { + state protoimpl.MessageState `protogen:"open.v1"` + // subject is the NATS subject (or Kafka topic/partition key) the message was published on. 
+ Subject string `protobuf:"bytes,1,opt,name=subject,proto3" json:"subject,omitempty"` + // payload is the raw message bytes. + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + // headers are key/value metadata attached to the message. + Headers map[string]string `protobuf:"bytes,3,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // sequence is the stream sequence number assigned by the broker. + Sequence string `protobuf:"bytes,4,opt,name=sequence,proto3" json:"sequence,omitempty"` + // published_at is the broker-assigned publish timestamp (RFC3339). + PublishedAt string `protobuf:"bytes,5,opt,name=published_at,json=publishedAt,proto3" json:"published_at,omitempty"` + // ack_token is the opaque token used to acknowledge this message via step.eventbus.ack. + AckToken string `protobuf:"bytes,6,opt,name=ack_token,json=ackToken,proto3" json:"ack_token,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Message) Reset() { + *x = Message{} + mi := &file_eventbus_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[11] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. 
+func (*Message) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{11} +} + +func (x *Message) GetSubject() string { + if x != nil { + return x.Subject + } + return "" +} + +func (x *Message) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +func (x *Message) GetHeaders() map[string]string { + if x != nil { + return x.Headers + } + return nil +} + +func (x *Message) GetSequence() string { + if x != nil { + return x.Sequence + } + return "" +} + +func (x *Message) GetPublishedAt() string { + if x != nil { + return x.PublishedAt + } + return "" +} + +func (x *Message) GetAckToken() string { + if x != nil { + return x.AckToken + } + return "" +} + +// AckRequest is the input for step.eventbus.ack. +type AckRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // ack_token is the opaque acknowledgement token from Message.ack_token. + AckToken string `protobuf:"bytes,1,opt,name=ack_token,json=ackToken,proto3" json:"ack_token,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AckRequest) Reset() { + *x = AckRequest{} + mi := &file_eventbus_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AckRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AckRequest) ProtoMessage() {} + +func (x *AckRequest) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[12] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AckRequest.ProtoReflect.Descriptor instead. 
+func (*AckRequest) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{12} +} + +func (x *AckRequest) GetAckToken() string { + if x != nil { + return x.AckToken + } + return "" +} + +// AckResponse is the output from step.eventbus.ack. +type AckResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AckResponse) Reset() { + *x = AckResponse{} + mi := &file_eventbus_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AckResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AckResponse) ProtoMessage() {} + +func (x *AckResponse) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[13] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AckResponse.ProtoReflect.Descriptor instead. 
+func (*AckResponse) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{13} +} + +var File_eventbus_proto protoreflect.FileDescriptor + +const file_eventbus_proto_rawDesc = "" + + "\n" + + "\x0eeventbus.proto\x12\x1bworkflow.plugin.eventbus.v1\x1a\x1egoogle/protobuf/duration.proto\"\x9d\x03\n" + + "\rClusterConfig\x12\x1a\n" + + "\bprovider\x18\x01 \x01(\tR\bprovider\x12#\n" + + "\rdeploy_target\x18\x02 \x01(\tR\fdeployTarget\x12\x18\n" + + "\aversion\x18\x03 \x01(\tR\aversion\x12\x1a\n" + + "\breplicas\x18\x04 \x01(\x05R\breplicas\x12J\n" + + "\tjetstream\x18\x05 \x01(\v2,.workflow.plugin.eventbus.v1.JetStreamConfigR\tjetstream\x12>\n" + + "\x05kafka\x18\x06 \x01(\v2(.workflow.plugin.eventbus.v1.KafkaConfigR\x05kafka\x12D\n" + + "\akinesis\x18\a \x01(\v2*.workflow.plugin.eventbus.v1.KinesisConfigR\akinesis\x12C\n" + + "\x06limits\x18\b \x01(\v2+.workflow.plugin.eventbus.v1.ResourceLimitsR\x06limits\"\xb5\x01\n" + + "\x0fJetStreamConfig\x12\x18\n" + + "\aenabled\x18\x01 \x01(\bR\aenabled\x12*\n" + + "\x11max_storage_bytes\x18\x02 \x01(\x03R\x0fmaxStorageBytes\x12(\n" + + "\x10max_memory_bytes\x18\x03 \x01(\x03R\x0emaxMemoryBytes\x122\n" + + "\amax_age\x18\x04 \x01(\v2\x19.google.protobuf.DurationR\x06maxAge\"\xfc\x01\n" + + "\vKafkaConfig\x12+\n" + + "\x11bootstrap_servers\x18\x01 \x01(\tR\x10bootstrapServers\x12+\n" + + "\x11security_protocol\x18\x02 \x01(\tR\x10securityProtocol\x12%\n" + + "\x0esasl_mechanism\x18\x03 \x01(\tR\rsaslMechanism\x12<\n" + + "\x1adefault_replication_factor\x18\x04 \x01(\x05R\x18defaultReplicationFactor\x12.\n" + + "\x13min_insync_replicas\x18\x05 \x01(\x05R\x11minInsyncReplicas\"~\n" + + "\rKinesisConfig\x12\x16\n" + + "\x06region\x18\x01 \x01(\tR\x06region\x12\x1f\n" + + "\vshard_count\x18\x02 \x01(\x05R\n" + + "shardCount\x124\n" + + "\x16retention_period_hours\x18\x03 \x01(\x05R\x14retentionPeriodHours\"T\n" + + "\x0eResourceLimits\x12\x10\n" + + "\x03cpu\x18\x01 \x01(\tR\x03cpu\x12\x16\n" + + 
"\x06memory\x18\x02 \x01(\tR\x06memory\x12\x18\n" + + "\astorage\x18\x03 \x01(\tR\astorage\"\x93\x02\n" + + "\fStreamConfig\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\x12\x1a\n" + + "\bsubjects\x18\x02 \x03(\tR\bsubjects\x12)\n" + + "\x10retention_policy\x18\x03 \x01(\tR\x0fretentionPolicy\x12!\n" + + "\fnum_replicas\x18\x04 \x01(\x05R\vnumReplicas\x12\x1b\n" + + "\tmax_bytes\x18\x05 \x01(\x03R\bmaxBytes\x122\n" + + "\amax_age\x18\x06 \x01(\v2\x19.google.protobuf.DurationR\x06maxAge\x124\n" + + "\back_wait\x18\a \x01(\v2\x19.google.protobuf.DurationR\aackWait\"\xd3\x01\n" + + "\x0eConsumerConfig\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\x12\x1f\n" + + "\vstream_name\x18\x02 \x01(\tR\n" + + "streamName\x12%\n" + + "\x0efilter_subject\x18\x03 \x01(\tR\rfilterSubject\x12%\n" + + "\x0edeliver_policy\x18\x04 \x01(\tR\rdeliverPolicy\x12\x1d\n" + + "\n" + + "ack_policy\x18\x05 \x01(\tR\tackPolicy\x12\x1f\n" + + "\vmax_deliver\x18\x06 \x01(\x05R\n" + + "maxDeliver\"\xfb\x01\n" + + "\x0ePublishRequest\x12\x18\n" + + "\asubject\x18\x01 \x01(\tR\asubject\x12\x18\n" + + "\apayload\x18\x02 \x01(\fR\apayload\x12R\n" + + "\aheaders\x18\x03 \x03(\v28.workflow.plugin.eventbus.v1.PublishRequest.HeadersEntryR\aheaders\x12%\n" + + "\x0ecorrelation_id\x18\x04 \x01(\tR\rcorrelationId\x1a:\n" + + "\fHeadersEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"H\n" + + "\x0fPublishResponse\x12\x1a\n" + + "\bsequence\x18\x01 \x01(\tR\bsequence\x12\x19\n" + + "\backed_at\x18\x02 \x01(\tR\aackedAt\"\x81\x01\n" + + "\x0eConsumeRequest\x12\x1a\n" + + "\bconsumer\x18\x01 \x01(\tR\bconsumer\x12\x1d\n" + + "\n" + + "batch_size\x18\x02 \x01(\x05R\tbatchSize\x124\n" + + "\bmax_wait\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\amaxWait\"S\n" + + "\x0fConsumeResponse\x12@\n" + + "\bmessages\x18\x01 \x03(\v2$.workflow.plugin.eventbus.v1.MessageR\bmessages\"\xa2\x02\n" + + "\aMessage\x12\x18\n" + + "\asubject\x18\x01 
\x01(\tR\asubject\x12\x18\n" + + "\apayload\x18\x02 \x01(\fR\apayload\x12K\n" + + "\aheaders\x18\x03 \x03(\v21.workflow.plugin.eventbus.v1.Message.HeadersEntryR\aheaders\x12\x1a\n" + + "\bsequence\x18\x04 \x01(\tR\bsequence\x12!\n" + + "\fpublished_at\x18\x05 \x01(\tR\vpublishedAt\x12\x1b\n" + + "\tack_token\x18\x06 \x01(\tR\backToken\x1a:\n" + + "\fHeadersEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\")\n" + + "\n" + + "AckRequest\x12\x1b\n" + + "\tack_token\x18\x01 \x01(\tR\backToken\"\r\n" + + "\vAckResponseB@Z>github.com/GoCodeAlone/workflow-plugin-eventbus/gen;eventbusv1b\x06proto3" + +var ( + file_eventbus_proto_rawDescOnce sync.Once + file_eventbus_proto_rawDescData []byte +) + +func file_eventbus_proto_rawDescGZIP() []byte { + file_eventbus_proto_rawDescOnce.Do(func() { + file_eventbus_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_eventbus_proto_rawDesc), len(file_eventbus_proto_rawDesc))) + }) + return file_eventbus_proto_rawDescData +} + +var file_eventbus_proto_msgTypes = make([]protoimpl.MessageInfo, 16) +var file_eventbus_proto_goTypes = []any{ + (*ClusterConfig)(nil), // 0: workflow.plugin.eventbus.v1.ClusterConfig + (*JetStreamConfig)(nil), // 1: workflow.plugin.eventbus.v1.JetStreamConfig + (*KafkaConfig)(nil), // 2: workflow.plugin.eventbus.v1.KafkaConfig + (*KinesisConfig)(nil), // 3: workflow.plugin.eventbus.v1.KinesisConfig + (*ResourceLimits)(nil), // 4: workflow.plugin.eventbus.v1.ResourceLimits + (*StreamConfig)(nil), // 5: workflow.plugin.eventbus.v1.StreamConfig + (*ConsumerConfig)(nil), // 6: workflow.plugin.eventbus.v1.ConsumerConfig + (*PublishRequest)(nil), // 7: workflow.plugin.eventbus.v1.PublishRequest + (*PublishResponse)(nil), // 8: workflow.plugin.eventbus.v1.PublishResponse + (*ConsumeRequest)(nil), // 9: workflow.plugin.eventbus.v1.ConsumeRequest + (*ConsumeResponse)(nil), // 10: workflow.plugin.eventbus.v1.ConsumeResponse + 
(*Message)(nil), // 11: workflow.plugin.eventbus.v1.Message + (*AckRequest)(nil), // 12: workflow.plugin.eventbus.v1.AckRequest + (*AckResponse)(nil), // 13: workflow.plugin.eventbus.v1.AckResponse + nil, // 14: workflow.plugin.eventbus.v1.PublishRequest.HeadersEntry + nil, // 15: workflow.plugin.eventbus.v1.Message.HeadersEntry + (*durationpb.Duration)(nil), // 16: google.protobuf.Duration +} +var file_eventbus_proto_depIdxs = []int32{ + 1, // 0: workflow.plugin.eventbus.v1.ClusterConfig.jetstream:type_name -> workflow.plugin.eventbus.v1.JetStreamConfig + 2, // 1: workflow.plugin.eventbus.v1.ClusterConfig.kafka:type_name -> workflow.plugin.eventbus.v1.KafkaConfig + 3, // 2: workflow.plugin.eventbus.v1.ClusterConfig.kinesis:type_name -> workflow.plugin.eventbus.v1.KinesisConfig + 4, // 3: workflow.plugin.eventbus.v1.ClusterConfig.limits:type_name -> workflow.plugin.eventbus.v1.ResourceLimits + 16, // 4: workflow.plugin.eventbus.v1.JetStreamConfig.max_age:type_name -> google.protobuf.Duration + 16, // 5: workflow.plugin.eventbus.v1.StreamConfig.max_age:type_name -> google.protobuf.Duration + 16, // 6: workflow.plugin.eventbus.v1.StreamConfig.ack_wait:type_name -> google.protobuf.Duration + 14, // 7: workflow.plugin.eventbus.v1.PublishRequest.headers:type_name -> workflow.plugin.eventbus.v1.PublishRequest.HeadersEntry + 16, // 8: workflow.plugin.eventbus.v1.ConsumeRequest.max_wait:type_name -> google.protobuf.Duration + 11, // 9: workflow.plugin.eventbus.v1.ConsumeResponse.messages:type_name -> workflow.plugin.eventbus.v1.Message + 15, // 10: workflow.plugin.eventbus.v1.Message.headers:type_name -> workflow.plugin.eventbus.v1.Message.HeadersEntry + 11, // [11:11] is the sub-list for method output_type + 11, // [11:11] is the sub-list for method input_type + 11, // [11:11] is the sub-list for extension type_name + 11, // [11:11] is the sub-list for extension extendee + 0, // [0:11] is the sub-list for field type_name +} + +func init() { file_eventbus_proto_init() } 
+func file_eventbus_proto_init() { + if File_eventbus_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_eventbus_proto_rawDesc), len(file_eventbus_proto_rawDesc)), + NumEnums: 0, + NumMessages: 16, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_eventbus_proto_goTypes, + DependencyIndexes: file_eventbus_proto_depIdxs, + MessageInfos: file_eventbus_proto_msgTypes, + }.Build() + File_eventbus_proto = out.File + file_eventbus_proto_goTypes = nil + file_eventbus_proto_depIdxs = nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..49ca598 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module github.com/GoCodeAlone/workflow-plugin-eventbus + +go 1.26.0 + +require google.golang.org/protobuf v1.36.11 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..296be18 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= diff --git a/plugin.contracts.json b/plugin.contracts.json new file mode 100644 index 0000000..45bc35e --- /dev/null +++ b/plugin.contracts.json @@ -0,0 +1,51 @@ +{ + "version": "1", + "contracts": [ + { + "kind": "module", + "type": "infra.eventbus", + "mode": "strict_proto", + "config": "workflow.plugin.eventbus.v1.ClusterConfig" + }, + { + "kind": "module", + "type": "infra.eventbus.stream", + "mode": "strict_proto", + "config": "workflow.plugin.eventbus.v1.StreamConfig" + }, + { + "kind": "module", + "type": "infra.eventbus.consumer", + "mode": "strict_proto", + "config": "workflow.plugin.eventbus.v1.ConsumerConfig" + }, + { + "kind": 
"step", + "type": "step.eventbus.publish", + "mode": "strict_proto", + "input": "workflow.plugin.eventbus.v1.PublishRequest", + "output": "workflow.plugin.eventbus.v1.PublishResponse" + }, + { + "kind": "step", + "type": "step.eventbus.consume", + "mode": "strict_proto", + "input": "workflow.plugin.eventbus.v1.ConsumeRequest", + "output": "workflow.plugin.eventbus.v1.ConsumeResponse" + }, + { + "kind": "step", + "type": "step.eventbus.ack", + "mode": "strict_proto", + "input": "workflow.plugin.eventbus.v1.AckRequest", + "output": "workflow.plugin.eventbus.v1.AckResponse" + }, + { + "kind": "trigger", + "type": "trigger.eventbus.subscribe", + "mode": "strict_proto", + "config": "workflow.plugin.eventbus.v1.ConsumerConfig", + "output": "workflow.plugin.eventbus.v1.Message" + } + ] +} diff --git a/plugin.json b/plugin.json new file mode 100644 index 0000000..9024ba8 --- /dev/null +++ b/plugin.json @@ -0,0 +1,58 @@ +{ + "name": "workflow-plugin-eventbus", + "version": "0.1.0", + "description": "Provisions durable event-bus clusters (NATS / Kafka / Kinesis) as IaC and exposes typed pipeline steps for publish / consume operations.", + "author": "GoCodeAlone", + "license": "MIT", + "type": "external", + "tier": "community", + "private": false, + "minEngineVersion": "0.20.0", + "keywords": ["eventbus", "nats", "kafka", "kinesis", "iac", "infra", "jetstream"], + "homepage": "https://github.com/GoCodeAlone/workflow-plugin-eventbus", + "repository": "https://github.com/GoCodeAlone/workflow-plugin-eventbus", + "capabilities": { + "configProvider": false, + "moduleTypes": [ + "infra.eventbus", + "infra.eventbus.stream", + "infra.eventbus.consumer" + ], + "stepTypes": [ + "step.eventbus.publish", + "step.eventbus.consume", + "step.eventbus.ack" + ], + "triggerTypes": [ + "trigger.eventbus.subscribe" + ] + }, + "contracts": [], + "downloads": [ + { + "os": "linux", + "arch": "amd64", + "url": 
"https://github.com/GoCodeAlone/workflow-plugin-eventbus/releases/download/v0.1.0/workflow-plugin-eventbus-linux-amd64.tar.gz" + }, + { + "os": "linux", + "arch": "arm64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-eventbus/releases/download/v0.1.0/workflow-plugin-eventbus-linux-arm64.tar.gz" + }, + { + "os": "darwin", + "arch": "amd64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-eventbus/releases/download/v0.1.0/workflow-plugin-eventbus-darwin-amd64.tar.gz" + }, + { + "os": "darwin", + "arch": "arm64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-eventbus/releases/download/v0.1.0/workflow-plugin-eventbus-darwin-arm64.tar.gz" + }, + { + "os": "windows", + "arch": "amd64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-eventbus/releases/download/v0.1.0/workflow-plugin-eventbus-windows-amd64.tar.gz" + } + ] +} diff --git a/proto/eventbus.proto b/proto/eventbus.proto new file mode 100644 index 0000000..567e6e3 --- /dev/null +++ b/proto/eventbus.proto @@ -0,0 +1,178 @@ +syntax = "proto3"; +package workflow.plugin.eventbus.v1; + +option go_package = "github.com/GoCodeAlone/workflow-plugin-eventbus/gen;eventbusv1"; + +import "google/protobuf/duration.proto"; + +// ─── cluster / module configs ───────────────────────────────────────────────── + +// ClusterConfig is the typed config for infra.eventbus module. +// provider and deploy_target select the Provider × DeployTarget combination. +message ClusterConfig { + // provider selects the message-broker backend: "nats" | "kafka" | "kinesis". + string provider = 1; + // deploy_target selects the deployment platform: + // "digitalocean.app_platform" | "aws.ecs" | "aws.eks" | "kubernetes". + string deploy_target = 2; + // version is the broker version to deploy (e.g. "2.10" for NATS). + string version = 3; + // replicas is the number of broker replicas. + int32 replicas = 4; + // jetstream holds NATS JetStream-specific configuration; ignored for other providers. 
+ JetStreamConfig jetstream = 5; + // kafka holds Kafka-specific cluster configuration; ignored for other providers. + KafkaConfig kafka = 6; + // kinesis holds Kinesis-specific configuration; ignored for other providers. + KinesisConfig kinesis = 7; + // limits sets compute/storage resource limits for the cluster containers. + ResourceLimits limits = 8; +} + +// JetStreamConfig holds NATS JetStream-specific settings. +message JetStreamConfig { + // enabled activates JetStream on the NATS cluster (required for durability). + bool enabled = 1; + // max_storage_bytes caps total on-disk storage for all streams (0 = unlimited). + int64 max_storage_bytes = 2; + // max_memory_bytes caps in-memory storage (0 = unlimited). + int64 max_memory_bytes = 3; + // max_age is the default maximum age for messages across all streams. + google.protobuf.Duration max_age = 4; +} + +// KafkaConfig holds Kafka-specific cluster settings. +message KafkaConfig { + // bootstrap_servers is the initial broker address list (comma-separated). + // Used when connecting to an existing Kafka cluster rather than provisioning one. + string bootstrap_servers = 1; + // security_protocol specifies the protocol for client-broker communication: + // "PLAINTEXT" | "SSL" | "SASL_PLAINTEXT" | "SASL_SSL". + string security_protocol = 2; + // sasl_mechanism for SASL authentication: "PLAIN" | "SCRAM-SHA-256" | "SCRAM-SHA-512". + string sasl_mechanism = 3; + // default_replication_factor for new topics (0 = use broker default). + int32 default_replication_factor = 4; + // min_insync_replicas is the minimum number of in-sync replicas required for acks. + int32 min_insync_replicas = 5; +} + +// KinesisConfig holds AWS Kinesis Data Streams-specific settings. +message KinesisConfig { + // region is the AWS region for the Kinesis stream (e.g. "us-east-1"). + string region = 1; + // shard_count is the initial number of shards (throughput units). 
+ int32 shard_count = 2; + // retention_period_hours is the message retention window (24–8760 hours). + int32 retention_period_hours = 3; +} + +// ResourceLimits constrains compute and storage for cluster containers. +message ResourceLimits { + // cpu is the CPU limit in Kubernetes notation (e.g. "500m", "2"). + string cpu = 1; + // memory is the memory limit (e.g. "512Mi", "2Gi"). + string memory = 2; + // storage is the persistent-volume size (e.g. "10Gi"). + string storage = 3; +} + +// StreamConfig is the typed config for infra.eventbus.stream module. +message StreamConfig { + // name is the stream name (e.g. "BMW_FULFILLMENT"). + string name = 1; + // subjects lists the NATS subject filter patterns bound to this stream. + repeated string subjects = 2; + // retention_policy controls message retention: "limits" | "interest" | "workqueue". + string retention_policy = 3; + // num_replicas is the number of stream replicas within the cluster. + int32 num_replicas = 4; + // max_bytes caps total on-disk storage for this stream (0 = unlimited). + int64 max_bytes = 5; + // max_age is the maximum age for messages in this stream. + google.protobuf.Duration max_age = 6; + // ack_wait is the default acknowledgement deadline for consumers on this stream. + google.protobuf.Duration ack_wait = 7; +} + +// ConsumerConfig is the typed config for infra.eventbus.consumer module +// and trigger.eventbus.subscribe. +message ConsumerConfig { + // name is the durable consumer name. + string name = 1; + // stream_name is the stream this consumer attaches to. + string stream_name = 2; + // filter_subject narrows delivery to messages matching this subject pattern. + string filter_subject = 3; + // deliver_policy controls which messages are delivered on first attach: + // "all" | "last" | "new" | "by_start_sequence" | "by_start_time". + string deliver_policy = 4; + // ack_policy controls how acknowledgements are required: + // "explicit" | "none" | "all". 
+ string ack_policy = 5; + // max_deliver is the maximum number of delivery attempts before NACKing (0 = unlimited). + int32 max_deliver = 6; +} + +// ─── step inputs / outputs ──────────────────────────────────────────────────── + +// PublishRequest is the input for step.eventbus.publish. +message PublishRequest { + // subject is the NATS subject (or Kafka topic) to publish to. + string subject = 1; + // payload is the raw message bytes. + bytes payload = 2; + // headers are optional key/value metadata attached to the message. + map<string, string> headers = 3; + // correlation_id is an opaque identifier for request/reply correlation or tracing. + string correlation_id = 4; +} + +// PublishResponse is the output from step.eventbus.publish. +message PublishResponse { + // sequence is the JetStream stream sequence number; empty for non-JetStream brokers. + string sequence = 1; + // acked_at is the timestamp when the broker acknowledged the message (RFC3339). + string acked_at = 2; +} + +// ConsumeRequest is the input for step.eventbus.consume. +message ConsumeRequest { + // consumer is the durable consumer name to pull from. + string consumer = 1; + // batch_size is the maximum number of messages to return in one call. + int32 batch_size = 2; + // max_wait is the maximum time to block waiting for messages. + google.protobuf.Duration max_wait = 3; +} + +// ConsumeResponse is the output from step.eventbus.consume. +message ConsumeResponse { + // messages contains the delivered messages (up to batch_size). + repeated Message messages = 1; +} + +// Message represents a single delivered event-bus message. +message Message { + // subject is the NATS subject (or Kafka topic/partition key) the message was published on. + string subject = 1; + // payload is the raw message bytes. + bytes payload = 2; + // headers are key/value metadata attached to the message. + map<string, string> headers = 3; + // sequence is the stream sequence number assigned by the broker. 
+ string sequence = 4; + // published_at is the broker-assigned publish timestamp (RFC3339). + string published_at = 5; + // ack_token is the opaque token used to acknowledge this message via step.eventbus.ack. + string ack_token = 6; +} + +// AckRequest is the input for step.eventbus.ack. +message AckRequest { + // ack_token is the opaque acknowledgement token from Message.ack_token. + string ack_token = 1; +} + +// AckResponse is the output from step.eventbus.ack. +message AckResponse {} From 1990d3628fac87dfc9bcfcf0e1bdd54d47cdf065 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sun, 3 May 2026 17:37:23 -0400 Subject: [PATCH 2/8] feat: add Provider interface + DeployTarget compatibility matrix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - iac/iac.go: typed Resource + State structs (zero map[string]any); State.Output() helper - providers/target.go: DeployTarget enum (8 constants) + ValidateProviderTarget matrix: nats → {DO App, AWS ECS/EKS, k8s, self-hosted} kafka → {DO Managed Kafka, AWS MSK, k8s, self-hosted} kinesis → {AWS Kinesis only} unknown provider → error - providers/provider.go: Provider interface (Name/Resources/ConnectionString/StreamResources/HealthCheck) - providers/provider_test.go: table-driven TDD covering all 25 provider×target combos + unknown provider; regression invariant proven (revert→FAIL, restore→PASS) go build ./...: exit 0 | go test ./...: PASS (25 subtests) Co-Authored-By: Claude Sonnet 4.6 --- iac/iac.go | 32 ++++++++++++++++ providers/provider.go | 48 ++++++++++++++++++++++++ providers/provider_test.go | 76 ++++++++++++++++++++++++++++++++++++++ providers/target.go | 61 ++++++++++++++++++++++++++++++ providers/target.go.bak | 61 ++++++++++++++++++++++++++++++ 5 files changed, 278 insertions(+) create mode 100644 iac/iac.go create mode 100644 providers/provider.go create mode 100644 providers/provider_test.go create mode 100644 providers/target.go create mode 100644 
providers/target.go.bak diff --git a/iac/iac.go b/iac/iac.go new file mode 100644 index 0000000..98ec406 --- /dev/null +++ b/iac/iac.go @@ -0,0 +1,32 @@ +// Package iac defines typed IaC primitives emitted by eventbus Providers. +// These are consumed by downstream IaC provider plugins +// (workflow-plugin-digitalocean, workflow-plugin-aws, etc.) which realize +// them against the target cloud. Zero map[string]any — all fields are typed. +package iac + +// Resource is a typed IaC resource declaration. +// The Kind field identifies which IaC provider plugin realizes this resource. +type Resource struct { + // Kind is the resource type, e.g. "infra.app", "infra.ecs.task_definition", + // "infra.k8s.statefulset", "infra.aws.kinesis_stream". + Kind string + // Name is the resource instance name, unique within the deployment environment. + Name string + // Labels are metadata key/value pairs attached to the resource (e.g. "env": "prod"). + Labels map[string]string +} + +// State holds resolved outputs from previously provisioned resources. +// All values are typed strings (connection strings, ARNs, cluster IDs, etc.). +// Sensitive outputs (connection strings with credentials) are marked as such +// by convention: keys ending in "_uri" or "_dsn" are treated as sensitive. +type State struct { + // Outputs maps output key to value, e.g. "uri" → "nats://host:4222". + Outputs map[string]string +} + +// Output returns the state output for key, and whether it was present. +func (s State) Output(key string) (string, bool) { + v, ok := s.Outputs[key] + return v, ok +} diff --git a/providers/provider.go b/providers/provider.go new file mode 100644 index 0000000..6232a8d --- /dev/null +++ b/providers/provider.go @@ -0,0 +1,48 @@ +// Package providers defines the Provider interface and DeployTarget compatibility +// matrix for workflow-plugin-eventbus. 
Each provider (nats, kafka, kinesis) +// implements Provider and emits typed IaC resource declarations for its supported +// deploy targets. +package providers + +import ( + "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" +) + +// HealthCheck represents the result of a liveness probe against an event-bus URI. +type HealthCheck struct { + // URI is the address that was probed. + URI string + // Status is the health state: "healthy", "degraded", or "unreachable". + Status string + // Error is non-nil when Status is "degraded" or "unreachable". + Error error +} + +// Provider is the interface all event-bus provider adapters must implement. +// Each provider translates a ClusterConfig + DeployTarget into a set of typed +// IaC resource declarations without directly calling cloud APIs. +// +// Implementations live at providers/{nats,kafka,kinesis}/. +type Provider interface { + // Name returns the provider identifier: "nats", "kafka", or "kinesis". + Name() string + + // Resources returns the IaC resource declarations required to provision + // the event-bus cluster described by cfg on the given deploy target. + // Returns an error if the provider × target combination is unsupported + // or if cfg is invalid for this provider. + Resources(cfg eventbusv1.ClusterConfig, target DeployTarget) ([]iac.Resource, error) + + // ConnectionString derives the broker connection string from provisioned state. + // env selects environment-specific outputs (e.g. "prod", "staging"). + ConnectionString(state iac.State, env string) (string, error) + + // StreamResources returns the IaC resource declarations required to + // declare the given streams against an already-provisioned cluster + // (represented by state). + StreamResources(streams []eventbusv1.StreamConfig, state iac.State) ([]iac.Resource, error) + + // HealthCheck probes the event-bus cluster at uri and returns its health state. 
+ HealthCheck(uri string) HealthCheck +} diff --git a/providers/provider_test.go b/providers/provider_test.go new file mode 100644 index 0000000..4b6106d --- /dev/null +++ b/providers/provider_test.go @@ -0,0 +1,76 @@ +package providers_test + +import ( + "testing" + + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" +) + +// TestValidateProviderTarget_CompatibilityMatrix asserts the full provider×target +// compatibility matrix. Every supported combo must return nil; every unsupported +// combo must return a non-nil error. +// +// IMPORTANT: add a new row whenever a new DeployTarget or provider is added — +// this test gates the entire matrix, not just individual combos. +func TestValidateProviderTarget_CompatibilityMatrix(t *testing.T) { + type combo struct { + provider string + target providers.DeployTarget + wantErr bool + } + cases := []combo{ + // ── nats: supported targets ──────────────────────────────────────────── + {"nats", providers.TargetDigitalOceanApp, false}, + {"nats", providers.TargetAWSECS, false}, + {"nats", providers.TargetAWSEKS, false}, + {"nats", providers.TargetKubernetes, false}, + {"nats", providers.TargetSelfHosted, false}, + // ── nats: unsupported targets ────────────────────────────────────────── + {"nats", providers.TargetDigitalOceanManagedKafka, true}, + {"nats", providers.TargetAWSManagedKafka, true}, + {"nats", providers.TargetAWSKinesis, true}, + + // ── kafka: supported targets ─────────────────────────────────────────── + {"kafka", providers.TargetDigitalOceanManagedKafka, false}, + {"kafka", providers.TargetAWSManagedKafka, false}, + {"kafka", providers.TargetKubernetes, false}, + {"kafka", providers.TargetSelfHosted, false}, + // ── kafka: unsupported targets ───────────────────────────────────────── + {"kafka", providers.TargetDigitalOceanApp, true}, + {"kafka", providers.TargetAWSECS, true}, + {"kafka", providers.TargetAWSEKS, true}, + {"kafka", providers.TargetAWSKinesis, true}, + + // ── kinesis: supported 
targets ───────────────────────────────────────── + {"kinesis", providers.TargetAWSKinesis, false}, + // ── kinesis: unsupported targets ─────────────────────────────────────── + {"kinesis", providers.TargetDigitalOceanApp, true}, // key case per task spec + {"kinesis", providers.TargetDigitalOceanManagedKafka, true}, + {"kinesis", providers.TargetAWSECS, true}, + {"kinesis", providers.TargetAWSEKS, true}, + {"kinesis", providers.TargetAWSManagedKafka, true}, + {"kinesis", providers.TargetKubernetes, true}, + {"kinesis", providers.TargetSelfHosted, true}, + } + + for _, tc := range cases { + t.Run(tc.provider+"×"+string(tc.target), func(t *testing.T) { + err := providers.ValidateProviderTarget(tc.provider, tc.target) + if tc.wantErr && err == nil { + t.Errorf("expected validation error for %s × %s, got nil", tc.provider, tc.target) + } + if !tc.wantErr && err != nil { + t.Errorf("unexpected error for %s × %s: %v", tc.provider, tc.target, err) + } + }) + } +} + +// TestValidateProviderTarget_UnknownProvider asserts that an unrecognised +// provider name returns an error regardless of target. +func TestValidateProviderTarget_UnknownProvider(t *testing.T) { + err := providers.ValidateProviderTarget("rabbitmq", providers.TargetDigitalOceanApp) + if err == nil { + t.Error("expected error for unknown provider 'rabbitmq', got nil") + } +} diff --git a/providers/target.go b/providers/target.go new file mode 100644 index 0000000..6e600b9 --- /dev/null +++ b/providers/target.go @@ -0,0 +1,61 @@ +package providers + +import "fmt" + +// DeployTarget identifies the deployment platform for an event-bus cluster. +type DeployTarget string + +const ( + // TargetDigitalOceanApp deploys to DigitalOcean App Platform. + TargetDigitalOceanApp DeployTarget = "digitalocean.app_platform" + // TargetDigitalOceanManagedKafka deploys to DigitalOcean Managed Kafka. 
+ TargetDigitalOceanManagedKafka DeployTarget = "digitalocean.managed_kafka" + // TargetAWSECS deploys to AWS ECS (Elastic Container Service). + TargetAWSECS DeployTarget = "aws.ecs" + // TargetAWSEKS deploys to AWS EKS (Elastic Kubernetes Service). + TargetAWSEKS DeployTarget = "aws.eks" + // TargetAWSManagedKafka deploys to AWS MSK (Managed Streaming for Apache Kafka). + TargetAWSManagedKafka DeployTarget = "aws.msk" + // TargetAWSKinesis deploys to AWS Kinesis Data Streams. + TargetAWSKinesis DeployTarget = "aws.kinesis" + // TargetKubernetes deploys to a generic Kubernetes cluster. + TargetKubernetes DeployTarget = "kubernetes" + // TargetSelfHosted deploys to a self-managed host (Docker, bare metal). + TargetSelfHosted DeployTarget = "self_hosted" +) + +// supportedTargets encodes the provider × DeployTarget compatibility matrix. +// Each provider maps to the set of deploy targets it supports. +// Combos absent from this map are rejected at config-load time. +var supportedTargets = map[string]map[DeployTarget]bool{ + "nats": { + TargetDigitalOceanApp: true, + TargetAWSECS: true, + TargetAWSEKS: true, + TargetKubernetes: true, + TargetSelfHosted: true, + }, + "kafka": { + TargetDigitalOceanManagedKafka: true, + TargetAWSManagedKafka: true, + TargetKubernetes: true, + TargetSelfHosted: true, + }, + "kinesis": { + TargetAWSKinesis: true, + }, +} + +// ValidateProviderTarget returns an error if the provider × target combination +// is unsupported. It is called at config-load time so misconfigured deployments +// are rejected before any IaC resources are emitted. 
+func ValidateProviderTarget(provider string, target DeployTarget) error { + targets, ok := supportedTargets[provider] + if !ok { + return fmt.Errorf("eventbus: unknown provider %q — supported providers are: nats, kafka, kinesis", provider) + } + if !targets[target] { + return fmt.Errorf("eventbus: provider %q does not support deploy target %q", provider, target) + } + return nil +} diff --git a/providers/target.go.bak b/providers/target.go.bak new file mode 100644 index 0000000..6e600b9 --- /dev/null +++ b/providers/target.go.bak @@ -0,0 +1,61 @@ +package providers + +import "fmt" + +// DeployTarget identifies the deployment platform for an event-bus cluster. +type DeployTarget string + +const ( + // TargetDigitalOceanApp deploys to DigitalOcean App Platform. + TargetDigitalOceanApp DeployTarget = "digitalocean.app_platform" + // TargetDigitalOceanManagedKafka deploys to DigitalOcean Managed Kafka. + TargetDigitalOceanManagedKafka DeployTarget = "digitalocean.managed_kafka" + // TargetAWSECS deploys to AWS ECS (Elastic Container Service). + TargetAWSECS DeployTarget = "aws.ecs" + // TargetAWSEKS deploys to AWS EKS (Elastic Kubernetes Service). + TargetAWSEKS DeployTarget = "aws.eks" + // TargetAWSManagedKafka deploys to AWS MSK (Managed Streaming for Apache Kafka). + TargetAWSManagedKafka DeployTarget = "aws.msk" + // TargetAWSKinesis deploys to AWS Kinesis Data Streams. + TargetAWSKinesis DeployTarget = "aws.kinesis" + // TargetKubernetes deploys to a generic Kubernetes cluster. + TargetKubernetes DeployTarget = "kubernetes" + // TargetSelfHosted deploys to a self-managed host (Docker, bare metal). + TargetSelfHosted DeployTarget = "self_hosted" +) + +// supportedTargets encodes the provider × DeployTarget compatibility matrix. +// Each provider maps to the set of deploy targets it supports. +// Combos absent from this map are rejected at config-load time. 
+var supportedTargets = map[string]map[DeployTarget]bool{ + "nats": { + TargetDigitalOceanApp: true, + TargetAWSECS: true, + TargetAWSEKS: true, + TargetKubernetes: true, + TargetSelfHosted: true, + }, + "kafka": { + TargetDigitalOceanManagedKafka: true, + TargetAWSManagedKafka: true, + TargetKubernetes: true, + TargetSelfHosted: true, + }, + "kinesis": { + TargetAWSKinesis: true, + }, +} + +// ValidateProviderTarget returns an error if the provider × target combination +// is unsupported. It is called at config-load time so misconfigured deployments +// are rejected before any IaC resources are emitted. +func ValidateProviderTarget(provider string, target DeployTarget) error { + targets, ok := supportedTargets[provider] + if !ok { + return fmt.Errorf("eventbus: unknown provider %q — supported providers are: nats, kafka, kinesis", provider) + } + if !targets[target] { + return fmt.Errorf("eventbus: provider %q does not support deploy target %q", provider, target) + } + return nil +} From 82df9af7ac91569a7515fe9d14b761ef27b99d7f Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sun, 3 May 2026 17:37:26 -0400 Subject: [PATCH 3/8] chore: remove sed backup file Co-Authored-By: Claude Sonnet 4.6 --- providers/target.go.bak | 61 ----------------------------------------- 1 file changed, 61 deletions(-) delete mode 100644 providers/target.go.bak diff --git a/providers/target.go.bak b/providers/target.go.bak deleted file mode 100644 index 6e600b9..0000000 --- a/providers/target.go.bak +++ /dev/null @@ -1,61 +0,0 @@ -package providers - -import "fmt" - -// DeployTarget identifies the deployment platform for an event-bus cluster. -type DeployTarget string - -const ( - // TargetDigitalOceanApp deploys to DigitalOcean App Platform. - TargetDigitalOceanApp DeployTarget = "digitalocean.app_platform" - // TargetDigitalOceanManagedKafka deploys to DigitalOcean Managed Kafka. 
- TargetDigitalOceanManagedKafka DeployTarget = "digitalocean.managed_kafka" - // TargetAWSECS deploys to AWS ECS (Elastic Container Service). - TargetAWSECS DeployTarget = "aws.ecs" - // TargetAWSEKS deploys to AWS EKS (Elastic Kubernetes Service). - TargetAWSEKS DeployTarget = "aws.eks" - // TargetAWSManagedKafka deploys to AWS MSK (Managed Streaming for Apache Kafka). - TargetAWSManagedKafka DeployTarget = "aws.msk" - // TargetAWSKinesis deploys to AWS Kinesis Data Streams. - TargetAWSKinesis DeployTarget = "aws.kinesis" - // TargetKubernetes deploys to a generic Kubernetes cluster. - TargetKubernetes DeployTarget = "kubernetes" - // TargetSelfHosted deploys to a self-managed host (Docker, bare metal). - TargetSelfHosted DeployTarget = "self_hosted" -) - -// supportedTargets encodes the provider × DeployTarget compatibility matrix. -// Each provider maps to the set of deploy targets it supports. -// Combos absent from this map are rejected at config-load time. -var supportedTargets = map[string]map[DeployTarget]bool{ - "nats": { - TargetDigitalOceanApp: true, - TargetAWSECS: true, - TargetAWSEKS: true, - TargetKubernetes: true, - TargetSelfHosted: true, - }, - "kafka": { - TargetDigitalOceanManagedKafka: true, - TargetAWSManagedKafka: true, - TargetKubernetes: true, - TargetSelfHosted: true, - }, - "kinesis": { - TargetAWSKinesis: true, - }, -} - -// ValidateProviderTarget returns an error if the provider × target combination -// is unsupported. It is called at config-load time so misconfigured deployments -// are rejected before any IaC resources are emitted. 
-func ValidateProviderTarget(provider string, target DeployTarget) error { - targets, ok := supportedTargets[provider] - if !ok { - return fmt.Errorf("eventbus: unknown provider %q — supported providers are: nats, kafka, kinesis", provider) - } - if !targets[target] { - return fmt.Errorf("eventbus: provider %q does not support deploy target %q", provider, target) - } - return nil -} From 84b011646f624d0817065f2cf59816381049b985 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sun, 3 May 2026 18:12:17 -0400 Subject: [PATCH 4/8] fix(task16): proto typed enums + rename workflow file to test.yml MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - proto/eventbus.proto: replace string fields with typed enums: RetentionPolicy (LIMITS/INTEREST/WORKQUEUE) → StreamConfig.retention_policy DeliverPolicy (ALL/LAST/NEW/BY_START_SEQUENCE/BY_START_TIME) → ConsumerConfig.deliver_policy AckPolicy (EXPLICIT/NONE/ALL) → ConsumerConfig.ack_policy KafkaSecurityProtocol (PLAINTEXT/SSL/SASL_PLAINTEXT/SASL_SSL) → KafkaConfig.security_protocol KafkaSaslMechanism (PLAIN/SCRAM_SHA_256/SCRAM_SHA_512) → KafkaConfig.sasl_mechanism - gen/eventbus.pb.go: regenerated (protoc-gen-go v1.36.11) - .github/workflows/test.yml: renamed from ci.yml per task spec wfctl strict-contracts: OK | go build ./...: exit 0 | go test ./...: PASS Co-Authored-By: Claude Sonnet 4.6 --- .github/workflows/{ci.yml => test.yml} | 0 gen/eventbus.pb.go | 462 +++++++++++++++++++++---- proto/eventbus.proto | 81 ++++- 3 files changed, 459 insertions(+), 84 deletions(-) rename .github/workflows/{ci.yml => test.yml} (100%) diff --git a/.github/workflows/ci.yml b/.github/workflows/test.yml similarity index 100% rename from .github/workflows/ci.yml rename to .github/workflows/test.yml diff --git a/gen/eventbus.pb.go b/gen/eventbus.pb.go index e188221..284d6f6 100644 --- a/gen/eventbus.pb.go +++ b/gen/eventbus.pb.go @@ -22,6 +22,291 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion 
- 20) ) +// RetentionPolicy controls how messages are retained in a stream. +type RetentionPolicy int32 + +const ( + RetentionPolicy_RETENTION_POLICY_UNSPECIFIED RetentionPolicy = 0 + // RETENTION_POLICY_LIMITS retains messages up to configured size / age limits. + RetentionPolicy_RETENTION_POLICY_LIMITS RetentionPolicy = 1 + // RETENTION_POLICY_INTEREST retains messages while at least one consumer is active. + RetentionPolicy_RETENTION_POLICY_INTEREST RetentionPolicy = 2 + // RETENTION_POLICY_WORKQUEUE retains messages until they are acknowledged by one consumer. + RetentionPolicy_RETENTION_POLICY_WORKQUEUE RetentionPolicy = 3 +) + +// Enum value maps for RetentionPolicy. +var ( + RetentionPolicy_name = map[int32]string{ + 0: "RETENTION_POLICY_UNSPECIFIED", + 1: "RETENTION_POLICY_LIMITS", + 2: "RETENTION_POLICY_INTEREST", + 3: "RETENTION_POLICY_WORKQUEUE", + } + RetentionPolicy_value = map[string]int32{ + "RETENTION_POLICY_UNSPECIFIED": 0, + "RETENTION_POLICY_LIMITS": 1, + "RETENTION_POLICY_INTEREST": 2, + "RETENTION_POLICY_WORKQUEUE": 3, + } +) + +func (x RetentionPolicy) Enum() *RetentionPolicy { + p := new(RetentionPolicy) + *p = x + return p +} + +func (x RetentionPolicy) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (RetentionPolicy) Descriptor() protoreflect.EnumDescriptor { + return file_eventbus_proto_enumTypes[0].Descriptor() +} + +func (RetentionPolicy) Type() protoreflect.EnumType { + return &file_eventbus_proto_enumTypes[0] +} + +func (x RetentionPolicy) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use RetentionPolicy.Descriptor instead. +func (RetentionPolicy) EnumDescriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{0} +} + +// DeliverPolicy controls which messages a consumer receives on first attach. 
+type DeliverPolicy int32 + +const ( + DeliverPolicy_DELIVER_POLICY_UNSPECIFIED DeliverPolicy = 0 + // DELIVER_POLICY_ALL delivers all messages from the beginning of the stream. + DeliverPolicy_DELIVER_POLICY_ALL DeliverPolicy = 1 + // DELIVER_POLICY_LAST delivers only the last message per subject. + DeliverPolicy_DELIVER_POLICY_LAST DeliverPolicy = 2 + // DELIVER_POLICY_NEW delivers only messages published after the consumer is created. + DeliverPolicy_DELIVER_POLICY_NEW DeliverPolicy = 3 + // DELIVER_POLICY_BY_START_SEQUENCE delivers messages starting from a specific sequence. + DeliverPolicy_DELIVER_POLICY_BY_START_SEQUENCE DeliverPolicy = 4 + // DELIVER_POLICY_BY_START_TIME delivers messages starting from a specific time. + DeliverPolicy_DELIVER_POLICY_BY_START_TIME DeliverPolicy = 5 +) + +// Enum value maps for DeliverPolicy. +var ( + DeliverPolicy_name = map[int32]string{ + 0: "DELIVER_POLICY_UNSPECIFIED", + 1: "DELIVER_POLICY_ALL", + 2: "DELIVER_POLICY_LAST", + 3: "DELIVER_POLICY_NEW", + 4: "DELIVER_POLICY_BY_START_SEQUENCE", + 5: "DELIVER_POLICY_BY_START_TIME", + } + DeliverPolicy_value = map[string]int32{ + "DELIVER_POLICY_UNSPECIFIED": 0, + "DELIVER_POLICY_ALL": 1, + "DELIVER_POLICY_LAST": 2, + "DELIVER_POLICY_NEW": 3, + "DELIVER_POLICY_BY_START_SEQUENCE": 4, + "DELIVER_POLICY_BY_START_TIME": 5, + } +) + +func (x DeliverPolicy) Enum() *DeliverPolicy { + p := new(DeliverPolicy) + *p = x + return p +} + +func (x DeliverPolicy) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (DeliverPolicy) Descriptor() protoreflect.EnumDescriptor { + return file_eventbus_proto_enumTypes[1].Descriptor() +} + +func (DeliverPolicy) Type() protoreflect.EnumType { + return &file_eventbus_proto_enumTypes[1] +} + +func (x DeliverPolicy) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use DeliverPolicy.Descriptor instead. 
+func (DeliverPolicy) EnumDescriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{1} +} + +// AckPolicy controls how consumers must acknowledge messages. +type AckPolicy int32 + +const ( + AckPolicy_ACK_POLICY_UNSPECIFIED AckPolicy = 0 + // ACK_POLICY_EXPLICIT requires explicit per-message acknowledgement (recommended). + AckPolicy_ACK_POLICY_EXPLICIT AckPolicy = 1 + // ACK_POLICY_NONE disables acknowledgements entirely (fire-and-forget). + AckPolicy_ACK_POLICY_NONE AckPolicy = 2 + // ACK_POLICY_ALL acknowledges all messages up to and including the specified sequence. + AckPolicy_ACK_POLICY_ALL AckPolicy = 3 +) + +// Enum value maps for AckPolicy. +var ( + AckPolicy_name = map[int32]string{ + 0: "ACK_POLICY_UNSPECIFIED", + 1: "ACK_POLICY_EXPLICIT", + 2: "ACK_POLICY_NONE", + 3: "ACK_POLICY_ALL", + } + AckPolicy_value = map[string]int32{ + "ACK_POLICY_UNSPECIFIED": 0, + "ACK_POLICY_EXPLICIT": 1, + "ACK_POLICY_NONE": 2, + "ACK_POLICY_ALL": 3, + } +) + +func (x AckPolicy) Enum() *AckPolicy { + p := new(AckPolicy) + *p = x + return p +} + +func (x AckPolicy) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (AckPolicy) Descriptor() protoreflect.EnumDescriptor { + return file_eventbus_proto_enumTypes[2].Descriptor() +} + +func (AckPolicy) Type() protoreflect.EnumType { + return &file_eventbus_proto_enumTypes[2] +} + +func (x AckPolicy) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use AckPolicy.Descriptor instead. +func (AckPolicy) EnumDescriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{2} +} + +// KafkaSecurityProtocol selects the client-broker transport security mode. 
+type KafkaSecurityProtocol int32 + +const ( + KafkaSecurityProtocol_KAFKA_SECURITY_PROTOCOL_UNSPECIFIED KafkaSecurityProtocol = 0 + KafkaSecurityProtocol_KAFKA_SECURITY_PROTOCOL_PLAINTEXT KafkaSecurityProtocol = 1 + KafkaSecurityProtocol_KAFKA_SECURITY_PROTOCOL_SSL KafkaSecurityProtocol = 2 + KafkaSecurityProtocol_KAFKA_SECURITY_PROTOCOL_SASL_PLAINTEXT KafkaSecurityProtocol = 3 + KafkaSecurityProtocol_KAFKA_SECURITY_PROTOCOL_SASL_SSL KafkaSecurityProtocol = 4 +) + +// Enum value maps for KafkaSecurityProtocol. +var ( + KafkaSecurityProtocol_name = map[int32]string{ + 0: "KAFKA_SECURITY_PROTOCOL_UNSPECIFIED", + 1: "KAFKA_SECURITY_PROTOCOL_PLAINTEXT", + 2: "KAFKA_SECURITY_PROTOCOL_SSL", + 3: "KAFKA_SECURITY_PROTOCOL_SASL_PLAINTEXT", + 4: "KAFKA_SECURITY_PROTOCOL_SASL_SSL", + } + KafkaSecurityProtocol_value = map[string]int32{ + "KAFKA_SECURITY_PROTOCOL_UNSPECIFIED": 0, + "KAFKA_SECURITY_PROTOCOL_PLAINTEXT": 1, + "KAFKA_SECURITY_PROTOCOL_SSL": 2, + "KAFKA_SECURITY_PROTOCOL_SASL_PLAINTEXT": 3, + "KAFKA_SECURITY_PROTOCOL_SASL_SSL": 4, + } +) + +func (x KafkaSecurityProtocol) Enum() *KafkaSecurityProtocol { + p := new(KafkaSecurityProtocol) + *p = x + return p +} + +func (x KafkaSecurityProtocol) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (KafkaSecurityProtocol) Descriptor() protoreflect.EnumDescriptor { + return file_eventbus_proto_enumTypes[3].Descriptor() +} + +func (KafkaSecurityProtocol) Type() protoreflect.EnumType { + return &file_eventbus_proto_enumTypes[3] +} + +func (x KafkaSecurityProtocol) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use KafkaSecurityProtocol.Descriptor instead. +func (KafkaSecurityProtocol) EnumDescriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{3} +} + +// KafkaSaslMechanism selects the SASL authentication mechanism. 
+type KafkaSaslMechanism int32 + +const ( + KafkaSaslMechanism_KAFKA_SASL_MECHANISM_UNSPECIFIED KafkaSaslMechanism = 0 + KafkaSaslMechanism_KAFKA_SASL_MECHANISM_PLAIN KafkaSaslMechanism = 1 + KafkaSaslMechanism_KAFKA_SASL_MECHANISM_SCRAM_SHA_256 KafkaSaslMechanism = 2 + KafkaSaslMechanism_KAFKA_SASL_MECHANISM_SCRAM_SHA_512 KafkaSaslMechanism = 3 +) + +// Enum value maps for KafkaSaslMechanism. +var ( + KafkaSaslMechanism_name = map[int32]string{ + 0: "KAFKA_SASL_MECHANISM_UNSPECIFIED", + 1: "KAFKA_SASL_MECHANISM_PLAIN", + 2: "KAFKA_SASL_MECHANISM_SCRAM_SHA_256", + 3: "KAFKA_SASL_MECHANISM_SCRAM_SHA_512", + } + KafkaSaslMechanism_value = map[string]int32{ + "KAFKA_SASL_MECHANISM_UNSPECIFIED": 0, + "KAFKA_SASL_MECHANISM_PLAIN": 1, + "KAFKA_SASL_MECHANISM_SCRAM_SHA_256": 2, + "KAFKA_SASL_MECHANISM_SCRAM_SHA_512": 3, + } +) + +func (x KafkaSaslMechanism) Enum() *KafkaSaslMechanism { + p := new(KafkaSaslMechanism) + *p = x + return p +} + +func (x KafkaSaslMechanism) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (KafkaSaslMechanism) Descriptor() protoreflect.EnumDescriptor { + return file_eventbus_proto_enumTypes[4].Descriptor() +} + +func (KafkaSaslMechanism) Type() protoreflect.EnumType { + return &file_eventbus_proto_enumTypes[4] +} + +func (x KafkaSaslMechanism) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use KafkaSaslMechanism.Descriptor instead. +func (KafkaSaslMechanism) EnumDescriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{4} +} + // ClusterConfig is the typed config for infra.eventbus module. // provider and deploy_target select the Provider × DeployTarget combination. type ClusterConfig struct { @@ -29,7 +314,7 @@ type ClusterConfig struct { // provider selects the message-broker backend: "nats" | "kafka" | "kinesis". 
Provider string `protobuf:"bytes,1,opt,name=provider,proto3" json:"provider,omitempty"` // deploy_target selects the deployment platform: - // "digitalocean.app_platform" | "aws.ecs" | "aws.eks" | "kubernetes". + // "digitalocean.app_platform" | "aws.ecs" | "aws.eks" | "kubernetes" | etc. DeployTarget string `protobuf:"bytes,2,opt,name=deploy_target,json=deployTarget,proto3" json:"deploy_target,omitempty"` // version is the broker version to deploy (e.g. "2.10" for NATS). Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"` @@ -212,11 +497,10 @@ type KafkaConfig struct { // bootstrap_servers is the initial broker address list (comma-separated). // Used when connecting to an existing Kafka cluster rather than provisioning one. BootstrapServers string `protobuf:"bytes,1,opt,name=bootstrap_servers,json=bootstrapServers,proto3" json:"bootstrap_servers,omitempty"` - // security_protocol specifies the protocol for client-broker communication: - // "PLAINTEXT" | "SSL" | "SASL_PLAINTEXT" | "SASL_SSL". - SecurityProtocol string `protobuf:"bytes,2,opt,name=security_protocol,json=securityProtocol,proto3" json:"security_protocol,omitempty"` - // sasl_mechanism for SASL authentication: "PLAIN" | "SCRAM-SHA-256" | "SCRAM-SHA-512". - SaslMechanism string `protobuf:"bytes,3,opt,name=sasl_mechanism,json=saslMechanism,proto3" json:"sasl_mechanism,omitempty"` + // security_protocol specifies the protocol for client-broker communication. + SecurityProtocol KafkaSecurityProtocol `protobuf:"varint,2,opt,name=security_protocol,json=securityProtocol,proto3,enum=workflow.plugin.eventbus.v1.KafkaSecurityProtocol" json:"security_protocol,omitempty"` + // sasl_mechanism for SASL authentication. + SaslMechanism KafkaSaslMechanism `protobuf:"varint,3,opt,name=sasl_mechanism,json=saslMechanism,proto3,enum=workflow.plugin.eventbus.v1.KafkaSaslMechanism" json:"sasl_mechanism,omitempty"` // default_replication_factor for new topics (0 = use broker default). 
DefaultReplicationFactor int32 `protobuf:"varint,4,opt,name=default_replication_factor,json=defaultReplicationFactor,proto3" json:"default_replication_factor,omitempty"` // min_insync_replicas is the minimum number of in-sync replicas required for acks. @@ -262,18 +546,18 @@ func (x *KafkaConfig) GetBootstrapServers() string { return "" } -func (x *KafkaConfig) GetSecurityProtocol() string { +func (x *KafkaConfig) GetSecurityProtocol() KafkaSecurityProtocol { if x != nil { return x.SecurityProtocol } - return "" + return KafkaSecurityProtocol_KAFKA_SECURITY_PROTOCOL_UNSPECIFIED } -func (x *KafkaConfig) GetSaslMechanism() string { +func (x *KafkaConfig) GetSaslMechanism() KafkaSaslMechanism { if x != nil { return x.SaslMechanism } - return "" + return KafkaSaslMechanism_KAFKA_SASL_MECHANISM_UNSPECIFIED } func (x *KafkaConfig) GetDefaultReplicationFactor() int32 { @@ -425,8 +709,8 @@ type StreamConfig struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` // subjects lists the NATS subject filter patterns bound to this stream. Subjects []string `protobuf:"bytes,2,rep,name=subjects,proto3" json:"subjects,omitempty"` - // retention_policy controls message retention: "limits" | "interest" | "workqueue". - RetentionPolicy string `protobuf:"bytes,3,opt,name=retention_policy,json=retentionPolicy,proto3" json:"retention_policy,omitempty"` + // retention_policy controls message retention semantics. + RetentionPolicy RetentionPolicy `protobuf:"varint,3,opt,name=retention_policy,json=retentionPolicy,proto3,enum=workflow.plugin.eventbus.v1.RetentionPolicy" json:"retention_policy,omitempty"` // num_replicas is the number of stream replicas within the cluster. NumReplicas int32 `protobuf:"varint,4,opt,name=num_replicas,json=numReplicas,proto3" json:"num_replicas,omitempty"` // max_bytes caps total on-disk storage for this stream (0 = unlimited). 
@@ -483,11 +767,11 @@ func (x *StreamConfig) GetSubjects() []string { return nil } -func (x *StreamConfig) GetRetentionPolicy() string { +func (x *StreamConfig) GetRetentionPolicy() RetentionPolicy { if x != nil { return x.RetentionPolicy } - return "" + return RetentionPolicy_RETENTION_POLICY_UNSPECIFIED } func (x *StreamConfig) GetNumReplicas() int32 { @@ -528,12 +812,10 @@ type ConsumerConfig struct { StreamName string `protobuf:"bytes,2,opt,name=stream_name,json=streamName,proto3" json:"stream_name,omitempty"` // filter_subject narrows delivery to messages matching this subject pattern. FilterSubject string `protobuf:"bytes,3,opt,name=filter_subject,json=filterSubject,proto3" json:"filter_subject,omitempty"` - // deliver_policy controls which messages are delivered on first attach: - // "all" | "last" | "new" | "by_start_sequence" | "by_start_time". - DeliverPolicy string `protobuf:"bytes,4,opt,name=deliver_policy,json=deliverPolicy,proto3" json:"deliver_policy,omitempty"` - // ack_policy controls how acknowledgements are required: - // "explicit" | "none" | "all". - AckPolicy string `protobuf:"bytes,5,opt,name=ack_policy,json=ackPolicy,proto3" json:"ack_policy,omitempty"` + // deliver_policy controls which messages are delivered on first attach. + DeliverPolicy DeliverPolicy `protobuf:"varint,4,opt,name=deliver_policy,json=deliverPolicy,proto3,enum=workflow.plugin.eventbus.v1.DeliverPolicy" json:"deliver_policy,omitempty"` + // ack_policy controls how acknowledgements are required. + AckPolicy AckPolicy `protobuf:"varint,5,opt,name=ack_policy,json=ackPolicy,proto3,enum=workflow.plugin.eventbus.v1.AckPolicy" json:"ack_policy,omitempty"` // max_deliver is the maximum number of delivery attempts before NACKing (0 = unlimited). 
MaxDeliver int32 `protobuf:"varint,6,opt,name=max_deliver,json=maxDeliver,proto3" json:"max_deliver,omitempty"` unknownFields protoimpl.UnknownFields @@ -591,18 +873,18 @@ func (x *ConsumerConfig) GetFilterSubject() string { return "" } -func (x *ConsumerConfig) GetDeliverPolicy() string { +func (x *ConsumerConfig) GetDeliverPolicy() DeliverPolicy { if x != nil { return x.DeliverPolicy } - return "" + return DeliverPolicy_DELIVER_POLICY_UNSPECIFIED } -func (x *ConsumerConfig) GetAckPolicy() string { +func (x *ConsumerConfig) GetAckPolicy() AckPolicy { if x != nil { return x.AckPolicy } - return "" + return AckPolicy_ACK_POLICY_UNSPECIFIED } func (x *ConsumerConfig) GetMaxDeliver() int32 { @@ -1042,11 +1324,11 @@ const file_eventbus_proto_rawDesc = "" + "\aenabled\x18\x01 \x01(\bR\aenabled\x12*\n" + "\x11max_storage_bytes\x18\x02 \x01(\x03R\x0fmaxStorageBytes\x12(\n" + "\x10max_memory_bytes\x18\x03 \x01(\x03R\x0emaxMemoryBytes\x122\n" + - "\amax_age\x18\x04 \x01(\v2\x19.google.protobuf.DurationR\x06maxAge\"\xfc\x01\n" + + "\amax_age\x18\x04 \x01(\v2\x19.google.protobuf.DurationR\x06maxAge\"\xe1\x02\n" + "\vKafkaConfig\x12+\n" + - "\x11bootstrap_servers\x18\x01 \x01(\tR\x10bootstrapServers\x12+\n" + - "\x11security_protocol\x18\x02 \x01(\tR\x10securityProtocol\x12%\n" + - "\x0esasl_mechanism\x18\x03 \x01(\tR\rsaslMechanism\x12<\n" + + "\x11bootstrap_servers\x18\x01 \x01(\tR\x10bootstrapServers\x12_\n" + + "\x11security_protocol\x18\x02 \x01(\x0e22.workflow.plugin.eventbus.v1.KafkaSecurityProtocolR\x10securityProtocol\x12V\n" + + "\x0esasl_mechanism\x18\x03 \x01(\x0e2/.workflow.plugin.eventbus.v1.KafkaSaslMechanismR\rsaslMechanism\x12<\n" + "\x1adefault_replication_factor\x18\x04 \x01(\x05R\x18defaultReplicationFactor\x12.\n" + "\x13min_insync_replicas\x18\x05 \x01(\x05R\x11minInsyncReplicas\"~\n" + "\rKinesisConfig\x12\x16\n" + @@ -1057,23 +1339,23 @@ const file_eventbus_proto_rawDesc = "" + "\x0eResourceLimits\x12\x10\n" + "\x03cpu\x18\x01 \x01(\tR\x03cpu\x12\x16\n" 
+ "\x06memory\x18\x02 \x01(\tR\x06memory\x12\x18\n" + - "\astorage\x18\x03 \x01(\tR\astorage\"\x93\x02\n" + + "\astorage\x18\x03 \x01(\tR\astorage\"\xc1\x02\n" + "\fStreamConfig\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x12\x1a\n" + - "\bsubjects\x18\x02 \x03(\tR\bsubjects\x12)\n" + - "\x10retention_policy\x18\x03 \x01(\tR\x0fretentionPolicy\x12!\n" + + "\bsubjects\x18\x02 \x03(\tR\bsubjects\x12W\n" + + "\x10retention_policy\x18\x03 \x01(\x0e2,.workflow.plugin.eventbus.v1.RetentionPolicyR\x0fretentionPolicy\x12!\n" + "\fnum_replicas\x18\x04 \x01(\x05R\vnumReplicas\x12\x1b\n" + "\tmax_bytes\x18\x05 \x01(\x03R\bmaxBytes\x122\n" + "\amax_age\x18\x06 \x01(\v2\x19.google.protobuf.DurationR\x06maxAge\x124\n" + - "\back_wait\x18\a \x01(\v2\x19.google.protobuf.DurationR\aackWait\"\xd3\x01\n" + + "\back_wait\x18\a \x01(\v2\x19.google.protobuf.DurationR\aackWait\"\xa7\x02\n" + "\x0eConsumerConfig\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x12\x1f\n" + "\vstream_name\x18\x02 \x01(\tR\n" + "streamName\x12%\n" + - "\x0efilter_subject\x18\x03 \x01(\tR\rfilterSubject\x12%\n" + - "\x0edeliver_policy\x18\x04 \x01(\tR\rdeliverPolicy\x12\x1d\n" + + "\x0efilter_subject\x18\x03 \x01(\tR\rfilterSubject\x12Q\n" + + "\x0edeliver_policy\x18\x04 \x01(\x0e2*.workflow.plugin.eventbus.v1.DeliverPolicyR\rdeliverPolicy\x12E\n" + "\n" + - "ack_policy\x18\x05 \x01(\tR\tackPolicy\x12\x1f\n" + + "ack_policy\x18\x05 \x01(\x0e2&.workflow.plugin.eventbus.v1.AckPolicyR\tackPolicy\x12\x1f\n" + "\vmax_deliver\x18\x06 \x01(\x05R\n" + "maxDeliver\"\xfb\x01\n" + "\x0ePublishRequest\x12\x18\n" + @@ -1107,7 +1389,35 @@ const file_eventbus_proto_rawDesc = "" + "\n" + "AckRequest\x12\x1b\n" + "\tack_token\x18\x01 \x01(\tR\backToken\"\r\n" + - "\vAckResponseB@Z>github.com/GoCodeAlone/workflow-plugin-eventbus/gen;eventbusv1b\x06proto3" + "\vAckResponse*\x8f\x01\n" + + "\x0fRetentionPolicy\x12 \n" + + "\x1cRETENTION_POLICY_UNSPECIFIED\x10\x00\x12\x1b\n" + + "\x17RETENTION_POLICY_LIMITS\x10\x01\x12\x1d\n" 
+ + "\x19RETENTION_POLICY_INTEREST\x10\x02\x12\x1e\n" + + "\x1aRETENTION_POLICY_WORKQUEUE\x10\x03*\xc0\x01\n" + + "\rDeliverPolicy\x12\x1e\n" + + "\x1aDELIVER_POLICY_UNSPECIFIED\x10\x00\x12\x16\n" + + "\x12DELIVER_POLICY_ALL\x10\x01\x12\x17\n" + + "\x13DELIVER_POLICY_LAST\x10\x02\x12\x16\n" + + "\x12DELIVER_POLICY_NEW\x10\x03\x12$\n" + + " DELIVER_POLICY_BY_START_SEQUENCE\x10\x04\x12 \n" + + "\x1cDELIVER_POLICY_BY_START_TIME\x10\x05*i\n" + + "\tAckPolicy\x12\x1a\n" + + "\x16ACK_POLICY_UNSPECIFIED\x10\x00\x12\x17\n" + + "\x13ACK_POLICY_EXPLICIT\x10\x01\x12\x13\n" + + "\x0fACK_POLICY_NONE\x10\x02\x12\x12\n" + + "\x0eACK_POLICY_ALL\x10\x03*\xda\x01\n" + + "\x15KafkaSecurityProtocol\x12'\n" + + "#KAFKA_SECURITY_PROTOCOL_UNSPECIFIED\x10\x00\x12%\n" + + "!KAFKA_SECURITY_PROTOCOL_PLAINTEXT\x10\x01\x12\x1f\n" + + "\x1bKAFKA_SECURITY_PROTOCOL_SSL\x10\x02\x12*\n" + + "&KAFKA_SECURITY_PROTOCOL_SASL_PLAINTEXT\x10\x03\x12$\n" + + " KAFKA_SECURITY_PROTOCOL_SASL_SSL\x10\x04*\xaa\x01\n" + + "\x12KafkaSaslMechanism\x12$\n" + + " KAFKA_SASL_MECHANISM_UNSPECIFIED\x10\x00\x12\x1e\n" + + "\x1aKAFKA_SASL_MECHANISM_PLAIN\x10\x01\x12&\n" + + "\"KAFKA_SASL_MECHANISM_SCRAM_SHA_256\x10\x02\x12&\n" + + "\"KAFKA_SASL_MECHANISM_SCRAM_SHA_512\x10\x03B@Z>github.com/GoCodeAlone/workflow-plugin-eventbus/gen;eventbusv1b\x06proto3" var ( file_eventbus_proto_rawDescOnce sync.Once @@ -1121,43 +1431,54 @@ func file_eventbus_proto_rawDescGZIP() []byte { return file_eventbus_proto_rawDescData } +var file_eventbus_proto_enumTypes = make([]protoimpl.EnumInfo, 5) var file_eventbus_proto_msgTypes = make([]protoimpl.MessageInfo, 16) var file_eventbus_proto_goTypes = []any{ - (*ClusterConfig)(nil), // 0: workflow.plugin.eventbus.v1.ClusterConfig - (*JetStreamConfig)(nil), // 1: workflow.plugin.eventbus.v1.JetStreamConfig - (*KafkaConfig)(nil), // 2: workflow.plugin.eventbus.v1.KafkaConfig - (*KinesisConfig)(nil), // 3: workflow.plugin.eventbus.v1.KinesisConfig - (*ResourceLimits)(nil), // 4: 
workflow.plugin.eventbus.v1.ResourceLimits - (*StreamConfig)(nil), // 5: workflow.plugin.eventbus.v1.StreamConfig - (*ConsumerConfig)(nil), // 6: workflow.plugin.eventbus.v1.ConsumerConfig - (*PublishRequest)(nil), // 7: workflow.plugin.eventbus.v1.PublishRequest - (*PublishResponse)(nil), // 8: workflow.plugin.eventbus.v1.PublishResponse - (*ConsumeRequest)(nil), // 9: workflow.plugin.eventbus.v1.ConsumeRequest - (*ConsumeResponse)(nil), // 10: workflow.plugin.eventbus.v1.ConsumeResponse - (*Message)(nil), // 11: workflow.plugin.eventbus.v1.Message - (*AckRequest)(nil), // 12: workflow.plugin.eventbus.v1.AckRequest - (*AckResponse)(nil), // 13: workflow.plugin.eventbus.v1.AckResponse - nil, // 14: workflow.plugin.eventbus.v1.PublishRequest.HeadersEntry - nil, // 15: workflow.plugin.eventbus.v1.Message.HeadersEntry - (*durationpb.Duration)(nil), // 16: google.protobuf.Duration + (RetentionPolicy)(0), // 0: workflow.plugin.eventbus.v1.RetentionPolicy + (DeliverPolicy)(0), // 1: workflow.plugin.eventbus.v1.DeliverPolicy + (AckPolicy)(0), // 2: workflow.plugin.eventbus.v1.AckPolicy + (KafkaSecurityProtocol)(0), // 3: workflow.plugin.eventbus.v1.KafkaSecurityProtocol + (KafkaSaslMechanism)(0), // 4: workflow.plugin.eventbus.v1.KafkaSaslMechanism + (*ClusterConfig)(nil), // 5: workflow.plugin.eventbus.v1.ClusterConfig + (*JetStreamConfig)(nil), // 6: workflow.plugin.eventbus.v1.JetStreamConfig + (*KafkaConfig)(nil), // 7: workflow.plugin.eventbus.v1.KafkaConfig + (*KinesisConfig)(nil), // 8: workflow.plugin.eventbus.v1.KinesisConfig + (*ResourceLimits)(nil), // 9: workflow.plugin.eventbus.v1.ResourceLimits + (*StreamConfig)(nil), // 10: workflow.plugin.eventbus.v1.StreamConfig + (*ConsumerConfig)(nil), // 11: workflow.plugin.eventbus.v1.ConsumerConfig + (*PublishRequest)(nil), // 12: workflow.plugin.eventbus.v1.PublishRequest + (*PublishResponse)(nil), // 13: workflow.plugin.eventbus.v1.PublishResponse + (*ConsumeRequest)(nil), // 14: 
workflow.plugin.eventbus.v1.ConsumeRequest + (*ConsumeResponse)(nil), // 15: workflow.plugin.eventbus.v1.ConsumeResponse + (*Message)(nil), // 16: workflow.plugin.eventbus.v1.Message + (*AckRequest)(nil), // 17: workflow.plugin.eventbus.v1.AckRequest + (*AckResponse)(nil), // 18: workflow.plugin.eventbus.v1.AckResponse + nil, // 19: workflow.plugin.eventbus.v1.PublishRequest.HeadersEntry + nil, // 20: workflow.plugin.eventbus.v1.Message.HeadersEntry + (*durationpb.Duration)(nil), // 21: google.protobuf.Duration } var file_eventbus_proto_depIdxs = []int32{ - 1, // 0: workflow.plugin.eventbus.v1.ClusterConfig.jetstream:type_name -> workflow.plugin.eventbus.v1.JetStreamConfig - 2, // 1: workflow.plugin.eventbus.v1.ClusterConfig.kafka:type_name -> workflow.plugin.eventbus.v1.KafkaConfig - 3, // 2: workflow.plugin.eventbus.v1.ClusterConfig.kinesis:type_name -> workflow.plugin.eventbus.v1.KinesisConfig - 4, // 3: workflow.plugin.eventbus.v1.ClusterConfig.limits:type_name -> workflow.plugin.eventbus.v1.ResourceLimits - 16, // 4: workflow.plugin.eventbus.v1.JetStreamConfig.max_age:type_name -> google.protobuf.Duration - 16, // 5: workflow.plugin.eventbus.v1.StreamConfig.max_age:type_name -> google.protobuf.Duration - 16, // 6: workflow.plugin.eventbus.v1.StreamConfig.ack_wait:type_name -> google.protobuf.Duration - 14, // 7: workflow.plugin.eventbus.v1.PublishRequest.headers:type_name -> workflow.plugin.eventbus.v1.PublishRequest.HeadersEntry - 16, // 8: workflow.plugin.eventbus.v1.ConsumeRequest.max_wait:type_name -> google.protobuf.Duration - 11, // 9: workflow.plugin.eventbus.v1.ConsumeResponse.messages:type_name -> workflow.plugin.eventbus.v1.Message - 15, // 10: workflow.plugin.eventbus.v1.Message.headers:type_name -> workflow.plugin.eventbus.v1.Message.HeadersEntry - 11, // [11:11] is the sub-list for method output_type - 11, // [11:11] is the sub-list for method input_type - 11, // [11:11] is the sub-list for extension type_name - 11, // [11:11] is the sub-list for 
extension extendee - 0, // [0:11] is the sub-list for field type_name + 6, // 0: workflow.plugin.eventbus.v1.ClusterConfig.jetstream:type_name -> workflow.plugin.eventbus.v1.JetStreamConfig + 7, // 1: workflow.plugin.eventbus.v1.ClusterConfig.kafka:type_name -> workflow.plugin.eventbus.v1.KafkaConfig + 8, // 2: workflow.plugin.eventbus.v1.ClusterConfig.kinesis:type_name -> workflow.plugin.eventbus.v1.KinesisConfig + 9, // 3: workflow.plugin.eventbus.v1.ClusterConfig.limits:type_name -> workflow.plugin.eventbus.v1.ResourceLimits + 21, // 4: workflow.plugin.eventbus.v1.JetStreamConfig.max_age:type_name -> google.protobuf.Duration + 3, // 5: workflow.plugin.eventbus.v1.KafkaConfig.security_protocol:type_name -> workflow.plugin.eventbus.v1.KafkaSecurityProtocol + 4, // 6: workflow.plugin.eventbus.v1.KafkaConfig.sasl_mechanism:type_name -> workflow.plugin.eventbus.v1.KafkaSaslMechanism + 0, // 7: workflow.plugin.eventbus.v1.StreamConfig.retention_policy:type_name -> workflow.plugin.eventbus.v1.RetentionPolicy + 21, // 8: workflow.plugin.eventbus.v1.StreamConfig.max_age:type_name -> google.protobuf.Duration + 21, // 9: workflow.plugin.eventbus.v1.StreamConfig.ack_wait:type_name -> google.protobuf.Duration + 1, // 10: workflow.plugin.eventbus.v1.ConsumerConfig.deliver_policy:type_name -> workflow.plugin.eventbus.v1.DeliverPolicy + 2, // 11: workflow.plugin.eventbus.v1.ConsumerConfig.ack_policy:type_name -> workflow.plugin.eventbus.v1.AckPolicy + 19, // 12: workflow.plugin.eventbus.v1.PublishRequest.headers:type_name -> workflow.plugin.eventbus.v1.PublishRequest.HeadersEntry + 21, // 13: workflow.plugin.eventbus.v1.ConsumeRequest.max_wait:type_name -> google.protobuf.Duration + 16, // 14: workflow.plugin.eventbus.v1.ConsumeResponse.messages:type_name -> workflow.plugin.eventbus.v1.Message + 20, // 15: workflow.plugin.eventbus.v1.Message.headers:type_name -> workflow.plugin.eventbus.v1.Message.HeadersEntry + 16, // [16:16] is the sub-list for method output_type + 16, // 
[16:16] is the sub-list for method input_type + 16, // [16:16] is the sub-list for extension type_name + 16, // [16:16] is the sub-list for extension extendee + 0, // [0:16] is the sub-list for field type_name } func init() { file_eventbus_proto_init() } @@ -1170,13 +1491,14 @@ func file_eventbus_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_eventbus_proto_rawDesc), len(file_eventbus_proto_rawDesc)), - NumEnums: 0, + NumEnums: 5, NumMessages: 16, NumExtensions: 0, NumServices: 0, }, GoTypes: file_eventbus_proto_goTypes, DependencyIndexes: file_eventbus_proto_depIdxs, + EnumInfos: file_eventbus_proto_enumTypes, MessageInfos: file_eventbus_proto_msgTypes, }.Build() File_eventbus_proto = out.File diff --git a/proto/eventbus.proto b/proto/eventbus.proto index 567e6e3..f504a82 100644 --- a/proto/eventbus.proto +++ b/proto/eventbus.proto @@ -5,6 +5,62 @@ option go_package = "github.com/GoCodeAlone/workflow-plugin-eventbus/gen;eventbu import "google/protobuf/duration.proto"; +// ─── enums ──────────────────────────────────────────────────────────────────── + +// RetentionPolicy controls how messages are retained in a stream. +enum RetentionPolicy { + RETENTION_POLICY_UNSPECIFIED = 0; + // RETENTION_POLICY_LIMITS retains messages up to configured size / age limits. + RETENTION_POLICY_LIMITS = 1; + // RETENTION_POLICY_INTEREST retains messages while at least one consumer is active. + RETENTION_POLICY_INTEREST = 2; + // RETENTION_POLICY_WORKQUEUE retains messages until they are acknowledged by one consumer. + RETENTION_POLICY_WORKQUEUE = 3; +} + +// DeliverPolicy controls which messages a consumer receives on first attach. +enum DeliverPolicy { + DELIVER_POLICY_UNSPECIFIED = 0; + // DELIVER_POLICY_ALL delivers all messages from the beginning of the stream. + DELIVER_POLICY_ALL = 1; + // DELIVER_POLICY_LAST delivers only the last message per subject. 
+ DELIVER_POLICY_LAST = 2; + // DELIVER_POLICY_NEW delivers only messages published after the consumer is created. + DELIVER_POLICY_NEW = 3; + // DELIVER_POLICY_BY_START_SEQUENCE delivers messages starting from a specific sequence. + DELIVER_POLICY_BY_START_SEQUENCE = 4; + // DELIVER_POLICY_BY_START_TIME delivers messages starting from a specific time. + DELIVER_POLICY_BY_START_TIME = 5; +} + +// AckPolicy controls how consumers must acknowledge messages. +enum AckPolicy { + ACK_POLICY_UNSPECIFIED = 0; + // ACK_POLICY_EXPLICIT requires explicit per-message acknowledgement (recommended). + ACK_POLICY_EXPLICIT = 1; + // ACK_POLICY_NONE disables acknowledgements entirely (fire-and-forget). + ACK_POLICY_NONE = 2; + // ACK_POLICY_ALL acknowledges all messages up to and including the specified sequence. + ACK_POLICY_ALL = 3; +} + +// KafkaSecurityProtocol selects the client-broker transport security mode. +enum KafkaSecurityProtocol { + KAFKA_SECURITY_PROTOCOL_UNSPECIFIED = 0; + KAFKA_SECURITY_PROTOCOL_PLAINTEXT = 1; + KAFKA_SECURITY_PROTOCOL_SSL = 2; + KAFKA_SECURITY_PROTOCOL_SASL_PLAINTEXT = 3; + KAFKA_SECURITY_PROTOCOL_SASL_SSL = 4; +} + +// KafkaSaslMechanism selects the SASL authentication mechanism. +enum KafkaSaslMechanism { + KAFKA_SASL_MECHANISM_UNSPECIFIED = 0; + KAFKA_SASL_MECHANISM_PLAIN = 1; + KAFKA_SASL_MECHANISM_SCRAM_SHA_256 = 2; + KAFKA_SASL_MECHANISM_SCRAM_SHA_512 = 3; +} + // ─── cluster / module configs ───────────────────────────────────────────────── // ClusterConfig is the typed config for infra.eventbus module. @@ -13,7 +69,7 @@ message ClusterConfig { // provider selects the message-broker backend: "nats" | "kafka" | "kinesis". string provider = 1; // deploy_target selects the deployment platform: - // "digitalocean.app_platform" | "aws.ecs" | "aws.eks" | "kubernetes". + // "digitalocean.app_platform" | "aws.ecs" | "aws.eks" | "kubernetes" | etc. string deploy_target = 2; // version is the broker version to deploy (e.g. "2.10" for NATS). 
string version = 3; @@ -46,11 +102,10 @@ message KafkaConfig { // bootstrap_servers is the initial broker address list (comma-separated). // Used when connecting to an existing Kafka cluster rather than provisioning one. string bootstrap_servers = 1; - // security_protocol specifies the protocol for client-broker communication: - // "PLAINTEXT" | "SSL" | "SASL_PLAINTEXT" | "SASL_SSL". - string security_protocol = 2; - // sasl_mechanism for SASL authentication: "PLAIN" | "SCRAM-SHA-256" | "SCRAM-SHA-512". - string sasl_mechanism = 3; + // security_protocol specifies the protocol for client-broker communication. + KafkaSecurityProtocol security_protocol = 2; + // sasl_mechanism for SASL authentication. + KafkaSaslMechanism sasl_mechanism = 3; // default_replication_factor for new topics (0 = use broker default). int32 default_replication_factor = 4; // min_insync_replicas is the minimum number of in-sync replicas required for acks. @@ -83,8 +138,8 @@ message StreamConfig { string name = 1; // subjects lists the NATS subject filter patterns bound to this stream. repeated string subjects = 2; - // retention_policy controls message retention: "limits" | "interest" | "workqueue". - string retention_policy = 3; + // retention_policy controls message retention semantics. + RetentionPolicy retention_policy = 3; // num_replicas is the number of stream replicas within the cluster. int32 num_replicas = 4; // max_bytes caps total on-disk storage for this stream (0 = unlimited). @@ -104,12 +159,10 @@ message ConsumerConfig { string stream_name = 2; // filter_subject narrows delivery to messages matching this subject pattern. string filter_subject = 3; - // deliver_policy controls which messages are delivered on first attach: - // "all" | "last" | "new" | "by_start_sequence" | "by_start_time". - string deliver_policy = 4; - // ack_policy controls how acknowledgements are required: - // "explicit" | "none" | "all". 
- string ack_policy = 5; + // deliver_policy controls which messages are delivered on first attach. + DeliverPolicy deliver_policy = 4; + // ack_policy controls how acknowledgements are required. + AckPolicy ack_policy = 5; // max_deliver is the maximum number of delivery attempts before NACKing (0 = unlimited). int32 max_deliver = 6; } From 5626862098ba16baea92cb76076c4c4fd821b0f5 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sun, 3 May 2026 18:14:16 -0400 Subject: [PATCH 5/8] =?UTF-8?q?fix(task16+17):=20code-review=20fixes=20?= =?UTF-8?q?=E2=80=94=206=20issues=20addressed?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Task 16: - cmd/main.go: add var version = "dev" so -X main.version ldflags stamps correctly; improve panic message with repo URL - gen/proto_smoke_test.go: 3 round-trip tests (ClusterConfig, ConsumerConfig enums, StreamConfig RetentionPolicy) — CI test job is no longer vacuously green - Makefile: pin proto-gen comment to protoc v7.34.1 + protoc-gen-go v1.36.11 with upgrade warning Task 17: - providers/provider.go: HealthStatus typed string (healthy/degraded/unreachable); HealthCheck.Error → Err; Provider.HealthCheck → Probe (avoids method/type collision) - iac/iac.go: Output struct with Sensitive bool enforces sensitivity per design §Security; State.Outputs map[string]Output; Resource gains Properties map[string]string for downstream IaC plugin configuration (e.g. 
image, shard_count, storage_size) - providers/provider_test.go: edge-case tests for empty provider + wrong-case providers go build ./...: exit 0 | go test ./...: 6 packages PASS (29 subtests) | wfctl: OK Co-Authored-By: Claude Sonnet 4.6 --- Makefile | 7 ++- cmd/workflow-plugin-eventbus/main.go | 5 +- gen/proto_smoke_test.go | 83 ++++++++++++++++++++++++++++ iac/iac.go | 35 +++++++++--- providers/provider.go | 29 +++++++--- providers/provider_test.go | 23 ++++++++ 6 files changed, 162 insertions(+), 20 deletions(-) create mode 100644 gen/proto_smoke_test.go diff --git a/Makefile b/Makefile index 3c0dd86..3cd5cec 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,10 @@ .PHONY: proto-gen build test vet -# Regenerate Go bindings from proto/eventbus.proto. -# Requires: protoc + protoc-gen-go (go install google.golang.org/protobuf/cmd/protoc-gen-go@latest) +# proto-gen regenerates gen/eventbus.pb.go from proto/eventbus.proto. +# Requires: protoc v7.34.1 and protoc-gen-go v1.36.11 +# go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.11 +# WARNING: running with a different protoc/protoc-gen-go version will produce +# a different gen/eventbus.pb.go — commit only if intentionally upgrading. proto-gen: protoc \ --proto_path=proto \ diff --git a/cmd/workflow-plugin-eventbus/main.go b/cmd/workflow-plugin-eventbus/main.go index c82665c..2d24c58 100644 --- a/cmd/workflow-plugin-eventbus/main.go +++ b/cmd/workflow-plugin-eventbus/main.go @@ -5,7 +5,10 @@ // Status: pre-pilot scaffold — provider implementations are in progress. package main +// version is stamped at release time via goreleaser ldflags (-X main.version=). +var version = "dev" + func main() { // TODO(Task 17): wire sdk.Serve(plugin.New()) once Provider interface is implemented. 
- panic("workflow-plugin-eventbus: not yet implemented — scaffold only") + panic("workflow-plugin-eventbus: provider implementation pending — scaffold only; see github.com/GoCodeAlone/workflow-plugin-eventbus") } diff --git a/gen/proto_smoke_test.go b/gen/proto_smoke_test.go new file mode 100644 index 0000000..1c44d9d --- /dev/null +++ b/gen/proto_smoke_test.go @@ -0,0 +1,83 @@ +package eventbusv1_test + +import ( + "testing" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "google.golang.org/protobuf/proto" +) + +// TestClusterConfigRoundTrip verifies that ClusterConfig survives a proto +// marshal → unmarshal cycle without data loss. This keeps the CI test job +// non-vacuous even before provider implementations exist. +func TestClusterConfigRoundTrip(t *testing.T) { + orig := &eventbusv1.ClusterConfig{ + Provider: "nats", + DeployTarget: "kubernetes", + Replicas: 3, + Jetstream: &eventbusv1.JetStreamConfig{ + Enabled: true, + MaxStorageBytes: 53687091200, + }, + } + b, err := proto.Marshal(orig) + if err != nil { + t.Fatalf("marshal: %v", err) + } + got := &eventbusv1.ClusterConfig{} + if err := proto.Unmarshal(b, got); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if got.Provider != orig.Provider || got.Replicas != orig.Replicas || got.DeployTarget != orig.DeployTarget { + t.Errorf("round-trip mismatch: got %v, want %v", got, orig) + } + if got.Jetstream == nil || got.Jetstream.Enabled != orig.Jetstream.Enabled { + t.Errorf("jetstream round-trip mismatch") + } +} + +// TestConsumerConfigEnums verifies that typed enum fields survive a round-trip +// and are not silently zeroed out. 
+func TestConsumerConfigEnums(t *testing.T) { + orig := &eventbusv1.ConsumerConfig{ + Name: "bmw-handler", + StreamName: "BMW_FULFILLMENT", + DeliverPolicy: eventbusv1.DeliverPolicy_DELIVER_POLICY_NEW, + AckPolicy: eventbusv1.AckPolicy_ACK_POLICY_EXPLICIT, + MaxDeliver: 5, + } + b, err := proto.Marshal(orig) + if err != nil { + t.Fatalf("marshal: %v", err) + } + got := &eventbusv1.ConsumerConfig{} + if err := proto.Unmarshal(b, got); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if got.DeliverPolicy != orig.DeliverPolicy { + t.Errorf("deliver_policy: got %v, want %v", got.DeliverPolicy, orig.DeliverPolicy) + } + if got.AckPolicy != orig.AckPolicy { + t.Errorf("ack_policy: got %v, want %v", got.AckPolicy, orig.AckPolicy) + } +} + +// TestStreamConfigRetentionPolicy verifies RetentionPolicy enum round-trip. +func TestStreamConfigRetentionPolicy(t *testing.T) { + orig := &eventbusv1.StreamConfig{ + Name: "BMW_FULFILLMENT", + RetentionPolicy: eventbusv1.RetentionPolicy_RETENTION_POLICY_WORKQUEUE, + NumReplicas: 3, + } + b, err := proto.Marshal(orig) + if err != nil { + t.Fatalf("marshal: %v", err) + } + got := &eventbusv1.StreamConfig{} + if err := proto.Unmarshal(b, got); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if got.RetentionPolicy != orig.RetentionPolicy { + t.Errorf("retention_policy: got %v, want %v", got.RetentionPolicy, orig.RetentionPolicy) + } +} diff --git a/iac/iac.go b/iac/iac.go index 98ec406..10adfdc 100644 --- a/iac/iac.go +++ b/iac/iac.go @@ -12,21 +12,38 @@ type Resource struct { Kind string // Name is the resource instance name, unique within the deployment environment. Name string - // Labels are metadata key/value pairs attached to the resource (e.g. "env": "prod"). + // Properties carries resource-type-specific configuration key/value pairs. + // Schema per Kind is defined by the downstream IaC plugin that realizes this resource. + // Examples: image, replicas, storage_size, port, shard_count. 
+ Properties map[string]string + // Labels are metadata tags attached to the resource (e.g. "env": "prod", "team": "bmw"). + // These are not config — they are used for filtering, cost allocation, and observability. Labels map[string]string } +// Output is a single state value with explicit sensitivity marking. +// Sensitive outputs (connection strings, credentials, ARNs with embedded secrets) +// must be marked Sensitive: true to prevent accidental logging in plan output. +type Output struct { + // Value is the resolved output string. + Value string + // Sensitive marks outputs that must not appear in plain-text plan/log output. + // The workflow engine suppresses sensitive outputs unless --show-sensitive is passed. + Sensitive bool +} + // State holds resolved outputs from previously provisioned resources. -// All values are typed strings (connection strings, ARNs, cluster IDs, etc.). -// Sensitive outputs (connection strings with credentials) are marked as such -// by convention: keys ending in "_uri" or "_dsn" are treated as sensitive. +// Each output is explicitly typed and sensitivity-marked. type State struct { - // Outputs maps output key to value, e.g. "uri" → "nats://host:4222". - Outputs map[string]string + // Outputs maps output key to a typed, sensitivity-marked Output value. + // Example: "uri" → Output{Value: "nats://host:4222", Sensitive: true} + Outputs map[string]Output } -// Output returns the state output for key, and whether it was present. +// Output returns the value for key and whether it was present. +// The Sensitive flag is not surfaced here — callers that need it should read +// s.Outputs[key] directly. 
func (s State) Output(key string) (string, bool) { - v, ok := s.Outputs[key] - return v, ok + out, ok := s.Outputs[key] + return out.Value, ok } diff --git a/providers/provider.go b/providers/provider.go index 6232a8d..71ee999 100644 --- a/providers/provider.go +++ b/providers/provider.go @@ -5,18 +5,31 @@ package providers import ( - "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" ) -// HealthCheck represents the result of a liveness probe against an event-bus URI. +// HealthStatus is the typed health state returned by Provider.Probe. +type HealthStatus string + +const ( + // HealthStatusHealthy indicates the broker is reachable and fully operational. + HealthStatusHealthy HealthStatus = "healthy" + // HealthStatusDegraded indicates the broker is reachable but impaired + // (e.g. a replica is down, JetStream is slow). + HealthStatusDegraded HealthStatus = "degraded" + // HealthStatusUnreachable indicates the broker did not respond within the probe timeout. + HealthStatusUnreachable HealthStatus = "unreachable" +) + +// HealthCheck is the result of a liveness probe against an event-bus URI. type HealthCheck struct { // URI is the address that was probed. URI string - // Status is the health state: "healthy", "degraded", or "unreachable". - Status string - // Error is non-nil when Status is "degraded" or "unreachable". - Error error + // Status is the typed health state. + Status HealthStatus + // Err is non-nil when Status is HealthStatusDegraded or HealthStatusUnreachable. + Err error } // Provider is the interface all event-bus provider adapters must implement. @@ -43,6 +56,6 @@ type Provider interface { // (represented by state). StreamResources(streams []eventbusv1.StreamConfig, state iac.State) ([]iac.Resource, error) - // HealthCheck probes the event-bus cluster at uri and returns its health state. 
- HealthCheck(uri string) HealthCheck + // Probe probes the event-bus cluster at uri and returns its health state. + Probe(uri string) HealthCheck } diff --git a/providers/provider_test.go b/providers/provider_test.go index 4b6106d..e1a1c9b 100644 --- a/providers/provider_test.go +++ b/providers/provider_test.go @@ -74,3 +74,26 @@ func TestValidateProviderTarget_UnknownProvider(t *testing.T) { t.Error("expected error for unknown provider 'rabbitmq', got nil") } } + +// TestValidateProviderTarget_EdgeCases covers inputs that callers might pass +// accidentally: empty string and wrong case. +func TestValidateProviderTarget_EdgeCases(t *testing.T) { + cases := []struct { + name string + provider string + target providers.DeployTarget + }{ + {"empty provider", "", providers.TargetDigitalOceanApp}, + {"uppercase NATS", "NATS", providers.TargetDigitalOceanApp}, + {"mixed case Kafka", "Kafka", providers.TargetAWSManagedKafka}, + {"uppercase KINESIS", "KINESIS", providers.TargetAWSKinesis}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := providers.ValidateProviderTarget(tc.provider, tc.target) + if err == nil { + t.Errorf("expected error for provider %q, got nil", tc.provider) + } + }) + } +} From dbc6a640ec0d3158bd168fc33a1c4a0cee70ac1c Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sun, 3 May 2026 18:16:22 -0400 Subject: [PATCH 6/8] fix(task16): update README enum names to match typed proto enums MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit retention_policy: limits → RETENTION_POLICY_LIMITS (line 41) ack_policy: explicit → ACK_POLICY_EXPLICIT (lines 50, 75) Co-Authored-By: Claude Sonnet 4.6 --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 0ce21e8..e00344d 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ modules: config: name: MY_EVENTS subjects: ["events.>"] - retention_policy: limits + retention_policy: 
RETENTION_POLICY_LIMITS max_bytes: 10737418240 # 10 GB - name: my-consumer @@ -47,7 +47,7 @@ modules: stream_name: MY_EVENTS name: my-handler filter_subject: "events.>" - ack_policy: explicit + ack_policy: ACK_POLICY_EXPLICIT max_deliver: 5 ``` @@ -72,7 +72,7 @@ my-handler: stream_name: MY_EVENTS name: my-handler filter_subject: "events.>" - ack_policy: explicit + ack_policy: ACK_POLICY_EXPLICIT steps: - name: ack type: step.eventbus.ack From fd9969e01c235b75146906530a218a62b6db4790 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sun, 3 May 2026 18:19:51 -0400 Subject: [PATCH 7/8] feat(providers): add kafka + kinesis stub registrations (real impls deferred per manifest out-of-scope) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - providers/kafka/kafka.go: stub Provider — all methods return errNotImplemented; Probe returns HealthStatusUnreachable. Fails fast on config errors rather than panicking. - providers/kinesis/kinesis.go: same shape, kinesis-specific error message. 
- providers/kafka/kafka_test.go: 5 tests (Name, Resources/ConnectionString/StreamResources error, Probe unreachable) - providers/kinesis/kinesis_test.go: same 5 tests TDD regression invariant: revert→FAIL (4 failures), restore→PASS go build ./...: exit 0 | go test ./...: 4 packages PASS (39 tests total) Co-Authored-By: Claude Sonnet 4.6 --- providers/kafka/kafka.go | 54 ++++++++++++++++++++++++ providers/kafka/kafka_test.go | 68 +++++++++++++++++++++++++++++++ providers/kinesis/kinesis.go | 54 ++++++++++++++++++++++++ providers/kinesis/kinesis_test.go | 68 +++++++++++++++++++++++++++++++ 4 files changed, 244 insertions(+) create mode 100644 providers/kafka/kafka.go create mode 100644 providers/kafka/kafka_test.go create mode 100644 providers/kinesis/kinesis.go create mode 100644 providers/kinesis/kinesis_test.go diff --git a/providers/kafka/kafka.go b/providers/kafka/kafka.go new file mode 100644 index 0000000..047952d --- /dev/null +++ b/providers/kafka/kafka.go @@ -0,0 +1,54 @@ +// Package kafka provides a stub implementation of the kafka event-bus Provider. +// +// Per pilot manifest out-of-scope: "DO Managed Kafka and AWS Kinesis as eventbus +// providers active for pilot — built into plugin but not activated; NATS only." +// This stub registers the kafka provider in the registry so that config referencing +// provider: kafka fails fast with a clear error rather than panicking or silently +// doing nothing. Real implementation lands when a downstream consumer activates it. +package kafka + +import ( + "errors" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" +) + +// errNotImplemented is the canonical error returned by all stub methods. +var errNotImplemented = errors.New("kafka provider not implemented for pilot; register a real implementation when activating") + +// provider is the stub kafka Provider. 
+type provider struct{} + +// New returns the stub kafka Provider. +func New() providers.Provider { + return &provider{} +} + +// Name implements providers.Provider. +func (p *provider) Name() string { return "kafka" } + +// Resources implements providers.Provider — stub, always errors. +func (p *provider) Resources(_ eventbusv1.ClusterConfig, _ providers.DeployTarget) ([]iac.Resource, error) { + return nil, errNotImplemented +} + +// ConnectionString implements providers.Provider — stub, always errors. +func (p *provider) ConnectionString(_ iac.State, _ string) (string, error) { + return "", errNotImplemented +} + +// StreamResources implements providers.Provider — stub, always errors. +func (p *provider) StreamResources(_ []eventbusv1.StreamConfig, _ iac.State) ([]iac.Resource, error) { + return nil, errNotImplemented +} + +// Probe implements providers.Provider — stub, always returns unreachable. +func (p *provider) Probe(uri string) providers.HealthCheck { + return providers.HealthCheck{ + URI: uri, + Status: providers.HealthStatusUnreachable, + Err: errNotImplemented, + } +} diff --git a/providers/kafka/kafka_test.go b/providers/kafka/kafka_test.go new file mode 100644 index 0000000..1fdca66 --- /dev/null +++ b/providers/kafka/kafka_test.go @@ -0,0 +1,68 @@ +package kafka_test + +import ( + "strings" + "testing" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers/kafka" +) + +// TestKafkaProvider_Name asserts the provider reports the correct identifier. +func TestKafkaProvider_Name(t *testing.T) { + p := kafka.New() + if p.Name() != "kafka" { + t.Errorf("Name() = %q, want %q", p.Name(), "kafka") + } +} + +// TestKafkaProvider_Stub_ErrorsOnResources asserts that Resources returns a +// "not implemented" error — the kafka provider is a registry stub for the pilot. 
+func TestKafkaProvider_Stub_ErrorsOnResources(t *testing.T) { + p := kafka.New() + _, err := p.Resources(eventbusv1.ClusterConfig{}, providers.TargetDigitalOceanManagedKafka) + if err == nil { + t.Fatal("expected stub error from Resources, got nil") + } + if !strings.Contains(err.Error(), "not implemented") { + t.Errorf("error does not contain 'not implemented': %v", err) + } +} + +// TestKafkaProvider_Stub_ErrorsOnConnectionString asserts ConnectionString also stubs. +func TestKafkaProvider_Stub_ErrorsOnConnectionString(t *testing.T) { + p := kafka.New() + _, err := p.ConnectionString(iac.State{}, "prod") + if err == nil { + t.Fatal("expected stub error from ConnectionString, got nil") + } + if !strings.Contains(err.Error(), "not implemented") { + t.Errorf("error does not contain 'not implemented': %v", err) + } +} + +// TestKafkaProvider_Stub_ErrorsOnStreamResources asserts StreamResources also stubs. +func TestKafkaProvider_Stub_ErrorsOnStreamResources(t *testing.T) { + p := kafka.New() + _, err := p.StreamResources(nil, iac.State{}) + if err == nil { + t.Fatal("expected stub error from StreamResources, got nil") + } + if !strings.Contains(err.Error(), "not implemented") { + t.Errorf("error does not contain 'not implemented': %v", err) + } +} + +// TestKafkaProvider_Stub_ProbeReturnsUnreachable asserts Probe returns unreachable. +func TestKafkaProvider_Stub_ProbeReturnsUnreachable(t *testing.T) { + p := kafka.New() + hc := p.Probe("kafka://localhost:9092") + if hc.Status != providers.HealthStatusUnreachable { + t.Errorf("Probe() status = %q, want %q", hc.Status, providers.HealthStatusUnreachable) + } + if hc.Err == nil { + t.Error("Probe() Err should be non-nil for stub") + } +} diff --git a/providers/kinesis/kinesis.go b/providers/kinesis/kinesis.go new file mode 100644 index 0000000..8948731 --- /dev/null +++ b/providers/kinesis/kinesis.go @@ -0,0 +1,54 @@ +// Package kinesis provides a stub implementation of the kinesis event-bus Provider. 
+// +// Per pilot manifest out-of-scope: "DO Managed Kafka and AWS Kinesis as eventbus +// providers active for pilot — built into plugin but not activated; NATS only." +// This stub registers the kinesis provider so that config referencing provider: kinesis +// fails fast with a clear error. Real implementation lands when a downstream consumer +// activates it. +package kinesis + +import ( + "errors" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" +) + +// errNotImplemented is the canonical error returned by all stub methods. +var errNotImplemented = errors.New("kinesis provider not implemented for pilot; register a real implementation when activating") + +// provider is the stub kinesis Provider. +type provider struct{} + +// New returns the stub kinesis Provider. +func New() providers.Provider { + return &provider{} +} + +// Name implements providers.Provider. +func (p *provider) Name() string { return "kinesis" } + +// Resources implements providers.Provider — stub, always errors. +func (p *provider) Resources(_ eventbusv1.ClusterConfig, _ providers.DeployTarget) ([]iac.Resource, error) { + return nil, errNotImplemented +} + +// ConnectionString implements providers.Provider — stub, always errors. +func (p *provider) ConnectionString(_ iac.State, _ string) (string, error) { + return "", errNotImplemented +} + +// StreamResources implements providers.Provider — stub, always errors. +func (p *provider) StreamResources(_ []eventbusv1.StreamConfig, _ iac.State) ([]iac.Resource, error) { + return nil, errNotImplemented +} + +// Probe implements providers.Provider — stub, always returns unreachable. 
+func (p *provider) Probe(uri string) providers.HealthCheck { + return providers.HealthCheck{ + URI: uri, + Status: providers.HealthStatusUnreachable, + Err: errNotImplemented, + } +} diff --git a/providers/kinesis/kinesis_test.go b/providers/kinesis/kinesis_test.go new file mode 100644 index 0000000..afbd630 --- /dev/null +++ b/providers/kinesis/kinesis_test.go @@ -0,0 +1,68 @@ +package kinesis_test + +import ( + "strings" + "testing" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers/kinesis" +) + +// TestKinesisProvider_Name asserts the provider reports the correct identifier. +func TestKinesisProvider_Name(t *testing.T) { + p := kinesis.New() + if p.Name() != "kinesis" { + t.Errorf("Name() = %q, want %q", p.Name(), "kinesis") + } +} + +// TestKinesisProvider_Stub_ErrorsOnResources asserts that Resources returns a +// "not implemented" error — the kinesis provider is a registry stub for the pilot. +func TestKinesisProvider_Stub_ErrorsOnResources(t *testing.T) { + p := kinesis.New() + _, err := p.Resources(eventbusv1.ClusterConfig{}, providers.TargetAWSKinesis) + if err == nil { + t.Fatal("expected stub error from Resources, got nil") + } + if !strings.Contains(err.Error(), "not implemented") { + t.Errorf("error does not contain 'not implemented': %v", err) + } +} + +// TestKinesisProvider_Stub_ErrorsOnConnectionString asserts ConnectionString also stubs. 
+func TestKinesisProvider_Stub_ErrorsOnConnectionString(t *testing.T) { + p := kinesis.New() + _, err := p.ConnectionString(iac.State{}, "prod") + if err == nil { + t.Fatal("expected stub error from ConnectionString, got nil") + } + if !strings.Contains(err.Error(), "not implemented") { + t.Errorf("error does not contain 'not implemented': %v", err) + } +} + +// TestKinesisProvider_Stub_ErrorsOnStreamResources asserts StreamResources also stubs. +func TestKinesisProvider_Stub_ErrorsOnStreamResources(t *testing.T) { + p := kinesis.New() + _, err := p.StreamResources(nil, iac.State{}) + if err == nil { + t.Fatal("expected stub error from StreamResources, got nil") + } + if !strings.Contains(err.Error(), "not implemented") { + t.Errorf("error does not contain 'not implemented': %v", err) + } +} + +// TestKinesisProvider_Stub_ProbeReturnsUnreachable asserts Probe returns unreachable. +func TestKinesisProvider_Stub_ProbeReturnsUnreachable(t *testing.T) { + p := kinesis.New() + hc := p.Probe("kinesis://us-east-1") + if hc.Status != providers.HealthStatusUnreachable { + t.Errorf("Probe() status = %q, want %q", hc.Status, providers.HealthStatusUnreachable) + } + if hc.Err == nil { + t.Error("Probe() Err should be non-nil for stub") + } +} From 6d2bac727ad1663fc8cebdb2a12789606f11e96c Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sun, 3 May 2026 18:25:12 -0400 Subject: [PATCH 8/8] fix(task18): pass proto messages by pointer to fix go vet lock-by-value proto messages embed protoimpl.MessageState which contains sync.Mutex; passing by value is rejected by go vet as unsafe mutex copy. - providers/provider.go: Resources(cfg *ClusterConfig, ...) and StreamResources(streams []*StreamConfig, ...) - providers/kafka/kafka.go: matching method signatures - providers/kinesis/kinesis.go: matching method signatures - providers/kafka/kafka_test.go: Resources(&ClusterConfig{}, ...) - providers/kinesis/kinesis_test.go: Resources(&ClusterConfig{}, ...) 
go vet ./...: exit 0 | go test ./...: PASS (42 tests, -race) Co-Authored-By: Claude Sonnet 4.6 --- providers/kafka/kafka.go | 4 ++-- providers/kafka/kafka_test.go | 2 +- providers/kinesis/kinesis.go | 4 ++-- providers/kinesis/kinesis_test.go | 2 +- providers/provider.go | 8 +++++--- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/providers/kafka/kafka.go b/providers/kafka/kafka.go index 047952d..0e1d410 100644 --- a/providers/kafka/kafka.go +++ b/providers/kafka/kafka.go @@ -30,7 +30,7 @@ func New() providers.Provider { func (p *provider) Name() string { return "kafka" } // Resources implements providers.Provider — stub, always errors. -func (p *provider) Resources(_ eventbusv1.ClusterConfig, _ providers.DeployTarget) ([]iac.Resource, error) { +func (p *provider) Resources(_ *eventbusv1.ClusterConfig, _ providers.DeployTarget) ([]iac.Resource, error) { return nil, errNotImplemented } @@ -40,7 +40,7 @@ func (p *provider) ConnectionString(_ iac.State, _ string) (string, error) { } // StreamResources implements providers.Provider — stub, always errors. -func (p *provider) StreamResources(_ []eventbusv1.StreamConfig, _ iac.State) ([]iac.Resource, error) { +func (p *provider) StreamResources(_ []*eventbusv1.StreamConfig, _ iac.State) ([]iac.Resource, error) { return nil, errNotImplemented } diff --git a/providers/kafka/kafka_test.go b/providers/kafka/kafka_test.go index 1fdca66..0a6eab1 100644 --- a/providers/kafka/kafka_test.go +++ b/providers/kafka/kafka_test.go @@ -22,7 +22,7 @@ func TestKafkaProvider_Name(t *testing.T) { // "not implemented" error — the kafka provider is a registry stub for the pilot. 
func TestKafkaProvider_Stub_ErrorsOnResources(t *testing.T) { p := kafka.New() - _, err := p.Resources(eventbusv1.ClusterConfig{}, providers.TargetDigitalOceanManagedKafka) + _, err := p.Resources(&eventbusv1.ClusterConfig{}, providers.TargetDigitalOceanManagedKafka) if err == nil { t.Fatal("expected stub error from Resources, got nil") } diff --git a/providers/kinesis/kinesis.go b/providers/kinesis/kinesis.go index 8948731..2d14338 100644 --- a/providers/kinesis/kinesis.go +++ b/providers/kinesis/kinesis.go @@ -30,7 +30,7 @@ func New() providers.Provider { func (p *provider) Name() string { return "kinesis" } // Resources implements providers.Provider — stub, always errors. -func (p *provider) Resources(_ eventbusv1.ClusterConfig, _ providers.DeployTarget) ([]iac.Resource, error) { +func (p *provider) Resources(_ *eventbusv1.ClusterConfig, _ providers.DeployTarget) ([]iac.Resource, error) { return nil, errNotImplemented } @@ -40,7 +40,7 @@ func (p *provider) ConnectionString(_ iac.State, _ string) (string, error) { } // StreamResources implements providers.Provider — stub, always errors. -func (p *provider) StreamResources(_ []eventbusv1.StreamConfig, _ iac.State) ([]iac.Resource, error) { +func (p *provider) StreamResources(_ []*eventbusv1.StreamConfig, _ iac.State) ([]iac.Resource, error) { return nil, errNotImplemented } diff --git a/providers/kinesis/kinesis_test.go b/providers/kinesis/kinesis_test.go index afbd630..660624d 100644 --- a/providers/kinesis/kinesis_test.go +++ b/providers/kinesis/kinesis_test.go @@ -22,7 +22,7 @@ func TestKinesisProvider_Name(t *testing.T) { // "not implemented" error — the kinesis provider is a registry stub for the pilot. 
func TestKinesisProvider_Stub_ErrorsOnResources(t *testing.T) { p := kinesis.New() - _, err := p.Resources(eventbusv1.ClusterConfig{}, providers.TargetAWSKinesis) + _, err := p.Resources(&eventbusv1.ClusterConfig{}, providers.TargetAWSKinesis) if err == nil { t.Fatal("expected stub error from Resources, got nil") } diff --git a/providers/provider.go b/providers/provider.go index 71ee999..4736989 100644 --- a/providers/provider.go +++ b/providers/provider.go @@ -43,9 +43,11 @@ type Provider interface { // Resources returns the IaC resource declarations required to provision // the event-bus cluster described by cfg on the given deploy target. + // cfg is passed as a pointer because proto messages embed sync.Mutex via + // protoimpl.MessageState and must not be copied by value. // Returns an error if the provider × target combination is unsupported // or if cfg is invalid for this provider. - Resources(cfg eventbusv1.ClusterConfig, target DeployTarget) ([]iac.Resource, error) + Resources(cfg *eventbusv1.ClusterConfig, target DeployTarget) ([]iac.Resource, error) // ConnectionString derives the broker connection string from provisioned state. // env selects environment-specific outputs (e.g. "prod", "staging"). @@ -53,8 +55,8 @@ type Provider interface { // StreamResources returns the IaC resource declarations required to // declare the given streams against an already-provisioned cluster - // (represented by state). - StreamResources(streams []eventbusv1.StreamConfig, state iac.State) ([]iac.Resource, error) + // (represented by state). Streams are pointers for the same reason as cfg. + StreamResources(streams []*eventbusv1.StreamConfig, state iac.State) ([]iac.Resource, error) // Probe probes the event-bus cluster at uri and returns its health state. Probe(uri string) HealthCheck