From 9f62137b647576395ac8e96d0d82f3e798eed34f Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Fri, 5 Jun 2026 13:39:20 +0300 Subject: [PATCH 1/3] feat: pool metrics Add Meter/PoolMetrics instrumentation for table and query session pools. Test OpenTelemetry plugin in core; cover pool metrics with unit tests. Align mockito dependency setup with topic module. --- core/pom.xml | 13 + .../main/java/tech/ydb/core/metrics/Attr.java | 28 +++ .../ydb/core/metrics/DoubleHistogram.java | 19 ++ .../tech/ydb/core/metrics/LongCounter.java | 19 ++ .../ydb/core/metrics/LongMeasurement.java | 20 ++ .../java/tech/ydb/core/metrics/Meter.java | 26 ++ .../ydb/core/metrics/OpenTelemetryMeter.java | 86 +++++++ .../core/metrics/OpenTelemetryMeterTest.java | 120 +++++++++ query/pom.xml | 26 +- .../main/java/tech/ydb/query/QueryClient.java | 3 + .../tech/ydb/query/impl/QueryClientImpl.java | 38 ++- .../java/tech/ydb/query/impl/SessionPool.java | 20 +- .../tech/ydb/query/impl/TableClientImpl.java | 6 + .../tech/ydb/query/impl/PoolMetricsTest.java | 236 ++++++++++++++++++ table/pom.xml | 29 ++- .../main/java/tech/ydb/table/TableClient.java | 2 + .../ydb/table/impl/PooledTableClient.java | 17 +- .../tech/ydb/table/impl/pool/PoolMetrics.java | 85 +++++++ .../tech/ydb/table/impl/pool/SessionPool.java | 17 ++ .../ydb/table/impl/pool/PoolMetricsTest.java | 186 ++++++++++++++ 20 files changed, 975 insertions(+), 21 deletions(-) create mode 100644 core/src/main/java/tech/ydb/core/metrics/Attr.java create mode 100644 core/src/main/java/tech/ydb/core/metrics/DoubleHistogram.java create mode 100644 core/src/main/java/tech/ydb/core/metrics/LongCounter.java create mode 100644 core/src/main/java/tech/ydb/core/metrics/LongMeasurement.java create mode 100644 core/src/main/java/tech/ydb/core/metrics/Meter.java create mode 100644 core/src/main/java/tech/ydb/core/metrics/OpenTelemetryMeter.java create mode 100644 core/src/test/java/tech/ydb/core/metrics/OpenTelemetryMeterTest.java create mode 100644 query/src/test/java/tech/ydb/query/impl/PoolMetricsTest.java create mode 100644 table/src/main/java/tech/ydb/table/impl/pool/PoolMetrics.java create mode 100644 table/src/test/java/tech/ydb/table/impl/pool/PoolMetricsTest.java diff --git a/core/pom.xml b/core/pom.xml index f0a20b88e..e07dcbf72 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -72,6 +72,19 @@ test + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + test + + + io.opentelemetry + opentelemetry-sdk-testing + ${opentelemetry.version} + test + + org.bouncycastle bcprov-jdk18on diff --git a/core/src/main/java/tech/ydb/core/metrics/Attr.java b/core/src/main/java/tech/ydb/core/metrics/Attr.java new file mode 100644 index 000000000..7c8fa9c8c --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/Attr.java @@ -0,0 +1,28 @@ +package tech.ydb.core.metrics; + +import java.util.Objects; + +/** + * Single immutable attribute (key + value) attached to a metric measurement. + */ +public final class Attr { + private final String key; + private final String value; + + private Attr(String key, String value) { + this.key = Objects.requireNonNull(key, "key is null"); + this.value = Objects.requireNonNull(value, "value is null"); + } + + public static Attr of(String key, String value) { + return new Attr(key, value); + } + + public String getKey() { + return key; + } + + public String getValue() { + return value; + } +} diff --git a/core/src/main/java/tech/ydb/core/metrics/DoubleHistogram.java b/core/src/main/java/tech/ydb/core/metrics/DoubleHistogram.java new file mode 100644 index 000000000..012a3444b --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/DoubleHistogram.java @@ -0,0 +1,19 @@ +package tech.ydb.core.metrics; + +import io.grpc.ExperimentalApi; + +/** + * Histogram of {@code double} values (typically used for durations in seconds). + */ +@ExperimentalApi("YDB Meter is experimental and API may change without notice") +public interface DoubleHistogram { + DoubleHistogram NOOP = (value, attrs) -> { }; + + /** + * Records the given value with optional attributes. + * + * @param value value to record + * @param attrs measurement attributes + */ + void record(double value, Attr... attrs); +} diff --git a/core/src/main/java/tech/ydb/core/metrics/LongCounter.java b/core/src/main/java/tech/ydb/core/metrics/LongCounter.java new file mode 100644 index 000000000..003b0ff8f --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/LongCounter.java @@ -0,0 +1,19 @@ +package tech.ydb.core.metrics; + +import io.grpc.ExperimentalApi; + +/** + * A monotonic counter that records non-negative {@code long} deltas. + */ +@ExperimentalApi("YDB Meter is experimental and API may change without notice") +public interface LongCounter { + LongCounter NOOP = (value, attrs) -> { }; + + /** + * Adds the given value with optional attributes. + * + * @param value non-negative delta + * @param attrs measurement attributes + */ + void add(long value, Attr... attrs); +} diff --git a/core/src/main/java/tech/ydb/core/metrics/LongMeasurement.java b/core/src/main/java/tech/ydb/core/metrics/LongMeasurement.java new file mode 100644 index 000000000..378e901bb --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/LongMeasurement.java @@ -0,0 +1,20 @@ +package tech.ydb.core.metrics; + +import io.grpc.ExperimentalApi; + +/** + * Per-observation handle passed to {@link Meter#createLongGauge} callbacks. + * + *

A single callback invocation may call {@link #record} multiple times with different + * attributes to emit several measurements per collection cycle. + */ +@ExperimentalApi("YDB Meter is experimental and API may change without notice") +public interface LongMeasurement { + /** + * Records the current value of the gauge for the given attribute set. + * + * @param value observed value + * @param attrs measurement attributes + */ + void record(long value, Attr... attrs); +} diff --git a/core/src/main/java/tech/ydb/core/metrics/Meter.java b/core/src/main/java/tech/ydb/core/metrics/Meter.java new file mode 100644 index 000000000..15ca5c096 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/Meter.java @@ -0,0 +1,26 @@ +package tech.ydb.core.metrics; + +import java.util.function.Consumer; + +import io.grpc.ExperimentalApi; + +/** + * Entry point to create metric instruments. The interface is dependency-free, so the SDK core does + * not require an OpenTelemetry runtime to compile or run. Implementations must be thread-safe. + */ +@ExperimentalApi("YDB Meter is experimental and API may change without notice") +public interface Meter { + Meter NOOP = new Meter() { }; + + default LongCounter createCounter(String name, String unit, String description) { + return LongCounter.NOOP; + } + + default DoubleHistogram createHistogram(String name, String unit, String description) { + return DoubleHistogram.NOOP; + } + + default void createLongGauge(String name, String unit, String description, Consumer callback) { + // noop: the backend never queries the callback + } +} diff --git a/core/src/main/java/tech/ydb/core/metrics/OpenTelemetryMeter.java b/core/src/main/java/tech/ydb/core/metrics/OpenTelemetryMeter.java new file mode 100644 index 000000000..f00980ace --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/OpenTelemetryMeter.java @@ -0,0 +1,86 @@ +package tech.ydb.core.metrics; + +import java.util.Objects; +import java.util.function.Consumer; + +import io.grpc.ExperimentalApi; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.LongCounterBuilder; +import io.opentelemetry.api.metrics.LongGaugeBuilder; + +/** + * OpenTelemetry-backed implementation of {@link Meter}. + */ +@ExperimentalApi("YDB Meter is experimental and API may change without notice") +public final class OpenTelemetryMeter implements Meter { + private static final String DEFAULT_SCOPE = "tech.ydb.sdk"; + + private final io.opentelemetry.api.metrics.Meter meter; + + private OpenTelemetryMeter(io.opentelemetry.api.metrics.Meter meter) { + this.meter = Objects.requireNonNull(meter, "meter is null"); + } + + public static OpenTelemetryMeter fromOpenTelemetry(OpenTelemetry openTelemetry) { + Objects.requireNonNull(openTelemetry, "openTelemetry is null"); + return new OpenTelemetryMeter(openTelemetry.getMeter(DEFAULT_SCOPE)); + } + + public static OpenTelemetryMeter createGlobal() { + return fromOpenTelemetry(GlobalOpenTelemetry.get()); + } + + @Override + public LongCounter createCounter(String name, String unit, String description) { + LongCounterBuilder builder = meter.counterBuilder(name); + if (unit != null) { + builder.setUnit(unit); + } + if (description != null) { + builder.setDescription(description); + } + io.opentelemetry.api.metrics.LongCounter counter = builder.build(); + return (value, attrs) -> counter.add(value, attributesOf(attrs)); + } + + @Override + public DoubleHistogram createHistogram(String name, String unit, String description) { + DoubleHistogramBuilder builder = meter.histogramBuilder(name); + if (unit != null) { + builder.setUnit(unit); + } + if (description != null) { + builder.setDescription(description); + } + io.opentelemetry.api.metrics.DoubleHistogram histogram = builder.build(); + return (value, attrs) -> histogram.record(value, attributesOf(attrs)); + } + + @Override + public void createLongGauge(String name, String unit, String description, Consumer callback) { + LongGaugeBuilder builder = meter.gaugeBuilder(name).ofLongs(); + if (unit != null) { + builder.setUnit(unit); + } + if (description != null) { + builder.setDescription(description); + } + builder.buildWithCallback(otelMeasurement -> + callback.accept((value, attrs) -> otelMeasurement.record(value, attributesOf(attrs)))); + } + + private static Attributes attributesOf(Attr[] attrs) { + if (attrs == null || attrs.length == 0) { + return Attributes.empty(); + } + AttributesBuilder builder = Attributes.builder(); + for (Attr attr : attrs) { + builder.put(attr.getKey(), attr.getValue()); + } + return builder.build(); + } +} diff --git a/core/src/test/java/tech/ydb/core/metrics/OpenTelemetryMeterTest.java b/core/src/test/java/tech/ydb/core/metrics/OpenTelemetryMeterTest.java new file mode 100644 index 000000000..35e4cc706 --- /dev/null +++ b/core/src/test/java/tech/ydb/core/metrics/OpenTelemetryMeterTest.java @@ -0,0 +1,120 @@ +package tech.ydb.core.metrics; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicLong; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class OpenTelemetryMeterTest { + private static final AttributeKey POOL = AttributeKey.stringKey("pool.name"); + private static final AttributeKey STATE = AttributeKey.stringKey("state"); + + private InMemoryMetricReader reader; + private SdkMeterProvider provider; + private OpenTelemetryMeter meter; + + @Before + public void setup() { + reader = InMemoryMetricReader.create(); + provider = SdkMeterProvider.builder().registerMetricReader(reader).build(); + OpenTelemetry openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(provider).build(); + meter = OpenTelemetryMeter.fromOpenTelemetry(openTelemetry); + } + + @After + public void tearDown() throws IOException { + provider.close(); + reader.close(); + } + + @Test + public void counterReportsValueAndAttributes() { + LongCounter counter = meter.createCounter("ydb.test.counter", "{session}", "test counter"); + counter.add(3L, Attr.of("pool.name", "my-pool")); + counter.add(2L, Attr.of("pool.name", "my-pool")); + + MetricData metric = single("ydb.test.counter"); + Assert.assertEquals("{session}", metric.getUnit()); + Assert.assertEquals("test counter", metric.getDescription()); + + LongPointData point = singleLongPoint(metric.getLongSumData().getPoints()); + Assert.assertEquals(5L, point.getValue()); + Assert.assertEquals("my-pool", point.getAttributes().get(POOL)); + } + + @Test + public void histogramReportsValueAndAttributes() { + DoubleHistogram histogram = meter.createHistogram("ydb.test.histogram", "s", "test histogram"); + histogram.record(0.5d, Attr.of("pool.name", "my-pool")); + + MetricData metric = single("ydb.test.histogram"); + Assert.assertEquals("s", metric.getUnit()); + Assert.assertEquals("test histogram", metric.getDescription()); + + Collection points = metric.getHistogramData().getPoints(); + Assert.assertEquals(1, points.size()); + HistogramPointData point = points.iterator().next(); + Assert.assertEquals(1L, point.getCount()); + Assert.assertEquals(0.5d, point.getSum(), 0.0001d); + Assert.assertEquals("my-pool", point.getAttributes().get(POOL)); + } + + @Test + public void gaugeInvokesCallbackOnCollect() { + AtomicLong value = new AtomicLong(7L); + meter.createLongGauge("ydb.test.gauge", "{session}", "test gauge", + m -> m.record(value.get(), Attr.of("pool.name", "my-pool"), Attr.of("state", "idle"))); + + MetricData metric = single("ydb.test.gauge"); + Assert.assertEquals("{session}", metric.getUnit()); + Assert.assertEquals("test gauge", metric.getDescription()); + + LongPointData point = singleLongPoint(metric.getLongGaugeData().getPoints()); + Assert.assertEquals(7L, point.getValue()); + Assert.assertEquals("my-pool", point.getAttributes().get(POOL)); + Assert.assertEquals("idle", point.getAttributes().get(STATE)); + + value.set(11L); + LongPointData updated = singleLongPoint(single("ydb.test.gauge").getLongGaugeData().getPoints()); + Assert.assertEquals(11L, updated.getValue()); + } + + @Test + public void emptyAttributesAreSupported() { + LongCounter counter = meter.createCounter("ydb.test.noattrs", null, null); + counter.add(1L); + + LongPointData point = singleLongPoint(single("ydb.test.noattrs").getLongSumData().getPoints()); + Assert.assertEquals(1L, point.getValue()); + Assert.assertEquals(Attributes.empty(), point.getAttributes()); + } + + private MetricData single(String name) { + MetricData found = null; + for (MetricData metric : reader.collectAllMetrics()) { + if (name.equals(metric.getName())) { + found = metric; + } + } + Assert.assertNotNull(name + " metric not found", found); + return found; + } + + private static LongPointData singleLongPoint(Collection points) { + Assert.assertEquals(1, points.size()); + return points.iterator().next(); + } +} diff --git a/query/pom.xml b/query/pom.xml index 14628a48d..2ae95468d 100644 --- a/query/pom.xml +++ b/query/pom.xml @@ -53,11 +53,9 @@ junit test - org.mockito mockito-core - 4.11.0 test @@ -99,9 +97,29 @@ [9,) - - --add-opens=java.base/java.nio=ALL-UNNAMED + + --add-opens=java.base/java.nio=ALL-UNNAMED -XX:+EnableDynamicAgentLoading + + jdk8-build + + 1.8 + + + + + 4.11.0 + + + + + org.mockito + mockito-inline + ${mockito.version} + test + + + diff --git a/query/src/main/java/tech/ydb/query/QueryClient.java b/query/src/main/java/tech/ydb/query/QueryClient.java index 343ccb01f..f9625624b 100644 --- a/query/src/main/java/tech/ydb/query/QueryClient.java +++ b/query/src/main/java/tech/ydb/query/QueryClient.java @@ -8,6 +8,7 @@ import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.tracing.NoopTracer; import tech.ydb.core.tracing.Tracer; import tech.ydb.query.impl.QueryClientImpl; @@ -55,6 +56,8 @@ interface Builder { Builder sessionMaxIdleTime(Duration duration); + Builder withMeter(Meter meter, String poolName); + QueryClient build(); } } diff --git a/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java b/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java index 4716abc85..0994f07cd 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java @@ -9,6 +9,7 @@ import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.tracing.Tracer; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; @@ -24,13 +25,16 @@ public class QueryClientImpl implements QueryClient { private final Tracer tracer; public QueryClientImpl(Builder builder) { + String poolName = builder.sessionPoolName != null ? builder.sessionPoolName : builder.transport.getDatabase(); this.pool = new SessionPool( Clock.systemUTC(), new QueryServiceRpc(builder.transport), builder.transport.getScheduler(), builder.sessionPoolMinSize, builder.sessionPoolMaxSize, - builder.sessionPoolIdleDuration + builder.sessionPoolIdleDuration, + builder.meter, + poolName ); this.scheduler = builder.transport.getScheduler(); this.tracer = builder.transport.getTracer(); @@ -77,6 +81,8 @@ public static class Builder implements QueryClient.Builder { private int sessionPoolMinSize = 0; private int sessionPoolMaxSize = 50; private Duration sessionPoolIdleDuration = Duration.ofMinutes(5); + private String sessionPoolName = null; + private Meter meter = Meter.NOOP; Builder(GrpcTransport transport) { Preconditions.checkArgument(transport != null, "transport is null"); @@ -92,9 +98,9 @@ private static String prettyDuration(Duration duration) { public Builder sessionPoolMinSize(int minSize) { Preconditions.checkArgument(minSize >= 0, "sessionPoolMinSize(%s) is negative", minSize); Preconditions.checkArgument( - minSize <= sessionPoolMaxSize, - "sessionPoolMinSize(%s) is greater than sessionPoolMaxSize(%s)", - minSize, sessionPoolMaxSize); + minSize <= sessionPoolMaxSize, + "sessionPoolMinSize(%s) is greater than sessionPoolMaxSize(%s)", + minSize, sessionPoolMaxSize); this.sessionPoolMinSize = minSize; return this; } @@ -103,9 +109,9 @@ public Builder sessionPoolMinSize(int minSize) { public Builder sessionPoolMaxSize(int maxSize) { Preconditions.checkArgument(maxSize > 0, "sessionPoolMaxSize(%s) is negative or zero", maxSize); Preconditions.checkArgument( - sessionPoolMinSize <= maxSize, - "sessionPoolMinSize(%s) is greater than sessionPoolMaxSize(%s)", - sessionPoolMinSize, maxSize); + sessionPoolMinSize <= maxSize, + "sessionPoolMinSize(%s) is greater than sessionPoolMaxSize(%s)", + sessionPoolMinSize, maxSize); this.sessionPoolMaxSize = maxSize; return this; } @@ -116,16 +122,26 @@ public Builder sessionMaxIdleTime(Duration duration) { "sessionMaxIdleTime(%s) is negative", prettyDuration(duration)); Preconditions.checkArgument(duration.compareTo(MIN_DURATION) >= 0, - "sessionMaxIdleTime(%s) is less than minimal duration %s", - prettyDuration(duration), prettyDuration(MIN_DURATION)); + "sessionMaxIdleTime(%s) is less than minimal duration %s", + prettyDuration(duration), prettyDuration(MIN_DURATION)); Preconditions.checkArgument(duration.compareTo(MAX_DURATION) <= 0, - "sessionMaxIdleTime(%s) is greater than maximal duration %s", - prettyDuration(duration), prettyDuration(MAX_DURATION)); + "sessionMaxIdleTime(%s) is greater than maximal duration %s", + prettyDuration(duration), prettyDuration(MAX_DURATION)); this.sessionPoolIdleDuration = duration; return this; } + @Override + public Builder withMeter(Meter meter, String poolName) { + Preconditions.checkArgument(meter != null, "meter is null"); + Preconditions.checkArgument(poolName != null && !poolName.isEmpty(), + "poolName must be a non-empty string when a Meter is provided"); + this.meter = meter; + this.sessionPoolName = poolName; + return this; + } + @Override public QueryClientImpl build() { return new QueryClientImpl(this); diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java index fbbe078ae..a7743c079 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionPool.java @@ -22,6 +22,7 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.tracing.Span; import tech.ydb.core.utils.FutureTools; import tech.ydb.proto.query.YdbQuery; @@ -30,6 +31,7 @@ import tech.ydb.query.settings.CreateSessionSettings; import tech.ydb.query.settings.DeleteSessionSettings; import tech.ydb.table.SessionPoolStats; +import tech.ydb.table.impl.pool.PoolMetrics; import tech.ydb.table.impl.pool.WaitingQueue; @@ -59,14 +61,16 @@ class SessionPool implements AutoCloseable { private final WaitingQueue queue; private final ScheduledFuture cleanerFuture; private final StatsImpl stats = new StatsImpl(); + private final PoolMetrics metrics; + @SuppressWarnings("checkstyle:ParameterNumber") SessionPool(Clock clock, QueryServiceRpc rpc, ScheduledExecutorService scheduler, int minSize, int maxSize, - Duration idleDuration) { + Duration idleDuration, Meter meter, String poolName) { this.minSize = minSize; - this.clock = clock; this.scheduler = scheduler; this.queue = new WaitingQueue<>(new Handler(rpc), maxSize); + this.metrics = new PoolMetrics(meter, "query", poolName, queue, minSize); CleanerTask cleaner = new CleanerTask(idleDuration); this.cleanerFuture = scheduler.scheduleAtFixedRate( @@ -150,6 +154,7 @@ private boolean tryComplete(CompletableFuture> future, Pool } stats.acquired.increment(); + metrics.onSessionAcquired(); return true; } @@ -255,6 +260,7 @@ public void close() { logger.trace("QuerySession[{}] closed with broken status {}", getId(), isBroken); stats.released.increment(); + metrics.onSessionReleased(); if (isBroken || isStopped) { queue.delete(this); } else { @@ -277,16 +283,23 @@ public CompletableFuture create() { Context previous = ctx.attach(); try { Span createSpan = rpc.startSpan("ydb.CreateSession"); + long startNanos = System.nanoTime(); stats.requested.increment(); + metrics.onSessionRequested(); return Span.endOnResult(createSpan, SessionImpl.createSession(rpc, CREATE_SETTINGS, true, createSpan)) .thenCompose(r -> { + metrics.onCreateTime(System.nanoTime() - startNanos); + if (!r.isSuccess()) { stats.failed.increment(); + metrics.onSessionFailed(r.getStatus()); throw new UnexpectedResultException("create session problem", r.getStatus()); } + metrics.onSessionCreated(); PooledQuerySession session = new PooledQuerySession(rpc, r.getValue()); return session.start(); - }).thenApply(Result::getValue); + }) + .thenApply(Result::getValue); } finally { ctx.detach(previous); } @@ -295,6 +308,7 @@ public CompletableFuture create() { @Override public void destroy(PooledQuerySession session) { stats.deleted.increment(); + metrics.onSessionDeleted(); // Execute deleteSession call outside current context to avoid cancellation and deadline propogation Context ctx = Context.ROOT.fork(); diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index 8fde13bc8..ff00becc9 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -325,6 +325,12 @@ public Builder sessionMaxIdleTime(Duration duration) { return this; } + @Override + public Builder withMeter(tech.ydb.core.metrics.Meter meter, String poolName) { + query.withMeter(meter, poolName); + return this; + } + @Override public TableClientImpl build() { return new TableClientImpl(this); diff --git a/query/src/test/java/tech/ydb/query/impl/PoolMetricsTest.java b/query/src/test/java/tech/ydb/query/impl/PoolMetricsTest.java new file mode 100644 index 000000000..a0ecd6e1f --- /dev/null +++ b/query/src/test/java/tech/ydb/query/impl/PoolMetricsTest.java @@ -0,0 +1,236 @@ +package tech.ydb.query.impl; + +import java.time.Clock; +import java.time.Duration; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatcher; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.metrics.Attr; +import tech.ydb.core.metrics.DoubleHistogram; +import tech.ydb.core.metrics.LongCounter; +import tech.ydb.core.metrics.LongMeasurement; +import tech.ydb.core.metrics.Meter; +import tech.ydb.core.tracing.NoopTracer; +import tech.ydb.proto.StatusCodesProtos.StatusIds; +import tech.ydb.proto.query.YdbQuery; +import tech.ydb.query.QuerySession; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class PoolMetricsTest { + private static final Duration TIMEOUT = Duration.ofMillis(50); + private static final Duration IDLE = Duration.ofMinutes(5); + private static final String POOL = "my-pool"; + private static final String PREFIX = "ydb.query.session."; + + private final Clock clock = Clock.fixed(java.time.Instant.parse("2022-07-01T00:00:00.000Z"), ZoneId.of("UTC")); + private final ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class); + private final TestRpc rpc = new TestRpc(); + + private final Meter meter = mock(Meter.class); + private final DoubleHistogram createTime = mock(DoubleHistogram.class); + private final Map counters = new HashMap<>(); + private final Map> gauges = new HashMap<>(); + + private final ArgumentMatcher poolName = a -> attr(a, "pool.name", POOL); + private final ArgumentMatcher stateIdle = a -> attr(a, "state", "idle"); + private final ArgumentMatcher stateInUse = a -> attr(a, "state", "in_use"); + private final ArgumentMatcher statusOverloaded = a -> attr(a, "status_code", "OVERLOADED"); + + @Before + public void setup() { + when(scheduler.scheduleAtFixedRate(any(), anyLong(), anyLong(), any())) + .thenAnswer(inv -> mock(ScheduledFuture.class)); + when(scheduler.schedule(any(Runnable.class), anyLong(), any())) + .thenAnswer(inv -> mock(ScheduledFuture.class)); + + when(meter.createCounter(anyString(), any(), any())) + .thenAnswer(inv -> counters.computeIfAbsent(inv.getArgument(0), k -> mock(LongCounter.class))); + when(meter.createHistogram(anyString(), any(), any())).thenReturn(createTime); + doAnswer(inv -> { + gauges.put(inv.getArgument(0), inv.getArgument(3)); + return null; + }).when(meter).createLongGauge(anyString(), any(), any(), any()); + } + + @Test + public void allInstrumentsAreCreated() { + try (SessionPool pool = createPool(0, 2)) { + verify(meter).createCounter(eq(PREFIX + "created"), eq("{session}"), anyString()); + verify(meter).createCounter(eq(PREFIX + "deleted"), eq("{session}"), anyString()); + verify(meter).createCounter(eq(PREFIX + "acquired"), eq("{session}"), anyString()); + verify(meter).createCounter(eq(PREFIX + "released"), eq("{session}"), anyString()); + verify(meter).createCounter(eq(PREFIX + "requested"), eq("{session}"), anyString()); + verify(meter).createCounter(eq(PREFIX + "failed"), eq("{session}"), anyString()); + verify(meter).createHistogram(eq(PREFIX + "create_time"), eq("s"), anyString()); + verify(meter).createLongGauge(eq(PREFIX + "max"), eq("{session}"), anyString(), any()); + verify(meter).createLongGauge(eq(PREFIX + "min"), eq("{session}"), anyString(), any()); + verify(meter).createLongGauge(eq(PREFIX + "count"), eq("{session}"), anyString(), any()); + verify(meter).createLongGauge(eq(PREFIX + "pending_requests"), eq("{session}"), anyString(), any()); + } + } + + @Test + public void sessionLifecycleRecordsCounters() { + try (SessionPool pool = createPool(0, 2)) { + QuerySession session = acquireReady(pool); + verify(counter("requested")).add(eq(1L), argThat(poolName)); + verify(counter("created")).add(eq(1L), argThat(poolName)); + verify(createTime).record(anyDouble(), argThat(poolName)); + verify(counter("acquired")).add(eq(1L), argThat(poolName)); + + session.close(); + verify(counter("released")).add(eq(1L), argThat(poolName)); + } + + verify(counter("deleted")).add(eq(1L), argThat(poolName)); + verify(counter("failed"), never()).add(anyLong(), any()); + } + + @Test + public void failedCreateRecordsFailedCounter() { + rpc.overloaded = true; + try (SessionPool pool = createPool(0, 2)) { + Result result = pool.acquire(TIMEOUT).join(); + Assert.assertFalse(result.isSuccess()); + + verify(counter("requested")).add(eq(1L), argThat(poolName)); + verify(counter("failed")).add(eq(1L), argThat(poolName), argThat(statusOverloaded)); + verify(createTime).record(anyDouble(), argThat(poolName)); + verify(counter("created"), never()).add(anyLong(), any()); + verify(counter("acquired"), never()).add(anyLong(), any()); + } + } + + @Test + public void gaugesObserveStats() { + try (SessionPool pool = createPool(0, 2)) { + QuerySession s1 = acquireReady(pool); + QuerySession s2 = acquireReady(pool); + + LongMeasurement max = mock(LongMeasurement.class); + gauges.get(PREFIX + "max").accept(max); + verify(max).record(eq(2L), argThat(poolName)); + + LongMeasurement min = mock(LongMeasurement.class); + gauges.get(PREFIX + "min").accept(min); + verify(min).record(eq(0L), argThat(poolName)); + + LongMeasurement count = mock(LongMeasurement.class); + gauges.get(PREFIX + "count").accept(count); + verify(count).record(eq(0L), argThat(poolName), argThat(stateIdle)); + verify(count).record(eq(2L), argThat(poolName), argThat(stateInUse)); + + LongMeasurement pending = mock(LongMeasurement.class); + gauges.get(PREFIX + "pending_requests").accept(pending); + verify(pending).record(eq(0L), argThat(poolName)); + + s1.close(); + + LongMeasurement countAfterRelease = mock(LongMeasurement.class); + gauges.get(PREFIX + "count").accept(countAfterRelease); + verify(countAfterRelease).record(eq(1L), argThat(poolName), argThat(stateIdle)); + verify(countAfterRelease).record(eq(1L), argThat(poolName), argThat(stateInUse)); + + s2.close(); + } + } + + private SessionPool createPool(int minSize, int maxSize) { + return new SessionPool(clock, rpc, scheduler, minSize, maxSize, IDLE, meter, POOL); + } + + private QuerySession acquireReady(SessionPool pool) { + Result result = pool.acquire(TIMEOUT).join(); + Assert.assertTrue("acquire must succeed", result.isSuccess()); + return result.getValue(); + } + + private LongCounter counter(String shortName) { + return counters.get(PREFIX + shortName); + } + + private static boolean attr(Attr attr, String shortKey, String value) { + return attr.getKey().equals(PREFIX + shortKey) && attr.getValue().equals(value); + } + + private static final GrpcTransport DUMMY_TRANSPORT = mock(GrpcTransport.class); + + static { + when(DUMMY_TRANSPORT.getTracer()).thenReturn(NoopTracer.getInstance()); + } + + private static final class TestRpc extends QueryServiceRpc { + private final AtomicInteger ids = new AtomicInteger(); + private volatile boolean overloaded = false; + + TestRpc() { + super(DUMMY_TRANSPORT); + } + + @Override + public CompletableFuture> createSession( + YdbQuery.CreateSessionRequest request, GrpcRequestSettings settings) { + StatusIds.StatusCode code = overloaded ? StatusIds.StatusCode.OVERLOADED : StatusIds.StatusCode.SUCCESS; + YdbQuery.CreateSessionResponse response = YdbQuery.CreateSessionResponse.newBuilder() + .setStatus(code) + .setSessionId("session-" + ids.incrementAndGet()) + .build(); + return CompletableFuture.completedFuture(Result.success(response)); + } + + @Override + public GrpcReadStream attachSession( + YdbQuery.AttachSessionRequest request, GrpcRequestSettings settings) { + YdbQuery.SessionState message = YdbQuery.SessionState.newBuilder() + .setStatus(StatusIds.StatusCode.SUCCESS) + .build(); + return new GrpcReadStream() { + @Override + public CompletableFuture start(Observer observer) { + observer.onNext(message); + return new CompletableFuture<>(); + } + + @Override + public void cancel() { + } + }; + } + + @Override + public CompletableFuture> deleteSession( + YdbQuery.DeleteSessionRequest request, GrpcRequestSettings settings) { + return CompletableFuture.completedFuture(Result.success( + YdbQuery.DeleteSessionResponse.newBuilder() + .setStatus(StatusIds.StatusCode.SUCCESS) + .build())); + } + } +} diff --git a/table/pom.xml b/table/pom.xml index 85dc221c4..eebc69287 100644 --- a/table/pom.xml +++ b/table/pom.xml @@ -45,6 +45,11 @@ ydb-junit4-support test + + org.mockito + mockito-core + test + org.apache.logging.log4j log4j-slf4j2-impl @@ -85,9 +90,29 @@ [9,) - - --add-opens=java.base/java.nio=ALL-UNNAMED + + --add-opens=java.base/java.nio=ALL-UNNAMED -XX:+EnableDynamicAgentLoading + + jdk8-build + + 1.8 + + + + + 4.11.0 + + + + + org.mockito + mockito-inline + ${mockito.version} + test + + + diff --git a/table/src/main/java/tech/ydb/table/TableClient.java b/table/src/main/java/tech/ydb/table/TableClient.java index 5dcf73966..f842bfc56 100644 --- a/table/src/main/java/tech/ydb/table/TableClient.java +++ b/table/src/main/java/tech/ydb/table/TableClient.java @@ -47,6 +47,8 @@ interface Builder { Builder sessionMaxIdleTime(Duration duration); + Builder withMeter(tech.ydb.core.metrics.Meter meter, String poolName); + TableClient build(); } } diff --git a/table/src/main/java/tech/ydb/table/impl/PooledTableClient.java b/table/src/main/java/tech/ydb/table/impl/PooledTableClient.java index 11eb1a81d..38834344b 100644 --- a/table/src/main/java/tech/ydb/table/impl/PooledTableClient.java +++ b/table/src/main/java/tech/ydb/table/impl/PooledTableClient.java @@ -8,6 +8,7 @@ import com.google.common.base.Preconditions; import tech.ydb.core.Result; +import tech.ydb.core.metrics.Meter; import tech.ydb.table.Session; import tech.ydb.table.SessionPoolStats; import tech.ydb.table.TableClient; @@ -28,7 +29,9 @@ protected PooledTableClient(Builder builder) { Clock.systemUTC(), builder.tableRpc, builder.keepQueryText, - builder.sessionPoolOptions + builder.sessionPoolOptions, + builder.meter, + builder.poolName ); } @@ -69,6 +72,8 @@ public static class Builder implements TableClient.Builder { private final TableRpc tableRpc; private boolean keepQueryText = true; private SessionPoolOptions sessionPoolOptions = SessionPoolOptions.DEFAULT; + private Meter meter = Meter.NOOP; + private String poolName = "default"; protected Builder(TableRpc tableRpc) { Preconditions.checkArgument(tableRpc != null, "table rpc is null"); @@ -130,6 +135,16 @@ public Builder sessionMaxIdleTime(Duration duration) { return this; } + @Override + public Builder withMeter(Meter meter, String poolName) { + Preconditions.checkArgument(meter != null, "meter is null"); + Preconditions.checkArgument(poolName != null && !poolName.isEmpty(), + "poolName must be a non-empty string when a Meter is provided"); + this.meter = meter; + this.poolName = poolName; + return this; + } + @Override public PooledTableClient build() { return new PooledTableClient(this); diff --git a/table/src/main/java/tech/ydb/table/impl/pool/PoolMetrics.java b/table/src/main/java/tech/ydb/table/impl/pool/PoolMetrics.java new file mode 100644 index 000000000..7005b1a4a --- /dev/null +++ b/table/src/main/java/tech/ydb/table/impl/pool/PoolMetrics.java @@ -0,0 +1,85 @@ +package tech.ydb.table.impl.pool; + +import tech.ydb.core.Status; +import tech.ydb.core.metrics.Attr; +import tech.ydb.core.metrics.DoubleHistogram; +import tech.ydb.core.metrics.LongCounter; +import tech.ydb.core.metrics.Meter; + +public final class PoolMetrics { + private static final String UNIT = "{session}"; + private static final double NANOS_IN_SECOND = 1_000_000_000.0; + + private final Attr poolNameAttr; + private final Attr[] poolAttrs; + private final Attr[] idleAttrs; + private final Attr[] inUseAttrs; + private final String statusKey; + + private final LongCounter created; + private final LongCounter deleted; + private final LongCounter acquired; + private final LongCounter released; + private final LongCounter requested; + private final LongCounter failed; + private final DoubleHistogram createTime; + + public PoolMetrics(Meter meter, String name, String poolName, WaitingQueue queue, int minSize) { + String prefix = "ydb." + name + ".session."; + this.statusKey = prefix + "status_code"; + this.poolNameAttr = Attr.of(prefix + "pool.name", poolName); + + this.poolAttrs = new Attr[]{poolNameAttr}; + this.idleAttrs = new Attr[]{poolNameAttr, Attr.of(prefix + "state", "idle")}; + this.inUseAttrs = new Attr[]{poolNameAttr, Attr.of(prefix + "state", "in_use")}; + + this.created = meter.createCounter(prefix + "created", UNIT, "Total successful session creations."); + this.deleted = meter.createCounter(prefix + "deleted", UNIT, "Total session deletions."); + this.acquired = meter.createCounter(prefix + "acquired", UNIT, "Total session acquires from the pool."); + this.released = meter.createCounter(prefix + "released", UNIT, "Total session releases back to the pool."); + this.requested = meter.createCounter(prefix + "requested", UNIT, "Total CreateSession calls."); + this.failed = meter.createCounter(prefix + "failed", UNIT, "Total failed session creations."); + this.createTime = meter.createHistogram(prefix + "create_time", "s", "Session creation cost."); + + meter.createLongGauge(prefix + "max", UNIT, "Configured MaxPoolSize", + m -> m.record(queue.getTotalLimit(), poolAttrs)); + meter.createLongGauge(prefix + "min", UNIT, "Configured MinPoolSize", + m -> m.record(minSize, poolAttrs)); + meter.createLongGauge(prefix + "count", UNIT, "Current pool session counts", m -> { + int total = queue.getTotalCount(); + int idle = queue.getIdleCount(); + m.record(idle, idleAttrs); + m.record(total - idle, inUseAttrs); + }); + meter.createLongGauge(prefix + "pending_requests", UNIT, "Requests waiting for a session.", + m -> m.record(queue.getWaitingCount() + queue.getPendingCount(), poolAttrs)); + } + + public void onSessionRequested() { + requested.add(1L, poolAttrs); + } + + public void onCreateTime(long createTimeNanos) { + createTime.record(createTimeNanos / NANOS_IN_SECOND, poolAttrs); + } + + public void onSessionCreated() { + created.add(1L, poolAttrs); + } + + public void onSessionDeleted() { + deleted.add(1L, poolAttrs); + } + + public void onSessionAcquired() { + acquired.add(1L, poolAttrs); + } + + public void onSessionReleased() { + released.add(1L, poolAttrs); + } + + public void onSessionFailed(Status status) { + failed.add(1L, new Attr[]{poolNameAttr, Attr.of(statusKey, status.getCode().name())}); + } +} diff --git a/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java b/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java index fa34023c1..cffad0d1b 100644 --- a/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java +++ b/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java @@ -22,6 +22,7 @@ import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.utils.FutureTools; import tech.ydb.table.Session; import tech.ydb.table.SessionPoolStats; @@ -48,13 +49,20 @@ public class SessionPool implements AutoCloseable { private final ScheduledFuture keepAliveFuture; private final StatsImpl stats = new StatsImpl(); + private final PoolMetrics metrics; public SessionPool(Clock clock, TableRpc rpc, boolean keepQueryText, SessionPoolOptions options) { + this(clock, rpc, keepQueryText, options, Meter.NOOP, "default"); + } + + public SessionPool(Clock clock, TableRpc rpc, boolean keepQueryText, SessionPoolOptions options, + Meter meter, String poolName) { this.minSize = options.getMinSize(); this.clock = clock; this.scheduler = rpc.getScheduler(); this.queue = new WaitingQueue<>(new Handler(rpc, keepQueryText), options.getMaxSize()); + this.metrics = new PoolMetrics(meter, "table", poolName, queue, this.minSize); KeepAliveTask keepAlive = new KeepAliveTask(options); this.keepAliveFuture = scheduler.scheduleAtFixedRate( @@ -134,6 +142,7 @@ private boolean validateSession(ClosableSession session, CompletableFuture create() { Context previous = ctx.attach(); try { stats.requested.increment(); + metrics.onSessionRequested(); + long startNanos = System.nanoTime(); return BaseSession .createSessionId(tableRpc, CREATE_SETTINGS, true) .thenApply(response -> { + metrics.onCreateTime(System.nanoTime() - startNanos); + if (!response.isSuccess()) { stats.failed.increment(); + metrics.onSessionFailed(response.getStatus()); throw new UnexpectedResultException("create session problem", response.getStatus()); } + metrics.onSessionCreated(); return new ClosableSession(response.getValue(), tableRpc, keepQueryText); }); } finally { @@ -201,6 +217,7 @@ public CompletableFuture create() { @Override public void destroy(ClosableSession session) { stats.deleted.increment(); + metrics.onSessionDeleted(); // Execute deleteSession call outside current context to avoid cancellation and deadline propogation Context ctx = Context.ROOT.fork(); Context previous = ctx.attach(); diff --git a/table/src/test/java/tech/ydb/table/impl/pool/PoolMetricsTest.java b/table/src/test/java/tech/ydb/table/impl/pool/PoolMetricsTest.java new file mode 100644 index 000000000..bdfa0861f --- /dev/null +++ b/table/src/test/java/tech/ydb/table/impl/pool/PoolMetricsTest.java @@ -0,0 +1,186 @@ +package tech.ydb.table.impl.pool; + +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatcher; + +import tech.ydb.core.Result; +import tech.ydb.core.metrics.Attr; +import tech.ydb.core.metrics.DoubleHistogram; +import tech.ydb.core.metrics.LongCounter; +import tech.ydb.core.metrics.LongMeasurement; +import tech.ydb.core.metrics.Meter; +import tech.ydb.table.Session; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class PoolMetricsTest extends FutureHelper { + private static final Duration TIMEOUT = Duration.ofMillis(50); + private static final String POOL = "my-pool"; + private static final String PREFIX = "ydb.table.session."; + + private final MockedClock clock = MockedClock.create(ZoneId.of("UTC")); + private final MockedScheduler scheduler = new MockedScheduler(clock); + private final MockedTableRpc tableRpc = new MockedTableRpc(clock, scheduler); + + private final Meter meter = mock(Meter.class); + private final DoubleHistogram createTime = mock(DoubleHistogram.class); + private final Map counters = new HashMap<>(); + private final Map> gauges = new HashMap<>(); + + private final ArgumentMatcher poolName = a -> attr(a, "pool.name", POOL); + private final ArgumentMatcher stateIdle = a -> attr(a, "state", "idle"); + private final ArgumentMatcher stateInUse = a -> attr(a, "state", "in_use"); + private final ArgumentMatcher statusOverloaded = a -> attr(a, "status_code", "OVERLOADED"); + + @Before + public void setup() { + clock.reset(Instant.parse("2022-07-01T00:00:00.000Z")); + when(meter.createCounter(anyString(), any(), any())) + .thenAnswer(inv -> counters.computeIfAbsent(inv.getArgument(0), k -> mock(LongCounter.class))); + when(meter.createHistogram(anyString(), any(), any())).thenReturn(createTime); + doAnswer(inv -> { + gauges.put(inv.getArgument(0), inv.getArgument(3)); + return null; + }).when(meter).createLongGauge(anyString(), any(), any(), any()); + } + + @After + public void close() { + scheduler.shutdown(); + scheduler.check().isClosed().hasNoTasks(); + tableRpc.check().hasNoSessions().hasNoPendingRequests(); + } + + @Test + public void allInstrumentsAreCreated() { + SessionPool pool = createPool(0, 2); + + verify(meter).createCounter(eq(PREFIX + "created"), eq("{session}"), anyString()); + verify(meter).createCounter(eq(PREFIX + "deleted"), eq("{session}"), anyString()); + verify(meter).createCounter(eq(PREFIX + "acquired"), eq("{session}"), anyString()); + verify(meter).createCounter(eq(PREFIX + "released"), eq("{session}"), anyString()); + verify(meter).createCounter(eq(PREFIX + "requested"), eq("{session}"), anyString()); + verify(meter).createCounter(eq(PREFIX + "failed"), eq("{session}"), anyString()); + verify(meter).createHistogram(eq(PREFIX + "create_time"), eq("s"), anyString()); + verify(meter).createLongGauge(eq(PREFIX + "max"), eq("{session}"), anyString(), any()); + verify(meter).createLongGauge(eq(PREFIX + "min"), eq("{session}"), anyString(), any()); + verify(meter).createLongGauge(eq(PREFIX + "count"), eq("{session}"), anyString(), any()); + verify(meter).createLongGauge(eq(PREFIX + "pending_requests"), eq("{session}"), anyString(), any()); + + pool.close(); + } + + @Test + public void sessionLifecycleRecordsCounters() { + SessionPool pool = createPool(0, 2); + + CompletableFuture> f1 = pendingFuture(pool.acquire(TIMEOUT)); + verify(counter("requested")).add(eq(1L), argThat(poolName)); + + tableRpc.nextCreateSession().completeSuccess(); + verify(counter("created")).add(eq(1L), argThat(poolName)); + verify(createTime).record(anyDouble(), argThat(poolName)); + verify(counter("acquired")).add(eq(1L), argThat(poolName)); + + Session s1 = futureIsReady(f1).getValue(); + s1.close(); + verify(counter("released")).add(eq(1L), argThat(poolName)); + + pool.close(); + verify(counter("deleted")).add(eq(1L), argThat(poolName)); + tableRpc.completeSessionDeleteRequests(); + + verify(counter("failed"), never()).add(anyLong(), any()); + } + + @Test + public void failedCreateRecordsFailedCounter() { + SessionPool pool = createPool(0, 2); + + CompletableFuture> f1 = pendingFuture(pool.acquire(TIMEOUT)); + tableRpc.nextCreateSession().completeOverloaded(); + + futureIsReady(f1); + verify(counter("requested")).add(eq(1L), argThat(poolName)); + verify(counter("failed")).add(eq(1L), argThat(poolName), argThat(statusOverloaded)); + verify(createTime).record(anyDouble(), argThat(poolName)); + verify(counter("created"), never()).add(anyLong(), any()); + verify(counter("acquired"), never()).add(anyLong(), any()); + + pool.close(); + } + + @Test + public void gaugesObserveStats() { + SessionPool pool = createPool(0, 2); + + CompletableFuture> f1 = pendingFuture(pool.acquire(TIMEOUT)); + CompletableFuture> f2 = pendingFuture(pool.acquire(TIMEOUT)); + tableRpc.nextCreateSession().completeSuccess(); + tableRpc.nextCreateSession().completeSuccess(); + + Session s1 = futureIsReady(f1).getValue(); + Session s2 = futureIsReady(f2).getValue(); + + LongMeasurement max = mock(LongMeasurement.class); + gauges.get(PREFIX + "max").accept(max); + verify(max).record(eq(2L), argThat(poolName)); + + LongMeasurement min = mock(LongMeasurement.class); + gauges.get(PREFIX + "min").accept(min); + verify(min).record(eq(0L), argThat(poolName)); + + LongMeasurement count = mock(LongMeasurement.class); + gauges.get(PREFIX + "count").accept(count); + verify(count).record(eq(0L), argThat(poolName), argThat(stateIdle)); + verify(count).record(eq(2L), argThat(poolName), argThat(stateInUse)); + + LongMeasurement pending = mock(LongMeasurement.class); + gauges.get(PREFIX + "pending_requests").accept(pending); + verify(pending).record(eq(0L), argThat(poolName)); + + s1.close(); + + LongMeasurement countAfterRelease = mock(LongMeasurement.class); + gauges.get(PREFIX + "count").accept(countAfterRelease); + verify(countAfterRelease).record(eq(1L), argThat(poolName), argThat(stateIdle)); + verify(countAfterRelease).record(eq(1L), argThat(poolName), argThat(stateInUse)); + + s2.close(); + pool.close(); + tableRpc.completeSessionDeleteRequests(); + } + + private SessionPool createPool(int minSize, int maxSize) { + return new SessionPool(clock, tableRpc, true, + SessionPoolOptions.DEFAULT.withSize(minSize, maxSize), meter, POOL); + } + + private LongCounter counter(String shortName) { + return counters.get(PREFIX + shortName); + } + + private static boolean attr(Attr attr, String shortKey, String value) { + return attr.getKey().equals(PREFIX + shortKey) && attr.getValue().equals(value); + } +} From 1819f606e93071943dd71a10d9d6d59c6f8fd05e Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Fri, 5 Jun 2026 13:43:23 +0300 Subject: [PATCH 2/3] deleted -XX:+EnableDynamicAgentLoading --- query/pom.xml | 6 ++++-- table/pom.xml | 10 +++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/query/pom.xml b/query/pom.xml index 2ae95468d..e816356b4 100644 --- a/query/pom.xml +++ b/query/pom.xml @@ -84,7 +84,9 @@ true ydbplatform/local-ydb:trunk - enable_columnshard_bool,enable_resource_pools,enable_arrow_result_set_format + + enable_columnshard_bool,enable_resource_pools,enable_arrow_result_set_format + @@ -98,7 +100,7 @@ - --add-opens=java.base/java.nio=ALL-UNNAMED -XX:+EnableDynamicAgentLoading + --add-opens=java.base/java.nio=ALL-UNNAMED diff --git a/table/pom.xml b/table/pom.xml index eebc69287..3f414faf3 100644 --- a/table/pom.xml +++ b/table/pom.xml @@ -1,7 +1,7 @@ + xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 @@ -35,7 +35,7 @@ true - + junit junit test @@ -91,7 +91,7 @@ - --add-opens=java.base/java.nio=ALL-UNNAMED -XX:+EnableDynamicAgentLoading + --add-opens=java.base/java.nio=ALL-UNNAMED From 7a5e3395b1154a7e08747a02aa8909dd5182e92d Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Fri, 5 Jun 2026 13:59:25 +0300 Subject: [PATCH 3/3] added comments --- query/pom.xml | 1 + table/pom.xml | 1 + 2 files changed, 2 insertions(+) diff --git a/query/pom.xml b/query/pom.xml index e816356b4..5db83c8aa 100644 --- a/query/pom.xml +++ b/query/pom.xml @@ -100,6 +100,7 @@ + --add-opens=java.base/java.nio=ALL-UNNAMED diff --git a/table/pom.xml b/table/pom.xml index 3f414faf3..c84117e42 100644 --- a/table/pom.xml +++ b/table/pom.xml @@ -91,6 +91,7 @@ + --add-opens=java.base/java.nio=ALL-UNNAMED