From 687b3e30000a01c16d0af5f85f7a896f709cee4d Mon Sep 17 00:00:00 2001 From: guptaakashdeep Date: Wed, 30 Jul 2025 11:26:18 +0530 Subject: [PATCH 1/8] feat: Benchmarking implementation: Initial Commit --- .gitignore | 1 + Cargo.lock | 278 ++++++++++++++++++++++++++++++++-- Cargo.toml | 13 ++ benches/README.md | 36 +++++ benches/pagerank_benchmark.rs | 50 ++++++ run_benchmarks.py | 190 +++++++++++++++++++++++ src/lib.rs | 46 +----- src/pagerank.rs | 29 ++-- src/shortest_paths.rs | 4 +- src/util.rs | 92 +++++++++++ 10 files changed, 664 insertions(+), 75 deletions(-) create mode 100644 benches/README.md create mode 100644 benches/pagerank_benchmark.rs create mode 100644 run_benchmarks.py create mode 100644 src/util.rs diff --git a/.gitignore b/.gitignore index 998fd88..09b1145 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ .cargo/ debug/ release/ +/benches/data/ diff --git a/Cargo.lock b/Cargo.lock index 64d2787..aec0f0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,6 +76,18 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + +[[package]] +name = "anstyle" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd" + [[package]] name = "arrayref" version = "0.3.9" @@ -474,6 +486,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.2.30" @@ -513,6 +531,58 @@ dependencies = [ "phf", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + +[[package]] +name = "clap" +version = "4.5.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be92d32e80243a54711e5d7ce823c35c41c9d929dc4ab58e1276f625841aadf9" +dependencies = [ + "clap_builder", +] + +[[package]] +name = "clap_builder" +version = "4.5.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "707eab41e9622f9139419d573eca0900137718000c517d47da73045f54331c3d" +dependencies = [ + "anstyle", + "clap_lex", +] + +[[package]] +name = "clap_lex" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" + [[package]] name = "comfy-table" version = "7.1.4" @@ -573,6 +643,63 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "futures", + "is-terminal", + "itertools 0.10.5", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -668,7 +795,7 @@ dependencies = [ "datafusion-sql", "flate2", "futures", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -703,7 +830,7 @@ dependencies = [ "datafusion-session", "datafusion-sql", "futures", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -791,7 +918,7 @@ dependencies = [ "flate2", "futures", "glob", - "itertools", + "itertools 0.14.0", "log", "object_store", "parquet", @@ -876,7 +1003,7 @@ dependencies = [ "datafusion-physical-plan", "datafusion-session", "futures", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -940,7 +1067,7 @@ dependencies = [ "arrow", "datafusion-common", "indexmap", - "itertools", + "itertools 0.14.0", "paste", ] @@ -963,7 +1090,7 @@ dependencies = [ "datafusion-expr-common", "datafusion-macros", "hex", - "itertools", + "itertools 0.14.0", "log", "md-5", "rand", @@ -1023,7 +1150,7 @@ dependencies = [ "datafusion-functions-aggregate", "datafusion-macros", "datafusion-physical-expr-common", - "itertools", + "itertools 0.14.0", "log", "paste", ] @@ -1095,7 +1222,7 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "indexmap", - "itertools", + "itertools 0.14.0", "log", "recursive", "regex", @@ -1118,7 +1245,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap", - "itertools", + "itertools 0.14.0", "log", "paste", "petgraph", @@ -1135,7 +1262,7 @@ dependencies = [ "datafusion-common", "datafusion-expr-common", "hashbrown 0.14.5", - "itertools", + "itertools 0.14.0", ] [[package]] @@ -1152,7 +1279,7 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-expr-common", "datafusion-physical-plan", - "itertools", + "itertools 0.14.0", "log", "recursive", ] @@ -1180,7 +1307,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap", - "itertools", + "itertools 0.14.0", "log", "parking_lot", "pin-project-lite", @@ -1204,7 +1331,7 @@ dependencies = [ "datafusion-physical-plan", "datafusion-sql", "futures", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -1464,6 +1591,7 @@ checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" name = "graphframes-rs" version = "0.1.0" dependencies = [ + "criterion", "datafusion", "tokio", ] @@ -1500,6 +1628,12 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex" version = "0.4.3" @@ -1681,6 +1815,26 @@ dependencies = [ "libc", ] +[[package]] +name = "is-terminal" +version = "0.4.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.59.0", +] + +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -1980,7 +2134,7 @@ dependencies = [ "futures", "http", "humantime", - "itertools", + "itertools 0.14.0", "parking_lot", "percent-encoding", "thiserror", @@ -1998,6 +2152,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "oorandom" +version = "11.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" + [[package]] name = "ordered-float" version = "2.10.1" @@ -2126,6 +2286,34 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "potential_utf" version = "0.1.2" @@ -2206,6 +2394,26 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "recursive" version = "0.1.1" @@ -2380,6 +2588,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + [[package]] name = "simdutf8" version = "0.1.5" @@ -2410,6 +2627,16 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "sqlparser" version = "0.55.0" @@ -2548,6 +2775,16 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tokio" version = "1.46.1" @@ -2559,9 +2796,13 @@ dependencies = [ "io-uring", "libc", "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "slab", + "socket2", "tokio-macros", + "windows-sys 0.52.0", ] [[package]] @@ -2867,6 +3108,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.59.0" diff --git a/Cargo.toml b/Cargo.toml index 4118c38..5f64ef8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,16 @@ exclude = [ [dependencies] datafusion = "48.0.1" tokio = {version = "1"} + +[dev-dependencies] +criterion = { version = "0.5", features = ["html_reports", "async_tokio"] } +tokio = { version = "1", features = ["full"] } + +[[bench]] +name = "pagerank_benchmark" +harness = false # To disable Rust's default benchmarking and use the Criterion one + +# Adding more benchmarks +# [[bench]] +# name = "shortestdistance_benchmark" +# harness = false \ No newline at end of file diff --git a/benches/README.md b/benches/README.md new file mode 100644 index 0000000..c87b2a6 --- /dev/null +++ b/benches/README.md @@ -0,0 +1,36 @@ +# Running Benchmarks for Graphframe-rs + +Benchmarking for Graphframe-rs are currently done on LDBC Graphalytics [datasets](https://ldbcouncil.org/benchmarks/graphalytics/datasets/). +Benchmarking runs and reports are executed/generated as html-reports using Rust Criterion crate. + +## How to run benchmarks ? + +`run_benchmarks.py` file is the main source for running the benchmarks. + +### Dependencies + +Running benchmarks using python requires: + +```text +Python Package: +- requests # for downloading dataset +``` + +```text +CLI utility: +- zstd # For decompressing the downloaded dataset +- tar # For unzipping the decompressed dataset +``` + +### Parameters for `run_benchmarks.py` + +- `--dataset`: LDBC dataset name on which user want to run the benchmark (for e.g. test-pr-directed, cit-Patents). Dataset name are exactly same as mentioned in LDBC website. +- `--checkpoint_interval`: If user wants to define a specific number of checkpoints for Algorithms to run on. `default: 1` + +```bash +python3 run_benchmarks.py --dataset cit-Patents --checkpoint_interval 2 +``` + +## Benchmarking Reports + +Criterion benchmarking html-reports can be seen by opening `target/criterion/report/index.html` in any browser after benchmarking completes. diff --git a/benches/pagerank_benchmark.rs b/benches/pagerank_benchmark.rs new file mode 100644 index 0000000..9d2210d --- /dev/null +++ b/benches/pagerank_benchmark.rs @@ -0,0 +1,50 @@ +use criterion::{Criterion, criterion_group, criterion_main}; +use graphframes_rs::util::create_ldbc_test_graph; +use std::env; +use tokio::runtime::Runtime; + +fn benchmark_pagerank(c: &mut Criterion) { + let dataset_name = + env::var("BENCHMARK_DATASET").expect("BENCHMARK_DATASET environment variable not set"); + + let checkpoint_interval: usize = env::var("CHECKPOINT_INTERVAL") + .expect("BENCHMARK_DATASET environment variable not set") + .parse() + .expect("CHECKPOINT_INTERVAL is not a valid int"); + + let mut group = c.benchmark_group("PageRank"); + + // Create a Tokio runtime to execute the async graph loading function. + let rt = Runtime::new().unwrap(); + + // Load the graph data once before running the benchmark. + let graph = rt + .block_on(create_ldbc_test_graph(&dataset_name, true)) + .expect("Failed to create test graph"); + + // Creating pagerank_builder here so to exclude the time of generation in each iteration + let pagerank_builder = graph + .pagerank() + .max_iter(10) + .checkpoint_interval(checkpoint_interval) + .reset_prob(0.15); + + // Define the benchmark. + // Criterion runs the code inside the closure many times to get a reliable measurement. + group.bench_function( + String::from( + "pagerank-".to_owned() + &dataset_name + "-cp-" + &checkpoint_interval.to_string(), + ), + |b| { + // Use the `to_async` adapter to benchmark an async function. + b.to_async(&rt).iter(|| async { + pagerank_builder.clone().run().await.unwrap(); + }) + }, + ); + + group.finish(); +} + +criterion_group!(benches, benchmark_pagerank); +criterion_main!(benches); diff --git a/run_benchmarks.py b/run_benchmarks.py new file mode 100644 index 0000000..479e1f1 --- /dev/null +++ b/run_benchmarks.py @@ -0,0 +1,190 @@ +import argparse +import os +import pathlib +import requests +import subprocess +import sys +import shutil + +# The base URL for downloading Graphalytics datasets. +BASE_URL = "https://datasets.ldbcouncil.org/graphalytics" + +# The local directory where benchmark data will be stored. +BENCH_DATA_DIR = pathlib.Path("benches") / "data" / "ldbc" + + +def prepare_dataset(dataset_name: str): + """ + Ensures the dataset is downloaded, decompressed, renamed, and ready for use. + """ + dataset_dir = BENCH_DATA_DIR / dataset_name + archive_path = BENCH_DATA_DIR / dataset_name / f"{dataset_name}.tar.zst" + tar_path = BENCH_DATA_DIR / dataset_name / f"{dataset_name}.tar" + + # If the final extracted directory exists, we are ready to run benchmarks. + if dataset_dir.is_dir(): + print(f"Dataset '{dataset_name}' is ready.") + return + + # make dataset_dir if doesn't exist + os.mkdir(dataset_dir) + + # If the archive doesn't exist, download it. + if not archive_path.exists(): + print(f"Dataset archive '{archive_path}' not found. Downloading...") + archive_url = f"{BASE_URL}/{dataset_name}.tar.zst" + try: + response = requests.get(archive_url, stream=True) + response.raise_for_status() + with open(archive_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + print(f"Successfully downloaded {archive_path}") + except requests.exceptions.RequestException as e: + print( + f"Error: Failed to download dataset from {archive_url}", file=sys.stderr + ) + print(e, file=sys.stderr) + sys.exit(1) + + # Now, decompress and extract the archive using command-line tools. + print("Decompressing dataset...") + + # Check for required commands. + if not shutil.which("unzstd"): + print( + "Error: 'unzstd' command not found. Please install zstandard.", + file=sys.stderr, + ) + sys.exit(1) + if not shutil.which("tar"): + print("Error: 'tar' command not found.", file=sys.stderr) + sys.exit(1) + + try: + # Decompress .zst file using unzstd. + print(f"Running: unzstd -f {archive_path}") + subprocess.run( + ["unzstd", "-f", str(archive_path)], check=True, capture_output=True + ) + + # Extract .tar file. + print(f"Running: tar -xf {tar_path} -C {dataset_dir}") + subprocess.run( + ["tar", "-xf", str(tar_path), "-C", str(dataset_dir)], + check=True, + capture_output=True, + ) + + # Clean up the intermediate .tar file. + print(f"Cleaning up {tar_path}") + os.remove(tar_path) + + print("Decompression and extraction complete.") + + # Rename data files to add .csv extension. + print(f"Renaming files in {dataset_dir} to add .csv extension...") + for dirpath, _, filenames in os.walk(dataset_dir): + for filename in filenames: + if (not filename.endswith(".properties")) and ( + not filename.endswith(".tar.zst") + ): + old_path = pathlib.Path(dirpath) / filename + new_path = old_path.with_name(f"{old_path.name}.csv") + print(f" Renaming {old_path} to {new_path}") + os.rename(old_path, new_path) + print("File renaming complete.") + + except subprocess.CalledProcessError as e: + print(f"Error during decompression: {e}", file=sys.stderr) + print(f"Stdout: {e.stdout.decode() if e.stdout else 'N/A'}", file=sys.stderr) + print(f"Stderr: {e.stderr.decode() if e.stderr else 'N/A'}", file=sys.stderr) + sys.exit(1) + except FileNotFoundError: + print( + f"Error: Could not find intermediate file {tar_path} for cleanup.", + file=sys.stderr, + ) + sys.exit(1) + + +def run_benchmarks(dataset_name: str, checkpoint_interval: int): + """ + Runs the Rust benchmarks using 'cargo bench', passing the dataset name + as an environment variable. + """ + print(f"\nRunning benchmarks for dataset: {dataset_name}") + + # Set the dataset name in an environment variable for the benchmark process. + env = os.environ.copy() + env["BENCHMARK_DATASET"] = dataset_name + env["CHECKPOINT_INTERVAL"] = checkpoint_interval + + # Execute 'cargo bench' and stream its output. + try: + process = subprocess.Popen( + ["cargo", "bench"], + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + + # Read and print output line by line. + for line in iter(process.stdout.readline, ""): + print(line, end="") + + process.stdout.close() + return_code = process.wait() + + if return_code != 0: + print( + f"\nError: Benchmark process failed with exit code {return_code}", + file=sys.stderr, + ) + sys.exit(return_code) + + except FileNotFoundError: + print( + "Error: 'cargo' command not found. Is Rust installed and in your PATH?", + file=sys.stderr, + ) + sys.exit(1) + except subprocess.CalledProcessError as e: + print(f"Error running benchmarks: {e}", file=sys.stderr) + sys.exit(1) + + +def main(): + """ + Main function to parse arguments and orchestrate the benchmark run. + """ + parser = argparse.ArgumentParser( + description="A Python script to download datasets and run GraphFrame benchmarks." + ) + parser.add_argument( + "--dataset", + type=str, + required=True, + help="The name of the Graphalytics dataset to download and use for benchmarking (e.g., 'test-pr-directed').", + ) + parser.add_argument( + "--checkpoint_interval", + type=str, + required=False, + help="Providing checkpoint_interval to be used in algorithms to run benchmark.", + ) + args = parser.parse_args() + dataset = args.dataset + checkpoint_interval = args.checkpoint_interval if args.checkpoint_interval else 1 + + # Ensure the base data directory exists. + BENCH_DATA_DIR.mkdir(parents=True, exist_ok=True) + + prepare_dataset(dataset) + run_benchmarks(dataset, checkpoint_interval) + + +if __name__ == "__main__": + main() diff --git a/src/lib.rs b/src/lib.rs index 3f39f42..301cfc1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ mod pagerank; mod pregel; mod shortest_paths; +pub mod util; use datafusion::error::Result; use datafusion::functions_aggregate::count::count; @@ -91,51 +92,6 @@ mod tests { Ok(GraphFrame { vertices, edges }) } - // Creates GraphFrame from ldbc datasets. - // dataset: name of ldbc dataset like tested-pr-directed, wiki-Talk, etc. - pub async fn create_ldbc_test_graph(dataset: &str) -> Result { - let ctx = SessionContext::new(); - - let manifest_dir = env!("CARGO_MANIFEST_DIR"); - - let edge_schema = Schema::new(vec![ - Field::new("src", DataType::Int64, false), - Field::new("dst", DataType::Int64, false), - ]); - let vertices_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]); - - let edges_path = format!( - "{}/testing/data/ldbc/{}/{}.e.csv", - manifest_dir, dataset, dataset - ); - let vertices_path = format!( - "{}/testing/data/ldbc/{}/{}.v.csv", - manifest_dir, dataset, dataset - ); - - let edges = ctx - .read_csv( - &edges_path, - CsvReadOptions::new() - .delimiter(b' ') - .has_header(false) - .schema(&edge_schema), - ) - .await?; - - let vertices = ctx - .read_csv( - &vertices_path, - CsvReadOptions::new() - .delimiter(b' ') - .has_header(false) - .schema(&vertices_schema), - ) - .await?; - - Ok(GraphFrame { vertices, edges }) - } - #[tokio::test] async fn test_num_nodes() -> Result<()> { let graph = create_test_graph()?; diff --git a/src/pagerank.rs b/src/pagerank.rs index d3ebb3c..7337582 100644 --- a/src/pagerank.rs +++ b/src/pagerank.rs @@ -1,12 +1,15 @@ -use crate::GraphFrame; use crate::pregel::{MessageDirection, PREGEL_MSG, PregelBuilder, pregel_src}; +use crate::{GraphFrame, VERTEX_ID}; use datafusion::error::Result; use datafusion::functions_aggregate::sum::sum; use datafusion::prelude::*; +/// Column name for pagerank in the Page Rank algorithm +pub const PAGERANK: &str = "pagerank"; + /// A builder for the PageRank algorithm. /// -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct PageRankBuilder<'a> { graph: &'a GraphFrame, max_iter: usize, @@ -65,13 +68,13 @@ impl<'a> PageRankBuilder<'a> { .max_iterations(self.max_iter) .checkpoint_interval(self.checkpoint_interval) .add_vertex_column( - "pagerank", + PAGERANK, lit(reset_prob_per_vertices), // All vertices start with a rank of 1/N lit(reset_prob_per_vertices) + lit(alpha) * col(PREGEL_MSG), // PageRank calculation ) .add_vertex_column("out_degree", col("out_degree"), col("out_degree")) // out_degrees are static .add_message( - pregel_src("pagerank") / pregel_src("out_degree"), + pregel_src(PAGERANK) / pregel_src("out_degree"), MessageDirection::SrcToDst, ) .with_aggregate_expr(sum(col(PREGEL_MSG))); @@ -83,7 +86,7 @@ impl<'a> PageRankBuilder<'a> { // Calculating the pagerank aggregation let aggregated_rank = calculated_page_ranks .clone() - .aggregate(vec![], vec![sum(col("pagerank")).alias("pagerank_sum")])? + .aggregate(vec![], vec![sum(col(PAGERANK)).alias("pagerank_sum")])? .cache() .await?; @@ -91,8 +94,8 @@ impl<'a> PageRankBuilder<'a> { let final_page_ranks = calculated_page_ranks.join(aggregated_rank, JoinType::Inner, &[], &[], None)?; Ok(final_page_ranks.select(vec![ - col("id"), - (col("pagerank") / col("pagerank_sum")).alias("pagerank"), + col(VERTEX_ID), + (col(PAGERANK) / col("pagerank_sum")).alias(PAGERANK), ])?) } } @@ -107,9 +110,8 @@ impl GraphFrame { #[cfg(test)] mod tests { use super::*; - use crate::tests::create_ldbc_test_graph; + use crate::util::create_ldbc_test_graph; use datafusion::arrow::datatypes::{DataType, Field, Schema}; - // Gets the expected pagerank results from the mentioned ldbc dataset async fn get_ldbc_pr_results(dataset: &str) -> Result { let ctx = SessionContext::new(); @@ -137,13 +139,12 @@ mod tests { #[tokio::test] async fn test_pagerank_run() -> Result<()> { let test_dataset: &str = "test-pr-directed"; - let graph = create_ldbc_test_graph(test_dataset).await?; - + let graph = create_ldbc_test_graph(test_dataset, false).await?; let calculated_page_rank = graph .pagerank() .max_iter(14) .reset_prob(0.15) - .checkpoint_interval(2) + .checkpoint_interval(1) .run() .await?; let ldbc_page_rank = get_ldbc_pr_results(test_dataset).await?; @@ -152,11 +153,11 @@ mod tests { .join( ldbc_page_rank, JoinType::Left, - &["id"], + &[VERTEX_ID], &["vertex_id"], None, )? - .with_column("difference", abs(col("pagerank") - col("expected_pr")))? + .with_column("difference", abs(col(PAGERANK) - col("expected_pr")))? .filter(col("difference").gt(lit(0.0015)))?; // comparison_df.clone().show().await?; diff --git a/src/shortest_paths.rs b/src/shortest_paths.rs index 0fa4bd5..75539b3 100644 --- a/src/shortest_paths.rs +++ b/src/shortest_paths.rs @@ -324,7 +324,7 @@ impl GraphFrame { #[cfg(test)] mod tests { use super::*; - use crate::tests::create_ldbc_test_graph; + use crate::util::create_ldbc_test_graph; use datafusion::arrow::array::{Int64Array, RecordBatch}; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::prelude::SessionContext; @@ -500,7 +500,7 @@ mod tests { #[tokio::test] async fn test_ldbc() -> Result<()> { let expected_distances = get_ldbc_bfs_results("test-bfs-directed").await?; - let graph = create_ldbc_test_graph("test-bfs-directed").await?; + let graph = create_ldbc_test_graph("test-bfs-directed", false).await?; let results = graph .shortest_paths(vec![1]) diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..d20d940 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,92 @@ +use crate::GraphFrame; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::error::Result; +use datafusion::prelude::{CsvReadOptions, SessionContext}; +use std::collections::HashMap; +use std::io::Result as ioResult; +use std::{env, fs}; + +// Gets the basepath of the dataset based on if it's benchmark runs or test runs +/// # Arguments +/// +/// * `benchmark_run`: true for benchmark runs to read data from bench/data dir, false for tests to read data from testing/data. +pub fn _get_dataset_base_path(benchmark_run: bool) -> Result { + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + let dir_name = if benchmark_run { "benches" } else { "testing" }; + let base_path = format!("{}/{}/data/ldbc", manifest_dir, dir_name); + Ok(base_path) +} + +/// Creates a GraphFrame from an LDBC-style dataset. +/// +/// The dataset is expected to be located in the `benches/data/` directory. +/// +/// # Arguments +/// +/// * `dataset`: The name of the dataset directory (e.g., "test-pr-directed"). +/// * `benchmark_run`: true for benchmark runs to read data from bench/data dir, false for tests to read data from testing/data. +pub async fn create_ldbc_test_graph(dataset: &str, benchmark_run: bool) -> Result { + let ctx = SessionContext::new(); + + let edge_schema = Schema::new(vec![ + Field::new("src", DataType::Int64, false), + Field::new("dst", DataType::Int64, false), + ]); + let vertices_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]); + + let ds_base_path = _get_dataset_base_path(benchmark_run)?; + + let edges_path = format!("{}/{}/{}.e.csv", ds_base_path, dataset, dataset); + let vertices_path = format!("{}/{}/{}.v.csv", ds_base_path, dataset, dataset); + + println!("{}", edges_path); + println!("{}", vertices_path); + + let edges = ctx + .read_csv( + &edges_path, + CsvReadOptions::new() + .delimiter(b' ') + .has_header(false) + .schema(&edge_schema), + ) + .await?; + + let vertices = ctx + .read_csv( + &vertices_path, + CsvReadOptions::new() + .delimiter(b' ') + .has_header(false) + .schema(&vertices_schema), + ) + .await?; + Ok(GraphFrame { vertices, edges }) +} + +// Reads the ldbc dataset properties file and converts it into a HashMap +pub fn parse_ldbc_properties_file( + dataset: &str, + benchmark_run: bool, +) -> ioResult> { + let prop_fp = format!( + "{}/{}/{}.properties", + _get_dataset_base_path(benchmark_run)?, + dataset, + dataset + ); + let content = fs::read_to_string(prop_fp)?; + let mut properties_map: HashMap = HashMap::new(); + + for line in content.lines() { + let trimmed_line = line.trim(); + + if trimmed_line.is_empty() || trimmed_line.starts_with("#") { + continue; + } + if let Some((key, value)) = trimmed_line.split_once("=") { + properties_map.insert(key.trim().to_string(), value.trim().to_string()); + } + } + Ok(properties_map) +} From 6bc461e0deccfe785e8d846dc1fdd49e8faa3fa8 Mon Sep 17 00:00:00 2001 From: guptaakashdeep Date: Wed, 30 Jul 2025 13:06:58 +0530 Subject: [PATCH 2/8] feat: ability to run single benchmark. --- benches/README.md | 7 ++++++- run_benchmarks.py | 21 +++++++++++++++++---- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/benches/README.md b/benches/README.md index c87b2a6..0f5acb4 100644 --- a/benches/README.md +++ b/benches/README.md @@ -24,11 +24,16 @@ CLI utility: ### Parameters for `run_benchmarks.py` -- `--dataset`: LDBC dataset name on which user want to run the benchmark (for e.g. test-pr-directed, cit-Patents). Dataset name are exactly same as mentioned in LDBC website. +- `--dataset`: [MANDATORY] LDBC dataset name on which user want to run the benchmark (for e.g. test-pr-directed, cit-Patents). Dataset name are exactly same as mentioned in LDBC website. - `--checkpoint_interval`: If user wants to define a specific number of checkpoints for Algorithms to run on. `default: 1` +- `--name`: If a particular benchmark needs to run. Name should be same as the `[[bench]]` names present in `Cargo.toml` ```bash +# Running all the benchmarks python3 run_benchmarks.py --dataset cit-Patents --checkpoint_interval 2 + +# Running an individual benchmark +python3 run_benchmarks.py --dataset cit-Patents --checkpoint_interval 2 --name pagerank_benchmark ``` ## Benchmarking Reports diff --git a/run_benchmarks.py b/run_benchmarks.py index 479e1f1..7998ce0 100644 --- a/run_benchmarks.py +++ b/run_benchmarks.py @@ -108,7 +108,7 @@ def prepare_dataset(dataset_name: str): sys.exit(1) -def run_benchmarks(dataset_name: str, checkpoint_interval: int): +def run_benchmarks(dataset_name: str, checkpoint_interval: int, benchmark_name: str): """ Runs the Rust benchmarks using 'cargo bench', passing the dataset name as an environment variable. @@ -122,8 +122,13 @@ def run_benchmarks(dataset_name: str, checkpoint_interval: int): # Execute 'cargo bench' and stream its output. try: + cmd = ( + ["cargo", "bench"] + if not benchmark_name + else ["cargo", "bench", "--bench", benchmark_name] + ) process = subprocess.Popen( - ["cargo", "bench"], + cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -172,18 +177,26 @@ def main(): parser.add_argument( "--checkpoint_interval", type=str, + default="1", required=False, help="Providing checkpoint_interval to be used in algorithms to run benchmark.", ) + parser.add_argument( + "--name", + type=str, + required=False, + help="Name of the benchmark that needs to run.", + ) args = parser.parse_args() dataset = args.dataset - checkpoint_interval = args.checkpoint_interval if args.checkpoint_interval else 1 + checkpoint_interval = args.checkpoint_interval + benchmark_name = args.name # Ensure the base data directory exists. BENCH_DATA_DIR.mkdir(parents=True, exist_ok=True) prepare_dataset(dataset) - run_benchmarks(dataset, checkpoint_interval) + run_benchmarks(dataset, checkpoint_interval, benchmark_name) if __name__ == "__main__": From f58d0900561dbf45dda58d3836d20450c4c6592d Mon Sep 17 00:00:00 2001 From: guptaakashdeep Date: Wed, 30 Jul 2025 17:23:01 +0530 Subject: [PATCH 3/8] Changes as per the code review --- Cargo.lock | 27 ++++++++++++++++++ Cargo.toml | 1 + benches/pagerank_benchmark.rs | 10 +++++-- src/pagerank.rs | 2 +- src/shortest_paths.rs | 2 +- src/util.rs | 53 ++++++++++++++++++++++------------- 6 files changed, 72 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aec0f0b..cd16e03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1383,6 +1383,15 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -1593,6 +1602,7 @@ version = "0.1.0" dependencies = [ "criterion", "datafusion", + "java-properties", "tokio", ] @@ -1850,6 +1860,17 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "java-properties" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37bf6f484471c451f2b51eabd9e66b3fa7274550c5ec4b6c3d6070840945117f" +dependencies = [ + "encoding_rs", + "lazy_static", + "regex", +] + [[package]] name = "jobserver" version = "0.1.33" @@ -1870,6 +1891,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "lexical-core" version = "1.0.5" diff --git a/Cargo.toml b/Cargo.toml index 5f64ef8..2c1c1ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ exclude = [ [dependencies] datafusion = "48.0.1" tokio = {version = "1"} +java-properties = "2.0.0" [dev-dependencies] criterion = { version = "0.5", features = ["html_reports", "async_tokio"] } diff --git a/benches/pagerank_benchmark.rs b/benches/pagerank_benchmark.rs index 9d2210d..d32aa35 100644 --- a/benches/pagerank_benchmark.rs +++ b/benches/pagerank_benchmark.rs @@ -19,7 +19,7 @@ fn benchmark_pagerank(c: &mut Criterion) { // Load the graph data once before running the benchmark. let graph = rt - .block_on(create_ldbc_test_graph(&dataset_name, true)) + .block_on(create_ldbc_test_graph(&dataset_name, true, false)) .expect("Failed to create test graph"); // Creating pagerank_builder here so to exclude the time of generation in each iteration @@ -38,7 +38,13 @@ fn benchmark_pagerank(c: &mut Criterion) { |b| { // Use the `to_async` adapter to benchmark an async function. b.to_async(&rt).iter(|| async { - pagerank_builder.clone().run().await.unwrap(); + let _ = pagerank_builder + .clone() + .run() + .await + .unwrap() + .collect() + .await; }) }, ); diff --git a/src/pagerank.rs b/src/pagerank.rs index 7337582..6a681ea 100644 --- a/src/pagerank.rs +++ b/src/pagerank.rs @@ -139,7 +139,7 @@ mod tests { #[tokio::test] async fn test_pagerank_run() -> Result<()> { let test_dataset: &str = "test-pr-directed"; - let graph = create_ldbc_test_graph(test_dataset, false).await?; + let graph = create_ldbc_test_graph(test_dataset, false, false).await?; let calculated_page_rank = graph .pagerank() .max_iter(14) diff --git a/src/shortest_paths.rs b/src/shortest_paths.rs index 75539b3..f5c0a0c 100644 --- a/src/shortest_paths.rs +++ b/src/shortest_paths.rs @@ -500,7 +500,7 @@ mod tests { #[tokio::test] async fn test_ldbc() -> Result<()> { let expected_distances = get_ldbc_bfs_results("test-bfs-directed").await?; - let graph = create_ldbc_test_graph("test-bfs-directed", false).await?; + let graph = create_ldbc_test_graph("test-bfs-directed", false, false).await?; let results = graph .shortest_paths(vec![1]) diff --git a/src/util.rs b/src/util.rs index d20d940..65d1355 100644 --- a/src/util.rs +++ b/src/util.rs @@ -2,21 +2,40 @@ use crate::GraphFrame; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::error::Result; use datafusion::prelude::{CsvReadOptions, SessionContext}; +use java_properties::read; use std::collections::HashMap; -use std::io::Result as ioResult; -use std::{env, fs}; +use std::env; +use std::fs::File; +use std::io::BufReader; +use std::io::{self, Result as ioResult}; // Gets the basepath of the dataset based on if it's benchmark runs or test runs /// # Arguments /// /// * `benchmark_run`: true for benchmark runs to read data from bench/data dir, false for tests to read data from testing/data. -pub fn _get_dataset_base_path(benchmark_run: bool) -> Result { +fn _get_dataset_base_path(benchmark_run: bool) -> Result { let manifest_dir = env!("CARGO_MANIFEST_DIR"); let dir_name = if benchmark_run { "benches" } else { "testing" }; let base_path = format!("{}/{}/data/ldbc", manifest_dir, dir_name); Ok(base_path) } +/// Creates Schema for Graphframe edges Dataframe based on is_3d flag +/// # Arguments +/// * `is_3d`: Boolean value that defines if edges has weights field or not. +fn _create_edge_schema(is_3d: bool) -> Schema { + let mut edge_fields = vec![ + Field::new("src", DataType::Int64, false), + Field::new("dst", DataType::Int64, false), + ]; + + if is_3d { + edge_fields.push(Field::new("weights", DataType::Float64, false)) + } + + Schema::new(edge_fields) +} + /// Creates a GraphFrame from an LDBC-style dataset. /// /// The dataset is expected to be located in the `benches/data/` directory. @@ -25,13 +44,15 @@ pub fn _get_dataset_base_path(benchmark_run: bool) -> Result { /// /// * `dataset`: The name of the dataset directory (e.g., "test-pr-directed"). /// * `benchmark_run`: true for benchmark runs to read data from bench/data dir, false for tests to read data from testing/data. -pub async fn create_ldbc_test_graph(dataset: &str, benchmark_run: bool) -> Result { +/// * `is_3d`: Boolean value that defines if edges has weights field or not. +pub async fn create_ldbc_test_graph( + dataset: &str, + benchmark_run: bool, + is_3d: bool, +) -> Result { let ctx = SessionContext::new(); - let edge_schema = Schema::new(vec![ - Field::new("src", DataType::Int64, false), - Field::new("dst", DataType::Int64, false), - ]); + let edge_schema = _create_edge_schema(is_3d); let vertices_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]); let ds_base_path = _get_dataset_base_path(benchmark_run)?; @@ -75,18 +96,12 @@ pub fn parse_ldbc_properties_file( dataset, dataset ); - let content = fs::read_to_string(prop_fp)?; - let mut properties_map: HashMap = HashMap::new(); + let prop_file = File::open(prop_fp)?; + let reader = BufReader::new(prop_file); - for line in content.lines() { - let trimmed_line = line.trim(); + // need to map the Properties error thrown by java_properties to io Error + let properties_map = + read(reader).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - if trimmed_line.is_empty() || trimmed_line.starts_with("#") { - continue; - } - if let Some((key, value)) = trimmed_line.split_once("=") { - properties_map.insert(key.trim().to_string(), value.trim().to_string()); - } - } Ok(properties_map) } From c8e94eab6a9e5beff4faef962e92b258c4b00230 Mon Sep 17 00:00:00 2001 From: guptaakashdeep Date: Wed, 30 Jul 2025 17:31:11 +0530 Subject: [PATCH 4/8] Renamed is_3d flag to is_weighted --- src/util.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/util.rs b/src/util.rs index 65d1355..4d38fe1 100644 --- a/src/util.rs +++ b/src/util.rs @@ -22,14 +22,14 @@ fn _get_dataset_base_path(benchmark_run: bool) -> Result { /// Creates Schema for Graphframe edges Dataframe based on is_3d flag /// # Arguments -/// * `is_3d`: Boolean value that defines if edges has weights field or not. -fn _create_edge_schema(is_3d: bool) -> Schema { +/// * `is_weighted`: Boolean value that defines if edges has weights field or not. +fn _create_edge_schema(is_weighted: bool) -> Schema { let mut edge_fields = vec![ Field::new("src", DataType::Int64, false), Field::new("dst", DataType::Int64, false), ]; - if is_3d { + if is_weighted { edge_fields.push(Field::new("weights", DataType::Float64, false)) } @@ -48,11 +48,11 @@ fn _create_edge_schema(is_3d: bool) -> Schema { pub async fn create_ldbc_test_graph( dataset: &str, benchmark_run: bool, - is_3d: bool, + is_weighted: bool, ) -> Result { let ctx = SessionContext::new(); - let edge_schema = _create_edge_schema(is_3d); + let edge_schema = _create_edge_schema(is_weighted); let vertices_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]); let ds_base_path = _get_dataset_base_path(benchmark_run)?; From d500d5985aea3c94840d305057b98f9b0de77d09 Mon Sep 17 00:00:00 2001 From: guptaakashdeep Date: Wed, 30 Jul 2025 17:40:17 +0530 Subject: [PATCH 5/8] Added 3 retries in case of dataset download failures --- run_benchmarks.py | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/run_benchmarks.py b/run_benchmarks.py index 7998ce0..686c0c4 100644 --- a/run_benchmarks.py +++ b/run_benchmarks.py @@ -5,6 +5,7 @@ import subprocess import sys import shutil +import time # The base URL for downloading Graphalytics datasets. BASE_URL = "https://datasets.ldbcouncil.org/graphalytics" @@ -33,19 +34,28 @@ def prepare_dataset(dataset_name: str): if not archive_path.exists(): print(f"Dataset archive '{archive_path}' not found. Downloading...") archive_url = f"{BASE_URL}/{dataset_name}.tar.zst" - try: - response = requests.get(archive_url, stream=True) - response.raise_for_status() - with open(archive_path, "wb") as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - print(f"Successfully downloaded {archive_path}") - except requests.exceptions.RequestException as e: - print( - f"Error: Failed to download dataset from {archive_url}", file=sys.stderr - ) - print(e, file=sys.stderr) - sys.exit(1) + # 3 tries to download the dataset before actually failing + retries = 3 + for attempt in range(retries): + try: + response = requests.get(archive_url, stream=True) + response.raise_for_status() + with open(archive_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + print(f"Successfully downloaded {archive_path}") + break # Success, exit the loop + except requests.exceptions.RequestException as e: + print(f"Attempt {attempt + 1} of {retries} failed: {e}", file=sys.stderr) + if attempt < retries - 1: + print("Retrying in 5 seconds...", file=sys.stderr) + time.sleep(5) + else: + print( + f"Error: Failed to download dataset from {archive_url} after {retries} attempts.", + file=sys.stderr, + ) + sys.exit(1) # Now, decompress and extract the archive using command-line tools. print("Decompressing dataset...") From 4b6531aa8f20fae2e8731b50c6873ab0d1844b29 Mon Sep 17 00:00:00 2001 From: guptaakashdeep Date: Wed, 30 Jul 2025 17:46:25 +0530 Subject: [PATCH 6/8] Removal of java-properties dependency --- Cargo.lock | 27 --------------------------- Cargo.toml | 1 - src/util.rs | 24 ++++++++++++++---------- 3 files changed, 14 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cd16e03..aec0f0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1383,15 +1383,6 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" -[[package]] -name = "encoding_rs" -version = "0.8.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" -dependencies = [ - "cfg-if", -] - [[package]] name = "equivalent" version = "1.0.2" @@ -1602,7 +1593,6 @@ version = "0.1.0" dependencies = [ "criterion", "datafusion", - "java-properties", "tokio", ] @@ -1860,17 +1850,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" -[[package]] -name = "java-properties" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37bf6f484471c451f2b51eabd9e66b3fa7274550c5ec4b6c3d6070840945117f" -dependencies = [ - "encoding_rs", - "lazy_static", - "regex", -] - [[package]] name = "jobserver" version = "0.1.33" @@ -1891,12 +1870,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "lazy_static" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" - [[package]] name = "lexical-core" version = "1.0.5" diff --git a/Cargo.toml b/Cargo.toml index 2c1c1ac..5f64ef8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,6 @@ exclude = [ [dependencies] datafusion = "48.0.1" tokio = {version = "1"} -java-properties = "2.0.0" [dev-dependencies] criterion = { version = "0.5", features = ["html_reports", "async_tokio"] } diff --git a/src/util.rs b/src/util.rs index 4d38fe1..3dab0a7 100644 --- a/src/util.rs +++ b/src/util.rs @@ -2,12 +2,9 @@ use crate::GraphFrame; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::error::Result; use datafusion::prelude::{CsvReadOptions, SessionContext}; -use java_properties::read; use std::collections::HashMap; -use std::env; -use std::fs::File; -use std::io::BufReader; -use std::io::{self, Result as ioResult}; +use std::{env,fs}; +use std::io::Result as ioResult; // Gets the basepath of the dataset based on if it's benchmark runs or test runs /// # Arguments @@ -85,6 +82,7 @@ pub async fn create_ldbc_test_graph( Ok(GraphFrame { vertices, edges }) } + // Reads the ldbc dataset properties file and converts it into a HashMap pub fn parse_ldbc_properties_file( dataset: &str, @@ -96,12 +94,18 @@ pub fn parse_ldbc_properties_file( dataset, dataset ); - let prop_file = File::open(prop_fp)?; - let reader = BufReader::new(prop_file); + let content = fs::read_to_string(prop_fp)?; + let mut properties_map: HashMap = HashMap::new(); - // need to map the Properties error thrown by java_properties to io Error - let properties_map = - read(reader).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + for line in content.lines() { + let trimmed_line = line.trim(); + if trimmed_line.is_empty() || trimmed_line.starts_with("#") { + continue; + } + if let Some((key, value)) = trimmed_line.split_once("=") { + properties_map.insert(key.trim().to_string(), value.trim().to_string()); + } + } Ok(properties_map) } From 43cc3d24885e05a145ec3c7caf4d7403f136fe4e Mon Sep 17 00:00:00 2001 From: guptaakashdeep Date: Wed, 30 Jul 2025 17:48:00 +0530 Subject: [PATCH 7/8] Code formatting --- src/util.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/util.rs b/src/util.rs index 3dab0a7..a58a0c3 100644 --- a/src/util.rs +++ b/src/util.rs @@ -3,8 +3,8 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::error::Result; use datafusion::prelude::{CsvReadOptions, SessionContext}; use std::collections::HashMap; -use std::{env,fs}; use std::io::Result as ioResult; +use std::{env, fs}; // Gets the basepath of the dataset based on if it's benchmark runs or test runs /// # Arguments @@ -82,7 +82,6 @@ pub async fn create_ldbc_test_graph( Ok(GraphFrame { vertices, edges }) } - // Reads the ldbc dataset properties file and converts it into a HashMap pub fn parse_ldbc_properties_file( dataset: &str, From 7b4322df0b04f74f40174c88ea9a6a84f172a255 Mon Sep 17 00:00:00 2001 From: Akashdeep Gupta Date: Wed, 30 Jul 2025 17:48:31 +0530 Subject: [PATCH 8/8] Update run_benchmarks.py Co-authored-by: Sem --- run_benchmarks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run_benchmarks.py b/run_benchmarks.py index 686c0c4..91405c2 100644 --- a/run_benchmarks.py +++ b/run_benchmarks.py @@ -101,7 +101,7 @@ def prepare_dataset(dataset_name: str): ): old_path = pathlib.Path(dirpath) / filename new_path = old_path.with_name(f"{old_path.name}.csv") - print(f" Renaming {old_path} to {new_path}") + print(f"\tRenaming {old_path} to {new_path}") os.rename(old_path, new_path) print("File renaming complete.")