Skip to content

Commit 9306dfe

Browse files
pavkamroncohen
authored andcommitted
feat(node-sdk): add configurable exit event flushing (#311)
This PR Introduces `flushOnExit` option in batch buffer configuration. When on (default), the client will flush all events on it's exit.
1 parent 79a9757 commit 9306dfe

11 files changed

Lines changed: 490 additions & 12 deletions

File tree

packages/node-sdk/README.md

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -604,17 +604,21 @@ the number of calls that are sent to Bucket's servers. During process shutdown,
604604
some messages could be waiting to be sent, and thus, would be discarded if the
605605
buffer is not flushed.
606606
607-
A naive example:
607+
By default, the SDK automatically subscribes to process exit signals and attempts to flush
608+
any pending events. This behavior is controlled by the `flushOnExit` option in the client configuration:
608609
609610
```typescript
610-
process.on("SIGINT", () => {
611-
console.log("Flushing batch buffer...");
612-
client.flush().then(() => {
613-
process.exit(0);
614-
});
611+
const client = new BucketClient({
612+
batchOptions: {
613+
flushOnExit: false, // disable automatic flushing on exit
614+
},
615615
});
616616
```
617617
618+
> [!NOTE]
619+
> If you are creating multiple client instances in your application, it's recommended to disable `flushOnExit`
620+
> to avoid potential conflicts during process shutdown. In such cases, you should implement your own flush handling.
621+
618622
When you bind a client to a user/company, this data is matched against the
619623
targeting rules. To get accurate targeting, you must ensure that the user/company
620624
information provided is sufficient to match against the targeting rules you've

packages/node-sdk/example/bucket.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,6 @@ let featureOverrides = (_: Context): FeatureOverrides => {
3535
}; // feature keys checked at compile time
3636
};
3737

38-
let host = undefined;
39-
if (process.env.BUCKET_HOST) {
40-
host = process.env.BUCKET_HOST;
41-
}
42-
4338
// Create a new BucketClient instance with the secret key and default features
4439
// The default features will be used if the user does not have any features set
4540
// Create a bucketConfig.json file to configure the client or set environment variables

packages/node-sdk/example/serve.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import "dotenv/config";
2+
13
import bucket from "./bucket";
24
import app from "./app";
35

packages/node-sdk/src/client.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
SDK_VERSION_HEADER_NAME,
1515
} from "./config";
1616
import fetchClient from "./fetch-http-client";
17+
import { subscribe as triggerOnExit } from "./flusher";
1718
import { newRateLimiter } from "./rate-limiter";
1819
import type {
1920
EvaluatedFeaturesAPIResponse,
@@ -266,6 +267,10 @@ export class BucketClient {
266267
: () => config.featureOverrides,
267268
};
268269

270+
if ((config.batchOptions?.flushOnExit ?? true) && !this._config.offline) {
271+
triggerOnExit(() => this.flush());
272+
}
273+
269274
if (!new URL(this._config.apiBaseUrl).pathname.endsWith("/")) {
270275
this._config.apiBaseUrl += "/";
271276
}
@@ -470,8 +475,14 @@ export class BucketClient {
470475
* @remarks
471476
* It is recommended to call this method when the application is shutting down to ensure all events are sent
472477
* before the process exits.
478+
*
479+
* This method is automatically called when the process exits if `batchOptions.flushOnExit` is `true` in the options (default).
473480
*/
474481
public async flush() {
482+
if (this._config.offline) {
483+
return;
484+
}
485+
475486
await this._config.batchBuffer.flush();
476487
}
477488

packages/node-sdk/src/config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export const API_BASE_URL = "https://front.bucket.co";
99
export const SDK_VERSION_HEADER_NAME = "bucket-sdk-version";
1010
export const SDK_VERSION = `node-sdk/${version}`;
1111
export const API_TIMEOUT_MS = 5000;
12+
export const END_FLUSH_TIMEOUT_MS = 5000;
1213

1314
export const BUCKET_LOG_PREFIX = "[Bucket]";
1415

packages/node-sdk/src/flusher.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { constants } from "os";
2+
3+
import { END_FLUSH_TIMEOUT_MS } from "./config";
4+
import { TimeoutError, withTimeout } from "./utils";
5+
6+
type Callback = () => Promise<void>;
7+
8+
const killSignals = ["SIGINT", "SIGTERM", "SIGHUP", "SIGBREAK"] as const;
9+
10+
export function subscribe(
11+
callback: Callback,
12+
timeout: number = END_FLUSH_TIMEOUT_MS,
13+
) {
14+
let state: boolean | undefined;
15+
16+
const wrappedCallback = async () => {
17+
if (state !== undefined) {
18+
return;
19+
}
20+
21+
state = false;
22+
23+
try {
24+
await withTimeout(callback(), timeout);
25+
} catch (error) {
26+
if (error instanceof TimeoutError) {
27+
console.error(
28+
"[Bucket SDK] Timeout while flushing events on process exit.",
29+
);
30+
} else {
31+
console.error(
32+
"[Bucket SDK] An error occurred while flushing events on process exit.",
33+
error,
34+
);
35+
}
36+
}
37+
38+
state = true;
39+
};
40+
41+
killSignals.forEach((signal) => {
42+
const hasListeners = process.listenerCount(signal) > 0;
43+
44+
if (hasListeners) {
45+
process.prependListener(signal, wrappedCallback);
46+
} else {
47+
process.on(signal, async () => {
48+
await wrappedCallback();
49+
process.exit(0x80 + constants.signals[signal]);
50+
});
51+
}
52+
});
53+
54+
process.on("beforeExit", wrappedCallback);
55+
process.on("exit", () => {
56+
if (!state) {
57+
console.error(
58+
"[Bucket SDK] Failed to finalize the flushing of events on process exit.",
59+
);
60+
}
61+
});
62+
}

packages/node-sdk/src/types.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,13 +455,24 @@ export type BatchBufferOptions<T> = {
455455

456456
/**
457457
* The maximum size of the buffer before it is flushed.
458+
*
459+
* @defaultValue `100`
458460
**/
459461
maxSize?: number;
460462

461463
/**
462464
* The interval in milliseconds at which the buffer is flushed.
465+
*
466+
* @defaultValue `1000`
463467
**/
464468
intervalMs?: number;
469+
470+
/**
471+
* Whether to flush the buffer on exit.
472+
*
473+
* @defaultValue `true`
474+
*/
475+
flushOnExit?: boolean;
465476
};
466477

467478
/**

packages/node-sdk/src/utils.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,3 +175,44 @@ export function once<T extends () => ReturnType<T>>(
175175
return returned;
176176
};
177177
}
178+
179+
export class TimeoutError extends Error {
180+
constructor(timeoutMs: number) {
181+
super(`Operation timed out after ${timeoutMs}ms`);
182+
this.name = "TimeoutError";
183+
}
184+
}
185+
186+
/**
187+
* Wraps a promise with a timeout. If the promise doesn't resolve within the specified
188+
* timeout, it will reject with a timeout error. The original promise will still
189+
* continue to execute but its result will be ignored.
190+
*
191+
* @param promise - The promise to wrap with a timeout
192+
* @param timeoutMs - The timeout in milliseconds
193+
* @returns A promise that resolves with the original promise result or rejects with a timeout error
194+
* @throws {Error} If the timeout is reached before the promise resolves
195+
**/
196+
export function withTimeout<T>(
197+
promise: Promise<T>,
198+
timeoutMs: number,
199+
): Promise<T> {
200+
ok(timeoutMs > 0, "timeout must be a positive number");
201+
202+
return new Promise((resolve, reject) => {
203+
const timeoutId = setTimeout(() => {
204+
reject(new TimeoutError(timeoutMs));
205+
}, timeoutMs);
206+
207+
promise
208+
.then((result) => {
209+
resolve(result);
210+
})
211+
.catch((error) => {
212+
reject(error);
213+
})
214+
.finally(() => {
215+
clearTimeout(timeoutId);
216+
});
217+
});
218+
}

packages/node-sdk/test/client.test.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {
2323
SDK_VERSION_HEADER_NAME,
2424
} from "../src/config";
2525
import fetchClient from "../src/fetch-http-client";
26+
import { subscribe as triggerOnExit } from "../src/flusher";
2627
import { newRateLimiter } from "../src/rate-limiter";
2728
import { ClientOptions, Context, FeaturesAPIResponse } from "../src/types";
2829

@@ -45,6 +46,10 @@ vi.mock("../src/rate-limiter", async (importOriginal) => {
4546
};
4647
});
4748

49+
vi.mock("../src/flusher", () => ({
50+
subscribe: vi.fn(),
51+
}));
52+
4853
const user = {
4954
id: "user123",
5055
age: 1,
@@ -82,6 +87,7 @@ const validOptions: ClientOptions = {
8287
batchOptions: {
8388
maxSize: 99,
8489
intervalMs: 100,
90+
flushOnExit: false,
8591
},
8692
offline: false,
8793
};
@@ -372,6 +378,36 @@ describe("BucketClient", () => {
372378
);
373379
});
374380

381+
it("should not register an exit flush handler if `batchOptions.flushOnExit` is false", () => {
382+
new BucketClient({
383+
...validOptions,
384+
batchOptions: { ...validOptions.batchOptions, flushOnExit: false },
385+
});
386+
387+
expect(triggerOnExit).not.toHaveBeenCalled();
388+
});
389+
390+
it("should not register an exit flush handler if `offline` is true", () => {
391+
new BucketClient({
392+
...validOptions,
393+
offline: true,
394+
});
395+
396+
expect(triggerOnExit).not.toHaveBeenCalled();
397+
});
398+
399+
it.each([undefined, true])(
400+
"should register an exit flush handler if `batchOptions.flushOnExit` is `%s`",
401+
(flushOnExit) => {
402+
new BucketClient({
403+
...validOptions,
404+
batchOptions: { ...validOptions.batchOptions, flushOnExit },
405+
});
406+
407+
expect(triggerOnExit).toHaveBeenCalledWith(expect.any(Function));
408+
},
409+
);
410+
375411
it.each([
376412
["https://api.example.com", "https://api.example.com/bulk"],
377413
["https://api.example.com/", "https://api.example.com/bulk"],
@@ -971,6 +1007,18 @@ describe("BucketClient", () => {
9711007
],
9721008
);
9731009
});
1010+
1011+
it("should not flush all bulk data if `offline` is true", async () => {
1012+
const client = new BucketClient({
1013+
...validOptions,
1014+
offline: true,
1015+
});
1016+
1017+
await client.updateUser(user.id, { attributes: { age: 2 } });
1018+
await client.flush();
1019+
1020+
expect(httpClient.post).not.toHaveBeenCalled();
1021+
});
9741022
});
9751023

9761024
describe("getFeature", () => {

0 commit comments

Comments
 (0)