Skip to content

Commit 7b01fd7

Browse files
Copilotintel352
andcommitted
fix: address review comments on test reliability and correctness
- Replace time.Sleep with deadline-based polling in TestInputModule_PublishMessages and TestBentoTrigger_StartStop - Remove unused wantType field from TestBentoPlugin_CreateModule test table - Add context timeout to TestProcessorStep_ExecuteWithInvalidBloblang - Fix TestBentoTrigger_CallbackError to actually return a non-nil error and verify clean stop - Fix count() Bloblang calls to include required name argument - Add testmain_test.go to register Bento pure components for tests Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
1 parent 563306f commit 7b01fd7

7 files changed

Lines changed: 90 additions & 22 deletions

File tree

go.mod

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ require (
3030
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.55.0 // indirect
3131
github.com/IBM/sarama v1.46.3 // indirect
3232
github.com/Jeffail/gabs/v2 v2.7.0 // indirect
33+
github.com/Jeffail/grok v1.1.0 // indirect
3334
github.com/Jeffail/shutdown v1.0.0 // indirect
3435
github.com/Microsoft/go-winio v0.6.2 // indirect
3536
github.com/OneOfOne/xxhash v1.2.8 // indirect
@@ -104,9 +105,12 @@ require (
104105
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect
105106
github.com/hashicorp/go-sockaddr v1.0.7 // indirect
106107
github.com/hashicorp/go-uuid v1.0.3 // indirect
108+
github.com/hashicorp/golang-lru/arc/v2 v2.0.7 // indirect
109+
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
107110
github.com/hashicorp/hcl v1.0.1-vault-7 // indirect
108111
github.com/hashicorp/vault/api v1.22.0 // indirect
109112
github.com/hashicorp/yamux v0.1.2 // indirect
113+
github.com/influxdata/go-syslog/v3 v3.0.0 // indirect
110114
github.com/itchyny/gojq v0.12.18 // indirect
111115
github.com/itchyny/timefmt-go v0.1.7 // indirect
112116
github.com/jackc/pgpassfile v1.0.0 // indirect
@@ -118,8 +122,10 @@ require (
118122
github.com/jcmturner/gofork v1.7.6 // indirect
119123
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
120124
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
125+
github.com/jmespath/go-jmespath v0.4.0 // indirect
121126
github.com/json-iterator/go v1.1.12 // indirect
122127
github.com/klauspost/compress v1.18.1 // indirect
128+
github.com/klauspost/pgzip v1.2.6 // indirect
123129
github.com/matoous/go-nanoid/v2 v2.0.0 // indirect
124130
github.com/mattn/go-colorable v0.1.14 // indirect
125131
github.com/mattn/go-isatty v0.0.20 // indirect
@@ -147,9 +153,13 @@ require (
147153
github.com/prometheus/client_model v0.6.2 // indirect
148154
github.com/prometheus/common v0.48.0 // indirect
149155
github.com/prometheus/procfs v0.12.0 // indirect
156+
github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc // indirect
150157
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
151158
github.com/redis/go-redis/v9 v9.18.0 // indirect
152159
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
160+
github.com/rickb777/date v1.20.5 // indirect
161+
github.com/rickb777/plural v1.4.1 // indirect
162+
github.com/robfig/cron/v3 v3.0.1 // indirect
153163
github.com/russross/blackfriday/v2 v2.1.0 // indirect
154164
github.com/ryanuber/go-glob v1.0.0 // indirect
155165
github.com/segmentio/ksuid v1.0.4 // indirect

go.sum

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
207207
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
208208
github.com/golobby/cast v1.3.3 h1:s2Lawb9RMz7YyYf8IrfMQY4IFmA1R/lgfmj97Vc6fig=
209209
github.com/golobby/cast v1.3.3/go.mod h1:0oDO5IT84HTXcbLDf1YXuk0xtg/cRDrxhbpWKxwtJCY=
210+
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
210211
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
211212
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
212213
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@@ -306,6 +307,8 @@ github.com/jhump/protoreflect v1.16.0 h1:54fZg+49widqXYQ0b+usAFHbMkBGR4PpXrsHc8+
306307
github.com/jhump/protoreflect v1.16.0/go.mod h1:oYPd7nPvcBw/5wlDfm/AVmU9zH9BgqGCI469pGxfj/8=
307308
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
308309
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
310+
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
311+
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
309312
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
310313
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
311314
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
@@ -321,6 +324,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
321324
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
322325
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
323326
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
327+
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165/go.mod h1:WZxr2/6a/Ar9bMDc2rN/LJrE/hF6bXE4LPyDSIxwAfg=
324328
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
325329
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
326330
github.com/linkedin/goavro/v2 v2.12.0 h1:rIQQSj8jdAUlKQh6DttK8wCRv4t4QO09g1C4aBWXslg=
@@ -377,6 +381,8 @@ github.com/oklog/run v1.2.0 h1:O8x3yXwah4A73hJdlrwo/2X6J62gE5qTMusH0dvz60E=
377381
github.com/oklog/run v1.2.0/go.mod h1:mgDbKRSwPhJfesJ4PntqFUbKQRZ50NgmZTSPlFA0YFk=
378382
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
379383
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
384+
github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
385+
github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
380386
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
381387
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
382388
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=
@@ -449,6 +455,10 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
449455
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
450456
github.com/tilinna/z85 v1.0.0 h1:uqFnJBlD01dosSeo5sK1G1YGbPuwqVHqR+12OJDRjUw=
451457
github.com/tilinna/z85 v1.0.0/go.mod h1:EfpFU/DUY4ddEy6CRvk2l+UQNEzHbh+bqBQS+04Nkxs=
458+
github.com/trivago/grok v1.0.0 h1:oV2ljyZT63tgXkmgEHg2U0jMqiKKuL0hkn49s6aRavQ=
459+
github.com/trivago/grok v1.0.0/go.mod h1:9t59xLInhrncYq9a3J7488NgiBZi5y5yC7bss+w4NHM=
460+
github.com/trivago/tgo v1.0.7 h1:uaWH/XIy9aWYWpjm2CU3RpcqZXmX2ysQ9/Go+d9gyrM=
461+
github.com/trivago/tgo v1.0.7/go.mod h1:w4dpD+3tzNIIiIfkWWa85w5/B77tlvdZckQ+6PkFnhc=
452462
github.com/urfave/cli/v2 v2.27.1 h1:8xSQ6szndafKVRmfyeUMxkNUJQMjL1F2zmsZ+qHpfho=
453463
github.com/urfave/cli/v2 v2.27.1/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
454464
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
@@ -596,6 +606,9 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV
596606
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
597607
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
598608
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
609+
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
610+
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
611+
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
599612
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
600613
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
601614
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/input_module_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func TestInputModule_PublishMessages(t *testing.T) {
189189
"target_topic": "test-topic",
190190
"input": map[string]any{
191191
"generate": map[string]any{
192-
"mapping": `root = {"id": count(), "msg": "hello"}`,
192+
"mapping": `root = {"id": count("input_id"), "msg": "hello"}`,
193193
"count": 3,
194194
"interval": "10ms",
195195
},
@@ -207,8 +207,14 @@ func TestInputModule_PublishMessages(t *testing.T) {
207207
t.Fatalf("Start() error = %v", err)
208208
}
209209

210-
// Wait for messages to be published
211-
time.Sleep(200 * time.Millisecond)
210+
// Poll until 3 messages are published or deadline is reached.
211+
deadline := time.Now().Add(2 * time.Second)
212+
for time.Now().Before(deadline) {
213+
if len(pub.GetMessages()) >= 3 {
214+
break
215+
}
216+
time.Sleep(10 * time.Millisecond)
217+
}
212218

213219
stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
214220
defer cancel()

internal/plugin_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,35 +75,30 @@ func TestBentoPlugin_CreateModule(t *testing.T) {
7575
typeName string
7676
name string
7777
config map[string]any
78-
wantType string
7978
wantErr bool
8079
}{
8180
{
8281
typeName: "bento.stream",
8382
name: "test-stream",
8483
config: map[string]any{},
85-
wantType: "*internal.streamModule",
8684
wantErr: false,
8785
},
8886
{
8987
typeName: "bento.input",
9088
name: "test-input",
9189
config: map[string]any{},
92-
wantType: "*internal.inputModule",
9390
wantErr: false,
9491
},
9592
{
9693
typeName: "bento.output",
9794
name: "test-output",
9895
config: map[string]any{},
99-
wantType: "*internal.outputModule",
10096
wantErr: false,
10197
},
10298
{
10399
typeName: "bento.broker",
104100
name: "test-broker",
105101
config: map[string]any{},
106-
wantType: "*internal.brokerModule",
107102
wantErr: false,
108103
},
109104
{

internal/processor_step_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"testing"
7+
"time"
78
)
89

910
func TestNewProcessorStep(t *testing.T) {
@@ -177,7 +178,8 @@ func TestProcessorStep_ExecuteWithInvalidBloblang(t *testing.T) {
177178
t.Fatalf("newProcessorStep() error = %v", err)
178179
}
179180

180-
ctx := context.Background()
181+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
182+
defer cancel()
181183
result, err := s.Execute(ctx, map[string]any{"data": "test"}, nil, map[string]any{}, nil)
182184

183185
if err == nil {

internal/testmain_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package internal
2+
3+
import (
4+
// Register pure Bento components (e.g. generate input, bloblang processor)
5+
// so that tests can use them without importing the full component suite.
6+
_ "github.com/warpstreamlabs/bento/v4/public/components/pure"
7+
)

internal/trigger_test.go

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package internal
22

33
import (
44
"context"
5+
"errors"
56
"sync"
67
"testing"
78
"time"
@@ -169,7 +170,7 @@ func TestBentoTrigger_StartStop(t *testing.T) {
169170
map[string]any{
170171
"input": map[string]any{
171172
"generate": map[string]any{
172-
"mapping": `root = {"id": count()}`,
173+
"mapping": `root = {"id": count("trigger_id")}`,
173174
"count": 2,
174175
"interval": "50ms",
175176
},
@@ -188,8 +189,14 @@ func TestBentoTrigger_StartStop(t *testing.T) {
188189
t.Fatalf("Start() error = %v", err)
189190
}
190191

191-
// Wait for messages to be processed
192-
time.Sleep(300 * time.Millisecond)
192+
// Poll until 2 callbacks are observed or deadline is reached.
193+
deadline := time.Now().Add(2 * time.Second)
194+
for time.Now().Before(deadline) {
195+
if len(cb.GetCalls()) >= 2 {
196+
break
197+
}
198+
time.Sleep(10 * time.Millisecond)
199+
}
193200

194201
stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
195202
defer cancel()
@@ -314,19 +321,32 @@ func TestBentoTrigger_NoSubscriptions(t *testing.T) {
314321
}
315322

316323
func TestBentoTrigger_CallbackError(t *testing.T) {
317-
// Callback that returns error
318-
var callbackErr error
324+
// Callback that errors on the first invocation and succeeds on subsequent
325+
// ones. This exercises the error path while still allowing the stream to
326+
// terminate cleanly (Bento retries the NACKed message once, then continues).
327+
var (
328+
cbMu sync.Mutex
329+
cbCount int
330+
)
319331
errorCb := func(action string, data map[string]any) error {
320-
return callbackErr
332+
cbMu.Lock()
333+
cbCount++
334+
count := cbCount
335+
cbMu.Unlock()
336+
if count == 1 {
337+
return errors.New("first callback error")
338+
}
339+
return nil
321340
}
322341

323342
trigger, err := newBentoTrigger(map[string]any{
324343
"subscriptions": []any{
325344
map[string]any{
326345
"input": map[string]any{
327346
"generate": map[string]any{
328-
"mapping": `root = {"test": "data"}`,
329-
"count": 1,
347+
"mapping": `root = {"test": "data"}`,
348+
"count": 3,
349+
"interval": "10ms",
330350
},
331351
},
332352
"workflow": "test",
@@ -337,22 +357,37 @@ func TestBentoTrigger_CallbackError(t *testing.T) {
337357
t.Fatalf("newBentoTrigger() error = %v", err)
338358
}
339359

340-
callbackErr = nil // First call succeeds
341-
342360
ctx := context.Background()
343361
if err := trigger.Start(ctx); err != nil {
344362
t.Fatalf("Start() error = %v", err)
345363
}
346364

347-
time.Sleep(100 * time.Millisecond)
365+
// Poll until at least 2 callbacks are observed (1 error + 1 success).
366+
deadline := time.Now().Add(2 * time.Second)
367+
for time.Now().Before(deadline) {
368+
cbMu.Lock()
369+
count := cbCount
370+
cbMu.Unlock()
371+
if count >= 2 {
372+
break
373+
}
374+
time.Sleep(10 * time.Millisecond)
375+
}
348376

349-
stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
377+
stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
350378
defer cancel()
351379

352-
// Should stop cleanly even if callback had errors
380+
// Should stop cleanly even when a callback returned an error.
353381
if err := trigger.Stop(stopCtx); err != nil {
354382
t.Errorf("Stop() error = %v", err)
355383
}
384+
385+
cbMu.Lock()
386+
count := cbCount
387+
cbMu.Unlock()
388+
if count == 0 {
389+
t.Error("expected callback to be invoked at least once")
390+
}
356391
}
357392

358393
func TestBentoTrigger_StopWithoutStart(t *testing.T) {

0 commit comments

Comments
 (0)