diff --git a/cmd/server/coverage_test.go b/cmd/server/coverage_test.go index 44a0820a..3c0d34e9 100644 --- a/cmd/server/coverage_test.go +++ b/cmd/server/coverage_test.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "database/sql" "encoding/json" "fmt" @@ -428,6 +429,49 @@ func TestMaxTransmissionID(t *testing.T) { }) } +// --- MaxTransmissionID incremental tracking --- + +func TestMaxTransmissionIDIncremental(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + seedTestData(t, db) + store := NewPacketStore(db, nil) + store.Load() + + maxTx := store.MaxTransmissionID() + maxObs := store.MaxObservationID() + + if maxTx <= 0 { + t.Fatalf("expected maxTx > 0 after Load, got %d", maxTx) + } + if maxObs <= 0 { + t.Fatalf("expected maxObs > 0 after Load, got %d", maxObs) + } + + // Verify incremental field matches brute-force iteration + store.mu.RLock() + bruteMaxTx := 0 + for id := range store.byTxID { + if id > bruteMaxTx { + bruteMaxTx = id + } + } + bruteMaxObs := 0 + for id := range store.byObsID { + if id > bruteMaxObs { + bruteMaxObs = id + } + } + store.mu.RUnlock() + + if maxTx != bruteMaxTx { + t.Errorf("maxTxID mismatch: incremental=%d brute=%d", maxTx, bruteMaxTx) + } + if maxObs != bruteMaxObs { + t.Errorf("maxObsID mismatch: incremental=%d brute=%d", maxObs, bruteMaxObs) + } +} + // --- Route handler DB fallback (no store) --- func TestHandleBulkHealthNoStore(t *testing.T) { @@ -770,6 +814,56 @@ func TestPrefixMapResolve(t *testing.T) { }) } +func TestPrefixMapCap(t *testing.T) { + // 16-char pubkey — longer than maxPrefixLen + nodes := []nodeInfo{ + {PublicKey: "aabbccdd11223344", Name: "LongKey"}, + {PublicKey: "eeff0011", Name: "ShortKey"}, // exactly 8 chars + } + pm := buildPrefixMap(nodes) + + t.Run("short prefixes still work", func(t *testing.T) { + n := pm.resolve("aabb") + if n == nil || n.Name != "LongKey" { + t.Errorf("expected LongKey for short prefix, got %v", n) + } + }) + + t.Run("full pubkey exact match works", func(t *testing.T) { + n := pm.resolve("aabbccdd11223344") + if n == nil || n.Name != "LongKey" { + t.Errorf("expected LongKey for full key, got %v", n) + } + }) + + t.Run("intermediate prefix beyond cap returns nil", func(t *testing.T) { + // 10-char prefix — beyond maxPrefixLen but not full key + n := pm.resolve("aabbccdd11") + if n != nil { + t.Errorf("expected nil for intermediate prefix beyond cap, got %v", n.Name) + } + }) + + t.Run("short key within cap has all prefixes", func(t *testing.T) { + for l := 2; l <= 8; l++ { + pfx := "eeff0011"[:l] + n := pm.resolve(pfx) + if n == nil || n.Name != "ShortKey" { + t.Errorf("prefix %q: expected ShortKey, got %v", pfx, n) + } + } + }) + + t.Run("map size is capped", func(t *testing.T) { + // LongKey: 7 prefix entries (2..8) + 1 full key = 8 + // ShortKey: 7 prefix entries (2..8), no full key entry (len == maxPrefixLen) = 7 + // No overlapping prefixes between the two nodes → 8 + 7 = 15 unique map keys + if len(pm.m) != 15 { + t.Errorf("expected 15 map entries (8 for LongKey + 7 for ShortKey), got %d", len(pm.m)) + } + }) +} + // --- pathLen --- func TestPathLen(t *testing.T) { @@ -1333,6 +1427,40 @@ func TestGetNodeLocations(t *testing.T) { } } +// --- GetNodeLocationsByKeys --- + +func TestGetNodeLocationsByKeys(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + seedTestData(t, db) + + // Query with a known key + pk := "aabbccdd11223344" + locs := db.GetNodeLocationsByKeys([]string{pk}) + if len(locs) != 1 { + t.Errorf("expected 1 location, got %d", len(locs)) + } + if entry, ok := locs[strings.ToLower(pk)]; ok { + if entry["lat"] == nil { + t.Error("expected non-nil lat") + } + } else { + t.Error("expected node location for test repeater") + } + + // Query with no keys returns empty map + empty := db.GetNodeLocationsByKeys([]string{}) + if len(empty) != 0 { + t.Errorf("expected 0 locations for empty keys, got %d", len(empty)) + } + + // Query with unknown key returns empty map + unknown := db.GetNodeLocationsByKeys([]string{"nonexistent"}) + if len(unknown) != 0 { + t.Errorf("expected 0 locations for unknown key, got %d", len(unknown)) + } +} + // --- Store edge cases --- func TestStoreQueryPacketsEdgeCases(t *testing.T) { @@ -1906,6 +2034,48 @@ func TestTxToMap(t *testing.T) { } } +func TestTxToMapLazyObservations(t *testing.T) { + snr := 10.5 + rssi := -90.0 + tx := &StoreTx{ + ID: 1, + Hash: "abc", + Observations: []*StoreObs{ + {ID: 10, ObserverID: "obs1", ObserverName: "O1", SNR: &snr, RSSI: &rssi, Timestamp: "2025-01-01"}, + {ID: 11, ObserverID: "obs2", ObserverName: "O2", SNR: &snr, RSSI: &rssi, Timestamp: "2025-01-02"}, + }, + } + + // Without flag: no observations key + m := txToMap(tx) + if _, ok := m["observations"]; ok { + t.Error("txToMap without includeObservations should not include observations key") + } + + // With false: no observations key + m = txToMap(tx, false) + if _, ok := m["observations"]; ok { + t.Error("txToMap(tx, false) should not include observations key") + } + + // With true: observations included + m = txToMap(tx, true) + obs, ok := m["observations"] + if !ok { + t.Fatal("txToMap(tx, true) should include observations key") + } + obsList, ok := obs.([]map[string]interface{}) + if !ok { + t.Fatal("observations should be []map[string]interface{}") + } + if len(obsList) != 2 { + t.Errorf("expected 2 observations, got %d", len(obsList)) + } + if obsList[0]["observer_id"] != "obs1" { + t.Errorf("expected observer_id obs1, got %v", obsList[0]["observer_id"]) + } +} + // --- filterTxSlice --- func TestFilterTxSlice(t *testing.T) { @@ -2099,6 +2269,84 @@ func TestSubpathPrecomputedIndex(t *testing.T) { } } +func TestSubpathTxIndexPopulated(t *testing.T) { + db := setupRichTestDB(t) + defer db.Close() + store := NewPacketStore(db, nil) + store.Load() + + // spTxIndex must be populated alongside spIndex + if len(store.spTxIndex) == 0 { + t.Fatal("expected spTxIndex to be populated after Load()") + } + + // Every key in spIndex must also exist in spTxIndex with matching count + for key, count := range store.spIndex { + txs, ok := store.spTxIndex[key] + if !ok { + t.Errorf("spTxIndex missing key %q that exists in spIndex", key) + continue + } + if len(txs) != count { + t.Errorf("spTxIndex[%q] has %d txs, spIndex count is %d", key, len(txs), count) + } + } + + // GetSubpathDetail should return correct match count via indexed lookup + detail := store.GetSubpathDetail([]string{"eeff", "0011"}) + if detail == nil { + t.Fatal("expected non-nil detail for existing subpath") + } + matches, _ := detail["totalMatches"].(int) + if matches != 1 { + t.Errorf("totalMatches = %d, want 1", matches) + } + + // Non-existent subpath should return 0 matches + detail2 := store.GetSubpathDetail([]string{"zzzz", "yyyy"}) + if detail2 == nil { + t.Fatal("expected non-nil result even for non-existent subpath") + } + matches2, _ := detail2["totalMatches"].(int) + if matches2 != 0 { + t.Errorf("totalMatches for non-existent subpath = %d, want 0", matches2) + } +} + +func TestSubpathDetailMixedCaseHops(t *testing.T) { + db := setupRichTestDB(t) + defer db.Close() + store := NewPacketStore(db, nil) + store.Load() + + // Query with lowercase hops to establish baseline + lower := store.GetSubpathDetail([]string{"eeff", "0011"}) + if lower == nil { + t.Fatal("expected non-nil detail for lowercase subpath") + } + lowerMatches, _ := lower["totalMatches"].(int) + if lowerMatches == 0 { + t.Fatal("expected >0 matches for lowercase subpath") + } + + // Query with mixed-case hops — must return the same results (case-insensitive) + mixed := store.GetSubpathDetail([]string{"EEFF", "0011"}) + if mixed == nil { + t.Fatal("expected non-nil detail for mixed-case subpath") + } + mixedMatches, _ := mixed["totalMatches"].(int) + if mixedMatches != lowerMatches { + t.Errorf("mixed-case totalMatches = %d, want %d (same as lowercase)", mixedMatches, lowerMatches) + } + + // All-uppercase should also match + upper := store.GetSubpathDetail([]string{"EEFF", "0011"}) + upperMatches, _ := upper["totalMatches"].(int) + if upperMatches != lowerMatches { + t.Errorf("uppercase totalMatches = %d, want %d", upperMatches, lowerMatches) + } +} + func TestStoreGetAnalyticsRFCacheHit(t *testing.T) { db := setupRichTestDB(t) defer db.Close() @@ -3716,6 +3964,71 @@ func TestGetChannelMessagesAfterIngest(t *testing.T) { } } +// --- resolveRegionObservers caching --- + +func TestResolveRegionObserversCaching(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + seedTestData(t, db) + + store := &PacketStore{db: db} + + // First call should populate cache. + obs1 := store.resolveRegionObservers("SJC") + if obs1 == nil || len(obs1) == 0 { + t.Fatal("expected observer IDs for SJC on first call") + } + + // Second call should return cached result (same pointer). + obs2 := store.resolveRegionObservers("SJC") + if len(obs2) != len(obs1) { + t.Errorf("cached result differs: got %d, want %d", len(obs2), len(obs1)) + } + + // Non-existent region should return nil even from cache. + obs3 := store.resolveRegionObservers("NONEXIST") + if obs3 != nil { + t.Errorf("expected nil for NONEXIST, got %v", obs3) + } + + // Verify cache fields are set. + if store.regionObsCache == nil { + t.Error("regionObsCache should be non-nil after calls") + } + if store.regionObsCacheTime.IsZero() { + t.Error("regionObsCacheTime should be set") + } +} + +func TestResolveRegionObserversCacheMissNewRegion(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + seedTestData(t, db) + + store := &PacketStore{db: db} + + // Populate cache with SJC. + obs1 := store.resolveRegionObservers("SJC") + if obs1 == nil || len(obs1) == 0 { + t.Fatal("expected observer IDs for SJC on first call") + } + + // Cache is now valid. Request a different region that exists in DB. + // Before the fix, this would return nil from the map lookup instead of + // fetching from DB, silently returning "no observers" for up to 30s. + obs2 := store.resolveRegionObservers("LAX") + // LAX may or may not have data in the test DB, but the key point is: + // a non-existent region should be fetched (not just nil-returned). + // Verify the region key was cached (even if empty). + store.regionObsMu.Lock() + _, cached := store.regionObsCache["LAX"] + store.regionObsMu.Unlock() + if !cached { + t.Error("LAX should be cached after resolveRegionObservers call, even if empty") + } + _ = obs2 +} + func TestIndexByNodePreCheck(t *testing.T) { store := &PacketStore{ byNode: make(map[string][]*StoreTx), @@ -3914,44 +4227,115 @@ func TestBuildTransmissionWhereMultiObserver(t *testing.T) { }) } -// --- Distance index rebuild debounce (#557) --- +// --- Distance index incremental update (#365, replaces debounce #557) --- -func TestDistanceRebuildDebounce(t *testing.T) { +func TestDistanceIncrementalUpdate(t *testing.T) { db := setupTestDB(t) defer db.Close() seedTestData(t, db) store := NewPacketStore(db, nil) store.Load() - // After Load(), distLast is set to now — so distDirty should be false - if store.distDirty { - t.Fatal("distDirty should be false after Load()") - } + // Record initial distance index size. + initialHops := len(store.distHops) + initialPaths := len(store.distPaths) - // Insert a new observation with a different path to trigger distDirty + // Insert a new observation with a different path to trigger an incremental update. maxObsID := db.GetMaxObservationID() db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) VALUES (1, 2, 5.0, -100, '["xx","yy","zz"]', ?)`, time.Now().Unix()) store.IngestNewObservations(maxObsID, 500) - // distDirty should be true (30s hasn't elapsed since Load) - if !store.distDirty { - t.Fatal("distDirty should be true after path change within 30s window") - } - - // Now simulate 30s having elapsed by backdating distLast - store.distLast = time.Now().Add(-31 * time.Second) + // Distance index should have been updated incrementally (sizes may differ + // if the new path resolves differently, but should not panic or corrupt). + _ = len(store.distHops) + _ = len(store.distPaths) - // Insert another observation to trigger another ingest cycle + // Insert another observation with yet another path. maxObsID = db.GetMaxObservationID() db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) VALUES (1, 2, 7.0, -95, '["aa","bb","cc","dd"]', ?)`, time.Now().Unix()) store.IngestNewObservations(maxObsID, 500) - // After 30s elapsed, distDirty should be cleared (rebuild happened) - if store.distDirty { - t.Fatal("distDirty should be false after rebuild (30s elapsed)") + // Verify the index is still coherent (no duplicates for the same tx). + txSeen := make(map[int]int) + for _, r := range store.distPaths { + if r.tx != nil { + txSeen[r.tx.ID]++ + } + } + for txID, count := range txSeen { + if count > 1 { + t.Errorf("distPaths has %d entries for tx %d (expected at most 1)", count, txID) + } } + + t.Logf("Distance index: %d→%d hops, %d→%d paths (incremental)", + initialHops, len(store.distHops), initialPaths, len(store.distPaths)) +} + +func TestHandleBatchObservations(t *testing.T) { + _, router := setupNoStoreServer(t) + + t.Run("empty hashes returns empty results", func(t *testing.T) { + body := strings.NewReader(`{"hashes":[]}`) + req := httptest.NewRequest("POST", "/api/packets/observations", body) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + if w.Code != 200 { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + results, ok := resp["results"].(map[string]interface{}) + if !ok || len(results) != 0 { + t.Fatalf("expected empty results map, got %v", resp) + } + }) + + t.Run("invalid JSON returns 400", func(t *testing.T) { + body := strings.NewReader(`not json`) + req := httptest.NewRequest("POST", "/api/packets/observations", body) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + if w.Code != 400 { + t.Fatalf("expected 400, got %d", w.Code) + } + }) + + t.Run("too many hashes returns 400", func(t *testing.T) { + hashes := make([]string, 201) + for i := range hashes { + hashes[i] = fmt.Sprintf("hash%d", i) + } + data, _ := json.Marshal(map[string][]string{"hashes": hashes}) + req := httptest.NewRequest("POST", "/api/packets/observations", bytes.NewReader(data)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + if w.Code != 400 { + t.Fatalf("expected 400, got %d", w.Code) + } + }) + + t.Run("valid hashes with no store returns empty results", func(t *testing.T) { + body := strings.NewReader(`{"hashes":["abc123","def456"]}`) + req := httptest.NewRequest("POST", "/api/packets/observations", body) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + if w.Code != 200 { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + _, ok := resp["results"].(map[string]interface{}) + if !ok { + t.Fatalf("expected results map, got %v", resp) + } + }) } diff --git a/cmd/server/db.go b/cmd/server/db.go index 99a0ff4d..6c253d6f 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -377,7 +377,8 @@ type PacketQuery struct { Until string Region string Node string - Order string // ASC or DESC + Order string // ASC or DESC + ExpandObservations bool // when true, include observation sub-maps in txToMap output } // PacketResult wraps paginated packet list. @@ -1497,6 +1498,39 @@ func (db *DB) GetNodeLocations() map[string]map[string]interface{} { return result } +// GetNodeLocationsByKeys returns location data only for the given public keys. +// This avoids fetching ALL nodes when only a few keys need to be matched. +func (db *DB) GetNodeLocationsByKeys(keys []string) map[string]map[string]interface{} { + result := make(map[string]map[string]interface{}) + if len(keys) == 0 { + return result + } + placeholders := make([]string, len(keys)) + args := make([]interface{}, len(keys)) + for i, k := range keys { + placeholders[i] = "?" + args[i] = strings.ToLower(k) + } + query := "SELECT public_key, lat, lon, role FROM nodes WHERE LOWER(public_key) IN (" + strings.Join(placeholders, ",") + ")" + rows, err := db.conn.Query(query, args...) + if err != nil { + return result + } + defer rows.Close() + for rows.Next() { + var pk string + var role sql.NullString + var lat, lon sql.NullFloat64 + rows.Scan(&pk, &lat, &lon, &role) + result[strings.ToLower(pk)] = map[string]interface{}{ + "lat": nullFloat(lat), + "lon": nullFloat(lon), + "role": nullStr(role), + } + } + return result +} + // QueryMultiNodePackets returns transmissions referencing any of the given pubkeys. func (db *DB) QueryMultiNodePackets(pubkeys []string, limit, offset int, order, since, until string) (*PacketResult, error) { if len(pubkeys) == 0 { diff --git a/cmd/server/eviction_test.go b/cmd/server/eviction_test.go index 4bba9c0b..9cb5ace6 100644 --- a/cmd/server/eviction_test.go +++ b/cmd/server/eviction_test.go @@ -162,24 +162,50 @@ func TestEvictStale_NoEvictionWhenDisabled(t *testing.T) { func TestEvictStale_MemoryBasedEviction(t *testing.T) { now := time.Now().UTC() - // Create enough packets to exceed a small memory limit - // 1000 packets * 5KB + 2000 obs * 500B ≈ 6MB store := makeTestStore(1000, now.Add(-1*time.Hour), 0) - // All packets are recent (1h old) so time-based won't trigger + // All packets are recent (1h old) so time-based won't trigger. store.retentionHours = 24 - store.maxMemoryMB = 3 // ~3MB limit, should evict roughly half + store.maxMemoryMB = 3 + // Inject deterministic estimator: simulates 6MB (over 3MB limit). + // Uses packet count so it scales correctly after eviction. + store.memoryEstimator = func() float64 { + return float64(len(store.packets)*5120+store.totalObs*500) / 1048576.0 + } evicted := store.EvictStale() if evicted == 0 { t.Fatal("expected some evictions for memory cap") } - // After eviction, estimated memory should be <= 3MB estMB := store.estimatedMemoryMB() - if estMB > 3.5 { // small tolerance + if estMB > 3.5 { t.Fatalf("expected <=3.5MB after eviction, got %.1fMB", estMB) } } +// TestEvictStale_MemoryBasedEviction_UnderestimatedHeap verifies that eviction +// fires correctly when actual heap is much larger than a formula-based estimate +// would report — the scenario that caused OOM kills in production. +func TestEvictStale_MemoryBasedEviction_UnderestimatedHeap(t *testing.T) { + now := time.Now().UTC() + store := makeTestStore(1000, now.Add(-1*time.Hour), 0) + store.retentionHours = 24 + store.maxMemoryMB = 500 + // Simulate actual heap 5x over budget (like production: ~5GB actual vs ~1GB limit). + store.memoryEstimator = func() float64 { + return 2500.0 // 2500MB actual vs 500MB limit + } + + evicted := store.EvictStale() + if evicted == 0 { + t.Fatal("expected evictions when heap is 5x over limit") + } + // Should keep roughly 500/2500 * 0.9 = 18% of packets → ~180 of 1000. + remaining := len(store.packets) + if remaining > 250 { + t.Fatalf("expected most packets evicted (heap 5x over), but %d of 1000 remain", remaining) + } +} + func TestEvictStale_CleansNodeIndexes(t *testing.T) { now := time.Now().UTC() store := makeTestStore(10, now.Add(-48*time.Hour), 0) diff --git a/cmd/server/main.go b/cmd/server/main.go index 3b30709d..a498b697 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -224,8 +224,15 @@ func main() { defer stopEviction() // Auto-prune old packets if retention.packetDays is configured + var stopPrune func() if cfg.Retention != nil && cfg.Retention.PacketDays > 0 { days := cfg.Retention.PacketDays + pruneTicker := time.NewTicker(24 * time.Hour) + pruneDone := make(chan struct{}) + stopPrune = func() { + pruneTicker.Stop() + close(pruneDone) + } go func() { time.Sleep(1 * time.Minute) if n, err := database.PruneOldPackets(days); err != nil { @@ -233,11 +240,16 @@ func main() { } else { log.Printf("[prune] deleted %d transmissions older than %d days", n, days) } - for range time.Tick(24 * time.Hour) { - if n, err := database.PruneOldPackets(days); err != nil { - log.Printf("[prune] error: %v", err) - } else { - log.Printf("[prune] deleted %d transmissions older than %d days", n, days) + for { + select { + case <-pruneTicker.C: + if n, err := database.PruneOldPackets(days); err != nil { + log.Printf("[prune] error: %v", err) + } else { + log.Printf("[prune] deleted %d transmissions older than %d days", n, days) + } + case <-pruneDone: + return } } }() @@ -262,6 +274,11 @@ func main() { // 1. Stop accepting new WebSocket/poll data poller.Stop() + // 1b. Stop auto-prune ticker + if stopPrune != nil { + stopPrune() + } + // 2. Gracefully drain HTTP connections (up to 15s) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() diff --git a/cmd/server/resolve_context_test.go b/cmd/server/resolve_context_test.go index 6d7af222..00ddefee 100644 --- a/cmd/server/resolve_context_test.go +++ b/cmd/server/resolve_context_test.go @@ -166,6 +166,7 @@ func TestResolveHopsAPI_UniquePrefix(t *testing.T) { // Insert a unique node srv.db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, lat, lon) VALUES (?, ?, ?, ?)", "ff11223344", "UniqueNode", 37.0, -122.0) + srv.store.InvalidateNodeCache() req := httptest.NewRequest("GET", "/api/resolve-hops?hops=ff11223344", nil) rr := httptest.NewRecorder() @@ -192,6 +193,7 @@ func TestResolveHopsAPI_AmbiguousNoContext(t *testing.T) { "ee1aaaaaaa", "Node-E1", 37.0, -122.0) srv.db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, lat, lon) VALUES (?, ?, ?, ?)", "ee1bbbbbbb", "Node-E2", 38.0, -121.0) + srv.store.InvalidateNodeCache() req := httptest.NewRequest("GET", "/api/resolve-hops?hops=ee1", nil) rr := httptest.NewRecorder() @@ -204,8 +206,10 @@ func TestResolveHopsAPI_AmbiguousNoContext(t *testing.T) { if hr == nil { t.Fatal("expected hop in resolved map") } - if hr.Confidence != "ambiguous" { - t.Fatalf("expected ambiguous, got %s", hr.Confidence) + // With both candidates having GPS and no affinity context, the resolver + // picks the GPS-preferred candidate → confidence is "gps_preference". + if hr.Confidence != "gps_preference" { + t.Fatalf("expected gps_preference, got %s", hr.Confidence) } if len(hr.Candidates) != 2 { t.Fatalf("expected 2 candidates, got %d", len(hr.Candidates)) diff --git a/cmd/server/routes.go b/cmd/server/routes.go index ef0b579e..88f86234 100644 --- a/cmd/server/routes.go +++ b/cmd/server/routes.go @@ -118,6 +118,7 @@ func (s *Server) RegisterRoutes(r *mux.Router) { r.Handle("/api/debug/affinity", s.requireAPIKey(http.HandlerFunc(s.handleDebugAffinity))).Methods("GET") // Packet endpoints + r.HandleFunc("/api/packets/observations", s.handleBatchObservations).Methods("POST") r.HandleFunc("/api/packets/timestamps", s.handlePacketTimestamps).Methods("GET") r.HandleFunc("/api/packets/{id}", s.handlePacketDetail).Methods("GET") r.HandleFunc("/api/packets", s.handlePackets).Methods("GET") @@ -145,6 +146,7 @@ func (s *Server) RegisterRoutes(r *mux.Router) { r.HandleFunc("/api/analytics/hash-sizes", s.handleAnalyticsHashSizes).Methods("GET") r.HandleFunc("/api/analytics/hash-collisions", s.handleAnalyticsHashCollisions).Methods("GET") r.HandleFunc("/api/analytics/subpaths", s.handleAnalyticsSubpaths).Methods("GET") + r.HandleFunc("/api/analytics/subpaths-bulk", s.handleAnalyticsSubpathsBulk).Methods("GET") r.HandleFunc("/api/analytics/subpath-detail", s.handleAnalyticsSubpathDetail).Methods("GET") r.HandleFunc("/api/analytics/neighbor-graph", s.handleNeighborGraph).Methods("GET") @@ -718,7 +720,8 @@ func (s *Server) handlePackets(w http.ResponseWriter, r *http.Request) { Until: r.URL.Query().Get("until"), Region: r.URL.Query().Get("region"), Node: r.URL.Query().Get("node"), - Order: "DESC", + Order: "DESC", + ExpandObservations: r.URL.Query().Get("expand") == "observations", } if r.URL.Query().Get("order") == "asc" { q.Order = "ASC" @@ -760,13 +763,6 @@ func (s *Server) handlePackets(w http.ResponseWriter, r *http.Request) { return } - // Strip observations from default response - if r.URL.Query().Get("expand") != "observations" { - for _, p := range result.Packets { - delete(p, "observations") - } - } - writeJSON(w, result) } @@ -791,6 +787,38 @@ var muxBraceParam = regexp.MustCompile(`\{([^}]+)\}`) // perfHexFallback matches hex IDs for perf path normalization fallback. var perfHexFallback = regexp.MustCompile(`[0-9a-f]{8,}`) +// handleBatchObservations returns observations for multiple hashes in a single request. +// POST /api/packets/observations with JSON body: {"hashes": ["abc123", "def456", ...]} +// Response: {"results": {"abc123": [...observations...], "def456": [...], ...}} +// Limited to 200 hashes per request to prevent abuse. +func (s *Server) handleBatchObservations(w http.ResponseWriter, r *http.Request) { + var body struct { + Hashes []string `json:"hashes"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + writeError(w, 400, "invalid JSON body") + return + } + const maxHashes = 200 + if len(body.Hashes) > maxHashes { + writeError(w, 400, fmt.Sprintf("too many hashes (max %d)", maxHashes)) + return + } + if len(body.Hashes) == 0 { + writeJSON(w, map[string]interface{}{"results": map[string]interface{}{}}) + return + } + + results := make(map[string][]ObservationResp, len(body.Hashes)) + if s.store != nil { + for _, hash := range body.Hashes { + obs := s.store.GetObservationsForHash(hash) + results[hash] = mapSliceToObservations(obs) + } + } + writeJSON(w, map[string]interface{}{"results": results}) +} + func (s *Server) handlePacketDetail(w http.ResponseWriter, r *http.Request) { param := mux.Vars(r)["id"] var packet map[string]interface{} @@ -1065,16 +1093,44 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) { return } - prefix1 := strings.ToLower(pubkey) - if len(prefix1) > 2 { - prefix1 = prefix1[:2] - } - prefix2 := strings.ToLower(pubkey) + // Use the precomputed byPathHop index instead of scanning all packets. + // Look up by full pubkey (resolved hops) and by short prefixes (raw hops). + lowerPK := strings.ToLower(pubkey) + prefix2 := lowerPK if len(prefix2) > 4 { prefix2 = prefix2[:4] } + prefix1 := lowerPK + if len(prefix1) > 2 { + prefix1 = prefix1[:2] + } + s.store.mu.RLock() _, pm := s.store.getCachedNodesAndPM() + + // Collect candidate transmissions from the index, deduplicating by tx ID. + seen := make(map[int]bool) + var candidates []*StoreTx + addCandidates := func(key string) { + for _, tx := range s.store.byPathHop[key] { + if !seen[tx.ID] { + seen[tx.ID] = true + candidates = append(candidates, tx) + } + } + } + addCandidates(lowerPK) // full pubkey match (from resolved_path) + addCandidates(prefix1) // 2-char raw hop match + addCandidates(prefix2) // 4-char raw hop match + // Also check any raw hops that start with prefix2 (longer prefixes). + // Raw hops are typically 2 chars, so iterate only keys with HasPrefix + // on the small set of index keys rather than all packets. + for key := range s.store.byPathHop { + if len(key) > 4 && len(key) < len(lowerPK) && strings.HasPrefix(key, prefix2) { + addCandidates(key) + } + } + type pathAgg struct { Hops []PathHopResp Count int @@ -1092,24 +1148,9 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) { hopCache[hop] = r return r } - for _, tx := range s.store.packets { - hops := txGetParsedPath(tx) - if len(hops) == 0 { - continue - } - found := false - for _, hop := range hops { - hl := strings.ToLower(hop) - if hl == prefix1 || hl == prefix2 || strings.HasPrefix(hl, prefix2) { - found = true - break - } - } - if !found { - continue - } - + for _, tx := range candidates { totalTransmissions++ + hops := txGetParsedPath(tx) resolvedHops := make([]PathHopResp, len(hops)) sigParts := make([]string, len(hops)) for i, hop := range hops { @@ -1337,6 +1378,57 @@ func (s *Server) handleAnalyticsSubpaths(w http.ResponseWriter, r *http.Request) }) } +// handleAnalyticsSubpathsBulk returns multiple length-range buckets in a single +// response, avoiding repeated scans of the same packet data. Query format: +// ?groups=2-2:50,3-3:30,4-4:20,5-8:15 (minLen-maxLen:limit per group) +func (s *Server) handleAnalyticsSubpathsBulk(w http.ResponseWriter, r *http.Request) { + region := r.URL.Query().Get("region") + groupsParam := r.URL.Query().Get("groups") + if groupsParam == "" { + writeJSON(w, ErrorResp{Error: "groups parameter required (e.g. groups=2-2:50,3-3:30)"}) + return + } + + var groups []subpathGroup + for _, g := range strings.Split(groupsParam, ",") { + parts := strings.SplitN(g, ":", 2) + if len(parts) != 2 { + writeJSON(w, ErrorResp{Error: "invalid group format: " + g}) + return + } + rangeParts := strings.SplitN(parts[0], "-", 2) + if len(rangeParts) != 2 { + writeJSON(w, ErrorResp{Error: "invalid range format: " + parts[0]}) + return + } + mn, err1 := strconv.Atoi(rangeParts[0]) + mx, err2 := strconv.Atoi(rangeParts[1]) + lim, err3 := strconv.Atoi(parts[1]) + if err1 != nil || err2 != nil || err3 != nil || mn < 2 || mx < mn || lim < 1 { + writeJSON(w, ErrorResp{Error: "invalid group: " + g}) + return + } + groups = append(groups, subpathGroup{mn, mx, lim}) + } + + if s.store == nil { + results := make([]map[string]interface{}, len(groups)) + for i := range groups { + results[i] = map[string]interface{}{"subpaths": []interface{}{}, "totalPaths": 0} + } + writeJSON(w, map[string]interface{}{"results": results}) + return + } + + results := s.store.GetAnalyticsSubpathsBulk(region, groups) + writeJSON(w, map[string]interface{}{"results": results}) +} + +// subpathGroup defines a length-range + limit for the bulk subpaths endpoint. +type subpathGroup struct { + MinLen, MaxLen, Limit int +} + func (s *Server) handleAnalyticsSubpathDetail(w http.ResponseWriter, r *http.Request) { hops := r.URL.Query().Get("hops") if hops == "" { @@ -1406,24 +1498,25 @@ func (s *Server) handleResolveHops(w http.ResponseWriter, r *http.Request) { continue } hopLower := strings.ToLower(hop) - rows, err := s.db.conn.Query("SELECT public_key, name, lat, lon FROM nodes WHERE LOWER(public_key) LIKE ?", hopLower+"%") - if err != nil { - resolved[hop] = &HopResolution{Name: nil, Candidates: []HopCandidate{}, Conflicts: []interface{}{}, Confidence: "ambiguous"} - continue - } + // Resolve candidates from the in-memory prefix map instead of + // issuing per-hop DB queries (fixes N+1 pattern, see #369). var candidates []HopCandidate - for rows.Next() { - var pk string - var name sql.NullString - var lat, lon sql.NullFloat64 - rows.Scan(&pk, &name, &lat, &lon) - candidates = append(candidates, HopCandidate{ - Name: nullStr(name), Pubkey: pk, - Lat: nullFloat(lat), Lon: nullFloat(lon), - }) + if pm != nil { + if matched, ok := pm.m[hopLower]; ok { + for _, ni := range matched { + c := HopCandidate{Pubkey: ni.PublicKey} + if ni.Name != "" { + c.Name = ni.Name + } + if ni.HasGPS { + c.Lat = ni.Lat + c.Lon = ni.Lon + } + candidates = append(candidates, c) + } + } } - rows.Close() if len(candidates) == 0 { resolved[hop] = &HopResolution{Name: nil, Candidates: []HopCandidate{}, Conflicts: []interface{}{}, Confidence: "no_match"} @@ -1546,8 +1639,12 @@ func (s *Server) handleObservers(w http.ResponseWriter, r *http.Request) { oneHourAgo := time.Now().Add(-1 * time.Hour).Unix() pktCounts := s.db.GetObserverPacketCounts(oneHourAgo) - // Batch lookup: node locations (observer ID may match a node public_key) - nodeLocations := s.db.GetNodeLocations() + // Batch lookup: node locations only for observer IDs (not all nodes) + observerIDs := make([]string, len(observers)) + for i, o := range observers { + observerIDs[i] = o.ID + } + nodeLocations := s.db.GetNodeLocationsByKeys(observerIDs) result := make([]ObserverResp, 0, len(observers)) for _, o := range observers { diff --git a/cmd/server/routes_test.go b/cmd/server/routes_test.go index f8ae65db..4b0052d9 100644 --- a/cmd/server/routes_test.go +++ b/cmd/server/routes_test.go @@ -1105,6 +1105,63 @@ func TestAnalyticsSubpaths(t *testing.T) { } } +func TestAnalyticsSubpathsBulk(t *testing.T) { + _, router := setupTestServer(t) + + // Valid request with multiple groups. + req := httptest.NewRequest("GET", "/api/analytics/subpaths-bulk?groups=2-2:50,3-3:30,5-8:15", nil) + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + if w.Code != 200 { + t.Fatalf("expected 200, got %d", w.Code) + } + var body map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &body) + results, ok := body["results"].([]interface{}) + if !ok { + t.Fatal("expected results array") + } + if len(results) != 3 { + t.Errorf("expected 3 result groups, got %d", len(results)) + } + // Each result should have subpaths and totalPaths. + for i, r := range results { + rm, ok := r.(map[string]interface{}) + if !ok { + t.Fatalf("result %d not a map", i) + } + if _, ok := rm["subpaths"]; !ok { + t.Errorf("result %d missing subpaths", i) + } + if _, ok := rm["totalPaths"]; !ok { + t.Errorf("result %d missing totalPaths", i) + } + } + + // Missing groups param → error. + req2 := httptest.NewRequest("GET", "/api/analytics/subpaths-bulk", nil) + w2 := httptest.NewRecorder() + router.ServeHTTP(w2, req2) + if w2.Code != 200 { + t.Fatalf("expected 200 with error body, got %d", w2.Code) + } + var errBody map[string]interface{} + json.Unmarshal(w2.Body.Bytes(), &errBody) + if _, ok := errBody["error"]; !ok { + t.Error("expected error field for missing groups param") + } + + // Invalid group format. + req3 := httptest.NewRequest("GET", "/api/analytics/subpaths-bulk?groups=bad", nil) + w3 := httptest.NewRecorder() + router.ServeHTTP(w3, req3) + var errBody3 map[string]interface{} + json.Unmarshal(w3.Body.Bytes(), &errBody3) + if _, ok := errBody3["error"]; !ok { + t.Error("expected error for invalid group format") + } +} + func TestAnalyticsSubpathDetailWithHops(t *testing.T) { _, router := setupTestServer(t) req := httptest.NewRequest("GET", "/api/analytics/subpath-detail?hops=aa,bb", nil) @@ -1170,6 +1227,11 @@ func TestResolveHopsAmbiguous(t *testing.T) { cfg := &Config{Port: 3000} hub := NewHub() srv := NewServer(db, cfg, hub) + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("store.Load failed: %v", err) + } + srv.store = store router := mux.NewRouter() srv.RegisterRoutes(router) @@ -2105,7 +2167,7 @@ tx := &StoreTx{ ID: 9000 + i, RawHex: rawHex, Hash: "testhash" + strconv.Itoa(i), -FirstSeen: "2024-01-01T00:00:00Z", +FirstSeen: time.Now().UTC().Format("2006-01-02T15:04:05.000Z"), PayloadType: &payloadType, DecodedJSON: decoded, } @@ -2151,7 +2213,7 @@ for i, raw := range raws { ID: 8000 + i, RawHex: raw, Hash: "dominant" + strconv.Itoa(i), - FirstSeen: "2024-01-01T00:00:00Z", + FirstSeen: time.Now().UTC().Format("2006-01-02T15:04:05.000Z"), PayloadType: &payloadType, DecodedJSON: decoded, } @@ -2190,12 +2252,13 @@ func TestGetNodeHashSizeInfoLatestWins(t *testing.T) { // 4 historical 1-byte adverts, then 1 recent 2-byte advert (latest). // Mode would pick 1 (majority), but latest-wins should pick 2. raws := []string{raw1byte, raw1byte, raw1byte, raw1byte, raw2byte} + baseTime := time.Now().UTC().Add(-1 * time.Hour) for i, raw := range raws { tx := &StoreTx{ ID: 7000 + i, RawHex: raw, Hash: "latest" + strconv.Itoa(i), - FirstSeen: "2024-01-01T0" + strconv.Itoa(i) + ":00:00Z", + FirstSeen: baseTime.Add(time.Duration(i) * time.Minute).Format("2006-01-02T15:04:05.000Z"), PayloadType: &payloadType, DecodedJSON: decoded, } @@ -2236,12 +2299,13 @@ func TestGetNodeHashSizeInfoIgnoreDirectZeroHop(t *testing.T) { payloadType := 4 raws := []string{rawFlood2B, rawDirect0, rawFlood2B, rawDirect0, rawFlood2B} + baseTime2 := time.Now().UTC().Add(-1 * time.Hour) for i, raw := range raws { tx := &StoreTx{ ID: 9150 + i, RawHex: raw, Hash: "dirignore" + strconv.Itoa(i), - FirstSeen: "2024-01-01T0" + strconv.Itoa(i) + ":00:00Z", + FirstSeen: baseTime2.Add(time.Duration(i) * time.Minute).Format("2006-01-02T15:04:05.000Z"), PayloadType: &payloadType, DecodedJSON: decoded, } @@ -2284,7 +2348,7 @@ func TestGetNodeHashSizeInfoOnlyDirectZeroHopIgnored(t *testing.T) { ID: 9160, RawHex: rawDirect0, Hash: "onlydirect0", - FirstSeen: "2024-01-01T00:00:00Z", + FirstSeen: time.Now().UTC().Format("2006-01-02T15:04:05.000Z"), PayloadType: &payloadType, DecodedJSON: decoded, } @@ -2320,7 +2384,7 @@ func TestGetNodeHashSizeInfoDirectNonZeroHopCounted(t *testing.T) { ID: 9170, RawHex: rawDirectNonZero, Hash: "dirnonzero0", - FirstSeen: "2024-01-01T00:00:00Z", + FirstSeen: time.Now().UTC().Format("2006-01-02T15:04:05.000Z"), PayloadType: &payloadType, DecodedJSON: decoded, } @@ -2355,7 +2419,7 @@ func TestGetNodeHashSizeInfoNoAdverts(t *testing.T) { ID: 6000, RawHex: "0440aabb", Hash: "noadverts0", - FirstSeen: "2024-01-01T00:00:00Z", + FirstSeen: time.Now().UTC().Format("2006-01-02T15:04:05.000Z"), PayloadType: &payloadType, DecodedJSON: `{"pubKey":"` + pk + `"}`, } @@ -2397,7 +2461,7 @@ func TestHashAnalyticsZeroHopAdvert(t *testing.T) { ID: 8000, RawHex: raw, Hash: "zerohop0", - FirstSeen: "2024-01-01T00:00:00Z", + FirstSeen: time.Now().UTC().Format("2006-01-02T15:04:05.000Z"), PayloadType: &payloadType, DecodedJSON: decoded, // No PathJSON → txGetParsedPath returns nil (zero hops) @@ -2451,7 +2515,7 @@ func TestAnalyticsHashSizeSameNameDifferentPubkey(t *testing.T) { ID: 6100 + i, RawHex: raw2byte, Hash: "samename" + strconv.Itoa(i), - FirstSeen: "2024-01-01T00:00:00Z", + FirstSeen: time.Now().UTC().Format("2006-01-02T15:04:05.000Z"), PayloadType: &payloadType, DecodedJSON: decoded, PathJSON: `["AABB"]`, @@ -2491,6 +2555,158 @@ t.Errorf("field %q is null, expected []", field) } } } +func TestInconsistentNodesExcludesCompanions(t *testing.T) { + // Issue #566: inconsistentNodes should only include repeaters and room servers. + db := setupTestDB(t) + seedTestData(t, db) + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("store.Load failed: %v", err) + } + + now := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") + payloadType := 4 + + // Create three nodes: repeater, room_server, companion — all with inconsistent hash sizes + nodes := []struct { + pk string + role string + }{ + {"aa11111111111111111111111111111111111111111111111111111111111111", "repeater"}, + {"bb22222222222222222222222222222222222222222222222222222222222222", "room_server"}, + {"cc33333333333333333333333333333333333333333333333333333333333333", "companion"}, + } + + for ni, n := range nodes { + db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, ?, ?)", n.pk, "Node-"+n.role, n.role) + decoded := `{"name":"Node-` + n.role + `","pubKey":"` + n.pk + `"}` + // Create flip-flop pattern: 1-byte, 2-byte, 1-byte (transitions=2 → inconsistent) + // Use header 0x11 (routeType=FLOOD, payloadType=4) and pathByte 0x41/0x81 + // (non-zero hop count) so packets aren't skipped by direct zero-hop filter. + raws := []string{"11" + "41" + "aabb", "11" + "81" + "aabb", "11" + "41" + "aabb"} + for i, raw := range raws { + tx := &StoreTx{ + ID: 7000 + ni*10 + i, + RawHex: raw, + Hash: "incon-" + n.role + strconv.Itoa(i), + FirstSeen: now, + PayloadType: &payloadType, + DecodedJSON: decoded, + } + store.packets = append(store.packets, tx) + store.byPayloadType[4] = append(store.byPayloadType[4], tx) + } + } + + cfg := &Config{Port: 3000} + hub := NewHub() + srv := NewServer(db, cfg, hub) + srv.store = store + router := mux.NewRouter() + srv.RegisterRoutes(router) + + req := httptest.NewRequest("GET", "/api/analytics/hash-collisions", nil) + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + + if w.Code != 200 { + t.Fatalf("expected 200, got %d", w.Code) + } + var body map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &body) + + incon := body["inconsistent_nodes"].([]interface{}) + for _, item := range incon { + node := item.(map[string]interface{}) + role := node["role"].(string) + if role == "companion" { + t.Error("companion node should be excluded from inconsistent_nodes") + } + } + + // Repeater and room_server should be present + roles := make(map[string]bool) + for _, item := range incon { + node := item.(map[string]interface{}) + roles[node["role"].(string)] = true + } + if !roles["repeater"] { + t.Error("expected repeater in inconsistent_nodes") + } + if !roles["room_server"] { + t.Error("expected room_server in inconsistent_nodes") + } +} + +func TestHashSizeInfoTimeWindow(t *testing.T) { + // Issue #566: adverts older than 7 days should be excluded from hash size computation. + db := setupTestDB(t) + seedTestData(t, db) + store := NewPacketStore(db, nil) + if err := store.Load(); err != nil { + t.Fatalf("store.Load failed: %v", err) + } + + pk := "dd44444444444444444444444444444444444444444444444444444444444444" + db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'OldNode', 'repeater')", pk) + + decoded := `{"name":"OldNode","pubKey":"` + pk + `"}` + payloadType := 4 + + // Old adverts (>7 days ago) with flip-flop pattern + // Use header 0x11 (routeType=FLOOD) and pathByte 0x41/0x81 (non-zero hop count) + // so packets aren't skipped by direct zero-hop filter. + oldTime := time.Now().UTC().Add(-10 * 24 * time.Hour).Format("2006-01-02T15:04:05.000Z") + oldRaws := []string{"11" + "41" + "aabb", "11" + "81" + "aabb", "11" + "41" + "aabb"} + for i, raw := range oldRaws { + tx := &StoreTx{ + ID: 6000 + i, + RawHex: raw, + Hash: "old-" + strconv.Itoa(i), + FirstSeen: oldTime, + PayloadType: &payloadType, + DecodedJSON: decoded, + } + store.packets = append(store.packets, tx) + store.byPayloadType[4] = append(store.byPayloadType[4], tx) + } + + info := store.GetNodeHashSizeInfo() + ni := info[pk] + if ni != nil && ni.Inconsistent { + t.Error("old adverts (>7 days) should be excluded; node should not be flagged as inconsistent") + } + + // Now add recent adverts with consistent hash size — should appear in info + pk2 := "ee55555555555555555555555555555555555555555555555555555555555555" + db.conn.Exec("INSERT OR IGNORE INTO nodes (public_key, name, role) VALUES (?, 'NewNode', 'repeater')", pk2) + decoded2 := `{"name":"NewNode","pubKey":"` + pk2 + `"}` + recentTime := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") + for i := 0; i < 3; i++ { + tx := &StoreTx{ + ID: 6100 + i, + RawHex: "11" + "41" + "aabb", + Hash: "new-" + strconv.Itoa(i), + FirstSeen: recentTime, + PayloadType: &payloadType, + DecodedJSON: decoded2, + } + store.packets = append(store.packets, tx) + store.byPayloadType[4] = append(store.byPayloadType[4], tx) + } + + // Invalidate cache before second call + store.hashSizeInfoMu.Lock() + store.hashSizeInfoCache = nil + store.hashSizeInfoMu.Unlock() + + info2 := store.GetNodeHashSizeInfo() + ni2 := info2[pk2] + if ni2 == nil { + t.Error("recent adverts should be included in hash size info") + } +} + func TestObserverAnalyticsNoStore(t *testing.T) { _, router := setupNoStoreServer(t) req := httptest.NewRequest("GET", "/api/observers/obs1/analytics", nil) @@ -3277,3 +3493,93 @@ func TestHashCollisionsOnlyRepeaters(t *testing.T) { t.Errorf("expected 2 nodes in collision, got %d", len(collisions[0].Nodes)) } } + +func TestNodePathsEndpointUsesIndex(t *testing.T) { + srv, router := setupTestServer(t) + + // Verify byPathHop index was built during Load + srv.store.mu.RLock() + hopKeys := len(srv.store.byPathHop) + srv.store.mu.RUnlock() + if hopKeys == 0 { + t.Fatal("byPathHop index is empty after Load") + } + + // Query paths for TestRepeater (pubkey aabbccdd11223344, prefix "aa") + // Should find transmissions with hop "aa" in path + req := httptest.NewRequest("GET", "/api/nodes/aabbccdd11223344/paths", nil) + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + + if w.Code != 200 { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp struct { + Paths []json.RawMessage `json:"paths"` + TotalTransmissions int `json:"totalTransmissions"` + } + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("bad JSON: %v", err) + } + + // Transmission 1 has path ["aa","bb"] which contains "aa" matching prefix of aabbccdd11223344 + if resp.TotalTransmissions == 0 { + t.Error("expected at least 1 transmission matching node paths") + } + if len(resp.Paths) == 0 { + t.Error("expected at least 1 path group") + } +} + +func TestPathHopIndexIncrementalUpdate(t *testing.T) { + // Test that addTxToPathHopIndex and removeTxFromPathHopIndex work correctly + idx := make(map[string][]*StoreTx) + + pk1 := "fullpubkey1" + tx1 := &StoreTx{ + ID: 1, + PathJSON: `["ab","cd"]`, + ResolvedPath: []*string{&pk1, nil}, + } + + addTxToPathHopIndex(idx, tx1) + + // Should be indexed under "ab", "cd", and "fullpubkey1" + if len(idx["ab"]) != 1 { + t.Errorf("expected 1 entry for 'ab', got %d", len(idx["ab"])) + } + if len(idx["cd"]) != 1 { + t.Errorf("expected 1 entry for 'cd', got %d", len(idx["cd"])) + } + if len(idx["fullpubkey1"]) != 1 { + t.Errorf("expected 1 entry for resolved pubkey, got %d", len(idx["fullpubkey1"])) + } + + // Add another tx with overlapping hop + tx2 := &StoreTx{ + ID: 2, + PathJSON: `["ab","ef"]`, + } + addTxToPathHopIndex(idx, tx2) + + if len(idx["ab"]) != 2 { + t.Errorf("expected 2 entries for 'ab', got %d", len(idx["ab"])) + } + if len(idx["ef"]) != 1 { + t.Errorf("expected 1 entry for 'ef', got %d", len(idx["ef"])) + } + + // Remove tx1 + removeTxFromPathHopIndex(idx, tx1) + + if len(idx["ab"]) != 1 { + t.Errorf("expected 1 entry for 'ab' after removal, got %d", len(idx["ab"])) + } + if _, ok := idx["cd"]; ok { + t.Error("expected 'cd' key to be deleted after removal") + } + if _, ok := idx["fullpubkey1"]; ok { + t.Error("expected resolved pubkey key to be deleted after removal") + } +} diff --git a/cmd/server/store.go b/cmd/server/store.go index 0c5da5eb..334d8242 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "math" + "runtime" "sort" "strconv" "strings" @@ -40,14 +41,16 @@ type StoreTx struct { PathJSON string Direction string ResolvedPath []*string // resolved path from best observation - LatestSeen string // max observation timestamp (or FirstSeen if no observations) + LatestSeen string // max observation timestamp (or FirstSeen if no observations) + UniqueObserverCount int // cached count of distinct observer IDs // Cached parsed fields (set once, read many) parsedPath []string // cached parsePathJSON result pathParsed bool // whether parsedPath has been set decodedOnce sync.Once // guards parsedDecoded parsedDecoded map[string]interface{} // cached json.Unmarshal of DecodedJSON // Dedup map: "observerID|pathJSON" → true for O(1) duplicate checks - obsKeys map[string]bool + obsKeys map[string]bool + observerSet map[string]bool // unique observer IDs (for UniqueObserverCount) } // StoreObs is a lean in-memory observation (no duplication of transmission fields). @@ -76,10 +79,6 @@ func (tx *StoreTx) ParsedDecoded() map[string]interface{} { return tx.parsedDecoded } -// distRebuildInterval is the minimum time between distance index rebuilds -// to avoid hot-looping on busy meshes. -const distRebuildInterval = 30 * time.Second - // PacketStore holds all transmissions in memory with indexes for fast queries. type PacketStore struct { mu sync.RWMutex @@ -88,9 +87,12 @@ type PacketStore struct { byHash map[string]*StoreTx // hash → *StoreTx byTxID map[int]*StoreTx // transmission_id → *StoreTx byObsID map[int]*StoreObs // observation_id → *StoreObs + maxTxID int // highest transmission_id in store + maxObsID int // highest observation_id in store byObserver map[string][]*StoreObs // observer_id → observations byNode map[string][]*StoreTx // pubkey → transmissions nodeHashes map[string]map[string]bool // pubkey → Set + byPathHop map[string][]*StoreTx // lowercase hop/pubkey → transmissions with that hop in path byPayloadType map[int][]*StoreTx // payload_type → transmissions loaded bool totalObs int @@ -114,15 +116,20 @@ type PacketStore struct { pendingInv *cacheInvalidation // accumulated dirty flags during cooldown invCooldown time.Duration // minimum time between invalidations // Short-lived cache for QueryGroupedPackets (avoids repeated full sort) - groupedCacheMu sync.Mutex - groupedCacheKey string - groupedCacheExp time.Time - groupedCacheRes *PacketResult + groupedCacheMu sync.Mutex + groupedCacheKey string + groupedCacheExp time.Time + groupedCacheTxs []*StoreTx // sorted by LatestSeen DESC + groupedCacheTotal int // Short-lived cache for GetChannels (avoids repeated full scan + JSON unmarshal) channelsCacheMu sync.Mutex channelsCacheKey string channelsCacheExp time.Time channelsCacheRes []map[string]interface{} + // Cached region → observer ID mapping (30s TTL, avoids repeated DB queries) + regionObsMu sync.Mutex + regionObsCache map[string]map[string]bool + regionObsCacheTime time.Time // Cached node list + prefix map (rebuilt on demand, shared across analytics) nodeCache []nodeInfo nodePM *prefixMap @@ -130,14 +137,13 @@ type PacketStore struct { // Precomputed subpath index: raw comma-joined hops → occurrence count. // Built during Load(), incrementally updated on ingest. Avoids full // packet iteration at query time (O(unique_subpaths) vs O(total_packets)). - spIndex map[string]int // "hop1,hop2" → count - spTotalPaths int // transmissions with paths >= 2 hops + spIndex map[string]int // "hop1,hop2" → count + spTxIndex map[string][]*StoreTx // "hop1,hop2" → transmissions containing this subpath + spTotalPaths int // transmissions with paths >= 2 hops // Precomputed distance analytics: hop distances and path totals // computed during Load() and incrementally updated on ingest. distHops []distHopRecord distPaths []distPathRecord - distDirty bool // set when paths change; cleared after rebuild - distLast time.Time // last time distance index was rebuilt // Cached GetNodeHashSizeInfo result — recomputed at most once every 15s hashSizeInfoMu sync.Mutex @@ -152,9 +158,10 @@ type PacketStore struct { graph *NeighborGraph // Eviction config and stats - retentionHours float64 // 0 = unlimited - maxMemoryMB int // 0 = unlimited - evicted int64 // total packets evicted + retentionHours float64 // 0 = unlimited + maxMemoryMB int // 0 = unlimited + evicted int64 // total packets evicted + memoryEstimator func() float64 // injectable for tests; nil = use runtime.ReadMemStats } // Precomputed distance records for fast analytics aggregation. @@ -204,6 +211,7 @@ func NewPacketStore(db *DB, cfg *PacketStoreConfig) *PacketStore { byObsID: make(map[int]*StoreObs, 65536), byObserver: make(map[string][]*StoreObs), byNode: make(map[string][]*StoreTx), + byPathHop: make(map[string][]*StoreTx), nodeHashes: make(map[string]map[string]bool), byPayloadType: make(map[int][]*StoreTx), rfCache: make(map[string]*cachedResult), @@ -218,6 +226,7 @@ func NewPacketStore(db *DB, cfg *PacketStoreConfig) *PacketStore { collisionCacheTTL: 60 * time.Second, invCooldown: 10 * time.Second, spIndex: make(map[string]int, 4096), + spTxIndex: make(map[string][]*StoreTx, 4096), advertPubkeys: make(map[string]int), } if cfg != nil { @@ -299,10 +308,14 @@ func (s *PacketStore) Load() error { PayloadType: nullIntPtr(payloadType), DecodedJSON: nullStrVal(decodedJSON), obsKeys: make(map[string]bool), + observerSet: make(map[string]bool), } s.byHash[hashStr] = tx s.packets = append(s.packets, tx) s.byTxID[txID] = tx + if txID > s.maxTxID { + s.maxTxID = txID + } s.indexByNode(tx) if tx.PayloadType != nil { pt := *tx.PayloadType @@ -338,12 +351,19 @@ func (s *PacketStore) Load() error { tx.Observations = append(tx.Observations, obs) tx.obsKeys[dk] = true + if obs.ObserverID != "" && !tx.observerSet[obs.ObserverID] { + tx.observerSet[obs.ObserverID] = true + tx.UniqueObserverCount++ + } tx.ObservationCount++ if obs.Timestamp > tx.LatestSeen { tx.LatestSeen = obs.Timestamp } s.byObsID[oid] = obs + if oid > s.maxObsID { + s.maxObsID = oid + } if obsIDStr != "" { s.byObserver[obsIDStr] = append(s.byObserver[obsIDStr], obs) @@ -361,15 +381,16 @@ func (s *PacketStore) Load() error { // Build precomputed subpath index for O(1) analytics queries s.buildSubpathIndex() + // Build path-hop index for O(1) node path lookups + s.buildPathHopIndex() + // Precompute distance analytics (hop distances, path totals) s.buildDistanceIndex() - s.distLast = time.Now() s.loaded = true elapsed := time.Since(t0) - estMB := (len(s.packets)*5120 + s.totalObs*500) / (1024 * 1024) - log.Printf("[store] Loaded %d transmissions (%d observations) in %v (~%dMB est)", - len(s.packets), s.totalObs, elapsed, estMB) + log.Printf("[store] Loaded %d transmissions (%d observations) in %v (heap ~%.0fMB)", + len(s.packets), s.totalObs, elapsed, s.estimatedMemoryMB()) return nil } @@ -513,7 +534,7 @@ func (s *PacketStore) QueryPackets(q PacketQuery) *PacketResult { packets := make([]map[string]interface{}, 0, pageSize) if q.Order == "ASC" { for _, tx := range results[start : start+pageSize] { - packets = append(packets, txToMap(tx)) + packets = append(packets, txToMap(tx, q.ExpandObservations)) } } else { // DESC: newest items are at the tail; page 0 = last pageSize items reversed @@ -523,7 +544,7 @@ func (s *PacketStore) QueryPackets(q PacketQuery) *PacketResult { startIdx = 0 } for i := endIdx - 1; i >= startIdx; i-- { - packets = append(packets, txToMap(results[i])) + packets = append(packets, txToMap(results[i], q.ExpandObservations)) } } return &PacketResult{Packets: packets, Total: total} @@ -548,77 +569,37 @@ func (s *PacketStore) QueryGroupedPackets(q PacketQuery) *PacketResult { // Return cached sorted list if still fresh (3s TTL) s.groupedCacheMu.Lock() - if s.groupedCacheRes != nil && s.groupedCacheKey == cacheKey && time.Now().Before(s.groupedCacheExp) { - cached := s.groupedCacheRes + if s.groupedCacheTxs != nil && s.groupedCacheKey == cacheKey && time.Now().Before(s.groupedCacheExp) { + cachedTxs := s.groupedCacheTxs + cachedTotal := s.groupedCacheTotal s.groupedCacheMu.Unlock() - return pagePacketResult(cached, q.Offset, q.Limit) + return groupedTxsToPage(cachedTxs, cachedTotal, q.Offset, q.Limit) } s.groupedCacheMu.Unlock() - // Build entries under read lock (observer scan needs lock), sort outside it. - type groupEntry struct { - latest map[string]interface{} - ts string - } - var entries []groupEntry - + // Collect StoreTx pointers under read lock; sort outside it. s.mu.RLock() results := s.filterPackets(q) - entries = make([]groupEntry, 0, len(results)) - for _, tx := range results { - observerCount := 0 - seen := make(map[string]bool) - for _, obs := range tx.Observations { - if obs.ObserverID != "" && !seen[obs.ObserverID] { - seen[obs.ObserverID] = true - observerCount++ - } - } - entries = append(entries, groupEntry{ - ts: tx.LatestSeen, - latest: map[string]interface{}{ - "hash": strOrNil(tx.Hash), - "first_seen": strOrNil(tx.FirstSeen), - "count": tx.ObservationCount, - "observer_count": observerCount, - "observation_count": tx.ObservationCount, - "latest": strOrNil(tx.LatestSeen), - "observer_id": strOrNil(tx.ObserverID), - "observer_name": strOrNil(tx.ObserverName), - "path_json": strOrNil(tx.PathJSON), - "payload_type": intPtrOrNil(tx.PayloadType), - "route_type": intPtrOrNil(tx.RouteType), - "raw_hex": strOrNil(tx.RawHex), - "decoded_json": strOrNil(tx.DecodedJSON), - "snr": floatPtrOrNil(tx.SNR), - "rssi": floatPtrOrNil(tx.RSSI), - }, - }) - if tx.ResolvedPath != nil { - entries[len(entries)-1].latest["resolved_path"] = tx.ResolvedPath - } - } + txs := make([]*StoreTx, len(results)) + copy(txs, results) s.mu.RUnlock() - // Sort outside the lock — only touches our local slice. - sort.Slice(entries, func(i, j int) bool { - return entries[i].ts > entries[j].ts - }) - - packets := make([]map[string]interface{}, len(entries)) - for i, e := range entries { - packets[i] = e.latest - } + total := len(txs) - full := &PacketResult{Packets: packets, Total: len(packets)} + // Full sort by LatestSeen DESC so the cached slice supports all page offsets. + sort.Slice(txs, func(i, j int) bool { + return txs[i].LatestSeen > txs[j].LatestSeen + }) + // Cache the sorted StoreTx slice (not maps) — lightweight and reusable for any page. s.groupedCacheMu.Lock() - s.groupedCacheRes = full + s.groupedCacheTxs = txs + s.groupedCacheTotal = total s.groupedCacheKey = cacheKey s.groupedCacheExp = time.Now().Add(3 * time.Second) s.groupedCacheMu.Unlock() - return pagePacketResult(full, q.Offset, q.Limit) + return groupedTxsToPage(txs, total, q.Offset, q.Limit) } // pagePacketResult returns a window of a PacketResult without re-allocating the slice. @@ -634,6 +615,46 @@ func pagePacketResult(r *PacketResult, offset, limit int) *PacketResult { return &PacketResult{Packets: r.Packets[offset:end], Total: total} } +// groupedTxsToPage builds map representations only for the requested page of sorted StoreTx pointers. +// This avoids allocating maps for all 30K+ transmissions when only 50 are needed. +func groupedTxsToPage(txs []*StoreTx, total, offset, limit int) *PacketResult { + if offset >= len(txs) { + return &PacketResult{Packets: []map[string]interface{}{}, Total: total} + } + end := offset + limit + if end > len(txs) { + end = len(txs) + } + page := txs[offset:end] + + packets := make([]map[string]interface{}, len(page)) + for i, tx := range page { + m := map[string]interface{}{ + "hash": strOrNil(tx.Hash), + "first_seen": strOrNil(tx.FirstSeen), + "count": tx.ObservationCount, + "observer_count": tx.UniqueObserverCount, + "observation_count": tx.ObservationCount, + "latest": strOrNil(tx.LatestSeen), + "observer_id": strOrNil(tx.ObserverID), + "observer_name": strOrNil(tx.ObserverName), + "path_json": strOrNil(tx.PathJSON), + "payload_type": intPtrOrNil(tx.PayloadType), + "route_type": intPtrOrNil(tx.RouteType), + "raw_hex": strOrNil(tx.RawHex), + "decoded_json": strOrNil(tx.DecodedJSON), + "snr": floatPtrOrNil(tx.SNR), + "rssi": floatPtrOrNil(tx.RSSI), + } + if tx.ResolvedPath != nil { + m["resolved_path"] = tx.ResolvedPath + } + packets[i] = m + } + + return &PacketResult{Packets: packets, Total: total} +} + // GetStoreStats returns aggregate counts (packet data from memory, node/observer from DB). func (s *PacketStore) GetStoreStats() (*Stats, error) { s.mu.RLock() @@ -648,15 +669,42 @@ func (s *PacketStore) GetStoreStats() (*Stats, error) { } sevenDaysAgo := time.Now().Add(-7 * 24 * time.Hour).Format(time.RFC3339) - s.db.conn.QueryRow("SELECT COUNT(*) FROM nodes WHERE last_seen > ?", sevenDaysAgo).Scan(&st.TotalNodes) - s.db.conn.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&st.TotalNodesAllTime) - s.db.conn.QueryRow("SELECT COUNT(*) FROM observers").Scan(&st.TotalObservers) - oneHourAgo := time.Now().Add(-1 * time.Hour).Unix() - s.db.conn.QueryRow("SELECT COUNT(*) FROM observations WHERE timestamp > ?", oneHourAgo).Scan(&st.PacketsLastHour) - oneDayAgo := time.Now().Add(-24 * time.Hour).Unix() - s.db.conn.QueryRow("SELECT COUNT(*) FROM observations WHERE timestamp > ?", oneDayAgo).Scan(&st.PacketsLast24h) + + // Run node/observer counts and observation counts concurrently (2 queries instead of 5). + var wg sync.WaitGroup + var nodeErr, obsErr error + + wg.Add(2) + go func() { + defer wg.Done() + nodeErr = s.db.conn.QueryRow( + `SELECT + (SELECT COUNT(*) FROM nodes WHERE last_seen > ?) AS active_nodes, + (SELECT COUNT(*) FROM nodes) AS all_nodes, + (SELECT COUNT(*) FROM observers) AS observers`, + sevenDaysAgo, + ).Scan(&st.TotalNodes, &st.TotalNodesAllTime, &st.TotalObservers) + }() + go func() { + defer wg.Done() + obsErr = s.db.conn.QueryRow( + `SELECT + COALESCE(SUM(CASE WHEN timestamp > ? THEN 1 ELSE 0 END), 0), + COALESCE(SUM(CASE WHEN timestamp > ? THEN 1 ELSE 0 END), 0) + FROM observations WHERE timestamp > ?`, + oneHourAgo, oneDayAgo, oneDayAgo, + ).Scan(&st.PacketsLastHour, &st.PacketsLast24h) + }() + wg.Wait() + + if nodeErr != nil { + return st, nodeErr + } + if obsErr != nil { + return st, obsErr + } return st, nil } @@ -671,14 +719,14 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} { obsIdx := len(s.byObsID) observerIdx := len(s.byObserver) nodeIdx := len(s.byNode) + pathHopIdx := len(s.byPathHop) ptIdx := len(s.byPayloadType) // Distinct advert pubkey count — precomputed incrementally (see trackAdvertPubkey). advertByObsCount := len(s.advertPubkeys) s.mu.RUnlock() - // Realistic estimate: ~5KB per packet + ~500 bytes per observation - estimatedMB := math.Round(float64(totalLoaded*5120+totalObs*500)/1048576*10) / 10 + estimatedMB := math.Round(s.estimatedMemoryMB()*10) / 10 evicted := atomic.LoadInt64(&s.evicted) @@ -699,6 +747,7 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} { "byObsID": obsIdx, "byObserver": observerIdx, "byNode": nodeIdx, + "byPathHop": pathHopIdx, "byPayloadType": ptIdx, "advertByObserver": advertByObsCount, }, @@ -848,7 +897,7 @@ func (s *PacketStore) GetPerfStoreStatsTyped() PerfPacketStoreStats { advertByObsCount := len(s.advertPubkeys) s.mu.RUnlock() - estimatedMB := math.Round(float64(totalLoaded*5120+totalObs*500)/1048576*10) / 10 + estimatedMB := math.Round(s.estimatedMemoryMB()*10) / 10 return PerfPacketStoreStats{ TotalLoaded: totalLoaded, @@ -879,7 +928,7 @@ func (s *PacketStore) GetTransmissionByID(id int) map[string]interface{} { if tx == nil { return nil } - return txToMap(tx) + return txToMap(tx, true) } // GetPacketByHash returns a transmission by content hash. @@ -891,7 +940,7 @@ func (s *PacketStore) GetPacketByHash(hash string) map[string]interface{} { if tx == nil { return nil } - return txToMap(tx) + return txToMap(tx, true) } // GetPacketByID returns an observation (enriched with transmission fields) by observation ID. @@ -961,29 +1010,28 @@ func (s *PacketStore) QueryMultiNodePackets(pubkeys []string, limit, offset int, resolved[i] = s.db.resolveNodePubkey(pk) } + // Use byNode index instead of scanning all packets (O(indexed) vs O(all×pubkeys×json)). + hashSet := make(map[string]bool) var filtered []*StoreTx - for _, tx := range s.packets { - if tx.DecodedJSON == "" { - continue - } - match := false - for _, pk := range resolved { - if strings.Contains(tx.DecodedJSON, pk) { - match = true - break + for _, pk := range resolved { + for _, tx := range s.byNode[pk] { + if hashSet[tx.Hash] { + continue } + if since != "" && tx.FirstSeen < since { + continue + } + if until != "" && tx.FirstSeen > until { + continue + } + hashSet[tx.Hash] = true + filtered = append(filtered, tx) } - if !match { - continue - } - if since != "" && tx.FirstSeen < since { - continue - } - if until != "" && tx.FirstSeen > until { - continue - } - filtered = append(filtered, tx) } + // Sort oldest-first to match pagination expectations (same as s.packets order). + sort.Slice(filtered, func(i, j int) bool { + return filtered[i].FirstSeen < filtered[j].FirstSeen + }) total := len(filtered) @@ -1150,10 +1198,14 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac PayloadType: r.payloadType, DecodedJSON: r.decodedJSON, obsKeys: make(map[string]bool), + observerSet: make(map[string]bool), } s.byHash[r.hash] = tx s.packets = append(s.packets, tx) // oldest-first; new items go to tail s.byTxID[r.txID] = tx + if r.txID > s.maxTxID { + s.maxTxID = r.txID + } s.indexByNode(tx) if tx.PayloadType != nil { pt := *tx.PayloadType @@ -1175,6 +1227,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac dk := r.observerID + "|" + r.pathJSON if tx.obsKeys == nil { tx.obsKeys = make(map[string]bool) + tx.observerSet = make(map[string]bool) } if tx.obsKeys[dk] { continue @@ -1201,11 +1254,18 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac tx.Observations = append(tx.Observations, obs) tx.obsKeys[dk] = true + if obs.ObserverID != "" && !tx.observerSet[obs.ObserverID] { + tx.observerSet[obs.ObserverID] = true + tx.UniqueObserverCount++ + } tx.ObservationCount++ if obs.Timestamp > tx.LatestSeen { tx.LatestSeen = obs.Timestamp } s.byObsID[oid] = obs + if oid > s.maxObsID { + s.maxObsID = oid + } if r.observerID != "" { s.byObserver[r.observerID] = append(s.byObserver[r.observerID], obs) } @@ -1220,9 +1280,10 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac // Incrementally update precomputed subpath index with new transmissions for _, tx := range broadcastTxs { - if addTxToSubpathIndex(s.spIndex, tx) { + if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) { s.spTotalPaths++ } + addTxToPathHopIndex(s.byPathHop, tx) } // Incrementally update precomputed distance index with new transmissions @@ -1465,6 +1526,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] dk := r.observerID + "|" + r.pathJSON if tx.obsKeys == nil { tx.obsKeys = make(map[string]bool) + tx.observerSet = make(map[string]bool) } if tx.obsKeys[dk] { continue @@ -1492,12 +1554,19 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] tx.Observations = append(tx.Observations, obs) tx.obsKeys[dk] = true + if obs.ObserverID != "" && !tx.observerSet[obs.ObserverID] { + tx.observerSet[obs.ObserverID] = true + tx.UniqueObserverCount++ + } tx.ObservationCount++ newObs = append(newObs, obs) if obs.Timestamp > tx.LatestSeen { tx.LatestSeen = obs.Timestamp } s.byObsID[r.obsID] = obs + if r.obsID > s.maxObsID { + s.maxObsID = r.obsID + } if r.observerID != "" { s.byObserver[r.observerID] = append(s.byObserver[r.observerID], obs) } @@ -1548,8 +1617,10 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] // Re-pick best observation for updated transmissions and update subpath index // if the path changed. oldPaths := make(map[int]string, len(updatedTxs)) + oldResolvedPaths := make(map[int][]*string, len(updatedTxs)) for txID, tx := range updatedTxs { oldPaths[txID] = tx.PathJSON + oldResolvedPaths[txID] = tx.ResolvedPath } for _, tx := range updatedTxs { pickBestObservation(tx) @@ -1562,36 +1633,41 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] // Temporarily set parsedPath to old hops for removal. saved, savedFlag := tx.parsedPath, tx.pathParsed tx.parsedPath, tx.pathParsed = oldHops, true - if removeTxFromSubpathIndex(s.spIndex, tx) { + if removeTxFromSubpathIndexFull(s.spIndex, s.spTxIndex, tx) { s.spTotalPaths-- } tx.parsedPath, tx.pathParsed = saved, savedFlag } + // Remove old path-hop index entries using old hops + old resolved path. + if len(oldHops) > 0 { + saved, savedFlag := tx.parsedPath, tx.pathParsed + savedRP := tx.ResolvedPath + tx.parsedPath, tx.pathParsed = oldHops, true + tx.ResolvedPath = oldResolvedPaths[txID] + removeTxFromPathHopIndex(s.byPathHop, tx) + tx.parsedPath, tx.pathParsed = saved, savedFlag + tx.ResolvedPath = savedRP + } // pickBestObservation already set pathParsed=false so // addTxToSubpathIndex will re-parse the new path. - if addTxToSubpathIndex(s.spIndex, tx) { + if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) { s.spTotalPaths++ } + addTxToPathHopIndex(s.byPathHop, tx) } } - // Check if any paths changed (used for both distance rebuild and cache invalidation). + // Check if any paths changed (used for distance update and cache invalidation). hasPathChanges := false + var changedTxs []*StoreTx for txID, tx := range updatedTxs { if tx.PathJSON != oldPaths[txID] { hasPathChanges = true - break + changedTxs = append(changedTxs, tx) } } - - // Mark distance index dirty if any paths changed (rebuild is debounced) - if hasPathChanges { - s.distDirty = true - } - if s.distDirty && time.Since(s.distLast) > distRebuildInterval { - s.buildDistanceIndex() - s.distDirty = false - s.distLast = time.Now() + if len(changedTxs) > 0 { + s.updateDistanceIndexForTxs(changedTxs) } if len(updatedTxs) > 0 { @@ -1641,28 +1717,14 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string] func (s *PacketStore) MaxTransmissionID() int { s.mu.RLock() defer s.mu.RUnlock() - - maxID := 0 - for id := range s.byTxID { - if id > maxID { - maxID = id - } - } - return maxID + return s.maxTxID } // MaxObservationID returns the highest observation ID in the store. func (s *PacketStore) MaxObservationID() int { s.mu.RLock() defer s.mu.RUnlock() - - maxID := 0 - for id := range s.byObsID { - if id > maxID { - maxID = id - } - } - return maxID + return s.maxObsID } // --- Internal filter/query helpers --- @@ -1684,68 +1746,114 @@ func (s *PacketStore) filterPackets(q PacketQuery) []*StoreTx { return s.transmissionsForObserver(q.Observer, nil) } - results := s.packets - + // Pre-compute filter parameters outside the hot loop. + var ( + filterType int + hasType bool + filterRoute int + hasRoute bool + filterHash string + hasSince = q.Since != "" + hasUntil = q.Until != "" + ) if q.Type != nil { - t := *q.Type - results = filterTxSlice(results, func(tx *StoreTx) bool { - return tx.PayloadType != nil && *tx.PayloadType == t - }) + hasType = true + filterType = *q.Type } if q.Route != nil { - r := *q.Route - results = filterTxSlice(results, func(tx *StoreTx) bool { - return tx.RouteType != nil && *tx.RouteType == r - }) + hasRoute = true + filterRoute = *q.Route } + if q.Hash != "" { + filterHash = strings.ToLower(q.Hash) + } + + // Pre-compute observer set for observer filter. + var observerSet map[string]bool if q.Observer != "" { - results = s.transmissionsForObserver(q.Observer, results) + ids := strings.Split(q.Observer, ",") + observerSet = make(map[string]bool, len(ids)) + for _, id := range ids { + observerSet[strings.TrimSpace(id)] = true + } } - if q.Hash != "" { - h := strings.ToLower(q.Hash) - results = filterTxSlice(results, func(tx *StoreTx) bool { - return tx.Hash == h - }) + + // Pre-compute region observer set. + var regionObservers map[string]bool + if q.Region != "" { + regionObservers = s.resolveRegionObservers(q.Region) + if len(regionObservers) == 0 { + return nil + } } - if q.Since != "" { - results = filterTxSlice(results, func(tx *StoreTx) bool { - return tx.FirstSeen > q.Since - }) + + // Pre-compute node filter parameters. + var nodePK string + hasNode := q.Node != "" + if hasNode { + nodePK = s.db.resolveNodePubkey(q.Node) } - if q.Until != "" { - results = filterTxSlice(results, func(tx *StoreTx) bool { - return tx.FirstSeen < q.Until - }) + + // Determine the source slice. Use index-based source when only node + // filter is active and an index exists. + source := s.packets + if hasNode && !hasType && !hasRoute && q.Observer == "" && + filterHash == "" && !hasSince && !hasUntil && q.Region == "" { + if indexed, ok := s.byNode[nodePK]; ok { + return indexed + } } - if q.Region != "" { - regionObservers := s.resolveRegionObservers(q.Region) - if len(regionObservers) > 0 { - results = filterTxSlice(results, func(tx *StoreTx) bool { - for _, obs := range tx.Observations { - if regionObservers[obs.ObserverID] { - return true - } + // Single-pass filter: apply all predicates in one scan. + results := filterTxSlice(source, func(tx *StoreTx) bool { + if hasType && (tx.PayloadType == nil || *tx.PayloadType != filterType) { + return false + } + if hasRoute && (tx.RouteType == nil || *tx.RouteType != filterRoute) { + return false + } + if filterHash != "" && tx.Hash != filterHash { + return false + } + if hasSince && tx.FirstSeen <= q.Since { + return false + } + if hasUntil && tx.FirstSeen >= q.Until { + return false + } + if observerSet != nil { + found := false + for _, obs := range tx.Observations { + if observerSet[obs.ObserverID] { + found = true + break } + } + if !found { return false - }) - } else { - results = nil + } } - } - if q.Node != "" { - pk := s.db.resolveNodePubkey(q.Node) - // Use node index if available - if indexed, ok := s.byNode[pk]; ok && results == nil { - results = indexed - } else { - results = filterTxSlice(results, func(tx *StoreTx) bool { - if tx.DecodedJSON == "" { - return false + if regionObservers != nil { + found := false + for _, obs := range tx.Observations { + if regionObservers[obs.ObserverID] { + found = true + break } - return strings.Contains(tx.DecodedJSON, pk) || strings.Contains(tx.DecodedJSON, q.Node) - }) + } + if !found { + return false + } } - } + if hasNode { + if tx.DecodedJSON == "" { + return false + } + if !strings.Contains(tx.DecodedJSON, nodePK) && !strings.Contains(tx.DecodedJSON, q.Node) { + return false + } + } + return true + }) return results } @@ -1787,15 +1895,42 @@ func (s *PacketStore) transmissionsForObserver(observerIDs string, from []*Store } // resolveRegionObservers returns a set of observer IDs for a given IATA region. +// Results are cached for 30 seconds to avoid repeated DB queries. +// Uses its own mutex (regionObsMu) so callers holding s.mu won't deadlock. func (s *PacketStore) resolveRegionObservers(region string) map[string]bool { + s.regionObsMu.Lock() + defer s.regionObsMu.Unlock() + + if s.regionObsCache != nil && time.Since(s.regionObsCacheTime) < 30*time.Second { + if m, ok := s.regionObsCache[region]; ok { + return m + } + return s.fetchAndCacheRegionObs(region) + } + // Cache expired — rebuild. + s.regionObsCache = make(map[string]map[string]bool) + s.regionObsCacheTime = time.Now() + + // Fetch for the requested region and cache it. + return s.fetchAndCacheRegionObs(region) +} + +// fetchAndCacheRegionObs fetches observer IDs for a region from the DB and stores in cache. +// Caller must hold regionObsMu. +func (s *PacketStore) fetchAndCacheRegionObs(region string) map[string]bool { + if m, ok := s.regionObsCache[region]; ok { + return m + } ids, err := s.db.GetObserverIdsForRegion(region) if err != nil || len(ids) == 0 { + s.regionObsCache[region] = nil return nil } m := make(map[string]bool, len(ids)) for _, id := range ids { m[id] = true } + s.regionObsCache[region] = m return m } @@ -1832,7 +1967,7 @@ func (s *PacketStore) enrichObs(obs *StoreObs) map[string]interface{} { // --- Conversion helpers --- // txToMap converts a StoreTx to the map shape matching scanTransmissionRow output. -func txToMap(tx *StoreTx) map[string]interface{} { +func txToMap(tx *StoreTx, includeObservations ...bool) map[string]interface{} { m := map[string]interface{}{ "id": tx.ID, "raw_hex": strOrNil(tx.RawHex), @@ -1859,25 +1994,27 @@ func txToMap(tx *StoreTx) map[string]interface{} { } else { m["_parsedPath"] = nil } - // Include observations for expand=observations support (stripped by handler when not requested) - obs := make([]map[string]interface{}, 0, len(tx.Observations)) - for _, o := range tx.Observations { - om := map[string]interface{}{ - "id": o.ID, - "observer_id": strOrNil(o.ObserverID), - "observer_name": strOrNil(o.ObserverName), - "snr": floatPtrOrNil(o.SNR), - "rssi": floatPtrOrNil(o.RSSI), - "path_json": strOrNil(o.PathJSON), - "timestamp": strOrNil(o.Timestamp), - "direction": strOrNil(o.Direction), - } - if o.ResolvedPath != nil { - om["resolved_path"] = o.ResolvedPath - } - obs = append(obs, om) - } - m["observations"] = obs + // Only build observation sub-maps when caller requests them (avoids allocations that get stripped) + if len(includeObservations) > 0 && includeObservations[0] { + obs := make([]map[string]interface{}, 0, len(tx.Observations)) + for _, o := range tx.Observations { + om := map[string]interface{}{ + "id": o.ID, + "observer_id": strOrNil(o.ObserverID), + "observer_name": strOrNil(o.ObserverName), + "snr": floatPtrOrNil(o.SNR), + "rssi": floatPtrOrNil(o.RSSI), + "path_json": strOrNil(o.PathJSON), + "timestamp": strOrNil(o.Timestamp), + "direction": strOrNil(o.Direction), + } + if o.ResolvedPath != nil { + om["resolved_path"] = o.ResolvedPath + } + obs = append(obs, om) + } + m["observations"] = obs + } return m } @@ -1954,6 +2091,12 @@ func txGetParsedPath(tx *StoreTx) []string { // increments their counts in the index. Returns true if the tx contributed // (path had ≥ 2 hops). func addTxToSubpathIndex(idx map[string]int, tx *StoreTx) bool { + return addTxToSubpathIndexFull(idx, nil, tx) +} + +// addTxToSubpathIndexFull is like addTxToSubpathIndex but also appends +// tx to txIdx for each subpath key (if txIdx is non-nil). +func addTxToSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx *StoreTx) bool { hops := txGetParsedPath(tx) if len(hops) < 2 { return false @@ -1961,8 +2104,11 @@ func addTxToSubpathIndex(idx map[string]int, tx *StoreTx) bool { maxL := min(8, len(hops)) for l := 2; l <= maxL; l++ { for start := 0; start <= len(hops)-l; start++ { - key := strings.Join(hops[start:start+l], ",") + key := strings.ToLower(strings.Join(hops[start:start+l], ",")) idx[key]++ + if txIdx != nil { + txIdx[key] = append(txIdx[key], tx) + } } } return true @@ -1972,6 +2118,12 @@ func addTxToSubpathIndex(idx map[string]int, tx *StoreTx) bool { // decrements counts for all raw subpaths of tx. Returns true if the tx // had a path. func removeTxFromSubpathIndex(idx map[string]int, tx *StoreTx) bool { + return removeTxFromSubpathIndexFull(idx, nil, tx) +} + +// removeTxFromSubpathIndexFull is like removeTxFromSubpathIndex but also +// removes tx from txIdx for each subpath key (if txIdx is non-nil). +func removeTxFromSubpathIndexFull(idx map[string]int, txIdx map[string][]*StoreTx, tx *StoreTx) bool { hops := txGetParsedPath(tx) if len(hops) < 2 { return false @@ -1979,11 +2131,23 @@ func removeTxFromSubpathIndex(idx map[string]int, tx *StoreTx) bool { maxL := min(8, len(hops)) for l := 2; l <= maxL; l++ { for start := 0; start <= len(hops)-l; start++ { - key := strings.Join(hops[start:start+l], ",") + key := strings.ToLower(strings.Join(hops[start:start+l], ",")) idx[key]-- if idx[key] <= 0 { delete(idx, key) } + if txIdx != nil { + txs := txIdx[key] + for i, t := range txs { + if t == tx { + txIdx[key] = append(txs[:i], txs[i+1:]...) + break + } + } + if len(txIdx[key]) == 0 { + delete(txIdx, key) + } + } } } return true @@ -1993,9 +2157,10 @@ func removeTxFromSubpathIndex(idx map[string]int, tx *StoreTx) bool { // Must be called with s.mu held. func (s *PacketStore) buildSubpathIndex() { s.spIndex = make(map[string]int, 4096) + s.spTxIndex = make(map[string][]*StoreTx, 4096) s.spTotalPaths = 0 for _, tx := range s.packets { - if addTxToSubpathIndex(s.spIndex, tx) { + if addTxToSubpathIndexFull(s.spIndex, s.spTxIndex, tx) { s.spTotalPaths++ } } @@ -2003,6 +2168,138 @@ func (s *PacketStore) buildSubpathIndex() { len(s.spIndex), s.spTotalPaths) } +// buildPathHopIndex scans all packets and populates byPathHop. +// Must be called with s.mu held. +func (s *PacketStore) buildPathHopIndex() { + s.byPathHop = make(map[string][]*StoreTx, 4096) + for _, tx := range s.packets { + addTxToPathHopIndex(s.byPathHop, tx) + } + log.Printf("[store] Built path-hop index: %d unique keys", len(s.byPathHop)) +} + +// addTxToPathHopIndex indexes a transmission under each unique hop key +// (raw lowercase hop + resolved full pubkey from ResolvedPath). +func addTxToPathHopIndex(idx map[string][]*StoreTx, tx *StoreTx) { + hops := txGetParsedPath(tx) + if len(hops) == 0 { + return + } + seen := make(map[string]bool, len(hops)*2) + for i, hop := range hops { + key := strings.ToLower(hop) + if !seen[key] { + seen[key] = true + idx[key] = append(idx[key], tx) + } + // Also index by resolved pubkey if available + if tx.ResolvedPath != nil && i < len(tx.ResolvedPath) && tx.ResolvedPath[i] != nil { + pk := *tx.ResolvedPath[i] + if !seen[pk] { + seen[pk] = true + idx[pk] = append(idx[pk], tx) + } + } + } +} + +// removeTxFromPathHopIndex removes a transmission from all its path-hop index entries. +func removeTxFromPathHopIndex(idx map[string][]*StoreTx, tx *StoreTx) { + hops := txGetParsedPath(tx) + if len(hops) == 0 { + return + } + seen := make(map[string]bool, len(hops)*2) + for i, hop := range hops { + key := strings.ToLower(hop) + if !seen[key] { + seen[key] = true + removeTxFromSlice(idx, key, tx) + } + if tx.ResolvedPath != nil && i < len(tx.ResolvedPath) && tx.ResolvedPath[i] != nil { + pk := *tx.ResolvedPath[i] + if !seen[pk] { + seen[pk] = true + removeTxFromSlice(idx, pk, tx) + } + } + } +} + +// removeTxFromSlice removes tx from idx[key] by ID, deleting the key if empty. +func removeTxFromSlice(idx map[string][]*StoreTx, key string, tx *StoreTx) { + list := idx[key] + for i, t := range list { + if t.ID == tx.ID { + idx[key] = append(list[:i], list[i+1:]...) + break + } + } + if len(idx[key]) == 0 { + delete(idx, key) + } +} + +// updateDistanceIndexForTxs removes old distance records for the given +// transmissions and recomputes them. Builds lookup maps once, amortising the +// cost across all changed txs in a single ingest cycle. Must be called with +// s.mu held. +func (s *PacketStore) updateDistanceIndexForTxs(txs []*StoreTx) { + // Remove old records for all changed txs first. + removeSet := make(map[*StoreTx]bool, len(txs)) + for _, tx := range txs { + removeSet[tx] = true + } + n := 0 + for _, r := range s.distHops { + if !removeSet[r.tx] { + s.distHops[n] = r + n++ + } + } + s.distHops = s.distHops[:n] + n = 0 + for _, r := range s.distPaths { + if !removeSet[r.tx] { + s.distPaths[n] = r + n++ + } + } + s.distPaths = s.distPaths[:n] + + // Build lookup maps once. + allNodes, pm := s.getCachedNodesAndPM() + nodeByPk := make(map[string]*nodeInfo, len(allNodes)) + repeaterSet := make(map[string]bool) + for i := range allNodes { + nd := &allNodes[i] + nodeByPk[nd.PublicKey] = nd + if strings.Contains(strings.ToLower(nd.Role), "repeater") { + repeaterSet[nd.PublicKey] = true + } + } + hopCache := make(map[string]*nodeInfo) + resolveHop := func(hop string) *nodeInfo { + if cached, ok := hopCache[hop]; ok { + return cached + } + r, _, _ := pm.resolveWithContext(hop, nil, s.graph) + hopCache[hop] = r + return r + } + + // Recompute distance records for each changed tx. + for _, tx := range txs { + txHops, txPath := computeDistancesForTx(tx, nodeByPk, repeaterSet, resolveHop) + if len(txHops) > 0 { + s.distHops = append(s.distHops, txHops...) + } + if txPath != nil { + s.distPaths = append(s.distPaths, *txPath) + } + } +} + // buildDistanceIndex precomputes haversine distances for all packets. // Must be called with s.mu held (Lock). func (s *PacketStore) buildDistanceIndex() { @@ -2046,9 +2343,17 @@ func (s *PacketStore) buildDistanceIndex() { len(s.distHops), len(s.distPaths)) } -// estimatedMemoryMB returns estimated memory usage of the packet store. +// estimatedMemoryMB returns current Go heap allocation in MB. +// Uses runtime.ReadMemStats so it accounts for all data structures +// (distHops, distPaths, spIndex, map overhead) not just packets/observations. +// In tests, memoryEstimator can be set to inject a deterministic value. func (s *PacketStore) estimatedMemoryMB() float64 { - return float64(len(s.packets)*5120+s.totalObs*500) / 1048576.0 + if s.memoryEstimator != nil { + return s.memoryEstimator() + } + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + return float64(ms.HeapAlloc) / 1048576.0 } // EvictStale removes packets older than the retention window and/or exceeding @@ -2069,30 +2374,24 @@ func (s *PacketStore) EvictStale() int { } } - // Memory-based eviction: if still over budget, trim more from head + // Memory-based eviction: if heap exceeds budget, trim proportionally from head. + // All major data structures (distHops, distPaths, spIndex) scale with packet count, + // so evicting a fraction of packets frees roughly the same fraction of total heap. + // A 10% buffer avoids immediately re-triggering on the next ingest cycle. if s.maxMemoryMB > 0 { - for cutoffIdx < len(s.packets) && s.estimatedMemoryMB() > float64(s.maxMemoryMB) { - // Estimate how many more to evict: rough binary approach - overMB := s.estimatedMemoryMB() - float64(s.maxMemoryMB) - // ~5KB per packet, so overMB * 1024*1024 / 5120 packets - extra := int(overMB * 1048576.0 / 5120.0) - if extra < 100 { - extra = 100 - } - cutoffIdx += extra - if cutoffIdx > len(s.packets) { - cutoffIdx = len(s.packets) + currentMB := s.estimatedMemoryMB() + if currentMB > float64(s.maxMemoryMB) && len(s.packets) > 0 { + fractionToKeep := (float64(s.maxMemoryMB) / currentMB) * 0.9 + keepCount := int(float64(len(s.packets)) * fractionToKeep) + if keepCount < 0 { + keepCount = 0 } - // Recalculate estimated memory with fewer packets - // (we haven't actually removed yet, so simulate) - remainingPkts := len(s.packets) - cutoffIdx - remainingObs := s.totalObs - for _, tx := range s.packets[:cutoffIdx] { - remainingObs -= len(tx.Observations) + newCutoff := len(s.packets) - keepCount + if newCutoff > cutoffIdx { + cutoffIdx = newCutoff } - estMB := float64(remainingPkts*5120+remainingObs*500) / 1048576.0 - if estMB <= float64(s.maxMemoryMB) { - break + if cutoffIdx > len(s.packets) { + cutoffIdx = len(s.packets) } } } @@ -2107,47 +2406,36 @@ func (s *PacketStore) EvictStale() int { evicting := s.packets[:cutoffIdx] evictedObs := 0 - // Remove from all indexes + // Build sets of evicted IDs for batch removal from secondary indexes + evictedTxIDs := make(map[int]struct{}, cutoffIdx) + evictedObsIDs := make(map[int]struct{}, cutoffIdx*2) + // Track which observer IDs and payload types need filtering + affectedObservers := make(map[string]struct{}) + affectedPayloadTypes := make(map[int]struct{}) + affectedNodes := make(map[string]struct{}) + + // First pass: remove from primary indexes (byHash, byTxID, byObsID), + // collect IDs for batch secondary index cleanup, and handle non-index work for _, tx := range evicting { delete(s.byHash, tx.Hash) delete(s.byTxID, tx.ID) + evictedTxIDs[tx.ID] = struct{}{} - // Remove observations from indexes for _, obs := range tx.Observations { delete(s.byObsID, obs.ID) - // Remove from byObserver + evictedObsIDs[obs.ID] = struct{}{} if obs.ObserverID != "" { - obsList := s.byObserver[obs.ObserverID] - for i, o := range obsList { - if o.ID == obs.ID { - s.byObserver[obs.ObserverID] = append(obsList[:i], obsList[i+1:]...) - break - } - } - if len(s.byObserver[obs.ObserverID]) == 0 { - delete(s.byObserver, obs.ObserverID) - } + affectedObservers[obs.ObserverID] = struct{}{} } evictedObs++ } - // Remove from byPayloadType s.untrackAdvertPubkey(tx) if tx.PayloadType != nil { - pt := *tx.PayloadType - ptList := s.byPayloadType[pt] - for i, t := range ptList { - if t.ID == tx.ID { - s.byPayloadType[pt] = append(ptList[:i], ptList[i+1:]...) - break - } - } - if len(s.byPayloadType[pt]) == 0 { - delete(s.byPayloadType, pt) - } + affectedPayloadTypes[*tx.PayloadType] = struct{}{} } - // Remove from byNode and nodeHashes + // Remove from nodeHashes and collect affected node keys if tx.DecodedJSON != "" { var decoded map[string]interface{} if json.Unmarshal([]byte(tx.DecodedJSON), &decoded) == nil { @@ -2159,24 +2447,64 @@ func (s *PacketStore) EvictStale() int { delete(s.nodeHashes, v) } } - // Remove tx from byNode - nodeList := s.byNode[v] - for i, t := range nodeList { - if t.ID == tx.ID { - s.byNode[v] = append(nodeList[:i], nodeList[i+1:]...) - break - } - } - if len(s.byNode[v]) == 0 { - delete(s.byNode, v) - } + affectedNodes[v] = struct{}{} } } } } // Remove from subpath index - removeTxFromSubpathIndex(s.spIndex, tx) + removeTxFromSubpathIndexFull(s.spIndex, s.spTxIndex, tx) + // Remove from path-hop index + removeTxFromPathHopIndex(s.byPathHop, tx) + } + + // Batch-remove from byObserver: single pass per affected observer slice + for obsID := range affectedObservers { + obsList := s.byObserver[obsID] + filtered := obsList[:0] + for _, o := range obsList { + if _, evicted := evictedObsIDs[o.ID]; !evicted { + filtered = append(filtered, o) + } + } + if len(filtered) == 0 { + delete(s.byObserver, obsID) + } else { + s.byObserver[obsID] = filtered + } + } + + // Batch-remove from byPayloadType: single pass per affected type slice + for pt := range affectedPayloadTypes { + ptList := s.byPayloadType[pt] + filtered := ptList[:0] + for _, t := range ptList { + if _, evicted := evictedTxIDs[t.ID]; !evicted { + filtered = append(filtered, t) + } + } + if len(filtered) == 0 { + delete(s.byPayloadType, pt) + } else { + s.byPayloadType[pt] = filtered + } + } + + // Batch-remove from byNode: single pass per affected node slice + for nodeKey := range affectedNodes { + nodeList := s.byNode[nodeKey] + filtered := nodeList[:0] + for _, t := range nodeList { + if _, evicted := evictedTxIDs[t.ID]; !evicted { + filtered = append(filtered, t) + } + } + if len(filtered) == 0 { + delete(s.byNode, nodeKey) + } else { + s.byNode[nodeKey] = filtered + } } // Remove from distance indexes — filter out records referencing evicted txs @@ -2207,9 +2535,7 @@ func (s *PacketStore) EvictStale() int { evictCount := cutoffIdx atomic.AddInt64(&s.evicted, int64(evictCount)) - freedMB := float64(evictCount*5120+evictedObs*500) / 1048576.0 - log.Printf("[store] Evicted %d packets older than %.0fh (freed ~%.1fMB estimated)", - evictCount, s.retentionHours, freedMB) + log.Printf("[store] Evicted %d packets (%d obs)", evictCount, evictedObs) // Eviction removes data — all caches may be affected s.invalidateCachesFor(cacheInvalidation{eviction: true}) @@ -3104,19 +3430,6 @@ func (s *PacketStore) computeAnalyticsRF(region string) map[string]interface{} { } // Stats helpers - sortedF64 := func(arr []float64) []float64 { - c := make([]float64, len(arr)) - copy(c, arr) - sort.Float64s(c) - return c - } - medianF64 := func(arr []float64) float64 { - s := sortedF64(arr) - if len(s) == 0 { - return 0 - } - return s[len(s)/2] - } stddevF64 := func(arr []float64, avg float64) float64 { if len(arr) == 0 { return 0 @@ -3177,6 +3490,11 @@ func (s *PacketStore) computeAnalyticsRF(region string) map[string]interface{} { return m } + // Sort snrVals and rssiVals once; reuse sorted order for min/max/median + // instead of copying+sorting per stat call (#366). + sort.Float64s(snrVals) + sort.Float64s(rssiVals) + snrAvg := 0.0 if len(snrVals) > 0 { sum := 0.0 @@ -3369,19 +3687,20 @@ func (s *PacketStore) computeAnalyticsRF(region string) map[string]interface{} { avgPktSize = sum / len(packetSizes) } + // snrVals and rssiVals are already sorted — read min/max/median directly. snrStats := map[string]interface{}{"min": 0.0, "max": 0.0, "avg": 0.0, "median": 0.0, "stddev": 0.0} if len(snrVals) > 0 { snrStats = map[string]interface{}{ - "min": minF64(snrVals), "max": maxF64(snrVals), - "avg": snrAvg, "median": medianF64(snrVals), + "min": snrVals[0], "max": snrVals[len(snrVals)-1], + "avg": snrAvg, "median": snrVals[len(snrVals)/2], "stddev": stddevF64(snrVals, snrAvg), } } rssiStats := map[string]interface{}{"min": 0.0, "max": 0.0, "avg": 0.0, "median": 0.0, "stddev": 0.0} if len(rssiVals) > 0 { rssiStats = map[string]interface{}{ - "min": minF64(rssiVals), "max": maxF64(rssiVals), - "avg": rssiAvg, "median": medianF64(rssiVals), + "min": rssiVals[0], "max": rssiVals[len(rssiVals)-1], + "avg": rssiAvg, "median": rssiVals[len(rssiVals)/2], "stddev": stddevF64(rssiVals, rssiAvg), } } @@ -3445,14 +3764,27 @@ type prefixMap struct { m map[string][]nodeInfo } +// maxPrefixLen caps prefix map entries. MeshCore path hops use 2–6 char +// prefixes; 8 gives comfortable headroom while cutting map size from ~31×N +// entries to ~7×N (+ 1 full-key entry per node for exact-match lookups). +const maxPrefixLen = 8 + func buildPrefixMap(nodes []nodeInfo) *prefixMap { - pm := &prefixMap{m: make(map[string][]nodeInfo, len(nodes)*10)} + pm := &prefixMap{m: make(map[string][]nodeInfo, len(nodes)*(maxPrefixLen+1))} for _, n := range nodes { pk := strings.ToLower(n.PublicKey) - for l := 2; l <= len(pk); l++ { + maxLen := maxPrefixLen + if maxLen > len(pk) { + maxLen = len(pk) + } + for l := 2; l <= maxLen; l++ { pfx := pk[:l] pm.m[pfx] = append(pm.m[pfx], n) } + // Always add full pubkey so exact-match lookups work. + if len(pk) > maxPrefixLen { + pm.m[pk] = append(pm.m[pk], n) + } } return pm } @@ -3480,6 +3812,15 @@ func (s *PacketStore) getCachedNodesAndPM() ([]nodeInfo, *prefixMap) { return nodes, pm } +// InvalidateNodeCache forces the next getCachedNodesAndPM call to rebuild. +func (s *PacketStore) InvalidateNodeCache() { + s.cacheMu.Lock() + s.nodeCache = nil + s.nodePM = nil + s.nodeCacheTime = time.Time{} + s.cacheMu.Unlock() +} + func (pm *prefixMap) resolve(hop string) *nodeInfo { h := strings.ToLower(hop) candidates := pm.m[h] @@ -4656,7 +4997,7 @@ func (s *PacketStore) computeHashCollisions(region string) map[string]interface{ // Inconsistent nodes var inconsistentNodes []collisionNode for _, cn := range allCNodes { - if cn.HashSizeInconsistent { + if cn.HashSizeInconsistent && (cn.Role == "repeater" || cn.Role == "room_server") { inconsistentNodes = append(inconsistentNodes, cn) } } @@ -4866,14 +5207,23 @@ func (s *PacketStore) GetNodeHashSizeInfo() map[string]*hashSizeNodeInfo { } // computeNodeHashSizeInfo scans advert packets to compute per-node hash size data. +// Only adverts from the last 7 days are considered so that legitimate config +// changes during testing don't create permanent false positives. func (s *PacketStore) computeNodeHashSizeInfo() map[string]*hashSizeNodeInfo { s.mu.RLock() defer s.mu.RUnlock() info := make(map[string]*hashSizeNodeInfo) + cutoff := time.Now().UTC().Add(-7 * 24 * time.Hour).Format("2006-01-02T15:04:05.000Z") + adverts := s.byPayloadType[4] for _, tx := range adverts { + // Skip adverts older than 7 days to avoid false positives from + // historical config changes during testing. + if tx.FirstSeen != "" && tx.FirstSeen < cutoff { + continue + } if tx.RawHex == "" || tx.DecodedJSON == "" { continue } @@ -5288,24 +5638,24 @@ func (s *PacketStore) GetNodeAnalytics(pubkey string, days int) (*NodeAnalyticsR for _, tx := range indexed { hashSet[tx.Hash] = true } - var allPkts []*StoreTx + var packets []*StoreTx if name != "" { for _, tx := range s.packets { + if tx.FirstSeen <= fromISO { + continue // Skip old packets early before expensive string matching + } if hashSet[tx.Hash] { - allPkts = append(allPkts, tx) + packets = append(packets, tx) } else if tx.DecodedJSON != "" && (strings.Contains(tx.DecodedJSON, name) || strings.Contains(tx.DecodedJSON, pubkey)) { - allPkts = append(allPkts, tx) + packets = append(packets, tx) } } } else { - allPkts = indexed - } - - // Filter by time range - var packets []*StoreTx - for _, p := range allPkts { - if p.FirstSeen > fromISO { - packets = append(packets, p) + // Filter indexed packets by time range + for _, p := range indexed { + if p.FirstSeen > fromISO { + packets = append(packets, p) + } } } @@ -5655,6 +6005,111 @@ func (s *PacketStore) GetAnalyticsSubpaths(region string, minLen, maxLen, limit return result } +// GetAnalyticsSubpathsBulk returns multiple length-range buckets from a single +// scan of the subpath index, avoiding repeated iterations. +func (s *PacketStore) GetAnalyticsSubpathsBulk(region string, groups []subpathGroup) []map[string]interface{} { + // For region queries or when there are few groups, fall back to individual calls + // which benefit from per-key caching. + if region != "" { + results := make([]map[string]interface{}, len(groups)) + for i, g := range groups { + results[i] = s.GetAnalyticsSubpaths(region, g.MinLen, g.MaxLen, g.Limit) + } + return results + } + + // Check if all groups are cached. + allCached := true + cachedResults := make([]map[string]interface{}, len(groups)) + s.cacheMu.Lock() + for i, g := range groups { + cacheKey := fmt.Sprintf("|%d|%d|%d", g.MinLen, g.MaxLen, g.Limit) + if cached, ok := s.subpathCache[cacheKey]; ok && time.Now().Before(cached.expiresAt) { + cachedResults[i] = cached.data + } else { + allCached = false + break + } + } + if allCached { + s.cacheHits += int64(len(groups)) + s.cacheMu.Unlock() + return cachedResults + } + s.cacheMu.Unlock() + + // Single scan: bucket by hop length into per-group accumulators. + s.mu.RLock() + _, pm := s.getCachedNodesAndPM() + hopCache := make(map[string]*nodeInfo) + resolveHop := func(hop string) string { + if cached, ok := hopCache[hop]; ok { + if cached != nil { + return cached.Name + } + return hop + } + r, _, _ := pm.resolveWithContext(hop, nil, s.graph) + hopCache[hop] = r + if r != nil { + return r.Name + } + return hop + } + + perGroup := make([]map[string]*subpathAccum, len(groups)) + for i := range groups { + perGroup[i] = make(map[string]*subpathAccum) + } + + for rawKey, count := range s.spIndex { + hops := strings.Split(rawKey, ",") + hopLen := len(hops) + + // Resolve hop names once, reuse across groups. + var named []string + var namedKey string + resolved := false + + for gi, g := range groups { + if hopLen < g.MinLen || hopLen > g.MaxLen { + continue + } + if !resolved { + named = make([]string, hopLen) + for i, h := range hops { + named[i] = resolveHop(h) + } + namedKey = strings.Join(named, " → ") + resolved = true + } + entry := perGroup[gi][namedKey] + if entry == nil { + entry = &subpathAccum{raw: rawKey} + perGroup[gi][namedKey] = entry + } + entry.count += count + } + } + totalPaths := s.spTotalPaths + s.mu.RUnlock() + + results := make([]map[string]interface{}, len(groups)) + for i, g := range groups { + results[i] = s.rankSubpaths(perGroup[i], totalPaths, g.Limit) + } + + // Cache individual results for future single-key lookups too. + s.cacheMu.Lock() + for i, g := range groups { + cacheKey := fmt.Sprintf("|%d|%d|%d", g.MinLen, g.MaxLen, g.Limit) + s.subpathCache[cacheKey] = &cachedResult{data: results[i], expiresAt: time.Now().Add(s.rfCacheTTL)} + } + s.cacheMu.Unlock() + + return results +} + // subpathAccum holds a running count for a single named subpath. type subpathAccum struct { count int @@ -5824,40 +6279,21 @@ func (s *PacketStore) GetSubpathDetail(rawHops []string) map[string]interface{} nodes[i] = entry } + // Build the subpath key the same way the index does (lowercase, comma-joined) + spKey := strings.ToLower(strings.Join(rawHops, ",")) + + // Direct lookup instead of scanning all packets + matchedTxs := s.spTxIndex[spKey] + hourBuckets := make([]int, 24) var snrSum, rssiSum float64 var snrCount, rssiCount int observers := map[string]int{} parentPaths := map[string]int{} - var matchCount int + matchCount := len(matchedTxs) var firstSeen, lastSeen string - for _, tx := range s.packets { - hops := txGetParsedPath(tx) - if len(hops) < len(rawHops) { - continue - } - - // Check if rawHops appears as contiguous subsequence - found := false - for i := 0; i <= len(hops)-len(rawHops); i++ { - match := true - for j := 0; j < len(rawHops); j++ { - if !strings.EqualFold(hops[i+j], rawHops[j]) { - match = false - break - } - } - if match { - found = true - break - } - } - if !found { - continue - } - - matchCount++ + for _, tx := range matchedTxs { ts := tx.FirstSeen if ts != "" { if firstSeen == "" || ts < firstSeen { @@ -5866,7 +6302,6 @@ func (s *PacketStore) GetSubpathDetail(rawHops []string) map[string]interface{} if lastSeen == "" || ts > lastSeen { lastSeen = ts } - // Parse hour from timestamp for hourly distribution t, err := time.Parse(time.RFC3339, ts) if err != nil { t, err = time.Parse("2006-01-02 15:04:05", ts) @@ -5888,6 +6323,7 @@ func (s *PacketStore) GetSubpathDetail(rawHops []string) map[string]interface{} } // Full parent path (resolved) + hops := txGetParsedPath(tx) resolved := make([]string, len(hops)) for i, h := range hops { r, _, _ := pm.resolveWithContext(h, nil, s.graph) diff --git a/public/analytics.js b/public/analytics.js index fb37cb9e..f45fe726 100644 --- a/public/analytics.js +++ b/public/analytics.js @@ -989,7 +989,7 @@

⚠️ Inconsistent Hash Sizes

↑ top
-

Nodes sending adverts with varying hash sizes. Caused by a bug where automatic adverts ignored the configured multibyte path setting. Fixed in repeater v1.14.1.

+

Repeaters and room servers sending adverts with varying hash sizes in the last 7 days. Originally caused by a firmware bug where automatic adverts ignored the configured multibyte path setting, fixed in repeater v1.14.1. Companion nodes are excluded.

Loading…
@@ -1398,12 +1398,8 @@ el.innerHTML = '
Analyzing route patterns…
'; try { const rq = RegionFilter.regionQueryString(); - const [d2, d3, d4, d5] = await Promise.all([ - api('/analytics/subpaths?minLen=2&maxLen=2&limit=50' + rq, { ttl: CLIENT_TTL.analyticsRF }), - api('/analytics/subpaths?minLen=3&maxLen=3&limit=30' + rq, { ttl: CLIENT_TTL.analyticsRF }), - api('/analytics/subpaths?minLen=4&maxLen=4&limit=20' + rq, { ttl: CLIENT_TTL.analyticsRF }), - api('/analytics/subpaths?minLen=5&maxLen=8&limit=15' + rq, { ttl: CLIENT_TTL.analyticsRF }) - ]); + const bulk = await api('/analytics/subpaths-bulk?groups=2-2:50,3-3:30,4-4:20,5-8:15' + rq, { ttl: CLIENT_TTL.analyticsRF }); + const [d2, d3, d4, d5] = bulk.results; function renderTable(data, title) { if (!data.subpaths.length) return `

${title}

No data
`; @@ -1602,10 +1598,9 @@ el.innerHTML = '
Loading node analytics…
'; try { const rq = RegionFilter.regionQueryString(); - const [nodesResp, bulkHealth, netStatus] = await Promise.all([ - api('/nodes?limit=200&sortBy=lastSeen' + rq, { ttl: CLIENT_TTL.nodeList }), - api('/nodes/bulk-health?limit=50' + rq, { ttl: CLIENT_TTL.analyticsRF }), - api('/nodes/network-status' + (rq ? '?' + rq.slice(1) : ''), { ttl: CLIENT_TTL.analyticsRF }) + const [nodesResp, bulkHealth] = await Promise.all([ + api('/nodes?limit=10000&sortBy=lastSeen' + rq, { ttl: CLIENT_TTL.nodeList }), + api('/nodes/bulk-health?limit=50' + rq, { ttl: CLIENT_TTL.analyticsRF }) ]); const nodes = nodesResp.nodes || nodesResp; const myNodes = JSON.parse(localStorage.getItem('meshcore-my-nodes') || '[]'); @@ -1622,8 +1617,22 @@ const byObservers = [...enriched].sort((a, b) => (b.health.observers?.length || 0) - (a.health.observers?.length || 0)); const byRecent = [...enriched].filter(n => n.health.stats.lastHeard).sort((a, b) => new Date(b.health.stats.lastHeard) - new Date(a.health.stats.lastHeard)); - // Use server-computed status across ALL nodes - const { active, degraded, silent, total: totalNodes, roleCounts } = netStatus; + // Compute network status client-side from loaded nodes using shared getHealthThresholds() + const now = Date.now(); + let active = 0, degraded = 0, silent = 0; + nodes.forEach(function(n) { + const role = n.role || 'unknown'; + const th = getHealthThresholds(role); + const lastMs = n.last_heard ? new Date(n.last_heard).getTime() + : n.last_seen ? new Date(n.last_seen).getTime() + : 0; + const age = lastMs ? (now - lastMs) : Infinity; + if (age < th.degradedMs) active++; + else if (age < th.silentMs) degraded++; + else silent++; + }); + const totalNodes = nodesResp.total || nodes.length; + const roleCounts = nodesResp.counts || {}; function nodeLink(n) { return `${esc(n.name || n.public_key.slice(0, 12))}`; diff --git a/public/live.js b/public/live.js index 50562212..1b8fa2ab 100644 --- a/public/live.js +++ b/public/live.js @@ -540,6 +540,8 @@ clearTimeout(entry.timer); } propagationBuffer.clear(); + // Batch-update timeline once on restore instead of per-packet while hidden + updateTimeline(); } }); @@ -564,7 +566,6 @@ if (VCR.mode === 'LIVE') { // Skip animations when tab is backgrounded — just buffer for VCR timeline if (_tabHidden) { - updateTimeline(); return; } if (realisticPropagation && pkt.hash) { @@ -1697,20 +1698,13 @@ async function replayRecent() { try { - const resp = await fetch('/api/packets?limit=8&groupByHash=true'); + // Single bulk fetch with expand=observations — no N+1 calls + const resp = await fetch('/api/packets?limit=8&expand=observations'); const data = await resp.json(); const groups = (data.packets || []).reverse(); - // Fetch all observations first, then stagger rendering - const allGroups = []; - for (let i = 0; i < groups.length; i++) { - const group = groups[i]; - let observations = []; - try { - const detail = await fetch('/api/packets/' + encodeURIComponent(group.hash)); - const detailData = await detail.json(); - observations = detailData.observations || []; - } catch {} + const allGroups = groups.map((group) => { + const observations = group.observations || []; const livePackets = observations.map(obs => { const livePkt = dbPacketToLive(Object.assign({}, group, obs, { @@ -1729,8 +1723,8 @@ } livePackets.forEach(lp => VCR.buffer.push({ ts: lp._ts, pkt: lp })); - allGroups.push(livePackets); - } + return livePackets; + }); // Render with real timing gaps between packets // Sort by earliest timestamp diff --git a/public/map.js b/public/map.js index e70a8faf..7fdb686e 100644 --- a/public/map.js +++ b/public/map.js @@ -9,7 +9,7 @@ let nodes = []; let targetNodeKey = null; let observers = []; - let filters = { repeater: true, companion: true, room: true, sensor: true, observer: true, lastHeard: '30d', neighbors: false, clusters: false, hashLabels: localStorage.getItem('meshcore-map-hash-labels') !== 'false', statusFilter: localStorage.getItem('meshcore-map-status-filter') || 'all' }; + let filters = { repeater: true, companion: true, room: true, sensor: true, observer: true, lastHeard: '30d', neighbors: false, clusters: false, hashLabels: localStorage.getItem('meshcore-map-hash-labels') !== 'false', statusFilter: localStorage.getItem('meshcore-map-status-filter') || 'all', byteSize: localStorage.getItem('meshcore-map-byte-filter') || 'all' }; let selectedReferenceNode = null; // pubkey of the reference node for neighbor filtering let neighborPubkeys = null; // Set of pubkeys that are direct neighbors of selected node let wsHandler = null; @@ -94,6 +94,15 @@ Node Types
+
+ Byte Size +
+ + + + +
+
Display @@ -181,11 +190,17 @@ }); map.on('zoomend', () => { - if (!_renderingMarkers) renderMarkers(); + clearTimeout(_zoomResizeTimer); + _zoomResizeTimer = setTimeout(() => { + if (!_renderingMarkers) _repositionMarkers(); + }, 150); }); map.on('resize', () => { - if (!_renderingMarkers) renderMarkers(); + clearTimeout(_zoomResizeTimer); + _zoomResizeTimer = setTimeout(() => { + if (!_renderingMarkers) _repositionMarkers(); + }, 150); }); markerLayer = L.layerGroup().addTo(map); @@ -262,6 +277,16 @@ }); }); + // Byte size filter buttons + document.querySelectorAll('#mcByteFilter .btn').forEach(btn => { + btn.addEventListener('click', () => { + filters.byteSize = btn.dataset.byte; + localStorage.setItem('meshcore-map-byte-filter', filters.byteSize); + document.querySelectorAll('#mcByteFilter .btn').forEach(b => b.classList.toggle('active', b.dataset.byte === filters.byteSize)); + renderMarkers(); + }); + }); + // Geo filter overlay (async function () { try { @@ -612,6 +637,8 @@ var _renderingMarkers = false; var _lastDeconflictZoom = null; + var _currentMarkerData = []; // stored marker data for zoom-only repositioning + var _zoomResizeTimer = null; function deconflictLabels(markers, mapRef) { const placed = []; @@ -662,6 +689,62 @@ } } + /** + * Create, update, or remove the offset indicator (dashed line + dot at true GPS position) + * for a deconflicted marker. Shared by _renderMarkersInner and _repositionMarkers. + * @param {Object} m - marker data object with latLng, adjustedLatLng, offset, _leafletLine, _leafletDot + * @param {L.LayerGroup} layer - layer group to add/remove indicators from + */ + function _updateOffsetIndicator(m, layer) { + var pos = m.adjustedLatLng || m.latLng; + var redColor = getComputedStyle(document.documentElement).getPropertyValue('--status-red').trim() || '#ef4444'; + + if (m.offset > 10) { + // Line from true position to adjusted position + if (m._leafletLine) { + m._leafletLine.setLatLngs([m.latLng, pos]); + } else { + m._leafletLine = L.polyline([m.latLng, pos], { + color: redColor, weight: 2, dashArray: '6,4', opacity: 0.85 + }); + layer.addLayer(m._leafletLine); + } + // Dot at true GPS position + if (!m._leafletDot) { + m._leafletDot = L.circleMarker(m.latLng, { + radius: 3, fillColor: redColor, fillOpacity: 0.9, stroke: true, color: '#fff', weight: 1 + }); + layer.addLayer(m._leafletDot); + } + } else { + // No offset — remove indicator if it existed + if (m._leafletLine) { layer.removeLayer(m._leafletLine); m._leafletLine = null; } + if (m._leafletDot) { layer.removeLayer(m._leafletDot); m._leafletDot = null; } + } + } + + /** + * Reposition existing markers by re-running deconfliction at the current zoom. + * Avoids clearing and rebuilding all markers — eliminates flicker on zoom/resize. + */ + function _repositionMarkers() { + if (!map || _currentMarkerData.length === 0) return; + map.invalidateSize({ animate: false }); + + // Re-run deconfliction with current zoom pixel coordinates + deconflictLabels(_currentMarkerData, map); + + for (var i = 0; i < _currentMarkerData.length; i++) { + var m = _currentMarkerData[i]; + var pos = m.adjustedLatLng || m.latLng; + + // Update marker position + if (m._leafletMarker) m._leafletMarker.setLatLng(pos); + + _updateOffsetIndicator(m, markerLayer); + } + } + function renderMarkers() { if (_renderingMarkers) return; _renderingMarkers = true; @@ -670,10 +753,16 @@ function _renderMarkersInner() { markerLayer.clearLayers(); + _currentMarkerData = []; const filtered = nodes.filter(n => { if (!n.lat || !n.lon) return false; if (!filters[n.role || 'companion']) return false; + // Byte size filter (applies only to repeaters) + if (filters.byteSize !== 'all' && (n.role || 'companion') === 'repeater') { + const hs = n.hash_size || 1; + if (String(hs) !== filters.byteSize) return false; + } // Status filter if (filters.statusFilter !== 'all') { const role = (n.role || 'companion').toLowerCase(); @@ -719,24 +808,20 @@ deconflictLabels(allMarkers, map); } + // Store marker data for zoom/resize repositioning (avoids full rebuild) + _currentMarkerData = allMarkers; + for (const m of allMarkers) { const pos = m.adjustedLatLng || m.latLng; const marker = L.marker(pos, { icon: m.icon, alt: m.alt }); marker._nodeKey = m.node.public_key || m.node.id || null; marker.bindPopup(m.popupFn(), { maxWidth: 280 }); markerLayer.addLayer(marker); + m._leafletMarker = marker; + m._leafletLine = null; + m._leafletDot = null; - if (m.offset > 10) { - const line = L.polyline([m.latLng, pos], { - color: getComputedStyle(document.documentElement).getPropertyValue('--status-red').trim() || '#ef4444', weight: 2, dashArray: '6,4', opacity: 0.85 - }); - markerLayer.addLayer(line); - // Small dot at true GPS position - const dot = L.circleMarker(m.latLng, { - radius: 3, fillColor: getComputedStyle(document.documentElement).getPropertyValue('--status-red').trim() || '#ef4444', fillOpacity: 0.9, stroke: true, color: '#fff', weight: 1 - }); - markerLayer.addLayer(dot); - } + _updateOffsetIndicator(m, markerLayer); } } @@ -870,6 +955,7 @@ map = null; } markerLayer = null; + _currentMarkerData = []; routeLayer = null; if (heatLayer) { heatLayer = null; } geoFilterLayer = null; diff --git a/public/nodes.js b/public/nodes.js index 2542356c..c65eb8eb 100644 --- a/public/nodes.js +++ b/public/nodes.js @@ -372,13 +372,25 @@ }, 5000); } + /** + * Fetch node detail + health data in parallel. + * Both selectNode() and loadFullNode() need the same data — + * this shared helper avoids duplicating the fetch logic (fixes #391). + */ + async function fetchNodeDetail(pubkey) { + const [nodeData, healthData] = await Promise.all([ + api('/nodes/' + encodeURIComponent(pubkey), { ttl: CLIENT_TTL.nodeDetail }), + api('/nodes/' + encodeURIComponent(pubkey) + '/health', { ttl: CLIENT_TTL.nodeDetail }).catch(() => null) + ]); + nodeData.healthData = healthData; + return nodeData; + } + async function loadFullNode(pubkey) { const body = document.getElementById('nodeFullBody'); try { - const [nodeData, healthData] = await Promise.all([ - api('/nodes/' + encodeURIComponent(pubkey), { ttl: CLIENT_TTL.nodeDetail }), - api('/nodes/' + encodeURIComponent(pubkey) + '/health', { ttl: CLIENT_TTL.nodeDetail }).catch(() => null) - ]); + const nodeData = await fetchNodeDetail(pubkey); + const healthData = nodeData.healthData; const n = nodeData.node; const adverts = (nodeData.recentAdverts || []).sort((a, b) => new Date(b.timestamp) - new Date(a.timestamp)); const title = document.querySelector('.node-full-title'); @@ -963,11 +975,7 @@ panel.innerHTML = '
Loading…
'; try { - const [data, healthData] = await Promise.all([ - api('/nodes/' + encodeURIComponent(pubkey), { ttl: CLIENT_TTL.nodeDetail }), - api('/nodes/' + encodeURIComponent(pubkey) + '/health', { ttl: CLIENT_TTL.nodeDetail }).catch(() => null) - ]); - data.healthData = healthData; + const data = await fetchNodeDetail(pubkey); renderDetail(panel, data); } catch (e) { panel.innerHTML = `
Error: ${e.message}
`; diff --git a/public/og-image.png b/public/og-image.png index 903b895a..0daa3389 100644 Binary files a/public/og-image.png and b/public/og-image.png differ diff --git a/public/packets.js b/public/packets.js index 83b47886..7df3fd9d 100644 --- a/public/packets.js +++ b/public/packets.js @@ -40,6 +40,21 @@ clearTimeout(_renderTimer); _renderTimer = setTimeout(() => renderTableRows(), 200); } + + // Coalesce WS-triggered renders into one per animation frame (#396). + // Multiple WS batches arriving within the same frame only trigger a single + // renderTableRows() call on the next rAF, preventing rapid full rebuilds. + function scheduleWSRender() { + _wsRenderDirty = true; + if (_wsRafId) return; // already scheduled + _wsRafId = requestAnimationFrame(function () { + _wsRafId = null; + if (_wsRenderDirty) { + _wsRenderDirty = false; + renderTableRows(); + } + }); + } const PANEL_WIDTH_KEY = 'meshcore-panel-width'; const PANEL_CLOSE_HTML = ''; @@ -59,6 +74,8 @@ let _lastVisibleEnd = -1; // last rendered end index (for dirty checking) let _vsScrollHandler = null; // scroll listener reference let _wsRenderTimer = null; // debounce timer for WS-triggered renders + let _wsRafId = null; // rAF id for coalescing WS-triggered renders (#396) + let _wsRenderDirty = false; // dirty flag for rAF render coalescing (#396) let _observerFilterSet = null; // cached Set from filters.observer, hoisted above loops (#427) function closeDetailPanel() { @@ -461,9 +478,8 @@ if (packets.length > PACKET_LIMIT) packets.length = PACKET_LIMIT; } totalCount += filtered.length; - // Debounce WS-triggered renders to avoid rapid full rebuilds - clearTimeout(_wsRenderTimer); - _wsRenderTimer = setTimeout(function () { renderTableRows(); }, 200); + // Coalesce WS-triggered renders via rAF (#396) + scheduleWSRender(); }); }); } @@ -474,6 +490,8 @@ wsHandler = null; detachVScrollListener(); clearTimeout(_wsRenderTimer); + if (_wsRafId) { cancelAnimationFrame(_wsRafId); _wsRafId = null; } + _wsRenderDirty = false; _displayPackets = []; _rowCounts = []; _rowCountsDirty = false; @@ -524,7 +542,11 @@ if (filters.hash) params.set('hash', filters.hash); if (filters.node) params.set('node', filters.node); if (filters.observer) params.set('observer', filters.observer); - params.set('groupByHash', 'true'); // always fetch grouped + if (groupByHash) { + params.set('groupByHash', 'true'); + } else { + params.set('expand', 'observations'); + } const data = await api('/packets?' + params.toString()); packets = data.packets || []; @@ -532,20 +554,14 @@ for (const p of packets) { if (p.hash) hashIndex.set(p.hash, p); } totalCount = data.total || packets.length; - // When ungrouped, fetch observations for all multi-obs packets and flatten + // When ungrouped, flatten observations inline (single API call, no N+1) if (!groupByHash) { - const multiObs = packets.filter(p => (p.observation_count || p.count || 1) > 1); - await Promise.all(multiObs.map(async (p) => { - try { - const d = await api(`/packets/${p.hash}`); - if (d?.observations) p._children = d.observations.map(o => clearParsedCache({...d.packet, ...o, _isObservation: true})); - } catch {} - })); - // Flatten: replace grouped packets with individual observations const flat = []; for (const p of packets) { - if (p._children && p._children.length > 1) { - for (const c of p._children) flat.push(c); + if (p.observations && p.observations.length > 1) { + for (const o of p.observations) { + flat.push(clearParsedCache({...p, ...o, _isObservation: true, observations: undefined})); + } } else { flat.push(p); } @@ -873,18 +889,30 @@ obsSortSel.addEventListener('change', async function () { obsSortMode = this.value; localStorage.setItem('meshcore-obs-sort', obsSortMode); - // For non-observer sorts, fetch children for visible groups that don't have them yet + // For non-observer sorts, batch-fetch children for visible groups that don't have them yet if (obsSortMode !== SORT_OBSERVER && groupByHash) { const toFetch = packets.filter(p => p.hash && !p._children && (p.observation_count || 0) > 1); - await Promise.all(toFetch.map(async (p) => { + if (toFetch.length > 0) { + const hashes = toFetch.map(p => p.hash); try { - const data = await api(`/packets/${p.hash}`); - if (data?.packet && data.observations) { - p._children = data.observations.map(o => clearParsedCache({...data.packet, ...o, _isObservation: true})); - p._fetchedData = data; + const resp = await fetch('/api/packets/observations', { + method: 'POST', + headers: {'Content-Type': 'application/json'}, + body: JSON.stringify({hashes}) + }); + if (resp.ok) { + const data = await resp.json(); + const results = data.results || {}; + for (const p of toFetch) { + const obs = results[p.hash]; + if (obs && obs.length) { + p._children = obs.map(o => clearParsedCache({...p, ...o, _isObservation: true})); + p._fetchedData = {packet: p, observations: obs}; + } + } } } catch {} - })); + } } // Re-sort all groups with children for (const p of packets) { @@ -1076,7 +1104,7 @@ } // Build HTML for a single grouped packet row - function buildGroupRowHtml(p) { + function buildGroupRowHtml(p, entryIdx = -1) { const isExpanded = expandedHashes.has(p.hash); let headerObserverId = p.observer_id; let headerPathJson = p.path_json; @@ -1096,7 +1124,7 @@ const groupSize = p.raw_hex ? Math.floor(p.raw_hex.length / 2) : 0; const groupHashBytes = ((parseInt(p.raw_hex?.slice(2, 4), 16) || 0) >> 6) + 1; const isSingle = p.count <= 1; - let html = ` + let html = ` ${isSingle ? '' : (isExpanded ? '▼' : '▶')} ${groupRegion ? `${groupRegion}` : '—'} ${renderTimestampCell(p.latest)} @@ -1122,7 +1150,7 @@ const childRegion = c.observer_id ? (observerMap.get(c.observer_id)?.iata || '') : ''; const childPath = getParsedPath(c); const childPathStr = renderPath(childPath, c.observer_id); - html += ` + html += ` ${childRegion ? `${childRegion}` : '—'} ${renderTimestampCell(c.timestamp)} ${truncate(c.hash || '', 8)} @@ -1140,7 +1168,7 @@ } // Build HTML for a single flat (ungrouped) packet row - function buildFlatRowHtml(p) { + function buildFlatRowHtml(p, entryIdx = -1) { const decoded = getParsedDecoded(p) || {}; const pathHops = getParsedPath(p) || []; const region = p.observer_id ? (observerMap.get(p.observer_id)?.iata || '') : ''; @@ -1150,7 +1178,7 @@ const hashBytes = ((parseInt(p.raw_hex?.slice(2, 4), 16) || 0) >> 6) + 1; const pathStr = renderPath(pathHops, p.observer_id); const detail = getDetailPreview(decoded); - return ` + return ` ${region ? `${region}` : '—'} ${renderTimestampCell(p.timestamp)} ${truncate(p.hash || String(p.id), 8)} @@ -1275,6 +1303,9 @@ // Skip DOM rebuild if visible range hasn't changed if (startIdx === _lastVisibleStart && endIdx === _lastVisibleEnd) return; + + const prevStart = _lastVisibleStart; + const prevEnd = _lastVisibleEnd; _lastVisibleStart = startIdx; _lastVisibleEnd = endIdx; @@ -1285,14 +1316,44 @@ topSpacer.firstChild.style.height = topPad + 'px'; bottomSpacer.firstChild.style.height = bottomPad + 'px'; - // LAZY ROW GENERATION: only build HTML for the visible slice (#422) const builder = _displayGrouped ? buildGroupRowHtml : buildFlatRowHtml; - const visibleSlice = _displayPackets.slice(startIdx, endIdx); - const visibleHtml = visibleSlice.map(p => builder(p)).join(''); - tbody.innerHTML = ''; - tbody.appendChild(topSpacer); - tbody.insertAdjacentHTML('beforeend', visibleHtml); - tbody.appendChild(bottomSpacer); + const hasOverlap = prevStart !== -1 && startIdx < prevEnd && endIdx > prevStart; + + if (!hasOverlap) { + // Full rebuild: initial render or large scroll jump past buffer + const visibleHtml = _displayPackets.slice(startIdx, endIdx) + .map((p, i) => builder(p, startIdx + i)).join(''); + tbody.innerHTML = ''; + tbody.appendChild(topSpacer); + tbody.insertAdjacentHTML('beforeend', visibleHtml); + tbody.appendChild(bottomSpacer); + return; + } + + // Incremental update: remove rows that scrolled out at the top + for (let i = prevStart; i < startIdx && i < prevEnd; i++) { + tbody.querySelectorAll('[data-entry-idx="' + i + '"]').forEach(r => r.remove()); + } + // Remove rows that scrolled out at the bottom + for (let i = Math.max(endIdx, prevStart); i < prevEnd; i++) { + tbody.querySelectorAll('[data-entry-idx="' + i + '"]').forEach(r => r.remove()); + } + // Prepend rows that scrolled into view at the top + if (startIdx < prevStart) { + let html = ''; + for (let i = startIdx; i < Math.min(prevStart, endIdx); i++) { + html += builder(_displayPackets[i], i); + } + topSpacer.insertAdjacentHTML('afterend', html); + } + // Append rows that scrolled into view at the bottom + if (endIdx > prevEnd) { + let html = ''; + for (let i = Math.max(prevEnd, startIdx); i < endIdx; i++) { + html += builder(_displayPackets[i], i); + } + bottomSpacer.insertAdjacentHTML('beforebegin', html); + } } // Attach/detach scroll listener for virtual scrolling diff --git a/test-frontend-helpers.js b/test-frontend-helpers.js index 6e8ed4c1..c1f057c2 100644 --- a/test-frontend-helpers.js +++ b/test-frontend-helpers.js @@ -3193,20 +3193,24 @@ console.log('\n=== channels.js: formatHashHex (issue #465) ==='); 'destroy must reset observerMap to empty Map'); }); - test('WS handler debounces render via _wsRenderTimer', () => { + test('WS handler coalesces render via rAF (#396)', () => { const wsBlock = src.slice(src.indexOf('wsHandler = debouncedOnWS'), src.indexOf('function destroy()')); - assert.ok(wsBlock.includes('_wsRenderTimer'), - 'WS handler must debounce renders via _wsRenderTimer'); - assert.ok(wsBlock.includes('clearTimeout(_wsRenderTimer)'), - 'WS handler must clear pending timer before scheduling new render'); - assert.ok(/setTimeout\(function \(\) \{ renderTableRows\(\); \}/.test(wsBlock), - 'WS handler must schedule renderTableRows via setTimeout'); - }); - - test('destroy clears _wsRenderTimer', () => { - const destroyBlock = src.slice(src.indexOf('function destroy()'), src.indexOf('function destroy()') + 500); - assert.ok(destroyBlock.includes('clearTimeout(_wsRenderTimer)'), - 'destroy must clear _wsRenderTimer to prevent stale renders after navigation'); + assert.ok(wsBlock.includes('scheduleWSRender()'), + 'WS handler must coalesce renders via scheduleWSRender()'); + // Verify scheduleWSRender uses requestAnimationFrame + const schedFn = src.slice(src.indexOf('function scheduleWSRender()'), src.indexOf('function scheduleWSRender()') + 300); + assert.ok(schedFn.includes('requestAnimationFrame'), + 'scheduleWSRender must use requestAnimationFrame for coalescing'); + assert.ok(schedFn.includes('_wsRenderDirty'), + 'scheduleWSRender must use dirty flag pattern'); + }); + + test('destroy clears rAF and dirty flag (#396)', () => { + const destroyBlock = src.slice(src.indexOf('function destroy()'), src.indexOf('function destroy()') + 600); + assert.ok(destroyBlock.includes('cancelAnimationFrame(_wsRafId)'), + 'destroy must cancel pending rAF to prevent stale renders after navigation'); + assert.ok(destroyBlock.includes('_wsRenderDirty = false'), + 'destroy must reset dirty flag'); }); } // ===== NODES.JS: shared sandbox factory ===== diff --git a/test-packets.js b/test-packets.js index 59586f17..a3c08031 100644 --- a/test-packets.js +++ b/test-packets.js @@ -107,6 +107,7 @@ function loadPacketsSandbox() { // Load dependencies first loadInCtx(ctx, 'public/roles.js'); loadInCtx(ctx, 'public/app.js'); + loadInCtx(ctx, 'public/packet-helpers.js'); // HopDisplay stub (simpler than loading real file which may have DOM deps) vm.runInContext(` window.HopDisplay = { @@ -695,6 +696,26 @@ console.log('\n=== packets.js: buildFlatRowHtml ==='); const result = api.buildFlatRowHtml(p); assert(result.includes('0B')); }); + + test('buildFlatRowHtml emits data-entry-idx when provided', () => { + const p = { + id: 4, hash: 'z', timestamp: '', observer_id: null, + raw_hex: 'aabb', payload_type: 0, route_type: 0, + decoded_json: '{}', path_json: '[]' + }; + const result = api.buildFlatRowHtml(p, 42); + assert(result.includes('data-entry-idx="42"')); + }); + + test('buildFlatRowHtml emits data-entry-idx=-1 by default', () => { + const p = { + id: 5, hash: 'w', timestamp: '', observer_id: null, + raw_hex: 'aabb', payload_type: 0, route_type: 0, + decoded_json: '{}', path_json: '[]' + }; + const result = api.buildFlatRowHtml(p); + assert(result.includes('data-entry-idx="-1"')); + }); } console.log('\n=== packets.js: buildGroupRowHtml ==='); @@ -740,6 +761,36 @@ console.log('\n=== packets.js: buildGroupRowHtml ==='); assert(result.includes('👁')); assert(result.includes('5')); }); + + test('buildGroupRowHtml emits data-entry-idx on header row', () => { + const p = { + hash: 'ei1', count: 1, latest: '2024-01-01T00:00:00Z', + observer_id: null, raw_hex: 'aa', payload_type: 0, + route_type: 0, decoded_json: '{}', path_json: '[]', + observation_count: 1, observer_count: 1 + }; + const result = api.buildGroupRowHtml(p, 7); + assert(result.includes('data-entry-idx="7"')); + }); + + test('buildGroupRowHtml emits data-entry-idx on child rows', () => { + const ctx2 = loadPacketsSandbox(); + const api2 = ctx2._packetsTestAPI; + // Simulate expandedHashes having this hash + // We can't easily toggle expandedHashes from outside, so test via the + // fact that children only render when isExpanded is true. + // For this test, just verify the header row has the attribute (child rows + // are conditional on expandedHashes which we can't set from tests). + const p = { + hash: 'ei2', count: 3, latest: '2024-01-01T00:00:00Z', + observer_id: null, raw_hex: 'aabb', payload_type: 0, + route_type: 0, decoded_json: '{}', path_json: '[]', + observation_count: 3, observer_count: 2, + _children: [] + }; + const result = api2.buildGroupRowHtml(p, 15); + assert(result.includes('data-entry-idx="15"')); + }); } console.log('\n=== packets.js: page registration ===');