Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.linkedin.venice.stats;

import static com.linkedin.venice.stats.VeniceOpenTelemetryMetricNamingFormat.SNAKE_CASE;
import static io.opentelemetry.sdk.metrics.InstrumentType.GAUGE;

import com.linkedin.venice.stats.metrics.MetricEntity;
import io.opentelemetry.exporter.otlp.internal.OtlpConfigUtil;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.MetricReader;
Expand Down Expand Up @@ -126,6 +128,16 @@ public class VeniceMetricsConfig {
public static final String OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE =
"otel.exporter.otlp.metrics.temporality.preference";

/**
* Make Synchronous Gauge instruments export the last recorded value even if the instrument is not
* recorded in the current collection interval.
* - If true, it will export the last recorded value even if the instrument is not recorded in the current collection interval.
* - If false, it will use the default behavior of the selected {@link #otelAggregationTemporalitySelector}, which is
* {@link AggregationTemporalitySelector#deltaPreferred()} by default.
*/
public static final String OTEL_VENICE_EXPORT_LAST_RECORDED_VALUE_FOR_SYNCHRONOUS_GAUGE =
"otel.venice.export.last.recorded.value.for.synchronous.gauge";

/**
* Default histogram aggregation to be used for all histograms: Select one of the below <br>
* 1. base2_exponential_bucket_histogram <br>
Expand Down Expand Up @@ -208,6 +220,11 @@ public class VeniceMetricsConfig {
/** Metric naming conventions for OpenTelemetry metrics */
private final VeniceOpenTelemetryMetricNamingFormat metricNamingFormat;

/**
* Whether to export the last recorded value for synchronous Gauge instruments.
*/
private final boolean exportLastRecordedValueForSynchronousGauge;

/** Aggregation Temporality selector to export only the delta or cumulate or different */
private final AggregationTemporalitySelector otelAggregationTemporalitySelector;

Expand All @@ -234,6 +251,7 @@ private VeniceMetricsConfig(Builder builder) {
this.otelHeaders = builder.otelHeaders;
this.exportOtelMetricsToLog = builder.exportOtelMetricsToLog;
this.metricNamingFormat = builder.metricNamingFormat;
this.exportLastRecordedValueForSynchronousGauge = builder.exportLastRecordedValueForSynchronousGauge;
this.otelAggregationTemporalitySelector = builder.otelAggregationTemporalitySelector;
this.useOtelExponentialHistogram = builder.useOtelExponentialHistogram;
this.otelExponentialHistogramMaxScale = builder.otelExponentialHistogramMaxScale;
Expand All @@ -257,6 +275,7 @@ public static class Builder {
Map<String, String> otelHeaders = new HashMap<>();
private boolean exportOtelMetricsToLog = false;
private VeniceOpenTelemetryMetricNamingFormat metricNamingFormat = SNAKE_CASE;
private boolean exportLastRecordedValueForSynchronousGauge = true;
private AggregationTemporalitySelector otelAggregationTemporalitySelector =
AggregationTemporalitySelector.deltaPreferred();
private boolean useOtelExponentialHistogram = true;
Expand Down Expand Up @@ -331,6 +350,11 @@ public Builder setMetricNamingFormat(String metricNamingFormat) {
return this;
}

public Builder setExportLastRecordedValueForSynchronousGauge(boolean exportLastRecordedValueForSynchronousGauge) {
this.exportLastRecordedValueForSynchronousGauge = exportLastRecordedValueForSynchronousGauge;
return this;
}

public Builder setOtelAggregationTemporalitySelector(
AggregationTemporalitySelector otelAggregationTemporalitySelector) {
this.otelAggregationTemporalitySelector = otelAggregationTemporalitySelector;
Expand Down Expand Up @@ -439,6 +463,10 @@ public Builder extractAndSetOtelConfigs(Map<String, String> configs) {
otelHeaders.put(headers[0], headers[1]);
}

if ((configValue = configs.get(OTEL_VENICE_EXPORT_LAST_RECORDED_VALUE_FOR_SYNCHRONOUS_GAUGE)) != null) {
setExportLastRecordedValueForSynchronousGauge(Boolean.parseBoolean(configValue));
}

if ((configValue = configs.get(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE)) != null) {
switch (configValue.toLowerCase(Locale.ROOT)) {
case "cumulative":
Expand Down Expand Up @@ -507,6 +535,12 @@ private void checkAndSetDefaults() {
} else {
LOGGER.warn("OpenTelemetry metrics are disabled");
}

if (exportLastRecordedValueForSynchronousGauge) {
// Override the configured temporality selector to ensure synchronous gauges export last recorded value
otelAggregationTemporalitySelector =
getTemporalitySelector(exportLastRecordedValueForSynchronousGauge, otelAggregationTemporalitySelector);
}
}

public VeniceMetricsConfig build() {
Expand Down Expand Up @@ -572,6 +606,10 @@ public VeniceOpenTelemetryMetricNamingFormat getMetricNamingFormat() {
return metricNamingFormat;
}

public boolean exportLastRecordedValueForSynchronousGauge() {
return exportLastRecordedValueForSynchronousGauge;
}

public AggregationTemporalitySelector getOtelAggregationTemporalitySelector() {
return otelAggregationTemporalitySelector;
}
Expand Down Expand Up @@ -609,4 +647,23 @@ public String toString() {
+ otelExponentialHistogramMaxScale + ", otelExponentialHistogramMaxBuckets="
+ otelExponentialHistogramMaxBuckets + ", tehutiMetricConfig=" + tehutiMetricConfig + '}';
}

/**
* Custom AggregationTemporalitySelector which enforces that if the instrument type is
* GAUGE and {@link #exportLastRecordedValueForSynchronousGauge} is true,
* it returns CUMULATIVE for GAUGE type instruments.
* This is to support Synchronous GAUGE exporting the last set value even when
* it is not set during the last export interval
* Check https://github.com/open-telemetry/opentelemetry-java/pull/7634 for more details
*/
public static AggregationTemporalitySelector getTemporalitySelector(
boolean exportLastRecordedValueForSynchronousGauge,
AggregationTemporalitySelector configuredTemporalitySelector) {
return instrumentType -> {
if (exportLastRecordedValueForSynchronousGauge && instrumentType == GAUGE) {
return AggregationTemporality.CUMULATIVE;
}
return configuredTemporalitySelector.getAggregationTemporality(instrumentType);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_EXPORTER_OTLP_METRICS_ENDPOINT;
import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_EXPORTER_OTLP_METRICS_PROTOCOL;
import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE;
import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_VENICE_EXPORT_LAST_RECORDED_VALUE_FOR_SYNCHRONOUS_GAUGE;
import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_VENICE_METRICS_CUSTOM_DIMENSIONS_MAP;
import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_VENICE_METRICS_ENABLED;
import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_VENICE_METRICS_EXPORT_TO_ENDPOINT;
Expand All @@ -19,7 +20,10 @@
import static org.testng.Assert.assertTrue;

import com.linkedin.venice.stats.VeniceMetricsConfig.Builder;
import com.linkedin.venice.utils.DataProviderUtils;
import io.opentelemetry.exporter.otlp.internal.OtlpConfigUtil;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.tehuti.metrics.MetricConfig;
import java.util.HashMap;
Expand All @@ -46,7 +50,15 @@ public void testDefaultValuesWithBasicConfig() {
assertTrue(config.getOtelHeaders().isEmpty());
assertFalse(config.exportOtelMetricsToLog());
assertEquals(config.getMetricNamingFormat(), SNAKE_CASE);
assertEquals(config.getOtelAggregationTemporalitySelector(), AggregationTemporalitySelector.deltaPreferred());
assertEquals(config.exportLastRecordedValueForSynchronousGauge(), true);
AggregationTemporalitySelector defaultSelector =
VeniceMetricsConfig.getTemporalitySelector(true, AggregationTemporalitySelector.deltaPreferred());
for (InstrumentType type: InstrumentType.values()) {
assertEquals(
config.getOtelAggregationTemporalitySelector().getAggregationTemporality(type),
defaultSelector.getAggregationTemporality(type),
type.name());
}
assertEquals(config.useOtelExponentialHistogram(), true);
assertEquals(config.getOtelExponentialHistogramMaxScale(), 3);
assertEquals(config.getOtelExponentialHistogramMaxBuckets(), 250);
Expand Down Expand Up @@ -132,20 +144,50 @@ public void testEnableHttpGrpcEndpointConfigWithRequiredFields() {
assertEquals(config.getOtelEndpoint(), "http://localhost");
}

@Test
public void testSetAggregationTemporalitySelector() {
@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
public void testSetAggregationTemporalitySelector(boolean useLastRecordedValueForSynchronousGauge) {
Map<String, String> otelConfigs = new HashMap<>();
otelConfigs.put(OTEL_VENICE_METRICS_ENABLED, "true");
otelConfigs.put(OTEL_VENICE_METRICS_EXPORT_TO_ENDPOINT, "true");
otelConfigs.put(OTEL_EXPORTER_OTLP_METRICS_PROTOCOL, OtlpConfigUtil.PROTOCOL_HTTP_PROTOBUF);
otelConfigs.put(OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, "http://localhost");
otelConfigs.put(
OTEL_VENICE_EXPORT_LAST_RECORDED_VALUE_FOR_SYNCHRONOUS_GAUGE,
Boolean.toString(useLastRecordedValueForSynchronousGauge));
otelConfigs.put(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, "delta");

VeniceMetricsConfig config = new Builder().setServiceName("TestService")
.setMetricPrefix("TestPrefix")
.extractAndSetOtelConfigs(otelConfigs)
.build();
assertEquals(config.getOtelAggregationTemporalitySelector(), AggregationTemporalitySelector.deltaPreferred());
AggregationTemporalitySelector defaultSelector = VeniceMetricsConfig.getTemporalitySelector(
useLastRecordedValueForSynchronousGauge,
AggregationTemporalitySelector.deltaPreferred());
for (InstrumentType type: InstrumentType.values()) {
assertEquals(
config.getOtelAggregationTemporalitySelector().getAggregationTemporality(type),
defaultSelector.getAggregationTemporality(type));
}
}

@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
public void testGetTemporalitySelector(boolean useLastRecordedValueForSynchronousGauge) {
AggregationTemporalitySelector[] selectors =
new AggregationTemporalitySelector[] { AggregationTemporalitySelector.deltaPreferred(),
AggregationTemporalitySelector.alwaysCumulative(), AggregationTemporalitySelector.lowMemory() };
for (AggregationTemporalitySelector baseSelector: selectors) {
AggregationTemporalitySelector selector =
VeniceMetricsConfig.getTemporalitySelector(useLastRecordedValueForSynchronousGauge, baseSelector);
for (InstrumentType type: InstrumentType.values()) {
if (useLastRecordedValueForSynchronousGauge && type == InstrumentType.GAUGE) {
// If exportLastRecordedValue and type is GAUGE, must be CUMULATIVE
assertEquals(selector.getAggregationTemporality(type), AggregationTemporality.CUMULATIVE);
} else {
// Otherwise, should be same as baseSelector
assertEquals(selector.getAggregationTemporality(type), baseSelector.getAggregationTemporality(type));
}
}
}
}

@Test(expectedExceptions = IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import com.linkedin.venice.utils.DataProviderUtils;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.tehuti.metrics.MetricConfig;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -63,7 +65,7 @@ private static VeniceOpenTelemetryMetricsRepository createOtelRepo(
VeniceMetricsConfig metricsConfig = new VeniceMetricsConfig.Builder().setEmitOtelMetrics(true)
.setMetricPrefix(METRIC_PREFIX)
.setOtelAdditionalMetricsReader(inMemoryMetricReader)
.setMetricEntities(Arrays.asList(metricEntity))
.setMetricEntities(Collections.singletonList(metricEntity))
.setTehutiMetricConfig(new MetricConfig())
.build();
return new VeniceOpenTelemetryMetricsRepository(metricsConfig);
Expand Down Expand Up @@ -169,29 +171,51 @@ public void testOTelRecordHistogram(boolean isExponentialHistogram) {
}
}

@Test
public void testOTelRecordGauge() {
@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
public void testOTelRecordGauge(boolean exportLastRecordedValueForSynchronousGauge) {
MetricEntity metricEntityGauge = new MetricEntity(
"test_metric_gauge",
MetricType.GAUGE,
MetricUnit.NUMBER,
TEST_DESCRIPTION,
getTestDimensions());
InMemoryMetricReader inMemoryMetricReader = InMemoryMetricReader.create();
InMemoryMetricReader inMemoryMetricReader = InMemoryMetricReader.create(
VeniceMetricsConfig.getTemporalitySelector(
exportLastRecordedValueForSynchronousGauge,
AggregationTemporalitySelector.deltaPreferred()),
DefaultAggregationSelector.getDefault());
VeniceOpenTelemetryMetricsRepository otelMetricsRepository =
createOtelRepo(metricEntityGauge, inMemoryMetricReader);
MetricEntityStateBase metricEntityStateBaseGauge = MetricEntityStateBase
.create(metricEntityGauge, otelMetricsRepository, getBaseDimensionsMap(), getBaseAttributes());
metricEntityStateBaseGauge.record(10L);
metricEntityStateBaseGauge.record(20L);
Collection<MetricData> metrics = inMemoryMetricReader.collectAllMetrics();
assertFalse(metrics.isEmpty(), "Metrics should not be empty");
assertEquals(metrics.size(), 1, "There should be one metric recorded");
// validate the last recorded value is 20L: Note that the validate method calls collectAllMetrics() which is
// equivalent to an export
validateLongPointDataFromGauge(inMemoryMetricReader, 20L, getBaseAttributes(), "test_metric_gauge", METRIC_PREFIX);

// record another value and validate again
metricEntityStateBaseGauge.record(30L);
validateLongPointDataFromGauge(inMemoryMetricReader, 30L, getBaseAttributes(), "test_metric_gauge", METRIC_PREFIX);
if (exportLastRecordedValueForSynchronousGauge) {
// should be able to read the same value again after export
validateLongPointDataFromGauge(
inMemoryMetricReader,
20L,
getBaseAttributes(),
"test_metric_gauge",
METRIC_PREFIX);
} else {
// should not be able to read the same value again after export
try {
validateLongPointDataFromGauge(
inMemoryMetricReader,
20L,
getBaseAttributes(),
"test_metric_gauge",
METRIC_PREFIX);
fail("Should not be able to read the same value again after export");
} catch (AssertionError e) {
// expected
}
}
}

@Test
Expand Down
Loading