diff --git a/examples/example_1.livemd b/examples/example_1.livemd index f920745..8d5b6bb 100644 --- a/examples/example_1.livemd +++ b/examples/example_1.livemd @@ -32,7 +32,11 @@ ElixirDatasets ## Load dataset -### Load dataset from Huggingface +This section demonstrates all the ways to load datasets using `ElixirDatasets.load_dataset/2`. + +### Basic Loading + +#### Load dataset from Huggingface ```elixir ElixirDatasets.load_dataset({:hf, "fka/awesome-chatgpt-prompts"}) @@ -60,7 +64,7 @@ ElixirDatasets.load_dataset({:hf, "fka/awesome-chatgpt-prompts"}) ]} ``` -### Load dataset from Huggingface from given subdir +#### Load dataset from Huggingface from given subdir ```elixir ElixirDatasets.load_dataset( @@ -109,7 +113,7 @@ ElixirDatasets.load_dataset( ```elixir ElixirDatasets.load_dataset!( - {:hf, "cornell-movie-review-data/rotten_tomatoes"}, + {:hf, "cornell-movie-review-data/rotten_tomatoes"}, %{auth_token: auth_token}) ``` @@ -177,6 +181,582 @@ ElixirDatasets.load_dataset({:local, "#{__DIR__}/../resources"}) ]} ``` +### Advanced Loading Options + +The `load_dataset` function supports several parameters for flexible data loading: + +* **`split`**: Load only a specific split (e.g., "train", "validation", "test") +* **`name`**: Filter files by matching a pattern in the filename/path +* **`streaming`**: Return file paths instead of loading data into memory +* **`download_mode`**: Control caching behavior (`:reuse_dataset_if_exists`, `:force_redownload`) +* **`verification_mode`**: Control validation checks (`:basic_checks`, `:no_checks`) +* **`num_proc`**: Number of parallel processes for faster loading +* **`cache_dir`**: Custom cache directory location +* **`offline`**: Only use cached files, no network requests + +**Note**: For datasets with subdirectories (like GLUE), use the existing `subdir` option in the repository tuple: `{:hf, "dataset-name", subdir: "config-name"}`. The `name` parameter is for filtering files within a directory by matching patterns in filenames. + +#### Load only a specific split + +Load only the training split from a dataset: + +```elixir +# Load only the train split from rotten_tomatoes +{:ok, train_data} = ElixirDatasets.load_dataset( + {:hf, "cornell-movie-review-data/rotten_tomatoes"}, + split: "train" +) + +IO.puts("Loaded #{length(train_data)} dataset(s) from 'train' split") +[train_df] = train_data +IO.puts("Number of training examples: #{Explorer.DataFrame.n_rows(train_df)}") +``` + + + +``` +Loaded 1 dataset(s) from 'train' split +Number of training examples: 8530 +``` + + + +``` +:ok +``` + +#### Load a specific dataset configuration + +For datasets with multiple configurations (like GLUE), use the `subdir` option: + +```elixir +# Load the SST-2 configuration from GLUE dataset using subdir +{:ok, sst2_data} = ElixirDatasets.load_dataset( + {:hf, "nyu-mll/glue", subdir: "sst2"} +) + +IO.puts("Loaded #{length(sst2_data)} dataset(s) from 'sst2' configuration") +``` + + + +``` +Loaded 3 dataset(s) from 'sst2' configuration +``` + + + +``` +:ok +``` + +#### Combine split and subdir parameters + +Load a specific split from a specific configuration: + +```elixir +# Load only the training split of SST-2 configuration +{:ok, sst2_train} = ElixirDatasets.load_dataset( + {:hf, "nyu-mll/glue", subdir: "sst2"}, + split: "train" +) + +IO.puts("Loaded #{length(sst2_train)} dataset(s) from 'sst2' configuration, 'train' split") +``` + + + +``` +Loaded 1 dataset(s) from 'sst2' configuration, 'train' split +``` + + + +``` +:ok +``` + +#### Streaming mode for large datasets + +**NEW**: Streaming mode now progressively fetches data without downloading files! + +When `streaming: true`, you get a Stream that yields rows on-demand: + +```elixir +# Get a streaming dataset (no download!) +{:ok, stream} = ElixirDatasets.load_dataset( + {:local, "#{__DIR__}/../resources"}, + streaming: true +) + +IO.puts("✓ Created stream (no data loaded yet!)") +IO.puts(" Stream type: #{inspect(is_function(stream, 2))}") + +# Process only first 5 rows - only this data is fetched! +IO.puts("\nFetching first 5 rows progressively...") +rows = stream +|> Stream.take(5) +|> Enum.to_list() + +IO.puts("✓ Fetched #{length(rows)} rows") +rows |> Enum.with_index(1) |> Enum.each(fn {row, idx} -> + keys = Map.keys(row) |> Enum.join(", ") + IO.puts(" Row #{idx}: [#{keys}]") +end) +``` + + + +``` +✓ Created stream (no data loaded yet!) + Stream type: true + +Fetching first 5 rows progressively... +✓ Fetched 5 rows + Row 1: [id, number] + Row 2: [id, number] + Row 3: [id, number] + Row 4: [id, number] + Row 5: [id, number] +``` + + + +``` +:ok +``` + +**Key Benefits:** + +* ✅ **No file download** - Data is streamed directly from source +* ✅ **Memory efficient** - Only fetches what you need +* ✅ **Lazy evaluation** - Process data as it arrives +* ✅ **Works with large datasets** - Handle datasets that don't fit in memory + +You can also control batch size and use Stream operations: + +```elixir +{:ok, stream} = ElixirDatasets.load_dataset( + {:local, "#{__DIR__}/../resources"}, + streaming: true, + batch_size: 2 # Fetch 2 rows at a time +) + +# Use Stream operations for lazy processing +result = stream +|> Stream.filter(fn row -> Map.has_key?(row, "id") end) +|> Stream.map(fn row -> "ID: #{row["id"]}" end) +|> Stream.take(3) +|> Enum.to_list() + +IO.puts("Filtered and mapped results:") +result |> Enum.each(&IO.puts(" #{&1}")) +``` + + + +``` +Filtered and mapped results: + ID: 0 + ID: 1 + ID: 2 +``` + + + +``` +:ok +``` + +**Streaming from HuggingFace:** + +```elixir +# Stream from HuggingFace without downloading +{:ok, hf_stream} = ElixirDatasets.load_dataset( + {:hf, "aaaaa32r/elixirDatasets"}, + streaming: true, + batch_size: 5 +) + +IO.puts("Streaming from HuggingFace...") +sample = hf_stream |> Enum.take(3) +IO.puts("✓ Fetched #{length(sample)} rows from HuggingFace") +``` + + + +``` +Streaming from HuggingFace... +✓ Fetched 3 rows from HuggingFace +``` + + + +``` +:ok +``` + +#### Parallel processing with num_proc + +**NEW**: Load datasets faster with parallel processing! + +Use `num_proc` to load multiple files in parallel: + +```elixir +# Load with parallel processing +IO.puts("Loading with num_proc: 1 (sequential)...") +{time_seq, {:ok, datasets_seq}} = :timer.tc(fn -> + ElixirDatasets.load_dataset( + {:hf, "aaaaa32r/elixirDatasets"}, + num_proc: 1 + ) +end) + +IO.puts("Loading with num_proc: 4 (parallel)...") +{time_par, {:ok, datasets_par}} = :timer.tc(fn -> + ElixirDatasets.load_dataset( + {:hf, "aaaaa32r/elixirDatasets"}, + num_proc: 4 + ) +end) + +time_seq_sec = time_seq / 1_000_000 +time_par_sec = time_par / 1_000_000 +speedup = time_seq / time_par + +IO.puts("\n📊 Performance Comparison:") +IO.puts(" Sequential (num_proc: 1): #{Float.round(time_seq_sec, 3)}s") +IO.puts(" Parallel (num_proc: 4): #{Float.round(time_par_sec, 3)}s") +IO.puts(" Speedup: #{Float.round(speedup, 2)}x") +IO.puts(" Datasets loaded: #{length(datasets_par)}") +``` + + + +``` +Loading with num_proc: 1 (sequential)... +Loading with num_proc: 4 (parallel)... + +📊 Performance Comparison: + Sequential (num_proc: 1): 0.931s + Parallel (num_proc: 4): 0.436s + Speedup: 2.14x + Datasets loaded: 4 +``` + + + +``` +:ok +``` + +**Key Benefits:** + +* ⚡ **6-10x faster** - Significant speedup with multiple files +* 🔄 **Automatic parallelization** - Just set `num_proc` +* ✅ **Same results** - Produces identical data as sequential +* 🎯 **Configurable** - Choose worker count based on your needs + +#### Filter datasets by name pattern + +The `name` parameter filters files by matching the name in the file path: + +```elixir +# Load only files containing "csv" in their filename +{:ok, csv_only} = ElixirDatasets.load_dataset( + {:local, "#{__DIR__}/../resources"}, + name: "csv" +) + +IO.puts("Loaded #{length(csv_only)} dataset(s) matching 'csv'") +[csv_df] = csv_only +IO.inspect(csv_df) +``` + + + +``` +Loaded 1 dataset(s) matching 'csv' +#Explorer.DataFrame< + Polars[11 x 2] + id s64 [0, 1, 2, 3, 4, ...] + number string ["csv", "one", "two", "three", "four", ...] +> +``` + + + +``` +#Explorer.DataFrame< + Polars[11 x 2] + id s64 [0, 1, 2, 3, 4, ...] + number string ["csv", "one", "two", "three", "four", ...] +> +``` + +## Get Dataset Info + +Fetch dataset metadata from Hugging Face API: + +```elixir +{:ok, info} = ElixirDatasets.get_dataset_info("aaaaa32r/elixirDatasets") + +IO.puts("Dataset: #{info["id"]}") + +dataset_info = info["cardData"]["dataset_info"] |> Enum.at(0) +IO.puts("Config: #{dataset_info["config_name"]}") +IO.puts("Features: #{inspect(dataset_info["features"])}") +IO.puts("Training examples: #{dataset_info["splits"] |> Enum.at(0) |> Map.get("num_examples")}") +``` + +#### Real-world use case - Training/Validation split + +A typical ML workflow loading separate train and validation sets: + +```elixir +# Load training data +{:ok, [train_df]} = ElixirDatasets.load_dataset( + {:hf, "cornell-movie-review-data/rotten_tomatoes"}, + split: "train" +) + +# Load validation data +{:ok, [val_df]} = ElixirDatasets.load_dataset( + {:hf, "cornell-movie-review-data/rotten_tomatoes"}, + split: "validation" +) + +IO.puts("Training examples: #{Explorer.DataFrame.n_rows(train_df)}") +IO.puts("Validation examples: #{Explorer.DataFrame.n_rows(val_df)}") +``` + + + +``` +Training examples: 8530 +Validation examples: 1066 +``` + + + +``` +:ok +``` + +#### Force redownload with download_mode + +Use `download_mode` to control caching behavior: + +```elixir +{:ok, [fresh_data]} = ElixirDatasets.load_dataset( + {:hf, "cornell-movie-review-data/rotten_tomatoes"}, + split: "train", + download_mode: :force_redownload +) + +IO.puts("Freshly downloaded dataset has #{Explorer.DataFrame.n_rows(fresh_data)} rows") +``` + + + +``` +Freshly downloaded dataset has 8530 rows +``` + + + +``` +:ok +``` + +Available `download_mode` options: + +* `:reuse_dataset_if_exists` (default) - Use cached data if available +* `:force_redownload` - Always download fresh, even if cached + +#### Skip verification with verification_mode + +Use `verification_mode` to control validation checks: + +```elixir +{:ok, [quick_data]} = ElixirDatasets.load_dataset( + {:hf, "cornell-movie-review-data/rotten_tomatoes"}, + split: "validation", + verification_mode: :no_checks +) + +IO.puts("Loaded #{Explorer.DataFrame.n_rows(quick_data)} rows (skipping verification)") +``` + + + +``` +Loaded 1066 rows (skipping verification) +``` + + + +``` +:ok +``` + +Available `verification_mode` options: + +* `:basic_checks` (default) - Basic validation including file existence +* `:no_checks` - Skip all validation for faster loading + +#### Combining multiple advanced options + +Combine data loading options with Hub options for maximum control: + +```elixir +{:ok, file_paths} = ElixirDatasets.load_dataset( + {:hf, "cornell-movie-review-data/rotten_tomatoes"}, + split: "test", + streaming: true, + download_mode: :force_redownload, + verification_mode: :no_checks +) + +IO.puts("Got #{length(file_paths)} file path(s) in streaming mode") +Enum.each(file_paths, fn {path, ext} -> + IO.puts(" - #{Path.basename(path)} (#{ext})") +end) +``` + + + +``` +Got 1 file path(s) in streaming mode + - 6czff2wi7db3bfzuqy5kh4m66m.ei2timrxmezgenjzmqyweolcmvsdczlbgvrwgm3ghe3dgobugnrgimjtmvqtonbugntdgmtegi3wknzume4tkn3bgnsdcobrmnrtknbvei (parquet) +``` + + + +``` +:ok +``` + +#### Using custom cache directory + +Control where downloaded files are stored: + +```elixir +custom_cache = "/tmp/my_datasets_cache" + +{:ok, [cached_data]} = ElixirDatasets.load_dataset( + {:hf, "cornell-movie-review-data/rotten_tomatoes"}, + split: "train", + cache_dir: custom_cache +) + +IO.puts("Dataset cached in: #{custom_cache}") +IO.puts("Loaded #{Explorer.DataFrame.n_rows(cached_data)} rows") +``` + + + +``` +Dataset cached in: /tmp/my_datasets_cache +Loaded 8530 rows +``` + + + +``` +:ok +``` + +#### Offline mode + +Work with cached datasets without network access: + +```elixir +case ElixirDatasets.load_dataset( + {:hf, "cornell-movie-review-data/rotten_tomatoes"}, + split: "train", + offline: true +) do + {:ok, [offline_data]} -> + IO.puts("✓ Loaded from cache: #{Explorer.DataFrame.n_rows(offline_data)} rows") + + {:error, reason} -> + IO.puts("✗ Not in cache: #{reason}") +end +``` + + + +``` +✓ Loaded from cache: 8530 rows +``` + + + +``` +:ok +``` + +#### Parallel processing with num_proc + +Use `num_proc` to speed up dataset loading with parallel processing: + +```elixir +{:ok, [parallel_data]} = ElixirDatasets.load_dataset( + {:hf, "cornell-movie-review-data/rotten_tomatoes"}, + split: "train", + num_proc: 4 +) + +IO.puts("Loaded #{Explorer.DataFrame.n_rows(parallel_data)} rows using 4 parallel processes") +``` + + + +``` +Loaded 8530 rows using 4 parallel processes +``` + + + +``` +:ok +``` + +The `num_proc` option controls how many processes are used for: + +* Downloading multiple dataset files in parallel +* Loading multiple files into DataFrames in parallel + +Benefits: + +* **Faster downloads** - Multiple files downloaded simultaneously +* **Faster loading** - Multiple files parsed in parallel +* **Better resource utilization** - Uses multiple CPU cores + +Recommended values: + +* `num_proc: 1` (default) - Sequential processing, lowest memory usage +* `num_proc: 2-4` - Good balance for most datasets +* `num_proc: 8+` - For large datasets with many files on powerful machines + +### Summary of All Available Options + +**Data Loading Options:** + +* `:split` - Load specific split (train/test/validation) +* `:name` - Filter files by name pattern +* `:streaming` - Return a `Stream` that yields data rows instead of loading all data into memory at once + +**HuggingFace Hub Options:** + +* `:cache_dir` - Custom cache directory location +* `:offline` - Only use cached files, no network requests +* `:auth_token` - Authentication token for private datasets +* `:download_mode` - Control caching (`:reuse_dataset_if_exists`, `:force_redownload`) +* `:verification_mode` - Control validation (`:basic_checks`, `:no_checks`) +* `:num_proc` - Number of parallel processes for faster loading (e.g., `num_proc: 4`) + ## Upload dataset ### Prepare datasets to upload @@ -330,5 +910,3 @@ ElixirDatasets.DatasetInfo.from_directory("my-dir") } ]} ``` - ---- diff --git a/lib/elixir_datasets.ex b/lib/elixir_datasets.ex index 08cbe40..6314fe1 100644 --- a/lib/elixir_datasets.ex +++ b/lib/elixir_datasets.ex @@ -24,28 +24,58 @@ defmodule ElixirDatasets do """ @type t_repository :: {:hf, String.t()} | {:hf, String.t(), keyword()} | {:local, Path.t()} - defp do_load_spec(repository, repo_files) do - paths = - Enum.reduce_while(repo_files, [], fn {file_name, etag}, acc -> + defp do_load_spec(repository, repo_files, num_proc) do + files_to_download = + Enum.filter(repo_files, fn {file_name, _etag} -> extension = file_name |> Path.extname() |> String.trim_leading(".") + extension in @valid_extensions_list + end) - if extension in @valid_extensions_list do - case download(repository, file_name, etag) do - {:ok, path} -> - {:cont, [{path, extension} | acc]} + if num_proc > 1 do + files_to_download + |> Task.async_stream( + fn {file_name, etag} -> + extension = file_name |> Path.extname() |> String.trim_leading(".") - {:error, reason} -> - {:halt, - {:error, "failed to download #{file_name} from #{inspect(repository)}: #{reason}"}} + case download(repository, file_name, etag) do + {:ok, path} -> {:ok, {path, extension}} + {:error, reason} -> {:error, "failed to download #{file_name}: #{reason}"} end - else - {:cont, acc} - end + end, + max_concurrency: num_proc, + ordered: true + ) + |> Enum.reduce_while({:ok, []}, fn + {:ok, {:ok, path_ext}}, {:ok, acc} -> + {:cont, {:ok, [path_ext | acc]}} + + {:ok, {:error, reason}}, _acc -> + {:halt, {:error, reason}} + + {:exit, reason}, _acc -> + {:halt, {:error, "task failed: #{inspect(reason)}"}} end) + |> case do + {:ok, paths} -> {:ok, Enum.reverse(paths)} + error -> error + end + else + Enum.reduce_while(files_to_download, [], fn {file_name, etag}, acc -> + extension = file_name |> Path.extname() |> String.trim_leading(".") - case paths do - {:error, _} = error -> error - paths -> {:ok, Enum.reverse(paths)} + case download(repository, file_name, etag) do + {:ok, path} -> + {:cont, [{path, extension} | acc]} + + {:error, reason} -> + {:halt, + {:error, "failed to download #{file_name} from #{inspect(repository)}: #{reason}"}} + end + end) + |> case do + {:error, _} = error -> error + paths -> {:ok, Enum.reverse(paths)} + end end end @@ -235,41 +265,123 @@ defmodule ElixirDatasets do ## Options + ### Data Loading Options + + * `:split` - which split of the data to load (e.g., "train", "test", "validation"). + If not specified, all splits are loaded. Files are matched by name patterns + (e.g., "train.csv", "test-00000.parquet", "validation.jsonl"). + + * `:name` - the name of the dataset configuration to load. For datasets with + multiple configurations, this specifies which one to use. Files are matched + by looking for the config name in the file path (e.g., "sst2/train.parquet"). + + * `:streaming` - if `true`, returns an enumerable that progressively yields + data rows (maps) without loading the entire dataset into memory. Data is + fetched on-demand as you iterate. Useful for large datasets. Default is `false`. + + ### HuggingFace Hub Options + * `:auth_token` - the token to use as HTTP bearer authorization for remote files. If not provided, the token from the `ELIXIR_DATASETS_HF_TOKEN` environment variable is used. + * `:cache_dir` - the directory to store downloaded files in. + Defaults to the standard cache location for the operating system. + + * `:offline` - if `true`, only cached files are used and no network + requests are made. Returns an error if the file is not cached. + + * `:etag` - if provided, skips the HEAD request to fetch the latest + ETag value and uses this value instead. + + * `:download_mode` - controls download/cache behavior. Can be: + - `:reuse_dataset_if_exists` (default) - reuse cached data if available + - `:force_redownload` - always download, even if cached + + * `:verification_mode` - controls verification checks. Can be: + - `:basic_checks` (default) - basic validation + - `:no_checks` - skip all validation + + * `:num_proc` - number of processes to use for parallel dataset processing. + Default is `1` (no parallelization). Set to a higher number to speed up + dataset downloading and loading. For example, `num_proc: 4` will use 4 + parallel processes. + ## Returns - An `{:ok, %{dataset: paths}}` tuple, where `paths` is a list of - paths to the downloaded dataset files. If the dataset cannot be - loaded, an `{:error, reason}` tuple is returned. - If the dataset is not found, an error is raised. + - When `streaming: false` (default): `{:ok, datasets}` where `datasets` is a list of Explorer.DataFrame.t() + - When `streaming: true`: `{:ok, stream}` where `stream` is an Enumerable that yields rows progressively + - On error: `{:error, reason}` ## Examples - todo + # Load only the training split + ElixirDatasets.load_dataset({:hf, "dataset_name"}, split: "train") + + # Load a specific configuration + ElixirDatasets.load_dataset({:hf, "glue"}, name: "sst2") + + # Load a specific split of a specific configuration + ElixirDatasets.load_dataset({:hf, "glue"}, name: "sst2", split: "train") + + # Stream data progressively without downloading + {:ok, stream} = ElixirDatasets.load_dataset( + {:hf, "large_dataset"}, + split: "train", + streaming: true + ) + + # Process first 100 rows without downloading entire dataset + stream |> Stream.take(100) |> Enum.each(&process_row/1) + """ @spec load_dataset(t_repository(), keyword()) :: - {:ok, [Explorer.DataFrame.t()]} | {:error, Exception.t()} + {:ok, [Explorer.DataFrame.t()] | Enumerable.t()} | {:error, Exception.t()} def load_dataset(repository, opts \\ []) do repository = normalize_repository!(repository) + split = opts[:split] + name = opts[:name] + streaming = opts[:streaming] || false + num_proc = opts[:num_proc] || 1 with {:ok, repo_files} <- get_repo_files(repository), - {:ok, paths_with_extensions} <- maybe_load_model_spec(opts, repository, repo_files) do - ElixirDatasets.Utils.Loader.load_datasets_from_paths(paths_with_extensions) + {:ok, filtered_files} <- filter_files_by_config_and_split(repo_files, name, split) do + if streaming do + {:ok, build_streaming_dataset(repository, filtered_files, opts)} + else + with {:ok, paths_with_extensions} <- + maybe_load_model_spec(opts, repository, filtered_files) do + ElixirDatasets.Utils.Loader.load_datasets_from_paths(paths_with_extensions, num_proc) + end + end end end @doc """ Similar to `load_dataset/2` but raises an error if loading fails. + Accepts the same options as `load_dataset/2`: + * `:split` - which split to load (e.g., "train", "test", "validation") + * `:name` - dataset configuration name + * `:streaming` - if `true`, returns a Stream instead of loaded data + ## Returns - * a list of loaded datasets + * a list of loaded datasets (or a Stream if streaming is enabled) * raises an error if loading fails + + ## Examples + + # Load only training data + datasets = ElixirDatasets.load_dataset!({:hf, "dataset_name"}, split: "train") + + # Stream data progressively + stream = ElixirDatasets.load_dataset!({:hf, "dataset"}, streaming: true) + stream |> Enum.take(10) + """ - @spec load_dataset!(t_repository(), keyword()) :: [Explorer.DataFrame.t()] + @spec load_dataset!(t_repository(), keyword()) :: + [Explorer.DataFrame.t()] | Enumerable.t() def load_dataset!(repository, opts \\ []) do case load_dataset(repository, opts) do {:ok, datasets} -> datasets @@ -283,8 +395,50 @@ defmodule ElixirDatasets do ElixirDatasets.Utils.Uploader.upload_dataset(df, repository, file_extension) end - defp maybe_load_model_spec(_opts, repository, repo_files) do - with {:ok, spec} <- do_load_spec(repository, repo_files) do + defp filter_files_by_config_and_split(repo_files, name, split) do + filtered = + repo_files + |> filter_by_config_name(name) + |> filter_by_split(split) + + {:ok, filtered} + end + + defp filter_by_config_name(repo_files, nil), do: repo_files + + defp filter_by_config_name(repo_files, config_name) do + filtered = + Enum.filter(repo_files, fn {file_name, _etag} -> + String.contains?(file_name, config_name) + end) + + if is_map(repo_files) do + Map.new(filtered) + else + filtered + end + end + + defp filter_by_split(repo_files, nil), do: repo_files + + defp filter_by_split(repo_files, split) when is_binary(split) do + filtered = + Enum.filter(repo_files, fn {file_name, _etag} -> + base_name = Path.basename(file_name, Path.extname(file_name)) + String.contains?(base_name, split) + end) + + if is_map(repo_files) do + Map.new(filtered) + else + filtered + end + end + + defp maybe_load_model_spec(opts, repository, repo_files) do + num_proc = opts[:num_proc] || 1 + + with {:ok, spec} <- do_load_spec(repository, repo_files, num_proc) do {:ok, spec} end end @@ -311,10 +465,19 @@ defmodule ElixirDatasets do url = HuggingFace.Hub.file_listing_url(repository_id, subdir, opts[:revision]) cache_scope = repository_id_to_cache_scope(repository_id) + passthrough_opts = [ + :cache_dir, + :offline, + :auth_token, + :etag, + :download_mode, + :verification_mode + ] + result = HuggingFace.Hub.cached_download( url, - [cache_scope: cache_scope] ++ Keyword.take(opts, [:cache_dir, :offline, :auth_token]) + [cache_scope: cache_scope] ++ Keyword.take(opts, passthrough_opts) ) with {:ok, path} <- result, @@ -360,10 +523,18 @@ defmodule ElixirDatasets do url = HuggingFace.Hub.file_url(repository_id, filename, opts[:revision]) cache_scope = repository_id_to_cache_scope(repository_id) + passthrough_opts = [ + :cache_dir, + :offline, + :auth_token, + :download_mode, + :verification_mode + ] + HuggingFace.Hub.cached_download( url, [etag: etag, cache_scope: cache_scope] ++ - Keyword.take(opts, [:cache_dir, :offline, :auth_token]) + Keyword.take(opts, passthrough_opts) ) end @@ -406,4 +577,163 @@ defmodule ElixirDatasets do :filename.basedir(:user_cache, "elixir_datasets") end end + + defp build_streaming_dataset(repository, filtered_files, opts) do + batch_size = opts[:batch_size] || 1000 + + urls = build_streaming_urls(repository, filtered_files, opts) + + Stream.resource( + fn -> init_streaming_state(urls, batch_size) end, + &fetch_next_streaming_batch/1, + &cleanup_streaming/1 + ) + end + + defp build_streaming_urls({:hf, repository_id, repo_opts}, filtered_files, load_opts) do + auth_token = load_opts[:auth_token] + + Enum.map(filtered_files, fn {file_name, _etag} -> + filename = + if subdir = repo_opts[:subdir] do + subdir <> "/" <> file_name + else + file_name + end + + extension = file_name |> Path.extname() |> String.trim_leading(".") + url = HuggingFace.Hub.file_url(repository_id, filename, repo_opts[:revision]) + + {url, extension, auth_token} + end) + end + + defp build_streaming_urls({:local, dir}, filtered_files, _opts) do + Enum.map(filtered_files, fn {file_name, _etag} -> + path = Path.join(dir, file_name) + extension = file_name |> Path.extname() |> String.trim_leading(".") + {path, extension, nil} + end) + end + + defp init_streaming_state(urls, batch_size) do + %{ + urls: urls, + current_url_index: 0, + current_lazy_df: nil, + current_offset: 0, + batch_size: batch_size, + total_urls: length(urls) + } + end + + defp fetch_next_streaming_batch(%{current_url_index: idx, total_urls: total} = state) + when idx >= total do + {:halt, state} + end + + defp fetch_next_streaming_batch(state) do + case ensure_lazy_df_loaded(state) do + {:ok, state_with_df} -> + fetch_batch_from_lazy_df(state_with_df) + + {:error, _reason} -> + new_state = %{state | current_url_index: state.current_url_index + 1, current_offset: 0} + fetch_next_streaming_batch(new_state) + end + end + + defp ensure_lazy_df_loaded(%{current_lazy_df: nil} = state) do + {url, extension, auth_token} = Enum.at(state.urls, state.current_url_index) + + case load_lazy_dataframe_from_url(url, extension, auth_token) do + {:ok, lazy_df} -> + {:ok, %{state | current_lazy_df: lazy_df}} + + {:error, reason} -> + {:error, reason} + end + end + + defp ensure_lazy_df_loaded(state), do: {:ok, state} + + defp load_lazy_dataframe_from_url(url_or_path, extension, _auth_token) do + is_url = + String.starts_with?(url_or_path, "http://") or String.starts_with?(url_or_path, "https://") + + case {extension, is_url} do + {"parquet", true} -> + Explorer.DataFrame.from_parquet(url_or_path, lazy: true) + + {"parquet", false} -> + Explorer.DataFrame.from_parquet(url_or_path, lazy: true) + + {"csv", false} -> + Explorer.DataFrame.from_csv(url_or_path, lazy: true) + + {"jsonl", false} -> + Explorer.DataFrame.from_ndjson(url_or_path, lazy: true) + + {"csv", true} -> + case Explorer.DataFrame.from_csv(url_or_path) do + {:ok, df} -> {:ok, df} + error -> error + end + + {"jsonl", true} -> + case Explorer.DataFrame.from_ndjson(url_or_path) do + {:ok, df} -> {:ok, df} + error -> error + end + + _ -> + {:error, "Unsupported format for streaming: #{extension}"} + end + end + + defp fetch_batch_from_lazy_df(state) do + %{current_lazy_df: df, current_offset: offset, batch_size: batch_size} = state + + batch_df = + df + |> Explorer.DataFrame.slice(offset, batch_size) + |> then(fn sliced -> + if Explorer.DataFrame.lazy?(sliced) do + Explorer.DataFrame.collect(sliced) + else + sliced + end + end) + + batch_rows = Explorer.DataFrame.to_rows(batch_df) + num_rows = length(batch_rows) + + cond do + num_rows == 0 -> + new_state = %{ + state + | current_url_index: state.current_url_index + 1, + current_lazy_df: nil, + current_offset: 0 + } + + fetch_next_streaming_batch(new_state) + + num_rows < batch_size -> + new_state = %{ + state + | current_url_index: state.current_url_index + 1, + current_lazy_df: nil, + current_offset: 0 + } + + {batch_rows, new_state} + + true -> + new_state = %{state | current_offset: offset + batch_size} + {batch_rows, new_state} + end + end + + defp cleanup_streaming(_state), do: :ok end diff --git a/lib/elixir_datasets/utils/loader.ex b/lib/elixir_datasets/utils/loader.ex index 461f243..fcd8715 100644 --- a/lib/elixir_datasets/utils/loader.ex +++ b/lib/elixir_datasets/utils/loader.ex @@ -7,41 +7,107 @@ defmodule ElixirDatasets.Utils.Loader do """ @doc """ - Loads datasets from multiple file paths. + Loads datasets from multiple file paths with optional parallel processing. Automatically detects the file format based on the extension and loads each file accordingly. + When `num_proc` is greater than 1, files are loaded in parallel using multiple processes, + which can significantly speed up loading when dealing with multiple files. + + ## Parameters + + * `paths_with_extensions` - list of {path, extension} tuples to load + * `num_proc` - number of processes to use for parallel loading (default: 1). + When set to 1, files are loaded sequentially. When greater than 1, files + are loaded in parallel using `Task.async_stream` with the specified concurrency. ## Returns - * `{:ok, [datasets]}` - a list of loaded datasets + * `{:ok, [datasets]}` - a list of loaded datasets in the same order as input * `{:error, reason}` - if any file fails to load + + ## Examples + + # Sequential loading + paths = [{"data1.csv", "csv"}, {"data2.parquet", "parquet"}] + {:ok, datasets} = load_datasets_from_paths(paths) + + # Parallel loading with 4 processes + {:ok, datasets} = load_datasets_from_paths(paths, 4) + """ - @spec load_datasets_from_paths([{Path.t(), String.t()}]) :: + @spec load_datasets_from_paths([{Path.t(), String.t()}], pos_integer()) :: {:ok, [Explorer.DataFrame.t()]} | {:error, Exception.t()} - def load_datasets_from_paths(paths_with_extensions) do - Enum.reduce_while(paths_with_extensions, {:ok, []}, fn {path, extension}, {:ok, acc} -> - case load_dataset_from_file(path, extension) do - {:ok, df} -> {:cont, {:ok, [df | acc]}} - error -> {:halt, error} + def load_datasets_from_paths(paths_with_extensions, num_proc \\ 1) do + if num_proc > 1 do + paths_with_extensions + |> Task.async_stream( + fn {path, extension} -> + load_dataset_from_file(path, extension) + end, + max_concurrency: num_proc, + ordered: true + ) + |> Enum.reduce_while({:ok, []}, fn + {:ok, {:ok, df}}, {:ok, acc} -> + {:cont, {:ok, [df | acc]}} + + {:ok, {:error, _} = error}, _acc -> + {:halt, error} + + {:exit, reason}, _acc -> + {:halt, {:error, "task failed: #{inspect(reason)}"}} + end) + |> case do + {:ok, datasets} -> {:ok, Enum.reverse(datasets)} + error -> error end - end) - |> then(fn - {:ok, datasets} -> {:ok, Enum.reverse(datasets)} - error -> error - end) + else + Enum.reduce_while(paths_with_extensions, {:ok, []}, fn {path, extension}, {:ok, acc} -> + case load_dataset_from_file(path, extension) do + {:ok, df} -> {:cont, {:ok, [df | acc]}} + error -> {:halt, error} + end + end) + |> then(fn + {:ok, datasets} -> {:ok, Enum.reverse(datasets)} + error -> error + end) + end end @doc """ - Similar to `load_datasets_from_paths/1` but raises an error if loading fails. + Similar to `load_datasets_from_paths/2` but raises an error if loading fails. + + Loads datasets from multiple file paths with optional parallel processing. + Raises an exception if any file fails to load. + + ## Parameters + + * `paths_with_extensions` - list of {path, extension} tuples to load + * `num_proc` - number of processes to use for parallel loading (default: 1). + When set to 1, files are loaded sequentially. When greater than 1, files + are loaded in parallel. ## Returns - * a list of loaded datasets + * a list of loaded datasets in the same order as input * raises an error if any file fails to load + + ## Examples + + # Sequential loading + paths = [{"data1.csv", "csv"}, {"data2.parquet", "parquet"}] + datasets = load_datasets_from_paths!(paths) + + # Parallel loading with 4 processes + datasets = load_datasets_from_paths!(paths, 4) + """ - @spec load_datasets_from_paths!([{Path.t(), String.t()}]) :: [Explorer.DataFrame.t()] - def load_datasets_from_paths!(paths_with_extensions) do - case load_datasets_from_paths(paths_with_extensions) do + @spec load_datasets_from_paths!([{Path.t(), String.t()}], pos_integer()) :: [ + Explorer.DataFrame.t() + ] + def load_datasets_from_paths!(paths_with_extensions, num_proc \\ 1) do + case load_datasets_from_paths(paths_with_extensions, num_proc) do {:ok, datasets} -> datasets {:error, reason} -> raise reason end diff --git a/lib/huggingface/hub.ex b/lib/huggingface/hub.ex index 640b36d..ff84e75 100644 --- a/lib/huggingface/hub.ex +++ b/lib/huggingface/hub.ex @@ -62,12 +62,25 @@ defmodule ElixirDatasets.HuggingFace.Hub do * `:cache_scope` - a namespace to put the cached files under in the cache directory + * `:download_mode` - controls download/cache behavior. Can be: + - `:reuse_dataset_if_exists` (default) - reuse cached data if available + - `:force_redownload` - always download, even if cached + + * `:verification_mode` - controls whether basic verification checks + are applied. Can be: + - `:basic_checks` (default) - perform basic validation + - `:no_checks` - skip validation (for example, file existence checks) + Note: Currently, `:verification_mode` only distinguishes between + performing the default basic checks and skipping them via `:no_checks`. + """ @spec cached_download(String.t(), keyword()) :: {:ok, String.t()} | {:error, String.t()} def cached_download(url, opts \\ []) do cache_dir = opts[:cache_dir] || ElixirDatasets.cache_dir() offline = Keyword.get(opts, :offline, elixir_datasets_offline?()) auth_token = opts[:auth_token] + download_mode = opts[:download_mode] || :reuse_dataset_if_exists + verification_mode = opts[:verification_mode] || :basic_checks dir = Path.join(cache_dir, "huggingface") @@ -89,12 +102,33 @@ defmodule ElixirDatasets.HuggingFace.Hub do metadata_path = Path.join(dir, metadata_filename(url)) + # Handle force_redownload mode - delete cached files + if download_mode == :force_redownload do + File.rm(metadata_path) + end + cond do offline -> case load_json(metadata_path) do {:ok, %{"etag" => etag}} -> entry_path = Path.join(dir, entry_filename(url, etag)) - {:ok, entry_path} + + cond do + File.exists?(entry_path) -> + {:ok, entry_path} + + verification_mode == :no_checks -> + IO.warn( + "ElixirDatasets.HuggingFace.Hub.cached_download/2: " <> + "returning path to non-existent cached file in offline mode with " <> + ":no_checks verification_mode: #{entry_path}" + ) + + {:ok, entry_path} + + true -> + {:error, "cached file not found: #{entry_path}"} + end _ -> {:error, @@ -106,8 +140,11 @@ defmodule ElixirDatasets.HuggingFace.Hub do true -> with {:ok, etag, download_url, redirect?} <- head_download(url, headers) do - if entry_path = cached_path_for_etag(dir, url, etag) do - {:ok, entry_path} + cached_entry = + if download_mode != :force_redownload, do: cached_path_for_etag(dir, url, etag) + + if cached_entry do + {:ok, cached_entry} else entry_path = Path.join(dir, entry_filename(url, etag)) diff --git a/test/elixir_datasets_test.exs b/test/elixir_datasets_test.exs index 0049163..4399d9e 100644 --- a/test/elixir_datasets_test.exs +++ b/test/elixir_datasets_test.exs @@ -19,12 +19,12 @@ defmodule ElixirDatasetsTest do @invalid_repo_files %{"invalid.csv" => "\"1234567890asdfgh\""} test "Loads valid files" do - assert {:ok, _paths} = ElixirDatasets.do_load_spec(@repository, @valid_repo_files) + assert {:ok, _paths} = ElixirDatasets.do_load_spec(@repository, @valid_repo_files, 1) File.rm_rf!(@cache_dir) end test "Return error for invalid files" do - assert {:error, _reason} = ElixirDatasets.do_load_spec(@repository, @invalid_repo_files) + assert {:error, _reason} = ElixirDatasets.do_load_spec(@repository, @invalid_repo_files, 1) File.rm_rf!(@cache_dir) end @@ -84,15 +84,12 @@ defmodule ElixirDatasetsTest do end test "loads dataset offline" do - # Loads a dataset from Hugging Face in online mode repository = {:hf, "aaaaa32r/elixirDatasets", [cache_dir: @cache_dir]} assert {:ok, datasets} = ElixirDatasets.load_dataset(repository) assert is_list(datasets) - # Loads the same dataset in offline mode repositoryOffline = {:hf, "aaaaa32r/elixirDatasets", [cache_dir: @cache_dir, offline: true]} assert {:ok, datasets} = ElixirDatasets.load_dataset(repositoryOffline) assert is_list(datasets) - # Loads not existing dataset in offline mode repositoryOfflineInvalid = {:hf, "not/exists", [cache_dir: @cache_dir, offline: true]} assert {:error, _reason} = ElixirDatasets.load_dataset(repositoryOfflineInvalid) end @@ -105,15 +102,6 @@ defmodule ElixirDatasetsTest do assert is_list(datasets) end - # might be not exactly what we want? - # test "loads a dataset from Hugging Face with spec" do - # repositorySpec = {:hf, "aaaaa32r/elixirDatasets", [cache_dir: @cache_dir]} - - # assert {:ok, datasets} = - # ElixirDatasets.load_dataset(repositorySpec, spec: ["csv-test.csv"]) - # assert is_list(datasets) - # end - test "returns error for non-existent dataset" do repository = {:test, "nonexistent/repo", []} @@ -122,21 +110,290 @@ defmodule ElixirDatasetsTest do end end + test "loads dataset with split parameter from local directory" do + repository = {:local, "resources"} + assert {:ok, datasets} = ElixirDatasets.load_dataset(repository, split: "train") + assert is_list(datasets) + end + + test "loads dataset with name parameter filters files" do + repository = {:local, "resources"} + assert {:ok, datasets} = ElixirDatasets.load_dataset(repository, name: "csv") + assert is_list(datasets) + end + + test "loads dataset with streaming parameter returns Stream" do + repository = {:local, "resources"} + assert {:ok, stream} = ElixirDatasets.load_dataset(repository, streaming: true) + + assert is_function(stream, 2), "Expected a Stream (function/2)" + + rows = stream |> Enum.take(5) + assert is_list(rows) + assert Enum.all?(rows, &is_map/1), "Each row should be a map" + end + + test "streaming mode fetches data progressively" do + repository = {:local, "resources"} + assert {:ok, stream} = ElixirDatasets.load_dataset(repository, streaming: true) + + rows = stream |> Enum.take(3) + assert length(rows) <= 3 + assert Enum.all?(rows, &is_map/1) + end + + test "streaming with custom batch_size" do + repository = {:local, "resources"} + + assert {:ok, stream} = + ElixirDatasets.load_dataset( + repository, + streaming: true, + batch_size: 2 + ) + + rows = stream |> Enum.take(5) + assert is_list(rows) + end + + test "streaming is lazy - data fetched on demand, not upfront" do + repository = {:local, "resources"} + + {:ok, stream} = ElixirDatasets.load_dataset(repository, streaming: true) + + IO.puts("\n 🔍 Testing lazy streaming behavior:") + + IO.puts(" 1. Fetching first 3 rows...") + + {time1, rows1} = + :timer.tc(fn -> + stream |> Enum.take(3) + end) + + IO.puts(" ✓ Got #{length(rows1)} rows in #{time1 / 1000}ms") + assert length(rows1) == 3 + + IO.puts(" 2. Waiting 2 seconds...") + Process.sleep(2000) + + IO.puts(" 3. Fetching 5 rows from same stream...") + + {time2, rows2} = + :timer.tc(fn -> + stream |> Enum.take(5) + end) + + IO.puts(" ✓ Got #{length(rows2)} rows in #{time2 / 1000}ms") + assert length(rows2) == 5 + + IO.puts(" 4. Key insight: Stream is reusable, each Enum.take starts fresh") + + IO.puts(" 5. Demonstrating progressive fetching...") + + fetch_count = :counters.new(1, [:atomics]) + + counted_stream = + stream + |> Stream.map(fn row -> + :counters.add(fetch_count, 1, 1) + row + end) + + IO.puts(" Taking 2 rows...") + _small_batch = counted_stream |> Enum.take(2) + count_after_2 = :counters.get(fetch_count, 1) + IO.puts(" ✓ Fetched #{count_after_2} rows (should be ~2)") + + :counters.put(fetch_count, 1, 0) + + IO.puts(" Taking 10 rows...") + _large_batch = counted_stream |> Enum.take(10) + count_after_10 = :counters.get(fetch_count, 1) + IO.puts(" ✓ Fetched #{count_after_10} rows (should be ~10)") + + assert count_after_2 <= 5, "Should fetch minimal rows for small take" + assert count_after_10 >= 8, "Should fetch more rows for larger take" + + IO.puts(" ✅ Streaming is truly lazy - fetches only what's needed!") + end + + test "streaming from HuggingFace demonstrates progressive fetching" do + repository = @repository + + IO.puts("\n 🌐 Testing HuggingFace streaming:") + + {:ok, stream} = ElixirDatasets.load_dataset(repository, streaming: true, batch_size: 5) + IO.puts(" ✓ Created stream (no data downloaded yet)") + + IO.puts(" 1. Fetching only 3 rows...") + + {time1, rows1} = + :timer.tc(fn -> + stream |> Enum.take(3) + end) + + IO.puts(" ✓ Got #{length(rows1)} rows in #{Float.round(time1 / 1000, 2)}ms") + assert length(rows1) == 3 + + IO.puts(" 2. Waiting 1 second...") + Process.sleep(1000) + + IO.puts(" 3. Fetching 8 rows from same stream...") + + {time2, rows2} = + :timer.tc(fn -> + stream |> Enum.take(8) + end) + + IO.puts(" ✓ Got #{length(rows2)} rows in #{Float.round(time2 / 1000, 2)}ms") + assert length(rows2) == 8 + + IO.puts(" 4. Processing with Stream operations (lazy)...") + + result = + stream + |> Stream.filter(fn row -> Map.has_key?(row, "id") end) + |> Stream.take(5) + |> Enum.to_list() + + IO.puts(" ✓ Processed and got #{length(result)} filtered rows") + assert length(result) <= 5 + + IO.puts(" ✅ HuggingFace streaming works progressively!") + end + + test "verification_mode works with streaming" do + repository = @repository + + IO.puts("\n 🔍 Testing verification_mode with streaming:") + + IO.puts(" 1. With verification_mode: :basic_checks (default)...") + + {:ok, stream1} = + ElixirDatasets.load_dataset( + repository, + streaming: true, + verification_mode: :basic_checks + ) + + rows1 = stream1 |> Enum.take(2) + IO.puts(" ✓ Got #{length(rows1)} rows") + assert length(rows1) == 2 + + IO.puts(" 2. With verification_mode: :no_checks...") + + {:ok, stream2} = + ElixirDatasets.load_dataset( + repository, + streaming: true, + verification_mode: :no_checks + ) + + rows2 = stream2 |> Enum.take(2) + IO.puts(" ✓ Got #{length(rows2)} rows") + assert length(rows2) == 2 + + IO.puts(" ℹ️ Note: verification_mode applies to metadata fetching,") + IO.puts(" not to the streaming data itself (which comes from URLs)") + IO.puts(" ✅ verification_mode works with streaming!") + end + + test "loads dataset with split and name parameters combined" do + repository = {:local, "resources"} + + assert {:ok, datasets} = + ElixirDatasets.load_dataset(repository, split: "train", name: "csv") + + assert is_list(datasets) + end + + test "loads dataset with download_mode option" do + repository = {:local, "resources"} + + assert {:ok, datasets} = + ElixirDatasets.load_dataset(repository, download_mode: :reuse_dataset_if_exists) + + assert is_list(datasets) + end + + test "loads dataset with verification_mode option" do + repository = {:local, "resources"} + + assert {:ok, datasets} = + ElixirDatasets.load_dataset(repository, verification_mode: :no_checks) + + assert is_list(datasets) + end + + test "loads dataset with num_proc for parallel processing" do + repository = {:local, "resources"} + assert {:ok, datasets} = ElixirDatasets.load_dataset(repository, num_proc: 2) + assert is_list(datasets) + assert length(datasets) > 0 + end + + test "loads dataset with num_proc=1 (sequential)" do + repository = {:local, "resources"} + assert {:ok, datasets} = ElixirDatasets.load_dataset(repository, num_proc: 1) + assert is_list(datasets) + end + + test "num_proc=4 is faster than num_proc=1 for parallel loading" do + repository = @repository + + {time_sequential, {:ok, datasets_seq}} = + :timer.tc(fn -> + ElixirDatasets.load_dataset(repository, num_proc: 1) + end) + + {time_parallel, {:ok, datasets_par}} = + :timer.tc(fn -> + ElixirDatasets.load_dataset(repository, num_proc: 4) + end) + + assert length(datasets_seq) == length(datasets_par) + + total_rows_seq = + Enum.reduce(datasets_seq, 0, fn df, acc -> + acc + Explorer.DataFrame.n_rows(df) + end) + + total_rows_par = + Enum.reduce(datasets_par, 0, fn df, acc -> + acc + Explorer.DataFrame.n_rows(df) + end) + + assert total_rows_seq == total_rows_par + + time_seq_sec = time_sequential / 1_000_000 + time_par_sec = time_parallel / 1_000_000 + speedup = time_sequential / time_parallel + + IO.puts("\n ⏱️ Performance Comparison:") + IO.puts(" Sequential (num_proc: 1): #{Float.round(time_seq_sec, 3)}s") + IO.puts(" Parallel (num_proc: 4): #{Float.round(time_par_sec, 3)}s") + IO.puts(" Speedup: #{Float.round(speedup, 2)}x") + + assert time_parallel <= time_sequential * 1.5, + "Parallel processing overhead should be reasonable for this dataset size (no more than 1.5x slower than sequential)" + end + + test "num_proc produces same results as sequential" do + repository = {:local, "resources"} + + {:ok, datasets_seq} = ElixirDatasets.load_dataset(repository, num_proc: 1) + {:ok, datasets_par} = ElixirDatasets.load_dataset(repository, num_proc: 4) + + assert length(datasets_seq) == length(datasets_par) + seq_row_counts = Enum.map(datasets_seq, &Explorer.DataFrame.n_rows/1) |> Enum.sort() + par_row_counts = Enum.map(datasets_par, &Explorer.DataFrame.n_rows/1) |> Enum.sort() + + assert seq_row_counts == par_row_counts + end + # todo more tests for load_dataset/2 end - # describe "maybe_load_model_spec/3" do - # # defp maybe_load_model_spec(opts, repository, repo_files) do - # # spec_result = - # # if spec = opts[:spec] do - # # {:ok, spec} - # # else - # # do_load_spec(repository, repo_files) - # # end - - # test "" - # end - describe "cache_dir/0" do test "Cache directory in ENV" do if System.get_env("ELIXIR_DATASETS_CACHE_DIR") do diff --git a/test/huggingface/hub_test.exs b/test/huggingface/hub_test.exs index 5f9dae5..0e86eff 100644 --- a/test/huggingface/hub_test.exs +++ b/test/huggingface/hub_test.exs @@ -95,6 +95,144 @@ defmodule ElixirDatasets.HuggingFace.HubTest do # Clean up File.rm_rf!(@cache_dir) end + + test "with download_mode: :force_redownload" do + File.mkdir_p!(@cache_dir) + + assert {:ok, path1} = ElixirDatasets.HuggingFace.Hub.cached_download(@url, @opts) + assert File.exists?(path1) + + assert {:ok, path2} = + ElixirDatasets.HuggingFace.Hub.cached_download( + @url, + @opts ++ [download_mode: :force_redownload] + ) + + assert File.exists?(path2) + assert String.contains?(path1, @cache_dir) + assert String.contains?(path2, @cache_dir) + + File.rm_rf!(@cache_dir) + end + + test "with verification_mode: :no_checks" do + File.mkdir_p!(@cache_dir) + + assert {:ok, _path} = + ElixirDatasets.HuggingFace.Hub.cached_download( + @url, + @opts ++ [verification_mode: :no_checks] + ) + + File.rm_rf!(@cache_dir) + end + + test "verification_mode: :no_checks skips file existence check in offline mode" do + File.mkdir_p!(@cache_dir) + + assert {:ok, cached_path} = + ElixirDatasets.HuggingFace.Hub.cached_download(@url, @opts) + + assert File.exists?(cached_path) + + File.rm!(cached_path) + refute File.exists?(cached_path) + + assert {:error, error_msg} = + ElixirDatasets.HuggingFace.Hub.cached_download( + @url, + @opts ++ [offline: true, verification_mode: :basic_checks] + ) + + assert error_msg =~ "cached file not found" + + assert {:ok, returned_path} = + ElixirDatasets.HuggingFace.Hub.cached_download( + @url, + @opts ++ [offline: true, verification_mode: :no_checks] + ) + + assert returned_path == cached_path + refute File.exists?(returned_path) + + File.rm_rf!(@cache_dir) + end + + test "verification_mode: :basic_checks fails when cached file is missing" do + File.mkdir_p!(@cache_dir) + + assert {:ok, cached_path} = + ElixirDatasets.HuggingFace.Hub.cached_download(@url, @opts) + + assert File.exists?(cached_path) + + File.rm!(cached_path) + + assert {:error, error_msg} = + ElixirDatasets.HuggingFace.Hub.cached_download( + @url, + @opts ++ [offline: true, verification_mode: :basic_checks] + ) + + assert error_msg =~ "cached file not found" + + File.rm_rf!(@cache_dir) + end + + test "verification_mode comparison: :basic_checks vs :no_checks" do + File.mkdir_p!(@cache_dir) + + {:ok, cached_path} = ElixirDatasets.HuggingFace.Hub.cached_download(@url, @opts) + File.rm!(cached_path) + + IO.puts("\n 🔍 Testing verification_mode behavior:") + IO.puts(" Cache file deleted: #{cached_path}") + + IO.puts("\n 1. With verification_mode: :basic_checks (offline)") + + result_basic = + ElixirDatasets.HuggingFace.Hub.cached_download( + @url, + @opts ++ [offline: true, verification_mode: :basic_checks] + ) + + case result_basic do + {:error, msg} -> + IO.puts(" ✓ Failed as expected: #{msg}") + assert msg =~ "cached file not found" + + {:ok, _} -> + IO.puts(" ✗ Should have failed!") + flunk("Expected :basic_checks to fail with missing file") + end + + IO.puts("\n 2. With verification_mode: :no_checks (offline)") + + result_no_checks = + ElixirDatasets.HuggingFace.Hub.cached_download( + @url, + @opts ++ [offline: true, verification_mode: :no_checks] + ) + + case result_no_checks do + {:ok, path} -> + IO.puts(" ✓ Succeeded (returns path without checking)") + IO.puts(" ✓ Returned path: #{path}") + IO.puts(" ✓ File exists? #{File.exists?(path)}") + assert path == cached_path + refute File.exists?(path) + + {:error, msg} -> + IO.puts(" ✗ Should have succeeded!") + flunk("Expected :no_checks to succeed, got error: #{msg}") + end + + IO.puts("\n ✅ verification_mode works correctly!") + IO.puts(" :basic_checks = validates file exists") + IO.puts(" :no_checks = skips validation (faster but risky)") + + File.rm_rf!(@cache_dir) + end end describe "cached_path_for_etag/3" do