Skip to content

Enhance step.event_publish with CloudEvents envelope and EventPublisher interface#245

Merged
intel352 merged 5 commits intomainfrom
copilot/add-step-event-publish-type
Mar 3, 2026
Merged

Enhance step.event_publish with CloudEvents envelope and EventPublisher interface#245
intel352 merged 5 commits intomainfrom
copilot/add-step-event-publish-type

Conversation

Copy link
Contributor

Copilot AI commented Mar 3, 2026

Pipelines processing inbound webhooks need to publish structured events to downstream consumers via message buses. The existing step.event_publish lacked CloudEvents envelope format, had no extensibility point for external plugins (e.g., Bento), and used config field names that didn't align with stream-oriented systems.

CloudEvents 1.0 envelope

A full CloudEvents 1.0-compliant envelope is emitted only when both event_type and source are configured (both are required attributes per the CloudEvents spec). The envelope includes specversion, type, source, id (UUID), time (RFC3339), and data:

- type: step.event_publish
  config:
    provider: kinesis
    stream: "messaging.texter-messages"
    event_type: "messaging.texter-message.received"
    source: "/chimera/messaging"
    data:
      messageId: "{{.steps.insert_message.id}}"
      texterId: "{{.steps.lookup_texter.row.id}}"

When only headers are provided (without event_type/source), the payload is wrapped as {data, headers} without adding CloudEvents-required attributes. When neither is configured, the payload is published as-is.

The built-in EventBus fallback path (eventbus.provider) passes the raw resolved payload directly to EventBusModule.Publish(), which already builds a CloudEvents 1.0 envelope internally using the CloudEvents SDK (CrisisTextLine/modular eventbus v2.1.0+).

EventPublisher interface

New EventPublisher interface in module/event_publisher.go:

type EventPublisher interface {
    PublishEvent(ctx context.Context, topic string, event map[string]any) error
}

The step resolves the named service as EventPublisher first, falling back to MessageBroker, then to the built-in EventBus. This allows external plugins (workflow-plugin-bento, custom adapters) to register their own publishing backends without implementing the full MessageBroker interface.

Config aliases

  • streamtopic (Kinesis/stream-oriented naming)
  • datapayload (matches CloudEvents field name)
  • providerbroker (more natural for cloud provider backends)

All aliases are backward compatible — existing configs using topic/payload/broker continue to work unchanged. The topic schema field no longer carries Required: true so configs using only the stream alias pass schema validation.

Files changed

  • module/event_publisher.goEventPublisher interface definition
  • module/pipeline_step_event_publish.go — CloudEvents envelope (strict: requires both event_type and source), source field with error-propagating template resolution, config aliases, EventPublisher lookup, raw payload forwarding to EventBus
  • module/pipeline_step_event_publish_test.go — 7 new tests (14 total), all existing tests pass
  • schema/module_schema.go — Updated with new config fields; topic no longer marked required
  • cmd/wfctl/type_registry.go, mcp/tools.go (payload/data type narrowed to map), schema/snippets.go — Updated with new config fields
Original prompt

This section details on the original issue you should resolve

<issue_title>New step type: step.event_publish for CloudEvents / message bus integration</issue_title>
<issue_description>## Use Case

Pipelines that process inbound webhooks (e.g., Twilio messaging) need to publish events to a message bus (Kinesis, SQS, Kafka, etc.) for downstream consumers. Currently there is no built-in step type for event publishing.

Proposed Solution

A new step.event_publish step type that publishes structured events:

- name: publish_event
  type: step.event_publish
  config:
    provider: kinesis    # or: sqs, kafka, nats, memory
    stream: "messaging.texter-messages"
    event_type: "messaging.texter-message.received"
    source: "/chimera/messaging"
    data:
      messageId: "{{.steps.insert_message.id}}"
      texterId: "{{.steps.lookup_texter.row.id}}"
      affiliateId: "{{.steps.lookup_texter.row.affiliate_id}}"
      messageBody: "{{.steps.parse.body.Body}}"

Features:

  • CloudEvents envelope format (type, source, id, time, data)
  • Multiple provider backends (start with memory/local for testing, add cloud providers)
  • Template resolution for event data fields
  • Configurable serialization (JSON)

Context

Chimera's messaging module receives Twilio webhooks and needs to publish texter-message-received events for downstream conversation routing. On the main branch this uses a Go EventBus + Kinesis. The workflow engine branch currently skips event publishing since there's no built-in step for it.

🤖 Generated with Claude Code</issue_description>

<agent_instructions>In addition to this request, think about the in-memory events, kinesis events, kafka events, nats events that are supported in workflow engine via Modular, but also think about how we could use this with the Bento plugin (https://github.com/GoCodeAlone/workflow-plugin-bento), create an approach that can work with both eventing systems.</agent_instructions>

Comments on the Issue (you are @copilot in this section)


💬 We'd love your input! Share your thoughts on Copilot coding agent in our 2 minute survey.

…interface, and config aliases

- Add CloudEvents envelope fields (specversion, source, id, time) per CloudEvents 1.0 spec
- Add EventPublisher interface for generic event publishing (supports Bento plugin integration)
- Add config aliases: stream→topic, data→payload, provider→broker
- Add source config field with template resolution
- Try EventPublisher interface before falling back to MessageBroker
- Update schema, type registry, MCP tools, and snippets
- Add comprehensive tests for all new features

Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
Copilot AI changed the title [WIP] Add new step type: step.event_publish for event publishing Enhance step.event_publish with CloudEvents envelope and EventPublisher interface Mar 3, 2026
@intel352 intel352 marked this pull request as ready for review March 3, 2026 19:23
Copilot AI review requested due to automatic review settings March 3, 2026 19:23
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR enhances the existing step.event_publish pipeline step to better support structured event publishing by adding a CloudEvents-style envelope, introducing an EventPublisher abstraction for plugin-based backends, and adding backward-compatible config aliases to align with stream-oriented systems.

Changes:

  • Add EventPublisher interface and update step.event_publish to prefer it over MessageBroker (with fallback to EventBus).
  • Add CloudEvents envelope fields (specversion, id, time, type, source, data) when configured, plus template resolution for source.
  • Update schema/tooling metadata and expand unit tests to cover CloudEvents and alias behavior.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
schema/snippets.go Updates step snippet to the new preferred config keys (provider, stream, data, source).
schema/module_schema.go Extends step schema with alias fields and CloudEvents-related config fields.
module/pipeline_step_event_publish.go Implements CloudEvents envelope behavior, alias parsing, and EventPublisher publish path.
module/pipeline_step_event_publish_test.go Adds tests for CloudEvents envelope, aliases, EventPublisher, and source template resolution.
module/event_publisher.go Introduces the EventPublisher interface for extensible publishing backends.
mcp/tools.go Updates MCP step metadata for new config keys and descriptions.
cmd/wfctl/type_registry.go Updates CLI-known step config keys to include new aliases and fields.

Description: "Publishes a structured event in CloudEvents format to a messaging broker, EventPublisher, or event bus",
ConfigFields: []ConfigFieldDef{
{Key: "topic", Label: "Topic", Type: FieldTypeString, Required: true, Description: "Topic or channel to publish the event to", Placeholder: "user-events"},
{Key: "topic", Label: "Topic", Type: FieldTypeString, Required: true, Description: "Topic or channel to publish the event to (also accepts 'stream' alias)", Placeholder: "user-events"},
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

topic is marked as Required: true, but this step also supports stream as a full alias (and the snippets/docs now prefer stream). Schema-driven validation (schema/validate.go) will reject configs that provide only stream, even though the step factory accepts it. Consider removing the hard Required on topic and adding a schema/validation rule that requires one of topic or stream (or otherwise making the alias satisfy the required check).

Suggested change
{Key: "topic", Label: "Topic", Type: FieldTypeString, Required: true, Description: "Topic or channel to publish the event to (also accepts 'stream' alias)", Placeholder: "user-events"},
{Key: "topic", Label: "Topic", Type: FieldTypeString, Description: "Topic or channel to publish the event to (also accepts 'stream' alias)", Placeholder: "user-events"},

Copilot uses AI. Check for mistakes.
Comment on lines 146 to 168
// when event_type or source is configured. The envelope includes specversion,
// type, source, id, time, and data fields per the CloudEvents specification.
func (s *EventPublishStep) buildEventEnvelope(payload map[string]any, headers map[string]string, resolvedSource string) map[string]any {
if s.eventType == "" && resolvedSource == "" && len(headers) == 0 {
return payload
}
envelope := map[string]any{
"data": payload,
}
if s.eventType != "" || resolvedSource != "" {
envelope["specversion"] = "1.0"
envelope["id"] = uuid.New().String()
envelope["time"] = time.Now().UTC().Format(time.RFC3339)
}
if s.eventType != "" {
envelope["type"] = s.eventType
}
if resolvedSource != "" {
envelope["source"] = resolvedSource
}
if len(headers) > 0 {
envelope["headers"] = headers
}
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CloudEvents 1.0 requires both type and source (and specversion/id) for a spec-compliant event. As implemented, setting only event_type or only source will still add specversion/id/time but omit the other required attribute, producing an invalid CloudEvent while the docs/schema describe CloudEvents output. Consider validating that event_type and source are either both set (when emitting CloudEvents) or neither set, and return an error (or skip CloudEvents wrapping) when the config is incomplete.

Suggested change
// when event_type or source is configured. The envelope includes specversion,
// type, source, id, time, and data fields per the CloudEvents specification.
func (s *EventPublishStep) buildEventEnvelope(payload map[string]any, headers map[string]string, resolvedSource string) map[string]any {
if s.eventType == "" && resolvedSource == "" && len(headers) == 0 {
return payload
}
envelope := map[string]any{
"data": payload,
}
if s.eventType != "" || resolvedSource != "" {
envelope["specversion"] = "1.0"
envelope["id"] = uuid.New().String()
envelope["time"] = time.Now().UTC().Format(time.RFC3339)
}
if s.eventType != "" {
envelope["type"] = s.eventType
}
if resolvedSource != "" {
envelope["source"] = resolvedSource
}
if len(headers) > 0 {
envelope["headers"] = headers
}
// when both event_type and source are configured. The envelope includes specversion,
// type, source, id, time, and data fields per the CloudEvents specification.
func (s *EventPublishStep) buildEventEnvelope(payload map[string]any, headers map[string]string, resolvedSource string) map[string]any {
if s.eventType == "" && resolvedSource == "" && len(headers) == 0 {
return payload
}
hasType := s.eventType != ""
hasSource := resolvedSource != ""
envelope := map[string]any{
"data": payload,
}
// Only emit a CloudEvents envelope when both required attributes are present.
if hasType && hasSource {
envelope["specversion"] = "1.0"
envelope["id"] = uuid.New().String()
envelope["time"] = time.Now().UTC().Format(time.RFC3339)
envelope["type"] = s.eventType
envelope["source"] = resolvedSource
}
if len(headers) > 0 {
envelope["headers"] = headers
}

Copilot uses AI. Check for mistakes.
if s.source != "" {
resolvedSource, err = s.tmpl.Resolve(s.source, pc)
if err != nil {
resolvedSource = s.source
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Source template resolution errors are currently swallowed (falling back to the raw template string), while topic/payload resolution failures return an error. This can lead to publishing malformed source values (including unreplaced {{ }}) without surfacing the underlying template bug. Consider returning a wrapped error on source resolution failure, consistent with topic/payload handling.

Suggested change
resolvedSource = s.source
return nil, fmt.Errorf("event_publish step %q: failed to resolve source: %w", s.name, err)

Copilot uses AI. Check for mistakes.
Comment on lines +145 to +147
// buildEventEnvelope wraps the payload in a CloudEvents-compatible envelope
// when event_type or source is configured. The envelope includes specversion,
// type, source, id, time, and data fields per the CloudEvents specification.
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The buildEventEnvelope docstring says the CloudEvents envelope is built “when event_type or source is configured”, but the function also wraps when only headers are present (returning {data,headers} without CloudEvents required fields). Either update the comment to reflect the actual behavior, or adjust the behavior so headers-only publishes don’t claim CloudEvents compatibility.

Suggested change
// buildEventEnvelope wraps the payload in a CloudEvents-compatible envelope
// when event_type or source is configured. The envelope includes specversion,
// type, source, id, time, and data fields per the CloudEvents specification.
// buildEventEnvelope wraps the payload in an envelope for publishing.
// When event_type or source is configured, a CloudEvents-compatible envelope is
// created that includes specversion, type, source, id, time, and data fields per
// the CloudEvents specification. When only headers are provided, the payload is
// wrapped as {data, headers} without adding CloudEvents-required attributes.

Copilot uses AI. Check for mistakes.
mcp/tools.go Outdated
Comment on lines +528 to +529
{Key: "payload", Type: "string|map", Description: "Event payload (template expressions supported)"},
{Key: "data", Type: "string|map", Description: "Alias for payload — event data fields"},
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MCP step config metadata says payload/data are string|map, but step.event_publish currently only reads payload/data when they are map[string]any (and schema marks them as JSON objects). If string payloads aren’t actually supported, these Type hints should be narrowed to map/object to avoid misleading tool/help output (or add real string payload support in the step).

Suggested change
{Key: "payload", Type: "string|map", Description: "Event payload (template expressions supported)"},
{Key: "data", Type: "string|map", Description: "Alias for payload — event data fields"},
{Key: "payload", Type: "map", Description: "Event payload (template expressions supported)"},
{Key: "data", Type: "map", Description: "Alias for payload — event data fields"},

Copilot uses AI. Check for mistakes.
@intel352
Copy link
Contributor

intel352 commented Mar 3, 2026

@copilot apply changes based on the comments in this thread

One note, CrisisTextLine/modular has an eventbus module that already supports CloudEvents directly, eventbus v2.1.0 has that functionality. It uses the CloudEvents SDK. It may make sense to use this existing functionality if you're not already doing so.

…propagation, schema alias fix, MCP type fix

- Require BOTH event_type AND source for CloudEvents envelope (specversion/id/time/type/source);
  headers-only envelope no longer claims CloudEvents compatibility
- Propagate source template resolution errors instead of silently using raw string
- Remove Required:true from topic schema field so stream alias satisfies validation
- Fix mcp/tools.go payload/data type from string|map to map
- Pass raw payload to EventBus (it builds its own CloudEvents envelope via SDK)
- Update test for event_type-only behavior; add source template error test

Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
Copy link
Contributor Author

Copilot AI commented Mar 3, 2026

@copilot apply changes based on the comments in this thread

One note, CrisisTextLine/modular has a...

Applied all review feedback in b35e980:

  1. CloudEvents strict validation: buildEventEnvelope now only adds specversion, id, time, type, source when both event_type and source are configured. Headers-only publishes produce {data, headers} without claiming CloudEvents compliance.

  2. Source template errors propagate: Source resolution failures now return a wrapped error (failed to resolve source) instead of silently falling back to the raw template string.

  3. Schema topic required fix: Removed Required: true from topic so configs using only stream pass schema validation.

  4. MCP types narrowed: payload/data type changed from string|map to map.

Regarding the eventbus CloudEvents SDK — since EventBusModule.Publish() already builds a proper CloudEvents 1.0 envelope internally (using the SDK), the EventBus path now receives the raw resolved payload directly instead of the pre-built envelope, avoiding double-wrapping.

@intel352 intel352 merged commit 68c8c53 into main Mar 3, 2026
14 checks passed
@intel352 intel352 deleted the copilot/add-step-event-publish-type branch March 3, 2026 21:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

New step type: step.event_publish for CloudEvents / message bus integration

3 participants