diff --git a/.gitignore b/.gitignore index 998fd88..09b1145 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ .cargo/ debug/ release/ +/benches/data/ diff --git a/Cargo.lock b/Cargo.lock index 64d2787..aec0f0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,6 +76,18 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + +[[package]] +name = "anstyle" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd" + [[package]] name = "arrayref" version = "0.3.9" @@ -474,6 +486,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.2.30" @@ -513,6 +531,58 @@ dependencies = [ "phf", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + +[[package]] +name = "clap" +version = "4.5.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be92d32e80243a54711e5d7ce823c35c41c9d929dc4ab58e1276f625841aadf9" 
+dependencies = [ + "clap_builder", +] + +[[package]] +name = "clap_builder" +version = "4.5.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "707eab41e9622f9139419d573eca0900137718000c517d47da73045f54331c3d" +dependencies = [ + "anstyle", + "clap_lex", +] + +[[package]] +name = "clap_lex" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" + [[package]] name = "comfy-table" version = "7.1.4" @@ -573,6 +643,63 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "futures", + "is-terminal", + "itertools 0.10.5", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -668,7 +795,7 @@ dependencies = [ "datafusion-sql", "flate2", "futures", 
- "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -703,7 +830,7 @@ dependencies = [ "datafusion-session", "datafusion-sql", "futures", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -791,7 +918,7 @@ dependencies = [ "flate2", "futures", "glob", - "itertools", + "itertools 0.14.0", "log", "object_store", "parquet", @@ -876,7 +1003,7 @@ dependencies = [ "datafusion-physical-plan", "datafusion-session", "futures", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -940,7 +1067,7 @@ dependencies = [ "arrow", "datafusion-common", "indexmap", - "itertools", + "itertools 0.14.0", "paste", ] @@ -963,7 +1090,7 @@ dependencies = [ "datafusion-expr-common", "datafusion-macros", "hex", - "itertools", + "itertools 0.14.0", "log", "md-5", "rand", @@ -1023,7 +1150,7 @@ dependencies = [ "datafusion-functions-aggregate", "datafusion-macros", "datafusion-physical-expr-common", - "itertools", + "itertools 0.14.0", "log", "paste", ] @@ -1095,7 +1222,7 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "indexmap", - "itertools", + "itertools 0.14.0", "log", "recursive", "regex", @@ -1118,7 +1245,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap", - "itertools", + "itertools 0.14.0", "log", "paste", "petgraph", @@ -1135,7 +1262,7 @@ dependencies = [ "datafusion-common", "datafusion-expr-common", "hashbrown 0.14.5", - "itertools", + "itertools 0.14.0", ] [[package]] @@ -1152,7 +1279,7 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-expr-common", "datafusion-physical-plan", - "itertools", + "itertools 0.14.0", "log", "recursive", ] @@ -1180,7 +1307,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap", - "itertools", + "itertools 0.14.0", "log", "parking_lot", "pin-project-lite", @@ -1204,7 +1331,7 @@ dependencies = [ "datafusion-physical-plan", "datafusion-sql", "futures", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ 
-1464,6 +1591,7 @@ checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" name = "graphframes-rs" version = "0.1.0" dependencies = [ + "criterion", "datafusion", "tokio", ] @@ -1500,6 +1628,12 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex" version = "0.4.3" @@ -1681,6 +1815,26 @@ dependencies = [ "libc", ] +[[package]] +name = "is-terminal" +version = "0.4.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.59.0", +] + +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -1980,7 +2134,7 @@ dependencies = [ "futures", "http", "humantime", - "itertools", + "itertools 0.14.0", "parking_lot", "percent-encoding", "thiserror", @@ -1998,6 +2152,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "oorandom" +version = "11.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" + [[package]] name = "ordered-float" version = "2.10.1" @@ -2126,6 +2286,34 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "potential_utf" version = "0.1.2" @@ -2206,6 +2394,26 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "recursive" version = "0.1.1" @@ -2380,6 +2588,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + [[package]] name = "simdutf8" version = "0.1.5" @@ -2410,6 +2627,16 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" +[[package]] +name = "socket2" 
+version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "sqlparser" version = "0.55.0" @@ -2548,6 +2775,16 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tokio" version = "1.46.1" @@ -2559,9 +2796,13 @@ dependencies = [ "io-uring", "libc", "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "slab", + "socket2", "tokio-macros", + "windows-sys 0.52.0", ] [[package]] @@ -2867,6 +3108,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.59.0" diff --git a/Cargo.toml b/Cargo.toml index 4118c38..5f64ef8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,16 @@ exclude = [ [dependencies] datafusion = "48.0.1" tokio = {version = "1"} + +[dev-dependencies] +criterion = { version = "0.5", features = ["html_reports", "async_tokio"] } +tokio = { version = "1", features = ["full"] } + +[[bench]] +name = "pagerank_benchmark" +harness = false # To disable Rust's default benchmarking and use the Criterion one + +# Adding more benchmarks +# [[bench]] +# name = "shortestdistance_benchmark" +# harness = false \ No newline at end of file diff --git a/benches/README.md b/benches/README.md new file mode 100644 index 0000000..0f5acb4 --- /dev/null +++ b/benches/README.md @@ -0,0 +1,41 @@ +# Running Benchmarks for Graphframe-rs + +Benchmarking for 
Graphframe-rs is currently done on LDBC Graphalytics [datasets](https://ldbcouncil.org/benchmarks/graphalytics/datasets/). +Benchmarking runs and reports are executed/generated as html-reports using the Rust Criterion crate. + +## How to run benchmarks? + +`run_benchmarks.py` is the main entry point for running the benchmarks. + +### Dependencies + +Running benchmarks using python requires: + +```text +Python Package: +- requests # for downloading dataset +``` + +```text +CLI utility: +- zstd # For decompressing the downloaded dataset +- tar # For unzipping the decompressed dataset +``` + +### Parameters for `run_benchmarks.py` + +- `--dataset`: [MANDATORY] LDBC dataset name on which the user wants to run the benchmark (e.g. test-pr-directed, cit-Patents). Dataset names are exactly the same as mentioned on the LDBC website. +- `--checkpoint_interval`: If the user wants to define a specific checkpoint interval for the algorithms to run with. `default: 1` +- `--name`: If only a particular benchmark needs to run. The name should be the same as the `[[bench]]` names present in `Cargo.toml` + +```bash +# Running all the benchmarks +python3 run_benchmarks.py --dataset cit-Patents --checkpoint_interval 2 + +# Running an individual benchmark +python3 run_benchmarks.py --dataset cit-Patents --checkpoint_interval 2 --name pagerank_benchmark +``` + +## Benchmarking Reports + +Criterion benchmarking html-reports can be seen by opening `target/criterion/report/index.html` in any browser after benchmarking completes. 
diff --git a/benches/pagerank_benchmark.rs b/benches/pagerank_benchmark.rs new file mode 100644 index 0000000..d32aa35 --- /dev/null +++ b/benches/pagerank_benchmark.rs @@ -0,0 +1,56 @@ +use criterion::{Criterion, criterion_group, criterion_main}; +use graphframes_rs::util::create_ldbc_test_graph; +use std::env; +use tokio::runtime::Runtime; + +fn benchmark_pagerank(c: &mut Criterion) { + let dataset_name = + env::var("BENCHMARK_DATASET").expect("BENCHMARK_DATASET environment variable not set"); + + let checkpoint_interval: usize = env::var("CHECKPOINT_INTERVAL") + .expect("CHECKPOINT_INTERVAL environment variable not set") + .parse() + .expect("CHECKPOINT_INTERVAL is not a valid int"); + + let mut group = c.benchmark_group("PageRank"); + + // Create a Tokio runtime to execute the async graph loading function. + let rt = Runtime::new().unwrap(); + + // Load the graph data once before running the benchmark. + let graph = rt + .block_on(create_ldbc_test_graph(&dataset_name, true, false)) + .expect("Failed to create test graph"); + + // Creating pagerank_builder here so as to exclude the time of generation in each iteration + let pagerank_builder = graph + .pagerank() + .max_iter(10) + .checkpoint_interval(checkpoint_interval) + .reset_prob(0.15); + + // Define the benchmark. + // Criterion runs the code inside the closure many times to get a reliable measurement. + group.bench_function( + String::from( + "pagerank-".to_owned() + &dataset_name + "-cp-" + &checkpoint_interval.to_string(), + ), + |b| { + // Use the `to_async` adapter to benchmark an async function. 
+ b.to_async(&rt).iter(|| async { + let _ = pagerank_builder + .clone() + .run() + .await + .unwrap() + .collect() + .await; + }) + }, + ); + + group.finish(); +} + +criterion_group!(benches, benchmark_pagerank); +criterion_main!(benches); diff --git a/run_benchmarks.py b/run_benchmarks.py new file mode 100644 index 0000000..91405c2 --- /dev/null +++ b/run_benchmarks.py @@ -0,0 +1,213 @@ +import argparse +import os +import pathlib +import requests +import subprocess +import sys +import shutil +import time + +# The base URL for downloading Graphalytics datasets. +BASE_URL = "https://datasets.ldbcouncil.org/graphalytics" + +# The local directory where benchmark data will be stored. +BENCH_DATA_DIR = pathlib.Path("benches") / "data" / "ldbc" + + +def prepare_dataset(dataset_name: str): + """ + Ensures the dataset is downloaded, decompressed, renamed, and ready for use. + """ + dataset_dir = BENCH_DATA_DIR / dataset_name + archive_path = BENCH_DATA_DIR / dataset_name / f"{dataset_name}.tar.zst" + tar_path = BENCH_DATA_DIR / dataset_name / f"{dataset_name}.tar" + + # If the final extracted directory exists, we are ready to run benchmarks. + if dataset_dir.is_dir(): + print(f"Dataset '{dataset_name}' is ready.") + return + + # make dataset_dir if doesn't exist + os.mkdir(dataset_dir) + + # If the archive doesn't exist, download it. + if not archive_path.exists(): + print(f"Dataset archive '{archive_path}' not found. 
Downloading...") + archive_url = f"{BASE_URL}/{dataset_name}.tar.zst" + # 3 tries to download the dataset before actually failing + retries = 3 + for attempt in range(retries): + try: + response = requests.get(archive_url, stream=True) + response.raise_for_status() + with open(archive_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + print(f"Successfully downloaded {archive_path}") + break # Success, exit the loop + except requests.exceptions.RequestException as e: + print(f"Attempt {attempt + 1} of {retries} failed: {e}", file=sys.stderr) + if attempt < retries - 1: + print("Retrying in 5 seconds...", file=sys.stderr) + time.sleep(5) + else: + print( + f"Error: Failed to download dataset from {archive_url} after {retries} attempts.", + file=sys.stderr, + ) + sys.exit(1) + + # Now, decompress and extract the archive using command-line tools. + print("Decompressing dataset...") + + # Check for required commands. + if not shutil.which("unzstd"): + print( + "Error: 'unzstd' command not found. Please install zstandard.", + file=sys.stderr, + ) + sys.exit(1) + if not shutil.which("tar"): + print("Error: 'tar' command not found.", file=sys.stderr) + sys.exit(1) + + try: + # Decompress .zst file using unzstd. + print(f"Running: unzstd -f {archive_path}") + subprocess.run( + ["unzstd", "-f", str(archive_path)], check=True, capture_output=True + ) + + # Extract .tar file. + print(f"Running: tar -xf {tar_path} -C {dataset_dir}") + subprocess.run( + ["tar", "-xf", str(tar_path), "-C", str(dataset_dir)], + check=True, + capture_output=True, + ) + + # Clean up the intermediate .tar file. + print(f"Cleaning up {tar_path}") + os.remove(tar_path) + + print("Decompression and extraction complete.") + + # Rename data files to add .csv extension. 
+ print(f"Renaming files in {dataset_dir} to add .csv extension...") + for dirpath, _, filenames in os.walk(dataset_dir): + for filename in filenames: + if (not filename.endswith(".properties")) and ( + not filename.endswith(".tar.zst") + ): + old_path = pathlib.Path(dirpath) / filename + new_path = old_path.with_name(f"{old_path.name}.csv") + print(f"\tRenaming {old_path} to {new_path}") + os.rename(old_path, new_path) + print("File renaming complete.") + + except subprocess.CalledProcessError as e: + print(f"Error during decompression: {e}", file=sys.stderr) + print(f"Stdout: {e.stdout.decode() if e.stdout else 'N/A'}", file=sys.stderr) + print(f"Stderr: {e.stderr.decode() if e.stderr else 'N/A'}", file=sys.stderr) + sys.exit(1) + except FileNotFoundError: + print( + f"Error: Could not find intermediate file {tar_path} for cleanup.", + file=sys.stderr, + ) + sys.exit(1) + + +def run_benchmarks(dataset_name: str, checkpoint_interval: int, benchmark_name: str): + """ + Runs the Rust benchmarks using 'cargo bench', passing the dataset name + as an environment variable. + """ + print(f"\nRunning benchmarks for dataset: {dataset_name}") + + # Set the dataset name in an environment variable for the benchmark process. + env = os.environ.copy() + env["BENCHMARK_DATASET"] = dataset_name + env["CHECKPOINT_INTERVAL"] = checkpoint_interval + + # Execute 'cargo bench' and stream its output. + try: + cmd = ( + ["cargo", "bench"] + if not benchmark_name + else ["cargo", "bench", "--bench", benchmark_name] + ) + process = subprocess.Popen( + cmd, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + + # Read and print output line by line. 
+ for line in iter(process.stdout.readline, ""): + print(line, end="") + + process.stdout.close() + return_code = process.wait() + + if return_code != 0: + print( + f"\nError: Benchmark process failed with exit code {return_code}", + file=sys.stderr, + ) + sys.exit(return_code) + + except FileNotFoundError: + print( + "Error: 'cargo' command not found. Is Rust installed and in your PATH?", + file=sys.stderr, + ) + sys.exit(1) + except subprocess.CalledProcessError as e: + print(f"Error running benchmarks: {e}", file=sys.stderr) + sys.exit(1) + + +def main(): + """ + Main function to parse arguments and orchestrate the benchmark run. + """ + parser = argparse.ArgumentParser( + description="A Python script to download datasets and run GraphFrame benchmarks." + ) + parser.add_argument( + "--dataset", + type=str, + required=True, + help="The name of the Graphalytics dataset to download and use for benchmarking (e.g., 'test-pr-directed').", + ) + parser.add_argument( + "--checkpoint_interval", + type=str, + default="1", + required=False, + help="Providing checkpoint_interval to be used in algorithms to run benchmark.", + ) + parser.add_argument( + "--name", + type=str, + required=False, + help="Name of the benchmark that needs to run.", + ) + args = parser.parse_args() + dataset = args.dataset + checkpoint_interval = args.checkpoint_interval + benchmark_name = args.name + + # Ensure the base data directory exists. + BENCH_DATA_DIR.mkdir(parents=True, exist_ok=True) + + prepare_dataset(dataset) + run_benchmarks(dataset, checkpoint_interval, benchmark_name) + + +if __name__ == "__main__": + main() diff --git a/src/lib.rs b/src/lib.rs index 3f39f42..301cfc1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ mod pagerank; mod pregel; mod shortest_paths; +pub mod util; use datafusion::error::Result; use datafusion::functions_aggregate::count::count; @@ -91,51 +92,6 @@ mod tests { Ok(GraphFrame { vertices, edges }) } - // Creates GraphFrame from ldbc datasets. 
- // dataset: name of ldbc dataset like tested-pr-directed, wiki-Talk, etc. - pub async fn create_ldbc_test_graph(dataset: &str) -> Result<GraphFrame> { - let ctx = SessionContext::new(); - - let manifest_dir = env!("CARGO_MANIFEST_DIR"); - - let edge_schema = Schema::new(vec![ - Field::new("src", DataType::Int64, false), - Field::new("dst", DataType::Int64, false), - ]); - let vertices_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]); - - let edges_path = format!( - "{}/testing/data/ldbc/{}/{}.e.csv", - manifest_dir, dataset, dataset - ); - let vertices_path = format!( - "{}/testing/data/ldbc/{}/{}.v.csv", - manifest_dir, dataset, dataset - ); - - let edges = ctx - .read_csv( - &edges_path, - CsvReadOptions::new() - .delimiter(b' ') - .has_header(false) - .schema(&edge_schema), - ) - .await?; - - let vertices = ctx - .read_csv( - &vertices_path, - CsvReadOptions::new() - .delimiter(b' ') - .has_header(false) - .schema(&vertices_schema), - ) - .await?; - - Ok(GraphFrame { vertices, edges }) - } - #[tokio::test] async fn test_num_nodes() -> Result<()> { let graph = create_test_graph()?; diff --git a/src/pagerank.rs b/src/pagerank.rs index d3ebb3c..6a681ea 100644 --- a/src/pagerank.rs +++ b/src/pagerank.rs @@ -1,12 +1,15 @@ -use crate::GraphFrame; use crate::pregel::{MessageDirection, PREGEL_MSG, PregelBuilder, pregel_src}; +use crate::{GraphFrame, VERTEX_ID}; use datafusion::error::Result; use datafusion::functions_aggregate::sum::sum; use datafusion::prelude::*; +/// Column name for pagerank in the Page Rank algorithm +pub const PAGERANK: &str = "pagerank"; + /// A builder for the PageRank algorithm. 
/// -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct PageRankBuilder<'a> { graph: &'a GraphFrame, max_iter: usize, @@ -65,13 +68,13 @@ impl<'a> PageRankBuilder<'a> { .max_iterations(self.max_iter) .checkpoint_interval(self.checkpoint_interval) .add_vertex_column( - "pagerank", + PAGERANK, lit(reset_prob_per_vertices), // All vertices start with a rank of 1/N lit(reset_prob_per_vertices) + lit(alpha) * col(PREGEL_MSG), // PageRank calculation ) .add_vertex_column("out_degree", col("out_degree"), col("out_degree")) // out_degrees are static .add_message( - pregel_src("pagerank") / pregel_src("out_degree"), + pregel_src(PAGERANK) / pregel_src("out_degree"), MessageDirection::SrcToDst, ) .with_aggregate_expr(sum(col(PREGEL_MSG))); @@ -83,7 +86,7 @@ impl<'a> PageRankBuilder<'a> { // Calculating the pagerank aggregation let aggregated_rank = calculated_page_ranks .clone() - .aggregate(vec![], vec![sum(col("pagerank")).alias("pagerank_sum")])? + .aggregate(vec![], vec![sum(col(PAGERANK)).alias("pagerank_sum")])? .cache() .await?; @@ -91,8 +94,8 @@ impl<'a> PageRankBuilder<'a> { let final_page_ranks = calculated_page_ranks.join(aggregated_rank, JoinType::Inner, &[], &[], None)?; Ok(final_page_ranks.select(vec![ - col("id"), - (col("pagerank") / col("pagerank_sum")).alias("pagerank"), + col(VERTEX_ID), + (col(PAGERANK) / col("pagerank_sum")).alias(PAGERANK), ])?) 
} } @@ -107,9 +110,8 @@ impl GraphFrame { #[cfg(test)] mod tests { use super::*; - use crate::tests::create_ldbc_test_graph; + use crate::util::create_ldbc_test_graph; use datafusion::arrow::datatypes::{DataType, Field, Schema}; - // Gets the expected pagerank results from the mentioned ldbc dataset async fn get_ldbc_pr_results(dataset: &str) -> Result { let ctx = SessionContext::new(); @@ -137,13 +139,12 @@ mod tests { #[tokio::test] async fn test_pagerank_run() -> Result<()> { let test_dataset: &str = "test-pr-directed"; - let graph = create_ldbc_test_graph(test_dataset).await?; - + let graph = create_ldbc_test_graph(test_dataset, false, false).await?; let calculated_page_rank = graph .pagerank() .max_iter(14) .reset_prob(0.15) - .checkpoint_interval(2) + .checkpoint_interval(1) .run() .await?; let ldbc_page_rank = get_ldbc_pr_results(test_dataset).await?; @@ -152,11 +153,11 @@ mod tests { .join( ldbc_page_rank, JoinType::Left, - &["id"], + &[VERTEX_ID], &["vertex_id"], None, )? - .with_column("difference", abs(col("pagerank") - col("expected_pr")))? + .with_column("difference", abs(col(PAGERANK) - col("expected_pr")))? 
.filter(col("difference").gt(lit(0.0015)))?; // comparison_df.clone().show().await?; diff --git a/src/shortest_paths.rs b/src/shortest_paths.rs index 0fa4bd5..f5c0a0c 100644 --- a/src/shortest_paths.rs +++ b/src/shortest_paths.rs @@ -324,7 +324,7 @@ impl GraphFrame { #[cfg(test)] mod tests { use super::*; - use crate::tests::create_ldbc_test_graph; + use crate::util::create_ldbc_test_graph; use datafusion::arrow::array::{Int64Array, RecordBatch}; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::prelude::SessionContext; @@ -500,7 +500,7 @@ mod tests { #[tokio::test] async fn test_ldbc() -> Result<()> { let expected_distances = get_ldbc_bfs_results("test-bfs-directed").await?; - let graph = create_ldbc_test_graph("test-bfs-directed").await?; + let graph = create_ldbc_test_graph("test-bfs-directed", false, false).await?; let results = graph .shortest_paths(vec![1]) diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..a58a0c3 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,110 @@ +use crate::GraphFrame; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::error::Result; +use datafusion::prelude::{CsvReadOptions, SessionContext}; +use std::collections::HashMap; +use std::io::Result as ioResult; +use std::{env, fs}; + +/// Gets the base path of the dataset depending on whether it's a benchmark run or a test run. +/// # Arguments +/// +/// * `benchmark_run`: true for benchmark runs to read data from bench/data dir, false for tests to read data from testing/data. +fn _get_dataset_base_path(benchmark_run: bool) -> Result<String> { + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + let dir_name = if benchmark_run { "benches" } else { "testing" }; + let base_path = format!("{}/{}/data/ldbc", manifest_dir, dir_name); + Ok(base_path) +} + +/// Creates Schema for Graphframe edges Dataframe based on the is_weighted flag +/// # Arguments
/// * `is_weighted`: Boolean value that defines if edges have a weights field or not. 
+fn _create_edge_schema(is_weighted: bool) -> Schema { + let mut edge_fields = vec![ + Field::new("src", DataType::Int64, false), + Field::new("dst", DataType::Int64, false), + ]; + + if is_weighted { + edge_fields.push(Field::new("weights", DataType::Float64, false)) + } + + Schema::new(edge_fields) +} + +/// Creates a GraphFrame from an LDBC-style dataset. +/// +/// The dataset is expected to be located in the `benches/data/` or `testing/data/` directory, depending on `benchmark_run`. +/// +/// # Arguments +/// +/// * `dataset`: The name of the dataset directory (e.g., "test-pr-directed"). +/// * `benchmark_run`: true for benchmark runs to read data from bench/data dir, false for tests to read data from testing/data. +/// * `is_weighted`: Boolean value that defines if edges have a weights field or not. +pub async fn create_ldbc_test_graph( + dataset: &str, + benchmark_run: bool, + is_weighted: bool, +) -> Result<GraphFrame> { + let ctx = SessionContext::new(); + + let edge_schema = _create_edge_schema(is_weighted); + let vertices_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]); + + let ds_base_path = _get_dataset_base_path(benchmark_run)?; + + let edges_path = format!("{}/{}/{}.e.csv", ds_base_path, dataset, dataset); + let vertices_path = format!("{}/{}/{}.v.csv", ds_base_path, dataset, dataset); + + println!("{}", edges_path); + println!("{}", vertices_path); + + let edges = ctx + .read_csv( + &edges_path, + CsvReadOptions::new() + .delimiter(b' ') + .has_header(false) + .schema(&edge_schema), + ) + .await?; + + let vertices = ctx + .read_csv( + &vertices_path, + CsvReadOptions::new() + .delimiter(b' ') + .has_header(false) + .schema(&vertices_schema), + ) + .await?; + Ok(GraphFrame { vertices, edges }) +} + +/// Reads the LDBC dataset properties file and converts it into a HashMap +pub fn parse_ldbc_properties_file( + dataset: &str, + benchmark_run: bool, +) -> ioResult<HashMap<String, String>> { + let prop_fp = format!( + "{}/{}/{}.properties", + _get_dataset_base_path(benchmark_run)?, + dataset, + dataset + ); + let content = 
fs::read_to_string(prop_fp)?; + let mut properties_map: HashMap<String, String> = HashMap::new(); + + for line in content.lines() { + let trimmed_line = line.trim(); + + if trimmed_line.is_empty() || trimmed_line.starts_with("#") { + continue; + } + if let Some((key, value)) = trimmed_line.split_once("=") { + properties_map.insert(key.trim().to_string(), value.trim().to_string()); + } + } + Ok(properties_map) +}