From 03fc12e1ebf51b9787ffba2a01e754f6d2e195c1 Mon Sep 17 00:00:00 2001 From: Graham Goh Date: Mon, 13 Apr 2026 18:10:49 +1000 Subject: [PATCH] feat(operations): add always-execute option and latest cache reuse Introduce WithForceExecute for operations to bypass previous successful report reuse when explicitly requested. Also ensure cached report lookup prefers the most recent successful match and add regression coverage plus docs updates. --- .changeset/gentle-moles-rule.md | 12 +++++ operations/doc.go | 19 +++++-- operations/execute.go | 91 ++++++++++++++++++++++++--------- operations/execute_test.go | 91 ++++++++++++++++++++++++++------- 4 files changed, 165 insertions(+), 48 deletions(-) create mode 100644 .changeset/gentle-moles-rule.md diff --git a/.changeset/gentle-moles-rule.md b/.changeset/gentle-moles-rule.md new file mode 100644 index 000000000..9685f0bb7 --- /dev/null +++ b/.changeset/gentle-moles-rule.md @@ -0,0 +1,12 @@ +--- +"chainlink-deployments-framework": minor +--- + +feat(operations-api): introduce `WithForceExecute` and new `ExecuteOperationN` options + +### Migration: `ExecuteOperationN` + +**Signature** + +- Before: `ExecuteOperationN(..., opts ...ExecuteOption[IN, DEP])` +- After: `ExecuteOperationN(..., opts ...ExecuteOperationNOption[IN, DEP])` diff --git a/operations/doc.go b/operations/doc.go index 4eba5e68a..5e6172226 100644 --- a/operations/doc.go +++ b/operations/doc.go @@ -26,6 +26,7 @@ Executor: - Executes operations with configurable retry policies - Handles operation failures and recovery strategies - Supports input hooks for dynamic parameter adjustment + - Operations reuse previous successful reports by default; ExecuteOperation accepts WithForceExecute to bypass that reuse Sequence: - Orchestrates multiple operations in dependency order @@ -39,14 +40,22 @@ Reporter: # Basic Usage - // Define an operation + // Define an operation. op := operations.NewOperation( - operations.OperationDef{ID: "deploy-contract", Version: "1.0.0"}, + "deploy-contract", + semver.MustParse("1.0.0"), + "deploy contract operation", handler, ) - // Execute the operation - bundle := operations.NewBundle(logger, reporter) - result, err := operations.ExecuteOperation(bundle, op, input, deps) + // Execute the operation. By default, previous successful reports are reused. + bundle := operations.NewBundle(context.Background, logger, reporter) + result, err := operations.ExecuteOperation(bundle, op, deps, input) + + // Force execution and ignore previous successful reports. + result, err = operations.ExecuteOperation(bundle, op, deps, input, operations.WithForceExecute[InputType, DepsType]()) + + // Execute a sequence. + _, err = operations.ExecuteSequence(bundle, sequence, deps, input) */ package operations diff --git a/operations/execute.go b/operations/execute.go index 070f08cc4..a951562b6 100644 --- a/operations/execute.go +++ b/operations/execute.go @@ -13,10 +13,20 @@ var ErrNotSerializable = errors.New("data cannot be safely written to disk witho // ExecuteConfig is the configuration for the ExecuteOperation function. type ExecuteConfig[IN, DEP any] struct { retryConfig RetryConfig[IN, DEP] + // forceExecute controls whether execution should skip execution when previous successful report is found (set by WithForceExecute). + forceExecute bool } type ExecuteOption[IN, DEP any] func(*ExecuteConfig[IN, DEP]) +// ExecuteOperationNConfig holds options for ExecuteOperationN. +type ExecuteOperationNConfig[IN, DEP any] struct { + retryConfig RetryConfig[IN, DEP] +} + +// ExecuteOperationNOption configures ExecuteOperationN. +type ExecuteOperationNOption[IN, DEP any] func(*ExecuteOperationNConfig[IN, DEP]) + type RetryConfig[IN, DEP any] struct { // Enabled determines if the retry is enabled for the operation. Enabled bool @@ -77,6 +87,35 @@ func WithRetryConfig[IN, DEP any](config RetryConfig[IN, DEP]) ExecuteOption[IN, } } +// WithForceExecute is an ExecuteOption that forces execution and ignores prior successful reports. +func WithForceExecute[IN, DEP any]() ExecuteOption[IN, DEP] { + return func(c *ExecuteConfig[IN, DEP]) { + c.forceExecute = true + } +} + +// WithOperationNRetry is an ExecuteOperationNOption that enables the default retry for each run in ExecuteOperationN. +func WithOperationNRetry[IN, DEP any]() ExecuteOperationNOption[IN, DEP] { + return func(c *ExecuteOperationNConfig[IN, DEP]) { + c.retryConfig.Enabled = true + } +} + +// WithOperationNRetryInput is an ExecuteOperationNOption that enables retry and an input transform on each retry attempt. +func WithOperationNRetryInput[IN, DEP any](inputHookFunc func(uint, error, IN, DEP) IN) ExecuteOperationNOption[IN, DEP] { + return func(c *ExecuteOperationNConfig[IN, DEP]) { + c.retryConfig.Enabled = true + c.retryConfig.InputHook = inputHookFunc + } +} + +// WithOperationNRetryConfig is an ExecuteOperationNOption that sets the retry configuration for ExecuteOperationN. +func WithOperationNRetryConfig[IN, DEP any](config RetryConfig[IN, DEP]) ExecuteOperationNOption[IN, DEP] { + return func(c *ExecuteOperationNConfig[IN, DEP]) { + c.retryConfig = config + } +} + // ExecuteOperation executes an operation with the given input and dependencies. // Execution will return the previous successful execution result and skip execution if there was a // previous successful run found in the Reports. @@ -105,25 +144,26 @@ func ExecuteOperation[IN, OUT, DEP any]( return Report[IN, OUT]{}, fmt.Errorf("operation %s input: %w", operation.def.ID, ErrNotSerializable) } - if previousReport, ok := loadPreviousSuccessfulReport[IN, OUT](b, operation.def, input); ok { - b.Logger.Infow("Operation already executed. Returning previous result", "id", operation.def.ID, - "version", operation.def.Version, "description", operation.def.Description) - - return previousReport, nil - } - executeConfig := &ExecuteConfig[IN, DEP]{ retryConfig: newDisabledRetryConfig[IN, DEP](), } for _, opt := range opts { opt(executeConfig) } + if !executeConfig.forceExecute { + if previousReport, ok := loadPreviousSuccessfulReport[IN, OUT](b, operation.def, input); ok { + b.Logger.Infow("Operation already executed. Returning previous result", "id", operation.def.ID, + "version", operation.def.Version, "description", operation.def.Description) + + return previousReport, nil + } + } var output OUT var err error if executeConfig.retryConfig.Enabled { - output, err = executeWithRetry(b, operation, deps, input, executeConfig) + output, err = executeWithRetry(b, operation, deps, input, executeConfig.retryConfig) } else { output, err = operation.execute(b, deps, input) } @@ -146,17 +186,24 @@ func ExecuteOperation[IN, OUT, DEP any]( // ExecuteOperationN executes the given operation multiple n times with the given input and dependencies. // Execution will return the previous successful execution results and skip execution if there were -// previous successful runs found in the Reports. +// previous successful runs found in the Reports. Options are ExecuteOperationNOption (retry only). // executionSeriesID is used to identify the multiple executions as a single unit. // It is important to use a unique executionSeriesID for different sets of multiple executions. func ExecuteOperationN[IN, OUT, DEP any]( b Bundle, operation *Operation[IN, OUT, DEP], deps DEP, input IN, seriesID string, n uint, - opts ...ExecuteOption[IN, DEP], + opts ...ExecuteOperationNOption[IN, DEP], ) ([]Report[IN, OUT], error) { if !IsSerializable(b.Logger, input) { return []Report[IN, OUT]{}, fmt.Errorf("operation %s input: %w", operation.def.ID, ErrNotSerializable) } + nConfig := &ExecuteOperationNConfig[IN, DEP]{ + retryConfig: newDisabledRetryConfig[IN, DEP](), + } + for _, opt := range opts { + opt(nConfig) + } + results, ok := loadSuccessfulExecutionSeriesReports[IN, OUT](b, operation.def, input, seriesID) resultsLen := uint(len(results)) if ok { @@ -175,20 +222,13 @@ func ExecuteOperationN[IN, OUT, DEP any]( "n", n, "remainingTimesToRun", remainingTimesToRun) - executeConfig := &ExecuteConfig[IN, DEP]{ - retryConfig: newDisabledRetryConfig[IN, DEP](), - } - for _, opt := range opts { - opt(executeConfig) - } - order := resultsLen for range remainingTimesToRun { var output OUT var err error - if executeConfig.retryConfig.Enabled { - output, err = executeWithRetry(b, operation, deps, input, executeConfig) + if nConfig.retryConfig.Enabled { + output, err = executeWithRetry(b, operation, deps, input, nConfig.retryConfig) } else { output, err = operation.execute(b, deps, input) } @@ -222,12 +262,12 @@ func executeWithRetry[IN, OUT, DEP any]( operation *Operation[IN, OUT, DEP], deps DEP, input IN, - executeConfig *ExecuteConfig[IN, DEP], + retryCfg RetryConfig[IN, DEP], ) (OUT, error) { var inputTemp = input // Generate the configurable options for the retry - retryOpts := executeConfig.retryConfig.Policy.options() + retryOpts := retryCfg.Policy.options() // Use the operation context in the retry retryOpts = append(retryOpts, retry.Context(b.GetContext())) // Append the retry logic which will log the retry and attempt to transform the input @@ -236,8 +276,8 @@ func executeWithRetry[IN, OUT, DEP any]( b.Logger.Infow("Operation failed. Retrying...", "operation", operation.def.ID, "attempt", attempt, "error", err) - if executeConfig.retryConfig.InputHook != nil { - inputTemp = executeConfig.retryConfig.InputHook(attempt, err, inputTemp, deps) + if retryCfg.InputHook != nil { + inputTemp = retryCfg.InputHook(attempt, err, inputTemp, deps) } })) @@ -355,7 +395,10 @@ func loadPreviousSuccessfulReport[IN, OUT any]( return Report[IN, OUT]{}, false } - for _, report := range prevReports { + // When multiple reports match the same operation input, we return the last matching report in + // GetReports order, which assumes newer reports are appended after older ones (typical for reporters). + for i := len(prevReports) - 1; i >= 0; i-- { + report := prevReports[i] // Check if operation/sequence was run previously and return the report if successful reportHash, err := constructUniqueHashFrom(b.reportHashCache, report.Def, report.Input) if err != nil { diff --git a/operations/execute_test.go b/operations/execute_test.go index a8a90acd5..b98bc51ab 100644 --- a/operations/execute_test.go +++ b/operations/execute_test.go @@ -178,6 +178,7 @@ func Test_ExecuteOperation_WithPreviousRun(t *testing.T) { require.Nil(t, res.Err) assert.Equal(t, 2, res.Output) assert.Equal(t, 1, handlerCalledTimes) + firstRunID := res.ID // rerun should return previous report res, err = ExecuteOperation(bundle, op, nil, 1) @@ -185,13 +186,31 @@ func Test_ExecuteOperation_WithPreviousRun(t *testing.T) { require.Nil(t, res.Err) assert.Equal(t, 2, res.Output) assert.Equal(t, 1, handlerCalledTimes) + assert.Equal(t, firstRunID, res.ID) + + // rerun with WithForceExecute should execute again and add a new report + res, err = ExecuteOperation(bundle, op, nil, 1, WithForceExecute[int, any]()) + require.NoError(t, err) + require.Nil(t, res.Err) + assert.Equal(t, 2, res.Output) + assert.Equal(t, 2, handlerCalledTimes) + forcedRunID := res.ID + assert.NotEqual(t, firstRunID, forcedRunID) + + // rerun without WithForceExecute should return the latest successful report + res, err = ExecuteOperation(bundle, op, nil, 1) + require.NoError(t, err) + require.Nil(t, res.Err) + assert.Equal(t, 2, res.Output) + assert.Equal(t, 2, handlerCalledTimes) + assert.Equal(t, forcedRunID, res.ID) // new run with different input, should perform execution res, err = ExecuteOperation(bundle, op, nil, 3) require.NoError(t, err) require.Nil(t, res.Err) assert.Equal(t, 4, res.Output) - assert.Equal(t, 2, handlerCalledTimes) + assert.Equal(t, 3, handlerCalledTimes) // new run with different op, should perform execution op = NewOperation("plus1-v2", semver.MustParse("2.0.0"), "test operation", handler) @@ -199,7 +218,7 @@ func Test_ExecuteOperation_WithPreviousRun(t *testing.T) { require.NoError(t, err) require.Nil(t, res.Err) assert.Equal(t, 2, res.Output) - assert.Equal(t, 3, handlerCalledTimes) + assert.Equal(t, 4, handlerCalledTimes) // new run with op that returns error res, err = ExecuteOperation(bundle, opWithError, nil, 1) @@ -216,6 +235,36 @@ func Test_ExecuteOperation_WithPreviousRun(t *testing.T) { assert.Equal(t, 2, handlerWithErrorCalledTimes) } +func Test_ExecuteOperation_WithPreviousRun_UsesMostRecentSuccessfulReport(t *testing.T) { + t.Parallel() + + handlerCalledTimes := 0 + op := NewOperation("plus1", semver.MustParse("1.0.0"), "test operation", + func(b Bundle, deps any, input int) (output int, err error) { + handlerCalledTimes++ + return input + 1, nil + }, + ) + + oldestSuccessful := NewReport(op.def, 1, 2, nil) + mostRecentSuccessful := NewReport(op.def, 1, 5, nil) + newestFailed := NewReport(op.def, 1, 0, errors.New("failed report should not be reused")) + + reporter := NewMemoryReporter(WithReports([]Report[any, any]{ + genericReport(oldestSuccessful), + genericReport(mostRecentSuccessful), + genericReport(newestFailed), + })) + bundle := NewBundle(t.Context, logger.Test(t), reporter) + + res, err := ExecuteOperation(bundle, op, nil, 1) + require.NoError(t, err) + require.Nil(t, res.Err) + assert.Equal(t, mostRecentSuccessful.ID, res.ID) + assert.Equal(t, 5, res.Output) + assert.Equal(t, 0, handlerCalledTimes) +} + func Test_ExecuteOperation_Unserializable_Data(t *testing.T) { t.Parallel() @@ -390,6 +439,7 @@ func Test_ExecuteSequence_WithPreviousRun(t *testing.T) { assert.Equal(t, 2, res.Output) assert.Len(t, res.ExecutionReports, 2) // 1 seq report + 1 op report assert.Equal(t, 1, handlerCalledTimes) + firstRunID := res.ID // rerun should return previous report res, err = ExecuteSequence(bundle, sequence, nil, 1) @@ -398,6 +448,7 @@ func Test_ExecuteSequence_WithPreviousRun(t *testing.T) { assert.Equal(t, 2, res.Output) assert.Len(t, res.ExecutionReports, 2) // 1 seq report + 1 op report assert.Equal(t, 1, handlerCalledTimes) + assert.Equal(t, firstRunID, res.ID) // new run with different input, should perform execution res, err = ExecuteSequence(bundle, sequence, nil, 3) @@ -814,16 +865,17 @@ func Test_ExecuteOperationN(t *testing.T) { t.Parallel() tests := []struct { - name string - n uint - seriesID string - setupReporter func() Reporter - options []ExecuteOption[int, any] - simulateOpError bool - input int - wantOpCalledTimes int - wantReportsCount int - wantErr string + name string + n uint + seriesID string + setupReporter func() Reporter + options []ExecuteOperationNOption[int, any] + simulateOpError bool + simulateRetryFailures bool // handler fails first attempts when testing WithOperationNRetry + input int + wantOpCalledTimes int + wantReportsCount int + wantErr string }{ { name: "execute operation multiple times", @@ -934,12 +986,13 @@ func Test_ExecuteOperationN(t *testing.T) { wantErr: "add report error", }, { - name: "with retry option", - n: 1, - seriesID: "test-multiple-6", - options: []ExecuteOption[int, any]{WithRetry[int, any]()}, - wantOpCalledTimes: 3, // 2 attempts with default retry - wantReportsCount: 1, + name: "with retry option", + n: 1, + seriesID: "test-multiple-6", + options: []ExecuteOperationNOption[int, any]{WithOperationNRetry[int, any]()}, + simulateRetryFailures: true, + wantOpCalledTimes: 3, // 3 attempts total: 2 failures + 1 success with default retry + wantReportsCount: 1, }, } @@ -960,7 +1013,7 @@ func Test_ExecuteOperationN(t *testing.T) { return 0, NewUnrecoverableError(errors.New("fatal error")) } - if failTimes > 0 && len(tt.options) > 0 { + if failTimes > 0 && tt.simulateRetryFailures { failTimes-- return 0, errors.New("test error") }