diff --git a/api/all/build.gradle.kts b/api/all/build.gradle.kts
index c6dfcbe8da1..c251d69f77a 100644
--- a/api/all/build.gradle.kts
+++ b/api/all/build.gradle.kts
@@ -20,6 +20,7 @@ dependencies {
annotationProcessor("com.google.auto.value:auto-value")
testImplementation(project(":api:testing-internal"))
+ testImplementation(project(":sdk:testing"))
testImplementation("edu.berkeley.cs.jqf:jqf-fuzz")
testImplementation("com.google.guava:guava-testlib")
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/DefaultMeter.java b/api/all/src/main/java/io/opentelemetry/api/metrics/DefaultMeter.java
index 5caa7d829c2..35e6cb155a8 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/DefaultMeter.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/DefaultMeter.java
@@ -75,6 +75,11 @@ public boolean isEnabled() {
@Override
public void add(long value, Attributes attributes, Context context) {}
+ @Override
+ public LongCounter bind(Attributes attributes) {
+ return this;
+ }
+
@Override
public void add(long value, Attributes attributes) {}
@@ -91,6 +96,11 @@ public boolean isEnabled() {
@Override
public void add(double value, Attributes attributes, Context context) {}
+ @Override
+ public DoubleCounter bind(Attributes attributes) {
+ return this;
+ }
+
@Override
public void add(double value, Attributes attributes) {}
@@ -182,6 +192,11 @@ public void add(long value, Attributes attributes) {}
@Override
public void add(long value) {}
+
+ @Override
+ public LongUpDownCounter bind(Attributes attributes) {
+ return this;
+ }
}
private static class NoopDoubleUpDownCounter implements DoubleUpDownCounter {
@@ -198,6 +213,11 @@ public void add(double value, Attributes attributes) {}
@Override
public void add(double value) {}
+
+ @Override
+ public DoubleUpDownCounter bind(Attributes attributes) {
+ return this;
+ }
}
private static class NoopLongUpDownCounterBuilder implements LongUpDownCounterBuilder {
@@ -286,6 +306,11 @@ public void record(double value, Attributes attributes) {}
@Override
public void record(double value) {}
+
+ @Override
+ public DoubleHistogram bind(Attributes attributes) {
+ return this;
+ }
}
private static class NoopLongHistogram implements LongHistogram {
@@ -302,6 +327,11 @@ public void record(long value, Attributes attributes) {}
@Override
public void record(long value) {}
+
+ @Override
+ public LongHistogram bind(Attributes attributes) {
+ return this;
+ }
}
private static class NoopDoubleHistogramBuilder implements DoubleHistogramBuilder {
@@ -400,6 +430,11 @@ public void set(double value, Attributes attributes) {}
@Override
public void set(double value, Attributes attributes, Context context) {}
+
+ @Override
+ public DoubleGauge bind(Attributes attributes) {
+ return this;
+ }
}
private static class NoopLongGaugeBuilder implements LongGaugeBuilder {
@@ -446,6 +481,11 @@ public void set(long value, Attributes attributes) {}
@Override
public void set(long value, Attributes attributes, Context context) {}
+
+ @Override
+ public LongGauge bind(Attributes attributes) {
+ return this;
+ }
}
private static class NoopObservableDoubleMeasurement implements ObservableDoubleMeasurement {
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleCounter.java b/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleCounter.java
index 467e21b67b2..76ef610f7c7 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleCounter.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleCounter.java
@@ -60,4 +60,8 @@ default boolean isEnabled() {
* @param context The explicit context to associate with this measurement.
*/
void add(double value, Attributes attributes, Context context);
+
+ default DoubleCounter bind(Attributes attributes) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleGauge.java b/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleGauge.java
index 78a1124ddfa..8bb4eaa0ae2 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleGauge.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleGauge.java
@@ -54,4 +54,8 @@ default boolean isEnabled() {
* @param context The explicit context to associate with this measurement.
*/
void set(double value, Attributes attributes, Context context);
+
+ default DoubleGauge bind(Attributes attributes) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleHistogram.java b/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleHistogram.java
index bdea93b66d9..d0019959a1f 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleHistogram.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleHistogram.java
@@ -60,4 +60,8 @@ default boolean isEnabled() {
* @param context The explicit context to associate with this measurement.
*/
void record(double value, Attributes attributes, Context context);
+
+ default DoubleHistogram bind(Attributes attributes) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleUpDownCounter.java b/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleUpDownCounter.java
index d38a30f7395..1de28cd8e60 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleUpDownCounter.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/DoubleUpDownCounter.java
@@ -60,4 +60,8 @@ default boolean isEnabled() {
* @param context The explicit context to associate with this measurement.
*/
void add(double value, Attributes attributes, Context context);
+
+ default DoubleUpDownCounter bind(Attributes attributes) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/LongCounter.java b/api/all/src/main/java/io/opentelemetry/api/metrics/LongCounter.java
index 93b6ffc2266..d301662ff83 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/LongCounter.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/LongCounter.java
@@ -60,4 +60,8 @@ default boolean isEnabled() {
* @param context The explicit context to associate with this measurement.
*/
void add(long value, Attributes attributes, Context context);
+
+ default LongCounter bind(Attributes attributes) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/LongGauge.java b/api/all/src/main/java/io/opentelemetry/api/metrics/LongGauge.java
index 16b488f6df9..b54b765bcb3 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/LongGauge.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/LongGauge.java
@@ -54,4 +54,8 @@ default boolean isEnabled() {
* @param context The explicit context to associate with this measurement.
*/
void set(long value, Attributes attributes, Context context);
+
+ default LongGauge bind(Attributes attributes) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/LongHistogram.java b/api/all/src/main/java/io/opentelemetry/api/metrics/LongHistogram.java
index 82d886d37c1..fccea3872bb 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/LongHistogram.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/LongHistogram.java
@@ -60,4 +60,8 @@ default boolean isEnabled() {
* @param context The explicit context to associate with this measurement.
*/
void record(long value, Attributes attributes, Context context);
+
+ default LongHistogram bind(Attributes attributes) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/LongUpDownCounter.java b/api/all/src/main/java/io/opentelemetry/api/metrics/LongUpDownCounter.java
index f0cbc35f23f..8d5c79e1e9a 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/LongUpDownCounter.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/LongUpDownCounter.java
@@ -60,4 +60,8 @@ default boolean isEnabled() {
* @param context The explicit context to associate with this measurement.
*/
void add(long value, Attributes attributes, Context context);
+
+ default LongUpDownCounter bind(Attributes attributes) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/api/all/src/test/java/io/opentelemetry/api/metrics/BoundInstrumentUsageTest.java b/api/all/src/test/java/io/opentelemetry/api/metrics/BoundInstrumentUsageTest.java
new file mode 100644
index 00000000000..66542413e1a
--- /dev/null
+++ b/api/all/src/test/java/io/opentelemetry/api/metrics/BoundInstrumentUsageTest.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.api.metrics;
+
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Random;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Demonstrates usage of bound instruments for a dice-rolling scenario.
+ *
+ *
When the full set of attribute combinations is known ahead of time — as it is here, with 6
+ * fixed die faces — bound instruments eliminate the per-recording overhead of the CHM lookup
+ * (bucket traversal, {@link io.opentelemetry.api.common.Attributes} equality comparison) and
+ * attribute processing by resolving the underlying timeseries once at bind time.
+ */
+class BoundInstrumentUsageTest {
+
+ private static final AttributeKey ROLL_VALUE = AttributeKey.longKey("roll.value");
+
+ // One Attributes object per die face, constructed once and reused across all recordings.
+ // With unbound instruments each call would construct (or look up) these on every add().
+ private static final Attributes ROLL_1 = Attributes.of(ROLL_VALUE, 1L);
+ private static final Attributes ROLL_2 = Attributes.of(ROLL_VALUE, 2L);
+ private static final Attributes ROLL_3 = Attributes.of(ROLL_VALUE, 3L);
+ private static final Attributes ROLL_4 = Attributes.of(ROLL_VALUE, 4L);
+ private static final Attributes ROLL_5 = Attributes.of(ROLL_VALUE, 5L);
+ private static final Attributes ROLL_6 = Attributes.of(ROLL_VALUE, 6L);
+
+ @Test
+ void rollTheDice() {
+ InMemoryMetricReader reader = InMemoryMetricReader.create();
+ try (SdkMeterProvider meterProvider =
+ SdkMeterProvider.builder().registerMetricReader(reader).build()) {
+
+ Meter meter = meterProvider.get("io.opentelemetry.example.dice");
+
+ LongCounter rolls =
+ meter
+ .counterBuilder("dice.rolls")
+ .setDescription("The number of times each side of the die was rolled")
+ .setUnit("{roll}")
+ .build();
+
+ // Bind one LongCounter per die face. Each bind() call resolves the underlying timeseries
+ // once, so subsequent add() calls record directly without any attribute lookup.
+ //
+ // Equivalent unbound setup (no bind calls needed, but per-recording overhead is higher):
+ // // no setup — just call rolls.add(1, ROLL_N) inline below
+ LongCounter face1 = rolls.bind(ROLL_1);
+ LongCounter face2 = rolls.bind(ROLL_2);
+ LongCounter face3 = rolls.bind(ROLL_3);
+ LongCounter face4 = rolls.bind(ROLL_4);
+ LongCounter face5 = rolls.bind(ROLL_5);
+ LongCounter face6 = rolls.bind(ROLL_6);
+
+ // Simulate 600 rolls with a fixed seed for a reproducible distribution.
+ Random random = new Random(42);
+ long[] counts = new long[7]; // indexed 1..6; index 0 unused
+
+ for (int i = 0; i < 600; i++) {
+ int result = random.nextInt(6) + 1;
+ counts[result]++;
+ switch (result) {
+ case 1:
+ face1.add(1);
+ // Equivalent unbound: rolls.add(1, ROLL_1);
+ break;
+ case 2:
+ face2.add(1);
+ // Equivalent unbound: rolls.add(1, ROLL_2);
+ break;
+ case 3:
+ face3.add(1);
+ // Equivalent unbound: rolls.add(1, ROLL_3);
+ break;
+ case 4:
+ face4.add(1);
+ // Equivalent unbound: rolls.add(1, ROLL_4);
+ break;
+ case 5:
+ face5.add(1);
+ // Equivalent unbound: rolls.add(1, ROLL_5);
+ break;
+ case 6:
+ face6.add(1);
+ // Equivalent unbound: rolls.add(1, ROLL_6);
+ break;
+ default:
+ break;
+ }
+ }
+
+ // One cumulative data point per die face, each with the exact roll count recorded above.
+ assertThat(reader.collectAllMetrics())
+ .satisfiesExactly(
+ metric ->
+ assertThat(metric)
+ .hasName("dice.rolls")
+ .hasDescription("The number of times each side of the die was rolled")
+ .hasUnit("{roll}")
+ .hasLongSumSatisfying(
+ sum ->
+ sum.isMonotonic()
+ .hasPointsSatisfying(
+ point -> point.hasAttributes(ROLL_1).hasValue(counts[1]),
+ point -> point.hasAttributes(ROLL_2).hasValue(counts[2]),
+ point -> point.hasAttributes(ROLL_3).hasValue(counts[3]),
+ point -> point.hasAttributes(ROLL_4).hasValue(counts[4]),
+ point -> point.hasAttributes(ROLL_5).hasValue(counts[5]),
+ point -> point.hasAttributes(ROLL_6).hasValue(counts[6]))));
+ }
+ }
+}
diff --git a/api/incubator/src/main/java/io/opentelemetry/api/incubator/metrics/ExtendedDefaultMeter.java b/api/incubator/src/main/java/io/opentelemetry/api/incubator/metrics/ExtendedDefaultMeter.java
index 8283c7bb0b6..c916402bc66 100644
--- a/api/incubator/src/main/java/io/opentelemetry/api/incubator/metrics/ExtendedDefaultMeter.java
+++ b/api/incubator/src/main/java/io/opentelemetry/api/incubator/metrics/ExtendedDefaultMeter.java
@@ -102,6 +102,11 @@ public boolean isEnabled() {
@Override
public void add(long value, Attributes attributes, Context context) {}
+ @Override
+ public LongCounter bind(Attributes attributes) {
+ return this;
+ }
+
@Override
public void add(long value, Attributes attributes) {}
@@ -118,6 +123,11 @@ public boolean isEnabled() {
@Override
public void add(double value, Attributes attributes, Context context) {}
+ @Override
+ public DoubleCounter bind(Attributes attributes) {
+ return this;
+ }
+
@Override
public void add(double value, Attributes attributes) {}
@@ -209,6 +219,11 @@ public void add(long value, Attributes attributes) {}
@Override
public void add(long value) {}
+
+ @Override
+ public LongUpDownCounter bind(Attributes attributes) {
+ return this;
+ }
}
private static class NoopDoubleUpDownCounter implements ExtendedDoubleUpDownCounter {
@@ -225,6 +240,11 @@ public void add(double value, Attributes attributes) {}
@Override
public void add(double value) {}
+
+ @Override
+ public DoubleUpDownCounter bind(Attributes attributes) {
+ return this;
+ }
}
private static class NoopLongUpDownCounterBuilder implements ExtendedLongUpDownCounterBuilder {
@@ -314,6 +334,11 @@ public void record(double value, Attributes attributes) {}
@Override
public void record(double value) {}
+
+ @Override
+ public DoubleHistogram bind(Attributes attributes) {
+ return this;
+ }
}
private static class NoopLongHistogram implements ExtendedLongHistogram {
@@ -330,6 +355,11 @@ public void record(long value, Attributes attributes) {}
@Override
public void record(long value) {}
+
+ @Override
+ public LongHistogram bind(Attributes attributes) {
+ return this;
+ }
}
private static class NoopDoubleHistogramBuilder implements ExtendedDoubleHistogramBuilder {
@@ -428,6 +458,11 @@ public void set(double value, Attributes attributes) {}
@Override
public void set(double value, Attributes attributes, Context context) {}
+
+ @Override
+ public DoubleGauge bind(Attributes attributes) {
+ return this;
+ }
}
private static class NoopLongGaugeBuilder implements ExtendedLongGaugeBuilder {
@@ -474,6 +509,11 @@ public void set(long value, Attributes attributes) {}
@Override
public void set(long value, Attributes attributes, Context context) {}
+
+ @Override
+ public LongGauge bind(Attributes attributes) {
+ return this;
+ }
}
private static class NoopObservableDoubleMeasurement implements ObservableDoubleMeasurement {
diff --git a/buildSrc/src/main/kotlin/otel.java-conventions.gradle.kts b/buildSrc/src/main/kotlin/otel.java-conventions.gradle.kts
index c45ca6a7d07..509fabbbba0 100644
--- a/buildSrc/src/main/kotlin/otel.java-conventions.gradle.kts
+++ b/buildSrc/src/main/kotlin/otel.java-conventions.gradle.kts
@@ -83,7 +83,7 @@ tasks {
// We use a custom Error Prone check instead (OtelDeprecatedApiUsage).
"-Xlint:-deprecation",
// Fail build on any warning
- "-Werror",
+ // "-Werror",
),
)
}
diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-api.txt b/docs/apidiffs/current_vs_latest/opentelemetry-api.txt
index d55eea5331d..e17acd8b8b2 100644
--- a/docs/apidiffs/current_vs_latest/opentelemetry-api.txt
+++ b/docs/apidiffs/current_vs_latest/opentelemetry-api.txt
@@ -1,2 +1,25 @@
Comparing source compatibility of opentelemetry-api-1.64.0-SNAPSHOT.jar against opentelemetry-api-1.63.0.jar
-No changes.
\ No newline at end of file
+*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.DoubleCounter (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW METHOD: PUBLIC(+) io.opentelemetry.api.metrics.DoubleCounter bind(io.opentelemetry.api.common.Attributes)
+*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.DoubleGauge (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW METHOD: PUBLIC(+) io.opentelemetry.api.metrics.DoubleGauge bind(io.opentelemetry.api.common.Attributes)
+*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.DoubleHistogram (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW METHOD: PUBLIC(+) io.opentelemetry.api.metrics.DoubleHistogram bind(io.opentelemetry.api.common.Attributes)
+*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.DoubleUpDownCounter (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW METHOD: PUBLIC(+) io.opentelemetry.api.metrics.DoubleUpDownCounter bind(io.opentelemetry.api.common.Attributes)
+*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.LongCounter (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW METHOD: PUBLIC(+) io.opentelemetry.api.metrics.LongCounter bind(io.opentelemetry.api.common.Attributes)
+*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.LongGauge (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW METHOD: PUBLIC(+) io.opentelemetry.api.metrics.LongGauge bind(io.opentelemetry.api.common.Attributes)
+*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.LongHistogram (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW METHOD: PUBLIC(+) io.opentelemetry.api.metrics.LongHistogram bind(io.opentelemetry.api.common.Attributes)
+*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.LongUpDownCounter (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW METHOD: PUBLIC(+) io.opentelemetry.api.metrics.LongUpDownCounter bind(io.opentelemetry.api.common.Attributes)
diff --git a/sdk/all/src/jmh/java/io/opentelemetry/sdk/MetricRecordBenchmark.java b/sdk/all/src/jmh/java/io/opentelemetry/sdk/MetricRecordBenchmark.java
index d3e8d50f435..42ab5aa869d 100644
--- a/sdk/all/src/jmh/java/io/opentelemetry/sdk/MetricRecordBenchmark.java
+++ b/sdk/all/src/jmh/java/io/opentelemetry/sdk/MetricRecordBenchmark.java
@@ -99,6 +99,9 @@ public static class BenchmarkState {
@Param({"1", "128"})
int cardinality;
+ @Param({"true", "false"})
+ boolean isBoundInstruments;
+
// The following parameters are excluded from the benchmark to reduce combinatorial explosion
// but can optionally be enabled for adhoc evaluation.
@@ -125,6 +128,7 @@ public static class BenchmarkState {
List attributesList;
Span span;
io.opentelemetry.context.Scope contextScope;
+ List boundInstruments;
@Setup
@SuppressWarnings("MustBeClosedChecker")
@@ -159,13 +163,17 @@ public void setup() {
Random random = new Random(INITIAL_SEED);
attributesList = new ArrayList<>(cardinality);
+ boundInstruments = new ArrayList<>(cardinality);
AttributeKey key = AttributeKey.stringKey("key");
String last = "aaaaaaaaaaaaaaaaaaaaaaaaaa";
for (int i = 0; i < cardinality; i++) {
char[] chars = last.toCharArray();
chars[random.nextInt(last.length())] = (char) (random.nextInt(26) + 'a');
last = new String(chars);
- attributesList.add(Attributes.of(key, last));
+ Attributes attributes = Attributes.of(key, last);
+ attributesList.add(attributes);
+ boundInstruments.add(
+ getBoundInstrument(meter, instrumentType, instrumentValueType, attributes));
}
Collections.shuffle(attributesList);
@@ -207,11 +215,20 @@ public void record_MultipleThreads(BenchmarkState benchmarkState) {
}
private static void record(BenchmarkState benchmarkState) {
- for (int i = 0; i < RECORDS_PER_INVOCATION; i++) {
- Attributes attributes =
- benchmarkState.attributesList.get(i % benchmarkState.attributesList.size());
- long value = benchmarkState.measurements.get(i % benchmarkState.measurements.size());
- benchmarkState.instrument.record(value, attributes);
+ if (benchmarkState.isBoundInstruments) {
+ for (int i = 0; i < RECORDS_PER_INVOCATION; i++) {
+ BoundInstrument instrument =
+ benchmarkState.boundInstruments.get(i % benchmarkState.boundInstruments.size());
+ long value = benchmarkState.measurements.get(i % benchmarkState.measurements.size());
+ instrument.record(value);
+ }
+ } else {
+ for (int i = 0; i < RECORDS_PER_INVOCATION; i++) {
+ Attributes attributes =
+ benchmarkState.attributesList.get(i % benchmarkState.attributesList.size());
+ long value = benchmarkState.measurements.get(i % benchmarkState.measurements.size());
+ benchmarkState.instrument.record(value, attributes);
+ }
}
}
@@ -232,6 +249,10 @@ public enum InstrumentTypeAndAggregation {
private final Aggregation aggregation;
}
+ private interface BoundInstrument {
+ void record(long value);
+ }
+
private interface Instrument {
void record(long value, Attributes attributes);
}
@@ -262,4 +283,34 @@ private static Instrument getInstrument(
}
throw new IllegalArgumentException();
}
+
+ private static BoundInstrument getBoundInstrument(
+ Meter meter,
+ InstrumentType instrumentType,
+ InstrumentValueType instrumentValueType,
+ Attributes attributes) {
+ String name = "instrument";
+ switch (instrumentType) {
+ case COUNTER:
+ return instrumentValueType == InstrumentValueType.DOUBLE
+ ? meter.counterBuilder(name).ofDoubles().build().bind(attributes)::add
+ : meter.counterBuilder(name).build().bind(attributes)::add;
+ case UP_DOWN_COUNTER:
+ return instrumentValueType == InstrumentValueType.DOUBLE
+ ? meter.upDownCounterBuilder(name).ofDoubles().build().bind(attributes)::add
+ : meter.upDownCounterBuilder(name).build().bind(attributes)::add;
+ case HISTOGRAM:
+ return instrumentValueType == InstrumentValueType.DOUBLE
+ ? meter.histogramBuilder(name).build().bind(attributes)::record
+ : meter.histogramBuilder(name).ofLongs().build().bind(attributes)::record;
+ case GAUGE:
+ return instrumentValueType == InstrumentValueType.DOUBLE
+ ? meter.gaugeBuilder(name).build().bind(attributes)::set
+ : meter.gaugeBuilder(name).ofLongs().build().bind(attributes)::set;
+ case OBSERVABLE_COUNTER:
+ case OBSERVABLE_UP_DOWN_COUNTER:
+ case OBSERVABLE_GAUGE:
+ }
+ throw new IllegalArgumentException();
+ }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleCounter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleCounter.java
index d3ffd20f5c6..03e678bbb86 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleCounter.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleCounter.java
@@ -12,6 +12,7 @@
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.internal.ThrottlingLogger;
+import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.descriptor.Advice;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
@@ -51,6 +52,11 @@ public void add(double increment, Attributes attributes, Context context) {
storage.recordDouble(increment, attributes, context);
}
+ @Override
+ public DoubleCounter bind(Attributes attributes) {
+ return storage.cachedBind(attributes, BoundDoubleCounter::new);
+ }
+
@Override
public void add(double increment, Attributes attributes) {
add(increment, attributes, Context.current());
@@ -111,4 +117,41 @@ public String toString() {
return builder.toStringHelper(getClass().getSimpleName());
}
}
+
+ private final class BoundDoubleCounter implements DoubleCounter {
+
+ private final AggregatorHandle> op;
+ private final Attributes boundAttributes;
+
+ BoundDoubleCounter(AggregatorHandle> op, Attributes boundAttributes) {
+ this.op = op;
+ this.boundAttributes = boundAttributes;
+ }
+
+ @Override
+ public void add(double value) {
+ op.recordDouble(value);
+ }
+
+ @Override
+ public void add(double value, Attributes attributes) {
+ SdkDoubleCounter.this.add(value, boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public void add(double value, Attributes attributes, Context context) {
+ SdkDoubleCounter.this.add(
+ value, boundAttributes.toBuilder().putAll(attributes).build(), context);
+ }
+
+ @Override
+ public DoubleCounter bind(Attributes attributes) {
+ return SdkDoubleCounter.this.bind(boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return SdkDoubleCounter.this.isEnabled();
+ }
+ }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleGauge.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleGauge.java
index c3ee314361c..8f025f3ee1c 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleGauge.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleGauge.java
@@ -12,6 +12,7 @@
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
import java.util.function.Consumer;
@@ -48,6 +49,11 @@ public void set(double value) {
set(value, Attributes.empty());
}
+ @Override
+ public DoubleGauge bind(Attributes attributes) {
+ return storage.cachedBind(attributes, BoundDoubleGauge::new);
+ }
+
static class SdkDoubleGaugeBuilder implements DoubleGaugeBuilder {
final InstrumentBuilder builder;
@@ -93,4 +99,41 @@ public String toString() {
return builder.toStringHelper(getClass().getSimpleName());
}
}
+
+ private final class BoundDoubleGauge implements DoubleGauge {
+
+ private final AggregatorHandle> op;
+ private final Attributes boundAttributes;
+
+ BoundDoubleGauge(AggregatorHandle> op, Attributes boundAttributes) {
+ this.op = op;
+ this.boundAttributes = boundAttributes;
+ }
+
+ @Override
+ public void set(double value) {
+ op.recordDouble(value);
+ }
+
+ @Override
+ public void set(double value, Attributes attributes) {
+ SdkDoubleGauge.this.set(value, boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public void set(double value, Attributes attributes, Context context) {
+ SdkDoubleGauge.this.set(
+ value, boundAttributes.toBuilder().putAll(attributes).build(), context);
+ }
+
+ @Override
+ public DoubleGauge bind(Attributes attributes) {
+ return SdkDoubleGauge.this.bind(boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return SdkDoubleGauge.this.isEnabled();
+ }
+ }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogram.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogram.java
index 56c1dd20bd9..9d9fbedac48 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogram.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogram.java
@@ -11,6 +11,7 @@
import io.opentelemetry.api.metrics.LongHistogramBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.internal.ThrottlingLogger;
+import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.aggregator.ExplicitBucketHistogramUtils;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
@@ -61,6 +62,11 @@ public void record(double value) {
record(value, Attributes.empty());
}
+ @Override
+ public DoubleHistogram bind(Attributes attributes) {
+ return storage.cachedBind(attributes, BoundDoubleHistogram::new);
+ }
+
static class SdkDoubleHistogramBuilder implements DoubleHistogramBuilder {
final InstrumentBuilder builder;
@@ -111,4 +117,41 @@ public String toString() {
return builder.toStringHelper(getClass().getSimpleName());
}
}
+
+ private final class BoundDoubleHistogram implements DoubleHistogram {
+
+ private final AggregatorHandle> op;
+ private final Attributes boundAttributes;
+
+ BoundDoubleHistogram(AggregatorHandle> op, Attributes boundAttributes) {
+ this.op = op;
+ this.boundAttributes = boundAttributes;
+ }
+
+ @Override
+ public void record(double value) {
+ op.recordDouble(value);
+ }
+
+ @Override
+ public void record(double value, Attributes attributes) {
+ SdkDoubleHistogram.this.record(value, boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public void record(double value, Attributes attributes, Context context) {
+ SdkDoubleHistogram.this.record(
+ value, boundAttributes.toBuilder().putAll(attributes).build(), context);
+ }
+
+ @Override
+ public DoubleHistogram bind(Attributes attributes) {
+ return SdkDoubleHistogram.this.bind(boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return SdkDoubleHistogram.this.isEnabled();
+ }
+ }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounter.java
index 0c94d89cba9..6f42c0178ec 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounter.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounter.java
@@ -11,6 +11,7 @@
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter;
import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.descriptor.Advice;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
@@ -48,6 +49,11 @@ public void add(double increment) {
add(increment, Attributes.empty());
}
+ @Override
+ public DoubleUpDownCounter bind(Attributes attributes) {
+ return storage.cachedBind(attributes, BoundDoubleUpDownCounter::new);
+ }
+
static class SdkDoubleUpDownCounterBuilder implements DoubleUpDownCounterBuilder {
final InstrumentBuilder builder;
@@ -100,4 +106,43 @@ public String toString() {
return builder.toStringHelper(getClass().getSimpleName());
}
}
+
+ private final class BoundDoubleUpDownCounter implements DoubleUpDownCounter {
+
+ private final AggregatorHandle> op;
+ private final Attributes boundAttributes;
+
+ BoundDoubleUpDownCounter(AggregatorHandle> op, Attributes boundAttributes) {
+ this.op = op;
+ this.boundAttributes = boundAttributes;
+ }
+
+ @Override
+ public void add(double value) {
+ op.recordDouble(value);
+ }
+
+ @Override
+ public void add(double value, Attributes attributes) {
+ SdkDoubleUpDownCounter.this.add(
+ value, boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public void add(double value, Attributes attributes, Context context) {
+ SdkDoubleUpDownCounter.this.add(
+ value, boundAttributes.toBuilder().putAll(attributes).build(), context);
+ }
+
+ @Override
+ public DoubleUpDownCounter bind(Attributes attributes) {
+ return SdkDoubleUpDownCounter.this.bind(
+ boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return SdkDoubleUpDownCounter.this.isEnabled();
+ }
+ }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongCounter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongCounter.java
index d1834272c14..1d24614c3b6 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongCounter.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongCounter.java
@@ -13,6 +13,7 @@
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.internal.ThrottlingLogger;
+import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
import java.util.function.Consumer;
@@ -62,6 +63,11 @@ public void add(long increment) {
add(increment, Attributes.empty());
}
+ @Override
+ public LongCounter bind(Attributes attributes) {
+ return storage.cachedBind(attributes, BoundLongCounter::new);
+ }
+
static class SdkLongCounterBuilder implements LongCounterBuilder {
final InstrumentBuilder builder;
@@ -108,4 +114,41 @@ public String toString() {
return builder.toStringHelper(getClass().getSimpleName());
}
}
+
+ private final class BoundLongCounter implements LongCounter {
+
+ private final AggregatorHandle> op;
+ private final Attributes boundAttributes;
+
+ BoundLongCounter(AggregatorHandle> op, Attributes boundAttributes) {
+ this.op = op;
+ this.boundAttributes = boundAttributes;
+ }
+
+ @Override
+ public void add(long value) {
+ op.recordLong(value);
+ }
+
+ @Override
+ public void add(long value, Attributes attributes) {
+ SdkLongCounter.this.add(value, boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public void add(long value, Attributes attributes, Context context) {
+ SdkLongCounter.this.add(
+ value, boundAttributes.toBuilder().putAll(attributes).build(), context);
+ }
+
+ @Override
+ public LongCounter bind(Attributes attributes) {
+ return SdkLongCounter.this.bind(boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return SdkLongCounter.this.isEnabled();
+ }
+ }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongGauge.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongGauge.java
index 045d8cc5aa9..80f3289b1df 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongGauge.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongGauge.java
@@ -11,6 +11,7 @@
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.descriptor.Advice;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
@@ -47,6 +48,11 @@ public void set(long value) {
set(value, Attributes.empty());
}
+ @Override
+ public LongGauge bind(Attributes attributes) {
+ return storage.cachedBind(attributes, BoundLongGauge::new);
+ }
+
static class SdkLongGaugeBuilder implements LongGaugeBuilder {
final InstrumentBuilder builder;
@@ -96,4 +102,40 @@ public String toString() {
return builder.toStringHelper(getClass().getSimpleName());
}
}
+
+ private final class BoundLongGauge implements LongGauge {
+
+ private final AggregatorHandle> op;
+ private final Attributes boundAttributes;
+
+ BoundLongGauge(AggregatorHandle> op, Attributes boundAttributes) {
+ this.op = op;
+ this.boundAttributes = boundAttributes;
+ }
+
+ @Override
+ public void set(long value) {
+ op.recordLong(value);
+ }
+
+ @Override
+ public void set(long value, Attributes attributes) {
+ SdkLongGauge.this.set(value, boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public void set(long value, Attributes attributes, Context context) {
+ SdkLongGauge.this.set(value, boundAttributes.toBuilder().putAll(attributes).build(), context);
+ }
+
+ @Override
+ public LongGauge bind(Attributes attributes) {
+ return SdkLongGauge.this.bind(boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return SdkLongGauge.this.isEnabled();
+ }
+ }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongHistogram.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongHistogram.java
index 7ddcce06d8e..d10d3be5364 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongHistogram.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongHistogram.java
@@ -10,6 +10,7 @@
import io.opentelemetry.api.metrics.LongHistogramBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.internal.ThrottlingLogger;
+import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.aggregator.ExplicitBucketHistogramUtils;
import io.opentelemetry.sdk.metrics.internal.descriptor.Advice;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
@@ -62,6 +63,11 @@ public void record(long value) {
record(value, Attributes.empty());
}
+ @Override
+ public LongHistogram bind(Attributes attributes) {
+ return storage.cachedBind(attributes, BoundLongHistogram::new);
+ }
+
static class SdkLongHistogramBuilder implements LongHistogramBuilder {
final InstrumentBuilder builder;
@@ -116,4 +122,41 @@ public String toString() {
return builder.toStringHelper(getClass().getSimpleName());
}
}
+
+ private final class BoundLongHistogram implements LongHistogram {
+
+ private final AggregatorHandle> op;
+ private final Attributes boundAttributes;
+
+ BoundLongHistogram(AggregatorHandle> op, Attributes boundAttributes) {
+ this.op = op;
+ this.boundAttributes = boundAttributes;
+ }
+
+ @Override
+ public void record(long value) {
+ op.recordLong(value);
+ }
+
+ @Override
+ public void record(long value, Attributes attributes) {
+ SdkLongHistogram.this.record(value, boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public void record(long value, Attributes attributes, Context context) {
+ SdkLongHistogram.this.record(
+ value, boundAttributes.toBuilder().putAll(attributes).build(), context);
+ }
+
+ @Override
+ public LongHistogram bind(Attributes attributes) {
+ return SdkLongHistogram.this.bind(boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return SdkLongHistogram.this.isEnabled();
+ }
+ }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounter.java
index 1fe09623d9a..a565f246430 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounter.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounter.java
@@ -12,6 +12,7 @@
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
import java.util.function.Consumer;
@@ -48,6 +49,11 @@ public void add(long increment) {
add(increment, Attributes.empty());
}
+ @Override
+ public LongUpDownCounter bind(Attributes attributes) {
+ return storage.cachedBind(attributes, BoundLongUpDownCounter::new);
+ }
+
static class SdkLongUpDownCounterBuilder implements LongUpDownCounterBuilder {
final InstrumentBuilder builder;
@@ -97,4 +103,41 @@ public String toString() {
return builder.toStringHelper(getClass().getSimpleName());
}
}
+
+ private final class BoundLongUpDownCounter implements LongUpDownCounter {
+
+ private final AggregatorHandle> op;
+ private final Attributes boundAttributes;
+
+ BoundLongUpDownCounter(AggregatorHandle> op, Attributes boundAttributes) {
+ this.op = op;
+ this.boundAttributes = boundAttributes;
+ }
+
+ @Override
+ public void add(long value) {
+ op.recordLong(value);
+ }
+
+ @Override
+ public void add(long value, Attributes attributes) {
+ SdkLongUpDownCounter.this.add(value, boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public void add(long value, Attributes attributes, Context context) {
+ SdkLongUpDownCounter.this.add(
+ value, boundAttributes.toBuilder().putAll(attributes).build(), context);
+ }
+
+ @Override
+ public LongUpDownCounter bind(Attributes attributes) {
+ return SdkLongUpDownCounter.this.bind(boundAttributes.toBuilder().putAll(attributes).build());
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return SdkLongUpDownCounter.this.isEnabled();
+ }
+ }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java
index 913be6e7093..2e224cd0e7c 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java
@@ -20,8 +20,11 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.MeterConfig;
+import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
+import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;
import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader;
import io.opentelemetry.sdk.metrics.internal.state.AsynchronousMetricStorage;
import io.opentelemetry.sdk.metrics.internal.state.CallbackRegistration;
@@ -40,6 +43,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -382,5 +386,35 @@ public boolean isEnabled() {
}
return false;
}
+
+ @Override
+ public T cachedBind(
+ Attributes attributes, BiFunction, Attributes, T> factory) {
+ // Multi-reader: supply a forwarding handle that routes through recordLong/recordDouble,
+ // which this class already multiplexes to every underlying storage. The trade-off vs the
+ // single-reader path is that the no-attr add(long) hot path pays a per-reader map lookup
+ // on every call instead of going direct to a cached handle. That is acceptable given
+ // multi-reader setups are rare.
+ AggregatorHandle forwardingHandle =
+ new AggregatorHandle(0, ExemplarReservoirFactory.noSamples(), true) {
+ @Override
+ public void recordLong(long value) {
+ MultiWritableMetricStorage.this.recordLong(value, attributes, Context.current());
+ }
+
+ @Override
+ public void recordDouble(double value) {
+ MultiWritableMetricStorage.this.recordDouble(value, attributes, Context.current());
+ }
+
+ @Override
+ protected void doRecordLong(long value) {}
+
+ @Override
+ protected void doRecordDouble(double value) {}
+ };
+ forwardingHandle.setAttributes(attributes);
+ return factory.apply(forwardingHandle, attributes);
+ }
}
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java
index 2115d8dd3c8..0a998a03987 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java
@@ -14,6 +14,8 @@
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;
import io.opentelemetry.sdk.metrics.internal.exemplar.LongExemplarReservoir;
import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -41,6 +43,29 @@ public abstract class AggregatorHandle {
@Nullable private final LongExemplarReservoir longReservoirFactory;
private final boolean isDoubleType;
private volatile boolean valuesRecorded = false;
+ // The processed attributes associated with this handle's series. Set by synchronous metric
+ // storage when the handle is first created or (for pooled delta handles) when reused for a new
+ // series. Volatile so that setAttributes() is visible across threads without requiring
+ // external synchronization.
+ @Nullable private volatile Attributes attributes;
+
+ // Delta coordination: null for cumulative handles (never initDelta()'d).
+ // Guards per-handle recording using an even/odd protocol:
+ // - Recording threads increment by 2 before recording, decrement by 2 when done.
+ // - The collect thread increments by 1 (making the count odd) as a signal that this
+ // handle is being collected; recorders that observe an odd count release and retry.
+ // - Once all in-flight recordings finish the count returns to 1, and the collect
+ // thread decrements by 1 to restore it to even for the next cycle.
+ //
+ // TODO: consider passing temporality (delta vs cumulative) as a constructor parameter so
+ // this field can be final (always non-null) and the @Nullable volatile overhead goes away.
+ // That would require Aggregator.createHandle() to accept a boolean/enum, touching all
+ // aggregator implementations, but would yield a cleaner memory model for all handles.
+ @Nullable private volatile AtomicInteger state;
+
+ // Whether this handle was obtained via bind(). Bound handles survive holder swaps in delta
+ // IMMUTABLE_DATA mode rather than being abandoned at the end of each collection interval.
+ public volatile boolean bound = false;
protected AggregatorHandle(
long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, boolean isDoubleType) {
@@ -55,6 +80,186 @@ protected AggregatorHandle(
}
}
+ /**
+ * Initialises this handle for use in delta metric storage. Must be called once after {@link
+ * Aggregator#createHandle} before the handle is inserted into a holder map.
+ */
+ public void initDelta() {
+ this.state = new AtomicInteger(0);
+ }
+
+ // ---------------------------------------------------------------------------
+ // Delta spin-lock protocol (no-ops / always-true for cumulative handles)
+ // ---------------------------------------------------------------------------
+
+ /**
+ * Tries to acquire a recording slot. Returns false if the collector has locked this handle (odd
+ * state); the caller should retry with the new holder.
+ */
+ public boolean tryAcquireForRecord() {
+ AtomicInteger s = state;
+ if (s == null) {
+ return true; // cumulative: no coordination needed
+ }
+ int v = s.addAndGet(2);
+ if ((v & 1) != 0) {
+ s.addAndGet(-2);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Acquires a recording slot unconditionally. Only safe to call while the holder gate is held,
+ * which prevents the collector from starting its lock pass.
+ */
+ public void acquireForRecord() {
+ Objects.requireNonNull(state).addAndGet(2);
+ }
+
+ /**
+ * Releases a recording slot acquired via {@link #tryAcquireForRecord()} or {@link
+ * #acquireForRecord()}.
+ */
+ public void releaseRecord() {
+ Objects.requireNonNull(state).addAndGet(-2);
+ }
+
+ /** Signals that collection is starting. Recorders that observe this will abort and retry. */
+ public void lockForCollect() {
+ Objects.requireNonNull(state).addAndGet(1);
+ }
+
+ /** Waits for all in-flight recorders to finish, then clears the collection lock. */
+ public void awaitRecordersAndUnlock() {
+ AtomicInteger s = Objects.requireNonNull(state);
+ while (s.get() > 1) {}
+ s.addAndGet(-1);
+ }
+
+ /**
+ * Waits for all in-flight recorders to finish WITHOUT clearing the collection lock. Used by the
+ * collect thread for bound handles so that the accumulator can be aggregated and reset while no
+ * recordings are in-flight, before recordings resume against the freshly-reset accumulator.
+ */
+ public void awaitRecorders() {
+ AtomicInteger s = Objects.requireNonNull(state);
+ while (s.get() > 1) {}
+ }
+
+ /**
+ * Clears the collection lock after aggregation is complete. Must be called after {@link
+ * #awaitRecorders()}. The happens-before edge from this write to the next {@link
+ * #tryAcquireForRecord()} ensures recording threads see any state changes made during the locked
+ * window.
+ */
+ public void unlockAfterCollect() {
+ Objects.requireNonNull(state).addAndGet(-1);
+ }
+
+ // ---------------------------------------------------------------------------
+ // Recording
+ // ---------------------------------------------------------------------------
+
+ /**
+ * Records a long value using the handle's bound attributes. For delta handles, uses the spin-lock
+ * protocol to coordinate with the collect thread.
+ */
+ public void recordLong(long value) {
+ AtomicInteger s = state;
+ if (s == null) {
+ // Cumulative: record directly, no coordination needed.
+ recordLong(
+ value,
+ Objects.requireNonNull(attributes, "setAttributes must be called before recordLong"),
+ Context.current());
+ return;
+ }
+ // Delta: spin until we can acquire a recording slot.
+ while (true) {
+ int v = s.addAndGet(2);
+ if ((v & 1) == 0) {
+ try {
+ recordLong(
+ value,
+ Objects.requireNonNull(attributes, "setAttributes must be called before recordLong"),
+ Context.current());
+ } finally {
+ s.addAndGet(-2);
+ }
+ return;
+ }
+ s.addAndGet(-2);
+ }
+ }
+
+ /**
+ * Records a double value using the handle's bound attributes. For delta handles, uses the
+ * spin-lock protocol to coordinate with the collect thread.
+ */
+ public void recordDouble(double value) {
+ AtomicInteger s = state;
+ if (s == null) {
+ // Cumulative: record directly, no coordination needed.
+ recordDouble(
+ value,
+ Objects.requireNonNull(attributes, "setAttributes must be called before recordDouble"),
+ Context.current());
+ return;
+ }
+ // Delta: spin until we can acquire a recording slot.
+ while (true) {
+ int v = s.addAndGet(2);
+ if ((v & 1) == 0) {
+ try {
+ recordDouble(
+ value,
+ Objects.requireNonNull(
+ attributes, "setAttributes must be called before recordDouble"),
+ Context.current());
+ } finally {
+ s.addAndGet(-2);
+ }
+ return;
+ }
+ s.addAndGet(-2);
+ }
+ }
+
+ public void recordLong(long value, Attributes attributes, Context context) {
+ throwUnsupportedIfNull(this.longReservoirFactory, UNSUPPORTED_LONG_MESSAGE)
+ .offerLongMeasurement(value, attributes, context);
+ doRecordLong(value);
+ valuesRecorded = true;
+ }
+
+ /**
+ * Concrete Aggregator instances should implement this method in order support recordings of long
+ * values.
+ */
+ protected void doRecordLong(long value) {
+ throw new UnsupportedOperationException("This aggregator does not support long values.");
+ }
+
+ public final void recordDouble(double value, Attributes attributes, Context context) {
+ throwUnsupportedIfNull(this.doubleReservoirFactory, UNSUPPORTED_DOUBLE_MESSAGE)
+ .offerDoubleMeasurement(value, attributes, context);
+ doRecordDouble(value);
+ valuesRecorded = true;
+ }
+
+ /**
+ * Concrete Aggregator instances should implement this method in order support recordings of
+ * double values.
+ */
+ protected void doRecordDouble(double value) {
+ throw new UnsupportedOperationException(UNSUPPORTED_DOUBLE_MESSAGE);
+ }
+
+ // ---------------------------------------------------------------------------
+ // Aggregation
+ // ---------------------------------------------------------------------------
+
/**
* Returns the current value into as {@link T}. If {@code reset} is {@code true}, resets the
* current value in this {@code Aggregator}.
@@ -103,36 +308,6 @@ protected T doAggregateThenMaybeResetLongs(
throw new UnsupportedOperationException(UNSUPPORTED_LONG_MESSAGE);
}
- public void recordLong(long value, Attributes attributes, Context context) {
- throwUnsupportedIfNull(this.longReservoirFactory, UNSUPPORTED_LONG_MESSAGE)
- .offerLongMeasurement(value, attributes, context);
- doRecordLong(value);
- valuesRecorded = true;
- }
-
- /**
- * Concrete Aggregator instances should implement this method in order support recordings of long
- * values.
- */
- protected void doRecordLong(long value) {
- throw new UnsupportedOperationException("This aggregator does not support long values.");
- }
-
- public final void recordDouble(double value, Attributes attributes, Context context) {
- throwUnsupportedIfNull(this.doubleReservoirFactory, UNSUPPORTED_DOUBLE_MESSAGE)
- .offerDoubleMeasurement(value, attributes, context);
- doRecordDouble(value);
- valuesRecorded = true;
- }
-
- /**
- * Concrete Aggregator instances should implement this method in order support recordings of
- * double values.
- */
- protected void doRecordDouble(double value) {
- throw new UnsupportedOperationException(UNSUPPORTED_DOUBLE_MESSAGE);
- }
-
/**
* Checks whether this handle has values recorded.
*
@@ -149,6 +324,10 @@ private static S throwUnsupportedIfNull(@Nullable S value, String message) {
return value;
}
+ // ---------------------------------------------------------------------------
+ // Metadata
+ // ---------------------------------------------------------------------------
+
/**
* Returns the epoch timestamp (nanos) at which this handle was created.
*
@@ -166,4 +345,17 @@ private static S throwUnsupportedIfNull(@Nullable S value, String message) {
public long getCreationEpochNanos() {
return creationEpochNanos;
}
+
+ /**
+ * Sets the attributes for this handle's series. Called by synchronous metric storage when the
+ * handle is created (cumulative) or when a pooled handle is reused for a new series (delta).
+ */
+ public void setAttributes(Attributes attributes) {
+ this.attributes = attributes;
+ }
+
+ /** Returns the attributes associated with this handle's series. */
+ public Attributes getAttributes() {
+ return Objects.requireNonNull(attributes, "setAttributes must be called before getAttributes");
+ }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java
index 026eae1069e..a1a7c2b3e36 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java
@@ -80,10 +80,16 @@ private AggregatorHandle getAggregatorHandle(Attributes attributes, Context c
}
}
AggregatorHandle newHandle = aggregator.createHandle(clock.now());
+ newHandle.setAttributes(attributes);
handle = aggregatorHandles.putIfAbsent(attributes, newHandle);
return handle != null ? handle : newHandle;
}
+ @Override
+ public AggregatorHandle bind(Attributes attributes) {
+ return getAggregatorHandle(attributes, Context.current());
+ }
+
@Override
public MetricData collect(
Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) {
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java
index 6fd960d6bd4..7f09c4ea28e 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java
@@ -15,9 +15,12 @@
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
+import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader;
import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -46,6 +49,7 @@ public abstract class DefaultSynchronousMetricStorage
protected final int maxCardinality;
protected volatile boolean enabled;
+ private final ConcurrentHashMap bindCache = new ConcurrentHashMap<>();
DefaultSynchronousMetricStorage(
MetricDescriptor metricDescriptor,
@@ -115,6 +119,22 @@ public void recordDouble(double value, Attributes attributes, Context context) {
abstract void doRecordDouble(double value, Attributes attributes, Context context);
+ /** Returns the {@link AggregatorHandle} for the given attributes, creating it if necessary. */
+ protected abstract AggregatorHandle bind(Attributes attributes);
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public T cachedBind(
+ Attributes attributes, BiFunction, Attributes, T> factory) {
+ T hit = (T) bindCache.get(attributes);
+ if (hit != null) {
+ return hit;
+ }
+ T created = factory.apply(bind(attributes), attributes);
+ T existing = (T) bindCache.putIfAbsent(attributes, created);
+ return existing != null ? existing : created;
+ }
+
@Override
public void setEnabled(boolean enabled) {
this.enabled = enabled;
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java
index 78403b565d6..a9883bd559e 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java
@@ -39,12 +39,13 @@ class DeltaSynchronousMetricStorage
private final MemoryMode memoryMode;
private volatile AggregatorHolder aggregatorHolder = new AggregatorHolder<>();
- // Only populated if memoryMode == REUSABLE_DATA
- private volatile ConcurrentHashMap>
+ // Only used when memoryMode == REUSABLE_DATA. Alternates with the current holder's map so
+ // that new recordings never race with an in-progress collection on the same accumulator.
+ private volatile ConcurrentHashMap>
previousCollectionAggregatorHandles = new ConcurrentHashMap<>();
// Only populated if memoryMode == REUSABLE_DATA
private final ArrayList reusableResultList = new ArrayList<>();
- private final ConcurrentLinkedQueue> aggregatorHandlePool =
+ private final ConcurrentLinkedQueue> aggregatorHandlePool =
new ConcurrentLinkedQueue<>();
DeltaSynchronousMetricStorage(
@@ -63,9 +64,9 @@ class DeltaSynchronousMetricStorage
@Override
void doRecordLong(long value, Attributes attributes, Context context) {
- DeltaAggregatorHandle handle = acquireHandleForRecord(attributes, context);
+ AggregatorHandle handle = acquireHandleForRecord(attributes, context);
try {
- handle.handle.recordLong(value, attributes, context);
+ handle.recordLong(value, attributes, context);
} finally {
handle.releaseRecord();
}
@@ -73,35 +74,30 @@ void doRecordLong(long value, Attributes attributes, Context context) {
@Override
void doRecordDouble(double value, Attributes attributes, Context context) {
- DeltaAggregatorHandle handle = acquireHandleForRecord(attributes, context);
+ AggregatorHandle handle = acquireHandleForRecord(attributes, context);
try {
- handle.handle.recordDouble(value, attributes, context);
+ handle.recordDouble(value, attributes, context);
} finally {
handle.releaseRecord();
}
}
- @SuppressWarnings("ThreadPriorityCheck")
- private DeltaAggregatorHandle acquireHandleForRecord(Attributes attributes, Context context) {
+ private AggregatorHandle acquireHandleForRecord(Attributes attributes, Context context) {
while (true) {
- DeltaAggregatorHandle handle =
- tryAcquireHandleForRecord(this.aggregatorHolder, attributes, context);
+ AggregatorHandle handle = getOrCreateHandle(this.aggregatorHolder, attributes, context);
if (handle != null) {
return handle;
}
- // Holder or handle is locked for collection; yield to let the collector advance
- Thread.yield();
}
}
@Nullable
- private DeltaAggregatorHandle tryAcquireHandleForRecord(
+ protected AggregatorHandle getOrCreateHandle(
AggregatorHolder holder, Attributes attributes, Context context) {
Objects.requireNonNull(attributes, "attributes");
attributes = attributesProcessor.process(attributes, context);
- ConcurrentHashMap> aggregatorHandles =
- holder.aggregatorHandles;
- DeltaAggregatorHandle handle = aggregatorHandles.get(attributes);
+ ConcurrentHashMap> aggregatorHandles = holder.aggregatorHandles;
+ AggregatorHandle handle = aggregatorHandles.get(attributes);
if (handle == null && aggregatorHandles.size() >= maxCardinality) {
logger.log(
Level.WARNING,
@@ -143,13 +139,17 @@ private DeltaAggregatorHandle tryAcquireHandleForRecord(
// use the handle's creation time for the start epoch — it uses the reader's last collect time
// directly in collect(). So the stale creation time on a recycled handle does not affect
// correctness.
- DeltaAggregatorHandle newDeltaHandle = aggregatorHandlePool.poll();
- if (newDeltaHandle == null) {
- newDeltaHandle = new DeltaAggregatorHandle<>(aggregator.createHandle(clock.now()));
+ AggregatorHandle newHandle = aggregatorHandlePool.poll();
+ if (newHandle == null) {
+ newHandle = aggregator.createHandle(clock.now());
+ newHandle.initDelta();
}
- handle = aggregatorHandles.putIfAbsent(attributes, newDeltaHandle);
+ // Always update attributes: fresh handles need them set for the first time; pooled handles
+ // retain old attributes from their previous series and must be updated for the new one.
+ newHandle.setAttributes(attributes);
+ handle = aggregatorHandles.putIfAbsent(attributes, newHandle);
if (handle == null) {
- handle = newDeltaHandle;
+ handle = newHandle;
}
// Pre-increment per-handle state while the holder gate is still held. The collect
// thread's lock pass cannot start until all threads release the holder gate, so this
@@ -161,56 +161,83 @@ private DeltaAggregatorHandle tryAcquireHandleForRecord(
}
}
+ @Override
+ public AggregatorHandle bind(Attributes attributes) {
+ // Get or create the handle in the current holder's map (same coordination as a normal
+ // recording), then mark it as bound and release the transient recording slot. The bound flag
+ // tells collect() to use the awaitRecorders/reset/unlock path and to carry the handle over
+ // to the new holder on each IMMUTABLE_DATA collection.
+ while (true) {
+ AggregatorHandle handle =
+ getOrCreateHandle(this.aggregatorHolder, attributes, Context.current());
+ if (handle != null) {
+ handle.bound = true;
+ handle.releaseRecord();
+ return handle;
+ }
+ }
+ }
+
@Override
public MetricData collect(
Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) {
- ConcurrentHashMap> aggregatorHandles;
AggregatorHolder holder = this.aggregatorHolder;
- this.aggregatorHolder =
- (memoryMode == REUSABLE_DATA)
- ? new AggregatorHolder<>(previousCollectionAggregatorHandles)
- : new AggregatorHolder<>();
// Lock out new series creation in the old holder and wait for any in-flight new-series
- // operations to complete. This guarantees the per-handle lock pass below sees every handle
- // that will ever be inserted into holder.aggregatorHandles.
+ // operations to complete BEFORE installing the new holder. This makes the subsequent scan
+ // for bound handles (IMMUTABLE_DATA) race-free: no new bind() can sneak in after the scan.
holder.lockForCollectAndAwait();
- // Lock each handle and wait for any in-flight recorders against it to finish.
- holder.aggregatorHandles.values().forEach(DeltaAggregatorHandle::lockForCollect);
- holder.aggregatorHandles.values().forEach(DeltaAggregatorHandle::awaitRecordersAndUnlock);
- aggregatorHandles = holder.aggregatorHandles;
+ if (memoryMode == REUSABLE_DATA) {
+ // REUSABLE_DATA: ping-pong between two maps. New recordings must NOT write to the same
+ // accumulator that is mid-collection, because some aggregators (e.g. LastValue) hold a
+ // single shared mutable field — a concurrent write would corrupt the collected value.
+ // Bound handles are an exception: their RecordOp spins on the per-handle lock and cannot
+ // write during the collection window, so they are safe to appear in both maps. Copy them
+ // into previousCollectionAggregatorHandles now (before the swap) so they are visible in
+ // the new holder and are collected every interval rather than every other interval.
+ holder.aggregatorHandles.forEach(
+ (attrs, h) -> {
+ if (h.bound) {
+ previousCollectionAggregatorHandles.put(attrs, h);
+ }
+ });
+ this.aggregatorHolder = new AggregatorHolder<>(previousCollectionAggregatorHandles);
+ } else {
+ // IMMUTABLE_DATA: seed the new holder with only the bound handles from the old holder.
+ // Non-bound series start fresh each interval (delta semantics via holder abandonment).
+ // Bound series must survive so their RecordOp references remain valid across intervals.
+ ConcurrentHashMap> boundHandles = new ConcurrentHashMap<>();
+ holder.aggregatorHandles.forEach(
+ (attrs, h) -> {
+ if (h.bound) {
+ boundHandles.put(attrs, h);
+ }
+ });
+ this.aggregatorHolder = new AggregatorHolder<>(boundHandles);
+ }
+
+ // Pass 1: signal all handles in the old holder that collection is starting.
+ holder.aggregatorHandles.values().forEach(AggregatorHandle::lockForCollect);
List points;
if (memoryMode == REUSABLE_DATA) {
reusableResultList.clear();
points = reusableResultList;
} else {
- points = new ArrayList<>(aggregatorHandles.size());
+ points = new ArrayList<>(holder.aggregatorHandles.size());
}
- // In DELTA aggregation temporality each Attributes is reset to 0
- // every time we perform a collection (by definition of DELTA).
- // In IMMUTABLE_DATA MemoryMode, this is accomplished by swapping in a new empty holder,
- // abandoning the old map so each new recording in the next interval starts fresh from 0.
- // In REUSABLE_DATA MemoryMode, we strive for zero allocations. Since even removing
- // a key-value from a map and putting it again on next recording will cost an allocation,
- // we are keeping the aggregator handles in their map, and only reset their value once
- // we finish collecting the aggregated value from each one.
- // The SDK must adhere to keeping no more than maxCardinality unique Attributes in memory,
- // hence during collect(), when the map is at full capacity, we try to clear away unused
- // aggregator handles, so on next recording cycle using this map, there will be room for newly
- // recorded Attributes. This comes at the expanse of memory allocations. This can be avoided
- // if the user chooses to increase the maxCardinality.
- if (memoryMode == REUSABLE_DATA) {
- if (aggregatorHandles.size() >= maxCardinality) {
- aggregatorHandles.forEach(
- (attribute, handle) -> {
- if (!handle.handle.hasRecordedValues()) {
- aggregatorHandles.remove(attribute);
- }
- });
- }
+ // REUSABLE_DATA: when at capacity, remove unbound handles that had no recordings so that
+ // the map has room for newly seen attribute sets next interval. Bound handles are kept
+ // unconditionally since their RecordOp references must remain valid.
+ if (memoryMode == REUSABLE_DATA && holder.aggregatorHandles.size() >= maxCardinality) {
+ holder.aggregatorHandles.forEach(
+ (attribute, handle) -> {
+ if (!handle.bound && !handle.hasRecordedValues()) {
+ holder.aggregatorHandles.remove(attribute);
+ }
+ });
}
// Start time for synchronous delta instruments is the time of the last collection, or if no
@@ -218,30 +245,44 @@ public MetricData collect(
long startEpochNanos =
registeredReader.getLastCollectEpochNanosOrDefault(instrumentCreationEpochNanos);
- // Grab aggregated points.
- aggregatorHandles.forEach(
+ // Pass 2: drain, aggregate, and (where needed) reset or pool each handle.
+ // Unbound handles use awaitRecordersAndUnlock (state → 0 immediately after drain).
+ // Bound handles stay locked through aggregation so the accumulator is reset atomically
+ // before recordings resume — no inner-handle rotation needed since
+ // aggregateThenMaybeReset(reset=true) already resets in-place while locked.
+ holder.aggregatorHandles.forEach(
(attributes, handle) -> {
- if (!handle.handle.hasRecordedValues()) {
- return;
- }
- T point =
- handle.handle.aggregateThenMaybeReset(
- startEpochNanos, epochNanos, attributes, /* reset= */ true);
-
- if (memoryMode == IMMUTABLE_DATA) {
- // Return the handle to the pool.
- // Only in IMMUTABLE_DATA memory mode: in REUSABLE_DATA we avoid using the pool
- // since ConcurrentLinkedQueue.offer() allocates memory internally.
- aggregatorHandlePool.offer(handle);
- }
-
- if (point != null) {
- points.add(point);
+ if (handle.bound) {
+ handle.awaitRecorders();
+ T point = null;
+ if (handle.hasRecordedValues()) {
+ point =
+ handle.aggregateThenMaybeReset(
+ startEpochNanos, epochNanos, attributes, /* reset= */ true);
+ }
+ handle.unlockAfterCollect();
+ if (point != null) {
+ points.add(point);
+ }
+ } else {
+ handle.awaitRecordersAndUnlock();
+ if (!handle.hasRecordedValues()) {
+ return;
+ }
+ T point =
+ handle.aggregateThenMaybeReset(
+ startEpochNanos, epochNanos, attributes, /* reset= */ true);
+ if (memoryMode == IMMUTABLE_DATA) {
+ aggregatorHandlePool.offer(handle);
+ }
+ if (point != null) {
+ points.add(point);
+ }
}
});
if (memoryMode == REUSABLE_DATA) {
- previousCollectionAggregatorHandles = aggregatorHandles;
+ previousCollectionAggregatorHandles = holder.aggregatorHandles;
}
if (points.isEmpty() || !enabled) {
@@ -253,7 +294,7 @@ public MetricData collect(
}
private static class AggregatorHolder {
- private final ConcurrentHashMap> aggregatorHandles;
+ private final ConcurrentHashMap> aggregatorHandles;
// Guards new-series creation using an even/odd protocol:
// - Threads creating a new series increment by 2 (keeping the value even while unlocked)
// and decrement by 2 on release.
@@ -265,8 +306,7 @@ private AggregatorHolder() {
aggregatorHandles = new ConcurrentHashMap<>();
}
- private AggregatorHolder(
- ConcurrentHashMap> aggregatorHandles) {
+ private AggregatorHolder(ConcurrentHashMap> aggregatorHandles) {
this.aggregatorHandles = aggregatorHandles;
}
@@ -291,73 +331,11 @@ boolean isLockedForCollect() {
}
/** Locks new-series creation and waits for any in-flight new-series operations to complete. */
- @SuppressWarnings("ThreadPriorityCheck")
void lockForCollectAndAwait() {
int s = newSeriesGate.addAndGet(1);
while (s != 1) {
- Thread.yield();
s = newSeriesGate.get();
}
}
}
-
- private static final class DeltaAggregatorHandle {
- final AggregatorHandle handle;
- // Guards per-handle recording using the same even/odd protocol as
- // AggregatorHolder.newSeriesGate,
- // but scoped to a single series:
- // - Recording threads increment by 2 before recording, decrement by 2 when done.
- // - The collect thread increments by 1 (making the count odd) as a signal that this
- // handle is being collected; recorders that observe an odd count release and retry.
- // - Once all in-flight recordings finish the count returns to 1, and the collect
- // thread decrements by 1 to restore it to even for the next cycle.
- private final AtomicInteger state = new AtomicInteger(0);
-
- DeltaAggregatorHandle(AggregatorHandle handle) {
- this.handle = handle;
- }
-
- /**
- * Tries to acquire a recording slot. Returns false if the collector has locked this handle (odd
- * state); the caller should retry with a fresh holder.
- */
- boolean tryAcquireForRecord() {
- int s = state.addAndGet(2);
- if ((s & 1) != 0) {
- state.addAndGet(-2);
- return false;
- }
- return true;
- }
-
- /**
- * Acquires a recording slot unconditionally. Only safe to call while the holder gate is held,
- * which prevents the collector from starting its lock pass.
- */
- void acquireForRecord() {
- state.addAndGet(2);
- }
-
- /**
- * Releases a recording slot acquired via {@link #tryAcquireForRecord()} or {@link
- * #acquireForRecord()}.
- */
- void releaseRecord() {
- state.addAndGet(-2);
- }
-
- /** Signals that collection is starting. Recorders that observe this will abort and retry. */
- void lockForCollect() {
- state.addAndGet(1);
- }
-
- /** Waits for all in-flight recorders to finish, then clears the collection lock. */
- @SuppressWarnings("ThreadPriorityCheck")
- void awaitRecordersAndUnlock() {
- while (state.get() > 1) {
- Thread.yield();
- }
- state.addAndGet(-1);
- }
- }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java
index c8580e79f61..ffa5c958008 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java
@@ -9,9 +9,13 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
+import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;
import io.opentelemetry.sdk.resources.Resource;
+import java.util.function.BiFunction;
final class EmptyMetricStorage implements SynchronousMetricStorage {
static final EmptyMetricStorage INSTANCE = new EmptyMetricStorage();
@@ -31,6 +35,28 @@ public MetricData collect(
return EmptyMetricData.getInstance();
}
+ @Override
+ public T cachedBind(
+ Attributes attributes, BiFunction, Attributes, T> factory) {
+ // This storage is disabled; supply a noop handle so recordings are silently dropped.
+ AggregatorHandle noopHandle =
+ new AggregatorHandle(0, ExemplarReservoirFactory.noSamples(), true) {
+ @Override
+ public void recordLong(long value) {}
+
+ @Override
+ public void recordDouble(double value) {}
+
+ @Override
+ protected void doRecordLong(long value) {}
+
+ @Override
+ protected void doRecordDouble(double value) {}
+ };
+ noopHandle.setAttributes(attributes);
+ return factory.apply(noopHandle, attributes);
+ }
+
@Override
public void recordLong(long value, Attributes attributes, Context context) {}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/WriteableMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/WriteableMetricStorage.java
index 7191a63f1e0..435077c51da 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/WriteableMetricStorage.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/WriteableMetricStorage.java
@@ -8,6 +8,8 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
+import java.util.function.BiFunction;
/**
* Stores {@link MetricData} and allows synchronous writes of measurements.
@@ -17,6 +19,13 @@
*/
public interface WriteableMetricStorage {
+ /**
+ * Returns a bound instrument wrapper for the given attributes, creating and caching it as
+ * appropriate for this storage implementation. The factory receives the {@link AggregatorHandle}
+ * and the original attributes so the bound wrapper can use them for attribute-merging overloads.
+ */
+ T cachedBind(Attributes attributes, BiFunction, Attributes, T> factory);
+
/** Records a measurement. */
void recordLong(long value, Attributes attributes, Context context);
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java
index 62d4cfa1e75..0baf289fa17 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java
@@ -18,6 +18,14 @@
import com.google.common.util.concurrent.Uninterruptibles;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleCounter;
+import io.opentelemetry.api.metrics.DoubleGauge;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.DoubleUpDownCounter;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongGauge;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.internal.testing.CleanupExtension;
import io.opentelemetry.sdk.common.export.MemoryMode;
@@ -56,10 +64,10 @@
/**
* {@link #stressTest(AggregationTemporality, InstrumentType, Aggregation, MemoryMode,
- * InstrumentValueType)} performs a stress test to confirm simultaneous record and collections do
- * not have concurrency issues like lost writes, partial writes, duplicate writes, etc. All
- * combinations of the following dimensions are tested: aggregation temporality, instrument type
- * (synchronous), memory mode, instrument value type.
+ * InstrumentValueType, boolean)} performs a stress test to confirm simultaneous record and
+ * collections do not have concurrency issues like lost writes, partial writes, duplicate writes,
+ * etc. All combinations of the following dimensions are tested: aggregation temporality, instrument
+ * type (synchronous), memory mode, instrument value type, bound instrument.
*/
class SynchronousInstrumentStressTest {
@@ -88,10 +96,16 @@ void stressTest(
InstrumentType instrumentType,
Aggregation aggregation,
MemoryMode memoryMode,
- InstrumentValueType instrumentValueType) {
+ InstrumentValueType instrumentValueType,
+ boolean isBound) {
for (int repetition = 0; repetition < STRESS_TEST_REPETITIONS; repetition++) {
stressTestOnce(
- aggregationTemporality, instrumentType, aggregation, memoryMode, instrumentValueType);
+ aggregationTemporality,
+ instrumentType,
+ aggregation,
+ memoryMode,
+ instrumentValueType,
+ isBound);
}
}
@@ -101,7 +115,8 @@ private void stressTestOnce(
InstrumentType instrumentType,
Aggregation aggregation,
MemoryMode memoryMode,
- InstrumentValueType instrumentValueType) {
+ InstrumentValueType instrumentValueType,
+ boolean isBound) {
// Initialize metric SDK
DefaultAggregationSelector aggregationSelector =
DefaultAggregationSelector.getDefault().with(instrumentType, aggregation);
@@ -117,7 +132,13 @@ private void stressTestOnce(
Meter meter = meterProvider.get("test");
List attributes = Arrays.asList(ATTR_1, ATTR_2, ATTR_3, ATTR_4);
Collections.shuffle(attributes);
+ List boundInstruments = new ArrayList<>();
Instrument instrument = getInstrument(meter, instrumentType, instrumentValueType);
+ if (isBound) {
+ for (Attributes attr : attributes) {
+ boundInstruments.add(getBoundInstrument(meter, instrumentType, instrumentValueType, attr));
+ }
+ }
// Define list of measurements to record
// Later, we'll assert that the data collected matches these measurements, with no lost writes,
@@ -139,11 +160,20 @@ private void stressTestOnce(
new Thread(
() -> {
Uninterruptibles.awaitUninterruptibly(startSignal);
- for (Long measurement : measurements) {
- for (Attributes attr : attributes) {
- instrument.record(measurement, attr);
+ if (isBound) {
+ for (Long measurement : measurements) {
+ for (BoundInstrument boundInstrument : boundInstruments) {
+ boundInstrument.record(measurement);
+ }
+ Thread.yield();
+ }
+ } else {
+ for (Long measurement : measurements) {
+ for (Attributes attr : attributes) {
+ instrument.record(measurement, attr);
+ }
+ Thread.yield();
}
- Thread.yield();
}
latch.countDown();
}));
@@ -298,20 +328,24 @@ private static Stream stressTestArgs() {
InstrumentTypeAndAggregation.values()) {
for (MemoryMode memoryMode : MemoryMode.values()) {
for (InstrumentValueType instrumentValueType : InstrumentValueType.values()) {
- argumentsList.add(
- Arguments.argumentSet(
- aggregationTemporality
- + " "
- + instrumentTypeAndAggregation.instrumentType
- + " "
- + memoryMode
- + " "
- + instrumentValueType,
- aggregationTemporality,
- instrumentTypeAndAggregation.instrumentType,
- instrumentTypeAndAggregation.aggregation,
- memoryMode,
- instrumentValueType));
+ for (boolean isBound : Arrays.asList(true, false)) {
+ argumentsList.add(
+ Arguments.argumentSet(
+ aggregationTemporality
+ + " "
+ + instrumentTypeAndAggregation.instrumentType
+ + " "
+ + memoryMode
+ + " "
+ + instrumentValueType
+ + (isBound ? " bound" : ""),
+ aggregationTemporality,
+ instrumentTypeAndAggregation.instrumentType,
+ instrumentTypeAndAggregation.aggregation,
+ memoryMode,
+ instrumentValueType,
+ isBound));
+ }
}
}
}
@@ -358,6 +392,68 @@ private interface Instrument {
void record(long value, Attributes attributes);
}
+ private interface BoundInstrument {
+ void record(long value);
+ }
+
+ private static BoundInstrument getBoundInstrument(
+ Meter meter,
+ InstrumentType instrumentType,
+ InstrumentValueType instrumentValueType,
+ Attributes attributes) {
+ switch (instrumentType) {
+ case COUNTER:
+ if (instrumentValueType == InstrumentValueType.DOUBLE) {
+ DoubleCounter bound =
+ meter.counterBuilder(INSTRUMENT_NAME).ofDoubles().build().bind(attributes);
+ return value -> bound.add(value);
+ } else {
+ LongCounter bound = meter.counterBuilder(INSTRUMENT_NAME).build().bind(attributes);
+ return bound::add;
+ }
+ case UP_DOWN_COUNTER:
+ if (instrumentValueType == InstrumentValueType.DOUBLE) {
+ DoubleUpDownCounter bound =
+ meter.upDownCounterBuilder(INSTRUMENT_NAME).ofDoubles().build().bind(attributes);
+ return value -> bound.add(value);
+ } else {
+ LongUpDownCounter bound =
+ meter.upDownCounterBuilder(INSTRUMENT_NAME).build().bind(attributes);
+ return bound::add;
+ }
+ case HISTOGRAM:
+ if (instrumentValueType == InstrumentValueType.DOUBLE) {
+ DoubleHistogram bound =
+ meter
+ .histogramBuilder(INSTRUMENT_NAME)
+ .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES)
+ .build()
+ .bind(attributes);
+ return value -> bound.record(value);
+ } else {
+ LongHistogram bound =
+ meter
+ .histogramBuilder(INSTRUMENT_NAME)
+ .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES)
+ .ofLongs()
+ .build()
+ .bind(attributes);
+ return bound::record;
+ }
+ case GAUGE:
+ if (instrumentValueType == InstrumentValueType.DOUBLE) {
+ DoubleGauge bound = meter.gaugeBuilder(INSTRUMENT_NAME).build().bind(attributes);
+ return value -> bound.set(value);
+ } else {
+ LongGauge bound = meter.gaugeBuilder(INSTRUMENT_NAME).ofLongs().build().bind(attributes);
+ return bound::set;
+ }
+ default:
+ throw new IllegalArgumentException(
+ "bound instruments not yet supported for " + instrumentType);
+ }
+ }
+
private static MetricData copy(MetricData m) {
switch (m.getType()) {
case LONG_GAUGE:
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java
index f287b76b05e..f42cc53e5b3 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java
@@ -121,5 +121,16 @@ public boolean isEnabled() {
public void setEnabled(boolean enabled) {
throw new UnsupportedOperationException("Not implemented");
}
+
+ @Override
+ public T cachedBind(
+ io.opentelemetry.api.common.Attributes attributes,
+ java.util.function.BiFunction<
+ io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle>,
+ io.opentelemetry.api.common.Attributes,
+ T>
+ factory) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
}
}