0 {
fmt.Printf(" Workers: %d\n", crd.Workers)
} else {
- fmt.Printf(" Workers: %d (default)\n", kfg.konfig.Cluster().DefaultWorkers)
+ fmt.Printf(" Workers: %d (default)\n", kfg.konfig.Katalog().DefaultWorkers)
}
if crd.Queue.MaxQueueDepth > 0 {
@@ -141,7 +141,7 @@ func printBanner(kfg *orkestraKfg, konductor string) {
fmt.Printf(" Resync: %s\n", crd.Resync.String())
} else {
fmt.Printf(" Resync: %s (default)\n",
- kfg.konfig.Cluster().DefaultResync)
+ kfg.konfig.Katalog().DefaultResync)
}
if len(crd.DependsOn) > 0 {
diff --git a/cmd/internal/konstructor.go b/cmd/internal/konstructor.go
index ba5c3a3a..018644e3 100644
--- a/cmd/internal/konstructor.go
+++ b/cmd/internal/konstructor.go
@@ -288,13 +288,23 @@ func konstructOrkestra(kfg *konfig.Konfig, m *merger.Merger, ctx context.Context
// enabling cross-CRD observation with zero API server calls.
ktrlRegistry := kordinator.NewKordinatorRegistry()
+ // ββ 4e. CRD health map ββββββββββββββββββββββββββββββββββββββββββββββββββββ
+ // One CRDHealth per CRD β shared between the DependencyKordinator
+ // (which updates it on each reconcile) and the HTTP health routes
+ // (which read it on each request). All three reference the same pointers.
+ crdHealthMap := make(map[string]*kordinator.CRDHealth)
+ for _, crd := range kat.Enabled() {
+ gvk := crd.GVK().String()
+ crdHealthMap[gvk] = kordinator.NewCRDHealth(crd.Name)
+ }
+
logger.Debug().Msg("wiring CRDs into kordinator registry...")
finalizers := kfg.Finalizers()
for _, crd := range kat.Enabled() {
crd := crd
gvk := crd.GVK().String()
- crd.Workers = crd.SetWorkers(kfg.Cluster().DefaultWorkers)
+ crd.Workers = crd.SetWorkers(kfg.Katalog().DefaultWorkers)
object, _ := crd.GetRuntimeObjects()
@@ -408,6 +418,7 @@ func konstructOrkestra(kfg *konfig.Konfig, m *merger.Merger, ctx context.Context
return objCopy.DeepCopyObject().(domain.Object)
},
ktrlRegistry, // cross-CRD informer lookup via GetInformerByName
+ crdHealthMap, // cross-CRD health map via HealthProvider
providerRegistry, // aws:, mongodb:, etc. block dispatch
pStats, // per-CRD provider error rate tracking
)
@@ -432,17 +443,7 @@ func konstructOrkestra(kfg *konfig.Konfig, m *merger.Merger, ctx context.Context
logger.Debug().Str("gvk", gvk).Msg("CRD registered")
}
- // ββ 5a. CRD health map ββββββββββββββββββββββββββββββββββββββββββββββββββββ
- // One CRDHealth per CRD β shared between the DependencyKordinator
- // (which updates it on each reconcile) and the HTTP health routes
- // (which read it on each request). All three reference the same pointers.
- crdHealthMap := make(map[string]*kordinator.CRDHealth)
- for _, crd := range kat.Enabled() {
- gvk := crd.GVK().String()
- crdHealthMap[gvk] = kordinator.NewCRDHealth(crd.Name)
- }
-
- // ββ 5b. HTTP routes βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
+ // ββ 5. HTTP routes βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
// All routes registered before hs.Start() β the mux is shared.
//
// Per-CRD routes:
@@ -549,9 +550,9 @@ func konstructOrkestra(kfg *konfig.Konfig, m *merger.Merger, ctx context.Context
defaultWq,
crdHealthMap,
orkHealth,
- kfg.Cluster().DefaultWorkers,
+ kfg.Katalog().DefaultWorkers,
katalog.NewDependencyGraph(kat),
- kfg.Cluster().ShutdownTimeout,
+ kfg.Katalog().ShutdownTimeout,
)
// ββ 7. Komponent list βββββββββββββββββββββββββββββββββββββββββββββββββββββ
@@ -577,7 +578,7 @@ func konstructOrkestra(kfg *konfig.Konfig, m *merger.Merger, ctx context.Context
// On OS signal (SIGTERM/SIGINT) or fatal error, calls Stop() in reverse.
// Graceful shutdown: drains queues before stopping workers.
o := ork.NewOrkestra(
- kfg.Cluster().ShutdownGracePeriod,
+ kfg.Katalog().ShutdownGracePeriod,
kfg.Ork().LogLevel,
)
o.Register(komponents)
diff --git a/docs/profiles/autoscale-profile.md b/docs/profiles/autoscale-profile.md
index d60f2dcb..590ca44f 100644
--- a/docs/profiles/autoscale-profile.md
+++ b/docs/profiles/autoscale-profile.md
@@ -3,7 +3,14 @@
An autoscale profile is a named preset that expands into a complete `operatorBox.autoscale` block at Katalog load time.
-Autoscale profiles are relative. They use the CRD's declared `workers` and `queue.maxQueueDepth` as a **baseline** and derive thresholds, overrides, and timing from it. The same profile behaves differently for a low-throughput operator with 2 workers than for a high-throughput operator with 20.
+Profiles are **relative**. They use the CRDβs declared `workers` and `queue.maxQueueDepth` as a **baseline**, then compute:
+
+- **override workers**
+- **override queueDepth**
+- **thresholds** (as a % of the override queueDepth)
+- **interval + cooldown**
+
+The same profile behaves differently for an operator with 2 workers than for one with 20.
---
@@ -11,17 +18,49 @@ Autoscale profiles are relative. They use the CRD's declared `workers` and `queu
| Profile | Trigger | Workers Override | Queue Override | Interval | Cooldown |
|---|---|---|---|---|---|
-| `burst` | queueDepth > baseline Γ 3 | baseline Γ 4 | baseline Γ 10 | 5s | 30s |
-| `steady` | queueDepth > baseline Γ 1.5 AND workers busy > 70% | baseline Γ 2 | baseline Γ 3 | 30s | 2m |
+| `burst` | queueDepth > 60% of override queueDepth | baseline Γ 4 | baseline Γ 10 | 5s | 30s |
+| `steady` | queueDepth > 40% of override queueDepth AND workersBusy > 70% | baseline Γ 2 | baseline Γ 3 | 30s | 2m |
| `batch` | cron window 23:00 β 02:00 | baseline Γ 3 | baseline Γ 8 | 60s | 5m |
| `latency-sensitive` | P95 reconcile > 200ms | βbaseline Γ 2.5β | β | 15s | 1m |
-| `cost-optimized` | workers idle > 60% | max(1, baseline Γ 0.5) | baseline Γ 0.5 | 30s | 10m |
+| `cost-optimized` | workersIdle > 60% AND queueDepth < 80% of override queueDepth | max(1, baseline Γ 0.5) | baseline Γ 0.5 | 30s | 10m |
---
-## Usage
+## How profiles compute values
+
+Given:
+
+- baseline workers = `W`
+- baseline queueDepth = `Q`
+- maxQueueDepth = `M`
+- profile multipliers = `WorkerMultiplier`, `QueueMultiplier`
+- threshold percentage = `QueueThresholdPct`
+
+Profiles compute:
+
+### **1. Override queueDepth**
+```
+overrideQueueDepth = Q Γ QueueMultiplier
+```
+
+### **2. Threshold**
+```
+threshold = overrideQueueDepth Γ QueueThresholdPct
+```
+
+### **3. Override workers**
+```
+overrideWorkers = W Γ WorkerMultiplier
+```
+
+### **4. Interval + Cooldown**
+Taken directly from the profile.
+
+This ensures scaling happens **before** hitting the effective queue limit.
-Set `profile` inside the `autoscale:` block under `operatorBox:`:
+---
+
+## Usage
```yaml
operatorBox:
@@ -32,20 +71,31 @@ operatorBox:
profile: steady
```
-The profile expands using `workers: 4` and `maxQueueDepth: 100` as the baseline.
+For `steady`, with baseline:
+
+- workers = 4
+- queueDepth = 100
+
+The profile expands to:
-For `steady`, that produces:
+- override workers = 4 Γ 2 = **8**
+- override queueDepth = 100 Γ 3 = **300**
+- threshold = 300 Γ 0.40 = **120**
+- interval = 30s
+- cooldown = 2m
-- Trigger: queueDepth > 150 AND workersBusyPercent > 70
-- Override: workers β 8, queueDepth β 300
-- Interval: 30s, Cooldown: 2m
+Trigger:
+
+```
+queueDepth > 120 AND workersBusyPercent > 70
+```
---
## Rules
**Profile or explicit β not both.**
-A `profile:` cannot coexist with manual `autoscale` fields (`interval:`, `cooldown:`, `conditions:`, `do:`) on the same CRD. Use one or the other.
+A `profile:` cannot coexist with manual autoscale fields (`interval:`, `cooldown:`, `conditions:`, `do:`).
```yaml
# Valid β profile only
@@ -79,37 +129,33 @@ An unrecognized profile name is a Katalog load error.
### `burst`
*React instantly to spikes.*
-Short intervals, aggressive overrides. Intended for operators that see sudden flood events β mass ingestion, fan-out operations, event storms.
+Aggressive scaling, short interval, short cooldown.
```
-trigger: queueDepth > workers Γ 3
+trigger: queueDepth > 60% of override queueDepth
override: workers Γ 4, queueDepth Γ 10
timing: interval 5s, cooldown 30s
```
-Reverts quickly once the queue drains. Use when bursts are short-lived.
-
---
### `steady`
*Smooth, predictable scaling.*
-Two-condition trigger prevents scaling on noise β both queue depth and worker saturation must rise together. Moderate overrides. Long cooldown.
+Requires both queue pressure and worker saturation.
```
-trigger: queueDepth > baseline Γ 1.5 AND workersBusyPercent > 70
+trigger: queueDepth > 40% of override queueDepth AND workersBusyPercent > 70
override: workers Γ 2, queueDepth Γ 3
timing: interval 30s, cooldown 2m
```
-Good default for operators that serve consistent API load.
-
---
### `batch`
*Scale for a nightly processing window.*
-Time-triggered via cron. Activates at 23:00, stays active for 3 hours, then reverts. No metric conditions.
+Time-triggered only.
```
trigger: cron "0 23 * * *", duration 3h
@@ -117,14 +163,12 @@ override: workers Γ 3, queueDepth Γ 8
timing: interval 60s, cooldown 5m
```
-Use for ETL operators, nightly report generators, scheduled cleanup jobs.
-
---
### `latency-sensitive`
*Keep reconcile latency low.*
-Triggers on P95 reconcile duration β not queue depth. Adds workers before the queue even builds. Only modifies worker count, not queue.
+Triggered by P95 latency, not queue depth.
```
trigger: reconcileDurationP95Ms > 200
@@ -132,37 +176,27 @@ override: workers Γ 2.5 (ceiling)
timing: interval 15s, cooldown 1m
```
-Use for operators where response time matters more than throughput β admission controllers, cert rotators, DNS operators.
-
---
### `cost-optimized`
*Minimize resource usage during low activity.*
-Inverted logic β triggers when workers are mostly idle. Reduces both workers and queue depth. Long cooldown prevents flapping.
+Triggers when idle and queue is low.
```
-trigger: workersIdlePercent > 60
+trigger: workersIdlePercent > 60 AND queueDepth < 80% of override queueDepth
override: max(1, workers Γ 0.5), queueDepth Γ 0.5
timing: interval 30s, cooldown 10m
```
-Use for operators that see highly variable load and you want to conserve resources during quiet periods.
-
---
## Choosing a profile
-Ask these questions:
-
-1. **Does this operator see sudden spikes?** β `burst`
-2. **Does this operator run a scheduled nightly job?** β `batch`
-3. **Does reconcile latency matter more than throughput?** β `latency-sensitive`
-4. **Is the load consistent and predictable?** β `steady`
-5. **Is the load low and I want to minimize footprint?** β `cost-optimized`
-
-If none fit cleanly, write the autoscale block manually. Profiles are shortcuts, not constraints.
-
----
+1. **Sudden spikes?** β `burst`
+2. **Consistent load?** β `steady`
+3. **Nightly batch window?** β `batch`
+4. **Latency matters?** β `latency-sensitive`
+5. **Save resources?** β `cost-optimized`
-**Next β** [Probe Profile](./probe-profile.md)
+If none fit, write the autoscale block manually.
\ No newline at end of file
diff --git a/examples/advanced/12-autoscale/01-based-on-own-metrics/load.sh b/examples/advanced/12-autoscale/01-based-on-own-metrics/load.sh
deleted file mode 100755
index de8e1f3c..00000000
--- a/examples/advanced/12-autoscale/01-based-on-own-metrics/load.sh
+++ /dev/null
@@ -1,11 +0,0 @@
-for i in $(seq 1 200); do
- kubectl apply -f - <
Note:
+> You can override the default queue limit by setting `queue.maxQueueDepth`.
+
+---
+
+## Load the Ingestor
+
+Now letβs overload the operator with **150 Ingestor resources**.
+
+```bash
+./load.sh 150
+```
+
+Observe in the Control Center:
+
+- Queue depth rises
+- Workers stay busy
+- Once the queue reaches **100**, new items are **dropped** (default behaviour)
+
+### Expected logs
+
+```json
+{"level":"warn","key":"default/ingestor-123","gvk":"autoscale.orkestra.io/v1alpha1, Kind=Ingestor","limit":100,"depth":100,"message":"enqueue: queue depth limit reached β item dropped"}
+{"level":"warn","key":"default/ingestor-124","gvk":"autoscale.orkestra.io/v1alpha1, Kind=Ingestor","limit":100,"depth":100,"message":"enqueue: queue depth limit reached β item dropped"}
+{"level":"warn","key":"default/ingestor-125","gvk":"autoscale.orkestra.io/v1alpha1, Kind=Ingestor","limit":100,"depth":100,"message":"enqueue: queue depth limit reached β item dropped"}
+```
+
+Control Center will show:
+
+- **100/100**
+- **99/100**
+
+No errors β the operator simply drops excess events once the queue is full.
+
+This is **normal behaviour** for operators **without autoscale**.
+
+---
+
+## What You Learned
+
+- How Orkestra handles queue pressure without autoscaling
+- How workers behave under load
+- How the Control Center visualises queue depth and worker activity
+- Why items get dropped when queueDepth is exceeded
+
+In the **next example**, youβll enable autoscaling and see how Orkestra prevents dropped items by dynamically scaling workers and adjusting queueDepth.
+
+---
+
+## Cleanup
+
+```bash
+chmod +x cleanup.sh && ./cleanup.sh
+```
+
+> This will take a few seconds to drain the queue and finalize cleanup.
\ No newline at end of file
diff --git a/examples/advanced/12-autoscale/01-based-on-own-metrics/cleanup.sh b/examples/advanced/12-autoscale/01-without-autoscaler/cleanup.sh
similarity index 83%
rename from examples/advanced/12-autoscale/01-based-on-own-metrics/cleanup.sh
rename to examples/advanced/12-autoscale/01-without-autoscaler/cleanup.sh
index 3ff00d03..a15dd187 100755
--- a/examples/advanced/12-autoscale/01-based-on-own-metrics/cleanup.sh
+++ b/examples/advanced/12-autoscale/01-without-autoscaler/cleanup.sh
@@ -1,6 +1,6 @@
#!/usr/bin/env bash
set -euo pipefail
-echo "Cleaning up 11-autoscale / 01-based-on-own-metrics..."
+echo "Cleaning up 12-autoscale / 01-without-autoscaler..."
# Delete any load CRs first
kubectl delete ingestors -l orkestra.load=true --ignore-not-found 2>/dev/null || true
diff --git a/examples/advanced/12-autoscale/01-based-on-own-metrics/cr.yaml b/examples/advanced/12-autoscale/01-without-autoscaler/cr.yaml
similarity index 100%
rename from examples/advanced/12-autoscale/01-based-on-own-metrics/cr.yaml
rename to examples/advanced/12-autoscale/01-without-autoscaler/cr.yaml
diff --git a/examples/advanced/12-autoscale/01-based-on-own-metrics/crd.yaml b/examples/advanced/12-autoscale/01-without-autoscaler/crd.yaml
similarity index 100%
rename from examples/advanced/12-autoscale/01-based-on-own-metrics/crd.yaml
rename to examples/advanced/12-autoscale/01-without-autoscaler/crd.yaml
diff --git a/examples/advanced/12-autoscale/01-without-autoscaler/katalog.yaml b/examples/advanced/12-autoscale/01-without-autoscaler/katalog.yaml
new file mode 100644
index 00000000..2e498f68
--- /dev/null
+++ b/examples/advanced/12-autoscale/01-without-autoscaler/katalog.yaml
@@ -0,0 +1,42 @@
+apiVersion: orkestra.orkspace.io/v1
+kind: Katalog
+metadata:
+ name: without-autoscaler
+ author: orkspace
+ version: 0.1.0
+ description: >
+ Same Ingestor pattern but without the autoscaler enabled.
+ Useful for demonstrating baseline behaviour before autoscaling
+ is introduced.
+
+spec:
+ crds:
+ ingestor:
+ apiTypes:
+ group: autoscale.orkestra.io
+ version: v1alpha1
+ kind: Ingestor
+ plural: ingestors
+
+ workers: 2
+ resync: 60s
+
+ operatorBox:
+ status:
+ fields:
+ - path: phase
+ value: "Running"
+ - path: autoscaleActive
+ value: "{{ .metrics.autoscaleActive }}"
+ - path: workers
+ value: "{{ .metrics.workers }}"
+
+ onCreate:
+ deployments:
+ - image: "{{ .spec.image }}"
+ replicas: "{{ .spec.replicas }}"
+ port: 8080
+ reconcile: true
+ services:
+ - port: "8080"
+ reconcile: true
diff --git a/examples/advanced/12-autoscale/01-without-autoscaler/load.sh b/examples/advanced/12-autoscale/01-without-autoscaler/load.sh
new file mode 100755
index 00000000..cb9adae8
--- /dev/null
+++ b/examples/advanced/12-autoscale/01-without-autoscaler/load.sh
@@ -0,0 +1,122 @@
+#!/usr/bin/env bash
+# load.sh β generic CR loader for autoscale testing.
+#
+# Usage:
+# ./load.sh
+# ./load.sh up
+# ./load.sh down
+#
+# Defaults:
+# CRD = ingestor
+#
+# Examples:
+# ./load.sh processor 200
+# ./load.sh loader up 150
+# ./load.sh auditor down 10
+# ./load.sh 100 # defaults to ingestor
+# ./load.sh down 0 # defaults to ingestor
+
+set -euo pipefail
+
+DEFAULT_CRD="ingestor"
+
+CRD="${1:-$DEFAULT_CRD}"
+CMD="${2:-up}"
+ARG="${3:-}"
+
+# Shorthand: ./load.sh 200
+if [[ "$CRD" =~ ^[0-9]+$ ]]; then
+ ARG="$CRD"
+ CRD="$DEFAULT_CRD"
+ CMD="up"
+fi
+
+# Shorthand: ./load.sh processor 200
+if [[ "$CMD" =~ ^[0-9]+$ ]]; then
+ ARG="$CMD"
+ CMD="up"
+fi
+
+# Valid CRDs
+VALID_CRDS=("ingestor" "loader" "processor" "auditor")
+
+is_valid_crd() {
+ local x
+ for x in "${VALID_CRDS[@]}"; do
+ [[ "$x" == "$1" ]] && return 0
+ done
+ return 1
+}
+
+print_help() {
+ echo "Usage:"
+ echo " $0 "
+ echo " $0 up "
+ echo " $0 down "
+ echo
+ echo "Valid CRDs: ${VALID_CRDS[*]}"
+ echo "(crd defaults to '${DEFAULT_CRD}')"
+}
+
+# Validate CRD
+if ! is_valid_crd "$CRD"; then
+ echo "β Unknown CRD '$CRD'"
+ print_help
+ exit 1
+fi
+
+# Validate command
+if [[ "$CMD" != "up" && "$CMD" != "down" ]]; then
+ echo "β Unknown command '$CMD'"
+ print_help
+ exit 1
+fi
+
+# Emit correct spec for each CRD
+emit_spec() {
+ case "$1" in
+ loader|processor|ingestor)
+ cat < Note:
+> You can override the default queue limit by setting `queue.maxQueueDepth`.
+
+---
+
+## Load the Ingestor
+
+Now letβs overload the operator with **150 Ingestor resources**.
+
+```bash
+./load.sh 150
+```
+
+Observe in the Control Center:
+
+- Queue depth rises
+- Workers become fully busy
+- Once the queue stays above **80** for the `autoscale.interval` (15s), autoscaling activates
+- **Autoscaler Baseline** switches to **Autoscaler Override Active**
+- Youβll see:
+ - queueDepth β **500**
+ - workers β **8**
+ - resync β **10s**
+ - **no items dropped**
+
+In the *Worker Pool* section:
+
+```
+8 of 8 workers actively processing (scaled from 2)
+```
+
+### Expected logs
+
+```json
+{"level":"info","crd":"autoscale.orkestra.io/v1alpha1, Kind=Ingestor","workers":8,"message":"autoscaler: worker pool resized"}
+{"level":"info","crd":"autoscale.orkestra.io/v1alpha1, Kind=Ingestor","queueDepth":500,"message":"autoscaler: queue depth limit updated"}
+{"level":"info","crd":"autoscale.orkestra.io/v1alpha1, Kind=Ingestor","resync":10000,"message":"autoscaler: resync interval updated"}
+{"level":"info","crd":"Ingestor","workers":8,"queueDepth":500,"resync":10000,"message":"autoscaler: override applied"}
+```
+
+---
+
+## Reduce the Load
+
+Bring the load down to 50:
+
+```bash
+./load.sh down 50
+```
+
+What happens:
+
+- Queue depth begins to fall
+- Workers remain at **8** until the queue stays below **80%** for the entire `autoscale.cooldown` (2 minutes)
+- After cooldown, autoscaler restores the baseline
+
+### Expected logs
+
+```json
+{"level":"info","crd":"autoscale.orkestra.io/v1alpha1, Kind=Ingestor","workers":2,"message":"autoscaler: worker pool resized"}
+{"level":"info","crd":"autoscale.orkestra.io/v1alpha1, Kind=Ingestor","queueDepth":0,"message":"autoscaler: queue depth limit updated"}
+{"level":"info","crd":"autoscale.orkestra.io/v1alpha1, Kind=Ingestor","resync":60000,"message":"autoscaler: baseline restored"}
+```
+
+This is how Orkestra performs **perβOperatorBox autoscaling**:
+no restart, no redeployment β everything happens live at runtime based on your declarative katalog.
+
+---
+
+## Cleanup
+
+```bash
+chmod +x cleanup.sh && ./cleanup.sh
+```
\ No newline at end of file
diff --git a/examples/advanced/12-autoscale/02-based-on-own-metrics/cleanup.sh b/examples/advanced/12-autoscale/02-based-on-own-metrics/cleanup.sh
new file mode 100755
index 00000000..ad6aeee5
--- /dev/null
+++ b/examples/advanced/12-autoscale/02-based-on-own-metrics/cleanup.sh
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+set -euo pipefail
+echo "Cleaning up 12-autoscale / 02-based-on-own-metrics..."
+
+# Delete any load CRs first
+kubectl delete ingestors -l orkestra.load=true --ignore-not-found 2>/dev/null || true
+
+kubectl delete -f cr.yaml --ignore-not-found
+kubectl delete -f crd.yaml --ignore-not-found
+
+echo "β Done. Stop 'ork run' with Ctrl+C if still running."
diff --git a/examples/advanced/12-autoscale/02-based-on-own-metrics/cr.yaml b/examples/advanced/12-autoscale/02-based-on-own-metrics/cr.yaml
new file mode 100644
index 00000000..19975ceb
--- /dev/null
+++ b/examples/advanced/12-autoscale/02-based-on-own-metrics/cr.yaml
@@ -0,0 +1,8 @@
+apiVersion: autoscale.orkestra.io/v1alpha1
+kind: Ingestor
+metadata:
+ name: my-ingestor
+ namespace: default
+spec:
+ image: nginx:stable-alpine
+ replicas: 1
diff --git a/examples/advanced/12-autoscale/02-based-on-own-metrics/crd.yaml b/examples/advanced/12-autoscale/02-based-on-own-metrics/crd.yaml
new file mode 100644
index 00000000..63ea856b
--- /dev/null
+++ b/examples/advanced/12-autoscale/02-based-on-own-metrics/crd.yaml
@@ -0,0 +1,43 @@
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ name: ingestors.autoscale.orkestra.io
+spec:
+ group: autoscale.orkestra.io
+ versions:
+ - name: v1alpha1
+ served: true
+ storage: true
+ subresources:
+ status: {}
+ additionalPrinterColumns:
+ - name: Image
+ type: string
+ jsonPath: .spec.image
+ - name: Phase
+ type: string
+ jsonPath: .status.phase
+ - name: Age
+ type: date
+ jsonPath: .metadata.creationTimestamp
+ schema:
+ openAPIV3Schema:
+ type: object
+ properties:
+ spec:
+ type: object
+ required: [image]
+ properties:
+ image:
+ type: string
+ replicas:
+ type: integer
+ default: 1
+ status:
+ type: object
+ x-kubernetes-preserve-unknown-fields: true
+ names:
+ kind: Ingestor
+ plural: ingestors
+ singular: ingestor
+ scope: Namespaced
diff --git a/examples/advanced/12-autoscale/01-based-on-own-metrics/katalog.yaml b/examples/advanced/12-autoscale/02-based-on-own-metrics/katalog.yaml
similarity index 85%
rename from examples/advanced/12-autoscale/01-based-on-own-metrics/katalog.yaml
rename to examples/advanced/12-autoscale/02-based-on-own-metrics/katalog.yaml
index 4dfa07a1..89a655f4 100644
--- a/examples/advanced/12-autoscale/01-based-on-own-metrics/katalog.yaml
+++ b/examples/advanced/12-autoscale/02-based-on-own-metrics/katalog.yaml
@@ -2,6 +2,8 @@ apiVersion: orkestra.orkspace.io/v1
kind: Katalog
metadata:
name: autoscale-own-metrics
+ author: orkspace
+ version: 0.1.0
description: >
Autoscaling based on the CRD's own queue depth.
When the Ingestor's reconcile queue exceeds 80 items, Orkestra
@@ -37,6 +39,10 @@ spec:
fields:
- path: phase
value: "Running"
+ - path: autoscaleActive
+ value: "{{ .metrics.autoscaleActive }}"
+ - path: workers
+ value: "{{ .metrics.workers }}"
onCreate:
deployments:
diff --git a/examples/advanced/12-autoscale/02-based-on-own-metrics/load.sh b/examples/advanced/12-autoscale/02-based-on-own-metrics/load.sh
new file mode 100755
index 00000000..cb9adae8
--- /dev/null
+++ b/examples/advanced/12-autoscale/02-based-on-own-metrics/load.sh
@@ -0,0 +1,122 @@
+#!/usr/bin/env bash
+# load.sh β generic CR loader for autoscale testing.
+#
+# Usage:
+# ./load.sh
+# ./load.sh up
+# ./load.sh down
+#
+# Defaults:
+# CRD = ingestor
+#
+# Examples:
+# ./load.sh processor 200
+# ./load.sh loader up 150
+# ./load.sh auditor down 10
+# ./load.sh 100 # defaults to ingestor
+# ./load.sh down 0 # defaults to ingestor
+
+set -euo pipefail
+
+DEFAULT_CRD="ingestor"
+
+CRD="${1:-$DEFAULT_CRD}"
+CMD="${2:-up}"
+ARG="${3:-}"
+
+# Shorthand: ./load.sh 200
+if [[ "$CRD" =~ ^[0-9]+$ ]]; then
+ ARG="$CRD"
+ CRD="$DEFAULT_CRD"
+ CMD="up"
+fi
+
+# Shorthand: ./load.sh processor 200
+if [[ "$CMD" =~ ^[0-9]+$ ]]; then
+ ARG="$CMD"
+ CMD="up"
+fi
+
+# Valid CRDs
+VALID_CRDS=("ingestor" "loader" "processor" "auditor")
+
+is_valid_crd() {
+ local x
+ for x in "${VALID_CRDS[@]}"; do
+ [[ "$x" == "$1" ]] && return 0
+ done
+ return 1
+}
+
+print_help() {
+ echo "Usage:"
+ echo " $0 "
+ echo " $0 up "
+ echo " $0 down "
+ echo
+ echo "Valid CRDs: ${VALID_CRDS[*]}"
+ echo "(crd defaults to '${DEFAULT_CRD}')"
+}
+
+# Validate CRD
+if ! is_valid_crd "$CRD"; then
+ echo "β Unknown CRD '$CRD'"
+ print_help
+ exit 1
+fi
+
+# Validate command
+if [[ "$CMD" != "up" && "$CMD" != "down" ]]; then
+ echo "β Unknown command '$CMD'"
+ print_help
+ exit 1
+fi
+
+# Emit correct spec for each CRD
+emit_spec() {
+ case "$1" in
+ loader|processor|ingestor)
+ cat <.metrics.\*** to read sibling metrics in real time.
+- How **profileβbased autoscaling** differs from **explicit autoscaling**.
+- How downstream CRDs (Processor, Auditor) react differently to load on the upstream Loader.
+- How Orkestra performs **crossβoperator dependency scaling** without restarts, redeployments, or external metrics.
+- How the Control Center visualizes **baseline vs override** for multiple CRDs simultaneously.
+
+---
+
+## Prerequisites
+
+- Ork CLI
+- Kubernetes cluster (Kind works great)
+
+Install Ork CLI:
+
+```bash
+curl get.orkestra.sh | bash
+```
+
+Create a Kind cluster and run this example:
+
+```bash
+ork run -f katalog --dev
+```
+
+Start the Control Center:
+
+```bash
+ork control start
+```
+
+Visit: **http://localhost:8081**
+
+---
+
+## Run the Example
+
+### **1. Apply the CRDs**
+
+```bash
+kubectl apply -f crd-loader.yaml
+kubectl apply -f crd-processor.yaml
+kubectl apply -f crd-auditor.yaml
+```
+
+### **2. Apply the CRs**
+
+```bash
+kubectl apply -f cr-loader.yaml
+kubectl apply -f cr-processor.yaml
+kubectl apply -f cr-auditor.yaml
+```
+
+Watch in the Control Center as each CRD creates its Deployment/Service and begins reconciling.
+
+> **Note:**
+> Only **Loader** has *no autoscaler*.
+> Processor and Auditor both autoscale β but in *different ways*.
+
+---
+
+# Load the Processor (ProfileβBased Autoscale)
+
+```bash
+./load.sh processor up 100
+```
+
+Observe:
+
+- Processor queue grows
+- Processor autoscaler (profile: burst) activates
+- Workers scale from **2 β 8**
+- QueueDepth expands from **100 β 1000**
+- Resync tightens from **30s β 5s**
+- Loader and Auditor remain unaffected
+
+This demonstrates **profileβbased autoscaling on own metrics**.
+
+---
+
+# Load the Loader (CrossβCRD Autoscale)
+
+```bash
+./load.sh loader up 100
+```
+
+Observe:
+
+- Loader queue grows
+- Once queue hits **75+ for 30s**, Auditor autoscaler activates
+- Auditor workers scale **1 β 4**
+- Auditor resync tightens **60s β 15s**
+- Processor remains unaffected
+
+This demonstrates **crossβCRD autoscaling** using sibling metrics.
+
+---
+
+# Reduce the Load
+
+```bash
+./load.sh loader down 10
+```
+
+Notes:
+
+- Loader queue drains slowly β this is expected.
+- Auditor remains scaled up until Loader queue stays **below 75** for the full **5βminute cooldown**.
+- After cooldown, Auditor returns to baseline (1 worker, 60s resync).
+
+This is **runtime dependency scaling** without any explicit `dependsOn` configuration.
+
+No restarts.
+No redeployments.
+No external metrics.
+All live, all inβprocess.
+
+---
+
+## Cleanup
+
+```bash
+chmod +x cleanup.sh && ./cleanup.sh
+```
diff --git a/examples/advanced/12-autoscale/03-sibling-in-binary/cleanup.sh b/examples/advanced/12-autoscale/03-sibling-in-binary/cleanup.sh
new file mode 100755
index 00000000..e0bf37e3
--- /dev/null
+++ b/examples/advanced/12-autoscale/03-sibling-in-binary/cleanup.sh
@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+set -euo pipefail
+echo "Cleaning up 12-autoscale / 03-sibling-in-binary..."
+
+# Delete any load CRs first
+kubectl delete ingestors -l orkestra.load=true --ignore-not-found 2>/dev/null || true
+
+kubectl delete -f cr-loader.yaml --ignore-not-found
+kubectl delete -f cr-processor.yaml --ignore-not-found
+kubectl delete -f cr-auditor.yaml --ignore-not-found
+
+kubectl delete -f crd-loader.yaml --ignore-not-found
+kubectl delete -f crd-processor.yaml --ignore-not-found
+kubectl delete -f crd-auditor.yaml --ignore-not-found
+
+echo "β Done. Stop 'ork run' with Ctrl+C if still running."
diff --git a/examples/advanced/12-autoscale/03-sibling-in-binary/cr-auditor.yaml b/examples/advanced/12-autoscale/03-sibling-in-binary/cr-auditor.yaml
new file mode 100644
index 00000000..938ab99e
--- /dev/null
+++ b/examples/advanced/12-autoscale/03-sibling-in-binary/cr-auditor.yaml
@@ -0,0 +1,7 @@
+apiVersion: autoscale.orkestra.io/v1alpha1
+kind: Auditor
+metadata:
+ name: my-auditor
+ namespace: default
+spec:
+ auditMode: "standard"
diff --git a/examples/advanced/12-autoscale/03-sibling-in-binary/cr-loader.yaml b/examples/advanced/12-autoscale/03-sibling-in-binary/cr-loader.yaml
new file mode 100644
index 00000000..3a2b0680
--- /dev/null
+++ b/examples/advanced/12-autoscale/03-sibling-in-binary/cr-loader.yaml
@@ -0,0 +1,8 @@
+apiVersion: autoscale.orkestra.io/v1alpha1
+kind: Loader
+metadata:
+ name: my-loader
+ namespace: default
+spec:
+ image: nginx:stable-alpine
+ replicas: 1
\ No newline at end of file
diff --git a/examples/advanced/12-autoscale/03-sibling-in-binary/cr-processor.yaml b/examples/advanced/12-autoscale/03-sibling-in-binary/cr-processor.yaml
new file mode 100644
index 00000000..d8609c30
--- /dev/null
+++ b/examples/advanced/12-autoscale/03-sibling-in-binary/cr-processor.yaml
@@ -0,0 +1,8 @@
+apiVersion: autoscale.orkestra.io/v1alpha1
+kind: Processor
+metadata:
+ name: my-processor
+ namespace: default
+spec:
+ image: nginx:stable-alpine
+ replicas: 1
diff --git a/examples/advanced/12-autoscale/03-sibling-in-binary/crd-auditor.yaml b/examples/advanced/12-autoscale/03-sibling-in-binary/crd-auditor.yaml
new file mode 100644
index 00000000..b95fd158
--- /dev/null
+++ b/examples/advanced/12-autoscale/03-sibling-in-binary/crd-auditor.yaml
@@ -0,0 +1,43 @@
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ name: auditors.autoscale.orkestra.io
+spec:
+ group: autoscale.orkestra.io
+ names:
+ kind: Auditor
+ plural: auditors
+ singular: auditor
+ scope: Namespaced
+ versions:
+ - name: v1alpha1
+ served: true
+ storage: true
+ subresources:
+ status: {}
+ additionalPrinterColumns:
+ - name: AuditMode
+ type: string
+ jsonPath: .spec.auditMode
+ - name: Autoscale
+ type: string
+ jsonPath: .status.autoscaleActive
+ - name: Phase
+ type: string
+ jsonPath: .status.phase
+ - name: Age
+ type: date
+ jsonPath: .metadata.creationTimestamp
+ schema:
+ openAPIV3Schema:
+ type: object
+ properties:
+ spec:
+ type: object
+ properties:
+ auditMode:
+ type: string
+ nullable: true
+ status:
+ type: object
+ x-kubernetes-preserve-unknown-fields: true
diff --git a/examples/advanced/12-autoscale/03-sibling-in-binary/crd-loader.yaml b/examples/advanced/12-autoscale/03-sibling-in-binary/crd-loader.yaml
new file mode 100644
index 00000000..2012b4a0
--- /dev/null
+++ b/examples/advanced/12-autoscale/03-sibling-in-binary/crd-loader.yaml
@@ -0,0 +1,46 @@
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ name: loaders.autoscale.orkestra.io
+spec:
+ group: autoscale.orkestra.io
+ names:
+ kind: Loader
+ plural: loaders
+ singular: loader
+ scope: Namespaced
+ versions:
+ - name: v1alpha1
+ served: true
+ storage: true
+ subresources:
+ status: {}
+ additionalPrinterColumns:
+ - name: Image
+ type: string
+ jsonPath: .spec.image
+ - name: Phase
+ type: string
+ jsonPath: .status.phase
+ - name: Age
+ type: date
+ jsonPath: .metadata.creationTimestamp
+ schema:
+ openAPIV3Schema:
+ type: object
+ properties:
+ spec:
+ type: object
+ required: [image]
+ properties:
+ image:
+ type: string
+ replicas:
+ type: integer
+ default: 1
+ auditMode:
+ type: string
+ nullable: true
+ status:
+ type: object
+ x-kubernetes-preserve-unknown-fields: true
diff --git a/examples/advanced/12-autoscale/03-sibling-in-binary/crd-processor.yaml b/examples/advanced/12-autoscale/03-sibling-in-binary/crd-processor.yaml
new file mode 100644
index 00000000..10f6ba1b
--- /dev/null
+++ b/examples/advanced/12-autoscale/03-sibling-in-binary/crd-processor.yaml
@@ -0,0 +1,49 @@
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ name: processors.autoscale.orkestra.io
+spec:
+ group: autoscale.orkestra.io
+ names:
+ kind: Processor
+ plural: processors
+ singular: processor
+ scope: Namespaced
+ versions:
+ - name: v1alpha1
+ served: true
+ storage: true
+ subresources:
+ status: {}
+ additionalPrinterColumns:
+ - name: Image
+ type: string
+ jsonPath: .spec.image
+ - name: Workers
+ type: integer
+ jsonPath: .status.workers
+ - name: Autoscale
+ type: string
+ jsonPath: .status.autoscaleActive
+ - name: Phase
+ type: string
+ jsonPath: .status.phase
+ - name: Age
+ type: date
+ jsonPath: .metadata.creationTimestamp
+ schema:
+ openAPIV3Schema:
+ type: object
+ properties:
+ spec:
+ type: object
+ required: [image]
+ properties:
+ image:
+ type: string
+ replicas:
+ type: integer
+ default: 1
+ status:
+ type: object
+ x-kubernetes-preserve-unknown-fields: true
diff --git a/examples/advanced/12-autoscale/02-sibling-in-binary/katalog.yaml b/examples/advanced/12-autoscale/03-sibling-in-binary/katalog.yaml
similarity index 85%
rename from examples/advanced/12-autoscale/02-sibling-in-binary/katalog.yaml
rename to examples/advanced/12-autoscale/03-sibling-in-binary/katalog.yaml
index 6f958276..bf20186f 100644
--- a/examples/advanced/12-autoscale/02-sibling-in-binary/katalog.yaml
+++ b/examples/advanced/12-autoscale/03-sibling-in-binary/katalog.yaml
@@ -2,14 +2,16 @@ apiVersion: orkestra.orkspace.io/v1
kind: Katalog
metadata:
name: autoscale-sibling
+ author: orkspace
+ version: 0.1.0
description: >
Cross-CRD autoscaling β two approaches in one binary.
Loader produces work. Two downstream CRDs consume it:
Processor β uses profile: burst. The profile expands into a complete
- autoscale config (4Γ workers, 10Γ queue depth, 5 s evaluation,
- 30 s cooldown). Orkestra derives the trigger from the profile's
+ autoscale config (4x workers, 10x queue depth, 5s evaluation,
+ 30s cooldown). Orkestra derives the trigger from the profile's
heuristics. You declare intent; Orkestra computes the parameters.
Auditor β uses explicit conditions. Watches Loader's live queue depth
@@ -50,17 +52,22 @@ spec:
- port: "8080"
# ββ Processor ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
- # Profile-based autoscale. Declare profile: burst and Orkestra does the rest.
+ # Profile-based autoscale. Declare profile: burst and Orkestra computes the
+ # full autoscale spec at katalog load time.
#
- # burst profile expands to:
- # workers: baseline Γ 4 (2 β 8)
- # queueDepth: baseline Γ 10 (100 β 1000)
- # interval: 5s
- # cooldown: 30s
- # trigger: metrics.queueDepth > threshold (computed from baseline)
+ # burst profile expands using baseline:
+ # workers: baseline Γ 4
+ # queueDepth: baseline Γ 10
#
- # Cannot be combined with interval/cooldown/conditions/do β
- # the profile owns those fields entirely.
+ # threshold is computed as:
+ # threshold = overrideQueueDepth Γ 0.60
+ #
+ # timing:
+ # interval: 5s
+ # cooldown: 30s
+ #
+ # Cannot be combined with interval/cooldown/conditions/do β the profile owns
+ # the entire autoscale block.
processor:
apiTypes:
group: autoscale.orkestra.io
@@ -114,7 +121,7 @@ spec:
conditions:
when:
- field: cross.loader.metrics.queueDepth
- greaterThan: "150"
+ greaterThan: "75"
do:
workers: 4
resync: 15s
@@ -125,6 +132,8 @@ spec:
value: "Running"
- path: autoscaleActive
value: "{{ .metrics.autoscaleActive }}"
+ - path: workers
+ value: "{{ .metrics.workers }}"
onCreate:
configMaps:
diff --git a/examples/advanced/12-autoscale/03-sibling-in-binary/load.sh b/examples/advanced/12-autoscale/03-sibling-in-binary/load.sh
new file mode 100755
index 00000000..cb9adae8
--- /dev/null
+++ b/examples/advanced/12-autoscale/03-sibling-in-binary/load.sh
@@ -0,0 +1,122 @@
+#!/usr/bin/env bash
+# load.sh β generic CR loader for autoscale testing.
+#
+# Usage:
+#   ./load.sh <crd> <count>
+#   ./load.sh <crd> up <count>
+#   ./load.sh <crd> down <count>
+#
+# Defaults:
+# CRD = ingestor
+#
+# Examples:
+# ./load.sh processor 200
+# ./load.sh loader up 150
+# ./load.sh auditor down 10
+# ./load.sh 100 # defaults to ingestor
+# ./load.sh down 0 # defaults to ingestor
+
+set -euo pipefail
+
+DEFAULT_CRD="ingestor"
+
+CRD="${1:-$DEFAULT_CRD}"
+CMD="${2:-up}"
+ARG="${3:-}"
+
+# Shorthand: ./load.sh 200
+if [[ "$CRD" =~ ^[0-9]+$ ]]; then
+ ARG="$CRD"
+ CRD="$DEFAULT_CRD"
+ CMD="up"
+fi
+
+# Shorthand: ./load.sh processor 200
+if [[ "$CMD" =~ ^[0-9]+$ ]]; then
+ ARG="$CMD"
+ CMD="up"
+fi
+
+# Valid CRDs
+VALID_CRDS=("ingestor" "loader" "processor" "auditor")
+
+is_valid_crd() {
+ local x
+ for x in "${VALID_CRDS[@]}"; do
+ [[ "$x" == "$1" ]] && return 0
+ done
+ return 1
+}
+
+print_help() {
+ echo "Usage:"
+  echo "  $0 <crd> <count>"
+  echo "  $0 <crd> up <count>"
+  echo "  $0 <crd> down <count>"
+ echo
+ echo "Valid CRDs: ${VALID_CRDS[*]}"
+ echo "(crd defaults to '${DEFAULT_CRD}')"
+}
+
+# Validate CRD
+if ! is_valid_crd "$CRD"; then
+ echo "β Unknown CRD '$CRD'"
+ print_help
+ exit 1
+fi
+
+# Validate command
+if [[ "$CMD" != "up" && "$CMD" != "down" ]]; then
+ echo "β Unknown command '$CMD'"
+ print_help
+ exit 1
+fi
+
+# Emit correct spec for each CRD
+emit_spec() {
+ case "$1" in
+ loader|processor|ingestor)
+ cat <.metrics.\*** to read sibling metrics across binaries
+- How **profileβbased autoscaling** differs from **explicit autoscaling**
+- How downstream CRDs (Processor, Auditor) react differently to upstream Loader pressure
+- How Orkestra performs **crossβruntime dependency scaling** with no restarts, no redeployments, and no external metrics
+- How the Control Center visualizes **baseline vs override** across multiple runtimes
+
+---
+
+## Prerequisites
+
+- Ork CLI
+- Kubernetes cluster (Kind recommended)
+
+Install Ork CLI:
+
+```bash
+curl get.orkestra.sh | bash
+```
+
+Create a Kind cluster and run the first runtime:
+
+```bash
ORKESTRA_NAMESPACE=loader-system ork run -f katalog-loader.yaml --dev
+```
+
+This:
+
+- creates a Kind cluster named `orkestra-playground`
+- runs Orkestra in namespace `loader-system`
+- exposes Control Center on port `8080`
+
+Run the second runtime:
+
+```bash
+ORKESTRA_NAMESPACE=processor-system ORKESTRA_PORT=8090 ork run -f katalog-processor.yaml
+```
+
+This runs a second Orkestra instance in `processor-system` on port `8090`.
+
+Start the Control Center:
+
+```bash
+ork control start -u localhost:8080,localhost:8090
+```
+
+This registers **both runtimes** at startup.
+
+Visit: **http://localhost:8081**
+
+---
+
+## Run the Example
+
+### 1. Apply the CRDs
+
+```bash
+kubectl apply -f crd-loader.yaml
+kubectl apply -f crd-processor.yaml
+```
+
+### 2. Apply the CRs
+
+```bash
+kubectl apply -f cr-loader.yaml
+kubectl apply -f cr-processor.yaml
+```
+
+In the Control Center, you will see two runtimes:
+
+- `loader-autoscale-sibling-in-cluster`
+- `processor-autoscale-sibling-in-cluster`
+
+Open each in separate tabs to watch:
+
+- Loader generating work
+- Processor autoscaling based on Loaderβs queue depth
+
+> **Note:**
+> Loader (in `loader-system`) has **no autoscaler**.
> Processor (in `processor-system`) autoscales **based on Loader's queue depth** via `cross.loader.metrics.queueDepth`.
+
+---
+
+## Load the Loader (CrossβBinary Autoscale)
+
+```bash
+./load.sh loader up 100
+```
+
+Observe:
+
+- Loader queue grows
+- Once queue hits **60% of Processorβs override queueDepth** for the full **5s interval**, Processor autoscaler activates
+- Processor workers scale **2 β 8**
+- Processor queueDepth scales **100 β 1000**
+
+### Expected logs in `processor-system`
+
+```json
+{"level":"info","crd":"autoscale.orkestra.io/v1alpha1, Kind=Processor","workers":8,"message":"autoscaler: worker pool resized"}
+{"level":"info","crd":"autoscale.orkestra.io/v1alpha1, Kind=Processor","queueDepth":1000,"message":"autoscaler: queue depth limit updated"}
+```
+
+This is **crossβbinary autoscaling** using the standard Orkestra metrics API.
+
+### What Processor is reading:
+
+```json
+// curl -sSL http://localhost:8080/katalog/loader | jq .metrics
+{
+ "errorRatePercent": 0,
+ "queueDepth": 98,
+ "reconcileDurationP95Ms": 1244.18,
+ "workersBusyPercent": 100,
+ "workersIdlePercent": 0
+}
+```
+
+Processor sees Loaderβs queue depth **live**, across namespaces, across binaries.
+
+---
+
+## Reduce the Load
+
+```bash
+./load.sh loader down 10
+```
+
+Notes:
+
+- Loader queue drains gradually
+- Processor stays scaled up until Loader queue remains **below threshold** for the full **30s cooldown**
+- After cooldown, Processor returns to baseline (2 workers, queueDepth 100)
+
+### Expected logs
+
+```json
+{"level":"info","crd":"autoscale.orkestra.io/v1alpha1, Kind=Processor","workers":2,"message":"autoscaler: worker pool resized"}
+{"level":"info","crd":"autoscale.orkestra.io/v1alpha1, Kind=Processor","queueDepth":100,"message":"autoscaler: queue depth limit updated"}
+{"level":"info","crd":"Processor","workers":2,"queueDepth":100,"message":"autoscaler: baseline restored"}
+```
+
+This is **crossβruntime dependency scaling**:
+
+- No restarts
+- No redeployments
+- No external metrics
+- No custom code
+- Just declarative autoscaling
+
+---
+
+## Cleanup
+
+```bash
+chmod +x cleanup.sh && ./cleanup.sh
+```
diff --git a/examples/advanced/12-autoscale/04-sibling-in-cluster/cleanup.sh b/examples/advanced/12-autoscale/04-sibling-in-cluster/cleanup.sh
new file mode 100755
index 00000000..fba6e06c
--- /dev/null
+++ b/examples/advanced/12-autoscale/04-sibling-in-cluster/cleanup.sh
@@ -0,0 +1,14 @@
+#!/usr/bin/env bash
+set -euo pipefail
+echo "Cleaning up 12-autoscale / 04-sibling-in-cluster..."
+
+# Delete any load CRs first
+kubectl delete ingestors -l orkestra.load=true --ignore-not-found 2>/dev/null || true
+
+kubectl delete -f cr-loader.yaml --ignore-not-found
+kubectl delete -f cr-processor.yaml --ignore-not-found
+
+kubectl delete -f crd-loader.yaml --ignore-not-found
+kubectl delete -f crd-processor.yaml --ignore-not-found
+
+echo "β Done. Stop 'ork run' with Ctrl+C if still running."
diff --git a/examples/advanced/12-autoscale/04-sibling-in-cluster/cr-loader.yaml b/examples/advanced/12-autoscale/04-sibling-in-cluster/cr-loader.yaml
new file mode 100644
index 00000000..3a2b0680
--- /dev/null
+++ b/examples/advanced/12-autoscale/04-sibling-in-cluster/cr-loader.yaml
@@ -0,0 +1,8 @@
+apiVersion: autoscale.orkestra.io/v1alpha1
+kind: Loader
+metadata:
+ name: my-loader
+ namespace: default
+spec:
+ image: nginx:stable-alpine
+ replicas: 1
\ No newline at end of file
diff --git a/examples/advanced/12-autoscale/04-sibling-in-cluster/cr-processor.yaml b/examples/advanced/12-autoscale/04-sibling-in-cluster/cr-processor.yaml
new file mode 100644
index 00000000..d8609c30
--- /dev/null
+++ b/examples/advanced/12-autoscale/04-sibling-in-cluster/cr-processor.yaml
@@ -0,0 +1,8 @@
+apiVersion: autoscale.orkestra.io/v1alpha1
+kind: Processor
+metadata:
+ name: my-processor
+ namespace: default
+spec:
+ image: nginx:stable-alpine
+ replicas: 1
diff --git a/examples/advanced/12-autoscale/04-sibling-in-cluster/crd-loader.yaml b/examples/advanced/12-autoscale/04-sibling-in-cluster/crd-loader.yaml
new file mode 100644
index 00000000..2012b4a0
--- /dev/null
+++ b/examples/advanced/12-autoscale/04-sibling-in-cluster/crd-loader.yaml
@@ -0,0 +1,46 @@
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ name: loaders.autoscale.orkestra.io
+spec:
+ group: autoscale.orkestra.io
+ names:
+ kind: Loader
+ plural: loaders
+ singular: loader
+ scope: Namespaced
+ versions:
+ - name: v1alpha1
+ served: true
+ storage: true
+ subresources:
+ status: {}
+ additionalPrinterColumns:
+ - name: Image
+ type: string
+ jsonPath: .spec.image
+ - name: Phase
+ type: string
+ jsonPath: .status.phase
+ - name: Age
+ type: date
+ jsonPath: .metadata.creationTimestamp
+ schema:
+ openAPIV3Schema:
+ type: object
+ properties:
+ spec:
+ type: object
+ required: [image]
+ properties:
+ image:
+ type: string
+ replicas:
+ type: integer
+ default: 1
+ auditMode:
+ type: string
+ nullable: true
+ status:
+ type: object
+ x-kubernetes-preserve-unknown-fields: true
diff --git a/examples/advanced/12-autoscale/04-sibling-in-cluster/crd-processor.yaml b/examples/advanced/12-autoscale/04-sibling-in-cluster/crd-processor.yaml
new file mode 100644
index 00000000..10f6ba1b
--- /dev/null
+++ b/examples/advanced/12-autoscale/04-sibling-in-cluster/crd-processor.yaml
@@ -0,0 +1,49 @@
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ name: processors.autoscale.orkestra.io
+spec:
+ group: autoscale.orkestra.io
+ names:
+ kind: Processor
+ plural: processors
+ singular: processor
+ scope: Namespaced
+ versions:
+ - name: v1alpha1
+ served: true
+ storage: true
+ subresources:
+ status: {}
+ additionalPrinterColumns:
+ - name: Image
+ type: string
+ jsonPath: .spec.image
+ - name: Workers
+ type: integer
+ jsonPath: .status.workers
+ - name: Autoscale
+ type: string
+ jsonPath: .status.autoscaleActive
+ - name: Phase
+ type: string
+ jsonPath: .status.phase
+ - name: Age
+ type: date
+ jsonPath: .metadata.creationTimestamp
+ schema:
+ openAPIV3Schema:
+ type: object
+ properties:
+ spec:
+ type: object
+ required: [image]
+ properties:
+ image:
+ type: string
+ replicas:
+ type: integer
+ default: 1
+ status:
+ type: object
+ x-kubernetes-preserve-unknown-fields: true
diff --git a/examples/advanced/12-autoscale/04-sibling-in-cluster/katalog-loader.yaml b/examples/advanced/12-autoscale/04-sibling-in-cluster/katalog-loader.yaml
new file mode 100644
index 00000000..206fb4f5
--- /dev/null
+++ b/examples/advanced/12-autoscale/04-sibling-in-cluster/katalog-loader.yaml
@@ -0,0 +1,40 @@
+apiVersion: orkestra.orkspace.io/v1
+kind: Katalog
+metadata:
+ name: loader-autoscale-sibling-in-cluster
+ author: orkspace
+ version: 0.1.0
+ description: >
+ Cross-binary autoscaling within the same cluster.
+
+ Loader runs in loader-system under its own Orkestra deployment.
+ Processor runs in processor-system under a separate Orkestra deployment.
+
+spec:
+ crds:
+ # ββ Loader (loader-system deployment) βββββββββββββββββββββββββββββββββββββ
+ # Fixed concurrency β this is the load source, not the scaling target.
+ loader:
+ apiTypes:
+ group: autoscale.orkestra.io
+ version: v1alpha1
+ kind: Loader
+ plural: loaders
+
+ workers: 3
+ resync: 60s
+
+ operatorBox:
+ status:
+ fields:
+ - path: phase
+ value: "Running"
+
+ onCreate:
+ deployments:
+ - image: "{{ .spec.image }}"
+ replicas: "{{ .spec.replicas }}"
+ port: 8080
+ reconcile: true
+ services:
+ - port: "8080"
diff --git a/examples/advanced/12-autoscale/03-sibling-in-cluster/katalog.yaml b/examples/advanced/12-autoscale/04-sibling-in-cluster/katalog-processor.yaml
similarity index 51%
rename from examples/advanced/12-autoscale/03-sibling-in-cluster/katalog.yaml
rename to examples/advanced/12-autoscale/04-sibling-in-cluster/katalog-processor.yaml
index fcb91520..a310f8be 100644
--- a/examples/advanced/12-autoscale/03-sibling-in-cluster/katalog.yaml
+++ b/examples/advanced/12-autoscale/04-sibling-in-cluster/katalog-processor.yaml
@@ -1,58 +1,23 @@
apiVersion: orkestra.orkspace.io/v1
kind: Katalog
metadata:
- name: autoscale-sibling-in-cluster
+ name: processor-autoscale-sibling-in-cluster
+ author: orkspace
+ version: 0.1.0
description: >
Cross-binary autoscaling within the same cluster.
- Loader runs in loader-system under its own Orkestra deployment.
- Processor runs in processor-system under a separate Orkestra deployment.
-
Processor autoscales based on Loader's live queue depth β but because
they are in different processes, Processor uses source.endpoint to fetch
the metric from Loader's Orkestra REST API instead of reading the
informer cache directly.
- Responses are cached for 10 s to avoid hammering on every evaluation
- interval. The autoscale interval is 20 s, so each evaluation reads a
+ Responses are cached for 10s to avoid hammering on every evaluation
+ interval. The autoscale interval is 20s, so each evaluation reads a
cached value at most twice stale β acceptable for burst-response scaling.
- Deploy order:
- 1. kubectl apply -f katalog.yaml -n loader-system
- 2. kubectl apply -f katalog.yaml -n processor-system
-
- Trigger load:
- make load # creates 100 Loader CRs to fill the queue
-
spec:
crds:
- # ββ Loader (loader-system deployment) βββββββββββββββββββββββββββββββββββββ
- # Fixed concurrency β this is the load source, not the scaling target.
- loader:
- apiTypes:
- group: autoscale.orkestra.io
- version: v1alpha1
- kind: Loader
- plural: loaders
-
- workers: 3
- resync: 60s
-
- operatorBox:
- status:
- fields:
- - path: phase
- value: "Running"
-
- onCreate:
- deployments:
- - image: "{{ .spec.image }}"
- replicas: "{{ .spec.replicas }}"
- port: 8080
- reconcile: true
- services:
- - port: "8080"
-
# ββ Processor (processor-system deployment) βββββββββββββββββββββββββββββββ
# Autoscales based on Loader's queue depth fetched via HTTP API.
# source.endpoint is required because Loader is in a different process.
@@ -67,15 +32,30 @@ spec:
resync: 30s
operatorBox:
+ cross:
+ # Monitors a particular Loader CR called 'my-loader'
+ - crd: loader
+ selector:
+ name: my-loader
+ source:
+ endpoint: "http://localhost:8080/katalog/loader/cr/default/my-loader"
+ as: loaderInfo
+
+ # Monitor the operator itself
+ - crd: loader
+ source:
+ endpoint: "http://localhost:8080/katalog/loader/health"
+ as: loaderHealth
autoscale:
interval: 20s
cooldown: 3m
conditions:
when:
- field: cross.loader.metrics.queueDepth
- greaterThan: "100"
+ greaterThan: "60"
source:
- endpoint: "http://orkestra.loader-system:8080/katalog/loader/metrics"
+ # endpoint: "http://orkestra-runtime.loader-system:8080/katalog/loader" # > if running in a pod
+ endpoint: "http://localhost:8080/katalog/loader"
cacheFor: 10s
do:
workers: 8
@@ -90,6 +70,22 @@ spec:
value: "{{ .metrics.autoscaleActive }}"
- path: workers
value: "{{ .metrics.workers }}"
+ - path: lastError
+ value: "{{ .health.lastError }}"
+ - path: state
+ value: "{{ .health.state }}"
+ - path: loaderQueueDepth
+ value: "{{ .cross.loaderInfo.metrics.queueDepth }}"
+ - path: loaderState
+ value: "{{ .cross.loaderInfo.status.phase }}"
+ - path: loaderHealthy
+ value: "{{ .cross.loaderHealth.healthy }}"
+ - path: loaderLastError
+ value: "{{ .cross.loaderHealth.lastError }}"
+ - path: loaderPhase
+ value: "{{ .cross.loaderHealth.state }}"
+ - path: loaderDeploymentReady
+ value: "{{ .cross.loaderInfo.children.deployment.ready }}"
onCreate:
deployments:
diff --git a/examples/advanced/12-autoscale/04-sibling-in-cluster/load.sh b/examples/advanced/12-autoscale/04-sibling-in-cluster/load.sh
new file mode 100755
index 00000000..cb9adae8
--- /dev/null
+++ b/examples/advanced/12-autoscale/04-sibling-in-cluster/load.sh
@@ -0,0 +1,122 @@
+#!/usr/bin/env bash
+# load.sh β generic CR loader for autoscale testing.
+#
+# Usage:
+#   ./load.sh <crd> <count>
+#   ./load.sh <crd> up <count>
+#   ./load.sh <crd> down <count>
+#
+# Defaults:
+# CRD = ingestor
+#
+# Examples:
+# ./load.sh processor 200
+# ./load.sh loader up 150
+# ./load.sh auditor down 10
+# ./load.sh 100 # defaults to ingestor
+# ./load.sh down 0 # defaults to ingestor
+
+set -euo pipefail
+
+DEFAULT_CRD="ingestor"
+
+CRD="${1:-$DEFAULT_CRD}"
+CMD="${2:-up}"
+ARG="${3:-}"
+
+# Shorthand: ./load.sh 200
+if [[ "$CRD" =~ ^[0-9]+$ ]]; then
+ ARG="$CRD"
+ CRD="$DEFAULT_CRD"
+ CMD="up"
+fi
+
+# Shorthand: ./load.sh processor 200
+if [[ "$CMD" =~ ^[0-9]+$ ]]; then
+ ARG="$CMD"
+ CMD="up"
+fi
+
+# Valid CRDs
+VALID_CRDS=("ingestor" "loader" "processor" "auditor")
+
+is_valid_crd() {
+ local x
+ for x in "${VALID_CRDS[@]}"; do
+ [[ "$x" == "$1" ]] && return 0
+ done
+ return 1
+}
+
+print_help() {
+ echo "Usage:"
+  echo "  $0 <crd> <count>"
+  echo "  $0 <crd> up <count>"
+  echo "  $0 <crd> down <count>"
+ echo
+ echo "Valid CRDs: ${VALID_CRDS[*]}"
+ echo "(crd defaults to '${DEFAULT_CRD}')"
+}
+
+# Validate CRD
+if ! is_valid_crd "$CRD"; then
+ echo "β Unknown CRD '$CRD'"
+ print_help
+ exit 1
+fi
+
+# Validate command
+if [[ "$CMD" != "up" && "$CMD" != "down" ]]; then
+ echo "β Unknown command '$CMD'"
+ print_help
+ exit 1
+fi
+
+# Emit correct spec for each CRD
+emit_spec() {
+ case "$1" in
+ loader|processor|ingestor)
+ cat < 60
//
-func expandCostOptimized(b orktypes.AutoscaleBaseline) *orktypes.AutoscaleSpec {
- workers := int(math.Max(1, float64(b.Workers)*0.5))
- queue := int(float64(b.QueueDepth) * 0.5)
+func expandCostOptimized(b orktypes.AutoscaleBaseline, cfg ProfileConfig) *orktypes.AutoscaleSpec {
+ workers := int(math.Max(1, float64(b.Workers)*cfg.WorkerMultiplier))
+ queue := int(float64(b.MaxQueueDepth) * cfg.QueueMultiplier)
+ threshold := int(float64(b.MaxQueueDepth) * cfg.QueueThresholdPct)
return &orktypes.AutoscaleSpec{
- Interval: orktypes.Duration{Duration: 30 * 1e9}, // 30s
- Cooldown: orktypes.Duration{Duration: 600 * 1e9}, // 10m
+ Interval: orktypes.Duration{Duration: cfg.Interval},
+ Cooldown: orktypes.Duration{Duration: cfg.Cooldown},
Conditions: orktypes.AutoscaleConditions{
When: []orktypes.Condition{
{
Field: "metrics.workersIdlePercent",
GreaterThan: "60",
},
+ {
+ Field: "metrics.queueDepth",
+ GreaterThan: fmt.Sprintf("%d", threshold),
+ },
},
},
Do: orktypes.AutoscaleAction{
diff --git a/pkg/katalog/autosclaer_profile_test.go b/pkg/katalog/autosclaer_profile_test.go
index 5a58961e..1e1f751c 100644
--- a/pkg/katalog/autosclaer_profile_test.go
+++ b/pkg/katalog/autosclaer_profile_test.go
@@ -10,9 +10,9 @@ import (
func baseline(workers, queue int, resync time.Duration) orktypes.AutoscaleBaseline {
return orktypes.AutoscaleBaseline{
- Workers: workers,
- QueueDepth: queue,
- Resync: resync,
+ Workers: workers,
+ MaxQueueDepth: queue,
+ Resync: resync,
}
}
diff --git a/pkg/katalog/validate.go b/pkg/katalog/validate.go
index 08f16f97..3753b572 100644
--- a/pkg/katalog/validate.go
+++ b/pkg/katalog/validate.go
@@ -226,12 +226,22 @@ func (k *Katalog) setDefaults(kfg *konfig.Konfig) error {
// Handle Resync
if crd.Resync == 0 {
- crd.Resync = kfg.Cluster().DefaultResync
+ crd.Resync = crd.SetResync(kfg.Katalog().DefaultResync)
}
// Handle Workers
if crd.Workers == 0 {
- crd.Workers = kfg.Cluster().DefaultWorkers
+ crd.Workers = crd.SetWorkers(kfg.Katalog().DefaultWorkers)
+ }
+
+ // Handle QueueDepth
+ if crd.Queue.MaxQueueDepth == 0 {
+ crd.Queue.MaxQueueDepth = crd.SetMaxQueueDepth(kfg.Katalog().DefaultMaxQueueDepth)
+ }
+
+ // Handle QueueDegradeThreshold
+ if crd.Queue.DegradeThreshold == 0 {
+ crd.Queue.DegradeThreshold = crd.SetMaxQueueDepth(kfg.Katalog().DefaultDegradeThreshold) // NOTE(review): SetMaxQueueDepth looks copy-pasted from the MaxQueueDepth branch above — confirm whether this should be SetDegradeThreshold
}
// Handle Notifications
diff --git a/pkg/katalog/validation_autoscale.go b/pkg/katalog/validation_autoscale.go
index 49a50fc8..991b267d 100644
--- a/pkg/katalog/validation_autoscale.go
+++ b/pkg/katalog/validation_autoscale.go
@@ -74,9 +74,9 @@ func (k *Katalog) validateAutoscaleProfile() error {
// Expand the profile into a fully-formed AutoscaleSpec using the CRD's
// declared workers and queue depth as the baseline.
baseline := orktypes.AutoscaleBaseline{
- Workers: crd.Workers,
- QueueDepth: crd.Queue.MaxQueueDepth,
- Resync: crd.Resync,
+ Workers: crd.Workers,
+ MaxQueueDepth: crd.Queue.MaxQueueDepth,
+ Resync: crd.Resync,
}
expanded, err := ApplyAutoscalerProfile(profile, baseline)
if err != nil {
diff --git a/pkg/konfig/konfig.go b/pkg/konfig/konfig.go
index 4c1b45bb..7044f784 100644
--- a/pkg/konfig/konfig.go
+++ b/pkg/konfig/konfig.go
@@ -25,12 +25,6 @@ func Init(filenames ...string) (*Konfig, error) {
MasterURL: GetStrEnv("MASTER_URL", ""),
Name: GetStrEnv("CLUSTER_NAME", "orkestra-cluster"),
Namespace: ns,
-
- // Workload
- DefaultResync: GetDurEnvSeconds("DEFAULT_RESYNC", 15),
- DefaultWorkers: GetIntEnv("DEFAULT_WORKERS", 3),
- ShutdownTimeout: GetDurEnvSeconds("SHUTDOWN_TIMEOUT", 30),
- ShutdownGracePeriod: GetDurEnvSeconds("SHUTDOWN_GRACE_PERIOD", 60),
},
// ββ Unified security configuration βββββββββββββββββββββββββββββββββββ
// ENV vars populate SecurityConfig as defaults.
@@ -126,6 +120,10 @@ func Init(filenames ...string) (*Konfig, error) {
DefaultMaxQueueDepth: GetIntEnv("MAX_QUEUE_DEPTH", 100),
DefaultDegradeThreshold: GetIntEnv("DEGRADE_THRESHOLD", 5),
Paths: GetStrSliceEnv("KATALOG_PATH", []string{}),
+ DefaultResync: GetDurEnvSeconds("DEFAULT_RESYNC", 15),
+ DefaultWorkers: GetIntEnv("DEFAULT_WORKERS", 3),
+ ShutdownTimeout: GetDurEnvSeconds("SHUTDOWN_TIMEOUT", 30),
+ ShutdownGracePeriod: GetDurEnvSeconds("SHUTDOWN_GRACE_PERIOD", 60),
},
}
diff --git a/pkg/konfig/type.go b/pkg/konfig/type.go
index c16c5125..005a6267 100644
--- a/pkg/konfig/type.go
+++ b/pkg/konfig/type.go
@@ -35,12 +35,6 @@ type clusterKonfig struct {
MasterURL string
Name string
Namespace string `validate:"required"`
-
- // Worload specific
- DefaultResync time.Duration
- DefaultWorkers int
- ShutdownTimeout time.Duration
- ShutdownGracePeriod time.Duration
}
type registryConfig struct {
@@ -139,6 +133,10 @@ type katalogKonfig struct {
Paths []string // Comma separated Paths to CRD katalog YAML file
DefaultMaxQueueDepth int
DefaultDegradeThreshold int `validate:"required"`
+ DefaultResync time.Duration
+ DefaultWorkers int
+ ShutdownTimeout time.Duration
+ ShutdownGracePeriod time.Duration
}
type konductorElection struct {
diff --git a/pkg/kordinator/crd_health.go b/pkg/kordinator/crd_health.go
index b748de1d..5a2cc0eb 100644
--- a/pkg/kordinator/crd_health.go
+++ b/pkg/kordinator/crd_health.go
@@ -2,6 +2,7 @@
package kordinator
import (
+ "net/http"
"sync"
"sync/atomic"
"time"
@@ -530,3 +531,85 @@ func (h *CRDHealth) GetDependencyStatuses() map[string]DependencyStatus {
}
return result
}
+
+// StateAndStatus returns the reconciler's derived health state and the
+// corresponding HTTP status code used by the /health endpoint.
+//
+// State is one of:
+//
+// "not started", "pending", "degraded", "healthy"
+//
+// Status is either:
+//
+// 200 β healthy or pending
+// 503 β degraded or not started
+func (h *CRDHealth) StateAndStatus() (string, int) {
+ if h == nil {
+ return "not started", http.StatusServiceUnavailable
+ }
+
+ isStarted := h.Started()
+ isPending := h.Pending()
+ isHealthy := h.IsHealthy()
+
+ switch {
+ case !isStarted && !isPending:
+ return "not started", http.StatusServiceUnavailable
+ case isPending:
+ return "pending", http.StatusOK
+ case isStarted && !isHealthy:
+ return "degraded", http.StatusServiceUnavailable
+ case isHealthy:
+ return "healthy", http.StatusOK
+ default:
+ return "pending", http.StatusOK
+ }
+}
+
+// HealthAsMap returns a snapshot of this CRD's health as a plain map, suitable
+// for injection into the template resolver under the "health" key.
+//
+// Available in conditions status.fields templates as:
+//
+// {{ .health.healthy }} β bool: reconciler is healthy
+// {{ .health.state }} β string: "healthy" / "degraded" / "pending" / "not started"
+// {{ .health.status }} β int: HTTP status code representing health state
+// {{ .health.started }} β bool: reconciler has started
+// {{ .health.pending }} β bool: reconciler is pending
+// {{ .health.degraded }} β bool: reconciler is degraded
+// {{ .health.totalReconciles }} β int64: total reconcile attempts
+// {{ .health.failedReconciles }} β int64: total failed reconciles
+// {{ .health.consecutiveFails }} β int64: current consecutive failure streak
+// {{ .health.errorRatePercent }} β float64: error rate as percentage
+// {{ .health.lastReconcile }} β string: RFC3339 timestamp of last reconcile
+// {{ .health.uptime }} β string: how long the reconciler has been running
+// {{ .health.lastError }} β string: most recent error message (empty if none)
+// {{ .health.rollbackActive }} β bool: rollback is currently blocking reconcile
+// {{ .health.rollbackTotal }} β int64: total rollback triggers since startup
+// {{ .health.hasUnhealthyDeps }} β bool: any dependency is unsatisfied
+func (h *CRDHealth) HealthAsMap() map[string]interface{} {
+ if h == nil {
+ return map[string]interface{}{}
+ }
+
+ state, status := h.StateAndStatus()
+
+ return map[string]interface{}{
+ "healthy": h.IsHealthy(),
+ "state": state,
+ "status": status,
+ "started": h.started.Load(),
+ "pending": h.pending.Load(),
+ "degraded": h.degraded.Load(),
+ "totalReconciles": h.totalReconciles.Load(),
+ "failedReconciles": h.failedReconciles.Load(),
+ "consecutiveFails": h.consecutiveFails.Load(),
+ "errorRatePercent": h.ErrorRatePercent(),
+ "lastReconcile": h.LastReconcile(),
+ "uptime": h.Uptime(),
+ "lastError": h.LastError(),
+ "rollbackActive": h.rollbackActive.Load(),
+ "rollbackTotal": h.rollbackTotal.Load(),
+ "hasUnhealthyDeps": h.hasUnhealthyDeps.Load(),
+ }
+}
diff --git a/pkg/kordinator/crd_health_handers.go b/pkg/kordinator/crd_health_handers.go
index 3f3f5372..e85951c4 100644
--- a/pkg/kordinator/crd_health_handers.go
+++ b/pkg/kordinator/crd_health_handers.go
@@ -23,6 +23,7 @@ import (
type CRDHealthResponse struct {
Name string `json:"name"`
State string `json:"state"` // "not started", "pending", "started", "healthy", "degraded"
+ Status int `json:"status"`
Healthy bool `json:"healthy"`
Started bool `json:"started"`
Pending bool `json:"pending"`
@@ -65,39 +66,16 @@ func BuildCRDHealthHandler(
h *CRDHealth,
) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
- isStarted := h.Started()
- isPending := h.Pending()
- isHealthy := h.IsHealthy()
-
- var httpStatus int
- var state string
-
- switch {
- case !isStarted && !isPending:
- httpStatus = http.StatusServiceUnavailable
- state = "not started"
- case isPending:
- httpStatus = http.StatusOK
- state = "pending"
- case isStarted && !isHealthy:
- httpStatus = http.StatusServiceUnavailable
- state = "degraded"
- case isHealthy:
- httpStatus = http.StatusOK
- state = "healthy"
- default:
- httpStatus = http.StatusOK
- state = "pending"
- }
-
+ state, status := h.StateAndStatus()
v := resolveCRDDisplayValues(crd, kfg, inf)
response := CRDHealthResponse{
Name: crd.Name,
State: state,
- Healthy: isHealthy,
- Started: isStarted,
- Pending: isPending,
+ Status: status,
+ Healthy: h.IsHealthy(),
+ Started: h.Started(),
+ Pending: h.Pending(),
StartedAt: h.StartedAt(),
Uptime: h.Uptime(),
QueueDepth: h.QueueDepth(crd.GVK().String()),
@@ -112,7 +90,7 @@ func BuildCRDHealthHandler(
Missing: h.IsMissing(),
}
- utils.WriteJSON(w, httpStatus, response)
+ utils.WriteJSON(w, status, response)
}
}
@@ -298,11 +276,47 @@ func BuildCRDInfoHandler(
// Generate RBAC info for this CRD
rbacInfo := generateRBACInfo(crd, v)
- autoMetrics := make(map[string]interface{})
- if crd.AutoscaleEnabled() {
- autoMetrics = h.GetAutoMetrics()
- } else {
- autoMetrics = nil
+ // Always expose live metrics so any CRD can be observed via cross.*.metrics.*
+ // by a sibling in a different binary β the observed CRD does not need to
+ // declare autoscale: itself. AutoMetrics is initialised for every reconciler.
+ autoMetrics := h.GetAutoMetrics()
+
+ // When autoscaler is enabled, use WorkerInfo as the authoritative source
+ // for all worker and queue fields β the legacy health counters only track
+ // the initial configured pool and go negative when extra workers spawn.
+ wi := h.GetWorkerInfo()
+ workers := v.workers
+ workersActive := h.GetActiveWorkers()
+ workersIdle := h.GetIdleWorkers()
+ workersProcessing := h.GetProcessingWorkers()
+ workersSource := v.workersSource
+ queueDepth := h.QueueDepth(crd.GVK().String())
+ maxQueueDepth := v.maxQueueDepth
+ maxQueueDepthSource := v.maxQueueDepthSource
+ resync := v.resync
+ resyncSource := v.resyncSource
+
+ autoscalerSource := "autoscaler"
+ if wi != nil {
+ // workers β wi is always authoritative: legacy counters go negative when
+ // autoscale spawns extra goroutines beyond the initial configured pool.
+ workers = wi.Configured
+ workersActive = int32(wi.Effective)
+ workersIdle = int32(wi.Idle)
+ workersProcessing = int32(wi.InFlight)
+ queueDepth = int(wi.QueueDepth)
+
+ // queue limit and resync β only override when autoscaler is actively
+ // applying an override. Baseline values come from resolveCRDDisplayValues
+ // which includes the konfig default fallback (e.g. 100 queue depth).
+ if wi.OverrideActive {
+ maxQueueDepth = int(wi.QueueDepthEffective)
+ resync = wi.ResyncEffective
+
+ maxQueueDepthSource = autoscalerSource
+ resyncSource = autoscalerSource
+ workersSource = autoscalerSource
+ }
}
response := CRDInfoResponse{
@@ -314,17 +328,17 @@ func BuildCRDInfoHandler(
Namespaced: crd.IsNamespaced(),
Namespace: crd.Namespace,
DependsOn: crd.DependsOn.Names(),
- Workers: v.workers,
- WorkersActive: h.GetActiveWorkers(),
- WorkersIdle: h.GetIdleWorkers(),
- WorkersProcessing: h.GetProcessingWorkers(),
+ Workers: workers,
+ WorkersActive: workersActive,
+ WorkersIdle: workersIdle,
+ WorkersProcessing: workersProcessing,
WorkerDetails: h.GetWorkerStates(),
- WorkersSource: v.workersSource,
- Resync: v.resync,
- ResyncSource: v.resyncSource,
- QueueDepth: h.QueueDepth(crd.GVK().String()),
- MaxQueueDepth: v.maxQueueDepth,
- MaxQueueDepthSource: v.maxQueueDepthSource,
+ WorkersSource: workersSource,
+ Resync: resync,
+ ResyncSource: resyncSource,
+ QueueDepth: queueDepth,
+ MaxQueueDepth: maxQueueDepth,
+ MaxQueueDepthSource: maxQueueDepthSource,
ResourceCount: v.resourceCount,
TotalReconciles: h.TotalReconciles(),
OperatorBox: operatorBoxInfoStruct(crd),
@@ -334,7 +348,7 @@ func BuildCRDInfoHandler(
ErrorRate: h.ErrorRatePercent(),
RBAC: rbacInfo,
AutoscalerEnabled: crd.AutoscaleEnabled(),
- AutoscalerWorkers: h.GetWorkerInfo(),
+ AutoscalerWorkers: wi,
Metrics: autoMetrics,
}
@@ -843,14 +857,14 @@ func resolveCRDDisplayValues(
resyncSource := "configured"
if crd.Resync == 0 {
resyncSource = "default"
- resync = kfg.Cluster().DefaultResync.String()
+ resync = kfg.Katalog().DefaultResync.String()
}
// Workers
workers := crd.Workers
workersSource := "configured"
if crd.Workers == 0 {
- workers = kfg.Cluster().DefaultWorkers
+ workers = kfg.Katalog().DefaultWorkers
workersSource = "default"
}
diff --git a/pkg/kordinator/dependency_kordinator.go b/pkg/kordinator/dependency_kordinator.go
index 59b96e87..d81ee929 100644
--- a/pkg/kordinator/dependency_kordinator.go
+++ b/pkg/kordinator/dependency_kordinator.go
@@ -451,7 +451,7 @@ type autoMetricsExporter interface {
// workerInfoProvider is a local interface for reading a live WorkerInfo snapshot
// from a reconciler. Used to populate the /katalog/{crd} handler response.
type workerInfoProvider interface {
- WorkerInfo(configuredWorkers, configuredQueueDepth int) *ork_autoscaler.WorkerInfo
+ WorkerInfo(configuredResync string, configuredWorkers, configuredQueueDepth int) *ork_autoscaler.WorkerInfo
}
// rollbackNotifierSetter is a local interface for injecting CRDHealth rollback
@@ -534,7 +534,7 @@ func (k *DependencyKordinator) startCRDWorkers(ctx context.Context, gvk string,
// live concurrency metrics without importing the reconciler package.
if wip, ok := rec.(workerInfoProvider); ok {
k.crdHealthMap[gvk].SetWorkerInfoFn(func() *ork_autoscaler.WorkerInfo {
- info := wip.WorkerInfo(workers, entry.CRD.Queue.MaxQueueDepth)
+ info := wip.WorkerInfo(entry.CRD.Resync.String(), workers, entry.CRD.Queue.MaxQueueDepth)
return info
})
}
diff --git a/pkg/orkestra-registry/template/resolver_data.go b/pkg/orkestra-registry/template/resolver_data.go
index 2eabdd13..4b5db395 100644
--- a/pkg/orkestra-registry/template/resolver_data.go
+++ b/pkg/orkestra-registry/template/resolver_data.go
@@ -13,7 +13,8 @@
// 3. resolver.WithItem(val, as) β adds .item / . for forEach loops
// 4. resolver.WithExternal(map) β adds .external..status / .body
// 5. resolver.WithCross(map) β adds .cross..status.*
-// 6. resolver.WithPrevious(map) β adds .previous.* (rollback path only)
+// 6. resolver.WithMetrics(map) → adds .metrics.queueDepth / .workers / .autoscaleActive …
+// 7. resolver.WithPrevious(map) → adds .previous.* (rollback path only)
//
// Each extension is a shallow copy of the previous resolver's data map
// with one new top-level key added. The template engine sees the full
@@ -34,6 +35,8 @@ package template
// .children.* β child resources, after WithChildren is called
// .external.* β HTTP call results, after WithExternal is called
// .cross.* β cross-CRD observations, after WithCross is called
+// .metrics.* → live operatorbox runtime metrics, after WithMetrics is called
+// .health.* → live operatorbox runtime health, after WithHealth is called
// .item β current forEach item, after WithItem is called
// .previous.* β last successfully reconciled spec, after WithPrevious is called (rollback path only)
//
@@ -127,19 +130,19 @@ func (r *Resolver) WithItemAndValue(key interface{}, value interface{}, as strin
// Each call result is keyed by the call's name from the Katalog:
//
// external:
-// - name: health-check
+// - name: healthCheck
// url: "{{ .spec.serviceUrl }}/health"
//
// Results accessible in subsequent template expressions and when: conditions:
//
-// {{ .external.health-check.status }} β HTTP status code as string
-// {{ .external.health-check.body }} β response body (first 4KB)
-// {{ .external.health-check.error }} β error message if call failed
+// {{ .external.healthCheck.status }} β HTTP status code as string
+// {{ .external.healthCheck.body }} β response body (first 4KB)
+// {{ .external.healthCheck.error }} β error message if call failed
//
// Resource declarations that follow in runTemplateReconcile can gate on these:
//
// when:
-// - field: external.health-check.status
+// - field: external.healthCheck.status
// equals: "200"
func (r *Resolver) WithExternal(results map[string]interface{}) *Resolver {
if len(results) == 0 {
@@ -318,6 +321,71 @@ func (r *Resolver) WithPrevious(previous map[string]interface{}) *Resolver {
}
}
+// ─────────────────────────────────────────────────────────────────────────────
+// WithMetrics — live operatorbox runtime metrics injection
+// ─────────────────────────────────────────────────────────────────────────────
+
+// WithMetrics returns a new Resolver with live operatorbox runtime metrics
+// injected under the "metrics" key. Makes the following available in templates:
+//
+//	{{ .metrics.queueDepth }}             → current workqueue depth
+//	{{ .metrics.workers }}                → current effective worker count
+//	{{ .metrics.autoscaleActive }}        → "true"/"false" — override active
+//	{{ .metrics.workersBusyPercent }}     → worker utilisation %
+//	{{ .metrics.workersIdlePercent }}     → idle worker %
+//	{{ .metrics.errorRatePercent }}       → reconcile error rate %
+//	{{ .metrics.reconcileDurationP95Ms }} → P95 reconcile latency (ms)
+//
+// Useful in status.fields to surface live runtime state into CR status.
+func (r *Resolver) WithMetrics(metrics map[string]interface{}) *Resolver {
+ if len(metrics) == 0 {
+ return r
+ }
+ newData := r.shallowCopy()
+ newData["metrics"] = metrics
+ return &Resolver{
+ data: newData,
+ ownerName: r.ownerName,
+ ownerNamespace: r.ownerNamespace,
+ }
+}
+
+// ─────────────────────────────────────────────────────────────────────────────
+// WithHealth — live operatorbox runtime health injection
+// ─────────────────────────────────────────────────────────────────────────────
+
+// WithHealth returns a new Resolver with live operatorbox runtime health
+// injected under the "health" key. Makes the following available in templates:
+//
+//	{{ .health.healthy }}                  → boolean: overall health
+//	{{ .health.state }}                    → "healthy" / "degraded" / "error"
+//	{{ .health.started }}                  → runtime started flag
+//	{{ .health.pending }}                  → pending startup
+//	{{ .health.startedAt }}                → RFC3339 timestamp
+//	{{ .health.uptime }}                   → human-readable uptime
+//	{{ .health.queueDepth }}               → current queue depth
+//	{{ .health.errorRate }}                → reconcile error rate
+//	{{ .health.consecutiveFails }}         → consecutive reconcile failures
+//	{{ .health.totalReconciles }}          → total reconciles since start
+//	{{ .health.resourceCount }}            → number of managed CRs
+//	{{ .health.lastError }}                → last reconcile error (string)
+//	{{ .health.lastReconcile }}            → timestamp of last reconcile
+//	{{ .health.hasUnhealthyDependencies }} → dependency health flag
+//
+// Useful in status.fields to surface live runtime health into CR status.
+func (r *Resolver) WithHealth(health map[string]interface{}) *Resolver {
+ if len(health) == 0 {
+ return r
+ }
+ newData := r.shallowCopy()
+ newData["health"] = health
+ return &Resolver{
+ data: newData,
+ ownerName: r.ownerName,
+ ownerNamespace: r.ownerNamespace,
+ }
+}
+
// βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
// shallowCopy β internal helper
// βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
diff --git a/pkg/reconciler/generic.go b/pkg/reconciler/generic.go
index f1c00992..71c0e43e 100644
--- a/pkg/reconciler/generic.go
+++ b/pkg/reconciler/generic.go
@@ -61,12 +61,13 @@ import (
// because the informer cache always holds the correct underlying concrete type.
// See pkg/reconciler/ptr_hooks.go for the full design rationale.
type GenericReconciler[PTR domain.Object] struct {
- katalogRegistry *kordinator.ResourceKatalog
- providerRegistry orktypes.ProviderRegistry
- providerStats providerStatsRecorder
- informer cache.SharedIndexInformer
- event *event.Event
- kube *kubeclient.Kubeclient
+ katalogRegistry *kordinator.ResourceKatalog
+ crdHealthRegistry map[string]*kordinator.CRDHealth
+ providerRegistry orktypes.ProviderRegistry
+ providerStats providerStatsRecorder
+ informer cache.SharedIndexInformer
+ event *event.Event
+ kube *kubeclient.Kubeclient
// hooks holds type-erased, domain.Object-parameterized callbacks built at
// construction time from the user's ReconcileHooks[PTR]. Stored as
// ObjectHooks rather than ReconcileHooks[PTR] so the reconciler remains
@@ -134,6 +135,7 @@ func NewGenericReconciler[PTR domain.Object](
anyHooks domain.AnyReconcileHooks,
newObj func() PTR,
katalogRegistry *kordinator.ResourceKatalog,
+ crdHealthRegistry map[string]*kordinator.CRDHealth,
providerRegistry orktypes.ProviderRegistry,
providerStats providerStatsRecorder,
) *GenericReconciler[PTR] {
@@ -164,26 +166,27 @@ func NewGenericReconciler[PTR domain.Object](
autoMet := autoscaler.NewAutoMetrics(sem)
r := &GenericReconciler[PTR]{
- katalogRegistry: katalogRegistry,
- providerRegistry: providerRegistry,
- providerStats: providerStats,
- crd: crd,
- operatorBox: crd.OperatorBox,
- informer: informer,
- event: ev,
- kube: kube,
- hooks: hooks,
- newObj: newObj,
- workerSem: sem,
- autoMetrics: autoMet,
- rollbackHistory: make(map[string]*rollbackFailureHistory),
+ katalogRegistry: katalogRegistry,
+ crdHealthRegistry: crdHealthRegistry,
+ providerRegistry: providerRegistry,
+ providerStats: providerStats,
+ crd: crd,
+ operatorBox: crd.OperatorBox,
+ informer: informer,
+ event: ev,
+ kube: kube,
+ hooks: hooks,
+ newObj: newObj,
+ workerSem: sem,
+ autoMetrics: autoMet,
+ rollbackHistory: make(map[string]*rollbackFailureHistory),
}
if crd.AutoscaleEnabled() {
baseline := orktypes.AutoscaleBaseline{
- Workers: workers,
- QueueDepth: crd.Queue.MaxQueueDepth,
- Resync: crd.Resync,
+ Workers: workers,
+ MaxQueueDepth: crd.Queue.MaxQueueDepth,
+ Resync: crd.Resync,
}
r.autoscaler = autoscaler.NewAutoscaler(
crd.APITypes.Kind,
@@ -484,11 +487,33 @@ func (r *GenericReconciler[PTR]) reconcileImpl(ctx context.Context, resolver *or
r.clearFailureHistory(obj.GetNamespace() + "/" + obj.GetName())
}
+ // Inject live runtime metrics into the resolver so status.fields templates
+ // can reference .metrics.queueDepth, .metrics.workers, .metrics.autoscaleActive, etc.
+ metricsMap := r.autoMetrics.AsMap()
+ if r.autoscaler != nil {
+ if snap := r.autoscaler.Snapshot(); snap != nil {
+ metricsMap["autoscaleActive"] = snap.OverrideActive
+ }
+ } else {
+ metricsMap["autoscaleActive"] = false
+ }
+ resolver = resolver.WithMetrics(metricsMap)
+
+ // Inject live runtime health into the resolver so status.fields templates
+ // can reference .health.healthy, .health.state, .health.uptime,
+ // .health.totalReconciles, .health.lastError, etc.
+ //
+ // This surfaces the operatorbox health endpoint directly into templates,
+ // enabling CR status fields to show live reconcile health, uptime,
+ // dependency health, and error information without any API calls.
+ if h, ok := r.crdHealthRegistry[r.crd.GVK().String()]; ok {
+ resolver = resolver.WithHealth(h.HealthAsMap())
+ }
+
// Always patch status β best-effort, never fails reconcile.
// Called with the outcome so Ready condition reflects reality.
// Must run before the error return so Ready=False is written on failure.
- // r.updatedPatchStatus(ctx, obj, err)
- r.patchStatusWithChildren(ctx, obj, resolver, err) // Layer 3: read children only on success β no point reading
+ r.patchStatusWithChildren(ctx, obj, resolver, err)
if err != nil {
logger.FromContext(ctx).Error().Err(err).
diff --git a/pkg/reconciler/generic_autoscale.go b/pkg/reconciler/generic_autoscale.go
index 938670e8..22c7ae68 100644
--- a/pkg/reconciler/generic_autoscale.go
+++ b/pkg/reconciler/generic_autoscale.go
@@ -101,7 +101,7 @@ func (r *GenericReconciler[PTR]) GetAutoMetrics() *autoscaler.AutoMetrics {
// WorkerInfo returns a live WorkerInfo snapshot for the /katalog/{crd} endpoint.
// configuredWorkers and configuredQueueDepth come from the CRD entry at startup.
-func (r *GenericReconciler[PTR]) WorkerInfo(configuredWorkers, configuredQueueDepth int) *autoscaler.WorkerInfo {
+func (r *GenericReconciler[PTR]) WorkerInfo(configuredResync string, configuredWorkers, configuredQueueDepth int) *autoscaler.WorkerInfo {
maxWorkers := configuredWorkers
if r.autoscaler != nil {
if snap := r.autoscaler.Snapshot(); snap != nil && snap.EffectiveWorkers > maxWorkers {
@@ -113,6 +113,7 @@ func (r *GenericReconciler[PTR]) WorkerInfo(configuredWorkers, configuredQueueDe
r.autoMetrics,
configuredWorkers,
configuredQueueDepth,
+ configuredResync,
maxWorkers,
r.autoscaler != nil,
r.autoscaler.Snapshot(),
diff --git a/pkg/reconciler/generic_registry.go b/pkg/reconciler/generic_registry.go
index 938cc4ee..0be96c29 100644
--- a/pkg/reconciler/generic_registry.go
+++ b/pkg/reconciler/generic_registry.go
@@ -31,3 +31,9 @@ type KatalogRegistry interface {
// key and value. Returns nil, false when no CRD matches.
GetInformerByLabel(key, value string) (cache.SharedIndexInformer, bool)
}
+
+// HealthProvider is the minimal interface GenericReconciler needs
+// to expose CRD health to templates without importing kordinator.
+type HealthProvider interface {
+ HealthAsMap() map[string]interface{}
+}
diff --git a/pkg/reconciler/ptr_hooks.go b/pkg/reconciler/ptr_hooks.go
index 5d5c84f5..ed91db78 100644
--- a/pkg/reconciler/ptr_hooks.go
+++ b/pkg/reconciler/ptr_hooks.go
@@ -7,7 +7,7 @@
// GenericReconciler uses the type parameter name PTR rather than the
// conventional T to make the pointer expectation visible at every call site.
// Kubernetes informers store and return pointer values: when you create an
-// informer for Database objects, every item retrieved from its cache is a
+// informer for Database objects, for example, every item retrieved from its cache is a
// *Database. The reconciler type-asserts the raw interface{} value from the
// cache with raw.(PTR), which succeeds only when PTR is a pointer type.
//
diff --git a/pkg/reconciler/run_foreach.go b/pkg/reconciler/run_foreach.go
index 7309ecca..2689b272 100644
--- a/pkg/reconciler/run_foreach.go
+++ b/pkg/reconciler/run_foreach.go
@@ -781,6 +781,56 @@ func expandForEachPVs(
return result
}
+func expandForEachRoles(
+ resolver *orktmpl.Resolver,
+ srcs []orktypes.RoleTemplateSource,
+) []orktypes.RoleTemplateSource {
+ if !anyHasForEach(len(srcs), func(i int) *orktypes.ForEachSpec { return srcs[i].ForEach }) {
+ return srcs
+ }
+ var result []orktypes.RoleTemplateSource
+ for _, src := range srcs {
+ if src.ForEach == nil {
+ result = append(result, src)
+ continue
+ }
+ for i, fi := range resolveForEachItems(resolver.Data(), src.ForEach.Field) {
+ ir := itemResolver(resolver, fi, src.ForEach.As, i)
+ expanded := src
+ expanded.ForEach = nil
+ expanded.Name, _ = ir.Resolve(src.Name)
+ expanded.Namespace, _ = ir.Resolve(src.Namespace)
+ result = append(result, expanded)
+ }
+ }
+ return result
+}
+
+func expandForEachRoleBindings(
+ resolver *orktmpl.Resolver,
+ srcs []orktypes.RoleBindingTemplateSource,
+) []orktypes.RoleBindingTemplateSource {
+ if !anyHasForEach(len(srcs), func(i int) *orktypes.ForEachSpec { return srcs[i].ForEach }) {
+ return srcs
+ }
+ var result []orktypes.RoleBindingTemplateSource
+ for _, src := range srcs {
+ if src.ForEach == nil {
+ result = append(result, src)
+ continue
+ }
+ for i, fi := range resolveForEachItems(resolver.Data(), src.ForEach.Field) {
+ ir := itemResolver(resolver, fi, src.ForEach.As, i)
+ expanded := src
+ expanded.ForEach = nil
+ expanded.Name, _ = ir.Resolve(src.Name)
+ expanded.Namespace, _ = ir.Resolve(src.Namespace)
+ result = append(result, expanded)
+ }
+ }
+ return result
+}
+
// βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
// Internal helpers
// βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@@ -854,66 +904,6 @@ func itemResolver(base *orktmpl.Resolver, fi forEachItem, as string, index int)
return base.WithItem(fi.key, as, index)
}
-// resolveListField is kept for backward compatibility with any callers outside this file.
-func resolveListField(data map[string]interface{}, path string) []interface{} {
- items := resolveForEachItems(data, path)
- result := make([]interface{}, len(items))
- for i, fi := range items {
- result[i] = fi.key
- }
- return result
-}
-
-func expandForEachRoles(
- resolver *orktmpl.Resolver,
- srcs []orktypes.RoleTemplateSource,
-) []orktypes.RoleTemplateSource {
- if !anyHasForEach(len(srcs), func(i int) *orktypes.ForEachSpec { return srcs[i].ForEach }) {
- return srcs
- }
- var result []orktypes.RoleTemplateSource
- for _, src := range srcs {
- if src.ForEach == nil {
- result = append(result, src)
- continue
- }
- for i, fi := range resolveForEachItems(resolver.Data(), src.ForEach.Field) {
- ir := itemResolver(resolver, fi, src.ForEach.As, i)
- expanded := src
- expanded.ForEach = nil
- expanded.Name, _ = ir.Resolve(src.Name)
- expanded.Namespace, _ = ir.Resolve(src.Namespace)
- result = append(result, expanded)
- }
- }
- return result
-}
-
-func expandForEachRoleBindings(
- resolver *orktmpl.Resolver,
- srcs []orktypes.RoleBindingTemplateSource,
-) []orktypes.RoleBindingTemplateSource {
- if !anyHasForEach(len(srcs), func(i int) *orktypes.ForEachSpec { return srcs[i].ForEach }) {
- return srcs
- }
- var result []orktypes.RoleBindingTemplateSource
- for _, src := range srcs {
- if src.ForEach == nil {
- result = append(result, src)
- continue
- }
- for i, fi := range resolveForEachItems(resolver.Data(), src.ForEach.Field) {
- ir := itemResolver(resolver, fi, src.ForEach.As, i)
- expanded := src
- expanded.ForEach = nil
- expanded.Name, _ = ir.Resolve(src.Name)
- expanded.Namespace, _ = ir.Resolve(src.Namespace)
- result = append(result, expanded)
- }
- }
- return result
-}
-
// splitFieldPath splits a dot-notation path into segments.
func splitFieldPath(path string) []string {
var parts []string
diff --git a/pkg/reconciler/run_template_reconcile.go b/pkg/reconciler/run_template_reconcile.go
index fedcf167..323b5f27 100644
--- a/pkg/reconciler/run_template_reconcile.go
+++ b/pkg/reconciler/run_template_reconcile.go
@@ -309,7 +309,9 @@ func (r *GenericReconciler[PTR]) readCross(
Msg("cross: no CRD matched label selector in registry")
}
}
- // Path 1b: name-based informer lookup (existing, unchanged)
+
+ notFoundInBinary := false
+ // Path 1b: name-based informer lookup
 if decl.Crd != "" && r.katalogRegistry != nil {
 inf, found := r.katalogRegistry.GetInformerByName(decl.Crd)
 if found {
@@ -322,13 +324,10 @@
 result[as] = data
 continue
 }
- log.Warn().
- Str("crd", decl.Crd).
- Str("as", as).
- Bool("registry_nil", registryNil).
- Msg("cross: no CRD matched name in registry")
+ notFoundInBinary = true
 }
+ notFoundCrossBinary := false
 // Path 2: HTTP endpoint fallback.
 // For cross-binary or cross-cluster. Uses Orkestra's CR detail endpoint.
 if decl.Source != nil && decl.Source.Endpoint != "" {
@@ -344,12 +343,21 @@
 Msg("cross: read via HTTP endpoint")
 continue
 }
+ notFoundCrossBinary = true
 log.Warn().
 Str("crd", decl.Crd).
 Str("endpoint", endpointURL).
 Msg("cross: HTTP endpoint returned nil")
 }
+ if notFoundInBinary && notFoundCrossBinary {
+ log.Warn().
+ Str("crd", decl.Crd).
+ Str("as", as).
+ Bool("registry_nil", registryNil).
+ Msg("cross: no CRD matched name in registry")
+ }
+
// Path 3: not found.
result[as] = map[string]interface{}{
"found": "false",
diff --git a/pkg/types/autoscale.go b/pkg/types/autoscale.go
index ec96da99..58ac18d8 100644
--- a/pkg/types/autoscale.go
+++ b/pkg/types/autoscale.go
@@ -111,9 +111,9 @@ type AutoscaleAction struct {
// AutoscaleBaseline captures the CRD's declared configuration before any
// autoscale override is applied. Always restored when conditions are false.
type AutoscaleBaseline struct {
- Workers int
- QueueDepth int
- Resync time.Duration
+ Workers int
+ MaxQueueDepth int
+ Resync time.Duration
}
// AutoscaleState tracks the runtime state of the autoscaler for one operatorbox.
diff --git a/pkg/types/conditions.go b/pkg/types/conditions.go
index 48e4667f..7092b085 100644
--- a/pkg/types/conditions.go
+++ b/pkg/types/conditions.go
@@ -106,7 +106,13 @@ type Condition struct {
// - field: cross.managed-database.metrics.queueDepth
// greaterThan: "500"
// source:
- // endpoint: "http://database-operator:8080/katalog/managed-database"
+ // host: "http://orkestra-database-operator:8080"
+ // crd: managed-database
+ // when:
+ // - field: cross.managed-database.metrics.queueDepth
+ // greaterThan: "500"
+ // source:
+ // endpoint: "http://non-orkestra-database-operator:8080/katalog/managed-database"
Source *CrossSource `yaml:"source,omitempty" json:"source,omitempty"`
}
diff --git a/pkg/types/cross.go b/pkg/types/cross.go
index 74f1022d..3cdfd10b 100644
--- a/pkg/types/cross.go
+++ b/pkg/types/cross.go
@@ -110,19 +110,42 @@ type CrossSelector struct {
LabelSelector string `yaml:"labelSelector,omitempty" json:"labelSelector,omitempty"`
}
-// CrossSource declares an HTTP fallback for cross-binary/cluster observation.
+// CrossSource declares how to fetch cross-binary/cluster data for a CRD.
+// If Endpoint is provided, it is used as-is (raw HTTP fetch).
+// If Host is provided, Orkestra constructs the URL based on Type.
+//
+// Supported Type values:
+// - "info" β /katalog//cr//
+// - "metrics" β /katalog/
+// - "health" β /katalog//health
+// - "events" β /katalog//cr///events
+//
// The endpoint must return the same JSON shape as the informer cache path β
// i.e., the Orkestra CR detail endpoint format.
+// Namespace is optional; defaults to the CR's namespace when omitted.
type CrossSource struct {
- // Endpoint is the URL to call when the informer is unavailable.
- // Template expressions supported.
- // endpoint: "http://database-operator:8080/katalog/database/cr/{{ .metadata.namespace }}/{{ .metadata.name }}"
- Endpoint string `yaml:"endpoint" json:"endpoint"`
+ // Endpoint is a fully-qualified URL. If set, Orkestra uses it directly
+ // and ignores Host/Type/Namespace. Template expressions supported.
+ Endpoint string `yaml:"endpoint,omitempty" json:"endpoint,omitempty"`
+
+ // Host is the base URL of a remote Orkestra runtime, e.g.:
+ // http://orkestra-runtime.loader-system:8080
+ // Combined with Type to build the final URL.
+ Host string `yaml:"host,omitempty" json:"host,omitempty"`
+
+ // Type selects which Orkestra-native endpoint to call.
+ // One of: "info", "metrics", "health", "events".
+ // Default: "info".
+ Type string `yaml:"type,omitempty" json:"type,omitempty"`
+
+ // Namespace overrides the CR namespace when building info/events URLs.
+ // Optional β defaults to the CR's own namespace.
+ Namespace string `yaml:"namespace,omitempty" json:"namespace,omitempty"`
- // Token is a bearer token for the endpoint. $ENV_VAR syntax supported.
- Token string `yaml:"token,omitempty" json:"token,omitempty"`
+ // Token is a bearer token for the endpoint. $ENV_VAR syntax supported.
+ Token string `yaml:"token,omitempty" json:"token,omitempty"`
- // CacheFor is how long to cache the result before calling again.
- // Default: 30s β prevents hammering the endpoint on every resync.
- CacheFor string `yaml:"cacheFor,omitempty" json:"cacheFor,omitempty"`
+ // CacheFor controls how long to cache the result before calling again.
+ // Default: 30s β prevents hammering the endpoint on every evaluation.
+ CacheFor string `yaml:"cacheFor,omitempty" json:"cacheFor,omitempty"`
}
diff --git a/pkg/types/methods.go b/pkg/types/methods.go
index 0636b735..df678436 100644
--- a/pkg/types/methods.go
+++ b/pkg/types/methods.go
@@ -3,6 +3,7 @@ package types
import (
"fmt"
"strings"
+ "time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -65,6 +66,15 @@ func (c *CRDEntry) SetWorkers(def int) int {
return c.Workers
}
+// SetResync resolves the resync period for this CRD. If a per-CRD non-zero value is
+// set, it is used; otherwise the global default resync is applied.
+func (c *CRDEntry) SetResync(def time.Duration) time.Duration {
+ if c.Resync != 0 {
+ return c.Resync
+ }
+ return def
+}
+
// IsDynamic determines whether this CRD should operate in dynamic mode.
// Resolution order (first match wins):
// 1. mode: dynamic explicitly declared β true