Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion modules/eventbus/durable_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (d *DurableMemoryEventBus) Start(ctx context.Context) error {
if d.isStarted {
return nil
}
d.ctx, d.cancel = context.WithCancel(ctx)
d.ctx, d.cancel = context.WithCancel(ctx) //nolint:gosec // G118: cancel is stored on struct and called in Stop()
d.isStarted = true
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion modules/eventbus/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (k *KafkaEventBus) Start(ctx context.Context) error {
return nil
}

k.ctx, k.cancel = context.WithCancel(ctx)
k.ctx, k.cancel = context.WithCancel(ctx) //nolint:gosec // G118: cancel is stored on struct and called in Stop()
k.isStarted = true
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion modules/eventbus/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (k *KinesisEventBus) Start(ctx context.Context) error {
if k.config.PollInterval <= 0 {
k.config.PollInterval = DefaultKinesisPollInterval
}
k.ctx, k.cancel = context.WithCancel(ctx)
k.ctx, k.cancel = context.WithCancel(ctx) //nolint:gosec // G118: cancel is stored on struct and called in Stop()
k.isStarted = true
return nil
}
Expand Down
25 changes: 21 additions & 4 deletions modules/eventbus/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,18 @@ func (m *EventBusModule) Constructor() modular.ModuleConstructor {
}
}

// PublishCloudEvent publishes a pre-built CloudEvents event to the event bus.
// Unlike Publish, this method does not wrap the payload in a new CloudEvent —
// it routes the provided event directly. This allows callers to set custom
// CloudEvent extensions (e.g., encryption metadata) before publishing.
func (m *EventBusModule) PublishCloudEvent(ctx context.Context, event Event) error {
// Clone the event so the module owns an immutable snapshot for delivery.
// This prevents the caller from mutating the event (or its Extensions map /
// data slice) after PublishCloudEvent returns, which could otherwise cause
// data races with async subscribers that process the event concurrently.
return m.publishEvent(ctx, event.Clone())
}

// Publish publishes an event to the event bus.
// Creates an Event struct with the provided type and payload, then
// sends it through the event bus for processing by subscribers.
Expand All @@ -445,26 +457,31 @@ func (m *EventBusModule) Publish(ctx context.Context, topic string, payload inte
if err := event.SetData("application/json", payload); err != nil {
return fmt.Errorf("failed to set event data: %w", err)
}
return m.publishEvent(ctx, event)
}

// publishEvent routes a fully-constructed event through the engine router and
// emits the appropriate telemetry event on success or failure. Both Publish and
// PublishCloudEvent delegate here so the timing, error-formatting, and emitted
// fields stay in one place.
func (m *EventBusModule) publishEvent(ctx context.Context, event Event) error {
topic := event.Type()
startTime := time.Now()
err := m.router.Publish(ctx, event)
duration := time.Since(startTime)
if err != nil {
// Emit message failed event
go m.emitEvent(ctx, EventTypeMessageFailed, map[string]interface{}{
"topic": topic,
"error": err.Error(),
"duration_ms": duration.Milliseconds(),
})

return fmt.Errorf("publishing event to topic %s: %w", topic, err)
}

// Emit message published event
go m.emitEvent(ctx, EventTypeMessagePublished, map[string]interface{}{
"topic": topic,
"duration_ms": duration.Milliseconds(),
})

return nil
}

Expand Down
78 changes: 78 additions & 0 deletions modules/eventbus/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/CrisisTextLine/modular"
cevent "github.com/cloudevents/sdk-go/v2/event"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -374,3 +375,80 @@ func TestEventBusServiceProvider(t *testing.T) {
required := module.RequiresServices()
assert.Empty(t, required)
}

func TestPublishCloudEvent_RoutesPreBuiltCloudEvent(t *testing.T) {
// Setup: create and start the module (same pattern as TestEventBusOperations)
module := NewModule().(*EventBusModule)
app := newMockApp()
err := module.RegisterConfig(app)
require.NoError(t, err)
err = module.Init(app)
require.NoError(t, err)
ctx := context.Background()
err = module.Start(ctx)
require.NoError(t, err)
defer func() {
require.NoError(t, module.Stop(ctx))
}()

// Create a pre-built CloudEvent with custom extensions
event := cevent.New()
event.SetType("test.event")
event.SetSource("/test/source")
event.SetID("test-id-123")
event.SetTime(time.Now())
err = event.SetData("application/json", map[string]string{"key": "value"})
require.NoError(t, err)
event.SetExtension("encryption", "aes-256-gcm")
event.SetExtension("encryptedfields", `["key"]`)

// Subscribe to capture the event
eventReceived := make(chan Event, 1)
subscription, err := module.Subscribe(ctx, "test.event", func(ctx context.Context, e Event) error {
eventReceived <- e
return nil
})
require.NoError(t, err)
defer func() {
require.NoError(t, subscription.Cancel())
}()

// Act
err = module.PublishCloudEvent(ctx, event)
require.NoError(t, err)

// Assert
select {
case received := <-eventReceived:
assert.Equal(t, "test.event", received.Type())
assert.Equal(t, "/test/source", received.Source())
assert.Equal(t, "test-id-123", received.ID())
// Extensions should be preserved (not overwritten by module)
assert.Equal(t, "aes-256-gcm", received.Extensions()["encryption"])
assert.Equal(t, `["key"]`, received.Extensions()["encryptedfields"])
case <-time.After(5 * time.Second):
t.Fatal("Event not received within timeout")
}
}

func TestPublishCloudEvent_ReturnsErrorWhenBusStopped(t *testing.T) {
module := NewModule().(*EventBusModule)
app := newMockApp()
err := module.RegisterConfig(app)
require.NoError(t, err)
err = module.Init(app)
require.NoError(t, err)
ctx := context.Background()
err = module.Start(ctx)
require.NoError(t, err)
require.NoError(t, module.Stop(ctx))

event := cevent.New()
event.SetType("test.event")
event.SetSource("/test/source")
event.SetID("test-id-456")

err = module.PublishCloudEvent(ctx, event)
require.Error(t, err)
assert.ErrorContains(t, err, "test.event")
}
Loading