diff --git a/src/TaskRunner.ts b/src/TaskRunner.ts index 63ba44e..bb9b453 100644 --- a/src/TaskRunner.ts +++ b/src/TaskRunner.ts @@ -17,6 +17,8 @@ import { RetryingExecutionStrategy } from "./strategies/RetryingExecutionStrateg import { Plugin } from "./contracts/Plugin.js"; import { PluginManager } from "./PluginManager.js"; import { DryRunExecutionStrategy } from "./strategies/DryRunExecutionStrategy.js"; +import { ICacheProvider } from "./contracts/ICacheProvider.js"; +import { CachingExecutionStrategy } from "./strategies/CachingExecutionStrategy.js"; const MERMAID_ID_REGEX = /[^a-zA-Z0-9_-]/g; @@ -30,6 +32,7 @@ export class TaskRunner { private readonly validator = new TaskGraphValidator(); private executionStrategy: IExecutionStrategy = new RetryingExecutionStrategy(new StandardExecutionStrategy()); + private cacheProvider?: ICacheProvider; private readonly pluginManager: PluginManager; @@ -84,6 +87,16 @@ export class TaskRunner { return this; } + /** + * Sets the cache provider for task caching. + * @param cacheProvider The cache provider. + * @returns The TaskRunner instance for chaining. + */ + public setCacheProvider(cacheProvider: ICacheProvider): this { + this.cacheProvider = cacheProvider; + return this; + } + /** * Generates a Mermaid.js graph representation of the task workflow. * @param steps The list of tasks to visualize. @@ -199,6 +212,10 @@ export class TaskRunner { strategy = new DryRunExecutionStrategy(); } + if (this.cacheProvider) { + strategy = new CachingExecutionStrategy(strategy, this.cacheProvider); + } + const executor = new WorkflowExecutor( this.context, this.eventBus, diff --git a/src/TaskStep.ts b/src/TaskStep.ts index 10fed0c..f5481d2 100644 --- a/src/TaskStep.ts +++ b/src/TaskStep.ts @@ -2,6 +2,19 @@ import { TaskResult } from "./TaskResult.js"; import { TaskRetryConfig } from "./contracts/TaskRetryConfig.js"; import { TaskLoopConfig } from "./contracts/TaskLoopConfig.js"; +/** + * Configuration for task output caching. 
+ * @template TContext The shape of the shared context object. + */ +export interface TaskCacheConfig { + /** A function returning a unique string key based on the context. */ + key: (context: TContext) => string | Promise; + /** Optional time-to-live in milliseconds. Defaults to infinite. */ + ttl?: number; + /** Optional function to restore context side effects from a cached result. */ + restore?: (context: TContext, cachedResult: TaskResult) => void | Promise; +} + /** * Represents a single, executable step within a workflow. * @template TContext The shape of the shared context object. @@ -15,6 +28,8 @@ export interface TaskStep { retry?: TaskRetryConfig; /** Optional loop configuration for the task. */ loop?: TaskLoopConfig; + /** Optional cache configuration for the task. */ + cache?: TaskCacheConfig; /** * Optional function to determine if the task should run. * If it returns false (synchronously or asynchronously), the task is skipped. diff --git a/src/contracts/ICacheProvider.ts b/src/contracts/ICacheProvider.ts new file mode 100644 index 0000000..e95e85a --- /dev/null +++ b/src/contracts/ICacheProvider.ts @@ -0,0 +1,29 @@ +import { TaskResult } from "../TaskResult.js"; + +/** + * Interface for task output cache providers. + */ +export interface ICacheProvider { + /** + * Retrieves a cached result by its key. + * @param key The unique cache key. + * @returns A promise resolving to the cached TaskResult, or undefined if not found. + */ + get(key: string): Promise; + + /** + * Stores a task result in the cache. + * @param key The unique cache key. + * @param value The task result to cache. + * @param ttl Optional time-to-live in milliseconds. + * @returns A promise that resolves when the cache is updated. + */ + set(key: string, value: TaskResult, ttl?: number): Promise; + + /** + * Deletes a cached result by its key. + * @param key The unique cache key. + * @returns A promise that resolves when the key is deleted. 
delete(key: string): Promise<void>;
+ */ +export class MemoryCacheProvider implements ICacheProvider { + private cache = new Map(); + + async get(key: string): Promise { + const entry = this.cache.get(key); + if (!entry) { + return undefined; + } + + if (entry.expiry !== null && Date.now() > entry.expiry) { + this.cache.delete(key); + return undefined; + } + + return entry.value; + } + + async set(key: string, value: TaskResult, ttl?: number): Promise { + const expiry = ttl !== undefined ? Date.now() + ttl : null; + this.cache.set(key, { value, expiry }); + } + + async delete(key: string): Promise { + this.cache.delete(key); + } +} diff --git a/tests/CachingExecutionStrategy.test.ts b/tests/CachingExecutionStrategy.test.ts new file mode 100644 index 0000000..91f89de --- /dev/null +++ b/tests/CachingExecutionStrategy.test.ts @@ -0,0 +1,152 @@ +import { describe, it, expect, vi } from "vitest"; +import { CachingExecutionStrategy } from "../src/strategies/CachingExecutionStrategy.js"; +import { IExecutionStrategy } from "../src/strategies/IExecutionStrategy.js"; +import { TaskStep } from "../src/TaskStep.js"; +import { MemoryCacheProvider } from "../src/utils/MemoryCacheProvider.js"; + +describe("CachingExecutionStrategy", () => { + it("should execute inner strategy if cache is not configured", async () => { + const mockStrategy: IExecutionStrategy = { + execute: vi.fn().mockResolvedValue({ status: "success" }), + }; + const cacheProvider = new MemoryCacheProvider(); + const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider); + + const step: TaskStep = { + name: "task1", + run: vi.fn(), + }; + + const result = await cachingStrategy.execute(step, {}); + + expect(mockStrategy.execute).toHaveBeenCalled(); + expect(result.status).toBe("success"); + }); + + it("should cache successful execution results", async () => { + const mockStrategy: IExecutionStrategy = { + execute: vi.fn().mockResolvedValue({ status: "success", data: "result_data" }), + }; + const cacheProvider = new 
MemoryCacheProvider(); + const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider); + + const step: TaskStep = { + name: "task1", + cache: { + key: () => "my_cache_key", + }, + run: vi.fn(), + }; + + const result = await cachingStrategy.execute(step, {}); + + expect(mockStrategy.execute).toHaveBeenCalledTimes(1); + expect(result.status).toBe("success"); + + const cached = await cacheProvider.get("my_cache_key"); + expect(cached).toBeDefined(); + expect(cached?.data).toBe("result_data"); + }); + + it("should not cache failed execution results", async () => { + const mockStrategy: IExecutionStrategy = { + execute: vi.fn().mockResolvedValue({ status: "failure", error: "failed" }), + }; + const cacheProvider = new MemoryCacheProvider(); + const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider); + + const step: TaskStep = { + name: "task1", + cache: { + key: () => "my_cache_key", + }, + run: vi.fn(), + }; + + const result = await cachingStrategy.execute(step, {}); + + expect(mockStrategy.execute).toHaveBeenCalledTimes(1); + expect(result.status).toBe("failure"); + + const cached = await cacheProvider.get("my_cache_key"); + expect(cached).toBeUndefined(); + }); + + it("should return cached result with status 'skipped' on cache hit with restore function", async () => { + const mockStrategy: IExecutionStrategy = { + execute: vi.fn(), + }; + const cacheProvider = new MemoryCacheProvider(); + await cacheProvider.set("my_cache_key", { status: "success", data: "cached_data" }); + + const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider); + + const restoreFn = vi.fn(); + const step: TaskStep = { + name: "task1", + cache: { + key: () => "my_cache_key", + restore: restoreFn, + }, + run: vi.fn(), + }; + + const context = { state: 1 }; + const result = await cachingStrategy.execute(step, context); + + expect(mockStrategy.execute).not.toHaveBeenCalled(); + expect(result.status).toBe("skipped"); + 
expect(result.data).toBe("cached_data"); + + // verify restore was called with context and cachedResult + expect(restoreFn).toHaveBeenCalledWith(context, { status: "success", data: "cached_data" }); + }); + + it("should return cached result with status 'skipped' on cache hit without restore function", async () => { + const mockStrategy: IExecutionStrategy = { + execute: vi.fn(), + }; + const cacheProvider = new MemoryCacheProvider(); + await cacheProvider.set("my_cache_key", { status: "success", data: "cached_data" }); + + const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider); + + const step: TaskStep = { + name: "task1", + cache: { + key: () => "my_cache_key", + }, + run: vi.fn(), + }; + + const context = { state: 1 }; + const result = await cachingStrategy.execute(step, context); + + expect(mockStrategy.execute).not.toHaveBeenCalled(); + expect(result.status).toBe("skipped"); + expect(result.data).toBe("cached_data"); + }); + + it("should use TTL when caching", async () => { + const mockStrategy: IExecutionStrategy = { + execute: vi.fn().mockResolvedValue({ status: "success", data: "result_data" }), + }; + const cacheProvider = new MemoryCacheProvider(); + vi.spyOn(cacheProvider, "set"); + + const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider); + + const step: TaskStep = { + name: "task1", + cache: { + key: () => "my_cache_key", + ttl: 500, + }, + run: vi.fn(), + }; + + await cachingStrategy.execute(step, {}); + + expect(cacheProvider.set).toHaveBeenCalledWith("my_cache_key", { status: "success", data: "result_data" }, 500); + }); +}); diff --git a/tests/MemoryCacheProvider.test.ts b/tests/MemoryCacheProvider.test.ts new file mode 100644 index 0000000..02c44bf --- /dev/null +++ b/tests/MemoryCacheProvider.test.ts @@ -0,0 +1,63 @@ +import { describe, it, expect } from "vitest"; +import { MemoryCacheProvider } from "../src/utils/MemoryCacheProvider.js"; +import { TaskResult } from "../src/TaskResult.js"; + 
+describe("MemoryCacheProvider", () => { + it("should set and get a cached item", async () => { + const cache = new MemoryCacheProvider(); + const result: TaskResult = { status: "success", data: "test" }; + + await cache.set("key1", result); + const cached = await cache.get("key1"); + + expect(cached).toEqual(result); + }); + + it("should return undefined for a missing item", async () => { + const cache = new MemoryCacheProvider(); + const cached = await cache.get("missing_key"); + + expect(cached).toBeUndefined(); + }); + + it("should delete a cached item", async () => { + const cache = new MemoryCacheProvider(); + const result: TaskResult = { status: "success", data: "test" }; + + await cache.set("key1", result); + await cache.delete("key1"); + const cached = await cache.get("key1"); + + expect(cached).toBeUndefined(); + }); + + it("should expire an item based on TTL", async () => { + const cache = new MemoryCacheProvider(); + const result: TaskResult = { status: "success", data: "test" }; + + await cache.set("key1", result, 100); // 100ms TTL + + // Immediately available + let cached = await cache.get("key1"); + expect(cached).toEqual(result); + + // Wait for expiration + await new Promise((resolve) => setTimeout(resolve, 150)); + + cached = await cache.get("key1"); + expect(cached).toBeUndefined(); + }); + + it("should not expire an item if TTL is not provided", async () => { + const cache = new MemoryCacheProvider(); + const result: TaskResult = { status: "success", data: "test" }; + + await cache.set("key1", result); + + // Wait for a little bit + await new Promise((resolve) => setTimeout(resolve, 50)); + + const cached = await cache.get("key1"); + expect(cached).toEqual(result); + }); +}); diff --git a/tests/TaskRunnerCaching.test.ts b/tests/TaskRunnerCaching.test.ts new file mode 100644 index 0000000..0657639 --- /dev/null +++ b/tests/TaskRunnerCaching.test.ts @@ -0,0 +1,59 @@ +import { describe, it, expect, vi } from "vitest"; +import { TaskRunnerBuilder } 
from "../src/TaskRunnerBuilder.js"; +import { TaskStep } from "../src/TaskStep.js"; +import { MemoryCacheProvider } from "../src/utils/MemoryCacheProvider.js"; + +interface TestContext { + executionCount: number; + restoredData?: string; +} + +describe("TaskRunner Caching Integration", () => { + it("should cache task execution and avoid re-execution on subsequent runs", async () => { + const cacheProvider = new MemoryCacheProvider(); + + // Create first runner, execution should happen + const builder1 = new TaskRunnerBuilder({ executionCount: 0 }); + const runner1 = builder1.build(); + runner1.setCacheProvider(cacheProvider); + + const runSpy = vi.fn().mockImplementation(async (ctx: TestContext) => { + ctx.executionCount++; + return { status: "success", data: "expensive_computation_result" }; + }); + + const step: TaskStep = { + name: "expensiveTask", + cache: { + key: () => "static_key", + restore: (ctx, result) => { + ctx.restoredData = result.data as string; + } + }, + run: runSpy, + }; + + const results1 = await runner1.execute([step]); + + expect(runSpy).toHaveBeenCalledTimes(1); + expect(results1.get("expensiveTask")?.status).toBe("success"); + + // Create second runner with same cache, execution should be skipped + const context2: TestContext = { executionCount: 0 }; + const builder2 = new TaskRunnerBuilder(context2); + const runner2 = builder2.build(); + runner2.setCacheProvider(cacheProvider); + + // Reuse the exact same step config (runSpy will be checked) + const results2 = await runner2.execute([step]); + + // runSpy should NOT be called again + expect(runSpy).toHaveBeenCalledTimes(1); + expect(results2.get("expensiveTask")?.status).toBe("skipped"); + expect(results2.get("expensiveTask")?.data).toBe("expensive_computation_result"); + + // restore should have been called + expect(context2.restoredData).toBe("expensive_computation_result"); + expect(context2.executionCount).toBe(0); // Task logic wasn't executed + }); +});