Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,23 @@ public View getView() {
}
}

/**
* Opens a {@link View} backed by a caller-supplied {@link ReaderSupplier} instead of the
* index's own internal reader. This allows callers that have already downloaded or buffered
* the index file locally (e.g. for bulk vector extraction during PQ retraining) to avoid
* routing reads through the block cache.
*
* <p>The caller is responsible for closing the returned View (which closes the underlying
* reader) and for separately closing the {@code supplier} when it is no longer needed.
*
* @param supplier the reader supplier to use; {@code supplier.get()} is called exactly once
* @return a new View backed by the provided reader
* @throws IOException if {@code supplier.get()} throws
*/
public View getView(ReaderSupplier supplier) throws IOException {
return new View(supplier.get());
}

@Override
public double getAverageDegree(int level) {
var view = this.getView();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import java.nio.file.StandardOpenOption;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.IntStream;
import io.github.jbellis.jvector.disk.ReaderSupplier;
import io.github.jbellis.jvector.graph.*;
import io.github.jbellis.jvector.graph.disk.feature.Feature;
import io.github.jbellis.jvector.graph.disk.feature.FeatureId;
Expand Down Expand Up @@ -71,17 +73,47 @@ public final class OnDiskGraphIndexCompactor implements Accountable {
private final ForkJoinPool executor;
private final int taskWindowSize;
private final VectorSimilarityFunction similarityFunction;
/**
* Optional factory for bulk sequential readers used during PQ retraining.
* When non-null, {@link #resolvePQFromSources} passes it to {@link PQRetrainer}
* so that training-vector extraction avoids per-node block-cache round-trips
* (issue #599 Option B). {@code null} uses the default block-cache path.
*/
private final Function<OnDiskGraphIndex, ReaderSupplier> readerSupplierFactory;

/**
* Constructs a new OnDiskGraphIndexCompactor to merge multiple graph indexes.
* Initializes thread pool, validates inputs, and prepares metadata for compaction.
* Uses the default block-cache path for PQ retraining vector extraction.
*
* @see #OnDiskGraphIndexCompactor(List, List, List, VectorSimilarityFunction, ForkJoinPool, Function)
*/
public OnDiskGraphIndexCompactor(
List<OnDiskGraphIndex> sources,
List<FixedBitSet> liveNodes,
List<OrdinalMapper> remappers,
VectorSimilarityFunction similarityFunction,
ForkJoinPool executor) {
this(sources, liveNodes, remappers, similarityFunction, executor, null);
}

/**
* Constructs a new OnDiskGraphIndexCompactor to merge multiple graph indexes,
* with an optional reader-supplier factory for bulk PQ retraining I/O.
*
* @param readerSupplierFactory when non-null, called once per source segment
* during PQ retraining to obtain a
* {@link ReaderSupplier} backed by a pre-downloaded
* or locally-buffered copy of the index file; avoids
* per-node block-cache round-trips (issue #599
* Option B); pass {@code null} for the default path
*/
public OnDiskGraphIndexCompactor(
List<OnDiskGraphIndex> sources,
List<FixedBitSet> liveNodes,
List<OrdinalMapper> remappers,
VectorSimilarityFunction similarityFunction,
ForkJoinPool executor,
Function<OnDiskGraphIndex, ReaderSupplier> readerSupplierFactory) {
checkBeforeCompact(sources, liveNodes, remappers);

int threads = Runtime.getRuntime().availableProcessors();
Expand Down Expand Up @@ -112,6 +144,7 @@ public OnDiskGraphIndexCompactor(
maxOrdinal = max(mapper.maxOrdinal(), maxOrdinal);
}
this.similarityFunction = similarityFunction;
this.readerSupplierFactory = readerSupplierFactory;
}

/**
Expand Down Expand Up @@ -971,7 +1004,9 @@ private List<CommonHeader.LayerInfo> computeLayerInfoFromSources() {
* indexes. This ensures the PQ is optimized for the combined dataset.
*/
private ProductQuantization resolvePQFromSources(VectorSimilarityFunction similarityFunction) {
PQRetrainer retrainer = new PQRetrainer(sources, liveNodes, dimension);
// Pass the reader-supplier factory so PQRetrainer can open bulk sequential readers
// instead of per-node block-cache reads when remote storage is involved (issue #599).
PQRetrainer retrainer = new PQRetrainer(sources, liveNodes, dimension, readerSupplierFactory);
return retrainer.retrain(similarityFunction);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.github.jbellis.jvector.graph.disk;

import io.github.jbellis.jvector.disk.ReaderSupplier;
import io.github.jbellis.jvector.graph.ListRandomAccessVectorValues;
import io.github.jbellis.jvector.graph.disk.feature.FeatureId;
import io.github.jbellis.jvector.graph.disk.feature.FusedPQ;
Expand All @@ -42,6 +43,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
* Handles Product Quantization retraining for graph index compaction.
Expand Down Expand Up @@ -74,11 +76,44 @@ public class PQRetrainer {
private final List<Integer> numLiveNodesPerSource;
private final int dimension;
private final int numTotalNodes;
/**
* Optional factory that, given a source {@link OnDiskGraphIndex}, returns a
* {@link ReaderSupplier} backed by a pre-downloaded or locally-buffered copy of
* the index file. When non-null, {@link #extractVectorsSequential} uses it to
* open Views via {@link OnDiskGraphIndex#getView(ReaderSupplier)} instead of the
* default {@link OnDiskGraphIndex#getView()}, avoiding per-node block-cache reads
* over remote storage (issue #599 Option B).
*
* <p>The factory may throw {@link RuntimeException} (including
* {@link java.io.UncheckedIOException}) on failure; such exceptions surface as
* the existing {@code RuntimeException("PQ retraining vector extraction failed")}
* wrapper.
*/
private final Function<OnDiskGraphIndex, ReaderSupplier> readerSupplierFactory;

/**
* Constructs a {@code PQRetrainer} without a custom reader-supplier factory.
* Vector extraction uses the default {@link OnDiskGraphIndex#getView()}, which
* routes reads through the block cache.
*/
public PQRetrainer(List<OnDiskGraphIndex> sources, List<FixedBitSet> liveNodes, int dimension) {
this(sources, liveNodes, dimension, null);
}

/**
* Constructs a {@code PQRetrainer} with an optional reader-supplier factory.
*
* @param readerSupplierFactory when non-null, called once per source segment to
* obtain a {@link ReaderSupplier} for bulk sequential
* reads; the returned supplier is closed after all
* vectors for that source are extracted
*/
public PQRetrainer(List<OnDiskGraphIndex> sources, List<FixedBitSet> liveNodes, int dimension,
Function<OnDiskGraphIndex, ReaderSupplier> readerSupplierFactory) {
this.sources = sources;
this.liveNodes = liveNodes;
this.dimension = dimension;
this.readerSupplierFactory = readerSupplierFactory;

this.numLiveNodesPerSource = new ArrayList<>(sources.size());
int total = 0;
Expand Down Expand Up @@ -270,8 +305,30 @@ private List<VectorFloat<?>> extractVectorsSequential(List<SampleRef> samples) {
futures.add(pool.submit(() -> {
// One View — and one RandomAccessReader — per task. Never
// shared across threads, so concurrent extraction is safe.
OnDiskGraphIndex.View view =
(OnDiskGraphIndex.View) sources.get(source).getView();
//
// When a readerSupplierFactory is present, use it to open a View
// backed by a pre-downloaded or locally-buffered copy of the index
// file instead of the default block-cache reader. This eliminates
// per-node remote-storage round-trips (issue #599 Option B).
final OnDiskGraphIndex odg = sources.get(source);
final ReaderSupplier supplierForSource;
final OnDiskGraphIndex.View view;
if (readerSupplierFactory != null) {
supplierForSource = readerSupplierFactory.apply(odg);
try {
view = odg.getView(supplierForSource);
} catch (IOException e) {
try {
supplierForSource.close();
} catch (IOException suppressed) {
e.addSuppressed(suppressed);
}
throw new java.io.UncheckedIOException(e);
}
} else {
supplierForSource = null;
view = (OnDiskGraphIndex.View) odg.getView();
}
try {
VectorFloat<?> scratch = vectorTypeSupport.createFloatVector(dimension);
for (SampleRef ref : group) {
Expand All @@ -290,6 +347,14 @@ private List<VectorFloat<?>> extractVectorsSequential(List<SampleRef> samples) {
log.warn("Failed to close source {} view during PQ retraining",
source, ioe);
}
if (supplierForSource != null) {
try {
supplierForSource.close();
} catch (IOException ioe) {
log.warn("Failed to close bulk reader supplier for source {} during PQ retraining",
source, ioe);
}
}
}
}));
}
Expand Down
Loading
Loading