Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions internal/broker_module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"sync"
"testing"
"time"

_ "github.com/warpstreamlabs/bento/v4/public/components/pure"
)

func TestNewBrokerModule(t *testing.T) {
Expand Down Expand Up @@ -129,6 +127,8 @@ func TestBrokerModule_StartStop(t *testing.T) {
}

func TestBrokerModule_EnsureStream(t *testing.T) {
// Use generate transport so ensureStream can build a valid Bento stream.
// count=0 means infinite generation; the stream will be stopped at the end.
m, _ := newBrokerModule("test-broker", map[string]any{
"transport": "generate",
"transport_config": map[string]any{
Expand Down Expand Up @@ -207,6 +207,7 @@ func TestBrokerModule_EnsureStream(t *testing.T) {
}

func TestBrokerModule_ConcurrentEnsureStream(t *testing.T) {
// Use generate transport so ensureStream can build a valid Bento stream.
m, _ := newBrokerModule("test-broker", map[string]any{
"transport": "generate",
"transport_config": map[string]any{
Expand Down Expand Up @@ -279,6 +280,7 @@ func TestBrokerModule_ConcurrentEnsureStream(t *testing.T) {
}

func TestBrokerModule_EnsureStreamWithoutPublisher(t *testing.T) {
// Use generate transport so ensureStream can build a valid Bento stream.
m, _ := newBrokerModule("test-broker", map[string]any{
"transport": "generate",
"transport_config": map[string]any{
Expand Down
19 changes: 12 additions & 7 deletions internal/input_module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"sync"
"testing"
"time"

_ "github.com/warpstreamlabs/bento/v4/public/components/pure"
)

// mockMessagePublisher captures published messages for testing.
Expand Down Expand Up @@ -191,7 +189,7 @@ func TestInputModule_PublishMessages(t *testing.T) {
"target_topic": "test-topic",
"input": map[string]any{
"generate": map[string]any{
"mapping": `root = {"id": count("c"), "msg": "hello"}`,
"mapping": `root = {"id": count("input_id"), "msg": "hello"}`,
"count": 3,
"interval": "10ms",
},
Expand All @@ -209,14 +207,21 @@ func TestInputModule_PublishMessages(t *testing.T) {
t.Fatalf("Start() error = %v", err)
}

// Wait for messages to be published
time.Sleep(500 * time.Millisecond)
// Poll until 3 messages are published or deadline is reached.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if len(pub.GetMessages()) >= 3 {
break
}
time.Sleep(10 * time.Millisecond)
}

stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

// Stop may report "stream has not been run yet" if generate already finished
_ = m.Stop(stopCtx)
if err := m.Stop(stopCtx); err != nil {
t.Errorf("Stop() error = %v", err)
}

messages := pub.GetMessages()
if len(messages) != 3 {
Expand Down
2 changes: 0 additions & 2 deletions internal/output_module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"sync"
"testing"
"time"

_ "github.com/warpstreamlabs/bento/v4/public/components/pure"
)

// mockMessageSubscriber captures subscriptions and allows simulating message delivery.
Expand Down
5 changes: 0 additions & 5 deletions internal/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,35 +75,30 @@ func TestBentoPlugin_CreateModule(t *testing.T) {
typeName string
name string
config map[string]any
wantType string
wantErr bool
}{
{
typeName: "bento.stream",
name: "test-stream",
config: map[string]any{},
wantType: "*internal.streamModule",
wantErr: false,
},
{
typeName: "bento.input",
name: "test-input",
config: map[string]any{},
wantType: "*internal.inputModule",
wantErr: false,
},
{
typeName: "bento.output",
name: "test-output",
config: map[string]any{},
wantType: "*internal.outputModule",
wantErr: false,
},
{
typeName: "bento.broker",
name: "test-broker",
config: map[string]any{},
wantType: "*internal.brokerModule",
wantErr: false,
},
{
Expand Down
10 changes: 4 additions & 6 deletions internal/processor_step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package internal
import (
"context"
"encoding/json"
"errors"
"testing"
"time"

_ "github.com/warpstreamlabs/bento/v4/public/components/pure"
)

func TestNewProcessorStep(t *testing.T) {
Expand Down Expand Up @@ -181,7 +180,7 @@ func TestProcessorStep_ExecuteWithInvalidBloblang(t *testing.T) {
return
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

result, err := s.Execute(ctx, map[string]any{"data": "test"}, nil, map[string]any{}, nil)
Expand Down Expand Up @@ -256,9 +255,8 @@ func TestProcessorStep_ExecuteWithContextCancel(t *testing.T) {

if err == nil {
t.Error("Execute() expected error with cancelled context")
}
if err != context.Canceled {
t.Logf("got error: %v", err)
} else if !errors.Is(err, context.Canceled) {
t.Errorf("Execute() expected context.Canceled, got: %v", err)
}
}

Expand Down
2 changes: 0 additions & 2 deletions internal/stream_module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"errors"
"testing"
"time"

_ "github.com/warpstreamlabs/bento/v4/public/components/pure"
)

func TestNewStreamModule(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions internal/testmain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package internal

import (
// Register pure Bento components (e.g. generate input, bloblang processor)
// so that tests can use them without importing the full component suite.
_ "github.com/warpstreamlabs/bento/v4/public/components/pure"
)
75 changes: 58 additions & 17 deletions internal/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"sync"
"testing"
"time"

_ "github.com/warpstreamlabs/bento/v4/public/components/pure"
)

// mockTriggerCallback captures trigger invocations for testing.
Expand Down Expand Up @@ -172,7 +170,7 @@ func TestBentoTrigger_StartStop(t *testing.T) {
map[string]any{
"input": map[string]any{
"generate": map[string]any{
"mapping": `root = {"id": count("c")}`,
"mapping": `root = {"id": count("trigger_id")}`,
"count": 2,
"interval": "50ms",
},
Expand All @@ -191,8 +189,14 @@ func TestBentoTrigger_StartStop(t *testing.T) {
t.Fatalf("Start() error = %v", err)
}

// Wait for messages to be processed
time.Sleep(300 * time.Millisecond)
// Poll until 2 callbacks are observed or deadline is reached.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if len(cb.GetCalls()) >= 2 {
break
}
time.Sleep(10 * time.Millisecond)
}

stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand Down Expand Up @@ -255,8 +259,17 @@ func TestBentoTrigger_MultipleSubscriptions(t *testing.T) {
t.Fatalf("Start() error = %v", err)
}

// Wait for all messages
time.Sleep(200 * time.Millisecond)
// Wait until both subscriptions have invoked the callback, or time out.
deadline := time.Now().Add(2 * time.Second)
for {
if len(cb.GetCalls()) >= 2 {
break
}
if time.Now().After(deadline) {
t.Fatalf("timed out waiting for 2 callback invocations, got %d", len(cb.GetCalls()))
}
time.Sleep(10 * time.Millisecond)
}

stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand Down Expand Up @@ -317,19 +330,32 @@ func TestBentoTrigger_NoSubscriptions(t *testing.T) {
}

func TestBentoTrigger_CallbackError(t *testing.T) {
// Callback that returns error
var callbackErr error
// Callback that errors on the first invocation and succeeds on subsequent
// ones. This exercises the error path while still allowing the stream to
// terminate cleanly (Bento retries the NACKed message once, then continues).
var (
cbMu sync.Mutex
cbCount int
)
errorCb := func(action string, data map[string]any) error {
return callbackErr
cbMu.Lock()
cbCount++
count := cbCount
cbMu.Unlock()
if count == 1 {
return errors.New("first callback error")
}
return nil
}

trigger, err := newBentoTrigger(map[string]any{
"subscriptions": []any{
map[string]any{
"input": map[string]any{
"generate": map[string]any{
"mapping": `root = {"test": "data"}`,
"count": 1,
"mapping": `root = {"test": "data"}`,
"count": 3,
"interval": "10ms",
},
},
"workflow": "test",
Expand All @@ -340,22 +366,37 @@ func TestBentoTrigger_CallbackError(t *testing.T) {
t.Fatalf("newBentoTrigger() error = %v", err)
}

callbackErr = nil // First call succeeds

ctx := context.Background()
if err := trigger.Start(ctx); err != nil {
t.Fatalf("Start() error = %v", err)
}

time.Sleep(100 * time.Millisecond)
// Poll until at least 2 callbacks are observed (1 error + 1 success).
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
cbMu.Lock()
count := cbCount
cbMu.Unlock()
if count >= 2 {
break
}
time.Sleep(10 * time.Millisecond)
}

stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Should stop cleanly even if callback had errors
// Should stop cleanly even when a callback returned an error.
if err := trigger.Stop(stopCtx); err != nil {
t.Errorf("Stop() error = %v", err)
}

cbMu.Lock()
count := cbCount
cbMu.Unlock()
if count < 2 {
t.Errorf("expected callback to be invoked at least twice (1 error + 1 success), got %d", count)
}
}

func TestBentoTrigger_StopWithoutStart(t *testing.T) {
Expand Down