From 9a856896687783729209dc912c0c6ce428baad8c Mon Sep 17 00:00:00 2001 From: efiten Date: Tue, 31 Mar 2026 16:07:50 +0200 Subject: [PATCH 1/3] perf: eliminate O(n*obs) scan and cache sort in QueryGroupedPackets - Add LatestSeen field to StoreTx, maintained in all three observation write paths (load, real-time ingest, poll). Eliminates the per-packet observation scan that was O(total_packets * avg_observations). - Build grouped packet maps under read lock (correct), sort the local copy outside the lock (avoids holding lock during O(n log n) sort). - Cache the full sorted result for 3 seconds keyed by filter params. Repeated requests within the TTL return instantly without re-sorting. Fixes /packets?limit=50000&groupByHash=true taking 16s on large stores. Co-Authored-By: Claude Sonnet 4.6 --- cmd/server/store.go | 139 +++++++++++++++++++++++++++++--------------- 1 file changed, 92 insertions(+), 47 deletions(-) diff --git a/cmd/server/store.go b/cmd/server/store.go index d3a30cee..a37041bd 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -39,6 +39,7 @@ type StoreTx struct { RSSI *float64 PathJSON string Direction string + LatestSeen string // max observation timestamp (or FirstSeen if no observations) // Cached parsed fields (set once, read many) parsedPath []string // cached parsePathJSON result pathParsed bool // whether parsedPath has been set @@ -85,6 +86,11 @@ type PacketStore struct { rfCacheTTL time.Duration cacheHits int64 cacheMisses int64 + // Short-lived cache for QueryGroupedPackets (avoids repeated full sort) + groupedCacheMu sync.Mutex + groupedCacheKey string + groupedCacheExp time.Time + groupedCacheRes *PacketResult // Cached node list + prefix map (rebuilt on demand, shared across analytics) nodeCache []nodeInfo nodePM *prefixMap @@ -233,6 +239,7 @@ func (s *PacketStore) Load() error { RawHex: nullStrVal(rawHex), Hash: hashStr, FirstSeen: nullStrVal(firstSeen), + LatestSeen: nullStrVal(firstSeen), RouteType: nullIntPtr(routeType), PayloadType: nullIntPtr(payloadType), DecodedJSON: nullStrVal(decodedJSON), @@ -279,6 +286,9 @@ func (s *PacketStore) Load() error { tx.Observations = append(tx.Observations, obs) tx.ObservationCount++ + if obs.Timestamp > tx.LatestSeen { + tx.LatestSeen = obs.Timestamp + } s.byObsID[oid] = obs @@ -416,47 +426,40 @@ func (s *PacketStore) QueryPackets(q PacketQuery) *PacketResult { // QueryGroupedPackets returns transmissions grouped by hash (already 1:1). func (s *PacketStore) QueryGroupedPackets(q PacketQuery) *PacketResult { atomic.AddInt64(&s.queryCount, 1) - s.mu.RLock() - defer s.mu.RUnlock() if q.Limit <= 0 { q.Limit = 50 } - results := s.filterPackets(q) - - // Build grouped output sorted by latest observation DESC - type groupEntry struct { - tx *StoreTx - latest string + // Cache key covers all filter dimensions. Empty key = no filters. + cacheKey := q.Since + "|" + q.Until + "|" + q.Region + "|" + q.Node + "|" + q.Hash + "|" + q.Observer + if q.Type != nil { + cacheKey += fmt.Sprintf("|t%d", *q.Type) } - entries := make([]groupEntry, len(results)) - for i, tx := range results { - latest := tx.FirstSeen - for _, obs := range tx.Observations { - if obs.Timestamp > latest { - latest = obs.Timestamp - } - } - entries[i] = groupEntry{tx: tx, latest: latest} + if q.Route != nil { + cacheKey += fmt.Sprintf("|r%d", *q.Route) } - sort.Slice(entries, func(i, j int) bool { - return entries[i].latest > entries[j].latest - }) - total := len(entries) - start := q.Offset - if start >= total { - return &PacketResult{Packets: []map[string]interface{}{}, Total: total} + // Return cached sorted list if still fresh (3s TTL) + s.groupedCacheMu.Lock() + if s.groupedCacheRes != nil && s.groupedCacheKey == cacheKey && time.Now().Before(s.groupedCacheExp) { + cached := s.groupedCacheRes + s.groupedCacheMu.Unlock() + return pagePacketResult(cached, q.Offset, q.Limit) } - end := start + q.Limit - if end > total { - end = total + s.groupedCacheMu.Unlock() + + // Build entries under read lock (observer scan needs lock), sort outside it. + type groupEntry struct { + latest map[string]interface{} + ts string } + var entries []groupEntry - packets := make([]map[string]interface{}, 0, end-start) - for _, e := range entries[start:end] { - tx := e.tx + s.mu.RLock() + results := s.filterPackets(q) + entries = make([]groupEntry, 0, len(results)) + for _, tx := range results { observerCount := 0 seen := make(map[string]bool) for _, obs := range tx.Observations { @@ -465,26 +468,61 @@ func (s *PacketStore) QueryGroupedPackets(q PacketQuery) *PacketResult { observerCount++ } } - packets = append(packets, map[string]interface{}{ - "hash": strOrNil(tx.Hash), - "first_seen": strOrNil(tx.FirstSeen), - "count": tx.ObservationCount, - "observer_count": observerCount, - "observation_count": tx.ObservationCount, - "latest": strOrNil(e.latest), - "observer_id": strOrNil(tx.ObserverID), - "observer_name": strOrNil(tx.ObserverName), - "path_json": strOrNil(tx.PathJSON), - "payload_type": intPtrOrNil(tx.PayloadType), - "route_type": intPtrOrNil(tx.RouteType), - "raw_hex": strOrNil(tx.RawHex), - "decoded_json": strOrNil(tx.DecodedJSON), - "snr": floatPtrOrNil(tx.SNR), - "rssi": floatPtrOrNil(tx.RSSI), + entries = append(entries, groupEntry{ + ts: tx.LatestSeen, + latest: map[string]interface{}{ + "hash": strOrNil(tx.Hash), + "first_seen": strOrNil(tx.FirstSeen), + "count": tx.ObservationCount, + "observer_count": observerCount, + "observation_count": tx.ObservationCount, + "latest": strOrNil(tx.LatestSeen), + "observer_id": strOrNil(tx.ObserverID), + "observer_name": strOrNil(tx.ObserverName), + "path_json": strOrNil(tx.PathJSON), + "payload_type": intPtrOrNil(tx.PayloadType), + "route_type": intPtrOrNil(tx.RouteType), + "raw_hex": strOrNil(tx.RawHex), + "decoded_json": strOrNil(tx.DecodedJSON), + "snr": floatPtrOrNil(tx.SNR), + "rssi": floatPtrOrNil(tx.RSSI), + }, }) } + s.mu.RUnlock() - return &PacketResult{Packets: packets, Total: total} + // Sort outside the lock — only touches our local slice. + sort.Slice(entries, func(i, j int) bool { + return entries[i].ts > entries[j].ts + }) + + packets := make([]map[string]interface{}, len(entries)) + for i, e := range entries { + packets[i] = e.latest + } + + full := &PacketResult{Packets: packets, Total: len(packets)} + + s.groupedCacheMu.Lock() + s.groupedCacheRes = full + s.groupedCacheKey = cacheKey + s.groupedCacheExp = time.Now().Add(3 * time.Second) + s.groupedCacheMu.Unlock() + + return pagePacketResult(full, q.Offset, q.Limit) +} + +// pagePacketResult returns a window of a PacketResult without re-allocating the slice. +func pagePacketResult(r *PacketResult, offset, limit int) *PacketResult { + total := r.Total + if offset >= total { + return &PacketResult{Packets: []map[string]interface{}{}, Total: total} + } + end := offset + limit + if end > total { + end = total + } + return &PacketResult{Packets: r.Packets[offset:end], Total: total} } // GetStoreStats returns aggregate counts (packet data from memory, node/observer from DB). @@ -950,6 +988,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac RawHex: r.rawHex, Hash: r.hash, FirstSeen: r.firstSeen, + LatestSeen: r.firstSeen, RouteType: r.routeType, PayloadType: r.payloadType, DecodedJSON: r.decodedJSON, @@ -999,6 +1038,9 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac } tx.Observations = append(tx.Observations, obs) tx.ObservationCount++ + if obs.Timestamp > tx.LatestSeen { + tx.LatestSeen = obs.Timestamp + } s.byObsID[oid] = obs if r.observerID != "" { s.byObserver[r.observerID] = append(s.byObserver[r.observerID], obs) @@ -1230,6 +1272,9 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] } tx.Observations = append(tx.Observations, obs) tx.ObservationCount++ + if obs.Timestamp > tx.LatestSeen { + tx.LatestSeen = obs.Timestamp + } s.byObsID[r.obsID] = obs if r.observerID != "" { s.byObserver[r.observerID] = append(s.byObserver[r.observerID], obs) From 3bf1df55b017da0b0d6f5d6acb32cca74d711c7e Mon Sep 17 00:00:00 2001 From: efiten Date: Wed, 1 Apr 2026 15:00:24 +0200 Subject: [PATCH 2/3] perf: cache GetChannels and move JSON unmarshal outside read lock GetChannels was iterating all payload-type-5 packets and JSON-unmarshaling each one while holding s.mu.RLock(), blocking all concurrent reads. On stores with many channel messages this caused /api/channels to take 13s+. - Copy only the needed fields under the read lock, release before unmarshal - Cache the result for 15 seconds keyed by region param - Invalidate cache on new packet ingestion Co-Authored-By: Claude Sonnet 4.6 --- cmd/server/store.go | 97 ++++++++++++++++++++++++++++++--------------- 1 file changed, 65 insertions(+), 32 deletions(-) diff --git a/cmd/server/store.go b/cmd/server/store.go index a37041bd..b8d46cb2 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -91,6 +91,11 @@ type PacketStore struct { groupedCacheKey string groupedCacheExp time.Time groupedCacheRes *PacketResult + // Short-lived cache for GetChannels (avoids repeated full scan + JSON unmarshal) + channelsCacheMu sync.Mutex + channelsCacheKey string + channelsCacheExp time.Time + channelsCacheRes []map[string]interface{} // Cached node list + prefix map (rebuilt on demand, shared across analytics) nodeCache []nodeInfo nodePM *prefixMap @@ -1149,6 +1154,9 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac s.distCache = make(map[string]*cachedResult) s.subpathCache = make(map[string]*cachedResult) s.cacheMu.Unlock() + s.channelsCacheMu.Lock() + s.channelsCacheRes = nil + s.channelsCacheMu.Unlock() } return result, newMaxID @@ -1368,6 +1376,9 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] s.distCache = make(map[string]*cachedResult) s.subpathCache = make(map[string]*cachedResult) s.cacheMu.Unlock() + s.channelsCacheMu.Lock() + s.channelsCacheRes = nil + s.channelsCacheMu.Unlock() // analytics caches cleared; no per-cycle log to avoid stdout overhead } @@ -2107,14 +2118,50 @@ func hasGarbageChars(s string) bool { // GetChannels returns channel list from in-memory packets (payload_type 5, decoded type CHAN). func (s *PacketStore) GetChannels(region string) []map[string]interface{} { - s.mu.RLock() - defer s.mu.RUnlock() + cacheKey := region + + s.channelsCacheMu.Lock() + if s.channelsCacheRes != nil && s.channelsCacheKey == cacheKey && time.Now().Before(s.channelsCacheExp) { + res := s.channelsCacheRes + s.channelsCacheMu.Unlock() + return res + } + s.channelsCacheMu.Unlock() + + type txSnapshot struct { + firstSeen string + decodedJSON string + hasRegion bool + } + // Copy only the fields needed — release the lock before JSON unmarshal. + s.mu.RLock() var regionObs map[string]bool if region != "" { regionObs = s.resolveRegionObservers(region) } + grpTxts := s.byPayloadType[5] + snapshots := make([]txSnapshot, 0, len(grpTxts)) + for _, tx := range grpTxts { + inRegion := true + if regionObs != nil { + inRegion = false + for _, obs := range tx.Observations { + if regionObs[obs.ObserverID] { + inRegion = true + break + } + } + } + snapshots = append(snapshots, txSnapshot{ + firstSeen: tx.FirstSeen, + decodedJSON: tx.DecodedJSON, + hasRegion: inRegion, + }) + } + s.mu.RUnlock() + // JSON unmarshal outside the lock. type chanInfo struct { Hash string Name string @@ -2130,53 +2177,32 @@ func (s *PacketStore) GetChannels(region string) []map[string]interface{} { Sender string `json:"sender"` } channelMap := map[string]*chanInfo{} - - grpTxts := s.byPayloadType[5] - for _, tx := range grpTxts { - - // Region filter: check if any observation is from a regional observer - if regionObs != nil { - match := false - for _, obs := range tx.Observations { - if regionObs[obs.ObserverID] { - match = true - break - } - } - if !match { - continue - } + for _, snap := range snapshots { + if !snap.hasRegion { + continue } - var decoded decodedGrp - if json.Unmarshal([]byte(tx.DecodedJSON), &decoded) != nil { + if json.Unmarshal([]byte(snap.decodedJSON), &decoded) != nil { continue } if decoded.Type != "CHAN" { continue } - // Filter out garbage-decrypted channel names/messages (pre-#197 data still in DB) if hasGarbageChars(decoded.Channel) || hasGarbageChars(decoded.Text) { continue } - channelName := decoded.Channel if channelName == "" { channelName = "unknown" } - key := channelName - - ch := channelMap[key] + ch := channelMap[channelName] if ch == nil { - ch = &chanInfo{ - Hash: key, Name: channelName, - LastActivity: tx.FirstSeen, - } - channelMap[key] = ch + ch = &chanInfo{Hash: channelName, Name: channelName, LastActivity: snap.firstSeen} + channelMap[channelName] = ch } ch.MessageCount++ - if tx.FirstSeen >= ch.LastActivity { - ch.LastActivity = tx.FirstSeen + if snap.firstSeen >= ch.LastActivity { + ch.LastActivity = snap.firstSeen if decoded.Text != "" { idx := strings.Index(decoded.Text, ": ") if idx > 0 { @@ -2199,6 +2225,13 @@ func (s *PacketStore) GetChannels(region string) []map[string]interface{} { "messageCount": ch.MessageCount, "lastActivity": ch.LastActivity, }) } + + s.channelsCacheMu.Lock() + s.channelsCacheRes = channels + s.channelsCacheKey = cacheKey + s.channelsCacheExp = time.Now().Add(15 * time.Second) + s.channelsCacheMu.Unlock() + return channels } From 026e0ac4813868d6a1bac966abadb97532eaa85e Mon Sep 17 00:00:00 2001 From: efiten Date: Wed, 1 Apr 2026 15:10:13 +0200 Subject: [PATCH 3/3] test: add tests for LatestSeen maintenance, grouped packet sort, and channel cache - TestLatestSeenMaintained: verifies StoreTx.LatestSeen is set >= FirstSeen and >= all observation timestamps after store load - TestQueryGroupedPacketsSortedByLatest: verifies packets with more-recent observations sort before packets with newer first_seen but older observations - TestQueryGroupedPacketsCacheReturnsConsistentResult: verifies cache returns consistent total and ordering on back-to-back calls - TestGetChannelsCacheReturnsConsistentResult: verifies GetChannels cache returns same channel names on repeated calls - TestGetChannelsNotBlockedByLargeLock: verifies GetChannels returns correct data (channel name, messageCount) after lock-copy-unmarshal refactor Co-Authored-By: Claude Sonnet 4.6 --- cmd/server/routes_test.go | 203 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 203 insertions(+) diff --git a/cmd/server/routes_test.go b/cmd/server/routes_test.go index 9c7018f1..28fe06aa 100644 --- a/cmd/server/routes_test.go +++ b/cmd/server/routes_test.go @@ -2402,3 +2402,206 @@ func min(a, b int) int { } return b } + +// TestLatestSeenMaintained verifies that StoreTx.LatestSeen is populated after Load() +// and is >= FirstSeen for packets that have observations. +func TestLatestSeenMaintained(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + seedTestData(t, db) + + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("store.Load failed: %v", err) + } + + store.mu.RLock() + defer store.mu.RUnlock() + + if len(store.packets) == 0 { + t.Fatal("expected packets in store after Load") + } + + for _, tx := range store.packets { + if tx.LatestSeen == "" { + t.Errorf("packet %s has empty LatestSeen (FirstSeen=%s)", tx.Hash, tx.FirstSeen) + continue + } + // LatestSeen must be >= FirstSeen (string comparison works for RFC3339/ISO8601) + if tx.LatestSeen < tx.FirstSeen { + t.Errorf("packet %s: LatestSeen %q < FirstSeen %q", tx.Hash, tx.LatestSeen, tx.FirstSeen) + } + // For packets with observations, LatestSeen must be >= all observation timestamps. + for _, obs := range tx.Observations { + if obs.Timestamp != "" && obs.Timestamp > tx.LatestSeen { + t.Errorf("packet %s: obs.Timestamp %q > LatestSeen %q", tx.Hash, obs.Timestamp, tx.LatestSeen) + } + } + } +} + +// TestQueryGroupedPacketsSortedByLatest verifies that QueryGroupedPackets returns packets +// sorted by LatestSeen DESC — i.e. the packet whose most-recent observation is newest +// comes first, even if its first_seen is older. +func TestQueryGroupedPacketsSortedByLatest(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + now := time.Now().UTC() + // oldFirst: first_seen is old, but observation is very recent. + oldFirst := now.Add(-48 * time.Hour).Format(time.RFC3339) + // newFirst: first_seen is recent, but observation is old. + newFirst := now.Add(-1 * time.Hour).Format(time.RFC3339) + recentEpoch := now.Add(-5 * time.Minute).Unix() + oldEpoch := now.Add(-72 * time.Hour).Unix() + + db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count) + VALUES ('sortobs', 'Sort Observer', 'TST', ?, '2026-01-01T00:00:00Z', 1)`, now.Format(time.RFC3339)) + + // Packet A: old first_seen, but a very recent observation — should sort first. + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + VALUES ('AA01', 'sort_old_first_recent_obs', ?, 1, 2, '{"type":"TXT_MSG","text":"old first"}')`, oldFirst) + var idA int64 + db.conn.QueryRow(`SELECT id FROM transmissions WHERE hash='sort_old_first_recent_obs'`).Scan(&idA) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (?, 1, 10.0, -90, '[]', ?)`, idA, recentEpoch) + + // Packet B: newer first_seen, but an old observation — should sort second. + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + VALUES ('BB02', 'sort_new_first_old_obs', ?, 1, 2, '{"type":"TXT_MSG","text":"new first"}')`, newFirst) + var idB int64 + db.conn.QueryRow(`SELECT id FROM transmissions WHERE hash='sort_new_first_old_obs'`).Scan(&idB) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (?, 1, 10.0, -90, '[]', ?)`, idB, oldEpoch) + + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("store.Load failed: %v", err) + } + + result := store.QueryGroupedPackets(PacketQuery{Limit: 50}) + if result.Total < 2 { + t.Fatalf("expected at least 2 packets, got %d", result.Total) + } + + // Find the two test packets in the result (may be mixed with other entries). + firstHash := "" + secondHash := "" + for _, p := range result.Packets { + h, _ := p["hash"].(string) + if h == "sort_old_first_recent_obs" || h == "sort_new_first_old_obs" { + if firstHash == "" { + firstHash = h + } else { + secondHash = h + break + } + } + } + + if firstHash != "sort_old_first_recent_obs" { + t.Errorf("expected sort_old_first_recent_obs to appear before sort_new_first_old_obs in sorted results; got first=%q second=%q", firstHash, secondHash) + } +} + +// TestQueryGroupedPacketsCacheReturnsConsistentResult verifies that two rapid successive +// calls to QueryGroupedPackets return the same total count and first packet hash. +func TestQueryGroupedPacketsCacheReturnsConsistentResult(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + seedTestData(t, db) + + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("store.Load failed: %v", err) + } + + q := PacketQuery{Limit: 50} + r1 := store.QueryGroupedPackets(q) + r2 := store.QueryGroupedPackets(q) + + if r1.Total != r2.Total { + t.Errorf("cache inconsistency: first call total=%d, second call total=%d", r1.Total, r2.Total) + } + if r1.Total == 0 { + t.Fatal("expected non-zero results from QueryGroupedPackets") + } + h1, _ := r1.Packets[0]["hash"].(string) + h2, _ := r2.Packets[0]["hash"].(string) + if h1 != h2 { + t.Errorf("cache inconsistency: first call first hash=%q, second call first hash=%q", h1, h2) + } +} + +// TestGetChannelsCacheReturnsConsistentResult verifies that two rapid successive calls +// to GetChannels return the same number of channels with the same names. +func TestGetChannelsCacheReturnsConsistentResult(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + seedTestData(t, db) + + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("store.Load failed: %v", err) + } + + r1 := store.GetChannels("") + r2 := store.GetChannels("") + + if len(r1) != len(r2) { + t.Errorf("cache inconsistency: first call len=%d, second call len=%d", len(r1), len(r2)) + } + if len(r1) == 0 { + t.Fatal("expected at least one channel from seedTestData") + } + + names1 := make(map[string]bool) + for _, ch := range r1 { + if n, ok := ch["name"].(string); ok { + names1[n] = true + } + } + for _, ch := range r2 { + if n, ok := ch["name"].(string); ok { + if !names1[n] { + t.Errorf("cache inconsistency: channel %q in second result but not first", n) + } + } + } +} + +// TestGetChannelsNotBlockedByLargeLock verifies that GetChannels returns correct channel +// data (count and messageCount) after observations have been added — i.e. the lock-copy +// pattern works correctly and the JSON unmarshal outside the lock produces valid results. +func TestGetChannelsNotBlockedByLargeLock(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + seedTestData(t, db) + + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("store.Load failed: %v", err) + } + + channels := store.GetChannels("") + + // seedTestData inserts one GRP_TXT (payload_type=5) packet with channel "#test". + if len(channels) != 1 { + t.Fatalf("expected 1 channel, got %d", len(channels)) + } + + ch := channels[0] + name, ok := ch["name"].(string) + if !ok || name != "#test" { + t.Errorf("expected channel name '#test', got %v", ch["name"]) + } + + // messageCount should be 1 (one CHAN packet for #test). + msgCount, ok := ch["messageCount"].(int) + if !ok { + // JSON numbers may unmarshal as float64 — but GetChannels returns native Go values. + t.Errorf("expected messageCount to be int, got %T (%v)", ch["messageCount"], ch["messageCount"]) + } else if msgCount != 1 { + t.Errorf("expected messageCount=1, got %d", msgCount) + } +}