From dec2cd468a2570e39c4ea06480cf1edba9ec3ce3 Mon Sep 17 00:00:00 2001 From: Johan Lindh Date: Thu, 30 Apr 2026 13:54:33 +0200 Subject: [PATCH] fix(runner): deduplicate concurrent new qnames --- pkg/runner/run_minimiser_test.go | 68 ++++++++++++++++++++++++++++++++ pkg/runner/runner.go | 22 ++++++----- 2 files changed, 81 insertions(+), 9 deletions(-) create mode 100644 pkg/runner/run_minimiser_test.go diff --git a/pkg/runner/run_minimiser_test.go b/pkg/runner/run_minimiser_test.go new file mode 100644 index 0000000..3e4a614 --- /dev/null +++ b/pkg/runner/run_minimiser_test.go @@ -0,0 +1,68 @@ +package runner + +import ( + "io" + "log/slog" + "path/filepath" + "sync" + "testing" + + "github.com/cockroachdb/pebble" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/miekg/dns" +) + +func TestQnameSeenConcurrentFirstSeenOnce(t *testing.T) { + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + edm, err := newDnstapMinimiser(logger, defaultTC) + if err != nil { + t.Fatalf("newDnstapMinimiser: %s", err) + } + t.Cleanup(edm.stop) + + seenQnameLRU, err := lru.New[string, struct{}](10) + if err != nil { + t.Fatalf("lru.New: %s", err) + } + + pdb, err := pebble.Open(filepath.Join(t.TempDir(), "pebble"), &pebble.Options{}) + if err != nil { + t.Fatalf("pebble.Open: %s", err) + } + t.Cleanup(func() { + if err := pdb.Close(); err != nil { + t.Fatalf("pdb.Close: %s", err) + } + }) + + msg := new(dns.Msg) + msg.SetQuestion("race.example.", dns.TypeA) + + const goroutines = 64 + start := make(chan struct{}) + results := make(chan bool, goroutines) + + var wg sync.WaitGroup + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + <-start + results <- edm.qnameSeen(msg, seenQnameLRU, pdb) + }() + } + + close(start) + wg.Wait() + close(results) + + var firstSeen int + for seen := range results { + if !seen { + firstSeen++ + } + } + if firstSeen != 1 { + t.Fatalf("first-seen results have: %d, want: 1", firstSeen) + } +} diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 17cb068..7c626ed 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -1471,6 +1471,7 @@ type dnstapMinimiser struct { reloadMinimiserMutex sync.RWMutex reloadMinimiserConfigCh []chan struct{} reloadHistogramSenderConfigCh chan struct{} + seenQnameLocks [256]sync.Mutex } func createCryptopan(key string, salt string) (*cryptopan.Cryptopan, error) { @@ -1820,18 +1821,21 @@ func (wkd *wellKnownDomainsTracker) rotateTracker(edm *dnstapMinimiser, dawgFile return prevWKD, nil } +func seenQnameLockIndex(qname string) int { + var h uint32 = 2166136261 + for i := 0; i < len(qname); i++ { + h ^= uint32(qname[i]) + h *= 16777619 + } + return int(h % 256) +} + // Check if we have already seen this qname since we started. func (edm *dnstapMinimiser) qnameSeen(msg *dns.Msg, seenQnameLRU *lru.Cache[string, struct{}], pdb *pebble.DB) bool { - // NOTE: This looks like it might be a race (calling - // Get() followed by separate Add()) but since we want - // to keep often looked-up names in the cache we need to - // use Get() for updating recent-ness, and there is no - // GetOrAdd() method available. However, it should be - // safe for multiple threads to call Add() as this will - // only move an already added entry to the front of the - // eviction list which should be OK. - qname := strings.ToLower(msg.Question[0].Name) + seenLock := &edm.seenQnameLocks[seenQnameLockIndex(qname)] + seenLock.Lock() + defer seenLock.Unlock() _, ok := seenQnameLRU.Get(qname) if ok {