From c47b9487aeef5c4a2065496e41b8ffd37e2cb000 Mon Sep 17 00:00:00 2001 From: Scott Dugas Date: Mon, 2 Mar 2026 14:49:07 -0500 Subject: [PATCH 1/9] Initial pass at ThrottledRetryingRunner --- .../test/ThrottledRetryingRunner.java | 448 ++++++++++++++++++ .../test/TransactionalRunner.java | 146 ++++++ 2 files changed, 594 insertions(+) create mode 100644 fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java create mode 100644 fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/TransactionalRunner.java diff --git a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java new file mode 100644 index 0000000000..1a6cdb3d2d --- /dev/null +++ b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java @@ -0,0 +1,448 @@ +/* + * ThrottledRetryingRunner.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.test; + +import com.apple.foundationdb.Database; +import com.apple.foundationdb.Transaction; +import com.apple.foundationdb.async.AsyncUtil; +import com.apple.foundationdb.async.MoreAsyncUtil; + +import javax.annotation.Nonnull; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; + +/** + * Runs a task repeatedly across multiple transactions, with adaptive limit management, optional + * rate limiting, and retry logic. + *

+ * The caller supplies a {@link BiFunction} that receives a fresh {@link Transaction} and a + * {@link QuotaManager}, and returns a {@code CompletableFuture}. The task reads + * {@link QuotaManager#getLimit()} to know how much work to attempt in this transaction, reports + * how many items it processed via {@link QuotaManager#processedCountInc()} (and optionally + * {@link QuotaManager#deletedCountInc()}), and signals completion by calling + * {@link QuotaManager#markExhausted()}. + *

+ *

+ * The runner adjusts the limit across transactions: + *

+ * + *

+ * A limit of {@code 0} is treated as "no limit" — the task may do as much work as it likes in + * each transaction and the limit is never adjusted. + *

+ *

+ * Between successful transactions the runner can optionally delay to enforce a maximum item + * processing or deletion rate ({@code maxItemsScannedPerSec} / {@code maxItemsDeletedPerSec}). + * On failure the transaction is retried up to {@code numOfRetries} times before failing. + *

+ */ +public class ThrottledRetryingRunner implements AutoCloseable { + + public static final int DEFAULT_NUM_RETRIES = 100; + public static final int DEFAULT_INCREASE_LIMIT_AFTER = 40; + + @Nonnull + private final TransactionalRunner transactionalRunner; + @Nonnull + private final ScheduledExecutorService scheduledExecutor; + private final int maxItemsScannedPerSec; + private final int maxItemsDeletedPerSec; + private final int numOfRetries; + private final boolean commitWhenDone; + private final int increaseLimitAfter; + private final int maxLimit; + + private boolean closed = false; + private long transactionStartTimeMillis = 0; + private int failureRetriesCounter = 0; + private int consecutiveSuccessCount = 0; + + private ThrottledRetryingRunner(Builder builder) { + this.transactionalRunner = new TransactionalRunner(builder.database); + this.scheduledExecutor = builder.scheduledExecutor; + this.maxItemsScannedPerSec = builder.maxItemsScannedPerSec; + this.maxItemsDeletedPerSec = builder.maxItemsDeletedPerSec; + this.numOfRetries = builder.numOfRetries; + this.commitWhenDone = builder.commitWhenDone; + this.increaseLimitAfter = builder.increaseLimitAfter; + this.maxLimit = builder.maxLimit; + } + + /** + * Run the given task repeatedly across multiple transactions until it calls + * {@link QuotaManager#markExhausted()}. + *

+ * A single {@link QuotaManager} is created for the lifetime of this call. Its per-transaction + * counts ({@link QuotaManager#getProcessedCount()}, {@link QuotaManager#getDeletedCount()}) + * are reset at the start of each transaction. Its limit ({@link QuotaManager#getLimit()}) is + * adjusted by the runner after each transaction and persists across them. + *

+ * + * @param task the task to run; reads the limit, reports counts, calls + * {@link QuotaManager#markExhausted()} to stop + * @return a future that completes normally when the task exhausts the source, or exceptionally + * if the retry limit is exceeded or the runner is closed + */ + @Nonnull + public CompletableFuture iterateAll( + @Nonnull BiFunction> task) { + if (closed) { + return CompletableFuture.failedFuture(new TransactionalRunner.RunnerClosed()); + } + + final QuotaManager quotaManager = new QuotaManager(maxLimit); + return AsyncUtil.whileTrue(() -> + runOneTransaction(task, quotaManager) + .handle((ignored, ex) -> { + if (ex == null) { + return handleSuccess(quotaManager); + } + return handleFailure(ex, quotaManager); + }) + .thenCompose(ret -> ret)); + } + + private CompletableFuture runOneTransaction( + @Nonnull BiFunction> task, + @Nonnull QuotaManager quotaManager) { + transactionStartTimeMillis = System.currentTimeMillis(); + return transactionalRunner.runAsync(commitWhenDone, transaction -> { + quotaManager.initTransaction(); + return task.apply(transaction, quotaManager); + }); + } + + private CompletableFuture handleSuccess(QuotaManager quotaManager) { + failureRetriesCounter = 0; + ++consecutiveSuccessCount; + + // Increase limit after enough consecutive successes, but only when a limit is active + if (quotaManager.limit > 0 && consecutiveSuccessCount >= increaseLimitAfter) { + quotaManager.increaseLimit(); + consecutiveSuccessCount = 0; + } + + if (!quotaManager.hasMore) { + return AsyncUtil.READY_FALSE; + } + + long elapsedMillis = Math.max(0, System.currentTimeMillis() - transactionStartTimeMillis); + long delayMillis = Collections.max(List.of( + throttlePerSecDelayMillis(elapsedMillis, maxItemsScannedPerSec, quotaManager.processedCount), + throttlePerSecDelayMillis(elapsedMillis, maxItemsDeletedPerSec, quotaManager.deletedCount) + )); + + if (delayMillis > 0) { + return MoreAsyncUtil.delayedFuture(delayMillis, TimeUnit.MILLISECONDS, scheduledExecutor) + .thenApply(ignore -> true); + } + return AsyncUtil.READY_TRUE; + } + + private CompletableFuture handleFailure(Throwable ex, QuotaManager quotaManager) { + ++failureRetriesCounter; + consecutiveSuccessCount = 0; + + if (ex instanceof CompletionException) { + ex = ex.getCause(); + } + if (ex instanceof TransactionalRunner.RunnerClosed) { + return CompletableFuture.failedFuture(ex); + } + if (failureRetriesCounter > numOfRetries) { + return CompletableFuture.failedFuture(ex); + } + + // Decrease the limit based on how much work was in-flight when the failure occurred, + // so the next attempt is less likely to exceed the transaction budget. + quotaManager.decreaseLimit(); + + return AsyncUtil.READY_TRUE; // retry + } + + /** + * Compute how long to wait to keep the event rate at or below {@code maxPerSec}. + * Matches the formula used by {@code ThrottledRetryingIterator}. + */ + static long throttlePerSecDelayMillis(long elapsedMillis, int maxPerSec, int eventCount) { + if (maxPerSec <= 0) { + return 0; + } + long waitMillis = (TimeUnit.SECONDS.toMillis(eventCount) / maxPerSec) - elapsedMillis; + return waitMillis > 0 ? waitMillis : 0; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + transactionalRunner.close(); + } + + /** + * Tracks per-transaction resource usage, exposes the adaptive limit, and controls whether + * the loop continues. + *

+ * One instance is created per {@link #iterateAll} call. Per-transaction counts are reset at + * the start of each transaction; the limit persists and is adjusted by the runner between + * transactions. + *

+ *

+ * The task should call {@link #getLimit()} at the start of each transaction to know how much + * work to attempt, increment the counts as it goes, and call {@link #markExhausted()} when + * the source is fully consumed. + *

+ */ + public static class QuotaManager { + private int processedCount; + private int deletedCount; + private boolean hasMore; + private int limit; + private int lastProcessedCount; + private final int maxLimit; + + QuotaManager(int maxLimit) { + this.maxLimit = maxLimit; + this.limit = 0; + this.lastProcessedCount = 0; + } + + /** + * Return the current limit. + *

+ * The task should attempt at most this many units of work in the current transaction. + * A value of {@code 0} means no limit is active. + *

+ */ + public int getLimit() { + return limit; + } + + /** + * Return the number of items processed in the current transaction. + */ + public int getProcessedCount() { + return processedCount; + } + + /** + * Return the number of items deleted in the current transaction. + */ + public int getDeletedCount() { + return deletedCount; + } + + /** + * Increment the processed-item count by {@code count}. + * + * @param count number of items to add + */ + public void processedCountAdd(int count) { + processedCount += count; + } + + /** + * Increment the processed-item count by 1. + */ + public void processedCountInc() { + processedCount++; + } + + /** + * Increment the deleted-item count by {@code count}. + * + * @param count number of items to add + */ + public void deletedCountAdd(int count) { + deletedCount += count; + } + + /** + * Increment the deleted-item count by 1. + */ + public void deletedCountInc() { + deletedCount++; + } + + /** + * Signal that the source is exhausted. The loop will stop after the current transaction + * commits, without starting a new one. + */ + public void markExhausted() { + hasMore = false; + } + + void initTransaction() { + lastProcessedCount = processedCount; + processedCount = 0; + deletedCount = 0; + hasMore = true; + } + + void increaseLimit() { + if (limit == 0) { + return; // no-limit mode: never adjust + } + final int increased = Math.max((limit * 5) / 4, limit + 4); + limit = maxLimit > 0 ? Math.min(maxLimit, increased) : increased; + } + + void decreaseLimit() { + if (limit == 0) { + return; // no-limit mode: never adjust + } + limit = Math.max(1, (lastProcessedCount * 9) / 10); + } + } + + /** + * Create a new builder for a {@link ThrottledRetryingRunner}. + * + * @param database the database to open transactions against + * @param scheduledExecutor executor used for throttle delays between transactions + * @return a new builder + */ + public static Builder builder(@Nonnull Database database, + @Nonnull ScheduledExecutorService scheduledExecutor) { + return new Builder(database, scheduledExecutor); + } + + /** + * Builder for {@link ThrottledRetryingRunner}. + */ + public static class Builder { + @Nonnull + private final Database database; + @Nonnull + private final ScheduledExecutorService scheduledExecutor; + private int maxItemsScannedPerSec = 0; + private int maxItemsDeletedPerSec = 0; + private int numOfRetries = DEFAULT_NUM_RETRIES; + private boolean commitWhenDone = true; + private int increaseLimitAfter = DEFAULT_INCREASE_LIMIT_AFTER; + private int maxLimit = 0; + + private Builder(@Nonnull Database database, @Nonnull ScheduledExecutorService scheduledExecutor) { + this.database = database; + this.scheduledExecutor = scheduledExecutor; + } + + /** + * Set the maximum number of items that may be scanned (processed) per second. + * The runner will delay between transactions to stay at or below this rate. + * A value of {@code 0} (the default) disables scanned-item throttling. + * + * @param maxItemsScannedPerSec the rate limit; {@code 0} disables it + * @return this builder + */ + public Builder withMaxItemsScannedPerSec(int maxItemsScannedPerSec) { + this.maxItemsScannedPerSec = Math.max(0, maxItemsScannedPerSec); + return this; + } + + /** + * Set the maximum number of items that may be deleted per second. + * The runner will delay between transactions to stay at or below this rate. + * A value of {@code 0} (the default) disables deleted-item throttling. + * + * @param maxItemsDeletedPerSec the rate limit; {@code 0} disables it + * @return this builder + */ + public Builder withMaxItemsDeletedPerSec(int maxItemsDeletedPerSec) { + this.maxItemsDeletedPerSec = Math.max(0, maxItemsDeletedPerSec); + return this; + } + + /** + * Set the number of consecutive successful transactions required before the limit is + * increased. Defaults to {@value DEFAULT_INCREASE_LIMIT_AFTER}. + * + * @param increaseLimitAfter consecutive successes before a limit increase + * @return this builder + */ + public Builder withIncreaseLimitAfter(int increaseLimitAfter) { + this.increaseLimitAfter = Math.max(1, increaseLimitAfter); + return this; + } + + /** + * Set the initial and maximum per-transaction limit passed to the task via + * {@link QuotaManager#getLimit()}. + *

+ * The limit starts at this value. It will not be increased beyond {@code maxLimit} on + * success. If {@code maxLimit} is {@code 0} (the default) the limit feature is disabled: + * {@link QuotaManager#getLimit()} always returns {@code 0} and is never adjusted. + *

+ * + * @param maxLimit the initial and maximum limit; {@code 0} disables limit management + * @return this builder + */ + public Builder withMaxLimit(int maxLimit) { + this.maxLimit = Math.max(0, maxLimit); + return this; + } + + /** + * Set the maximum number of times to retry a failed transaction before giving up. + * The counter resets after each successful commit. Defaults to {@value DEFAULT_NUM_RETRIES}. + * + * @param numOfRetries maximum retries + * @return this builder + */ + public Builder withNumOfRetries(int numOfRetries) { + this.numOfRetries = Math.max(0, numOfRetries); + return this; + } + + /** + * Set whether to commit each transaction when the task returns. + * If {@code false}, transactions are rolled back instead of committed (useful for read-only tasks). + * Defaults to {@code true}. + * + * @param commitWhenDone {@code true} to commit, {@code false} to roll back + * @return this builder + */ + public Builder withCommitWhenDone(boolean commitWhenDone) { + this.commitWhenDone = commitWhenDone; + return this; + } + + /** + * Build the {@link ThrottledRetryingRunner}. + * + * @return a new runner + */ + public ThrottledRetryingRunner build() { + return new ThrottledRetryingRunner(this); + } + } +} diff --git a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/TransactionalRunner.java b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/TransactionalRunner.java new file mode 100644 index 0000000000..e35df9b81c --- /dev/null +++ b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/TransactionalRunner.java @@ -0,0 +1,146 @@ +/* + * TransactionalRunner.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.test; + +import com.apple.foundationdb.Database; +import com.apple.foundationdb.Transaction; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +/** + * A simple runner that opens and commits raw FDB transactions and ensures they are all closed on runner close. + *

+ * This is the raw-FDB analog of the record-layer {@code TransactionalRunner}. It operates at the + * {@link Database}/{@link Transaction} level, with no record-layer dependencies. + *

+ */ +public class TransactionalRunner implements AutoCloseable { + + @Nonnull + private final Database database; + private boolean closed; + @Nonnull + private final List transactionsToClose; + + public TransactionalRunner(@Nonnull Database database) { + this.database = database; + this.transactionsToClose = new ArrayList<>(); + } + + /** + * Run some code with a transaction, then commit it. + *

+ * The transaction will be committed if the future returned by the runnable completes successfully; + * otherwise it will not be committed. Either way, the transaction is closed when the future completes. + * If this runner is {@link #close() closed}, any open transaction will also be closed. + *

+ * + * @param runnable code to run with a fresh {@link Transaction} + * @param the type of value returned by the future + * @return a future containing the result, after the transaction has been committed + */ + @Nonnull + @SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"}) + public CompletableFuture runAsync(@Nonnull Function> runnable) { + return runAsync(true, runnable); + } + + /** + * A flavor of {@link #runAsync(Function)} that supports read-only (non-committing) transactions. + * + * @param commitWhenDone if {@code true} the transaction is committed on success; if {@code false} it is only closed + * @param runnable code to run with a fresh {@link Transaction} + * @param the type of value returned by the future + * @return a future containing the result; if {@code commitWhenDone} is {@code true}, after the transaction has been committed + */ + @Nonnull + @SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"}) + public CompletableFuture runAsync(boolean commitWhenDone, + @Nonnull Function> runnable) { + Transaction transaction = openTransaction(); + boolean returnedFuture = false; + try { + CompletableFuture future = runnable.apply(transaction) + .thenCompose(val -> { + if (commitWhenDone) { + return transaction.commit().thenApply(vignore -> val); + } else { + return CompletableFuture.completedFuture(val); + } + }); + returnedFuture = true; + return future.whenComplete((result, exception) -> transaction.close()); + } finally { + if (!returnedFuture) { + transaction.close(); + } + } + } + + /** + * Open a new transaction that will be closed when this runner is closed. + * + * @return a new {@link Transaction} + * @throws RunnerClosed if this runner has already been closed + */ + @Nonnull + public Transaction openTransaction() { + if (closed) { + throw new RunnerClosed(); + } + Transaction transaction = database.createTransaction(); + addTransactionToClose(transaction); + return transaction; + } + + private synchronized void addTransactionToClose(@Nonnull Transaction transaction) { + if (closed) { + transaction.close(); + throw new RunnerClosed(); + } + transactionsToClose.add(transaction); + } + + @Override + public synchronized void close() { + if (closed) { + return; + } + transactionsToClose.forEach(Transaction::close); + transactionsToClose.clear(); + this.closed = true; + } + + /** + * Exception thrown when an operation is attempted on a closed runner. + */ + public static class RunnerClosed extends RuntimeException { + private static final long serialVersionUID = 1L; + + public RunnerClosed() { + super("Runner has been closed"); + } + } +} From 85181edd9faf1405f8a6a7bb213e705a73c133c2 Mon Sep 17 00:00:00 2001 From: Scott Dugas Date: Wed, 4 Mar 2026 17:03:59 -0500 Subject: [PATCH 2/9] Add some tests of ThrottledRetryingRunnerTest --- .../test/ThrottledRetryingRunnerTest.java | 641 ++++++++++++++++++ .../test/ThrottledRetryingRunner.java | 8 +- 2 files changed, 644 insertions(+), 5 deletions(-) create mode 100644 fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java new file mode 100644 index 0000000000..7a6b4cbee2 --- /dev/null +++ b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java @@ -0,0 +1,641 @@ +/* + * ThrottledRetryingRunnerTest.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.test; + +import com.apple.foundationdb.Database; +import com.apple.foundationdb.Transaction; +import com.apple.foundationdb.subspace.Subspace; +import com.apple.foundationdb.tuple.Tuple; +import com.apple.test.Tags; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.catchThrowableOfType; + +@Tag(Tags.RequiresFDB) +class ThrottledRetryingRunnerTest { + @RegisterExtension + static final TestDatabaseExtension dbExtension = new TestDatabaseExtension(); + @RegisterExtension + TestSubspaceExtension subspaceExtension = new TestSubspaceExtension(dbExtension); + + private Database db; + private Subspace subspace; + private ScheduledExecutorService scheduledExecutor; + + @BeforeEach + void setUp() { + db = dbExtension.getDatabase(); + subspace = subspaceExtension.getSubspace(); + scheduledExecutor = new ScheduledThreadPoolExecutor(1, + new ThreadFactoryBuilder().setNameFormat("throttled-runner-test-%d").build()); + } + + @AfterEach + void tearDown() { + scheduledExecutor.shutdown(); + } + + // ------------------------------------------------------------------------- + // throttlePerSecDelayMillis static helper + // ------------------------------------------------------------------------- + + @ParameterizedTest + @CsvSource({ + // elapsedMs, maxPerSec, eventCount, expectedDelayMs + "1000, 100, 0, 0", // no events: no delay + "1000, 100, 100, 0", // exactly right rate: no delay + "1000, 100, 80, 0", // below rate: no delay + "1000, 100, 200, 1000", // double the rate: 1s delay + " 250, 50, 100, 1750", // 100 events should take 2s, only 250ms elapsed + " 1, 50, 100, 1999", // nearly instant: almost full 2s delay + " 0, 100, 0, 0", // zero events, no limit + "1000, 0, 999, 0", // maxPerSec=0 means disabled + }) + void testThrottlePerSecDelayMillis(long elapsedMs, int maxPerSec, int eventCount, long expectedDelay) { + assertThat(ThrottledRetryingRunner.throttlePerSecDelayMillis(elapsedMs, maxPerSec, eventCount)) + .isEqualTo(expectedDelay); + } + + // ------------------------------------------------------------------------- + // QuotaManager limit adjustment + // ------------------------------------------------------------------------- + + @Test + void testIncreaseLimitFormula() { + // limit=0 is no-limit mode; increaseLimit is a no-op + ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(0); + qm.increaseLimit(); + assertThat(qm.getLimit()).isEqualTo(0); + + // With a real maxLimit, decreaseLimit reads processedCount directly (before initTransaction) + ThrottledRetryingRunner.QuotaManager qm2 = new ThrottledRetryingRunner.QuotaManager(1000); + qm2.processedCountAdd(80); + qm2.decreaseLimit(); // limit = max(1, 80*9/10) = 72 + assertThat(qm2.getLimit()).isEqualTo(72); + qm2.increaseLimit(); // limit = max(72*5/4, 72+4) = max(90, 76) = 90 + assertThat(qm2.getLimit()).isEqualTo(90); + } + + @Test + void testDecreaseLimitFormula() { + ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(1000); + // Process 100 items, then fail → limit = max(1, 100*9/10) = 90 + qm.processedCountAdd(100); + qm.decreaseLimit(); + assertThat(qm.getLimit()).isEqualTo(90); + + // Process 10 items, fail again → max(1, 10*9/10) = 9 + qm.initTransaction(); + qm.processedCountAdd(10); + qm.decreaseLimit(); + assertThat(qm.getLimit()).isEqualTo(9); + + // Decrease floors at 1 + qm.initTransaction(); + qm.processedCountInc(); + qm.decreaseLimit(); + assertThat(qm.getLimit()).isEqualTo(1); + + // Zero processed also floors at 1 + qm.initTransaction(); + qm.decreaseLimit(); + assertThat(qm.getLimit()).isEqualTo(1); + } + + @Test + void testDecreaseLimitNoOpWhenLimitIsZero() { + ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(0); + qm.processedCountAdd(100); + qm.initTransaction(); + qm.decreaseLimit(); + assertThat(qm.getLimit()).isEqualTo(0); + } + + @Test + void testLimitCappedAtMaxLimit() { + ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(50); + // Drive limit down by simulating a failure with 40 in-flight items + qm.processedCountAdd(40); + qm.decreaseLimit(); // limit = max(1, 40*9/10) = 36 + // Keep increasing until we hit the cap + for (int i = 0; i < 100; i++) { + qm.increaseLimit(); + } + assertThat(qm.getLimit()).isEqualTo(50); // capped at maxLimit + } + + // ------------------------------------------------------------------------- + // Basic iteration: task runs the expected number of times + // ------------------------------------------------------------------------- + + @Test + void taskRunsUntilExhausted() { + final int iterations = 5; + AtomicInteger callCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota) -> { + int n = callCount.incrementAndGet(); + if (n >= iterations) { + quota.markExhausted(); + } + return CompletableFuture.completedFuture(null); + }).join(); + } + + assertThat(callCount.get()).isEqualTo(iterations); + } + + @Test + void taskExhaustedImmediatelyRunsOnce() { + AtomicInteger callCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota) -> { + callCount.incrementAndGet(); + quota.markExhausted(); + return CompletableFuture.completedFuture(null); + }).join(); + } + + assertThat(callCount.get()).isEqualTo(1); + } + + // ------------------------------------------------------------------------- + // cannotRunWhenClosed + // ------------------------------------------------------------------------- + + @Test + void cannotRunWhenClosed() { + ThrottledRetryingRunner runner = runnerBuilder().build(); + runner.iterateAll((tr, quota) -> { + quota.markExhausted(); + return CompletableFuture.completedFuture(null); + }).join(); + runner.close(); + + assertThatThrownBy(() -> + runner.iterateAll((tr, quota) -> { + quota.markExhausted(); + return CompletableFuture.completedFuture(null); + }).join()) + .hasCauseInstanceOf(TransactionalRunner.RunnerClosed.class); + } + + // ------------------------------------------------------------------------- + // Retries on failure + // ------------------------------------------------------------------------- + + @ParameterizedTest + @ValueSource(ints = {0, 1, 3, 10}) + void retriesCorrectNumberOfTimes(int numRetries) { + AtomicInteger callCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withNumOfRetries(numRetries) + .build()) { + Throwable ex = catchThrowableOfType(RuntimeException.class, () -> + runner.iterateAll((tr, quota) -> { + callCount.incrementAndGet(); + return CompletableFuture.failedFuture(new RuntimeException("always fails")); + }).join()); + + assertThat(ex).isNotNull(); + assertThat(ex.getMessage()).contains("always fails"); + } + + // First attempt + numRetries retries + assertThat(callCount.get()).isEqualTo(numRetries + 1); + } + + @Test + void retryCounterResetsAfterSuccess() { + // Fail twice, succeed, then fail twice again — should not exhaust the retry budget + // because the counter resets on each success. + AtomicInteger totalCalls = new AtomicInteger(0); + AtomicInteger successCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withNumOfRetries(2) + .build()) { + runner.iterateAll((tr, quota) -> { + int call = totalCalls.incrementAndGet(); + // Fail on calls 1 and 2 + if (call == 1 || call == 2) { + return CompletableFuture.failedFuture(new RuntimeException("transient")); + } + // Succeed on call 3 (resets counter), then fail on 4 and 5, succeed on 6, done + if (call == 4 || call == 5) { + return CompletableFuture.failedFuture(new RuntimeException("transient")); + } + successCount.incrementAndGet(); + if (successCount.get() == 2) { + quota.markExhausted(); + } + return CompletableFuture.completedFuture(null); + }).join(); + } + + assertThat(successCount.get()).isEqualTo(2); + } + + @Test + void runnerClosedDuringIterationAbortsImmediately() { + AtomicInteger callCount = new AtomicInteger(0); + + ThrottledRetryingRunner runner = runnerBuilder().withNumOfRetries(100).build(); + runner.close(); // close before iterating + + assertThatThrownBy(() -> + runner.iterateAll((tr, quota) -> { + callCount.incrementAndGet(); + quota.markExhausted(); + return CompletableFuture.completedFuture(null); + }).join()) + .hasCauseInstanceOf(TransactionalRunner.RunnerClosed.class); + + // iterateAll checks closed before starting, so the task is never called + assertThat(callCount.get()).isEqualTo(0); + } + + // ------------------------------------------------------------------------- + // Counts: processedCount and deletedCount + // ------------------------------------------------------------------------- + + @Test + void countsAreResetEachTransaction() { + // Each transaction should see processedCount starting at 0 + List observedCounts = new ArrayList<>(); + AtomicInteger totalCalls = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota) -> { + observedCounts.add(quota.getProcessedCount()); // always 0 at start + quota.processedCountAdd(10); + if (totalCalls.incrementAndGet() >= 3) { + quota.markExhausted(); + } + return CompletableFuture.completedFuture(null); + }).join(); + } + + assertThat(observedCounts).containsExactly(0, 0, 0); + } + + @Test + void processedAndDeletedCountsAreIndependent() { + AtomicInteger observedProcessed = new AtomicInteger(-1); + AtomicInteger observedDeleted = new AtomicInteger(-1); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota) -> { + quota.processedCountAdd(7); + quota.deletedCountAdd(3); + observedProcessed.set(quota.getProcessedCount()); + observedDeleted.set(quota.getDeletedCount()); + quota.markExhausted(); + return CompletableFuture.completedFuture(null); + }).join(); + } + + assertThat(observedProcessed.get()).isEqualTo(7); + assertThat(observedDeleted.get()).isEqualTo(3); + } + + // ------------------------------------------------------------------------- + // Limit: adaptive adjustment + // ------------------------------------------------------------------------- + + @Test + void limitStartsAtMaxLimitAndIsExposedToTask() { + AtomicInteger observedLimit = new AtomicInteger(-1); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withMaxLimit(50) + .build()) { + runner.iterateAll((tr, quota) -> { + observedLimit.set(quota.getLimit()); + quota.markExhausted(); + return CompletableFuture.completedFuture(null); + }).join(); + } + + assertThat(observedLimit.get()).isEqualTo(50); + } + + @Test + void limitIsZeroWhenMaxLimitNotConfigured() { + AtomicInteger observedLimit = new AtomicInteger(-1); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota) -> { + observedLimit.set(quota.getLimit()); + quota.markExhausted(); + return CompletableFuture.completedFuture(null); + }).join(); + } + + assertThat(observedLimit.get()).isEqualTo(0); + } + + @Test + void limitDecreasesOnFailureThenIncreasesOnSuccess() { + // On failure: limit = max(1, processedCount * 9/10) + // On success: after increaseLimitAfter consecutive successes, limit increases + AtomicInteger callCount = new AtomicInteger(0); + List observedLimits = new ArrayList<>(); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withMaxLimit(200) + .withIncreaseLimitAfter(2) // increase after 2 successes to keep the test short + .withNumOfRetries(10) + .build()) { + runner.iterateAll((tr, quota) -> { + int call = callCount.incrementAndGet(); + observedLimits.add(quota.getLimit()); + + if (call == 1) { + // Process 100 items and fail → next limit = 90 + quota.processedCountAdd(100); + return CompletableFuture.failedFuture(new RuntimeException("fail")); + } + if (call <= 3) { + // Two successes → limit increases from 90 + quota.processedCountInc(); + return CompletableFuture.completedFuture(null); // keep going + } + quota.markExhausted(); + return CompletableFuture.completedFuture(null); + }).join(); + } + + // call 1: limit=200 (initial), fails with 100 processed → new limit = 90 + assertThat(observedLimits.get(0)).isEqualTo(200); + // call 2: limit=90 + assertThat(observedLimits.get(1)).isEqualTo(90); + // call 3: limit=90 still (need 2 consecutive successes before increase) + assertThat(observedLimits.get(2)).isEqualTo(90); + // call 4: after 2 successes, limit increased: max(90*5/4, 90+4) = max(112,94) = 112 + assertThat(observedLimits.get(3)).isEqualTo(112); + } + + @Test + void limitDecreasesProgressivelyTowardOne() { + // Each failure with very few processed items should converge the limit toward 1 + AtomicInteger callCount = new AtomicInteger(0); + List observedLimits = new ArrayList<>(); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withMaxLimit(1000) + .withNumOfRetries(20) + .build()) { + runner.iterateAll((tr, quota) -> { + int call = callCount.incrementAndGet(); + observedLimits.add(quota.getLimit()); + if (call <= 10) { + // Process 1 item each time → limit = max(1, 1*9/10) = 1 after first decrease + quota.processedCountInc(); + return CompletableFuture.failedFuture(new RuntimeException("fail")); + } + quota.markExhausted(); + return CompletableFuture.completedFuture(null); + }).join(); + } + + // After first failure with 1 processed: max(1, 0) = 1 + assertThat(observedLimits.get(1)).isEqualTo(1); + // Stays at 1 regardless of further failures + for (int i = 2; i < 10; i++) { + assertThat(observedLimits.get(i)).isEqualTo(1); + } + } + + @Test + void limitIsNeverAdjustedWhenMaxLimitIsZero() { + AtomicInteger callCount = new AtomicInteger(0); + List observedLimits = new ArrayList<>(); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withNumOfRetries(5) + // no withMaxLimit → limit stays 0 + .build()) { + runner.iterateAll((tr, quota) -> { + int call = callCount.incrementAndGet(); + observedLimits.add(quota.getLimit()); + quota.processedCountAdd(100); + if (call <= 3) { + return CompletableFuture.failedFuture(new RuntimeException("fail")); + } + quota.markExhausted(); + return CompletableFuture.completedFuture(null); + }).join(); + } + + assertThat(observedLimits).containsOnly(0); + } + + // ------------------------------------------------------------------------- + // commitWhenDone: writes are visible (commit) or not (no commit) + // ------------------------------------------------------------------------- + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void commitWhenDoneControlsWhetherWritesArePersisted(boolean commitWhenDone) { + byte[] key = subspace.pack(Tuple.from("commit-test")); + byte[] value = "hello".getBytes(); + + // Write a key inside iterateAll + try (ThrottledRetryingRunner runner = runnerBuilder() + .withCommitWhenDone(commitWhenDone) + .build()) { + runner.iterateAll((tr, quota) -> { + tr.set(key, value); + quota.markExhausted(); + return CompletableFuture.completedFuture(null); + }).join(); + } + + // Check whether the key is actually present + byte[] read = db.run(tr -> tr.get(key).join()); + if (commitWhenDone) { + assertThat(read).isEqualTo(value); + } else { + assertThat(read).isNull(); + } + } + + // ------------------------------------------------------------------------- + // Each transaction gets a fresh Transaction object + // ------------------------------------------------------------------------- + + @Test + void eachIterationReceivesADistinctTransaction() { + List seenTransactions = new ArrayList<>(); + AtomicInteger callCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota) -> { + seenTransactions.add(tr); + if (callCount.incrementAndGet() >= 3) { + quota.markExhausted(); + } + return CompletableFuture.completedFuture(null); + }).join(); + } + + assertThat(seenTransactions).hasSize(3); + // All three must be distinct objects + assertThat(seenTransactions.get(0)).isNotSameAs(seenTransactions.get(1)); + assertThat(seenTransactions.get(1)).isNotSameAs(seenTransactions.get(2)); + } + + // ------------------------------------------------------------------------- + // Rate limiting: elapsed time grows when throttling is applied + // ------------------------------------------------------------------------- + + @Test + void throttlingByScannedItemsSlowsItDown() { + // 3 transactions each processing 100 items at max 50/sec → each batch should take ≥2s + // Use a low count to keep the test fast: 3 transactions × 10 items at max 20/sec = ≥1.5s + final int itemsPerTransaction = 10; + final int maxPerSec = 20; + final int transactions = 3; + AtomicInteger callCount = new AtomicInteger(0); + + long start = System.currentTimeMillis(); + try (ThrottledRetryingRunner runner = runnerBuilder() + .withMaxItemsScannedPerSec(maxPerSec) + .build()) { + runner.iterateAll((tr, quota) -> { + quota.processedCountAdd(itemsPerTransaction); + if (callCount.incrementAndGet() >= transactions) { + quota.markExhausted(); + } + return CompletableFuture.completedFuture(null); + }).join(); + } + long elapsed = System.currentTimeMillis() - start; + + // totalItems / maxPerSec = (3*10)/20 = 1.5s minimum + long expectedMinMs = TimeUnit.SECONDS.toMillis(itemsPerTransaction * transactions / maxPerSec); + assertThat(elapsed).isGreaterThanOrEqualTo(expectedMinMs); + } + + @Test + void throttlingByDeletedItemsSlowsItDown() { + final int deletesPerTransaction = 10; + final int maxPerSec = 20; + final int transactions = 3; + AtomicInteger callCount = new AtomicInteger(0); + + long start = System.currentTimeMillis(); + try (ThrottledRetryingRunner runner = runnerBuilder() + .withMaxItemsDeletedPerSec(maxPerSec) + .build()) { + runner.iterateAll((tr, quota) -> { + quota.deletedCountAdd(deletesPerTransaction); + if (callCount.incrementAndGet() >= transactions) { + quota.markExhausted(); + } + return CompletableFuture.completedFuture(null); + }).join(); + } + long elapsed = System.currentTimeMillis() - start; + + long expectedMinMs = TimeUnit.SECONDS.toMillis(deletesPerTransaction * transactions / maxPerSec); + assertThat(elapsed).isGreaterThanOrEqualTo(expectedMinMs); + } + + @Test + void noThrottlingWithZeroRateLimit() { + // With no rate limit configured the runner should complete nearly instantly + // (well under 1 second for trivial work). We just assert it completes normally. + AtomicInteger callCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota) -> { + quota.processedCountAdd(1000); // lots of items but no limit configured + if (callCount.incrementAndGet() >= 5) { + quota.markExhausted(); + } + return CompletableFuture.completedFuture(null); + }).join(); + } + + assertThat(callCount.get()).isEqualTo(5); + } + + // ------------------------------------------------------------------------- + // Real DB writes: data persisted across transactions within one iterateAll + // ------------------------------------------------------------------------- + + @Test + void dataWrittenInEachTransactionIsVisible() { + // Write one key per transaction, confirm all are present at the end + final int numTransactions = 5; + AtomicInteger callCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota) -> { + int n = callCount.incrementAndGet(); + tr.set(subspace.pack(Tuple.from(n)), Tuple.from(n).pack()); + if (n >= numTransactions) { + quota.markExhausted(); + } + return CompletableFuture.completedFuture(null); + }).join(); + } + + db.run(tr -> { + for (int i = 1; i <= numTransactions; i++) { + byte[] val = tr.get(subspace.pack(Tuple.from(i))).join(); + assertThat(val).isEqualTo(Tuple.from(i).pack()); + } + return null; + }); + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + private ThrottledRetryingRunner.Builder runnerBuilder() { + return ThrottledRetryingRunner.builder(db, scheduledExecutor); + } +} diff --git a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java index 1a6cdb3d2d..c0131f421e 100644 --- a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java +++ b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java @@ -227,13 +227,11 @@ public static class QuotaManager { private int deletedCount; private boolean hasMore; private int limit; - private int lastProcessedCount; private final int maxLimit; QuotaManager(int maxLimit) { this.maxLimit = maxLimit; - this.limit = 0; - this.lastProcessedCount = 0; + this.limit = maxLimit; } /** @@ -302,7 +300,6 @@ public void markExhausted() { } void initTransaction() { - lastProcessedCount = processedCount; processedCount = 0; deletedCount = 0; hasMore = true; @@ -320,7 +317,8 @@ void decreaseLimit() { if (limit == 0) { return; // no-limit mode: never adjust } - limit = Math.max(1, (lastProcessedCount * 9) / 10); + // Use the count from the transaction that just failed (before the next initTransaction resets it) + limit = Math.max(1, (processedCount * 9) / 10); } } From 880cb58914d3f98d861cffadbdd3492fc79367d4 Mon Sep 17 00:00:00 2001 From: Scott Dugas Date: Wed, 4 Mar 2026 17:26:31 -0500 Subject: [PATCH 3/9] Change ThrottledRetryingRunner to return a continuation --- .../test/ThrottledRetryingRunnerTest.java | 293 +++++++++++++----- .../test/ThrottledRetryingRunner.java | 188 +++++++---- 2 files changed, 343 insertions(+), 138 deletions(-) diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java index 7a6b4cbee2..215f60944a 100644 --- a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java +++ b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java @@ -42,6 +42,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -170,12 +171,11 @@ void taskRunsUntilExhausted() { AtomicInteger callCount = new AtomicInteger(0); try (ThrottledRetryingRunner runner = runnerBuilder().build()) { - runner.iterateAll((tr, quota) -> { - int n = callCount.incrementAndGet(); - if (n >= iterations) { - quota.markExhausted(); + runner.iterateAll((tr, quota, cont) -> { + if (callCount.incrementAndGet() >= iterations) { + return exhausted(); } - return CompletableFuture.completedFuture(null); + return hasMore(); }).join(); } @@ -187,10 +187,9 @@ void taskExhaustedImmediatelyRunsOnce() { AtomicInteger callCount = new AtomicInteger(0); try (ThrottledRetryingRunner runner = runnerBuilder().build()) { - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { callCount.incrementAndGet(); - quota.markExhausted(); - return CompletableFuture.completedFuture(null); + return exhausted(); }).join(); } @@ -204,17 +203,11 @@ void taskExhaustedImmediatelyRunsOnce() { @Test void cannotRunWhenClosed() { ThrottledRetryingRunner runner = runnerBuilder().build(); - runner.iterateAll((tr, quota) -> { - quota.markExhausted(); - return CompletableFuture.completedFuture(null); - }).join(); + runner.iterateAll((tr, quota, cont) -> exhausted()).join(); runner.close(); assertThatThrownBy(() -> - runner.iterateAll((tr, quota) -> { - quota.markExhausted(); - return CompletableFuture.completedFuture(null); - }).join()) + runner.iterateAll((tr, quota, cont) -> exhausted()).join()) .hasCauseInstanceOf(TransactionalRunner.RunnerClosed.class); } @@ -231,7 +224,7 @@ void retriesCorrectNumberOfTimes(int numRetries) { .withNumOfRetries(numRetries) .build()) { Throwable ex = catchThrowableOfType(RuntimeException.class, () -> - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { callCount.incrementAndGet(); return CompletableFuture.failedFuture(new RuntimeException("always fails")); }).join()); @@ -254,7 +247,7 @@ void retryCounterResetsAfterSuccess() { try (ThrottledRetryingRunner runner = runnerBuilder() .withNumOfRetries(2) .build()) { - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { int call = totalCalls.incrementAndGet(); // Fail on calls 1 and 2 if (call == 1 || call == 2) { @@ -266,9 +259,9 @@ void retryCounterResetsAfterSuccess() { } successCount.incrementAndGet(); if (successCount.get() == 2) { - quota.markExhausted(); + return exhausted(); } - return CompletableFuture.completedFuture(null); + return hasMore(); }).join(); } @@ -283,10 +276,9 @@ void runnerClosedDuringIterationAbortsImmediately() { runner.close(); // close before iterating assertThatThrownBy(() -> - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { callCount.incrementAndGet(); - quota.markExhausted(); - return CompletableFuture.completedFuture(null); + return exhausted(); }).join()) .hasCauseInstanceOf(TransactionalRunner.RunnerClosed.class); @@ -305,13 +297,13 @@ void countsAreResetEachTransaction() { AtomicInteger totalCalls = new AtomicInteger(0); try (ThrottledRetryingRunner runner = runnerBuilder().build()) { - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { observedCounts.add(quota.getProcessedCount()); // always 0 at start quota.processedCountAdd(10); if (totalCalls.incrementAndGet() >= 3) { - quota.markExhausted(); + return exhausted(); } - return CompletableFuture.completedFuture(null); + return hasMore(); }).join(); } @@ -324,13 +316,12 @@ void processedAndDeletedCountsAreIndependent() { AtomicInteger observedDeleted = new AtomicInteger(-1); try (ThrottledRetryingRunner runner = runnerBuilder().build()) { - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { quota.processedCountAdd(7); quota.deletedCountAdd(3); observedProcessed.set(quota.getProcessedCount()); observedDeleted.set(quota.getDeletedCount()); - quota.markExhausted(); - return CompletableFuture.completedFuture(null); + return exhausted(); }).join(); } @@ -349,10 +340,9 @@ void limitStartsAtMaxLimitAndIsExposedToTask() { try (ThrottledRetryingRunner runner = runnerBuilder() .withMaxLimit(50) .build()) { - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { observedLimit.set(quota.getLimit()); - quota.markExhausted(); - return CompletableFuture.completedFuture(null); + return exhausted(); }).join(); } @@ -364,10 +354,9 @@ void limitIsZeroWhenMaxLimitNotConfigured() { AtomicInteger observedLimit = new AtomicInteger(-1); try (ThrottledRetryingRunner runner = runnerBuilder().build()) { - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { observedLimit.set(quota.getLimit()); - quota.markExhausted(); - return CompletableFuture.completedFuture(null); + return exhausted(); }).join(); } @@ -376,8 +365,6 @@ void limitIsZeroWhenMaxLimitNotConfigured() { @Test void limitDecreasesOnFailureThenIncreasesOnSuccess() { - // On failure: limit = max(1, processedCount * 9/10) - // On success: after increaseLimitAfter consecutive successes, limit increases AtomicInteger callCount = new AtomicInteger(0); List observedLimits = new ArrayList<>(); @@ -386,7 +373,7 @@ void limitDecreasesOnFailureThenIncreasesOnSuccess() { .withIncreaseLimitAfter(2) // increase after 2 successes to keep the test short .withNumOfRetries(10) .build()) { - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { int call = callCount.incrementAndGet(); observedLimits.add(quota.getLimit()); @@ -398,10 +385,9 @@ void limitDecreasesOnFailureThenIncreasesOnSuccess() { if (call <= 3) { // Two successes → limit increases from 90 quota.processedCountInc(); - return CompletableFuture.completedFuture(null); // keep going + return hasMore(); } - quota.markExhausted(); - return CompletableFuture.completedFuture(null); + return exhausted(); }).join(); } @@ -417,7 +403,6 @@ void limitDecreasesOnFailureThenIncreasesOnSuccess() { @Test void limitDecreasesProgressivelyTowardOne() { - // Each failure with very few processed items should converge the limit toward 1 AtomicInteger callCount = new AtomicInteger(0); List observedLimits = new ArrayList<>(); @@ -425,20 +410,18 @@ void limitDecreasesProgressivelyTowardOne() { .withMaxLimit(1000) .withNumOfRetries(20) .build()) { - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { int call = callCount.incrementAndGet(); observedLimits.add(quota.getLimit()); if (call <= 10) { - // Process 1 item each time → limit = max(1, 1*9/10) = 1 after first decrease - quota.processedCountInc(); + quota.processedCountInc(); // 1 item processed → limit floor at 1 return CompletableFuture.failedFuture(new RuntimeException("fail")); } - quota.markExhausted(); - return CompletableFuture.completedFuture(null); + return exhausted(); }).join(); } - // After first failure with 1 processed: max(1, 0) = 1 + // After first failure with 1 processed: max(1, 1*9/10=0) = 1 assertThat(observedLimits.get(1)).isEqualTo(1); // Stays at 1 regardless of further failures for (int i = 2; i < 10; i++) { @@ -455,15 +438,14 @@ void limitIsNeverAdjustedWhenMaxLimitIsZero() { .withNumOfRetries(5) // no withMaxLimit → limit stays 0 .build()) { - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { int call = callCount.incrementAndGet(); observedLimits.add(quota.getLimit()); quota.processedCountAdd(100); if (call <= 3) { return CompletableFuture.failedFuture(new RuntimeException("fail")); } - quota.markExhausted(); - return CompletableFuture.completedFuture(null); + return exhausted(); }).join(); } @@ -480,18 +462,15 @@ void commitWhenDoneControlsWhetherWritesArePersisted(boolean commitWhenDone) { byte[] key = subspace.pack(Tuple.from("commit-test")); byte[] value = "hello".getBytes(); - // Write a key inside iterateAll try (ThrottledRetryingRunner runner = runnerBuilder() .withCommitWhenDone(commitWhenDone) .build()) { - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { tr.set(key, value); - quota.markExhausted(); - return CompletableFuture.completedFuture(null); + return exhausted(); }).join(); } - // Check whether the key is actually present byte[] read = db.run(tr -> tr.get(key).join()); if (commitWhenDone) { assertThat(read).isEqualTo(value); @@ -510,29 +489,26 @@ void eachIterationReceivesADistinctTransaction() { AtomicInteger callCount = new AtomicInteger(0); try (ThrottledRetryingRunner runner = runnerBuilder().build()) { - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { seenTransactions.add(tr); if (callCount.incrementAndGet() >= 3) { - quota.markExhausted(); + return exhausted(); } - return CompletableFuture.completedFuture(null); + return hasMore(); }).join(); } assertThat(seenTransactions).hasSize(3); - // All three must be distinct objects assertThat(seenTransactions.get(0)).isNotSameAs(seenTransactions.get(1)); assertThat(seenTransactions.get(1)).isNotSameAs(seenTransactions.get(2)); } // ------------------------------------------------------------------------- - // Rate limiting: elapsed time grows when throttling is applied + // Rate limiting // ------------------------------------------------------------------------- @Test void throttlingByScannedItemsSlowsItDown() { - // 3 transactions each processing 100 items at max 50/sec → each batch should take ≥2s - // Use a low count to keep the test fast: 3 transactions × 10 items at max 20/sec = ≥1.5s final int itemsPerTransaction = 10; final int maxPerSec = 20; final int transactions = 3; @@ -542,12 +518,12 @@ void throttlingByScannedItemsSlowsItDown() { try (ThrottledRetryingRunner runner = runnerBuilder() .withMaxItemsScannedPerSec(maxPerSec) .build()) { - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { quota.processedCountAdd(itemsPerTransaction); if (callCount.incrementAndGet() >= transactions) { - quota.markExhausted(); + return exhausted(); } - return CompletableFuture.completedFuture(null); + return hasMore(); }).join(); } long elapsed = System.currentTimeMillis() - start; @@ -568,12 +544,12 @@ void throttlingByDeletedItemsSlowsItDown() { try (ThrottledRetryingRunner runner = runnerBuilder() .withMaxItemsDeletedPerSec(maxPerSec) .build()) { - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { quota.deletedCountAdd(deletesPerTransaction); if (callCount.incrementAndGet() >= transactions) { - quota.markExhausted(); + return exhausted(); } - return CompletableFuture.completedFuture(null); + return hasMore(); }).join(); } long elapsed = System.currentTimeMillis() - start; @@ -584,17 +560,15 @@ void throttlingByDeletedItemsSlowsItDown() { @Test void noThrottlingWithZeroRateLimit() { - // With no rate limit configured the runner should complete nearly instantly - // (well under 1 second for trivial work). We just assert it completes normally. AtomicInteger callCount = new AtomicInteger(0); try (ThrottledRetryingRunner runner = runnerBuilder().build()) { - runner.iterateAll((tr, quota) -> { - quota.processedCountAdd(1000); // lots of items but no limit configured + runner.iterateAll((tr, quota, cont) -> { + quota.processedCountAdd(1000); if (callCount.incrementAndGet() >= 5) { - quota.markExhausted(); + return exhausted(); } - return CompletableFuture.completedFuture(null); + return hasMore(); }).join(); } @@ -602,23 +576,22 @@ void noThrottlingWithZeroRateLimit() { } // ------------------------------------------------------------------------- - // Real DB writes: data persisted across transactions within one iterateAll + // Real DB writes // ------------------------------------------------------------------------- @Test void dataWrittenInEachTransactionIsVisible() { - // Write one key per transaction, confirm all are present at the end final int numTransactions = 5; AtomicInteger callCount = new AtomicInteger(0); try (ThrottledRetryingRunner runner = runnerBuilder().build()) { - runner.iterateAll((tr, quota) -> { + runner.iterateAll((tr, quota, cont) -> { int n = callCount.incrementAndGet(); tr.set(subspace.pack(Tuple.from(n)), Tuple.from(n).pack()); if (n >= numTransactions) { - quota.markExhausted(); + return exhausted(); } - return CompletableFuture.completedFuture(null); + return hasMore(); }).join(); } @@ -631,11 +604,169 @@ void dataWrittenInEachTransactionIsVisible() { }); } + // ------------------------------------------------------------------------- + // Continuation passing + // ------------------------------------------------------------------------- + + @Test + void firstCallReceivesStartContinuation() { + AtomicReference observed = new AtomicReference<>(); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota, cont) -> { + observed.set(cont); + return exhausted(); + }).join(); + } + + assertThat(observed.get()).isInstanceOf(ThrottledRetryingRunner.StartContinuation.class); + assertThat(observed.get()).isSameAs(ThrottledRetryingRunner.StartContinuation.INSTANCE); + } + + @Test + void continuationFromSuccessIsPassedToNextCall() { + // Each transaction returns a custom continuation object; the next call should receive it. + List received = new ArrayList<>(); + AtomicInteger callCount = new AtomicInteger(0); + + // Distinct continuation objects for each transaction + ThrottledRetryingRunner.Continuation cont1 = () -> true; + ThrottledRetryingRunner.Continuation cont2 = () -> true; + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota, cont) -> { + received.add(cont); + int call = callCount.incrementAndGet(); + if (call == 1) { + return CompletableFuture.completedFuture(cont1); + } + if (call == 2) { + return CompletableFuture.completedFuture(cont2); + } + return exhausted(); + }).join(); + } + + // call 1: receives StartContinuation + assertThat(received.get(0)).isInstanceOf(ThrottledRetryingRunner.StartContinuation.class); + // call 2: receives the continuation returned by call 1 + assertThat(received.get(1)).isSameAs(cont1); + // call 3: receives the continuation returned by call 2 + assertThat(received.get(2)).isSameAs(cont2); + } + + @Test + void continuationIsNotAdvancedOnRetry() { + // When a transaction fails, the retry should receive the same continuation as the failed + // attempt — not a new one — so that the task can resume from the same position. + List received = new ArrayList<>(); + AtomicInteger callCount = new AtomicInteger(0); + + ThrottledRetryingRunner.Continuation contAfterFirstSuccess = () -> true; + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withNumOfRetries(3) + .build()) { + runner.iterateAll((tr, quota, cont) -> { + received.add(cont); + int call = callCount.incrementAndGet(); + + if (call == 1) { + // First success: return a custom continuation + return CompletableFuture.completedFuture(contAfterFirstSuccess); + } + if (call == 2 || call == 3) { + // Fail twice: continuation should NOT advance + return CompletableFuture.failedFuture(new RuntimeException("transient")); + } + // Final success + return exhausted(); + }).join(); + } + + // call 1: StartContinuation (first call) + assertThat(received.get(0)).isInstanceOf(ThrottledRetryingRunner.StartContinuation.class); + // call 2: contAfterFirstSuccess (first call succeeded) + assertThat(received.get(1)).isSameAs(contAfterFirstSuccess); + // call 3: still contAfterFirstSuccess — call 2 failed, continuation not advanced + assertThat(received.get(2)).isSameAs(contAfterFirstSuccess); + // call 4: still contAfterFirstSuccess — call 3 failed too + assertThat(received.get(3)).isSameAs(contAfterFirstSuccess); + } + + @Test + void startContinuationReusedAcrossRetriesBeforeAnySuccess() { + // Before any successful transaction, all retries should receive StartContinuation. + List received = new ArrayList<>(); + AtomicInteger callCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withNumOfRetries(3) + .build()) { + runner.iterateAll((tr, quota, cont) -> { + received.add(cont); + if (callCount.incrementAndGet() <= 2) { + return CompletableFuture.failedFuture(new RuntimeException("transient")); + } + return exhausted(); + }).join(); + } + + // All three calls should receive StartContinuation since no success ever happened + assertThat(received).hasSize(3); + assertThat(received).allMatch(c -> c instanceof ThrottledRetryingRunner.StartContinuation); + } + + @Test + void continuationCarriesStateToNextTransaction() { + // A realistic scenario: the continuation carries a "cursor position" (last key seen). + // Each transaction processes one key and the next transaction picks up where it left off. + final int numKeys = 4; + + // Write test data + db.run(tr -> { + for (int i = 0; i < numKeys; i++) { + tr.set(subspace.pack(Tuple.from(i)), Tuple.from(i * 10).pack()); + } + return null; + }); + + // A continuation that carries the next index to read + class IndexContinuation implements ThrottledRetryingRunner.Continuation { + final int nextIndex; + IndexContinuation(int nextIndex) { this.nextIndex = nextIndex; } + @Override public boolean hasMore() { return nextIndex < numKeys; } + } + + List processedValues = new ArrayList<>(); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota, cont) -> { + int idx = (cont instanceof IndexContinuation) ? ((IndexContinuation) cont).nextIndex : 0; + byte[] raw = tr.get(subspace.pack(Tuple.from(idx))).join(); + processedValues.add((int) Tuple.fromBytes(raw).getLong(0)); + return CompletableFuture.completedFuture(new IndexContinuation(idx + 1)); + }).join(); + } + + assertThat(processedValues).containsExactly(0, 10, 20, 30); + } + // ------------------------------------------------------------------------- // Helpers // ------------------------------------------------------------------------- + /** Returns a completed future with a continuation indicating there is more work to do. */ + private static CompletableFuture hasMore() { + return CompletableFuture.completedFuture(() -> true); + } + + /** Returns a completed future with a continuation indicating the source is exhausted. */ + private static CompletableFuture exhausted() { + return CompletableFuture.completedFuture(() -> false); + } + private ThrottledRetryingRunner.Builder runnerBuilder() { return ThrottledRetryingRunner.builder(db, scheduledExecutor); } -} +} \ No newline at end of file diff --git a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java index c0131f421e..7bc9058814 100644 --- a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java +++ b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java @@ -32,35 +32,33 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; +import java.util.concurrent.atomic.AtomicReference; /** * Runs a task repeatedly across multiple transactions, with adaptive limit management, optional * rate limiting, and retry logic. *

- * The caller supplies a {@link BiFunction} that receives a fresh {@link Transaction} and a - * {@link QuotaManager}, and returns a {@code CompletableFuture}. The task reads - * {@link QuotaManager#getLimit()} to know how much work to attempt in this transaction, reports - * how many items it processed via {@link QuotaManager#processedCountInc()} (and optionally - * {@link QuotaManager#deletedCountInc()}), and signals completion by calling - * {@link QuotaManager#markExhausted()}. + * The caller supplies a {@link Task} that receives a fresh {@link Transaction}, a + * {@link QuotaManager}, and the {@link Continuation} from the last successful + * transaction (or {@link StartContinuation} on the first call). The task returns a + * {@code CompletableFuture}; returning a continuation with + * {@link Continuation#hasMore()} {@code == false} stops the loop. *

*

- * The runner adjusts the limit across transactions: + * On failure the continuation is not advanced — the last successful continuation is + * re-passed to the retry, so the task can resume from the same position. This distinguishes a + * retry (same continuation) from a fresh transaction after success (new continuation). *

- *
    - *
  • After {@code increaseLimitAfter} consecutive successes the limit is increased.
  • - *
  • After any failure the limit is decreased based on how many items were processed before - * the failure, so the next attempt is less likely to exceed the transaction budget.
  • - *
*

- * A limit of {@code 0} is treated as "no limit" — the task may do as much work as it likes in - * each transaction and the limit is never adjusted. + * The {@link QuotaManager} is reset at the start of each transaction. The task uses it to report + * how many items were processed ({@link QuotaManager#processedCountInc()}) and deleted + * ({@link QuotaManager#deletedCountInc()}). These counts drive the between-transaction throttle + * delay when {@code maxItemsScannedPerSec} or {@code maxItemsDeletedPerSec} are configured, and + * inform the adaptive limit adjustment on failure. *

*

- * Between successful transactions the runner can optionally delay to enforce a maximum item - * processing or deletion rate ({@code maxItemsScannedPerSec} / {@code maxItemsDeletedPerSec}). - * On failure the transaction is retried up to {@code numOfRetries} times before failing. + * A limit of {@code 0} is treated as "no limit" — the task may do as much work as it likes in + * each transaction and the limit is never adjusted. *

*/ public class ThrottledRetryingRunner implements AutoCloseable { @@ -96,50 +94,60 @@ private ThrottledRetryingRunner(Builder builder) { } /** - * Run the given task repeatedly across multiple transactions until it calls - * {@link QuotaManager#markExhausted()}. + * Run the given task repeatedly across multiple transactions until it returns a + * {@link Continuation} with {@link Continuation#hasMore()} {@code == false}. + *

+ * On the very first call the task receives {@link StartContinuation#INSTANCE}. On each + * subsequent call after a successful commit it receives the continuation returned by the + * previous call. On a retry after failure it receives the same continuation that was passed + * to the failed attempt — i.e. the last successful continuation is preserved. + *

*

* A single {@link QuotaManager} is created for the lifetime of this call. Its per-transaction - * counts ({@link QuotaManager#getProcessedCount()}, {@link QuotaManager#getDeletedCount()}) - * are reset at the start of each transaction. Its limit ({@link QuotaManager#getLimit()}) is - * adjusted by the runner after each transaction and persists across them. + * counts are reset at the start of each transaction; its limit persists and is adjusted + * between transactions. *

* - * @param task the task to run; reads the limit, reports counts, calls - * {@link QuotaManager#markExhausted()} to stop - * @return a future that completes normally when the task exhausts the source, or exceptionally - * if the retry limit is exceeded or the runner is closed + * @param task the task to run + * @return a future that completes normally when the task returns a continuation with + * {@link Continuation#hasMore()} {@code == false}, or exceptionally if the retry + * limit is exceeded or the runner is closed */ @Nonnull - public CompletableFuture iterateAll( - @Nonnull BiFunction> task) { + public CompletableFuture iterateAll(@Nonnull Task task) { if (closed) { return CompletableFuture.failedFuture(new TransactionalRunner.RunnerClosed()); } final QuotaManager quotaManager = new QuotaManager(maxLimit); + final AtomicReference lastSuccessfulCont = + new AtomicReference<>(StartContinuation.INSTANCE); return AsyncUtil.whileTrue(() -> - runOneTransaction(task, quotaManager) - .handle((ignored, ex) -> { + runOneTransaction(task, quotaManager, lastSuccessfulCont.get()) + .handle((continuation, ex) -> { if (ex == null) { - return handleSuccess(quotaManager); + lastSuccessfulCont.set(continuation); + return handleSuccess(quotaManager, continuation); } return handleFailure(ex, quotaManager); }) .thenCompose(ret -> ret)); } - private CompletableFuture runOneTransaction( - @Nonnull BiFunction> task, - @Nonnull QuotaManager quotaManager) { + private CompletableFuture runOneTransaction( + @Nonnull Task task, + @Nonnull QuotaManager quotaManager, + @Nonnull Continuation continuation) { transactionStartTimeMillis = System.currentTimeMillis(); return transactionalRunner.runAsync(commitWhenDone, transaction -> { quotaManager.initTransaction(); - return task.apply(transaction, quotaManager); + return task.run(transaction, quotaManager, continuation); }); } - private CompletableFuture handleSuccess(QuotaManager quotaManager) { + private CompletableFuture handleSuccess( + @Nonnull QuotaManager quotaManager, + @Nonnull Continuation continuation) { failureRetriesCounter = 0; ++consecutiveSuccessCount; @@ -149,7 +157,7 @@ private CompletableFuture handleSuccess(QuotaManager quotaManager) { consecutiveSuccessCount = 0; } - if (!quotaManager.hasMore) { + if (!continuation.hasMore()) { return AsyncUtil.READY_FALSE; } @@ -166,7 +174,9 @@ private CompletableFuture handleSuccess(QuotaManager quotaManager) { return AsyncUtil.READY_TRUE; } - private CompletableFuture handleFailure(Throwable ex, QuotaManager quotaManager) { + private CompletableFuture handleFailure( + @Nonnull Throwable ex, + @Nonnull QuotaManager quotaManager) { ++failureRetriesCounter; consecutiveSuccessCount = 0; @@ -208,9 +218,80 @@ public void close() { transactionalRunner.close(); } + // ------------------------------------------------------------------------- + // Continuation + // ------------------------------------------------------------------------- + + /** + * Represents the result of one transaction's work and signals whether the runner should + * start another transaction. + *

+ * Implementations are free to carry any state needed to resume work at the right position + * in the next transaction. The runner itself only inspects {@link #hasMore()}. + *

+ */ + @FunctionalInterface + public interface Continuation { + /** + * Returns {@code true} if there is more work to do; {@code false} if the source has + * been exhausted and the loop should stop after committing the current transaction. + */ + boolean hasMore(); + } + + /** + * The continuation passed to the task on the very first transaction. + *

+ * Tasks can detect the first call with {@code continuation instanceof StartContinuation}. + *

+ */ + public static final class StartContinuation implements Continuation { + /** The singleton instance to pass on the first call. */ + public static final StartContinuation INSTANCE = new StartContinuation(); + + private StartContinuation() { + } + + @Override + public boolean hasMore() { + return true; + } + } + + // ------------------------------------------------------------------------- + // Task + // ------------------------------------------------------------------------- + /** - * Tracks per-transaction resource usage, exposes the adaptive limit, and controls whether - * the loop continues. + * A task that performs work inside a single transaction and returns a continuation. + */ + @FunctionalInterface + public interface Task { + /** + * Run one transaction's worth of work. + * + * @param transaction the transaction for this attempt + * @param quotaManager tracks work done this transaction and exposes the current limit + * @param continuation the continuation from the last successful transaction, + * or {@link StartContinuation#INSTANCE} on the very first call; + * on a retry this is the same continuation that was passed to the + * failed attempt + * @return a future containing the continuation describing where the next transaction + * should resume; return a continuation with {@link Continuation#hasMore()} + * {@code == false} to stop the loop + */ + @Nonnull + CompletableFuture run(@Nonnull Transaction transaction, + @Nonnull QuotaManager quotaManager, + @Nonnull Continuation continuation); + } + + // ------------------------------------------------------------------------- + // QuotaManager + // ------------------------------------------------------------------------- + + /** + * Tracks per-transaction resource usage and exposes the adaptive limit. *

* One instance is created per {@link #iterateAll} call. Per-transaction counts are reset at * the start of each transaction; the limit persists and is adjusted by the runner between @@ -218,14 +299,12 @@ public void close() { *

*

* The task should call {@link #getLimit()} at the start of each transaction to know how much - * work to attempt, increment the counts as it goes, and call {@link #markExhausted()} when - * the source is fully consumed. + * work to attempt, and increment the counts as it goes. *

*/ public static class QuotaManager { private int processedCount; private int deletedCount; - private boolean hasMore; private int limit; private final int maxLimit; @@ -291,18 +370,9 @@ public void deletedCountInc() { deletedCount++; } - /** - * Signal that the source is exhausted. The loop will stop after the current transaction - * commits, without starting a new one. - */ - public void markExhausted() { - hasMore = false; - } - void initTransaction() { processedCount = 0; deletedCount = 0; - hasMore = true; } void increaseLimit() { @@ -322,6 +392,10 @@ void decreaseLimit() { } } + // ------------------------------------------------------------------------- + // Builder + // ------------------------------------------------------------------------- + /** * Create a new builder for a {@link ThrottledRetryingRunner}. * @@ -396,9 +470,9 @@ public Builder withIncreaseLimitAfter(int increaseLimitAfter) { * Set the initial and maximum per-transaction limit passed to the task via * {@link QuotaManager#getLimit()}. *

- * The limit starts at this value. It will not be increased beyond {@code maxLimit} on - * success. If {@code maxLimit} is {@code 0} (the default) the limit feature is disabled: - * {@link QuotaManager#getLimit()} always returns {@code 0} and is never adjusted. + * The limit starts at this value and will not be increased beyond it. If {@code 0} + * (the default) the limit feature is disabled: {@link QuotaManager#getLimit()} always + * returns {@code 0} and is never adjusted. *

* * @param maxLimit the initial and maximum limit; {@code 0} disables limit management @@ -443,4 +517,4 @@ public ThrottledRetryingRunner build() { return new ThrottledRetryingRunner(this); } } -} +} \ No newline at end of file From de705ed90c70be4c8fbae2b320e320dfd779050d Mon Sep 17 00:00:00 2001 From: Scott Dugas Date: Thu, 5 Mar 2026 15:52:02 -0500 Subject: [PATCH 4/9] Test that retries/continuations behave correctly on conflicts --- .../test/ThrottledRetryingRunnerTest.java | 153 +++++++++++++++++- 1 file changed, 150 insertions(+), 3 deletions(-) diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java index 215f60944a..0dcbe8105b 100644 --- a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java +++ b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java @@ -734,8 +734,15 @@ void continuationCarriesStateToNextTransaction() { // A continuation that carries the next index to read class IndexContinuation implements ThrottledRetryingRunner.Continuation { final int nextIndex; - IndexContinuation(int nextIndex) { this.nextIndex = nextIndex; } - @Override public boolean hasMore() { return nextIndex < numKeys; } + + IndexContinuation(int nextIndex) { + this.nextIndex = nextIndex; + } + + @Override + public boolean hasMore() { + return nextIndex < numKeys; + } } List processedValues = new ArrayList<>(); @@ -753,7 +760,147 @@ class IndexContinuation implements ThrottledRetryingRunner.Continuation { } // ------------------------------------------------------------------------- - // Helpers + // Real transaction conflict + // ------------------------------------------------------------------------- + + @Test + void retriesOnRealTransactionConflict() { + // Arrange a genuine FDB NOT_COMMITTED conflict without any threading: + // 1. Task (attempt 1) reads conflictKey → establishes a read-conflict range on tr + // 2. Task also writes to ownWriteKey → makes tr a write transaction so FDB actually + // checks read conflicts on commit (FDB silently skips conflict checking for + // read-only transactions) + // 3. Task opens a second transaction, writes to conflictKey, and commits it + // 4. TransactionalRunner then commits tr → FDB detects the conflict and rejects it + // 5. Runner retries; attempt 2 sees no conflict and succeeds + byte[] conflictKey = subspace.pack(Tuple.from("conflict-key")); + byte[] ownWriteKey = subspace.pack(Tuple.from("own-write")); + AtomicInteger attemptCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder().withNumOfRetries(3).build()) { + runner.iterateAll((tr, quota, cont) -> { + int attempt = attemptCount.incrementAndGet(); + if (attempt == 1) { + // Write to a separate key to make tr a write transaction. + tr.set(ownWriteKey, new byte[]{0}); + // Read conflictKey to add it to tr's read-conflict range, then commit a + // write to the same key in a separate transaction before tr commits. + return tr.get(conflictKey) + .thenCompose(ignored -> + db.runAsync(tr2 -> { + tr2.set(conflictKey, new byte[]{1}); + return CompletableFuture.completedFuture(null); + }) + ) + .thenCompose(ignored -> exhausted()); + } + // Attempt 2: no conflict + return exhausted(); + }).join(); + } + + assertThat(attemptCount.get()).isEqualTo(2); + } + + @Test + void conflictDoesNotAdvanceContinuation() { + // Write 6 data keys to process in batches of 2 across 3 transactions. + // + // Expected flow: + // Attempt 1 (idx=0): processes items 0,1 — commits → continuation advances to idx=2 + // Attempt 2 (idx=2): processes items 2,3 — CONFLICT → not committed + // Attempt 3 (idx=2): retry receives idx=2 (not idx=4!) — processes 2,3 again — commits + // Attempt 4 (idx=4): processes items 4,5 — commits → hasMore()=false, loop ends + // + // The key assertion is that attempt 3 starts at idx=2, not idx=4. If the continuation + // were incorrectly advanced on failure, items 2 and 3 would never be written to FDB. + final int numKeys = 6; + final int batchSize = 2; + + db.run(tr -> { + for (int i = 0; i < numKeys; i++) { + tr.set(subspace.pack(Tuple.from("data", i)), Tuple.from(i).pack()); + } + return null; + }); + + byte[] conflictKey = subspace.pack(Tuple.from("conflict-trigger")); + byte[] ownWriteKey = subspace.pack(Tuple.from("own-write")); + + AtomicInteger attemptCount = new AtomicInteger(0); + AtomicInteger retryStartIdx = new AtomicInteger(-1); // captured from attempt 3 + + class IndexContinuation implements ThrottledRetryingRunner.Continuation { + final int nextIndex; + + IndexContinuation(int nextIndex) { + this.nextIndex = nextIndex; + } + + @Override + public boolean hasMore() { + return nextIndex < numKeys; + } + } + + try (ThrottledRetryingRunner runner = runnerBuilder().withNumOfRetries(3).build()) { + runner.iterateAll((tr, quota, cont) -> { + int attempt = attemptCount.incrementAndGet(); + int startIdx = (cont instanceof IndexContinuation) + ? ((IndexContinuation) cont).nextIndex : 0; + int endIdx = Math.min(startIdx + batchSize, numKeys); + + if (attempt == 3) { + retryStartIdx.set(startIdx); + } + + // Write a "processed" marker for each item in this batch + for (int i = startIdx; i < endIdx; i++) { + tr.set(subspace.pack(Tuple.from("done", i)), Tuple.from(i).pack()); + } + + if (attempt == 2) { + // Inject a conflict on the second attempt: + // - write to ownWriteKey to make tr a write transaction (FDB only checks + // read conflicts when the transaction has writes) + // - read conflictKey to add it to tr's read-conflict range + // - commit a write to conflictKey in a separate transaction so that tr + // fails with NOT_COMMITTED + tr.set(ownWriteKey, new byte[]{0}); + return tr.get(conflictKey) + .thenCompose(ignored -> + db.runAsync(tr2 -> { + tr2.set(conflictKey, new byte[]{1}); + return CompletableFuture.completedFuture(null); + }) + ) + .thenCompose(ignored -> + CompletableFuture.completedFuture(new IndexContinuation(endIdx))); + } + + return CompletableFuture.completedFuture(new IndexContinuation(endIdx)); + }).join(); + } + + // Attempt 3 must have restarted at idx=2 (the last committed continuation), + // not at idx=4 (what would have been returned if the failed attempt had committed) + assertThat(retryStartIdx.get()).isEqualTo(2); + + // 4 attempts total: tx1(0-1) + tx2(2-3, conflict) + retry(2-3) + tx3(4-5) + assertThat(attemptCount.get()).isEqualTo(4); + + // All 6 items must be present in FDB exactly once — none skipped, none missing + db.run(tr -> { + for (int i = 0; i < numKeys; i++) { + byte[] val = tr.get(subspace.pack(Tuple.from("done", i))).join(); + assertThat(val).as("item %d should be committed", i) + .isEqualTo(Tuple.from(i).pack()); + } + return null; + }); + } + + // ------------------------------------------------------------------------- /** Returns a completed future with a continuation indicating there is more work to do. */ From 4b5788ec68ed896d8aafac4b0a1289d73c580e4b Mon Sep 17 00:00:00 2001 From: Scott Dugas Date: Fri, 6 Mar 2026 14:55:42 -0500 Subject: [PATCH 5/9] Cleanup tests a bit --- .../test/ThrottledRetryingRunnerTest.java | 106 +++++++++--------- 1 file changed, 50 insertions(+), 56 deletions(-) diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java index 0dcbe8105b..b13e8fb8a8 100644 --- a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java +++ b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java @@ -731,20 +731,6 @@ void continuationCarriesStateToNextTransaction() { return null; }); - // A continuation that carries the next index to read - class IndexContinuation implements ThrottledRetryingRunner.Continuation { - final int nextIndex; - - IndexContinuation(int nextIndex) { - this.nextIndex = nextIndex; - } - - @Override - public boolean hasMore() { - return nextIndex < numKeys; - } - } - List processedValues = new ArrayList<>(); try (ThrottledRetryingRunner runner = runnerBuilder().build()) { @@ -752,7 +738,7 @@ public boolean hasMore() { int idx = (cont instanceof IndexContinuation) ? ((IndexContinuation) cont).nextIndex : 0; byte[] raw = tr.get(subspace.pack(Tuple.from(idx))).join(); processedValues.add((int) Tuple.fromBytes(raw).getLong(0)); - return CompletableFuture.completedFuture(new IndexContinuation(idx + 1)); + return CompletableFuture.completedFuture(new IndexContinuation(idx + 1, numKeys)); }).join(); } @@ -781,17 +767,7 @@ void retriesOnRealTransactionConflict() { runner.iterateAll((tr, quota, cont) -> { int attempt = attemptCount.incrementAndGet(); if (attempt == 1) { - // Write to a separate key to make tr a write transaction. - tr.set(ownWriteKey, new byte[]{0}); - // Read conflictKey to add it to tr's read-conflict range, then commit a - // write to the same key in a separate transaction before tr commits. - return tr.get(conflictKey) - .thenCompose(ignored -> - db.runAsync(tr2 -> { - tr2.set(conflictKey, new byte[]{1}); - return CompletableFuture.completedFuture(null); - }) - ) + return injectConflict(tr, conflictKey, ownWriteKey) .thenCompose(ignored -> exhausted()); } // Attempt 2: no conflict @@ -830,19 +806,6 @@ void conflictDoesNotAdvanceContinuation() { AtomicInteger attemptCount = new AtomicInteger(0); AtomicInteger retryStartIdx = new AtomicInteger(-1); // captured from attempt 3 - class IndexContinuation implements ThrottledRetryingRunner.Continuation { - final int nextIndex; - - IndexContinuation(int nextIndex) { - this.nextIndex = nextIndex; - } - - @Override - public boolean hasMore() { - return nextIndex < numKeys; - } - } - try (ThrottledRetryingRunner runner = runnerBuilder().withNumOfRetries(3).build()) { runner.iterateAll((tr, quota, cont) -> { int attempt = attemptCount.incrementAndGet(); @@ -860,25 +823,12 @@ public boolean hasMore() { } if (attempt == 2) { - // Inject a conflict on the second attempt: - // - write to ownWriteKey to make tr a write transaction (FDB only checks - // read conflicts when the transaction has writes) - // - read conflictKey to add it to tr's read-conflict range - // - commit a write to conflictKey in a separate transaction so that tr - // fails with NOT_COMMITTED - tr.set(ownWriteKey, new byte[]{0}); - return tr.get(conflictKey) - .thenCompose(ignored -> - db.runAsync(tr2 -> { - tr2.set(conflictKey, new byte[]{1}); - return CompletableFuture.completedFuture(null); - }) - ) + return injectConflict(tr, conflictKey, ownWriteKey) .thenCompose(ignored -> - CompletableFuture.completedFuture(new IndexContinuation(endIdx))); + CompletableFuture.completedFuture(new IndexContinuation(endIdx, numKeys))); } - return CompletableFuture.completedFuture(new IndexContinuation(endIdx)); + return CompletableFuture.completedFuture(new IndexContinuation(endIdx, numKeys)); }).join(); } @@ -913,7 +863,51 @@ private static CompletableFuture exhausted return CompletableFuture.completedFuture(() -> false); } + /** + * Injects a genuine FDB read-conflict into {@code tr} and returns a future that completes + * once the conflicting write has been committed. + *

+ * How it works: + *

    + *
  1. Writes to {@code ownWriteKey} so that {@code tr} is a write transaction — FDB only + * checks read-conflict ranges when the committing transaction has writes.
  2. + *
  3. Reads {@code conflictKey} to add it to {@code tr}'s read-conflict range.
  4. + *
  5. Opens a separate transaction, writes to {@code conflictKey}, and commits it, so that + * when {@code TransactionalRunner} later commits {@code tr} it receives + * {@code NOT_COMMITTED}.
  6. + *
+ *

+ */ + private CompletableFuture injectConflict(Transaction tr, byte[] conflictKey, byte[] ownWriteKey) { + tr.set(ownWriteKey, new byte[]{0}); + return tr.get(conflictKey) + .thenCompose(ignored -> db.runAsync(tr2 -> { + tr2.set(conflictKey, new byte[]{1}); + return CompletableFuture.completedFuture(null); + })); + } + private ThrottledRetryingRunner.Builder runnerBuilder() { return ThrottledRetryingRunner.builder(db, scheduledExecutor); } -} \ No newline at end of file + + /** + * A continuation that carries a numeric index as cursor position. + * {@link #hasMore()} returns {@code true} while {@link #nextIndex} is less than + * {@link #totalKeys}. + */ + private static class IndexContinuation implements ThrottledRetryingRunner.Continuation { + final int nextIndex; + private final int totalKeys; + + IndexContinuation(int nextIndex, int totalKeys) { + this.nextIndex = nextIndex; + this.totalKeys = totalKeys; + } + + @Override + public boolean hasMore() { + return nextIndex < totalKeys; + } + } +} From cf2694a66e107d3958b8428dd22f29a4ccce01e0 Mon Sep 17 00:00:00 2001 From: Scott Dugas Date: Fri, 6 Mar 2026 16:46:12 -0500 Subject: [PATCH 6/9] Cleanup the tests a bit more --- .../test/ThrottledRetryingRunnerTest.java | 137 ++++++++---------- 1 file changed, 64 insertions(+), 73 deletions(-) diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java index b13e8fb8a8..354980af4a 100644 --- a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java +++ b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java @@ -43,6 +43,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.UnaryOperator; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -100,12 +102,12 @@ void testThrottlePerSecDelayMillis(long elapsedMs, int maxPerSec, int eventCount @Test void testIncreaseLimitFormula() { // limit=0 is no-limit mode; increaseLimit is a no-op - ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(0); + ThrottledRetryingRunner.QuotaManager qm = quotaManager(0); qm.increaseLimit(); assertThat(qm.getLimit()).isEqualTo(0); // With a real maxLimit, decreaseLimit reads processedCount directly (before initTransaction) - ThrottledRetryingRunner.QuotaManager qm2 = new ThrottledRetryingRunner.QuotaManager(1000); + ThrottledRetryingRunner.QuotaManager qm2 = quotaManager(1000); qm2.processedCountAdd(80); qm2.decreaseLimit(); // limit = max(1, 80*9/10) = 72 assertThat(qm2.getLimit()).isEqualTo(72); @@ -115,7 +117,7 @@ void testIncreaseLimitFormula() { @Test void testDecreaseLimitFormula() { - ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(1000); + ThrottledRetryingRunner.QuotaManager qm = quotaManager(1000); // Process 100 items, then fail → limit = max(1, 100*9/10) = 90 qm.processedCountAdd(100); qm.decreaseLimit(); @@ -141,7 +143,7 @@ void testDecreaseLimitFormula() { @Test void testDecreaseLimitNoOpWhenLimitIsZero() { - ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(0); + ThrottledRetryingRunner.QuotaManager qm = quotaManager(0); qm.processedCountAdd(100); qm.initTransaction(); qm.decreaseLimit(); @@ -150,7 +152,7 @@ void testDecreaseLimitNoOpWhenLimitIsZero() { @Test void testLimitCappedAtMaxLimit() { - ThrottledRetryingRunner.QuotaManager qm = new ThrottledRetryingRunner.QuotaManager(50); + ThrottledRetryingRunner.QuotaManager qm = quotaManager(50); // Drive limit down by simulating a failure with 40 in-flight items qm.processedCountAdd(40); qm.decreaseLimit(); // limit = max(1, 40*9/10) = 36 @@ -251,11 +253,11 @@ void retryCounterResetsAfterSuccess() { int call = totalCalls.incrementAndGet(); // Fail on calls 1 and 2 if (call == 1 || call == 2) { - return CompletableFuture.failedFuture(new RuntimeException("transient")); + return fail(); } // Succeed on call 3 (resets counter), then fail on 4 and 5, succeed on 6, done if (call == 4 || call == 5) { - return CompletableFuture.failedFuture(new RuntimeException("transient")); + return fail(); } successCount.incrementAndGet(); if (successCount.get() == 2) { @@ -333,12 +335,13 @@ void processedAndDeletedCountsAreIndependent() { // Limit: adaptive adjustment // ------------------------------------------------------------------------- - @Test - void limitStartsAtMaxLimitAndIsExposedToTask() { + @ParameterizedTest + @CsvSource({"0, 0", "50, 50"}) + void limitStartsAtConfiguredMaxLimit(int maxLimit, int expectedLimit) { AtomicInteger observedLimit = new AtomicInteger(-1); try (ThrottledRetryingRunner runner = runnerBuilder() - .withMaxLimit(50) + .withMaxLimit(maxLimit) .build()) { runner.iterateAll((tr, quota, cont) -> { observedLimit.set(quota.getLimit()); @@ -346,21 +349,7 @@ void limitStartsAtMaxLimitAndIsExposedToTask() { }).join(); } - assertThat(observedLimit.get()).isEqualTo(50); - } - - @Test - void limitIsZeroWhenMaxLimitNotConfigured() { - AtomicInteger observedLimit = new AtomicInteger(-1); - - try (ThrottledRetryingRunner runner = runnerBuilder().build()) { - runner.iterateAll((tr, quota, cont) -> { - observedLimit.set(quota.getLimit()); - return exhausted(); - }).join(); - } - - assertThat(observedLimit.get()).isEqualTo(0); + assertThat(observedLimit.get()).isEqualTo(expectedLimit); } @Test @@ -380,7 +369,7 @@ void limitDecreasesOnFailureThenIncreasesOnSuccess() { if (call == 1) { // Process 100 items and fail → next limit = 90 quota.processedCountAdd(100); - return CompletableFuture.failedFuture(new RuntimeException("fail")); + return fail(); } if (call <= 3) { // Two successes → limit increases from 90 @@ -415,7 +404,7 @@ void limitDecreasesProgressivelyTowardOne() { observedLimits.add(quota.getLimit()); if (call <= 10) { quota.processedCountInc(); // 1 item processed → limit floor at 1 - return CompletableFuture.failedFuture(new RuntimeException("fail")); + return fail(); } return exhausted(); }).join(); @@ -443,7 +432,7 @@ void limitIsNeverAdjustedWhenMaxLimitIsZero() { observedLimits.add(quota.getLimit()); quota.processedCountAdd(100); if (call <= 3) { - return CompletableFuture.failedFuture(new RuntimeException("fail")); + return fail(); } return exhausted(); }).join(); @@ -509,53 +498,16 @@ void eachIterationReceivesADistinctTransaction() { @Test void throttlingByScannedItemsSlowsItDown() { - final int itemsPerTransaction = 10; - final int maxPerSec = 20; - final int transactions = 3; - AtomicInteger callCount = new AtomicInteger(0); - - long start = System.currentTimeMillis(); - try (ThrottledRetryingRunner runner = runnerBuilder() - .withMaxItemsScannedPerSec(maxPerSec) - .build()) { - runner.iterateAll((tr, quota, cont) -> { - quota.processedCountAdd(itemsPerTransaction); - if (callCount.incrementAndGet() >= transactions) { - return exhausted(); - } - return hasMore(); - }).join(); - } - long elapsed = System.currentTimeMillis() - start; - - // totalItems / maxPerSec = (3*10)/20 = 1.5s minimum - long expectedMinMs = TimeUnit.SECONDS.toMillis(itemsPerTransaction * transactions / maxPerSec); - assertThat(elapsed).isGreaterThanOrEqualTo(expectedMinMs); + assertThrottledByItemsPerSec( + b -> b.withMaxItemsScannedPerSec(20), + ThrottledRetryingRunner.QuotaManager::processedCountAdd); } @Test void throttlingByDeletedItemsSlowsItDown() { - final int deletesPerTransaction = 10; - final int maxPerSec = 20; - final int transactions = 3; - AtomicInteger callCount = new AtomicInteger(0); - - long start = System.currentTimeMillis(); - try (ThrottledRetryingRunner runner = runnerBuilder() - .withMaxItemsDeletedPerSec(maxPerSec) - .build()) { - runner.iterateAll((tr, quota, cont) -> { - quota.deletedCountAdd(deletesPerTransaction); - if (callCount.incrementAndGet() >= transactions) { - return exhausted(); - } - return hasMore(); - }).join(); - } - long elapsed = System.currentTimeMillis() - start; - - long expectedMinMs = TimeUnit.SECONDS.toMillis(deletesPerTransaction * transactions / maxPerSec); - assertThat(elapsed).isGreaterThanOrEqualTo(expectedMinMs); + assertThrottledByItemsPerSec( + b -> b.withMaxItemsDeletedPerSec(20), + ThrottledRetryingRunner.QuotaManager::deletedCountAdd); } @Test @@ -677,7 +629,7 @@ void continuationIsNotAdvancedOnRetry() { } if (call == 2 || call == 3) { // Fail twice: continuation should NOT advance - return CompletableFuture.failedFuture(new RuntimeException("transient")); + return fail(); } // Final success return exhausted(); @@ -706,7 +658,7 @@ void startContinuationReusedAcrossRetriesBeforeAnySuccess() { runner.iterateAll((tr, quota, cont) -> { received.add(cont); if (callCount.incrementAndGet() <= 2) { - return CompletableFuture.failedFuture(new RuntimeException("transient")); + return fail(); } return exhausted(); }).join(); @@ -863,6 +815,45 @@ private static CompletableFuture exhausted return CompletableFuture.completedFuture(() -> false); } + /** Returns a pre-failed future simulating a transient task failure. */ + private static CompletableFuture fail() { + return CompletableFuture.failedFuture(new RuntimeException("transient")); + } + + /** Convenience factory for a {@link ThrottledRetryingRunner.QuotaManager}. */ + private static ThrottledRetryingRunner.QuotaManager quotaManager(int maxLimit) { + return new ThrottledRetryingRunner.QuotaManager(maxLimit); + } + + /** + * Asserts that the runner throttles correctly when {@code reportItems} is used to count + * items and {@code configure} wires up the corresponding rate-limiter on the builder. + */ + private void assertThrottledByItemsPerSec( + UnaryOperator configure, + BiConsumer reportItems) { + final int itemsPerTransaction = 10; + final int maxPerSec = 20; + final int transactions = 3; + AtomicInteger callCount = new AtomicInteger(0); + + long start = System.currentTimeMillis(); + try (ThrottledRetryingRunner runner = configure.apply(runnerBuilder()).build()) { + runner.iterateAll((tr, quota, cont) -> { + reportItems.accept(quota, itemsPerTransaction); + if (callCount.incrementAndGet() >= transactions) { + return exhausted(); + } + return hasMore(); + }).join(); + } + long elapsed = System.currentTimeMillis() - start; + + // totalItems / maxPerSec = (3 * 10) / 20 = 1.5s minimum + long expectedMinMs = TimeUnit.SECONDS.toMillis((long) itemsPerTransaction * transactions / maxPerSec); + assertThat(elapsed).isGreaterThanOrEqualTo(expectedMinMs); + } + /** * Injects a genuine FDB read-conflict into {@code tr} and returns a future that completes * once the conflicting write has been committed. From 97cd26378a059ee2f1a72e1d99ab68e24c76c4f4 Mon Sep 17 00:00:00 2001 From: Scott Dugas Date: Mon, 16 Mar 2026 13:52:48 -0400 Subject: [PATCH 7/9] Have the QuotaManager tell the task whether to continue Otherwise the other limits don't get applied --- .../test/ThrottledRetryingRunnerTest.java | 51 +++++++++++++++++++ .../test/ThrottledRetryingRunner.java | 31 ++++++++++- 2 files changed, 80 insertions(+), 2 deletions(-) diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java index 354980af4a..e26ba64afd 100644 --- a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java +++ b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java @@ -163,6 +163,57 @@ void testLimitCappedAtMaxLimit() { assertThat(qm.getLimit()).isEqualTo(50); // capped at maxLimit } + // ------------------------------------------------------------------------- + // QuotaManager shouldContinue + // ------------------------------------------------------------------------- + + @Test + void shouldContinueTrueWhenUnderLimit() { + ThrottledRetryingRunner.QuotaManager qm = quotaManager(10); + qm.initTransaction(); + qm.processedCountAdd(5); + assertThat(qm.shouldContinue()).isTrue(); + } + + @Test + void shouldContinueFalseWhenProcessedCountExceedsLimit() { + ThrottledRetryingRunner.QuotaManager qm = quotaManager(10); + qm.initTransaction(); + qm.processedCountAdd(11); // 11 > 10 + assertThat(qm.shouldContinue()).isFalse(); + } + + @Test + void shouldContinueTrueWhenProcessedCountEqualsLimit() { + // > not >=: at exactly the limit, shouldContinue still returns true + ThrottledRetryingRunner.QuotaManager qm = quotaManager(10); + qm.initTransaction(); + qm.processedCountAdd(10); // 10 > 10 is false + assertThat(qm.shouldContinue()).isTrue(); + } + + @Test + void shouldContinueTrueWhenNoLimitRegardlessOfCount() { + // limit=0 means no limit; the count is never checked + ThrottledRetryingRunner.QuotaManager qm = quotaManager(0); + qm.initTransaction(); + qm.processedCountAdd(Integer.MAX_VALUE); + assertThat(qm.shouldContinue()).isTrue(); + } + + @Test + void shouldContinueResetsAfterInitTransaction() { + // After a transaction fails (count exceeds limit), initTransaction should reset + // the state so shouldContinue returns true again at the start of the retry. + ThrottledRetryingRunner.QuotaManager qm = quotaManager(5); + qm.initTransaction(); + qm.processedCountAdd(6); // 6 > 5 → shouldContinue false + assertThat(qm.shouldContinue()).isFalse(); + + qm.initTransaction(); // reset for the next transaction + assertThat(qm.shouldContinue()).isTrue(); + } + // ------------------------------------------------------------------------- // Basic iteration: task runs the expected number of times // ------------------------------------------------------------------------- diff --git a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java index 7bc9058814..15b22ae952 100644 --- a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java +++ b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java @@ -298,15 +298,21 @@ CompletableFuture run(@Nonnull Transaction transaction, * transactions. *

*

- * The task should call {@link #getLimit()} at the start of each transaction to know how much - * work to attempt, and increment the counts as it goes. + * The task should call {@link #shouldContinue()} between processing items to decide whether + * to stop early. {@link #shouldContinue()} returns {@code false} once the processed count + * exceeds the active limit or the transaction has been running for longer than + * {@link #MAX_TRANSACTION_DURATION_MILLIS}. *

*/ public static class QuotaManager { + /** Maximum time a single transaction should run before {@link #shouldContinue()} returns {@code false}. */ + static final long MAX_TRANSACTION_DURATION_MILLIS = TimeUnit.SECONDS.toMillis(4); + private int processedCount; private int deletedCount; private int limit; private final int maxLimit; + private long transactionStartTimeMillis; QuotaManager(int maxLimit) { this.maxLimit = maxLimit; @@ -370,9 +376,30 @@ public void deletedCountInc() { deletedCount++; } + /** + * Returns {@code true} if the task should continue processing items in this transaction, + * {@code false} if it should stop and return a continuation. + *

+ * Returns {@code false} when either: + *

    + *
  • a limit is active ({@link #getLimit()} {@code > 0}) and the processed count + * exceeds it, or
  • + *
  • the transaction has been running for longer than + * {@link #MAX_TRANSACTION_DURATION_MILLIS}.
  • + *
+ *

+ */ + public boolean shouldContinue() { + if (limit > 0 && processedCount > limit) { + return false; + } + return System.currentTimeMillis() - transactionStartTimeMillis < MAX_TRANSACTION_DURATION_MILLIS; + } + void initTransaction() { processedCount = 0; deletedCount = 0; + transactionStartTimeMillis = System.currentTimeMillis(); } void increaseLimit() { From 0d21b0fa6fe9dd855daffc62de4f9844b721846e Mon Sep 17 00:00:00 2001 From: Scott Dugas Date: Mon, 16 Mar 2026 14:41:38 -0400 Subject: [PATCH 8/9] Switch insertSIFTSmall to use ThrottledRunner --- .../foundationdb/async/hnsw/TestHelpers.java | 152 +++++++++++------- 1 file changed, 94 insertions(+), 58 deletions(-) diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java b/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java index 8b933523d0..8ebe93c3b4 100644 --- a/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java +++ b/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java @@ -31,11 +31,11 @@ import com.apple.foundationdb.linear.StoredVecsIterator; import com.apple.foundationdb.linear.Transformed; import com.apple.foundationdb.subspace.Subspace; +import com.apple.foundationdb.test.ThrottledRetryingRunner; import com.apple.foundationdb.tuple.Tuple; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.junit.jupiter.api.extension.AfterTestExecutionCallback; @@ -68,6 +68,8 @@ import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; @@ -97,6 +99,35 @@ static void dumpQueryResults(@Nonnull final Path tempDir, @Nonnull final String } } + @Nonnull + static CompletableFuture> basicInsertBatch( + @Nonnull final Transaction tr, + @Nonnull final HNSW hnsw, + final int batchSize, + final long firstId, + @Nonnull final BiFunction insertFunction) { + logger.info("Inserting batch [" + firstId + ", " + (firstId + batchSize) + ")"); + final TestOnWriteListener onWriteListener = (TestOnWriteListener)hnsw.getOnWriteListener(); + onWriteListener.reset(); + final TestOnReadListener onReadListener = (TestOnReadListener)hnsw.getOnReadListener(); + onReadListener.reset(); + + final ImmutableList.Builder data = ImmutableList.builder(); + + // In theory this could put all the futures in a List and run the inserts concurrently, but for a `basicInsertBatch` + // it's probably better to not test the concurrent handling of hnsw, even if it makes the tests slower. + CompletableFuture future = CompletableFuture.completedFuture(null); + for (int i = 0; i < batchSize; i ++) { + final PrimaryKeyAndVector record = insertFunction.apply(tr, firstId + i); + if (record == null) { + break; + } + data.add(record); + future = future.thenCompose((vignore) -> hnsw.insert(tr, record.getPrimaryKey(), record.getVector())); + } + return future.thenApply(vignore -> data.build()); + } + @Nonnull static List basicInsertBatch(@Nonnull final Database db, @Nonnull final HNSW hnsw, @@ -104,72 +135,77 @@ static List basicInsertBatch(@Nonnull final Database db, final long firstId, @Nonnull final BiFunction insertFunction) throws ExecutionException, InterruptedException, TimeoutException { - return db.runAsync(tr -> { - final TestOnWriteListener onWriteListener = (TestOnWriteListener)hnsw.getOnWriteListener(); - onWriteListener.reset(); - final TestOnReadListener onReadListener = (TestOnReadListener)hnsw.getOnReadListener(); - onReadListener.reset(); - - final ImmutableList.Builder data = ImmutableList.builder(); - - // In theory this could put all the futures in a List and run the inserts concurrently, but for a `basicInsertBatch` - // it's probably better to not test the concurrent handling of hnsw, even if it makes the tests slower. - CompletableFuture future = CompletableFuture.completedFuture(null); - final long beginTs = System.nanoTime(); - for (int i = 0; i < batchSize; i ++) { - final PrimaryKeyAndVector record = insertFunction.apply(tr, firstId + i); - if (record == null) { - break; - } - data.add(record); - future = future.thenCompose((vignore) -> hnsw.insert(tr, record.getPrimaryKey(), record.getVector())); - } - return future.thenApply(vignore -> data.build()) - .whenComplete((result, error) -> { - if (error != null) { - logger.info("Failed to insert batchSize={}", error); - } else { - final long endTs = System.nanoTime(); - logger.info("inserted batchSize={} records={} starting at nodeId={} took elapsedTime={}ms, readCounts={}, readBytes={}", - batchSize, result.size(), firstId, TimeUnit.NANOSECONDS.toMillis(endTs - beginTs), - onReadListener.getNodeCountByLayer(), onReadListener.getBytesReadByLayer()); - } - }); - }).get(2, TimeUnit.MINUTES); // set a timeout for inserting a single batch including retries so setup won't run forever + return db.runAsync( + tr -> { + final long beginTs = System.nanoTime(); + return basicInsertBatch(tr, hnsw, batchSize, firstId, insertFunction) + .whenComplete((result, error) -> { + if (error != null) { + logger.info("Failed to insert batchSize={}", error); + } else { + final long endTs = System.nanoTime(); + final TestOnReadListener onReadListener = (TestOnReadListener)hnsw.getOnReadListener(); + logger.info("inserted batchSize={} records={} starting at nodeId={} took elapsedTime={}ms, readCounts={}, readBytes={}", + batchSize, result.size(), firstId, TimeUnit.NANOSECONDS.toMillis(endTs - beginTs), + onReadListener.getNodeCountByLayer(), onReadListener.getBytesReadByLayer()); + } + }); + }) + .get(2, TimeUnit.MINUTES); // set a timeout for inserting a single batch including retries so setup won't run forever } static List insertSIFTSmall(@Nonnull final Database db, @Nonnull final HNSW hnsw) throws Exception { final Path siftSmallPath = Paths.get(".out/extracted/siftsmall/siftsmall_base.fvecs"); - final ImmutableList.Builder insertedDataBuilder = ImmutableList.builder(); - + // Load all vectors upfront so the task can index into them by position using the adaptive limit. + final List allVectors; try (final var fileChannel = FileChannel.open(siftSmallPath, StandardOpenOption.READ)) { - final Iterator vectorIterator = new StoredVecsIterator.StoredFVecsIterator(fileChannel); - - final int batchSize = 50; - int i = 0; - while (vectorIterator.hasNext()) { - final List batch = - Lists.newArrayList(Iterators.limit(vectorIterator, batchSize)); - final long currentBatchStart = i; - final List insertedInBatch = - basicInsertBatch(db, hnsw, batchSize, i, - (tr, nextId) -> { - final int indexInBatch = Math.toIntExact(nextId - currentBatchStart); - if (indexInBatch >= batch.size()) { - return null; - } - final Tuple currentPrimaryKey = createPrimaryKey(nextId); - final DoubleRealVector doubleVector = batch.get(indexInBatch); - return new PrimaryKeyAndVector(currentPrimaryKey, doubleVector); - }); - insertedDataBuilder.addAll(insertedInBatch); - i += insertedInBatch.size(); + allVectors = Lists.newArrayList(new StoredVecsIterator.StoredFVecsIterator(fileChannel)); + } + + // A continuation that carries the next vector index to insert. + class SiftContinuation implements ThrottledRetryingRunner.Continuation { + final int nextIndex; + + SiftContinuation(int nextIndex) { + this.nextIndex = nextIndex; + } + + @Override + public boolean hasMore() { + return nextIndex < allVectors.size(); } - assertThat(i).isEqualTo(10000); } - return insertedDataBuilder.build(); + + final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1); + try (ThrottledRetryingRunner runner = ThrottledRetryingRunner.builder(db, executor) + .withMaxLimit(500) + .build()) { + runner.iterateAll((tr, quota, cont) -> { + final int startIdx = (cont instanceof SiftContinuation) + ? ((SiftContinuation) cont).nextIndex : 0; + return basicInsertBatch(tr, hnsw, quota.getLimit(), startIdx, + (ignoredTr, nextId) -> { + final int idx = Math.toIntExact(nextId); + if (idx < allVectors.size() && quota.shouldContinue()) { + quota.processedCountInc(); + return new PrimaryKeyAndVector(createPrimaryKey(idx), allVectors.get(idx)); + } + return null; + }) + .thenApply(list -> new SiftContinuation(startIdx + list.size())); + }).join(); + } finally { + executor.shutdown(); + } + + assertThat(allVectors).hasSize(10000); + final ImmutableList.Builder result = ImmutableList.builder(); + for (int i = 0; i < allVectors.size(); i++) { + result.add(new PrimaryKeyAndVector(createPrimaryKey(i), allVectors.get(i))); + } + return result.build(); } static void validateSIFTSmall(@Nonnull final Database db, From b5ef0162149e5b9ed3787b0c3e0f59f5b9d59682 Mon Sep 17 00:00:00 2001 From: Scott Dugas Date: Mon, 16 Mar 2026 15:23:55 -0400 Subject: [PATCH 9/9] Fix loop's handling of futures --- .../foundationdb/async/hnsw/TestHelpers.java | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java b/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java index 8ebe93c3b4..bde87879f0 100644 --- a/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java +++ b/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java @@ -22,6 +22,7 @@ import com.apple.foundationdb.Database; import com.apple.foundationdb.Transaction; +import com.apple.foundationdb.async.AsyncUtil; import com.apple.foundationdb.linear.AffineOperator; import com.apple.foundationdb.linear.DoubleRealVector; import com.apple.foundationdb.linear.HalfRealVector; @@ -72,6 +73,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -106,26 +108,31 @@ static CompletableFuture> basicInsertBatch( final int batchSize, final long firstId, @Nonnull final BiFunction insertFunction) { - logger.info("Inserting batch [" + firstId + ", " + (firstId + batchSize) + ")"); + logger.info("Inserting batch starting at " + firstId); final TestOnWriteListener onWriteListener = (TestOnWriteListener)hnsw.getOnWriteListener(); onWriteListener.reset(); final TestOnReadListener onReadListener = (TestOnReadListener)hnsw.getOnReadListener(); onReadListener.reset(); final ImmutableList.Builder data = ImmutableList.builder(); - - // In theory this could put all the futures in a List and run the inserts concurrently, but for a `basicInsertBatch` - // it's probably better to not test the concurrent handling of hnsw, even if it makes the tests slower. - CompletableFuture future = CompletableFuture.completedFuture(null); - for (int i = 0; i < batchSize; i ++) { + // Call insertFunction lazily between async inserts so that shouldContinue() sees the + // actual elapsed time after each insert rather than evaluating all checks synchronously + // before any async work begins. + final AtomicInteger insertCount = new AtomicInteger(0); + return AsyncUtil.whileTrue(() -> { + final int i = insertCount.get(); + if (i >= batchSize) { + return AsyncUtil.READY_FALSE; + } final PrimaryKeyAndVector record = insertFunction.apply(tr, firstId + i); if (record == null) { - break; + return AsyncUtil.READY_FALSE; } data.add(record); - future = future.thenCompose((vignore) -> hnsw.insert(tr, record.getPrimaryKey(), record.getVector())); - } - return future.thenApply(vignore -> data.build()); + insertCount.incrementAndGet(); + return hnsw.insert(tr, record.getPrimaryKey(), record.getVector()) + .thenApply(ignored -> Boolean.TRUE); + }).>thenApply(ignored -> data.build()); } @Nonnull