Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
Licensed to Diennea S.r.l. under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. Diennea S.r.l. licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

*/

package herddb.indexing.vector;

import io.github.jbellis.jvector.disk.RandomAccessReader;
import io.github.jbellis.jvector.disk.ReaderSupplier;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* A {@link ReaderSupplier} wrapper that deletes a temporary file when the supplier
* is closed. Used by {@link SegmentPQReaderSupplier} to ensure that segment graph
* files downloaded to local disk for bulk PQ-retraining vector extraction
* (issue #599 Option B) are cleaned up after use.
*
* <p>Typical lifecycle:
* <ol>
* <li>Download the segment graph file to {@code tempFile} via
* {@link herddb.storage.DataStorageManager#downloadMultipartIndexFile}.</li>
* <li>Wrap the mmap-backed {@link ReaderSupplier} with a
* {@code DeleteOnCloseReaderSupplier(delegate, tempFile)}.</li>
* <li>Pass the wrapper to {@link io.github.jbellis.jvector.graph.disk.PQRetrainer}
* via the reader-supplier factory.</li>
* <li>After all vectors for that source are extracted, {@code PQRetrainer} closes
* the view ({@link RandomAccessReader#close()}) and then calls
* {@link #close()} on this supplier, which closes the mmap and deletes
* {@code tempFile}.</li>
* </ol>
*/
final class DeleteOnCloseReaderSupplier implements ReaderSupplier {

private static final Logger LOGGER = Logger.getLogger(DeleteOnCloseReaderSupplier.class.getName());

private final ReaderSupplier delegate;
private final Path tempFile;

/**
* @param delegate the mmap-backed (or otherwise) supplier to delegate reads to
* @param tempFile the temp file to delete when this supplier is closed
*/
DeleteOnCloseReaderSupplier(ReaderSupplier delegate, Path tempFile) {
this.delegate = delegate;
this.tempFile = tempFile;
}

@Override
public RandomAccessReader get() throws IOException {
return delegate.get();
}

/**
* Closes the backing {@link ReaderSupplier} (releasing any memory-mapped
* resources) and then deletes {@link #tempFile}.
*
* <p>Any {@link IOException} from {@code delegate.close()} is logged at
* {@code WARNING} level but does not suppress the file-deletion step.
* Any {@link IOException} from {@link Files#deleteIfExists} is logged at
* {@code WARNING} level; it is not re-thrown because by this point the
* data has been fully consumed and a residual temp file, while wasteful,
* is not a correctness error.
*/
@Override
public void close() throws IOException {
IOException delegateException = null;
try {
delegate.close();
} catch (IOException e) {
delegateException = e;
LOGGER.log(Level.WARNING, "Failed to close delegate reader supplier for temp file {0}", tempFile);
}
try {
Files.deleteIfExists(tempFile);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Failed to delete temp segment file {0}", tempFile);
}
if (delegateException != null) {
throw delegateException;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ static String encodeMultipartPath(String segUuid, String fileType) {
* that does not fit in {@code int}; their storage key is stored explicitly in
* {@link VectorSegment#externalStorageKey} during adoption.
*/
private String segmentStorageKey(VectorSegment seg) {
String segmentStorageKey(VectorSegment seg) {
return seg.externalStorageKey != null
? seg.externalStorageKey
: (indexUUID + "_seg" + seg.segmentId);
Expand Down Expand Up @@ -3405,6 +3405,24 @@ Path tmpDirectory() {
return tmpDirectory;
}

/**
* Package-private accessor for the tablespace UUID. Used by
* {@link SegmentPQReaderSupplier} to construct the correct storage key for
* downloading segment graph files during PQ retraining (issue #599).
*/
String tableSpaceUUID() {
return tableSpaceUUID;
}

/**
* Package-private accessor for the data storage manager. Used by
* {@link SegmentPQReaderSupplier} to download segment graph files for bulk
* sequential vector reads during PQ retraining (issue #599).
*/
herddb.storage.DataStorageManager dataStorageManager() {
return dataStorageManager;
}

SegmentWriteResult writeSyntheticShard(LiveGraphShard syntheticShard,
int segmentId,
int dim) throws IOException, DataStorageManagerException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
Licensed to Diennea S.r.l. under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. Diennea S.r.l. licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

*/

package herddb.indexing.vector;

import herddb.storage.DataStorageManager;
import herddb.storage.DataStorageManagerException;
import io.github.jbellis.jvector.disk.ReaderSupplier;
import io.github.jbellis.jvector.disk.ReaderSupplierFactory;
import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndex;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Factory for bulk sequential {@link ReaderSupplier} instances used during
* PQ retraining in streaming compaction (issue #599 Option B).
*
* <p>When {@link io.github.jbellis.jvector.graph.disk.PQRetrainer} calls
* {@link io.github.jbellis.jvector.graph.disk.OnDiskGraphIndex#getView} on each
* source segment, it normally goes through the segment's default
* {@link ReaderSupplier} — which in production is backed by the
* {@code SegmentBlockCache} / gRPC file-server stack. For N source segments and
* ~4 096 sampled nodes each, this serialises ~53 248 16 KiB block-cache round-trips.
*
* <p>This class returns a {@link Function}{@code <OnDiskGraphIndex, ReaderSupplier>}
* that, given a source graph, either:
* <ul>
* <li><b>downloads the graph file once</b> to a temp file in
* {@link PersistentVectorStore#tmpDirectory()} (= the IS data directory) via
* {@link DataStorageManager#downloadMultipartIndexFile} when the storage manager
* supports a direct object-storage path
* ({@link DataStorageManager#supportsDirectMultipartDownload()} is {@code true});
* or</li>
* <li>opens a <b>sequential reader</b> via
* {@link DataStorageManager#multipartIndexReaderSupplier} (the existing
* file-server / in-memory path) when direct download is not available.</li>
* </ul>
* Either way, {@link io.github.jbellis.jvector.graph.disk.PQRetrainer} reads all
* sampled vectors from the local file / in-memory buffer rather than issuing one
* gRPC round-trip per sampled node.
*
* <p>Temp files created for the direct-download path are wrapped in a
* {@link DeleteOnCloseReaderSupplier} so they are deleted as soon as the
* {@code PQRetrainer} closes the supplier after extracting that source's vectors.
*/
final class SegmentPQReaderSupplier {

private static final Logger LOGGER = Logger.getLogger(SegmentPQReaderSupplier.class.getName());

private SegmentPQReaderSupplier() {
}

/**
* Builds a reader-supplier factory for all source segments in a streaming
* compaction. The factory maps each {@link OnDiskGraphIndex} to a
* {@link ReaderSupplier} backed by a local file download or DSM sequential
* reader, enabling bulk sequential reads during PQ retraining.
*
* <p>The factory is safe to call from multiple threads concurrently (each
* invocation creates its own independent {@link ReaderSupplier} and, for the
* direct-download path, its own temp file).
*
* @param store the store that owns the segments; provides the storage
* manager, tablespace UUID, and download directory
* @param candidates the compaction input segments, positionally aligned with
* {@code sources}
* @param sources the corresponding {@link OnDiskGraphIndex} objects, in the
* same order as {@code candidates}
* @return a factory that maps each {@link OnDiskGraphIndex} to a
* {@link ReaderSupplier} for its graph file; never {@code null}
*/
static Function<OnDiskGraphIndex, ReaderSupplier> forSegments(
PersistentVectorStore store,
List<VectorSegment> candidates,
List<OnDiskGraphIndex> sources) {

if (candidates.size() != sources.size()) {
throw new IllegalArgumentException(
"SegmentPQReaderSupplier: candidates.size()=" + candidates.size()
+ " != sources.size()=" + sources.size()
+ "; both lists must be positionally aligned");
}

DataStorageManager dsm = store.dataStorageManager();
String tsUUID = store.tableSpaceUUID();
Path downloadDir = store.tmpDirectory();

// Build an identity-keyed map so that we can resolve VectorSegment from
// the OnDiskGraphIndex reference without relying on equals/hashCode
// (OnDiskGraphIndex does not override them).
IdentityHashMap<OnDiskGraphIndex, VectorSegment> odgToSeg = new IdentityHashMap<>();
for (int i = 0; i < sources.size(); i++) {
odgToSeg.put(sources.get(i), candidates.get(i));
}

return odg -> {
VectorSegment seg = odgToSeg.get(odg);
if (seg == null) {
throw new IllegalStateException(
"SegmentPQReaderSupplier: OnDiskGraphIndex not found in candidate set");
}
String segUuid = store.segmentStorageKey(seg);
long graphFileSize = seg.graphFileSize;

if (graphFileSize <= 0) {
throw new IllegalStateException(
"SegmentPQReaderSupplier: graphFileSize=" + graphFileSize
+ " for segment " + segUuid + " — segment graph has not been"
+ " persisted yet or was not populated correctly");
}

if (dsm.supportsDirectMultipartDownload()) {
// Fast path: download directly from object storage (S3/GCS/MinIO),
// bypassing the gRPC file-server. The temp file lives in the IS data
// directory (store.tmpDirectory()) — the same location as all other
// compaction scratch files (e.g. "herddb-vector-compact-graph-*.idx").
Path tempFile = null;
boolean success = false;
try {
tempFile = Files.createTempFile(downloadDir, "herddb-pq-seg-", ".idx");
dsm.downloadMultipartIndexFile(tsUUID, segUuid, "graph", graphFileSize, tempFile);
ReaderSupplier mmap = ReaderSupplierFactory.open(tempFile);
success = true;
return new DeleteOnCloseReaderSupplier(mmap, tempFile);
} catch (IOException e) {
throw new UncheckedIOException(
"SegmentPQReaderSupplier: failed to download segment graph for PQ retraining"
+ " (tsUUID=" + tsUUID + ", segUuid=" + segUuid + ")", e);
} catch (DataStorageManagerException e) {
throw new IllegalStateException(
"SegmentPQReaderSupplier: storage error while downloading segment graph for PQ retraining"
+ " (tsUUID=" + tsUUID + ", segUuid=" + segUuid + ")", e);
} finally {
if (!success) {
// Clean up the temp file on any failure path — the catch blocks
// above re-throw without deleting, so the finally is the sole
// cleanup point. deleteSilently is idempotent; if the file was
// never created (null) it is a no-op.
deleteSilently(tempFile);
}
}
} else {
// Fallback: sequential reader from the file server (or in-memory for
// MemoryDataStorageManager). Still avoids per-node block-cache reads
// because the whole file is served by a single ReaderSupplier whose
// get() returns a reader over the complete file content.
// The compactor (jvector OnDiskGraphIndexCompactor) calls close() on
// every factory-produced supplier after use, so no leak occurs here.
try {
return dsm.multipartIndexReaderSupplier(tsUUID, segUuid, "graph", graphFileSize);
} catch (DataStorageManagerException e) {
throw new IllegalStateException(
"SegmentPQReaderSupplier: failed to open multipart reader for PQ retraining"
+ " (tsUUID=" + tsUUID + ", segUuid=" + segUuid + ")", e);
}
}
};
}

private static void deleteSilently(Path tempFile) {
if (tempFile == null) {
return;
}
try {
Files.deleteIfExists(tempFile);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Failed to delete temp PQ segment file {0}", tempFile);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import herddb.indexing.vector.PersistentVectorStore.PendingDelete;
import herddb.storage.DataStorageManagerException;
import herddb.utils.Bytes;
import io.github.jbellis.jvector.disk.ReaderSupplier;
import io.github.jbellis.jvector.graph.GraphIndexBuilder;
import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndex;
import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndexCompactor;
Expand All @@ -43,6 +44,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -134,6 +136,20 @@ public static long getStreamingFallbackToLegacyTotal() {
return STREAMING_FALLBACK_TO_LEGACY_TOTAL.get();
}

/**
* Process-wide counter incremented each time {@link SegmentPQReaderSupplier}
* is invoked for a source segment during PQ retraining in
* {@link #rebuildSegmentStreaming} (issue #599 Option B).
*
* <p>Each invocation corresponds to one segment whose graph file was either
* downloaded directly from object storage or opened via the DSM's sequential
* reader, bypassing the per-node block-cache path.
*
* <p>Tests may read this counter to verify that the bulk-reader path was
* exercised. Reset between tests if needed.
*/
static final AtomicInteger PQ_BULK_READER_COUNT = new AtomicInteger(0);

/** Reasons a compaction run can fail; carried through to metrics. */
enum FailureReason {
READ_IO,
Expand Down Expand Up @@ -1122,12 +1138,28 @@ private static RebuildResult rebuildSegmentStreaming(
// side-effect still matters.
long startNodeId = store.allocateCompactionNodeIds(keptCount);
try {
// Build a bulk reader-supplier factory so PQRetrainer downloads each source
// segment's graph file once (to IS data dir or DSM reader) instead of
// issuing one gRPC round-trip per sampled node (issue #599 Option B).
// The factory is tracked via PQ_BULK_READER_COUNT for test observability.
Function<OnDiskGraphIndex, ReaderSupplier> pqReaderFactory =
SegmentPQReaderSupplier.forSegments(store, candidates, sources);

// OnDiskGraphIndexCompactor is not AutoCloseable: it self-shuts-down
// a fork-join pool inside compact() iff it owns one. We hand in
// PhysicalCoreExecutor.pool() so the compactor never owns its executor.
// Pass the factory so resolvePQFromSources uses bulk reads (issue #599).
OnDiskGraphIndexCompactor compactor = new OnDiskGraphIndexCompactor(
sources, liveBitsets, mappers, store.compactionSimilarity(),
PhysicalCoreExecutor.pool());
PhysicalCoreExecutor.pool(),
odg -> {
// Build the reader first; only count it after the supplier is
// successfully obtained so the counter accurately reflects
// successful bulk-reader acquisitions (not just invocations).
ReaderSupplier supplier = pqReaderFactory.apply(odg);
PQ_BULK_READER_COUNT.incrementAndGet();
return supplier;
});
try {
compactor.compact(graphTemp);
} catch (java.io.FileNotFoundException | RuntimeException e) {
Expand Down
Loading
Loading