Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 40 additions & 80 deletions integrationTests/server/thread-management.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,21 @@ import { strictEqual } from 'node:assert/strict';

import { startHarper, teardownHarper, type ContextWithHarper } from '@harperfast/integration-testing';

const REQUEST_TIMEOUT_MS = 5000;

function authHeader(ctx: ContextWithHarper) {
return `Basic ${Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64')}`;
}

function opsRequest(ctx: ContextWithHarper, body: string) {
return fetch(ctx.harper.operationsAPIURL, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Authorization': authHeader(ctx) },
body,
signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
});
}

suite('Thread Management', (ctx: ContextWithHarper) => {
before(async () => {
await startHarper(ctx, { config: {}, env: {} });
Expand All @@ -19,20 +34,10 @@ suite('Thread Management', (ctx: ContextWithHarper) => {
await teardownHarper(ctx);
});

test('server handles concurrent requests across threads', async () => {
// Send multiple concurrent requests to verify thread handling
test('server handles concurrent requests across threads', { timeout: 15000 }, async () => {
const requests = [];
for (let i = 0; i < 20; i++) {
requests.push(
fetch(ctx.harper.operationsAPIURL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Basic ${Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64')}`,
},
body: JSON.stringify({ operation: 'describe_all' }),
})
);
requests.push(opsRequest(ctx, JSON.stringify({ operation: 'describe_all' })));
}

const responses = await Promise.all(requests);
Expand All @@ -42,78 +47,33 @@ suite('Thread Management', (ctx: ContextWithHarper) => {
}
});

test('server recovers from malformed requests without affecting subsequent requests', async () => {
// Send multiple malformed requests
const badRequests = [];
for (let i = 0; i < 5; i++) {
badRequests.push(
fetch(ctx.harper.operationsAPIURL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Basic ${Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64')}`,
},
body: 'not json',
})
);
}

const badResponses = await Promise.all(badRequests);
for (const response of badResponses) {
strictEqual(response.status, 400);
}
test(
'server recovers from malformed requests without affecting subsequent requests',
{ timeout: 15000 },
async () => {
const badResponses = await Promise.all(Array.from({ length: 5 }, () => opsRequest(ctx, 'not json')));
for (const response of badResponses) {
strictEqual(response.status, 400);
}

// Server should still handle good requests after bad ones
const goodRequests = [];
for (let i = 0; i < 5; i++) {
goodRequests.push(
fetch(ctx.harper.operationsAPIURL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Basic ${Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64')}`,
},
body: JSON.stringify({ operation: 'describe_all' }),
})
const goodResponses = await Promise.all(
Array.from({ length: 5 }, () => opsRequest(ctx, JSON.stringify({ operation: 'describe_all' })))
);
}

const goodResponses = await Promise.all(goodRequests);
for (const response of goodResponses) {
strictEqual(response.status, 200, 'Server should recover and handle valid requests');
}
});

test('server handles mixed concurrent valid and invalid requests', async () => {
// Mix of good and bad requests simultaneously
const requests = [];
for (let i = 0; i < 20; i++) {
if (i % 3 === 0) {
// Bad request
requests.push(
fetch(ctx.harper.operationsAPIURL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Basic ${Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64')}`,
},
body: 'invalid json',
}).then((r) => ({ status: r.status, expected: 400 }))
);
} else {
// Good request
requests.push(
fetch(ctx.harper.operationsAPIURL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Basic ${Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64')}`,
},
body: JSON.stringify({ operation: 'describe_all' }),
}).then((r) => ({ status: r.status, expected: 200 }))
);
for (const response of goodResponses) {
strictEqual(response.status, 200, 'Server should recover and handle valid requests');
}
}
);

test('server handles mixed concurrent valid and invalid requests', { timeout: 15000 }, async () => {
const requests = Array.from({ length: 20 }, (_, i) =>
i % 3 === 0
? opsRequest(ctx, 'invalid json').then((r) => ({ status: r.status, expected: 400 }))
: opsRequest(ctx, JSON.stringify({ operation: 'describe_all' })).then((r) => ({
status: r.status,
expected: 200,
}))
);

const results = await Promise.all(requests);

Expand Down
6 changes: 6 additions & 0 deletions unitTests/resources/subscriptionReplay.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ describe('Subscription replay', () => {
})();
const events = await collect(subscription, 100);
await concurrentWrites;
// All writes have committed; wait for any pending subscription events to flush
// before closing. Without this, the last write's event can be queued but not
// yet delivered when return() closes the stream.
await delay(200);
subscription.return?.();

const ids = new Set(events.map((e) => e.id));
Expand Down Expand Up @@ -180,6 +184,7 @@ describe('Subscription replay', () => {
})();
const events = await collect(subscription, 150);
await concurrentWrites;
await delay(200);
subscription.return?.();

// concurrent writes should arrive
Expand Down Expand Up @@ -236,6 +241,7 @@ describe('Subscription replay', () => {
})();
const events = await collect(subscription, 150);
await concurrentWrites;
await delay(200);
subscription.return?.();

// every updated key MUST be delivered with its final value at least once, even if
Expand Down
Loading