diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java b/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java index 8b933523d0..bde87879f0 100644 --- a/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java +++ b/fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/TestHelpers.java @@ -22,6 +22,7 @@ import com.apple.foundationdb.Database; import com.apple.foundationdb.Transaction; +import com.apple.foundationdb.async.AsyncUtil; import com.apple.foundationdb.linear.AffineOperator; import com.apple.foundationdb.linear.DoubleRealVector; import com.apple.foundationdb.linear.HalfRealVector; @@ -31,11 +32,11 @@ import com.apple.foundationdb.linear.StoredVecsIterator; import com.apple.foundationdb.linear.Transformed; import com.apple.foundationdb.subspace.Subspace; +import com.apple.foundationdb.test.ThrottledRetryingRunner; import com.apple.foundationdb.tuple.Tuple; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.junit.jupiter.api.extension.AfterTestExecutionCallback; @@ -68,8 +69,11 @@ import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -97,6 +101,40 @@ static void dumpQueryResults(@Nonnull final Path tempDir, @Nonnull final String } } + @Nonnull + static CompletableFuture> basicInsertBatch( + @Nonnull final Transaction tr, 
+ @Nonnull final HNSW hnsw, + final int batchSize, + final long firstId, + @Nonnull final BiFunction insertFunction) { + logger.info("Inserting batch starting at " + firstId); + final TestOnWriteListener onWriteListener = (TestOnWriteListener)hnsw.getOnWriteListener(); + onWriteListener.reset(); + final TestOnReadListener onReadListener = (TestOnReadListener)hnsw.getOnReadListener(); + onReadListener.reset(); + + final ImmutableList.Builder data = ImmutableList.builder(); + // Call insertFunction lazily between async inserts so that shouldContinue() sees the + // actual elapsed time after each insert rather than evaluating all checks synchronously + // before any async work begins. + final AtomicInteger insertCount = new AtomicInteger(0); + return AsyncUtil.whileTrue(() -> { + final int i = insertCount.get(); + if (i >= batchSize) { + return AsyncUtil.READY_FALSE; + } + final PrimaryKeyAndVector record = insertFunction.apply(tr, firstId + i); + if (record == null) { + return AsyncUtil.READY_FALSE; + } + data.add(record); + insertCount.incrementAndGet(); + return hnsw.insert(tr, record.getPrimaryKey(), record.getVector()) + .thenApply(ignored -> Boolean.TRUE); + }).>thenApply(ignored -> data.build()); + } + @Nonnull static List basicInsertBatch(@Nonnull final Database db, @Nonnull final HNSW hnsw, @@ -104,72 +142,77 @@ static List basicInsertBatch(@Nonnull final Database db, final long firstId, @Nonnull final BiFunction insertFunction) throws ExecutionException, InterruptedException, TimeoutException { - return db.runAsync(tr -> { - final TestOnWriteListener onWriteListener = (TestOnWriteListener)hnsw.getOnWriteListener(); - onWriteListener.reset(); - final TestOnReadListener onReadListener = (TestOnReadListener)hnsw.getOnReadListener(); - onReadListener.reset(); - - final ImmutableList.Builder data = ImmutableList.builder(); - - // In theory this could put all the futures in a List and run the inserts concurrently, but for a `basicInsertBatch` - // it's probably 
better to not test the concurrent handling of hnsw, even if it makes the tests slower. - CompletableFuture future = CompletableFuture.completedFuture(null); - final long beginTs = System.nanoTime(); - for (int i = 0; i < batchSize; i ++) { - final PrimaryKeyAndVector record = insertFunction.apply(tr, firstId + i); - if (record == null) { - break; - } - data.add(record); - future = future.thenCompose((vignore) -> hnsw.insert(tr, record.getPrimaryKey(), record.getVector())); - } - return future.thenApply(vignore -> data.build()) - .whenComplete((result, error) -> { - if (error != null) { - logger.info("Failed to insert batchSize={}", error); - } else { - final long endTs = System.nanoTime(); - logger.info("inserted batchSize={} records={} starting at nodeId={} took elapsedTime={}ms, readCounts={}, readBytes={}", - batchSize, result.size(), firstId, TimeUnit.NANOSECONDS.toMillis(endTs - beginTs), - onReadListener.getNodeCountByLayer(), onReadListener.getBytesReadByLayer()); - } - }); - }).get(2, TimeUnit.MINUTES); // set a timeout for inserting a single batch including retries so setup won't run forever + return db.runAsync( + tr -> { + final long beginTs = System.nanoTime(); + return basicInsertBatch(tr, hnsw, batchSize, firstId, insertFunction) + .whenComplete((result, error) -> { + if (error != null) { + logger.info("Failed to insert batchSize={}", error); + } else { + final long endTs = System.nanoTime(); + final TestOnReadListener onReadListener = (TestOnReadListener)hnsw.getOnReadListener(); + logger.info("inserted batchSize={} records={} starting at nodeId={} took elapsedTime={}ms, readCounts={}, readBytes={}", + batchSize, result.size(), firstId, TimeUnit.NANOSECONDS.toMillis(endTs - beginTs), + onReadListener.getNodeCountByLayer(), onReadListener.getBytesReadByLayer()); + } + }); + }) + .get(2, TimeUnit.MINUTES); // set a timeout for inserting a single batch including retries so setup won't run forever } static List insertSIFTSmall(@Nonnull final Database db, 
@Nonnull final HNSW hnsw) throws Exception { final Path siftSmallPath = Paths.get(".out/extracted/siftsmall/siftsmall_base.fvecs"); - final ImmutableList.Builder insertedDataBuilder = ImmutableList.builder(); - + // Load all vectors upfront so the task can index into them by position using the adaptive limit. + final List allVectors; try (final var fileChannel = FileChannel.open(siftSmallPath, StandardOpenOption.READ)) { - final Iterator vectorIterator = new StoredVecsIterator.StoredFVecsIterator(fileChannel); - - final int batchSize = 50; - int i = 0; - while (vectorIterator.hasNext()) { - final List batch = - Lists.newArrayList(Iterators.limit(vectorIterator, batchSize)); - final long currentBatchStart = i; - final List insertedInBatch = - basicInsertBatch(db, hnsw, batchSize, i, - (tr, nextId) -> { - final int indexInBatch = Math.toIntExact(nextId - currentBatchStart); - if (indexInBatch >= batch.size()) { - return null; - } - final Tuple currentPrimaryKey = createPrimaryKey(nextId); - final DoubleRealVector doubleVector = batch.get(indexInBatch); - return new PrimaryKeyAndVector(currentPrimaryKey, doubleVector); - }); - insertedDataBuilder.addAll(insertedInBatch); - i += insertedInBatch.size(); + allVectors = Lists.newArrayList(new StoredVecsIterator.StoredFVecsIterator(fileChannel)); + } + + // A continuation that carries the next vector index to insert. 
+ class SiftContinuation implements ThrottledRetryingRunner.Continuation { + final int nextIndex; + + SiftContinuation(int nextIndex) { + this.nextIndex = nextIndex; + } + + @Override + public boolean hasMore() { + return nextIndex < allVectors.size(); } - assertThat(i).isEqualTo(10000); } - return insertedDataBuilder.build(); + + final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1); + try (ThrottledRetryingRunner runner = ThrottledRetryingRunner.builder(db, executor) + .withMaxLimit(500) + .build()) { + runner.iterateAll((tr, quota, cont) -> { + final int startIdx = (cont instanceof SiftContinuation) + ? ((SiftContinuation) cont).nextIndex : 0; + return basicInsertBatch(tr, hnsw, quota.getLimit(), startIdx, + (ignoredTr, nextId) -> { + final int idx = Math.toIntExact(nextId); + if (idx < allVectors.size() && quota.shouldContinue()) { + quota.processedCountInc(); + return new PrimaryKeyAndVector(createPrimaryKey(idx), allVectors.get(idx)); + } + return null; + }) + .thenApply(list -> new SiftContinuation(startIdx + list.size())); + }).join(); + } finally { + executor.shutdown(); + } + + assertThat(allVectors).hasSize(10000); + final ImmutableList.Builder result = ImmutableList.builder(); + for (int i = 0; i < allVectors.size(); i++) { + result.add(new PrimaryKeyAndVector(createPrimaryKey(i), allVectors.get(i))); + } + return result.build(); } static void validateSIFTSmall(@Nonnull final Database db, diff --git a/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java new file mode 100644 index 0000000000..e26ba64afd --- /dev/null +++ b/fdb-extensions/src/test/java/com/apple/foundationdb/test/ThrottledRetryingRunnerTest.java @@ -0,0 +1,955 @@ +/* + * ThrottledRetryingRunnerTest.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. 
and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.test; + +import com.apple.foundationdb.Database; +import com.apple.foundationdb.Transaction; +import com.apple.foundationdb.subspace.Subspace; +import com.apple.foundationdb.tuple.Tuple; +import com.apple.test.Tags; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.UnaryOperator; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.catchThrowableOfType; + +@Tag(Tags.RequiresFDB) +class ThrottledRetryingRunnerTest { + 
@RegisterExtension + static final TestDatabaseExtension dbExtension = new TestDatabaseExtension(); + @RegisterExtension + TestSubspaceExtension subspaceExtension = new TestSubspaceExtension(dbExtension); + + private Database db; + private Subspace subspace; + private ScheduledExecutorService scheduledExecutor; + + @BeforeEach + void setUp() { + db = dbExtension.getDatabase(); + subspace = subspaceExtension.getSubspace(); + scheduledExecutor = new ScheduledThreadPoolExecutor(1, + new ThreadFactoryBuilder().setNameFormat("throttled-runner-test-%d").build()); + } + + @AfterEach + void tearDown() { + scheduledExecutor.shutdown(); + } + + // ------------------------------------------------------------------------- + // throttlePerSecDelayMillis static helper + // ------------------------------------------------------------------------- + + @ParameterizedTest + @CsvSource({ + // elapsedMs, maxPerSec, eventCount, expectedDelayMs + "1000, 100, 0, 0", // no events: no delay + "1000, 100, 100, 0", // exactly right rate: no delay + "1000, 100, 80, 0", // below rate: no delay + "1000, 100, 200, 1000", // double the rate: 1s delay + " 250, 50, 100, 1750", // 100 events should take 2s, only 250ms elapsed + " 1, 50, 100, 1999", // nearly instant: almost full 2s delay + " 0, 100, 0, 0", // zero events, no limit + "1000, 0, 999, 0", // maxPerSec=0 means disabled + }) + void testThrottlePerSecDelayMillis(long elapsedMs, int maxPerSec, int eventCount, long expectedDelay) { + assertThat(ThrottledRetryingRunner.throttlePerSecDelayMillis(elapsedMs, maxPerSec, eventCount)) + .isEqualTo(expectedDelay); + } + + // ------------------------------------------------------------------------- + // QuotaManager limit adjustment + // ------------------------------------------------------------------------- + + @Test + void testIncreaseLimitFormula() { + // limit=0 is no-limit mode; increaseLimit is a no-op + ThrottledRetryingRunner.QuotaManager qm = quotaManager(0); + qm.increaseLimit(); + 
assertThat(qm.getLimit()).isEqualTo(0); + + // With a real maxLimit, decreaseLimit reads processedCount directly (before initTransaction) + ThrottledRetryingRunner.QuotaManager qm2 = quotaManager(1000); + qm2.processedCountAdd(80); + qm2.decreaseLimit(); // limit = max(1, 80*9/10) = 72 + assertThat(qm2.getLimit()).isEqualTo(72); + qm2.increaseLimit(); // limit = max(72*5/4, 72+4) = max(90, 76) = 90 + assertThat(qm2.getLimit()).isEqualTo(90); + } + + @Test + void testDecreaseLimitFormula() { + ThrottledRetryingRunner.QuotaManager qm = quotaManager(1000); + // Process 100 items, then fail → limit = max(1, 100*9/10) = 90 + qm.processedCountAdd(100); + qm.decreaseLimit(); + assertThat(qm.getLimit()).isEqualTo(90); + + // Process 10 items, fail again → max(1, 10*9/10) = 9 + qm.initTransaction(); + qm.processedCountAdd(10); + qm.decreaseLimit(); + assertThat(qm.getLimit()).isEqualTo(9); + + // Decrease floors at 1 + qm.initTransaction(); + qm.processedCountInc(); + qm.decreaseLimit(); + assertThat(qm.getLimit()).isEqualTo(1); + + // Zero processed also floors at 1 + qm.initTransaction(); + qm.decreaseLimit(); + assertThat(qm.getLimit()).isEqualTo(1); + } + + @Test + void testDecreaseLimitNoOpWhenLimitIsZero() { + ThrottledRetryingRunner.QuotaManager qm = quotaManager(0); + qm.processedCountAdd(100); + qm.initTransaction(); + qm.decreaseLimit(); + assertThat(qm.getLimit()).isEqualTo(0); + } + + @Test + void testLimitCappedAtMaxLimit() { + ThrottledRetryingRunner.QuotaManager qm = quotaManager(50); + // Drive limit down by simulating a failure with 40 in-flight items + qm.processedCountAdd(40); + qm.decreaseLimit(); // limit = max(1, 40*9/10) = 36 + // Keep increasing until we hit the cap + for (int i = 0; i < 100; i++) { + qm.increaseLimit(); + } + assertThat(qm.getLimit()).isEqualTo(50); // capped at maxLimit + } + + // ------------------------------------------------------------------------- + // QuotaManager shouldContinue + // 
------------------------------------------------------------------------- + + @Test + void shouldContinueTrueWhenUnderLimit() { + ThrottledRetryingRunner.QuotaManager qm = quotaManager(10); + qm.initTransaction(); + qm.processedCountAdd(5); + assertThat(qm.shouldContinue()).isTrue(); + } + + @Test + void shouldContinueFalseWhenProcessedCountExceedsLimit() { + ThrottledRetryingRunner.QuotaManager qm = quotaManager(10); + qm.initTransaction(); + qm.processedCountAdd(11); // 11 > 10 + assertThat(qm.shouldContinue()).isFalse(); + } + + @Test + void shouldContinueTrueWhenProcessedCountEqualsLimit() { + // > not >=: at exactly the limit, shouldContinue still returns true + ThrottledRetryingRunner.QuotaManager qm = quotaManager(10); + qm.initTransaction(); + qm.processedCountAdd(10); // 10 > 10 is false + assertThat(qm.shouldContinue()).isTrue(); + } + + @Test + void shouldContinueTrueWhenNoLimitRegardlessOfCount() { + // limit=0 means no limit; the count is never checked + ThrottledRetryingRunner.QuotaManager qm = quotaManager(0); + qm.initTransaction(); + qm.processedCountAdd(Integer.MAX_VALUE); + assertThat(qm.shouldContinue()).isTrue(); + } + + @Test + void shouldContinueResetsAfterInitTransaction() { + // After a transaction fails (count exceeds limit), initTransaction should reset + // the state so shouldContinue returns true again at the start of the retry. 
+ ThrottledRetryingRunner.QuotaManager qm = quotaManager(5); + qm.initTransaction(); + qm.processedCountAdd(6); // 6 > 5 → shouldContinue false + assertThat(qm.shouldContinue()).isFalse(); + + qm.initTransaction(); // reset for the next transaction + assertThat(qm.shouldContinue()).isTrue(); + } + + // ------------------------------------------------------------------------- + // Basic iteration: task runs the expected number of times + // ------------------------------------------------------------------------- + + @Test + void taskRunsUntilExhausted() { + final int iterations = 5; + AtomicInteger callCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota, cont) -> { + if (callCount.incrementAndGet() >= iterations) { + return exhausted(); + } + return hasMore(); + }).join(); + } + + assertThat(callCount.get()).isEqualTo(iterations); + } + + @Test + void taskExhaustedImmediatelyRunsOnce() { + AtomicInteger callCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota, cont) -> { + callCount.incrementAndGet(); + return exhausted(); + }).join(); + } + + assertThat(callCount.get()).isEqualTo(1); + } + + // ------------------------------------------------------------------------- + // cannotRunWhenClosed + // ------------------------------------------------------------------------- + + @Test + void cannotRunWhenClosed() { + ThrottledRetryingRunner runner = runnerBuilder().build(); + runner.iterateAll((tr, quota, cont) -> exhausted()).join(); + runner.close(); + + assertThatThrownBy(() -> + runner.iterateAll((tr, quota, cont) -> exhausted()).join()) + .hasCauseInstanceOf(TransactionalRunner.RunnerClosed.class); + } + + // ------------------------------------------------------------------------- + // Retries on failure + // ------------------------------------------------------------------------- + + @ParameterizedTest + 
@ValueSource(ints = {0, 1, 3, 10}) + void retriesCorrectNumberOfTimes(int numRetries) { + AtomicInteger callCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withNumOfRetries(numRetries) + .build()) { + Throwable ex = catchThrowableOfType(RuntimeException.class, () -> + runner.iterateAll((tr, quota, cont) -> { + callCount.incrementAndGet(); + return CompletableFuture.failedFuture(new RuntimeException("always fails")); + }).join()); + + assertThat(ex).isNotNull(); + assertThat(ex.getMessage()).contains("always fails"); + } + + // First attempt + numRetries retries + assertThat(callCount.get()).isEqualTo(numRetries + 1); + } + + @Test + void retryCounterResetsAfterSuccess() { + // Fail twice, succeed, then fail twice again — should not exhaust the retry budget + // because the counter resets on each success. + AtomicInteger totalCalls = new AtomicInteger(0); + AtomicInteger successCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withNumOfRetries(2) + .build()) { + runner.iterateAll((tr, quota, cont) -> { + int call = totalCalls.incrementAndGet(); + // Fail on calls 1 and 2 + if (call == 1 || call == 2) { + return fail(); + } + // Succeed on call 3 (resets counter), then fail on 4 and 5, succeed on 6, done + if (call == 4 || call == 5) { + return fail(); + } + successCount.incrementAndGet(); + if (successCount.get() == 2) { + return exhausted(); + } + return hasMore(); + }).join(); + } + + assertThat(successCount.get()).isEqualTo(2); + } + + @Test + void runnerClosedDuringIterationAbortsImmediately() { + AtomicInteger callCount = new AtomicInteger(0); + + ThrottledRetryingRunner runner = runnerBuilder().withNumOfRetries(100).build(); + runner.close(); // close before iterating + + assertThatThrownBy(() -> + runner.iterateAll((tr, quota, cont) -> { + callCount.incrementAndGet(); + return exhausted(); + }).join()) + .hasCauseInstanceOf(TransactionalRunner.RunnerClosed.class); + + // 
iterateAll checks closed before starting, so the task is never called + assertThat(callCount.get()).isEqualTo(0); + } + + // ------------------------------------------------------------------------- + // Counts: processedCount and deletedCount + // ------------------------------------------------------------------------- + + @Test + void countsAreResetEachTransaction() { + // Each transaction should see processedCount starting at 0 + List observedCounts = new ArrayList<>(); + AtomicInteger totalCalls = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota, cont) -> { + observedCounts.add(quota.getProcessedCount()); // always 0 at start + quota.processedCountAdd(10); + if (totalCalls.incrementAndGet() >= 3) { + return exhausted(); + } + return hasMore(); + }).join(); + } + + assertThat(observedCounts).containsExactly(0, 0, 0); + } + + @Test + void processedAndDeletedCountsAreIndependent() { + AtomicInteger observedProcessed = new AtomicInteger(-1); + AtomicInteger observedDeleted = new AtomicInteger(-1); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota, cont) -> { + quota.processedCountAdd(7); + quota.deletedCountAdd(3); + observedProcessed.set(quota.getProcessedCount()); + observedDeleted.set(quota.getDeletedCount()); + return exhausted(); + }).join(); + } + + assertThat(observedProcessed.get()).isEqualTo(7); + assertThat(observedDeleted.get()).isEqualTo(3); + } + + // ------------------------------------------------------------------------- + // Limit: adaptive adjustment + // ------------------------------------------------------------------------- + + @ParameterizedTest + @CsvSource({"0, 0", "50, 50"}) + void limitStartsAtConfiguredMaxLimit(int maxLimit, int expectedLimit) { + AtomicInteger observedLimit = new AtomicInteger(-1); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withMaxLimit(maxLimit) + .build()) { + 
runner.iterateAll((tr, quota, cont) -> { + observedLimit.set(quota.getLimit()); + return exhausted(); + }).join(); + } + + assertThat(observedLimit.get()).isEqualTo(expectedLimit); + } + + @Test + void limitDecreasesOnFailureThenIncreasesOnSuccess() { + AtomicInteger callCount = new AtomicInteger(0); + List observedLimits = new ArrayList<>(); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withMaxLimit(200) + .withIncreaseLimitAfter(2) // increase after 2 successes to keep the test short + .withNumOfRetries(10) + .build()) { + runner.iterateAll((tr, quota, cont) -> { + int call = callCount.incrementAndGet(); + observedLimits.add(quota.getLimit()); + + if (call == 1) { + // Process 100 items and fail → next limit = 90 + quota.processedCountAdd(100); + return fail(); + } + if (call <= 3) { + // Two successes → limit increases from 90 + quota.processedCountInc(); + return hasMore(); + } + return exhausted(); + }).join(); + } + + // call 1: limit=200 (initial), fails with 100 processed → new limit = 90 + assertThat(observedLimits.get(0)).isEqualTo(200); + // call 2: limit=90 + assertThat(observedLimits.get(1)).isEqualTo(90); + // call 3: limit=90 still (need 2 consecutive successes before increase) + assertThat(observedLimits.get(2)).isEqualTo(90); + // call 4: after 2 successes, limit increased: max(90*5/4, 90+4) = max(112,94) = 112 + assertThat(observedLimits.get(3)).isEqualTo(112); + } + + @Test + void limitDecreasesProgressivelyTowardOne() { + AtomicInteger callCount = new AtomicInteger(0); + List observedLimits = new ArrayList<>(); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withMaxLimit(1000) + .withNumOfRetries(20) + .build()) { + runner.iterateAll((tr, quota, cont) -> { + int call = callCount.incrementAndGet(); + observedLimits.add(quota.getLimit()); + if (call <= 10) { + quota.processedCountInc(); // 1 item processed → limit floor at 1 + return fail(); + } + return exhausted(); + }).join(); + } + + // After first failure with 1 
processed: max(1, 1*9/10=0) = 1 + assertThat(observedLimits.get(1)).isEqualTo(1); + // Stays at 1 regardless of further failures + for (int i = 2; i < 10; i++) { + assertThat(observedLimits.get(i)).isEqualTo(1); + } + } + + @Test + void limitIsNeverAdjustedWhenMaxLimitIsZero() { + AtomicInteger callCount = new AtomicInteger(0); + List observedLimits = new ArrayList<>(); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withNumOfRetries(5) + // no withMaxLimit → limit stays 0 + .build()) { + runner.iterateAll((tr, quota, cont) -> { + int call = callCount.incrementAndGet(); + observedLimits.add(quota.getLimit()); + quota.processedCountAdd(100); + if (call <= 3) { + return fail(); + } + return exhausted(); + }).join(); + } + + assertThat(observedLimits).containsOnly(0); + } + + // ------------------------------------------------------------------------- + // commitWhenDone: writes are visible (commit) or not (no commit) + // ------------------------------------------------------------------------- + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void commitWhenDoneControlsWhetherWritesArePersisted(boolean commitWhenDone) { + byte[] key = subspace.pack(Tuple.from("commit-test")); + byte[] value = "hello".getBytes(); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withCommitWhenDone(commitWhenDone) + .build()) { + runner.iterateAll((tr, quota, cont) -> { + tr.set(key, value); + return exhausted(); + }).join(); + } + + byte[] read = db.run(tr -> tr.get(key).join()); + if (commitWhenDone) { + assertThat(read).isEqualTo(value); + } else { + assertThat(read).isNull(); + } + } + + // ------------------------------------------------------------------------- + // Each transaction gets a fresh Transaction object + // ------------------------------------------------------------------------- + + @Test + void eachIterationReceivesADistinctTransaction() { + List seenTransactions = new ArrayList<>(); + AtomicInteger callCount = new 
AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota, cont) -> { + seenTransactions.add(tr); + if (callCount.incrementAndGet() >= 3) { + return exhausted(); + } + return hasMore(); + }).join(); + } + + assertThat(seenTransactions).hasSize(3); + assertThat(seenTransactions.get(0)).isNotSameAs(seenTransactions.get(1)); + assertThat(seenTransactions.get(1)).isNotSameAs(seenTransactions.get(2)); + } + + // ------------------------------------------------------------------------- + // Rate limiting + // ------------------------------------------------------------------------- + + @Test + void throttlingByScannedItemsSlowsItDown() { + assertThrottledByItemsPerSec( + b -> b.withMaxItemsScannedPerSec(20), + ThrottledRetryingRunner.QuotaManager::processedCountAdd); + } + + @Test + void throttlingByDeletedItemsSlowsItDown() { + assertThrottledByItemsPerSec( + b -> b.withMaxItemsDeletedPerSec(20), + ThrottledRetryingRunner.QuotaManager::deletedCountAdd); + } + + @Test + void noThrottlingWithZeroRateLimit() { + AtomicInteger callCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota, cont) -> { + quota.processedCountAdd(1000); + if (callCount.incrementAndGet() >= 5) { + return exhausted(); + } + return hasMore(); + }).join(); + } + + assertThat(callCount.get()).isEqualTo(5); + } + + // ------------------------------------------------------------------------- + // Real DB writes + // ------------------------------------------------------------------------- + + @Test + void dataWrittenInEachTransactionIsVisible() { + final int numTransactions = 5; + AtomicInteger callCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota, cont) -> { + int n = callCount.incrementAndGet(); + tr.set(subspace.pack(Tuple.from(n)), Tuple.from(n).pack()); + if (n >= numTransactions) { + 
return exhausted(); + } + return hasMore(); + }).join(); + } + + db.run(tr -> { + for (int i = 1; i <= numTransactions; i++) { + byte[] val = tr.get(subspace.pack(Tuple.from(i))).join(); + assertThat(val).isEqualTo(Tuple.from(i).pack()); + } + return null; + }); + } + + // ------------------------------------------------------------------------- + // Continuation passing + // ------------------------------------------------------------------------- + + @Test + void firstCallReceivesStartContinuation() { + AtomicReference observed = new AtomicReference<>(); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota, cont) -> { + observed.set(cont); + return exhausted(); + }).join(); + } + + assertThat(observed.get()).isInstanceOf(ThrottledRetryingRunner.StartContinuation.class); + assertThat(observed.get()).isSameAs(ThrottledRetryingRunner.StartContinuation.INSTANCE); + } + + @Test + void continuationFromSuccessIsPassedToNextCall() { + // Each transaction returns a custom continuation object; the next call should receive it. 
+ List received = new ArrayList<>(); + AtomicInteger callCount = new AtomicInteger(0); + + // Distinct continuation objects for each transaction + ThrottledRetryingRunner.Continuation cont1 = () -> true; + ThrottledRetryingRunner.Continuation cont2 = () -> true; + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota, cont) -> { + received.add(cont); + int call = callCount.incrementAndGet(); + if (call == 1) { + return CompletableFuture.completedFuture(cont1); + } + if (call == 2) { + return CompletableFuture.completedFuture(cont2); + } + return exhausted(); + }).join(); + } + + // call 1: receives StartContinuation + assertThat(received.get(0)).isInstanceOf(ThrottledRetryingRunner.StartContinuation.class); + // call 2: receives the continuation returned by call 1 + assertThat(received.get(1)).isSameAs(cont1); + // call 3: receives the continuation returned by call 2 + assertThat(received.get(2)).isSameAs(cont2); + } + + @Test + void continuationIsNotAdvancedOnRetry() { + // When a transaction fails, the retry should receive the same continuation as the failed + // attempt — not a new one — so that the task can resume from the same position. 
+ List received = new ArrayList<>(); + AtomicInteger callCount = new AtomicInteger(0); + + ThrottledRetryingRunner.Continuation contAfterFirstSuccess = () -> true; + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withNumOfRetries(3) + .build()) { + runner.iterateAll((tr, quota, cont) -> { + received.add(cont); + int call = callCount.incrementAndGet(); + + if (call == 1) { + // First success: return a custom continuation + return CompletableFuture.completedFuture(contAfterFirstSuccess); + } + if (call == 2 || call == 3) { + // Fail twice: continuation should NOT advance + return fail(); + } + // Final success + return exhausted(); + }).join(); + } + + // call 1: StartContinuation (first call) + assertThat(received.get(0)).isInstanceOf(ThrottledRetryingRunner.StartContinuation.class); + // call 2: contAfterFirstSuccess (first call succeeded) + assertThat(received.get(1)).isSameAs(contAfterFirstSuccess); + // call 3: still contAfterFirstSuccess — call 2 failed, continuation not advanced + assertThat(received.get(2)).isSameAs(contAfterFirstSuccess); + // call 4: still contAfterFirstSuccess — call 3 failed too + assertThat(received.get(3)).isSameAs(contAfterFirstSuccess); + } + + @Test + void startContinuationReusedAcrossRetriesBeforeAnySuccess() { + // Before any successful transaction, all retries should receive StartContinuation. 
+ List received = new ArrayList<>(); + AtomicInteger callCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder() + .withNumOfRetries(3) + .build()) { + runner.iterateAll((tr, quota, cont) -> { + received.add(cont); + if (callCount.incrementAndGet() <= 2) { + return fail(); + } + return exhausted(); + }).join(); + } + + // All three calls should receive StartContinuation since no success ever happened + assertThat(received).hasSize(3); + assertThat(received).allMatch(c -> c instanceof ThrottledRetryingRunner.StartContinuation); + } + + @Test + void continuationCarriesStateToNextTransaction() { + // A realistic scenario: the continuation carries a "cursor position" (last key seen). + // Each transaction processes one key and the next transaction picks up where it left off. + final int numKeys = 4; + + // Write test data + db.run(tr -> { + for (int i = 0; i < numKeys; i++) { + tr.set(subspace.pack(Tuple.from(i)), Tuple.from(i * 10).pack()); + } + return null; + }); + + List processedValues = new ArrayList<>(); + + try (ThrottledRetryingRunner runner = runnerBuilder().build()) { + runner.iterateAll((tr, quota, cont) -> { + int idx = (cont instanceof IndexContinuation) ? ((IndexContinuation) cont).nextIndex : 0; + byte[] raw = tr.get(subspace.pack(Tuple.from(idx))).join(); + processedValues.add((int) Tuple.fromBytes(raw).getLong(0)); + return CompletableFuture.completedFuture(new IndexContinuation(idx + 1, numKeys)); + }).join(); + } + + assertThat(processedValues).containsExactly(0, 10, 20, 30); + } + + // ------------------------------------------------------------------------- + // Real transaction conflict + // ------------------------------------------------------------------------- + + @Test + void retriesOnRealTransactionConflict() { + // Arrange a genuine FDB NOT_COMMITTED conflict without any threading: + // 1. Task (attempt 1) reads conflictKey → establishes a read-conflict range on tr + // 2. 
Task also writes to ownWriteKey → makes tr a write transaction so FDB actually + // checks read conflicts on commit (FDB silently skips conflict checking for + // read-only transactions) + // 3. Task opens a second transaction, writes to conflictKey, and commits it + // 4. TransactionalRunner then commits tr → FDB detects the conflict and rejects it + // 5. Runner retries; attempt 2 sees no conflict and succeeds + byte[] conflictKey = subspace.pack(Tuple.from("conflict-key")); + byte[] ownWriteKey = subspace.pack(Tuple.from("own-write")); + AtomicInteger attemptCount = new AtomicInteger(0); + + try (ThrottledRetryingRunner runner = runnerBuilder().withNumOfRetries(3).build()) { + runner.iterateAll((tr, quota, cont) -> { + int attempt = attemptCount.incrementAndGet(); + if (attempt == 1) { + return injectConflict(tr, conflictKey, ownWriteKey) + .thenCompose(ignored -> exhausted()); + } + // Attempt 2: no conflict + return exhausted(); + }).join(); + } + + assertThat(attemptCount.get()).isEqualTo(2); + } + + @Test + void conflictDoesNotAdvanceContinuation() { + // Write 6 data keys to process in batches of 2 across 3 transactions. + // + // Expected flow: + // Attempt 1 (idx=0): processes items 0,1 — commits → continuation advances to idx=2 + // Attempt 2 (idx=2): processes items 2,3 — CONFLICT → not committed + // Attempt 3 (idx=2): retry receives idx=2 (not idx=4!) — processes 2,3 again — commits + // Attempt 4 (idx=4): processes items 4,5 — commits → hasMore()=false, loop ends + // + // The key assertion is that attempt 3 starts at idx=2, not idx=4. If the continuation + // were incorrectly advanced on failure, items 2 and 3 would never be written to FDB. 
+ final int numKeys = 6; + final int batchSize = 2; + + db.run(tr -> { + for (int i = 0; i < numKeys; i++) { + tr.set(subspace.pack(Tuple.from("data", i)), Tuple.from(i).pack()); + } + return null; + }); + + byte[] conflictKey = subspace.pack(Tuple.from("conflict-trigger")); + byte[] ownWriteKey = subspace.pack(Tuple.from("own-write")); + + AtomicInteger attemptCount = new AtomicInteger(0); + AtomicInteger retryStartIdx = new AtomicInteger(-1); // captured from attempt 3 + + try (ThrottledRetryingRunner runner = runnerBuilder().withNumOfRetries(3).build()) { + runner.iterateAll((tr, quota, cont) -> { + int attempt = attemptCount.incrementAndGet(); + int startIdx = (cont instanceof IndexContinuation) + ? ((IndexContinuation) cont).nextIndex : 0; + int endIdx = Math.min(startIdx + batchSize, numKeys); + + if (attempt == 3) { + retryStartIdx.set(startIdx); + } + + // Write a "processed" marker for each item in this batch + for (int i = startIdx; i < endIdx; i++) { + tr.set(subspace.pack(Tuple.from("done", i)), Tuple.from(i).pack()); + } + + if (attempt == 2) { + return injectConflict(tr, conflictKey, ownWriteKey) + .thenCompose(ignored -> + CompletableFuture.completedFuture(new IndexContinuation(endIdx, numKeys))); + } + + return CompletableFuture.completedFuture(new IndexContinuation(endIdx, numKeys)); + }).join(); + } + + // Attempt 3 must have restarted at idx=2 (the last committed continuation), + // not at idx=4 (what would have been returned if the failed attempt had committed) + assertThat(retryStartIdx.get()).isEqualTo(2); + + // 4 attempts total: tx1(0-1) + tx2(2-3, conflict) + retry(2-3) + tx3(4-5) + assertThat(attemptCount.get()).isEqualTo(4); + + // All 6 items must be present in FDB exactly once — none skipped, none missing + db.run(tr -> { + for (int i = 0; i < numKeys; i++) { + byte[] val = tr.get(subspace.pack(Tuple.from("done", i))).join(); + assertThat(val).as("item %d should be committed", i) + .isEqualTo(Tuple.from(i).pack()); + } + return null; + 
}); + } + + + // ------------------------------------------------------------------------- + + /** Returns a completed future with a continuation indicating there is more work to do. */ + private static CompletableFuture hasMore() { + return CompletableFuture.completedFuture(() -> true); + } + + /** Returns a completed future with a continuation indicating the source is exhausted. */ + private static CompletableFuture exhausted() { + return CompletableFuture.completedFuture(() -> false); + } + + /** Returns a pre-failed future simulating a transient task failure. */ + private static CompletableFuture fail() { + return CompletableFuture.failedFuture(new RuntimeException("transient")); + } + + /** Convenience factory for a {@link ThrottledRetryingRunner.QuotaManager}. */ + private static ThrottledRetryingRunner.QuotaManager quotaManager(int maxLimit) { + return new ThrottledRetryingRunner.QuotaManager(maxLimit); + } + + /** + * Asserts that the runner throttles correctly when {@code reportItems} is used to count + * items and {@code configure} wires up the corresponding rate-limiter on the builder. 
+ */ + private void assertThrottledByItemsPerSec( + UnaryOperator configure, + BiConsumer reportItems) { + final int itemsPerTransaction = 10; + final int maxPerSec = 20; + final int transactions = 3; + AtomicInteger callCount = new AtomicInteger(0); + + long start = System.currentTimeMillis(); + try (ThrottledRetryingRunner runner = configure.apply(runnerBuilder()).build()) { + runner.iterateAll((tr, quota, cont) -> { + reportItems.accept(quota, itemsPerTransaction); + if (callCount.incrementAndGet() >= transactions) { + return exhausted(); + } + return hasMore(); + }).join(); + } + long elapsed = System.currentTimeMillis() - start; + + // totalItems / maxPerSec = (3 * 10) / 20 = 1.5s minimum + long expectedMinMs = TimeUnit.SECONDS.toMillis((long) itemsPerTransaction * transactions / maxPerSec); + assertThat(elapsed).isGreaterThanOrEqualTo(expectedMinMs); + } + + /** + * Injects a genuine FDB read-conflict into {@code tr} and returns a future that completes + * once the conflicting write has been committed. + *

+     * <p>
+     * How it works:
+     * <ol>
+     *   <li>Writes to {@code ownWriteKey} so that {@code tr} is a write transaction — FDB only
+     *       checks read-conflict ranges when the committing transaction has writes.</li>
+     *   <li>Reads {@code conflictKey} to add it to {@code tr}'s read-conflict range.</li>
+     *   <li>Opens a separate transaction, writes to {@code conflictKey}, and commits it, so that
+     *       when {@code TransactionalRunner} later commits {@code tr} it receives
+     *       {@code NOT_COMMITTED}.</li>
+     * </ol>
+ */ + private CompletableFuture injectConflict(Transaction tr, byte[] conflictKey, byte[] ownWriteKey) { + tr.set(ownWriteKey, new byte[]{0}); + return tr.get(conflictKey) + .thenCompose(ignored -> db.runAsync(tr2 -> { + tr2.set(conflictKey, new byte[]{1}); + return CompletableFuture.completedFuture(null); + })); + } + + private ThrottledRetryingRunner.Builder runnerBuilder() { + return ThrottledRetryingRunner.builder(db, scheduledExecutor); + } + + /** + * A continuation that carries a numeric index as cursor position. + * {@link #hasMore()} returns {@code true} while {@link #nextIndex} is less than + * {@link #totalKeys}. + */ + private static class IndexContinuation implements ThrottledRetryingRunner.Continuation { + final int nextIndex; + private final int totalKeys; + + IndexContinuation(int nextIndex, int totalKeys) { + this.nextIndex = nextIndex; + this.totalKeys = totalKeys; + } + + @Override + public boolean hasMore() { + return nextIndex < totalKeys; + } + } +} diff --git a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java new file mode 100644 index 0000000000..15b22ae952 --- /dev/null +++ b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/ThrottledRetryingRunner.java @@ -0,0 +1,547 @@ +/* + * ThrottledRetryingRunner.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.test; + +import com.apple.foundationdb.Database; +import com.apple.foundationdb.Transaction; +import com.apple.foundationdb.async.AsyncUtil; +import com.apple.foundationdb.async.MoreAsyncUtil; + +import javax.annotation.Nonnull; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Runs a task repeatedly across multiple transactions, with adaptive limit management, optional + * rate limiting, and retry logic. + *

+ * <p>
+ * The caller supplies a {@link Task} that receives a fresh {@link Transaction}, a
+ * {@link QuotaManager}, and the {@link Continuation} from the last successful
+ * transaction (or {@link StartContinuation} on the first call). The task returns a
+ * {@code CompletableFuture<Continuation>}; returning a continuation with
+ * {@link Continuation#hasMore()} {@code == false} stops the loop.
+ * <p>
+ * On failure the continuation is not advanced — the last successful continuation is
+ * re-passed to the retry, so the task can resume from the same position. This distinguishes a
+ * retry (same continuation) from a fresh transaction after success (new continuation).
+ * <p>
+ * The {@link QuotaManager} is reset at the start of each transaction. The task uses it to report
+ * how many items were processed ({@link QuotaManager#processedCountInc()}) and deleted
+ * ({@link QuotaManager#deletedCountInc()}). These counts drive the between-transaction throttle
+ * delay when {@code maxItemsScannedPerSec} or {@code maxItemsDeletedPerSec} are configured, and
+ * inform the adaptive limit adjustment on failure.
+ * <p>
+ * A limit of {@code 0} is treated as "no limit" — the task may do as much work as it likes in
+ * each transaction and the limit is never adjusted.
+ */ +public class ThrottledRetryingRunner implements AutoCloseable { + + public static final int DEFAULT_NUM_RETRIES = 100; + public static final int DEFAULT_INCREASE_LIMIT_AFTER = 40; + + @Nonnull + private final TransactionalRunner transactionalRunner; + @Nonnull + private final ScheduledExecutorService scheduledExecutor; + private final int maxItemsScannedPerSec; + private final int maxItemsDeletedPerSec; + private final int numOfRetries; + private final boolean commitWhenDone; + private final int increaseLimitAfter; + private final int maxLimit; + + private boolean closed = false; + private long transactionStartTimeMillis = 0; + private int failureRetriesCounter = 0; + private int consecutiveSuccessCount = 0; + + private ThrottledRetryingRunner(Builder builder) { + this.transactionalRunner = new TransactionalRunner(builder.database); + this.scheduledExecutor = builder.scheduledExecutor; + this.maxItemsScannedPerSec = builder.maxItemsScannedPerSec; + this.maxItemsDeletedPerSec = builder.maxItemsDeletedPerSec; + this.numOfRetries = builder.numOfRetries; + this.commitWhenDone = builder.commitWhenDone; + this.increaseLimitAfter = builder.increaseLimitAfter; + this.maxLimit = builder.maxLimit; + } + + /** + * Run the given task repeatedly across multiple transactions until it returns a + * {@link Continuation} with {@link Continuation#hasMore()} {@code == false}. + *

+     * <p>
+     * On the very first call the task receives {@link StartContinuation#INSTANCE}. On each
+     * subsequent call after a successful commit it receives the continuation returned by the
+     * previous call. On a retry after failure it receives the same continuation that was passed
+     * to the failed attempt — i.e. the last successful continuation is preserved.
+     * <p>
+     * A single {@link QuotaManager} is created for the lifetime of this call. Its per-transaction
+     * counts are reset at the start of each transaction; its limit persists and is adjusted
+     * between transactions.
+ * + * @param task the task to run + * @return a future that completes normally when the task returns a continuation with + * {@link Continuation#hasMore()} {@code == false}, or exceptionally if the retry + * limit is exceeded or the runner is closed + */ + @Nonnull + public CompletableFuture iterateAll(@Nonnull Task task) { + if (closed) { + return CompletableFuture.failedFuture(new TransactionalRunner.RunnerClosed()); + } + + final QuotaManager quotaManager = new QuotaManager(maxLimit); + final AtomicReference lastSuccessfulCont = + new AtomicReference<>(StartContinuation.INSTANCE); + return AsyncUtil.whileTrue(() -> + runOneTransaction(task, quotaManager, lastSuccessfulCont.get()) + .handle((continuation, ex) -> { + if (ex == null) { + lastSuccessfulCont.set(continuation); + return handleSuccess(quotaManager, continuation); + } + return handleFailure(ex, quotaManager); + }) + .thenCompose(ret -> ret)); + } + + private CompletableFuture runOneTransaction( + @Nonnull Task task, + @Nonnull QuotaManager quotaManager, + @Nonnull Continuation continuation) { + transactionStartTimeMillis = System.currentTimeMillis(); + return transactionalRunner.runAsync(commitWhenDone, transaction -> { + quotaManager.initTransaction(); + return task.run(transaction, quotaManager, continuation); + }); + } + + private CompletableFuture handleSuccess( + @Nonnull QuotaManager quotaManager, + @Nonnull Continuation continuation) { + failureRetriesCounter = 0; + ++consecutiveSuccessCount; + + // Increase limit after enough consecutive successes, but only when a limit is active + if (quotaManager.limit > 0 && consecutiveSuccessCount >= increaseLimitAfter) { + quotaManager.increaseLimit(); + consecutiveSuccessCount = 0; + } + + if (!continuation.hasMore()) { + return AsyncUtil.READY_FALSE; + } + + long elapsedMillis = Math.max(0, System.currentTimeMillis() - transactionStartTimeMillis); + long delayMillis = Collections.max(List.of( + throttlePerSecDelayMillis(elapsedMillis, 
maxItemsScannedPerSec, quotaManager.processedCount), + throttlePerSecDelayMillis(elapsedMillis, maxItemsDeletedPerSec, quotaManager.deletedCount) + )); + + if (delayMillis > 0) { + return MoreAsyncUtil.delayedFuture(delayMillis, TimeUnit.MILLISECONDS, scheduledExecutor) + .thenApply(ignore -> true); + } + return AsyncUtil.READY_TRUE; + } + + private CompletableFuture handleFailure( + @Nonnull Throwable ex, + @Nonnull QuotaManager quotaManager) { + ++failureRetriesCounter; + consecutiveSuccessCount = 0; + + if (ex instanceof CompletionException) { + ex = ex.getCause(); + } + if (ex instanceof TransactionalRunner.RunnerClosed) { + return CompletableFuture.failedFuture(ex); + } + if (failureRetriesCounter > numOfRetries) { + return CompletableFuture.failedFuture(ex); + } + + // Decrease the limit based on how much work was in-flight when the failure occurred, + // so the next attempt is less likely to exceed the transaction budget. + quotaManager.decreaseLimit(); + + return AsyncUtil.READY_TRUE; // retry + } + + /** + * Compute how long to wait to keep the event rate at or below {@code maxPerSec}. + * Matches the formula used by {@code ThrottledRetryingIterator}. + */ + static long throttlePerSecDelayMillis(long elapsedMillis, int maxPerSec, int eventCount) { + if (maxPerSec <= 0) { + return 0; + } + long waitMillis = (TimeUnit.SECONDS.toMillis(eventCount) / maxPerSec) - elapsedMillis; + return waitMillis > 0 ? waitMillis : 0; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + transactionalRunner.close(); + } + + // ------------------------------------------------------------------------- + // Continuation + // ------------------------------------------------------------------------- + + /** + * Represents the result of one transaction's work and signals whether the runner should + * start another transaction. + *

+     * <p>
+     * Implementations are free to carry any state needed to resume work at the right position
+     * in the next transaction. The runner itself only inspects {@link #hasMore()}.
+ */ + @FunctionalInterface + public interface Continuation { + /** + * Returns {@code true} if there is more work to do; {@code false} if the source has + * been exhausted and the loop should stop after committing the current transaction. + */ + boolean hasMore(); + } + + /** + * The continuation passed to the task on the very first transaction. + *

+ * Tasks can detect the first call with {@code continuation instanceof StartContinuation}. + *

+ */ + public static final class StartContinuation implements Continuation { + /** The singleton instance to pass on the first call. */ + public static final StartContinuation INSTANCE = new StartContinuation(); + + private StartContinuation() { + } + + @Override + public boolean hasMore() { + return true; + } + } + + // ------------------------------------------------------------------------- + // Task + // ------------------------------------------------------------------------- + + /** + * A task that performs work inside a single transaction and returns a continuation. + */ + @FunctionalInterface + public interface Task { + /** + * Run one transaction's worth of work. + * + * @param transaction the transaction for this attempt + * @param quotaManager tracks work done this transaction and exposes the current limit + * @param continuation the continuation from the last successful transaction, + * or {@link StartContinuation#INSTANCE} on the very first call; + * on a retry this is the same continuation that was passed to the + * failed attempt + * @return a future containing the continuation describing where the next transaction + * should resume; return a continuation with {@link Continuation#hasMore()} + * {@code == false} to stop the loop + */ + @Nonnull + CompletableFuture run(@Nonnull Transaction transaction, + @Nonnull QuotaManager quotaManager, + @Nonnull Continuation continuation); + } + + // ------------------------------------------------------------------------- + // QuotaManager + // ------------------------------------------------------------------------- + + /** + * Tracks per-transaction resource usage and exposes the adaptive limit. + *

+ * One instance is created per {@link #iterateAll} call. Per-transaction counts are reset at + * the start of each transaction; the limit persists and is adjusted by the runner between + * transactions. + *

+ *

+ * The task should call {@link #shouldContinue()} between processing items to decide whether + * to stop early. {@link #shouldContinue()} returns {@code false} once the processed count + * exceeds the active limit or the transaction has been running for longer than + * {@link #MAX_TRANSACTION_DURATION_MILLIS}. + *

+ */ + public static class QuotaManager { + /** Maximum time a single transaction should run before {@link #shouldContinue()} returns {@code false}. */ + static final long MAX_TRANSACTION_DURATION_MILLIS = TimeUnit.SECONDS.toMillis(4); + + private int processedCount; + private int deletedCount; + private int limit; + private final int maxLimit; + private long transactionStartTimeMillis; + + QuotaManager(int maxLimit) { + this.maxLimit = maxLimit; + this.limit = maxLimit; + } + + /** + * Return the current limit. + *

+ * The task should attempt at most this many units of work in the current transaction. + * A value of {@code 0} means no limit is active. + *

+ */ + public int getLimit() { + return limit; + } + + /** + * Return the number of items processed in the current transaction. + */ + public int getProcessedCount() { + return processedCount; + } + + /** + * Return the number of items deleted in the current transaction. + */ + public int getDeletedCount() { + return deletedCount; + } + + /** + * Increment the processed-item count by {@code count}. + * + * @param count number of items to add + */ + public void processedCountAdd(int count) { + processedCount += count; + } + + /** + * Increment the processed-item count by 1. + */ + public void processedCountInc() { + processedCount++; + } + + /** + * Increment the deleted-item count by {@code count}. + * + * @param count number of items to add + */ + public void deletedCountAdd(int count) { + deletedCount += count; + } + + /** + * Increment the deleted-item count by 1. + */ + public void deletedCountInc() { + deletedCount++; + } + + /** + * Returns {@code true} if the task should continue processing items in this transaction, + * {@code false} if it should stop and return a continuation. + *

+ * Returns {@code false} when either: + *

+     * <ul>
+     *     <li>a limit is active ({@link #getLimit()} {@code > 0}) and the processed count
+     *     exceeds it, or</li>
+     *     <li>the transaction has been running for longer than
+     *     {@link #MAX_TRANSACTION_DURATION_MILLIS}.</li>
+     * </ul>
+ *

+ */ + public boolean shouldContinue() { + if (limit > 0 && processedCount > limit) { + return false; + } + return System.currentTimeMillis() - transactionStartTimeMillis < MAX_TRANSACTION_DURATION_MILLIS; + } + + void initTransaction() { + processedCount = 0; + deletedCount = 0; + transactionStartTimeMillis = System.currentTimeMillis(); + } + + void increaseLimit() { + if (limit == 0) { + return; // no-limit mode: never adjust + } + final int increased = Math.max((limit * 5) / 4, limit + 4); + limit = maxLimit > 0 ? Math.min(maxLimit, increased) : increased; + } + + void decreaseLimit() { + if (limit == 0) { + return; // no-limit mode: never adjust + } + // Use the count from the transaction that just failed (before the next initTransaction resets it) + limit = Math.max(1, (processedCount * 9) / 10); + } + } + + // ------------------------------------------------------------------------- + // Builder + // ------------------------------------------------------------------------- + + /** + * Create a new builder for a {@link ThrottledRetryingRunner}. + * + * @param database the database to open transactions against + * @param scheduledExecutor executor used for throttle delays between transactions + * @return a new builder + */ + public static Builder builder(@Nonnull Database database, + @Nonnull ScheduledExecutorService scheduledExecutor) { + return new Builder(database, scheduledExecutor); + } + + /** + * Builder for {@link ThrottledRetryingRunner}. 
+ */ + public static class Builder { + @Nonnull + private final Database database; + @Nonnull + private final ScheduledExecutorService scheduledExecutor; + private int maxItemsScannedPerSec = 0; + private int maxItemsDeletedPerSec = 0; + private int numOfRetries = DEFAULT_NUM_RETRIES; + private boolean commitWhenDone = true; + private int increaseLimitAfter = DEFAULT_INCREASE_LIMIT_AFTER; + private int maxLimit = 0; + + private Builder(@Nonnull Database database, @Nonnull ScheduledExecutorService scheduledExecutor) { + this.database = database; + this.scheduledExecutor = scheduledExecutor; + } + + /** + * Set the maximum number of items that may be scanned (processed) per second. + * The runner will delay between transactions to stay at or below this rate. + * A value of {@code 0} (the default) disables scanned-item throttling. + * + * @param maxItemsScannedPerSec the rate limit; {@code 0} disables it + * @return this builder + */ + public Builder withMaxItemsScannedPerSec(int maxItemsScannedPerSec) { + this.maxItemsScannedPerSec = Math.max(0, maxItemsScannedPerSec); + return this; + } + + /** + * Set the maximum number of items that may be deleted per second. + * The runner will delay between transactions to stay at or below this rate. + * A value of {@code 0} (the default) disables deleted-item throttling. + * + * @param maxItemsDeletedPerSec the rate limit; {@code 0} disables it + * @return this builder + */ + public Builder withMaxItemsDeletedPerSec(int maxItemsDeletedPerSec) { + this.maxItemsDeletedPerSec = Math.max(0, maxItemsDeletedPerSec); + return this; + } + + /** + * Set the number of consecutive successful transactions required before the limit is + * increased. Defaults to {@value DEFAULT_INCREASE_LIMIT_AFTER}. 
+ * + * @param increaseLimitAfter consecutive successes before a limit increase + * @return this builder + */ + public Builder withIncreaseLimitAfter(int increaseLimitAfter) { + this.increaseLimitAfter = Math.max(1, increaseLimitAfter); + return this; + } + + /** + * Set the initial and maximum per-transaction limit passed to the task via + * {@link QuotaManager#getLimit()}. + *

+ * The limit starts at this value and will not be increased beyond it. If {@code 0} + * (the default) the limit feature is disabled: {@link QuotaManager#getLimit()} always + * returns {@code 0} and is never adjusted. + *

+ * + * @param maxLimit the initial and maximum limit; {@code 0} disables limit management + * @return this builder + */ + public Builder withMaxLimit(int maxLimit) { + this.maxLimit = Math.max(0, maxLimit); + return this; + } + + /** + * Set the maximum number of times to retry a failed transaction before giving up. + * The counter resets after each successful commit. Defaults to {@value DEFAULT_NUM_RETRIES}. + * + * @param numOfRetries maximum retries + * @return this builder + */ + public Builder withNumOfRetries(int numOfRetries) { + this.numOfRetries = Math.max(0, numOfRetries); + return this; + } + + /** + * Set whether to commit each transaction when the task returns. + * If {@code false}, transactions are rolled back instead of committed (useful for read-only tasks). + * Defaults to {@code true}. + * + * @param commitWhenDone {@code true} to commit, {@code false} to roll back + * @return this builder + */ + public Builder withCommitWhenDone(boolean commitWhenDone) { + this.commitWhenDone = commitWhenDone; + return this; + } + + /** + * Build the {@link ThrottledRetryingRunner}. + * + * @return a new runner + */ + public ThrottledRetryingRunner build() { + return new ThrottledRetryingRunner(this); + } + } +} \ No newline at end of file diff --git a/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/TransactionalRunner.java b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/TransactionalRunner.java new file mode 100644 index 0000000000..e35df9b81c --- /dev/null +++ b/fdb-extensions/src/testFixtures/java/com/apple/foundationdb/test/TransactionalRunner.java @@ -0,0 +1,146 @@ +/* + * TransactionalRunner.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.test; + +import com.apple.foundationdb.Database; +import com.apple.foundationdb.Transaction; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +/** + * A simple runner that opens and commits raw FDB transactions and ensures they are all closed on runner close. + *

+ * This is the raw-FDB analog of the record-layer {@code TransactionalRunner}. It operates at the + * {@link Database}/{@link Transaction} level, with no record-layer dependencies. + *

+ */ +public class TransactionalRunner implements AutoCloseable { + + @Nonnull + private final Database database; + private boolean closed; + @Nonnull + private final List transactionsToClose; + + public TransactionalRunner(@Nonnull Database database) { + this.database = database; + this.transactionsToClose = new ArrayList<>(); + } + + /** + * Run some code with a transaction, then commit it. + *

+ * The transaction will be committed if the future returned by the runnable completes successfully; + * otherwise it will not be committed. Either way, the transaction is closed when the future completes. + * If this runner is {@link #close() closed}, any open transaction will also be closed. + *

+ * + * @param runnable code to run with a fresh {@link Transaction} + * @param the type of value returned by the future + * @return a future containing the result, after the transaction has been committed + */ + @Nonnull + @SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"}) + public CompletableFuture runAsync(@Nonnull Function> runnable) { + return runAsync(true, runnable); + } + + /** + * A flavor of {@link #runAsync(Function)} that supports read-only (non-committing) transactions. + * + * @param commitWhenDone if {@code true} the transaction is committed on success; if {@code false} it is only closed + * @param runnable code to run with a fresh {@link Transaction} + * @param the type of value returned by the future + * @return a future containing the result; if {@code commitWhenDone} is {@code true}, after the transaction has been committed + */ + @Nonnull + @SuppressWarnings({"PMD.CloseResource", "PMD.UseTryWithResources"}) + public CompletableFuture runAsync(boolean commitWhenDone, + @Nonnull Function> runnable) { + Transaction transaction = openTransaction(); + boolean returnedFuture = false; + try { + CompletableFuture future = runnable.apply(transaction) + .thenCompose(val -> { + if (commitWhenDone) { + return transaction.commit().thenApply(vignore -> val); + } else { + return CompletableFuture.completedFuture(val); + } + }); + returnedFuture = true; + return future.whenComplete((result, exception) -> transaction.close()); + } finally { + if (!returnedFuture) { + transaction.close(); + } + } + } + + /** + * Open a new transaction that will be closed when this runner is closed. 
+ * + * @return a new {@link Transaction} + * @throws RunnerClosed if this runner has already been closed + */ + @Nonnull + public Transaction openTransaction() { + if (closed) { + throw new RunnerClosed(); + } + Transaction transaction = database.createTransaction(); + addTransactionToClose(transaction); + return transaction; + } + + private synchronized void addTransactionToClose(@Nonnull Transaction transaction) { + if (closed) { + transaction.close(); + throw new RunnerClosed(); + } + transactionsToClose.add(transaction); + } + + @Override + public synchronized void close() { + if (closed) { + return; + } + transactionsToClose.forEach(Transaction::close); + transactionsToClose.clear(); + this.closed = true; + } + + /** + * Exception thrown when an operation is attempted on a closed runner. + */ + public static class RunnerClosed extends RuntimeException { + private static final long serialVersionUID = 1L; + + public RunnerClosed() { + super("Runner has been closed"); + } + } +}