Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
Expand Down Expand Up @@ -290,7 +291,14 @@ private int[] resolveEntryNodeSource() {
// maxLevel source; fall back to any live node that is at maxLevel.
for (int s = 0; s < sources.size(); s++) {
if (sources.get(s).getMaxLevel() == maxLevel) {
int originalEntry = sources.get(s).getView().entryNode().node;
// try-with-resources: getView() opens a fresh RandomAccessReader that must
// be closed, otherwise it leaks for the lifetime of the underlying storage.
int originalEntry;
try (var entryView = sources.get(s).getView()) {
originalEntry = entryView.entryNode().node;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
if (liveNodes.get(s).get(originalEntry)) {
return new int[]{s, originalEntry};
}
Expand Down Expand Up @@ -336,97 +344,207 @@ private void compactLevels(CompactWriter writer,
int upperMaxCandidateSize = upperMaxPerSourceTopK * sources.size();
int maxCandidateSize = Math.max(baseMaxCandidateSize, upperMaxCandidateSize);
int scratchDegree = Math.max(maxDegrees.get(0), Math.max(1, maxUpperDegree));
final ThreadLocal<Scratch> threadLocalScratch = ThreadLocal.withInitial(() ->
new Scratch(maxCandidateSize, scratchDegree, dimension, sources, pq)
);

for (int level = 0; level < maxDegrees.size(); level++) {
List<BatchSpec> batches = buildBatches(level);
int searchTopK = Math.max(MIN_SEARCH_TOP_K, ((maxDegrees.get(level) + sources.size() - 1) / sources.size()) * SEARCH_TOP_K_MULTIPLIER);
int beamWidth = Math.max(maxDegrees.get(level), searchTopK) * BEAM_WIDTH_MULTIPLIER;

CompactionParams params = new CompactionParams(fusedPQEnabled, compressedPrecision, searchTopK, beamWidth, pq);
// Batch tasks run on `executor`, whose worker threads each lazily create their own
// Scratch via this ThreadLocal. compactLevels() cannot reach those worker-thread
// ThreadLocal entries with threadLocalScratch.get()/remove(), so every Scratch is
// registered in `allScratch` and closed in the finally block below. Closing only the
// calling thread's Scratch leaks every worker thread's GraphSearchers — and the
// RandomAccessReaders behind their Views — for the lifetime of the executor.
final Queue<Scratch> allScratch = new ConcurrentLinkedQueue<>();
final ThreadLocal<Scratch> threadLocalScratch = ThreadLocal.withInitial(() -> {
Scratch scratch = new Scratch(maxCandidateSize, scratchDegree, dimension, sources, pq);
allScratch.add(scratch);
return scratch;
});
// Every batch task submitted to `executor` is tracked here so the finally block can
// quiesce the executor (cancel + await) before draining `allScratch`. Without this
// barrier, when compaction fails an in-flight task that has not yet run the
// ThreadLocal supplier could register a fresh Scratch AFTER the drain — leaking it.
final Queue<Future<?>> submittedFutures = new ConcurrentLinkedQueue<>();

boolean compactionSucceeded = false;
try {
for (int level = 0; level < maxDegrees.size(); level++) {
List<BatchSpec> batches = buildBatches(level);
int searchTopK = Math.max(MIN_SEARCH_TOP_K, ((maxDegrees.get(level) + sources.size() - 1) / sources.size()) * SEARCH_TOP_K_MULTIPLIER);
int beamWidth = Math.max(maxDegrees.get(level), searchTopK) * BEAM_WIDTH_MULTIPLIER;

CompactionParams params = new CompactionParams(fusedPQEnabled, compressedPrecision, searchTopK, beamWidth, pq);

if (level == 0) {
log.info("Compacting level 0 (base layer)");

ExecutorCompletionService<List<WriteResult>> ecs =
new ExecutorCompletionService<>(executor);

java.util.function.Consumer<BatchSpec> submitOne = (bs) -> {
submittedFutures.add(ecs.submit(() -> {
Scratch scratch = threadLocalScratch.get();
return computeBaseBatch(writer, bs, scratch, params);
}));
};

var wropts = EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.READ);
try (FileChannel fc = FileChannel.open(writer.getOutputPath(), wropts)) {

runBatchesWithBackpressure(
batches,
ecs,
submitOne,
(results) -> {
try {
for (WriteResult r : results) {
ByteBuffer b = r.data;
long pos = r.fileOffset;
while (b.hasRemaining()) {
int n = fc.write(b, pos);
pos += n;
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
},
progressListener
);
}

if (level == 0) {
log.info("Compacting level 0 (base layer)");
writer.offsetAfterInline();

ExecutorCompletionService<List<WriteResult>> ecs =
new ExecutorCompletionService<>(executor);
} else {
final int lvl = level;
log.info("Compacting upper layer {}", level);

java.util.function.Consumer<BatchSpec> submitOne = (bs) -> {
ecs.submit(() -> {
Scratch scratch = threadLocalScratch.get();
return computeBaseBatch(writer, bs, scratch, params);
});
};
ExecutorCompletionService<List<UpperLayerWriteResult>> ecs =
new ExecutorCompletionService<>(executor);

var wropts = EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.READ);
try (FileChannel fc = FileChannel.open(writer.getOutputPath(), wropts)) {
java.util.function.Consumer<BatchSpec> submitOne = (bs) -> {
submittedFutures.add(ecs.submit(() -> {
Scratch scratch = threadLocalScratch.get();
return computeUpperBatchForLevel(bs, lvl, scratch, params);
}));
};

runBatchesWithBackpressure(
batches,
ecs,
submitOne,
(results) -> {
try {
for (WriteResult r : results) {
ByteBuffer b = r.data;
long pos = r.fileOffset;
while (b.hasRemaining()) {
int n = fc.write(b, pos);
pos += n;
}
for (UpperLayerWriteResult r : results) {
writer.writeUpperLayerNode(
lvl,
r.ordinal,
r.neighbors,
r.pqCode
);
}
} catch (IOException e) {
throw new RuntimeException(e);
throw new UncheckedIOException(e);
}
},
progressListener
);
}

writer.offsetAfterInline();

}
compactionSucceeded = true;
} finally {
// Quiesce the executor BEFORE draining allScratch: until every submitted task
// has terminated, an in-flight task can still run the threadLocalScratch
// supplier and register a fresh Scratch that the drain below would miss.
awaitAllTasks(submittedFutures);
// Drop this thread's ThreadLocal entry; worker-thread entries are unreachable
// here but their Scratch instances are all registered in `allScratch`.
threadLocalScratch.remove();
if (compactionSucceeded) {
// No primary exception in flight, so a close failure can be surfaced safely.
closeAllScratch(allScratch);
} else {
final int lvl = level;
log.info("Compacting upper layer {}", level);

ExecutorCompletionService<List<UpperLayerWriteResult>> ecs =
new ExecutorCompletionService<>(executor);
// A primary exception is already propagating; close best-effort so a
// secondary close failure does not mask the original cause.
closeAllScratchQuietly(allScratch);
}
}
}

java.util.function.Consumer<BatchSpec> submitOne = (bs) -> {
ecs.submit(() -> {
Scratch scratch = threadLocalScratch.get();
return computeUpperBatchForLevel(bs, lvl, scratch, params);
});
};
/**
 * Waits until every batch task in {@code submittedFutures} has terminated, draining the
 * queue as it goes. This MUST run before {@code allScratch} is drained: until a submitted
 * task has terminated it can still execute the {@code threadLocalScratch} supplier and
 * register a fresh Scratch, or still be using a Scratch it obtained earlier.
 *
 * <p>Tasks are deliberately NOT cancelled before being awaited. For the
 * {@code FutureTask}-based futures handed out by the JDK executors, {@code get()} on a
 * cancelled future throws {@link CancellationException} immediately, even while the task
 * body is still executing on its worker thread. A cancel-then-get sequence is therefore
 * not a termination barrier. Plain {@code get()} is: it returns (or throws
 * {@link ExecutionException}) only after the task body has finished.
 *
 * <p>On the normal-completion path every task is already done, so each {@code get()}
 * returns immediately. On the failure path this waits for the tasks that are still in
 * flight to run to completion.
 *
 * <p>If the calling thread is interrupted while waiting (or arrives here already
 * interrupted), the wait is abandoned: the remaining futures are cancelled without
 * blocking and the interrupt status is restored before returning. In that case the
 * termination guarantee above no longer holds for tasks that were still running.
 *
 * @param submittedFutures futures of every task handed to the executor; emptied on return
 */
private static void awaitAllTasks(Queue<Future<?>> submittedFutures) {
    boolean interrupted = false;
    Future<?> f;
    while ((f = submittedFutures.poll()) != null) {
        if (interrupted) {
            // The caller asked us to stop blocking: ask the rest to stop, best-effort.
            f.cancel(true);
            continue;
        }
        try {
            // Block until the task body has really finished; do not cancel first, or
            // get() would return before a running task has left its Scratch alone.
            f.get();
        } catch (CancellationException | ExecutionException ignored) {
            // Only termination matters here; the task's outcome is not inspected.
        } catch (InterruptedException e) {
            interrupted = true;
            // Give up waiting on this one too, but still ask it to stop.
            f.cancel(true);
        }
    }
    if (interrupted) {
        // Preserve the interrupt for the caller instead of swallowing it.
        Thread.currentThread().interrupt();
    }
}

runBatchesWithBackpressure(
batches,
ecs,
submitOne,
(results) -> {
try {
for (UpperLayerWriteResult r : results) {
writer.writeUpperLayerNode(
lvl,
r.ordinal,
r.neighbors,
r.pqCode
);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
},
progressListener
);
/**
 * Drains {@code allScratch} and closes each {@link Scratch} taken from it, whichever
 * thread created it. Every entry is closed even when an earlier one fails: the first
 * {@link IOException} or {@link RuntimeException} is remembered and rethrown once the
 * queue is empty, with any later failures attached to it as suppressed exceptions.
 *
 * @param allScratch queue of Scratch instances to close; emptied on return
 * @throws IOException if the first close failure was an {@code IOException}
 */
private static void closeAllScratch(Queue<Scratch> allScratch) throws IOException {
    Throwable primary = null;
    for (Scratch next = allScratch.poll(); next != null; next = allScratch.poll()) {
        try {
            next.close();
        } catch (IOException | RuntimeException failure) {
            // Keep going: stopping here would leave the remaining entries unclosed.
            if (primary != null) {
                primary.addSuppressed(failure);
            } else {
                primary = failure;
            }
        }
    }
    if (primary == null) {
        return;
    }
    if (primary instanceof IOException) {
        throw (IOException) primary;
    }
    if (primary instanceof RuntimeException) {
        throw (RuntimeException) primary;
    }
}

Scratch s = threadLocalScratch.get();
s.close();
threadLocalScratch.remove();
/**
 * Non-throwing counterpart of {@link #closeAllScratch}: drains {@code allScratch} and
 * closes each entry, logging any {@link IOException} or {@link RuntimeException} at
 * warn level instead of propagating it. Intended for the path where another exception
 * is already propagating and must not be masked by a close failure.
 *
 * @param allScratch queue of Scratch instances to close; emptied on return
 */
private static void closeAllScratchQuietly(Queue<Scratch> allScratch) {
    while (true) {
        Scratch pending = allScratch.poll();
        if (pending == null) {
            return;
        }
        try {
            pending.close();
        } catch (IOException | RuntimeException closeFailure) {
            // Log and continue so one bad close does not stop the remaining ones.
            log.warn("Failed to close compaction Scratch space", closeFailure);
        }
    }
}

/**
Expand Down Expand Up @@ -1119,19 +1237,62 @@ private static final class Scratch implements AutoCloseable {
this.pqCode = (pq == null) ? null : vectorTypeSupport.createByteSequence(pq.getSubspaceCount());

this.gs = new GraphSearcher[sources.size()];
for (int i = 0; i < sources.size(); i++) {
gs[i] = new GraphSearcher(sources.get(i));
gs[i].usePruning(false);
// Each GraphSearcher opens a View (hence a RandomAccessReader) on its source.
// If a later source's getView() throws, close the searchers already created so
// their readers are not leaked — this Scratch is never returned to the caller,
// so it would otherwise never be registered for the compactor's drain.
boolean initialised = false;
try {
for (int i = 0; i < sources.size(); i++) {
gs[i] = new GraphSearcher(sources.get(i));
gs[i].usePruning(false);
}
initialised = true;
} finally {
if (!initialised) {
for (GraphSearcher s : gs) {
if (s != null) {
try {
s.close();
} catch (IOException | RuntimeException ignored) {
// best-effort cleanup; the original failure propagates
}
}
}
}
}
}

/**
 * Closes all graph searchers and resets the cache. Every non-null searcher is closed
 * exactly once, even if an earlier one throws, so a single failing
 * {@code GraphSearcher.close()} cannot leave the remaining searchers open. The first
 * failure is rethrown after the cache has been reset, with any later failures attached
 * as suppressed exceptions.
 *
 * @throws IOException if the first close failure was an {@code IOException}
 */
@Override
public void close() throws IOException {
    Throwable firstError = null;
    for (GraphSearcher s : gs) {
        // Slots may be null; skip them rather than fail with a NullPointerException.
        if (s == null) {
            continue;
        }
        try {
            s.close();
        } catch (IOException | RuntimeException e) {
            if (firstError == null) {
                firstError = e;
            } else {
                firstError.addSuppressed(e);
            }
        }
    }
    // Reset the cache before rethrowing so it happens on the failure path too.
    selectedCache.reset();
    if (firstError instanceof IOException) {
        throw (IOException) firstError;
    } else if (firstError instanceof RuntimeException) {
        throw (RuntimeException) firstError;
    }
}
}

Expand Down
Loading
Loading