Skip to content

Commit d59f3d1

Browse files
Copilotintel352
andauthored
fix: address goroutine leak, silent errors, and missing status transition tests (#12)
* Initial plan * fix: address PR review feedback on output/broker modules and add status transition tests Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
1 parent 8007ca4 commit d59f3d1

4 files changed

Lines changed: 160 additions & 17 deletions

File tree

go.sum

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
207207
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
208208
github.com/golobby/cast v1.3.3 h1:s2Lawb9RMz7YyYf8IrfMQY4IFmA1R/lgfmj97Vc6fig=
209209
github.com/golobby/cast v1.3.3/go.mod h1:0oDO5IT84HTXcbLDf1YXuk0xtg/cRDrxhbpWKxwtJCY=
210+
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
210211
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
211212
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
212213
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@@ -306,6 +307,8 @@ github.com/jhump/protoreflect v1.16.0 h1:54fZg+49widqXYQ0b+usAFHbMkBGR4PpXrsHc8+
306307
github.com/jhump/protoreflect v1.16.0/go.mod h1:oYPd7nPvcBw/5wlDfm/AVmU9zH9BgqGCI469pGxfj/8=
307308
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
308309
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
310+
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
311+
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
309312
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
310313
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
311314
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
@@ -321,6 +324,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
321324
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
322325
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
323326
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
327+
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165/go.mod h1:WZxr2/6a/Ar9bMDc2rN/LJrE/hF6bXE4LPyDSIxwAfg=
324328
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
325329
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
326330
github.com/linkedin/goavro/v2 v2.12.0 h1:rIQQSj8jdAUlKQh6DttK8wCRv4t4QO09g1C4aBWXslg=
@@ -377,6 +381,8 @@ github.com/oklog/run v1.2.0 h1:O8x3yXwah4A73hJdlrwo/2X6J62gE5qTMusH0dvz60E=
377381
github.com/oklog/run v1.2.0/go.mod h1:mgDbKRSwPhJfesJ4PntqFUbKQRZ50NgmZTSPlFA0YFk=
378382
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
379383
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
384+
github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
385+
github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
380386
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
381387
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
382388
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=
@@ -449,6 +455,10 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
449455
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
450456
github.com/tilinna/z85 v1.0.0 h1:uqFnJBlD01dosSeo5sK1G1YGbPuwqVHqR+12OJDRjUw=
451457
github.com/tilinna/z85 v1.0.0/go.mod h1:EfpFU/DUY4ddEy6CRvk2l+UQNEzHbh+bqBQS+04Nkxs=
458+
github.com/trivago/grok v1.0.0 h1:oV2ljyZT63tgXkmgEHg2U0jMqiKKuL0hkn49s6aRavQ=
459+
github.com/trivago/grok v1.0.0/go.mod h1:9t59xLInhrncYq9a3J7488NgiBZi5y5yC7bss+w4NHM=
460+
github.com/trivago/tgo v1.0.7 h1:uaWH/XIy9aWYWpjm2CU3RpcqZXmX2ysQ9/Go+d9gyrM=
461+
github.com/trivago/tgo v1.0.7/go.mod h1:w4dpD+3tzNIIiIfkWWa85w5/B77tlvdZckQ+6PkFnhc=
452462
github.com/urfave/cli/v2 v2.27.1 h1:8xSQ6szndafKVRmfyeUMxkNUJQMjL1F2zmsZ+qHpfho=
453463
github.com/urfave/cli/v2 v2.27.1/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
454464
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
@@ -596,6 +606,9 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV
596606
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
597607
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
598608
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
609+
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
610+
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
611+
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
599612
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
600613
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
601614
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/broker_module.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,14 @@ func (m *brokerModule) Stop(ctx context.Context) error {
7272

7373
var firstErr error
7474
for topic, stream := range m.streams {
75-
if err := stream.Stop(ctx); err != nil && firstErr == nil {
75+
if err := stream.Stop(ctx); err != nil {
7676
slog.Error("failed to stop broker stream", "error", err, "module", m.name, "topic", topic)
77-
firstErr = fmt.Errorf("bento.broker %q: stop stream for topic %q: %w", m.name, topic, err)
78-
} else {
79-
slog.Info("broker stream stopped", "module", m.name, "topic", topic)
77+
if firstErr == nil {
78+
firstErr = fmt.Errorf("bento.broker %q: stop stream for topic %q: %w", m.name, topic, err)
79+
}
80+
continue
8081
}
82+
slog.Info("broker stream stopped", "module", m.name, "topic", topic)
8183
}
8284
m.streams = make(map[string]*service.Stream)
8385
return firstErr

internal/output_module.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -101,19 +101,10 @@ func (m *outputModule) Start(ctx context.Context) error {
101101
m.cancel = cancel
102102

103103
moduleName := m.name
104-
105-
go func() {
106-
defer close(m.done)
107-
if err := stream.Run(runCtx); err != nil && runCtx.Err() == nil {
108-
slog.Error("bento output stream failed", "error", err, "module", moduleName)
109-
}
110-
}()
111-
112-
slog.Info("bento output running", "module", m.name)
113-
114-
// Subscribe to the host EventBus topic. When messages arrive, forward them
115-
// to the Bento producer.
116104
producerFnRef := m.producerFn
105+
106+
// Subscribe before launching the goroutine so that if Subscribe fails we
107+
// have not yet started the stream and there is no goroutine to clean up.
117108
if err := m.subscriber.Subscribe(m.sourceTopic, func(payload []byte, metadata map[string]string) error {
118109
slog.Debug("sending message to bento output", "module", moduleName, "topic", m.sourceTopic, "size", len(payload))
119110

@@ -127,9 +118,19 @@ func (m *outputModule) Start(ctx context.Context) error {
127118
}
128119
return sendErr
129120
}); err != nil {
121+
cancel()
130122
return fmt.Errorf("bento.output %q: subscribe to topic %q: %w", m.name, m.sourceTopic, err)
131123
}
132124

125+
go func() {
126+
defer close(m.done)
127+
if err := stream.Run(runCtx); err != nil && runCtx.Err() == nil {
128+
slog.Error("bento output stream failed", "error", err, "module", moduleName)
129+
}
130+
}()
131+
132+
slog.Info("bento output running", "module", m.name)
133+
133134
return nil
134135
}
135136

@@ -138,7 +139,9 @@ func (m *outputModule) Stop(ctx context.Context) error {
138139
slog.Info("stopping bento output", "module", m.name)
139140

140141
if m.subscriber != nil {
141-
_ = m.subscriber.Unsubscribe(m.sourceTopic)
142+
if err := m.subscriber.Unsubscribe(m.sourceTopic); err != nil {
143+
slog.Error("error unsubscribing from source topic", "error", err, "module", m.name, "topic", m.sourceTopic)
144+
}
142145
}
143146
if m.stream != nil {
144147
if err := m.stream.Stop(ctx); err != nil {

internal/stream_module_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,3 +169,128 @@ func TestStreamModule_StopWithoutStart(t *testing.T) {
169169
t.Errorf("Stop() without Start: expected DeadlineExceeded, got %v", err)
170170
}
171171
}
172+
173+
func TestStreamModule_StatusTransitions(t *testing.T) {
174+
t.Run("initial status is stopped", func(t *testing.T) {
175+
m, err := newStreamModule("test-stream", map[string]any{
176+
"input": map[string]any{"generate": map[string]any{}},
177+
"output": map[string]any{"drop": map[string]any{}},
178+
})
179+
if err != nil {
180+
t.Fatalf("newStreamModule() error = %v", err)
181+
}
182+
if got := m.Status(); got != streamStopped {
183+
t.Errorf("initial status = %q, want %q", got, streamStopped)
184+
}
185+
})
186+
187+
t.Run("status is running after successful start", func(t *testing.T) {
188+
m, err := newStreamModule("test-stream", map[string]any{
189+
"input": map[string]any{
190+
"generate": map[string]any{
191+
"mapping": `root = {"test": "data"}`,
192+
"count": 0,
193+
"interval": "1s",
194+
},
195+
},
196+
"output": map[string]any{
197+
"drop": map[string]any{},
198+
},
199+
})
200+
if err != nil {
201+
t.Fatalf("newStreamModule() error = %v", err)
202+
}
203+
204+
if err := m.Init(); err != nil {
205+
t.Fatalf("Init() error = %v", err)
206+
}
207+
208+
ctx := context.Background()
209+
if err := m.Start(ctx); err != nil {
210+
t.Fatalf("Start() error = %v", err)
211+
}
212+
213+
if got := m.Status(); got != streamRunning {
214+
t.Errorf("status after Start = %q, want %q", got, streamRunning)
215+
}
216+
217+
// Allow goroutine to begin running before stopping.
218+
time.Sleep(50 * time.Millisecond)
219+
220+
stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
221+
defer cancel()
222+
_ = m.Stop(stopCtx)
223+
})
224+
225+
t.Run("status is stopped after successful stop", func(t *testing.T) {
226+
m, err := newStreamModule("test-stream", map[string]any{
227+
"input": map[string]any{
228+
"generate": map[string]any{
229+
"mapping": `root = {"test": "data"}`,
230+
"count": 0,
231+
"interval": "1s",
232+
},
233+
},
234+
"output": map[string]any{
235+
"drop": map[string]any{},
236+
},
237+
})
238+
if err != nil {
239+
t.Fatalf("newStreamModule() error = %v", err)
240+
}
241+
242+
if err := m.Init(); err != nil {
243+
t.Fatalf("Init() error = %v", err)
244+
}
245+
246+
ctx := context.Background()
247+
if err := m.Start(ctx); err != nil {
248+
t.Fatalf("Start() error = %v", err)
249+
}
250+
251+
// Allow goroutine to begin running before stopping.
252+
time.Sleep(50 * time.Millisecond)
253+
254+
stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
255+
defer cancel()
256+
257+
if err := m.Stop(stopCtx); err != nil {
258+
t.Fatalf("Stop() error = %v", err)
259+
}
260+
261+
if got := m.Status(); got != streamStopped {
262+
t.Errorf("status after Stop = %q, want %q", got, streamStopped)
263+
}
264+
})
265+
266+
t.Run("status is errored after failed start", func(t *testing.T) {
267+
m, err := newStreamModule("bad-stream", map[string]any{
268+
"input": map[string]any{
269+
"unknown_input_type": map[string]any{
270+
"invalid": "config",
271+
},
272+
},
273+
"output": map[string]any{
274+
"drop": map[string]any{},
275+
},
276+
})
277+
if err != nil {
278+
t.Fatalf("newStreamModule() error = %v", err)
279+
}
280+
281+
if err := m.Init(); err != nil {
282+
t.Fatalf("Init() error = %v", err)
283+
}
284+
285+
ctx := context.Background()
286+
if err := m.Start(ctx); err == nil {
287+
t.Error("Start() expected error for invalid config, got nil")
288+
_ = m.Stop(context.Background())
289+
return
290+
}
291+
292+
if got := m.Status(); got != streamErrored {
293+
t.Errorf("status after failed Start = %q, want %q", got, streamErrored)
294+
}
295+
})
296+
}

0 commit comments

Comments
 (0)