From b95407b310ecacf1c673b872fdf582a64155a4bc Mon Sep 17 00:00:00 2001 From: Johan Lindh Date: Thu, 30 Apr 2026 13:55:46 +0200 Subject: [PATCH] fix(runner): flush collector data on shutdown --- pkg/runner/data_collector_test.go | 118 ++++++++++++++++++++ pkg/runner/runner.go | 173 ++++++++++++++++++++---------- 2 files changed, 233 insertions(+), 58 deletions(-) create mode 100644 pkg/runner/data_collector_test.go diff --git a/pkg/runner/data_collector_test.go b/pkg/runner/data_collector_test.go new file mode 100644 index 0000000..3c483cc --- /dev/null +++ b/pkg/runner/data_collector_test.go @@ -0,0 +1,118 @@ +package runner + +import ( + "io" + "log/slog" + "net/netip" + "sync" + "testing" + "time" + + "github.com/miekg/dns" + "github.com/smhanov/dawg" + "github.com/spaolacci/murmur3" +) + +func TestDataCollectorFlushesPendingDataOnShutdown(t *testing.T) { + edm, wkdTracker := newDataCollectorTestFixture(t, "example.com.") + + var wg sync.WaitGroup + wg.Add(1) + go edm.dataCollector(&wg, wkdTracker, "unused-in-shutdown-test.dawg") + + serverID := "serverID" + edm.sessionCollectorCh <- &sessionData{ServerID: &serverID} + + msg := new(dns.Msg) + msg.SetQuestion("example.com.", dns.TypeA) + ip := netip.MustParseAddr("198.51.100.10") + dawgIndex, suffixMatch, dawgModTime := wkdTracker.lookup(msg) + wkdTracker.updateCh <- wkdUpdate{ + dawgIndex: dawgIndex, + suffixMatch: suffixMatch, + dawgModTime: dawgModTime, + histogramData: histogramData{ + ACount: 1, + OKCount: 1, + }, + hllHash: murmur3.Sum64(ip.AsSlice()), + ip: ip, + } + + close(wkdTracker.stop) + waitOrFail(t, &wg, 2*time.Second, "dataCollector did not exit after stop") + + ps, ok := <-edm.sessionWriterCh + if !ok { + t.Fatal("sessionWriterCh closed without flushing pending session data") + } + if len(ps.sessions) != 1 { + t.Fatalf("flushed sessions have: %d, want: 1", len(ps.sessions)) + } + if ps.startTime.IsZero() { + t.Fatal("flushed sessions should carry the collector interval start") + } + if ps.rotationTime.Before(ps.startTime) { + t.Fatalf("session interval is inverted: start=%s stop=%s", ps.startTime, ps.rotationTime) + } + + prevWKD, ok := <-edm.histogramWriterCh + if !ok { + t.Fatal("histogramWriterCh closed without flushing pending histogram data") + } + if len(prevWKD.m) != 1 { + t.Fatalf("flushed histogram domains have: %d, want: 1", len(prevWKD.m)) + } + got := prevWKD.m[dawgIndex] + if got == nil { + t.Fatalf("flushed histogram missing DAWG index %d", dawgIndex) + return + } + if got.ACount != 1 || got.OKCount != 1 { + t.Fatalf("flushed histogram counts have A=%d OK=%d, want A=1 OK=1", got.ACount, got.OKCount) + } + if prevWKD.startTime.IsZero() { + t.Fatal("flushed histogram should carry the collector interval start") + } + if prevWKD.rotationTime.Before(prevWKD.startTime) { + t.Fatalf("histogram interval is inverted: start=%s stop=%s", prevWKD.startTime, prevWKD.rotationTime) + } +} + +func newDataCollectorTestFixture(t *testing.T, knownDomains ...string) (*dnstapMinimiser, *wellKnownDomainsTracker) { + t.Helper() + + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + edm, err := newDnstapMinimiser(logger, defaultTC) + if err != nil { + t.Fatalf("newDnstapMinimiser: %s", err) + } + t.Cleanup(edm.stop) + + dBuilder := dawg.New() + for _, domain := range knownDomains { + dBuilder.Add(domain) + } + wkdTracker, err := newWellKnownDomainsTracker(dBuilder.Finish(), time.Unix(0, 0)) + if err != nil { + t.Fatalf("newWellKnownDomainsTracker: %s", err) + } + + return edm, wkdTracker +} + +func waitOrFail(t *testing.T, wg *sync.WaitGroup, timeout time.Duration, message string) { + t.Helper() + + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case <-done: + case <-time.After(timeout): + t.Fatal(message) + } +} diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 17cb068..508b41f 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -251,6 +251,7 @@ type sessionData struct { type prevSessions struct { sessions []*sessionData + startTime time.Time rotationTime time.Time } @@ -1634,6 +1635,7 @@ type wellKnownDomainsData struct { // Store a pointer to histogramData so we can assign to it without // "cannot assign to struct field in map" issues m map[int]*histogramData + startTime time.Time rotationTime time.Time dawgFinder dawg.Finder dawgIsRotated bool @@ -1783,7 +1785,7 @@ func (wkd *wellKnownDomainsTracker) sendUpdate(ipBytes []byte, msg *dns.Msg, daw wkd.updateCh <- wu } -func (wkd *wellKnownDomainsTracker) rotateTracker(edm *dnstapMinimiser, dawgFile string, rotationTime time.Time) (*wellKnownDomainsData, error) { +func (wkd *wellKnownDomainsTracker) rotateTracker(edm *dnstapMinimiser, dawgFile string, startTime time.Time, rotationTime time.Time) (*wellKnownDomainsData, error) { dawgFileChanged := false var dawgFinder dawg.Finder @@ -1807,6 +1809,8 @@ func (wkd *wellKnownDomainsTracker) rotateTracker(edm *dnstapMinimiser, dawgFile wkd.mutex.Lock() prevWKD.m = wkd.m prevWKD.dawgFinder = wkd.dawgFinder + prevWKD.startTime = startTime + prevWKD.rotationTime = rotationTime wkd.m = map[int]*histogramData{} if dawgFileChanged { wkd.dawgFinder = dawgFinder @@ -1815,8 +1819,6 @@ func (wkd *wellKnownDomainsTracker) rotateTracker(edm *dnstapMinimiser, dawgFile } wkd.mutex.Unlock() - prevWKD.rotationTime = rotationTime - return prevWKD, nil } @@ -2169,7 +2171,7 @@ func (edm *dnstapMinimiser) createSessionFile(ps *prevSessions, dataDir string) // Write session file to a sessions dir where it can be read by other tools sessionsDir := filepath.Join(dataDir, "parquet", "sessions") - startTime := getStartTimeFromRotationTime(ps.rotationTime) + startTime := intervalStartFromTimes(ps.startTime, ps.rotationTime) absoluteTmpFileName, absoluteFileName := buildParquetFilenames(sessionsDir, "dns_session_block", startTime, ps.rotationTime) @@ -2242,7 +2244,7 @@ func (edm *dnstapMinimiser) sessionWriter(dataDir string, wg *sync.WaitGroup) { } func (edm *dnstapMinimiser) createHistogramFile(prevWellKnownDomainsData *wellKnownDomainsData, labelLimit int, outboxDir string) (string, error) { - startTime := getStartTimeFromRotationTime(prevWellKnownDomainsData.rotationTime) + startTime := intervalStartFromTimes(prevWellKnownDomainsData.startTime, prevWellKnownDomainsData.rotationTime) absoluteTmpFileName, absoluteFileName := buildParquetFilenames(outboxDir, "dns_histogram", startTime, prevWellKnownDomainsData.rotationTime) @@ -2643,6 +2645,13 @@ func getStartTimeFromRotationTime(rotationTime time.Time) time.Time { return rotationTime.Add(-time.Second * 60) } +func intervalStartFromTimes(startTime time.Time, rotationTime time.Time) time.Time { + if !startTime.IsZero() { + return startTime + } + return getStartTimeFromRotationTime(rotationTime) +} + // Unfortunately the hll library does not expose what format // the HLL is being stored in so figure things out manually. // @@ -2908,6 +2917,8 @@ func (edm *dnstapMinimiser) dataCollector(wg *sync.WaitGroup, wkd *wellKnownDoma go wkd.updateRetryer(edm, &retryerWg) sessions := []*sessionData{} + sessionIntervalStart := time.Now().UTC() + histogramIntervalStart := sessionIntervalStart ticker := time.NewTicker(timeUntilNextMinute()) defer ticker.Stop() @@ -2918,71 +2929,111 @@ func (edm *dnstapMinimiser) dataCollector(wg *sync.WaitGroup, wkd *wellKnownDoma hllSettings := getHllDefaults(conf.HistogramHLLExplicitThreshold) -collectorLoop: - for { - select { - case sd := <-edm.sessionCollectorCh: - sessions = append(sessions, sd) - sessionUpdated = true + processSession := func(sd *sessionData) { + if sd == nil { + return + } + sessions = append(sessions, sd) + sessionUpdated = true + } - case wu := <-wkd.updateCh: - // It is possible an update sitting in the queue has - // been created with an outdated dawgModTime due to a - // call to rotateTracker(). If this is the case we need - // to do a new lookup against the new dawg to make sure - // we have the correct index number (or if it is even - // present in the new dawg). - if wu.dawgModTime != wkd.dawgModTime { - if !retryChannelClosed { - wkd.retryCh <- wu - } else { - edm.log.Info("discarding retry of wkd update because we are shutting down") - } - continue + flushSessions := func(startTime time.Time, rotationTime time.Time) { + if !sessionUpdated { + return + } + ps := &prevSessions{ + sessions: sessions, + startTime: startTime, + rotationTime: rotationTime, + } + sessions = []*sessionData{} + sessionUpdated = false + edm.sessionWriterCh <- ps + } + + processWKDUpdate := func(wu wkdUpdate) { + // It is possible an update sitting in the queue has + // been created with an outdated dawgModTime due to a + // call to rotateTracker(). If this is the case we need + // to do a new lookup against the new dawg to make sure + // we have the correct index number (or if it is even + // present in the new dawg). + if wu.dawgModTime != wkd.dawgModTime { + if !retryChannelClosed { + wkd.retryCh <- wu + } else { + edm.log.Info("discarding retry of wkd update because we are shutting down") } + return + } + + if _, exists := wkd.m[wu.dawgIndex]; !exists { + wkd.m[wu.dawgIndex] = edm.newHistogramData(hllSettings, wu.suffixMatch) + } + + wkd.m[wu.dawgIndex].OKCount += wu.OKCount + wkd.m[wu.dawgIndex].NXCount += wu.NXCount + wkd.m[wu.dawgIndex].FailCount += wu.FailCount + wkd.m[wu.dawgIndex].ACount += wu.ACount + wkd.m[wu.dawgIndex].AAAACount += wu.AAAACount + wkd.m[wu.dawgIndex].MXCount += wu.MXCount + wkd.m[wu.dawgIndex].NSCount += wu.NSCount + wkd.m[wu.dawgIndex].OtherTypeCount += wu.OtherTypeCount + wkd.m[wu.dawgIndex].OtherRcodeCount += wu.OtherRcodeCount + wkd.m[wu.dawgIndex].NonINCount += wu.NonINCount - if _, exists := wkd.m[wu.dawgIndex]; !exists { - wkd.m[wu.dawgIndex] = edm.newHistogramData(hllSettings, wu.suffixMatch) + if wu.ip.IsValid() { + if wu.ip.Unmap().Is4() { + wkd.m[wu.dawgIndex].v4ClientHLL.AddRaw(wu.hllHash) + } else { + wkd.m[wu.dawgIndex].v6ClientHLL.AddRaw(wu.hllHash) } + } + } - wkd.m[wu.dawgIndex].OKCount += wu.OKCount - wkd.m[wu.dawgIndex].NXCount += wu.NXCount - wkd.m[wu.dawgIndex].FailCount += wu.FailCount - wkd.m[wu.dawgIndex].ACount += wu.ACount - wkd.m[wu.dawgIndex].AAAACount += wu.AAAACount - wkd.m[wu.dawgIndex].MXCount += wu.MXCount - wkd.m[wu.dawgIndex].NSCount += wu.NSCount - wkd.m[wu.dawgIndex].OtherTypeCount += wu.OtherTypeCount - wkd.m[wu.dawgIndex].OtherRcodeCount += wu.OtherRcodeCount - wkd.m[wu.dawgIndex].NonINCount += wu.NonINCount - - if wu.ip.IsValid() { - if wu.ip.Unmap().Is4() { - wkd.m[wu.dawgIndex].v4ClientHLL.AddRaw(wu.hllHash) - } else { - wkd.m[wu.dawgIndex].v6ClientHLL.AddRaw(wu.hllHash) - } + drainCollectorQueues := func() { + for { + select { + case sd := <-edm.sessionCollectorCh: + processSession(sd) + case wu := <-wkd.updateCh: + processWKDUpdate(wu) + default: + return } + } + } - case ts := <-ticker.C: - // We want to tick at the start of each minute - ticker.Reset(timeUntilNextMinute()) + flushHistogram := func(startTime time.Time, rotationTime time.Time) { + if len(wkd.m) == 0 { + return + } + edm.histogramWriterCh <- &wellKnownDomainsData{ + m: wkd.m, + startTime: startTime, + rotationTime: rotationTime, + dawgFinder: wkd.dawgFinder, + } + wkd.m = map[int]*histogramData{} + } - if sessionUpdated { - ps := &prevSessions{ - sessions: sessions, - rotationTime: ts, - } +collectorLoop: + for { + select { + case sd := <-edm.sessionCollectorCh: + processSession(sd) - sessions = []*sessionData{} + case wu := <-wkd.updateCh: + processWKDUpdate(wu) - // We have reset the sessions slice - sessionUpdated = false + case ts := <-ticker.C: + // We want to tick at the start of each minute + ticker.Reset(timeUntilNextMinute()) - edm.sessionWriterCh <- ps - } + flushSessions(sessionIntervalStart, ts) + sessionIntervalStart = ts - prevWKD, err := wkd.rotateTracker(edm, dawgFile, ts) + prevWKD, err := wkd.rotateTracker(edm, dawgFile, histogramIntervalStart, ts) if err != nil { edm.log.Error("unable to rotate histogram map", "error", err) continue @@ -2992,6 +3043,7 @@ collectorLoop: if len(prevWKD.m) > 0 { edm.histogramWriterCh <- prevWKD } + histogramIntervalStart = ts // See if we need to modify anything based on a config update conf = edm.getConfig() @@ -3010,8 +3062,13 @@ collectorLoop: // read from it again in this select statement now that // it is closed. wkd.stop = nil + case <-wkd.retryerDone: edm.log.Info("dataCollector: update retryer is done") + drainCollectorQueues() + shutdownTime := time.Now().UTC() + flushSessions(sessionIntervalStart, shutdownTime) + flushHistogram(histogramIntervalStart, shutdownTime) break collectorLoop } }