Conversation
Add PipelineTriggerConfigWrapper for 'eventbus' trigger type in the
messaging plugin, enabling inline EventBus-triggered pipeline execution.
Also fix the existing 'event' wrapper to set default action "execute",
which is required by EventTrigger.Configure.
Users can now configure event-driven pipelines as:
pipelines:
process-follow-up-event:
trigger:
type: eventbus
config:
topic: follow-up.created
steps:
- ...
The wrapper converts the flat config into the EventBusTrigger's native
subscriptions format, mapping the pipeline name to workflow/action.
Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
eventbus trigger type
There was a problem hiding this comment.
Pull request overview
Adds inline pipeline trigger support for EventBus-driven execution by introducing an eventbus pipeline trigger config wrapper in the messaging plugin, enabling event-driven pipeline fan-out patterns.
Changes:
- Add
eventbuspipeline trigger config wrapper (and fixeventwrapper to inject defaultaction: "execute"). - Add unit tests for
eventandeventbuswrapper outputs (topic/event/async/action/workflow mapping). - Add an engine-level pipeline configuration test for inline
eventbustrigger matching/configure invocation.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| plugins/messaging/plugin.go | Adds/updates pipeline trigger config wrappers for event and eventbus, ensuring action: execute is present and config is translated into subscriptions format. |
| plugins/messaging/plugin_test.go | Adds tests validating wrapper registration and wrapper output structure for event/eventbus. |
| engine_pipeline_test.go | Adds coverage that inline eventbus triggers are matched and Configure() is invoked during pipeline configuration. |
| p := New() | ||
| wrappers := p.PipelineTriggerConfigWrappers() | ||
| wrapper := wrappers["eventbus"] | ||
|
|
There was a problem hiding this comment.
This test pulls wrapper := wrappers["eventbus"] without checking the key exists; if the wrapper is missing, the test will panic with a nil-function call instead of producing a clear assertion failure. Capture the boolean (wrapper, ok := ...) and t.Fatal when it’s missing.
| func TestPipeline_ConfigurePipelines_InlineEventBusTrigger(t *testing.T) { | ||
| // Use a minimal engine without plugins to avoid trigger collisions. | ||
| app := newMockApplication() | ||
| engine := NewStdEngine(app, app.Logger()) | ||
| engine.AddStepType("step.log", module.NewLogStepFactory()) | ||
| pipelineHandler := handlers.NewPipelineWorkflowHandler() | ||
| engine.RegisterWorkflowHandler(pipelineHandler) | ||
|
|
||
| // Register a mock trigger that responds to "eventbus" type | ||
| mt := &mockTrigger{ | ||
| name: module.EventBusTriggerName, | ||
| configType: "eventbus", | ||
| } | ||
| engine.RegisterTrigger(mt) | ||
|
|
||
| pipelineCfg := map[string]any{ | ||
| "process-follow-up-event": map[string]any{ | ||
| "trigger": map[string]any{ | ||
| "type": "eventbus", | ||
| "config": map[string]any{ | ||
| "topic": "follow-up.created", | ||
| }, | ||
| }, | ||
| "steps": []any{ | ||
| map[string]any{ | ||
| "name": "log-event", | ||
| "type": "step.log", | ||
| "config": map[string]any{ | ||
| "level": "info", | ||
| "message": "EventBus event received", | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| err := engine.configurePipelines(pipelineCfg) | ||
| if err != nil { | ||
| t.Fatalf("configurePipelines failed: %v", err) | ||
| } | ||
|
|
||
| if !pipelineHandler.CanHandle("process-follow-up-event") { | ||
| t.Error("expected pipeline to be registered") | ||
| } | ||
|
|
||
| if !mt.configuredCalled { | ||
| t.Error("expected eventbus trigger to be configured") | ||
| } |
There was a problem hiding this comment.
TestPipeline_ConfigurePipelines_InlineEventBusTrigger currently only verifies that a trigger with configType "eventbus" gets Configure() called; it does not verify that the inline flat config was wrapped into the EventBusTrigger native subscriptions format (including the injected default action: execute). Consider capturing the config passed to the mock trigger and asserting it contains subscriptions[0].workflow == "pipeline:process-follow-up-event" and subscriptions[0].action == "execute", and/or registering the wrapper via engine.RegisterTriggerConfigWrapper("eventbus", ...) / engine.LoadPlugin(messaging.New()) so this test actually exercises the wrapper integration.
| p := New() | ||
| wrappers := p.PipelineTriggerConfigWrappers() | ||
| wrapper := wrappers["eventbus"] | ||
|
|
There was a problem hiding this comment.
This test pulls wrapper := wrappers["eventbus"] without checking the key exists; if the wrapper is missing, the test will panic with a nil-function call instead of producing a clear assertion failure. Capture the boolean (wrapper, ok := ...) and t.Fatal when it’s missing.
⏱ Benchmark Results✅ No significant performance regressions detected. benchstat comparison (baseline → PR)
|
|
@copilot apply changes based on the comments in this thread |
- Remove brittle len(wrappers)==2 assertion; only check required keys exist - Add key existence checks before calling wrappers["event"] and wrappers["eventbus"] to produce clear failures instead of nil-function panics - Extend InlineEventBusTrigger test to verify flat config is wrapped into the EventBusTrigger native subscriptions format (workflow, action, topic fields) - Add configCapturingTrigger helper to capture config passed to Configure() Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
Applied all review feedback in 5b410a7:
|
Pipelines had no way to react to EventBus messages — the
EventBusTriggermodule existed but couldn't be wired via inline pipeline trigger config, making event-driven fan-out patterns impossible without workarounds.Changes
plugins/messaging/plugin.go: AddedPipelineTriggerConfigWrapperfor theeventbustrigger type, converting flat pipeline config (topic,event,async) intoEventBusTrigger's native subscriptions format. Also fixes theeventwrapper to inject a defaultaction: "execute"— previously omitted, which would causeEventTrigger.Configureto fail at runtime.plugins/messaging/plugin_test.go: Tests for botheventandeventbusconfig wrappers, covering topic, event filter, and async passthrough.engine_pipeline_test.go:TestPipeline_ConfigurePipelines_InlineEventBusTriggerverifying theeventbustrigger type is correctly matched and configured during pipeline setup.Usage
The wrapper translates this into the
EventBusTriggersubscriptions format, routing messages topipeline:process-follow-up-eventwith actionexecute.Original prompt
📱 Kick off Copilot coding agent tasks wherever you are with GitHub Mobile, available on iOS and Android.