diff --git a/packages/utils/docs/profiler.md b/packages/utils/docs/profiler.md new file mode 100644 index 000000000..a3740b875 --- /dev/null +++ b/packages/utils/docs/profiler.md @@ -0,0 +1,360 @@ +# User Timing Profiler + +⏱️ **High-performance profiling utility for structured timing measurements with Chrome DevTools Extensibility API payloads.** 📊 + +--- + +The `Profiler` class provides a clean, type-safe API for performance monitoring that integrates seamlessly with Chrome DevTools. It supports both synchronous and asynchronous operations with smart defaults for custom track visualization, enabling developers to track performance bottlenecks and optimize application speed. + +## Getting started + +1. If you haven't already, install [@code-pushup/utils](../../README.md). + +2. Install as a dependency with your package manager: + + ```sh + npm install @code-pushup/utils + ``` + + ```sh + yarn add @code-pushup/utils + ``` + + ```sh + pnpm add @code-pushup/utils + ``` + +3. Import and create a profiler instance: + + ```ts + import { Profiler } from '@code-pushup/utils'; + + const profiler = new Profiler({ + prefix: 'cp', + track: 'CLI', + trackGroup: 'Code Pushup', + color: 'primary-dark', + tracks: { + utils: { track: 'Utils', color: 'primary' }, + core: { track: 'Core', color: 'primary-light' }, + }, + enabled: true, + }); + ``` + +4. Start measuring performance: + + ```ts + // Measure synchronous operations + const result = profiler.measure('data-processing', () => { + return processData(data); + }); + + // Measure asynchronous operations + const asyncResult = await profiler.measureAsync('api-call', async () => { + return await fetch('/api/data').then(r => r.json()); + }); + ``` + +## Configuration + +```ts +new Profiler(options: ProfilerOptions) +``` + +**Parameters:** + +- `options` - Configuration options for the profiler instance + +**Options:** + +| Property | Type | Default | Description | +| ------------ | --------- | ----------- | --------------------------------------------------------------- | +| `tracks` | `object` | `undefined` | Custom track configurations merged with defaults | +| `prefix` | `string` | `undefined` | Prefix for all measurement names | +| `track` | `string` | `undefined` | Default track name for measurements | +| `trackGroup` | `string` | `undefined` | Default track group for organization | +| `color` | `string` | `undefined` | Default color for track entries | +| `enabled` | `boolean` | `env var` | Whether profiling is enabled (defaults to CP_PROFILING env var) | + +### Environment Variables + +- `CP_PROFILING` - Enables or disables profiling globally (boolean) + +```bash +# Enable profiling in development +CP_PROFILING=true npm run dev + +# Disable profiling in production +CP_PROFILING=false npm run build +``` + +## API Methods + +The profiler provides several methods for different types of performance measurements: + +| Method | Description | +| ------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------- | +| `measure(event: string, work: () => R, options?: MeasureOptions): R` | Measures synchronous operation execution time with DevTools payloads. Noop when profiling is disabled. | +| `measureAsync(event: string, work: () => Promise, options?: MeasureOptions): Promise` | Measures asynchronous operation execution time with DevTools payloads. Noop when profiling is disabled. | +| `marker(name: string, opt?: MarkerOptions): void` | Creates performance markers as vertical lines in DevTools timeline. Noop when profiling is disabled. | +| `setEnabled(enabled: boolean): void` | Controls profiling at runtime. | +| `isEnabled(): boolean` | Returns whether profiling is currently enabled. | + +### Synchronous measurements + +```ts +profiler.measure(event: string, work: () => R, options?: MeasureOptions): R +``` + +Measures the execution time of a synchronous operation. Creates performance start/end marks and a final measure with Chrome DevTools Extensibility API payloads. + +```ts +const result = profiler.measure( + 'file-processing', + () => { + return fs.readFileSync('large-file.txt', 'utf8'); + }, + { + track: 'io-operations', + color: 'warning', + }, +); +``` + +### Asynchronous measurements + +```ts +profiler.measureAsync(event: string, work: () => Promise, options?: MeasureOptions): Promise +``` + +Measures the execution time of an asynchronous operation. + +```ts +const data = await profiler.measureAsync( + 'api-request', + async () => { + const response = await fetch('/api/data'); + return response.json(); + }, + { + track: 'network', + trackGroup: 'external', + }, +); +``` + +### Performance markers + +```ts +profiler.marker(name: string, options?: EntryMeta & { color?: DevToolsColor }): void +``` + +Creates a performance mark with Chrome DevTools marker visualization. Markers appear as vertical lines spanning all tracks and can include custom metadata. + +```ts +profiler.marker('user-action', { + color: 'secondary', + tooltipText: 'User clicked save button', + properties: [ + ['action', 'save'], + ['elementId', 'save-btn'], + ], +}); +``` + +### Runtime control + +```ts +profiler.setEnabled(enabled: boolean): void +profiler.isEnabled(): boolean +``` + +Control profiling at runtime and check current status. + +```ts +// Disable profiling temporarily +profiler.setEnabled(false); + +// Check if profiling is active +if (profiler.isEnabled()) { + console.log('Performance monitoring is active'); +} +``` + +## Examples + +### Basic usage + +```ts +import { Profiler } from '@code-pushup/utils'; + +const profiler = new Profiler({ + prefix: 'cp', + track: 'CLI', + trackGroup: 'Code Pushup', + color: 'primary-dark', + tracks: { + utils: { track: 'Utils', color: 'primary' }, + core: { track: 'Core', color: 'primary-light' }, + }, + enabled: true, +}); + +// Simple measurement +const result = profiler.measure('data-transform', () => { + return transformData(input); +}); + +// Async measurement with custom options +const data = await profiler.measureAsync( + 'fetch-user', + async () => { + return await api.getUser(userId); + }, + { + track: 'api', + color: 'info', + }, +); + +// Add a marker for important events +profiler.marker('user-login', { + tooltipText: 'User authentication completed', +}); +``` + +### Custom tracks + +Define custom track configurations for better organization: + +```ts +interface AppTracks { + api: ActionTrackEntryPayload; + db: ActionTrackEntryPayload; + cache: ActionTrackEntryPayload; +} + +const profiler = new Profiler({ + track: 'API', + trackGroup: 'Server', + color: 'primary-dark', + tracks: { + api: { color: 'primary' }, + db: { track: 'database', color: 'warning' }, + cache: { track: 'cache', color: 'success' }, + }, +}); + +// Use predefined tracks +const users = await profiler.measureAsync('fetch-users', fetchUsers, profiler.tracks.api); + +const saved = profiler.measure('save-user', () => saveToDb(user), { + ...profiler.tracks.db, + color: 'primary', +}); +``` + +## NodeJSProfiler + +This profiler extends all options and API from Profiler with automatic process exit handling for buffered performance data. + +The NodeJSProfiler automatically subscribes to performance observation and installs exit handlers that flush buffered data on process termination (signals, fatal errors, or normal exit). + +## Configuration + +```ts +new NodejsProfiler(options: NodejsProfilerOptions) +``` + +**Parameters:** + +- `options` - Configuration options for the profiler instance + +**Options:** + +| Property | Type | Default | Description | +| ------------------------ | --------------------------------------- | ---------- | ------------------------------------------------------------------------------- | +| `encodePerfEntry` | `PerformanceEntryEncoder` | _required_ | Function that encodes raw PerformanceEntry objects into domain-specific types | +| `captureBufferedEntries` | `boolean` | `true` | Whether to capture performance entries that occurred before observation started | +| `flushThreshold` | `number` | `20` | Threshold for triggering queue flushes based on queue length | +| `maxQueueSize` | `number` | `10_000` | Maximum number of items allowed in the queue before new entries are dropped | + +## API Methods + +The NodeJSProfiler inherits all API methods from the base Profiler class and adds additional methods for queue management and WAL lifecycle control. + +| Method | Description | +| ------------------------------------ | ------------------------------------------------------------------------------- | +| `getStats()` | Returns comprehensive queue statistics for monitoring and debugging. | +| `flush()` | Forces immediate writing of all queued performance entries to the WAL. | +| `setEnabled(enabled: boolean): void` | Controls profiling at runtime with automatic WAL/observer lifecycle management. | + +### Runtime control with Write Ahead Log lifecycle management + +```ts +profiler.setEnabled(enabled: boolean): void +``` + +Controls profiling at runtime and manages the WAL/observer lifecycle. Unlike the base Profiler class, this method ensures that when profiling is enabled, the WAL is opened and the performance observer is subscribed. When disabled, the WAL is closed and the observer is unsubscribed. + +```ts +// Temporarily disable profiling to reduce overhead during heavy operations +profiler.setEnabled(false); +await performHeavyOperation(); +profiler.setEnabled(true); // WAL reopens and observer resubscribes +``` + +### Queue statistics + +```ts +profiler.getStats(): { + enabled: boolean; + observing: boolean; + walOpen: boolean; + isSubscribed: boolean; + queued: number; + dropped: number; + written: number; + maxQueueSize: number; + flushThreshold: number; + addedSinceLastFlush: number; + buffered: boolean; +} +``` + +Returns comprehensive queue statistics for monitoring and debugging. Provides insight into the current state of the performance entry queue, useful for monitoring memory usage and processing throughput. + +```ts +const stats = profiler.getStats(); +console.log(`Enabled: ${stats.enabled}, WAL Open: ${stats.walOpen}, Observing: ${stats.observing}, Subscribed: ${stats.isSubscribed}, Queued: ${stats.queued}`); +if (stats.enabled && stats.walOpen && stats.observing && stats.isSubscribed && stats.queued > stats.flushThreshold) { + console.log('Queue nearing capacity, consider manual flush'); +} +``` + +### Manual flushing + +```ts +profiler.flush(): void +``` + +Forces immediate writing of all queued performance entries to the write ahead log, ensuring no performance data is lost. This method is useful for manual control over when buffered data is written, complementing the automatic flushing that occurs during process exit or when thresholds are reached. + +```ts +// Flush periodically in long-running applications to prevent memory buildup +setInterval(() => { + profiler.flush(); +}, 60000); // Flush every minute + +// Ensure all measurements are saved before critical operations +await profiler.measureAsync('database-migration', async () => { + await runMigration(); + profiler.flush(); // Ensure migration timing is recorded immediately +}); +``` + +## Resources + +- **[Chrome DevTools Extensibility API](https://developer.chrome.com/docs/devtools/performance/extension)** - Official documentation for performance profiling +- **[User Timing API](https://developer.mozilla.org/en-US/docs/Web/API/User_Timing_API)** - Web Performance API reference diff --git a/packages/utils/mocks/sink.mock.ts b/packages/utils/mocks/sink.mock.ts index 13d89e91c..7f435ab4f 100644 --- a/packages/utils/mocks/sink.mock.ts +++ b/packages/utils/mocks/sink.mock.ts @@ -1,30 +1,55 @@ -import type { Sink } from '../src/lib/sink-source.type'; +import { vi } from 'vitest'; +import type { + RecoverResult, + Recoverable, + Sink, +} from '../src/lib/sink-source.type'; export class MockSink implements Sink { private writtenItems: string[] = []; - private closed = false; + private closed = true; - open(): void { + open = vi.fn((): void => { this.closed = false; - } + }); - write(input: string): void { + write = vi.fn((input: string): void => { this.writtenItems.push(input); - } + }); - close(): void { + close = vi.fn((): void => { this.closed = true; - } + }); - isClosed(): boolean { + isClosed = vi.fn((): boolean => { return this.closed; - } + }); - encode(input: string): string { + encode = vi.fn((input: string): string => { return `${input}-${this.constructor.name}-encoded`; - } + }); - getWrittenItems(): string[] { + getWrittenItems = vi.fn((): string[] => { return [...this.writtenItems]; - } + }); +} + +export class MockTraceEventFileSink extends MockSink implements Recoverable { + recover = vi.fn( + (): { + records: unknown[]; + errors: { lineNo: number; line: string; error: Error }[]; + partialTail: string | null; + } => { + return { + records: this.getWrittenItems(), + errors: [], + partialTail: null, + } satisfies RecoverResult; + }, + ); + + repack = vi.fn((): void => {}); + + finalize = vi.fn((): void => {}); } diff --git a/packages/utils/src/lib/exit-process.ts b/packages/utils/src/lib/exit-process.ts index 62cee4977..e2e3f89f8 100644 --- a/packages/utils/src/lib/exit-process.ts +++ b/packages/utils/src/lib/exit-process.ts @@ -44,8 +44,8 @@ export function installExitHandlers(options: ExitHandlerOptions = {}): void { const { onExit, onError, - exitOnFatal, - exitOnSignal, + exitOnFatal = false, + exitOnSignal = false, fatalExitCode = DEFAULT_FATAL_EXIT_CODE, } = options; diff --git a/packages/utils/src/lib/performance-observer.int.test.ts b/packages/utils/src/lib/performance-observer.int.test.ts index 2c1721ebb..268106a70 100644 --- a/packages/utils/src/lib/performance-observer.int.test.ts +++ b/packages/utils/src/lib/performance-observer.int.test.ts @@ -23,13 +23,14 @@ describe('PerformanceObserverSink', () => { beforeEach(() => { sink = new MockSink(); + sink.open(); encode = vi.fn((entry: PerformanceEntry) => [ `${entry.name}:${entry.entryType}`, ]); options = { sink, - encode, + encodePerfEntry: encode, }; performance.clearMarks(); @@ -40,23 +41,22 @@ describe('PerformanceObserverSink', () => { expect(() => new PerformanceObserverSink(options)).not.toThrow(); }); - it('internal PerformanceObserver should process observed entries', () => { + it('internal PerformanceObserver should process observed entries', async () => { const observer = new PerformanceObserverSink(options); observer.subscribe(); performance.mark('test-mark'); performance.measure('test-measure'); + await awaitObserverCallback(); observer.flush(); expect(encode).toHaveBeenCalledTimes(2); - expect(encode).toHaveBeenNthCalledWith( - 1, + expect(encode).toHaveBeenCalledWith( expect.objectContaining({ name: 'test-mark', entryType: 'mark', }), ); - expect(encode).toHaveBeenNthCalledWith( - 2, + expect(encode).toHaveBeenCalledWith( expect.objectContaining({ name: 'test-measure', entryType: 'measure', @@ -80,7 +80,7 @@ describe('PerformanceObserverSink', () => { expect(encode).toHaveBeenCalledTimes(3); }); - it('flush flushes observed entries when subscribed', () => { + it('flush flushes observed entries when subscribed', async () => { const observer = new PerformanceObserverSink(options); observer.subscribe(); @@ -88,6 +88,7 @@ describe('PerformanceObserverSink', () => { performance.mark('test-mark2'); expect(sink.getWrittenItems()).toStrictEqual([]); + await awaitObserverCallback(); observer.flush(); expect(sink.getWrittenItems()).toStrictEqual([ 'test-mark1:mark', @@ -95,13 +96,14 @@ describe('PerformanceObserverSink', () => { ]); }); - it('flush calls encode for each entry', () => { + it('flush calls encode for each entry', async () => { const observer = new PerformanceObserverSink(options); observer.subscribe(); performance.mark('test-mark1'); performance.mark('test-mark2'); + await awaitObserverCallback(); observer.flush(); expect(encode).toHaveBeenCalledWith( @@ -137,91 +139,55 @@ describe('PerformanceObserverSink', () => { expect(encode).toHaveBeenCalledTimes(2); }); - it('should observe performance entries and write them to the sink on flush', () => { + it('should observe performance entries and write them to the sink on flush', async () => { const observer = new PerformanceObserverSink(options); observer.subscribe(); performance.mark('test-mark'); + await awaitObserverCallback(); observer.flush(); expect(sink.getWrittenItems()).toHaveLength(1); }); - it('should observe buffered performance entries when buffered is enabled', async () => { - const observer = new PerformanceObserverSink({ - ...options, - buffered: true, - }); + it('should observe performance entries when subscribed', async () => { + const observer = new PerformanceObserverSink(options); + observer.subscribe(); performance.mark('test-mark-1'); performance.mark('test-mark-2'); - await new Promise(resolve => setTimeout(resolve, 10)); - observer.subscribe(); - await new Promise(resolve => setTimeout(resolve, 10)); - expect(performance.getEntries()).toHaveLength(2); + await awaitObserverCallback(); observer.flush(); expect(sink.getWrittenItems()).toHaveLength(2); }); - it('handles multiple encoded items per performance entry', () => { + it('handles multiple encoded items per performance entry', async () => { const multiEncodeFn = vi.fn(e => [ `${e.entryType}-item1`, `${e.entryType}item2`, ]); const observer = new PerformanceObserverSink({ ...options, - encode: multiEncodeFn, + encodePerfEntry: multiEncodeFn, }); observer.subscribe(); performance.mark('test-mark'); + await awaitObserverCallback(); observer.flush(); expect(sink.getWrittenItems()).toHaveLength(2); }); - it('cursor logic prevents duplicate processing of performance entries', () => { - const observer = new PerformanceObserverSink(options); - observer.subscribe(); - - performance.mark('first-mark'); - performance.mark('second-mark'); - expect(encode).not.toHaveBeenCalled(); - observer.flush(); - expect(sink.getWrittenItems()).toStrictEqual([ - 'first-mark:mark', - 'second-mark:mark', - ]); - - expect(encode).toHaveBeenCalledTimes(2); - expect(encode).toHaveBeenNthCalledWith( - 1, - expect.objectContaining({ name: 'first-mark' }), - ); - expect(encode).toHaveBeenNthCalledWith( - 2, - expect.objectContaining({ name: 'second-mark' }), - ); - - performance.mark('third-mark'); - performance.measure('first-measure'); - - observer.flush(); - expect(sink.getWrittenItems()).toStrictEqual([ - 'first-mark:mark', - 'second-mark:mark', - 'third-mark:mark', - 'first-measure:measure', - ]); + it('throws error when subscribing with sink that is not open', () => { + const closedSink = new MockSink(); + const observer = new PerformanceObserverSink({ + sink: closedSink, + encodePerfEntry: encode, + }); - expect(encode).toHaveBeenCalledTimes(4); - expect(encode).toHaveBeenNthCalledWith( - 3, - expect.objectContaining({ name: 'third-mark' }), - ); - expect(encode).toHaveBeenNthCalledWith( - 4, - expect.objectContaining({ name: 'first-measure' }), + expect(() => observer.subscribe()).toThrow( + 'Sink MockSink must be opened before subscribing PerformanceObserver', ); }); }); diff --git a/packages/utils/src/lib/performance-observer.ts b/packages/utils/src/lib/performance-observer.ts index fa5720427..402fb3c79 100644 --- a/packages/utils/src/lib/performance-observer.ts +++ b/packages/utils/src/lib/performance-observer.ts @@ -1,111 +1,326 @@ -import { - type PerformanceEntry, - PerformanceObserver, - type PerformanceObserverEntryList, - performance, -} from 'node:perf_hooks'; -import type { Buffered, Encoder, Observer, Sink } from './sink-source.type'; +import { type PerformanceEntry, PerformanceObserver } from 'node:perf_hooks'; +import type { Buffered, Observer, Sink } from './sink-source.type'; +/** + * Encoder that converts PerformanceEntry to domain events. + * + * Pure function that transforms performance entries into domain events. + * Should be stateless, synchronous, and have no side effects. + * Returns a readonly array of encoded items. + */ +export type PerformanceEntryEncoder = ( + entry: PerformanceEntry, +) => readonly F[]; + +/** + * Array of performance entry types that this observer monitors. + * Only 'mark' and 'measure' entries are tracked as they represent + * user-defined performance markers and measurements. + */ const OBSERVED_TYPES = ['mark', 'measure'] as const; type ObservedEntryType = 'mark' | 'measure'; +const OBSERVED_TYPE_SET = new Set(OBSERVED_TYPES); + +/** + * Default threshold for triggering queue flushes based on queue length. + * When the queue length reaches (maxQueueSize - flushThreshold), + * a flush is triggered to prevent overflow. This provides a buffer zone + * before hitting the maximum queue capacity. + */ export const DEFAULT_FLUSH_THRESHOLD = 20; +/** + * Default maximum number of items allowed in the queue before entries are dropped. + * This acts as a memory safety limit to prevent unbounded memory growth + * in case of sink slowdown or high-frequency performance entries. + */ +export const DEFAULT_MAX_QUEUE_SIZE = 10_000; + +/** + * Validates the flush threshold configuration to ensure sensible bounds. + * + * The flush threshold must be positive and cannot exceed the maximum queue size, + * as it represents a buffer zone within the queue capacity. + * + * @param flushThreshold - The threshold value to validate (must be > 0) + * @param maxQueueSize - The maximum queue size for comparison (flushThreshold <= maxQueueSize) + * @throws {Error} If flushThreshold is not positive or exceeds maxQueueSize + */ +export function validateFlushThreshold( + flushThreshold: number, + maxQueueSize: number, +): void { + if (flushThreshold <= 0) { + throw new Error('flushThreshold must be > 0'); + } + if (flushThreshold > maxQueueSize) { + throw new Error('flushThreshold must be <= maxQueueSize'); + } +} + +/** + * Configuration options for the PerformanceObserverSink. + * + * @template T - The type of encoded performance data that will be written to the sink + */ export type PerformanceObserverOptions = { + /** + * The sink where encoded performance entries will be written. + * Must implement the Sink interface for handling the encoded data. + */ sink: Sink; - encode: (entry: PerformanceEntry) => T[]; - buffered?: boolean; + + /** + * Function that encodes raw PerformanceEntry objects into domain-specific types. + * This transformer converts Node.js performance entries into application-specific data structures. + * Returns a readonly array of encoded items. + */ + encodePerfEntry: PerformanceEntryEncoder; + + /** + * Whether to enable buffered observation mode. + * When true, captures all performance entries that occurred before observation started. + * When false, only captures entries after subscription begins. + * + * @default true + */ + captureBufferedEntries?: boolean; + + /** + * Threshold for triggering queue flushes based on queue length. + * Flushes occur when queue length reaches (maxQueueSize - flushThreshold). + * Larger values provide more buffer space before hitting capacity limits. + * + * @default DEFAULT_FLUSH_THRESHOLD (20) + */ flushThreshold?: number; + + /** + * Maximum number of items allowed in the queue before new entries are dropped. + * Acts as a memory safety limit to prevent unbounded growth during sink slowdown. + * + * @default DEFAULT_MAX_QUEUE_SIZE (10000) + */ + maxQueueSize?: number; }; -export class PerformanceObserverSink - implements Observer, Buffered, Encoder -{ - #encode: (entry: PerformanceEntry) => T[]; +/** + * A sink implementation that observes Node.js performance entries and forwards them to a configurable sink. + * + * This class provides a buffered, memory-safe bridge between Node.js PerformanceObserver + * and application-specific data sinks. It handles performance entry encoding, queue management, + * and graceful degradation under high load conditions. + * + * @template T - The type of encoded performance data written to the sink + * @implements {Observer} - Lifecycle management interface + * @implements {Buffered} - Queue statistics interface + */ +export class PerformanceObserverSink implements Observer, Buffered { + /** Encoder function for transforming PerformanceEntry objects into domain types */ + #encodePerfEntry: PerformanceEntryEncoder; + + /** Whether buffered observation mode is enabled */ #buffered: boolean; + + /** Threshold for triggering flushes based on queue length proximity to max capacity */ #flushThreshold: number; + + /** Maximum number of items allowed in queue before dropping new entries (hard memory limit) */ + #maxQueueSize: number; + + /** The target sink where encoded performance data is written */ #sink: Sink; + + /** Node.js PerformanceObserver instance, undefined when not subscribed */ #observer: PerformanceObserver | undefined; - #pendingCount = 0; + /** Bounded queue storing encoded performance items awaiting flush */ + #queue: T[] = []; + + /** Count of performance entries dropped due to queue overflow */ + #dropped = 0; + + /** Count of performance entries successfully written to sink */ + #written = 0; - // "cursor" per type: how many we already wrote from the global buffer - #written: Map; + /** Number of items added to queue since last successful flush */ + #addedSinceLastFlush = 0; + /** + * Creates a new PerformanceObserverSink with the specified configuration. + * + * @param options - Configuration options for the performance observer sink + * @throws {Error} If flushThreshold validation fails (must be > 0 and <= maxQueueSize) + */ constructor(options: PerformanceObserverOptions) { - const { encode, sink, buffered, flushThreshold } = options; - this.#encode = encode; - this.#written = new Map( - OBSERVED_TYPES.map(t => [t, 0]), - ); + const { + encodePerfEntry, + sink, + captureBufferedEntries, + flushThreshold = DEFAULT_FLUSH_THRESHOLD, + maxQueueSize = DEFAULT_MAX_QUEUE_SIZE, + } = options; + this.#encodePerfEntry = encodePerfEntry; this.#sink = sink; - this.#buffered = buffered ?? false; - this.#flushThreshold = flushThreshold ?? DEFAULT_FLUSH_THRESHOLD; + this.#buffered = captureBufferedEntries ?? true; + this.#maxQueueSize = maxQueueSize; + validateFlushThreshold(flushThreshold, this.#maxQueueSize); + this.#flushThreshold = flushThreshold; } - encode(entry: PerformanceEntry): T[] { - return this.#encode(entry); + /** + * Returns current queue statistics for monitoring and debugging. + * + * Provides insight into the current state of the performance entry queue, + * useful for monitoring memory usage and processing throughput. + * + * @returns Object containing all states and entry counts + */ + getStats() { + return { + isSubscribed: this.isSubscribed(), + queued: this.#queue.length, + dropped: this.#dropped, + written: this.#written, + maxQueueSize: this.#maxQueueSize, + flushThreshold: this.#flushThreshold, + addedSinceLastFlush: this.#addedSinceLastFlush, + buffered: this.#buffered, + }; + } + + /** + * Encodes a raw PerformanceEntry using the configured encoder function. + * + * This method delegates to the user-provided encoder function, allowing + * transformation of Node.js performance entries into application-specific types. + * + * @param entry - The raw performance entry to encode + * @returns Readonly array of encoded items + */ + encode(entry: PerformanceEntry): readonly T[] { + return this.#encodePerfEntry(entry); } + /** + * Starts observing performance entries and forwarding them to the sink. + * + * Creates a Node.js PerformanceObserver that monitors 'mark' and 'measure' entries. + * The observer uses a bounded queue with proactive flushing to manage memory usage. + * When buffered mode is enabled, any existing buffered entries are immediately flushed. + * + * @throws {Error} If the sink is closed before subscription + * + */ subscribe(): void { if (this.#observer) { return; } + if (this.#sink.isClosed()) { + throw new Error( + `Sink ${this.#sink.constructor.name} must be opened before subscribing PerformanceObserver`, + ); + } + + this.#observer = new PerformanceObserver(list => { + list.getEntries().forEach(entry => { + if (OBSERVED_TYPE_SET.has(entry.entryType as ObservedEntryType)) { + const items = this.encode(entry); + items.forEach(item => { + if (this.#queue.length >= this.#maxQueueSize) { + this.#dropped++; + return; + } - // Only used to trigger the flush - it's not processing the entries, just counting them - this.#observer = new PerformanceObserver( - (list: PerformanceObserverEntryList) => { - const batchCount = OBSERVED_TYPES.reduce( - (n, t) => n + list.getEntriesByType(t).length, - 0, - ); - - this.#pendingCount += batchCount; - if (this.#pendingCount >= this.#flushThreshold) { - this.flush(); + if ( + this.#queue.length >= + this.#maxQueueSize - this.#flushThreshold + ) { + this.flush(); + } + this.#queue.push(item); + this.#addedSinceLastFlush++; + }); } - }, - ); + }); + + if (this.#addedSinceLastFlush >= this.#flushThreshold) { + this.flush(); + } + }); this.#observer.observe({ entryTypes: OBSERVED_TYPES, buffered: this.#buffered, }); + + if (this.#buffered) { + this.flush(); + } } + /** + * Flushes all queued performance entries to the sink. + * + * Writes all currently queued encoded performance entries to the configured sink. + * If the sink is closed during flush, the queue is cleared without writing. + * The queue is always cleared after flush attempt, regardless of success or failure. + * + * @throws {Error} If sink write operations fail (with original error as cause) + */ flush(): void { - if (!this.#observer) { + if (this.#queue.length === 0) { + return; + } + if (this.#sink.isClosed()) { + // clear queue and drop items when sink closes unexpectedly + this.#queue.length = 0; return; } - OBSERVED_TYPES.forEach(t => { - const written = this.#written.get(t) ?? 0; - const fresh = performance.getEntriesByType(t).slice(written); - - try { - fresh - .flatMap(entry => this.encode(entry)) - .forEach(item => this.#sink.write(item)); - - this.#written.set(t, written + fresh.length); - } catch (error) { - throw new Error( - 'PerformanceObserverSink failed to write items to sink.', - { cause: error }, - ); - } - }); - - this.#pendingCount = 0; + try { + this.#queue.forEach(item => { + this.#sink.write(item); + this.#written++; + }); + } catch (error) { + throw new Error( + 'PerformanceObserverSink failed to write items to sink.', + { cause: error }, + ); + } finally { + this.#queue.length = 0; + this.#addedSinceLastFlush = 0; + } } + /** + * Stops observing performance entries and cleans up resources. + * + * Performs a final flush of any remaining queued entries, then disconnects + * the PerformanceObserver and releases all references. + * + * This method is idempotent - safe to call multiple times. + */ unsubscribe(): void { if (!this.#observer) { return; } - this.#observer?.disconnect(); + this.flush(); + this.#queue.length = 0; + this.#addedSinceLastFlush = 0; + this.#observer.disconnect(); this.#observer = undefined; } + /** + * Checks whether the performance observer is currently active. + * + * Returns true if the sink is subscribed and actively observing performance entries. + * This indicates that a PerformanceObserver instance exists and is connected. + * + * @returns true if currently subscribed and observing, false otherwise + */ isSubscribed(): boolean { return this.#observer !== undefined; } diff --git a/packages/utils/src/lib/performance-observer.unit.test.ts b/packages/utils/src/lib/performance-observer.unit.test.ts index a73be955a..574c25775 100644 --- a/packages/utils/src/lib/performance-observer.unit.test.ts +++ b/packages/utils/src/lib/performance-observer.unit.test.ts @@ -10,25 +10,71 @@ import { import { MockPerformanceObserver } from '@code-pushup/test-utils'; import { MockSink } from '../../mocks/sink.mock'; import { + DEFAULT_FLUSH_THRESHOLD, + DEFAULT_MAX_QUEUE_SIZE, type PerformanceObserverOptions, PerformanceObserverSink, + validateFlushThreshold, } from './performance-observer.js'; +describe('validateFlushThreshold', () => { + it.each([ + { flushThreshold: 1, description: 'minimum valid value (1)' }, + { flushThreshold: 10, description: 'arbitrary valid value (10)' }, + { + flushThreshold: DEFAULT_FLUSH_THRESHOLD, + description: 'default flush threshold', + }, + { + flushThreshold: DEFAULT_MAX_QUEUE_SIZE, + description: 'maximum valid value (equals maxQueueSize)', + }, + ])( + 'accepts valid flushThreshold value: $description', + ({ flushThreshold }) => { + expect(() => + validateFlushThreshold(flushThreshold, DEFAULT_MAX_QUEUE_SIZE), + ).not.toThrow(); + }, + ); + + it.each([ + { flushThreshold: 0, expectedError: 'flushThreshold must be > 0' }, + { flushThreshold: -1, expectedError: 'flushThreshold must be > 0' }, + { flushThreshold: -10, expectedError: 'flushThreshold must be > 0' }, + { + flushThreshold: DEFAULT_MAX_QUEUE_SIZE + 1, + expectedError: 'flushThreshold must be <= maxQueueSize', + }, + { + flushThreshold: 20_000, + expectedError: 'flushThreshold must be <= maxQueueSize', + }, + ])( + 'throws error when flushThreshold is invalid: $flushThreshold', + ({ flushThreshold, expectedError }) => { + expect(() => + validateFlushThreshold(flushThreshold, DEFAULT_MAX_QUEUE_SIZE), + ).toThrow(expectedError); + }, + ); +}); + describe('PerformanceObserverSink', () => { - let encode: MockedFunction<(entry: PerformanceEntry) => string[]>; + let encodePerfEntry: MockedFunction<(entry: PerformanceEntry) => string[]>; let sink: MockSink; let options: PerformanceObserverOptions; beforeEach(() => { vi.clearAllMocks(); sink = new MockSink(); - encode = vi.fn((entry: PerformanceEntry) => [ + sink.open(); + encodePerfEntry = vi.fn((entry: PerformanceEntry) => [ `${entry.name}:${entry.entryType}`, ]); options = { sink, - encode, - // we test buffered behavior separately + encodePerfEntry, flushThreshold: 1, }; @@ -46,33 +92,10 @@ describe('PerformanceObserverSink', () => { () => new PerformanceObserverSink({ sink, - encode, + encodePerfEntry, }), ).not.toThrow(); expect(MockPerformanceObserver.instances).toHaveLength(0); - // Instance creation covers the default flushThreshold assignment - }); - - it('automatically flushes when pendingCount reaches flushThreshold', () => { - const observer = new PerformanceObserverSink({ - sink, - encode, - flushThreshold: 2, // Set threshold to 2 - }); - observer.subscribe(); - - const mockObserver = MockPerformanceObserver.lastInstance(); - - // Emit 1 entry - should not trigger flush yet (pendingCount = 1 < 2) - mockObserver?.emitMark('first-mark'); - expect(sink.getWrittenItems()).toStrictEqual([]); - - // Emit 1 more entry - should trigger flush (pendingCount = 2 >= 2) - mockObserver?.emitMark('second-mark'); - expect(sink.getWrittenItems()).toStrictEqual([ - 'first-mark:mark', - 'second-mark:mark', - ]); }); it('creates instance with all options without starting to observe', () => { @@ -80,14 +103,34 @@ describe('PerformanceObserverSink', () => { () => new PerformanceObserverSink({ ...options, - buffered: true, + captureBufferedEntries: true, flushThreshold: 10, }), ).not.toThrow(); expect(MockPerformanceObserver.instances).toHaveLength(0); }); - it('subscribe is isomorphic and calls observe on internal PerformanceObserver', () => { + it.each([ + { flushThreshold: 0, expectedError: 'flushThreshold must be > 0' }, + { flushThreshold: -1, expectedError: 'flushThreshold must be > 0' }, + { + flushThreshold: 10_001, + expectedError: 'flushThreshold must be <= maxQueueSize', + }, + ])( + 'throws error when flushThreshold is invalid: $flushThreshold', + ({ flushThreshold, expectedError }) => { + expect( + () => + new PerformanceObserverSink({ + ...options, + flushThreshold, + }), + ).toThrow(expectedError); + }, + ); + + it('subscribe is idempotent and calls observe on internal PerformanceObserver', () => { const observer = new PerformanceObserverSink(options); observer.subscribe(); @@ -110,7 +153,7 @@ describe('PerformanceObserverSink', () => { ); }); - it('internal PerformanceObserver should observe unbuffered by default', () => { + it('internal PerformanceObserver should observe buffered by default', () => { const observer = new PerformanceObserverSink(options); observer.subscribe(); @@ -118,7 +161,7 @@ describe('PerformanceObserverSink', () => { MockPerformanceObserver.lastInstance()?.observe, ).toHaveBeenCalledWith( expect.objectContaining({ - buffered: false, + buffered: true, }), ); }); @@ -126,7 +169,7 @@ describe('PerformanceObserverSink', () => { it('internal PerformanceObserver should observe buffered if buffered option is provided', () => { const observer = new PerformanceObserverSink({ ...options, - buffered: true, + captureBufferedEntries: true, }); observer.subscribe(); @@ -142,22 +185,35 @@ describe('PerformanceObserverSink', () => { it('internal PerformanceObserver should process observed entries', () => { const observer = new PerformanceObserverSink({ ...options, - flushThreshold: 20, // Disable automatic flushing for this test + flushThreshold: 20, }); observer.subscribe(); - performance.mark('test-mark'); - performance.measure('test-measure'); + const mockObserver = MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 'test-mark', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'test-measure', + entryType: 'measure', + startTime: 0, + duration: 100, + }, + ]); observer.flush(); - expect(encode).toHaveBeenCalledTimes(2); - expect(encode).toHaveBeenNthCalledWith( + expect(encodePerfEntry).toHaveBeenCalledTimes(2); + expect(encodePerfEntry).toHaveBeenNthCalledWith( 1, expect.objectContaining({ name: 'test-mark', entryType: 'mark', }), ); - expect(encode).toHaveBeenNthCalledWith( + expect(encodePerfEntry).toHaveBeenNthCalledWith( 2, expect.objectContaining({ name: 'test-measure', @@ -171,7 +227,7 @@ describe('PerformanceObserverSink', () => { observer.subscribe(); MockPerformanceObserver.lastInstance()?.emitNavigation('test-navigation'); - expect(encode).not.toHaveBeenCalled(); + expect(encodePerfEntry).not.toHaveBeenCalled(); }); it('isSubscribed returns false when not observing', () => { @@ -196,12 +252,28 @@ describe('PerformanceObserverSink', () => { expect(observer.isSubscribed()).toBe(false); }); - it('flush flushes observed entries when subscribed', () => { - const observer = new PerformanceObserverSink(options); + it('flush writes queued entries to sink when subscribed', () => { + const observer = new PerformanceObserverSink({ + ...options, + flushThreshold: 10, + }); observer.subscribe(); - performance.mark('test-mark1'); - performance.mark('test-mark2'); + const mockObserver = MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 'test-mark1', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'test-mark2', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); expect(sink.getWrittenItems()).toStrictEqual([]); observer.flush(); @@ -211,39 +283,80 @@ describe('PerformanceObserverSink', () => { ]); }); - it('flush calls encode for each entry', () => { + it('flush does not flush observed entries when not subscribed', () => { + const observer = new PerformanceObserverSink(options); + + performance.mark('test-mark'); + observer.flush(); + expect(encodePerfEntry).not.toHaveBeenCalled(); + expect(sink.getWrittenItems()).toStrictEqual([]); + }); + + it('flush calls encodePerfEntry for each entry', () => { const observer = new PerformanceObserverSink(options); observer.subscribe(); - performance.mark('test-mark1'); - performance.mark('test-mark2'); + const mockObserver = MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 'test-mark1', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'test-mark2', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); observer.flush(); - expect(encode).toHaveBeenCalledWith({ - name: 'test-mark1', - entryType: 'mark', - startTime: 0, - duration: 0, - }); - expect(encode).toHaveBeenCalledWith({ - name: 'test-mark2', - entryType: 'mark', - startTime: 0, - duration: 0, + expect(encodePerfEntry).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'test-mark1', + entryType: 'mark', + }), + ); + expect(encodePerfEntry).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'test-mark2', + entryType: 'mark', + }), + ); + }); + + it('flush is idempotent and safe when queue is empty', () => { + const observer = new PerformanceObserverSink({ + sink, + encodePerfEntry, }); + + expect(() => observer.flush()).not.toThrow(); + expect(() => observer.flush()).not.toThrow(); + expect(sink.getWrittenItems()).toStrictEqual([]); }); - it('flush does not flush observed entries when not subscribed', () => { - const observer = new PerformanceObserverSink(options); + it('flush is safe when sink is closed', () => { + const observer = new PerformanceObserverSink({ + sink, + encodePerfEntry, + flushThreshold: 10, + }); + observer.subscribe(); performance.mark('test-mark'); - observer.flush(); - expect(encode).not.toHaveBeenCalled(); - expect(sink.getWrittenItems()).toStrictEqual([]); + sink.close(); + + expect(() => observer.flush()).not.toThrow(); + expect(() => observer.flush()).not.toThrow(); + + observer.unsubscribe(); }); - it('unsubscribe is isomorphic and calls observe on internal PerformanceObserver', () => { + it('unsubscribe is idempotent and calls disconnect on internal PerformanceObserver', () => { const observerSink = new PerformanceObserverSink(options); observerSink.subscribe(); @@ -254,22 +367,57 @@ describe('PerformanceObserverSink', () => { expect(MockPerformanceObserver.instances).toHaveLength(0); }); + it('observer callback throws encodePerfEntry errors immediately', () => { + const failingEncode = vi.fn(() => { + throw new Error('Encode failed'); + }); + + const observer = new PerformanceObserverSink({ + sink, + encodePerfEntry: failingEncode, + flushThreshold: 10, + }); + + observer.subscribe(); + + const mockObserver = MockPerformanceObserver.lastInstance(); + expect(() => + mockObserver?.emit([ + { + name: 'test-mark', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]), + ).toThrow('Encode failed'); + }); + it('flush wraps sink write errors with descriptive error message', () => { const failingSink = { write: vi.fn(() => { throw new Error('Sink write failed'); }), + isClosed: vi.fn(() => false), }; const observer = new PerformanceObserverSink({ sink: failingSink as any, - encode, - flushThreshold: 1, + encodePerfEntry, + flushThreshold: 10, }); observer.subscribe(); - performance.mark('test-mark'); + const mockObserver = MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 'test-mark', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); expect(() => observer.flush()).toThrow( expect.objectContaining({ @@ -281,28 +429,242 @@ describe('PerformanceObserverSink', () => { ); }); - it('flush wraps encode errors with descriptive error message', () => { - const failingEncode = vi.fn(() => { - throw new Error('Encode failed'); + it('throws error when subscribing with sink that is not open', () => { + const closedSink = new MockSink(); + const observer = new PerformanceObserverSink({ + sink: closedSink, + encodePerfEntry, }); + expect(() => observer.subscribe()).toThrow( + 'Sink MockSink must be opened before subscribing PerformanceObserver', + ); + }); + + it('getStats returns dropped and queued item information', () => { const observer = new PerformanceObserverSink({ sink, - encode: failingEncode, - flushThreshold: 1, + encodePerfEntry, + maxQueueSize: 20, + flushThreshold: 10, + }); + + expect(observer.getStats()).toStrictEqual( + expect.objectContaining({ + queued: 0, + dropped: 0, + }), + ); + }); + + it('getStats returns correct queue item count', () => { + const observer = new PerformanceObserverSink({ + sink, + encodePerfEntry, + flushThreshold: 10, }); observer.subscribe(); + const mockObserver = MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 'start-operation', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); - performance.mark('test-mark'); + expect(observer.getStats()).toStrictEqual( + expect.objectContaining({ + queued: 1, + }), + ); + }); - expect(() => observer.flush()).toThrow( + it('getStats returns correct dropped count when queue overflows', () => { + const smallQueueSize = 2; + const observer = new PerformanceObserverSink({ + sink, + encodePerfEntry, + maxQueueSize: smallQueueSize, + flushThreshold: smallQueueSize, + }); + + const flushSpy = vi.spyOn(observer, 'flush').mockImplementation(() => {}); + + observer.subscribe(); + + const mockObserver = MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 'mark-1', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'mark-2', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'mark-3', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); + + expect(observer.getStats()).toStrictEqual( expect.objectContaining({ - message: 'PerformanceObserverSink failed to write items to sink.', - cause: expect.objectContaining({ - message: 'Encode failed', - }), + queued: 2, + dropped: 1, }), ); + + flushSpy.mockRestore(); + observer.unsubscribe(); + }); + + it('getStats returns correct written count when queue overflows', () => { + const observer = new PerformanceObserverSink({ + sink, + encodePerfEntry, + flushThreshold: 2, + }); + + observer.subscribe(); + const mockObserver = MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 'write-test-1', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'write-test-2', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'write-test-3', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); + observer.flush(); + + expect(observer.getStats()).toStrictEqual( + expect.objectContaining({ + written: 3, + }), + ); + }); + + it('tracks addedSinceLastFlush counter correctly', () => { + const observer = new PerformanceObserverSink({ + sink, + encodePerfEntry, + flushThreshold: 10, + }); + + expect(observer.getStats().addedSinceLastFlush).toBe(0); + + observer.subscribe(); + const mockObserver = MockPerformanceObserver.lastInstance(); + + mockObserver?.emit([ + { + name: 'test-1', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); + + expect(observer.getStats().addedSinceLastFlush).toBe(1); + + mockObserver?.emit([ + { + name: 'test-2', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); + + expect(observer.getStats().addedSinceLastFlush).toBe(2); + + observer.flush(); + expect(observer.getStats().addedSinceLastFlush).toBe(0); + + mockObserver?.emit([ + { + name: 'test-3', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'test-4', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); + + expect(observer.getStats()).toHaveProperty('addedSinceLastFlush', 2); + + observer.unsubscribe(); + }); + + it('clears queue without writing when sink is closed during flush', () => { + const observer = new PerformanceObserverSink({ + sink, + encodePerfEntry, + flushThreshold: 10, // High threshold to prevent automatic flushing + }); + + observer.subscribe(); + + const mockObserver = MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 'test-entry-1', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'test-entry-2', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); + + // Verify entries are queued + expect(observer.getStats().queued).toBe(2); + expect(observer.getStats().written).toBe(0); + + // Close the sink + sink.close(); + + // Flush should clear queue without writing + observer.flush(); + + // Verify queue is cleared but written count unchanged + expect(observer.getStats().queued).toBe(0); + expect(observer.getStats().written).toBe(0); + + // Verify sink received no additional writes + expect(sink.getWrittenItems()).toHaveLength(0); + + observer.unsubscribe(); }); }); diff --git a/packages/utils/src/lib/profiler/profiler.int.test.ts b/packages/utils/src/lib/profiler/profiler.int.test.ts index 949f66649..3bbb636bd 100644 --- a/packages/utils/src/lib/profiler/profiler.int.test.ts +++ b/packages/utils/src/lib/profiler/profiler.int.test.ts @@ -1,7 +1,8 @@ -import { performance } from 'node:perf_hooks'; import { beforeEach, describe, expect, it } from 'vitest'; +import { MockTraceEventFileSink } from '../../../mocks/sink.mock.js'; +import type { PerformanceEntryEncoder } from '../performance-observer.js'; import type { ActionTrackEntryPayload } from '../user-timing-extensibility-api.type.js'; -import { Profiler } from './profiler.js'; +import { NodejsProfiler, Profiler } from './profiler.js'; describe('Profiler Integration', () => { let profiler: Profiler>; @@ -24,14 +25,14 @@ describe('Profiler Integration', () => { }); it('should create complete performance timeline for sync operation', () => { - const result = profiler.measure('sync-test', () => - Array.from({ length: 1000 }, (_, i) => i).reduce( - (sum, num) => sum + num, - 0, + expect( + profiler.measure('sync-test', () => + Array.from({ length: 1000 }, (_, i) => i).reduce( + (sum, num) => sum + num, + 0, + ), ), - ); - - expect(result).toBe(499_500); + ).toBe(499_500); const marks = performance.getEntriesByType('mark'); const measures = performance.getEntriesByType('measure'); @@ -67,12 +68,12 @@ describe('Profiler Integration', () => { }); it('should create complete performance timeline for async operation', async () => { - const result = await profiler.measureAsync('async-test', async () => { - await new Promise(resolve => setTimeout(resolve, 10)); - return 'async-result'; - }); - - expect(result).toBe('async-result'); + await expect( + profiler.measureAsync('async-test', async () => { + await new Promise(resolve => setTimeout(resolve, 10)); + return 'async-result'; + }), + ).resolves.toBe('async-result'); const marks = performance.getEntriesByType('mark'); const measures = performance.getEntriesByType('measure'); @@ -168,7 +169,7 @@ describe('Profiler Integration', () => { it('should create proper DevTools payloads for tracks', () => { profiler.measure('track-test', (): string => 'result', { - success: result => ({ + success: (result: string) => ({ properties: [['result', result]], tooltipText: 'Track test completed', }), @@ -195,8 +196,8 @@ describe('Profiler Integration', () => { }); it('should merge track defaults with measurement options', () => { - profiler.measure('sync-op', () => 'sync-result', { - success: result => ({ + profiler.measure('sync-op', (): string => 'sync-result', { + success: (result: string) => ({ properties: [ ['operation', 'sync'], ['result', result], @@ -282,8 +283,7 @@ describe('Profiler Integration', () => { it('should not create performance entries when disabled', async () => { profiler.setEnabled(false); - const syncResult = profiler.measure('disabled-sync', () => 'sync'); - expect(syncResult).toBe('sync'); + expect(profiler.measure('disabled-sync', () => 'sync')).toBe('sync'); const asyncResult = profiler.measureAsync( 'disabled-async', @@ -297,3 +297,187 @@ describe('Profiler Integration', () => { expect(performance.getEntriesByType('measure')).toHaveLength(0); }); }); + +describe('NodeJS Profiler Integration', () => { + const simpleEncoder: PerformanceEntryEncoder = entry => { + if (entry.entryType === 'measure') { + return [`${entry.name}:${entry.duration.toFixed(2)}ms`]; + } + return []; + }; + + let mockSink: MockTraceEventFileSink; + let nodejsProfiler: NodejsProfiler; + + beforeEach(() => { + mockSink = new MockTraceEventFileSink(); + + nodejsProfiler = new NodejsProfiler({ + prefix: 'test', + track: 'test-track', + sink: mockSink, + encodePerfEntry: simpleEncoder, + enabled: true, + }); + }); + + it('should initialize with sink opened when enabled', () => { + expect(mockSink.isClosed()).toBe(false); + expect(nodejsProfiler.isEnabled()).toBe(true); + expect(mockSink.open).toHaveBeenCalledTimes(1); + }); + + it('should create performance entries and write to sink', () => { + expect(nodejsProfiler.measure('test-operation', () => 'success')).toBe( + 'success', + ); + }); + + it('should handle async operations', async () => { + await expect( + nodejsProfiler.measureAsync('async-test', async () => { + await new Promise(resolve => setTimeout(resolve, 1)); + return 'async-result'; + }), + ).resolves.toBe('async-result'); + }); + + it('should disable profiling and close sink', () => { + nodejsProfiler.setEnabled(false); + expect(nodejsProfiler.isEnabled()).toBe(false); + expect(mockSink.isClosed()).toBe(true); + expect(mockSink.close).toHaveBeenCalledTimes(1); + + expect(nodejsProfiler.measure('disabled-test', () => 'success')).toBe( + 'success', + ); + + expect(mockSink.getWrittenItems()).toHaveLength(0); + }); + + it('should re-enable profiling correctly', () => { + nodejsProfiler.setEnabled(false); + nodejsProfiler.setEnabled(true); + + expect(nodejsProfiler.isEnabled()).toBe(true); + expect(mockSink.isClosed()).toBe(false); + expect(mockSink.open).toHaveBeenCalledTimes(2); + + expect(nodejsProfiler.measure('re-enabled-test', () => 42)).toBe(42); + }); + + it('should support custom tracks', () => { + const profilerWithTracks = new NodejsProfiler({ + prefix: 'api-server', + track: 'HTTP', + tracks: { + db: { track: 'Database', color: 'secondary' }, + cache: { track: 'Cache', color: 'primary' }, + }, + sink: mockSink, + encodePerfEntry: simpleEncoder, + }); + + expect( + profilerWithTracks.measure('user-lookup', () => 'user123', { + track: 'cache', + }), + ).toBe('user123'); + }); + + it('should capture buffered entries when buffered option is enabled', () => { + const bufferedProfiler = new NodejsProfiler({ + prefix: 'buffered-test', + track: 'Test', + sink: mockSink, + encodePerfEntry: simpleEncoder, + captureBufferedEntries: true, + enabled: true, + }); + + const bufferedStats = bufferedProfiler.getStats(); + expect(bufferedStats.enabled).toBe(true); + expect(bufferedStats.walOpen).toBe(true); + expect(bufferedStats.isSubscribed).toBe(true); + expect(bufferedStats.queued).toBe(0); + expect(bufferedStats.dropped).toBe(0); + expect(bufferedStats.written).toBe(0); + + bufferedProfiler.setEnabled(false); + }); + + it('should return correct getStats with dropped and written counts', () => { + const statsProfiler = new NodejsProfiler({ + prefix: 'stats-test', + track: 'Stats', + sink: mockSink, + encodePerfEntry: simpleEncoder, + maxQueueSize: 2, + flushThreshold: 2, + enabled: true, + }); + + expect(statsProfiler.measure('test-op', () => 'result')).toBe('result'); + + const stats = statsProfiler.getStats(); + expect(stats.enabled).toBe(true); + expect(stats.walOpen).toBe(true); + expect(stats.isSubscribed).toBe(true); + expect(typeof stats.queued).toBe('number'); + expect(typeof stats.dropped).toBe('number'); + expect(typeof stats.written).toBe('number'); + + statsProfiler.setEnabled(false); + }); + + it('should provide comprehensive queue statistics via getStats', () => { + const profiler = new NodejsProfiler({ + prefix: 'stats-profiler', + track: 'Stats', + sink: mockSink, + encodePerfEntry: simpleEncoder, + maxQueueSize: 3, + flushThreshold: 2, // Low threshold to trigger flushing + enabled: true, + }); + + // Initial stats should be zero + const initialStats = profiler.getStats(); + expect(initialStats.enabled).toBe(true); + expect(initialStats.walOpen).toBe(true); + expect(initialStats.isSubscribed).toBe(true); + expect(initialStats.queued).toBe(0); + expect(initialStats.dropped).toBe(0); + expect(initialStats.written).toBe(0); + + // Add measurements that will trigger flushing + profiler.measure('operation-1', () => 'result1'); + profiler.measure('operation-2', () => 'result2'); + + const statsAfterMeasurements = profiler.getStats(); + + // Verify all stats are present and are numbers + expect(typeof statsAfterMeasurements.queued).toBe('number'); + expect(typeof statsAfterMeasurements.dropped).toBe('number'); + expect(typeof statsAfterMeasurements.written).toBe('number'); + + // Stats should be non-negative + expect(statsAfterMeasurements.queued).toBeGreaterThanOrEqual(0); + expect(statsAfterMeasurements.dropped).toBeGreaterThanOrEqual(0); + expect(statsAfterMeasurements.written).toBeGreaterThanOrEqual(0); + + // Disable profiler to flush remaining items + profiler.setEnabled(false); + + const finalStats = profiler.getStats(); + expect(finalStats.enabled).toBe(false); // Should be disabled + expect(finalStats.walOpen).toBe(false); // WAL should be closed when disabled + expect(finalStats.isSubscribed).toBe(false); // Should not be subscribed when disabled + expect(finalStats.queued).toBe(0); // Should be cleared when disabled + }); + + it('should write to file on flush', () => { + // @TODO: Implement test when PR #1210 is merged + expect(true).toBe(true); + }); +}); diff --git a/packages/utils/src/lib/profiler/profiler.ts b/packages/utils/src/lib/profiler/profiler.ts index 130e28c44..4a7aae6aa 100644 --- a/packages/utils/src/lib/profiler/profiler.ts +++ b/packages/utils/src/lib/profiler/profiler.ts @@ -1,5 +1,10 @@ import process from 'node:process'; import { isEnvVarEnabled } from '../env.js'; +import { + type PerformanceObserverOptions, + PerformanceObserverSink, +} from '../performance-observer.js'; +import type { Recoverable, Sink } from '../sink-source.type.js'; import { type ActionTrackConfigs, type MeasureCtxOptions, @@ -226,3 +231,152 @@ export class Profiler { } } } + +/** + * Options for configuring a NodejsProfiler instance. + * + * Extends ProfilerOptions with a required sink parameter. + * + * @template Tracks - Record type defining available track names and their configurations + */ +export type NodejsProfilerOptions< + DomainEvents, + Tracks extends Record, +> = ProfilerOptions & + Omit, 'sink'> & { + /** Sink for buffering and flushing performance data + * @NOTE this is dummy code and will be replaced by PR #1210 + **/ + sink: Sink & Recoverable; + }; + +/** + * Performance profiler with automatic process exit handling for buffered performance data. + * + * This class extends the base {@link Profiler} with automatic flushing of performance data + * when the process exits. It accepts a {@link PerformanceObserverSink} that buffers performance + * entries and ensures they are written out during process termination, even for unexpected exits. + * + * The sink defines the output format for performance data, enabling flexible serialization + * to various formats such as DevTools TraceEvent JSON, OpenTelemetry protocol buffers, + * or custom domain-specific formats. + * + * The profiler automatically subscribes to the performance observer when enabled and installs + * exit handlers that flush buffered data on process termination (signals, fatal errors, or normal exit). + * + */ +export class NodejsProfiler< + DomainEvents, + Tracks extends Record = Record< + string, + ActionTrackEntryPayload + >, +> extends Profiler { + #sink: Sink & Recoverable; + #performanceObserverSink: PerformanceObserverSink; + #observing = false; + + /** + * Creates a new NodejsProfiler instance with automatic exit handling. + * + * @param options - Configuration options including the sink + * @param options.sink - Sink for buffering and flushing performance data + * @param options.tracks - Custom track configurations merged with defaults + * @param options.prefix - Prefix for all measurement names + * @param options.track - Default track name for measurements + * @param options.trackGroup - Default track group for organization + * @param options.color - Default color for track entries + * @param options.enabled - Whether profiling is enabled (defaults to CP_PROFILING env var) + * + */ + constructor(options: NodejsProfilerOptions) { + const { + sink, + encodePerfEntry, + captureBufferedEntries, + flushThreshold, + maxQueueSize, + ...profilerOptions + } = options; + + super(profilerOptions); + + this.#sink = sink; + + this.#performanceObserverSink = new PerformanceObserverSink({ + sink, + encodePerfEntry, + captureBufferedEntries, + flushThreshold, + maxQueueSize, + }); + + this.#setObserving(this.isEnabled()); + } + + #setObserving(observing: boolean): void { + if (this.#observing === observing) { + return; + } + this.#observing = observing; + + if (observing) { + this.#sink.open(); + this.#performanceObserverSink.subscribe(); + } else { + this.#performanceObserverSink.unsubscribe(); + this.#performanceObserverSink.flush(); + this.#sink.close(); + } + } + + /** + * Returns current queue statistics and profiling state for monitoring and debugging. + * + * Provides insight into the current state of the performance entry queue, observer status, and WAL state, + * useful for monitoring memory usage, processing throughput, and profiling lifecycle. + * + * @returns Object containing profiling state and queue statistics + */ + getStats() { + return { + enabled: this.isEnabled(), + walOpen: !this.#sink.isClosed(), + ...this.#performanceObserverSink.getStats(), + }; + } + + /** + * Sets enabled state for this profiler and manages sink/observer lifecycle. + * + * Design: Environment = default, Runtime = override + * - Environment variables define defaults (read once at construction) + * - This method provides runtime control without mutating globals + * - Child processes are unaffected by runtime enablement changes + * + * Invariant: enabled ↔ sink + observer state + * - enabled === true → sink open + observer subscribed + * - enabled === false → sink closed + observer unsubscribed + * + * @param enabled - Whether profiling should be enabled + */ + setEnabled(enabled: boolean): void { + if (this.isEnabled() === enabled) { + return; + } + super.setEnabled(enabled); + this.#setObserving(enabled); + } + + /** + * Flushes any buffered performance data to the sink. + * + * Forces immediate writing of all queued performance entries to the configured sink, + * ensuring no performance data is lost. This method is useful for manual control + * over when buffered data is written, complementing the automatic flushing that + * occurs during process exit or when thresholds are reached. + */ + flush(): void { + this.#performanceObserverSink.flush(); + } +} diff --git a/packages/utils/src/lib/profiler/profiler.unit.test.ts b/packages/utils/src/lib/profiler/profiler.unit.test.ts index 0e285deb2..0095b81c4 100644 --- a/packages/utils/src/lib/profiler/profiler.unit.test.ts +++ b/packages/utils/src/lib/profiler/profiler.unit.test.ts @@ -1,13 +1,22 @@ import { performance } from 'node:perf_hooks'; import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { MockTraceEventFileSink } from '../../../mocks/sink.mock.js'; +import type { PerformanceEntryEncoder } from '../performance-observer.js'; +import * as PerfObserverModule from '../performance-observer.js'; import type { ActionTrackEntryPayload } from '../user-timing-extensibility-api.type.js'; -import { Profiler, type ProfilerOptions } from './profiler.js'; +import { + NodejsProfiler, + type NodejsProfilerOptions, + Profiler, + type ProfilerOptions, +} from './profiler.js'; describe('Profiler', () => { const getProfiler = (overrides?: Partial) => new Profiler({ prefix: 'cp', track: 'test-track', + enabled: false, ...overrides, }); @@ -24,7 +33,10 @@ describe('Profiler', () => { it('constructor should initialize with default enabled state from env', () => { vi.stubEnv('CP_PROFILING', 'true'); - const profilerWithEnv = getProfiler(); + const profilerWithEnv = new Profiler({ + prefix: 'cp', + track: 'test-track', + }); expect(profilerWithEnv.isEnabled()).toBe(true); }); @@ -193,7 +205,7 @@ describe('Profiler', () => { detail: { devtools: expect.objectContaining({ dataType: 'marker', - color: 'primary', // Should use default color + color: 'primary', tooltipText: 'Test marker with default color', }), }, @@ -424,3 +436,168 @@ describe('Profiler', () => { expect(workFn).toHaveBeenCalled(); }); }); + +const simpleEncoder: PerformanceEntryEncoder = entry => { + if (entry.entryType === 'measure') { + return [`${entry.name}:${entry.duration.toFixed(2)}ms`]; + } + return []; +}; + +describe('NodejsProfiler', () => { + const getNodejsProfiler = ( + overrides?: Partial< + NodejsProfilerOptions> + >, + ) => { + const sink = new MockTraceEventFileSink(); + + const mockPerfObserverSink = { + subscribe: vi.fn(), + unsubscribe: vi.fn(), + isSubscribed: vi.fn().mockReturnValue(false), + encode: vi.fn(), + flush: vi.fn(), + getStats: vi.fn().mockReturnValue({ + isSubscribed: false, + queued: 0, + dropped: 0, + written: 0, + maxQueueSize: 10_000, + flushThreshold: 20, + addedSinceLastFlush: 0, + buffered: true, + }), + }; + vi.spyOn(PerfObserverModule, 'PerformanceObserverSink').mockReturnValue( + mockPerfObserverSink as any, + ); + + vi.spyOn(sink, 'open'); + vi.spyOn(sink, 'close'); + + const profiler = new NodejsProfiler({ + prefix: 'test', + track: 'test-track', + sink, + encodePerfEntry: simpleEncoder, + ...overrides, + }); + + return { sink, perfObserverSink: mockPerfObserverSink, profiler }; + }; + + it('should export NodejsProfiler class', () => { + expect(typeof NodejsProfiler).toBe('function'); + }); + + it('should have required static structure', () => { + const proto = NodejsProfiler.prototype; + expect(typeof proto.measure).toBe('function'); + expect(typeof proto.measureAsync).toBe('function'); + expect(typeof proto.marker).toBe('function'); + expect(typeof proto.setEnabled).toBe('function'); + expect(typeof proto.isEnabled).toBe('function'); + }); + + it('should inherit from Profiler', () => { + expect(Object.getPrototypeOf(NodejsProfiler.prototype)).toBe( + Profiler.prototype, + ); + }); + + it('should initialize with sink opened when enabled is true', () => { + const { sink, perfObserverSink } = getNodejsProfiler({ enabled: true }); + expect(sink.isClosed()).toBe(false); + expect(sink.open).toHaveBeenCalledTimes(1); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + }); + + it('should initialize with sink closed when enabled is false', () => { + const { sink, perfObserverSink } = getNodejsProfiler({ enabled: false }); + expect(sink.isClosed()).toBe(true); + expect(sink.open).not.toHaveBeenCalled(); + expect(perfObserverSink.subscribe).not.toHaveBeenCalled(); + }); + + it('should open sink and subscribe observer when enabling', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: false, + }); + + profiler.setEnabled(true); + + expect(profiler.isEnabled()).toBe(true); + expect(sink.isClosed()).toBe(false); + expect(sink.open).toHaveBeenCalledTimes(1); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + }); + + it('should close sink and unsubscribe observer when disabling', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: true, + }); + + profiler.setEnabled(false); + + expect(profiler.isEnabled()).toBe(false); + expect(sink.isClosed()).toBe(true); + expect(sink.close).toHaveBeenCalledTimes(1); + expect(perfObserverSink.unsubscribe).toHaveBeenCalledTimes(1); + }); + + it('should be idempotent - no-op when setting same state', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: true, + }); + + expect(sink.open).toHaveBeenCalledTimes(1); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + + profiler.setEnabled(true); + + expect(sink.open).toHaveBeenCalledTimes(1); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + }); + + it('should perform measurements when enabled', () => { + const { profiler } = getNodejsProfiler({ enabled: true }); + + const result = profiler.measure('test-op', () => 'success'); + expect(result).toBe('success'); + }); + + it('should skip sink operations when disabled', () => { + const { sink, profiler } = getNodejsProfiler({ enabled: false }); + + const result = profiler.measure('disabled-op', () => 'success'); + expect(result).toBe('success'); + + expect(sink.getWrittenItems()).toHaveLength(0); + }); + + it('should flush buffered performance data to sink', () => { + const { perfObserverSink, profiler } = getNodejsProfiler(); + + profiler.flush(); + + expect(perfObserverSink.flush).toHaveBeenCalledTimes(1); + }); + + it('getStats should return current stats', () => { + const { profiler } = getNodejsProfiler({ enabled: false }); + + expect(profiler.getStats()).toStrictEqual({ + enabled: false, + walOpen: false, + isSubscribed: false, + queued: 0, + dropped: 0, + written: 0, + maxQueueSize: 10_000, + flushThreshold: 20, + addedSinceLastFlush: 0, + buffered: true, + }); + }); +}); diff --git a/packages/utils/src/lib/sink-source.type.ts b/packages/utils/src/lib/sink-source.type.ts index ee096e31f..5f94584bd 100644 --- a/packages/utils/src/lib/sink-source.type.ts +++ b/packages/utils/src/lib/sink-source.type.ts @@ -29,8 +29,8 @@ export type Observer = { isSubscribed: () => boolean; }; -export type Recoverable = { - recover: () => RecoverResult; +export type Recoverable = { + recover: () => RecoverResult; repack: () => void; finalize: () => void; };