diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java index 870470036..07b96cc85 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java @@ -18,6 +18,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.Path; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; @@ -290,7 +291,14 @@ private int[] resolveEntryNodeSource() { // maxLevel source; fall back to any live node that is at maxLevel. for (int s = 0; s < sources.size(); s++) { if (sources.get(s).getMaxLevel() == maxLevel) { - int originalEntry = sources.get(s).getView().entryNode().node; + // try-with-resources: getView() opens a fresh RandomAccessReader that must + // be closed, otherwise it leaks for the lifetime of the underlying storage. + int originalEntry; + try (var entryView = sources.get(s).getView()) { + originalEntry = entryView.entryNode().node; + } catch (IOException e) { + throw new UncheckedIOException(e); + } if (liveNodes.get(s).get(originalEntry)) { return new int[]{s, originalEntry}; } @@ -336,32 +344,86 @@ private void compactLevels(CompactWriter writer, int upperMaxCandidateSize = upperMaxPerSourceTopK * sources.size(); int maxCandidateSize = Math.max(baseMaxCandidateSize, upperMaxCandidateSize); int scratchDegree = Math.max(maxDegrees.get(0), Math.max(1, maxUpperDegree)); - final ThreadLocal threadLocalScratch = ThreadLocal.withInitial(() -> - new Scratch(maxCandidateSize, scratchDegree, dimension, sources, pq) - ); - - for (int level = 0; level < maxDegrees.size(); level++) { - List batches = buildBatches(level); - int searchTopK = Math.max(MIN_SEARCH_TOP_K, ((maxDegrees.get(level) + sources.size() - 1) / sources.size()) * SEARCH_TOP_K_MULTIPLIER); - int beamWidth = Math.max(maxDegrees.get(level), searchTopK) * BEAM_WIDTH_MULTIPLIER; - - CompactionParams params = new CompactionParams(fusedPQEnabled, compressedPrecision, searchTopK, beamWidth, pq); + // Batch tasks run on `executor`, whose worker threads each lazily create their own + // Scratch via this ThreadLocal. compactLevels() cannot reach those worker-thread + // ThreadLocal entries with threadLocalScratch.get()/remove(), so every Scratch is + // registered in `allScratch` and closed in the finally block below. Closing only the + // calling thread's Scratch leaks every worker thread's GraphSearchers — and the + // RandomAccessReaders behind their Views — for the lifetime of the executor. + final Queue allScratch = new ConcurrentLinkedQueue<>(); + final ThreadLocal threadLocalScratch = ThreadLocal.withInitial(() -> { + Scratch scratch = new Scratch(maxCandidateSize, scratchDegree, dimension, sources, pq); + allScratch.add(scratch); + return scratch; + }); + // Every batch task submitted to `executor` is tracked here so the finally block can + // quiesce the executor (cancel + await) before draining `allScratch`. Without this + // barrier, when compaction fails an in-flight task that has not yet run the + // ThreadLocal supplier could register a fresh Scratch AFTER the drain — leaking it. + final Queue> submittedFutures = new ConcurrentLinkedQueue<>(); + + boolean compactionSucceeded = false; + try { + for (int level = 0; level < maxDegrees.size(); level++) { + List batches = buildBatches(level); + int searchTopK = Math.max(MIN_SEARCH_TOP_K, ((maxDegrees.get(level) + sources.size() - 1) / sources.size()) * SEARCH_TOP_K_MULTIPLIER); + int beamWidth = Math.max(maxDegrees.get(level), searchTopK) * BEAM_WIDTH_MULTIPLIER; + + CompactionParams params = new CompactionParams(fusedPQEnabled, compressedPrecision, searchTopK, beamWidth, pq); + + if (level == 0) { + log.info("Compacting level 0 (base layer)"); + + ExecutorCompletionService> ecs = + new ExecutorCompletionService<>(executor); + + java.util.function.Consumer submitOne = (bs) -> { + submittedFutures.add(ecs.submit(() -> { + Scratch scratch = threadLocalScratch.get(); + return computeBaseBatch(writer, bs, scratch, params); + })); + }; + + var wropts = EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.READ); + try (FileChannel fc = FileChannel.open(writer.getOutputPath(), wropts)) { + + runBatchesWithBackpressure( + batches, + ecs, + submitOne, + (results) -> { + try { + for (WriteResult r : results) { + ByteBuffer b = r.data; + long pos = r.fileOffset; + while (b.hasRemaining()) { + int n = fc.write(b, pos); + pos += n; + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, + progressListener + ); + } - if (level == 0) { - log.info("Compacting level 0 (base layer)"); + writer.offsetAfterInline(); - ExecutorCompletionService> ecs = - new ExecutorCompletionService<>(executor); + } else { + final int lvl = level; + log.info("Compacting upper layer {}", level); - java.util.function.Consumer submitOne = (bs) -> { - ecs.submit(() -> { - Scratch scratch = threadLocalScratch.get(); - return computeBaseBatch(writer, bs, scratch, params); - }); - }; + ExecutorCompletionService> ecs = + new ExecutorCompletionService<>(executor); - var wropts = EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.READ); - try (FileChannel fc = FileChannel.open(writer.getOutputPath(), wropts)) { + java.util.function.Consumer submitOne = (bs) -> { + submittedFutures.add(ecs.submit(() -> { + Scratch scratch = threadLocalScratch.get(); + return computeUpperBatchForLevel(bs, lvl, scratch, params); + })); + }; runBatchesWithBackpressure( batches, @@ -369,64 +431,120 @@ private void compactLevels(CompactWriter writer, submitOne, (results) -> { try { - for (WriteResult r : results) { - ByteBuffer b = r.data; - long pos = r.fileOffset; - while (b.hasRemaining()) { - int n = fc.write(b, pos); - pos += n; - } + for (UpperLayerWriteResult r : results) { + writer.writeUpperLayerNode( + lvl, + r.ordinal, + r.neighbors, + r.pqCode + ); } } catch (IOException e) { - throw new RuntimeException(e); + throw new UncheckedIOException(e); } }, progressListener ); } - - writer.offsetAfterInline(); - + } + compactionSucceeded = true; + } finally { + // Quiesce the executor BEFORE draining allScratch: until every submitted task + // has terminated, an in-flight task can still run the threadLocalScratch + // supplier and register a fresh Scratch that the drain below would miss. + awaitAllTasks(submittedFutures); + // Drop this thread's ThreadLocal entry; worker-thread entries are unreachable + // here but their Scratch instances are all registered in `allScratch`. + threadLocalScratch.remove(); + if (compactionSucceeded) { + // No primary exception in flight, so a close failure can be surfaced safely. + closeAllScratch(allScratch); } else { - final int lvl = level; - log.info("Compacting upper layer {}", level); - - ExecutorCompletionService> ecs = - new ExecutorCompletionService<>(executor); + // A primary exception is already propagating; close best-effort so a + // secondary close failure does not mask the original cause. + closeAllScratchQuietly(allScratch); + } + } + } - java.util.function.Consumer submitOne = (bs) -> { - ecs.submit(() -> { - Scratch scratch = threadLocalScratch.get(); - return computeUpperBatchForLevel(bs, lvl, scratch, params); - }); - }; + /** + * Cancels and waits for every batch task submitted to {@code executor}. This MUST run + * before {@code allScratch} is drained: until a submitted task has terminated it can + * still execute the {@code threadLocalScratch} supplier and {@code allScratch.add(...)} + * a fresh Scratch, which the drain would then miss and leak. + * + *

On the normal-completion path every task is already done, so {@code cancel} is a + * no-op and {@code get} returns immediately — a cheap barrier. On the failure path the + * not-yet-started tasks are cancelled (they never create a Scratch) and the running + * ones are awaited (their Scratch is registered and will be drained). + */ + private static void awaitAllTasks(Queue> submittedFutures) { + boolean interrupted = false; + Future f; + while ((f = submittedFutures.poll()) != null) { + // No-op for already-completed tasks; interrupts the rest so a failed compaction + // need not wait for the whole sliding window to drain naturally. + f.cancel(true); + if (interrupted) { + // Already interrupted: keep cancelling the remainder but do not block. + continue; + } + try { + f.get(); + } catch (CancellationException | ExecutionException ignored) { + // We only need the task to have TERMINATED; its result/failure was already + // handled by runBatchesWithBackpressure (or is irrelevant once cancelled). + } catch (InterruptedException e) { + interrupted = true; + } + } + if (interrupted) { + Thread.currentThread().interrupt(); + } + } - runBatchesWithBackpressure( - batches, - ecs, - submitOne, - (results) -> { - try { - for (UpperLayerWriteResult r : results) { - writer.writeUpperLayerNode( - lvl, - r.ordinal, - r.neighbors, - r.pqCode - ); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - }, - progressListener - ); + /** + * Closes every {@link Scratch} created during compaction, regardless of which thread + * created it. A close failure on one Scratch does not prevent the others from being + * closed; the first failure is rethrown with any later ones attached as suppressed. + */ + private static void closeAllScratch(Queue allScratch) throws IOException { + Throwable firstError = null; + Scratch scratch; + while ((scratch = allScratch.poll()) != null) { + try { + scratch.close(); + } catch (IOException | RuntimeException e) { + // Catch RuntimeException too: a single Scratch failing to close must not + // abort the loop, otherwise every remaining Scratch would leak. + if (firstError == null) { + firstError = e; + } else { + firstError.addSuppressed(e); + } } } + if (firstError instanceof IOException) { + throw (IOException) firstError; + } else if (firstError instanceof RuntimeException) { + throw (RuntimeException) firstError; + } + } - Scratch s = threadLocalScratch.get(); - s.close(); - threadLocalScratch.remove(); + /** + * Like {@link #closeAllScratch} but never throws — used when a primary exception is + * already propagating, so a close failure is logged instead of masking the cause. + */ + private static void closeAllScratchQuietly(Queue allScratch) { + Scratch scratch; + while ((scratch = allScratch.poll()) != null) { + try { + scratch.close(); + } catch (IOException | RuntimeException e) { + // Broad-ish catch on purpose: one bad close must not abort the rest. + log.warn("Failed to close compaction Scratch space", e); + } + } } /** @@ -1119,19 +1237,62 @@ private static final class Scratch implements AutoCloseable { this.pqCode = (pq == null) ? null : vectorTypeSupport.createByteSequence(pq.getSubspaceCount()); this.gs = new GraphSearcher[sources.size()]; - for (int i = 0; i < sources.size(); i++) { - gs[i] = new GraphSearcher(sources.get(i)); - gs[i].usePruning(false); + // Each GraphSearcher opens a View (hence a RandomAccessReader) on its source. + // If a later source's getView() throws, close the searchers already created so + // their readers are not leaked — this Scratch is never returned to the caller, + // so it would otherwise never be registered for the compactor's drain. + boolean initialised = false; + try { + for (int i = 0; i < sources.size(); i++) { + gs[i] = new GraphSearcher(sources.get(i)); + gs[i].usePruning(false); + } + initialised = true; + } finally { + if (!initialised) { + for (GraphSearcher s : gs) { + if (s != null) { + try { + s.close(); + } catch (IOException | RuntimeException ignored) { + // best-effort cleanup; the original failure propagates + } + } + } + } } } /** - * Closes all graph searchers and resets the cache. + * Closes all graph searchers and resets the cache. Every searcher is closed even + * if one throws, otherwise a single failing GraphSearcher.close() would leak the + * RandomAccessReaders behind the remaining searchers' Views; the first failure is + * rethrown with any later ones attached as suppressed. */ @Override public void close() throws IOException { - for (var s : gs) s.close(); + Throwable firstError = null; + for (GraphSearcher s : gs) { + // The constructor may leave a trailing null slot if it failed partway. + if (s == null) { + continue; + } + try { + s.close(); + } catch (IOException | RuntimeException e) { + if (firstError == null) { + firstError = e; + } else { + firstError.addSuppressed(e); + } + } + } selectedCache.reset(); + if (firstError instanceof IOException) { + throw (IOException) firstError; + } else if (firstError instanceof RuntimeException) { + throw (RuntimeException) firstError; + } } } diff --git a/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java b/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java index 61756f090..2b1700f0b 100644 --- a/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java +++ b/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java @@ -19,6 +19,7 @@ import com.carrotsearch.randomizedtesting.RandomizedTest; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; import io.github.jbellis.jvector.TestUtil; +import io.github.jbellis.jvector.disk.RandomAccessReader; import io.github.jbellis.jvector.disk.ReaderSupplier; import io.github.jbellis.jvector.disk.ReaderSupplierFactory; import io.github.jbellis.jvector.disk.SimpleMappedReader; @@ -45,10 +46,16 @@ import org.junit.Test; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntFunction; import static io.github.jbellis.jvector.TestUtil.createRandomVectors; @@ -866,4 +873,411 @@ public void testDeletionsAndOrdinalMapping() throws Exception { searcher.close(); } + + /** + * Regression test for the compaction reader leak: {@code compactLevels()} ran batch + * tasks on a (possibly caller-owned) executor whose worker threads each lazily created + * a {@code Scratch} — one {@code GraphSearcher}, hence one {@link RandomAccessReader}, + * per source. Only the calling thread's {@code Scratch} was closed, so every worker + * thread's readers leaked for the lifetime of the executor. With a remote-backed + * reader (HerdDB's {@code RemoteRandomAccessReader}) each leaked reader pins an + * off-heap block buffer, eventually exhausting the memory budget. + * + *

The test compacts several FusedPQ sources on a caller-owned multi-threaded + * {@link ForkJoinPool} (so the compactor does not shut it down and worker-thread + * {@code Scratch} instances genuinely outlive {@code compact()}), wrapping every + * source's {@link ReaderSupplier} so each {@link RandomAccessReader} it vends records + * whether it was closed. Once {@code compact()} returns, every reader opened against a + * source — including those created on executor worker threads — must be closed. + */ + @Test + public void testCompactionClosesAllReaders() throws Exception { + // Source graphs test_graph_0..numSources-1 were written by buildFusedPQ() in setup(). + List trackingSuppliers = new ArrayList<>(); + List graphs = new ArrayList<>(); + List liveNodes = new ArrayList<>(); + List remappers = new ArrayList<>(); + + int globalOrdinal = 0; + for (int i = 0; i < numSources; i++) { + Path path = testDirectory.resolve("test_graph_" + i); + TrackingReaderSupplier ts = new TrackingReaderSupplier( + ReaderSupplierFactory.open(path.toAbsolutePath()), null, null); + trackingSuppliers.add(ts); + graphs.add(OnDiskGraphIndex.load(ts)); + + Map map = new HashMap<>(numVectorsPerGraph); + for (int n = 0; n < numVectorsPerGraph; n++) { + map.put(n, globalOrdinal++); + } + remappers.add(new OrdinalMapper.MapMapper(map)); + FixedBitSet lives = new FixedBitSet(numVectorsPerGraph); + lives.set(0, numVectorsPerGraph); + liveNodes.add(lives); + } + + // OnDiskGraphIndex.load() opens its readers inside try-with-resources, so all + // readers created so far must already be closed before compaction starts. + int readersBeforeCompact = 0; + for (TrackingReaderSupplier ts : trackingSuppliers) { + readersBeforeCompact += ts.createdCount(); + assertTrue("readers opened by OnDiskGraphIndex.load() must be closed before" + + " compaction (" + ts.openCount() + " still open)", ts.allClosed()); + } + + // Caller-owned pool: the compactor sees executor != null, so ownsExecutor is false + // and it never shuts the pool down. The worker threads — and any Scratch they + // create — therefore outlive compact(); a leaked worker-thread Scratch would leave + // its readers open after compact() returns. + ForkJoinPool pool = new ForkJoinPool(4); + try { + OnDiskGraphIndexCompactor compactor = new OnDiskGraphIndexCompactor( + graphs, liveNodes, remappers, similarityFunction, pool); + Path outputPath = testDirectory.resolve("test_compact_closes_readers"); + compactor.compact(outputPath); + } finally { + pool.shutdown(); + assertTrue("compaction pool did not terminate", + pool.awaitTermination(30, TimeUnit.SECONDS)); + } + + int readersAfterCompact = 0; + for (TrackingReaderSupplier ts : trackingSuppliers) { + readersAfterCompact += ts.createdCount(); + assertTrue("every RandomAccessReader opened against a compaction source must be" + + " closed once compact() returns (" + ts.openCount() + " still open)", + ts.allClosed()); + } + // Guard against a vacuous pass: compaction must actually open source readers + // (Scratch GraphSearchers + resolveEntryNodeSource), otherwise the assertions + // above would hold trivially even if the leak were still present. + assertTrue("expected compaction to open additional source readers", + readersAfterCompact > readersBeforeCompact); + } + + /** + * Regression test for the compaction-failure leak path. When a batch task throws, + * {@code runBatchesWithBackpressure} propagates immediately without awaiting the other + * in-flight tasks, and {@code compactLevels()} runs its {@code closeAllScratchQuietly()} + * cleanup branch. The fix must still close every reader: {@code compactLevels()} cancels + * and awaits all submitted tasks before draining its Scratch registry, so no task can + * register a Scratch after the drain. + * + *

The test injects an {@link IOException} from {@code read(float[], int, int)} once a + * handful of vector reads have run on executor worker threads (i.e. inside + * {@code compactLevels()}), forcing {@code compact()} to fail. Once it returns, every + * {@link RandomAccessReader} vended to a compaction source — including those created on + * worker threads whose tasks were cancelled mid-flight — must be closed. + */ + @Test + public void testCompactionFailureClosesAllReaders() throws Exception { + List trackingSuppliers = new ArrayList<>(); + List graphs = new ArrayList<>(); + List liveNodes = new ArrayList<>(); + List remappers = new ArrayList<>(); + + // After 20 vector reads on executor worker threads (only compactLevels() reads + // vectors on worker threads), every further read(float[]) throws — forcing the + // compaction to fail partway through, exercising the closeAllScratchQuietly() path. + AtomicInteger failCountdown = new AtomicInteger(20); + + int globalOrdinal = 0; + for (int i = 0; i < numSources; i++) { + Path path = testDirectory.resolve("test_graph_" + i); + TrackingReaderSupplier ts = new TrackingReaderSupplier( + ReaderSupplierFactory.open(path.toAbsolutePath()), failCountdown, null); + trackingSuppliers.add(ts); + graphs.add(OnDiskGraphIndex.load(ts)); + + Map map = new HashMap<>(numVectorsPerGraph); + for (int n = 0; n < numVectorsPerGraph; n++) { + map.put(n, globalOrdinal++); + } + remappers.add(new OrdinalMapper.MapMapper(map)); + FixedBitSet lives = new FixedBitSet(numVectorsPerGraph); + lives.set(0, numVectorsPerGraph); + liveNodes.add(lives); + } + + ForkJoinPool pool = new ForkJoinPool(4); + boolean compactionFailed = false; + try { + OnDiskGraphIndexCompactor compactor = new OnDiskGraphIndexCompactor( + graphs, liveNodes, remappers, similarityFunction, pool); + Path outputPath = testDirectory.resolve("test_compact_failure_closes_readers"); + try { + compactor.compact(outputPath); + } catch (RuntimeException expected) { + // compact() wraps batch-task IOExceptions in a RuntimeException. + compactionFailed = true; + } + } finally { + pool.shutdown(); + assertTrue("compaction pool did not terminate", + pool.awaitTermination(30, TimeUnit.SECONDS)); + } + + assertTrue("the injected read failure should have made compaction fail", + compactionFailed); + + int readersOpened = 0; + for (TrackingReaderSupplier ts : trackingSuppliers) { + readersOpened += ts.createdCount(); + assertTrue("every RandomAccessReader must be closed even when compaction fails" + + " (" + ts.openCount() + " still open)", ts.allClosed()); + } + assertTrue("expected the failed compaction to have opened source readers", + readersOpened > 0); + } + + /** + * Regression test for intra-{@code Scratch} close resilience. {@code Scratch.close()} + * closes one {@code GraphSearcher} per source; if one {@code GraphSearcher.close()} + * (i.e. its reader's {@code close()}) throws, the remaining searchers must still be + * closed — otherwise their {@link RandomAccessReader}s leak. With a remote-backed + * reader a {@code close()} {@link IOException} is plausible. + * + *

The test makes source 0's readers throw from {@code close()} (only for readers + * created on executor worker threads — i.e. the {@code Scratch} readers, so the + * single-threaded setup and entry-node paths are unaffected), then asserts that after + * the compaction every reader of every other source was still closed, and + * that source 0's readers each had {@code close()} attempted. + */ + @Test + public void testCompactionSurvivesReaderCloseFailure() throws Exception { + List trackingSuppliers = new ArrayList<>(); + List graphs = new ArrayList<>(); + List liveNodes = new ArrayList<>(); + List remappers = new ArrayList<>(); + + // Armed only after OnDiskGraphIndex.load() so the setup-phase readers close cleanly. + AtomicBoolean failOnClose = new AtomicBoolean(false); + + int globalOrdinal = 0; + for (int i = 0; i < numSources; i++) { + Path path = testDirectory.resolve("test_graph_" + i); + // Only source 0's worker-thread readers throw from close(). + TrackingReaderSupplier ts = new TrackingReaderSupplier( + ReaderSupplierFactory.open(path.toAbsolutePath()), null, + i == 0 ? failOnClose : null); + trackingSuppliers.add(ts); + graphs.add(OnDiskGraphIndex.load(ts)); + + Map map = new HashMap<>(numVectorsPerGraph); + for (int n = 0; n < numVectorsPerGraph; n++) { + map.put(n, globalOrdinal++); + } + remappers.add(new OrdinalMapper.MapMapper(map)); + FixedBitSet lives = new FixedBitSet(numVectorsPerGraph); + lives.set(0, numVectorsPerGraph); + liveNodes.add(lives); + } + + // Arm the close failure now that load() (single-threaded) has finished. + failOnClose.set(true); + + ForkJoinPool pool = new ForkJoinPool(4); + boolean compactRaised = false; + try { + OnDiskGraphIndexCompactor compactor = new OnDiskGraphIndexCompactor( + graphs, liveNodes, remappers, similarityFunction, pool); + Path outputPath = testDirectory.resolve("test_compact_close_failure"); + try { + compactor.compact(outputPath); + } catch (RuntimeException expected) { + // closeAllScratch surfaces the injected close() IOException on the + // success path; compact() rewraps it as a RuntimeException. + compactRaised = true; + } + } finally { + pool.shutdown(); + assertTrue("compaction pool did not terminate", + pool.awaitTermination(30, TimeUnit.SECONDS)); + } + + assertTrue("the injected reader close() failure should surface from compact()", + compactRaised); + + // Resilience: a failing close() on source 0 must not prevent the other sources' + // readers from being closed, and source 0's own readers must each have had + // close() attempted. + int readersOpened = 0; + for (int i = 0; i < numSources; i++) { + TrackingReaderSupplier ts = trackingSuppliers.get(i); + readersOpened += ts.createdCount(); + assertTrue("source " + i + " has " + ts.openCount() + " reader(s) whose" + + " close() was never attempted despite a close failure elsewhere", + ts.allClosed()); + } + assertTrue("expected the compaction to have opened source readers", + readersOpened > 0); + } + + /** + * A {@link ReaderSupplier} that wraps a delegate and records every {@link RandomAccessReader} + * it vends as a {@link TrackingReader}, so a test can assert they were all closed. + * + *

When {@code failCountdown} is non-null, the vended readers throw an + * {@link IOException} from {@code read(float[], int, int)} once the countdown is + * exhausted by reads issued on executor worker threads. When {@code failOnClose} is + * non-null and set, worker-thread-created readers throw an {@link IOException} from + * {@code close()}. See {@link TrackingReader}. + */ + private static final class TrackingReaderSupplier implements ReaderSupplier { + private final ReaderSupplier delegate; + private final Queue readers = new ConcurrentLinkedQueue<>(); + private final AtomicInteger failCountdown; + private final AtomicBoolean failOnClose; + + TrackingReaderSupplier(ReaderSupplier delegate, AtomicInteger failCountdown, + AtomicBoolean failOnClose) { + this.delegate = delegate; + this.failCountdown = failCountdown; + this.failOnClose = failOnClose; + } + + @Override + public RandomAccessReader get() throws IOException { + TrackingReader r = new TrackingReader(delegate.get(), failCountdown, failOnClose); + readers.add(r); + return r; + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + /** Total number of readers vended so far. */ + int createdCount() { + return readers.size(); + } + + /** Number of vended readers that have not been closed. */ + int openCount() { + int open = 0; + for (TrackingReader r : readers) { + if (!r.isClosed()) { + open++; + } + } + return open; + } + + boolean allClosed() { + return openCount() == 0; + } + } + + /** + * A {@link RandomAccessReader} that delegates every call and records whether + * {@link #close()} has been invoked. When constructed with a non-null + * {@code failCountdown}, {@code read(float[], int, int)} throws an {@link IOException} + * once the countdown reaches zero — but only for reads issued on an + * {@link ForkJoinWorkerThread}, so the injected failure lands inside + * {@code OnDiskGraphIndexCompactor.compactLevels()} (the only code that reads vectors + * on executor worker threads) and never during the single-threaded setup phase. + * + *

When constructed with a non-null {@code failOnClose} that is set, {@link #close()} + * throws an {@link IOException} — but only for readers created on a worker thread (the + * {@code Scratch} readers), so the setup phase and the calling-thread entry-node path + * close cleanly. The delegate is always closed first, so no real resource leaks. + */ + private static final class TrackingReader implements RandomAccessReader { + private final RandomAccessReader delegate; + private final AtomicInteger failCountdown; + private final AtomicBoolean failOnClose; + private final boolean createdByWorkerThread; + private volatile boolean closed; + + TrackingReader(RandomAccessReader delegate, AtomicInteger failCountdown, + AtomicBoolean failOnClose) { + this.delegate = delegate; + this.failCountdown = failCountdown; + this.failOnClose = failOnClose; + this.createdByWorkerThread = Thread.currentThread() instanceof ForkJoinWorkerThread; + } + + boolean isClosed() { + return closed; + } + + private void maybeFail() throws IOException { + if (failCountdown != null + && Thread.currentThread() instanceof ForkJoinWorkerThread + && failCountdown.getAndDecrement() <= 0) { + throw new IOException("injected compaction read failure"); + } + } + + @Override + public void seek(long offset) throws IOException { + delegate.seek(offset); + } + + @Override + public long getPosition() throws IOException { + return delegate.getPosition(); + } + + @Override + public int readInt() throws IOException { + return delegate.readInt(); + } + + @Override + public float readFloat() throws IOException { + return delegate.readFloat(); + } + + @Override + public long readLong() throws IOException { + return delegate.readLong(); + } + + @Override + public void readFully(byte[] bytes) throws IOException { + delegate.readFully(bytes); + } + + @Override + public void readFully(ByteBuffer buffer) throws IOException { + delegate.readFully(buffer); + } + + @Override + public void readFully(float[] floats) throws IOException { + delegate.readFully(floats); + } + + @Override + public void readFully(long[] vector) throws IOException { + delegate.readFully(vector); + } + + @Override + public void read(int[] ints, int offset, int count) throws IOException { + delegate.read(ints, offset, count); + } + + @Override + public void read(float[] floats, int offset, int count) throws IOException { + maybeFail(); + delegate.read(floats, offset, count); + } + + @Override + public long length() throws IOException { + return delegate.length(); + } + + @Override + public void close() throws IOException { + closed = true; + delegate.close(); + if (failOnClose != null && failOnClose.get() && createdByWorkerThread) { + throw new IOException("injected reader close failure"); + } + } + } }