diff --git a/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/retry/retry.go b/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/retry/retry.go index cc543f4e1..07d026b55 100644 --- a/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/retry/retry.go +++ b/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/retry/retry.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "sort" "time" "workspace-engine/pkg/oapi" "workspace-engine/pkg/workspace/releasemanager/policy/evaluator" @@ -14,7 +15,7 @@ import ( var _ evaluator.JobEvaluator = &RetryEvaluator{} // RetryEvaluator enforces retry limits for failed deployments based on job status. -// It counts previous job attempts for the same release that match the configured +// It counts previous consecutive job attempts for the same release that match the configured // retryable statuses and denies job creation if the limit is exceeded. // // When backoff is configured, it calculates the required wait time between retries @@ -75,8 +76,20 @@ func NewEvaluator(getters Getters, rule *oapi.RetryRule) evaluator.JobEvaluator } } +func (e *RetryEvaluator) getJobsForReleaseTargetSortedLatestFirst(releaseTarget *oapi.ReleaseTarget) []*oapi.Job { + jobsMap := e.getters.GetJobsForReleaseTarget(releaseTarget) + jobs := make([]*oapi.Job, 0, len(jobsMap)) + for _, job := range jobsMap { + jobs = append(jobs, job) + } + sort.Slice(jobs, func(i, j int) bool { + return jobs[i].CreatedAt.After(jobs[j].CreatedAt) + }) + return jobs +} + // Evaluate checks if the release has exceeded its retry limit or is in backoff period. -// Counts previous job attempts for this exact release that match the configured +// Counts previous consecutive job attempts for this exact release that match the configured // retryable statuses (e.g., failure, timeout). // // Returns: @@ -90,12 +103,12 @@ func (e *RetryEvaluator) Evaluate( releaseTarget := release.ReleaseTarget // Get all jobs for this release target - jobs := e.getters.GetJobsForReleaseTarget(&releaseTarget) + jobs := e.getJobsForReleaseTargetSortedLatestFirst(&releaseTarget) // Build a map of retryable statuses for efficient lookup retryableStatuses := e.buildRetryableStatusMap() - // Count previous attempts and find most recent retryable job + // Count previous consecutive attempts and find most recent retryable job attemptCount := 0 matchingJobIds := make([]string, 0) var mostRecentJob *oapi.Job @@ -104,13 +117,13 @@ func (e *RetryEvaluator) Evaluate( for _, job := range jobs { // Only count jobs for this exact release if job.ReleaseId != release.ID() { - continue + break } // Check if job status is retryable (or if all statuses count) isRetryable := retryableStatuses == nil || retryableStatuses[job.Status] if !isRetryable { - continue + break } attemptCount++ diff --git a/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/retry/retry_test.go b/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/retry/retry_test.go index 5bd73caa5..7e3950e8a 100644 --- a/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/retry/retry_test.go +++ b/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/retry/retry_test.go @@ -818,65 +818,218 @@ func TestRetryEvaluator_Backoff_OnlyForRetryableStatuses(t *testing.T) { } // ============================================================================= -// Edge Cases +// Newest-First Sort Order Tests (version flipping) // ============================================================================= -func TestRetryEvaluator_MixedJobStatuses(t *testing.T) { +func TestRetryEvaluator_VersionFlip_AllowsRedeployAfterDifferentRelease(t *testing.T) { + // When versions flip (v1 → v2 → v1), the retry evaluator must only count + // consecutive jobs for the CURRENT release from newest to oldest. + // Jobs for other releases in between should break the streak, resetting the count. st := setupStoreWithResource(t, "resource-1") ctx := context.Background() - release := createRelease("dep-1", "env-1", "resource-1", "v1", "v1.0.0") - if err := st.Releases.Upsert(ctx, release); err != nil { - t.Fatalf("Failed to upsert release: %v", err) + releaseV1 := createRelease("dep-1", "env-1", "resource-1", "v1", "v1.0.0") + releaseV2 := createRelease("dep-1", "env-1", "resource-1", "v2", "v2.0.0") + + if err := st.Releases.Upsert(ctx, releaseV1); err != nil { + t.Fatalf("Failed to upsert releaseV1: %v", err) + } + if err := st.Releases.Upsert(ctx, releaseV2); err != nil { + t.Fatalf("Failed to upsert releaseV2: %v", err) } - retryOnStatuses := []oapi.JobStatus{ - oapi.JobStatusFailure, - oapi.JobStatusInvalidJobAgent, + eval := NewEvaluatorFromStore(st, nil) + + // Job 1: v1 deployed successfully (oldest) + completedAt1 := time.Now().Add(-3 * time.Hour) + st.Jobs.Upsert(ctx, &oapi.Job{ + Id: "job-v1-first", + ReleaseId: releaseV1.ID(), + Status: oapi.JobStatusSuccessful, + CreatedAt: time.Now().Add(-4 * time.Hour), + CompletedAt: &completedAt1, + }) + + // Job 2: v2 deployed successfully (middle) + completedAt2 := time.Now().Add(-2 * time.Hour) + st.Jobs.Upsert(ctx, &oapi.Job{ + Id: "job-v2", + ReleaseId: releaseV2.ID(), + Status: oapi.JobStatusSuccessful, + CreatedAt: time.Now().Add(-150 * time.Minute), + CompletedAt: &completedAt2, + }) + + // Now we want to redeploy v1: the most recent job is for v2, + // so the consecutive count for v1 should be 0 → first attempt → allowed + result := eval.Evaluate(ctx, releaseV1) + assert.True(t, result.Allowed, "Should allow v1 redeploy after v2 was deployed in between") + assert.Contains(t, result.Message, "First attempt") +} + +func TestRetryEvaluator_VersionFlip_CountsOnlyLatestConsecutiveJobs(t *testing.T) { + // Verifies that with maxRetries=2, only the most recent consecutive jobs + // for the current release are counted, ignoring older jobs separated by + // a different release. + st := setupStoreWithResource(t, "resource-1") + ctx := context.Background() + + releaseV1 := createRelease("dep-1", "env-1", "resource-1", "v1", "v1.0.0") + releaseV2 := createRelease("dep-1", "env-1", "resource-1", "v2", "v2.0.0") + + if err := st.Releases.Upsert(ctx, releaseV1); err != nil { + t.Fatalf("Failed to upsert releaseV1: %v", err) } - rule := &oapi.RetryRule{ - MaxRetries: 2, - RetryOnStatuses: &retryOnStatuses, + if err := st.Releases.Upsert(ctx, releaseV2); err != nil { + t.Fatalf("Failed to upsert releaseV2: %v", err) } + + rule := &oapi.RetryRule{MaxRetries: 2} eval := NewEvaluatorFromStore(st, rule) - // Mix of jobs - jobs := []struct { - id string - status oapi.JobStatus - }{ - {"job-1", oapi.JobStatusFailure}, // Counts - {"job-2", oapi.JobStatusSuccessful}, // Doesn't count - {"job-3", oapi.JobStatusCancelled}, // Doesn't count - {"job-4", oapi.JobStatusInvalidJobAgent}, // Counts - {"job-5", oapi.JobStatusInProgress}, // Doesn't count - {"job-6", oapi.JobStatusPending}, // Doesn't count - } + // Old v1 job (should be ignored because v2 job separates it) + completedAt1 := time.Now().Add(-5 * time.Hour) + st.Jobs.Upsert(ctx, &oapi.Job{ + Id: "job-v1-old", + ReleaseId: releaseV1.ID(), + Status: oapi.JobStatusFailure, + CreatedAt: time.Now().Add(-6 * time.Hour), + CompletedAt: &completedAt1, + }) - for _, j := range jobs { - completedAt := time.Now() - st.Jobs.Upsert(ctx, &oapi.Job{ - Id: j.id, - ReleaseId: release.ID(), - Status: j.status, - CreatedAt: time.Now(), - CompletedAt: &completedAt, - }) + // v2 job in between + completedAt2 := time.Now().Add(-3 * time.Hour) + st.Jobs.Upsert(ctx, &oapi.Job{ + Id: "job-v2", + ReleaseId: releaseV2.ID(), + Status: oapi.JobStatusSuccessful, + CreatedAt: time.Now().Add(-4 * time.Hour), + CompletedAt: &completedAt2, + }) + + // Recent v1 job (only this one should count) + completedAt3 := time.Now().Add(-1 * time.Hour) + st.Jobs.Upsert(ctx, &oapi.Job{ + Id: "job-v1-recent", + ReleaseId: releaseV1.ID(), + Status: oapi.JobStatusFailure, + CreatedAt: time.Now().Add(-2 * time.Hour), + CompletedAt: &completedAt3, + }) + + result := eval.Evaluate(ctx, releaseV1) + assert.True(t, result.Allowed, "Should allow retry (only 1 consecutive attempt, max is 2)") + assert.Equal(t, 1, result.Details["attempt_count"]) +} + +func TestRetryEvaluator_VersionFlip_DeniesWhenConsecutiveExceedsLimit(t *testing.T) { + // After flipping back to v1, if consecutive v1 jobs exceed the retry limit, + // it should be denied. + st := setupStoreWithResource(t, "resource-1") + ctx := context.Background() + + releaseV1 := createRelease("dep-1", "env-1", "resource-1", "v1", "v1.0.0") + releaseV2 := createRelease("dep-1", "env-1", "resource-1", "v2", "v2.0.0") + + if err := st.Releases.Upsert(ctx, releaseV1); err != nil { + t.Fatalf("Failed to upsert releaseV1: %v", err) + } + if err := st.Releases.Upsert(ctx, releaseV2); err != nil { + t.Fatalf("Failed to upsert releaseV2: %v", err) } - result := eval.Evaluate(ctx, release) + rule := &oapi.RetryRule{MaxRetries: 1} + eval := NewEvaluatorFromStore(st, rule) + + // v2 job (old, will be skipped because newer v1 jobs come after) + completedAt1 := time.Now().Add(-5 * time.Hour) + st.Jobs.Upsert(ctx, &oapi.Job{ + Id: "job-v2", + ReleaseId: releaseV2.ID(), + Status: oapi.JobStatusSuccessful, + CreatedAt: time.Now().Add(-6 * time.Hour), + CompletedAt: &completedAt1, + }) + + // Two consecutive v1 jobs (most recent) + completedAt2 := time.Now().Add(-2 * time.Hour) + st.Jobs.Upsert(ctx, &oapi.Job{ + Id: "job-v1-a", + ReleaseId: releaseV1.ID(), + Status: oapi.JobStatusFailure, + CreatedAt: time.Now().Add(-3 * time.Hour), + CompletedAt: &completedAt2, + }) + + completedAt3 := time.Now().Add(-1 * time.Hour) + st.Jobs.Upsert(ctx, &oapi.Job{ + Id: "job-v1-b", + ReleaseId: releaseV1.ID(), + Status: oapi.JobStatusFailure, + CreatedAt: time.Now().Add(-90 * time.Minute), + CompletedAt: &completedAt3, + }) - // Only 2 jobs should count (failure + invalidJobAgent) - assert.True(t, result.Allowed, "Should allow (2 retryable <= 2 max)") + result := eval.Evaluate(ctx, releaseV1) + assert.False(t, result.Allowed, "Should deny (2 consecutive v1 attempts > maxRetries=1)") + assert.Contains(t, result.Message, "Retry limit exceeded") assert.Equal(t, 2, result.Details["attempt_count"]) +} - jobIds := result.Details["retryable_job_ids"].([]string) - assert.Contains(t, jobIds, "job-1") - assert.Contains(t, jobIds, "job-4") - assert.NotContains(t, jobIds, "job-2") // successful - assert.NotContains(t, jobIds, "job-5") // in progress +func TestRetryEvaluator_VersionFlip_MultipleFlips(t *testing.T) { + // Simulate multiple version flips: v1 → v2 → v1 → v2 + // Each flip should reset the consecutive count. + st := setupStoreWithResource(t, "resource-1") + ctx := context.Background() + + releaseV1 := createRelease("dep-1", "env-1", "resource-1", "v1", "v1.0.0") + releaseV2 := createRelease("dep-1", "env-1", "resource-1", "v2", "v2.0.0") + + if err := st.Releases.Upsert(ctx, releaseV1); err != nil { + t.Fatalf("Failed to upsert releaseV1: %v", err) + } + if err := st.Releases.Upsert(ctx, releaseV2); err != nil { + t.Fatalf("Failed to upsert releaseV2: %v", err) + } + + eval := NewEvaluatorFromStore(st, nil) + + // v1 → v2 → v1 → v2 (each successful) + completedAt1 := time.Now().Add(-4 * time.Hour) + st.Jobs.Upsert(ctx, &oapi.Job{ + Id: "job-1-v1", ReleaseId: releaseV1.ID(), Status: oapi.JobStatusSuccessful, + CreatedAt: time.Now().Add(-5 * time.Hour), CompletedAt: &completedAt1, + }) + completedAt2 := time.Now().Add(-3 * time.Hour) + st.Jobs.Upsert(ctx, &oapi.Job{ + Id: "job-2-v2", ReleaseId: releaseV2.ID(), Status: oapi.JobStatusSuccessful, + CreatedAt: time.Now().Add(-210 * time.Minute), CompletedAt: &completedAt2, + }) + completedAt3 := time.Now().Add(-2 * time.Hour) + st.Jobs.Upsert(ctx, &oapi.Job{ + Id: "job-3-v1", ReleaseId: releaseV1.ID(), Status: oapi.JobStatusSuccessful, + CreatedAt: time.Now().Add(-150 * time.Minute), CompletedAt: &completedAt3, + }) + completedAt4 := time.Now().Add(-1 * time.Hour) + st.Jobs.Upsert(ctx, &oapi.Job{ + Id: "job-4-v2", ReleaseId: releaseV2.ID(), Status: oapi.JobStatusSuccessful, + CreatedAt: time.Now().Add(-90 * time.Minute), CompletedAt: &completedAt4, + }) + + // Most recent is v2 → evaluating v1 should see 0 consecutive → first attempt + resultV1 := eval.Evaluate(ctx, releaseV1) + assert.True(t, resultV1.Allowed, "v1 should be allowed (most recent job is v2)") + assert.Contains(t, resultV1.Message, "First attempt") + + // Most recent is v2 → evaluating v2 should see 1 consecutive → denied (maxRetries=0) + resultV2 := eval.Evaluate(ctx, releaseV2) + assert.False(t, resultV2.Allowed, "v2 should be denied (most recent job matches, maxRetries=0)") } +// ============================================================================= +// Edge Cases +// ============================================================================= + func TestRetryEvaluator_NilStore_ReturnsNil(t *testing.T) { rule := &oapi.RetryRule{ MaxRetries: 3, diff --git a/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/retry/smart_defaults_test.go b/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/retry/smart_defaults_test.go index 256a390a6..709aa5461 100644 --- a/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/retry/smart_defaults_test.go +++ b/apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/retry/smart_defaults_test.go @@ -160,7 +160,8 @@ func TestRetryEvaluator_SmartDefault_MixedStatuses(t *testing.T) { completedAt := time.Now() - // Add jobs in various states - only failure and invalidIntegration should count + // Non-retryable jobs (oldest) — these break the consecutive streak + // so they must be older than the retryable ones. st.Jobs.Upsert(ctx, &oapi.Job{ Id: "job-1-success", ReleaseId: release.ID(), @@ -178,17 +179,18 @@ func TestRetryEvaluator_SmartDefault_MixedStatuses(t *testing.T) { }) st.Jobs.Upsert(ctx, &oapi.Job{ - Id: "job-3-failure", + Id: "job-3-skipped", ReleaseId: release.ID(), - Status: oapi.JobStatusFailure, + Status: oapi.JobStatusSkipped, CreatedAt: time.Now().Add(-6 * time.Minute), CompletedAt: &completedAt, }) + // Retryable jobs (most recent consecutive) — only these count st.Jobs.Upsert(ctx, &oapi.Job{ - Id: "job-4-skipped", + Id: "job-4-failure", ReleaseId: release.ID(), - Status: oapi.JobStatusSkipped, + Status: oapi.JobStatusFailure, CreatedAt: time.Now().Add(-4 * time.Minute), CompletedAt: &completedAt, }) @@ -201,9 +203,9 @@ func TestRetryEvaluator_SmartDefault_MixedStatuses(t *testing.T) { CompletedAt: &completedAt, }) - // Should count: failure + invalidIntegration = 2 attempts + // Only the 2 most recent consecutive retryable jobs count (failure + invalidIntegration) result := evaluator.Evaluate(ctx, release) - assert.True(t, result.Allowed, "Should allow: 2 retryable jobs (failure + invalidIntegration) = 2/2 attempts") + assert.True(t, result.Allowed, "Should allow: 2 consecutive retryable jobs = 2/2 attempts") assert.Contains(t, result.Message, "2/2") // Add one more failure - should exceed diff --git a/apps/workspace-engine/test/e2e/engine_version_selector_test.go b/apps/workspace-engine/test/e2e/engine_version_selector_test.go new file mode 100644 index 000000000..a412bfa3d --- /dev/null +++ b/apps/workspace-engine/test/e2e/engine_version_selector_test.go @@ -0,0 +1,166 @@ +package e2e + +import ( + "context" + "testing" + "time" + "workspace-engine/pkg/events/handler" + "workspace-engine/pkg/oapi" + "workspace-engine/test/integration" + c "workspace-engine/test/integration/creators" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +func TestEngine_VersionSelector_FlipppingBetweenVersions(t *testing.T) { + jobAgentID := uuid.New().String() + deploymentID := uuid.New().String() + environmentID := uuid.New().String() + resourceID := uuid.New().String() + policyID := uuid.New().String() + ruleID := uuid.New().String() + + v1SelectorString := "version.tag.startsWith('v1.')" + v2SelectorString := "version.tag.startsWith('v2.')" + + engine := integration.NewTestWorkspace(t, + integration.WithJobAgent( + integration.JobAgentID(jobAgentID), + ), + integration.WithSystem( + integration.WithDeployment( + integration.DeploymentID(deploymentID), + integration.DeploymentJobAgent(jobAgentID), + integration.DeploymentCelResourceSelector("true"), + ), + integration.WithEnvironment( + integration.EnvironmentID(environmentID), + integration.EnvironmentCelResourceSelector("true"), + ), + ), + integration.WithResource( + integration.ResourceID(resourceID), + ), + integration.WithPolicy( + integration.PolicyID(policyID), + integration.PolicyName("version-selector"), + integration.WithPolicySelector("true"), + integration.WithPolicyRule( + integration.PolicyRuleID(ruleID), + integration.WithRuleVersionSelector( + v1SelectorString, + ), + ), + ), + ) + + ctx := context.Background() + + version1 := c.NewDeploymentVersion() + version1.DeploymentId = deploymentID + version1.Tag = "v1.0.0" + engine.PushEvent(ctx, handler.DeploymentVersionCreate, version1) + + version2 := c.NewDeploymentVersion() + version2.DeploymentId = deploymentID + version2.Tag = "v2.0.0" + engine.PushEvent(ctx, handler.DeploymentVersionCreate, version2) + + jobs := engine.Workspace().Jobs().GetPending() + assert.Equal(t, 1, len(jobs), "Expected 1 job for v1.0.0") + + jobsSlice := make([]*oapi.Job, 0) + for _, job := range jobs { + jobsSlice = append(jobsSlice, job) + } + + job1 := jobsSlice[0] + assert.Equal(t, job1.DispatchContext.Version.Tag, "v1.0.0") + + now := time.Now() + engine.PushEvent(ctx, handler.JobUpdate, oapi.JobUpdateEvent{ + Id: &job1.Id, + Job: oapi.Job{ + Id: job1.Id, + Status: oapi.JobStatusSuccessful, + CompletedAt: &now, + }, + FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{ + oapi.JobUpdateEventFieldsToUpdateStatus, + oapi.JobUpdateEventFieldsToUpdateCompletedAt, + }, + }) + + v2Selector := &oapi.Selector{} + _ = v2Selector.FromCelSelector(oapi.CelSelector{ + Cel: v2SelectorString, + }) + engine.PushEvent(ctx, handler.PolicyUpdate, oapi.Policy{ + Id: policyID, + Rules: []oapi.PolicyRule{ + { + Id: ruleID, + VersionSelector: &oapi.VersionSelectorRule{ + Selector: *v2Selector, + }, + }, + }, + Selector: "true", + Enabled: true, + }) + + jobs = engine.Workspace().Jobs().GetPending() + assert.Equal(t, 1, len(jobs), "Expected 1 job for v2.0.0") + + jobsSlice = make([]*oapi.Job, 0) + for _, job := range jobs { + jobsSlice = append(jobsSlice, job) + } + + job2 := jobsSlice[0] + assert.Equal(t, job2.DispatchContext.Version.Tag, "v2.0.0") + + nowPlus10Seconds := now.Add(10 * time.Second) + engine.PushEvent(ctx, handler.JobUpdate, oapi.JobUpdateEvent{ + Id: &job2.Id, + Job: oapi.Job{ + Id: job2.Id, + Status: oapi.JobStatusSuccessful, + CompletedAt: &nowPlus10Seconds, + }, + FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{ + oapi.JobUpdateEventFieldsToUpdateStatus, + oapi.JobUpdateEventFieldsToUpdateCompletedAt, + }, + }) + + v1Selector := &oapi.Selector{} + _ = v1Selector.FromCelSelector(oapi.CelSelector{ + Cel: v1SelectorString, + }) + engine.PushEvent(ctx, handler.PolicyUpdate, oapi.Policy{ + Id: policyID, + Rules: []oapi.PolicyRule{ + { + Id: ruleID, + VersionSelector: &oapi.VersionSelectorRule{ + Selector: *v1Selector, + }, + }, + }, + Selector: "true", + Enabled: true, + }) + + jobs = engine.Workspace().Jobs().GetPending() + assert.Equal(t, 1, len(jobs), "Expected 1 job for v1.0.0") + + jobsSlice = make([]*oapi.Job, 0) + for _, job := range jobs { + jobsSlice = append(jobsSlice, job) + } + + job3 := jobsSlice[0] + assert.Equal(t, job3.DispatchContext.Version.Tag, "v1.0.0") +} diff --git a/apps/workspace-engine/test/integration/opts.go b/apps/workspace-engine/test/integration/opts.go index c6fa2d381..698664d97 100644 --- a/apps/workspace-engine/test/integration/opts.go +++ b/apps/workspace-engine/test/integration/opts.go @@ -881,6 +881,19 @@ func WithRuleVersionCooldown(intervalSeconds int32) PolicyRuleOption { } } +// ===== VersionSelectorRule Options ===== + +func WithRuleVersionSelector(cel string) PolicyRuleOption { + s := &oapi.Selector{} + _ = s.FromCelSelector(oapi.CelSelector{Cel: cel}) + return func(_ *TestWorkspace, r *oapi.PolicyRule) error { + r.VersionSelector = &oapi.VersionSelectorRule{ + Selector: *s, + } + return nil + } +} + // ===== RollbackRule Options ===== // WithRuleRollback configures a rollback rule that triggers rollback to the previous