Skip to content

Commit 2e524af

Browse files
Copilotintel352
andauthored
fix: observability lifecycle correctness — health on unexpected exit, uptime semantics, subscribe ordering (#13)
* Initial plan * fix: apply review feedback - lifecycle hooks, health on unexpected exit, subscribe ordering Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
1 parent 8695bee commit 2e524af

6 files changed

Lines changed: 95 additions & 33 deletions

File tree

internal/broker_module.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ func (m *brokerModule) Init() error {
6868
// Start is a no-op; individual per-topic streams are created on demand.
6969
func (m *brokerModule) Start(_ context.Context) error {
7070
m.health.SetRunning(true)
71+
m.metrics.MarkStarted()
7172
m.log.LogStreamStart(m.transport)
7273
return nil
7374
}
@@ -88,6 +89,7 @@ func (m *brokerModule) Stop(ctx context.Context) error {
8889
m.streams = make(map[string]*service.Stream)
8990

9091
m.health.SetRunning(false)
92+
m.metrics.MarkStopped()
9193
snap := m.metrics.Snapshot()
9294
m.log.LogStreamStop(snap.MessagesIn+snap.MessagesOut,
9395
slog.String("transport", m.transport),
@@ -174,9 +176,19 @@ func (m *brokerModule) ensureStream(ctx context.Context, topic string) (*service
174176
)
175177

176178
go func() {
177-
if runErr := stream.Run(ctx); runErr != nil && ctx.Err() == nil {
178-
metrics.RecordError()
179-
log.LogStreamError(runErr, slog.String("topic", topic))
179+
if runErr := stream.Run(ctx); ctx.Err() == nil {
180+
// Stream exited without context cancellation; remove it from the
181+
// active streams map so it can be recreated on next access.
182+
m.mu.Lock()
183+
delete(m.streams, topic)
184+
m.mu.Unlock()
185+
if runErr != nil {
186+
metrics.RecordError()
187+
log.LogStreamError(runErr, slog.String("topic", topic))
188+
}
189+
log.LogTopicEvent("stream_stopped", topic,
190+
slog.String("reason", "run_exited"),
191+
)
180192
}
181193
}()
182194

internal/input_module.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,16 +136,20 @@ func (m *inputModule) Start(ctx context.Context) error {
136136
m.cancel = cancel
137137

138138
m.health.SetRunning(true)
139+
m.metrics.MarkStarted()
139140
m.log.LogStreamStart("bento.input",
140141
slog.String("target_topic", m.targetTopic),
141142
slog.String("target_broker", m.targetBroker),
142143
)
143144

144145
go func() {
145146
defer close(m.done)
146-
if runErr := stream.Run(runCtx); runErr != nil && runCtx.Err() == nil {
147-
m.metrics.RecordError()
148-
m.log.LogStreamError(runErr)
147+
if runErr := stream.Run(runCtx); runCtx.Err() == nil {
148+
m.health.SetRunning(false)
149+
if runErr != nil {
150+
m.metrics.RecordError()
151+
m.log.LogStreamError(runErr)
152+
}
149153
}
150154
}()
151155

@@ -171,6 +175,7 @@ func (m *inputModule) Stop(ctx context.Context) error {
171175
}
172176

173177
m.health.SetRunning(false)
178+
m.metrics.MarkStopped()
174179
snap := m.metrics.Snapshot()
175180
m.log.LogStreamStop(snap.MessagesOut,
176181
slog.String("target_topic", m.targetTopic),

internal/logger_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ func TestMetricsRecordError(t *testing.T) {
189189

190190
func TestMetricsUptime(t *testing.T) {
191191
m := newStreamMetrics()
192+
m.MarkStarted()
192193
time.Sleep(5 * time.Millisecond)
193194
uptime := m.Uptime()
194195

@@ -215,6 +216,7 @@ func TestMetricsLastMessageTime(t *testing.T) {
215216

216217
func TestMetricsSnapshot(t *testing.T) {
217218
m := newStreamMetrics()
219+
m.MarkStarted()
218220
m.RecordMessageIn()
219221
m.RecordMessageIn()
220222
m.RecordMessageOut()

internal/metrics.go

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,42 @@ import (
88

99
// StreamMetrics tracks runtime statistics for a single stream or module.
1010
// All counter operations are thread-safe via atomic primitives; the
11-
// startTime/lastMessageTime fields are guarded by a small mutex.
11+
// startTime/stopTime/lastMessageTime fields are guarded by a small mutex.
1212
type StreamMetrics struct {
1313
messagesIn atomic.Int64
1414
messagesOut atomic.Int64
1515
errors atomic.Int64
1616

1717
mu sync.Mutex
1818
startTime time.Time
19+
stopTime time.Time
1920
lastMessageTime time.Time
2021
}
2122

22-
// newStreamMetrics creates a StreamMetrics instance with the start time set
23-
// to the current wall clock.
23+
// newStreamMetrics creates a new StreamMetrics instance. Call MarkStarted
24+
// when the associated module begins running.
2425
func newStreamMetrics() *StreamMetrics {
25-
m := &StreamMetrics{}
26+
return &StreamMetrics{}
27+
}
28+
29+
// MarkStarted records the moment the stream begins running, resetting any
30+
// previously frozen stop time. Call this from module Start().
31+
func (m *StreamMetrics) MarkStarted() {
32+
m.mu.Lock()
2633
m.startTime = time.Now()
27-
return m
34+
m.stopTime = time.Time{}
35+
m.mu.Unlock()
36+
}
37+
38+
// MarkStopped freezes the uptime at the current wall clock. After this call
39+
// Uptime() returns a stable value rather than continuing to grow. Call this
40+
// from module Stop().
41+
func (m *StreamMetrics) MarkStopped() {
42+
m.mu.Lock()
43+
if !m.startTime.IsZero() {
44+
m.stopTime = time.Now()
45+
}
46+
m.mu.Unlock()
2847
}
2948

3049
// RecordMessageIn increments the inbound message counter and updates
@@ -65,13 +84,18 @@ func (m *StreamMetrics) Errors() int64 {
6584
return m.errors.Load()
6685
}
6786

68-
// Uptime returns how long the stream has been running since Start was called.
87+
// Uptime returns how long the stream was running. If MarkStopped has been
88+
// called the duration is frozen; otherwise it grows from MarkStarted.
89+
// Returns 0 if MarkStarted has not been called.
6990
func (m *StreamMetrics) Uptime() time.Duration {
7091
m.mu.Lock()
7192
defer m.mu.Unlock()
7293
if m.startTime.IsZero() {
7394
return 0
7495
}
96+
if !m.stopTime.IsZero() {
97+
return m.stopTime.Sub(m.startTime)
98+
}
7599
return time.Since(m.startTime)
76100
}
77101

@@ -98,7 +122,11 @@ func (m *StreamMetrics) Snapshot() MetricsSnapshot {
98122
m.mu.Lock()
99123
uptime := time.Duration(0)
100124
if !m.startTime.IsZero() {
101-
uptime = time.Since(m.startTime)
125+
if !m.stopTime.IsZero() {
126+
uptime = m.stopTime.Sub(m.startTime)
127+
} else {
128+
uptime = time.Since(m.startTime)
129+
}
102130
}
103131
lastMsg := m.lastMessageTime
104132
m.mu.Unlock()

internal/output_module.go

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -105,22 +105,9 @@ func (m *outputModule) Start(ctx context.Context) error {
105105
runCtx, cancel := context.WithCancel(context.Background())
106106
m.cancel = cancel
107107

108-
m.health.SetRunning(true)
109-
m.log.LogStreamStart("bento.output",
110-
slog.String("source_topic", m.sourceTopic),
111-
slog.String("source_broker", m.sourceBroker),
112-
)
113-
114-
go func() {
115-
defer close(m.done)
116-
if runErr := stream.Run(runCtx); runErr != nil && runCtx.Err() == nil {
117-
m.metrics.RecordError()
118-
m.log.LogStreamError(runErr)
119-
}
120-
}()
121-
122-
// Subscribe to the host EventBus topic. When messages arrive, forward them
123-
// to the Bento producer.
108+
// Subscribe to the host EventBus topic before starting the stream. When
109+
// messages arrive, forward them to the Bento producer. Subscribing first
110+
// avoids leaking the stream goroutine if Subscribe returns an error.
124111
producerFnRef := m.producerFn
125112
metrics := m.metrics
126113
log := m.log
@@ -140,9 +127,29 @@ func (m *outputModule) Start(ctx context.Context) error {
140127
log.LogMessageProcessed(sourceTopic)
141128
return nil
142129
}); err != nil {
130+
// Cancel the context since the stream goroutine was never launched.
131+
cancel()
143132
return fmt.Errorf("bento.output %q: subscribe to topic %q: %w", m.name, m.sourceTopic, err)
144133
}
145134

135+
m.health.SetRunning(true)
136+
m.metrics.MarkStarted()
137+
m.log.LogStreamStart("bento.output",
138+
slog.String("source_topic", m.sourceTopic),
139+
slog.String("source_broker", m.sourceBroker),
140+
)
141+
142+
go func() {
143+
defer close(m.done)
144+
if runErr := stream.Run(runCtx); runCtx.Err() == nil {
145+
m.health.SetRunning(false)
146+
if runErr != nil {
147+
m.metrics.RecordError()
148+
m.log.LogStreamError(runErr)
149+
}
150+
}
151+
}()
152+
146153
return nil
147154
}
148155

@@ -168,6 +175,7 @@ func (m *outputModule) Stop(ctx context.Context) error {
168175
}
169176

170177
m.health.SetRunning(false)
178+
m.metrics.MarkStopped()
171179
snap := m.metrics.Snapshot()
172180
m.log.LogStreamStop(snap.MessagesIn,
173181
slog.String("source_topic", m.sourceTopic),

internal/stream_module.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,20 @@ func (m *streamModule) Start(ctx context.Context) error {
6262
m.cancel = cancel
6363

6464
m.health.SetRunning(true)
65+
m.metrics.MarkStarted()
6566

6667
m.log.LogStreamStart("bento",
6768
slog.Int("config_keys", len(m.config)),
6869
)
6970

7071
go func() {
7172
defer close(m.done)
72-
if runErr := stream.Run(runCtx); runErr != nil && runCtx.Err() == nil {
73-
m.metrics.RecordError()
74-
m.log.LogStreamError(runErr)
73+
if runErr := stream.Run(runCtx); runCtx.Err() == nil {
74+
m.health.SetRunning(false)
75+
if runErr != nil {
76+
m.metrics.RecordError()
77+
m.log.LogStreamError(runErr)
78+
}
7579
}
7680
}()
7781

@@ -97,9 +101,12 @@ func (m *streamModule) Stop(ctx context.Context) error {
97101
}
98102

99103
m.health.SetRunning(false)
104+
m.metrics.MarkStopped()
100105
snap := m.metrics.Snapshot()
101-
m.log.LogStreamStop(snap.Errors,
106+
// Stream modules don't intercept individual messages, so messages_processed is 0.
107+
m.log.LogStreamStop(0,
102108
slog.Duration("uptime", snap.Uptime),
109+
slog.Int64("errors", snap.Errors),
103110
)
104111

105112
return nil

0 commit comments

Comments
 (0)