Commit 5a50fd8

HYPERFLEET-706 - fix: lastUpdateTime for Ready
1 parent 8bafe72 commit 5a50fd8

25 files changed

Lines changed: 3261 additions & 341 deletions

docs/api-resources.md

Lines changed: 95 additions & 1 deletion
@@ -249,6 +249,100 @@ The status uses Kubernetes-style conditions instead of a single phase field:
249249
- One adapter reports `Available=False` for `observed_generation=1`: `Available` transitions to `False`
250250
- One adapter reports `Available=False` for `observed_generation=2`: `Available` keeps its `True` status
251251

252+
### Aggregation logic
253+
254+
This section describes the aggregation logic for resource status conditions. The system involves:
255+
256+
- An API that stores resource entities (clusters, nodepools)
257+
- A sentinel that polls the API for changes and triggers messages
258+
- Instances of "adapters":
259+
- Read the messages
260+
- Reconcile the state with the world
261+
- Report back to the API using status "conditions"
262+
263+
Each resource keeps track of its status, which is affected by the reports from adapters:
264+
265+
- Each resource keeps a `generation` property that is incremented on every change
266+
- Adapters associated with a resource report their state as an array of adapter conditions
267+
- Three of these conditions are mandatory: `Available`, `Applied`, `Health`
268+
- If one of the mandatory conditions is missing, the report is discarded
269+
- An `observed_generation` field indicating the generation associated with the report
270+
- `observed_time` for when the adapter work was done
271+
- If the reported `observed_generation` is lower than the already stored `observed_generation` for that adapter, the report is discarded
272+
- Each resource has a list of associated "adapters" used to compute the aggregated `status.conditions`
273+
- Each resource's `status.conditions` is an array property composed of:
274+
- The `Available` condition of each adapter, named as `<adapter-name>Successful`
275+
- Two aggregated conditions, `Ready` and `Available`, computed from the array of adapter `Available` conditions
276+
- Only the `Available` condition from each adapter is used to compute the aggregated conditions
277+
278+
The whole API spec is at: <https://raw.githubusercontent.com/openshift-hyperfleet/hyperfleet-api/refs/heads/main/openapi/openapi.yaml>
279+
280+
The aggregation logic for a resource (cluster/nodepool) works as follows.
281+
282+
**Notation:**
283+
284+
- `X` = report's `observed_generation`
285+
- `G` = resource's current `generation`
286+
- `statuses[]` = all stored adapter condition reports
287+
- `lut` = `last_update_time`
288+
- `ltt` = `last_transition_time`
289+
- `obs_gen` = `observed_generation`
290+
- `obs_time` = report's `observed_time`
291+
- `` (an empty code span) = no change
292+
293+
---
294+
295+
#### Discard / Reject Rules
296+
297+
Checked before any aggregation. A discarded or rejected report causes no state change.
298+
299+
| Rule | Condition | Outcome |
300+
|---|---|---|
301+
| `obs_gen` too high | report `observed_generation` > resource `generation` | Discarded |
302+
| Stale adapter report | report `observed_generation` < adapter's stored `observed_generation` | Discarded |
303+
| Missing mandatory conditions | Missing any of `Available`, `Applied`, `Health`, or value not in `{True, False, Unknown}` | Discarded |
304+
| Available=Unknown | Report is valid but `Available=Unknown` | Discarded |
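
The discard rules above can be sketched as a single pre-check. This is a minimal illustration under stated assumptions, not the service's implementation: the `Report` type, the `shouldDiscard` function, and the reason strings are hypothetical names invented for this example, and the rules are evaluated in table order, which may differ from the order used in the actual code.

```go
package main

// Report is a hypothetical, simplified view of an adapter status report.
type Report struct {
	ObservedGeneration int32
	// Conditions maps a condition type (e.g. "Available") to its status string.
	Conditions map[string]string
}

// mandatory lists the condition types every report must carry.
var mandatory = []string{"Available", "Applied", "Health"}

// validStatus reports whether s is one of the allowed condition statuses.
func validStatus(s string) bool {
	return s == "True" || s == "False" || s == "Unknown"
}

// shouldDiscard applies the discard rules in table order and returns a
// non-empty reason when the report must be ignored.
// storedGen is the adapter's previously stored observed_generation;
// hasStored is false when this adapter has not reported before.
func shouldDiscard(r Report, resourceGen, storedGen int32, hasStored bool) string {
	if r.ObservedGeneration > resourceGen {
		return "obs_gen too high"
	}
	if hasStored && r.ObservedGeneration < storedGen {
		return "stale adapter report"
	}
	for _, t := range mandatory {
		status, ok := r.Conditions[t]
		if !ok || !validStatus(status) {
			return "missing or invalid mandatory condition: " + t
		}
	}
	if r.Conditions["Available"] == "Unknown" {
		return "Available=Unknown"
	}
	return ""
}
```

An empty return value means the report passes all four rules and may proceed to aggregation.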
305+
306+
---
307+
308+
#### Lifecycle Events
309+
310+
| Event | Condition | Target | → status | → obs_gen | → lut | → ltt |
311+
|---|---|---|---|---|---|---|
312+
| Creation || `Ready` | `False` | `1` | `now` | `now` |
313+
| Creation || `Available` | `False` | `1` | `now` | `now` |
314+
| Change (→G) | Was `Ready=True` | `Ready` | `False` | `G` | `now` | `now` |
315+
| Change (→G) | Was `Ready=False` | `Ready` | `False` | `G` | `now` | `` |
316+
| Change (→G) || `Available` | unchanged | unchanged | `` | `` |
317+
318+
---
319+
320+
#### Adapter Report Aggregation Matrix
321+
322+
The **Ready** check and **Available** check are independent — both can apply to the same incoming report.
323+
324+
##### Report `Available=True` (obs_gen = X)
325+
326+
| Target | Current State | Required Condition | → status | → lut | → ltt | → obs_gen |
327+
|---|---|---|---|---|---|---|
328+
| `Ready` | `Ready=True` | `X==G` AND all `statuses[].obs_gen==G` AND all `statuses[].status==True` | unchanged | `min(statuses[].lut)` | `` | `` |
329+
| `Ready` | `Ready=False` | `X==G` AND all `statuses[].obs_gen==G` AND all `statuses[].status==True` | **`True`** | `min(statuses[].lut)` | `obs_time` | `` |
330+
| `Ready` | any | Conditions above not met | `` | `` | `` | `` |
331+
| `Available` | `Available=False` | all `statuses[].obs_gen==X` | **`True`** | `min(statuses[].lut)` | `obs_time` | `X` |
332+
| `Available` | `Available=True` | all `statuses[].obs_gen==X` | unchanged | `min(statuses[].lut)` | `` | `X` |
333+
| `Available` | any | Conditions above not met | `` | `` | `` | `` |
334+
335+
##### Report `Available=False` (obs_gen = X)
336+
337+
| Target | Current State | Required Condition | → status | → lut | → ltt | → obs_gen |
338+
|---|---|---|---|---|---|---|
339+
| `Ready` | `Ready=False` | `X==G` | unchanged | `min(statuses[].lut)` | `` | `` |
340+
| `Ready` | `Ready=True` | `X==G` | **`False`** | `obs_time` | `obs_time` | `` |
341+
| `Ready` | any | Conditions above not met | `` | `` | `` | `` |
342+
| `Available` | `Available=False` | all `statuses[].obs_gen==X` | unchanged | `min(statuses[].lut)` | `` | `X` |
343+
| `Available` | `Available=True` | all `statuses[].obs_gen==X` | **`False`** | `obs_time` | `obs_time` | `X` |
344+
| `Available` | any | Conditions above not met | `` | `` | `` | `` |
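
As an illustration of how the matrix reads, the sketch below covers only the `Ready` rows of both tables. It is an assumption-laden example, not the project's `BuildSyntheticConditions`: the types, `applyToReady`, `minLut`, and `at` are hypothetical names, `statuses` is assumed to already include the incoming report, and an empty `statuses` slice is treated as "conditions not met".

```go
package main

import "time"

// AdapterStatus is a hypothetical stored adapter Available report.
type AdapterStatus struct {
	ObservedGeneration int32
	Available          bool
	LastUpdateTime     time.Time
}

// Condition is a hypothetical, simplified aggregated Ready condition.
type Condition struct {
	Status             bool
	LastUpdateTime     time.Time
	LastTransitionTime time.Time
}

// minLut returns the earliest LastUpdateTime; statuses must be non-empty.
func minLut(statuses []AdapterStatus) time.Time {
	earliest := statuses[0].LastUpdateTime
	for _, s := range statuses[1:] {
		if s.LastUpdateTime.Before(earliest) {
			earliest = s.LastUpdateTime
		}
	}
	return earliest
}

// allCurrentAndTrue reports whether every status is at generation g and True.
func allCurrentAndTrue(statuses []AdapterStatus, g int32) bool {
	for _, s := range statuses {
		if s.ObservedGeneration != g || !s.Available {
			return false
		}
	}
	return true
}

// applyToReady applies one report (Available=reportAvailable, obs_gen=x) to Ready.
func applyToReady(ready Condition, statuses []AdapterStatus, reportAvailable bool, x, g int32, obsTime time.Time) Condition {
	if x != g || len(statuses) == 0 {
		return ready // conditions not met: no change
	}
	if reportAvailable {
		if !allCurrentAndTrue(statuses, g) {
			return ready // conditions not met: no change
		}
		if !ready.Status {
			ready.Status = true
			ready.LastTransitionTime = obsTime
		}
		ready.LastUpdateTime = minLut(statuses)
		return ready
	}
	if ready.Status {
		// True to False: both timestamps take the report's observed_time.
		ready.Status = false
		ready.LastUpdateTime = obsTime
		ready.LastTransitionTime = obsTime
		return ready
	}
	ready.LastUpdateTime = minLut(statuses)
	return ready
}

// at is a small helper that builds a UTC timestamp from Unix seconds.
func at(sec int64) time.Time { return time.Unix(sec, 0).UTC() }
```

The `Available` rows have the same shape but use the condition `all statuses[].obs_gen==X` and also write `obs_gen`; they are omitted here to keep the sketch short.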
345+
252346
## NodePool Management
253347

254348
### Endpoints
@@ -456,7 +550,7 @@ The status object contains synthesized conditions computed from adapter reports:
456550
- All above fields plus:
457551
- `observed_generation` - Generation this condition reflects
458552
- `created_time` - When condition was first created (API-managed)
459-
- `last_updated_time` - When adapter last reported (API-managed, from AdapterStatus.last_report_time)
553+
- `last_updated_time` - When this condition was last refreshed (API-managed). For **Available**, always the evaluation time. For **Ready**: when Ready=True, the minimum of `last_report_time` across all required adapters that report Available=True at the current generation; when Ready=False, the evaluation time (so consumers can detect staleness).
460554
- `last_transition_time` - When status last changed (API-managed)
461555

462556
## Parameter Restrictions

pkg/config/loader.go

Lines changed: 31 additions & 29 deletions
@@ -227,41 +227,43 @@ func (l *ConfigLoader) validateConfig(config *ApplicationConfig) error {
227227
}
228228

229229
// handleJSONArrayEnvVars processes environment variables containing JSON arrays
230-
// Viper doesn't automatically parse JSON from env vars, so we handle this explicitly
231-
// Used for: HYPERFLEET_ADAPTERS_CLUSTER_ADAPTERS and HYPERFLEET_ADAPTERS_NODEPOOL_ADAPTERS
230+
// Viper doesn't automatically parse JSON from env vars, so we handle this explicitly.
231+
// Each viper key is filled from the first non-empty env var in the list (canonical name first, then aliases).
232232
func (l *ConfigLoader) handleJSONArrayEnvVars(ctx context.Context) error {
233-
// Map of env var name -> viper key
234-
jsonArrayMappings := map[string]string{
235-
EnvPrefix + "_ADAPTERS_REQUIRED_CLUSTER": "adapters.required.cluster",
236-
EnvPrefix + "_ADAPTERS_REQUIRED_NODEPOOL": "adapters.required.nodepool",
233+
// viper key -> ordered list of env var names (first one set wins)
234+
clusterEnvVars := []string{
235+
EnvPrefix + "_ADAPTERS_REQUIRED_CLUSTER",
236+
EnvPrefix + "_CLUSTER_ADAPTERS", // alias for user convenience
237+
}
238+
nodepoolEnvVars := []string{
239+
EnvPrefix + "_ADAPTERS_REQUIRED_NODEPOOL",
240+
EnvPrefix + "_NODEPOOL_ADAPTERS", // alias for user convenience
237241
}
238242

239-
for envVar, viperKey := range jsonArrayMappings {
240-
jsonValue := os.Getenv(envVar)
241-
if jsonValue == "" {
242-
continue
243-
}
244-
245-
// Parse JSON array
246-
var arrayValue []string
247-
if err := json.Unmarshal([]byte(jsonValue), &arrayValue); err != nil {
248-
return fmt.Errorf("failed to parse %s as JSON array: %w (value: %s)", envVar, err, jsonValue)
243+
setFromEnvVars := func(viperKey string, envVars []string) error {
244+
for _, envVar := range envVars {
245+
jsonValue := os.Getenv(envVar)
246+
if jsonValue == "" {
247+
continue
248+
}
249+
var arrayValue []string
250+
if err := json.Unmarshal([]byte(jsonValue), &arrayValue); err != nil {
251+
return fmt.Errorf("failed to parse %s as JSON array: %w (value: %s)", envVar, err, jsonValue)
252+
}
253+
// Set() overrides Viper's auto-env CSV parsing so JSON arrays are correct.
254+
l.viper.Set(viperKey, arrayValue)
255+
logger.With(ctx, "env_var", envVar, "count", len(arrayValue)).Debug("Parsed JSON array from environment")
256+
return nil
249257
}
250-
251-
// Always set the parsed JSON array value to override Viper's auto-env CSV parsing.
252-
// Viper's AutomaticEnv treats comma-separated strings as arrays, incorrectly parsing
253-
// JSON arrays like '["a","b"]' as ["[\"a\"", "\"b\"]"] instead of ["a", "b"].
254-
//
255-
// We use Set() to ensure proper JSON parsing overrides Viper's CSV parsing.
256-
// This maintains ENV > Config > Default precedence for adapters.
257-
//
258-
// NOTE: Adapters currently have no CLI flags (see bindFlags line 494).
259-
// If CLI flags are added in the future, this code needs updating to check
260-
// if the value came from a flag before calling Set().
261-
l.viper.Set(viperKey, arrayValue)
262-
logger.With(ctx, "env_var", envVar, "count", len(arrayValue)).Debug("Parsed JSON array from environment")
258+
return nil
263259
}
264260

261+
if err := setFromEnvVars("adapters.required.cluster", clusterEnvVars); err != nil {
262+
return err
263+
}
264+
if err := setFromEnvVars("adapters.required.nodepool", nodepoolEnvVars); err != nil {
265+
return err
266+
}
265267
return nil
266268
}
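
The "first non-empty env var wins" lookup in the hunk above can be exercised in isolation. The sketch below is a standalone illustration that assumes nothing about Viper: `firstJSONArray` and its `lookup` parameter are hypothetical, and the `APP_`-prefixed variable names in the usage note are placeholders, since the real value of `EnvPrefix` is not shown in this diff.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// firstJSONArray returns the parsed JSON array from the first variable in
// names whose value is non-empty, together with the name that supplied it.
// lookup abstracts os.Getenv so the function can be tested without touching
// the process environment. When no variable is set, it returns a nil slice
// and an empty source.
func firstJSONArray(names []string, lookup func(string) string) ([]string, string, error) {
	for _, name := range names {
		raw := lookup(name)
		if raw == "" {
			continue
		}
		var values []string
		if err := json.Unmarshal([]byte(raw), &values); err != nil {
			return nil, name, fmt.Errorf("failed to parse %s as JSON array: %w", name, err)
		}
		return values, name, nil
	}
	return nil, "", nil
}
```

Passing `os.Getenv` as `lookup` with names such as `APP_ADAPTERS_REQUIRED_CLUSTER` followed by `APP_CLUSTER_ADAPTERS` would give the canonical name priority over the alias. As in the hunk, a malformed value in the first non-empty variable is an error and does not fall through to the alias.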
267269

pkg/services/CLAUDE.md

Lines changed: 3 additions & 1 deletion
@@ -22,9 +22,11 @@ func NewClusterService(dao, adapterStatusDao, config) ClusterService
2222
## Status Aggregation
2323

2424
`UpdateClusterStatusFromAdapters()` in `cluster.go` synthesizes two top-level conditions:
25-
- **Available**: True if all required adapters report `Available=True` (any generation)
25+
- **Available**: True if all required adapters report `Available=True` at ANY generation (last-known-good semantics). `ObservedGeneration` = minimum observed generation across qualifying adapters. When False, `ObservedGeneration` = current resource generation.
2626
- **Ready**: True if all adapters report `Available=True` AND `observed_generation` matches current generation
2727

28+
Ready's `LastUpdatedTime` is computed in `status_aggregation.computeReadyLastUpdated`: when Ready=False it is the minimum of `LastReportTime` across all required adapters (falls back to `now` if any required adapter has no stored status yet); when Ready=True it is the minimum of `LastReportTime` across required adapters that have Available=True at the current generation. True→False transitions override this with the triggering adapter's `observedTime`.
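
The rule described for Ready's `LastUpdatedTime` can be sketched as follows. This is an illustrative reading of the paragraph above, not the body of `computeReadyLastUpdated`, whose signature is not shown here: `readyLastUpdated`, the simplified `AdapterStatus` struct, and the helpers are hypothetical. Two assumptions are made: a nil `LastReportTime` counts as "no stored status", and the function falls back to `now` if no adapter qualifies. The True→False override with `observedTime` is not modeled.

```go
package main

import "time"

// AdapterStatus is a hypothetical, simplified stored adapter status.
type AdapterStatus struct {
	Available          bool
	ObservedGeneration int32
	LastReportTime     *time.Time
}

// readyLastUpdated returns the minimum LastReportTime over the adapters that
// count for the given Ready value:
//   - Ready=False: all required adapters; now if any has no stored status.
//   - Ready=True: required adapters with Available=True at the current generation.
func readyLastUpdated(required []string, stored map[string]AdapterStatus, ready bool, generation int32, now time.Time) time.Time {
	var earliest *time.Time
	for _, name := range required {
		s, ok := stored[name]
		if !ok || s.LastReportTime == nil {
			if !ready {
				return now // Ready=False with a missing status falls back to now
			}
			continue
		}
		if ready && !(s.Available && s.ObservedGeneration == generation) {
			continue
		}
		if earliest == nil || s.LastReportTime.Before(*earliest) {
			earliest = s.LastReportTime
		}
	}
	if earliest == nil {
		return now // assumption: nothing qualified
	}
	return *earliest
}

// at builds a UTC timestamp from Unix seconds; timePtr returns its address.
func at(sec int64) time.Time          { return time.Unix(sec, 0).UTC() }
func timePtr(t time.Time) *time.Time { return &t }
```

Taking the minimum means the aggregated timestamp is only as fresh as the least recently reporting adapter, which lets consumers detect a stale adapter from the top-level condition alone.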
29+
2830
`ProcessAdapterStatus()` validates mandatory conditions (`Available`, `Applied`, `Health`) before persisting. Rejects `Available=Unknown` on subsequent reports (only allowed on first report).
2931

3032
## GenericService

pkg/services/cluster.go

Lines changed: 66 additions & 44 deletions
@@ -96,8 +96,12 @@ func (s *sqlClusterService) Replace(ctx context.Context, cluster *api.Cluster) (
9696
return nil, handleUpdateError("Cluster", err)
9797
}
9898

99-
// REMOVED: Event creation - no event-driven components
100-
return cluster, nil
99+
updatedCluster, svcErr := s.UpdateClusterStatusFromAdapters(ctx, cluster.ID)
100+
if svcErr != nil {
101+
return nil, svcErr
102+
}
103+
104+
return updatedCluster, nil
101105
}
102106

103107
func (s *sqlClusterService) Delete(ctx context.Context, id string) *errors.ServiceError {
@@ -143,9 +147,22 @@ func (s *sqlClusterService) OnDelete(ctx context.Context, id string) error {
143147
return nil
144148
}
145149

146-
// UpdateClusterStatusFromAdapters aggregates adapter statuses into cluster status
150+
// UpdateClusterStatusFromAdapters aggregates adapter statuses into cluster status.
151+
// Uses time.Now() as the observed time (for generation-change recomputations).
152+
// Called from Create/Replace, so isLifecycleChange=true (Available frozen, Ready resets).
147153
func (s *sqlClusterService) UpdateClusterStatusFromAdapters(
148154
ctx context.Context, clusterID string,
155+
) (*api.Cluster, *errors.ServiceError) {
156+
return s.updateClusterStatusFromAdapters(ctx, clusterID, time.Now(), true)
157+
}
158+
159+
// updateClusterStatusFromAdapters is the internal implementation.
160+
// observedTime is the triggering adapter's observed_time (its LastReportTime) and is used
161+
// for transition timestamps in the synthetic conditions.
162+
// isLifecycleChange=true freezes Available and resets Ready.lut=now (Create/Replace path).
163+
// isLifecycleChange=false uses the normal adapter-report aggregation path.
164+
func (s *sqlClusterService) updateClusterStatusFromAdapters(
165+
ctx context.Context, clusterID string, observedTime time.Time, isLifecycleChange bool,
149166
) (*api.Cluster, *errors.ServiceError) {
150167
// Get the cluster
151168
cluster, err := s.clusterDao.Get(ctx, clusterID)
@@ -210,11 +227,14 @@ func (s *sqlClusterService) UpdateClusterStatusFromAdapters(
210227

211228
// Compute synthetic Available and Ready conditions
212229
availableCondition, readyCondition := BuildSyntheticConditions(
230+
ctx,
213231
cluster.StatusConditions,
214232
adapterStatuses,
215233
s.adapterConfig.RequiredClusterAdapters(),
216234
cluster.Generation,
217235
now,
236+
observedTime,
237+
isLifecycleChange,
218238
)
219239

220240
// Combine synthetic conditions with adapter conditions
@@ -238,13 +258,15 @@ func (s *sqlClusterService) UpdateClusterStatusFromAdapters(
238258
return cluster, nil
239259
}
240260

241-
// ProcessAdapterStatus handles the business logic for adapter status:
242-
// - Validates that all mandatory conditions (Available, Applied, Health) are present
243-
// - Rejects duplicate condition types
244-
// - For first status report: accepts Unknown Available condition to avoid data loss
245-
// - For subsequent reports: rejects Unknown Available condition to preserve existing valid state
246-
// - Uses complete replacement semantics: each update replaces all conditions for this adapter
247-
// - Returns (nil, nil) for discarded updates
261+
// ProcessAdapterStatus handles the business logic for adapter status.
262+
// Pre-processing rules applied in order (spec §2):
263+
// - Stale: discards if observed_generation < existing adapter generation
264+
// - P1: discards if observed_generation > resource generation (report ahead of resource)
265+
// - P2: rejects if mandatory conditions (Available, Applied, Health) are missing or have invalid status
266+
// - P3: discards if Available == Unknown (not processed per spec)
267+
//
268+
// Otherwise: upserts the status and triggers aggregation.
269+
// Returns (nil, nil) for discarded/rejected updates.
248270
func (s *sqlClusterService) ProcessAdapterStatus(
249271
ctx context.Context, clusterID string, adapterStatus *api.AdapterStatus,
250272
) (*api.AdapterStatus, *errors.ServiceError) {
@@ -256,65 +278,65 @@ func (s *sqlClusterService) ProcessAdapterStatus(
256278
return nil, errors.GeneralError("Failed to get adapter status: %s", findErr)
257279
}
258280
}
281+
// Stale check: discard if older than the adapter's last recorded generation.
259282
if existingStatus != nil && adapterStatus.ObservedGeneration < existingStatus.ObservedGeneration {
260-
// Discard stale status updates (older observed_generation).
261283
return nil, nil
262284
}
263285

264-
// Parse conditions from the adapter status
286+
// Parse conditions from the adapter status (needed for P2 and P3 before resource fetch).
265287
var conditions []api.AdapterCondition
266288
if len(adapterStatus.Conditions) > 0 {
267289
if err := json.Unmarshal(adapterStatus.Conditions, &conditions); err != nil {
268290
return nil, errors.GeneralError("Failed to unmarshal adapter status conditions: %s", err)
269291
}
270292
}
271293

272-
// Validate mandatory conditions and check for duplicates
294+
// P2: validate mandatory conditions (presence and valid status values).
273295
if errorType, conditionName := ValidateMandatoryConditions(conditions); errorType != "" {
274296
ctx = logger.WithClusterID(ctx, clusterID)
275297
logger.Info(ctx, fmt.Sprintf("Discarding adapter status update from %s: %s condition %s",
276298
adapterStatus.Adapter, errorType, conditionName))
277299
return nil, nil
278300
}
279301

280-
// Check Available condition for Unknown status
281-
triggerAggregation := false
302+
// P3: discard if Available == Unknown (spec §2, all reports).
282303
for _, cond := range conditions {
283-
if cond.Type != api.ConditionTypeAvailable {
284-
continue
285-
}
286-
287-
triggerAggregation = true
288-
if cond.Status == api.AdapterConditionUnknown {
289-
if existingStatus != nil {
290-
// Non-first report && Available=Unknown → reject
291-
ctx = logger.WithClusterID(ctx, clusterID)
292-
logger.Info(ctx, fmt.Sprintf("Discarding adapter status update from %s: subsequent Unknown Available",
293-
adapterStatus.Adapter))
294-
return nil, nil
295-
}
296-
// First report from this adapter: allow storing even with Available=Unknown
297-
// but skip aggregation since Unknown should not affect cluster-level conditions
298-
triggerAggregation = false
304+
if cond.Type == api.ConditionTypeAvailable && cond.Status == api.AdapterConditionUnknown {
305+
ctx = logger.WithClusterID(ctx, clusterID)
306+
logger.Info(ctx, fmt.Sprintf("Discarding adapter status update from %s: Available=Unknown reports are not processed",
307+
adapterStatus.Adapter))
308+
return nil, nil
299309
}
300-
break
301310
}
302311

303-
// Upsert the adapter status (complete replacement)
304-
upsertedStatus, err := s.adapterStatusDao.Upsert(ctx, adapterStatus)
312+
// P1: discard if observed_generation is ahead of the current resource generation.
313+
// Checked after P2/P3 to avoid unnecessary resource fetches for invalid/Unknown reports.
314+
cluster, err := s.clusterDao.Get(ctx, clusterID)
305315
if err != nil {
306-
return nil, handleCreateError("AdapterStatus", err)
316+
return nil, handleGetError("Cluster", "id", clusterID, err)
317+
}
318+
if adapterStatus.ObservedGeneration > cluster.Generation {
319+
ctx = logger.WithClusterID(ctx, clusterID)
320+
logger.Info(ctx, fmt.Sprintf(
321+
"Discarding adapter status update from %s: observed_generation %d > resource generation %d",
322+
adapterStatus.Adapter, adapterStatus.ObservedGeneration, cluster.Generation))
323+
return nil, nil
307324
}
308325

309-
// Only trigger aggregation when triggerAggregation is true
310-
if triggerAggregation {
311-
if _, aggregateErr := s.UpdateClusterStatusFromAdapters(
312-
ctx, clusterID,
313-
); aggregateErr != nil {
314-
// Log error but don't fail the request - the status will be computed on next update
315-
ctx = logger.WithClusterID(ctx, clusterID)
316-
logger.WithError(ctx, aggregateErr).Warn("Failed to aggregate cluster status")
317-
}
326+
// Upsert the adapter status (complete replacement).
327+
upsertedStatus, upsertErr := s.adapterStatusDao.Upsert(ctx, adapterStatus)
328+
if upsertErr != nil {
329+
return nil, handleCreateError("AdapterStatus", upsertErr)
330+
}
331+
332+
// Trigger aggregation using the adapter's observed_time for transition timestamps.
333+
observedTime := time.Now()
334+
if upsertedStatus.LastReportTime != nil {
335+
observedTime = *upsertedStatus.LastReportTime
336+
}
337+
if _, aggregateErr := s.updateClusterStatusFromAdapters(ctx, clusterID, observedTime, false); aggregateErr != nil {
338+
ctx = logger.WithClusterID(ctx, clusterID)
339+
logger.WithError(ctx, aggregateErr).Warn("Failed to aggregate cluster status")
318340
}
319341

320342
return upsertedStatus, nil
