diff --git a/herddb-core/src/main/java/herddb/storage/DataStorageManager.java b/herddb-core/src/main/java/herddb/storage/DataStorageManager.java index 8ff6657c6..a03818605 100644 --- a/herddb-core/src/main/java/herddb/storage/DataStorageManager.java +++ b/herddb-core/src/main/java/herddb/storage/DataStorageManager.java @@ -267,6 +267,63 @@ public abstract io.github.jbellis.jvector.disk.ReaderSupplier multipartIndexRead public abstract void deleteMultipartIndexFile(String tableSpace, String uuid, String fileType) throws DataStorageManagerException; + /** + * Issue #617: best-effort presence check for a multipart index file. + * Used by the operator-facing {@code DeleteSegment} RPC to refuse the + * delete when the graph file IS reachable (warning the operator that + * they may be targeting the wrong segment). + * + *

Default implementation opens the reader supplier with a 1-byte + * fictitious file size and attempts a single 1-byte read at offset 0. + * Success → {@code true}; an {@link IOException} or + * {@link DataStorageManagerException} on either step → {@code false}. + * Backends that can answer the question more cheaply (e.g. an S3 + * {@code HEAD} request) should override. + * + *

The contract is intentionally "best-effort": callers MUST NOT + * use this as a strong existence check, only as a safety gate for + * operator-driven deletes. A {@code false} result combined with a + * {@code --force} override is the supported way to delete a segment. + */ + public boolean multipartIndexFileExists(String tableSpace, String uuid, String fileType) { + // We do not know the real file size here; pass a sentinel value + // larger than any plausible block-0 footprint plus 4 bytes for the + // probe. Passing a too-small value (e.g. 1L) breaks the default + // multipartIndexReaderSupplier readers because readFully then sees + // available==0 once position reaches the fake totalSize and spins + // (pr-reviewer follow-up #2). Backends that can answer the question + // more cheaply (e.g. an S3 HEAD request, or a wire-level + // readFileRange) should override this method outright — + // RemoteFileDataStorageManager does exactly that. + io.github.jbellis.jvector.disk.ReaderSupplier rs; + try { + // 1 GiB is far larger than any real block-0 in production; the + // probe still only ever reads 4 bytes, so the inflated fileSize + // only affects the reader's internal end-of-file accounting and + // never causes the backend to fetch more bytes than necessary. + rs = multipartIndexReaderSupplier(tableSpace, uuid, fileType, + 1L << 30); + } catch (DataStorageManagerException e) { + return false; + } + try (io.github.jbellis.jvector.disk.RandomAccessReader r = rs.get()) { + r.seek(0L); + r.readInt(); + return true; + // The reader will throw IOException on missing block-0; for backends + // that surface S3 NoSuchKey as an unchecked exception (the issue #617 + // failure mode), the broader RuntimeException catch below absorbs it. + // Both are treated as "file not present" — see the method contract. + } catch (IOException | RuntimeException e) { + // Broad RuntimeException catch is necessary because some object- + // storage backends (S3) wrap NoSuchKeyException in a SdkException + // subclass that is itself a RuntimeException, not an IOException. + // We intentionally treat any error during the probe as "missing", + // consistent with the best-effort contract documented above. + return false; + } + } + /** * Returns {@code true} when this storage manager supports bypassing the gRPC * file-server round-trips for bulk segment-file downloads during recovery. diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java index 8cc000db7..e3ae46148 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceEngine.java @@ -2261,6 +2261,208 @@ public void dropIndexImmediate(String table, String indexName, String requestedU } } + /** + * Outcome of {@link #deleteSegment(String, String, String, boolean, boolean)}. + * Mirrors the wire fields of {@code DeleteSegmentResponse} so the gRPC + * handler is a thin translation layer. + */ + public static final class DeleteSegmentResult { + public final String segment; + public final boolean removed; + public final long vectorsLost; + public final boolean graphFilePresent; + public final boolean storagePurged; + + public DeleteSegmentResult(String segment, boolean removed, long vectorsLost, + boolean graphFilePresent, boolean storagePurged) { + this.segment = segment; + this.removed = removed; + this.vectorsLost = vectorsLost; + this.graphFilePresent = graphFilePresent; + this.storagePurged = storagePurged; + } + } + + /** + * Thrown by {@link #deleteSegment} when the request must be rejected + * without mutating state: the index or store is not loaded, the + * segment is not registered, or the segment's graph file is still + * present in remote storage and {@code force == false}. + */ + public static final class DeleteSegmentException extends RuntimeException { + public DeleteSegmentException(String message) { + super(message); + } + } + + /** + * Issue #617: operator remediation tool. Removes a single segment from + * a {@link PersistentVectorStore}'s in-memory metadata, with optional + * purging of the segment's multipart files in the underlying + * {@link DataStorageManager}. + * + *

Refuses the deletion when the segment's graph file IS reachable + * in remote storage and {@code force == false} — the most likely + * explanation for a reachable graph file is that the operator is + * targeting the wrong segment. The {@code --force} flag (and an + * extra confirmation in the CLI) lets the operator override. + * + *

On a successful in-memory removal, this method re-publishes the + * current {@link IndexingServiceCheckpointState} so shadow replicas + * observe the new (smaller) segment count on their next reload. The + * re-publish is best-effort — if it fails the next regular checkpoint + * will still carry the updated segment list, so shadows converge. + * + * @throws DeleteSegmentException when the request cannot be satisfied + * without mutating state + */ + public DeleteSegmentResult deleteSegment(String table, String indexName, String segmentStorageKey, + boolean purgeStorage, boolean force) { + if (table == null || table.isEmpty()) { + throw new DeleteSegmentException("table is required"); + } + if (indexName == null || indexName.isEmpty()) { + throw new DeleteSegmentException("index is required"); + } + if (segmentStorageKey == null || segmentStorageKey.isEmpty()) { + throw new DeleteSegmentException("segment is required"); + } + AbstractVectorStore store = vectorStores.get(storeKey(table, indexName)); + if (store == null) { + throw new DeleteSegmentException("index " + table + "." + indexName + " is not loaded"); + } + if (!(store instanceof PersistentVectorStore)) { + // pr-reviewer follow-up #6: a ReadOnlyVectorStore means we are + // running as a shadow replica (or have loaded a snapshot in + // read-only mode); the IS-level gate at IndexingServiceImpl + // .deleteSegment normally short-circuits these RPCs before they + // reach the engine, but we keep belt-and-braces here in case a + // future caller bypasses the gRPC layer. + if (store instanceof ReadOnlyVectorStore) { + throw new DeleteSegmentException( + "index " + table + "." + indexName + + ": this instance is a shadow replica — target the primary" + + " indexing service"); + } + throw new DeleteSegmentException( + "index " + table + "." + indexName + " is non-persistent (" + + store.getClass().getSimpleName() + "); has no on-disk segments"); + } + PersistentVectorStore pvs = (PersistentVectorStore) store; + + // Presence check (informational + safety gate). + java.util.List keys = pvs.getSegmentStorageKeysSnapshot(); + if (!keys.contains(segmentStorageKey)) { + throw new DeleteSegmentException( + "segment " + segmentStorageKey + " is not registered in index " + + table + "." + indexName + "; currently loaded segments: " + keys); + } + + // MinIO HEAD-equivalent — best effort. We deliberately read the + // tablespace UUID from the engine rather than from the request because + // the IS is single-tablespace per instance. + boolean graphPresent = false; + if (dataStorageManager != null) { + graphPresent = dataStorageManager.multipartIndexFileExists( + tableSpaceUUID, segmentStorageKey, "graph"); + } + if (graphPresent && !force) { + throw new DeleteSegmentException( + "refusing to delete segment " + segmentStorageKey + + ": graph file IS reachable in remote storage. " + + "Re-run with force=true if you are sure this is the right segment " + + "(see issue #617)."); + } + + // Audit-level log BEFORE the mutation so a crash mid-delete leaves a + // forensic trace in the IS log. + LOGGER.log(Level.SEVERE, + "deleteSegment: operator-initiated removal of segment {0} from {1}.{2}" + + " (graph_file_present={3}, force={4}, purge_storage={5})" + + " — issue #617", + new Object[]{segmentStorageKey, table, indexName, + graphPresent, force, purgeStorage}); + + // pr-reviewer follow-up #1: test-only hook fired AFTER the engine has + // taken its snapshot+probe but BEFORE the engine's own drop call. A + // test can use this to simulate a concurrent compaction (or another + // operator-driven drop) racing the engine, and then assert that the + // engine reports the resulting "segment disappeared" race path + // correctly (removed=false, vectors_lost=-1). Strictly test-only, + // package-private — production callers never set this. + Runnable preDropHook = preDropRaceHookForTests; + if (preDropHook != null) { + preDropHook.run(); + } + + AbstractVectorStore.SegmentDropResult drop = pvs.dropSegmentByStorageKey(segmentStorageKey); + if (!drop.removed) { + // Race: a concurrent compaction swap removed the segment between + // our snapshot and the drop. Treat as a no-op — the operator's + // intent (segment gone) has been satisfied, but we cannot compute + // the vectors_lost count because the segment handle is no longer + // accessible. Surface -1L per the proto contract so operators can + // distinguish "removed 0 vectors" from "did not remove anything + // and cannot tell what would have been lost" + // (pr-reviewer follow-up #5). + LOGGER.log(Level.WARNING, + "deleteSegment: segment {0} disappeared between snapshot and drop" + + " (concurrent compaction swap?); reporting no-op with" + + " vectors_lost=-1 (race path)", + segmentStorageKey); + return new DeleteSegmentResult(segmentStorageKey, false, -1L, graphPresent, false); + } + + boolean storagePurged = false; + if (purgeStorage && dataStorageManager != null) { + // Best-effort: failures are logged but do not undo the in-memory + // removal. The operator can re-run with purge_storage=true to + // retry the file deletion if needed (the underlying call is + // idempotent). + try { + dataStorageManager.deleteMultipartIndexFile(tableSpaceUUID, segmentStorageKey, "graph"); + dataStorageManager.deleteMultipartIndexFile(tableSpaceUUID, segmentStorageKey, "map"); + storagePurged = true; + } catch (herddb.storage.DataStorageManagerException e) { + LOGGER.log(Level.WARNING, + "deleteSegment: in-memory removal of " + segmentStorageKey + + " succeeded but multipart file purge failed; " + + "operator may need to clean up storage manually", + e); + } + } + + // Trigger a checkpoint so the new (smaller) segment list is + // serialised to the on-disk IndexStatus AND the corresponding + // IndexingServiceCheckpointState is republished to ZK. Without + // the checkpoint a shadow reload would still see the deleted + // segment in the IndexStatus and fail with "multipart file not + // found" when it tries to mmap the purged map file. The + // dropSegmentByStorageKey path marks the store dirty so the + // checkpoint will actually serialise. + // + // forceCheckpointAndSaveWatermark also calls + // publishCheckpointStateBestEffort internally, so shadows are + // notified as part of the same write. + try { + forceCheckpointAndSaveWatermark(); + } catch (RuntimeException e) { + // Checkpoint failure must not undo the in-memory removal — + // a subsequent regular checkpoint (or another delete-segment + // call) will eventually serialise the reduced segment list. + // Logged at WARNING so operators can correlate with the + // SEVERE audit line emitted above. + LOGGER.log(Level.WARNING, + "deleteSegment: post-delete checkpoint failed; shadows may" + + " not observe the new segment count until the" + + " next regular checkpoint", + e); + } + + return new DeleteSegmentResult( + segmentStorageKey, true, drop.vectorsLost, graphPresent, storagePurged); + } + private static boolean isDmlType(short type) { return type == LogEntryType.INSERT || type == LogEntryType.UPDATE @@ -3422,6 +3624,27 @@ void setWarmupPauseHookForTest(Runnable hook) { this.warmupPauseHookForTest = hook; } + /** + * pr-reviewer follow-up #1 (issue #617): test-only hook fired inside + * {@link #deleteSegment} AFTER the engine has snapshotted the segment + * list and probed the multipart graph, but BEFORE the engine calls + * {@link PersistentVectorStore#dropSegmentByStorageKey} itself. Tests + * use this to drop the same segment from a parallel actor and then + * assert that {@code deleteSegment} reports the "segment disappeared + * between snapshot and drop" race path (removed=false, vectors_lost=-1). + * Strictly test-only — production never sets this. + */ + private volatile Runnable preDropRaceHookForTests = null; + + /** + * Installs (or clears) the pre-drop race hook above. Package-private: + * production code never calls this. + */ + // package-private for testing + void setPreDropRaceHookForTests(Runnable hook) { + this.preDropRaceHookForTests = hook; + } + /** * Body of the async warmup task: iterates the snapshot of persistent * stores and calls {@link PersistentVectorStore#warmUpBlockCache} on each. @@ -3790,6 +4013,33 @@ public long getShadowReloadCount() { return shadowReloadCount.get(); } + /** + * Test-only accessor: returns the segment count of the loaded vector + * store for {@code table.indexName} regardless of whether it is a + * {@link PersistentVectorStore} (primary) or a + * {@link herddb.indexing.vector.ReadOnlyVectorStore} (shadow). + * Returns {@code -1} when the store is not loaded. + * + *

Added in pr-reviewer follow-up #4 for issue #617 so the + * {@code ShadowDeleteSegmentE2ETest.lateBootShadowObservesPostDeleteState} + * case can assert that a shadow booted AFTER a primary-side delete + * loads the smaller (post-delete) segment count, without having to + * unwrap the vector store map directly. + */ + public int getSegmentCountForTest(String table, String indexName) { + AbstractVectorStore store = vectorStores.get(storeKey(table, indexName)); + if (store == null) { + return -1; + } + if (store instanceof PersistentVectorStore) { + return ((PersistentVectorStore) store).getSegmentCount(); + } + if (store instanceof ReadOnlyVectorStore) { + return ((ReadOnlyVectorStore) store).getSegmentCount(); + } + return -1; + } + /** * Minimum {@code IndexStatus.generation} currently loaded across * every vector store this engine holds. Used by the retention diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java index 4fc269e0a..4dba6edb3 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/IndexingServiceImpl.java @@ -23,6 +23,8 @@ import com.google.protobuf.ByteString; import herddb.indexing.proto.DescribeIndexRequest; import herddb.indexing.proto.DescribeIndexResponse; +import herddb.indexing.proto.DeleteSegmentRequest; +import herddb.indexing.proto.DeleteSegmentResponse; import herddb.indexing.proto.DropIndexRequest; import herddb.indexing.proto.DropIndexResponse; import herddb.indexing.proto.GetEngineStatsRequest; @@ -679,6 +681,76 @@ public void dropIndex(DropIndexRequest request, } } + /** + * Issue #617: operator remediation tool. Removes a single corrupted + * segment from the IS in-memory metadata, with optional purge of its + * multipart files in the underlying {@link herddb.storage.DataStorageManager}. + * + *

Maps engine-level outcomes to gRPC status codes: + *

+ */ + @Override + public void deleteSegment(DeleteSegmentRequest request, + StreamObserver responseObserver) { + String table = request.getTable(); + String indexName = request.getIndex(); + String segment = request.getSegment(); + LOGGER.log(Level.WARNING, + "DeleteSegment RPC: index={0} table={1} tablespace={2} segment={3}" + + " purge_storage={4} force={5} (issue #617)", + new Object[]{indexName, table, request.getTablespace(), segment, + request.getPurgeStorage(), request.getForce()}); + // pr-reviewer follow-up #1: refuse the RPC up-front when this instance + // is configured as a shadow replica. Shadow instances mirror their + // primary's segment list via reloadFromIndexStatus(); accepting a + // delete-segment write on a shadow would silently diverge the two + // copies and the next reload would just put the segment back. The + // operator must target the primary instead. + if (engine.isConfiguredAsShadow()) { + String desc = "not_primary: this indexing-service instance is a shadow replica " + + "(shadowOf=" + engine.getShadowOfOrMinusOne() + + "); target the primary instead"; + LOGGER.log(Level.WARNING, + "DeleteSegment RPC rejected on shadow instance: {0}", desc); + responseObserver.onError(Status.FAILED_PRECONDITION + .withDescription(desc) + .asRuntimeException()); + return; + } + try { + IndexingServiceEngine.DeleteSegmentResult result = engine.deleteSegment( + table, indexName, segment, + request.getPurgeStorage(), request.getForce()); + responseObserver.onNext(DeleteSegmentResponse.newBuilder() + .setSegment(result.segment) + .setRemoved(result.removed) + .setVectorsLost(result.vectorsLost) + .setGraphFilePresent(result.graphFilePresent) + .setStoragePurged(result.storagePurged) + .build()); + responseObserver.onCompleted(); + } catch (IndexingServiceEngine.DeleteSegmentException e) { + LOGGER.log(Level.WARNING, + "DeleteSegment RPC rejected for segment " + segment + ": " + e.getMessage()); + responseObserver.onError(Status.FAILED_PRECONDITION + .withDescription(e.getMessage()) + .asRuntimeException()); + } catch (RuntimeException e) { + LOGGER.log(Level.SEVERE, "DeleteSegment RPC failed for segment " + segment, e); + responseObserver.onError(Status.INTERNAL + .withDescription(e.getMessage()) + .withCause(e) + .asRuntimeException()); + } + } + /** * Push-based indexing (TESTING ONLY). Deserializes each pushed * {@code LogEntry} and enqueues it into the {@link PushCommitLogTailer}'s diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java b/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java index 3faa8b515..4b0eb3492 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminCli.java @@ -21,6 +21,7 @@ package herddb.indexing.admin; import com.google.protobuf.ByteString; +import herddb.indexing.proto.DeleteSegmentResponse; import herddb.indexing.proto.DescribeIndexResponse; import herddb.indexing.proto.GetEngineStatsResponse; import herddb.indexing.proto.GetIndexStatusResponse; @@ -84,6 +85,7 @@ public final class IndexingAdminCli { static final String COMMAND_LIST_SHADOWS = "list-shadows"; static final String COMMAND_SHADOW_STATUS = "shadow-status"; static final String COMMAND_WAIT_SHADOW = "wait-shadow"; + static final String COMMAND_DELETE_SEGMENT = "delete-segment"; private final PrintStream out; private final PrintStream err; @@ -131,6 +133,8 @@ public int run(String[] args) { return runShadowStatus(rest); case COMMAND_WAIT_SHADOW: return runWaitShadow(rest); + case COMMAND_DELETE_SEGMENT: + return runDeleteSegment(rest); default: err.println("Unknown command: " + command); printUsage(); @@ -162,6 +166,7 @@ private void printUsage() { out.println(" list-shadows Read ZooKeeper and print registered shadow replicas"); out.println(" shadow-status Print shadow-replica catch-up state of one instance"); out.println(" wait-shadow Block until an instance has caught up to a target LSN"); + out.println(" delete-segment Remove a corrupted segment from an index's IS metadata (#617)"); out.println(); out.println("Run 'indexing-admin --help' for command-specific flags."); } @@ -779,6 +784,115 @@ private int runWaitShadow(String[] args) throws Exception { } } + /** + * Issue #617: {@code delete-segment} command. Runs against a single IS + * instance and removes one segment from its in-memory metadata. + * + *

Safety: the CLI prompts on stdin (unless {@code --yes} is given) + * before submitting the RPC, and again when the operator supplies + * {@code --force} (which overrides the server-side refusal to delete a + * segment whose graph file IS reachable in storage). The default exit + * code is {@code 0} only when the server reports {@code removed=true}; + * a presence-only "no-op" response returns {@code 1} so wrapper scripts + * can distinguish "I successfully removed it" from "it was already gone". + */ + private int runDeleteSegment(String[] args) throws Exception { + Options opts = new Options(); + addCommonOptions(opts); + addServerOption(opts); + addIndexOptions(opts); + opts.addOption(Option.builder().longOpt("segment").hasArg().argName("NAME") + .desc("segment storage key to remove (required, e.g. vidx__seg627)") + .required().build()); + opts.addOption(Option.builder().longOpt("purge-storage") + .desc("also delete the segment's multipart graph + map files from storage").build()); + opts.addOption(Option.builder().longOpt("force") + .desc("delete even when the graph file IS reachable in remote storage " + + "(extra confirmation required unless --yes is set)").build()); + opts.addOption(Option.builder().longOpt("yes") + .desc("skip interactive confirmation prompts (use in scripts)").build()); + + CommandLine cli = parse(opts, args, COMMAND_DELETE_SEGMENT); + if (cli == null) { + return 0; + } + String segment = cli.getOptionValue("segment"); + boolean purge = cli.hasOption("purge-storage"); + boolean force = cli.hasOption("force"); + boolean yes = cli.hasOption("yes"); + + if (!yes) { + if (force) { + err.println("WARNING: --force will delete segment " + segment + + " even if its graph file is reachable in storage."); + err.println("This usually means you are targeting the wrong segment."); + } + err.println("About to delete segment '" + segment + "' from " + + cli.getOptionValue("table") + "." + cli.getOptionValue("index") + + " on " + cli.getOptionValue("server") + + (purge ? " (and purge multipart files from storage)" : "") + + ". Continue? [y/N] "); + err.flush(); + // Read from System.in directly rather than capture it via a CLI + // option: the CLI's stdin is exactly what the operator typed. + int ch = -1; + try { + ch = System.in.read(); + } catch (java.io.IOException e) { + err.println("aborting: stdin not available (" + e + "); re-run with --yes"); + return 1; + } + if (ch != 'y' && ch != 'Y') { + err.println("aborted by user"); + return 1; + } + } + + try (IndexingAdminClient client = buildClient(cli)) { + DeleteSegmentResponse resp = client.deleteSegment( + cli.getOptionValue("tablespace"), + cli.getOptionValue("table"), + cli.getOptionValue("index"), + segment, purge, force); + // pr-reviewer follow-up #3: the engine emits vectors_lost=-1 on the + // race path (segment dropped by a concurrent compaction between the + // engine's snapshot and the engine's drop call — the count is + // genuinely unknown, NOT zero). Render the sentinel as the explicit + // "unknown" string in both text and JSON so wrapper scripts don't + // mistake it for a real zero. We keep the field present in JSON + // (rather than omitting it) so the schema is stable; the field + // type widens from number to "number | \"unknown\"". + boolean vectorsLostUnknown = resp.getVectorsLost() < 0L; + if (cli.hasOption("json")) { + Map m = new LinkedHashMap<>(); + m.put("segment", resp.getSegment()); + m.put("removed", resp.getRemoved()); + if (vectorsLostUnknown) { + m.put("vectors_lost", "unknown"); + } else { + m.put("vectors_lost", resp.getVectorsLost()); + } + m.put("graph_file_present", resp.getGraphFilePresent()); + m.put("storage_purged", resp.getStoragePurged()); + out.println(JsonWriter.toJson(m)); + } else { + if (vectorsLostUnknown) { + out.printf(Locale.ROOT, + "segment=%s removed=%s vectors_lost=unknown (race) " + + "graph_file_present=%s storage_purged=%s%n", + resp.getSegment(), resp.getRemoved(), + resp.getGraphFilePresent(), resp.getStoragePurged()); + } else { + out.printf(Locale.ROOT, + "segment=%s removed=%s vectors_lost=%d graph_file_present=%s storage_purged=%s%n", + resp.getSegment(), resp.getRemoved(), resp.getVectorsLost(), + resp.getGraphFilePresent(), resp.getStoragePurged()); + } + } + return resp.getRemoved() ? 0 : 1; + } + } + /** * Returns the time-lag in milliseconds for a given LogEntry timestamp, or * {@code -1} when {@code timestampMillis == 0} ("unknown" — no entries diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminClient.java b/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminClient.java index bb4811f2f..81be77d2e 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminClient.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/admin/IndexingAdminClient.java @@ -20,6 +20,8 @@ package herddb.indexing.admin; +import herddb.indexing.proto.DeleteSegmentRequest; +import herddb.indexing.proto.DeleteSegmentResponse; import herddb.indexing.proto.DescribeIndexRequest; import herddb.indexing.proto.DescribeIndexResponse; import herddb.indexing.proto.GetEngineStatsRequest; @@ -115,6 +117,22 @@ public GetShadowStatusResponse getShadowStatus() { return stub().getShadowStatus(GetShadowStatusRequest.newBuilder().build()); } + /** + * Issue #617: operator remediation tool. Removes a single segment from + * the IS in-memory metadata, optionally purging its multipart files. + */ + public DeleteSegmentResponse deleteSegment(String tablespace, String table, String index, + String segment, boolean purgeStorage, boolean force) { + return stub().deleteSegment(DeleteSegmentRequest.newBuilder() + .setTablespace(tablespace == null ? "" : tablespace) + .setTable(table == null ? "" : table) + .setIndex(index == null ? "" : index) + .setSegment(segment == null ? "" : segment) + .setPurgeStorage(purgeStorage) + .setForce(force) + .build()); + } + public WaitForCheckpointResponse waitForCheckpoint(String tablespace, String table, String index, long targetLedgerId, long targetOffset, long waitTimeoutMs) { diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/vector/AbstractVectorStore.java b/herddb-indexing-service/src/main/java/herddb/indexing/vector/AbstractVectorStore.java index bcc74cebd..21d91c715 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/vector/AbstractVectorStore.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/vector/AbstractVectorStore.java @@ -178,6 +178,57 @@ public void dropSegmentByUuid(String segmentUuid) { // No-op for non-persistent stores. } + /** + * Result of a {@link #dropSegmentByStorageKey(String)} call. + * + *

Used by the operator-facing {@code DeleteSegment} RPC (issue #617) + * to report whether the segment was found and, when it was, the number + * of live vectors that were lost as part of the removal — both as a + * sanity check ("did I just delete a populated segment?") and as the + * value the IS surfaces back to the {@code indexing-admin delete-segment} + * CLI for the operator's audit log. + */ + public static final class SegmentDropResult { + public final boolean removed; + public final long vectorsLost; + + public SegmentDropResult(boolean removed, long vectorsLost) { + this.removed = removed; + this.vectorsLost = vectorsLost; + } + + public static final SegmentDropResult NOT_FOUND = new SegmentDropResult(false, 0L); + } + + /** + * Removes a segment from the active list by its multipart storage key + * — i.e. the value returned by {@code PersistentVectorStore.segmentStorageKey} + * (legacy {@code indexUUID + "_seg" + segmentId} or, for adopted segments, + * the explicit {@code externalStorageKey}). Used by the operator-facing + * {@code DeleteSegment} RPC (issue #617) to remove a corrupted segment + * whose Phase B upload failed mid-flight, leaving it registered in IS + * metadata without a fully-written graph file in remote storage. + * + *

Idempotent: returns {@link SegmentDropResult#NOT_FOUND} when no + * segment with the given storage key is currently loaded. + * + *

Does not delete the segment's multipart files from the + * underlying storage manager — that is the responsibility of the + * caller (the {@code DeleteSegment} RPC handler), which only purges + * remote files when explicitly requested via {@code purge_storage=true}. + * + *

The default implementation is a no-op. + * {@link herddb.indexing.vector.PersistentVectorStore} overrides this. + * + * @param storageKey the segment's multipart storage key to remove + * @return a {@link SegmentDropResult} describing whether the segment + * was removed and how many live vectors were lost + */ + public SegmentDropResult dropSegmentByStorageKey(String storageKey) { + // No-op for non-persistent stores. + return SegmentDropResult.NOT_FOUND; + } + /** * Reconciles adopted (externally-produced) segments against the ZK-reported * snapshot. Any segment with a non-null external storage key whose UUID is diff --git a/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java b/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java index 6e4eecc0a..cf8130a17 100644 --- a/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java +++ b/herddb-indexing-service/src/main/java/herddb/indexing/vector/PersistentVectorStore.java @@ -3691,6 +3691,17 @@ public String getOnDiskSegmentExternalStorageKeyForTest(int idx) { return segments.get(idx).externalStorageKey; } + /** + * Test-only accessor for the deferred-close queue used by + * {@link #dropSegmentByUuid} and {@link #dropSegmentByStorageKey}. + * Returns the current number of queued (not-yet-closed) segments. + * Pr-reviewer follow-up #3 for issue #617 uses this to assert the + * queue eventually drains after the operator-driven drop. + */ + public int getPendingSegmentClosesSizeForTest() { + return pendingSegmentCloses.size(); + } + VectorSegment preloadCompactedSegment(SegmentWriteResult swr) throws IOException, DataStorageManagerException { VectorSegment seg = new VectorSegment(swr.segmentId); seg.segmentUuid = swr.segmentUuid; @@ -3930,6 +3941,64 @@ public void dropSegmentByUuid(String segmentUuid) { } } + /** + * Issue #617: removes a segment from the active list by its + * multipart storage key. Shares the same lock discipline and + * deferred-close machinery as {@link #dropSegmentByUuid}. + * + *

Matching is by {@link #segmentStorageKey(VectorSegment)}, so this + * method works for both IS-locally-produced segments (legacy key + * {@code indexUUID + "_seg" + segmentId}) and adopted ones + * ({@code externalStorageKey}). + * + *

Idempotent: when no segment matches, returns + * {@link SegmentDropResult#NOT_FOUND} without mutating state. + */ + @Override + public SegmentDropResult dropSegmentByStorageKey(String storageKey) { + if (storageKey == null || storageKey.isEmpty()) { + return SegmentDropResult.NOT_FOUND; + } + try { + VectorSegment found = null; + stateLock.writeLock().lock(); + try { + List newList = new java.util.concurrent.CopyOnWriteArrayList<>(); + for (VectorSegment s : segments) { + if (found == null && storageKey.equals(segmentStorageKey(s))) { + found = s; + } else { + newList.add(s); + } + } + if (found == null) { + return SegmentDropResult.NOT_FOUND; + } + segments = newList; + // Mirror dropSegmentByUuid: mark dirty so the next checkpoint + // serialises the reduced segment list. Without this the + // checkpoint gate would skip and leave a stale entry on disk. + dirty.set(true); + long vectorsLost = found.liveCount.get(); + LOGGER.log(Level.SEVERE, + "dropSegmentByStorageKey: operator removed segment {0} from" + + " store {1} (vectors_lost={2}); total on-disk" + + " segments now {3}", + new Object[]{storageKey, indexName, vectorsLost, segments.size()}); + // Same deferred-close protocol as dropSegmentByUuid (issue #535): + // a concurrent compaction cycle may still hold a reference to + // `found.onDiskGraph`; queue the close so it runs after the + // cycle releases its compactionLock. + pendingSegmentCloses.add(found); + return new SegmentDropResult(true, vectorsLost); + } finally { + stateLock.writeLock().unlock(); + } + } finally { + drainPendingSegmentClosesOpportunistically(); + } + } + /** * Reconciles the store's adopted (externally-produced) segment set against * the ZK-reported ownership snapshot. Any segment that was adopted (i.e. @@ -8650,6 +8719,30 @@ public int getOnDiskNodeCount() { return (int) onDiskNodeToPkSize(); } + /** + * Issue #617: snapshot of the multipart storage keys of every on-disk + * segment currently registered in this store. Used by the operator- + * facing {@code DeleteSegment} RPC handler to enumerate candidates + * (via {@code describe-index --json}'s {@code corrupted_segments} list) + * and to verify presence before mutating state. + * + *

The returned list is a defensive copy and is stable across + * concurrent {@link #dropSegmentByStorageKey} / compaction swaps. It + * may legitimately contain duplicate-looking entries on a transient + * adoption race: callers MUST treat the result as a snapshot and + * re-read it if they need a fresh view. + */ + public java.util.List getSegmentStorageKeysSnapshot() { + // CopyOnWriteArrayList iterator is snapshot-safe; no readLock needed + // for the enumeration itself, only for any subsequent mutation that + // wants to act on the snapshot atomically (that is the caller's job). + java.util.List out = new java.util.ArrayList<>(segments.size()); + for (VectorSegment s : segments) { + out.add(segmentStorageKey(s)); + } + return out; + } + /** * Returns the current value of the global monotonic node-id counter. * Exposed for telemetry — dashboards can watch the burn rate and diff --git a/herddb-indexing-service/src/main/proto/indexing_service.proto b/herddb-indexing-service/src/main/proto/indexing_service.proto index 3c6079c33..5c0b784a9 100644 --- a/herddb-indexing-service/src/main/proto/indexing_service.proto +++ b/herddb-indexing-service/src/main/proto/indexing_service.proto @@ -51,6 +51,20 @@ service IndexingService { // The commit-log tailer path remains the authoritative cleanup fallback. rpc DropIndex (DropIndexRequest) returns (DropIndexResponse); + // Issue #617: operator remediation tool. Removes a single corrupted / + // partially-written segment from the IS in-memory metadata so the + // compaction loop stops re-selecting it after a Phase B upload failure + // left the segment registered without a fully-written graph file. + // + // SAFETY: by default the IS refuses to delete a segment whose graph file + // IS reachable in remote storage (operator may be targeting the wrong + // segment). Set force=true to override. When purge_storage=true the IS + // also deletes the segment's multipart graph + map files from the + // underlying DataStorageManager. After a successful deletion the IS + // re-publishes its IndexingServiceCheckpointState so shadow replicas + // observe the new (smaller) segment set on their next reload. + rpc DeleteSegment (DeleteSegmentRequest) returns (DeleteSegmentResponse); + // Push-based indexing (TESTING ONLY). When the indexing service runs with // indexing.log.type=push it does NOT tail a file/BookKeeper commit log; // instead a client serializes herddb.log.LogEntry objects and pushes them @@ -363,6 +377,58 @@ message DropIndexRequest { message DropIndexResponse { } +// Issue #617: operator remediation tool — see rpc DeleteSegment above. +message DeleteSegmentRequest { + // Tablespace name. Currently informational only (the IS resolves the + // store by (table, index)); kept for symmetry with the other admin RPCs + // and so future multi-tablespace IS instances do not need a proto bump. + string tablespace = 1; + string table = 2; + string index = 3; + // The segment's multipart storage key (e.g. "vidx__seg627"). + // This is the value reported by describe-index in the + // corrupted_segments list (issue #616) and the value the IS uses to + // address segment files in remote storage. + string segment = 4; + // Whether to delete the segment's multipart graph + map files from the + // underlying DataStorageManager after removing it from in-memory state. + // When false, the segment is unlinked from the IS but its files remain + // (useful when an operator wants to preserve evidence of corruption for + // post-mortem analysis). + bool purge_storage = 5; + // Whether to delete the segment even when its graph file IS reachable + // in remote storage. Set to true only after careful verification — a + // reachable graph file usually means the operator is targeting the + // wrong segment. + bool force = 6; +} + +message DeleteSegmentResponse { + // The storage key of the segment that was removed (echoes + // DeleteSegmentRequest.segment). + string segment = 1; + // Whether the segment was found in the IS metadata and removed. + bool removed = 2; + // Number of live vectors lost as a result of the removal. -1 when + // the IS could not compute the number — currently emitted only on the + // race path where a concurrent compaction swap removed the segment + // between the operator's snapshot and the engine's drop (the segment + // handle is gone, so its liveCount is no longer accessible). In every + // other no-op case (segment never matched, store not loaded) the engine + // throws DeleteSegmentException → FAILED_PRECONDITION instead of + // returning a response with vectors_lost=-1. + int64 vectors_lost = 3; + // Whether the segment's graph file was reachable in remote storage at + // the moment the IS checked. Informational — operators use this to + // decide whether to retry with --force or whether they were targeting + // the wrong segment. + bool graph_file_present = 4; + // Whether the IS attempted to delete the segment's multipart files from + // remote storage (only true when the request set purge_storage=true and + // the in-memory removal succeeded). + bool storage_purged = 5; +} + // ---- Push-based indexing RPC (testing only — see rpc PushEntries) ---- // One commit-log entry to push, with its client-assigned LSN. diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/IndexingServiceEngineDeleteSegmentEdgeCasesTest.java b/herddb-indexing-service/src/test/java/herddb/indexing/IndexingServiceEngineDeleteSegmentEdgeCasesTest.java new file mode 100644 index 000000000..d76cdb224 --- /dev/null +++ b/herddb-indexing-service/src/test/java/herddb/indexing/IndexingServiceEngineDeleteSegmentEdgeCasesTest.java @@ -0,0 +1,340 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package herddb.indexing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import herddb.codec.RecordSerializer; +import herddb.core.MemoryManager; +import herddb.indexing.vector.AbstractVectorStore; +import herddb.indexing.vector.PersistentVectorStore; +import herddb.indexing.vector.ReadOnlyVectorStore; +import herddb.log.LogEntry; +import herddb.log.LogEntryFactory; +import herddb.log.LogSequenceNumber; +import herddb.mem.MemoryDataStorageManager; +import herddb.model.ColumnTypes; +import herddb.model.Index; +import herddb.model.Record; +import herddb.model.Table; +import herddb.model.TableSpace; +import io.github.jbellis.jvector.vector.VectorSimilarityFunction; +import java.nio.file.Path; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Issue #617 + pr-reviewer follow-ups #1 and #2: engine-level edge cases for + * {@link IndexingServiceEngine#deleteSegment} that the CLI/gRPC tests in + * {@code herddb.indexing.admin.IndexingAdminCliDeleteSegmentTest} and the + * full E2E test in {@link ShadowDeleteSegmentE2ETest} do not cover. + * + *

    + *
  1. Race-path test (follow-up #1): drive the engine into the + * branch where {@link PersistentVectorStore#dropSegmentByStorageKey} + * returns {@code removed=false} after the engine has already + * snapshotted the storage key. We inject a parallel drop via a + * test-only hook that fires inside {@code deleteSegment} between the + * engine's snapshot/probe and the engine's own drop call. Assert that + * the engine reports the documented sentinel: + * {@code vectorsLost == -1L && removed == false}.
  2. + *
  3. Shadow / read-only rejection (follow-up #2): install a + * {@link ReadOnlyVectorStore} directly into the engine's + * {@code vectorStores} map (bypassing the gRPC layer entirely) and + * call {@link IndexingServiceEngine#deleteSegment} on it. The + * production gate at + * {@link IndexingServiceImpl#deleteSegment(herddb.indexing.proto.DeleteSegmentRequest, + * io.grpc.stub.StreamObserver)} short-circuits these RPCs at the + * service boundary, so this engine-level belt-and-braces refusal is + * reachable only by tests that drive the engine directly. The + * exception message must call out both "shadow replica" and "primary" + * so an operator who somehow reaches this branch (e.g. a future + * caller that bypasses the gRPC layer) gets actionable diagnostics. + *
+ */ +public class IndexingServiceEngineDeleteSegmentEdgeCasesTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private int savedMinLive; + + @Before + public void setUp() { + savedMinLive = PersistentVectorStore.minLiveVectorsForCheckpoint; + PersistentVectorStore.minLiveVectorsForCheckpoint = 0; + } + + @After + public void tearDown() { + PersistentVectorStore.minLiveVectorsForCheckpoint = savedMinLive; + } + + private static Table createTable() { + return Table.builder() + .name("vectable") + .tablespace(TableSpace.DEFAULT) + .column("pk", ColumnTypes.STRING) + .column("vec", ColumnTypes.FLOATARRAY) + .primaryKey("pk") + .build(); + } + + private static Index createIndex(String uuid) { + return Index.builder() + .name("vidx").uuid(uuid) + .table("vectable") + .type(Index.TYPE_VECTOR) + .column("vec", ColumnTypes.FLOATARRAY) + .build(); + } + + private static float[] randomVector(Random rng, int dim) { + float[] v = new float[dim]; + for (int i = 0; i < dim; i++) { + v[i] = rng.nextFloat(); + } + return v; + } + + private IndexingServiceEngine newEngine(MemoryDataStorageManager dsm, MemoryManager mm, + final String stableUuid, + final AtomicReference storeRef) throws Exception { + Path logDir = folder.newFolder().toPath(); + Path dataDir = folder.newFolder().toPath(); + Properties props = new Properties(); + props.setProperty(IndexingServerConfiguration.PROPERTY_STORAGE_TYPE, "memory"); + props.setProperty(IndexingServerConfiguration.PROPERTY_TABLESPACE_NAME, TableSpace.DEFAULT); + IndexingServerConfiguration config = new IndexingServerConfiguration(props); + final IndexingServiceEngine engine = new IndexingServiceEngine(logDir, dataDir, config); + + herddb.mem.MemoryMetadataStorageManager memMeta = new herddb.mem.MemoryMetadataStorageManager(); + memMeta.start(); + memMeta.ensureDefaultTableSpace("local", "local", 0, 1); + engine.setMetadataStorageManager(memMeta); + engine.setDataStorageManager(dsm); + engine.setMemoryManager(mm); + engine.setVectorStoreFactory((indexName, tableName, vectorColumnName, dataDirArg, indexProperties) -> { + PersistentVectorStore pvs = new PersistentVectorStore( + indexName, tableName, engine.getTableSpaceUUID(), vectorColumnName, + stableUuid, dataDirArg, dsm, mm, + 16, 100, 1.2f, 1.4f, true, 2_000_000_000L, 0, + Long.MAX_VALUE, + VectorSimilarityFunction.EUCLIDEAN); + try { + pvs.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + if (storeRef != null) { + storeRef.set(pvs); + } + return pvs; + }); + return engine; + } + + /** + * pr-reviewer follow-up #1: race path between the engine's snapshot and + * the engine's drop. The pre-drop hook removes the same segment from a + * parallel actor (here a direct {@code dropSegmentByStorageKey} call on + * the store, simulating a concurrent compaction swap that finished + * between the engine's two reads). The engine's own drop then sees the + * segment gone and must return the documented race-sentinel. + */ + @Test(timeout = 60_000) + public void engineReportsRaceSentinelWhenSegmentDroppedConcurrently() throws Exception { + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + MemoryManager mm = new MemoryManager(64 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024); + final String stableUuid = "idx-uuid-617-race"; + + AtomicReference primaryStore = new AtomicReference<>(); + IndexingServiceEngine engine = newEngine(dsm, mm, stableUuid, primaryStore); + engine.start(); + try { + Table t = createTable(); + Index idx = createIndex(stableUuid); + engine.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null)); + engine.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null)); + + Random rng = new Random(617); + int dim = 12; + LogSequenceNumber last = null; + // Two checkpoints → two segments, so even after the "concurrent" + // drop the snapshot list (taken before the hook) still contained + // the victim — without that, the up-front presence-check inside + // deleteSegment would throw DeleteSegmentException ("not + // registered") and we would never reach the race branch. + for (int batch = 0; batch < 2; batch++) { + for (int i = 0; i < 40; i++) { + Record r = RecordSerializer.makeRecord(t, + "pk", "k" + batch + "_" + i, "vec", randomVector(rng, dim)); + LogEntry ins = LogEntryFactory.insert(t, r.key, r.value, null); + last = new LogSequenceNumber(1, 100 + batch * 100 + i); + engine.applySingleEntryForTest(last, ins); + } + engine.awaitPendingWorkForTest(); + engine.setLastProcessedLsnForTest(last); + engine.forceCheckpointAndSaveWatermark(); + } + + final PersistentVectorStore pvs = primaryStore.get(); + assertNotNull("engine must have created the persistent store", pvs); + List keys = pvs.getSegmentStorageKeysSnapshot(); + assertTrue("test setup must produce ≥2 segments, got " + keys.size(), + keys.size() >= 2); + final String victim = keys.get(0); + + // Pre-drop race hook: fires inside engine.deleteSegment AFTER the + // snapshot+probe but BEFORE the engine's own drop. Removes the + // victim out from under the engine. The hook itself does NOT use + // a separate thread (the deferred-close protocol around + // dropSegmentByStorageKey is reentrant-safe) — the important + // invariant is just that the segment is gone by the time the + // engine's drop runs. + final AtomicReference sideDropRef = + new AtomicReference<>(); + engine.setPreDropRaceHookForTests(() -> { + AbstractVectorStore.SegmentDropResult r = pvs.dropSegmentByStorageKey(victim); + sideDropRef.set(r); + }); + + IndexingServiceEngine.DeleteSegmentResult result = engine.deleteSegment( + "vectable", "vidx", victim, + /* purgeStorage */ false, /* force */ true); + + // Pre-condition: the side-channel drop must have actually + // removed the segment, otherwise the race branch would not be + // exercised. + AbstractVectorStore.SegmentDropResult sideDrop = sideDropRef.get(); + assertNotNull("pre-drop hook must have fired", sideDrop); + assertTrue("pre-drop hook must have removed the segment", + sideDrop.removed); + + // The race-path documented contract: removed=false AND + // vectorsLost == -1L (sentinel for "unknown — cannot compute the + // count because the segment handle is no longer accessible"). + assertFalse("engine must report removed=false on the race path", + result.removed); + assertEquals("engine must surface vectors_lost=-1 on the race path", + -1L, result.vectorsLost); + assertEquals("response segment name must match the request", + victim, result.segment); + assertFalse("storage_purged must be false on the race path " + + "(engine bails out before touching storage)", + result.storagePurged); + + // And the segment is gone from the active list (the side-channel + // drop removed it). + assertFalse("victim must be gone from the active segment list", + pvs.getSegmentStorageKeysSnapshot().contains(victim)); + } finally { + engine.setPreDropRaceHookForTests(null); + engine.close(); + } + } + + /** + * pr-reviewer follow-up #2: engine-only refusal when the configured + * store is a {@link ReadOnlyVectorStore}. The production + * {@code IndexingServiceImpl.deleteSegment} short-circuits these RPCs + * with a {@code FAILED_PRECONDITION} BEFORE reaching the engine — this + * test bypasses that gate by calling {@code engine.deleteSegment(...)} + * directly to prove the engine's belt-and-braces guard is in place. + * + *

The exception message must mention both "shadow replica" and + * "primary" so a future caller that bypasses the gRPC layer still gets + * an actionable diagnostic. + */ + @Test(timeout = 30_000) + public void engineRefusesDeleteOnReadOnlyVectorStoreWithDiagnosticMessage() throws Exception { + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + MemoryManager mm = new MemoryManager(64 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024); + final String stableUuid = "idx-uuid-617-readonly"; + + IndexingServiceEngine engine = newEngine(dsm, mm, stableUuid, null); + engine.start(); + ReadOnlyVectorStore readOnly = null; + try { + Table t = createTable(); + Index idx = createIndex(stableUuid); + // Schema must exist so the engine's schema-tracker accepts the + // synthetic ReadOnlyVectorStore registration. + engine.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null)); + engine.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null)); + + // Build a real ReadOnlyVectorStore (no segments, no data — the + // delete-segment refusal must fire on the type check alone, and + // must not depend on the store having any state). Use a + // dedicated tmp dir so the underlying PersistentVectorStore + // delegate does not collide with the in-flight production + // store. + Path roTmp = folder.newFolder("ro-store").toPath(); + readOnly = new ReadOnlyVectorStore( + "vidx", "vectable", engine.getTableSpaceUUID(), + "vec", stableUuid + "-ro", roTmp, + dsm, mm, + 16, 100, 1.2f, 1.4f, + /* fusedPQ */ true, + /* maxSegmentSize */ 2_000_000_000L, + /* maxLiveGraphSize */ 0, + VectorSimilarityFunction.EUCLIDEAN); + readOnly.start(); + + // Replace the (PersistentVectorStore) the factory just installed + // with the synthetic ReadOnlyVectorStore — this is the precise + // scenario the engine guard is meant to catch. + engine.registerIndexForTest(idx, readOnly); + + try { + engine.deleteSegment("vectable", "vidx", "any-segment-key", + /* purgeStorage */ false, /* force */ true); + fail("engine must refuse deleteSegment on a ReadOnlyVectorStore"); + } catch (IndexingServiceEngine.DeleteSegmentException expected) { + String msg = expected.getMessage(); + assertNotNull(msg); + assertTrue("refusal message must mention 'shadow replica', got: " + msg, + msg.contains("shadow replica")); + assertTrue("refusal message must mention 'primary' (so the operator" + + " knows where to retry), got: " + msg, + msg.contains("primary")); + } + } finally { + if (readOnly != null) { + try { + readOnly.close(); + } catch (Exception ignore) { + // best-effort teardown + } + } + engine.close(); + } + } +} diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentE2ETest.java b/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentE2ETest.java new file mode 100644 index 000000000..667a22c55 --- /dev/null +++ b/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentE2ETest.java @@ -0,0 +1,573 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package herddb.indexing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import herddb.cluster.ZookeeperMetadataStorageManager; +import herddb.codec.RecordSerializer; +import herddb.core.MemoryManager; +import herddb.indexing.vector.PersistentVectorStore; +import herddb.log.LogEntry; +import herddb.log.LogEntryFactory; +import herddb.log.LogSequenceNumber; +import herddb.mem.MemoryDataStorageManager; +import herddb.metadata.IndexingServiceCheckpointState; +import herddb.metadata.IndexingServiceInstanceDescriptor; +import herddb.model.ColumnTypes; +import herddb.model.Index; +import herddb.model.Record; +import herddb.model.Table; +import herddb.model.TableSpace; +import herddb.utils.ZKTestEnv; +import io.github.jbellis.jvector.vector.VectorSimilarityFunction; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Issue #617: end-to-end test for the operator-facing {@code DeleteSegment} + * path on a real {@link PersistentVectorStore} backed by a shared + * {@link MemoryDataStorageManager}, with one primary and one shadow replica + * coordinated through a real {@link ZKTestEnv}. Follows the same harness + * convention as {@link ShadowE2ETest} — both live in the IS module and run + * in the core (non-cluster) Maven test category because {@link ZKTestEnv} + * starts an embedded ZK/Bookie pair in the same JVM. + * + *

Coverage: + *

    + *
  • Refusal when graph file is present — the IS rejects the + * delete because the segment's multipart graph is reachable via the + * shared {@link MemoryDataStorageManager} (the same property the + * production refusal gate keys off in the issue #617 scenario, + * where {@code force=false} prevents accidental deletes).
  • + *
  • Force override removes the segment — with {@code force=true} + * the primary's in-memory segment count drops, the IS re-publishes + * a fresh {@link IndexingServiceCheckpointState}, the shadow's + * reload counter advances, and the shadow's loaded segment count + * on the dropped-segment store matches the primary's (zero).
  • + *
  • purge_storage=true also deletes the multipart files — after + * the force-delete the IS reports {@code storage_purged=true} and a + * subsequent existence probe against the storage manager returns + * false for both the {@code graph} and {@code map} keys.
  • + *
+ * + *

This test deliberately calls + * {@link IndexingServiceEngine#deleteSegment(String, String, String, boolean, boolean)} + * directly rather than going through the gRPC layer — the gRPC wire path + * is covered by {@code herddb.indexing.admin.IndexingAdminCliDeleteSegmentTest} + * (a focused fake-gRPC test). Coupling both layers in a single test would + * add cluster-mode wiring (an in-process gRPC server) without exercising + * a code path the CLI test does not already cover. + */ +public class ShadowDeleteSegmentE2ETest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private ZKTestEnv zk; + private ZookeeperMetadataStorageManager metadata; + private int savedMinLive; + private long savedDeferral; + private final List shadowMetadatas = new ArrayList<>(); + + @Before + public void setUp() throws Exception { + zk = new ZKTestEnv(folder.newFolder("zk").toPath()); + zk.startBookieAndInitCluster(); + metadata = new ZookeeperMetadataStorageManager( + zk.getAddress(), zk.getTimeout(), zk.getPath()); + metadata.start(); + metadata.ensureDefaultTableSpace("local", "local", 0, 1); + + savedMinLive = PersistentVectorStore.minLiveVectorsForCheckpoint; + savedDeferral = PersistentVectorStore.maxCheckpointDeferralMs; + // Same gates lifted as in ShadowE2ETest so a tiny workload still + // produces a real persisted segment. + PersistentVectorStore.minLiveVectorsForCheckpoint = 0; + } + + @After + public void tearDown() throws Exception { + PersistentVectorStore.minLiveVectorsForCheckpoint = savedMinLive; + PersistentVectorStore.maxCheckpointDeferralMs = savedDeferral; + for (ZookeeperMetadataStorageManager sm : shadowMetadatas) { + try { + sm.close(); + } catch (Exception ignore) { + // teardown best-effort — match ShadowE2ETest pattern + } + } + if (metadata != null) { + try { + metadata.close(); + } catch (Exception ignore) { + // teardown best-effort + } + } + if (zk != null) { + zk.close(); + } + } + + private static Table createTable() { + return Table.builder() + .name("vectable") + .tablespace(TableSpace.DEFAULT) + .column("pk", ColumnTypes.STRING) + .column("vec", ColumnTypes.FLOATARRAY) + .primaryKey("pk") + .build(); + } + + private static Index createIndex(String uuid) { + return Index.builder() + .name("vidx").uuid(uuid) + .table("vectable") + .type(Index.TYPE_VECTOR) + .column("vec", ColumnTypes.FLOATARRAY) + .build(); + } + + private static float[] randomVector(Random rng, int dim) { + float[] v = new float[dim]; + for (int i = 0; i < dim; i++) { + v[i] = rng.nextFloat(); + } + return v; + } + + private IndexingServiceEngine newPrimary(MemoryDataStorageManager dsm, MemoryManager mm, + String stableUuid, + AtomicReference storeRef) throws Exception { + Path logDir = folder.newFolder().toPath(); + Path dataDir = folder.newFolder().toPath(); + Properties props = new Properties(); + props.setProperty(IndexingServerConfiguration.PROPERTY_STORAGE_TYPE, "memory"); + props.setProperty(IndexingServerConfiguration.PROPERTY_TABLESPACE_NAME, TableSpace.DEFAULT); + IndexingServerConfiguration config = new IndexingServerConfiguration(props); + IndexingServiceEngine engine = new IndexingServiceEngine(logDir, dataDir, config); + engine.setMetadataStorageManager(metadata); + engine.setDataStorageManager(dsm); + engine.setMemoryManager(mm); + engine.setVectorStoreFactory((indexName, tableName, vectorColumnName, dataDirArg, indexProperties) -> { + PersistentVectorStore pvs = new PersistentVectorStore( + indexName, tableName, engine.getTableSpaceUUID(), vectorColumnName, + stableUuid, dataDirArg, dsm, mm, + 16, 100, 1.2f, 1.4f, true, 2_000_000_000L, 0, + Long.MAX_VALUE, + VectorSimilarityFunction.EUCLIDEAN); + try { + pvs.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + storeRef.set(pvs); + return pvs; + }); + return engine; + } + + private IndexingServiceEngine newShadow(MemoryDataStorageManager dsm, MemoryManager mm, + int shadowOf) throws Exception { + Path logDir = folder.newFolder().toPath(); + Path dataDir = folder.newFolder().toPath(); + Properties props = new Properties(); + props.setProperty(IndexingServerConfiguration.PROPERTY_STORAGE_TYPE, "memory"); + props.setProperty(IndexingServerConfiguration.PROPERTY_TABLESPACE_NAME, TableSpace.DEFAULT); + props.setProperty(IndexingServerConfiguration.PROPERTY_ROLE, + IndexingServerConfiguration.ROLE_SHADOW); + props.setProperty(IndexingServerConfiguration.PROPERTY_SHADOW_OF, Integer.toString(shadowOf)); + props.setProperty(IndexingServerConfiguration.PROPERTY_NUM_INSTANCES, "1"); + IndexingServerConfiguration config = new IndexingServerConfiguration(props); + IndexingServiceEngine engine = new IndexingServiceEngine(logDir, dataDir, config); + ZookeeperMetadataStorageManager perShadow = new ZookeeperMetadataStorageManager( + zk.getAddress(), zk.getTimeout(), zk.getPath()); + perShadow.start(); + shadowMetadatas.add(perShadow); + engine.setMetadataStorageManager(perShadow); + engine.setDataStorageManager(dsm); + engine.setMemoryManager(mm); + return engine; + } + + private void seedDsmSchema(MemoryDataStorageManager dsm, String tsUuid, Table table, Index idx) + throws Exception { + dsm.writeTables(tsUuid, LogSequenceNumber.START_OF_TIME, + Arrays.asList(table), Arrays.asList(idx), false); + } + + /** + * Full lifecycle test: + * 1. Primary ingests 128 vectors, checkpoints, and produces ≥1 segment. + * 2. A shadow starts and catches up (reloadCount == 1). + * 3. {@code deleteSegment(force=false)} is refused — graph file present. + * 4. {@code deleteSegment(force=true, purge_storage=true)} succeeds: + * - primary's {@code segmentCount} drops by one; + * - storage_purged == true; + * - graph + map files are no longer reachable in the DSM. + * 5. The shadow observes the new checkpoint state (reloadCount advances). + */ + @Test + public void deleteSegmentReducesPrimaryCountAndNotifiesShadow() throws Exception { + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + MemoryManager mm = new MemoryManager(128 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024); + final String stableUuid = "idx-uuid-617"; + + AtomicReference primaryStore = new AtomicReference<>(); + IndexingServiceEngine primary = newPrimary(dsm, mm, stableUuid, primaryStore); + primary.start(); + + Table t = createTable(); + Index idx = createIndex(stableUuid); + primary.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null)); + primary.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null)); + + Random rng = new Random(617); + int dim = 16; + LogSequenceNumber last = null; + for (int i = 0; i < 128; i++) { + Record r = RecordSerializer.makeRecord(t, + "pk", "k" + i, "vec", randomVector(rng, dim)); + LogEntry ins = LogEntryFactory.insert(t, r.key, r.value, null); + last = new LogSequenceNumber(1, 100 + i); + primary.applySingleEntryForTest(last, ins); + } + primary.awaitPendingWorkForTest(); + primary.setLastProcessedLsnForTest(last); + primary.forceCheckpointAndSaveWatermark(); + + seedDsmSchema(dsm, primary.getTableSpaceUUID(), t, idx); + + metadata.registerIndexingServiceInstance( + IndexingServiceInstanceDescriptor.primary( + "p0", "dummy-addr:0", 0)); + IndexingServiceCheckpointState beforeDelete = + metadata.getIndexingServiceCheckpointState(0); + assertNotNull("primary must have published checkpoint state", beforeDelete); + + // A real segment was created by the checkpoint. + PersistentVectorStore pvs = primaryStore.get(); + assertNotNull("primary store must be initialised", pvs); + int segmentCountBefore = pvs.getSegmentCount(); + assertTrue("primary checkpoint must produce ≥1 segment, got " + segmentCountBefore, + segmentCountBefore >= 1); + List keysBefore = pvs.getSegmentStorageKeysSnapshot(); + assertEquals(segmentCountBefore, keysBefore.size()); + String targetSegment = keysBefore.get(0); + + // Step 2: boot a shadow and let it catch up to reloadCount == 1. + IndexingServiceEngine shadow = newShadow(dsm, mm, 0); + shadow.start(); + try { + assertTrue("shadow must become ready", shadow.isShadowReady()); + assertEquals("shadow must have reloaded exactly once", + 1, shadow.getShadowReloadCount()); + + // Step 3: refuse the delete when force=false. The MemoryDataStorageManager + // honours multipartIndexFileExists via the default implementation, so the + // graph file is reachable → engine refuses. + try { + primary.deleteSegment("vectable", "vidx", targetSegment, + /* purgeStorage */ false, /* force */ false); + org.junit.Assert.fail("delete must be refused while graph file is present"); + } catch (IndexingServiceEngine.DeleteSegmentException expected) { + assertTrue("refusal message must mention force flag, got: " + expected.getMessage(), + expected.getMessage().toLowerCase().contains("force")); + } + // Confirm the refusal did not mutate state. + assertEquals("refused delete must NOT alter segment count", + segmentCountBefore, pvs.getSegmentCount()); + + long reloadCountBeforeForce = shadow.getShadowReloadCount(); + IndexingServiceCheckpointState stateBeforeForce = + metadata.getIndexingServiceCheckpointState(0); + assertNotNull(stateBeforeForce); + + // Step 4: force + purge. + IndexingServiceEngine.DeleteSegmentResult result = + primary.deleteSegment("vectable", "vidx", targetSegment, + /* purgeStorage */ true, /* force */ true); + assertTrue("force-delete must report removed=true", result.removed); + assertEquals(targetSegment, result.segment); + assertTrue("force-delete must report graph_file_present=true", + result.graphFilePresent); + assertTrue("force-delete with purge must report storage_purged=true", + result.storagePurged); + + assertEquals("primary segment count must have dropped by one", + segmentCountBefore - 1, pvs.getSegmentCount()); + + // Storage purge must have actually removed the multipart files. + assertFalse("graph multipart file must be gone after purge", + dsm.multipartIndexFileExists(primary.getTableSpaceUUID(), + targetSegment, "graph")); + assertFalse("map multipart file must be gone after purge", + dsm.multipartIndexFileExists(primary.getTableSpaceUUID(), + targetSegment, "map")); + + // The primary must have written a fresh checkpoint state to ZK + // as part of the deleteSegment path. We verify this directly + // before waiting on the shadow — a missing republish would + // mask a regression in the engine-level notification logic + // behind a generic "reloadCount didn't advance" failure. + IndexingServiceCheckpointState stateAfterDelete = + metadata.getIndexingServiceCheckpointState(0); + assertNotNull(stateAfterDelete); + assertEquals("post-delete state must carry the new segment count", + pvs.getSegmentCount(), stateAfterDelete.getSegmentCount()); + assertTrue("post-delete state timestamp must advance, before=" + + stateBeforeForce.getTimestampMillis() + + " after=" + stateAfterDelete.getTimestampMillis(), + stateAfterDelete.getTimestampMillis() >= stateBeforeForce.getTimestampMillis()); + + // Step 5: shadow notification — wait for reloadCount to advance. + // 30s deadline is consistent with awaitShadowReloadsForTest()'s + // internal 30s barrier (the watch may arrive slightly after the + // setData call returns). + long deadline = System.currentTimeMillis() + 30_000L; + while (shadow.getShadowReloadCount() <= reloadCountBeforeForce + && System.currentTimeMillis() < deadline) { + Thread.sleep(50); + } + assertTrue("shadow must react to the post-delete republish, " + + "reloadCount before=" + reloadCountBeforeForce + + " now=" + shadow.getShadowReloadCount(), + shadow.getShadowReloadCount() > reloadCountBeforeForce); + } finally { + shadow.close(); + primary.close(); + } + } + + /** + * Force-delete WITHOUT {@code purge_storage}: the in-memory removal must + * still succeed, but the engine must report {@code storage_purged=false} + * and the multipart files must remain in the DSM (for post-mortem + * forensics, as documented in issue #617). + */ + @Test + public void forceDeleteWithoutPurgeKeepsMultipartFiles() throws Exception { + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + MemoryManager mm = new MemoryManager(128 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024); + final String stableUuid = "idx-uuid-617-nopurge"; + + AtomicReference primaryStore = new AtomicReference<>(); + IndexingServiceEngine primary = newPrimary(dsm, mm, stableUuid, primaryStore); + primary.start(); + + Table t = createTable(); + Index idx = createIndex(stableUuid); + primary.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null)); + primary.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null)); + + Random rng = new Random(2026); + int dim = 12; + LogSequenceNumber last = null; + for (int i = 0; i < 64; i++) { + Record r = RecordSerializer.makeRecord(t, + "pk", "k" + i, "vec", randomVector(rng, dim)); + LogEntry ins = LogEntryFactory.insert(t, r.key, r.value, null); + last = new LogSequenceNumber(1, 200 + i); + primary.applySingleEntryForTest(last, ins); + } + primary.awaitPendingWorkForTest(); + primary.setLastProcessedLsnForTest(last); + primary.forceCheckpointAndSaveWatermark(); + seedDsmSchema(dsm, primary.getTableSpaceUUID(), t, idx); + metadata.registerIndexingServiceInstance( + IndexingServiceInstanceDescriptor.primary("p0", "dummy:0", 0)); + + try { + PersistentVectorStore pvs = primaryStore.get(); + assertNotNull(pvs); + List keys = pvs.getSegmentStorageKeysSnapshot(); + assertTrue("must have at least one segment", keys.size() >= 1); + String target = keys.get(0); + + IndexingServiceEngine.DeleteSegmentResult result = primary.deleteSegment( + "vectable", "vidx", target, + /* purgeStorage */ false, /* force */ true); + + assertTrue(result.removed); + assertFalse("storage_purged must be false when purge_storage=false", + result.storagePurged); + // Files must still be on disk for forensics. + assertTrue("graph file must remain in DSM when purge_storage=false", + dsm.multipartIndexFileExists(primary.getTableSpaceUUID(), target, "graph")); + } finally { + primary.close(); + } + } + + /** + * Negative path: requesting a segment that is not loaded must throw + * {@link IndexingServiceEngine.DeleteSegmentException} with a message + * that lists the currently-loaded segments. This is the operator- + * friendly counterpart of the issue #617 reproduction, where the + * operator pastes the wrong segment name. + */ + @Test + public void unknownSegmentIsRejectedWithDiagnosticMessage() throws Exception { + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + MemoryManager mm = new MemoryManager(64 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024); + final String stableUuid = "idx-uuid-617-unknown"; + + AtomicReference primaryStore = new AtomicReference<>(); + IndexingServiceEngine primary = newPrimary(dsm, mm, stableUuid, primaryStore); + primary.start(); + try { + Table t = createTable(); + Index idx = createIndex(stableUuid); + primary.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null)); + primary.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null)); + // No checkpoint → store has no segments. Still a valid lookup target. + + try { + primary.deleteSegment("vectable", "vidx", "vidx_nonsense_seg999", + false, true); + org.junit.Assert.fail("unknown segment must be rejected"); + } catch (IndexingServiceEngine.DeleteSegmentException expected) { + assertTrue("error must mention the missing segment, got: " + expected.getMessage(), + expected.getMessage().contains("vidx_nonsense_seg999")); + } + } finally { + primary.close(); + } + } + + /** + * pr-reviewer follow-up #4 for issue #617: a shadow that boots AFTER + * the primary has already executed {@code deleteSegment(force=true, + * purge=true)} must load the smaller (post-delete) segment count on its + * very first reload — exactly the operational sequence operators + * follow when they delete an orphan segment on a primary, then bring + * up a fresh shadow from the cleaned-up checkpoint. + * + *

Asserts: + *

    + *
  1. the primary's segment count drops by one immediately;
  2. + *
  3. the shadow becomes ready;
  4. + *
  5. {@code shadowReloadCount == 1} (the cold-boot reload counts + * as the first observation);
  6. + *
  7. the shadow's loaded segment count matches the primary's + * post-delete count.
  8. + *
+ */ + @Test + public void lateBootShadowObservesPostDeleteState() throws Exception { + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + MemoryManager mm = new MemoryManager(128 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024); + final String stableUuid = "idx-uuid-617-lateboot"; + + AtomicReference primaryStore = new AtomicReference<>(); + IndexingServiceEngine primary = newPrimary(dsm, mm, stableUuid, primaryStore); + primary.start(); + + Table t = createTable(); + Index idx = createIndex(stableUuid); + primary.applyEntry(new LogSequenceNumber(1, 1), LogEntryFactory.createTable(t, null)); + primary.applyEntry(new LogSequenceNumber(1, 2), LogEntryFactory.createIndex(idx, null)); + + Random rng = new Random(617_42); + int dim = 12; + LogSequenceNumber last = null; + // Build enough vectors that a single checkpoint produces ≥1 segment, + // then a SECOND checkpoint produces another segment so the delete + // below has more than one segment to choose from. We then delete + // exactly one and verify the shadow loads the remaining count. + for (int batch = 0; batch < 2; batch++) { + for (int i = 0; i < 64; i++) { + Record r = RecordSerializer.makeRecord(t, + "pk", "k" + batch + "_" + i, "vec", randomVector(rng, dim)); + LogEntry ins = LogEntryFactory.insert(t, r.key, r.value, null); + last = new LogSequenceNumber(1, 300 + batch * 100 + i); + primary.applySingleEntryForTest(last, ins); + } + primary.awaitPendingWorkForTest(); + primary.setLastProcessedLsnForTest(last); + primary.forceCheckpointAndSaveWatermark(); + } + seedDsmSchema(dsm, primary.getTableSpaceUUID(), t, idx); + metadata.registerIndexingServiceInstance( + IndexingServiceInstanceDescriptor.primary("p0", "dummy:0", 0)); + + IndexingServiceEngine shadow = null; + try { + PersistentVectorStore pvs = primaryStore.get(); + assertNotNull(pvs); + int beforeDelete = pvs.getSegmentCount(); + assertTrue("primary must have ≥2 segments before the delete to make the" + + " post-delete comparison non-trivial, got " + beforeDelete, + beforeDelete >= 2); + + List keys = pvs.getSegmentStorageKeysSnapshot(); + String victim = keys.get(0); + + // Step 1: primary-side delete BEFORE the shadow is ever + // started. force=true + purge=true is the operational + // sequence for orphaned segments. + IndexingServiceEngine.DeleteSegmentResult result = primary.deleteSegment( + "vectable", "vidx", victim, + /* purgeStorage */ true, /* force */ true); + assertTrue("primary-side delete must succeed", result.removed); + assertTrue("graph file was present before the delete", result.graphFilePresent); + assertTrue("purge=true must report storage_purged=true", result.storagePurged); + + int afterDelete = pvs.getSegmentCount(); + assertEquals("primary segment count must drop by exactly one", + beforeDelete - 1, afterDelete); + + // Step 2: only now boot the shadow. The shadow has never seen + // the pre-delete IndexStatus — it loads the (already smaller) + // post-delete one on its very first reload. + shadow = newShadow(dsm, mm, 0); + shadow.start(); + assertTrue("late-boot shadow must become ready", shadow.isShadowReady()); + + assertEquals("shadow must have reloaded exactly once on cold boot", + 1L, shadow.getShadowReloadCount()); + + // Step 3: the shadow's loaded segment count must match the + // primary's POST-delete count. A regression that leaks the + // pre-delete IndexStatus into the shadow's load path would + // produce a mismatch here. + int shadowSegments = shadow.getSegmentCountForTest("vectable", "vidx"); + assertEquals("late-boot shadow must observe the post-delete segment count", + afterDelete, shadowSegments); + } finally { + if (shadow != null) { + shadow.close(); + } + primary.close(); + } + } +} diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentRpcGateTest.java b/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentRpcGateTest.java new file mode 100644 index 000000000..819a3b982 --- /dev/null +++ b/herddb-indexing-service/src/test/java/herddb/indexing/ShadowDeleteSegmentRpcGateTest.java @@ -0,0 +1,230 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package herddb.indexing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import herddb.indexing.proto.DeleteSegmentRequest; +import herddb.indexing.proto.IndexingServiceGrpc; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.nio.file.Paths; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * pr-reviewer follow-up #4 for issue #617: exercises the + * real-server shadow gate on the {@code DeleteSegment} RPC. + * + *

The existing + * {@code IndexingAdminCliDeleteSegmentTest.shadowConfiguredServerRejectsWithNotPrimaryPrefix} + * test runs against a hand-rolled gRPC fake that returns a hard-coded + * {@code FAILED_PRECONDITION} response — it validates the CLI's surface, + * not the production gate at + * {@link IndexingServiceImpl#deleteSegment(DeleteSegmentRequest, + * io.grpc.stub.StreamObserver)}. A regression that deleted the production + * gate would still pass that fake-server test. + * + *

This test mirrors the {@link ShadowAdminCliAndRpcTest} harness: a real + * {@link IndexingServiceImpl} wired to a real {@link IndexingServiceEngine} + * stub configured as a shadow ({@code isConfiguredAsShadow() == true}), + * exposed via an in-process gRPC server, invoked via the real blocking + * stub. The engine's {@code deleteSegment} is intentionally NOT overridden + * — the test would FAIL if it were ever reached, proving the production + * shadow gate at the service boundary is what's blocking the call. + * + *

Asserts the documented error contract: + *

    + *
  • {@code Status.Code == FAILED_PRECONDITION};
  • + *
  • description starts with + * {@code "not_primary: this indexing-service instance is a shadow replica (shadowOf="};
  • + *
  • description contains {@code "target the primary instead"}.
  • + *
+ */ +public class ShadowDeleteSegmentRpcGateTest { + + /** + * Minimal {@link IndexingServiceEngine} subclass: exposes shadow=true with + * a configurable shadowOf, and BLOWS UP if anything ever calls the + * engine's own {@code deleteSegment} — that would mean the service-level + * shadow gate was bypassed. + */ + private static final class ShadowStubEngine extends IndexingServiceEngine { + volatile boolean shadow = true; + volatile int shadowOf = 0; + + ShadowStubEngine() { + super(Paths.get("/tmp/noop-log-shadow-rpc"), + Paths.get("/tmp/noop-data-shadow-rpc"), + new IndexingServerConfiguration()); + } + + @Override + public boolean isConfiguredAsShadow() { + return shadow; + } + + @Override + public boolean isShadowReady() { + // Ready=true so the IndexingServiceImpl @Override interceptor for + // shadow-not-ready does NOT short-circuit our shadow gate ahead of + // time. Ready vs not-ready is orthogonal to the DeleteSegment gate + // — we want to prove the DeleteSegment gate fires even when the + // shadow is otherwise healthy. + return true; + } + + @Override + public int getShadowOfOrMinusOne() { + return shadowOf; + } + + @Override + public String getInstanceIdLabel() { + return "shadow-stub"; + } + + @Override + public String getTableSpaceUUID() { + return "ts-uuid"; + } + + @Override + public DeleteSegmentResult deleteSegment(String table, String indexName, + String segmentStorageKey, + boolean purgeStorage, boolean force) { + // If this is ever reached the service-level shadow gate was + // deleted or bypassed. The test asserts that this does NOT + // happen — the gRPC stub call must terminate inside + // IndexingServiceImpl.deleteSegment with FAILED_PRECONDITION, + // before this method is ever invoked. + throw new AssertionError("engine.deleteSegment must NOT be reached " + + "when the service is configured as a shadow — the production" + + " gate at IndexingServiceImpl.deleteSegment must short-circuit" + + " the RPC with not_primary:"); + } + } + + private ShadowStubEngine engine; + private Server server; + private ManagedChannel channel; + private IndexingServiceGrpc.IndexingServiceBlockingStub stub; + + @Before + public void setUp() throws Exception { + engine = new ShadowStubEngine(); + IndexingServiceImpl svc = new IndexingServiceImpl(engine, NullStatsLogger.INSTANCE); + server = ServerBuilder.forPort(0).addService(svc).build().start(); + channel = ManagedChannelBuilder.forAddress("localhost", server.getPort()) + .usePlaintext().build(); + stub = IndexingServiceGrpc.newBlockingStub(channel); + } + + @After + public void tearDown() throws Exception { + if (channel != null) { + channel.shutdownNow(); + } + if (server != null) { + server.shutdownNow().awaitTermination(); + } + } + + /** + * pr-reviewer follow-up #4: the real-server gate must reject the RPC + * with the documented FAILED_PRECONDITION prefix WITHOUT ever entering + * the engine. ShadowStubEngine.deleteSegment throws AssertionError if + * reached — that fails the test, proving the gate is real. + */ + @Test + public void deleteSegmentRpcOnShadowReturnsNotPrimaryFailedPrecondition() { + engine.shadow = true; + engine.shadowOf = 0; + + DeleteSegmentRequest request = DeleteSegmentRequest.newBuilder() + .setTablespace("ts") + .setTable("vectable") + .setIndex("vidx") + .setSegment("vidx_aaa_seg1") + .setPurgeStorage(false) + .setForce(false) + .build(); + try { + stub.deleteSegment(request); + fail("DeleteSegment RPC on shadow must throw StatusRuntimeException " + + "with FAILED_PRECONDITION"); + } catch (StatusRuntimeException e) { + assertEquals("status code must be FAILED_PRECONDITION", + Status.Code.FAILED_PRECONDITION, e.getStatus().getCode()); + String desc = e.getStatus().getDescription(); + assertNotNull("status must carry a description", desc); + String expectedPrefix = "not_primary: this indexing-service instance is a" + + " shadow replica (shadowOf="; + assertTrue("status description must start with the documented " + + "not_primary prefix, got: " + desc, + desc.startsWith(expectedPrefix)); + assertTrue("status description must instruct the operator to target" + + " the primary, got: " + desc, + desc.contains("target the primary instead")); + // Sanity: shadowOf=0 from our stub must appear in the message. + assertTrue("status description must include the shadowOf integer, got: " + desc, + desc.contains("shadowOf=0")); + } + } + + /** + * Defensive: when shadow=true but shadowOf=-1 (configured-as-shadow + * without a numeric pointer), the gate still fires with the documented + * prefix. shadowOf="-1" still flows through the message verbatim. + */ + @Test + public void deleteSegmentRpcRejectsEvenWithoutKnownShadowOf() { + engine.shadow = true; + engine.shadowOf = -1; + + DeleteSegmentRequest request = DeleteSegmentRequest.newBuilder() + .setTable("vectable") + .setIndex("vidx") + .setSegment("vidx_aaa_seg2") + .build(); + try { + stub.deleteSegment(request); + fail("DeleteSegment RPC on shadow with shadowOf=-1 must still be" + + " rejected with FAILED_PRECONDITION"); + } catch (StatusRuntimeException e) { + assertEquals(Status.Code.FAILED_PRECONDITION, e.getStatus().getCode()); + String desc = e.getStatus().getDescription(); + assertNotNull(desc); + assertTrue("description must start with not_primary prefix, got: " + desc, + desc.startsWith("not_primary: this indexing-service instance is a" + + " shadow replica (shadowOf=")); + assertTrue("description must instruct to target the primary, got: " + desc, + desc.contains("target the primary instead")); + } + } +} diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java b/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java new file mode 100644 index 000000000..85fd00883 --- /dev/null +++ b/herddb-indexing-service/src/test/java/herddb/indexing/admin/IndexingAdminCliDeleteSegmentTest.java @@ -0,0 +1,550 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ + +package herddb.indexing.admin; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import herddb.indexing.proto.DeleteSegmentRequest; +import herddb.indexing.proto.DeleteSegmentResponse; +import herddb.indexing.proto.IndexingServiceGrpc; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Issue #617: wire-level tests for the {@code delete-segment} sub-command of + * {@link IndexingAdminCli}. Each test boots a tiny in-process gRPC server + * whose {@code deleteSegment} implementation records the inbound request and + * returns a hand-crafted response, then drives the CLI's {@link + * IndexingAdminCli#run(String[])} entry point with the relevant flags + * (purge-storage, force, --yes, --json, stdin confirmation) and asserts on: + * + *
    + *
  • the exact wire request the CLI emitted (so the operator's flags + * reach the IS without translation drift);
  • + *
  • the process exit code (0 when the server reported + * {@code removed=true}, 1 when {@code removed=false} — so wrapper + * scripts can distinguish "I removed it" from "it was already + * gone");
  • + *
  • the stdout payload (text vs. JSON);
  • + *
  • the stdin-confirmation behaviour (aborts cleanly on "n", + * proceeds on "y", and aborts when stdin is closed).
  • + *
+ * + *

Engine-level coverage (a real {@link herddb.indexing.vector.PersistentVectorStore} + * with real segments + shadow notification on segment removal) lives in + * {@code herddb.indexing.ShadowDeleteSegmentE2ETest}. + */ +public class IndexingAdminCliDeleteSegmentTest { + + private FakeDeleteSegmentServer fakeServer; + private ByteArrayOutputStream stdoutBytes; + private ByteArrayOutputStream stderrBytes; + private IndexingAdminCli cli; + private InputStream savedStdin; + + @Before + public void setUp() { + stdoutBytes = new ByteArrayOutputStream(); + stderrBytes = new ByteArrayOutputStream(); + cli = new IndexingAdminCli( + new PrintStream(stdoutBytes, true, StandardCharsets.UTF_8), + new PrintStream(stderrBytes, true, StandardCharsets.UTF_8)); + savedStdin = System.in; + } + + @After + public void tearDown() throws InterruptedException { + System.setIn(savedStdin); + if (fakeServer != null) { + fakeServer.close(); + } + } + + private String stdout() { + return new String(stdoutBytes.toByteArray(), StandardCharsets.UTF_8); + } + + private String stderr() { + return new String(stderrBytes.toByteArray(), StandardCharsets.UTF_8); + } + + /** + * Happy path: {@code --yes} skips the confirmation prompt, and the + * server reports a successful removal. The CLI must forward every flag + * to the server (in particular {@code purge-storage} → {@code true} + * and {@code force} → {@code false}), exit with code 0, and emit JSON + * carrying every response field. + */ + @Test + public void happyPathYesPurgeJsonReportsRemovedAndExits0() throws Exception { + fakeServer = new FakeDeleteSegmentServer( + req -> DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(true) + .setVectorsLost(12345L) + .setGraphFilePresent(false) + .setStoragePurged(req.getPurgeStorage()) + .build()); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--tablespace", "ts", + "--table", "vectable", + "--index", "vidx", + "--segment", "vidx_aaa_seg627", + "--purge-storage", + "--yes", + "--json" + }); + assertEquals("CLI should succeed; stderr=\n" + stderr(), 0, rc); + + DeleteSegmentRequest captured = fakeServer.lastRequest(); + assertNotNull("server must have received the request", captured); + assertEquals("ts", captured.getTablespace()); + assertEquals("vectable", captured.getTable()); + assertEquals("vidx", captured.getIndex()); + assertEquals("vidx_aaa_seg627", captured.getSegment()); + assertTrue("CLI must forward --purge-storage", captured.getPurgeStorage()); + assertFalse("CLI must NOT set force when --force is absent", captured.getForce()); + + String out = stdout().trim(); + assertTrue("JSON output expected, got:\n" + out, out.startsWith("{") && out.endsWith("}")); + assertTrue("must include the segment name, got:\n" + out, + out.contains("\"segment\":\"vidx_aaa_seg627\"")); + assertTrue("must include removed=true, got:\n" + out, + out.contains("\"removed\":true")); + assertTrue("must include vectors_lost, got:\n" + out, + out.contains("\"vectors_lost\":12345")); + assertTrue("must include storage_purged=true, got:\n" + out, + out.contains("\"storage_purged\":true")); + } + + /** + * Text mode (no {@code --json}) — the CLI must emit a single + * human-readable line containing the same fields the JSON mode + * emits, in {@code key=value} form. + */ + @Test + public void textModeProducesSingleLineSummary() throws Exception { + fakeServer = new FakeDeleteSegmentServer( + req -> DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(true) + .setVectorsLost(7L) + .setGraphFilePresent(true) + .setStoragePurged(false) + .build()); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "vectable", + "--index", "vidx", + "--segment", "vidx_bbb_seg42", + "--force", + "--yes" + }); + assertEquals(0, rc); + String out = stdout(); + assertTrue("text output must include segment=, got:\n" + out, + out.contains("segment=vidx_bbb_seg42")); + assertTrue(out.contains("removed=true")); + assertTrue(out.contains("vectors_lost=7")); + assertTrue(out.contains("graph_file_present=true")); + assertTrue(out.contains("storage_purged=false")); + } + + /** + * When {@code --force} is set the CLI must forward {@code force=true} + * to the server. The server-side refusal is exercised by the engine + * E2E test; here we only check the wire-protocol forwarding. + */ + @Test + public void forceFlagIsForwardedToServer() throws Exception { + AtomicReference captured = new AtomicReference<>(); + fakeServer = new FakeDeleteSegmentServer(req -> { + captured.set(req); + return DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(true) + .setVectorsLost(0L) + .setGraphFilePresent(true) + .setStoragePurged(false) + .build(); + }); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "t", + "--index", "i", + "--segment", "vidx_seg1", + "--force", + "--yes" + }); + assertEquals(0, rc); + assertTrue("CLI must forward --force as force=true", + captured.get().getForce()); + } + + /** + * Server-side refusal (mapped to {@code FAILED_PRECONDITION}) must + * surface as a non-zero CLI exit code and an error line on stderr — + * not a stack trace. + */ + @Test + public void serverRefusalIsReportedAsErrorWithNonZeroExit() throws Exception { + fakeServer = new FakeDeleteSegmentServer(req -> { + throw Status.FAILED_PRECONDITION + .withDescription("refusing to delete segment vidx_seg1: graph file IS reachable") + .asRuntimeException(); + }); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "t", + "--index", "i", + "--segment", "vidx_seg1", + "--yes" + }); + assertTrue("CLI must exit non-zero on server refusal, got rc=" + rc, rc != 0); + String err = stderr(); + assertTrue("stderr must mention the failure, got:\n" + err, + err.contains("ERROR") || err.toLowerCase().contains("refus")); + } + + /** + * When the server returns {@code removed=false} (no-op race), the CLI + * exits with code 1 so wrapper scripts can tell "I removed it" from + * "it was already gone". + */ + @Test + public void removedFalseExitsWithCode1() throws Exception { + fakeServer = new FakeDeleteSegmentServer( + req -> DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(false) + .setVectorsLost(0L) + .setGraphFilePresent(false) + .setStoragePurged(false) + .build()); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "t", + "--index", "i", + "--segment", "vidx_gone", + "--yes" + }); + assertEquals("removed=false must exit 1", 1, rc); + } + + /** + * Stdin confirmation: when {@code --yes} is absent the CLI must read + * one byte from stdin and abort on anything other than {@code y/Y}. + * The server must never receive a request in the aborted case. + */ + @Test + public void stdinPromptAbortsOnAnyNonYesInput() throws Exception { + AtomicReference captured = new AtomicReference<>(); + fakeServer = new FakeDeleteSegmentServer(req -> { + captured.set(req); + return DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(true) + .build(); + }); + fakeServer.start(); + + System.setIn(new ByteArrayInputStream("n\n".getBytes(StandardCharsets.UTF_8))); + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "t", + "--index", "i", + "--segment", "vidx_seg1" + }); + assertEquals("rejected prompt must exit non-zero", 1, rc); + assertTrue("stderr must mention abort, got:\n" + stderr(), + stderr().toLowerCase().contains("abort")); + // Critical safety property: the RPC must NOT have been invoked. + assertEquals("server must not have received any request when user aborted", + null, captured.get()); + } + + /** + * {@code y\n} on stdin (without {@code --yes}) must proceed with the + * RPC. Verifies the inverse of the abort path above. + */ + @Test + public void stdinPromptProceedsOnYInput() throws Exception { + AtomicReference captured = new AtomicReference<>(); + fakeServer = new FakeDeleteSegmentServer(req -> { + captured.set(req); + return DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(true) + .setVectorsLost(1L) + .build(); + }); + fakeServer.start(); + + System.setIn(new ByteArrayInputStream("y\n".getBytes(StandardCharsets.UTF_8))); + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "t", + "--index", "i", + "--segment", "vidx_seg2" + }); + assertEquals(0, rc); + assertNotNull("server must have received the request after y/Y", + captured.get()); + assertEquals("vidx_seg2", captured.get().getSegment()); + } + + /** + * pr-reviewer follow-up #1: when the targeted IS is a shadow replica, + * the server short-circuits the RPC with the exact prefix + * {@code "not_primary: this indexing-service instance is a shadow + * replica (shadowOf=...)"}. The CLI must surface this as a non-zero + * exit code with the rejection text on stderr. + */ + @Test + public void shadowConfiguredServerRejectsWithNotPrimaryPrefix() throws Exception { + // The fake server emulates the IndexingServiceImpl.deleteSegment + // shadow gate: it never invokes the engine, just returns + // FAILED_PRECONDITION with the exact "not_primary:" message. + fakeServer = new FakeDeleteSegmentServer(req -> { + throw Status.FAILED_PRECONDITION + .withDescription("not_primary: this indexing-service instance is a" + + " shadow replica (shadowOf=0); target the primary instead") + .asRuntimeException(); + }); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "t", + "--index", "i", + "--segment", "vidx_seg1", + "--yes" + }); + assertTrue("CLI must exit non-zero on shadow rejection, got rc=" + rc, rc != 0); + String err = stderr(); + assertTrue("stderr must carry the not_primary: prefix, got:\n" + err, + err.contains("not_primary:")); + assertTrue("stderr must mention shadowOf=, got:\n" + err, + err.contains("shadowOf=")); + } + + /** + * pr-reviewer follow-up #3 (text output): when the server returns + * {@code vectors_lost=-1} (the engine's race-sentinel for "the segment + * disappeared between the snapshot and the drop call — count is + * unknown"), the CLI must NOT print the raw {@code -1} number. Operators + * read the field as a count, so {@code -1} would be confusing. + * Instead emit {@code vectors_lost=unknown (race)}. + */ + @Test + public void textOutputRendersNegativeVectorsLostAsUnknownRace() throws Exception { + fakeServer = new FakeDeleteSegmentServer( + req -> DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(false) + .setVectorsLost(-1L) + .setGraphFilePresent(false) + .setStoragePurged(false) + .build()); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "vectable", + "--index", "vidx", + "--segment", "vidx_race_seg", + "--yes" + }); + // removed=false → rc=1 (existing contract from removedFalseExitsWithCode1). + assertEquals("removed=false must still map to rc=1", 1, rc); + + String out = stdout(); + assertTrue("text output must render -1 as 'unknown (race)', got:\n" + out, + out.contains("vectors_lost=unknown (race)")); + // And it must NOT include the raw -1, which would be visually + // misleading next to the other numeric fields. + assertFalse("text output must not include the raw -1 sentinel, got:\n" + out, + out.contains("vectors_lost=-1")); + // The other fields are still rendered. + assertTrue(out.contains("segment=vidx_race_seg")); + assertTrue(out.contains("removed=false")); + } + + /** + * pr-reviewer follow-up #3 (JSON output): same input as above (server + * returns {@code vectors_lost=-1}). The JSON form keeps the + * {@code vectors_lost} key in the object (so the schema stays stable) + * but emits the string {@code "unknown"} instead of the raw number. + * Wrapper scripts that consume the JSON can distinguish a real count + * from "we don't know" by type-checking the field. + */ + @Test + public void jsonOutputRendersNegativeVectorsLostAsUnknownString() throws Exception { + fakeServer = new FakeDeleteSegmentServer( + req -> DeleteSegmentResponse.newBuilder() + .setSegment(req.getSegment()) + .setRemoved(false) + .setVectorsLost(-1L) + .setGraphFilePresent(false) + .setStoragePurged(false) + .build()); + fakeServer.start(); + + int rc = cli.run(new String[]{ + "delete-segment", + "--server", fakeServer.address(), + "--table", "vectable", + "--index", "vidx", + "--segment", "vidx_race_seg_json", + "--yes", + "--json" + }); + assertEquals("removed=false must still map to rc=1", 1, rc); + + String out = stdout().trim(); + assertTrue("expected JSON object, got:\n" + out, + out.startsWith("{") && out.endsWith("}")); + // The vectors_lost key MUST still be present (stable schema), but as + // a JSON string, not a number. So the substring is the + // quoted-key-and-value form. + assertTrue("JSON must carry vectors_lost as the string \"unknown\", got:\n" + out, + out.contains("\"vectors_lost\":\"unknown\"")); + // And it must NOT carry the raw -1 sentinel. + assertFalse("JSON must not carry the raw -1 vectors_lost value, got:\n" + out, + out.contains("\"vectors_lost\":-1")); + // Other fields still render correctly. + assertTrue(out.contains("\"segment\":\"vidx_race_seg_json\"")); + assertTrue(out.contains("\"removed\":false")); + } + + /** + * Missing {@code --segment} flag must exit with usage code 2, not + * with a stack trace. + */ + @Test + public void missingSegmentFlagExits2() { + int rc = cli.run(new String[]{ + "delete-segment", + "--server", "localhost:1", + "--table", "t", + "--index", "i" + // intentionally missing --segment + }); + assertEquals(2, rc); + String err = stderr(); + assertTrue("stderr must mention the missing option, got:\n" + err, + err.toLowerCase().contains("segment")); + } + + // -------------------- harness -------------------- + + @FunctionalInterface + private interface DeleteSegmentHandler { + DeleteSegmentResponse handle(DeleteSegmentRequest request); + } + + private static final class FakeDeleteSegmentServer implements AutoCloseable { + private final DeleteSegmentImpl impl; + private Server server; + + FakeDeleteSegmentServer(DeleteSegmentHandler handler) { + this.impl = new DeleteSegmentImpl(handler); + } + + void start() throws IOException { + server = ServerBuilder.forPort(0).addService(impl).build().start(); + assertNotNull(server); + } + + String address() { + return "localhost:" + server.getPort(); + } + + DeleteSegmentRequest lastRequest() { + return impl.lastRequest.get(); + } + + @Override + public void close() throws InterruptedException { + if (server != null) { + server.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + } + } + + private static final class DeleteSegmentImpl + extends IndexingServiceGrpc.IndexingServiceImplBase { + private final DeleteSegmentHandler handler; + private final AtomicReference lastRequest = new AtomicReference<>(); + + DeleteSegmentImpl(DeleteSegmentHandler handler) { + this.handler = handler; + } + + @Override + public void deleteSegment(DeleteSegmentRequest request, + StreamObserver responseObserver) { + lastRequest.set(request); + try { + DeleteSegmentResponse resp = handler.handle(request); + responseObserver.onNext(resp); + responseObserver.onCompleted(); + } catch (io.grpc.StatusRuntimeException e) { + responseObserver.onError(e); + } + } + } +} diff --git a/herddb-indexing-service/src/test/java/herddb/indexing/vector/PersistentVectorStoreAdminDeleteSegmentTest.java b/herddb-indexing-service/src/test/java/herddb/indexing/vector/PersistentVectorStoreAdminDeleteSegmentTest.java new file mode 100644 index 000000000..628231f40 --- /dev/null +++ b/herddb-indexing-service/src/test/java/herddb/indexing/vector/PersistentVectorStoreAdminDeleteSegmentTest.java @@ -0,0 +1,316 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +package herddb.indexing.vector; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import herddb.core.MemoryManager; +import herddb.mem.MemoryDataStorageManager; +import herddb.utils.Bytes; +import io.github.jbellis.jvector.vector.VectorSimilarityFunction; +import java.nio.file.Path; +import java.util.List; +import java.util.Random; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * pr-reviewer follow-up #3 for issue #617: direct tests for + * {@link PersistentVectorStore#dropSegmentByStorageKey(String)} — the engine + * primitive that the operator-facing {@code DeleteSegment} RPC ultimately + * invokes after the safety gate, snapshot/refusal checks, and audit log. + * + *

Engine-level coverage (request validation + checkpoint republish + + * shadow notification) lives in + * {@code herddb.indexing.ShadowDeleteSegmentE2ETest}; CLI/wire coverage in + * {@code herddb.indexing.admin.IndexingAdminCliDeleteSegmentTest}. This + * class is dedicated to the in-memory mutation primitive so a regression in + * the lock discipline / dirty-flag / deferred-close path is caught with the + * smallest possible harness. + * + *

Coverage matrix: + *

    + *
  1. (a) happy path: a known segment is removed (gone from + * {@code segments}, store marked {@code dirty} for the next + * checkpoint);
  2. + *
  3. (b) idempotence: a second call on the same key returns + * {@code SegmentDropResult.NOT_FOUND} without throwing and without + * mutating state;
  4. + *
  5. (c) deferred-close drain: after the drop the + * {@code pendingSegmentCloses} queue eventually empties — i.e. the + * segment handle is closed (opportunistic drain runs while no + * compaction holds the lock);
  6. + *
  7. (d) compaction-race safety: when a cycle is mid-iteration + * (after candidate selection, before {@code rebuildSegment}), a + * concurrent drop must NOT cause + * {@code CompactionException(CORRUPTION)} — same invariant as issue + * #535 inherited by the storage-key path.
  8. + *
+ */ +public class PersistentVectorStoreAdminDeleteSegmentTest { + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static final int DIM = 16; + private int savedMinLive; + + @Before + public void disableDeferral() { + savedMinLive = PersistentVectorStore.minLiveVectorsForCheckpoint; + // Allow tiny checkpoints to seal so we always produce ≥1 on-disk + // segment with a handful of inserts. + PersistentVectorStore.minLiveVectorsForCheckpoint = 0; + } + + @After + public void restoreDeferral() { + PersistentVectorStore.minLiveVectorsForCheckpoint = savedMinLive; + } + + private static float[] vec(Random rng) { + float[] v = new float[DIM]; + for (int i = 0; i < DIM; i++) { + v[i] = rng.nextFloat(); + } + return v; + } + + private PersistentVectorStore newStore(Path tmpDir, MemoryDataStorageManager dsm) { + MemoryManager mm = new MemoryManager(64 * 1024 * 1024, 0, 1024 * 1024, 1024 * 1024); + PersistentVectorStore store = new PersistentVectorStore( + "vidx-617", "testtable", "tstblspace", "vector_col", + tmpDir, dsm, mm, + 16, 100, 1.2f, 1.4f, true, 2_000_000_000L, 0, + /* compactionIntervalMs */ Long.MAX_VALUE, + VectorSimilarityFunction.EUCLIDEAN); + // Aggressive policy so the race-test below still has something to + // pick when the hook drops the chosen candidate. + store.configureCompaction(Long.MAX_VALUE, 1L, Long.MAX_VALUE, 2, + Integer.MAX_VALUE, 0); + return store; + } + + /** Seeds {@code n} on-disk segments via add+checkpoint cycles. */ + private void seedSegments(PersistentVectorStore store, int n) throws Exception { + Random rng = new Random(617); + for (int c = 0; c < n; c++) { + for (int i = 0; i < 30; i++) { + store.addVector(Bytes.from_int(c * 1000 + i), vec(rng)); + } + store.checkpoint(); + } + } + + /** + * (a) Happy path: a known segment key is removed, the result reports + * {@code removed=true} with a non-negative {@code vectorsLost}, the + * segment list shrinks by one, and {@code dirty} is set so the next + * checkpoint serialises the smaller list. + */ + @Test(timeout = 30_000) + public void happyPathRemovesSegmentAndMarksDirty() throws Exception { + Path tmpDir = tmpFolder.newFolder("admin-delete-happy").toPath(); + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + + try (PersistentVectorStore store = newStore(tmpDir, dsm)) { + store.start(); + seedSegments(store, 3); + + List keysBefore = store.getSegmentStorageKeysSnapshot(); + int countBefore = store.getSegmentCount(); + assertTrue("setup must produce ≥1 segment", countBefore >= 1); + assertEquals(countBefore, keysBefore.size()); + + String victim = keysBefore.get(0); + + // Force a checkpoint AFTER the drop so we can observe the dirty + // flag effect: clear the dirty flag first by performing a no-op + // checkpoint (which only writes when something has changed + // since the last save). If we drop then checkpoint, the + // smaller list must be persisted — verified by re-reading + // getSegmentStorageKeysSnapshot. + store.checkpoint(); // baseline; segment list unchanged + + AbstractVectorStore.SegmentDropResult result = + store.dropSegmentByStorageKey(victim); + + assertTrue("happy path must report removed=true", result.removed); + assertTrue("vectorsLost must be ≥ 0 on the happy path, got " + result.vectorsLost, + result.vectorsLost >= 0L); + + List keysAfter = store.getSegmentStorageKeysSnapshot(); + assertEquals("segment count must drop by one", countBefore - 1, keysAfter.size()); + assertFalse("victim must be gone from snapshot", keysAfter.contains(victim)); + + // The next checkpoint must actually serialise the change — + // a regression where the dirty flag is not set would leave + // the on-disk IndexStatus carrying the deleted segment. + store.checkpoint(); + assertEquals("post-checkpoint list still matches", + countBefore - 1, store.getSegmentCount()); + } + } + + /** + * (b) Idempotent NOT_FOUND on a second call: dropping the same key + * twice in a row must NOT throw and must NOT mutate state. The + * production CLI relies on this — wrapper scripts can be re-run after + * a partial failure without worrying about double-drop semantics. + */ + @Test(timeout = 30_000) + public void doubleDropReportsNotFoundWithoutMutatingState() throws Exception { + Path tmpDir = tmpFolder.newFolder("admin-delete-idempotent").toPath(); + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + + try (PersistentVectorStore store = newStore(tmpDir, dsm)) { + store.start(); + seedSegments(store, 2); + + List keysBefore = store.getSegmentStorageKeysSnapshot(); + assertTrue(keysBefore.size() >= 1); + String victim = keysBefore.get(0); + + // First call: removes the segment. + AbstractVectorStore.SegmentDropResult first = + store.dropSegmentByStorageKey(victim); + assertTrue(first.removed); + + int countAfterFirst = store.getSegmentCount(); + + // Second call: idempotent NOT_FOUND. State is unchanged. + AbstractVectorStore.SegmentDropResult second = + store.dropSegmentByStorageKey(victim); + assertFalse("second drop must report removed=false", second.removed); + assertEquals("NOT_FOUND vectorsLost must be 0", + 0L, second.vectorsLost); + assertEquals("segment count must NOT change on the second call", + countAfterFirst, store.getSegmentCount()); + + // The result is the documented sentinel. + assertEquals("second call must return the NOT_FOUND sentinel", + AbstractVectorStore.SegmentDropResult.NOT_FOUND.removed, + second.removed); + } + } + + /** + * (c) Deferred-close drain: the {@code pendingSegmentCloses} queue + * must eventually drain after the drop. With no compaction running, + * the opportunistic drain in {@code dropSegmentByStorageKey}'s + * {@code finally} block should empty the queue synchronously. + */ + @Test(timeout = 30_000) + public void pendingSegmentClosesQueueDrainsAfterDrop() throws Exception { + Path tmpDir = tmpFolder.newFolder("admin-delete-drain").toPath(); + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + + try (PersistentVectorStore store = newStore(tmpDir, dsm)) { + store.start(); + seedSegments(store, 2); + + // Idle state: queue must start empty. + assertEquals("queue must be empty before the drop", + 0, store.getPendingSegmentClosesSizeForTest()); + + List keysBefore = store.getSegmentStorageKeysSnapshot(); + assertTrue(keysBefore.size() >= 1); + String victim = keysBefore.get(0); + + AbstractVectorStore.SegmentDropResult result = + store.dropSegmentByStorageKey(victim); + assertTrue(result.removed); + + // After the drop, with no concurrent compaction holding + // compactionLock, the opportunistic drain must have emptied + // the queue. The drain runs in the same call's `finally` + // block, so it is synchronous from the caller's POV. + assertEquals("opportunistic drain must have closed the queued segment " + + "since no compaction was running", + 0, store.getPendingSegmentClosesSizeForTest()); + } + } + + /** + * (d) Compaction-race safety: when a cycle is mid-iteration (after + * candidate selection, before {@code rebuildSegment}), a concurrent + * {@code dropSegmentByStorageKey} on a candidate must NOT cause + * {@code CompactionException(CORRUPTION)}. This is the same + * invariant issue #535 fixed for {@code dropSegmentByUuid}; the + * storage-key path inherits the same deferred-close protocol so the + * same test shape applies. + */ + @Test(timeout = 30_000) + public void dropDuringCompactionMustNotCauseCorruption() throws Exception { + Path tmpDir = tmpFolder.newFolder("admin-delete-race").toPath(); + MemoryDataStorageManager dsm = new MemoryDataStorageManager(); + + try (PersistentVectorStore store = newStore(tmpDir, dsm)) { + store.start(); + seedSegments(store, 5); + + List keysBefore = store.getSegmentStorageKeysSnapshot(); + assertTrue("setup must produce ≥2 segments", keysBefore.size() >= 2); + String victim = keysBefore.get(0); + + long corruptionBefore = store.getCompactionFailuresCorruptionTotal(); + + // Wire the hook to fire the storage-key drop inside the cycle, + // AFTER candidates are picked but BEFORE rebuildSegment reads + // seg.onDiskGraph. Pre-issue-#535 the synchronous close inside + // the drop would have nulled out onDiskGraph and tripped a + // CORRUPTION failure; the deferred-close protocol queues the + // close until after the cycle's iteration completes. + store.setCompactionPostCandidateSelectionHookForTest(() -> { + AbstractVectorStore.SegmentDropResult r = + store.dropSegmentByStorageKey(victim); + // Hook drops the victim synchronously — the cycle still + // sees a non-null onDiskGraph because the close was + // deferred to the cycle's finally block. + assertNotNull("hook drop must have produced a result", r); + }); + + store.runCompactionCycle(); + + // CORE INVARIANT (the fix this code inherits from issue #535): + // NO CORRUPTION failure must have been recorded. + assertEquals("no CORRUPTION failure must be recorded — the storage-key" + + " drop path inherits issue #535's deferred-close protocol", + corruptionBefore, store.getCompactionFailuresCorruptionTotal()); + + // The victim must be gone from the active list — the drop + // already removed it under the writeLock before the hook + // returned. + assertFalse("victim must be removed from the active segment list", + store.getSegmentStorageKeysSnapshot().contains(victim)); + + // By the time runCompactionCycle returns, the deferred close + // must have drained. + assertEquals("pendingSegmentCloses queue must drain by the end of the" + + " cycle", + 0, store.getPendingSegmentClosesSizeForTest()); + } + } +} diff --git a/herddb-remote-file-service/src/main/java/herddb/remote/PromotableRemoteFileDataStorageManager.java b/herddb-remote-file-service/src/main/java/herddb/remote/PromotableRemoteFileDataStorageManager.java index adb260041..d7284db3a 100644 --- a/herddb-remote-file-service/src/main/java/herddb/remote/PromotableRemoteFileDataStorageManager.java +++ b/herddb-remote-file-service/src/main/java/herddb/remote/PromotableRemoteFileDataStorageManager.java @@ -362,4 +362,20 @@ public void deleteMultipartIndexFile(String tableSpace, String uuid, String file throws DataStorageManagerException { activeDelegate.deleteMultipartIndexFile(tableSpace, uuid, fileType); } + + /** + * Issue #617 + pr-reviewer follow-up #5: delegate the multipart presence + * probe to the {@link #activeDelegate}. The default + * {@link DataStorageManager#multipartIndexFileExists} fallback would still + * work after the 1 GiB sentinel fix in the base class, but it routes the + * probe through {@link #multipartIndexReaderSupplier} and a synthetic + * {@code readInt()} — which is slower and noisier than letting the + * delegate (typically a {@link RemoteFileDataStorageManager} or a + * {@link ReadReplicaDataStorageManager}) handle the probe with its own + * backend-specific shortcut. + */ + @Override + public boolean multipartIndexFileExists(String tableSpace, String uuid, String fileType) { + return activeDelegate.multipartIndexFileExists(tableSpace, uuid, fileType); + } } diff --git a/herddb-remote-file-service/src/main/java/herddb/remote/ReadReplicaDataStorageManager.java b/herddb-remote-file-service/src/main/java/herddb/remote/ReadReplicaDataStorageManager.java index 4fdf351e3..59fcc605a 100644 --- a/herddb-remote-file-service/src/main/java/herddb/remote/ReadReplicaDataStorageManager.java +++ b/herddb-remote-file-service/src/main/java/herddb/remote/ReadReplicaDataStorageManager.java @@ -133,6 +133,33 @@ public void deleteMultipartIndexFile(String tableSpace, String uuid, String file throw readOnly("deleteMultipartIndexFile"); } + /** + * Issue #617 + pr-reviewer follow-up #5: same {@code readFileRange} + * shortcut as {@link RemoteFileDataStorageManager#multipartIndexFileExists} + * — the default supplier-based fallback would also work (now that the base + * class uses a 1 GiB sentinel for the synthetic {@code fileSize}), but it + * routes the 4-byte probe through {@link RemoteRandomAccessReader} which + * is heavier than a direct block-range read. A null/empty response means + * block-0 is absent. + */ + @Override + public boolean multipartIndexFileExists(String tableSpace, String uuid, String fileType) { + String logicalPath = remoteMultipartPath(tableSpace, uuid, fileType); + int blockSize = client.getBlockSize(); + try { + byte[] head = client.readFileRange(logicalPath, 0L, 4, blockSize); + return head != null && head.length > 0; + } catch (RuntimeException e) { + // Some backends wrap NoSuchKey-style errors as unchecked + // exceptions; treat any failure during the probe as "missing" + // per the base contract. + LOGGER.log(Level.FINE, + "multipartIndexFileExists: probe failed for {0}: {1}", + new Object[]{logicalPath, e.getMessage()}); + return false; + } + } + // ------------------------------------------------------------------------- // Page deserialization (matches FileDataStorageManager format) // ------------------------------------------------------------------------- diff --git a/herddb-remote-file-service/src/main/java/herddb/remote/RemoteFileDataStorageManager.java b/herddb-remote-file-service/src/main/java/herddb/remote/RemoteFileDataStorageManager.java index 27efcb4b5..e124b9dbf 100644 --- a/herddb-remote-file-service/src/main/java/herddb/remote/RemoteFileDataStorageManager.java +++ b/herddb-remote-file-service/src/main/java/herddb/remote/RemoteFileDataStorageManager.java @@ -1045,6 +1045,46 @@ public io.github.jbellis.jvector.disk.ReaderSupplier multipartIndexReaderSupplie readerStatsLogger, segmentBlockCache); } + /** + * Issue #617 + pr-reviewer follow-up #2: backend-specific override of the + * {@code multipartIndexFileExists} probe. The default + * {@link DataStorageManager#multipartIndexFileExists} implementation + * builds a {@link RemoteRandomAccessReader} with a fictitious + * {@code fileSize=1L} and then calls {@code readInt()} on it — that + * combination triggers an infinite loop in + * {@link RemoteRandomAccessReader#readFully(byte[])} because the loop's + * {@code toCopy} becomes zero once {@code position} matches the fake + * {@code totalSize}. We bypass the reader entirely and probe block-0 + * directly via the file-server client's {@code readFileRange} API, + * which returns {@code null} when the underlying block is absent. + * + *

4 bytes is the same probe length the base contract documents (the + * default impl does {@code readInt()}), so the contract gap that + * {@code --force} covers — a block-0 of ≥4 bytes is enough to make the + * probe return {@code true} even on a truncated upload — is preserved. + */ + @Override + public boolean multipartIndexFileExists(String tableSpace, String uuid, String fileType) { + String logicalPath = remoteMultipartPath(tableSpace, uuid, fileType); + int blockSize = Math.max(client.getBlockSize(), MULTIPART_BLOCK_SIZE); + try { + byte[] head = client.readFileRange(logicalPath, 0L, 4, blockSize); + // RemoteFileServiceClient.readFileRange returns null for missing + // blocks (see RemoteFileServiceTest.testWriteFileBlockAndReadFileRange). + // A non-null result with ≥ 1 byte means block-0 is reachable — + // matches the base contract's "best-effort presence check". + return head != null && head.length > 0; + } catch (RuntimeException e) { + // Some backends wrap NoSuchKey-style errors in unchecked + // exceptions; treat any failure during the probe as "missing" + // per the contract. + LOGGER.log(Level.FINE, + "multipartIndexFileExists: probe failed for {0}: {1}", + new Object[]{logicalPath, e.getMessage()}); + return false; + } + } + @Override public void deleteMultipartIndexFile(String tableSpace, String uuid, String fileType) throws DataStorageManagerException { diff --git a/herddb-remote-file-service/src/test/java/herddb/remote/RemoteFileDataStorageManagerProbeAndPurgeTest.java b/herddb-remote-file-service/src/test/java/herddb/remote/RemoteFileDataStorageManagerProbeAndPurgeTest.java new file mode 100644 index 000000000..26419db92 --- /dev/null +++ b/herddb-remote-file-service/src/test/java/herddb/remote/RemoteFileDataStorageManagerProbeAndPurgeTest.java @@ -0,0 +1,279 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package herddb.remote; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * pr-reviewer follow-up #2 for issue #617: dedicated probe-and-purge contract + * tests for {@link RemoteFileDataStorageManager#multipartIndexFileExists} + * (the safety gate the {@code DeleteSegment} engine path keys off) and + * {@link RemoteFileDataStorageManager#deleteMultipartIndexFile} (the + * {@code --purge-storage} purger). + * + *

The matrix: + *

    + *
  • (a) probe returns {@code false} when the multipart object is + * absent — the gate must not block a delete the operator initiated for + * an already-orphaned segment record;
  • + *
  • (b) probe returns {@code true} when the multipart object is + * fully present — the gate must protect the operator from deleting a + * healthy segment;
  • + *
  • (c) {@code deleteMultipartIndexFile} is idempotent and + * invalidates the {@link SegmentBlockCache} so a subsequent reader on + * the same logical path cannot serve stale bytes;
  • + *
  • (d) probe still returns {@code true} on a block-0 that is at + * least 4 bytes long but otherwise truncated/corrupted — this + * documents the contract gap that the operator's {@code --force} flag + * covers (the gate cannot distinguish "complete file" from "block-0 + * only", so {@code --force} remains required when the rest of the + * file is known to be corrupt).
  • + *
+ * + *

End-to-end coverage of {@code DeleteSegment} that exercises this gate + * indirectly lives in {@code herddb.indexing.ShadowDeleteSegmentE2ETest} + * (in the indexing-service module). + */ +public class RemoteFileDataStorageManagerProbeAndPurgeTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private RemoteFileServer server; + private RemoteFileServiceClient client; + private RemoteFileDataStorageManager storage; + + @Before + public void setUp() throws Exception { + server = new RemoteFileServer(0, folder.newFolder("remote").toPath()); + server.start(); + client = new RemoteFileServiceClient(Arrays.asList("localhost:" + server.getPort())); + storage = new RemoteFileDataStorageManager( + folder.newFolder("metadata").toPath(), + folder.newFolder("tmp").toPath(), + 1000, + client); + storage.start(); + } + + @After + public void tearDown() throws Exception { + if (storage != null) { + storage.close(); + } + if (client != null) { + client.close(); + } + if (server != null) { + server.stop(); + } + } + + /** + * Writes a synthetic graph file via the high-level multipart writer so + * the produced object matches exactly what {@code PersistentVectorStore} + * would have written at Phase B time (issue #617 reproduction shape). + */ + private long writeMultipartGraph(String tableSpace, String uuid, byte[] data) throws Exception { + Path tempFile = folder.newFile("graph-" + uuid + ".bin").toPath(); + Files.write(tempFile, data); + try { + storage.writeMultipartIndexFile(tableSpace, uuid, "graph", tempFile, null); + } finally { + Files.deleteIfExists(tempFile); + } + return data.length; + } + + /** + * (a) absent object → probe returns {@code false}. This is the path the + * issue #617 reproduction hits when the Phase B upload failed before + * block-0 was ever published: the IS metadata still references the + * segment, but the remote storage has no object for it. With the gate + * keying off this probe, {@code DeleteSegment} with {@code force=false} + * is accepted because the safety check (graph file IS reachable) does + * not fire. + */ + @Test(timeout = 30_000) + public void probeReturnsFalseWhenObjectIsAbsent() { + assertFalse("probe must return false when no multipart object exists", + storage.multipartIndexFileExists("ts1", "uuid-absent", "graph")); + } + + /** + * (b) fully-present object → probe returns {@code true}. This is the + * happy path: the gate must protect the operator from deleting a + * healthy segment without going through {@code --force}. + */ + @Test(timeout = 60_000) + public void probeReturnsTrueWhenObjectIsFullyPresent() throws Exception { + // Multi-block payload so the probe does not just inspect a partial + // block-0 — exactly matches the production "complete graph file" + // contract that the gate is supposed to detect. + byte[] payload = new byte[64 * 1024]; + for (int i = 0; i < payload.length; i++) { + payload[i] = (byte) (i & 0xFF); + } + writeMultipartGraph("ts1", "uuid-present", payload); + assertTrue("probe must return true when the multipart object exists", + storage.multipartIndexFileExists("ts1", "uuid-present", "graph")); + } + + /** + * (c) {@code deleteMultipartIndexFile} is idempotent and invalidates + * the block cache. We verify: + *

    + *
  1. two back-to-back calls on the same key succeed (the second + * must not throw — the multipart files are already gone);
  2. + *
  3. a block populated in the {@link SegmentBlockCache} via a real + * read is removed from the cache by the delete call, so a + * subsequent reader on the same logical path cannot serve stale + * bytes from a rewritten segment that happens to hit the same + * key.
  4. + *
+ * + *

{@link SegmentBlockCache} is {@code final}, so we install a real + * one with a small capacity and assert via the public + * {@link SegmentBlockCache#containsBlock(String, long, int)} accessor + * — that is exactly the same surface used in production to decide + * whether a block load is needed. + */ + @Test(timeout = 60_000) + public void deleteMultipartIndexFileIsIdempotentAndInvalidatesCache() throws Exception { + byte[] payload = new byte[8 * 1024]; + for (int i = 0; i < payload.length; i++) { + payload[i] = (byte) ((i * 31) & 0xFF); + } + writeMultipartGraph("ts1", "uuid-purge", payload); + assertTrue("precondition: object must exist before purge", + storage.multipartIndexFileExists("ts1", "uuid-purge", "graph")); + + // Install a real (enabled) SegmentBlockCache so we can observe the + // invalidation effect via the public containsBlock() accessor. + // 1 MiB capacity is plenty for the 8 KiB payload. + SegmentBlockCache cache = new SegmentBlockCache(1L * 1024 * 1024); + storage.setSegmentBlockCache(cache, + org.apache.bookkeeper.stats.NullStatsLogger.INSTANCE); + + // Force a block to land in the cache by running a real read. The + // RemoteRandomAccessReader populates the cache on first access via + // the supplier returned by multipartIndexReaderSupplier. + io.github.jbellis.jvector.disk.ReaderSupplier rs = + storage.multipartIndexReaderSupplier("ts1", "uuid-purge", "graph", + (long) payload.length); + try (io.github.jbellis.jvector.disk.RandomAccessReader r = rs.get()) { + r.seek(0L); + r.readInt(); // pulls block-0 into the cache + } + String logicalPath = "ts1/uuid-purge/multipart/graph"; + // The cache may use a writeBlockSize different from payload.length, + // so we cannot pin down the exact (offset, length) pair without + // duplicating the production block-sizing math. Instead, walk a + // small set of candidate block sizes via the public containsBlock + // helper — at least one of them must hit, otherwise the read + // above did not populate the cache and the test cannot prove the + // invalidation contract. + int[] candidateLengths = new int[]{ + 4 * 1024 * 1024, 1024 * 1024, 64 * 1024, payload.length + }; + boolean wasCached = false; + for (int len : candidateLengths) { + if (cache.containsBlock(logicalPath, 0L, len)) { + wasCached = true; + break; + } + } + assertTrue("precondition: a block must be cached after the read," + + " otherwise the invalidation assertion is vacuous", + wasCached); + + // First delete must succeed AND invalidate the cache. + storage.deleteMultipartIndexFile("ts1", "uuid-purge", "graph"); + assertFalse("object must be gone after delete", + storage.multipartIndexFileExists("ts1", "uuid-purge", "graph")); + for (int len : candidateLengths) { + assertFalse("cached block at length=" + len + + " must be evicted after deleteMultipartIndexFile", + cache.containsBlock(logicalPath, 0L, len)); + } + + // Second delete must also succeed (idempotent) — no exception, no + // surfaced error. The implementation logs at WARNING but does not + // propagate. + storage.deleteMultipartIndexFile("ts1", "uuid-purge", "graph"); + assertFalse("object must remain gone after the second delete call", + storage.multipartIndexFileExists("ts1", "uuid-purge", "graph")); + } + + /** + * (d) Documents the contract gap that {@code --force} covers: when + * block-0 is present but truncated/corrupted, the probe still returns + * {@code true}. The default probe reads the first 4 bytes via the + * multipart reader supplier — that succeeds as long as block-0 has at + * least 4 bytes, even if the rest of the file is missing or corrupted. + * + *

This is the exact reproduction of the issue #617 scenario where an + * S3 multipart upload published block-0 then failed on a later block: + * the operator running {@code delete-segment} without {@code --force} + * sees the safety gate trip ("graph file IS reachable"), reads the + * documentation, and re-runs with {@code --force} once they have + * confirmed via the file-server logs that the upload never completed. + */ + @Test(timeout = 30_000) + public void probeReturnsTrueOnPartialBlockZeroAtLeast4Bytes() throws Exception { + // Write just block-0 directly via the lower-level writeFileBlock RPC + // — that bypasses the multipart writer's "all blocks or none" + // semantics and reproduces the Phase B "upload failed after + // block-0" failure mode the issue #617 operator path covers. + // Must match remoteMultipartPath: tableSpace/uuid/multipart/fileType. + String logicalPath = "ts1/uuid-partial/multipart/graph"; + byte[] partial = new byte[16]; // > 4 bytes; the probe reads 4 bytes + for (int i = 0; i < partial.length; i++) { + partial[i] = (byte) i; + } + client.writeFileBlock(logicalPath, 0L, partial); + + // Sanity: the multipart writer would have written multiple blocks + // for a real graph file. Here only block-0 is present — yet the + // probe must still return true because it only inspects block-0. + assertTrue("probe returns true on a partial block-0 — documents the" + + " contract gap that --force covers (issue #617)", + storage.multipartIndexFileExists("ts1", "uuid-partial", "graph")); + + // Round-tripping the partial bytes through writeMultipartFile is + // not possible (the input stream framing would always produce a + // matching file); this direct block-0 write is the only way to + // produce a "partial" state in the test harness, and it mirrors + // exactly what an S3 backend produces on a partial multipart + // upload failure. + assertNotNull("client must still be wired for follow-up purge calls", client); + } + +}