diff --git a/integrationTests/server/thread-management.test.ts b/integrationTests/server/thread-management.test.ts index 66d08bf40..2e7c022c9 100644 --- a/integrationTests/server/thread-management.test.ts +++ b/integrationTests/server/thread-management.test.ts @@ -10,6 +10,21 @@ import { strictEqual } from 'node:assert/strict'; import { startHarper, teardownHarper, type ContextWithHarper } from '@harperfast/integration-testing'; +const REQUEST_TIMEOUT_MS = 5000; + +function authHeader(ctx: ContextWithHarper) { + return `Basic ${Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64')}`; +} + +function opsRequest(ctx: ContextWithHarper, body: string) { + return fetch(ctx.harper.operationsAPIURL, { + method: 'POST', + headers: { 'Content-Type': 'application/json', 'Authorization': authHeader(ctx) }, + body, + signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS), + }); +} + suite('Thread Management', (ctx: ContextWithHarper) => { before(async () => { await startHarper(ctx, { config: {}, env: {} }); @@ -19,20 +34,10 @@ suite('Thread Management', (ctx: ContextWithHarper) => { await teardownHarper(ctx); }); - test('server handles concurrent requests across threads', async () => { - // Send multiple concurrent requests to verify thread handling + test('server handles concurrent requests across threads', { timeout: 15000 }, async () => { const requests = []; for (let i = 0; i < 20; i++) { - requests.push( - fetch(ctx.harper.operationsAPIURL, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Basic ${Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64')}`, - }, - body: JSON.stringify({ operation: 'describe_all' }), - }) - ); + requests.push(opsRequest(ctx, JSON.stringify({ operation: 'describe_all' }))); } const responses = await Promise.all(requests); @@ -42,78 +47,33 @@ suite('Thread Management', (ctx: ContextWithHarper) => { } }); - test('server recovers from malformed requests without affecting subsequent requests', async () => { - // Send multiple malformed requests - const badRequests = []; - for (let i = 0; i < 5; i++) { - badRequests.push( - fetch(ctx.harper.operationsAPIURL, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Basic ${Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64')}`, - }, - body: 'not json', - }) - ); - } - - const badResponses = await Promise.all(badRequests); - for (const response of badResponses) { - strictEqual(response.status, 400); - } + test( + 'server recovers from malformed requests without affecting subsequent requests', + { timeout: 15000 }, + async () => { + const badResponses = await Promise.all(Array.from({ length: 5 }, () => opsRequest(ctx, 'not json'))); + for (const response of badResponses) { + strictEqual(response.status, 400); + } - // Server should still handle good requests after bad ones - const goodRequests = []; - for (let i = 0; i < 5; i++) { - goodRequests.push( - fetch(ctx.harper.operationsAPIURL, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Basic ${Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64')}`, - }, - body: JSON.stringify({ operation: 'describe_all' }), - }) + const goodResponses = await Promise.all( + Array.from({ length: 5 }, () => opsRequest(ctx, JSON.stringify({ operation: 'describe_all' }))) ); - } - - const goodResponses = await Promise.all(goodRequests); - for (const response of goodResponses) { - strictEqual(response.status, 200, 'Server should recover and handle valid requests'); - } - }); - - test('server handles mixed concurrent valid and invalid requests', async () => { - // Mix of good and bad requests simultaneously - const requests = []; - for (let i = 0; i < 20; i++) { - if (i % 3 === 0) { - // Bad request - requests.push( - fetch(ctx.harper.operationsAPIURL, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Basic ${Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64')}`, - }, - body: 'invalid json', - }).then((r) => ({ status: r.status, expected: 400 })) - ); - } else { - // Good request - requests.push( - fetch(ctx.harper.operationsAPIURL, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Basic ${Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64')}`, - }, - body: JSON.stringify({ operation: 'describe_all' }), - }).then((r) => ({ status: r.status, expected: 200 })) - ); + for (const response of goodResponses) { + strictEqual(response.status, 200, 'Server should recover and handle valid requests'); } } + ); + + test('server handles mixed concurrent valid and invalid requests', { timeout: 15000 }, async () => { + const requests = Array.from({ length: 20 }, (_, i) => + i % 3 === 0 + ? opsRequest(ctx, 'invalid json').then((r) => ({ status: r.status, expected: 400 })) + : opsRequest(ctx, JSON.stringify({ operation: 'describe_all' })).then((r) => ({ + status: r.status, + expected: 200, + })) + ); const results = await Promise.all(requests); diff --git a/unitTests/resources/subscriptionReplay.test.js b/unitTests/resources/subscriptionReplay.test.js index 0d6630e36..99d94d39b 100644 --- a/unitTests/resources/subscriptionReplay.test.js +++ b/unitTests/resources/subscriptionReplay.test.js @@ -127,6 +127,10 @@ describe('Subscription replay', () => { })(); const events = await collect(subscription, 100); await concurrentWrites; + // All writes have committed; wait for any pending subscription events to flush + // before closing. Without this, the last write's event can be queued but not + // yet delivered when return() closes the stream. + await delay(200); subscription.return?.(); const ids = new Set(events.map((e) => e.id)); @@ -180,6 +184,7 @@ describe('Subscription replay', () => { })(); const events = await collect(subscription, 150); await concurrentWrites; + await delay(200); subscription.return?.(); // concurrent writes should arrive @@ -236,6 +241,7 @@ describe('Subscription replay', () => { })(); const events = await collect(subscription, 150); await concurrentWrites; + await delay(200); subscription.return?.(); // every updated key MUST be delivered with its final value at least once, even if