From d4ac25ec1274770dc72f85679d22b93d5c5aabdf Mon Sep 17 00:00:00 2001 From: dawang Date: Thu, 19 Mar 2026 17:04:46 +0800 Subject: [PATCH 1/2] HYPERFLEET-733 - fix: sentinel delays first event publish for newly created clusters --- docs/sentinel-operator-guide.md | 111 ++++++++++++++++--------------- internal/engine/decision.go | 25 ++++--- internal/engine/decision_test.go | 66 +++++++----------- 3 files changed, 97 insertions(+), 105 deletions(-) diff --git a/docs/sentinel-operator-guide.md b/docs/sentinel-operator-guide.md index e042d4e..a2c01e4 100644 --- a/docs/sentinel-operator-guide.md +++ b/docs/sentinel-operator-guide.md @@ -9,8 +9,10 @@ This comprehensive guide teaches operators how to deploy, configure, and operate - [When to Use Sentinel](#12-when-to-use-sentinel) 2. [Core Concepts](#2-core-concepts) - [Decision Engine](#21-decision-engine) - - [State-Based Reconciliation](#211-state-based-reconciliation) - - [Time-Based Reconciliation (Max Age Intervals)](#212-time-based-reconciliation-max-age-intervals) + - [Never-Processed Reconciliation](#211-never-processed-reconciliation) + - [State-Based Reconciliation](#212-state-based-reconciliation) + - [Time-Based Reconciliation (Max Age Intervals)](#213-time-based-reconciliation-max-age-intervals) + - [Complete Decision Flow](#214-complete-decision-flow) - [Resource Filtering](#22-resource-filtering) 3. [Configuration Reference](#3-configuration-reference) - [Configuration File Structure](#31-configuration-file-structure) @@ -59,7 +61,8 @@ graph LR E -->|Update Status| B ``` -Sentinel publishes events to a message broker, which fans out messages to downstream adapters. It uses a **dual-trigger reconciliation strategy**: +Sentinel publishes events to a message broker, which fans out messages to downstream adapters. 
It uses a **three-part decision strategy**: +- **Never-processed**: Publish immediately for new resources that have never been processed - **State-based**: Publish immediately when resource state indicates unprocessed spec changes - **Time-based**: Publish periodically based on max age intervals to ensure eventual consistency @@ -79,19 +82,36 @@ Deploy Sentinel when you need: ### 2.1 Decision Engine -Sentinel's decision engine evaluates resources during each poll cycle to determine when to publish events. It uses a **dual-trigger strategy** that combines two complementary mechanisms to ensure both immediate response to changes and eventual consistency over time: +Sentinel's decision engine evaluates resources during each poll cycle to determine when to publish events. It uses a **three-part decision strategy** that ensures both immediate response to changes and eventual consistency over time: -1. **State-Based Reconciliation** — Immediate event publishing when resource state indicates unprocessed spec changes, which is checked first -2. **Time-Based Reconciliation** — Periodic event publishing to handle drift and failures when state is in sync +1. **Never-Processed Reconciliation** — Immediate event publishing for new resources that have never been processed by any adapter +2. **State-Based Reconciliation** — Immediate event publishing when resource state indicates unprocessed spec changes +3. **Time-Based Reconciliation** — Periodic event publishing to handle drift and failures when state is in sync **How Sentinel Reads Resource State:** -When Sentinel polls the HyperFleet API, it retrieves cluster or nodepool resources with their current state. +When Sentinel polls the HyperFleet API, it retrieves cluster or nodepool resources with their current state. 1. **`resource.Generation`** — Retrieved from the API resource. The HyperFleet API increments this value every time the resource spec is updated. 2. 
**`resource.status`** — Extracted from the API resource's `type=Ready` condition. -#### 2.1.1 State-Based Reconciliation +#### 2.1.1 Never-Processed Reconciliation + +Never-processed reconciliation is the **first and highest-priority check** that ensures new resources are published immediately on the first poll cycle after creation. + +**How It Works:** + +Sentinel checks if `status.last_updated` is zero (meaning no adapter has ever updated the resource status): + +- If zero → **Publish immediately** with reason `"never processed"` +- If non-zero → Proceed to state-based check + +**Why This Matters:** + +- New clusters are published within one poll interval (~5s) +- Ensures consistent handling of all new resources regardless of generation initialization + +#### 2.1.2 State-Based Reconciliation State-based reconciliation is a **spec-change detection mechanism** where Sentinel immediately publishes events when resource state indicates the spec has changed but hasn't been fully processed yet. @@ -105,27 +125,6 @@ Sentinel detects unprocessed spec changes by comparing the resource's `generatio **Note**: Sentinel uses the Ready condition's `ObservedGeneration` field as a proxy signal for spec changes. While the Ready condition can also be False for other reasons (e.g., adapter-reported infrastructure failures), the `ObservedGeneration` field specifically tracks spec processing, making this an effective spec-change detection mechanism. 
-**Flow Diagram:** - -```mermaid -sequenceDiagram - participant User - participant API - participant Sentinel - participant Broker - participant Adapter - - User->>API: Update cluster spec (generation: 1 → 2) - API->>API: Increment generation - Sentinel->>API: Poll resources - API-->>Sentinel: cluster (gen: 2, observed_gen: 1) - Sentinel->>Sentinel: Evaluate: 2 > 1 → PUBLISH - Sentinel->>Broker: CloudEvent (reason: state change detected) - Broker->>Adapter: Consume event - Adapter->>API: Reconcile cluster - Adapter->>API: Update status (observed_generation: 2) -``` - **Key Properties:** - **Immediate Response**: No need to wait for max age interval when state indicates unprocessed changes @@ -133,7 +132,7 @@ sequenceDiagram - **Race Prevention**: Ensures spec changes are never missed due to timing - **Condition-Based**: Uses Ready condition data as a reliable proxy for tracking spec processing status -#### 2.1.2 Time-Based Reconciliation (Max Age Intervals) +#### 2.1.3 Time-Based Reconciliation (Max Age Intervals) Time-based reconciliation ensures **eventual consistency** by publishing events periodically, even when specs haven't changed. This handles external state drift and transient failures. @@ -148,42 +147,44 @@ Sentinel uses two configurable max age intervals based on the resource's status **Decision Logic:** -When the resource's `generation` matches the `Ready` condition's `ObservedGeneration` (indicating the condition reflects the current state), Sentinel checks if enough time has elapsed: +At this point, `status.last_updated` is known to be non-zero and the resource's `generation` does not exceed the `Ready` condition's `ObservedGeneration`, so Sentinel uses `last_updated` as the reference timestamp and checks if enough time has elapsed: -1. Calculate reference timestamp: - If `status.last_updated` exists → use it (adapter has processed resource) - Otherwise → use `created_time` (new resource never processed) - -2. Determine max age interval: +1. 
Determine max age interval based on ready status: - If resource is ready (`Ready` condition status == True) → use `max_age_ready` (default: 30m) - If resource is not ready (`Ready` condition status == False) → use `max_age_not_ready` (default: 10s) -3. Calculate next event time: - ```text - next_event = reference_time + max_age_interval - ``` +2. Calculate next event time: + - `next_event = last_updated + max_age_interval` -4. Compare with current time: +3. Compare with current time: - If `now >= next_event` → **Publish event** (reason: "max age exceeded") - Otherwise → **Skip** (reason: "max age not exceeded") -**Flow Diagram:** +#### 2.1.4 Complete Decision Flow + +The three reconciliation checks work together in priority order to determine when to publish events: ```mermaid graph TD - A[Determine Reference Time] --> B{last_updated exists?} - B -->|Yes| C[Use last_updated] - B -->|No| D[Use created_time] - C --> E{Resource Ready?} - D --> E - E -->|Yes| F[Max Age = 30m] - E -->|No| G[Max Age = 10s] - F --> H{now >= reference + max_age?} - G --> H - H -->|Yes| I[Publish: max age exceeded] - H -->|No| J[Skip: within max age] + START[Poll Resource] --> CHECK1{last_updated == zero?} + CHECK1 -->|Yes| PUB1[Publish: never processed] + CHECK1 -->|No| CHECK2{Generation > ObservedGeneration?} + CHECK2 -->|Yes| PUB2[Publish: generation changed] + CHECK2 -->|No| CHECK3{Resource Ready?} + CHECK3 -->|Yes| AGE1[Max Age = 30m] + CHECK3 -->|No| AGE2[Max Age = 10s] + AGE1 --> CHECK4{now >= last_updated + max_age?} + AGE2 --> CHECK4 + CHECK4 -->|Yes| PUB3[Publish: max age exceeded] + CHECK4 -->|No| SKIP[Skip: within max age] ``` +**Key Takeaways:** + +- **Never-processed takes absolute priority** - New resources always publish immediately +- **State changes override max age** - Spec changes don't wait for intervals +- **Max age is the fallback** - Ensures eventual consistency when nothing else triggers + ### 2.2 Resource Filtering Resource filtering enables **horizontal scaling** 
by allowing operators to distribute resources across multiple Sentinel instances using label-based selectors. @@ -362,7 +363,7 @@ The `message_data` field defines the CloudEvents payload structure using **Commo | Variable | Type | Description | Example Fields | |----------|------|-------------|----------------| | `resource` | Resource | The HyperFleet resource | `id`, `kind`, `href`, `generation`, `status`, `labels`, `created_time` | -| `reason` | string | Decision engine reason | `"max age exceeded"`, `"generation changed"` | +| `reason` | string | Decision engine reason | `"generation changed"`, `"never processed"`, `"max age exceeded"` | **CEL Expression Syntax:** @@ -652,7 +653,7 @@ Follow this checklist to ensure successful Sentinel deployment and operation. Trigger cycle completed total=15 published=3 skipped=12 duration=0.125s topic=hyperfleet-dev-clusters subset=clusters ``` - `count` - Number of resources fetched from the API matching the resource selector - - `published` - Number of events published (generation changed or max age exceeded) + - `published` - Number of events published (generation changed, never processed, or max age exceeded) - `skipped` - Number of resources skipped (no reconciliation needed) For detailed deployment guidance, see [docs/running-sentinel.md](running-sentinel.md) diff --git a/internal/engine/decision.go b/internal/engine/decision.go index 5f6bb3b..e989fd1 100644 --- a/internal/engine/decision.go +++ b/internal/engine/decision.go @@ -11,6 +11,7 @@ import ( const ( ReasonMaxAgeExceeded = "max age exceeded" ReasonGenerationChanged = "generation changed" + ReasonNeverProcessed = "never processed" ReasonNilResource = "resource is nil" ReasonZeroNow = "now time is zero" ) @@ -38,11 +39,12 @@ type Decision struct { // Evaluate determines if an event should be published for the resource. // // Decision Logic (in priority order): -// 1. Generation-based reconciliation: If resource.Generation > status.ObservedGeneration, +// 1. 
Never-processed reconciliation: If status.LastUpdated is zero, publish immediately +// (resource has never been processed by any adapter) +// 2. Generation-based reconciliation: If resource.Generation > status.ObservedGeneration, // publish immediately (spec has changed, adapter needs to reconcile) -// 2. Time-based reconciliation: If max age exceeded since last update, publish +// 3. Time-based reconciliation: If max age exceeded since last update, publish // - Uses status.LastUpdated as reference timestamp -// - If LastUpdated is zero (never processed), falls back to created_time // // Max Age Intervals: // - Resources with Ready=true: maxAgeReady (default 30m) @@ -71,6 +73,16 @@ func (e *DecisionEngine) Evaluate(resource *client.Resource, now time.Time) Deci } } + // Check if resource has never been processed by an adapter + // A zero LastUpdated means no adapter has updated the status yet + // This ensures first-time resources are published immediately + if resource.Status.LastUpdated.IsZero() { + return Decision{ + ShouldPublish: true, + Reason: ReasonNeverProcessed, + } + } + // Check for generation mismatch // This triggers immediate reconciliation regardless of max age if resource.Generation > resource.Status.ObservedGeneration { @@ -81,12 +93,9 @@ func (e *DecisionEngine) Evaluate(resource *client.Resource, now time.Time) Deci } // Determine the reference timestamp for max age calculation - // Use LastUpdated if available (adapter has processed the resource) - // Otherwise fall back to created_time (resource is newly created) + // At this point, we know LastUpdated is not zero (checked above) + // so we can use it directly for the max age calculation referenceTime := resource.Status.LastUpdated - if referenceTime.IsZero() { - referenceTime = resource.CreatedTime - } // Determine the appropriate max age based on resource ready status var maxAge time.Duration diff --git a/internal/engine/decision_test.go b/internal/engine/decision_test.go index 
aaf3a9a..05c1eaf 100644 --- a/internal/engine/decision_test.go +++ b/internal/engine/decision_test.go @@ -121,25 +121,24 @@ func TestDecisionEngine_Evaluate(t *testing.T) { ready bool wantShouldPublish bool }{ - // Zero LastUpdated tests - should fall back to created_time - // These tests use the test factory default (created 1 hour ago) + // Zero LastUpdated tests - should publish immediately as never processed { name: "zero LastUpdated - ready", ready: true, - lastUpdated: time.Time{}, // Zero time - will use created_time + lastUpdated: time.Time{}, // Zero time - never processed now: now, wantShouldPublish: true, - wantReasonContains: "max age exceeded", - description: "Resources with zero LastUpdated should use created_time and publish (created > 30m ago)", + wantReasonContains: ReasonNeverProcessed, + description: "Resources with zero LastUpdated should publish immediately (never processed)", }, { name: "zero LastUpdated - not ready", ready: false, - lastUpdated: time.Time{}, // Zero time - will use created_time + lastUpdated: time.Time{}, // Zero time - never processed now: now, wantShouldPublish: true, - wantReasonContains: "max age exceeded", - description: "Resources with zero LastUpdated should use created_time and publish (created > 10s ago)", + wantReasonContains: ReasonNeverProcessed, + description: "Resources with zero LastUpdated should publish immediately (never processed)", }, // Not-ready resources (10s max age) @@ -413,8 +412,9 @@ func TestDecisionEngine_Evaluate_InvalidInputs(t *testing.T) { } } -// TestDecisionEngine_Evaluate_CreatedTimeFallback tests that created_time is used when lastUpdated is zero -func TestDecisionEngine_Evaluate_CreatedTimeFallback(t *testing.T) { +// TestDecisionEngine_Evaluate_NeverProcessedResources tests the never-processed reconciliation +// When lastUpdated is zero, resources are published immediately with "never processed" reason +func TestDecisionEngine_Evaluate_NeverProcessedResources(t *testing.T) { engine := 
newTestEngine() now := time.Now() @@ -428,49 +428,31 @@ func TestDecisionEngine_Evaluate_CreatedTimeFallback(t *testing.T) { wantShouldPublish bool }{ { - name: "zero lastUpdated - created 11s ago - not ready", - createdTime: now.Add(-11 * time.Second), - lastUpdated: time.Time{}, // Zero - should use created_time + name: "never processed - not ready", + createdTime: now.Add(-1 * time.Hour), // Doesn't affect decision + lastUpdated: time.Time{}, // Zero - never processed ready: false, wantShouldPublish: true, - wantReasonContains: "max age exceeded", - description: "Should use created_time and publish (11s > 10s max age)", - }, - { - name: "zero lastUpdated - created 5s ago - not ready", - createdTime: now.Add(-5 * time.Second), - lastUpdated: time.Time{}, // Zero - should use created_time - ready: false, - wantShouldPublish: false, - wantReasonContains: "max age not exceeded", - description: "Should use created_time and not publish (5s < 10s max age)", + wantReasonContains: ReasonNeverProcessed, + description: "Should publish immediately (never processed)", }, { - name: "zero lastUpdated - created 31m ago - ready", - createdTime: now.Add(-31 * time.Minute), - lastUpdated: time.Time{}, // Zero - should use created_time + name: "never processed - ready", + createdTime: now.Add(-1 * time.Hour), // Doesn't affect decision + lastUpdated: time.Time{}, // Zero - never processed ready: true, wantShouldPublish: true, - wantReasonContains: "max age exceeded", - description: "Should use created_time and publish (31m > 30m max age)", - }, - { - name: "zero lastUpdated - created 15m ago - ready", - createdTime: now.Add(-15 * time.Minute), - lastUpdated: time.Time{}, // Zero - should use created_time - ready: true, - wantShouldPublish: false, - wantReasonContains: "max age not exceeded", - description: "Should use created_time and not publish (15m < 30m max age)", + wantReasonContains: ReasonNeverProcessed, + description: "Should publish immediately (never processed)", }, { - 
name: "non-zero lastUpdated - should ignore created_time", - createdTime: now.Add(-1 * time.Hour), // Created long ago + name: "processed resource - use lastUpdated for max age", + createdTime: now.Add(-1 * time.Hour), // Irrelevant once processed lastUpdated: now.Add(-5 * time.Second), // Updated recently ready: false, wantShouldPublish: false, wantReasonContains: "max age not exceeded", - description: "Should use lastUpdated, not created_time (5s < 10s max age)", + description: "Should use lastUpdated for max age calculation (5s < 10s max age)", }, } @@ -551,7 +533,7 @@ func TestDecisionEngine_Evaluate_GenerationBasedReconciliation(t *testing.T) { lastUpdated: now, wantShouldPublish: true, wantReasonContains: ReasonGenerationChanged, - description: "First reconciliation should be triggered by generation (never processed)", + description: "Generation mismatch triggers immediate reconciliation", }, // Generation in sync tests - should follow normal max age logic From e89f70e5cd87ec9cc45fbfcec41313af8aa5cde7 Mon Sep 17 00:00:00 2001 From: dawang Date: Fri, 20 Mar 2026 11:49:28 +0800 Subject: [PATCH 2/2] HYPERFLEET-733 - test: add precedence test for overlapping decision checks --- internal/engine/decision_test.go | 88 ++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/internal/engine/decision_test.go b/internal/engine/decision_test.go index 05c1eaf..908fbd0 100644 --- a/internal/engine/decision_test.go +++ b/internal/engine/decision_test.go @@ -624,3 +624,91 @@ func TestDecisionEngine_Evaluate_GenerationBasedReconciliation(t *testing.T) { }) } } + +// TestDecisionEngine_Evaluate_NeverProcessedPrecedence tests the precedence of checks +// when both "never processed" and "generation changed" conditions are true. +// +// For brand-new resources (generation=1, observedGeneration=0, lastUpdated=zero), +// both the LastUpdated.IsZero() check and the generation mismatch check would fire. 
+// This test ensures that the "never processed" check takes precedence since it runs +// first, and protects against future reordering of these checks. +func TestDecisionEngine_Evaluate_NeverProcessedPrecedence(t *testing.T) { + engine := newTestEngine() + now := time.Now() + + tests := []struct { + lastUpdated time.Time + name string + wantReasonContains string + description string + generation int32 + observedGeneration int32 + ready bool + wantShouldPublish bool + }{ + { + name: "brand-new resource - never processed wins over generation changed", + generation: 1, + observedGeneration: 0, + ready: false, + lastUpdated: time.Time{}, // Zero - triggers "never processed" + wantShouldPublish: true, + wantReasonContains: ReasonNeverProcessed, // Should be "never processed", not "generation changed" + description: "Both checks fire, but never-processed check runs first and wins", + }, + { + name: "brand-new resource - ready - never processed wins", + generation: 1, + observedGeneration: 0, + ready: true, + lastUpdated: time.Time{}, // Zero - triggers "never processed" + wantShouldPublish: true, + wantReasonContains: ReasonNeverProcessed, + description: "Both checks fire, but never-processed check runs first and wins (ready)", + }, + { + name: "generation mismatch with non-zero lastUpdated - generation changed wins", + generation: 1, + observedGeneration: 0, + ready: true, + lastUpdated: now, // Non-zero - skips "never processed" check + wantShouldPublish: true, + wantReasonContains: ReasonGenerationChanged, // Should be "generation changed" + description: "Only generation check fires since lastUpdated is non-zero", + }, + { + name: "multiple generation changes on never-processed resource", + generation: 5, + observedGeneration: 0, + ready: false, + lastUpdated: time.Time{}, // Zero - triggers "never processed" + wantShouldPublish: true, + wantReasonContains: ReasonNeverProcessed, // Still "never processed", not "generation changed" + description: "Never-processed 
check takes precedence even with large generation gap", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resource := newTestResourceWithGeneration( + testResourceID, + testResourceKind, + tt.ready, + tt.lastUpdated, + tt.generation, + tt.observedGeneration, + ) + decision := engine.Evaluate(resource, now) + + assertDecision(t, decision, tt.wantShouldPublish, tt.wantReasonContains) + + // Additional context on failure + if t.Failed() { + t.Logf("Test description: %s", tt.description) + t.Logf("Generation: %d, ObservedGeneration: %d", tt.generation, tt.observedGeneration) + t.Logf("LastUpdated.IsZero(): %v", tt.lastUpdated.IsZero()) + t.Logf("Expected reason containing: %q, got: %q", tt.wantReasonContains, decision.Reason) + } + }) + } +}