diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/VeniceMetricsConfig.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/VeniceMetricsConfig.java index f91a2fbebef..ba034a55c12 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/VeniceMetricsConfig.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/VeniceMetricsConfig.java @@ -1,9 +1,11 @@ package com.linkedin.venice.stats; import static com.linkedin.venice.stats.VeniceOpenTelemetryMetricNamingFormat.SNAKE_CASE; +import static io.opentelemetry.sdk.metrics.InstrumentType.GAUGE; import com.linkedin.venice.stats.metrics.MetricEntity; import io.opentelemetry.exporter.otlp.internal.OtlpConfigUtil; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector; import io.opentelemetry.sdk.metrics.export.MetricExporter; import io.opentelemetry.sdk.metrics.export.MetricReader; @@ -126,6 +128,16 @@ public class VeniceMetricsConfig { public static final String OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE = "otel.exporter.otlp.metrics.temporality.preference"; + /** + * Make Synchronous Gauge instruments export the last recorded value even if the instrument is not + * recorded in the current collection interval. Enabled (true) by default. + * - If true, GAUGE instruments always use CUMULATIVE aggregation temporality, overriding the configured selector. + * - If false, GAUGE instruments follow the configured {@link #otelAggregationTemporalitySelector}, which is + * {@link AggregationTemporalitySelector#deltaPreferred()} by default. + */ + public static final String OTEL_VENICE_EXPORT_LAST_RECORDED_VALUE_FOR_SYNCHRONOUS_GAUGE = + "otel.venice.export.last.recorded.value.for.synchronous.gauge"; + /** * Default histogram aggregation to be used for all histograms: Select one of the below
* 1. base2_exponential_bucket_histogram
@@ -208,6 +220,11 @@ public class VeniceMetricsConfig { /** Metric naming conventions for OpenTelemetry metrics */ private final VeniceOpenTelemetryMetricNamingFormat metricNamingFormat; + /** + * Whether to export the last recorded value for synchronous Gauge instruments. + */ + private final boolean exportLastRecordedValueForSynchronousGauge; + /** Aggregation Temporality selector to export only the delta or cumulate or different */ private final AggregationTemporalitySelector otelAggregationTemporalitySelector; @@ -234,6 +251,7 @@ private VeniceMetricsConfig(Builder builder) { this.otelHeaders = builder.otelHeaders; this.exportOtelMetricsToLog = builder.exportOtelMetricsToLog; this.metricNamingFormat = builder.metricNamingFormat; + this.exportLastRecordedValueForSynchronousGauge = builder.exportLastRecordedValueForSynchronousGauge; this.otelAggregationTemporalitySelector = builder.otelAggregationTemporalitySelector; this.useOtelExponentialHistogram = builder.useOtelExponentialHistogram; this.otelExponentialHistogramMaxScale = builder.otelExponentialHistogramMaxScale; @@ -257,6 +275,7 @@ public static class Builder { Map otelHeaders = new HashMap<>(); private boolean exportOtelMetricsToLog = false; private VeniceOpenTelemetryMetricNamingFormat metricNamingFormat = SNAKE_CASE; + private boolean exportLastRecordedValueForSynchronousGauge = true; private AggregationTemporalitySelector otelAggregationTemporalitySelector = AggregationTemporalitySelector.deltaPreferred(); private boolean useOtelExponentialHistogram = true; @@ -331,6 +350,11 @@ public Builder setMetricNamingFormat(String metricNamingFormat) { return this; } + public Builder setExportLastRecordedValueForSynchronousGauge(boolean exportLastRecordedValueForSynchronousGauge) { + this.exportLastRecordedValueForSynchronousGauge = exportLastRecordedValueForSynchronousGauge; + return this; + } + public Builder setOtelAggregationTemporalitySelector( AggregationTemporalitySelector 
otelAggregationTemporalitySelector) { this.otelAggregationTemporalitySelector = otelAggregationTemporalitySelector; @@ -439,6 +463,10 @@ public Builder extractAndSetOtelConfigs(Map configs) { otelHeaders.put(headers[0], headers[1]); } + if ((configValue = configs.get(OTEL_VENICE_EXPORT_LAST_RECORDED_VALUE_FOR_SYNCHRONOUS_GAUGE)) != null) { + setExportLastRecordedValueForSynchronousGauge(Boolean.parseBoolean(configValue)); + } + if ((configValue = configs.get(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE)) != null) { switch (configValue.toLowerCase(Locale.ROOT)) { case "cumulative": @@ -507,6 +535,12 @@ private void checkAndSetDefaults() { } else { LOGGER.warn("OpenTelemetry metrics are disabled"); } + + if (exportLastRecordedValueForSynchronousGauge) { + // Override the configured temporality selector to ensure synchronous gauges export last recorded value + otelAggregationTemporalitySelector = + getTemporalitySelector(exportLastRecordedValueForSynchronousGauge, otelAggregationTemporalitySelector); + } } public VeniceMetricsConfig build() { @@ -572,6 +606,10 @@ public VeniceOpenTelemetryMetricNamingFormat getMetricNamingFormat() { return metricNamingFormat; } + public boolean exportLastRecordedValueForSynchronousGauge() { + return exportLastRecordedValueForSynchronousGauge; + } + public AggregationTemporalitySelector getOtelAggregationTemporalitySelector() { return otelAggregationTemporalitySelector; } @@ -609,4 +647,23 @@ public String toString() { + otelExponentialHistogramMaxScale + ", otelExponentialHistogramMaxBuckets=" + otelExponentialHistogramMaxBuckets + ", tehutiMetricConfig=" + tehutiMetricConfig + '}'; } + + /** + * Custom AggregationTemporalitySelector which enforces that if the instrument type is + * GAUGE and {@link #exportLastRecordedValueForSynchronousGauge} is true, + * it returns CUMULATIVE for GAUGE type instruments. 
+ * This is to support Synchronous GAUGE exporting the last set value even when + * it is not set during the last export interval + * Check https://github.com/open-telemetry/opentelemetry-java/pull/7634 for more details + */ + public static AggregationTemporalitySelector getTemporalitySelector( + boolean exportLastRecordedValueForSynchronousGauge, + AggregationTemporalitySelector configuredTemporalitySelector) { + return instrumentType -> { + if (exportLastRecordedValueForSynchronousGauge && instrumentType == GAUGE) { + return AggregationTemporality.CUMULATIVE; + } + return configuredTemporalitySelector.getAggregationTemporality(instrumentType); + }; + } } diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/VeniceMetricsConfigTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/VeniceMetricsConfigTest.java index 70dcf0860ca..05d3d7cdd65 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/VeniceMetricsConfigTest.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/VeniceMetricsConfigTest.java @@ -6,6 +6,7 @@ import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_EXPORTER_OTLP_METRICS_ENDPOINT; import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_EXPORTER_OTLP_METRICS_PROTOCOL; import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE; +import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_VENICE_EXPORT_LAST_RECORDED_VALUE_FOR_SYNCHRONOUS_GAUGE; import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_VENICE_METRICS_CUSTOM_DIMENSIONS_MAP; import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_VENICE_METRICS_ENABLED; import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_VENICE_METRICS_EXPORT_TO_ENDPOINT; @@ -19,7 +20,10 @@ import static org.testng.Assert.assertTrue; import com.linkedin.venice.stats.VeniceMetricsConfig.Builder; +import 
com.linkedin.venice.utils.DataProviderUtils; import io.opentelemetry.exporter.otlp.internal.OtlpConfigUtil; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector; import io.tehuti.metrics.MetricConfig; import java.util.HashMap; @@ -46,7 +50,15 @@ public void testDefaultValuesWithBasicConfig() { assertTrue(config.getOtelHeaders().isEmpty()); assertFalse(config.exportOtelMetricsToLog()); assertEquals(config.getMetricNamingFormat(), SNAKE_CASE); - assertEquals(config.getOtelAggregationTemporalitySelector(), AggregationTemporalitySelector.deltaPreferred()); + assertEquals(config.exportLastRecordedValueForSynchronousGauge(), true); + AggregationTemporalitySelector defaultSelector = + VeniceMetricsConfig.getTemporalitySelector(true, AggregationTemporalitySelector.deltaPreferred()); + for (InstrumentType type: InstrumentType.values()) { + assertEquals( + config.getOtelAggregationTemporalitySelector().getAggregationTemporality(type), + defaultSelector.getAggregationTemporality(type), + type.name()); + } assertEquals(config.useOtelExponentialHistogram(), true); assertEquals(config.getOtelExponentialHistogramMaxScale(), 3); assertEquals(config.getOtelExponentialHistogramMaxBuckets(), 250); @@ -132,20 +144,50 @@ public void testEnableHttpGrpcEndpointConfigWithRequiredFields() { assertEquals(config.getOtelEndpoint(), "http://localhost"); } - @Test - public void testSetAggregationTemporalitySelector() { + @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class) + public void testSetAggregationTemporalitySelector(boolean useLastRecordedValueForSynchronousGauge) { Map otelConfigs = new HashMap<>(); otelConfigs.put(OTEL_VENICE_METRICS_ENABLED, "true"); otelConfigs.put(OTEL_VENICE_METRICS_EXPORT_TO_ENDPOINT, "true"); otelConfigs.put(OTEL_EXPORTER_OTLP_METRICS_PROTOCOL, OtlpConfigUtil.PROTOCOL_HTTP_PROTOBUF); 
otelConfigs.put(OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, "http://localhost"); + otelConfigs.put( + OTEL_VENICE_EXPORT_LAST_RECORDED_VALUE_FOR_SYNCHRONOUS_GAUGE, + Boolean.toString(useLastRecordedValueForSynchronousGauge)); otelConfigs.put(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, "delta"); VeniceMetricsConfig config = new Builder().setServiceName("TestService") .setMetricPrefix("TestPrefix") .extractAndSetOtelConfigs(otelConfigs) .build(); - assertEquals(config.getOtelAggregationTemporalitySelector(), AggregationTemporalitySelector.deltaPreferred()); + AggregationTemporalitySelector defaultSelector = VeniceMetricsConfig.getTemporalitySelector( + useLastRecordedValueForSynchronousGauge, + AggregationTemporalitySelector.deltaPreferred()); + for (InstrumentType type: InstrumentType.values()) { + assertEquals( + config.getOtelAggregationTemporalitySelector().getAggregationTemporality(type), + defaultSelector.getAggregationTemporality(type)); + } + } + + @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class) + public void testGetTemporalitySelector(boolean useLastRecordedValueForSynchronousGauge) { + AggregationTemporalitySelector[] selectors = + new AggregationTemporalitySelector[] { AggregationTemporalitySelector.deltaPreferred(), + AggregationTemporalitySelector.alwaysCumulative(), AggregationTemporalitySelector.lowMemory() }; + for (AggregationTemporalitySelector baseSelector: selectors) { + AggregationTemporalitySelector selector = + VeniceMetricsConfig.getTemporalitySelector(useLastRecordedValueForSynchronousGauge, baseSelector); + for (InstrumentType type: InstrumentType.values()) { + if (useLastRecordedValueForSynchronousGauge && type == InstrumentType.GAUGE) { + // If exportLastRecordedValue and type is GAUGE, must be CUMULATIVE + assertEquals(selector.getAggregationTemporality(type), AggregationTemporality.CUMULATIVE); + } else { + // Otherwise, should be same as baseSelector + 
assertEquals(selector.getAggregationTemporality(type), baseSelector.getAggregationTemporality(type)); + } + } + } } @Test(expectedExceptions = IllegalArgumentException.class) diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/metrics/MetricTypeTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/metrics/MetricTypeTest.java index c21967c3f14..07137905fba 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/metrics/MetricTypeTest.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/metrics/MetricTypeTest.java @@ -18,10 +18,12 @@ import com.linkedin.venice.utils.DataProviderUtils; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector; +import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.tehuti.metrics.MetricConfig; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -63,7 +65,7 @@ private static VeniceOpenTelemetryMetricsRepository createOtelRepo( VeniceMetricsConfig metricsConfig = new VeniceMetricsConfig.Builder().setEmitOtelMetrics(true) .setMetricPrefix(METRIC_PREFIX) .setOtelAdditionalMetricsReader(inMemoryMetricReader) - .setMetricEntities(Arrays.asList(metricEntity)) + .setMetricEntities(Collections.singletonList(metricEntity)) .setTehutiMetricConfig(new MetricConfig()) .build(); return new VeniceOpenTelemetryMetricsRepository(metricsConfig); @@ -169,29 +171,49 @@ public void testOTelRecordHistogram(boolean isExponentialHistogram) { } } - @Test - public void testOTelRecordGauge() { + @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class) + public void testOTelRecordGauge(boolean 
exportLastRecordedValueForSynchronousGauge) { MetricEntity metricEntityGauge = new MetricEntity( "test_metric_gauge", MetricType.GAUGE, MetricUnit.NUMBER, TEST_DESCRIPTION, getTestDimensions()); - InMemoryMetricReader inMemoryMetricReader = InMemoryMetricReader.create(); + InMemoryMetricReader inMemoryMetricReader = InMemoryMetricReader.create( + VeniceMetricsConfig.getTemporalitySelector( + exportLastRecordedValueForSynchronousGauge, + AggregationTemporalitySelector.deltaPreferred()), + DefaultAggregationSelector.getDefault()); VeniceOpenTelemetryMetricsRepository otelMetricsRepository = createOtelRepo(metricEntityGauge, inMemoryMetricReader); MetricEntityStateBase metricEntityStateBaseGauge = MetricEntityStateBase .create(metricEntityGauge, otelMetricsRepository, getBaseDimensionsMap(), getBaseAttributes()); metricEntityStateBaseGauge.record(10L); metricEntityStateBaseGauge.record(20L); - Collection metrics = inMemoryMetricReader.collectAllMetrics(); - assertFalse(metrics.isEmpty(), "Metrics should not be empty"); - assertEquals(metrics.size(), 1, "There should be one metric recorded"); + // validate the last recorded value is 20L: Note that the validate method calls collectAllMetrics() which is + // equivalent to an export validateLongPointDataFromGauge(inMemoryMetricReader, 20L, getBaseAttributes(), "test_metric_gauge", METRIC_PREFIX); - // record another value and validate again - metricEntityStateBaseGauge.record(30L); - validateLongPointDataFromGauge(inMemoryMetricReader, 30L, getBaseAttributes(), "test_metric_gauge", METRIC_PREFIX); + if (exportLastRecordedValueForSynchronousGauge) { + // should be able to read the same value again after export + validateLongPointDataFromGauge( + inMemoryMetricReader, + 20L, + getBaseAttributes(), + "test_metric_gauge", + METRIC_PREFIX); + } else { + // should not be able to read the same value again after export. 
assertThrows is used instead of + // try/fail()/catch because fail() throws AssertionError, which such a catch block would swallow + org.testng.Assert.assertThrows( + AssertionError.class, + () -> validateLongPointDataFromGauge( + inMemoryMetricReader, + 20L, + getBaseAttributes(), 
+ "test_metric_gauge", + METRIC_PREFIX)); + } } @Test