diff --git a/herddb-core/src/main/java/herddb/storage/DataStorageManager.java b/herddb-core/src/main/java/herddb/storage/DataStorageManager.java
index 8ff6657c6..a03818605 100644
--- a/herddb-core/src/main/java/herddb/storage/DataStorageManager.java
+++ b/herddb-core/src/main/java/herddb/storage/DataStorageManager.java
@@ -267,6 +267,63 @@ public abstract io.github.jbellis.jvector.disk.ReaderSupplier multipartIndexRead
public abstract void deleteMultipartIndexFile(String tableSpace, String uuid, String fileType)
throws DataStorageManagerException;
+ /**
+ * Issue #617: best-effort presence check for a multipart index file.
+ * Used by the operator-facing {@code DeleteSegment} RPC to refuse the
+ * delete when the graph file IS reachable (warning the operator that
+ * they may be targeting the wrong segment).
+ *
+ *
Default implementation opens the reader supplier with a 1-byte
+ * fictitious file size and attempts a single 1-byte read at offset 0.
+ * Success → {@code true}; an {@link IOException} or
+ * {@link DataStorageManagerException} on either step → {@code false}.
+ * Backends that can answer the question more cheaply (e.g. an S3
+ * {@code HEAD} request) should override.
+ *
+ *
The contract is intentionally "best-effort": callers MUST NOT
+ * use this as a strong existence check, only as a safety gate for
+ * operator-driven deletes. A {@code false} result combined with a
+ * {@code --force} override is the supported way to delete a segment.
+ */
+ public boolean multipartIndexFileExists(String tableSpace, String uuid, String fileType) {
+ // We do not know the real file size here; pass a sentinel value
+ // larger than any plausible block-0 footprint plus 4 bytes for the
+ // probe. Passing a too-small value (e.g. 1L) breaks the default
+ // multipartIndexReaderSupplier readers because readFully then sees
+ // available==0 once position reaches the fake totalSize and spins
+ // (pr-reviewer follow-up #2). Backends that can answer the question
+ // more cheaply (e.g. an S3 HEAD request, or a wire-level
+ // readFileRange) should override this method outright —
+ // RemoteFileDataStorageManager does exactly that.
+ io.github.jbellis.jvector.disk.ReaderSupplier rs;
+ try {
+ // 1 GiB is far larger than any real block-0 in production; the
+ // probe still only ever reads 4 bytes, so the inflated fileSize
+ // only affects the reader's internal end-of-file accounting and
+ // never causes the backend to fetch more bytes than necessary.
+ rs = multipartIndexReaderSupplier(tableSpace, uuid, fileType,
+ 1L << 30);
+ } catch (DataStorageManagerException e) {
+ return false;
+ }
+ try (io.github.jbellis.jvector.disk.RandomAccessReader r = rs.get()) {
+ r.seek(0L);
+ r.readInt();
+ return true;
+ // The reader will throw IOException on missing block-0; for backends
+ // that surface S3 NoSuchKey as an unchecked exception (the issue #617
+ // failure mode), the broader RuntimeException catch below absorbs it.
+ // Both are treated as "file not present" — see the method contract.
+ } catch (IOException | RuntimeException e) {
+ // Broad RuntimeException catch is necessary because some object-
+ // storage backends (S3) wrap NoSuchKeyException in a SdkException
+ // subclass that is itself a RuntimeException, not an IOException.
+ // We intentionally treat any error during the probe as "missing",
+ // consistent with the best-effort contract documented above.
+ return false;
+ }
+ }
+
/**
* Returns {@code true} when this storage manager supports bypassing the gRPC
* file-server round-trips for bulk segment-file downloads during recovery.
diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java
index 8cc000db7..e3ae46148 100644
--- a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java
+++ b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java
@@ -2261,6 +2261,208 @@ public void dropIndexImmediate(String table, String indexName, String requestedU
}
}
+ /**
+ * Outcome of {@link #deleteSegment(String, String, String, boolean, boolean)}.
+ * Mirrors the wire fields of {@code DeleteSegmentResponse} so the gRPC
+ * handler is a thin translation layer.
+ */
+ public static final class DeleteSegmentResult {
+ public final String segment;
+ public final boolean removed;
+ public final long vectorsLost;
+ public final boolean graphFilePresent;
+ public final boolean storagePurged;
+
+ public DeleteSegmentResult(String segment, boolean removed, long vectorsLost,
+ boolean graphFilePresent, boolean storagePurged) {
+ this.segment = segment;
+ this.removed = removed;
+ this.vectorsLost = vectorsLost;
+ this.graphFilePresent = graphFilePresent;
+ this.storagePurged = storagePurged;
+ }
+ }
+
+ /**
+ * Thrown by {@link #deleteSegment} when the request must be rejected
+ * without mutating state: the index or store is not loaded, the
+ * segment is not registered, or the segment's graph file is still
+ * present in remote storage and {@code force == false}.
+ */
+ public static final class DeleteSegmentException extends RuntimeException {
+ public DeleteSegmentException(String message) {
+ super(message);
+ }
+ }
+
+ /**
+ * Issue #617: operator remediation tool. Removes a single segment from
+ * a {@link PersistentVectorStore}'s in-memory metadata, with optional
+ * purging of the segment's multipart files in the underlying
+ * {@link DataStorageManager}.
+ *
+ *
Refuses the deletion when the segment's graph file IS reachable
+ * in remote storage and {@code force == false} — the most likely
+ * explanation for a reachable graph file is that the operator is
+ * targeting the wrong segment. The {@code --force} flag (and an
+ * extra confirmation in the CLI) lets the operator override.
+ *
+ *
On a successful in-memory removal, this method re-publishes the
+ * current {@link IndexingServiceCheckpointState} so shadow replicas
+ * observe the new (smaller) segment count on their next reload. The
+ * re-publish is best-effort — if it fails the next regular checkpoint
+ * will still carry the updated segment list, so shadows converge.
+ *
+ * @throws DeleteSegmentException when the request cannot be satisfied
+ * without mutating state
+ */
+ public DeleteSegmentResult deleteSegment(String table, String indexName, String segmentStorageKey,
+ boolean purgeStorage, boolean force) {
+ if (table == null || table.isEmpty()) {
+ throw new DeleteSegmentException("table is required");
+ }
+ if (indexName == null || indexName.isEmpty()) {
+ throw new DeleteSegmentException("index is required");
+ }
+ if (segmentStorageKey == null || segmentStorageKey.isEmpty()) {
+ throw new DeleteSegmentException("segment is required");
+ }
+ AbstractVectorStore store = vectorStores.get(storeKey(table, indexName));
+ if (store == null) {
+ throw new DeleteSegmentException("index " + table + "." + indexName + " is not loaded");
+ }
+ if (!(store instanceof PersistentVectorStore)) {
+ // pr-reviewer follow-up #6: a ReadOnlyVectorStore means we are
+ // running as a shadow replica (or have loaded a snapshot in
+ // read-only mode); the IS-level gate at IndexingServiceImpl
+ // .deleteSegment normally short-circuits these RPCs before they
+ // reach the engine, but we keep belt-and-braces here in case a
+ // future caller bypasses the gRPC layer.
+ if (store instanceof ReadOnlyVectorStore) {
+ throw new DeleteSegmentException(
+ "index " + table + "." + indexName
+ + ": this instance is a shadow replica — target the primary"
+ + " indexing service");
+ }
+ throw new DeleteSegmentException(
+ "index " + table + "." + indexName + " is non-persistent ("
+ + store.getClass().getSimpleName() + "); has no on-disk segments");
+ }
+ PersistentVectorStore pvs = (PersistentVectorStore) store;
+
+ // Presence check (informational + safety gate).
+ java.util.List keys = pvs.getSegmentStorageKeysSnapshot();
+ if (!keys.contains(segmentStorageKey)) {
+ throw new DeleteSegmentException(
+ "segment " + segmentStorageKey + " is not registered in index "
+ + table + "." + indexName + "; currently loaded segments: " + keys);
+ }
+
+ // MinIO HEAD-equivalent — best effort. We deliberately read the
+ // tablespace UUID from the engine rather than from the request because
+ // the IS is single-tablespace per instance.
+ boolean graphPresent = false;
+ if (dataStorageManager != null) {
+ graphPresent = dataStorageManager.multipartIndexFileExists(
+ tableSpaceUUID, segmentStorageKey, "graph");
+ }
+ if (graphPresent && !force) {
+ throw new DeleteSegmentException(
+ "refusing to delete segment " + segmentStorageKey
+ + ": graph file IS reachable in remote storage. "
+ + "Re-run with force=true if you are sure this is the right segment "
+ + "(see issue #617).");
+ }
+
+ // Audit-level log BEFORE the mutation so a crash mid-delete leaves a
+ // forensic trace in the IS log.
+ LOGGER.log(Level.SEVERE,
+ "deleteSegment: operator-initiated removal of segment {0} from {1}.{2}"
+ + " (graph_file_present={3}, force={4}, purge_storage={5})"
+ + " — issue #617",
+ new Object[]{segmentStorageKey, table, indexName,
+ graphPresent, force, purgeStorage});
+
+ // pr-reviewer follow-up #1: test-only hook fired AFTER the engine has
+ // taken its snapshot+probe but BEFORE the engine's own drop call. A
+ // test can use this to simulate a concurrent compaction (or another
+ // operator-driven drop) racing the engine, and then assert that the
+ // engine reports the resulting "segment disappeared" race path
+ // correctly (removed=false, vectors_lost=-1). Strictly test-only,
+ // package-private — production callers never set this.
+ Runnable preDropHook = preDropRaceHookForTests;
+ if (preDropHook != null) {
+ preDropHook.run();
+ }
+
+ AbstractVectorStore.SegmentDropResult drop = pvs.dropSegmentByStorageKey(segmentStorageKey);
+ if (!drop.removed) {
+ // Race: a concurrent compaction swap removed the segment between
+ // our snapshot and the drop. Treat as a no-op — the operator's
+ // intent (segment gone) has been satisfied, but we cannot compute
+ // the vectors_lost count because the segment handle is no longer
+ // accessible. Surface -1L per the proto contract so operators can
+ // distinguish "removed 0 vectors" from "did not remove anything
+ // and cannot tell what would have been lost"
+ // (pr-reviewer follow-up #5).
+ LOGGER.log(Level.WARNING,
+ "deleteSegment: segment {0} disappeared between snapshot and drop"
+ + " (concurrent compaction swap?); reporting no-op with"
+ + " vectors_lost=-1 (race path)",
+ segmentStorageKey);
+ return new DeleteSegmentResult(segmentStorageKey, false, -1L, graphPresent, false);
+ }
+
+ boolean storagePurged = false;
+ if (purgeStorage && dataStorageManager != null) {
+ // Best-effort: failures are logged but do not undo the in-memory
+ // removal. The operator can re-run with purge_storage=true to
+ // retry the file deletion if needed (the underlying call is
+ // idempotent).
+ try {
+ dataStorageManager.deleteMultipartIndexFile(tableSpaceUUID, segmentStorageKey, "graph");
+ dataStorageManager.deleteMultipartIndexFile(tableSpaceUUID, segmentStorageKey, "map");
+ storagePurged = true;
+ } catch (herddb.storage.DataStorageManagerException e) {
+ LOGGER.log(Level.WARNING,
+ "deleteSegment: in-memory removal of " + segmentStorageKey
+ + " succeeded but multipart file purge failed; "
+ + "operator may need to clean up storage manually",
+ e);
+ }
+ }
+
+ // Trigger a checkpoint so the new (smaller) segment list is
+ // serialised to the on-disk IndexStatus AND the corresponding
+ // IndexingServiceCheckpointState is republished to ZK. Without
+ // the checkpoint a shadow reload would still see the deleted
+ // segment in the IndexStatus and fail with "multipart file not
+ // found" when it tries to mmap the purged map file. The
+ // dropSegmentByStorageKey path marks the store dirty so the
+ // checkpoint will actually serialise.
+ //
+ // forceCheckpointAndSaveWatermark also calls
+ // publishCheckpointStateBestEffort internally, so shadows are
+ // notified as part of the same write.
+ try {
+ forceCheckpointAndSaveWatermark();
+ } catch (RuntimeException e) {
+ // Checkpoint failure must not undo the in-memory removal —
+ // a subsequent regular checkpoint (or another delete-segment
+ // call) will eventually serialise the reduced segment list.
+ // Logged at WARNING so operators can correlate with the
+ // SEVERE audit line emitted above.
+ LOGGER.log(Level.WARNING,
+ "deleteSegment: post-delete checkpoint failed; shadows may"
+ + " not observe the new segment count until the"
+ + " next regular checkpoint",
+ e);
+ }
+
+ return new DeleteSegmentResult(
+ segmentStorageKey, true, drop.vectorsLost, graphPresent, storagePurged);
+ }
+
private static boolean isDmlType(short type) {
return type == LogEntryType.INSERT
|| type == LogEntryType.UPDATE
@@ -3422,6 +3624,27 @@ void setWarmupPauseHookForTest(Runnable hook) {
this.warmupPauseHookForTest = hook;
}
+ /**
+ * pr-reviewer follow-up #1 (issue #617): test-only hook fired inside
+ * {@link #deleteSegment} AFTER the engine has snapshotted the segment
+ * list and probed the multipart graph, but BEFORE the engine calls
+ * {@link PersistentVectorStore#dropSegmentByStorageKey} itself. Tests
+ * use this to drop the same segment from a parallel actor and then
+ * assert that {@code deleteSegment} reports the "segment disappeared
+ * between snapshot and drop" race path (removed=false, vectors_lost=-1).
+ * Strictly test-only — production never sets this.
+ */
+ private volatile Runnable preDropRaceHookForTests = null;
+
+ /**
+ * Installs (or clears) the pre-drop race hook above. Package-private:
+ * production code never calls this.
+ */
+ // package-private for testing
+ void setPreDropRaceHookForTests(Runnable hook) {
+ this.preDropRaceHookForTests = hook;
+ }
+
/**
* Body of the async warmup task: iterates the snapshot of persistent
* stores and calls {@link PersistentVectorStore#warmUpBlockCache} on each.
@@ -3790,6 +4013,33 @@ public long getShadowReloadCount() {
return shadowReloadCount.get();
}
+ /**
+ * Test-only accessor: returns the segment count of the loaded vector
+ * store for {@code table.indexName} regardless of whether it is a
+ * {@link PersistentVectorStore} (primary) or a
+ * {@link herddb.indexing.vector.ReadOnlyVectorStore} (shadow).
+ * Returns {@code -1} when the store is not loaded.
+ *
+ * Added in pr-reviewer follow-up #4 for issue #617 so the
+ * {@code ShadowDeleteSegmentE2ETest.lateBootShadowObservesPostDeleteState}
+ * case can assert that a shadow booted AFTER a primary-side delete
+ * loads the smaller (post-delete) segment count, without having to
+ * unwrap the vector store map directly.
+ */
+ public int getSegmentCountForTest(String table, String indexName) {
+ AbstractVectorStore store = vectorStores.get(storeKey(table, indexName));
+ if (store == null) {
+ return -1;
+ }
+ if (store instanceof PersistentVectorStore) {
+ return ((PersistentVectorStore) store).getSegmentCount();
+ }
+ if (store instanceof ReadOnlyVectorStore) {
+ return ((ReadOnlyVectorStore) store).getSegmentCount();
+ }
+ return -1;
+ }
+
/**
* Minimum {@code IndexStatus.generation} currently loaded across
* every vector store this engine holds. Used by the retention
diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java
index 4fc269e0a..4dba6edb3 100644
--- a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java
+++ b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java
@@ -23,6 +23,8 @@
import com.google.protobuf.ByteString;
import herddb.indexing.proto.DescribeIndexRequest;
import herddb.indexing.proto.DescribeIndexResponse;
+import herddb.indexing.proto.DeleteSegmentRequest;
+import herddb.indexing.proto.DeleteSegmentResponse;
import herddb.indexing.proto.DropIndexRequest;
import herddb.indexing.proto.DropIndexResponse;
import herddb.indexing.proto.GetEngineStatsRequest;
@@ -679,6 +681,76 @@ public void dropIndex(DropIndexRequest request,
}
}
+ /**
+ * Issue #617: operator remediation tool. Removes a single corrupted
+ * segment from the IS in-memory metadata, with optional purge of its
+ * multipart files in the underlying {@link herddb.storage.DataStorageManager}.
+ *
+ *
Maps engine-level outcomes to gRPC status codes:
+ *
+ * - {@code IndexingServiceEngine.DeleteSegmentException} →
+ * {@code FAILED_PRECONDITION} (the request is well-formed but the
+ * IS refuses to act — wrong index, wrong segment, or graph file
+ * still present without {@code force=true}).
+ * - Other {@code RuntimeException} → {@code INTERNAL} (unexpected
+ * failure during the in-memory mutation or storage purge).
+ *
+ */
+ @Override
+ public void deleteSegment(DeleteSegmentRequest request,
+ StreamObserver responseObserver) {
+ String table = request.getTable();
+ String indexName = request.getIndex();
+ String segment = request.getSegment();
+ LOGGER.log(Level.WARNING,
+ "DeleteSegment RPC: index={0} table={1} tablespace={2} segment={3}"
+ + " purge_storage={4} force={5} (issue #617)",
+ new Object[]{indexName, table, request.getTablespace(), segment,
+ request.getPurgeStorage(), request.getForce()});
+ // pr-reviewer follow-up #1: refuse the RPC up-front when this instance
+ // is configured as a shadow replica. Shadow instances mirror their
+ // primary's segment list via reloadFromIndexStatus(); accepting a
+ // delete-segment write on a shadow would silently diverge the two
+ // copies and the next reload would just put the segment back. The
+ // operator must target the primary instead.
+ if (engine.isConfiguredAsShadow()) {
+ String desc = "not_primary: this indexing-service instance is a shadow replica "
+ + "(shadowOf=" + engine.getShadowOfOrMinusOne()
+ + "); target the primary instead";
+ LOGGER.log(Level.WARNING,
+ "DeleteSegment RPC rejected on shadow instance: {0}", desc);
+ responseObserver.onError(Status.FAILED_PRECONDITION
+ .withDescription(desc)
+ .asRuntimeException());
+ return;
+ }
+ try {
+ IndexingServiceEngine.DeleteSegmentResult result = engine.deleteSegment(
+ table, indexName, segment,
+ request.getPurgeStorage(), request.getForce());
+ responseObserver.onNext(DeleteSegmentResponse.newBuilder()
+ .setSegment(result.segment)
+ .setRemoved(result.removed)
+ .setVectorsLost(result.vectorsLost)
+ .setGraphFilePresent(result.graphFilePresent)
+ .setStoragePurged(result.storagePurged)
+ .build());
+ responseObserver.onCompleted();
+ } catch (IndexingServiceEngine.DeleteSegmentException e) {
+ LOGGER.log(Level.WARNING,
+ "DeleteSegment RPC rejected for segment " + segment + ": " + e.getMessage());
+ responseObserver.onError(Status.FAILED_PRECONDITION
+ .withDescription(e.getMessage())
+ .asRuntimeException());
+ } catch (RuntimeException e) {
+ LOGGER.log(Level.SEVERE, "DeleteSegment RPC failed for segment " + segment, e);
+ responseObserver.onError(Status.INTERNAL
+ .withDescription(e.getMessage())
+ .withCause(e)
+ .asRuntimeException());
+ }
+ }
+
/**
* Push-based indexing (TESTING ONLY). Deserializes each pushed
* {@code LogEntry} and enqueues it into the {@link PushCommitLogTailer}'s
diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java b/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java
index 3faa8b515..4b0eb3492 100644
--- a/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java
+++ b/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java
@@ -21,6 +21,7 @@
package herddb.indexing.admin;
import com.google.protobuf.ByteString;
+import herddb.indexing.proto.DeleteSegmentResponse;
import herddb.indexing.proto.DescribeIndexResponse;
import herddb.indexing.proto.GetEngineStatsResponse;
import herddb.indexing.proto.GetIndexStatusResponse;
@@ -84,6 +85,7 @@ public final class IndexingAdminCli {
static final String COMMAND_LIST_SHADOWS = "list-shadows";
static final String COMMAND_SHADOW_STATUS = "shadow-status";
static final String COMMAND_WAIT_SHADOW = "wait-shadow";
+ static final String COMMAND_DELETE_SEGMENT = "delete-segment";
private final PrintStream out;
private final PrintStream err;
@@ -131,6 +133,8 @@ public int run(String[] args) {
return runShadowStatus(rest);
case COMMAND_WAIT_SHADOW:
return runWaitShadow(rest);
+ case COMMAND_DELETE_SEGMENT:
+ return runDeleteSegment(rest);
default:
err.println("Unknown command: " + command);
printUsage();
@@ -162,6 +166,7 @@ private void printUsage() {
out.println(" list-shadows Read ZooKeeper and print registered shadow replicas");
out.println(" shadow-status Print shadow-replica catch-up state of one instance");
out.println(" wait-shadow Block until an instance has caught up to a target LSN");
+ out.println(" delete-segment Remove a corrupted segment from an index's IS metadata (#617)");
out.println();
out.println("Run 'indexing-admin --help' for command-specific flags.");
}
@@ -779,6 +784,115 @@ private int runWaitShadow(String[] args) throws Exception {
}
}
+ /**
+ * Issue #617: {@code delete-segment} command. Runs against a single IS
+ * instance and removes one segment from its in-memory metadata.
+ *
+ * Safety: the CLI prompts on stdin (unless {@code --yes} is given)
+ * before submitting the RPC, and again when the operator supplies
+ * {@code --force} (which overrides the server-side refusal to delete a
+ * segment whose graph file IS reachable in storage). The default exit
+ * code is {@code 0} only when the server reports {@code removed=true};
+ * a presence-only "no-op" response returns {@code 1} so wrapper scripts
+ * can distinguish "I successfully removed it" from "it was already gone".
+ */
+ private int runDeleteSegment(String[] args) throws Exception {
+ Options opts = new Options();
+ addCommonOptions(opts);
+ addServerOption(opts);
+ addIndexOptions(opts);
+ opts.addOption(Option.builder().longOpt("segment").hasArg().argName("NAME")
+ .desc("segment storage key to remove (required, e.g. vidx__seg627)")
+ .required().build());
+ opts.addOption(Option.builder().longOpt("purge-storage")
+ .desc("also delete the segment's multipart graph + map files from storage").build());
+ opts.addOption(Option.builder().longOpt("force")
+ .desc("delete even when the graph file IS reachable in remote storage "
+ + "(extra confirmation required unless --yes is set)").build());
+ opts.addOption(Option.builder().longOpt("yes")
+ .desc("skip interactive confirmation prompts (use in scripts)").build());
+
+ CommandLine cli = parse(opts, args, COMMAND_DELETE_SEGMENT);
+ if (cli == null) {
+ return 0;
+ }
+ String segment = cli.getOptionValue("segment");
+ boolean purge = cli.hasOption("purge-storage");
+ boolean force = cli.hasOption("force");
+ boolean yes = cli.hasOption("yes");
+
+ if (!yes) {
+ if (force) {
+ err.println("WARNING: --force will delete segment " + segment
+ + " even if its graph file is reachable in storage.");
+ err.println("This usually means you are targeting the wrong segment.");
+ }
+ err.println("About to delete segment '" + segment + "' from "
+ + cli.getOptionValue("table") + "." + cli.getOptionValue("index")
+ + " on " + cli.getOptionValue("server")
+ + (purge ? " (and purge multipart files from storage)" : "")
+ + ". Continue? [y/N] ");
+ err.flush();
+ // Read from System.in directly rather than capture it via a CLI
+ // option: the CLI's stdin is exactly what the operator typed.
+ int ch = -1;
+ try {
+ ch = System.in.read();
+ } catch (java.io.IOException e) {
+ err.println("aborting: stdin not available (" + e + "); re-run with --yes");
+ return 1;
+ }
+ if (ch != 'y' && ch != 'Y') {
+ err.println("aborted by user");
+ return 1;
+ }
+ }
+
+ try (IndexingAdminClient client = buildClient(cli)) {
+ DeleteSegmentResponse resp = client.deleteSegment(
+ cli.getOptionValue("tablespace"),
+ cli.getOptionValue("table"),
+ cli.getOptionValue("index"),
+ segment, purge, force);
+ // pr-reviewer follow-up #3: the engine emits vectors_lost=-1 on the
+ // race path (segment dropped by a concurrent compaction between the
+ // engine's snapshot and the engine's drop call — the count is
+ // genuinely unknown, NOT zero). Render the sentinel as the explicit
+ // "unknown" string in both text and JSON so wrapper scripts don't
+ // mistake it for a real zero. We keep the field present in JSON
+ // (rather than omitting it) so the schema is stable; the field
+ // type widens from number to "number | \"unknown\"".
+ boolean vectorsLostUnknown = resp.getVectorsLost() < 0L;
+ if (cli.hasOption("json")) {
+ Map m = new LinkedHashMap<>();
+ m.put("segment", resp.getSegment());
+ m.put("removed", resp.getRemoved());
+ if (vectorsLostUnknown) {
+ m.put("vectors_lost", "unknown");
+ } else {
+ m.put("vectors_lost", resp.getVectorsLost());
+ }
+ m.put("graph_file_present", resp.getGraphFilePresent());
+ m.put("storage_purged", resp.getStoragePurged());
+ out.println(JsonWriter.toJson(m));
+ } else {
+ if (vectorsLostUnknown) {
+ out.printf(Locale.ROOT,
+ "segment=%s removed=%s vectors_lost=unknown (race) "
+ + "graph_file_present=%s storage_purged=%s%n",
+ resp.getSegment(), resp.getRemoved(),
+ resp.getGraphFilePresent(), resp.getStoragePurged());
+ } else {
+ out.printf(Locale.ROOT,
+ "segment=%s removed=%s vectors_lost=%d graph_file_present=%s storage_purged=%s%n",
+ resp.getSegment(), resp.getRemoved(), resp.getVectorsLost(),
+ resp.getGraphFilePresent(), resp.getStoragePurged());
+ }
+ }
+ return resp.getRemoved() ? 0 : 1;
+ }
+ }
+
/**
* Returns the time-lag in milliseconds for a given LogEntry timestamp, or
* {@code -1} when {@code timestampMillis == 0} ("unknown" — no entries
diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminClient.java b/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminClient.java
index bb4811f2f..81be77d2e 100644
--- a/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminClient.java
+++ b/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminClient.java
@@ -20,6 +20,8 @@
package herddb.indexing.admin;
+import herddb.indexing.proto.DeleteSegmentRequest;
+import herddb.indexing.proto.DeleteSegmentResponse;
import herddb.indexing.proto.DescribeIndexRequest;
import herddb.indexing.proto.DescribeIndexResponse;
import herddb.indexing.proto.GetEngineStatsRequest;
@@ -115,6 +117,22 @@ public GetShadowStatusResponse getShadowStatus() {
return stub().getShadowStatus(GetShadowStatusRequest.newBuilder().build());
}
+ /**
+ * Issue #617: operator remediation tool. Removes a single segment from
+ * the IS in-memory metadata, optionally purging its multipart files.
+ */
+ public DeleteSegmentResponse deleteSegment(String tablespace, String table, String index,
+ String segment, boolean purgeStorage, boolean force) {
+ return stub().deleteSegment(DeleteSegmentRequest.newBuilder()
+ .setTablespace(tablespace == null ? "" : tablespace)
+ .setTable(table == null ? "" : table)
+ .setIndex(index == null ? "" : index)
+ .setSegment(segment == null ? "" : segment)
+ .setPurgeStorage(purgeStorage)
+ .setForce(force)
+ .build());
+ }
+
public WaitForCheckpointResponse waitForCheckpoint(String tablespace, String table, String index,
long targetLedgerId, long targetOffset,
long waitTimeoutMs) {
diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/vector/AbstractVectorStore.java b/herddb-indexing-service/src/main/java/herddb/indexing/vector/AbstractVectorStore.java
index bcc74cebd..21d91c715 100644
--- a/herddb-indexing-service/src/main/java/herddb/indexing/vector/AbstractVectorStore.java
+++ b/herddb-indexing-service/src/main/java/herddb/indexing/vector/AbstractVectorStore.java
@@ -178,6 +178,57 @@ public void dropSegmentByUuid(String segmentUuid) {
// No-op for non-persistent stores.
}
+ /**
+ * Result of a {@link #dropSegmentByStorageKey(String)} call.
+ *
+ * Used by the operator-facing {@code DeleteSegment} RPC (issue #617)
+ * to report whether the segment was found and, when it was, the number
+ * of live vectors that were lost as part of the removal — both as a
+ * sanity check ("did I just delete a populated segment?") and as the
+ * value the IS surfaces back to the {@code indexing-admin delete-segment}
+ * CLI for the operator's audit log.
+ */
+ public static final class SegmentDropResult {
+ public final boolean removed;
+ public final long vectorsLost;
+
+ public SegmentDropResult(boolean removed, long vectorsLost) {
+ this.removed = removed;
+ this.vectorsLost = vectorsLost;
+ }
+
+ public static final SegmentDropResult NOT_FOUND = new SegmentDropResult(false, 0L);
+ }
+
+ /**
+ * Removes a segment from the active list by its multipart storage key
+ * — i.e. the value returned by {@code PersistentVectorStore.segmentStorageKey}
+ * (legacy {@code indexUUID + "_seg" + segmentId} or, for adopted segments,
+ * the explicit {@code externalStorageKey}). Used by the operator-facing
+ * {@code DeleteSegment} RPC (issue #617) to remove a corrupted segment
+ * whose Phase B upload failed mid-flight, leaving it registered in IS
+ * metadata without a fully-written graph file in remote storage.
+ *
+ *
Idempotent: returns {@link SegmentDropResult#NOT_FOUND} when no
+ * segment with the given storage key is currently loaded.
+ *
+ *
Does not delete the segment's multipart files from the
+ * underlying storage manager — that is the responsibility of the
+ * caller (the {@code DeleteSegment} RPC handler), which only purges
+ * remote files when explicitly requested via {@code purge_storage=true}.
+ *
+ *
The default implementation is a no-op.
+ * {@link herddb.indexing.vector.PersistentVectorStore} overrides this.
+ *
+ * @param storageKey the segment's multipart storage key to remove
+ * @return a {@link SegmentDropResult} describing whether the segment
+ * was removed and how many live vectors were lost
+ */
+ public SegmentDropResult dropSegmentByStorageKey(String storageKey) {
+ // No-op for non-persistent stores.
+ return SegmentDropResult.NOT_FOUND;
+ }
+
/**
* Reconciles adopted (externally-produced) segments against the ZK-reported
* snapshot. Any segment with a non-null external storage key whose UUID is
diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java b/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java
index 6e4eecc0a..cf8130a17 100644
--- a/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java
+++ b/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java
@@ -3691,6 +3691,17 @@ public String getOnDiskSegmentExternalStorageKeyForTest(int idx) {
return segments.get(idx).externalStorageKey;
}
+ /**
+ * Test-only accessor for the deferred-close queue used by
+ * {@link #dropSegmentByUuid} and {@link #dropSegmentByStorageKey}.
+ * Returns the current number of queued (not-yet-closed) segments.
+ * Pr-reviewer follow-up #3 for issue #617 uses this to assert the
+ * queue eventually drains after the operator-driven drop.
+ */
+ public int getPendingSegmentClosesSizeForTest() {
+ return pendingSegmentCloses.size();
+ }
+
VectorSegment preloadCompactedSegment(SegmentWriteResult swr) throws IOException, DataStorageManagerException {
VectorSegment seg = new VectorSegment(swr.segmentId);
seg.segmentUuid = swr.segmentUuid;
@@ -3930,6 +3941,64 @@ public void dropSegmentByUuid(String segmentUuid) {
}
}
+ /**
+ * Issue #617: removes a segment from the active list by its
+ * multipart storage key. Shares the same lock discipline and
+ * deferred-close machinery as {@link #dropSegmentByUuid}.
+ *
+ *
Matching is by {@link #segmentStorageKey(VectorSegment)}, so this
+ * method works for both IS-locally-produced segments (legacy key
+ * {@code indexUUID + "_seg" + segmentId}) and adopted ones
+ * ({@code externalStorageKey}).
+ *
+ *
Idempotent: when no segment matches, returns
+ * {@link SegmentDropResult#NOT_FOUND} without mutating state.
+ */
+ @Override
+ public SegmentDropResult dropSegmentByStorageKey(String storageKey) {
+ if (storageKey == null || storageKey.isEmpty()) {
+ return SegmentDropResult.NOT_FOUND;
+ }
+ try {
+ VectorSegment found = null;
+ stateLock.writeLock().lock();
+ try {
+ List newList = new java.util.concurrent.CopyOnWriteArrayList<>();
+ for (VectorSegment s : segments) {
+ if (found == null && storageKey.equals(segmentStorageKey(s))) {
+ found = s;
+ } else {
+ newList.add(s);
+ }
+ }
+ if (found == null) {
+ return SegmentDropResult.NOT_FOUND;
+ }
+ segments = newList;
+ // Mirror dropSegmentByUuid: mark dirty so the next checkpoint
+ // serialises the reduced segment list. Without this the
+ // checkpoint gate would skip and leave a stale entry on disk.
+ dirty.set(true);
+ long vectorsLost = found.liveCount.get();
+ LOGGER.log(Level.SEVERE,
+ "dropSegmentByStorageKey: operator removed segment {0} from"
+ + " store {1} (vectors_lost={2}); total on-disk"
+ + " segments now {3}",
+ new Object[]{storageKey, indexName, vectorsLost, segments.size()});
+ // Same deferred-close protocol as dropSegmentByUuid (issue #535):
+ // a concurrent compaction cycle may still hold a reference to
+ // `found.onDiskGraph`; queue the close so it runs after the
+ // cycle releases its compactionLock.
+ pendingSegmentCloses.add(found);
+ return new SegmentDropResult(true, vectorsLost);
+ } finally {
+ stateLock.writeLock().unlock();
+ }
+ } finally {
+ drainPendingSegmentClosesOpportunistically();
+ }
+ }
+
/**
* Reconciles the store's adopted (externally-produced) segment set against
* the ZK-reported ownership snapshot. Any segment that was adopted (i.e.
@@ -8650,6 +8719,30 @@ public int getOnDiskNodeCount() {
return (int) onDiskNodeToPkSize();
}
+ /**
+ * Issue #617: snapshot of the multipart storage keys of every on-disk
+ * segment currently registered in this store. Used by the operator-
+ * facing {@code DeleteSegment} RPC handler to enumerate candidates
+ * (via {@code describe-index --json}'s {@code corrupted_segments} list)
+ * and to verify presence before mutating state.
+ *
+ * The returned list is a defensive copy and is stable across
+ * concurrent {@link #dropSegmentByStorageKey} / compaction swaps. It
+ * may legitimately contain duplicate-looking entries on a transient
+ * adoption race: callers MUST treat the result as a snapshot and
+ * re-read it if they need a fresh view.
+ */
+ public java.util.List getSegmentStorageKeysSnapshot() {
+ // CopyOnWriteArrayList iterator is snapshot-safe; no readLock needed
+ // for the enumeration itself, only for any subsequent mutation that
+ // wants to act on the snapshot atomically (that is the caller's job).
+ java.util.List out = new java.util.ArrayList<>(segments.size());
+ for (VectorSegment s : segments) {
+ out.add(segmentStorageKey(s));
+ }
+ return out;
+ }
+
/**
* Returns the current value of the global monotonic node-id counter.
* Exposed for telemetry — dashboards can watch the burn rate and
diff --git a/herddb-indexing-service/src/main/proto/indexing_service.proto b/herddb-indexing-service/src/main/proto/indexing_service.proto
index 3c6079c33..5c0b784a9 100644
--- a/herddb-indexing-service/src/main/proto/indexing_service.proto
+++ b/herddb-indexing-service/src/main/proto/indexing_service.proto
@@ -51,6 +51,20 @@ service IndexingService {
// The commit-log tailer path remains the authoritative cleanup fallback.
rpc DropIndex (DropIndexRequest) returns (DropIndexResponse);
+ // Issue #617: operator remediation tool. Removes a single corrupted /
+ // partially-written segment from the IS in-memory metadata so the
+ // compaction loop stops re-selecting it after a Phase B upload failure
+ // left the segment registered without a fully-written graph file.
+ //
+ // SAFETY: by default the IS refuses to delete a segment whose graph file
+ // IS reachable in remote storage (operator may be targeting the wrong
+ // segment). Set force=true to override. When purge_storage=true the IS
+ // also deletes the segment's multipart graph + map files from the
+ // underlying DataStorageManager. After a successful deletion the IS
+ // re-publishes its IndexingServiceCheckpointState so shadow replicas
+ // observe the new (smaller) segment set on their next reload.
+ rpc DeleteSegment (DeleteSegmentRequest) returns (DeleteSegmentResponse);
+
// Push-based indexing (TESTING ONLY). When the indexing service runs with
// indexing.log.type=push it does NOT tail a file/BookKeeper commit log;
// instead a client serializes herddb.log.LogEntry objects and pushes them
@@ -363,6 +377,58 @@ message DropIndexRequest {
message DropIndexResponse {
}
+// Issue #617: operator remediation tool — see rpc DeleteSegment above.
+message DeleteSegmentRequest {
+ // Tablespace name. Currently informational only (the IS resolves the
+ // store by (table, index)); kept for symmetry with the other admin RPCs
+ // and so future multi-tablespace IS instances do not need a proto bump.
+ string tablespace = 1;
+ string table = 2;
+ string index = 3;
+ // The segment's multipart storage key (e.g. "vidx__seg627").
+ // This is the value reported by describe-index in the
+ // corrupted_segments list (issue #616) and the value the IS uses to
+ // address segment files in remote storage.
+ string segment = 4;
+ // Whether to delete the segment's multipart graph + map files from the
+ // underlying DataStorageManager after removing it from in-memory state.
+ // When false, the segment is unlinked from the IS but its files remain
+ // (useful when an operator wants to preserve evidence of corruption for
+ // post-mortem analysis).
+ bool purge_storage = 5;
+ // Whether to delete the segment even when its graph file IS reachable
+ // in remote storage. Set to true only after careful verification — a
+ // reachable graph file usually means the operator is targeting the
+ // wrong segment.
+ bool force = 6;
+}
+
+message DeleteSegmentResponse {
+ // The storage key of the segment that was removed (echoes
+ // DeleteSegmentRequest.segment).
+ string segment = 1;
+ // Whether the segment was found in the IS metadata and removed.
+ bool removed = 2;
+ // Number of live vectors lost as a result of the removal. -1 when
+ // the IS could not compute the number — currently emitted only on the
+ // race path where a concurrent compaction swap removed the segment
+ // between the operator's snapshot and the engine's drop (the segment
+ // handle is gone, so its liveCount is no longer accessible). In every
+ // other no-op case (segment never matched, store not loaded) the engine
+ // throws DeleteSegmentException → FAILED_PRECONDITION instead of
+ // returning a response with vectors_lost=-1.
+ int64 vectors_lost = 3;
+ // Whether the segment's graph file was reachable in remote storage at
+ // the moment the IS checked. Informational — operators use this to
+ // decide whether to retry with --force or whether they were targeting
+ // the wrong segment.
+ bool graph_file_present = 4;
+ // Whether the IS attempted to delete the segment's multipart files from
+ // remote storage (only true when the request set purge_storage=true and
+ // the in-memory removal succeeded).
+ bool storage_purged = 5;
+}
+
// ---- Push-based indexing RPC (testing only — see rpc PushEntries) ----
// One commit-log entry to push, with its client-assigned LSN.
diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/IndexingServiceEngineDeleteSegmentEdgeCasesTest.java b/herddb-indexing-service/src/test/java/herddb/indexing/IndexingServiceEngineDeleteSegmentEdgeCasesTest.java
new file mode 100644
index 000000000..d76cdb224
--- /dev/null
+++ b/herddb-indexing-service/src/test/java/herddb/indexing/IndexingServiceEngineDeleteSegmentEdgeCasesTest.java
@@ -0,0 +1,340 @@
+/*
+ Licensed to Diennea S.r.l. under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. Diennea S.r.l. licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+ */
+package herddb.indexing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import herddb.codec.RecordSerializer;
+import herddb.core.MemoryManager;
+import herddb.indexing.vector.AbstractVectorStore;
+import herddb.indexing.vector.PersistentVectorStore;
+import herddb.indexing.vector.ReadOnlyVectorStore;
+import herddb.log.LogEntry;
+import herddb.log.LogEntryFactory;
+import herddb.log.LogSequenceNumber;
+import herddb.mem.MemoryDataStorageManager;
+import herddb.model.ColumnTypes;
+import herddb.model.Index;
+import herddb.model.Record;
+import herddb.model.Table;
+import herddb.model.TableSpace;
+import io.github.jbellis.jvector.vector.VectorSimilarityFunction;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Issue #617 + pr-reviewer follow-ups #1 and #2: engine-level edge cases for
+ * {@link IndexingServiceEngine#deleteSegment} that the CLI/gRPC tests in
+ * {@code herddb.indexing.admin.IndexingAdminCliDeleteSegmentTest} and the
+ * full E2E test in {@link ShadowDeleteSegmentE2ETest} do not cover.
+ *
+ *
+ * - Race-path test (follow-up #1): drive the engine into the
+ * branch where {@link PersistentVectorStore#dropSegmentByStorageKey}
+ * returns {@code removed=false} after the engine has already
+ * snapshotted the storage key. We inject a parallel drop via a
+ * test-only hook that fires inside {@code deleteSegment} between the
+ * engine's snapshot/probe and the engine's own drop call. Assert that
+ * the engine reports the documented sentinel:
+ * {@code vectorsLost == -1L && removed == false}.
+ * - Shadow / read-only rejection (follow-up #2): install a
+ * {@link ReadOnlyVectorStore} directly into the engine's
+ * {@code vectorStores} map (bypassing the gRPC layer entirely) and
+ * call {@link IndexingServiceEngine#deleteSegment} on it. The
+ * production gate at
+ * {@link IndexingServiceImpl#deleteSegment(herddb.indexing.proto.DeleteSegmentRequest,
+ * io.grpc.stub.StreamObserver)} short-circuits these RPCs at the
+ * service boundary, so this engine-level belt-and-braces refusal is
+ * reachable only by tests that drive the engine directly. The
+ * exception message must call out both "shadow replica" and "primary"
+ * so an operator who somehow reaches this branch (e.g. a future
+ * caller that bypasses the gRPC layer) gets actionable diagnostics.
+ *
+ */
+public class IndexingServiceEngineDeleteSegmentEdgeCasesTest {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ private int savedMinLive;
+
+ @Before
+ public void setUp() {
+ savedMinLive = PersistentVectorStore.minLiveVectorsForCheckpoint;
+ PersistentVectorStore.minLiveVectorsForCheckpoint = 0;
+ }
+
+ @After
+ public void tearDown() {
+ PersistentVectorStore.minLiveVectorsForCheckpoint = savedMinLive;
+ }
+
+ private static Table createTable() {
+ return Table.builder()
+ .name("vectable")
+ .tablespace(TableSpace.DEFAULT)
+ .column("pk", ColumnTypes.STRING)
+ .column("vec", ColumnTypes.FLOATARRAY)
+ .primaryKey("pk")
+ .build();
+ }
+
+ private static Index createIndex(String uuid) {
+ return Index.builder()
+ .name("vidx").uuid(uuid)
+ .table("vectable")
+ .type(Index.TYPE_VECTOR)
+ .column("vec", ColumnTypes.FLOATARRAY)
+ .build();
+ }
+
+ private static float[] randomVector(Random rng, int dim) {
+ float[] v = new float[dim];
+ for (int i = 0; i < dim; i++) {
+ v[i] = rng.nextFloat();
+ }
+ return v;
+ }
+
+ private IndexingServiceEngine newEngine(MemoryDataStorageManager dsm, MemoryManager mm,
+ final String stableUuid,
+ final AtomicReference storeRef) throws Exception {
+ Path logDir = folder.newFolder().toPath();
+ Path dataDir = folder.newFolder().toPath();
+ Properties props = new Properties();
+ props.setProperty(IndexingServerConfiguration.PROPERTY_STORAGE_TYPE, "memory");
+ props.setProperty(IndexingServerConfiguration.PROPERTY_TABLESPACE_NAME, TableSpace.DEFAULT);
+ IndexingServerConfiguration config = new IndexingServerConfiguration(props);
+ final IndexingServiceEngine engine = new IndexingServiceEngine(logDir, dataDir, config);
+
+ herddb.mem.MemoryMetadataStorageManager memMeta = new herddb.mem.MemoryMetadataStorageManager();
+ memMeta.start();
+ memMeta.ensureDefaultTableSpace("local", "local", 0, 1);
+ engine.setMetadataStorageManager(memMeta);
+ engine.setDataStorageManager(dsm);
+ engine.setMemoryManager(mm);
+ engine.setVectorStoreFactory((indexName, tableName, vectorColumnName, dataDirArg, indexProperties) -> {
+ PersistentVectorStore pvs = new PersistentVectorStore(
+ indexName, tableName, engine.getTableSpaceUUID(), vectorColumnName,
+ stableUuid, dataDirArg, dsm, mm,
+ 16, 100, 1.2f, 1.4f, true, 2_000_000_000L, 0,
+ Long.MAX_VALUE,
+ VectorSimilarityFunction.EUCLIDEAN);
+ try {
+ pvs.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (storeRef != null) {
+ storeRef.set(pvs);
+ }
+ return pvs;
+ });
+ return engine;
+ }
+
+ /**
+ * pr-reviewer follow-up #1: race path between the engine's snapshot and
+ * the engine's drop. The pre-drop hook removes the same segment from a
+ * parallel actor (here a direct {@code dropSegmentByStorageKey} call on
+ * the store, simulating a concurrent compaction swap that finished
+ * between the engine's two reads). The engine's own drop then sees the
+ * segment gone and must return the documented race-sentinel.
+ */
+ @Test(timeout = 60_000)
+ public void engineReportsRaceSentinelWhenSegmentDroppedConcurrently() throws Exception {
+ MemoryDataStorageManager dsm = new MemoryDataStorageManager();
+ MemoryManager mm = new MemoryManager(64 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024);
+ final String stableUuid = "idx-uuid-617-race";
+
+ AtomicReference primaryStore = new AtomicReference<>();
+ IndexingServiceEngine engine = newEngine(dsm, mm, stableUuid, primaryStore);
+ engine.start();
+ try {
+ Table t = createTable();
+ Index idx = createIndex(stableUuid);
+ engine.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null));
+ engine.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null));
+
+ Random rng = new Random(617);
+ int dim = 12;
+ LogSequenceNumber last = null;
+ // Two checkpoints → two segments, so even after the "concurrent"
+ // drop the snapshot list (taken before the hook) still contained
+ // the victim — without that, the up-front presence-check inside
+ // deleteSegment would throw DeleteSegmentException ("not
+ // registered") and we would never reach the race branch.
+ for (int batch = 0; batch < 2; batch++) {
+ for (int i = 0; i < 40; i++) {
+ Record r = RecordSerializer.makeRecord(t,
+ "pk", "k" + batch + "_" + i, "vec", randomVector(rng, dim));
+ LogEntry ins = LogEntryFactory.insert(t, r.key, r.value, null);
+ last = new LogSequenceNumber(1, 100 + batch * 100 + i);
+ engine.applySingleEntryForTest(last, ins);
+ }
+ engine.awaitPendingWorkForTest();
+ engine.setLastProcessedLsnForTest(last);
+ engine.forceCheckpointAndSaveWatermark();
+ }
+
+ final PersistentVectorStore pvs = primaryStore.get();
+ assertNotNull("engine must have created the persistent store", pvs);
+ List keys = pvs.getSegmentStorageKeysSnapshot();
+ assertTrue("test setup must produce ≥2 segments, got " + keys.size(),
+ keys.size() >= 2);
+ final String victim = keys.get(0);
+
+ // Pre-drop race hook: fires inside engine.deleteSegment AFTER the
+ // snapshot+probe but BEFORE the engine's own drop. Removes the
+ // victim out from under the engine. The hook itself does NOT use
+ // a separate thread (the deferred-close protocol around
+ // dropSegmentByStorageKey is reentrant-safe) — the important
+ // invariant is just that the segment is gone by the time the
+ // engine's drop runs.
+ final AtomicReference sideDropRef =
+ new AtomicReference<>();
+ engine.setPreDropRaceHookForTests(() -> {
+ AbstractVectorStore.SegmentDropResult r = pvs.dropSegmentByStorageKey(victim);
+ sideDropRef.set(r);
+ });
+
+ IndexingServiceEngine.DeleteSegmentResult result = engine.deleteSegment(
+ "vectable", "vidx", victim,
+ /* purgeStorage */ false, /* force */ true);
+
+ // Pre-condition: the side-channel drop must have actually
+ // removed the segment, otherwise the race branch would not be
+ // exercised.
+ AbstractVectorStore.SegmentDropResult sideDrop = sideDropRef.get();
+ assertNotNull("pre-drop hook must have fired", sideDrop);
+ assertTrue("pre-drop hook must have removed the segment",
+ sideDrop.removed);
+
+ // The race-path documented contract: removed=false AND
+ // vectorsLost == -1L (sentinel for "unknown — cannot compute the
+ // count because the segment handle is no longer accessible").
+ assertFalse("engine must report removed=false on the race path",
+ result.removed);
+ assertEquals("engine must surface vectors_lost=-1 on the race path",
+ -1L, result.vectorsLost);
+ assertEquals("response segment name must match the request",
+ victim, result.segment);
+ assertFalse("storage_purged must be false on the race path "
+ + "(engine bails out before touching storage)",
+ result.storagePurged);
+
+ // And the segment is gone from the active list (the side-channel
+ // drop removed it).
+ assertFalse("victim must be gone from the active segment list",
+ pvs.getSegmentStorageKeysSnapshot().contains(victim));
+ } finally {
+ engine.setPreDropRaceHookForTests(null);
+ engine.close();
+ }
+ }
+
+ /**
+ * pr-reviewer follow-up #2: engine-only refusal when the configured
+ * store is a {@link ReadOnlyVectorStore}. The production
+ * {@code IndexingServiceImpl.deleteSegment} short-circuits these RPCs
+ * with a {@code FAILED_PRECONDITION} BEFORE reaching the engine — this
+ * test bypasses that gate by calling {@code engine.deleteSegment(...)}
+ * directly to prove the engine's belt-and-braces guard is in place.
+ *
+ * The exception message must mention both "shadow replica" and
+ * "primary" so a future caller that bypasses the gRPC layer still gets
+ * an actionable diagnostic.
+ */
+ @Test(timeout = 30_000)
+ public void engineRefusesDeleteOnReadOnlyVectorStoreWithDiagnosticMessage() throws Exception {
+ MemoryDataStorageManager dsm = new MemoryDataStorageManager();
+ MemoryManager mm = new MemoryManager(64 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024);
+ final String stableUuid = "idx-uuid-617-readonly";
+
+ IndexingServiceEngine engine = newEngine(dsm, mm, stableUuid, null);
+ engine.start();
+ ReadOnlyVectorStore readOnly = null;
+ try {
+ Table t = createTable();
+ Index idx = createIndex(stableUuid);
+ // Schema must exist so the engine's schema-tracker accepts the
+ // synthetic ReadOnlyVectorStore registration.
+ engine.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null));
+ engine.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null));
+
+ // Build a real ReadOnlyVectorStore (no segments, no data — the
+ // delete-segment refusal must fire on the type check alone, and
+ // must not depend on the store having any state). Use a
+ // dedicated tmp dir so the underlying PersistentVectorStore
+ // delegate does not collide with the in-flight production
+ // store.
+ Path roTmp = folder.newFolder("ro-store").toPath();
+ readOnly = new ReadOnlyVectorStore(
+ "vidx", "vectable", engine.getTableSpaceUUID(),
+ "vec", stableUuid + "-ro", roTmp,
+ dsm, mm,
+ 16, 100, 1.2f, 1.4f,
+ /* fusedPQ */ true,
+ /* maxSegmentSize */ 2_000_000_000L,
+ /* maxLiveGraphSize */ 0,
+ VectorSimilarityFunction.EUCLIDEAN);
+ readOnly.start();
+
+ // Replace the (PersistentVectorStore) the factory just installed
+ // with the synthetic ReadOnlyVectorStore — this is the precise
+ // scenario the engine guard is meant to catch.
+ engine.registerIndexForTest(idx, readOnly);
+
+ try {
+ engine.deleteSegment("vectable", "vidx", "any-segment-key",
+ /* purgeStorage */ false, /* force */ true);
+ fail("engine must refuse deleteSegment on a ReadOnlyVectorStore");
+ } catch (IndexingServiceEngine.DeleteSegmentException expected) {
+ String msg = expected.getMessage();
+ assertNotNull(msg);
+ assertTrue("refusal message must mention 'shadow replica', got: " + msg,
+ msg.contains("shadow replica"));
+ assertTrue("refusal message must mention 'primary' (so the operator"
+ + " knows where to retry), got: " + msg,
+ msg.contains("primary"));
+ }
+ } finally {
+ if (readOnly != null) {
+ try {
+ readOnly.close();
+ } catch (Exception ignore) {
+ // best-effort teardown
+ }
+ }
+ engine.close();
+ }
+ }
+}
diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentE2ETest.java b/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentE2ETest.java
new file mode 100644
index 000000000..667a22c55
--- /dev/null
+++ b/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentE2ETest.java
@@ -0,0 +1,573 @@
+/*
+ Licensed to Diennea S.r.l. under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. Diennea S.r.l. licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+ */
+package herddb.indexing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import herddb.cluster.ZookeeperMetadataStorageManager;
+import herddb.codec.RecordSerializer;
+import herddb.core.MemoryManager;
+import herddb.indexing.vector.PersistentVectorStore;
+import herddb.log.LogEntry;
+import herddb.log.LogEntryFactory;
+import herddb.log.LogSequenceNumber;
+import herddb.mem.MemoryDataStorageManager;
+import herddb.metadata.IndexingServiceCheckpointState;
+import herddb.metadata.IndexingServiceInstanceDescriptor;
+import herddb.model.ColumnTypes;
+import herddb.model.Index;
+import herddb.model.Record;
+import herddb.model.Table;
+import herddb.model.TableSpace;
+import herddb.utils.ZKTestEnv;
+import io.github.jbellis.jvector.vector.VectorSimilarityFunction;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Issue #617: end-to-end test for the operator-facing {@code DeleteSegment}
+ * path on a real {@link PersistentVectorStore} backed by a shared
+ * {@link MemoryDataStorageManager}, with one primary and one shadow replica
+ * coordinated through a real {@link ZKTestEnv}. Follows the same harness
+ * convention as {@link ShadowE2ETest} — both live in the IS module and run
+ * in the core (non-cluster) Maven test category because {@link ZKTestEnv}
+ * starts an embedded ZK/Bookie pair in the same JVM.
+ *
+ *
Coverage:
+ *
+ * - Refusal when graph file is present — the IS rejects the
+ * delete because the segment's multipart graph is reachable via the
+ * shared {@link MemoryDataStorageManager} (the same property the
+ * production refusal gate keys off in the issue #617 scenario,
+ * where {@code force=false} prevents accidental deletes).
+ * - Force override removes the segment — with {@code force=true}
+ * the primary's in-memory segment count drops, the IS re-publishes
+ * a fresh {@link IndexingServiceCheckpointState}, the shadow's
+ * reload counter advances, and the shadow's loaded segment count
+ * on the dropped-segment store matches the primary's (zero).
+ * - purge_storage=true also deletes the multipart files — after
+ * the force-delete the IS reports {@code storage_purged=true} and a
+ * subsequent existence probe against the storage manager returns
+ * false for both the {@code graph} and {@code map} keys.
+ *
+ *
+ * This test deliberately calls
+ * {@link IndexingServiceEngine#deleteSegment(String, String, String, boolean, boolean)}
+ * directly rather than going through the gRPC layer — the gRPC wire path
+ * is covered by {@code herddb.indexing.admin.IndexingAdminCliDeleteSegmentTest}
+ * (a focused fake-gRPC test). Coupling both layers in a single test would
+ * add cluster-mode wiring (an in-process gRPC server) without exercising
+ * a code path the CLI test does not already cover.
+ */
+public class ShadowDeleteSegmentE2ETest {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ private ZKTestEnv zk;
+ private ZookeeperMetadataStorageManager metadata;
+ private int savedMinLive;
+ private long savedDeferral;
+ private final List shadowMetadatas = new ArrayList<>();
+
+ @Before
+ public void setUp() throws Exception {
+ zk = new ZKTestEnv(folder.newFolder("zk").toPath());
+ zk.startBookieAndInitCluster();
+ metadata = new ZookeeperMetadataStorageManager(
+ zk.getAddress(), zk.getTimeout(), zk.getPath());
+ metadata.start();
+ metadata.ensureDefaultTableSpace("local", "local", 0, 1);
+
+ savedMinLive = PersistentVectorStore.minLiveVectorsForCheckpoint;
+ savedDeferral = PersistentVectorStore.maxCheckpointDeferralMs;
+ // Same gates lifted as in ShadowE2ETest so a tiny workload still
+ // produces a real persisted segment.
+ PersistentVectorStore.minLiveVectorsForCheckpoint = 0;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ PersistentVectorStore.minLiveVectorsForCheckpoint = savedMinLive;
+ PersistentVectorStore.maxCheckpointDeferralMs = savedDeferral;
+ for (ZookeeperMetadataStorageManager sm : shadowMetadatas) {
+ try {
+ sm.close();
+ } catch (Exception ignore) {
+ // teardown best-effort — match ShadowE2ETest pattern
+ }
+ }
+ if (metadata != null) {
+ try {
+ metadata.close();
+ } catch (Exception ignore) {
+ // teardown best-effort
+ }
+ }
+ if (zk != null) {
+ zk.close();
+ }
+ }
+
+ private static Table createTable() {
+ return Table.builder()
+ .name("vectable")
+ .tablespace(TableSpace.DEFAULT)
+ .column("pk", ColumnTypes.STRING)
+ .column("vec", ColumnTypes.FLOATARRAY)
+ .primaryKey("pk")
+ .build();
+ }
+
+ private static Index createIndex(String uuid) {
+ return Index.builder()
+ .name("vidx").uuid(uuid)
+ .table("vectable")
+ .type(Index.TYPE_VECTOR)
+ .column("vec", ColumnTypes.FLOATARRAY)
+ .build();
+ }
+
+ private static float[] randomVector(Random rng, int dim) {
+ float[] v = new float[dim];
+ for (int i = 0; i < dim; i++) {
+ v[i] = rng.nextFloat();
+ }
+ return v;
+ }
+
+ private IndexingServiceEngine newPrimary(MemoryDataStorageManager dsm, MemoryManager mm,
+ String stableUuid,
+ AtomicReference storeRef) throws Exception {
+ Path logDir = folder.newFolder().toPath();
+ Path dataDir = folder.newFolder().toPath();
+ Properties props = new Properties();
+ props.setProperty(IndexingServerConfiguration.PROPERTY_STORAGE_TYPE, "memory");
+ props.setProperty(IndexingServerConfiguration.PROPERTY_TABLESPACE_NAME, TableSpace.DEFAULT);
+ IndexingServerConfiguration config = new IndexingServerConfiguration(props);
+ IndexingServiceEngine engine = new IndexingServiceEngine(logDir, dataDir, config);
+ engine.setMetadataStorageManager(metadata);
+ engine.setDataStorageManager(dsm);
+ engine.setMemoryManager(mm);
+ engine.setVectorStoreFactory((indexName, tableName, vectorColumnName, dataDirArg, indexProperties) -> {
+ PersistentVectorStore pvs = new PersistentVectorStore(
+ indexName, tableName, engine.getTableSpaceUUID(), vectorColumnName,
+ stableUuid, dataDirArg, dsm, mm,
+ 16, 100, 1.2f, 1.4f, true, 2_000_000_000L, 0,
+ Long.MAX_VALUE,
+ VectorSimilarityFunction.EUCLIDEAN);
+ try {
+ pvs.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ storeRef.set(pvs);
+ return pvs;
+ });
+ return engine;
+ }
+
+ private IndexingServiceEngine newShadow(MemoryDataStorageManager dsm, MemoryManager mm,
+ int shadowOf) throws Exception {
+ Path logDir = folder.newFolder().toPath();
+ Path dataDir = folder.newFolder().toPath();
+ Properties props = new Properties();
+ props.setProperty(IndexingServerConfiguration.PROPERTY_STORAGE_TYPE, "memory");
+ props.setProperty(IndexingServerConfiguration.PROPERTY_TABLESPACE_NAME, TableSpace.DEFAULT);
+ props.setProperty(IndexingServerConfiguration.PROPERTY_ROLE,
+ IndexingServerConfiguration.ROLE_SHADOW);
+ props.setProperty(IndexingServerConfiguration.PROPERTY_SHADOW_OF, Integer.toString(shadowOf));
+ props.setProperty(IndexingServerConfiguration.PROPERTY_NUM_INSTANCES, "1");
+ IndexingServerConfiguration config = new IndexingServerConfiguration(props);
+ IndexingServiceEngine engine = new IndexingServiceEngine(logDir, dataDir, config);
+ ZookeeperMetadataStorageManager perShadow = new ZookeeperMetadataStorageManager(
+ zk.getAddress(), zk.getTimeout(), zk.getPath());
+ perShadow.start();
+ shadowMetadatas.add(perShadow);
+ engine.setMetadataStorageManager(perShadow);
+ engine.setDataStorageManager(dsm);
+ engine.setMemoryManager(mm);
+ return engine;
+ }
+
+ private void seedDsmSchema(MemoryDataStorageManager dsm, String tsUuid, Table table, Index idx)
+ throws Exception {
+ dsm.writeTables(tsUuid, LogSequenceNumber.START_OF_TIME,
+ Arrays.asList(table), Arrays.asList(idx), false);
+ }
+
+ /**
+ * Full lifecycle test:
+ * 1. Primary ingests 128 vectors, checkpoints, and produces ≥1 segment.
+ * 2. A shadow starts and catches up (reloadCount == 1).
+ * 3. {@code deleteSegment(force=false)} is refused — graph file present.
+ * 4. {@code deleteSegment(force=true, purge_storage=true)} succeeds:
+ * - primary's {@code segmentCount} drops by one;
+ * - storage_purged == true;
+ * - graph + map files are no longer reachable in the DSM.
+ * 5. The shadow observes the new checkpoint state (reloadCount advances).
+ */
+ @Test
+ public void deleteSegmentReducesPrimaryCountAndNotifiesShadow() throws Exception {
+ MemoryDataStorageManager dsm = new MemoryDataStorageManager();
+ MemoryManager mm = new MemoryManager(128 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024);
+ final String stableUuid = "idx-uuid-617";
+
+ AtomicReference primaryStore = new AtomicReference<>();
+ IndexingServiceEngine primary = newPrimary(dsm, mm, stableUuid, primaryStore);
+ primary.start();
+
+ Table t = createTable();
+ Index idx = createIndex(stableUuid);
+ primary.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null));
+ primary.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null));
+
+ Random rng = new Random(617);
+ int dim = 16;
+ LogSequenceNumber last = null;
+ for (int i = 0; i < 128; i++) {
+ Record r = RecordSerializer.makeRecord(t,
+ "pk", "k" + i, "vec", randomVector(rng, dim));
+ LogEntry ins = LogEntryFactory.insert(t, r.key, r.value, null);
+ last = new LogSequenceNumber(1, 100 + i);
+ primary.applySingleEntryForTest(last, ins);
+ }
+ primary.awaitPendingWorkForTest();
+ primary.setLastProcessedLsnForTest(last);
+ primary.forceCheckpointAndSaveWatermark();
+
+ seedDsmSchema(dsm, primary.getTableSpaceUUID(), t, idx);
+
+ metadata.registerIndexingServiceInstance(
+ IndexingServiceInstanceDescriptor.primary(
+ "p0", "dummy-addr:0", 0));
+ IndexingServiceCheckpointState beforeDelete =
+ metadata.getIndexingServiceCheckpointState(0);
+ assertNotNull("primary must have published checkpoint state", beforeDelete);
+
+ // A real segment was created by the checkpoint.
+ PersistentVectorStore pvs = primaryStore.get();
+ assertNotNull("primary store must be initialised", pvs);
+ int segmentCountBefore = pvs.getSegmentCount();
+ assertTrue("primary checkpoint must produce ≥1 segment, got " + segmentCountBefore,
+ segmentCountBefore >= 1);
+ List keysBefore = pvs.getSegmentStorageKeysSnapshot();
+ assertEquals(segmentCountBefore, keysBefore.size());
+ String targetSegment = keysBefore.get(0);
+
+ // Step 2: boot a shadow and let it catch up to reloadCount == 1.
+ IndexingServiceEngine shadow = newShadow(dsm, mm, 0);
+ shadow.start();
+ try {
+ assertTrue("shadow must become ready", shadow.isShadowReady());
+ assertEquals("shadow must have reloaded exactly once",
+ 1, shadow.getShadowReloadCount());
+
+ // Step 3: refuse the delete when force=false. The MemoryDataStorageManager
+ // honours multipartIndexFileExists via the default implementation, so the
+ // graph file is reachable → engine refuses.
+ try {
+ primary.deleteSegment("vectable", "vidx", targetSegment,
+ /* purgeStorage */ false, /* force */ false);
+ org.junit.Assert.fail("delete must be refused while graph file is present");
+ } catch (IndexingServiceEngine.DeleteSegmentException expected) {
+ assertTrue("refusal message must mention force flag, got: " + expected.getMessage(),
+ expected.getMessage().toLowerCase().contains("force"));
+ }
+ // Confirm the refusal did not mutate state.
+ assertEquals("refused delete must NOT alter segment count",
+ segmentCountBefore, pvs.getSegmentCount());
+
+ long reloadCountBeforeForce = shadow.getShadowReloadCount();
+ IndexingServiceCheckpointState stateBeforeForce =
+ metadata.getIndexingServiceCheckpointState(0);
+ assertNotNull(stateBeforeForce);
+
+ // Step 4: force + purge.
+ IndexingServiceEngine.DeleteSegmentResult result =
+ primary.deleteSegment("vectable", "vidx", targetSegment,
+ /* purgeStorage */ true, /* force */ true);
+ assertTrue("force-delete must report removed=true", result.removed);
+ assertEquals(targetSegment, result.segment);
+ assertTrue("force-delete must report graph_file_present=true",
+ result.graphFilePresent);
+ assertTrue("force-delete with purge must report storage_purged=true",
+ result.storagePurged);
+
+ assertEquals("primary segment count must have dropped by one",
+ segmentCountBefore - 1, pvs.getSegmentCount());
+
+ // Storage purge must have actually removed the multipart files.
+ assertFalse("graph multipart file must be gone after purge",
+ dsm.multipartIndexFileExists(primary.getTableSpaceUUID(),
+ targetSegment, "graph"));
+ assertFalse("map multipart file must be gone after purge",
+ dsm.multipartIndexFileExists(primary.getTableSpaceUUID(),
+ targetSegment, "map"));
+
+ // The primary must have written a fresh checkpoint state to ZK
+ // as part of the deleteSegment path. We verify this directly
+ // before waiting on the shadow — a missing republish would
+ // mask a regression in the engine-level notification logic
+ // behind a generic "reloadCount didn't advance" failure.
+ IndexingServiceCheckpointState stateAfterDelete =
+ metadata.getIndexingServiceCheckpointState(0);
+ assertNotNull(stateAfterDelete);
+ assertEquals("post-delete state must carry the new segment count",
+ pvs.getSegmentCount(), stateAfterDelete.getSegmentCount());
+ assertTrue("post-delete state timestamp must advance, before="
+ + stateBeforeForce.getTimestampMillis()
+ + " after=" + stateAfterDelete.getTimestampMillis(),
+ stateAfterDelete.getTimestampMillis() >= stateBeforeForce.getTimestampMillis());
+
+ // Step 5: shadow notification — wait for reloadCount to advance.
+ // 30s deadline is consistent with awaitShadowReloadsForTest()'s
+ // internal 30s barrier (the watch may arrive slightly after the
+ // setData call returns).
+ long deadline = System.currentTimeMillis() + 30_000L;
+ while (shadow.getShadowReloadCount() <= reloadCountBeforeForce
+ && System.currentTimeMillis() < deadline) {
+ Thread.sleep(50);
+ }
+ assertTrue("shadow must react to the post-delete republish, "
+ + "reloadCount before=" + reloadCountBeforeForce
+ + " now=" + shadow.getShadowReloadCount(),
+ shadow.getShadowReloadCount() > reloadCountBeforeForce);
+ } finally {
+ shadow.close();
+ primary.close();
+ }
+ }
+
+ /**
+ * Force-delete WITHOUT {@code purge_storage}: the in-memory removal must
+ * still succeed, but the engine must report {@code storage_purged=false}
+ * and the multipart files must remain in the DSM (for post-mortem
+ * forensics, as documented in issue #617).
+ */
+ @Test
+ public void forceDeleteWithoutPurgeKeepsMultipartFiles() throws Exception {
+ MemoryDataStorageManager dsm = new MemoryDataStorageManager();
+ MemoryManager mm = new MemoryManager(128 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024);
+ final String stableUuid = "idx-uuid-617-nopurge";
+
+ AtomicReference primaryStore = new AtomicReference<>();
+ IndexingServiceEngine primary = newPrimary(dsm, mm, stableUuid, primaryStore);
+ primary.start();
+
+ Table t = createTable();
+ Index idx = createIndex(stableUuid);
+ primary.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null));
+ primary.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null));
+
+ Random rng = new Random(2026);
+ int dim = 12;
+ LogSequenceNumber last = null;
+ for (int i = 0; i < 64; i++) {
+ Record r = RecordSerializer.makeRecord(t,
+ "pk", "k" + i, "vec", randomVector(rng, dim));
+ LogEntry ins = LogEntryFactory.insert(t, r.key, r.value, null);
+ last = new LogSequenceNumber(1, 200 + i);
+ primary.applySingleEntryForTest(last, ins);
+ }
+ primary.awaitPendingWorkForTest();
+ primary.setLastProcessedLsnForTest(last);
+ primary.forceCheckpointAndSaveWatermark();
+ seedDsmSchema(dsm, primary.getTableSpaceUUID(), t, idx);
+ metadata.registerIndexingServiceInstance(
+ IndexingServiceInstanceDescriptor.primary("p0", "dummy:0", 0));
+
+ try {
+ PersistentVectorStore pvs = primaryStore.get();
+ assertNotNull(pvs);
+ List keys = pvs.getSegmentStorageKeysSnapshot();
+ assertTrue("must have at least one segment", keys.size() >= 1);
+ String target = keys.get(0);
+
+ IndexingServiceEngine.DeleteSegmentResult result = primary.deleteSegment(
+ "vectable", "vidx", target,
+ /* purgeStorage */ false, /* force */ true);
+
+ assertTrue(result.removed);
+ assertFalse("storage_purged must be false when purge_storage=false",
+ result.storagePurged);
+ // Files must still be on disk for forensics.
+ assertTrue("graph file must remain in DSM when purge_storage=false",
+ dsm.multipartIndexFileExists(primary.getTableSpaceUUID(), target, "graph"));
+ } finally {
+ primary.close();
+ }
+ }
+
+ /**
+ * Negative path: requesting a segment that is not loaded must throw
+ * {@link IndexingServiceEngine.DeleteSegmentException} with a message
+ * that lists the currently-loaded segments. This is the operator-
+ * friendly counterpart of the issue #617 reproduction, where the
+ * operator pastes the wrong segment name.
+ */
+ @Test
+ public void unknownSegmentIsRejectedWithDiagnosticMessage() throws Exception {
+ MemoryDataStorageManager dsm = new MemoryDataStorageManager();
+ MemoryManager mm = new MemoryManager(64 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024);
+ final String stableUuid = "idx-uuid-617-unknown";
+
+ AtomicReference primaryStore = new AtomicReference<>();
+ IndexingServiceEngine primary = newPrimary(dsm, mm, stableUuid, primaryStore);
+ primary.start();
+ try {
+ Table t = createTable();
+ Index idx = createIndex(stableUuid);
+ primary.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null));
+ primary.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null));
+ // No checkpoint → store has no segments. Still a valid lookup target.
+
+ try {
+ primary.deleteSegment("vectable", "vidx", "vidx_nonsense_seg999",
+ false, true);
+ org.junit.Assert.fail("unknown segment must be rejected");
+ } catch (IndexingServiceEngine.DeleteSegmentException expected) {
+ assertTrue("error must mention the missing segment, got: " + expected.getMessage(),
+ expected.getMessage().contains("vidx_nonsense_seg999"));
+ }
+ } finally {
+ primary.close();
+ }
+ }
+
+ /**
+ * pr-reviewer follow-up #4 for issue #617: a shadow that boots AFTER
+ * the primary has already executed {@code deleteSegment(force=true,
+ * purge=true)} must load the smaller (post-delete) segment count on its
+ * very first reload — exactly the operational sequence operators
+ * follow when they delete an orphan segment on a primary, then bring
+ * up a fresh shadow from the cleaned-up checkpoint.
+ *
+ * Asserts:
+ *
+ * - the primary's segment count drops by one immediately;
+ * - the shadow becomes ready;
+ * - {@code shadowReloadCount == 1} (the cold-boot reload counts
+ * as the first observation);
+ * - the shadow's loaded segment count matches the primary's
+ * post-delete count.
+ *
+ */
+ @Test
+ public void lateBootShadowObservesPostDeleteState() throws Exception {
+ MemoryDataStorageManager dsm = new MemoryDataStorageManager();
+ MemoryManager mm = new MemoryManager(128 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024);
+ final String stableUuid = "idx-uuid-617-lateboot";
+
+ AtomicReference primaryStore = new AtomicReference<>();
+ IndexingServiceEngine primary = newPrimary(dsm, mm, stableUuid, primaryStore);
+ primary.start();
+
+ Table t = createTable();
+ Index idx = createIndex(stableUuid);
+ primary.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null));
+ primary.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null));
+
+ Random rng = new Random(617_42);
+ int dim = 12;
+ LogSequenceNumber last = null;
+ // Build enough vectors that a single checkpoint produces ≥1 segment,
+ // then a SECOND checkpoint produces another segment so the delete
+ // below has more than one segment to choose from. We then delete
+ // exactly one and verify the shadow loads the remaining count.
+ for (int batch = 0; batch < 2; batch++) {
+ for (int i = 0; i < 64; i++) {
+ Record r = RecordSerializer.makeRecord(t,
+ "pk", "k" + batch + "_" + i, "vec", randomVector(rng, dim));
+ LogEntry ins = LogEntryFactory.insert(t, r.key, r.value, null);
+ last = new LogSequenceNumber(1, 300 + batch * 100 + i);
+ primary.applySingleEntryForTest(last, ins);
+ }
+ primary.awaitPendingWorkForTest();
+ primary.setLastProcessedLsnForTest(last);
+ primary.forceCheckpointAndSaveWatermark();
+ }
+ seedDsmSchema(dsm, primary.getTableSpaceUUID(), t, idx);
+ metadata.registerIndexingServiceInstance(
+ IndexingServiceInstanceDescriptor.primary("p0", "dummy:0", 0));
+
+ IndexingServiceEngine shadow = null;
+ try {
+ PersistentVectorStore pvs = primaryStore.get();
+ assertNotNull(pvs);
+ int beforeDelete = pvs.getSegmentCount();
+ assertTrue("primary must have ≥2 segments before the delete to make the"
+ + " post-delete comparison non-trivial, got " + beforeDelete,
+ beforeDelete >= 2);
+
+ List keys = pvs.getSegmentStorageKeysSnapshot();
+ String victim = keys.get(0);
+
+ // Step 1: primary-side delete BEFORE the shadow is ever
+ // started. force=true + purge=true is the operational
+ // sequence for orphaned segments.
+ IndexingServiceEngine.DeleteSegmentResult result = primary.deleteSegment(
+ "vectable", "vidx", victim,
+ /* purgeStorage */ true, /* force */ true);
+ assertTrue("primary-side delete must succeed", result.removed);
+ assertTrue("graph file was present before the delete", result.graphFilePresent);
+ assertTrue("purge=true must report storage_purged=true", result.storagePurged);
+
+ int afterDelete = pvs.getSegmentCount();
+ assertEquals("primary segment count must drop by exactly one",
+ beforeDelete - 1, afterDelete);
+
+ // Step 2: only now boot the shadow. The shadow has never seen
+ // the pre-delete IndexStatus — it loads the (already smaller)
+ // post-delete one on its very first reload.
+ shadow = newShadow(dsm, mm, 0);
+ shadow.start();
+ assertTrue("late-boot shadow must become ready", shadow.isShadowReady());
+
+ assertEquals("shadow must have reloaded exactly once on cold boot",
+ 1L, shadow.getShadowReloadCount());
+
+ // Step 3: the shadow's loaded segment count must match the
+ // primary's POST-delete count. A regression that leaks the
+ // pre-delete IndexStatus into the shadow's load path would
+ // produce a mismatch here.
+ int shadowSegments = shadow.getSegmentCountForTest("vectable", "vidx");
+ assertEquals("late-boot shadow must observe the post-delete segment count",
+ afterDelete, shadowSegments);
+ } finally {
+ if (shadow != null) {
+ shadow.close();
+ }
+ primary.close();
+ }
+ }
+}
diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentRpcGateTest.java b/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentRpcGateTest.java
new file mode 100644
index 000000000..819a3b982
--- /dev/null
+++ b/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentRpcGateTest.java
@@ -0,0 +1,230 @@
+/*
+ Licensed to Diennea S.r.l. under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. Diennea S.r.l. licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+ */
+package herddb.indexing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import herddb.indexing.proto.DeleteSegmentRequest;
+import herddb.indexing.proto.IndexingServiceGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.nio.file.Paths;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * pr-reviewer follow-up #4 for issue #617: exercises the
+ * real-server shadow gate on the {@code DeleteSegment} RPC.
+ *
+ * The existing
+ * {@code IndexingAdminCliDeleteSegmentTest.shadowConfiguredServerRejectsWithNotPrimaryPrefix}
+ * test runs against a hand-rolled gRPC fake that returns a hard-coded
+ * {@code FAILED_PRECONDITION} response — it validates the CLI's surface,
+ * not the production gate at
+ * {@link IndexingServiceImpl#deleteSegment(DeleteSegmentRequest,
+ * io.grpc.stub.StreamObserver)}. A regression that deleted the production
+ * gate would still pass that fake-server test.
+ *
+ *
This test mirrors the {@link ShadowAdminCliAndRpcTest} harness: a real
+ * {@link IndexingServiceImpl} wired to a real {@link IndexingServiceEngine}
+ * stub configured as a shadow ({@code isConfiguredAsShadow() == true}),
+ * exposed via an in-process gRPC server, invoked via the real blocking
+ * stub. The engine's {@code deleteSegment} is intentionally NOT overridden
+ * — the test would FAIL if it were ever reached, proving the production
+ * shadow gate at the service boundary is what's blocking the call.
+ *
+ *
Asserts the documented error contract:
+ *
+ * - {@code Status.Code == FAILED_PRECONDITION};
+ * - description starts with
+ * {@code "not_primary: this indexing-service instance is a shadow replica (shadowOf="};
+ * - description contains {@code "target the primary instead"}.
+ *
+ */
+public class ShadowDeleteSegmentRpcGateTest {
+
+ /**
+ * Minimal {@link IndexingServiceEngine} subclass: exposes shadow=true with
+ * a configurable shadowOf, and BLOWS UP if anything ever calls the
+ * engine's own {@code deleteSegment} — that would mean the service-level
+ * shadow gate was bypassed.
+ */
+ private static final class ShadowStubEngine extends IndexingServiceEngine {
+ volatile boolean shadow = true;
+ volatile int shadowOf = 0;
+
+ ShadowStubEngine() {
+ super(Paths.get("/tmp/noop-log-shadow-rpc"),
+ Paths.get("/tmp/noop-data-shadow-rpc"),
+ new IndexingServerConfiguration());
+ }
+
+ @Override
+ public boolean isConfiguredAsShadow() {
+ return shadow;
+ }
+
+ @Override
+ public boolean isShadowReady() {
+ // Ready=true so the IndexingServiceImpl @Override interceptor for
+ // shadow-not-ready does NOT short-circuit our shadow gate ahead of
+ // time. Ready vs not-ready is orthogonal to the DeleteSegment gate
+ // — we want to prove the DeleteSegment gate fires even when the
+ // shadow is otherwise healthy.
+ return true;
+ }
+
+ @Override
+ public int getShadowOfOrMinusOne() {
+ return shadowOf;
+ }
+
+ @Override
+ public String getInstanceIdLabel() {
+ return "shadow-stub";
+ }
+
+ @Override
+ public String getTableSpaceUUID() {
+ return "ts-uuid";
+ }
+
+ @Override
+ public DeleteSegmentResult deleteSegment(String table, String indexName,
+ String segmentStorageKey,
+ boolean purgeStorage, boolean force) {
+ // If this is ever reached the service-level shadow gate was
+ // deleted or bypassed. The test asserts that this does NOT
+ // happen — the gRPC stub call must terminate inside
+ // IndexingServiceImpl.deleteSegment with FAILED_PRECONDITION,
+ // before this method is ever invoked.
+ throw new AssertionError("engine.deleteSegment must NOT be reached "
+ + "when the service is configured as a shadow — the production"
+ + " gate at IndexingServiceImpl.deleteSegment must short-circuit"
+ + " the RPC with not_primary:");
+ }
+ }
+
+ private ShadowStubEngine engine;
+ private Server server;
+ private ManagedChannel channel;
+ private IndexingServiceGrpc.IndexingServiceBlockingStub stub;
+
+ @Before
+ public void setUp() throws Exception {
+ engine = new ShadowStubEngine();
+ IndexingServiceImpl svc = new IndexingServiceImpl(engine, NullStatsLogger.INSTANCE);
+ server = ServerBuilder.forPort(0).addService(svc).build().start();
+ channel = ManagedChannelBuilder.forAddress("localhost", server.getPort())
+ .usePlaintext().build();
+ stub = IndexingServiceGrpc.newBlockingStub(channel);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (channel != null) {
+ channel.shutdownNow();
+ }
+ if (server != null) {
+ server.shutdownNow().awaitTermination();
+ }
+ }
+
+ /**
+ * pr-reviewer follow-up #4: the real-server gate must reject the RPC
+ * with the documented FAILED_PRECONDITION prefix WITHOUT ever entering
+ * the engine. ShadowStubEngine.deleteSegment throws AssertionError if
+ * reached — that fails the test, proving the gate is real.
+ */
+ @Test
+ public void deleteSegmentRpcOnShadowReturnsNotPrimaryFailedPrecondition() {
+ engine.shadow = true;
+ engine.shadowOf = 0;
+
+ DeleteSegmentRequest request = DeleteSegmentRequest.newBuilder()
+ .setTablespace("ts")
+ .setTable("vectable")
+ .setIndex("vidx")
+ .setSegment("vidx_aaa_seg1")
+ .setPurgeStorage(false)
+ .setForce(false)
+ .build();
+ try {
+ stub.deleteSegment(request);
+ fail("DeleteSegment RPC on shadow must throw StatusRuntimeException "
+ + "with FAILED_PRECONDITION");
+ } catch (StatusRuntimeException e) {
+ assertEquals("status code must be FAILED_PRECONDITION",
+ Status.Code.FAILED_PRECONDITION, e.getStatus().getCode());
+ String desc = e.getStatus().getDescription();
+ assertNotNull("status must carry a description", desc);
+ String expectedPrefix = "not_primary: this indexing-service instance is a"
+ + " shadow replica (shadowOf=";
+ assertTrue("status description must start with the documented "
+ + "not_primary prefix, got: " + desc,
+ desc.startsWith(expectedPrefix));
+ assertTrue("status description must instruct the operator to target"
+ + " the primary, got: " + desc,
+ desc.contains("target the primary instead"));
+ // Sanity: shadowOf=0 from our stub must appear in the message.
+ assertTrue("status description must include the shadowOf integer, got: " + desc,
+ desc.contains("shadowOf=0"));
+ }
+ }
+
+ /**
+ * Defensive: when shadow=true but shadowOf=-1 (configured-as-shadow
+ * without a numeric pointer), the gate still fires with the documented
+ * prefix. shadowOf="-1" still flows through the message verbatim.
+ */
+ @Test
+ public void deleteSegmentRpcRejectsEvenWithoutKnownShadowOf() {
+ engine.shadow = true;
+ engine.shadowOf = -1;
+
+ DeleteSegmentRequest request = DeleteSegmentRequest.newBuilder()
+ .setTable("vectable")
+ .setIndex("vidx")
+ .setSegment("vidx_aaa_seg2")
+ .build();
+ try {
+ stub.deleteSegment(request);
+ fail("DeleteSegment RPC on shadow with shadowOf=-1 must still be"
+ + " rejected with FAILED_PRECONDITION");
+ } catch (StatusRuntimeException e) {
+ assertEquals(Status.Code.FAILED_PRECONDITION, e.getStatus().getCode());
+ String desc = e.getStatus().getDescription();
+ assertNotNull(desc);
+ assertTrue("description must start with not_primary prefix, got: " + desc,
+ desc.startsWith("not_primary: this indexing-service instance is a"
+ + " shadow replica (shadowOf="));
+ assertTrue("description must instruct to target the primary, got: " + desc,
+ desc.contains("target the primary instead"));
+ }
+ }
+}
diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java b/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java
new file mode 100644
index 000000000..85fd00883
--- /dev/null
+++ b/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java
@@ -0,0 +1,550 @@
+/*
+ Licensed to Diennea S.r.l. under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. Diennea S.r.l. licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+ */
+
+package herddb.indexing.admin;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import herddb.indexing.proto.DeleteSegmentRequest;
+import herddb.indexing.proto.DeleteSegmentResponse;
+import herddb.indexing.proto.IndexingServiceGrpc;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Issue #617: wire-level tests for the {@code delete-segment} sub-command of
+ * {@link IndexingAdminCli}. Each test boots a tiny in-process gRPC server
+ * whose {@code deleteSegment} implementation records the inbound request and
+ * returns a hand-crafted response, then drives the CLI's {@link
+ * IndexingAdminCli#run(String[])} entry point with the relevant flags
+ * (purge-storage, force, --yes, --json, stdin confirmation) and asserts on:
+ *
+ *
+ * - the exact wire request the CLI emitted (so the operator's flags
+ * reach the IS without translation drift);
+ * - the process exit code (0 when the server reported
+ * {@code removed=true}, 1 when {@code removed=false} — so wrapper
+ * scripts can distinguish "I removed it" from "it was already
+ * gone");
+ * - the stdout payload (text vs. JSON);
+ * - the stdin-confirmation behaviour (aborts cleanly on "n",
+ * proceeds on "y", and aborts when stdin is closed).
+ *
+ *
+ * Engine-level coverage (a real {@link herddb.indexing.vector.PersistentVectorStore}
+ * with real segments + shadow notification on segment removal) lives in
+ * {@code herddb.indexing.ShadowDeleteSegmentE2ETest}.
+ */
+public class IndexingAdminCliDeleteSegmentTest {
+
+ private FakeDeleteSegmentServer fakeServer;
+ private ByteArrayOutputStream stdoutBytes;
+ private ByteArrayOutputStream stderrBytes;
+ private IndexingAdminCli cli;
+ private InputStream savedStdin;
+
+ @Before
+ public void setUp() {
+ stdoutBytes = new ByteArrayOutputStream();
+ stderrBytes = new ByteArrayOutputStream();
+ cli = new IndexingAdminCli(
+ new PrintStream(stdoutBytes, true, StandardCharsets.UTF_8),
+ new PrintStream(stderrBytes, true, StandardCharsets.UTF_8));
+ savedStdin = System.in;
+ }
+
+ @After
+ public void tearDown() throws InterruptedException {
+ System.setIn(savedStdin);
+ if (fakeServer != null) {
+ fakeServer.close();
+ }
+ }
+
+ private String stdout() {
+ return new String(stdoutBytes.toByteArray(), StandardCharsets.UTF_8);
+ }
+
+ private String stderr() {
+ return new String(stderrBytes.toByteArray(), StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Happy path: {@code --yes} skips the confirmation prompt, and the
+ * server reports a successful removal. The CLI must forward every flag
+ * to the server (in particular {@code purge-storage} → {@code true}
+ * and {@code force} → {@code false}), exit with code 0, and emit JSON
+ * carrying every response field.
+ */
+ @Test
+ public void happyPathYesPurgeJsonReportsRemovedAndExits0() throws Exception {
+ fakeServer = new FakeDeleteSegmentServer(
+ req -> DeleteSegmentResponse.newBuilder()
+ .setSegment(req.getSegment())
+ .setRemoved(true)
+ .setVectorsLost(12345L)
+ .setGraphFilePresent(false)
+ .setStoragePurged(req.getPurgeStorage())
+ .build());
+ fakeServer.start();
+
+ int rc = cli.run(new String[]{
+ "delete-segment",
+ "--server", fakeServer.address(),
+ "--tablespace", "ts",
+ "--table", "vectable",
+ "--index", "vidx",
+ "--segment", "vidx_aaa_seg627",
+ "--purge-storage",
+ "--yes",
+ "--json"
+ });
+ assertEquals("CLI should succeed; stderr=\n" + stderr(), 0, rc);
+
+ DeleteSegmentRequest captured = fakeServer.lastRequest();
+ assertNotNull("server must have received the request", captured);
+ assertEquals("ts", captured.getTablespace());
+ assertEquals("vectable", captured.getTable());
+ assertEquals("vidx", captured.getIndex());
+ assertEquals("vidx_aaa_seg627", captured.getSegment());
+ assertTrue("CLI must forward --purge-storage", captured.getPurgeStorage());
+ assertFalse("CLI must NOT set force when --force is absent", captured.getForce());
+
+ String out = stdout().trim();
+ assertTrue("JSON output expected, got:\n" + out, out.startsWith("{") && out.endsWith("}"));
+ assertTrue("must include the segment name, got:\n" + out,
+ out.contains("\"segment\":\"vidx_aaa_seg627\""));
+ assertTrue("must include removed=true, got:\n" + out,
+ out.contains("\"removed\":true"));
+ assertTrue("must include vectors_lost, got:\n" + out,
+ out.contains("\"vectors_lost\":12345"));
+ assertTrue("must include storage_purged=true, got:\n" + out,
+ out.contains("\"storage_purged\":true"));
+ }
+
+ /**
+ * Text mode (no {@code --json}) — the CLI must emit a single
+ * human-readable line containing the same fields the JSON mode
+ * emits, in {@code key=value} form.
+ */
+ @Test
+ public void textModeProducesSingleLineSummary() throws Exception {
+ fakeServer = new FakeDeleteSegmentServer(
+ req -> DeleteSegmentResponse.newBuilder()
+ .setSegment(req.getSegment())
+ .setRemoved(true)
+ .setVectorsLost(7L)
+ .setGraphFilePresent(true)
+ .setStoragePurged(false)
+ .build());
+ fakeServer.start();
+
+ int rc = cli.run(new String[]{
+ "delete-segment",
+ "--server", fakeServer.address(),
+ "--table", "vectable",
+ "--index", "vidx",
+ "--segment", "vidx_bbb_seg42",
+ "--force",
+ "--yes"
+ });
+ assertEquals(0, rc);
+ String out = stdout();
+ assertTrue("text output must include segment=, got:\n" + out,
+ out.contains("segment=vidx_bbb_seg42"));
+ assertTrue(out.contains("removed=true"));
+ assertTrue(out.contains("vectors_lost=7"));
+ assertTrue(out.contains("graph_file_present=true"));
+ assertTrue(out.contains("storage_purged=false"));
+ }
+
+ /**
+ * When {@code --force} is set the CLI must forward {@code force=true}
+ * to the server. The server-side refusal is exercised by the engine
+ * E2E test; here we only check the wire-protocol forwarding.
+ */
+ @Test
+ public void forceFlagIsForwardedToServer() throws Exception {
+ AtomicReference captured = new AtomicReference<>();
+ fakeServer = new FakeDeleteSegmentServer(req -> {
+ captured.set(req);
+ return DeleteSegmentResponse.newBuilder()
+ .setSegment(req.getSegment())
+ .setRemoved(true)
+ .setVectorsLost(0L)
+ .setGraphFilePresent(true)
+ .setStoragePurged(false)
+ .build();
+ });
+ fakeServer.start();
+
+ int rc = cli.run(new String[]{
+ "delete-segment",
+ "--server", fakeServer.address(),
+ "--table", "t",
+ "--index", "i",
+ "--segment", "vidx_seg1",
+ "--force",
+ "--yes"
+ });
+ assertEquals(0, rc);
+ assertTrue("CLI must forward --force as force=true",
+ captured.get().getForce());
+ }
+
+ /**
+ * Server-side refusal (mapped to {@code FAILED_PRECONDITION}) must
+ * surface as a non-zero CLI exit code and an error line on stderr —
+ * not a stack trace.
+ */
+ @Test
+ public void serverRefusalIsReportedAsErrorWithNonZeroExit() throws Exception {
+ fakeServer = new FakeDeleteSegmentServer(req -> {
+ throw Status.FAILED_PRECONDITION
+ .withDescription("refusing to delete segment vidx_seg1: graph file IS reachable")
+ .asRuntimeException();
+ });
+ fakeServer.start();
+
+ int rc = cli.run(new String[]{
+ "delete-segment",
+ "--server", fakeServer.address(),
+ "--table", "t",
+ "--index", "i",
+ "--segment", "vidx_seg1",
+ "--yes"
+ });
+ assertTrue("CLI must exit non-zero on server refusal, got rc=" + rc, rc != 0);
+ String err = stderr();
+ assertTrue("stderr must mention the failure, got:\n" + err,
+ err.contains("ERROR") || err.toLowerCase().contains("refus"));
+ }
+
+ /**
+ * When the server returns {@code removed=false} (no-op race), the CLI
+ * exits with code 1 so wrapper scripts can tell "I removed it" from
+ * "it was already gone".
+ */
+ @Test
+ public void removedFalseExitsWithCode1() throws Exception {
+ fakeServer = new FakeDeleteSegmentServer(
+ req -> DeleteSegmentResponse.newBuilder()
+ .setSegment(req.getSegment())
+ .setRemoved(false)
+ .setVectorsLost(0L)
+ .setGraphFilePresent(false)
+ .setStoragePurged(false)
+ .build());
+ fakeServer.start();
+
+ int rc = cli.run(new String[]{
+ "delete-segment",
+ "--server", fakeServer.address(),
+ "--table", "t",
+ "--index", "i",
+ "--segment", "vidx_gone",
+ "--yes"
+ });
+ assertEquals("removed=false must exit 1", 1, rc);
+ }
+
+ /**
+ * Stdin confirmation: when {@code --yes} is absent the CLI must read
+ * one byte from stdin and abort on anything other than {@code y/Y}.
+ * The server must never receive a request in the aborted case.
+ */
+ @Test
+ public void stdinPromptAbortsOnAnyNonYesInput() throws Exception {
+ AtomicReference captured = new AtomicReference<>();
+ fakeServer = new FakeDeleteSegmentServer(req -> {
+ captured.set(req);
+ return DeleteSegmentResponse.newBuilder()
+ .setSegment(req.getSegment())
+ .setRemoved(true)
+ .build();
+ });
+ fakeServer.start();
+
+ System.setIn(new ByteArrayInputStream("n\n".getBytes(StandardCharsets.UTF_8)));
+ int rc = cli.run(new String[]{
+ "delete-segment",
+ "--server", fakeServer.address(),
+ "--table", "t",
+ "--index", "i",
+ "--segment", "vidx_seg1"
+ });
+ assertEquals("rejected prompt must exit non-zero", 1, rc);
+ assertTrue("stderr must mention abort, got:\n" + stderr(),
+ stderr().toLowerCase().contains("abort"));
+ // Critical safety property: the RPC must NOT have been invoked.
+ assertEquals("server must not have received any request when user aborted",
+ null, captured.get());
+ }
+
+ /**
+ * {@code y\n} on stdin (without {@code --yes}) must proceed with the
+ * RPC. Verifies the inverse of the abort path above.
+ */
+ @Test
+ public void stdinPromptProceedsOnYInput() throws Exception {
+ AtomicReference captured = new AtomicReference<>();
+ fakeServer = new FakeDeleteSegmentServer(req -> {
+ captured.set(req);
+ return DeleteSegmentResponse.newBuilder()
+ .setSegment(req.getSegment())
+ .setRemoved(true)
+ .setVectorsLost(1L)
+ .build();
+ });
+ fakeServer.start();
+
+ System.setIn(new ByteArrayInputStream("y\n".getBytes(StandardCharsets.UTF_8)));
+ int rc = cli.run(new String[]{
+ "delete-segment",
+ "--server", fakeServer.address(),
+ "--table", "t",
+ "--index", "i",
+ "--segment", "vidx_seg2"
+ });
+ assertEquals(0, rc);
+ assertNotNull("server must have received the request after y/Y",
+ captured.get());
+ assertEquals("vidx_seg2", captured.get().getSegment());
+ }
+
+ /**
+ * pr-reviewer follow-up #1: when the targeted IS is a shadow replica,
+ * the server short-circuits the RPC with the exact prefix
+ * {@code "not_primary: this indexing-service instance is a shadow
+ * replica (shadowOf=...)"}. The CLI must surface this as a non-zero
+ * exit code with the rejection text on stderr.
+ */
+ @Test
+ public void shadowConfiguredServerRejectsWithNotPrimaryPrefix() throws Exception {
+ // The fake server emulates the IndexingServiceImpl.deleteSegment
+ // shadow gate: it never invokes the engine, just returns
+ // FAILED_PRECONDITION with the exact "not_primary:" message.
+ fakeServer = new FakeDeleteSegmentServer(req -> {
+ throw Status.FAILED_PRECONDITION
+ .withDescription("not_primary: this indexing-service instance is a"
+ + " shadow replica (shadowOf=0); target the primary instead")
+ .asRuntimeException();
+ });
+ fakeServer.start();
+
+ int rc = cli.run(new String[]{
+ "delete-segment",
+ "--server", fakeServer.address(),
+ "--table", "t",
+ "--index", "i",
+ "--segment", "vidx_seg1",
+ "--yes"
+ });
+ assertTrue("CLI must exit non-zero on shadow rejection, got rc=" + rc, rc != 0);
+ String err = stderr();
+ assertTrue("stderr must carry the not_primary: prefix, got:\n" + err,
+ err.contains("not_primary:"));
+ assertTrue("stderr must mention shadowOf=, got:\n" + err,
+ err.contains("shadowOf="));
+ }
+
+ /**
+ * pr-reviewer follow-up #3 (text output): when the server returns
+ * {@code vectors_lost=-1} (the engine's race-sentinel for "the segment
+ * disappeared between the snapshot and the drop call — count is
+ * unknown"), the CLI must NOT print the raw {@code -1} number. Operators
+ * read the field as a count, so {@code -1} would be confusing.
+ * Instead emit {@code vectors_lost=unknown (race)}.
+ */
+ @Test
+ public void textOutputRendersNegativeVectorsLostAsUnknownRace() throws Exception {
+ fakeServer = new FakeDeleteSegmentServer(
+ req -> DeleteSegmentResponse.newBuilder()
+ .setSegment(req.getSegment())
+ .setRemoved(false)
+ .setVectorsLost(-1L)
+ .setGraphFilePresent(false)
+ .setStoragePurged(false)
+ .build());
+ fakeServer.start();
+
+ int rc = cli.run(new String[]{
+ "delete-segment",
+ "--server", fakeServer.address(),
+ "--table", "vectable",
+ "--index", "vidx",
+ "--segment", "vidx_race_seg",
+ "--yes"
+ });
+ // removed=false → rc=1 (existing contract from removedFalseExitsWithCode1).
+ assertEquals("removed=false must still map to rc=1", 1, rc);
+
+ String out = stdout();
+ assertTrue("text output must render -1 as 'unknown (race)', got:\n" + out,
+ out.contains("vectors_lost=unknown (race)"));
+ // And it must NOT include the raw -1, which would be visually
+ // misleading next to the other numeric fields.
+ assertFalse("text output must not include the raw -1 sentinel, got:\n" + out,
+ out.contains("vectors_lost=-1"));
+ // The other fields are still rendered.
+ assertTrue(out.contains("segment=vidx_race_seg"));
+ assertTrue(out.contains("removed=false"));
+ }
+
+ /**
+ * pr-reviewer follow-up #3 (JSON output): same input as above (server
+ * returns {@code vectors_lost=-1}). The JSON form keeps the
+ * {@code vectors_lost} key in the object (so the schema stays stable)
+ * but emits the string {@code "unknown"} instead of the raw number.
+ * Wrapper scripts that consume the JSON can distinguish a real count
+ * from "we don't know" by type-checking the field.
+ */
+ @Test
+ public void jsonOutputRendersNegativeVectorsLostAsUnknownString() throws Exception {
+ fakeServer = new FakeDeleteSegmentServer(
+ req -> DeleteSegmentResponse.newBuilder()
+ .setSegment(req.getSegment())
+ .setRemoved(false)
+ .setVectorsLost(-1L)
+ .setGraphFilePresent(false)
+ .setStoragePurged(false)
+ .build());
+ fakeServer.start();
+
+ int rc = cli.run(new String[]{
+ "delete-segment",
+ "--server", fakeServer.address(),
+ "--table", "vectable",
+ "--index", "vidx",
+ "--segment", "vidx_race_seg_json",
+ "--yes",
+ "--json"
+ });
+ assertEquals("removed=false must still map to rc=1", 1, rc);
+
+ String out = stdout().trim();
+ assertTrue("expected JSON object, got:\n" + out,
+ out.startsWith("{") && out.endsWith("}"));
+ // The vectors_lost key MUST still be present (stable schema), but as
+ // a JSON string, not a number. So the substring is the
+ // quoted-key-and-value form.
+ assertTrue("JSON must carry vectors_lost as the string \"unknown\", got:\n" + out,
+ out.contains("\"vectors_lost\":\"unknown\""));
+ // And it must NOT carry the raw -1 sentinel.
+ assertFalse("JSON must not carry the raw -1 vectors_lost value, got:\n" + out,
+ out.contains("\"vectors_lost\":-1"));
+ // Other fields still render correctly.
+ assertTrue(out.contains("\"segment\":\"vidx_race_seg_json\""));
+ assertTrue(out.contains("\"removed\":false"));
+ }
+
+ /**
+ * Missing {@code --segment} flag must exit with usage code 2, not
+ * with a stack trace.
+ */
+ @Test
+ public void missingSegmentFlagExits2() {
+ int rc = cli.run(new String[]{
+ "delete-segment",
+ "--server", "localhost:1",
+ "--table", "t",
+ "--index", "i"
+ // intentionally missing --segment
+ });
+ assertEquals(2, rc);
+ String err = stderr();
+ assertTrue("stderr must mention the missing option, got:\n" + err,
+ err.toLowerCase().contains("segment"));
+ }
+
+ // -------------------- harness --------------------
+
+ @FunctionalInterface
+ private interface DeleteSegmentHandler {
+ DeleteSegmentResponse handle(DeleteSegmentRequest request);
+ }
+
+ private static final class FakeDeleteSegmentServer implements AutoCloseable {
+ private final DeleteSegmentImpl impl;
+ private Server server;
+
+ FakeDeleteSegmentServer(DeleteSegmentHandler handler) {
+ this.impl = new DeleteSegmentImpl(handler);
+ }
+
+ void start() throws IOException {
+ server = ServerBuilder.forPort(0).addService(impl).build().start();
+ assertNotNull(server);
+ }
+
+ String address() {
+ return "localhost:" + server.getPort();
+ }
+
+ DeleteSegmentRequest lastRequest() {
+ return impl.lastRequest.get();
+ }
+
+ @Override
+ public void close() throws InterruptedException {
+ if (server != null) {
+ server.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ private static final class DeleteSegmentImpl
+ extends IndexingServiceGrpc.IndexingServiceImplBase {
+ private final DeleteSegmentHandler handler;
+ private final AtomicReference lastRequest = new AtomicReference<>();
+
+ DeleteSegmentImpl(DeleteSegmentHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public void deleteSegment(DeleteSegmentRequest request,
+ StreamObserver responseObserver) {
+ lastRequest.set(request);
+ try {
+ DeleteSegmentResponse resp = handler.handle(request);
+ responseObserver.onNext(resp);
+ responseObserver.onCompleted();
+ } catch (io.grpc.StatusRuntimeException e) {
+ responseObserver.onError(e);
+ }
+ }
+ }
+}
diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/vector/PersistentVectorStoreAdminDeleteSegmentTest.java b/herddb-indexing-service/src/test/java/herddb/indexing/vector/PersistentVectorStoreAdminDeleteSegmentTest.java
new file mode 100644
index 000000000..628231f40
--- /dev/null
+++ b/herddb-indexing-service/src/test/java/herddb/indexing/vector/PersistentVectorStoreAdminDeleteSegmentTest.java
@@ -0,0 +1,316 @@
+/*
+ Licensed to Diennea S.r.l. under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. Diennea S.r.l. licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+
+package herddb.indexing.vector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import herddb.core.MemoryManager;
+import herddb.mem.MemoryDataStorageManager;
+import herddb.utils.Bytes;
+import io.github.jbellis.jvector.vector.VectorSimilarityFunction;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Random;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * pr-reviewer follow-up #3 for issue #617: direct tests for
+ * {@link PersistentVectorStore#dropSegmentByStorageKey(String)} — the engine
+ * primitive that the operator-facing {@code DeleteSegment} RPC ultimately
+ * invokes after the safety gate, snapshot/refusal checks, and audit log.
+ *
+ * Engine-level coverage (request validation + checkpoint republish +
+ * shadow notification) lives in
+ * {@code herddb.indexing.ShadowDeleteSegmentE2ETest}; CLI/wire coverage in
+ * {@code herddb.indexing.admin.IndexingAdminCliDeleteSegmentTest}. This
+ * class is dedicated to the in-memory mutation primitive so a regression in
+ * the lock discipline / dirty-flag / deferred-close path is caught with the
+ * smallest possible harness.
+ *
+ *
Coverage matrix:
+ *
+ * - (a) happy path: a known segment is removed (gone from
+ * {@code segments}, store marked {@code dirty} for the next
+ * checkpoint);
+ * - (b) idempotence: a second call on the same key returns
+ * {@code SegmentDropResult.NOT_FOUND} without throwing and without
+ * mutating state;
+ * - (c) deferred-close drain: after the drop the
+ * {@code pendingSegmentCloses} queue eventually empties — i.e. the
+ * segment handle is closed (opportunistic drain runs while no
+ * compaction holds the lock);
+ * - (d) compaction-race safety: when a cycle is mid-iteration
+ * (after candidate selection, before {@code rebuildSegment}), a
+ * concurrent drop must NOT cause
+ * {@code CompactionException(CORRUPTION)} — same invariant as issue
+ * #535 inherited by the storage-key path.
+ *
+ */
+public class PersistentVectorStoreAdminDeleteSegmentTest {
+
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ private static final int DIM = 16;
+ private int savedMinLive;
+
+ @Before
+ public void disableDeferral() {
+ savedMinLive = PersistentVectorStore.minLiveVectorsForCheckpoint;
+ // Allow tiny checkpoints to seal so we always produce ≥1 on-disk
+ // segment with a handful of inserts.
+ PersistentVectorStore.minLiveVectorsForCheckpoint = 0;
+ }
+
+ @After
+ public void restoreDeferral() {
+ PersistentVectorStore.minLiveVectorsForCheckpoint = savedMinLive;
+ }
+
+ private static float[] vec(Random rng) {
+ float[] v = new float[DIM];
+ for (int i = 0; i < DIM; i++) {
+ v[i] = rng.nextFloat();
+ }
+ return v;
+ }
+
+ private PersistentVectorStore newStore(Path tmpDir, MemoryDataStorageManager dsm) {
+ MemoryManager mm = new MemoryManager(64 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024);
+ PersistentVectorStore store = new PersistentVectorStore(
+ "vidx-617", "testtable", "tstblspace", "vector_col",
+ tmpDir, dsm, mm,
+ 16, 100, 1.2f, 1.4f, true, 2_000_000_000L, 0,
+ /* compactionIntervalMs */ Long.MAX_VALUE,
+ VectorSimilarityFunction.EUCLIDEAN);
+ // Aggressive policy so the race-test below still has something to
+ // pick when the hook drops the chosen candidate.
+ store.configureCompaction(Long.MAX_VALUE, 1L, Long.MAX_VALUE, 2,
+ Integer.MAX_VALUE, 0);
+ return store;
+ }
+
+ /** Seeds {@code n} on-disk segments via add+checkpoint cycles. */
+ private void seedSegments(PersistentVectorStore store, int n) throws Exception {
+ Random rng = new Random(617);
+ for (int c = 0; c < n; c++) {
+ for (int i = 0; i < 30; i++) {
+ store.addVector(Bytes.from_int(c * 1000 + i), vec(rng));
+ }
+ store.checkpoint();
+ }
+ }
+
+ /**
+ * (a) Happy path: a known segment key is removed, the result reports
+ * {@code removed=true} with a non-negative {@code vectorsLost}, the
+ * segment list shrinks by one, and {@code dirty} is set so the next
+ * checkpoint serialises the smaller list.
+ */
+ @Test(timeout = 30_000)
+ public void happyPathRemovesSegmentAndMarksDirty() throws Exception {
+ Path tmpDir = tmpFolder.newFolder("admin-delete-happy").toPath();
+ MemoryDataStorageManager dsm = new MemoryDataStorageManager();
+
+ try (PersistentVectorStore store = newStore(tmpDir, dsm)) {
+ store.start();
+ seedSegments(store, 3);
+
+ List keysBefore = store.getSegmentStorageKeysSnapshot();
+ int countBefore = store.getSegmentCount();
+ assertTrue("setup must produce ≥1 segment", countBefore >= 1);
+ assertEquals(countBefore, keysBefore.size());
+
+ String victim = keysBefore.get(0);
+
+ // Force a checkpoint AFTER the drop so we can observe the dirty
+ // flag effect: clear the dirty flag first by performing a no-op
+ // checkpoint (which only writes when something has changed
+ // since the last save). If we drop then checkpoint, the
+ // smaller list must be persisted — verified by re-reading
+ // getSegmentStorageKeysSnapshot.
+ store.checkpoint(); // baseline; segment list unchanged
+
+ AbstractVectorStore.SegmentDropResult result =
+ store.dropSegmentByStorageKey(victim);
+
+ assertTrue("happy path must report removed=true", result.removed);
+ assertTrue("vectorsLost must be ≥ 0 on the happy path, got " + result.vectorsLost,
+ result.vectorsLost >= 0L);
+
+ List keysAfter = store.getSegmentStorageKeysSnapshot();
+ assertEquals("segment count must drop by one", countBefore - 1, keysAfter.size());
+ assertFalse("victim must be gone from snapshot", keysAfter.contains(victim));
+
+ // The next checkpoint must actually serialise the change —
+ // a regression where the dirty flag is not set would leave
+ // the on-disk IndexStatus carrying the deleted segment.
+ store.checkpoint();
+ assertEquals("post-checkpoint list still matches",
+ countBefore - 1, store.getSegmentCount());
+ }
+ }
+
+ /**
+ * (b) Idempotent NOT_FOUND on a second call: dropping the same key
+ * twice in a row must NOT throw and must NOT mutate state. The
+ * production CLI relies on this — wrapper scripts can be re-run after
+ * a partial failure without worrying about double-drop semantics.
+ */
+ @Test(timeout = 30_000)
+ public void doubleDropReportsNotFoundWithoutMutatingState() throws Exception {
+ Path tmpDir = tmpFolder.newFolder("admin-delete-idempotent").toPath();
+ MemoryDataStorageManager dsm = new MemoryDataStorageManager();
+
+ try (PersistentVectorStore store = newStore(tmpDir, dsm)) {
+ store.start();
+ seedSegments(store, 2);
+
+ List keysBefore = store.getSegmentStorageKeysSnapshot();
+ assertTrue(keysBefore.size() >= 1);
+ String victim = keysBefore.get(0);
+
+ // First call: removes the segment.
+ AbstractVectorStore.SegmentDropResult first =
+ store.dropSegmentByStorageKey(victim);
+ assertTrue(first.removed);
+
+ int countAfterFirst = store.getSegmentCount();
+
+ // Second call: idempotent NOT_FOUND. State is unchanged.
+ AbstractVectorStore.SegmentDropResult second =
+ store.dropSegmentByStorageKey(victim);
+ assertFalse("second drop must report removed=false", second.removed);
+ assertEquals("NOT_FOUND vectorsLost must be 0",
+ 0L, second.vectorsLost);
+ assertEquals("segment count must NOT change on the second call",
+ countAfterFirst, store.getSegmentCount());
+
+ // The result is the documented sentinel.
+ assertEquals("second call must return the NOT_FOUND sentinel",
+ AbstractVectorStore.SegmentDropResult.NOT_FOUND.removed,
+ second.removed);
+ }
+ }
+
+ /**
+ * (c) Deferred-close drain: the {@code pendingSegmentCloses} queue
+ * must eventually drain after the drop. With no compaction running,
+ * the opportunistic drain in {@code dropSegmentByStorageKey}'s
+ * {@code finally} block should empty the queue synchronously.
+ */
+ @Test(timeout = 30_000)
+ public void pendingSegmentClosesQueueDrainsAfterDrop() throws Exception {
+ Path tmpDir = tmpFolder.newFolder("admin-delete-drain").toPath();
+ MemoryDataStorageManager dsm = new MemoryDataStorageManager();
+
+ try (PersistentVectorStore store = newStore(tmpDir, dsm)) {
+ store.start();
+ seedSegments(store, 2);
+
+ // Idle state: queue must start empty.
+ assertEquals("queue must be empty before the drop",
+ 0, store.getPendingSegmentClosesSizeForTest());
+
+ List keysBefore = store.getSegmentStorageKeysSnapshot();
+ assertTrue(keysBefore.size() >= 1);
+ String victim = keysBefore.get(0);
+
+ AbstractVectorStore.SegmentDropResult result =
+ store.dropSegmentByStorageKey(victim);
+ assertTrue(result.removed);
+
+ // After the drop, with no concurrent compaction holding
+ // compactionLock, the opportunistic drain must have emptied
+ // the queue. The drain runs in the same call's `finally`
+ // block, so it is synchronous from the caller's POV.
+ assertEquals("opportunistic drain must have closed the queued segment "
+ + "since no compaction was running",
+ 0, store.getPendingSegmentClosesSizeForTest());
+ }
+ }
+
+ /**
+ * (d) Compaction-race safety: when a cycle is mid-iteration (after
+ * candidate selection, before {@code rebuildSegment}), a concurrent
+ * {@code dropSegmentByStorageKey} on a candidate must NOT cause
+ * {@code CompactionException(CORRUPTION)}. This is the same
+ * invariant issue #535 fixed for {@code dropSegmentByUuid}; the
+ * storage-key path inherits the same deferred-close protocol so the
+ * same test shape applies.
+ */
+ @Test(timeout = 30_000)
+ public void dropDuringCompactionMustNotCauseCorruption() throws Exception {
+ Path tmpDir = tmpFolder.newFolder("admin-delete-race").toPath();
+ MemoryDataStorageManager dsm = new MemoryDataStorageManager();
+
+ try (PersistentVectorStore store = newStore(tmpDir, dsm)) {
+ store.start();
+ seedSegments(store, 5);
+
+ List keysBefore = store.getSegmentStorageKeysSnapshot();
+ assertTrue("setup must produce ≥2 segments", keysBefore.size() >= 2);
+ String victim = keysBefore.get(0);
+
+ long corruptionBefore = store.getCompactionFailuresCorruptionTotal();
+
+ // Wire the hook to fire the storage-key drop inside the cycle,
+ // AFTER candidates are picked but BEFORE rebuildSegment reads
+ // seg.onDiskGraph. Pre-issue-#535 the synchronous close inside
+ // the drop would have nulled out onDiskGraph and tripped a
+ // CORRUPTION failure; the deferred-close protocol queues the
+ // close until after the cycle's iteration completes.
+ store.setCompactionPostCandidateSelectionHookForTest(() -> {
+ AbstractVectorStore.SegmentDropResult r =
+ store.dropSegmentByStorageKey(victim);
+ // Hook drops the victim synchronously — the cycle still
+ // sees a non-null onDiskGraph because the close was
+ // deferred to the cycle's finally block.
+ assertNotNull("hook drop must have produced a result", r);
+ });
+
+ store.runCompactionCycle();
+
+ // CORE INVARIANT (the fix this code inherits from issue #535):
+ // NO CORRUPTION failure must have been recorded.
+ assertEquals("no CORRUPTION failure must be recorded — the storage-key"
+ + " drop path inherits issue #535's deferred-close protocol",
+ corruptionBefore, store.getCompactionFailuresCorruptionTotal());
+
+ // The victim must be gone from the active list — the drop
+ // already removed it under the writeLock before the hook
+ // returned.
+ assertFalse("victim must be removed from the active segment list",
+ store.getSegmentStorageKeysSnapshot().contains(victim));
+
+ // By the time runCompactionCycle returns, the deferred close
+ // must have drained.
+ assertEquals("pendingSegmentCloses queue must drain by the end of the"
+ + " cycle",
+ 0, store.getPendingSegmentClosesSizeForTest());
+ }
+ }
+}
diff --git a/herddb-remote-file-service/src/main/java/herddb/remote/PromotableRemoteFileDataStorageManager.java b/herddb-remote-file-service/src/main/java/herddb/remote/PromotableRemoteFileDataStorageManager.java
index adb260041..d7284db3a 100644
--- a/herddb-remote-file-service/src/main/java/herddb/remote/PromotableRemoteFileDataStorageManager.java
+++ b/herddb-remote-file-service/src/main/java/herddb/remote/PromotableRemoteFileDataStorageManager.java
@@ -362,4 +362,20 @@ public void deleteMultipartIndexFile(String tableSpace, String uuid, String file
throws DataStorageManagerException {
activeDelegate.deleteMultipartIndexFile(tableSpace, uuid, fileType);
}
+
+ /**
+ * Issue #617 + pr-reviewer follow-up #5: delegate the multipart presence
+ * probe to the {@link #activeDelegate}. The default
+ * {@link DataStorageManager#multipartIndexFileExists} fallback would still
+ * work after the 1 GiB sentinel fix in the base class, but it routes the
+ * probe through {@link #multipartIndexReaderSupplier} and a synthetic
+ * {@code readInt()} — which is slower and noisier than letting the
+ * delegate (typically a {@link RemoteFileDataStorageManager} or a
+ * {@link ReadReplicaDataStorageManager}) handle the probe with its own
+ * backend-specific shortcut.
+ */
+ @Override
+ public boolean multipartIndexFileExists(String tableSpace, String uuid, String fileType) {
+ return activeDelegate.multipartIndexFileExists(tableSpace, uuid, fileType);
+ }
}
diff --git a/herddb-remote-file-service/src/main/java/herddb/remote/ReadReplicaDataStorageManager.java b/herddb-remote-file-service/src/main/java/herddb/remote/ReadReplicaDataStorageManager.java
index 4fdf351e3..59fcc605a 100644
--- a/herddb-remote-file-service/src/main/java/herddb/remote/ReadReplicaDataStorageManager.java
+++ b/herddb-remote-file-service/src/main/java/herddb/remote/ReadReplicaDataStorageManager.java
@@ -133,6 +133,33 @@ public void deleteMultipartIndexFile(String tableSpace, String uuid, String file
throw readOnly("deleteMultipartIndexFile");
}
+ /**
+ * Issue #617 + pr-reviewer follow-up #5: same {@code readFileRange}
+ * shortcut as {@link RemoteFileDataStorageManager#multipartIndexFileExists}
+ * — the default supplier-based fallback would also work (now that the base
+ * class uses a 1 GiB sentinel for the synthetic {@code fileSize}), but it
+ * routes the 4-byte probe through {@link RemoteRandomAccessReader} which
+ * is heavier than a direct block-range read. A null/empty response means
+ * block-0 is absent.
+ */
+ @Override
+ public boolean multipartIndexFileExists(String tableSpace, String uuid, String fileType) {
+ String logicalPath = remoteMultipartPath(tableSpace, uuid, fileType);
+ int blockSize = client.getBlockSize();
+ try {
+ byte[] head = client.readFileRange(logicalPath, 0L, 4, blockSize);
+ return head != null && head.length > 0;
+ } catch (RuntimeException e) {
+ // Some backends wrap NoSuchKey-style errors as unchecked
+ // exceptions; treat any failure during the probe as "missing"
+ // per the base contract.
+ LOGGER.log(Level.FINE,
+ "multipartIndexFileExists: probe failed for {0}: {1}",
+ new Object[]{logicalPath, e.getMessage()});
+ return false;
+ }
+ }
+
// -------------------------------------------------------------------------
// Page deserialization (matches FileDataStorageManager format)
// -------------------------------------------------------------------------
diff --git a/herddb-remote-file-service/src/main/java/herddb/remote/RemoteFileDataStorageManager.java b/herddb-remote-file-service/src/main/java/herddb/remote/RemoteFileDataStorageManager.java
index 27efcb4b5..e124b9dbf 100644
--- a/herddb-remote-file-service/src/main/java/herddb/remote/RemoteFileDataStorageManager.java
+++ b/herddb-remote-file-service/src/main/java/herddb/remote/RemoteFileDataStorageManager.java
@@ -1045,6 +1045,46 @@ public io.github.jbellis.jvector.disk.ReaderSupplier multipartIndexReaderSupplie
readerStatsLogger, segmentBlockCache);
}
+ /**
+ * Issue #617 + pr-reviewer follow-up #2: backend-specific override of the
+ * {@code multipartIndexFileExists} probe. The default
+ * {@link DataStorageManager#multipartIndexFileExists} implementation
+ * builds a {@link RemoteRandomAccessReader} with a fictitious
+ * {@code fileSize=1L} and then calls {@code readInt()} on it — that
+ * combination triggers an infinite loop in
+ * {@link RemoteRandomAccessReader#readFully(byte[])} because the loop's
+ * {@code toCopy} becomes zero once {@code position} matches the fake
+ * {@code totalSize}. We bypass the reader entirely and probe block-0
+ * directly via the file-server client's {@code readFileRange} API,
+ * which returns {@code null} when the underlying block is absent.
+ *
+ * 4 bytes is the same probe length the base contract documents (the
+ * default impl does {@code readInt()}), so the contract gap that
+ * {@code --force} covers — a block-0 of ≥4 bytes is enough to make the
+ * probe return {@code true} even on a truncated upload — is preserved.
+ */
+ @Override
+ public boolean multipartIndexFileExists(String tableSpace, String uuid, String fileType) {
+ String logicalPath = remoteMultipartPath(tableSpace, uuid, fileType);
+ int blockSize = Math.max(client.getBlockSize(), MULTIPART_BLOCK_SIZE);
+ try {
+ byte[] head = client.readFileRange(logicalPath, 0L, 4, blockSize);
+ // RemoteFileServiceClient.readFileRange returns null for missing
+ // blocks (see RemoteFileServiceTest.testWriteFileBlockAndReadFileRange).
+ // A non-null result with ≥ 1 byte means block-0 is reachable —
+ // matches the base contract's "best-effort presence check".
+ return head != null && head.length > 0;
+ } catch (RuntimeException e) {
+ // Some backends wrap NoSuchKey-style errors in unchecked
+ // exceptions; treat any failure during the probe as "missing"
+ // per the contract.
+ LOGGER.log(Level.FINE,
+ "multipartIndexFileExists: probe failed for {0}: {1}",
+ new Object[]{logicalPath, e.getMessage()});
+ return false;
+ }
+ }
+
@Override
public void deleteMultipartIndexFile(String tableSpace, String uuid, String fileType)
throws DataStorageManagerException {
diff --git a/herddb-remote-file-service/src/test/java/herddb/remote/RemoteFileDataStorageManagerProbeAndPurgeTest.java b/herddb-remote-file-service/src/test/java/herddb/remote/RemoteFileDataStorageManagerProbeAndPurgeTest.java
new file mode 100644
index 000000000..26419db92
--- /dev/null
+++ b/herddb-remote-file-service/src/test/java/herddb/remote/RemoteFileDataStorageManagerProbeAndPurgeTest.java
@@ -0,0 +1,279 @@
+/*
+ Licensed to Diennea S.r.l. under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. Diennea S.r.l. licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+ */
+package herddb.remote;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * pr-reviewer follow-up #2 for issue #617: dedicated probe-and-purge contract
+ * tests for {@link RemoteFileDataStorageManager#multipartIndexFileExists}
+ * (the safety gate the {@code DeleteSegment} engine path keys off) and
+ * {@link RemoteFileDataStorageManager#deleteMultipartIndexFile} (the
+ * {@code --purge-storage} purger).
+ *
+ *
The matrix:
+ *
+ * - (a) probe returns {@code false} when the multipart object is
+ * absent — the gate must not block a delete the operator initiated for
+ * an already-orphaned segment record;
+ * - (b) probe returns {@code true} when the multipart object is
+ * fully present — the gate must protect the operator from deleting a
+ * healthy segment;
+ * - (c) {@code deleteMultipartIndexFile} is idempotent and
+ * invalidates the {@link SegmentBlockCache} so a subsequent reader on
+ * the same logical path cannot serve stale bytes;
+ * - (d) probe still returns {@code true} on a block-0 that is at
+ * least 4 bytes long but otherwise truncated/corrupted — this
+ * documents the contract gap that the operator's {@code --force} flag
+ * covers (the gate cannot distinguish "complete file" from "block-0
+ * only", so {@code --force} remains required when the rest of the
+ * file is known to be corrupt).
+ *
+ *
+ * End-to-end coverage of {@code DeleteSegment} that exercises this gate
+ * indirectly lives in {@code herddb.indexing.ShadowDeleteSegmentE2ETest}
+ * (in the indexing-service module).
+ */
+public class RemoteFileDataStorageManagerProbeAndPurgeTest {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ private RemoteFileServer server;
+ private RemoteFileServiceClient client;
+ private RemoteFileDataStorageManager storage;
+
+ @Before
+ public void setUp() throws Exception {
+ server = new RemoteFileServer(0, folder.newFolder("remote").toPath());
+ server.start();
+ client = new RemoteFileServiceClient(Arrays.asList("localhost:" + server.getPort()));
+ storage = new RemoteFileDataStorageManager(
+ folder.newFolder("metadata").toPath(),
+ folder.newFolder("tmp").toPath(),
+ 1000,
+ client);
+ storage.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (storage != null) {
+ storage.close();
+ }
+ if (client != null) {
+ client.close();
+ }
+ if (server != null) {
+ server.stop();
+ }
+ }
+
+ /**
+ * Writes a synthetic graph file via the high-level multipart writer so
+ * the produced object matches exactly what {@code PersistentVectorStore}
+ * would have written at Phase B time (issue #617 reproduction shape).
+ */
+ private long writeMultipartGraph(String tableSpace, String uuid, byte[] data) throws Exception {
+ Path tempFile = folder.newFile("graph-" + uuid + ".bin").toPath();
+ Files.write(tempFile, data);
+ try {
+ storage.writeMultipartIndexFile(tableSpace, uuid, "graph", tempFile, null);
+ } finally {
+ Files.deleteIfExists(tempFile);
+ }
+ return data.length;
+ }
+
+ /**
+ * (a) absent object → probe returns {@code false}. This is the path the
+ * issue #617 reproduction hits when the Phase B upload failed before
+ * block-0 was ever published: the IS metadata still references the
+ * segment, but the remote storage has no object for it. With the gate
+ * keying off this probe, {@code DeleteSegment} with {@code force=false}
+ * is accepted because the safety check (graph file IS reachable) does
+ * not fire.
+ */
+ @Test(timeout = 30_000)
+ public void probeReturnsFalseWhenObjectIsAbsent() {
+ assertFalse("probe must return false when no multipart object exists",
+ storage.multipartIndexFileExists("ts1", "uuid-absent", "graph"));
+ }
+
+ /**
+ * (b) fully-present object → probe returns {@code true}. This is the
+ * happy path: the gate must protect the operator from deleting a
+ * healthy segment without going through {@code --force}.
+ */
+ @Test(timeout = 60_000)
+ public void probeReturnsTrueWhenObjectIsFullyPresent() throws Exception {
+ // Multi-block payload so the probe does not just inspect a partial
+ // block-0 — exactly matches the production "complete graph file"
+ // contract that the gate is supposed to detect.
+ byte[] payload = new byte[64 * 1024];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = (byte) (i & 0xFF);
+ }
+ writeMultipartGraph("ts1", "uuid-present", payload);
+ assertTrue("probe must return true when the multipart object exists",
+ storage.multipartIndexFileExists("ts1", "uuid-present", "graph"));
+ }
+
+ /**
+ * (c) {@code deleteMultipartIndexFile} is idempotent and invalidates
+ * the block cache. We verify:
+ *
+ * - two back-to-back calls on the same key succeed (the second
+ * must not throw — the multipart files are already gone);
+ * - a block populated in the {@link SegmentBlockCache} via a real
+ * read is removed from the cache by the delete call, so a
+ * subsequent reader on the same logical path cannot serve stale
+ * bytes from a rewritten segment that happens to hit the same
+ * key.
+ *
+ *
+ * {@link SegmentBlockCache} is {@code final}, so we install a real
+ * one with a small capacity and assert via the public
+ * {@link SegmentBlockCache#containsBlock(String, long, int)} accessor
+ * — that is exactly the same surface used in production to decide
+ * whether a block load is needed.
+ */
+ @Test(timeout = 60_000)
+ public void deleteMultipartIndexFileIsIdempotentAndInvalidatesCache() throws Exception {
+ byte[] payload = new byte[8 * 1024];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = (byte) ((i * 31) & 0xFF);
+ }
+ writeMultipartGraph("ts1", "uuid-purge", payload);
+ assertTrue("precondition: object must exist before purge",
+ storage.multipartIndexFileExists("ts1", "uuid-purge", "graph"));
+
+ // Install a real (enabled) SegmentBlockCache so we can observe the
+ // invalidation effect via the public containsBlock() accessor.
+ // 1 MiB capacity is plenty for the 8 KiB payload.
+ SegmentBlockCache cache = new SegmentBlockCache(1L * 1024 * 1024);
+ storage.setSegmentBlockCache(cache,
+ org.apache.bookkeeper.stats.NullStatsLogger.INSTANCE);
+
+ // Force a block to land in the cache by running a real read. The
+ // RemoteRandomAccessReader populates the cache on first access via
+ // the supplier returned by multipartIndexReaderSupplier.
+ io.github.jbellis.jvector.disk.ReaderSupplier rs =
+ storage.multipartIndexReaderSupplier("ts1", "uuid-purge", "graph",
+ (long) payload.length);
+ try (io.github.jbellis.jvector.disk.RandomAccessReader r = rs.get()) {
+ r.seek(0L);
+ r.readInt(); // pulls block-0 into the cache
+ }
+ String logicalPath = "ts1/uuid-purge/multipart/graph";
+ // The cache may use a writeBlockSize different from payload.length,
+ // so we cannot pin down the exact (offset, length) pair without
+ // duplicating the production block-sizing math. Instead, walk a
+ // small set of candidate block sizes via the public containsBlock
+ // helper — at least one of them must hit, otherwise the read
+ // above did not populate the cache and the test cannot prove the
+ // invalidation contract.
+ int[] candidateLengths = new int[]{
+ 4 * 1024 * 1024, 1024 * 1024, 64 * 1024, payload.length
+ };
+ boolean wasCached = false;
+ for (int len : candidateLengths) {
+ if (cache.containsBlock(logicalPath, 0L, len)) {
+ wasCached = true;
+ break;
+ }
+ }
+ assertTrue("precondition: a block must be cached after the read,"
+ + " otherwise the invalidation assertion is vacuous",
+ wasCached);
+
+ // First delete must succeed AND invalidate the cache.
+ storage.deleteMultipartIndexFile("ts1", "uuid-purge", "graph");
+ assertFalse("object must be gone after delete",
+ storage.multipartIndexFileExists("ts1", "uuid-purge", "graph"));
+ for (int len : candidateLengths) {
+ assertFalse("cached block at length=" + len
+ + " must be evicted after deleteMultipartIndexFile",
+ cache.containsBlock(logicalPath, 0L, len));
+ }
+
+ // Second delete must also succeed (idempotent) — no exception, no
+ // surfaced error. The implementation logs at WARNING but does not
+ // propagate.
+ storage.deleteMultipartIndexFile("ts1", "uuid-purge", "graph");
+ assertFalse("object must remain gone after the second delete call",
+ storage.multipartIndexFileExists("ts1", "uuid-purge", "graph"));
+ }
+
+ /**
+ * (d) Documents the contract gap that {@code --force} covers: when
+ * block-0 is present but truncated/corrupted, the probe still returns
+ * {@code true}. The default probe reads the first 4 bytes via the
+ * multipart reader supplier — that succeeds as long as block-0 has at
+ * least 4 bytes, even if the rest of the file is missing or corrupted.
+ *
+ *
This is the exact reproduction of the issue #617 scenario where an
+ * S3 multipart upload published block-0 then failed on a later block:
+ * the operator running {@code delete-segment} without {@code --force}
+ * sees the safety gate trip ("graph file IS reachable"), reads the
+ * documentation, and re-runs with {@code --force} once they have
+ * confirmed via the file-server logs that the upload never completed.
+ */
+ @Test(timeout = 30_000)
+ public void probeReturnsTrueOnPartialBlockZeroAtLeast4Bytes() throws Exception {
+ // Write just block-0 directly via the lower-level writeFileBlock RPC
+ // — that bypasses the multipart writer's "all blocks or none"
+ // semantics and reproduces the Phase B "upload failed after
+ // block-0" failure mode the issue #617 operator path covers.
+ // Must match remoteMultipartPath: tableSpace/uuid/multipart/fileType.
+ String logicalPath = "ts1/uuid-partial/multipart/graph";
+ byte[] partial = new byte[16]; // > 4 bytes; the probe reads 4 bytes
+ for (int i = 0; i < partial.length; i++) {
+ partial[i] = (byte) i;
+ }
+ client.writeFileBlock(logicalPath, 0L, partial);
+
+ // Sanity: the multipart writer would have written multiple blocks
+ // for a real graph file. Here only block-0 is present — yet the
+ // probe must still return true because it only inspects block-0.
+ assertTrue("probe returns true on a partial block-0 — documents the"
+ + " contract gap that --force covers (issue #617)",
+ storage.multipartIndexFileExists("ts1", "uuid-partial", "graph"));
+
+ // Round-tripping the partial bytes through writeMultipartFile is
+ // not possible (the input stream framing would always produce a
+ // matching file); this direct block-0 write is the only way to
+ // produce a "partial" state in the test harness, and it mirrors
+ // exactly what an S3 backend produces on a partial multipart
+ // upload failure.
+ assertNotNull("client must still be wired for follow-up purge calls", client);
+ }
+
+}