diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramAggregationParam.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramAggregationParam.java index 162794f52d8..629d2671abd 100644 --- a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramAggregationParam.java +++ b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramAggregationParam.java @@ -17,27 +17,27 @@ public enum HistogramAggregationParam { new DoubleExplicitBucketHistogramAggregator( ExplicitBucketHistogramUtils.createBoundaryArray( ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES), - ExemplarReservoir::doubleNoSamples, + ExemplarReservoir::noSamples, IMMUTABLE_DATA)), EXPLICIT_SINGLE_BUCKET( new DoubleExplicitBucketHistogramAggregator( ExplicitBucketHistogramUtils.createBoundaryArray(Collections.emptyList()), - ExemplarReservoir::doubleNoSamples, + ExemplarReservoir::noSamples, IMMUTABLE_DATA)), EXPONENTIAL_SMALL_CIRCULAR_BUFFER( new DoubleBase2ExponentialHistogramAggregator( - ExemplarReservoir::doubleNoSamples, 20, 0, IMMUTABLE_DATA)), + ExemplarReservoir::noSamples, 20, 0, IMMUTABLE_DATA)), EXPONENTIAL_CIRCULAR_BUFFER( new DoubleBase2ExponentialHistogramAggregator( - ExemplarReservoir::doubleNoSamples, 160, 0, IMMUTABLE_DATA)); + ExemplarReservoir::noSamples, 160, 0, IMMUTABLE_DATA)); - private final Aggregator aggregator; + private final Aggregator aggregator; - HistogramAggregationParam(Aggregator aggregator) { + HistogramAggregationParam(Aggregator aggregator) { this.aggregator = aggregator; } - public Aggregator getAggregator() { + public Aggregator getAggregator() { return this.aggregator; } } diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramBenchmark.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramBenchmark.java index a86bf733259..5379779a17c 100644 --- a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramBenchmark.java +++ b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramBenchmark.java @@ -33,7 +33,7 @@ public class HistogramBenchmark { public static class ThreadState { @Param HistogramValueGenerator valueGen; @Param HistogramAggregationParam aggregation; - private AggregatorHandle aggregatorHandle; + private AggregatorHandle aggregatorHandle; private DoubleSupplier valueSupplier; @Setup(Level.Trial) diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramScaleBenchmark.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramScaleBenchmark.java index ab2bc6b57c7..f3a95befb16 100644 --- a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramScaleBenchmark.java +++ b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramScaleBenchmark.java @@ -39,7 +39,7 @@ public class HistogramScaleBenchmark { public static class ThreadState { @Param HistogramValueGenerator valueGen; @Param HistogramAggregationParam aggregation; - private AggregatorHandle aggregatorHandle; + private AggregatorHandle aggregatorHandle; private DoubleSupplier valueSupplier; @Setup(Level.Invocation) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java index b31b3721428..525506725b3 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java @@ -304,7 +304,7 @@ WriteableMetricStorage registerSynchronousMetricStorage(InstrumentDescriptor ins /** Register new asynchronous storage associated with a given instrument. */ SdkObservableMeasurement registerObservableMeasurement( InstrumentDescriptor instrumentDescriptor) { - List> registeredStorages = new ArrayList<>(); + List> registeredStorages = new ArrayList<>(); for (Map.Entry entry : readerStorageRegistries.entrySet()) { RegisteredReader reader = entry.getKey(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AbstractSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AbstractSumAggregator.java index 612219a2836..a7574254408 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AbstractSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AbstractSumAggregator.java @@ -11,7 +11,7 @@ import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; abstract class AbstractSumAggregator - implements Aggregator { + implements Aggregator { private final boolean isMonotonic; AbstractSumAggregator(InstrumentDescriptor instrumentDescriptor) { diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java index db167bab5b4..79492f61c66 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java @@ -7,8 +7,6 @@ import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; -import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; -import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.MetricDataType; import io.opentelemetry.sdk.metrics.data.PointData; @@ -25,9 +23,9 @@ * at any time. */ @Immutable -public interface Aggregator { +public interface Aggregator { /** Returns the drop aggregator, an aggregator that drops measurements. */ - static Aggregator drop() { + static Aggregator drop() { return DropAggregator.INSTANCE; } @@ -37,7 +35,7 @@ static Aggregator drop() { * * @return a new {@link AggregatorHandle}. */ - AggregatorHandle createHandle(); + AggregatorHandle createHandle(); /** * Returns a new DELTA point by computing the difference between two cumulative points. diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorFactory.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorFactory.java index 071df0fca81..8c5941acba3 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorFactory.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorFactory.java @@ -6,7 +6,6 @@ package io.opentelemetry.sdk.metrics.internal.aggregator; import io.opentelemetry.sdk.common.export.MemoryMode; -import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; @@ -31,7 +30,7 @@ public interface AggregatorFactory { * @return a new {@link Aggregator}. {@link Aggregator#drop()} indicates no measurements should be * recorded. */ - Aggregator createAggregator( + Aggregator createAggregator( InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter, MemoryMode memoryMode); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java index 2bf7c8c4db6..c954136c973 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java @@ -7,7 +7,8 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; -import io.opentelemetry.sdk.metrics.data.ExemplarData; +import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; +import io.opentelemetry.sdk.metrics.data.LongExemplarData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; import java.util.List; @@ -24,14 +25,16 @@ * at any time. */ @ThreadSafe -public abstract class AggregatorHandle { +public abstract class AggregatorHandle { // A reservoir of sampled exemplars for this time period. - private final ExemplarReservoir exemplarReservoir; + private final ExemplarReservoir exemplarReservoir; private volatile boolean valuesRecorded = false; + private final boolean isDoubleType; - protected AggregatorHandle(ExemplarReservoir exemplarReservoir) { + protected AggregatorHandle(ExemplarReservoir exemplarReservoir) { this.exemplarReservoir = exemplarReservoir; + this.isDoubleType = isDoubleType(); } /** @@ -44,21 +47,54 @@ public final T aggregateThenMaybeReset( valuesRecorded = false; } - return doAggregateThenMaybeReset( + if (isDoubleType) { + return doAggregateThenMaybeResetDoubles( + startEpochNanos, + epochNanos, + attributes, + exemplarReservoir.collectAndResetDoubles(attributes), + reset); + } + return doAggregateThenMaybeResetLongs( startEpochNanos, epochNanos, attributes, - exemplarReservoir.collectAndReset(attributes), + exemplarReservoir.collectAndResetLongs(attributes), reset); } + /** + * Indicates whether this {@link AggregatorHandle} supports double or long values. + * + *

If it supports doubles, it MUST implement {@link #doAggregateThenMaybeResetDoubles(long, + * long, Attributes, List, boolean)} and {@link #doRecordDouble(double)}. + * + *

If it supports long, it MUST implement {@link #doAggregateThenMaybeResetLongs(long, long, + * Attributes, List, boolean)} and {@link #doRecordLong(long)}. + * + * @return true if it supports doubles, false if it supports longs. + */ + protected abstract boolean isDoubleType(); + + /** Implementation of the {@link #aggregateThenMaybeReset(long, long, Attributes, boolean)} . */ + protected T doAggregateThenMaybeResetDoubles( + long startEpochNanos, + long epochNanos, + Attributes attributes, + List exemplars, + boolean reset) { + throw new UnsupportedOperationException("This aggregator does not support double values."); + } + /** Implementation of the {@link #aggregateThenMaybeReset(long, long, Attributes, boolean)} . */ - protected abstract T doAggregateThenMaybeReset( + protected T doAggregateThenMaybeResetLongs( long startEpochNanos, long epochNanos, Attributes attributes, - List exemplars, - boolean reset); + List exemplars, + boolean reset) { + throw new UnsupportedOperationException("This aggregator does not support long values."); + } public final void recordLong(long value, Attributes attributes, Context context) { exemplarReservoir.offerLongMeasurement(value, attributes, context); @@ -82,8 +118,7 @@ public final void recordLong(long value) { * values. */ protected void doRecordLong(long value) { - throw new UnsupportedOperationException( - "This aggregator does not support recording long values."); + throw new UnsupportedOperationException("This aggregator does not support long values."); } public final void recordDouble(double value, Attributes attributes, Context context) { @@ -108,8 +143,7 @@ public final void recordDouble(double value) { * double values. */ protected void doRecordDouble(double value) { - throw new UnsupportedOperationException( - "This aggregator does not support recording double values."); + throw new UnsupportedOperationException("This aggregator does not support double values."); } /** diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregator.java index 18b5b60ee4b..cc9556fa9d1 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregator.java @@ -35,9 +35,9 @@ * at any time. */ public final class DoubleBase2ExponentialHistogramAggregator - implements Aggregator { + implements Aggregator { - private final Supplier> reservoirSupplier; + private final Supplier reservoirSupplier; private final int maxBuckets; private final int maxScale; private final MemoryMode memoryMode; @@ -48,7 +48,7 @@ public final class DoubleBase2ExponentialHistogramAggregator * @param reservoirSupplier Supplier of exemplar reservoirs per-stream. */ public DoubleBase2ExponentialHistogramAggregator( - Supplier> reservoirSupplier, + Supplier reservoirSupplier, int maxBuckets, int maxScale, MemoryMode memoryMode) { @@ -59,7 +59,7 @@ public DoubleBase2ExponentialHistogramAggregator( } @Override - public AggregatorHandle createHandle() { + public AggregatorHandle createHandle() { return new Handle(reservoirSupplier.get(), maxBuckets, maxScale, memoryMode); } @@ -79,8 +79,7 @@ public MetricData toMetricData( ImmutableExponentialHistogramData.create(temporality, points)); } - static final class Handle - extends AggregatorHandle { + static final class Handle extends AggregatorHandle { private final int maxBuckets; private final int maxScale; @Nullable private DoubleBase2ExponentialHistogramBuckets positiveBuckets; @@ -96,11 +95,7 @@ static final class Handle // Used only when MemoryMode = REUSABLE_DATA @Nullable private final MutableExponentialHistogramPointData reusablePoint; - Handle( - ExemplarReservoir reservoir, - int maxBuckets, - int maxScale, - MemoryMode memoryMode) { + Handle(ExemplarReservoir reservoir, int maxBuckets, int maxScale, MemoryMode memoryMode) { super(reservoir); this.maxBuckets = maxBuckets; this.maxScale = maxScale; @@ -118,7 +113,7 @@ static final class Handle } @Override - protected synchronized ExponentialHistogramPointData doAggregateThenMaybeReset( + protected synchronized ExponentialHistogramPointData doAggregateThenMaybeResetDoubles( long startEpochNanos, long epochNanos, Attributes attributes, @@ -261,6 +256,11 @@ protected synchronized void doRecordDouble(double value) { } } + @Override + protected boolean isDoubleType() { + return true; + } + @Override protected void doRecordLong(long value) { doRecordDouble((double) value); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java index 82d71f25313..d4ccdabe0b6 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java @@ -36,14 +36,14 @@ * at any time. */ public final class DoubleExplicitBucketHistogramAggregator - implements Aggregator { + implements Aggregator { private final double[] boundaries; private final MemoryMode memoryMode; // a cache for converting to MetricData private final List boundaryList; - private final Supplier> reservoirSupplier; + private final Supplier reservoirSupplier; /** * Constructs an explicit bucket histogram aggregator. @@ -53,9 +53,7 @@ public final class DoubleExplicitBucketHistogramAggregator * @param memoryMode The {@link MemoryMode} to use in this aggregator. */ public DoubleExplicitBucketHistogramAggregator( - double[] boundaries, - Supplier> reservoirSupplier, - MemoryMode memoryMode) { + double[] boundaries, Supplier reservoirSupplier, MemoryMode memoryMode) { this.boundaries = boundaries; this.memoryMode = memoryMode; @@ -68,7 +66,7 @@ public DoubleExplicitBucketHistogramAggregator( } @Override - public AggregatorHandle createHandle() { + public AggregatorHandle createHandle() { return new Handle(this.boundaryList, this.boundaries, reservoirSupplier.get(), memoryMode); } @@ -88,7 +86,7 @@ public MetricData toMetricData( ImmutableHistogramData.create(temporality, pointData)); } - static final class Handle extends AggregatorHandle { + static final class Handle extends AggregatorHandle { // read-only private final List boundaryList; // read-only @@ -117,7 +115,7 @@ static final class Handle extends AggregatorHandle boundaryList, double[] boundaries, - ExemplarReservoir reservoir, + ExemplarReservoir reservoir, MemoryMode memoryMode) { super(reservoir); this.boundaryList = boundaryList; @@ -133,7 +131,12 @@ static final class Handle extends AggregatorHandle { - private final Supplier> reservoirSupplier; +public final class DoubleLastValueAggregator implements Aggregator { + private final Supplier reservoirSupplier; private final MemoryMode memoryMode; public DoubleLastValueAggregator( - Supplier> reservoirSupplier, MemoryMode memoryMode) { + Supplier reservoirSupplier, MemoryMode memoryMode) { this.reservoirSupplier = reservoirSupplier; this.memoryMode = memoryMode; } @Override - public AggregatorHandle createHandle() { + public AggregatorHandle createHandle() { return new Handle(reservoirSupplier.get(), memoryMode); } @@ -94,14 +93,14 @@ public MetricData toMetricData( ImmutableGaugeData.create(points)); } - static final class Handle extends AggregatorHandle { + static final class Handle extends AggregatorHandle { private final AtomicReference current = new AtomicReference<>(null); private final AtomicLong valueBits = new AtomicLong(); // Only used when memoryMode is REUSABLE_DATA @Nullable private final MutableDoublePointData reusablePoint; - private Handle(ExemplarReservoir reservoir, MemoryMode memoryMode) { + private Handle(ExemplarReservoir reservoir, MemoryMode memoryMode) { super(reservoir); if (memoryMode == MemoryMode.REUSABLE_DATA) { reusablePoint = new MutableDoublePointData(); @@ -111,7 +110,7 @@ private Handle(ExemplarReservoir reservoir, MemoryMode memor } @Override - protected DoublePointData doAggregateThenMaybeReset( + protected DoublePointData doAggregateThenMaybeResetDoubles( long startEpochNanos, long epochNanos, Attributes attributes, @@ -129,6 +128,11 @@ protected DoublePointData doAggregateThenMaybeReset( } } + @Override + protected boolean isDoubleType() { + return true; + } + @Override protected void doRecordDouble(double value) { valueBits.set(Double.doubleToLongBits(value)); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java index c0701d7e0ec..110e248abd9 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java @@ -35,7 +35,7 @@ */ public final class DoubleSumAggregator extends AbstractSumAggregator { - private final Supplier> reservoirSupplier; + private final Supplier reservoirSupplier; private final MemoryMode memoryMode; /** @@ -47,7 +47,7 @@ public final class DoubleSumAggregator */ public DoubleSumAggregator( InstrumentDescriptor instrumentDescriptor, - Supplier> reservoirSupplier, + Supplier reservoirSupplier, MemoryMode memoryMode) { super(instrumentDescriptor); @@ -56,7 +56,7 @@ public DoubleSumAggregator( } @Override - public AggregatorHandle createHandle() { + public AggregatorHandle createHandle() { return new Handle(reservoirSupplier.get(), memoryMode); } @@ -107,19 +107,24 @@ public MetricData toMetricData( ImmutableSumData.create(isMonotonic(), temporality, points)); } - static final class Handle extends AggregatorHandle { + static final class Handle extends AggregatorHandle { private final DoubleAdder current = AdderUtil.createDoubleAdder(); // Only used if memoryMode == MemoryMode.REUSABLE_DATA @Nullable private final MutableDoublePointData reusablePoint; - Handle(ExemplarReservoir exemplarReservoir, MemoryMode memoryMode) { + Handle(ExemplarReservoir exemplarReservoir, MemoryMode memoryMode) { super(exemplarReservoir); reusablePoint = memoryMode == MemoryMode.REUSABLE_DATA ? new MutableDoublePointData() : null; } @Override - protected DoublePointData doAggregateThenMaybeReset( + protected boolean isDoubleType() { + return true; + } + + @Override + protected DoublePointData doAggregateThenMaybeResetDoubles( long startEpochNanos, long epochNanos, Attributes attributes, diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java index 28336b8fb4a..9d47b8967d7 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java @@ -25,7 +25,7 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class DropAggregator implements Aggregator { +public final class DropAggregator implements Aggregator { private static final PointData POINT_DATA = new PointData() { @@ -50,12 +50,17 @@ public List getExemplars() { } }; - public static final Aggregator INSTANCE = new DropAggregator(); + public static final Aggregator INSTANCE = new DropAggregator(); - private static final AggregatorHandle HANDLE = - new AggregatorHandle(ExemplarReservoir.doubleNoSamples()) { + private static final AggregatorHandle HANDLE = + new AggregatorHandle(ExemplarReservoir.noSamples()) { @Override - protected PointData doAggregateThenMaybeReset( + protected boolean isDoubleType() { + return true; + } + + @Override + protected PointData doAggregateThenMaybeResetDoubles( long startEpochNanos, long epochNanos, Attributes attributes, @@ -74,7 +79,7 @@ protected void doRecordDouble(double value) {} private DropAggregator() {} @Override - public AggregatorHandle createHandle() { + public AggregatorHandle createHandle() { return HANDLE; } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java index e5a8d67682d..ee0202d976a 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java @@ -37,18 +37,18 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class LongLastValueAggregator implements Aggregator { - private final Supplier> reservoirSupplier; +public final class LongLastValueAggregator implements Aggregator { + private final Supplier reservoirSupplier; private final MemoryMode memoryMode; public LongLastValueAggregator( - Supplier> reservoirSupplier, MemoryMode memoryMode) { + Supplier reservoirSupplier, MemoryMode memoryMode) { this.reservoirSupplier = reservoirSupplier; this.memoryMode = memoryMode; } @Override - public AggregatorHandle createHandle() { + public AggregatorHandle createHandle() { return new Handle(reservoirSupplier.get(), memoryMode); } @@ -89,14 +89,14 @@ public MetricData toMetricData( ImmutableGaugeData.create(points)); } - static final class Handle extends AggregatorHandle { + static final class Handle extends AggregatorHandle { @Nullable private static final Long DEFAULT_VALUE = null; private final AtomicReference current = new AtomicReference<>(DEFAULT_VALUE); // Only used when memoryMode is REUSABLE_DATA @Nullable private final MutableLongPointData reusablePoint; - Handle(ExemplarReservoir exemplarReservoir, MemoryMode memoryMode) { + Handle(ExemplarReservoir exemplarReservoir, MemoryMode memoryMode) { super(exemplarReservoir); if (memoryMode == MemoryMode.REUSABLE_DATA) { reusablePoint = new MutableLongPointData(); @@ -106,7 +106,12 @@ static final class Handle extends AggregatorHandle { - private final Supplier> reservoirSupplier; + private final Supplier reservoirSupplier; private final MemoryMode memoryMode; public LongSumAggregator( InstrumentDescriptor instrumentDescriptor, - Supplier> reservoirSupplier, + Supplier reservoirSupplier, MemoryMode memoryMode) { super(instrumentDescriptor); this.reservoirSupplier = reservoirSupplier; @@ -49,7 +49,7 @@ public LongSumAggregator( } @Override - public AggregatorHandle createHandle() { + public AggregatorHandle createHandle() { return new Handle(reservoirSupplier.get(), memoryMode); } @@ -100,20 +100,20 @@ public MetricData toMetricData( ImmutableSumData.create(isMonotonic(), temporality, points)); } - static final class Handle extends AggregatorHandle { + static final class Handle extends AggregatorHandle { private final LongAdder current = AdderUtil.createLongAdder(); // Only used if memoryMode == MemoryMode.REUSABLE_DATA @Nullable private final MutableLongPointData reusablePointData; - Handle(ExemplarReservoir exemplarReservoir, MemoryMode memoryMode) { + Handle(ExemplarReservoir exemplarReservoir, MemoryMode memoryMode) { super(exemplarReservoir); reusablePointData = memoryMode == MemoryMode.REUSABLE_DATA ? new MutableLongPointData() : null; } @Override - protected LongPointData doAggregateThenMaybeReset( + protected LongPointData doAggregateThenMaybeResetLongs( long startEpochNanos, long epochNanos, Attributes attributes, @@ -129,6 +129,11 @@ protected LongPointData doAggregateThenMaybeReset( } } + @Override + protected boolean isDoubleType() { + return false; + } + @Override public void doRecordLong(long value) { current.add(value); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoir.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoir.java index c0de6e0df6a..c29d159409f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoir.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoir.java @@ -24,59 +24,42 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public interface ExemplarReservoir { +public interface ExemplarReservoir { /** * Wraps an {@link ExemplarReservoir}, casting calls from {@link * ExemplarReservoir#offerLongMeasurement(long, Attributes, Context)} to {@link * ExemplarReservoir#offerDoubleMeasurement(double, Attributes, Context)} such that {@link - * ExemplarReservoir#collectAndReset(Attributes)} only returns {@link DoubleExemplarData}. + * ExemplarReservoir#collectAndResetDoubles(Attributes)} only returns {@link DoubleExemplarData} + * and {@link ExemplarReservoir#collectAndResetLongs(Attributes)} throws. * *

This is used for {@link Aggregation#explicitBucketHistogram()} and {@link * Aggregation#base2ExponentialBucketHistogram()} which only support double measurements. */ - static ExemplarReservoir longToDouble(ExemplarReservoir delegate) { - return new LongToDoubleExemplarReservoir<>(delegate); + static ExemplarReservoir longToDouble(ExemplarReservoir delegate) { + return new LongToDoubleExemplarReservoir(delegate); } /** Wraps a {@link ExemplarReservoir} with a measurement pre-filter. */ - static ExemplarReservoir filtered( - ExemplarFilter filter, ExemplarReservoir original) { - return new FilteredExemplarReservoir<>(filter, original); + static ExemplarReservoir filtered(ExemplarFilter filter, ExemplarReservoir original) { + return new FilteredExemplarReservoir(filter, original); } - /** A double exemplar reservoir that stores no exemplars. */ - static ExemplarReservoir doubleNoSamples() { - return NoopExemplarReservoir.DOUBLE_INSTANCE; - } - - /** A long exemplar reservoir that stores no exemplars. */ - static ExemplarReservoir longNoSamples() { - return NoopExemplarReservoir.LONG_INSTANCE; - } - - /** - * A double reservoir with fixed size that stores the given number of exemplars. - * - * @param clock The clock to use when annotating measurements with time. - * @param size The maximum number of exemplars to preserve. - * @param randomSupplier The random number generator to use for sampling. - */ - static ExemplarReservoir doubleFixedSizeReservoir( - Clock clock, int size, Supplier randomSupplier) { - return RandomFixedSizeExemplarReservoir.createDouble(clock, size, randomSupplier); + /** An exemplar reservoir that stores no exemplars. */ + static ExemplarReservoir noSamples() { + return NoopExemplarReservoir.INSTANCE; } /** - * A long reservoir with fixed size that stores the given number of exemplars. + * A reservoir with fixed size that stores the given number of exemplars. * * @param clock The clock to use when annotating measurements with time. * @param size The maximum number of exemplars to preserve. * @param randomSupplier The random number generator to use for sampling. */ - static ExemplarReservoir longFixedSizeReservoir( + static ExemplarReservoir fixedSizeReservoir( Clock clock, int size, Supplier randomSupplier) { - return RandomFixedSizeExemplarReservoir.createLong(clock, size, randomSupplier); + return RandomFixedSizeExemplarReservoir.create(clock, size, randomSupplier); } /** @@ -86,8 +69,7 @@ static ExemplarReservoir longFixedSizeReservoir( * @param boundaries A list of (inclusive) upper bounds for the histogram. Should be in order from * lowest to highest. */ - static ExemplarReservoir histogramBucketReservoir( - Clock clock, List boundaries) { + static ExemplarReservoir histogramBucketReservoir(Clock clock, List boundaries) { return new HistogramExemplarReservoir(clock, boundaries); } @@ -107,5 +89,7 @@ static ExemplarReservoir histogramBucketReservoir( * @return An (immutable) list of sampled exemplars for this point. Implementers are expected to * filter out {@code pointAttributes} from the original recorded attributes. */ - List collectAndReset(Attributes pointAttributes); + List collectAndResetDoubles(Attributes pointAttributes); + + List collectAndResetLongs(Attributes pointAttributes); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/FilteredExemplarReservoir.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/FilteredExemplarReservoir.java index 17415f9d06e..4db9ce05d6f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/FilteredExemplarReservoir.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/FilteredExemplarReservoir.java @@ -7,15 +7,16 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; -import io.opentelemetry.sdk.metrics.data.ExemplarData; +import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; +import io.opentelemetry.sdk.metrics.data.LongExemplarData; import java.util.List; /** A reservoir that has a pre-filter on measurements. */ -class FilteredExemplarReservoir implements ExemplarReservoir { +class FilteredExemplarReservoir implements ExemplarReservoir { private final ExemplarFilter filter; - private final ExemplarReservoir reservoir; + private final ExemplarReservoir reservoir; - FilteredExemplarReservoir(ExemplarFilter filter, ExemplarReservoir reservoir) { + FilteredExemplarReservoir(ExemplarFilter filter, ExemplarReservoir reservoir) { this.filter = filter; this.reservoir = reservoir; } @@ -35,7 +36,12 @@ public void offerLongMeasurement(long value, Attributes attributes, Context cont } @Override - public List collectAndReset(Attributes pointAttributes) { - return reservoir.collectAndReset(pointAttributes); + public List collectAndResetDoubles(Attributes pointAttributes) { + return reservoir.collectAndResetDoubles(pointAttributes); + } + + @Override + public List collectAndResetLongs(Attributes pointAttributes) { + return reservoir.collectAndResetLongs(pointAttributes); } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/FixedSizeExemplarReservoir.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/FixedSizeExemplarReservoir.java index 20254de5991..7d1e442f539 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/FixedSizeExemplarReservoir.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/FixedSizeExemplarReservoir.java @@ -8,7 +8,9 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.Clock; +import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; import io.opentelemetry.sdk.metrics.data.ExemplarData; +import io.opentelemetry.sdk.metrics.data.LongExemplarData; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -16,26 +18,20 @@ import javax.annotation.Nullable; /** Base for fixed-size reservoir sampling of Exemplars. */ -abstract class FixedSizeExemplarReservoir implements ExemplarReservoir { +class FixedSizeExemplarReservoir implements ExemplarReservoir { @Nullable private ReservoirCell[] storage; private final ReservoirCellSelector reservoirCellSelector; - private final BiFunction mapAndResetCell; private final int size; private final Clock clock; private volatile boolean hasMeasurements = false; /** Instantiates an exemplar reservoir of fixed size. */ - FixedSizeExemplarReservoir( - Clock clock, - int size, - ReservoirCellSelector reservoirCellSelector, - BiFunction mapAndResetCell) { + FixedSizeExemplarReservoir(Clock clock, int size, ReservoirCellSelector reservoirCellSelector) { this.storage = null; // lazily initialize to avoid allocations this.size = size; this.clock = clock; this.reservoirCellSelector = reservoirCellSelector; - this.mapAndResetCell = mapAndResetCell; } @Override @@ -50,6 +46,16 @@ public void offerLongMeasurement(long value, Attributes attributes, Context cont } } + @Override + public List collectAndResetDoubles(Attributes pointAttributes) { + return doCollectAndReset(pointAttributes, ReservoirCell::getAndResetDouble); + } + + @Override + public List collectAndResetLongs(Attributes pointAttributes) { + return doCollectAndReset(pointAttributes, ReservoirCell::getAndResetLong); + } + @Override public void offerDoubleMeasurement(double value, Attributes attributes, Context context) { if (storage == null) { @@ -70,8 +76,8 @@ private ReservoirCell[] initStorage() { return storage; } - @Override - public List collectAndReset(Attributes pointAttributes) { + public List doCollectAndReset( + Attributes pointAttributes, BiFunction mapAndResetCell) { if (!hasMeasurements || storage == null) { return Collections.emptyList(); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/HistogramExemplarReservoir.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/HistogramExemplarReservoir.java index ac06e64058e..8b5ae4f97e0 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/HistogramExemplarReservoir.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/HistogramExemplarReservoir.java @@ -8,19 +8,14 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.Clock; -import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; import io.opentelemetry.sdk.metrics.internal.aggregator.ExplicitBucketHistogramUtils; import java.util.List; /** A reservoir that records the latest measurement for each histogram bucket. */ -class HistogramExemplarReservoir extends FixedSizeExemplarReservoir { +class HistogramExemplarReservoir extends FixedSizeExemplarReservoir { HistogramExemplarReservoir(Clock clock, List boundaries) { - super( - clock, - boundaries.size() + 1, - new HistogramCellSelector(boundaries), - ReservoirCell::getAndResetDouble); + super(clock, boundaries.size() + 1, new HistogramCellSelector(boundaries)); } @Override diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/LongToDoubleExemplarReservoir.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/LongToDoubleExemplarReservoir.java index c3a6b98fced..afec03ebaa7 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/LongToDoubleExemplarReservoir.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/LongToDoubleExemplarReservoir.java @@ -7,14 +7,15 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; -import io.opentelemetry.sdk.metrics.data.ExemplarData; +import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; +import io.opentelemetry.sdk.metrics.data.LongExemplarData; import java.util.List; -class LongToDoubleExemplarReservoir implements ExemplarReservoir { +class LongToDoubleExemplarReservoir implements ExemplarReservoir { - private final ExemplarReservoir delegate; + private final ExemplarReservoir delegate; - LongToDoubleExemplarReservoir(ExemplarReservoir delegate) { + LongToDoubleExemplarReservoir(ExemplarReservoir delegate) { this.delegate = delegate; } @@ -29,7 +30,13 @@ public void offerLongMeasurement(long value, Attributes attributes, Context cont } @Override - public List collectAndReset(Attributes pointAttributes) { - return delegate.collectAndReset(pointAttributes); + public List collectAndResetDoubles(Attributes pointAttributes) { + return delegate.collectAndResetDoubles(pointAttributes); + } + + @Override + public List collectAndResetLongs(Attributes pointAttributes) { + throw new UnsupportedOperationException( + "This exemplar reservoir does not support collecting long values."); } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/NoopExemplarReservoir.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/NoopExemplarReservoir.java index ec08dd7db42..4deef5f2ba4 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/NoopExemplarReservoir.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/NoopExemplarReservoir.java @@ -8,18 +8,14 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; -import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.LongExemplarData; import java.util.Collections; import java.util.List; /** A reservoir that keeps no exemplars. */ -class NoopExemplarReservoir implements ExemplarReservoir { +class NoopExemplarReservoir implements ExemplarReservoir { - static final NoopExemplarReservoir LONG_INSTANCE = - new NoopExemplarReservoir<>(); - static final NoopExemplarReservoir DOUBLE_INSTANCE = - new NoopExemplarReservoir<>(); + static final NoopExemplarReservoir INSTANCE = new NoopExemplarReservoir(); private NoopExemplarReservoir() {} @@ -34,7 +30,12 @@ public void offerLongMeasurement(long value, Attributes attributes, Context cont } @Override - public List collectAndReset(Attributes pointAttributes) { + public List collectAndResetDoubles(Attributes pointAttributes) { + return Collections.emptyList(); + } + + @Override + public List collectAndResetLongs(Attributes pointAttributes) { return Collections.emptyList(); } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/RandomFixedSizeExemplarReservoir.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/RandomFixedSizeExemplarReservoir.java index 5880394b37f..86e0c6a0ec4 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/RandomFixedSizeExemplarReservoir.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/RandomFixedSizeExemplarReservoir.java @@ -8,13 +8,9 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.Clock; -import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; -import io.opentelemetry.sdk.metrics.data.ExemplarData; -import io.opentelemetry.sdk.metrics.data.LongExemplarData; import io.opentelemetry.sdk.metrics.internal.concurrent.AdderUtil; import io.opentelemetry.sdk.metrics.internal.concurrent.LongAdder; import java.util.Random; -import java.util.function.BiFunction; import java.util.function.Supplier; /** @@ -22,30 +18,19 @@ * where the probability of sampling decrease as the number of observations continue. * *

When measurements are collected via {@link - * FixedSizeExemplarReservoir#collectAndReset(Attributes)}, the observation count is reset, making - * the probability of samplings effectively 1.0. + * FixedSizeExemplarReservoir#collectAndResetDoubles(Attributes)} (Attributes)} and {@link + * FixedSizeExemplarReservoir#collectAndResetLongs(Attributes)}, the observation count is reset, + * making the probability of samplings effectively 1.0. */ -class RandomFixedSizeExemplarReservoir - extends FixedSizeExemplarReservoir { +class RandomFixedSizeExemplarReservoir extends FixedSizeExemplarReservoir { - private RandomFixedSizeExemplarReservoir( - Clock clock, - int size, - Supplier randomSupplier, - BiFunction mapAndResetCell) { - super(clock, size, new RandomCellSelector(randomSupplier), mapAndResetCell); + private RandomFixedSizeExemplarReservoir(Clock clock, int size, Supplier randomSupplier) { + super(clock, size, new RandomCellSelector(randomSupplier)); } - static RandomFixedSizeExemplarReservoir createLong( + static RandomFixedSizeExemplarReservoir create( Clock clock, int size, Supplier randomSupplier) { - return new RandomFixedSizeExemplarReservoir<>( - clock, size, randomSupplier, ReservoirCell::getAndResetLong); - } - - static RandomFixedSizeExemplarReservoir createDouble( - Clock clock, int size, Supplier randomSupplier) { - return new RandomFixedSizeExemplarReservoir<>( - clock, size, randomSupplier, ReservoirCell::getAndResetDouble); + return new RandomFixedSizeExemplarReservoir(clock, size, randomSupplier); } static class RandomCellSelector implements ReservoirCellSelector { diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 6463226f8bc..62a686e35b2 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -16,7 +16,6 @@ import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.metrics.View; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; -import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; @@ -46,15 +45,14 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class AsynchronousMetricStorage - implements MetricStorage { +public final class AsynchronousMetricStorage implements MetricStorage { private static final Logger logger = Logger.getLogger(AsynchronousMetricStorage.class.getName()); private final ThrottlingLogger throttlingLogger = new ThrottlingLogger(logger); private final RegisteredReader registeredReader; private final MetricDescriptor metricDescriptor; private final AggregationTemporality aggregationTemporality; - private final Aggregator aggregator; + private final Aggregator aggregator; private final AttributesProcessor attributesProcessor; private final MemoryMode memoryMode; @@ -65,16 +63,16 @@ public final class AsynchronousMetricStorage> aggregatorHandles; + private final Map> aggregatorHandles; // Only populated if aggregationTemporality == DELTA private Map lastPoints; // Only populated if memoryMode == REUSABLE_DATA private final ObjectPool reusablePointsPool; - private final ObjectPool> reusableHandlesPool; - private final Function> handleBuilder; - private final BiConsumer> handleReleaser; + private final ObjectPool> reusableHandlesPool; + private final Function> handleBuilder; + private final BiConsumer> handleReleaser; private final BiConsumer pointReleaser; private final List reusablePointsList = new ArrayList<>(); @@ -92,7 +90,7 @@ public final class AsynchronousMetricStorage aggregator, + Aggregator aggregator, AttributesProcessor attributesProcessor, int maxCardinality, boolean enabled) { @@ -126,16 +124,15 @@ private AsynchronousMetricStorage( * Create an asynchronous storage instance for the {@link View} and {@link InstrumentDescriptor}. */ // TODO(anuraaga): The cast to generic type here looks suspicious. - public static - AsynchronousMetricStorage create( - RegisteredReader registeredReader, - RegisteredView registeredView, - InstrumentDescriptor instrumentDescriptor, - boolean enabled) { + public static AsynchronousMetricStorage create( + RegisteredReader registeredReader, + RegisteredView registeredView, + InstrumentDescriptor instrumentDescriptor, + boolean enabled) { View view = registeredView.getView(); MetricDescriptor metricDescriptor = MetricDescriptor.create(view, registeredView.getViewSourceInfo(), instrumentDescriptor); - Aggregator aggregator = + Aggregator aggregator = ((AggregatorFactory) view.getAggregation()) .createAggregator( instrumentDescriptor, @@ -153,14 +150,14 @@ AsynchronousMetricStorage create( /** Record callback measurement from {@link ObservableLongMeasurement}. */ void record(Attributes attributes, long value) { attributes = validateAndProcessAttributes(attributes); - AggregatorHandle handle = aggregatorHandles.computeIfAbsent(attributes, handleBuilder); + AggregatorHandle handle = aggregatorHandles.computeIfAbsent(attributes, handleBuilder); handle.recordLong(value, attributes, Context.current()); } /** Record callback measurement from {@link ObservableDoubleMeasurement}. */ void record(Attributes attributes, double value) { attributes = validateAndProcessAttributes(attributes); - AggregatorHandle handle = aggregatorHandles.computeIfAbsent(attributes, handleBuilder); + AggregatorHandle handle = aggregatorHandles.computeIfAbsent(attributes, handleBuilder); handle.recordDouble(value, attributes, Context.current()); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 77070ace40d..c95a62596d2 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -15,7 +15,6 @@ import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; -import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; @@ -41,7 +40,7 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class DefaultSynchronousMetricStorage +public final class DefaultSynchronousMetricStorage implements SynchronousMetricStorage { private static final Logger internalLogger = @@ -51,8 +50,8 @@ public final class DefaultSynchronousMetricStorage aggregator; - private volatile AggregatorHolder aggregatorHolder = new AggregatorHolder<>(); + private final Aggregator aggregator; + private volatile AggregatorHolder aggregatorHolder = new AggregatorHolder<>(); private final AttributesProcessor attributesProcessor; private final MemoryMode memoryMode; @@ -62,7 +61,7 @@ public final class DefaultSynchronousMetricStorage> + private volatile ConcurrentHashMap> previousCollectionAggregatorHandles = new ConcurrentHashMap<>(); /** @@ -71,7 +70,7 @@ public final class DefaultSynchronousMetricStorage> aggregatorHandlePool = + private final ConcurrentLinkedQueue> aggregatorHandlePool = new ConcurrentLinkedQueue<>(); private boolean enabled; @@ -79,7 +78,7 @@ public final class DefaultSynchronousMetricStorage aggregator, + Aggregator aggregator, AttributesProcessor attributesProcessor, int maxCardinality, boolean enabled) { @@ -97,7 +96,7 @@ public final class DefaultSynchronousMetricStorage> getAggregatorHandlePool() { + Queue> getAggregatorHandlePool() { return aggregatorHandlePool; } @@ -106,9 +105,9 @@ public void recordLong(long value, Attributes attributes, Context context) { if (!enabled) { return; } - AggregatorHolder aggregatorHolder = getHolderForRecord(); + AggregatorHolder aggregatorHolder = getHolderForRecord(); try { - AggregatorHandle handle = + AggregatorHandle handle = getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context); handle.recordLong(value, attributes, context); } finally { @@ -131,9 +130,9 @@ public void recordDouble(double value, Attributes attributes, Context context) { + ". Dropping measurement."); return; } - AggregatorHolder aggregatorHolder = getHolderForRecord(); + AggregatorHolder aggregatorHolder = getHolderForRecord(); try { - AggregatorHandle handle = + AggregatorHandle handle = getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context); handle.recordDouble(value, attributes, context); } finally { @@ -159,9 +158,9 @@ public boolean isEnabled() { * #releaseHolderForRecord(AggregatorHolder)} when record operation completes to signal to that * its safe to proceed with Collect operations. */ - private AggregatorHolder getHolderForRecord() { + private AggregatorHolder getHolderForRecord() { do { - AggregatorHolder aggregatorHolder = this.aggregatorHolder; + AggregatorHolder aggregatorHolder = this.aggregatorHolder; int recordsInProgress = aggregatorHolder.activeRecordingThreads.addAndGet(2); if (recordsInProgress % 2 == 0) { return aggregatorHolder; @@ -177,17 +176,17 @@ private AggregatorHolder getHolderForRecord() { * Called on the {@link AggregatorHolder} obtained from {@link #getHolderForRecord()} to indicate * that recording is complete, and it is safe to collect. */ - private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) { + private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) { aggregatorHolder.activeRecordingThreads.addAndGet(-2); } - private AggregatorHandle getAggregatorHandle( - ConcurrentHashMap> aggregatorHandles, + private AggregatorHandle getAggregatorHandle( + ConcurrentHashMap> aggregatorHandles, Attributes attributes, Context context) { Objects.requireNonNull(attributes, "attributes"); attributes = attributesProcessor.process(attributes, context); - AggregatorHandle handle = aggregatorHandles.get(attributes); + AggregatorHandle handle = aggregatorHandles.get(attributes); if (handle != null) { return handle; } @@ -207,7 +206,7 @@ private AggregatorHandle getAggregatorHandle( } } // Get handle from pool if available, else create a new one. - AggregatorHandle newHandle = aggregatorHandlePool.poll(); + AggregatorHandle newHandle = aggregatorHandlePool.poll(); if (newHandle == null) { newHandle = aggregator.createHandle(); } @@ -227,9 +226,9 @@ public MetricData collect( ? registeredReader.getLastCollectEpochNanos() : startEpochNanos; - ConcurrentHashMap> aggregatorHandles; + ConcurrentHashMap> aggregatorHandles; if (reset) { - AggregatorHolder holder = this.aggregatorHolder; + AggregatorHolder holder = this.aggregatorHolder; this.aggregatorHolder = (memoryMode == REUSABLE_DATA) ? new AggregatorHolder<>(previousCollectionAggregatorHandles) @@ -327,8 +326,8 @@ public MetricDescriptor getMetricDescriptor() { return metricDescriptor; } - private static class AggregatorHolder { - private final ConcurrentHashMap> aggregatorHandles; + private static class AggregatorHolder { + private final ConcurrentHashMap> aggregatorHandles; // Recording threads grab the current interval (AggregatorHolder) and atomically increment // this by 2 before recording against it (and then decrement by two when done). // @@ -349,8 +348,7 @@ private AggregatorHolder() { aggregatorHandles = new ConcurrentHashMap<>(); } - private AggregatorHolder( - ConcurrentHashMap> aggregatorHandles) { + private AggregatorHolder(ConcurrentHashMap> aggregatorHandles) { this.aggregatorHandles = aggregatorHandles; } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java index 6e195fa545f..697402339b3 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java @@ -31,7 +31,7 @@ public final class SdkObservableMeasurement private final ThrottlingLogger throttlingLogger = new ThrottlingLogger(logger); private final InstrumentationScopeInfo instrumentationScopeInfo; private final InstrumentDescriptor instrumentDescriptor; - private final List> storages; + private final List> storages; // These fields are set before invoking callbacks. They allow measurements to be recorded to the // storages for correct reader, and with the correct time. @@ -40,7 +40,7 @@ public final class SdkObservableMeasurement private SdkObservableMeasurement( InstrumentationScopeInfo instrumentationScopeInfo, InstrumentDescriptor instrumentDescriptor, - List> storages) { + List> storages) { this.instrumentationScopeInfo = instrumentationScopeInfo; this.instrumentDescriptor = instrumentDescriptor; this.storages = storages; @@ -57,7 +57,7 @@ private SdkObservableMeasurement( public static SdkObservableMeasurement create( InstrumentationScopeInfo instrumentationScopeInfo, InstrumentDescriptor instrumentDescriptor, - List> storages) { + List> storages) { return new SdkObservableMeasurement(instrumentationScopeInfo, instrumentDescriptor, storages); } @@ -73,7 +73,7 @@ public InstrumentationScopeInfo getInstrumentationScopeInfo() { public void setActiveReader( RegisteredReader registeredReader, long startEpochNanos, long epochNanos) { this.activeReader = registeredReader; - for (AsynchronousMetricStorage storage : storages) { + for (AsynchronousMetricStorage storage : storages) { if (storage.getRegisteredReader().equals(activeReader)) { storage.setEpochInformation(startEpochNanos, epochNanos); } @@ -91,7 +91,7 @@ InstrumentDescriptor getInstrumentDescriptor() { return instrumentDescriptor; } - List> getStorages() { + List> getStorages() { return storages; } @@ -108,7 +108,7 @@ public void record(long value, Attributes attributes) { return; } - for (AsynchronousMetricStorage storage : storages) { + for (AsynchronousMetricStorage storage : storages) { if (storage.getRegisteredReader().equals(activeReader)) { storage.record(attributes, value); } @@ -139,7 +139,7 @@ public void record(double value, Attributes attributes) { return; } - for (AsynchronousMetricStorage storage : storages) { + for (AsynchronousMetricStorage storage : storages) { if (storage.getRegisteredReader().equals(activeReader)) { storage.record(attributes, value); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java index efb49692563..bc9d8f60610 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java @@ -6,7 +6,6 @@ package io.opentelemetry.sdk.metrics.internal.state; import io.opentelemetry.sdk.metrics.View; -import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; @@ -36,7 +35,7 @@ static SynchronousMetricStorage empty() { * @return The storage, or {@link EmptyMetricStorage#empty()} if the instrument should not be * recorded. */ - static SynchronousMetricStorage create( + static SynchronousMetricStorage create( RegisteredReader registeredReader, RegisteredView registeredView, InstrumentDescriptor instrumentDescriptor, @@ -45,7 +44,7 @@ static SynchronousMetricStorage cr View view = registeredView.getView(); MetricDescriptor metricDescriptor = MetricDescriptor.create(view, registeredView.getViewSourceInfo(), instrumentDescriptor); - Aggregator aggregator = + Aggregator aggregator = ((AggregatorFactory) view.getAggregation()) .createAggregator( instrumentDescriptor, exemplarFilter, registeredReader.getReader().getMemoryMode()); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregation.java index a4facefca7a..b1f1796abc3 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregation.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregation.java @@ -11,7 +11,6 @@ import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.internal.RandomSupplier; import io.opentelemetry.sdk.metrics.Aggregation; -import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.MetricDataType; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; @@ -66,17 +65,17 @@ public static Aggregation create(int maxBuckets, int maxScale) { @Override @SuppressWarnings("unchecked") - public Aggregator createAggregator( + public Aggregator createAggregator( InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter, MemoryMode memoryMode) { - return (Aggregator) + return (Aggregator) new DoubleBase2ExponentialHistogramAggregator( () -> ExemplarReservoir.filtered( exemplarFilter, ExemplarReservoir.longToDouble( - ExemplarReservoir.doubleFixedSizeReservoir( + ExemplarReservoir.fixedSizeReservoir( Clock.getDefault(), Runtime.getRuntime().availableProcessors(), RandomSupplier.platformDefault()))), diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DefaultAggregation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DefaultAggregation.java index 494e486c333..e329ff9425f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DefaultAggregation.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DefaultAggregation.java @@ -8,7 +8,6 @@ import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.metrics.Aggregation; -import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; @@ -58,7 +57,7 @@ private static Aggregation resolve(InstrumentDescriptor instrument, boolean with } @Override - public Aggregator createAggregator( + public Aggregator createAggregator( InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter, MemoryMode memoryMode) { diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DropAggregation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DropAggregation.java index 2b27938c20a..38db4acd381 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DropAggregation.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DropAggregation.java @@ -7,7 +7,6 @@ import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.metrics.Aggregation; -import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; @@ -32,11 +31,11 @@ private DropAggregation() {} @Override @SuppressWarnings("unchecked") - public Aggregator createAggregator( + public Aggregator createAggregator( InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter, MemoryMode memoryMode) { - return (Aggregator) Aggregator.drop(); + return (Aggregator) Aggregator.drop(); } @Override diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/ExplicitBucketHistogramAggregation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/ExplicitBucketHistogramAggregation.java index e452d02792d..6af85aecbb6 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/ExplicitBucketHistogramAggregation.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/ExplicitBucketHistogramAggregation.java @@ -8,7 +8,6 @@ import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.metrics.Aggregation; -import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; @@ -50,11 +49,11 @@ private ExplicitBucketHistogramAggregation(List bucketBoundaries) { @Override @SuppressWarnings("unchecked") - public Aggregator createAggregator( + public Aggregator createAggregator( InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter, MemoryMode memoryMode) { - return (Aggregator) + return (Aggregator) new DoubleExplicitBucketHistogramAggregator( bucketBoundaryArray, () -> diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/LastValueAggregation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/LastValueAggregation.java index 693af692c93..93771fcebde 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/LastValueAggregation.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/LastValueAggregation.java @@ -10,9 +10,6 @@ import io.opentelemetry.sdk.internal.RandomSupplier; import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentType; -import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; -import io.opentelemetry.sdk.metrics.data.ExemplarData; -import io.opentelemetry.sdk.metrics.data.LongExemplarData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; @@ -41,37 +38,25 @@ private LastValueAggregation() {} @Override @SuppressWarnings("unchecked") - public Aggregator createAggregator( + public Aggregator createAggregator( InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter, MemoryMode memoryMode) { - // For the initial version we do not sample exemplars on gauges. + Supplier reservoirFactory = + () -> + ExemplarReservoir.filtered( + exemplarFilter, + ExemplarReservoir.fixedSizeReservoir( + Clock.getDefault(), + Runtime.getRuntime().availableProcessors(), + RandomSupplier.platformDefault())); + switch (instrumentDescriptor.getValueType()) { case LONG: - { - Supplier> reservoirFactory = - () -> - ExemplarReservoir.filtered( - exemplarFilter, - ExemplarReservoir.longFixedSizeReservoir( - Clock.getDefault(), - Runtime.getRuntime().availableProcessors(), - RandomSupplier.platformDefault())); - return (Aggregator) new LongLastValueAggregator(reservoirFactory, memoryMode); - } + return (Aggregator) new LongLastValueAggregator(reservoirFactory, memoryMode); case DOUBLE: - { - Supplier> reservoirFactory = - () -> - ExemplarReservoir.filtered( - exemplarFilter, - ExemplarReservoir.doubleFixedSizeReservoir( - Clock.getDefault(), - Runtime.getRuntime().availableProcessors(), - RandomSupplier.platformDefault())); - return (Aggregator) new DoubleLastValueAggregator(reservoirFactory, memoryMode); - } + return (Aggregator) new DoubleLastValueAggregator(reservoirFactory, memoryMode); } throw new IllegalArgumentException("Invalid instrument value type"); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/SumAggregation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/SumAggregation.java index 7ca1294666b..78ac8f8df55 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/SumAggregation.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/SumAggregation.java @@ -9,9 +9,6 @@ import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.internal.RandomSupplier; import io.opentelemetry.sdk.metrics.Aggregation; -import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; -import io.opentelemetry.sdk.metrics.data.ExemplarData; -import io.opentelemetry.sdk.metrics.data.LongExemplarData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; @@ -39,37 +36,25 @@ private SumAggregation() {} @Override @SuppressWarnings("unchecked") - public Aggregator createAggregator( + public Aggregator createAggregator( InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter, MemoryMode memoryMode) { + Supplier reservoirFactory = + () -> + ExemplarReservoir.filtered( + exemplarFilter, + ExemplarReservoir.fixedSizeReservoir( + Clock.getDefault(), + Runtime.getRuntime().availableProcessors(), + RandomSupplier.platformDefault())); switch (instrumentDescriptor.getValueType()) { case LONG: - { - Supplier> reservoirFactory = - () -> - ExemplarReservoir.filtered( - exemplarFilter, - ExemplarReservoir.longFixedSizeReservoir( - Clock.getDefault(), - Runtime.getRuntime().availableProcessors(), - RandomSupplier.platformDefault())); - return (Aggregator) - new LongSumAggregator(instrumentDescriptor, reservoirFactory, memoryMode); - } + return (Aggregator) + new LongSumAggregator(instrumentDescriptor, reservoirFactory, memoryMode); case DOUBLE: - { - Supplier> reservoirFactory = - () -> - ExemplarReservoir.filtered( - exemplarFilter, - ExemplarReservoir.doubleFixedSizeReservoir( - Clock.getDefault(), - Runtime.getRuntime().availableProcessors(), - RandomSupplier.platformDefault())); - return (Aggregator) - new DoubleSumAggregator(instrumentDescriptor, reservoirFactory, memoryMode); - } + return (Aggregator) + new DoubleSumAggregator(instrumentDescriptor, reservoirFactory, memoryMode); } throw new IllegalArgumentException("Invalid instrument value type"); } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java index 276b5e7b055..5291bc2cdf7 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandleTest.java @@ -33,33 +33,32 @@ @ExtendWith(MockitoExtension.class) class AggregatorHandleTest { - @Mock ExemplarReservoir doubleReservoir; - @Mock ExemplarReservoir longReservoir; + @Mock ExemplarReservoir doubleReservoir; + @Mock ExemplarReservoir longReservoir; @Test void testRecordings() { - TestAggregatorHandle testAggregator = new TestAggregatorHandle<>(doubleReservoir); + TestLongAggregatorHandle testLongAggregator = new TestLongAggregatorHandle(longReservoir); - testAggregator.recordLong(22); - assertThat(testAggregator.recordedLong.get()).isEqualTo(22); - assertThat(testAggregator.recordedDouble.get()).isEqualTo(0); + testLongAggregator.recordLong(22); + assertThat(testLongAggregator.recordedLong.get()).isEqualTo(22); - testAggregator.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true); - assertThat(testAggregator.recordedLong.get()).isEqualTo(0); - assertThat(testAggregator.recordedDouble.get()).isEqualTo(0); + testLongAggregator.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true); + assertThat(testLongAggregator.recordedLong.get()).isEqualTo(0); - testAggregator.recordDouble(33.55); - assertThat(testAggregator.recordedLong.get()).isEqualTo(0); - assertThat(testAggregator.recordedDouble.get()).isEqualTo(33.55); + TestDoubleAggregatorHandle testDoubleAggregator = + new TestDoubleAggregatorHandle(doubleReservoir); - testAggregator.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true); - assertThat(testAggregator.recordedLong.get()).isEqualTo(0); - assertThat(testAggregator.recordedDouble.get()).isEqualTo(0); + testDoubleAggregator.recordDouble(33.55); + assertThat(testDoubleAggregator.recordedDouble.get()).isEqualTo(33.55); + + testDoubleAggregator.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true); + assertThat(testDoubleAggregator.recordedDouble.get()).isEqualTo(0); } @Test void testOfferMeasurementLongToExemplar() { - TestAggregatorHandle testAggregator = new TestAggregatorHandle<>(longReservoir); + TestLongAggregatorHandle testAggregator = new TestLongAggregatorHandle(longReservoir); Attributes attributes = Attributes.builder().put("test", "value").build(); Context context = Context.root(); testAggregator.recordLong(1L, attributes, context); @@ -68,7 +67,7 @@ void testOfferMeasurementLongToExemplar() { @Test void testOfferMeasurementDoubleToExemplar() { - TestAggregatorHandle testAggregator = new TestAggregatorHandle<>(doubleReservoir); + TestDoubleAggregatorHandle testAggregator = new TestDoubleAggregatorHandle(doubleReservoir); Attributes attributes = Attributes.builder().put("test", "value").build(); Context context = Context.root(); testAggregator.recordDouble(1.0d, attributes, context); @@ -77,8 +76,7 @@ void testOfferMeasurementDoubleToExemplar() { @Test void testGenerateExemplarsOnCollect() { - TestAggregatorHandle testAggregator = - new TestAggregatorHandle<>(doubleReservoir); + TestDoubleAggregatorHandle testAggregator = new TestDoubleAggregatorHandle(doubleReservoir); Attributes attributes = Attributes.builder().put("test", "value").build(); DoubleExemplarData result = ImmutableDoubleExemplarData.create( @@ -92,29 +90,66 @@ void testGenerateExemplarsOnCollect() { 1); // We need to first record a value so that collect and reset does something. testAggregator.recordDouble(1.0, Attributes.empty(), Context.root()); - Mockito.when(doubleReservoir.collectAndReset(attributes)) + Mockito.when(doubleReservoir.collectAndResetDoubles(attributes)) .thenReturn(Collections.singletonList(result)); testAggregator.aggregateThenMaybeReset(0, 1, attributes, /* reset= */ true); - assertThat(testAggregator.recordedExemplars.get()).containsExactly(result); + assertThat(testAggregator.recordedExemplars.get()).isEqualTo(Collections.singletonList(result)); } - private static class TestAggregatorHandle - extends AggregatorHandle { + private static class TestDoubleAggregatorHandle extends TestAggregatorHandle { + + TestDoubleAggregatorHandle(ExemplarReservoir reservoir) { + super(reservoir); + } + + @Override + protected boolean isDoubleType() { + return true; + } + } + + private static class TestLongAggregatorHandle extends TestAggregatorHandle { + + TestLongAggregatorHandle(ExemplarReservoir reservoir) { + super(reservoir); + } + + @Override + protected boolean isDoubleType() { + return false; + } + } + + private abstract static class TestAggregatorHandle extends AggregatorHandle { final AtomicLong recordedLong = new AtomicLong(); final AtomicDouble recordedDouble = new AtomicDouble(); - final AtomicReference> recordedExemplars = new AtomicReference<>(); + final AtomicReference> recordedExemplars = new AtomicReference<>(); - TestAggregatorHandle(ExemplarReservoir reservoir) { + TestAggregatorHandle(ExemplarReservoir reservoir) { super(reservoir); } @Nullable @Override - protected PointData doAggregateThenMaybeReset( + protected PointData doAggregateThenMaybeResetDoubles( + long startEpochNanos, + long epochNanos, + Attributes attributes, + List exemplars, + boolean reset) { + recordedLong.set(0); + recordedDouble.set(0); + recordedExemplars.set(exemplars); + return null; + } + + @Nullable + @Override + protected PointData doAggregateThenMaybeResetLongs( long startEpochNanos, long epochNanos, Attributes attributes, - List exemplars, + List exemplars, boolean reset) { recordedLong.set(0); recordedDouble.set(0); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregatorTest.java index e774939dd4c..a5086f668b5 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregatorTest.java @@ -53,7 +53,7 @@ @ExtendWith(MockitoExtension.class) class DoubleBase2ExponentialHistogramAggregatorTest { - @Mock ExemplarReservoir reservoir; + @Mock ExemplarReservoir reservoir; private DoubleBase2ExponentialHistogramAggregator aggregator; private static final int MAX_SCALE = 20; @@ -68,10 +68,10 @@ private static Stream provideAggregat for (MemoryMode memoryMode : MemoryMode.values()) { parameters.add( new DoubleBase2ExponentialHistogramAggregator( - ExemplarReservoir::doubleNoSamples, 160, 20, memoryMode)); + ExemplarReservoir::noSamples, 160, 20, memoryMode)); parameters.add( new DoubleBase2ExponentialHistogramAggregator( - ExemplarReservoir::doubleNoSamples, 160, MAX_SCALE, memoryMode)); + ExemplarReservoir::noSamples, 160, MAX_SCALE, memoryMode)); } return parameters.stream(); } @@ -84,7 +84,7 @@ private static int valueToIndex(int scale, double value) { private void initialize(MemoryMode memoryMode) { aggregator = new DoubleBase2ExponentialHistogramAggregator( - ExemplarReservoir::doubleNoSamples, 160, 20, memoryMode); + ExemplarReservoir::noSamples, 160, 20, memoryMode); } @ParameterizedTest @@ -92,11 +92,11 @@ private void initialize(MemoryMode memoryMode) { void createHandle(MemoryMode memoryMode) { initialize(memoryMode); - AggregatorHandle handle = aggregator.createHandle(); + AggregatorHandle handle = aggregator.createHandle(); assertThat(handle).isInstanceOf(DoubleBase2ExponentialHistogramAggregator.Handle.class); ExponentialHistogramPointData point = ((DoubleBase2ExponentialHistogramAggregator.Handle) handle) - .doAggregateThenMaybeReset( + .doAggregateThenMaybeResetDoubles( 0, 1, Attributes.empty(), Collections.emptyList(), /* reset= */ true); assertThat(point.getPositiveBuckets()).isInstanceOf(EmptyExponentialHistogramBuckets.class); assertThat(point.getPositiveBuckets().getScale()).isEqualTo(MAX_SCALE); @@ -109,8 +109,7 @@ void createHandle(MemoryMode memoryMode) { void testRecordings(MemoryMode memoryMode) { initialize(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(0.5); aggregatorHandle.recordDouble(1.0); aggregatorHandle.recordDouble(12.0); @@ -155,8 +154,7 @@ void testRecordings(MemoryMode memoryMode) { void testInvalidRecording(MemoryMode memoryMode) { initialize(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); // Non-finite recordings should be ignored aggregatorHandle.recordDouble(Double.POSITIVE_INFINITY); aggregatorHandle.recordDouble(Double.NEGATIVE_INFINITY); @@ -173,8 +171,7 @@ void testInvalidRecording(MemoryMode memoryMode) { @ParameterizedTest @MethodSource("provideAggregator") void testRecordingsAtLimits(DoubleBase2ExponentialHistogramAggregator aggregator) { - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(Double.MIN_VALUE); aggregatorHandle.recordDouble(Double.MAX_VALUE); @@ -233,10 +230,9 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) { TraceState.getDefault()), 1); List exemplars = Collections.singletonList(exemplar); - Mockito.when(reservoir.collectAndReset(Attributes.empty())).thenReturn(exemplars); + Mockito.when(reservoir.collectAndResetDoubles(Attributes.empty())).thenReturn(exemplars); - AggregatorHandle aggregatorHandle = - agg.createHandle(); + AggregatorHandle aggregatorHandle = agg.createHandle(); aggregatorHandle.recordDouble(0, attributes, Context.root()); assertThat( @@ -252,8 +248,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) { void aggregateThenMaybeReset(MemoryMode memoryMode) { initialize(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(5.0); assertThat( @@ -270,8 +265,7 @@ void aggregateThenMaybeReset(MemoryMode memoryMode) { void testInsert1M(MemoryMode memoryMode) { initialize(memoryMode); - AggregatorHandle handle = - aggregator.createHandle(); + AggregatorHandle handle = aggregator.createHandle(); int n = 1024 * 1024 - 1; double min = 16.0 / n; @@ -337,9 +331,8 @@ void testToMetricData(MemoryMode memoryMode) { TraceState.getDefault()), 1); @SuppressWarnings("unchecked") - Supplier> reservoirSupplier = - Mockito.mock(Supplier.class); - Mockito.when(reservoir.collectAndReset(Attributes.empty())) + Supplier reservoirSupplier = Mockito.mock(Supplier.class); + Mockito.when(reservoir.collectAndResetDoubles(Attributes.empty())) .thenReturn(Collections.singletonList(exemplar)); Mockito.when(reservoirSupplier.get()).thenReturn(reservoir); @@ -347,7 +340,7 @@ void testToMetricData(MemoryMode memoryMode) { new DoubleBase2ExponentialHistogramAggregator( reservoirSupplier, 160, MAX_SCALE, memoryMode); - AggregatorHandle aggregatorHandle = + AggregatorHandle aggregatorHandle = cumulativeAggregator.createHandle(); aggregatorHandle.recordDouble(0); aggregatorHandle.recordDouble(0); @@ -413,8 +406,7 @@ void testToMetricData(MemoryMode memoryMode) { void testMultithreadedUpdates(MemoryMode memoryMode) throws InterruptedException { initialize(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); ImmutableList updates = ImmutableList.of(0D, 0.1D, -0.1D, 1D, -1D, 100D); int numberOfThreads = updates.size(); int numberOfUpdates = 10000; diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java index f10787c7197..69bf25f1ec6 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java @@ -45,7 +45,7 @@ @ExtendWith(MockitoExtension.class) class DoubleExplicitBucketHistogramAggregatorTest { - @Mock ExemplarReservoir reservoir; + @Mock ExemplarReservoir reservoir; private static final double[] boundaries = new double[] {10.0, 100.0, 1000.0}; private static final List boundariesList = @@ -60,7 +60,7 @@ class DoubleExplicitBucketHistogramAggregatorTest { private void init(MemoryMode memoryMode) { aggregator = new DoubleExplicitBucketHistogramAggregator( - boundaries, ExemplarReservoir::doubleNoSamples, memoryMode); + boundaries, ExemplarReservoir::noSamples, memoryMode); } @ParameterizedTest @@ -75,8 +75,7 @@ void createHandle(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void testRecordings(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(20); aggregatorHandle.recordLong(5); aggregatorHandle.recordLong(150); @@ -112,11 +111,10 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) { TraceState.getDefault()), 1); List exemplars = Collections.singletonList(exemplar); - Mockito.when(reservoir.collectAndReset(Attributes.empty())).thenReturn(exemplars); + Mockito.when(reservoir.collectAndResetDoubles(Attributes.empty())).thenReturn(exemplars); DoubleExplicitBucketHistogramAggregator aggregator = new DoubleExplicitBucketHistogramAggregator(boundaries, () -> reservoir, memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(0, attributes, Context.root()); assertThat( aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)) @@ -139,8 +137,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void aggregateThenMaybeReset(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(100); assertThat( @@ -179,8 +176,7 @@ void aggregateThenMaybeReset(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void toMetricData(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(10); MetricData metricData = @@ -250,8 +246,7 @@ void toMetricDataWithExemplars(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void testHistogramCounts(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(1.1); HistogramPointData point = aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true); @@ -263,8 +258,7 @@ void testHistogramCounts(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void testMultithreadedUpdates(MemoryMode memoryMode) throws InterruptedException { init(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); ImmutableList updates = ImmutableList.of(1L, 2L, 3L, 5L, 7L, 11L, 13L, 17L, 19L, 23L); int numberOfThreads = updates.size(); int numberOfUpdates = 10000; @@ -306,8 +300,7 @@ void testMultithreadedUpdates(MemoryMode memoryMode) throws InterruptedException @Test void testReusableDataMemoryMode() { init(MemoryMode.REUSABLE_DATA); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(10); aggregatorHandle.recordLong(20); aggregatorHandle.recordLong(30); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java index 8234b172a7a..2a40ba6c80c 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java @@ -41,7 +41,7 @@ class DoubleLastValueAggregatorTest { private DoubleLastValueAggregator aggregator; private void init(MemoryMode memoryMode) { - aggregator = new DoubleLastValueAggregator(ExemplarReservoir::doubleNoSamples, memoryMode); + aggregator = new DoubleLastValueAggregator(ExemplarReservoir::noSamples, memoryMode); } @ParameterizedTest @@ -56,8 +56,7 @@ void createHandle(MemoryMode memoryMode) { void multipleRecords(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(12.1); assertThat( aggregatorHandle @@ -78,8 +77,7 @@ void multipleRecords(MemoryMode memoryMode) { void aggregateThenMaybeReset(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(13.1); assertThat( @@ -229,8 +227,7 @@ void copyPoint(MemoryMode memoryMode) { void toMetricData(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(10); MetricData metricData = @@ -260,7 +257,7 @@ void toMetricData(MemoryMode memoryMode) { @Test void testReusableDataOnCollect() { init(MemoryMode.REUSABLE_DATA); - AggregatorHandle handle = aggregator.createHandle(); + AggregatorHandle handle = aggregator.createHandle(); handle.recordDouble(1); DoublePointData pointData = handle.aggregateThenMaybeReset(0, 10, Attributes.empty(), /* reset= */ false); @@ -281,7 +278,7 @@ void testReusableDataOnCollect() { @Test void testNullPointerExceptionWhenUnset() { init(MemoryMode.REUSABLE_DATA); - AggregatorHandle handle = aggregator.createHandle(); + AggregatorHandle handle = aggregator.createHandle(); assertThatThrownBy( () -> handle.aggregateThenMaybeReset(0, 10, Attributes.empty(), /* reset= */ true)) .isInstanceOf(NullPointerException.class); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java index 0e6043f7211..bf67debeb41 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java @@ -43,7 +43,7 @@ @ExtendWith(MockitoExtension.class) class DoubleSumAggregatorTest { - @Mock ExemplarReservoir reservoir; + @Mock ExemplarReservoir reservoir; private static final Resource resource = Resource.getDefault(); private static final InstrumentationScopeInfo scope = InstrumentationScopeInfo.empty(); @@ -62,7 +62,7 @@ private void init(MemoryMode memoryMode) { InstrumentType.COUNTER, InstrumentValueType.DOUBLE, Advice.empty()), - ExemplarReservoir::doubleNoSamples, + ExemplarReservoir::noSamples, memoryMode); } @@ -77,8 +77,7 @@ void createHandle(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void multipleRecords(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(12.1); aggregatorHandle.recordDouble(12.1); aggregatorHandle.recordDouble(12.1); @@ -95,8 +94,7 @@ void multipleRecords(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void multipleRecords_WithNegatives(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(12); aggregatorHandle.recordDouble(12); aggregatorHandle.recordDouble(-23); @@ -114,8 +112,7 @@ void multipleRecords_WithNegatives(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void aggregateThenMaybeReset(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(13); aggregatorHandle.recordDouble(12); @@ -149,7 +146,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) { TraceState.getDefault()), 1); List exemplars = Collections.singletonList(exemplar); - Mockito.when(reservoir.collectAndReset(Attributes.empty())).thenReturn(exemplars); + Mockito.when(reservoir.collectAndResetDoubles(Attributes.empty())).thenReturn(exemplars); DoubleSumAggregator aggregator = new DoubleSumAggregator( InstrumentDescriptor.create( @@ -161,8 +158,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) { Advice.empty()), () -> reservoir, memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(0, attributes, Context.root()); assertThat( aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)) @@ -195,7 +191,7 @@ void mergeAndDiff(MemoryMode memoryMode) { instrumentType, InstrumentValueType.LONG, Advice.empty()), - ExemplarReservoir::doubleNoSamples, + ExemplarReservoir::noSamples, memoryMode); DoublePointData diffed = @@ -305,8 +301,7 @@ void copyPoint(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void toMetricData(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(10); MetricData metricData = @@ -366,8 +361,7 @@ void toMetricDataWithExemplars(MemoryMode memoryMode) { @Test void sameObjectReturnedOnReusableDataMemoryMode() { init(MemoryMode.REUSABLE_DATA); - AggregatorHandle aggregatorHandle = - aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordDouble(1.0); DoublePointData firstCollection = diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java index 2a4f6ecf11c..65637511149 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java @@ -42,7 +42,7 @@ class LongLastValueAggregatorTest { private LongLastValueAggregator aggregator; private void init(MemoryMode memoryMode) { - aggregator = new LongLastValueAggregator(ExemplarReservoir::longNoSamples, memoryMode); + aggregator = new LongLastValueAggregator(ExemplarReservoir::noSamples, memoryMode); } @ParameterizedTest @@ -56,7 +56,7 @@ void createHandle(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void multipleRecords(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(12); assertThat( aggregatorHandle @@ -76,7 +76,7 @@ void multipleRecords(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void aggregateThenMaybeReset(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(13); assertThat( @@ -189,7 +189,7 @@ void copyPoint(MemoryMode memoryMode) { void toMetricData(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(10); MetricData metricData = @@ -217,7 +217,7 @@ void toMetricData(MemoryMode memoryMode) { @Test void testReusablePointOnCollect() { init(MemoryMode.REUSABLE_DATA); - AggregatorHandle handle = aggregator.createHandle(); + AggregatorHandle handle = aggregator.createHandle(); handle.recordLong(1); LongPointData pointData = handle.aggregateThenMaybeReset(0, 10, Attributes.empty(), /* reset= */ false); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java index 408403fd952..795150e13cf 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java @@ -43,7 +43,7 @@ @ExtendWith(MockitoExtension.class) class LongSumAggregatorTest { - @Mock ExemplarReservoir reservoir; + @Mock ExemplarReservoir reservoir; private static final Resource resource = Resource.getDefault(); private static final InstrumentationScopeInfo library = InstrumentationScopeInfo.empty(); @@ -61,7 +61,7 @@ private void init(MemoryMode memoryMode) { InstrumentType.COUNTER, InstrumentValueType.LONG, Advice.empty()), - ExemplarReservoir::longNoSamples, + ExemplarReservoir::noSamples, memoryMode); } @@ -76,7 +76,7 @@ void createHandle(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void multipleRecords(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(12); aggregatorHandle.recordLong(12); aggregatorHandle.recordLong(12); @@ -93,7 +93,7 @@ void multipleRecords(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void multipleRecords_WithNegatives(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(12); aggregatorHandle.recordLong(12); aggregatorHandle.recordLong(-23); @@ -111,7 +111,7 @@ void multipleRecords_WithNegatives(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void aggregateThenMaybeReset(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(13); aggregatorHandle.recordLong(12); @@ -145,7 +145,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) { TraceState.getDefault()), 1); List exemplars = Collections.singletonList(exemplar); - Mockito.when(reservoir.collectAndReset(Attributes.empty())).thenReturn(exemplars); + Mockito.when(reservoir.collectAndResetLongs(Attributes.empty())).thenReturn(exemplars); LongSumAggregator aggregator = new LongSumAggregator( InstrumentDescriptor.create( @@ -157,7 +157,7 @@ void aggregateThenMaybeReset_WithExemplars(MemoryMode memoryMode) { Advice.empty()), () -> reservoir, memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(0, attributes, Context.root()); assertThat( aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)) @@ -189,7 +189,7 @@ void mergeAndDiff(MemoryMode memoryMode) { instrumentType, InstrumentValueType.LONG, Advice.empty()), - ExemplarReservoir::longNoSamples, + ExemplarReservoir::noSamples, memoryMode); LongPointData diffed = @@ -299,7 +299,7 @@ void copyPoint(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void toMetricData(MemoryMode memoryMode) { init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(10); MetricData metricData = @@ -360,7 +360,7 @@ void toMetricDataWithExemplars(MemoryMode memoryMode) { @Test void sameObjectReturnedOnReusableDataMemoryMode() { init(MemoryMode.REUSABLE_DATA); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); + AggregatorHandle aggregatorHandle = aggregator.createHandle(); aggregatorHandle.recordLong(1L); LongPointData firstCollection = diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/FilteredExemplarReservoirTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/FilteredExemplarReservoirTest.java index a7b0b0a756b..adcd650422f 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/FilteredExemplarReservoirTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/FilteredExemplarReservoirTest.java @@ -13,7 +13,6 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; -import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; import java.util.Collections; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -22,31 +21,28 @@ @ExtendWith(MockitoExtension.class) class FilteredExemplarReservoirTest { - @Mock ExemplarReservoir reservoir; + @Mock ExemplarReservoir reservoir; @Mock ExemplarFilter filter; @Test void testFilter_preventsSampling() { when(filter.shouldSampleMeasurement(anyDouble(), any(), any())).thenReturn(false); - ExemplarReservoir filtered = - new FilteredExemplarReservoir<>(filter, reservoir); + ExemplarReservoir filtered = new FilteredExemplarReservoir(filter, reservoir); filtered.offerDoubleMeasurement(1.0, Attributes.empty(), Context.root()); } @Test void testFilter_allowsSampling() { when(filter.shouldSampleMeasurement(anyDouble(), any(), any())).thenReturn(true); - ExemplarReservoir filtered = - new FilteredExemplarReservoir<>(filter, reservoir); + ExemplarReservoir filtered = new FilteredExemplarReservoir(filter, reservoir); filtered.offerDoubleMeasurement(1.0, Attributes.empty(), Context.root()); verify(reservoir).offerDoubleMeasurement(1.0, Attributes.empty(), Context.root()); } @Test void reservoir_collectsUnderlying() { - when(reservoir.collectAndReset(Attributes.empty())).thenReturn(Collections.emptyList()); - ExemplarReservoir filtered = - new FilteredExemplarReservoir<>(filter, reservoir); - assertThat(filtered.collectAndReset(Attributes.empty())).isEmpty(); + when(reservoir.collectAndResetDoubles(Attributes.empty())).thenReturn(Collections.emptyList()); + ExemplarReservoir filtered = new FilteredExemplarReservoir(filter, reservoir); + assertThat(filtered.collectAndResetDoubles(Attributes.empty())).isEmpty(); } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/HistogramExemplarReservoirTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/HistogramExemplarReservoirTest.java index a6882dfe797..6e5719c0dbb 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/HistogramExemplarReservoirTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/HistogramExemplarReservoirTest.java @@ -10,7 +10,6 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; -import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; import io.opentelemetry.sdk.testing.time.TestClock; import java.time.Duration; import java.util.Arrays; @@ -21,18 +20,16 @@ class HistogramExemplarReservoirTest { @Test void noMeasurement_returnsEmpty() { TestClock clock = TestClock.create(); - ExemplarReservoir reservoir = - new HistogramExemplarReservoir(clock, Collections.emptyList()); - assertThat(reservoir.collectAndReset(Attributes.empty())).isEmpty(); + ExemplarReservoir reservoir = new HistogramExemplarReservoir(clock, Collections.emptyList()); + assertThat(reservoir.collectAndResetDoubles(Attributes.empty())).isEmpty(); } @Test void oneBucket_samplesEverything() { TestClock clock = TestClock.create(); - ExemplarReservoir reservoir = - new HistogramExemplarReservoir(clock, Collections.emptyList()); + ExemplarReservoir reservoir = new HistogramExemplarReservoir(clock, Collections.emptyList()); reservoir.offerDoubleMeasurement(1.1, Attributes.empty(), Context.root()); - assertThat(reservoir.collectAndReset(Attributes.empty())) + assertThat(reservoir.collectAndResetDoubles(Attributes.empty())) .hasSize(1) .satisfiesExactly( exemplar -> { @@ -43,7 +40,7 @@ void oneBucket_samplesEverything() { // Measurement count is reset, we should sample a new measurement (and only one) clock.advance(Duration.ofSeconds(1)); reservoir.offerDoubleMeasurement(2, Attributes.empty(), Context.root()); - assertThat(reservoir.collectAndReset(Attributes.empty())) + assertThat(reservoir.collectAndResetDoubles(Attributes.empty())) .hasSize(1) .satisfiesExactly( exemplar -> { @@ -55,7 +52,7 @@ void oneBucket_samplesEverything() { clock.advance(Duration.ofSeconds(1)); reservoir.offerDoubleMeasurement(3, Attributes.empty(), Context.root()); reservoir.offerDoubleMeasurement(4, Attributes.empty(), Context.root()); - assertThat(reservoir.collectAndReset(Attributes.empty())) + assertThat(reservoir.collectAndResetDoubles(Attributes.empty())) .hasSize(1) .satisfiesExactly( exemplar -> { @@ -69,13 +66,13 @@ void oneBucket_samplesEverything() { void multipleBuckets_samplesIntoCorrectBucket() { TestClock clock = TestClock.create(); AttributeKey bucketKey = AttributeKey.longKey("bucket"); - ExemplarReservoir reservoir = + ExemplarReservoir reservoir = new HistogramExemplarReservoir(clock, Arrays.asList(0d, 10d, 20d)); reservoir.offerDoubleMeasurement(-1.1, Attributes.of(bucketKey, 0L), Context.root()); reservoir.offerDoubleMeasurement(1, Attributes.of(bucketKey, 1L), Context.root()); reservoir.offerDoubleMeasurement(11, Attributes.of(bucketKey, 2L), Context.root()); reservoir.offerDoubleMeasurement(21, Attributes.of(bucketKey, 3L), Context.root()); - assertThat(reservoir.collectAndReset(Attributes.empty())) + assertThat(reservoir.collectAndResetDoubles(Attributes.empty())) .hasSize(4) .satisfiesExactlyInAnyOrder( e -> { @@ -99,10 +96,9 @@ void multipleBuckets_samplesIntoCorrectBucket() { @Test void longMeasurement_CastsToDouble() { TestClock clock = TestClock.create(); - ExemplarReservoir reservoir = - new HistogramExemplarReservoir(clock, Collections.emptyList()); + ExemplarReservoir reservoir = new HistogramExemplarReservoir(clock, Collections.emptyList()); reservoir.offerLongMeasurement(1L, Attributes.empty(), Context.root()); - assertThat(reservoir.collectAndReset(Attributes.empty())) + assertThat(reservoir.collectAndResetDoubles(Attributes.empty())) .hasSize(1) .satisfiesExactly( exemplar -> { @@ -113,7 +109,7 @@ void longMeasurement_CastsToDouble() { // Measurement count is reset, we should sample a new measurement (and only one) clock.advance(Duration.ofSeconds(1)); reservoir.offerLongMeasurement(2, Attributes.empty(), Context.root()); - assertThat(reservoir.collectAndReset(Attributes.empty())) + assertThat(reservoir.collectAndResetDoubles(Attributes.empty())) .hasSize(1) .satisfiesExactly( exemplar -> { diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/LongRandomFixedSizeExemplarReservoirTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/LongRandomFixedSizeExemplarReservoirTest.java deleted file mode 100644 index fc754576401..00000000000 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/LongRandomFixedSizeExemplarReservoirTest.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.metrics.internal.exemplar; - -import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; - -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanContext; -import io.opentelemetry.api.trace.TraceFlags; -import io.opentelemetry.api.trace.TraceState; -import io.opentelemetry.context.Context; -import io.opentelemetry.sdk.internal.RandomSupplier; -import io.opentelemetry.sdk.metrics.data.LongExemplarData; -import io.opentelemetry.sdk.testing.time.TestClock; -import java.time.Duration; -import java.util.Random; -import org.junit.jupiter.api.Test; - -class LongRandomFixedSizeExemplarReservoirTest { - private static final String TRACE_ID = "ff000000000000000000000000000041"; - private static final String SPAN_ID = "ff00000000000041"; - - @Test - public void noMeasurement_returnsEmpty() { - TestClock clock = TestClock.create(); - ExemplarReservoir reservoir = - RandomFixedSizeExemplarReservoir.createLong(clock, 1, RandomSupplier.platformDefault()); - assertThat(reservoir.collectAndReset(Attributes.empty())).isEmpty(); - } - - @Test - public void oneMeasurement_alwaysSamplesFirstMeasurement() { - TestClock clock = TestClock.create(); - ExemplarReservoir reservoir = - RandomFixedSizeExemplarReservoir.createLong(clock, 1, RandomSupplier.platformDefault()); - reservoir.offerLongMeasurement(1, Attributes.empty(), Context.root()); - assertThat(reservoir.collectAndReset(Attributes.empty())) - .hasSize(1) - .satisfiesExactly( - exemplar -> { - assertThat(exemplar.getEpochNanos()).isEqualTo(clock.now()); - assertThat(exemplar.getValue()).isEqualTo(1); - assertThat(exemplar.getFilteredAttributes()).isEmpty(); - }); - - // Measurement count is reset, we should sample a new measurement (and only one) - clock.advance(Duration.ofSeconds(1)); - reservoir.offerLongMeasurement(2, Attributes.empty(), Context.root()); - assertThat(reservoir.collectAndReset(Attributes.empty())) - .hasSize(1) - .satisfiesExactly( - exemplar -> { - assertThat(exemplar.getEpochNanos()).isEqualTo(clock.now()); - assertThat(exemplar.getValue()).isEqualTo(2); - assertThat(exemplar.getFilteredAttributes()).isEmpty(); - }); - } - - @Test - public void oneMeasurement_filtersAttributes() { - Attributes all = - Attributes.builder().put("one", 1).put("two", "two").put("three", true).build(); - Attributes partial = Attributes.builder().put("three", true).build(); - Attributes remaining = Attributes.builder().put("one", 1).put("two", "two").build(); - TestClock clock = TestClock.create(); - ExemplarReservoir reservoir = - RandomFixedSizeExemplarReservoir.createLong(clock, 1, RandomSupplier.platformDefault()); - reservoir.offerLongMeasurement(1, all, Context.root()); - assertThat(reservoir.collectAndReset(partial)) - .satisfiesExactly( - exemplar -> { - assertThat(exemplar.getEpochNanos()).isEqualTo(clock.now()); - assertThat(exemplar.getValue()).isEqualTo(1); - assertThat(exemplar.getFilteredAttributes()).isEqualTo(remaining); - }); - } - - @Test - public void oneMeasurement_includesTraceAndSpanIds() { - Attributes all = - Attributes.builder().put("one", 1).put("two", "two").put("three", true).build(); - Context context = - Context.root() - .with( - Span.wrap( - SpanContext.createFromRemoteParent( - TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault()))); - TestClock clock = TestClock.create(); - ExemplarReservoir reservoir = - RandomFixedSizeExemplarReservoir.createLong(clock, 1, RandomSupplier.platformDefault()); - reservoir.offerLongMeasurement(1, all, context); - assertThat(reservoir.collectAndReset(Attributes.empty())) - .satisfiesExactly( - exemplar -> { - assertThat(exemplar.getEpochNanos()).isEqualTo(clock.now()); - assertThat(exemplar.getValue()).isEqualTo(1); - assertThat(exemplar.getFilteredAttributes()).isEqualTo(all); - assertThat(exemplar.getSpanContext().getTraceId()).isEqualTo(TRACE_ID); - assertThat(exemplar.getSpanContext().getSpanId()).isEqualTo(SPAN_ID); - }); - } - - @Test - public void multiMeasurements_preservesLatestSamples() { - AttributeKey key = AttributeKey.longKey("K"); - // We cannot mock random in latest jdk, so we create an override. - Random mockRandom = - new Random() { - @Override - public int nextInt(int max) { - switch (max) { - // Force one sample in bucket 1 and two in bucket 0. - case 2: - return 1; - default: - return 0; - } - } - }; - TestClock clock = TestClock.create(); - ExemplarReservoir reservoir = - ExemplarReservoir.longFixedSizeReservoir(clock, 2, () -> mockRandom); - reservoir.offerLongMeasurement(1, Attributes.of(key, 1L), Context.root()); - reservoir.offerLongMeasurement(2, Attributes.of(key, 2L), Context.root()); - reservoir.offerLongMeasurement(3, Attributes.of(key, 3L), Context.root()); - assertThat(reservoir.collectAndReset(Attributes.empty())) - .satisfiesExactlyInAnyOrder( - exemplar -> { - assertThat(exemplar.getEpochNanos()).isEqualTo(clock.now()); - assertThat(exemplar.getValue()).isEqualTo(2); - }, - exemplar -> { - assertThat(exemplar.getEpochNanos()).isEqualTo(clock.now()); - assertThat(exemplar.getValue()).isEqualTo(3); - }); - } -} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/LongToDoubleExemplarReservoirTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/LongToDoubleExemplarReservoirTest.java index fc7a5a3c07a..ebce4ec244c 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/LongToDoubleExemplarReservoirTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/LongToDoubleExemplarReservoirTest.java @@ -5,6 +5,8 @@ package io.opentelemetry.sdk.metrics.internal.exemplar; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.never; @@ -19,11 +21,11 @@ @ExtendWith(MockitoExtension.class) class LongToDoubleExemplarReservoirTest { - @Mock ExemplarReservoir delegate; + @Mock ExemplarReservoir delegate; @Test void offerDoubleMeasurement() { - ExemplarReservoir filtered = new LongToDoubleExemplarReservoir<>(delegate); + ExemplarReservoir filtered = new LongToDoubleExemplarReservoir(delegate); filtered.offerDoubleMeasurement(1.0, Attributes.empty(), Context.root()); verify(delegate).offerDoubleMeasurement(1.0, Attributes.empty(), Context.root()); verify(delegate, never()).offerLongMeasurement(anyLong(), any(), any()); @@ -31,9 +33,19 @@ void offerDoubleMeasurement() { @Test void offerLongMeasurement() { - ExemplarReservoir filtered = new LongToDoubleExemplarReservoir<>(delegate); + ExemplarReservoir filtered = new LongToDoubleExemplarReservoir(delegate); filtered.offerLongMeasurement(1L, Attributes.empty(), Context.root()); verify(delegate).offerDoubleMeasurement(1.0, Attributes.empty(), Context.root()); verify(delegate, never()).offerLongMeasurement(anyLong(), any(), any()); } + + @Test + void collectAndReset() { + ExemplarReservoir filtered = new LongToDoubleExemplarReservoir(delegate); + + assertThatThrownBy(() -> filtered.collectAndResetLongs(Attributes.empty())) + .isInstanceOf(UnsupportedOperationException.class); + assertThatCode(() -> filtered.collectAndResetDoubles(Attributes.empty())) + .doesNotThrowAnyException(); + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/DoubleRandomFixedSizeExemplarReservoirTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/RandomFixedSizeExemplarReservoirTest.java similarity index 79% rename from sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/DoubleRandomFixedSizeExemplarReservoirTest.java rename to sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/RandomFixedSizeExemplarReservoirTest.java index 85a53fbeb0d..8d34e9220fe 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/DoubleRandomFixedSizeExemplarReservoirTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/exemplar/RandomFixedSizeExemplarReservoirTest.java @@ -15,31 +15,30 @@ import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.internal.RandomSupplier; -import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; import io.opentelemetry.sdk.testing.time.TestClock; import java.time.Duration; import java.util.Random; import org.junit.jupiter.api.Test; -class DoubleRandomFixedSizeExemplarReservoirTest { +class RandomFixedSizeExemplarReservoirTest { private static final String TRACE_ID = "ff000000000000000000000000000041"; private static final String SPAN_ID = "ff00000000000041"; @Test public void noMeasurement_returnsEmpty() { TestClock clock = TestClock.create(); - ExemplarReservoir reservoir = - RandomFixedSizeExemplarReservoir.createDouble(clock, 1, RandomSupplier.platformDefault()); - assertThat(reservoir.collectAndReset(Attributes.empty())).isEmpty(); + ExemplarReservoir reservoir = + RandomFixedSizeExemplarReservoir.create(clock, 1, RandomSupplier.platformDefault()); + assertThat(reservoir.collectAndResetDoubles(Attributes.empty())).isEmpty(); } @Test public void oneMeasurement_alwaysSamplesFirstMeasurement() { TestClock clock = TestClock.create(); - ExemplarReservoir reservoir = - RandomFixedSizeExemplarReservoir.createDouble(clock, 1, RandomSupplier.platformDefault()); + ExemplarReservoir reservoir = + RandomFixedSizeExemplarReservoir.create(clock, 1, RandomSupplier.platformDefault()); reservoir.offerDoubleMeasurement(1.1, Attributes.empty(), Context.root()); - assertThat(reservoir.collectAndReset(Attributes.empty())) + assertThat(reservoir.collectAndResetDoubles(Attributes.empty())) .hasSize(1) .satisfiesExactly( exemplar -> { @@ -51,7 +50,7 @@ public void oneMeasurement_alwaysSamplesFirstMeasurement() { // Measurement count is reset, we should sample a new measurement (and only one) clock.advance(Duration.ofSeconds(1)); reservoir.offerDoubleMeasurement(2, Attributes.empty(), Context.root()); - assertThat(reservoir.collectAndReset(Attributes.empty())) + assertThat(reservoir.collectAndResetDoubles(Attributes.empty())) .hasSize(1) .satisfiesExactly( exemplar -> { @@ -68,10 +67,10 @@ public void oneMeasurement_filtersAttributes() { Attributes partial = Attributes.builder().put("three", true).build(); Attributes remaining = Attributes.builder().put("one", 1).put("two", "two").build(); TestClock clock = TestClock.create(); - ExemplarReservoir reservoir = - RandomFixedSizeExemplarReservoir.createDouble(clock, 1, RandomSupplier.platformDefault()); + ExemplarReservoir reservoir = + RandomFixedSizeExemplarReservoir.create(clock, 1, RandomSupplier.platformDefault()); reservoir.offerDoubleMeasurement(1.1, all, Context.root()); - assertThat(reservoir.collectAndReset(partial)) + assertThat(reservoir.collectAndResetDoubles(partial)) .satisfiesExactly( exemplar -> { assertThat(exemplar.getEpochNanos()).isEqualTo(clock.now()); @@ -91,10 +90,10 @@ public void oneMeasurement_includesTraceAndSpanIds() { SpanContext.createFromRemoteParent( TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault()))); TestClock clock = TestClock.create(); - ExemplarReservoir reservoir = - RandomFixedSizeExemplarReservoir.createDouble(clock, 1, RandomSupplier.platformDefault()); + ExemplarReservoir reservoir = + RandomFixedSizeExemplarReservoir.create(clock, 1, RandomSupplier.platformDefault()); reservoir.offerDoubleMeasurement(1, all, context); - assertThat(reservoir.collectAndReset(Attributes.empty())) + assertThat(reservoir.collectAndResetDoubles(Attributes.empty())) .satisfiesExactly( exemplar -> { assertThat(exemplar.getEpochNanos()).isEqualTo(clock.now()); @@ -123,12 +122,11 @@ public int nextInt(int max) { } }; TestClock clock = TestClock.create(); - ExemplarReservoir reservoir = - ExemplarReservoir.doubleFixedSizeReservoir(clock, 2, () -> mockRandom); + ExemplarReservoir reservoir = ExemplarReservoir.fixedSizeReservoir(clock, 2, () -> mockRandom); reservoir.offerDoubleMeasurement(1, Attributes.of(key, 1L), Context.root()); reservoir.offerDoubleMeasurement(2, Attributes.of(key, 2L), Context.root()); reservoir.offerDoubleMeasurement(3, Attributes.of(key, 3L), Context.root()); - assertThat(reservoir.collectAndReset(Attributes.empty())) + assertThat(reservoir.collectAndResetDoubles(Attributes.empty())) .satisfiesExactlyInAnyOrder( exemplar -> { assertThat(exemplar.getEpochNanos()).isEqualTo(clock.now()); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index d744a22c2db..860307d4777 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -67,8 +67,8 @@ class AsynchronousMetricStorageTest { @Mock private MetricReader reader; private RegisteredReader registeredReader; - private AsynchronousMetricStorage longCounterStorage; - private AsynchronousMetricStorage doubleCounterStorage; + private AsynchronousMetricStorage longCounterStorage; + private AsynchronousMetricStorage doubleCounterStorage; // Not using @BeforeEach since many methods require executing them for each MemoryMode void setup(MemoryMode memoryMode) { @@ -161,7 +161,7 @@ void recordDouble(MemoryMode memoryMode) { void record_ProcessesAttributes(MemoryMode memoryMode) { setup(memoryMode); - AsynchronousMetricStorage storage = + AsynchronousMetricStorage storage = AsynchronousMetricStorage.create( registeredReader, RegisteredView.create( diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java index b6bc13a858c..80d02c90ff9 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java @@ -67,9 +67,9 @@ class CallbackRegistrationTest { LogCapturer logs = LogCapturer.create().captureForType(CallbackRegistration.class); @Mock private MetricReader reader; - @Mock private AsynchronousMetricStorage storage1; - @Mock private AsynchronousMetricStorage storage2; - @Mock private AsynchronousMetricStorage storage3; + @Mock private AsynchronousMetricStorage storage1; + @Mock private AsynchronousMetricStorage storage2; + @Mock private AsynchronousMetricStorage storage3; private RegisteredReader registeredReader; private SdkObservableMeasurement measurement1; diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index bc1f5acffdd..e79f3ad9718 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -26,8 +26,6 @@ import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.InstrumentValueType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; -import io.opentelemetry.sdk.metrics.data.ExemplarData; -import io.opentelemetry.sdk.metrics.data.LongExemplarData; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.PointData; @@ -84,7 +82,7 @@ public class SynchronousMetricStorageTest { private RegisteredReader deltaReader; private RegisteredReader cumulativeReader; private final TestClock testClock = TestClock.create(); - private Aggregator aggregator; + private Aggregator aggregator; private final AttributesProcessor attributesProcessor = AttributesProcessor.noop(); private void initialize(MemoryMode memoryMode) { @@ -111,7 +109,7 @@ private void initialize(MemoryMode memoryMode) { @EnumSource(MemoryMode.class) void recordDouble_NaN(MemoryMode memoryMode) { initialize(memoryMode); - DefaultSynchronousMetricStorage storage = + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( cumulativeReader, METRIC_DESCRIPTOR, @@ -162,7 +160,7 @@ void attributesProcessor_applied(MemoryMode memoryMode) { void recordAndCollect_CumulativeDoesNotReset(MemoryMode memoryMode) { initialize(memoryMode); - DefaultSynchronousMetricStorage storage = + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( cumulativeReader, METRIC_DESCRIPTOR, @@ -211,7 +209,7 @@ void recordAndCollect_CumulativeDoesNotReset(MemoryMode memoryMode) { void recordAndCollect_DeltaResets_ImmutableData() { initialize(IMMUTABLE_DATA); - DefaultSynchronousMetricStorage storage = + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( deltaReader, METRIC_DESCRIPTOR, @@ -265,7 +263,7 @@ void recordAndCollect_DeltaResets_ImmutableData() { void recordAndCollect_DeltaResets_ReusableData() { initialize(MemoryMode.REUSABLE_DATA); - DefaultSynchronousMetricStorage storage = + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( deltaReader, METRIC_DESCRIPTOR, @@ -368,7 +366,7 @@ void recordAndCollect_DeltaResets_ReusableData() { void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) { initialize(memoryMode); - DefaultSynchronousMetricStorage storage = + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( cumulativeReader, METRIC_DESCRIPTOR, @@ -440,7 +438,7 @@ void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) { void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { initialize(IMMUTABLE_DATA); - DefaultSynchronousMetricStorage storage = + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( deltaReader, METRIC_DESCRIPTOR, @@ -541,7 +539,7 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() { initialize(MemoryMode.REUSABLE_DATA); - DefaultSynchronousMetricStorage storage = + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( deltaReader, METRIC_DESCRIPTOR, @@ -628,7 +626,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() { void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() { initialize(MemoryMode.REUSABLE_DATA); - DefaultSynchronousMetricStorage storage = + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( deltaReader, METRIC_DESCRIPTOR, @@ -771,7 +769,7 @@ private static void assertOverflowDoesNotExists(MetricData metricData) { @ParameterizedTest @MethodSource("concurrentStressTestArguments") void recordAndCollect_concurrentStressTest( - DefaultSynchronousMetricStorage storage, BiConsumer collect) { + DefaultSynchronousMetricStorage storage, BiConsumer collect) { // Define record threads. Each records a value of 1.0, 2000 times List threads = new ArrayList<>(); CountDownLatch latch = new CountDownLatch(4); @@ -826,7 +824,7 @@ private static Stream concurrentStressTestArguments() { List argumentsList = new ArrayList<>(); for (MemoryMode memoryMode : MemoryMode.values()) { - Aggregator aggregator = + Aggregator aggregator = ((AggregatorFactory) Aggregation.sum()) .createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff(), memoryMode); @@ -872,7 +870,7 @@ private static Stream concurrentStressTestArguments() { void enabledThenDisable_isEnabled(MemoryMode memoryMode) { initialize(memoryMode); - DefaultSynchronousMetricStorage storage = + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( deltaReader, METRIC_DESCRIPTOR, @@ -891,7 +889,7 @@ void enabledThenDisable_isEnabled(MemoryMode memoryMode) { void enabledThenDisableThenEnable_isEnabled(MemoryMode memoryMode) { initialize(memoryMode); - DefaultSynchronousMetricStorage storage = + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( deltaReader, METRIC_DESCRIPTOR, @@ -911,7 +909,7 @@ void enabledThenDisableThenEnable_isEnabled(MemoryMode memoryMode) { void enabledThenDisable_recordAndCollect(MemoryMode memoryMode) { initialize(memoryMode); - DefaultSynchronousMetricStorage storage = + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( deltaReader, METRIC_DESCRIPTOR, @@ -932,7 +930,7 @@ void enabledThenDisable_recordAndCollect(MemoryMode memoryMode) { void enabledThenDisableThenEnable_recordAndCollect(MemoryMode memoryMode) { initialize(memoryMode); - DefaultSynchronousMetricStorage storage = + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( deltaReader, METRIC_DESCRIPTOR, diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregationTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregationTest.java index 0479baafde3..aec6c4bcfac 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregationTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregationTest.java @@ -13,7 +13,6 @@ import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.InstrumentValueType; -import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; @@ -47,7 +46,7 @@ void invalidConfig_Throws() { @Test void minimumBucketsCanAccommodateMaxRange() { Aggregation aggregation = Base2ExponentialHistogramAggregation.create(2, 20); - Aggregator aggregator = + Aggregator aggregator = ((AggregatorFactory) aggregation) .createAggregator( InstrumentDescriptor.create( @@ -59,8 +58,7 @@ void minimumBucketsCanAccommodateMaxRange() { Advice.empty()), ExemplarFilter.alwaysOff(), MemoryMode.IMMUTABLE_DATA); - AggregatorHandle handle = - aggregator.createHandle(); + AggregatorHandle handle = aggregator.createHandle(); // Record max range handle.recordDouble(Double.MIN_VALUE); handle.recordDouble(Double.MAX_VALUE);