diff --git a/modules/eventbus/durable_memory.go b/modules/eventbus/durable_memory.go index 00cf7db9..b7890367 100644 --- a/modules/eventbus/durable_memory.go +++ b/modules/eventbus/durable_memory.go @@ -216,7 +216,7 @@ func (d *DurableMemoryEventBus) Start(ctx context.Context) error { if d.isStarted { return nil } - d.ctx, d.cancel = context.WithCancel(ctx) + d.ctx, d.cancel = context.WithCancel(ctx) //nolint:gosec // G118: cancel is stored on struct and called in Stop() d.isStarted = true return nil } diff --git a/modules/eventbus/kafka.go b/modules/eventbus/kafka.go index 0439fb94..1f11bef7 100644 --- a/modules/eventbus/kafka.go +++ b/modules/eventbus/kafka.go @@ -235,7 +235,7 @@ func (k *KafkaEventBus) Start(ctx context.Context) error { return nil } - k.ctx, k.cancel = context.WithCancel(ctx) + k.ctx, k.cancel = context.WithCancel(ctx) //nolint:gosec // G118: cancel is stored on struct and called in Stop() k.isStarted = true return nil } diff --git a/modules/eventbus/kinesis.go b/modules/eventbus/kinesis.go index b5b21e73..f3733b07 100644 --- a/modules/eventbus/kinesis.go +++ b/modules/eventbus/kinesis.go @@ -210,7 +210,7 @@ func (k *KinesisEventBus) Start(ctx context.Context) error { if k.config.PollInterval <= 0 { k.config.PollInterval = DefaultKinesisPollInterval } - k.ctx, k.cancel = context.WithCancel(ctx) + k.ctx, k.cancel = context.WithCancel(ctx) //nolint:gosec // G118: cancel is stored on struct and called in Stop() k.isStarted = true return nil } diff --git a/modules/eventbus/module.go b/modules/eventbus/module.go index 9edfbc44..f703e10d 100644 --- a/modules/eventbus/module.go +++ b/modules/eventbus/module.go @@ -423,6 +423,18 @@ func (m *EventBusModule) Constructor() modular.ModuleConstructor { } } +// PublishCloudEvent publishes a pre-built CloudEvents event to the event bus. +// Unlike Publish, this method does not wrap the payload in a new CloudEvent — +// it routes the provided event directly. This allows callers to set custom +// CloudEvent extensions (e.g., encryption metadata) before publishing. +func (m *EventBusModule) PublishCloudEvent(ctx context.Context, event Event) error { + // Clone the event so the module owns an immutable snapshot for delivery. + // This prevents the caller from mutating the event (or its Extensions map / + // data slice) after PublishCloudEvent returns, which could otherwise cause + // data races with async subscribers that process the event concurrently. + return m.publishEvent(ctx, event.Clone()) +} + // Publish publishes an event to the event bus. // Creates an Event struct with the provided type and payload, then // sends it through the event bus for processing by subscribers. @@ -445,26 +457,31 @@ func (m *EventBusModule) Publish(ctx context.Context, topic string, payload inte if err := event.SetData("application/json", payload); err != nil { return fmt.Errorf("failed to set event data: %w", err) } + return m.publishEvent(ctx, event) +} + +// publishEvent routes a fully-constructed event through the engine router and +// emits the appropriate telemetry event on success or failure. Both Publish and +// PublishCloudEvent delegate here so the timing, error-formatting, and emitted +// fields stay in one place. +func (m *EventBusModule) publishEvent(ctx context.Context, event Event) error { + topic := event.Type() startTime := time.Now() err := m.router.Publish(ctx, event) duration := time.Since(startTime) if err != nil { - // Emit message failed event go m.emitEvent(ctx, EventTypeMessageFailed, map[string]interface{}{ "topic": topic, "error": err.Error(), "duration_ms": duration.Milliseconds(), }) - return fmt.Errorf("publishing event to topic %s: %w", topic, err) } - // Emit message published event go m.emitEvent(ctx, EventTypeMessagePublished, map[string]interface{}{ "topic": topic, "duration_ms": duration.Milliseconds(), }) - return nil } diff --git a/modules/eventbus/module_test.go b/modules/eventbus/module_test.go index 07ed7a80..135d159f 100644 --- a/modules/eventbus/module_test.go +++ b/modules/eventbus/module_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/CrisisTextLine/modular" + cevent "github.com/cloudevents/sdk-go/v2/event" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -374,3 +375,80 @@ func TestEventBusServiceProvider(t *testing.T) { required := module.RequiresServices() assert.Empty(t, required) } + +func TestPublishCloudEvent_RoutesPreBuiltCloudEvent(t *testing.T) { + // Setup: create and start the module (same pattern as TestEventBusOperations) + module := NewModule().(*EventBusModule) + app := newMockApp() + err := module.RegisterConfig(app) + require.NoError(t, err) + err = module.Init(app) + require.NoError(t, err) + ctx := context.Background() + err = module.Start(ctx) + require.NoError(t, err) + defer func() { + require.NoError(t, module.Stop(ctx)) + }() + + // Create a pre-built CloudEvent with custom extensions + event := cevent.New() + event.SetType("test.event") + event.SetSource("/test/source") + event.SetID("test-id-123") + event.SetTime(time.Now()) + err = event.SetData("application/json", map[string]string{"key": "value"}) + require.NoError(t, err) + event.SetExtension("encryption", "aes-256-gcm") + event.SetExtension("encryptedfields", `["key"]`) + + // Subscribe to capture the event + eventReceived := make(chan Event, 1) + subscription, err := module.Subscribe(ctx, "test.event", func(ctx context.Context, e Event) error { + eventReceived <- e + return nil + }) + require.NoError(t, err) + defer func() { + require.NoError(t, subscription.Cancel()) + }() + + // Act + err = module.PublishCloudEvent(ctx, event) + require.NoError(t, err) + + // Assert + select { + case received := <-eventReceived: + assert.Equal(t, "test.event", received.Type()) + assert.Equal(t, "/test/source", received.Source()) + assert.Equal(t, "test-id-123", received.ID()) + // Extensions should be preserved (not overwritten by module) + assert.Equal(t, "aes-256-gcm", received.Extensions()["encryption"]) + assert.Equal(t, `["key"]`, received.Extensions()["encryptedfields"]) + case <-time.After(5 * time.Second): + t.Fatal("Event not received within timeout") + } +} + +func TestPublishCloudEvent_ReturnsErrorWhenBusStopped(t *testing.T) { + module := NewModule().(*EventBusModule) + app := newMockApp() + err := module.RegisterConfig(app) + require.NoError(t, err) + err = module.Init(app) + require.NoError(t, err) + ctx := context.Background() + err = module.Start(ctx) + require.NoError(t, err) + require.NoError(t, module.Stop(ctx)) + + event := cevent.New() + event.SetType("test.event") + event.SetSource("/test/source") + event.SetID("test-id-456") + + err = module.PublishCloudEvent(ctx, event) + require.Error(t, err) + assert.ErrorContains(t, err, "test.event") +}