From 473f29cd0113f1e9386258c4251ff5ecbec30d99 Mon Sep 17 00:00:00 2001 From: Saurabh Singh Date: Tue, 21 May 2024 00:21:55 +0530 Subject: [PATCH 1/5] fix for cluster aggregation performance --- lib/cluster.js | 68 +++++++++++++++++++++++++++++----------- lib/metricAggregators.js | 19 ++++++++--- test/aggregatorsTest.js | 17 +++++----- test/clusterTest.js | 27 ++++++++++------ 4 files changed, 92 insertions(+), 39 deletions(-) diff --git a/lib/cluster.js b/lib/cluster.js index 5cb707ed..996aff1b 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -9,8 +9,11 @@ */ const Registry = require('./registry'); -const { Grouper } = require('./util'); +const { Grouper, hashObject } = require('./util'); const { aggregators } = require('./metricAggregators'); +const fs = require('fs'); +const path = require('path'); +const os = require('os'); // We need to lazy-load the 'cluster' module as some application servers - // namely Passenger - crash when it is imported. let cluster = () => { @@ -117,6 +120,7 @@ class AggregatorRegistry extends Registry { // Aggregate gathered metrics. metricsByName.forEach(metrics => { + metrics.workerSize = metricsArr.length; const aggregatorName = metrics[0].aggregator; const aggregatorFn = aggregators[aggregatorName]; if (typeof aggregatorFn !== 'function') { @@ -175,19 +179,29 @@ function addListeners() { request.done(new Error(message.error)); return; } - - message.metrics.forEach(registry => request.responses.push(registry)); - request.pending--; - - if (request.pending === 0) { - // finalize - requests.delete(message.requestId); - clearTimeout(request.errorTimeout); - - const registry = AggregatorRegistry.aggregate(request.responses); - const promString = registry.metrics(); - request.done(null, promString); - } + + fs.readFile(message.filename, 'utf8', (err, data) => { + if(err) { + request.done(err); + return; + } else { + const metrics = JSON.parse(data); + metrics.forEach(registry => request.responses.push(registry)); + fs.unlink(message.filename, (err) => { + if(err) console.error(`Error deleting file ${message.filename}:`, err) + }); + request.pending--; + if (request.pending === 0) { + // finalize + requests.delete(message.requestId); + clearTimeout(request.errorTimeout); + + const registry = AggregatorRegistry.aggregate(request.responses); + const promString = registry.metrics(); + request.done(null, promString); + } + } + }); } }); } @@ -198,10 +212,28 @@ function addListeners() { if (message.type === GET_METRICS_REQ) { Promise.all(registries.map(r => r.getMetricsAsJSON())) .then(metrics => { - process.send({ - type: GET_METRICS_RES, - requestId: message.requestId, - metrics, + metrics.forEach((registry, i) => { + registry.forEach(value => { + const hash = hashObject(value); + const key = `${value.metricName}_${hash}`; + value["hash"] = key; + }); + }); + const filename = path.join(os.tmpdir(), `metrics-${process.pid}}.json`); + fs.writeFile(filename, JSON.stringify(metrics), (err) => { + if(err) { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + error: err.message, + }); + } else { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + filename, + }); + } }); }) .catch(error => { diff --git a/lib/metricAggregators.js b/lib/metricAggregators.js index c010d467..198d1691 100644 --- a/lib/metricAggregators.js +++ b/lib/metricAggregators.js @@ -1,6 +1,6 @@ 'use strict'; -const { Grouper, hashObject } = require('./util'); +const metricMap = new Map(); /** * Returns a new function that applies the `aggregatorFn` to the values. @@ -18,11 +18,22 @@ function AggregatorFactory(aggregatorFn) { aggregator: metrics[0].aggregator, }; // Gather metrics by metricName and labels. - const byLabels = new Grouper(); + if(!metricMap.get(metrics[0].name)) { + metricMap.set(metrics[0].name, new Map()); + } + let byLabels = metricMap.get(metrics[0].name); metrics.forEach(metric => { metric.values.forEach(value => { - const key = hashObject(value.labels); - byLabels.add(`${value.metricName}_${key}`, value); + let valuesArray = byLabels.get(value.hash); + if(!valuesArray) { + byLabels.set(value.hash, [value]); + } else { + if(valuesArray.length < metrics.workerSize) { + valuesArray.push(value); + } else { + byLabels.set(value.hash, [value]); + } + } }); }); // Apply aggregator function to gathered metrics. diff --git a/test/aggregatorsTest.js b/test/aggregatorsTest.js index 0010a656..67704db6 100644 --- a/test/aggregatorsTest.js +++ b/test/aggregatorsTest.js @@ -8,8 +8,8 @@ describe('aggregators', () => { name: 'metric_name', type: 'does not matter', values: [ - { labels: [], value: 1 }, - { labels: ['label1'], value: 2 }, + { labels: [], value: 1, hash: 'h1' }, + { labels: ['label1'], value: 2, hash: 'h2'}, ], }, { @@ -17,12 +17,14 @@ describe('aggregators', () => { name: 'metric_name', type: 'does not matter', values: [ - { labels: [], value: 3 }, - { labels: ['label1'], value: 4 }, + { labels: [], value: 3, hash: 'h1'}, + { labels: ['label1'], value: 4, hash: 'h2'}, ], }, ]; + metrics.workerSize = 2; + describe('sum', () => { it('properly sums values', () => { const result = aggregators.sum(metrics); @@ -102,21 +104,22 @@ describe('aggregators', () => { help: 'metric_help', name: 'metric_name', type: 'does not matter', - values: [{ labels: [], value: 1, metricName: 'abc' }], + values: [{ labels: [], value: 1, metricName: 'abc', hash: 'h1' }], }, { help: 'metric_help', name: 'metric_name', type: 'does not matter', - values: [{ labels: [], value: 3, metricName: 'abc' }], + values: [{ labels: [], value: 3, metricName: 'abc', hash: 'h1' }], }, { help: 'metric_help', name: 'metric_name', type: 'does not matter', - values: [{ labels: [], value: 5, metricName: 'def' }], + values: [{ labels: [], value: 5, metricName: 'def', hash: 'h2'}], }, ]; + metrics2.workerSize = 2; const result = aggregators.sum(metrics2); expect(result.values).toEqual([ { value: 4, labels: [], metricName: 'abc' }, diff --git a/test/clusterTest.js b/test/clusterTest.js index d1a58314..81217a38 100644 --- a/test/clusterTest.js +++ b/test/clusterTest.js @@ -3,6 +3,7 @@ const cluster = require('cluster'); const process = require('process'); const Registry = require('../lib/cluster'); +const { hash } = require('crypto'); describe.each([ ['Prometheus', Registry.PROMETHEUS_CONTENT_TYPE], @@ -61,11 +62,13 @@ describe.each([ labels: { le: 0.1, code: '300' }, value: 0, metricName: 'test_histogram_bucket', + hash: 'test_histogram_bucket{le="0.1",code="300"}', }, { labels: { le: 10, code: '300' }, value: 1.6486727018068046, metricName: 'test_histogram_bucket', + hash: 'test_histogram_bucket{le="10",code="300"}', }, ], aggregator: 'sum', @@ -75,9 +78,9 @@ describe.each([ name: 'test_gauge', type: 'gauge', values: [ - { value: 0.47, labels: { method: 'get', code: 200 } }, - { value: 0.64, labels: {} }, - { value: 23, labels: { method: 'post', code: '300' } }, + { value: 0.47, labels: { method: 'get', code: 200 }, hash: 'test_gauge{method="get",code="200"}' }, + { value: 0.64, labels: {}, hash: 'test_gauge{}' }, + { value: 23, labels: { method: 'post', code: '300' }, hash: 'test_gauge{method="post",code="300"}' }, ], aggregator: 'sum', }, @@ -85,14 +88,14 @@ describe.each([ help: 'Start time of the process since unix epoch in seconds.', name: 'process_start_time_seconds', type: 'gauge', - values: [{ value: 1502075832, labels: {} }], + values: [{ value: 1502075832, labels: {}, hash: 'process_start_time_seconds{}' }], aggregator: 'omit', }, { help: 'Lag of event loop in seconds.', name: 'nodejs_eventloop_lag_seconds', type: 'gauge', - values: [{ value: 0.009, labels: {} }], + values: [{ value: 0.009, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }], aggregator: 'average', }, { @@ -103,6 +106,7 @@ describe.each([ { value: 1, labels: { version: 'v6.11.1', major: 6, minor: 11, patch: 1 }, + hash: 'nodejs_version_info{version="v6.11.1",major="6",minor="11",patch="1"}', }, ], aggregator: 'first', @@ -118,11 +122,13 @@ describe.each([ labels: { le: 0.1, code: '300' }, value: 0.235151, metricName: 'test_histogram_bucket', + hash: 'test_histogram_bucket{le="0.1",code="300"}', }, { labels: { le: 10, code: '300' }, value: 1.192591, metricName: 'test_histogram_bucket', + hash: 'test_histogram_bucket{le="10",code="300"}', }, ], aggregator: 'sum', @@ -132,9 +138,9 @@ describe.each([ name: 'test_gauge', type: 'gauge', values: [ - { value: 0.02, labels: { method: 'get', code: 200 } }, - { value: 0.24, labels: {} }, - { value: 51, labels: { method: 'post', code: '300' } }, + { value: 0.02, labels: { method: 'get', code: 200 }, hash: 'test_gauge{method="get",code="200"}' }, + { value: 0.24, labels: {}, hash: 'test_gauge{}' }, + { value: 51, labels: { method: 'post', code: '300' }, hash: 'test_gauge{method="post",code="300"}' }, ], aggregator: 'sum', }, @@ -142,14 +148,14 @@ describe.each([ help: 'Start time of the process since unix epoch in seconds.', name: 'process_start_time_seconds', type: 'gauge', - values: [{ value: 1502075849, labels: {} }], + values: [{ value: 1502075849, labels: {}, hash: 'process_start_time_seconds{}' }], aggregator: 'omit', }, { help: 'Lag of event loop in seconds.', name: 'nodejs_eventloop_lag_seconds', type: 'gauge', - values: [{ value: 0.008, labels: {} }], + values: [{ value: 0.008, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }], aggregator: 'average', }, { @@ -160,6 +166,7 @@ describe.each([ { value: 1, labels: { version: 'v6.11.1', major: 6, minor: 11, patch: 1 }, + hash: 'nodejs_version_info{version="v6.11.1",major="6",minor="11",patch="1"}', }, ], aggregator: 'first', From cbe29f3c4229e253e82ac9163da1070791dae824 Mon Sep 17 00:00:00 2001 From: Saurabh Singh Date: Thu, 23 May 2024 14:44:24 +0530 Subject: [PATCH 2/5] added change log and handling concurrency --- CHANGELOG.md | 4 ++++ lib/cluster.js | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 96fcb5cb..5b6f62b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ project adheres to [Semantic Versioning](http://semver.org/). ### Breaking ### Changed + - Changes for cluster mode + - Removed `byLabels` Grouper in `metricAggregators.js` file and created a global Map to avoid Map creation on every request for the metrics + - Moved hashing of labels from master to worker to distribute the cpu bound hashing among workers + - Workers to write metrics in tmp file and send the file name to master to read metrics from rather than sending on IPC to keep IPC congestion free. (change in `cluster.js`) ### Added diff --git a/lib/cluster.js b/lib/cluster.js index 996aff1b..b0576ada 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -219,7 +219,8 @@ function addListeners() { value["hash"] = key; }); }); - const filename = path.join(os.tmpdir(), `metrics-${process.pid}}.json`); + // adding request id in file path to handle concurrency + const filename = path.join(os.tmpdir(), `metrics-${process.pid}-${message.requestId}.json`); fs.writeFile(filename, JSON.stringify(metrics), (err) => { if(err) { process.send({ From 78791149a76edb160ba73caf225dba8e9ebab5da Mon Sep 17 00:00:00 2001 From: Saurabh Singh Date: Fri, 24 May 2024 16:33:09 +0530 Subject: [PATCH 3/5] linting and prettier fixes --- CHANGELOG.md | 9 ++++---- lib/cluster.js | 22 +++++++++--------- lib/metricAggregators.js | 10 ++++----- test/aggregatorsTest.js | 8 +++---- test/clusterTest.js | 48 +++++++++++++++++++++++++++++++++------- 5 files changed, 66 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b6f62b9..739b7032 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,10 +10,11 @@ project adheres to [Semantic Versioning](http://semver.org/). ### Breaking ### Changed - - Changes for cluster mode - - Removed `byLabels` Grouper in `metricAggregators.js` file and created a global Map to avoid Map creation on every request for the metrics - - Moved hashing of labels from master to worker to distribute the cpu bound hashing among workers - - Workers to write metrics in tmp file and send the file name to master to read metrics from rather than sending on IPC to keep IPC congestion free. (change in `cluster.js`) + +- Changes for cluster mode +- Removed `byLabels` Grouper in `metricAggregators.js` file and created a global Map to avoid Map creation on every request for the metrics +- Moved hashing of labels from master to worker to distribute the cpu bound hashing among workers +- Workers to write metrics in tmp file and send the file name to master to read metrics from rather than sending on IPC to keep IPC congestion free. (change in `cluster.js`) ### Added diff --git a/lib/cluster.js b/lib/cluster.js index b0576ada..68923e6d 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -179,23 +179,22 @@ function addListeners() { request.done(new Error(message.error)); return; } - fs.readFile(message.filename, 'utf8', (err, data) => { - if(err) { + if (err) { request.done(err); return; } else { const metrics = JSON.parse(data); metrics.forEach(registry => request.responses.push(registry)); - fs.unlink(message.filename, (err) => { - if(err) console.error(`Error deleting file ${message.filename}:`, err) + fs.unlink(message.filename, e => { + if (e) + console.error(`Error deleting file ${message.filename}:`, e); }); request.pending--; if (request.pending === 0) { // finalize requests.delete(message.requestId); clearTimeout(request.errorTimeout); - const registry = AggregatorRegistry.aggregate(request.responses); const promString = registry.metrics(); request.done(null, promString); @@ -212,17 +211,20 @@ function addListeners() { if (message.type === GET_METRICS_REQ) { Promise.all(registries.map(r => r.getMetricsAsJSON())) .then(metrics => { - metrics.forEach((registry, i) => { + metrics.forEach(registry => { registry.forEach(value => { const hash = hashObject(value); const key = `${value.metricName}_${hash}`; - value["hash"] = key; + value.hash = key; }); }); // adding request id in file path to handle concurrency - const filename = path.join(os.tmpdir(), `metrics-${process.pid}-${message.requestId}.json`); - fs.writeFile(filename, JSON.stringify(metrics), (err) => { - if(err) { + const filename = path.join( + os.tmpdir(), + `metrics-${process.pid}-${message.requestId}.json`, + ); + fs.writeFile(filename, JSON.stringify(metrics), err => { + if (err) { process.send({ type: GET_METRICS_RES, requestId: message.requestId, diff --git a/lib/metricAggregators.js b/lib/metricAggregators.js index 198d1691..73ec77c2 100644 --- a/lib/metricAggregators.js +++ b/lib/metricAggregators.js @@ -18,17 +18,17 @@ function AggregatorFactory(aggregatorFn) { aggregator: metrics[0].aggregator, }; // Gather metrics by metricName and labels. - if(!metricMap.get(metrics[0].name)) { + if (!metricMap.get(metrics[0].name)) { metricMap.set(metrics[0].name, new Map()); } - let byLabels = metricMap.get(metrics[0].name); + const byLabels = metricMap.get(metrics[0].name); metrics.forEach(metric => { metric.values.forEach(value => { - let valuesArray = byLabels.get(value.hash); - if(!valuesArray) { + const valuesArray = byLabels.get(value.hash); + if (!valuesArray) { byLabels.set(value.hash, [value]); } else { - if(valuesArray.length < metrics.workerSize) { + if (valuesArray.length < metrics.workerSize) { valuesArray.push(value); } else { byLabels.set(value.hash, [value]); diff --git a/test/aggregatorsTest.js b/test/aggregatorsTest.js index 67704db6..85677af0 100644 --- a/test/aggregatorsTest.js +++ b/test/aggregatorsTest.js @@ -9,7 +9,7 @@ describe('aggregators', () => { type: 'does not matter', values: [ { labels: [], value: 1, hash: 'h1' }, - { labels: ['label1'], value: 2, hash: 'h2'}, + { labels: ['label1'], value: 2, hash: 'h2' }, ], }, { @@ -17,8 +17,8 @@ describe('aggregators', () => { name: 'metric_name', type: 'does not matter', values: [ - { labels: [], value: 3, hash: 'h1'}, - { labels: ['label1'], value: 4, hash: 'h2'}, + { labels: [], value: 3, hash: 'h1' }, + { labels: ['label1'], value: 4, hash: 'h2' }, ], }, ]; @@ -116,7 +116,7 @@ describe('aggregators', () => { help: 'metric_help', name: 'metric_name', type: 'does not matter', - values: [{ labels: [], value: 5, metricName: 'def', hash: 'h2'}], + values: [{ labels: [], value: 5, metricName: 'def', hash: 'h2' }], }, ]; metrics2.workerSize = 2; diff --git a/test/clusterTest.js b/test/clusterTest.js index 81217a38..8596264a 100644 --- a/test/clusterTest.js +++ b/test/clusterTest.js @@ -78,9 +78,17 @@ describe.each([ name: 'test_gauge', type: 'gauge', values: [ - { value: 0.47, labels: { method: 'get', code: 200 }, hash: 'test_gauge{method="get",code="200"}' }, + { + value: 0.47, + labels: { method: 'get', code: 200 }, + hash: 'test_gauge{method="get",code="200"}', + }, { value: 0.64, labels: {}, hash: 'test_gauge{}' }, - { value: 23, labels: { method: 'post', code: '300' }, hash: 'test_gauge{method="post",code="300"}' }, + { + value: 23, + labels: { method: 'post', code: '300' }, + hash: 'test_gauge{method="post",code="300"}', + }, ], aggregator: 'sum', }, @@ -88,14 +96,22 @@ describe.each([ help: 'Start time of the process since unix epoch in seconds.', name: 'process_start_time_seconds', type: 'gauge', - values: [{ value: 1502075832, labels: {}, hash: 'process_start_time_seconds{}' }], + values: [ + { + value: 1502075832, + labels: {}, + hash: 'process_start_time_seconds{}', + }, + ], aggregator: 'omit', }, { help: 'Lag of event loop in seconds.', name: 'nodejs_eventloop_lag_seconds', type: 'gauge', - values: [{ value: 0.009, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }], + values: [ + { value: 0.009, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }, + ], aggregator: 'average', }, { @@ -138,9 +154,17 @@ describe.each([ name: 'test_gauge', type: 'gauge', values: [ - { value: 0.02, labels: { method: 'get', code: 200 }, hash: 'test_gauge{method="get",code="200"}' }, + { + value: 0.02, + labels: { method: 'get', code: 200 }, + hash: 'test_gauge{method="get",code="200"}', + }, { value: 0.24, labels: {}, hash: 'test_gauge{}' }, - { value: 51, labels: { method: 'post', code: '300' }, hash: 'test_gauge{method="post",code="300"}' }, + { + value: 51, + labels: { method: 'post', code: '300' }, + hash: 'test_gauge{method="post",code="300"}', + }, ], aggregator: 'sum', }, @@ -148,14 +172,22 @@ describe.each([ help: 'Start time of the process since unix epoch in seconds.', name: 'process_start_time_seconds', type: 'gauge', - values: [{ value: 1502075849, labels: {}, hash: 'process_start_time_seconds{}' }], + values: [ + { + value: 1502075849, + labels: {}, + hash: 'process_start_time_seconds{}', + }, + ], aggregator: 'omit', }, { help: 'Lag of event loop in seconds.', name: 'nodejs_eventloop_lag_seconds', type: 'gauge', - values: [{ value: 0.008, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }], + values: [ + { value: 0.008, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }, + ], aggregator: 'average', }, { From 49da0d97969cf8a048d6821319cc6a014fb38dab Mon Sep 17 00:00:00 2001 From: Saurabh Singh Date: Sat, 1 Jun 2024 02:39:10 +0530 Subject: [PATCH 4/5] making values array empty in each scrape --- lib/cluster.js | 1 - lib/metricAggregators.js | 7 ++----- test/aggregatorsTest.js | 3 --- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/lib/cluster.js b/lib/cluster.js index 68923e6d..299e1eae 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -120,7 +120,6 @@ class AggregatorRegistry extends Registry { // Aggregate gathered metrics. metricsByName.forEach(metrics => { - metrics.workerSize = metricsArr.length; const aggregatorName = metrics[0].aggregator; const aggregatorFn = aggregators[aggregatorName]; if (typeof aggregatorFn !== 'function') { diff --git a/lib/metricAggregators.js b/lib/metricAggregators.js index 73ec77c2..5f43b554 100644 --- a/lib/metricAggregators.js +++ b/lib/metricAggregators.js @@ -28,11 +28,7 @@ function AggregatorFactory(aggregatorFn) { if (!valuesArray) { byLabels.set(value.hash, [value]); } else { - if (valuesArray.length < metrics.workerSize) { - valuesArray.push(value); - } else { - byLabels.set(value.hash, [value]); - } + valuesArray.push(value); } }); }); @@ -48,6 +44,7 @@ function AggregatorFactory(aggregatorFn) { } // NB: Timestamps are omitted. result.values.push(valObj); + values.length = 0; }); return result; }; diff --git a/test/aggregatorsTest.js b/test/aggregatorsTest.js index 85677af0..f0570fb1 100644 --- a/test/aggregatorsTest.js +++ b/test/aggregatorsTest.js @@ -23,8 +23,6 @@ describe('aggregators', () => { }, ]; - metrics.workerSize = 2; - describe('sum', () => { it('properly sums values', () => { const result = aggregators.sum(metrics); @@ -119,7 +117,6 @@ describe('aggregators', () => { values: [{ labels: [], value: 5, metricName: 'def', hash: 'h2' }], }, ]; - metrics2.workerSize = 2; const result = aggregators.sum(metrics2); expect(result.values).toEqual([ { value: 4, labels: [], metricName: 'abc' }, From dbb81ae6e82d8e2fe3951d2bbceb70d3ce9a7545 Mon Sep 17 00:00:00 2001 From: Saurabh Singh Date: Sat, 24 May 2025 00:43:28 +0530 Subject: [PATCH 5/5] changes to make file based metric communications configurable --- CHANGELOG.md | 6 +- example/cluster-2.js | 36 ++++++ example/cluster.js | 3 + index.d.ts | 20 ++++ lib/cluster.js | 142 +++++++++++++++-------- lib/metricAggregators.js | 7 +- test/clusterTest.js | 241 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 403 insertions(+), 52 deletions(-) create mode 100644 example/cluster-2.js diff --git a/CHANGELOG.md b/CHANGELOG.md index 739b7032..cc92449b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,8 +13,10 @@ project adheres to [Semantic Versioning](http://semver.org/). - Changes for cluster mode - Removed `byLabels` Grouper in `metricAggregators.js` file and created a global Map to avoid Map creation on every request for the metrics -- Moved hashing of labels from master to worker to distribute the cpu bound hashing among workers -- Workers to write metrics in tmp file and send the file name to master to read metrics from rather than sending on IPC to keep IPC congestion free. (change in `cluster.js`) +- Made an options field `performanceOptimizedVarient` to create AggregatorRegistry as an optional param. +- If the `performanceOptimizedVarient` is set to true the hashing of labels is done by the workers to distribute the cpu bound hashing among workers. (change in `cluster.js`) +- If the `performanceOptimizedVarient` is set to true the workers will write metrics in a file in /tmp dir and send the file name to master to read metrics rather than sending the complete metrics on IPC to keep IPC congestion free. (change in `cluster.js`) +- Added a new example in `cluster-2.js` for creating the AggregatorRegistry using the performance configurations ### Added diff --git a/example/cluster-2.js b/example/cluster-2.js new file mode 100644 index 00000000..d9a3be25 --- /dev/null +++ b/example/cluster-2.js @@ -0,0 +1,36 @@ +'use strict'; + +const cluster = require('cluster'); +const express = require('express'); +const metricsServer = express(); +const AggregatorRegistry = require('../').AggregatorRegistry; + +// Create performance optimized AggregatorRegistry +// Uses file based communication for metrics flow from workers to master and requires access to /tmp folder +const aggregatorRegistry = new AggregatorRegistry(undefined, { + performanceOptimizedVarient: true, +}); + +if (cluster.isMaster) { + for (let i = 0; i < 4; i++) { + cluster.fork(); + } + + metricsServer.get('/cluster_metrics', async (req, res) => { + try { + const metrics = await aggregatorRegistry.clusterMetrics(); + res.set('Content-Type', aggregatorRegistry.contentType); + res.send(metrics); + } catch (ex) { + res.statusCode = 500; + res.send(ex.message); + } + }); + + metricsServer.listen(3001); + console.log( + 'Cluster metrics server listening to 3001, metrics exposed on /cluster_metrics', + ); +} else { + require('./server.js'); +} diff --git a/example/cluster.js b/example/cluster.js index b91e1ceb..0c40068b 100644 --- a/example/cluster.js +++ b/example/cluster.js @@ -4,6 +4,9 @@ const cluster = require('cluster'); const express = require('express'); const metricsServer = express(); const AggregatorRegistry = require('../').AggregatorRegistry; + +// Create default AggregatorRegistry +// Uses IPC communication for metrics flow from workers to master for aggregation const aggregatorRegistry = new AggregatorRegistry(); if (cluster.isMaster) { diff --git a/index.d.ts b/index.d.ts index 1fd1eac9..67531173 100644 --- a/index.d.ts +++ b/index.d.ts @@ -131,9 +131,27 @@ export const prometheusContentType: PrometheusContentType; */ export const openMetricsContentType: OpenMetricsContentType; +export interface ClusterOptions { + /** + * Enable performance optimizations + * When enabled: + * - Workers use file based communication instead of IPC for transferring metrics to master + * - Workers perform metrics hashing instead of the master process + * @default false + */ + enablePerformanceOptimizedVarient?: boolean; +} + export class AggregatorRegistry< T extends RegistryContentType, > extends Registry { + /** + * Create a new AggregatorRegistry + * @param reContentType Content Type of the registry + * @param options Performance optimization options for cluster mode + */ + constructor(regContentType?: T, options?: ClusterOptions); + /** * Gets aggregated metrics for all workers. * @return {Promise} Promise that resolves with the aggregated @@ -159,6 +177,7 @@ export class AggregatorRegistry< * use a registry/registries other than the default global registry. * @param {Array|Registry} regs Registry or registries to be * aggregated. + * @param {ClusterOptions} options Performance optimization options for cluster mode * @return {void} */ static setRegistries( @@ -168,6 +187,7 @@ export class AggregatorRegistry< > | Registry | Registry, + options?: ClusterOptions, ): void; } diff --git a/lib/cluster.js b/lib/cluster.js index 299e1eae..786d3011 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -30,10 +30,17 @@ let requestCtr = 0; // Concurrency control let listenersAdded = false; const requests = new Map(); // Pending requests for workers' local metrics. +let enablePerformanceOptimizedVarient = false; + class AggregatorRegistry extends Registry { - constructor(regContentType = Registry.PROMETHEUS_CONTENT_TYPE) { + constructor(regContentType = Registry.PROMETHEUS_CONTENT_TYPE, options = {}) { super(regContentType); addListeners(); + + if (options.enablePerformanceOptimizedVarient !== undefined) { + enablePerformanceOptimizedVarient = + options.enablePerformanceOptimizedVarient; + } } /** @@ -146,9 +153,11 @@ class AggregatorRegistry extends Registry { * use a registry/registries other than the default global registry. * @param {Array|Registry} regs Registry or registries to be * aggregated. + * @param {Object} options Configuration options for cluster optimizations + * @param {boolean} options.enablePerformanceOptimizedVarient Enable performance optimizations * @return {void} */ - static setRegistries(regs) { + static setRegistries(regs, options = {}) { if (!Array.isArray(regs)) regs = [regs]; regs.forEach(reg => { if (!(reg instanceof Registry)) { @@ -156,6 +165,11 @@ class AggregatorRegistry extends Registry { } }); registries = regs; + + if (options.enablePerformanceOptimizedVarient !== undefined) { + enablePerformanceOptimizedVarient = + options.enablePerformanceOptimizedVarient; + } } } @@ -178,28 +192,48 @@ function addListeners() { request.done(new Error(message.error)); return; } - fs.readFile(message.filename, 'utf8', (err, data) => { - if (err) { - request.done(err); - return; - } else { - const metrics = JSON.parse(data); - metrics.forEach(registry => request.responses.push(registry)); - fs.unlink(message.filename, e => { - if (e) - console.error(`Error deleting file ${message.filename}:`, e); - }); - request.pending--; - if (request.pending === 0) { - // finalize - requests.delete(message.requestId); - clearTimeout(request.errorTimeout); - const registry = AggregatorRegistry.aggregate(request.responses); - const promString = registry.metrics(); - request.done(null, promString); + + if (enablePerformanceOptimizedVarient && message.filename) { + // File based metrics communication + fs.readFile(message.filename, 'utf8', (err, data) => { + if (err) { + request.done(err); + return; + } else { + const metrics = JSON.parse(data); + metrics.forEach(registry => request.responses.push(registry)); + fs.unlink(message.filename, e => { + if (e) + console.error(`Error deleting file ${message.filename}:`, e); + }); + request.pending--; + if (request.pending === 0) { + // finalize + requests.delete(message.requestId); + clearTimeout(request.errorTimeout); + const registry = AggregatorRegistry.aggregate( + request.responses, + ); + const promString = registry.metrics(); + request.done(null, promString); + } } + }); + } else { + //Original behaviour with IPC communication + message.metrics.forEach(registry => request.responses.push(registry)); + request.pending--; + + if (request.pending === 0) { + // finalize + requests.delete(message.requestId); + clearTimeout(request.errorTimeout); + + const registry = AggregatorRegistry.aggregate(request.responses); + const promString = registry.metrics(); + request.done(null, promString); } - }); + } } }); } @@ -210,33 +244,45 @@ function addListeners() { if (message.type === GET_METRICS_REQ) { Promise.all(registries.map(r => r.getMetricsAsJSON())) .then(metrics => { - metrics.forEach(registry => { - registry.forEach(value => { - const hash = hashObject(value); - const key = `${value.metricName}_${hash}`; - value.hash = key; - }); - }); - // adding request id in file path to handle concurrency - const filename = path.join( - os.tmpdir(), - `metrics-${process.pid}-${message.requestId}.json`, - ); - fs.writeFile(filename, JSON.stringify(metrics), err => { - if (err) { - process.send({ - type: GET_METRICS_RES, - requestId: message.requestId, - error: err.message, - }); - } else { - process.send({ - type: GET_METRICS_RES, - requestId: message.requestId, - filename, + if (enablePerformanceOptimizedVarient) { + metrics.forEach(registryMetrics => { + registryMetrics.forEach(metric => { + metric.values.forEach(value => { + const hash = hashObject(value); + const key = `${value.metricName}_${hash}`; + value.hash = key; + }); }); - } - }); + }); + // adding request id in file path to handle concurrency + const filename = path.join( + os.tmpdir(), + `metrics-${process.pid}-${message.requestId}.json`, + ); + + fs.writeFile(filename, JSON.stringify(metrics), err => { + if (err) { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + error: err.message, + }); + } else { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + filename, + }); + } + }); + } else { + // Original behaviour with IPC communication + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + metrics, + }); + } }) .catch(error => { process.send({ diff --git a/lib/metricAggregators.js b/lib/metricAggregators.js index 5f43b554..7aaeb423 100644 --- a/lib/metricAggregators.js +++ b/lib/metricAggregators.js @@ -1,5 +1,6 @@ 'use strict'; +const { hashObject } = require('./util'); const metricMap = new Map(); /** @@ -24,9 +25,11 @@ function AggregatorFactory(aggregatorFn) { const byLabels = metricMap.get(metrics[0].name); metrics.forEach(metric => { metric.values.forEach(value => { - const valuesArray = byLabels.get(value.hash); + const hash = + value.hash || `${value.metricName}_${hashObject(value.labels)}`; + const valuesArray = byLabels.get(hash); if (!valuesArray) { - byLabels.set(value.hash, [value]); + byLabels.set(hash, [value]); } else { valuesArray.push(value); } diff --git a/test/clusterTest.js b/test/clusterTest.js index 8596264a..73b97529 100644 --- a/test/clusterTest.js +++ b/test/clusterTest.js @@ -44,12 +44,253 @@ describe.each([ describe('aggregatorRegistry.clusterMetrics()', () => { it('works properly if there are no cluster workers', async () => { const AggregatorRegistry = require('../lib/cluster'); + // performanceOptimizedVarient not set const ar = new AggregatorRegistry(regType); const metrics = await ar.clusterMetrics(); expect(metrics).toEqual(''); }); }); + describe('AggregatorRegistry.aggregate()', () => { + // These mimic the output of `getMetricsAsJSON`. + const metricsArr1 = [ + { + name: 'test_histogram', + help: 'Example of a histogram', + type: 'histogram', + values: [ + { + labels: { le: 0.1, code: '300' }, + value: 0, + metricName: 'test_histogram_bucket', + }, + { + labels: { le: 10, code: '300' }, + value: 1.6486727018068046, + metricName: 'test_histogram_bucket', + }, + ], + aggregator: 'sum', + }, + { + help: 'Example of a gauge', + name: 'test_gauge', + type: 'gauge', + values: [ + { + value: 0.47, + labels: { method: 'get', code: 200 }, + }, + { value: 0.64, labels: {} }, + { + value: 23, + labels: { method: 'post', code: '300' }, + }, + ], + aggregator: 'sum', + }, + { + help: 'Start time of the process since unix epoch in seconds.', + name: 'process_start_time_seconds', + type: 'gauge', + values: [ + { + value: 1502075832, + labels: {}, + }, + ], + aggregator: 'omit', + }, + { + help: 'Lag of event loop in seconds.', + name: 'nodejs_eventloop_lag_seconds', + type: 'gauge', + values: [{ value: 0.009, labels: {} }], + aggregator: 'average', + }, + { + help: 'Node.js version info.', + name: 'nodejs_version_info', + type: 'gauge', + values: [ + { + value: 1, + labels: { version: 'v6.11.1', major: 6, minor: 11, patch: 1 }, + }, + ], + aggregator: 'first', + }, + ]; + const metricsArr2 = [ + { + name: 'test_histogram', + help: 'Example of a histogram', + type: 'histogram', + values: [ + { + labels: { le: 0.1, code: '300' }, + value: 0.235151, + metricName: 'test_histogram_bucket', + }, + { + labels: { le: 10, code: '300' }, + value: 1.192591, + metricName: 'test_histogram_bucket', + }, + ], + aggregator: 'sum', + }, + { + help: 'Example of a gauge', + name: 'test_gauge', + type: 'gauge', + values: [ + { + value: 0.02, + labels: { method: 'get', code: 200 }, + }, + { value: 0.24, labels: {} }, + { + value: 51, + labels: { method: 'post', code: '300' }, + }, + ], + aggregator: 'sum', + }, + { + help: 'Start time of the process since unix epoch in seconds.', + name: 'process_start_time_seconds', + type: 'gauge', + values: [ + { + value: 1502075849, + labels: {}, + }, + ], + aggregator: 'omit', + }, + { + help: 'Lag of event loop in seconds.', + name: 'nodejs_eventloop_lag_seconds', + type: 'gauge', + values: [{ value: 0.008, labels: {} }], + aggregator: 'average', + }, + { + help: 'Node.js version info.', + name: 'nodejs_version_info', + type: 'gauge', + values: [ + { + value: 1, + labels: { version: 'v6.11.1', major: 6, minor: 11, patch: 1 }, + }, + ], + aggregator: 'first', + }, + ]; + + const aggregated = Registry.aggregate([metricsArr1, metricsArr2], regType); + + it('defaults to summation, preserves histogram bins', async () => { + const histogram = aggregated.getSingleMetric('test_histogram').get(); + expect(histogram).toEqual({ + name: 'test_histogram', + help: 'Example of a histogram', + type: 'histogram', + values: [ + { + labels: { le: 0.1, code: '300' }, + value: 0.235151, + metricName: 'test_histogram_bucket', + }, + { + labels: { le: 10, code: '300' }, + value: 2.8412637018068043, + metricName: 'test_histogram_bucket', + }, + ], + aggregator: 'sum', + }); + }); + + it('defaults to summation, works for gauges', () => { + const gauge = aggregated.getSingleMetric('test_gauge').get(); + expect(gauge).toEqual({ + help: 'Example of a gauge', + name: 'test_gauge', + type: 'gauge', + values: [ + { value: 0.49, labels: { method: 'get', code: 200 } }, + { value: 0.88, labels: {} }, + { value: 74, labels: { method: 'post', code: '300' } }, + ], + aggregator: 'sum', + }); + }); + + it('uses `aggregate` method defined for process_start_time', () => { + const procStartTime = aggregated.getSingleMetric( + 'process_start_time_seconds', + ); + expect(procStartTime).toBeUndefined(); + }); + + it('uses `aggregate` method defined for nodejs_eventloop_lag_seconds', () => { + const ell = aggregated + .getSingleMetric('nodejs_eventloop_lag_seconds') + .get(); + expect(ell).toEqual({ + help: 'Lag of event loop in seconds.', + name: 'nodejs_eventloop_lag_seconds', + type: 'gauge', + values: [{ value: 0.0085, labels: {} }], + aggregator: 'average', + }); + }); + + it('uses `aggregate` method defined for nodejs_evnetloop_lag_seconds', () => { + const ell = aggregated + .getSingleMetric('nodejs_eventloop_lag_seconds') + .get(); + expect(ell).toEqual({ + help: 'Lag of event loop in seconds.', + name: 'nodejs_eventloop_lag_seconds', + type: 'gauge', + values: [{ value: 0.0085, labels: {} }], + aggregator: 'average', + }); + }); + + it('uses `aggregate` method defined for nodejs_version_info', () => { + const version = aggregated.getSingleMetric('nodejs_version_info').get(); + expect(version).toEqual({ + help: 'Node.js version info.', + name: 'nodejs_version_info', + type: 'gauge', + values: [ + { + value: 1, + labels: { version: 'v6.11.1', major: 6, minor: 11, patch: 1 }, + }, + ], + aggregator: 'first', + }); + }); + }); + + describe('aggregatorRegistry.clusterMetrics()', () => { + it('works properly if there are no cluster workers', async () => { + const AggregatorRegistry = require('../lib/cluster'); + // enablePerformanceOptimizedVarient set to true + const ar = new AggregatorRegistry(regType, { + performanceOptimizedVarient: true, + }); + const metrics = await ar.clusterMetrics(); + expect(metrics).toEqual(''); + }); + }); + describe('AggregatorRegistry.aggregate()', () => { // These mimic the output of `getMetricsAsJSON`. const metricsArr1 = [