Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,8 @@ func (edm *dnstapMinimiser) getNumIgnoredClientCIDRs() uint64 {
return edm.ignoredClientCIDRsParsed
}

func (edm *dnstapMinimiser) fsEventWatcher() {
func (edm *dnstapMinimiser) fsEventWatcher(wg *sync.WaitGroup) {
defer wg.Done()
// Like in
// https://github.com/fsnotify/fsnotify/blob/main/cmd/fsnotify/dedup.go
// we keep a timer per registered filename
Expand Down Expand Up @@ -1126,7 +1127,15 @@ func Run(logger *slog.Logger, loggerLevel *slog.LevelVar) {
os.Exit(1)
}
defer edm.stop()
defer edm.fsWatcher.Close()
defer func() {
// Safety net: if Run() exits early before the explicit
// fsWatcher.Close() below wg.Wait(), close it here.
if edm.fsWatcher != nil {
if err := edm.fsWatcher.Close(); err != nil {
edm.log.Error("Run: deferred fsWatcher.Close error", "error", err)
}
}
}()

// Create startConf for some initial setup. Other edm methods that need
// to read the config should call edm.getConfig() internally so they
Expand Down Expand Up @@ -1209,8 +1218,6 @@ func Run(logger *slog.Logger, loggerLevel *slog.LevelVar) {
os.Exit(1)
}

go edm.fsEventWatcher()

// Setup the dnstap.Input, only one at a time is supported.
var dti *dnstap.FrameStreamSockInput
if startConf.InputUnix != "" {
Expand Down Expand Up @@ -1303,6 +1310,12 @@ func Run(logger *slog.Logger, loggerLevel *slog.LevelVar) {

var wg sync.WaitGroup

// Track the fsEventWatcher goroutine so it can be properly synchronized
// during shutdown. It exits when fsWatcher.Close() is called (which is
// deferred in Run()).
wg.Add(1)
go edm.fsEventWatcher(&wg)

// Write histogram file to an outbox dir where it will get picked up by
// the histogram sender. Upon being sent it will be moved to the sent dir.
dataDir := startConf.DataDir
Expand Down Expand Up @@ -1411,9 +1424,16 @@ func Run(logger *slog.Logger, loggerLevel *slog.LevelVar) {
edm.autopahoCancel()
}

// Close fsWatcher to signal fsEventWatcher to exit. Must happen before
// wg.Wait() since fsEventWatcher is now tracked in wg.
if err := edm.fsWatcher.Close(); err != nil {
edm.log.Error("Run: fsWatcher.Close error", "error", err)
}

// Wait for all workers to exit
edm.log.Info("Run: waiting for other workers to exit")
wg.Wait()
edm.fsWatcher = nil

// Wait for graceful disconnection from MQTT bus
if !startConf.DisableMQTT {
Expand Down