diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/VeniceMetricsConfig.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/VeniceMetricsConfig.java
index f91a2fbebef..ba034a55c12 100644
--- a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/VeniceMetricsConfig.java
+++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/VeniceMetricsConfig.java
@@ -1,9 +1,11 @@
package com.linkedin.venice.stats;
import static com.linkedin.venice.stats.VeniceOpenTelemetryMetricNamingFormat.SNAKE_CASE;
+import static io.opentelemetry.sdk.metrics.InstrumentType.GAUGE;
import com.linkedin.venice.stats.metrics.MetricEntity;
import io.opentelemetry.exporter.otlp.internal.OtlpConfigUtil;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.MetricReader;
@@ -126,6 +128,16 @@ public class VeniceMetricsConfig {
public static final String OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE =
"otel.exporter.otlp.metrics.temporality.preference";
+ /**
+ * Make Synchronous Gauge instruments export the last recorded value even if the instrument is not
+ * recorded in the current collection interval.
+ * - If true, it will export the last recorded value even if the instrument is not recorded in the current collection interval.
+ * - If false, it will use the default behavior of the selected {@link #otelAggregationTemporalitySelector}, which is
+ * {@link AggregationTemporalitySelector#deltaPreferred()} by default.
+ *
+ * <p>The builder enables this behavior by default. When the value is supplied through the config map it is
+ * parsed with {@link Boolean#parseBoolean(String)}, so anything other than a case-insensitive "true"
+ * turns the behavior off.
+ */
+ public static final String OTEL_VENICE_EXPORT_LAST_RECORDED_VALUE_FOR_SYNCHRONOUS_GAUGE =
+ "otel.venice.export.last.recorded.value.for.synchronous.gauge";
+
/**
* Default histogram aggregation to be used for all histograms: Select one of the below
* 1. base2_exponential_bucket_histogram
@@ -208,6 +220,11 @@ public class VeniceMetricsConfig {
/** Metric naming conventions for OpenTelemetry metrics */
private final VeniceOpenTelemetryMetricNamingFormat metricNamingFormat;
+ /**
+ * Whether to export the last recorded value for synchronous Gauge instruments.
+ */
+ private final boolean exportLastRecordedValueForSynchronousGauge;
+
/** Aggregation Temporality selector to export only the delta or cumulate or different */
private final AggregationTemporalitySelector otelAggregationTemporalitySelector;
@@ -234,6 +251,7 @@ private VeniceMetricsConfig(Builder builder) {
this.otelHeaders = builder.otelHeaders;
this.exportOtelMetricsToLog = builder.exportOtelMetricsToLog;
this.metricNamingFormat = builder.metricNamingFormat;
+ this.exportLastRecordedValueForSynchronousGauge = builder.exportLastRecordedValueForSynchronousGauge;
this.otelAggregationTemporalitySelector = builder.otelAggregationTemporalitySelector;
this.useOtelExponentialHistogram = builder.useOtelExponentialHistogram;
this.otelExponentialHistogramMaxScale = builder.otelExponentialHistogramMaxScale;
@@ -257,6 +275,7 @@ public static class Builder {
Map otelHeaders = new HashMap<>();
private boolean exportOtelMetricsToLog = false;
private VeniceOpenTelemetryMetricNamingFormat metricNamingFormat = SNAKE_CASE;
+ private boolean exportLastRecordedValueForSynchronousGauge = true;
private AggregationTemporalitySelector otelAggregationTemporalitySelector =
AggregationTemporalitySelector.deltaPreferred();
private boolean useOtelExponentialHistogram = true;
@@ -331,6 +350,11 @@ public Builder setMetricNamingFormat(String metricNamingFormat) {
return this;
}
+ /**
+ * Sets whether synchronous Gauge instruments should keep exporting their last recorded value.
+ * When the flag is true at build time, the configured temporality selector is wrapped via
+ * {@code getTemporalitySelector} so that GAUGE instruments resolve to CUMULATIVE.
+ *
+ * @param exportLastRecordedValueForSynchronousGauge the flag value to store on this builder
+ * @return this builder, to allow call chaining
+ */
+ public Builder setExportLastRecordedValueForSynchronousGauge(boolean exportLastRecordedValueForSynchronousGauge) {
+ this.exportLastRecordedValueForSynchronousGauge = exportLastRecordedValueForSynchronousGauge;
+ return this;
+ }
+
public Builder setOtelAggregationTemporalitySelector(
AggregationTemporalitySelector otelAggregationTemporalitySelector) {
this.otelAggregationTemporalitySelector = otelAggregationTemporalitySelector;
@@ -439,6 +463,10 @@ public Builder extractAndSetOtelConfigs(Map configs) {
otelHeaders.put(headers[0], headers[1]);
}
+ if ((configValue = configs.get(OTEL_VENICE_EXPORT_LAST_RECORDED_VALUE_FOR_SYNCHRONOUS_GAUGE)) != null) {
+ setExportLastRecordedValueForSynchronousGauge(Boolean.parseBoolean(configValue));
+ }
+
if ((configValue = configs.get(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE)) != null) {
switch (configValue.toLowerCase(Locale.ROOT)) {
case "cumulative":
@@ -507,6 +535,12 @@ private void checkAndSetDefaults() {
} else {
LOGGER.warn("OpenTelemetry metrics are disabled");
}
+
+ if (exportLastRecordedValueForSynchronousGauge) {
+ // Override the configured temporality selector to ensure synchronous gauges export last recorded value
+ otelAggregationTemporalitySelector =
+ getTemporalitySelector(exportLastRecordedValueForSynchronousGauge, otelAggregationTemporalitySelector);
+ }
}
public VeniceMetricsConfig build() {
@@ -572,6 +606,10 @@ public VeniceOpenTelemetryMetricNamingFormat getMetricNamingFormat() {
return metricNamingFormat;
}
+ /**
+ * @return the flag copied from the builder that controls whether synchronous Gauge instruments export
+ * their last recorded value
+ */
+ public boolean exportLastRecordedValueForSynchronousGauge() {
+ return exportLastRecordedValueForSynchronousGauge;
+ }
+
public AggregationTemporalitySelector getOtelAggregationTemporalitySelector() {
return otelAggregationTemporalitySelector;
}
@@ -609,4 +647,23 @@ public String toString() {
+ otelExponentialHistogramMaxScale + ", otelExponentialHistogramMaxBuckets="
+ otelExponentialHistogramMaxBuckets + ", tehutiMetricConfig=" + tehutiMetricConfig + '}';
}
+
+ /**
+ * Builds an {@link AggregationTemporalitySelector} that wraps {@code configuredTemporalitySelector}.
+ * When {@code exportLastRecordedValueForSynchronousGauge} is true, instruments of type GAUGE always
+ * resolve to CUMULATIVE; every other instrument type (and every type when the flag is false) is
+ * delegated to the wrapped selector.
+ * Forcing CUMULATIVE is what allows a Synchronous GAUGE to keep exporting the last set value even when
+ * it is not set during the last export interval.
+ * Check https://github.com/open-telemetry/opentelemetry-java/pull/7634 for more details
+ *
+ * @param exportLastRecordedValueForSynchronousGauge whether GAUGE instruments must be forced to CUMULATIVE
+ * @param configuredTemporalitySelector selector consulted whenever the GAUGE override does not apply
+ * @return a selector applying the GAUGE override on top of {@code configuredTemporalitySelector}
+ */
+ public static AggregationTemporalitySelector getTemporalitySelector(
+ boolean exportLastRecordedValueForSynchronousGauge,
+ AggregationTemporalitySelector configuredTemporalitySelector) {
+ return instrumentType -> (exportLastRecordedValueForSynchronousGauge && instrumentType == GAUGE)
+ ? AggregationTemporality.CUMULATIVE
+ : configuredTemporalitySelector.getAggregationTemporality(instrumentType);
+ }
}
diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/VeniceMetricsConfigTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/VeniceMetricsConfigTest.java
index 70dcf0860ca..05d3d7cdd65 100644
--- a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/VeniceMetricsConfigTest.java
+++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/VeniceMetricsConfigTest.java
@@ -6,6 +6,7 @@
import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_EXPORTER_OTLP_METRICS_ENDPOINT;
import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_EXPORTER_OTLP_METRICS_PROTOCOL;
import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE;
+import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_VENICE_EXPORT_LAST_RECORDED_VALUE_FOR_SYNCHRONOUS_GAUGE;
import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_VENICE_METRICS_CUSTOM_DIMENSIONS_MAP;
import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_VENICE_METRICS_ENABLED;
import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_VENICE_METRICS_EXPORT_TO_ENDPOINT;
@@ -19,7 +20,10 @@
import static org.testng.Assert.assertTrue;
import com.linkedin.venice.stats.VeniceMetricsConfig.Builder;
+import com.linkedin.venice.utils.DataProviderUtils;
import io.opentelemetry.exporter.otlp.internal.OtlpConfigUtil;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.tehuti.metrics.MetricConfig;
import java.util.HashMap;
@@ -46,7 +50,15 @@ public void testDefaultValuesWithBasicConfig() {
assertTrue(config.getOtelHeaders().isEmpty());
assertFalse(config.exportOtelMetricsToLog());
assertEquals(config.getMetricNamingFormat(), SNAKE_CASE);
- assertEquals(config.getOtelAggregationTemporalitySelector(), AggregationTemporalitySelector.deltaPreferred());
+ assertEquals(config.exportLastRecordedValueForSynchronousGauge(), true);
+ AggregationTemporalitySelector defaultSelector =
+ VeniceMetricsConfig.getTemporalitySelector(true, AggregationTemporalitySelector.deltaPreferred());
+ for (InstrumentType type: InstrumentType.values()) {
+ assertEquals(
+ config.getOtelAggregationTemporalitySelector().getAggregationTemporality(type),
+ defaultSelector.getAggregationTemporality(type),
+ type.name());
+ }
assertEquals(config.useOtelExponentialHistogram(), true);
assertEquals(config.getOtelExponentialHistogramMaxScale(), 3);
assertEquals(config.getOtelExponentialHistogramMaxBuckets(), 250);
@@ -132,20 +144,50 @@ public void testEnableHttpGrpcEndpointConfigWithRequiredFields() {
assertEquals(config.getOtelEndpoint(), "http://localhost");
}
- @Test
- public void testSetAggregationTemporalitySelector() {
+ /**
+ * Builds a config with the "delta" temporality preference and the synchronous-gauge flag set to the
+ * data-provider value, then checks that the resulting selector resolves every instrument type the same
+ * way as {@link VeniceMetricsConfig#getTemporalitySelector} applied to deltaPreferred().
+ */
+ @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
+ public void testSetAggregationTemporalitySelector(boolean useLastRecordedValueForSynchronousGauge) {
Map otelConfigs = new HashMap<>();
otelConfigs.put(OTEL_VENICE_METRICS_ENABLED, "true");
otelConfigs.put(OTEL_VENICE_METRICS_EXPORT_TO_ENDPOINT, "true");
otelConfigs.put(OTEL_EXPORTER_OTLP_METRICS_PROTOCOL, OtlpConfigUtil.PROTOCOL_HTTP_PROTOBUF);
otelConfigs.put(OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, "http://localhost");
+ otelConfigs.put(
+ OTEL_VENICE_EXPORT_LAST_RECORDED_VALUE_FOR_SYNCHRONOUS_GAUGE,
+ Boolean.toString(useLastRecordedValueForSynchronousGauge));
otelConfigs.put(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, "delta");
VeniceMetricsConfig config = new Builder().setServiceName("TestService")
.setMetricPrefix("TestPrefix")
.extractAndSetOtelConfigs(otelConfigs)
.build();
- assertEquals(config.getOtelAggregationTemporalitySelector(), AggregationTemporalitySelector.deltaPreferred());
+ AggregationTemporalitySelector expectedSelector = VeniceMetricsConfig.getTemporalitySelector(
+ useLastRecordedValueForSynchronousGauge,
+ AggregationTemporalitySelector.deltaPreferred());
+ // getTemporalitySelector returns a fresh lambda each call, so compare the temporality resolved for every
+ // instrument type instead of comparing the selector objects themselves
+ for (InstrumentType type: InstrumentType.values()) {
+ assertEquals(
+ config.getOtelAggregationTemporalitySelector().getAggregationTemporality(type),
+ expectedSelector.getAggregationTemporality(type),
+ type.name());
+ }
+ }
+
+ /**
+ * For each base selector (deltaPreferred, alwaysCumulative, lowMemory) verifies that the wrapped selector
+ * returns CUMULATIVE for GAUGE when the flag is true and otherwise matches the base selector.
+ */
+ @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
+ public void testGetTemporalitySelector(boolean useLastRecordedValueForSynchronousGauge) {
+ AggregationTemporalitySelector[] baseSelectors = { AggregationTemporalitySelector.deltaPreferred(),
+ AggregationTemporalitySelector.alwaysCumulative(), AggregationTemporalitySelector.lowMemory() };
+ for (AggregationTemporalitySelector baseSelector: baseSelectors) {
+ AggregationTemporalitySelector wrappedSelector =
+ VeniceMetricsConfig.getTemporalitySelector(useLastRecordedValueForSynchronousGauge, baseSelector);
+ for (InstrumentType type: InstrumentType.values()) {
+ // GAUGE is forced to CUMULATIVE only when the flag is on; everything else follows the base selector
+ boolean gaugeOverridden = useLastRecordedValueForSynchronousGauge && type == InstrumentType.GAUGE;
+ AggregationTemporality expectedTemporality =
+ gaugeOverridden ? AggregationTemporality.CUMULATIVE : baseSelector.getAggregationTemporality(type);
+ assertEquals(wrappedSelector.getAggregationTemporality(type), expectedTemporality);
+ }
+ }
}
@Test(expectedExceptions = IllegalArgumentException.class)
diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/metrics/MetricTypeTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/metrics/MetricTypeTest.java
index c21967c3f14..07137905fba 100644
--- a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/metrics/MetricTypeTest.java
+++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/metrics/MetricTypeTest.java
@@ -18,10 +18,12 @@
import com.linkedin.venice.utils.DataProviderUtils;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
+import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.tehuti.metrics.MetricConfig;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -63,7 +65,7 @@ private static VeniceOpenTelemetryMetricsRepository createOtelRepo(
VeniceMetricsConfig metricsConfig = new VeniceMetricsConfig.Builder().setEmitOtelMetrics(true)
.setMetricPrefix(METRIC_PREFIX)
.setOtelAdditionalMetricsReader(inMemoryMetricReader)
- .setMetricEntities(Arrays.asList(metricEntity))
+ .setMetricEntities(Collections.singletonList(metricEntity))
.setTehutiMetricConfig(new MetricConfig())
.build();
return new VeniceOpenTelemetryMetricsRepository(metricsConfig);
@@ -169,29 +171,51 @@ public void testOTelRecordHistogram(boolean isExponentialHistogram) {
}
}
- @Test
- public void testOTelRecordGauge() {
+ /**
+ * Records two values on a GAUGE metric and validates that 20L (the last one) is exported. It then reads a
+ * second time without recording anything new: with exportLastRecordedValueForSynchronousGauge the same
+ * value must be readable again, without it the second validation is expected to fail.
+ */
+ @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
+ public void testOTelRecordGauge(boolean exportLastRecordedValueForSynchronousGauge) {
MetricEntity metricEntityGauge = new MetricEntity(
"test_metric_gauge",
MetricType.GAUGE,
MetricUnit.NUMBER,
TEST_DESCRIPTION,
getTestDimensions());
- InMemoryMetricReader inMemoryMetricReader = InMemoryMetricReader.create();
+ InMemoryMetricReader inMemoryMetricReader = InMemoryMetricReader.create(
+ VeniceMetricsConfig.getTemporalitySelector(
+ exportLastRecordedValueForSynchronousGauge,
+ AggregationTemporalitySelector.deltaPreferred()),
+ DefaultAggregationSelector.getDefault());
VeniceOpenTelemetryMetricsRepository otelMetricsRepository =
createOtelRepo(metricEntityGauge, inMemoryMetricReader);
MetricEntityStateBase metricEntityStateBaseGauge = MetricEntityStateBase
.create(metricEntityGauge, otelMetricsRepository, getBaseDimensionsMap(), getBaseAttributes());
metricEntityStateBaseGauge.record(10L);
metricEntityStateBaseGauge.record(20L);
- Collection metrics = inMemoryMetricReader.collectAllMetrics();
- assertFalse(metrics.isEmpty(), "Metrics should not be empty");
- assertEquals(metrics.size(), 1, "There should be one metric recorded");
+ // validate the last recorded value is 20L: Note that the validate method calls collectAllMetrics() which is
+ // equivalent to an export
validateLongPointDataFromGauge(inMemoryMetricReader, 20L, getBaseAttributes(), "test_metric_gauge", METRIC_PREFIX);
- // record another value and validate again
- metricEntityStateBaseGauge.record(30L);
- validateLongPointDataFromGauge(inMemoryMetricReader, 30L, getBaseAttributes(), "test_metric_gauge", METRIC_PREFIX);
+ if (exportLastRecordedValueForSynchronousGauge) {
+ // should be able to read the same value again after export
+ validateLongPointDataFromGauge(
+ inMemoryMetricReader,
+ 20L,
+ getBaseAttributes(),
+ "test_metric_gauge",
+ METRIC_PREFIX);
+ } else {
+ // should not be able to read the same value again after export
+ boolean readSameValueAgain = true;
+ try {
+ validateLongPointDataFromGauge(
+ inMemoryMetricReader,
+ 20L,
+ getBaseAttributes(),
+ "test_metric_gauge",
+ METRIC_PREFIX);
+ } catch (AssertionError e) {
+ // expected: the validation is supposed to fail on the second read
+ readSameValueAgain = false;
+ }
+ // fail() reports by throwing AssertionError, so it must run outside the try block above; inside it the
+ // catch would swallow the failure and this branch could never fail
+ if (readSameValueAgain) {
+ fail("Should not be able to read the same value again after export");
+ }
+ }
}
@Test