Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/TaskRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { RetryingExecutionStrategy } from "./strategies/RetryingExecutionStrateg
import { Plugin } from "./contracts/Plugin.js";
import { PluginManager } from "./PluginManager.js";
import { DryRunExecutionStrategy } from "./strategies/DryRunExecutionStrategy.js";
import { ICacheProvider } from "./contracts/ICacheProvider.js";
import { CachingExecutionStrategy } from "./strategies/CachingExecutionStrategy.js";

const MERMAID_ID_REGEX = /[^a-zA-Z0-9_-]/g;

Expand All @@ -30,6 +32,7 @@ export class TaskRunner<TContext> {
private readonly validator = new TaskGraphValidator();
private executionStrategy: IExecutionStrategy<TContext> =
new RetryingExecutionStrategy(new StandardExecutionStrategy());
private cacheProvider?: ICacheProvider;

private readonly pluginManager: PluginManager<TContext>;

Expand Down Expand Up @@ -84,6 +87,16 @@ export class TaskRunner<TContext> {
return this;
}

/**
 * Sets the cache provider used for task output caching.
 *
 * When a provider is set, the execution strategy is wrapped in a
 * CachingExecutionStrategy when the workflow runs, so steps that declare a
 * `cache` config can reuse previously computed results.
 * @param cacheProvider The cache provider (e.g. MemoryCacheProvider, or a
 * custom implementation backed by disk/remote storage).
 * @returns The TaskRunner instance for chaining.
 */
public setCacheProvider(cacheProvider: ICacheProvider): this {
  this.cacheProvider = cacheProvider;
  return this;
}

/**
* Generates a Mermaid.js graph representation of the task workflow.
* @param steps The list of tasks to visualize.
Expand Down Expand Up @@ -199,6 +212,10 @@ export class TaskRunner<TContext> {
strategy = new DryRunExecutionStrategy<TContext>();
}

if (this.cacheProvider) {
strategy = new CachingExecutionStrategy(strategy, this.cacheProvider);
}
Comment on lines +215 to +217
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The CachingExecutionStrategy is applied even during a dry run. This causes the results of the DryRunExecutionStrategy (which are typically placeholders or mock data) to be persisted in the cache, potentially corrupting it for subsequent real executions. Consider passing a flag to the strategy to disable cache writes during dry runs.

Suggested change
if (this.cacheProvider) {
strategy = new CachingExecutionStrategy(strategy, this.cacheProvider);
}
if (this.cacheProvider) {
strategy = new CachingExecutionStrategy(strategy, this.cacheProvider, !!config?.dryRun);
}


const executor = new WorkflowExecutor(
this.context,
this.eventBus,
Expand Down
15 changes: 15 additions & 0 deletions src/TaskStep.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@ import { TaskResult } from "./TaskResult.js";
import { TaskRetryConfig } from "./contracts/TaskRetryConfig.js";
import { TaskLoopConfig } from "./contracts/TaskLoopConfig.js";

/**
 * Configuration for task output caching.
 *
 * Attached to a TaskStep via its `cache` property; steps without this
 * config are never cached.
 * @template TContext The shape of the shared context object.
 */
export interface TaskCacheConfig<TContext> {
  /**
   * A function returning a unique string key based on the context.
   * Keys must be stable across runs for cache hits to occur.
   */
  key: (context: TContext) => string | Promise<string>;
  /** Optional time-to-live in milliseconds. Defaults to infinite. */
  ttl?: number;
  /**
   * Optional function to restore context side effects from a cached result.
   * Invoked on a cache hit, before the cached result is returned in place
   * of executing the task.
   */
  restore?: (context: TContext, cachedResult: TaskResult) => void | Promise<void>;
}

/**
* Represents a single, executable step within a workflow.
* @template TContext The shape of the shared context object.
Expand All @@ -15,6 +28,8 @@ export interface TaskStep<TContext> {
retry?: TaskRetryConfig;
/** Optional loop configuration for the task. */
loop?: TaskLoopConfig<TContext>;
/** Optional cache configuration for the task. */
cache?: TaskCacheConfig<TContext>;
/**
* Optional function to determine if the task should run.
* If it returns false (synchronously or asynchronously), the task is skipped.
Expand Down
29 changes: 29 additions & 0 deletions src/contracts/ICacheProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { TaskResult } from "../TaskResult.js";

/**
 * Interface for task output cache providers.
 *
 * The default in-process implementation is MemoryCacheProvider; custom
 * providers (e.g. backed by disk or a remote store) can be plugged in via
 * TaskRunner.setCacheProvider(). All operations are asynchronous so that
 * remote backends can be supported.
 */
export interface ICacheProvider {
  /**
   * Retrieves a cached result by its key.
   * @param key The unique cache key.
   * @returns A promise resolving to the cached TaskResult, or undefined if
   * not found. Providers that support TTL should treat expired entries as
   * "not found".
   */
  get(key: string): Promise<TaskResult | undefined>;

  /**
   * Stores a task result in the cache.
   * @param key The unique cache key.
   * @param value The task result to cache.
   * @param ttl Optional time-to-live in milliseconds; omit for no expiry.
   * @returns A promise that resolves when the cache is updated.
   */
  set(key: string, value: TaskResult, ttl?: number): Promise<void>;

  /**
   * Deletes a cached result by its key.
   * @param key The unique cache key.
   * @returns A promise that resolves when the key is deleted.
   */
  delete(key: string): Promise<void>;
}
45 changes: 45 additions & 0 deletions src/strategies/CachingExecutionStrategy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { IExecutionStrategy } from "./IExecutionStrategy.js";
import { TaskStep } from "../TaskStep.js";
import { TaskResult } from "../TaskResult.js";
import { ICacheProvider } from "../contracts/ICacheProvider.js";

/**
* An execution strategy that caches task results.
*/
export class CachingExecutionStrategy<TContext> implements IExecutionStrategy<TContext> {
constructor(
private readonly innerStrategy: IExecutionStrategy<TContext>,
private readonly cacheProvider: ICacheProvider
) {}
Comment on lines +10 to +13
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To prevent dry run results from polluting the cache, consider adding a readOnly flag to the constructor. This allows the strategy to still benefit from cache hits during a dry run without writing mock results back to the cache.

Suggested change
constructor(
private readonly innerStrategy: IExecutionStrategy<TContext>,
private readonly cacheProvider: ICacheProvider
) {}
constructor(
private readonly innerStrategy: IExecutionStrategy<TContext>,
private readonly cacheProvider: ICacheProvider,
private readonly readOnly: boolean = false
) {}


async execute(
step: TaskStep<TContext>,
context: TContext,
signal?: AbortSignal
): Promise<TaskResult> {
if (!step.cache) {
return this.innerStrategy.execute(step, context, signal);
}

const key = await step.cache.key(context);
const cachedResult = await this.cacheProvider.get(key);
Comment on lines +24 to +25
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Cache retrieval operations are currently not protected against failures. If the cacheProvider throws an error (e.g., due to a connection issue with an external cache), the entire task execution will fail. It is recommended to wrap cache interactions in a try-catch block to ensure that caching remains an optional optimization and does not compromise the reliability of the workflow execution.

    const key = await step.cache.key(context);
    let cachedResult: TaskResult | undefined;
    try {
      cachedResult = await this.cacheProvider.get(key);
    } catch {
      // Fallback to execution if cache retrieval fails
    }


if (cachedResult !== undefined) {
if (step.cache.restore) {
await step.cache.restore(context, cachedResult);
}
return {
...cachedResult,
status: "skipped",
};
}

const result = await this.innerStrategy.execute(step, context, signal);

if (result.status === "success") {
await this.cacheProvider.set(key, result, step.cache.ttl);
}
Comment on lines +39 to +41
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Ensure that cache writes are skipped when the strategy is in readOnly mode (e.g., during a dry run). Additionally, wrapping the set operation in a try-catch block prevents cache storage failures from affecting the overall task result.

    if (result.status === "success" && !this.readOnly) {
      try {
        await this.cacheProvider.set(key, result, step.cache.ttl);
      } catch {
        // Ignore cache storage failures
      }
    }


return result;
}
}
32 changes: 32 additions & 0 deletions src/utils/MemoryCacheProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { ICacheProvider } from "../contracts/ICacheProvider.js";
import { TaskResult } from "../TaskResult.js";

/**
* A default in-memory implementation of ICacheProvider.
*/
export class MemoryCacheProvider implements ICacheProvider {
private cache = new Map<string, { value: TaskResult; expiry: number | null }>();

Check warning on line 8 in src/utils/MemoryCacheProvider.ts

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Member 'cache' is never reassigned; mark it as `readonly`.

See more on https://sonarcloud.io/project/issues?id=thalesraymond_task-runner&issues=AZ1Q1UwhDzipLG7ycy03&open=AZ1Q1UwhDzipLG7ycy03&pullRequest=245
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The MemoryCacheProvider lacks a background cleanup process for expired entries. Entries are only removed when they are explicitly accessed via get(). This can lead to a memory leak if many keys with a TTL are set but never retrieved. Consider adding a periodic cleanup task (e.g., using setInterval) or using a more robust caching utility with built-in expiration support.


async get(key: string): Promise<TaskResult | undefined> {
const entry = this.cache.get(key);
if (!entry) {
return undefined;
}

if (entry.expiry !== null && Date.now() > entry.expiry) {
this.cache.delete(key);
return undefined;
}

return entry.value;
}

async set(key: string, value: TaskResult, ttl?: number): Promise<void> {
const expiry = ttl !== undefined ? Date.now() + ttl : null;

Check warning on line 25 in src/utils/MemoryCacheProvider.ts

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Unexpected negated condition.

See more on https://sonarcloud.io/project/issues?id=thalesraymond_task-runner&issues=AZ1Q1UwhDzipLG7ycy04&open=AZ1Q1UwhDzipLG7ycy04&pullRequest=245
this.cache.set(key, { value, expiry });
}

async delete(key: string): Promise<void> {
this.cache.delete(key);
}
}
152 changes: 152 additions & 0 deletions tests/CachingExecutionStrategy.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import { describe, it, expect, vi } from "vitest";
import { CachingExecutionStrategy } from "../src/strategies/CachingExecutionStrategy.js";
import { IExecutionStrategy } from "../src/strategies/IExecutionStrategy.js";
import { TaskStep } from "../src/TaskStep.js";
import { MemoryCacheProvider } from "../src/utils/MemoryCacheProvider.js";

// Unit tests for CachingExecutionStrategy: verifies pass-through when no
// cache config is present, write-on-success semantics, cache-hit handling
// (including context restoration), and TTL propagation to the provider.
describe("CachingExecutionStrategy", () => {
  it("should execute inner strategy if cache is not configured", async () => {
    const mockStrategy: IExecutionStrategy<unknown> = {
      execute: vi.fn().mockResolvedValue({ status: "success" }),
    };
    const cacheProvider = new MemoryCacheProvider();
    const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider);

    // No `cache` property on the step — caching must be a no-op.
    const step: TaskStep<unknown> = {
      name: "task1",
      run: vi.fn(),
    };

    const result = await cachingStrategy.execute(step, {});

    expect(mockStrategy.execute).toHaveBeenCalled();
    expect(result.status).toBe("success");
  });

  it("should cache successful execution results", async () => {
    const mockStrategy: IExecutionStrategy<unknown> = {
      execute: vi.fn().mockResolvedValue({ status: "success", data: "result_data" }),
    };
    const cacheProvider = new MemoryCacheProvider();
    const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider);

    const step: TaskStep<unknown> = {
      name: "task1",
      cache: {
        key: () => "my_cache_key",
      },
      run: vi.fn(),
    };

    const result = await cachingStrategy.execute(step, {});

    expect(mockStrategy.execute).toHaveBeenCalledTimes(1);
    expect(result.status).toBe("success");

    // The successful result must now be retrievable under the step's key.
    const cached = await cacheProvider.get("my_cache_key");
    expect(cached).toBeDefined();
    expect(cached?.data).toBe("result_data");
  });

  it("should not cache failed execution results", async () => {
    const mockStrategy: IExecutionStrategy<unknown> = {
      execute: vi.fn().mockResolvedValue({ status: "failure", error: "failed" }),
    };
    const cacheProvider = new MemoryCacheProvider();
    const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider);

    const step: TaskStep<unknown> = {
      name: "task1",
      cache: {
        key: () => "my_cache_key",
      },
      run: vi.fn(),
    };

    const result = await cachingStrategy.execute(step, {});

    expect(mockStrategy.execute).toHaveBeenCalledTimes(1);
    expect(result.status).toBe("failure");

    // Failures are never written, so a subsequent run can retry the task.
    const cached = await cacheProvider.get("my_cache_key");
    expect(cached).toBeUndefined();
  });

  it("should return cached result with status 'skipped' on cache hit with restore function", async () => {
    const mockStrategy: IExecutionStrategy<unknown> = {
      execute: vi.fn(),
    };
    // Pre-populate the cache so the strategy sees a hit.
    const cacheProvider = new MemoryCacheProvider();
    await cacheProvider.set("my_cache_key", { status: "success", data: "cached_data" });

    const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider);

    const restoreFn = vi.fn();
    const step: TaskStep<unknown> = {
      name: "task1",
      cache: {
        key: () => "my_cache_key",
        restore: restoreFn,
      },
      run: vi.fn(),
    };

    const context = { state: 1 };
    const result = await cachingStrategy.execute(step, context);

    // The inner strategy is bypassed; the hit is surfaced as "skipped".
    expect(mockStrategy.execute).not.toHaveBeenCalled();
    expect(result.status).toBe("skipped");
    expect(result.data).toBe("cached_data");

    // verify restore was called with context and cachedResult
    expect(restoreFn).toHaveBeenCalledWith(context, { status: "success", data: "cached_data" });
  });

  it("should return cached result with status 'skipped' on cache hit without restore function", async () => {
    const mockStrategy: IExecutionStrategy<unknown> = {
      execute: vi.fn(),
    };
    const cacheProvider = new MemoryCacheProvider();
    await cacheProvider.set("my_cache_key", { status: "success", data: "cached_data" });

    const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider);

    // `restore` is optional — a hit without it must still short-circuit.
    const step: TaskStep<unknown> = {
      name: "task1",
      cache: {
        key: () => "my_cache_key",
      },
      run: vi.fn(),
    };

    const context = { state: 1 };
    const result = await cachingStrategy.execute(step, context);

    expect(mockStrategy.execute).not.toHaveBeenCalled();
    expect(result.status).toBe("skipped");
    expect(result.data).toBe("cached_data");
  });

  it("should use TTL when caching", async () => {
    const mockStrategy: IExecutionStrategy<unknown> = {
      execute: vi.fn().mockResolvedValue({ status: "success", data: "result_data" }),
    };
    const cacheProvider = new MemoryCacheProvider();
    vi.spyOn(cacheProvider, "set");

    const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider);

    const step: TaskStep<unknown> = {
      name: "task1",
      cache: {
        key: () => "my_cache_key",
        ttl: 500,
      },
      run: vi.fn(),
    };

    await cachingStrategy.execute(step, {});

    // The step's ttl must be forwarded verbatim to the provider.
    expect(cacheProvider.set).toHaveBeenCalledWith("my_cache_key", { status: "success", data: "result_data" }, 500);
  });
});
63 changes: 63 additions & 0 deletions tests/MemoryCacheProvider.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { describe, it, expect } from "vitest";
import { MemoryCacheProvider } from "../src/utils/MemoryCacheProvider.js";
import { TaskResult } from "../src/TaskResult.js";

// Unit tests for MemoryCacheProvider: basic set/get/delete round-trips plus
// TTL expiration behavior (entries are evicted lazily on read).
describe("MemoryCacheProvider", () => {
  it("should set and get a cached item", async () => {
    const cache = new MemoryCacheProvider();
    const result: TaskResult = { status: "success", data: "test" };

    await cache.set("key1", result);
    const cached = await cache.get("key1");

    expect(cached).toEqual(result);
  });

  it("should return undefined for a missing item", async () => {
    const cache = new MemoryCacheProvider();
    const cached = await cache.get("missing_key");

    expect(cached).toBeUndefined();
  });

  it("should delete a cached item", async () => {
    const cache = new MemoryCacheProvider();
    const result: TaskResult = { status: "success", data: "test" };

    await cache.set("key1", result);
    await cache.delete("key1");
    const cached = await cache.get("key1");

    expect(cached).toBeUndefined();
  });

  it("should expire an item based on TTL", async () => {
    const cache = new MemoryCacheProvider();
    const result: TaskResult = { status: "success", data: "test" };

    await cache.set("key1", result, 100); // 100ms TTL

    // Immediately available
    let cached = await cache.get("key1");
    expect(cached).toEqual(result);

    // Wait for expiration — the read after the deadline must evict the entry.
    await new Promise((resolve) => setTimeout(resolve, 150));

    cached = await cache.get("key1");
    expect(cached).toBeUndefined();
  });

  it("should not expire an item if TTL is not provided", async () => {
    const cache = new MemoryCacheProvider();
    const result: TaskResult = { status: "success", data: "test" };

    await cache.set("key1", result);

    // Wait for a little bit — without a TTL the entry must survive.
    await new Promise((resolve) => setTimeout(resolve, 50));

    const cached = await cache.get("key1");
    expect(cached).toEqual(result);
  });
});
Loading
Loading