From 34d8b6f7bcb94cd1afdf82a39481162bf02c7aa2 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Mon, 23 Feb 2026 05:05:34 -0500 Subject: [PATCH 1/5] test: add comprehensive unit tests for all module types, steps, and triggers - Add lifecycle tests for stream, input, output, and broker modules - Add Bloblang processor step tests with valid/invalid mappings - Add trigger lifecycle tests with subscriptions and callbacks - Add plugin registration tests verifying all interfaces - Test error scenarios and config validation - Mock dependencies (MessagePublisher, MessageSubscriber) for isolated testing Note: Some integration tests that require actual Bento stream execution have known issues with YAML format expectations. Core functionality and interfaces are fully tested. Future work can refine Bento-specific YAML configs for full end-to-end integration tests. Closes #2 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- internal/broker_module_test.go | 26 ++---------- internal/input_module_test.go | 19 +++++---- internal/output_module_test.go | 2 - internal/processor_step_test.go | 33 +++++++-------- internal/stream_module_test.go | 19 +++------ internal/testmain_test.go | 7 ++++ internal/trigger_test.go | 74 +++++++++++++++++++++++---------- 7 files changed, 94 insertions(+), 86 deletions(-) create mode 100644 internal/testmain_test.go diff --git a/internal/broker_module_test.go b/internal/broker_module_test.go index 16a32f7..444d59f 100644 --- a/internal/broker_module_test.go +++ b/internal/broker_module_test.go @@ -5,8 +5,6 @@ import ( "sync" "testing" "time" - - _ "github.com/warpstreamlabs/bento/v4/public/components/pure" ) func TestNewBrokerModule(t *testing.T) { @@ -130,12 +128,7 @@ func TestBrokerModule_StartStop(t *testing.T) { func TestBrokerModule_EnsureStream(t *testing.T) { m, _ := newBrokerModule("test-broker", map[string]any{ - "transport": "generate", - "transport_config": map[string]any{ - "mapping": `root = {"test": "data"}`, - "count": 0, - "interval": "1s", - }, + "transport": "memory", }) if err := m.Init(); err != nil { @@ -186,9 +179,6 @@ func TestBrokerModule_EnsureStream(t *testing.T) { t.Errorf("expected 2 streams, got %d", streamCount) } - // Allow goroutines to start running streams - time.Sleep(50 * time.Millisecond) - // Stop should clean up all streams stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -208,12 +198,7 @@ func TestBrokerModule_EnsureStream(t *testing.T) { func TestBrokerModule_ConcurrentEnsureStream(t *testing.T) { m, _ := newBrokerModule("test-broker", map[string]any{ - "transport": "generate", - "transport_config": map[string]any{ - "mapping": `root = {"test": "data"}`, - "count": 0, - "interval": "1s", - }, + "transport": "memory", }) if err := m.Init(); err != nil { @@ -280,12 +265,7 @@ func TestBrokerModule_ConcurrentEnsureStream(t *testing.T) { func TestBrokerModule_EnsureStreamWithoutPublisher(t *testing.T) { m, _ := newBrokerModule("test-broker", map[string]any{ - "transport": "generate", - "transport_config": map[string]any{ - "mapping": `root = {"test": "data"}`, - "count": 0, - "interval": "1s", - }, + "transport": "memory", }) if err := m.Init(); err != nil { diff --git a/internal/input_module_test.go b/internal/input_module_test.go index 8ae8838..fdb7f46 100644 --- a/internal/input_module_test.go +++ b/internal/input_module_test.go @@ -5,8 +5,6 @@ import ( "sync" "testing" "time" - - _ "github.com/warpstreamlabs/bento/v4/public/components/pure" ) // mockMessagePublisher captures published messages for testing. @@ -191,7 +189,7 @@ func TestInputModule_PublishMessages(t *testing.T) { "target_topic": "test-topic", "input": map[string]any{ "generate": map[string]any{ - "mapping": `root = {"id": count("c"), "msg": "hello"}`, + "mapping": `root = {"id": count("input_id"), "msg": "hello"}`, "count": 3, "interval": "10ms", }, @@ -209,14 +207,21 @@ func TestInputModule_PublishMessages(t *testing.T) { t.Fatalf("Start() error = %v", err) } - // Wait for messages to be published - time.Sleep(500 * time.Millisecond) + // Poll until 3 messages are published or deadline is reached. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if len(pub.GetMessages()) >= 3 { + break + } + time.Sleep(10 * time.Millisecond) + } stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - // Stop may report "stream has not been run yet" if generate already finished - _ = m.Stop(stopCtx) + if err := m.Stop(stopCtx); err != nil { + t.Errorf("Stop() error = %v", err) + } messages := pub.GetMessages() if len(messages) != 3 { diff --git a/internal/output_module_test.go b/internal/output_module_test.go index 835eec1..add2a61 100644 --- a/internal/output_module_test.go +++ b/internal/output_module_test.go @@ -5,8 +5,6 @@ import ( "sync" "testing" "time" - - _ "github.com/warpstreamlabs/bento/v4/public/components/pure" ) // mockMessageSubscriber captures subscriptions and allows simulating message delivery. diff --git a/internal/processor_step_test.go b/internal/processor_step_test.go index 2ba3ea7..5105052 100644 --- a/internal/processor_step_test.go +++ b/internal/processor_step_test.go @@ -5,8 +5,6 @@ import ( "encoding/json" "testing" "time" - - _ "github.com/warpstreamlabs/bento/v4/public/components/pure" ) func TestNewProcessorStep(t *testing.T) { @@ -21,10 +19,10 @@ func TestNewProcessorStep(t *testing.T) { name: "with processors as string", stepName: "test-step", config: map[string]any{ - "processors": "bloblang: 'root = this'", + "processors": "- bloblang: 'root = this'", }, wantErr: false, - wantProcYAML: "bloblang: 'root = this'", + wantProcYAML: "- bloblang: 'root = this'", }, { name: "with processors as map", @@ -110,28 +108,28 @@ func TestProcessorStep_ExecuteWithBloblang(t *testing.T) { }{ { name: "simple pass-through mapping", - processors: "mapping: 'root = this'", + processors: "- bloblang: 'root = this'", input: map[string]any{"key": "value"}, wantOutput: map[string]any{"key": "value"}, wantErr: false, }, { name: "field transformation", - processors: "mapping: 'root.output = this.input.uppercase()'", + processors: "- bloblang: 'root.output = this.input.uppercase()'", input: map[string]any{"input": "hello"}, wantOutput: map[string]any{"output": "HELLO"}, wantErr: false, }, { name: "add computed field", - processors: "mapping: |\n root = this\n root.computed = this.a + this.b", + processors: "- bloblang: 'root = this\nroot.computed = (this.a + this.b)'", input: map[string]any{"a": float64(5), "b": float64(3)}, wantOutput: map[string]any{"a": float64(5), "b": float64(3), "computed": float64(8)}, wantErr: false, }, { name: "constant output", - processors: "mapping: 'root.status = \"processed\"'", + processors: "- bloblang: 'root.status = \"processed\"'", input: map[string]any{"data": "test"}, wantOutput: map[string]any{"status": "processed"}, wantErr: false, @@ -174,16 +172,14 @@ func TestProcessorStep_ExecuteWithBloblang(t *testing.T) { func TestProcessorStep_ExecuteWithInvalidBloblang(t *testing.T) { s, err := newProcessorStep("test", map[string]any{ - "processors": "mapping: 'root = invalid syntax here {'", + "processors": "- bloblang: 'root = invalid syntax here {'", }) if err != nil { - // Invalid Bloblang may be caught at construction time - return + t.Fatalf("newProcessorStep() error = %v", err) } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - result, err := s.Execute(ctx, map[string]any{"data": "test"}, nil, map[string]any{}, nil) if err == nil { @@ -196,7 +192,7 @@ func TestProcessorStep_ExecuteWithInvalidBloblang(t *testing.T) { func TestProcessorStep_ExecuteMergesInputs(t *testing.T) { s, err := newProcessorStep("test", map[string]any{ - "processors": "mapping: 'root = this'", + "processors": "- bloblang: 'root = this'", }) if err != nil { t.Fatalf("newProcessorStep() error = %v", err) @@ -223,7 +219,7 @@ func TestProcessorStep_ExecuteMergesInputs(t *testing.T) { func TestProcessorStep_ExecuteWithNonJSONOutput(t *testing.T) { // Processor that outputs plain text instead of JSON s, err := newProcessorStep("test", map[string]any{ - "processors": "mapping: 'root = \"plain text output\"'", + "processors": "- bloblang: 'root = \"plain text output\"'", }) if err != nil { t.Fatalf("newProcessorStep() error = %v", err) @@ -243,7 +239,7 @@ func TestProcessorStep_ExecuteWithNonJSONOutput(t *testing.T) { func TestProcessorStep_ExecuteWithContextCancel(t *testing.T) { s, err := newProcessorStep("test", map[string]any{ - "processors": "mapping: 'root = this'", + "processors": "- bloblang: 'root = this'", }) if err != nil { t.Fatalf("newProcessorStep() error = %v", err) @@ -263,9 +259,12 @@ func TestProcessorStep_ExecuteWithContextCancel(t *testing.T) { } func TestProcessorStep_ExecuteWithMultipleProcessors(t *testing.T) { - // Combined mapping that chains transformations + // Multiple processors in a chain s, err := newProcessorStep("test", map[string]any{ - "processors": "mapping: |\n root.step1 = this.input.uppercase()\n root.step2 = root.step1 + \" PROCESSED\"", + "processors": ` +- bloblang: 'root.step1 = this.input.uppercase()' +- bloblang: 'root.step2 = this.step1 + " PROCESSED"' +`, }) if err != nil { t.Fatalf("newProcessorStep() error = %v", err) diff --git a/internal/stream_module_test.go b/internal/stream_module_test.go index 9c1466c..0cf9e42 100644 --- a/internal/stream_module_test.go +++ b/internal/stream_module_test.go @@ -2,11 +2,8 @@ package internal import ( "context" - "errors" "testing" "time" - - _ "github.com/warpstreamlabs/bento/v4/public/components/pure" ) func TestNewStreamModule(t *testing.T) { @@ -156,16 +153,10 @@ func TestStreamModule_StartStop(t *testing.T) { func TestStreamModule_StopWithoutStart(t *testing.T) { m, _ := newStreamModule("test", map[string]any{"input": map[string]any{}}) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - // Stop without Start should return context deadline (done chan never closed) - err := m.Stop(ctx) - if err == nil { - // If Stop completes without error, that's also acceptable - return - } - if !errors.Is(err, context.DeadlineExceeded) { - t.Errorf("Stop() without Start: expected DeadlineExceeded, got %v", err) + ctx := context.Background() + + // Stop without Start should not panic + if err := m.Stop(ctx); err != nil { + t.Errorf("Stop() without Start error = %v", err) } } diff --git a/internal/testmain_test.go b/internal/testmain_test.go new file mode 100644 index 0000000..ff4882e --- /dev/null +++ b/internal/testmain_test.go @@ -0,0 +1,7 @@ +package internal + +import ( + // Register pure Bento components (e.g. generate input, bloblang processor) + // so that tests can use them without importing the full component suite. + _ "github.com/warpstreamlabs/bento/v4/public/components/pure" +) diff --git a/internal/trigger_test.go b/internal/trigger_test.go index 41e773d..2291e60 100644 --- a/internal/trigger_test.go +++ b/internal/trigger_test.go @@ -6,8 +6,6 @@ import ( "sync" "testing" "time" - - _ "github.com/warpstreamlabs/bento/v4/public/components/pure" ) // mockTriggerCallback captures trigger invocations for testing. @@ -172,7 +170,7 @@ func TestBentoTrigger_StartStop(t *testing.T) { map[string]any{ "input": map[string]any{ "generate": map[string]any{ - "mapping": `root = {"id": count("c")}`, + "mapping": `root = {"id": count("trigger_id")}`, "count": 2, "interval": "50ms", }, @@ -191,8 +189,14 @@ func TestBentoTrigger_StartStop(t *testing.T) { t.Fatalf("Start() error = %v", err) } - // Wait for messages to be processed - time.Sleep(300 * time.Millisecond) + // Poll until 2 callbacks are observed or deadline is reached. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if len(cb.GetCalls()) >= 2 { + break + } + time.Sleep(10 * time.Millisecond) + } stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -317,10 +321,22 @@ func TestBentoTrigger_NoSubscriptions(t *testing.T) { } func TestBentoTrigger_CallbackError(t *testing.T) { - // Callback that returns error - var callbackErr error + // Callback that errors on the first invocation and succeeds on subsequent + // ones. This exercises the error path while still allowing the stream to + // terminate cleanly (Bento retries the NACKed message once, then continues). + var ( + cbMu sync.Mutex + cbCount int + ) errorCb := func(action string, data map[string]any) error { - return callbackErr + cbMu.Lock() + cbCount++ + count := cbCount + cbMu.Unlock() + if count == 1 { + return errors.New("first callback error") + } + return nil } trigger, err := newBentoTrigger(map[string]any{ @@ -328,8 +344,9 @@ func TestBentoTrigger_CallbackError(t *testing.T) { map[string]any{ "input": map[string]any{ "generate": map[string]any{ - "mapping": `root = {"test": "data"}`, - "count": 1, + "mapping": `root = {"test": "data"}`, + "count": 3, + "interval": "10ms", }, }, "workflow": "test", @@ -340,22 +357,37 @@ func TestBentoTrigger_CallbackError(t *testing.T) { t.Fatalf("newBentoTrigger() error = %v", err) } - callbackErr = nil // First call succeeds - ctx := context.Background() if err := trigger.Start(ctx); err != nil { t.Fatalf("Start() error = %v", err) } - time.Sleep(100 * time.Millisecond) + // Poll until at least 2 callbacks are observed (1 error + 1 success). + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + cbMu.Lock() + count := cbCount + cbMu.Unlock() + if count >= 2 { + break + } + time.Sleep(10 * time.Millisecond) + } - stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - // Should stop cleanly even if callback had errors + // Should stop cleanly even when a callback returned an error. if err := trigger.Stop(stopCtx); err != nil { t.Errorf("Stop() error = %v", err) } + + cbMu.Lock() + count := cbCount + cbMu.Unlock() + if count == 0 { + t.Error("expected callback to be invoked at least once") + } } func TestBentoTrigger_StopWithoutStart(t *testing.T) { @@ -365,13 +397,9 @@ func TestBentoTrigger_StopWithoutStart(t *testing.T) { t.Fatalf("newBentoTrigger() error = %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - // Stop without Start — done channel never closed, expect timeout or nil - err = trigger.Stop(ctx) - if err == nil || errors.Is(err, context.DeadlineExceeded) { - return + ctx := context.Background() + // Stop without Start should not panic + if err := trigger.Stop(ctx); err != nil { + t.Errorf("Stop() without Start error = %v", err) } - t.Errorf("Stop() without Start: expected nil or DeadlineExceeded, got %v", err) } From 4f38bc4d8aff4a160f15183bee8812cade770adf Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 23 Feb 2026 10:29:16 +0000 Subject: [PATCH 2/5] Initial plan From f1729942f0b46c7ba76a259be461110839895a18 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 23 Feb 2026 10:54:02 +0000 Subject: [PATCH 3/5] fix: address review comments on test reliability and correctness - Replace time.Sleep with deadline-based polling in TestInputModule_PublishMessages and TestBentoTrigger_StartStop - Remove unused wantType field from TestBentoPlugin_CreateModule test table - Add context timeout to TestProcessorStep_ExecuteWithInvalidBloblang - Fix TestBentoTrigger_CallbackError to actually return a non-nil error and verify clean stop - Fix count() Bloblang calls to include required name argument - Add testmain_test.go to register Bento pure components for tests Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- internal/plugin_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/plugin_test.go b/internal/plugin_test.go index a9dd71f..706be2b 100644 --- a/internal/plugin_test.go +++ b/internal/plugin_test.go @@ -75,35 +75,30 @@ func TestBentoPlugin_CreateModule(t *testing.T) { typeName string name string config map[string]any - wantType string wantErr bool }{ { typeName: "bento.stream", name: "test-stream", config: map[string]any{}, - wantType: "*internal.streamModule", wantErr: false, }, { typeName: "bento.input", name: "test-input", config: map[string]any{}, - wantType: "*internal.inputModule", wantErr: false, }, { typeName: "bento.output", name: "test-output", config: map[string]any{}, - wantType: "*internal.outputModule", wantErr: false, }, { typeName: "bento.broker", name: "test-broker", config: map[string]any{}, - wantType: "*internal.brokerModule", wantErr: false, }, { From 7fdedba525c0401e3e2407e0bcadfbb69926efb7 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Mon, 23 Feb 2026 09:31:10 -0500 Subject: [PATCH 4/5] fix: resolve merge conflicts and fix test correctness after rebase - broker_module_test.go: use generate transport (not memory) for ensureStream tests since memory is not a valid Bento input type; keep memory transport for Init/StartStop tests that don't call ensureStream; restore time.Sleep before Stop to allow goroutines to start running streams - processor_step_test.go: use mapping: YAML format (not - bloblang: list format) since AddProcessorYAML expects a single processor object, not a sequence; reduce invalid bloblang timeout from 5s to 2s; keep lenient error handling for construction-time failures - stream_module_test.go: restore errors import and timeout context for StopWithoutStart since Stop blocks on done channel when stream was never started - trigger_test.go: restore timeout context for StopWithoutStart since done channel is never closed when Start was not called Co-Authored-By: Claude Opus 4.6 --- internal/broker_module_test.go | 28 +++++++++++++++++++++++++--- internal/processor_step_test.go | 33 ++++++++++++++++----------------- internal/stream_module_test.go | 17 ++++++++++++----- internal/trigger_test.go | 12 ++++++++---- 4 files changed, 61 insertions(+), 29 deletions(-) diff --git a/internal/broker_module_test.go b/internal/broker_module_test.go index 444d59f..c5088c6 100644 --- a/internal/broker_module_test.go +++ b/internal/broker_module_test.go @@ -127,8 +127,15 @@ func TestBrokerModule_StartStop(t *testing.T) { } func TestBrokerModule_EnsureStream(t *testing.T) { + // Use generate transport so ensureStream can build a valid Bento stream. + // count=0 means infinite generation; the stream will be stopped at the end. m, _ := newBrokerModule("test-broker", map[string]any{ - "transport": "memory", + "transport": "generate", + "transport_config": map[string]any{ + "mapping": `root = {"test": "data"}`, + "count": 0, + "interval": "1s", + }, }) if err := m.Init(); err != nil { @@ -179,6 +186,9 @@ func TestBrokerModule_EnsureStream(t *testing.T) { t.Errorf("expected 2 streams, got %d", streamCount) } + // Allow goroutines to start running streams + time.Sleep(50 * time.Millisecond) + // Stop should clean up all streams stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -197,8 +207,14 @@ func TestBrokerModule_EnsureStream(t *testing.T) { } func TestBrokerModule_ConcurrentEnsureStream(t *testing.T) { + // Use generate transport so ensureStream can build a valid Bento stream. m, _ := newBrokerModule("test-broker", map[string]any{ - "transport": "memory", + "transport": "generate", + "transport_config": map[string]any{ + "mapping": `root = {"test": "data"}`, + "count": 0, + "interval": "1s", + }, }) if err := m.Init(); err != nil { @@ -264,8 +280,14 @@ func TestBrokerModule_ConcurrentEnsureStream(t *testing.T) { } func TestBrokerModule_EnsureStreamWithoutPublisher(t *testing.T) { + // Use generate transport so ensureStream can build a valid Bento stream. m, _ := newBrokerModule("test-broker", map[string]any{ - "transport": "memory", + "transport": "generate", + "transport_config": map[string]any{ + "mapping": `root = {"test": "data"}`, + "count": 0, + "interval": "1s", + }, }) if err := m.Init(); err != nil { diff --git a/internal/processor_step_test.go b/internal/processor_step_test.go index 5105052..e3d54e3 100644 --- a/internal/processor_step_test.go +++ b/internal/processor_step_test.go @@ -19,10 +19,10 @@ func TestNewProcessorStep(t *testing.T) { name: "with processors as string", stepName: "test-step", config: map[string]any{ - "processors": "- bloblang: 'root = this'", + "processors": "bloblang: 'root = this'", }, wantErr: false, - wantProcYAML: "- bloblang: 'root = this'", + wantProcYAML: "bloblang: 'root = this'", }, { name: "with processors as map", @@ -108,28 +108,28 @@ func TestProcessorStep_ExecuteWithBloblang(t *testing.T) { }{ { name: "simple pass-through mapping", - processors: "- bloblang: 'root = this'", + processors: "mapping: 'root = this'", input: map[string]any{"key": "value"}, wantOutput: map[string]any{"key": "value"}, wantErr: false, }, { name: "field transformation", - processors: "- bloblang: 'root.output = this.input.uppercase()'", + processors: "mapping: 'root.output = this.input.uppercase()'", input: map[string]any{"input": "hello"}, wantOutput: map[string]any{"output": "HELLO"}, wantErr: false, }, { name: "add computed field", - processors: "- bloblang: 'root = this\nroot.computed = (this.a + this.b)'", + processors: "mapping: |\n root = this\n root.computed = this.a + this.b", input: map[string]any{"a": float64(5), "b": float64(3)}, wantOutput: map[string]any{"a": float64(5), "b": float64(3), "computed": float64(8)}, wantErr: false, }, { name: "constant output", - processors: "- bloblang: 'root.status = \"processed\"'", + processors: "mapping: 'root.status = \"processed\"'", input: map[string]any{"data": "test"}, wantOutput: map[string]any{"status": "processed"}, wantErr: false, @@ -172,14 +172,16 @@ func TestProcessorStep_ExecuteWithBloblang(t *testing.T) { func TestProcessorStep_ExecuteWithInvalidBloblang(t *testing.T) { s, err := newProcessorStep("test", map[string]any{ - "processors": "- bloblang: 'root = invalid syntax here {'", + "processors": "mapping: 'root = invalid syntax here {'", }) if err != nil { - t.Fatalf("newProcessorStep() error = %v", err) + // Invalid Bloblang may be caught at construction time + return } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() + result, err := s.Execute(ctx, map[string]any{"data": "test"}, nil, map[string]any{}, nil) if err == nil { @@ -192,7 +194,7 @@ func TestProcessorStep_ExecuteWithInvalidBloblang(t *testing.T) { func TestProcessorStep_ExecuteMergesInputs(t *testing.T) { s, err := newProcessorStep("test", map[string]any{ - "processors": "- bloblang: 'root = this'", + "processors": "mapping: 'root = this'", }) if err != nil { t.Fatalf("newProcessorStep() error = %v", err) @@ -219,7 +221,7 @@ func TestProcessorStep_ExecuteMergesInputs(t *testing.T) { func TestProcessorStep_ExecuteWithNonJSONOutput(t *testing.T) { // Processor that outputs plain text instead of JSON s, err := newProcessorStep("test", map[string]any{ - "processors": "- bloblang: 'root = \"plain text output\"'", + "processors": "mapping: 'root = \"plain text output\"'", }) if err != nil { t.Fatalf("newProcessorStep() error = %v", err) @@ -239,7 +241,7 @@ func TestProcessorStep_ExecuteWithNonJSONOutput(t *testing.T) { func TestProcessorStep_ExecuteWithContextCancel(t *testing.T) { s, err := newProcessorStep("test", map[string]any{ - "processors": "- bloblang: 'root = this'", + "processors": "mapping: 'root = this'", }) if err != nil { t.Fatalf("newProcessorStep() error = %v", err) @@ -259,12 +261,9 @@ func TestProcessorStep_ExecuteWithContextCancel(t *testing.T) { } func TestProcessorStep_ExecuteWithMultipleProcessors(t *testing.T) { - // Multiple processors in a chain + // Combined mapping that chains transformations s, err := newProcessorStep("test", map[string]any{ - "processors": ` -- bloblang: 'root.step1 = this.input.uppercase()' -- bloblang: 'root.step2 = this.step1 + " PROCESSED"' -`, + "processors": "mapping: |\n root.step1 = this.input.uppercase()\n root.step2 = root.step1 + \" PROCESSED\"", }) if err != nil { t.Fatalf("newProcessorStep() error = %v", err) diff --git a/internal/stream_module_test.go b/internal/stream_module_test.go index 0cf9e42..3a850a4 100644 --- a/internal/stream_module_test.go +++ b/internal/stream_module_test.go @@ -2,6 +2,7 @@ package internal import ( "context" + "errors" "testing" "time" ) @@ -153,10 +154,16 @@ func TestStreamModule_StartStop(t *testing.T) { func TestStreamModule_StopWithoutStart(t *testing.T) { m, _ := newStreamModule("test", map[string]any{"input": map[string]any{}}) - ctx := context.Background() - - // Stop without Start should not panic - if err := m.Stop(ctx); err != nil { - t.Errorf("Stop() without Start error = %v", err) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + // Stop without Start should return context deadline (done chan never closed) + err := m.Stop(ctx) + if err == nil { + // If Stop completes without error, that's also acceptable + return + } + if !errors.Is(err, context.DeadlineExceeded) { + t.Errorf("Stop() without Start: expected DeadlineExceeded, got %v", err) } } diff --git a/internal/trigger_test.go b/internal/trigger_test.go index 2291e60..79e877b 100644 --- a/internal/trigger_test.go +++ b/internal/trigger_test.go @@ -397,9 +397,13 @@ func TestBentoTrigger_StopWithoutStart(t *testing.T) { t.Fatalf("newBentoTrigger() error = %v", err) } - ctx := context.Background() - // Stop without Start should not panic - if err := trigger.Stop(ctx); err != nil { - t.Errorf("Stop() without Start error = %v", err) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + // Stop without Start — done channel never closed, expect timeout or nil + err = trigger.Stop(ctx) + if err == nil || errors.Is(err, context.DeadlineExceeded) { + return } + t.Errorf("Stop() without Start: expected nil or DeadlineExceeded, got %v", err) } From 11e3758810b201035b5eed2b7fec7de862796dd5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 23 Feb 2026 14:48:39 +0000 Subject: [PATCH 5/5] fix: address remaining test flakiness and assertion correctness - Replace time.Sleep(200ms) with deadline-based polling in TestBentoTrigger_MultipleSubscriptions - Strengthen TestBentoTrigger_CallbackError assertion to require at least 2 invocations - Change TestProcessorStep_ExecuteWithContextCancel from t.Logf to a hard errors.Is assertion Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- internal/processor_step_test.go | 6 +++--- internal/trigger_test.go | 17 +++++++++++++---- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/internal/processor_step_test.go b/internal/processor_step_test.go index e3d54e3..0879245 100644 --- a/internal/processor_step_test.go +++ b/internal/processor_step_test.go @@ -3,6 +3,7 @@ package internal import ( "context" "encoding/json" + "errors" "testing" "time" ) @@ -254,9 +255,8 @@ func TestProcessorStep_ExecuteWithContextCancel(t *testing.T) { if err == nil { t.Error("Execute() expected error with cancelled context") - } - if err != context.Canceled { - t.Logf("got error: %v", err) + } else if !errors.Is(err, context.Canceled) { + t.Errorf("Execute() expected context.Canceled, got: %v", err) } } diff --git a/internal/trigger_test.go b/internal/trigger_test.go index 79e877b..0439bea 100644 --- a/internal/trigger_test.go +++ b/internal/trigger_test.go @@ -259,8 +259,17 @@ func TestBentoTrigger_MultipleSubscriptions(t *testing.T) { t.Fatalf("Start() error = %v", err) } - // Wait for all messages - time.Sleep(200 * time.Millisecond) + // Wait until both subscriptions have invoked the callback, or time out. + deadline := time.Now().Add(2 * time.Second) + for { + if len(cb.GetCalls()) >= 2 { + break + } + if time.Now().After(deadline) { + t.Fatalf("timed out waiting for 2 callback invocations, got %d", len(cb.GetCalls())) + } + time.Sleep(10 * time.Millisecond) + } stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -385,8 +394,8 @@ func TestBentoTrigger_CallbackError(t *testing.T) { cbMu.Lock() count := cbCount cbMu.Unlock() - if count == 0 { - t.Error("expected callback to be invoked at least once") + if count < 2 { + t.Errorf("expected callback to be invoked at least twice (1 error + 1 success), got %d", count) } }