Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ project adheres to [Semantic Versioning](http://semver.org/).

### Changed

- Changes for cluster mode
- Removed the per-call `byLabels` `Grouper` in `metricAggregators.js` and replaced it with a module-level `Map`, to avoid creating a new Map on every metrics request.
- Added an optional `options` parameter to the `AggregatorRegistry` constructor, with an `enablePerformanceOptimizedVarient` field.
- If `enablePerformanceOptimizedVarient` is set to true, the hashing of labels is done by the workers, distributing the CPU-bound hashing among them (change in `cluster.js`).
- If `enablePerformanceOptimizedVarient` is set to true, each worker writes its metrics to a file in the OS temp directory (e.g. `/tmp`) and sends only the file name to the master, which reads the metrics from that file; this avoids sending the complete metrics over IPC and keeps IPC free of congestion (change in `cluster.js`).
- Added a new example, `example/cluster-2.js`, which creates the `AggregatorRegistry` with the performance-optimized configuration.

### Added

[unreleased]: https://github.com/siimon/prom-client/compare/v15.1.2...HEAD
Expand Down
36 changes: 36 additions & 0 deletions example/cluster-2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'use strict';

// Example: aggregating metrics across cluster workers using the
// performance-optimized AggregatorRegistry mode.
//
// This script runs in the master AND in every forked worker, so the registry
// below (and therefore the performance flag) is set up in each process before
// the master/worker branch is taken.

const cluster = require('cluster');
const express = require('express');
const metricsServer = express();
const AggregatorRegistry = require('../').AggregatorRegistry;

// Create performance optimized AggregatorRegistry.
// Workers hand their metrics to the master through a file written to the OS
// temp directory (os.tmpdir(), typically /tmp) instead of over IPC, so every
// process needs read/write access to that directory.
// NOTE: the option key must be `enablePerformanceOptimizedVarient`; any other
// key is ignored and the registry silently falls back to plain IPC.
const aggregatorRegistry = new AggregatorRegistry(undefined, {
  enablePerformanceOptimizedVarient: true,
});

if (cluster.isMaster) {
  // Master: fork a fixed pool of workers and expose the aggregated metrics.
  for (let i = 0; i < 4; i++) {
    cluster.fork();
  }

  metricsServer.get('/cluster_metrics', async (req, res) => {
    try {
      // Requests metrics from every worker and merges them into one payload.
      const metrics = await aggregatorRegistry.clusterMetrics();
      res.set('Content-Type', aggregatorRegistry.contentType);
      res.send(metrics);
    } catch (ex) {
      res.statusCode = 500;
      res.send(ex.message);
    }
  });

  metricsServer.listen(3001);
  console.log(
    'Cluster metrics server listening to 3001, metrics exposed on /cluster_metrics',
  );
} else {
  // Worker: run the regular example server, which registers its own metrics.
  require('./server.js');
}
3 changes: 3 additions & 0 deletions example/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ const cluster = require('cluster');
const express = require('express');
const metricsServer = express();
const AggregatorRegistry = require('../').AggregatorRegistry;

// Create default AggregatorRegistry
// Uses IPC communication for metrics flow from workers to master for aggregation
const aggregatorRegistry = new AggregatorRegistry();

if (cluster.isMaster) {
Expand Down
20 changes: 20 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,27 @@ export const prometheusContentType: PrometheusContentType;
*/
export const openMetricsContentType: OpenMetricsContentType;

/**
* Options accepted by the `AggregatorRegistry` constructor and by
* `AggregatorRegistry.setRegistries` to tune cluster-mode behaviour.
*/
export interface ClusterOptions {
/**
* Enable performance optimizations
* When enabled:
* - Workers use file based communication instead of IPC for transferring metrics to master:
*   each worker writes its metrics as JSON to a file in the OS temp directory
*   and sends only the file name; the master reads the file and then deletes it
* - Workers perform metrics hashing instead of the master process
*
* The value is stored in a single module-level flag in `lib/cluster.js`, so it
* applies to the whole process rather than to one registry instance, and it is
* only overwritten when this field is explicitly provided.
* NOTE(review): both the master-side and the worker-side handlers read this
* flag, so it presumably has to be enabled in the master and in every worker —
* confirm.
* @default false
*/
enablePerformanceOptimizedVarient?: boolean;
}

export class AggregatorRegistry<
T extends RegistryContentType,
> extends Registry<T> {
/**
* Create a new AggregatorRegistry
* @param regContentType Content type of the registry
* @param options Performance optimization options for cluster mode
*/
constructor(regContentType?: T, options?: ClusterOptions);

/**
* Gets aggregated metrics for all workers.
* @return {Promise<string>} Promise that resolves with the aggregated
Expand All @@ -159,6 +177,7 @@ export class AggregatorRegistry<
* use a registry/registries other than the default global registry.
* @param {Array<Registry>|Registry} regs Registry or registries to be
* aggregated.
* @param {ClusterOptions} options Performance optimization options for cluster mode
* @return {void}
*/
static setRegistries(
Expand All @@ -168,6 +187,7 @@ export class AggregatorRegistry<
>
| Registry<PrometheusContentType>
| Registry<OpenMetricsContentType>,
options?: ClusterOptions,
): void;
}

Expand Down
114 changes: 97 additions & 17 deletions lib/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
*/

const Registry = require('./registry');
const { Grouper } = require('./util');
const { Grouper, hashObject } = require('./util');
const { aggregators } = require('./metricAggregators');
const fs = require('fs');
const path = require('path');
const os = require('os');
// We need to lazy-load the 'cluster' module as some application servers -
// namely Passenger - crash when it is imported.
let cluster = () => {
Expand All @@ -27,10 +30,17 @@ let requestCtr = 0; // Concurrency control
let listenersAdded = false;
const requests = new Map(); // Pending requests for workers' local metrics.

let enablePerformanceOptimizedVarient = false;

class AggregatorRegistry extends Registry {
constructor(regContentType = Registry.PROMETHEUS_CONTENT_TYPE) {
constructor(regContentType = Registry.PROMETHEUS_CONTENT_TYPE, options = {}) {
super(regContentType);
addListeners();

if (options.enablePerformanceOptimizedVarient !== undefined) {
enablePerformanceOptimizedVarient =
options.enablePerformanceOptimizedVarient;
}
}

/**
Expand Down Expand Up @@ -143,16 +153,23 @@ class AggregatorRegistry extends Registry {
* use a registry/registries other than the default global registry.
* @param {Array<Registry>|Registry} regs Registry or registries to be
* aggregated.
* @param {Object} options Configuration options for cluster optimizations
* @param {boolean} options.enablePerformanceOptimizedVarient Enable performance optimizations
* @return {void}
*/
static setRegistries(regs) {
static setRegistries(regs, options = {}) {
if (!Array.isArray(regs)) regs = [regs];
regs.forEach(reg => {
if (!(reg instanceof Registry)) {
throw new TypeError(`Expected Registry, got ${typeof reg}`);
}
});
registries = regs;

if (options.enablePerformanceOptimizedVarient !== undefined) {
enablePerformanceOptimizedVarient =
options.enablePerformanceOptimizedVarient;
}
}
}

Expand All @@ -176,17 +193,46 @@ function addListeners() {
return;
}

message.metrics.forEach(registry => request.responses.push(registry));
request.pending--;
if (enablePerformanceOptimizedVarient && message.filename) {
// File based metrics communication
fs.readFile(message.filename, 'utf8', (err, data) => {
if (err) {
request.done(err);
return;
} else {
const metrics = JSON.parse(data);
metrics.forEach(registry => request.responses.push(registry));
fs.unlink(message.filename, e => {
if (e)
console.error(`Error deleting file ${message.filename}:`, e);
});
request.pending--;
if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);
const registry = AggregatorRegistry.aggregate(
request.responses,
);
const promString = registry.metrics();
request.done(null, promString);
}
}
});
} else {
//Original behaviour with IPC communication
message.metrics.forEach(registry => request.responses.push(registry));
request.pending--;

if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);
if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);

const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics();
request.done(null, promString);
const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics();
request.done(null, promString);
}
}
}
});
Expand All @@ -198,11 +244,45 @@ function addListeners() {
if (message.type === GET_METRICS_REQ) {
Promise.all(registries.map(r => r.getMetricsAsJSON()))
.then(metrics => {
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
metrics,
});
if (enablePerformanceOptimizedVarient) {
metrics.forEach(registryMetrics => {
registryMetrics.forEach(metric => {
metric.values.forEach(value => {
const hash = hashObject(value);
const key = `${value.metricName}_${hash}`;
value.hash = key;
});
});
});
// adding request id in file path to handle concurrency
const filename = path.join(
os.tmpdir(),
`metrics-${process.pid}-${message.requestId}.json`,
);

fs.writeFile(filename, JSON.stringify(metrics), err => {
if (err) {
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
error: err.message,
});
} else {
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
filename,
});
}
});
} else {
// Original behaviour with IPC communication
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
metrics,
});
}
})
.catch(error => {
process.send({
Expand Down
19 changes: 15 additions & 4 deletions lib/metricAggregators.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const { Grouper, hashObject } = require('./util');
const { hashObject } = require('./util');
const metricMap = new Map();

/**
* Returns a new function that applies the `aggregatorFn` to the values.
Expand All @@ -18,11 +19,20 @@ function AggregatorFactory(aggregatorFn) {
aggregator: metrics[0].aggregator,
};
// Gather metrics by metricName and labels.
const byLabels = new Grouper();
if (!metricMap.get(metrics[0].name)) {
metricMap.set(metrics[0].name, new Map());
}
const byLabels = metricMap.get(metrics[0].name);
metrics.forEach(metric => {
metric.values.forEach(value => {
const key = hashObject(value.labels);
byLabels.add(`${value.metricName}_${key}`, value);
const hash =
value.hash || `${value.metricName}_${hashObject(value.labels)}`;
const valuesArray = byLabels.get(hash);
if (!valuesArray) {
byLabels.set(hash, [value]);
} else {
valuesArray.push(value);
}
});
});
// Apply aggregator function to gathered metrics.
Expand All @@ -37,6 +47,7 @@ function AggregatorFactory(aggregatorFn) {
}
// NB: Timestamps are omitted.
result.values.push(valObj);
values.length = 0;
});
return result;
};
Expand Down
14 changes: 7 additions & 7 deletions test/aggregatorsTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ describe('aggregators', () => {
name: 'metric_name',
type: 'does not matter',
values: [
{ labels: [], value: 1 },
{ labels: ['label1'], value: 2 },
{ labels: [], value: 1, hash: 'h1' },
{ labels: ['label1'], value: 2, hash: 'h2' },
],
},
{
help: 'metric_help',
name: 'metric_name',
type: 'does not matter',
values: [
{ labels: [], value: 3 },
{ labels: ['label1'], value: 4 },
{ labels: [], value: 3, hash: 'h1' },
{ labels: ['label1'], value: 4, hash: 'h2' },
],
},
];
Expand Down Expand Up @@ -102,19 +102,19 @@ describe('aggregators', () => {
help: 'metric_help',
name: 'metric_name',
type: 'does not matter',
values: [{ labels: [], value: 1, metricName: 'abc' }],
values: [{ labels: [], value: 1, metricName: 'abc', hash: 'h1' }],
},
{
help: 'metric_help',
name: 'metric_name',
type: 'does not matter',
values: [{ labels: [], value: 3, metricName: 'abc' }],
values: [{ labels: [], value: 3, metricName: 'abc', hash: 'h1' }],
},
{
help: 'metric_help',
name: 'metric_name',
type: 'does not matter',
values: [{ labels: [], value: 5, metricName: 'def' }],
values: [{ labels: [], value: 5, metricName: 'def', hash: 'h2' }],
},
];
const result = aggregators.sum(metrics2);
Expand Down
Loading