diff --git a/jvector-examples/src/main/java/io/github/jbellis/jvector/example/benchmarks/datasets/DataSetLoaderSimpleMFD.java b/jvector-examples/src/main/java/io/github/jbellis/jvector/example/benchmarks/datasets/DataSetLoaderSimpleMFD.java index 5582e27e8..0ab427b76 100644 --- a/jvector-examples/src/main/java/io/github/jbellis/jvector/example/benchmarks/datasets/DataSetLoaderSimpleMFD.java +++ b/jvector-examples/src/main/java/io/github/jbellis/jvector/example/benchmarks/datasets/DataSetLoaderSimpleMFD.java @@ -42,11 +42,14 @@ import java.nio.file.StandardCopyOption; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Stream; /// A dataset loader that works with fvec/ivec datasets described by YAML catalog files @@ -77,7 +80,13 @@ /// is fetched and its raw contents are cached locally in a hidden snapshot file for offline use. /// On each run, the effective included entries are rebuilt by applying the local /// {@code _defaults} to the fetched (or cached) remote entries. Local entries in the same -/// wrapper file are processed afterward and therefore take precedence over included remote entries. +/// wrapper file are processed afterward and may take precedence over included remote entries, +/// **but only when the override does not change the resolved local path** (i.e. {@code cache_dir} +/// and the {@code base}/{@code query}/{@code gt} filenames must match across all catalog files +/// contributing the same entry). Override is intended for things like changing {@code base_url} +/// to point at a private mirror; it is not a mechanism for redirecting a dataset to a different +/// physical file. Any conflict in the resolved local path is rejected at construction time +/// as a critical error — see [#validateCatalogPaths()]. /// This lets a single local file act as a thin configuration wrapper around a remote catalog: /// ```yaml /// _defaults: @@ -265,10 +274,24 @@ private static class CatalogEntry { private final DataSetMetadataReader metadata; private final HttpClient httpClient; + /// Per-(dataset, facet) record of every catalog file that contributed a resolved local + /// path during loading. Populated while catalog files are being loaded, then consumed by + /// [#validateCatalogPaths()] which enforces that no two catalog files disagree on the path + /// for a given (dataset, facet) and that no two distinct (dataset, facet) pairs resolve to + /// the same local path. + private final Map>> pathContributions = new HashMap<>(); + // S3 instances for connection pooling private S3AsyncClient s3Client; private S3TransferManager s3TransferManager; + /// Tracks in-flight downloads keyed by their normalized local destination path so that + /// concurrent {@link #loadDataSet} calls referencing the same local file (e.g. two datasets + /// that legitimately share a query facet) coalesce into a single download instead of racing. + /// Entries are removed when the underlying future completes, regardless of outcome, so + /// failures may be retried by a later caller and the map does not grow without bound. + private final ConcurrentHashMap> downloadsInFlight = new ConcurrentHashMap<>(); + /// Creates a local-only loader that recursively discovers all {@code .yaml}/{@code .yml} /// files under the given path. /// @@ -374,6 +397,9 @@ public DataSetLoaderSimpleMFD(String catalogUrl, String localPath, boolean check var remoteCatalogData = fetchRemoteCatalogRaw(catalogUrl); this.catalog = toCatalogEntries(remoteCatalogData, localCacheDir); saveCatalogLocally(localCatalog, catalogUrl, remoteCatalogData); + for (var entry : this.catalog.entrySet()) { + recordPathContributions(localCatalog, entry.getKey(), entry.getValue()); + } } } else { if (!localEntries.isEmpty()) { @@ -383,6 +409,8 @@ public DataSetLoaderSimpleMFD(String catalogUrl, String localPath, boolean check this.catalog = Map.of(); } } + + validateCatalogPaths(); } @Override @@ -405,11 +433,13 @@ public Optional loadDataSet(String dataSetName) { String effectiveBaseUrl = entry.baseUrl != null ? entry.baseUrl : remoteBasePath; Path effectiveCacheDir = entry.cacheDir; - // Execute downloads simultaneously to maximize network bandwidth + // Execute downloads simultaneously to maximize network bandwidth. Each call coalesces + // with any in-flight download for the same target path so concurrent loadDataSet calls + // for datasets that share a file do not race on the destination. try { - var f1 = CompletableFuture.runAsync(() -> ensureQuietly(baseFile, effectiveCacheDir, effectiveBaseUrl)); - var f2 = CompletableFuture.runAsync(() -> ensureQuietly(queryFile, effectiveCacheDir, effectiveBaseUrl)); - var f3 = CompletableFuture.runAsync(() -> ensureQuietly(gtFile, effectiveCacheDir, effectiveBaseUrl)); + var f1 = ensureFileAvailableAsync(baseFile, effectiveCacheDir, effectiveBaseUrl); + var f2 = ensureFileAvailableAsync(queryFile, effectiveCacheDir, effectiveBaseUrl); + var f3 = ensureFileAvailableAsync(gtFile, effectiveCacheDir, effectiveBaseUrl); CompletableFuture.allOf(f1, f2, f3).join(); } catch (Exception e) { @@ -536,7 +566,9 @@ private void loadCatalogEntries(Path catalogFile, Map targ fields.putAll(e.getValue()); } - putCatalogEntry(target, name, buildCatalogEntry(fields, catalogDir, source)); + CatalogEntry built = buildCatalogEntry(fields, catalogDir, source); + putCatalogEntry(target, name, built); + recordPathContributions(catalogFile, name, built); } } @@ -584,7 +616,9 @@ private void loadRemoteInclude(String includeUrl, Map defaults, fields.put("base_url", remoteBase); } - putCatalogEntry(target, e.getKey(), buildCatalogEntry(fields, catalogDir, CatalogSource.INCLUDED_REMOTE)); + CatalogEntry built = buildCatalogEntry(fields, catalogDir, CatalogSource.INCLUDED_REMOTE); + putCatalogEntry(target, e.getKey(), built); + recordPathContributions(cachedIncludeFile, e.getKey(), built); } logger.info("Included {} datasets from {} catalog", entryCount, @@ -690,6 +724,142 @@ private static String expandEnvVars(String value) { return sb.toString(); } + // ======================================================================================== + // PATH CONFLICT VALIDATION + // ======================================================================================== + + /// Facet fields whose values name a data file resolved relative to the entry's cache directory. + private static final List FILE_FACETS = List.of("base", "query", "gt"); + + /// Records, for a single (dataset, facet), which catalog file contributed the entry and the + /// normalized local path that would be used to open the file (the cache directory resolved + /// against the facet's filename, with redundant {@code .} and {@code ..} segments collapsed). + private static class PathContribution { + final Path sourceCatalog; + final Path normalizedPath; + + PathContribution(Path sourceCatalog, Path normalizedPath) { + this.sourceCatalog = sourceCatalog; + this.normalizedPath = normalizedPath; + } + } + + /// Records every facet of the given entry as a contribution from {@code sourceCatalog}. + /// The recorded path is {@code cacheDir.resolve(filename).normalize()} — the same form + /// later used when opening the file, viewed with the cache directory as the parent. + /// Contributions are validated after all catalog files have been loaded. + private void recordPathContributions(Path sourceCatalog, String dataset, CatalogEntry entry) { + for (String facet : FILE_FACETS) { + String filename = entry.fields.get(facet); + if (filename == null) continue; + Path normalized = entry.cacheDir.resolve(filename).normalize(); + pathContributions + .computeIfAbsent(dataset, k -> new HashMap<>()) + .computeIfAbsent(facet, k -> new ArrayList<>()) + .add(new PathContribution(sourceCatalog, normalized)); + } + } + + /// Validates the assembled view of dataset facets across all discovered catalog files. + /// Two conditions are checked with facet-dependent severity: + /// + /// 1. **Varying path for the same (dataset, facet)** — always a critical error. Two catalog + /// files that both contribute the same (dataset, facet) must agree on the resolved local + /// path; otherwise file selection is non-deterministic across walks. + /// 2. **Same path reused across distinct (dataset, facet) pairs** — severity depends on + /// which facets share the path: + /// - `query`-only sharing is permitted with a warning (e.g. reusing one query set + /// across datasets is a legitimate workflow). + /// - Any sharing that involves `gt` (ground truth) or `base` is a critical error. + /// Ground truth is dataset-specific by definition. Base files would only legitimately + /// be shared via a row-range specifier that distinguishes which slice each dataset + /// uses; this loader does not yet support range specifiers, so a shared base path + /// between distinct datasets is always an error. + private void validateCatalogPaths() { + // 1. detect varying paths for the same (dataset, facet) — critical + for (var datasetEntry : pathContributions.entrySet()) { + String dataset = datasetEntry.getKey(); + for (var facetEntry : datasetEntry.getValue().entrySet()) { + String facet = facetEntry.getKey(); + var contribs = facetEntry.getValue(); + var distinct = new java.util.HashSet(); + for (var c : contribs) { + distinct.add(c.normalizedPath); + } + if (distinct.size() > 1) { + StringBuilder sb = new StringBuilder(); + sb.append("Conflicting local paths for dataset '").append(dataset) + .append("' facet '").append(facet) + .append("'. All catalog files that name this facet must resolve it to the same path:"); + for (var c : contribs) { + sb.append("\n ").append(redact(c.sourceCatalog)) + .append(" -> ").append(redact(c.normalizedPath)); + } + throw new IllegalStateException(sb.toString()); + } + } + } + + // 2. detect the same path being used by more than one (dataset, facet) pair. + // Build reverse index path -> list of owners so each shared path is reported once + // with every owner. Severity is determined by the facets involved: query-only + // collisions warn; any collision involving base or gt is a hard error. + Map> ownersByPath = new HashMap<>(); + for (var datasetEntry : pathContributions.entrySet()) { + String dataset = datasetEntry.getKey(); + for (var facetEntry : datasetEntry.getValue().entrySet()) { + String facet = facetEntry.getKey(); + // by check 1 above, all contributions share the same path; use the first + var first = facetEntry.getValue().get(0); + ownersByPath + .computeIfAbsent(first.normalizedPath, k -> new ArrayList<>()) + .add(new OwnerRef(dataset, facet, first.sourceCatalog)); + } + } + + List hardErrors = new ArrayList<>(); + for (var e : ownersByPath.entrySet()) { + List owners = e.getValue(); + if (owners.size() <= 1) continue; + + boolean involvesBaseOrGt = owners.stream() + .anyMatch(o -> "base".equals(o.facet) || "gt".equals(o.facet)); + + StringBuilder sb = new StringBuilder(); + sb.append("Local path ").append(redact(e.getKey())) + .append(" is referenced by multiple dataset facets:"); + for (OwnerRef o : owners) { + sb.append("\n ").append(o.dataset).append(":").append(o.facet) + .append(" (from ").append(redact(o.sourceCatalog)).append(")"); + } + + if (involvesBaseOrGt) { + hardErrors.add(sb.toString()); + } else { + logger.warn(sb.toString()); + } + } + + if (!hardErrors.isEmpty()) { + throw new IllegalStateException( + "Local path conflicts detected (sharing is only permitted between 'query' facets):\n" + + String.join("\n", hardErrors)); + } + } + + /// One owner of a shared local path, used to format collision messages. + private static class OwnerRef { + final String dataset; + final String facet; + final Path sourceCatalog; + + OwnerRef(String dataset, String facet, Path sourceCatalog) { + this.dataset = dataset; + this.facet = facet; + this.sourceCatalog = sourceCatalog; + } + } + // ======================================================================================== // FILE AVAILABILITY // ======================================================================================== @@ -702,6 +872,29 @@ private void ensureQuietly(String filename, Path cacheDir, String baseUrl) { } } + /// Returns a future that completes when {@code filename} is available locally under + /// {@code cacheDir}, coalescing duplicate concurrent requests for the same target path. + /// If another caller is already downloading the same file, this returns that caller's + /// future instead of starting a second download — preventing wasted bandwidth and, more + /// importantly, avoiding concurrent writes to the same destination path (which the S3 + /// transfer manager and the HTTP retry-and-delete logic do not handle safely). + /// The map entry is cleared on completion (success or failure) so a failed download can + /// be retried by a subsequent call and so the map does not retain references indefinitely. + private CompletableFuture ensureFileAvailableAsync(String filename, Path cacheDir, String baseUrl) { + Path key = cacheDir.resolve(filename).normalize(); + // The future stored in the map AND returned to callers is the whenCompleteAsync stage so + // that waiters who join it are guaranteed to observe the post-completion cleanup. + // whenCompleteAsync (rather than whenComplete) is required: when ensureQuietly is a + // no-op because the file already exists, the runAsync future can complete before + // .whenComplete is attached, in which case a plain whenComplete would fire the cleanup + // synchronously inside the computeIfAbsent remapping function and ConcurrentHashMap + // would throw "Recursive update". The async variant always defers the cleanup to an + // executor thread, which is then free to mutate the map. + return downloadsInFlight.computeIfAbsent(key, p -> + CompletableFuture.runAsync(() -> ensureQuietly(filename, cacheDir, baseUrl)) + .whenCompleteAsync((r, ex) -> downloadsInFlight.remove(p))); + } + /// Ensures a dataset file is available locally. Checks in the entry's cache directory first. /// If not found and a remote base URL is available (either per-entry or loader-level), /// downloads the file. diff --git a/jvector-examples/src/test/java/io/github/jbellis/jvector/example/benchmarks/datasets/DataSetLoaderSimpleMFDTest.java b/jvector-examples/src/test/java/io/github/jbellis/jvector/example/benchmarks/datasets/DataSetLoaderSimpleMFDTest.java index 33379dd57..4a2beccd6 100644 --- a/jvector-examples/src/test/java/io/github/jbellis/jvector/example/benchmarks/datasets/DataSetLoaderSimpleMFDTest.java +++ b/jvector-examples/src/test/java/io/github/jbellis/jvector/example/benchmarks/datasets/DataSetLoaderSimpleMFDTest.java @@ -17,6 +17,8 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpServer; +import io.github.jbellis.jvector.example.benchmarks.datasets.DataSetLoaderSimpleMFD; +import io.github.jbellis.jvector.example.benchmarks.datasets.DataSetMetadataReader; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -24,11 +26,19 @@ import java.io.IOException; import java.io.OutputStream; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.file.Files; import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.*; @@ -441,12 +451,14 @@ public void subdirectoryDataFilesResolveRelativeToTheirCatalog() throws IOExcept } @Test - public void duplicateEntryAcrossCatalogsDoesNotFail() throws IOException { - // root catalog defines test-ds + public void duplicateEntryAcrossCatalogsWithConflictingPathsFailsAtConstruction() throws IOException { + // root catalog defines test-ds with files cached under cacheDir writeTestCatalog(cacheDir); - writeTestDataFiles(cacheDir); - // subdirectory also defines test-ds — one wins (walk order is unspecified) + // subdirectory also defines test-ds but its files resolve to a different physical + // location (cache_dir defaults to the catalog file's own directory). Walk order + // would silently determine which physical file backs the dataset, so the loader + // must reject this rather than picking one non-deterministically. Path subDir = cacheDir.resolve("override"); Files.createDirectories(subDir); Files.writeString(subDir.resolve("catalog_entries.yaml"), @@ -454,18 +466,165 @@ public void duplicateEntryAcrossCatalogsDoesNotFail() throws IOException { " base: test_base.fvecs\n" + " query: test_query.fvecs\n" + " gt: test_gt.ivecs\n"); - writeTestDataFiles(subDir); + + var failure = assertThrows(IllegalStateException.class, () -> new DataSetLoaderSimpleMFD( + null, cacheDir.toString(), false, testMetadata + )); + assertTrue(failure.getMessage().contains("test-ds"), + "error should name the conflicting dataset: " + failure.getMessage()); + } + + @Test + public void duplicateEntryAcrossCatalogsWithIdenticalPathsSucceeds() throws IOException { + // two catalog files declaring the same dataset are fine as long as every (dataset, facet) + // resolves to the same local path — there is nothing for walk order to disagree about. + Path sharedDataDir = tempFolder.newFolder("shared-data").toPath(); + Files.writeString(cacheDir.resolve("catalog-a.yaml"), + "test-ds:\n" + + " cache_dir: " + sharedDataDir + "\n" + + " base: test_base.fvecs\n" + + " query: test_query.fvecs\n" + + " gt: test_gt.ivecs\n"); + Files.writeString(cacheDir.resolve("catalog-b.yaml"), + "test-ds:\n" + + " cache_dir: " + sharedDataDir + "\n" + + " base: test_base.fvecs\n" + + " query: test_query.fvecs\n" + + " gt: test_gt.ivecs\n"); + writeTestDataFiles(sharedDataDir); var loader = new DataSetLoaderSimpleMFD( null, cacheDir.toString(), false, testMetadata ); - - // should load without error — whichever catalog wins, the dataset is valid var ds = loader.loadDataSet("test-ds").orElseThrow().getDataSet(); - assertNotNull(ds); assertEquals(5, ds.getBaseVectors().size()); } + @Test + public void sharedQueryFileAcrossDatasetsIsPermitted() throws IOException { + // sharing a query file across datasets is a legitimate workflow (e.g. comparing several + // base/gt pairs against the same query set) and should not fail construction. + Files.writeString(cacheDir.resolve("catalog_entries.yaml"), + "test-ds:\n" + + " base: test_base.fvecs\n" + + " query: shared_query.fvecs\n" + + " gt: test_gt.ivecs\n" + + "sub-ds:\n" + + " base: sub_base.fvecs\n" + + " query: shared_query.fvecs\n" + + " gt: sub_gt.ivecs\n"); + writeTestFvecs(cacheDir.resolve("test_base.fvecs"), 4, new float[][] { + {1.0f, 0.0f, 0.0f, 0.0f}, + {0.0f, 1.0f, 0.0f, 0.0f}, + }); + writeTestFvecs(cacheDir.resolve("sub_base.fvecs"), 4, new float[][] { + {0.0f, 0.0f, 1.0f, 0.0f}, + {0.0f, 0.0f, 0.0f, 1.0f}, + }); + writeTestFvecs(cacheDir.resolve("shared_query.fvecs"), 4, new float[][] { + {1.0f, 0.0f, 0.0f, 0.0f}, + }); + writeTestIvecs(cacheDir.resolve("test_gt.ivecs"), new int[][] {{0}}); + writeTestIvecs(cacheDir.resolve("sub_gt.ivecs"), new int[][] {{1}}); + + var loader = new DataSetLoaderSimpleMFD( + null, cacheDir.toString(), false, testMetadata + ); + assertEquals(2, loader.loadDataSet("test-ds").orElseThrow().getDataSet().getBaseVectors().size()); + assertEquals(2, loader.loadDataSet("sub-ds").orElseThrow().getDataSet().getBaseVectors().size()); + } + + @Test + public void sharedGroundTruthFileAcrossDatasetsIsRejected() throws IOException { + // ground truth is dataset-specific by definition; sharing one across datasets is wrong. + Files.writeString(cacheDir.resolve("catalog_entries.yaml"), + "test-ds:\n" + + " base: test_base.fvecs\n" + + " query: test_query.fvecs\n" + + " gt: shared_gt.ivecs\n" + + "sub-ds:\n" + + " base: sub_base.fvecs\n" + + " query: sub_query.fvecs\n" + + " gt: shared_gt.ivecs\n"); + + var failure = assertThrows(IllegalStateException.class, () -> new DataSetLoaderSimpleMFD( + null, cacheDir.toString(), false, testMetadata + )); + assertTrue(failure.getMessage().contains("shared_gt.ivecs"), + "error should name the shared gt path: " + failure.getMessage()); + assertTrue(failure.getMessage().contains("test-ds:gt") && failure.getMessage().contains("sub-ds:gt"), + "error should name both colliding owners: " + failure.getMessage()); + } + + @Test + public void sharedBaseFileAcrossDatasetsIsRejected() throws IOException { + // without a row-range specifier (which this loader does not yet support), two datasets + // cannot legitimately share a base file. + Files.writeString(cacheDir.resolve("catalog_entries.yaml"), + "test-ds:\n" + + " base: shared_base.fvecs\n" + + " query: test_query.fvecs\n" + + " gt: test_gt.ivecs\n" + + "sub-ds:\n" + + " base: shared_base.fvecs\n" + + " query: sub_query.fvecs\n" + + " gt: sub_gt.ivecs\n"); + + var failure = assertThrows(IllegalStateException.class, () -> new DataSetLoaderSimpleMFD( + null, cacheDir.toString(), false, testMetadata + )); + assertTrue(failure.getMessage().contains("shared_base.fvecs"), + "error should name the shared base path: " + failure.getMessage()); + assertTrue(failure.getMessage().contains("test-ds:base") && failure.getMessage().contains("sub-ds:base"), + "error should name both colliding owners: " + failure.getMessage()); + } + + @Test + public void sharedPathAcrossDifferentFacetsIsRejected() throws IOException { + // crossing facets — one dataset's base happens to point at another's query path — + // is rejected as soon as any owner is base or gt. + Files.writeString(cacheDir.resolve("catalog_entries.yaml"), + "test-ds:\n" + + " base: crossover.fvecs\n" + + " query: test_query.fvecs\n" + + " gt: test_gt.ivecs\n" + + "sub-ds:\n" + + " base: sub_base.fvecs\n" + + " query: crossover.fvecs\n" + + " gt: sub_gt.ivecs\n"); + + var failure = assertThrows(IllegalStateException.class, () -> new DataSetLoaderSimpleMFD( + null, cacheDir.toString(), false, testMetadata + )); + assertTrue(failure.getMessage().contains("crossover.fvecs"), + "error should name the crossover path: " + failure.getMessage()); + assertTrue(failure.getMessage().contains("test-ds:base") && failure.getMessage().contains("sub-ds:query"), + "error should name both colliding owners: " + failure.getMessage()); + } + + @Test + public void multipleSharedPathErrorsAreAggregated() throws IOException { + // a single exception should describe every hard collision so the user can fix them all + // at once rather than discovering them one round-trip at a time. + Files.writeString(cacheDir.resolve("catalog_entries.yaml"), + "test-ds:\n" + + " base: shared_base.fvecs\n" + + " query: test_query.fvecs\n" + + " gt: shared_gt.ivecs\n" + + "sub-ds:\n" + + " base: shared_base.fvecs\n" + + " query: sub_query.fvecs\n" + + " gt: shared_gt.ivecs\n"); + + var failure = assertThrows(IllegalStateException.class, () -> new DataSetLoaderSimpleMFD( + null, cacheDir.toString(), false, testMetadata + )); + assertTrue(failure.getMessage().contains("shared_base.fvecs"), + "error should describe the base collision: " + failure.getMessage()); + assertTrue(failure.getMessage().contains("shared_gt.ivecs"), + "error should describe the gt collision: " + failure.getMessage()); + } + @Test public void discoversAlternativeCatalogFilenames() throws IOException { // entries.yaml in root @@ -1320,8 +1479,10 @@ public void includeOnlyCatalogLoadsOfflineFromCachedRemoteCatalog() throws IOExc } @Test - public void localCatalogOverridesCachedIncludedRemoteCatalogOffline() throws IOException { - // local dataset should win over a cached included remote dataset of the same name + public void localCatalogOverridingIncludedRemoteCachePathFailsAtConstruction() throws IOException { + // A local catalog that overrides cache_dir for a dataset also present in an _include'd + // remote catalog produces two distinct physical paths for the same (dataset, facet), + // which the loader must reject as a critical configuration error. Path remoteDir = tempFolder.newFolder("remote-catalog").toPath(); Path cachedRemoteDir = tempFolder.newFolder("cached-public-data").toPath(); Path localOverrideDir = tempFolder.newFolder("local-override").toPath(); @@ -1349,28 +1510,14 @@ public void localCatalogOverridesCachedIncludedRemoteCatalogOffline() throws IOE "_defaults:\n" + " cache_dir: " + cachedRemoteDir + "\n"); - // online construction fetches and caches the included remote catalog, - // but the local override should still win - var onlineLoader = new DataSetLoaderSimpleMFD( + var failure = assertThrows(IllegalStateException.class, () -> new DataSetLoaderSimpleMFD( null, cacheDir.toString(), false, testMetadata - ); - - var onlineDs = onlineLoader.loadDataSet("test-ds").orElseThrow().getDataSet(); - assertEquals(1, onlineDs.getBaseVectors().size()); + )); + assertTrue(failure.getMessage().contains("test-ds"), + "error should name the conflicting dataset: " + failure.getMessage()); } finally { server.stop(0); } - - // offline, the cached remote catalog should still not override the real local dataset - var offlineLoader = new DataSetLoaderSimpleMFD( - null, cacheDir.toString(), false, testMetadata - ); - - var offlineDs = offlineLoader.loadDataSet("test-ds").orElseThrow().getDataSet(); - assertEquals(1, offlineDs.getBaseVectors().size()); - assertEquals(1, offlineDs.getQueryVectors().size()); - assertEquals(1, offlineDs.getGroundTruth().size()); - assertEquals(4, offlineDs.getDimension()); } @Test @@ -1414,6 +1561,162 @@ public void cachedIncludedRemoteCatalogStillFailsOfflineWhenDataFilesAreMissing( "Cached remote catalog should still fail when the chosen data files are missing offline"); } + // ======================================================================== + // Concurrent download coalescing + // ======================================================================== + + @Test + public void concurrentLoadDataSetForSharedQueryDownloadsOnlyOnce() throws Exception { + // Two datasets share a query file. Concurrent loadDataSet calls for both must coalesce + // into a single HTTP GET for the shared file rather than racing two parallel downloads + // against the same destination path. + Path remoteDir = tempFolder.newFolder("remote").toPath(); + writeTestFvecs(remoteDir.resolve("test_base.fvecs"), 4, new float[][] {{1f, 0f, 0f, 0f}, {0f, 1f, 0f, 0f}}); + writeTestFvecs(remoteDir.resolve("sub_base.fvecs"), 4, new float[][] {{0f, 0f, 1f, 0f}, {0f, 0f, 0f, 1f}}); + writeTestFvecs(remoteDir.resolve("shared_query.fvecs"), 4, new float[][] {{1f, 0f, 0f, 0f}}); + writeTestIvecs(remoteDir.resolve("test_gt.ivecs"), new int[][] {{0}}); + writeTestIvecs(remoteDir.resolve("sub_gt.ivecs"), new int[][] {{1}}); + + AtomicInteger sharedDownloads = new AtomicInteger(); + CountDownLatch unblockSharedQuery = new CountDownLatch(1); + + HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0); + server.createContext("/", exchange -> { + if (exchange.getRequestURI().getPath().endsWith("shared_query.fvecs")) { + sharedDownloads.incrementAndGet(); + try { + unblockSharedQuery.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + serveStaticFile(exchange, remoteDir); + }); + server.start(); + try { + Files.writeString(cacheDir.resolve("catalog_entries.yaml"), + "_defaults:\n" + + " base_url: http://127.0.0.1:" + server.getAddress().getPort() + "/\n" + + "test-ds:\n" + + " base: test_base.fvecs\n" + + " query: shared_query.fvecs\n" + + " gt: test_gt.ivecs\n" + + "sub-ds:\n" + + " base: sub_base.fvecs\n" + + " query: shared_query.fvecs\n" + + " gt: sub_gt.ivecs\n"); + + var loader = new DataSetLoaderSimpleMFD(null, cacheDir.toString(), false, testMetadata); + + ExecutorService es = Executors.newFixedThreadPool(2); + try { + CompletableFuture f1 = CompletableFuture.runAsync( + () -> loader.loadDataSet("test-ds").orElseThrow().getDataSet(), es); + CompletableFuture f2 = CompletableFuture.runAsync( + () -> loader.loadDataSet("sub-ds").orElseThrow().getDataSet(), es); + + // wait for the first download to reach the server (proves it actually started) + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (sharedDownloads.get() < 1 && System.nanoTime() < deadline) { + Thread.sleep(10); + } + assertTrue(sharedDownloads.get() >= 1, + "first thread should have entered the server handler for shared_query.fvecs"); + + // brief pause so the second thread has time to reach its computeIfAbsent + // for the shared query path while the first thread is still blocked downloading + Thread.sleep(100); + unblockSharedQuery.countDown(); + + f1.join(); + f2.join(); + } finally { + es.shutdown(); + } + + assertEquals(1, sharedDownloads.get(), + "shared query file should have been downloaded exactly once via coalescing"); + } finally { + server.stop(0); + } + } + + @Test + public void failedDownloadIsRetriable() throws Exception { + // A download failure must remove the entry from the in-flight map so a subsequent + // loadDataSet call can retry. If the failed future were retained, the retry would + // join the cached failed future and observe the same exception forever. + Path remoteDir = tempFolder.newFolder("remote").toPath(); + writeTestDataFiles(remoteDir); + + AtomicInteger requestCount = new AtomicInteger(); + HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0); + server.createContext("/", exchange -> { + // the first request to any file gets a 500; all subsequent requests succeed + if (requestCount.incrementAndGet() == 1) { + exchange.sendResponseHeaders(500, -1); + exchange.close(); + return; + } + serveStaticFile(exchange, remoteDir); + }); + server.start(); + try { + Files.writeString(cacheDir.resolve("catalog_entries.yaml"), + "_defaults:\n" + + " base_url: http://127.0.0.1:" + server.getAddress().getPort() + "/\n" + + "test-ds:\n" + + " base: test_base.fvecs\n" + + " query: test_query.fvecs\n" + + " gt: test_gt.ivecs\n"); + + var loader = new DataSetLoaderSimpleMFD(null, cacheDir.toString(), false, testMetadata); + + assertThrows(RuntimeException.class, + () -> loader.loadDataSet("test-ds").orElseThrow().getDataSet(), + "first load should fail because one of the parallel downloads got a 500"); + + // retry: the failed in-flight entry should have been removed, so the missing file + // can be downloaded on this call (the other facets are already cached locally) + var ds = loader.loadDataSet("test-ds").orElseThrow().getDataSet(); + assertEquals(5, ds.getBaseVectors().size()); + } finally { + server.stop(0); + } + } + + @Test + public void successfulDownloadRemovesEntryFromInFlightMap() throws Exception { + // Memory hygiene: after a successful download, the in-flight map must not retain the + // completed future. The check uses reflection to avoid widening the field's visibility + // purely for test purposes. + Path remoteDir = tempFolder.newFolder("remote").toPath(); + writeTestDataFiles(remoteDir); + + HttpServer server = startFileServer(remoteDir); + try { + Files.writeString(cacheDir.resolve("catalog_entries.yaml"), + "_defaults:\n" + + " base_url: http://127.0.0.1:" + server.getAddress().getPort() + "/\n" + + "test-ds:\n" + + " base: test_base.fvecs\n" + + " query: test_query.fvecs\n" + + " gt: test_gt.ivecs\n"); + + var loader = new DataSetLoaderSimpleMFD(null, cacheDir.toString(), false, testMetadata); + var ds = loader.loadDataSet("test-ds").orElseThrow().getDataSet(); + assertEquals(5, ds.getBaseVectors().size()); + + Field field = DataSetLoaderSimpleMFD.class.getDeclaredField("downloadsInFlight"); + field.setAccessible(true); + ConcurrentHashMap map = (ConcurrentHashMap) field.get(loader); + assertEquals(0, map.size(), + "in-flight map should be empty after all downloads complete"); + } finally { + server.stop(0); + } + } + // ======================================================================== // Helpers // ========================================================================