Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions pkg/runner/data_collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package runner

import (
"io"
"log/slog"
"net/netip"
"sync"
"testing"
"time"

"github.com/miekg/dns"
"github.com/smhanov/dawg"
"github.com/spaolacci/murmur3"
)

// TestDataCollectorFlushesPendingDataOnShutdown queues one session and one
// histogram update, stops the collector, and then checks that both were
// handed to the writer channels with a populated, non-inverted interval
// instead of being dropped.
func TestDataCollectorFlushesPendingDataOnShutdown(t *testing.T) {
	edm, wkdTracker := newDataCollectorTestFixture(t, "example.com.")

	var collectorWg sync.WaitGroup
	collectorWg.Add(1)
	go edm.dataCollector(&collectorWg, wkdTracker, "unused-in-shutdown-test.dawg")

	// Queue a single session for the collector to pick up.
	serverID := "serverID"
	edm.sessionCollectorCh <- &sessionData{ServerID: &serverID}

	// Queue a single histogram update for a name that is in the DAWG.
	query := new(dns.Msg)
	query.SetQuestion("example.com.", dns.TypeA)
	clientIP := netip.MustParseAddr("198.51.100.10")
	dawgIndex, suffixMatch, dawgModTime := wkdTracker.lookup(query)
	pendingUpdate := wkdUpdate{
		dawgIndex:   dawgIndex,
		suffixMatch: suffixMatch,
		dawgModTime: dawgModTime,
		histogramData: histogramData{
			ACount:  1,
			OKCount: 1,
		},
		hllHash: murmur3.Sum64(clientIP.AsSlice()),
		ip:      clientIP,
	}
	wkdTracker.updateCh <- pendingUpdate

	// Ask the collector to stop and wait for its goroutine to finish.
	close(wkdTracker.stop)
	waitOrFail(t, &collectorWg, 2*time.Second, "dataCollector did not exit after stop")

	// The queued session must have been flushed with a sane interval.
	flushedSessions, open := <-edm.sessionWriterCh
	switch {
	case !open:
		t.Fatal("sessionWriterCh closed without flushing pending session data")
	case len(flushedSessions.sessions) != 1:
		t.Fatalf("flushed sessions have: %d, want: 1", len(flushedSessions.sessions))
	case flushedSessions.startTime.IsZero():
		t.Fatal("flushed sessions should carry the collector interval start")
	case flushedSessions.rotationTime.Before(flushedSessions.startTime):
		t.Fatalf("session interval is inverted: start=%s stop=%s", flushedSessions.startTime, flushedSessions.rotationTime)
	}

	// The queued histogram update must have been flushed as well.
	flushedHistogram, open := <-edm.histogramWriterCh
	if !open {
		t.Fatal("histogramWriterCh closed without flushing pending histogram data")
	}
	if domains := len(flushedHistogram.m); domains != 1 {
		t.Fatalf("flushed histogram domains have: %d, want: 1", domains)
	}

	// Each case ends the test, so later cases only run when the earlier
	// conditions (including the nil check) did not trigger.
	entry := flushedHistogram.m[dawgIndex]
	switch {
	case entry == nil:
		t.Fatalf("flushed histogram missing DAWG index %d", dawgIndex)
	case entry.ACount != 1 || entry.OKCount != 1:
		t.Fatalf("flushed histogram counts have A=%d OK=%d, want A=1 OK=1", entry.ACount, entry.OKCount)
	case flushedHistogram.startTime.IsZero():
		t.Fatal("flushed histogram should carry the collector interval start")
	case flushedHistogram.rotationTime.Before(flushedHistogram.startTime):
		t.Fatalf("histogram interval is inverted: start=%s stop=%s", flushedHistogram.startTime, flushedHistogram.rotationTime)
	}
}

// newDataCollectorTestFixture builds the two objects a dataCollector test
// needs: a dnstapMinimiser that logs to io.Discard and is stopped via
// t.Cleanup, and a wellKnownDomainsTracker backed by a DAWG containing
// knownDomains. Any construction error fails the test immediately.
func newDataCollectorTestFixture(t *testing.T, knownDomains ...string) (*dnstapMinimiser, *wellKnownDomainsTracker) {
	t.Helper()

	// Log output is not interesting for these tests, so throw it away.
	discardLogger := slog.New(slog.NewTextHandler(io.Discard, nil))
	edm, err := newDnstapMinimiser(discardLogger, defaultTC)
	if err != nil {
		t.Fatalf("newDnstapMinimiser: %s", err)
	}
	t.Cleanup(edm.stop)

	// Build the DAWG from the supplied names, in the order given.
	builder := dawg.New()
	for i := range knownDomains {
		builder.Add(knownDomains[i])
	}
	finder := builder.Finish()

	tracker, err := newWellKnownDomainsTracker(finder, time.Unix(0, 0))
	if err != nil {
		t.Fatalf("newWellKnownDomainsTracker: %s", err)
	}

	return edm, tracker
}

// waitOrFail blocks until wg's counter reaches zero. If that has not happened
// once timeout has elapsed, the test is failed with message.
func waitOrFail(t *testing.T, wg *sync.WaitGroup, timeout time.Duration, message string) {
	t.Helper()

	// wg.Wait cannot be used in a select directly, so a goroutine turns its
	// return into a channel close.
	finished := make(chan struct{})
	go func() {
		wg.Wait()
		close(finished)
	}()

	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	select {
	case <-deadline.C:
		t.Fatal(message)
	case <-finished:
	}
}
173 changes: 115 additions & 58 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ type sessionData struct {

// prevSessions is a batch of collected sessions handed to the session writer,
// together with the time interval the batch covers.
type prevSessions struct {
	// sessions holds the session records gathered during the interval.
	sessions []*sessionData
	// startTime is when collection for this batch began. If it is the zero
	// time, createSessionFile derives the start from rotationTime instead.
	startTime time.Time
	// rotationTime is when the batch was closed off: the collector's ticker
	// time on a regular rotation, or the shutdown time on a final flush.
	rotationTime time.Time
}

Expand Down Expand Up @@ -1634,6 +1635,7 @@ type wellKnownDomainsData struct {
// Store a pointer to histogramData so we can assign to it without
// "cannot assign to struct field in map" issues
m map[int]*histogramData
startTime time.Time
rotationTime time.Time
dawgFinder dawg.Finder
dawgIsRotated bool
Expand Down Expand Up @@ -1783,7 +1785,7 @@ func (wkd *wellKnownDomainsTracker) sendUpdate(ipBytes []byte, msg *dns.Msg, daw
wkd.updateCh <- wu
}

func (wkd *wellKnownDomainsTracker) rotateTracker(edm *dnstapMinimiser, dawgFile string, rotationTime time.Time) (*wellKnownDomainsData, error) {
func (wkd *wellKnownDomainsTracker) rotateTracker(edm *dnstapMinimiser, dawgFile string, startTime time.Time, rotationTime time.Time) (*wellKnownDomainsData, error) {
dawgFileChanged := false
var dawgFinder dawg.Finder

Expand All @@ -1807,6 +1809,8 @@ func (wkd *wellKnownDomainsTracker) rotateTracker(edm *dnstapMinimiser, dawgFile
wkd.mutex.Lock()
prevWKD.m = wkd.m
prevWKD.dawgFinder = wkd.dawgFinder
prevWKD.startTime = startTime
prevWKD.rotationTime = rotationTime
wkd.m = map[int]*histogramData{}
if dawgFileChanged {
wkd.dawgFinder = dawgFinder
Expand All @@ -1815,8 +1819,6 @@ func (wkd *wellKnownDomainsTracker) rotateTracker(edm *dnstapMinimiser, dawgFile
}
wkd.mutex.Unlock()

prevWKD.rotationTime = rotationTime

return prevWKD, nil
}

Expand Down Expand Up @@ -2169,7 +2171,7 @@ func (edm *dnstapMinimiser) createSessionFile(ps *prevSessions, dataDir string)
// Write session file to a sessions dir where it can be read by other tools
sessionsDir := filepath.Join(dataDir, "parquet", "sessions")

startTime := getStartTimeFromRotationTime(ps.rotationTime)
startTime := intervalStartFromTimes(ps.startTime, ps.rotationTime)

absoluteTmpFileName, absoluteFileName := buildParquetFilenames(sessionsDir, "dns_session_block", startTime, ps.rotationTime)

Expand Down Expand Up @@ -2242,7 +2244,7 @@ func (edm *dnstapMinimiser) sessionWriter(dataDir string, wg *sync.WaitGroup) {
}

func (edm *dnstapMinimiser) createHistogramFile(prevWellKnownDomainsData *wellKnownDomainsData, labelLimit int, outboxDir string) (string, error) {
startTime := getStartTimeFromRotationTime(prevWellKnownDomainsData.rotationTime)
startTime := intervalStartFromTimes(prevWellKnownDomainsData.startTime, prevWellKnownDomainsData.rotationTime)

absoluteTmpFileName, absoluteFileName := buildParquetFilenames(outboxDir, "dns_histogram", startTime, prevWellKnownDomainsData.rotationTime)

Expand Down Expand Up @@ -2643,6 +2645,13 @@ func getStartTimeFromRotationTime(rotationTime time.Time) time.Time {
return rotationTime.Add(-time.Second * 60)
}

// intervalStartFromTimes returns the start of the interval that ended at
// rotationTime. A recorded startTime is returned unchanged; when startTime is
// the zero time.Time, the start is derived from rotationTime via
// getStartTimeFromRotationTime instead.
func intervalStartFromTimes(startTime time.Time, rotationTime time.Time) time.Time {
	if startTime.IsZero() {
		return getStartTimeFromRotationTime(rotationTime)
	}
	return startTime
}

// Unfortunately the hll library does not expose what format
// the HLL is being stored in so figure things out manually.
//
Expand Down Expand Up @@ -2908,6 +2917,8 @@ func (edm *dnstapMinimiser) dataCollector(wg *sync.WaitGroup, wkd *wellKnownDoma
go wkd.updateRetryer(edm, &retryerWg)

sessions := []*sessionData{}
sessionIntervalStart := time.Now().UTC()
histogramIntervalStart := sessionIntervalStart

ticker := time.NewTicker(timeUntilNextMinute())
defer ticker.Stop()
Expand All @@ -2918,71 +2929,111 @@ func (edm *dnstapMinimiser) dataCollector(wg *sync.WaitGroup, wkd *wellKnownDoma

hllSettings := getHllDefaults(conf.HistogramHLLExplicitThreshold)

collectorLoop:
for {
select {
case sd := <-edm.sessionCollectorCh:
sessions = append(sessions, sd)
sessionUpdated = true
processSession := func(sd *sessionData) {
if sd == nil {
return
}
sessions = append(sessions, sd)
sessionUpdated = true
}

case wu := <-wkd.updateCh:
// It is possible an update sitting in the queue has
// been created with an outdated dawgModTime due to a
// call to rotateTracker(). If this is the case we need
// to do a new lookup against the new dawg to make sure
// we have the correct index number (or if it is even
// present in the new dawg).
if wu.dawgModTime != wkd.dawgModTime {
if !retryChannelClosed {
wkd.retryCh <- wu
} else {
edm.log.Info("discarding retry of wkd update because we are shutting down")
}
continue
flushSessions := func(startTime time.Time, rotationTime time.Time) {
if !sessionUpdated {
return
}
ps := &prevSessions{
sessions: sessions,
startTime: startTime,
rotationTime: rotationTime,
}
sessions = []*sessionData{}
sessionUpdated = false
edm.sessionWriterCh <- ps
}

processWKDUpdate := func(wu wkdUpdate) {
// It is possible an update sitting in the queue has
// been created with an outdated dawgModTime due to a
// call to rotateTracker(). If this is the case we need
// to do a new lookup against the new dawg to make sure
// we have the correct index number (or if it is even
// present in the new dawg).
if wu.dawgModTime != wkd.dawgModTime {
if !retryChannelClosed {
wkd.retryCh <- wu
} else {
edm.log.Info("discarding retry of wkd update because we are shutting down")
}
return
}

if _, exists := wkd.m[wu.dawgIndex]; !exists {
wkd.m[wu.dawgIndex] = edm.newHistogramData(hllSettings, wu.suffixMatch)
}

wkd.m[wu.dawgIndex].OKCount += wu.OKCount
wkd.m[wu.dawgIndex].NXCount += wu.NXCount
wkd.m[wu.dawgIndex].FailCount += wu.FailCount
wkd.m[wu.dawgIndex].ACount += wu.ACount
wkd.m[wu.dawgIndex].AAAACount += wu.AAAACount
wkd.m[wu.dawgIndex].MXCount += wu.MXCount
wkd.m[wu.dawgIndex].NSCount += wu.NSCount
wkd.m[wu.dawgIndex].OtherTypeCount += wu.OtherTypeCount
wkd.m[wu.dawgIndex].OtherRcodeCount += wu.OtherRcodeCount
wkd.m[wu.dawgIndex].NonINCount += wu.NonINCount

if _, exists := wkd.m[wu.dawgIndex]; !exists {
wkd.m[wu.dawgIndex] = edm.newHistogramData(hllSettings, wu.suffixMatch)
if wu.ip.IsValid() {
if wu.ip.Unmap().Is4() {
wkd.m[wu.dawgIndex].v4ClientHLL.AddRaw(wu.hllHash)
} else {
wkd.m[wu.dawgIndex].v6ClientHLL.AddRaw(wu.hllHash)
}
}
}

wkd.m[wu.dawgIndex].OKCount += wu.OKCount
wkd.m[wu.dawgIndex].NXCount += wu.NXCount
wkd.m[wu.dawgIndex].FailCount += wu.FailCount
wkd.m[wu.dawgIndex].ACount += wu.ACount
wkd.m[wu.dawgIndex].AAAACount += wu.AAAACount
wkd.m[wu.dawgIndex].MXCount += wu.MXCount
wkd.m[wu.dawgIndex].NSCount += wu.NSCount
wkd.m[wu.dawgIndex].OtherTypeCount += wu.OtherTypeCount
wkd.m[wu.dawgIndex].OtherRcodeCount += wu.OtherRcodeCount
wkd.m[wu.dawgIndex].NonINCount += wu.NonINCount

if wu.ip.IsValid() {
if wu.ip.Unmap().Is4() {
wkd.m[wu.dawgIndex].v4ClientHLL.AddRaw(wu.hllHash)
} else {
wkd.m[wu.dawgIndex].v6ClientHLL.AddRaw(wu.hllHash)
}
drainCollectorQueues := func() {
for {
select {
case sd := <-edm.sessionCollectorCh:
processSession(sd)
case wu := <-wkd.updateCh:
processWKDUpdate(wu)
default:
return
}
}
}

case ts := <-ticker.C:
// We want to tick at the start of each minute
ticker.Reset(timeUntilNextMinute())
flushHistogram := func(startTime time.Time, rotationTime time.Time) {
if len(wkd.m) == 0 {
return
}
edm.histogramWriterCh <- &wellKnownDomainsData{
m: wkd.m,
startTime: startTime,
rotationTime: rotationTime,
dawgFinder: wkd.dawgFinder,
}
wkd.m = map[int]*histogramData{}
}

if sessionUpdated {
ps := &prevSessions{
sessions: sessions,
rotationTime: ts,
}
collectorLoop:
for {
select {
case sd := <-edm.sessionCollectorCh:
processSession(sd)

sessions = []*sessionData{}
case wu := <-wkd.updateCh:
processWKDUpdate(wu)

// We have reset the sessions slice
sessionUpdated = false
case ts := <-ticker.C:
// We want to tick at the start of each minute
ticker.Reset(timeUntilNextMinute())

edm.sessionWriterCh <- ps
}
flushSessions(sessionIntervalStart, ts)
sessionIntervalStart = ts

prevWKD, err := wkd.rotateTracker(edm, dawgFile, ts)
prevWKD, err := wkd.rotateTracker(edm, dawgFile, histogramIntervalStart, ts)
if err != nil {
edm.log.Error("unable to rotate histogram map", "error", err)
continue
Expand All @@ -2992,6 +3043,7 @@ collectorLoop:
if len(prevWKD.m) > 0 {
edm.histogramWriterCh <- prevWKD
}
histogramIntervalStart = ts

// See if we need to modify anything based on a config update
conf = edm.getConfig()
Expand All @@ -3010,8 +3062,13 @@ collectorLoop:
// read from it again in this select statement now that
// it is closed.
wkd.stop = nil

case <-wkd.retryerDone:
edm.log.Info("dataCollector: update retryer is done")
drainCollectorQueues()
shutdownTime := time.Now().UTC()
flushSessions(sessionIntervalStart, shutdownTime)
flushHistogram(histogramIntervalStart, shutdownTime)
break collectorLoop
}
}
Expand Down