diff --git a/internal/broker_module_test.go b/internal/broker_module_test.go index 16a32f7..c5088c6 100644 --- a/internal/broker_module_test.go +++ b/internal/broker_module_test.go @@ -5,8 +5,6 @@ import ( "sync" "testing" "time" - - _ "github.com/warpstreamlabs/bento/v4/public/components/pure" ) func TestNewBrokerModule(t *testing.T) { @@ -129,6 +127,8 @@ func TestBrokerModule_StartStop(t *testing.T) { } func TestBrokerModule_EnsureStream(t *testing.T) { + // Use generate transport so ensureStream can build a valid Bento stream. + // count=0 means infinite generation; the stream will be stopped at the end. m, _ := newBrokerModule("test-broker", map[string]any{ "transport": "generate", "transport_config": map[string]any{ @@ -207,6 +207,7 @@ func TestBrokerModule_EnsureStream(t *testing.T) { } func TestBrokerModule_ConcurrentEnsureStream(t *testing.T) { + // Use generate transport so ensureStream can build a valid Bento stream. m, _ := newBrokerModule("test-broker", map[string]any{ "transport": "generate", "transport_config": map[string]any{ @@ -279,6 +280,7 @@ func TestBrokerModule_ConcurrentEnsureStream(t *testing.T) { } func TestBrokerModule_EnsureStreamWithoutPublisher(t *testing.T) { + // Use generate transport so ensureStream can build a valid Bento stream. m, _ := newBrokerModule("test-broker", map[string]any{ "transport": "generate", "transport_config": map[string]any{ diff --git a/internal/input_module_test.go b/internal/input_module_test.go index 8ae8838..fdb7f46 100644 --- a/internal/input_module_test.go +++ b/internal/input_module_test.go @@ -5,8 +5,6 @@ import ( "sync" "testing" "time" - - _ "github.com/warpstreamlabs/bento/v4/public/components/pure" ) // mockMessagePublisher captures published messages for testing. @@ -191,7 +189,7 @@ func TestInputModule_PublishMessages(t *testing.T) { "target_topic": "test-topic", "input": map[string]any{ "generate": map[string]any{ - "mapping": `root = {"id": count("c"), "msg": "hello"}`, + "mapping": `root = {"id": count("input_id"), "msg": "hello"}`, "count": 3, "interval": "10ms", }, @@ -209,14 +207,21 @@ func TestInputModule_PublishMessages(t *testing.T) { t.Fatalf("Start() error = %v", err) } - // Wait for messages to be published - time.Sleep(500 * time.Millisecond) + // Poll until 3 messages are published or deadline is reached. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if len(pub.GetMessages()) >= 3 { + break + } + time.Sleep(10 * time.Millisecond) + } stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - // Stop may report "stream has not been run yet" if generate already finished - _ = m.Stop(stopCtx) + if err := m.Stop(stopCtx); err != nil { + t.Errorf("Stop() error = %v", err) + } messages := pub.GetMessages() if len(messages) != 3 { diff --git a/internal/output_module_test.go b/internal/output_module_test.go index 835eec1..add2a61 100644 --- a/internal/output_module_test.go +++ b/internal/output_module_test.go @@ -5,8 +5,6 @@ import ( "sync" "testing" "time" - - _ "github.com/warpstreamlabs/bento/v4/public/components/pure" ) // mockMessageSubscriber captures subscriptions and allows simulating message delivery. diff --git a/internal/plugin_test.go b/internal/plugin_test.go index a9dd71f..706be2b 100644 --- a/internal/plugin_test.go +++ b/internal/plugin_test.go @@ -75,35 +75,30 @@ func TestBentoPlugin_CreateModule(t *testing.T) { typeName string name string config map[string]any - wantType string wantErr bool }{ { typeName: "bento.stream", name: "test-stream", config: map[string]any{}, - wantType: "*internal.streamModule", wantErr: false, }, { typeName: "bento.input", name: "test-input", config: map[string]any{}, - wantType: "*internal.inputModule", wantErr: false, }, { typeName: "bento.output", name: "test-output", config: map[string]any{}, - wantType: "*internal.outputModule", wantErr: false, }, { typeName: "bento.broker", name: "test-broker", config: map[string]any{}, - wantType: "*internal.brokerModule", wantErr: false, }, { diff --git a/internal/processor_step_test.go b/internal/processor_step_test.go index 2ba3ea7..0879245 100644 --- a/internal/processor_step_test.go +++ b/internal/processor_step_test.go @@ -3,10 +3,9 @@ package internal import ( "context" "encoding/json" + "errors" "testing" "time" - - _ "github.com/warpstreamlabs/bento/v4/public/components/pure" ) func TestNewProcessorStep(t *testing.T) { @@ -181,7 +180,7 @@ func TestProcessorStep_ExecuteWithInvalidBloblang(t *testing.T) { return } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() result, err := s.Execute(ctx, map[string]any{"data": "test"}, nil, map[string]any{}, nil) @@ -256,9 +255,8 @@ func TestProcessorStep_ExecuteWithContextCancel(t *testing.T) { if err == nil { t.Error("Execute() expected error with cancelled context") - } - if err != context.Canceled { - t.Logf("got error: %v", err) + } else if !errors.Is(err, context.Canceled) { + t.Errorf("Execute() expected context.Canceled, got: %v", err) } } diff --git a/internal/stream_module_test.go b/internal/stream_module_test.go index 9c1466c..3a850a4 100644 --- a/internal/stream_module_test.go +++ b/internal/stream_module_test.go @@ -5,8 +5,6 @@ import ( "errors" "testing" "time" - - _ "github.com/warpstreamlabs/bento/v4/public/components/pure" ) func TestNewStreamModule(t *testing.T) { diff --git a/internal/testmain_test.go b/internal/testmain_test.go new file mode 100644 index 0000000..ff4882e --- /dev/null +++ b/internal/testmain_test.go @@ -0,0 +1,7 @@ +package internal + +import ( + // Register pure Bento components (e.g. generate input, bloblang processor) + // so that tests can use them without importing the full component suite. + _ "github.com/warpstreamlabs/bento/v4/public/components/pure" +) diff --git a/internal/trigger_test.go b/internal/trigger_test.go index 41e773d..0439bea 100644 --- a/internal/trigger_test.go +++ b/internal/trigger_test.go @@ -6,8 +6,6 @@ import ( "sync" "testing" "time" - - _ "github.com/warpstreamlabs/bento/v4/public/components/pure" ) // mockTriggerCallback captures trigger invocations for testing. @@ -172,7 +170,7 @@ func TestBentoTrigger_StartStop(t *testing.T) { map[string]any{ "input": map[string]any{ "generate": map[string]any{ - "mapping": `root = {"id": count("c")}`, + "mapping": `root = {"id": count("trigger_id")}`, "count": 2, "interval": "50ms", }, @@ -191,8 +189,14 @@ func TestBentoTrigger_StartStop(t *testing.T) { t.Fatalf("Start() error = %v", err) } - // Wait for messages to be processed - time.Sleep(300 * time.Millisecond) + // Poll until 2 callbacks are observed or deadline is reached. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if len(cb.GetCalls()) >= 2 { + break + } + time.Sleep(10 * time.Millisecond) + } stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -255,8 +259,17 @@ func TestBentoTrigger_MultipleSubscriptions(t *testing.T) { t.Fatalf("Start() error = %v", err) } - // Wait for all messages - time.Sleep(200 * time.Millisecond) + // Wait until both subscriptions have invoked the callback, or time out. + deadline := time.Now().Add(2 * time.Second) + for { + if len(cb.GetCalls()) >= 2 { + break + } + if time.Now().After(deadline) { + t.Fatalf("timed out waiting for 2 callback invocations, got %d", len(cb.GetCalls())) + } + time.Sleep(10 * time.Millisecond) + } stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -317,10 +330,22 @@ func TestBentoTrigger_NoSubscriptions(t *testing.T) { } func TestBentoTrigger_CallbackError(t *testing.T) { - // Callback that returns error - var callbackErr error + // Callback that errors on the first invocation and succeeds on subsequent + // ones. This exercises the error path while still allowing the stream to + // terminate cleanly (Bento retries the NACKed message once, then continues). + var ( + cbMu sync.Mutex + cbCount int + ) errorCb := func(action string, data map[string]any) error { - return callbackErr + cbMu.Lock() + cbCount++ + count := cbCount + cbMu.Unlock() + if count == 1 { + return errors.New("first callback error") + } + return nil } trigger, err := newBentoTrigger(map[string]any{ @@ -328,8 +353,9 @@ func TestBentoTrigger_CallbackError(t *testing.T) { map[string]any{ "input": map[string]any{ "generate": map[string]any{ - "mapping": `root = {"test": "data"}`, - "count": 1, + "mapping": `root = {"test": "data"}`, + "count": 3, + "interval": "10ms", }, }, "workflow": "test", @@ -340,22 +366,37 @@ func TestBentoTrigger_CallbackError(t *testing.T) { t.Fatalf("newBentoTrigger() error = %v", err) } - callbackErr = nil // First call succeeds - ctx := context.Background() if err := trigger.Start(ctx); err != nil { t.Fatalf("Start() error = %v", err) } - time.Sleep(100 * time.Millisecond) + // Poll until at least 2 callbacks are observed (1 error + 1 success). + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + cbMu.Lock() + count := cbCount + cbMu.Unlock() + if count >= 2 { + break + } + time.Sleep(10 * time.Millisecond) + } - stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - // Should stop cleanly even if callback had errors + // Should stop cleanly even when a callback returned an error. if err := trigger.Stop(stopCtx); err != nil { t.Errorf("Stop() error = %v", err) } + + cbMu.Lock() + count := cbCount + cbMu.Unlock() + if count < 2 { + t.Errorf("expected callback to be invoked at least twice (1 error + 1 success), got %d", count) + } } func TestBentoTrigger_StopWithoutStart(t *testing.T) {