Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions internal/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ func messageToMap(msg *service.Message) (map[string]any, error) {

// Copy metadata.
meta := map[string]any{}
_ = msg.MetaWalkMut(func(key string, value any) error {
if err := msg.MetaWalkMut(func(key string, value any) error {
meta[key] = value
return nil
})
}); err != nil {
return nil, fmt.Errorf("copy message metadata: %w", err)
}
result["metadata"] = meta

return result, nil
Expand Down
25 changes: 25 additions & 0 deletions internal/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,4 +224,29 @@ func TestRoundTrip_MessageToMapToMessage(t *testing.T) {
if reconstructed == nil {
t.Fatal("mapToMessage() returned nil")
}

// Verify body is preserved.
raw, err := reconstructed.AsBytes()
if err != nil {
t.Fatalf("reconstructed.AsBytes() returned error: %v", err)
}
body, ok := m["body"].(map[string]any)
if !ok {
t.Fatalf("expected body to be map, got %T", m["body"])
}
if body["name"] != "test" {
t.Errorf("round-trip: body[name] = %v, want %q", body["name"], "test")
}
if len(raw) == 0 {
t.Error("round-trip: reconstructed message bytes should not be empty")
}

// Verify metadata is preserved.
val, exists := reconstructed.MetaGet("source")
if !exists {
t.Fatal("round-trip: metadata key 'source' not found on reconstructed message")
}
if val != "unit-test" {
t.Errorf("round-trip: metadata[source] = %q, want %q", val, "unit-test")
}
}
37 changes: 0 additions & 37 deletions internal/input_module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,40 +161,3 @@ func TestInputModule_ImplementsMessageAwareModule(t *testing.T) {
m.SetMessagePublisher(nil)
m.SetMessageSubscriber(nil)
}

// mockPublisher is a test double for sdk.MessagePublisher.
type mockPublisher struct {
published []struct {
topic string
payload []byte
metadata map[string]string
}
}

func (p *mockPublisher) Publish(topic string, payload []byte, metadata map[string]string) (string, error) {
p.published = append(p.published, struct {
topic string
payload []byte
metadata map[string]string
}{topic, payload, metadata})
return "msg-id", nil
}

// mockSubscriber is a test double for sdk.MessageSubscriber.
type mockSubscriber struct {
subscribed map[string]func(payload []byte, metadata map[string]string) error
unsubscribed []string
}

func (s *mockSubscriber) Subscribe(topic string, handler func(payload []byte, metadata map[string]string) error) error {
if s.subscribed == nil {
s.subscribed = make(map[string]func(payload []byte, metadata map[string]string) error)
}
s.subscribed[topic] = handler
return nil
}

func (s *mockSubscriber) Unsubscribe(topic string) error {
s.unsubscribed = append(s.unsubscribed, topic)
return nil
}
38 changes: 38 additions & 0 deletions internal/mocks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package internal

// mockPublisher is a test double for sdk.MessagePublisher.
type mockPublisher struct {
published []struct {
topic string
payload []byte
metadata map[string]string
}
}

func (p *mockPublisher) Publish(topic string, payload []byte, metadata map[string]string) (string, error) {
p.published = append(p.published, struct {
topic string
payload []byte
metadata map[string]string
}{topic, payload, metadata})
return "msg-id", nil
}

// mockSubscriber is a test double for sdk.MessageSubscriber.
type mockSubscriber struct {
subscribed map[string]func(payload []byte, metadata map[string]string) error
unsubscribed []string
}

func (s *mockSubscriber) Subscribe(topic string, handler func(payload []byte, metadata map[string]string) error) error {
if s.subscribed == nil {
s.subscribed = make(map[string]func(payload []byte, metadata map[string]string) error)
}
s.subscribed[topic] = handler
return nil
}

func (s *mockSubscriber) Unsubscribe(topic string) error {
s.unsubscribed = append(s.unsubscribed, topic)
return nil
}
2 changes: 1 addition & 1 deletion internal/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (p *bentoPlugin) ModuleSchemas() []sdk.ModuleSchemaData {
Category: "transform",
Description: "Runs data through Bento processors (Bloblang, jmespath, etc.) as a pipeline step.",
ConfigFields: []sdk.ConfigField{
{Name: "processors", Type: "string", Description: "YAML string or map defining Bento processors", Required: true},
{Name: "processors", Type: "string", Description: "YAML string or map defining Bento processors"},
},
},
}
Expand Down
70 changes: 50 additions & 20 deletions internal/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ func TestBentoPlugin_Manifest(t *testing.T) {
}

func TestBentoPlugin_ModuleTypes(t *testing.T) {
p := NewBentoPlugin().(*bentoPlugin)
types := p.ModuleTypes()
mp, ok := NewBentoPlugin().(sdk.ModuleProvider)
if !ok {
t.Fatal("NewBentoPlugin() does not implement sdk.ModuleProvider")
}
types := mp.ModuleTypes()

want := map[string]bool{
"bento.stream": false,
Expand All @@ -58,8 +61,11 @@ func TestBentoPlugin_ModuleTypes(t *testing.T) {
}

func TestBentoPlugin_StepTypes(t *testing.T) {
p := NewBentoPlugin().(*bentoPlugin)
types := p.StepTypes()
sp, ok := NewBentoPlugin().(sdk.StepProvider)
if !ok {
t.Fatal("NewBentoPlugin() does not implement sdk.StepProvider")
}
types := sp.StepTypes()

if len(types) != 1 {
t.Fatalf("StepTypes() returned %d types, want 1", len(types))
Expand All @@ -70,8 +76,11 @@ func TestBentoPlugin_StepTypes(t *testing.T) {
}

func TestBentoPlugin_TriggerTypes(t *testing.T) {
p := NewBentoPlugin().(*bentoPlugin)
types := p.TriggerTypes()
tp, ok := NewBentoPlugin().(sdk.TriggerProvider)
if !ok {
t.Fatal("NewBentoPlugin() does not implement sdk.TriggerProvider")
}
types := tp.TriggerTypes()

if len(types) != 1 {
t.Fatalf("TriggerTypes() returned %d types, want 1", len(types))
Expand All @@ -82,7 +91,10 @@ func TestBentoPlugin_TriggerTypes(t *testing.T) {
}

func TestBentoPlugin_CreateModule_ValidTypes(t *testing.T) {
p := NewBentoPlugin().(*bentoPlugin)
mp, ok := NewBentoPlugin().(sdk.ModuleProvider)
if !ok {
t.Fatal("NewBentoPlugin() does not implement sdk.ModuleProvider")
}

tests := []struct {
typeName string
Expand Down Expand Up @@ -117,7 +129,7 @@ func TestBentoPlugin_CreateModule_ValidTypes(t *testing.T) {

for _, tt := range tests {
t.Run(tt.typeName, func(t *testing.T) {
mod, err := p.CreateModule(tt.typeName, "test-"+tt.typeName, tt.config)
mod, err := mp.CreateModule(tt.typeName, "test-"+tt.typeName, tt.config)
if err != nil {
t.Fatalf("CreateModule(%q) returned error: %v", tt.typeName, err)
}
Expand All @@ -129,9 +141,12 @@ func TestBentoPlugin_CreateModule_ValidTypes(t *testing.T) {
}

func TestBentoPlugin_CreateModule_UnknownType(t *testing.T) {
p := NewBentoPlugin().(*bentoPlugin)
mp, ok := NewBentoPlugin().(sdk.ModuleProvider)
if !ok {
t.Fatal("NewBentoPlugin() does not implement sdk.ModuleProvider")
}

mod, err := p.CreateModule("bento.unknown", "test", map[string]any{})
mod, err := mp.CreateModule("bento.unknown", "test", map[string]any{})
if err == nil {
t.Fatal("CreateModule with unknown type should return error")
}
Expand All @@ -141,9 +156,12 @@ func TestBentoPlugin_CreateModule_UnknownType(t *testing.T) {
}

func TestBentoPlugin_CreateStep_ValidType(t *testing.T) {
p := NewBentoPlugin().(*bentoPlugin)
sp, ok := NewBentoPlugin().(sdk.StepProvider)
if !ok {
t.Fatal("NewBentoPlugin() does not implement sdk.StepProvider")
}

step, err := p.CreateStep("step.bento", "test-step", map[string]any{})
step, err := sp.CreateStep("step.bento", "test-step", map[string]any{})
if err != nil {
t.Fatalf("CreateStep(step.bento) returned error: %v", err)
}
Expand All @@ -153,9 +171,12 @@ func TestBentoPlugin_CreateStep_ValidType(t *testing.T) {
}

func TestBentoPlugin_CreateStep_UnknownType(t *testing.T) {
p := NewBentoPlugin().(*bentoPlugin)
sp, ok := NewBentoPlugin().(sdk.StepProvider)
if !ok {
t.Fatal("NewBentoPlugin() does not implement sdk.StepProvider")
}

step, err := p.CreateStep("step.unknown", "test", map[string]any{})
step, err := sp.CreateStep("step.unknown", "test", map[string]any{})
if err == nil {
t.Fatal("CreateStep with unknown type should return error")
}
Expand All @@ -165,10 +186,13 @@ func TestBentoPlugin_CreateStep_UnknownType(t *testing.T) {
}

func TestBentoPlugin_CreateTrigger_ValidType(t *testing.T) {
p := NewBentoPlugin().(*bentoPlugin)
tp, ok := NewBentoPlugin().(sdk.TriggerProvider)
if !ok {
t.Fatal("NewBentoPlugin() does not implement sdk.TriggerProvider")
}

cb := func(action string, data map[string]any) error { return nil }
trigger, err := p.CreateTrigger("bento", map[string]any{}, cb)
trigger, err := tp.CreateTrigger("bento", map[string]any{}, cb)
if err != nil {
t.Fatalf("CreateTrigger(bento) returned error: %v", err)
}
Expand All @@ -178,10 +202,13 @@ func TestBentoPlugin_CreateTrigger_ValidType(t *testing.T) {
}

func TestBentoPlugin_CreateTrigger_UnknownType(t *testing.T) {
p := NewBentoPlugin().(*bentoPlugin)
tp, ok := NewBentoPlugin().(sdk.TriggerProvider)
if !ok {
t.Fatal("NewBentoPlugin() does not implement sdk.TriggerProvider")
}

cb := func(action string, data map[string]any) error { return nil }
trigger, err := p.CreateTrigger("unknown", map[string]any{}, cb)
trigger, err := tp.CreateTrigger("unknown", map[string]any{}, cb)
if err == nil {
t.Fatal("CreateTrigger with unknown type should return error")
}
Expand All @@ -191,8 +218,11 @@ func TestBentoPlugin_CreateTrigger_UnknownType(t *testing.T) {
}

func TestBentoPlugin_ModuleSchemas(t *testing.T) {
p := NewBentoPlugin().(*bentoPlugin)
schemas := p.ModuleSchemas()
scp, ok := NewBentoPlugin().(sdk.SchemaProvider)
if !ok {
t.Fatal("NewBentoPlugin() does not implement sdk.SchemaProvider")
}
schemas := scp.ModuleSchemas()

if len(schemas) == 0 {
t.Fatal("ModuleSchemas() returned empty slice")
Expand Down
10 changes: 6 additions & 4 deletions internal/processor_step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"context"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -170,7 +171,7 @@ func TestProcessorStep_Execute_WithBloblangPassthrough(t *testing.T) {

func TestProcessorStep_Execute_ContextCancelled(t *testing.T) {
config := map[string]any{
"processors": `- bloblang: "root = this"`,
"processors": `bloblang: "root = this"`,
}

s, _ := newProcessorStep("test", config)
Expand All @@ -179,9 +180,10 @@ func TestProcessorStep_Execute_ContextCancelled(t *testing.T) {
cancel() // Cancel immediately.

_, err := s.Execute(ctx, map[string]any{"key": "val"}, nil, nil, nil)
// May succeed (if stream runs fast enough) or return context.Canceled — both are acceptable.
// We just verify it does not panic or block forever.
_ = err
// May succeed (if stream runs fast enough) or return a context.Canceled error — both are acceptable.
if err != nil && !errors.Is(err, context.Canceled) {
t.Errorf("Execute() with cancelled context: unexpected error type %v", err)
}
}

func TestProcessorStep_Execute_StopPipelineIsFalse(t *testing.T) {
Expand Down