From 8edcf53cf4f871db148cf00c46a6f09b1da8ff42 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 18 May 2026 19:11:08 +0200 Subject: [PATCH 1/3] Fix RandomAccessReader leak in OnDiskGraphIndexCompactor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit compactLevels() ran batch tasks on a (possibly caller-owned) executor whose worker threads each lazily created a Scratch via a ThreadLocal. Each Scratch holds one GraphSearcher — hence one RandomAccessReader — per source. Only the calling thread's Scratch was closed, so every worker thread's readers leaked for the lifetime of the executor. With a remote-backed reader (HerdDB's RemoteRandomAccessReader) each leaked reader pins an off-heap block buffer; over many compaction cycles this exhausts the memory budget and stalls ingestion (eolivelli/herddb#590). Track every Scratch created on any thread in a ConcurrentLinkedQueue and close them all in a finally block. resolveEntryNodeSource() had a second, smaller leak — getView() opened a View that was never closed — now fixed with try-with-resources. Add testCompactionClosesAllReaders, which wraps each source's ReaderSupplier so it records whether every vended RandomAccessReader was closed, runs a compaction on a caller-owned multi-threaded pool, and asserts no reader is left open once compact() returns. The test fails (5 readers left open) without the fix. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../graph/disk/OnDiskGraphIndexCompactor.java | 202 +++++++++++------ .../disk/TestOnDiskGraphIndexCompactor.java | 213 ++++++++++++++++++ 2 files changed, 348 insertions(+), 67 deletions(-) diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java index 870470036..d737d55c3 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java @@ -18,6 +18,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.Path; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; @@ -290,7 +291,14 @@ private int[] resolveEntryNodeSource() { // maxLevel source; fall back to any live node that is at maxLevel. for (int s = 0; s < sources.size(); s++) { if (sources.get(s).getMaxLevel() == maxLevel) { - int originalEntry = sources.get(s).getView().entryNode().node; + // try-with-resources: getView() opens a fresh RandomAccessReader that must + // be closed, otherwise it leaks for the lifetime of the underlying storage. + int originalEntry; + try (var entryView = sources.get(s).getView()) { + originalEntry = entryView.entryNode().node; + } catch (IOException e) { + throw new UncheckedIOException(e); + } if (liveNodes.get(s).get(originalEntry)) { return new int[]{s, originalEntry}; } @@ -336,32 +344,81 @@ private void compactLevels(CompactWriter writer, int upperMaxCandidateSize = upperMaxPerSourceTopK * sources.size(); int maxCandidateSize = Math.max(baseMaxCandidateSize, upperMaxCandidateSize); int scratchDegree = Math.max(maxDegrees.get(0), Math.max(1, maxUpperDegree)); - final ThreadLocal threadLocalScratch = ThreadLocal.withInitial(() -> - new Scratch(maxCandidateSize, scratchDegree, dimension, sources, pq) - ); - - for (int level = 0; level < maxDegrees.size(); level++) { - List batches = buildBatches(level); - int searchTopK = Math.max(MIN_SEARCH_TOP_K, ((maxDegrees.get(level) + sources.size() - 1) / sources.size()) * SEARCH_TOP_K_MULTIPLIER); - int beamWidth = Math.max(maxDegrees.get(level), searchTopK) * BEAM_WIDTH_MULTIPLIER; + // Batch tasks run on `executor`, whose worker threads each lazily create their own + // Scratch via this ThreadLocal. compactLevels() cannot reach those worker-thread + // ThreadLocal entries with threadLocalScratch.get()/remove(), so every Scratch is + // registered in `allScratch` and closed in the finally block below. Closing only the + // calling thread's Scratch leaks every worker thread's GraphSearchers — and the + // RandomAccessReaders behind their Views — for the lifetime of the executor. + final Queue allScratch = new ConcurrentLinkedQueue<>(); + final ThreadLocal threadLocalScratch = ThreadLocal.withInitial(() -> { + Scratch scratch = new Scratch(maxCandidateSize, scratchDegree, dimension, sources, pq); + allScratch.add(scratch); + return scratch; + }); - CompactionParams params = new CompactionParams(fusedPQEnabled, compressedPrecision, searchTopK, beamWidth, pq); + boolean compactionSucceeded = false; + try { + for (int level = 0; level < maxDegrees.size(); level++) { + List batches = buildBatches(level); + int searchTopK = Math.max(MIN_SEARCH_TOP_K, ((maxDegrees.get(level) + sources.size() - 1) / sources.size()) * SEARCH_TOP_K_MULTIPLIER); + int beamWidth = Math.max(maxDegrees.get(level), searchTopK) * BEAM_WIDTH_MULTIPLIER; + + CompactionParams params = new CompactionParams(fusedPQEnabled, compressedPrecision, searchTopK, beamWidth, pq); + + if (level == 0) { + log.info("Compacting level 0 (base layer)"); + + ExecutorCompletionService> ecs = + new ExecutorCompletionService<>(executor); + + java.util.function.Consumer submitOne = (bs) -> { + ecs.submit(() -> { + Scratch scratch = threadLocalScratch.get(); + return computeBaseBatch(writer, bs, scratch, params); + }); + }; + + var wropts = EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.READ); + try (FileChannel fc = FileChannel.open(writer.getOutputPath(), wropts)) { + + runBatchesWithBackpressure( + batches, + ecs, + submitOne, + (results) -> { + try { + for (WriteResult r : results) { + ByteBuffer b = r.data; + long pos = r.fileOffset; + while (b.hasRemaining()) { + int n = fc.write(b, pos); + pos += n; + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + progressListener + ); + } - if (level == 0) { - log.info("Compacting level 0 (base layer)"); + writer.offsetAfterInline(); - ExecutorCompletionService> ecs = - new ExecutorCompletionService<>(executor); + } else { + final int lvl = level; + log.info("Compacting upper layer {}", level); - java.util.function.Consumer submitOne = (bs) -> { - ecs.submit(() -> { - Scratch scratch = threadLocalScratch.get(); - return computeBaseBatch(writer, bs, scratch, params); - }); - }; + ExecutorCompletionService> ecs = + new ExecutorCompletionService<>(executor); - var wropts = EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.READ); - try (FileChannel fc = FileChannel.open(writer.getOutputPath(), wropts)) { + java.util.function.Consumer submitOne = (bs) -> { + ecs.submit(() -> { + Scratch scratch = threadLocalScratch.get(); + return computeUpperBatchForLevel(bs, lvl, scratch, params); + }); + }; runBatchesWithBackpressure( batches, @@ -369,13 +426,13 @@ private void compactLevels(CompactWriter writer, submitOne, (results) -> { try { - for (WriteResult r : results) { - ByteBuffer b = r.data; - long pos = r.fileOffset; - while (b.hasRemaining()) { - int n = fc.write(b, pos); - pos += n; - } + for (UpperLayerWriteResult r : results) { + writer.writeUpperLayerNode( + lvl, + r.ordinal, + r.neighbors, + r.pqCode + ); } } catch (IOException e) { throw new RuntimeException(e); @@ -384,49 +441,60 @@ private void compactLevels(CompactWriter writer, progressListener ); } - - writer.offsetAfterInline(); - + } + compactionSucceeded = true; + } finally { + // Drop this thread's ThreadLocal entry; worker-thread entries are unreachable + // here but their Scratch instances are all registered in `allScratch`. + threadLocalScratch.remove(); + if (compactionSucceeded) { + // No primary exception in flight, so a close failure can be surfaced safely. + closeAllScratch(allScratch); } else { - final int lvl = level; - log.info("Compacting upper layer {}", level); - - ExecutorCompletionService> ecs = - new ExecutorCompletionService<>(executor); - - java.util.function.Consumer submitOne = (bs) -> { - ecs.submit(() -> { - Scratch scratch = threadLocalScratch.get(); - return computeUpperBatchForLevel(bs, lvl, scratch, params); - }); - }; + // A primary exception is already propagating; close best-effort so a + // secondary close failure does not mask the original cause. + closeAllScratchQuietly(allScratch); + } + } + } - runBatchesWithBackpressure( - batches, - ecs, - submitOne, - (results) -> { - try { - for (UpperLayerWriteResult r : results) { - writer.writeUpperLayerNode( - lvl, - r.ordinal, - r.neighbors, - r.pqCode - ); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - }, - progressListener - ); + /** + * Closes every {@link Scratch} created during compaction, regardless of which thread + * created it. A close failure on one Scratch does not prevent the others from being + * closed; the first failure is rethrown with any later ones attached as suppressed. + */ + private static void closeAllScratch(Queue allScratch) throws IOException { + IOException firstError = null; + Scratch scratch; + while ((scratch = allScratch.poll()) != null) { + try { + scratch.close(); + } catch (IOException e) { + if (firstError == null) { + firstError = e; + } else { + firstError.addSuppressed(e); + } } } + if (firstError != null) { + throw firstError; + } + } - Scratch s = threadLocalScratch.get(); - s.close(); - threadLocalScratch.remove(); + /** + * Like {@link #closeAllScratch} but never throws — used when a primary exception is + * already propagating, so a close failure is logged instead of masking the cause. + */ + private static void closeAllScratchQuietly(Queue allScratch) { + Scratch scratch; + while ((scratch = allScratch.poll()) != null) { + try { + scratch.close(); + } catch (IOException e) { + log.warn("Failed to close compaction Scratch space", e); + } + } } /** diff --git a/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java b/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java index 61756f090..0831d1476 100644 --- a/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java +++ b/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java @@ -19,6 +19,7 @@ import com.carrotsearch.randomizedtesting.RandomizedTest; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; import io.github.jbellis.jvector.TestUtil; +import io.github.jbellis.jvector.disk.RandomAccessReader; import io.github.jbellis.jvector.disk.ReaderSupplier; import io.github.jbellis.jvector.disk.ReaderSupplierFactory; import io.github.jbellis.jvector.disk.SimpleMappedReader; @@ -45,10 +46,13 @@ import org.junit.Test; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; import java.util.function.IntFunction; import static io.github.jbellis.jvector.TestUtil.createRandomVectors; @@ -866,4 +870,213 @@ public void testDeletionsAndOrdinalMapping() throws Exception { searcher.close(); } + + /** + * Regression test for the compaction reader leak: {@code compactLevels()} ran batch + * tasks on a (possibly caller-owned) executor whose worker threads each lazily created + * a {@code Scratch} — one {@code GraphSearcher}, hence one {@link RandomAccessReader}, + * per source. Only the calling thread's {@code Scratch} was closed, so every worker + * thread's readers leaked for the lifetime of the executor. With a remote-backed + * reader (HerdDB's {@code RemoteRandomAccessReader}) each leaked reader pins an + * off-heap block buffer, eventually exhausting the memory budget. + * + *

The test compacts several FusedPQ sources on a caller-owned multi-threaded + * {@link ForkJoinPool} (so the compactor does not shut it down and worker-thread + * {@code Scratch} instances genuinely outlive {@code compact()}), wrapping every + * source's {@link ReaderSupplier} so each {@link RandomAccessReader} it vends records + * whether it was closed. Once {@code compact()} returns, every reader opened against a + * source — including those created on executor worker threads — must be closed. + */ + @Test + public void testCompactionClosesAllReaders() throws Exception { + // Source graphs test_graph_0..numSources-1 were written by buildFusedPQ() in setup(). + List trackingSuppliers = new ArrayList<>(); + List graphs = new ArrayList<>(); + List liveNodes = new ArrayList<>(); + List remappers = new ArrayList<>(); + + int globalOrdinal = 0; + for (int i = 0; i < numSources; i++) { + Path path = testDirectory.resolve("test_graph_" + i); + TrackingReaderSupplier ts = new TrackingReaderSupplier( + ReaderSupplierFactory.open(path.toAbsolutePath())); + trackingSuppliers.add(ts); + graphs.add(OnDiskGraphIndex.load(ts)); + + Map map = new HashMap<>(numVectorsPerGraph); + for (int n = 0; n < numVectorsPerGraph; n++) { + map.put(n, globalOrdinal++); + } + remappers.add(new OrdinalMapper.MapMapper(map)); + FixedBitSet lives = new FixedBitSet(numVectorsPerGraph); + lives.set(0, numVectorsPerGraph); + liveNodes.add(lives); + } + + // OnDiskGraphIndex.load() opens its readers inside try-with-resources, so all + // readers created so far must already be closed before compaction starts. + int readersBeforeCompact = 0; + for (TrackingReaderSupplier ts : trackingSuppliers) { + readersBeforeCompact += ts.createdCount(); + assertTrue("readers opened by OnDiskGraphIndex.load() must be closed before" + + " compaction (" + ts.openCount() + " still open)", ts.allClosed()); + } + + // Caller-owned pool: the compactor sees executor != null, so ownsExecutor is false + // and it never shuts the pool down. The worker threads — and any Scratch they + // create — therefore outlive compact(); a leaked worker-thread Scratch would leave + // its readers open after compact() returns. + ForkJoinPool pool = new ForkJoinPool(4); + try { + OnDiskGraphIndexCompactor compactor = new OnDiskGraphIndexCompactor( + graphs, liveNodes, remappers, similarityFunction, pool); + Path outputPath = testDirectory.resolve("test_compact_closes_readers"); + compactor.compact(outputPath); + } finally { + pool.shutdown(); + assertTrue("compaction pool did not terminate", + pool.awaitTermination(30, TimeUnit.SECONDS)); + } + + int readersAfterCompact = 0; + for (TrackingReaderSupplier ts : trackingSuppliers) { + readersAfterCompact += ts.createdCount(); + assertTrue("every RandomAccessReader opened against a compaction source must be" + + " closed once compact() returns (" + ts.openCount() + " still open)", + ts.allClosed()); + } + // Guard against a vacuous pass: compaction must actually open source readers + // (Scratch GraphSearchers + resolveEntryNodeSource), otherwise the assertions + // above would hold trivially even if the leak were still present. + assertTrue("expected compaction to open additional source readers", + readersAfterCompact > readersBeforeCompact); + } + + /** + * A {@link ReaderSupplier} that wraps a delegate and records every {@link RandomAccessReader} + * it vends as a {@link TrackingReader}, so a test can assert they were all closed. + */ + private static final class TrackingReaderSupplier implements ReaderSupplier { + private final ReaderSupplier delegate; + private final Queue readers = new ConcurrentLinkedQueue<>(); + + TrackingReaderSupplier(ReaderSupplier delegate) { + this.delegate = delegate; + } + + @Override + public RandomAccessReader get() throws IOException { + TrackingReader r = new TrackingReader(delegate.get()); + readers.add(r); + return r; + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + /** Total number of readers vended so far. */ + int createdCount() { + return readers.size(); + } + + /** Number of vended readers that have not been closed. */ + int openCount() { + int open = 0; + for (TrackingReader r : readers) { + if (!r.isClosed()) { + open++; + } + } + return open; + } + + boolean allClosed() { + return openCount() == 0; + } + } + + /** + * A {@link RandomAccessReader} that delegates every call and records whether + * {@link #close()} has been invoked. + */ + private static final class TrackingReader implements RandomAccessReader { + private final RandomAccessReader delegate; + private volatile boolean closed; + + TrackingReader(RandomAccessReader delegate) { + this.delegate = delegate; + } + + boolean isClosed() { + return closed; + } + + @Override + public void seek(long offset) throws IOException { + delegate.seek(offset); + } + + @Override + public long getPosition() throws IOException { + return delegate.getPosition(); + } + + @Override + public int readInt() throws IOException { + return delegate.readInt(); + } + + @Override + public float readFloat() throws IOException { + return delegate.readFloat(); + } + + @Override + public long readLong() throws IOException { + return delegate.readLong(); + } + + @Override + public void readFully(byte[] bytes) throws IOException { + delegate.readFully(bytes); + } + + @Override + public void readFully(ByteBuffer buffer) throws IOException { + delegate.readFully(buffer); + } + + @Override + public void readFully(float[] floats) throws IOException { + delegate.readFully(floats); + } + + @Override + public void readFully(long[] vector) throws IOException { + delegate.readFully(vector); + } + + @Override + public void read(int[] ints, int offset, int count) throws IOException { + delegate.read(ints, offset, count); + } + + @Override + public void read(float[] floats, int offset, int count) throws IOException { + delegate.read(floats, offset, count); + } + + @Override + public long length() throws IOException { + return delegate.length(); + } + + @Override + public void close() throws IOException { + closed = true; + delegate.close(); + } + } } From bb76e22337e51645759263a6f6e1482642aa2d89 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 18 May 2026 19:44:24 +0200 Subject: [PATCH 2/3] review: close failure-path Scratch leak and harden cleanup Addresses pr-reviewer findings on the compaction reader-leak fix: - compactLevels() now tracks every submitted batch Future and, in its finally block, cancels and awaits all of them (awaitAllTasks) BEFORE draining the Scratch registry. Previously, when compaction failed, runBatchesWithBackpressure propagated immediately while other tasks were still in flight; such a task could run the ThreadLocal supplier and register a fresh Scratch after the drain had finished, leaking it. awaitAllTasks is a cheap no-op on the success path (all tasks already done) and also guarantees no task outlives compactLevels(). - The Scratch constructor now closes any GraphSearchers it already opened if a later source's getView() throws, so a partially built Scratch (never registered for the drain) does not leak readers. - closeAllScratch / closeAllScratchQuietly now also catch RuntimeException so one failing Scratch.close() cannot abort closing the rest. - Add testCompactionFailureClosesAllReaders: injects an IOException from a worker-thread vector read partway through compaction and asserts every vended RandomAccessReader is closed once compact() fails, covering the closeAllScratchQuietly path. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../graph/disk/OnDiskGraphIndexCompactor.java | 94 +++++++++++++-- .../disk/TestOnDiskGraphIndexCompactor.java | 109 +++++++++++++++++- 2 files changed, 186 insertions(+), 17 deletions(-) diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java index d737d55c3..ad04c6a35 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java @@ -356,6 +356,11 @@ private void compactLevels(CompactWriter writer, allScratch.add(scratch); return scratch; }); + // Every batch task submitted to `executor` is tracked here so the finally block can + // quiesce the executor (cancel + await) before draining `allScratch`. Without this + // barrier, when compaction fails an in-flight task that has not yet run the + // ThreadLocal supplier could register a fresh Scratch AFTER the drain — leaking it. + final Queue> submittedFutures = new ConcurrentLinkedQueue<>(); boolean compactionSucceeded = false; try { @@ -373,10 +378,10 @@ private void compactLevels(CompactWriter writer, new ExecutorCompletionService<>(executor); java.util.function.Consumer submitOne = (bs) -> { - ecs.submit(() -> { + submittedFutures.add(ecs.submit(() -> { Scratch scratch = threadLocalScratch.get(); return computeBaseBatch(writer, bs, scratch, params); - }); + })); }; var wropts = EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.READ); @@ -414,10 +419,10 @@ private void compactLevels(CompactWriter writer, new ExecutorCompletionService<>(executor); java.util.function.Consumer submitOne = (bs) -> { - ecs.submit(() -> { + submittedFutures.add(ecs.submit(() -> { Scratch scratch = threadLocalScratch.get(); return computeUpperBatchForLevel(bs, lvl, scratch, params); - }); + })); }; runBatchesWithBackpressure( @@ -444,6 +449,10 @@ private void compactLevels(CompactWriter writer, } compactionSucceeded = true; } finally { + // Quiesce the executor BEFORE draining allScratch: until every submitted task + // has terminated, an in-flight task can still run the threadLocalScratch + // supplier and register a fresh Scratch that the drain below would miss. + awaitAllTasks(submittedFutures); // Drop this thread's ThreadLocal entry; worker-thread entries are unreachable // here but their Scratch instances are all registered in `allScratch`. threadLocalScratch.remove(); @@ -458,18 +467,56 @@ private void compactLevels(CompactWriter writer, } } + /** + * Cancels and waits for every batch task submitted to {@code executor}. This MUST run + * before {@code allScratch} is drained: until a submitted task has terminated it can + * still execute the {@code threadLocalScratch} supplier and {@code allScratch.add(...)} + * a fresh Scratch, which the drain would then miss and leak. + * + *

On the normal-completion path every task is already done, so {@code cancel} is a + * no-op and {@code get} returns immediately — a cheap barrier. On the failure path the + * not-yet-started tasks are cancelled (they never create a Scratch) and the running + * ones are awaited (their Scratch is registered and will be drained). + */ + private static void awaitAllTasks(Queue> submittedFutures) { + boolean interrupted = false; + Future f; + while ((f = submittedFutures.poll()) != null) { + // No-op for already-completed tasks; interrupts the rest so a failed compaction + // need not wait for the whole sliding window to drain naturally. + f.cancel(true); + if (interrupted) { + // Already interrupted: keep cancelling the remainder but do not block. + continue; + } + try { + f.get(); + } catch (CancellationException | ExecutionException ignored) { + // We only need the task to have TERMINATED; its result/failure was already + // handled by runBatchesWithBackpressure (or is irrelevant once cancelled). + } catch (InterruptedException e) { + interrupted = true; + } + } + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + /** * Closes every {@link Scratch} created during compaction, regardless of which thread * created it. A close failure on one Scratch does not prevent the others from being * closed; the first failure is rethrown with any later ones attached as suppressed. */ private static void closeAllScratch(Queue allScratch) throws IOException { - IOException firstError = null; + Throwable firstError = null; Scratch scratch; while ((scratch = allScratch.poll()) != null) { try { scratch.close(); - } catch (IOException e) { + } catch (IOException | RuntimeException e) { + // Catch RuntimeException too: a single Scratch failing to close must not + // abort the loop, otherwise every remaining Scratch would leak. if (firstError == null) { firstError = e; } else { @@ -477,8 +524,10 @@ private static void closeAllScratch(Queue allScratch) throws IOExceptio } } } - if (firstError != null) { - throw firstError; + if (firstError instanceof IOException) { + throw (IOException) firstError; + } else if (firstError instanceof RuntimeException) { + throw (RuntimeException) firstError; } } @@ -491,7 +540,8 @@ private static void closeAllScratchQuietly(Queue allScratch) { while ((scratch = allScratch.poll()) != null) { try { scratch.close(); - } catch (IOException e) { + } catch (IOException | RuntimeException e) { + // Broad-ish catch on purpose: one bad close must not abort the rest. log.warn("Failed to close compaction Scratch space", e); } } @@ -1187,9 +1237,29 @@ private static final class Scratch implements AutoCloseable { this.pqCode = (pq == null) ? null : vectorTypeSupport.createByteSequence(pq.getSubspaceCount()); this.gs = new GraphSearcher[sources.size()]; - for (int i = 0; i < sources.size(); i++) { - gs[i] = new GraphSearcher(sources.get(i)); - gs[i].usePruning(false); + // Each GraphSearcher opens a View (hence a RandomAccessReader) on its source. + // If a later source's getView() throws, close the searchers already created so + // their readers are not leaked — this Scratch is never returned to the caller, + // so it would otherwise never be registered for the compactor's drain. + boolean initialised = false; + try { + for (int i = 0; i < sources.size(); i++) { + gs[i] = new GraphSearcher(sources.get(i)); + gs[i].usePruning(false); + } + initialised = true; + } finally { + if (!initialised) { + for (GraphSearcher s : gs) { + if (s != null) { + try { + s.close(); + } catch (IOException | RuntimeException ignored) { + // best-effort cleanup; the original failure propagates + } + } + } + } } } diff --git a/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java b/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java index 0831d1476..adce64539 100644 --- a/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java +++ b/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java @@ -52,7 +52,9 @@ import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntFunction; import static io.github.jbellis.jvector.TestUtil.createRandomVectors; @@ -899,7 +901,7 @@ public void testCompactionClosesAllReaders() throws Exception { for (int i = 0; i < numSources; i++) { Path path = testDirectory.resolve("test_graph_" + i); TrackingReaderSupplier ts = new TrackingReaderSupplier( - ReaderSupplierFactory.open(path.toAbsolutePath())); + ReaderSupplierFactory.open(path.toAbsolutePath()), null); trackingSuppliers.add(ts); graphs.add(OnDiskGraphIndex.load(ts)); @@ -952,21 +954,102 @@ public void testCompactionClosesAllReaders() throws Exception { readersAfterCompact > readersBeforeCompact); } + /** + * Regression test for the compaction-failure leak path. When a batch task throws, + * {@code runBatchesWithBackpressure} propagates immediately without awaiting the other + * in-flight tasks, and {@code compactLevels()} runs its {@code closeAllScratchQuietly()} + * cleanup branch. The fix must still close every reader: {@code compactLevels()} cancels + * and awaits all submitted tasks before draining its Scratch registry, so no task can + * register a Scratch after the drain. + * + *

The test injects an {@link IOException} from {@code read(float[], int, int)} once a + * handful of vector reads have run on executor worker threads (i.e. inside + * {@code compactLevels()}), forcing {@code compact()} to fail. Once it returns, every + * {@link RandomAccessReader} vended to a compaction source — including those created on + * worker threads whose tasks were cancelled mid-flight — must be closed. + */ + @Test + public void testCompactionFailureClosesAllReaders() throws Exception { + List trackingSuppliers = new ArrayList<>(); + List graphs = new ArrayList<>(); + List liveNodes = new ArrayList<>(); + List remappers = new ArrayList<>(); + + // After 20 vector reads on executor worker threads (only compactLevels() reads + // vectors on worker threads), every further read(float[]) throws — forcing the + // compaction to fail partway through, exercising the closeAllScratchQuietly() path. + AtomicInteger failCountdown = new AtomicInteger(20); + + int globalOrdinal = 0; + for (int i = 0; i < numSources; i++) { + Path path = testDirectory.resolve("test_graph_" + i); + TrackingReaderSupplier ts = new TrackingReaderSupplier( + ReaderSupplierFactory.open(path.toAbsolutePath()), failCountdown); + trackingSuppliers.add(ts); + graphs.add(OnDiskGraphIndex.load(ts)); + + Map map = new HashMap<>(numVectorsPerGraph); + for (int n = 0; n < numVectorsPerGraph; n++) { + map.put(n, globalOrdinal++); + } + remappers.add(new OrdinalMapper.MapMapper(map)); + FixedBitSet lives = new FixedBitSet(numVectorsPerGraph); + lives.set(0, numVectorsPerGraph); + liveNodes.add(lives); + } + + ForkJoinPool pool = new ForkJoinPool(4); + boolean compactionFailed = false; + try { + OnDiskGraphIndexCompactor compactor = new OnDiskGraphIndexCompactor( + graphs, liveNodes, remappers, similarityFunction, pool); + Path outputPath = testDirectory.resolve("test_compact_failure_closes_readers"); + try { + compactor.compact(outputPath); + } catch (RuntimeException expected) { + // compact() wraps batch-task IOExceptions in a RuntimeException. + compactionFailed = true; + } + } finally { + pool.shutdown(); + assertTrue("compaction pool did not terminate", + pool.awaitTermination(30, TimeUnit.SECONDS)); + } + + assertTrue("the injected read failure should have made compaction fail", + compactionFailed); + + int readersOpened = 0; + for (TrackingReaderSupplier ts : trackingSuppliers) { + readersOpened += ts.createdCount(); + assertTrue("every RandomAccessReader must be closed even when compaction fails" + + " (" + ts.openCount() + " still open)", ts.allClosed()); + } + assertTrue("expected the failed compaction to have opened source readers", + readersOpened > 0); + } + /** * A {@link ReaderSupplier} that wraps a delegate and records every {@link RandomAccessReader} * it vends as a {@link TrackingReader}, so a test can assert they were all closed. + * + *

When {@code failCountdown} is non-null, the vended readers throw an + * {@link IOException} from {@code read(float[], int, int)} once the countdown is + * exhausted by reads issued on executor worker threads — see {@link TrackingReader}. */ private static final class TrackingReaderSupplier implements ReaderSupplier { private final ReaderSupplier delegate; private final Queue readers = new ConcurrentLinkedQueue<>(); + private final AtomicInteger failCountdown; - TrackingReaderSupplier(ReaderSupplier delegate) { + TrackingReaderSupplier(ReaderSupplier delegate, AtomicInteger failCountdown) { this.delegate = delegate; + this.failCountdown = failCountdown; } @Override public RandomAccessReader get() throws IOException { - TrackingReader r = new TrackingReader(delegate.get()); + TrackingReader r = new TrackingReader(delegate.get(), failCountdown); readers.add(r); return r; } @@ -999,20 +1082,35 @@ boolean allClosed() { /** * A {@link RandomAccessReader} that delegates every call and records whether - * {@link #close()} has been invoked. + * {@link #close()} has been invoked. When constructed with a non-null + * {@code failCountdown}, {@code read(float[], int, int)} throws an {@link IOException} + * once the countdown reaches zero — but only for reads issued on an + * {@link ForkJoinWorkerThread}, so the injected failure lands inside + * {@code OnDiskGraphIndexCompactor.compactLevels()} (the only code that reads vectors + * on executor worker threads) and never during the single-threaded setup phase. */ private static final class TrackingReader implements RandomAccessReader { private final RandomAccessReader delegate; + private final AtomicInteger failCountdown; private volatile boolean closed; - TrackingReader(RandomAccessReader delegate) { + TrackingReader(RandomAccessReader delegate, AtomicInteger failCountdown) { this.delegate = delegate; + this.failCountdown = failCountdown; } boolean isClosed() { return closed; } + private void maybeFail() throws IOException { + if (failCountdown != null + && Thread.currentThread() instanceof ForkJoinWorkerThread + && failCountdown.getAndDecrement() <= 0) { + throw new IOException("injected compaction read failure"); + } + } + @Override public void seek(long offset) throws IOException { delegate.seek(offset); @@ -1065,6 +1163,7 @@ public void read(int[] ints, int offset, int count) throws IOException { @Override public void read(float[] floats, int offset, int count) throws IOException { + maybeFail(); delegate.read(floats, offset, count); } From f778e5a4dfb6e4bd4a66cb4b1aae4f8761f947d5 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 18 May 2026 19:51:17 +0200 Subject: [PATCH 3/3] review: make Scratch.close() resilient to a failing GraphSearcher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses pr-reviewer iteration 2: Scratch.close() did `for (var s : gs) s.close()`, so a single GraphSearcher.close() (reader close()) throwing would leak the readers behind every remaining searcher in the same Scratch — a residual instance of the leak class this PR exists to fix. Scratch.close() now closes every GraphSearcher best-effort: each close() is wrapped in its own try/catch accumulating a firstError (later failures attached as suppressed), the first failure is rethrown after the loop, and null slots left by a partially-failed constructor are skipped. Also add testCompactionSurvivesReaderCloseFailure, which makes one source's worker-thread readers throw from close() and asserts every other source's readers are still closed after compact(). The test fails (4 readers never closed) against the previous Scratch.close(). NIT: the level-0 and upper-layer onComplete lambdas now wrap their IOException in UncheckedIOException instead of a bare RuntimeException. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../graph/disk/OnDiskGraphIndexCompactor.java | 31 ++++- .../disk/TestOnDiskGraphIndexCompactor.java | 114 +++++++++++++++++- 2 files changed, 135 insertions(+), 10 deletions(-) diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java index ad04c6a35..07b96cc85 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java @@ -402,7 +402,7 @@ private void compactLevels(CompactWriter writer, } } } catch (IOException e) { - throw new RuntimeException(e); + throw new UncheckedIOException(e); } }, progressListener @@ -440,7 +440,7 @@ private void compactLevels(CompactWriter writer, ); } } catch (IOException e) { - throw new RuntimeException(e); + throw new UncheckedIOException(e); } }, progressListener @@ -1264,12 +1264,35 @@ private static final class Scratch implements AutoCloseable { } /** - * Closes all graph searchers and resets the cache. + * Closes all graph searchers and resets the cache. Every searcher is closed even + * if one throws, otherwise a single failing GraphSearcher.close() would leak the + * RandomAccessReaders behind the remaining searchers' Views; the first failure is + * rethrown with any later ones attached as suppressed. */ @Override public void close() throws IOException { - for (var s : gs) s.close(); + Throwable firstError = null; + for (GraphSearcher s : gs) { + // The constructor may leave a trailing null slot if it failed partway. + if (s == null) { + continue; + } + try { + s.close(); + } catch (IOException | RuntimeException e) { + if (firstError == null) { + firstError = e; + } else { + firstError.addSuppressed(e); + } + } + } selectedCache.reset(); + if (firstError instanceof IOException) { + throw (IOException) firstError; + } else if (firstError instanceof RuntimeException) { + throw (RuntimeException) firstError; + } } } diff --git a/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java b/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java index adce64539..2b1700f0b 100644 --- a/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java +++ b/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskGraphIndexCompactor.java @@ -54,6 +54,7 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntFunction; @@ -901,7 +902,7 @@ public void testCompactionClosesAllReaders() throws Exception { for (int i = 0; i < numSources; i++) { Path path = testDirectory.resolve("test_graph_" + i); TrackingReaderSupplier ts = new TrackingReaderSupplier( - ReaderSupplierFactory.open(path.toAbsolutePath()), null); + ReaderSupplierFactory.open(path.toAbsolutePath()), null, null); trackingSuppliers.add(ts); graphs.add(OnDiskGraphIndex.load(ts)); @@ -984,7 +985,7 @@ public void testCompactionFailureClosesAllReaders() throws Exception { for (int i = 0; i < numSources; i++) { Path path = testDirectory.resolve("test_graph_" + i); TrackingReaderSupplier ts = new TrackingReaderSupplier( - ReaderSupplierFactory.open(path.toAbsolutePath()), failCountdown); + ReaderSupplierFactory.open(path.toAbsolutePath()), failCountdown, null); trackingSuppliers.add(ts); graphs.add(OnDiskGraphIndex.load(ts)); @@ -1029,27 +1030,115 @@ public void testCompactionFailureClosesAllReaders() throws Exception { readersOpened > 0); } + /** + * Regression test for intra-{@code Scratch} close resilience. {@code Scratch.close()} + * closes one {@code GraphSearcher} per source; if one {@code GraphSearcher.close()} + * (i.e. its reader's {@code close()}) throws, the remaining searchers must still be + * closed — otherwise their {@link RandomAccessReader}s leak. With a remote-backed + * reader a {@code close()} {@link IOException} is plausible. + * + *

The test makes source 0's readers throw from {@code close()} (only for readers + * created on executor worker threads — i.e. the {@code Scratch} readers, so the + * single-threaded setup and entry-node paths are unaffected), then asserts that after + * the compaction every reader of every other source was still closed, and + * that source 0's readers each had {@code close()} attempted. + */ + @Test + public void testCompactionSurvivesReaderCloseFailure() throws Exception { + List trackingSuppliers = new ArrayList<>(); + List graphs = new ArrayList<>(); + List liveNodes = new ArrayList<>(); + List remappers = new ArrayList<>(); + + // Armed only after OnDiskGraphIndex.load() so the setup-phase readers close cleanly. + AtomicBoolean failOnClose = new AtomicBoolean(false); + + int globalOrdinal = 0; + for (int i = 0; i < numSources; i++) { + Path path = testDirectory.resolve("test_graph_" + i); + // Only source 0's worker-thread readers throw from close(). + TrackingReaderSupplier ts = new TrackingReaderSupplier( + ReaderSupplierFactory.open(path.toAbsolutePath()), null, + i == 0 ? failOnClose : null); + trackingSuppliers.add(ts); + graphs.add(OnDiskGraphIndex.load(ts)); + + Map map = new HashMap<>(numVectorsPerGraph); + for (int n = 0; n < numVectorsPerGraph; n++) { + map.put(n, globalOrdinal++); + } + remappers.add(new OrdinalMapper.MapMapper(map)); + FixedBitSet lives = new FixedBitSet(numVectorsPerGraph); + lives.set(0, numVectorsPerGraph); + liveNodes.add(lives); + } + + // Arm the close failure now that load() (single-threaded) has finished. + failOnClose.set(true); + + ForkJoinPool pool = new ForkJoinPool(4); + boolean compactRaised = false; + try { + OnDiskGraphIndexCompactor compactor = new OnDiskGraphIndexCompactor( + graphs, liveNodes, remappers, similarityFunction, pool); + Path outputPath = testDirectory.resolve("test_compact_close_failure"); + try { + compactor.compact(outputPath); + } catch (RuntimeException expected) { + // closeAllScratch surfaces the injected close() IOException on the + // success path; compact() rewraps it as a RuntimeException. + compactRaised = true; + } + } finally { + pool.shutdown(); + assertTrue("compaction pool did not terminate", + pool.awaitTermination(30, TimeUnit.SECONDS)); + } + + assertTrue("the injected reader close() failure should surface from compact()", + compactRaised); + + // Resilience: a failing close() on source 0 must not prevent the other sources' + // readers from being closed, and source 0's own readers must each have had + // close() attempted. + int readersOpened = 0; + for (int i = 0; i < numSources; i++) { + TrackingReaderSupplier ts = trackingSuppliers.get(i); + readersOpened += ts.createdCount(); + assertTrue("source " + i + " has " + ts.openCount() + " reader(s) whose" + + " close() was never attempted despite a close failure elsewhere", + ts.allClosed()); + } + assertTrue("expected the compaction to have opened source readers", + readersOpened > 0); + } + /** * A {@link ReaderSupplier} that wraps a delegate and records every {@link RandomAccessReader} * it vends as a {@link TrackingReader}, so a test can assert they were all closed. * *

When {@code failCountdown} is non-null, the vended readers throw an * {@link IOException} from {@code read(float[], int, int)} once the countdown is - * exhausted by reads issued on executor worker threads — see {@link TrackingReader}. + * exhausted by reads issued on executor worker threads. When {@code failOnClose} is + * non-null and set, worker-thread-created readers throw an {@link IOException} from + * {@code close()}. See {@link TrackingReader}. */ private static final class TrackingReaderSupplier implements ReaderSupplier { private final ReaderSupplier delegate; private final Queue readers = new ConcurrentLinkedQueue<>(); private final AtomicInteger failCountdown; + private final AtomicBoolean failOnClose; - TrackingReaderSupplier(ReaderSupplier delegate, AtomicInteger failCountdown) { + TrackingReaderSupplier(ReaderSupplier delegate, AtomicInteger failCountdown, + AtomicBoolean failOnClose) { this.delegate = delegate; this.failCountdown = failCountdown; + this.failOnClose = failOnClose; } @Override public RandomAccessReader get() throws IOException { - TrackingReader r = new TrackingReader(delegate.get(), failCountdown); + TrackingReader r = new TrackingReader(delegate.get(), failCountdown, failOnClose); readers.add(r); return r; } @@ -1088,15 +1177,25 @@ boolean allClosed() { * {@link ForkJoinWorkerThread}, so the injected failure lands inside * {@code OnDiskGraphIndexCompactor.compactLevels()} (the only code that reads vectors * on executor worker threads) and never during the single-threaded setup phase. + * + *

When constructed with a non-null {@code failOnClose} that is set, {@link #close()} + * throws an {@link IOException} — but only for readers created on a worker thread (the + * {@code Scratch} readers), so the setup phase and the calling-thread entry-node path + * close cleanly. The delegate is always closed first, so no real resource leaks. */ private static final class TrackingReader implements RandomAccessReader { private final RandomAccessReader delegate; private final AtomicInteger failCountdown; + private final AtomicBoolean failOnClose; + private final boolean createdByWorkerThread; private volatile boolean closed; - TrackingReader(RandomAccessReader delegate, AtomicInteger failCountdown) { + TrackingReader(RandomAccessReader delegate, AtomicInteger failCountdown, + AtomicBoolean failOnClose) { this.delegate = delegate; this.failCountdown = failCountdown; + this.failOnClose = failOnClose; + this.createdByWorkerThread = Thread.currentThread() instanceof ForkJoinWorkerThread; } boolean isClosed() { @@ -1176,6 +1275,9 @@ public long length() throws IOException { public void close() throws IOException { closed = true; delegate.close(); + if (failOnClose != null && failOnClose.get() && createdByWorkerThread) { + throw new IOException("injected reader close failure"); + } } } }