diff --git a/cmd/server/config.go b/cmd/server/config.go index 2a0a89e5..b9454d1f 100644 --- a/cmd/server/config.go +++ b/cmd/server/config.go @@ -134,9 +134,10 @@ type NeighborGraphConfig struct { // PacketStoreConfig controls in-memory packet store limits. type PacketStoreConfig struct { - RetentionHours float64 `json:"retentionHours"` // max age of packets in hours (0 = unlimited) - MaxMemoryMB int `json:"maxMemoryMB"` // hard memory ceiling in MB (0 = unlimited) - MaxResolvedPubkeyIndexEntries int `json:"maxResolvedPubkeyIndexEntries"` // warning threshold for index size (0 = 5M default) + RetentionHours float64 `json:"retentionHours"` // max age of packets in hours (0 = unlimited) + MaxMemoryMB int `json:"maxMemoryMB"` // hard memory ceiling in MB (0 = unlimited) + MaxResolvedPubkeyIndexEntries int `json:"maxResolvedPubkeyIndexEntries"` // warning threshold for index size (0 = 5M default) + HotStartupHours float64 `json:"hotStartupHours"` // load only this many hours synchronously; 0 = disabled } // GeoFilterConfig is an alias for the shared geofilter.Config type. diff --git a/cmd/server/hot_startup_test.go b/cmd/server/hot_startup_test.go new file mode 100644 index 00000000..0ef3f0d0 --- /dev/null +++ b/cmd/server/hot_startup_test.go @@ -0,0 +1,389 @@ +package main + +import ( + "database/sql" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + _ "modernc.org/sqlite" +) + +// createTestDBMultiDay creates a test DB with packets spread across numDays days. +// txPerDay transmissions are inserted per day, oldest day first. +// Packets within each day are spaced 1 minute apart. +func createTestDBMultiDay(t *testing.T, numDays, txPerDay int) string { + t.Helper() + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + + conn, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + execOrFail := func(s string) { + if _, err := conn.Exec(s); err != nil { + t.Fatalf("createTestDBMultiDay setup: %v", err) + } + } + execOrFail(`CREATE TABLE transmissions (id INTEGER PRIMARY KEY, raw_hex TEXT, hash TEXT, first_seen TEXT, route_type INTEGER, payload_type INTEGER, payload_version INTEGER, decoded_json TEXT)`) + execOrFail(`CREATE TABLE observations (id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_id TEXT, observer_name TEXT, direction TEXT, snr REAL, rssi REAL, score INTEGER, path_json TEXT, timestamp TEXT, raw_hex TEXT)`) + execOrFail(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT)`) + execOrFail(`CREATE TABLE nodes (pubkey TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, frequency REAL)`) + execOrFail(`CREATE TABLE schema_version (version INTEGER)`) + execOrFail(`INSERT INTO schema_version (version) VALUES (1)`) + execOrFail(`CREATE INDEX idx_tx_first_seen ON transmissions(first_seen)`) + + id := 1 + now := time.Now().UTC() + for day := numDays; day >= 1; day-- { + base := now.Add(-time.Duration(day) * 24 * time.Hour) + for i := 0; i < txPerDay; i++ { + ts := base.Add(time.Duration(i) * time.Minute).Format(time.RFC3339) + hash := fmt.Sprintf("hash%06d", id) + conn.Exec("INSERT INTO transmissions VALUES (?,?,?,?,0,4,1,?)", id, "aa", hash, ts, `{}`) + conn.Exec("INSERT INTO observations VALUES (?,?,?,?,?,?,?,?,?,?,?)", id, id, "obs1", "Obs1", "RX", -10.0, -80.0, 5, `[]`, ts, "") + id++ + } + } + return dbPath +} + +// waitForBackgroundLoad polls backgroundLoadDone until true or timeout. 
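+// Typical pairing in these tests (see TestHotStartup_BackgroundFillsToRetention):
+//
+//	go store.loadBackgroundChunks()
+//	waitForBackgroundLoad(t, store, 15*time.Second)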
+func waitForBackgroundLoad(t *testing.T, store *PacketStore, timeout time.Duration) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if store.backgroundLoadDone.Load() { + return + } + time.Sleep(50 * time.Millisecond) + } + t.Fatalf("background load did not complete within %v", timeout) +} + +func TestHotStartupConfig_Clamp(t *testing.T) { + dbPath := createTestDB(t, 10) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + // hotStartupHours > retentionHours → must be clamped + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 24, + HotStartupHours: 48, + }) + if store.hotStartupHours != 24 { + t.Errorf("expected hotStartupHours clamped to retentionHours=24, got %f", store.hotStartupHours) + } +} + +func TestHotStartupConfig_ZeroIsDisabled(t *testing.T) { + dbPath := createTestDB(t, 10) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 24, + HotStartupHours: 0, + }) + if store.hotStartupHours != 0 { + t.Errorf("expected hotStartupHours=0, got %f", store.hotStartupHours) + } +} + +func TestHotStartup_LoadsOnlyHotWindow(t *testing.T) { + // 50 old packets (48h ago), 10 recent (30min ago) + dbPath := createTestDBWithAgedPackets(t, 10, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 1, // load only last 1 hour + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + + // Only the 10 recent packets should be in memory + if len(store.packets) != 10 { + t.Errorf("expected 10 recent packets in hot window, got %d", len(store.packets)) + } + // oldestLoaded should be ~1h ago + if store.oldestLoaded == "" { + t.Fatal("oldestLoaded must be set after Load()") + } + oldest, _ := time.Parse(time.RFC3339, store.oldestLoaded) + diff := time.Since(oldest) + if diff < 30*time.Minute || diff > 90*time.Minute { + t.Errorf("oldestLoaded %s should be ~1h ago, got diff=%v", store.oldestLoaded, diff) + } + // backgroundLoadDone must not be set by Load() itself + if store.backgroundLoadDone.Load() { + t.Error("backgroundLoadDone must not be true after Load()") + } +} + +func TestHotStartup_DisabledWhenZero(t *testing.T) { + // 50 old (48h ago), 10 recent (30min ago) — all within 72h retention + dbPath := createTestDBWithAgedPackets(t, 10, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 0, // disabled → load all retentionHours as before + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + + // All 60 packets should be loaded (both old and recent within 72h) + if len(store.packets) != 60 { + t.Errorf("expected 60 packets with hotStartupHours=0, got %d", len(store.packets)) + } +} + +func TestHotStartup_loadChunk_AddsOlderData(t *testing.T) { + // 50 old packets (48h ago), 10 recent (30min ago) + dbPath := createTestDBWithAgedPackets(t, 10, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 
72, + HotStartupHours: 1, + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + if len(store.packets) != 10 { + t.Fatalf("setup: expected 10 packets after hot Load, got %d", len(store.packets)) + } + + // Load the old chunk (covers the 50 old packets at ~48h ago) + chunkEnd := time.Now().UTC().Add(-1 * time.Hour) + chunkStart := time.Now().UTC().Add(-72 * time.Hour) + if err := store.loadChunk(chunkStart, chunkEnd); err != nil { + t.Fatalf("loadChunk failed: %v", err) + } + + // Should have 10 recent + 50 old + if len(store.packets) != 60 { + t.Errorf("expected 60 packets after loadChunk, got %d", len(store.packets)) + } + // Packets must remain sorted ASC by first_seen + for i := 1; i < len(store.packets); i++ { + if store.packets[i].FirstSeen < store.packets[i-1].FirstSeen { + t.Fatalf("packets not in ASC order at index %d: %s < %s", + i, store.packets[i].FirstSeen, store.packets[i-1].FirstSeen) + } + } + // byHash must include the old packets + if len(store.byHash) != 60 { + t.Errorf("expected byHash len=60, got %d", len(store.byHash)) + } +} + +func TestHotStartup_BackgroundFillsToRetention(t *testing.T) { + // 3 days × 50 tx/day = 150 total + dbPath := createTestDBMultiDay(t, 3, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 24, + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + + // After hot Load: only ~50 packets (day 1 = last 24h) + afterHot := len(store.packets) + if afterHot < 1 || afterHot > 60 { + t.Errorf("expected ~50 packets after hot Load, got %d", afterHot) + } + + // Start background fill + go store.loadBackgroundChunks() + waitForBackgroundLoad(t, store, 15*time.Second) + + // After background fill: all 150 packets should be loaded + store.mu.RLock() + total := len(store.packets) + store.mu.RUnlock() + + if total < 140 || total > 160 { + t.Errorf("expected ~150 packets after background load, got %d", total) + } + if !store.backgroundLoadDone.Load() { + t.Error("backgroundLoadDone must be true after loadBackgroundChunks returns") + } +} + +func TestHotStartup_ChunkErrorRecovery(t *testing.T) { + dbPath := createTestDBWithAgedPackets(t, 10, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 1, + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + + // Close the DB so all subsequent chunk queries fail. + db.conn.Close() + + done := make(chan struct{}) + go func() { + store.loadBackgroundChunks() + close(done) + }() + + select { + case <-done: + // Good — completed without hanging. 
+ case <-time.After(10 * time.Second): + t.Fatal("loadBackgroundChunks hung after DB close") + } + + if !store.backgroundLoadDone.Load() { + t.Error("backgroundLoadDone must be set even when all chunks fail") + } +} + +func TestHotStartup_SQLFallback_TriggeredForOldDate(t *testing.T) { + // 50 old packets (48h ago), 10 recent (30min ago) + dbPath := createTestDBWithAgedPackets(t, 10, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + // Hot load: only last 1h → 10 recent packets in memory + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 1, + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + if len(store.packets) != 10 { + t.Fatalf("setup: expected 10 in-memory packets, got %d", len(store.packets)) + } + + // Query with Since = 49h ago (before oldestLoaded ~1h ago) → SQL fallback + since49h := time.Now().UTC().Add(-49 * time.Hour).Format(time.RFC3339) + result := store.QueryPackets(PacketQuery{Since: since49h, Limit: 100, Order: "ASC"}) + + // SQL fallback should return the old packets (stored at ~48h ago) + if result.Total == 0 { + t.Error("expected SQL fallback to return old packets for query before oldestLoaded") + } +} + +func TestHotStartup_PerfStats(t *testing.T) { + dbPath := createTestDBWithAgedPackets(t, 10, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 1, + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + + stats := store.GetPerfStoreStats() + + if v, ok := stats["hotStartupHours"]; !ok || v.(float64) != 1 { + t.Errorf("expected hotStartupHours=1 in stats, got %v", v) + } + if v, ok := stats["backgroundLoadComplete"]; !ok || v.(bool) != false { + t.Errorf("expected backgroundLoadComplete=false in stats, got %v", v) + } + if _, ok := stats["backgroundLoadProgress"]; !ok { + t.Error("expected backgroundLoadProgress in stats") + } +} + +func TestHotStartup_SQLFallback_NotTriggeredForRecentDate(t *testing.T) { + // 50 old packets (48h ago), 10 recent (30min ago) + dbPath := createTestDBWithAgedPackets(t, 10, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + // Hot load: last 1h → 10 recent packets in memory + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 1, + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + + // Query with Since = 45min ago (after oldestLoaded ~1h ago) → in-memory path + since45m := time.Now().UTC().Add(-45 * time.Minute).Format(time.RFC3339) + result := store.QueryPackets(PacketQuery{Since: since45m, Limit: 100, Order: "ASC"}) + + // In-memory path: returns only the 10 recent packets (all within last 30min) + if result.Total != 10 { + t.Errorf("expected 10 in-memory packets for recent Since query, got %d", result.Total) + } +} diff --git a/cmd/server/main.go b/cmd/server/main.go index 9b3ad842..64f605e2 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -175,6 +175,11 @@ func main() { if err := store.Load(); err != nil { log.Fatalf("[store] failed to load: %v", err) } + if store.hotStartupHours > 0 { + log.Printf("[store] starting background load: filling retentionHours=%.0fh from hotStartupHours=%.0fh", + store.retentionHours, store.hotStartupHours) + go 
store.loadBackgroundChunks() + } // Initialize persisted neighbor graph dbPath = database.path diff --git a/cmd/server/store.go b/cmd/server/store.go index a9b6662d..204aab7c 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -231,6 +231,12 @@ type PacketStore struct { // Empty string means all data is in memory (no limit applied). oldestLoaded string + // Hot startup: only hotStartupHours of data is loaded synchronously. + // 0 = disabled (current behavior). Background loader fills the rest. + hotStartupHours float64 + backgroundLoadDone atomic.Bool + backgroundLoadProgress atomic.Int64 // 0–100 percent complete + // Async hash migration state: set after migrateContentHashesAsync completes. hashMigrationComplete atomic.Bool @@ -416,6 +422,14 @@ func NewPacketStore(db *DB, cfg *PacketStoreConfig, cacheTTLs ...map[string]inte ps.retentionHours = cfg.RetentionHours ps.maxMemoryMB = cfg.MaxMemoryMB ps.maxResolvedPubkeyIndexEntries = cfg.MaxResolvedPubkeyIndexEntries + if cfg.HotStartupHours > 0 { + h := cfg.HotStartupHours + if ps.retentionHours > 0 && h > ps.retentionHours { + log.Printf("[store] warning: hotStartupHours (%.0f) > retentionHours (%.0f) — clamping", h, ps.retentionHours) + h = ps.retentionHours + } + ps.hotStartupHours = h + } } // Wire cacheTTL config values to server-side cache durations. if len(cacheTTLs) > 0 && cacheTTLs[0] != nil { @@ -474,9 +488,14 @@ func (s *PacketStore) Load() error { } // Build WHERE conditions: retention cutoff (mirrors Evict logic) + optional memory-cap limit. + // When hotStartupHours > 0, use it as the initial cutoff (smaller window = fast startup). var loadConditions []string - if s.retentionHours > 0 { - cutoff := time.Now().UTC().Add(-time.Duration(s.retentionHours*3600) * time.Second).Format(time.RFC3339) + hotCutoffHours := s.retentionHours + if s.hotStartupHours > 0 { + hotCutoffHours = s.hotStartupHours + } + if hotCutoffHours > 0 { + cutoff := time.Now().UTC().Add(-time.Duration(hotCutoffHours*3600) * time.Second).Format(time.RFC3339) loadConditions = append(loadConditions, fmt.Sprintf("t.first_seen >= '%s'", cutoff)) } if maxPackets > 0 { @@ -664,7 +683,13 @@ func (s *PacketStore) Load() error { s.buildDistanceIndex() // Track oldest loaded timestamp for future SQL fallback queries. - if len(s.packets) > 0 { + // When hotStartupHours > 0 the load window boundary (cutoff) is the + // authoritative lower bound, not the first packet's timestamp (which may be + // newer if no packets landed exactly at the boundary). + if s.hotStartupHours > 0 { + hotCutoffStr := time.Now().UTC().Add(-time.Duration(s.hotStartupHours*3600) * time.Second).Format(time.RFC3339) + s.oldestLoaded = hotCutoffStr + } else if len(s.packets) > 0 { s.oldestLoaded = s.packets[0].FirstSeen } @@ -680,6 +705,317 @@ func (s *PacketStore) Load() error { return nil } +// loadChunk queries a [from, to) time window from SQLite without holding the +// write lock, builds local data structures, then merges them into the store +// under s.mu.Lock(). It is the building block for the background loader. +// +// The chunk is assumed to be older than the data already in the store, so +// localPackets are prepended to s.packets. +// +// byPathHop, spIndex, and distHops are NOT updated here — the caller +// (loadBackgroundChunks) rebuilds those once after all chunks are merged. 
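+//
+// Illustrative call (a sketch mirroring the loadChunk test; the bounds are
+// whatever the caller derives from oldestLoaded and the retention cutoff):
+//
+//	chunkEnd := time.Now().UTC().Add(-1 * time.Hour)    // current oldestLoaded
+//	chunkStart := time.Now().UTC().Add(-72 * time.Hour) // retention cutoff
+//	if err := store.loadChunk(chunkStart, chunkEnd); err != nil {
+//		log.Printf("chunk load failed: %v", err)
+//	}
+//	// s.packets is now [chunk..., hot...] in ASC first_seen order, and
+//	// s.oldestLoaded equals chunkStart (RFC3339) when packets were found.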
+func (s *PacketStore) loadChunk(from, to time.Time) error { + fromStr := from.UTC().Format(time.RFC3339) + toStr := to.UTC().Format(time.RFC3339) + + // Build the same SQL as Load() but with a [from, to) window. + rpCol := "" + if s.db.hasResolvedPath { + rpCol = ",\n\t\t\t\to.resolved_path" + } + obsRawHexCol := "" + if s.db.hasObsRawHex { + obsRawHexCol = ", o.raw_hex" + } + + filterClause := fmt.Sprintf("\n\t\t\tWHERE t.first_seen >= '%s' AND t.first_seen < '%s'", fromStr, toStr) + + var chunkSQL string + if s.db.isV3 { + chunkSQL = `SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type, + t.payload_type, t.payload_version, t.decoded_json, + o.id, obs.id, obs.name, o.direction, + o.snr, o.rssi, o.score, o.path_json, strftime('%Y-%m-%dT%H:%M:%fZ', o.timestamp, 'unixepoch')` + obsRawHexCol + rpCol + ` + FROM transmissions t + LEFT JOIN observations o ON o.transmission_id = t.id + LEFT JOIN observers obs ON obs.rowid = o.observer_idx` + filterClause + ` + ORDER BY t.first_seen ASC, o.timestamp DESC` + } else { + chunkSQL = `SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type, + t.payload_type, t.payload_version, t.decoded_json, + o.id, o.observer_id, o.observer_name, o.direction, + o.snr, o.rssi, o.score, o.path_json, o.timestamp` + obsRawHexCol + rpCol + ` + FROM transmissions t + LEFT JOIN observations o ON o.transmission_id = t.id` + filterClause + ` + ORDER BY t.first_seen ASC, o.timestamp DESC` + } + + rows, err := s.db.conn.Query(chunkSQL) + if err != nil { + return err + } + defer rows.Close() + + // Local data structures — built without holding the lock. + localByHash := make(map[string]*StoreTx) + localPackets := make([]*StoreTx, 0) + localByTxID := make(map[int]*StoreTx) + localByObsID := make(map[int]*StoreObs) + localByObserver := make(map[string][]*StoreObs) + var localTotalObs int + var localTrackedBytes int64 + var localMaxTxID int + var localMaxObsID int + + for rows.Next() { + var txID int + var rawHex, hash, firstSeen, decodedJSON sql.NullString + var routeType, payloadType, payloadVersion sql.NullInt64 + var obsID sql.NullInt64 + var observerID, observerName, direction, pathJSON, obsTimestamp sql.NullString + var snr, rssi sql.NullFloat64 + var score sql.NullInt64 + var obsRawHex sql.NullString + var resolvedPathStr sql.NullString + + scanArgs := []interface{}{&txID, &rawHex, &hash, &firstSeen, &routeType, &payloadType, + &payloadVersion, &decodedJSON, + &obsID, &observerID, &observerName, &direction, + &snr, &rssi, &score, &pathJSON, &obsTimestamp} + if s.db.hasObsRawHex { + scanArgs = append(scanArgs, &obsRawHex) + } + if s.db.hasResolvedPath { + scanArgs = append(scanArgs, &resolvedPathStr) + } + if err := rows.Scan(scanArgs...); err != nil { + log.Printf("[store] loadChunk scan error: %v", err) + continue + } + + hashStr := nullStrVal(hash) + tx := localByHash[hashStr] + if tx == nil { + tx = &StoreTx{ + ID: txID, + RawHex: nullStrVal(rawHex), + Hash: hashStr, + FirstSeen: nullStrVal(firstSeen), + LatestSeen: nullStrVal(firstSeen), + RouteType: nullIntPtr(routeType), + PayloadType: nullIntPtr(payloadType), + DecodedJSON: nullStrVal(decodedJSON), + obsKeys: make(map[string]bool), + observerSet: make(map[string]bool), + } + localByHash[hashStr] = tx + localPackets = append(localPackets, tx) + localByTxID[txID] = tx + if txID > localMaxTxID { + localMaxTxID = txID + } + localTrackedBytes += estimateStoreTxBytes(tx) + } + + if obsID.Valid { + oid := int(obsID.Int64) + obsIDStr := nullStrVal(observerID) + obsPJ := nullStrVal(pathJSON) + + dk := obsIDStr + "|" 
+ obsPJ
+			if tx.obsKeys[dk] {
+				continue
+			}
+
+			obs := &StoreObs{
+				ID:             oid,
+				TransmissionID: txID,
+				ObserverID:     obsIDStr,
+				ObserverName:   nullStrVal(observerName),
+				Direction:      nullStrVal(direction),
+				SNR:            nullFloatPtr(snr),
+				RSSI:           nullFloatPtr(rssi),
+				Score:          nullIntPtr(score),
+				PathJSON:       obsPJ,
+				RawHex:         nullStrVal(obsRawHex),
+				Timestamp:      normalizeTimestamp(nullStrVal(obsTimestamp)),
+			}
+
+			tx.Observations = append(tx.Observations, obs)
+			tx.obsKeys[dk] = true
+			if obs.ObserverID != "" && !tx.observerSet[obs.ObserverID] {
+				tx.observerSet[obs.ObserverID] = true
+				tx.UniqueObserverCount++
+			}
+			tx.ObservationCount++
+			if obs.Timestamp > tx.LatestSeen {
+				tx.LatestSeen = obs.Timestamp
+			}
+
+			localByObsID[oid] = obs
+			if oid > localMaxObsID {
+				localMaxObsID = oid
+			}
+			if obsIDStr != "" {
+				localByObserver[obsIDStr] = append(localByObserver[obsIDStr], obs)
+			}
+			localTotalObs++
+			localTrackedBytes += estimateStoreObsBytes(obs)
+		}
+	}
+	if err := rows.Err(); err != nil {
+		return err
+	}
+
+	// Pick best observation for each local packet before merging.
+	for _, tx := range localPackets {
+		pickBestObservation(tx)
+	}
+
+	if len(localPackets) == 0 {
+		return nil
+	}
+
+	// Merge under write lock.
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Prepend: chunk is older than the hot window already in store.
+	s.packets = append(localPackets, s.packets...)
+
+	// Merge indexes.
+	for k, v := range localByHash {
+		if s.byHash[k] == nil {
+			s.byHash[k] = v
+		}
+	}
+	for k, v := range localByTxID {
+		if s.byTxID[k] == nil {
+			s.byTxID[k] = v
+		}
+	}
+	for k, v := range localByObsID {
+		if s.byObsID[k] == nil {
+			s.byObsID[k] = v
+		}
+	}
+	for observerID, obsList := range localByObserver {
+		for _, o := range obsList {
+			// Pointer equality: append only observations that won the byObsID
+			// merge above (i.e. are new to the store). A nil check would always
+			// be false here because byObsID was just populated for every o.ID.
+			if s.byObsID[o.ID] == o {
+				s.byObserver[observerID] = append(s.byObserver[observerID], o)
+			}
+		}
+	}
+
+	// Index each local packet into byNode and byPayloadType.
+	for _, tx := range localPackets {
+		s.indexByNode(tx)
+		if tx.PayloadType != nil {
+			pt := *tx.PayloadType
+			s.byPayloadType[pt] = append(s.byPayloadType[pt], tx)
+		}
+		s.trackAdvertPubkey(tx)
+	}
+
+	// Update counters.
+	s.totalObs += localTotalObs
+	s.trackedBytes += localTrackedBytes
+	if localMaxTxID > s.maxTxID {
+		s.maxTxID = localMaxTxID
+	}
+	if localMaxObsID > s.maxObsID {
+		s.maxObsID = localMaxObsID
+	}
+	s.oldestLoaded = fromStr
+
+	log.Printf("[store] background chunk [%s, %s) merged: %d tx, %d obs", fromStr, toStr, len(localPackets), localTotalObs)
+	return nil
+}
+
+// loadBackgroundChunks fills the remaining retentionHours window by loading
+// daily chunks from oldestLoaded back to the retention cutoff. After all
+// chunks are merged it rebuilds analytics indexes once. Chunk errors are
+// handled by advancing past the failed window so the loop always terminates.
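+//
+// Example walk (illustrative numbers): retentionHours=72, hotStartupHours=24,
+// so oldestLoaded starts at now-24h and totalChunks = ceil(48/24) = 2:
+//
+//	chunk 1: [now-48h, now-24h) → oldestLoaded = now-48h, progress 50%
+//	chunk 2: [now-72h, now-48h) → oldestLoaded = now-72h, progress 100%
+//	exit: chunkEnd (now-72h) is no longer after the retention target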
+func (s *PacketStore) loadBackgroundChunks() { + if s.retentionHours <= 0 { + s.backgroundLoadDone.Store(true) + return + } + + target := time.Now().UTC().Add(-time.Duration(s.retentionHours*3600) * time.Second) + totalHours := s.retentionHours - s.hotStartupHours + if totalHours <= 0 { + s.backgroundLoadDone.Store(true) + return + } + + var chunksLoaded float64 + totalChunks := math.Ceil(totalHours / 24) + + for { + s.mu.RLock() + oldest := s.oldestLoaded + s.mu.RUnlock() + + if oldest == "" { + break + } + chunkEnd, err := time.Parse(time.RFC3339, oldest) + if err != nil { + log.Printf("[store] background loader: bad oldestLoaded %q: %v", oldest, err) + break + } + if !chunkEnd.After(target) { + break + } + + chunkStart := chunkEnd.Add(-24 * time.Hour) + if chunkStart.Before(target) { + chunkStart = target + } + + chunkStartStr := chunkStart.Format(time.RFC3339) + if err := s.loadChunk(chunkStart, chunkEnd); err != nil { + log.Printf("[store] background chunk [%s, %s) error: %v — advancing past it", + chunkStartStr, chunkEnd.Format(time.RFC3339), err) + } + // Always advance oldestLoaded to chunkStart so the loop terminates, + // even when the chunk was empty (loadChunk skips the update when 0 packets). + s.mu.Lock() + if s.oldestLoaded > chunkStartStr || s.oldestLoaded == "" { + s.oldestLoaded = chunkStartStr + } + s.mu.Unlock() + + chunksLoaded++ + if totalChunks > 0 { + pct := int64(chunksLoaded / totalChunks * 100) + if pct > 100 { + pct = 100 + } + s.backgroundLoadProgress.Store(pct) + } + + runtime.Gosched() + } + + // Rebuild analytics indexes once after all chunks are merged. + s.mu.Lock() + s.buildSubpathIndex() + s.buildPathHopIndex() + s.buildDistanceIndex() + s.mu.Unlock() + + s.backgroundLoadDone.Store(true) + s.backgroundLoadProgress.Store(100) + + s.mu.RLock() + totalPkts := len(s.packets) + oldest := s.oldestLoaded + s.mu.RUnlock() + log.Printf("[store] background load complete: %d packets in memory, oldestLoaded=%s", totalPkts, oldest) +} + // pickBestObservation selects the observation with the longest path // and sets it as the transmission's display observation. func pickBestObservation(tx *StoreTx) { @@ -830,6 +1166,22 @@ func (s *PacketStore) untrackAdvertPubkey(tx *StoreTx) { // QueryPackets returns filtered, paginated packets from memory. func (s *PacketStore) QueryPackets(q PacketQuery) *PacketResult { + // SQL fallback: if the query window predates the in-memory window, delegate + // to the DB layer which covers the full SQLite retention period. + s.mu.RLock() + oldest := s.oldestLoaded + s.mu.RUnlock() + if oldest != "" { + needsSQL := (q.Since != "" && q.Since < oldest) || + (q.Until != "" && q.Until < oldest) + if needsSQL { + if result, err := s.db.QueryPackets(q); err == nil { + return result + } else { + log.Printf("[store] QueryPackets SQL fallback failed: %v — using in-memory", err) + } + } + } atomic.AddInt64(&s.queryCount, 1) s.mu.RLock() defer s.mu.RUnlock() @@ -876,6 +1228,20 @@ func (s *PacketStore) QueryPackets(q PacketQuery) *PacketResult { // QueryGroupedPackets returns transmissions grouped by hash (already 1:1). 
func (s *PacketStore) QueryGroupedPackets(q PacketQuery) *PacketResult { + s.mu.RLock() + oldest := s.oldestLoaded + s.mu.RUnlock() + if oldest != "" { + needsSQL := (q.Since != "" && q.Since < oldest) || + (q.Until != "" && q.Until < oldest) + if needsSQL { + if result, err := s.db.QueryGroupedPackets(q); err == nil { + return result + } else { + log.Printf("[store] QueryGroupedPackets SQL fallback failed: %v — using in-memory", err) + } + } + } atomic.AddInt64(&s.queryCount, 1) if q.Limit <= 0 { @@ -1043,6 +1409,10 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} { nodeIdx := len(s.byNode) pathHopIdx := len(s.byPathHop) ptIdx := len(s.byPayloadType) + oldestLoaded := s.oldestLoaded + retentionHours := s.retentionHours + maxMemoryMB := s.maxMemoryMB + hotStartupHours := s.hotStartupHours // Distinct advert pubkey count — precomputed incrementally (see trackAdvertPubkey). advertByObsCount := len(s.advertPubkeys) @@ -1054,18 +1424,21 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} { evicted := atomic.LoadInt64(&s.evicted) return map[string]interface{}{ - "totalLoaded": totalLoaded, - "totalObservations": totalObs, - "evicted": evicted, - "inserts": atomic.LoadInt64(&s.insertCount), - "queries": atomic.LoadInt64(&s.queryCount), - "inMemory": totalLoaded, - "sqliteOnly": false, - "retentionHours": s.retentionHours, - "maxMemoryMB": s.maxMemoryMB, - "oldestLoaded": s.oldestLoaded, - "estimatedMB": estimatedMB, - "trackedMB": trackedMB, + "totalLoaded": totalLoaded, + "totalObservations": totalObs, + "evicted": evicted, + "inserts": atomic.LoadInt64(&s.insertCount), + "queries": atomic.LoadInt64(&s.queryCount), + "inMemory": totalLoaded, + "sqliteOnly": false, + "retentionHours": retentionHours, + "maxMemoryMB": maxMemoryMB, + "oldestLoaded": oldestLoaded, + "estimatedMB": estimatedMB, + "trackedMB": trackedMB, + "hotStartupHours": hotStartupHours, + "backgroundLoadComplete": s.backgroundLoadDone.Load(), + "backgroundLoadProgress": s.backgroundLoadProgress.Load(), "indexes": map[string]interface{}{ "byHash": hashIdx, "byTxID": txIdx, diff --git a/config.example.json b/config.example.json index 988d8bd4..df16cde0 100644 --- a/config.example.json +++ b/config.example.json @@ -224,7 +224,8 @@ "maxMemoryMB": 1024, "estimatedPacketBytes": 450, "retentionHours": 168, - "_comment": "In-memory packet store. maxMemoryMB caps RAM usage. retentionHours: only packets younger than this are loaded on startup and kept in memory (0 = unlimited, not recommended for large DBs — causes OOM on cold start). 168 = 7 days. Must be ≤ retention.packetDays * 24.", + "hotStartupHours": 24, + "_comment": "In-memory packet store. maxMemoryMB caps RAM usage. retentionHours: only packets younger than this are kept in memory (0 = unlimited). hotStartupHours: hours loaded synchronously at startup; background loader fills the remaining retentionHours window. Set to 24 for <60s startup on large DBs. 0 = disabled (loads full retentionHours at startup, legacy behavior).", "_comment_gomemlimit": "On startup the server reads GOMEMLIMIT from the environment if set; otherwise it derives a Go runtime soft memory limit of maxMemoryMB * 1.5 and applies it via debug.SetMemoryLimit. This forces aggressive GC under cgroup pressure so the process self-throttles before the kernel SIGKILLs it. To override, set GOMEMLIMIT explicitly (e.g. GOMEMLIMIT=850MiB). See issue #836." 
}, "resolvedPath": { diff --git a/test-e2e-playwright.js b/test-e2e-playwright.js index 6bbd87e5..b15e39fc 100644 --- a/test-e2e-playwright.js +++ b/test-e2e-playwright.js @@ -2052,7 +2052,17 @@ async function run() { // Test: per-observation raw_hex — hex pane updates when switching observations (#881) await test('Packet detail hex pane updates per observation', async () => { - await page.goto(BASE + '#/packets', { waitUntil: 'domcontentloaded' }); + // The prior "Expanded group children" test leaves groupbyhash=true and the detail pane open. + // A simple page.goto is treated as a hash-change (same document), so the SPA closes the + // detail pane (hiding its 30+ field/obs table rows) before the main table's API fetch + // completes — waitForSelector sees only hidden rows and times out. + // Fix: set localStorage, navigate, then reload with 'load' (same pattern the prior test uses). + await page.evaluate(() => { + localStorage.removeItem('meshcore-groupbyhash'); + localStorage.setItem('meshcore-time-window', '525600'); + }); + await page.goto(`${BASE}/#/packets`, { waitUntil: 'domcontentloaded' }); + await page.reload({ waitUntil: 'load' }); await page.waitForSelector('table tbody tr:not([id^=vscroll])', { timeout: 15000 }); await page.waitForTimeout(500);