diff --git a/engine_pipeline_test.go b/engine_pipeline_test.go index 565ff05c..d6b59a08 100644 --- a/engine_pipeline_test.go +++ b/engine_pipeline_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "github.com/GoCodeAlone/modular" "github.com/GoCodeAlone/workflow/handlers" "github.com/GoCodeAlone/workflow/module" pluginpipeline "github.com/GoCodeAlone/workflow/plugins/pipelinesteps" @@ -329,6 +330,102 @@ func TestPipeline_ConfigurePipelines_InlineEventTrigger(t *testing.T) { } } +func TestPipeline_ConfigurePipelines_InlineEventBusTrigger(t *testing.T) { + // Use a minimal engine without plugins to avoid trigger collisions. + app := newMockApplication() + engine := NewStdEngine(app, app.Logger()) + engine.AddStepType("step.log", module.NewLogStepFactory()) + pipelineHandler := handlers.NewPipelineWorkflowHandler() + engine.RegisterWorkflowHandler(pipelineHandler) + + // Register a config-capturing mock trigger that responds to "eventbus" type. + var capturedConfig any + ct := &configCapturingTrigger{ + mockTrigger: mockTrigger{ + name: module.EventBusTriggerName, + configType: "eventbus", + }, + captureFunc: func(cfg any) { capturedConfig = cfg }, + } + engine.RegisterTrigger(ct) + + // Register the eventbus wrapper so the flat pipeline config is translated + // into the EventBusTrigger's native subscriptions format. + engine.RegisterTriggerConfigWrapper("eventbus", func(pipelineName string, cfg map[string]any) map[string]any { + sub := map[string]any{ + "workflow": "pipeline:" + pipelineName, + "action": "execute", + } + if t, ok := cfg["topic"]; ok { + sub["topic"] = t + } + if ev, ok := cfg["event"]; ok { + sub["event"] = ev + } + if async, ok := cfg["async"]; ok { + sub["async"] = async + } + return map[string]any{"subscriptions": []any{sub}} + }) + + pipelineCfg := map[string]any{ + "process-follow-up-event": map[string]any{ + "trigger": map[string]any{ + "type": "eventbus", + "config": map[string]any{ + "topic": "follow-up.created", + }, + }, + "steps": []any{ + map[string]any{ + "name": "log-event", + "type": "step.log", + "config": map[string]any{ + "level": "info", + "message": "EventBus event received", + }, + }, + }, + }, + } + + err := engine.configurePipelines(pipelineCfg) + if err != nil { + t.Fatalf("configurePipelines failed: %v", err) + } + + if !pipelineHandler.CanHandle("process-follow-up-event") { + t.Error("expected pipeline to be registered") + } + + if !ct.configuredCalled { + t.Error("expected eventbus trigger to be configured") + } + + // Verify the flat config was wrapped into the EventBusTrigger subscriptions format. + cfgMap, ok := capturedConfig.(map[string]any) + if !ok { + t.Fatalf("expected map config, got %T", capturedConfig) + } + subs, ok := cfgMap["subscriptions"].([]any) + if !ok || len(subs) != 1 { + t.Fatalf("expected subscriptions slice with 1 entry, got %v", cfgMap) + } + sub, ok := subs[0].(map[string]any) + if !ok { + t.Fatalf("expected subscription map, got %T", subs[0]) + } + if sub["workflow"] != "pipeline:process-follow-up-event" { + t.Errorf("unexpected workflow: %v", sub["workflow"]) + } + if sub["action"] != "execute" { + t.Errorf("expected action=execute, got %v", sub["action"]) + } + if sub["topic"] != "follow-up.created" { + t.Errorf("expected topic=follow-up.created, got %v", sub["topic"]) + } +} + func TestPipeline_ConfigurePipelines_RejectsUnknownStepType(t *testing.T) { engine, _ := setupPipelineEngine(t) @@ -528,3 +625,17 @@ func TestPipeline_ConfigurePipelines_NoPipelineHandler(t *testing.T) { t.Errorf("expected 'no PipelineWorkflowHandler' in error, got: %v", err) } } + +// configCapturingTrigger wraps mockTrigger and records the config passed to Configure. +type configCapturingTrigger struct { + mockTrigger + captureFunc func(any) +} + +func (t *configCapturingTrigger) Configure(app modular.Application, triggerConfig any) error { + t.configuredCalled = true + if t.captureFunc != nil { + t.captureFunc(triggerConfig) + } + return nil +} diff --git a/plugins/messaging/plugin.go b/plugins/messaging/plugin.go index fb93eabd..03c0c4c8 100644 --- a/plugins/messaging/plugin.go +++ b/plugins/messaging/plugin.go @@ -149,6 +149,7 @@ func (p *Plugin) PipelineTriggerConfigWrappers() map[string]plugin.TriggerConfig "event": func(pipelineName string, cfg map[string]any) map[string]any { sub := map[string]any{ "workflow": "pipeline:" + pipelineName, + "action": "execute", } if t, ok := cfg["topic"]; ok { sub["topic"] = t @@ -160,6 +161,24 @@ func (p *Plugin) PipelineTriggerConfigWrappers() map[string]plugin.TriggerConfig "subscriptions": []any{sub}, } }, + "eventbus": func(pipelineName string, cfg map[string]any) map[string]any { + sub := map[string]any{ + "workflow": "pipeline:" + pipelineName, + "action": "execute", + } + if t, ok := cfg["topic"]; ok { + sub["topic"] = t + } + if ev, ok := cfg["event"]; ok { + sub["event"] = ev + } + if async, ok := cfg["async"]; ok { + sub["async"] = async + } + return map[string]any{ + "subscriptions": []any{sub}, + } + }, } } diff --git a/plugins/messaging/plugin_test.go b/plugins/messaging/plugin_test.go index 6841777c..eacc3021 100644 --- a/plugins/messaging/plugin_test.go +++ b/plugins/messaging/plugin_test.go @@ -253,3 +253,104 @@ func TestManifestWorkflowTypes(t *testing.T) { t.Errorf("expected workflow type 'messaging', got %q", m.WorkflowTypes[0]) } } + +func TestPipelineTriggerConfigWrappers(t *testing.T) { + p := New() + wrappers := p.PipelineTriggerConfigWrappers() + + for _, triggerType := range []string{"event", "eventbus"} { + if _, ok := wrappers[triggerType]; !ok { + t.Errorf("missing pipeline trigger config wrapper: %s", triggerType) + } + } +} + +func TestPipelineTriggerConfigWrapper_Event(t *testing.T) { + p := New() + wrappers := p.PipelineTriggerConfigWrappers() + wrapper, ok := wrappers["event"] + if !ok { + t.Fatal("missing pipeline trigger config wrapper: event") + } + + cfg := map[string]any{ + "topic": "orders.created", + "event": "order.placed", + } + result := wrapper("my-pipeline", cfg) + + subs, ok := result["subscriptions"].([]any) + if !ok || len(subs) != 1 { + t.Fatalf("expected 1 subscription, got %v", result) + } + sub := subs[0].(map[string]any) + if sub["workflow"] != "pipeline:my-pipeline" { + t.Errorf("unexpected workflow: %v", sub["workflow"]) + } + if sub["action"] != "execute" { + t.Errorf("expected action=execute, got %v", sub["action"]) + } + if sub["topic"] != "orders.created" { + t.Errorf("unexpected topic: %v", sub["topic"]) + } + if sub["event"] != "order.placed" { + t.Errorf("unexpected event: %v", sub["event"]) + } +} + +func TestPipelineTriggerConfigWrapper_EventBus(t *testing.T) { + p := New() + wrappers := p.PipelineTriggerConfigWrappers() + wrapper, ok := wrappers["eventbus"] + if !ok { + t.Fatal("missing pipeline trigger config wrapper: eventbus") + } + + cfg := map[string]any{ + "topic": "follow-up.created", + "async": true, + } + result := wrapper("process-follow-up", cfg) + + subs, ok := result["subscriptions"].([]any) + if !ok || len(subs) != 1 { + t.Fatalf("expected 1 subscription, got %v", result) + } + sub := subs[0].(map[string]any) + if sub["workflow"] != "pipeline:process-follow-up" { + t.Errorf("unexpected workflow: %v", sub["workflow"]) + } + if sub["action"] != "execute" { + t.Errorf("expected action=execute, got %v", sub["action"]) + } + if sub["topic"] != "follow-up.created" { + t.Errorf("unexpected topic: %v", sub["topic"]) + } + if sub["async"] != true { + t.Errorf("expected async=true, got %v", sub["async"]) + } +} + +func TestPipelineTriggerConfigWrapper_EventBus_WithEventFilter(t *testing.T) { + p := New() + wrappers := p.PipelineTriggerConfigWrappers() + wrapper, ok := wrappers["eventbus"] + if !ok { + t.Fatal("missing pipeline trigger config wrapper: eventbus") + } + + cfg := map[string]any{ + "topic": "user.events", + "event": "user.registered", + } + result := wrapper("onboard-pipeline", cfg) + + subs, ok := result["subscriptions"].([]any) + if !ok || len(subs) != 1 { + t.Fatalf("expected 1 subscription, got %v", result) + } + sub := subs[0].(map[string]any) + if sub["event"] != "user.registered" { + t.Errorf("unexpected event filter: %v", sub["event"]) + } +}