From 506b984c60632f17ce7cc4ddf4aac4e11296b164 Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 09:19:21 +0200 Subject: [PATCH 01/17] docs: add scope stats page design spec (issue #899) Co-Authored-By: Claude Sonnet 4.6 --- .../specs/2026-04-23-scope-stats-design.md | 204 ++++++++++++++++++ 1 file changed, 204 insertions(+) create mode 100644 docs/superpowers/specs/2026-04-23-scope-stats-design.md diff --git a/docs/superpowers/specs/2026-04-23-scope-stats-design.md b/docs/superpowers/specs/2026-04-23-scope-stats-design.md new file mode 100644 index 00000000..22a310bd --- /dev/null +++ b/docs/superpowers/specs/2026-04-23-scope-stats-design.md @@ -0,0 +1,204 @@ +# Scope Stats Page — Design Spec + +**Issue**: Kpa-clawbot/CoreScope#899 +**Date**: 2026-04-23 +**Branch target**: `master` + +--- + +## Overview + +Add a dedicated **Scopes** page showing scope/region statistics for MeshCore transport-route packets. Scope filtering in MeshCore uses `TRANSPORT_FLOOD` (route_type 0) and `TRANSPORT_DIRECT` (route_type 3) packets that carry two 16-bit transport codes. Code1 ≠ `0000` means the packet is region-scoped. + +Feature 3 from the issue (default scope per client via advert) is **not implemented** — the advert format has no scope field in the current firmware. + +--- + +## How Scopes Work (Firmware) + +Transport code derivation (authoritative source: `meshcore-dev/MeshCore`): + +``` +key = SHA256("#regionname")[:16] // TransportKeyStore::getAutoKeyFor +Code1 = HMAC-SHA256(key, type || payload) // TransportKey::calcTransportCode, 2-byte output +``` + +Code1 is a **per-message** HMAC — the same region produces a different Code1 for every message. Identifying a region from Code1 requires knowing the region name in advance and recomputing the HMAC. + +`Code1 = 0000` is the "no scope" sentinel (also `FFFF` is reserved). Packets with route_type 1 or 2 (plain FLOOD/DIRECT) carry no transport codes. + +--- + +## Config + +Add `hashRegions` to the ingestor `Config` struct in `cmd/ingestor/config.go`, mirroring `hashChannels`: + +```json +"hashRegions": ["#belgium", "#eu", "#brussels"] +``` + +Normalization (same rules as `hashChannels`): +- Trim whitespace +- Prepend `#` if missing +- Skip empty entries + +--- + +## Ingestor Changes + +### Key derivation (`loadRegionKeys`) + +```go +func loadRegionKeys(cfg *Config) map[string][]byte { + // key = first 16 bytes of SHA256("#regionname") +} +``` + +Returns `map[string][]byte` (region name → 16-byte HMAC key). Called once at startup, stored on the `Store`. + +### Decoder: expose raw payload bytes + +Add `PayloadRaw []byte` to `DecodedPacket` in `cmd/ingestor/decoder.go`. Populated from the raw `buf` slice at the payload offset — zero-copy slice, no allocation. This is the **encrypted** payload bytes, matching what the firmware feeds into `calcTransportCode`. + +### At-ingest region matching + +In `BuildPacketData`: +- Skip if `route_type` not in `{0, 3}` → `scope_name` stays `nil` +- If `Code1 == "0000"` → `scope_name = nil` (unscoped transport, no scope involvement) +- If `Code1 != "0000"` → try each region key: + ``` + HMAC-SHA256(key, payloadType_byte || PayloadRaw) → first 2 bytes as uint16 + ``` + First match → `scope_name = "#regionname"`. No match → `scope_name = ""` (unknown scope). + +Add `ScopeName *string` to `PacketData`. + +### MQTT-sourced packets (DM / CHAN paths in main.go) + +These are injected directly without going through `BuildPacketData`. They use `route_type = 1` (FLOOD), so they are never transport-route packets. No scope matching needed for these paths. + +--- + +## Database + +### Migration + +```sql +ALTER TABLE transmissions ADD COLUMN scope_name TEXT DEFAULT NULL; +CREATE INDEX idx_tx_scope_name ON transmissions(scope_name) WHERE scope_name IS NOT NULL; +``` + +### Column semantics + +| Value | Meaning | +|-------|---------| +| `NULL` | Either: non-transport-route packet (route_type 1/2), or transport-route with Code1=0000 | +| `""` (empty string) | Transport-route, Code1 ≠ 0000, but no configured region matched | +| `"#belgium"` | Matched named region | + +The API stats queries resolve the NULL ambiguity by always filtering `route_type IN (0, 3)` first: +- `unscoped` count = `route_type IN (0,3) AND scope_name IS NULL` +- `scoped` count = `route_type IN (0,3) AND scope_name IS NOT NULL` + +### Backfill + +On migration, re-decode `raw_hex` for all rows where `route_type IN (0, 3)` and `scope_name IS NULL`. Run the same HMAC matching logic. Rows with `Code1 = 0000` remain `NULL`. + +The backfill runs in the existing migration framework in `cmd/ingestor/db.go`. If no regions are configured, backfill is skipped. + +--- + +## API + +### `GET /api/scope-stats` + +**Query param**: `window` — one of `1h`, `24h` (default), `7d` + +**Time-series bucket sizes**: +| Window | Bucket | +|--------|--------| +| `1h` | 5 min | +| `24h` | 1 hour | +| `7d` | 6 hours| + +**Response**: +```json +{ + "window": "24h", + "summary": { + "transportTotal": 1240, + "scoped": 890, + "unscoped": 350, + "unknownScope": 42 + }, + "byRegion": [ + { "name": "#belgium", "count": 612 }, + { "name": "#eu", "count": 236 } + ], + "timeSeries": [ + { "t": "2026-04-23T10:00:00Z", "scoped": 45, "unscoped": 18 }, + { "t": "2026-04-23T11:00:00Z", "scoped": 51, "unscoped": 22 } + ] +} +``` + +- `transportTotal` = `scoped + unscoped` (transport-route packets only) +- `scoped` = Code1 ≠ 0000 (named + unknown) +- `unscoped` = transport-route with Code1 = 0000 +- `unknownScope` = scoped but no region name matched (subset of `scoped`) +- `byRegion` sorted by count descending, excludes unknown +- `timeSeries` covers the full window at the bucket granularity + +Route: `GET /api/scope-stats` registered in `cmd/server/routes.go`. +No auth required (same as other read endpoints). +TTL cache: 30 seconds (heavier query than `/api/stats`). + +--- + +## Frontend + +### Navigation + +Add nav link between Channels and Nodes in `public/index.html`: +```html +Scopes +``` + +### `public/scopes.js` + +Three sections on the page: + +**1. Summary cards** (reuse existing card CSS pattern from home/analytics pages) +- Transport total, Scoped, Unscoped, Unknown scope +- Each card shows count + percentage of transport total + +**2. Per-region table** +Columns: Region, Messages, % of Scoped +Sorted by count descending. Last row: "Unknown scope" (italic) if unknownScope > 0. +Shows "No regions configured" message if `byRegion` is empty and `unknownScope = 0`. + +**3. Time-series chart** +- Window selector: `1h / 24h / 7d` (default 24h) +- Two lines: **Scoped** (blue) and **Unscoped** (grey) +- Uses the same lightweight canvas chart pattern as other pages (no external chart lib) + +### Cache buster + +`scopes.js` added to the `__BUST__` entries in `index.html` in the same commit. + +--- + +## Testing + +- Unit tests for `loadRegionKeys`: normalization, key bytes match firmware SHA256 derivation +- Unit tests for HMAC matching: known Code1 value computed from firmware logic, verified against Go implementation +- Integration test: ingest a synthetic transport-route packet with a known region, assert `scope_name` column is set correctly +- API test: `GET /api/scope-stats` returns correct summary counts against fixture DB + +--- + +## Out of Scope + +- Feature 3 (default scope per client via advert) — firmware has no advert scope field +- Drill-down from region row to filtered packet list (deferred) +- Private regions (`$`-prefixed) — use secret keys not publicly derivable From fa7da35551562480abd0885a7e0f2c2abc6b4750 Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 11:35:49 +0200 Subject: [PATCH 02/17] =?UTF-8?q?docs:=20update=20scope=20stats=20spec=20?= =?UTF-8?q?=E2=80=94=20scopes=20tab=20in=20Analytics=20(per=20issue=20feed?= =?UTF-8?q?back)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- .../specs/2026-04-23-scope-stats-design.md | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/docs/superpowers/specs/2026-04-23-scope-stats-design.md b/docs/superpowers/specs/2026-04-23-scope-stats-design.md index 22a310bd..9e9860bb 100644 --- a/docs/superpowers/specs/2026-04-23-scope-stats-design.md +++ b/docs/superpowers/specs/2026-04-23-scope-stats-design.md @@ -157,34 +157,27 @@ TTL cache: 30 seconds (heavier query than `/api/stats`). ## Frontend -### Navigation +### Placement -Add nav link between Channels and Nodes in `public/index.html`: -```html -Scopes -``` +Add a **"Scopes" tab** to the existing Analytics page (`public/analytics.js`) — no new nav item, no new JS file. Tab button added after the existing "Clock Health" tab in the `analyticsTabs` div. -### `public/scopes.js` +Deep-link: `#/analytics?tab=scopes` -Three sections on the page: +### Tab content — three sections -**1. Summary cards** (reuse existing card CSS pattern from home/analytics pages) +**1. Summary cards** (reuse existing Analytics card CSS) - Transport total, Scoped, Unscoped, Unknown scope - Each card shows count + percentage of transport total **2. Per-region table** Columns: Region, Messages, % of Scoped -Sorted by count descending. Last row: "Unknown scope" (italic) if unknownScope > 0. -Shows "No regions configured" message if `byRegion` is empty and `unknownScope = 0`. +Sorted by count descending. Last row: "Unknown scope" (italic) if `unknownScope > 0`. +Shows a "No regions configured — add `hashRegions` to your config" hint if `byRegion` is empty and `unknownScope = 0`. **3. Time-series chart** - Window selector: `1h / 24h / 7d` (default 24h) - Two lines: **Scoped** (blue) and **Unscoped** (grey) -- Uses the same lightweight canvas chart pattern as other pages (no external chart lib) - -### Cache buster - -`scopes.js` added to the `__BUST__` entries in `index.html` in the same commit. +- Uses the same inline SVG chart helpers already present in `analytics.js` (no external lib) --- From d5b23e91d9a4a3011736df206e74a3ea83d3c1c7 Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 11:41:50 +0200 Subject: [PATCH 03/17] docs: add scope stats implementation plan (#899) Co-Authored-By: Claude Sonnet 4.6 --- .../plans/2026-04-23-scope-stats.md | 1358 +++++++++++++++++ 1 file changed, 1358 insertions(+) create mode 100644 docs/superpowers/plans/2026-04-23-scope-stats.md diff --git a/docs/superpowers/plans/2026-04-23-scope-stats.md b/docs/superpowers/plans/2026-04-23-scope-stats.md new file mode 100644 index 00000000..5fbcfe36 --- /dev/null +++ b/docs/superpowers/plans/2026-04-23-scope-stats.md @@ -0,0 +1,1358 @@ +# Scope Stats Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a "Scopes" tab to the Analytics page showing scoped vs non-scoped transport-route statistics with per-region breakdowns, driven by a `hashRegions` config list. + +**Architecture:** At ingest, each transport-route packet (route_type 0 or 3) with Code1 ≠ `0000` is matched against HMAC-SHA256 codes derived from configured region names — mirroring the `hashChannels`/`channel_hash` pattern. The matched name (or `""` for unknown) goes into a new `scope_name` column. The server exposes `/api/scope-stats?window=` which the Analytics "Scopes" tab calls for summary counts, a per-region table, and a two-line time-series chart. + +**Tech Stack:** Go (`crypto/hmac`, `crypto/sha256`), SQLite, vanilla JS (no new libraries) + +**Spec:** `docs/superpowers/specs/2026-04-23-scope-stats-design.md` + +--- + +## File Map + +| File | Change | +|---|---| +| `cmd/ingestor/decoder.go` | Add `PayloadRaw []byte` to `DecodedPacket` | +| `cmd/ingestor/decoder_test.go` | Test `PayloadRaw` is populated | +| `cmd/ingestor/config.go` | Add `HashRegions []string` to `Config` | +| `cmd/ingestor/main.go` | Add `loadRegionKeys`, `matchScope`, call backfill | +| `cmd/ingestor/main_test.go` | Tests for `loadRegionKeys` and `matchScope` | +| `cmd/ingestor/db.go` | Migration for `scope_name` column + `BackfillScopeNames` method | +| `cmd/ingestor/db_test.go` | Test migration and backfill | +| `cmd/server/db.go` | Add `hasScopeName` to schema detection + `GetScopeStats` | +| `cmd/server/db_test.go` | Test `GetScopeStats` | +| `cmd/server/types.go` | Add `ScopeStatsResponse` and sub-types | +| `cmd/server/routes.go` | Add `handleScopeStats`, register route, add cache fields | +| `public/analytics.js` | Add "Scopes" tab button + `renderScopesTab` function | +| `public/index.html` | No change needed (tab is inside Analytics) | + +--- + +## Task 1: Expose `PayloadRaw` in the decoder + +**Files:** +- Modify: `cmd/ingestor/decoder.go` — `DecodedPacket` struct + `DecodePacket` function +- Test: `cmd/ingestor/decoder_test.go` + +- [ ] **Step 1: Write the failing test** + +Add to `cmd/ingestor/decoder_test.go`: + +```go +func TestDecodePacketPayloadRaw(t *testing.T) { + // Build a minimal TRANSPORT_FLOOD packet (route_type=0): + // header(1) + transport_codes(4) + path_len(1) + payload(N) + // Header 0x00 = route_type=TRANSPORT_FLOOD, payload_type=0, version=0 + // Code1=9A52, Code2=0000, path_len=0x00 (0 hops, hash_size=1) + payload := []byte("hello") + raw := []byte{0x00, 0x9A, 0x52, 0x00, 0x00, 0x00} + raw = append(raw, payload...) + hexStr := strings.ToUpper(hex.EncodeToString(raw)) + + decoded, err := DecodePacket(hexStr, nil, false) + if err != nil { + t.Fatalf("DecodePacket: %v", err) + } + if decoded.TransportCodes == nil { + t.Fatal("expected TransportCodes, got nil") + } + if string(decoded.PayloadRaw) != string(payload) { + t.Errorf("PayloadRaw = %v, want %v", decoded.PayloadRaw, payload) + } +} +``` + +- [ ] **Step 2: Run to verify it fails** + +``` +cd cmd/ingestor && go test -run TestDecodePacketPayloadRaw -v +``` +Expected: compile error — `PayloadRaw` field does not exist on `DecodedPacket`. + +- [ ] **Step 3: Add `PayloadRaw` to `DecodedPacket`** + +In `cmd/ingestor/decoder.go`, find the `DecodedPacket` struct (around line 141) and add the field: + +```go +type DecodedPacket struct { + Header Header `json:"header"` + TransportCodes *TransportCodes `json:"transportCodes"` + Path Path `json:"path"` + Payload Payload `json:"payload"` + Raw string `json:"raw"` + Anomaly string `json:"anomaly,omitempty"` + PayloadRaw []byte `json:"-"` // raw encrypted payload bytes, for HMAC matching +} +``` + +Then in `DecodePacket` (around line 589), after `payloadBuf := buf[offset:]`, populate the field in the return statement (around line 635): + +```go +return &DecodedPacket{ + Header: header, + TransportCodes: tc, + Path: path, + Payload: payload, + Raw: strings.ToUpper(hexString), + Anomaly: anomaly, + PayloadRaw: payloadBuf, +}, nil +``` + +- [ ] **Step 4: Run test to verify it passes** + +``` +cd cmd/ingestor && go test -run TestDecodePacketPayloadRaw -v +``` +Expected: PASS + +- [ ] **Step 5: Run all ingestor tests** + +``` +cd cmd/ingestor && go test ./... -v 2>&1 | tail -20 +``` +Expected: all PASS + +- [ ] **Step 6: Commit** + +```bash +git add cmd/ingestor/decoder.go cmd/ingestor/decoder_test.go +git commit -m "feat(ingestor/decoder): expose PayloadRaw bytes on DecodedPacket (#899)" +``` + +--- + +## Task 2: Config field + region key helpers + +**Files:** +- Modify: `cmd/ingestor/config.go` — add `HashRegions []string` +- Modify: `cmd/ingestor/main.go` — add `loadRegionKeys` and `matchScope` +- Test: `cmd/ingestor/main_test.go` + +- [ ] **Step 1: Write failing tests** + +Add to `cmd/ingestor/main_test.go`: + +```go +func TestLoadRegionKeys(t *testing.T) { + cfg := &Config{HashRegions: []string{"#belgium", "eu", " #Test ", "", "#belgium"}} + keys := loadRegionKeys(cfg) + + // Deduplication + normalization + if len(keys) != 3 { + t.Fatalf("len(keys) = %d, want 3", len(keys)) + } + // "#belgium" key = SHA256("#belgium")[:16] + h := sha256.Sum256([]byte("#belgium")) + want := h[:16] + if got := keys["#belgium"]; !bytes.Equal(got, want) { + t.Errorf("#belgium key mismatch: got %x, want %x", got, want) + } + // "eu" should be normalized to "#eu" + if _, ok := keys["#eu"]; !ok { + t.Error("expected #eu key") + } + // " #Test " should be normalized to "#Test" + if _, ok := keys["#Test"]; !ok { + t.Error("expected #Test key") + } +} + +func TestMatchScope(t *testing.T) { + // Build a known Code1 for region "#test" and payload type 5, payload "hello" + name := "#test" + h := sha256.Sum256([]byte(name)) + key := h[:16] + + payloadType := byte(0x05) + payloadRaw := []byte("hello") + + mac := hmac.New(sha256.New, key) + mac.Write([]byte{payloadType}) + mac.Write(payloadRaw) + hmacBytes := mac.Sum(nil) + code := uint16(hmacBytes[0]) | uint16(hmacBytes[1])<<8 + if code == 0 { + code = 1 + } else if code == 0xFFFF { + code = 0xFFFE + } + code1Bytes := [2]byte{byte(code & 0xFF), byte(code >> 8)} + code1 := strings.ToUpper(hex.EncodeToString(code1Bytes[:])) + + regionKeys := map[string][]byte{name: key} + + got := matchScope(regionKeys, payloadType, payloadRaw, code1) + if got != name { + t.Errorf("matchScope = %q, want %q", got, name) + } + + // Unscoped (Code1 = 0000) → empty + if got := matchScope(regionKeys, payloadType, payloadRaw, "0000"); got != "" { + t.Errorf("unscoped: matchScope = %q, want empty", got) + } + + // Scoped but no match → empty string sentinel + if got := matchScope(regionKeys, payloadType, payloadRaw, "BEEF"); got != "" { + t.Errorf("no match: matchScope = %q, want empty", got) + } +} +``` + +Also add the needed imports to the test file (`bytes`, `crypto/hmac`, `crypto/sha256`, `encoding/hex`, `strings`). + +- [ ] **Step 2: Run to verify they fail** + +``` +cd cmd/ingestor && go test -run "TestLoadRegionKeys|TestMatchScope" -v +``` +Expected: compile error — `loadRegionKeys` and `matchScope` not defined. + +- [ ] **Step 3: Add `HashRegions` to Config** + +In `cmd/ingestor/config.go`, add the field after `HashChannels`: + +```go +HashChannels []string `json:"hashChannels,omitempty"` +HashRegions []string `json:"hashRegions,omitempty"` +``` + +- [ ] **Step 4: Add `loadRegionKeys` and `matchScope` to main.go** + +Add to `cmd/ingestor/main.go` (near `loadChannelKeys`, around line 755): + +```go +// loadRegionKeys derives 16-byte HMAC keys from configured region names. +// Key derivation matches firmware: SHA256("#regionname")[:16]. +// Names without a leading '#' are prefixed automatically. +func loadRegionKeys(cfg *Config) map[string][]byte { + keys := make(map[string][]byte) + for _, raw := range cfg.HashRegions { + name := strings.TrimSpace(raw) + if name == "" { + continue + } + if !strings.HasPrefix(name, "#") { + name = "#" + name + } + if _, exists := keys[name]; exists { + continue // deduplicate + } + h := sha256.Sum256([]byte(name)) + keys[name] = h[:16] + } + if len(keys) > 0 { + log.Printf("[regions] %d region key(s) loaded", len(keys)) + } + return keys +} + +// matchScope tries each configured region key against Code1 using the same +// HMAC derivation as the firmware (TransportKey::calcTransportCode). +// Returns the matched region name, "" if scoped but no match, or "" if Code1 is "0000". +func matchScope(regionKeys map[string][]byte, payloadType byte, payloadRaw []byte, code1 string) string { + if code1 == "0000" || len(regionKeys) == 0 || len(payloadRaw) == 0 { + return "" + } + for name, key := range regionKeys { + mac := hmac.New(sha256.New, key) + mac.Write([]byte{payloadType}) + mac.Write(payloadRaw) + hmacBytes := mac.Sum(nil) + code := uint16(hmacBytes[0]) | uint16(hmacBytes[1])<<8 + if code == 0 { + code = 1 + } else if code == 0xFFFF { + code = 0xFFFE + } + codeBytes := [2]byte{byte(code & 0xFF), byte(code >> 8)} + if strings.ToUpper(hex.EncodeToString(codeBytes[:])) == code1 { + return name + } + } + return "" // scoped but no configured region matched +} +``` + +Add `"crypto/hmac"` to the imports in `main.go` (it already imports `crypto/sha256`). + +- [ ] **Step 5: Run tests** + +``` +cd cmd/ingestor && go test -run "TestLoadRegionKeys|TestMatchScope" -v +``` +Expected: PASS + +- [ ] **Step 6: Run all ingestor tests** + +``` +cd cmd/ingestor && go test ./... 2>&1 | tail -10 +``` +Expected: all PASS + +- [ ] **Step 7: Commit** + +```bash +git add cmd/ingestor/config.go cmd/ingestor/main.go cmd/ingestor/main_test.go +git commit -m "feat(ingestor): add hashRegions config + loadRegionKeys + matchScope (#899)" +``` + +--- + +## Task 3: DB migration — `scope_name` column + +**Files:** +- Modify: `cmd/ingestor/db.go` — migration block + `BackfillScopeNames` +- Test: `cmd/ingestor/db_test.go` (or `main_test.go`) + +- [ ] **Step 1: Write the failing migration test** + +Add to `cmd/ingestor/db_test.go` (create the file if it doesn't exist, otherwise append): + +```go +func TestScopeNameMigration(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + store, err := OpenStore(dbPath) + if err != nil { + t.Fatalf("OpenStore: %v", err) + } + defer store.Close() + + // Verify scope_name column exists + rows, err := store.db.Query("PRAGMA table_info(transmissions)") + if err != nil { + t.Fatalf("PRAGMA: %v", err) + } + defer rows.Close() + found := false + for rows.Next() { + var cid int + var colName, colType string + var notNull, pk int + var dflt interface{} + if err := rows.Scan(&cid, &colName, &colType, ¬Null, &dflt, &pk); err == nil { + if colName == "scope_name" { + found = true + } + } + } + if !found { + t.Error("scope_name column not found in transmissions") + } +} +``` + +- [ ] **Step 2: Run to verify it fails** + +``` +cd cmd/ingestor && go test -run TestScopeNameMigration -v +``` +Expected: FAIL — scope_name column not found. + +- [ ] **Step 3: Add the migration to `cmd/ingestor/db.go`** + +Append after the last migration block (after the `observations_raw_hex_v1` block, before `return nil`): + +```go +// Migration: add scope_name column for transport-route region matching (#899) +row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'scope_name_v1'") +if row.Scan(&migDone) != nil { + log.Println("[migration] Adding scope_name column to transmissions...") + db.Exec(`ALTER TABLE transmissions ADD COLUMN scope_name TEXT DEFAULT NULL`) + db.Exec(`CREATE INDEX IF NOT EXISTS idx_tx_scope_name ON transmissions(scope_name) WHERE scope_name IS NOT NULL`) + db.Exec(`INSERT INTO _migrations (name) VALUES ('scope_name_v1')`) + log.Println("[migration] scope_name column added") +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +``` +cd cmd/ingestor && go test -run TestScopeNameMigration -v +``` +Expected: PASS + +- [ ] **Step 5: Add `BackfillScopeNames` to `cmd/ingestor/db.go`** + +Add this method to `Store` (near the bottom of db.go, before the closing brace): + +```go +// BackfillScopeNames re-decodes raw_hex for existing transport-route rows +// and populates scope_name using the given region keys. +// Skips rows that already have scope_name set. +// Safe to call with empty regionKeys — returns immediately. +func (s *Store) BackfillScopeNames(regionKeys map[string][]byte) { + if len(regionKeys) == 0 { + return + } + rows, err := s.db.Query(` + SELECT id, raw_hex FROM transmissions + WHERE route_type IN (0, 3) AND scope_name IS NULL AND raw_hex IS NOT NULL + `) + if err != nil { + log.Printf("[backfill] scope_name query: %v", err) + return + } + defer rows.Close() + + type row struct { + id int64 + rawHex string + } + var pending []row + for rows.Next() { + var r row + if rows.Scan(&r.id, &r.rawHex) == nil && r.rawHex != "" { + pending = append(pending, r) + } + } + + updated := 0 + for _, r := range pending { + decoded, err := DecodePacket(r.rawHex, nil, false) + if err != nil || decoded.TransportCodes == nil { + continue + } + if decoded.TransportCodes.Code1 == "0000" { + continue // unscoped transport — leave NULL + } + scopeName := matchScope(regionKeys, byte(decoded.Header.PayloadType), decoded.PayloadRaw, decoded.TransportCodes.Code1) + // scopeName == "" means scoped but unknown — write empty string to distinguish from NULL + s.db.Exec(`UPDATE transmissions SET scope_name = ? WHERE id = ?`, scopeName, r.id) + updated++ + } + if updated > 0 { + log.Printf("[backfill] scope_name set for %d/%d transport-route rows", updated, len(pending)) + } +} +``` + +- [ ] **Step 6: Add backfill test** + +Add to `cmd/ingestor/db_test.go`: + +```go +func TestBackfillScopeNames(t *testing.T) { + dir := t.TempDir() + store, err := OpenStore(filepath.Join(dir, "test.db")) + if err != nil { + t.Fatalf("OpenStore: %v", err) + } + defer store.Close() + + // Insert a transport-route packet with known Code1 + regionName := "#test" + h := sha256.Sum256([]byte(regionName)) + key := h[:16] + payloadType := byte(0x05) + payloadRaw := []byte("hello") + + mac := hmac.New(sha256.New, key) + mac.Write([]byte{payloadType}) + mac.Write(payloadRaw) + hmacBytes := mac.Sum(nil) + code := uint16(hmacBytes[0]) | uint16(hmacBytes[1])<<8 + if code == 0 { code = 1 } else if code == 0xFFFF { code = 0xFFFE } + codeBytes := [2]byte{byte(code & 0xFF), byte(code >> 8)} + + // Build raw packet bytes: header(1) + Code1(2) + Code2(2) + path_len(1) + payload + header := byte(0x00) | (payloadType << 2) // TRANSPORT_FLOOD + payload_type in bits 2-5 + raw := []byte{header} + raw = append(raw, codeBytes[:]...) + raw = append(raw, 0x00, 0x00) // Code2 = 0000 + raw = append(raw, 0x00) // path_len = 0 hops + raw = append(raw, payloadRaw...) + rawHex := strings.ToUpper(hex.EncodeToString(raw)) + + store.db.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json) + VALUES (?, 'testhash1', datetime('now'), 0, 5, 0, '{}')`, rawHex) + + store.BackfillScopeNames(map[string][]byte{regionName: key}) + + var scopeName *string + store.db.QueryRow(`SELECT scope_name FROM transmissions WHERE hash = 'testhash1'`).Scan(&scopeName) + if scopeName == nil || *scopeName != regionName { + t.Errorf("scope_name = %v, want %q", scopeName, regionName) + } +} +``` + +Add needed imports to db_test.go: `crypto/hmac`, `crypto/sha256`, `encoding/hex`, `strings`. + +- [ ] **Step 7: Run tests** + +``` +cd cmd/ingestor && go test -run "TestScopeNameMigration|TestBackfillScopeNames" -v +``` +Expected: PASS + +- [ ] **Step 8: Commit** + +```bash +git add cmd/ingestor/db.go cmd/ingestor/db_test.go +git commit -m "feat(ingestor/db): add scope_name migration and BackfillScopeNames (#899)" +``` + +--- + +## Task 4: Wire scope matching into ingest + backfill call + +**Files:** +- Modify: `cmd/ingestor/db.go` — `PacketData` struct, `stmtInsertTransmission`, `InsertTransmission` +- Modify: `cmd/ingestor/main.go` — `BuildPacketData` and startup call to `BackfillScopeNames` + +- [ ] **Step 1: Write the failing integration test** + +Add to `cmd/ingestor/main_test.go`: + +```go +func TestBuildPacketDataScopeMatching(t *testing.T) { + // Build region key for "#test" + regionName := "#test" + h := sha256.Sum256([]byte(regionName)) + key := h[:16] + + payloadType := byte(0x05) + payloadRaw := []byte("hello") + + mac := hmac.New(sha256.New, key) + mac.Write([]byte{payloadType}) + mac.Write(payloadRaw) + hmacBytes := mac.Sum(nil) + code := uint16(hmacBytes[0]) | uint16(hmacBytes[1])<<8 + if code == 0 { code = 1 } else if code == 0xFFFF { code = 0xFFFE } + codeBytes := [2]byte{byte(code & 0xFF), byte(code >> 8)} + + // TRANSPORT_FLOOD header with payloadType in bits 2-5 + header := byte(0x00) | (payloadType << 2) + raw := []byte{header} + raw = append(raw, codeBytes[:]...) + raw = append(raw, 0x00, 0x00) // Code2 + raw = append(raw, 0x00) // path_len + raw = append(raw, payloadRaw...) + rawHex := strings.ToUpper(hex.EncodeToString(raw)) + + decoded, err := DecodePacket(rawHex, nil, false) + if err != nil { + t.Fatalf("DecodePacket: %v", err) + } + + msg := &MQTTPacketMessage{Raw: rawHex} + regionKeys := map[string][]byte{regionName: key} + + pktData := BuildPacketData(msg, decoded, "obs1", "region1", regionKeys) + if pktData.ScopeName != regionName { + t.Errorf("ScopeName = %q, want %q", pktData.ScopeName, regionName) + } +} +``` + +- [ ] **Step 2: Run to verify it fails** + +``` +cd cmd/ingestor && go test -run TestBuildPacketDataScopeMatching -v +``` +Expected: compile error — `ScopeName` not on `PacketData`, `BuildPacketData` signature mismatch. + +- [ ] **Step 3: Add `ScopeName` to `PacketData`** + +In `cmd/ingestor/db.go`, in the `PacketData` struct (around line 908): + +```go +type PacketData struct { + RawHex string + Timestamp string + ObserverID string + ObserverName string + SNR *float64 + RSSI *float64 + Score *float64 + Direction *string + Hash string + RouteType int + PayloadType int + PayloadVersion int + PathJSON string + DecodedJSON string + ChannelHash string + ScopeName string // "" = scoped but unknown; only set for transport routes with Code1≠0000 +} +``` + +- [ ] **Step 4: Update `stmtInsertTransmission` in `prepareStatements`** + +In `cmd/ingestor/db.go`, find `stmtInsertTransmission` prepare (around line 432) and update: + +```go +s.stmtInsertTransmission, err = s.db.Prepare(` + INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, channel_hash, scope_name) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) +`) +``` + +- [ ] **Step 5: Update `InsertTransmission` Exec call** + +In `cmd/ingestor/db.go`, find the `stmtInsertTransmission.Exec` call (around line 559) and add the `scope_name` argument: + +```go +result, err := s.stmtInsertTransmission.Exec( + data.RawHex, hash, now, + data.RouteType, data.PayloadType, data.PayloadVersion, + data.DecodedJSON, nilIfEmpty(data.ChannelHash), + nilIfEmpty(data.ScopeName), +) +``` + +Note: `nilIfEmpty` maps `""` → `nil` (SQL NULL). But the spec requires `""` (empty string) for "scoped but unknown". Use a different helper: + +Replace the last argument with: +```go +scopeNameVal, +``` + +And before the `Exec` call, add: +```go +var scopeNameVal interface{} +if data.RouteType == 0 || data.RouteType == 3 { + // For transport routes, store the matched name (or "" for unknown scoped) + // Only leave NULL for non-transport routes + scopeNameVal = data.ScopeName // may be "" or "#regionname" + if data.ScopeName == "" && /* check Code1 == "0000": */ (decoded TransportCodes is nil check) { +``` + +Wait — `InsertTransmission` takes `*PacketData`, not the decoded packet. We don't have Code1 here. We need a different approach. + +Use a new convention: populate `ScopeName` in `BuildPacketData` as follows: +- Non-transport route → `ScopeName = ""` with a sentinel meaning "not applicable" → store as NULL +- Transport route + Code1 == "0000" → `ScopeName = ""` → store as NULL +- Transport route + Code1 ≠ "0000" + no match → `ScopeName = "\x00"` (internal sentinel for "scoped/unknown") +- Transport route + Code1 ≠ "0000" + match → `ScopeName = "#regionname"` + +Actually, this is getting complicated. Simpler: add `IsTransportScoped bool` to `PacketData`: + +```go +type PacketData struct { + // ... existing fields ... + ScopeName string // matched region name, or "" + IsTransportScoped bool // true = transport route with Code1≠0000 (even if name unknown) +} +``` + +Then in `InsertTransmission`: +```go +var scopeNameVal interface{} +if data.IsTransportScoped { + scopeNameVal = data.ScopeName // "" or "#regionname" — both stored as non-NULL +} // else: NULL (not a transport-scoped packet) +``` + +This cleanly encodes the three-state semantics without sentinels. + +- [ ] **Step 5 (revised): Update `PacketData`, `InsertTransmission`, `BuildPacketData`** + +In `cmd/ingestor/db.go`, `PacketData` struct: + +```go +type PacketData struct { + RawHex string + Timestamp string + ObserverID string + ObserverName string + SNR *float64 + RSSI *float64 + Score *float64 + Direction *string + Hash string + RouteType int + PayloadType int + PayloadVersion int + PathJSON string + DecodedJSON string + ChannelHash string + ScopeName string // matched region name, or "" for unknown-scoped + IsTransportScoped bool // true when route_type IN (0,3) AND Code1 ≠ "0000" +} +``` + +In `InsertTransmission` Exec call, replace `nilIfEmpty(data.ChannelHash),` line and add `scope_name`: + +```go +result, err := s.stmtInsertTransmission.Exec( + data.RawHex, hash, now, + data.RouteType, data.PayloadType, data.PayloadVersion, + data.DecodedJSON, nilIfEmpty(data.ChannelHash), + scopeNameForDB(data), +) +``` + +Add helper function in `db.go`: + +```go +// scopeNameForDB converts PacketData scope fields to the DB value. +// NULL = not a transport-scoped packet; "" = scoped but region unknown; "#name" = matched. +func scopeNameForDB(data *PacketData) interface{} { + if !data.IsTransportScoped { + return nil + } + return data.ScopeName // "" or "#regionname" +} +``` + +- [ ] **Step 6: Update `BuildPacketData` signature and body** + +In `cmd/ingestor/main.go`, find `BuildPacketData` (around line 948) and update signature: + +```go +func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, region string, regionKeys map[string][]byte) *PacketData { +``` + +At the end of `BuildPacketData`, before the `return pd` line, add scope matching: + +```go +// Scope matching for transport-route packets +if decoded.TransportCodes != nil && decoded.TransportCodes.Code1 != "0000" { + pd.IsTransportScoped = true + pd.ScopeName = matchScope(regionKeys, byte(decoded.Header.PayloadType), decoded.PayloadRaw, decoded.TransportCodes.Code1) +} +``` + +- [ ] **Step 7: Update both `BuildPacketData` call sites in `main.go`** + +Both calls (lines 354 and 377) become: + +```go +pktData := BuildPacketData(mqttMsg, decoded, observerID, region, regionKeys) +``` + +`regionKeys` is loaded once at startup (see Step 9). + +- [ ] **Step 8: Load region keys at startup and call backfill** + +In `main()` in `cmd/ingestor/main.go`, after `loadChannelKeys` is called, add: + +```go +regionKeys := loadRegionKeys(cfg) +``` + +Pass `regionKeys` to `BuildPacketData` where it's called (already done in Step 7). + +Also call backfill after the store is opened (find where `store` is initialized): + +```go +go store.BackfillScopeNames(regionKeys) +``` + +Run in a goroutine so it doesn't block startup. + +- [ ] **Step 9: Run all ingestor tests** + +``` +cd cmd/ingestor && go test ./... 2>&1 | tail -20 +``` +Expected: all PASS (including `TestBuildPacketDataScopeMatching`) + +- [ ] **Step 10: Commit** + +```bash +git add cmd/ingestor/db.go cmd/ingestor/main.go cmd/ingestor/main_test.go +git commit -m "feat(ingestor): wire scope matching into ingest pipeline (#899)" +``` + +--- + +## Task 5: Server — schema detection + `GetScopeStats` + +**Files:** +- Modify: `cmd/server/db.go` — `detectSchema`, add `hasScopeName bool`, add `GetScopeStats` +- Test: `cmd/server/db_test.go` + +- [ ] **Step 1: Write failing test** + +Add to `cmd/server/db_test.go`: + +```go +func TestGetScopeStats(t *testing.T) { + db, err := OpenDB(":memory:") + if err != nil { + t.Fatalf("OpenDB: %v", err) + } + defer db.Close() + + // Create minimal schema + db.conn.Exec(`CREATE TABLE IF NOT EXISTS transmissions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + raw_hex TEXT, hash TEXT, first_seen TEXT, route_type INTEGER, + payload_type INTEGER, payload_version INTEGER, decoded_json TEXT, + scope_name TEXT DEFAULT NULL + )`) + // Manually set hasScopeName since we bypassed the detector + db.hasScopeName = true + + now := time.Now().UTC().Format(time.RFC3339) + // Transport scoped, known region + db.conn.Exec(`INSERT INTO transmissions (hash, first_seen, route_type, scope_name) VALUES ('a', ?, 0, '#belgium')`, now) + // Transport scoped, unknown + db.conn.Exec(`INSERT INTO transmissions (hash, first_seen, route_type, scope_name) VALUES ('b', ?, 0, '')`, now) + // Transport unscoped (NULL) + db.conn.Exec(`INSERT INTO transmissions (hash, first_seen, route_type, scope_name) VALUES ('c', ?, 0, NULL)`, now) + // Non-transport (should not count) + db.conn.Exec(`INSERT INTO transmissions (hash, first_seen, route_type, scope_name) VALUES ('d', ?, 1, NULL)`, now) + + stats, err := db.GetScopeStats("24h") + if err != nil { + t.Fatalf("GetScopeStats: %v", err) + } + if stats.Summary.TransportTotal != 3 { + t.Errorf("TransportTotal = %d, want 3", stats.Summary.TransportTotal) + } + if stats.Summary.Scoped != 2 { + t.Errorf("Scoped = %d, want 2", stats.Summary.Scoped) + } + if stats.Summary.Unscoped != 1 { + t.Errorf("Unscoped = %d, want 1", stats.Summary.Unscoped) + } + if stats.Summary.UnknownScope != 1 { + t.Errorf("UnknownScope = %d, want 1", stats.Summary.UnknownScope) + } + if len(stats.ByRegion) != 1 || stats.ByRegion[0].Name != "#belgium" || stats.ByRegion[0].Count != 1 { + t.Errorf("ByRegion = %+v, want [{#belgium 1}]", stats.ByRegion) + } +} +``` + +- [ ] **Step 2: Run to verify it fails** + +``` +cd cmd/server && go test -run TestGetScopeStats -v +``` +Expected: compile error — `GetScopeStats` not defined, `hasScopeName` not a field. + +- [ ] **Step 3: Add `hasScopeName` to DB struct and `detectSchema`** + +In `cmd/server/db.go`, add to `DB` struct: + +```go +type DB struct { + conn *sql.DB + path string + isV3 bool + hasResolvedPath bool + hasObsRawHex bool + hasScopeName bool // transmissions.scope_name column exists (#899) + // ... cache fields ... +} +``` + +In `detectSchema`, in the loop that scans column names, add: + +```go +if colName == "scope_name" { + db.hasScopeName = true +} +``` + +- [ ] **Step 4: Add `ScopeStatsResponse` types to `cmd/server/types.go`** + +Add after the `StatsResponse` section: + +```go +// ─── Scope Stats ─────────────────────────────────────────────────────────────── + +type ScopeStatsSummary struct { + TransportTotal int `json:"transportTotal"` + Scoped int `json:"scoped"` + Unscoped int `json:"unscoped"` + UnknownScope int `json:"unknownScope"` +} + +type ScopeRegionCount struct { + Name string `json:"name"` + Count int `json:"count"` +} + +type ScopeTimePoint struct { + T string `json:"t"` + Scoped int `json:"scoped"` + Unscoped int `json:"unscoped"` +} + +type ScopeStatsResponse struct { + Window string `json:"window"` + Summary ScopeStatsSummary `json:"summary"` + ByRegion []ScopeRegionCount `json:"byRegion"` + TimeSeries []ScopeTimePoint `json:"timeSeries"` +} +``` + +- [ ] **Step 5: Add `GetScopeStats` to `cmd/server/db.go`** + +Add at the end of db.go, before the closing line: + +```go +// GetScopeStats returns scope statistics for the given window ("1h", "24h", "7d"). +func (db *DB) GetScopeStats(window string) (*ScopeStatsResponse, error) { + if !db.hasScopeName { + return nil, fmt.Errorf("scope_name column not present — run ingestor to apply migrations") + } + + var since string + var bucketExpr string + switch window { + case "1h": + since = time.Now().Add(-1 * time.Hour).UTC().Format(time.RFC3339) + // 5-minute buckets + bucketExpr = `strftime('%Y-%m-%dT%H:', first_seen) || printf('%02d', (CAST(strftime('%M', first_seen) AS INTEGER) / 5) * 5) || ':00Z'` + case "7d": + since = time.Now().Add(-7 * 24 * time.Hour).UTC().Format(time.RFC3339) + // 6-hour buckets + bucketExpr = `strftime('%Y-%m-%dT', first_seen) || printf('%02d', (CAST(strftime('%H', first_seen) AS INTEGER) / 6) * 6) || ':00:00Z'` + default: // "24h" + window = "24h" + since = time.Now().Add(-24 * time.Hour).UTC().Format(time.RFC3339) + // 1-hour buckets + bucketExpr = `strftime('%Y-%m-%dT%H:00:00Z', first_seen)` + } + + resp := &ScopeStatsResponse{Window: window} + + // Summary counts + row := db.conn.QueryRow(` + SELECT + COUNT(*) AS transport_total, + COUNT(scope_name) AS scoped, + SUM(CASE WHEN scope_name IS NULL THEN 1 ELSE 0 END) AS unscoped, + SUM(CASE WHEN scope_name = '' THEN 1 ELSE 0 END) AS unknown_scope + FROM transmissions + WHERE route_type IN (0, 3) AND first_seen >= ? + `, since) + if err := row.Scan( + &resp.Summary.TransportTotal, + &resp.Summary.Scoped, + &resp.Summary.Unscoped, + &resp.Summary.UnknownScope, + ); err != nil { + return nil, fmt.Errorf("scope summary query: %w", err) + } + + // Per-region counts (named regions only) + rows, err := db.conn.Query(` + SELECT scope_name, COUNT(*) AS cnt + FROM transmissions + WHERE route_type IN (0, 3) AND scope_name IS NOT NULL AND scope_name != '' AND first_seen >= ? + GROUP BY scope_name + ORDER BY cnt DESC + `, since) + if err != nil { + return nil, fmt.Errorf("scope byRegion query: %w", err) + } + defer rows.Close() + for rows.Next() { + var rc ScopeRegionCount + if rows.Scan(&rc.Name, &rc.Count) == nil { + resp.ByRegion = append(resp.ByRegion, rc) + } + } + if resp.ByRegion == nil { + resp.ByRegion = []ScopeRegionCount{} + } + + // Time series + tsQuery := fmt.Sprintf(` + SELECT %s AS bucket, + COUNT(scope_name) AS scoped, + SUM(CASE WHEN scope_name IS NULL THEN 1 ELSE 0 END) AS unscoped + FROM transmissions + WHERE route_type IN (0, 3) AND first_seen >= ? + GROUP BY bucket + ORDER BY bucket + `, bucketExpr) + tsRows, err := db.conn.Query(tsQuery, since) + if err != nil { + return nil, fmt.Errorf("scope timeseries query: %w", err) + } + defer tsRows.Close() + for tsRows.Next() { + var pt ScopeTimePoint + if tsRows.Scan(&pt.T, &pt.Scoped, &pt.Unscoped) == nil { + resp.TimeSeries = append(resp.TimeSeries, pt) + } + } + if resp.TimeSeries == nil { + resp.TimeSeries = []ScopeTimePoint{} + } + + return resp, nil +} +``` + +- [ ] **Step 6: Run test** + +``` +cd cmd/server && go test -run TestGetScopeStats -v +``` +Expected: PASS + +- [ ] **Step 7: Run all server tests** + +``` +cd cmd/server && go test ./... 2>&1 | tail -20 +``` +Expected: all PASS + +- [ ] **Step 8: Commit** + +```bash +git add cmd/server/db.go cmd/server/db_test.go cmd/server/types.go +git commit -m "feat(server/db): add GetScopeStats and ScopeStatsResponse types (#899)" +``` + +--- + +## Task 6: Server — HTTP handler + route registration + +**Files:** +- Modify: `cmd/server/routes.go` — add cache fields to `Server`, register route, add `handleScopeStats` + +- [ ] **Step 1: Write failing handler test** + +Add to `cmd/server/routes_test.go`: + +```go +func TestHandleScopeStats(t *testing.T) { + srv := newTestServer(t) + // Manually mark hasScopeName on the test DB + srv.db.hasScopeName = true + + req := httptest.NewRequest("GET", "/api/scope-stats?window=24h", nil) + w := httptest.NewRecorder() + srv.handleScopeStats(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", w.Code) + } + var resp ScopeStatsResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp.Window != "24h" { + t.Errorf("window = %q, want 24h", resp.Window) + } + // TimeSeries and ByRegion are always non-nil slices + if resp.TimeSeries == nil { + t.Error("timeSeries is nil, want empty slice") + } + if resp.ByRegion == nil { + t.Error("byRegion is nil, want empty slice") + } +} +``` + +Check `routes_test.go` for how `newTestServer` is implemented and adapt if needed. + +- [ ] **Step 2: Run to verify it fails** + +``` +cd cmd/server && go test -run TestHandleScopeStats -v +``` +Expected: compile error — `handleScopeStats` not defined. + +- [ ] **Step 3: Add cache fields to `Server` struct** + +In `cmd/server/routes.go`, add to `Server` struct (near the other cache fields): + +```go +// Scope stats cache (30s TTL) +scopeStatsMu sync.Mutex +scopeStatsCache map[string]*ScopeStatsResponse // keyed by window param +scopeStatsCachedAt map[string]time.Time +``` + +- [ ] **Step 4: Register the route** + +In the route registration block in `cmd/server/routes.go` (near `/api/stats`): + +```go +r.HandleFunc("/api/scope-stats", s.handleScopeStats).Methods("GET") +``` + +- [ ] **Step 5: Add `handleScopeStats`** + +Add to `cmd/server/routes.go`: + +```go +func (s *Server) handleScopeStats(w http.ResponseWriter, r *http.Request) { + const scopeStatsTTL = 30 * time.Second + + window := r.URL.Query().Get("window") + if window == "" { + window = "24h" + } + if window != "1h" && window != "24h" && window != "7d" { + writeError(w, 400, "window must be 1h, 24h, or 7d") + return + } + + s.scopeStatsMu.Lock() + if s.scopeStatsCache != nil { + if cached, ok := s.scopeStatsCache[window]; ok && time.Since(s.scopeStatsCachedAt[window]) < scopeStatsTTL { + s.scopeStatsMu.Unlock() + writeJSON(w, cached) + return + } + } + s.scopeStatsMu.Unlock() + + resp, err := s.db.GetScopeStats(window) + if err != nil { + writeError(w, 500, err.Error()) + return + } + + s.scopeStatsMu.Lock() + if s.scopeStatsCache == nil { + s.scopeStatsCache = make(map[string]*ScopeStatsResponse) + s.scopeStatsCachedAt = make(map[string]time.Time) + } + s.scopeStatsCache[window] = resp + s.scopeStatsCachedAt[window] = time.Now() + s.scopeStatsMu.Unlock() + + writeJSON(w, resp) +} +``` + +- [ ] **Step 6: Run tests** + +``` +cd cmd/server && go test -run TestHandleScopeStats -v +``` +Expected: PASS + +- [ ] **Step 7: Run all server tests** + +``` +cd cmd/server && go test ./... 2>&1 | tail -20 +``` +Expected: all PASS + +- [ ] **Step 8: Commit** + +```bash +git add cmd/server/routes.go cmd/server/routes_test.go +git commit -m "feat(server): add /api/scope-stats endpoint (#899)" +``` + +--- + +## Task 7: Update API spec docs + +**Files:** +- Modify: `docs/api-spec.md` + +- [ ] **Step 1: Add `GET /api/scope-stats` to `docs/api-spec.md`** + +Open `docs/api-spec.md` and add an entry for the new endpoint following the existing format. Include: method, path, query params (`window`: `1h`/`24h`/`7d`), response shape with the `ScopeStatsResponse` JSON, and a note that it requires the ingestor migration to have run. + +- [ ] **Step 2: Commit** + +```bash +git add docs/api-spec.md +git commit -m "docs: add /api/scope-stats to api-spec (#899)" +``` + +--- + +## Task 8: Frontend — "Scopes" tab in Analytics + +**Files:** +- Modify: `public/analytics.js` — tab button + `renderScopesTab` function +- Modify: `public/index.html` — bump `__BUST__` version for `analytics.js` + +- [ ] **Step 1: Add the tab button** + +In `public/analytics.js`, find the tab buttons list (around line 88). Add after ``: + +```html + +``` + +- [ ] **Step 2: Wire the tab in `renderTab`** + +In `renderTab` (around line 186), add before the closing `}`: + +```js +case 'scopes': await renderScopesTab(el); break; +``` + +- [ ] **Step 3: Add `renderScopesTab` function** + +Add just before the `registerPage('analytics', ...)` line at the end of `analytics.js`: + +```js +// ===================== SCOPES ===================== +async function renderScopesTab(el) { + var window = 'scopes_window'; + var selectedWindow = (typeof sessionStorage !== 'undefined' && sessionStorage.getItem(window)) || '24h'; + + async function load(w) { + el.innerHTML = '
Loading scope stats…
'; + try { + var data = await (await fetch('/api/scope-stats?window=' + encodeURIComponent(w))).json(); + if (data.error) { + el.innerHTML = '
' + esc(data.error) + '
'; + return; + } + render(data, w); + } catch (err) { + el.innerHTML = '
Failed to load scope stats: ' + esc(String(err)) + '
'; + } + } + + function pct(n, total) { + if (!total) return '—'; + return (n / total * 100).toFixed(1) + '%'; + } + + function render(d, w) { + var s = d.summary; + var total = s.transportTotal || 0; + + // Window selector + var winHtml = ['1h', '24h', '7d'].map(function(v) { + return ''; + }).join(''); + + // Summary cards + var cardsHtml = [ + { label: 'Transport Total', value: total.toLocaleString(), note: '' }, + { label: 'Scoped', value: s.scoped.toLocaleString(), note: pct(s.scoped, total) }, + { label: 'Unscoped', value: s.unscoped.toLocaleString(), note: pct(s.unscoped, total) }, + { label: 'Unknown Scope', value: s.unknownScope.toLocaleString(), note: pct(s.unknownScope, s.scoped) + ' of scoped' }, + ].map(function(c) { + return '
' + c.value + '
' + + '
' + c.label + '
' + + (c.note ? '
' + c.note + '
' : '') + + '
'; + }).join(''); + + // Per-region table + var tableBody = ''; + if (d.byRegion && d.byRegion.length) { + tableBody = d.byRegion.map(function(r) { + return '' + esc(r.name) + '' + + '' + r.count.toLocaleString() + '' + + '' + pct(r.count, s.scoped) + ''; + }).join(''); + if (s.unknownScope > 0) { + tableBody += 'Unknown scope' + + '' + s.unknownScope.toLocaleString() + '' + + '' + pct(s.unknownScope, s.scoped) + ''; + } + } else if (s.scoped === 0) { + tableBody = 'No scoped messages in this window'; + } else { + tableBody = 'No regions configured — add hashRegions to your config'; + } + + // Time-series chart (two-line SVG) + var chartHtml = ''; + if (d.timeSeries && d.timeSeries.length > 1) { + var scopedVals = d.timeSeries.map(function(p) { return p.scoped; }); + var unscopedVals = d.timeSeries.map(function(p) { return p.unscoped; }); + var maxVal = Math.max(1, Math.max.apply(null, scopedVals.concat(unscopedVals))); + var W = 800, H = 180, padL = 44, padB = 24, padT = 10, padR = 10; + var plotW = W - padL - padR, plotH = H - padB - padT; + var n = d.timeSeries.length; + + function pts(vals) { + return vals.map(function(v, i) { + var x = padL + i * plotW / Math.max(n - 1, 1); + var y = padT + plotH - (v / maxVal) * plotH; + return x.toFixed(1) + ',' + y.toFixed(1); + }).join(' '); + } + + // Grid lines + var grid = ''; + for (var gi = 0; gi <= 4; gi++) { + var gy = padT + plotH * gi / 4; + var gv = Math.round(maxVal * (4 - gi) / 4); + grid += ''; + grid += '' + gv + ''; + } + + var legendX = padL + plotW - 120; + chartHtml = '
' + + '' + + grid + + '' + + '' + + '' + + 'Scoped' + + '' + + 'Unscoped' + + '
'; + } + + el.innerHTML = + '

🔭 Scope Statistics

' + + '
' + winHtml + '
' + + '
' + cardsHtml + '
' + + '' + + '' + + '' + tableBody + '
RegionMessages% of Scoped
' + + chartHtml; + + // Bind window selector + el.querySelectorAll('[data-win]').forEach(function(btn) { + btn.addEventListener('click', function() { + selectedWindow = btn.dataset.win; + if (typeof sessionStorage !== 'undefined') sessionStorage.setItem(window, selectedWindow); + load(selectedWindow); + }); + }); + } + + load(selectedWindow); +} +``` + +- [ ] **Step 4: Bump cache buster in `public/index.html`** + +Find the line that loads `analytics.js` with a `?v=__BUST__` suffix and increment the bust value to match the other files changed in this PR. Follow the project convention for how `__BUST__` is managed (check the existing values in the file and the Makefile/build script if any). + +- [ ] **Step 5: Manual smoke test** + +Start the server pointing at a DB that has had the ingestor migration run: +``` +cd cmd/server && go run . -db path/to/meshcore.db +``` +Open the browser at `http://localhost:8080/#/analytics?tab=scopes`. Verify: +- The "Scopes" tab appears and is clickable +- Summary cards render with counts (may be zeros on a fresh DB) +- Window selector switches between 1h / 24h / 7d +- No JS errors in the browser console + +- [ ] **Step 6: Commit** + +```bash +git add public/analytics.js public/index.html +git commit -m "feat(frontend): add Scopes tab to Analytics page (#899)" +``` + +--- + +## Self-Review Notes + +- **Spec coverage**: All items covered: Feature 1 (scoped/unscoped counts) ✅, Feature 2 (region matching via config) ✅, Feature 3 (excluded — firmware limitation) ✅ noted in spec +- **NULL semantics**: `scopeNameForDB` correctly encodes the three states (NULL / "" / "#name") +- **HMAC derivation**: `matchScope` mirrors firmware exactly — little-endian uint16, zero/FFFF adjustment, first 2 bytes of HMAC output +- **API spec doc**: Task 7 updates `docs/api-spec.md` as required by project convention +- **Cache buster**: Task 8 Step 4 bumps `analytics.js` bust value +- **Backfill goroutine**: Runs in background so ingestor startup is not blocked +- **Empty slices**: `GetScopeStats` always returns non-nil slices for `ByRegion` and `TimeSeries` to avoid `null` in JSON From 7284242801ad6640fd454b9f82681770c2bf3c21 Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 12:57:26 +0200 Subject: [PATCH 04/17] feat(ingestor/decoder): expose PayloadRaw bytes on DecodedPacket (#899) --- cmd/ingestor/decoder.go | 2 ++ cmd/ingestor/decoder_test.go | 22 ++++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/cmd/ingestor/decoder.go b/cmd/ingestor/decoder.go index f28454e3..d39ed4c6 100644 --- a/cmd/ingestor/decoder.go +++ b/cmd/ingestor/decoder.go @@ -146,6 +146,7 @@ type DecodedPacket struct { Payload Payload `json:"payload"` Raw string `json:"raw"` Anomaly string `json:"anomaly,omitempty"` + PayloadRaw []byte `json:"-"` } func decodeHeader(b byte) Header { @@ -639,6 +640,7 @@ func DecodePacket(hexString string, channelKeys map[string]string, validateSigna Payload: payload, Raw: strings.ToUpper(hexString), Anomaly: anomaly, + PayloadRaw: payloadBuf, }, nil } diff --git a/cmd/ingestor/decoder_test.go b/cmd/ingestor/decoder_test.go index f49b25dc..71d8631d 100644 --- a/cmd/ingestor/decoder_test.go +++ b/cmd/ingestor/decoder_test.go @@ -447,6 +447,28 @@ func TestValidateAdvert(t *testing.T) { } } +func TestDecodePacketPayloadRaw(t *testing.T) { + // Build a minimal TRANSPORT_FLOOD packet (route_type=0): + // header(1) + transport_codes(4) + path_len(1) + payload(N) + // Header 0x00 = route_type=TRANSPORT_FLOOD, payload_type=0, version=0 + // Code1=9A52, Code2=0000, path_len=0x00 (0 hops, hash_size=1) + payload := []byte("hello") + raw := []byte{0x00, 0x9A, 0x52, 0x00, 0x00, 0x00} + raw = append(raw, payload...) + hexStr := strings.ToUpper(hex.EncodeToString(raw)) + + decoded, err := DecodePacket(hexStr, nil, false) + if err != nil { + t.Fatalf("DecodePacket: %v", err) + } + if decoded.TransportCodes == nil { + t.Fatal("expected TransportCodes, got nil") + } + if string(decoded.PayloadRaw) != string(payload) { + t.Errorf("PayloadRaw = %v, want %v", decoded.PayloadRaw, payload) + } +} + func TestDecodeGrpTxtShort(t *testing.T) { p := decodeGrpTxt([]byte{0x01, 0x02}, nil) if p.Error != "too short" { From cd2c81b5979438026c2423152d70e866f7325cf2 Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 15:11:18 +0200 Subject: [PATCH 05/17] feat(ingestor): add hashRegions config + loadRegionKeys + matchScope (#899) --- cmd/ingestor/config.go | 1 + cmd/ingestor/main.go | 46 ++++++++++++++++++++++++++ cmd/ingestor/main_test.go | 69 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+) diff --git a/cmd/ingestor/config.go b/cmd/ingestor/config.go index 70c18fbe..85e99370 100644 --- a/cmd/ingestor/config.go +++ b/cmd/ingestor/config.go @@ -37,6 +37,7 @@ type Config struct { ChannelKeysPath string `json:"channelKeysPath,omitempty"` ChannelKeys map[string]string `json:"channelKeys,omitempty"` HashChannels []string `json:"hashChannels,omitempty"` + HashRegions []string `json:"hashRegions,omitempty"` Retention *RetentionConfig `json:"retention,omitempty"` Metrics *MetricsConfig `json:"metrics,omitempty"` GeoFilter *GeoFilterConfig `json:"geo_filter,omitempty"` diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 481c7cc1..a7c6adf0 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -1,6 +1,7 @@ package main import ( + "crypto/hmac" "crypto/sha256" "crypto/tls" "encoding/hex" @@ -810,6 +811,51 @@ func loadChannelKeys(cfg *Config, configPath string) map[string]string { return keys } +func loadRegionKeys(cfg *Config) map[string][]byte { + keys := make(map[string][]byte) + for _, raw := range cfg.HashRegions { + name := strings.TrimSpace(raw) + if name == "" { + continue + } + if !strings.HasPrefix(name, "#") { + name = "#" + name + } + if _, exists := keys[name]; exists { + continue + } + h := sha256.Sum256([]byte(name)) + keys[name] = h[:16] + } + if len(keys) > 0 { + log.Printf("[regions] %d region key(s) loaded", len(keys)) + } + return keys +} + +func matchScope(regionKeys map[string][]byte, payloadType byte, payloadRaw []byte, code1 string) string { + if code1 == "0000" || len(regionKeys) == 0 || len(payloadRaw) == 0 { + return "" + } + for name, key := range regionKeys { + mac := hmac.New(sha256.New, key) + mac.Write([]byte{payloadType}) + mac.Write(payloadRaw) + hmacBytes := mac.Sum(nil) + code := uint16(hmacBytes[0]) | uint16(hmacBytes[1])<<8 + if code == 0 { + code = 1 + } else if code == 0xFFFF { + code = 0xFFFE + } + codeBytes := [2]byte{byte(code & 0xFF), byte(code >> 8)} + if strings.ToUpper(hex.EncodeToString(codeBytes[:])) == code1 { + return name + } + } + return "" +} + // Version info (set via ldflags) var version = "dev" diff --git a/cmd/ingestor/main_test.go b/cmd/ingestor/main_test.go index 6a10bcb9..73680056 100644 --- a/cmd/ingestor/main_test.go +++ b/cmd/ingestor/main_test.go @@ -1,10 +1,15 @@ package main import ( + "bytes" + "crypto/hmac" + "crypto/sha256" + "encoding/hex" "encoding/json" "math" "os" "path/filepath" + "strings" "testing" "time" ) @@ -780,3 +785,67 @@ func TestIATAFilterDoesNotDropStatusMessages(t *testing.T) { t.Error("packet from out-of-region BFL should still be filtered by IATA") } } + +func TestLoadRegionKeys(t *testing.T) { + cfg := &Config{HashRegions: []string{"#belgium", "eu", " #Test ", "", "#belgium"}} + keys := loadRegionKeys(cfg) + + // Deduplication + normalization + if len(keys) != 3 { + t.Fatalf("len(keys) = %d, want 3", len(keys)) + } + // "#belgium" key = SHA256("#belgium")[:16] + h := sha256.Sum256([]byte("#belgium")) + want := h[:16] + if got := keys["#belgium"]; !bytes.Equal(got, want) { + t.Errorf("#belgium key mismatch: got %x, want %x", got, want) + } + // "eu" should be normalized to "#eu" + if _, ok := keys["#eu"]; !ok { + t.Error("expected #eu key") + } + // " #Test " should be normalized to "#Test" + if _, ok := keys["#Test"]; !ok { + t.Error("expected #Test key") + } +} + +func TestMatchScope(t *testing.T) { + // Build a known Code1 for region "#test" and payload type 5, payload "hello" + name := "#test" + h := sha256.Sum256([]byte(name)) + key := h[:16] + + payloadType := byte(0x05) + payloadRaw := []byte("hello") + + mac := hmac.New(sha256.New, key) + mac.Write([]byte{payloadType}) + mac.Write(payloadRaw) + hmacBytes := mac.Sum(nil) + code := uint16(hmacBytes[0]) | uint16(hmacBytes[1])<<8 + if code == 0 { + code = 1 + } else if code == 0xFFFF { + code = 0xFFFE + } + code1Bytes := [2]byte{byte(code & 0xFF), byte(code >> 8)} + code1 := strings.ToUpper(hex.EncodeToString(code1Bytes[:])) + + regionKeys := map[string][]byte{name: key} + + got := matchScope(regionKeys, payloadType, payloadRaw, code1) + if got != name { + t.Errorf("matchScope = %q, want %q", got, name) + } + + // Unscoped (Code1 = 0000) → empty + if got := matchScope(regionKeys, payloadType, payloadRaw, "0000"); got != "" { + t.Errorf("unscoped: matchScope = %q, want empty", got) + } + + // Scoped but no match → empty string sentinel + if got := matchScope(regionKeys, payloadType, payloadRaw, "BEEF"); got != "" { + t.Errorf("no match: matchScope = %q, want empty", got) + } +} From 5a28a772aac59a1f0e03f8e9e881b5cc4117858a Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 15:34:15 +0200 Subject: [PATCH 06/17] feat(ingestor/db): add scope_name migration and BackfillScopeNames (#899) Co-Authored-By: Claude Sonnet 4.6 --- cmd/ingestor/db.go | 57 ++++++++++++++++++++++++++++ cmd/ingestor/db_test.go | 83 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+) diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index bada26c8..f18223f6 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -418,6 +418,16 @@ func applySchema(db *sql.DB) error { log.Println("[migration] observations.raw_hex column added") } + // Migration: add scope_name column to transmissions (#899) + row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'scope_name_v1'") + if row.Scan(&migDone) != nil { + log.Println("[migration] Adding scope_name column to transmissions...") + db.Exec(`ALTER TABLE transmissions ADD COLUMN scope_name TEXT DEFAULT NULL`) + db.Exec(`CREATE INDEX IF NOT EXISTS idx_tx_scope_name ON transmissions(scope_name) WHERE scope_name IS NOT NULL`) + db.Exec(`INSERT INTO _migrations (name) VALUES ('scope_name_v1')`) + log.Println("[migration] scope_name column added") + } + return nil } @@ -904,6 +914,53 @@ func (s *Store) PruneDroppedPackets(retentionDays int) (int64, error) { return n, nil } +// BackfillScopeNames sets scope_name on existing transport-route transmissions that +// lack it, using the provided region HMAC keys. Safe to re-run — only processes +// rows where scope_name IS NULL and route_type IN (0, 3). +func (s *Store) BackfillScopeNames(regionKeys map[string][]byte) { + if len(regionKeys) == 0 { + return + } + rows, err := s.db.Query(` + SELECT id, raw_hex FROM transmissions + WHERE route_type IN (0, 3) AND scope_name IS NULL AND raw_hex IS NOT NULL + `) + if err != nil { + log.Printf("[backfill] scope_name query: %v", err) + return + } + defer rows.Close() + + type row struct { + id int64 + rawHex string + } + var pending []row + for rows.Next() { + var r row + if rows.Scan(&r.id, &r.rawHex) == nil && r.rawHex != "" { + pending = append(pending, r) + } + } + + updated := 0 + for _, r := range pending { + decoded, err := DecodePacket(r.rawHex, nil, false) + if err != nil || decoded.TransportCodes == nil { + continue + } + if decoded.TransportCodes.Code1 == "0000" { + continue + } + scopeName := matchScope(regionKeys, byte(decoded.Header.PayloadType), decoded.PayloadRaw, decoded.TransportCodes.Code1) + s.db.Exec(`UPDATE transmissions SET scope_name = ? WHERE id = ?`, scopeName, r.id) + updated++ + } + if updated > 0 { + log.Printf("[backfill] scope_name set for %d/%d transport-route rows", updated, len(pending)) + } +} + // PacketData holds the data needed to insert a packet into the DB. type PacketData struct { RawHex string diff --git a/cmd/ingestor/db_test.go b/cmd/ingestor/db_test.go index d51903f9..7eee07d1 100644 --- a/cmd/ingestor/db_test.go +++ b/cmd/ingestor/db_test.go @@ -1,7 +1,10 @@ package main import ( + "crypto/hmac" + "crypto/sha256" "database/sql" + "encoding/hex" "encoding/json" "fmt" "os" @@ -2123,3 +2126,83 @@ func TestBuildPacketData_NonTracePathJSON(t *testing.T) { t.Errorf("path_json = %s, want %s", pd.PathJSON, expectedPathJSON) } } + +func TestScopeNameMigration(t *testing.T) { + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + store, err := OpenStore(dbPath) + if err != nil { + t.Fatalf("OpenStore: %v", err) + } + defer store.Close() + + // Verify scope_name column exists + rows, err := store.db.Query("PRAGMA table_info(transmissions)") + if err != nil { + t.Fatalf("PRAGMA: %v", err) + } + defer rows.Close() + found := false + for rows.Next() { + var cid int + var colName, colType string + var notNull, pk int + var dflt interface{} + if err := rows.Scan(&cid, &colName, &colType, ¬Null, &dflt, &pk); err == nil { + if colName == "scope_name" { + found = true + } + } + } + if !found { + t.Error("scope_name column not found in transmissions") + } +} + +func TestBackfillScopeNames(t *testing.T) { + dir := t.TempDir() + store, err := OpenStore(filepath.Join(dir, "test.db")) + if err != nil { + t.Fatalf("OpenStore: %v", err) + } + defer store.Close() + + // Insert a transport-route packet with known Code1 + regionName := "#test" + h := sha256.Sum256([]byte(regionName)) + key := h[:16] + payloadType := byte(0x05) + payloadRaw := []byte("hello") + + mac := hmac.New(sha256.New, key) + mac.Write([]byte{payloadType}) + mac.Write(payloadRaw) + hmacBytes := mac.Sum(nil) + code := uint16(hmacBytes[0]) | uint16(hmacBytes[1])<<8 + if code == 0 { + code = 1 + } else if code == 0xFFFF { + code = 0xFFFE + } + codeBytes := [2]byte{byte(code & 0xFF), byte(code >> 8)} + + // Build raw packet bytes: header(1) + Code1(2) + Code2(2) + path_len(1) + payload + header := byte(0x00) | (payloadType << 2) // TRANSPORT_FLOOD + payload_type in bits 2-5 + raw := []byte{header} + raw = append(raw, codeBytes[:]...) + raw = append(raw, 0x00, 0x00) // Code2 = 0000 + raw = append(raw, 0x00) // path_len = 0 hops + raw = append(raw, payloadRaw...) + rawHex := strings.ToUpper(hex.EncodeToString(raw)) + + store.db.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json) + VALUES (?, 'testhash1', datetime('now'), 0, 5, 0, '{}')`, rawHex) + + store.BackfillScopeNames(map[string][]byte{regionName: key}) + + var scopeName *string + store.db.QueryRow(`SELECT scope_name FROM transmissions WHERE hash = 'testhash1'`).Scan(&scopeName) + if scopeName == nil || *scopeName != regionName { + t.Errorf("scope_name = %v, want %q", scopeName, regionName) + } +} From 124d3900a80e0a2b9127e574f6684209172ab012 Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 15:39:45 +0200 Subject: [PATCH 07/17] fix(ingestor/db): check rows.Err() after BackfillScopeNames iteration (#899) --- cmd/ingestor/db.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index f18223f6..04c616aa 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -942,6 +942,10 @@ func (s *Store) BackfillScopeNames(regionKeys map[string][]byte) { pending = append(pending, r) } } + if err := rows.Err(); err != nil { + log.Printf("[backfill] scope_name iteration error: %v", err) + return + } updated := 0 for _, r := range pending { From 8a9f4a29e8757cd5af4e8df0b79d271b9a485131 Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 15:57:39 +0200 Subject: [PATCH 08/17] feat(ingestor): wire scope matching into ingest pipeline (#899) Co-Authored-By: Claude Sonnet 4.6 --- cmd/ingestor/coverage_boost_test.go | 38 +++++----- cmd/ingestor/db.go | 53 ++++++++----- cmd/ingestor/db_test.go | 16 ++-- cmd/ingestor/main.go | 11 ++- cmd/ingestor/main_test.go | 95 ++++++++++++++++++------ cmd/ingestor/sig_validate_ingest_test.go | 12 +-- 6 files changed, 146 insertions(+), 79 deletions(-) diff --git a/cmd/ingestor/coverage_boost_test.go b/cmd/ingestor/coverage_boost_test.go index 90f82b48..af5055b4 100644 --- a/cmd/ingestor/coverage_boost_test.go +++ b/cmd/ingestor/coverage_boost_test.go @@ -158,7 +158,7 @@ func TestHandleMessageChannelMessage(t *testing.T) { payload := []byte(`{"text":"Alice: Hello everyone","channel_idx":3,"SNR":5.0,"RSSI":-95,"score":10,"direction":"rx","sender_timestamp":1700000000}`) msg := &mockMessage{topic: "meshcore/message/channel/2", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -218,7 +218,7 @@ func TestHandleMessageChannelMessageEmptyText(t *testing.T) { store, source := newTestContext(t) msg := &mockMessage{topic: "meshcore/message/channel/1", payload: []byte(`{"text":""}`)} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -233,7 +233,7 @@ func TestHandleMessageChannelNoSender(t *testing.T) { store, source := newTestContext(t) msg := &mockMessage{topic: "meshcore/message/channel/1", payload: []byte(`{"text":"no sender here"}`)} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&count); err != nil { @@ -250,7 +250,7 @@ func TestHandleMessageDirectMessage(t *testing.T) { payload := []byte(`{"text":"Bob: Hey there","sender_timestamp":1700000000,"SNR":3.0,"rssi":-100,"Score":8,"Direction":"tx"}`) msg := &mockMessage{topic: "meshcore/message/direct/abc123", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -294,7 +294,7 @@ func TestHandleMessageDirectMessageEmptyText(t *testing.T) { store, source := newTestContext(t) msg := &mockMessage{topic: "meshcore/message/direct/abc", payload: []byte(`{"text":""}`)} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -309,7 +309,7 @@ func TestHandleMessageDirectNoSender(t *testing.T) { store, source := newTestContext(t) msg := &mockMessage{topic: "meshcore/message/direct/xyz", payload: []byte(`{"text":"message with no colon"}`)} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -328,7 +328,7 @@ func TestHandleMessageUppercaseScoreDirection(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `","Score":9.0,"Direction":"tx"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var score *float64 var direction *string @@ -349,7 +349,7 @@ func TestHandleMessageChannelLowercaseFields(t *testing.T) { payload := []byte(`{"text":"Test: msg","snr":3.0,"rssi":-90,"Score":5,"Direction":"rx"}`) msg := &mockMessage{topic: "meshcore/message/channel/0", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -365,7 +365,7 @@ func TestHandleMessageDirectLowercaseFields(t *testing.T) { payload := []byte(`{"text":"Test: msg","snr":2.0,"rssi":-85,"score":7,"direction":"tx"}`) msg := &mockMessage{topic: "meshcore/message/direct/xyz", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -388,7 +388,7 @@ func TestHandleMessageAdvertWithTelemetry(t *testing.T) { payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) // Should have created transmission, node, and observer var txCount, nodeCount, obsCount int @@ -428,7 +428,7 @@ func TestHandleMessageAdvertGeoFiltered(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, &Config{GeoFilter: gf}) + handleMessage(store, "test", source, msg, nil, nil, &Config{GeoFilter: gf}) // Geo-filtered adverts should not create nodes var nodeCount int @@ -665,7 +665,7 @@ func TestHandleMessageCorruptedAdvertNoNode(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&count); err != nil { @@ -687,7 +687,7 @@ func TestHandleMessageNonAdvertPacket(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -864,7 +864,7 @@ func TestHandleMessageChannelLongSender(t *testing.T) { longText := "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA: msg" payload := []byte(`{"text":"` + longText + `"}`) msg := &mockMessage{topic: "meshcore/message/channel/1", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&count); err != nil { @@ -883,7 +883,7 @@ func TestHandleMessageDirectLongSender(t *testing.T) { longText := "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB: msg" payload := []byte(`{"text":"` + longText + `"}`) msg := &mockMessage{topic: "meshcore/message/direct/abc", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -900,7 +900,7 @@ func TestHandleMessageDirectUppercaseScoreDirection(t *testing.T) { payload := []byte(`{"text":"X: hi","Score":6,"Direction":"rx"}`) msg := &mockMessage{topic: "meshcore/message/direct/d1", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -930,7 +930,7 @@ func TestHandleMessageChannelUppercaseScoreDirection(t *testing.T) { payload := []byte(`{"text":"Y: hi","Score":4,"Direction":"tx"}`) msg := &mockMessage{topic: "meshcore/message/channel/5", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -961,7 +961,7 @@ func TestHandleMessageRawLowercaseScore(t *testing.T) { rawHex := "0A00D69FD7A5A7475DB07337749AE61FA53A4788E976" payload := []byte(`{"raw":"` + rawHex + `","score":3.5}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var score *float64 if err := store.db.QueryRow("SELECT score FROM observations LIMIT 1").Scan(&score); err != nil { @@ -980,7 +980,7 @@ func TestHandleMessageStatusNoOrigin(t *testing.T) { topic: "meshcore/LAX/obs5/status", payload: []byte(`{"model":"L1"}`), } - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM observers WHERE id = 'obs5'").Scan(&count); err != nil { diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index 04c616aa..86243cb9 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -440,8 +440,8 @@ func (s *Store) prepareStatements() error { } s.stmtInsertTransmission, err = s.db.Prepare(` - INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, channel_hash) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) + INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, channel_hash, scope_name) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) `) if err != nil { return err @@ -570,6 +570,7 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) { data.RawHex, hash, now, data.RouteType, data.PayloadType, data.PayloadVersion, data.DecodedJSON, nilIfEmpty(data.ChannelHash), + scopeNameForDB(data), ) if err != nil { s.Stats.WriteErrors.Add(1) @@ -967,21 +968,23 @@ func (s *Store) BackfillScopeNames(regionKeys map[string][]byte) { // PacketData holds the data needed to insert a packet into the DB. type PacketData struct { - RawHex string - Timestamp string - ObserverID string - ObserverName string - SNR *float64 - RSSI *float64 - Score *float64 - Direction *string - Hash string - RouteType int - PayloadType int - PayloadVersion int - PathJSON string - DecodedJSON string - ChannelHash string // grouping key for channel queries (#762) + RawHex string + Timestamp string + ObserverID string + ObserverName string + SNR *float64 + RSSI *float64 + Score *float64 + Direction *string + Hash string + RouteType int + PayloadType int + PayloadVersion int + PathJSON string + DecodedJSON string + ChannelHash string // grouping key for channel queries (#762) + ScopeName string // matched region name, or "" for unknown-scoped + IsTransportScoped bool // true when route_type IN (0,3) AND Code1 ≠ "0000" } // nilIfEmpty returns nil for empty strings (for nullable DB columns). @@ -992,6 +995,15 @@ func nilIfEmpty(s string) interface{} { return s } +// scopeNameForDB encodes PacketData scope semantics for DB storage: +// non-transport-scoped → NULL; transport-scoped → ScopeName (may be "" for unknown). +func scopeNameForDB(data *PacketData) interface{} { + if !data.IsTransportScoped { + return nil + } + return data.ScopeName // "" or "#regionname" +} + // MQTTPacketMessage is the JSON payload from an MQTT raw packet message. type MQTTPacketMessage struct { Raw string `json:"raw"` @@ -1006,7 +1018,7 @@ type MQTTPacketMessage struct { // path_json is derived directly from raw_hex header bytes (not decoded.Path.Hops) // to guarantee the stored path always matches the raw bytes. This matters for // TRACE packets where decoded.Path.Hops is overwritten with payload hops (#886). -func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, region string) *PacketData { +func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, region string, regionKeys map[string][]byte) *PacketData { now := time.Now().UTC().Format(time.RFC3339) pathJSON := "[]" // For TRACE packets, path_json must be the payload-decoded route hops @@ -1048,5 +1060,10 @@ func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, } } + if decoded.TransportCodes != nil && decoded.TransportCodes.Code1 != "0000" { + pd.IsTransportScoped = true + pd.ScopeName = matchScope(regionKeys, byte(decoded.Header.PayloadType), decoded.PayloadRaw, decoded.TransportCodes.Code1) + } + return pd } diff --git a/cmd/ingestor/db_test.go b/cmd/ingestor/db_test.go index 7eee07d1..9ff10ebe 100644 --- a/cmd/ingestor/db_test.go +++ b/cmd/ingestor/db_test.go @@ -590,7 +590,7 @@ func TestEndToEndIngest(t *testing.T) { msg := &MQTTPacketMessage{ Raw: rawHex, } - pktData := BuildPacketData(msg, decoded, "obs1", "SJC") + pktData := BuildPacketData(msg, decoded, "obs1", "SJC", nil) if _, err := s.InsertTransmission(pktData); err != nil { t.Fatal(err) } @@ -784,7 +784,7 @@ func TestBuildPacketData(t *testing.T) { Origin: "test-observer", } - pkt := BuildPacketData(msg, decoded, "obs123", "SJC") + pkt := BuildPacketData(msg, decoded, "obs123", "SJC", nil) if pkt.RawHex != rawHex { t.Errorf("rawHex mismatch") @@ -829,7 +829,7 @@ func TestBuildPacketDataWithHops(t *testing.T) { t.Fatal(err) } msg := &MQTTPacketMessage{Raw: raw} - pkt := BuildPacketData(msg, decoded, "", "") + pkt := BuildPacketData(msg, decoded, "", "", nil) if pkt.PathJSON == "[]" { t.Error("pathJSON should contain hops") @@ -842,7 +842,7 @@ func TestBuildPacketDataWithHops(t *testing.T) { func TestBuildPacketDataNilSNRRSSI(t *testing.T) { decoded, _ := DecodePacket("0A00"+strings.Repeat("00", 10), nil, false) msg := &MQTTPacketMessage{Raw: "0A00" + strings.Repeat("00", 10)} - pkt := BuildPacketData(msg, decoded, "", "") + pkt := BuildPacketData(msg, decoded, "", "", nil) if pkt.SNR != nil { t.Errorf("SNR should be nil") @@ -1643,7 +1643,7 @@ func TestBuildPacketDataScoreAndDirection(t *testing.T) { Direction: &dir, } - pkt := BuildPacketData(msg, decoded, "obs1", "SJC") + pkt := BuildPacketData(msg, decoded, "obs1", "SJC", nil) if pkt.Score == nil || *pkt.Score != 42.0 { t.Errorf("Score=%v, want 42.0", pkt.Score) } @@ -1655,7 +1655,7 @@ func TestBuildPacketDataScoreAndDirection(t *testing.T) { func TestBuildPacketDataNilScoreDirection(t *testing.T) { decoded, _ := DecodePacket("0A00"+strings.Repeat("00", 10), nil, false) msg := &MQTTPacketMessage{Raw: "0A00" + strings.Repeat("00", 10)} - pkt := BuildPacketData(msg, decoded, "", "") + pkt := BuildPacketData(msg, decoded, "", "", nil) if pkt.Score != nil { t.Errorf("Score should be nil, got %v", *pkt.Score) @@ -2087,7 +2087,7 @@ func TestBuildPacketData_TraceUsesPayloadHops(t *testing.T) { } msg := &MQTTPacketMessage{Raw: rawHex} - pd := BuildPacketData(msg, decoded, "test-obs", "TST") + pd := BuildPacketData(msg, decoded, "test-obs", "TST", nil) // For TRACE: path_json MUST be the payload-decoded route hops, NOT the SNR bytes expectedPathJSON := `["67","33","D6","33","67"]` @@ -2119,7 +2119,7 @@ func TestBuildPacketData_NonTracePathJSON(t *testing.T) { } msg := &MQTTPacketMessage{Raw: rawHex} - pd := BuildPacketData(msg, decoded, "obs1", "TST") + pd := BuildPacketData(msg, decoded, "obs1", "TST", nil) expectedPathJSON := `["AA","BB"]` if pd.PathJSON != expectedPathJSON { diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index a7c6adf0..44cd7853 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -113,6 +113,9 @@ func main() { log.Printf("No channel keys loaded — GRP_TXT packets will not be decrypted") } + regionKeys := loadRegionKeys(cfg) + go store.BackfillScopeNames(regionKeys) + // Connect to each MQTT source var clients []mqtt.Client for _, source := range sources { @@ -163,7 +166,7 @@ func main() { // Capture source for closure src := source opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) { - handleMessage(store, tag, src, m, channelKeys, cfg) + handleMessage(store, tag, src, m, channelKeys, regionKeys, cfg) }) client := mqtt.NewClient(opts) @@ -198,7 +201,7 @@ func main() { log.Println("Done.") } -func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, channelKeys map[string]string, cfg *Config) { +func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, channelKeys map[string]string, regionKeys map[string][]byte, cfg *Config) { defer func() { if r := recover(); r != nil { log.Printf("MQTT [%s] panic in handler: %v", tag, r) @@ -352,7 +355,7 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, if !NodePassesGeoFilter(decoded.Payload.Lat, decoded.Payload.Lon, cfg.GeoFilter) { return } - pktData := BuildPacketData(mqttMsg, decoded, observerID, region) + pktData := BuildPacketData(mqttMsg, decoded, observerID, region, regionKeys) isNew, err := store.InsertTransmission(pktData) if err != nil { log.Printf("MQTT [%s] db insert error: %v", tag, err) @@ -375,7 +378,7 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, } else { // Non-ADVERT packets: store normally (routing/channel messages from // in-area observers are relevant regardless of relay hop origin). - pktData := BuildPacketData(mqttMsg, decoded, observerID, region) + pktData := BuildPacketData(mqttMsg, decoded, observerID, region, regionKeys) if _, err := store.InsertTransmission(pktData); err != nil { log.Printf("MQTT [%s] db insert error: %v", tag, err) } diff --git a/cmd/ingestor/main_test.go b/cmd/ingestor/main_test.go index 73680056..a968b870 100644 --- a/cmd/ingestor/main_test.go +++ b/cmd/ingestor/main_test.go @@ -135,7 +135,7 @@ func TestHandleMessageRawPacket(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `","SNR":5.5,"RSSI":-100.0,"origin":"myobs"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -152,7 +152,7 @@ func TestHandleMessageRawPacketAdvert(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) // Should create a node from the ADVERT var count int @@ -174,7 +174,7 @@ func TestHandleMessageInvalidJSON(t *testing.T) { msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: []byte(`not json`)} // Should not panic - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -191,7 +191,7 @@ func TestHandleMessageStatusTopic(t *testing.T) { payload: []byte(`{"origin":"MyObserver"}`), } - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var name, iata string err := store.db.QueryRow("SELECT name, iata FROM observers WHERE id = 'obs1'").Scan(&name, &iata) @@ -212,11 +212,11 @@ func TestHandleMessageSkipStatusTopics(t *testing.T) { // meshcore/status should be skipped msg1 := &mockMessage{topic: "meshcore/status", payload: []byte(`{"raw":"0A00"}`)} - handleMessage(store, "test", source, msg1, nil, &Config{}) + handleMessage(store, "test", source, msg1, nil, nil, &Config{}) // meshcore/events/connection should be skipped msg2 := &mockMessage{topic: "meshcore/events/connection", payload: []byte(`{"raw":"0A00"}`)} - handleMessage(store, "test", source, msg2, nil, &Config{}) + handleMessage(store, "test", source, msg2, nil, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -235,7 +235,7 @@ func TestHandleMessageIATAFilter(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -248,7 +248,7 @@ func TestHandleMessageIATAFilter(t *testing.T) { topic: "meshcore/LAX/obs2/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg2, nil, &Config{}) + handleMessage(store, "test", source, msg2, nil, nil, &Config{}) store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) if count != 1 { @@ -266,7 +266,7 @@ func TestHandleMessageIATAFilterNoRegion(t *testing.T) { topic: "meshcore", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) // No region part → filter doesn't apply, message goes through // Actually the code checks len(parts) > 1 for IATA filter @@ -282,7 +282,7 @@ func TestHandleMessageNoRawHex(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"type":"companion","data":"something"}`), } - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -300,7 +300,7 @@ func TestHandleMessageBadRawHex(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"ZZZZ"}`), } - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -317,7 +317,7 @@ func TestHandleMessageWithSNRRSSIAsNumbers(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `","SNR":7.2,"RSSI":-95}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var snr, rssi *float64 store.db.QueryRow("SELECT snr, rssi FROM observations LIMIT 1").Scan(&snr, &rssi) @@ -336,7 +336,7 @@ func TestHandleMessageMinimalTopic(t *testing.T) { topic: "meshcore/SJC", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -357,7 +357,7 @@ func TestHandleMessageCorruptedAdvert(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) // Transmission should be inserted (even if advert is invalid) var count int @@ -383,7 +383,7 @@ func TestHandleMessageNoObserverID(t *testing.T) { topic: "packets", payload: []byte(`{"raw":"` + rawHex + `","origin":"obs1"}`), } - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -405,7 +405,7 @@ func TestHandleMessageSNRNotFloat(t *testing.T) { // SNR as a string value — should not parse as float payload := []byte(`{"raw":"` + rawHex + `","SNR":"bad","RSSI":"bad"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -421,7 +421,7 @@ func TestHandleMessageOriginExtraction(t *testing.T) { rawHex := "0A00D69FD7A5A7475DB07337749AE61FA53A4788E976" payload := []byte(`{"raw":"` + rawHex + `","origin":"MyOrigin"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) // Verify origin was extracted to observer name var name string @@ -444,7 +444,7 @@ func TestHandleMessagePanicRecovery(t *testing.T) { } // Should not panic — the defer/recover should catch it - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) } func TestHandleMessageStatusOriginFallback(t *testing.T) { @@ -456,7 +456,7 @@ func TestHandleMessageStatusOriginFallback(t *testing.T) { topic: "meshcore/SJC/obs1/status", payload: []byte(`{"type":"status"}`), } - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var name string err := store.db.QueryRow("SELECT name FROM observers WHERE id = 'obs1'").Scan(&name) @@ -645,7 +645,7 @@ func TestHandleMessageWithLowercaseSNRRSSI(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `","snr":5.5,"rssi":-102}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var snr, rssi *float64 store.db.QueryRow("SELECT snr, rssi FROM observations LIMIT 1").Scan(&snr, &rssi) @@ -666,7 +666,7 @@ func TestHandleMessageSNRRSSIUppercaseWins(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `","SNR":7.2,"snr":1.0,"RSSI":-95,"rssi":-50}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var snr, rssi *float64 store.db.QueryRow("SELECT snr, rssi FROM observations LIMIT 1").Scan(&snr, &rssi) @@ -686,7 +686,7 @@ func TestHandleMessageNoSNRRSSI(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var snr, rssi *float64 store.db.QueryRow("SELECT snr, rssi FROM observations LIMIT 1").Scan(&snr, &rssi) @@ -757,7 +757,7 @@ func TestIATAFilterDoesNotDropStatusMessages(t *testing.T) { topic: "meshcore/BFL/bfl-obs1/status", payload: []byte(`{"origin":"BFLObserver","stats":{"noise_floor":-105.0}}`), } - handleMessage(store, "test", source, msg, nil, &Config{}) + handleMessage(store, "test", source, msg, nil, nil, &Config{}) var name string var noiseFloor *float64 @@ -778,7 +778,7 @@ func TestIATAFilterDoesNotDropStatusMessages(t *testing.T) { topic: "meshcore/BFL/bfl-obs1/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, pktMsg, nil, &Config{}) + handleMessage(store, "test", source, pktMsg, nil, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) if count != 0 { @@ -849,3 +849,50 @@ func TestMatchScope(t *testing.T) { t.Errorf("no match: matchScope = %q, want empty", got) } } + +func TestBuildPacketDataScopeMatching(t *testing.T) { + // Build region key for "#test" + regionName := "#test" + h := sha256.Sum256([]byte(regionName)) + key := h[:16] + + payloadType := byte(0x05) + payloadRaw := []byte("hello") + + mac := hmac.New(sha256.New, key) + mac.Write([]byte{payloadType}) + mac.Write(payloadRaw) + hmacBytes := mac.Sum(nil) + code := uint16(hmacBytes[0]) | uint16(hmacBytes[1])<<8 + if code == 0 { + code = 1 + } else if code == 0xFFFF { + code = 0xFFFE + } + codeBytes := [2]byte{byte(code & 0xFF), byte(code >> 8)} + + // TRANSPORT_FLOOD header with payloadType in bits 2-5 + header := byte(0x00) | (payloadType << 2) + raw := []byte{header} + raw = append(raw, codeBytes[:]...) + raw = append(raw, 0x00, 0x00) // Code2 + raw = append(raw, 0x00) // path_len + raw = append(raw, payloadRaw...) + rawHex := strings.ToUpper(hex.EncodeToString(raw)) + + decoded, err := DecodePacket(rawHex, nil, false) + if err != nil { + t.Fatalf("DecodePacket: %v", err) + } + + msg := &MQTTPacketMessage{Raw: rawHex} + regionKeys := map[string][]byte{regionName: key} + + pktData := BuildPacketData(msg, decoded, "obs1", "region1", regionKeys) + if pktData.ScopeName != regionName { + t.Errorf("ScopeName = %q, want %q", pktData.ScopeName, regionName) + } + if !pktData.IsTransportScoped { + t.Error("IsTransportScoped should be true") + } +} diff --git a/cmd/ingestor/sig_validate_ingest_test.go b/cmd/ingestor/sig_validate_ingest_test.go index 8dafa181..ab09e1fb 100644 --- a/cmd/ingestor/sig_validate_ingest_test.go +++ b/cmd/ingestor/sig_validate_ingest_test.go @@ -61,7 +61,7 @@ func TestSigValidation_ValidAdvertStored(t *testing.T) { msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+rawHex+`","origin":"TestObs"}`) cfg := &Config{} - handleMessage(store, "test", source, msg, nil, cfg) + handleMessage(store, "test", source, msg, nil, nil, cfg) // Verify packet was stored var count int @@ -98,7 +98,7 @@ func TestSigValidation_TamperedSignatureDropped(t *testing.T) { msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+tamperedHex+`","origin":"TestObs"}`) cfg := &Config{} - handleMessage(store, "test", source, msg, nil, cfg) + handleMessage(store, "test", source, msg, nil, nil, cfg) // Verify packet was NOT stored in transmissions var txCount int @@ -157,7 +157,7 @@ func TestSigValidation_TruncatedAppdataDropped(t *testing.T) { msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+truncatedHex+`","origin":"TestObs"}`) cfg := &Config{} - handleMessage(store, "test", source, msg, nil, cfg) + handleMessage(store, "test", source, msg, nil, nil, cfg) var txCount int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount) @@ -192,7 +192,7 @@ func TestSigValidation_DisabledByConfig(t *testing.T) { falseVal := false cfg := &Config{ValidateSignatures: &falseVal} - handleMessage(store, "test", source, msg, nil, cfg) + handleMessage(store, "test", source, msg, nil, nil, cfg) // With validation disabled, tampered packet should be stored var txCount int @@ -225,7 +225,7 @@ func TestSigValidation_DropCounterIncrements(t *testing.T) { rawBytes[76] = '0' } msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+string(rawBytes)+`","origin":"Obs"}`) - handleMessage(store, "test", source, msg, nil, cfg) + handleMessage(store, "test", source, msg, nil, nil, cfg) } if store.Stats.SignatureDrops.Load() != 3 { @@ -258,7 +258,7 @@ func TestSigValidation_LogContainsFields(t *testing.T) { msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+string(rawBytes)+`","origin":"MyObserver"}`) cfg := &Config{} - handleMessage(store, "test", source, msg, nil, cfg) + handleMessage(store, "test", source, msg, nil, nil, cfg) var hash, reason, obsID, obsName, pubkey, nodeName string err = store.db.QueryRow("SELECT hash, reason, observer_id, observer_name, node_pubkey, node_name FROM dropped_packets LIMIT 1"). From 17c7c8e666c85415e190120f9a16d5effcaf53e7 Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 16:12:28 +0200 Subject: [PATCH 09/17] feat(server/db): add GetScopeStats and ScopeStatsResponse types (#899) Co-Authored-By: Claude Sonnet 4.6 --- cmd/server/db.go | 113 ++++++++++++++++++++++++++++++++++++++++++ cmd/server/db_test.go | 50 +++++++++++++++++++ cmd/server/types.go | 27 ++++++++++ 3 files changed, 190 insertions(+) diff --git a/cmd/server/db.go b/cmd/server/db.go index aeb09769..b3134c84 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -21,6 +21,7 @@ type DB struct { isV3 bool // v3 schema: observer_idx in observations (vs observer_id in v2) hasResolvedPath bool // observations table has resolved_path column hasObsRawHex bool // observations table has raw_hex column (#881) + hasScopeName bool // transmissions.scope_name column exists (#899) // Channel list cache (60s TTL) — avoids repeated GROUP BY scans (#762) channelsCacheMu sync.Mutex @@ -82,6 +83,24 @@ func (db *DB) detectSchema() { } } } + + txRows, err := db.conn.Query("PRAGMA table_info(transmissions)") + if err != nil { + return + } + defer txRows.Close() + for txRows.Next() { + var cid int + var colName string + var colType sql.NullString + var notNull, pk int + var dflt sql.NullString + if txRows.Scan(&cid, &colName, &colType, ¬Null, &dflt, &pk) == nil { + if colName == "scope_name" { + db.hasScopeName = true + } + } + } } // transmissionBaseSQL returns the SELECT columns and JOIN clause for transmission-centric queries. @@ -2344,3 +2363,97 @@ func (db *DB) GetSignatureDropCount() int64 { } return count } + +func (db *DB) GetScopeStats(window string) (*ScopeStatsResponse, error) { + if !db.hasScopeName { + return nil, fmt.Errorf("scope_name column not present — run ingestor to apply migrations") + } + + var since string + var bucketExpr string + switch window { + case "1h": + since = time.Now().Add(-1 * time.Hour).UTC().Format(time.RFC3339) + // 5-minute buckets + bucketExpr = `strftime('%Y-%m-%dT%H:', first_seen) || printf('%02d', (CAST(strftime('%M', first_seen) AS INTEGER) / 5) * 5) || ':00Z'` + case "7d": + since = time.Now().Add(-7 * 24 * time.Hour).UTC().Format(time.RFC3339) + // 6-hour buckets + bucketExpr = `strftime('%Y-%m-%dT', first_seen) || printf('%02d', (CAST(strftime('%H', first_seen) AS INTEGER) / 6) * 6) || ':00:00Z'` + default: // "24h" + window = "24h" + since = time.Now().Add(-24 * time.Hour).UTC().Format(time.RFC3339) + // 1-hour buckets + bucketExpr = `strftime('%Y-%m-%dT%H:00:00Z', first_seen)` + } + + resp := &ScopeStatsResponse{Window: window} + + // Summary counts + row := db.conn.QueryRow(` + SELECT + COUNT(*) AS transport_total, + COUNT(scope_name) AS scoped, + SUM(CASE WHEN scope_name IS NULL THEN 1 ELSE 0 END) AS unscoped, + SUM(CASE WHEN scope_name = '' THEN 1 ELSE 0 END) AS unknown_scope + FROM transmissions + WHERE route_type IN (0, 3) AND first_seen >= ? + `, since) + if err := row.Scan( + &resp.Summary.TransportTotal, + &resp.Summary.Scoped, + &resp.Summary.Unscoped, + &resp.Summary.UnknownScope, + ); err != nil { + return nil, fmt.Errorf("scope summary query: %w", err) + } + + // Per-region counts (named regions only) + rows, err := db.conn.Query(` + SELECT scope_name, COUNT(*) AS cnt + FROM transmissions + WHERE route_type IN (0, 3) AND scope_name IS NOT NULL AND scope_name != '' AND first_seen >= ? + GROUP BY scope_name + ORDER BY cnt DESC + `, since) + if err != nil { + return nil, fmt.Errorf("scope byRegion query: %w", err) + } + defer rows.Close() + for rows.Next() { + var rc ScopeRegionCount + if rows.Scan(&rc.Name, &rc.Count) == nil { + resp.ByRegion = append(resp.ByRegion, rc) + } + } + if resp.ByRegion == nil { + resp.ByRegion = []ScopeRegionCount{} + } + + // Time series + tsQuery := fmt.Sprintf(` + SELECT %s AS bucket, + COUNT(scope_name) AS scoped, + SUM(CASE WHEN scope_name IS NULL THEN 1 ELSE 0 END) AS unscoped + FROM transmissions + WHERE route_type IN (0, 3) AND first_seen >= ? + GROUP BY bucket + ORDER BY bucket + `, bucketExpr) + tsRows, err := db.conn.Query(tsQuery, since) + if err != nil { + return nil, fmt.Errorf("scope timeseries query: %w", err) + } + defer tsRows.Close() + for tsRows.Next() { + var pt ScopeTimePoint + if tsRows.Scan(&pt.T, &pt.Scoped, &pt.Unscoped) == nil { + resp.TimeSeries = append(resp.TimeSeries, pt) + } + } + if resp.TimeSeries == nil { + resp.TimeSeries = []ScopeTimePoint{} + } + + return resp, nil +} diff --git a/cmd/server/db_test.go b/cmd/server/db_test.go index 5067a029..1190c428 100644 --- a/cmd/server/db_test.go +++ b/cmd/server/db_test.go @@ -2033,3 +2033,53 @@ func TestPerObservationRawHexEnrich(t *testing.T) { } } } + +func TestGetScopeStats(t *testing.T) { + conn, err := sql.Open("sqlite", ":memory:") + if err != nil { + t.Fatalf("sql.Open: %v", err) + } + conn.SetMaxOpenConns(1) + db := &DB{conn: conn} + defer db.conn.Close() + + // Create minimal schema + db.conn.Exec(`CREATE TABLE IF NOT EXISTS transmissions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + raw_hex TEXT, hash TEXT, first_seen TEXT, route_type INTEGER, + payload_type INTEGER, payload_version INTEGER, decoded_json TEXT, + scope_name TEXT DEFAULT NULL + )`) + // Manually set hasScopeName since we bypassed the detector + db.hasScopeName = true + + now := time.Now().UTC().Format(time.RFC3339) + // Transport scoped, known region + db.conn.Exec(`INSERT INTO transmissions (hash, first_seen, route_type, scope_name) VALUES ('a', ?, 0, '#belgium')`, now) + // Transport scoped, unknown + db.conn.Exec(`INSERT INTO transmissions (hash, first_seen, route_type, scope_name) VALUES ('b', ?, 0, '')`, now) + // Transport unscoped (NULL) + db.conn.Exec(`INSERT INTO transmissions (hash, first_seen, route_type, scope_name) VALUES ('c', ?, 0, NULL)`, now) + // Non-transport (should not count) + db.conn.Exec(`INSERT INTO transmissions (hash, first_seen, route_type, scope_name) VALUES ('d', ?, 1, NULL)`, now) + + stats, err := db.GetScopeStats("24h") + if err != nil { + t.Fatalf("GetScopeStats: %v", err) + } + if stats.Summary.TransportTotal != 3 { + t.Errorf("TransportTotal = %d, want 3", stats.Summary.TransportTotal) + } + if stats.Summary.Scoped != 2 { + t.Errorf("Scoped = %d, want 2", stats.Summary.Scoped) + } + if stats.Summary.Unscoped != 1 { + t.Errorf("Unscoped = %d, want 1", stats.Summary.Unscoped) + } + if stats.Summary.UnknownScope != 1 { + t.Errorf("UnknownScope = %d, want 1", stats.Summary.UnknownScope) + } + if len(stats.ByRegion) != 1 || stats.ByRegion[0].Name != "#belgium" || stats.ByRegion[0].Count != 1 { + t.Errorf("ByRegion = %+v, want [{#belgium 1}]", stats.ByRegion) + } +} diff --git a/cmd/server/types.go b/cmd/server/types.go index 50505763..b6b933d7 100644 --- a/cmd/server/types.go +++ b/cmd/server/types.go @@ -90,6 +90,33 @@ type StatsResponse struct { GoSysMB float64 `json:"goSysMB"` // runtime.MemStats.Sys (total Go-managed) } +// ─── Scope Stats ─────────────────────────────────────────────────────────────── + +type ScopeStatsSummary struct { + TransportTotal int `json:"transportTotal"` + Scoped int `json:"scoped"` + Unscoped int `json:"unscoped"` + UnknownScope int `json:"unknownScope"` +} + +type ScopeRegionCount struct { + Name string `json:"name"` + Count int `json:"count"` +} + +type ScopeTimePoint struct { + T string `json:"t"` + Scoped int `json:"scoped"` + Unscoped int `json:"unscoped"` +} + +type ScopeStatsResponse struct { + Window string `json:"window"` + Summary ScopeStatsSummary `json:"summary"` + ByRegion []ScopeRegionCount `json:"byRegion"` + TimeSeries []ScopeTimePoint `json:"timeSeries"` +} + // ─── Health ──────────────────────────────────────────────────────────────────── type MemoryStats struct { From 2a66739a54ee6e8718283c27f198fb7737b4415d Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 16:51:16 +0200 Subject: [PATCH 10/17] feat(server): add /api/scope-stats endpoint (#899) Register GET /api/scope-stats route with per-window 30s cache; fix COALESCE on SUM in GetScopeStats summary query to handle empty result sets. Co-Authored-By: Claude Sonnet 4.6 --- cmd/server/db.go | 4 ++-- cmd/server/routes.go | 46 +++++++++++++++++++++++++++++++++++++++ cmd/server/routes_test.go | 31 ++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 2 deletions(-) diff --git a/cmd/server/db.go b/cmd/server/db.go index b3134c84..57af38fc 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -2394,8 +2394,8 @@ func (db *DB) GetScopeStats(window string) (*ScopeStatsResponse, error) { SELECT COUNT(*) AS transport_total, COUNT(scope_name) AS scoped, - SUM(CASE WHEN scope_name IS NULL THEN 1 ELSE 0 END) AS unscoped, - SUM(CASE WHEN scope_name = '' THEN 1 ELSE 0 END) AS unknown_scope + COALESCE(SUM(CASE WHEN scope_name IS NULL THEN 1 ELSE 0 END), 0) AS unscoped, + COALESCE(SUM(CASE WHEN scope_name = '' THEN 1 ELSE 0 END), 0) AS unknown_scope FROM transmissions WHERE route_type IN (0, 3) AND first_seen >= ? `, since) diff --git a/cmd/server/routes.go b/cmd/server/routes.go index 70839b52..b716d44e 100644 --- a/cmd/server/routes.go +++ b/cmd/server/routes.go @@ -45,6 +45,11 @@ type Server struct { neighborMu sync.Mutex neighborGraph *NeighborGraph + // Cached /api/scope-stats response — per-window, recomputed at most once every 30s + scopeStatsMu sync.Mutex + scopeStatsCache map[string]*ScopeStatsResponse + scopeStatsCachedAt map[string]time.Time + // Router reference for OpenAPI spec generation router *mux.Router } @@ -121,6 +126,7 @@ func (s *Server) RegisterRoutes(r *mux.Router) { // System endpoints r.HandleFunc("/api/health", s.handleHealth).Methods("GET") r.HandleFunc("/api/stats", s.handleStats).Methods("GET") + r.HandleFunc("/api/scope-stats", s.handleScopeStats).Methods("GET") r.HandleFunc("/api/perf", s.handlePerf).Methods("GET") r.Handle("/api/perf/reset", s.requireAPIKey(http.HandlerFunc(s.handlePerfReset))).Methods("POST") r.Handle("/api/admin/prune", s.requireAPIKey(http.HandlerFunc(s.handleAdminPrune))).Methods("POST") @@ -2712,3 +2718,43 @@ func (s *Server) handleDroppedPackets(w http.ResponseWriter, r *http.Request) { } writeJSON(w, results) } + +func (s *Server) handleScopeStats(w http.ResponseWriter, r *http.Request) { + const scopeStatsTTL = 30 * time.Second + + window := r.URL.Query().Get("window") + if window == "" { + window = "24h" + } + if window != "1h" && window != "24h" && window != "7d" { + writeError(w, 400, "window must be 1h, 24h, or 7d") + return + } + + s.scopeStatsMu.Lock() + if s.scopeStatsCache != nil { + if cached, ok := s.scopeStatsCache[window]; ok && time.Since(s.scopeStatsCachedAt[window]) < scopeStatsTTL { + s.scopeStatsMu.Unlock() + writeJSON(w, cached) + return + } + } + s.scopeStatsMu.Unlock() + + resp, err := s.db.GetScopeStats(window) + if err != nil { + writeError(w, 500, err.Error()) + return + } + + s.scopeStatsMu.Lock() + if s.scopeStatsCache == nil { + s.scopeStatsCache = make(map[string]*ScopeStatsResponse) + s.scopeStatsCachedAt = make(map[string]time.Time) + } + s.scopeStatsCache[window] = resp + s.scopeStatsCachedAt[window] = time.Now() + s.scopeStatsMu.Unlock() + + writeJSON(w, resp) +} diff --git a/cmd/server/routes_test.go b/cmd/server/routes_test.go index 4ac15f54..1a301a45 100644 --- a/cmd/server/routes_test.go +++ b/cmd/server/routes_test.go @@ -3972,3 +3972,34 @@ func TestPacketDetailPrefersStoreOverDB(t *testing.T) { t.Errorf("expected observation_count=2 (from store), got %v", body["observation_count"]) } } + +func TestHandleScopeStats(t *testing.T) { + srv, _ := setupTestServer(t) + // Add scope_name column and mark hasScopeName on the test DB + if _, err := srv.db.conn.Exec(`ALTER TABLE transmissions ADD COLUMN scope_name TEXT DEFAULT NULL`); err != nil { + t.Fatalf("add scope_name column: %v", err) + } + srv.db.hasScopeName = true + + req := httptest.NewRequest("GET", "/api/scope-stats?window=24h", nil) + w := httptest.NewRecorder() + srv.handleScopeStats(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200; body: %s", w.Code, w.Body.String()) + } + var resp ScopeStatsResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp.Window != "24h" { + t.Errorf("window = %q, want 24h", resp.Window) + } + // TimeSeries and ByRegion are always non-nil slices + if resp.TimeSeries == nil { + t.Error("timeSeries is nil, want empty slice") + } + if resp.ByRegion == nil { + t.Error("byRegion is nil, want empty slice") + } +} From 96f9ac1dd761cc3e885e919f9b61d65c8654fd85 Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 16:54:46 +0200 Subject: [PATCH 11/17] test(server): add error path coverage for handleScopeStats (#899) --- cmd/server/routes_test.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/cmd/server/routes_test.go b/cmd/server/routes_test.go index 1a301a45..985c0e8d 100644 --- a/cmd/server/routes_test.go +++ b/cmd/server/routes_test.go @@ -4003,3 +4003,32 @@ func TestHandleScopeStats(t *testing.T) { t.Error("byRegion is nil, want empty slice") } } + +func TestHandleScopeStatsInvalidWindow(t *testing.T) { + srv, _ := setupTestServer(t) + if _, err := srv.db.conn.Exec(`ALTER TABLE transmissions ADD COLUMN scope_name TEXT DEFAULT NULL`); err != nil { + t.Fatalf("add scope_name column: %v", err) + } + srv.db.hasScopeName = true + + req := httptest.NewRequest("GET", "/api/scope-stats?window=invalid", nil) + w := httptest.NewRecorder() + srv.handleScopeStats(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("status = %d, want 400", w.Code) + } +} + +func TestHandleScopeStatsNoColumn(t *testing.T) { + srv, _ := setupTestServer(t) + // hasScopeName stays false (not set) + + req := httptest.NewRequest("GET", "/api/scope-stats?window=24h", nil) + w := httptest.NewRecorder() + srv.handleScopeStats(w, req) + + if w.Code != http.StatusInternalServerError { + t.Errorf("status = %d, want 500", w.Code) + } +} From 5a6458f73a4751645b44ea1ecfe927cbe3f88cff Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 16:55:23 +0200 Subject: [PATCH 12/17] docs: add /api/scope-stats to api-spec (#899) --- docs/api-spec.md | 50 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/docs/api-spec.md b/docs/api-spec.md index 082b8861..f8b145ba 100644 --- a/docs/api-spec.md +++ b/docs/api-spec.md @@ -40,6 +40,7 @@ - [GET /api/analytics/hash-sizes](#get-apianalyticshash-sizes) - [GET /api/analytics/subpaths](#get-apianalyticssubpaths) - [GET /api/analytics/subpath-detail](#get-apianalyticssubpath-detail) +- [GET /api/scope-stats](#get-apiscope-stats) - [GET /api/resolve-hops](#get-apiresolve-hops) - [GET /api/traces/:hash](#get-apitraceshash) - [GET /api/config/theme](#get-apiconfigtheme) @@ -1456,6 +1457,55 @@ Detailed stats for a specific subpath. --- +## GET /api/scope-stats + +Scope-based packet statistics over a time window. Requires ingestor `scope_name_v1` migration to have run. + +### Query Parameters + +| Param | Type | Default | Description | +|----------|--------|---------|------------------------------------------------| +| `window` | string | `24h` | Time window: `1h`, `24h`, `7d` | + +### Response `200` + +```jsonc +{ + "window": string, // echoed window ("1h", "24h", or "7d") + "summary": { + "transportTotal": number, // scoped + unscoped transport-route packets + "scoped": number, // Code1 ≠ 0000 (named + unknown regions) + "unscoped": number, // transport-route with Code1 = 0000 + "unknownScope": number // scoped but no configured region matched (subset of scoped) + }, + "byRegion": [ + { "name": string, "count": number } // region name and packet count + ], + "timeSeries": [ + { "t": string (ISO), "scoped": number, "unscoped": number } // bucket timestamps and counts + ] +} +``` + +**Notes:** +- `transportTotal` = `scoped` + `unscoped` (only route_type 0 or 3 packets) +- `scoped` = packets with Code1 ≠ 0000 +- `unscoped` = transport-route packets with Code1 = 0000 +- `unknownScope` = scoped packets that did not match any configured region name +- Time-series bucket size depends on window: + - `1h` window → 5-minute buckets + - `24h` window → 1-hour buckets + - `7d` window → 6-hour buckets +- Cached 30 seconds + +### Response `400` + +```json +{ "error": "invalid window parameter" } +``` + +--- + ## GET /api/resolve-hops Resolve path hop hex prefixes to node names with regional disambiguation. From 7b6e97274883f069935dad4e981ec647ad581fe4 Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 16:57:41 +0200 Subject: [PATCH 13/17] feat(frontend): add Scopes tab to Analytics page (#899) Co-Authored-By: Claude Sonnet 4.6 --- public/analytics.js | 128 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/public/analytics.js b/public/analytics.js index 36fe90c6..1612df3e 100644 --- a/public/analytics.js +++ b/public/analytics.js @@ -89,6 +89,7 @@ +
@@ -184,6 +185,7 @@ case 'rf-health': await renderRFHealthTab(el); break; case 'clock-health': await renderClockHealthTab(el); break; case 'prefix-tool': await renderPrefixTool(el); break; + case 'scopes': await renderScopesTab(el); break; } // Auto-apply column resizing to all analytics tables requestAnimationFrame(() => { @@ -3567,5 +3569,131 @@ function destroy() { _analyticsData = {}; _channelData = null; if (_ngState && _ } } + // ===================== SCOPES ===================== + async function renderScopesTab(el) { + var winKey = 'scopes_window'; + var selectedWindow = (typeof sessionStorage !== 'undefined' && sessionStorage.getItem(winKey)) || '24h'; + + async function load(w) { + el.innerHTML = '
Loading scope stats…
'; + try { + var data = await (await fetch('/api/scope-stats?window=' + encodeURIComponent(w))).json(); + if (data.error) { + el.innerHTML = '
' + esc(data.error) + '
'; + return; + } + render(data, w); + } catch (err) { + el.innerHTML = '
Failed to load scope stats: ' + esc(String(err)) + '
'; + } + } + + function pct(n, total) { + if (!total) return '—'; + return (n / total * 100).toFixed(1) + '%'; + } + + function render(d, w) { + var s = d.summary; + var total = s.transportTotal || 0; + + // Window selector + var winHtml = ['1h', '24h', '7d'].map(function(v) { + return ''; + }).join(''); + + // Summary cards + var cardsHtml = [ + { label: 'Transport Total', value: total.toLocaleString(), note: '' }, + { label: 'Scoped', value: s.scoped.toLocaleString(), note: pct(s.scoped, total) }, + { label: 'Unscoped', value: s.unscoped.toLocaleString(), note: pct(s.unscoped, total) }, + { label: 'Unknown Scope', value: s.unknownScope.toLocaleString(), note: pct(s.unknownScope, s.scoped) + ' of scoped' }, + ].map(function(c) { + return '
' + c.value + '
' + + '
' + c.label + '
' + + (c.note ? '
' + c.note + '
' : '') + + '
'; + }).join(''); + + // Per-region table + var tableBody = ''; + if (d.byRegion && d.byRegion.length) { + tableBody = d.byRegion.map(function(r) { + return '' + esc(r.name) + '' + + '' + r.count.toLocaleString() + '' + + '' + pct(r.count, s.scoped) + ''; + }).join(''); + if (s.unknownScope > 0) { + tableBody += 'Unknown scope' + + '' + s.unknownScope.toLocaleString() + '' + + '' + pct(s.unknownScope, s.scoped) + ''; + } + } else if (s.scoped === 0) { + tableBody = 'No scoped messages in this window'; + } else { + tableBody = 'No regions configured — add hashRegions to your config'; + } + + // Time-series chart (two-line SVG) + var chartHtml = ''; + if (d.timeSeries && d.timeSeries.length > 1) { + var scopedVals = d.timeSeries.map(function(p) { return p.scoped; }); + var unscopedVals = d.timeSeries.map(function(p) { return p.unscoped; }); + var maxVal = Math.max(1, Math.max.apply(null, scopedVals.concat(unscopedVals))); + var W = 800, H = 180, padL = 44, padB = 24, padT = 10, padR = 10; + var plotW = W - padL - padR, plotH = H - padB - padT; + var n = d.timeSeries.length; + + function pts(vals) { + return vals.map(function(v, i) { + var x = padL + i * plotW / Math.max(n - 1, 1); + var y = padT + plotH - (v / maxVal) * plotH; + return x.toFixed(1) + ',' + y.toFixed(1); + }).join(' '); + } + + var grid = ''; + for (var gi = 0; gi <= 4; gi++) { + var gy = padT + plotH * gi / 4; + var gv = Math.round(maxVal * (4 - gi) / 4); + grid += ''; + grid += '' + gv + ''; + } + + var legendX = padL + plotW - 120; + chartHtml = '
' + + '' + + grid + + '' + + '' + + '' + + 'Scoped' + + '' + + 'Unscoped' + + '
'; + } + + el.innerHTML = + '

Scope Statistics

' + + '
' + winHtml + '
' + + '
' + cardsHtml + '
' + + '' + + '' + + '' + tableBody + '
RegionMessages% of Scoped
' + + chartHtml; + + // Bind window selector + el.querySelectorAll('[data-win]').forEach(function(btn) { + btn.addEventListener('click', function() { + selectedWindow = btn.dataset.win; + if (typeof sessionStorage !== 'undefined') sessionStorage.setItem(winKey, selectedWindow); + load(selectedWindow); + }); + }); + } + + load(selectedWindow); + } + registerPage('analytics', { init, destroy }); })(); From 0ded7d152f78bf02195c7c1b83655e05b4ffd775 Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 17:25:05 +0200 Subject: [PATCH 14/17] fix(server/db): check rows.Err() after GetScopeStats iteration; fix api-spec 400 message (#899) --- cmd/server/db.go | 6 ++++++ docs/api-spec.md | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/cmd/server/db.go b/cmd/server/db.go index 57af38fc..d1cca180 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -2426,6 +2426,9 @@ func (db *DB) GetScopeStats(window string) (*ScopeStatsResponse, error) { resp.ByRegion = append(resp.ByRegion, rc) } } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("scope byRegion iteration: %w", err) + } if resp.ByRegion == nil { resp.ByRegion = []ScopeRegionCount{} } @@ -2451,6 +2454,9 @@ func (db *DB) GetScopeStats(window string) (*ScopeStatsResponse, error) { resp.TimeSeries = append(resp.TimeSeries, pt) } } + if err := tsRows.Err(); err != nil { + return nil, fmt.Errorf("scope timeseries iteration: %w", err) + } if resp.TimeSeries == nil { resp.TimeSeries = []ScopeTimePoint{} } diff --git a/docs/api-spec.md b/docs/api-spec.md index f8b145ba..b2ca4e30 100644 --- a/docs/api-spec.md +++ b/docs/api-spec.md @@ -1501,7 +1501,7 @@ Scope-based packet statistics over a time window. Requires ingestor `scope_name_ ### Response `400` ```json -{ "error": "invalid window parameter" } +{ "error": "window must be 1h, 24h, or 7d" } ``` --- From 95c2382651b33319114a2028503236af9c913282 Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 21:50:43 +0200 Subject: [PATCH 15/17] docs: add hashRegions example to config.example.json (#899) --- config.example.json | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/config.example.json b/config.example.json index 5672ed31..c95b4aee 100644 --- a/config.example.json +++ b/config.example.json @@ -221,6 +221,11 @@ "_comment_mqttSources": "Each source connects to an MQTT broker. topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional).", "_comment_channelKeys": "Hex keys for decrypting channel messages. Key name = channel display name. public channel key is well-known.", "_comment_hashChannels": "Channel names whose keys are derived via SHA256. Key = SHA256(name)[:16]. Listed here so the ingestor can auto-derive keys.", + "hashRegions": [ + "#belgium", + "#eu" + ], + "_comment_hashRegions": "Region names for scope matching on transport-route packets. Key = SHA256('#name')[:16]. Add any region names used by nodes in your network.", "_comment_defaultRegion": "IATA code shown by default in region filters.", "_comment_mapDefaults": "Initial map center [lat, lon] and zoom level.", "_comment_regions": "IATA code to display name mapping. Packets are tagged with region codes by MQTT topic structure." From 29cfd49b152bf9a5ec104b1d303530d4642b7158 Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 22:26:14 +0200 Subject: [PATCH 16/17] feat(packets): show scope_name in packet breakdown detail panel (#899) Add scope_name to transmissionBaseSQL/scanTransmissionRow when the column exists, and render a Scope row in the detail-meta dl for scoped transport packets (matched region = "#name", unmatched = "unknown scope"). Co-Authored-By: Claude Sonnet 4.6 --- cmd/server/db.go | 18 +++++++++++++++--- public/packets.js | 1 + 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/cmd/server/db.go b/cmd/server/db.go index d1cca180..16eda435 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -125,6 +125,9 @@ func (db *DB) transmissionBaseSQL() (selectCols, observerJoin string) { ORDER BY length(COALESCE(path_json,'')) DESC LIMIT 1 )` } + if db.hasScopeName { + selectCols += `, t.scope_name` + } return } @@ -135,13 +138,18 @@ func (db *DB) scanTransmissionRow(rows *sql.Rows) map[string]interface{} { var rawHex, hash, firstSeen, decodedJSON, observerID, observerName, pathJSON, direction sql.NullString var routeType, payloadType sql.NullInt64 var snr, rssi sql.NullFloat64 + var scopeName sql.NullString - if err := rows.Scan(&id, &rawHex, &hash, &firstSeen, &routeType, &payloadType, &decodedJSON, - &observationCount, &observerID, &observerName, &snr, &rssi, &pathJSON, &direction); err != nil { + scanArgs := []interface{}{&id, &rawHex, &hash, &firstSeen, &routeType, &payloadType, &decodedJSON, + &observationCount, &observerID, &observerName, &snr, &rssi, &pathJSON, &direction} + if db.hasScopeName { + scanArgs = append(scanArgs, &scopeName) + } + if err := rows.Scan(scanArgs...); err != nil { return nil } - return map[string]interface{}{ + m := map[string]interface{}{ "id": id, "raw_hex": nullStr(rawHex), "hash": nullStr(hash), @@ -158,6 +166,10 @@ func (db *DB) scanTransmissionRow(rows *sql.Rows) map[string]interface{} { "path_json": nullStr(pathJSON), "direction": nullStr(direction), } + if db.hasScopeName { + m["scope_name"] = nullStr(scopeName) + } + return m } // Node represents a row from the nodes table. diff --git a/public/packets.js b/public/packets.js index f0b2db8a..f8b73b27 100644 --- a/public/packets.js +++ b/public/packets.js @@ -2012,6 +2012,7 @@
Location
${locationHtml}
SNR / RSSI
${snr != null ? snr + ' dB' : '—'} / ${rssi != null ? rssi + ' dBm' : '—'}
Route Type
${routeTypeName(pkt.route_type)}
+ ${pkt.scope_name != null ? `
Scope
${pkt.scope_name !== '' ? escapeHtml(pkt.scope_name) : 'unknown scope'}
` : ''}
Payload Type
${typeName}
${hashSize ? `
Hash Size
${hashSize} byte${hashSize !== 1 ? 's' : ''}
` : ''}
Timestamp
${renderTimestampCell(effectivePkt.timestamp)}
From e535b1eb921f2712bc393580f916820eee8d72dc Mon Sep 17 00:00:00 2001 From: efiten Date: Fri, 24 Apr 2026 23:31:54 +0200 Subject: [PATCH 17/17] =?UTF-8?q?perf(ingestor):=20remove=20BackfillScopeN?= =?UTF-8?q?ames=20=E2=80=94=20caused=20100%=20CPU=20on=20first=20deploy?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The backfill processed all historical transport-route rows in a tight loop at startup with no throttling. scope_name is populated at ingest for new packets going forward; historical rows remain NULL. Co-Authored-By: Claude Sonnet 4.6 --- cmd/ingestor/db.go | 50 ----------------------------------------- cmd/ingestor/db_test.go | 50 ----------------------------------------- cmd/ingestor/main.go | 1 - 3 files changed, 101 deletions(-) diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index 86243cb9..414d32bb 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -915,56 +915,6 @@ func (s *Store) PruneDroppedPackets(retentionDays int) (int64, error) { return n, nil } -// BackfillScopeNames sets scope_name on existing transport-route transmissions that -// lack it, using the provided region HMAC keys. Safe to re-run — only processes -// rows where scope_name IS NULL and route_type IN (0, 3). -func (s *Store) BackfillScopeNames(regionKeys map[string][]byte) { - if len(regionKeys) == 0 { - return - } - rows, err := s.db.Query(` - SELECT id, raw_hex FROM transmissions - WHERE route_type IN (0, 3) AND scope_name IS NULL AND raw_hex IS NOT NULL - `) - if err != nil { - log.Printf("[backfill] scope_name query: %v", err) - return - } - defer rows.Close() - - type row struct { - id int64 - rawHex string - } - var pending []row - for rows.Next() { - var r row - if rows.Scan(&r.id, &r.rawHex) == nil && r.rawHex != "" { - pending = append(pending, r) - } - } - if err := rows.Err(); err != nil { - log.Printf("[backfill] scope_name iteration error: %v", err) - return - } - - updated := 0 - for _, r := range pending { - decoded, err := DecodePacket(r.rawHex, nil, false) - if err != nil || decoded.TransportCodes == nil { - continue - } - if decoded.TransportCodes.Code1 == "0000" { - continue - } - scopeName := matchScope(regionKeys, byte(decoded.Header.PayloadType), decoded.PayloadRaw, decoded.TransportCodes.Code1) - s.db.Exec(`UPDATE transmissions SET scope_name = ? WHERE id = ?`, scopeName, r.id) - updated++ - } - if updated > 0 { - log.Printf("[backfill] scope_name set for %d/%d transport-route rows", updated, len(pending)) - } -} // PacketData holds the data needed to insert a packet into the DB. type PacketData struct { diff --git a/cmd/ingestor/db_test.go b/cmd/ingestor/db_test.go index 9ff10ebe..e4002ca8 100644 --- a/cmd/ingestor/db_test.go +++ b/cmd/ingestor/db_test.go @@ -1,10 +1,7 @@ package main import ( - "crypto/hmac" - "crypto/sha256" "database/sql" - "encoding/hex" "encoding/json" "fmt" "os" @@ -2159,50 +2156,3 @@ func TestScopeNameMigration(t *testing.T) { } } -func TestBackfillScopeNames(t *testing.T) { - dir := t.TempDir() - store, err := OpenStore(filepath.Join(dir, "test.db")) - if err != nil { - t.Fatalf("OpenStore: %v", err) - } - defer store.Close() - - // Insert a transport-route packet with known Code1 - regionName := "#test" - h := sha256.Sum256([]byte(regionName)) - key := h[:16] - payloadType := byte(0x05) - payloadRaw := []byte("hello") - - mac := hmac.New(sha256.New, key) - mac.Write([]byte{payloadType}) - mac.Write(payloadRaw) - hmacBytes := mac.Sum(nil) - code := uint16(hmacBytes[0]) | uint16(hmacBytes[1])<<8 - if code == 0 { - code = 1 - } else if code == 0xFFFF { - code = 0xFFFE - } - codeBytes := [2]byte{byte(code & 0xFF), byte(code >> 8)} - - // Build raw packet bytes: header(1) + Code1(2) + Code2(2) + path_len(1) + payload - header := byte(0x00) | (payloadType << 2) // TRANSPORT_FLOOD + payload_type in bits 2-5 - raw := []byte{header} - raw = append(raw, codeBytes[:]...) - raw = append(raw, 0x00, 0x00) // Code2 = 0000 - raw = append(raw, 0x00) // path_len = 0 hops - raw = append(raw, payloadRaw...) - rawHex := strings.ToUpper(hex.EncodeToString(raw)) - - store.db.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json) - VALUES (?, 'testhash1', datetime('now'), 0, 5, 0, '{}')`, rawHex) - - store.BackfillScopeNames(map[string][]byte{regionName: key}) - - var scopeName *string - store.db.QueryRow(`SELECT scope_name FROM transmissions WHERE hash = 'testhash1'`).Scan(&scopeName) - if scopeName == nil || *scopeName != regionName { - t.Errorf("scope_name = %v, want %q", scopeName, regionName) - } -} diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 44cd7853..5a758397 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -114,7 +114,6 @@ func main() { } regionKeys := loadRegionKeys(cfg) - go store.BackfillScopeNames(regionKeys) // Connect to each MQTT source var clients []mqtt.Client