Conversation
…interface, and config aliases - Add CloudEvents envelope fields (specversion, source, id, time) per CloudEvents 1.0 spec - Add EventPublisher interface for generic event publishing (supports Bento plugin integration) - Add config aliases: stream→topic, data→payload, provider→broker - Add source config field with template resolution - Try EventPublisher interface before falling back to MessageBroker - Update schema, type registry, MCP tools, and snippets - Add comprehensive tests for all new features Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This PR enhances the existing step.event_publish pipeline step to better support structured event publishing by adding a CloudEvents-style envelope, introducing an EventPublisher abstraction for plugin-based backends, and adding backward-compatible config aliases to align with stream-oriented systems.
Changes:
- Add
EventPublisherinterface and updatestep.event_publishto prefer it overMessageBroker(with fallback toEventBus). - Add CloudEvents envelope fields (
specversion,id,time,type,source,data) when configured, plus template resolution forsource. - Update schema/tooling metadata and expand unit tests to cover CloudEvents and alias behavior.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| schema/snippets.go | Updates step snippet to the new preferred config keys (provider, stream, data, source). |
| schema/module_schema.go | Extends step schema with alias fields and CloudEvents-related config fields. |
| module/pipeline_step_event_publish.go | Implements CloudEvents envelope behavior, alias parsing, and EventPublisher publish path. |
| module/pipeline_step_event_publish_test.go | Adds tests for CloudEvents envelope, aliases, EventPublisher, and source template resolution. |
| module/event_publisher.go | Introduces the EventPublisher interface for extensible publishing backends. |
| mcp/tools.go | Updates MCP step metadata for new config keys and descriptions. |
| cmd/wfctl/type_registry.go | Updates CLI-known step config keys to include new aliases and fields. |
schema/module_schema.go
Outdated
| Description: "Publishes a structured event in CloudEvents format to a messaging broker, EventPublisher, or event bus", | ||
| ConfigFields: []ConfigFieldDef{ | ||
| {Key: "topic", Label: "Topic", Type: FieldTypeString, Required: true, Description: "Topic or channel to publish the event to", Placeholder: "user-events"}, | ||
| {Key: "topic", Label: "Topic", Type: FieldTypeString, Required: true, Description: "Topic or channel to publish the event to (also accepts 'stream' alias)", Placeholder: "user-events"}, |
There was a problem hiding this comment.
topic is marked as Required: true, but this step also supports stream as a full alias (and the snippets/docs now prefer stream). Schema-driven validation (schema/validate.go) will reject configs that provide only stream, even though the step factory accepts it. Consider removing the hard Required on topic and adding a schema/validation rule that requires one of topic or stream (or otherwise making the alias satisfy the required check).
| {Key: "topic", Label: "Topic", Type: FieldTypeString, Required: true, Description: "Topic or channel to publish the event to (also accepts 'stream' alias)", Placeholder: "user-events"}, | |
| {Key: "topic", Label: "Topic", Type: FieldTypeString, Description: "Topic or channel to publish the event to (also accepts 'stream' alias)", Placeholder: "user-events"}, |
| // when event_type or source is configured. The envelope includes specversion, | ||
| // type, source, id, time, and data fields per the CloudEvents specification. | ||
| func (s *EventPublishStep) buildEventEnvelope(payload map[string]any, headers map[string]string, resolvedSource string) map[string]any { | ||
| if s.eventType == "" && resolvedSource == "" && len(headers) == 0 { | ||
| return payload | ||
| } | ||
| envelope := map[string]any{ | ||
| "data": payload, | ||
| } | ||
| if s.eventType != "" || resolvedSource != "" { | ||
| envelope["specversion"] = "1.0" | ||
| envelope["id"] = uuid.New().String() | ||
| envelope["time"] = time.Now().UTC().Format(time.RFC3339) | ||
| } | ||
| if s.eventType != "" { | ||
| envelope["type"] = s.eventType | ||
| } | ||
| if resolvedSource != "" { | ||
| envelope["source"] = resolvedSource | ||
| } | ||
| if len(headers) > 0 { | ||
| envelope["headers"] = headers | ||
| } |
There was a problem hiding this comment.
CloudEvents 1.0 requires both type and source (and specversion/id) for a spec-compliant event. As implemented, setting only event_type or only source will still add specversion/id/time but omit the other required attribute, producing an invalid CloudEvent while the docs/schema describe CloudEvents output. Consider validating that event_type and source are either both set (when emitting CloudEvents) or neither set, and return an error (or skip CloudEvents wrapping) when the config is incomplete.
| // when event_type or source is configured. The envelope includes specversion, | |
| // type, source, id, time, and data fields per the CloudEvents specification. | |
| func (s *EventPublishStep) buildEventEnvelope(payload map[string]any, headers map[string]string, resolvedSource string) map[string]any { | |
| if s.eventType == "" && resolvedSource == "" && len(headers) == 0 { | |
| return payload | |
| } | |
| envelope := map[string]any{ | |
| "data": payload, | |
| } | |
| if s.eventType != "" || resolvedSource != "" { | |
| envelope["specversion"] = "1.0" | |
| envelope["id"] = uuid.New().String() | |
| envelope["time"] = time.Now().UTC().Format(time.RFC3339) | |
| } | |
| if s.eventType != "" { | |
| envelope["type"] = s.eventType | |
| } | |
| if resolvedSource != "" { | |
| envelope["source"] = resolvedSource | |
| } | |
| if len(headers) > 0 { | |
| envelope["headers"] = headers | |
| } | |
| // when both event_type and source are configured. The envelope includes specversion, | |
| // type, source, id, time, and data fields per the CloudEvents specification. | |
| func (s *EventPublishStep) buildEventEnvelope(payload map[string]any, headers map[string]string, resolvedSource string) map[string]any { | |
| if s.eventType == "" && resolvedSource == "" && len(headers) == 0 { | |
| return payload | |
| } | |
| hasType := s.eventType != "" | |
| hasSource := resolvedSource != "" | |
| envelope := map[string]any{ | |
| "data": payload, | |
| } | |
| // Only emit a CloudEvents envelope when both required attributes are present. | |
| if hasType && hasSource { | |
| envelope["specversion"] = "1.0" | |
| envelope["id"] = uuid.New().String() | |
| envelope["time"] = time.Now().UTC().Format(time.RFC3339) | |
| envelope["type"] = s.eventType | |
| envelope["source"] = resolvedSource | |
| } | |
| if len(headers) > 0 { | |
| envelope["headers"] = headers | |
| } |
| if s.source != "" { | ||
| resolvedSource, err = s.tmpl.Resolve(s.source, pc) | ||
| if err != nil { | ||
| resolvedSource = s.source |
There was a problem hiding this comment.
Source template resolution errors are currently swallowed (falling back to the raw template string), while topic/payload resolution failures return an error. This can lead to publishing malformed source values (including unreplaced {{ }}) without surfacing the underlying template bug. Consider returning a wrapped error on source resolution failure, consistent with topic/payload handling.
| resolvedSource = s.source | |
| return nil, fmt.Errorf("event_publish step %q: failed to resolve source: %w", s.name, err) |
| // buildEventEnvelope wraps the payload in a CloudEvents-compatible envelope | ||
| // when event_type or source is configured. The envelope includes specversion, | ||
| // type, source, id, time, and data fields per the CloudEvents specification. |
There was a problem hiding this comment.
The buildEventEnvelope docstring says the CloudEvents envelope is built “when event_type or source is configured”, but the function also wraps when only headers are present (returning {data,headers} without CloudEvents required fields). Either update the comment to reflect the actual behavior, or adjust the behavior so headers-only publishes don’t claim CloudEvents compatibility.
| // buildEventEnvelope wraps the payload in a CloudEvents-compatible envelope | |
| // when event_type or source is configured. The envelope includes specversion, | |
| // type, source, id, time, and data fields per the CloudEvents specification. | |
| // buildEventEnvelope wraps the payload in an envelope for publishing. | |
| // When event_type or source is configured, a CloudEvents-compatible envelope is | |
| // created that includes specversion, type, source, id, time, and data fields per | |
| // the CloudEvents specification. When only headers are provided, the payload is | |
| // wrapped as {data, headers} without adding CloudEvents-required attributes. |
mcp/tools.go
Outdated
| {Key: "payload", Type: "string|map", Description: "Event payload (template expressions supported)"}, | ||
| {Key: "data", Type: "string|map", Description: "Alias for payload — event data fields"}, |
There was a problem hiding this comment.
The MCP step config metadata says payload/data are string|map, but step.event_publish currently only reads payload/data when they are map[string]any (and schema marks them as JSON objects). If string payloads aren’t actually supported, these Type hints should be narrowed to map/object to avoid misleading tool/help output (or add real string payload support in the step).
| {Key: "payload", Type: "string|map", Description: "Event payload (template expressions supported)"}, | |
| {Key: "data", Type: "string|map", Description: "Alias for payload — event data fields"}, | |
| {Key: "payload", Type: "map", Description: "Event payload (template expressions supported)"}, | |
| {Key: "data", Type: "map", Description: "Alias for payload — event data fields"}, |
|
@copilot apply changes based on the comments in this thread One note, CrisisTextLine/modular has an eventbus module that already supports CloudEvents directly, eventbus v2.1.0 has that functionality. It uses the CloudEvents SDK. It may make sense to use this existing functionality if you're not already doing so. |
…propagation, schema alias fix, MCP type fix - Require BOTH event_type AND source for CloudEvents envelope (specversion/id/time/type/source); headers-only envelope no longer claims CloudEvents compatibility - Propagate source template resolution errors instead of silently using raw string - Remove Required:true from topic schema field so stream alias satisfies validation - Fix mcp/tools.go payload/data type from string|map to map - Pass raw payload to EventBus (it builds its own CloudEvents envelope via SDK) - Update test for event_type-only behavior; add source template error test Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
Applied all review feedback in b35e980:
Regarding the eventbus CloudEvents SDK — since |
Pipelines processing inbound webhooks need to publish structured events to downstream consumers via message buses. The existing
step.event_publishlacked CloudEvents envelope format, had no extensibility point for external plugins (e.g., Bento), and used config field names that didn't align with stream-oriented systems.CloudEvents 1.0 envelope
A full CloudEvents 1.0-compliant envelope is emitted only when both
event_typeandsourceare configured (both are required attributes per the CloudEvents spec). The envelope includesspecversion,type,source,id(UUID),time(RFC3339), anddata:When only
headersare provided (withoutevent_type/source), the payload is wrapped as{data, headers}without adding CloudEvents-required attributes. When neither is configured, the payload is published as-is.The built-in EventBus fallback path (
eventbus.provider) passes the raw resolved payload directly toEventBusModule.Publish(), which already builds a CloudEvents 1.0 envelope internally using the CloudEvents SDK (CrisisTextLine/modular eventbus v2.1.0+).EventPublisher interface
New
EventPublisherinterface inmodule/event_publisher.go:The step resolves the named service as
EventPublisherfirst, falling back toMessageBroker, then to the built-in EventBus. This allows external plugins (workflow-plugin-bento, custom adapters) to register their own publishing backends without implementing the fullMessageBrokerinterface.Config aliases
stream→topic(Kinesis/stream-oriented naming)data→payload(matches CloudEvents field name)provider→broker(more natural for cloud provider backends)All aliases are backward compatible — existing configs using
topic/payload/brokercontinue to work unchanged. Thetopicschema field no longer carriesRequired: trueso configs using only thestreamalias pass schema validation.Files changed
module/event_publisher.go—EventPublisherinterface definitionmodule/pipeline_step_event_publish.go— CloudEvents envelope (strict: requires bothevent_typeandsource), source field with error-propagating template resolution, config aliases, EventPublisher lookup, raw payload forwarding to EventBusmodule/pipeline_step_event_publish_test.go— 7 new tests (14 total), all existing tests passschema/module_schema.go— Updated with new config fields;topicno longer marked requiredcmd/wfctl/type_registry.go,mcp/tools.go(payload/data type narrowed tomap),schema/snippets.go— Updated with new config fieldsOriginal prompt
💬 We'd love your input! Share your thoughts on Copilot coding agent in our 2 minute survey.