Skip to content

Commit 39cbce8

Browse files
committed
fix(audit): close logChan with RWMutex coordination to eliminate shutdown race
- Replace shutdownCh with closing logChan to signal the worker, so range-based drain is guaranteed to see all accepted events - Guard Log() sends with sendMu.RLock; Shutdown() takes sendMu.Lock after setting stopped, ensuring all in-flight sends complete before the channel is closed - Worker uses ok-check on receive to detect closed channel and flush
1 parent 5f99f6e commit 39cbce8

2 files changed

Lines changed: 26 additions & 28 deletions

File tree

internal/services/audit.go

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ type AuditService struct {
8383
batchTicker *time.Ticker
8484

8585
// Graceful shutdown
86-
wg sync.WaitGroup
87-
shutdownCh chan struct{}
88-
stopped atomic.Bool
86+
wg sync.WaitGroup
87+
sendMu sync.RWMutex // coordinates Log() senders with Shutdown()
88+
stopped atomic.Bool
8989

9090
// Prometheus counter for dropped events
9191
eventsDropped prometheus.Counter
@@ -102,7 +102,6 @@ func NewAuditService(s core.Store, bufferSize int) *AuditService {
102102
bufferSize: bufferSize,
103103
logChan: make(chan *models.AuditLog, bufferSize),
104104
batchBuffer: make([]*models.AuditLog, 0, 100),
105-
shutdownCh: make(chan struct{}),
106105
eventsDropped: getAuditEventsDroppedCounter(),
107106
}
108107

@@ -114,33 +113,25 @@ func NewAuditService(s core.Store, bufferSize int) *AuditService {
114113
return service
115114
}
116115

117-
// worker is the background goroutine that processes audit logs
116+
// worker is the background goroutine that processes audit logs.
117+
// It drains logChan until the channel is closed by Shutdown, then
118+
// flushes any remaining batch and exits.
118119
func (s *AuditService) worker() {
119120
defer s.wg.Done()
120121

121122
for {
122123
select {
123-
case log := <-s.logChan:
124-
s.addToBatch(log)
124+
case entry, ok := <-s.logChan:
125+
if !ok {
126+
// Channel closed by Shutdown — flush remaining batch.
127+
s.flushBatch()
128+
return
129+
}
130+
s.addToBatch(entry)
125131

126132
case <-s.batchTicker.C:
127133
// Flush batch every second
128134
s.flushBatch()
129-
130-
case <-s.shutdownCh:
131-
// Drain all queued entries that were accepted before shutdown
132-
// completed. Use a non-blocking receive loop rather than a
133-
// len() snapshot so entries enqueued concurrently after the
134-
// snapshot are still flushed.
135-
for {
136-
select {
137-
case entry := <-s.logChan:
138-
s.addToBatch(entry)
139-
default:
140-
s.flushBatch()
141-
return
142-
}
143-
}
144135
}
145136
}
146137
}
@@ -246,7 +237,12 @@ func (s *AuditService) buildAuditLog(
246237

247238
// Log records an audit log entry asynchronously.
248239
// Events submitted after Shutdown has been called are dropped.
240+
// The RWMutex ensures all in-flight sends complete before Shutdown
241+
// closes logChan, eliminating the send-on-closed-channel race.
249242
func (s *AuditService) Log(ctx context.Context, entry core.AuditLogEntry) {
243+
s.sendMu.RLock()
244+
defer s.sendMu.RUnlock()
245+
250246
if s.stopped.Load() {
251247
log.Printf("WARNING: Audit service stopped, dropping event: %s", entry.Action)
252248
s.eventsDropped.Inc()
@@ -287,16 +283,19 @@ func (s *AuditService) GetAuditLogStats(startTime, endTime time.Time) (store.Aud
287283

288284
// Shutdown gracefully shuts down the audit service
289285
func (s *AuditService) Shutdown(ctx context.Context) error {
290-
// Reject new events before draining the channel so nothing is
291-
// enqueued after the worker exits.
286+
// 1. Reject new events so future Log() calls return immediately.
292287
s.stopped.Store(true)
293288

289+
// 2. Wait for all in-flight Log() calls to finish, then close
290+
// logChan. The exclusive lock ensures no sender is mid-send
291+
// when the channel is closed.
292+
s.sendMu.Lock()
293+
close(s.logChan)
294+
s.sendMu.Unlock()
295+
294296
// Stop ticker
295297
s.batchTicker.Stop()
296298

297-
// Signal worker to stop
298-
close(s.shutdownCh)
299-
300299
// Wait for worker to finish with timeout
301300
done := make(chan struct{})
302301
go func() {

internal/services/audit_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,6 @@ func TestShutdown_DrainsLogChan(t *testing.T) {
158158
bufferSize: 100,
159159
logChan: make(chan *models.AuditLog, 100),
160160
batchBuffer: make([]*models.AuditLog, 0, 100),
161-
shutdownCh: make(chan struct{}),
162161
eventsDropped: getAuditEventsDroppedCounter(),
163162
}
164163

0 commit comments

Comments
 (0)