From 45e6faccf5f169f420dc1fb0faff6d34e22031cc Mon Sep 17 00:00:00 2001 From: maxnawa Date: Mon, 9 Mar 2026 13:31:13 -0700 Subject: [PATCH 1/4] feat(eventbus): add PublishEvent method for pre-built CloudEvents --- modules/eventbus/module.go | 25 +++++++++++++++++ modules/eventbus/module_test.go | 50 +++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/modules/eventbus/module.go b/modules/eventbus/module.go index 9edfbc44..9680025d 100644 --- a/modules/eventbus/module.go +++ b/modules/eventbus/module.go @@ -423,6 +423,31 @@ func (m *EventBusModule) Constructor() modular.ModuleConstructor { } } +// PublishEvent publishes a pre-built CloudEvents event to the event bus. +// Unlike Publish, this method does not wrap the payload in a new CloudEvent — +// it routes the provided event directly. This allows callers to set custom +// CloudEvent extensions (e.g., encryption metadata) before publishing. +func (m *EventBusModule) PublishEvent(ctx context.Context, event Event) error { + startTime := time.Now() + topic := event.Type() + err := m.router.Publish(ctx, event) + duration := time.Since(startTime) + if err != nil { + go m.emitEvent(ctx, EventTypeMessageFailed, map[string]interface{}{ + "topic": topic, + "error": err.Error(), + "duration_ms": duration.Milliseconds(), + }) + return fmt.Errorf("publishing event to topic %s: %w", topic, err) + } + + go m.emitEvent(ctx, EventTypeMessagePublished, map[string]interface{}{ + "topic": topic, + "duration_ms": duration.Milliseconds(), + }) + return nil +} + // Publish publishes an event to the event bus. // Creates an Event struct with the provided type and payload, then // sends it through the event bus for processing by subscribers. diff --git a/modules/eventbus/module_test.go b/modules/eventbus/module_test.go index 07ed7a80..88e71734 100644 --- a/modules/eventbus/module_test.go +++ b/modules/eventbus/module_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/CrisisTextLine/modular" + cevent "github.com/cloudevents/sdk-go/v2/event" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -374,3 +375,52 @@ func TestEventBusServiceProvider(t *testing.T) { required := module.RequiresServices() assert.Empty(t, required) } +func TestPublishEvent_RoutesPreBuiltCloudEvent(t *testing.T) { + // Setup: create and start the module (same pattern as TestEventBusOperations) + module := NewModule().(*EventBusModule) + app := newMockApp() + err := module.RegisterConfig(app) + require.NoError(t, err) + err = module.Init(app) + require.NoError(t, err) + ctx := context.Background() + err = module.Start(ctx) + require.NoError(t, err) + defer module.Stop(ctx) + + // Create a pre-built CloudEvent with custom extensions + event := cevent.New() + event.SetType("test.event") + event.SetSource("/test/source") + event.SetID("test-id-123") + event.SetTime(time.Now()) + event.SetData("application/json", map[string]string{"key": "value"}) + event.SetExtension("encryption", "aes-256-gcm") + event.SetExtension("encryptedfields", `["key"]`) + + // Subscribe to capture the event + eventReceived := make(chan Event, 1) + subscription, err := module.Subscribe(ctx, "test.event", func(ctx context.Context, e Event) error { + eventReceived <- e + return nil + }) + require.NoError(t, err) + defer subscription.Cancel() + + // Act + err = module.PublishEvent(ctx, event) + require.NoError(t, err) + + // Assert + select { + case received := <-eventReceived: + assert.Equal(t, "test.event", received.Type()) + assert.Equal(t, "/test/source", received.Source()) + assert.Equal(t, "test-id-123", received.ID()) + // Extensions should be preserved (not overwritten by module) + assert.Equal(t, "aes-256-gcm", received.Extensions()["encryption"]) + assert.Equal(t, `["key"]`, received.Extensions()["encryptedfields"]) + case <-time.After(5 * time.Second): + t.Fatal("Event not received within timeout") + } +} From 7db8dde35216c4aac640f5dffc88cf88fd5125e3 Mon Sep 17 00:00:00 2001 From: maxnawa Date: Wed, 11 Mar 2026 07:30:15 -0700 Subject: [PATCH 2/4] Lint fix --- modules/eventbus/module_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/eventbus/module_test.go b/modules/eventbus/module_test.go index 88e71734..11ffa838 100644 --- a/modules/eventbus/module_test.go +++ b/modules/eventbus/module_test.go @@ -375,6 +375,7 @@ func TestEventBusServiceProvider(t *testing.T) { required := module.RequiresServices() assert.Empty(t, required) } + func TestPublishEvent_RoutesPreBuiltCloudEvent(t *testing.T) { // Setup: create and start the module (same pattern as TestEventBusOperations) module := NewModule().(*EventBusModule) From 3ba19bcf04e290a751c254b2dfa52e53cf8e2a87 Mon Sep 17 00:00:00 2001 From: maxnawa Date: Thu, 12 Mar 2026 09:16:42 -0700 Subject: [PATCH 3/4] fix: address PR review feedback on error handling and lint - Capture event.SetData error in test - Assert no error on deferred Stop and Cancel calls - Suppress gosec G118 false positives for context.WithCancel stored on struct --- modules/eventbus/durable_memory.go | 2 +- modules/eventbus/kafka.go | 2 +- modules/eventbus/kinesis.go | 2 +- modules/eventbus/module_test.go | 11 ++++++++--- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/modules/eventbus/durable_memory.go b/modules/eventbus/durable_memory.go index 00cf7db9..b7890367 100644 --- a/modules/eventbus/durable_memory.go +++ b/modules/eventbus/durable_memory.go @@ -216,7 +216,7 @@ func (d *DurableMemoryEventBus) Start(ctx context.Context) error { if d.isStarted { return nil } - d.ctx, d.cancel = context.WithCancel(ctx) + d.ctx, d.cancel = context.WithCancel(ctx) //nolint:gosec // G118: cancel is stored on struct and called in Stop() d.isStarted = true return nil } diff --git a/modules/eventbus/kafka.go b/modules/eventbus/kafka.go index 0439fb94..1f11bef7 100644 --- a/modules/eventbus/kafka.go +++ b/modules/eventbus/kafka.go @@ -235,7 +235,7 @@ func (k *KafkaEventBus) Start(ctx context.Context) error { return nil } - k.ctx, k.cancel = context.WithCancel(ctx) + k.ctx, k.cancel = context.WithCancel(ctx) //nolint:gosec // G118: cancel is stored on struct and called in Stop() k.isStarted = true return nil } diff --git a/modules/eventbus/kinesis.go b/modules/eventbus/kinesis.go index b5b21e73..f3733b07 100644 --- a/modules/eventbus/kinesis.go +++ b/modules/eventbus/kinesis.go @@ -210,7 +210,7 @@ func (k *KinesisEventBus) Start(ctx context.Context) error { if k.config.PollInterval <= 0 { k.config.PollInterval = DefaultKinesisPollInterval } - k.ctx, k.cancel = context.WithCancel(ctx) + k.ctx, k.cancel = context.WithCancel(ctx) //nolint:gosec // G118: cancel is stored on struct and called in Stop() k.isStarted = true return nil } diff --git a/modules/eventbus/module_test.go b/modules/eventbus/module_test.go index 11ffa838..b43f12a4 100644 --- a/modules/eventbus/module_test.go +++ b/modules/eventbus/module_test.go @@ -387,7 +387,9 @@ func TestPublishEvent_RoutesPreBuiltCloudEvent(t *testing.T) { ctx := context.Background() err = module.Start(ctx) require.NoError(t, err) - defer module.Stop(ctx) + defer func() { + require.NoError(t, module.Stop(ctx)) + }() // Create a pre-built CloudEvent with custom extensions event := cevent.New() @@ -395,7 +397,8 @@ func TestPublishEvent_RoutesPreBuiltCloudEvent(t *testing.T) { event.SetSource("/test/source") event.SetID("test-id-123") event.SetTime(time.Now()) - event.SetData("application/json", map[string]string{"key": "value"}) + err = event.SetData("application/json", map[string]string{"key": "value"}) + require.NoError(t, err) event.SetExtension("encryption", "aes-256-gcm") event.SetExtension("encryptedfields", `["key"]`) @@ -406,7 +409,9 @@ func TestPublishEvent_RoutesPreBuiltCloudEvent(t *testing.T) { return nil }) require.NoError(t, err) - defer subscription.Cancel() + defer func() { + require.NoError(t, subscription.Cancel()) + }() // Act err = module.PublishEvent(ctx, event) From 58b51858420050b49ea96baec56f7c4037ca8980 Mon Sep 17 00:00:00 2001 From: Tom Haskins-Vaughan Date: Mon, 16 Mar 2026 15:56:30 -0400 Subject: [PATCH 4/4] Fixing copilot suggestions --- modules/eventbus/module.go | 40 +++++++++++++-------------------- modules/eventbus/module_test.go | 26 +++++++++++++++++++-- 2 files changed, 40 insertions(+), 26 deletions(-) diff --git a/modules/eventbus/module.go b/modules/eventbus/module.go index 9680025d..f703e10d 100644 --- a/modules/eventbus/module.go +++ b/modules/eventbus/module.go @@ -423,29 +423,16 @@ func (m *EventBusModule) Constructor() modular.ModuleConstructor { } } -// PublishEvent publishes a pre-built CloudEvents event to the event bus. +// PublishCloudEvent publishes a pre-built CloudEvents event to the event bus. // Unlike Publish, this method does not wrap the payload in a new CloudEvent — // it routes the provided event directly. This allows callers to set custom // CloudEvent extensions (e.g., encryption metadata) before publishing. -func (m *EventBusModule) PublishEvent(ctx context.Context, event Event) error { - startTime := time.Now() - topic := event.Type() - err := m.router.Publish(ctx, event) - duration := time.Since(startTime) - if err != nil { - go m.emitEvent(ctx, EventTypeMessageFailed, map[string]interface{}{ - "topic": topic, - "error": err.Error(), - "duration_ms": duration.Milliseconds(), - }) - return fmt.Errorf("publishing event to topic %s: %w", topic, err) - } - - go m.emitEvent(ctx, EventTypeMessagePublished, map[string]interface{}{ - "topic": topic, - "duration_ms": duration.Milliseconds(), - }) - return nil +func (m *EventBusModule) PublishCloudEvent(ctx context.Context, event Event) error { + // Clone the event so the module owns an immutable snapshot for delivery. + // This prevents the caller from mutating the event (or its Extensions map / + // data slice) after PublishCloudEvent returns, which could otherwise cause + // data races with async subscribers that process the event concurrently. + return m.publishEvent(ctx, event.Clone()) } // Publish publishes an event to the event bus. @@ -470,26 +457,31 @@ func (m *EventBusModule) Publish(ctx context.Context, topic string, payload inte if err := event.SetData("application/json", payload); err != nil { return fmt.Errorf("failed to set event data: %w", err) } + return m.publishEvent(ctx, event) +} + +// publishEvent routes a fully-constructed event through the engine router and +// emits the appropriate telemetry event on success or failure. Both Publish and +// PublishCloudEvent delegate here so the timing, error-formatting, and emitted +// fields stay in one place. +func (m *EventBusModule) publishEvent(ctx context.Context, event Event) error { + topic := event.Type() startTime := time.Now() err := m.router.Publish(ctx, event) duration := time.Since(startTime) if err != nil { - // Emit message failed event go m.emitEvent(ctx, EventTypeMessageFailed, map[string]interface{}{ "topic": topic, "error": err.Error(), "duration_ms": duration.Milliseconds(), }) - return fmt.Errorf("publishing event to topic %s: %w", topic, err) } - // Emit message published event go m.emitEvent(ctx, EventTypeMessagePublished, map[string]interface{}{ "topic": topic, "duration_ms": duration.Milliseconds(), }) - return nil } diff --git a/modules/eventbus/module_test.go b/modules/eventbus/module_test.go index b43f12a4..135d159f 100644 --- a/modules/eventbus/module_test.go +++ b/modules/eventbus/module_test.go @@ -376,7 +376,7 @@ func TestEventBusServiceProvider(t *testing.T) { assert.Empty(t, required) } -func TestPublishEvent_RoutesPreBuiltCloudEvent(t *testing.T) { +func TestPublishCloudEvent_RoutesPreBuiltCloudEvent(t *testing.T) { // Setup: create and start the module (same pattern as TestEventBusOperations) module := NewModule().(*EventBusModule) app := newMockApp() @@ -414,7 +414,7 @@ func TestPublishEvent_RoutesPreBuiltCloudEvent(t *testing.T) { }() // Act - err = module.PublishEvent(ctx, event) + err = module.PublishCloudEvent(ctx, event) require.NoError(t, err) // Assert @@ -430,3 +430,25 @@ func TestPublishEvent_RoutesPreBuiltCloudEvent(t *testing.T) { t.Fatal("Event not received within timeout") } } + +func TestPublishCloudEvent_ReturnsErrorWhenBusStopped(t *testing.T) { + module := NewModule().(*EventBusModule) + app := newMockApp() + err := module.RegisterConfig(app) + require.NoError(t, err) + err = module.Init(app) + require.NoError(t, err) + ctx := context.Background() + err = module.Start(ctx) + require.NoError(t, err) + require.NoError(t, module.Stop(ctx)) + + event := cevent.New() + event.SetType("test.event") + event.SetSource("/test/source") + event.SetID("test-id-456") + + err = module.PublishCloudEvent(ctx, event) + require.Error(t, err) + assert.ErrorContains(t, err, "test.event") +}