Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions engine_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"
"testing"

"github.com/GoCodeAlone/modular"
"github.com/GoCodeAlone/workflow/handlers"
"github.com/GoCodeAlone/workflow/module"
pluginpipeline "github.com/GoCodeAlone/workflow/plugins/pipelinesteps"
Expand Down Expand Up @@ -329,6 +330,102 @@ func TestPipeline_ConfigurePipelines_InlineEventTrigger(t *testing.T) {
}
}

func TestPipeline_ConfigurePipelines_InlineEventBusTrigger(t *testing.T) {
// Use a minimal engine without plugins to avoid trigger collisions.
app := newMockApplication()
engine := NewStdEngine(app, app.Logger())
engine.AddStepType("step.log", module.NewLogStepFactory())
pipelineHandler := handlers.NewPipelineWorkflowHandler()
engine.RegisterWorkflowHandler(pipelineHandler)

// Register a config-capturing mock trigger that responds to "eventbus" type.
var capturedConfig any
ct := &configCapturingTrigger{
mockTrigger: mockTrigger{
name: module.EventBusTriggerName,
configType: "eventbus",
},
captureFunc: func(cfg any) { capturedConfig = cfg },
}
engine.RegisterTrigger(ct)

// Register the eventbus wrapper so the flat pipeline config is translated
// into the EventBusTrigger's native subscriptions format.
engine.RegisterTriggerConfigWrapper("eventbus", func(pipelineName string, cfg map[string]any) map[string]any {
sub := map[string]any{
"workflow": "pipeline:" + pipelineName,
"action": "execute",
}
if t, ok := cfg["topic"]; ok {
sub["topic"] = t
}
if ev, ok := cfg["event"]; ok {
sub["event"] = ev
}
if async, ok := cfg["async"]; ok {
sub["async"] = async
}
return map[string]any{"subscriptions": []any{sub}}
})

pipelineCfg := map[string]any{
"process-follow-up-event": map[string]any{
"trigger": map[string]any{
"type": "eventbus",
"config": map[string]any{
"topic": "follow-up.created",
},
},
"steps": []any{
map[string]any{
"name": "log-event",
"type": "step.log",
"config": map[string]any{
"level": "info",
"message": "EventBus event received",
},
},
},
},
}

err := engine.configurePipelines(pipelineCfg)
if err != nil {
t.Fatalf("configurePipelines failed: %v", err)
}

if !pipelineHandler.CanHandle("process-follow-up-event") {
t.Error("expected pipeline to be registered")
}

if !ct.configuredCalled {
t.Error("expected eventbus trigger to be configured")
}

// Verify the flat config was wrapped into the EventBusTrigger subscriptions format.
cfgMap, ok := capturedConfig.(map[string]any)
if !ok {
t.Fatalf("expected map config, got %T", capturedConfig)
}
subs, ok := cfgMap["subscriptions"].([]any)
if !ok || len(subs) != 1 {
t.Fatalf("expected subscriptions slice with 1 entry, got %v", cfgMap)
}
sub, ok := subs[0].(map[string]any)
if !ok {
t.Fatalf("expected subscription map, got %T", subs[0])
}
if sub["workflow"] != "pipeline:process-follow-up-event" {
t.Errorf("unexpected workflow: %v", sub["workflow"])
}
if sub["action"] != "execute" {
t.Errorf("expected action=execute, got %v", sub["action"])
}
if sub["topic"] != "follow-up.created" {
t.Errorf("expected topic=follow-up.created, got %v", sub["topic"])
}
}

func TestPipeline_ConfigurePipelines_RejectsUnknownStepType(t *testing.T) {
engine, _ := setupPipelineEngine(t)

Expand Down Expand Up @@ -528,3 +625,17 @@ func TestPipeline_ConfigurePipelines_NoPipelineHandler(t *testing.T) {
t.Errorf("expected 'no PipelineWorkflowHandler' in error, got: %v", err)
}
}

// configCapturingTrigger wraps mockTrigger and records the config passed to Configure.
type configCapturingTrigger struct {
mockTrigger
captureFunc func(any)
}

func (t *configCapturingTrigger) Configure(app modular.Application, triggerConfig any) error {
t.configuredCalled = true
if t.captureFunc != nil {
t.captureFunc(triggerConfig)
}
return nil
}
19 changes: 19 additions & 0 deletions plugins/messaging/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (p *Plugin) PipelineTriggerConfigWrappers() map[string]plugin.TriggerConfig
"event": func(pipelineName string, cfg map[string]any) map[string]any {
sub := map[string]any{
"workflow": "pipeline:" + pipelineName,
"action": "execute",
}
if t, ok := cfg["topic"]; ok {
sub["topic"] = t
Expand All @@ -160,6 +161,24 @@ func (p *Plugin) PipelineTriggerConfigWrappers() map[string]plugin.TriggerConfig
"subscriptions": []any{sub},
}
},
"eventbus": func(pipelineName string, cfg map[string]any) map[string]any {
sub := map[string]any{
"workflow": "pipeline:" + pipelineName,
"action": "execute",
}
if t, ok := cfg["topic"]; ok {
sub["topic"] = t
}
if ev, ok := cfg["event"]; ok {
sub["event"] = ev
}
if async, ok := cfg["async"]; ok {
sub["async"] = async
}
return map[string]any{
"subscriptions": []any{sub},
}
},
}
}

Expand Down
101 changes: 101 additions & 0 deletions plugins/messaging/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,104 @@ func TestManifestWorkflowTypes(t *testing.T) {
t.Errorf("expected workflow type 'messaging', got %q", m.WorkflowTypes[0])
}
}

func TestPipelineTriggerConfigWrappers(t *testing.T) {
p := New()
wrappers := p.PipelineTriggerConfigWrappers()

for _, triggerType := range []string{"event", "eventbus"} {
if _, ok := wrappers[triggerType]; !ok {
t.Errorf("missing pipeline trigger config wrapper: %s", triggerType)
}
}
}

func TestPipelineTriggerConfigWrapper_Event(t *testing.T) {
p := New()
wrappers := p.PipelineTriggerConfigWrappers()
wrapper, ok := wrappers["event"]
if !ok {
t.Fatal("missing pipeline trigger config wrapper: event")
}

cfg := map[string]any{
"topic": "orders.created",
"event": "order.placed",
}
result := wrapper("my-pipeline", cfg)

subs, ok := result["subscriptions"].([]any)
if !ok || len(subs) != 1 {
t.Fatalf("expected 1 subscription, got %v", result)
}
sub := subs[0].(map[string]any)
if sub["workflow"] != "pipeline:my-pipeline" {
t.Errorf("unexpected workflow: %v", sub["workflow"])
}
if sub["action"] != "execute" {
t.Errorf("expected action=execute, got %v", sub["action"])
}
if sub["topic"] != "orders.created" {
t.Errorf("unexpected topic: %v", sub["topic"])
}
if sub["event"] != "order.placed" {
t.Errorf("unexpected event: %v", sub["event"])
}
}

func TestPipelineTriggerConfigWrapper_EventBus(t *testing.T) {
p := New()
wrappers := p.PipelineTriggerConfigWrappers()
wrapper, ok := wrappers["eventbus"]
if !ok {
t.Fatal("missing pipeline trigger config wrapper: eventbus")
}

cfg := map[string]any{
"topic": "follow-up.created",
"async": true,
}
result := wrapper("process-follow-up", cfg)

subs, ok := result["subscriptions"].([]any)
if !ok || len(subs) != 1 {
t.Fatalf("expected 1 subscription, got %v", result)
}
sub := subs[0].(map[string]any)
if sub["workflow"] != "pipeline:process-follow-up" {
t.Errorf("unexpected workflow: %v", sub["workflow"])
}
if sub["action"] != "execute" {
t.Errorf("expected action=execute, got %v", sub["action"])
}
if sub["topic"] != "follow-up.created" {
t.Errorf("unexpected topic: %v", sub["topic"])
}
if sub["async"] != true {
t.Errorf("expected async=true, got %v", sub["async"])
}
}

func TestPipelineTriggerConfigWrapper_EventBus_WithEventFilter(t *testing.T) {
p := New()
wrappers := p.PipelineTriggerConfigWrappers()
wrapper, ok := wrappers["eventbus"]
if !ok {
t.Fatal("missing pipeline trigger config wrapper: eventbus")
}

cfg := map[string]any{
"topic": "user.events",
"event": "user.registered",
}
result := wrapper("onboard-pipeline", cfg)

subs, ok := result["subscriptions"].([]any)
if !ok || len(subs) != 1 {
t.Fatalf("expected 1 subscription, got %v", result)
}
sub := subs[0].(map[string]any)
if sub["event"] != "user.registered" {
t.Errorf("unexpected event filter: %v", sub["event"])
}
}
Loading