Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions herddb-core/src/main/java/herddb/storage/DataStorageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,63 @@ public abstract io.github.jbellis.jvector.disk.ReaderSupplier multipartIndexRead
public abstract void deleteMultipartIndexFile(String tableSpace, String uuid, String fileType)
throws DataStorageManagerException;

/**
 * Issue #617: best-effort presence check for a multipart index file.
 * Used by the operator-facing {@code DeleteSegment} RPC to refuse the
 * delete when the graph file IS reachable (warning the operator that
 * they may be targeting the wrong segment).
 *
 * <p>Default implementation opens the reader supplier with a sentinel
 * file size of 1 GiB (the real size is unknown here) and attempts a
 * single 4-byte {@code readInt()} at offset 0. Success → {@code true}.
 * A {@link DataStorageManagerException} while obtaining the supplier, or
 * an {@link IOException} or any {@link RuntimeException} while opening,
 * reading or closing the reader → {@code false}.
 * Backends that can answer the question more cheaply (e.g. an S3
 * {@code HEAD} request) should override.
 *
 * <p>The contract is intentionally "best-effort": callers MUST NOT
 * use this as a strong existence check, only as a safety gate for
 * operator-driven deletes. A {@code false} result combined with a
 * {@code --force} override is the supported way to delete a segment.
 *
 * @param tableSpace tablespace identifier passed through to
 *                   {@link #multipartIndexReaderSupplier}
 * @param uuid       identifier of the multipart index file set
 * @param fileType   which file of the set to probe (e.g. {@code "graph"})
 * @return {@code true} when the 4-byte probe succeeded, {@code false}
 *         on any failure handled by the probe
 */
public boolean multipartIndexFileExists(String tableSpace, String uuid, String fileType) {
    // We do not know the real file size here; pass a sentinel value
    // larger than any plausible block-0 footprint plus 4 bytes for the
    // probe. Passing a too-small value (e.g. 1L) breaks the default
    // multipartIndexReaderSupplier readers because readFully then sees
    // available==0 once position reaches the fake totalSize and spins
    // (pr-reviewer follow-up #2). The probe only ever reads 4 bytes, so
    // the inflated size only affects the reader's end-of-file accounting.
    final long probeFileSize = 1L << 30;

    // NOTE(review): if the jvector version in use makes ReaderSupplier
    // closeable, the supplier should probably be closed as well — confirm.
    io.github.jbellis.jvector.disk.ReaderSupplier rs;
    try {
        rs = multipartIndexReaderSupplier(tableSpace, uuid, fileType, probeFileSize);
    } catch (DataStorageManagerException ignored) {
        // Best-effort contract: failing to obtain a supplier means "not present".
        return false;
    }

    // The reader throws IOException on a missing block-0. Some object-storage
    // backends (S3) surface NoSuchKey as an SdkException subclass, which is a
    // RuntimeException rather than an IOException (the issue #617 failure
    // mode), hence the broad catch. Any error during the probe is
    // intentionally treated as "file not present".
    try (io.github.jbellis.jvector.disk.RandomAccessReader r = rs.get()) {
        r.seek(0L);
        r.readInt();
        return true;
    } catch (IOException | RuntimeException ignored) {
        return false;
    }
}

/**
* Returns {@code true} when this storage manager supports bypassing the gRPC
* file-server round-trips for bulk segment-file downloads during recovery.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2261,6 +2261,208 @@ public void dropIndexImmediate(String table, String indexName, String requestedU
}
}

/**
 * Immutable value returned by
 * {@link #deleteSegment(String, String, String, boolean, boolean)}.
 * The fields mirror the wire fields of {@code DeleteSegmentResponse} so
 * the gRPC handler only has to copy them across.
 */
public static final class DeleteSegmentResult {
    /** Storage key of the segment the request targeted. */
    public final String segment;
    /** {@code true} when the segment was dropped from the in-memory store. */
    public final boolean removed;
    /** Vectors lost by the drop; {@code -1} on the "segment disappeared" race path. */
    public final long vectorsLost;
    /** Outcome of the best-effort graph-file presence probe. */
    public final boolean graphFilePresent;
    /** {@code true} when the segment's multipart files were deleted from storage. */
    public final boolean storagePurged;

    public DeleteSegmentResult(String segmentKey, boolean wasRemoved, long lostVectors,
            boolean graphReachable, boolean filesPurged) {
        segment = segmentKey;
        removed = wasRemoved;
        vectorsLost = lostVectors;
        graphFilePresent = graphReachable;
        storagePurged = filesPurged;
    }
}

/**
 * Signals that {@link #deleteSegment} rejected a request before touching
 * any state. Causes include: a required argument is missing, the index is
 * not loaded, the loaded store is not a persistent one, the segment is not
 * registered, or the segment's graph file is still reachable in remote
 * storage while {@code force == false}.
 */
public static final class DeleteSegmentException extends RuntimeException {
    /**
     * @param reason human-readable explanation of why the delete was rejected
     */
    public DeleteSegmentException(String reason) {
        super(reason);
    }
}

/**
 * Issue #617: operator remediation tool. Removes a single segment from
 * a {@link PersistentVectorStore}'s in-memory metadata, with optional
 * purging of the segment's multipart files in the underlying
 * {@link DataStorageManager}.
 *
 * <p>Refuses the deletion when the segment's graph file IS reachable
 * in remote storage and {@code force == false} — the most likely
 * explanation for a reachable graph file is that the operator is
 * targeting the wrong segment. The {@code --force} flag (and an
 * extra confirmation in the CLI) lets the operator override.
 *
 * <p>After a successful in-memory removal the method optionally purges
 * the {@code graph} and {@code map} multipart files and then forces a
 * checkpoint so shadow replicas observe the new (smaller) segment list.
 * Both follow-up steps are best-effort: their failures are logged and
 * never undo or mask the in-memory removal.
 *
 * @param table             table owning the index; must be non-empty
 * @param indexName         name of the vector index; must be non-empty
 * @param segmentStorageKey storage key of the segment to drop; must be non-empty
 * @param purgeStorage      when {@code true}, also delete the segment's
 *                          multipart files from the storage manager
 * @param force             when {@code true}, proceed even if the graph
 *                          file is still reachable
 * @return the outcome; {@code removed == false} with
 *         {@code vectorsLost == -1} when the segment vanished between the
 *         registration check and the drop
 * @throws DeleteSegmentException when the request cannot be satisfied
 *         without mutating state
 */
public DeleteSegmentResult deleteSegment(String table, String indexName, String segmentStorageKey,
        boolean purgeStorage, boolean force) {
    if (table == null || table.isEmpty()) {
        throw new DeleteSegmentException("table is required");
    }
    if (indexName == null || indexName.isEmpty()) {
        throw new DeleteSegmentException("index is required");
    }
    if (segmentStorageKey == null || segmentStorageKey.isEmpty()) {
        throw new DeleteSegmentException("segment is required");
    }
    AbstractVectorStore store = vectorStores.get(storeKey(table, indexName));
    if (store == null) {
        throw new DeleteSegmentException("index " + table + "." + indexName + " is not loaded");
    }
    if (!(store instanceof PersistentVectorStore)) {
        // pr-reviewer follow-up #6: a ReadOnlyVectorStore means we are
        // running as a shadow replica (or have loaded a snapshot in
        // read-only mode); the IS-level gate at IndexingServiceImpl
        // .deleteSegment normally short-circuits these RPCs before they
        // reach the engine, but we keep belt-and-braces here in case a
        // future caller bypasses the gRPC layer.
        if (store instanceof ReadOnlyVectorStore) {
            throw new DeleteSegmentException(
                    "index " + table + "." + indexName
                            + ": this instance is a shadow replica — target the primary"
                            + " indexing service");
        }
        throw new DeleteSegmentException(
                "index " + table + "." + indexName + " is non-persistent ("
                        + store.getClass().getSimpleName() + "); has no on-disk segments");
    }
    PersistentVectorStore pvs = (PersistentVectorStore) store;

    // Presence check (informational + safety gate).
    java.util.List<String> keys = pvs.getSegmentStorageKeysSnapshot();
    if (!keys.contains(segmentStorageKey)) {
        throw new DeleteSegmentException(
                "segment " + segmentStorageKey + " is not registered in index "
                        + table + "." + indexName + "; currently loaded segments: " + keys);
    }

    // MinIO HEAD-equivalent — best effort. We deliberately read the
    // tablespace UUID from the engine rather than from the request because
    // the IS is single-tablespace per instance.
    boolean graphPresent = false;
    if (dataStorageManager != null) {
        graphPresent = dataStorageManager.multipartIndexFileExists(
                tableSpaceUUID, segmentStorageKey, "graph");
    }
    if (graphPresent && !force) {
        throw new DeleteSegmentException(
                "refusing to delete segment " + segmentStorageKey
                        + ": graph file IS reachable in remote storage. "
                        + "Re-run with force=true if you are sure this is the right segment "
                        + "(see issue #617).");
    }

    // Audit-level log BEFORE the mutation so a crash mid-delete leaves a
    // forensic trace in the IS log.
    LOGGER.log(Level.SEVERE,
            "deleteSegment: operator-initiated removal of segment {0} from {1}.{2}"
                    + " (graph_file_present={3}, force={4}, purge_storage={5})"
                    + " — issue #617",
            new Object[]{segmentStorageKey, table, indexName,
                    graphPresent, force, purgeStorage});

    // pr-reviewer follow-up #1: test-only hook fired AFTER the engine has
    // taken its snapshot+probe but BEFORE the engine's own drop call. A
    // test can use this to simulate a concurrent compaction (or another
    // operator-driven drop) racing the engine. Read the volatile field once
    // so the null check and the call see the same value.
    Runnable preDropHook = preDropRaceHookForTests;
    if (preDropHook != null) {
        preDropHook.run();
    }

    AbstractVectorStore.SegmentDropResult drop = pvs.dropSegmentByStorageKey(segmentStorageKey);
    if (!drop.removed) {
        // Race: a concurrent compaction swap removed the segment between
        // our snapshot and the drop. Treat as a no-op — the operator's
        // intent (segment gone) has been satisfied, but we cannot compute
        // the vectors_lost count because the segment handle is no longer
        // accessible. Surface -1L per the proto contract so operators can
        // distinguish "removed 0 vectors" from "did not remove anything
        // and cannot tell what would have been lost"
        // (pr-reviewer follow-up #5).
        LOGGER.log(Level.WARNING,
                "deleteSegment: segment {0} disappeared between snapshot and drop"
                        + " (concurrent compaction swap?); reporting no-op with"
                        + " vectors_lost=-1 (race path)",
                segmentStorageKey);
        return new DeleteSegmentResult(segmentStorageKey, false, -1L, graphPresent, false);
    }

    boolean storagePurged = false;
    if (purgeStorage && dataStorageManager != null) {
        // Best-effort: failures are logged but do not undo the in-memory
        // removal. Each file is purged independently so a failure on the
        // graph file does not leave the map file behind unattempted. The
        // operator can re-run with purge_storage=true to retry.
        boolean graphPurged = purgeSegmentFileBestEffort(segmentStorageKey, "graph");
        boolean mapPurged = purgeSegmentFileBestEffort(segmentStorageKey, "map");
        storagePurged = graphPurged && mapPurged;
    }

    // Trigger a checkpoint so the new (smaller) segment list is
    // serialised to the on-disk IndexStatus AND the corresponding
    // IndexingServiceCheckpointState is republished to ZK. Without
    // the checkpoint a shadow reload would still see the deleted
    // segment in the IndexStatus and fail with "multipart file not
    // found" when it tries to mmap the purged map file. The
    // dropSegmentByStorageKey path marks the store dirty so the
    // checkpoint will actually serialise.
    //
    // forceCheckpointAndSaveWatermark also calls
    // publishCheckpointStateBestEffort internally, so shadows are
    // notified as part of the same write.
    try {
        forceCheckpointAndSaveWatermark();
    } catch (RuntimeException e) {
        // Checkpoint failure must not undo the in-memory removal —
        // a subsequent regular checkpoint (or another delete-segment
        // call) will eventually serialise the reduced segment list.
        // Logged at WARNING so operators can correlate with the
        // SEVERE audit line emitted above.
        LOGGER.log(Level.WARNING,
                "deleteSegment: post-delete checkpoint failed; shadows may"
                        + " not observe the new segment count until the"
                        + " next regular checkpoint",
                e);
    }

    return new DeleteSegmentResult(
            segmentStorageKey, true, drop.vectorsLost, graphPresent, storagePurged);
}

/**
 * Deletes one multipart file of an already-dropped segment, logging
 * instead of propagating any storage failure. Unchecked exceptions are
 * absorbed too, so a backend error cannot abort {@link #deleteSegment}
 * after the in-memory removal and before its post-delete checkpoint.
 *
 * @return {@code true} when the delete call returned normally
 */
private boolean purgeSegmentFileBestEffort(String segmentStorageKey, String fileType) {
    Exception failure;
    try {
        dataStorageManager.deleteMultipartIndexFile(tableSpaceUUID, segmentStorageKey, fileType);
        return true;
    } catch (herddb.storage.DataStorageManagerException e) {
        failure = e;
    } catch (RuntimeException e) {
        failure = e;
    }
    LOGGER.log(Level.WARNING,
            "deleteSegment: in-memory removal of " + segmentStorageKey
                    + " succeeded but multipart " + fileType + " file purge failed; "
                    + "operator may need to clean up storage manually",
            failure);
    return false;
}

private static boolean isDmlType(short type) {
return type == LogEntryType.INSERT
|| type == LogEntryType.UPDATE
Expand Down Expand Up @@ -3422,6 +3624,27 @@ void setWarmupPauseHookForTest(Runnable hook) {
this.warmupPauseHookForTest = hook;
}

/**
 * Test-only callback (pr-reviewer follow-up #1, issue #617) that
 * {@link #deleteSegment} runs once it has snapshotted the segment list
 * and probed the multipart graph file, but before it calls
 * {@link PersistentVectorStore#dropSegmentByStorageKey}. A test can drop
 * the same segment from inside the callback and then assert that
 * {@code deleteSegment} reports the "segment disappeared between
 * snapshot and drop" race path (removed=false, vectors_lost=-1).
 * Stays {@code null} unless a test installs a hook.
 */
private volatile Runnable preDropRaceHookForTests;

/**
 * Installs the pre-drop race hook, or clears it when {@code hook} is
 * {@code null}. Package-private for testing; production code never
 * calls this.
 */
void setPreDropRaceHookForTests(Runnable hook) {
    preDropRaceHookForTests = hook;
}

/**
* Body of the async warmup task: iterates the snapshot of persistent
* stores and calls {@link PersistentVectorStore#warmUpBlockCache} on each.
Expand Down Expand Up @@ -3790,6 +4013,33 @@ public long getShadowReloadCount() {
return shadowReloadCount.get();
}

/**
 * Test-only accessor for the number of segments held by the vector store
 * loaded for {@code table.indexName}. Works for both a
 * {@link PersistentVectorStore} (primary) and a
 * {@link herddb.indexing.vector.ReadOnlyVectorStore} (shadow).
 *
 * <p>Added in pr-reviewer follow-up #4 for issue #617 so the
 * {@code ShadowDeleteSegmentE2ETest.lateBootShadowObservesPostDeleteState}
 * case can assert that a shadow booted AFTER a primary-side delete
 * loads the smaller (post-delete) segment count, without having to
 * unwrap the vector store map directly.
 *
 * @return the segment count, or {@code -1} when no store is loaded for
 *         the index or the store is of any other type
 */
public int getSegmentCountForTest(String table, String indexName) {
    final AbstractVectorStore loaded = vectorStores.get(storeKey(table, indexName));
    // A null store fails both instanceof checks and keeps the -1 default.
    int segmentCount = -1;
    if (loaded instanceof PersistentVectorStore) {
        segmentCount = ((PersistentVectorStore) loaded).getSegmentCount();
    } else if (loaded instanceof ReadOnlyVectorStore) {
        segmentCount = ((ReadOnlyVectorStore) loaded).getSegmentCount();
    }
    return segmentCount;
}

/**
* Minimum {@code IndexStatus.generation} currently loaded across
* every vector store this engine holds. Used by the retention
Expand Down
Loading
Loading