Skip to content
Draft
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,7 @@ testbin/*
/website/public

# /tmp
tmp/
tmp/

# coding agents
.pi
378 changes: 378 additions & 0 deletions docs/specs/version-cleanup-without-prometheus/spec.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# CAPApplicationVersion "test-cap-01-cav-v1" (version 5.6.7) in namespace
# "default", owned by the CAPApplication "test-cap-01".
# It declares three workloads (a CAP deployment, a Content job and a
# TenantOperation job); none of them declares monitoring / deletion rules.
# The recorded status is Ready, with the content job listed as finished.
# NOTE(review): presumably a fixture for the version-cleanup scenario — the
# file path is not visible here, confirm before relying on that.
apiVersion: sme.sap.com/v1alpha1
kind: CAPApplicationVersion
metadata:
creationTimestamp: "2022-03-18T22:14:33Z"
generation: 1
annotations:
sme.sap.com/btp-app-identifier: btp-glo-acc-id.test-cap-01
sme.sap.com/owner-identifier: default.test-cap-01
labels:
sme.sap.com/btp-app-identifier-hash: f20cc8aeb2003b3abc33f749a16bd53544b6bab2
sme.sap.com/owner-generation: "2"
sme.sap.com/owner-identifier-hash: 1f74ae2fbff71a708786a4df4bb2ca87ec603581
name: test-cap-01-cav-v1
namespace: default
# The owning CAPApplication is the controller of this version.
ownerReferences:
- apiVersion: sme.sap.com/v1alpha1
blockOwnerDeletion: true
controller: true
kind: CAPApplication
name: test-cap-01
uid: 3c7ba7cb-dc04-4fd1-be86-3eb3a5c64a98
resourceVersion: "11371108"
uid: 5e64489b-7346-4984-8617-e8c37338b3d8
finalizers:
- sme.sap.com/capapplicationversion
spec:
capApplicationInstance: test-cap-01
registrySecrets:
- regcred
version: 5.6.7
# Workloads: one deployment and two jobs, all without monitoring settings.
workloads:
- name: cap-backend-srv
consumedBTPServices:
- cap-uaa
- cap-service-manager
- cap-saas-registry
deploymentDefinition:
type: CAP
image: docker.image.repo/srv/server:latest
- name: content
consumedBTPServices:
- cap-uaa
jobDefinition:
type: Content
image: docker.image.repo/content/cap-content:latest
- name: mtx
consumedBTPServices:
- cap-uaa
- cap-service-manager
- cap-saas-registry
jobDefinition:
type: "TenantOperation"
image: docker.image.repo/srv/server:latest
# Status as already reconciled: Ready, observed at generation 1.
status:
conditions:
- reason: WorkloadsReady
observedGeneration: 1
status: "True"
type: Ready
observedGeneration: 1
finishedJobs:
- test-cap-01-cav-v1-content
state: Ready
197 changes: 152 additions & 45 deletions internal/controller/version-monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"os"
"strings"
"sync"
"time"

promapi "github.com/prometheus/client_golang/api"
Expand Down Expand Up @@ -40,10 +41,98 @@ const (
CounterEvaluationExpression = "sum(rate(%s{job=\"%s\",namespace=\"%s\"}[%s]))"
)

// promClientProvider abstracts access to the Prometheus API so that callers
// can ask for a client on every evaluation cycle without having to know
// whether Prometheus is configured or currently reachable, and without
// forcing a network call on every iteration.
type promClientProvider interface {
	// Get returns the current Prometheus API and true when a client is
	// available. It returns (nil, false) when no client can be handed out,
	// for example because the address is unset or the server is unreachable.
	Get(ctx context.Context) (promv1.API, bool)
}

// noopPromClientProvider is the provider used when PROMETHEUS_ADDRESS is not
// set. It never hands out a client.
type noopPromClientProvider struct{}

// Get always reports the Prometheus API as unavailable by returning
// (nil, false); the supplied context is ignored.
func (*noopPromClientProvider) Get(context.Context) (promv1.API, bool) {
	return nil, false
}

// cachedPromClientProvider holds a cached Prometheus API client and manages
// reachability probing with a configurable delay between probes.
type cachedPromClientProvider struct {
	address    string        // Prometheus address handed to the client constructor
	retryDelay time.Duration // minimum time between two probe attempts

	// mu is held for the whole of Get and guards the fields below.
	mu          sync.Mutex
	cached      promv1.API // non-nil when the last probe succeeded
	lastProbeAt time.Time  // when the last probe (success or failure) was attempted
	loggedError bool       // true after we have logged the first construction/probe failure
}

// Get returns the cached Prometheus API when one is available. Otherwise it
// probes the server at most once per retryDelay window and caches the client
// when the probe succeeds.
//
// The probe is bounded by probeTimeout in addition to ctx. Get holds p.mu for
// its whole duration, so a server that accepts connections but never answers
// would otherwise block every caller for as long as ctx lives.
//
// Only the first failed attempt is logged, so an unreachable server does not
// flood the log; the flag is reset once a probe succeeds.
func (p *cachedPromClientProvider) Get(ctx context.Context) (promv1.API, bool) {
	// Upper bound for a single reachability probe.
	const probeTimeout = 10 * time.Second

	p.mu.Lock()
	defer p.mu.Unlock()

	// Fast path: already have a working client.
	if p.cached != nil {
		return p.cached, true
	}

	// Only probe if enough time has passed since the last attempt.
	if time.Since(p.lastProbeAt) < p.retryDelay {
		return nil, false
	}

	// Record the attempt up front so that failures are throttled as well.
	p.lastProbeAt = time.Now()

	// fail reports the client as unavailable, logging only the first failure.
	fail := func(err error, msg string) (promv1.API, bool) {
		if !p.loggedError {
			klog.ErrorS(err, msg, "address", p.address)
			p.loggedError = true
		}
		return nil, false
	}

	client, err := promapi.NewClient(promapi.Config{Address: p.address})
	if err != nil {
		return fail(err, "could not create prometheus client")
	}

	// Verify reachability with a deadline so a hanging server cannot stall
	// the caller while the mutex is held.
	probeCtx, cancel := context.WithTimeout(ctx, probeTimeout)
	defer cancel()

	api := promv1.NewAPI(client)
	if _, err := api.Runtimeinfo(probeCtx); err != nil {
		return fail(err, "could not fetch runtime info from prometheus server")
	}

	// Probe succeeded — cache the client and reset the error flag.
	klog.InfoS("prometheus server is now reachable", "address", p.address)
	p.cached = api
	p.loggedError = false
	return p.cached, true
}

// newPromClientProvider picks the promClientProvider implementation that
// matches the monitoring environment: a caching, probing provider when a
// Prometheus address is configured, and a no-op provider otherwise.
func newPromClientProvider(mEnv *monitoringEnv) promClientProvider {
	if addr := mEnv.address; addr != "" {
		// lastProbeAt is left at its zero value so that the very first Get
		// call triggers an immediate probe.
		return &cachedPromClientProvider{
			address:    addr,
			retryDelay: mEnv.acquireClientRetryDelay,
		}
	}
	return &noopPromClientProvider{}
}

// cleanupOrchestrator holds the state needed by the version-cleanup routines.
type cleanupOrchestrator struct {
api promv1.API
queue workqueue.TypedRateLimitingInterface[NamespacedResourceKey]
mEnv *monitoringEnv
clientProvider promClientProvider
queue workqueue.TypedRateLimitingInterface[NamespacedResourceKey]
mEnv *monitoringEnv
}

type monitoringEnv struct {
Expand All @@ -52,12 +141,14 @@ type monitoringEnv struct {
evaluationInterval time.Duration
}

// parseMonitoringEnv always returns a non-nil *monitoringEnv.
// When PROMETHEUS_ADDRESS is unset or whitespace-only, address is set to the
// empty string; the duration fields are still populated from their respective
// env vars (or from their fallback values when those are unset or empty).
func parseMonitoringEnv() *monitoringEnv {
promAdd := strings.TrimSpace(os.Getenv(EnvPrometheusAddress))
if promAdd == "" {
return nil
env := &monitoringEnv{
address: strings.TrimSpace(os.Getenv(EnvPrometheusAddress)),
}
env := &monitoringEnv{address: promAdd}

evalDurationEnv := func(envName string, fallback time.Duration) time.Duration {
if v, ok := os.LookupEnv(envName); ok && strings.TrimSpace(v) != "" {
Expand All @@ -73,39 +164,35 @@ func parseMonitoringEnv() *monitoringEnv {
return env
}

// startVersionCleanup starts the version-cleanup schedule and worker goroutines
// for the lifetime of ctx. It runs even when no Prometheus address is
// configured, because workloads without deletionRules are still eligible for
// cleanup.
func (c *Controller) startVersionCleanup(ctx context.Context) {
mEnv := parseMonitoringEnv()
if mEnv == nil {
return // no prometheus address

if mEnv.address == "" {
klog.InfoS("PROMETHEUS_ADDRESS is not set; only versions without deletion rules will be cleaned up")
}

restartSignal := make(chan bool, 1)
setup := func() context.CancelFunc {
for {
o := initializeVersionCleanupOrchestrator(ctx, mEnv)
if o == nil {
select {
case <-ctx.Done():
return nil
case <-time.After(mEnv.acquireClientRetryDelay): // sleep a long time before attempting to setup the cleanup process
continue
}
}
child, cancelFn := context.WithCancel(ctx)
go func() {
<-child.Done()
o.queue.ShutDown()
}()
go c.scheduleVersionCollectionForCleanup(child, o, restartSignal)
go c.processVersionCleanupQueue(child, o, restartSignal)
return cancelFn
}
o := initializeVersionCleanupOrchestrator(ctx, mEnv)
child, cancelFn := context.WithCancel(ctx)
go func() {
<-child.Done()
o.queue.ShutDown()
}()
go c.scheduleVersionCollectionForCleanup(child, o, restartSignal)
go c.processVersionCleanupQueue(child, o, restartSignal)
return cancelFn
}

for {
cancel := setup()
select {
case <-ctx.Done():
cancel()
return
case <-restartSignal: // restart broken routines
cancel()
Expand All @@ -124,24 +211,15 @@ func recoverVersionCleanupRoutine(restart chan<- bool) {
}
}

// initializeVersionCleanupOrchestrator always returns a non-nil
// *cleanupOrchestrator. It sets up the work queue and the Prometheus client
// provider but does NOT block on, or return nil because of, Prometheus
// reachability.
func initializeVersionCleanupOrchestrator(ctx context.Context, mEnv *monitoringEnv) *cleanupOrchestrator {
promClient, err := promapi.NewClient(promapi.Config{Address: mEnv.address})
if err != nil {
klog.ErrorS(err, "could not create client", "address", mEnv.address)
return nil
}
v1api := promv1.NewAPI(promClient)
_, err = v1api.Runtimeinfo(ctx)
if err != nil {
klog.ErrorS(err, "could not fetch runtime info from prometheus server", "address", mEnv.address)
return nil
}

// create orchestrator
return &cleanupOrchestrator{
api: v1api,
queue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[NamespacedResourceKey]()),
mEnv: mEnv,
clientProvider: newPromClientProvider(mEnv),
queue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[NamespacedResourceKey]()),
mEnv: mEnv,
}
}

Expand Down Expand Up @@ -246,21 +324,33 @@ func (c *Controller) processVersionCleanupQueue(ctx context.Context, orc *cleanu
}
}

// processVersionCleanupQueueItem dequeues one item and evaluates it for
// cleanup. It obtains the current Prometheus API from the provider; if the
// provider reports unavailable, nil is passed to the evaluator so that
// workloads without deletionRules can still be cleaned up.
func (c *Controller) processVersionCleanupQueueItem(ctx context.Context, orc *cleanupOrchestrator) (stop bool) {
item, shutdown := orc.queue.Get()
if shutdown {
return true // stop processing
}
defer orc.queue.Done(item)

if c.evaluateVersionForCleanup(ctx, item, orc.api) != nil {
// Obtain the current Prometheus API — may be nil if unavailable.
api, _ := orc.clientProvider.Get(ctx)

if c.evaluateVersionForCleanup(ctx, item, api) != nil {
orc.queue.AddRateLimited(item)
} else {
orc.queue.Forget(item)
}
return false
}

// evaluateVersionForCleanup evaluates one CAPApplicationVersion for deletion
// eligibility. promapi may be nil when Prometheus is unavailable; in that
// case workloads that require deletionRules are considered not eligible (kept
// for re-evaluation next cycle) while workloads without deletionRules remain
// automatically eligible.
func (c *Controller) evaluateVersionForCleanup(ctx context.Context, item NamespacedResourceKey, promapi promv1.API) error {
lister := c.crdInformerFactory.Sme().V1alpha1().CAPApplicationVersions().Lister()
cav, err := lister.CAPApplicationVersions(item.Namespace).Get(item.Name)
Expand Down Expand Up @@ -295,11 +385,28 @@ func (c *Controller) evaluateVersionForCleanup(ctx context.Context, item Namespa
return nil
}

// evaluateWorkloadForCleanup decides whether a single workload is eligible for
// cleanup.
//
// - When the workload has no deletionRules, it is automatically eligible
// (returns true) regardless of Prometheus availability.
// - When deletionRules are present but promapi is nil (Prometheus
// unavailable), the workload is NOT eligible (returns false) and an
// informational log entry is emitted for every such evaluation.
// - When deletionRules are present and promapi is available, the rules are
// evaluated via the existing helpers.
func evaluateWorkloadForCleanup(ctx context.Context, cav NamespacedResourceKey, wl *v1alpha1.WorkloadDetails, promapi promv1.API) bool {
if wl.DeploymentDefinition == nil || wl.DeploymentDefinition.Monitoring == nil || wl.DeploymentDefinition.Monitoring.DeletionRules == nil {
return true // if there are no rules - the workload is automatically eligible for cleanup
}

// Deletion rules are present; we need Prometheus to evaluate them.
if promapi == nil {
klog.InfoS("prometheus client unavailable; workload with deletion rules cannot be evaluated and will be skipped",
"workload", wl.Name, "version", cav.Name)
return false
}

if wl.DeploymentDefinition.Monitoring.DeletionRules.ScalarExpression != nil { // evaluate provided expression
isRelevantForCleanup, err := evaluateExpression(ctx, *wl.DeploymentDefinition.Monitoring.DeletionRules.ScalarExpression, promapi)
if err != nil {
Expand Down
Loading
Loading