diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 0618526617..fe31086f89 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -2,275 +2,192 @@ name: SLO on: pull_request: - types: [opened, reopened, synchronize] - branches: - - main - workflow_dispatch: - inputs: - github_issue: - description: "GitHub issue number where the SLO results will be reported" - required: true - baseline_ref: - description: "Baseline commit/branch/tag to compare against (leave empty to auto-detect merge-base with main)" - required: false - slo_workload_duration_seconds: - description: "Duration of the SLO workload in seconds" - required: false - default: "600" - slo_workload_read_max_rps: - description: "Maximum read RPS for the SLO workload" - required: false - default: "1000" - slo_workload_write_max_rps: - description: "Maximum write RPS for the SLO workload" - required: false - default: "100" + types: [opened, reopened, synchronize, labeled] jobs: ydb-slo-action: + if: contains(github.event.pull_request.labels.*.name, 'SLO') + name: Run YDB SLO Tests runs-on: ubuntu-latest + permissions: + contents: read + strategy: + fail-fast: false matrix: - compiler: [clang, gcc] - include: - - workload: table + sdk: + - name: cpp-key-value + preset: release-test-clang + command: "" concurrency: - group: slo-${{ github.ref }}-${{ matrix.os }}-${{ matrix.workload }}-${{ matrix.compiler }} + group: slo-${{ github.ref }}-${{ matrix.sdk.name }} cancel-in-progress: true steps: - name: Install dependencies run: | + set -euxo pipefail YQ_VERSION=v4.48.2 BUILDX_VERSION=0.30.1 COMPOSE_VERSION=2.40.3 - sudo curl -L https://github.com/mikefarah/yq/releases/download/${YQ_VERSION}/yq_linux_amd64 -o /usr/local/bin/yq && \ - sudo chmod +x /usr/local/bin/yq + sudo curl -fLo /usr/local/bin/yq \ + "https://github.com/mikefarah/yq/releases/download/${YQ_VERSION}/yq_linux_amd64" + sudo chmod +x /usr/local/bin/yq - echo "Updating Docker plugins..." sudo mkdir -p /usr/local/lib/docker/cli-plugins - echo "Installing Docker Buildx ${BUILDX_VERSION}..." sudo curl -fLo /usr/local/lib/docker/cli-plugins/docker-buildx \ "https://github.com/docker/buildx/releases/download/v${BUILDX_VERSION}/buildx-v${BUILDX_VERSION}.linux-amd64" sudo chmod +x /usr/local/lib/docker/cli-plugins/docker-buildx - echo "Installing Docker Compose ${COMPOSE_VERSION}..." sudo curl -fLo /usr/local/lib/docker/cli-plugins/docker-compose \ "https://github.com/docker/compose/releases/download/v${COMPOSE_VERSION}/docker-compose-linux-x86_64" sudo chmod +x /usr/local/lib/docker/cli-plugins/docker-compose - echo "Installed versions:" yq --version docker --version docker buildx version docker compose version - - name: Checkout current version + - name: Checkout current SDK version uses: actions/checkout@v5 with: - path: current + path: sdk-current fetch-depth: 0 submodules: true - name: Determine baseline commit id: baseline + working-directory: sdk-current run: | - cd current - if [[ -n "${{ inputs.baseline_ref }}" ]]; then - BASELINE="${{ inputs.baseline_ref }}" - else - BASELINE=$(git merge-base HEAD origin/main) - fi - echo "sha=$BASELINE" >> $GITHUB_OUTPUT + set -euo pipefail + BASELINE=$(git merge-base HEAD origin/main) + echo "sha=${BASELINE}" >> "$GITHUB_OUTPUT" - # Try to determine a human-readable ref name for baseline - # Check if baseline is on main - if git merge-base --is-ancestor $BASELINE origin/main && \ - [ "$(git rev-parse origin/main)" = "$BASELINE" ]; then + if git merge-base --is-ancestor "${BASELINE}" origin/main && \ + [ "$(git rev-parse origin/main)" = "${BASELINE}" ]; then BASELINE_REF="main" else - # Try to find a branch containing this commit - BRANCH=$(git branch -r --contains $BASELINE | grep -v HEAD | head -1 | sed 's/.*\///' || echo "") - if [ -n "$BRANCH" ]; then + BRANCH=$(git branch -r --contains "${BASELINE}" | grep -v HEAD | head -1 | sed 's|.*/||' || echo "") + if [ -n "${BRANCH}" ]; then BASELINE_REF="${BRANCH}@${BASELINE:0:7}" else BASELINE_REF="${BASELINE:0:7}" fi fi - echo "ref=$BASELINE_REF" >> $GITHUB_OUTPUT + echo "ref=${BASELINE_REF}" >> "$GITHUB_OUTPUT" - - name: Checkout baseline version + - name: Checkout baseline SDK version uses: actions/checkout@v5 with: ref: ${{ steps.baseline.outputs.sha }} - path: baseline + path: sdk-baseline fetch-depth: 1 submodules: true - - name: Build Workload Image - run: | - echo "Cleaning up Docker system before builds..." - docker system prune -af --volumes - docker builder prune -af - df -h - - # Build current version - if [ -f "$GITHUB_WORKSPACE/current/tests/slo_workloads/Dockerfile" ]; then - echo "Building current app image..." - cd "$GITHUB_WORKSPACE/current" - - # Use SLO-specific .dockerignore - cp tests/slo_workloads/.dockerignore .dockerignore - - docker build -t ydb-app-current \ - --build-arg REF="${{ github.head_ref || github.ref_name }}" \ - --build-arg PRESET=release-test-${{ matrix.compiler }} \ - -f tests/slo_workloads/Dockerfile . - - # Clean up .dockerignore - rm -f .dockerignore - else - echo "No current app Dockerfile found" - exit 1 - fi - - docker system prune -f --volumes - docker builder prune -af - - # Build baseline version - if [ -f "$GITHUB_WORKSPACE/baseline/tests/slo_workloads/Dockerfile" ]; then - echo "Building baseline app image..." - cd "$GITHUB_WORKSPACE/baseline" - - # Use SLO-specific .dockerignore - cp tests/slo_workloads/.dockerignore .dockerignore + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 - docker build -t ydb-app-baseline \ - --build-arg REF="${{ steps.baseline.outputs.ref }}" \ - --build-arg PRESET=release-test-${{ matrix.compiler }} \ - -f tests/slo_workloads/Dockerfile . - - # Clean up .dockerignore - rm -f .dockerignore - else - echo "No baseline app Dockerfile found" - exit 1 - fi - - docker system prune -f --volumes - docker builder prune -af + # Use current's workload harness (Dockerfile, sources, .dockerignore) for + # both builds so only the SDK library differs between current and + # baseline. Without this the baseline image picks up the harness from + # the merge-base commit, which can lag behind the action's contract. + # buildx also expects .dockerignore at the context root, not under + # tests/, so copy it up in each checkout. + - name: Stage workload harness + run: | + set -euxo pipefail + rm -rf sdk-baseline/tests/slo_workloads + cp -a sdk-current/tests/slo_workloads sdk-baseline/tests/slo_workloads + cp sdk-current/tests/slo_workloads/.dockerignore sdk-current/.dockerignore + cp sdk-baseline/tests/slo_workloads/.dockerignore sdk-baseline/.dockerignore + + # `cache-to: type=gha` does NOT export `--mount=type=cache` content, so + # ccache state is lost between runs. Persist /root/.ccache via host + # directory + cache-dance: actions/cache restores the host dir, the + # dance injects it into the BuildKit cache mount before the build and + # extracts the updated state afterwards for the next save. + - name: Restore ccache + id: ccache + uses: actions/cache@v4 + with: + path: ccache + key: slo-ccache-${{ matrix.sdk.preset }}-${{ github.run_id }} + restore-keys: | + slo-ccache-${{ matrix.sdk.preset }}- - echo "Final disk space after builds:" - df -h + - name: Inject ccache into BuildKit + uses: reproducible-containers/buildkit-cache-dance@v3.1.2 + with: + cache-map: | + { + "ccache": "/root/.ccache" + } + # Always extract so newly-compiled TUs from this run are saved by + # actions/cache (key uses ${{ github.run_id }}, so each run gets + # its own snapshot). Without extraction the cache stays frozen at + # whatever was first persisted. + skip-extraction: false + + # A clean build of the SLO image takes ~30 min because the Dockerfile + # rebuilds the full C++ toolchain + abseil/protobuf/grpc from source. + # The GHA cache lets subsequent runs reuse every layer up to the SDK + # source COPY, so only the actual workload link step reruns (~3 min). + - name: Build current workload image + uses: docker/build-push-action@v6 + env: + DOCKER_BUILD_SUMMARY: "false" + DOCKER_BUILD_RECORD_UPLOAD: "false" + with: + context: sdk-current + file: sdk-current/tests/slo_workloads/Dockerfile + platforms: linux/amd64 + tags: ydb-app-current + load: true + build-args: PRESET=${{ matrix.sdk.preset }} + cache-from: type=gha,scope=slo-${{ matrix.sdk.preset }} + cache-to: type=gha,mode=max,scope=slo-${{ matrix.sdk.preset }} + + - name: Build baseline workload image + id: baseline-build + continue-on-error: true + uses: docker/build-push-action@v6 + env: + DOCKER_BUILD_SUMMARY: "false" + DOCKER_BUILD_RECORD_UPLOAD: "false" + with: + context: sdk-baseline + file: sdk-baseline/tests/slo_workloads/Dockerfile + platforms: linux/amd64 + tags: ydb-app-baseline + load: true + build-args: PRESET=${{ matrix.sdk.preset }} + cache-from: type=gha,scope=slo-${{ matrix.sdk.preset }} + + # If the historical commit lacks the SLO Dockerfile or can't compile, + # reuse the current image so the SLO run is still comparable against + # itself rather than failing outright. + - name: Fall back to current image for baseline + if: steps.baseline-build.outcome == 'failure' + run: | + echo "Baseline build failed; reusing current image as baseline." + docker tag ydb-app-current ydb-app-baseline - - name: Initialize YDB SLO - uses: ydb-platform/ydb-slo-action/init@main + - name: Run SLO Tests + uses: ydb-platform/ydb-slo-action/init@v2 + timeout-minutes: 30 with: - github_issue: ${{ github.event.inputs.github_issue }} + github_issue: ${{ github.event.pull_request.number }} github_token: ${{ secrets.GITHUB_TOKEN }} - workload_name: ${{ matrix.workload }}-${{ matrix.compiler }} + workload_name: ${{ matrix.sdk.name }} + workload_duration: "600" workload_current_ref: ${{ github.head_ref || github.ref_name }} + workload_current_image: ydb-app-current + workload_current_command: ${{ matrix.sdk.command }} --read-rps 1000 --write-rps 100 workload_baseline_ref: ${{ steps.baseline.outputs.ref }} - - - name: Prepare SLO Database - run: | - echo "Preparing SLO database..." - docker run --rm --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - ydb-app-current --connection-string grpc://ydb:2136/?database=/Root/testdb create --dont-push - - - name: Run SLO Tests (parallel) - timeout-minutes: 15 - run: | - DURATION=${{ inputs.slo_workload_duration_seconds || 600 }} - READ_RPS=${{ inputs.slo_workload_read_max_rps || 1000 }} - WRITE_RPS=${{ inputs.slo_workload_write_max_rps || 100 }} - - ARGS="--connection-string grpc://ydb:2136/?database=/Root/testdb run \ - --metrics-push-url http://prometheus:9090/api/v1/otlp/v1/metrics \ - --time $DURATION \ - --read-rps $READ_RPS \ - --write-rps $WRITE_RPS \ - --read-timeout 100 \ - --write-timeout 100" - - echo "Starting ydb-app-current..." - docker run -d \ - --name ydb-app-current \ - --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - ydb-app-current $ARGS - - echo "Starting ydb-app-baseline..." - docker run -d \ - --name ydb-app-baseline \ - --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - ydb-app-baseline $ARGS - - # Show initial logs - echo "" - echo "==================== INITIAL CURRENT LOGS ====================" - docker logs -n 15 ydb-app-current 2>&1 || echo "No current container" - echo "" - echo "==================== INITIAL BASELINE LOGS ====================" - docker logs -n 15 ydb-app-baseline 2>&1 || echo "No baseline container" - echo "" - - # Wait for workloads to complete - echo "Waiting for workloads to complete (${DURATION}s)..." - sleep ${DURATION} - - # Stop containers after workload duration and wait for graceful shutdown - echo "Stopping containers after ${DURATION}s..." - docker stop --timeout=30 ydb-app-current ydb-app-baseline 2>&1 || true - - # Force kill if still running - docker kill ydb-app-current ydb-app-baseline 2>&1 || true - - # Check exit codes - CURRENT_EXIT=$(docker inspect ydb-app-current --format='{{.State.ExitCode}}' 2>/dev/null || echo "1") - BASELINE_EXIT=$(docker inspect ydb-app-baseline --format='{{.State.ExitCode}}' 2>/dev/null || echo "0") - - echo "Current container exit code: $CURRENT_EXIT" - echo "Baseline container exit code: $BASELINE_EXIT" - - # Show final logs - echo "" - echo "==================== FINAL CURRENT LOGS ====================" - docker logs -n 15 ydb-app-current 2>&1 || echo "No current container" - echo "" - echo "==================== FINAL BASELINE LOGS ====================" - docker logs -n 15 ydb-app-baseline 2>&1 || echo "No baseline container" - echo "" - - echo "SUCCESS: Workloads completed successfully" - - - if: always() - name: Store logs - run: | - docker logs ydb-app-current > current.log 2>&1 || echo "No current container" - docker logs ydb-app-baseline > baseline.log 2>&1 || echo "No baseline container" - - - if: always() - uses: actions/upload-artifact@v4 - with: - name: ${{ matrix.workload }}-${{ matrix.compiler }}-slo-cpp-sdk-logs - path: | - ./current.log - ./baseline.log - retention-days: 1 + workload_baseline_image: ydb-app-baseline + workload_baseline_command: ${{ matrix.sdk.command }} --read-rps 1000 --write-rps 100 diff --git a/.github/workflows/slo_report.yml b/.github/workflows/slo_report.yml index 0a7c2e3483..0ccd36abe9 100644 --- a/.github/workflows/slo_report.yml +++ b/.github/workflows/slo_report.yml @@ -7,17 +7,53 @@ on: - completed jobs: - ydb-slo-action-report: + publish-slo-report: + if: github.event.workflow_run.conclusion == 'success' runs-on: ubuntu-latest name: Publish YDB SLO Report permissions: + actions: read checks: write contents: read pull-requests: write - if: github.event.workflow_run.conclusion == 'success' steps: - name: Publish YDB SLO Report - uses: ydb-platform/ydb-slo-action/report@main + uses: ydb-platform/ydb-slo-action/report@v2 with: github_token: ${{ secrets.GITHUB_TOKEN }} github_run_id: ${{ github.event.workflow_run.id }} + + remove-slo-label: + needs: publish-slo-report + if: always() && github.event.workflow_run.event == 'pull_request' + runs-on: ubuntu-latest + name: Remove SLO Label + permissions: + pull-requests: write + steps: + - name: Remove SLO label from PR + uses: actions/github-script@v7 + with: + script: | + const pullRequests = context.payload.workflow_run.pull_requests; + if (pullRequests && pullRequests.length > 0) { + for (const pr of pullRequests) { + try { + await github.rest.issues.removeLabel({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: pr.number, + name: 'SLO' + }); + console.log(`Removed SLO label from PR #${pr.number}`); + } catch (error) { + if (error.status === 404) { + console.log(`SLO label not found on PR #${pr.number}, skipping`); + } else { + throw error; + } + } + } + } else { + console.log('No pull requests associated with this workflow run'); + } diff --git a/tests/slo_workloads/Dockerfile b/tests/slo_workloads/Dockerfile index 091ce23066..f87be8f8df 100644 --- a/tests/slo_workloads/Dockerfile +++ b/tests/slo_workloads/Dockerfile @@ -1,21 +1,57 @@ +# syntax=docker/dockerfile:1.7 FROM ubuntu:22.04 ARG PRESET=release-test-clang -ARG REF=unknown -# Install software-properties-common for add-apt-repository -RUN apt-get -y update && apt-get -y install software-properties-common && add-apt-repository ppa:ubuntu-toolchain-r/test +# ccache settings consumed by the configure/build steps below. The cache dir +# is materialised by the BuildKit cache mount on those RUN steps; values +# elsewhere in the image are inert. +ENV CCACHE_DIR=/root/.ccache +ENV CCACHE_MAXSIZE=2G +ENV CCACHE_COMPRESS=true +ENV CCACHE_COMPILERCHECK=content + +# Every RUN that hits the network retries on transient failures so one +# flake doesn't throw away 30 min of previous build work. apt gets five +# Acquire retries + 60 s timeouts; wget gets the equivalent via WGET_OPTS. +RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \ + echo 'Acquire::http::Timeout "60";' >> /etc/apt/apt.conf.d/80-retries && \ + echo 'Acquire::https::Timeout "60";' >> /etc/apt/apt.conf.d/80-retries + +ENV WGET_OPTS="--tries=5 --waitretry=15 --timeout=60 --retry-connrefused --retry-on-http-error=500,502,503,504" + +# Install software-properties-common and add the gcc-13 PPA. +# Acquire::Retries only retries HTTP errors; TCP connect timeouts to +# ppa.launchpadcontent.net still drop through and kill the step. Wrap the +# whole command in a shell retry loop with exponential backoff so a CDN +# blip doesn't throw away 30 minutes of downstream build work. +RUN for i in 1 2 3 4 5; do \ + apt-get -y update && \ + apt-get -y install software-properties-common && \ + add-apt-repository -y ppa:ubuntu-toolchain-r/test && \ + apt-get -y update && \ + break; \ + echo "add-apt-repository attempt $i failed; sleeping $((i * 15))s"; \ + sleep $((i * 15)); \ + done && \ + apt-cache show gcc-13 > /dev/null # fail fast if PPA never came up # Install C++ tools and libraries -RUN apt-get -y update && apt-get -y install \ - git gdb wget ninja-build libidn11-dev ragel yasm libc-ares-dev libre2-dev \ - rapidjson-dev zlib1g-dev libxxhash-dev libzstd-dev libsnappy-dev libgtest-dev libgmock-dev \ - libbz2-dev liblz4-dev libdouble-conversion-dev libssl-dev libstdc++-13-dev gcc-13 g++-13 \ - && apt-get clean && rm -rf /var/lib/apt/lists/* +RUN for i in 1 2 3 4 5; do \ + apt-get -y install \ + git gdb wget ninja-build libidn11-dev ragel yasm libc-ares-dev libre2-dev \ + rapidjson-dev zlib1g-dev libxxhash-dev libzstd-dev libsnappy-dev libgtest-dev libgmock-dev \ + libbz2-dev liblz4-dev libdouble-conversion-dev libssl-dev libstdc++-13-dev gcc-13 g++-13 && \ + break; \ + echo "apt-get install attempt $i failed; sleeping $((i * 15))s"; \ + sleep $((i * 15)); \ + apt-get -y update || true; \ + done && \ + apt-get clean && rm -rf /var/lib/apt/lists/* # Install CMake ENV CMAKE_VERSION=3.27.7 -RUN wget https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-linux-x86_64.sh \ +RUN wget $WGET_OPTS https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-linux-x86_64.sh \ -q -O cmake-install.sh \ && chmod u+x cmake-install.sh \ && ./cmake-install.sh --skip-license --prefix=/usr/local \ @@ -23,7 +59,7 @@ RUN wget https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cm # Install LLVM ENV LLVM_VERSION=16 -RUN wget https://apt.llvm.org/llvm.sh && \ +RUN wget $WGET_OPTS https://apt.llvm.org/llvm.sh && \ chmod u+x llvm.sh && \ ./llvm.sh ${LLVM_VERSION} && \ rm llvm.sh @@ -40,7 +76,7 @@ RUN update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-13 10000 && \ # Install abseil-cpp ENV ABSEIL_CPP_VERSION=20230802.0 ENV ABSEIL_CPP_INSTALL_DIR=/root/ydb_deps/absl -RUN wget -O abseil-cpp-${ABSEIL_CPP_VERSION}.tar.gz https://github.com/abseil/abseil-cpp/archive/refs/tags/${ABSEIL_CPP_VERSION}.tar.gz && \ +RUN wget $WGET_OPTS -O abseil-cpp-${ABSEIL_CPP_VERSION}.tar.gz https://github.com/abseil/abseil-cpp/archive/refs/tags/${ABSEIL_CPP_VERSION}.tar.gz && \ tar -xvzf abseil-cpp-${ABSEIL_CPP_VERSION}.tar.gz && cd abseil-cpp-${ABSEIL_CPP_VERSION} && \ mkdir build && cd build && \ cmake -G Ninja -DCMAKE_BUILD_TYPE=Release -DABSL_PROPAGATE_CXX_STD=ON .. && \ @@ -51,7 +87,7 @@ RUN wget -O abseil-cpp-${ABSEIL_CPP_VERSION}.tar.gz https://github.com/abseil/ab # Install protobuf ENV PROTOBUF_VERSION=3.21.12 ENV PROTOBUF_INSTALL_DIR=/root/ydb_deps/protobuf -RUN wget -O protobuf-${PROTOBUF_VERSION}.tar.gz https://github.com/protocolbuffers/protobuf/archive/refs/tags/v${PROTOBUF_VERSION}.tar.gz && \ +RUN wget $WGET_OPTS -O protobuf-${PROTOBUF_VERSION}.tar.gz https://github.com/protocolbuffers/protobuf/archive/refs/tags/v${PROTOBUF_VERSION}.tar.gz && \ tar -xvzf protobuf-${PROTOBUF_VERSION}.tar.gz && cd protobuf-${PROTOBUF_VERSION} && \ mkdir build && cd build && \ cmake -G Ninja -DCMAKE_BUILD_TYPE=Release -Dprotobuf_BUILD_TESTS=OFF -Dprotobuf_INSTALL=ON -Dprotobuf_ABSL_PROVIDER=package .. && \ @@ -62,7 +98,7 @@ RUN wget -O protobuf-${PROTOBUF_VERSION}.tar.gz https://github.com/protocolbuffe # Install grpc ENV GRPC_VERSION=1.54.3 ENV GRPC_INSTALL_DIR=/root/ydb_deps/grpc -RUN wget -O grpc-${GRPC_VERSION}.tar.gz https://github.com/grpc/grpc/archive/refs/tags/v${GRPC_VERSION}.tar.gz && \ +RUN wget $WGET_OPTS -O grpc-${GRPC_VERSION}.tar.gz https://github.com/grpc/grpc/archive/refs/tags/v${GRPC_VERSION}.tar.gz && \ tar -xvzf grpc-${GRPC_VERSION}.tar.gz && cd grpc-${GRPC_VERSION} && \ mkdir build && cd build && \ cmake -G Ninja -DCMAKE_PREFIX_PATH="${ABSEIL_CPP_INSTALL_DIR};${PROTOBUF_INSTALL_DIR}" \ @@ -79,7 +115,7 @@ RUN wget -O grpc-${GRPC_VERSION}.tar.gz https://github.com/grpc/grpc/archive/ref # Install base64 ENV BASE64_VERSION=0.5.2 ENV BASE64_INSTALL_DIR=/root/ydb_deps/base64 -RUN wget -O base64-${BASE64_VERSION}.tar.gz https://github.com/aklomp/base64/archive/refs/tags/v${BASE64_VERSION}.tar.gz && \ +RUN wget $WGET_OPTS -O base64-${BASE64_VERSION}.tar.gz https://github.com/aklomp/base64/archive/refs/tags/v${BASE64_VERSION}.tar.gz && \ tar -xvzf base64-${BASE64_VERSION}.tar.gz && cd base64-${BASE64_VERSION} && \ mkdir build && cd build && \ cmake -G Ninja -DCMAKE_BUILD_TYPE=Release .. && \ @@ -90,7 +126,7 @@ RUN wget -O base64-${BASE64_VERSION}.tar.gz https://github.com/aklomp/base64/arc # Install brotli ENV BROTLI_VERSION=1.1.0 ENV BROTLI_INSTALL_DIR=/root/ydb_deps/brotli -RUN wget -O brotli-${BROTLI_VERSION}.tar.gz https://github.com/google/brotli/archive/refs/tags/v${BROTLI_VERSION}.tar.gz && \ +RUN wget $WGET_OPTS -O brotli-${BROTLI_VERSION}.tar.gz https://github.com/google/brotli/archive/refs/tags/v${BROTLI_VERSION}.tar.gz && \ tar -xvzf brotli-${BROTLI_VERSION}.tar.gz && cd brotli-${BROTLI_VERSION} && \ mkdir build && cd build && \ cmake -G Ninja -DCMAKE_BUILD_TYPE=Release .. && \ @@ -101,7 +137,7 @@ RUN wget -O brotli-${BROTLI_VERSION}.tar.gz https://github.com/google/brotli/arc # Install jwt-cpp ENV JWT_CPP_VERSION=0.7.0 ENV JWT_CPP_INSTALL_DIR=/root/ydb_deps/jwt-cpp -RUN wget -O jwt-cpp-${JWT_CPP_VERSION}.tar.gz https://github.com/Thalhammer/jwt-cpp/archive/refs/tags/v${JWT_CPP_VERSION}.tar.gz && \ +RUN wget $WGET_OPTS -O jwt-cpp-${JWT_CPP_VERSION}.tar.gz https://github.com/Thalhammer/jwt-cpp/archive/refs/tags/v${JWT_CPP_VERSION}.tar.gz && \ tar -xvzf jwt-cpp-${JWT_CPP_VERSION}.tar.gz && cd jwt-cpp-${JWT_CPP_VERSION} && \ mkdir build && cd build && \ cmake -G Ninja -DCMAKE_BUILD_TYPE=Release .. && \ @@ -111,7 +147,7 @@ RUN wget -O jwt-cpp-${JWT_CPP_VERSION}.tar.gz https://github.com/Thalhammer/jwt- # Install ccache 4.8.1 or above ENV CCACHE_VERSION=4.8.1 -RUN wget https://github.com/ccache/ccache/releases/download/v${CCACHE_VERSION}/ccache-${CCACHE_VERSION}-linux-x86_64.tar.xz \ +RUN wget $WGET_OPTS https://github.com/ccache/ccache/releases/download/v${CCACHE_VERSION}/ccache-${CCACHE_VERSION}-linux-x86_64.tar.xz \ && tar -xf ccache-${CCACHE_VERSION}-linux-x86_64.tar.xz \ && cp ccache-${CCACHE_VERSION}-linux-x86_64/ccache /usr/local/bin/ \ && rm -rf ccache-${CCACHE_VERSION}-linux-x86_64 ccache-${CCACHE_VERSION}-linux-x86_64.tar.xz @@ -120,7 +156,13 @@ COPY . /ydb-cpp-sdk WORKDIR /ydb-cpp-sdk RUN rm -rf build -RUN cmake -DSLO_BRANCH_REF=${REF} --preset ${PRESET} -RUN cmake --build --preset default --target slo-key-value +RUN --mount=type=cache,target=/root/.ccache,sharing=locked \ + cmake --preset ${PRESET} \ + -DCMAKE_C_COMPILER_LAUNCHER=ccache \ + -DCMAKE_CXX_COMPILER_LAUNCHER=ccache +RUN --mount=type=cache,target=/root/.ccache,sharing=locked \ + ccache --zero-stats >/dev/null \ + && cmake --build --preset default --target slo-key-value \ + && ccache --show-stats ENTRYPOINT ["./build/tests/slo_workloads/key_value/slo-key-value"] diff --git a/tests/slo_workloads/key_value/create.cpp b/tests/slo_workloads/key_value/create.cpp index a79db76a12..a105caba7f 100644 --- a/tests/slo_workloads/key_value/create.cpp +++ b/tests/slo_workloads/key_value/create.cpp @@ -8,7 +8,7 @@ using namespace NYdb::NTable; namespace { void CreateTable(TTableClient& client, const std::string& prefix) { - RetryBackoff(client, 5, [prefix](TSession session) { + NYdb::NStatusHelpers::ThrowOnError(client.RetryOperationSync([prefix](TSession session) { auto desc = TTableBuilder() .AddNullableColumn("object_id_key", EPrimitiveType::Uint32) .AddNullableColumn("object_id", EPrimitiveType::Uint32) @@ -27,7 +27,7 @@ namespace { , std::move(desc) , std::move(tableSettings) ).ExtractValueSync(); - }); + })); } } //namespace diff --git a/tests/slo_workloads/key_value/drop.cpp b/tests/slo_workloads/key_value/drop.cpp index ae749bc903..e22c92c894 100644 --- a/tests/slo_workloads/key_value/drop.cpp +++ b/tests/slo_workloads/key_value/drop.cpp @@ -4,15 +4,20 @@ using namespace NLastGetopt; using namespace NYdb; using namespace NYdb::NTable; -static void DropTable(TTableClient& client, const std::string& path) { - NYdb::NStatusHelpers::ThrowOnError(client.RetryOperationSync([path](TSession session) { - return session.DropTable(path).ExtractValueSync(); - })); -} - int DropTable(TDatabaseOptions& dbOptions) { TTableClient client(dbOptions.Driver); - DropTable(client, JoinPath(dbOptions.Prefix, TableName)); + const std::string path = JoinPath(dbOptions.Prefix, TableName); + TStatus status = client.RetryOperationSync([path](TSession session) { + TStatus dropStatus = session.DropTable(path).ExtractValueSync(); + if (dropStatus.GetStatus() == EStatus::NOT_FOUND) { + return TStatus(EStatus::SUCCESS, NYdb::NIssue::TIssues()); + } + return dropStatus; + }); + if (!status.IsSuccess()) { + Cerr << "DropTable failed: " << status << Endl; + return EXIT_FAILURE; + } Cout << "Table dropped." << Endl; return EXIT_SUCCESS; } diff --git a/tests/slo_workloads/key_value/key_value.cpp b/tests/slo_workloads/key_value/key_value.cpp index 1cb40f3620..23bd3ff184 100644 --- a/tests/slo_workloads/key_value/key_value.cpp +++ b/tests/slo_workloads/key_value/key_value.cpp @@ -50,6 +50,7 @@ int DoCreate(TDatabaseOptions& dbOptions, int argc, char** argv) { jobs->Start(); jobs->Wait(); jobs->ShowProgress(); + jobs.reset(); return EXIT_SUCCESS; } @@ -95,6 +96,7 @@ int DoRun(TDatabaseOptions& dbOptions, int argc, char** argv) { Cout << "All jobs finished: " << TInstant::Now().ToRfc822StringLocal() << Endl; jobs->ShowProgress(); + jobs.reset(); return EXIT_SUCCESS; } diff --git a/tests/slo_workloads/utils/CMakeLists.txt b/tests/slo_workloads/utils/CMakeLists.txt index e8589a568f..5434af9bb4 100644 --- a/tests/slo_workloads/utils/CMakeLists.txt +++ b/tests/slo_workloads/utils/CMakeLists.txt @@ -1,3 +1,16 @@ +include(FetchContent) + +FetchContent_Declare( + hdr_histogram + GIT_REPOSITORY https://github.com/HdrHistogram/HdrHistogram_c.git + GIT_TAG 0.11.8 + EXCLUDE_FROM_ALL +) +set(HDR_HISTOGRAM_BUILD_PROGRAMS OFF CACHE BOOL "" FORCE) +set(HDR_HISTOGRAM_BUILD_SHARED OFF CACHE BOOL "" FORCE) +set(HDR_LOG_REQUIRED OFF CACHE BOOL "" FORCE) +FetchContent_MakeAvailable(hdr_histogram) + add_library(slo-utils) target_link_libraries(slo-utils PUBLIC @@ -9,9 +22,9 @@ target_link_libraries(slo-utils PUBLIC opentelemetry-cpp::otlp_http_metric_exporter ) -if (SLO_BRANCH_REF) - target_compile_definitions(slo-utils PRIVATE REF=${SLO_BRANCH_REF}) -endif() +target_link_libraries(slo-utils PRIVATE + hdr_histogram_static +) target_sources(slo-utils PRIVATE executor.cpp diff --git a/tests/slo_workloads/utils/executor.cpp b/tests/slo_workloads/utils/executor.cpp index 28add35c98..270f89a8bc 100644 --- a/tests/slo_workloads/utils/executor.cpp +++ b/tests/slo_workloads/utils/executor.cpp @@ -16,62 +16,17 @@ TInsistentClient::TInsistentClient(const TCommonOptions& opts) .AllowRequestMigration(true) ) , ClientMaxRetries(opts.MaxRetries) - , Timeout(opts.ReactionTime) - , RetryTimeout(Timeout / 2) - , SessionTimeout(Timeout + ReactionTimeDelay) - , UseApplicationTimeout(opts.UseApplicationTimeout) - , SendPreventiveRequest(opts.SendPreventiveRequest) + , SessionTimeout(opts.ReactionTime + ReactionTimeDelay) { - if (UseApplicationTimeout || SendPreventiveRequest) { - CallbackQueue.Start(opts.MaxCallbackThreads); - // Thread that executes timeout callbacks - auto threadFunc = [this]() { - TDuration timeToSleep; - while (!ShouldStop.WaitT(timeToSleep)) { - TInstant wakeupTime; - TInstant now; - with_lock(CallbacksLock) { - now = TInstant::Now(); - while (!TimeoutCallbacks.empty() && now >= TimeoutCallbacks.front().ExecucionTime) { - Y_UNUSED(CallbackQueue.AddFunc(TimeoutCallbacks.front().Callback)); - RemoveTimeoutIter(TimeoutCallbacks.front().context); - } - while (!RetryCallbacks.empty() && now >= RetryCallbacks.front().ExecucionTime) { - Y_UNUSED(CallbackQueue.AddFunc(RetryCallbacks.front().Callback)); - RemoveRetryIter(RetryCallbacks.front().context); - } - if (RetryCallbacks.empty()) { - wakeupTime = now + RetryTimeout; - } else { - wakeupTime = RetryCallbacks.front().ExecucionTime; - } - if (!TimeoutCallbacks.empty()) { - wakeupTime = Min(wakeupTime, TimeoutCallbacks.front().ExecucionTime); - } - } - timeToSleep = wakeupTime - now; - } - }; - WorkThread.reset(SystemThreadFactory()->Run(threadFunc).Release()); - } } TInsistentClient::~TInsistentClient() { - ShouldStop.Signal(); - if (UseApplicationTimeout || SendPreventiveRequest) { - if (WorkThread) { - WorkThread->Join(); - } else { - Cerr << (TStringBuilder() << "TInsistentClient::~TINsistentClient Error: WorkThread is not running." << Endl); - } - CallbackQueue.Stop(); - } Client.Stop().Wait(WaitTimeout); } void TInsistentClient::Report(TStringBuilder& out) const { - out << "Client retries sent: total " << CounterSStart.load() - << ", successful " << CounterSOk.load() << Endl; + out << "Operations dispatched: " << CounterStart.load() + << ", succeeded: " << CounterOk.load() << Endl; } std::uint64_t TInsistentClient::GetActiveSessions() const { @@ -79,112 +34,33 @@ std::uint64_t TInsistentClient::GetActiveSessions() const { return static_cast(sessions); } -void TInsistentClient::ClearContext(std::shared_ptr& context) { - if (SendPreventiveRequest) { - RemoveRetryIter(context); - } - if (UseApplicationTimeout) { - RemoveTimeoutIter(context); - } -} - -void TInsistentClient::RemoveRetryIter(std::shared_ptr& context) { - if (context->RetryIter.Valid) { - context->RetryIter.Valid = false; - RetryCallbacks.erase(context->RetryIter.RealIter); - } -} - -void TInsistentClient::RemoveTimeoutIter(std::shared_ptr& context) { - if (context->TimeoutIter.Valid) { - context->TimeoutIter.Valid = false; - TimeoutCallbacks.erase(context->TimeoutIter.RealIter); - } -} - -TAsyncFinalStatus TInsistentClient::ExecuteWithRetry(const NYdb::NTable::TTableClient::TOperationFunc& operation) { +TAsyncFinalStatus TInsistentClient::ExecuteWithRetry(const NYdb::NTable::TTableClient::TOperationFunc& operation, + const std::shared_ptr& stat) +{ TTracedPromise promise = TTracedPromise( NThreading::NewPromise(), &ExecutorPromises ); - std::shared_ptr context = std::make_shared(); - - auto launchOperation = [this, operation, promise, context](bool firstTime) mutable { - with_lock(context->Lock) { - if (context->Finished) { - return; - } - } - auto callback = [promise, context, firstTime, this](const NYdb::TAsyncStatus& future) mutable { - Y_ABORT_UNLESS(future.HasValue()); - // Not setting promise under lock to avoid deadlock - bool firstCallback = false; - with_lock(context->Lock) { - if (!context->Finished) { - context->Finished = true; - firstCallback = true; - } - } - if (firstCallback) { - promise.SetValue(future.GetValue()); - with_lock(CallbacksLock) { - if (firstTime) { - CounterFOk.fetch_add(1); - } else { - CounterSOk.fetch_add(1); - } - ClearContext(context); - } - } - }; - if (firstTime) { - CounterFStart.fetch_add(1); - } else { - CounterSStart.fetch_add(1); - } - NYdb::NTable::TRetryOperationSettings settings; - settings.MaxRetries(ClientMaxRetries); - settings.GetSessionClientTimeout(SessionTimeout); - auto future = Client.RetryOperation(operation, settings); - future.Subscribe(std::move(callback)); - }; - with_lock(CallbacksLock) { - TInstant now = TInstant::Now(); + CounterStart.fetch_add(1); - if (SendPreventiveRequest) { - auto onRetryTimeout = [launchOperation]() mutable { - launchOperation(false); - }; + NYdb::NTable::TRetryOperationSettings settings; + settings.MaxRetries(ClientMaxRetries); + settings.GetSessionClientTimeout(SessionTimeout); - RetryCallbacks.push_back({ now + RetryTimeout, onRetryTimeout, context }); - context->RetryIter = { --RetryCallbacks.end() }; - } - - if (UseApplicationTimeout) { - auto onTimeout = [this, promise, context]() mutable { - // Not setting promise under lock to avoid deadlock - bool firstCallback = false; - with_lock(context->Lock) { - if (!context->Finished) { - context->Finished = true; - firstCallback = true; - } - } - if (firstCallback) { - promise.SetValue(TFinalStatus()); - with_lock(CallbacksLock) { - ClearContext(context); - } - } - }; - - TimeoutCallbacks.push_back({ now + Timeout, onTimeout, context }); - context->TimeoutIter = { --TimeoutCallbacks.end() }; + auto wrappedOperation = [operation, stat](NYdb::NTable::TSession session) { + stat->IncRetryAttempts(); + return operation(session); + }; + auto future = Client.RetryOperation(wrappedOperation, settings); + future.Subscribe([promise, this](const NYdb::TAsyncStatus& f) mutable { + Y_ABORT_UNLESS(f.HasValue()); + const auto& status = f.GetValue(); + if (status.IsSuccess()) { + CounterOk.fetch_add(1); } - } - - launchOperation(true); + promise.SetValue(status); + }); return promise.GetFuture(); } @@ -276,10 +152,7 @@ bool TExecutor::Execute(const NYdb::NTable::TTableClient::TOperationFunc& func) auto stat = Stats.StartRequest(); - auto future = InsistentClient.ExecuteWithRetry([func, stat](NYdb::NTable::TSession session) { - auto result = func(session); - return result; - }); + auto future = InsistentClient.ExecuteWithRetry(func, stat); future.Subscribe([this, stat, SemaphoreWrapper](const TAsyncFinalStatus& future) mutable { Y_ABORT_UNLESS(future.HasValue()); diff --git a/tests/slo_workloads/utils/executor.h b/tests/slo_workloads/utils/executor.h index 69ed261a35..637d791b9a 100644 --- a/tests/slo_workloads/utils/executor.h +++ b/tests/slo_workloads/utils/executor.h @@ -50,59 +50,20 @@ class TTracedPromise : public NThreading::TPromise { class TInsistentClient { public: - struct TDelayedCallback; - - struct TCheckedIterator { - std::list::iterator RealIter; - bool Valid = true; - }; - - struct TOperationContext { - bool Finished = false; - TAdaptiveLock Lock; - TCheckedIterator RetryIter; - TCheckedIterator TimeoutIter; - }; - - struct TDelayedCallback { - TInstant ExecucionTime; - std::function Callback; - std::shared_ptr context; - }; - TInsistentClient(const TCommonOptions& opts); ~TInsistentClient(); void Report(TStringBuilder& out) const; - TAsyncFinalStatus ExecuteWithRetry(const NYdb::NTable::TTableClient::TOperationFunc& operation); + TAsyncFinalStatus ExecuteWithRetry(const NYdb::NTable::TTableClient::TOperationFunc& operation, + const std::shared_ptr& stat); std::uint64_t GetActiveSessions() const; private: - void ClearContext(std::shared_ptr& context); - void RemoveRetryIter(std::shared_ptr& context); - void RemoveTimeoutIter(std::shared_ptr& context); - - TThreadPool CallbackQueue; NYdb::NTable::TTableClient Client; std::uint32_t ClientMaxRetries; - TDuration Timeout; - TDuration RetryTimeout; TDuration SessionTimeout; - TAdaptiveLock CallbacksLock; - std::unique_ptr WorkThread; - TManualEvent ShouldStop; - std::list RetryCallbacks; - std::list TimeoutCallbacks; - bool UseApplicationTimeout; - bool SendPreventiveRequest; - - // Ok received on the First try - std::atomic CounterFOk = 0; - // Ok received on the Second try - std::atomic CounterSOk = 0; - // First try launches (= total) - std::atomic CounterFStart = 0; - // Second try launches - std::atomic CounterSStart = 0; + + std::atomic CounterStart = 0; + std::atomic CounterOk = 0; }; class TExecutor { diff --git a/tests/slo_workloads/utils/metrics.cpp b/tests/slo_workloads/utils/metrics.cpp index 50e1f859c0..708c83a563 100644 --- a/tests/slo_workloads/utils/metrics.cpp +++ b/tests/slo_workloads/utils/metrics.cpp @@ -11,21 +11,97 @@ #include +#include + +#include + +#include +#include +#include +#include +#include +#include using namespace std::chrono_literals; -#ifdef REF -static constexpr const std::string_view REF_LABEL = Y_STRINGIZE(REF); -#else -static constexpr const std::string_view REF_LABEL = "unknown"; -#endif +namespace { + +constexpr std::int64_t kHdrMinLatencyNs = 1'000; // 1 us +constexpr std::int64_t kHdrMaxLatencyNs = 60'000'000'000; // 60 s +constexpr int kHdrSignificantFigures = 3; + +std::string ResolveWorkloadRef() { + std::string ref = GetEnv("WORKLOAD_REF"); + return ref.empty() ? "unknown" : ref; +} + +// Thread-safe HDR histogram. Only successful latencies are recorded; errors +// are excluded from the percentile stream (operation_status="success"). +class TLatencyRecorder { +public: + TLatencyRecorder() { + hdr_histogram* raw = nullptr; + int rc = hdr_init(kHdrMinLatencyNs, kHdrMaxLatencyNs, kHdrSignificantFigures, &raw); + Y_ABORT_UNLESS(rc == 0, "hdr_init failed: %d", rc); + Histogram_.reset(raw); + } + + void Record(TDuration d) { + std::int64_t ns = static_cast(d.NanoSeconds()); + if (ns < kHdrMinLatencyNs) { + ns = kHdrMinLatencyNs; + } else if (ns > kHdrMaxLatencyNs) { + ns = kHdrMaxLatencyNs; + } + std::lock_guard lock(Mutex_); + hdr_record_value(Histogram_.get(), ns); + } + + struct TPercentiles { + double P50 = 0.0; + double P95 = 0.0; + double P99 = 0.0; + bool HasData = false; + }; + + // Snapshot all three percentiles from one consistent HDR state and reset + // the window — so each export cycle's gauge reflects only the last + // interval's latencies. Reading p50/p95/p99 in one critical section + // matches the Java workload's batch-callback pattern (avoids the race + // where p99 would observe a histogram already reset by p50). + TPercentiles SnapshotAndReset() { + TPercentiles out; + std::lock_guard lock(Mutex_); + if (Histogram_->total_count == 0) { + return out; + } + out.HasData = true; + out.P50 = hdr_value_at_percentile(Histogram_.get(), 50.0) / 1e9; + out.P95 = hdr_value_at_percentile(Histogram_.get(), 95.0) / 1e9; + out.P99 = hdr_value_at_percentile(Histogram_.get(), 99.0) / 1e9; + hdr_reset(Histogram_.get()); + return out; + } + +private: + struct THdrDeleter { + void operator()(hdr_histogram* h) const noexcept { if (h) hdr_close(h); } + }; + + std::mutex Mutex_; + std::unique_ptr Histogram_; +}; -class TOtelMetricsPusher : public IMetricsPusher { +// Process-wide pusher: ONE MeterProvider with one OTLP exporter shared by +// all operation types. Publishing duplicate MeterProviders against the same +// Prometheus endpoint produces racing `target_info` writes for the same +// resource label set, which Prometheus rejects as `out of order sample`. +class TOtelSharedPusher { public: - TOtelMetricsPusher(const std::string& metricsPushUrl, const std::string& operationType) - : OperationType_(operationType) + explicit TOtelSharedPusher(const std::string& metricsPushUrl) + : Ref_(ResolveWorkloadRef()) , CommonAttributes_{ - {"ref", std::string(REF_LABEL)}, + {"ref", Ref_}, {"sdk", "cpp"}, {"sdk_version", NYdb::GetSdkSemver()} } @@ -36,15 +112,16 @@ class TOtelMetricsPusher : public IMetricsPusher { auto exporter = opentelemetry::exporter::otlp::OtlpHttpMetricExporterFactory::Create(exporterOptions); opentelemetry::sdk::metrics::PeriodicExportingMetricReaderOptions readerOptions; - readerOptions.export_interval_millis = 250ms; - readerOptions.export_timeout_millis = 200ms; + readerOptions.export_interval_millis = 1000ms; + readerOptions.export_timeout_millis = 500ms; - auto metricReader = opentelemetry::sdk::metrics::PeriodicExportingMetricReaderFactory::Create(std::move(exporter), readerOptions); + auto metricReader = opentelemetry::sdk::metrics::PeriodicExportingMetricReaderFactory::Create( + std::move(exporter), readerOptions); - // Create MeterContext with resource auto context = std::make_unique( std::unique_ptr(new opentelemetry::sdk::metrics::ViewRegistry()), - opentelemetry::sdk::resource::Resource::Create(opentelemetry::common::MakeKeyValueIterableView(CommonAttributes_)) + opentelemetry::sdk::resource::Resource::Create( + opentelemetry::common::MakeKeyValueIterableView(CommonAttributes_)) ); MeterProvider_ = opentelemetry::sdk::metrics::MeterProviderFactory::Create(std::move(context)); @@ -55,113 +132,201 @@ class TOtelMetricsPusher : public IMetricsPusher { InitMetrics(); } - void PushRequestData(const TRequestData& requestData) override { - if (requestData.Status == NYdb::EStatus::SUCCESS) { - OperationsSuccessTotal_->Add(1, MergeAttributes({{"operation_type", OperationType_}})); - } else { - ErrorsTotal_->Add(1, MergeAttributes({{"status", YdbStatusToString(requestData.Status)}})); - OperationsFailureTotal_->Add(1, MergeAttributes({{"operation_type", OperationType_}})); + ~TOtelSharedPusher() { + // Remove observable-gauge callbacks before MeterProvider tears down + // the readers, so a final collection in flight cannot see this object + // half-destroyed. + if (LatencyP50_) LatencyP50_->RemoveCallback(&TOtelSharedPusher::ObserveP50, this); + if (LatencyP95_) LatencyP95_->RemoveCallback(&TOtelSharedPusher::ObserveP95, this); + if (LatencyP99_) LatencyP99_->RemoveCallback(&TOtelSharedPusher::ObserveP99, this); + // MeterProvider destructor calls Shutdown(); do not call it explicitly + // here — the OTel SDK rejects a second Shutdown with a warning. + } + + void Record(const std::string& operationType, const TRequestData& data) { + const bool success = data.Status == NYdb::EStatus::SUCCESS; + auto& series = GetOrCreateSeries(operationType); + + OperationsTotal_->Add(uint64_t{1}, + opentelemetry::common::MakeKeyValueIterableView( + success ? series.SuccessAttrs : series.ErrorAttrs)); + + // sdk_retry_attempts_total = total number of technical attempts + // including the first one. RetryAttempts counts only post-first + // attempts, so add 1 to include the initial attempt. + RetryAttemptsTotal_->Add(data.RetryAttempts + 1, + opentelemetry::common::MakeKeyValueIterableView(series.RetryAttrs)); + + if (success) { + series.Recorder.Record(data.Delay); } - OperationsTotal_->Add(1, MergeAttributes({{"operation_type", OperationType_}})); - OperationLatencySeconds_->Record(requestData.Delay.SecondsFloat(), MergeAttributes({{"operation_type", OperationType_}, {"status", YdbStatusToString(requestData.Status)}})); - RetryAttempts_->Record(requestData.RetryAttempts, MergeAttributes({{"operation_type", OperationType_}})); } private: - void InitMetrics() { - ErrorsTotal_ = Meter_->CreateUInt64Counter("sdk_errors_total", - "Total number of errors encountered, categorized by error type." - ); - - OperationsTotal_ = Meter_->CreateUInt64Counter("sdk_operations_total", - "Total number of operations, categorized by type attempted by the SDK." - ); - - OperationsSuccessTotal_ = Meter_->CreateUInt64Counter("sdk_operations_success_total", - "Total number of successful operations, categorized by type." - ); + struct TSeries { + TLatencyRecorder Recorder; + // Pre-merged attribute maps used on the hot path so Record() does not + // allocate / copy CommonAttributes_ per call. + std::map SuccessAttrs; + std::map ErrorAttrs; + std::map RetryAttrs; + + // Cached snapshot, refreshed by EnsureSnapshot() in whichever + // observable-gauge callback fires first per export cycle. All three + // callbacks read from these atomics so p50/p95/p99 land in the same + // export with consistent values from one HDR snapshot — independent + // of the order the SDK iterates instruments. + std::atomic P50{0.0}; + std::atomic P95{0.0}; + std::atomic P99{0.0}; + std::atomic HasData{false}; + std::mutex SnapshotMutex; + std::chrono::steady_clock::time_point LastSnapshot{}; + }; + + // Half the export interval — guarantees one snapshot per cycle while + // tolerating arbitrary callback ordering. + static constexpr std::chrono::milliseconds kSnapshotFreshness{500}; + + void EnsureSnapshot(TSeries& series) { + auto now = std::chrono::steady_clock::now(); + std::lock_guard lock(series.SnapshotMutex); + if (now - series.LastSnapshot < kSnapshotFreshness) { + return; + } + auto snap = series.Recorder.SnapshotAndReset(); + if (snap.HasData) { + series.P50.store(snap.P50); + series.P95.store(snap.P95); + series.P99.store(snap.P99); + series.HasData.store(true); + } else { + series.HasData.store(false); + } + series.LastSnapshot = now; + } - OperationsFailureTotal_ = Meter_->CreateUInt64Counter("sdk_operations_failure_total", - "Total number of failed operations, categorized by type." - ); + TSeries& GetOrCreateSeries(const std::string& op) { + { + std::shared_lock lock(SeriesMutex_); + auto it = Series_.find(op); + if (it != Series_.end()) { + return *it->second; + } + } + std::unique_lock lock(SeriesMutex_); + auto& slot = Series_[op]; + if (!slot) { + slot = std::make_unique(); + slot->SuccessAttrs = CommonAttributes_; + slot->SuccessAttrs["operation_type"] = op; + slot->SuccessAttrs["operation_status"] = "success"; + slot->ErrorAttrs = CommonAttributes_; + slot->ErrorAttrs["operation_type"] = op; + slot->ErrorAttrs["operation_status"] = "error"; + slot->RetryAttrs = CommonAttributes_; + slot->RetryAttrs["operation_type"] = op; + } + return *slot; + } - OperationLatencySeconds_ = CreateDoubleHistogram("sdk_operation_latency_seconds", - "Latency of operations performed by the SDK in seconds, categorized by type and status.", - { - 0.001, // 1 ms - 0.002, // 2 ms - 0.003, // 3 ms - 0.004, // 4 ms - 0.005, // 5 ms - 0.0075, // 7.5 ms - 0.010, // 10 ms - 0.020, // 20 ms - 0.050, // 50 ms - 0.100, // 100 ms - 0.200, // 200 ms - 0.500, // 500 ms - 1.000, // 1 s - }, - "s" - ); + void InitMetrics() { + OperationsTotal_ = Meter_->CreateUInt64Counter("sdk_operations_total", + "Total number of operations, categorized by operation type and status."); + RetryAttemptsTotal_ = Meter_->CreateUInt64Counter("sdk_retry_attempts_total", + "Total number of retry attempts (including the first attempt), categorized by operation type."); + + LatencyP50_ = Meter_->CreateDoubleObservableGauge( + "sdk_operation_latency_p50_seconds", + "P50 latency of successful operations in seconds.", "s"); + LatencyP95_ = Meter_->CreateDoubleObservableGauge( + "sdk_operation_latency_p95_seconds", + "P95 latency of successful operations in seconds.", "s"); + LatencyP99_ = Meter_->CreateDoubleObservableGauge( + "sdk_operation_latency_p99_seconds", + "P99 latency of successful operations in seconds.", "s"); + + LatencyP50_->AddCallback(&TOtelSharedPusher::ObserveP50, this); + LatencyP95_->AddCallback(&TOtelSharedPusher::ObserveP95, this); + LatencyP99_->AddCallback(&TOtelSharedPusher::ObserveP99, this); + } - RetryAttempts_ = Meter_->CreateInt64Gauge("sdk_retry_attempts", - "Current retry attempts, categorized by operation type." - ); + static void ObserveP50(opentelemetry::metrics::ObserverResult r, void* s) { + ObservePercentile(r, s, &TSeries::P50); + } + static void ObserveP95(opentelemetry::metrics::ObserverResult r, void* s) { + ObservePercentile(r, s, &TSeries::P95); + } + static void ObserveP99(opentelemetry::metrics::ObserverResult r, void* s) { + ObservePercentile(r, s, &TSeries::P99); } - std::unique_ptr> CreateDoubleHistogram( - const std::string& name, - const std::string& description, - const std::vector& buckets, - const std::string& unit = {}) + // Each callback ensures a fresh snapshot exists for the current export + // cycle (EnsureSnapshot is a no-op if one was taken less than + // kSnapshotFreshness ago). Whichever of P50/P95/P99 the SDK invokes first + // performs the snapshot+reset; the others read cached atomics. Order + // between the three callbacks is irrelevant. + static void ObservePercentile(opentelemetry::metrics::ObserverResult result, void* state, + std::atomic TSeries::*field) { - auto selector = std::make_unique( - opentelemetry::sdk::metrics::InstrumentType::kHistogram, - name, - unit - ); + auto* self = static_cast(state); + auto obs = opentelemetry::nostd::get< + opentelemetry::nostd::shared_ptr>>(result); + + std::shared_lock lock(self->SeriesMutex_); + for (const auto& [op, series] : self->Series_) { + self->EnsureSnapshot(*series); + if (!series->HasData.load()) { + continue; + } + obs->Observe((series.get()->*field).load(), + opentelemetry::common::MakeKeyValueIterableView(series->SuccessAttrs)); + } + } - auto meterSelector = std::make_unique( - "slo_workloads", - NYdb::GetSdkSemver(), - "" - ); + std::string Ref_; + std::map CommonAttributes_; - auto histogramConfig = std::make_shared(); - histogramConfig->boundaries_ = buckets; + std::unique_ptr MeterProvider_; + std::shared_ptr Meter_; - auto view = std::make_unique( - "", - "", - opentelemetry::sdk::metrics::AggregationType::kHistogram, - histogramConfig - ); + std::unique_ptr> OperationsTotal_; + std::unique_ptr> RetryAttemptsTotal_; + std::shared_ptr LatencyP50_; + std::shared_ptr LatencyP95_; + std::shared_ptr LatencyP99_; - MeterProvider_->AddView(std::move(selector), std::move(meterSelector), std::move(view)); + std::shared_mutex SeriesMutex_; + std::unordered_map> Series_; +}; - return Meter_->CreateDoubleHistogram(name, description, unit); - } +std::mutex g_sharedMu; +std::weak_ptr g_shared; - // Helper to merge common attributes with metric-specific ones - std::map MergeAttributes(const std::map& metricAttrs) const { - std::map result = CommonAttributes_; - result.insert(metricAttrs.begin(), metricAttrs.end()); - return result; +std::shared_ptr GetOrCreateSharedPusher(const std::string& url) { + std::lock_guard lock(g_sharedMu); + auto sp = g_shared.lock(); + if (!sp) { + sp = std::make_shared(url); + g_shared = sp; } + return sp; +} - std::string OperationType_; - std::map CommonAttributes_; // ref, sdk, sdk_version +class TOtelMetricsPusherWrapper : public IMetricsPusher { +public: + TOtelMetricsPusherWrapper(std::shared_ptr shared, std::string operationType) + : Shared_(std::move(shared)) + , OperationType_(std::move(operationType)) + {} - std::unique_ptr MeterProvider_; - std::shared_ptr Meter_; + void PushRequestData(const TRequestData& requestData) override { + Shared_->Record(OperationType_, requestData); + } - std::unique_ptr> ErrorsTotal_; - std::unique_ptr> OperationsTotal_; - std::unique_ptr> OperationsSuccessTotal_; - std::unique_ptr> OperationsFailureTotal_; - std::unique_ptr> OperationLatencySeconds_; - std::unique_ptr> RetryAttempts_; +private: + std::shared_ptr Shared_; + std::string OperationType_; }; class TNoopMetricsPusher : public IMetricsPusher { @@ -169,8 +334,10 @@ class TNoopMetricsPusher : public IMetricsPusher { void PushRequestData([[maybe_unused]] const TRequestData& requestData) override {} }; +} // namespace + std::unique_ptr CreateOtelMetricsPusher(const std::string& metricsPushUrl, const std::string& operationType) { - return std::make_unique(metricsPushUrl, operationType); + return std::make_unique(GetOrCreateSharedPusher(metricsPushUrl), operationType); } std::unique_ptr CreateNoopMetricsPusher() { diff --git a/tests/slo_workloads/utils/statistics.cpp b/tests/slo_workloads/utils/statistics.cpp index 15789e7620..dddc35c56a 100644 --- a/tests/slo_workloads/utils/statistics.cpp +++ b/tests/slo_workloads/utils/statistics.cpp @@ -36,6 +36,11 @@ TStat::TStat(const std::optional& metricsPushUrl, const std::string MetricsPushQueue.Start(20); } +TStat::~TStat() { + MetricsPushQueue.Stop(); + MetricsPusher.reset(); +} + void TStat::Start() { StartTime = TInstant::Now(); } @@ -71,9 +76,12 @@ void TStat::FinishRequest(const std::shared_ptr& unit, const TFinalSt } ScheduleMetricsPush([this, delay, status, unit]() { + NYdb::EStatus requestStatus = status + ? status->GetStatus() + : NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED; MetricsPusher->PushRequestData({ .Delay = delay, - .Status = status->GetStatus(), + .Status = requestStatus, .RetryAttempts = unit->RetryAttempts }); }); diff --git a/tests/slo_workloads/utils/statistics.h b/tests/slo_workloads/utils/statistics.h index 9e7a092430..c61a267db4 100644 --- a/tests/slo_workloads/utils/statistics.h +++ b/tests/slo_workloads/utils/statistics.h @@ -62,6 +62,8 @@ class TStat { TInstant GetStartTime() const; + ~TStat(); + private: void ScheduleMetricsPush(std::function func); diff --git a/tests/slo_workloads/utils/utils.cpp b/tests/slo_workloads/utils/utils.cpp index b7572c2981..d686d4958c 100644 --- a/tests/slo_workloads/utils/utils.cpp +++ b/tests/slo_workloads/utils/utils.cpp @@ -7,6 +7,8 @@ #include #include #include +#include +#include #include #include #include @@ -110,6 +112,19 @@ std::string GetDatabase(const std::string& connectionString) { return {}; } +static std::string DefaultConnectionStringFromEnv() { + std::string cs = GetEnv("YDB_CONNECTION_STRING"); + if (!cs.empty()) { + return cs; + } + std::string endpoint = GetEnv("YDB_ENDPOINT"); + std::string database = GetEnv("YDB_DATABASE"); + if (!endpoint.empty() && !database.empty()) { + return TStringBuilder() << endpoint << "/?database=" << database; + } + return {}; +} + int DoMain(int argc, char** argv, TCreateCommand create, TRunCommand run, TCleanupCommand cleanup) { TOpts opts = TOpts::Default(); @@ -121,8 +136,15 @@ int DoMain(int argc, char** argv, TCreateCommand create, TRunCommand run, TClean std::string statConfigFile; std::string balancingPolicy; - opts.AddLongOption('c', "connection-string", "YDB connection string").Required().RequiredArgument("SCHEMA://HOST:PORT/?DATABASE=DATABASE") + std::string defaultConnectionString = DefaultConnectionStringFromEnv(); + + auto& connOpt = opts.AddLongOption('c', "connection-string", "YDB connection string").RequiredArgument("SCHEMA://HOST:PORT/?DATABASE=DATABASE") .StoreResult(&connectionString); + if (!defaultConnectionString.empty()) { + connOpt.DefaultValue(defaultConnectionString); + } else { + connOpt.Required(); + } opts.AddLongOption('p', "prefix", "Base prefix for tables").RequiredArgument("PATH") .StoreResult(&prefix); opts.AddLongOption('k', "token", "security token").RequiredArgument("TOKEN") @@ -136,23 +158,42 @@ int DoMain(int argc, char** argv, TCreateCommand create, TRunCommand run, TClean opts.AddLongOption('b', "balancing-policy", "Balancing policy").Optional().DefaultValue("use-all-nodes").RequiredArgument("(use-all-nodes|prefer-local-dc|prefer-primary-pile)") .StoreResult(&balancingPolicy); opts.AddHelpOption('h'); - opts.SetFreeArgsMin(1); + opts.SetFreeArgsMin(0); opts.SetFreeArgTitle(0, "", GetCmdList()); opts.ArgPermutation_ = NLastGetopt::REQUIRE_ORDER; + // Run-phase options (--read-rps, --write-rps, …) reach DoMain when the + // caller invokes the workload without an explicit subcommand (the v2 SLO + // action contract). Tolerate them here so the global parser stops at the + // first unknown option instead of erroring; they are forwarded to the + // run phase below. + opts.AllowUnknownLongOptions_ = true; TOptsParseResult res(&opts, argc, argv); size_t freeArgsPos = res.GetFreeArgsPos(); argc -= freeArgsPos; argv += freeArgsPos; - ECommandType command = ParseCommand(*argv); + + ECommandType command = (argc > 0) ? ParseCommand(*argv) : ECommandType::All; if (command == ECommandType::Unknown) { - Cerr << "Unknown command '" << *argv << "'" << Endl; - return EXIT_FAILURE; + if (argv[0][0] == '-') { + // First leftover token is an option, not a subcommand keyword: + // treat as implicit All mode and let the run phase parse it. + command = ECommandType::All; + } else { + Cerr << "Unknown command '" << *argv << "'" << Endl; + return EXIT_FAILURE; + } } if (prefix.empty()) { prefix = GetDatabase(connectionString); } + if (prefix.empty()) { + // YDB SLO action sets YDB_CONNECTION_STRING in path form + // (grpc://host:port/Root/testdb), which GetDatabase can't parse. + // Fall back to YDB_DATABASE which the action sets alongside it. + prefix = GetEnv("YDB_DATABASE"); + } if (!ParseToken(token, tokenFile)) { return EXIT_FAILURE; @@ -202,6 +243,45 @@ int DoMain(int argc, char** argv, TCreateCommand create, TRunCommand run, TClean Cout << "Launching cleanup command..." << Endl; result = cleanup(dbOptions, argc); break; + case ECommandType::All: { + Cout << "Launching full lifecycle: create -> run -> cleanup" << Endl; + // Forward leftover argv to the run phase so options like + // --read-rps / --write-rps take effect. argv[0] here is the first + // run-phase option (no subcommand keyword was supplied), so + // prepend a synthetic program name for ParseOptionsRun. + char programName[] = "slo"; + std::vector runArgv; + runArgv.reserve(argc + 2); + runArgv.push_back(programName); + for (int i = 0; i < argc; ++i) { + runArgv.push_back(argv[i]); + } + runArgv.push_back(nullptr); + int fakeArgc = 1; + char* fakeArgv[] = { programName, nullptr }; + + Cout << "[all] Launching create command..." << Endl; + result = create(dbOptions, fakeArgc, fakeArgv); + if (!result) { + Cout << "[all] Launching run command..." << Endl; + result = run(dbOptions, static_cast(runArgv.size() - 1), runArgv.data()); + } + Cout << "[all] Launching cleanup command..." << Endl; + int cleanupRc = cleanup(dbOptions, fakeArgc); + // Cleanup runs while chaos-monkey is still killing nodes, so a + // DropTable failure here is expected noise and must not mask a + // successful run. Surface the run's status; only fall back to + // the cleanup status when run itself failed and we have nothing + // else to report. + if (cleanupRc && !result) { + Cerr << "[all] Warning: cleanup failed (exit " << cleanupRc + << ") but run succeeded; ignoring cleanup exit code." << Endl; + } else if (cleanupRc) { + Cerr << "[all] Warning: cleanup failed (exit " << cleanupRc + << "); preserving earlier run failure." << Endl; + } + break; + } default: Cerr << "Unknown command" << Endl; return EXIT_FAILURE; @@ -216,7 +296,7 @@ int DoMain(int argc, char** argv, TCreateCommand create, TRunCommand run, TClean } std::string GetCmdList() { - return "create, run, cleanup"; + return "create, run, cleanup (omit to run create -> run -> cleanup in one process)"; } ECommandType ParseCommand(const char* cmd) { @@ -425,6 +505,11 @@ TTableStats GetTableStats(TDatabaseOptions& dbOptions, const std::string& tableN } void ParseOptionsCommon(TOpts& opts, TCommonOptions& options) { + std::string metricsPushUrlFromEnv = GetEnv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"); + if (!metricsPushUrlFromEnv.empty()) { + options.MetricsPushUrl = metricsPushUrlFromEnv; + } + opts.AddLongOption("threads", "Number of threads to use").RequiredArgument("NUM") .DefaultValue(options.MaxInputThreads).StoreResult(&options.MaxInputThreads); opts.AddLongOption("stop-on-error", "Stop thread if an error occured").NoArgument() @@ -437,11 +522,6 @@ void ParseOptionsCommon(TOpts& opts, TCommonOptions& options) { .SetFlag(&options.DontPushMetrics).DefaultValue(options.DontPushMetrics); opts.AddLongOption("metrics-push-url", "URL to push metrics").RequiredArgument("URL") .DefaultValue(options.MetricsPushUrl).StoreResult(&options.MetricsPushUrl); - opts.AddLongOption("app-timeout", "Use application timeout (over SDK)").NoArgument() - .SetFlag(&options.UseApplicationTimeout).DefaultValue(options.UseApplicationTimeout); - opts.AddLongOption("prevention-request", "Send prevention request at 1/2 of timeout").NoArgument() - .SetFlag(&options.SendPreventiveRequest).DefaultValue(options.SendPreventiveRequest); - opts.MutuallyExclusive("dont-push", "metrics-push-url"); } @@ -485,6 +565,18 @@ bool ParseOptionsCreate(int argc, char** argv, TCreateOptions& createOptions) { bool ParseOptionsRun(int argc, char** argv, TRunOptions& runOptions) { TOpts opts = TOpts::Default(); ParseOptionsCommon(opts, runOptions.CommonOptions); + + if (std::string workloadDuration = GetEnv("WORKLOAD_DURATION"); !workloadDuration.empty()) { + try { + std::uint32_t parsed = FromString(workloadDuration); + if (parsed > 0) { + runOptions.CommonOptions.SecondsToRun = parsed; + } + } catch (const std::exception& e) { + Cerr << "Invalid WORKLOAD_DURATION env value '" << workloadDuration << "': " << e.what() << Endl; + } + } + opts.AddLongOption("time", "Time to run (Seconds)").RequiredArgument("Seconds") .DefaultValue(runOptions.CommonOptions.SecondsToRun).StoreResult(&runOptions.CommonOptions.SecondsToRun); opts.AddLongOption("read-rps", "Request generation rate for read requests (Thread A)").RequiredArgument("NUM") diff --git a/tests/slo_workloads/utils/utils.h b/tests/slo_workloads/utils/utils.h index 65be9f4891..28576adaad 100644 --- a/tests/slo_workloads/utils/utils.h +++ b/tests/slo_workloads/utils/utils.h @@ -49,8 +49,6 @@ struct TCommonOptions { std::uint32_t MaxRetries = 50; TDuration ReactionTime = DefaultReactionTime; bool StopOnError = false; - bool UseApplicationTimeout = false; - bool SendPreventiveRequest = false; // Generator options: std::uint32_t MinLength = 20; @@ -98,7 +96,8 @@ enum class ECommandType { Unknown, Create, Run, - Cleanup + Cleanup, + All, // No free-arg passed: execute Create -> Run -> Cleanup in one process }; struct TTableStats { @@ -117,29 +116,6 @@ ECommandType ParseCommand(const char* cmd); std::string JoinPath(const std::string& prefix, const std::string& path); -inline void RetryBackoff( - NYdb::NTable::TTableClient& client, - std::uint32_t retries, - const NYdb::NTable::TTableClient::TOperationSyncFunc& func -) { - TDuration delay = TDuration::Seconds(5); - while (retries) { - NYdb::TStatus status = client.RetryOperationSync(func); - if (status.IsSuccess()) { - return; - } - --retries; - if (!retries) { - Cerr << "Create request failed after all retries." << Endl; - Cerr << status << Endl; - NYdb::NStatusHelpers::ThrowOnError(status); - } - Cerr << "Create request failed. Sleeping for " << delay << Endl; - Sleep(delay); - delay *= 2; - } -} - std::string GenerateRandomString(std::uint32_t minLength, std::uint32_t maxLength); NYdb::TParams PackValuesToParamsAsList(const std::vector& items, const std::string name = "$items");