Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c03b2c4
refactor: add general file sink logic
Jan 8, 2026
756f8c0
feat: add file sink classes
Jan 9, 2026
4c95897
Merge branch 'main' into feat/utils/file-sink
Jan 10, 2026
b0c9cc4
refactor: add trace json file
Jan 12, 2026
1f6e326
refactor: wip
Jan 14, 2026
6bcb73b
refactor: wip
Jan 14, 2026
dfb43be
Merge remote-tracking branch 'origin/main' into feat/utils/file-sink
Jan 14, 2026
3fe6871
refactor: wip arc
BioPhoton Jan 14, 2026
67e004c
refactor: fix lint
BioPhoton Jan 14, 2026
a4a9f10
Merge branch 'main' into feat/utils/file-sink
BioPhoton Jan 16, 2026
0b1fd3a
refactor: wip
BioPhoton Jan 16, 2026
bf0ecb9
refactor: wip
BioPhoton Jan 16, 2026
c88ffe9
refactor: wip
BioPhoton Jan 16, 2026
d6e75e6
refactor: wip
BioPhoton Jan 16, 2026
21575b3
refactor: wip
BioPhoton Jan 16, 2026
09437a9
refactor: wip
BioPhoton Jan 16, 2026
4705394
refactor: wip
BioPhoton Jan 16, 2026
e0c210e
refactor: wip
BioPhoton Jan 16, 2026
29016b0
refactor: wip
BioPhoton Jan 17, 2026
e60ea2b
refactor: wip
BioPhoton Jan 17, 2026
5ea2ac4
refactor: wip
BioPhoton Jan 17, 2026
73ff371
refactor: wip
BioPhoton Jan 17, 2026
34b0eb8
refactor: wip
BioPhoton Jan 17, 2026
dd2e959
refactor: fix lint
BioPhoton Jan 17, 2026
64501ec
refactor: fix tests
BioPhoton Jan 17, 2026
eb559f5
refactor: add tests
BioPhoton Jan 17, 2026
e20e804
refactor: wip
BioPhoton Jan 17, 2026
8f812c5
refactor: wip
BioPhoton Jan 17, 2026
a839b9b
refactor: fix lint
BioPhoton Jan 17, 2026
27b1506
refactor: fix lint
BioPhoton Jan 17, 2026
592fb89
refactor: wip
BioPhoton Jan 17, 2026
a19070e
refactor: wip
BioPhoton Jan 17, 2026
4f8cd2d
refactor: wip
BioPhoton Jan 18, 2026
bc86d08
refactor: wip
BioPhoton Jan 18, 2026
a530134
refactor: wip
BioPhoton Jan 18, 2026
d9eb84b
refactor: remove path logic
BioPhoton Jan 18, 2026
67f0c1e
refactor: fix lint
BioPhoton Jan 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/utils/eslint.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ export default tseslint.config(
},
},
},
{
files: ['packages/utils/src/lib/**/wal*.ts'],
rules: {
'n/no-sync': 'off',
},
},
{
files: ['**/*.json'],
rules: {
Expand Down
48 changes: 40 additions & 8 deletions packages/utils/mocks/sink.mock.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,62 @@
import type { Sink } from '../src/lib/sink-source.type';
import { WriteAheadLogFile } from '../src/lib/wal.js';
import type { Codec } from '../src/lib/wal.js';

export class MockSink implements Sink<string, string> {
export class MockFileSink implements WriteAheadLogFile<string> {
private writtenItems: string[] = [];
private closed = false;

constructor(options?: { file?: string; codec?: Codec<string> }) {
const file = options?.file || '/tmp/mock-sink.log';
const codec = options?.codec || {
encode: (input: string) => input,
decode: (data: string) => data,
};
}

#fd: number | null = null;

get path(): string {
return '/tmp/mock-sink.log';
}

getPath(): string {
return this.path;
}

open(): void {
this.closed = false;
this.#fd = 1; // Mock file descriptor
}

write(input: string): void {
this.writtenItems.push(input);
append(v: string): void {
this.writtenItems.push(v);
}

close(): void {
this.#fd = null;
this.closed = true;
}

isClosed(): boolean {
return this.closed;
return this.#fd === null;
}

recover(): any {
return {
records: this.writtenItems,
errors: [],
partialTail: null,
};
}

encode(input: string): string {
return `${input}-${this.constructor.name}-encoded`;
repack(): void {
// Mock implementation - do nothing
}

getWrittenItems(): string[] {
return [...this.writtenItems];
}

clearWrittenItems(): void {
this.writtenItems = [];
}
}
10 changes: 5 additions & 5 deletions packages/utils/src/lib/clock-epoch.unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { defaultClock, epochClock } from './clock-epoch.js';
describe('epochClock', () => {
it('should create epoch clock with defaults', () => {
const c = epochClock();
expect(c.timeOriginMs).toBe(500_000);
expect(c.timeOriginMs).toBe(1_700_000_000_000);
expect(c.tid).toBe(2);
expect(c.pid).toBe(10_001);
expect(c.fromEpochMs).toBeFunction();
Expand Down Expand Up @@ -33,8 +33,8 @@ describe('epochClock', () => {

it('should support performance clock by default for epochNowUs', () => {
const c = epochClock();
expect(c.timeOriginMs).toBe(500_000);
expect(c.epochNowUs()).toBe(500_000_000);
expect(c.timeOriginMs).toBe(1_700_000_000_000);
expect(c.epochNowUs()).toBe(1_700_000_000_000_000);
});

it.each([
Expand All @@ -56,8 +56,8 @@ describe('epochClock', () => {
});

it.each([
[0, 500_000_000],
[1000, 501_000_000],
[0, 1_700_000_000_000_000],
[1000, 1_700_000_001_000_000],
])(
'should convert performance milliseconds to microseconds',
(perfMs, expected) => {
Expand Down
6 changes: 3 additions & 3 deletions packages/utils/src/lib/performance-observer.int.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ import {
it,
vi,
} from 'vitest';
import { MockSink } from '../../mocks/sink.mock';
import { MockFileSink } from '../../mocks/sink.mock';
import {
type PerformanceObserverOptions,
PerformanceObserverSink,
} from './performance-observer.js';

describe('PerformanceObserverSink', () => {
let encode: MockedFunction<(entry: PerformanceEntry) => string[]>;
let sink: MockSink;
let sink: MockFileSink;
let options: PerformanceObserverOptions<string>;

const awaitObserverCallback = () =>
new Promise(resolve => setTimeout(resolve, 10));

beforeEach(() => {
sink = new MockSink();
sink = new MockFileSink();
encode = vi.fn((entry: PerformanceEntry) => [
`${entry.name}:${entry.entryType}`,
]);
Expand Down
12 changes: 5 additions & 7 deletions packages/utils/src/lib/performance-observer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,24 @@ import {
type PerformanceObserverEntryList,
performance,
} from 'node:perf_hooks';
import type { Buffered, Encoder, Observer, Sink } from './sink-source.type';
import type { AppendableSink } from './wal.js';

const OBSERVED_TYPES = ['mark', 'measure'] as const;
type ObservedEntryType = 'mark' | 'measure';
export const DEFAULT_FLUSH_THRESHOLD = 20;

export type PerformanceObserverOptions<T> = {
sink: Sink<T, unknown>;
sink: AppendableSink<T>;
encode: (entry: PerformanceEntry) => T[];
buffered?: boolean;
flushThreshold?: number;
};

export class PerformanceObserverSink<T>
implements Observer, Buffered, Encoder<PerformanceEntry, T[]>
{
export class PerformanceObserverSink<T> {
#encode: (entry: PerformanceEntry) => T[];
#buffered: boolean;
#flushThreshold: number;
#sink: Sink<T, unknown>;
#sink: AppendableSink<T>;
#observer: PerformanceObserver | undefined;

#pendingCount = 0;
Expand Down Expand Up @@ -84,7 +82,7 @@ export class PerformanceObserverSink<T>
try {
fresh
.flatMap(entry => this.encode(entry))
.forEach(item => this.#sink.write(item));
.forEach(item => this.#sink.append(item));

this.#written.set(t, written + fresh.length);
} catch (error) {
Expand Down
54 changes: 42 additions & 12 deletions packages/utils/src/lib/performance-observer.unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,28 @@ import {
vi,
} from 'vitest';
import { MockPerformanceObserver } from '@code-pushup/test-utils';
import { MockSink } from '../../mocks/sink.mock';
import { MockFileSink } from '../../mocks/sink.mock';
import {
type PerformanceObserverOptions,
PerformanceObserverSink,
} from './performance-observer.js';
import type { Codec } from './wal.js';

describe('PerformanceObserverSink', () => {
let encode: MockedFunction<(entry: PerformanceEntry) => string[]>;
let sink: MockSink;
let sink: MockFileSink;
let options: PerformanceObserverOptions<string>;

beforeEach(() => {
vi.clearAllMocks();
sink = new MockSink();
sink = new MockFileSink();
encode = vi.fn((entry: PerformanceEntry) => [
`${entry.name}:${entry.entryType}`,
]);
options = {
sink,
encode,
// we test buffered behavior separately

flushThreshold: 1,
};

Expand All @@ -50,24 +51,21 @@ describe('PerformanceObserverSink', () => {
}),
).not.toThrow();
expect(MockPerformanceObserver.instances).toHaveLength(0);
// Instance creation covers the default flushThreshold assignment
});

it('automatically flushes when pendingCount reaches flushThreshold', () => {
const observer = new PerformanceObserverSink({
sink,
encode,
flushThreshold: 2, // Set threshold to 2
flushThreshold: 2,
});
observer.subscribe();

const mockObserver = MockPerformanceObserver.lastInstance();

// Emit 1 entry - should not trigger flush yet (pendingCount = 1 < 2)
mockObserver?.emitMark('first-mark');
expect(sink.getWrittenItems()).toStrictEqual([]);

// Emit 1 more entry - should trigger flush (pendingCount = 2 >= 2)
mockObserver?.emitMark('second-mark');
expect(sink.getWrittenItems()).toStrictEqual([
'first-mark:mark',
Expand Down Expand Up @@ -142,7 +140,7 @@ describe('PerformanceObserverSink', () => {
it('internal PerformanceObserver should process observed entries', () => {
const observer = new PerformanceObserverSink({
...options,
flushThreshold: 20, // Disable automatic flushing for this test
flushThreshold: 20,
});
observer.subscribe();

Expand Down Expand Up @@ -255,12 +253,22 @@ describe('PerformanceObserverSink', () => {
});

it('flush wraps sink write errors with descriptive error message', () => {
const failingSink = {
write: vi.fn(() => {
const failingCodec: Codec<string> = {
encode: () => {
throw new Error('Sink write failed');
}),
},
decode: (data: string) => data,
};

const failingSink = new MockFileSink({
file: '/test/path',
codec: failingCodec,
});

vi.spyOn(failingSink, 'append').mockImplementation(() => {
throw new Error('Sink write failed');
});

const observer = new PerformanceObserverSink({
sink: failingSink as any,
encode,
Expand Down Expand Up @@ -305,4 +313,26 @@ describe('PerformanceObserverSink', () => {
}),
);
});

it('accepts custom sinks with append method', () => {
const collectedItems: string[] = [];
const customSink = {
// eslint-disable-next-line functional/immutable-data
append: (item: string) => collectedItems.push(item),
};

const observer = new PerformanceObserverSink({
sink: customSink,
encode: (entry: PerformanceEntry) => [`${entry.name}:${entry.duration}`],
});

observer.subscribe();

const mockObserver = MockPerformanceObserver.lastInstance();
mockObserver?.emitMark('test-mark');

observer.flush();

expect(collectedItems).toContain('test-mark:0');
});
});
5 changes: 5 additions & 0 deletions packages/utils/src/lib/profiler/constants.ts
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
export const PROFILER_ENABLED_ENV_VAR = 'CP_PROFILING';
export const PROFILER_COORDINATOR_FLAG_ENV_VAR = 'CP_PROFILER_COORDINATOR';
export const PROFILER_ORIGIN_PID_ENV_VAR = 'CP_PROFILER_ORIGIN_PID';
export const PROFILER_DIRECTORY_ENV_VAR = 'CP_PROFILER_DIR';
export const PROFILER_BASE_NAME = 'trace';
export const PROFILER_DIRECTORY = './tmp/profiles';
11 changes: 11 additions & 0 deletions packages/utils/src/lib/profiler/profiler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import process from 'node:process';
import { threadId } from 'node:worker_threads';
import { isEnvVarEnabled } from '../env.js';
import {
type ActionTrackConfigs,
Expand All @@ -16,6 +17,14 @@ import type {
} from '../user-timing-extensibility-api.type.js';
import { PROFILER_ENABLED_ENV_VAR } from './constants.js';

/**
* Generates a unique profiler ID based on performance time origin, process ID, thread ID, and instance count.
*/
export function getProfilerId() {
// eslint-disable-next-line functional/immutable-data
return `${Math.round(performance.timeOrigin)}.${process.pid}.${threadId}.${++Profiler.instanceCount}`;
}

/**
* Configuration options for creating a Profiler instance.
*
Expand Down Expand Up @@ -59,6 +68,8 @@ export type ProfilerOptions<T extends ActionTrackConfigs = ActionTrackConfigs> =
*
*/
export class Profiler<T extends ActionTrackConfigs> {
static instanceCount = 0;
readonly id = getProfilerId();
#enabled: boolean;
readonly #defaults: ActionTrackEntryPayload;
readonly tracks: Record<keyof T, ActionTrackEntryPayload> | undefined;
Expand Down
14 changes: 13 additions & 1 deletion packages/utils/src/lib/profiler/profiler.unit.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
import { performance } from 'node:perf_hooks';
import { threadId } from 'node:worker_threads';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import type { ActionTrackEntryPayload } from '../user-timing-extensibility-api.type.js';
import { Profiler, type ProfilerOptions } from './profiler.js';
import { Profiler, type ProfilerOptions, getProfilerId } from './profiler.js';

describe('getProfilerId', () => {
it('should generate a unique id per process', () => {
expect(getProfilerId()).toBe(
`${Math.round(performance.timeOrigin)}.${process.pid}.${threadId}.1`,
);
expect(getProfilerId()).toBe(
`${Math.round(performance.timeOrigin)}.${process.pid}.${threadId}.2`,
);
});
});

describe('Profiler', () => {
const getProfiler = (overrides?: Partial<ProfilerOptions>) =>
Expand Down
Loading