From d504de32a7d794fb7dbe23b59c42a634fe4c6081 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Mar 2026 15:35:11 +0000 Subject: [PATCH 1/3] Initial plan From 52993ca3f90d92a7b673a8ddbea362137233f485 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Mar 2026 15:51:10 +0000 Subject: [PATCH 2/3] feat: add eventbus pipeline trigger config wrapper Add PipelineTriggerConfigWrapper for 'eventbus' trigger type in the messaging plugin, enabling inline EventBus-triggered pipeline execution. Also fix the existing 'event' wrapper to set default action "execute", which is required by EventTrigger.Configure. Users can now configure event-driven pipelines as: pipelines: process-follow-up-event: trigger: type: eventbus config: topic: follow-up.created steps: - ... The wrapper converts the flat config into the EventBusTrigger's native subscriptions format, mapping the pipeline name to workflow/action. Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- engine_pipeline_test.go | 50 +++++++++++++++++ plugins/messaging/plugin.go | 19 +++++++ plugins/messaging/plugin_test.go | 96 ++++++++++++++++++++++++++++++++ 3 files changed, 165 insertions(+) diff --git a/engine_pipeline_test.go b/engine_pipeline_test.go index 565ff05c..8fc432ff 100644 --- a/engine_pipeline_test.go +++ b/engine_pipeline_test.go @@ -329,6 +329,56 @@ func TestPipeline_ConfigurePipelines_InlineEventTrigger(t *testing.T) { } } +func TestPipeline_ConfigurePipelines_InlineEventBusTrigger(t *testing.T) { + // Use a minimal engine without plugins to avoid trigger collisions. + app := newMockApplication() + engine := NewStdEngine(app, app.Logger()) + engine.AddStepType("step.log", module.NewLogStepFactory()) + pipelineHandler := handlers.NewPipelineWorkflowHandler() + engine.RegisterWorkflowHandler(pipelineHandler) + + // Register a mock trigger that responds to "eventbus" type + mt := &mockTrigger{ + name: module.EventBusTriggerName, + configType: "eventbus", + } + engine.RegisterTrigger(mt) + + pipelineCfg := map[string]any{ + "process-follow-up-event": map[string]any{ + "trigger": map[string]any{ + "type": "eventbus", + "config": map[string]any{ + "topic": "follow-up.created", + }, + }, + "steps": []any{ + map[string]any{ + "name": "log-event", + "type": "step.log", + "config": map[string]any{ + "level": "info", + "message": "EventBus event received", + }, + }, + }, + }, + } + + err := engine.configurePipelines(pipelineCfg) + if err != nil { + t.Fatalf("configurePipelines failed: %v", err) + } + + if !pipelineHandler.CanHandle("process-follow-up-event") { + t.Error("expected pipeline to be registered") + } + + if !mt.configuredCalled { + t.Error("expected eventbus trigger to be configured") + } +} + func TestPipeline_ConfigurePipelines_RejectsUnknownStepType(t *testing.T) { engine, _ := setupPipelineEngine(t) diff --git a/plugins/messaging/plugin.go b/plugins/messaging/plugin.go index fb93eabd..03c0c4c8 100644 --- a/plugins/messaging/plugin.go +++ b/plugins/messaging/plugin.go @@ -149,6 +149,7 @@ func (p *Plugin) PipelineTriggerConfigWrappers() map[string]plugin.TriggerConfig "event": func(pipelineName string, cfg map[string]any) map[string]any { sub := map[string]any{ "workflow": "pipeline:" + pipelineName, + "action": "execute", } if t, ok := cfg["topic"]; ok { sub["topic"] = t @@ -160,6 +161,24 @@ func (p *Plugin) PipelineTriggerConfigWrappers() map[string]plugin.TriggerConfig "subscriptions": []any{sub}, } }, + "eventbus": func(pipelineName string, cfg map[string]any) map[string]any { + sub := map[string]any{ + "workflow": "pipeline:" + pipelineName, + "action": "execute", + } + if t, ok := cfg["topic"]; ok { + sub["topic"] = t + } + if ev, ok := cfg["event"]; ok { + sub["event"] = ev + } + if async, ok := cfg["async"]; ok { + sub["async"] = async + } + return map[string]any{ + "subscriptions": []any{sub}, + } + }, } } diff --git a/plugins/messaging/plugin_test.go b/plugins/messaging/plugin_test.go index 6841777c..c50b5f54 100644 --- a/plugins/messaging/plugin_test.go +++ b/plugins/messaging/plugin_test.go @@ -253,3 +253,99 @@ func TestManifestWorkflowTypes(t *testing.T) { t.Errorf("expected workflow type 'messaging', got %q", m.WorkflowTypes[0]) } } + +func TestPipelineTriggerConfigWrappers(t *testing.T) { + p := New() + wrappers := p.PipelineTriggerConfigWrappers() + + if len(wrappers) != 2 { + t.Fatalf("expected 2 pipeline trigger config wrappers, got %d", len(wrappers)) + } + + for _, triggerType := range []string{"event", "eventbus"} { + if _, ok := wrappers[triggerType]; !ok { + t.Errorf("missing pipeline trigger config wrapper: %s", triggerType) + } + } +} + +func TestPipelineTriggerConfigWrapper_Event(t *testing.T) { + p := New() + wrappers := p.PipelineTriggerConfigWrappers() + wrapper := wrappers["event"] + + cfg := map[string]any{ + "topic": "orders.created", + "event": "order.placed", + } + result := wrapper("my-pipeline", cfg) + + subs, ok := result["subscriptions"].([]any) + if !ok || len(subs) != 1 { + t.Fatalf("expected 1 subscription, got %v", result) + } + sub := subs[0].(map[string]any) + if sub["workflow"] != "pipeline:my-pipeline" { + t.Errorf("unexpected workflow: %v", sub["workflow"]) + } + if sub["action"] != "execute" { + t.Errorf("expected action=execute, got %v", sub["action"]) + } + if sub["topic"] != "orders.created" { + t.Errorf("unexpected topic: %v", sub["topic"]) + } + if sub["event"] != "order.placed" { + t.Errorf("unexpected event: %v", sub["event"]) + } +} + +func TestPipelineTriggerConfigWrapper_EventBus(t *testing.T) { + p := New() + wrappers := p.PipelineTriggerConfigWrappers() + wrapper := wrappers["eventbus"] + + cfg := map[string]any{ + "topic": "follow-up.created", + "async": true, + } + result := wrapper("process-follow-up", cfg) + + subs, ok := result["subscriptions"].([]any) + if !ok || len(subs) != 1 { + t.Fatalf("expected 1 subscription, got %v", result) + } + sub := subs[0].(map[string]any) + if sub["workflow"] != "pipeline:process-follow-up" { + t.Errorf("unexpected workflow: %v", sub["workflow"]) + } + if sub["action"] != "execute" { + t.Errorf("expected action=execute, got %v", sub["action"]) + } + if sub["topic"] != "follow-up.created" { + t.Errorf("unexpected topic: %v", sub["topic"]) + } + if sub["async"] != true { + t.Errorf("expected async=true, got %v", sub["async"]) + } +} + +func TestPipelineTriggerConfigWrapper_EventBus_WithEventFilter(t *testing.T) { + p := New() + wrappers := p.PipelineTriggerConfigWrappers() + wrapper := wrappers["eventbus"] + + cfg := map[string]any{ + "topic": "user.events", + "event": "user.registered", + } + result := wrapper("onboard-pipeline", cfg) + + subs, ok := result["subscriptions"].([]any) + if !ok || len(subs) != 1 { + t.Fatalf("expected 1 subscription, got %v", result) + } + sub := subs[0].(map[string]any) + if sub["event"] != "user.registered" { + t.Errorf("unexpected event filter: %v", sub["event"]) + } +} From 5b410a7cfc30178d83fdafb7f2332c59a694243f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Mar 2026 16:26:53 +0000 Subject: [PATCH 3/3] test: address review feedback on trigger config wrapper tests - Remove brittle len(wrappers)==2 assertion; only check required keys exist - Add key existence checks before calling wrappers["event"] and wrappers["eventbus"] to produce clear failures instead of nil-function panics - Extend InlineEventBusTrigger test to verify flat config is wrapped into the EventBusTrigger native subscriptions format (workflow, action, topic fields) - Add configCapturingTrigger helper to capture config passed to Configure() Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- engine_pipeline_test.go | 73 +++++++++++++++++++++++++++++--- plugins/messaging/plugin_test.go | 19 ++++++--- 2 files changed, 79 insertions(+), 13 deletions(-) diff --git a/engine_pipeline_test.go b/engine_pipeline_test.go index 8fc432ff..d6b59a08 100644 --- a/engine_pipeline_test.go +++ b/engine_pipeline_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "github.com/GoCodeAlone/modular" "github.com/GoCodeAlone/workflow/handlers" "github.com/GoCodeAlone/workflow/module" pluginpipeline "github.com/GoCodeAlone/workflow/plugins/pipelinesteps" @@ -337,12 +338,35 @@ func TestPipeline_ConfigurePipelines_InlineEventBusTrigger(t *testing.T) { pipelineHandler := handlers.NewPipelineWorkflowHandler() engine.RegisterWorkflowHandler(pipelineHandler) - // Register a mock trigger that responds to "eventbus" type - mt := &mockTrigger{ - name: module.EventBusTriggerName, - configType: "eventbus", + // Register a config-capturing mock trigger that responds to "eventbus" type. + var capturedConfig any + ct := &configCapturingTrigger{ + mockTrigger: mockTrigger{ + name: module.EventBusTriggerName, + configType: "eventbus", + }, + captureFunc: func(cfg any) { capturedConfig = cfg }, } - engine.RegisterTrigger(mt) + engine.RegisterTrigger(ct) + + // Register the eventbus wrapper so the flat pipeline config is translated + // into the EventBusTrigger's native subscriptions format. + engine.RegisterTriggerConfigWrapper("eventbus", func(pipelineName string, cfg map[string]any) map[string]any { + sub := map[string]any{ + "workflow": "pipeline:" + pipelineName, + "action": "execute", + } + if t, ok := cfg["topic"]; ok { + sub["topic"] = t + } + if ev, ok := cfg["event"]; ok { + sub["event"] = ev + } + if async, ok := cfg["async"]; ok { + sub["async"] = async + } + return map[string]any{"subscriptions": []any{sub}} + }) pipelineCfg := map[string]any{ "process-follow-up-event": map[string]any{ @@ -374,9 +398,32 @@ func TestPipeline_ConfigurePipelines_InlineEventBusTrigger(t *testing.T) { t.Error("expected pipeline to be registered") } - if !mt.configuredCalled { + if !ct.configuredCalled { t.Error("expected eventbus trigger to be configured") } + + // Verify the flat config was wrapped into the EventBusTrigger subscriptions format. + cfgMap, ok := capturedConfig.(map[string]any) + if !ok { + t.Fatalf("expected map config, got %T", capturedConfig) + } + subs, ok := cfgMap["subscriptions"].([]any) + if !ok || len(subs) != 1 { + t.Fatalf("expected subscriptions slice with 1 entry, got %v", cfgMap) + } + sub, ok := subs[0].(map[string]any) + if !ok { + t.Fatalf("expected subscription map, got %T", subs[0]) + } + if sub["workflow"] != "pipeline:process-follow-up-event" { + t.Errorf("unexpected workflow: %v", sub["workflow"]) + } + if sub["action"] != "execute" { + t.Errorf("expected action=execute, got %v", sub["action"]) + } + if sub["topic"] != "follow-up.created" { + t.Errorf("expected topic=follow-up.created, got %v", sub["topic"]) + } } func TestPipeline_ConfigurePipelines_RejectsUnknownStepType(t *testing.T) { @@ -578,3 +625,17 @@ func TestPipeline_ConfigurePipelines_NoPipelineHandler(t *testing.T) { t.Errorf("expected 'no PipelineWorkflowHandler' in error, got: %v", err) } } + +// configCapturingTrigger wraps mockTrigger and records the config passed to Configure. +type configCapturingTrigger struct { + mockTrigger + captureFunc func(any) +} + +func (t *configCapturingTrigger) Configure(app modular.Application, triggerConfig any) error { + t.configuredCalled = true + if t.captureFunc != nil { + t.captureFunc(triggerConfig) + } + return nil +} diff --git a/plugins/messaging/plugin_test.go b/plugins/messaging/plugin_test.go index c50b5f54..eacc3021 100644 --- a/plugins/messaging/plugin_test.go +++ b/plugins/messaging/plugin_test.go @@ -258,10 +258,6 @@ func TestPipelineTriggerConfigWrappers(t *testing.T) { p := New() wrappers := p.PipelineTriggerConfigWrappers() - if len(wrappers) != 2 { - t.Fatalf("expected 2 pipeline trigger config wrappers, got %d", len(wrappers)) - } - for _, triggerType := range []string{"event", "eventbus"} { if _, ok := wrappers[triggerType]; !ok { t.Errorf("missing pipeline trigger config wrapper: %s", triggerType) @@ -272,7 +268,10 @@ func TestPipelineTriggerConfigWrappers(t *testing.T) { func TestPipelineTriggerConfigWrapper_Event(t *testing.T) { p := New() wrappers := p.PipelineTriggerConfigWrappers() - wrapper := wrappers["event"] + wrapper, ok := wrappers["event"] + if !ok { + t.Fatal("missing pipeline trigger config wrapper: event") + } cfg := map[string]any{ "topic": "orders.created", @@ -302,7 +301,10 @@ func TestPipelineTriggerConfigWrapper_Event(t *testing.T) { func TestPipelineTriggerConfigWrapper_EventBus(t *testing.T) { p := New() wrappers := p.PipelineTriggerConfigWrappers() - wrapper := wrappers["eventbus"] + wrapper, ok := wrappers["eventbus"] + if !ok { + t.Fatal("missing pipeline trigger config wrapper: eventbus") + } cfg := map[string]any{ "topic": "follow-up.created", @@ -332,7 +334,10 @@ func TestPipelineTriggerConfigWrapper_EventBus(t *testing.T) { func TestPipelineTriggerConfigWrapper_EventBus_WithEventFilter(t *testing.T) { p := New() wrappers := p.PipelineTriggerConfigWrappers() - wrapper := wrappers["eventbus"] + wrapper, ok := wrappers["eventbus"] + if !ok { + t.Fatal("missing pipeline trigger config wrapper: eventbus") + } cfg := map[string]any{ "topic": "user.events",