From 0a320fed22f9c83d904f1776ef5cb8dd24014287 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 23 Feb 2026 13:54:55 +0000 Subject: [PATCH 1/2] Initial plan From 30fea5f5642badaf5962c74b8ca730bb8ca635c0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 23 Feb 2026 14:05:10 +0000 Subject: [PATCH 2/2] Apply PR review feedback: error handling, test improvements, mock refactoring Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- internal/bridge.go | 6 ++- internal/bridge_test.go | 25 ++++++++++++ internal/input_module_test.go | 37 ----------------- internal/mocks_test.go | 38 ++++++++++++++++++ internal/plugin.go | 2 +- internal/plugin_test.go | 70 +++++++++++++++++++++++---------- internal/processor_step_test.go | 10 +++-- 7 files changed, 124 insertions(+), 64 deletions(-) create mode 100644 internal/mocks_test.go diff --git a/internal/bridge.go b/internal/bridge.go index 23f3274..4b25af2 100644 --- a/internal/bridge.go +++ b/internal/bridge.go @@ -39,10 +39,12 @@ func messageToMap(msg *service.Message) (map[string]any, error) { // Copy metadata. meta := map[string]any{} - _ = msg.MetaWalkMut(func(key string, value any) error { + if err := msg.MetaWalkMut(func(key string, value any) error { meta[key] = value return nil - }) + }); err != nil { + return nil, fmt.Errorf("copy message metadata: %w", err) + } result["metadata"] = meta return result, nil diff --git a/internal/bridge_test.go b/internal/bridge_test.go index 3e1491d..454efe7 100644 --- a/internal/bridge_test.go +++ b/internal/bridge_test.go @@ -224,4 +224,29 @@ func TestRoundTrip_MessageToMapToMessage(t *testing.T) { if reconstructed == nil { t.Fatal("mapToMessage() returned nil") } + + // Verify body is preserved. + raw, err := reconstructed.AsBytes() + if err != nil { + t.Fatalf("reconstructed.AsBytes() returned error: %v", err) + } + body, ok := m["body"].(map[string]any) + if !ok { + t.Fatalf("expected body to be map, got %T", m["body"]) + } + if body["name"] != "test" { + t.Errorf("round-trip: body[name] = %v, want %q", body["name"], "test") + } + if len(raw) == 0 { + t.Error("round-trip: reconstructed message bytes should not be empty") + } + + // Verify metadata is preserved. + val, exists := reconstructed.MetaGet("source") + if !exists { + t.Fatal("round-trip: metadata key 'source' not found on reconstructed message") + } + if val != "unit-test" { + t.Errorf("round-trip: metadata[source] = %q, want %q", val, "unit-test") + } } diff --git a/internal/input_module_test.go b/internal/input_module_test.go index 801b044..a24cf23 100644 --- a/internal/input_module_test.go +++ b/internal/input_module_test.go @@ -161,40 +161,3 @@ func TestInputModule_ImplementsMessageAwareModule(t *testing.T) { m.SetMessagePublisher(nil) m.SetMessageSubscriber(nil) } - -// mockPublisher is a test double for sdk.MessagePublisher. -type mockPublisher struct { - published []struct { - topic string - payload []byte - metadata map[string]string - } -} - -func (p *mockPublisher) Publish(topic string, payload []byte, metadata map[string]string) (string, error) { - p.published = append(p.published, struct { - topic string - payload []byte - metadata map[string]string - }{topic, payload, metadata}) - return "msg-id", nil -} - -// mockSubscriber is a test double for sdk.MessageSubscriber. -type mockSubscriber struct { - subscribed map[string]func(payload []byte, metadata map[string]string) error - unsubscribed []string -} - -func (s *mockSubscriber) Subscribe(topic string, handler func(payload []byte, metadata map[string]string) error) error { - if s.subscribed == nil { - s.subscribed = make(map[string]func(payload []byte, metadata map[string]string) error) - } - s.subscribed[topic] = handler - return nil -} - -func (s *mockSubscriber) Unsubscribe(topic string) error { - s.unsubscribed = append(s.unsubscribed, topic) - return nil -} diff --git a/internal/mocks_test.go b/internal/mocks_test.go new file mode 100644 index 0000000..e100216 --- /dev/null +++ b/internal/mocks_test.go @@ -0,0 +1,38 @@ +package internal + +// mockPublisher is a test double for sdk.MessagePublisher. +type mockPublisher struct { + published []struct { + topic string + payload []byte + metadata map[string]string + } +} + +func (p *mockPublisher) Publish(topic string, payload []byte, metadata map[string]string) (string, error) { + p.published = append(p.published, struct { + topic string + payload []byte + metadata map[string]string + }{topic, payload, metadata}) + return "msg-id", nil +} + +// mockSubscriber is a test double for sdk.MessageSubscriber. +type mockSubscriber struct { + subscribed map[string]func(payload []byte, metadata map[string]string) error + unsubscribed []string +} + +func (s *mockSubscriber) Subscribe(topic string, handler func(payload []byte, metadata map[string]string) error) error { + if s.subscribed == nil { + s.subscribed = make(map[string]func(payload []byte, metadata map[string]string) error) + } + s.subscribed[topic] = handler + return nil +} + +func (s *mockSubscriber) Unsubscribe(topic string) error { + s.unsubscribed = append(s.unsubscribed, topic) + return nil +} diff --git a/internal/plugin.go b/internal/plugin.go index ccdf7a0..916c91f 100644 --- a/internal/plugin.go +++ b/internal/plugin.go @@ -136,7 +136,7 @@ func (p *bentoPlugin) ModuleSchemas() []sdk.ModuleSchemaData { Category: "transform", Description: "Runs data through Bento processors (Bloblang, jmespath, etc.) as a pipeline step.", ConfigFields: []sdk.ConfigField{ - {Name: "processors", Type: "string", Description: "YAML string or map defining Bento processors", Required: true}, + {Name: "processors", Type: "string", Description: "YAML string or map defining Bento processors"}, }, }, } diff --git a/internal/plugin_test.go b/internal/plugin_test.go index 9dcfa38..bd84ccf 100644 --- a/internal/plugin_test.go +++ b/internal/plugin_test.go @@ -32,8 +32,11 @@ func TestBentoPlugin_Manifest(t *testing.T) { } func TestBentoPlugin_ModuleTypes(t *testing.T) { - p := NewBentoPlugin().(*bentoPlugin) - types := p.ModuleTypes() + mp, ok := NewBentoPlugin().(sdk.ModuleProvider) + if !ok { + t.Fatal("NewBentoPlugin() does not implement sdk.ModuleProvider") + } + types := mp.ModuleTypes() want := map[string]bool{ "bento.stream": false, @@ -58,8 +61,11 @@ func TestBentoPlugin_ModuleTypes(t *testing.T) { } func TestBentoPlugin_StepTypes(t *testing.T) { - p := NewBentoPlugin().(*bentoPlugin) - types := p.StepTypes() + sp, ok := NewBentoPlugin().(sdk.StepProvider) + if !ok { + t.Fatal("NewBentoPlugin() does not implement sdk.StepProvider") + } + types := sp.StepTypes() if len(types) != 1 { t.Fatalf("StepTypes() returned %d types, want 1", len(types)) @@ -70,8 +76,11 @@ func TestBentoPlugin_StepTypes(t *testing.T) { } func TestBentoPlugin_TriggerTypes(t *testing.T) { - p := NewBentoPlugin().(*bentoPlugin) - types := p.TriggerTypes() + tp, ok := NewBentoPlugin().(sdk.TriggerProvider) + if !ok { + t.Fatal("NewBentoPlugin() does not implement sdk.TriggerProvider") + } + types := tp.TriggerTypes() if len(types) != 1 { t.Fatalf("TriggerTypes() returned %d types, want 1", len(types)) @@ -82,7 +91,10 @@ func TestBentoPlugin_TriggerTypes(t *testing.T) { } func TestBentoPlugin_CreateModule_ValidTypes(t *testing.T) { - p := NewBentoPlugin().(*bentoPlugin) + mp, ok := NewBentoPlugin().(sdk.ModuleProvider) + if !ok { + t.Fatal("NewBentoPlugin() does not implement sdk.ModuleProvider") + } tests := []struct { typeName string @@ -117,7 +129,7 @@ func TestBentoPlugin_CreateModule_ValidTypes(t *testing.T) { for _, tt := range tests { t.Run(tt.typeName, func(t *testing.T) { - mod, err := p.CreateModule(tt.typeName, "test-"+tt.typeName, tt.config) + mod, err := mp.CreateModule(tt.typeName, "test-"+tt.typeName, tt.config) if err != nil { t.Fatalf("CreateModule(%q) returned error: %v", tt.typeName, err) } @@ -129,9 +141,12 @@ func TestBentoPlugin_CreateModule_ValidTypes(t *testing.T) { } func TestBentoPlugin_CreateModule_UnknownType(t *testing.T) { - p := NewBentoPlugin().(*bentoPlugin) + mp, ok := NewBentoPlugin().(sdk.ModuleProvider) + if !ok { + t.Fatal("NewBentoPlugin() does not implement sdk.ModuleProvider") + } - mod, err := p.CreateModule("bento.unknown", "test", map[string]any{}) + mod, err := mp.CreateModule("bento.unknown", "test", map[string]any{}) if err == nil { t.Fatal("CreateModule with unknown type should return error") } @@ -141,9 +156,12 @@ func TestBentoPlugin_CreateModule_UnknownType(t *testing.T) { } func TestBentoPlugin_CreateStep_ValidType(t *testing.T) { - p := NewBentoPlugin().(*bentoPlugin) + sp, ok := NewBentoPlugin().(sdk.StepProvider) + if !ok { + t.Fatal("NewBentoPlugin() does not implement sdk.StepProvider") + } - step, err := p.CreateStep("step.bento", "test-step", map[string]any{}) + step, err := sp.CreateStep("step.bento", "test-step", map[string]any{}) if err != nil { t.Fatalf("CreateStep(step.bento) returned error: %v", err) } @@ -153,9 +171,12 @@ func TestBentoPlugin_CreateStep_ValidType(t *testing.T) { } func TestBentoPlugin_CreateStep_UnknownType(t *testing.T) { - p := NewBentoPlugin().(*bentoPlugin) + sp, ok := NewBentoPlugin().(sdk.StepProvider) + if !ok { + t.Fatal("NewBentoPlugin() does not implement sdk.StepProvider") + } - step, err := p.CreateStep("step.unknown", "test", map[string]any{}) + step, err := sp.CreateStep("step.unknown", "test", map[string]any{}) if err == nil { t.Fatal("CreateStep with unknown type should return error") } @@ -165,10 +186,13 @@ func TestBentoPlugin_CreateStep_UnknownType(t *testing.T) { } func TestBentoPlugin_CreateTrigger_ValidType(t *testing.T) { - p := NewBentoPlugin().(*bentoPlugin) + tp, ok := NewBentoPlugin().(sdk.TriggerProvider) + if !ok { + t.Fatal("NewBentoPlugin() does not implement sdk.TriggerProvider") + } cb := func(action string, data map[string]any) error { return nil } - trigger, err := p.CreateTrigger("bento", map[string]any{}, cb) + trigger, err := tp.CreateTrigger("bento", map[string]any{}, cb) if err != nil { t.Fatalf("CreateTrigger(bento) returned error: %v", err) } @@ -178,10 +202,13 @@ func TestBentoPlugin_CreateTrigger_ValidType(t *testing.T) { } func TestBentoPlugin_CreateTrigger_UnknownType(t *testing.T) { - p := NewBentoPlugin().(*bentoPlugin) + tp, ok := NewBentoPlugin().(sdk.TriggerProvider) + if !ok { + t.Fatal("NewBentoPlugin() does not implement sdk.TriggerProvider") + } cb := func(action string, data map[string]any) error { return nil } - trigger, err := p.CreateTrigger("unknown", map[string]any{}, cb) + trigger, err := tp.CreateTrigger("unknown", map[string]any{}, cb) if err == nil { t.Fatal("CreateTrigger with unknown type should return error") } @@ -191,8 +218,11 @@ func TestBentoPlugin_CreateTrigger_UnknownType(t *testing.T) { } func TestBentoPlugin_ModuleSchemas(t *testing.T) { - p := NewBentoPlugin().(*bentoPlugin) - schemas := p.ModuleSchemas() + scp, ok := NewBentoPlugin().(sdk.SchemaProvider) + if !ok { + t.Fatal("NewBentoPlugin() does not implement sdk.SchemaProvider") + } + schemas := scp.ModuleSchemas() if len(schemas) == 0 { t.Fatal("ModuleSchemas() returned empty slice") diff --git a/internal/processor_step_test.go b/internal/processor_step_test.go index a244026..60317d6 100644 --- a/internal/processor_step_test.go +++ b/internal/processor_step_test.go @@ -2,6 +2,7 @@ package internal import ( "context" + "errors" "testing" "time" @@ -170,7 +171,7 @@ func TestProcessorStep_Execute_WithBloblangPassthrough(t *testing.T) { func TestProcessorStep_Execute_ContextCancelled(t *testing.T) { config := map[string]any{ - "processors": `- bloblang: "root = this"`, + "processors": `bloblang: "root = this"`, } s, _ := newProcessorStep("test", config) @@ -179,9 +180,10 @@ func TestProcessorStep_Execute_ContextCancelled(t *testing.T) { cancel() // Cancel immediately. _, err := s.Execute(ctx, map[string]any{"key": "val"}, nil, nil, nil) - // May succeed (if stream runs fast enough) or return context.Canceled — both are acceptable. - // We just verify it does not panic or block forever. - _ = err + // May succeed (if stream runs fast enough) or return a context.Canceled error — both are acceptable. + if err != nil && !errors.Is(err, context.Canceled) { + t.Errorf("Execute() with cancelled context: unexpected error type %v", err) + } } func TestProcessorStep_Execute_StopPipelineIsFalse(t *testing.T) {