From 87f60d847d37c6dbbb644fd73dc494e07cb74780 Mon Sep 17 00:00:00 2001 From: Johan Lindh Date: Thu, 30 Apr 2026 13:59:00 +0200 Subject: [PATCH] fix(runner): track fs event watcher shutdown --- pkg/runner/runner.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 17cb068..0988d24 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -867,7 +867,8 @@ func (edm *dnstapMinimiser) getNumIgnoredClientCIDRs() uint64 { return edm.ignoredClientCIDRsParsed } -func (edm *dnstapMinimiser) fsEventWatcher() { +func (edm *dnstapMinimiser) fsEventWatcher(wg *sync.WaitGroup) { + defer wg.Done() // Like in // https://github.com/fsnotify/fsnotify/blob/main/cmd/fsnotify/dedup.go // we keep a timer per registered filename @@ -1126,7 +1127,15 @@ func Run(logger *slog.Logger, loggerLevel *slog.LevelVar) { os.Exit(1) } defer edm.stop() - defer edm.fsWatcher.Close() + defer func() { + // Safety net: if Run() exits early before the explicit + // fsWatcher.Close() below wg.Wait(), close it here. + if edm.fsWatcher != nil { + if err := edm.fsWatcher.Close(); err != nil { + edm.log.Error("Run: deferred fsWatcher.Close error", "error", err) + } + } + }() // Create startConf for some initial setup. Other edm methods that need // to read the config should call edm.getConfig() internally so they @@ -1209,8 +1218,6 @@ func Run(logger *slog.Logger, loggerLevel *slog.LevelVar) { os.Exit(1) } - go edm.fsEventWatcher() - // Setup the dnstap.Input, only one at a time is supported. var dti *dnstap.FrameStreamSockInput if startConf.InputUnix != "" { @@ -1303,6 +1310,12 @@ func Run(logger *slog.Logger, loggerLevel *slog.LevelVar) { var wg sync.WaitGroup + // Track the fsEventWatcher goroutine so it can be properly synchronized + // during shutdown. It exits when fsWatcher.Close() is called (which is + // deferred in Run()). + wg.Add(1) + go edm.fsEventWatcher(&wg) + // Write histogram file to an outbox dir where it will get picked up by // the histogram sender. Upon being sent it will be moved to the sent dir. dataDir := startConf.DataDir @@ -1411,9 +1424,16 @@ func Run(logger *slog.Logger, loggerLevel *slog.LevelVar) { edm.autopahoCancel() } + // Close fsWatcher to signal fsEventWatcher to exit. Must happen before + // wg.Wait() since fsEventWatcher is now tracked in wg. + if err := edm.fsWatcher.Close(); err != nil { + edm.log.Error("Run: fsWatcher.Close error", "error", err) + } + // Wait for all workers to exit edm.log.Info("Run: waiting for other workers to exit") wg.Wait() + edm.fsWatcher = nil // Wait for graceful disconnection from MQTT bus if !startConf.DisableMQTT {