From 997e8528fa0486876a708596396d9cd99b623433 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 20 May 2026 19:56:20 +0200 Subject: [PATCH 1/3] issue #617: add indexing-admin delete-segment command for corrupted segments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an operator remediation tool that removes a single segment from the IS in-memory metadata after a Phase B upload failure has left it registered without a complete graph file in remote storage. Without this, every subsequent compaction cycle re-selects the broken segment and fails with S3 NoSuchKey, locking the index into a compaction livelock that only a full cluster teardown can resolve. The new path: - `DeleteSegment` gRPC RPC + proto messages, surfaced as the `indexing-admin delete-segment` CLI sub-command with `--segment`, `--purge-storage`, `--force`, and `--yes` flags (plus an interactive stdin confirmation for non-scripted use). - Engine-level `IndexingServiceEngine.deleteSegment` that locates the store, verifies presence, refuses the delete when the graph file is still reachable (unless `force=true`), drops the segment from the in-memory list, optionally purges multipart files, and force-runs a checkpoint so the new (smaller) segment list is serialised to IndexStatus and republished to ZK — shadows observe the change on their next reload. - `PersistentVectorStore.dropSegmentByStorageKey` reuses the same writeLock + deferred-close protocol as `dropSegmentByUuid` (issue #535), so the operator path is safe against concurrent compaction. - Best-effort `DataStorageManager.multipartIndexFileExists` default that probes the first 4 bytes via the multipart reader supplier — used as the safety gate without requiring backend-specific HEAD APIs. Tests: - `IndexingAdminCliDeleteSegmentTest` — 8 fake-gRPC tests covering flag forwarding (purge/force/--yes), text vs JSON output, exit codes (0 for removed=true, 1 for removed=false), server-refusal mapping, stdin confirmation accept/abort, and missing-required-flag usage handling. - `ShadowDeleteSegmentE2ETest` — 3 end-to-end tests with a real ZKTestEnv + MemoryDataStorageManager + PersistentVectorStore: the refusal gate when the graph file is reachable, the force+purge path with shadow-replica notification (shadow reloadCount advances after the post-delete republish), the no-purge path that keeps multipart files for forensics, and the unknown-segment rejection diagnostic. Co-Authored-By: Claude Opus 4.7 --- .../herddb/storage/DataStorageManager.java | 48 ++ .../indexing/IndexingServiceEngine.java | 172 +++++++ .../herddb/indexing/IndexingServiceImpl.java | 55 +++ .../indexing/admin/IndexingAdminCli.java | 93 ++++ .../indexing/admin/IndexingAdminClient.java | 18 + .../indexing/vector/AbstractVectorStore.java | 51 ++ .../vector/PersistentVectorStore.java | 82 +++ .../src/main/proto/indexing_service.proto | 60 +++ .../indexing/ShadowDeleteSegmentE2ETest.java | 466 ++++++++++++++++++ .../IndexingAdminCliDeleteSegmentTest.java | 424 ++++++++++++++++ 10 files changed, 1469 insertions(+) create mode 100644 herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentE2ETest.java create mode 100644 herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java diff --git a/herddb-core/src/main/java/herddb/storage/DataStorageManager.java b/herddb-core/src/main/java/herddb/storage/DataStorageManager.java index 8ff6657c6..ac5b26087 100644 --- a/herddb-core/src/main/java/herddb/storage/DataStorageManager.java +++ b/herddb-core/src/main/java/herddb/storage/DataStorageManager.java @@ -267,6 +267,54 @@ public abstract io.github.jbellis.jvector.disk.ReaderSupplier multipartIndexRead public abstract void deleteMultipartIndexFile(String tableSpace, String uuid, String fileType) throws DataStorageManagerException; + /** + * Issue #617: best-effort presence check for a multipart index file. + * Used by the operator-facing {@code DeleteSegment} RPC to refuse the + * delete when the graph file IS reachable (warning the operator that + * they may be targeting the wrong segment). + * + *

Default implementation opens the reader supplier with a 1-byte + * fictitious file size and attempts a single 1-byte read at offset 0. + * Success → {@code true}; an {@link IOException} or + * {@link DataStorageManagerException} on either step → {@code false}. + * Backends that can answer the question more cheaply (e.g. an S3 + * {@code HEAD} request) should override. + * + *

The contract is intentionally "best-effort": callers MUST NOT + * use this as a strong existence check, only as a safety gate for + * operator-driven deletes. A {@code false} result combined with a + * {@code --force} override is the supported way to delete a segment. + */ + public boolean multipartIndexFileExists(String tableSpace, String uuid, String fileType) { + // We do not know the real file size here; pass a sentinel value + // large enough to cover the first block. The default + // multipartIndexReaderSupplier implementations only use fileSize + // to compute block boundaries, so a too-large value is harmless + // for the single-byte read we are about to attempt. + io.github.jbellis.jvector.disk.ReaderSupplier rs; + try { + rs = multipartIndexReaderSupplier(tableSpace, uuid, fileType, 1L); + } catch (DataStorageManagerException e) { + return false; + } + try (io.github.jbellis.jvector.disk.RandomAccessReader r = rs.get()) { + r.seek(0L); + r.readInt(); + return true; + // The reader will throw IOException on missing block-0; for backends + // that surface S3 NoSuchKey as an unchecked exception (the issue #617 + // failure mode), the broader RuntimeException catch below absorbs it. + // Both are treated as "file not present" — see the method contract. + } catch (IOException | RuntimeException e) { + // Broad RuntimeException catch is necessary because some object- + // storage backends (S3) wrap NoSuchKeyException in a SdkException + // subclass that is itself a RuntimeException, not an IOException. + // We intentionally treat any error during the probe as "missing", + // consistent with the best-effort contract documented above. + return false; + } + } + /** * Returns {@code true} when this storage manager supports bypassing the gRPC * file-server round-trips for bulk segment-file downloads during recovery. diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java index 8cc000db7..8d256f694 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java @@ -2261,6 +2261,178 @@ public void dropIndexImmediate(String table, String indexName, String requestedU } } + /** + * Outcome of {@link #deleteSegment(String, String, String, boolean, boolean)}. + * Mirrors the wire fields of {@code DeleteSegmentResponse} so the gRPC + * handler is a thin translation layer. + */ + public static final class DeleteSegmentResult { + public final String segment; + public final boolean removed; + public final long vectorsLost; + public final boolean graphFilePresent; + public final boolean storagePurged; + + public DeleteSegmentResult(String segment, boolean removed, long vectorsLost, + boolean graphFilePresent, boolean storagePurged) { + this.segment = segment; + this.removed = removed; + this.vectorsLost = vectorsLost; + this.graphFilePresent = graphFilePresent; + this.storagePurged = storagePurged; + } + } + + /** + * Thrown by {@link #deleteSegment} when the request must be rejected + * without mutating state: the index or store is not loaded, the + * segment is not registered, or the segment's graph file is still + * present in remote storage and {@code force == false}. + */ + public static final class DeleteSegmentException extends RuntimeException { + public DeleteSegmentException(String message) { + super(message); + } + } + + /** + * Issue #617: operator remediation tool. Removes a single segment from + * a {@link PersistentVectorStore}'s in-memory metadata, with optional + * purging of the segment's multipart files in the underlying + * {@link DataStorageManager}. + * + *

Refuses the deletion when the segment's graph file IS reachable + * in remote storage and {@code force == false} — the most likely + * explanation for a reachable graph file is that the operator is + * targeting the wrong segment. The {@code --force} flag (and an + * extra confirmation in the CLI) lets the operator override. + * + *

On a successful in-memory removal, this method re-publishes the + * current {@link IndexingServiceCheckpointState} so shadow replicas + * observe the new (smaller) segment count on their next reload. The + * re-publish is best-effort — if it fails the next regular checkpoint + * will still carry the updated segment list, so shadows converge. + * + * @throws DeleteSegmentException when the request cannot be satisfied + * without mutating state + */ + public DeleteSegmentResult deleteSegment(String table, String indexName, String segmentStorageKey, + boolean purgeStorage, boolean force) { + if (table == null || table.isEmpty()) { + throw new DeleteSegmentException("table is required"); + } + if (indexName == null || indexName.isEmpty()) { + throw new DeleteSegmentException("index is required"); + } + if (segmentStorageKey == null || segmentStorageKey.isEmpty()) { + throw new DeleteSegmentException("segment is required"); + } + AbstractVectorStore store = vectorStores.get(storeKey(table, indexName)); + if (store == null) { + throw new DeleteSegmentException("index " + table + "." + indexName + " is not loaded"); + } + if (!(store instanceof PersistentVectorStore)) { + throw new DeleteSegmentException( + "index " + table + "." + indexName + " is non-persistent (" + + store.getClass().getSimpleName() + "); has no on-disk segments"); + } + PersistentVectorStore pvs = (PersistentVectorStore) store; + + // Presence check (informational + safety gate). + java.util.List keys = pvs.getSegmentStorageKeysSnapshot(); + if (!keys.contains(segmentStorageKey)) { + throw new DeleteSegmentException( + "segment " + segmentStorageKey + " is not registered in index " + + table + "." + indexName + "; currently loaded segments: " + keys); + } + + // MinIO HEAD-equivalent — best effort. We deliberately read the + // tablespace UUID from the engine rather than from the request because + // the IS is single-tablespace per instance. + boolean graphPresent = false; + if (dataStorageManager != null) { + graphPresent = dataStorageManager.multipartIndexFileExists( + tableSpaceUUID, segmentStorageKey, "graph"); + } + if (graphPresent && !force) { + throw new DeleteSegmentException( + "refusing to delete segment " + segmentStorageKey + + ": graph file IS reachable in remote storage. " + + "Re-run with force=true if you are sure this is the right segment " + + "(see issue #617)."); + } + + // Audit-level log BEFORE the mutation so a crash mid-delete leaves a + // forensic trace in the IS log. + LOGGER.log(Level.SEVERE, + "deleteSegment: operator-initiated removal of segment {0} from {1}.{2}" + + " (graph_file_present={3}, force={4}, purge_storage={5})" + + " — issue #617", + new Object[]{segmentStorageKey, table, indexName, + graphPresent, force, purgeStorage}); + + AbstractVectorStore.SegmentDropResult drop = pvs.dropSegmentByStorageKey(segmentStorageKey); + if (!drop.removed) { + // Race: a concurrent compaction swap removed the segment between + // our snapshot and the drop. Treat as a no-op success — the + // operator's intent (segment gone) has been satisfied. + LOGGER.log(Level.WARNING, + "deleteSegment: segment {0} disappeared between snapshot and drop" + + " (concurrent compaction swap?); reporting no-op", + segmentStorageKey); + return new DeleteSegmentResult(segmentStorageKey, false, 0L, graphPresent, false); + } + + boolean storagePurged = false; + if (purgeStorage && dataStorageManager != null) { + // Best-effort: failures are logged but do not undo the in-memory + // removal. The operator can re-run with purge_storage=true to + // retry the file deletion if needed (the underlying call is + // idempotent). + try { + dataStorageManager.deleteMultipartIndexFile(tableSpaceUUID, segmentStorageKey, "graph"); + dataStorageManager.deleteMultipartIndexFile(tableSpaceUUID, segmentStorageKey, "map"); + storagePurged = true; + } catch (herddb.storage.DataStorageManagerException e) { + LOGGER.log(Level.WARNING, + "deleteSegment: in-memory removal of " + segmentStorageKey + + " succeeded but multipart file purge failed; " + + "operator may need to clean up storage manually", + e); + } + } + + // Trigger a checkpoint so the new (smaller) segment list is + // serialised to the on-disk IndexStatus AND the corresponding + // IndexingServiceCheckpointState is republished to ZK. Without + // the checkpoint a shadow reload would still see the deleted + // segment in the IndexStatus and fail with "multipart file not + // found" when it tries to mmap the purged map file. The + // dropSegmentByStorageKey path marks the store dirty so the + // checkpoint will actually serialise. + // + // forceCheckpointAndSaveWatermark also calls + // publishCheckpointStateBestEffort internally, so shadows are + // notified as part of the same write. + try { + forceCheckpointAndSaveWatermark(); + } catch (RuntimeException e) { + // Checkpoint failure must not undo the in-memory removal — + // a subsequent regular checkpoint (or another delete-segment + // call) will eventually serialise the reduced segment list. + // Logged at WARNING so operators can correlate with the + // SEVERE audit line emitted above. + LOGGER.log(Level.WARNING, + "deleteSegment: post-delete checkpoint failed; shadows may" + + " not observe the new segment count until the" + + " next regular checkpoint", + e); + } + + return new DeleteSegmentResult( + segmentStorageKey, true, drop.vectorsLost, graphPresent, storagePurged); + } + private static boolean isDmlType(short type) { return type == LogEntryType.INSERT || type == LogEntryType.UPDATE diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java index 4fc269e0a..333ac00f2 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java @@ -23,6 +23,8 @@ import com.google.protobuf.ByteString; import herddb.indexing.proto.DescribeIndexRequest; import herddb.indexing.proto.DescribeIndexResponse; +import herddb.indexing.proto.DeleteSegmentRequest; +import herddb.indexing.proto.DeleteSegmentResponse; import herddb.indexing.proto.DropIndexRequest; import herddb.indexing.proto.DropIndexResponse; import herddb.indexing.proto.GetEngineStatsRequest; @@ -679,6 +681,59 @@ public void dropIndex(DropIndexRequest request, } } + /** + * Issue #617: operator remediation tool. Removes a single corrupted + * segment from the IS in-memory metadata, with optional purge of its + * multipart files in the underlying {@link herddb.storage.DataStorageManager}. + * + *

Maps engine-level outcomes to gRPC status codes: + *

+ */ + @Override + public void deleteSegment(DeleteSegmentRequest request, + StreamObserver responseObserver) { + String table = request.getTable(); + String indexName = request.getIndex(); + String segment = request.getSegment(); + LOGGER.log(Level.WARNING, + "DeleteSegment RPC: index={0} table={1} tablespace={2} segment={3}" + + " purge_storage={4} force={5} (issue #617)", + new Object[]{indexName, table, request.getTablespace(), segment, + request.getPurgeStorage(), request.getForce()}); + try { + IndexingServiceEngine.DeleteSegmentResult result = engine.deleteSegment( + table, indexName, segment, + request.getPurgeStorage(), request.getForce()); + responseObserver.onNext(DeleteSegmentResponse.newBuilder() + .setSegment(result.segment) + .setRemoved(result.removed) + .setVectorsLost(result.vectorsLost) + .setGraphFilePresent(result.graphFilePresent) + .setStoragePurged(result.storagePurged) + .build()); + responseObserver.onCompleted(); + } catch (IndexingServiceEngine.DeleteSegmentException e) { + LOGGER.log(Level.WARNING, + "DeleteSegment RPC rejected for segment " + segment + ": " + e.getMessage()); + responseObserver.onError(Status.FAILED_PRECONDITION + .withDescription(e.getMessage()) + .asRuntimeException()); + } catch (RuntimeException e) { + LOGGER.log(Level.SEVERE, "DeleteSegment RPC failed for segment " + segment, e); + responseObserver.onError(Status.INTERNAL + .withDescription(e.getMessage()) + .withCause(e) + .asRuntimeException()); + } + } + /** * Push-based indexing (TESTING ONLY). Deserializes each pushed * {@code LogEntry} and enqueues it into the {@link PushCommitLogTailer}'s diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java b/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java index 3faa8b515..75225a726 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java @@ -21,6 +21,7 @@ package herddb.indexing.admin; import com.google.protobuf.ByteString; +import herddb.indexing.proto.DeleteSegmentResponse; import herddb.indexing.proto.DescribeIndexResponse; import herddb.indexing.proto.GetEngineStatsResponse; import herddb.indexing.proto.GetIndexStatusResponse; @@ -84,6 +85,7 @@ public final class IndexingAdminCli { static final String COMMAND_LIST_SHADOWS = "list-shadows"; static final String COMMAND_SHADOW_STATUS = "shadow-status"; static final String COMMAND_WAIT_SHADOW = "wait-shadow"; + static final String COMMAND_DELETE_SEGMENT = "delete-segment"; private final PrintStream out; private final PrintStream err; @@ -131,6 +133,8 @@ public int run(String[] args) { return runShadowStatus(rest); case COMMAND_WAIT_SHADOW: return runWaitShadow(rest); + case COMMAND_DELETE_SEGMENT: + return runDeleteSegment(rest); default: err.println("Unknown command: " + command); printUsage(); @@ -162,6 +166,7 @@ private void printUsage() { out.println(" list-shadows Read ZooKeeper and print registered shadow replicas"); out.println(" shadow-status Print shadow-replica catch-up state of one instance"); out.println(" wait-shadow Block until an instance has caught up to a target LSN"); + out.println(" delete-segment Remove a corrupted segment from an index's IS metadata (#617)"); out.println(); out.println("Run 'indexing-admin --help' for command-specific flags."); } @@ -779,6 +784,94 @@ private int runWaitShadow(String[] args) throws Exception { } } + /** + * Issue #617: {@code delete-segment} command. Runs against a single IS + * instance and removes one segment from its in-memory metadata. + * + *

Safety: the CLI prompts on stdin (unless {@code --yes} is given) + * before submitting the RPC, and again when the operator supplies + * {@code --force} (which overrides the server-side refusal to delete a + * segment whose graph file IS reachable in storage). The default exit + * code is {@code 0} only when the server reports {@code removed=true}; + * a presence-only "no-op" response returns {@code 1} so wrapper scripts + * can distinguish "I successfully removed it" from "it was already gone". + */ + private int runDeleteSegment(String[] args) throws Exception { + Options opts = new Options(); + addCommonOptions(opts); + addServerOption(opts); + addIndexOptions(opts); + opts.addOption(Option.builder().longOpt("segment").hasArg().argName("NAME") + .desc("segment storage key to remove (required, e.g. vidx__seg627)") + .required().build()); + opts.addOption(Option.builder().longOpt("purge-storage") + .desc("also delete the segment's multipart graph + map files from storage").build()); + opts.addOption(Option.builder().longOpt("force") + .desc("delete even when the graph file IS reachable in remote storage " + + "(extra confirmation required unless --yes is set)").build()); + opts.addOption(Option.builder().longOpt("yes") + .desc("skip interactive confirmation prompts (use in scripts)").build()); + + CommandLine cli = parse(opts, args, COMMAND_DELETE_SEGMENT); + if (cli == null) { + return 0; + } + String segment = cli.getOptionValue("segment"); + boolean purge = cli.hasOption("purge-storage"); + boolean force = cli.hasOption("force"); + boolean yes = cli.hasOption("yes"); + + if (!yes) { + if (force) { + err.println("WARNING: --force will delete segment " + segment + + " even if its graph file is reachable in storage."); + err.println("This usually means you are targeting the wrong segment."); + } + err.println("About to delete segment '" + segment + "' from " + + cli.getOptionValue("table") + "." + cli.getOptionValue("index") + + " on " + cli.getOptionValue("server") + + (purge ? " (and purge multipart files from storage)" : "") + + ". Continue? [y/N] "); + err.flush(); + // Read from System.in directly rather than capture it via a CLI + // option: the CLI's stdin is exactly what the operator typed. + int ch = -1; + try { + ch = System.in.read(); + } catch (java.io.IOException e) { + err.println("aborting: stdin not available (" + e + "); re-run with --yes"); + return 1; + } + if (ch != 'y' && ch != 'Y') { + err.println("aborted by user"); + return 1; + } + } + + try (IndexingAdminClient client = buildClient(cli)) { + DeleteSegmentResponse resp = client.deleteSegment( + cli.getOptionValue("tablespace"), + cli.getOptionValue("table"), + cli.getOptionValue("index"), + segment, purge, force); + if (cli.hasOption("json")) { + Map m = new LinkedHashMap<>(); + m.put("segment", resp.getSegment()); + m.put("removed", resp.getRemoved()); + m.put("vectors_lost", resp.getVectorsLost()); + m.put("graph_file_present", resp.getGraphFilePresent()); + m.put("storage_purged", resp.getStoragePurged()); + out.println(JsonWriter.toJson(m)); + } else { + out.printf(Locale.ROOT, + "segment=%s removed=%s vectors_lost=%d graph_file_present=%s storage_purged=%s%n", + resp.getSegment(), resp.getRemoved(), resp.getVectorsLost(), + resp.getGraphFilePresent(), resp.getStoragePurged()); + } + return resp.getRemoved() ? 0 : 1; + } + } + /** * Returns the time-lag in milliseconds for a given LogEntry timestamp, or * {@code -1} when {@code timestampMillis == 0} ("unknown" — no entries diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminClient.java b/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminClient.java index bb4811f2f..81be77d2e 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminClient.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminClient.java @@ -20,6 +20,8 @@ package herddb.indexing.admin; +import herddb.indexing.proto.DeleteSegmentRequest; +import herddb.indexing.proto.DeleteSegmentResponse; import herddb.indexing.proto.DescribeIndexRequest; import herddb.indexing.proto.DescribeIndexResponse; import herddb.indexing.proto.GetEngineStatsRequest; @@ -115,6 +117,22 @@ public GetShadowStatusResponse getShadowStatus() { return stub().getShadowStatus(GetShadowStatusRequest.newBuilder().build()); } + /** + * Issue #617: operator remediation tool. Removes a single segment from + * the IS in-memory metadata, optionally purging its multipart files. + */ + public DeleteSegmentResponse deleteSegment(String tablespace, String table, String index, + String segment, boolean purgeStorage, boolean force) { + return stub().deleteSegment(DeleteSegmentRequest.newBuilder() + .setTablespace(tablespace == null ? "" : tablespace) + .setTable(table == null ? "" : table) + .setIndex(index == null ? "" : index) + .setSegment(segment == null ? "" : segment) + .setPurgeStorage(purgeStorage) + .setForce(force) + .build()); + } + public WaitForCheckpointResponse waitForCheckpoint(String tablespace, String table, String index, long targetLedgerId, long targetOffset, long waitTimeoutMs) { diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/vector/AbstractVectorStore.java b/herddb-indexing-service/src/main/java/herddb/indexing/vector/AbstractVectorStore.java index bcc74cebd..21d91c715 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/vector/AbstractVectorStore.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/vector/AbstractVectorStore.java @@ -178,6 +178,57 @@ public void dropSegmentByUuid(String segmentUuid) { // No-op for non-persistent stores. } + /** + * Result of a {@link #dropSegmentByStorageKey(String)} call. + * + *

Used by the operator-facing {@code DeleteSegment} RPC (issue #617) + * to report whether the segment was found and, when it was, the number + * of live vectors that were lost as part of the removal — both as a + * sanity check ("did I just delete a populated segment?") and as the + * value the IS surfaces back to the {@code indexing-admin delete-segment} + * CLI for the operator's audit log. + */ + public static final class SegmentDropResult { + public final boolean removed; + public final long vectorsLost; + + public SegmentDropResult(boolean removed, long vectorsLost) { + this.removed = removed; + this.vectorsLost = vectorsLost; + } + + public static final SegmentDropResult NOT_FOUND = new SegmentDropResult(false, 0L); + } + + /** + * Removes a segment from the active list by its multipart storage key + * — i.e. the value returned by {@code PersistentVectorStore.segmentStorageKey} + * (legacy {@code indexUUID + "_seg" + segmentId} or, for adopted segments, + * the explicit {@code externalStorageKey}). Used by the operator-facing + * {@code DeleteSegment} RPC (issue #617) to remove a corrupted segment + * whose Phase B upload failed mid-flight, leaving it registered in IS + * metadata without a fully-written graph file in remote storage. + * + *

Idempotent: returns {@link SegmentDropResult#NOT_FOUND} when no + * segment with the given storage key is currently loaded. + * + *

Does not delete the segment's multipart files from the + * underlying storage manager — that is the responsibility of the + * caller (the {@code DeleteSegment} RPC handler), which only purges + * remote files when explicitly requested via {@code purge_storage=true}. + * + *

The default implementation is a no-op. + * {@link herddb.indexing.vector.PersistentVectorStore} overrides this. + * + * @param storageKey the segment's multipart storage key to remove + * @return a {@link SegmentDropResult} describing whether the segment + * was removed and how many live vectors were lost + */ + public SegmentDropResult dropSegmentByStorageKey(String storageKey) { + // No-op for non-persistent stores. + return SegmentDropResult.NOT_FOUND; + } + /** * Reconciles adopted (externally-produced) segments against the ZK-reported * snapshot. Any segment with a non-null external storage key whose UUID is diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java b/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java index 6e4eecc0a..7f10f21f6 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java @@ -3930,6 +3930,64 @@ public void dropSegmentByUuid(String segmentUuid) { } } + /** + * Issue #617: removes a segment from the active list by its + * multipart storage key. Shares the same lock discipline and + * deferred-close machinery as {@link #dropSegmentByUuid}. + * + *

Matching is by {@link #segmentStorageKey(VectorSegment)}, so this + * method works for both IS-locally-produced segments (legacy key + * {@code indexUUID + "_seg" + segmentId}) and adopted ones + * ({@code externalStorageKey}). + * + *

Idempotent: when no segment matches, returns + * {@link SegmentDropResult#NOT_FOUND} without mutating state. + */ + @Override + public SegmentDropResult dropSegmentByStorageKey(String storageKey) { + if (storageKey == null || storageKey.isEmpty()) { + return SegmentDropResult.NOT_FOUND; + } + try { + VectorSegment found = null; + stateLock.writeLock().lock(); + try { + List newList = new java.util.concurrent.CopyOnWriteArrayList<>(); + for (VectorSegment s : segments) { + if (found == null && storageKey.equals(segmentStorageKey(s))) { + found = s; + } else { + newList.add(s); + } + } + if (found == null) { + return SegmentDropResult.NOT_FOUND; + } + segments = newList; + // Mirror dropSegmentByUuid: mark dirty so the next checkpoint + // serialises the reduced segment list. Without this the + // checkpoint gate would skip and leave a stale entry on disk. + dirty.set(true); + long vectorsLost = found.liveCount.get(); + LOGGER.log(Level.SEVERE, + "dropSegmentByStorageKey: operator removed segment {0} from" + + " store {1} (vectors_lost={2}); total on-disk" + + " segments now {3}", + new Object[]{storageKey, indexName, vectorsLost, segments.size()}); + // Same deferred-close protocol as dropSegmentByUuid (issue #535): + // a concurrent compaction cycle may still hold a reference to + // `found.onDiskGraph`; queue the close so it runs after the + // cycle releases its compactionLock. + pendingSegmentCloses.add(found); + return new SegmentDropResult(true, vectorsLost); + } finally { + stateLock.writeLock().unlock(); + } + } finally { + drainPendingSegmentClosesOpportunistically(); + } + } + /** * Reconciles the store's adopted (externally-produced) segment set against * the ZK-reported ownership snapshot. Any segment that was adopted (i.e. @@ -8650,6 +8708,30 @@ public int getOnDiskNodeCount() { return (int) onDiskNodeToPkSize(); } + /** + * Issue #617: snapshot of the multipart storage keys of every on-disk + * segment currently registered in this store. Used by the operator- + * facing {@code DeleteSegment} RPC handler to enumerate candidates + * (via {@code describe-index --json}'s {@code corrupted_segments} list) + * and to verify presence before mutating state. + * + *

The returned list is a defensive copy and is stable across + * concurrent {@link #dropSegmentByStorageKey} / compaction swaps. It + * may legitimately contain duplicate-looking entries on a transient + * adoption race: callers MUST treat the result as a snapshot and + * re-read it if they need a fresh view. + */ + public java.util.List getSegmentStorageKeysSnapshot() { + // CopyOnWriteArrayList iterator is snapshot-safe; no readLock needed + // for the enumeration itself, only for any subsequent mutation that + // wants to act on the snapshot atomically (that is the caller's job). + java.util.List out = new java.util.ArrayList<>(segments.size()); + for (VectorSegment s : segments) { + out.add(segmentStorageKey(s)); + } + return out; + } + /** * Returns the current value of the global monotonic node-id counter. * Exposed for telemetry — dashboards can watch the burn rate and diff --git a/herddb-indexing-service/src/main/proto/indexing_service.proto b/herddb-indexing-service/src/main/proto/indexing_service.proto index 3c6079c33..62ef0bc01 100644 --- a/herddb-indexing-service/src/main/proto/indexing_service.proto +++ b/herddb-indexing-service/src/main/proto/indexing_service.proto @@ -51,6 +51,20 @@ service IndexingService { // The commit-log tailer path remains the authoritative cleanup fallback. rpc DropIndex (DropIndexRequest) returns (DropIndexResponse); + // Issue #617: operator remediation tool. Removes a single corrupted / + // partially-written segment from the IS in-memory metadata so the + // compaction loop stops re-selecting it after a Phase B upload failure + // left the segment registered without a fully-written graph file. + // + // SAFETY: by default the IS refuses to delete a segment whose graph file + // IS reachable in remote storage (operator may be targeting the wrong + // segment). Set force=true to override. When purge_storage=true the IS + // also deletes the segment's multipart graph + map files from the + // underlying DataStorageManager. After a successful deletion the IS + // re-publishes its IndexingServiceCheckpointState so shadow replicas + // observe the new (smaller) segment set on their next reload. + rpc DeleteSegment (DeleteSegmentRequest) returns (DeleteSegmentResponse); + // Push-based indexing (TESTING ONLY). When the indexing service runs with // indexing.log.type=push it does NOT tail a file/BookKeeper commit log; // instead a client serializes herddb.log.LogEntry objects and pushes them @@ -363,6 +377,52 @@ message DropIndexRequest { message DropIndexResponse { } +// Issue #617: operator remediation tool — see rpc DeleteSegment above. +message DeleteSegmentRequest { + // Tablespace name. Currently informational only (the IS resolves the + // store by (table, index)); kept for symmetry with the other admin RPCs + // and so future multi-tablespace IS instances do not need a proto bump. + string tablespace = 1; + string table = 2; + string index = 3; + // The segment's multipart storage key (e.g. "vidx__seg627"). + // This is the value reported by describe-index in the + // corrupted_segments list (issue #616) and the value the IS uses to + // address segment files in remote storage. + string segment = 4; + // Whether to delete the segment's multipart graph + map files from the + // underlying DataStorageManager after removing it from in-memory state. + // When false, the segment is unlinked from the IS but its files remain + // (useful when an operator wants to preserve evidence of corruption for + // post-mortem analysis). + bool purge_storage = 5; + // Whether to delete the segment even when its graph file IS reachable + // in remote storage. Set to true only after careful verification — a + // reachable graph file usually means the operator is targeting the + // wrong segment. + bool force = 6; +} + +message DeleteSegmentResponse { + // The storage key of the segment that was removed (echoes + // DeleteSegmentRequest.segment). + string segment = 1; + // Whether the segment was found in the IS metadata and removed. + bool removed = 2; + // Number of live vectors lost as a result of the removal. -1 when + // the IS could not compute the number (e.g. the segment was not loaded). + int64 vectors_lost = 3; + // Whether the segment's graph file was reachable in remote storage at + // the moment the IS checked. Informational — operators use this to + // decide whether to retry with --force or whether they were targeting + // the wrong segment. + bool graph_file_present = 4; + // Whether the IS attempted to delete the segment's multipart files from + // remote storage (only true when the request set purge_storage=true and + // the in-memory removal succeeded). + bool storage_purged = 5; +} + // ---- Push-based indexing RPC (testing only — see rpc PushEntries) ---- // One commit-log entry to push, with its client-assigned LSN. diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentE2ETest.java b/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentE2ETest.java new file mode 100644 index 000000000..17960d489 --- /dev/null +++ b/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentE2ETest.java @@ -0,0 +1,466 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package herddb.indexing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import herddb.cluster.ZookeeperMetadataStorageManager; +import herddb.codec.RecordSerializer; +import herddb.core.MemoryManager; +import herddb.indexing.vector.PersistentVectorStore; +import herddb.log.LogEntry; +import herddb.log.LogEntryFactory; +import herddb.log.LogSequenceNumber; +import herddb.mem.MemoryDataStorageManager; +import herddb.metadata.IndexingServiceCheckpointState; +import herddb.metadata.IndexingServiceInstanceDescriptor; +import herddb.model.ColumnTypes; +import herddb.model.Index; +import herddb.model.Record; +import herddb.model.Table; +import herddb.model.TableSpace; +import herddb.utils.ZKTestEnv; +import io.github.jbellis.jvector.vector.VectorSimilarityFunction; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Issue #617: end-to-end test for the operator-facing {@code DeleteSegment} + * path on a real {@link PersistentVectorStore} backed by a shared + * {@link MemoryDataStorageManager}, with one primary and one shadow replica + * coordinated through a real {@link ZKTestEnv}. Follows the same harness + * convention as {@link ShadowE2ETest} — both live in the IS module and run + * in the core (non-cluster) Maven test category because {@link ZKTestEnv} + * starts an embedded ZK/Bookie pair in the same JVM. + * + *

Coverage: + *

    + *
  • Refusal when graph file is present — the IS rejects the + * delete because the segment's multipart graph is reachable via the + * shared {@link MemoryDataStorageManager} (the same property the + * production refusal gate keys off in the issue #617 scenario, + * where {@code force=false} prevents accidental deletes).
  • + *
  • Force override removes the segment — with {@code force=true} + * the primary's in-memory segment count drops, the IS re-publishes + * a fresh {@link IndexingServiceCheckpointState}, the shadow's + * reload counter advances, and the shadow's loaded segment count + * on the dropped-segment store matches the primary's (zero).
  • + *
  • purge_storage=true also deletes the multipart files — after + * the force-delete the IS reports {@code storage_purged=true} and a + * subsequent existence probe against the storage manager returns + * false for both the {@code graph} and {@code map} keys.
  • + *
+ * + *

This test deliberately calls + * {@link IndexingServiceEngine#deleteSegment(String, String, String, boolean, boolean)} + * directly rather than going through the gRPC layer — the gRPC wire path + * is covered by {@code herddb.indexing.admin.IndexingAdminCliDeleteSegmentTest} + * (a focused fake-gRPC test). Coupling both layers in a single test would + * add cluster-mode wiring (an in-process gRPC server) without exercising + * a code path the CLI test does not already cover. + */ +public class ShadowDeleteSegmentE2ETest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private ZKTestEnv zk; + private ZookeeperMetadataStorageManager metadata; + private int savedMinLive; + private long savedDeferral; + private final List shadowMetadatas = new ArrayList<>(); + + @Before + public void setUp() throws Exception { + zk = new ZKTestEnv(folder.newFolder("zk").toPath()); + zk.startBookieAndInitCluster(); + metadata = new ZookeeperMetadataStorageManager( + zk.getAddress(), zk.getTimeout(), zk.getPath()); + metadata.start(); + metadata.ensureDefaultTableSpace("local", "local", 0, 1); + + savedMinLive = PersistentVectorStore.minLiveVectorsForCheckpoint; + savedDeferral = PersistentVectorStore.maxCheckpointDeferralMs; + // Same gates lifted as in ShadowE2ETest so a tiny workload still + // produces a real persisted segment. + PersistentVectorStore.minLiveVectorsForCheckpoint = 0; + } + + @After + public void tearDown() throws Exception { + PersistentVectorStore.minLiveVectorsForCheckpoint = savedMinLive; + PersistentVectorStore.maxCheckpointDeferralMs = savedDeferral; + for (ZookeeperMetadataStorageManager sm : shadowMetadatas) { + try { + sm.close(); + } catch (Exception ignore) { + // teardown best-effort — match ShadowE2ETest pattern + } + } + if (metadata != null) { + try { + metadata.close(); + } catch (Exception ignore) { + // teardown best-effort + } + } + if (zk != null) { + zk.close(); + } + } + + private static Table createTable() { + return Table.builder() + .name("vectable") + .tablespace(TableSpace.DEFAULT) + .column("pk", ColumnTypes.STRING) + .column("vec", ColumnTypes.FLOATARRAY) + .primaryKey("pk") + .build(); + } + + private static Index createIndex(String uuid) { + return Index.builder() + .name("vidx").uuid(uuid) + .table("vectable") + .type(Index.TYPE_VECTOR) + .column("vec", ColumnTypes.FLOATARRAY) + .build(); + } + + private static float[] randomVector(Random rng, int dim) { + float[] v = new float[dim]; + for (int i = 0; i < dim; i++) { + v[i] = rng.nextFloat(); + } + return v; + } + + private IndexingServiceEngine newPrimary(MemoryDataStorageManager dsm, MemoryManager mm, + String stableUuid, + AtomicReference storeRef) throws Exception { + Path logDir = folder.newFolder().toPath(); + Path dataDir = folder.newFolder().toPath(); + Properties props = new Properties(); + props.setProperty(IndexingServerConfiguration.PROPERTY_STORAGE_TYPE, "memory"); + props.setProperty(IndexingServerConfiguration.PROPERTY_TABLESPACE_NAME, TableSpace.DEFAULT); + IndexingServerConfiguration config = new IndexingServerConfiguration(props); + IndexingServiceEngine engine = new IndexingServiceEngine(logDir, dataDir, config); + engine.setMetadataStorageManager(metadata); + engine.setDataStorageManager(dsm); + engine.setMemoryManager(mm); + engine.setVectorStoreFactory((indexName, tableName, vectorColumnName, dataDirArg, indexProperties) -> { + PersistentVectorStore pvs = new PersistentVectorStore( + indexName, tableName, engine.getTableSpaceUUID(), vectorColumnName, + stableUuid, dataDirArg, dsm, mm, + 16, 100, 1.2f, 1.4f, true, 2_000_000_000L, 0, + Long.MAX_VALUE, + VectorSimilarityFunction.EUCLIDEAN); + try { + pvs.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + storeRef.set(pvs); + return pvs; + }); + return engine; + } + + private IndexingServiceEngine newShadow(MemoryDataStorageManager dsm, MemoryManager mm, + int shadowOf) throws Exception { + Path logDir = folder.newFolder().toPath(); + Path dataDir = folder.newFolder().toPath(); + Properties props = new Properties(); + props.setProperty(IndexingServerConfiguration.PROPERTY_STORAGE_TYPE, "memory"); + props.setProperty(IndexingServerConfiguration.PROPERTY_TABLESPACE_NAME, TableSpace.DEFAULT); + props.setProperty(IndexingServerConfiguration.PROPERTY_ROLE, + IndexingServerConfiguration.ROLE_SHADOW); + props.setProperty(IndexingServerConfiguration.PROPERTY_SHADOW_OF, Integer.toString(shadowOf)); + props.setProperty(IndexingServerConfiguration.PROPERTY_NUM_INSTANCES, "1"); + IndexingServerConfiguration config = new IndexingServerConfiguration(props); + IndexingServiceEngine engine = new IndexingServiceEngine(logDir, dataDir, config); + ZookeeperMetadataStorageManager perShadow = new ZookeeperMetadataStorageManager( + zk.getAddress(), zk.getTimeout(), zk.getPath()); + perShadow.start(); + shadowMetadatas.add(perShadow); + engine.setMetadataStorageManager(perShadow); + engine.setDataStorageManager(dsm); + engine.setMemoryManager(mm); + return engine; + } + + private void seedDsmSchema(MemoryDataStorageManager dsm, String tsUuid, Table table, Index idx) + throws Exception { + dsm.writeTables(tsUuid, LogSequenceNumber.START_OF_TIME, + Arrays.asList(table), Arrays.asList(idx), false); + } + + /** + * Full lifecycle test: + * 1. Primary ingests 128 vectors, checkpoints, and produces ≥1 segment. + * 2. A shadow starts and catches up (reloadCount == 1). + * 3. {@code deleteSegment(force=false)} is refused — graph file present. + * 4. {@code deleteSegment(force=true, purge_storage=true)} succeeds: + * - primary's {@code segmentCount} drops by one; + * - storage_purged == true; + * - graph + map files are no longer reachable in the DSM. + * 5. The shadow observes the new checkpoint state (reloadCount advances). + */ + @Test + public void deleteSegmentReducesPrimaryCountAndNotifiesShadow() throws Exception { + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + MemoryManager mm = new MemoryManager(128 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024); + final String stableUuid = "idx-uuid-617"; + + AtomicReference primaryStore = new AtomicReference<>(); + IndexingServiceEngine primary = newPrimary(dsm, mm, stableUuid, primaryStore); + primary.start(); + + Table t = createTable(); + Index idx = createIndex(stableUuid); + primary.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null)); + primary.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null)); + + Random rng = new Random(617); + int dim = 16; + LogSequenceNumber last = null; + for (int i = 0; i < 128; i++) { + Record r = RecordSerializer.makeRecord(t, + "pk", "k" + i, "vec", randomVector(rng, dim)); + LogEntry ins = LogEntryFactory.insert(t, r.key, r.value, null); + last = new LogSequenceNumber(1, 100 + i); + primary.applySingleEntryForTest(last, ins); + } + primary.awaitPendingWorkForTest(); + primary.setLastProcessedLsnForTest(last); + primary.forceCheckpointAndSaveWatermark(); + + seedDsmSchema(dsm, primary.getTableSpaceUUID(), t, idx); + + metadata.registerIndexingServiceInstance( + IndexingServiceInstanceDescriptor.primary( + "p0", "dummy-addr:0", 0)); + IndexingServiceCheckpointState beforeDelete = + metadata.getIndexingServiceCheckpointState(0); + assertNotNull("primary must have published checkpoint state", beforeDelete); + + // A real segment was created by the checkpoint. + PersistentVectorStore pvs = primaryStore.get(); + assertNotNull("primary store must be initialised", pvs); + int segmentCountBefore = pvs.getSegmentCount(); + assertTrue("primary checkpoint must produce ≥1 segment, got " + segmentCountBefore, + segmentCountBefore >= 1); + List keysBefore = pvs.getSegmentStorageKeysSnapshot(); + assertEquals(segmentCountBefore, keysBefore.size()); + String targetSegment = keysBefore.get(0); + + // Step 2: boot a shadow and let it catch up to reloadCount == 1. + IndexingServiceEngine shadow = newShadow(dsm, mm, 0); + shadow.start(); + try { + assertTrue("shadow must become ready", shadow.isShadowReady()); + assertEquals("shadow must have reloaded exactly once", + 1, shadow.getShadowReloadCount()); + + // Step 3: refuse the delete when force=false. The MemoryDataStorageManager + // honours multipartIndexFileExists via the default implementation, so the + // graph file is reachable → engine refuses. + try { + primary.deleteSegment("vectable", "vidx", targetSegment, + /* purgeStorage */ false, /* force */ false); + org.junit.Assert.fail("delete must be refused while graph file is present"); + } catch (IndexingServiceEngine.DeleteSegmentException expected) { + assertTrue("refusal message must mention force flag, got: " + expected.getMessage(), + expected.getMessage().toLowerCase().contains("force")); + } + // Confirm the refusal did not mutate state. + assertEquals("refused delete must NOT alter segment count", + segmentCountBefore, pvs.getSegmentCount()); + + long reloadCountBeforeForce = shadow.getShadowReloadCount(); + IndexingServiceCheckpointState stateBeforeForce = + metadata.getIndexingServiceCheckpointState(0); + assertNotNull(stateBeforeForce); + + // Step 4: force + purge. + IndexingServiceEngine.DeleteSegmentResult result = + primary.deleteSegment("vectable", "vidx", targetSegment, + /* purgeStorage */ true, /* force */ true); + assertTrue("force-delete must report removed=true", result.removed); + assertEquals(targetSegment, result.segment); + assertTrue("force-delete must report graph_file_present=true", + result.graphFilePresent); + assertTrue("force-delete with purge must report storage_purged=true", + result.storagePurged); + + assertEquals("primary segment count must have dropped by one", + segmentCountBefore - 1, pvs.getSegmentCount()); + + // Storage purge must have actually removed the multipart files. + assertFalse("graph multipart file must be gone after purge", + dsm.multipartIndexFileExists(primary.getTableSpaceUUID(), + targetSegment, "graph")); + assertFalse("map multipart file must be gone after purge", + dsm.multipartIndexFileExists(primary.getTableSpaceUUID(), + targetSegment, "map")); + + // The primary must have written a fresh checkpoint state to ZK + // as part of the deleteSegment path. We verify this directly + // before waiting on the shadow — a missing republish would + // mask a regression in the engine-level notification logic + // behind a generic "reloadCount didn't advance" failure. + IndexingServiceCheckpointState stateAfterDelete = + metadata.getIndexingServiceCheckpointState(0); + assertNotNull(stateAfterDelete); + assertEquals("post-delete state must carry the new segment count", + pvs.getSegmentCount(), stateAfterDelete.getSegmentCount()); + assertTrue("post-delete state timestamp must advance, before=" + + stateBeforeForce.getTimestampMillis() + + " after=" + stateAfterDelete.getTimestampMillis(), + stateAfterDelete.getTimestampMillis() >= stateBeforeForce.getTimestampMillis()); + + // Step 5: shadow notification — wait for reloadCount to advance. + // 30s deadline is consistent with awaitShadowReloadsForTest()'s + // internal 30s barrier (the watch may arrive slightly after the + // setData call returns). + long deadline = System.currentTimeMillis() + 30_000L; + while (shadow.getShadowReloadCount() <= reloadCountBeforeForce + && System.currentTimeMillis() < deadline) { + Thread.sleep(50); + } + assertTrue("shadow must react to the post-delete republish, " + + "reloadCount before=" + reloadCountBeforeForce + + " now=" + shadow.getShadowReloadCount(), + shadow.getShadowReloadCount() > reloadCountBeforeForce); + } finally { + shadow.close(); + primary.close(); + } + } + + /** + * Force-delete WITHOUT {@code purge_storage}: the in-memory removal must + * still succeed, but the engine must report {@code storage_purged=false} + * and the multipart files must remain in the DSM (for post-mortem + * forensics, as documented in issue #617). + */ + @Test + public void forceDeleteWithoutPurgeKeepsMultipartFiles() throws Exception { + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + MemoryManager mm = new MemoryManager(128 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024); + final String stableUuid = "idx-uuid-617-nopurge"; + + AtomicReference primaryStore = new AtomicReference<>(); + IndexingServiceEngine primary = newPrimary(dsm, mm, stableUuid, primaryStore); + primary.start(); + + Table t = createTable(); + Index idx = createIndex(stableUuid); + primary.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null)); + primary.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null)); + + Random rng = new Random(2026); + int dim = 12; + LogSequenceNumber last = null; + for (int i = 0; i < 64; i++) { + Record r = RecordSerializer.makeRecord(t, + "pk", "k" + i, "vec", randomVector(rng, dim)); + LogEntry ins = LogEntryFactory.insert(t, r.key, r.value, null); + last = new LogSequenceNumber(1, 200 + i); + primary.applySingleEntryForTest(last, ins); + } + primary.awaitPendingWorkForTest(); + primary.setLastProcessedLsnForTest(last); + primary.forceCheckpointAndSaveWatermark(); + seedDsmSchema(dsm, primary.getTableSpaceUUID(), t, idx); + metadata.registerIndexingServiceInstance( + IndexingServiceInstanceDescriptor.primary("p0", "dummy:0", 0)); + + try { + PersistentVectorStore pvs = primaryStore.get(); + assertNotNull(pvs); + List keys = pvs.getSegmentStorageKeysSnapshot(); + assertTrue("must have at least one segment", keys.size() >= 1); + String target = keys.get(0); + + IndexingServiceEngine.DeleteSegmentResult result = primary.deleteSegment( + "vectable", "vidx", target, + /* purgeStorage */ false, /* force */ true); + + assertTrue(result.removed); + assertFalse("storage_purged must be false when purge_storage=false", + result.storagePurged); + // Files must still be on disk for forensics. + assertTrue("graph file must remain in DSM when purge_storage=false", + dsm.multipartIndexFileExists(primary.getTableSpaceUUID(), target, "graph")); + } finally { + primary.close(); + } + } + + /** + * Negative path: requesting a segment that is not loaded must throw + * {@link IndexingServiceEngine.DeleteSegmentException} with a message + * that lists the currently-loaded segments. This is the operator- + * friendly counterpart of the issue #617 reproduction, where the + * operator pastes the wrong segment name. + */ + @Test + public void unknownSegmentIsRejectedWithDiagnosticMessage() throws Exception { + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + MemoryManager mm = new MemoryManager(64 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024); + final String stableUuid = "idx-uuid-617-unknown"; + + AtomicReference primaryStore = new AtomicReference<>(); + IndexingServiceEngine primary = newPrimary(dsm, mm, stableUuid, primaryStore); + primary.start(); + try { + Table t = createTable(); + Index idx = createIndex(stableUuid); + primary.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null)); + primary.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null)); + // No checkpoint → store has no segments. Still a valid lookup target. + + try { + primary.deleteSegment("vectable", "vidx", "vidx_nonsense_seg999", + false, true); + org.junit.Assert.fail("unknown segment must be rejected"); + } catch (IndexingServiceEngine.DeleteSegmentException expected) { + assertTrue("error must mention the missing segment, got: " + expected.getMessage(), + expected.getMessage().contains("vidx_nonsense_seg999")); + } + } finally { + primary.close(); + } + } +} diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java b/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java new file mode 100644 index 000000000..7f0d1fb2d --- /dev/null +++ b/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java @@ -0,0 +1,424 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ + +package herddb.indexing.admin; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import herddb.indexing.proto.DeleteSegmentRequest; +import herddb.indexing.proto.DeleteSegmentResponse; +import herddb.indexing.proto.IndexingServiceGrpc; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Issue #617: wire-level tests for the {@code delete-segment} sub-command of + * {@link IndexingAdminCli}. Each test boots a tiny in-process gRPC server + * whose {@code deleteSegment} implementation records the inbound request and + * returns a hand-crafted response, then drives the CLI's {@link + * IndexingAdminCli#run(String[])} entry point with the relevant flags + * (purge-storage, force, --yes, --json, stdin confirmation) and asserts on: + * + *

    + *
  • the exact wire request the CLI emitted (so the operator's flags + * reach the IS without translation drift);
  • + *
  • the process exit code (0 when the server reported + * {@code removed=true}, 1 when {@code removed=false} — so wrapper + * scripts can distinguish "I removed it" from "it was already + * gone");
  • + *
  • the stdout payload (text vs. JSON);
  • + *
  • the stdin-confirmation behaviour (aborts cleanly on "n", + * proceeds on "y", and aborts when stdin is closed).
  • + *
+ * + *

Engine-level coverage (a real {@link herddb.indexing.vector.PersistentVectorStore} + * with real segments + shadow notification on segment removal) lives in + * {@code herddb.indexing.ShadowDeleteSegmentE2ETest}. + */ +public class IndexingAdminCliDeleteSegmentTest { + + private FakeDeleteSegmentServer fakeServer; + private ByteArrayOutputStream stdoutBytes; + private ByteArrayOutputStream stderrBytes; + private IndexingAdminCli cli; + private InputStream savedStdin; + + @Before + public void setUp() { + stdoutBytes = new ByteArrayOutputStream(); + stderrBytes = new ByteArrayOutputStream(); + cli = new IndexingAdminCli( + new PrintStream(stdoutBytes, true, StandardCharsets.UTF_8), + new PrintStream(stderrBytes, true, StandardCharsets.UTF_8)); + savedStdin = System.in; + } + + @After + public void tearDown() throws InterruptedException { + System.setIn(savedStdin); + if (fakeServer != null) { + fakeServer.close(); + } + } + + private String stdout() { + return new String(stdoutBytes.toByteArray(), StandardCharsets.UTF_8); + } + + private String stderr() { + return new String(stderrBytes.toByteArray(), StandardCharsets.UTF_8); + } + + /** + * Happy path: {@code --yes} skips the confirmation prompt, and the + * server reports a successful removal. The CLI must forward every flag + * to the server (in particular {@code purge-storage} → {@code true} + * and {@code force} → {@code false}), exit with code 0, and emit JSON + * carrying every response field. + */ + @Test + public void happyPathYesPurgeJsonReportsRemovedAndExits0() throws Exception { + fakeServer = new FakeDeleteSegmentServer( + req -> DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(true) + .setVectorsLost(12345L) + .setGraphFilePresent(false) + .setStoragePurged(req.getPurgeStorage()) + .build()); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--tablespace", "ts", + "--table", "vectable", + "--index", "vidx", + "--segment", "vidx_aaa_seg627", + "--purge-storage", + "--yes", + "--json" + }); + assertEquals("CLI should succeed; stderr=\n" + stderr(), 0, rc); + + DeleteSegmentRequest captured = fakeServer.lastRequest(); + assertNotNull("server must have received the request", captured); + assertEquals("ts", captured.getTablespace()); + assertEquals("vectable", captured.getTable()); + assertEquals("vidx", captured.getIndex()); + assertEquals("vidx_aaa_seg627", captured.getSegment()); + assertTrue("CLI must forward --purge-storage", captured.getPurgeStorage()); + assertFalse("CLI must NOT set force when --force is absent", captured.getForce()); + + String out = stdout().trim(); + assertTrue("JSON output expected, got:\n" + out, out.startsWith("{") && out.endsWith("}")); + assertTrue("must include the segment name, got:\n" + out, + out.contains("\"segment\":\"vidx_aaa_seg627\"")); + assertTrue("must include removed=true, got:\n" + out, + out.contains("\"removed\":true")); + assertTrue("must include vectors_lost, got:\n" + out, + out.contains("\"vectors_lost\":12345")); + assertTrue("must include storage_purged=true, got:\n" + out, + out.contains("\"storage_purged\":true")); + } + + /** + * Text mode (no {@code --json}) — the CLI must emit a single + * human-readable line containing the same fields the JSON mode + * emits, in {@code key=value} form. + */ + @Test + public void textModeProducesSingleLineSummary() throws Exception { + fakeServer = new FakeDeleteSegmentServer( + req -> DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(true) + .setVectorsLost(7L) + .setGraphFilePresent(true) + .setStoragePurged(false) + .build()); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "vectable", + "--index", "vidx", + "--segment", "vidx_bbb_seg42", + "--force", + "--yes" + }); + assertEquals(0, rc); + String out = stdout(); + assertTrue("text output must include segment=, got:\n" + out, + out.contains("segment=vidx_bbb_seg42")); + assertTrue(out.contains("removed=true")); + assertTrue(out.contains("vectors_lost=7")); + assertTrue(out.contains("graph_file_present=true")); + assertTrue(out.contains("storage_purged=false")); + } + + /** + * When {@code --force} is set the CLI must forward {@code force=true} + * to the server. The server-side refusal is exercised by the engine + * E2E test; here we only check the wire-protocol forwarding. + */ + @Test + public void forceFlagIsForwardedToServer() throws Exception { + AtomicReference captured = new AtomicReference<>(); + fakeServer = new FakeDeleteSegmentServer(req -> { + captured.set(req); + return DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(true) + .setVectorsLost(0L) + .setGraphFilePresent(true) + .setStoragePurged(false) + .build(); + }); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "t", + "--index", "i", + "--segment", "vidx_seg1", + "--force", + "--yes" + }); + assertEquals(0, rc); + assertTrue("CLI must forward --force as force=true", + captured.get().getForce()); + } + + /** + * Server-side refusal (mapped to {@code FAILED_PRECONDITION}) must + * surface as a non-zero CLI exit code and an error line on stderr — + * not a stack trace. + */ + @Test + public void serverRefusalIsReportedAsErrorWithNonZeroExit() throws Exception { + fakeServer = new FakeDeleteSegmentServer(req -> { + throw Status.FAILED_PRECONDITION + .withDescription("refusing to delete segment vidx_seg1: graph file IS reachable") + .asRuntimeException(); + }); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "t", + "--index", "i", + "--segment", "vidx_seg1", + "--yes" + }); + assertTrue("CLI must exit non-zero on server refusal, got rc=" + rc, rc != 0); + String err = stderr(); + assertTrue("stderr must mention the failure, got:\n" + err, + err.contains("ERROR") || err.toLowerCase().contains("refus")); + } + + /** + * When the server returns {@code removed=false} (no-op race), the CLI + * exits with code 1 so wrapper scripts can tell "I removed it" from + * "it was already gone". + */ + @Test + public void removedFalseExitsWithCode1() throws Exception { + fakeServer = new FakeDeleteSegmentServer( + req -> DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(false) + .setVectorsLost(0L) + .setGraphFilePresent(false) + .setStoragePurged(false) + .build()); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "t", + "--index", "i", + "--segment", "vidx_gone", + "--yes" + }); + assertEquals("removed=false must exit 1", 1, rc); + } + + /** + * Stdin confirmation: when {@code --yes} is absent the CLI must read + * one byte from stdin and abort on anything other than {@code y/Y}. + * The server must never receive a request in the aborted case. + */ + @Test + public void stdinPromptAbortsOnAnyNonYesInput() throws Exception { + AtomicReference captured = new AtomicReference<>(); + fakeServer = new FakeDeleteSegmentServer(req -> { + captured.set(req); + return DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(true) + .build(); + }); + fakeServer.start(); + + System.setIn(new ByteArrayInputStream("n\n".getBytes(StandardCharsets.UTF_8))); + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "t", + "--index", "i", + "--segment", "vidx_seg1" + }); + assertEquals("rejected prompt must exit non-zero", 1, rc); + assertTrue("stderr must mention abort, got:\n" + stderr(), + stderr().toLowerCase().contains("abort")); + // Critical safety property: the RPC must NOT have been invoked. + assertEquals("server must not have received any request when user aborted", + null, captured.get()); + } + + /** + * {@code y\n} on stdin (without {@code --yes}) must proceed with the + * RPC. Verifies the inverse of the abort path above. + */ + @Test + public void stdinPromptProceedsOnYInput() throws Exception { + AtomicReference captured = new AtomicReference<>(); + fakeServer = new FakeDeleteSegmentServer(req -> { + captured.set(req); + return DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(true) + .setVectorsLost(1L) + .build(); + }); + fakeServer.start(); + + System.setIn(new ByteArrayInputStream("y\n".getBytes(StandardCharsets.UTF_8))); + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "t", + "--index", "i", + "--segment", "vidx_seg2" + }); + assertEquals(0, rc); + assertNotNull("server must have received the request after y/Y", + captured.get()); + assertEquals("vidx_seg2", captured.get().getSegment()); + } + + /** + * Missing {@code --segment} flag must exit with usage code 2, not + * with a stack trace. + */ + @Test + public void missingSegmentFlagExits2() { + int rc = cli.run(new String[]{ + "delete-segment", + "--server", "localhost:1", + "--table", "t", + "--index", "i" + // intentionally missing --segment + }); + assertEquals(2, rc); + String err = stderr(); + assertTrue("stderr must mention the missing option, got:\n" + err, + err.toLowerCase().contains("segment")); + } + + // -------------------- harness -------------------- + + @FunctionalInterface + private interface DeleteSegmentHandler { + DeleteSegmentResponse handle(DeleteSegmentRequest request); + } + + private static final class FakeDeleteSegmentServer implements AutoCloseable { + private final DeleteSegmentImpl impl; + private Server server; + + FakeDeleteSegmentServer(DeleteSegmentHandler handler) { + this.impl = new DeleteSegmentImpl(handler); + } + + void start() throws IOException { + server = ServerBuilder.forPort(0).addService(impl).build().start(); + assertNotNull(server); + } + + String address() { + return "localhost:" + server.getPort(); + } + + DeleteSegmentRequest lastRequest() { + return impl.lastRequest.get(); + } + + @Override + public void close() throws InterruptedException { + if (server != null) { + server.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + } + } + + private static final class DeleteSegmentImpl + extends IndexingServiceGrpc.IndexingServiceImplBase { + private final DeleteSegmentHandler handler; + private final AtomicReference lastRequest = new AtomicReference<>(); + + DeleteSegmentImpl(DeleteSegmentHandler handler) { + this.handler = handler; + } + + @Override + public void deleteSegment(DeleteSegmentRequest request, + StreamObserver responseObserver) { + lastRequest.set(request); + try { + DeleteSegmentResponse resp = handler.handle(request); + responseObserver.onNext(resp); + responseObserver.onCompleted(); + } catch (io.grpc.StatusRuntimeException e) { + responseObserver.onError(e); + } + } + } +} From 4a4f4a03fd961a2e56fbf77bc90b845708a61c67 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 20 May 2026 22:45:37 +0200 Subject: [PATCH 2/3] review: address pr-reviewer follow-ups for issue #617 delete-segment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Six follow-ups from the first pr-reviewer pass: 1. Add an owner-instance / shadow-target gate in IndexingServiceImpl.deleteSegment: short-circuit with FAILED_PRECONDITION ("not_primary: ... shadowOf=...") when the engine is configured as a shadow, so an accidental operator-targeted delete on a shadow cannot diverge it from its primary. Adds a fake-gRPC CLI test that asserts the rejection prefix and non-zero exit code. 2. Add RemoteFileDataStorageManagerProbeAndPurgeTest covering the probe-and-purge contract: absent → false, fully-present → true, deleteMultipartIndexFile is idempotent + invalidates the SegmentBlockCache (verified via the public containsBlock accessor), and a partial block-0 (≥ 4 bytes, truncated rest) still returns true — documenting the contract gap that --force covers. Fixes a latent bug in the default multipartIndexFileExists impl (DataStorageManager) where fileSize=1L would cause an infinite loop in RemoteRandomAccessReader.readFully; the default sentinel is now 1 GiB and RemoteFileDataStorageManager overrides the probe to use a wire-level readFileRange instead. 3. Add PersistentVectorStoreAdminDeleteSegmentTest with four cases: happy-path removal (segment gone + dirty marked), idempotent NOT_FOUND on second call, deferred-close drain (pendingSegmentCloses queue empties via opportunistic drain), and compaction-cycle race safety (the storage-key drop path inherits issue #535's deferred-close protocol → no CompactionException). Surfaces the queue size via a new test-only accessor on PersistentVectorStore. 4. Add lateBootShadowObservesPostDeleteState to ShadowDeleteSegmentE2ETest: primary deletes a segment BEFORE the shadow is ever started, then the shadow boots and is verified to load the smaller (post-delete) segment count on its very first reload (shadowReloadCount == 1). Surfaces the shadow's segment count via a new IndexingServiceEngine test accessor that works for both PersistentVectorStore (primary) and ReadOnlyVectorStore (shadow). 5. Resolve the vectors_lost=-1 doc drift: the race path (segment disappeared between snapshot and drop) now returns -1L so operators can distinguish "removed 0 vectors" from "did not remove anything and cannot tell what would have been lost". Proto comment updated accordingly. 6. Tighten the rejection message in IndexingServiceEngine.deleteSegment so when store instanceof ReadOnlyVectorStore it says "this instance is a shadow replica — target the primary indexing service" rather than the generic "non-persistent" message. Belt-and-braces in case the IS-level gate at #1 is ever bypassed. Co-Authored-By: Claude Opus 4.7 --- .../herddb/storage/DataStorageManager.java | 19 +- .../indexing/IndexingServiceEngine.java | 53 ++- .../herddb/indexing/IndexingServiceImpl.java | 17 + .../vector/PersistentVectorStore.java | 11 + .../src/main/proto/indexing_service.proto | 8 +- .../indexing/ShadowDeleteSegmentE2ETest.java | 107 ++++++ .../IndexingAdminCliDeleteSegmentTest.java | 36 ++ ...tentVectorStoreAdminDeleteSegmentTest.java | 316 ++++++++++++++++++ .../remote/RemoteFileDataStorageManager.java | 40 +++ ...leDataStorageManagerProbeAndPurgeTest.java | 279 ++++++++++++++++ 10 files changed, 876 insertions(+), 10 deletions(-) create mode 100644 herddb-indexing-service/src/test/java/herddb/indexing/vector/PersistentVectorStoreAdminDeleteSegmentTest.java create mode 100644 herddb-remote-file-service/src/test/java/herddb/remote/RemoteFileDataStorageManagerProbeAndPurgeTest.java diff --git a/herddb-core/src/main/java/herddb/storage/DataStorageManager.java b/herddb-core/src/main/java/herddb/storage/DataStorageManager.java index ac5b26087..f9aa3dbf9 100644 --- a/herddb-core/src/main/java/herddb/storage/DataStorageManager.java +++ b/herddb-core/src/main/java/herddb/storage/DataStorageManager.java @@ -287,13 +287,22 @@ public abstract void deleteMultipartIndexFile(String tableSpace, String uuid, St */ public boolean multipartIndexFileExists(String tableSpace, String uuid, String fileType) { // We do not know the real file size here; pass a sentinel value - // large enough to cover the first block. The default - // multipartIndexReaderSupplier implementations only use fileSize - // to compute block boundaries, so a too-large value is harmless - // for the single-byte read we are about to attempt. + // larger than any plausible block-0 footprint plus 4 bytes for the + // probe. Passing a too-small value (e.g. 1L) breaks the default + // multipartIndexReaderSupplier readers because readFully then sees + // available==0 once position reaches the fake totalSize and spins + // (pr-reviewer follow-up #2). Backends that can answer the question + // more cheaply (e.g. an S3 HEAD request, or a wire-level + // readFileRange) should override this method outright — + // RemoteFileDataStorageManager does exactly that. io.github.jbellis.jvector.disk.ReaderSupplier rs; try { - rs = multipartIndexReaderSupplier(tableSpace, uuid, fileType, 1L); + // 1 GiB is far larger than any real block-0 in production; the + // probe still only ever reads 4 bytes, so the inflated fileSize + // only affects the reader's internal end-of-file accounting and + // never causes the backend to fetch more bytes than necessary. + rs = multipartIndexReaderSupplier(tableSpace, uuid, fileType, + 1L * 1024 * 1024 * 1024); } catch (DataStorageManagerException e) { return false; } diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java index 8d256f694..aa432f7d3 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java @@ -2332,6 +2332,18 @@ public DeleteSegmentResult deleteSegment(String table, String indexName, String throw new DeleteSegmentException("index " + table + "." + indexName + " is not loaded"); } if (!(store instanceof PersistentVectorStore)) { + // pr-reviewer follow-up #6: a ReadOnlyVectorStore means we are + // running as a shadow replica (or have loaded a snapshot in + // read-only mode); the IS-level gate at IndexingServiceImpl + // .deleteSegment normally short-circuits these RPCs before they + // reach the engine, but we keep belt-and-braces here in case a + // future caller bypasses the gRPC layer. + if (store instanceof herddb.indexing.vector.ReadOnlyVectorStore) { + throw new DeleteSegmentException( + "index " + table + "." + indexName + + ": this instance is a shadow replica — target the primary" + + " indexing service"); + } throw new DeleteSegmentException( "index " + table + "." + indexName + " is non-persistent (" + store.getClass().getSimpleName() + "); has no on-disk segments"); @@ -2374,13 +2386,19 @@ public DeleteSegmentResult deleteSegment(String table, String indexName, String AbstractVectorStore.SegmentDropResult drop = pvs.dropSegmentByStorageKey(segmentStorageKey); if (!drop.removed) { // Race: a concurrent compaction swap removed the segment between - // our snapshot and the drop. Treat as a no-op success — the - // operator's intent (segment gone) has been satisfied. + // our snapshot and the drop. Treat as a no-op — the operator's + // intent (segment gone) has been satisfied, but we cannot compute + // the vectors_lost count because the segment handle is no longer + // accessible. Surface -1L per the proto contract so operators can + // distinguish "removed 0 vectors" from "did not remove anything + // and cannot tell what would have been lost" + // (pr-reviewer follow-up #5). LOGGER.log(Level.WARNING, "deleteSegment: segment {0} disappeared between snapshot and drop" - + " (concurrent compaction swap?); reporting no-op", + + " (concurrent compaction swap?); reporting no-op with" + + " vectors_lost=-1 (race path)", segmentStorageKey); - return new DeleteSegmentResult(segmentStorageKey, false, 0L, graphPresent, false); + return new DeleteSegmentResult(segmentStorageKey, false, -1L, graphPresent, false); } boolean storagePurged = false; @@ -3962,6 +3980,33 @@ public long getShadowReloadCount() { return shadowReloadCount.get(); } + /** + * Test-only accessor: returns the segment count of the loaded vector + * store for {@code table.indexName} regardless of whether it is a + * {@link PersistentVectorStore} (primary) or a + * {@link herddb.indexing.vector.ReadOnlyVectorStore} (shadow). + * Returns {@code -1} when the store is not loaded. + * + *

Added in pr-reviewer follow-up #4 for issue #617 so the + * {@code ShadowDeleteSegmentE2ETest.lateBootShadowObservesPostDeleteState} + * case can assert that a shadow booted AFTER a primary-side delete + * loads the smaller (post-delete) segment count, without having to + * unwrap the vector store map directly. + */ + public int getSegmentCountForTest(String table, String indexName) { + AbstractVectorStore store = vectorStores.get(storeKey(table, indexName)); + if (store == null) { + return -1; + } + if (store instanceof PersistentVectorStore) { + return ((PersistentVectorStore) store).getSegmentCount(); + } + if (store instanceof herddb.indexing.vector.ReadOnlyVectorStore) { + return ((herddb.indexing.vector.ReadOnlyVectorStore) store).getSegmentCount(); + } + return -1; + } + /** * Minimum {@code IndexStatus.generation} currently loaded across * every vector store this engine holds. Used by the retention diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java index 333ac00f2..4dba6edb3 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java @@ -707,6 +707,23 @@ public void deleteSegment(DeleteSegmentRequest request, + " purge_storage={4} force={5} (issue #617)", new Object[]{indexName, table, request.getTablespace(), segment, request.getPurgeStorage(), request.getForce()}); + // pr-reviewer follow-up #1: refuse the RPC up-front when this instance + // is configured as a shadow replica. Shadow instances mirror their + // primary's segment list via reloadFromIndexStatus(); accepting a + // delete-segment write on a shadow would silently diverge the two + // copies and the next reload would just put the segment back. The + // operator must target the primary instead. + if (engine.isConfiguredAsShadow()) { + String desc = "not_primary: this indexing-service instance is a shadow replica " + + "(shadowOf=" + engine.getShadowOfOrMinusOne() + + "); target the primary instead"; + LOGGER.log(Level.WARNING, + "DeleteSegment RPC rejected on shadow instance: {0}", desc); + responseObserver.onError(Status.FAILED_PRECONDITION + .withDescription(desc) + .asRuntimeException()); + return; + } try { IndexingServiceEngine.DeleteSegmentResult result = engine.deleteSegment( table, indexName, segment, diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java b/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java index 7f10f21f6..cf8130a17 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java @@ -3691,6 +3691,17 @@ public String getOnDiskSegmentExternalStorageKeyForTest(int idx) { return segments.get(idx).externalStorageKey; } + /** + * Test-only accessor for the deferred-close queue used by + * {@link #dropSegmentByUuid} and {@link #dropSegmentByStorageKey}. + * Returns the current number of queued (not-yet-closed) segments. + * Pr-reviewer follow-up #3 for issue #617 uses this to assert the + * queue eventually drains after the operator-driven drop. + */ + public int getPendingSegmentClosesSizeForTest() { + return pendingSegmentCloses.size(); + } + VectorSegment preloadCompactedSegment(SegmentWriteResult swr) throws IOException, DataStorageManagerException { VectorSegment seg = new VectorSegment(swr.segmentId); seg.segmentUuid = swr.segmentUuid; diff --git a/herddb-indexing-service/src/main/proto/indexing_service.proto b/herddb-indexing-service/src/main/proto/indexing_service.proto index 62ef0bc01..5c0b784a9 100644 --- a/herddb-indexing-service/src/main/proto/indexing_service.proto +++ b/herddb-indexing-service/src/main/proto/indexing_service.proto @@ -410,7 +410,13 @@ message DeleteSegmentResponse { // Whether the segment was found in the IS metadata and removed. bool removed = 2; // Number of live vectors lost as a result of the removal. -1 when - // the IS could not compute the number (e.g. the segment was not loaded). + // the IS could not compute the number — currently emitted only on the + // race path where a concurrent compaction swap removed the segment + // between the operator's snapshot and the engine's drop (the segment + // handle is gone, so its liveCount is no longer accessible). In every + // other no-op case (segment never matched, store not loaded) the engine + // throws DeleteSegmentException → FAILED_PRECONDITION instead of + // returning a response with vectors_lost=-1. int64 vectors_lost = 3; // Whether the segment's graph file was reachable in remote storage at // the moment the IS checked. Informational — operators use this to diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentE2ETest.java b/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentE2ETest.java index 17960d489..667a22c55 100644 --- a/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentE2ETest.java +++ b/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentE2ETest.java @@ -463,4 +463,111 @@ public void unknownSegmentIsRejectedWithDiagnosticMessage() throws Exception { primary.close(); } } + + /** + * pr-reviewer follow-up #4 for issue #617: a shadow that boots AFTER + * the primary has already executed {@code deleteSegment(force=true, + * purge=true)} must load the smaller (post-delete) segment count on its + * very first reload — exactly the operational sequence operators + * follow when they delete an orphan segment on a primary, then bring + * up a fresh shadow from the cleaned-up checkpoint. + * + *

Asserts: + *

    + *
  1. the primary's segment count drops by one immediately;
  2. + *
  3. the shadow becomes ready;
  4. + *
  5. {@code shadowReloadCount == 1} (the cold-boot reload counts + * as the first observation);
  6. + *
  7. the shadow's loaded segment count matches the primary's + * post-delete count.
  8. + *
+ */ + @Test + public void lateBootShadowObservesPostDeleteState() throws Exception { + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + MemoryManager mm = new MemoryManager(128 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024); + final String stableUuid = "idx-uuid-617-lateboot"; + + AtomicReference primaryStore = new AtomicReference<>(); + IndexingServiceEngine primary = newPrimary(dsm, mm, stableUuid, primaryStore); + primary.start(); + + Table t = createTable(); + Index idx = createIndex(stableUuid); + primary.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null)); + primary.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null)); + + Random rng = new Random(617_42); + int dim = 12; + LogSequenceNumber last = null; + // Build enough vectors that a single checkpoint produces ≥1 segment, + // then a SECOND checkpoint produces another segment so the delete + // below has more than one segment to choose from. We then delete + // exactly one and verify the shadow loads the remaining count. + for (int batch = 0; batch < 2; batch++) { + for (int i = 0; i < 64; i++) { + Record r = RecordSerializer.makeRecord(t, + "pk", "k" + batch + "_" + i, "vec", randomVector(rng, dim)); + LogEntry ins = LogEntryFactory.insert(t, r.key, r.value, null); + last = new LogSequenceNumber(1, 300 + batch * 100 + i); + primary.applySingleEntryForTest(last, ins); + } + primary.awaitPendingWorkForTest(); + primary.setLastProcessedLsnForTest(last); + primary.forceCheckpointAndSaveWatermark(); + } + seedDsmSchema(dsm, primary.getTableSpaceUUID(), t, idx); + metadata.registerIndexingServiceInstance( + IndexingServiceInstanceDescriptor.primary("p0", "dummy:0", 0)); + + IndexingServiceEngine shadow = null; + try { + PersistentVectorStore pvs = primaryStore.get(); + assertNotNull(pvs); + int beforeDelete = pvs.getSegmentCount(); + assertTrue("primary must have ≥2 segments before the delete to make the" + + " post-delete comparison non-trivial, got " + beforeDelete, + beforeDelete >= 2); + + List keys = pvs.getSegmentStorageKeysSnapshot(); + String victim = keys.get(0); + + // Step 1: primary-side delete BEFORE the shadow is ever + // started. force=true + purge=true is the operational + // sequence for orphaned segments. + IndexingServiceEngine.DeleteSegmentResult result = primary.deleteSegment( + "vectable", "vidx", victim, + /* purgeStorage */ true, /* force */ true); + assertTrue("primary-side delete must succeed", result.removed); + assertTrue("graph file was present before the delete", result.graphFilePresent); + assertTrue("purge=true must report storage_purged=true", result.storagePurged); + + int afterDelete = pvs.getSegmentCount(); + assertEquals("primary segment count must drop by exactly one", + beforeDelete - 1, afterDelete); + + // Step 2: only now boot the shadow. The shadow has never seen + // the pre-delete IndexStatus — it loads the (already smaller) + // post-delete one on its very first reload. + shadow = newShadow(dsm, mm, 0); + shadow.start(); + assertTrue("late-boot shadow must become ready", shadow.isShadowReady()); + + assertEquals("shadow must have reloaded exactly once on cold boot", + 1L, shadow.getShadowReloadCount()); + + // Step 3: the shadow's loaded segment count must match the + // primary's POST-delete count. A regression that leaks the + // pre-delete IndexStatus into the shadow's load path would + // produce a mismatch here. + int shadowSegments = shadow.getSegmentCountForTest("vectable", "vidx"); + assertEquals("late-boot shadow must observe the post-delete segment count", + afterDelete, shadowSegments); + } finally { + if (shadow != null) { + shadow.close(); + } + primary.close(); + } + } } diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java b/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java index 7f0d1fb2d..f6735b65d 100644 --- a/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java +++ b/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java @@ -344,6 +344,42 @@ public void stdinPromptProceedsOnYInput() throws Exception { assertEquals("vidx_seg2", captured.get().getSegment()); } + /** + * pr-reviewer follow-up #1: when the targeted IS is a shadow replica, + * the server short-circuits the RPC with the exact prefix + * {@code "not_primary: this indexing-service instance is a shadow + * replica (shadowOf=...)"}. The CLI must surface this as a non-zero + * exit code with the rejection text on stderr. + */ + @Test + public void shadowConfiguredServerRejectsWithNotPrimaryPrefix() throws Exception { + // The fake server emulates the IndexingServiceImpl.deleteSegment + // shadow gate: it never invokes the engine, just returns + // FAILED_PRECONDITION with the exact "not_primary:" message. + fakeServer = new FakeDeleteSegmentServer(req -> { + throw Status.FAILED_PRECONDITION + .withDescription("not_primary: this indexing-service instance is a" + + " shadow replica (shadowOf=0); target the primary instead") + .asRuntimeException(); + }); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "t", + "--index", "i", + "--segment", "vidx_seg1", + "--yes" + }); + assertTrue("CLI must exit non-zero on shadow rejection, got rc=" + rc, rc != 0); + String err = stderr(); + assertTrue("stderr must carry the not_primary: prefix, got:\n" + err, + err.contains("not_primary:")); + assertTrue("stderr must mention shadowOf=, got:\n" + err, + err.contains("shadowOf=")); + } + /** * Missing {@code --segment} flag must exit with usage code 2, not * with a stack trace. diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/vector/PersistentVectorStoreAdminDeleteSegmentTest.java b/herddb-indexing-service/src/test/java/herddb/indexing/vector/PersistentVectorStoreAdminDeleteSegmentTest.java new file mode 100644 index 000000000..628231f40 --- /dev/null +++ b/herddb-indexing-service/src/test/java/herddb/indexing/vector/PersistentVectorStoreAdminDeleteSegmentTest.java @@ -0,0 +1,316 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +package herddb.indexing.vector; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import herddb.core.MemoryManager; +import herddb.mem.MemoryDataStorageManager; +import herddb.utils.Bytes; +import io.github.jbellis.jvector.vector.VectorSimilarityFunction; +import java.nio.file.Path; +import java.util.List; +import java.util.Random; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * pr-reviewer follow-up #3 for issue #617: direct tests for + * {@link PersistentVectorStore#dropSegmentByStorageKey(String)} — the engine + * primitive that the operator-facing {@code DeleteSegment} RPC ultimately + * invokes after the safety gate, snapshot/refusal checks, and audit log. + * + *

Engine-level coverage (request validation + checkpoint republish + + * shadow notification) lives in + * {@code herddb.indexing.ShadowDeleteSegmentE2ETest}; CLI/wire coverage in + * {@code herddb.indexing.admin.IndexingAdminCliDeleteSegmentTest}. This + * class is dedicated to the in-memory mutation primitive so a regression in + * the lock discipline / dirty-flag / deferred-close path is caught with the + * smallest possible harness. + * + *

Coverage matrix: + *

    + *
  1. (a) happy path: a known segment is removed (gone from + * {@code segments}, store marked {@code dirty} for the next + * checkpoint);
  2. + *
  3. (b) idempotence: a second call on the same key returns + * {@code SegmentDropResult.NOT_FOUND} without throwing and without + * mutating state;
  4. + *
  5. (c) deferred-close drain: after the drop the + * {@code pendingSegmentCloses} queue eventually empties — i.e. the + * segment handle is closed (opportunistic drain runs while no + * compaction holds the lock);
  6. + *
  7. (d) compaction-race safety: when a cycle is mid-iteration + * (after candidate selection, before {@code rebuildSegment}), a + * concurrent drop must NOT cause + * {@code CompactionException(CORRUPTION)} — same invariant as issue + * #535 inherited by the storage-key path.
  8. + *
+ */ +public class PersistentVectorStoreAdminDeleteSegmentTest { + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static final int DIM = 16; + private int savedMinLive; + + @Before + public void disableDeferral() { + savedMinLive = PersistentVectorStore.minLiveVectorsForCheckpoint; + // Allow tiny checkpoints to seal so we always produce ≥1 on-disk + // segment with a handful of inserts. + PersistentVectorStore.minLiveVectorsForCheckpoint = 0; + } + + @After + public void restoreDeferral() { + PersistentVectorStore.minLiveVectorsForCheckpoint = savedMinLive; + } + + private static float[] vec(Random rng) { + float[] v = new float[DIM]; + for (int i = 0; i < DIM; i++) { + v[i] = rng.nextFloat(); + } + return v; + } + + private PersistentVectorStore newStore(Path tmpDir, MemoryDataStorageManager dsm) { + MemoryManager mm = new MemoryManager(64 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024); + PersistentVectorStore store = new PersistentVectorStore( + "vidx-617", "testtable", "tstblspace", "vector_col", + tmpDir, dsm, mm, + 16, 100, 1.2f, 1.4f, true, 2_000_000_000L, 0, + /* compactionIntervalMs */ Long.MAX_VALUE, + VectorSimilarityFunction.EUCLIDEAN); + // Aggressive policy so the race-test below still has something to + // pick when the hook drops the chosen candidate. + store.configureCompaction(Long.MAX_VALUE, 1L, Long.MAX_VALUE, 2, + Integer.MAX_VALUE, 0); + return store; + } + + /** Seeds {@code n} on-disk segments via add+checkpoint cycles. */ + private void seedSegments(PersistentVectorStore store, int n) throws Exception { + Random rng = new Random(617); + for (int c = 0; c < n; c++) { + for (int i = 0; i < 30; i++) { + store.addVector(Bytes.from_int(c * 1000 + i), vec(rng)); + } + store.checkpoint(); + } + } + + /** + * (a) Happy path: a known segment key is removed, the result reports + * {@code removed=true} with a non-negative {@code vectorsLost}, the + * segment list shrinks by one, and {@code dirty} is set so the next + * checkpoint serialises the smaller list. + */ + @Test(timeout = 30_000) + public void happyPathRemovesSegmentAndMarksDirty() throws Exception { + Path tmpDir = tmpFolder.newFolder("admin-delete-happy").toPath(); + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + + try (PersistentVectorStore store = newStore(tmpDir, dsm)) { + store.start(); + seedSegments(store, 3); + + List keysBefore = store.getSegmentStorageKeysSnapshot(); + int countBefore = store.getSegmentCount(); + assertTrue("setup must produce ≥1 segment", countBefore >= 1); + assertEquals(countBefore, keysBefore.size()); + + String victim = keysBefore.get(0); + + // Force a checkpoint AFTER the drop so we can observe the dirty + // flag effect: clear the dirty flag first by performing a no-op + // checkpoint (which only writes when something has changed + // since the last save). If we drop then checkpoint, the + // smaller list must be persisted — verified by re-reading + // getSegmentStorageKeysSnapshot. + store.checkpoint(); // baseline; segment list unchanged + + AbstractVectorStore.SegmentDropResult result = + store.dropSegmentByStorageKey(victim); + + assertTrue("happy path must report removed=true", result.removed); + assertTrue("vectorsLost must be ≥ 0 on the happy path, got " + result.vectorsLost, + result.vectorsLost >= 0L); + + List keysAfter = store.getSegmentStorageKeysSnapshot(); + assertEquals("segment count must drop by one", countBefore - 1, keysAfter.size()); + assertFalse("victim must be gone from snapshot", keysAfter.contains(victim)); + + // The next checkpoint must actually serialise the change — + // a regression where the dirty flag is not set would leave + // the on-disk IndexStatus carrying the deleted segment. + store.checkpoint(); + assertEquals("post-checkpoint list still matches", + countBefore - 1, store.getSegmentCount()); + } + } + + /** + * (b) Idempotent NOT_FOUND on a second call: dropping the same key + * twice in a row must NOT throw and must NOT mutate state. The + * production CLI relies on this — wrapper scripts can be re-run after + * a partial failure without worrying about double-drop semantics. + */ + @Test(timeout = 30_000) + public void doubleDropReportsNotFoundWithoutMutatingState() throws Exception { + Path tmpDir = tmpFolder.newFolder("admin-delete-idempotent").toPath(); + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + + try (PersistentVectorStore store = newStore(tmpDir, dsm)) { + store.start(); + seedSegments(store, 2); + + List keysBefore = store.getSegmentStorageKeysSnapshot(); + assertTrue(keysBefore.size() >= 1); + String victim = keysBefore.get(0); + + // First call: removes the segment. + AbstractVectorStore.SegmentDropResult first = + store.dropSegmentByStorageKey(victim); + assertTrue(first.removed); + + int countAfterFirst = store.getSegmentCount(); + + // Second call: idempotent NOT_FOUND. State is unchanged. + AbstractVectorStore.SegmentDropResult second = + store.dropSegmentByStorageKey(victim); + assertFalse("second drop must report removed=false", second.removed); + assertEquals("NOT_FOUND vectorsLost must be 0", + 0L, second.vectorsLost); + assertEquals("segment count must NOT change on the second call", + countAfterFirst, store.getSegmentCount()); + + // The result is the documented sentinel. + assertEquals("second call must return the NOT_FOUND sentinel", + AbstractVectorStore.SegmentDropResult.NOT_FOUND.removed, + second.removed); + } + } + + /** + * (c) Deferred-close drain: the {@code pendingSegmentCloses} queue + * must eventually drain after the drop. With no compaction running, + * the opportunistic drain in {@code dropSegmentByStorageKey}'s + * {@code finally} block should empty the queue synchronously. + */ + @Test(timeout = 30_000) + public void pendingSegmentClosesQueueDrainsAfterDrop() throws Exception { + Path tmpDir = tmpFolder.newFolder("admin-delete-drain").toPath(); + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + + try (PersistentVectorStore store = newStore(tmpDir, dsm)) { + store.start(); + seedSegments(store, 2); + + // Idle state: queue must start empty. + assertEquals("queue must be empty before the drop", + 0, store.getPendingSegmentClosesSizeForTest()); + + List keysBefore = store.getSegmentStorageKeysSnapshot(); + assertTrue(keysBefore.size() >= 1); + String victim = keysBefore.get(0); + + AbstractVectorStore.SegmentDropResult result = + store.dropSegmentByStorageKey(victim); + assertTrue(result.removed); + + // After the drop, with no concurrent compaction holding + // compactionLock, the opportunistic drain must have emptied + // the queue. The drain runs in the same call's `finally` + // block, so it is synchronous from the caller's POV. + assertEquals("opportunistic drain must have closed the queued segment " + + "since no compaction was running", + 0, store.getPendingSegmentClosesSizeForTest()); + } + } + + /** + * (d) Compaction-race safety: when a cycle is mid-iteration (after + * candidate selection, before {@code rebuildSegment}), a concurrent + * {@code dropSegmentByStorageKey} on a candidate must NOT cause + * {@code CompactionException(CORRUPTION)}. This is the same + * invariant issue #535 fixed for {@code dropSegmentByUuid}; the + * storage-key path inherits the same deferred-close protocol so the + * same test shape applies. + */ + @Test(timeout = 30_000) + public void dropDuringCompactionMustNotCauseCorruption() throws Exception { + Path tmpDir = tmpFolder.newFolder("admin-delete-race").toPath(); + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + + try (PersistentVectorStore store = newStore(tmpDir, dsm)) { + store.start(); + seedSegments(store, 5); + + List keysBefore = store.getSegmentStorageKeysSnapshot(); + assertTrue("setup must produce ≥2 segments", keysBefore.size() >= 2); + String victim = keysBefore.get(0); + + long corruptionBefore = store.getCompactionFailuresCorruptionTotal(); + + // Wire the hook to fire the storage-key drop inside the cycle, + // AFTER candidates are picked but BEFORE rebuildSegment reads + // seg.onDiskGraph. Pre-issue-#535 the synchronous close inside + // the drop would have nulled out onDiskGraph and tripped a + // CORRUPTION failure; the deferred-close protocol queues the + // close until after the cycle's iteration completes. + store.setCompactionPostCandidateSelectionHookForTest(() -> { + AbstractVectorStore.SegmentDropResult r = + store.dropSegmentByStorageKey(victim); + // Hook drops the victim synchronously — the cycle still + // sees a non-null onDiskGraph because the close was + // deferred to the cycle's finally block. + assertNotNull("hook drop must have produced a result", r); + }); + + store.runCompactionCycle(); + + // CORE INVARIANT (the fix this code inherits from issue #535): + // NO CORRUPTION failure must have been recorded. + assertEquals("no CORRUPTION failure must be recorded — the storage-key" + + " drop path inherits issue #535's deferred-close protocol", + corruptionBefore, store.getCompactionFailuresCorruptionTotal()); + + // The victim must be gone from the active list — the drop + // already removed it under the writeLock before the hook + // returned. + assertFalse("victim must be removed from the active segment list", + store.getSegmentStorageKeysSnapshot().contains(victim)); + + // By the time runCompactionCycle returns, the deferred close + // must have drained. + assertEquals("pendingSegmentCloses queue must drain by the end of the" + + " cycle", + 0, store.getPendingSegmentClosesSizeForTest()); + } + } +} diff --git a/herddb-remote-file-service/src/main/java/herddb/remote/RemoteFileDataStorageManager.java b/herddb-remote-file-service/src/main/java/herddb/remote/RemoteFileDataStorageManager.java index 27efcb4b5..e124b9dbf 100644 --- a/herddb-remote-file-service/src/main/java/herddb/remote/RemoteFileDataStorageManager.java +++ b/herddb-remote-file-service/src/main/java/herddb/remote/RemoteFileDataStorageManager.java @@ -1045,6 +1045,46 @@ public io.github.jbellis.jvector.disk.ReaderSupplier multipartIndexReaderSupplie readerStatsLogger, segmentBlockCache); } + /** + * Issue #617 + pr-reviewer follow-up #2: backend-specific override of the + * {@code multipartIndexFileExists} probe. The default + * {@link DataStorageManager#multipartIndexFileExists} implementation + * builds a {@link RemoteRandomAccessReader} with a fictitious + * {@code fileSize=1L} and then calls {@code readInt()} on it — that + * combination triggers an infinite loop in + * {@link RemoteRandomAccessReader#readFully(byte[])} because the loop's + * {@code toCopy} becomes zero once {@code position} matches the fake + * {@code totalSize}. We bypass the reader entirely and probe block-0 + * directly via the file-server client's {@code readFileRange} API, + * which returns {@code null} when the underlying block is absent. + * + *

4 bytes is the same probe length the base contract documents (the + * default impl does {@code readInt()}), so the contract gap that + * {@code --force} covers — a block-0 of ≥4 bytes is enough to make the + * probe return {@code true} even on a truncated upload — is preserved. + */ + @Override + public boolean multipartIndexFileExists(String tableSpace, String uuid, String fileType) { + String logicalPath = remoteMultipartPath(tableSpace, uuid, fileType); + int blockSize = Math.max(client.getBlockSize(), MULTIPART_BLOCK_SIZE); + try { + byte[] head = client.readFileRange(logicalPath, 0L, 4, blockSize); + // RemoteFileServiceClient.readFileRange returns null for missing + // blocks (see RemoteFileServiceTest.testWriteFileBlockAndReadFileRange). + // A non-null result with ≥ 1 byte means block-0 is reachable — + // matches the base contract's "best-effort presence check". + return head != null && head.length > 0; + } catch (RuntimeException e) { + // Some backends wrap NoSuchKey-style errors in unchecked + // exceptions; treat any failure during the probe as "missing" + // per the contract. + LOGGER.log(Level.FINE, + "multipartIndexFileExists: probe failed for {0}: {1}", + new Object[]{logicalPath, e.getMessage()}); + return false; + } + } + @Override public void deleteMultipartIndexFile(String tableSpace, String uuid, String fileType) throws DataStorageManagerException { diff --git a/herddb-remote-file-service/src/test/java/herddb/remote/RemoteFileDataStorageManagerProbeAndPurgeTest.java b/herddb-remote-file-service/src/test/java/herddb/remote/RemoteFileDataStorageManagerProbeAndPurgeTest.java new file mode 100644 index 000000000..26419db92 --- /dev/null +++ b/herddb-remote-file-service/src/test/java/herddb/remote/RemoteFileDataStorageManagerProbeAndPurgeTest.java @@ -0,0 +1,279 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package herddb.remote; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * pr-reviewer follow-up #2 for issue #617: dedicated probe-and-purge contract + * tests for {@link RemoteFileDataStorageManager#multipartIndexFileExists} + * (the safety gate the {@code DeleteSegment} engine path keys off) and + * {@link RemoteFileDataStorageManager#deleteMultipartIndexFile} (the + * {@code --purge-storage} purger). + * + *

The matrix: + *

    + *
  • (a) probe returns {@code false} when the multipart object is + * absent — the gate must not block a delete the operator initiated for + * an already-orphaned segment record;
  • + *
  • (b) probe returns {@code true} when the multipart object is + * fully present — the gate must protect the operator from deleting a + * healthy segment;
  • + *
  • (c) {@code deleteMultipartIndexFile} is idempotent and + * invalidates the {@link SegmentBlockCache} so a subsequent reader on + * the same logical path cannot serve stale bytes;
  • + *
  • (d) probe still returns {@code true} on a block-0 that is at + * least 4 bytes long but otherwise truncated/corrupted — this + * documents the contract gap that the operator's {@code --force} flag + * covers (the gate cannot distinguish "complete file" from "block-0 + * only", so {@code --force} remains required when the rest of the + * file is known to be corrupt).
  • + *
+ * + *

End-to-end coverage of {@code DeleteSegment} that exercises this gate + * indirectly lives in {@code herddb.indexing.ShadowDeleteSegmentE2ETest} + * (in the indexing-service module). + */ +public class RemoteFileDataStorageManagerProbeAndPurgeTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private RemoteFileServer server; + private RemoteFileServiceClient client; + private RemoteFileDataStorageManager storage; + + @Before + public void setUp() throws Exception { + server = new RemoteFileServer(0, folder.newFolder("remote").toPath()); + server.start(); + client = new RemoteFileServiceClient(Arrays.asList("localhost:" + server.getPort())); + storage = new RemoteFileDataStorageManager( + folder.newFolder("metadata").toPath(), + folder.newFolder("tmp").toPath(), + 1000, + client); + storage.start(); + } + + @After + public void tearDown() throws Exception { + if (storage != null) { + storage.close(); + } + if (client != null) { + client.close(); + } + if (server != null) { + server.stop(); + } + } + + /** + * Writes a synthetic graph file via the high-level multipart writer so + * the produced object matches exactly what {@code PersistentVectorStore} + * would have written at Phase B time (issue #617 reproduction shape). + */ + private long writeMultipartGraph(String tableSpace, String uuid, byte[] data) throws Exception { + Path tempFile = folder.newFile("graph-" + uuid + ".bin").toPath(); + Files.write(tempFile, data); + try { + storage.writeMultipartIndexFile(tableSpace, uuid, "graph", tempFile, null); + } finally { + Files.deleteIfExists(tempFile); + } + return data.length; + } + + /** + * (a) absent object → probe returns {@code false}. This is the path the + * issue #617 reproduction hits when the Phase B upload failed before + * block-0 was ever published: the IS metadata still references the + * segment, but the remote storage has no object for it. With the gate + * keying off this probe, {@code DeleteSegment} with {@code force=false} + * is accepted because the safety check (graph file IS reachable) does + * not fire. + */ + @Test(timeout = 30_000) + public void probeReturnsFalseWhenObjectIsAbsent() { + assertFalse("probe must return false when no multipart object exists", + storage.multipartIndexFileExists("ts1", "uuid-absent", "graph")); + } + + /** + * (b) fully-present object → probe returns {@code true}. This is the + * happy path: the gate must protect the operator from deleting a + * healthy segment without going through {@code --force}. + */ + @Test(timeout = 60_000) + public void probeReturnsTrueWhenObjectIsFullyPresent() throws Exception { + // Multi-block payload so the probe does not just inspect a partial + // block-0 — exactly matches the production "complete graph file" + // contract that the gate is supposed to detect. + byte[] payload = new byte[64 * 1024]; + for (int i = 0; i < payload.length; i++) { + payload[i] = (byte) (i & 0xFF); + } + writeMultipartGraph("ts1", "uuid-present", payload); + assertTrue("probe must return true when the multipart object exists", + storage.multipartIndexFileExists("ts1", "uuid-present", "graph")); + } + + /** + * (c) {@code deleteMultipartIndexFile} is idempotent and invalidates + * the block cache. We verify: + *

    + *
  1. two back-to-back calls on the same key succeed (the second + * must not throw — the multipart files are already gone);
  2. + *
  3. a block populated in the {@link SegmentBlockCache} via a real + * read is removed from the cache by the delete call, so a + * subsequent reader on the same logical path cannot serve stale + * bytes from a rewritten segment that happens to hit the same + * key.
  4. + *
+ * + *

{@link SegmentBlockCache} is {@code final}, so we install a real + * one with a small capacity and assert via the public + * {@link SegmentBlockCache#containsBlock(String, long, int)} accessor + * — that is exactly the same surface used in production to decide + * whether a block load is needed. + */ + @Test(timeout = 60_000) + public void deleteMultipartIndexFileIsIdempotentAndInvalidatesCache() throws Exception { + byte[] payload = new byte[8 * 1024]; + for (int i = 0; i < payload.length; i++) { + payload[i] = (byte) ((i * 31) & 0xFF); + } + writeMultipartGraph("ts1", "uuid-purge", payload); + assertTrue("precondition: object must exist before purge", + storage.multipartIndexFileExists("ts1", "uuid-purge", "graph")); + + // Install a real (enabled) SegmentBlockCache so we can observe the + // invalidation effect via the public containsBlock() accessor. + // 1 MiB capacity is plenty for the 8 KiB payload. + SegmentBlockCache cache = new SegmentBlockCache(1L * 1024 * 1024); + storage.setSegmentBlockCache(cache, + org.apache.bookkeeper.stats.NullStatsLogger.INSTANCE); + + // Force a block to land in the cache by running a real read. The + // RemoteRandomAccessReader populates the cache on first access via + // the supplier returned by multipartIndexReaderSupplier. + io.github.jbellis.jvector.disk.ReaderSupplier rs = + storage.multipartIndexReaderSupplier("ts1", "uuid-purge", "graph", + (long) payload.length); + try (io.github.jbellis.jvector.disk.RandomAccessReader r = rs.get()) { + r.seek(0L); + r.readInt(); // pulls block-0 into the cache + } + String logicalPath = "ts1/uuid-purge/multipart/graph"; + // The cache may use a writeBlockSize different from payload.length, + // so we cannot pin down the exact (offset, length) pair without + // duplicating the production block-sizing math. Instead, walk a + // small set of candidate block sizes via the public containsBlock + // helper — at least one of them must hit, otherwise the read + // above did not populate the cache and the test cannot prove the + // invalidation contract. + int[] candidateLengths = new int[]{ + 4 * 1024 * 1024, 1024 * 1024, 64 * 1024, payload.length + }; + boolean wasCached = false; + for (int len : candidateLengths) { + if (cache.containsBlock(logicalPath, 0L, len)) { + wasCached = true; + break; + } + } + assertTrue("precondition: a block must be cached after the read," + + " otherwise the invalidation assertion is vacuous", + wasCached); + + // First delete must succeed AND invalidate the cache. + storage.deleteMultipartIndexFile("ts1", "uuid-purge", "graph"); + assertFalse("object must be gone after delete", + storage.multipartIndexFileExists("ts1", "uuid-purge", "graph")); + for (int len : candidateLengths) { + assertFalse("cached block at length=" + len + + " must be evicted after deleteMultipartIndexFile", + cache.containsBlock(logicalPath, 0L, len)); + } + + // Second delete must also succeed (idempotent) — no exception, no + // surfaced error. The implementation logs at WARNING but does not + // propagate. + storage.deleteMultipartIndexFile("ts1", "uuid-purge", "graph"); + assertFalse("object must remain gone after the second delete call", + storage.multipartIndexFileExists("ts1", "uuid-purge", "graph")); + } + + /** + * (d) Documents the contract gap that {@code --force} covers: when + * block-0 is present but truncated/corrupted, the probe still returns + * {@code true}. The default probe reads the first 4 bytes via the + * multipart reader supplier — that succeeds as long as block-0 has at + * least 4 bytes, even if the rest of the file is missing or corrupted. + * + *

This is the exact reproduction of the issue #617 scenario where an + * S3 multipart upload published block-0 then failed on a later block: + * the operator running {@code delete-segment} without {@code --force} + * sees the safety gate trip ("graph file IS reachable"), reads the + * documentation, and re-runs with {@code --force} once they have + * confirmed via the file-server logs that the upload never completed. + */ + @Test(timeout = 30_000) + public void probeReturnsTrueOnPartialBlockZeroAtLeast4Bytes() throws Exception { + // Write just block-0 directly via the lower-level writeFileBlock RPC + // — that bypasses the multipart writer's "all blocks or none" + // semantics and reproduces the Phase B "upload failed after + // block-0" failure mode the issue #617 operator path covers. + // Must match remoteMultipartPath: tableSpace/uuid/multipart/fileType. + String logicalPath = "ts1/uuid-partial/multipart/graph"; + byte[] partial = new byte[16]; // > 4 bytes; the probe reads 4 bytes + for (int i = 0; i < partial.length; i++) { + partial[i] = (byte) i; + } + client.writeFileBlock(logicalPath, 0L, partial); + + // Sanity: the multipart writer would have written multiple blocks + // for a real graph file. Here only block-0 is present — yet the + // probe must still return true because it only inspects block-0. + assertTrue("probe returns true on a partial block-0 — documents the" + + " contract gap that --force covers (issue #617)", + storage.multipartIndexFileExists("ts1", "uuid-partial", "graph")); + + // Round-tripping the partial bytes through writeMultipartFile is + // not possible (the input stream framing would always produce a + // matching file); this direct block-0 write is the only way to + // produce a "partial" state in the test harness, and it mirrors + // exactly what an S3 backend produces on a partial multipart + // upload failure. + assertNotNull("client must still be wired for follow-up purge calls", client); + } + +} From 98f75f7c146a837fd0b073bc3c09617a9df6f832 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 20 May 2026 23:15:34 +0200 Subject: [PATCH 3/3] review: address pr-reviewer iteration-2 follow-ups for issue #617 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Iteration 3 of 3: - #1 (race-path test): add IndexingServiceEngineDeleteSegmentEdgeCasesTest with `engineReportsRaceSentinelWhenSegmentDroppedConcurrently` that drives the engine into the removed=false / vectors_lost=-1 branch via a new package-private hook `setPreDropRaceHookForTests` fired inside `deleteSegment` between the engine's snapshot+probe and its own drop. - #2 (engine-only ReadOnlyVectorStore rejection): same test class adds `engineRefusesDeleteOnReadOnlyVectorStoreWithDiagnosticMessage` that registers a real ReadOnlyVectorStore directly on the engine and asserts the rejection message contains both "shadow replica" and "primary". - #3 (CLI -1 rendering): IndexingAdminCli now renders negative vectors_lost as `vectors_lost=unknown (race)` in text mode and as the string `"unknown"` in JSON (key kept so the schema stays stable). Two new tests in IndexingAdminCliDeleteSegmentTest cover both forms. - #4 (real-server shadow gate): add ShadowDeleteSegmentRpcGateTest with a real IndexingServiceImpl + real gRPC server + ShadowStubEngine whose `deleteSegment` throws AssertionError — proving the production `not_primary:` gate at IndexingServiceImpl short-circuits before reaching the engine. Asserts FAILED_PRECONDITION + the documented prefix + "target the primary instead". - #5 (multipartIndexFileExists override): override added in PromotableRemoteFileDataStorageManager (delegates to activeDelegate) and ReadReplicaDataStorageManager (same readFileRange shortcut as RemoteFileDataStorageManager). - #6 (nit): replaced the remaining `herddb.indexing.vector.ReadOnlyVectorStore` FQN references in IndexingServiceEngine with the imported simple name. - #7 (nit): reformat the 1 GiB literal in DataStorageManager to `1L << 30`. Nit #8 (full hammer suite) was deliberately skipped per the agent's "Never run the full test suite" rule — none of the changes in this commit touch indexes/checkpoints/concurrency hot paths (the checkpoint-on-delete behaviour was already present in the prior commit). Pre-PR validation (`spotless:check apache-rat:check spotbugs:check install -DskipTests -Pci`) is green across all 22 reactor modules. Co-Authored-By: Claude Opus 4.7 --- .../herddb/storage/DataStorageManager.java | 2 +- .../indexing/IndexingServiceEngine.java | 39 +- .../indexing/admin/IndexingAdminCli.java | 31 +- ...rviceEngineDeleteSegmentEdgeCasesTest.java | 340 ++++++++++++++++++ .../ShadowDeleteSegmentRpcGateTest.java | 230 ++++++++++++ .../IndexingAdminCliDeleteSegmentTest.java | 90 +++++ ...romotableRemoteFileDataStorageManager.java | 16 + .../remote/ReadReplicaDataStorageManager.java | 27 ++ 8 files changed, 766 insertions(+), 9 deletions(-) create mode 100644 herddb-indexing-service/src/test/java/herddb/indexing/IndexingServiceEngineDeleteSegmentEdgeCasesTest.java create mode 100644 herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentRpcGateTest.java diff --git a/herddb-core/src/main/java/herddb/storage/DataStorageManager.java b/herddb-core/src/main/java/herddb/storage/DataStorageManager.java index f9aa3dbf9..a03818605 100644 --- a/herddb-core/src/main/java/herddb/storage/DataStorageManager.java +++ b/herddb-core/src/main/java/herddb/storage/DataStorageManager.java @@ -302,7 +302,7 @@ public boolean multipartIndexFileExists(String tableSpace, String uuid, String f // only affects the reader's internal end-of-file accounting and // never causes the backend to fetch more bytes than necessary. rs = multipartIndexReaderSupplier(tableSpace, uuid, fileType, - 1L * 1024 * 1024 * 1024); + 1L << 30); } catch (DataStorageManagerException e) { return false; } diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java index aa432f7d3..e3ae46148 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java @@ -2338,7 +2338,7 @@ public DeleteSegmentResult deleteSegment(String table, String indexName, String // .deleteSegment normally short-circuits these RPCs before they // reach the engine, but we keep belt-and-braces here in case a // future caller bypasses the gRPC layer. - if (store instanceof herddb.indexing.vector.ReadOnlyVectorStore) { + if (store instanceof ReadOnlyVectorStore) { throw new DeleteSegmentException( "index " + table + "." + indexName + ": this instance is a shadow replica — target the primary" @@ -2383,6 +2383,18 @@ public DeleteSegmentResult deleteSegment(String table, String indexName, String new Object[]{segmentStorageKey, table, indexName, graphPresent, force, purgeStorage}); + // pr-reviewer follow-up #1: test-only hook fired AFTER the engine has + // taken its snapshot+probe but BEFORE the engine's own drop call. A + // test can use this to simulate a concurrent compaction (or another + // operator-driven drop) racing the engine, and then assert that the + // engine reports the resulting "segment disappeared" race path + // correctly (removed=false, vectors_lost=-1). Strictly test-only, + // package-private — production callers never set this. + Runnable preDropHook = preDropRaceHookForTests; + if (preDropHook != null) { + preDropHook.run(); + } + AbstractVectorStore.SegmentDropResult drop = pvs.dropSegmentByStorageKey(segmentStorageKey); if (!drop.removed) { // Race: a concurrent compaction swap removed the segment between @@ -3612,6 +3624,27 @@ void setWarmupPauseHookForTest(Runnable hook) { this.warmupPauseHookForTest = hook; } + /** + * pr-reviewer follow-up #1 (issue #617): test-only hook fired inside + * {@link #deleteSegment} AFTER the engine has snapshotted the segment + * list and probed the multipart graph, but BEFORE the engine calls + * {@link PersistentVectorStore#dropSegmentByStorageKey} itself. Tests + * use this to drop the same segment from a parallel actor and then + * assert that {@code deleteSegment} reports the "segment disappeared + * between snapshot and drop" race path (removed=false, vectors_lost=-1). + * Strictly test-only — production never sets this. + */ + private volatile Runnable preDropRaceHookForTests = null; + + /** + * Installs (or clears) the pre-drop race hook above. Package-private: + * production code never calls this. + */ + // package-private for testing + void setPreDropRaceHookForTests(Runnable hook) { + this.preDropRaceHookForTests = hook; + } + /** * Body of the async warmup task: iterates the snapshot of persistent * stores and calls {@link PersistentVectorStore#warmUpBlockCache} on each. @@ -4001,8 +4034,8 @@ public int getSegmentCountForTest(String table, String indexName) { if (store instanceof PersistentVectorStore) { return ((PersistentVectorStore) store).getSegmentCount(); } - if (store instanceof herddb.indexing.vector.ReadOnlyVectorStore) { - return ((herddb.indexing.vector.ReadOnlyVectorStore) store).getSegmentCount(); + if (store instanceof ReadOnlyVectorStore) { + return ((ReadOnlyVectorStore) store).getSegmentCount(); } return -1; } diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java b/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java index 75225a726..4b0eb3492 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java @@ -854,19 +854,40 @@ private int runDeleteSegment(String[] args) throws Exception { cli.getOptionValue("table"), cli.getOptionValue("index"), segment, purge, force); + // pr-reviewer follow-up #3: the engine emits vectors_lost=-1 on the + // race path (segment dropped by a concurrent compaction between the + // engine's snapshot and the engine's drop call — the count is + // genuinely unknown, NOT zero). Render the sentinel as the explicit + // "unknown" string in both text and JSON so wrapper scripts don't + // mistake it for a real zero. We keep the field present in JSON + // (rather than omitting it) so the schema is stable; the field + // type widens from number to "number | \"unknown\"". + boolean vectorsLostUnknown = resp.getVectorsLost() < 0L; if (cli.hasOption("json")) { Map m = new LinkedHashMap<>(); m.put("segment", resp.getSegment()); m.put("removed", resp.getRemoved()); - m.put("vectors_lost", resp.getVectorsLost()); + if (vectorsLostUnknown) { + m.put("vectors_lost", "unknown"); + } else { + m.put("vectors_lost", resp.getVectorsLost()); + } m.put("graph_file_present", resp.getGraphFilePresent()); m.put("storage_purged", resp.getStoragePurged()); out.println(JsonWriter.toJson(m)); } else { - out.printf(Locale.ROOT, - "segment=%s removed=%s vectors_lost=%d graph_file_present=%s storage_purged=%s%n", - resp.getSegment(), resp.getRemoved(), resp.getVectorsLost(), - resp.getGraphFilePresent(), resp.getStoragePurged()); + if (vectorsLostUnknown) { + out.printf(Locale.ROOT, + "segment=%s removed=%s vectors_lost=unknown (race) " + + "graph_file_present=%s storage_purged=%s%n", + resp.getSegment(), resp.getRemoved(), + resp.getGraphFilePresent(), resp.getStoragePurged()); + } else { + out.printf(Locale.ROOT, + "segment=%s removed=%s vectors_lost=%d graph_file_present=%s storage_purged=%s%n", + resp.getSegment(), resp.getRemoved(), resp.getVectorsLost(), + resp.getGraphFilePresent(), resp.getStoragePurged()); + } } return resp.getRemoved() ? 0 : 1; } diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/IndexingServiceEngineDeleteSegmentEdgeCasesTest.java b/herddb-indexing-service/src/test/java/herddb/indexing/IndexingServiceEngineDeleteSegmentEdgeCasesTest.java new file mode 100644 index 000000000..d76cdb224 --- /dev/null +++ b/herddb-indexing-service/src/test/java/herddb/indexing/IndexingServiceEngineDeleteSegmentEdgeCasesTest.java @@ -0,0 +1,340 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package herddb.indexing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import herddb.codec.RecordSerializer; +import herddb.core.MemoryManager; +import herddb.indexing.vector.AbstractVectorStore; +import herddb.indexing.vector.PersistentVectorStore; +import herddb.indexing.vector.ReadOnlyVectorStore; +import herddb.log.LogEntry; +import herddb.log.LogEntryFactory; +import herddb.log.LogSequenceNumber; +import herddb.mem.MemoryDataStorageManager; +import herddb.model.ColumnTypes; +import herddb.model.Index; +import herddb.model.Record; +import herddb.model.Table; +import herddb.model.TableSpace; +import io.github.jbellis.jvector.vector.VectorSimilarityFunction; +import java.nio.file.Path; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Issue #617 + pr-reviewer follow-ups #1 and #2: engine-level edge cases for + * {@link IndexingServiceEngine#deleteSegment} that the CLI/gRPC tests in + * {@code herddb.indexing.admin.IndexingAdminCliDeleteSegmentTest} and the + * full E2E test in {@link ShadowDeleteSegmentE2ETest} do not cover. + * + *

    + *
  1. Race-path test (follow-up #1): drive the engine into the + * branch where {@link PersistentVectorStore#dropSegmentByStorageKey} + * returns {@code removed=false} after the engine has already + * snapshotted the storage key. We inject a parallel drop via a + * test-only hook that fires inside {@code deleteSegment} between the + * engine's snapshot/probe and the engine's own drop call. Assert that + * the engine reports the documented sentinel: + * {@code vectorsLost == -1L && removed == false}.
  2. + *
  3. Shadow / read-only rejection (follow-up #2): install a + * {@link ReadOnlyVectorStore} directly into the engine's + * {@code vectorStores} map (bypassing the gRPC layer entirely) and + * call {@link IndexingServiceEngine#deleteSegment} on it. The + * production gate at + * {@link IndexingServiceImpl#deleteSegment(herddb.indexing.proto.DeleteSegmentRequest, + * io.grpc.stub.StreamObserver)} short-circuits these RPCs at the + * service boundary, so this engine-level belt-and-braces refusal is + * reachable only by tests that drive the engine directly. The + * exception message must call out both "shadow replica" and "primary" + * so an operator who somehow reaches this branch (e.g. a future + * caller that bypasses the gRPC layer) gets actionable diagnostics. + *
+ */ +public class IndexingServiceEngineDeleteSegmentEdgeCasesTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private int savedMinLive; + + @Before + public void setUp() { + savedMinLive = PersistentVectorStore.minLiveVectorsForCheckpoint; + PersistentVectorStore.minLiveVectorsForCheckpoint = 0; + } + + @After + public void tearDown() { + PersistentVectorStore.minLiveVectorsForCheckpoint = savedMinLive; + } + + private static Table createTable() { + return Table.builder() + .name("vectable") + .tablespace(TableSpace.DEFAULT) + .column("pk", ColumnTypes.STRING) + .column("vec", ColumnTypes.FLOATARRAY) + .primaryKey("pk") + .build(); + } + + private static Index createIndex(String uuid) { + return Index.builder() + .name("vidx").uuid(uuid) + .table("vectable") + .type(Index.TYPE_VECTOR) + .column("vec", ColumnTypes.FLOATARRAY) + .build(); + } + + private static float[] randomVector(Random rng, int dim) { + float[] v = new float[dim]; + for (int i = 0; i < dim; i++) { + v[i] = rng.nextFloat(); + } + return v; + } + + private IndexingServiceEngine newEngine(MemoryDataStorageManager dsm, MemoryManager mm, + final String stableUuid, + final AtomicReference storeRef) throws Exception { + Path logDir = folder.newFolder().toPath(); + Path dataDir = folder.newFolder().toPath(); + Properties props = new Properties(); + props.setProperty(IndexingServerConfiguration.PROPERTY_STORAGE_TYPE, "memory"); + props.setProperty(IndexingServerConfiguration.PROPERTY_TABLESPACE_NAME, TableSpace.DEFAULT); + IndexingServerConfiguration config = new IndexingServerConfiguration(props); + final IndexingServiceEngine engine = new IndexingServiceEngine(logDir, dataDir, config); + + herddb.mem.MemoryMetadataStorageManager memMeta = new herddb.mem.MemoryMetadataStorageManager(); + memMeta.start(); + memMeta.ensureDefaultTableSpace("local", "local", 0, 1); + engine.setMetadataStorageManager(memMeta); + engine.setDataStorageManager(dsm); + engine.setMemoryManager(mm); + engine.setVectorStoreFactory((indexName, tableName, vectorColumnName, dataDirArg, indexProperties) -> { + PersistentVectorStore pvs = new PersistentVectorStore( + indexName, tableName, engine.getTableSpaceUUID(), vectorColumnName, + stableUuid, dataDirArg, dsm, mm, + 16, 100, 1.2f, 1.4f, true, 2_000_000_000L, 0, + Long.MAX_VALUE, + VectorSimilarityFunction.EUCLIDEAN); + try { + pvs.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + if (storeRef != null) { + storeRef.set(pvs); + } + return pvs; + }); + return engine; + } + + /** + * pr-reviewer follow-up #1: race path between the engine's snapshot and + * the engine's drop. The pre-drop hook removes the same segment from a + * parallel actor (here a direct {@code dropSegmentByStorageKey} call on + * the store, simulating a concurrent compaction swap that finished + * between the engine's two reads). The engine's own drop then sees the + * segment gone and must return the documented race-sentinel. + */ + @Test(timeout = 60_000) + public void engineReportsRaceSentinelWhenSegmentDroppedConcurrently() throws Exception { + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + MemoryManager mm = new MemoryManager(64 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024); + final String stableUuid = "idx-uuid-617-race"; + + AtomicReference primaryStore = new AtomicReference<>(); + IndexingServiceEngine engine = newEngine(dsm, mm, stableUuid, primaryStore); + engine.start(); + try { + Table t = createTable(); + Index idx = createIndex(stableUuid); + engine.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null)); + engine.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null)); + + Random rng = new Random(617); + int dim = 12; + LogSequenceNumber last = null; + // Two checkpoints → two segments, so even after the "concurrent" + // drop the snapshot list (taken before the hook) still contained + // the victim — without that, the up-front presence-check inside + // deleteSegment would throw DeleteSegmentException ("not + // registered") and we would never reach the race branch. + for (int batch = 0; batch < 2; batch++) { + for (int i = 0; i < 40; i++) { + Record r = RecordSerializer.makeRecord(t, + "pk", "k" + batch + "_" + i, "vec", randomVector(rng, dim)); + LogEntry ins = LogEntryFactory.insert(t, r.key, r.value, null); + last = new LogSequenceNumber(1, 100 + batch * 100 + i); + engine.applySingleEntryForTest(last, ins); + } + engine.awaitPendingWorkForTest(); + engine.setLastProcessedLsnForTest(last); + engine.forceCheckpointAndSaveWatermark(); + } + + final PersistentVectorStore pvs = primaryStore.get(); + assertNotNull("engine must have created the persistent store", pvs); + List keys = pvs.getSegmentStorageKeysSnapshot(); + assertTrue("test setup must produce ≥2 segments, got " + keys.size(), + keys.size() >= 2); + final String victim = keys.get(0); + + // Pre-drop race hook: fires inside engine.deleteSegment AFTER the + // snapshot+probe but BEFORE the engine's own drop. Removes the + // victim out from under the engine. The hook itself does NOT use + // a separate thread (the deferred-close protocol around + // dropSegmentByStorageKey is reentrant-safe) — the important + // invariant is just that the segment is gone by the time the + // engine's drop runs. + final AtomicReference sideDropRef = + new AtomicReference<>(); + engine.setPreDropRaceHookForTests(() -> { + AbstractVectorStore.SegmentDropResult r = pvs.dropSegmentByStorageKey(victim); + sideDropRef.set(r); + }); + + IndexingServiceEngine.DeleteSegmentResult result = engine.deleteSegment( + "vectable", "vidx", victim, + /* purgeStorage */ false, /* force */ true); + + // Pre-condition: the side-channel drop must have actually + // removed the segment, otherwise the race branch would not be + // exercised. + AbstractVectorStore.SegmentDropResult sideDrop = sideDropRef.get(); + assertNotNull("pre-drop hook must have fired", sideDrop); + assertTrue("pre-drop hook must have removed the segment", + sideDrop.removed); + + // The race-path documented contract: removed=false AND + // vectorsLost == -1L (sentinel for "unknown — cannot compute the + // count because the segment handle is no longer accessible"). + assertFalse("engine must report removed=false on the race path", + result.removed); + assertEquals("engine must surface vectors_lost=-1 on the race path", + -1L, result.vectorsLost); + assertEquals("response segment name must match the request", + victim, result.segment); + assertFalse("storage_purged must be false on the race path " + + "(engine bails out before touching storage)", + result.storagePurged); + + // And the segment is gone from the active list (the side-channel + // drop removed it). + assertFalse("victim must be gone from the active segment list", + pvs.getSegmentStorageKeysSnapshot().contains(victim)); + } finally { + engine.setPreDropRaceHookForTests(null); + engine.close(); + } + } + + /** + * pr-reviewer follow-up #2: engine-only refusal when the configured + * store is a {@link ReadOnlyVectorStore}. The production + * {@code IndexingServiceImpl.deleteSegment} short-circuits these RPCs + * with a {@code FAILED_PRECONDITION} BEFORE reaching the engine — this + * test bypasses that gate by calling {@code engine.deleteSegment(...)} + * directly to prove the engine's belt-and-braces guard is in place. + * + *

The exception message must mention both "shadow replica" and + * "primary" so a future caller that bypasses the gRPC layer still gets + * an actionable diagnostic. + */ + @Test(timeout = 30_000) + public void engineRefusesDeleteOnReadOnlyVectorStoreWithDiagnosticMessage() throws Exception { + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + MemoryManager mm = new MemoryManager(64 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024); + final String stableUuid = "idx-uuid-617-readonly"; + + IndexingServiceEngine engine = newEngine(dsm, mm, stableUuid, null); + engine.start(); + ReadOnlyVectorStore readOnly = null; + try { + Table t = createTable(); + Index idx = createIndex(stableUuid); + // Schema must exist so the engine's schema-tracker accepts the + // synthetic ReadOnlyVectorStore registration. + engine.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null)); + engine.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null)); + + // Build a real ReadOnlyVectorStore (no segments, no data — the + // delete-segment refusal must fire on the type check alone, and + // must not depend on the store having any state). Use a + // dedicated tmp dir so the underlying PersistentVectorStore + // delegate does not collide with the in-flight production + // store. + Path roTmp = folder.newFolder("ro-store").toPath(); + readOnly = new ReadOnlyVectorStore( + "vidx", "vectable", engine.getTableSpaceUUID(), + "vec", stableUuid + "-ro", roTmp, + dsm, mm, + 16, 100, 1.2f, 1.4f, + /* fusedPQ */ true, + /* maxSegmentSize */ 2_000_000_000L, + /* maxLiveGraphSize */ 0, + VectorSimilarityFunction.EUCLIDEAN); + readOnly.start(); + + // Replace the (PersistentVectorStore) the factory just installed + // with the synthetic ReadOnlyVectorStore — this is the precise + // scenario the engine guard is meant to catch. + engine.registerIndexForTest(idx, readOnly); + + try { + engine.deleteSegment("vectable", "vidx", "any-segment-key", + /* purgeStorage */ false, /* force */ true); + fail("engine must refuse deleteSegment on a ReadOnlyVectorStore"); + } catch (IndexingServiceEngine.DeleteSegmentException expected) { + String msg = expected.getMessage(); + assertNotNull(msg); + assertTrue("refusal message must mention 'shadow replica', got: " + msg, + msg.contains("shadow replica")); + assertTrue("refusal message must mention 'primary' (so the operator" + + " knows where to retry), got: " + msg, + msg.contains("primary")); + } + } finally { + if (readOnly != null) { + try { + readOnly.close(); + } catch (Exception ignore) { + // best-effort teardown + } + } + engine.close(); + } + } +} diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentRpcGateTest.java b/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentRpcGateTest.java new file mode 100644 index 000000000..819a3b982 --- /dev/null +++ b/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentRpcGateTest.java @@ -0,0 +1,230 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package herddb.indexing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import herddb.indexing.proto.DeleteSegmentRequest; +import herddb.indexing.proto.IndexingServiceGrpc; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.nio.file.Paths; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * pr-reviewer follow-up #4 for issue #617: exercises the + * real-server shadow gate on the {@code DeleteSegment} RPC. + * + *

The existing + * {@code IndexingAdminCliDeleteSegmentTest.shadowConfiguredServerRejectsWithNotPrimaryPrefix} + * test runs against a hand-rolled gRPC fake that returns a hard-coded + * {@code FAILED_PRECONDITION} response — it validates the CLI's surface, + * not the production gate at + * {@link IndexingServiceImpl#deleteSegment(DeleteSegmentRequest, + * io.grpc.stub.StreamObserver)}. A regression that deleted the production + * gate would still pass that fake-server test. + * + *

This test mirrors the {@link ShadowAdminCliAndRpcTest} harness: a real + * {@link IndexingServiceImpl} wired to a real {@link IndexingServiceEngine} + * stub configured as a shadow ({@code isConfiguredAsShadow() == true}), + * exposed via an in-process gRPC server, invoked via the real blocking + * stub. The engine's {@code deleteSegment} is intentionally NOT overridden + * — the test would FAIL if it were ever reached, proving the production + * shadow gate at the service boundary is what's blocking the call. + * + *

Asserts the documented error contract: + *

    + *
  • {@code Status.Code == FAILED_PRECONDITION};
  • + *
  • description starts with + * {@code "not_primary: this indexing-service instance is a shadow replica (shadowOf="};
  • + *
  • description contains {@code "target the primary instead"}.
  • + *
+ */ +public class ShadowDeleteSegmentRpcGateTest { + + /** + * Minimal {@link IndexingServiceEngine} subclass: exposes shadow=true with + * a configurable shadowOf, and BLOWS UP if anything ever calls the + * engine's own {@code deleteSegment} — that would mean the service-level + * shadow gate was bypassed. + */ + private static final class ShadowStubEngine extends IndexingServiceEngine { + volatile boolean shadow = true; + volatile int shadowOf = 0; + + ShadowStubEngine() { + super(Paths.get("/tmp/noop-log-shadow-rpc"), + Paths.get("/tmp/noop-data-shadow-rpc"), + new IndexingServerConfiguration()); + } + + @Override + public boolean isConfiguredAsShadow() { + return shadow; + } + + @Override + public boolean isShadowReady() { + // Ready=true so the IndexingServiceImpl @Override interceptor for + // shadow-not-ready does NOT short-circuit our shadow gate ahead of + // time. Ready vs not-ready is orthogonal to the DeleteSegment gate + // — we want to prove the DeleteSegment gate fires even when the + // shadow is otherwise healthy. + return true; + } + + @Override + public int getShadowOfOrMinusOne() { + return shadowOf; + } + + @Override + public String getInstanceIdLabel() { + return "shadow-stub"; + } + + @Override + public String getTableSpaceUUID() { + return "ts-uuid"; + } + + @Override + public DeleteSegmentResult deleteSegment(String table, String indexName, + String segmentStorageKey, + boolean purgeStorage, boolean force) { + // If this is ever reached the service-level shadow gate was + // deleted or bypassed. The test asserts that this does NOT + // happen — the gRPC stub call must terminate inside + // IndexingServiceImpl.deleteSegment with FAILED_PRECONDITION, + // before this method is ever invoked. + throw new AssertionError("engine.deleteSegment must NOT be reached " + + "when the service is configured as a shadow — the production" + + " gate at IndexingServiceImpl.deleteSegment must short-circuit" + + " the RPC with not_primary:"); + } + } + + private ShadowStubEngine engine; + private Server server; + private ManagedChannel channel; + private IndexingServiceGrpc.IndexingServiceBlockingStub stub; + + @Before + public void setUp() throws Exception { + engine = new ShadowStubEngine(); + IndexingServiceImpl svc = new IndexingServiceImpl(engine, NullStatsLogger.INSTANCE); + server = ServerBuilder.forPort(0).addService(svc).build().start(); + channel = ManagedChannelBuilder.forAddress("localhost", server.getPort()) + .usePlaintext().build(); + stub = IndexingServiceGrpc.newBlockingStub(channel); + } + + @After + public void tearDown() throws Exception { + if (channel != null) { + channel.shutdownNow(); + } + if (server != null) { + server.shutdownNow().awaitTermination(); + } + } + + /** + * pr-reviewer follow-up #4: the real-server gate must reject the RPC + * with the documented FAILED_PRECONDITION prefix WITHOUT ever entering + * the engine. ShadowStubEngine.deleteSegment throws AssertionError if + * reached — that fails the test, proving the gate is real. + */ + @Test + public void deleteSegmentRpcOnShadowReturnsNotPrimaryFailedPrecondition() { + engine.shadow = true; + engine.shadowOf = 0; + + DeleteSegmentRequest request = DeleteSegmentRequest.newBuilder() + .setTablespace("ts") + .setTable("vectable") + .setIndex("vidx") + .setSegment("vidx_aaa_seg1") + .setPurgeStorage(false) + .setForce(false) + .build(); + try { + stub.deleteSegment(request); + fail("DeleteSegment RPC on shadow must throw StatusRuntimeException " + + "with FAILED_PRECONDITION"); + } catch (StatusRuntimeException e) { + assertEquals("status code must be FAILED_PRECONDITION", + Status.Code.FAILED_PRECONDITION, e.getStatus().getCode()); + String desc = e.getStatus().getDescription(); + assertNotNull("status must carry a description", desc); + String expectedPrefix = "not_primary: this indexing-service instance is a" + + " shadow replica (shadowOf="; + assertTrue("status description must start with the documented " + + "not_primary prefix, got: " + desc, + desc.startsWith(expectedPrefix)); + assertTrue("status description must instruct the operator to target" + + " the primary, got: " + desc, + desc.contains("target the primary instead")); + // Sanity: shadowOf=0 from our stub must appear in the message. + assertTrue("status description must include the shadowOf integer, got: " + desc, + desc.contains("shadowOf=0")); + } + } + + /** + * Defensive: when shadow=true but shadowOf=-1 (configured-as-shadow + * without a numeric pointer), the gate still fires with the documented + * prefix. shadowOf="-1" still flows through the message verbatim. + */ + @Test + public void deleteSegmentRpcRejectsEvenWithoutKnownShadowOf() { + engine.shadow = true; + engine.shadowOf = -1; + + DeleteSegmentRequest request = DeleteSegmentRequest.newBuilder() + .setTable("vectable") + .setIndex("vidx") + .setSegment("vidx_aaa_seg2") + .build(); + try { + stub.deleteSegment(request); + fail("DeleteSegment RPC on shadow with shadowOf=-1 must still be" + + " rejected with FAILED_PRECONDITION"); + } catch (StatusRuntimeException e) { + assertEquals(Status.Code.FAILED_PRECONDITION, e.getStatus().getCode()); + String desc = e.getStatus().getDescription(); + assertNotNull(desc); + assertTrue("description must start with not_primary prefix, got: " + desc, + desc.startsWith("not_primary: this indexing-service instance is a" + + " shadow replica (shadowOf=")); + assertTrue("description must instruct to target the primary, got: " + desc, + desc.contains("target the primary instead")); + } + } +} diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java b/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java index f6735b65d..85fd00883 100644 --- a/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java +++ b/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java @@ -380,6 +380,96 @@ public void shadowConfiguredServerRejectsWithNotPrimaryPrefix() throws Exception err.contains("shadowOf=")); } + /** + * pr-reviewer follow-up #3 (text output): when the server returns + * {@code vectors_lost=-1} (the engine's race-sentinel for "the segment + * disappeared between the snapshot and the drop call — count is + * unknown"), the CLI must NOT print the raw {@code -1} number. Operators + * read the field as a count, so {@code -1} would be confusing. + * Instead emit {@code vectors_lost=unknown (race)}. + */ + @Test + public void textOutputRendersNegativeVectorsLostAsUnknownRace() throws Exception { + fakeServer = new FakeDeleteSegmentServer( + req -> DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(false) + .setVectorsLost(-1L) + .setGraphFilePresent(false) + .setStoragePurged(false) + .build()); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "vectable", + "--index", "vidx", + "--segment", "vidx_race_seg", + "--yes" + }); + // removed=false → rc=1 (existing contract from removedFalseExitsWithCode1). + assertEquals("removed=false must still map to rc=1", 1, rc); + + String out = stdout(); + assertTrue("text output must render -1 as 'unknown (race)', got:\n" + out, + out.contains("vectors_lost=unknown (race)")); + // And it must NOT include the raw -1, which would be visually + // misleading next to the other numeric fields. + assertFalse("text output must not include the raw -1 sentinel, got:\n" + out, + out.contains("vectors_lost=-1")); + // The other fields are still rendered. + assertTrue(out.contains("segment=vidx_race_seg")); + assertTrue(out.contains("removed=false")); + } + + /** + * pr-reviewer follow-up #3 (JSON output): same input as above (server + * returns {@code vectors_lost=-1}). The JSON form keeps the + * {@code vectors_lost} key in the object (so the schema stays stable) + * but emits the string {@code "unknown"} instead of the raw number. + * Wrapper scripts that consume the JSON can distinguish a real count + * from "we don't know" by type-checking the field. + */ + @Test + public void jsonOutputRendersNegativeVectorsLostAsUnknownString() throws Exception { + fakeServer = new FakeDeleteSegmentServer( + req -> DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(false) + .setVectorsLost(-1L) + .setGraphFilePresent(false) + .setStoragePurged(false) + .build()); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "vectable", + "--index", "vidx", + "--segment", "vidx_race_seg_json", + "--yes", + "--json" + }); + assertEquals("removed=false must still map to rc=1", 1, rc); + + String out = stdout().trim(); + assertTrue("expected JSON object, got:\n" + out, + out.startsWith("{") && out.endsWith("}")); + // The vectors_lost key MUST still be present (stable schema), but as + // a JSON string, not a number. So the substring is the + // quoted-key-and-value form. + assertTrue("JSON must carry vectors_lost as the string \"unknown\", got:\n" + out, + out.contains("\"vectors_lost\":\"unknown\"")); + // And it must NOT carry the raw -1 sentinel. + assertFalse("JSON must not carry the raw -1 vectors_lost value, got:\n" + out, + out.contains("\"vectors_lost\":-1")); + // Other fields still render correctly. + assertTrue(out.contains("\"segment\":\"vidx_race_seg_json\"")); + assertTrue(out.contains("\"removed\":false")); + } + /** * Missing {@code --segment} flag must exit with usage code 2, not * with a stack trace. diff --git a/herddb-remote-file-service/src/main/java/herddb/remote/PromotableRemoteFileDataStorageManager.java b/herddb-remote-file-service/src/main/java/herddb/remote/PromotableRemoteFileDataStorageManager.java index adb260041..d7284db3a 100644 --- a/herddb-remote-file-service/src/main/java/herddb/remote/PromotableRemoteFileDataStorageManager.java +++ b/herddb-remote-file-service/src/main/java/herddb/remote/PromotableRemoteFileDataStorageManager.java @@ -362,4 +362,20 @@ public void deleteMultipartIndexFile(String tableSpace, String uuid, String file throws DataStorageManagerException { activeDelegate.deleteMultipartIndexFile(tableSpace, uuid, fileType); } + + /** + * Issue #617 + pr-reviewer follow-up #5: delegate the multipart presence + * probe to the {@link #activeDelegate}. The default + * {@link DataStorageManager#multipartIndexFileExists} fallback would still + * work after the 1 GiB sentinel fix in the base class, but it routes the + * probe through {@link #multipartIndexReaderSupplier} and a synthetic + * {@code readInt()} — which is slower and noisier than letting the + * delegate (typically a {@link RemoteFileDataStorageManager} or a + * {@link ReadReplicaDataStorageManager}) handle the probe with its own + * backend-specific shortcut. + */ + @Override + public boolean multipartIndexFileExists(String tableSpace, String uuid, String fileType) { + return activeDelegate.multipartIndexFileExists(tableSpace, uuid, fileType); + } } diff --git a/herddb-remote-file-service/src/main/java/herddb/remote/ReadReplicaDataStorageManager.java b/herddb-remote-file-service/src/main/java/herddb/remote/ReadReplicaDataStorageManager.java index 4fdf351e3..59fcc605a 100644 --- a/herddb-remote-file-service/src/main/java/herddb/remote/ReadReplicaDataStorageManager.java +++ b/herddb-remote-file-service/src/main/java/herddb/remote/ReadReplicaDataStorageManager.java @@ -133,6 +133,33 @@ public void deleteMultipartIndexFile(String tableSpace, String uuid, String file throw readOnly("deleteMultipartIndexFile"); } + /** + * Issue #617 + pr-reviewer follow-up #5: same {@code readFileRange} + * shortcut as {@link RemoteFileDataStorageManager#multipartIndexFileExists} + * — the default supplier-based fallback would also work (now that the base + * class uses a 1 GiB sentinel for the synthetic {@code fileSize}), but it + * routes the 4-byte probe through {@link RemoteRandomAccessReader} which + * is heavier than a direct block-range read. A null/empty response means + * block-0 is absent. + */ + @Override + public boolean multipartIndexFileExists(String tableSpace, String uuid, String fileType) { + String logicalPath = remoteMultipartPath(tableSpace, uuid, fileType); + int blockSize = client.getBlockSize(); + try { + byte[] head = client.readFileRange(logicalPath, 0L, 4, blockSize); + return head != null && head.length > 0; + } catch (RuntimeException e) { + // Some backends wrap NoSuchKey-style errors as unchecked + // exceptions; treat any failure during the probe as "missing" + // per the base contract. + LOGGER.log(Level.FINE, + "multipartIndexFileExists: probe failed for {0}: {1}", + new Object[]{logicalPath, e.getMessage()}); + return false; + } + } + // ------------------------------------------------------------------------- // Page deserialization (matches FileDataStorageManager format) // -------------------------------------------------------------------------