Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/wfctl/type_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ func KnownStepTypes() map[string]StepTypeInfo {
"step.event_publish": {
Type: "step.event_publish",
Plugin: "pipelinesteps",
ConfigKeys: []string{"topic", "broker", "payload", "headers", "event_type"},
ConfigKeys: []string{"topic", "stream", "broker", "provider", "payload", "data", "headers", "event_type", "source"},
},
"step.http_call": {
Type: "step.http_call",
Expand Down
14 changes: 9 additions & 5 deletions mcp/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,14 +518,18 @@ func knownStepTypeDescriptions() map[string]stepTypeInfoFull {
"step.event_publish": {
Type: "step.event_publish",
Plugin: "pipelinesteps",
Description: "Publishes a structured event to a messaging broker topic.",
ConfigKeys: []string{"topic", "broker", "payload", "headers", "event_type"},
Description: "Publishes a structured event in CloudEvents format to a messaging broker or EventPublisher.",
ConfigKeys: []string{"topic", "stream", "broker", "provider", "payload", "data", "headers", "event_type", "source"},
ConfigDefs: []stepConfigKeyDef{
{Key: "topic", Type: "string", Description: "Topic name to publish to", Required: true},
{Key: "topic", Type: "string", Description: "Topic name to publish to (or use 'stream' alias)", Required: true},
{Key: "stream", Type: "string", Description: "Alias for topic — stream name (e.g., Kinesis stream)"},
{Key: "broker", Type: "string", Description: "Messaging broker module name"},
{Key: "payload", Type: "string|map", Description: "Event payload (template expressions supported)"},
{Key: "provider", Type: "string", Description: "Alias for broker — EventPublisher or MessageBroker service name"},
{Key: "payload", Type: "map", Description: "Event payload (template expressions supported)"},
{Key: "data", Type: "map", Description: "Alias for payload — event data fields"},
{Key: "headers", Type: "map", Description: "Event headers"},
{Key: "event_type", Type: "string", Description: "Event type identifier"},
{Key: "event_type", Type: "string", Description: "CloudEvents type identifier"},
{Key: "source", Type: "string", Description: "CloudEvents source URI (template expressions supported)"},
},
},
"step.cache_get": {
Expand Down
18 changes: 18 additions & 0 deletions module/event_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package module

import "context"

// EventPublisher is a generic interface for publishing structured events.
// It provides a high-level abstraction over various messaging backends
// (Kafka, NATS, Kinesis, SQS, in-memory, etc.) and external plugins
// such as the Bento plugin (workflow-plugin-bento).
//
// Services implementing this interface can be registered with the application
// and referenced by name in step.event_publish configurations via the
// "provider" or "broker" config fields.
type EventPublisher interface {
// PublishEvent publishes a structured event to the given topic/stream.
// The event map typically follows the CloudEvents envelope format with
// fields like specversion, type, source, id, time, and data.
PublishEvent(ctx context.Context, topic string, event map[string]any) error
}
84 changes: 74 additions & 10 deletions module/pipeline_step_event_publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,36 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/CrisisTextLine/modular"
"github.com/CrisisTextLine/modular/modules/eventbus/v2"
"github.com/google/uuid"
)

// EventPublishStep publishes events to a messaging broker or EventBus from pipeline execution.
// EventPublishStep publishes events to a messaging broker, EventPublisher, or EventBus
// from pipeline execution. It supports CloudEvents envelope format and multiple
// provider backends including external plugins (e.g., Bento).
type EventPublishStep struct {
name string
topic string
payload map[string]any
headers map[string]string
eventType string
broker string // optional service name for a MessageBroker
source string
broker string // service name for a MessageBroker or EventPublisher
app modular.Application
tmpl *TemplateEngine
}

// NewEventPublishStepFactory returns a StepFactory that creates EventPublishStep instances.
func NewEventPublishStepFactory() StepFactory {
return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) {
// Support "stream" as alias for "topic"
topic, _ := config["topic"].(string)
if topic == "" {
topic, _ = config["stream"].(string)
}
if topic == "" {
return nil, fmt.Errorf("event_publish step %q: 'topic' is required", name)
}
Expand All @@ -36,8 +45,11 @@ func NewEventPublishStepFactory() StepFactory {
tmpl: NewTemplateEngine(),
}

// Support "data" as alias for "payload"
if payload, ok := config["payload"].(map[string]any); ok {
step.payload = payload
} else if data, ok := config["data"].(map[string]any); ok {
step.payload = data
}

if headers, ok := config["headers"].(map[string]any); ok {
Expand All @@ -50,7 +62,13 @@ func NewEventPublishStepFactory() StepFactory {
}

step.eventType, _ = config["event_type"].(string)
step.source, _ = config["source"].(string)

// Support "provider" as alias for "broker"
step.broker, _ = config["broker"].(string)
if step.broker == "" {
step.broker, _ = config["provider"].(string)
}

return step, nil
}
Expand All @@ -59,7 +77,7 @@ func NewEventPublishStepFactory() StepFactory {
// Name returns the step name.
func (s *EventPublishStep) Name() string { return s.name }

// Execute resolves templates in topic, payload, and headers then publishes the event.
// Execute resolves templates in topic, payload, source, and headers then publishes the event.
func (s *EventPublishStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) {
resolvedTopic, err := s.tmpl.Resolve(s.topic, pc)
if err != nil {
Expand All @@ -86,33 +104,79 @@ func (s *EventPublishStep) Execute(ctx context.Context, pc *PipelineContext) (*S
}
}

// Build event envelope when event_type or headers are present
event := s.buildEventEnvelope(resolvedPayload, resolvedHeaders)
// Resolve source template if configured
var resolvedSource string
if s.source != "" {
resolvedSource, err = s.tmpl.Resolve(s.source, pc)
if err != nil {
return nil, fmt.Errorf("event_publish step %q: failed to resolve source: %w", s.name, err)
}
}

// Build event envelope for broker/EventPublisher paths
event := s.buildEventEnvelope(resolvedPayload, resolvedHeaders, resolvedSource)

if s.broker != "" {
// Try EventPublisher interface first (supports external plugins like Bento)
if pub := s.tryGetEventPublisher(); pub != nil {
return s.publishViaEventPublisher(ctx, resolvedTopic, event, pub)
}
// Fall back to MessageBroker interface
return s.publishViaBroker(resolvedTopic, event)
}

return s.publishViaEventBus(ctx, resolvedTopic, event)
// The EventBus module builds its own CloudEvents envelope internally,
// so pass the resolved payload directly (not the pre-built envelope).
return s.publishViaEventBus(ctx, resolvedTopic, resolvedPayload)
}

// buildEventEnvelope wraps the payload with event_type and headers metadata when present.
func (s *EventPublishStep) buildEventEnvelope(payload map[string]any, headers map[string]string) map[string]any {
if s.eventType == "" && len(headers) == 0 {
// tryGetEventPublisher attempts to resolve the broker service as an EventPublisher.
// Returns nil if the service does not implement EventPublisher.
func (s *EventPublishStep) tryGetEventPublisher() (pub EventPublisher) {
defer func() {
if r := recover(); r != nil {
pub = nil
}
}()
if err := s.app.GetService(s.broker, &pub); err != nil || pub == nil {
return nil
}
return pub
}

// buildEventEnvelope wraps the payload in an envelope for publishing.
// When both event_type and source are configured, a full CloudEvents 1.0-compatible
// envelope is emitted with specversion, type, source, id, time, and data fields.
// When only headers are provided (without event_type/source), the payload is
// wrapped as {data, headers} without adding CloudEvents-required attributes.
func (s *EventPublishStep) buildEventEnvelope(payload map[string]any, headers map[string]string, resolvedSource string) map[string]any {
if s.eventType == "" && resolvedSource == "" && len(headers) == 0 {
return payload
}
envelope := map[string]any{
"data": payload,
}
if s.eventType != "" {
// Only emit a CloudEvents envelope when both required attributes are present.
if s.eventType != "" && resolvedSource != "" {
envelope["specversion"] = "1.0"
envelope["id"] = uuid.New().String()
envelope["time"] = time.Now().UTC().Format(time.RFC3339)
envelope["type"] = s.eventType
envelope["source"] = resolvedSource
}
if len(headers) > 0 {
envelope["headers"] = headers
}
return envelope
}

func (s *EventPublishStep) publishViaEventPublisher(ctx context.Context, topic string, event map[string]any, pub EventPublisher) (*StepResult, error) {
if err := pub.PublishEvent(ctx, topic, event); err != nil {
return nil, fmt.Errorf("event_publish step %q: failed to publish via provider: %w", s.name, err)
}
return &StepResult{Output: map[string]any{"published": true, "topic": topic}}, nil
}

func (s *EventPublishStep) publishViaBroker(topic string, payload map[string]any) (*StepResult, error) {
var broker MessageBroker
if err := s.app.GetService(s.broker, &broker); err != nil {
Expand Down
Loading
Loading