Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cmd/ingestor/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,20 @@ func applySchema(db *sql.DB) error {
log.Println("[migration] observations.raw_hex column added")
}


// Migration: add multibyte capability columns to nodes/inactive_nodes (#903)
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'multibyte_sup_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Adding multibyte_sup columns to nodes/inactive_nodes...")
db.Exec(`ALTER TABLE nodes ADD COLUMN multibyte_sup INTEGER NOT NULL DEFAULT 0`)
db.Exec(`ALTER TABLE nodes ADD COLUMN multibyte_evidence TEXT`)
db.Exec(`ALTER TABLE inactive_nodes ADD COLUMN multibyte_sup INTEGER NOT NULL DEFAULT 0`)
db.Exec(`ALTER TABLE inactive_nodes ADD COLUMN multibyte_evidence TEXT`)
db.Exec(`INSERT INTO _migrations (name) VALUES ('multibyte_sup_v1')`)
log.Println("[migration] multibyte_sup columns added")
}


return nil
}

Expand Down
57 changes: 57 additions & 0 deletions cmd/ingestor/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,63 @@ func TestSchemaNoiseFloorIsReal(t *testing.T) {
}
}

func TestSchemaMultibyteSupColumns(t *testing.T) {
s, err := OpenStore(tempDBPath(t))
if err != nil {
t.Fatal(err)
}
defer s.Close()

cols := map[string]string{}
rows, err := s.db.Query("PRAGMA table_info(nodes)")
if err != nil {
t.Fatal(err)
}
defer rows.Close()
for rows.Next() {
var cid int
var colName, colType string
var notNull, pk int
var dflt interface{}
if rows.Scan(&cid, &colName, &colType, &notNull, &dflt, &pk) == nil {
cols[colName] = colType
}
}

if ct, ok := cols["multibyte_sup"]; !ok {
t.Error("nodes.multibyte_sup column missing")
} else if ct != "INTEGER" {
t.Errorf("nodes.multibyte_sup type=%s, want INTEGER", ct)
}
if _, ok := cols["multibyte_evidence"]; !ok {
t.Error("nodes.multibyte_evidence column missing")
}

inactiveCols := map[string]string{}
inactiveRows, err := s.db.Query("PRAGMA table_info(inactive_nodes)")
if err != nil {
t.Fatal(err)
}
defer inactiveRows.Close()
for inactiveRows.Next() {
var cid int
var colName, colType string
var notNull, pk int
var dflt interface{}
if inactiveRows.Scan(&cid, &colName, &colType, &notNull, &dflt, &pk) == nil {
inactiveCols[colName] = colType
}
}
if ct, ok := inactiveCols["multibyte_sup"]; !ok {
t.Error("inactive_nodes.multibyte_sup column missing")
} else if ct != "INTEGER" {
t.Errorf("inactive_nodes.multibyte_sup type=%s, want INTEGER", ct)
}
if _, ok := inactiveCols["multibyte_evidence"]; !ok {
t.Error("inactive_nodes.multibyte_evidence column missing")
}
}

func TestInsertTransmissionWithObserver(t *testing.T) {
s, err := OpenStore(tempDBPath(t))
if err != nil {
Expand Down
86 changes: 86 additions & 0 deletions cmd/server/bounded_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,92 @@ func TestBoundedLoad_AscendingOrder(t *testing.T) {
}
}

// loadStoreWithRetention creates a PacketStore with retentionHours set.
func loadStoreWithRetention(t *testing.T, dbPath string, retentionHours float64) *PacketStore {
t.Helper()
db, err := OpenDB(dbPath)
if err != nil {
t.Fatal(err)
}
cfg := &PacketStoreConfig{RetentionHours: retentionHours}
store := NewPacketStore(db, cfg)
if err := store.Load(); err != nil {
t.Fatal(err)
}
return store
}

// createTestDBWithAgedPackets inserts numRecent packets with timestamps within
// the last hour and numOld packets with timestamps 48 hours ago.
func createTestDBWithAgedPackets(t *testing.T, numRecent, numOld int) string {
t.Helper()
dir := t.TempDir()
dbPath := filepath.Join(dir, "test.db")

conn, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL")
if err != nil {
t.Fatal(err)
}
defer conn.Close()

execOrFail := func(s string) {
if _, err := conn.Exec(s); err != nil {
t.Fatalf("setup: %v\nSQL: %s", err, s)
}
}
execOrFail(`CREATE TABLE transmissions (id INTEGER PRIMARY KEY, raw_hex TEXT, hash TEXT, first_seen TEXT, route_type INTEGER, payload_type INTEGER, payload_version INTEGER, decoded_json TEXT)`)
execOrFail(`CREATE TABLE observations (id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_id TEXT, observer_name TEXT, direction TEXT, snr REAL, rssi REAL, score INTEGER, path_json TEXT, timestamp TEXT, raw_hex TEXT)`)
execOrFail(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT)`)
execOrFail(`CREATE TABLE nodes (pubkey TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, frequency REAL)`)
execOrFail(`CREATE TABLE schema_version (version INTEGER)`)
execOrFail(`INSERT INTO schema_version (version) VALUES (1)`)
execOrFail(`CREATE INDEX idx_tx_first_seen ON transmissions(first_seen)`)

now := time.Now().UTC()
id := 1
// Insert old packets (48 hours ago)
for i := 0; i < numOld; i++ {
ts := now.Add(-48 * time.Hour).Add(time.Duration(i) * time.Second).Format(time.RFC3339)
conn.Exec("INSERT INTO transmissions VALUES (?,?,?,?,0,4,1,?)", id, "aa", fmt.Sprintf("old%d", i), ts, `{}`)
conn.Exec("INSERT INTO observations VALUES (?,?,?,?,?,?,?,?,?,?,?)", id, id, "obs1", "Obs1", "RX", -10.0, -80.0, 5, `[]`, ts, "")
id++
}
// Insert recent packets (within last hour)
for i := 0; i < numRecent; i++ {
ts := now.Add(-30 * time.Minute).Add(time.Duration(i) * time.Second).Format(time.RFC3339)
conn.Exec("INSERT INTO transmissions VALUES (?,?,?,?,0,4,1,?)", id, "bb", fmt.Sprintf("new%d", i), ts, `{}`)
conn.Exec("INSERT INTO observations VALUES (?,?,?,?,?,?,?,?,?,?,?)", id, id, "obs1", "Obs1", "RX", -10.0, -80.0, 5, `[]`, ts, "")
id++
}
return dbPath
}

func TestRetentionLoad_OnlyLoadsRecentPackets(t *testing.T) {
dbPath := createTestDBWithAgedPackets(t, 50, 100)
defer os.RemoveAll(filepath.Dir(dbPath))

// retention = 2 hours — should load only the 50 recent packets, not the 100 old ones
store := loadStoreWithRetention(t, dbPath, 2)
defer store.db.conn.Close()

if len(store.packets) != 50 {
t.Errorf("expected 50 recent packets, got %d (old packets should be excluded by retentionHours)", len(store.packets))
}
}

func TestRetentionLoad_ZeroRetentionLoadsAll(t *testing.T) {
dbPath := createTestDBWithAgedPackets(t, 50, 100)
defer os.RemoveAll(filepath.Dir(dbPath))

// retention = 0 (unlimited) — should load all 150 packets
store := loadStoreWithRetention(t, dbPath, 0)
defer store.db.conn.Close()

if len(store.packets) != 150 {
t.Errorf("expected all 150 packets with retentionHours=0, got %d", len(store.packets))
}
}

func TestEstimateStoreTxBytesTypical(t *testing.T) {
est := estimateStoreTxBytesTypical(10)
if est < 1000 {
Expand Down
63 changes: 53 additions & 10 deletions cmd/server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ type DB struct {
path string // filesystem path to the database file
isV3 bool // v3 schema: observer_idx in observations (vs observer_id in v2)
hasResolvedPath bool // observations table has resolved_path column
hasObsRawHex bool // observations table has raw_hex column (#881)
hasObsRawHex bool // observations table has raw_hex column (#881)
hasMultibyteSupCols bool // nodes table has multibyte_sup/multibyte_evidence columns (#903)

// Channel list cache (60s TTL) — avoids repeated GROUP BY scans (#762)
channelsCacheMu sync.Mutex
Expand Down Expand Up @@ -82,6 +83,24 @@ func (db *DB) detectSchema() {
}
}
}

nodeRows, err := db.conn.Query("PRAGMA table_info(nodes)")
if err != nil {
return
}
defer nodeRows.Close()
for nodeRows.Next() {
var cid int
var colName string
var colType sql.NullString
var notNull, pk int
var dflt sql.NullString
if nodeRows.Scan(&cid, &colName, &colType, &notNull, &dflt, &pk) == nil {
if colName == "multibyte_sup" {
db.hasMultibyteSupCols = true
}
}
}
}

// transmissionBaseSQL returns the SELECT columns and JOIN clause for transmission-centric queries.
Expand Down Expand Up @@ -786,7 +805,11 @@ func (db *DB) GetNodes(limit, offset int, role, search, before, lastHeard, sortB
var total int
db.conn.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM nodes %s", w), args...).Scan(&total)

querySQL := fmt.Sprintf("SELECT public_key, name, role, lat, lon, last_seen, first_seen, advert_count, battery_mv, temperature_c FROM nodes %s ORDER BY %s LIMIT ? OFFSET ?", w, order)
nodeColList := "public_key, name, role, lat, lon, last_seen, first_seen, advert_count, battery_mv, temperature_c"
if db.hasMultibyteSupCols {
nodeColList += ", multibyte_sup, multibyte_evidence"
}
querySQL := fmt.Sprintf("SELECT %s FROM nodes %s ORDER BY %s LIMIT ? OFFSET ?", nodeColList, w, order)
qArgs := append(args, limit, offset)

rows, err := db.conn.Query(querySQL, qArgs...)
Expand All @@ -797,7 +820,7 @@ func (db *DB) GetNodes(limit, offset int, role, search, before, lastHeard, sortB

nodes := make([]map[string]interface{}, 0)
for rows.Next() {
n := scanNodeRow(rows)
n := db.scanNodeRow(rows)
if n != nil {
nodes = append(nodes, n)
}
Expand All @@ -812,8 +835,12 @@ func (db *DB) SearchNodes(query string, limit int) ([]map[string]interface{}, er
if limit <= 0 {
limit = 10
}
rows, err := db.conn.Query(`SELECT public_key, name, role, lat, lon, last_seen, first_seen, advert_count, battery_mv, temperature_c
FROM nodes WHERE name LIKE ? OR public_key LIKE ? ORDER BY last_seen DESC LIMIT ?`,
colList := "public_key, name, role, lat, lon, last_seen, first_seen, advert_count, battery_mv, temperature_c"
if db.hasMultibyteSupCols {
colList += ", multibyte_sup, multibyte_evidence"
}
rows, err := db.conn.Query(
fmt.Sprintf("SELECT %s FROM nodes WHERE name LIKE ? OR public_key LIKE ? ORDER BY last_seen DESC LIMIT ?", colList),
"%"+query+"%", query+"%", limit)
if err != nil {
return nil, err
Expand All @@ -822,7 +849,7 @@ func (db *DB) SearchNodes(query string, limit int) ([]map[string]interface{}, er

nodes := make([]map[string]interface{}, 0)
for rows.Next() {
n := scanNodeRow(rows)
n := db.scanNodeRow(rows)
if n != nil {
nodes = append(nodes, n)
}
Expand All @@ -832,13 +859,17 @@ func (db *DB) SearchNodes(query string, limit int) ([]map[string]interface{}, er

// GetNodeByPubkey returns a single node.
func (db *DB) GetNodeByPubkey(pubkey string) (map[string]interface{}, error) {
rows, err := db.conn.Query("SELECT public_key, name, role, lat, lon, last_seen, first_seen, advert_count, battery_mv, temperature_c FROM nodes WHERE public_key = ?", pubkey)
colList := "public_key, name, role, lat, lon, last_seen, first_seen, advert_count, battery_mv, temperature_c"
if db.hasMultibyteSupCols {
colList += ", multibyte_sup, multibyte_evidence"
}
rows, err := db.conn.Query(fmt.Sprintf("SELECT %s FROM nodes WHERE public_key = ?", colList), pubkey)
if err != nil {
return nil, err
}
defer rows.Close()
if rows.Next() {
return scanNodeRow(rows), nil
return db.scanNodeRow(rows), nil
}
return nil, nil
}
Expand Down Expand Up @@ -1795,15 +1826,21 @@ func scanPacketRow(rows *sql.Rows) map[string]interface{} {
}
}

func scanNodeRow(rows *sql.Rows) map[string]interface{} {
func (db *DB) scanNodeRow(rows *sql.Rows) map[string]interface{} {
var pk string
var name, role, lastSeen, firstSeen sql.NullString
var lat, lon sql.NullFloat64
var advertCount int
var batteryMv sql.NullInt64
var temperatureC sql.NullFloat64
var multibyteSup sql.NullInt64
var multibyteEvidence sql.NullString

if err := rows.Scan(&pk, &name, &role, &lat, &lon, &lastSeen, &firstSeen, &advertCount, &batteryMv, &temperatureC); err != nil {
scanArgs := []interface{}{&pk, &name, &role, &lat, &lon, &lastSeen, &firstSeen, &advertCount, &batteryMv, &temperatureC}
if db.hasMultibyteSupCols {
scanArgs = append(scanArgs, &multibyteSup, &multibyteEvidence)
}
if err := rows.Scan(scanArgs...); err != nil {
return nil
}
m := map[string]interface{}{
Expand All @@ -1818,6 +1855,12 @@ func scanNodeRow(rows *sql.Rows) map[string]interface{} {
"last_heard": nullStr(lastSeen),
"hash_size": nil,
"hash_size_inconsistent": false,
"multibyte_sup": int(multibyteSup.Int64), // always present; zero-value when col absent
}
if multibyteEvidence.Valid {
m["multibyte_evidence"] = multibyteEvidence.String
} else {
m["multibyte_evidence"] = nil
}
if batteryMv.Valid {
m["battery_mv"] = int(batteryMv.Int64)
Expand Down
28 changes: 28 additions & 0 deletions cmd/server/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2033,3 +2033,31 @@ func TestPerObservationRawHexEnrich(t *testing.T) {
}
}
}

func TestGetNodesReturnsMultibyteSupField(t *testing.T) {
conn, _ := sql.Open("sqlite", ":memory:")
conn.SetMaxOpenConns(1)
conn.Exec(`CREATE TABLE nodes (
public_key TEXT PRIMARY KEY, name TEXT, role TEXT,
lat REAL, lon REAL, last_seen TEXT, first_seen TEXT,
advert_count INTEGER DEFAULT 0, battery_mv INTEGER, temperature_c REAL,
multibyte_sup INTEGER NOT NULL DEFAULT 0, multibyte_evidence TEXT
)`)
conn.Exec(`INSERT INTO nodes (public_key, name, role, last_seen, first_seen)
VALUES ('aabb1122', 'TestRep', 'repeater', '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z')`)
db := &DB{conn: conn, hasMultibyteSupCols: true}

nodes, _, _, err := db.GetNodes(10, 0, "", "", "", "", "", "")
if err != nil {
t.Fatal(err)
}
if len(nodes) == 0 {
t.Fatal("expected 1 node")
}
if _, ok := nodes[0]["multibyte_sup"]; !ok {
t.Error("multibyte_sup missing from GetNodes response")
}
if nodes[0]["multibyte_sup"] != 0 {
t.Errorf("multibyte_sup = %v, want 0", nodes[0]["multibyte_sup"])
}
}
Loading