Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions cmd/workflow-plugin-eventbus/main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
// Command workflow-plugin-eventbus is a workflow engine external plugin that
// provisions durable event-bus clusters (NATS / Kafka / Kinesis) as IaC and
// exposes typed pipeline steps for publish / consume operations.
//
// Status: pre-pilot scaffold — provider implementations are in progress.
package main

import sdk "github.com/GoCodeAlone/workflow/plugin/external/sdk"

// version is stamped at release time via goreleaser ldflags (-X main.version=<tag>).
var version = "dev"

func main() {
// TODO(Task 17): wire sdk.Serve(plugin.New()) once Provider interface is implemented.
panic("workflow-plugin-eventbus: provider implementation pending — scaffold only; see github.com/GoCodeAlone/workflow-plugin-eventbus")
sdk.Serve(&eventbusPlugin{})
}
223 changes: 223 additions & 0 deletions cmd/workflow-plugin-eventbus/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package main

import (
"errors"
"fmt"

"google.golang.org/protobuf/types/known/anypb"

eventbus "github.com/GoCodeAlone/workflow-plugin-eventbus"
eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen"
"github.com/GoCodeAlone/workflow-plugin-eventbus/steps"
pb "github.com/GoCodeAlone/workflow/plugin/external/proto"
sdk "github.com/GoCodeAlone/workflow/plugin/external/sdk"
)

// eventbusPlugin implements sdk.PluginProvider, sdk.TypedModuleProvider,
// sdk.TypedStepProvider, sdk.TriggerProvider, and sdk.ContractProvider.
type eventbusPlugin struct{}

// Compile-time assertions.
var (
_ sdk.PluginProvider = (*eventbusPlugin)(nil)
_ sdk.TypedModuleProvider = (*eventbusPlugin)(nil)
_ sdk.TypedStepProvider = (*eventbusPlugin)(nil)
_ sdk.TriggerProvider = (*eventbusPlugin)(nil)
_ sdk.ContractProvider = (*eventbusPlugin)(nil)
)

// ── PluginProvider ────────────────────────────────────────────────────────────

// Manifest returns the plugin metadata used by the workflow engine for
// discovery and capability negotiation.
func (p *eventbusPlugin) Manifest() sdk.PluginManifest {
return sdk.PluginManifest{
Name: "workflow-plugin-eventbus",
Version: version,
Author: "GoCodeAlone",
Description: "Provisions durable event-bus clusters (NATS / Kafka / Kinesis) as IaC and exposes typed pipeline steps for publish / consume operations.",
}
}

// ── TypedModuleProvider ───────────────────────────────────────────────────────

// moduleFactories is the ordered list of TypedModuleProvider instances, one per
// module type family.
var moduleFactories = []sdk.TypedModuleProvider{
&eventbus.ClusterModuleFactory{},
&eventbus.StreamModuleFactory{},
&eventbus.ConsumerModuleFactory{},
&eventbus.SubscribeTriggerModuleFactory{},
}

// TypedModuleTypes returns all module types served by this plugin, including the
// trigger.eventbus.subscribe type which is exposed as a module in the gRPC path.
func (p *eventbusPlugin) TypedModuleTypes() []string {
types := make([]string, 0, len(moduleFactories))
for _, f := range moduleFactories {
types = append(types, f.TypedModuleTypes()...)
}
return types
}

// CreateTypedModule routes the create request to the appropriate factory.
func (p *eventbusPlugin) CreateTypedModule(typeName, name string, config *anypb.Any) (sdk.ModuleInstance, error) {
for _, f := range moduleFactories {
inst, err := f.CreateTypedModule(typeName, name, config)
if err == nil {
return inst, nil
}
if !errors.Is(err, sdk.ErrTypedContractNotHandled) {
return nil, err
}
}
return nil, fmt.Errorf("workflow-plugin-eventbus: unknown module type %q", typeName)
}

// ── TypedStepProvider ─────────────────────────────────────────────────────────

// stepFactories is the ordered list of TypedStepProvider instances.
var stepFactories = []sdk.TypedStepProvider{
steps.PublishFactory,
steps.ConsumeFactory,
steps.AckFactory,
}

// TypedStepTypes returns all step types served by this plugin.
func (p *eventbusPlugin) TypedStepTypes() []string {
types := make([]string, 0, len(stepFactories))
for _, f := range stepFactories {
types = append(types, f.TypedStepTypes()...)
}
return types
}

// CreateTypedStep routes the create request to the appropriate factory.
func (p *eventbusPlugin) CreateTypedStep(typeName, name string, config *anypb.Any) (sdk.StepInstance, error) {
for _, f := range stepFactories {
inst, err := f.CreateTypedStep(typeName, name, config)
if err == nil {
return inst, nil
}
if !errors.Is(err, sdk.ErrTypedContractNotHandled) {
return nil, err
}
}
return nil, fmt.Errorf("workflow-plugin-eventbus: unknown step type %q", typeName)
}

// ── TriggerProvider ───────────────────────────────────────────────────────────

// TriggerTypes returns the trigger type names this plugin provides.
func (p *eventbusPlugin) TriggerTypes() []string {
return []string{"trigger.eventbus.subscribe"}
}

// CreateTrigger creates a trigger instance for the trigger.eventbus.subscribe type.
// In the external plugin gRPC path the callback client is never wired, so cb is
// always nil and Start is a no-op. The trigger module is created via
// CreateTypedModule in that path; this method exists for the legacy TriggerProvider
// interface.
func (p *eventbusPlugin) CreateTrigger(typeName string, config map[string]any, cb sdk.TriggerCallback) (sdk.TriggerInstance, error) {
if typeName != "trigger.eventbus.subscribe" {
return nil, fmt.Errorf("workflow-plugin-eventbus: unknown trigger type %q", typeName)
}
// Perform explicit type assertions so callers get a clear error when a field
// is present but has the wrong type (e.g. config["name"] = 42 gives
// "config[name] must be a string, got int" rather than "config.name is required").
name, err := configString(config, "name")
if err != nil {
return nil, fmt.Errorf("workflow-plugin-eventbus: CreateTrigger %q: %w", typeName, err)
}
streamName, err := configString(config, "stream_name")
if err != nil {
return nil, fmt.Errorf("workflow-plugin-eventbus: CreateTrigger %q: %w", typeName, err)
}
filterSubject, _ := configString(config, "filter_subject") //nolint:errcheck // optional field
cfg := &eventbusv1.ConsumerConfig{
Name: name,
StreamName: streamName,
FilterSubject: filterSubject,
}
inst, err := eventbus.NewSubscribeTrigger(typeName, cfg, cb)
if err != nil {
return nil, err
}
return inst.(sdk.TriggerInstance), nil
}

// configString extracts key from config as a string. Returns an error if the
// key is present but not a string type.
func configString(config map[string]any, key string) (string, error) {
v, ok := config[key]
if !ok {
return "", nil // absent is fine; required-field validation is in NewSubscribeTrigger
}
s, ok := v.(string)
if !ok {
return "", fmt.Errorf("config[%s] must be a string, got %T", key, v)
}
return s, nil
}

// ── ContractProvider ──────────────────────────────────────────────────────────

// ContractRegistry returns the typed contract descriptors for all plugin
// capabilities. These match the entries in plugin.contracts.json and are used
// by the engine for strict-proto contract negotiation.
func (p *eventbusPlugin) ContractRegistry() *pb.ContractRegistry {
strict := pb.ContractMode_CONTRACT_MODE_STRICT_PROTO
return &pb.ContractRegistry{
Contracts: []*pb.ContractDescriptor{
// ── modules ───────────────────────────────────────────────────────
{
Kind: pb.ContractKind_CONTRACT_KIND_MODULE,
ModuleType: "infra.eventbus",
ConfigMessage: "workflow.plugin.eventbus.v1.ClusterConfig",
Mode: strict,
},
{
Kind: pb.ContractKind_CONTRACT_KIND_MODULE,
ModuleType: "infra.eventbus.stream",
ConfigMessage: "workflow.plugin.eventbus.v1.StreamConfig",
Mode: strict,
},
{
Kind: pb.ContractKind_CONTRACT_KIND_MODULE,
ModuleType: "infra.eventbus.consumer",
ConfigMessage: "workflow.plugin.eventbus.v1.ConsumerConfig",
Mode: strict,
},
// ── steps ─────────────────────────────────────────────────────────
{
Kind: pb.ContractKind_CONTRACT_KIND_STEP,
StepType: "step.eventbus.publish",
InputMessage: "workflow.plugin.eventbus.v1.PublishRequest",
OutputMessage: "workflow.plugin.eventbus.v1.PublishResponse",
Mode: strict,
},
{
Kind: pb.ContractKind_CONTRACT_KIND_STEP,
StepType: "step.eventbus.consume",
InputMessage: "workflow.plugin.eventbus.v1.ConsumeRequest",
OutputMessage: "workflow.plugin.eventbus.v1.ConsumeResponse",
Mode: strict,
},
{
Kind: pb.ContractKind_CONTRACT_KIND_STEP,
StepType: "step.eventbus.ack",
InputMessage: "workflow.plugin.eventbus.v1.AckRequest",
OutputMessage: "workflow.plugin.eventbus.v1.AckResponse",
Mode: strict,
},
// ── triggers ──────────────────────────────────────────────────────
{
Kind: pb.ContractKind_CONTRACT_KIND_TRIGGER,
TriggerType: "trigger.eventbus.subscribe",
ConfigMessage: "workflow.plugin.eventbus.v1.ConsumerConfig",
OutputMessage: "workflow.plugin.eventbus.v1.Message",
Mode: strict,
},
},
}
}
129 changes: 129 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package eventbus

import (
"context"
"fmt"
"sync"

"google.golang.org/protobuf/types/known/anypb"

eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen"
sdk "github.com/GoCodeAlone/workflow/plugin/external/sdk"
)

// ── consumer registry ─────────────────────────────────────────────────────────

var (
consumerMu sync.RWMutex
consumerRegistry = make(map[string]*eventbusv1.ConsumerConfig)
)

// RegisterConsumer stores a ConsumerConfig in the global registry under instanceName.
func RegisterConsumer(instanceName string, cfg *eventbusv1.ConsumerConfig) {
consumerMu.Lock()
defer consumerMu.Unlock()
consumerRegistry[instanceName] = cfg
}

// GetConsumer looks up a ConsumerConfig by instance name.
func GetConsumer(instanceName string) (*eventbusv1.ConsumerConfig, bool) {
consumerMu.RLock()
defer consumerMu.RUnlock()
cfg, ok := consumerRegistry[instanceName]
return cfg, ok
}

// UnregisterConsumer removes a ConsumerConfig from the registry.
func UnregisterConsumer(instanceName string) {
consumerMu.Lock()
defer consumerMu.Unlock()
delete(consumerRegistry, instanceName)
}

// GetConsumerByName looks up a ConsumerConfig by its durable consumer name
// (cfg.name), iterating all registered instances. This is used by
// step.eventbus.consume to resolve the consumer config from the durable name
// supplied in ConsumeRequest.consumer.
func GetConsumerByName(durableName string) (*eventbusv1.ConsumerConfig, bool) {
consumerMu.RLock()
defer consumerMu.RUnlock()
for _, cfg := range consumerRegistry {
if cfg.GetName() == durableName {
return cfg, true
}
}
return nil, false
}

// ── ConsumerModuleFactory (TypedModuleProvider) ───────────────────────────────

// ConsumerModuleFactory implements sdk.TypedModuleProvider for the
// infra.eventbus.consumer module type.
type ConsumerModuleFactory struct{}

// Compile-time assertion: ConsumerModuleFactory implements sdk.TypedModuleProvider.
var _ sdk.TypedModuleProvider = (*ConsumerModuleFactory)(nil)

// TypedModuleTypes returns the single module type served by this factory.
func (f *ConsumerModuleFactory) TypedModuleTypes() []string {
return []string{"infra.eventbus.consumer"}
}

// CreateTypedModule unpacks the typed proto config and delegates to NewConsumerModule.
func (f *ConsumerModuleFactory) CreateTypedModule(typeName, name string, config *anypb.Any) (sdk.ModuleInstance, error) {
if typeName != "infra.eventbus.consumer" {
return nil, fmt.Errorf("%w: module type %q", sdk.ErrTypedContractNotHandled, typeName)
}
var cfg eventbusv1.ConsumerConfig
if config != nil {
if err := config.UnmarshalTo(&cfg); err != nil {
return nil, fmt.Errorf("infra.eventbus.consumer %q: unmarshal typed config: %w", name, err)
}
}
return NewConsumerModule(name, &cfg)
}

// ── consumerModule (ModuleInstance) ──────────────────────────────────────────

// consumerModule implements sdk.ModuleInstance for the infra.eventbus.consumer
// module type. It declares a durable JetStream consumer (or Kafka consumer group)
// and registers its config for use by step and trigger modules. No background
// goroutines are started — consumption is pull-based, driven by step execution.
type consumerModule struct {
instanceName string
config *eventbusv1.ConsumerConfig
}

// Compile-time assertion: consumerModule implements sdk.ModuleInstance.
var _ sdk.ModuleInstance = (*consumerModule)(nil)

// NewConsumerModule creates a consumerModule from a typed ConsumerConfig proto.
//
// Returns an error if:
// - config.name is empty
// - config.stream_name is empty
func NewConsumerModule(instanceName string, cfg *eventbusv1.ConsumerConfig) (sdk.ModuleInstance, error) {
if cfg.GetName() == "" {
return nil, fmt.Errorf("infra.eventbus.consumer %q: config.name is required", instanceName)
}
if cfg.GetStreamName() == "" {
return nil, fmt.Errorf("infra.eventbus.consumer %q: config.stream_name is required", instanceName)
}
return &consumerModule{instanceName: instanceName, config: cfg}, nil
}

// Init registers the consumer config in the global registry.
func (m *consumerModule) Init() error {
RegisterConsumer(m.instanceName, m.config)
return nil
}

// Start is a no-op for the consumer module. Pull-based consumption has no
// background goroutines — the step.eventbus.consume step drives fetch calls.
func (m *consumerModule) Start(_ context.Context) error { return nil }

// Stop unregisters the consumer config from the global registry.
func (m *consumerModule) Stop(_ context.Context) error {
UnregisterConsumer(m.instanceName)
return nil
}
Loading