diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..e8c1e92 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,24 @@ +name: Release +on: + push: + tags: + - 'v*' +permissions: + contents: write +jobs: + release: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + with: + fetch-depth: 0 + - uses: actions/setup-go@v6 + with: + go-version-file: go.mod + - uses: goreleaser/goreleaser-action@v7 + with: + distribution: goreleaser + version: '~> v2' + args: release --clean + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..44b39e9 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,31 @@ +name: CI +on: + push: + branches: [main] + pull_request: + branches: [main] +jobs: + test: + runs-on: ubuntu-latest + permissions: + contents: read + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-go@v6 + with: + go-version-file: go.mod + - run: go build ./... + - run: go test ./... -v -race -count=1 + - run: go vet ./... 
+ + wfctl-strict-contracts: + runs-on: ubuntu-latest + permissions: + contents: read + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-go@v6 + with: + go-version-file: go.mod + - name: Validate strict plugin contracts + run: go run github.com/GoCodeAlone/workflow/cmd/wfctl@v0.20.1 plugin validate --file plugin.json --strict-contracts diff --git a/.goreleaser.yaml b/.goreleaser.yaml new file mode 100644 index 0000000..299b24f --- /dev/null +++ b/.goreleaser.yaml @@ -0,0 +1,35 @@ +version: 2 + +before: + hooks: + - go mod tidy + - "sed -i.bak 's/\"version\": \".*\"/\"version\": \"{{ .Version }}\"/' plugin.json && rm -f plugin.json.bak" + +builds: + - id: workflow-plugin-eventbus + main: ./cmd/workflow-plugin-eventbus + binary: workflow-plugin-eventbus + env: + - CGO_ENABLED=0 + goos: + - linux + - darwin + - windows + goarch: + - amd64 + - arm64 + ldflags: + - -s -w -X main.version={{.Version}} + +archives: + - formats: [tar.gz] + name_template: "{{ .ProjectName }}-{{ .Os }}-{{ .Arch }}" + files: + - plugin.json + - LICENSE + +checksum: + name_template: "checksums.txt" + +changelog: + sort: asc diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3cd5cec --- /dev/null +++ b/Makefile @@ -0,0 +1,22 @@ +.PHONY: proto-gen build test vet + +# proto-gen regenerates gen/eventbus.pb.go from proto/eventbus.proto. +# Requires: protoc v7.34.1 and protoc-gen-go v1.36.11 +# go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.11 +# WARNING: running with a different protoc/protoc-gen-go version will produce +# a different gen/eventbus.pb.go — commit only if intentionally upgrading. +proto-gen: + protoc \ + --proto_path=proto \ + --go_out=gen \ + --go_opt=paths=source_relative \ + proto/eventbus.proto + +build: + GOWORK=off go build ./... + +test: + GOWORK=off go test ./... -v -race -count=1 + +vet: + GOWORK=off go vet ./... 
diff --git a/README.md b/README.md new file mode 100644 index 0000000..e00344d --- /dev/null +++ b/README.md @@ -0,0 +1,100 @@ +# workflow-plugin-eventbus + +**Status: pre-pilot scaffold** — Provider implementations are in progress (BMW E2E fulfillment pilot, PR 5). + +A [workflow](https://github.com/GoCodeAlone/workflow) external plugin that provisions durable event-bus clusters as IaC and exposes typed pipeline steps for publish/consume operations. + +## Providers + +| Provider | Deploy targets | +|---|---| +| `nats` | DO App Platform, AWS ECS/EKS, Kubernetes StatefulSet | +| `kafka` | DO Managed Kafka, AWS MSK, Kubernetes (Strimzi) | +| `kinesis` | AWS (Kinesis Data Streams) | + +## Usage + +### Declare a cluster + +```yaml +modules: + - name: my-events + type: infra.eventbus + config: + provider: nats + deploy_target: digitalocean.app_platform + version: "2.10" + replicas: 2 + jetstream: + enabled: true + max_storage_bytes: 53687091200 # 50 GB +``` + +### Declare streams and consumers + +```yaml + - name: my-stream + type: infra.eventbus.stream + config: + name: MY_EVENTS + subjects: ["events.>"] + retention_policy: RETENTION_POLICY_LIMITS + max_bytes: 10737418240 # 10 GB + + - name: my-consumer + type: infra.eventbus.consumer + config: + stream_name: MY_EVENTS + name: my-handler + filter_subject: "events.>" + ack_policy: ACK_POLICY_EXPLICIT + max_deliver: 5 +``` + +### Publish from a pipeline step + +```yaml +steps: + - name: publish + type: step.eventbus.publish + config: + subject: events.created + payload: '{{ toJson .input }}' +``` + +### Subscribe trigger + +```yaml +my-handler: + trigger: + type: trigger.eventbus.subscribe + config: + stream_name: MY_EVENTS + name: my-handler + filter_subject: "events.>" + ack_policy: ACK_POLICY_EXPLICIT + steps: + - name: ack + type: step.eventbus.ack + config: + ack_token: '{{ .nats.message.ack_token }}' +``` + +## Development + +```sh +# Regenerate proto bindings after editing proto/eventbus.proto +make proto-gen + +# 
Build +make build + +# Test +make test +``` + +## Planned providers + +- `nats` — NATS JetStream (in progress) +- `kafka` — stub (in progress) +- `kinesis` — stub (in progress) diff --git a/cmd/workflow-plugin-eventbus/main.go b/cmd/workflow-plugin-eventbus/main.go new file mode 100644 index 0000000..2d24c58 --- /dev/null +++ b/cmd/workflow-plugin-eventbus/main.go @@ -0,0 +1,14 @@ +// Command workflow-plugin-eventbus is a workflow engine external plugin that +// provisions durable event-bus clusters (NATS / Kafka / Kinesis) as IaC and +// exposes typed pipeline steps for publish / consume operations. +// +// Status: pre-pilot scaffold — provider implementations are in progress. +package main + +// version is stamped at release time via goreleaser ldflags (-X main.version=). +var version = "dev" + +func main() { + // TODO(Task 17): wire sdk.Serve(plugin.New()) once Provider interface is implemented. + panic("workflow-plugin-eventbus: provider implementation pending — scaffold only; see github.com/GoCodeAlone/workflow-plugin-eventbus") +} diff --git a/gen/eventbus.pb.go b/gen/eventbus.pb.go new file mode 100644 index 0000000..284d6f6 --- /dev/null +++ b/gen/eventbus.pb.go @@ -0,0 +1,1507 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.11 +// protoc v7.34.1 +// source: eventbus.proto + +package eventbusv1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + durationpb "google.golang.org/protobuf/types/known/durationpb" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// RetentionPolicy controls how messages are retained in a stream. 
+type RetentionPolicy int32 + +const ( + RetentionPolicy_RETENTION_POLICY_UNSPECIFIED RetentionPolicy = 0 + // RETENTION_POLICY_LIMITS retains messages up to configured size / age limits. + RetentionPolicy_RETENTION_POLICY_LIMITS RetentionPolicy = 1 + // RETENTION_POLICY_INTEREST retains messages while at least one consumer is active. + RetentionPolicy_RETENTION_POLICY_INTEREST RetentionPolicy = 2 + // RETENTION_POLICY_WORKQUEUE retains messages until they are acknowledged by one consumer. + RetentionPolicy_RETENTION_POLICY_WORKQUEUE RetentionPolicy = 3 +) + +// Enum value maps for RetentionPolicy. +var ( + RetentionPolicy_name = map[int32]string{ + 0: "RETENTION_POLICY_UNSPECIFIED", + 1: "RETENTION_POLICY_LIMITS", + 2: "RETENTION_POLICY_INTEREST", + 3: "RETENTION_POLICY_WORKQUEUE", + } + RetentionPolicy_value = map[string]int32{ + "RETENTION_POLICY_UNSPECIFIED": 0, + "RETENTION_POLICY_LIMITS": 1, + "RETENTION_POLICY_INTEREST": 2, + "RETENTION_POLICY_WORKQUEUE": 3, + } +) + +func (x RetentionPolicy) Enum() *RetentionPolicy { + p := new(RetentionPolicy) + *p = x + return p +} + +func (x RetentionPolicy) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (RetentionPolicy) Descriptor() protoreflect.EnumDescriptor { + return file_eventbus_proto_enumTypes[0].Descriptor() +} + +func (RetentionPolicy) Type() protoreflect.EnumType { + return &file_eventbus_proto_enumTypes[0] +} + +func (x RetentionPolicy) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use RetentionPolicy.Descriptor instead. +func (RetentionPolicy) EnumDescriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{0} +} + +// DeliverPolicy controls which messages a consumer receives on first attach. +type DeliverPolicy int32 + +const ( + DeliverPolicy_DELIVER_POLICY_UNSPECIFIED DeliverPolicy = 0 + // DELIVER_POLICY_ALL delivers all messages from the beginning of the stream. 
+ DeliverPolicy_DELIVER_POLICY_ALL DeliverPolicy = 1 + // DELIVER_POLICY_LAST delivers only the last message per subject. + DeliverPolicy_DELIVER_POLICY_LAST DeliverPolicy = 2 + // DELIVER_POLICY_NEW delivers only messages published after the consumer is created. + DeliverPolicy_DELIVER_POLICY_NEW DeliverPolicy = 3 + // DELIVER_POLICY_BY_START_SEQUENCE delivers messages starting from a specific sequence. + DeliverPolicy_DELIVER_POLICY_BY_START_SEQUENCE DeliverPolicy = 4 + // DELIVER_POLICY_BY_START_TIME delivers messages starting from a specific time. + DeliverPolicy_DELIVER_POLICY_BY_START_TIME DeliverPolicy = 5 +) + +// Enum value maps for DeliverPolicy. +var ( + DeliverPolicy_name = map[int32]string{ + 0: "DELIVER_POLICY_UNSPECIFIED", + 1: "DELIVER_POLICY_ALL", + 2: "DELIVER_POLICY_LAST", + 3: "DELIVER_POLICY_NEW", + 4: "DELIVER_POLICY_BY_START_SEQUENCE", + 5: "DELIVER_POLICY_BY_START_TIME", + } + DeliverPolicy_value = map[string]int32{ + "DELIVER_POLICY_UNSPECIFIED": 0, + "DELIVER_POLICY_ALL": 1, + "DELIVER_POLICY_LAST": 2, + "DELIVER_POLICY_NEW": 3, + "DELIVER_POLICY_BY_START_SEQUENCE": 4, + "DELIVER_POLICY_BY_START_TIME": 5, + } +) + +func (x DeliverPolicy) Enum() *DeliverPolicy { + p := new(DeliverPolicy) + *p = x + return p +} + +func (x DeliverPolicy) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (DeliverPolicy) Descriptor() protoreflect.EnumDescriptor { + return file_eventbus_proto_enumTypes[1].Descriptor() +} + +func (DeliverPolicy) Type() protoreflect.EnumType { + return &file_eventbus_proto_enumTypes[1] +} + +func (x DeliverPolicy) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use DeliverPolicy.Descriptor instead. +func (DeliverPolicy) EnumDescriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{1} +} + +// AckPolicy controls how consumers must acknowledge messages. 
+type AckPolicy int32 + +const ( + AckPolicy_ACK_POLICY_UNSPECIFIED AckPolicy = 0 + // ACK_POLICY_EXPLICIT requires explicit per-message acknowledgement (recommended). + AckPolicy_ACK_POLICY_EXPLICIT AckPolicy = 1 + // ACK_POLICY_NONE disables acknowledgements entirely (fire-and-forget). + AckPolicy_ACK_POLICY_NONE AckPolicy = 2 + // ACK_POLICY_ALL acknowledges all messages up to and including the specified sequence. + AckPolicy_ACK_POLICY_ALL AckPolicy = 3 +) + +// Enum value maps for AckPolicy. +var ( + AckPolicy_name = map[int32]string{ + 0: "ACK_POLICY_UNSPECIFIED", + 1: "ACK_POLICY_EXPLICIT", + 2: "ACK_POLICY_NONE", + 3: "ACK_POLICY_ALL", + } + AckPolicy_value = map[string]int32{ + "ACK_POLICY_UNSPECIFIED": 0, + "ACK_POLICY_EXPLICIT": 1, + "ACK_POLICY_NONE": 2, + "ACK_POLICY_ALL": 3, + } +) + +func (x AckPolicy) Enum() *AckPolicy { + p := new(AckPolicy) + *p = x + return p +} + +func (x AckPolicy) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (AckPolicy) Descriptor() protoreflect.EnumDescriptor { + return file_eventbus_proto_enumTypes[2].Descriptor() +} + +func (AckPolicy) Type() protoreflect.EnumType { + return &file_eventbus_proto_enumTypes[2] +} + +func (x AckPolicy) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use AckPolicy.Descriptor instead. +func (AckPolicy) EnumDescriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{2} +} + +// KafkaSecurityProtocol selects the client-broker transport security mode. 
+type KafkaSecurityProtocol int32 + +const ( + KafkaSecurityProtocol_KAFKA_SECURITY_PROTOCOL_UNSPECIFIED KafkaSecurityProtocol = 0 + KafkaSecurityProtocol_KAFKA_SECURITY_PROTOCOL_PLAINTEXT KafkaSecurityProtocol = 1 + KafkaSecurityProtocol_KAFKA_SECURITY_PROTOCOL_SSL KafkaSecurityProtocol = 2 + KafkaSecurityProtocol_KAFKA_SECURITY_PROTOCOL_SASL_PLAINTEXT KafkaSecurityProtocol = 3 + KafkaSecurityProtocol_KAFKA_SECURITY_PROTOCOL_SASL_SSL KafkaSecurityProtocol = 4 +) + +// Enum value maps for KafkaSecurityProtocol. +var ( + KafkaSecurityProtocol_name = map[int32]string{ + 0: "KAFKA_SECURITY_PROTOCOL_UNSPECIFIED", + 1: "KAFKA_SECURITY_PROTOCOL_PLAINTEXT", + 2: "KAFKA_SECURITY_PROTOCOL_SSL", + 3: "KAFKA_SECURITY_PROTOCOL_SASL_PLAINTEXT", + 4: "KAFKA_SECURITY_PROTOCOL_SASL_SSL", + } + KafkaSecurityProtocol_value = map[string]int32{ + "KAFKA_SECURITY_PROTOCOL_UNSPECIFIED": 0, + "KAFKA_SECURITY_PROTOCOL_PLAINTEXT": 1, + "KAFKA_SECURITY_PROTOCOL_SSL": 2, + "KAFKA_SECURITY_PROTOCOL_SASL_PLAINTEXT": 3, + "KAFKA_SECURITY_PROTOCOL_SASL_SSL": 4, + } +) + +func (x KafkaSecurityProtocol) Enum() *KafkaSecurityProtocol { + p := new(KafkaSecurityProtocol) + *p = x + return p +} + +func (x KafkaSecurityProtocol) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (KafkaSecurityProtocol) Descriptor() protoreflect.EnumDescriptor { + return file_eventbus_proto_enumTypes[3].Descriptor() +} + +func (KafkaSecurityProtocol) Type() protoreflect.EnumType { + return &file_eventbus_proto_enumTypes[3] +} + +func (x KafkaSecurityProtocol) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use KafkaSecurityProtocol.Descriptor instead. +func (KafkaSecurityProtocol) EnumDescriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{3} +} + +// KafkaSaslMechanism selects the SASL authentication mechanism. 
+type KafkaSaslMechanism int32 + +const ( + KafkaSaslMechanism_KAFKA_SASL_MECHANISM_UNSPECIFIED KafkaSaslMechanism = 0 + KafkaSaslMechanism_KAFKA_SASL_MECHANISM_PLAIN KafkaSaslMechanism = 1 + KafkaSaslMechanism_KAFKA_SASL_MECHANISM_SCRAM_SHA_256 KafkaSaslMechanism = 2 + KafkaSaslMechanism_KAFKA_SASL_MECHANISM_SCRAM_SHA_512 KafkaSaslMechanism = 3 +) + +// Enum value maps for KafkaSaslMechanism. +var ( + KafkaSaslMechanism_name = map[int32]string{ + 0: "KAFKA_SASL_MECHANISM_UNSPECIFIED", + 1: "KAFKA_SASL_MECHANISM_PLAIN", + 2: "KAFKA_SASL_MECHANISM_SCRAM_SHA_256", + 3: "KAFKA_SASL_MECHANISM_SCRAM_SHA_512", + } + KafkaSaslMechanism_value = map[string]int32{ + "KAFKA_SASL_MECHANISM_UNSPECIFIED": 0, + "KAFKA_SASL_MECHANISM_PLAIN": 1, + "KAFKA_SASL_MECHANISM_SCRAM_SHA_256": 2, + "KAFKA_SASL_MECHANISM_SCRAM_SHA_512": 3, + } +) + +func (x KafkaSaslMechanism) Enum() *KafkaSaslMechanism { + p := new(KafkaSaslMechanism) + *p = x + return p +} + +func (x KafkaSaslMechanism) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (KafkaSaslMechanism) Descriptor() protoreflect.EnumDescriptor { + return file_eventbus_proto_enumTypes[4].Descriptor() +} + +func (KafkaSaslMechanism) Type() protoreflect.EnumType { + return &file_eventbus_proto_enumTypes[4] +} + +func (x KafkaSaslMechanism) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use KafkaSaslMechanism.Descriptor instead. +func (KafkaSaslMechanism) EnumDescriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{4} +} + +// ClusterConfig is the typed config for infra.eventbus module. +// provider and deploy_target select the Provider × DeployTarget combination. +type ClusterConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // provider selects the message-broker backend: "nats" | "kafka" | "kinesis". 
+ Provider string `protobuf:"bytes,1,opt,name=provider,proto3" json:"provider,omitempty"` + // deploy_target selects the deployment platform: + // "digitalocean.app_platform" | "aws.ecs" | "aws.eks" | "kubernetes" | etc. + DeployTarget string `protobuf:"bytes,2,opt,name=deploy_target,json=deployTarget,proto3" json:"deploy_target,omitempty"` + // version is the broker version to deploy (e.g. "2.10" for NATS). + Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"` + // replicas is the number of broker replicas. + Replicas int32 `protobuf:"varint,4,opt,name=replicas,proto3" json:"replicas,omitempty"` + // jetstream holds NATS JetStream-specific configuration; ignored for other providers. + Jetstream *JetStreamConfig `protobuf:"bytes,5,opt,name=jetstream,proto3" json:"jetstream,omitempty"` + // kafka holds Kafka-specific cluster configuration; ignored for other providers. + Kafka *KafkaConfig `protobuf:"bytes,6,opt,name=kafka,proto3" json:"kafka,omitempty"` + // kinesis holds Kinesis-specific configuration; ignored for other providers. + Kinesis *KinesisConfig `protobuf:"bytes,7,opt,name=kinesis,proto3" json:"kinesis,omitempty"` + // limits sets compute/storage resource limits for the cluster containers. 
+ Limits *ResourceLimits `protobuf:"bytes,8,opt,name=limits,proto3" json:"limits,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ClusterConfig) Reset() { + *x = ClusterConfig{} + mi := &file_eventbus_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ClusterConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ClusterConfig) ProtoMessage() {} + +func (x *ClusterConfig) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ClusterConfig.ProtoReflect.Descriptor instead. +func (*ClusterConfig) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{0} +} + +func (x *ClusterConfig) GetProvider() string { + if x != nil { + return x.Provider + } + return "" +} + +func (x *ClusterConfig) GetDeployTarget() string { + if x != nil { + return x.DeployTarget + } + return "" +} + +func (x *ClusterConfig) GetVersion() string { + if x != nil { + return x.Version + } + return "" +} + +func (x *ClusterConfig) GetReplicas() int32 { + if x != nil { + return x.Replicas + } + return 0 +} + +func (x *ClusterConfig) GetJetstream() *JetStreamConfig { + if x != nil { + return x.Jetstream + } + return nil +} + +func (x *ClusterConfig) GetKafka() *KafkaConfig { + if x != nil { + return x.Kafka + } + return nil +} + +func (x *ClusterConfig) GetKinesis() *KinesisConfig { + if x != nil { + return x.Kinesis + } + return nil +} + +func (x *ClusterConfig) GetLimits() *ResourceLimits { + if x != nil { + return x.Limits + } + return nil +} + +// JetStreamConfig holds NATS JetStream-specific settings. 
+type JetStreamConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // enabled activates JetStream on the NATS cluster (required for durability). + Enabled bool `protobuf:"varint,1,opt,name=enabled,proto3" json:"enabled,omitempty"` + // max_storage_bytes caps total on-disk storage for all streams (0 = unlimited). + MaxStorageBytes int64 `protobuf:"varint,2,opt,name=max_storage_bytes,json=maxStorageBytes,proto3" json:"max_storage_bytes,omitempty"` + // max_memory_bytes caps in-memory storage (0 = unlimited). + MaxMemoryBytes int64 `protobuf:"varint,3,opt,name=max_memory_bytes,json=maxMemoryBytes,proto3" json:"max_memory_bytes,omitempty"` + // max_age is the default maximum age for messages across all streams. + MaxAge *durationpb.Duration `protobuf:"bytes,4,opt,name=max_age,json=maxAge,proto3" json:"max_age,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *JetStreamConfig) Reset() { + *x = JetStreamConfig{} + mi := &file_eventbus_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *JetStreamConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*JetStreamConfig) ProtoMessage() {} + +func (x *JetStreamConfig) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use JetStreamConfig.ProtoReflect.Descriptor instead. 
+func (*JetStreamConfig) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{1} +} + +func (x *JetStreamConfig) GetEnabled() bool { + if x != nil { + return x.Enabled + } + return false +} + +func (x *JetStreamConfig) GetMaxStorageBytes() int64 { + if x != nil { + return x.MaxStorageBytes + } + return 0 +} + +func (x *JetStreamConfig) GetMaxMemoryBytes() int64 { + if x != nil { + return x.MaxMemoryBytes + } + return 0 +} + +func (x *JetStreamConfig) GetMaxAge() *durationpb.Duration { + if x != nil { + return x.MaxAge + } + return nil +} + +// KafkaConfig holds Kafka-specific cluster settings. +type KafkaConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // bootstrap_servers is the initial broker address list (comma-separated). + // Used when connecting to an existing Kafka cluster rather than provisioning one. + BootstrapServers string `protobuf:"bytes,1,opt,name=bootstrap_servers,json=bootstrapServers,proto3" json:"bootstrap_servers,omitempty"` + // security_protocol specifies the protocol for client-broker communication. + SecurityProtocol KafkaSecurityProtocol `protobuf:"varint,2,opt,name=security_protocol,json=securityProtocol,proto3,enum=workflow.plugin.eventbus.v1.KafkaSecurityProtocol" json:"security_protocol,omitempty"` + // sasl_mechanism for SASL authentication. + SaslMechanism KafkaSaslMechanism `protobuf:"varint,3,opt,name=sasl_mechanism,json=saslMechanism,proto3,enum=workflow.plugin.eventbus.v1.KafkaSaslMechanism" json:"sasl_mechanism,omitempty"` + // default_replication_factor for new topics (0 = use broker default). + DefaultReplicationFactor int32 `protobuf:"varint,4,opt,name=default_replication_factor,json=defaultReplicationFactor,proto3" json:"default_replication_factor,omitempty"` + // min_insync_replicas is the minimum number of in-sync replicas required for acks. 
+ MinInsyncReplicas int32 `protobuf:"varint,5,opt,name=min_insync_replicas,json=minInsyncReplicas,proto3" json:"min_insync_replicas,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *KafkaConfig) Reset() { + *x = KafkaConfig{} + mi := &file_eventbus_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *KafkaConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*KafkaConfig) ProtoMessage() {} + +func (x *KafkaConfig) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use KafkaConfig.ProtoReflect.Descriptor instead. +func (*KafkaConfig) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{2} +} + +func (x *KafkaConfig) GetBootstrapServers() string { + if x != nil { + return x.BootstrapServers + } + return "" +} + +func (x *KafkaConfig) GetSecurityProtocol() KafkaSecurityProtocol { + if x != nil { + return x.SecurityProtocol + } + return KafkaSecurityProtocol_KAFKA_SECURITY_PROTOCOL_UNSPECIFIED +} + +func (x *KafkaConfig) GetSaslMechanism() KafkaSaslMechanism { + if x != nil { + return x.SaslMechanism + } + return KafkaSaslMechanism_KAFKA_SASL_MECHANISM_UNSPECIFIED +} + +func (x *KafkaConfig) GetDefaultReplicationFactor() int32 { + if x != nil { + return x.DefaultReplicationFactor + } + return 0 +} + +func (x *KafkaConfig) GetMinInsyncReplicas() int32 { + if x != nil { + return x.MinInsyncReplicas + } + return 0 +} + +// KinesisConfig holds AWS Kinesis Data Streams-specific settings. +type KinesisConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // region is the AWS region for the Kinesis stream (e.g. "us-east-1"). 
+ Region string `protobuf:"bytes,1,opt,name=region,proto3" json:"region,omitempty"` + // shard_count is the initial number of shards (throughput units). + ShardCount int32 `protobuf:"varint,2,opt,name=shard_count,json=shardCount,proto3" json:"shard_count,omitempty"` + // retention_period_hours is the message retention window (24–8760 hours). + RetentionPeriodHours int32 `protobuf:"varint,3,opt,name=retention_period_hours,json=retentionPeriodHours,proto3" json:"retention_period_hours,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *KinesisConfig) Reset() { + *x = KinesisConfig{} + mi := &file_eventbus_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *KinesisConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*KinesisConfig) ProtoMessage() {} + +func (x *KinesisConfig) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use KinesisConfig.ProtoReflect.Descriptor instead. +func (*KinesisConfig) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{3} +} + +func (x *KinesisConfig) GetRegion() string { + if x != nil { + return x.Region + } + return "" +} + +func (x *KinesisConfig) GetShardCount() int32 { + if x != nil { + return x.ShardCount + } + return 0 +} + +func (x *KinesisConfig) GetRetentionPeriodHours() int32 { + if x != nil { + return x.RetentionPeriodHours + } + return 0 +} + +// ResourceLimits constrains compute and storage for cluster containers. +type ResourceLimits struct { + state protoimpl.MessageState `protogen:"open.v1"` + // cpu is the CPU limit in Kubernetes notation (e.g. "500m", "2"). 
+ Cpu string `protobuf:"bytes,1,opt,name=cpu,proto3" json:"cpu,omitempty"` + // memory is the memory limit (e.g. "512Mi", "2Gi"). + Memory string `protobuf:"bytes,2,opt,name=memory,proto3" json:"memory,omitempty"` + // storage is the persistent-volume size (e.g. "10Gi"). + Storage string `protobuf:"bytes,3,opt,name=storage,proto3" json:"storage,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ResourceLimits) Reset() { + *x = ResourceLimits{} + mi := &file_eventbus_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ResourceLimits) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ResourceLimits) ProtoMessage() {} + +func (x *ResourceLimits) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ResourceLimits.ProtoReflect.Descriptor instead. +func (*ResourceLimits) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{4} +} + +func (x *ResourceLimits) GetCpu() string { + if x != nil { + return x.Cpu + } + return "" +} + +func (x *ResourceLimits) GetMemory() string { + if x != nil { + return x.Memory + } + return "" +} + +func (x *ResourceLimits) GetStorage() string { + if x != nil { + return x.Storage + } + return "" +} + +// StreamConfig is the typed config for infra.eventbus.stream module. +type StreamConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // name is the stream name (e.g. "BMW_FULFILLMENT"). + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // subjects lists the NATS subject filter patterns bound to this stream. 
+ Subjects []string `protobuf:"bytes,2,rep,name=subjects,proto3" json:"subjects,omitempty"` + // retention_policy controls message retention semantics. + RetentionPolicy RetentionPolicy `protobuf:"varint,3,opt,name=retention_policy,json=retentionPolicy,proto3,enum=workflow.plugin.eventbus.v1.RetentionPolicy" json:"retention_policy,omitempty"` + // num_replicas is the number of stream replicas within the cluster. + NumReplicas int32 `protobuf:"varint,4,opt,name=num_replicas,json=numReplicas,proto3" json:"num_replicas,omitempty"` + // max_bytes caps total on-disk storage for this stream (0 = unlimited). + MaxBytes int64 `protobuf:"varint,5,opt,name=max_bytes,json=maxBytes,proto3" json:"max_bytes,omitempty"` + // max_age is the maximum age for messages in this stream. + MaxAge *durationpb.Duration `protobuf:"bytes,6,opt,name=max_age,json=maxAge,proto3" json:"max_age,omitempty"` + // ack_wait is the default acknowledgement deadline for consumers on this stream. + AckWait *durationpb.Duration `protobuf:"bytes,7,opt,name=ack_wait,json=ackWait,proto3" json:"ack_wait,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StreamConfig) Reset() { + *x = StreamConfig{} + mi := &file_eventbus_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StreamConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamConfig) ProtoMessage() {} + +func (x *StreamConfig) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamConfig.ProtoReflect.Descriptor instead. 
+func (*StreamConfig) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{5} +} + +func (x *StreamConfig) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *StreamConfig) GetSubjects() []string { + if x != nil { + return x.Subjects + } + return nil +} + +func (x *StreamConfig) GetRetentionPolicy() RetentionPolicy { + if x != nil { + return x.RetentionPolicy + } + return RetentionPolicy_RETENTION_POLICY_UNSPECIFIED +} + +func (x *StreamConfig) GetNumReplicas() int32 { + if x != nil { + return x.NumReplicas + } + return 0 +} + +func (x *StreamConfig) GetMaxBytes() int64 { + if x != nil { + return x.MaxBytes + } + return 0 +} + +func (x *StreamConfig) GetMaxAge() *durationpb.Duration { + if x != nil { + return x.MaxAge + } + return nil +} + +func (x *StreamConfig) GetAckWait() *durationpb.Duration { + if x != nil { + return x.AckWait + } + return nil +} + +// ConsumerConfig is the typed config for infra.eventbus.consumer module +// and trigger.eventbus.subscribe. +type ConsumerConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + // name is the durable consumer name. + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // stream_name is the stream this consumer attaches to. + StreamName string `protobuf:"bytes,2,opt,name=stream_name,json=streamName,proto3" json:"stream_name,omitempty"` + // filter_subject narrows delivery to messages matching this subject pattern. + FilterSubject string `protobuf:"bytes,3,opt,name=filter_subject,json=filterSubject,proto3" json:"filter_subject,omitempty"` + // deliver_policy controls which messages are delivered on first attach. + DeliverPolicy DeliverPolicy `protobuf:"varint,4,opt,name=deliver_policy,json=deliverPolicy,proto3,enum=workflow.plugin.eventbus.v1.DeliverPolicy" json:"deliver_policy,omitempty"` + // ack_policy controls how acknowledgements are required. 
+ AckPolicy AckPolicy `protobuf:"varint,5,opt,name=ack_policy,json=ackPolicy,proto3,enum=workflow.plugin.eventbus.v1.AckPolicy" json:"ack_policy,omitempty"` + // max_deliver is the maximum number of delivery attempts before NACKing (0 = unlimited). + MaxDeliver int32 `protobuf:"varint,6,opt,name=max_deliver,json=maxDeliver,proto3" json:"max_deliver,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ConsumerConfig) Reset() { + *x = ConsumerConfig{} + mi := &file_eventbus_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ConsumerConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ConsumerConfig) ProtoMessage() {} + +func (x *ConsumerConfig) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ConsumerConfig.ProtoReflect.Descriptor instead. 
+func (*ConsumerConfig) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{6} +} + +func (x *ConsumerConfig) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *ConsumerConfig) GetStreamName() string { + if x != nil { + return x.StreamName + } + return "" +} + +func (x *ConsumerConfig) GetFilterSubject() string { + if x != nil { + return x.FilterSubject + } + return "" +} + +func (x *ConsumerConfig) GetDeliverPolicy() DeliverPolicy { + if x != nil { + return x.DeliverPolicy + } + return DeliverPolicy_DELIVER_POLICY_UNSPECIFIED +} + +func (x *ConsumerConfig) GetAckPolicy() AckPolicy { + if x != nil { + return x.AckPolicy + } + return AckPolicy_ACK_POLICY_UNSPECIFIED +} + +func (x *ConsumerConfig) GetMaxDeliver() int32 { + if x != nil { + return x.MaxDeliver + } + return 0 +} + +// PublishRequest is the input for step.eventbus.publish. +type PublishRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // subject is the NATS subject (or Kafka topic) to publish to. + Subject string `protobuf:"bytes,1,opt,name=subject,proto3" json:"subject,omitempty"` + // payload is the raw message bytes. + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + // headers are optional key/value metadata attached to the message. + Headers map[string]string `protobuf:"bytes,3,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // correlation_id is an opaque identifier for request/reply correlation or tracing. 
+ CorrelationId string `protobuf:"bytes,4,opt,name=correlation_id,json=correlationId,proto3" json:"correlation_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PublishRequest) Reset() { + *x = PublishRequest{} + mi := &file_eventbus_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PublishRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishRequest) ProtoMessage() {} + +func (x *PublishRequest) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishRequest.ProtoReflect.Descriptor instead. +func (*PublishRequest) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{7} +} + +func (x *PublishRequest) GetSubject() string { + if x != nil { + return x.Subject + } + return "" +} + +func (x *PublishRequest) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +func (x *PublishRequest) GetHeaders() map[string]string { + if x != nil { + return x.Headers + } + return nil +} + +func (x *PublishRequest) GetCorrelationId() string { + if x != nil { + return x.CorrelationId + } + return "" +} + +// PublishResponse is the output from step.eventbus.publish. +type PublishResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // sequence is the JetStream stream sequence number; empty for non-JetStream brokers. + Sequence string `protobuf:"bytes,1,opt,name=sequence,proto3" json:"sequence,omitempty"` + // acked_at is the timestamp when the broker acknowledged the message (RFC3339). 
+ AckedAt string `protobuf:"bytes,2,opt,name=acked_at,json=ackedAt,proto3" json:"acked_at,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PublishResponse) Reset() { + *x = PublishResponse{} + mi := &file_eventbus_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PublishResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishResponse) ProtoMessage() {} + +func (x *PublishResponse) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. +func (*PublishResponse) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{8} +} + +func (x *PublishResponse) GetSequence() string { + if x != nil { + return x.Sequence + } + return "" +} + +func (x *PublishResponse) GetAckedAt() string { + if x != nil { + return x.AckedAt + } + return "" +} + +// ConsumeRequest is the input for step.eventbus.consume. +type ConsumeRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // consumer is the durable consumer name to pull from. + Consumer string `protobuf:"bytes,1,opt,name=consumer,proto3" json:"consumer,omitempty"` + // batch_size is the maximum number of messages to return in one call. + BatchSize int32 `protobuf:"varint,2,opt,name=batch_size,json=batchSize,proto3" json:"batch_size,omitempty"` + // max_wait is the maximum time to block waiting for messages. 
+ MaxWait *durationpb.Duration `protobuf:"bytes,3,opt,name=max_wait,json=maxWait,proto3" json:"max_wait,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ConsumeRequest) Reset() { + *x = ConsumeRequest{} + mi := &file_eventbus_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ConsumeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ConsumeRequest) ProtoMessage() {} + +func (x *ConsumeRequest) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ConsumeRequest.ProtoReflect.Descriptor instead. +func (*ConsumeRequest) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{9} +} + +func (x *ConsumeRequest) GetConsumer() string { + if x != nil { + return x.Consumer + } + return "" +} + +func (x *ConsumeRequest) GetBatchSize() int32 { + if x != nil { + return x.BatchSize + } + return 0 +} + +func (x *ConsumeRequest) GetMaxWait() *durationpb.Duration { + if x != nil { + return x.MaxWait + } + return nil +} + +// ConsumeResponse is the output from step.eventbus.consume. +type ConsumeResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // messages contains the delivered messages (up to batch_size). 
+ Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ConsumeResponse) Reset() { + *x = ConsumeResponse{} + mi := &file_eventbus_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ConsumeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ConsumeResponse) ProtoMessage() {} + +func (x *ConsumeResponse) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ConsumeResponse.ProtoReflect.Descriptor instead. +func (*ConsumeResponse) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{10} +} + +func (x *ConsumeResponse) GetMessages() []*Message { + if x != nil { + return x.Messages + } + return nil +} + +// Message represents a single delivered event-bus message. +type Message struct { + state protoimpl.MessageState `protogen:"open.v1"` + // subject is the NATS subject (or Kafka topic/partition key) the message was published on. + Subject string `protobuf:"bytes,1,opt,name=subject,proto3" json:"subject,omitempty"` + // payload is the raw message bytes. + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + // headers are key/value metadata attached to the message. + Headers map[string]string `protobuf:"bytes,3,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // sequence is the stream sequence number assigned by the broker. + Sequence string `protobuf:"bytes,4,opt,name=sequence,proto3" json:"sequence,omitempty"` + // published_at is the broker-assigned publish timestamp (RFC3339). 
+ PublishedAt string `protobuf:"bytes,5,opt,name=published_at,json=publishedAt,proto3" json:"published_at,omitempty"` + // ack_token is the opaque token used to acknowledge this message via step.eventbus.ack. + AckToken string `protobuf:"bytes,6,opt,name=ack_token,json=ackToken,proto3" json:"ack_token,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Message) Reset() { + *x = Message{} + mi := &file_eventbus_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[11] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{11} +} + +func (x *Message) GetSubject() string { + if x != nil { + return x.Subject + } + return "" +} + +func (x *Message) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +func (x *Message) GetHeaders() map[string]string { + if x != nil { + return x.Headers + } + return nil +} + +func (x *Message) GetSequence() string { + if x != nil { + return x.Sequence + } + return "" +} + +func (x *Message) GetPublishedAt() string { + if x != nil { + return x.PublishedAt + } + return "" +} + +func (x *Message) GetAckToken() string { + if x != nil { + return x.AckToken + } + return "" +} + +// AckRequest is the input for step.eventbus.ack. +type AckRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // ack_token is the opaque acknowledgement token from Message.ack_token. 
+ AckToken string `protobuf:"bytes,1,opt,name=ack_token,json=ackToken,proto3" json:"ack_token,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AckRequest) Reset() { + *x = AckRequest{} + mi := &file_eventbus_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AckRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AckRequest) ProtoMessage() {} + +func (x *AckRequest) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[12] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AckRequest.ProtoReflect.Descriptor instead. +func (*AckRequest) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{12} +} + +func (x *AckRequest) GetAckToken() string { + if x != nil { + return x.AckToken + } + return "" +} + +// AckResponse is the output from step.eventbus.ack. +type AckResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AckResponse) Reset() { + *x = AckResponse{} + mi := &file_eventbus_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AckResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AckResponse) ProtoMessage() {} + +func (x *AckResponse) ProtoReflect() protoreflect.Message { + mi := &file_eventbus_proto_msgTypes[13] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AckResponse.ProtoReflect.Descriptor instead. 
+func (*AckResponse) Descriptor() ([]byte, []int) { + return file_eventbus_proto_rawDescGZIP(), []int{13} +} + +var File_eventbus_proto protoreflect.FileDescriptor + +const file_eventbus_proto_rawDesc = "" + + "\n" + + "\x0eeventbus.proto\x12\x1bworkflow.plugin.eventbus.v1\x1a\x1egoogle/protobuf/duration.proto\"\x9d\x03\n" + + "\rClusterConfig\x12\x1a\n" + + "\bprovider\x18\x01 \x01(\tR\bprovider\x12#\n" + + "\rdeploy_target\x18\x02 \x01(\tR\fdeployTarget\x12\x18\n" + + "\aversion\x18\x03 \x01(\tR\aversion\x12\x1a\n" + + "\breplicas\x18\x04 \x01(\x05R\breplicas\x12J\n" + + "\tjetstream\x18\x05 \x01(\v2,.workflow.plugin.eventbus.v1.JetStreamConfigR\tjetstream\x12>\n" + + "\x05kafka\x18\x06 \x01(\v2(.workflow.plugin.eventbus.v1.KafkaConfigR\x05kafka\x12D\n" + + "\akinesis\x18\a \x01(\v2*.workflow.plugin.eventbus.v1.KinesisConfigR\akinesis\x12C\n" + + "\x06limits\x18\b \x01(\v2+.workflow.plugin.eventbus.v1.ResourceLimitsR\x06limits\"\xb5\x01\n" + + "\x0fJetStreamConfig\x12\x18\n" + + "\aenabled\x18\x01 \x01(\bR\aenabled\x12*\n" + + "\x11max_storage_bytes\x18\x02 \x01(\x03R\x0fmaxStorageBytes\x12(\n" + + "\x10max_memory_bytes\x18\x03 \x01(\x03R\x0emaxMemoryBytes\x122\n" + + "\amax_age\x18\x04 \x01(\v2\x19.google.protobuf.DurationR\x06maxAge\"\xe1\x02\n" + + "\vKafkaConfig\x12+\n" + + "\x11bootstrap_servers\x18\x01 \x01(\tR\x10bootstrapServers\x12_\n" + + "\x11security_protocol\x18\x02 \x01(\x0e22.workflow.plugin.eventbus.v1.KafkaSecurityProtocolR\x10securityProtocol\x12V\n" + + "\x0esasl_mechanism\x18\x03 \x01(\x0e2/.workflow.plugin.eventbus.v1.KafkaSaslMechanismR\rsaslMechanism\x12<\n" + + "\x1adefault_replication_factor\x18\x04 \x01(\x05R\x18defaultReplicationFactor\x12.\n" + + "\x13min_insync_replicas\x18\x05 \x01(\x05R\x11minInsyncReplicas\"~\n" + + "\rKinesisConfig\x12\x16\n" + + "\x06region\x18\x01 \x01(\tR\x06region\x12\x1f\n" + + "\vshard_count\x18\x02 \x01(\x05R\n" + + "shardCount\x124\n" + + "\x16retention_period_hours\x18\x03 
\x01(\x05R\x14retentionPeriodHours\"T\n" + + "\x0eResourceLimits\x12\x10\n" + + "\x03cpu\x18\x01 \x01(\tR\x03cpu\x12\x16\n" + + "\x06memory\x18\x02 \x01(\tR\x06memory\x12\x18\n" + + "\astorage\x18\x03 \x01(\tR\astorage\"\xc1\x02\n" + + "\fStreamConfig\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\x12\x1a\n" + + "\bsubjects\x18\x02 \x03(\tR\bsubjects\x12W\n" + + "\x10retention_policy\x18\x03 \x01(\x0e2,.workflow.plugin.eventbus.v1.RetentionPolicyR\x0fretentionPolicy\x12!\n" + + "\fnum_replicas\x18\x04 \x01(\x05R\vnumReplicas\x12\x1b\n" + + "\tmax_bytes\x18\x05 \x01(\x03R\bmaxBytes\x122\n" + + "\amax_age\x18\x06 \x01(\v2\x19.google.protobuf.DurationR\x06maxAge\x124\n" + + "\back_wait\x18\a \x01(\v2\x19.google.protobuf.DurationR\aackWait\"\xa7\x02\n" + + "\x0eConsumerConfig\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\x12\x1f\n" + + "\vstream_name\x18\x02 \x01(\tR\n" + + "streamName\x12%\n" + + "\x0efilter_subject\x18\x03 \x01(\tR\rfilterSubject\x12Q\n" + + "\x0edeliver_policy\x18\x04 \x01(\x0e2*.workflow.plugin.eventbus.v1.DeliverPolicyR\rdeliverPolicy\x12E\n" + + "\n" + + "ack_policy\x18\x05 \x01(\x0e2&.workflow.plugin.eventbus.v1.AckPolicyR\tackPolicy\x12\x1f\n" + + "\vmax_deliver\x18\x06 \x01(\x05R\n" + + "maxDeliver\"\xfb\x01\n" + + "\x0ePublishRequest\x12\x18\n" + + "\asubject\x18\x01 \x01(\tR\asubject\x12\x18\n" + + "\apayload\x18\x02 \x01(\fR\apayload\x12R\n" + + "\aheaders\x18\x03 \x03(\v28.workflow.plugin.eventbus.v1.PublishRequest.HeadersEntryR\aheaders\x12%\n" + + "\x0ecorrelation_id\x18\x04 \x01(\tR\rcorrelationId\x1a:\n" + + "\fHeadersEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"H\n" + + "\x0fPublishResponse\x12\x1a\n" + + "\bsequence\x18\x01 \x01(\tR\bsequence\x12\x19\n" + + "\backed_at\x18\x02 \x01(\tR\aackedAt\"\x81\x01\n" + + "\x0eConsumeRequest\x12\x1a\n" + + "\bconsumer\x18\x01 \x01(\tR\bconsumer\x12\x1d\n" + + "\n" + + "batch_size\x18\x02 \x01(\x05R\tbatchSize\x124\n" 
+ + "\bmax_wait\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\amaxWait\"S\n" + + "\x0fConsumeResponse\x12@\n" + + "\bmessages\x18\x01 \x03(\v2$.workflow.plugin.eventbus.v1.MessageR\bmessages\"\xa2\x02\n" + + "\aMessage\x12\x18\n" + + "\asubject\x18\x01 \x01(\tR\asubject\x12\x18\n" + + "\apayload\x18\x02 \x01(\fR\apayload\x12K\n" + + "\aheaders\x18\x03 \x03(\v21.workflow.plugin.eventbus.v1.Message.HeadersEntryR\aheaders\x12\x1a\n" + + "\bsequence\x18\x04 \x01(\tR\bsequence\x12!\n" + + "\fpublished_at\x18\x05 \x01(\tR\vpublishedAt\x12\x1b\n" + + "\tack_token\x18\x06 \x01(\tR\backToken\x1a:\n" + + "\fHeadersEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\")\n" + + "\n" + + "AckRequest\x12\x1b\n" + + "\tack_token\x18\x01 \x01(\tR\backToken\"\r\n" + + "\vAckResponse*\x8f\x01\n" + + "\x0fRetentionPolicy\x12 \n" + + "\x1cRETENTION_POLICY_UNSPECIFIED\x10\x00\x12\x1b\n" + + "\x17RETENTION_POLICY_LIMITS\x10\x01\x12\x1d\n" + + "\x19RETENTION_POLICY_INTEREST\x10\x02\x12\x1e\n" + + "\x1aRETENTION_POLICY_WORKQUEUE\x10\x03*\xc0\x01\n" + + "\rDeliverPolicy\x12\x1e\n" + + "\x1aDELIVER_POLICY_UNSPECIFIED\x10\x00\x12\x16\n" + + "\x12DELIVER_POLICY_ALL\x10\x01\x12\x17\n" + + "\x13DELIVER_POLICY_LAST\x10\x02\x12\x16\n" + + "\x12DELIVER_POLICY_NEW\x10\x03\x12$\n" + + " DELIVER_POLICY_BY_START_SEQUENCE\x10\x04\x12 \n" + + "\x1cDELIVER_POLICY_BY_START_TIME\x10\x05*i\n" + + "\tAckPolicy\x12\x1a\n" + + "\x16ACK_POLICY_UNSPECIFIED\x10\x00\x12\x17\n" + + "\x13ACK_POLICY_EXPLICIT\x10\x01\x12\x13\n" + + "\x0fACK_POLICY_NONE\x10\x02\x12\x12\n" + + "\x0eACK_POLICY_ALL\x10\x03*\xda\x01\n" + + "\x15KafkaSecurityProtocol\x12'\n" + + "#KAFKA_SECURITY_PROTOCOL_UNSPECIFIED\x10\x00\x12%\n" + + "!KAFKA_SECURITY_PROTOCOL_PLAINTEXT\x10\x01\x12\x1f\n" + + "\x1bKAFKA_SECURITY_PROTOCOL_SSL\x10\x02\x12*\n" + + "&KAFKA_SECURITY_PROTOCOL_SASL_PLAINTEXT\x10\x03\x12$\n" + + " KAFKA_SECURITY_PROTOCOL_SASL_SSL\x10\x04*\xaa\x01\n" + + 
"\x12KafkaSaslMechanism\x12$\n" + + " KAFKA_SASL_MECHANISM_UNSPECIFIED\x10\x00\x12\x1e\n" + + "\x1aKAFKA_SASL_MECHANISM_PLAIN\x10\x01\x12&\n" + + "\"KAFKA_SASL_MECHANISM_SCRAM_SHA_256\x10\x02\x12&\n" + + "\"KAFKA_SASL_MECHANISM_SCRAM_SHA_512\x10\x03B@Z>github.com/GoCodeAlone/workflow-plugin-eventbus/gen;eventbusv1b\x06proto3" + +var ( + file_eventbus_proto_rawDescOnce sync.Once + file_eventbus_proto_rawDescData []byte +) + +func file_eventbus_proto_rawDescGZIP() []byte { + file_eventbus_proto_rawDescOnce.Do(func() { + file_eventbus_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_eventbus_proto_rawDesc), len(file_eventbus_proto_rawDesc))) + }) + return file_eventbus_proto_rawDescData +} + +var file_eventbus_proto_enumTypes = make([]protoimpl.EnumInfo, 5) +var file_eventbus_proto_msgTypes = make([]protoimpl.MessageInfo, 16) +var file_eventbus_proto_goTypes = []any{ + (RetentionPolicy)(0), // 0: workflow.plugin.eventbus.v1.RetentionPolicy + (DeliverPolicy)(0), // 1: workflow.plugin.eventbus.v1.DeliverPolicy + (AckPolicy)(0), // 2: workflow.plugin.eventbus.v1.AckPolicy + (KafkaSecurityProtocol)(0), // 3: workflow.plugin.eventbus.v1.KafkaSecurityProtocol + (KafkaSaslMechanism)(0), // 4: workflow.plugin.eventbus.v1.KafkaSaslMechanism + (*ClusterConfig)(nil), // 5: workflow.plugin.eventbus.v1.ClusterConfig + (*JetStreamConfig)(nil), // 6: workflow.plugin.eventbus.v1.JetStreamConfig + (*KafkaConfig)(nil), // 7: workflow.plugin.eventbus.v1.KafkaConfig + (*KinesisConfig)(nil), // 8: workflow.plugin.eventbus.v1.KinesisConfig + (*ResourceLimits)(nil), // 9: workflow.plugin.eventbus.v1.ResourceLimits + (*StreamConfig)(nil), // 10: workflow.plugin.eventbus.v1.StreamConfig + (*ConsumerConfig)(nil), // 11: workflow.plugin.eventbus.v1.ConsumerConfig + (*PublishRequest)(nil), // 12: workflow.plugin.eventbus.v1.PublishRequest + (*PublishResponse)(nil), // 13: workflow.plugin.eventbus.v1.PublishResponse + (*ConsumeRequest)(nil), // 14: 
workflow.plugin.eventbus.v1.ConsumeRequest + (*ConsumeResponse)(nil), // 15: workflow.plugin.eventbus.v1.ConsumeResponse + (*Message)(nil), // 16: workflow.plugin.eventbus.v1.Message + (*AckRequest)(nil), // 17: workflow.plugin.eventbus.v1.AckRequest + (*AckResponse)(nil), // 18: workflow.plugin.eventbus.v1.AckResponse + nil, // 19: workflow.plugin.eventbus.v1.PublishRequest.HeadersEntry + nil, // 20: workflow.plugin.eventbus.v1.Message.HeadersEntry + (*durationpb.Duration)(nil), // 21: google.protobuf.Duration +} +var file_eventbus_proto_depIdxs = []int32{ + 6, // 0: workflow.plugin.eventbus.v1.ClusterConfig.jetstream:type_name -> workflow.plugin.eventbus.v1.JetStreamConfig + 7, // 1: workflow.plugin.eventbus.v1.ClusterConfig.kafka:type_name -> workflow.plugin.eventbus.v1.KafkaConfig + 8, // 2: workflow.plugin.eventbus.v1.ClusterConfig.kinesis:type_name -> workflow.plugin.eventbus.v1.KinesisConfig + 9, // 3: workflow.plugin.eventbus.v1.ClusterConfig.limits:type_name -> workflow.plugin.eventbus.v1.ResourceLimits + 21, // 4: workflow.plugin.eventbus.v1.JetStreamConfig.max_age:type_name -> google.protobuf.Duration + 3, // 5: workflow.plugin.eventbus.v1.KafkaConfig.security_protocol:type_name -> workflow.plugin.eventbus.v1.KafkaSecurityProtocol + 4, // 6: workflow.plugin.eventbus.v1.KafkaConfig.sasl_mechanism:type_name -> workflow.plugin.eventbus.v1.KafkaSaslMechanism + 0, // 7: workflow.plugin.eventbus.v1.StreamConfig.retention_policy:type_name -> workflow.plugin.eventbus.v1.RetentionPolicy + 21, // 8: workflow.plugin.eventbus.v1.StreamConfig.max_age:type_name -> google.protobuf.Duration + 21, // 9: workflow.plugin.eventbus.v1.StreamConfig.ack_wait:type_name -> google.protobuf.Duration + 1, // 10: workflow.plugin.eventbus.v1.ConsumerConfig.deliver_policy:type_name -> workflow.plugin.eventbus.v1.DeliverPolicy + 2, // 11: workflow.plugin.eventbus.v1.ConsumerConfig.ack_policy:type_name -> workflow.plugin.eventbus.v1.AckPolicy + 19, // 12: 
workflow.plugin.eventbus.v1.PublishRequest.headers:type_name -> workflow.plugin.eventbus.v1.PublishRequest.HeadersEntry + 21, // 13: workflow.plugin.eventbus.v1.ConsumeRequest.max_wait:type_name -> google.protobuf.Duration + 16, // 14: workflow.plugin.eventbus.v1.ConsumeResponse.messages:type_name -> workflow.plugin.eventbus.v1.Message + 20, // 15: workflow.plugin.eventbus.v1.Message.headers:type_name -> workflow.plugin.eventbus.v1.Message.HeadersEntry + 16, // [16:16] is the sub-list for method output_type + 16, // [16:16] is the sub-list for method input_type + 16, // [16:16] is the sub-list for extension type_name + 16, // [16:16] is the sub-list for extension extendee + 0, // [0:16] is the sub-list for field type_name +} + +func init() { file_eventbus_proto_init() } +func file_eventbus_proto_init() { + if File_eventbus_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_eventbus_proto_rawDesc), len(file_eventbus_proto_rawDesc)), + NumEnums: 5, + NumMessages: 16, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_eventbus_proto_goTypes, + DependencyIndexes: file_eventbus_proto_depIdxs, + EnumInfos: file_eventbus_proto_enumTypes, + MessageInfos: file_eventbus_proto_msgTypes, + }.Build() + File_eventbus_proto = out.File + file_eventbus_proto_goTypes = nil + file_eventbus_proto_depIdxs = nil +} diff --git a/gen/proto_smoke_test.go b/gen/proto_smoke_test.go new file mode 100644 index 0000000..1c44d9d --- /dev/null +++ b/gen/proto_smoke_test.go @@ -0,0 +1,83 @@ +package eventbusv1_test + +import ( + "testing" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "google.golang.org/protobuf/proto" +) + +// TestClusterConfigRoundTrip verifies that ClusterConfig survives a proto +// marshal → unmarshal cycle without data loss. 
This keeps the CI test job +// non-vacuous even before provider implementations exist. +func TestClusterConfigRoundTrip(t *testing.T) { + orig := &eventbusv1.ClusterConfig{ + Provider: "nats", + DeployTarget: "kubernetes", + Replicas: 3, + Jetstream: &eventbusv1.JetStreamConfig{ + Enabled: true, + MaxStorageBytes: 53687091200, + }, + } + b, err := proto.Marshal(orig) + if err != nil { + t.Fatalf("marshal: %v", err) + } + got := &eventbusv1.ClusterConfig{} + if err := proto.Unmarshal(b, got); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if got.Provider != orig.Provider || got.Replicas != orig.Replicas || got.DeployTarget != orig.DeployTarget { + t.Errorf("round-trip mismatch: got %v, want %v", got, orig) + } + if got.Jetstream == nil || got.Jetstream.Enabled != orig.Jetstream.Enabled { + t.Errorf("jetstream round-trip mismatch") + } +} + +// TestConsumerConfigEnums verifies that typed enum fields survive a round-trip +// and are not silently zeroed out. +func TestConsumerConfigEnums(t *testing.T) { + orig := &eventbusv1.ConsumerConfig{ + Name: "bmw-handler", + StreamName: "BMW_FULFILLMENT", + DeliverPolicy: eventbusv1.DeliverPolicy_DELIVER_POLICY_NEW, + AckPolicy: eventbusv1.AckPolicy_ACK_POLICY_EXPLICIT, + MaxDeliver: 5, + } + b, err := proto.Marshal(orig) + if err != nil { + t.Fatalf("marshal: %v", err) + } + got := &eventbusv1.ConsumerConfig{} + if err := proto.Unmarshal(b, got); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if got.DeliverPolicy != orig.DeliverPolicy { + t.Errorf("deliver_policy: got %v, want %v", got.DeliverPolicy, orig.DeliverPolicy) + } + if got.AckPolicy != orig.AckPolicy { + t.Errorf("ack_policy: got %v, want %v", got.AckPolicy, orig.AckPolicy) + } +} + +// TestStreamConfigRetentionPolicy verifies RetentionPolicy enum round-trip. 
+func TestStreamConfigRetentionPolicy(t *testing.T) { + orig := &eventbusv1.StreamConfig{ + Name: "BMW_FULFILLMENT", + RetentionPolicy: eventbusv1.RetentionPolicy_RETENTION_POLICY_WORKQUEUE, + NumReplicas: 3, + } + b, err := proto.Marshal(orig) + if err != nil { + t.Fatalf("marshal: %v", err) + } + got := &eventbusv1.StreamConfig{} + if err := proto.Unmarshal(b, got); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if got.RetentionPolicy != orig.RetentionPolicy { + t.Errorf("retention_policy: got %v, want %v", got.RetentionPolicy, orig.RetentionPolicy) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..49ca598 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module github.com/GoCodeAlone/workflow-plugin-eventbus + +go 1.26.0 + +require google.golang.org/protobuf v1.36.11 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..296be18 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= diff --git a/iac/iac.go b/iac/iac.go new file mode 100644 index 0000000..10adfdc --- /dev/null +++ b/iac/iac.go @@ -0,0 +1,49 @@ +// Package iac defines typed IaC primitives emitted by eventbus Providers. +// These are consumed by downstream IaC provider plugins +// (workflow-plugin-digitalocean, workflow-plugin-aws, etc.) which realize +// them against the target cloud. Zero map[string]any — all fields are typed. +package iac + +// Resource is a typed IaC resource declaration. +// The Kind field identifies which IaC provider plugin realizes this resource. +type Resource struct { + // Kind is the resource type, e.g. "infra.app", "infra.ecs.task_definition", + // "infra.k8s.statefulset", "infra.aws.kinesis_stream". 
+ Kind string + // Name is the resource instance name, unique within the deployment environment. + Name string + // Properties carries resource-type-specific configuration key/value pairs. + // Schema per Kind is defined by the downstream IaC plugin that realizes this resource. + // Examples: image, replicas, storage_size, port, shard_count. + Properties map[string]string + // Labels are metadata tags attached to the resource (e.g. "env": "prod", "team": "bmw"). + // These are not config — they are used for filtering, cost allocation, and observability. + Labels map[string]string +} + +// Output is a single state value with explicit sensitivity marking. +// Sensitive outputs (connection strings, credentials, ARNs with embedded secrets) +// must be marked Sensitive: true to prevent accidental logging in plan output. +type Output struct { + // Value is the resolved output string. + Value string + // Sensitive marks outputs that must not appear in plain-text plan/log output. + // The workflow engine suppresses sensitive outputs unless --show-sensitive is passed. + Sensitive bool +} + +// State holds resolved outputs from previously provisioned resources. +// Each output is explicitly typed and sensitivity-marked. +type State struct { + // Outputs maps output key to a typed, sensitivity-marked Output value. + // Example: "uri" → Output{Value: "nats://host:4222", Sensitive: true} + Outputs map[string]Output +} + +// Output returns the value for key and whether it was present. +// The Sensitive flag is not surfaced here — callers that need it should read +// s.Outputs[key] directly. 
+func (s State) Output(key string) (string, bool) { + out, ok := s.Outputs[key] + return out.Value, ok +} diff --git a/plugin.contracts.json b/plugin.contracts.json new file mode 100644 index 0000000..45bc35e --- /dev/null +++ b/plugin.contracts.json @@ -0,0 +1,51 @@ +{ + "version": "1", + "contracts": [ + { + "kind": "module", + "type": "infra.eventbus", + "mode": "strict_proto", + "config": "workflow.plugin.eventbus.v1.ClusterConfig" + }, + { + "kind": "module", + "type": "infra.eventbus.stream", + "mode": "strict_proto", + "config": "workflow.plugin.eventbus.v1.StreamConfig" + }, + { + "kind": "module", + "type": "infra.eventbus.consumer", + "mode": "strict_proto", + "config": "workflow.plugin.eventbus.v1.ConsumerConfig" + }, + { + "kind": "step", + "type": "step.eventbus.publish", + "mode": "strict_proto", + "input": "workflow.plugin.eventbus.v1.PublishRequest", + "output": "workflow.plugin.eventbus.v1.PublishResponse" + }, + { + "kind": "step", + "type": "step.eventbus.consume", + "mode": "strict_proto", + "input": "workflow.plugin.eventbus.v1.ConsumeRequest", + "output": "workflow.plugin.eventbus.v1.ConsumeResponse" + }, + { + "kind": "step", + "type": "step.eventbus.ack", + "mode": "strict_proto", + "input": "workflow.plugin.eventbus.v1.AckRequest", + "output": "workflow.plugin.eventbus.v1.AckResponse" + }, + { + "kind": "trigger", + "type": "trigger.eventbus.subscribe", + "mode": "strict_proto", + "config": "workflow.plugin.eventbus.v1.ConsumerConfig", + "output": "workflow.plugin.eventbus.v1.Message" + } + ] +} diff --git a/plugin.json b/plugin.json new file mode 100644 index 0000000..9024ba8 --- /dev/null +++ b/plugin.json @@ -0,0 +1,58 @@ +{ + "name": "workflow-plugin-eventbus", + "version": "0.1.0", + "description": "Provisions durable event-bus clusters (NATS / Kafka / Kinesis) as IaC and exposes typed pipeline steps for publish / consume operations.", + "author": "GoCodeAlone", + "license": "MIT", + "type": "external", + "tier": "community", + 
"private": false, + "minEngineVersion": "0.20.0", + "keywords": ["eventbus", "nats", "kafka", "kinesis", "iac", "infra", "jetstream"], + "homepage": "https://github.com/GoCodeAlone/workflow-plugin-eventbus", + "repository": "https://github.com/GoCodeAlone/workflow-plugin-eventbus", + "capabilities": { + "configProvider": false, + "moduleTypes": [ + "infra.eventbus", + "infra.eventbus.stream", + "infra.eventbus.consumer" + ], + "stepTypes": [ + "step.eventbus.publish", + "step.eventbus.consume", + "step.eventbus.ack" + ], + "triggerTypes": [ + "trigger.eventbus.subscribe" + ] + }, + "contracts": [], + "downloads": [ + { + "os": "linux", + "arch": "amd64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-eventbus/releases/download/v0.1.0/workflow-plugin-eventbus-linux-amd64.tar.gz" + }, + { + "os": "linux", + "arch": "arm64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-eventbus/releases/download/v0.1.0/workflow-plugin-eventbus-linux-arm64.tar.gz" + }, + { + "os": "darwin", + "arch": "amd64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-eventbus/releases/download/v0.1.0/workflow-plugin-eventbus-darwin-amd64.tar.gz" + }, + { + "os": "darwin", + "arch": "arm64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-eventbus/releases/download/v0.1.0/workflow-plugin-eventbus-darwin-arm64.tar.gz" + }, + { + "os": "windows", + "arch": "amd64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-eventbus/releases/download/v0.1.0/workflow-plugin-eventbus-windows-amd64.tar.gz" + } + ] +} diff --git a/proto/eventbus.proto b/proto/eventbus.proto new file mode 100644 index 0000000..f504a82 --- /dev/null +++ b/proto/eventbus.proto @@ -0,0 +1,231 @@ +syntax = "proto3"; +package workflow.plugin.eventbus.v1; + +option go_package = "github.com/GoCodeAlone/workflow-plugin-eventbus/gen;eventbusv1"; + +import "google/protobuf/duration.proto"; + +// ─── enums ──────────────────────────────────────────────────────────────────── + +// RetentionPolicy 
controls how messages are retained in a stream. +enum RetentionPolicy { + RETENTION_POLICY_UNSPECIFIED = 0; + // RETENTION_POLICY_LIMITS retains messages up to configured size / age limits. + RETENTION_POLICY_LIMITS = 1; + // RETENTION_POLICY_INTEREST retains messages while at least one consumer is active. + RETENTION_POLICY_INTEREST = 2; + // RETENTION_POLICY_WORKQUEUE retains messages until they are acknowledged by one consumer. + RETENTION_POLICY_WORKQUEUE = 3; +} + +// DeliverPolicy controls which messages a consumer receives on first attach. +enum DeliverPolicy { + DELIVER_POLICY_UNSPECIFIED = 0; + // DELIVER_POLICY_ALL delivers all messages from the beginning of the stream. + DELIVER_POLICY_ALL = 1; + // DELIVER_POLICY_LAST delivers only the last message per subject. + DELIVER_POLICY_LAST = 2; + // DELIVER_POLICY_NEW delivers only messages published after the consumer is created. + DELIVER_POLICY_NEW = 3; + // DELIVER_POLICY_BY_START_SEQUENCE delivers messages starting from a specific sequence. + DELIVER_POLICY_BY_START_SEQUENCE = 4; + // DELIVER_POLICY_BY_START_TIME delivers messages starting from a specific time. + DELIVER_POLICY_BY_START_TIME = 5; +} + +// AckPolicy controls how consumers must acknowledge messages. +enum AckPolicy { + ACK_POLICY_UNSPECIFIED = 0; + // ACK_POLICY_EXPLICIT requires explicit per-message acknowledgement (recommended). + ACK_POLICY_EXPLICIT = 1; + // ACK_POLICY_NONE disables acknowledgements entirely (fire-and-forget). + ACK_POLICY_NONE = 2; + // ACK_POLICY_ALL acknowledges all messages up to and including the specified sequence. + ACK_POLICY_ALL = 3; +} + +// KafkaSecurityProtocol selects the client-broker transport security mode. 
+enum KafkaSecurityProtocol { + KAFKA_SECURITY_PROTOCOL_UNSPECIFIED = 0; + KAFKA_SECURITY_PROTOCOL_PLAINTEXT = 1; + KAFKA_SECURITY_PROTOCOL_SSL = 2; + KAFKA_SECURITY_PROTOCOL_SASL_PLAINTEXT = 3; + KAFKA_SECURITY_PROTOCOL_SASL_SSL = 4; +} + +// KafkaSaslMechanism selects the SASL authentication mechanism. +enum KafkaSaslMechanism { + KAFKA_SASL_MECHANISM_UNSPECIFIED = 0; + KAFKA_SASL_MECHANISM_PLAIN = 1; + KAFKA_SASL_MECHANISM_SCRAM_SHA_256 = 2; + KAFKA_SASL_MECHANISM_SCRAM_SHA_512 = 3; +} + +// ─── cluster / module configs ───────────────────────────────────────────────── + +// ClusterConfig is the typed config for infra.eventbus module. +// provider and deploy_target select the Provider × DeployTarget combination. +message ClusterConfig { + // provider selects the message-broker backend: "nats" | "kafka" | "kinesis". + string provider = 1; + // deploy_target selects the deployment platform: + // "digitalocean.app_platform" | "aws.ecs" | "aws.eks" | "kubernetes" | etc. + string deploy_target = 2; + // version is the broker version to deploy (e.g. "2.10" for NATS). + string version = 3; + // replicas is the number of broker replicas. + int32 replicas = 4; + // jetstream holds NATS JetStream-specific configuration; ignored for other providers. + JetStreamConfig jetstream = 5; + // kafka holds Kafka-specific cluster configuration; ignored for other providers. + KafkaConfig kafka = 6; + // kinesis holds Kinesis-specific configuration; ignored for other providers. + KinesisConfig kinesis = 7; + // limits sets compute/storage resource limits for the cluster containers. + ResourceLimits limits = 8; +} + +// JetStreamConfig holds NATS JetStream-specific settings. +message JetStreamConfig { + // enabled activates JetStream on the NATS cluster (required for durability). + bool enabled = 1; + // max_storage_bytes caps total on-disk storage for all streams (0 = unlimited). + int64 max_storage_bytes = 2; + // max_memory_bytes caps in-memory storage (0 = unlimited). 
+ int64 max_memory_bytes = 3; + // max_age is the default maximum age for messages across all streams. + google.protobuf.Duration max_age = 4; +} + +// KafkaConfig holds Kafka-specific cluster settings. +message KafkaConfig { + // bootstrap_servers is the initial broker address list (comma-separated). + // Used when connecting to an existing Kafka cluster rather than provisioning one. + string bootstrap_servers = 1; + // security_protocol specifies the protocol for client-broker communication. + KafkaSecurityProtocol security_protocol = 2; + // sasl_mechanism for SASL authentication. + KafkaSaslMechanism sasl_mechanism = 3; + // default_replication_factor for new topics (0 = use broker default). + int32 default_replication_factor = 4; + // min_insync_replicas is the minimum number of in-sync replicas required for acks. + int32 min_insync_replicas = 5; +} + +// KinesisConfig holds AWS Kinesis Data Streams-specific settings. +message KinesisConfig { + // region is the AWS region for the Kinesis stream (e.g. "us-east-1"). + string region = 1; + // shard_count is the initial number of shards (throughput units). + int32 shard_count = 2; + // retention_period_hours is the message retention window (24–8760 hours). + int32 retention_period_hours = 3; +} + +// ResourceLimits constrains compute and storage for cluster containers. +message ResourceLimits { + // cpu is the CPU limit in Kubernetes notation (e.g. "500m", "2"). + string cpu = 1; + // memory is the memory limit (e.g. "512Mi", "2Gi"). + string memory = 2; + // storage is the persistent-volume size (e.g. "10Gi"). + string storage = 3; +} + +// StreamConfig is the typed config for infra.eventbus.stream module. +message StreamConfig { + // name is the stream name (e.g. "BMW_FULFILLMENT"). + string name = 1; + // subjects lists the NATS subject filter patterns bound to this stream. + repeated string subjects = 2; + // retention_policy controls message retention semantics. 
+  RetentionPolicy retention_policy = 3;
+  // num_replicas is the number of stream replicas within the cluster.
+  int32 num_replicas = 4;
+  // max_bytes caps total on-disk storage for this stream (0 = unlimited).
+  int64 max_bytes = 5;
+  // max_age is the maximum age for messages in this stream.
+  google.protobuf.Duration max_age = 6;
+  // ack_wait is the default acknowledgement deadline for consumers on this stream.
+  google.protobuf.Duration ack_wait = 7;
+}
+
+// ConsumerConfig is the typed config for infra.eventbus.consumer module
+// and trigger.eventbus.subscribe.
+message ConsumerConfig {
+  // name is the durable consumer name.
+  string name = 1;
+  // stream_name is the stream this consumer attaches to.
+  string stream_name = 2;
+  // filter_subject narrows delivery to messages matching this subject pattern.
+  string filter_subject = 3;
+  // deliver_policy controls which messages are delivered on first attach.
+  DeliverPolicy deliver_policy = 4;
+  // ack_policy controls how acknowledgements are required.
+  AckPolicy ack_policy = 5;
+  // max_deliver is the maximum number of delivery attempts before NACKing (0 = unlimited).
+  int32 max_deliver = 6;
+}
+
+// ─── step inputs / outputs ──────────────────────────────────────────────────
+
+// PublishRequest is the input for step.eventbus.publish.
+message PublishRequest {
+  // subject is the NATS subject (or Kafka topic) to publish to.
+  string subject = 1;
+  // payload is the raw message bytes.
+  bytes payload = 2;
+  // headers are optional key/value metadata attached to the message.
+  // proto3 map fields require explicit key/value types; a bare `map` does not compile.
+  map<string, string> headers = 3;
+  // correlation_id is an opaque identifier for request/reply correlation or tracing.
+  string correlation_id = 4;
+}
+
+// PublishResponse is the output from step.eventbus.publish.
+message PublishResponse {
+  // sequence is the JetStream stream sequence number; empty for non-JetStream brokers.
+  string sequence = 1;
+  // acked_at is the timestamp when the broker acknowledged the message (RFC3339).
+  string acked_at = 2;
+}
+
+// ConsumeRequest is the input for step.eventbus.consume.
+message ConsumeRequest {
+  // consumer is the durable consumer name to pull from.
+  string consumer = 1;
+  // batch_size is the maximum number of messages to return in one call.
+  int32 batch_size = 2;
+  // max_wait is the maximum time to block waiting for messages.
+  google.protobuf.Duration max_wait = 3;
+}
+
+// ConsumeResponse is the output from step.eventbus.consume.
+message ConsumeResponse {
+  // messages contains the delivered messages (up to batch_size).
+  repeated Message messages = 1;
+}
+
+// Message represents a single delivered event-bus message.
+message Message {
+  // subject is the NATS subject (or Kafka topic/partition key) the message was published on.
+  string subject = 1;
+  // payload is the raw message bytes.
+  bytes payload = 2;
+  // headers are key/value metadata attached to the message.
+  // proto3 map fields require explicit key/value types; a bare `map` does not compile.
+  map<string, string> headers = 3;
+  // sequence is the stream sequence number assigned by the broker.
+  string sequence = 4;
+  // published_at is the broker-assigned publish timestamp (RFC3339).
+  string published_at = 5;
+  // ack_token is the opaque token used to acknowledge this message via step.eventbus.ack.
+  string ack_token = 6;
+}
+
+// AckRequest is the input for step.eventbus.ack.
+message AckRequest {
+  // ack_token is the opaque acknowledgement token from Message.ack_token.
+  string ack_token = 1;
+}
+
+// AckResponse is the output from step.eventbus.ack.
+message AckResponse {}
diff --git a/providers/kafka/kafka.go b/providers/kafka/kafka.go
new file mode 100644
index 0000000..0e1d410
--- /dev/null
+++ b/providers/kafka/kafka.go
@@ -0,0 +1,54 @@
+// Package kafka provides a stub implementation of the kafka event-bus Provider.
+//
+// Per pilot manifest out-of-scope: "DO Managed Kafka and AWS Kinesis as eventbus
+// providers active for pilot — built into plugin but not activated; NATS only."
+// This stub registers the kafka provider in the registry so that config referencing +// provider: kafka fails fast with a clear error rather than panicking or silently +// doing nothing. Real implementation lands when a downstream consumer activates it. +package kafka + +import ( + "errors" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" +) + +// errNotImplemented is the canonical error returned by all stub methods. +var errNotImplemented = errors.New("kafka provider not implemented for pilot; register a real implementation when activating") + +// provider is the stub kafka Provider. +type provider struct{} + +// New returns the stub kafka Provider. +func New() providers.Provider { + return &provider{} +} + +// Name implements providers.Provider. +func (p *provider) Name() string { return "kafka" } + +// Resources implements providers.Provider — stub, always errors. +func (p *provider) Resources(_ *eventbusv1.ClusterConfig, _ providers.DeployTarget) ([]iac.Resource, error) { + return nil, errNotImplemented +} + +// ConnectionString implements providers.Provider — stub, always errors. +func (p *provider) ConnectionString(_ iac.State, _ string) (string, error) { + return "", errNotImplemented +} + +// StreamResources implements providers.Provider — stub, always errors. +func (p *provider) StreamResources(_ []*eventbusv1.StreamConfig, _ iac.State) ([]iac.Resource, error) { + return nil, errNotImplemented +} + +// Probe implements providers.Provider — stub, always returns unreachable. 
+func (p *provider) Probe(uri string) providers.HealthCheck { + return providers.HealthCheck{ + URI: uri, + Status: providers.HealthStatusUnreachable, + Err: errNotImplemented, + } +} diff --git a/providers/kafka/kafka_test.go b/providers/kafka/kafka_test.go new file mode 100644 index 0000000..0a6eab1 --- /dev/null +++ b/providers/kafka/kafka_test.go @@ -0,0 +1,68 @@ +package kafka_test + +import ( + "strings" + "testing" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers/kafka" +) + +// TestKafkaProvider_Name asserts the provider reports the correct identifier. +func TestKafkaProvider_Name(t *testing.T) { + p := kafka.New() + if p.Name() != "kafka" { + t.Errorf("Name() = %q, want %q", p.Name(), "kafka") + } +} + +// TestKafkaProvider_Stub_ErrorsOnResources asserts that Resources returns a +// "not implemented" error — the kafka provider is a registry stub for the pilot. +func TestKafkaProvider_Stub_ErrorsOnResources(t *testing.T) { + p := kafka.New() + _, err := p.Resources(&eventbusv1.ClusterConfig{}, providers.TargetDigitalOceanManagedKafka) + if err == nil { + t.Fatal("expected stub error from Resources, got nil") + } + if !strings.Contains(err.Error(), "not implemented") { + t.Errorf("error does not contain 'not implemented': %v", err) + } +} + +// TestKafkaProvider_Stub_ErrorsOnConnectionString asserts ConnectionString also stubs. +func TestKafkaProvider_Stub_ErrorsOnConnectionString(t *testing.T) { + p := kafka.New() + _, err := p.ConnectionString(iac.State{}, "prod") + if err == nil { + t.Fatal("expected stub error from ConnectionString, got nil") + } + if !strings.Contains(err.Error(), "not implemented") { + t.Errorf("error does not contain 'not implemented': %v", err) + } +} + +// TestKafkaProvider_Stub_ErrorsOnStreamResources asserts StreamResources also stubs. 
+func TestKafkaProvider_Stub_ErrorsOnStreamResources(t *testing.T) { + p := kafka.New() + _, err := p.StreamResources(nil, iac.State{}) + if err == nil { + t.Fatal("expected stub error from StreamResources, got nil") + } + if !strings.Contains(err.Error(), "not implemented") { + t.Errorf("error does not contain 'not implemented': %v", err) + } +} + +// TestKafkaProvider_Stub_ProbeReturnsUnreachable asserts Probe returns unreachable. +func TestKafkaProvider_Stub_ProbeReturnsUnreachable(t *testing.T) { + p := kafka.New() + hc := p.Probe("kafka://localhost:9092") + if hc.Status != providers.HealthStatusUnreachable { + t.Errorf("Probe() status = %q, want %q", hc.Status, providers.HealthStatusUnreachable) + } + if hc.Err == nil { + t.Error("Probe() Err should be non-nil for stub") + } +} diff --git a/providers/kinesis/kinesis.go b/providers/kinesis/kinesis.go new file mode 100644 index 0000000..2d14338 --- /dev/null +++ b/providers/kinesis/kinesis.go @@ -0,0 +1,54 @@ +// Package kinesis provides a stub implementation of the kinesis event-bus Provider. +// +// Per pilot manifest out-of-scope: "DO Managed Kafka and AWS Kinesis as eventbus +// providers active for pilot — built into plugin but not activated; NATS only." +// This stub registers the kinesis provider so that config referencing provider: kinesis +// fails fast with a clear error. Real implementation lands when a downstream consumer +// activates it. +package kinesis + +import ( + "errors" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" +) + +// errNotImplemented is the canonical error returned by all stub methods. +var errNotImplemented = errors.New("kinesis provider not implemented for pilot; register a real implementation when activating") + +// provider is the stub kinesis Provider. +type provider struct{} + +// New returns the stub kinesis Provider. 
+func New() providers.Provider { + return &provider{} +} + +// Name implements providers.Provider. +func (p *provider) Name() string { return "kinesis" } + +// Resources implements providers.Provider — stub, always errors. +func (p *provider) Resources(_ *eventbusv1.ClusterConfig, _ providers.DeployTarget) ([]iac.Resource, error) { + return nil, errNotImplemented +} + +// ConnectionString implements providers.Provider — stub, always errors. +func (p *provider) ConnectionString(_ iac.State, _ string) (string, error) { + return "", errNotImplemented +} + +// StreamResources implements providers.Provider — stub, always errors. +func (p *provider) StreamResources(_ []*eventbusv1.StreamConfig, _ iac.State) ([]iac.Resource, error) { + return nil, errNotImplemented +} + +// Probe implements providers.Provider — stub, always returns unreachable. +func (p *provider) Probe(uri string) providers.HealthCheck { + return providers.HealthCheck{ + URI: uri, + Status: providers.HealthStatusUnreachable, + Err: errNotImplemented, + } +} diff --git a/providers/kinesis/kinesis_test.go b/providers/kinesis/kinesis_test.go new file mode 100644 index 0000000..660624d --- /dev/null +++ b/providers/kinesis/kinesis_test.go @@ -0,0 +1,68 @@ +package kinesis_test + +import ( + "strings" + "testing" + + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers/kinesis" +) + +// TestKinesisProvider_Name asserts the provider reports the correct identifier. +func TestKinesisProvider_Name(t *testing.T) { + p := kinesis.New() + if p.Name() != "kinesis" { + t.Errorf("Name() = %q, want %q", p.Name(), "kinesis") + } +} + +// TestKinesisProvider_Stub_ErrorsOnResources asserts that Resources returns a +// "not implemented" error — the kinesis provider is a registry stub for the pilot. 
+func TestKinesisProvider_Stub_ErrorsOnResources(t *testing.T) { + p := kinesis.New() + _, err := p.Resources(&eventbusv1.ClusterConfig{}, providers.TargetAWSKinesis) + if err == nil { + t.Fatal("expected stub error from Resources, got nil") + } + if !strings.Contains(err.Error(), "not implemented") { + t.Errorf("error does not contain 'not implemented': %v", err) + } +} + +// TestKinesisProvider_Stub_ErrorsOnConnectionString asserts ConnectionString also stubs. +func TestKinesisProvider_Stub_ErrorsOnConnectionString(t *testing.T) { + p := kinesis.New() + _, err := p.ConnectionString(iac.State{}, "prod") + if err == nil { + t.Fatal("expected stub error from ConnectionString, got nil") + } + if !strings.Contains(err.Error(), "not implemented") { + t.Errorf("error does not contain 'not implemented': %v", err) + } +} + +// TestKinesisProvider_Stub_ErrorsOnStreamResources asserts StreamResources also stubs. +func TestKinesisProvider_Stub_ErrorsOnStreamResources(t *testing.T) { + p := kinesis.New() + _, err := p.StreamResources(nil, iac.State{}) + if err == nil { + t.Fatal("expected stub error from StreamResources, got nil") + } + if !strings.Contains(err.Error(), "not implemented") { + t.Errorf("error does not contain 'not implemented': %v", err) + } +} + +// TestKinesisProvider_Stub_ProbeReturnsUnreachable asserts Probe returns unreachable. 
+func TestKinesisProvider_Stub_ProbeReturnsUnreachable(t *testing.T) { + p := kinesis.New() + hc := p.Probe("kinesis://us-east-1") + if hc.Status != providers.HealthStatusUnreachable { + t.Errorf("Probe() status = %q, want %q", hc.Status, providers.HealthStatusUnreachable) + } + if hc.Err == nil { + t.Error("Probe() Err should be non-nil for stub") + } +} diff --git a/providers/provider.go b/providers/provider.go new file mode 100644 index 0000000..4736989 --- /dev/null +++ b/providers/provider.go @@ -0,0 +1,63 @@ +// Package providers defines the Provider interface and DeployTarget compatibility +// matrix for workflow-plugin-eventbus. Each provider (nats, kafka, kinesis) +// implements Provider and emits typed IaC resource declarations for its supported +// deploy targets. +package providers + +import ( + eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen" + "github.com/GoCodeAlone/workflow-plugin-eventbus/iac" +) + +// HealthStatus is the typed health state returned by Provider.Probe. +type HealthStatus string + +const ( + // HealthStatusHealthy indicates the broker is reachable and fully operational. + HealthStatusHealthy HealthStatus = "healthy" + // HealthStatusDegraded indicates the broker is reachable but impaired + // (e.g. a replica is down, JetStream is slow). + HealthStatusDegraded HealthStatus = "degraded" + // HealthStatusUnreachable indicates the broker did not respond within the probe timeout. + HealthStatusUnreachable HealthStatus = "unreachable" +) + +// HealthCheck is the result of a liveness probe against an event-bus URI. +type HealthCheck struct { + // URI is the address that was probed. + URI string + // Status is the typed health state. + Status HealthStatus + // Err is non-nil when Status is HealthStatusDegraded or HealthStatusUnreachable. + Err error +} + +// Provider is the interface all event-bus provider adapters must implement. 
+// Each provider translates a ClusterConfig + DeployTarget into a set of typed +// IaC resource declarations without directly calling cloud APIs. +// +// Implementations live at providers/{nats,kafka,kinesis}/. +type Provider interface { + // Name returns the provider identifier: "nats", "kafka", or "kinesis". + Name() string + + // Resources returns the IaC resource declarations required to provision + // the event-bus cluster described by cfg on the given deploy target. + // cfg is passed as a pointer because proto messages embed sync.Mutex via + // protoimpl.MessageState and must not be copied by value. + // Returns an error if the provider × target combination is unsupported + // or if cfg is invalid for this provider. + Resources(cfg *eventbusv1.ClusterConfig, target DeployTarget) ([]iac.Resource, error) + + // ConnectionString derives the broker connection string from provisioned state. + // env selects environment-specific outputs (e.g. "prod", "staging"). + ConnectionString(state iac.State, env string) (string, error) + + // StreamResources returns the IaC resource declarations required to + // declare the given streams against an already-provisioned cluster + // (represented by state). Streams are pointers for the same reason as cfg. + StreamResources(streams []*eventbusv1.StreamConfig, state iac.State) ([]iac.Resource, error) + + // Probe probes the event-bus cluster at uri and returns its health state. + Probe(uri string) HealthCheck +} diff --git a/providers/provider_test.go b/providers/provider_test.go new file mode 100644 index 0000000..e1a1c9b --- /dev/null +++ b/providers/provider_test.go @@ -0,0 +1,99 @@ +package providers_test + +import ( + "testing" + + "github.com/GoCodeAlone/workflow-plugin-eventbus/providers" +) + +// TestValidateProviderTarget_CompatibilityMatrix asserts the full provider×target +// compatibility matrix. Every supported combo must return nil; every unsupported +// combo must return a non-nil error. 
+// +// IMPORTANT: add a new row whenever a new DeployTarget or provider is added — +// this test gates the entire matrix, not just individual combos. +func TestValidateProviderTarget_CompatibilityMatrix(t *testing.T) { + type combo struct { + provider string + target providers.DeployTarget + wantErr bool + } + cases := []combo{ + // ── nats: supported targets ──────────────────────────────────────────── + {"nats", providers.TargetDigitalOceanApp, false}, + {"nats", providers.TargetAWSECS, false}, + {"nats", providers.TargetAWSEKS, false}, + {"nats", providers.TargetKubernetes, false}, + {"nats", providers.TargetSelfHosted, false}, + // ── nats: unsupported targets ────────────────────────────────────────── + {"nats", providers.TargetDigitalOceanManagedKafka, true}, + {"nats", providers.TargetAWSManagedKafka, true}, + {"nats", providers.TargetAWSKinesis, true}, + + // ── kafka: supported targets ─────────────────────────────────────────── + {"kafka", providers.TargetDigitalOceanManagedKafka, false}, + {"kafka", providers.TargetAWSManagedKafka, false}, + {"kafka", providers.TargetKubernetes, false}, + {"kafka", providers.TargetSelfHosted, false}, + // ── kafka: unsupported targets ───────────────────────────────────────── + {"kafka", providers.TargetDigitalOceanApp, true}, + {"kafka", providers.TargetAWSECS, true}, + {"kafka", providers.TargetAWSEKS, true}, + {"kafka", providers.TargetAWSKinesis, true}, + + // ── kinesis: supported targets ───────────────────────────────────────── + {"kinesis", providers.TargetAWSKinesis, false}, + // ── kinesis: unsupported targets ─────────────────────────────────────── + {"kinesis", providers.TargetDigitalOceanApp, true}, // key case per task spec + {"kinesis", providers.TargetDigitalOceanManagedKafka, true}, + {"kinesis", providers.TargetAWSECS, true}, + {"kinesis", providers.TargetAWSEKS, true}, + {"kinesis", providers.TargetAWSManagedKafka, true}, + {"kinesis", providers.TargetKubernetes, true}, + {"kinesis", 
providers.TargetSelfHosted, true}, + } + + for _, tc := range cases { + t.Run(tc.provider+"×"+string(tc.target), func(t *testing.T) { + err := providers.ValidateProviderTarget(tc.provider, tc.target) + if tc.wantErr && err == nil { + t.Errorf("expected validation error for %s × %s, got nil", tc.provider, tc.target) + } + if !tc.wantErr && err != nil { + t.Errorf("unexpected error for %s × %s: %v", tc.provider, tc.target, err) + } + }) + } +} + +// TestValidateProviderTarget_UnknownProvider asserts that an unrecognised +// provider name returns an error regardless of target. +func TestValidateProviderTarget_UnknownProvider(t *testing.T) { + err := providers.ValidateProviderTarget("rabbitmq", providers.TargetDigitalOceanApp) + if err == nil { + t.Error("expected error for unknown provider 'rabbitmq', got nil") + } +} + +// TestValidateProviderTarget_EdgeCases covers inputs that callers might pass +// accidentally: empty string and wrong case. +func TestValidateProviderTarget_EdgeCases(t *testing.T) { + cases := []struct { + name string + provider string + target providers.DeployTarget + }{ + {"empty provider", "", providers.TargetDigitalOceanApp}, + {"uppercase NATS", "NATS", providers.TargetDigitalOceanApp}, + {"mixed case Kafka", "Kafka", providers.TargetAWSManagedKafka}, + {"uppercase KINESIS", "KINESIS", providers.TargetAWSKinesis}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := providers.ValidateProviderTarget(tc.provider, tc.target) + if err == nil { + t.Errorf("expected error for provider %q, got nil", tc.provider) + } + }) + } +} diff --git a/providers/target.go b/providers/target.go new file mode 100644 index 0000000..6e600b9 --- /dev/null +++ b/providers/target.go @@ -0,0 +1,61 @@ +package providers + +import "fmt" + +// DeployTarget identifies the deployment platform for an event-bus cluster. +type DeployTarget string + +const ( + // TargetDigitalOceanApp deploys to DigitalOcean App Platform. 
+ TargetDigitalOceanApp DeployTarget = "digitalocean.app_platform" + // TargetDigitalOceanManagedKafka deploys to DigitalOcean Managed Kafka. + TargetDigitalOceanManagedKafka DeployTarget = "digitalocean.managed_kafka" + // TargetAWSECS deploys to AWS ECS (Elastic Container Service). + TargetAWSECS DeployTarget = "aws.ecs" + // TargetAWSEKS deploys to AWS EKS (Elastic Kubernetes Service). + TargetAWSEKS DeployTarget = "aws.eks" + // TargetAWSManagedKafka deploys to AWS MSK (Managed Streaming for Apache Kafka). + TargetAWSManagedKafka DeployTarget = "aws.msk" + // TargetAWSKinesis deploys to AWS Kinesis Data Streams. + TargetAWSKinesis DeployTarget = "aws.kinesis" + // TargetKubernetes deploys to a generic Kubernetes cluster. + TargetKubernetes DeployTarget = "kubernetes" + // TargetSelfHosted deploys to a self-managed host (Docker, bare metal). + TargetSelfHosted DeployTarget = "self_hosted" +) + +// supportedTargets encodes the provider × DeployTarget compatibility matrix. +// Each provider maps to the set of deploy targets it supports. +// Combos absent from this map are rejected at config-load time. +var supportedTargets = map[string]map[DeployTarget]bool{ + "nats": { + TargetDigitalOceanApp: true, + TargetAWSECS: true, + TargetAWSEKS: true, + TargetKubernetes: true, + TargetSelfHosted: true, + }, + "kafka": { + TargetDigitalOceanManagedKafka: true, + TargetAWSManagedKafka: true, + TargetKubernetes: true, + TargetSelfHosted: true, + }, + "kinesis": { + TargetAWSKinesis: true, + }, +} + +// ValidateProviderTarget returns an error if the provider × target combination +// is unsupported. It is called at config-load time so misconfigured deployments +// are rejected before any IaC resources are emitted. 
+func ValidateProviderTarget(provider string, target DeployTarget) error {
+	// Look up the provider's supported-target set once; a missing entry means
+	// the provider name itself is unrecognised.
+	supported, known := supportedTargets[provider]
+	switch {
+	case !known:
+		return fmt.Errorf("eventbus: unknown provider %q — supported providers are: nats, kafka, kinesis", provider)
+	case !supported[target]:
+		return fmt.Errorf("eventbus: provider %q does not support deploy target %q", provider, target)
+	default:
+		return nil
+	}
+}