diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java index 96816601f..401052d80 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java @@ -40,6 +40,15 @@ GrpcReadWriteStream readWriteStreamCall( String getDatabase(); + /** + * Returns the discovery endpoint as {@code host:port}, or an empty string when unknown. + * + * @return discovery endpoint or empty string + */ + default String getEndpoint() { + return ""; + } + ScheduledExecutorService getScheduler(); default Tracer getTracer() { diff --git a/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java b/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java index eaf2ff069..5cd8c28a3 100644 --- a/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java +++ b/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java @@ -65,6 +65,11 @@ protected BaseGrpcTransport(EndpointRecord serverEndpoint) { this.serverEndpoint = serverEndpoint; } + @Override + public String getEndpoint() { + return serverEndpoint == null ? "" : serverEndpoint.getHost() + ":" + serverEndpoint.getPort(); + } + protected abstract AuthCallOptions getAuthCallOptions(); protected abstract GrpcChannel getChannel(GrpcRequestSettings settings); diff --git a/core/src/main/java/tech/ydb/core/metrics/DoubleHistogram.java b/core/src/main/java/tech/ydb/core/metrics/DoubleHistogram.java new file mode 100644 index 000000000..f2cccb720 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/DoubleHistogram.java @@ -0,0 +1,19 @@ +package tech.ydb.core.metrics; + +import io.grpc.ExperimentalApi; + +/** + * Histogram of {@code double} values (typically used for durations in seconds). 
+ */ +@ExperimentalApi("YDB Meter is experimental and API may change without notice") +public interface DoubleHistogram { + DoubleHistogram NOOP = (value, keyValues) -> { }; + + /** + * Records the given value with optional pairs of attribute key/value. + * + * @param value value to record + * @param keyValues alternating {@code key, value} pairs; must have even length + */ + void record(double value, String... keyValues); +} diff --git a/core/src/main/java/tech/ydb/core/metrics/LongCounter.java b/core/src/main/java/tech/ydb/core/metrics/LongCounter.java new file mode 100644 index 000000000..ed6671492 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/LongCounter.java @@ -0,0 +1,19 @@ +package tech.ydb.core.metrics; + +import io.grpc.ExperimentalApi; + +/** + * A monotonic counter that records non-negative {@code long} deltas. + */ +@ExperimentalApi("YDB Meter is experimental and API may change without notice") +public interface LongCounter { + LongCounter NOOP = (value, keyValues) -> { }; + + /** + * Adds the given value with optional pairs of attribute key/value. + * + * @param value non-negative delta + * @param keyValues alternating {@code key, value} pairs; must have even length + */ + void add(long value, String... keyValues); +} diff --git a/core/src/main/java/tech/ydb/core/metrics/LongMeasurement.java b/core/src/main/java/tech/ydb/core/metrics/LongMeasurement.java new file mode 100644 index 000000000..27e11c860 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/LongMeasurement.java @@ -0,0 +1,20 @@ +package tech.ydb.core.metrics; + +import io.grpc.ExperimentalApi; + +/** + * Per-observation handle passed to {@link Meter#createLongGauge} callbacks. + * + *

A single callback invocation may call {@link #record} multiple times with different + * attributes to emit several measurements per collection cycle. + */ +@ExperimentalApi("YDB Meter is experimental and API may change without notice") +public interface LongMeasurement { + /** + * Records the current value of the gauge for the given attribute set. + * + * @param value observed value + * @param keyValues alternating {@code key, value} pairs; must have even length + */ + void record(long value, String... keyValues); +} diff --git a/core/src/main/java/tech/ydb/core/metrics/Meter.java b/core/src/main/java/tech/ydb/core/metrics/Meter.java new file mode 100644 index 000000000..4caa95341 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/Meter.java @@ -0,0 +1,58 @@ +package tech.ydb.core.metrics; + +import java.util.function.Consumer; + +import io.grpc.ExperimentalApi; + +/** + * Entry point to create metric instruments (counters, histograms, gauges). + * + *

The interface is dependency-free, so the SDK core does not require an OpenTelemetry runtime + * to compile or run. Each call site (session pool, RPC service, etc.) is expected to create and + * own its private instruments via this {@code Meter} in its constructor and call them on the + * hot path. + * + *

Implementations must be thread-safe. + */ +@ExperimentalApi("YDB Meter is experimental and API may change without notice") +public interface Meter { + Meter NOOP = new Meter() { }; + + /** + * Creates a monotonic {@code long} counter. + * + * @param name metric name + * @param unit metric unit (for example, {@code {operation}}) + * @param description human-readable metric description + * @return created counter + */ + default LongCounter createCounter(String name, String unit, String description) { + return LongCounter.NOOP; + } + + /** + * Creates a {@code double} histogram. + * + * @param name metric name + * @param unit metric unit (for example, {@code s}) + * @param description human-readable metric description + * @return created histogram + */ + default DoubleHistogram createHistogram(String name, String unit, String description) { + return DoubleHistogram.NOOP; + } + + /** + * Registers an asynchronous {@code long} gauge. The {@code callback} is invoked by the metrics + * backend on every collection cycle; the supplied {@link LongMeasurement} may emit any number + * of measurements with different attributes. + * + * @param name metric name + * @param unit metric unit (for example, {@code {session}}) + * @param description human-readable metric description + * @param callback callback invoked by the metrics backend to collect gauge values + */ + default void createLongGauge(String name, String unit, String description, Consumer callback) { + // noop: the backend never queries the callback + } +} diff --git a/core/src/main/java/tech/ydb/core/metrics/MetricAttributes.java b/core/src/main/java/tech/ydb/core/metrics/MetricAttributes.java new file mode 100644 index 000000000..fc13776d9 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/MetricAttributes.java @@ -0,0 +1,17 @@ +package tech.ydb.core.metrics; + +import io.grpc.ExperimentalApi; + +/** + * Attribute keys shared by SDK metrics across modules (query, topic, ...). 
+ */ +@ExperimentalApi("YDB Meter is experimental and API may change without notice") +public final class MetricAttributes { + public static final String DATABASE = "database"; + public static final String ENDPOINT = "endpoint"; + public static final String OPERATION_NAME = "operation.name"; + public static final String STATUS_CODE = "status_code"; + + private MetricAttributes() { + } +} diff --git a/core/src/main/java/tech/ydb/core/metrics/OpenTelemetryMeter.java b/core/src/main/java/tech/ydb/core/metrics/OpenTelemetryMeter.java new file mode 100644 index 000000000..868a6da6b --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/OpenTelemetryMeter.java @@ -0,0 +1,95 @@ +package tech.ydb.core.metrics; + +import java.util.Objects; +import java.util.function.Consumer; + +import io.grpc.ExperimentalApi; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.LongCounterBuilder; +import io.opentelemetry.api.metrics.LongGaugeBuilder; + +/** + * OpenTelemetry-backed implementation of {@link Meter}. + * + *

This adapter is a thin facade over an OpenTelemetry {@code Meter}: it owns no state besides + * the underlying meter. Callers attach attributes (including any resource-style attributes such as + * {@code database}) per recorded measurement. + */ +@ExperimentalApi("YDB Meter is experimental and API may change without notice") +public final class OpenTelemetryMeter implements Meter { + private static final String DEFAULT_SCOPE = "tech.ydb.sdk"; + + private final io.opentelemetry.api.metrics.Meter meter; + + private OpenTelemetryMeter(io.opentelemetry.api.metrics.Meter meter) { + this.meter = Objects.requireNonNull(meter, "meter is null"); + } + + public static OpenTelemetryMeter fromOpenTelemetry(OpenTelemetry openTelemetry) { + Objects.requireNonNull(openTelemetry, "openTelemetry is null"); + return new OpenTelemetryMeter(openTelemetry.getMeter(DEFAULT_SCOPE)); + } + + public static OpenTelemetryMeter createGlobal() { + return fromOpenTelemetry(GlobalOpenTelemetry.get()); + } + + @Override + public LongCounter createCounter(String name, String unit, String description) { + LongCounterBuilder builder = meter.counterBuilder(name); + if (unit != null) { + builder.setUnit(unit); + } + if (description != null) { + builder.setDescription(description); + } + io.opentelemetry.api.metrics.LongCounter counter = builder.build(); + return (value, kv) -> counter.add(value, attributesOf(kv)); + } + + @Override + public DoubleHistogram createHistogram(String name, String unit, String description) { + DoubleHistogramBuilder builder = meter.histogramBuilder(name); + if (unit != null) { + builder.setUnit(unit); + } + if (description != null) { + builder.setDescription(description); + } + io.opentelemetry.api.metrics.DoubleHistogram histogram = builder.build(); + return (value, kv) -> histogram.record(value, attributesOf(kv)); + } + + @Override + public void createLongGauge(String name, String unit, String description, Consumer callback) { + LongGaugeBuilder builder = 
meter.gaugeBuilder(name).ofLongs(); + if (unit != null) { + builder.setUnit(unit); + } + if (description != null) { + builder.setDescription(description); + } + builder.buildWithCallback(otelMeasurement -> + callback.accept((value, kv) -> otelMeasurement.record(value, attributesOf(kv)))); + } + + private static Attributes attributesOf(String[] keyValues) { + if (keyValues == null || keyValues.length == 0) { + return Attributes.empty(); + } + if ((keyValues.length & 1) == 1) { + throw new IllegalArgumentException( + "Meter attribute keyValues must contain an even number of entries (got " + + keyValues.length + ")"); + } + AttributesBuilder builder = Attributes.builder(); + for (int i = 0; i < keyValues.length; i += 2) { + builder.put(keyValues[i], keyValues[i + 1]); + } + return builder.build(); + } +} diff --git a/core/src/main/java/tech/ydb/core/tracing/Span.java b/core/src/main/java/tech/ydb/core/tracing/Span.java index b482bc6cb..2eb6779dc 100644 --- a/core/src/main/java/tech/ydb/core/tracing/Span.java +++ b/core/src/main/java/tech/ydb/core/tracing/Span.java @@ -1,14 +1,10 @@ package tech.ydb.core.tracing; -import java.util.concurrent.CompletableFuture; - import javax.annotation.Nullable; import io.grpc.ExperimentalApi; -import tech.ydb.core.Result; import tech.ydb.core.Status; -import tech.ydb.core.utils.FutureTools; /** * A span represents a timed operation. @@ -93,35 +89,4 @@ default Scope restoreContext() { */ default void end() { } - - /** - * Subscribes to a {@link Status} future: when it completes, sets status/error and ends the span. - * For non-valid spans returns the future as-is without subscribing. - * - * @param span the span to finalize - * @param future the future to observe - * @return the same future (for chaining) - */ - static CompletableFuture endOnStatus(Span span, CompletableFuture future) { - return span.isValid() ? 
future.whenComplete((status, th) -> { - span.setStatus(status, FutureTools.unwrapCompletionException(th)); - span.end(); - }) : future; - } - - /** - * Subscribes to a {@link Result} future: when it completes, sets status/error and ends the span. - * For non-valid spans returns the future as-is without subscribing. - * - * @param result value type - * @param span the span to finalize - * @param future the future to observe - * @return the same future (for chaining) - */ - static CompletableFuture> endOnResult(Span span, CompletableFuture> future) { - return span.isValid() ? future.whenComplete((result, th) -> { - span.setStatus(result != null ? result.getStatus() : null, FutureTools.unwrapCompletionException(th)); - span.end(); - }) : future; - } } diff --git a/core/src/main/java/tech/ydb/core/utils/DiagnosticCall.java b/core/src/main/java/tech/ydb/core/utils/DiagnosticCall.java new file mode 100644 index 000000000..b6d61dce3 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/utils/DiagnosticCall.java @@ -0,0 +1,133 @@ +package tech.ydb.core.utils; + +import java.util.concurrent.CompletableFuture; + +import javax.annotation.Nullable; + +import io.grpc.ExperimentalApi; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.metrics.DoubleHistogram; +import tech.ydb.core.metrics.LongCounter; +import tech.ydb.core.metrics.MetricAttributes; +import tech.ydb.core.tracing.Span; + +/** + * Static helpers for recording diagnostics (span + metrics) around client operations. + */ +@ExperimentalApi("YDB diagnostic API is experimental and may change without notice") +public final class DiagnosticCall { + private static final double NANOS_IN_SECOND = 1_000_000_000.0; + + private DiagnosticCall() { + } + + /** + * Subscribes to a {@link Status} future and records operation span and metrics when it completes. 
+ * + * @param operationName logical operation name + * @param span operation span + * @param startNanos operation start timestamp from {@link System#nanoTime()} + * @param duration operation duration histogram + * @param failed operation failure counter + * @param future future to observe + * @param baseAttributes alternating {@code key, value} pairs added to every metric point + * @return completion stage with diagnostics attached + */ + public static CompletableFuture endOnStatus( + String operationName, + Span span, + long startNanos, + DoubleHistogram duration, + LongCounter failed, + CompletableFuture future, + String... baseAttributes) { + return future.whenComplete((status, th) -> { + recordDuration(operationName, startNanos, duration, baseAttributes); + recordFailed(operationName, failed, status, baseAttributes); + endSpan(span, status, FutureTools.unwrapCompletionException(th)); + }); + } + + /** + * Subscribes to a {@link Result} future and records operation span and metrics when it completes. + * + * @param result value type + * @param operationName logical operation name + * @param span operation span + * @param startNanos operation start timestamp from {@link System#nanoTime()} + * @param duration operation duration histogram + * @param failed operation failure counter + * @param future future to observe + * @param baseAttributes alternating {@code key, value} pairs added to every metric point + * @return completion stage with diagnostics attached + */ + public static CompletableFuture> endOnResult( + String operationName, + Span span, + long startNanos, + DoubleHistogram duration, + LongCounter failed, + CompletableFuture> future, + String... baseAttributes) { + return future.whenComplete((result, th) -> { + Status status = result != null ? 
result.getStatus() : null; + recordDuration(operationName, startNanos, duration, baseAttributes); + recordFailed(operationName, failed, status, baseAttributes); + endSpan(span, status, FutureTools.unwrapCompletionException(th)); + }); + } + + private static void recordDuration( + String operationName, + long startNanos, + DoubleHistogram duration, + String... baseAttributes) { + duration.record((System.nanoTime() - startNanos) / NANOS_IN_SECOND, + operationAttributes(operationName, baseAttributes)); + } + + private static void recordFailed( + String operationName, + LongCounter failed, + @Nullable Status status, + String... baseAttributes) { + if (status != null && !status.isSuccess()) { + failed.add(1L, failedAttributes(operationName, status, baseAttributes)); + } + } + + private static void endSpan(Span span, @Nullable Status status, @Nullable Throwable error) { + span.setStatus(status, error); + span.end(); + } + + private static String[] operationAttributes(String operationName, String[] baseAttributes) { + String[] attributes = new String[baseLength(baseAttributes) + 2]; + attributes[0] = MetricAttributes.OPERATION_NAME; + attributes[1] = operationName; + copyBaseAttributes(baseAttributes, attributes, 2); + return attributes; + } + + private static String[] failedAttributes(String operationName, Status status, String[] baseAttributes) { + String[] attributes = new String[baseLength(baseAttributes) + 4]; + attributes[0] = MetricAttributes.OPERATION_NAME; + attributes[1] = operationName; + attributes[2] = MetricAttributes.STATUS_CODE; + attributes[3] = status.getCode().toString(); + copyBaseAttributes(baseAttributes, attributes, 4); + return attributes; + } + + private static int baseLength(String[] baseAttributes) { + return baseAttributes == null ? 
0 : baseAttributes.length; + } + + private static void copyBaseAttributes(String[] source, String[] target, int offset) { + if (source != null && source.length > 0) { + System.arraycopy(source, 0, target, offset, source.length); + } + } +} diff --git a/core/src/test/java/tech/ydb/core/tracing/SpanTest.java b/core/src/test/java/tech/ydb/core/tracing/SpanTest.java deleted file mode 100644 index 186f9de14..000000000 --- a/core/src/test/java/tech/ydb/core/tracing/SpanTest.java +++ /dev/null @@ -1,98 +0,0 @@ -package tech.ydb.core.tracing; - -import java.io.IOException; -import java.util.concurrent.CompletableFuture; - -import org.junit.Assert; -import org.junit.Test; - -import tech.ydb.core.Result; -import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; - -public class SpanTest { - - @Test - public void finishByStatusExceptionTest() { - RecordingSpan span = new RecordingSpan(); - IOException ex = new IOException("test message"); - - CompletableFuture future = new CompletableFuture<>(); - - Assert.assertSame(future, Span.endOnStatus(Span.NOOP, future)); - Assert.assertNotSame(future, Span.endOnStatus(span, future)); - - Assert.assertFalse(span.ended); - Assert.assertNull(span.statusError); - Assert.assertNull(span.throwableError); - - future.completeExceptionally(ex); - - Assert.assertTrue(span.ended); - Assert.assertNull(span.statusError); - Assert.assertSame(ex, span.throwableError); - } - - @Test - public void finishByResultTest() { - RecordingSpan span = new RecordingSpan(); - Status fail = Status.of(StatusCode.BAD_REQUEST); - - CompletableFuture> future = new CompletableFuture<>(); - - Assert.assertNotSame(future, Span.endOnResult(span, future)); - - Assert.assertFalse(span.ended); - Assert.assertNull(span.statusError); - Assert.assertNull(span.throwableError); - - future.complete(Result.fail(fail)); - - Assert.assertTrue(span.ended); - Assert.assertSame(fail, span.statusError); - Assert.assertNull(span.throwableError); - } - - @Test - public void 
finishByResultExceptionTest() { - RecordingSpan span = new RecordingSpan(); - IOException ex = new IOException("test message"); - - CompletableFuture> future = new CompletableFuture<>(); - - Assert.assertSame(future, Span.endOnResult(Span.NOOP, future)); - Assert.assertNotSame(future, Span.endOnResult(span, future)); - - Assert.assertFalse(span.ended); - Assert.assertNull(span.statusError); - Assert.assertNull(span.throwableError); - - future.completeExceptionally(ex); - - Assert.assertTrue(span.ended); - Assert.assertNull(span.statusError); - Assert.assertSame(ex, span.throwableError); - } - - private static final class RecordingSpan implements Span { - private Status statusError; - private Throwable throwableError; - private boolean ended; - - @Override - public boolean isValid() { - return true; - } - - @Override - public void setStatus(Status status, Throwable error) { - this.statusError = status; - this.throwableError = error; - } - - @Override - public void end() { - this.ended = true; - } - } -} diff --git a/core/src/test/java/tech/ydb/core/utils/DiagnosticCallTest.java b/core/src/test/java/tech/ydb/core/utils/DiagnosticCallTest.java new file mode 100644 index 000000000..e7d0aafd2 --- /dev/null +++ b/core/src/test/java/tech/ydb/core/utils/DiagnosticCallTest.java @@ -0,0 +1,221 @@ +package tech.ydb.core.utils; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import tech.ydb.core.Issue; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.metrics.DoubleHistogram; +import tech.ydb.core.metrics.LongCounter; +import tech.ydb.core.metrics.MetricAttributes; +import tech.ydb.core.tracing.Span; + +public class DiagnosticCallTest { + private static final String DATABASE = "/local"; + private static 
final String ENDPOINT = "host:2136"; + + @Test + public void recordsDurationAndSuccessSpanOnStatus() { + DoubleHistogram duration = Mockito.mock(DoubleHistogram.class); + LongCounter failed = Mockito.mock(LongCounter.class); + Span span = Mockito.mock(Span.class); + + long start = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(20); + CompletableFuture future = new CompletableFuture<>(); + CompletableFuture wrapped = DiagnosticCall.endOnStatus( + "Commit", span, start, duration, failed, future, + MetricAttributes.DATABASE, DATABASE, + MetricAttributes.ENDPOINT, ENDPOINT); + + future.complete(Status.SUCCESS); + Status result = wrapped.join(); + + Assert.assertEquals(Status.SUCCESS, result); + + ArgumentCaptor durationValue = ArgumentCaptor.forClass(Double.class); + ArgumentCaptor durationAttrs = ArgumentCaptor.forClass(String[].class); + Mockito.verify(duration).record(durationValue.capture(), durationAttrs.capture()); + Assert.assertTrue("duration must be positive", durationValue.getValue() > 0.0); + Assert.assertTrue("duration must be < 5s for fresh nanoTime", durationValue.getValue() < 5.0); + Assert.assertArrayEquals(new String[] { + MetricAttributes.OPERATION_NAME, "Commit", + MetricAttributes.DATABASE, DATABASE, + MetricAttributes.ENDPOINT, ENDPOINT + }, durationAttrs.getValue()); + + Mockito.verifyNoInteractions(failed); + Mockito.verify(span).setStatus(Status.SUCCESS, null); + Mockito.verify(span).end(); + } + + @Test + public void incrementsFailedOnNonSuccessStatus() { + DoubleHistogram duration = Mockito.mock(DoubleHistogram.class); + LongCounter failed = Mockito.mock(LongCounter.class); + Span span = Mockito.mock(Span.class); + + long start = System.nanoTime(); + CompletableFuture future = new CompletableFuture<>(); + DiagnosticCall.endOnStatus( + "ExecuteQuery", span, start, duration, failed, future, + MetricAttributes.DATABASE, DATABASE); + + Status badRequest = Status.of(StatusCode.BAD_REQUEST, Issue.of("bad", Issue.Severity.ERROR)); + 
future.complete(badRequest); + + Mockito.verify(duration).record(Mockito.anyDouble(), Mockito.any(String[].class)); + + ArgumentCaptor failedValue = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor failedAttrs = ArgumentCaptor.forClass(String[].class); + Mockito.verify(failed).add(failedValue.capture(), failedAttrs.capture()); + Assert.assertEquals(Long.valueOf(1L), failedValue.getValue()); + Assert.assertArrayEquals(new String[] { + MetricAttributes.OPERATION_NAME, "ExecuteQuery", + MetricAttributes.STATUS_CODE, StatusCode.BAD_REQUEST.toString(), + MetricAttributes.DATABASE, DATABASE + }, failedAttrs.getValue()); + + Mockito.verify(span).setStatus(badRequest, null); + Mockito.verify(span).end(); + } + + @Test + public void resultStatusIsExtractedOnSuccess() { + DoubleHistogram duration = Mockito.mock(DoubleHistogram.class); + LongCounter failed = Mockito.mock(LongCounter.class); + Span span = Mockito.mock(Span.class); + + long start = System.nanoTime(); + CompletableFuture> future = new CompletableFuture<>(); + CompletableFuture> wrapped = DiagnosticCall.endOnResult( + "Rollback", span, start, duration, failed, future, + MetricAttributes.DATABASE, DATABASE, + MetricAttributes.ENDPOINT, ENDPOINT); + + future.complete(Result.success("ok")); + Result result = wrapped.join(); + + Assert.assertTrue(result.isSuccess()); + Mockito.verify(duration).record(Mockito.anyDouble(), Mockito.any(String[].class)); + Mockito.verifyNoInteractions(failed); + Mockito.verify(span).setStatus(Status.SUCCESS, null); + Mockito.verify(span).end(); + } + + @Test + public void resultFailureIncrementsCounterAndKeepsBaseAttributesOrder() { + DoubleHistogram duration = Mockito.mock(DoubleHistogram.class); + LongCounter failed = Mockito.mock(LongCounter.class); + Span span = Mockito.mock(Span.class); + + Status overloaded = Status.of(StatusCode.OVERLOADED); + long start = System.nanoTime(); + CompletableFuture> future = new CompletableFuture<>(); + DiagnosticCall.endOnResult( + 
"ExecuteQuery", span, start, duration, failed, future, + MetricAttributes.DATABASE, DATABASE, + MetricAttributes.ENDPOINT, ENDPOINT); + + future.complete(Result.fail(overloaded)); + + ArgumentCaptor failedAttrs = ArgumentCaptor.forClass(String[].class); + Mockito.verify(failed).add(Mockito.eq(1L), failedAttrs.capture()); + Assert.assertArrayEquals(new String[] { + MetricAttributes.OPERATION_NAME, "ExecuteQuery", + MetricAttributes.STATUS_CODE, StatusCode.OVERLOADED.toString(), + MetricAttributes.DATABASE, DATABASE, + MetricAttributes.ENDPOINT, ENDPOINT + }, failedAttrs.getValue()); + + Mockito.verify(span).setStatus(overloaded, null); + Mockito.verify(span).end(); + } + + @Test + public void exceptionalCompletionEndsSpanWithUnwrappedError() { + DoubleHistogram duration = Mockito.mock(DoubleHistogram.class); + LongCounter failed = Mockito.mock(LongCounter.class); + Span span = Mockito.mock(Span.class); + + RuntimeException boom = new RuntimeException("boom"); + long start = System.nanoTime(); + CompletableFuture future = new CompletableFuture<>(); + CompletableFuture wrapped = DiagnosticCall.endOnStatus( + "Commit", span, start, duration, failed, future); + + future.completeExceptionally(boom); + try { + wrapped.join(); + Assert.fail("future must propagate exception"); + } catch (CompletionException ex) { + Assert.assertSame(boom, ex.getCause()); + } + + ArgumentCaptor durationAttrs = ArgumentCaptor.forClass(String[].class); + Mockito.verify(duration).record(Mockito.anyDouble(), durationAttrs.capture()); + Assert.assertArrayEquals( + new String[] {MetricAttributes.OPERATION_NAME, "Commit"}, + durationAttrs.getValue()); + + Mockito.verifyNoInteractions(failed); + Mockito.verify(span).setStatus(null, boom); + Mockito.verify(span).end(); + } + + @Test + public void worksWithoutBaseAttributes() { + DoubleHistogram duration = Mockito.mock(DoubleHistogram.class); + LongCounter failed = Mockito.mock(LongCounter.class); + Span span = Mockito.mock(Span.class); + + long start 
= System.nanoTime(); + CompletableFuture> future = new CompletableFuture<>(); + DiagnosticCall.endOnResult("Foo", span, start, duration, failed, future); + + Status unavailable = Status.of(StatusCode.UNAVAILABLE); + future.complete(Result.fail(unavailable)); + + ArgumentCaptor durationAttrs = ArgumentCaptor.forClass(String[].class); + Mockito.verify(duration).record(Mockito.anyDouble(), durationAttrs.capture()); + Assert.assertArrayEquals( + new String[] {MetricAttributes.OPERATION_NAME, "Foo"}, + durationAttrs.getValue()); + + ArgumentCaptor failedAttrs = ArgumentCaptor.forClass(String[].class); + Mockito.verify(failed).add(Mockito.eq(1L), failedAttrs.capture()); + Assert.assertArrayEquals(new String[] { + MetricAttributes.OPERATION_NAME, "Foo", + MetricAttributes.STATUS_CODE, StatusCode.UNAVAILABLE.toString() + }, failedAttrs.getValue()); + + Mockito.verify(span).setStatus(unavailable, null); + Mockito.verify(span).end(); + } + + @Test + public void durationMatchesElapsedNanoTime() throws InterruptedException { + DoubleHistogram duration = Mockito.mock(DoubleHistogram.class); + LongCounter failed = Mockito.mock(LongCounter.class); + Span span = Mockito.mock(Span.class); + + long start = System.nanoTime(); + Thread.sleep(20); + CompletableFuture future = new CompletableFuture<>(); + DiagnosticCall.endOnStatus("Sleep", span, start, duration, failed, future); + future.complete(Status.SUCCESS); + + ArgumentCaptor durationValue = ArgumentCaptor.forClass(Double.class); + Mockito.verify(duration).record(durationValue.capture(), Mockito.any(String[].class)); + double seconds = durationValue.getValue(); + Assert.assertTrue("expected at least 15ms, got " + seconds, seconds >= 0.015); + Assert.assertTrue("expected less than 5s, got " + seconds, seconds < 5.0); + } +} diff --git a/query/src/main/java/tech/ydb/query/QueryClient.java b/query/src/main/java/tech/ydb/query/QueryClient.java index 343ccb01f..8cdeec529 100644 --- a/query/src/main/java/tech/ydb/query/QueryClient.java 
+++ b/query/src/main/java/tech/ydb/query/QueryClient.java @@ -8,6 +8,7 @@ import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.tracing.NoopTracer; import tech.ydb.core.tracing.Tracer; import tech.ydb.query.impl.QueryClientImpl; @@ -55,6 +56,10 @@ interface Builder { Builder sessionMaxIdleTime(Duration duration); + Builder sessionPoolName(String poolName); + + Builder withMeter(Meter meter); + QueryClient build(); } } diff --git a/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java b/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java index 4716abc85..6fd909a19 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java @@ -9,6 +9,7 @@ import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.tracing.Tracer; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; @@ -24,13 +25,17 @@ public class QueryClientImpl implements QueryClient { private final Tracer tracer; public QueryClientImpl(Builder builder) { + String poolName = builder.sessionPoolName != null + ? 
builder.sessionPoolName + : defaultPoolName(builder.transport); this.pool = new SessionPool( Clock.systemUTC(), - new QueryServiceRpc(builder.transport), + new QueryServiceRpc(builder.transport, builder.meter), builder.transport.getScheduler(), builder.sessionPoolMinSize, builder.sessionPoolMaxSize, - builder.sessionPoolIdleDuration + builder.sessionPoolIdleDuration, + poolName ); this.scheduler = builder.transport.getScheduler(); this.tracer = builder.transport.getTracer(); @@ -69,6 +74,18 @@ public static Builder newClient(GrpcTransport transport) { return new Builder(transport); } + private static String defaultPoolName(GrpcTransport transport) { + String endpoint = transport.getEndpoint(); + String database = transport.getDatabase(); + if (endpoint == null || endpoint.isEmpty()) { + return database; + } + if (database == null || database.isEmpty()) { + return endpoint; + } + return database.startsWith("/") ? endpoint + database : endpoint + "/" + database; + } + public static class Builder implements QueryClient.Builder { private static final Duration MAX_DURATION = Duration.ofMinutes(30); private static final Duration MIN_DURATION = Duration.ofSeconds(1); @@ -77,6 +94,8 @@ public static class Builder implements QueryClient.Builder { private int sessionPoolMinSize = 0; private int sessionPoolMaxSize = 50; private Duration sessionPoolIdleDuration = Duration.ofMinutes(5); + private String sessionPoolName = null; + private Meter meter = Meter.NOOP; Builder(GrpcTransport transport) { Preconditions.checkArgument(transport != null, "transport is null"); @@ -126,6 +145,21 @@ public Builder sessionMaxIdleTime(Duration duration) { return this; } + @Override + public Builder sessionPoolName(String poolName) { + Preconditions.checkArgument(poolName != null && !poolName.isEmpty(), + "sessionPoolName must be a non-empty string"); + this.sessionPoolName = poolName; + return this; + } + + @Override + public Builder withMeter(Meter meter) { + Preconditions.checkArgument(meter 
!= null, "meter is null"); + this.meter = meter; + return this; + } + @Override public QueryClientImpl build() { return new QueryClientImpl(this); diff --git a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java index 66c2e0a6c..de52c5c92 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java @@ -6,6 +6,9 @@ import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.metrics.DoubleHistogram; +import tech.ydb.core.metrics.LongCounter; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.operation.StatusExtractor; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanKind; @@ -51,16 +54,56 @@ class QueryServiceRpc { private final GrpcTransport transport; private final Tracer trace; + private final Meter meter; + private final String database; + private final String endpoint; + private final DoubleHistogram operationDuration; + private final LongCounter operationFailed; QueryServiceRpc(GrpcTransport transport) { + this(transport, Meter.NOOP); + } + + QueryServiceRpc(GrpcTransport transport, Meter meter) { this.transport = transport; this.trace = transport.getTracer(); + this.meter = meter; + this.database = transport.getDatabase(); + this.endpoint = transport.getEndpoint(); + this.operationDuration = meter.createHistogram( + "ydb.client.operation.duration", + "s", + "Duration of a single client operation attempt (ExecuteQuery, Commit, Rollback)."); + this.operationFailed = meter.createCounter( + "ydb.client.operation.failed", + "{operation}", + "Number of failed client operation attempts."); } Span startSpan(String spanName) { return trace.startSpan(spanName, SpanKind.CLIENT); } + Meter getMeter() { + return meter; + } + + String getDatabase() { + return database; + } + + String getEndpoint() { + return 
endpoint; + } + + DoubleHistogram operationDuration() { + return operationDuration; + } + + LongCounter operationFailed() { + return operationFailed; + } + public CompletableFuture> createSession( YdbQuery.CreateSessionRequest request, GrpcRequestSettings settings) { return transport diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index f4c5a4065..b5f390d59 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -24,9 +24,13 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.metrics.DoubleHistogram; +import tech.ydb.core.metrics.LongCounter; +import tech.ydb.core.metrics.MetricAttributes; import tech.ydb.core.operation.StatusExtractor; import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.tracing.Span; +import tech.ydb.core.utils.DiagnosticCall; import tech.ydb.core.utils.URITools; import tech.ydb.core.utils.UpdatableOptional; import tech.ydb.proto.ValueProtos; @@ -75,6 +79,10 @@ abstract class SessionImpl implements QuerySession { private final long nodeID; private final boolean isTraceEnabled; private final AtomicReference transaction; + private final String database; + private final String endpoint; + private final DoubleHistogram operationDuration; + private final LongCounter operationFailed; SessionImpl(QueryServiceRpc rpc, YdbQuery.CreateSessionResponse response) { this.rpc = rpc; @@ -82,6 +90,42 @@ abstract class SessionImpl implements QuerySession { this.nodeID = getNodeBySessionId(response.getSessionId(), response.getNodeId()); this.isTraceEnabled = logger.isTraceEnabled(); this.transaction = new AtomicReference<>(new TransactionImpl(TxMode.SERIALIZABLE_RW, null)); + this.database = rpc.getDatabase(); + this.endpoint = rpc.getEndpoint(); + this.operationDuration = 
rpc.operationDuration(); + this.operationFailed = rpc.operationFailed(); + } + + private CompletableFuture> endOperationOnResult( + String operationName, + Span span, + long startNanos, + CompletableFuture> future) { + return DiagnosticCall.endOnResult( + operationName, + span, + startNanos, + operationDuration, + operationFailed, + future, + MetricAttributes.DATABASE, database, + MetricAttributes.ENDPOINT, endpoint); + } + + private CompletableFuture endOperationOnStatus( + String operationName, + Span span, + long startNanos, + CompletableFuture future) { + return DiagnosticCall.endOnStatus( + operationName, + span, + startNanos, + operationDuration, + operationFailed, + future, + MetricAttributes.DATABASE, database, + MetricAttributes.ENDPOINT, endpoint); } private static Long getNodeBySessionId(String sessionId, long defaultValue) { @@ -266,19 +310,23 @@ private static YdbFormats.ArrowFormatSettings mapApacheArrowFormat(ApacheArrowFo } CompletableFuture commitById(String txId, CommitTransactionSettings settings, Span span) { + long startNanos = System.nanoTime(); YdbQuery.CommitTransactionRequest request = YdbQuery.CommitTransactionRequest.newBuilder() .setSessionId(sessionId) .setTxId(txId) .build(); - return rpc.commitTransaction(request, makeOptions(settings, span).build()).thenApply(Result::getStatus); + return endOperationOnStatus("Commit", span, startNanos, + rpc.commitTransaction(request, makeOptions(settings, span).build()).thenApply(Result::getStatus)); } CompletableFuture rollbackById(String txId, RollbackTransactionSettings settings, Span span) { + long startNanos = System.nanoTime(); YdbQuery.RollbackTransactionRequest request = YdbQuery.RollbackTransactionRequest.newBuilder() .setSessionId(sessionId) .setTxId(txId) .build(); - return rpc.rollbackTransaction(request, makeOptions(settings, span).build()).thenApply(Result::getStatus); + return endOperationOnStatus("Rollback", span, startNanos, + rpc.rollbackTransaction(request, makeOptions(settings, 
span).build()).thenApply(Result::getStatus)); } GrpcReadStream createGrpcStream( @@ -331,7 +379,8 @@ GrpcReadStream createGrpcStream( public QueryStream createQuery(String query, TxMode tx, Params prms, ExecuteQuerySettings settings) { YdbQuery.TransactionControl tc = TxControl.txModeCtrl(tx, true); Span span = startSpan("ydb.ExecuteQuery"); - return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { + long startNanos = System.nanoTime(); + return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span, startNanos) { @Override void handleTxMeta(String txID) { if (txID != null && !txID.isEmpty()) { @@ -376,12 +425,17 @@ static CompletableFuture> createSession( } abstract class StreamImpl implements QueryStream { + private static final String OPERATION_NAME = "ExecuteQuery"; + private final GrpcReadStream grpcStream; private final Span span; + private final long startNanos; - StreamImpl(GrpcReadStream grpcStream, Span operationSpan) { + StreamImpl(GrpcReadStream grpcStream, Span operationSpan, + long startNanos) { this.grpcStream = grpcStream; this.span = operationSpan; + this.startNanos = startNanos; } abstract void handleTxMeta(String txId); @@ -393,7 +447,7 @@ void handleCompletion(Status status, Throwable th) { public CompletableFuture> execute(PartsHandler handler) { final UpdatableOptional operationStatus = new UpdatableOptional<>(); final UpdatableOptional stats = new UpdatableOptional<>(); - return Span.endOnResult(span, grpcStream.start(msg -> { + return endOperationOnResult(OPERATION_NAME, span, startNanos, grpcStream.start(msg -> { if (isTraceEnabled) { logger.trace("{} got stream message {}", SessionImpl.this, TextFormat.shortDebugString(msg)); @@ -434,11 +488,10 @@ public CompletableFuture> execute(PartsHandler handler) { }).whenComplete(this::handleCompletion).thenApply(streamStatus -> { updateSessionState(streamStatus); Status status = operationStatus.orElse(streamStatus); - if (status.isSuccess()) { - return 
Result.success(new QueryInfo(stats.get()), streamStatus); - } else { + if (!status.isSuccess()) { return Result.fail(status); } + return Result.success(new QueryInfo(stats.get()), streamStatus); }) ); } @@ -478,7 +531,8 @@ public QueryStream createQuery(String query, boolean commitAtEnd, Params prms, E : TxControl.txModeCtrl(txMode, commitAtEnd); Span span = startSpan("ydb.ExecuteQuery"); - return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { + long startNanos = System.nanoTime(); + return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span, startNanos) { @Override void handleTxMeta(String txID) { String newId = txID == null || txID.isEmpty() ? null : txID; @@ -518,12 +572,13 @@ public void cancel() { @Override public CompletableFuture> commit(CommitTransactionSettings settings) { Span span = startSpan("ydb.Commit"); + long startNanos = System.nanoTime(); CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); String transactionId = txId.get(); if (transactionId == null) { Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING); Result res = Result.success(new QueryInfo(null), Status.of(StatusCode.SUCCESS, issue)); - return Span.endOnResult(span, CompletableFuture.completedFuture(res)); + return endOperationOnResult("Commit", span, startNanos, CompletableFuture.completedFuture(res)); } YdbQuery.CommitTransactionRequest request = YdbQuery.CommitTransactionRequest.newBuilder() @@ -531,17 +586,19 @@ public CompletableFuture> commit(CommitTransactionSettings set .setTxId(transactionId) .build(); - return Span.endOnResult(span, rpc.commitTransaction(request, makeOptions(settings, span).build())) + return endOperationOnResult("Commit", span, startNanos, + rpc.commitTransaction(request, makeOptions(settings, span).build())) .thenApply(res -> { Status status = res.getStatus(); currentStatusFuture.complete(status); updateSessionState(status); if 
(!txId.compareAndSet(transactionId, null)) { - logger.warn("{} lost commit response for transaction {}", SessionImpl.this, transactionId); + logger.warn("{} lost commit response for transaction {}", + SessionImpl.this, transactionId); } // TODO: CommitTransactionResponse must contain exec_stats return res.map(resp -> new QueryInfo(null)); - }).whenComplete(((status, th) -> { + }).whenComplete(((res, th) -> { if (th != null) { currentStatusFuture.completeExceptionally( new RuntimeException("Transaction commit failed with exception", th)); @@ -552,20 +609,22 @@ public CompletableFuture> commit(CommitTransactionSettings set @Override public CompletableFuture rollback(RollbackTransactionSettings settings) { Span span = startSpan("ydb.Rollback"); + long startNanos = System.nanoTime(); CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); String transactionId = txId.get(); if (transactionId == null) { Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING); Status status = Status.of(StatusCode.SUCCESS, issue); - return Span.endOnStatus(span, CompletableFuture.completedFuture(status)); + return endOperationOnStatus("Rollback", span, startNanos, CompletableFuture.completedFuture(status)); } YdbQuery.RollbackTransactionRequest request = YdbQuery.RollbackTransactionRequest.newBuilder() .setSessionId(sessionId) .setTxId(transactionId) .build(); - return Span.endOnResult(span, rpc.rollbackTransaction(request, makeOptions(settings, span).build())) + return endOperationOnResult("Rollback", span, startNanos, + rpc.rollbackTransaction(request, makeOptions(settings, span).build())) .thenApply(result -> { updateSessionState(result.getStatus()); if (!txId.compareAndSet(transactionId, null)) { diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java index ccbbf4889..da5396525 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java +++ 
b/query/src/main/java/tech/ydb/query/impl/SessionPool.java @@ -22,7 +22,12 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.core.metrics.DoubleHistogram; +import tech.ydb.core.metrics.LongCounter; +import tech.ydb.core.metrics.Meter; +import tech.ydb.core.metrics.MetricAttributes; import tech.ydb.core.tracing.Span; +import tech.ydb.core.utils.DiagnosticCall; import tech.ydb.core.utils.FutureTools; import tech.ydb.proto.query.YdbQuery; import tech.ydb.query.QuerySession; @@ -40,6 +45,9 @@ class SessionPool implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(SessionPool.class); + private static final String POOL_NAME_ATTR = "ydb.query.session.pool.name"; + private static final String SESSION_STATE_ATTR = "ydb.query.session.state"; + private static final CreateSessionSettings CREATE_SETTINGS = CreateSessionSettings.newBuilder() .withRequestTimeout(Duration.ofSeconds(300)) .withOperationTimeout(Duration.ofSeconds(299)) @@ -54,27 +62,75 @@ class SessionPool implements AutoCloseable { .build(); private final int minSize; + private final String poolName; private final Clock clock; private final ScheduledExecutorService scheduler; private final WaitingQueue queue; private final ScheduledFuture cleanerFuture; private final StatsImpl stats = new StatsImpl(); + private final LongCounter pendingRequests; + private final LongCounter sessionTimeouts; + private final DoubleHistogram sessionCreateTime; + private final DoubleHistogram operationDuration; + private final LongCounter operationFailed; + SessionPool(Clock clock, QueryServiceRpc rpc, ScheduledExecutorService scheduler, int minSize, int maxSize, - Duration idleDuration) { + Duration idleDuration, String poolName) { this.minSize = minSize; + this.poolName = poolName; this.clock = clock; this.scheduler = scheduler; this.queue = new WaitingQueue<>(new Handler(rpc), maxSize); + Meter meter = 
rpc.getMeter(); + this.pendingRequests = meter.createCounter( + "ydb.query.session.pending_requests", + "{request}", + "Number of session-acquire requests that had to wait for a free session."); + this.sessionTimeouts = meter.createCounter( + "ydb.query.session.timeouts", + "{timeout}", + "Number of session-acquire timeouts."); + this.sessionCreateTime = meter.createHistogram( + "ydb.query.session.create_time", + "s", + "Time spent creating a new session."); + this.operationDuration = rpc.operationDuration(); + this.operationFailed = rpc.operationFailed(); + + meter.createLongGauge( + "ydb.query.session.count", + "{session}", + "Current number of sessions in the pool by state.", + m -> { + m.record(queue.getIdleCount(), + POOL_NAME_ATTR, poolName, + SESSION_STATE_ATTR, "idle"); + m.record(queue.getUsedCount(), + POOL_NAME_ATTR, poolName, + SESSION_STATE_ATTR, "used"); + }); + meter.createLongGauge( + "ydb.query.session.min", + "{session}", + "Configured minimum size of the session pool.", + m -> m.record(minSize, POOL_NAME_ATTR, poolName)); + meter.createLongGauge( + "ydb.query.session.max", + "{session}", + "Configured maximum size of the session pool.", + m -> m.record(queue.getTotalLimit(), POOL_NAME_ATTR, poolName)); + CleanerTask cleaner = new CleanerTask(idleDuration); this.cleanerFuture = scheduler.scheduleAtFixedRate( cleaner, cleaner.periodMillis / 2, cleaner.periodMillis, TimeUnit.MILLISECONDS); - logger.info("init QuerySession pool, min size = {}, max size = {}, keep alive period = {}", + logger.info("init QuerySession pool '{}', min size = {}, max size = {}, keep alive period = {}", + poolName, minSize, maxSize, cleaner.periodMillis); @@ -102,8 +158,9 @@ public CompletableFuture> acquire(Duration timeout) { // If next session is not ready - add timeout canceler if (!pollNext(future)) { + pendingRequests.add(1L, POOL_NAME_ATTR, poolName); future.whenComplete(new Canceller(scheduler.schedule( - new Timeout(future), + new Timeout(future, sessionTimeouts, 
poolName), timeout.toMillis(), TimeUnit.MILLISECONDS) )); @@ -277,8 +334,21 @@ public CompletableFuture create() { Context previous = ctx.attach(); try { Span createSpan = rpc.startSpan("ydb.CreateSession"); + long startNanos = System.nanoTime(); stats.requested.increment(); - return Span.endOnResult(createSpan, SessionImpl.createSession(rpc, CREATE_SETTINGS, true, createSpan)) + return DiagnosticCall.endOnResult( + "CreateSession", + createSpan, + startNanos, + operationDuration, + operationFailed, + SessionImpl.createSession(rpc, CREATE_SETTINGS, true, createSpan), + MetricAttributes.DATABASE, rpc.getDatabase(), + MetricAttributes.ENDPOINT, rpc.getEndpoint()) + .whenComplete((r, th) -> { + double seconds = (System.nanoTime() - startNanos) / 1_000_000_000.0; + sessionCreateTime.record(seconds, POOL_NAME_ATTR, poolName); + }) .thenCompose(r -> { if (!r.isSuccess()) { stats.failed.increment(); @@ -444,15 +514,19 @@ static final class Timeout implements Runnable { ); private final CompletableFuture> f; + private final LongCounter sessionTimeouts; + private final String poolName; - Timeout(CompletableFuture> f) { + Timeout(CompletableFuture> f, LongCounter sessionTimeouts, String poolName) { this.f = f; + this.sessionTimeouts = sessionTimeouts; + this.poolName = poolName; } @Override public void run() { - if (f != null && !f.isDone()) { - f.complete(Result.fail(EXPIRE)); + if (f != null && !f.isDone() && f.complete(Result.fail(EXPIRE))) { + sessionTimeouts.add(1L, POOL_NAME_ATTR, poolName); } } } diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index 8fde13bc8..47a2c1f58 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -132,9 +132,10 @@ public CompletableFuture> executeDataQueryInternal( final List issues = new ArrayList<>(); final List results = new ArrayList<>(); Span span = 
querySession.startSpan("ydb.ExecuteQuery"); + long startNanos = System.nanoTime(); QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, span), - span) { + span, startNanos) { @Override void handleTxMeta(String txID) { txRef.set(txID); @@ -211,7 +212,7 @@ protected CompletableFuture commitTransactionInternal(String txId, Commi .withTraceId(settings.getTraceId()) .withRequestTimeout(settings.getTimeoutDuration()) .build(); - return Span.endOnStatus(span, querySession.commitById(txId, querySettings, span)); + return querySession.commitById(txId, querySettings, span); } @Override @@ -221,7 +222,7 @@ protected CompletableFuture rollbackTransactionInternal(String txId, Rol .withTraceId(settings.getTraceId()) .withRequestTimeout(settings.getTimeoutDuration()) .build(); - return Span.endOnStatus(span, querySession.rollbackById(txId, querySettings, span)); + return querySession.rollbackById(txId, querySettings, span); } private final class TracedTableTransaction implements TableTransaction { @@ -325,6 +326,12 @@ public Builder sessionMaxIdleTime(Duration duration) { return this; } + @Override + public Builder withMeter(tech.ydb.core.metrics.Meter meter) { + query.withMeter(meter); + return this; + } + @Override public TableClientImpl build() { return new TableClientImpl(this); diff --git a/query/src/test/java/tech/ydb/query/opentelemetry/OpenTelemetryQueryMetricsIntegrationTest.java b/query/src/test/java/tech/ydb/query/opentelemetry/OpenTelemetryQueryMetricsIntegrationTest.java new file mode 100644 index 000000000..76d7f79fb --- /dev/null +++ b/query/src/test/java/tech/ydb/query/opentelemetry/OpenTelemetryQueryMetricsIntegrationTest.java @@ -0,0 +1,242 @@ +package tech.ydb.query.opentelemetry; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.OpenTelemetrySdk; 
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import tech.ydb.auth.TokenAuthProvider;
+import tech.ydb.common.transaction.TxMode;
+import tech.ydb.core.Result;
+import tech.ydb.core.StatusCode;
+import tech.ydb.core.grpc.GrpcTransport;
+import tech.ydb.core.metrics.OpenTelemetryMeter;
+import tech.ydb.query.QueryClient;
+import tech.ydb.query.QuerySession;
+import tech.ydb.query.QueryTransaction;
+import tech.ydb.test.junit4.YdbHelperRule;
+
+public class OpenTelemetryQueryMetricsIntegrationTest { // runs against the YDB exposed by YdbHelperRule
+    @ClassRule
+    public static final YdbHelperRule YDB = new YdbHelperRule();
+
+    private static final AttributeKey<String> DATABASE = AttributeKey.stringKey("database");
+    private static final AttributeKey<String> ENDPOINT = AttributeKey.stringKey("endpoint");
+    private static final AttributeKey<String> OPERATION_NAME = AttributeKey.stringKey("operation.name");
+    private static final AttributeKey<String> STATUS_CODE = AttributeKey.stringKey("status_code");
+    private static final AttributeKey<String> POOL_NAME = AttributeKey.stringKey("ydb.query.session.pool.name");
+    private static final AttributeKey<String> SESSION_STATE = AttributeKey.stringKey("ydb.query.session.state");
+
+    private static InMemoryMetricReader metricReader;
+    private static SdkMeterProvider meterProvider;
+    private static OpenTelemetryMeter ydbMeter;
+    private static GrpcTransport transport;
+
+    private QueryClient queryClient;
+
+    @BeforeClass
+    public static void initTransport() {
+        metricReader = InMemoryMetricReader.create();
+        meterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(metricReader)
+                .build();
+
+        OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
+                .setMeterProvider(meterProvider)
+                .build();
+
+        ydbMeter = OpenTelemetryMeter.fromOpenTelemetry(openTelemetry);
+
+        transport = GrpcTransport.forEndpoint(YDB.endpoint(), YDB.database())
+                .withAuthProvider(new TokenAuthProvider(YDB.authToken()))
+                .build();
+    }
+
+    @AfterClass
+    public static void closeTransport() throws IOException {
+        transport.close();
+        meterProvider.close();
+        metricReader.close();
+    }
+
+    @Before
+    public void initClient() {
+        queryClient = QueryClient.newClient(transport).withMeter(ydbMeter).build();
+    }
+
+    @After
+    public void closeClient() {
+        queryClient.close();
+    }
+
+    @Test
+    public void executeQueryRecordsOperationDuration() {
+        try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
+            session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess();
+        }
+
+        MetricData metric = findMetric("ydb.client.operation.duration");
+        Assert.assertNotNull("ydb.client.operation.duration metric not found", metric);
+        Assert.assertEquals("s", metric.getUnit());
+
+        HistogramPointData point = findHistogramPoint(metric, "ExecuteQuery");
+        Assert.assertNotNull("No histogram point for ExecuteQuery", point);
+        Assert.assertTrue("Duration must be > 0", point.getSum() > 0);
+        Assert.assertEquals(YDB.database(), point.getAttributes().get(DATABASE));
+        Assert.assertEquals(YDB.endpoint(), point.getAttributes().get(ENDPOINT));
+    }
+
+    @Test
+    public void commitAndRollbackRecordOperationDuration() {
+        try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
+            QueryTransaction txCommit = session.beginTransaction(TxMode.SERIALIZABLE_RW)
+                    .join().getValue();
+            txCommit.createQuery("SELECT 1").execute().join().getStatus().expectSuccess();
+            txCommit.commit().join().getStatus().expectSuccess();
+
+            QueryTransaction txRollback = session.beginTransaction(TxMode.SERIALIZABLE_RW)
+                    .join().getValue();
+            txRollback.createQuery("SELECT 1").execute().join().getStatus().expectSuccess();
+            txRollback.rollback().join().expectSuccess();
+        }
+
+        MetricData metric = findMetric("ydb.client.operation.duration");
+        Assert.assertNotNull(metric);
+
+        Assert.assertNotNull("No histogram point for Commit",
+                findHistogramPoint(metric, "Commit"));
+        Assert.assertNotNull("No histogram point for Rollback",
+                findHistogramPoint(metric, "Rollback"));
+    }
+
+    @Test
+    public void failedOperationRecordsFailedCounter() {
+        try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
+            session.createQuery("SELECT * FROM __nonexistent_table__", TxMode.NONE)
+                    .execute().join();
+        }
+
+        MetricData metric = findMetric("ydb.client.operation.failed");
+        Assert.assertNotNull("ydb.client.operation.failed metric not found", metric);
+        Assert.assertEquals("{operation}", metric.getUnit());
+
+        Collection<LongPointData> points = metric.getLongSumData().getPoints();
+        Assert.assertFalse("Failed counter must have at least one point", points.isEmpty());
+        long total = points.stream().mapToLong(LongPointData::getValue).sum();
+        Assert.assertTrue("Failed counter must be > 0", total > 0);
+        Assert.assertTrue("Failed counter must carry status_code attribute",
+                points.stream().anyMatch(p -> p.getAttributes().get(STATUS_CODE) != null));
+        Assert.assertTrue("Failed counter must carry database attribute",
+                points.stream().anyMatch(p -> YDB.database().equals(p.getAttributes().get(DATABASE))));
+        Assert.assertTrue("Failed counter must carry endpoint attribute",
+                points.stream().anyMatch(p -> YDB.endpoint().equals(p.getAttributes().get(ENDPOINT))));
+    }
+
+    @Test
+    public void sessionPoolMetricsAreReported() {
+        try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
+            session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess();
+        }
+
+        MetricData count = findMetric("ydb.query.session.count");
+        Assert.assertNotNull("ydb.query.session.count metric not found", count);
+        Assert.assertEquals("{session}", count.getUnit());
+        Assert.assertTrue("session.count must have an idle bucket",
+                count.getLongGaugeData().getPoints().stream()
+                        .map(p -> p.getAttributes().get(SESSION_STATE))
+                        .anyMatch("idle"::equals));
+
+        MetricData min = findMetric("ydb.query.session.min");
+        Assert.assertNotNull("ydb.query.session.min metric not found", min);
+        Assert.assertEquals("{session}", min.getUnit());
+        Assert.assertTrue("min must have a pool.name attribute",
+                min.getLongGaugeData().getPoints().stream()
+                        .anyMatch(p -> p.getAttributes().get(POOL_NAME) != null));
+
+        MetricData max = findMetric("ydb.query.session.max");
+        Assert.assertNotNull("ydb.query.session.max metric not found", max);
+        Assert.assertEquals("{session}", max.getUnit());
+
+        MetricData createTime = findMetric("ydb.query.session.create_time");
+        Assert.assertNotNull("ydb.query.session.create_time metric not found", createTime);
+        Assert.assertEquals("s", createTime.getUnit());
+        Assert.assertFalse("session.create_time must have at least one point",
+                createTime.getHistogramData().getPoints().isEmpty());
+    }
+
+    @Test
+    public void sessionPendingAndTimeoutsMetricsAreCounters() {
+        try (QueryClient tinyClient = QueryClient.newClient(transport)
+                .withMeter(ydbMeter)
+                .sessionPoolMaxSize(1)
+                .sessionPoolName("tiny")
+                .build()) {
+            try (QuerySession s1 = tinyClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
+                Result<QuerySession> result = tinyClient.createSession(Duration.ofMillis(500)).join();
+                Assert.assertFalse(
+                        "waiter must time out (sessionPoolMaxSize=1 with one session held), but got " + result,
+                        result.isSuccess());
+                Assert.assertEquals(
+                        "waiter must complete with CLIENT_DEADLINE_EXPIRED",
+                        StatusCode.CLIENT_DEADLINE_EXPIRED, result.getStatus().getCode());
+            }
+        }
+
+        // NOTE: nothing waits here. Timeout.run() bumps the timeout counter right after completing
+        // the waiter's future, so this snapshot relies on that increment having already happened.
+        Collection<MetricData> snapshot = metricReader.collectAllMetrics();
+
+        MetricData pending = findMetric(snapshot, "ydb.query.session.pending_requests");
+        Assert.assertNotNull("ydb.query.session.pending_requests metric not found", pending);
+        Assert.assertEquals("{request}", pending.getUnit());
+        long pendingTotal = pending.getLongSumData().getPoints().stream()
+                .mapToLong(LongPointData::getValue).sum();
+        Assert.assertTrue("pending_requests must be > 0", pendingTotal > 0);
+
+        MetricData timeouts = findMetric(snapshot, "ydb.query.session.timeouts");
+        Assert.assertNotNull("ydb.query.session.timeouts metric not found", timeouts);
+        Assert.assertEquals("{timeout}", timeouts.getUnit());
+        long timeoutTotal = timeouts.getLongSumData().getPoints().stream()
+                .mapToLong(LongPointData::getValue).sum();
+        Assert.assertTrue("timeouts must be > 0", timeoutTotal > 0);
+    }
+
+    private MetricData findMetric(String name) { // collects a fresh snapshot on every call
+        return findMetric(metricReader.collectAllMetrics(), name);
+    }
+
+    private MetricData findMetric(Collection<MetricData> metrics, String name) { // null when no metric matches
+        for (MetricData m : metrics) {
+            if (name.equals(m.getName())) {
+                return m;
+            }
+        }
+        return null;
+    }
+
+    private HistogramPointData findHistogramPoint(MetricData metric, String operationName) { // null if absent
+        for (HistogramPointData point : metric.getHistogramData().getPoints()) {
+            String op = point.getAttributes().get(OPERATION_NAME);
+            if (operationName.equals(op)) {
+                return point;
+            }
+        }
+        return null;
+    }
+
+}
diff --git a/table/src/main/java/tech/ydb/table/TableClient.java b/table/src/main/java/tech/ydb/table/TableClient.java
index 5dcf73966..6b29d3735 100644
--- a/table/src/main/java/tech/ydb/table/TableClient.java
+++ b/table/src/main/java/tech/ydb/table/TableClient.java
@@ -47,6 +47,10 @@ interface Builder {
 
         Builder sessionMaxIdleTime(Duration duration);
 
+        default Builder withMeter(tech.ydb.core.metrics.Meter meter) {
+            return this;
+        }
+
         TableClient build();
     }
 }