Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
506b984
docs: add scope stats page design spec (issue #899)
efiten Apr 24, 2026
fa7da35
docs: update scope stats spec — scopes tab in Analytics (per issue fe…
efiten Apr 24, 2026
d5b23e9
docs: add scope stats implementation plan (#899)
efiten Apr 24, 2026
7284242
feat(ingestor/decoder): expose PayloadRaw bytes on DecodedPacket (#899)
efiten Apr 24, 2026
cd2c81b
feat(ingestor): add hashRegions config + loadRegionKeys + matchScope …
efiten Apr 24, 2026
5a28a77
feat(ingestor/db): add scope_name migration and BackfillScopeNames (#…
efiten Apr 24, 2026
124d390
fix(ingestor/db): check rows.Err() after BackfillScopeNames iteration…
efiten Apr 24, 2026
8a9f4a2
feat(ingestor): wire scope matching into ingest pipeline (#899)
efiten Apr 24, 2026
17c7c8e
feat(server/db): add GetScopeStats and ScopeStatsResponse types (#899)
efiten Apr 24, 2026
2a66739
feat(server): add /api/scope-stats endpoint (#899)
efiten Apr 24, 2026
96f9ac1
test(server): add error path coverage for handleScopeStats (#899)
efiten Apr 24, 2026
5a6458f
docs: add /api/scope-stats to api-spec (#899)
efiten Apr 24, 2026
7b6e972
feat(frontend): add Scopes tab to Analytics page (#899)
efiten Apr 24, 2026
0ded7d1
fix(server/db): check rows.Err() after GetScopeStats iteration; fix a…
efiten Apr 24, 2026
95c2382
docs: add hashRegions example to config.example.json (#899)
efiten Apr 24, 2026
29cfd49
feat(packets): show scope_name in packet breakdown detail panel (#899)
efiten Apr 24, 2026
e535b1e
perf(ingestor): remove BackfillScopeNames — caused 100% CPU on first …
efiten Apr 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/ingestor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Config struct {
ChannelKeysPath string `json:"channelKeysPath,omitempty"`
ChannelKeys map[string]string `json:"channelKeys,omitempty"`
HashChannels []string `json:"hashChannels,omitempty"`
HashRegions []string `json:"hashRegions,omitempty"`
Retention *RetentionConfig `json:"retention,omitempty"`
Metrics *MetricsConfig `json:"metrics,omitempty"`
GeoFilter *GeoFilterConfig `json:"geo_filter,omitempty"`
Expand Down
38 changes: 19 additions & 19 deletions cmd/ingestor/coverage_boost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestHandleMessageChannelMessage(t *testing.T) {
payload := []byte(`{"text":"Alice: Hello everyone","channel_idx":3,"SNR":5.0,"RSSI":-95,"score":10,"direction":"rx","sender_timestamp":1700000000}`)
msg := &mockMessage{topic: "meshcore/message/channel/2", payload: payload}

handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestHandleMessageChannelMessageEmptyText(t *testing.T) {
store, source := newTestContext(t)

msg := &mockMessage{topic: "meshcore/message/channel/1", payload: []byte(`{"text":""}`)}
handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
Expand All @@ -233,7 +233,7 @@ func TestHandleMessageChannelNoSender(t *testing.T) {
store, source := newTestContext(t)

msg := &mockMessage{topic: "meshcore/message/channel/1", payload: []byte(`{"text":"no sender here"}`)}
handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&count); err != nil {
Expand All @@ -250,7 +250,7 @@ func TestHandleMessageDirectMessage(t *testing.T) {
payload := []byte(`{"text":"Bob: Hey there","sender_timestamp":1700000000,"SNR":3.0,"rssi":-100,"Score":8,"Direction":"tx"}`)
msg := &mockMessage{topic: "meshcore/message/direct/abc123", payload: payload}

handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestHandleMessageDirectMessageEmptyText(t *testing.T) {
store, source := newTestContext(t)

msg := &mockMessage{topic: "meshcore/message/direct/abc", payload: []byte(`{"text":""}`)}
handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
Expand All @@ -309,7 +309,7 @@ func TestHandleMessageDirectNoSender(t *testing.T) {
store, source := newTestContext(t)

msg := &mockMessage{topic: "meshcore/message/direct/xyz", payload: []byte(`{"text":"message with no colon"}`)}
handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
Expand All @@ -328,7 +328,7 @@ func TestHandleMessageUppercaseScoreDirection(t *testing.T) {
payload := []byte(`{"raw":"` + rawHex + `","Score":9.0,"Direction":"tx"}`)
msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload}

handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var score *float64
var direction *string
Expand All @@ -349,7 +349,7 @@ func TestHandleMessageChannelLowercaseFields(t *testing.T) {

payload := []byte(`{"text":"Test: msg","snr":3.0,"rssi":-90,"Score":5,"Direction":"rx"}`)
msg := &mockMessage{topic: "meshcore/message/channel/0", payload: payload}
handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
Expand All @@ -365,7 +365,7 @@ func TestHandleMessageDirectLowercaseFields(t *testing.T) {

payload := []byte(`{"text":"Test: msg","snr":2.0,"rssi":-85,"score":7,"direction":"tx"}`)
msg := &mockMessage{topic: "meshcore/message/direct/xyz", payload: payload}
handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
Expand All @@ -388,7 +388,7 @@ func TestHandleMessageAdvertWithTelemetry(t *testing.T) {
payload: []byte(`{"raw":"` + rawHex + `"}`),
}

handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

// Should have created transmission, node, and observer
var txCount, nodeCount, obsCount int
Expand Down Expand Up @@ -428,7 +428,7 @@ func TestHandleMessageAdvertGeoFiltered(t *testing.T) {
topic: "meshcore/SJC/obs1/packets",
payload: []byte(`{"raw":"` + rawHex + `"}`),
}
handleMessage(store, "test", source, msg, nil, &Config{GeoFilter: gf})
handleMessage(store, "test", source, msg, nil, nil, &Config{GeoFilter: gf})

// Geo-filtered adverts should not create nodes
var nodeCount int
Expand Down Expand Up @@ -665,7 +665,7 @@ func TestHandleMessageCorruptedAdvertNoNode(t *testing.T) {
topic: "meshcore/SJC/obs1/packets",
payload: []byte(`{"raw":"` + rawHex + `"}`),
}
handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&count); err != nil {
Expand All @@ -687,7 +687,7 @@ func TestHandleMessageNonAdvertPacket(t *testing.T) {
topic: "meshcore/SJC/obs1/packets",
payload: []byte(`{"raw":"` + rawHex + `"}`),
}
handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
Expand Down Expand Up @@ -864,7 +864,7 @@ func TestHandleMessageChannelLongSender(t *testing.T) {
longText := "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA: msg"
payload := []byte(`{"text":"` + longText + `"}`)
msg := &mockMessage{topic: "meshcore/message/channel/1", payload: payload}
handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&count); err != nil {
Expand All @@ -883,7 +883,7 @@ func TestHandleMessageDirectLongSender(t *testing.T) {
longText := "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB: msg"
payload := []byte(`{"text":"` + longText + `"}`)
msg := &mockMessage{topic: "meshcore/message/direct/abc", payload: payload}
handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
Expand All @@ -900,7 +900,7 @@ func TestHandleMessageDirectUppercaseScoreDirection(t *testing.T) {

payload := []byte(`{"text":"X: hi","Score":6,"Direction":"rx"}`)
msg := &mockMessage{topic: "meshcore/message/direct/d1", payload: payload}
handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
Expand Down Expand Up @@ -930,7 +930,7 @@ func TestHandleMessageChannelUppercaseScoreDirection(t *testing.T) {

payload := []byte(`{"text":"Y: hi","Score":4,"Direction":"tx"}`)
msg := &mockMessage{topic: "meshcore/message/channel/5", payload: payload}
handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
Expand Down Expand Up @@ -961,7 +961,7 @@ func TestHandleMessageRawLowercaseScore(t *testing.T) {
rawHex := "0A00D69FD7A5A7475DB07337749AE61FA53A4788E976"
payload := []byte(`{"raw":"` + rawHex + `","score":3.5}`)
msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload}
handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var score *float64
if err := store.db.QueryRow("SELECT score FROM observations LIMIT 1").Scan(&score); err != nil {
Expand All @@ -980,7 +980,7 @@ func TestHandleMessageStatusNoOrigin(t *testing.T) {
topic: "meshcore/LAX/obs5/status",
payload: []byte(`{"model":"L1"}`),
}
handleMessage(store, "test", source, msg, nil, &Config{})
handleMessage(store, "test", source, msg, nil, nil, &Config{})

var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM observers WHERE id = 'obs5'").Scan(&count); err != nil {
Expand Down
64 changes: 46 additions & 18 deletions cmd/ingestor/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,16 @@ func applySchema(db *sql.DB) error {
log.Println("[migration] observations.raw_hex column added")
}

// Migration: add scope_name column to transmissions (#899)
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'scope_name_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Adding scope_name column to transmissions...")
db.Exec(`ALTER TABLE transmissions ADD COLUMN scope_name TEXT DEFAULT NULL`)
db.Exec(`CREATE INDEX IF NOT EXISTS idx_tx_scope_name ON transmissions(scope_name) WHERE scope_name IS NOT NULL`)
db.Exec(`INSERT INTO _migrations (name) VALUES ('scope_name_v1')`)
log.Println("[migration] scope_name column added")
}

return nil
}

Expand All @@ -430,8 +440,8 @@ func (s *Store) prepareStatements() error {
}

s.stmtInsertTransmission, err = s.db.Prepare(`
INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, channel_hash)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, channel_hash, scope_name)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
return err
Expand Down Expand Up @@ -560,6 +570,7 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) {
data.RawHex, hash, now,
data.RouteType, data.PayloadType, data.PayloadVersion,
data.DecodedJSON, nilIfEmpty(data.ChannelHash),
scopeNameForDB(data),
)
if err != nil {
s.Stats.WriteErrors.Add(1)
Expand Down Expand Up @@ -904,23 +915,26 @@ func (s *Store) PruneDroppedPackets(retentionDays int) (int64, error) {
return n, nil
}


// PacketData holds the data needed to insert a packet into the DB.
type PacketData struct {
RawHex string
Timestamp string
ObserverID string
ObserverName string
SNR *float64
RSSI *float64
Score *float64
Direction *string
Hash string
RouteType int
PayloadType int
PayloadVersion int
PathJSON string
DecodedJSON string
ChannelHash string // grouping key for channel queries (#762)
RawHex string
Timestamp string
ObserverID string
ObserverName string
SNR *float64
RSSI *float64
Score *float64
Direction *string
Hash string
RouteType int
PayloadType int
PayloadVersion int
PathJSON string
DecodedJSON string
ChannelHash string // grouping key for channel queries (#762)
ScopeName string // matched region name, or "" for unknown-scoped
IsTransportScoped bool // true when route_type IN (0,3) AND Code1 ≠ "0000"
}

// nilIfEmpty returns nil for empty strings (for nullable DB columns).
Expand All @@ -931,6 +945,15 @@ func nilIfEmpty(s string) interface{} {
return s
}

// scopeNameForDB encodes PacketData scope semantics for DB storage:
// non-transport-scoped → NULL; transport-scoped → ScopeName (may be "" for unknown).
func scopeNameForDB(data *PacketData) interface{} {
if !data.IsTransportScoped {
return nil
}
return data.ScopeName // "" or "#regionname"
}

// MQTTPacketMessage is the JSON payload from an MQTT raw packet message.
type MQTTPacketMessage struct {
Raw string `json:"raw"`
Expand All @@ -945,7 +968,7 @@ type MQTTPacketMessage struct {
// path_json is derived directly from raw_hex header bytes (not decoded.Path.Hops)
// to guarantee the stored path always matches the raw bytes. This matters for
// TRACE packets where decoded.Path.Hops is overwritten with payload hops (#886).
func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, region string) *PacketData {
func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, region string, regionKeys map[string][]byte) *PacketData {
now := time.Now().UTC().Format(time.RFC3339)
pathJSON := "[]"
// For TRACE packets, path_json must be the payload-decoded route hops
Expand Down Expand Up @@ -987,5 +1010,10 @@ func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID,
}
}

if decoded.TransportCodes != nil && decoded.TransportCodes.Code1 != "0000" {
pd.IsTransportScoped = true
pd.ScopeName = matchScope(regionKeys, byte(decoded.Header.PayloadType), decoded.PayloadRaw, decoded.TransportCodes.Code1)
}

return pd
}
Loading