From 814d168941379d8972d96155021ef68e12cc0094 Mon Sep 17 00:00:00 2001 From: efiten Date: Sat, 9 May 2026 20:47:27 +0200 Subject: [PATCH 01/11] feat(startup): add packetStore.hotStartupHours config field and PacketStore wiring Co-Authored-By: Claude Sonnet 4.6 --- cmd/server/config.go | 7 ++- cmd/server/hot_startup_test.go | 106 +++++++++++++++++++++++++++++++++ cmd/server/store.go | 14 +++++ 3 files changed, 124 insertions(+), 3 deletions(-) create mode 100644 cmd/server/hot_startup_test.go diff --git a/cmd/server/config.go b/cmd/server/config.go index 2a0a89e5..b9454d1f 100644 --- a/cmd/server/config.go +++ b/cmd/server/config.go @@ -134,9 +134,10 @@ type NeighborGraphConfig struct { // PacketStoreConfig controls in-memory packet store limits. type PacketStoreConfig struct { - RetentionHours float64 `json:"retentionHours"` // max age of packets in hours (0 = unlimited) - MaxMemoryMB int `json:"maxMemoryMB"` // hard memory ceiling in MB (0 = unlimited) - MaxResolvedPubkeyIndexEntries int `json:"maxResolvedPubkeyIndexEntries"` // warning threshold for index size (0 = 5M default) + RetentionHours float64 `json:"retentionHours"` // max age of packets in hours (0 = unlimited) + MaxMemoryMB int `json:"maxMemoryMB"` // hard memory ceiling in MB (0 = unlimited) + MaxResolvedPubkeyIndexEntries int `json:"maxResolvedPubkeyIndexEntries"` // warning threshold for index size (0 = 5M default) + HotStartupHours float64 `json:"hotStartupHours"` // load only this many hours synchronously; 0 = disabled } // GeoFilterConfig is an alias for the shared geofilter.Config type. diff --git a/cmd/server/hot_startup_test.go b/cmd/server/hot_startup_test.go new file mode 100644 index 00000000..0df1cb11 --- /dev/null +++ b/cmd/server/hot_startup_test.go @@ -0,0 +1,106 @@ +package main + +import ( + "database/sql" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + _ "modernc.org/sqlite" +) + +// createTestDBMultiDay creates a test DB with packets spread across numDays days. +// txPerDay transmissions are inserted per day, oldest day first. +// Packets within each day are spaced 1 minute apart. +func createTestDBMultiDay(t *testing.T, numDays, txPerDay int) string { + t.Helper() + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + + conn, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + execOrFail := func(s string) { + if _, err := conn.Exec(s); err != nil { + t.Fatalf("createTestDBMultiDay setup: %v", err) + } + } + execOrFail(`CREATE TABLE transmissions (id INTEGER PRIMARY KEY, raw_hex TEXT, hash TEXT, first_seen TEXT, route_type INTEGER, payload_type INTEGER, payload_version INTEGER, decoded_json TEXT)`) + execOrFail(`CREATE TABLE observations (id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_id TEXT, observer_name TEXT, direction TEXT, snr REAL, rssi REAL, score INTEGER, path_json TEXT, timestamp TEXT, raw_hex TEXT)`) + execOrFail(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT)`) + execOrFail(`CREATE TABLE nodes (pubkey TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, frequency REAL)`) + execOrFail(`CREATE TABLE schema_version (version INTEGER)`) + execOrFail(`INSERT INTO schema_version (version) VALUES (1)`) + execOrFail(`CREATE INDEX idx_tx_first_seen ON transmissions(first_seen)`) + + id := 1 + now := time.Now().UTC() + for day := numDays; day >= 1; day-- { + base := now.Add(-time.Duration(day) * 24 * time.Hour) + for i := 0; i < txPerDay; i++ { + ts := base.Add(time.Duration(i) * time.Minute).Format(time.RFC3339) + hash := fmt.Sprintf("hash%06d", id) + conn.Exec("INSERT INTO transmissions VALUES (?,?,?,?,0,4,1,?)", id, "aa", hash, ts, `{}`) + conn.Exec("INSERT INTO observations VALUES (?,?,?,?,?,?,?,?,?,?,?)", id, id, "obs1", "Obs1", "RX", -10.0, -80.0, 5, `[]`, ts, "") + id++ + } + } + return dbPath +} + +// waitForBackgroundLoad polls backgroundLoadDone until true or timeout. +func waitForBackgroundLoad(t *testing.T, store *PacketStore, timeout time.Duration) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if store.backgroundLoadDone.Load() { + return + } + time.Sleep(50 * time.Millisecond) + } + t.Fatalf("background load did not complete within %v", timeout) +} + +func TestHotStartupConfig_Clamp(t *testing.T) { + dbPath := createTestDB(t, 10) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + // hotStartupHours > retentionHours → must be clamped + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 24, + HotStartupHours: 48, + }) + if store.hotStartupHours != 24 { + t.Errorf("expected hotStartupHours clamped to retentionHours=24, got %f", store.hotStartupHours) + } +} + +func TestHotStartupConfig_ZeroIsDisabled(t *testing.T) { + dbPath := createTestDB(t, 10) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 24, + HotStartupHours: 0, + }) + if store.hotStartupHours != 0 { + t.Errorf("expected hotStartupHours=0, got %f", store.hotStartupHours) + } +} diff --git a/cmd/server/store.go b/cmd/server/store.go index a9b6662d..600e63dd 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -231,6 +231,12 @@ type PacketStore struct { // Empty string means all data is in memory (no limit applied). oldestLoaded string + // Hot startup: only hotStartupHours of data is loaded synchronously. + // 0 = disabled (current behavior). Background loader fills the rest. + hotStartupHours float64 + backgroundLoadDone atomic.Bool + backgroundLoadProgress atomic.Int64 // 0–100 percent complete + // Async hash migration state: set after migrateContentHashesAsync completes. hashMigrationComplete atomic.Bool @@ -416,6 +422,14 @@ func NewPacketStore(db *DB, cfg *PacketStoreConfig, cacheTTLs ...map[string]inte ps.retentionHours = cfg.RetentionHours ps.maxMemoryMB = cfg.MaxMemoryMB ps.maxResolvedPubkeyIndexEntries = cfg.MaxResolvedPubkeyIndexEntries + if cfg.HotStartupHours > 0 { + h := cfg.HotStartupHours + if ps.retentionHours > 0 && h > ps.retentionHours { + log.Printf("[store] warning: hotStartupHours (%.0f) > retentionHours (%.0f) — clamping", h, ps.retentionHours) + h = ps.retentionHours + } + ps.hotStartupHours = h + } } // Wire cacheTTL config values to server-side cache durations. if len(cacheTTLs) > 0 && cacheTTLs[0] != nil { From e5e23475a79f877ca0b47fb08f580736f5ce66ac Mon Sep 17 00:00:00 2001 From: efiten Date: Sun, 10 May 2026 10:01:44 +0200 Subject: [PATCH 02/11] feat(startup): Load() uses hotStartupHours as WHERE cutoff when set When hotStartupHours > 0, Load() applies it as the initial time-window cutoff instead of retentionHours, enabling fast startup with a smaller in-memory set. Adds TestHotStartup_LoadsOnlyHotWindow and TestHotStartup_DisabledWhenZero to verify both code paths. --- cmd/server/hot_startup_test.go | 63 ++++++++++++++++++++++++++++++++++ cmd/server/store.go | 9 +++-- 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/cmd/server/hot_startup_test.go b/cmd/server/hot_startup_test.go index 0df1cb11..629aa799 100644 --- a/cmd/server/hot_startup_test.go +++ b/cmd/server/hot_startup_test.go @@ -104,3 +104,66 @@ func TestHotStartupConfig_ZeroIsDisabled(t *testing.T) { t.Errorf("expected hotStartupHours=0, got %f", store.hotStartupHours) } } + +func TestHotStartup_LoadsOnlyHotWindow(t *testing.T) { + // 50 old packets (48h ago), 10 recent (30min ago) + dbPath := createTestDBWithAgedPackets(t, 10, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 1, // load only last 1 hour + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + + // Only the 10 recent packets should be in memory + if len(store.packets) != 10 { + t.Errorf("expected 10 recent packets in hot window, got %d", len(store.packets)) + } + // oldestLoaded should be ~1h ago + if store.oldestLoaded == "" { + t.Fatal("oldestLoaded must be set after Load()") + } + oldest, _ := time.Parse(time.RFC3339, store.oldestLoaded) + diff := time.Since(oldest) + if diff < 30*time.Minute || diff > 90*time.Minute { + t.Errorf("oldestLoaded %s should be ~1h ago, got diff=%v", store.oldestLoaded, diff) + } + // backgroundLoadDone must not be set by Load() itself + if store.backgroundLoadDone.Load() { + t.Error("backgroundLoadDone must not be true after Load()") + } +} + +func TestHotStartup_DisabledWhenZero(t *testing.T) { + // 50 old (48h ago), 10 recent (30min ago) — all within 72h retention + dbPath := createTestDBWithAgedPackets(t, 10, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 0, // disabled → load all retentionHours as before + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + + // All 60 packets should be loaded (both old and recent within 72h) + if len(store.packets) != 60 { + t.Errorf("expected 60 packets with hotStartupHours=0, got %d", len(store.packets)) + } +} diff --git a/cmd/server/store.go b/cmd/server/store.go index 600e63dd..c2a0a6c4 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -488,9 +488,14 @@ func (s *PacketStore) Load() error { } // Build WHERE conditions: retention cutoff (mirrors Evict logic) + optional memory-cap limit. + // When hotStartupHours > 0, use it as the initial cutoff (smaller window = fast startup). var loadConditions []string - if s.retentionHours > 0 { - cutoff := time.Now().UTC().Add(-time.Duration(s.retentionHours*3600) * time.Second).Format(time.RFC3339) + hotCutoffHours := s.retentionHours + if s.hotStartupHours > 0 { + hotCutoffHours = s.hotStartupHours + } + if hotCutoffHours > 0 { + cutoff := time.Now().UTC().Add(-time.Duration(hotCutoffHours*3600) * time.Second).Format(time.RFC3339) loadConditions = append(loadConditions, fmt.Sprintf("t.first_seen >= '%s'", cutoff)) } if maxPackets > 0 { From 5bd5194485f8815b3f47f77ffe44ca9b7687dac2 Mon Sep 17 00:00:00 2001 From: efiten Date: Sun, 10 May 2026 12:35:34 +0200 Subject: [PATCH 03/11] =?UTF-8?q?feat(startup):=20implement=20loadChunk=20?= =?UTF-8?q?=E2=80=94=20SQL=20scan=20outside=20lock,=20merge=20under=20writ?= =?UTF-8?q?e=20lock?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/server/hot_startup_test.go | 46 +++++++ cmd/server/store.go | 223 +++++++++++++++++++++++++++++++++ 2 files changed, 269 insertions(+) diff --git a/cmd/server/hot_startup_test.go b/cmd/server/hot_startup_test.go index 629aa799..71755a93 100644 --- a/cmd/server/hot_startup_test.go +++ b/cmd/server/hot_startup_test.go @@ -167,3 +167,49 @@ func TestHotStartup_DisabledWhenZero(t *testing.T) { t.Errorf("expected 60 packets with hotStartupHours=0, got %d", len(store.packets)) } } + +func TestHotStartup_loadChunk_AddsOlderData(t *testing.T) { + // 50 old packets (48h ago), 10 recent (30min ago) + dbPath := createTestDBWithAgedPackets(t, 10, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 1, + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + if len(store.packets) != 10 { + t.Fatalf("setup: expected 10 packets after hot Load, got %d", len(store.packets)) + } + + // Load the old chunk (covers the 50 old packets at ~48h ago) + chunkEnd := time.Now().UTC().Add(-1 * time.Hour) + chunkStart := time.Now().UTC().Add(-72 * time.Hour) + if err := store.loadChunk(chunkStart, chunkEnd); err != nil { + t.Fatalf("loadChunk failed: %v", err) + } + + // Should have 10 recent + 50 old + if len(store.packets) != 60 { + t.Errorf("expected 60 packets after loadChunk, got %d", len(store.packets)) + } + // Packets must remain sorted ASC by first_seen + for i := 1; i < len(store.packets); i++ { + if store.packets[i].FirstSeen < store.packets[i-1].FirstSeen { + t.Fatalf("packets not in ASC order at index %d: %s < %s", + i, store.packets[i].FirstSeen, store.packets[i-1].FirstSeen) + } + } + // byHash must include the old packets + if len(store.byHash) != 60 { + t.Errorf("expected byHash len=60, got %d", len(store.byHash)) + } +} diff --git a/cmd/server/store.go b/cmd/server/store.go index c2a0a6c4..ca6520ea 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -699,6 +699,229 @@ func (s *PacketStore) Load() error { return nil } +// loadChunk queries a [from, to) time window from SQLite without holding the +// write lock, builds local data structures, then merges them into the store +// under s.mu.Lock(). It is the building block for the background loader. +// +// The chunk is assumed to be older than the data already in the store, so +// localPackets are prepended to s.packets. +// +// byPathHop, spIndex, and distHops are NOT updated here — the caller +// (loadBackgroundChunks) rebuilds those once after all chunks are merged. +func (s *PacketStore) loadChunk(from, to time.Time) error { + fromStr := from.UTC().Format(time.RFC3339) + toStr := to.UTC().Format(time.RFC3339) + + // Build the same SQL as Load() but with a [from, to) window. + rpCol := "" + if s.db.hasResolvedPath { + rpCol = ",\n\t\t\t\to.resolved_path" + } + obsRawHexCol := "" + if s.db.hasObsRawHex { + obsRawHexCol = ", o.raw_hex" + } + + filterClause := fmt.Sprintf("\n\t\t\tWHERE t.first_seen >= '%s' AND t.first_seen < '%s'", fromStr, toStr) + + var chunkSQL string + if s.db.isV3 { + chunkSQL = `SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type, + t.payload_type, t.payload_version, t.decoded_json, + o.id, obs.id, obs.name, o.direction, + o.snr, o.rssi, o.score, o.path_json, strftime('%Y-%m-%dT%H:%M:%fZ', o.timestamp, 'unixepoch')` + obsRawHexCol + rpCol + ` + FROM transmissions t + LEFT JOIN observations o ON o.transmission_id = t.id + LEFT JOIN observers obs ON obs.rowid = o.observer_idx` + filterClause + ` + ORDER BY t.first_seen ASC, o.timestamp DESC` + } else { + chunkSQL = `SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type, + t.payload_type, t.payload_version, t.decoded_json, + o.id, o.observer_id, o.observer_name, o.direction, + o.snr, o.rssi, o.score, o.path_json, o.timestamp` + obsRawHexCol + rpCol + ` + FROM transmissions t + LEFT JOIN observations o ON o.transmission_id = t.id` + filterClause + ` + ORDER BY t.first_seen ASC, o.timestamp DESC` + } + + rows, err := s.db.conn.Query(chunkSQL) + if err != nil { + return err + } + defer rows.Close() + + // Local data structures — built without holding the lock. + localByHash := make(map[string]*StoreTx) + localPackets := make([]*StoreTx, 0) + localByTxID := make(map[int]*StoreTx) + localByObsID := make(map[int]*StoreObs) + localByObserver := make(map[string][]*StoreObs) + var localTotalObs int + var localTrackedBytes int64 + var localMaxTxID int + var localMaxObsID int + + for rows.Next() { + var txID int + var rawHex, hash, firstSeen, decodedJSON sql.NullString + var routeType, payloadType, payloadVersion sql.NullInt64 + var obsID sql.NullInt64 + var observerID, observerName, direction, pathJSON, obsTimestamp sql.NullString + var snr, rssi sql.NullFloat64 + var score sql.NullInt64 + var obsRawHex sql.NullString + var resolvedPathStr sql.NullString + + scanArgs := []interface{}{&txID, &rawHex, &hash, &firstSeen, &routeType, &payloadType, + &payloadVersion, &decodedJSON, + &obsID, &observerID, &observerName, &direction, + &snr, &rssi, &score, &pathJSON, &obsTimestamp} + if s.db.hasObsRawHex { + scanArgs = append(scanArgs, &obsRawHex) + } + if s.db.hasResolvedPath { + scanArgs = append(scanArgs, &resolvedPathStr) + } + if err := rows.Scan(scanArgs...); err != nil { + log.Printf("[store] loadChunk scan error: %v", err) + continue + } + + hashStr := nullStrVal(hash) + tx := localByHash[hashStr] + if tx == nil { + tx = &StoreTx{ + ID: txID, + RawHex: nullStrVal(rawHex), + Hash: hashStr, + FirstSeen: nullStrVal(firstSeen), + LatestSeen: nullStrVal(firstSeen), + RouteType: nullIntPtr(routeType), + PayloadType: nullIntPtr(payloadType), + DecodedJSON: nullStrVal(decodedJSON), + obsKeys: make(map[string]bool), + observerSet: make(map[string]bool), + } + localByHash[hashStr] = tx + localPackets = append(localPackets, tx) + localByTxID[txID] = tx + if txID > localMaxTxID { + localMaxTxID = txID + } + localTrackedBytes += estimateStoreTxBytes(tx) + } + + if obsID.Valid { + oid := int(obsID.Int64) + obsIDStr := nullStrVal(observerID) + obsPJ := nullStrVal(pathJSON) + + dk := obsIDStr + "|" + obsPJ + if tx.obsKeys[dk] { + continue + } + + obs := &StoreObs{ + ID: oid, + TransmissionID: txID, + ObserverID: obsIDStr, + ObserverName: nullStrVal(observerName), + Direction: nullStrVal(direction), + SNR: nullFloatPtr(snr), + RSSI: nullFloatPtr(rssi), + Score: nullIntPtr(score), + PathJSON: obsPJ, + RawHex: nullStrVal(obsRawHex), + Timestamp: normalizeTimestamp(nullStrVal(obsTimestamp)), + } + + tx.Observations = append(tx.Observations, obs) + tx.obsKeys[dk] = true + if obs.ObserverID != "" && !tx.observerSet[obs.ObserverID] { + tx.observerSet[obs.ObserverID] = true + tx.UniqueObserverCount++ + } + tx.ObservationCount++ + if obs.Timestamp > tx.LatestSeen { + tx.LatestSeen = obs.Timestamp + } + + localByObsID[oid] = obs + if oid > localMaxObsID { + localMaxObsID = oid + } + if obsIDStr != "" { + localByObserver[obsIDStr] = append(localByObserver[obsIDStr], obs) + } + localTotalObs++ + localTrackedBytes += estimateStoreObsBytes(obs) + } + } + if err := rows.Err(); err != nil { + return err + } + + // Pick best observation for each local packet before merging. + for _, tx := range localPackets { + pickBestObservation(tx) + } + + if len(localPackets) == 0 { + return nil + } + + // Merge under write lock. + s.mu.Lock() + defer s.mu.Unlock() + + // Prepend: chunk is older than the hot window already in store. + s.packets = append(localPackets, s.packets...) + + // Merge indexes. + for k, v := range localByHash { + if s.byHash[k] == nil { + s.byHash[k] = v + } + } + for k, v := range localByTxID { + if s.byTxID[k] == nil { + s.byTxID[k] = v + } + } + for k, v := range localByObsID { + if s.byObsID[k] == nil { + s.byObsID[k] = v + } + } + for k, obs := range localByObserver { + s.byObserver[k] = append(s.byObserver[k], obs...) + } + + // Index each local packet into byNode and byPayloadType. + for _, tx := range localPackets { + s.indexByNode(tx) + if tx.PayloadType != nil { + pt := *tx.PayloadType + s.byPayloadType[pt] = append(s.byPayloadType[pt], tx) + } + s.trackAdvertPubkey(tx) + } + + // Update counters. + s.totalObs += localTotalObs + s.trackedBytes += localTrackedBytes + if localMaxTxID > s.maxTxID { + s.maxTxID = localMaxTxID + } + if localMaxObsID > s.maxObsID { + s.maxObsID = localMaxObsID + } + s.oldestLoaded = fromStr + + log.Printf("[store] background chunk [%s, %s) merged: %d tx, %d obs", fromStr, toStr, len(localPackets), localTotalObs) + return nil +} + // pickBestObservation selects the observation with the longest path // and sets it as the transmission's display observation. func pickBestObservation(tx *StoreTx) { From 99c2e8baef49b7a2b445a51967a99cf151a0f766 Mon Sep 17 00:00:00 2001 From: efiten Date: Sun, 10 May 2026 14:13:13 +0200 Subject: [PATCH 04/11] =?UTF-8?q?feat(startup):=20implement=20loadBackgrou?= =?UTF-8?q?ndChunks=20=E2=80=94=20progressive=20daily=20chunk=20loading=20?= =?UTF-8?q?with=20error=20recovery?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/server/hot_startup_test.go | 80 ++++++++++++++++++++++++++++++++ cmd/server/store.go | 84 ++++++++++++++++++++++++++++++++++ 2 files changed, 164 insertions(+) diff --git a/cmd/server/hot_startup_test.go b/cmd/server/hot_startup_test.go index 71755a93..faa4aa20 100644 --- a/cmd/server/hot_startup_test.go +++ b/cmd/server/hot_startup_test.go @@ -213,3 +213,83 @@ func TestHotStartup_loadChunk_AddsOlderData(t *testing.T) { t.Errorf("expected byHash len=60, got %d", len(store.byHash)) } } + +func TestHotStartup_BackgroundFillsToRetention(t *testing.T) { + // 3 days × 50 tx/day = 150 total + dbPath := createTestDBMultiDay(t, 3, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 24, + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + + // After hot Load: only ~50 packets (day 1 = last 24h) + afterHot := len(store.packets) + if afterHot < 1 || afterHot > 60 { + t.Errorf("expected ~50 packets after hot Load, got %d", afterHot) + } + + // Start background fill + go store.loadBackgroundChunks() + waitForBackgroundLoad(t, store, 15*time.Second) + + // After background fill: all 150 packets should be loaded + store.mu.RLock() + total := len(store.packets) + store.mu.RUnlock() + + if total < 140 || total > 160 { + t.Errorf("expected ~150 packets after background load, got %d", total) + } + if !store.backgroundLoadDone.Load() { + t.Error("backgroundLoadDone must be true after loadBackgroundChunks returns") + } +} + +func TestHotStartup_ChunkErrorRecovery(t *testing.T) { + dbPath := createTestDBWithAgedPackets(t, 10, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 1, + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + + // Close the DB so all subsequent chunk queries fail. + db.conn.Close() + + done := make(chan struct{}) + go func() { + store.loadBackgroundChunks() + close(done) + }() + + select { + case <-done: + // Good — completed without hanging. + case <-time.After(10 * time.Second): + t.Fatal("loadBackgroundChunks hung after DB close") + } + + if !store.backgroundLoadDone.Load() { + t.Error("backgroundLoadDone must be set even when all chunks fail") + } +} diff --git a/cmd/server/store.go b/cmd/server/store.go index ca6520ea..364e16e8 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -922,6 +922,90 @@ func (s *PacketStore) loadChunk(from, to time.Time) error { return nil } +// loadBackgroundChunks fills the remaining retentionHours window by loading +// daily chunks from oldestLoaded back to the retention cutoff. After all +// chunks are merged it rebuilds analytics indexes once. Chunk errors are +// handled by advancing past the failed window so the loop always terminates. +func (s *PacketStore) loadBackgroundChunks() { + if s.retentionHours <= 0 { + s.backgroundLoadDone.Store(true) + return + } + + target := time.Now().UTC().Add(-time.Duration(s.retentionHours*3600) * time.Second) + totalHours := s.retentionHours - s.hotStartupHours + if totalHours <= 0 { + s.backgroundLoadDone.Store(true) + return + } + + var chunksLoaded float64 + totalChunks := math.Ceil(totalHours / 24) + + for { + s.mu.RLock() + oldest := s.oldestLoaded + s.mu.RUnlock() + + if oldest == "" { + break + } + chunkEnd, err := time.Parse(time.RFC3339, oldest) + if err != nil { + log.Printf("[store] background loader: bad oldestLoaded %q: %v", oldest, err) + break + } + if !chunkEnd.After(target) { + break + } + + chunkStart := chunkEnd.Add(-24 * time.Hour) + if chunkStart.Before(target) { + chunkStart = target + } + + chunkStartStr := chunkStart.Format(time.RFC3339) + if err := s.loadChunk(chunkStart, chunkEnd); err != nil { + log.Printf("[store] background chunk [%s, %s) error: %v — advancing past it", + chunkStartStr, chunkEnd.Format(time.RFC3339), err) + } + // Always advance oldestLoaded to chunkStart so the loop terminates, + // even when the chunk was empty (loadChunk skips the update when 0 packets). + s.mu.Lock() + if s.oldestLoaded > chunkStartStr || s.oldestLoaded == "" { + s.oldestLoaded = chunkStartStr + } + s.mu.Unlock() + + chunksLoaded++ + if totalChunks > 0 { + pct := int64(chunksLoaded / totalChunks * 100) + if pct > 100 { + pct = 100 + } + s.backgroundLoadProgress.Store(pct) + } + + runtime.Gosched() + } + + // Rebuild analytics indexes once after all chunks are merged. + s.mu.Lock() + s.buildSubpathIndex() + s.buildPathHopIndex() + s.buildDistanceIndex() + s.mu.Unlock() + + s.backgroundLoadDone.Store(true) + s.backgroundLoadProgress.Store(100) + + s.mu.RLock() + totalPkts := len(s.packets) + oldest := s.oldestLoaded + s.mu.RUnlock() + log.Printf("[store] background load complete: %d packets in memory, oldestLoaded=%s", totalPkts, oldest) +} + // pickBestObservation selects the observation with the longest path // and sets it as the transmission's display observation. func pickBestObservation(tx *StoreTx) { From 4e6bf6f884f5b678f20879a78707879cc8d70186 Mon Sep 17 00:00:00 2001 From: efiten Date: Sun, 10 May 2026 14:56:24 +0200 Subject: [PATCH 05/11] feat(startup): SQL fallback in QueryPackets/QueryGroupedPackets when since/until predates oldestLoaded Also fix oldestLoaded to reflect the hot-window cutoff boundary (not the first packet's FirstSeen) so that the fallback threshold is accurate. --- cmd/server/hot_startup_test.go | 63 ++++++++++++++++++++++++++++++++++ cmd/server/store.go | 38 +++++++++++++++++++- 2 files changed, 100 insertions(+), 1 deletion(-) diff --git a/cmd/server/hot_startup_test.go b/cmd/server/hot_startup_test.go index faa4aa20..57a08781 100644 --- a/cmd/server/hot_startup_test.go +++ b/cmd/server/hot_startup_test.go @@ -293,3 +293,66 @@ func TestHotStartup_ChunkErrorRecovery(t *testing.T) { t.Error("backgroundLoadDone must be set even when all chunks fail") } } + +func TestHotStartup_SQLFallback_TriggeredForOldDate(t *testing.T) { + // 50 old packets (48h ago), 10 recent (30min ago) + dbPath := createTestDBWithAgedPackets(t, 10, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + // Hot load: only last 1h → 10 recent packets in memory + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 1, + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + if len(store.packets) != 10 { + t.Fatalf("setup: expected 10 in-memory packets, got %d", len(store.packets)) + } + + // Query with Since = 49h ago (before oldestLoaded ~1h ago) → SQL fallback + since49h := time.Now().UTC().Add(-49 * time.Hour).Format(time.RFC3339) + result := store.QueryPackets(PacketQuery{Since: since49h, Limit: 100, Order: "ASC"}) + + // SQL fallback should return the old packets (stored at ~48h ago) + if result.Total == 0 { + t.Error("expected SQL fallback to return old packets for query before oldestLoaded") + } +} + +func TestHotStartup_SQLFallback_NotTriggeredForRecentDate(t *testing.T) { + // 50 old packets (48h ago), 10 recent (30min ago) + dbPath := createTestDBWithAgedPackets(t, 10, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + // Hot load: last 1h → 10 recent packets in memory + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 1, + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + + // Query with Since = 45min ago (after oldestLoaded ~1h ago) → in-memory path + since45m := time.Now().UTC().Add(-45 * time.Minute).Format(time.RFC3339) + result := store.QueryPackets(PacketQuery{Since: since45m, Limit: 100, Order: "ASC"}) + + // In-memory path: returns only the 10 recent packets (all within last 30min) + if result.Total != 10 { + t.Errorf("expected 10 in-memory packets for recent Since query, got %d", result.Total) + } +} diff --git a/cmd/server/store.go b/cmd/server/store.go index 364e16e8..2dadf18f 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -683,7 +683,13 @@ func (s *PacketStore) Load() error { s.buildDistanceIndex() // Track oldest loaded timestamp for future SQL fallback queries. - if len(s.packets) > 0 { + // When hotStartupHours > 0 the load window boundary (cutoff) is the + // authoritative lower bound, not the first packet's timestamp (which may be + // newer if no packets landed exactly at the boundary). + if s.hotStartupHours > 0 { + hotCutoffStr := time.Now().UTC().Add(-time.Duration(s.hotStartupHours*3600) * time.Second).Format(time.RFC3339) + s.oldestLoaded = hotCutoffStr + } else if len(s.packets) > 0 { s.oldestLoaded = s.packets[0].FirstSeen } @@ -1156,6 +1162,22 @@ func (s *PacketStore) untrackAdvertPubkey(tx *StoreTx) { // QueryPackets returns filtered, paginated packets from memory. func (s *PacketStore) QueryPackets(q PacketQuery) *PacketResult { + // SQL fallback: if the query window predates the in-memory window, delegate + // to the DB layer which covers the full SQLite retention period. + s.mu.RLock() + oldest := s.oldestLoaded + s.mu.RUnlock() + if oldest != "" { + needsSQL := (q.Since != "" && q.Since < oldest) || + (q.Until != "" && q.Until < oldest) + if needsSQL { + if result, err := s.db.QueryPackets(q); err == nil { + return result + } else { + log.Printf("[store] QueryPackets SQL fallback failed: %v — using in-memory", err) + } + } + } atomic.AddInt64(&s.queryCount, 1) s.mu.RLock() defer s.mu.RUnlock() @@ -1202,6 +1224,20 @@ func (s *PacketStore) QueryPackets(q PacketQuery) *PacketResult { // QueryGroupedPackets returns transmissions grouped by hash (already 1:1). func (s *PacketStore) QueryGroupedPackets(q PacketQuery) *PacketResult { + s.mu.RLock() + oldest := s.oldestLoaded + s.mu.RUnlock() + if oldest != "" { + needsSQL := (q.Since != "" && q.Since < oldest) || + (q.Until != "" && q.Until < oldest) + if needsSQL { + if result, err := s.db.QueryGroupedPackets(q); err == nil { + return result + } else { + log.Printf("[store] QueryGroupedPackets SQL fallback failed: %v — using in-memory", err) + } + } + } atomic.AddInt64(&s.queryCount, 1) if q.Limit <= 0 { From 96caadef8eaa60c3ddcb99a6091787c1ee674a04 Mon Sep 17 00:00:00 2001 From: efiten Date: Sun, 10 May 2026 15:11:32 +0200 Subject: [PATCH 06/11] feat(startup): expose hotStartupHours/backgroundLoadComplete/Progress in /api/perf/store --- cmd/server/hot_startup_test.go | 31 +++++++++++++++++++++++++++++++ cmd/server/store.go | 27 +++++++++++++++------------ 2 files changed, 46 insertions(+), 12 deletions(-) diff --git a/cmd/server/hot_startup_test.go b/cmd/server/hot_startup_test.go index 57a08781..0ef3f0d0 100644 --- a/cmd/server/hot_startup_test.go +++ b/cmd/server/hot_startup_test.go @@ -327,6 +327,37 @@ func TestHotStartup_SQLFallback_TriggeredForOldDate(t *testing.T) { } } +func TestHotStartup_PerfStats(t *testing.T) { + dbPath := createTestDBWithAgedPackets(t, 10, 50) + defer os.RemoveAll(filepath.Dir(dbPath)) + + db, err := OpenDB(dbPath) + if err != nil { + t.Fatal(err) + } + defer db.conn.Close() + + store := NewPacketStore(db, &PacketStoreConfig{ + RetentionHours: 72, + HotStartupHours: 1, + }) + if err := store.Load(); err != nil { + t.Fatal(err) + } + + stats := store.GetPerfStoreStats() + + if v, ok := stats["hotStartupHours"]; !ok || v.(float64) != 1 { + t.Errorf("expected hotStartupHours=1 in stats, got %v", v) + } + if v, ok := stats["backgroundLoadComplete"]; !ok || v.(bool) != false { + t.Errorf("expected backgroundLoadComplete=false in stats, got %v", v) + } + if _, ok := stats["backgroundLoadProgress"]; !ok { + t.Error("expected backgroundLoadProgress in stats") + } +} + func TestHotStartup_SQLFallback_NotTriggeredForRecentDate(t *testing.T) { // 50 old packets (48h ago), 10 recent (30min ago) dbPath := createTestDBWithAgedPackets(t, 10, 50) diff --git a/cmd/server/store.go b/cmd/server/store.go index 2dadf18f..13319976 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -1416,18 +1416,21 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} { evicted := atomic.LoadInt64(&s.evicted) return map[string]interface{}{ - "totalLoaded": totalLoaded, - "totalObservations": totalObs, - "evicted": evicted, - "inserts": atomic.LoadInt64(&s.insertCount), - "queries": atomic.LoadInt64(&s.queryCount), - "inMemory": totalLoaded, - "sqliteOnly": false, - "retentionHours": s.retentionHours, - "maxMemoryMB": s.maxMemoryMB, - "oldestLoaded": s.oldestLoaded, - "estimatedMB": estimatedMB, - "trackedMB": trackedMB, + "totalLoaded": totalLoaded, + "totalObservations": totalObs, + "evicted": evicted, + "inserts": atomic.LoadInt64(&s.insertCount), + "queries": atomic.LoadInt64(&s.queryCount), + "inMemory": totalLoaded, + "sqliteOnly": false, + "retentionHours": s.retentionHours, + "maxMemoryMB": s.maxMemoryMB, + "oldestLoaded": s.oldestLoaded, + "estimatedMB": estimatedMB, + "trackedMB": trackedMB, + "hotStartupHours": s.hotStartupHours, + "backgroundLoadComplete": s.backgroundLoadDone.Load(), + "backgroundLoadProgress": s.backgroundLoadProgress.Load(), "indexes": map[string]interface{}{ "byHash": hashIdx, "byTxID": txIdx, From 2fe8560891d98fa8b92b4d58e0dc134dba44462e Mon Sep 17 00:00:00 2001 From: efiten Date: Sun, 10 May 2026 15:50:32 +0200 Subject: [PATCH 07/11] feat(startup): start loadBackgroundChunks goroutine in main after store.Load() --- cmd/server/main.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cmd/server/main.go b/cmd/server/main.go index 9b3ad842..64f605e2 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -175,6 +175,11 @@ func main() { if err := store.Load(); err != nil { log.Fatalf("[store] failed to load: %v", err) } + if store.hotStartupHours > 0 { + log.Printf("[store] starting background load: filling retentionHours=%.0fh from hotStartupHours=%.0fh", + store.retentionHours, store.hotStartupHours) + go store.loadBackgroundChunks() + } // Initialize persisted neighbor graph dbPath = database.path From 0efa336872a6b1508b1e43898f9a1470c40c29b3 Mon Sep 17 00:00:00 2001 From: efiten Date: Sun, 10 May 2026 17:46:23 +0200 Subject: [PATCH 08/11] docs: document packetStore.hotStartupHours in config.example.json --- config.example.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/config.example.json b/config.example.json index 988d8bd4..df16cde0 100644 --- a/config.example.json +++ b/config.example.json @@ -224,7 +224,8 @@ "maxMemoryMB": 1024, "estimatedPacketBytes": 450, "retentionHours": 168, - "_comment": "In-memory packet store. maxMemoryMB caps RAM usage. retentionHours: only packets younger than this are loaded on startup and kept in memory (0 = unlimited, not recommended for large DBs — causes OOM on cold start). 168 = 7 days. Must be ≤ retention.packetDays * 24.", + "hotStartupHours": 24, + "_comment": "In-memory packet store. maxMemoryMB caps RAM usage. retentionHours: only packets younger than this are kept in memory (0 = unlimited). hotStartupHours: hours loaded synchronously at startup; background loader fills the remaining retentionHours window. Set to 24 for <60s startup on large DBs. 0 = disabled (loads full retentionHours at startup, legacy behavior).", "_comment_gomemlimit": "On startup the server reads GOMEMLIMIT from the environment if set; otherwise it derives a Go runtime soft memory limit of maxMemoryMB * 1.5 and applies it via debug.SetMemoryLimit. This forces aggressive GC under cgroup pressure so the process self-throttles before the kernel SIGKILLs it. To override, set GOMEMLIMIT explicitly (e.g. GOMEMLIMIT=850MiB). See issue #836." }, "resolvedPath": { From a6f8d3bad5339c7f5effb1e53c6869e47ad0dbb4 Mon Sep 17 00:00:00 2001 From: efiten Date: Sun, 10 May 2026 18:23:03 +0200 Subject: [PATCH 09/11] fix(startup): fix data race in GetPerfStoreStats and byObserver double-append in loadChunk --- cmd/server/store.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/cmd/server/store.go b/cmd/server/store.go index 13319976..204aab7c 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -899,8 +899,12 @@ func (s *PacketStore) loadChunk(from, to time.Time) error { s.byObsID[k] = v } } - for k, obs := range localByObserver { - s.byObserver[k] = append(s.byObserver[k], obs...) + for observerID, obsList := range localByObserver { + for _, o := range obsList { + if s.byObsID[o.ID] == nil { + s.byObserver[observerID] = append(s.byObserver[observerID], o) + } + } } // Index each local packet into byNode and byPayloadType. @@ -1405,6 +1409,10 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} { nodeIdx := len(s.byNode) pathHopIdx := len(s.byPathHop) ptIdx := len(s.byPayloadType) + oldestLoaded := s.oldestLoaded + retentionHours := s.retentionHours + maxMemoryMB := s.maxMemoryMB + hotStartupHours := s.hotStartupHours // Distinct advert pubkey count — precomputed incrementally (see trackAdvertPubkey). advertByObsCount := len(s.advertPubkeys) @@ -1423,12 +1431,12 @@ func (s *PacketStore) GetPerfStoreStats() map[string]interface{} { "queries": atomic.LoadInt64(&s.queryCount), "inMemory": totalLoaded, "sqliteOnly": false, - "retentionHours": s.retentionHours, - "maxMemoryMB": s.maxMemoryMB, - "oldestLoaded": s.oldestLoaded, + "retentionHours": retentionHours, + "maxMemoryMB": maxMemoryMB, + "oldestLoaded": oldestLoaded, "estimatedMB": estimatedMB, "trackedMB": trackedMB, - "hotStartupHours": s.hotStartupHours, + "hotStartupHours": hotStartupHours, "backgroundLoadComplete": s.backgroundLoadDone.Load(), "backgroundLoadProgress": s.backgroundLoadProgress.Load(), "indexes": map[string]interface{}{ From fa7d8bd19d56262fe43aa0579a605f9ace3d9156 Mon Sep 17 00:00:00 2001 From: efiten Date: Mon, 11 May 2026 21:57:53 +0200 Subject: [PATCH 10/11] test(e2e): reset groupbyhash before hex pane test to fix hidden-row timeout The "Expanded group children" test sets meshcore-groupbyhash=true in localStorage and never cleans up. In grouped mode child rows are hidden by default, causing waitForSelector('tr:not([id^=vscroll])') to resolve to 34 hidden s and time out. Co-Authored-By: Claude Sonnet 4.6 --- test-e2e-playwright.js | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test-e2e-playwright.js b/test-e2e-playwright.js index 6bbd87e5..21185931 100644 --- a/test-e2e-playwright.js +++ b/test-e2e-playwright.js @@ -2052,6 +2052,12 @@ async function run() { // Test: per-observation raw_hex — hex pane updates when switching observations (#881) await test('Packet detail hex pane updates per observation', async () => { + // Reset groupbyhash left over from "Expanded group children" test — grouped mode hides child + // rows by default, causing waitForSelector to see hidden s and time out. + await page.evaluate(() => { + localStorage.removeItem('meshcore-groupbyhash'); + localStorage.setItem('meshcore-time-window', '525600'); + }); await page.goto(BASE + '#/packets', { waitUntil: 'domcontentloaded' }); await page.waitForSelector('table tbody tr:not([id^=vscroll])', { timeout: 15000 }); await page.waitForTimeout(500); From 3fb57b2b84aef99ffd6c147f5fe9fe9488fb4f52 Mon Sep 17 00:00:00 2001 From: efiten Date: Mon, 11 May 2026 22:17:33 +0200 Subject: [PATCH 11/11] =?UTF-8?q?test(e2e):=20fix=20hex=20pane=20test=20is?= =?UTF-8?q?olation=20=E2=80=94=20force=20full=20reload=20after=20localStor?= =?UTF-8?q?age=20reset?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A simple page.goto is treated as a hash-change (same document), so the SPA closes the detail pane left open by the prior test before the main table's API fetch completes. waitForSelector then sees ~33 hidden detail-pane table rows and times out. Fix matches the pattern used by "Expanded group children": set localStorage, goto, then reload({ waitUntil: 'load' }) to guarantee a clean page with data fully loaded. Co-Authored-By: Claude Sonnet 4.6 --- test-e2e-playwright.js | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/test-e2e-playwright.js b/test-e2e-playwright.js index 21185931..b15e39fc 100644 --- a/test-e2e-playwright.js +++ b/test-e2e-playwright.js @@ -2052,13 +2052,17 @@ async function run() { // Test: per-observation raw_hex — hex pane updates when switching observations (#881) await test('Packet detail hex pane updates per observation', async () => { - // Reset groupbyhash left over from "Expanded group children" test — grouped mode hides child - // rows by default, causing waitForSelector to see hidden s and time out. + // The prior "Expanded group children" test leaves groupbyhash=true and the detail pane open. + // A simple page.goto is treated as a hash-change (same document), so the SPA closes the + // detail pane (hiding its 30+ field/obs table rows) before the main table's API fetch + // completes — waitForSelector sees only hidden rows and times out. + // Fix: set localStorage, navigate, then reload with 'load' (same pattern the prior test uses). await page.evaluate(() => { localStorage.removeItem('meshcore-groupbyhash'); localStorage.setItem('meshcore-time-window', '525600'); }); - await page.goto(BASE + '#/packets', { waitUntil: 'domcontentloaded' }); + await page.goto(`${BASE}/#/packets`, { waitUntil: 'domcontentloaded' }); + await page.reload({ waitUntil: 'load' }); await page.waitForSelector('table tbody tr:not([id^=vscroll])', { timeout: 15000 }); await page.waitForTimeout(500);