diff --git a/.bench/baseline.json b/.bench/baseline.json
index a196152..77b1502 100644
--- a/.bench/baseline.json
+++ b/.bench/baseline.json
@@ -1,11 +1,11 @@
 {
-  "capturedAt": "2026-05-06T01:12:11.470Z",
+  "capturedAt": "2026-05-06T03:03:50.907Z",
   "node": "v22.13.0",
   "platform": "darwin-arm64",
   "options": {
     "baseLatencyMs": 1500,
     "perTokenMs": 2,
-    "maxConcurrent": 6,
+    "maxConcurrent": 24,
     "maxTokens": 4096
   },
   "results": [
@@ -13,7 +13,7 @@
       "fixture": "tiny",
       "fileCount": 5,
       "approxTokens": 790,
-      "durationMs": 1,
+      "durationMs": 2,
       "llmCalls": 0,
       "llmTotalMs": 0,
       "llmTotalPromptTokens": 0
     },
     {
@@ -22,54 +22,54 @@
       "fixture": "medium",
       "fileCount": 25,
       "approxTokens": 36150,
-      "durationMs": 6906,
+      "durationMs": 6905,
       "llmCalls": 6,
-      "llmTotalMs": 25221,
+      "llmTotalMs": 25222,
       "llmTotalPromptTokens": 8525
     },
     {
       "fixture": "large",
       "fileCount": 50,
       "approxTokens": 83410,
-      "durationMs": 9749,
-      "llmCalls": 6,
-      "llmTotalMs": 42401,
-      "llmTotalPromptTokens": 16602
+      "durationMs": 9754,
+      "llmCalls": 7,
+      "llmTotalMs": 45865,
+      "llmTotalPromptTokens": 17461
     },
     {
       "fixture": "feature-add",
       "fileCount": 14,
       "approxTokens": 17600,
-      "durationMs": 5640,
+      "durationMs": 5641,
       "llmCalls": 4,
-      "llmTotalMs": 18854,
+      "llmTotalMs": 18853,
       "llmTotalPromptTokens": 6117
     },
     {
       "fixture": "refactor",
       "fileCount": 30,
       "approxTokens": 32650,
-      "durationMs": 41347,
+      "durationMs": 26718,
       "llmCalls": 20,
-      "llmTotalMs": 143990,
+      "llmTotalMs": 143989,
       "llmTotalPromptTokens": 53548
     },
     {
       "fixture": "initial-commit",
       "fileCount": 50,
       "approxTokens": 83410,
-      "durationMs": 9818,
-      "llmCalls": 6,
-      "llmTotalMs": 42557,
-      "llmTotalPromptTokens": 16306
+      "durationMs": 9816,
+      "llmCalls": 7,
+      "llmTotalMs": 45897,
+      "llmTotalPromptTokens": 17107
     },
     {
       "fixture": "docs-update",
       "fileCount": 9,
       "approxTokens": 15050,
-      "durationMs": 18564,
+      "durationMs": 10248,
       "llmCalls": 7,
-      "llmTotalMs": 52222,
+      "llmTotalMs": 52224,
       "llmTotalPromptTokens": 13139
     },
     {
diff --git a/bin/benchmark.ts b/bin/benchmark.ts
index 7bdbb57..24e4e0a 100644
--- a/bin/benchmark.ts
+++ b/bin/benchmark.ts
@@ -70,11 +70,13 @@ const DEFAULT_OPTIONS: BenchOptions = {
   // multi-thousand-token inputs. Adjust if real-world timings drift.
   baseLatencyMs: 1500,
   perTokenMs: 2,
-  maxConcurrent: 6,
+  // Match the canonical service maxConcurrent from
+  // `langchain/utils.ts` (raised 12 → 24 in PR 3 of #845). The
+  // bench mirrors the most-common production setting so per-PR
+  // diffs reflect what real users see.
+  maxConcurrent: 24,
   // Match the canonical service tokenLimit from `langchain/utils.ts`
-  // (raised from 2048 to 4096 in PR 1 of #845). The bench mirrors
-  // the most-common production budget so per-PR diffs reflect what
-  // real users will see.
+  // (raised from 2048 to 4096 in PR 1 of #845).
   maxTokens: 4096,
 }

diff --git a/src/lib/langchain/chains/summarize/index.test.ts b/src/lib/langchain/chains/summarize/index.test.ts
new file mode 100644
index 0000000..a3bfd93
--- /dev/null
+++ b/src/lib/langchain/chains/summarize/index.test.ts
@@ -0,0 +1,93 @@
+import { Document } from '@langchain/classic/document'
+import { RecursiveCharacterTextSplitter } from '@langchain/textsplitters'
+
+import { summarize, SummarizeContext } from './index'
+
+/**
+ * The summarize() helper wraps the chain.invoke() call in a
+ * 429-aware adaptive backoff (#845, PR 3). These tests pin the
+ * retry behavior so a future regression in the chain wrapper
+ * doesn't silently start failing user pipelines on transient rate
+ * limits.
+ */
+
+function makeContext(overrides: { invoke: jest.Mock }): SummarizeContext {
+  const chain = { invoke: overrides.invoke } as unknown as SummarizeContext['chain']
+  return {
+    chain,
+    textSplitter: new RecursiveCharacterTextSplitter({ chunkSize: 100, chunkOverlap: 0 }),
+  }
+}
+
+describe('summarize() retry behavior', () => {
+  beforeAll(() => {
+    // Speed up tests by faking the backoff timer.
+    jest.useFakeTimers({ doNotFake: ['nextTick', 'queueMicrotask'] })
+  })
+  afterAll(() => {
+    jest.useRealTimers()
+  })
+
+  it('returns the chain output on success without retrying', async () => {
+    const invoke = jest.fn().mockResolvedValue({ text: 'summary' })
+    const ctx = makeContext({ invoke })
+    const result = await summarize([{ pageContent: 'hello world' }], ctx)
+    expect(result).toBe('summary')
+    expect(invoke).toHaveBeenCalledTimes(1)
+  })
+
+  it.each([
+    ['429', { status: 429 }],
+    ['rate_limit_exceeded code', { code: 'rate_limit_exceeded' }],
+    ['ECONNRESET', { code: 'ECONNRESET' }],
+    ['429 in message', new Error('OpenAI returned 429: Too Many Requests')],
+    ['rate-limit in message', new Error('Anthropic rate-limit exceeded, retry later')],
+    ['503', { status: 503 }],
+  ])('retries on retryable %s and eventually succeeds', async (_, errorShape) => {
+    const error = errorShape instanceof Error ? errorShape : Object.assign(new Error('boom'), errorShape)
+    const invoke = jest.fn()
+      .mockRejectedValueOnce(error)
+      .mockResolvedValue({ text: 'late summary' })
+    const ctx = makeContext({ invoke })
+    const promise = summarize([{ pageContent: 'hello' }], ctx)
+    // Drain the backoff timer.
+    await jest.runOnlyPendingTimersAsync()
+    await expect(promise).resolves.toBe('late summary')
+    expect(invoke).toHaveBeenCalledTimes(2)
+  })
+
+  it('does NOT retry on non-retryable errors', async () => {
+    const invoke = jest.fn().mockRejectedValue(Object.assign(new Error('Bad Request'), { status: 400 }))
+    const ctx = makeContext({ invoke })
+    await expect(summarize([{ pageContent: 'hello' }], ctx)).rejects.toThrow('Bad Request')
+    expect(invoke).toHaveBeenCalledTimes(1)
+  })
+
+  it('gives up after BACKOFF_RETRIES (3) attempts on persistent rate limits', async () => {
+    const error = Object.assign(new Error('429'), { status: 429 })
+    const invoke = jest.fn().mockImplementation(() => Promise.reject(error))
+    const ctx = makeContext({ invoke })
+    const promise = summarize([{ pageContent: 'hello' }], ctx)
+    // Pre-attach a rejection handler so node's unhandled-rejection
+    // tracking doesn't fire before we drain the backoff timers.
+    promise.catch(() => undefined)
+    await jest.runAllTimersAsync()
+    await expect(promise).rejects.toThrow('429')
+    // Initial attempt + 3 retries = 4 total invocations.
+    expect(invoke).toHaveBeenCalledTimes(4)
+  })
+
+  it('passes the chain Document[] shape unchanged through retries', async () => {
+    const invoke = jest.fn()
+      .mockRejectedValueOnce(Object.assign(new Error('429'), { status: 429 }))
+      .mockResolvedValue({ text: 'ok' })
+    const ctx = makeContext({ invoke })
+    const promise = summarize([{ pageContent: 'first doc' }], ctx)
+    await jest.runOnlyPendingTimersAsync()
+    await promise
+    // Both calls should have received the same input shape.
+    const [firstCall, secondCall] = invoke.mock.calls
+    expect(firstCall[0].input_documents).toEqual(secondCall[0].input_documents)
+    expect(firstCall[0].input_documents[0]).toBeInstanceOf(Document)
+  })
+})
diff --git a/src/lib/langchain/chains/summarize/index.ts b/src/lib/langchain/chains/summarize/index.ts
index 80faa42..003a073 100644
--- a/src/lib/langchain/chains/summarize/index.ts
+++ b/src/lib/langchain/chains/summarize/index.ts
@@ -20,6 +20,64 @@ export type SummarizeContext = {
   metadata?: Partial
 }

+/**
+ * Adaptive backoff (#845, PR 3). Wraps the chain invocation so a
+ * transient 429 (rate limit) or 5xx no longer kills the whole
+ * pipeline — instead we wait briefly and retry up to N times
+ * before surfacing the failure.
+ *
+ * Cap is intentionally short. Diff condensing fans out to many
+ * concurrent calls; if rate limits hit hard, queueing requests
+ * indefinitely just makes the user wait longer for a result the
+ * pipeline ultimately handles via fewer concurrent passes anyway.
+ * 3 retries with 1s/2s/4s waits trade ~7s of worst-case extra
+ * latency for resilience to brief rate-limit blips.
+ */
+const BACKOFF_RETRIES = 3
+const BACKOFF_BASE_MS = 1000
+const BACKOFF_CAP_MS = 5000
+
+function isRetryableError(error: unknown): boolean {
+  if (!error || typeof error !== 'object') return false
+  const err = error as { status?: number; code?: string | number; message?: string }
+  if (err.status === 429 || err.status === 503 || err.status === 502 || err.status === 504) {
+    return true
+  }
+  if (err.code === 429 || err.code === 'rate_limit_exceeded' || err.code === 'ECONNRESET' || err.code === 'ETIMEDOUT') {
+    return true
+  }
+  if (typeof err.message === 'string' && /(rate.?limit|429|too many requests|timeout|temporarily unavailable)/i.test(err.message)) {
+    return true
+  }
+  return false
+}
+
+async function invokeWithBackoff(
+  chain: SummarizeContext['chain'],
+  input: { input_documents: Document[]; returnIntermediateSteps: boolean },
+  logger: Logger | undefined
+): Promise<{ text: string; error?: string }> {
+  let lastError: unknown
+  for (let attempt = 0; attempt <= BACKOFF_RETRIES; attempt++) {
+    try {
+      return await chain.invoke(input) as { text: string; error?: string }
+    } catch (error) {
+      lastError = error
+      if (!isRetryableError(error) || attempt === BACKOFF_RETRIES) {
+        throw error
+      }
+      const wait = Math.min(BACKOFF_CAP_MS, BACKOFF_BASE_MS * Math.pow(2, attempt))
+      logger?.verbose(
+        `[summarize] retryable error (attempt ${attempt + 1}/${BACKOFF_RETRIES}); backing off ${wait}ms`,
+        { color: 'yellow' }
+      )
+      await new Promise((resolve) => setTimeout(resolve, wait))
+    }
+  }
+  // Unreachable — the loop either returns or rethrows above.
+  throw lastError
+}
+
 export async function summarize(
   documents: DocumentInput[],
   { chain, textSplitter, options, logger, tokenizer, metadata }: SummarizeContext
@@ -32,10 +90,10 @@
     : undefined

   const startedAt = Date.now()
-  const res = await chain.invoke({
+  const res = await invokeWithBackoff(chain, {
     input_documents: docs,
     returnIntermediateSteps,
-  })
+  }, logger)
   const elapsedMs = Date.now() - startedAt

   logLlmCall(logger, {
diff --git a/src/lib/langchain/utils.ts b/src/lib/langchain/utils.ts
index 2f83b66..1143beb 100644
--- a/src/lib/langchain/utils.ts
+++ b/src/lib/langchain/utils.ts
@@ -111,7 +111,13 @@ export const DEFAULT_OPENAI_LLM_SERVICE: OpenAILLMService = {
   model: 'gpt-4.1-nano',
   tokenLimit: 4096,
   temperature: 0.32,
-  maxConcurrent: 12,
+  // Bumped 12 → 24 (#845, PR 3). The OpenAI fast tier comfortably
+  // handles ~30 concurrent on the per-key default rate limit; 24
+  // leaves headroom for retries while still doubling throughput.
+  // The summarize chain has a 429-aware backoff (`summarize`
+  // helper) so a temporary rate-limit hit no longer kills the
+  // whole pipeline.
+  maxConcurrent: 24,
   minTokensForSummary: 800,
   maxFileTokens: 2000,
   authentication: {
@@ -133,7 +139,11 @@ export const DEFAULT_ANTHROPIC_LLM_SERVICE: AnthropicLLMService = {
   model: 'claude-haiku-4-5-20251001',
   temperature: 0.32,
   tokenLimit: 4096,
-  maxConcurrent: 12,
+  // Bumped 12 → 24 (#845, PR 3). Matches the OpenAI default;
+  // Anthropic's per-key concurrency on Haiku is generous enough
+  // that 24 stays under the rate ceiling for typical fast-model
+  // request shapes. Backoff in `summarize` handles spikes.
+  maxConcurrent: 24,
   minTokensForSummary: 800,
   maxFileTokens: 2000,
   authentication: {