diff --git a/pom.xml b/pom.xml index be8e522..cc60063 100644 --- a/pom.xml +++ b/pom.xml @@ -31,6 +31,7 @@ jdbc project-course slo + slo-workload diff --git a/slo-workload/README.md b/slo-workload/README.md new file mode 100644 index 0000000..7b874bf --- /dev/null +++ b/slo-workload/README.md @@ -0,0 +1,144 @@ +# YDB SLO Workload Tests + +This module hosts SLO (Service Level Objective) workloads that test the +reliability of YDB Java clients under load and chaos using the +[YDB SLO action](https://github.com/ydb-platform/ydb-slo-action). + +Each submodule is a self-contained, runnable workload that follows the same +contract as the SDK SLO workload in [`../slo`](../slo): it reads its +configuration from environment variables, runs setup/run/teardown phases, and +pushes OpenTelemetry (OTLP) metrics that the action scrapes and compares +between the current PR run and a baseline run. + +| Module | Component under test | Description | +| --- | --- | --- | +| [`jdbc`](jdbc) | `ydb-jdbc-driver` | Plain JDBC KV workload (no framework) | + +## How a workload behaves + +Every workload runs three phases: + +1. **Setup** — creates a partitioned KV table and prefills it with rows. +2. **Run** — drives concurrent read and write loops at fixed RPS for the + configured duration. Each operation is timed and retried; the outcome is + recorded as OTLP metrics. +3. **Teardown** — drops the workload table even if the run failed, so the + cluster is left clean. + +While the workload runs, the SLO action injects chaos (node restarts, network +black holes, container pauses). The metrics show how well the client copes. + +## Metrics + +Every metric carries a `ref` label taken from the `WORKLOAD_REF` environment +variable, which lets the report action separate the **current** run from the +**baseline** run. Names are shown below in Prometheus form (dots become +underscores during the OTLP → Prometheus conversion). 
+ +| Metric | Type | Labels | +| --- | --- | --- | +| `sdk_operations_total` | counter | `operation_type`, `operation_status` | +| `sdk_errors_total` | counter | `operation_type`, `error_kind` | +| `sdk_retry_attempts_total` | counter | `operation_type`, `operation_status` | +| `sdk_pending_operations` | up/down counter | `operation_type` | +| `sdk_operation_latency_p50_seconds` | gauge | `operation_type`, `operation_status` (always `success`) | +| `sdk_operation_latency_p95_seconds` | gauge | `operation_type`, `operation_status` (always `success`) | +| `sdk_operation_latency_p99_seconds` | gauge | `operation_type`, `operation_status` (always `success`) | + +Latency percentiles are computed from per-operation HDR histograms and reflect +only successful operations — failure latency is dominated by retry budgets and +timeouts and would mask real regressions during chaos. Counters cover both +branches, so availability is computed correctly. + +## Inputs + +Connection details and run parameters come from environment variables: + +| Variable | Description | +| --- | --- | +| `YDB_JDBC_URL` | Full JDBC URL (`jdbc:ydb:...`), used verbatim if set | +| `YDB_CONNECTION_STRING` | YDB connection string; prefixed with `jdbc:ydb:` | +| `YDB_ENDPOINT` + `YDB_DATABASE` | Used to compose the connection string if the above are unset | +| `YDB_TOKEN` | Optional auth token | +| `WORKLOAD_REF` | Value of the `ref` label on every metric | +| `WORKLOAD_NAME` | Workload name (also part of the table name) | +| `WORKLOAD_DURATION` | Run duration in seconds | +| `OTEL_EXPORTER_OTLP_ENDPOINT` | OTLP HTTP endpoint to push metrics to | + +KV tunables are passed on the command line and parsed by JCommander: + +``` +--read-rps Target read RPS (default 1000) +--write-rps Target write RPS (default 100) +--read-timeout-ms Per-attempt read timeout in ms (default 10000) +--write-timeout-ms Per-attempt write timeout in ms (default 10000) +--prefill-count Rows to prefill before the run phase (default 
1000) +--partition-size Auto-partitioning partition size in MB (default 1) +--min-partition-count Minimum number of table partitions (default 6) +--max-partition-count Maximum number of table partitions (default 1000) +--duration Override WORKLOAD_DURATION when > 0 +``` + +Unknown flags are ignored, so a workload accepts command strings designed for +other SDKs without erroring. + +## How CI uses this module + +This repository only hosts the workload sources. The CI that actually runs them +lives in the repository of the component under test — for `jdbc` that is +[`ydb-jdbc-driver`](https://github.com/ydb-platform/ydb-jdbc-driver) — mirroring +how the SDK SLO workload in [`../slo`](../slo) is driven from the +`ydb-java-sdk` repository rather than from here. + +The driver's workflow, via [`ydb-platform/ydb-slo-action`](https://github.com/ydb-platform/ydb-slo-action): + +1. checks out the driver under test (current and baseline) and this repository + for the workload sources; +2. `ydb-platform/ydb-slo-action/init` deploys a YDB cluster (storage + database + nodes), Prometheus with an OTLP receiver, and a chaos monkey, exposing the + database node IPs and the OTLP endpoint as step outputs; +3. builds the workload jar and runs it, pointing `YDB_CONNECTION_STRING` at a + database node and `OTEL_EXPORTER_OTLP_ENDPOINT` at the Prometheus OTLP + receiver; +4. `ydb-platform/ydb-slo-action/report` compares the current run against the + baseline and posts a summary to the PR. + +## Building locally + +From the `ydb-java-examples` repository root: + +```bash +mvn -pl slo-workload/jdbc -am -DskipTests package +``` + +The resulting jar is at +`slo-workload/jdbc/target/ydb-slo-jdbc-workload.jar`. 
To run it against a +local YDB: + +```bash +export YDB_CONNECTION_STRING="grpc://localhost:2136/local" +export WORKLOAD_REF=local +export WORKLOAD_NAME=java-slo-jdbc + +java -jar slo-workload/jdbc/target/ydb-slo-jdbc-workload.jar \ + --duration 60 --read-rps 100 --write-rps 10 --prefill-count 100 +``` + +If `OTEL_EXPORTER_OTLP_ENDPOINT` is not set, metrics are still recorded +in-process but never exported — handy for verifying that the workload runs +cleanly before pushing to CI. + +## Adding a new workload + +1. Create a module next to `jdbc` (e.g. `spring-data-jpa`). +2. Reuse `Config`, `Metrics`, and the `kv` package; replace only the data + access layer with the framework under test. +3. Register the module in `slo-workload/pom.xml` and wire it into the SLO + workflow of the component under test (in its own repository). + +## Links + +- [SDK SLO workload (reference)](../slo) +- [YDB SLO Action](https://github.com/ydb-platform/ydb-slo-action) +- [YDB JDBC Driver](https://github.com/ydb-platform/ydb-jdbc-driver) +- [YDB Documentation](https://ydb.tech/docs/) diff --git a/slo-workload/jdbc/Dockerfile b/slo-workload/jdbc/Dockerfile new file mode 100644 index 0000000..943be48 --- /dev/null +++ b/slo-workload/jdbc/Dockerfile @@ -0,0 +1,51 @@ +# Multi-stage Dockerfile for the YDB JDBC SLO workload. +# +# The image can be consumed by the YDB SLO action +# (`ydb-platform/ydb-slo-action`): the workload reads its connection details +# and run parameters from environment variables and pushes OTLP metrics to the +# endpoint the action provides. +# +# Build context: the `ydb-java-examples` repository root. +# +# Optional build args: +# MAVEN_IMAGE Builder image. Defaults to `maven:3.9-eclipse-temurin-17`. +# RUNTIME_IMAGE Runtime image. Defaults to `eclipse-temurin:17-jre`. +# YDB_JDBC_VERSION Override the ydb-jdbc-driver version under test. 
+ +ARG MAVEN_IMAGE=maven:3.9-eclipse-temurin-17 +ARG RUNTIME_IMAGE=eclipse-temurin:17-jre + +# ---------- builder --------------------------------------------------------- +FROM ${MAVEN_IMAGE} AS workload-build + +WORKDIR /src +COPY . /src + +ARG YDB_JDBC_VERSION="" + +# Pin the JDBC driver version under test when provided, then build only the +# workload module (and the parent context it needs). +RUN if [ -n "${YDB_JDBC_VERSION}" ]; then \ + echo "Pinning ydb-jdbc-driver to ${YDB_JDBC_VERSION}" && \ + mvn -B -q versions:set-property \ + -Dproperty=ydb.jdbc.version \ + -DnewVersion="${YDB_JDBC_VERSION}" \ + -DgenerateBackupPoms=false \ + -pl slo-workload ; \ + fi && \ + mvn -B -q -pl slo-workload/jdbc -am \ + -DskipTests \ + -Dmaven.javadoc.skip=true \ + package + +# ---------- runtime --------------------------------------------------------- +FROM ${RUNTIME_IMAGE} + +WORKDIR /app + +# The jar's manifest Class-Path points at libs/, so a single `java -jar` call +# is enough. +COPY --from=workload-build /src/slo-workload/jdbc/target/ydb-slo-jdbc-workload.jar /app/ydb-slo-jdbc-workload.jar +COPY --from=workload-build /src/slo-workload/jdbc/target/libs /app/libs + +ENTRYPOINT ["java", "-jar", "/app/ydb-slo-jdbc-workload.jar"] diff --git a/slo-workload/jdbc/README.md b/slo-workload/jdbc/README.md new file mode 100644 index 0000000..19ee337 --- /dev/null +++ b/slo-workload/jdbc/README.md @@ -0,0 +1,91 @@ +# JDBC SLO workload + +A plain-JDBC SLO workload that exercises the +[YDB JDBC driver](https://github.com/ydb-platform/ydb-jdbc-driver) under load +and chaos. It mirrors the structure and metrics contract of the SDK SLO +workload in [`../../slo`](../../slo), so reports are directly comparable. + +> See the [parent README](../README.md) for the shared metrics, environment +> variables, CLI flags and CI flow. + +## What it does + +The workload runs as a standalone jar (`tech.ydb.slo.Main`) and goes through +three phases against a partitioned KV table: + +1. 
**Setup** — `CREATE TABLE IF NOT EXISTS` plus a prefill of `--prefill-count` + rows. +2. **Run** — dedicated read and write thread pools, each paced by a Guava + `RateLimiter` to the target RPS, running until the configured duration. +3. **Teardown** — `DROP TABLE`. + +Every worker thread owns its own JDBC `Connection` (the driver's connections +are not thread-safe) and reuses prepared statements. On a connection-level +error the connection is transparently reopened on the next attempt. + +## Schema + +``` +hash Uint64 -- primary key, derived from id on the client +id Uint64 -- primary key +payload_str Utf8 +payload_double Double +payload_timestamp Timestamp +payload_hash Uint64 +``` + +The primary-key `hash` column is derived from `id` with a SplitMix64-style mix +(`KvWorkload#numericHash`) so reads and writes target the same key without +relying on server-side YQL builtins inside parameterized statements. + +## Retries + +Operations are retried with exponential backoff (up to 10 attempts). An error +is considered retryable when the driver throws a `SQLRecoverableException` or +`SQLTransientException` (which covers the driver's +`YdbRetryableException`, `YdbConditionallyRetryableException`, +`YdbUnavailbaleException` and `YdbTimeoutException`). The number of retries is +recorded in `sdk_retry_attempts_total`, and the failure reason is reported via +the `error_kind` label on `sdk_errors_total` (using the YDB status code when +available). 
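The retry policy described above can be sketched roughly as follows. This is an illustration only, not the `KvWorkload` code (which this excerpt does not show): `RetrySketch`, `SqlCall`, `withRetry`, the 5-second delay cap and the jitter are assumptions made for the example. Only the retryable exception classes and the 10-attempt limit come from the text.

```java
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of the workload sources.
public final class RetrySketch {

    /** A JDBC operation that may fail with SQLException. */
    @FunctionalInterface
    public interface SqlCall<T> {
        T call() throws SQLException;
    }

    private RetrySketch() {
    }

    /** Retryable when the driver signals a recoverable or transient failure. */
    public static boolean isRetryable(SQLException e) {
        return e instanceof SQLRecoverableException || e instanceof SQLTransientException;
    }

    /** Delay before the next attempt: base * 2^(attempt - 1), capped, with jitter (assumed policy). */
    public static long backoffMillis(int attempt, long baseMillis, long capMillis) {
        long exponential = baseMillis << Math.min(attempt - 1, 20);
        long bounded = Math.min(capMillis, exponential);
        // Pick a random delay in [bounded / 2, bounded].
        return ThreadLocalRandom.current().nextLong(bounded / 2, bounded + 1);
    }

    /** Runs the call, retrying retryable failures until maxAttempts is reached. */
    public static <T> T withRetry(SqlCall<T> call, int maxAttempts, long baseMillis) throws SQLException {
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (SQLException e) {
                if (!isRetryable(e) || attempt >= maxAttempts) {
                    throw e; // non-retryable error or attempts exhausted
                }
                try {
                    Thread.sleep(backoffMillis(attempt, baseMillis, 5_000L));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e; // stop retrying when the worker is interrupted
                }
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        int[] calls = {0};
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new SQLTransientException("simulated overload");
            }
            return "ok";
        }, 10, 1L);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints "ok after 3 attempts"
    }
}
```

A caller would wrap each read or write in `withRetry` and pass the attempt count to the metrics span once the operation finishes. Errors outside the two retryable families, such as a syntax error, are rethrown on the first attempt.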
+ +## Files + +``` +jdbc/ +├── Dockerfile +├── pom.xml +├── README.md +└── src/main/ + ├── java/tech/ydb/slo/ + │ ├── Config.java Reads env vars, resolves the JDBC URL + │ ├── Main.java Entry point + │ ├── Metrics.java OTLP metrics + HDR histograms + │ └── kv/ + │ ├── KvWorkload.java Setup/run/teardown loop over JDBC + │ ├── KvWorkloadParams.java JCommander-bound CLI flags + │ ├── Row.java Row data class + │ └── RowGenerator.java Random payload generator + └── resources/ + └── log4j2.xml Console logging config +``` + +## Building and running locally + +```bash +# From the repository root +mvn -pl slo-workload/jdbc -am -DskipTests package + +export YDB_CONNECTION_STRING="grpc://localhost:2136/local" +export WORKLOAD_REF=local +export WORKLOAD_NAME=java-slo-jdbc + +java -jar slo-workload/jdbc/target/ydb-slo-jdbc-workload.jar \ + --duration 60 --read-rps 100 --write-rps 10 --prefill-count 100 +``` + +Build the container image (context is the repository root): + +```bash +docker build -f slo-workload/jdbc/Dockerfile -t ydb-slo-jdbc-workload . 
+``` diff --git a/slo-workload/jdbc/pom.xml b/slo-workload/jdbc/pom.xml new file mode 100644 index 0000000..c4b174e --- /dev/null +++ b/slo-workload/jdbc/pom.xml @@ -0,0 +1,102 @@ + + 4.0.0 + + + tech.ydb.examples + slo-workload + 1.1.0-SNAPSHOT + ../pom.xml + + + jdbc + jar + JDBC SLO workload + SLO workload exercising the YDB JDBC driver, compatible with ydb-slo-action + + + + + tech.ydb.jdbc + ydb-jdbc-driver + + + + + com.beust + jcommander + + + + + com.google.guava + guava + + + + + org.hdrhistogram + HdrHistogram + + + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-sdk-metrics + + + io.opentelemetry + opentelemetry-exporter-otlp + + + + + org.apache.logging.log4j + log4j-slf4j2-impl + + + + + ydb-slo-jdbc-workload + + + org.apache.maven.plugins + maven-compiler-plugin + + 17 + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + + org.apache.maven.plugins + maven-jar-plugin + + + + true + libs/ + tech.ydb.slo.Main + + + + + + + diff --git a/slo-workload/jdbc/src/main/java/tech/ydb/slo/Config.java b/slo-workload/jdbc/src/main/java/tech/ydb/slo/Config.java new file mode 100644 index 0000000..2b1ca3a --- /dev/null +++ b/slo-workload/jdbc/src/main/java/tech/ydb/slo/Config.java @@ -0,0 +1,148 @@ +package tech.ydb.slo; + +/** + * Configuration for the JDBC SLO workload, populated from environment + * variables provided by the YDB SLO action runtime. + * + *

<p>The action sets these variables on the workload container: + * <ul> + * <li>{@code YDB_CONNECTION_STRING} or {@code YDB_ENDPOINT} + {@code YDB_DATABASE}: YDB connection</li> + * <li>{@code WORKLOAD_REF}: value used as the {@code ref} label on all metrics</li> + * <li>{@code WORKLOAD_NAME}: workload name (also used as part of the table path)</li> + * <li>{@code WORKLOAD_DURATION}: workload run duration in seconds (0 = unlimited)</li> + * <li>{@code OTEL_EXPORTER_OTLP_ENDPOINT}: OTLP endpoint for pushing metrics</li> + * </ul> + * + * <p>

Because the component under test here is the JDBC driver, the + * YDB connection is expressed as a JDBC URL ({@code jdbc:ydb:...}). The URL is + * resolved in this order: {@code YDB_JDBC_URL} (used verbatim), then + * {@code YDB_CONNECTION_STRING} (prefixed with {@code jdbc:ydb:}), then + * {@code YDB_ENDPOINT} + {@code YDB_DATABASE}. + */ +public final class Config { + private final String jdbcUrl; + private final String token; + private final String ref; + private final String workloadName; + private final int durationSeconds; + private final String otlpEndpoint; + + private Config( + String jdbcUrl, + String token, + String ref, + String workloadName, + int durationSeconds, + String otlpEndpoint + ) { + this.jdbcUrl = jdbcUrl; + this.token = token; + this.ref = ref; + this.workloadName = workloadName; + this.durationSeconds = durationSeconds; + this.otlpEndpoint = otlpEndpoint; + } + + public String jdbcUrl() { + return jdbcUrl; + } + + public String token() { + return token; + } + + public String ref() { + return ref; + } + + public String workloadName() { + return workloadName; + } + + public int durationSeconds() { + return durationSeconds; + } + + public String otlpEndpoint() { + return otlpEndpoint; + } + + /** + * Loads configuration from environment variables. 
+ * + * @return configuration instance + * @throws IllegalStateException if required variables are missing or invalid + */ + public static Config fromEnv() { + String jdbcUrl = resolveJdbcUrl(); + if (jdbcUrl == null || jdbcUrl.isEmpty()) { + throw new IllegalStateException( + "YDB connection is not configured: set YDB_JDBC_URL, " + + "YDB_CONNECTION_STRING or YDB_ENDPOINT + YDB_DATABASE" + ); + } + + String token = envOrDefault("YDB_TOKEN", ""); + String ref = envOrDefault("WORKLOAD_REF", "unknown"); + String workloadName = envOrDefault("WORKLOAD_NAME", "java-slo-jdbc-workload"); + int durationSeconds = parseInt(envOrDefault("WORKLOAD_DURATION", "600"), 600); + String otlpEndpoint = envOrDefault("OTEL_EXPORTER_OTLP_ENDPOINT", ""); + + return new Config(jdbcUrl, token, ref, workloadName, durationSeconds, otlpEndpoint); + } + + private static String resolveJdbcUrl() { + String explicit = System.getenv("YDB_JDBC_URL"); + if (explicit != null && !explicit.isEmpty()) { + return explicit; + } + + String connectionString = System.getenv("YDB_CONNECTION_STRING"); + if (connectionString != null && !connectionString.isEmpty()) { + return toJdbcUrl(connectionString); + } + + String endpoint = System.getenv("YDB_ENDPOINT"); + String database = System.getenv("YDB_DATABASE"); + if (endpoint == null || endpoint.isEmpty() || database == null || database.isEmpty()) { + return null; + } + return toJdbcUrl(composeConnectionString(endpoint, database)); + } + + /** + * Turns a YDB connection string ({@code grpc://host:port/database}) into a + * JDBC URL understood by the YDB JDBC driver. If the value already starts + * with {@code jdbc:}, it is returned unchanged. 
+ */ + private static String toJdbcUrl(String connectionString) { + if (connectionString.startsWith("jdbc:")) { + return connectionString; + } + return "jdbc:ydb:" + connectionString; + } + + private static String composeConnectionString(String endpoint, String database) { + // Compose a connection string in the form grpc://host:port/database. + if (endpoint.endsWith("/") && database.startsWith("/")) { + return endpoint + database.substring(1); + } + if (!endpoint.endsWith("/") && !database.startsWith("/")) { + return endpoint + "/" + database; + } + return endpoint + database; + } + + private static String envOrDefault(String name, String defaultValue) { + String value = System.getenv(name); + return (value == null || value.isEmpty()) ? defaultValue : value; + } + + private static int parseInt(String value, int defaultValue) { + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + return defaultValue; + } + } +} diff --git a/slo-workload/jdbc/src/main/java/tech/ydb/slo/Main.java b/slo-workload/jdbc/src/main/java/tech/ydb/slo/Main.java new file mode 100644 index 0000000..997f8b9 --- /dev/null +++ b/slo-workload/jdbc/src/main/java/tech/ydb/slo/Main.java @@ -0,0 +1,160 @@ +package tech.ydb.slo; + +import java.util.Properties; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.ParameterException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.slo.kv.KvWorkload; +import tech.ydb.slo.kv.KvWorkloadParams; + +/** + * Entry point of the JDBC SLO workload. + * + *

<p>Reads connection details and run parameters from environment variables + * (see {@link Config}), parses workload-specific flags from the command line + * (see {@link KvWorkloadParams}), and runs the KV workload phases (setup, + * run, teardown), pushing metrics to the OTLP endpoint configured by the YDB + * SLO action runtime. + * + * <p>Exit codes: + * <ul> + * <li>{@code 0}: workload completed successfully</li> + * <li>{@code 1}: workload failed (an unhandled exception, an interrupted run, or the JDBC driver missing from the classpath)</li> + * <li>{@code 2}: invalid CLI arguments or environment configuration</li> + * </ul>
+ */ +public final class Main { + private static final Logger logger = LoggerFactory.getLogger(Main.class); + + private static final String YDB_DRIVER_CLASS = "tech.ydb.jdbc.YdbDriver"; + + private Main() { + // utility class + } + + public static void main(String[] args) { + Config config; + try { + config = Config.fromEnv(); + } catch (IllegalStateException e) { + logger.error("invalid environment configuration: {}", e.getMessage()); + System.exit(2); + return; + } + + KvWorkloadParams params = new KvWorkloadParams(); + try { + JCommander.newBuilder() + .programName("ydb-slo-jdbc-workload") + .acceptUnknownOptions(true) + .addObject(params) + .build() + .parse(args); + } catch (ParameterException e) { + logger.error("invalid CLI arguments: {}", e.getMessage()); + System.exit(2); + return; + } + + // CLI duration takes precedence over WORKLOAD_DURATION when supplied. + if (params.durationSeconds() <= 0) { + params.setDurationSeconds(config.durationSeconds()); + } + + try { + // The driver auto-registers via the JDBC SPI, but loading it + // explicitly fails fast with a clear message if it's missing. + Class.forName(YDB_DRIVER_CLASS); + } catch (ClassNotFoundException e) { + logger.error("YDB JDBC driver not found on classpath: {}", YDB_DRIVER_CLASS); + System.exit(1); + return; + } + + logger.info("starting SLO workload: name={}, ref={}, duration={}s, readRps={}, writeRps={}, url={}", + config.workloadName(), + config.ref(), + params.durationSeconds(), + params.readRps(), + params.writeRps(), + config.jdbcUrl()); + + // The table path embeds workload name and ref so concurrent runs of + // the current and baseline images don't step on each other. Both + // components are sanitized: WORKLOAD_NAME comes from the action input + // and is normally already safe, but we don't trust user input to be + // a valid YDB identifier. 
+ String tablePath = sanitize(config.workloadName()) + "_" + sanitize(config.ref()); + + Properties connectionProperties = new Properties(); + if (config.token() != null && !config.token().isEmpty()) { + connectionProperties.setProperty("token", config.token()); + } + + int exitCode = 0; + Metrics metrics = Metrics.create(config); + KvWorkload workload = new KvWorkload( + config.jdbcUrl(), connectionProperties, metrics, params, tablePath + ); + + try { + workload.setup(); + workload.run(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("workload interrupted"); + exitCode = 1; + } catch (Throwable t) { + logger.error("workload failed", t); + exitCode = 1; + } finally { + try { + workload.teardown(); + } catch (Throwable t) { + logger.warn("teardown failed", t); + } + + try { + metrics.flush(); + } catch (Throwable t) { + logger.warn("metrics flush failed", t); + } + + closeQuietly(metrics, "metrics"); + } + + System.exit(exitCode); + } + + private static void closeQuietly(AutoCloseable closeable, String name) { + if (closeable == null) { + return; + } + try { + closeable.close(); + } catch (Throwable t) { + logger.warn("failed to close {}: {}", name, t.toString()); + } + } + + /** + * Replaces characters that aren't valid in YDB table names with underscores. + * Refs from CI may include slashes ({@code release/1.2}) or dots, which + * the action permits in metrics labels but YDB rejects in table paths. 
+ */ + private static String sanitize(String value) { + StringBuilder sb = new StringBuilder(value.length()); + for (int i = 0; i < value.length(); i++) { + char c = value.charAt(i); + if (Character.isLetterOrDigit(c) || c == '_') { + sb.append(c); + } else { + sb.append('_'); + } + } + return sb.toString(); + } +} diff --git a/slo-workload/jdbc/src/main/java/tech/ydb/slo/Metrics.java b/slo-workload/jdbc/src/main/java/tech/ydb/slo/Metrics.java new file mode 100644 index 0000000..767a747 --- /dev/null +++ b/slo-workload/jdbc/src/main/java/tech/ydb/slo/Metrics.java @@ -0,0 +1,381 @@ +package tech.ydb.slo; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongUpDownCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; +import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; +import org.HdrHistogram.AtomicHistogram; +import org.HdrHistogram.Histogram; + +/** + * Collects and pushes SLO workload metrics to the OTLP endpoint configured by + * the YDB SLO action runtime. + * + *

<p>Metrics emitted (matching the contract from + * {@code ydb-platform/ydb-slo-action}): + * <ul> + * <li>{@code sdk.operations.total}: counter, labeled by + * {@code operation_type} and {@code operation_status}</li> + * <li>{@code sdk.errors.total}: counter, labeled by + * {@code operation_type} and {@code error_kind}</li> + * <li>{@code sdk.retry.attempts.total}: counter, labeled by + * {@code operation_type} and {@code operation_status}</li> + * <li>{@code sdk.pending.operations}: up/down counter, labeled by + * {@code operation_type}</li> + * <li>{@code sdk.operation.latency.p50.seconds} / + * {@code .p95.seconds} / {@code .p99.seconds}: + * observable gauges fed from per-operation HDR histograms</li> + * </ul> + * + * <p>

Every metric carries the {@code ref} label so the report action can + * separate current and baseline series. + */ +public final class Metrics implements AutoCloseable { + + public enum OperationType { + READ("read"), + WRITE("write"); + + private final String label; + + OperationType(String label) { + this.label = label; + } + + public String label() { + return label; + } + } + + public enum OperationStatus { + SUCCESS("success"), + ERROR("error"); + + private final String label; + + OperationStatus(String label) { + this.label = label; + } + + public String label() { + return label; + } + } + + private static final AttributeKey<String> ATTR_OPERATION_TYPE = + AttributeKey.stringKey("operation_type"); + private static final AttributeKey<String> ATTR_OPERATION_STATUS = + AttributeKey.stringKey("operation_status"); + private static final AttributeKey<String> ATTR_ERROR_KIND = + AttributeKey.stringKey("error_kind"); + private static final AttributeKey<String> ATTR_REF = + AttributeKey.stringKey("ref"); + + // HDR histograms record latencies in microseconds with high precision up to 60 s.
+ private static final long HDR_MIN_MICROS = 1L; + private static final long HDR_MAX_MICROS = 60L * 1_000_000L; + private static final int HDR_SIGNIFICANT_DIGITS = 3; + + private final SdkMeterProvider meterProvider; + private final String ref; + private final LongCounter operationsTotal; + private final LongCounter errorsTotal; + private final LongCounter retryAttemptsTotal; + private final LongUpDownCounter pendingOperations; + + private final Map<OperationType, Histogram> histograms = new ConcurrentHashMap<>(); + + private Metrics( + SdkMeterProvider meterProvider, + String ref, + LongCounter operationsTotal, + LongCounter errorsTotal, + LongCounter retryAttemptsTotal, + LongUpDownCounter pendingOperations + ) { + this.meterProvider = meterProvider; + this.ref = ref; + this.operationsTotal = operationsTotal; + this.errorsTotal = errorsTotal; + this.retryAttemptsTotal = retryAttemptsTotal; + this.pendingOperations = pendingOperations; + } + + /** + * Builds a {@code Metrics} instance configured to push OTLP metrics every + * second to the endpoint from {@code config.otlpEndpoint()}. If the + * endpoint is empty, all metrics are still observable in-process but never + * exported.
+ */ + public static Metrics create(Config config) { + String ref = config.ref(); + + Resource resource = Resource.getDefault().toBuilder() + .put("service.name", config.workloadName()) + .put("ref", ref) + .put("sdk", "java") + .build(); + + SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder() + .setResource(resource); + + if (config.otlpEndpoint() != null && !config.otlpEndpoint().isEmpty()) { + OtlpHttpMetricExporter exporter = OtlpHttpMetricExporter.builder() + .setEndpoint(metricsEndpoint(config.otlpEndpoint())) + .setTimeout(Duration.ofSeconds(10)) + .build(); + providerBuilder.registerMetricReader( + PeriodicMetricReader.builder(exporter) + .setInterval(Duration.ofSeconds(1)) + .build() + ); + } + + SdkMeterProvider provider = providerBuilder.build(); + Meter meter = provider.get("slo-workload-" + config.workloadName()); + + LongCounter operationsTotal = meter.counterBuilder("sdk.operations.total") + .setDescription("Total number of operations") + .setUnit("{operation}") + .build(); + + LongCounter errorsTotal = meter.counterBuilder("sdk.errors.total") + .setDescription("Total number of errors") + .setUnit("{error}") + .build(); + + LongCounter retryAttemptsTotal = meter.counterBuilder("sdk.retry.attempts.total") + .setDescription("Total number of retry attempts") + .setUnit("{attempt}") + .build(); + + LongUpDownCounter pendingOperations = meter.upDownCounterBuilder("sdk.pending.operations") + .setDescription("Currently in-flight operations") + .build(); + + Map<OperationType, Histogram> histograms = new ConcurrentHashMap<>(); + + // Pre-create one histogram per operation_type so the first export + // already produces gauge series. We only track successful operations: + // failure latency is dominated by retry budgets / timeouts and would + // skew the percentiles without telling us anything useful about SDK + // performance. The SLO action's metrics.yaml filters by + // operation_status="success" anyway.
+ for (OperationType type : OperationType.values()) { + histograms.put(type, newHistogram()); + } + + // Build the three percentile gauges as raw observers — their values + // are produced by a single batch callback below, which reads + // p50/p95/p99 from the same histogram snapshot and then resets the + // histogram. Reading all three percentiles from one snapshot avoids + // races where p99 could be observed against a freshly-reset histogram + // populated by p50, and resetting after each export means the gauge + // reflects only latencies recorded during the last export interval — + // matching the JS SDK's behaviour and avoiding cold-start tail drag + // on the JVM (without reset, JIT-warmup outliers stick to p99 for + // the rest of the run). + ObservableDoubleMeasurement p50Observer = meter.gaugeBuilder("sdk.operation.latency.p50.seconds") + .setUnit("s") + .setDescription("p50 operation latency in seconds") + .buildObserver(); + + ObservableDoubleMeasurement p95Observer = meter.gaugeBuilder("sdk.operation.latency.p95.seconds") + .setUnit("s") + .setDescription("p95 operation latency in seconds") + .buildObserver(); + + ObservableDoubleMeasurement p99Observer = meter.gaugeBuilder("sdk.operation.latency.p99.seconds") + .setUnit("s") + .setDescription("p99 operation latency in seconds") + .buildObserver(); + + meter.batchCallback( + () -> observeAndResetPercentiles(histograms, ref, p50Observer, p95Observer, p99Observer), + p50Observer, p95Observer, p99Observer + ); + + Metrics metrics = new Metrics( + provider, + ref, + operationsTotal, + errorsTotal, + retryAttemptsTotal, + pendingOperations + ); + metrics.histograms.putAll(histograms); + return metrics; + } + + private static String metricsEndpoint(String otlpEndpoint) { + // OTLP HTTP exporter expects the full /v1/metrics path. The SLO action + // sets OTEL_EXPORTER_OTLP_ENDPOINT to the base URL (e.g. 
+ // http://ydb-prometheus:9090/api/v1/otlp), so we append the suffix + // unless the user has already provided it. + String trimmed = otlpEndpoint.endsWith("/") + ? otlpEndpoint.substring(0, otlpEndpoint.length() - 1) + : otlpEndpoint; + if (trimmed.endsWith("/v1/metrics")) { + return trimmed; + } + return trimmed + "/v1/metrics"; + } + + /** + * Records a started operation and returns a span used to record the + * outcome. + */ + public Span startOperation(OperationType type) { + pendingOperations.add(1, Attributes.of( + ATTR_REF, ref, + ATTR_OPERATION_TYPE, type.label() + )); + return new Span(this, type, System.nanoTime()); + } + + /** + * Forces a final flush of pending metrics. Should be called before exit + * to make sure the report action sees the last seconds of data. + */ + public void flush() { + meterProvider.forceFlush().join(10, TimeUnit.SECONDS); + } + + @Override + public void close() { + meterProvider.shutdown().join(10, TimeUnit.SECONDS); + } + + private void recordOutcome( + OperationType type, + OperationStatus status, + int attempts, + long latencyMicros, + String errorKind + ) { + Attributes opAttrs = Attributes.of( + ATTR_REF, ref, + ATTR_OPERATION_TYPE, type.label(), + ATTR_OPERATION_STATUS, status.label() + ); + + operationsTotal.add(1, opAttrs); + retryAttemptsTotal.add(Math.max(0L, attempts), opAttrs); + pendingOperations.add(-1, Attributes.of( + ATTR_REF, ref, + ATTR_OPERATION_TYPE, type.label() + )); + + // Latency is recorded only for successful operations. Failed + // operations spend most of their time inside the retry budget / + // timeout machinery, so their latency reflects the retry policy + // rather than the SDK's performance. Mixing those samples into the + // percentile gauges produces noisy spikes during chaos scenarios + // and tells us nothing actionable.
+ if (status == OperationStatus.SUCCESS) { + Histogram histogram = histograms.computeIfAbsent(type, k -> newHistogram()); + long clamped = Math.max(HDR_MIN_MICROS, Math.min(HDR_MAX_MICROS, latencyMicros)); + histogram.recordValue(clamped); + } else { + errorsTotal.add(1, Attributes.of( + ATTR_REF, ref, + ATTR_OPERATION_TYPE, type.label(), + ATTR_ERROR_KIND, errorKind == null ? "unknown" : errorKind + )); + } + } + + /** + * Observes p50/p95/p99 for every populated histogram in one go and then + * resets the histogram. Called from a single OTel batch callback so all + * three percentiles are read from a consistent snapshot; without that, + * a concurrent record could land between the p50 and p99 reads and + * produce inconsistent values across gauges. + */ + private static void observeAndResetPercentiles( + Map<OperationType, Histogram> histograms, + String ref, + ObservableDoubleMeasurement p50Out, + ObservableDoubleMeasurement p95Out, + ObservableDoubleMeasurement p99Out + ) { + for (Map.Entry<OperationType, Histogram> entry : histograms.entrySet()) { + OperationType type = entry.getKey(); + Histogram histogram = entry.getValue(); + + long p50Micros; + long p95Micros; + long p99Micros; + if (histogram.getTotalCount() == 0) { + continue; + } + p50Micros = histogram.getValueAtPercentile(50.0); + p95Micros = histogram.getValueAtPercentile(95.0); + p99Micros = histogram.getValueAtPercentile(99.0); + histogram.reset(); + + // Percentile gauges are always tagged with operation_status="success" + // because we only record successful samples (see recordOutcome). + // The SLO action's metrics.yaml filters on this same label, so the + // gauges line up with what the report expects. 
+ Attributes attrs = Attributes.of( + ATTR_REF, ref, + ATTR_OPERATION_TYPE, type.label(), + ATTR_OPERATION_STATUS, OperationStatus.SUCCESS.label() + ); + p50Out.record(p50Micros / 1_000_000.0, attrs); + p95Out.record(p95Micros / 1_000_000.0, attrs); + p99Out.record(p99Micros / 1_000_000.0, attrs); + } + } + + private static Histogram newHistogram() { + return new AtomicHistogram(HDR_MIN_MICROS, HDR_MAX_MICROS, HDR_SIGNIFICANT_DIGITS); + } + + /** + * One in-flight operation. Call exactly one of the {@code finish} methods. + */ + public static final class Span { + private final Metrics metrics; + private final OperationType type; + private final long startNanos; + private boolean finished; + + private Span(Metrics metrics, OperationType type, long startNanos) { + this.metrics = metrics; + this.type = type; + this.startNanos = startNanos; + } + + public void finishSuccess(int attempts) { + finish(OperationStatus.SUCCESS, attempts, null); + } + + public void finishError(int attempts, String errorKind) { + finish(OperationStatus.ERROR, attempts, errorKind); + } + + private void finish(OperationStatus status, int attempts, String errorKind) { + if (finished) { + return; + } + finished = true; + long latencyMicros = (System.nanoTime() - startNanos) / 1_000L; + metrics.recordOutcome(type, status, attempts, latencyMicros, errorKind); + } + } + +} diff --git a/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/KvWorkload.java b/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/KvWorkload.java new file mode 100644 index 0000000..dd74c32 --- /dev/null +++ b/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/KvWorkload.java @@ -0,0 +1,566 @@ +package tech.ydb.slo.kv; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLRecoverableException; +import java.sql.SQLTransientConnectionException; +import java.sql.SQLTransientException; +import java.sql.Statement; 
+import java.sql.Timestamp; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.util.concurrent.RateLimiter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.jdbc.exception.YdbStatusable; +import tech.ydb.slo.Metrics; + +/** + * Key-value workload for the SLO test, driving the YDB JDBC driver. + * + *

The workload creates a partitioned table, prefills it with rows, and then + * runs read and write loops at fixed RPS for the configured duration. Each + * operation is timed and retried with exponential backoff; the outcome is + * recorded into {@link Metrics} so the SLO action can compare current and + * baseline runs. + * + *

Schema and queries mirror the KV workloads in the Go and JavaScript SDKs + * so the produced metrics are directly comparable across SDKs. Unlike the + * query-service workload, the primary-key {@code hash} column is derived on + * the client (see {@link #numericHash(long)}) instead of via the server-side + * {@code Digest::NumericHash}, which keeps the parameterized JDBC statements + * free of type-inference ambiguity. + * + *

Concurrency model: each operation type (read / write) gets a dedicated + * thread pool with one worker per unit of configured RPS, capped at + * {@code MAX_WORKERS}. Every worker thread owns its own + * JDBC {@link Connection} (the YDB driver's connections are not thread-safe), + * pulls a permit from a shared Guava {@link RateLimiter}, and executes the + * operation inline. There is no separate driver thread and no work queue. + */ +public final class KvWorkload { + private static final Logger logger = LoggerFactory.getLogger(KvWorkload.class); + + private static final String CREATE_TABLE_QUERY_TEMPLATE = "" + + "CREATE TABLE IF NOT EXISTS `%s` (" + + " hash Uint64," + + " id Uint64," + + " payload_str Utf8," + + " payload_double Double," + + " payload_timestamp Timestamp," + + " payload_hash Uint64," + + " PRIMARY KEY (hash, id)" + + ") WITH (" + + " UNIFORM_PARTITIONS = %d," + + " AUTO_PARTITIONING_BY_SIZE = ENABLED," + + " AUTO_PARTITIONING_PARTITION_SIZE_MB = %d," + + " AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d," + + " AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %d" + + ")"; + + private static final String DROP_TABLE_QUERY_TEMPLATE = "DROP TABLE `%s`"; + + private static final String WRITE_QUERY_TEMPLATE = "" + + "UPSERT INTO `%s` (" + + " hash, id, payload_str, payload_double, payload_timestamp, payload_hash" + + ") VALUES (?, ?, ?, ?, ?, ?)"; + + private static final String READ_QUERY_TEMPLATE = "" + + "SELECT id, payload_str, payload_double, payload_timestamp, payload_hash" + + " FROM `%s`" + + " WHERE id = ? AND hash = ?"; + + /* + * Hard cap on the number of worker threads spawned for a single operation + * type. The SLO targets a few hundred RPS in CI; allowing more workers + * than this just wastes threads on JIT-warmup contention without + * improving throughput. + */ + private static final int MAX_WORKERS = 64; + + /* + * Maximum number of attempts (initial + retries) per operation before it + * is recorded as a failure. 
Mirrors the order of magnitude of the + * query-service SessionRetryContext default. + */ + private static final int MAX_ATTEMPTS = 10; + + private static final long INITIAL_BACKOFF_MS = 10L; + private static final long MAX_BACKOFF_MS = 1_000L; + + /* + * Extra time, on top of the workload duration, given to worker pools to + * complete their last in-flight operations before {@link #run()} forces + * shutdown. + */ + private static final long SHUTDOWN_GRACE_SECONDS = 30L; + + private final String jdbcUrl; + private final Properties connectionProperties; + private final Metrics metrics; + private final KvWorkloadParams params; + private final String tablePath; + private final RowGenerator generator; + + public KvWorkload( + String jdbcUrl, + Properties connectionProperties, + Metrics metrics, + KvWorkloadParams params, + String tablePath + ) { + this.jdbcUrl = jdbcUrl; + this.connectionProperties = connectionProperties; + this.metrics = metrics; + this.params = params; + this.tablePath = tablePath; + this.generator = new RowGenerator(params.prefillCount()); + } + + /* + * Creates the table (if missing) and prefills it with + * {@code params.prefillCount()} rows using a bounded pool of worker + * connections. 
+ */ + public void setup() throws InterruptedException, SQLException { + logger.info("creating table {}", tablePath); + try (Connection conn = openConnection(); + Statement stmt = conn.createStatement()) { + stmt.execute(String.format( + CREATE_TABLE_QUERY_TEMPLATE, + tablePath, + params.minPartitionCount(), + params.partitionSizeMb(), + params.minPartitionCount(), + params.maxPartitionCount() + )); + } + logger.info("table {} created", tablePath); + + if (params.prefillCount() <= 0) { + logger.info("prefill count <= 0, skipping prefill"); + return; + } + + logger.info("prefilling {} rows into {}", params.prefillCount(), tablePath); + int parallelism = Math.min(MAX_WORKERS, Math.max(1, params.prefillCount())); + ExecutorService prefillPool = Executors.newFixedThreadPool( + parallelism, namedThreadFactory("slo-prefill-") + ); + AtomicLong nextId = new AtomicLong(0); + AtomicInteger failed = new AtomicInteger(); + try { + for (int w = 0; w < parallelism; w++) { + prefillPool.execute(() -> { + try (WorkerConnection wc = new WorkerConnection()) { + long id; + while ((id = nextId.getAndIncrement()) < params.prefillCount()) { + SQLException err = writeWithRetry(wc, RowGenerator.generate(id), + params.writeTimeoutMs(), null); + if (err != null) { + int f = failed.incrementAndGet(); + if (f <= 5) { + logger.warn("prefill row {} failed: {}", id, err.toString()); + } + } + } + } + }); + } + } finally { + prefillPool.shutdown(); + if (!prefillPool.awaitTermination(5, TimeUnit.MINUTES)) { + prefillPool.shutdownNow(); + } + } + if (failed.get() > 0) { + logger.warn("prefill completed with {} failed rows out of {}", + failed.get(), params.prefillCount()); + } else { + logger.info("prefill completed"); + } + } + + /* + * Runs the workload until the configured deadline or thread interruption. + */ + public void run() throws InterruptedException { + long durationSeconds = params.durationSeconds(); + long endNanos = durationSeconds > 0 + ? 
System.nanoTime() + TimeUnit.SECONDS.toNanos(durationSeconds) + : Long.MAX_VALUE; + + // Track how many writes have completed so reads target a key-space + // that's actually been populated. The generator was constructed with + // nextId = prefillCount, so writes pick up where prefill left off. + AtomicLong writesIssued = new AtomicLong(); + + int readWorkers = workerCount(params.readRps()); + int writeWorkers = workerCount(params.writeRps()); + + if (readWorkers == 0 && writeWorkers == 0) { + logger.warn("both read and write RPS are <= 0, run phase has nothing to do"); + return; + } + + ExecutorService readPool = null; + ExecutorService writePool = null; + try { + if (readWorkers > 0) { + readPool = Executors.newFixedThreadPool(readWorkers, namedThreadFactory("slo-read-")); + RateLimiter readLimiter = RateLimiter.create(params.readRps()); + for (int i = 0; i < readWorkers; i++) { + readPool.execute(() -> readWorkerLoop(endNanos, readLimiter, writesIssued)); + } + } else { + logger.info("read RPS <= 0, skipping read workers"); + } + + if (writeWorkers > 0) { + writePool = Executors.newFixedThreadPool(writeWorkers, namedThreadFactory("slo-write-")); + RateLimiter writeLimiter = RateLimiter.create(params.writeRps()); + for (int i = 0; i < writeWorkers; i++) { + writePool.execute(() -> writeWorkerLoop(endNanos, writeLimiter, writesIssued)); + } + } else { + logger.info("write RPS <= 0, skipping write workers"); + } + + // Wait for workers to drain naturally as they hit the deadline. + long graceNanos = TimeUnit.SECONDS.toNanos(SHUTDOWN_GRACE_SECONDS); + long waitNanos = durationSeconds > 0 + ? 
Math.max(0L, endNanos - System.nanoTime()) + graceNanos + : Long.MAX_VALUE; + + if (readPool != null) { + readPool.shutdown(); + } + if (writePool != null) { + writePool.shutdown(); + } + + if (readPool != null) { + long started = System.nanoTime(); + if (!readPool.awaitTermination(waitNanos, TimeUnit.NANOSECONDS)) { + logger.warn("read pool did not drain within deadline, forcing shutdown"); + readPool.shutdownNow(); + } + waitNanos = Math.max(0L, waitNanos - (System.nanoTime() - started)); + } + if (writePool != null) { + if (!writePool.awaitTermination(waitNanos, TimeUnit.NANOSECONDS)) { + logger.warn("write pool did not drain within deadline, forcing shutdown"); + writePool.shutdownNow(); + } + } + } finally { + forceShutdown(readPool, "read pool"); + forceShutdown(writePool, "write pool"); + } + } + + /* + * Drops the workload table. Called from the {@code finally} block in + * {@code Main} so the database is left clean even on failure. + */ + public void teardown() { + logger.info("dropping table {}", tablePath); + try (Connection conn = openConnection(); + Statement stmt = conn.createStatement()) { + stmt.execute(String.format(DROP_TABLE_QUERY_TEMPLATE, tablePath)); + logger.info("table {} dropped", tablePath); + } catch (SQLException e) { + logger.warn("failed to drop table {}: {}", tablePath, e.toString()); + } + } + + // --- worker loops ------------------------------------------------------ + + private void readWorkerLoop(long endNanos, RateLimiter limiter, AtomicLong writesIssued) { + try (WorkerConnection wc = new WorkerConnection()) { + while (System.nanoTime() < endNanos && !Thread.currentThread().isInterrupted()) { + limiter.acquire(); + try { + readOnce(wc, writesIssued.get()); + } catch (Throwable t) { + logger.warn("read op threw unexpectedly: {}", t.toString()); + } + } + } + } + + private void writeWorkerLoop(long endNanos, RateLimiter limiter, AtomicLong writesIssued) { + try (WorkerConnection wc = new WorkerConnection()) { + while 
(System.nanoTime() < endNanos && !Thread.currentThread().isInterrupted()) { + limiter.acquire(); + try { + writeOnce(wc, generator.generate()); + writesIssued.incrementAndGet(); + } catch (Throwable t) { + logger.warn("write op threw unexpectedly: {}", t.toString()); + } + } + } + } + + // --- single operations ------------------------------------------------- + + private void readOnce(WorkerConnection wc, long writesObserved) { + long upperBound = Math.max(1L, params.prefillCount() + writesObserved); + long id = ThreadLocalRandom.current().nextLong(upperBound); + long hash = numericHash(id); + + Metrics.Span span = metrics.startOperation(Metrics.OperationType.READ); + int attempts = 0; + SQLException last = null; + while (attempts < MAX_ATTEMPTS) { + attempts++; + try { + wc.read(id, hash, timeoutSeconds(params.readTimeoutMs())); + span.finishSuccess(attempts - 1); + return; + } catch (SQLException e) { + last = e; + // Drop a broken connection even when this was the last attempt, + // so the next operation starts on a fresh one. + wc.invalidateOnConnectionError(e); + if (!isRetryable(e) || attempts >= MAX_ATTEMPTS) { + break; + } + backoff(attempts); + } + } + span.finishError(attempts - 1, classifyError(last)); + logger.debug("read {} failed: {}", id, last == null ? "?" : last.toString()); + } + + private void writeOnce(WorkerConnection wc, Row row) { + Metrics.Span span = metrics.startOperation(Metrics.OperationType.WRITE); + int[] attemptsOut = new int[1]; + SQLException err = writeWithRetry(wc, row, params.writeTimeoutMs(), attemptsOut); + if (err == null) { + span.finishSuccess(attemptsOut[0] - 1); + } else { + span.finishError(Math.max(0, attemptsOut[0] - 1), classifyError(err)); + logger.debug("write {} failed: {}", row.id(), err.toString()); + } + } + + /* + * Writes a single row with retry. When {@code attemptsOut} is non-null, the + * total number of attempts is written to its first element. Returns + * {@code null} on success or the last {@link SQLException} on failure. + * Used both by the run phase (with metrics handled by the caller) and + * prefill (silent). 
+ */ + private SQLException writeWithRetry(WorkerConnection wc, Row row, int timeoutMs, int[] attemptsOut) { + long hash = numericHash(row.id()); + int attempts = 0; + SQLException last = null; + while (attempts < MAX_ATTEMPTS) { + attempts++; + try { + wc.write(row, hash, timeoutSeconds(timeoutMs)); + if (attemptsOut != null) { + attemptsOut[0] = attempts; + } + return null; + } catch (SQLException e) { + last = e; + // Drop a broken connection even when this was the last attempt, + // so the next operation starts on a fresh one. + wc.invalidateOnConnectionError(e); + if (!isRetryable(e) || attempts >= MAX_ATTEMPTS) { + break; + } + backoff(attempts); + } + } + if (attemptsOut != null) { + attemptsOut[0] = attempts; + } + return last; + } + + // --- helpers ----------------------------------------------------------- + + private Connection openConnection() throws SQLException { + return DriverManager.getConnection(jdbcUrl, connectionProperties); + } + + private static int timeoutSeconds(int timeoutMs) { + return Math.max(1, (timeoutMs + 999) / 1000); + } + + private static boolean isRetryable(SQLException e) { + return e instanceof SQLRecoverableException || e instanceof SQLTransientException; + } + + private static boolean isConnectionError(SQLException e) { + return e instanceof SQLRecoverableException || e instanceof SQLTransientConnectionException; + } + + private static String classifyError(SQLException e) { + if (e == null) { + return "unknown"; + } + if (e instanceof YdbStatusable) { + try { + return "ydb/" + ((YdbStatusable) e).getStatus().getCode().name().toLowerCase(); + } catch (RuntimeException ignored) { + // fall through to the generic classification + } + } + return e.getClass().getSimpleName().toLowerCase(); + } + + private static void backoff(int attempt) { + long delay = Math.min(MAX_BACKOFF_MS, INITIAL_BACKOFF_MS * (1L << Math.min(attempt - 1, 20))); + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * Derives the primary-key {@code hash} column from {@code id} using a + * SplitMix64-style 
mix. Reads and writes both call this so they always + * target the same key. The exact function does not need to match YQL's + * {@code Digest::NumericHash}; it only needs to be deterministic and + * well distributed across partitions. + * @param id row id + * @return derived hash value + */ + private static long numericHash(long id) { + long z = id + 0x9E3779B97F4A7C15L; + z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L; + z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL; + return z ^ (z >>> 31); + } + + private static int workerCount(int rps) { + if (rps <= 0) { + return 0; + } + return Math.min(MAX_WORKERS, Math.max(1, rps)); + } + + private static ThreadFactory namedThreadFactory(String prefix) { + AtomicInteger counter = new AtomicInteger(); + return r -> { + Thread t = new Thread(r, prefix + counter.getAndIncrement()); + t.setDaemon(true); + return t; + }; + } + + private static void forceShutdown(ExecutorService pool, String name) { + if (pool == null || pool.isTerminated()) { + return; + } + logger.warn("{} still active in cleanup, forcing shutdown", name); + pool.shutdownNow(); + try { + if (!pool.awaitTermination(5, TimeUnit.SECONDS)) { + logger.warn("{} did not terminate after shutdownNow", name); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * A JDBC connection owned by a single worker thread, together with lazily + * prepared read/write statements. On a connection-level error the holder + * is invalidated so the next operation transparently reconnects. 
+ */ + private final class WorkerConnection implements AutoCloseable { + private Connection connection; + private PreparedStatement readStmt; + private PreparedStatement writeStmt; + + private Connection connection() throws SQLException { + if (connection == null || connection.isClosed()) { + connection = openConnection(); + readStmt = null; + writeStmt = null; + } + return connection; + } + + private PreparedStatement readStmt() throws SQLException { + Connection conn = connection(); + if (readStmt == null) { + readStmt = conn.prepareStatement(String.format(READ_QUERY_TEMPLATE, tablePath)); + } + return readStmt; + } + + private PreparedStatement writeStmt() throws SQLException { + Connection conn = connection(); + if (writeStmt == null) { + writeStmt = conn.prepareStatement(String.format(WRITE_QUERY_TEMPLATE, tablePath)); + } + return writeStmt; + } + + void read(long id, long hash, int timeoutSeconds) throws SQLException { + PreparedStatement stmt = readStmt(); + stmt.setQueryTimeout(timeoutSeconds); + stmt.setLong(1, id); + stmt.setLong(2, hash); + try (ResultSet rs = stmt.executeQuery()) { + // Touch the result set so we exercise the deserialization path. 
+ while (rs.next()) { + rs.getLong("id"); + } + } + } + + void write(Row row, long hash, int timeoutSeconds) throws SQLException { + PreparedStatement stmt = writeStmt(); + stmt.setQueryTimeout(timeoutSeconds); + stmt.setLong(1, hash); + stmt.setLong(2, row.id()); + stmt.setString(3, row.payloadStr()); + stmt.setDouble(4, row.payloadDouble()); + stmt.setTimestamp(5, Timestamp.from(row.payloadTimestamp())); + stmt.setLong(6, row.payloadHash()); + stmt.executeUpdate(); + } + + void invalidateOnConnectionError(SQLException e) { + if (isConnectionError(e)) { + close(); + } + } + + @Override + public void close() { + closeQuietly(readStmt); + closeQuietly(writeStmt); + closeQuietly(connection); + readStmt = null; + writeStmt = null; + connection = null; + } + + private void closeQuietly(AutoCloseable closeable) { + if (closeable == null) { + return; + } + try { + closeable.close(); + } catch (Exception ignored) { + // best-effort cleanup + } + } + } +} diff --git a/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java b/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java new file mode 100644 index 0000000..41f2156 --- /dev/null +++ b/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java @@ -0,0 +1,115 @@ +package tech.ydb.slo.kv; + +import com.beust.jcommander.Parameter; + +/** + * Tunable parameters for the KV workload. + * + *

Defaults match the SLO workloads in the Go and JavaScript SDKs so the + * runs are comparable. JCommander annotations let the operator override + * any field from the command line, e.g. + * {@code --read-rps 500 --write-rps 50}. + */ +@SuppressWarnings("FieldMayBeFinal") +public final class KvWorkloadParams { + + @Parameter( + names = {"--read-rps"}, + description = "Target read operations per second" + ) + private int readRps = 1000; + + @Parameter( + names = {"--write-rps"}, + description = "Target write operations per second" + ) + private int writeRps = 100; + + @Parameter( + names = {"--read-timeout-ms"}, + description = "Per-attempt read timeout in milliseconds" + ) + private int readTimeoutMs = 10_000; + + @Parameter( + names = {"--write-timeout-ms"}, + description = "Per-attempt write timeout in milliseconds" + ) + private int writeTimeoutMs = 10_000; + + @Parameter( + names = {"--prefill-count"}, + description = "Number of rows to prefill before the run phase" + ) + private int prefillCount = 1_000; + + @Parameter( + names = {"--partition-size"}, + description = "Auto-partitioning partition size in MB" + ) + private int partitionSizeMb = 1; + + @Parameter( + names = {"--min-partition-count"}, + description = "Minimum number of table partitions" + ) + private int minPartitionCount = 6; + + @Parameter( + names = {"--max-partition-count"}, + description = "Maximum number of table partitions" + ) + private int maxPartitionCount = 1_000; + + @Parameter( + names = {"--duration"}, + description = "Run duration in seconds (overrides WORKLOAD_DURATION when > 0)" + ) + private int durationSeconds = 0; + + public int readRps() { + return readRps; + } + + public int writeRps() { + return writeRps; + } + + public int readTimeoutMs() { + return readTimeoutMs; + } + + public int writeTimeoutMs() { + return writeTimeoutMs; + } + + public int prefillCount() { + return prefillCount; + } + + public int partitionSizeMb() { + return partitionSizeMb; + } + + public int 
minPartitionCount() { + return minPartitionCount; + } + + public int maxPartitionCount() { + return maxPartitionCount; + } + + /** + * Effective run duration in seconds. The getter returns the stored value + * as is: when the CLI flag is omitted (left at 0), {@code Config} is + * expected to supply the {@code WORKLOAD_DURATION} environment variable + * through {@link #setDurationSeconds(int)}. + * @return the effective run duration in seconds + */ + public int durationSeconds() { + return durationSeconds; + } + + public void setDurationSeconds(int durationSeconds) { + this.durationSeconds = durationSeconds; + } +} diff --git a/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/Row.java b/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/Row.java new file mode 100644 index 0000000..a53ba89 --- /dev/null +++ b/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/Row.java @@ -0,0 +1,64 @@ +package tech.ydb.slo.kv; + +import java.time.Instant; + +/** + * A single row of the KV workload table. + * + *

The schema mirrors the one used by SLO workloads in other YDB SDKs + * (Go, JavaScript): + *

+ * hash              Uint64 (primary key, derived from id)
+ * id                Uint64 (primary key)
+ * payload_str       Utf8
+ * payload_double    Double
+ * payload_timestamp Timestamp
+ * payload_hash      Uint64
+ * 
+ * + *

The {@code hash} primary-key column is derived from {@code id}; the JDBC + * workload computes it on the client (see {@code KvWorkload}) so reads and + * writes target the same key without relying on server-side YQL builtins in + * parameterized statements. + */ +public final class Row { + private final long id; + private final String payloadStr; + private final double payloadDouble; + private final Instant payloadTimestamp; + private final long payloadHash; + + public Row( + long id, + String payloadStr, + double payloadDouble, + Instant payloadTimestamp, + long payloadHash + ) { + this.id = id; + this.payloadStr = payloadStr; + this.payloadDouble = payloadDouble; + this.payloadTimestamp = payloadTimestamp; + this.payloadHash = payloadHash; + } + + public long id() { + return id; + } + + public String payloadStr() { + return payloadStr; + } + + public double payloadDouble() { + return payloadDouble; + } + + public Instant payloadTimestamp() { + return payloadTimestamp; + } + + public long payloadHash() { + return payloadHash; + } +} diff --git a/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/RowGenerator.java b/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/RowGenerator.java new file mode 100644 index 0000000..04fdb9d --- /dev/null +++ b/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/RowGenerator.java @@ -0,0 +1,55 @@ +package tech.ydb.slo.kv; + +import java.time.Instant; +import java.util.Base64; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Generates rows for the KV workload. + * + *

Each row gets a monotonically increasing {@code id} and a random payload. + * The format mirrors the SLO workloads in the Go and JS SDKs so the resulting + * tables are comparable. + */ +public final class RowGenerator { + private static final int MIN_PAYLOAD_LENGTH = 20; + private static final int MAX_PAYLOAD_LENGTH = 40; + + private final AtomicLong nextId; + + public RowGenerator(long startId) { + this.nextId = new AtomicLong(startId); + } + + /** + * Generates a new row with a fresh monotonically increasing id. + * @return a new row + */ + public Row generate() { + long id = nextId.getAndIncrement(); + return generate(id); + } + + /** + * Generates a row with an explicit id (used during prefill to control IDs). + * @param id row id + * @return a new row + */ + public static Row generate(long id) { + long payloadHash = ThreadLocalRandom.current().nextLong(); + double payloadDouble = ThreadLocalRandom.current().nextDouble(); + String payloadStr = randomPayloadString(); + Instant payloadTimestamp = Instant.now(); + + return new Row(id, payloadStr, payloadDouble, payloadTimestamp, payloadHash); + } + + private static String randomPayloadString() { + int length = MIN_PAYLOAD_LENGTH + + ThreadLocalRandom.current().nextInt(MAX_PAYLOAD_LENGTH - MIN_PAYLOAD_LENGTH + 1); + byte[] bytes = new byte[length]; + ThreadLocalRandom.current().nextBytes(bytes); + return Base64.getEncoder().withoutPadding().encodeToString(bytes); + } +} diff --git a/slo-workload/jdbc/src/main/resources/log4j2.xml b/slo-workload/jdbc/src/main/resources/log4j2.xml new file mode 100644 index 0000000..0ebaed0 --- /dev/null +++ b/slo-workload/jdbc/src/main/resources/log4j2.xml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/slo-workload/pom.xml b/slo-workload/pom.xml new file mode 100644 index 0000000..f1535c0 --- /dev/null +++ b/slo-workload/pom.xml @@ -0,0 +1,77 @@ + + 4.0.0 + + tech.ydb.examples + ydb-sdk-examples + 1.1.0-SNAPSHOT + ../pom.xml + + + 
slo-workload + pom + + YDB SLO Workload Tests + SLO workload applications for validating YDB Java clients under load and chaos + + + 17 + + 2.3.20 + 1.59.0 + 2.2.2 + 33.4.0-jre + + + + jdbc + + + + + + + io.opentelemetry + opentelemetry-bom + ${opentelemetry.version} + pom + import + + + + + tech.ydb.jdbc + ydb-jdbc-driver + ${ydb.jdbc.version} + + + + org.hdrhistogram + HdrHistogram + ${hdrhistogram.version} + + + + com.google.guava + guava + ${guava.version} + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${maven.compiler.release} + + + + + +
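
The retry loops in `KvWorkload` sleep through `backoff(attempt)` between attempts, and the size of the resulting retry budget is easier to see as a worked schedule. The sketch below copies the constants and the delay formula from the diff (`MAX_ATTEMPTS`, `INITIAL_BACKOFF_MS`, `MAX_BACKOFF_MS`). The class name `BackoffSchedule` and the helper `worstCaseSleepMs` are hypothetical and not part of the PR. It assumes every attempt fails with a retryable error, and it counts only the sleeps, not the per-attempt query timeouts.

```java
// Sketch only: mirrors the constants and formula of KvWorkload.backoff(int).
// BackoffSchedule and worstCaseSleepMs are hypothetical names for illustration.
final class BackoffSchedule {
    static final int MAX_ATTEMPTS = 10;
    static final long INITIAL_BACKOFF_MS = 10L;
    static final long MAX_BACKOFF_MS = 1_000L;

    /** Delay slept after the given failed attempt (1-based), same formula as the diff. */
    static long delayMs(int attempt) {
        return Math.min(MAX_BACKOFF_MS, INITIAL_BACKOFF_MS * (1L << Math.min(attempt - 1, 20)));
    }

    /** Total sleep when every attempt fails; no sleep follows the final attempt. */
    static long worstCaseSleepMs() {
        long total = 0;
        for (int attempt = 1; attempt < MAX_ATTEMPTS; attempt++) {
            total += delayMs(attempt);
        }
        return total;
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt < MAX_ATTEMPTS; attempt++) {
            System.out.println("after attempt " + attempt + ": " + delayMs(attempt) + " ms");
        }
        System.out.println("worst-case total sleep: " + worstCaseSleepMs() + " ms");
    }
}
```

With these constants the delays are 10, 20, 40, 80, 160, 320 and 640 ms, then 1000 ms twice once the cap applies, for 3270 ms of sleep across a fully failed operation. Time spent inside the attempts themselves, bounded by the configured read and write timeouts, comes on top of that.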