Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

/// A dataset loader that works with fvec/ivec datasets described by YAML catalog files
Expand Down Expand Up @@ -77,7 +80,13 @@
/// is fetched and its raw contents are cached locally in a hidden snapshot file for offline use.
/// On each run, the effective included entries are rebuilt by applying the local
/// {@code _defaults} to the fetched (or cached) remote entries. Local entries in the same
/// wrapper file are processed afterward and therefore take precedence over included remote entries.
/// wrapper file are processed afterward and may take precedence over included remote entries,
/// **but only when the override does not change the resolved local path** (i.e. {@code cache_dir}
/// and the {@code base}/{@code query}/{@code gt} filenames must match across all catalog files
/// contributing the same entry). Override is intended for things like changing {@code base_url}
/// to point at a private mirror; it is not a mechanism for redirecting a dataset to a different
/// physical file. Any conflict in the resolved local path is rejected at construction time
/// as a critical error — see [#validateCatalogPaths()].
/// This lets a single local file act as a thin configuration wrapper around a remote catalog:
/// ```yaml
/// _defaults:
Expand Down Expand Up @@ -265,10 +274,24 @@ private static class CatalogEntry {
private final DataSetMetadataReader metadata;
private final HttpClient httpClient;

/// Per-(dataset, facet) record of every catalog file that contributed a resolved local
/// path during loading. Populated while catalog files are being loaded, then consumed by
/// [#validateCatalogPaths()] which enforces that no two catalog files disagree on the path
/// for a given (dataset, facet) and that no two distinct (dataset, facet) pairs resolve to
/// the same local path.
private final Map<String, Map<String, List<PathContribution>>> pathContributions = new HashMap<>();

// S3 instances for connection pooling
private S3AsyncClient s3Client;
private S3TransferManager s3TransferManager;

/// Tracks in-flight downloads keyed by their normalized local destination path so that
/// concurrent {@link #loadDataSet} calls referencing the same local file (e.g. two datasets
/// that legitimately share a query facet) coalesce into a single download instead of racing.
/// Entries are removed when the underlying future completes, regardless of outcome, so
/// failures may be retried by a later caller and the map does not grow without bound.
private final ConcurrentHashMap<Path, CompletableFuture<Void>> downloadsInFlight = new ConcurrentHashMap<>();

/// Creates a local-only loader that recursively discovers all {@code .yaml}/{@code .yml}
/// files under the given path.
///
Expand Down Expand Up @@ -374,6 +397,9 @@ public DataSetLoaderSimpleMFD(String catalogUrl, String localPath, boolean check
var remoteCatalogData = fetchRemoteCatalogRaw(catalogUrl);
this.catalog = toCatalogEntries(remoteCatalogData, localCacheDir);
saveCatalogLocally(localCatalog, catalogUrl, remoteCatalogData);
for (var entry : this.catalog.entrySet()) {
recordPathContributions(localCatalog, entry.getKey(), entry.getValue());
}
}
} else {
if (!localEntries.isEmpty()) {
Expand All @@ -383,6 +409,8 @@ public DataSetLoaderSimpleMFD(String catalogUrl, String localPath, boolean check
this.catalog = Map.of();
}
}

validateCatalogPaths();
}

@Override
Expand All @@ -405,11 +433,13 @@ public Optional<DataSetInfo> loadDataSet(String dataSetName) {
String effectiveBaseUrl = entry.baseUrl != null ? entry.baseUrl : remoteBasePath;
Path effectiveCacheDir = entry.cacheDir;

// Execute downloads simultaneously to maximize network bandwidth
// Execute downloads simultaneously to maximize network bandwidth. Each call coalesces
// with any in-flight download for the same target path so concurrent loadDataSet calls
// for datasets that share a file do not race on the destination.
try {
var f1 = CompletableFuture.runAsync(() -> ensureQuietly(baseFile, effectiveCacheDir, effectiveBaseUrl));
var f2 = CompletableFuture.runAsync(() -> ensureQuietly(queryFile, effectiveCacheDir, effectiveBaseUrl));
var f3 = CompletableFuture.runAsync(() -> ensureQuietly(gtFile, effectiveCacheDir, effectiveBaseUrl));
var f1 = ensureFileAvailableAsync(baseFile, effectiveCacheDir, effectiveBaseUrl);
var f2 = ensureFileAvailableAsync(queryFile, effectiveCacheDir, effectiveBaseUrl);
var f3 = ensureFileAvailableAsync(gtFile, effectiveCacheDir, effectiveBaseUrl);

CompletableFuture.allOf(f1, f2, f3).join();
} catch (Exception e) {
Expand Down Expand Up @@ -536,7 +566,9 @@ private void loadCatalogEntries(Path catalogFile, Map<String, CatalogEntry> targ
fields.putAll(e.getValue());
}

putCatalogEntry(target, name, buildCatalogEntry(fields, catalogDir, source));
CatalogEntry built = buildCatalogEntry(fields, catalogDir, source);
putCatalogEntry(target, name, built);
recordPathContributions(catalogFile, name, built);
}
}

Expand Down Expand Up @@ -584,7 +616,9 @@ private void loadRemoteInclude(String includeUrl, Map<String, String> defaults,
fields.put("base_url", remoteBase);
}

putCatalogEntry(target, e.getKey(), buildCatalogEntry(fields, catalogDir, CatalogSource.INCLUDED_REMOTE));
CatalogEntry built = buildCatalogEntry(fields, catalogDir, CatalogSource.INCLUDED_REMOTE);
putCatalogEntry(target, e.getKey(), built);
recordPathContributions(cachedIncludeFile, e.getKey(), built);
}

logger.info("Included {} datasets from {} catalog", entryCount,
Expand Down Expand Up @@ -690,6 +724,142 @@ private static String expandEnvVars(String value) {
return sb.toString();
}

// ========================================================================================
// PATH CONFLICT VALIDATION
// ========================================================================================

/// Facet fields whose values name a data file resolved relative to the entry's cache directory.
private static final List<String> FILE_FACETS = List.of("base", "query", "gt");

/// Records, for a single (dataset, facet), which catalog file contributed the entry and the
/// normalized local path that would be used to open the file (the cache directory resolved
/// against the facet's filename, with redundant {@code .} and {@code ..} segments collapsed).
private static class PathContribution {
final Path sourceCatalog;
final Path normalizedPath;

PathContribution(Path sourceCatalog, Path normalizedPath) {
this.sourceCatalog = sourceCatalog;
this.normalizedPath = normalizedPath;
}
}

/// Records every facet of the given entry as a contribution from {@code sourceCatalog}.
/// The recorded path is {@code cacheDir.resolve(filename).normalize()} — the same form
/// later used when opening the file, viewed with the cache directory as the parent.
/// Contributions are validated after all catalog files have been loaded.
private void recordPathContributions(Path sourceCatalog, String dataset, CatalogEntry entry) {
for (String facet : FILE_FACETS) {
String filename = entry.fields.get(facet);
if (filename == null) continue;
Path normalized = entry.cacheDir.resolve(filename).normalize();
pathContributions
.computeIfAbsent(dataset, k -> new HashMap<>())
.computeIfAbsent(facet, k -> new ArrayList<>())
.add(new PathContribution(sourceCatalog, normalized));
}
}

/// Validates the assembled view of dataset facets across all discovered catalog files.
/// Two conditions are checked with facet-dependent severity:
///
/// 1. **Varying path for the same (dataset, facet)** — always a critical error. Two catalog
/// files that both contribute the same (dataset, facet) must agree on the resolved local
/// path; otherwise file selection is non-deterministic across walks.
/// 2. **Same path reused across distinct (dataset, facet) pairs** — severity depends on
/// which facets share the path:
/// - `query`-only sharing is permitted with a warning (e.g. reusing one query set
/// across datasets is a legitimate workflow).
/// - Any sharing that involves `gt` (ground truth) or `base` is a critical error.
/// Ground truth is dataset-specific by definition. Base files would only legitimately
/// be shared via a row-range specifier that distinguishes which slice each dataset
/// uses; this loader does not yet support range specifiers, so a shared base path
/// between distinct datasets is always an error.
private void validateCatalogPaths() {
// 1. detect varying paths for the same (dataset, facet) — critical
for (var datasetEntry : pathContributions.entrySet()) {
String dataset = datasetEntry.getKey();
for (var facetEntry : datasetEntry.getValue().entrySet()) {
String facet = facetEntry.getKey();
var contribs = facetEntry.getValue();
var distinct = new java.util.HashSet<Path>();
for (var c : contribs) {
distinct.add(c.normalizedPath);
}
if (distinct.size() > 1) {
StringBuilder sb = new StringBuilder();
sb.append("Conflicting local paths for dataset '").append(dataset)
.append("' facet '").append(facet)
.append("'. All catalog files that name this facet must resolve it to the same path:");
for (var c : contribs) {
sb.append("\n ").append(redact(c.sourceCatalog))
.append(" -> ").append(redact(c.normalizedPath));
}
throw new IllegalStateException(sb.toString());
}
}
}

// 2. detect the same path being used by more than one (dataset, facet) pair.
// Build reverse index path -> list of owners so each shared path is reported once
// with every owner. Severity is determined by the facets involved: query-only
// collisions warn; any collision involving base or gt is a hard error.
Map<Path, List<OwnerRef>> ownersByPath = new HashMap<>();
for (var datasetEntry : pathContributions.entrySet()) {
String dataset = datasetEntry.getKey();
for (var facetEntry : datasetEntry.getValue().entrySet()) {
String facet = facetEntry.getKey();
// by check 1 above, all contributions share the same path; use the first
var first = facetEntry.getValue().get(0);
ownersByPath
.computeIfAbsent(first.normalizedPath, k -> new ArrayList<>())
.add(new OwnerRef(dataset, facet, first.sourceCatalog));
}
}

List<String> hardErrors = new ArrayList<>();
for (var e : ownersByPath.entrySet()) {
List<OwnerRef> owners = e.getValue();
if (owners.size() <= 1) continue;

boolean involvesBaseOrGt = owners.stream()
.anyMatch(o -> "base".equals(o.facet) || "gt".equals(o.facet));

StringBuilder sb = new StringBuilder();
sb.append("Local path ").append(redact(e.getKey()))
.append(" is referenced by multiple dataset facets:");
for (OwnerRef o : owners) {
sb.append("\n ").append(o.dataset).append(":").append(o.facet)
.append(" (from ").append(redact(o.sourceCatalog)).append(")");
}

if (involvesBaseOrGt) {
hardErrors.add(sb.toString());
} else {
logger.warn(sb.toString());
}
}

if (!hardErrors.isEmpty()) {
throw new IllegalStateException(
"Local path conflicts detected (sharing is only permitted between 'query' facets):\n"
+ String.join("\n", hardErrors));
}
}

/// One owner of a shared local path, used to format collision messages.
private static class OwnerRef {
final String dataset;
final String facet;
final Path sourceCatalog;

OwnerRef(String dataset, String facet, Path sourceCatalog) {
this.dataset = dataset;
this.facet = facet;
this.sourceCatalog = sourceCatalog;
}
}

// ========================================================================================
// FILE AVAILABILITY
// ========================================================================================
Expand All @@ -702,6 +872,29 @@ private void ensureQuietly(String filename, Path cacheDir, String baseUrl) {
}
}

/// Returns a future that completes when {@code filename} is available locally under
/// {@code cacheDir}, coalescing duplicate concurrent requests for the same target path.
/// If another caller is already downloading the same file, this returns that caller's
/// future instead of starting a second download — preventing wasted bandwidth and, more
/// importantly, avoiding concurrent writes to the same destination path (which the S3
/// transfer manager and the HTTP retry-and-delete logic do not handle safely).
/// The map entry is cleared on completion (success or failure) so a failed download can
/// be retried by a subsequent call and so the map does not retain references indefinitely.
private CompletableFuture<Void> ensureFileAvailableAsync(String filename, Path cacheDir, String baseUrl) {
Path key = cacheDir.resolve(filename).normalize();
// The future stored in the map AND returned to callers is the whenCompleteAsync stage so
// that waiters who join it are guaranteed to observe the post-completion cleanup.
// whenCompleteAsync (rather than whenComplete) is required: when ensureQuietly is a
// no-op because the file already exists, the runAsync future can complete before
// .whenComplete is attached, in which case a plain whenComplete would fire the cleanup
// synchronously inside the computeIfAbsent remapping function and ConcurrentHashMap
// would throw "Recursive update". The async variant always defers the cleanup to an
// executor thread, which is then free to mutate the map.
return downloadsInFlight.computeIfAbsent(key, p ->
CompletableFuture.runAsync(() -> ensureQuietly(filename, cacheDir, baseUrl))
.whenCompleteAsync((r, ex) -> downloadsInFlight.remove(p)));
}

/// Ensures a dataset file is available locally. Checks in the entry's cache directory first.
/// If not found and a remote base URL is available (either per-entry or loader-level),
/// downloads the file.
Expand Down
Loading
Loading