diff --git a/CHANGELOG.md b/CHANGELOG.md index 96fcb5cb..cc92449b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,13 @@ project adheres to [Semantic Versioning](http://semver.org/). ### Changed +- Changes for cluster mode +- Removed the per-call `byLabels` Grouper in `metricAggregators.js` and replaced it with a module-level Map that is reused across requests, to avoid creating a new Map on every metrics request +- Added an optional `options` parameter to the `AggregatorRegistry` constructor with an `enablePerformanceOptimizedVarient` field. +- If `enablePerformanceOptimizedVarient` is set to true, label hashing is done by the workers so that the CPU-bound hashing is distributed among them. (change in `cluster.js`) +- If `enablePerformanceOptimizedVarient` is set to true, each worker writes its metrics to a file in the OS temp directory (e.g. `/tmp`) and sends only the file name to the master, which reads the metrics from that file; this avoids sending the complete metrics over IPC and keeps the IPC channel free of congestion. (change in `cluster.js`) +- Added a new example, `cluster-2.js`, that creates the AggregatorRegistry with the performance options + ### Added [unreleased]: https://github.com/siimon/prom-client/compare/v15.1.2...HEAD diff --git a/example/cluster-2.js b/example/cluster-2.js new file mode 100644 index 00000000..d9a3be25 --- /dev/null +++ b/example/cluster-2.js @@ -0,0 +1,36 @@ +'use strict'; + +const cluster = require('cluster'); +const express = require('express'); +const metricsServer = express(); +const AggregatorRegistry = require('../').AggregatorRegistry; + +// Create performance optimized AggregatorRegistry +// Uses file based communication for metrics flow from workers to master and requires access to /tmp folder +const aggregatorRegistry = new AggregatorRegistry(undefined, { + performanceOptimizedVarient: true, +}); + +if (cluster.isMaster) { + for (let i = 0; i < 4; i++) { + cluster.fork(); + } + + metricsServer.get('/cluster_metrics', async (req, res) => { + try { + const metrics = await aggregatorRegistry.clusterMetrics(); + res.set('Content-Type',
aggregatorRegistry.contentType); + res.send(metrics); + } catch (ex) { + res.statusCode = 500; + res.send(ex.message); + } + }); + + metricsServer.listen(3001); + console.log( + 'Cluster metrics server listening to 3001, metrics exposed on /cluster_metrics', + ); +} else { + require('./server.js'); +} diff --git a/example/cluster.js b/example/cluster.js index b91e1ceb..0c40068b 100644 --- a/example/cluster.js +++ b/example/cluster.js @@ -4,6 +4,9 @@ const cluster = require('cluster'); const express = require('express'); const metricsServer = express(); const AggregatorRegistry = require('../').AggregatorRegistry; + +// Create default AggregatorRegistry +// Uses IPC communication for metrics flow from workers to master for aggregation const aggregatorRegistry = new AggregatorRegistry(); if (cluster.isMaster) { diff --git a/index.d.ts b/index.d.ts index 1fd1eac9..67531173 100644 --- a/index.d.ts +++ b/index.d.ts @@ -131,9 +131,27 @@ export const prometheusContentType: PrometheusContentType; */ export const openMetricsContentType: OpenMetricsContentType; +export interface ClusterOptions { + /** + * Enable performance optimizations + * When enabled: + * - Workers use file based communication instead of IPC for transferring metrics to master + * - Workers perform metrics hashing instead of the master process + * @default false + */ + enablePerformanceOptimizedVarient?: boolean; +} + export class AggregatorRegistry< T extends RegistryContentType, > extends Registry { + /** + * Create a new AggregatorRegistry + * @param reContentType Content Type of the registry + * @param options Performance optimization options for cluster mode + */ + constructor(regContentType?: T, options?: ClusterOptions); + /** * Gets aggregated metrics for all workers. * @return {Promise} Promise that resolves with the aggregated @@ -159,6 +177,7 @@ export class AggregatorRegistry< * use a registry/registries other than the default global registry. 
* @param {Array|Registry} regs Registry or registries to be * aggregated. + * @param {ClusterOptions} options Performance optimization options for cluster mode * @return {void} */ static setRegistries( @@ -168,6 +187,7 @@ export class AggregatorRegistry< > | Registry | Registry, + options?: ClusterOptions, ): void; } diff --git a/lib/cluster.js b/lib/cluster.js index 5cb707ed..786d3011 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -9,8 +9,11 @@ */ const Registry = require('./registry'); -const { Grouper } = require('./util'); +const { Grouper, hashObject } = require('./util'); const { aggregators } = require('./metricAggregators'); +const fs = require('fs'); +const path = require('path'); +const os = require('os'); // We need to lazy-load the 'cluster' module as some application servers - // namely Passenger - crash when it is imported. let cluster = () => { @@ -27,10 +30,17 @@ let requestCtr = 0; // Concurrency control let listenersAdded = false; const requests = new Map(); // Pending requests for workers' local metrics. +let enablePerformanceOptimizedVarient = false; + class AggregatorRegistry extends Registry { - constructor(regContentType = Registry.PROMETHEUS_CONTENT_TYPE) { + constructor(regContentType = Registry.PROMETHEUS_CONTENT_TYPE, options = {}) { super(regContentType); addListeners(); + + if (options.enablePerformanceOptimizedVarient !== undefined) { + enablePerformanceOptimizedVarient = + options.enablePerformanceOptimizedVarient; + } } /** @@ -143,9 +153,11 @@ class AggregatorRegistry extends Registry { * use a registry/registries other than the default global registry. * @param {Array|Registry} regs Registry or registries to be * aggregated. 
+ * @param {Object} options Configuration options for cluster optimizations + * @param {boolean} options.enablePerformanceOptimizedVarient Enable performance optimizations * @return {void} */ - static setRegistries(regs) { + static setRegistries(regs, options = {}) { if (!Array.isArray(regs)) regs = [regs]; regs.forEach(reg => { if (!(reg instanceof Registry)) { @@ -153,6 +165,11 @@ class AggregatorRegistry extends Registry { } }); registries = regs; + + if (options.enablePerformanceOptimizedVarient !== undefined) { + enablePerformanceOptimizedVarient = + options.enablePerformanceOptimizedVarient; + } } } @@ -176,17 +193,46 @@ function addListeners() { return; } - message.metrics.forEach(registry => request.responses.push(registry)); - request.pending--; + if (enablePerformanceOptimizedVarient && message.filename) { + // File based metrics communication + fs.readFile(message.filename, 'utf8', (err, data) => { + if (err) { + request.done(err); + return; + } else { + const metrics = JSON.parse(data); + metrics.forEach(registry => request.responses.push(registry)); + fs.unlink(message.filename, e => { + if (e) + console.error(`Error deleting file ${message.filename}:`, e); + }); + request.pending--; + if (request.pending === 0) { + // finalize + requests.delete(message.requestId); + clearTimeout(request.errorTimeout); + const registry = AggregatorRegistry.aggregate( + request.responses, + ); + const promString = registry.metrics(); + request.done(null, promString); + } + } + }); + } else { + //Original behaviour with IPC communication + message.metrics.forEach(registry => request.responses.push(registry)); + request.pending--; - if (request.pending === 0) { - // finalize - requests.delete(message.requestId); - clearTimeout(request.errorTimeout); + if (request.pending === 0) { + // finalize + requests.delete(message.requestId); + clearTimeout(request.errorTimeout); - const registry = AggregatorRegistry.aggregate(request.responses); - const promString = 
registry.metrics(); - request.done(null, promString); + const registry = AggregatorRegistry.aggregate(request.responses); + const promString = registry.metrics(); + request.done(null, promString); + } } } }); @@ -198,11 +244,45 @@ function addListeners() { if (message.type === GET_METRICS_REQ) { Promise.all(registries.map(r => r.getMetricsAsJSON())) .then(metrics => { - process.send({ - type: GET_METRICS_RES, - requestId: message.requestId, - metrics, - }); + if (enablePerformanceOptimizedVarient) { + metrics.forEach(registryMetrics => { + registryMetrics.forEach(metric => { + metric.values.forEach(value => { + const hash = hashObject(value); + const key = `${value.metricName}_${hash}`; + value.hash = key; + }); + }); + }); + // adding request id in file path to handle concurrency + const filename = path.join( + os.tmpdir(), + `metrics-${process.pid}-${message.requestId}.json`, + ); + + fs.writeFile(filename, JSON.stringify(metrics), err => { + if (err) { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + error: err.message, + }); + } else { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + filename, + }); + } + }); + } else { + // Original behaviour with IPC communication + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + metrics, + }); + } }) .catch(error => { process.send({ diff --git a/lib/metricAggregators.js b/lib/metricAggregators.js index c010d467..7aaeb423 100644 --- a/lib/metricAggregators.js +++ b/lib/metricAggregators.js @@ -1,6 +1,7 @@ 'use strict'; -const { Grouper, hashObject } = require('./util'); +const { hashObject } = require('./util'); +const metricMap = new Map(); /** * Returns a new function that applies the `aggregatorFn` to the values. @@ -18,11 +19,20 @@ function AggregatorFactory(aggregatorFn) { aggregator: metrics[0].aggregator, }; // Gather metrics by metricName and labels. 
- const byLabels = new Grouper(); + if (!metricMap.get(metrics[0].name)) { + metricMap.set(metrics[0].name, new Map()); + } + const byLabels = metricMap.get(metrics[0].name); metrics.forEach(metric => { metric.values.forEach(value => { - const key = hashObject(value.labels); - byLabels.add(`${value.metricName}_${key}`, value); + const hash = + value.hash || `${value.metricName}_${hashObject(value.labels)}`; + const valuesArray = byLabels.get(hash); + if (!valuesArray) { + byLabels.set(hash, [value]); + } else { + valuesArray.push(value); + } }); }); // Apply aggregator function to gathered metrics. @@ -37,6 +47,7 @@ function AggregatorFactory(aggregatorFn) { } // NB: Timestamps are omitted. result.values.push(valObj); + values.length = 0; }); return result; }; diff --git a/test/aggregatorsTest.js b/test/aggregatorsTest.js index 0010a656..f0570fb1 100644 --- a/test/aggregatorsTest.js +++ b/test/aggregatorsTest.js @@ -8,8 +8,8 @@ describe('aggregators', () => { name: 'metric_name', type: 'does not matter', values: [ - { labels: [], value: 1 }, - { labels: ['label1'], value: 2 }, + { labels: [], value: 1, hash: 'h1' }, + { labels: ['label1'], value: 2, hash: 'h2' }, ], }, { @@ -17,8 +17,8 @@ describe('aggregators', () => { name: 'metric_name', type: 'does not matter', values: [ - { labels: [], value: 3 }, - { labels: ['label1'], value: 4 }, + { labels: [], value: 3, hash: 'h1' }, + { labels: ['label1'], value: 4, hash: 'h2' }, ], }, ]; @@ -102,19 +102,19 @@ describe('aggregators', () => { help: 'metric_help', name: 'metric_name', type: 'does not matter', - values: [{ labels: [], value: 1, metricName: 'abc' }], + values: [{ labels: [], value: 1, metricName: 'abc', hash: 'h1' }], }, { help: 'metric_help', name: 'metric_name', type: 'does not matter', - values: [{ labels: [], value: 3, metricName: 'abc' }], + values: [{ labels: [], value: 3, metricName: 'abc', hash: 'h1' }], }, { help: 'metric_help', name: 'metric_name', type: 'does not matter', - values: [{ labels: [], 
value: 5, metricName: 'def' }], + values: [{ labels: [], value: 5, metricName: 'def', hash: 'h2' }], }, ]; const result = aggregators.sum(metrics2); diff --git a/test/clusterTest.js b/test/clusterTest.js index d1a58314..73b97529 100644 --- a/test/clusterTest.js +++ b/test/clusterTest.js @@ -3,6 +3,7 @@ const cluster = require('cluster'); const process = require('process'); const Registry = require('../lib/cluster'); +const { hash } = require('crypto'); describe.each([ ['Prometheus', Registry.PROMETHEUS_CONTENT_TYPE], @@ -43,6 +44,7 @@ describe.each([ describe('aggregatorRegistry.clusterMetrics()', () => { it('works properly if there are no cluster workers', async () => { const AggregatorRegistry = require('../lib/cluster'); + // performanceOptimizedVarient not set const ar = new AggregatorRegistry(regType); const metrics = await ar.clusterMetrics(); expect(metrics).toEqual(''); @@ -75,9 +77,15 @@ describe.each([ name: 'test_gauge', type: 'gauge', values: [ - { value: 0.47, labels: { method: 'get', code: 200 } }, + { + value: 0.47, + labels: { method: 'get', code: 200 }, + }, { value: 0.64, labels: {} }, - { value: 23, labels: { method: 'post', code: '300' } }, + { + value: 23, + labels: { method: 'post', code: '300' }, + }, ], aggregator: 'sum', }, @@ -85,7 +93,12 @@ describe.each([ help: 'Start time of the process since unix epoch in seconds.', name: 'process_start_time_seconds', type: 'gauge', - values: [{ value: 1502075832, labels: {} }], + values: [ + { + value: 1502075832, + labels: {}, + }, + ], aggregator: 'omit', }, { @@ -132,9 +145,15 @@ describe.each([ name: 'test_gauge', type: 'gauge', values: [ - { value: 0.02, labels: { method: 'get', code: 200 } }, + { + value: 0.02, + labels: { method: 'get', code: 200 }, + }, { value: 0.24, labels: {} }, - { value: 51, labels: { method: 'post', code: '300' } }, + { + value: 51, + labels: { method: 'post', code: '300' }, + }, ], aggregator: 'sum', }, @@ -142,7 +161,12 @@ describe.each([ help: 'Start time of the 
process since unix epoch in seconds.', name: 'process_start_time_seconds', type: 'gauge', - values: [{ value: 1502075849, labels: {} }], + values: [ + { + value: 1502075849, + labels: {}, + }, + ], aggregator: 'omit', }, { @@ -254,4 +278,260 @@ describe.each([ }); }); }); + + describe('aggregatorRegistry.clusterMetrics()', () => { + it('works properly if there are no cluster workers', async () => { + const AggregatorRegistry = require('../lib/cluster'); + // enablePerformanceOptimizedVarient set to true + const ar = new AggregatorRegistry(regType, { + performanceOptimizedVarient: true, + }); + const metrics = await ar.clusterMetrics(); + expect(metrics).toEqual(''); + }); + }); + + describe('AggregatorRegistry.aggregate()', () => { + // These mimic the output of `getMetricsAsJSON`. + const metricsArr1 = [ + { + name: 'test_histogram', + help: 'Example of a histogram', + type: 'histogram', + values: [ + { + labels: { le: 0.1, code: '300' }, + value: 0, + metricName: 'test_histogram_bucket', + hash: 'test_histogram_bucket{le="0.1",code="300"}', + }, + { + labels: { le: 10, code: '300' }, + value: 1.6486727018068046, + metricName: 'test_histogram_bucket', + hash: 'test_histogram_bucket{le="10",code="300"}', + }, + ], + aggregator: 'sum', + }, + { + help: 'Example of a gauge', + name: 'test_gauge', + type: 'gauge', + values: [ + { + value: 0.47, + labels: { method: 'get', code: 200 }, + hash: 'test_gauge{method="get",code="200"}', + }, + { value: 0.64, labels: {}, hash: 'test_gauge{}' }, + { + value: 23, + labels: { method: 'post', code: '300' }, + hash: 'test_gauge{method="post",code="300"}', + }, + ], + aggregator: 'sum', + }, + { + help: 'Start time of the process since unix epoch in seconds.', + name: 'process_start_time_seconds', + type: 'gauge', + values: [ + { + value: 1502075832, + labels: {}, + hash: 'process_start_time_seconds{}', + }, + ], + aggregator: 'omit', + }, + { + help: 'Lag of event loop in seconds.', + name: 'nodejs_eventloop_lag_seconds', + type: 
'gauge', + values: [ + { value: 0.009, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }, + ], + aggregator: 'average', + }, + { + help: 'Node.js version info.', + name: 'nodejs_version_info', + type: 'gauge', + values: [ + { + value: 1, + labels: { version: 'v6.11.1', major: 6, minor: 11, patch: 1 }, + hash: 'nodejs_version_info{version="v6.11.1",major="6",minor="11",patch="1"}', + }, + ], + aggregator: 'first', + }, + ]; + const metricsArr2 = [ + { + name: 'test_histogram', + help: 'Example of a histogram', + type: 'histogram', + values: [ + { + labels: { le: 0.1, code: '300' }, + value: 0.235151, + metricName: 'test_histogram_bucket', + hash: 'test_histogram_bucket{le="0.1",code="300"}', + }, + { + labels: { le: 10, code: '300' }, + value: 1.192591, + metricName: 'test_histogram_bucket', + hash: 'test_histogram_bucket{le="10",code="300"}', + }, + ], + aggregator: 'sum', + }, + { + help: 'Example of a gauge', + name: 'test_gauge', + type: 'gauge', + values: [ + { + value: 0.02, + labels: { method: 'get', code: 200 }, + hash: 'test_gauge{method="get",code="200"}', + }, + { value: 0.24, labels: {}, hash: 'test_gauge{}' }, + { + value: 51, + labels: { method: 'post', code: '300' }, + hash: 'test_gauge{method="post",code="300"}', + }, + ], + aggregator: 'sum', + }, + { + help: 'Start time of the process since unix epoch in seconds.', + name: 'process_start_time_seconds', + type: 'gauge', + values: [ + { + value: 1502075849, + labels: {}, + hash: 'process_start_time_seconds{}', + }, + ], + aggregator: 'omit', + }, + { + help: 'Lag of event loop in seconds.', + name: 'nodejs_eventloop_lag_seconds', + type: 'gauge', + values: [ + { value: 0.008, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' }, + ], + aggregator: 'average', + }, + { + help: 'Node.js version info.', + name: 'nodejs_version_info', + type: 'gauge', + values: [ + { + value: 1, + labels: { version: 'v6.11.1', major: 6, minor: 11, patch: 1 }, + hash: 
'nodejs_version_info{version="v6.11.1",major="6",minor="11",patch="1"}', + }, + ], + aggregator: 'first', + }, + ]; + + const aggregated = Registry.aggregate([metricsArr1, metricsArr2], regType); + + it('defaults to summation, preserves histogram bins', async () => { + const histogram = aggregated.getSingleMetric('test_histogram').get(); + expect(histogram).toEqual({ + name: 'test_histogram', + help: 'Example of a histogram', + type: 'histogram', + values: [ + { + labels: { le: 0.1, code: '300' }, + value: 0.235151, + metricName: 'test_histogram_bucket', + }, + { + labels: { le: 10, code: '300' }, + value: 2.8412637018068043, + metricName: 'test_histogram_bucket', + }, + ], + aggregator: 'sum', + }); + }); + + it('defaults to summation, works for gauges', () => { + const gauge = aggregated.getSingleMetric('test_gauge').get(); + expect(gauge).toEqual({ + help: 'Example of a gauge', + name: 'test_gauge', + type: 'gauge', + values: [ + { value: 0.49, labels: { method: 'get', code: 200 } }, + { value: 0.88, labels: {} }, + { value: 74, labels: { method: 'post', code: '300' } }, + ], + aggregator: 'sum', + }); + }); + + it('uses `aggregate` method defined for process_start_time', () => { + const procStartTime = aggregated.getSingleMetric( + 'process_start_time_seconds', + ); + expect(procStartTime).toBeUndefined(); + }); + + it('uses `aggregate` method defined for nodejs_eventloop_lag_seconds', () => { + const ell = aggregated + .getSingleMetric('nodejs_eventloop_lag_seconds') + .get(); + expect(ell).toEqual({ + help: 'Lag of event loop in seconds.', + name: 'nodejs_eventloop_lag_seconds', + type: 'gauge', + values: [{ value: 0.0085, labels: {} }], + aggregator: 'average', + }); + }); + + it('uses `aggregate` method defined for nodejs_evnetloop_lag_seconds', () => { + const ell = aggregated + .getSingleMetric('nodejs_eventloop_lag_seconds') + .get(); + expect(ell).toEqual({ + help: 'Lag of event loop in seconds.', + name: 'nodejs_eventloop_lag_seconds', + type: 'gauge', 
+ values: [{ value: 0.0085, labels: {} }], + aggregator: 'average', + }); + }); + + it('uses `aggregate` method defined for nodejs_version_info', () => { + const version = aggregated.getSingleMetric('nodejs_version_info').get(); + expect(version).toEqual({ + help: 'Node.js version info.', + name: 'nodejs_version_info', + type: 'gauge', + values: [ + { + value: 1, + labels: { version: 'v6.11.1', major: 6, minor: 11, patch: 1 }, + }, + ], + aggregator: 'first', + }); + }); + }); });