diff --git a/cmd/wfctl/type_registry.go b/cmd/wfctl/type_registry.go index ec59006d..6350eacc 100644 --- a/cmd/wfctl/type_registry.go +++ b/cmd/wfctl/type_registry.go @@ -570,7 +570,7 @@ func KnownStepTypes() map[string]StepTypeInfo { "step.event_publish": { Type: "step.event_publish", Plugin: "pipelinesteps", - ConfigKeys: []string{"topic", "broker", "payload", "headers", "event_type"}, + ConfigKeys: []string{"topic", "stream", "broker", "provider", "payload", "data", "headers", "event_type", "source"}, }, "step.http_call": { Type: "step.http_call", diff --git a/mcp/tools.go b/mcp/tools.go index 2a2f4979..9d3cb104 100644 --- a/mcp/tools.go +++ b/mcp/tools.go @@ -518,14 +518,18 @@ func knownStepTypeDescriptions() map[string]stepTypeInfoFull { "step.event_publish": { Type: "step.event_publish", Plugin: "pipelinesteps", - Description: "Publishes a structured event to a messaging broker topic.", - ConfigKeys: []string{"topic", "broker", "payload", "headers", "event_type"}, + Description: "Publishes a structured event in CloudEvents format to a messaging broker or EventPublisher.", + ConfigKeys: []string{"topic", "stream", "broker", "provider", "payload", "data", "headers", "event_type", "source"}, ConfigDefs: []stepConfigKeyDef{ - {Key: "topic", Type: "string", Description: "Topic name to publish to", Required: true}, + {Key: "topic", Type: "string", Description: "Topic name to publish to (or use 'stream' alias)", Required: true}, + {Key: "stream", Type: "string", Description: "Alias for topic — stream name (e.g., Kinesis stream)"}, {Key: "broker", Type: "string", Description: "Messaging broker module name"}, - {Key: "payload", Type: "string|map", Description: "Event payload (template expressions supported)"}, + {Key: "provider", Type: "string", Description: "Alias for broker — EventPublisher or MessageBroker service name"}, + {Key: "payload", Type: "map", Description: "Event payload (template expressions supported)"}, + {Key: "data", Type: "map", Description: "Alias for payload — event data fields"}, {Key: "headers", Type: "map", Description: "Event headers"}, - {Key: "event_type", Type: "string", Description: "Event type identifier"}, + {Key: "event_type", Type: "string", Description: "CloudEvents type identifier"}, + {Key: "source", Type: "string", Description: "CloudEvents source URI (template expressions supported)"}, }, }, "step.cache_get": { diff --git a/module/event_publisher.go b/module/event_publisher.go new file mode 100644 index 00000000..645299e3 --- /dev/null +++ b/module/event_publisher.go @@ -0,0 +1,18 @@ +package module + +import "context" + +// EventPublisher is a generic interface for publishing structured events. +// It provides a high-level abstraction over various messaging backends +// (Kafka, NATS, Kinesis, SQS, in-memory, etc.) and external plugins +// such as the Bento plugin (workflow-plugin-bento). +// +// Services implementing this interface can be registered with the application +// and referenced by name in step.event_publish configurations via the +// "provider" or "broker" config fields. +type EventPublisher interface { + // PublishEvent publishes a structured event to the given topic/stream. + // The event map typically follows the CloudEvents envelope format with + // fields like specversion, type, source, id, time, and data. + PublishEvent(ctx context.Context, topic string, event map[string]any) error +} diff --git a/module/pipeline_step_event_publish.go b/module/pipeline_step_event_publish.go index 0a1d90dd..724ceb78 100644 --- a/module/pipeline_step_event_publish.go +++ b/module/pipeline_step_event_publish.go @@ -4,19 +4,24 @@ import ( "context" "encoding/json" "fmt" + "time" "github.com/CrisisTextLine/modular" "github.com/CrisisTextLine/modular/modules/eventbus/v2" + "github.com/google/uuid" ) -// EventPublishStep publishes events to a messaging broker or EventBus from pipeline execution. +// EventPublishStep publishes events to a messaging broker, EventPublisher, or EventBus +// from pipeline execution. It supports CloudEvents envelope format and multiple +// provider backends including external plugins (e.g., Bento). type EventPublishStep struct { name string topic string payload map[string]any headers map[string]string eventType string - broker string // optional service name for a MessageBroker + source string + broker string // service name for a MessageBroker or EventPublisher app modular.Application tmpl *TemplateEngine } @@ -24,7 +29,11 @@ type EventPublishStep struct { // NewEventPublishStepFactory returns a StepFactory that creates EventPublishStep instances. func NewEventPublishStepFactory() StepFactory { return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) { + // Support "stream" as alias for "topic" topic, _ := config["topic"].(string) + if topic == "" { + topic, _ = config["stream"].(string) + } if topic == "" { return nil, fmt.Errorf("event_publish step %q: 'topic' is required", name) } @@ -36,8 +45,11 @@ func NewEventPublishStepFactory() StepFactory { tmpl: NewTemplateEngine(), } + // Support "data" as alias for "payload" if payload, ok := config["payload"].(map[string]any); ok { step.payload = payload + } else if data, ok := config["data"].(map[string]any); ok { + step.payload = data } if headers, ok := config["headers"].(map[string]any); ok { @@ -50,7 +62,13 @@ func NewEventPublishStepFactory() StepFactory { } step.eventType, _ = config["event_type"].(string) + step.source, _ = config["source"].(string) + + // Support "provider" as alias for "broker" step.broker, _ = config["broker"].(string) + if step.broker == "" { + step.broker, _ = config["provider"].(string) + } return step, nil } @@ -59,7 +77,7 @@ func NewEventPublishStepFactory() StepFactory { // Name returns the step name. func (s *EventPublishStep) Name() string { return s.name } -// Execute resolves templates in topic, payload, and headers then publishes the event. +// Execute resolves templates in topic, payload, source, and headers then publishes the event. func (s *EventPublishStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { resolvedTopic, err := s.tmpl.Resolve(s.topic, pc) if err != nil { @@ -86,26 +104,65 @@ func (s *EventPublishStep) Execute(ctx context.Context, pc *PipelineContext) (*S } } - // Build event envelope when event_type or headers are present - event := s.buildEventEnvelope(resolvedPayload, resolvedHeaders) + // Resolve source template if configured + var resolvedSource string + if s.source != "" { + resolvedSource, err = s.tmpl.Resolve(s.source, pc) + if err != nil { + return nil, fmt.Errorf("event_publish step %q: failed to resolve source: %w", s.name, err) + } + } + + // Build event envelope for broker/EventPublisher paths + event := s.buildEventEnvelope(resolvedPayload, resolvedHeaders, resolvedSource) if s.broker != "" { + // Try EventPublisher interface first (supports external plugins like Bento) + if pub := s.tryGetEventPublisher(); pub != nil { + return s.publishViaEventPublisher(ctx, resolvedTopic, event, pub) + } + // Fall back to MessageBroker interface return s.publishViaBroker(resolvedTopic, event) } - return s.publishViaEventBus(ctx, resolvedTopic, event) + // The EventBus module builds its own CloudEvents envelope internally, + // so pass the resolved payload directly (not the pre-built envelope). + return s.publishViaEventBus(ctx, resolvedTopic, resolvedPayload) } -// buildEventEnvelope wraps the payload with event_type and headers metadata when present. -func (s *EventPublishStep) buildEventEnvelope(payload map[string]any, headers map[string]string) map[string]any { - if s.eventType == "" && len(headers) == 0 { +// tryGetEventPublisher attempts to resolve the broker service as an EventPublisher. +// Returns nil if the service does not implement EventPublisher. +func (s *EventPublishStep) tryGetEventPublisher() (pub EventPublisher) { + defer func() { + if r := recover(); r != nil { + pub = nil + } + }() + if err := s.app.GetService(s.broker, &pub); err != nil || pub == nil { + return nil + } + return pub +} + +// buildEventEnvelope wraps the payload in an envelope for publishing. +// When both event_type and source are configured, a full CloudEvents 1.0-compatible +// envelope is emitted with specversion, type, source, id, time, and data fields. +// When only headers are provided (without event_type/source), the payload is +// wrapped as {data, headers} without adding CloudEvents-required attributes. +func (s *EventPublishStep) buildEventEnvelope(payload map[string]any, headers map[string]string, resolvedSource string) map[string]any { + if s.eventType == "" && resolvedSource == "" && len(headers) == 0 { return payload } envelope := map[string]any{ "data": payload, } - if s.eventType != "" { + // Only emit a CloudEvents envelope when both required attributes are present. + if s.eventType != "" && resolvedSource != "" { + envelope["specversion"] = "1.0" + envelope["id"] = uuid.New().String() + envelope["time"] = time.Now().UTC().Format(time.RFC3339) envelope["type"] = s.eventType + envelope["source"] = resolvedSource } if len(headers) > 0 { envelope["headers"] = headers @@ -113,6 +170,13 @@ func (s *EventPublishStep) buildEventEnvelope(payload map[string]any, headers ma return envelope } +func (s *EventPublishStep) publishViaEventPublisher(ctx context.Context, topic string, event map[string]any, pub EventPublisher) (*StepResult, error) { + if err := pub.PublishEvent(ctx, topic, event); err != nil { + return nil, fmt.Errorf("event_publish step %q: failed to publish via provider: %w", s.name, err) + } + return &StepResult{Output: map[string]any{"published": true, "topic": topic}}, nil +} + func (s *EventPublishStep) publishViaBroker(topic string, payload map[string]any) (*StepResult, error) { var broker MessageBroker if err := s.app.GetService(s.broker, &broker); err != nil { diff --git a/module/pipeline_step_event_publish_test.go b/module/pipeline_step_event_publish_test.go index 1c3ae1b5..cd62744a 100644 --- a/module/pipeline_step_event_publish_test.go +++ b/module/pipeline_step_event_publish_test.go @@ -188,6 +188,8 @@ func TestEventPublishStep_EventTypeEnvelope(t *testing.T) { app := mockAppWithBroker("bus", broker) factory := NewEventPublishStepFactory() + // Without source, only event_type is insufficient for a full CloudEvents envelope. + // The step wraps as {data: payload} without CloudEvents-required fields. step, err := factory("pub-typed", map[string]any{ "topic": "events", "broker": "bus", @@ -210,9 +212,14 @@ func TestEventPublishStep_EventTypeEnvelope(t *testing.T) { if err := json.Unmarshal(broker.producer.published[0].message, &envelope); err != nil { t.Fatalf("failed to unmarshal: %v", err) } - if envelope["type"] != "order.created" { - t.Errorf("expected type=order.created, got %v", envelope["type"]) + // Without source, CloudEvents required fields (specversion, id, time, type, source) are not added. + if _, ok := envelope["specversion"]; ok { + t.Error("expected no specversion when source is not set") } + if _, ok := envelope["type"]; ok { + t.Error("expected no type when source is not set (incomplete CloudEvents config)") + } + // But the payload is still wrapped under "data" if _, ok := envelope["data"]; !ok { t.Error("expected data field in envelope") } @@ -268,3 +275,272 @@ func TestEventPublishStep_NoBrokerNorEventBus(t *testing.T) { t.Errorf("expected eventbus not available error, got: %v", err) } } + +func TestEventPublishStep_CloudEventsEnvelope(t *testing.T) { + broker := newMockBroker() + app := mockAppWithBroker("bus", broker) + + factory := NewEventPublishStepFactory() + step, err := factory("pub-cloudevents", map[string]any{ + "topic": "messaging.texter-messages", + "broker": "bus", + "event_type": "messaging.texter-message.received", + "source": "/chimera/messaging", + "payload": map[string]any{ + "messageId": "msg-1", + }, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + var envelope map[string]any + if err := json.Unmarshal(broker.producer.published[0].message, &envelope); err != nil { + t.Fatalf("failed to unmarshal: %v", err) + } + if envelope["specversion"] != "1.0" { + t.Errorf("expected specversion=1.0, got %v", envelope["specversion"]) + } + if envelope["type"] != "messaging.texter-message.received" { + t.Errorf("expected type=messaging.texter-message.received, got %v", envelope["type"]) + } + if envelope["source"] != "/chimera/messaging" { + t.Errorf("expected source=/chimera/messaging, got %v", envelope["source"]) + } + if _, ok := envelope["id"].(string); !ok || envelope["id"] == "" { + t.Errorf("expected non-empty id string, got %v", envelope["id"]) + } + if _, ok := envelope["time"].(string); !ok || envelope["time"] == "" { + t.Errorf("expected non-empty time string, got %v", envelope["time"]) + } + data, ok := envelope["data"].(map[string]any) + if !ok { + t.Fatal("expected data field in envelope") + } + if data["messageId"] != "msg-1" { + t.Errorf("expected messageId=msg-1, got %v", data["messageId"]) + } +} + +func TestEventPublishStep_StreamAlias(t *testing.T) { + broker := newMockBroker() + app := mockAppWithBroker("bus", broker) + + factory := NewEventPublishStepFactory() + step, err := factory("pub-stream", map[string]any{ + "stream": "messaging.texter-messages", + "broker": "bus", + "payload": map[string]any{ + "id": "1", + }, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Output["topic"] != "messaging.texter-messages" { + t.Errorf("expected topic=messaging.texter-messages, got %v", result.Output["topic"]) + } +} + +func TestEventPublishStep_DataAlias(t *testing.T) { + broker := newMockBroker() + app := mockAppWithBroker("bus", broker) + + factory := NewEventPublishStepFactory() + step, err := factory("pub-data", map[string]any{ + "topic": "events", + "broker": "bus", + "data": map[string]any{ + "texterId": "42", + }, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + var payload map[string]any + if err := json.Unmarshal(broker.producer.published[0].message, &payload); err != nil { + t.Fatalf("failed to unmarshal: %v", err) + } + if payload["texterId"] != "42" { + t.Errorf("expected texterId=42, got %v", payload["texterId"]) + } +} + +func TestEventPublishStep_ProviderAlias(t *testing.T) { + broker := newMockBroker() + app := mockAppWithBroker("kinesis-provider", broker) + + factory := NewEventPublishStepFactory() + step, err := factory("pub-provider", map[string]any{ + "topic": "events", + "provider": "kinesis-provider", + "payload": map[string]any{ + "id": "1", + }, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Output["published"] != true { + t.Errorf("expected published=true, got %v", result.Output["published"]) + } +} + +// mockEventPublisher implements EventPublisher for testing. +type mockEventPublisher struct { + published []struct { + topic string + event map[string]any + } + publishErr error +} + +func (p *mockEventPublisher) PublishEvent(_ context.Context, topic string, event map[string]any) error { + if p.publishErr != nil { + return p.publishErr + } + p.published = append(p.published, struct { + topic string + event map[string]any + }{topic, event}) + return nil +} + +func TestEventPublishStep_EventPublisherInterface(t *testing.T) { + pub := &mockEventPublisher{} + app := NewMockApplication() + app.Services["bento-output"] = pub + + factory := NewEventPublishStepFactory() + step, err := factory("pub-ep", map[string]any{ + "topic": "events.processed", + "provider": "bento-output", + "event_type": "order.shipped", + "source": "/api/orders", + "payload": map[string]any{ + "orderId": "ORD-99", + }, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Output["published"] != true { + t.Errorf("expected published=true") + } + + if len(pub.published) != 1 { + t.Fatalf("expected 1 published event, got %d", len(pub.published)) + } + if pub.published[0].topic != "events.processed" { + t.Errorf("expected topic=events.processed, got %v", pub.published[0].topic) + } + event := pub.published[0].event + if event["specversion"] != "1.0" { + t.Errorf("expected specversion=1.0, got %v", event["specversion"]) + } + if event["type"] != "order.shipped" { + t.Errorf("expected type=order.shipped, got %v", event["type"]) + } + if event["source"] != "/api/orders" { + t.Errorf("expected source=/api/orders, got %v", event["source"]) + } + data, ok := event["data"].(map[string]any) + if !ok { + t.Fatal("expected data in event") + } + if data["orderId"] != "ORD-99" { + t.Errorf("expected orderId=ORD-99, got %v", data["orderId"]) + } +} + +func TestEventPublishStep_SourceTemplateResolution(t *testing.T) { + broker := newMockBroker() + app := mockAppWithBroker("bus", broker) + + factory := NewEventPublishStepFactory() + step, err := factory("pub-src-tmpl", map[string]any{ + "topic": "events", + "broker": "bus", + "event_type": "test.event", + "source": "/api/{{ .service }}", + "payload": map[string]any{ + "id": "1", + }, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{"service": "orders"}, nil) + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + var envelope map[string]any + if err := json.Unmarshal(broker.producer.published[0].message, &envelope); err != nil { + t.Fatalf("failed to unmarshal: %v", err) + } + if envelope["source"] != "/api/orders" { + t.Errorf("expected source=/api/orders, got %v", envelope["source"]) + } +} + +func TestEventPublishStep_SourceTemplateError(t *testing.T) { + broker := newMockBroker() + app := mockAppWithBroker("bus", broker) + + factory := NewEventPublishStepFactory() + step, err := factory("pub-src-err", map[string]any{ + "topic": "events", + "broker": "bus", + "event_type": "test.event", + "source": "/api/{{ .service", // malformed template + "payload": map[string]any{ + "id": "1", + }, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error when source template resolution fails") + } + if !strings.Contains(err.Error(), "failed to resolve source") { + t.Errorf("expected 'failed to resolve source' error, got: %v", err) + } +} diff --git a/schema/module_schema.go b/schema/module_schema.go index 2ddfe42f..008a7018 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -1811,13 +1811,17 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Type: "step.event_publish", Label: "Event Publish", Category: "pipeline_steps", - Description: "Publishes an event to a messaging broker or event bus", + Description: "Publishes a structured event in CloudEvents format to a messaging broker, EventPublisher, or event bus", ConfigFields: []ConfigFieldDef{ - {Key: "topic", Label: "Topic", Type: FieldTypeString, Required: true, Description: "Topic or channel to publish the event to", Placeholder: "user-events"}, + {Key: "topic", Label: "Topic", Type: FieldTypeString, Description: "Topic or channel to publish the event to (also accepts 'stream' alias)", Placeholder: "user-events"}, + {Key: "stream", Label: "Stream", Type: FieldTypeString, Description: "Alias for 'topic' — name of the stream to publish to (e.g., Kinesis stream name)", Placeholder: "messaging.texter-messages"}, {Key: "payload", Label: "Payload", Type: FieldTypeJSON, Description: "Event payload as a JSON object (supports template expressions); defaults to current pipeline context"}, + {Key: "data", Label: "Data", Type: FieldTypeJSON, Description: "Alias for 'payload' — event data fields (supports template expressions)"}, {Key: "headers", Label: "Headers", Type: FieldTypeJSON, Description: "Additional headers/metadata to include with the event as a JSON object"}, - {Key: "event_type", Label: "Event Type", Type: FieldTypeString, Description: "Optional event type identifier to include with the message", Placeholder: "user.created"}, + {Key: "event_type", Label: "Event Type", Type: FieldTypeString, Description: "CloudEvents type identifier (e.g., messaging.texter-message.received)", Placeholder: "user.created"}, + {Key: "source", Label: "Source", Type: FieldTypeString, Description: "CloudEvents source URI identifying the event producer (supports template expressions)", Placeholder: "/chimera/messaging"}, {Key: "broker", Label: "Broker", Type: FieldTypeString, Description: "Name of the messaging broker module to use (falls back to eventbus if not set)"}, + {Key: "provider", Label: "Provider", Type: FieldTypeString, Description: "Alias for 'broker' — name of the EventPublisher or MessageBroker service (e.g., kinesis, bento-output)"}, }, }) diff --git a/schema/snippets.go b/schema/snippets.go index fccd259c..2c7ed521 100644 --- a/schema/snippets.go +++ b/schema/snippets.go @@ -288,14 +288,16 @@ func GetSnippets() []Snippet { { Name: "Step: Event Publish", Prefix: "step-event-publish", - Description: "Publish an event to the message broker", + Description: "Publish a CloudEvents-formatted event to a message broker or EventPublisher", Body: []string{ "- type: step.event_publish", " config:", - " broker: ${1:broker}", - " topic: ${2:events.created}", + " provider: ${1:broker}", + " stream: ${2:messaging.events}", " event_type: ${3:resource.created}", - " payload: ${4:{{ json . }}}", + " source: ${4:/api/resource}", + " data:", + " id: ${5:{{ .id }}}", }, }, {