-
Notifications
You must be signed in to change notification settings - Fork 1
feat: implement task output caching strategy #245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| import { TaskResult } from "../TaskResult.js"; | ||
|
|
||
| /** | ||
| * Interface for task output cache providers. | ||
| */ | ||
| export interface ICacheProvider { | ||
| /** | ||
| * Retrieves a cached result by its key. | ||
| * @param key The unique cache key. | ||
| * @returns A promise resolving to the cached TaskResult, or undefined if not found. | ||
| */ | ||
| get(key: string): Promise<TaskResult | undefined>; | ||
|
|
||
| /** | ||
| * Stores a task result in the cache. | ||
| * @param key The unique cache key. | ||
| * @param value The task result to cache. | ||
| * @param ttl Optional time-to-live in milliseconds. | ||
| * @returns A promise that resolves when the cache is updated. | ||
| */ | ||
| set(key: string, value: TaskResult, ttl?: number): Promise<void>; | ||
|
|
||
| /** | ||
| * Deletes a cached result by its key. | ||
| * @param key The unique cache key. | ||
| * @returns A promise that resolves when the key is deleted. | ||
| */ | ||
| delete(key: string): Promise<void>; | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,45 @@ | ||||||||||||||||||||
| import { IExecutionStrategy } from "./IExecutionStrategy.js"; | ||||||||||||||||||||
| import { TaskStep } from "../TaskStep.js"; | ||||||||||||||||||||
| import { TaskResult } from "../TaskResult.js"; | ||||||||||||||||||||
| import { ICacheProvider } from "../contracts/ICacheProvider.js"; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * An execution strategy that caches task results. | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| export class CachingExecutionStrategy<TContext> implements IExecutionStrategy<TContext> { | ||||||||||||||||||||
| constructor( | ||||||||||||||||||||
| private readonly innerStrategy: IExecutionStrategy<TContext>, | ||||||||||||||||||||
| private readonly cacheProvider: ICacheProvider | ||||||||||||||||||||
| ) {} | ||||||||||||||||||||
|
Comment on lines
+10
to
+13
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To prevent dry run results from polluting the cache, consider adding a
Suggested change
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| async execute( | ||||||||||||||||||||
| step: TaskStep<TContext>, | ||||||||||||||||||||
| context: TContext, | ||||||||||||||||||||
| signal?: AbortSignal | ||||||||||||||||||||
| ): Promise<TaskResult> { | ||||||||||||||||||||
| if (!step.cache) { | ||||||||||||||||||||
| return this.innerStrategy.execute(step, context, signal); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| const key = await step.cache.key(context); | ||||||||||||||||||||
| const cachedResult = await this.cacheProvider.get(key); | ||||||||||||||||||||
|
Comment on lines
+24
to
+25
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cache retrieval operations are currently not protected against failures. If the const key = await step.cache.key(context);
let cachedResult: TaskResult | undefined;
try {
cachedResult = await this.cacheProvider.get(key);
} catch {
// Fallback to execution if cache retrieval fails
} |
||||||||||||||||||||
|
|
||||||||||||||||||||
| if (cachedResult !== undefined) { | ||||||||||||||||||||
| if (step.cache.restore) { | ||||||||||||||||||||
| await step.cache.restore(context, cachedResult); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| return { | ||||||||||||||||||||
| ...cachedResult, | ||||||||||||||||||||
| status: "skipped", | ||||||||||||||||||||
| }; | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| const result = await this.innerStrategy.execute(step, context, signal); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if (result.status === "success") { | ||||||||||||||||||||
| await this.cacheProvider.set(key, result, step.cache.ttl); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
Comment on lines
+39
to
+41
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure that cache writes are skipped when the strategy is in if (result.status === "success" && !this.readOnly) {
try {
await this.cacheProvider.set(key, result, step.cache.ttl);
} catch {
// Ignore cache storage failures
}
} |
||||||||||||||||||||
|
|
||||||||||||||||||||
| return result; | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| import { ICacheProvider } from "../contracts/ICacheProvider.js"; | ||
| import { TaskResult } from "../TaskResult.js"; | ||
|
|
||
| /** | ||
| * A default in-memory implementation of ICacheProvider. | ||
| */ | ||
| export class MemoryCacheProvider implements ICacheProvider { | ||
| private cache = new Map<string, { value: TaskResult; expiry: number | null }>(); | ||
|
Check warning on line 8 in src/utils/MemoryCacheProvider.ts
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
|
|
||
| async get(key: string): Promise<TaskResult | undefined> { | ||
| const entry = this.cache.get(key); | ||
| if (!entry) { | ||
| return undefined; | ||
| } | ||
|
|
||
| if (entry.expiry !== null && Date.now() > entry.expiry) { | ||
| this.cache.delete(key); | ||
| return undefined; | ||
| } | ||
|
|
||
| return entry.value; | ||
| } | ||
|
|
||
| async set(key: string, value: TaskResult, ttl?: number): Promise<void> { | ||
| const expiry = ttl !== undefined ? Date.now() + ttl : null; | ||
|
Check warning on line 25 in src/utils/MemoryCacheProvider.ts
|
||
| this.cache.set(key, { value, expiry }); | ||
| } | ||
|
|
||
| async delete(key: string): Promise<void> { | ||
| this.cache.delete(key); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,152 @@ | ||
| import { describe, it, expect, vi } from "vitest"; | ||
| import { CachingExecutionStrategy } from "../src/strategies/CachingExecutionStrategy.js"; | ||
| import { IExecutionStrategy } from "../src/strategies/IExecutionStrategy.js"; | ||
| import { TaskStep } from "../src/TaskStep.js"; | ||
| import { MemoryCacheProvider } from "../src/utils/MemoryCacheProvider.js"; | ||
|
|
||
// Behavioral tests for CachingExecutionStrategy: cache bypass, cache
// population on success, hit handling (restore callback + "skipped" status),
// failure handling, and TTL pass-through to the provider.
describe("CachingExecutionStrategy", () => {
  // Steps without a `cache` config must go straight to the inner strategy.
  it("should execute inner strategy if cache is not configured", async () => {
    const mockStrategy: IExecutionStrategy<unknown> = {
      execute: vi.fn().mockResolvedValue({ status: "success" }),
    };
    const cacheProvider = new MemoryCacheProvider();
    const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider);

    const step: TaskStep<unknown> = {
      name: "task1",
      run: vi.fn(),
    };

    const result = await cachingStrategy.execute(step, {});

    expect(mockStrategy.execute).toHaveBeenCalled();
    expect(result.status).toBe("success");
  });

  // A successful run must be written to the provider under the step's cache key.
  it("should cache successful execution results", async () => {
    const mockStrategy: IExecutionStrategy<unknown> = {
      execute: vi.fn().mockResolvedValue({ status: "success", data: "result_data" }),
    };
    const cacheProvider = new MemoryCacheProvider();
    const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider);

    const step: TaskStep<unknown> = {
      name: "task1",
      cache: {
        key: () => "my_cache_key",
      },
      run: vi.fn(),
    };

    const result = await cachingStrategy.execute(step, {});

    expect(mockStrategy.execute).toHaveBeenCalledTimes(1);
    expect(result.status).toBe("success");

    // The result should now be retrievable directly from the provider.
    const cached = await cacheProvider.get("my_cache_key");
    expect(cached).toBeDefined();
    expect(cached?.data).toBe("result_data");
  });

  // Failures must never be cached, so the step re-runs next time.
  it("should not cache failed execution results", async () => {
    const mockStrategy: IExecutionStrategy<unknown> = {
      execute: vi.fn().mockResolvedValue({ status: "failure", error: "failed" }),
    };
    const cacheProvider = new MemoryCacheProvider();
    const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider);

    const step: TaskStep<unknown> = {
      name: "task1",
      cache: {
        key: () => "my_cache_key",
      },
      run: vi.fn(),
    };

    const result = await cachingStrategy.execute(step, {});

    expect(mockStrategy.execute).toHaveBeenCalledTimes(1);
    expect(result.status).toBe("failure");

    const cached = await cacheProvider.get("my_cache_key");
    expect(cached).toBeUndefined();
  });

  // On a hit: inner strategy skipped, restore callback invoked, status rewritten.
  it("should return cached result with status 'skipped' on cache hit with restore function", async () => {
    const mockStrategy: IExecutionStrategy<unknown> = {
      execute: vi.fn(),
    };
    const cacheProvider = new MemoryCacheProvider();
    // Pre-populate the cache so the strategy sees a hit.
    await cacheProvider.set("my_cache_key", { status: "success", data: "cached_data" });

    const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider);

    const restoreFn = vi.fn();
    const step: TaskStep<unknown> = {
      name: "task1",
      cache: {
        key: () => "my_cache_key",
        restore: restoreFn,
      },
      run: vi.fn(),
    };

    const context = { state: 1 };
    const result = await cachingStrategy.execute(step, context);

    expect(mockStrategy.execute).not.toHaveBeenCalled();
    expect(result.status).toBe("skipped");
    expect(result.data).toBe("cached_data");

    // verify restore was called with context and cachedResult
    expect(restoreFn).toHaveBeenCalledWith(context, { status: "success", data: "cached_data" });
  });

  // Same as above, but restore is optional — a hit without it still short-circuits.
  it("should return cached result with status 'skipped' on cache hit without restore function", async () => {
    const mockStrategy: IExecutionStrategy<unknown> = {
      execute: vi.fn(),
    };
    const cacheProvider = new MemoryCacheProvider();
    await cacheProvider.set("my_cache_key", { status: "success", data: "cached_data" });

    const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider);

    const step: TaskStep<unknown> = {
      name: "task1",
      cache: {
        key: () => "my_cache_key",
      },
      run: vi.fn(),
    };

    const context = { state: 1 };
    const result = await cachingStrategy.execute(step, context);

    expect(mockStrategy.execute).not.toHaveBeenCalled();
    expect(result.status).toBe("skipped");
    expect(result.data).toBe("cached_data");
  });

  // The step's ttl must be forwarded verbatim to the provider's set() call.
  it("should use TTL when caching", async () => {
    const mockStrategy: IExecutionStrategy<unknown> = {
      execute: vi.fn().mockResolvedValue({ status: "success", data: "result_data" }),
    };
    const cacheProvider = new MemoryCacheProvider();
    vi.spyOn(cacheProvider, "set");

    const cachingStrategy = new CachingExecutionStrategy(mockStrategy, cacheProvider);

    const step: TaskStep<unknown> = {
      name: "task1",
      cache: {
        key: () => "my_cache_key",
        ttl: 500,
      },
      run: vi.fn(),
    };

    await cachingStrategy.execute(step, {});

    expect(cacheProvider.set).toHaveBeenCalledWith("my_cache_key", { status: "success", data: "result_data" }, 500);
  });
});
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| import { describe, it, expect } from "vitest"; | ||
| import { MemoryCacheProvider } from "../src/utils/MemoryCacheProvider.js"; | ||
| import { TaskResult } from "../src/TaskResult.js"; | ||
|
|
||
| describe("MemoryCacheProvider", () => { | ||
| it("should set and get a cached item", async () => { | ||
| const cache = new MemoryCacheProvider(); | ||
| const result: TaskResult = { status: "success", data: "test" }; | ||
|
|
||
| await cache.set("key1", result); | ||
| const cached = await cache.get("key1"); | ||
|
|
||
| expect(cached).toEqual(result); | ||
| }); | ||
|
|
||
| it("should return undefined for a missing item", async () => { | ||
| const cache = new MemoryCacheProvider(); | ||
| const cached = await cache.get("missing_key"); | ||
|
|
||
| expect(cached).toBeUndefined(); | ||
| }); | ||
|
|
||
| it("should delete a cached item", async () => { | ||
| const cache = new MemoryCacheProvider(); | ||
| const result: TaskResult = { status: "success", data: "test" }; | ||
|
|
||
| await cache.set("key1", result); | ||
| await cache.delete("key1"); | ||
| const cached = await cache.get("key1"); | ||
|
|
||
| expect(cached).toBeUndefined(); | ||
| }); | ||
|
|
||
| it("should expire an item based on TTL", async () => { | ||
| const cache = new MemoryCacheProvider(); | ||
| const result: TaskResult = { status: "success", data: "test" }; | ||
|
|
||
| await cache.set("key1", result, 100); // 100ms TTL | ||
|
|
||
| // Immediately available | ||
| let cached = await cache.get("key1"); | ||
| expect(cached).toEqual(result); | ||
|
|
||
| // Wait for expiration | ||
| await new Promise((resolve) => setTimeout(resolve, 150)); | ||
|
|
||
| cached = await cache.get("key1"); | ||
| expect(cached).toBeUndefined(); | ||
| }); | ||
|
|
||
| it("should not expire an item if TTL is not provided", async () => { | ||
| const cache = new MemoryCacheProvider(); | ||
| const result: TaskResult = { status: "success", data: "test" }; | ||
|
|
||
| await cache.set("key1", result); | ||
|
|
||
| // Wait for a little bit | ||
| await new Promise((resolve) => setTimeout(resolve, 50)); | ||
|
|
||
| const cached = await cache.get("key1"); | ||
| expect(cached).toEqual(result); | ||
| }); | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `CachingExecutionStrategy` is applied even during a dry run. This causes the results of the `DryRunExecutionStrategy` (which are typically placeholders or mock data) to be persisted in the cache, potentially corrupting it for subsequent real executions. Consider passing a flag to the strategy to disable cache writes during dry runs.