190 changes: 37 additions & 153 deletions openspec/specs/task-runner/spec.md
Original file line number Diff line number Diff line change
@@ -1,174 +1,58 @@
# task-runner Specification
## ADDED Requirements

Contributor comment (severity: high):

The diff indicates that the entire existing specification (Purpose, Requirements for Execution, Cancellation, etc.) has been removed and replaced only with the new caching requirements. This appears to be an accidental deletion of critical documentation. Please ensure the new requirements are integrated into the existing specification rather than replacing it.

## Purpose
### Requirement: Task Caching Configuration

TBD - created by archiving change add-external-task-cancellation. Update Purpose after archive.
The `TaskStep` interface SHALL support an optional `cache` property of type `TaskCacheConfig`.

## Requirements
#### Scenario: Cache Config Structure

### Requirement: TaskRunner Execution

The `TaskRunner` SHALL execute a sequence of `TaskStep`s based on their dependencies, processing inputs and producing outputs.

#### Scenario: Successful execution

- **WHEN** all `TaskStep`s complete successfully
- **THEN** the `TaskRunner` returns a successful workflow result.

#### Scenario: Execution with AbortSignal

- **WHEN** `TaskRunner.execute` is called with an `AbortSignal`
- **THEN** the `TaskRunner` monitors the `AbortSignal` for cancellation requests.

#### Scenario: Execution with Global Timeout

- **WHEN** `TaskRunner.execute` is called with a `timeout` option
- **THEN** the `TaskRunner` monitors the elapsed time for the workflow.

### Requirement: External Workflow Cancellation

The `TaskRunner` SHALL allow external cancellation of an ongoing workflow.

#### Scenario: Workflow cancelled by AbortSignal

- **WHEN** an `AbortSignal` provided to `TaskRunner.execute` is triggered
- **THEN** the `TaskRunner` immediately attempts to stop execution of current and pending tasks.

#### Scenario: Workflow cancelled by Global Timeout

- **WHEN** the specified global `timeout` for `TaskRunner.execute` is reached
- **THEN** the `TaskRunner` immediately attempts to stop execution of current and pending tasks.

#### Scenario: Tasks marked as cancelled

- **WHEN** a workflow is cancelled (by `AbortSignal` or `timeout`)
- **THEN** all unexecuted `TaskStep`s SHALL be marked with a 'cancelled' status in the final result.

#### Scenario: Pre-aborted workflow

- **WHEN** `TaskRunner.execute` is called with an `AbortSignal` that is already aborted
- **THEN** the `TaskRunner` SHALL return immediately with all tasks marked as cancelled, without executing any steps.

#### Scenario: Graceful interruption of current task

- **WHEN** a workflow is cancelled and a `TaskStep` is currently executing
- **THEN** the `TaskStep` SHALL receive the cancellation signal (e.g., via `AbortSignal` context) to allow for graceful interruption.

### Requirement: Cancellation Conflict Resolution

The `TaskRunner` SHALL handle scenarios where both `AbortSignal` and global `timeout` are provided.

#### Scenario: AbortSignal precedes Timeout

- **WHEN** both `AbortSignal` and `timeout` are provided, and `AbortSignal` is triggered first
- **THEN** the `TaskRunner` SHALL cancel the workflow based on the `AbortSignal`, ignoring the `timeout`.

#### Scenario: Timeout precedes AbortSignal

- **WHEN** both `AbortSignal` and `timeout` are provided, and `timeout` is reached first
- **THEN** the `TaskRunner` SHALL cancel the workflow based on the `timeout`, ignoring the `AbortSignal`.
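
The scenarios above amount to "whichever signal fires first wins". A hedged caller-side sketch (not part of this PR; assumes a runtime with `AbortSignal.any` and `AbortSignal.timeout`, i.e. Node 20+) of combining an external signal with a global timeout into a single signal:

```typescript
// External cancellation source, e.g. a user-initiated "stop" action.
const external = new AbortController();

// A signal that aborts when EITHER the external controller aborts
// or 5 seconds elapse, whichever happens first.
const combined = AbortSignal.any([
  external.signal,
  AbortSignal.timeout(5_000),
]);

external.abort(); // external cancellation arrives before the timeout
console.log(combined.aborted); // → true
```

A runner receiving only the combined signal does not need to arbitrate between the two sources itself; the first one to fire aborts the shared signal.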

### Requirement: Integration Verification

The system's integrity SHALL be verified through comprehensive integration scenarios executed against the real runtime environment without mocks.

#### Scenario: Complex Graph Execution

- **WHEN** a complex task graph (diamonds, sequences, parallel branches) is executed
- **THEN** the system SHALL respect all dependency constraints and execution orders.
- **AND** the final state MUST reflect the cumulative side effects of all successful tasks.

#### Scenario: Failure Propagation

- **WHEN** a task fails in a complex graph
- **THEN** ONLY dependent tasks SHALL be skipped
- **AND** independent branches SHALL continue to execute to completion.

#### Scenario: Context Integrity

- **WHEN** multiple tasks mutate the shared context
- **THEN** state changes MUST be propagated correctly to downstream tasks.

### Requirement: Modular Execution Architecture

The system SHALL support pluggable execution strategies and decoupled state management.

#### Scenario: Pluggable Strategy

- **WHEN** configured with a custom execution strategy
- **THEN** the `TaskRunner` SHALL delegate the execution logic to that strategy.
- **GIVEN** a `TaskCacheConfig` object
- **THEN** it SHALL support:
- `key`: A function returning a unique string key based on the context.
- `ttl`: Optional time-to-live in milliseconds.
- `restore`: Optional function to restore context side effects from a cached result.
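
A minimal usage sketch of the structure above. The stand-in types and the `FetchContext` / `profile:`-style key are illustrative only (the real `TaskStep`/`TaskCacheConfig` declarations live in `src/TaskStep.ts`):

```typescript
// Local stand-in mirroring the spec's TaskCacheConfig shape.
interface TaskCacheConfig<TContext> {
  /** A function returning a unique string key based on the context. */
  key: (context: TContext) => string | Promise<string>;
  /** Optional time-to-live in milliseconds. */
  ttl?: number;
  /** Optional function to restore context side effects from a cached result. */
  restore?: (context: TContext, cachedResult: unknown) => void | Promise<void>;
}

// Hypothetical context for a profile-fetching step.
interface FetchContext {
  userId: string;
  profile?: { name: string };
}

// Cache keyed per user, valid for one minute; `restore` replays the step's
// side effect (writing the profile into the context) on a cache hit.
const cachedStepConfig: TaskCacheConfig<FetchContext> = {
  key: (ctx) => `profile:${ctx.userId}`,
  ttl: 60_000,
  restore: (ctx, cachedResult) => {
    ctx.profile = (cachedResult as { output: { name: string } }).output;
  },
};

console.log(cachedStepConfig.key({ userId: "42" })); // → "profile:42"
```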

### Requirement: Dry Run Execution Strategy
### Requirement: Caching Execution Strategy

The system SHALL provide a `DryRunExecutionStrategy` that implements `IExecutionStrategy`.
The system SHALL provide a `CachingExecutionStrategy` that implements `IExecutionStrategy` and wraps another `IExecutionStrategy`.

#### Scenario: Simulating execution
#### Scenario: Cache Miss Execution

- **WHEN** `WorkflowExecutor` is configured with `DryRunExecutionStrategy`
- **AND** `execute` is called
- **THEN** it SHALL traverse the dependency graph respecting order
- **AND** it SHALL NOT execute the actual work of the `TaskStep`.
- **AND** it SHALL return `TaskResult`s with a status indicating successful simulation (e.g., `simulated` or `success`).
- **WHEN** the `CachingExecutionStrategy` executes a task with a cache key that is NOT present in the cache provider
- **THEN** it SHALL execute the task using the inner strategy.
- **AND** it SHALL store the result in the cache provider if execution is successful.
- **AND** it SHALL return the result.

### Requirement: Mermaid Visualization
#### Scenario: Cache Hit Execution

The system SHALL provide a utility to generate a Mermaid.js graph from task steps.
- **WHEN** the `CachingExecutionStrategy` executes a task with a cache key that IS present in the cache provider
- **THEN** it SHALL NOT execute the inner strategy.
- **AND** it SHALL invoke the `restore` function (if provided) with the current context and the cached result.
- **AND** it SHALL return the cached result.

#### Scenario: Generate Mermaid Graph
#### Scenario: Cache Expiration

- **GIVEN** a list of `TaskStep`s with dependencies
- **WHEN** `generateMermaidGraph` is called
- **THEN** it SHALL return a valid Mermaid flowchart syntax string.
- **AND** dependencies SHALL be represented as arrows (`-->`).
- **AND** independent tasks SHALL appear as nodes.
- **WHEN** a cached item's TTL has expired
- **THEN** the cache provider SHALL NOT return the item.
- **AND** the strategy SHALL proceed as a cache miss.

### Requirement: Task Retry Configuration
### Requirement: Cache Provider Interface

The `TaskStep` interface SHALL support an optional `retry` property of type `TaskRetryConfig`.
The system SHALL define an `ICacheProvider` interface for pluggable caching backends.

#### Scenario: Retry Config Structure
#### Scenario: Interface Methods

- **GIVEN** a `TaskRetryConfig` object
- **GIVEN** an `ICacheProvider` implementation
- **THEN** it SHALL support:
- `attempts`: Number of retry attempts (default: 0).
- `delay`: Base delay in milliseconds (default: 0).
- `backoff`: Backoff strategy ('fixed' | 'exponential') (default: 'fixed').

### Requirement: Retrying Execution Strategy

The system SHALL provide a `RetryingExecutionStrategy` that implements `IExecutionStrategy` and wraps another `IExecutionStrategy`.

#### Scenario: Successful execution

- **WHEN** the inner strategy returns a successful `TaskResult`
- **THEN** `RetryingExecutionStrategy` SHALL return that result immediately.

#### Scenario: Retry on failure

- **WHEN** the inner strategy throws or returns a failed `TaskResult`
- **AND** the task has `retry.attempts > 0`
- **THEN** it SHALL wait for the configured `delay`.
- **AND** it SHALL re-execute the task using the inner strategy.
- **AND** it SHALL decrement the remaining attempts.

#### Scenario: Max attempts reached

- **WHEN** the task fails and no attempts remain
- **THEN** it SHALL return the failed result (or throw).

#### Scenario: Exponential Backoff

- **WHEN** `retry.backoff` is 'exponential'
- **THEN** the delay SHALL increase for each attempt (e.g., `delay * 2^attempt`).
- `get(key: string): Promise<TaskResult | undefined>`
- `set(key: string, value: TaskResult, ttl?: number): Promise<void>`
- `delete(key: string): Promise<void>`

### Requirement: Task Execution Metrics
### Requirement: Default Memory Cache

The system SHALL record timing metrics for each executed task, including start time, end time, and duration.
The system SHALL provide a `MemoryCacheProvider` as the default implementation of `ICacheProvider`.

#### Scenario: Successful execution
- **WHEN** a task completes successfully
- **THEN** the task result contains the start timestamp, end timestamp, and duration in milliseconds
#### Scenario: In-Memory Storage

#### Scenario: Failed execution
- **WHEN** a task fails
- **THEN** the task result contains the start timestamp, end timestamp, and duration in milliseconds
- **WHEN** items are set in `MemoryCacheProvider`
- **THEN** they are stored in memory and retrieved correctly until process termination or expiration.
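
A hedged sketch of what a `MemoryCacheProvider` satisfying these scenarios could look like; the repository's actual implementation may differ. The key point is that expired entries are treated as absent, so callers simply see a cache miss:

```typescript
interface CacheEntry {
  value: unknown;
  expiresAt?: number; // absolute epoch ms; undefined = never expires
}

class MemoryCacheProviderSketch {
  private store = new Map<string, CacheEntry>();

  async get(key: string): Promise<unknown | undefined> {
    const entry = this.store.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt !== undefined && Date.now() >= entry.expiresAt) {
      this.store.delete(key); // lazily evict expired items
      return undefined;
    }
    return entry.value;
  }

  async set(key: string, value: unknown, ttl?: number): Promise<void> {
    this.store.set(key, {
      value,
      expiresAt: ttl !== undefined ? Date.now() + ttl : undefined,
    });
  }

  async delete(key: string): Promise<void> {
    this.store.delete(key);
  }
}
```

Lazy eviction on `get` keeps the implementation trivial at the cost of retaining expired entries until they are next read.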
3 changes: 3 additions & 0 deletions src/TaskRunner.ts
@@ -17,6 +17,7 @@ import { RetryingExecutionStrategy } from "./strategies/RetryingExecutionStrateg
import { Plugin } from "./contracts/Plugin.js";
import { PluginManager } from "./PluginManager.js";
import { DryRunExecutionStrategy } from "./strategies/DryRunExecutionStrategy.js";
import { CachingExecutionStrategy } from "./strategies/CachingExecutionStrategy.js";

const MERMAID_ID_REGEX = /[^a-zA-Z0-9_-]/g;

@@ -197,6 +198,8 @@ export class TaskRunner<TContext> {
let strategy = this.executionStrategy;
if (config?.dryRun) {
strategy = new DryRunExecutionStrategy<TContext>();
} else if (config?.cacheProvider) {
strategy = new CachingExecutionStrategy<TContext>(strategy, config.cacheProvider);
}

const executor = new WorkflowExecutor(
6 changes: 6 additions & 0 deletions src/TaskRunnerExecutionConfig.ts
@@ -1,3 +1,5 @@
import { ICacheProvider } from "./contracts/ICacheProvider.js";

/**
* Configuration options for TaskRunner execution.
*/
@@ -20,4 +22,8 @@ export interface TaskRunnerExecutionConfig {
* If undefined, all ready tasks will be run in parallel.
*/
concurrency?: number;
/**
* Optional cache provider to enable task caching.
*/
cacheProvider?: ICacheProvider;
}
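
Given the config addition above, caching is opt-in per run: a provider's presence selects the caching path. A hypothetical, self-contained sketch of the call shape (the stand-in types and `noopProvider` are illustrative, not from the repository):

```typescript
// Structural stand-in for ICacheProvider so the sketch runs in isolation.
type CacheProviderLike = {
  get(key: string): Promise<unknown | undefined>;
  set(key: string, value: unknown, ttl?: number): Promise<void>;
  delete(key: string): Promise<void>;
};

// A provider that caches nothing; any real ICacheProvider would slot in here.
const noopProvider: CacheProviderLike = {
  async get() { return undefined; },
  async set() {},
  async delete() {},
};

// Mirror of the execution config shape: supplying cacheProvider is what
// opts the run into the CachingExecutionStrategy wrapper.
const executionConfig: {
  dryRun?: boolean;
  concurrency?: number;
  cacheProvider?: CacheProviderLike;
} = {
  concurrency: 4,
  cacheProvider: noopProvider,
};

console.log(executionConfig.cacheProvider !== undefined); // → true
```

Note that in the `TaskRunner` wiring above, `dryRun` takes precedence: when both are set, the dry-run strategy is chosen and the provider is ignored.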
2 changes: 1 addition & 1 deletion src/TaskStatus.ts
@@ -1,4 +1,4 @@
/**
* Represents the completion status of a task.
*/
export type TaskStatus = "success" | "failure" | "skipped" | "cancelled";
export type TaskStatus = "success" | "failure" | "skipped" | "cancelled" | "cached";
11 changes: 11 additions & 0 deletions src/TaskStep.ts
@@ -2,6 +2,15 @@ import { TaskResult } from "./TaskResult.js";
import { TaskRetryConfig } from "./contracts/TaskRetryConfig.js";
import { TaskLoopConfig } from "./contracts/TaskLoopConfig.js";

export interface TaskCacheConfig<TContext> {
/** A function returning a unique string key based on the context. */
key: (context: TContext) => string | Promise<string>;
/** Optional time-to-live in milliseconds. */
ttl?: number;
/** Optional function to restore context side effects from a cached result. */
restore?: (context: TContext, cachedResult: TaskResult) => void | Promise<void>;
}

/**
* Represents a single, executable step within a workflow.
* @template TContext The shape of the shared context object.
@@ -15,6 +24,8 @@ export interface TaskStep<TContext> {
retry?: TaskRetryConfig;
/** Optional loop configuration for the task. */
loop?: TaskLoopConfig<TContext>;
/** Optional cache configuration for the task. */
cache?: TaskCacheConfig<TContext>;
/**
* Optional function to determine if the task should run.
* If it returns false (synchronously or asynchronously), the task is skipped.
27 changes: 27 additions & 0 deletions src/contracts/ICacheProvider.ts
@@ -0,0 +1,27 @@
import { TaskResult } from "../TaskResult.js";

/**
* Interface for caching task results.
*/
export interface ICacheProvider {
/**
* Retrieves a cached result by key.
* @param key The unique cache key.
* @returns A promise resolving to the cached result, or undefined if not found or expired.
*/
get(key: string): Promise<TaskResult | undefined>;

/**
* Stores a result in the cache.
* @param key The unique cache key.
* @param value The task result to cache.
* @param ttl Optional time-to-live in milliseconds.
*/
set(key: string, value: TaskResult, ttl?: number): Promise<void>;

/**
* Deletes a cached result by key.
* @param key The unique cache key.
*/
delete(key: string): Promise<void>;
}
47 changes: 47 additions & 0 deletions src/strategies/CachingExecutionStrategy.ts
@@ -0,0 +1,47 @@
import { IExecutionStrategy } from "./IExecutionStrategy.js";
import { TaskStep } from "../TaskStep.js";
import { TaskResult } from "../TaskResult.js";
import { ICacheProvider } from "../contracts/ICacheProvider.js";

/**
* An execution strategy that caches task results.
*/
export class CachingExecutionStrategy<TContext>
implements IExecutionStrategy<TContext>
{
constructor(
private readonly innerStrategy: IExecutionStrategy<TContext>,
private readonly cacheProvider: ICacheProvider
) {}

public async execute(
step: TaskStep<TContext>,
context: TContext,
signal?: AbortSignal
): Promise<TaskResult> {
if (!step.cache) {
return this.innerStrategy.execute(step, context, signal);
}

const cacheKey = await step.cache.key(context);
const cachedResult = await this.cacheProvider.get(cacheKey);
Comment on lines +26 to +27

Contributor comment (severity: medium):

This implementation is susceptible to a 'cache stampede' (or dog-piling). If multiple independent tasks with the same cache key are executed concurrently, they will all experience a cache miss and trigger the innerStrategy.execute() simultaneously. For expensive tasks, this negates the benefit of caching for the initial parallel runs. Consider implementing a mechanism to track and await pending executions for the same cache key to ensure the work is only performed once.

if (cachedResult) {
if (step.cache.restore) {
await step.cache.restore(context, cachedResult);
}
return {
...cachedResult,
status: "cached",
};
}

const result = await this.innerStrategy.execute(step, context, signal);

if (result.status === "success") {
await this.cacheProvider.set(cacheKey, result, step.cache.ttl);
}

return result;
}
}
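
The reviewer's suggested mitigation could be sketched as a single-flight guard (hypothetical; not part of this PR): concurrent misses on the same key join one in-flight execution instead of each running the expensive work.

```typescript
// Deduplicates concurrent async work per key: callers with the same key
// while a run is pending all receive the same promise.
class SingleFlight<T> {
  private pending = new Map<string, Promise<T>>();

  async run(key: string, work: () => Promise<T>): Promise<T> {
    const inFlight = this.pending.get(key);
    if (inFlight) return inFlight; // join the execution already underway

    const p = work().finally(() => this.pending.delete(key));
    this.pending.set(key, p);
    return p;
  }
}
```

Wrapping the miss path of `CachingExecutionStrategy.execute` (the `innerStrategy.execute(...)` call) in such a guard, keyed by `cacheKey`, would ensure the inner strategy runs at most once per key while a result is pending.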