-
Notifications
You must be signed in to change notification settings - Fork 1
feat(task-runner): implement task output caching #247
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,174 +1,58 @@ | ||
| # task-runner Specification | ||
| ## ADDED Requirements | ||
|
|
||
| ## Purpose | ||
| ### Requirement: Task Caching Configuration | ||
|
|
||
| TBD - created by archiving change add-external-task-cancellation. Update Purpose after archive. | ||
| The `TaskStep` interface SHALL support an optional `cache` property of type `TaskCacheConfig`. | ||
|
|
||
| ## Requirements | ||
| #### Scenario: Cache Config Structure | ||
|
|
||
| ### Requirement: TaskRunner Execution | ||
|
|
||
| The `TaskRunner` SHALL execute a sequence of `TaskStep`s based on their dependencies, processing inputs and producing outputs. | ||
|
|
||
| #### Scenario: Successful execution | ||
|
|
||
| - **WHEN** all `TaskStep`s complete successfully | ||
| - **THEN** the `TaskRunner` returns a successful workflow result. | ||
|
|
||
| #### Scenario: Execution with AbortSignal | ||
|
|
||
| - **WHEN** `TaskRunner.execute` is called with an `AbortSignal` | ||
| - **THEN** the `TaskRunner` monitors the `AbortSignal` for cancellation requests. | ||
|
|
||
| #### Scenario: Execution with Global Timeout | ||
|
|
||
| - **WHEN** `TaskRunner.execute` is called with a `timeout` option | ||
| - **THEN** the `TaskRunner` monitors the elapsed time for the workflow. | ||
|
|
||
| ### Requirement: External Workflow Cancellation | ||
|
|
||
| The `TaskRunner` SHALL allow external cancellation of an ongoing workflow. | ||
|
|
||
| #### Scenario: Workflow cancelled by AbortSignal | ||
|
|
||
| - **WHEN** an `AbortSignal` provided to `TaskRunner.execute` is triggered | ||
| - **THEN** the `TaskRunner` immediately attempts to stop execution of current and pending tasks. | ||
|
|
||
| #### Scenario: Workflow cancelled by Global Timeout | ||
|
|
||
| - **WHEN** the specified global `timeout` for `TaskRunner.execute` is reached | ||
| - **THEN** the `TaskRunner` immediately attempts to stop execution of current and pending tasks. | ||
|
|
||
| #### Scenario: Tasks marked as cancelled | ||
|
|
||
| - **WHEN** a workflow is cancelled (by `AbortSignal` or `timeout`) | ||
| - **THEN** all unexecuted `TaskStep`s SHALL be marked with a 'cancelled' status in the final result. | ||
|
|
||
| #### Scenario: Pre-aborted workflow | ||
|
|
||
| - **WHEN** `TaskRunner.execute` is called with an `AbortSignal` that is already aborted | ||
| - **THEN** the `TaskRunner` SHALL return immediately with all tasks marked as cancelled, without executing any steps. | ||
|
|
||
| #### Scenario: Graceful interruption of current task | ||
|
|
||
| - **WHEN** a workflow is cancelled and a `TaskStep` is currently executing | ||
| - **THEN** the `TaskStep` SHALL receive the cancellation signal (e.g., via `AbortSignal` context) to allow for graceful interruption. | ||
|
|
||
| ### Requirement: Cancellation Conflict Resolution | ||
|
|
||
| The `TaskRunner` SHALL handle scenarios where both `AbortSignal` and global `timeout` are provided. | ||
|
|
||
| #### Scenario: AbortSignal precedes Timeout | ||
|
|
||
| - **WHEN** both `AbortSignal` and `timeout` are provided, and `AbortSignal` is triggered first | ||
| - **THEN** the `TaskRunner` SHALL cancel the workflow based on the `AbortSignal`, ignoring the `timeout`. | ||
|
|
||
| #### Scenario: Timeout precedes AbortSignal | ||
|
|
||
| - **WHEN** both `AbortSignal` and `timeout` are provided, and `timeout` is reached first | ||
| - **THEN** the `TaskRunner` SHALL cancel the workflow based on the `timeout`, ignoring the `AbortSignal`. | ||
|
|
||
| ### Requirement: Integration Verification | ||
|
|
||
| The system's integrity SHALL be verified through comprehensive integration scenarios executed against the real runtime environment without mocks. | ||
|
|
||
| #### Scenario: Complex Graph Execution | ||
|
|
||
| - **WHEN** a complex task graph (diamonds, sequences, parallel branches) is executed | ||
| - **THEN** the system SHALL respect all dependency constraints and execution orders. | ||
| - **AND** the final state MUST reflect the cumulative side effects of all successful tasks. | ||
|
|
||
| #### Scenario: Failure Propagation | ||
|
|
||
| - **WHEN** a task fails in a complex graph | ||
| - **THEN** ONLY dependent tasks SHALL be skipped | ||
| - **AND** independent branches SHALL continue to execute to completion. | ||
|
|
||
| #### Scenario: Context Integrity | ||
|
|
||
| - **WHEN** multiple tasks mutate the shared context | ||
| - **THEN** state changes MUST be propagated correctly to downstream tasks. | ||
|
|
||
| ### Requirement: Modular Execution Architecture | ||
|
|
||
| The system SHALL support pluggable execution strategies and decoupled state management. | ||
|
|
||
| #### Scenario: Pluggable Strategy | ||
|
|
||
| - **WHEN** configured with a custom execution strategy | ||
| - **THEN** the `TaskRunner` SHALL delegate the execution logic to that strategy. | ||
| - **GIVEN** a `TaskCacheConfig` object | ||
| - **THEN** it SHALL support: | ||
| - `key`: A function returning a unique string key based on the context. | ||
| - `ttl`: Optional time-to-live in milliseconds. | ||
| - `restore`: Optional function to restore context side effects from a cached result. | ||
|
|
||
| ### Requirement: Dry Run Execution Strategy | ||
| ### Requirement: Caching Execution Strategy | ||
|
|
||
| The system SHALL provide a `DryRunExecutionStrategy` that implements `IExecutionStrategy`. | ||
| The system SHALL provide a `CachingExecutionStrategy` that implements `IExecutionStrategy` and wraps another `IExecutionStrategy`. | ||
|
|
||
| #### Scenario: Simulating execution | ||
| #### Scenario: Cache Miss Execution | ||
|
|
||
| - **WHEN** `WorkflowExecutor` is configured with `DryRunExecutionStrategy` | ||
| - **AND** `execute` is called | ||
| - **THEN** it SHALL traverse the dependency graph respecting order | ||
| - **AND** it SHALL NOT execute the actual work of the `TaskStep`. | ||
| - **AND** it SHALL return `TaskResult`s with a status indicating successful simulation (e.g., `simulated` or `success`). | ||
| - **WHEN** the `CachingExecutionStrategy` executes a task with a cache key that is NOT present in the cache provider | ||
| - **THEN** it SHALL execute the task using the inner strategy. | ||
| - **AND** it SHALL store the result in the cache provider if execution is successful. | ||
| - **AND** it SHALL return the result. | ||
|
|
||
| ### Requirement: Mermaid Visualization | ||
| #### Scenario: Cache Hit Execution | ||
|
|
||
| The system SHALL provide a utility to generate a Mermaid.js graph from task steps. | ||
| - **WHEN** the `CachingExecutionStrategy` executes a task with a cache key that IS present in the cache provider | ||
| - **THEN** it SHALL NOT execute the inner strategy. | ||
| - **AND** it SHALL invoke the `restore` function (if provided) with the current context and the cached result. | ||
| - **AND** it SHALL return the cached result. | ||
|
|
||
| #### Scenario: Generate Mermaid Graph | ||
| #### Scenario: Cache Expiration | ||
|
|
||
| - **GIVEN** a list of `TaskStep`s with dependencies | ||
| - **WHEN** `generateMermaidGraph` is called | ||
| - **THEN** it SHALL return a valid Mermaid flowchart syntax string. | ||
| - **AND** dependencies SHALL be represented as arrows (`-->`). | ||
| - **AND** independent tasks SHALL appear as nodes. | ||
| - **WHEN** a cached item's TTL has expired | ||
| - **THEN** the cache provider SHALL NOT return the item. | ||
| - **AND** the strategy SHALL proceed as a cache miss. | ||
|
|
||
| ### Requirement: Task Retry Configuration | ||
| ### Requirement: Cache Provider Interface | ||
|
|
||
| The `TaskStep` interface SHALL support an optional `retry` property of type `TaskRetryConfig`. | ||
| The system SHALL define an `ICacheProvider` interface for pluggable caching backends. | ||
|
|
||
| #### Scenario: Retry Config Structure | ||
| #### Scenario: Interface Methods | ||
|
|
||
| - **GIVEN** a `TaskRetryConfig` object | ||
| - **GIVEN** an `ICacheProvider` implementation | ||
| - **THEN** it SHALL support: | ||
| - `attempts`: Number of retry attempts (default: 0). | ||
| - `delay`: Base delay in milliseconds (default: 0). | ||
| - `backoff`: Backoff strategy ('fixed' | 'exponential') (default: 'fixed'). | ||
|
|
||
| ### Requirement: Retrying Execution Strategy | ||
|
|
||
| The system SHALL provide a `RetryingExecutionStrategy` that implements `IExecutionStrategy` and wraps another `IExecutionStrategy`. | ||
|
|
||
| #### Scenario: Successful execution | ||
|
|
||
| - **WHEN** the inner strategy returns a successful `TaskResult` | ||
| - **THEN** `RetryingExecutionStrategy` SHALL return that result immediately. | ||
|
|
||
| #### Scenario: Retry on failure | ||
|
|
||
| - **WHEN** the inner strategy throws or returns a failed `TaskResult` | ||
| - **AND** the task has `retry.attempts > 0` | ||
| - **THEN** it SHALL wait for the configured `delay`. | ||
| - **AND** it SHALL re-execute the task using the inner strategy. | ||
| - **AND** it SHALL decrement the remaining attempts. | ||
|
|
||
| #### Scenario: Max attempts reached | ||
|
|
||
| - **WHEN** the task fails and no attempts remain | ||
| - **THEN** it SHALL return the failed result (or throw). | ||
|
|
||
| #### Scenario: Exponential Backoff | ||
|
|
||
| - **WHEN** `retry.backoff` is 'exponential' | ||
| - **THEN** the delay SHALL increase for each attempt (e.g., `delay * 2^attempt`). | ||
| - `get(key: string): Promise<TaskResult | undefined>` | ||
| - `set(key: string, value: TaskResult, ttl?: number): Promise<void>` | ||
| - `delete(key: string): Promise<void>` | ||
|
|
||
| ### Requirement: Task Execution Metrics | ||
| ### Requirement: Default Memory Cache | ||
|
|
||
| The system SHALL record timing metrics for each executed task, including start time, end time, and duration. | ||
| The system SHALL provide a `MemoryCacheProvider` as the default implementation of `ICacheProvider`. | ||
|
|
||
| #### Scenario: Successful execution | ||
| - **WHEN** a task completes successfully | ||
| - **THEN** the task result contains the start timestamp, end timestamp, and duration in milliseconds | ||
| #### Scenario: In-Memory Storage | ||
|
|
||
| #### Scenario: Failed execution | ||
| - **WHEN** a task fails | ||
| - **THEN** the task result contains the start timestamp, end timestamp, and duration in milliseconds | ||
| - **WHEN** items are set in `MemoryCacheProvider` | ||
| - **THEN** they are stored in memory and retrieved correctly until process termination or expiration. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| /** | ||
| * Represents the completion status of a task. | ||
| */ | ||
| export type TaskStatus = "success" | "failure" | "skipped" | "cancelled"; | ||
| export type TaskStatus = "success" | "failure" | "skipped" | "cancelled" | "cached"; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| import { TaskResult } from "../TaskResult.js"; | ||
|
|
||
| /** | ||
| * Interface for caching task results. | ||
| */ | ||
| export interface ICacheProvider { | ||
| /** | ||
| * Retrieves a cached result by key. | ||
| * @param key The unique cache key. | ||
| * @returns A promise resolving to the cached result, or undefined if not found or expired. | ||
| */ | ||
| get(key: string): Promise<TaskResult | undefined>; | ||
|
|
||
| /** | ||
| * Stores a result in the cache. | ||
| * @param key The unique cache key. | ||
| * @param value The task result to cache. | ||
| * @param ttl Optional time-to-live in milliseconds. | ||
| */ | ||
| set(key: string, value: TaskResult, ttl?: number): Promise<void>; | ||
|
|
||
| /** | ||
| * Deletes a cached result by key. | ||
| * @param key The unique cache key. | ||
| */ | ||
| delete(key: string): Promise<void>; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| import { IExecutionStrategy } from "./IExecutionStrategy.js"; | ||
| import { TaskStep } from "../TaskStep.js"; | ||
| import { TaskResult } from "../TaskResult.js"; | ||
| import { ICacheProvider } from "../contracts/ICacheProvider.js"; | ||
|
|
||
| /** | ||
| * An execution strategy that caches task results. | ||
| */ | ||
| export class CachingExecutionStrategy<TContext> | ||
| implements IExecutionStrategy<TContext> | ||
| { | ||
| constructor( | ||
| private readonly innerStrategy: IExecutionStrategy<TContext>, | ||
| private readonly cacheProvider: ICacheProvider | ||
| ) {} | ||
|
|
||
| public async execute( | ||
| step: TaskStep<TContext>, | ||
| context: TContext, | ||
| signal?: AbortSignal | ||
| ): Promise<TaskResult> { | ||
| if (!step.cache) { | ||
| return this.innerStrategy.execute(step, context, signal); | ||
| } | ||
|
|
||
| const cacheKey = await step.cache.key(context); | ||
| const cachedResult = await this.cacheProvider.get(cacheKey); | ||
|
Comment on lines
+26
to
+27
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This implementation is susceptible to a 'cache stampede' (or dog-piling). If multiple independent tasks with the same cache key are executed concurrently, they will all experience a cache miss and trigger the |
||
|
|
||
| if (cachedResult) { | ||
| if (step.cache.restore) { | ||
| await step.cache.restore(context, cachedResult); | ||
| } | ||
| return { | ||
| ...cachedResult, | ||
| status: "cached", | ||
| }; | ||
| } | ||
|
|
||
| const result = await this.innerStrategy.execute(step, context, signal); | ||
|
|
||
| if (result.status === "success") { | ||
| await this.cacheProvider.set(cacheKey, result, step.cache.ttl); | ||
| } | ||
|
|
||
| return result; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The diff indicates that the entire existing specification (Purpose, Requirements for Execution, Cancellation, etc.) has been removed and replaced only with the new caching requirements. This appears to be an accidental deletion of critical documentation. Please ensure the new requirements are integrated into the existing specification rather than replacing it.