Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 123 additions & 4 deletions cmd/server/clock_skew.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ type NodeClockSkew struct {
GoodFraction float64 `json:"goodFraction"` // fraction of recent samples with |skew| <= 1h
RecentBadSampleCount int `json:"recentBadSampleCount"` // count of recent samples with |skew| > 1h
RecentSampleCount int `json:"recentSampleCount"` // total recent samples in window
RecentHashEvidence []HashEvidence `json:"recentHashEvidence,omitempty"`
CalibrationSummary *CalibrationSummary `json:"calibrationSummary,omitempty"`
NodeName string `json:"nodeName,omitempty"` // populated in fleet responses
NodeRole string `json:"nodeRole,omitempty"` // populated in fleet responses
}
Expand All @@ -130,6 +132,31 @@ type SkewSample struct {
SkewSec float64 `json:"skew"` // corrected skew in seconds
}

// HashEvidenceObserver is one observer's contribution to a per-hash evidence entry.
type HashEvidenceObserver struct {
ObserverID string `json:"observerID"`
ObserverName string `json:"observerName"`
RawSkewSec float64 `json:"rawSkewSec"`
CorrectedSkewSec float64 `json:"correctedSkewSec"`
ObserverOffsetSec float64 `json:"observerOffsetSec"`
Calibrated bool `json:"calibrated"`
}

// HashEvidence is per-hash clock skew evidence showing individual observer contributions.
type HashEvidence struct {
Hash string `json:"hash"`
Observers []HashEvidenceObserver `json:"observers"`
MedianCorrectedSkewSec float64 `json:"medianCorrectedSkewSec"`
Timestamp int64 `json:"timestamp"`
}

// CalibrationSummary counts how many samples were corrected via observer calibration.
type CalibrationSummary struct {
TotalSamples int `json:"totalSamples"`
CalibratedSamples int `json:"calibratedSamples"`
UncalibratedSamples int `json:"uncalibratedSamples"`
}

// txSkewResult maps tx hash → per-transmission skew stats. This is an
// intermediate result keyed by hash (not pubkey); the store maps hash → pubkey
// when building the final per-node view.
Expand All @@ -143,15 +170,27 @@ type ClockSkewEngine struct {
observerOffsets map[string]float64 // observerID → calibrated offset (seconds)
observerSamples map[string]int // observerID → number of multi-observer packets used
nodeSkew txSkewResult
hashEvidence map[string][]hashEvidenceEntry // hash → per-observer raw/corrected data
lastComputed time.Time
computeInterval time.Duration
}

// hashEvidenceEntry stores raw evidence per observer per hash, cached during Recompute.
type hashEvidenceEntry struct {
observerID string
rawSkew float64
corrected float64
offset float64
calibrated bool
observedTS int64
}

func NewClockSkewEngine() *ClockSkewEngine {
return &ClockSkewEngine{
observerOffsets: make(map[string]float64),
observerSamples: make(map[string]int),
nodeSkew: make(txSkewResult),
hashEvidence: make(map[string][]hashEvidenceEntry),
computeInterval: 30 * time.Second,
}
}
Expand All @@ -176,14 +215,16 @@ func (e *ClockSkewEngine) Recompute(store *PacketStore) {
var newOffsets map[string]float64
var newSamples map[string]int
var newNodeSkew txSkewResult
var newHashEvidence map[string][]hashEvidenceEntry

if len(samples) > 0 {
newOffsets, newSamples = calibrateObservers(samples)
newNodeSkew = computeNodeSkew(samples, newOffsets)
newNodeSkew, newHashEvidence = computeNodeSkew(samples, newOffsets)
} else {
newOffsets = make(map[string]float64)
newSamples = make(map[string]int)
newNodeSkew = make(txSkewResult)
newHashEvidence = make(map[string][]hashEvidenceEntry)
}

// Swap results under brief write lock.
Expand All @@ -196,6 +237,7 @@ func (e *ClockSkewEngine) Recompute(store *PacketStore) {
e.observerOffsets = newOffsets
e.observerSamples = newSamples
e.nodeSkew = newNodeSkew
e.hashEvidence = newHashEvidence
e.lastComputed = time.Now()
e.mu.Unlock()
}
Expand Down Expand Up @@ -332,7 +374,7 @@ func calibrateObservers(samples []skewSample) (map[string]float64, map[string]in
// ── Phase 3: Per-Node Skew ─────────────────────────────────────────────────────

// computeNodeSkew calculates corrected skew statistics for each node.
func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkewResult {
func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) (txSkewResult, map[string][]hashEvidenceEntry) {
// Compute corrected skew per sample, grouped by hash (each hash = one
// node's advert transmission). The caller maps hash → pubkey via byNode.
type correctedSample struct {
Expand All @@ -343,6 +385,7 @@ func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkew

byHash := make(map[string][]correctedSample)
hashAdvertTS := make(map[string]int64)
evidence := make(map[string][]hashEvidenceEntry) // hash → per-observer evidence

for _, s := range samples {
obsOffset, hasCal := obsOffsets[s.observerID]
Expand All @@ -359,6 +402,14 @@ func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkew
calibrated: hasCal,
})
hashAdvertTS[s.hash] = s.advertTS
evidence[s.hash] = append(evidence[s.hash], hashEvidenceEntry{
observerID: s.observerID,
rawSkew: round(rawSkew, 1),
corrected: round(corrected, 1),
offset: round(obsOffset, 1),
calibrated: hasCal,
observedTS: s.observedTS,
})
}

// Each hash represents one advert from one node. Compute median corrected
Expand Down Expand Up @@ -397,7 +448,7 @@ func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkew
LastObservedTS: latestObsTS,
}
}
return result
return result, evidence
}

// ── Integration with PacketStore ───────────────────────────────────────────────
Expand Down Expand Up @@ -558,6 +609,70 @@ func (s *PacketStore) getNodeClockSkewLocked(pubkey string) *NodeClockSkew {
samples[i] = SkewSample{Timestamp: p.ts, SkewSec: round(p.skew, 1)}
}

// Build per-hash evidence (most recent 10 hashes with ≥1 observer).
// Observer name lookup from store observations.
obsNameMap := make(map[string]string)
type hashMeta struct {
hash string
ts int64
}
var evidenceHashes []hashMeta
for _, tx := range txs {
if tx.PayloadType == nil || *tx.PayloadType != PayloadADVERT {
continue
}
ev, ok := s.clockSkew.hashEvidence[tx.Hash]
if !ok || len(ev) == 0 {
continue
}
// Collect observer names from tx observations.
for _, obs := range tx.Observations {
if obs.ObserverID != "" && obs.ObserverName != "" {
obsNameMap[obs.ObserverID] = obs.ObserverName
}
}
evidenceHashes = append(evidenceHashes, hashMeta{hash: tx.Hash, ts: ev[0].observedTS})
}
// Sort by timestamp descending, take most recent 10.
sort.Slice(evidenceHashes, func(i, j int) bool { return evidenceHashes[i].ts > evidenceHashes[j].ts })
if len(evidenceHashes) > 10 {
evidenceHashes = evidenceHashes[:10]
}
var recentEvidence []HashEvidence
var calSummary CalibrationSummary
for _, eh := range evidenceHashes {
entries := s.clockSkew.hashEvidence[eh.hash]
var observers []HashEvidenceObserver
var corrSkews []float64
for _, e := range entries {
name := obsNameMap[e.observerID]
if name == "" {
name = e.observerID
}
observers = append(observers, HashEvidenceObserver{
ObserverID: e.observerID,
ObserverName: name,
RawSkewSec: e.rawSkew,
CorrectedSkewSec: e.corrected,
ObserverOffsetSec: e.offset,
Calibrated: e.calibrated,
})
corrSkews = append(corrSkews, e.corrected)
calSummary.TotalSamples++
if e.calibrated {
calSummary.CalibratedSamples++
} else {
calSummary.UncalibratedSamples++
}
}
recentEvidence = append(recentEvidence, HashEvidence{
Hash: eh.hash,
Observers: observers,
MedianCorrectedSkewSec: round(median(corrSkews), 1),
Timestamp: eh.ts,
})
}

return &NodeClockSkew{
Pubkey: pubkey,
MeanSkewSec: round(meanSkew, 1),
Expand All @@ -574,6 +689,8 @@ func (s *PacketStore) getNodeClockSkewLocked(pubkey string) *NodeClockSkew {
GoodFraction: round(goodFraction, 2),
RecentBadSampleCount: recentBadCount,
RecentSampleCount: recentSampleCount,
RecentHashEvidence: recentEvidence,
CalibrationSummary: &calSummary,
}
}

Expand Down Expand Up @@ -601,8 +718,10 @@ func (s *PacketStore) GetFleetClockSkew() []*NodeClockSkew {
cs.NodeName = ni.Name
cs.NodeRole = ni.Role
}
// Omit samples in fleet response (too much data).
// Omit samples and evidence in fleet response (too much data).
cs.Samples = nil
cs.RecentHashEvidence = nil
cs.CalibrationSummary = nil
results = append(results, cs)
}
return results
Expand Down
105 changes: 103 additions & 2 deletions cmd/server/clock_skew_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestComputeNodeSkew_BasicCorrection(t *testing.T) {
// So the median approach finds obs2 is +5 ahead (relative to median)

// Now compute node skew with those offsets:
nodeSkew := computeNodeSkew(samples, offsets)
nodeSkew, _ := computeNodeSkew(samples, offsets)
cs, ok := nodeSkew["h1"]
if !ok {
t.Fatal("expected skew data for hash h1")
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestComputeNodeSkew_ThreeObservers(t *testing.T) {
t.Errorf("obs3 offset = %v, want 30", offsets["obs3"])
}

nodeSkew := computeNodeSkew(samples, offsets)
nodeSkew, _ := computeNodeSkew(samples, offsets)
cs := nodeSkew["h1"]
if cs == nil {
t.Fatal("expected skew data for h1")
Expand Down Expand Up @@ -954,3 +954,104 @@ func TestAllGood_OK_845(t *testing.T) {
t.Errorf("recentBadSampleCount = %v, want 0", r.RecentBadSampleCount)
}
}

func TestNodeClockSkew_EvidencePayload(t *testing.T) {
// 3-observer scenario: obs1 ahead by +2s, obs2 on time, obs3 behind by -1s.
// Node clock is 60s ahead. Raw skew = advertTS - obsTS.
// Hash has 3 observations, each observer sees same advert.
ps := NewPacketStore(nil, nil)

pt := 4 // ADVERT
// Advert timestamp: 1700000060 (node 60s ahead of true time 1700000000)
// obs1 sees at 1700000002 (2s ahead of true time) → raw = 60 - 2 = 58
// obs2 sees at 1700000000 (on time) → raw = 60 - 0 = 60
// obs3 sees at 1699999999 (-1s, behind) → raw = 60 + 1 = 61
// Median obsTS = 1700000000, so:
// obs1 offset = 1700000002 - 1700000000 = +2
// obs2 offset = 0
// obs3 offset = 1699999999 - 1700000000 = -1
// Corrected: raw + offset → obs1: 58+2=60, obs2: 60+0=60, obs3: 61+(-1)=60

tx1 := &StoreTx{
Hash: "evidence_hash_1",
PayloadType: &pt,
DecodedJSON: `{"payload":{"timestamp":1700000060}}`,
Observations: []*StoreObs{
{ObserverID: "obs1", ObserverName: "Observer Alpha", Timestamp: "2023-11-14T22:13:22Z"},
{ObserverID: "obs2", ObserverName: "Observer Beta", Timestamp: "2023-11-14T22:13:20Z"},
{ObserverID: "obs3", ObserverName: "Observer Gamma", Timestamp: "2023-11-14T22:13:19Z"},
},
}
// Second hash to ensure we get multiple evidence entries.
tx2 := &StoreTx{
Hash: "evidence_hash_2",
PayloadType: &pt,
DecodedJSON: `{"payload":{"timestamp":1700003660}}`,
Observations: []*StoreObs{
{ObserverID: "obs1", ObserverName: "Observer Alpha", Timestamp: "2023-11-14T23:13:22Z"},
{ObserverID: "obs2", ObserverName: "Observer Beta", Timestamp: "2023-11-14T23:13:20Z"},
{ObserverID: "obs3", ObserverName: "Observer Gamma", Timestamp: "2023-11-14T23:13:19Z"},
},
}

ps.mu.Lock()
ps.byNode["NODETEST"] = []*StoreTx{tx1, tx2}
ps.byPayloadType[4] = []*StoreTx{tx1, tx2}
ps.clockSkew.computeInterval = 0
ps.mu.Unlock()

r := ps.GetNodeClockSkew("NODETEST")
if r == nil {
t.Fatal("expected clock skew result")
}

// Check recentHashEvidence exists.
if len(r.RecentHashEvidence) == 0 {
t.Fatal("expected recentHashEvidence to be populated")
}
if len(r.RecentHashEvidence) != 2 {
t.Errorf("recentHashEvidence length = %d, want 2", len(r.RecentHashEvidence))
}

// Check first evidence entry has 3 observers.
ev := r.RecentHashEvidence[0]
if len(ev.Observers) != 3 {
t.Fatalf("evidence observers = %d, want 3", len(ev.Observers))
}

// Verify corrected = raw + offset for each observer.
for _, o := range ev.Observers {
expected := o.RawSkewSec + o.ObserverOffsetSec
if math.Abs(o.CorrectedSkewSec-expected) > 0.2 {
t.Errorf("observer %s: corrected=%.1f, expected raw(%.1f)+offset(%.1f)=%.1f",
o.ObserverID, o.CorrectedSkewSec, o.RawSkewSec, o.ObserverOffsetSec, expected)
}
}

// All corrected values should be ~60s (node is 60s ahead).
if math.Abs(ev.MedianCorrectedSkewSec-60) > 1 {
t.Errorf("median corrected = %.1f, want ~60", ev.MedianCorrectedSkewSec)
}

// Check calibration summary.
if r.CalibrationSummary == nil {
t.Fatal("expected calibrationSummary")
}
if r.CalibrationSummary.TotalSamples != 6 { // 3 observers × 2 hashes
t.Errorf("calibration total = %d, want 6", r.CalibrationSummary.TotalSamples)
}
if r.CalibrationSummary.CalibratedSamples != 6 {
t.Errorf("calibrated = %d, want 6 (all multi-observer)", r.CalibrationSummary.CalibratedSamples)
}

// Check observer names are populated.
nameFound := false
for _, o := range ev.Observers {
if o.ObserverName == "Observer Alpha" || o.ObserverName == "Observer Beta" {
nameFound = true
}
}
if !nameFound {
t.Error("expected observer names to be populated from tx observations")
}
}
Loading
Loading