diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java b/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java
index 8b933523d0..bde87879f0 100644
--- a/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java
+++ b/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java
@@ -22,6 +22,7 @@
import com.apple.foundationdb.Database;
import com.apple.foundationdb.Transaction;
+import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.linear.AffineOperator;
import com.apple.foundationdb.linear.DoubleRealVector;
import com.apple.foundationdb.linear.HalfRealVector;
@@ -31,11 +32,11 @@
import com.apple.foundationdb.linear.StoredVecsIterator;
import com.apple.foundationdb.linear.Transformed;
import com.apple.foundationdb.subspace.Subspace;
+import com.apple.foundationdb.test.ThrottledRetryingRunner;
import com.apple.foundationdb.tuple.Tuple;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
@@ -68,8 +69,11 @@
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -97,6 +101,40 @@ static void dumpQueryResults(@Nonnull final Path tempDir, @Nonnull final String
}
}
+ @Nonnull
+ static CompletableFuture<List<PrimaryKeyAndVector>> basicInsertBatch(
+ @Nonnull final Transaction tr,
+ @Nonnull final HNSW hnsw,
+ final int batchSize,
+ final long firstId,
+ @Nonnull final BiFunction<Transaction, Long, PrimaryKeyAndVector> insertFunction) {
+ logger.info("Inserting batch starting at " + firstId);
+ final TestOnWriteListener onWriteListener = (TestOnWriteListener)hnsw.getOnWriteListener();
+ onWriteListener.reset();
+ final TestOnReadListener onReadListener = (TestOnReadListener)hnsw.getOnReadListener();
+ onReadListener.reset();
+
+ final ImmutableList.Builder<PrimaryKeyAndVector> data = ImmutableList.builder();
+ // Call insertFunction lazily between async inserts so that shouldContinue() sees the
+ // actual elapsed time after each insert rather than evaluating all checks synchronously
+ // before any async work begins.
+ final AtomicInteger insertCount = new AtomicInteger(0);
+ return AsyncUtil.whileTrue(() -> {
+ final int i = insertCount.get();
+ if (i >= batchSize) {
+ return AsyncUtil.READY_FALSE;
+ }
+ final PrimaryKeyAndVector record = insertFunction.apply(tr, firstId + i);
+ if (record == null) {
+ return AsyncUtil.READY_FALSE;
+ }
+ data.add(record);
+ insertCount.incrementAndGet();
+ return hnsw.insert(tr, record.getPrimaryKey(), record.getVector())
+ .thenApply(ignored -> Boolean.TRUE);
+ }).<List<PrimaryKeyAndVector>>thenApply(ignored -> data.build());
+ }
+
@Nonnull
static List<PrimaryKeyAndVector> basicInsertBatch(@Nonnull final Database db,
@Nonnull final HNSW hnsw,
@@ -104,72 +142,77 @@ static List basicInsertBatch(@Nonnull final Database db,
final long firstId,
@Nonnull final BiFunction<Transaction, Long, PrimaryKeyAndVector> insertFunction)
throws ExecutionException, InterruptedException, TimeoutException {
- return db.runAsync(tr -> {
- final TestOnWriteListener onWriteListener = (TestOnWriteListener)hnsw.getOnWriteListener();
- onWriteListener.reset();
- final TestOnReadListener onReadListener = (TestOnReadListener)hnsw.getOnReadListener();
- onReadListener.reset();
-
- final ImmutableList.Builder data = ImmutableList.builder();
-
- // In theory this could put all the futures in a List and run the inserts concurrently, but for a `basicInsertBatch`
- // it's probably better to not test the concurrent handling of hnsw, even if it makes the tests slower.
- CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
- final long beginTs = System.nanoTime();
- for (int i = 0; i < batchSize; i ++) {
- final PrimaryKeyAndVector record = insertFunction.apply(tr, firstId + i);
- if (record == null) {
- break;
- }
- data.add(record);
- future = future.thenCompose((vignore) -> hnsw.insert(tr, record.getPrimaryKey(), record.getVector()));
- }
- return future.thenApply(vignore -> data.build())
- .whenComplete((result, error) -> {
- if (error != null) {
- logger.info("Failed to insert batchSize={}", error);
- } else {
- final long endTs = System.nanoTime();
- logger.info("inserted batchSize={} records={} starting at nodeId={} took elapsedTime={}ms, readCounts={}, readBytes={}",
- batchSize, result.size(), firstId, TimeUnit.NANOSECONDS.toMillis(endTs - beginTs),
- onReadListener.getNodeCountByLayer(), onReadListener.getBytesReadByLayer());
- }
- });
- }).get(2, TimeUnit.MINUTES); // set a timeout for inserting a single batch including retries so setup won't run forever
+ return db.runAsync(
+ tr -> {
+ final long beginTs = System.nanoTime();
+ return basicInsertBatch(tr, hnsw, batchSize, firstId, insertFunction)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ logger.info("Failed to insert batchSize={}", error);
+ } else {
+ final long endTs = System.nanoTime();
+ final TestOnReadListener onReadListener = (TestOnReadListener)hnsw.getOnReadListener();
+ logger.info("inserted batchSize={} records={} starting at nodeId={} took elapsedTime={}ms, readCounts={}, readBytes={}",
+ batchSize, result.size(), firstId, TimeUnit.NANOSECONDS.toMillis(endTs - beginTs),
+ onReadListener.getNodeCountByLayer(), onReadListener.getBytesReadByLayer());
+ }
+ });
+ })
+ .get(2, TimeUnit.MINUTES); // set a timeout for inserting a single batch including retries so setup won't run forever
}
static List<PrimaryKeyAndVector> insertSIFTSmall(@Nonnull final Database db,
@Nonnull final HNSW hnsw) throws Exception {
final Path siftSmallPath = Paths.get(".out/extracted/siftsmall/siftsmall_base.fvecs");
- final ImmutableList.Builder<PrimaryKeyAndVector> insertedDataBuilder = ImmutableList.builder();
-
+ // Load all vectors upfront so the task can index into them by position using the adaptive limit.
+ final List<DoubleRealVector> allVectors;
try (final var fileChannel = FileChannel.open(siftSmallPath, StandardOpenOption.READ)) {
- final Iterator<DoubleRealVector> vectorIterator = new StoredVecsIterator.StoredFVecsIterator(fileChannel);
-
- final int batchSize = 50;
- int i = 0;
- while (vectorIterator.hasNext()) {
- final List<DoubleRealVector> batch =
- Lists.newArrayList(Iterators.limit(vectorIterator, batchSize));
- final long currentBatchStart = i;
- final List<PrimaryKeyAndVector> insertedInBatch =
- basicInsertBatch(db, hnsw, batchSize, i,
- (tr, nextId) -> {
- final int indexInBatch = Math.toIntExact(nextId - currentBatchStart);
- if (indexInBatch >= batch.size()) {
- return null;
- }
- final Tuple currentPrimaryKey = createPrimaryKey(nextId);
- final DoubleRealVector doubleVector = batch.get(indexInBatch);
- return new PrimaryKeyAndVector(currentPrimaryKey, doubleVector);
- });
- insertedDataBuilder.addAll(insertedInBatch);
- i += insertedInBatch.size();
+ allVectors = Lists.newArrayList(new StoredVecsIterator.StoredFVecsIterator(fileChannel));
+ }
+
+ // A continuation that carries the next vector index to insert.
+ class SiftContinuation implements ThrottledRetryingRunner.Continuation {
+ final int nextIndex;
+
+ SiftContinuation(int nextIndex) {
+ this.nextIndex = nextIndex;
+ }
+
+ @Override
+ public boolean hasMore() {
+ return nextIndex < allVectors.size();
}
- assertThat(i).isEqualTo(10000);
}
- return insertedDataBuilder.build();
+
+ final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
+ try (ThrottledRetryingRunner runner = ThrottledRetryingRunner.builder(db, executor)
+ .withMaxLimit(500)
+ .build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ final int startIdx = (cont instanceof SiftContinuation)
+ ? ((SiftContinuation) cont).nextIndex : 0;
+ return basicInsertBatch(tr, hnsw, quota.getLimit(), startIdx,
+ (ignoredTr, nextId) -> {
+ final int idx = Math.toIntExact(nextId);
+ if (idx < allVectors.size() && quota.shouldContinue()) {
+ quota.processedCountInc();
+ return new PrimaryKeyAndVector(createPrimaryKey(idx), allVectors.get(idx));
+ }
+ return null;
+ })
+ .thenApply(list -> new SiftContinuation(startIdx + list.size()));
+ }).join();
+ } finally {
+ executor.shutdown();
+ }
+
+ assertThat(allVectors).hasSize(10000);
+ final ImmutableList.Builder<PrimaryKeyAndVector> result = ImmutableList.builder();
+ for (int i = 0; i < allVectors.size(); i++) {
+ result.add(new PrimaryKeyAndVector(createPrimaryKey(i), allVectors.get(i)));
+ }
+ return result.build();
}
static void validateSIFTSmall(@Nonnull final Database db,
diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
new file mode 100644
index 0000000000..e26ba64afd
--- /dev/null
+++ b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java
@@ -0,0 +1,955 @@
+/*
+ * ThrottledRetryingRunnerTest.java
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apple.foundationdb.test;
+
+import com.apple.foundationdb.Database;
+import com.apple.foundationdb.Transaction;
+import com.apple.foundationdb.subspace.Subspace;
+import com.apple.foundationdb.tuple.Tuple;
+import com.apple.test.Tags;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.UnaryOperator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.catchThrowableOfType;
+
+@Tag(Tags.RequiresFDB)
+class ThrottledRetryingRunnerTest {
+ @RegisterExtension
+ static final TestDatabaseExtension dbExtension = new TestDatabaseExtension();
+ @RegisterExtension
+ TestSubspaceExtension subspaceExtension = new TestSubspaceExtension(dbExtension);
+
+ private Database db;
+ private Subspace subspace;
+ private ScheduledExecutorService scheduledExecutor;
+
+ @BeforeEach
+ void setUp() {
+ db = dbExtension.getDatabase();
+ subspace = subspaceExtension.getSubspace();
+ scheduledExecutor = new ScheduledThreadPoolExecutor(1,
+ new ThreadFactoryBuilder().setNameFormat("throttled-runner-test-%d").build());
+ }
+
+ @AfterEach
+ void tearDown() {
+ scheduledExecutor.shutdown();
+ }
+
+ // -------------------------------------------------------------------------
+ // throttlePerSecDelayMillis static helper
+ // -------------------------------------------------------------------------
+
+ @ParameterizedTest
+ @CsvSource({
+ // elapsedMs, maxPerSec, eventCount, expectedDelayMs
+ "1000, 100, 0, 0", // no events: no delay
+ "1000, 100, 100, 0", // exactly right rate: no delay
+ "1000, 100, 80, 0", // below rate: no delay
+ "1000, 100, 200, 1000", // double the rate: 1s delay
+ " 250, 50, 100, 1750", // 100 events should take 2s, only 250ms elapsed
+ " 1, 50, 100, 1999", // nearly instant: almost full 2s delay
+ " 0, 100, 0, 0", // zero events, no limit
+ "1000, 0, 999, 0", // maxPerSec=0 means disabled
+ })
+ void testThrottlePerSecDelayMillis(long elapsedMs, int maxPerSec, int eventCount, long expectedDelay) {
+ assertThat(ThrottledRetryingRunner.throttlePerSecDelayMillis(elapsedMs, maxPerSec, eventCount))
+ .isEqualTo(expectedDelay);
+ }
+
+ // -------------------------------------------------------------------------
+ // QuotaManager limit adjustment
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testIncreaseLimitFormula() {
+ // limit=0 is no-limit mode; increaseLimit is a no-op
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(0);
+ qm.increaseLimit();
+ assertThat(qm.getLimit()).isEqualTo(0);
+
+ // With a real maxLimit, decreaseLimit reads processedCount directly (before initTransaction)
+ ThrottledRetryingRunner.QuotaManager qm2 = quotaManager(1000);
+ qm2.processedCountAdd(80);
+ qm2.decreaseLimit(); // limit = max(1, 80*9/10) = 72
+ assertThat(qm2.getLimit()).isEqualTo(72);
+ qm2.increaseLimit(); // limit = max(72*5/4, 72+4) = max(90, 76) = 90
+ assertThat(qm2.getLimit()).isEqualTo(90);
+ }
+
+ @Test
+ void testDecreaseLimitFormula() {
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(1000);
+ // Process 100 items, then fail → limit = max(1, 100*9/10) = 90
+ qm.processedCountAdd(100);
+ qm.decreaseLimit();
+ assertThat(qm.getLimit()).isEqualTo(90);
+
+ // Process 10 items, fail again → max(1, 10*9/10) = 9
+ qm.initTransaction();
+ qm.processedCountAdd(10);
+ qm.decreaseLimit();
+ assertThat(qm.getLimit()).isEqualTo(9);
+
+ // Decrease floors at 1
+ qm.initTransaction();
+ qm.processedCountInc();
+ qm.decreaseLimit();
+ assertThat(qm.getLimit()).isEqualTo(1);
+
+ // Zero processed also floors at 1
+ qm.initTransaction();
+ qm.decreaseLimit();
+ assertThat(qm.getLimit()).isEqualTo(1);
+ }
+
+ @Test
+ void testDecreaseLimitNoOpWhenLimitIsZero() {
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(0);
+ qm.processedCountAdd(100);
+ qm.initTransaction();
+ qm.decreaseLimit();
+ assertThat(qm.getLimit()).isEqualTo(0);
+ }
+
+ @Test
+ void testLimitCappedAtMaxLimit() {
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(50);
+ // Drive limit down by simulating a failure with 40 in-flight items
+ qm.processedCountAdd(40);
+ qm.decreaseLimit(); // limit = max(1, 40*9/10) = 36
+ // Keep increasing until we hit the cap
+ for (int i = 0; i < 100; i++) {
+ qm.increaseLimit();
+ }
+ assertThat(qm.getLimit()).isEqualTo(50); // capped at maxLimit
+ }
+
+ // -------------------------------------------------------------------------
+ // QuotaManager shouldContinue
+ // -------------------------------------------------------------------------
+
+ @Test
+ void shouldContinueTrueWhenUnderLimit() {
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(10);
+ qm.initTransaction();
+ qm.processedCountAdd(5);
+ assertThat(qm.shouldContinue()).isTrue();
+ }
+
+ @Test
+ void shouldContinueFalseWhenProcessedCountExceedsLimit() {
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(10);
+ qm.initTransaction();
+ qm.processedCountAdd(11); // 11 > 10
+ assertThat(qm.shouldContinue()).isFalse();
+ }
+
+ @Test
+ void shouldContinueTrueWhenProcessedCountEqualsLimit() {
+ // > not >=: at exactly the limit, shouldContinue still returns true
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(10);
+ qm.initTransaction();
+ qm.processedCountAdd(10); // 10 > 10 is false
+ assertThat(qm.shouldContinue()).isTrue();
+ }
+
+ @Test
+ void shouldContinueTrueWhenNoLimitRegardlessOfCount() {
+ // limit=0 means no limit; the count is never checked
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(0);
+ qm.initTransaction();
+ qm.processedCountAdd(Integer.MAX_VALUE);
+ assertThat(qm.shouldContinue()).isTrue();
+ }
+
+ @Test
+ void shouldContinueResetsAfterInitTransaction() {
+ // After a transaction fails (count exceeds limit), initTransaction should reset
+ // the state so shouldContinue returns true again at the start of the retry.
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(5);
+ qm.initTransaction();
+ qm.processedCountAdd(6); // 6 > 5 → shouldContinue false
+ assertThat(qm.shouldContinue()).isFalse();
+
+ qm.initTransaction(); // reset for the next transaction
+ assertThat(qm.shouldContinue()).isTrue();
+ }
+
+ // -------------------------------------------------------------------------
+ // Basic iteration: task runs the expected number of times
+ // -------------------------------------------------------------------------
+
+ @Test
+ void taskRunsUntilExhausted() {
+ final int iterations = 5;
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ if (callCount.incrementAndGet() >= iterations) {
+ return exhausted();
+ }
+ return hasMore();
+ }).join();
+ }
+
+ assertThat(callCount.get()).isEqualTo(iterations);
+ }
+
+ @Test
+ void taskExhaustedImmediatelyRunsOnce() {
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ callCount.incrementAndGet();
+ return exhausted();
+ }).join();
+ }
+
+ assertThat(callCount.get()).isEqualTo(1);
+ }
+
+ // -------------------------------------------------------------------------
+ // cannotRunWhenClosed
+ // -------------------------------------------------------------------------
+
+ @Test
+ void cannotRunWhenClosed() {
+ ThrottledRetryingRunner runner = runnerBuilder().build();
+ runner.iterateAll((tr, quota, cont) -> exhausted()).join();
+ runner.close();
+
+ assertThatThrownBy(() ->
+ runner.iterateAll((tr, quota, cont) -> exhausted()).join())
+ .hasCauseInstanceOf(TransactionalRunner.RunnerClosed.class);
+ }
+
+ // -------------------------------------------------------------------------
+ // Retries on failure
+ // -------------------------------------------------------------------------
+
+ @ParameterizedTest
+ @ValueSource(ints = {0, 1, 3, 10})
+ void retriesCorrectNumberOfTimes(int numRetries) {
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withNumOfRetries(numRetries)
+ .build()) {
+ Throwable ex = catchThrowableOfType(RuntimeException.class, () ->
+ runner.iterateAll((tr, quota, cont) -> {
+ callCount.incrementAndGet();
+ return CompletableFuture.failedFuture(new RuntimeException("always fails"));
+ }).join());
+
+ assertThat(ex).isNotNull();
+ assertThat(ex.getMessage()).contains("always fails");
+ }
+
+ // First attempt + numRetries retries
+ assertThat(callCount.get()).isEqualTo(numRetries + 1);
+ }
+
+ @Test
+ void retryCounterResetsAfterSuccess() {
+ // Fail twice, succeed, then fail twice again — should not exhaust the retry budget
+ // because the counter resets on each success.
+ AtomicInteger totalCalls = new AtomicInteger(0);
+ AtomicInteger successCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withNumOfRetries(2)
+ .build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ int call = totalCalls.incrementAndGet();
+ // Fail on calls 1 and 2
+ if (call == 1 || call == 2) {
+ return fail();
+ }
+ // Succeed on call 3 (resets counter), then fail on 4 and 5, succeed on 6, done
+ if (call == 4 || call == 5) {
+ return fail();
+ }
+ successCount.incrementAndGet();
+ if (successCount.get() == 2) {
+ return exhausted();
+ }
+ return hasMore();
+ }).join();
+ }
+
+ assertThat(successCount.get()).isEqualTo(2);
+ }
+
+ @Test
+ void runnerClosedDuringIterationAbortsImmediately() {
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ ThrottledRetryingRunner runner = runnerBuilder().withNumOfRetries(100).build();
+ runner.close(); // close before iterating
+
+ assertThatThrownBy(() ->
+ runner.iterateAll((tr, quota, cont) -> {
+ callCount.incrementAndGet();
+ return exhausted();
+ }).join())
+ .hasCauseInstanceOf(TransactionalRunner.RunnerClosed.class);
+
+ // iterateAll checks closed before starting, so the task is never called
+ assertThat(callCount.get()).isEqualTo(0);
+ }
+
+ // -------------------------------------------------------------------------
+ // Counts: processedCount and deletedCount
+ // -------------------------------------------------------------------------
+
+ @Test
+ void countsAreResetEachTransaction() {
+ // Each transaction should see processedCount starting at 0
+ List<Integer> observedCounts = new ArrayList<>();
+ AtomicInteger totalCalls = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ observedCounts.add(quota.getProcessedCount()); // always 0 at start
+ quota.processedCountAdd(10);
+ if (totalCalls.incrementAndGet() >= 3) {
+ return exhausted();
+ }
+ return hasMore();
+ }).join();
+ }
+
+ assertThat(observedCounts).containsExactly(0, 0, 0);
+ }
+
+ @Test
+ void processedAndDeletedCountsAreIndependent() {
+ AtomicInteger observedProcessed = new AtomicInteger(-1);
+ AtomicInteger observedDeleted = new AtomicInteger(-1);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ quota.processedCountAdd(7);
+ quota.deletedCountAdd(3);
+ observedProcessed.set(quota.getProcessedCount());
+ observedDeleted.set(quota.getDeletedCount());
+ return exhausted();
+ }).join();
+ }
+
+ assertThat(observedProcessed.get()).isEqualTo(7);
+ assertThat(observedDeleted.get()).isEqualTo(3);
+ }
+
+ // -------------------------------------------------------------------------
+ // Limit: adaptive adjustment
+ // -------------------------------------------------------------------------
+
+ @ParameterizedTest
+ @CsvSource({"0, 0", "50, 50"})
+ void limitStartsAtConfiguredMaxLimit(int maxLimit, int expectedLimit) {
+ AtomicInteger observedLimit = new AtomicInteger(-1);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withMaxLimit(maxLimit)
+ .build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ observedLimit.set(quota.getLimit());
+ return exhausted();
+ }).join();
+ }
+
+ assertThat(observedLimit.get()).isEqualTo(expectedLimit);
+ }
+
+ @Test
+ void limitDecreasesOnFailureThenIncreasesOnSuccess() {
+ AtomicInteger callCount = new AtomicInteger(0);
+ List<Integer> observedLimits = new ArrayList<>();
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withMaxLimit(200)
+ .withIncreaseLimitAfter(2) // increase after 2 successes to keep the test short
+ .withNumOfRetries(10)
+ .build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ int call = callCount.incrementAndGet();
+ observedLimits.add(quota.getLimit());
+
+ if (call == 1) {
+ // Process 100 items and fail → next limit = 90
+ quota.processedCountAdd(100);
+ return fail();
+ }
+ if (call <= 3) {
+ // Two successes → limit increases from 90
+ quota.processedCountInc();
+ return hasMore();
+ }
+ return exhausted();
+ }).join();
+ }
+
+ // call 1: limit=200 (initial), fails with 100 processed → new limit = 90
+ assertThat(observedLimits.get(0)).isEqualTo(200);
+ // call 2: limit=90
+ assertThat(observedLimits.get(1)).isEqualTo(90);
+ // call 3: limit=90 still (need 2 consecutive successes before increase)
+ assertThat(observedLimits.get(2)).isEqualTo(90);
+ // call 4: after 2 successes, limit increased: max(90*5/4, 90+4) = max(112,94) = 112
+ assertThat(observedLimits.get(3)).isEqualTo(112);
+ }
+
+ @Test
+ void limitDecreasesProgressivelyTowardOne() {
+ AtomicInteger callCount = new AtomicInteger(0);
+ List<Integer> observedLimits = new ArrayList<>();
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withMaxLimit(1000)
+ .withNumOfRetries(20)
+ .build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ int call = callCount.incrementAndGet();
+ observedLimits.add(quota.getLimit());
+ if (call <= 10) {
+ quota.processedCountInc(); // 1 item processed → limit floor at 1
+ return fail();
+ }
+ return exhausted();
+ }).join();
+ }
+
+ // After first failure with 1 processed: max(1, 1*9/10=0) = 1
+ assertThat(observedLimits.get(1)).isEqualTo(1);
+ // Stays at 1 regardless of further failures
+ for (int i = 2; i < 10; i++) {
+ assertThat(observedLimits.get(i)).isEqualTo(1);
+ }
+ }
+
+ @Test
+ void limitIsNeverAdjustedWhenMaxLimitIsZero() {
+ AtomicInteger callCount = new AtomicInteger(0);
+ List<Integer> observedLimits = new ArrayList<>();
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withNumOfRetries(5)
+ // no withMaxLimit → limit stays 0
+ .build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ int call = callCount.incrementAndGet();
+ observedLimits.add(quota.getLimit());
+ quota.processedCountAdd(100);
+ if (call <= 3) {
+ return fail();
+ }
+ return exhausted();
+ }).join();
+ }
+
+ assertThat(observedLimits).containsOnly(0);
+ }
+
+ // -------------------------------------------------------------------------
+ // commitWhenDone: writes are visible (commit) or not (no commit)
+ // -------------------------------------------------------------------------
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void commitWhenDoneControlsWhetherWritesArePersisted(boolean commitWhenDone) {
+ byte[] key = subspace.pack(Tuple.from("commit-test"));
+ byte[] value = "hello".getBytes();
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withCommitWhenDone(commitWhenDone)
+ .build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ tr.set(key, value);
+ return exhausted();
+ }).join();
+ }
+
+ byte[] read = db.run(tr -> tr.get(key).join());
+ if (commitWhenDone) {
+ assertThat(read).isEqualTo(value);
+ } else {
+ assertThat(read).isNull();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Each transaction gets a fresh Transaction object
+ // -------------------------------------------------------------------------
+
+ @Test
+ void eachIterationReceivesADistinctTransaction() {
+ List<Transaction> seenTransactions = new ArrayList<>();
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ seenTransactions.add(tr);
+ if (callCount.incrementAndGet() >= 3) {
+ return exhausted();
+ }
+ return hasMore();
+ }).join();
+ }
+
+ assertThat(seenTransactions).hasSize(3);
+ assertThat(seenTransactions.get(0)).isNotSameAs(seenTransactions.get(1));
+ assertThat(seenTransactions.get(1)).isNotSameAs(seenTransactions.get(2));
+ }
+
+ // -------------------------------------------------------------------------
+ // Rate limiting
+ // -------------------------------------------------------------------------
+
+ @Test
+ void throttlingByScannedItemsSlowsItDown() {
+ assertThrottledByItemsPerSec(
+ b -> b.withMaxItemsScannedPerSec(20),
+ ThrottledRetryingRunner.QuotaManager::processedCountAdd);
+ }
+
+ @Test
+ void throttlingByDeletedItemsSlowsItDown() {
+ assertThrottledByItemsPerSec(
+ b -> b.withMaxItemsDeletedPerSec(20),
+ ThrottledRetryingRunner.QuotaManager::deletedCountAdd);
+ }
+
+ @Test
+ void noThrottlingWithZeroRateLimit() {
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ quota.processedCountAdd(1000);
+ if (callCount.incrementAndGet() >= 5) {
+ return exhausted();
+ }
+ return hasMore();
+ }).join();
+ }
+
+ assertThat(callCount.get()).isEqualTo(5);
+ }
+
+ // -------------------------------------------------------------------------
+ // Real DB writes
+ // -------------------------------------------------------------------------
+
+ @Test
+ void dataWrittenInEachTransactionIsVisible() {
+ final int numTransactions = 5;
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ int n = callCount.incrementAndGet();
+ tr.set(subspace.pack(Tuple.from(n)), Tuple.from(n).pack());
+ if (n >= numTransactions) {
+ return exhausted();
+ }
+ return hasMore();
+ }).join();
+ }
+
+ db.run(tr -> {
+ for (int i = 1; i <= numTransactions; i++) {
+ byte[] val = tr.get(subspace.pack(Tuple.from(i))).join();
+ assertThat(val).isEqualTo(Tuple.from(i).pack());
+ }
+ return null;
+ });
+ }
+
+ // -------------------------------------------------------------------------
+ // Continuation passing
+ // -------------------------------------------------------------------------
+
+ @Test
+ void firstCallReceivesStartContinuation() {
+ AtomicReference observed = new AtomicReference<>();
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ observed.set(cont);
+ return exhausted();
+ }).join();
+ }
+
+ assertThat(observed.get()).isInstanceOf(ThrottledRetryingRunner.StartContinuation.class);
+ assertThat(observed.get()).isSameAs(ThrottledRetryingRunner.StartContinuation.INSTANCE);
+ }
+
+ @Test
+ void continuationFromSuccessIsPassedToNextCall() {
+ // Each transaction returns a custom continuation object; the next call should receive it.
+ List<ThrottledRetryingRunner.Continuation> received = new ArrayList<>();
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ // Distinct continuation objects for each transaction
+ ThrottledRetryingRunner.Continuation cont1 = () -> true;
+ ThrottledRetryingRunner.Continuation cont2 = () -> true;
+
+ try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ received.add(cont);
+ int call = callCount.incrementAndGet();
+ if (call == 1) {
+ return CompletableFuture.completedFuture(cont1);
+ }
+ if (call == 2) {
+ return CompletableFuture.completedFuture(cont2);
+ }
+ return exhausted();
+ }).join();
+ }
+
+ // call 1: receives StartContinuation
+ assertThat(received.get(0)).isInstanceOf(ThrottledRetryingRunner.StartContinuation.class);
+ // call 2: receives the continuation returned by call 1
+ assertThat(received.get(1)).isSameAs(cont1);
+ // call 3: receives the continuation returned by call 2
+ assertThat(received.get(2)).isSameAs(cont2);
+ }
+
+ @Test
+ void continuationIsNotAdvancedOnRetry() {
+ // When a transaction fails, the retry should receive the same continuation as the failed
+ // attempt — not a new one — so that the task can resume from the same position.
+ List<ThrottledRetryingRunner.Continuation> received = new ArrayList<>();
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ ThrottledRetryingRunner.Continuation contAfterFirstSuccess = () -> true;
+
+ try (ThrottledRetryingRunner runner = runnerBuilder()
+ .withNumOfRetries(3)
+ .build()) {
+ runner.iterateAll((tr, quota, cont) -> {
+ received.add(cont);
+ int call = callCount.incrementAndGet();
+
+ if (call == 1) {
+ // First success: return a custom continuation
+ return CompletableFuture.completedFuture(contAfterFirstSuccess);
+ }
+ if (call == 2 || call == 3) {
+ // Fail twice: continuation should NOT advance
+ return fail();
+ }
+ // Final success
+ return exhausted();
+ }).join();
+ }
+
+ // call 1: StartContinuation (first call)
+ assertThat(received.get(0)).isInstanceOf(ThrottledRetryingRunner.StartContinuation.class);
+ // call 2: contAfterFirstSuccess (first call succeeded)
+ assertThat(received.get(1)).isSameAs(contAfterFirstSuccess);
+ // call 3: still contAfterFirstSuccess — call 2 failed, continuation not advanced
+ assertThat(received.get(2)).isSameAs(contAfterFirstSuccess);
+ // call 4: still contAfterFirstSuccess — call 3 failed too
+ assertThat(received.get(3)).isSameAs(contAfterFirstSuccess);
+ }
+
+    @Test
+    void startContinuationReusedAcrossRetriesBeforeAnySuccess() {
+        // Before any successful transaction, all retries should receive StartContinuation.
+        // (Restored the stripped element type: the list holds Continuation instances.)
+        List<ThrottledRetryingRunner.Continuation> received = new ArrayList<>();
+        AtomicInteger callCount = new AtomicInteger(0);
+
+        try (ThrottledRetryingRunner runner = runnerBuilder()
+                .withNumOfRetries(3)
+                .build()) {
+            runner.iterateAll((tr, quota, cont) -> {
+                received.add(cont);
+                // Fail the first two attempts; succeed on the third.
+                if (callCount.incrementAndGet() <= 2) {
+                    return fail();
+                }
+                return exhausted();
+            }).join();
+        }
+
+        // All three calls should receive StartContinuation since no success ever happened
+        assertThat(received).hasSize(3);
+        assertThat(received).allMatch(c -> c instanceof ThrottledRetryingRunner.StartContinuation);
+    }
+
+    @Test
+    void continuationCarriesStateToNextTransaction() {
+        // A realistic scenario: the continuation carries a "cursor position" (last key seen).
+        // Each transaction processes one key and the next transaction picks up where it left off.
+        final int numKeys = 4;
+
+        // Write test data
+        db.run(tr -> {
+            for (int i = 0; i < numKeys; i++) {
+                tr.set(subspace.pack(Tuple.from(i)), Tuple.from(i * 10).pack());
+            }
+            return null;
+        });
+
+        // Restored the stripped element type: values read back from FDB, in processing order.
+        List<Integer> processedValues = new ArrayList<>();
+
+        try (ThrottledRetryingRunner runner = runnerBuilder().build()) {
+            runner.iterateAll((tr, quota, cont) -> {
+                // First call gets StartContinuation (not an IndexContinuation) → start at 0.
+                int idx = (cont instanceof IndexContinuation) ? ((IndexContinuation) cont).nextIndex : 0;
+                byte[] raw = tr.get(subspace.pack(Tuple.from(idx))).join();
+                processedValues.add((int) Tuple.fromBytes(raw).getLong(0));
+                return CompletableFuture.completedFuture(new IndexContinuation(idx + 1, numKeys));
+            }).join();
+        }
+
+        assertThat(processedValues).containsExactly(0, 10, 20, 30);
+    }
+
+ // -------------------------------------------------------------------------
+ // Real transaction conflict
+ // -------------------------------------------------------------------------
+
+    @Test
+    void retriesOnRealTransactionConflict() {
+        // Arrange a genuine FDB NOT_COMMITTED conflict without any threading:
+        // 1. Task (attempt 1) reads conflictKey → establishes a read-conflict range on tr
+        // 2. Task also writes to ownWriteKey → makes tr a write transaction so FDB actually
+        //    checks read conflicts on commit (FDB silently skips conflict checking for
+        //    read-only transactions)
+        // 3. Task opens a second transaction, writes to conflictKey, and commits it
+        // 4. TransactionalRunner then commits tr → FDB detects the conflict and rejects it
+        // 5. Runner retries; attempt 2 sees no conflict and succeeds
+        byte[] conflictKey = subspace.pack(Tuple.from("conflict-key"));
+        byte[] ownWriteKey = subspace.pack(Tuple.from("own-write"));
+        // Counts task invocations across the conflicting attempt and its retry.
+        AtomicInteger attemptCount = new AtomicInteger(0);
+
+        try (ThrottledRetryingRunner runner = runnerBuilder().withNumOfRetries(3).build()) {
+            runner.iterateAll((tr, quota, cont) -> {
+                int attempt = attemptCount.incrementAndGet();
+                if (attempt == 1) {
+                    return injectConflict(tr, conflictKey, ownWriteKey)
+                            .thenCompose(ignored -> exhausted());
+                }
+                // Attempt 2: no conflict
+                return exhausted();
+            }).join();
+        }
+
+        // Exactly one conflicted attempt plus one successful retry.
+        assertThat(attemptCount.get()).isEqualTo(2);
+    }
+
+    @Test
+    void conflictDoesNotAdvanceContinuation() {
+        // Write 6 data keys to process in batches of 2 across 3 transactions.
+        //
+        // Expected flow:
+        //   Attempt 1 (idx=0): processes items 0,1 — commits → continuation advances to idx=2
+        //   Attempt 2 (idx=2): processes items 2,3 — CONFLICT → not committed
+        //   Attempt 3 (idx=2): retry receives idx=2 (not idx=4!) — processes 2,3 again — commits
+        //   Attempt 4 (idx=4): processes items 4,5 — commits → hasMore()=false, loop ends
+        //
+        // The key assertion is that attempt 3 starts at idx=2, not idx=4. If the continuation
+        // were incorrectly advanced on failure, items 2 and 3 would never be written to FDB.
+        final int numKeys = 6;
+        final int batchSize = 2;
+
+        db.run(tr -> {
+            for (int i = 0; i < numKeys; i++) {
+                tr.set(subspace.pack(Tuple.from("data", i)), Tuple.from(i).pack());
+            }
+            return null;
+        });
+
+        byte[] conflictKey = subspace.pack(Tuple.from("conflict-trigger"));
+        byte[] ownWriteKey = subspace.pack(Tuple.from("own-write"));
+
+        AtomicInteger attemptCount = new AtomicInteger(0);
+        AtomicInteger retryStartIdx = new AtomicInteger(-1); // captured from attempt 3
+
+        try (ThrottledRetryingRunner runner = runnerBuilder().withNumOfRetries(3).build()) {
+            runner.iterateAll((tr, quota, cont) -> {
+                int attempt = attemptCount.incrementAndGet();
+                // First call gets StartContinuation (not an IndexContinuation) → start at 0.
+                int startIdx = (cont instanceof IndexContinuation)
+                        ? ((IndexContinuation) cont).nextIndex : 0;
+                int endIdx = Math.min(startIdx + batchSize, numKeys);
+
+                if (attempt == 3) {
+                    retryStartIdx.set(startIdx);
+                }
+
+                // Write a "processed" marker for each item in this batch
+                for (int i = startIdx; i < endIdx; i++) {
+                    tr.set(subspace.pack(Tuple.from("done", i)), Tuple.from(i).pack());
+                }
+
+                if (attempt == 2) {
+                    // The failed attempt still returns an advanced continuation; the runner
+                    // must discard it because the commit is rejected.
+                    return injectConflict(tr, conflictKey, ownWriteKey)
+                            .thenCompose(ignored ->
+                                    CompletableFuture.completedFuture(new IndexContinuation(endIdx, numKeys)));
+                }
+
+                return CompletableFuture.completedFuture(new IndexContinuation(endIdx, numKeys));
+            }).join();
+        }
+
+        // Attempt 3 must have restarted at idx=2 (the last committed continuation),
+        // not at idx=4 (what would have been returned if the failed attempt had committed)
+        assertThat(retryStartIdx.get()).isEqualTo(2);
+
+        // 4 attempts total: tx1(0-1) + tx2(2-3, conflict) + retry(2-3) + tx3(4-5)
+        assertThat(attemptCount.get()).isEqualTo(4);
+
+        // All 6 items must be present in FDB exactly once — none skipped, none missing
+        db.run(tr -> {
+            for (int i = 0; i < numKeys; i++) {
+                byte[] val = tr.get(subspace.pack(Tuple.from("done", i))).join();
+                assertThat(val).as("item %d should be committed", i)
+                        .isEqualTo(Tuple.from(i).pack());
+            }
+            return null;
+        });
+    }
+
+
+ // -------------------------------------------------------------------------
+
+    /** Returns a completed future with a continuation indicating there is more work to do. */
+    private static CompletableFuture<ThrottledRetryingRunner.Continuation> hasMore() {
+        // The lambda is the Continuation; restored the stripped future type parameter.
+        return CompletableFuture.completedFuture(() -> true);
+    }
+
+    /** Returns a completed future with a continuation indicating the source is exhausted. */
+    private static CompletableFuture<ThrottledRetryingRunner.Continuation> exhausted() {
+        // hasMore() == false stops the runner's loop after the commit.
+        return CompletableFuture.completedFuture(() -> false);
+    }
+
+    /** Returns a pre-failed future simulating a transient task failure. */
+    private static CompletableFuture<ThrottledRetryingRunner.Continuation> fail() {
+        return CompletableFuture.failedFuture(new RuntimeException("transient"));
+    }
+
+    /** Convenience factory for a {@link ThrottledRetryingRunner.QuotaManager}. */
+    // NOTE(review): QuotaManager's constructor is package-private in com.apple.foundationdb.test;
+    // this only compiles if this test class lives in that package — confirm.
+    private static ThrottledRetryingRunner.QuotaManager quotaManager(int maxLimit) {
+        return new ThrottledRetryingRunner.QuotaManager(maxLimit);
+    }
+
+    /**
+     * Asserts that the runner throttles correctly when {@code reportItems} is used to count
+     * items and {@code configure} wires up the corresponding rate-limiter on the builder.
+     *
+     * @param configure   hooks the rate limiter under test onto the builder
+     * @param reportItems reports {@code itemsPerTransaction} items to the quota manager using
+     *                    the counter (scanned or deleted) matching the configured limiter
+     */
+    private void assertThrottledByItemsPerSec(
+            UnaryOperator<ThrottledRetryingRunner.Builder> configure,
+            BiConsumer<ThrottledRetryingRunner.QuotaManager, Integer> reportItems) {
+        final int itemsPerTransaction = 10;
+        final int maxPerSec = 20;
+        final int transactions = 3;
+        AtomicInteger callCount = new AtomicInteger(0);
+
+        long start = System.currentTimeMillis();
+        try (ThrottledRetryingRunner runner = configure.apply(runnerBuilder()).build()) {
+            runner.iterateAll((tr, quota, cont) -> {
+                reportItems.accept(quota, itemsPerTransaction);
+                if (callCount.incrementAndGet() >= transactions) {
+                    return exhausted();
+                }
+                return hasMore();
+            }).join();
+        }
+        long elapsed = System.currentTimeMillis() - start;
+
+        // totalItems / maxPerSec = (3 * 10) / 20 = 1.5s minimum.
+        // BUG FIX: divide AFTER converting to millis — the previous form,
+        // toMillis(totalItems / maxPerSec), truncated 30/20 to 1 and expected only 1000ms,
+        // contradicting the 1.5s in the comment and the runner's own throttle formula.
+        long expectedMinMs = TimeUnit.SECONDS.toMillis((long) itemsPerTransaction * transactions) / maxPerSec;
+        assertThat(elapsed).isGreaterThanOrEqualTo(expectedMinMs);
+    }
+
+    /**
+     * Injects a genuine FDB read-conflict into {@code tr} and returns a future that completes
+     * once the conflicting write has been committed.
+     *
+     * <p>How it works:
+     * <ul>
+     *     <li>Writes to {@code ownWriteKey} so that {@code tr} is a write transaction — FDB only
+     *     checks read-conflict ranges when the committing transaction has writes.</li>
+     *     <li>Reads {@code conflictKey} to add it to {@code tr}'s read-conflict range.</li>
+     *     <li>Opens a separate transaction, writes to {@code conflictKey}, and commits it, so that
+     *     when {@code TransactionalRunner} later commits {@code tr} it receives
+     *     {@code NOT_COMMITTED}.</li>
+     * </ul>
+     * </p>
+     */
+    private CompletableFuture<Void> injectConflict(Transaction tr, byte[] conflictKey, byte[] ownWriteKey) {
+        tr.set(ownWriteKey, new byte[]{0});
+        // The read must complete (establishing the conflict range) before the rival commit.
+        return tr.get(conflictKey)
+                .thenCompose(ignored -> db.runAsync(tr2 -> {
+                    tr2.set(conflictKey, new byte[]{1});
+                    return CompletableFuture.completedFuture(null);
+                }));
+    }
+
+    /** Creates a runner builder bound to the shared test {@code db} and {@code scheduledExecutor}. */
+    private ThrottledRetryingRunner.Builder runnerBuilder() {
+        return ThrottledRetryingRunner.builder(db, scheduledExecutor);
+    }
+
+    /**
+     * A continuation that carries a numeric index as cursor position.
+     * {@link #hasMore()} returns {@code true} while {@link #nextIndex} is less than
+     * {@link #totalKeys}.
+     */
+    private static class IndexContinuation implements ThrottledRetryingRunner.Continuation {
+        // Index of the next unprocessed key; read directly by tests to verify resume position.
+        final int nextIndex;
+        // Total number of keys to process; reaching it exhausts the continuation.
+        private final int totalKeys;
+
+        IndexContinuation(int nextIndex, int totalKeys) {
+            this.nextIndex = nextIndex;
+            this.totalKeys = totalKeys;
+        }
+
+        @Override
+        public boolean hasMore() {
+            return nextIndex < totalKeys;
+        }
+    }
+}
diff --git a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java
new file mode 100644
index 0000000000..15b22ae952
--- /dev/null
+++ b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java
@@ -0,0 +1,547 @@
+/*
+ * ThrottledRetryingRunner.java
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apple.foundationdb.test;
+
+import com.apple.foundationdb.Database;
+import com.apple.foundationdb.Transaction;
+import com.apple.foundationdb.async.AsyncUtil;
+import com.apple.foundationdb.async.MoreAsyncUtil;
+
+import javax.annotation.Nonnull;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Runs a task repeatedly across multiple transactions, with adaptive limit management, optional
+ * rate limiting, and retry logic.
+ *
+ * <p>The caller supplies a {@link Task} that receives a fresh {@link Transaction}, a
+ * {@link QuotaManager}, and the {@link Continuation} from the last successful
+ * transaction (or {@link StartContinuation} on the first call). The task returns a
+ * {@code CompletableFuture<Continuation>}; returning a continuation with
+ * {@link Continuation#hasMore()} {@code == false} stops the loop.
+ * </p>
+ * <p>On failure the continuation is not advanced — the last successful continuation is
+ * re-passed to the retry, so the task can resume from the same position. This distinguishes a
+ * retry (same continuation) from a fresh transaction after success (new continuation).
+ * </p>
+ * <p>The {@link QuotaManager} is reset at the start of each transaction. The task uses it to report
+ * how many items were processed ({@link QuotaManager#processedCountInc()}) and deleted
+ * ({@link QuotaManager#deletedCountInc()}). These counts drive the between-transaction throttle
+ * delay when {@code maxItemsScannedPerSec} or {@code maxItemsDeletedPerSec} are configured, and
+ * inform the adaptive limit adjustment on failure.
+ * </p>
+ * <p>A limit of {@code 0} is treated as "no limit" — the task may do as much work as it likes in
+ * each transaction and the limit is never adjusted.
+ * </p>
+ */
+public class ThrottledRetryingRunner implements AutoCloseable {
+
+    public static final int DEFAULT_NUM_RETRIES = 100;
+    public static final int DEFAULT_INCREASE_LIMIT_AFTER = 40;
+
+    @Nonnull
+    private final TransactionalRunner transactionalRunner;
+    @Nonnull
+    private final ScheduledExecutorService scheduledExecutor;
+    private final int maxItemsScannedPerSec;
+    private final int maxItemsDeletedPerSec;
+    private final int numOfRetries;
+    private final boolean commitWhenDone;
+    private final int increaseLimitAfter;
+    private final int maxLimit;
+
+    // NOTE(review): this mutable state is per-runner, not per-iterateAll call, so concurrent
+    // iterateAll invocations on one runner would interfere — confirm single-use-at-a-time intent.
+    private boolean closed = false;
+    // Wall-clock time at which the current transaction started; feeds the throttle computation.
+    private long transactionStartTimeMillis = 0;
+    // Consecutive failed attempts since the last successful commit.
+    private int failureRetriesCounter = 0;
+    // Consecutive successes since the last failure or limit increase.
+    private int consecutiveSuccessCount = 0;
+
+    private ThrottledRetryingRunner(Builder builder) {
+        this.transactionalRunner = new TransactionalRunner(builder.database);
+        this.scheduledExecutor = builder.scheduledExecutor;
+        this.maxItemsScannedPerSec = builder.maxItemsScannedPerSec;
+        this.maxItemsDeletedPerSec = builder.maxItemsDeletedPerSec;
+        this.numOfRetries = builder.numOfRetries;
+        this.commitWhenDone = builder.commitWhenDone;
+        this.increaseLimitAfter = builder.increaseLimitAfter;
+        this.maxLimit = builder.maxLimit;
+    }
+
+    /**
+     * Run the given task repeatedly across multiple transactions until it returns a
+     * {@link Continuation} with {@link Continuation#hasMore()} {@code == false}.
+     *
+     * <p>On the very first call the task receives {@link StartContinuation#INSTANCE}. On each
+     * subsequent call after a successful commit it receives the continuation returned by the
+     * previous call. On a retry after failure it receives the same continuation that was passed
+     * to the failed attempt — i.e. the last successful continuation is preserved.
+     * </p>
+     * <p>A single {@link QuotaManager} is created for the lifetime of this call. Its per-transaction
+     * counts are reset at the start of each transaction; its limit persists and is adjusted
+     * between transactions.
+     * </p>
+     *
+     * @param task the task to run
+     * @return a future that completes normally when the task returns a continuation with
+     *         {@link Continuation#hasMore()} {@code == false}, or exceptionally if the retry
+     *         limit is exceeded or the runner is closed
+     */
+    @Nonnull
+    public CompletableFuture<Void> iterateAll(@Nonnull Task task) {
+        if (closed) {
+            return CompletableFuture.failedFuture(new TransactionalRunner.RunnerClosed());
+        }
+
+        final QuotaManager quotaManager = new QuotaManager(maxLimit);
+        final AtomicReference<Continuation> lastSuccessfulCont =
+                new AtomicReference<>(StartContinuation.INSTANCE);
+        return AsyncUtil.whileTrue(() ->
+                runOneTransaction(task, quotaManager, lastSuccessfulCont.get())
+                        .handle((continuation, ex) -> {
+                            if (ex == null) {
+                                // Only a successful commit advances the continuation.
+                                lastSuccessfulCont.set(continuation);
+                                return handleSuccess(quotaManager, continuation);
+                            }
+                            return handleFailure(ex, quotaManager);
+                        })
+                        .thenCompose(ret -> ret));
+    }
+
+    /** Runs one transaction attempt: resets per-transaction counts and invokes the task. */
+    private CompletableFuture<Continuation> runOneTransaction(
+            @Nonnull Task task,
+            @Nonnull QuotaManager quotaManager,
+            @Nonnull Continuation continuation) {
+        transactionStartTimeMillis = System.currentTimeMillis();
+        return transactionalRunner.runAsync(commitWhenDone, transaction -> {
+            quotaManager.initTransaction();
+            return task.run(transaction, quotaManager, continuation);
+        });
+    }
+
+    /** After a commit: maybe raise the limit, stop if exhausted, else delay per the throttle. */
+    private CompletableFuture<Boolean> handleSuccess(
+            @Nonnull QuotaManager quotaManager,
+            @Nonnull Continuation continuation) {
+        failureRetriesCounter = 0;
+        ++consecutiveSuccessCount;
+
+        // Increase limit after enough consecutive successes, but only when a limit is active
+        if (quotaManager.limit > 0 && consecutiveSuccessCount >= increaseLimitAfter) {
+            quotaManager.increaseLimit();
+            consecutiveSuccessCount = 0;
+        }
+
+        if (!continuation.hasMore()) {
+            return AsyncUtil.READY_FALSE;
+        }
+
+        long elapsedMillis = Math.max(0, System.currentTimeMillis() - transactionStartTimeMillis);
+        // Wait for whichever configured rate demands the longer pause.
+        long delayMillis = Collections.max(List.of(
+                throttlePerSecDelayMillis(elapsedMillis, maxItemsScannedPerSec, quotaManager.processedCount),
+                throttlePerSecDelayMillis(elapsedMillis, maxItemsDeletedPerSec, quotaManager.deletedCount)
+        ));
+
+        if (delayMillis > 0) {
+            return MoreAsyncUtil.delayedFuture(delayMillis, TimeUnit.MILLISECONDS, scheduledExecutor)
+                    .thenApply(ignore -> true);
+        }
+        return AsyncUtil.READY_TRUE;
+    }
+
+    /** After a failure: rethrow if closed or out of retries, else shrink the limit and retry. */
+    private CompletableFuture<Boolean> handleFailure(
+            @Nonnull Throwable ex,
+            @Nonnull QuotaManager quotaManager) {
+        ++failureRetriesCounter;
+        consecutiveSuccessCount = 0;
+
+        // Unwrap (possibly nested) CompletionExceptions to inspect the real cause.
+        while (ex instanceof CompletionException && ex.getCause() != null) {
+            ex = ex.getCause();
+        }
+        if (ex instanceof TransactionalRunner.RunnerClosed) {
+            return CompletableFuture.failedFuture(ex);
+        }
+        if (failureRetriesCounter > numOfRetries) {
+            return CompletableFuture.failedFuture(ex);
+        }
+
+        // Decrease the limit based on how much work was in-flight when the failure occurred,
+        // so the next attempt is less likely to exceed the transaction budget.
+        quotaManager.decreaseLimit();
+
+        return AsyncUtil.READY_TRUE; // retry
+    }
+
+    /**
+     * Compute how long to wait to keep the event rate at or below {@code maxPerSec}.
+     * Matches the formula used by {@code ThrottledRetryingIterator}.
+     */
+    static long throttlePerSecDelayMillis(long elapsedMillis, int maxPerSec, int eventCount) {
+        if (maxPerSec <= 0) {
+            return 0; // throttling disabled
+        }
+        // Minimum wall-clock duration for eventCount events, minus time already spent.
+        long waitMillis = (TimeUnit.SECONDS.toMillis(eventCount) / maxPerSec) - elapsedMillis;
+        return waitMillis > 0 ? waitMillis : 0;
+    }
+
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        transactionalRunner.close();
+    }
+
+    // -------------------------------------------------------------------------
+    // Continuation
+    // -------------------------------------------------------------------------
+
+    /**
+     * Represents the result of one transaction's work and signals whether the runner should
+     * start another transaction.
+     *
+     * <p>Implementations are free to carry any state needed to resume work at the right position
+     * in the next transaction. The runner itself only inspects {@link #hasMore()}.
+     * </p>
+     */
+    @FunctionalInterface
+    public interface Continuation {
+        /**
+         * Returns {@code true} if there is more work to do; {@code false} if the source has
+         * been exhausted and the loop should stop after committing the current transaction.
+         */
+        boolean hasMore();
+    }
+
+    /**
+     * The continuation passed to the task on the very first transaction.
+     *
+     * <p>Tasks can detect the first call with {@code continuation instanceof StartContinuation}.
+     * </p>
+     */
+    public static final class StartContinuation implements Continuation {
+        /** The singleton instance to pass on the first call. */
+        public static final StartContinuation INSTANCE = new StartContinuation();
+
+        private StartContinuation() {
+        }
+
+        @Override
+        public boolean hasMore() {
+            return true;
+        }
+    }
+
+    // -------------------------------------------------------------------------
+    // Task
+    // -------------------------------------------------------------------------
+
+    /**
+     * A task that performs work inside a single transaction and returns a continuation.
+     */
+    @FunctionalInterface
+    public interface Task {
+        /**
+         * Run one transaction's worth of work.
+         *
+         * @param transaction  the transaction for this attempt
+         * @param quotaManager tracks work done this transaction and exposes the current limit
+         * @param continuation the continuation from the last successful transaction,
+         *                     or {@link StartContinuation#INSTANCE} on the very first call;
+         *                     on a retry this is the same continuation that was passed to the
+         *                     failed attempt
+         * @return a future containing the continuation describing where the next transaction
+         *         should resume; return a continuation with {@link Continuation#hasMore()}
+         *         {@code == false} to stop the loop
+         */
+        @Nonnull
+        CompletableFuture<Continuation> run(@Nonnull Transaction transaction,
+                                            @Nonnull QuotaManager quotaManager,
+                                            @Nonnull Continuation continuation);
+    }
+
+    // -------------------------------------------------------------------------
+    // QuotaManager
+    // -------------------------------------------------------------------------
+
+    /**
+     * Tracks per-transaction resource usage and exposes the adaptive limit.
+     *
+     * <p>One instance is created per {@link #iterateAll} call. Per-transaction counts are reset at
+     * the start of each transaction; the limit persists and is adjusted by the runner between
+     * transactions.
+     * </p>
+     * <p>The task should call {@link #shouldContinue()} between processing items to decide whether
+     * to stop early. {@link #shouldContinue()} returns {@code false} once the processed count
+     * exceeds the active limit or the transaction has been running for longer than
+     * {@link #MAX_TRANSACTION_DURATION_MILLIS}.
+     * </p>
+     */
+    public static class QuotaManager {
+        /** Maximum time a single transaction should run before {@link #shouldContinue()} returns {@code false}. */
+        static final long MAX_TRANSACTION_DURATION_MILLIS = TimeUnit.SECONDS.toMillis(4);
+
+        private int processedCount;
+        private int deletedCount;
+        private int limit;
+        private final int maxLimit;
+        private long transactionStartTimeMillis;
+
+        QuotaManager(int maxLimit) {
+            this.maxLimit = maxLimit;
+            this.limit = maxLimit;
+        }
+
+        /**
+         * Return the current limit.
+         *
+         * <p>The task should attempt at most this many units of work in the current transaction.
+         * A value of {@code 0} means no limit is active.
+         * </p>
+         */
+        public int getLimit() {
+            return limit;
+        }
+
+        /**
+         * Return the number of items processed in the current transaction.
+         */
+        public int getProcessedCount() {
+            return processedCount;
+        }
+
+        /**
+         * Return the number of items deleted in the current transaction.
+         */
+        public int getDeletedCount() {
+            return deletedCount;
+        }
+
+        /**
+         * Increment the processed-item count by {@code count}.
+         *
+         * @param count number of items to add
+         */
+        public void processedCountAdd(int count) {
+            processedCount += count;
+        }
+
+        /**
+         * Increment the processed-item count by 1.
+         */
+        public void processedCountInc() {
+            processedCount++;
+        }
+
+        /**
+         * Increment the deleted-item count by {@code count}.
+         *
+         * @param count number of items to add
+         */
+        public void deletedCountAdd(int count) {
+            deletedCount += count;
+        }
+
+        /**
+         * Increment the deleted-item count by 1.
+         */
+        public void deletedCountInc() {
+            deletedCount++;
+        }
+
+        /**
+         * Returns {@code true} if the task should continue processing items in this transaction,
+         * {@code false} if it should stop and return a continuation.
+         *
+         * <p>Returns {@code false} when either:
+         * <ul>
+         *     <li>a limit is active ({@link #getLimit()} {@code > 0}) and the processed count
+         *     exceeds it, or</li>
+         *     <li>the transaction has been running for longer than
+         *     {@link #MAX_TRANSACTION_DURATION_MILLIS}.</li>
+         * </ul>
+         * </p>
+         */
+        public boolean shouldContinue() {
+            if (limit > 0 && processedCount > limit) {
+                return false;
+            }
+            return System.currentTimeMillis() - transactionStartTimeMillis < MAX_TRANSACTION_DURATION_MILLIS;
+        }
+
+        void initTransaction() {
+            // Reset per-transaction counts; the adaptive limit deliberately persists.
+            processedCount = 0;
+            deletedCount = 0;
+            transactionStartTimeMillis = System.currentTimeMillis();
+        }
+
+        void increaseLimit() {
+            if (limit == 0) {
+                return; // no-limit mode: never adjust
+            }
+            // Grow by 25%, but at least by 4; cap at maxLimit when one is configured.
+            final int increased = Math.max((limit * 5) / 4, limit + 4);
+            limit = maxLimit > 0 ? Math.min(maxLimit, increased) : increased;
+        }
+
+        void decreaseLimit() {
+            if (limit == 0) {
+                return; // no-limit mode: never adjust
+            }
+            // Use the count from the transaction that just failed (before the next initTransaction resets it)
+            limit = Math.max(1, (processedCount * 9) / 10);
+        }
+    }
+
+    // -------------------------------------------------------------------------
+    // Builder
+    // -------------------------------------------------------------------------
+
+    /**
+     * Create a new builder for a {@link ThrottledRetryingRunner}.
+     *
+     * @param database the database to open transactions against
+     * @param scheduledExecutor executor used for throttle delays between transactions
+     * @return a new builder
+     */
+    public static Builder builder(@Nonnull Database database,
+                                  @Nonnull ScheduledExecutorService scheduledExecutor) {
+        return new Builder(database, scheduledExecutor);
+    }
+
+    /**
+     * Builder for {@link ThrottledRetryingRunner}.
+     */
+    public static class Builder {
+        @Nonnull
+        private final Database database;
+        @Nonnull
+        private final ScheduledExecutorService scheduledExecutor;
+        private int maxItemsScannedPerSec = 0;
+        private int maxItemsDeletedPerSec = 0;
+        private int numOfRetries = DEFAULT_NUM_RETRIES;
+        private boolean commitWhenDone = true;
+        private int increaseLimitAfter = DEFAULT_INCREASE_LIMIT_AFTER;
+        private int maxLimit = 0;
+
+        private Builder(@Nonnull Database database, @Nonnull ScheduledExecutorService scheduledExecutor) {
+            this.database = database;
+            this.scheduledExecutor = scheduledExecutor;
+        }
+
+        /**
+         * Set the maximum number of items that may be scanned (processed) per second.
+         * The runner will delay between transactions to stay at or below this rate.
+         * A value of {@code 0} (the default) disables scanned-item throttling.
+         *
+         * @param maxItemsScannedPerSec the rate limit; {@code 0} disables it
+         * @return this builder
+         */
+        public Builder withMaxItemsScannedPerSec(int maxItemsScannedPerSec) {
+            this.maxItemsScannedPerSec = Math.max(0, maxItemsScannedPerSec);
+            return this;
+        }
+
+        /**
+         * Set the maximum number of items that may be deleted per second.
+         * The runner will delay between transactions to stay at or below this rate.
+         * A value of {@code 0} (the default) disables deleted-item throttling.
+         *
+         * @param maxItemsDeletedPerSec the rate limit; {@code 0} disables it
+         * @return this builder
+         */
+        public Builder withMaxItemsDeletedPerSec(int maxItemsDeletedPerSec) {
+            this.maxItemsDeletedPerSec = Math.max(0, maxItemsDeletedPerSec);
+            return this;
+        }
+
+        /**
+         * Set the number of consecutive successful transactions required before the limit is
+         * increased. Defaults to {@value #DEFAULT_INCREASE_LIMIT_AFTER}.
+         *
+         * @param increaseLimitAfter consecutive successes before a limit increase
+         * @return this builder
+         */
+        public Builder withIncreaseLimitAfter(int increaseLimitAfter) {
+            this.increaseLimitAfter = Math.max(1, increaseLimitAfter);
+            return this;
+        }
+
+        /**
+         * Set the initial and maximum per-transaction limit passed to the task via
+         * {@link QuotaManager#getLimit()}.
+         *
+         * <p>The limit starts at this value and will not be increased beyond it. If {@code 0}
+         * (the default) the limit feature is disabled: {@link QuotaManager#getLimit()} always
+         * returns {@code 0} and is never adjusted.
+         * </p>
+         *
+         * @param maxLimit the initial and maximum limit; {@code 0} disables limit management
+         * @return this builder
+         */
+        public Builder withMaxLimit(int maxLimit) {
+            this.maxLimit = Math.max(0, maxLimit);
+            return this;
+        }
+
+        /**
+         * Set the maximum number of times to retry a failed transaction before giving up.
+         * The counter resets after each successful commit. Defaults to {@value #DEFAULT_NUM_RETRIES}.
+         *
+         * @param numOfRetries maximum retries
+         * @return this builder
+         */
+        public Builder withNumOfRetries(int numOfRetries) {
+            this.numOfRetries = Math.max(0, numOfRetries);
+            return this;
+        }
+
+        /**
+         * Set whether to commit each transaction when the task returns.
+         * If {@code false}, transactions are rolled back instead of committed (useful for read-only tasks).
+         * Defaults to {@code true}.
+         *
+         * @param commitWhenDone {@code true} to commit, {@code false} to roll back
+         * @return this builder
+         */
+        public Builder withCommitWhenDone(boolean commitWhenDone) {
+            this.commitWhenDone = commitWhenDone;
+            return this;
+        }
+
+        /**
+         * Build the {@link ThrottledRetryingRunner}.
+         *
+         * @return a new runner
+         */
+        public ThrottledRetryingRunner build() {
+            return new ThrottledRetryingRunner(this);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/TransactionalRunner.java b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/TransactionalRunner.java
new file mode 100644
index 0000000000..e35df9b81c
--- /dev/null
+++ b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/TransactionalRunner.java
@@ -0,0 +1,146 @@
+/*
+ * TransactionalRunner.java
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apple.foundationdb.test;
+
+import com.apple.foundationdb.Database;
+import com.apple.foundationdb.Transaction;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * A simple runner that opens and commits raw FDB transactions and ensures they are all closed on runner close.
+ *
+ * This is the raw-FDB analog of the record-layer {@code TransactionalRunner}. It operates at the
+ * {@link Database}/{@link Transaction} level, with no record-layer dependencies.
+ *
+ */
+public class TransactionalRunner implements AutoCloseable {
+
+ @Nonnull
+ private final Database database;
+ private boolean closed;
+ @Nonnull
+ private final List transactionsToClose;
+
+ public TransactionalRunner(@Nonnull Database database) {
+ this.database = database;
+ this.transactionsToClose = new ArrayList<>();
+ }
+
+ /**
+ * Run some code with a transaction, then commit it.
+ *
+ * The transaction will be committed if the future returned by the runnable completes successfully;
+ * otherwise it will not be committed. Either way, the transaction is closed when the future completes.
+ * If this runner is {@link #close() closed}, any open transaction will also be closed.
+ *
+ *
+ * @param runnable code to run with a fresh {@link Transaction}
+ * @param the type of value returned by the future
+ * @return a future containing the result, after the transaction has been committed
+ */
+ @Nonnull
+ @SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"})
+ public CompletableFuture runAsync(@Nonnull Function super Transaction, CompletableFuture extends T>> runnable) {
+ return runAsync(true, runnable);
+ }
+
+ /**
+ * A flavor of {@link #runAsync(Function)} that supports read-only (non-committing) transactions.
+ *
+ * @param commitWhenDone if {@code true} the transaction is committed on success; if {@code false} it is only closed
+ * @param runnable code to run with a fresh {@link Transaction}
+ * @param the type of value returned by the future
+ * @return a future containing the result; if {@code commitWhenDone} is {@code true}, after the transaction has been committed
+ */
+ @Nonnull
+ @SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"})
+ public CompletableFuture runAsync(boolean commitWhenDone,
+ @Nonnull Function super Transaction, CompletableFuture extends T>> runnable) {
+ Transaction transaction = openTransaction();
+ boolean returnedFuture = false;
+ try {
+ CompletableFuture future = runnable.apply(transaction)
+ .thenCompose(val -> {
+ if (commitWhenDone) {
+ return transaction.commit().thenApply(vignore -> val);
+ } else {
+ return CompletableFuture.completedFuture(val);
+ }
+ });
+ returnedFuture = true;
+ return future.whenComplete((result, exception) -> transaction.close());
+ } finally {
+ if (!returnedFuture) {
+ transaction.close();
+ }
+ }
+ }
+
+ /**
+ * Open a new transaction that will be closed when this runner is closed.
+ *
+ * @return a new {@link Transaction}
+ * @throws RunnerClosed if this runner has already been closed
+ */
+ @Nonnull
+ public Transaction openTransaction() {
+ if (closed) {
+ throw new RunnerClosed();
+ }
+ Transaction transaction = database.createTransaction();
+ addTransactionToClose(transaction);
+ return transaction;
+ }
+
+ private synchronized void addTransactionToClose(@Nonnull Transaction transaction) {
+ if (closed) {
+ transaction.close();
+ throw new RunnerClosed();
+ }
+ transactionsToClose.add(transaction);
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closed) {
+ return;
+ }
+ transactionsToClose.forEach(Transaction::close);
+ transactionsToClose.clear();
+ this.closed = true;
+ }
+
+ /**
+ * Exception thrown when an operation is attempted on a closed runner.
+ */
+ public static class RunnerClosed extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public RunnerClosed() {
+ super("Runner has been closed");
+ }
+ }
+}