From c47b9487aeef5c4a2065496e41b8ffd37e2cb000 Mon Sep 17 00:00:00 2001
From: Scott Dugas
Date: Mon, 2 Mar 2026 14:49:07 -0500
Subject: [PATCH 1/9] Initial pass at ThrottledRetryingRunner
---
.../test/ThrottledRetryingRunner.java | 448 ++++++++++++++++++
.../test/TransactionalRunner.java | 146 ++++++
2 files changed, 594 insertions(+)
create mode 100644 fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java
create mode 100644 fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/TransactionalRunner.java
diff --git a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java
new file mode 100644
index 0000000000..1a6cdb3d2d
--- /dev/null
+++ b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java
@@ -0,0 +1,448 @@
+/*
+ * ThrottledRetryingRunner.java
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apple.foundationdb.test;
+
+import com.apple.foundationdb.Database;
+import com.apple.foundationdb.Transaction;
+import com.apple.foundationdb.async.AsyncUtil;
+import com.apple.foundationdb.async.MoreAsyncUtil;
+
+import javax.annotation.Nonnull;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+/**
+ * Runs a task repeatedly across multiple transactions, with adaptive limit management, optional
+ * rate limiting, and retry logic.
+ *
+ * The caller supplies a {@link BiFunction} that receives a fresh {@link Transaction} and a
+ * {@link QuotaManager}, and returns a {@code CompletableFuture}. The task reads
+ * {@link QuotaManager#getLimit()} to know how much work to attempt in this transaction, reports
+ * how many items it processed via {@link QuotaManager#processedCountInc()} (and optionally
+ * {@link QuotaManager#deletedCountInc()}), and signals completion by calling
+ * {@link QuotaManager#markExhausted()}.
+ *
+ *
+ * The runner adjusts the limit across transactions:
+ *
+ *
+ * - After {@code increaseLimitAfter} consecutive successes the limit is increased.
+ * - After any failure the limit is decreased based on how many items were processed before
+ * the failure, so the next attempt is less likely to exceed the transaction budget.
+ *
+ *
+ * A limit of {@code 0} is treated as "no limit" — the task may do as much work as it likes in
+ * each transaction and the limit is never adjusted.
+ *
+ *
+ * Between successful transactions the runner can optionally delay to enforce a maximum item
+ * processing or deletion rate ({@code maxItemsScannedPerSec} / {@code maxItemsDeletedPerSec}).
+ * On failure the transaction is retried up to {@code numOfRetries} times before failing.
+ *
+ */
+public class ThrottledRetryingRunner implements AutoCloseable {
+
+ public static final int DEFAULT_NUM_RETRIES = 100;
+ public static final int DEFAULT_INCREASE_LIMIT_AFTER = 40;
+
+ @Nonnull
+ private final TransactionalRunner transactionalRunner;
+ @Nonnull
+ private final ScheduledExecutorService scheduledExecutor;
+ private final int maxItemsScannedPerSec;
+ private final int maxItemsDeletedPerSec;
+ private final int numOfRetries;
+ private final boolean commitWhenDone;
+ private final int increaseLimitAfter;
+ private final int maxLimit;
+
+ private boolean closed = false;
+ private long transactionStartTimeMillis = 0;
+ private int failureRetriesCounter = 0;
+ private int consecutiveSuccessCount = 0;
+
+ private ThrottledRetryingRunner(Builder builder) {
+ this.transactionalRunner = new TransactionalRunner(builder.database);
+ this.scheduledExecutor = builder.scheduledExecutor;
+ this.maxItemsScannedPerSec = builder.maxItemsScannedPerSec;
+ this.maxItemsDeletedPerSec = builder.maxItemsDeletedPerSec;
+ this.numOfRetries = builder.numOfRetries;
+ this.commitWhenDone = builder.commitWhenDone;
+ this.increaseLimitAfter = builder.increaseLimitAfter;
+ this.maxLimit = builder.maxLimit;
+ }
+
+ /**
+ * Run the given task repeatedly across multiple transactions until it calls
+ * {@link QuotaManager#markExhausted()}.
+ *
+ * A single {@link QuotaManager} is created for the lifetime of this call. Its per-transaction
+ * counts ({@link QuotaManager#getProcessedCount()}, {@link QuotaManager#getDeletedCount()})
+ * are reset at the start of each transaction. Its limit ({@link QuotaManager#getLimit()}) is
+ * adjusted by the runner after each transaction and persists across them.
+ *
+ *
+ * @param task the task to run; reads the limit, reports counts, calls
+ * {@link QuotaManager#markExhausted()} to stop
+ * @return a future that completes normally when the task exhausts the source, or exceptionally
+ * if the retry limit is exceeded or the runner is closed
+ */
+ @Nonnull
+ public CompletableFuture iterateAll(
+ @Nonnull BiFunction super Transaction, QuotaManager, CompletableFuture> task) {
+ if (closed) {
+ return CompletableFuture.failedFuture(new TransactionalRunner.RunnerClosed());
+ }
+
+ final QuotaManager quotaManager = new QuotaManager(maxLimit);
+ return AsyncUtil.whileTrue(() ->
+ runOneTransaction(task, quotaManager)
+ .handle((ignored, ex) -> {
+ if (ex == null) {
+ return handleSuccess(quotaManager);
+ }
+ return handleFailure(ex, quotaManager);
+ })
+ .thenCompose(ret -> ret));
+ }
+
+ private CompletableFuture runOneTransaction(
+ @Nonnull BiFunction super Transaction, QuotaManager, CompletableFuture> task,
+ @Nonnull QuotaManager quotaManager) {
+ transactionStartTimeMillis = System.currentTimeMillis();
+ return transactionalRunner.runAsync(commitWhenDone, transaction -> {
+ quotaManager.initTransaction();
+ return task.apply(transaction, quotaManager);
+ });
+ }
+
+ private CompletableFuture handleSuccess(QuotaManager quotaManager) {
+ failureRetriesCounter = 0;
+ ++consecutiveSuccessCount;
+
+ // Increase limit after enough consecutive successes, but only when a limit is active
+ if (quotaManager.limit > 0 && consecutiveSuccessCount >= increaseLimitAfter) {
+ quotaManager.increaseLimit();
+ consecutiveSuccessCount = 0;
+ }
+
+ if (!quotaManager.hasMore) {
+ return AsyncUtil.READY_FALSE;
+ }
+
+ long elapsedMillis = Math.max(0, System.currentTimeMillis() - transactionStartTimeMillis);
+ long delayMillis = Collections.max(List.of(
+ throttlePerSecDelayMillis(elapsedMillis, maxItemsScannedPerSec, quotaManager.processedCount),
+ throttlePerSecDelayMillis(elapsedMillis, maxItemsDeletedPerSec, quotaManager.deletedCount)
+ ));
+
+ if (delayMillis > 0) {
+ return MoreAsyncUtil.delayedFuture(delayMillis, TimeUnit.MILLISECONDS, scheduledExecutor)
+ .thenApply(ignore -> true);
+ }
+ return AsyncUtil.READY_TRUE;
+ }
+
+ private CompletableFuture handleFailure(Throwable ex, QuotaManager quotaManager) {
+ ++failureRetriesCounter;
+ consecutiveSuccessCount = 0;
+
+ if (ex instanceof CompletionException) {
+ ex = ex.getCause();
+ }
+ if (ex instanceof TransactionalRunner.RunnerClosed) {
+ return CompletableFuture.failedFuture(ex);
+ }
+ if (failureRetriesCounter > numOfRetries) {
+ return CompletableFuture.failedFuture(ex);
+ }
+
+ // Decrease the limit based on how much work was in-flight when the failure occurred,
+ // so the next attempt is less likely to exceed the transaction budget.
+ quotaManager.decreaseLimit();
+
+ return AsyncUtil.READY_TRUE; // retry
+ }
+
+ /**
+ * Compute how long to wait to keep the event rate at or below {@code maxPerSec}.
+ * Matches the formula used by {@code ThrottledRetryingIterator}.
+ */
+ static long throttlePerSecDelayMillis(long elapsedMillis, int maxPerSec, int eventCount) {
+ if (maxPerSec <= 0) {
+ return 0;
+ }
+ long waitMillis = (TimeUnit.SECONDS.toMillis(eventCount) / maxPerSec) - elapsedMillis;
+ return waitMillis > 0 ? waitMillis : 0;
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ transactionalRunner.close();
+ }
+
+ /**
+ * Tracks per-transaction resource usage, exposes the adaptive limit, and controls whether
+ * the loop continues.
+ *
+ * One instance is created per {@link #iterateAll} call. Per-transaction counts are reset at
+ * the start of each transaction; the limit persists and is adjusted by the runner between
+ * transactions.
+ *
+ *
+ * The task should call {@link #getLimit()} at the start of each transaction to know how much
+ * work to attempt, increment the counts as it goes, and call {@link #markExhausted()} when
+ * the source is fully consumed.
+ *
+ */
+ public static class QuotaManager {
+ private int processedCount;
+ private int deletedCount;
+ private boolean hasMore;
+ private int limit;
+ private int lastProcessedCount;
+ private final int maxLimit;
+
+ QuotaManager(int maxLimit) {
+ this.maxLimit = maxLimit;
+ this.limit = 0;
+ this.lastProcessedCount = 0;
+ }
+
+ /**
+ * Return the current limit.
+ *
+ * The task should attempt at most this many units of work in the current transaction.
+ * A value of {@code 0} means no limit is active.
+ *
+ */
+ public int getLimit() {
+ return limit;
+ }
+
+ /**
+ * Return the number of items processed in the current transaction.
+ */
+ public int getProcessedCount() {
+ return processedCount;
+ }
+
+ /**
+ * Return the number of items deleted in the current transaction.
+ */
+ public int getDeletedCount() {
+ return deletedCount;
+ }
+
+ /**
+ * Increment the processed-item count by {@code count}.
+ *
+ * @param count number of items to add
+ */
+ public void processedCountAdd(int count) {
+ processedCount += count;
+ }
+
+ /**
+ * Increment the processed-item count by 1.
+ */
+ public void processedCountInc() {
+ processedCount++;
+ }
+
+ /**
+ * Increment the deleted-item count by {@code count}.
+ *
+ * @param count number of items to add
+ */
+ public void deletedCountAdd(int count) {
+ deletedCount += count;
+ }
+
+ /**
+ * Increment the deleted-item count by 1.
+ */
+ public void deletedCountInc() {
+ deletedCount++;
+ }
+
+ /**
+ * Signal that the source is exhausted. The loop will stop after the current transaction
+ * commits, without starting a new one.
+ */
+ public void markExhausted() {
+ hasMore = false;
+ }
+
+ void initTransaction() {
+ lastProcessedCount = processedCount;
+ processedCount = 0;
+ deletedCount = 0;
+ hasMore = true;
+ }
+
+ void increaseLimit() {
+ if (limit == 0) {
+ return; // no-limit mode: never adjust
+ }
+ final int increased = Math.max((limit * 5) / 4, limit + 4);
+ limit = maxLimit > 0 ? Math.min(maxLimit, increased) : increased;
+ }
+
+ void decreaseLimit() {
+ if (limit == 0) {
+ return; // no-limit mode: never adjust
+ }
+ limit = Math.max(1, (lastProcessedCount * 9) / 10);
+ }
+ }
+
+ /**
+ * Create a new builder for a {@link ThrottledRetryingRunner}.
+ *
+ * @param database the database to open transactions against
+ * @param scheduledExecutor executor used for throttle delays between transactions
+ * @return a new builder
+ */
+ public static Builder builder(@Nonnull Database database,
+ @Nonnull ScheduledExecutorService scheduledExecutor) {
+ return new Builder(database, scheduledExecutor);
+ }
+
+ /**
+ * Builder for {@link ThrottledRetryingRunner}.
+ */
+ public static class Builder {
+ @Nonnull
+ private final Database database;
+ @Nonnull
+ private final ScheduledExecutorService scheduledExecutor;
+ private int maxItemsScannedPerSec = 0;
+ private int maxItemsDeletedPerSec = 0;
+ private int numOfRetries = DEFAULT_NUM_RETRIES;
+ private boolean commitWhenDone = true;
+ private int increaseLimitAfter = DEFAULT_INCREASE_LIMIT_AFTER;
+ private int maxLimit = 0;
+
+ private Builder(@Nonnull Database database, @Nonnull ScheduledExecutorService scheduledExecutor) {
+ this.database = database;
+ this.scheduledExecutor = scheduledExecutor;
+ }
+
+ /**
+ * Set the maximum number of items that may be scanned (processed) per second.
+ * The runner will delay between transactions to stay at or below this rate.
+ * A value of {@code 0} (the default) disables scanned-item throttling.
+ *
+ * @param maxItemsScannedPerSec the rate limit; {@code 0} disables it
+ * @return this builder
+ */
+ public Builder withMaxItemsScannedPerSec(int maxItemsScannedPerSec) {
+ this.maxItemsScannedPerSec = Math.max(0, maxItemsScannedPerSec);
+ return this;
+ }
+
+ /**
+ * Set the maximum number of items that may be deleted per second.
+ * The runner will delay between transactions to stay at or below this rate.
+ * A value of {@code 0} (the default) disables deleted-item throttling.
+ *
+ * @param maxItemsDeletedPerSec the rate limit; {@code 0} disables it
+ * @return this builder
+ */
+ public Builder withMaxItemsDeletedPerSec(int maxItemsDeletedPerSec) {
+ this.maxItemsDeletedPerSec = Math.max(0, maxItemsDeletedPerSec);
+ return this;
+ }
+
+ /**
+ * Set the number of consecutive successful transactions required before the limit is
+ * increased. Defaults to {@value DEFAULT_INCREASE_LIMIT_AFTER}.
+ *
+ * @param increaseLimitAfter consecutive successes before a limit increase
+ * @return this builder
+ */
+ public Builder withIncreaseLimitAfter(int increaseLimitAfter) {
+ this.increaseLimitAfter = Math.max(1, increaseLimitAfter);
+ return this;
+ }
+
+ /**
+ * Set the initial and maximum per-transaction limit passed to the task via
+ * {@link QuotaManager#getLimit()}.
+ *
+ * The limit starts at this value. It will not be increased beyond {@code maxLimit} on
+ * success. If {@code maxLimit} is {@code 0} (the default) the limit feature is disabled:
+ * {@link QuotaManager#getLimit()} always returns {@code 0} and is never adjusted.
+ *
+ *
+ * @param maxLimit the initial and maximum limit; {@code 0} disables limit management
+ * @return this builder
+ */
+ public Builder withMaxLimit(int maxLimit) {
+ this.maxLimit = Math.max(0, maxLimit);
+ return this;
+ }
+
+ /**
+ * Set the maximum number of times to retry a failed transaction before giving up.
+ * The counter resets after each successful commit. Defaults to {@value DEFAULT_NUM_RETRIES}.
+ *
+ * @param numOfRetries maximum retries
+ * @return this builder
+ */
+ public Builder withNumOfRetries(int numOfRetries) {
+ this.numOfRetries = Math.max(0, numOfRetries);
+ return this;
+ }
+
+ /**
+ * Set whether to commit each transaction when the task returns.
+ * If {@code false}, transactions are rolled back instead of committed (useful for read-only tasks).
+ * Defaults to {@code true}.
+ *
+ * @param commitWhenDone {@code true} to commit, {@code false} to roll back
+ * @return this builder
+ */
+ public Builder withCommitWhenDone(boolean commitWhenDone) {
+ this.commitWhenDone = commitWhenDone;
+ return this;
+ }
+
+ /**
+ * Build the {@link ThrottledRetryingRunner}.
+ *
+ * @return a new runner
+ */
+ public ThrottledRetryingRunner build() {
+ return new ThrottledRetryingRunner(this);
+ }
+ }
+}
diff --git a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/TransactionalRunner.java b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/TransactionalRunner.java
new file mode 100644
index 0000000000..e35df9b81c
--- /dev/null
+++ b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/TransactionalRunner.java
@@ -0,0 +1,146 @@
+/*
+ * TransactionalRunner.java
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apple.foundationdb.test;
+
+import com.apple.foundationdb.Database;
+import com.apple.foundationdb.Transaction;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * A simple runner that opens and commits raw FDB transactions and ensures they are all closed on runner close.
+ *
+ * This is the raw-FDB analog of the record-layer {@code TransactionalRunner}. It operates at the
+ * {@link Database}/{@link Transaction} level, with no record-layer dependencies.
+ *
+ */
+public class TransactionalRunner implements AutoCloseable {
+
+ @Nonnull
+ private final Database database;
+ private boolean closed;
+ @Nonnull
+ private final List transactionsToClose;
+
+ public TransactionalRunner(@Nonnull Database database) {
+ this.database = database;
+ this.transactionsToClose = new ArrayList<>();
+ }
+
+ /**
+ * Run some code with a transaction, then commit it.
+ *
+ * The transaction will be committed if the future returned by the runnable completes successfully;
+ * otherwise it will not be committed. Either way, the transaction is closed when the future completes.
+ * If this runner is {@link #close() closed}, any open transaction will also be closed.
+ *
+ *
+ * @param runnable code to run with a fresh {@link Transaction}
+ * @param the type of value returned by the future
+ * @return a future containing the result, after the transaction has been committed
+ */
+ @Nonnull
+ @SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"})
+ public CompletableFuture runAsync(@Nonnull Function super Transaction, CompletableFuture extends T>> runnable) {
+ return runAsync(true, runnable);
+ }
+
+ /**
+ * A flavor of {@link #runAsync(Function)} that supports read-only (non-committing) transactions.
+ *
+ * @param commitWhenDone if {@code true} the transaction is committed on success; if {@code false} it is only closed
+ * @param runnable code to run with a fresh {@link Transaction}
+ * @param the type of value returned by the future
+ * @return a future containing the result; if {@code commitWhenDone} is {@code true}, after the transaction has been committed
+ */
+ @Nonnull
+ @SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"})
+ public CompletableFuture runAsync(boolean commitWhenDone,
+ @Nonnull Function super Transaction, CompletableFuture extends T>> runnable) {
+ Transaction transaction = openTransaction();
+ boolean returnedFuture = false;
+ try {
+ CompletableFuture future = runnable.apply(transaction)
+ .thenCompose(val -> {
+ if (commitWhenDone) {
+ return transaction.commit().thenApply(vignore -> val);
+ } else {
+ return CompletableFuture.completedFuture(val);
+ }
+ });
+ returnedFuture = true;
+ return future.whenComplete((result, exception) -> transaction.close());
+ } finally {
+ if (!returnedFuture) {
+ transaction.close();
+ }
+ }
+ }
+
+ /**
+ * Open a new transaction that will be closed when this runner is closed.
+ *
+ * @return a new {@link Transaction}
+ * @throws RunnerClosed if this runner has already been closed
+ */
+ @Nonnull
+ public Transaction openTransaction() {
+ if (closed) {
+ throw new RunnerClosed();
+ }
+ Transaction transaction = database.createTransaction();
+ addTransactionToClose(transaction);
+ return transaction;
+ }
+
+ private synchronized void addTransactionToClose(@Nonnull Transaction transaction) {
+ if (closed) {
+ transaction.close();
+ throw new RunnerClosed();
+ }
+ transactionsToClose.add(transaction);
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closed) {
+ return;
+ }
+ transactionsToClose.forEach(Transaction::close);
+ transactionsToClose.clear();
+ this.closed = true;
+ }
+
+ /**
+ * Exception thrown when an operation is attempted on a closed runner.
+ */
+ public static class RunnerClosed extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public RunnerClosed() {
+ super("Runner has been closed");
+ }
+ }
+}
From 85181edd9faf1405f8a6a7bb213e705a73c133c2 Mon Sep 17 00:00:00 2001
From: Scott Dugas
Date: Wed, 4 Mar 2026 17:03:59 -0500
Subject: [PATCH 2/9] Add some tests of ThrottledRetryingRunnerTest
---
.../test/ThrottledRetryingRunnerTest.java | 641 ++++++++++++++++++
.../test/ThrottledRetryingRunner.java | 8 +-
2 files changed, 644 insertions(+), 5 deletions(-)
create mode 100644 fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
new file mode 100644
index 0000000000..7a6b4cbee2
--- /dev/null
+++ b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
@@ -0,0 +1,641 @@
+/*
+ * ThrottledRetryingRunnerTest.java
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apple.foundationdb.test;
+
+import com.apple.foundationdb.Database;
+import com.apple.foundationdb.Transaction;
+import com.apple.foundationdb.subspace.Subspace;
+import com.apple.foundationdb.tuple.Tuple;
+import com.apple.test.Tags;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.catchThrowableOfType;
+
+@Tag(Tags.RequiresFDB)
+class ThrottledRetryingRunnerTest {
+ @RegisterExtension
+ static final TestDatabaseExtension dbExtension = new TestDatabaseExtension();
+ @RegisterExtension
+ TestSubspaceExtension subspaceExtension = new TestSubspaceExtension(dbExtension);
+
+ private Database db;
+ private Subspace subspace;
+ private ScheduledExecutorService scheduledExecutor;
+
+ @BeforeEach
+ void setUp() {
+ db = dbExtension.getDatabase();
+ subspace = subspaceExtension.getSubspace();
+ scheduledExecutor = new ScheduledThreadPoolExecutor(1,
+ new ThreadFactoryBuilder().setNameFormat("throttled-runner-test-%d").build());
+ }
+
+ @AfterEach
+ void tearDown() {
+ scheduledExecutor.shutdown();
+ }
+
+ // -------------------------------------------------------------------------
+ // throttlePerSecDelayMillis static helper
+ // -------------------------------------------------------------------------
+
+ @ParameterizedTest
+ @CsvSource({
+ // elapsedMs, maxPerSec, eventCount, expectedDelayMs
+ "1000, 100, 0, 0", // no events: no delay
+ "1000, 100, 100, 0", // exactly right rate: no delay
+ "1000, 100, 80, 0", // below rate: no delay
+ "1000, 100, 200, 1000", // double the rate: 1s delay
+ " 250, 50, 100, 1750", // 100 events should take 2s, only 250ms elapsed
+ " 1, 50, 100, 1999", // nearly instant: almost full 2s delay
+ " 0, 100, 0, 0", // zero events, no limit
+ "1000, 0, 999, 0", // maxPerSec=0 means disabled
+ })
+ void testThrottlePerSecDelayMillis(long elapsedMs, int maxPerSec, int eventCount, long expectedDelay) {
+ assertThat(ThrottledRetryingRunner.throttlePerSecDelayMillis(elapsedMs, maxPerSec, eventCount))
+ .isEqualTo(expectedDelay);
+ }
+
+ // -------------------------------------------------------------------------
+ // QuotaManager limit adjustment
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testIncreaseLimitFormula() {
+ // limit=0 is no-limit mode; increaseLimit is a no-op
+ ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(0);
+ qm.increaseLimit();
+ assertThat(qm.getLimit()).isEqualTo(0);
+
+ // With a real maxLimit, decreaseLimit reads processedCount directly (before initTransaction)
+ ThrottledRetryingRunner.QuotaManager qm2 = new ThrottledRetryingRunner.QuotaManager(1000);
+ qm2.processedCountAdd(80);
+ qm2.decreaseLimit(); // limit = max(1, 80*9/10) = 72
+ assertThat(qm2.getLimit()).isEqualTo(72);
+ qm2.increaseLimit(); // limit = max(72*5/4, 72+4) = max(90, 76) = 90
+ assertThat(qm2.getLimit()).isEqualTo(90);
+ }
+
+ @Test
+ void testDecreaseLimitFormula() {
+ ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(1000);
+ // Process 100 items, then fail → limit = max(1, 100*9/10) = 90
+ qm.processedCountAdd(100);
+ qm.decreaseLimit();
+ assertThat(qm.getLimit()).isEqualTo(90);
+
+ // Process 10 items, fail again → max(1, 10*9/10) = 9
+ qm.initTransaction();
+ qm.processedCountAdd(10);
+ qm.decreaseLimit();
+ assertThat(qm.getLimit()).isEqualTo(9);
+
+ // Decrease floors at 1
+ qm.initTransaction();
+ qm.processedCountInc();
+ qm.decreaseLimit();
+ assertThat(qm.getLimit()).isEqualTo(1);
+
+ // Zero processed also floors at 1
+ qm.initTransaction();
+ qm.decreaseLimit();
+ assertThat(qm.getLimit()).isEqualTo(1);
+ }
+
+ @Test
+ void testDecreaseLimitNoOpWhenLimitIsZero() {
+ ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(0);
+ qm.processedCountAdd(100);
+ qm.initTransaction();
+ qm.decreaseLimit();
+ assertThat(qm.getLimit()).isEqualTo(0);
+ }
+
+ @Test
+ void testLimitCappedAtMaxLimit() {
+ ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(50);
+ // Drive limit down by simulating a failure with 40 in-flight items
+ qm.processedCountAdd(40);
+ qm.decreaseLimit(); // limit = max(1, 40*9/10) = 36
+ // Keep increasing until we hit the cap
+ for (int i = 0; i < 100; i++) {
+ qm.increaseLimit();
+ }
+ assertThat(qm.getLimit()).isEqualTo(50); // capped at maxLimit
+ }
+
+ // -------------------------------------------------------------------------
+ // Basic iteration: task runs the expected number of times
+ // -------------------------------------------------------------------------
+
+ @Test
+ void taskRunsUntilExhausted() {
+ final int iterations = 5;
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota) -> {
+ int n = callCount.incrementAndGet();
+ if (n >= iterations) {
+ quota.markExhausted();
+ }
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+
+ assertThat(callCount.get()).isEqualTo(iterations);
+ }
+
+ @Test
+ void taskExhaustedImmediatelyRunsOnce() {
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota) -> {
+ callCount.incrementAndGet();
+ quota.markExhausted();
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+
+ assertThat(callCount.get()).isEqualTo(1);
+ }
+
+ // -------------------------------------------------------------------------
+ // cannotRunWhenClosed
+ // -------------------------------------------------------------------------
+
+ @Test
+ void cannotRunWhenClosed() {
+ ThrottledRetryingRunner runner = runnerBuilder().build();
+ runner.iterateAll((tr, quota) -> {
+ quota.markExhausted();
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ runner.close();
+
+ assertThatThrownBy(() ->
+ runner.iterateAll((tr, quota) -> {
+ quota.markExhausted();
+ return CompletableFuture.completedFuture(null);
+ }).join())
+ .hasCauseInstanceOf(TransactionalRunner.RunnerClosed.class);
+ }
+
+ // -------------------------------------------------------------------------
+ // Retries on failure
+ // -------------------------------------------------------------------------
+
+ @ParameterizedTest
+ @ValueSource(ints = {0, 1, 3, 10})
+ void retriesCorrectNumberOfTimes(int numRetries) {
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withNumOfRetries(numRetries)
+ .build()) {
+ Throwable ex = catchThrowableOfType(RuntimeException.class, () ->
+ runner.iterateAll((tr, quota) -> {
+ callCount.incrementAndGet();
+ return CompletableFuture.failedFuture(new RuntimeException("always fails"));
+ }).join());
+
+ assertThat(ex).isNotNull();
+ assertThat(ex.getMessage()).contains("always fails");
+ }
+
+ // First attempt + numRetries retries
+ assertThat(callCount.get()).isEqualTo(numRetries + 1);
+ }
+
+ @Test
+ void retryCounterResetsAfterSuccess() {
+ // Fail twice, succeed, then fail twice again — should not exhaust the retry budget
+ // because the counter resets on each success.
+ AtomicInteger totalCalls = new AtomicInteger(0);
+ AtomicInteger successCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withNumOfRetries(2)
+ .build()) {
+ runner.iterateAll((tr, quota) -> {
+ int call = totalCalls.incrementAndGet();
+ // Fail on calls 1 and 2
+ if (call == 1 || call == 2) {
+ return CompletableFuture.failedFuture(new RuntimeException("transient"));
+ }
+ // Succeed on call 3 (resets counter), then fail on 4 and 5, succeed on 6, done
+ if (call == 4 || call == 5) {
+ return CompletableFuture.failedFuture(new RuntimeException("transient"));
+ }
+ successCount.incrementAndGet();
+ if (successCount.get() == 2) {
+ quota.markExhausted();
+ }
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+
+ assertThat(successCount.get()).isEqualTo(2);
+ }
+
+ @Test
+ void runnerClosedDuringIterationAbortsImmediately() {
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ ThrottledRetryingRunner runner = runnerBuilder().withNumOfRetries(100).build();
+ runner.close(); // close before iterating
+
+ assertThatThrownBy(() ->
+ runner.iterateAll((tr, quota) -> {
+ callCount.incrementAndGet();
+ quota.markExhausted();
+ return CompletableFuture.completedFuture(null);
+ }).join())
+ .hasCauseInstanceOf(TransactionalRunner.RunnerClosed.class);
+
+ // iterateAll checks closed before starting, so the task is never called
+ assertThat(callCount.get()).isEqualTo(0);
+ }
+
+ // -------------------------------------------------------------------------
+ // Counts: processedCount and deletedCount
+ // -------------------------------------------------------------------------
+
+ @Test
+ void countsAreResetEachTransaction() {
+ // Each transaction should see processedCount starting at 0
+ List observedCounts = new ArrayList<>();
+ AtomicInteger totalCalls = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota) -> {
+ observedCounts.add(quota.getProcessedCount()); // always 0 at start
+ quota.processedCountAdd(10);
+ if (totalCalls.incrementAndGet() >= 3) {
+ quota.markExhausted();
+ }
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+
+ assertThat(observedCounts).containsExactly(0, 0, 0);
+ }
+
+ @Test
+ void processedAndDeletedCountsAreIndependent() {
+ AtomicInteger observedProcessed = new AtomicInteger(-1);
+ AtomicInteger observedDeleted = new AtomicInteger(-1);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota) -> {
+ quota.processedCountAdd(7);
+ quota.deletedCountAdd(3);
+ observedProcessed.set(quota.getProcessedCount());
+ observedDeleted.set(quota.getDeletedCount());
+ quota.markExhausted();
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+
+ assertThat(observedProcessed.get()).isEqualTo(7);
+ assertThat(observedDeleted.get()).isEqualTo(3);
+ }
+
+ // -------------------------------------------------------------------------
+ // Limit: adaptive adjustment
+ // -------------------------------------------------------------------------
+
+ @Test
+ void limitStartsAtMaxLimitAndIsExposedToTask() {
+ AtomicInteger observedLimit = new AtomicInteger(-1);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withMaxLimit(50)
+ .build()) {
+ runner.iterateAll((tr, quota) -> {
+ observedLimit.set(quota.getLimit());
+ quota.markExhausted();
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+
+ assertThat(observedLimit.get()).isEqualTo(50);
+ }
+
+ @Test
+ void limitIsZeroWhenMaxLimitNotConfigured() {
+ AtomicInteger observedLimit = new AtomicInteger(-1);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota) -> {
+ observedLimit.set(quota.getLimit());
+ quota.markExhausted();
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+
+ assertThat(observedLimit.get()).isEqualTo(0);
+ }
+
+ @Test
+ void limitDecreasesOnFailureThenIncreasesOnSuccess() {
+ // On failure: limit = max(1, processedCount * 9/10)
+ // On success: after increaseLimitAfter consecutive successes, limit increases
+ AtomicInteger callCount = new AtomicInteger(0);
+ List observedLimits = new ArrayList<>();
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withMaxLimit(200)
+ .withIncreaseLimitAfter(2) // increase after 2 successes to keep the test short
+ .withNumOfRetries(10)
+ .build()) {
+ runner.iterateAll((tr, quota) -> {
+ int call = callCount.incrementAndGet();
+ observedLimits.add(quota.getLimit());
+
+ if (call == 1) {
+ // Process 100 items and fail → next limit = 90
+ quota.processedCountAdd(100);
+ return CompletableFuture.failedFuture(new RuntimeException("fail"));
+ }
+ if (call <= 3) {
+ // Two successes → limit increases from 90
+ quota.processedCountInc();
+ return CompletableFuture.completedFuture(null); // keep going
+ }
+ quota.markExhausted();
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+
+ // call 1: limit=200 (initial), fails with 100 processed → new limit = 90
+ assertThat(observedLimits.get(0)).isEqualTo(200);
+ // call 2: limit=90
+ assertThat(observedLimits.get(1)).isEqualTo(90);
+ // call 3: limit=90 still (need 2 consecutive successes before increase)
+ assertThat(observedLimits.get(2)).isEqualTo(90);
+ // call 4: after 2 successes, limit increased: max(90*5/4, 90+4) = max(112,94) = 112
+ assertThat(observedLimits.get(3)).isEqualTo(112);
+ }
+
+ @Test
+ void limitDecreasesProgressivelyTowardOne() {
+ // Each failure with very few processed items should converge the limit toward 1
+ AtomicInteger callCount = new AtomicInteger(0);
+ List observedLimits = new ArrayList<>();
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withMaxLimit(1000)
+ .withNumOfRetries(20)
+ .build()) {
+ runner.iterateAll((tr, quota) -> {
+ int call = callCount.incrementAndGet();
+ observedLimits.add(quota.getLimit());
+ if (call <= 10) {
+ // Process 1 item each time → limit = max(1, 1*9/10) = 1 after first decrease
+ quota.processedCountInc();
+ return CompletableFuture.failedFuture(new RuntimeException("fail"));
+ }
+ quota.markExhausted();
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+
+ // After first failure with 1 processed: max(1, 0) = 1
+ assertThat(observedLimits.get(1)).isEqualTo(1);
+ // Stays at 1 regardless of further failures
+ for (int i = 2; i < 10; i++) {
+ assertThat(observedLimits.get(i)).isEqualTo(1);
+ }
+ }
+
+ @Test
+ void limitIsNeverAdjustedWhenMaxLimitIsZero() {
+ AtomicInteger callCount = new AtomicInteger(0);
+ List observedLimits = new ArrayList<>();
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withNumOfRetries(5)
+ // no withMaxLimit → limit stays 0
+ .build()) {
+ runner.iterateAll((tr, quota) -> {
+ int call = callCount.incrementAndGet();
+ observedLimits.add(quota.getLimit());
+ quota.processedCountAdd(100);
+ if (call <= 3) {
+ return CompletableFuture.failedFuture(new RuntimeException("fail"));
+ }
+ quota.markExhausted();
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+
+ assertThat(observedLimits).containsOnly(0);
+ }
+
+ // -------------------------------------------------------------------------
+ // commitWhenDone: writes are visible (commit) or not (no commit)
+ // -------------------------------------------------------------------------
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void commitWhenDoneControlsWhetherWritesArePersisted(boolean commitWhenDone) {
+ byte[] key = subspace.pack(Tuple.from("commit-test"));
+ byte[] value = "hello".getBytes();
+
+ // Write a key inside iterateAll
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withCommitWhenDone(commitWhenDone)
+ .build()) {
+ runner.iterateAll((tr, quota) -> {
+ tr.set(key, value);
+ quota.markExhausted();
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+
+ // Check whether the key is actually present
+ byte[] read = db.run(tr -> tr.get(key).join());
+ if (commitWhenDone) {
+ assertThat(read).isEqualTo(value);
+ } else {
+ assertThat(read).isNull();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Each transaction gets a fresh Transaction object
+ // -------------------------------------------------------------------------
+
+ @Test
+ void eachIterationReceivesADistinctTransaction() {
+ List seenTransactions = new ArrayList<>();
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota) -> {
+ seenTransactions.add(tr);
+ if (callCount.incrementAndGet() >= 3) {
+ quota.markExhausted();
+ }
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+
+ assertThat(seenTransactions).hasSize(3);
+ // All three must be distinct objects
+ assertThat(seenTransactions.get(0)).isNotSameAs(seenTransactions.get(1));
+ assertThat(seenTransactions.get(1)).isNotSameAs(seenTransactions.get(2));
+ }
+
+ // -------------------------------------------------------------------------
+ // Rate limiting: elapsed time grows when throttling is applied
+ // -------------------------------------------------------------------------
+
+ @Test
+ void throttlingByScannedItemsSlowsItDown() {
+ // 3 transactions each processing 100 items at max 50/sec → each batch should take ≥2s
+ // Use a low count to keep the test fast: 3 transactions × 10 items at max 20/sec = ≥1.5s
+ final int itemsPerTransaction = 10;
+ final int maxPerSec = 20;
+ final int transactions = 3;
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ long start = System.currentTimeMillis();
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withMaxItemsScannedPerSec(maxPerSec)
+ .build()) {
+ runner.iterateAll((tr, quota) -> {
+ quota.processedCountAdd(itemsPerTransaction);
+ if (callCount.incrementAndGet() >= transactions) {
+ quota.markExhausted();
+ }
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+ long elapsed = System.currentTimeMillis() - start;
+
+ // totalItems / maxPerSec = (3*10)/20 = 1.5s minimum
+ long expectedMinMs = TimeUnit.SECONDS.toMillis(itemsPerTransaction * transactions / maxPerSec);
+ assertThat(elapsed).isGreaterThanOrEqualTo(expectedMinMs);
+ }
+
+ @Test
+ void throttlingByDeletedItemsSlowsItDown() {
+ final int deletesPerTransaction = 10;
+ final int maxPerSec = 20;
+ final int transactions = 3;
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ long start = System.currentTimeMillis();
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withMaxItemsDeletedPerSec(maxPerSec)
+ .build()) {
+ runner.iterateAll((tr, quota) -> {
+ quota.deletedCountAdd(deletesPerTransaction);
+ if (callCount.incrementAndGet() >= transactions) {
+ quota.markExhausted();
+ }
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+ long elapsed = System.currentTimeMillis() - start;
+
+ long expectedMinMs = TimeUnit.SECONDS.toMillis(deletesPerTransaction * transactions / maxPerSec);
+ assertThat(elapsed).isGreaterThanOrEqualTo(expectedMinMs);
+ }
+
+ @Test
+ void noThrottlingWithZeroRateLimit() {
+ // With no rate limit configured the runner should complete nearly instantly
+ // (well under 1 second for trivial work). We just assert it completes normally.
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota) -> {
+ quota.processedCountAdd(1000); // lots of items but no limit configured
+ if (callCount.incrementAndGet() >= 5) {
+ quota.markExhausted();
+ }
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+
+ assertThat(callCount.get()).isEqualTo(5);
+ }
+
+ // -------------------------------------------------------------------------
+ // Real DB writes: data persisted across transactions within one iterateAll
+ // -------------------------------------------------------------------------
+
+ @Test
+ void dataWrittenInEachTransactionIsVisible() {
+ // Write one key per transaction, confirm all are present at the end
+ final int numTransactions = 5;
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota) -> {
+ int n = callCount.incrementAndGet();
+ tr.set(subspace.pack(Tuple.from(n)), Tuple.from(n).pack());
+ if (n >= numTransactions) {
+ quota.markExhausted();
+ }
+ return CompletableFuture.completedFuture(null);
+ }).join();
+ }
+
+ db.run(tr -> {
+ for (int i = 1; i <= numTransactions; i++) {
+ byte[] val = tr.get(subspace.pack(Tuple.from(i))).join();
+ assertThat(val).isEqualTo(Tuple.from(i).pack());
+ }
+ return null;
+ });
+ }
+
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+
+ private ThrottledRetryingRunner.Builder runnerBuilder() {
+ return ThrottledRetryingRunner.builder(db, scheduledExecutor);
+ }
+}
diff --git a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java
index 1a6cdb3d2d..c0131f421e 100644
--- a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java
+++ b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java
@@ -227,13 +227,11 @@ public static class QuotaManager {
private int deletedCount;
private boolean hasMore;
private int limit;
- private int lastProcessedCount;
private final int maxLimit;
QuotaManager(int maxLimit) {
this.maxLimit = maxLimit;
- this.limit = 0;
- this.lastProcessedCount = 0;
+ this.limit = maxLimit;
}
/**
@@ -302,7 +300,6 @@ public void markExhausted() {
}
void initTransaction() {
- lastProcessedCount = processedCount;
processedCount = 0;
deletedCount = 0;
hasMore = true;
@@ -320,7 +317,8 @@ void decreaseLimit() {
if (limit == 0) {
return; // no-limit mode: never adjust
}
- limit = Math.max(1, (lastProcessedCount * 9) / 10);
+ // Use the count from the transaction that just failed (before the next initTransaction resets it)
+ limit = Math.max(1, (processedCount * 9) / 10);
}
}
From 880cb58914d3f98d861cffadbdd3492fc79367d4 Mon Sep 17 00:00:00 2001
From: Scott Dugas
Date: Wed, 4 Mar 2026 17:26:31 -0500
Subject: [PATCH 3/9] Change ThrottledRetryingRunner to return a continuation
---
.../test/ThrottledRetryingRunnerTest.java | 293 +++++++++++++-----
.../test/ThrottledRetryingRunner.java | 188 +++++++----
2 files changed, 343 insertions(+), 138 deletions(-)
diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
index 7a6b4cbee2..215f60944a 100644
--- a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
+++ b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
@@ -42,6 +42,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -170,12 +171,11 @@ void taskRunsUntilExhausted() {
AtomicInteger callCount = new AtomicInteger(0);
try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
- runner.iterateAll((tr, quota) -> {
- int n = callCount.incrementAndGet();
- if (n >= iterations) {
- quota.markExhausted();
+ runner.iterateAll((tr, quota, cont) -> {
+ if (callCount.incrementAndGet() >= iterations) {
+ return exhausted();
}
- return CompletableFuture.completedFuture(null);
+ return hasMore();
}).join();
}
@@ -187,10 +187,9 @@ void taskExhaustedImmediatelyRunsOnce() {
AtomicInteger callCount = new AtomicInteger(0);
try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
callCount.incrementAndGet();
- quota.markExhausted();
- return CompletableFuture.completedFuture(null);
+ return exhausted();
}).join();
}
@@ -204,17 +203,11 @@ void taskExhaustedImmediatelyRunsOnce() {
@Test
void cannotRunWhenClosed() {
ThrottledRetryingRunner runner = runnerBuilder().build();
- runner.iterateAll((tr, quota) -> {
- quota.markExhausted();
- return CompletableFuture.completedFuture(null);
- }).join();
+ runner.iterateAll((tr, quota, cont) -> exhausted()).join();
runner.close();
assertThatThrownBy(() ->
- runner.iterateAll((tr, quota) -> {
- quota.markExhausted();
- return CompletableFuture.completedFuture(null);
- }).join())
+ runner.iterateAll((tr, quota, cont) -> exhausted()).join())
.hasCauseInstanceOf(TransactionalRunner.RunnerClosed.class);
}
@@ -231,7 +224,7 @@ void retriesCorrectNumberOfTimes(int numRetries) {
.withNumOfRetries(numRetries)
.build()) {
Throwable ex = catchThrowableOfType(RuntimeException.class, () ->
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
callCount.incrementAndGet();
return CompletableFuture.failedFuture(new RuntimeException("always fails"));
}).join());
@@ -254,7 +247,7 @@ void retryCounterResetsAfterSuccess() {
try (ThrottledRetryingRunner runner = runnerBuilder()
.withNumOfRetries(2)
.build()) {
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
int call = totalCalls.incrementAndGet();
// Fail on calls 1 and 2
if (call == 1 || call == 2) {
@@ -266,9 +259,9 @@ void retryCounterResetsAfterSuccess() {
}
successCount.incrementAndGet();
if (successCount.get() == 2) {
- quota.markExhausted();
+ return exhausted();
}
- return CompletableFuture.completedFuture(null);
+ return hasMore();
}).join();
}
@@ -283,10 +276,9 @@ void runnerClosedDuringIterationAbortsImmediately() {
runner.close(); // close before iterating
assertThatThrownBy(() ->
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
callCount.incrementAndGet();
- quota.markExhausted();
- return CompletableFuture.completedFuture(null);
+ return exhausted();
}).join())
.hasCauseInstanceOf(TransactionalRunner.RunnerClosed.class);
@@ -305,13 +297,13 @@ void countsAreResetEachTransaction() {
AtomicInteger totalCalls = new AtomicInteger(0);
try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
observedCounts.add(quota.getProcessedCount()); // always 0 at start
quota.processedCountAdd(10);
if (totalCalls.incrementAndGet() >= 3) {
- quota.markExhausted();
+ return exhausted();
}
- return CompletableFuture.completedFuture(null);
+ return hasMore();
}).join();
}
@@ -324,13 +316,12 @@ void processedAndDeletedCountsAreIndependent() {
AtomicInteger observedDeleted = new AtomicInteger(-1);
try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
quota.processedCountAdd(7);
quota.deletedCountAdd(3);
observedProcessed.set(quota.getProcessedCount());
observedDeleted.set(quota.getDeletedCount());
- quota.markExhausted();
- return CompletableFuture.completedFuture(null);
+ return exhausted();
}).join();
}
@@ -349,10 +340,9 @@ void limitStartsAtMaxLimitAndIsExposedToTask() {
try (ThrottledRetryingRunner runner = runnerBuilder()
.withMaxLimit(50)
.build()) {
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
observedLimit.set(quota.getLimit());
- quota.markExhausted();
- return CompletableFuture.completedFuture(null);
+ return exhausted();
}).join();
}
@@ -364,10 +354,9 @@ void limitIsZeroWhenMaxLimitNotConfigured() {
AtomicInteger observedLimit = new AtomicInteger(-1);
try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
observedLimit.set(quota.getLimit());
- quota.markExhausted();
- return CompletableFuture.completedFuture(null);
+ return exhausted();
}).join();
}
@@ -376,8 +365,6 @@ void limitIsZeroWhenMaxLimitNotConfigured() {
@Test
void limitDecreasesOnFailureThenIncreasesOnSuccess() {
- // On failure: limit = max(1, processedCount * 9/10)
- // On success: after increaseLimitAfter consecutive successes, limit increases
AtomicInteger callCount = new AtomicInteger(0);
List observedLimits = new ArrayList<>();
@@ -386,7 +373,7 @@ void limitDecreasesOnFailureThenIncreasesOnSuccess() {
.withIncreaseLimitAfter(2) // increase after 2 successes to keep the test short
.withNumOfRetries(10)
.build()) {
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
int call = callCount.incrementAndGet();
observedLimits.add(quota.getLimit());
@@ -398,10 +385,9 @@ void limitDecreasesOnFailureThenIncreasesOnSuccess() {
if (call <= 3) {
// Two successes → limit increases from 90
quota.processedCountInc();
- return CompletableFuture.completedFuture(null); // keep going
+ return hasMore();
}
- quota.markExhausted();
- return CompletableFuture.completedFuture(null);
+ return exhausted();
}).join();
}
@@ -417,7 +403,6 @@ void limitDecreasesOnFailureThenIncreasesOnSuccess() {
@Test
void limitDecreasesProgressivelyTowardOne() {
- // Each failure with very few processed items should converge the limit toward 1
AtomicInteger callCount = new AtomicInteger(0);
List observedLimits = new ArrayList<>();
@@ -425,20 +410,18 @@ void limitDecreasesProgressivelyTowardOne() {
.withMaxLimit(1000)
.withNumOfRetries(20)
.build()) {
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
int call = callCount.incrementAndGet();
observedLimits.add(quota.getLimit());
if (call <= 10) {
- // Process 1 item each time → limit = max(1, 1*9/10) = 1 after first decrease
- quota.processedCountInc();
+ quota.processedCountInc(); // 1 item processed → limit floor at 1
return CompletableFuture.failedFuture(new RuntimeException("fail"));
}
- quota.markExhausted();
- return CompletableFuture.completedFuture(null);
+ return exhausted();
}).join();
}
- // After first failure with 1 processed: max(1, 0) = 1
+ // After first failure with 1 processed: max(1, 1*9/10=0) = 1
assertThat(observedLimits.get(1)).isEqualTo(1);
// Stays at 1 regardless of further failures
for (int i = 2; i < 10; i++) {
@@ -455,15 +438,14 @@ void limitIsNeverAdjustedWhenMaxLimitIsZero() {
.withNumOfRetries(5)
// no withMaxLimit → limit stays 0
.build()) {
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
int call = callCount.incrementAndGet();
observedLimits.add(quota.getLimit());
quota.processedCountAdd(100);
if (call <= 3) {
return CompletableFuture.failedFuture(new RuntimeException("fail"));
}
- quota.markExhausted();
- return CompletableFuture.completedFuture(null);
+ return exhausted();
}).join();
}
@@ -480,18 +462,15 @@ void commitWhenDoneControlsWhetherWritesArePersisted(boolean commitWhenDone) {
byte[] key = subspace.pack(Tuple.from("commit-test"));
byte[] value = "hello".getBytes();
- // Write a key inside iterateAll
try (ThrottledRetryingRunner runner = runnerBuilder()
.withCommitWhenDone(commitWhenDone)
.build()) {
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
tr.set(key, value);
- quota.markExhausted();
- return CompletableFuture.completedFuture(null);
+ return exhausted();
}).join();
}
- // Check whether the key is actually present
byte[] read = db.run(tr -> tr.get(key).join());
if (commitWhenDone) {
assertThat(read).isEqualTo(value);
@@ -510,29 +489,26 @@ void eachIterationReceivesADistinctTransaction() {
AtomicInteger callCount = new AtomicInteger(0);
try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
seenTransactions.add(tr);
if (callCount.incrementAndGet() >= 3) {
- quota.markExhausted();
+ return exhausted();
}
- return CompletableFuture.completedFuture(null);
+ return hasMore();
}).join();
}
assertThat(seenTransactions).hasSize(3);
- // All three must be distinct objects
assertThat(seenTransactions.get(0)).isNotSameAs(seenTransactions.get(1));
assertThat(seenTransactions.get(1)).isNotSameAs(seenTransactions.get(2));
}
// -------------------------------------------------------------------------
- // Rate limiting: elapsed time grows when throttling is applied
+ // Rate limiting
// -------------------------------------------------------------------------
@Test
void throttlingByScannedItemsSlowsItDown() {
- // 3 transactions each processing 100 items at max 50/sec → each batch should take ≥2s
- // Use a low count to keep the test fast: 3 transactions × 10 items at max 20/sec = ≥1.5s
final int itemsPerTransaction = 10;
final int maxPerSec = 20;
final int transactions = 3;
@@ -542,12 +518,12 @@ void throttlingByScannedItemsSlowsItDown() {
try (ThrottledRetryingRunner runner = runnerBuilder()
.withMaxItemsScannedPerSec(maxPerSec)
.build()) {
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
quota.processedCountAdd(itemsPerTransaction);
if (callCount.incrementAndGet() >= transactions) {
- quota.markExhausted();
+ return exhausted();
}
- return CompletableFuture.completedFuture(null);
+ return hasMore();
}).join();
}
long elapsed = System.currentTimeMillis() - start;
@@ -568,12 +544,12 @@ void throttlingByDeletedItemsSlowsItDown() {
try (ThrottledRetryingRunner runner = runnerBuilder()
.withMaxItemsDeletedPerSec(maxPerSec)
.build()) {
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
quota.deletedCountAdd(deletesPerTransaction);
if (callCount.incrementAndGet() >= transactions) {
- quota.markExhausted();
+ return exhausted();
}
- return CompletableFuture.completedFuture(null);
+ return hasMore();
}).join();
}
long elapsed = System.currentTimeMillis() - start;
@@ -584,17 +560,15 @@ void throttlingByDeletedItemsSlowsItDown() {
@Test
void noThrottlingWithZeroRateLimit() {
- // With no rate limit configured the runner should complete nearly instantly
- // (well under 1 second for trivial work). We just assert it completes normally.
AtomicInteger callCount = new AtomicInteger(0);
try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
- runner.iterateAll((tr, quota) -> {
- quota.processedCountAdd(1000); // lots of items but no limit configured
+ runner.iterateAll((tr, quota, cont) -> {
+ quota.processedCountAdd(1000);
if (callCount.incrementAndGet() >= 5) {
- quota.markExhausted();
+ return exhausted();
}
- return CompletableFuture.completedFuture(null);
+ return hasMore();
}).join();
}
@@ -602,23 +576,22 @@ void noThrottlingWithZeroRateLimit() {
}
// -------------------------------------------------------------------------
- // Real DB writes: data persisted across transactions within one iterateAll
+ // Real DB writes
// -------------------------------------------------------------------------
@Test
void dataWrittenInEachTransactionIsVisible() {
- // Write one key per transaction, confirm all are present at the end
final int numTransactions = 5;
AtomicInteger callCount = new AtomicInteger(0);
try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
- runner.iterateAll((tr, quota) -> {
+ runner.iterateAll((tr, quota, cont) -> {
int n = callCount.incrementAndGet();
tr.set(subspace.pack(Tuple.from(n)), Tuple.from(n).pack());
if (n >= numTransactions) {
- quota.markExhausted();
+ return exhausted();
}
- return CompletableFuture.completedFuture(null);
+ return hasMore();
}).join();
}
@@ -631,11 +604,169 @@ void dataWrittenInEachTransactionIsVisible() {
});
}
+ // -------------------------------------------------------------------------
+ // Continuation passing
+ // -------------------------------------------------------------------------
+
+ @Test
+ void firstCallReceivesStartContinuation() {
+ AtomicReference observed = new AtomicReference<>();
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ observed.set(cont);
+ return exhausted();
+ }).join();
+ }
+
+ assertThat(observed.get()).isInstanceOf(ThrottledRetryingRunner.StartContinuation.class);
+ assertThat(observed.get()).isSameAs(ThrottledRetryingRunner.StartContinuation.INSTANCE);
+ }
+
+ @Test
+ void continuationFromSuccessIsPassedToNextCall() {
+ // Each transaction returns a custom continuation object; the next call should receive it.
+ List received = new ArrayList<>();
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ // Distinct continuation objects for each transaction
+ ThrottledRetryingRunner.Continuation cont1 = () -> true;
+ ThrottledRetryingRunner.Continuation cont2 = () -> true;
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ received.add(cont);
+ int call = callCount.incrementAndGet();
+ if (call == 1) {
+ return CompletableFuture.completedFuture(cont1);
+ }
+ if (call == 2) {
+ return CompletableFuture.completedFuture(cont2);
+ }
+ return exhausted();
+ }).join();
+ }
+
+ // call 1: receives StartContinuation
+ assertThat(received.get(0)).isInstanceOf(ThrottledRetryingRunner.StartContinuation.class);
+ // call 2: receives the continuation returned by call 1
+ assertThat(received.get(1)).isSameAs(cont1);
+ // call 3: receives the continuation returned by call 2
+ assertThat(received.get(2)).isSameAs(cont2);
+ }
+
+ @Test
+ void continuationIsNotAdvancedOnRetry() {
+ // When a transaction fails, the retry should receive the same continuation as the failed
+ // attempt — not a new one — so that the task can resume from the same position.
+ List received = new ArrayList<>();
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ ThrottledRetryingRunner.Continuation contAfterFirstSuccess = () -> true;
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withNumOfRetries(3)
+ .build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ received.add(cont);
+ int call = callCount.incrementAndGet();
+
+ if (call == 1) {
+ // First success: return a custom continuation
+ return CompletableFuture.completedFuture(contAfterFirstSuccess);
+ }
+ if (call == 2 || call == 3) {
+ // Fail twice: continuation should NOT advance
+ return CompletableFuture.failedFuture(new RuntimeException("transient"));
+ }
+ // Final success
+ return exhausted();
+ }).join();
+ }
+
+ // call 1: StartContinuation (first call)
+ assertThat(received.get(0)).isInstanceOf(ThrottledRetryingRunner.StartContinuation.class);
+ // call 2: contAfterFirstSuccess (first call succeeded)
+ assertThat(received.get(1)).isSameAs(contAfterFirstSuccess);
+ // call 3: still contAfterFirstSuccess — call 2 failed, continuation not advanced
+ assertThat(received.get(2)).isSameAs(contAfterFirstSuccess);
+ // call 4: still contAfterFirstSuccess — call 3 failed too
+ assertThat(received.get(3)).isSameAs(contAfterFirstSuccess);
+ }
+
+ @Test
+ void startContinuationReusedAcrossRetriesBeforeAnySuccess() {
+ // Before any successful transaction, all retries should receive StartContinuation.
+ List received = new ArrayList<>();
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withNumOfRetries(3)
+ .build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ received.add(cont);
+ if (callCount.incrementAndGet() <= 2) {
+ return CompletableFuture.failedFuture(new RuntimeException("transient"));
+ }
+ return exhausted();
+ }).join();
+ }
+
+ // All three calls should receive StartContinuation since no success ever happened
+ assertThat(received).hasSize(3);
+ assertThat(received).allMatch(c -> c instanceof ThrottledRetryingRunner.StartContinuation);
+ }
+
+ @Test
+ void continuationCarriesStateToNextTransaction() {
+ // A realistic scenario: the continuation carries a "cursor position" (last key seen).
+ // Each transaction processes one key and the next transaction picks up where it left off.
+ final int numKeys = 4;
+
+ // Write test data
+ db.run(tr -> {
+ for (int i = 0; i < numKeys; i++) {
+ tr.set(subspace.pack(Tuple.from(i)), Tuple.from(i * 10).pack());
+ }
+ return null;
+ });
+
+ // A continuation that carries the next index to read
+ class IndexContinuation implements ThrottledRetryingRunner.Continuation {
+ final int nextIndex;
+ IndexContinuation(int nextIndex) { this.nextIndex = nextIndex; }
+ @Override public boolean hasMore() { return nextIndex < numKeys; }
+ }
+
+ List processedValues = new ArrayList<>();
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ int idx = (cont instanceof IndexContinuation) ? ((IndexContinuation) cont).nextIndex : 0;
+ byte[] raw = tr.get(subspace.pack(Tuple.from(idx))).join();
+ processedValues.add((int) Tuple.fromBytes(raw).getLong(0));
+ return CompletableFuture.completedFuture(new IndexContinuation(idx + 1));
+ }).join();
+ }
+
+ assertThat(processedValues).containsExactly(0, 10, 20, 30);
+ }
+
// -------------------------------------------------------------------------
// Helpers
// -------------------------------------------------------------------------
+ /** Returns a completed future with a continuation indicating there is more work to do. */
+ private static CompletableFuture hasMore() {
+ return CompletableFuture.completedFuture(() -> true);
+ }
+
+ /** Returns a completed future with a continuation indicating the source is exhausted. */
+ private static CompletableFuture exhausted() {
+ return CompletableFuture.completedFuture(() -> false);
+ }
+
private ThrottledRetryingRunner.Builder runnerBuilder() {
return ThrottledRetryingRunner.builder(db, scheduledExecutor);
}
-}
+}
\ No newline at end of file
diff --git a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java
index c0131f421e..7bc9058814 100644
--- a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java
+++ b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java
@@ -32,35 +32,33 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Runs a task repeatedly across multiple transactions, with adaptive limit management, optional
* rate limiting, and retry logic.
*
- * The caller supplies a {@link BiFunction} that receives a fresh {@link Transaction} and a
- * {@link QuotaManager}, and returns a {@code CompletableFuture}. The task reads
- * {@link QuotaManager#getLimit()} to know how much work to attempt in this transaction, reports
- * how many items it processed via {@link QuotaManager#processedCountInc()} (and optionally
- * {@link QuotaManager#deletedCountInc()}), and signals completion by calling
- * {@link QuotaManager#markExhausted()}.
+ * The caller supplies a {@link Task} that receives a fresh {@link Transaction}, a
+ * {@link QuotaManager}, and the {@link Continuation} from the last successful
+ * transaction (or {@link StartContinuation} on the first call). The task returns a
+ * {@code CompletableFuture}; returning a continuation with
+ * {@link Continuation#hasMore()} {@code == false} stops the loop.
*
*
- * The runner adjusts the limit across transactions:
+ * On failure the continuation is not advanced — the last successful continuation is
+ * re-passed to the retry, so the task can resume from the same position. This distinguishes a
+ * retry (same continuation) from a fresh transaction after success (new continuation).
*
- *
- * - After {@code increaseLimitAfter} consecutive successes the limit is increased.
- * - After any failure the limit is decreased based on how many items were processed before
- * the failure, so the next attempt is less likely to exceed the transaction budget.
- *
*
- * A limit of {@code 0} is treated as "no limit" — the task may do as much work as it likes in
- * each transaction and the limit is never adjusted.
+ * The {@link QuotaManager} is reset at the start of each transaction. The task uses it to report
+ * how many items were processed ({@link QuotaManager#processedCountInc()}) and deleted
+ * ({@link QuotaManager#deletedCountInc()}). These counts drive the between-transaction throttle
+ * delay when {@code maxItemsScannedPerSec} or {@code maxItemsDeletedPerSec} are configured, and
+ * inform the adaptive limit adjustment on failure.
*
*
- * Between successful transactions the runner can optionally delay to enforce a maximum item
- * processing or deletion rate ({@code maxItemsScannedPerSec} / {@code maxItemsDeletedPerSec}).
- * On failure the transaction is retried up to {@code numOfRetries} times before failing.
+ * A limit of {@code 0} is treated as "no limit" — the task may do as much work as it likes in
+ * each transaction and the limit is never adjusted.
*
*/
public class ThrottledRetryingRunner implements AutoCloseable {
@@ -96,50 +94,60 @@ private ThrottledRetryingRunner(Builder builder) {
}
/**
- * Run the given task repeatedly across multiple transactions until it calls
- * {@link QuotaManager#markExhausted()}.
+ * Run the given task repeatedly across multiple transactions until it returns a
+ * {@link Continuation} with {@link Continuation#hasMore()} {@code == false}.
+ *
+ * On the very first call the task receives {@link StartContinuation#INSTANCE}. On each
+ * subsequent call after a successful commit it receives the continuation returned by the
+ * previous call. On a retry after failure it receives the same continuation that was passed
+ * to the failed attempt — i.e. the last successful continuation is preserved.
+ *
*
* A single {@link QuotaManager} is created for the lifetime of this call. Its per-transaction
- * counts ({@link QuotaManager#getProcessedCount()}, {@link QuotaManager#getDeletedCount()})
- * are reset at the start of each transaction. Its limit ({@link QuotaManager#getLimit()}) is
- * adjusted by the runner after each transaction and persists across them.
+ * counts are reset at the start of each transaction; its limit persists and is adjusted
+ * between transactions.
*
*
- * @param task the task to run; reads the limit, reports counts, calls
- * {@link QuotaManager#markExhausted()} to stop
- * @return a future that completes normally when the task exhausts the source, or exceptionally
- * if the retry limit is exceeded or the runner is closed
+ * @param task the task to run
+ * @return a future that completes normally when the task returns a continuation with
+ * {@link Continuation#hasMore()} {@code == false}, or exceptionally if the retry
+ * limit is exceeded or the runner is closed
*/
@Nonnull
- public CompletableFuture iterateAll(
- @Nonnull BiFunction super Transaction, QuotaManager, CompletableFuture> task) {
+ public CompletableFuture iterateAll(@Nonnull Task task) {
if (closed) {
return CompletableFuture.failedFuture(new TransactionalRunner.RunnerClosed());
}
final QuotaManager quotaManager = new QuotaManager(maxLimit);
+ final AtomicReference lastSuccessfulCont =
+ new AtomicReference<>(StartContinuation.INSTANCE);
return AsyncUtil.whileTrue(() ->
- runOneTransaction(task, quotaManager)
- .handle((ignored, ex) -> {
+ runOneTransaction(task, quotaManager, lastSuccessfulCont.get())
+ .handle((continuation, ex) -> {
if (ex == null) {
- return handleSuccess(quotaManager);
+ lastSuccessfulCont.set(continuation);
+ return handleSuccess(quotaManager, continuation);
}
return handleFailure(ex, quotaManager);
})
.thenCompose(ret -> ret));
}
- private CompletableFuture runOneTransaction(
- @Nonnull BiFunction super Transaction, QuotaManager, CompletableFuture> task,
- @Nonnull QuotaManager quotaManager) {
+ private CompletableFuture runOneTransaction(
+ @Nonnull Task task,
+ @Nonnull QuotaManager quotaManager,
+ @Nonnull Continuation continuation) {
transactionStartTimeMillis = System.currentTimeMillis();
return transactionalRunner.runAsync(commitWhenDone, transaction -> {
quotaManager.initTransaction();
- return task.apply(transaction, quotaManager);
+ return task.run(transaction, quotaManager, continuation);
});
}
- private CompletableFuture handleSuccess(QuotaManager quotaManager) {
+ private CompletableFuture handleSuccess(
+ @Nonnull QuotaManager quotaManager,
+ @Nonnull Continuation continuation) {
failureRetriesCounter = 0;
++consecutiveSuccessCount;
@@ -149,7 +157,7 @@ private CompletableFuture handleSuccess(QuotaManager quotaManager) {
consecutiveSuccessCount = 0;
}
- if (!quotaManager.hasMore) {
+ if (!continuation.hasMore()) {
return AsyncUtil.READY_FALSE;
}
@@ -166,7 +174,9 @@ private CompletableFuture handleSuccess(QuotaManager quotaManager) {
return AsyncUtil.READY_TRUE;
}
- private CompletableFuture handleFailure(Throwable ex, QuotaManager quotaManager) {
+ private CompletableFuture handleFailure(
+ @Nonnull Throwable ex,
+ @Nonnull QuotaManager quotaManager) {
++failureRetriesCounter;
consecutiveSuccessCount = 0;
@@ -208,9 +218,80 @@ public void close() {
transactionalRunner.close();
}
+ // -------------------------------------------------------------------------
+ // Continuation
+ // -------------------------------------------------------------------------
+
+ /**
+ * Represents the result of one transaction's work and signals whether the runner should
+ * start another transaction.
+ *
+ * Implementations are free to carry any state needed to resume work at the right position
+ * in the next transaction. The runner itself only inspects {@link #hasMore()}.
+ *
+ */
+ @FunctionalInterface
+ public interface Continuation {
+ /**
+ * Returns {@code true} if there is more work to do; {@code false} if the source has
+ * been exhausted and the loop should stop after committing the current transaction.
+ */
+ boolean hasMore();
+ }
+
+ /**
+ * The continuation passed to the task on the very first transaction.
+ *
+ * Tasks can detect the first call with {@code continuation instanceof StartContinuation}.
+ *
+ */
+ public static final class StartContinuation implements Continuation {
+ /** The singleton instance to pass on the first call. */
+ public static final StartContinuation INSTANCE = new StartContinuation();
+
+ private StartContinuation() {
+ }
+
+ @Override
+ public boolean hasMore() {
+ return true;
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Task
+ // -------------------------------------------------------------------------
+
/**
- * Tracks per-transaction resource usage, exposes the adaptive limit, and controls whether
- * the loop continues.
+ * A task that performs work inside a single transaction and returns a continuation.
+ */
+ @FunctionalInterface
+ public interface Task {
+ /**
+ * Run one transaction's worth of work.
+ *
+ * @param transaction the transaction for this attempt
+ * @param quotaManager tracks work done this transaction and exposes the current limit
+ * @param continuation the continuation from the last successful transaction,
+ * or {@link StartContinuation#INSTANCE} on the very first call;
+ * on a retry this is the same continuation that was passed to the
+ * failed attempt
+ * @return a future containing the continuation describing where the next transaction
+ * should resume; return a continuation with {@link Continuation#hasMore()}
+ * {@code == false} to stop the loop
+ */
+ @Nonnull
+ CompletableFuture run(@Nonnull Transaction transaction,
+ @Nonnull QuotaManager quotaManager,
+ @Nonnull Continuation continuation);
+ }
+
+ // -------------------------------------------------------------------------
+ // QuotaManager
+ // -------------------------------------------------------------------------
+
+ /**
+ * Tracks per-transaction resource usage and exposes the adaptive limit.
*
* One instance is created per {@link #iterateAll} call. Per-transaction counts are reset at
* the start of each transaction; the limit persists and is adjusted by the runner between
@@ -218,14 +299,12 @@ public void close() {
*
*
* The task should call {@link #getLimit()} at the start of each transaction to know how much
- * work to attempt, increment the counts as it goes, and call {@link #markExhausted()} when
- * the source is fully consumed.
+ * work to attempt, and increment the counts as it goes.
*
*/
public static class QuotaManager {
private int processedCount;
private int deletedCount;
- private boolean hasMore;
private int limit;
private final int maxLimit;
@@ -291,18 +370,9 @@ public void deletedCountInc() {
deletedCount++;
}
- /**
- * Signal that the source is exhausted. The loop will stop after the current transaction
- * commits, without starting a new one.
- */
- public void markExhausted() {
- hasMore = false;
- }
-
void initTransaction() {
processedCount = 0;
deletedCount = 0;
- hasMore = true;
}
void increaseLimit() {
@@ -322,6 +392,10 @@ void decreaseLimit() {
}
}
+ // -------------------------------------------------------------------------
+ // Builder
+ // -------------------------------------------------------------------------
+
/**
* Create a new builder for a {@link ThrottledRetryingRunner}.
*
@@ -396,9 +470,9 @@ public Builder withIncreaseLimitAfter(int increaseLimitAfter) {
* Set the initial and maximum per-transaction limit passed to the task via
* {@link QuotaManager#getLimit()}.
*
- * The limit starts at this value. It will not be increased beyond {@code maxLimit} on
- * success. If {@code maxLimit} is {@code 0} (the default) the limit feature is disabled:
- * {@link QuotaManager#getLimit()} always returns {@code 0} and is never adjusted.
+ * The limit starts at this value and will not be increased beyond it. If {@code 0}
+ * (the default) the limit feature is disabled: {@link QuotaManager#getLimit()} always
+ * returns {@code 0} and is never adjusted.
*
*
* @param maxLimit the initial and maximum limit; {@code 0} disables limit management
@@ -443,4 +517,4 @@ public ThrottledRetryingRunner build() {
return new ThrottledRetryingRunner(this);
}
}
-}
+}
\ No newline at end of file
From de705ed90c70be4c8fbae2b320e320dfd779050d Mon Sep 17 00:00:00 2001
From: Scott Dugas
Date: Thu, 5 Mar 2026 15:52:02 -0500
Subject: [PATCH 4/9] Test that retries/continuations behave correctly on
conflicts
---
.../test/ThrottledRetryingRunnerTest.java | 153 +++++++++++++++++-
1 file changed, 150 insertions(+), 3 deletions(-)
diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
index 215f60944a..0dcbe8105b 100644
--- a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
+++ b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
@@ -734,8 +734,15 @@ void continuationCarriesStateToNextTransaction() {
// A continuation that carries the next index to read
class IndexContinuation implements ThrottledRetryingRunner.Continuation {
final int nextIndex;
- IndexContinuation(int nextIndex) { this.nextIndex = nextIndex; }
- @Override public boolean hasMore() { return nextIndex < numKeys; }
+
+ IndexContinuation(int nextIndex) {
+ this.nextIndex = nextIndex;
+ }
+
+ @Override
+ public boolean hasMore() {
+ return nextIndex < numKeys;
+ }
}
List processedValues = new ArrayList<>();
@@ -753,7 +760,147 @@ class IndexContinuation implements ThrottledRetryingRunner.Continuation {
}
// -------------------------------------------------------------------------
- // Helpers
+ // Real transaction conflict
+ // -------------------------------------------------------------------------
+
+ @Test
+ void retriesOnRealTransactionConflict() {
+ // Arrange a genuine FDB NOT_COMMITTED conflict without any threading:
+ // 1. Task (attempt 1) reads conflictKey → establishes a read-conflict range on tr
+ // 2. Task also writes to ownWriteKey → makes tr a write transaction so FDB actually
+ // checks read conflicts on commit (FDB silently skips conflict checking for
+ // read-only transactions)
+ // 3. Task opens a second transaction, writes to conflictKey, and commits it
+ // 4. TransactionalRunner then commits tr → FDB detects the conflict and rejects it
+ // 5. Runner retries; attempt 2 sees no conflict and succeeds
+ byte[] conflictKey = subspace.pack(Tuple.from("conflict-key"));
+ byte[] ownWriteKey = subspace.pack(Tuple.from("own-write"));
+ AtomicInteger attemptCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().withNumOfRetries(3).build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ int attempt = attemptCount.incrementAndGet();
+ if (attempt == 1) {
+ // Write to a separate key to make tr a write transaction.
+ tr.set(ownWriteKey, new byte[]{0});
+ // Read conflictKey to add it to tr's read-conflict range, then commit a
+ // write to the same key in a separate transaction before tr commits.
+ return tr.get(conflictKey)
+ .thenCompose(ignored ->
+ db.runAsync(tr2 -> {
+ tr2.set(conflictKey, new byte[]{1});
+ return CompletableFuture.completedFuture(null);
+ })
+ )
+ .thenCompose(ignored -> exhausted());
+ }
+ // Attempt 2: no conflict
+ return exhausted();
+ }).join();
+ }
+
+ assertThat(attemptCount.get()).isEqualTo(2);
+ }
+
+ @Test
+ void conflictDoesNotAdvanceContinuation() {
+ // Write 6 data keys to process in batches of 2 across 3 transactions.
+ //
+ // Expected flow:
+ // Attempt 1 (idx=0): processes items 0,1 — commits → continuation advances to idx=2
+ // Attempt 2 (idx=2): processes items 2,3 — CONFLICT → not committed
+ // Attempt 3 (idx=2): retry receives idx=2 (not idx=4!) — processes 2,3 again — commits
+ // Attempt 4 (idx=4): processes items 4,5 — commits → hasMore()=false, loop ends
+ //
+ // The key assertion is that attempt 3 starts at idx=2, not idx=4. If the continuation
+ // were incorrectly advanced on failure, items 2 and 3 would never be written to FDB.
+ final int numKeys = 6;
+ final int batchSize = 2;
+
+ db.run(tr -> {
+ for (int i = 0; i < numKeys; i++) {
+ tr.set(subspace.pack(Tuple.from("data", i)), Tuple.from(i).pack());
+ }
+ return null;
+ });
+
+ byte[] conflictKey = subspace.pack(Tuple.from("conflict-trigger"));
+ byte[] ownWriteKey = subspace.pack(Tuple.from("own-write"));
+
+ AtomicInteger attemptCount = new AtomicInteger(0);
+ AtomicInteger retryStartIdx = new AtomicInteger(-1); // captured from attempt 3
+
+ class IndexContinuation implements ThrottledRetryingRunner.Continuation {
+ final int nextIndex;
+
+ IndexContinuation(int nextIndex) {
+ this.nextIndex = nextIndex;
+ }
+
+ @Override
+ public boolean hasMore() {
+ return nextIndex < numKeys;
+ }
+ }
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().withNumOfRetries(3).build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ int attempt = attemptCount.incrementAndGet();
+ int startIdx = (cont instanceof IndexContinuation)
+ ? ((IndexContinuation) cont).nextIndex : 0;
+ int endIdx = Math.min(startIdx + batchSize, numKeys);
+
+ if (attempt == 3) {
+ retryStartIdx.set(startIdx);
+ }
+
+ // Write a "processed" marker for each item in this batch
+ for (int i = startIdx; i < endIdx; i++) {
+ tr.set(subspace.pack(Tuple.from("done", i)), Tuple.from(i).pack());
+ }
+
+ if (attempt == 2) {
+ // Inject a conflict on the second attempt:
+ // - write to ownWriteKey to make tr a write transaction (FDB only checks
+ // read conflicts when the transaction has writes)
+ // - read conflictKey to add it to tr's read-conflict range
+ // - commit a write to conflictKey in a separate transaction so that tr
+ // fails with NOT_COMMITTED
+ tr.set(ownWriteKey, new byte[]{0});
+ return tr.get(conflictKey)
+ .thenCompose(ignored ->
+ db.runAsync(tr2 -> {
+ tr2.set(conflictKey, new byte[]{1});
+ return CompletableFuture.completedFuture(null);
+ })
+ )
+ .thenCompose(ignored ->
+ CompletableFuture.completedFuture(new IndexContinuation(endIdx)));
+ }
+
+ return CompletableFuture.completedFuture(new IndexContinuation(endIdx));
+ }).join();
+ }
+
+ // Attempt 3 must have restarted at idx=2 (the last committed continuation),
+ // not at idx=4 (what would have been returned if the failed attempt had committed)
+ assertThat(retryStartIdx.get()).isEqualTo(2);
+
+ // 4 attempts total: tx1(0-1) + tx2(2-3, conflict) + retry(2-3) + tx3(4-5)
+ assertThat(attemptCount.get()).isEqualTo(4);
+
+ // All 6 items must be present in FDB exactly once — none skipped, none missing
+ db.run(tr -> {
+ for (int i = 0; i < numKeys; i++) {
+ byte[] val = tr.get(subspace.pack(Tuple.from("done", i))).join();
+ assertThat(val).as("item %d should be committed", i)
+ .isEqualTo(Tuple.from(i).pack());
+ }
+ return null;
+ });
+ }
+
+
// -------------------------------------------------------------------------
/** Returns a completed future with a continuation indicating there is more work to do. */
From 4b5788ec68ed896d8aafac4b0a1289d73c580e4b Mon Sep 17 00:00:00 2001
From: Scott Dugas
Date: Fri, 6 Mar 2026 14:55:42 -0500
Subject: [PATCH 5/9] Cleanup tests a bit
---
.../test/ThrottledRetryingRunnerTest.java | 106 +++++++++---------
1 file changed, 50 insertions(+), 56 deletions(-)
diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
index 0dcbe8105b..b13e8fb8a8 100644
--- a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
+++ b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
@@ -731,20 +731,6 @@ void continuationCarriesStateToNextTransaction() {
return null;
});
- // A continuation that carries the next index to read
- class IndexContinuation implements ThrottledRetryingRunner.Continuation {
- final int nextIndex;
-
- IndexContinuation(int nextIndex) {
- this.nextIndex = nextIndex;
- }
-
- @Override
- public boolean hasMore() {
- return nextIndex < numKeys;
- }
- }
-
List processedValues = new ArrayList<>();
try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
@@ -752,7 +738,7 @@ public boolean hasMore() {
int idx = (cont instanceof IndexContinuation) ? ((IndexContinuation) cont).nextIndex : 0;
byte[] raw = tr.get(subspace.pack(Tuple.from(idx))).join();
processedValues.add((int) Tuple.fromBytes(raw).getLong(0));
- return CompletableFuture.completedFuture(new IndexContinuation(idx + 1));
+ return CompletableFuture.completedFuture(new IndexContinuation(idx + 1, numKeys));
}).join();
}
@@ -781,17 +767,7 @@ void retriesOnRealTransactionConflict() {
runner.iterateAll((tr, quota, cont) -> {
int attempt = attemptCount.incrementAndGet();
if (attempt == 1) {
- // Write to a separate key to make tr a write transaction.
- tr.set(ownWriteKey, new byte[]{0});
- // Read conflictKey to add it to tr's read-conflict range, then commit a
- // write to the same key in a separate transaction before tr commits.
- return tr.get(conflictKey)
- .thenCompose(ignored ->
- db.runAsync(tr2 -> {
- tr2.set(conflictKey, new byte[]{1});
- return CompletableFuture.completedFuture(null);
- })
- )
+ return injectConflict(tr, conflictKey, ownWriteKey)
.thenCompose(ignored -> exhausted());
}
// Attempt 2: no conflict
@@ -830,19 +806,6 @@ void conflictDoesNotAdvanceContinuation() {
AtomicInteger attemptCount = new AtomicInteger(0);
AtomicInteger retryStartIdx = new AtomicInteger(-1); // captured from attempt 3
- class IndexContinuation implements ThrottledRetryingRunner.Continuation {
- final int nextIndex;
-
- IndexContinuation(int nextIndex) {
- this.nextIndex = nextIndex;
- }
-
- @Override
- public boolean hasMore() {
- return nextIndex < numKeys;
- }
- }
-
try (ThrottledRetryingRunner runner = runnerBuilder().withNumOfRetries(3).build()) {
runner.iterateAll((tr, quota, cont) -> {
int attempt = attemptCount.incrementAndGet();
@@ -860,25 +823,12 @@ public boolean hasMore() {
}
if (attempt == 2) {
- // Inject a conflict on the second attempt:
- // - write to ownWriteKey to make tr a write transaction (FDB only checks
- // read conflicts when the transaction has writes)
- // - read conflictKey to add it to tr's read-conflict range
- // - commit a write to conflictKey in a separate transaction so that tr
- // fails with NOT_COMMITTED
- tr.set(ownWriteKey, new byte[]{0});
- return tr.get(conflictKey)
- .thenCompose(ignored ->
- db.runAsync(tr2 -> {
- tr2.set(conflictKey, new byte[]{1});
- return CompletableFuture.completedFuture(null);
- })
- )
+ return injectConflict(tr, conflictKey, ownWriteKey)
.thenCompose(ignored ->
- CompletableFuture.completedFuture(new IndexContinuation(endIdx)));
+ CompletableFuture.completedFuture(new IndexContinuation(endIdx, numKeys)));
}
- return CompletableFuture.completedFuture(new IndexContinuation(endIdx));
+ return CompletableFuture.completedFuture(new IndexContinuation(endIdx, numKeys));
}).join();
}
@@ -913,7 +863,51 @@ private static CompletableFuture exhausted
return CompletableFuture.completedFuture(() -> false);
}
+ /**
+ * Injects a genuine FDB read-conflict into {@code tr} and returns a future that completes
+ * once the conflicting write has been committed.
+ *
+ * How it works:
+ *
+ * - Writes to {@code ownWriteKey} so that {@code tr} is a write transaction — FDB only
+ * checks read-conflict ranges when the committing transaction has writes.
+ * - Reads {@code conflictKey} to add it to {@code tr}'s read-conflict range.
+ * - Opens a separate transaction, writes to {@code conflictKey}, and commits it, so that
+ * when {@code TransactionalRunner} later commits {@code tr} it receives
+ * {@code NOT_COMMITTED}.
+ *
+ *
+ */
+ private CompletableFuture injectConflict(Transaction tr, byte[] conflictKey, byte[] ownWriteKey) {
+ tr.set(ownWriteKey, new byte[]{0});
+ return tr.get(conflictKey)
+ .thenCompose(ignored -> db.runAsync(tr2 -> {
+ tr2.set(conflictKey, new byte[]{1});
+ return CompletableFuture.completedFuture(null);
+ }));
+ }
+
private ThrottledRetryingRunner.Builder runnerBuilder() {
return ThrottledRetryingRunner.builder(db, scheduledExecutor);
}
-}
\ No newline at end of file
+
+ /**
+ * A continuation that carries a numeric index as cursor position.
+ * {@link #hasMore()} returns {@code true} while {@link #nextIndex} is less than
+ * {@link #totalKeys}.
+ */
+ private static class IndexContinuation implements ThrottledRetryingRunner.Continuation {
+ final int nextIndex;
+ private final int totalKeys;
+
+ IndexContinuation(int nextIndex, int totalKeys) {
+ this.nextIndex = nextIndex;
+ this.totalKeys = totalKeys;
+ }
+
+ @Override
+ public boolean hasMore() {
+ return nextIndex < totalKeys;
+ }
+ }
+}
From cf2694a66e107d3958b8428dd22f29a4ccce01e0 Mon Sep 17 00:00:00 2001
From: Scott Dugas
Date: Fri, 6 Mar 2026 16:46:12 -0500
Subject: [PATCH 6/9] Cleanup the tests a bit more
---
.../test/ThrottledRetryingRunnerTest.java | 137 ++++++++----------
1 file changed, 64 insertions(+), 73 deletions(-)
diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
index b13e8fb8a8..354980af4a 100644
--- a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
+++ b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
@@ -43,6 +43,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.UnaryOperator;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -100,12 +102,12 @@ void testThrottlePerSecDelayMillis(long elapsedMs, int maxPerSec, int eventCount
@Test
void testIncreaseLimitFormula() {
// limit=0 is no-limit mode; increaseLimit is a no-op
- ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(0);
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(0);
qm.increaseLimit();
assertThat(qm.getLimit()).isEqualTo(0);
// With a real maxLimit, decreaseLimit reads processedCount directly (before initTransaction)
- ThrottledRetryingRunner.QuotaManager qm2 = new ThrottledRetryingRunner.QuotaManager(1000);
+ ThrottledRetryingRunner.QuotaManager qm2 = quotaManager(1000);
qm2.processedCountAdd(80);
qm2.decreaseLimit(); // limit = max(1, 80*9/10) = 72
assertThat(qm2.getLimit()).isEqualTo(72);
@@ -115,7 +117,7 @@ void testIncreaseLimitFormula() {
@Test
void testDecreaseLimitFormula() {
- ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(1000);
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(1000);
// Process 100 items, then fail → limit = max(1, 100*9/10) = 90
qm.processedCountAdd(100);
qm.decreaseLimit();
@@ -141,7 +143,7 @@ void testDecreaseLimitFormula() {
@Test
void testDecreaseLimitNoOpWhenLimitIsZero() {
- ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(0);
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(0);
qm.processedCountAdd(100);
qm.initTransaction();
qm.decreaseLimit();
@@ -150,7 +152,7 @@ void testDecreaseLimitNoOpWhenLimitIsZero() {
@Test
void testLimitCappedAtMaxLimit() {
- ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(50);
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(50);
// Drive limit down by simulating a failure with 40 in-flight items
qm.processedCountAdd(40);
qm.decreaseLimit(); // limit = max(1, 40*9/10) = 36
@@ -251,11 +253,11 @@ void retryCounterResetsAfterSuccess() {
int call = totalCalls.incrementAndGet();
// Fail on calls 1 and 2
if (call == 1 || call == 2) {
- return CompletableFuture.failedFuture(new RuntimeException("transient"));
+ return fail();
}
// Succeed on call 3 (resets counter), then fail on 4 and 5, succeed on 6, done
if (call == 4 || call == 5) {
- return CompletableFuture.failedFuture(new RuntimeException("transient"));
+ return fail();
}
successCount.incrementAndGet();
if (successCount.get() == 2) {
@@ -333,12 +335,13 @@ void processedAndDeletedCountsAreIndependent() {
// Limit: adaptive adjustment
// -------------------------------------------------------------------------
- @Test
- void limitStartsAtMaxLimitAndIsExposedToTask() {
+ @ParameterizedTest
+ @CsvSource({"0, 0", "50, 50"})
+ void limitStartsAtConfiguredMaxLimit(int maxLimit, int expectedLimit) {
AtomicInteger observedLimit = new AtomicInteger(-1);
try (ThrottledRetryingRunner runner = runnerBuilder()
- .withMaxLimit(50)
+ .withMaxLimit(maxLimit)
.build()) {
runner.iterateAll((tr, quota, cont) -> {
observedLimit.set(quota.getLimit());
@@ -346,21 +349,7 @@ void limitStartsAtMaxLimitAndIsExposedToTask() {
}).join();
}
- assertThat(observedLimit.get()).isEqualTo(50);
- }
-
- @Test
- void limitIsZeroWhenMaxLimitNotConfigured() {
- AtomicInteger observedLimit = new AtomicInteger(-1);
-
- try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
- runner.iterateAll((tr, quota, cont) -> {
- observedLimit.set(quota.getLimit());
- return exhausted();
- }).join();
- }
-
- assertThat(observedLimit.get()).isEqualTo(0);
+ assertThat(observedLimit.get()).isEqualTo(expectedLimit);
}
@Test
@@ -380,7 +369,7 @@ void limitDecreasesOnFailureThenIncreasesOnSuccess() {
if (call == 1) {
// Process 100 items and fail → next limit = 90
quota.processedCountAdd(100);
- return CompletableFuture.failedFuture(new RuntimeException("fail"));
+ return fail();
}
if (call <= 3) {
// Two successes → limit increases from 90
@@ -415,7 +404,7 @@ void limitDecreasesProgressivelyTowardOne() {
observedLimits.add(quota.getLimit());
if (call <= 10) {
quota.processedCountInc(); // 1 item processed → limit floor at 1
- return CompletableFuture.failedFuture(new RuntimeException("fail"));
+ return fail();
}
return exhausted();
}).join();
@@ -443,7 +432,7 @@ void limitIsNeverAdjustedWhenMaxLimitIsZero() {
observedLimits.add(quota.getLimit());
quota.processedCountAdd(100);
if (call <= 3) {
- return CompletableFuture.failedFuture(new RuntimeException("fail"));
+ return fail();
}
return exhausted();
}).join();
@@ -509,53 +498,16 @@ void eachIterationReceivesADistinctTransaction() {
@Test
void throttlingByScannedItemsSlowsItDown() {
- final int itemsPerTransaction = 10;
- final int maxPerSec = 20;
- final int transactions = 3;
- AtomicInteger callCount = new AtomicInteger(0);
-
- long start = System.currentTimeMillis();
- try (ThrottledRetryingRunner runner = runnerBuilder()
- .withMaxItemsScannedPerSec(maxPerSec)
- .build()) {
- runner.iterateAll((tr, quota, cont) -> {
- quota.processedCountAdd(itemsPerTransaction);
- if (callCount.incrementAndGet() >= transactions) {
- return exhausted();
- }
- return hasMore();
- }).join();
- }
- long elapsed = System.currentTimeMillis() - start;
-
- // totalItems / maxPerSec = (3*10)/20 = 1.5s minimum
- long expectedMinMs = TimeUnit.SECONDS.toMillis(itemsPerTransaction * transactions / maxPerSec);
- assertThat(elapsed).isGreaterThanOrEqualTo(expectedMinMs);
+ assertThrottledByItemsPerSec(
+ b -> b.withMaxItemsScannedPerSec(20),
+ ThrottledRetryingRunner.QuotaManager::processedCountAdd);
}
@Test
void throttlingByDeletedItemsSlowsItDown() {
- final int deletesPerTransaction = 10;
- final int maxPerSec = 20;
- final int transactions = 3;
- AtomicInteger callCount = new AtomicInteger(0);
-
- long start = System.currentTimeMillis();
- try (ThrottledRetryingRunner runner = runnerBuilder()
- .withMaxItemsDeletedPerSec(maxPerSec)
- .build()) {
- runner.iterateAll((tr, quota, cont) -> {
- quota.deletedCountAdd(deletesPerTransaction);
- if (callCount.incrementAndGet() >= transactions) {
- return exhausted();
- }
- return hasMore();
- }).join();
- }
- long elapsed = System.currentTimeMillis() - start;
-
- long expectedMinMs = TimeUnit.SECONDS.toMillis(deletesPerTransaction * transactions / maxPerSec);
- assertThat(elapsed).isGreaterThanOrEqualTo(expectedMinMs);
+ assertThrottledByItemsPerSec(
+ b -> b.withMaxItemsDeletedPerSec(20),
+ ThrottledRetryingRunner.QuotaManager::deletedCountAdd);
}
@Test
@@ -677,7 +629,7 @@ void continuationIsNotAdvancedOnRetry() {
}
if (call == 2 || call == 3) {
// Fail twice: continuation should NOT advance
- return CompletableFuture.failedFuture(new RuntimeException("transient"));
+ return fail();
}
// Final success
return exhausted();
@@ -706,7 +658,7 @@ void startContinuationReusedAcrossRetriesBeforeAnySuccess() {
runner.iterateAll((tr, quota, cont) -> {
received.add(cont);
if (callCount.incrementAndGet() <= 2) {
- return CompletableFuture.failedFuture(new RuntimeException("transient"));
+ return fail();
}
return exhausted();
}).join();
@@ -863,6 +815,45 @@ private static CompletableFuture exhausted
return CompletableFuture.completedFuture(() -> false);
}
+ /** Returns a pre-failed future simulating a transient task failure. */
+ private static CompletableFuture fail() {
+ return CompletableFuture.failedFuture(new RuntimeException("transient"));
+ }
+
+ /** Convenience factory for a {@link ThrottledRetryingRunner.QuotaManager}. */
+ private static ThrottledRetryingRunner.QuotaManager quotaManager(int maxLimit) {
+ return new ThrottledRetryingRunner.QuotaManager(maxLimit);
+ }
+
+ /**
+ * Asserts that the runner throttles correctly when {@code reportItems} is used to count
+ * items and {@code configure} wires up the corresponding rate-limiter on the builder.
+ */
+ private void assertThrottledByItemsPerSec(
+ UnaryOperator configure,
+ BiConsumer reportItems) {
+ final int itemsPerTransaction = 10;
+ final int maxPerSec = 20;
+ final int transactions = 3;
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ long start = System.currentTimeMillis();
+ try (ThrottledRetryingRunner runner = configure.apply(runnerBuilder()).build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ reportItems.accept(quota, itemsPerTransaction);
+ if (callCount.incrementAndGet() >= transactions) {
+ return exhausted();
+ }
+ return hasMore();
+ }).join();
+ }
+ long elapsed = System.currentTimeMillis() - start;
+
+ // totalItems / maxPerSec = (3 * 10) / 20 = 1.5s minimum
+ long expectedMinMs = TimeUnit.SECONDS.toMillis((long) itemsPerTransaction * transactions / maxPerSec);
+ assertThat(elapsed).isGreaterThanOrEqualTo(expectedMinMs);
+ }
+
/**
* Injects a genuine FDB read-conflict into {@code tr} and returns a future that completes
* once the conflicting write has been committed.
From 97cd26378a059ee2f1a72e1d99ab68e24c76c4f4 Mon Sep 17 00:00:00 2001
From: Scott Dugas
Date: Mon, 16 Mar 2026 13:52:48 -0400
Subject: [PATCH 7/9] Have the QuotaManager tell the task whether to continue
Otherwise the other limits don't get applied
---
.../test/ThrottledRetryingRunnerTest.java | 51 +++++++++++++++++++
.../test/ThrottledRetryingRunner.java | 31 ++++++++++-
2 files changed, 80 insertions(+), 2 deletions(-)
diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
index 354980af4a..e26ba64afd 100644
--- a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
+++ b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
@@ -163,6 +163,57 @@ void testLimitCappedAtMaxLimit() {
assertThat(qm.getLimit()).isEqualTo(50); // capped at maxLimit
}
+ // -------------------------------------------------------------------------
+ // QuotaManager shouldContinue
+ // -------------------------------------------------------------------------
+
+ @Test
+ void shouldContinueTrueWhenUnderLimit() {
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(10);
+ qm.initTransaction();
+ qm.processedCountAdd(5);
+ assertThat(qm.shouldContinue()).isTrue();
+ }
+
+ @Test
+ void shouldContinueFalseWhenProcessedCountExceedsLimit() {
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(10);
+ qm.initTransaction();
+ qm.processedCountAdd(11); // 11 > 10
+ assertThat(qm.shouldContinue()).isFalse();
+ }
+
+ @Test
+ void shouldContinueTrueWhenProcessedCountEqualsLimit() {
+ // > not >=: at exactly the limit, shouldContinue still returns true
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(10);
+ qm.initTransaction();
+ qm.processedCountAdd(10); // 10 > 10 is false
+ assertThat(qm.shouldContinue()).isTrue();
+ }
+
+ @Test
+ void shouldContinueTrueWhenNoLimitRegardlessOfCount() {
+ // limit=0 means no limit; the count is never checked
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(0);
+ qm.initTransaction();
+ qm.processedCountAdd(Integer.MAX_VALUE);
+ assertThat(qm.shouldContinue()).isTrue();
+ }
+
+ @Test
+ void shouldContinueResetsAfterInitTransaction() {
+ // After a transaction fails (count exceeds limit), initTransaction should reset
+ // the state so shouldContinue returns true again at the start of the retry.
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(5);
+ qm.initTransaction();
+ qm.processedCountAdd(6); // 6 > 5 → shouldContinue false
+ assertThat(qm.shouldContinue()).isFalse();
+
+ qm.initTransaction(); // reset for the next transaction
+ assertThat(qm.shouldContinue()).isTrue();
+ }
+
// -------------------------------------------------------------------------
// Basic iteration: task runs the expected number of times
// -------------------------------------------------------------------------
diff --git a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java
index 7bc9058814..15b22ae952 100644
--- a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java
+++ b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java
@@ -298,15 +298,21 @@ CompletableFuture run(@Nonnull Transaction transaction,
* transactions.
*
*
- * The task should call {@link #getLimit()} at the start of each transaction to know how much
- * work to attempt, and increment the counts as it goes.
+ * The task should call {@link #shouldContinue()} between processing items to decide whether
+ * to stop early. {@link #shouldContinue()} returns {@code false} once the processed count
+ * exceeds the active limit or the transaction has been running for longer than
+ * {@link #MAX_TRANSACTION_DURATION_MILLIS}.
*
*/
public static class QuotaManager {
+ /** Maximum time a single transaction should run before {@link #shouldContinue()} returns {@code false}. */
+ static final long MAX_TRANSACTION_DURATION_MILLIS = TimeUnit.SECONDS.toMillis(4);
+
private int processedCount;
private int deletedCount;
private int limit;
private final int maxLimit;
+ private long transactionStartTimeMillis;
QuotaManager(int maxLimit) {
this.maxLimit = maxLimit;
@@ -370,9 +376,30 @@ public void deletedCountInc() {
deletedCount++;
}
+ /**
+ * Returns {@code true} if the task should continue processing items in this transaction,
+ * {@code false} if it should stop and return a continuation.
+ *
+ * Returns {@code false} when either:
+ *
+ * - a limit is active ({@link #getLimit()} {@code > 0}) and the processed count
+ * exceeds it, or
+ * - the transaction has been running for longer than
+ * {@link #MAX_TRANSACTION_DURATION_MILLIS}.
+ *
+ *
+ */
+ public boolean shouldContinue() {
+ if (limit > 0 && processedCount > limit) {
+ return false;
+ }
+ return System.currentTimeMillis() - transactionStartTimeMillis < MAX_TRANSACTION_DURATION_MILLIS;
+ }
+
void initTransaction() {
processedCount = 0;
deletedCount = 0;
+ transactionStartTimeMillis = System.currentTimeMillis();
}
void increaseLimit() {
From 0d21b0fa6fe9dd855daffc62de4f9844b721846e Mon Sep 17 00:00:00 2001
From: Scott Dugas
Date: Mon, 16 Mar 2026 14:41:38 -0400
Subject: [PATCH 8/9] Switch insertSIFTSmall to use ThrottledRunner
---
.../foundationdb/async/hnsw/TestHelpers.java | 152 +++++++++++-------
1 file changed, 94 insertions(+), 58 deletions(-)
diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java b/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java
index 8b933523d0..8ebe93c3b4 100644
--- a/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java
+++ b/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java
@@ -31,11 +31,11 @@
import com.apple.foundationdb.linear.StoredVecsIterator;
import com.apple.foundationdb.linear.Transformed;
import com.apple.foundationdb.subspace.Subspace;
+import com.apple.foundationdb.test.ThrottledRetryingRunner;
import com.apple.foundationdb.tuple.Tuple;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
@@ -68,6 +68,8 @@
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
@@ -97,6 +99,35 @@ static void dumpQueryResults(@Nonnull final Path tempDir, @Nonnull final String
}
}
+ @Nonnull
+ static CompletableFuture> basicInsertBatch(
+ @Nonnull final Transaction tr,
+ @Nonnull final HNSW hnsw,
+ final int batchSize,
+ final long firstId,
+ @Nonnull final BiFunction insertFunction) {
+ logger.info("Inserting batch [" + firstId + ", " + (firstId + batchSize) + ")");
+ final TestOnWriteListener onWriteListener = (TestOnWriteListener)hnsw.getOnWriteListener();
+ onWriteListener.reset();
+ final TestOnReadListener onReadListener = (TestOnReadListener)hnsw.getOnReadListener();
+ onReadListener.reset();
+
+ final ImmutableList.Builder data = ImmutableList.builder();
+
+ // In theory this could put all the futures in a List and run the inserts concurrently, but for a `basicInsertBatch`
+ // it's probably better to not test the concurrent handling of hnsw, even if it makes the tests slower.
+ CompletableFuture future = CompletableFuture.completedFuture(null);
+ for (int i = 0; i < batchSize; i ++) {
+ final PrimaryKeyAndVector record = insertFunction.apply(tr, firstId + i);
+ if (record == null) {
+ break;
+ }
+ data.add(record);
+ future = future.thenCompose((vignore) -> hnsw.insert(tr, record.getPrimaryKey(), record.getVector()));
+ }
+ return future.thenApply(vignore -> data.build());
+ }
+
@Nonnull
static List basicInsertBatch(@Nonnull final Database db,
@Nonnull final HNSW hnsw,
@@ -104,72 +135,77 @@ static List basicInsertBatch(@Nonnull final Database db,
final long firstId,
@Nonnull final BiFunction insertFunction)
throws ExecutionException, InterruptedException, TimeoutException {
- return db.runAsync(tr -> {
- final TestOnWriteListener onWriteListener = (TestOnWriteListener)hnsw.getOnWriteListener();
- onWriteListener.reset();
- final TestOnReadListener onReadListener = (TestOnReadListener)hnsw.getOnReadListener();
- onReadListener.reset();
-
- final ImmutableList.Builder data = ImmutableList.builder();
-
- // In theory this could put all the futures in a List and run the inserts concurrently, but for a `basicInsertBatch`
- // it's probably better to not test the concurrent handling of hnsw, even if it makes the tests slower.
- CompletableFuture future = CompletableFuture.completedFuture(null);
- final long beginTs = System.nanoTime();
- for (int i = 0; i < batchSize; i ++) {
- final PrimaryKeyAndVector record = insertFunction.apply(tr, firstId + i);
- if (record == null) {
- break;
- }
- data.add(record);
- future = future.thenCompose((vignore) -> hnsw.insert(tr, record.getPrimaryKey(), record.getVector()));
- }
- return future.thenApply(vignore -> data.build())
- .whenComplete((result, error) -> {
- if (error != null) {
- logger.info("Failed to insert batchSize={}", error);
- } else {
- final long endTs = System.nanoTime();
- logger.info("inserted batchSize={} records={} starting at nodeId={} took elapsedTime={}ms, readCounts={}, readBytes={}",
- batchSize, result.size(), firstId, TimeUnit.NANOSECONDS.toMillis(endTs - beginTs),
- onReadListener.getNodeCountByLayer(), onReadListener.getBytesReadByLayer());
- }
- });
- }).get(2, TimeUnit.MINUTES); // set a timeout for inserting a single batch including retries so setup won't run forever
+ return db.runAsync(
+ tr -> {
+ final long beginTs = System.nanoTime();
+ return basicInsertBatch(tr, hnsw, batchSize, firstId, insertFunction)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ logger.info("Failed to insert batchSize={}", error);
+ } else {
+ final long endTs = System.nanoTime();
+ final TestOnReadListener onReadListener = (TestOnReadListener)hnsw.getOnReadListener();
+ logger.info("inserted batchSize={} records={} starting at nodeId={} took elapsedTime={}ms, readCounts={}, readBytes={}",
+ batchSize, result.size(), firstId, TimeUnit.NANOSECONDS.toMillis(endTs - beginTs),
+ onReadListener.getNodeCountByLayer(), onReadListener.getBytesReadByLayer());
+ }
+ });
+ })
+ .get(2, TimeUnit.MINUTES); // set a timeout for inserting a single batch including retries so setup won't run forever
}
static List insertSIFTSmall(@Nonnull final Database db,
@Nonnull final HNSW hnsw) throws Exception {
final Path siftSmallPath = Paths.get(".out/extracted/siftsmall/siftsmall_base.fvecs");
- final ImmutableList.Builder insertedDataBuilder = ImmutableList.builder();
-
+ // Load all vectors upfront so the task can index into them by position using the adaptive limit.
+ final List allVectors;
try (final var fileChannel = FileChannel.open(siftSmallPath, StandardOpenOption.READ)) {
- final Iterator vectorIterator = new StoredVecsIterator.StoredFVecsIterator(fileChannel);
-
- final int batchSize = 50;
- int i = 0;
- while (vectorIterator.hasNext()) {
- final List batch =
- Lists.newArrayList(Iterators.limit(vectorIterator, batchSize));
- final long currentBatchStart = i;
- final List insertedInBatch =
- basicInsertBatch(db, hnsw, batchSize, i,
- (tr, nextId) -> {
- final int indexInBatch = Math.toIntExact(nextId - currentBatchStart);
- if (indexInBatch >= batch.size()) {
- return null;
- }
- final Tuple currentPrimaryKey = createPrimaryKey(nextId);
- final DoubleRealVector doubleVector = batch.get(indexInBatch);
- return new PrimaryKeyAndVector(currentPrimaryKey, doubleVector);
- });
- insertedDataBuilder.addAll(insertedInBatch);
- i += insertedInBatch.size();
+ allVectors = Lists.newArrayList(new StoredVecsIterator.StoredFVecsIterator(fileChannel));
+ }
+
+ // A continuation that carries the next vector index to insert.
+ class SiftContinuation implements ThrottledRetryingRunner.Continuation {
+ final int nextIndex;
+
+ SiftContinuation(int nextIndex) {
+ this.nextIndex = nextIndex;
+ }
+
+ @Override
+ public boolean hasMore() {
+ return nextIndex < allVectors.size();
}
- assertThat(i).isEqualTo(10000);
}
- return insertedDataBuilder.build();
+
+ final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
+ try (ThrottledRetryingRunner runner = ThrottledRetryingRunner.builder(db, executor)
+ .withMaxLimit(500)
+ .build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ final int startIdx = (cont instanceof SiftContinuation)
+ ? ((SiftContinuation) cont).nextIndex : 0;
+ return basicInsertBatch(tr, hnsw, quota.getLimit(), startIdx,
+ (ignoredTr, nextId) -> {
+ final int idx = Math.toIntExact(nextId);
+ if (idx < allVectors.size() && quota.shouldContinue()) {
+ quota.processedCountInc();
+ return new PrimaryKeyAndVector(createPrimaryKey(idx), allVectors.get(idx));
+ }
+ return null;
+ })
+ .thenApply(list -> new SiftContinuation(startIdx + list.size()));
+ }).join();
+ } finally {
+ executor.shutdown();
+ }
+
+ assertThat(allVectors).hasSize(10000);
+ final ImmutableList.Builder result = ImmutableList.builder();
+ for (int i = 0; i < allVectors.size(); i++) {
+ result.add(new PrimaryKeyAndVector(createPrimaryKey(i), allVectors.get(i)));
+ }
+ return result.build();
}
static void validateSIFTSmall(@Nonnull final Database db,
From b5ef0162149e5b9ed3787b0c3e0f59f5b9d59682 Mon Sep 17 00:00:00 2001
From: Scott Dugas
Date: Mon, 16 Mar 2026 15:23:55 -0400
Subject: [PATCH 9/9] Fix loop's handling of futures
---
.../foundationdb/async/hnsw/TestHelpers.java | 27 ++++++++++++-------
1 file changed, 17 insertions(+), 10 deletions(-)
diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java b/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java
index 8ebe93c3b4..bde87879f0 100644
--- a/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java
+++ b/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java
@@ -22,6 +22,7 @@
import com.apple.foundationdb.Database;
import com.apple.foundationdb.Transaction;
+import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.linear.AffineOperator;
import com.apple.foundationdb.linear.DoubleRealVector;
import com.apple.foundationdb.linear.HalfRealVector;
@@ -72,6 +73,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -106,26 +108,31 @@ static CompletableFuture> basicInsertBatch(
final int batchSize,
final long firstId,
@Nonnull final BiFunction insertFunction) {
- logger.info("Inserting batch [" + firstId + ", " + (firstId + batchSize) + ")");
+ logger.info("Inserting batch starting at " + firstId);
final TestOnWriteListener onWriteListener = (TestOnWriteListener)hnsw.getOnWriteListener();
onWriteListener.reset();
final TestOnReadListener onReadListener = (TestOnReadListener)hnsw.getOnReadListener();
onReadListener.reset();
final ImmutableList.Builder data = ImmutableList.builder();
-
- // In theory this could put all the futures in a List and run the inserts concurrently, but for a `basicInsertBatch`
- // it's probably better to not test the concurrent handling of hnsw, even if it makes the tests slower.
- CompletableFuture future = CompletableFuture.completedFuture(null);
- for (int i = 0; i < batchSize; i ++) {
+ // Call insertFunction lazily between async inserts so that shouldContinue() sees the
+ // actual elapsed time after each insert rather than evaluating all checks synchronously
+ // before any async work begins.
+ final AtomicInteger insertCount = new AtomicInteger(0);
+ return AsyncUtil.whileTrue(() -> {
+ final int i = insertCount.get();
+ if (i >= batchSize) {
+ return AsyncUtil.READY_FALSE;
+ }
final PrimaryKeyAndVector record = insertFunction.apply(tr, firstId + i);
if (record == null) {
- break;
+ return AsyncUtil.READY_FALSE;
}
data.add(record);
- future = future.thenCompose((vignore) -> hnsw.insert(tr, record.getPrimaryKey(), record.getVector()));
- }
- return future.thenApply(vignore -> data.build());
+ insertCount.incrementAndGet();
+ return hnsw.insert(tr, record.getPrimaryKey(), record.getVector())
+ .thenApply(ignored -> Boolean.TRUE);
+ }).>thenApply(ignored -> data.build());
}
@Nonnull