@@ -21,6 +21,9 @@ type brokerModule struct {
2121 subscriber sdk.MessageSubscriber
2222 streams map [string ]* service.Stream
2323 mu sync.RWMutex
24+ log * bentoLogger
25+ metrics * StreamMetrics
26+ health * healthTracker
2427}
2528
2629// SetMessagePublisher satisfies the MessageAwareModule interface.
@@ -34,10 +37,14 @@ func (m *brokerModule) SetMessageSubscriber(sub sdk.MessageSubscriber) {
3437}
3538
3639func newBrokerModule (name string , config map [string ]any ) (* brokerModule , error ) {
40+ metrics := newStreamMetrics ()
3741 return & brokerModule {
3842 name : name ,
3943 config : config ,
4044 streams : make (map [string ]* service.Stream ),
45+ log : newLogger ("bento.broker" , name ),
46+ metrics : metrics ,
47+ health : newHealthTracker (metrics ),
4148 }, nil
4249}
4350
@@ -60,35 +67,50 @@ func (m *brokerModule) Init() error {
6067
6168// Start is a no-op; individual per-topic streams are created on demand.
6269func (m * brokerModule ) Start (_ context.Context ) error {
70+ m .health .SetRunning (true )
71+ m .metrics .MarkStarted ()
72+ m .log .LogStreamStart (m .transport )
6373 return nil
6474}
6575
6676// Stop shuts down all managed streams.
6777func (m * brokerModule ) Stop (ctx context.Context ) error {
78+ // Copy the streams map under the lock, then release it before calling
79+ // stream.Stop to avoid holding the lock during potentially blocking I/O
80+ // (deadlock risk if a stream goroutine also acquires the lock).
6881 m .mu .Lock ()
69- defer m .mu .Unlock ()
70-
71- slog .Info ("stopping bento broker" , "module" , m .name , "topics" , len (m .streams ))
82+ toStop := make (map [string ]* service.Stream , len (m .streams ))
83+ for topic , s := range m .streams {
84+ toStop [topic ] = s
85+ }
86+ m .streams = make (map [string ]* service.Stream )
87+ m .mu .Unlock ()
7288
7389 var firstErr error
74- for topic , stream := range m .streams {
75- if err := stream .Stop (ctx ); err != nil {
76- slog .Error ("failed to stop broker stream" , "error" , err , "module" , m .name , "topic" , topic )
77- if firstErr == nil {
78- firstErr = fmt .Errorf ("bento.broker %q: stop stream for topic %q: %w" , m .name , topic , err )
79- }
80- continue
90+ for topic , stream := range toStop {
91+ if err := stream .Stop (ctx ); err != nil && firstErr == nil {
92+ m .metrics .RecordError ()
93+ m .log .LogStreamError (err , slog .String ("topic" , topic ))
94+ firstErr = fmt .Errorf ("bento.broker %q: stop stream for topic %q: %w" , m .name , topic , err )
8195 }
82- slog .Info ("broker stream stopped" , "module" , m .name , "topic" , topic )
8396 }
84- m .streams = make (map [string ]* service.Stream )
97+
98+ m .health .SetRunning (false )
99+ m .metrics .MarkStopped ()
100+ snap := m .metrics .Snapshot ()
101+ m .log .LogStreamStop (snap .MessagesIn + snap .MessagesOut ,
102+ slog .String ("transport" , m .transport ),
103+ slog .Duration ("uptime" , snap .Uptime ),
104+ slog .Int64 ("errors" , snap .Errors ),
105+ )
106+
85107 return firstErr
86108}
87109
88110// ensureStream returns (creating if necessary) a running stream for topic.
89111// This is used internally when the broker needs a dedicated in-process pipe.
90112//
91- //nolint:unused // Reserved for future on-demand topic routing implementation .
113+ //nolint:unused // Reserved for future use by broker consumers .
92114func (m * brokerModule ) ensureStream (ctx context.Context , topic string ) (* service.Stream , error ) {
93115 m .mu .RLock ()
94116 if s , ok := m .streams [topic ]; ok {
@@ -105,8 +127,6 @@ func (m *brokerModule) ensureStream(ctx context.Context, topic string) (*service
105127 return s , nil
106128 }
107129
108- slog .Info ("creating broker stream" , "module" , m .name , "topic" , topic , "transport" , m .transport )
109-
110130 // Build a simple in-memory stream that holds messages for this topic.
111131 // The actual transport is configured via transportConfig / transport.
112132 builder := service .NewStreamBuilder ()
@@ -123,28 +143,31 @@ func (m *brokerModule) ensureStream(ctx context.Context, topic string) (*service
123143 }
124144
125145 pub := m .publisher
126- moduleName := m .name
146+ metrics := m .metrics
147+ log := m .log
127148
128149 if pub != nil {
129150 if err := builder .AddConsumerFunc (func (_ context.Context , msg * service.Message ) error {
130151 payload , msgErr := msg .AsBytes ()
131152 if msgErr != nil {
132- slog .Error ("failed to read broker message" , "error" , msgErr , "module" , moduleName , "topic" , topic )
153+ metrics .RecordError ()
154+ log .LogStreamError (msgErr , slog .String ("topic" , topic ))
133155 return msgErr
134156 }
135157 meta := map [string ]string {}
136158 _ = msg .MetaWalkMut (func (k string , v any ) error {
137159 meta [k ] = fmt .Sprintf ("%v" , v )
138160 return nil
139161 })
140-
141- slog .Debug ("broker forwarding message" , "module" , moduleName , "topic" , topic , "size" , len (payload ))
142-
143162 _ , pubErr := pub .Publish (topic , payload , meta )
144163 if pubErr != nil {
145- slog .Error ("failed to publish from broker" , "error" , pubErr , "module" , moduleName , "topic" , topic )
164+ metrics .RecordError ()
165+ log .LogStreamError (pubErr , slog .String ("phase" , "publish" ), slog .String ("topic" , topic ))
166+ return pubErr
146167 }
147- return pubErr
168+ metrics .RecordMessageOut ()
169+ log .LogMessageProcessed (topic )
170+ return nil
148171 }); err != nil {
149172 return nil , fmt .Errorf ("add consumer for topic %q: %w" , topic , err )
150173 }
@@ -155,13 +178,32 @@ func (m *brokerModule) ensureStream(ctx context.Context, topic string) (*service
155178 return nil , fmt .Errorf ("build stream for topic %q: %w" , topic , err )
156179 }
157180
181+ m .log .LogTopicEvent ("stream_created" , topic ,
182+ slog .String ("transport" , m .transport ),
183+ )
184+
158185 go func () {
159- if err := stream .Run (ctx ); err != nil && ctx .Err () == nil {
160- slog .Error ("broker stream failed" , "error" , err , "module" , moduleName , "topic" , topic )
186+ if runErr := stream .Run (ctx ); ctx .Err () == nil {
187+ // Stream exited without context cancellation; remove it from the
188+ // active streams map so it can be recreated on next access.
189+ m .mu .Lock ()
190+ delete (m .streams , topic )
191+ m .mu .Unlock ()
192+ if runErr != nil {
193+ metrics .RecordError ()
194+ log .LogStreamError (runErr , slog .String ("topic" , topic ))
195+ }
196+ log .LogTopicEvent ("stream_stopped" , topic ,
197+ slog .String ("reason" , "run_exited" ),
198+ )
161199 }
162200 }()
163201
164202 m .streams [topic ] = stream
165- slog .Info ("broker stream created" , "module" , m .name , "topic" , topic )
166203 return stream , nil
167204}
205+
206+ // Health returns the current health report for this broker module.
207+ func (m * brokerModule ) Health () HealthReport {
208+ return m .health .Report ()
209+ }
0 commit comments