105 changes: 100 additions & 5 deletions docs/api-resources.md
Expand Up @@ -55,7 +55,7 @@ POST /api/hyperfleet/v1/clusters/{cluster_id}/statuses
"status": "False",
"reason": "AwaitingAdapters",
"message": "Waiting for adapters to report status",
"observed_generation": 0,
"observed_generation": 1,
"created_time": "2025-01-01T00:00:00Z",
"last_updated_time": "2025-01-01T00:00:00Z",
"last_transition_time": "2025-01-01T00:00:00Z"
Expand All @@ -65,7 +65,7 @@ POST /api/hyperfleet/v1/clusters/{cluster_id}/statuses
"status": "False",
"reason": "AwaitingAdapters",
"message": "Waiting for adapters to report status",
"observed_generation": 0,
"observed_generation": 1,
"created_time": "2025-01-01T00:00:00Z",
"last_updated_time": "2025-01-01T00:00:00Z",
"last_transition_time": "2025-01-01T00:00:00Z"
Expand Down Expand Up @@ -249,6 +249,101 @@ The status uses Kubernetes-style conditions instead of a single phase field:
- One adapter reports `Available=False` for `observed_generation=1`: `Available` transitions to `False`
- One adapter reports `Available=False` for `observed_generation=2`: `Available` keeps its `True` status

### Aggregation logic

This section describes the aggregation logic for the resource status conditions. The system consists of:

- An API that stores resource entities (clusters, nodepools)
- A sentinel that polls the API for changes and triggers messages
- Instances of "adapters" that:
  - Read the messages
  - Reconcile the state with the world
  - Report back to the API, using status "conditions"

Each resource keeps track of its status, which is affected by the reports from adapters:

- Each resource keeps a `generation` property that is incremented on every change
- Adapters associated with a resource report their state as an array of adapter conditions
  - Three of these conditions are always mandatory: `Available`, `Applied`, `Health`
  - If one of the mandatory conditions is missing, the report is discarded
  - Each report carries an `observed_generation` field indicating the generation associated with the report
  - Each report carries an `observed_time` for when the adapter work was done
  - If the reported `observed_generation` is lower than the already stored `observed_generation` for that adapter, the report is discarded
- Each resource has a list of associated "adapters" used to compute the aggregated `status.conditions`
- Each resource's `status.conditions` is an array property composed of:
  - The `Available` condition of each adapter, named `<adapter-name>Successful`
  - Two aggregated conditions, `Ready` and `Available`, computed from the adapters' `Available` conditions
  - Only the `Available` condition from adapters is used to compute the aggregated conditions

The whole API spec is at: <https://raw.githubusercontent.com/openshift-hyperfleet/hyperfleet-api/refs/heads/main/openapi/openapi.yaml>

The aggregation logic for a resource (cluster/nodepool) works as follows.

**Notation:**

- `X` = report's `observed_generation`
- `G` = resource's current `generation`
- `statuses[]` = all stored adapter condition reports
- `lut` = aggregated condition's `last_updated_time` (derived from the adapters' `last_report_time`)
- `ltt` = `last_transition_time`
- `obs_gen` = `observed_generation`
- `obs_time` = report's `observed_time`
- `—` = no change

---

#### Discard / Reject Rules

These rules are checked before any aggregation. A discarded or rejected report causes no state change.

| Rule | Condition | Outcome |
|---|---|---|
| `obs_gen` too high | report `observed_generation` > resource `generation` | Discarded |
| Stale adapter report | report `observed_generation` < adapter's stored `observed_generation` | Discarded |
| Missing mandatory conditions | Missing any of `Available`, `Applied`, `Health`, or value not in `{True, False, Unknown}` | Discarded |
| Available=Unknown | Report is valid but `Available=Unknown` | Discarded |
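
The rules in the table above can be sketched as a single pre-aggregation check. This is a minimal illustration, not the service's actual code: the `Condition` and `Report` types, the `discardReason` function, and the order in which the rules are evaluated are all assumptions made for the example.

```go
package main

// Condition is a hypothetical, simplified adapter condition.
type Condition struct {
	Type   string // e.g. "Available", "Applied", "Health"
	Status string // "True", "False" or "Unknown"
}

// Report is a hypothetical, simplified adapter status report.
type Report struct {
	ObservedGeneration int32
	Conditions         []Condition
}

// discardReason returns a non-empty reason when the report must be discarded
// according to the rules table, or "" when it may proceed to aggregation.
// storedObsGen is the adapter's previously stored observed_generation
// (pass -1 when the adapter has never reported). The rule order is assumed.
func discardReason(r Report, resourceGeneration, storedObsGen int32) string {
	if r.ObservedGeneration > resourceGeneration {
		return "obs_gen too high"
	}
	if r.ObservedGeneration < storedObsGen {
		return "stale adapter report"
	}

	valid := map[string]bool{"True": true, "False": true, "Unknown": true}
	found := map[string]string{}
	for _, c := range r.Conditions {
		found[c.Type] = c.Status
	}
	for _, t := range []string{"Available", "Applied", "Health"} {
		status, ok := found[t]
		if !ok || !valid[status] {
			return "missing mandatory conditions"
		}
	}

	if found["Available"] == "Unknown" {
		return "Available=Unknown"
	}
	return ""
}
```

Returning a reason string rather than a boolean keeps the sketch easy to log and test; a real implementation may prefer typed errors.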

---

#### Lifecycle Events

| Event | Condition | Target | → status | → obs_gen | → lut | → ltt |
|---|---|---|---|---|---|---|
| Creation | — | `Ready` | `False` | `1` | `now` | `now` |
| Creation | — | `Available` | `False` | `1` | `now` | `now` |
| Change (→G) | Was `Ready=True` | `Ready` | `False` | `G` | `now` | `now` |
| Change (→G) | Was `Ready=False` | `Ready` | `False` | `G` | `now` | `—` |
| Change (→G) | — | `Available` | unchanged | unchanged | `—` | `—` |
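
The lifecycle rows above can be read as two small state updates. The following sketch assumes a hypothetical `AggregatedCondition` struct and hypothetical `onCreate` / `onChange` helpers; it only mirrors the table and is not taken from the service code.

```go
package main

import "time"

// AggregatedCondition is a hypothetical, simplified resource-level condition.
type AggregatedCondition struct {
	Type               string
	Status             string // "True" or "False"
	ObservedGeneration int32
	LastUpdatedTime    time.Time // "lut" in the tables
	LastTransitionTime time.Time // "ltt" in the tables
}

// onCreate returns the initial Ready and Available conditions for a new
// resource: both False at generation 1, with both timestamps set to now.
func onCreate(now time.Time) (ready, available AggregatedCondition) {
	ready = AggregatedCondition{
		Type: "Ready", Status: "False", ObservedGeneration: 1,
		LastUpdatedTime: now, LastTransitionTime: now,
	}
	available = ready
	available.Type = "Available"
	return ready, available
}

// onChange applies a resource change that bumps the generation to g.
// Ready always ends up False at generation g; its transition time moves only
// when it was previously True. Available is returned untouched.
func onChange(ready, available AggregatedCondition, g int32, now time.Time) (AggregatedCondition, AggregatedCondition) {
	if ready.Status == "True" {
		ready.LastTransitionTime = now // True -> False is a real transition
	}
	ready.Status = "False"
	ready.ObservedGeneration = g
	ready.LastUpdatedTime = now
	return ready, available
}
```

Passing `now` as a parameter, rather than calling `time.Now()` inside the helpers, keeps the sketch deterministic and easy to test.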

---

#### Adapter Report Aggregation Matrix

The **Ready** check and **Available** check are independent — both can apply to the same incoming report.

##### Report `Available=True` (obs_gen = X)

| Target | Current State | Required Condition | → status | → lut | → ltt | → obs_gen |
|---|---|---|---|---|---|---|
| `Ready` | `Ready=True` | `X==G` AND all `statuses[].obs_gen==G` AND all `statuses[].status==True` | unchanged | `min(adapter last_report_time)` | `—` | `—` |
| `Ready` | `Ready=False` | `X==G` AND all `statuses[].obs_gen==G` AND all `statuses[].status==True` | **`True`** | `min(adapter last_report_time)` | `obs_time` | `—` |
| `Ready` | `Ready=False` | Any required adapter has no stored status | `—` | `now` | `—` | `—` |
| `Ready` | any | Conditions above not met | `—` | `—` | `—` | `—` |
| `Available` | `Available=False` | all `statuses[].obs_gen==X` | **`True`** | `min(adapter last_report_time)` | `obs_time` | `X` |
| `Available` | `Available=True` | all `statuses[].obs_gen==X` | unchanged | `min(adapter last_report_time)` | `—` | `X` |
| `Available` | any | Conditions above not met | `—` | `—` | `—` | `—` |

##### Report `Available=False` (obs_gen = X)

| Target | Current State | Required Condition | → status | → lut | → ltt | → obs_gen |
|---|---|---|---|---|---|---|
| `Ready` | `Ready=False` | `X==G` | unchanged | `min(adapter last_report_time)` | `—` | `—` |
| `Ready` | `Ready=True` | `X==G` | **`False`** | `obs_time` | `obs_time` | `—` |
| `Ready` | any | Conditions above not met | `—` | `—` | `—` | `—` |
| `Available` | `Available=False` | all `statuses[].obs_gen==X` | unchanged | `min(adapter last_report_time)` | `—` | `X` |
| `Available` | `Available=True` | all `statuses[].obs_gen==X` | **`False`** | `obs_time` | `obs_time` | `X` |
| `Available` | any | Conditions above not met | `—` | `—` | `—` | `—` |
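
The two matrices above can be sketched as one function that runs the Ready check and the Available check independently. Everything here is illustrative: `AdapterState`, `Aggregate`, `summarize`, and `applyReport` are hypothetical names, the incoming report is assumed to be already stored in `statuses`, and the sketch additionally requires every stored status to be `True` before setting `Available=True`, which the table leaves implicit.

```go
package main

import "time"

// AdapterState is a hypothetical stored summary of one adapter's latest report.
type AdapterState struct {
	ObservedGeneration int32
	Available          bool
	LastReportTime     time.Time
}

// Aggregate is a hypothetical aggregated condition (Ready or Available).
type Aggregate struct {
	Status             bool
	ObservedGeneration int32
	LastUpdatedTime    time.Time
	LastTransitionTime time.Time
}

// summarize reports whether every stored status sits at generation gen,
// whether every stored status is Available=True, and the oldest LastReportTime.
func summarize(statuses map[string]AdapterState, gen int32) (allAtGen, allTrue bool, oldest time.Time) {
	allAtGen, allTrue = len(statuses) > 0, len(statuses) > 0
	first := true
	for _, s := range statuses {
		allAtGen = allAtGen && s.ObservedGeneration == gen
		allTrue = allTrue && s.Available
		if first || s.LastReportTime.Before(oldest) {
			oldest, first = s.LastReportTime, false
		}
	}
	return
}

// applyReport updates Ready and Available after a report (Available=reported,
// observed_generation=x, observed_time=obsTime) has been stored in statuses.
// g is the resource's current generation.
func applyReport(ready, avail Aggregate, statuses map[string]AdapterState, required []string,
	g, x int32, reported bool, obsTime, now time.Time) (Aggregate, Aggregate) {
	missing := false
	for _, name := range required {
		if _, ok := statuses[name]; !ok {
			missing = true
		}
	}
	allAtG, allTrue, oldest := summarize(statuses, g)
	allAtX, _, _ := summarize(statuses, x)

	// Ready check.
	switch {
	case reported && !ready.Status && missing:
		ready.LastUpdatedTime = now
	case reported && x == g && allAtG && allTrue && !missing:
		if !ready.Status {
			ready.Status, ready.LastTransitionTime = true, obsTime
		}
		ready.LastUpdatedTime = oldest
	case !reported && x == g && ready.Status:
		ready.Status = false
		ready.LastUpdatedTime, ready.LastTransitionTime = obsTime, obsTime
	case !reported && x == g:
		ready.LastUpdatedTime = oldest
	}

	// Available check, independent of the Ready check.
	if allAtX {
		switch {
		case reported && allTrue:
			if !avail.Status {
				avail.Status, avail.LastTransitionTime = true, obsTime
			}
			avail.LastUpdatedTime, avail.ObservedGeneration = oldest, x
		case !reported && avail.Status:
			avail.Status = false
			avail.LastUpdatedTime, avail.LastTransitionTime = obsTime, obsTime
			avail.ObservedGeneration = x
		case !reported:
			avail.LastUpdatedTime, avail.ObservedGeneration = oldest, x
		}
	}
	return ready, avail
}
```

When adapters sit at different generations, `allAtX` is false and `Available` is left as it was, which matches the "keeps its `True` status" example earlier in this document.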

## NodePool Management

### Endpoints
Expand Down Expand Up @@ -307,7 +402,7 @@ POST /api/hyperfleet/v1/clusters/{cluster_id}/nodepools/{nodepool_id}/statuses
"status": "False",
"reason": "AwaitingAdapters",
"message": "Waiting for adapters to report status",
"observed_generation": 0,
"observed_generation": 1,
"created_time": "2025-01-01T00:00:00Z",
"last_updated_time": "2025-01-01T00:00:00Z",
"last_transition_time": "2025-01-01T00:00:00Z"
Expand All @@ -317,7 +412,7 @@ POST /api/hyperfleet/v1/clusters/{cluster_id}/nodepools/{nodepool_id}/statuses
"status": "False",
"reason": "AwaitingAdapters",
"message": "Waiting for adapters to report status",
"observed_generation": 0,
"observed_generation": 1,
"created_time": "2025-01-01T00:00:00Z",
"last_updated_time": "2025-01-01T00:00:00Z",
"last_transition_time": "2025-01-01T00:00:00Z"
Expand Down Expand Up @@ -456,7 +551,7 @@ The status object contains synthesized conditions computed from adapter reports:
- All above fields plus:
- `observed_generation` - Generation this condition reflects
- `created_time` - When condition was first created (API-managed)
- `last_updated_time` - When adapter last reported (API-managed, from AdapterStatus.last_report_time)
- `last_updated_time` - When this condition was last refreshed (API-managed). For **Available**, always the evaluation time. For **Ready**: when Ready=True, the minimum of `last_report_time` across all required adapters that report Available=True at the current generation; when Ready=False, the evaluation time (so consumers can detect staleness).
- `last_transition_time` - When status last changed (API-managed)

## Parameter Restrictions
58 changes: 39 additions & 19 deletions pkg/dao/adapter_status.go
Expand Up @@ -19,7 +19,10 @@ type AdapterStatusDao interface {
Get(ctx context.Context, id string) (*api.AdapterStatus, error)
Create(ctx context.Context, adapterStatus *api.AdapterStatus) (*api.AdapterStatus, error)
Replace(ctx context.Context, adapterStatus *api.AdapterStatus) (*api.AdapterStatus, error)
Upsert(ctx context.Context, adapterStatus *api.AdapterStatus) (*api.AdapterStatus, error)
// Upsert creates or replaces an adapter status. The second return value reports whether the
// write was actually applied: false means the incoming observed_generation was stale (either
// detected immediately or lost a race to a concurrent write with a higher generation).
Upsert(ctx context.Context, adapterStatus *api.AdapterStatus) (*api.AdapterStatus, bool, error)
Delete(ctx context.Context, id string) error
FindByResource(ctx context.Context, resourceType, resourceID string) (api.AdapterStatusList, error)
FindByResourcePaginated(
Expand Down Expand Up @@ -72,49 +75,66 @@ func (d *sqlAdapterStatusDao) Replace(
return adapterStatus, nil
}

// Upsert creates or updates an adapter status based on resource_type, resource_id, and adapter
// This implements the upsert semantic required by the new API spec
// Upsert creates or updates an adapter status based on resource_type, resource_id, and adapter.
// The UPDATE path includes a WHERE predicate on observed_generation so that a stale write can
// never overwrite a newer generation, even under concurrent requests.
// Returns (status, true, nil) when the write was applied, or (existing, false, nil) when the
// incoming observed_generation was stale (fast-path check) or lost a race to a concurrent write.
func (d *sqlAdapterStatusDao) Upsert(
ctx context.Context, adapterStatus *api.AdapterStatus,
) (*api.AdapterStatus, error) {
) (*api.AdapterStatus, bool, error) {
g2 := (*d.sessionFactory).New(ctx)

// Try to find existing adapter status
// Try to find existing adapter status.
existing, err := d.FindByResourceAndAdapter(
ctx, adapterStatus.ResourceType, adapterStatus.ResourceID, adapterStatus.Adapter,
)

if err != nil {
// If not found, create new
if errors.Is(err, gorm.ErrRecordNotFound) {
return d.Create(ctx, adapterStatus)
created, createErr := d.Create(ctx, adapterStatus)
if createErr != nil {
return nil, false, createErr
}
return created, true, nil
}
// Other errors
db.MarkForRollback(ctx, err)
return nil, err
return nil, false, err
}

// Fast-path stale check: if the stored generation is already strictly newer, skip the write.
if existing.ObservedGeneration > adapterStatus.ObservedGeneration {
return existing, false, nil
}

// Update existing record
// Keep the original ID and CreatedTime
// Prepare the update: keep original ID and CreatedTime.
adapterStatus.ID = existing.ID
if existing.CreatedTime != nil {
adapterStatus.CreatedTime = existing.CreatedTime
}

// Update LastReportTime to now
// Update LastReportTime to now.
now := time.Now()
adapterStatus.LastReportTime = &now

// Preserve LastTransitionTime for conditions whose status hasn't changed
// Preserve LastTransitionTime for conditions whose status hasn't changed.
adapterStatus.Conditions = preserveLastTransitionTime(existing.Conditions, adapterStatus.Conditions)

// Save (update) the record
if err := g2.Omit(clause.Associations).Save(adapterStatus).Error; err != nil {
db.MarkForRollback(ctx, err)
return nil, err
// Atomic conditional UPDATE: only applies when the stored observed_generation is still <=
// the incoming one. A concurrent request that wrote a higher generation will cause
// RowsAffected to be 0, signalling a no-op.
result := g2.Omit(clause.Associations).
Where("observed_generation <= ?", adapterStatus.ObservedGeneration).
Save(adapterStatus)
if result.Error != nil {
db.MarkForRollback(ctx, result.Error)
return nil, false, result.Error
}
if result.RowsAffected == 0 {
// A concurrent write with a higher generation won the race.
return existing, false, nil
}

return adapterStatus, nil
return adapterStatus, true, nil
}

func (d *sqlAdapterStatusDao) Delete(ctx context.Context, id string) error {
6 changes: 5 additions & 1 deletion pkg/services/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@ func NewClusterService(dao, adapterStatusDao, config) ClusterService
## Status Aggregation

`UpdateClusterStatusFromAdapters()` in `cluster.go` synthesizes two top-level conditions:
- **Available**: True if all required adapters report `Available=True` (any generation)

- **Available**: True when all required adapters report `Available=True` at the same `observed_generation` (not necessarily the current resource generation). When adapters are at different generations, Available preserves its previous value (last-known-good semantics). `ObservedGeneration` = the common adapter generation when consistent; preserved from existing state otherwise.
- **Ready**: True if all adapters report `Available=True` AND `observed_generation` matches current generation

Ready's `LastUpdatedTime` is computed in `status_aggregation.computeReadyLastUpdated`: when Ready=False it is the minimum of `LastReportTime` across all required adapters (falls back to `now` if any required adapter has no stored status yet); when Ready=True it is the minimum of `LastReportTime` across required adapters that have Available=True at the current generation. True→False transitions override this with the triggering adapter's `observedTime`.
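
The rule described in the paragraph above could look roughly like the sketch below. It is not the actual `computeReadyLastUpdated`: the `AdapterSummary` type and `readyLastUpdated` function are hypothetical, a zero `LastReportTime` is assumed to mean "no stored status", and falling back to `now` when no adapter qualifies is an assumption. The True→False override with the triggering adapter's `observedTime` is left out.

```go
package main

import "time"

// AdapterSummary is a hypothetical view of one adapter's stored status.
// A zero LastReportTime is treated as "no stored status yet".
type AdapterSummary struct {
	Available          bool
	ObservedGeneration int32
	LastReportTime     time.Time
}

// readyLastUpdated sketches how Ready's LastUpdatedTime could be derived.
//   - Ready=False: the minimum LastReportTime across all required adapters,
//     or now if any required adapter has no stored status.
//   - Ready=True: the minimum LastReportTime across required adapters that
//     are Available=True at the current generation.
func readyLastUpdated(ready bool, generation int32, required []string,
	stored map[string]AdapterSummary, now time.Time) time.Time {
	var oldest time.Time
	found := false
	for _, name := range required {
		s, ok := stored[name]
		if !ok || s.LastReportTime.IsZero() {
			if !ready {
				return now // a required adapter has not reported yet
			}
			continue
		}
		if ready && (!s.Available || s.ObservedGeneration != generation) {
			continue // only adapters backing Ready=True are considered
		}
		if !found || s.LastReportTime.Before(oldest) {
			oldest, found = s.LastReportTime, true
		}
	}
	if !found {
		return now // assumed fallback when no adapter qualifies
	}
	return oldest
}
```

Using the oldest report time means a consumer can treat `LastUpdatedTime` as a lower bound on how fresh the aggregated `Ready` value is.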

`ProcessAdapterStatus()` validates mandatory conditions (`Available`, `Applied`, `Health`) before persisting. Rejects `Available=Unknown` on subsequent reports (only allowed on first report).

## GenericService

`generic.go` provides `List()` with pagination, search, and ordering.

- `ListArguments` has Page, Size, Search, OrderBy, Fields, Preloads
- Search validation: `SearchDisallowedFields` map blocks searching certain fields per resource type
- Default ordering: `created_time desc`
4 changes: 2 additions & 2 deletions pkg/services/adapter_status.go
Expand Up @@ -71,11 +71,11 @@ func (s *sqlAdapterStatusService) Replace(
func (s *sqlAdapterStatusService) Upsert(
ctx context.Context, adapterStatus *api.AdapterStatus,
) (*api.AdapterStatus, *errors.ServiceError) {
adapterStatus, err := s.adapterStatusDao.Upsert(ctx, adapterStatus)
result, _, err := s.adapterStatusDao.Upsert(ctx, adapterStatus)
if err != nil {
return nil, handleCreateError("AdapterStatus", err)
}
return adapterStatus, nil
return result, nil
}

func (s *sqlAdapterStatusService) Delete(ctx context.Context, id string) *errors.ServiceError {