From 6b61f533eb3de21771a95c7dba9da08a8e16e62d Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Mon, 28 Jul 2025 09:18:34 +0200 Subject: [PATCH 1/4] SP wip --- src/lib.rs | 1 + src/shortest_paths.rs | 254 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 255 insertions(+) create mode 100644 src/shortest_paths.rs diff --git a/src/lib.rs b/src/lib.rs index d8f4bb6..7f96dc7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ mod pregel; +mod shortest_paths; use datafusion::error::Result; use datafusion::functions_aggregate::count::count; diff --git a/src/shortest_paths.rs b/src/shortest_paths.rs new file mode 100644 index 0000000..be6a0f6 --- /dev/null +++ b/src/shortest_paths.rs @@ -0,0 +1,254 @@ +use crate::pregel::PREGEL_MSG; +use crate::{GraphFrame, VERTEX_ID}; +use datafusion::arrow::array::{ + Array, ArrayRef, Int32Array, Int32Builder, Int64Array, Int64Builder, MapBuilder, +}; +use datafusion::arrow::datatypes::{DataType, Field, Fields}; +use datafusion::common::ScalarValue; +use datafusion::error::{DataFusionError, Result}; +use datafusion::functions::core::expr_ext::FieldAccessor; +use datafusion::functions_nested::map::map; +use datafusion::logical_expr::Volatility; +use datafusion::physical_plan::Accumulator; +use datafusion::prelude::*; +use std::collections::HashMap; +use std::sync::Arc; + +/// Column name for distances in the Shortest Paths algorithm +pub const DISTANCES: &str = "distances"; + +#[derive(Debug)] +struct DistancesMap { + distances: HashMap, +} + +impl DistancesMap { + fn new(landmarks: Arc>) -> Self { + Self { + distances: HashMap::from_iter(landmarks.iter().map(|&lm| (lm, i32::MAX))), + } + } +} + +impl Accumulator for DistancesMap { + // Internal state: Map -- mapping from landmarks to distances; + // Update logic is the same as merge: for all the keys taking the minimal value; + // Result is the same as state and input rows; + // + // Transform multiple Map into a single Map with minimal values per key. + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + let array_of_maps = &values[0]; + (0..array_of_maps.len()).try_for_each(|index| { + let new_map = ScalarValue::try_from_array(array_of_maps, index)?; + + if let ScalarValue::Map(value) = new_map { + let keys = value.keys().as_any().downcast_ref::().unwrap(); + let values = value.values().as_any().downcast_ref::().unwrap(); + for (k, v) in keys.iter().zip(values.iter()) { + match (k, v) { + (Some(k), Some(v)) => { + if v < self.distances[&k] { + self.distances.insert(k, v); + } + } + _ => { + return Err(DataFusionError::Plan( + "Invalid map entry in DistancesMap accumulator".to_string(), + )); + } + } + } + } + Ok(()) + }) + } + + fn evaluate(&mut self) -> Result { + self.state().map(|state| state[0].clone()) + } + + fn size(&self) -> usize { + size_of::() + } + + fn state(&mut self) -> Result> { + let l_marks_builder = Int64Builder::with_capacity(self.distances.len()); + let distances_builder = Int32Builder::with_capacity(self.distances.len()); + let mut map_builder = MapBuilder::new(None, l_marks_builder, distances_builder); + + map_builder.keys().append_array(&Int64Array::from( + self.distances.keys().map(|k| k.clone()).collect::>(), + )); + map_builder.values().append_array(&Int32Array::from( + self.distances + .values() + .map(|v| v.clone()) + .collect::>(), + )); + map_builder.append(true)?; + Ok(vec![ScalarValue::Map(Arc::new(map_builder.finish()))]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } +} + +impl GraphFrame { + /// Compute the shortest paths from landmarks to all vertices in the graph. + /// + /// # Arguments + /// + /// * `landmarks` - A vector of vertex IDs to use as landmarks + /// * `max_iterations` - Optional maximum number of iterations to run + /// + /// # Returns + /// + /// A DataFrame with vertex IDs and a map of landmark -> distance + pub async fn shortest_paths( + &self, + landmarks: Vec, + max_iterations: Option, + ) -> Result { + let internal_distances_data_type = DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("key", DataType::Int64, false), + Field::new("value", DataType::Int32, false), + ])), + false, + )), + false, + ); + const PARTICIPATING: &str = "participating"; + let landmarks_list = landmarks + .iter() + .map(|lm| lit(lm.clone())) + .collect::>(); + let zero_map = map(landmarks_list.clone(), vec![lit(i32::MAX); landmarks.len()]); + + // For landmarks: + // - distance to itself is 0 + // - distance to other landmarks is infinity + // For non-landmarks: + // - distance to other landmarks is infinity + let init_distances = when( + col(VERTEX_ID).in_list(landmarks_list.clone(), true), + zero_map, + ) + .otherwise(map( + landmarks_list.clone(), + landmarks + .iter() + .map(|lm| { + when(col(VERTEX_ID).eq(lit(lm.clone())), lit(0)) + .otherwise(lit(i32::MAX)) + .unwrap() + }) + .collect::>(), + ))?; + + let update_distances = map( + landmarks_list, + landmarks + .iter() + .map(|lm| { + let pregel_el = map_extract(col(PREGEL_MSG), lit(lm.clone())); + let map_el = map_extract(col(PREGEL_MSG), lit(lm.clone())); + when( + pregel_el + .clone() + .is_null() + .or(pregel_el.clone().lt_eq(map_el.clone())), + map_el, + ) + .otherwise(pregel_el) + .unwrap() + }) + .collect::>(), + ); + + let landmarks_copy = Arc::new(landmarks.clone()); + + let aggregate_expr = create_udaf( + "merge_distance_maps", + vec![internal_distances_data_type.clone()], + Arc::new(internal_distances_data_type.clone()), + Volatility::Immutable, + Arc::new(move |_| Ok(Box::new(DistancesMap::new(landmarks_copy.clone())))), + Arc::new(vec![internal_distances_data_type.clone()]), + ); + + // Initialize participation: only landmarks participate initially + let init_participating = landmarks.iter().fold(lit(false), |acc, &landmark| { + acc.or(col(VERTEX_ID).eq(lit(landmark))) + }); + + // Update participation condition: a vertex participates if it already participates or + // if it receives a message (meaning it's a neighbor of a participating vertex) + let update_participating = col(PARTICIPATING).or(col("msg").is_not_null()); + + // Message to send: current distances map + let message_expr = col(DISTANCES); + + // Aggregate expression: for each landmark, take the minimum of current distance and received distance + 1 + let aggregate_expr = named_struct( + landmarks + .iter() + .flat_map(|&landmark| { + let landmark_str = landmark.to_string(); + let current_dist = col(DISTANCES).field(&landmark_str); + let msg_dist = col("msg").field(&landmark_str); + + // If message contains a distance for this landmark, consider it + 1 as a candidate + // Choose the minimum of current distance and received distance + 1 + // If both are NULL, result is NULL + vec![ + lit(landmark_str), + when( + msg_dist.clone().is_not_null(), + when( + current_dist.clone().is_not_null(), + least(vec![current_dist.clone(), msg_dist.clone() + lit(1)]), + ) + .otherwise(msg_dist.clone() + lit(1)) + .expect("Failed to create inner when expression"), + ) + .otherwise(current_dist.clone()) + .expect("Failed to create outer when expression"), + ] + }) + .collect::>(), + ); + + // Voting condition: vertex votes to halt if its distances map didn't change + let voting_condition = col(DISTANCES).eq(col("updated_distances")); + + // Run Pregel algorithm + let result = self + .pregel() + // Add vertex columns + .add_vertex_column(DISTANCES, init_distances.clone(), col("updated_distances")) + .add_vertex_column("updated_distances", init_distances, aggregate_expr.clone()) + // Set participation condition + .with_participation_column(PARTICIPATING, init_participating, update_participating) + // Add message + .add_message(message_expr, crate::pregel::MessageDirection::SrcToDst) + // Set aggregate expression + .with_aggregate_expr(aggregate_expr) + // Set voting condition + .with_vertex_voting("active", voting_condition) + // Set max iterations if provided + .max_iterations(max_iterations.unwrap_or(usize::MAX)) + // Run the algorithm + .run(false) + .await?; + + // Return the result with vertex ID and distances map + Ok(result.data.select(vec![col(VERTEX_ID), col(DISTANCES)])?) + } +} From 321d4c8bb0849e052c8d329b1a0cd86da4c1b002 Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Tue, 29 Jul 2025 13:04:45 +0200 Subject: [PATCH 2/4] WIP --- src/shortest_paths.rs | 403 +++++++++++++++++++++++++++++++----------- 1 file changed, 299 insertions(+), 104 deletions(-) diff --git a/src/shortest_paths.rs b/src/shortest_paths.rs index be6a0f6..72ee15d 100644 --- a/src/shortest_paths.rs +++ b/src/shortest_paths.rs @@ -1,14 +1,15 @@ -use crate::pregel::PREGEL_MSG; +use crate::pregel::{PREGEL_MSG, pregel_src}; use crate::{GraphFrame, VERTEX_ID}; use datafusion::arrow::array::{ - Array, ArrayRef, Int32Array, Int32Builder, Int64Array, Int64Builder, MapBuilder, + Array, ArrayRef, Int32Array, Int32Builder, Int64Array, Int64Builder, MapBuilder, MapFieldNames, + as_map_array, }; use datafusion::arrow::datatypes::{DataType, Field, Fields}; use datafusion::common::ScalarValue; +use datafusion::common::cast::{as_int32_array, as_int64_array}; use datafusion::error::{DataFusionError, Result}; -use datafusion::functions::core::expr_ext::FieldAccessor; use datafusion::functions_nested::map::map; -use datafusion::logical_expr::Volatility; +use datafusion::logical_expr::{ColumnarValue, Volatility}; use datafusion::physical_plan::Accumulator; use datafusion::prelude::*; use std::collections::HashMap; @@ -46,7 +47,11 @@ impl Accumulator for DistancesMap { if let ScalarValue::Map(value) = new_map { let keys = value.keys().as_any().downcast_ref::().unwrap(); - let values = value.values().as_any().downcast_ref::().unwrap(); + let values = value + .values() + .as_any() + .downcast_ref::() + .unwrap(); for (k, v) in keys.iter().zip(values.iter()) { match (k, v) { (Some(k), Some(v)) => { @@ -77,7 +82,15 @@ impl Accumulator for DistancesMap { fn state(&mut self) -> Result> { let l_marks_builder = Int64Builder::with_capacity(self.distances.len()); let distances_builder = Int32Builder::with_capacity(self.distances.len()); - let mut map_builder = MapBuilder::new(None, l_marks_builder, distances_builder); + let mut map_builder = MapBuilder::new( + Some(MapFieldNames { + entry: "entries".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }), + l_marks_builder, + distances_builder, + ); map_builder.keys().append_array(&Int64Array::from( self.distances.keys().map(|k| k.clone()).collect::>(), @@ -97,85 +110,160 @@ impl Accumulator for DistancesMap { } } -impl GraphFrame { - /// Compute the shortest paths from landmarks to all vertices in the graph. - /// - /// # Arguments - /// - /// * `landmarks` - A vector of vertex IDs to use as landmarks - /// * `max_iterations` - Optional maximum number of iterations to run - /// - /// # Returns - /// - /// A DataFrame with vertex IDs and a map of landmark -> distance - pub async fn shortest_paths( - &self, - landmarks: Vec, - max_iterations: Option, - ) -> Result { +fn merge_distances_maps(args: &[ColumnarValue]) -> Result { + let arrays = ColumnarValue::values_to_arrays(args)?; + let left_map = as_map_array(&arrays[0]); + let right_map = as_map_array(&arrays[1]); + + if left_map.len() != right_map.len() { + return Err(DataFusionError::Plan(format!( + "Invalid map length in merge_distances_maps: {} != {}", + left_map.len(), + right_map.len() + ))); + } + let keys = as_int64_array(left_map.keys())?; + let left_values = as_int32_array(left_map.values())?; + let right_values = as_int32_array(right_map.values())?; + + let new_keys_builder = Int64Builder::with_capacity(left_map.len()); + let new_values_builder = Int32Builder::with_capacity(left_map.len()); + let mut builder = MapBuilder::new( + Some(MapFieldNames { + entry: "entries".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }), + new_keys_builder, + new_values_builder, + ); + + for map_idx in 0..left_map.len() { + if left_values.is_null(map_idx) || right_values.is_null(map_idx) { + builder.append(false)?; + continue; + } + let left_start = left_map.value_offsets()[map_idx] as usize; + let left_end = left_map.value_offsets()[map_idx + 1] as usize; + + let right_start = right_map.value_offsets()[map_idx] as usize; + + for (pos, i) in (left_start..left_end).enumerate() { + builder.keys().append_value(keys.value(i)); + let left_value = left_values.value(i); + let right_value = right_values.value(right_start + pos); + if left_value < right_value { + builder.values().append_value(left_value); + } else { + builder.values().append_value(right_value); + } + } + builder.append(true)?; + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) +} + +pub struct ShortestPathsBuilder<'a> { + graph_frame: &'a GraphFrame, + landmarks: Vec, + max_iterations: usize, + checkpoint_interval: usize, +} + +impl<'a> ShortestPathsBuilder<'a> { + pub fn new(graph_frame: &'a GraphFrame, landmarks: Vec) -> Self { + Self { + graph_frame, + landmarks, + max_iterations: i32::MAX as usize, + checkpoint_interval: 2, + } + } + + pub fn max_iterations(mut self, max_iterations: usize) -> Self { + self.max_iterations = max_iterations; + self + } + pub fn checkpoint_interval(mut self, checkpoint_interval: usize) -> Self { + self.checkpoint_interval = checkpoint_interval; + self + } + + pub async fn run(self) -> Result { let internal_distances_data_type = DataType::Map( Arc::new(Field::new( "entries", DataType::Struct(Fields::from(vec![ Field::new("key", DataType::Int64, false), - Field::new("value", DataType::Int32, false), + Field::new("value", DataType::Int32, true), ])), false, )), false, ); const PARTICIPATING: &str = "participating"; - let landmarks_list = landmarks + let landmarks_list = self + .landmarks .iter() .map(|lm| lit(lm.clone())) .collect::>(); - let zero_map = map(landmarks_list.clone(), vec![lit(i32::MAX); landmarks.len()]); + let zero_map = map( + landmarks_list.clone(), + vec![lit(i32::MAX); self.landmarks.len()], + ); // For landmarks: // - distance to itself is 0 // - distance to other landmarks is infinity // For non-landmarks: // - distance to other landmarks is infinity - let init_distances = when( - col(VERTEX_ID).in_list(landmarks_list.clone(), true), - zero_map, - ) - .otherwise(map( - landmarks_list.clone(), - landmarks - .iter() - .map(|lm| { - when(col(VERTEX_ID).eq(lit(lm.clone())), lit(0)) - .otherwise(lit(i32::MAX)) - .unwrap() - }) - .collect::>(), - ))?; - - let update_distances = map( - landmarks_list, - landmarks - .iter() - .map(|lm| { - let pregel_el = map_extract(col(PREGEL_MSG), lit(lm.clone())); - let map_el = map_extract(col(PREGEL_MSG), lit(lm.clone())); - when( - pregel_el - .clone() - .is_null() - .or(pregel_el.clone().lt_eq(map_el.clone())), - map_el, + let init_distances = self + .landmarks + .clone() + .iter() + .fold( + when( + col(VERTEX_ID).in_list(landmarks_list.clone(), true), + zero_map.clone(), + ), + |mut builder, &landmark| { + builder.when( + col(VERTEX_ID).eq(lit(landmark)), + map( + landmarks_list.clone(), + self.landmarks + .clone() + .iter() + .map(|lm_value| { + if lm_value == &landmark { + lit(0i32) + } else { + lit(i32::MAX) + } + }) + .collect::>(), + ), ) - .otherwise(pregel_el) - .unwrap() - }) - .collect::>(), + }, + ) + .otherwise(zero_map.clone())?; // otherwise should be unreachable + + let update_distances = create_udf( + "_merge_two_maps", + vec![ + internal_distances_data_type.clone(), + internal_distances_data_type.clone(), + ], + internal_distances_data_type.clone(), + Volatility::Immutable, + Arc::new(merge_distances_maps), ); - let landmarks_copy = Arc::new(landmarks.clone()); + let landmarks_copy = Arc::new(self.landmarks.clone()); - let aggregate_expr = create_udaf( - "merge_distance_maps", + let aggregate_expr_udaf = create_udaf( + "_merge_distance_maps", vec![internal_distances_data_type.clone()], Arc::new(internal_distances_data_type.clone()), Volatility::Immutable, @@ -184,66 +272,48 @@ impl GraphFrame { ); // Initialize participation: only landmarks participate initially - let init_participating = landmarks.iter().fold(lit(false), |acc, &landmark| { + let init_participating = self.landmarks.iter().fold(lit(false), |acc, &landmark| { acc.or(col(VERTEX_ID).eq(lit(landmark))) }); // Update participation condition: a vertex participates if it already participates or // if it receives a message (meaning it's a neighbor of a participating vertex) - let update_participating = col(PARTICIPATING).or(col("msg").is_not_null()); + let update_participating = + col(PREGEL_MSG) + .is_not_null() + .and(map_values(col(DISTANCES)).not_eq(map_values( + update_distances.call(vec![col(DISTANCES), col(PREGEL_MSG)]), + ))); // Message to send: current distances map - let message_expr = col(DISTANCES); - - // Aggregate expression: for each landmark, take the minimum of current distance and received distance + 1 - let aggregate_expr = named_struct( - landmarks - .iter() - .flat_map(|&landmark| { - let landmark_str = landmark.to_string(); - let current_dist = col(DISTANCES).field(&landmark_str); - let msg_dist = col("msg").field(&landmark_str); - - // If message contains a distance for this landmark, consider it + 1 as a candidate - // Choose the minimum of current distance and received distance + 1 - // If both are NULL, result is NULL - vec![ - lit(landmark_str), - when( - msg_dist.clone().is_not_null(), - when( - current_dist.clone().is_not_null(), - least(vec![current_dist.clone(), msg_dist.clone() + lit(1)]), - ) - .otherwise(msg_dist.clone() + lit(1)) - .expect("Failed to create inner when expression"), - ) - .otherwise(current_dist.clone()) - .expect("Failed to create outer when expression"), - ] - }) - .collect::>(), - ); - - // Voting condition: vertex votes to halt if its distances map didn't change - let voting_condition = col(DISTANCES).eq(col("updated_distances")); + let message_expr = pregel_src(DISTANCES); // Run Pregel algorithm let result = self + .graph_frame .pregel() // Add vertex columns - .add_vertex_column(DISTANCES, init_distances.clone(), col("updated_distances")) - .add_vertex_column("updated_distances", init_distances, aggregate_expr.clone()) + .add_vertex_column( + DISTANCES, + init_distances.clone(), + update_distances.call(vec![col(DISTANCES), col(PREGEL_MSG)]), + ) // Set participation condition - .with_participation_column(PARTICIPATING, init_participating, update_participating) - // Add message + .with_participation_column( + PARTICIPATING, + init_participating, + update_participating.clone(), + ) + // Add a message .add_message(message_expr, crate::pregel::MessageDirection::SrcToDst) // Set aggregate expression - .with_aggregate_expr(aggregate_expr) + .with_aggregate_expr(aggregate_expr_udaf.call(vec![col(PREGEL_MSG)])) // Set voting condition - .with_vertex_voting("active", voting_condition) + .with_vertex_voting("active", update_participating) // Set max iterations if provided - .max_iterations(max_iterations.unwrap_or(usize::MAX)) + .max_iterations(self.max_iterations) + // Set checkpoint interval + .checkpoint_interval(2) // Run the algorithm .run(false) .await?; @@ -252,3 +322,128 @@ impl GraphFrame { Ok(result.data.select(vec![col(VERTEX_ID), col(DISTANCES)])?) } } + +impl GraphFrame { + pub fn shortest_paths(&self, landmarks: Vec) -> ShortestPathsBuilder { + ShortestPathsBuilder::new(self, landmarks) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::arrow::array::{Int32Array, Int64Array, RecordBatch}; + use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use datafusion::prelude::SessionContext; + use std::sync::Arc; + + fn create_small_test_graph() -> Result { + let ctx = SessionContext::new(); + + let vertices_data = RecordBatch::try_new( + SchemaRef::from(Schema::new(vec![Field::new("id", DataType::Int64, false)])), + vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4]))], + )?; + let vertices = ctx.read_batch(vertices_data)?; + + let edges_data = RecordBatch::try_new( + SchemaRef::from(Schema::new(vec![ + Field::new("src", DataType::Int64, false), + Field::new("dst", DataType::Int64, false), + ])), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 2, 3])), + Arc::new(Int64Array::from(vec![2, 3, 4, 4])), + ], + )?; + let edges = ctx.read_batch(edges_data)?; + + Ok(GraphFrame { vertices, edges }) + } + + #[tokio::test] + async fn test_shortest_paths_single_landmark() -> Result<()> { + let graph = create_small_test_graph()?; + let landmarks = vec![1]; + let result = graph.shortest_paths(landmarks).run().await?; + let batches = result.collect().await?; + + for batch in batches { + let ids = batch + .column_by_name(VERTEX_ID) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + for i in 0..ids.len() { + let id = ids.value(i); + let distances = batch.column(1); + match id { + 1 => assert_eq!(map_extract_value(distances, i, 1), 0), + 2 => assert_eq!(map_extract_value(distances, i, 1), 1), + 3 => assert_eq!(map_extract_value(distances, i, 1), 2), + 4 => assert_eq!(map_extract_value(distances, i, 1), 3), + _ => panic!("Unexpected vertex id"), + } + } + } + Ok(()) + } + + #[tokio::test] + async fn test_shortest_paths_multiple_landmarks() -> Result<()> { + let graph = create_small_test_graph()?; + let landmarks = vec![1, 4]; + let result = graph.shortest_paths(landmarks).run().await?; + let batches = result.collect().await?; + + for batch in batches { + let ids = batch + .column_by_name(VERTEX_ID) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + for i in 0..ids.len() { + let id = ids.value(i); + let distances = batch.column(1); + match id { + 1 => { + assert_eq!(map_extract_value(distances, i, 1), 0); + assert_eq!(map_extract_value(distances, i, 4), 3); + } + 2 => { + assert_eq!(map_extract_value(distances, i, 1), 1); + assert_eq!(map_extract_value(distances, i, 4), 2); + } + 3 => { + assert_eq!(map_extract_value(distances, i, 1), 2); + assert_eq!(map_extract_value(distances, i, 4), 1); + } + 4 => { + assert_eq!(map_extract_value(distances, i, 1), 3); + assert_eq!(map_extract_value(distances, i, 4), 0); + } + _ => panic!("Unexpected vertex id"), + } + } + } + Ok(()) + } + + fn map_extract_value(array: &ArrayRef, row: usize, key: i64) -> i32 { + let scalar = ScalarValue::try_from_array(array, row).unwrap(); + if let ScalarValue::Map(map) = scalar { + let keys = map.keys().as_any().downcast_ref::().unwrap(); + let values = map.values().as_any().downcast_ref::().unwrap(); + for (i, k) in keys.iter().enumerate() { + if k == Some(key) { + return values.value(i); + } + } + } + panic!("Key not found in map") + } +} From 5b52968c198e18bf990f82201fa733b9f9ec309d Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Tue, 29 Jul 2025 18:28:47 +0200 Subject: [PATCH 3/4] Shortest Paths Algorithm --- src/pregel.rs | 5 +- src/shortest_paths.rs | 540 +++++++++++++++++++++--------------------- 2 files changed, 275 insertions(+), 270 deletions(-) diff --git a/src/pregel.rs b/src/pregel.rs index 3a99a36..efbedf8 100644 --- a/src/pregel.rs +++ b/src/pregel.rs @@ -314,7 +314,10 @@ impl PregelBuilder { // are not participating in iteration. if self.participation_column.is_some() { let participation_column = self.participation_column.as_ref().unwrap(); - triplets = triplets.filter(pregel_src(&participation_column.name))? + triplets = triplets.filter( + pregel_src(&participation_column.name) + .or(pregel_dst(&participation_column.name)), + )?; } // Unfortunately, "unnest" does not allow passing to it an array of expression; diff --git a/src/shortest_paths.rs b/src/shortest_paths.rs index 72ee15d..0ed8309 100644 --- a/src/shortest_paths.rs +++ b/src/shortest_paths.rs @@ -1,15 +1,14 @@ -use crate::pregel::{PREGEL_MSG, pregel_src}; +use crate::pregel::{PREGEL_MSG, pregel_dst}; use crate::{GraphFrame, VERTEX_ID}; -use datafusion::arrow::array::{ - Array, ArrayRef, Int32Array, Int32Builder, Int64Array, Int64Builder, MapBuilder, MapFieldNames, - as_map_array, -}; +use arrow::compute::min; +use datafusion::arrow; +use datafusion::arrow::array::{ArrayRef, Int32Array, StructArray, as_struct_array}; use datafusion::arrow::datatypes::{DataType, Field, Fields}; use datafusion::common::ScalarValue; -use datafusion::common::cast::{as_int32_array, as_int64_array}; -use datafusion::error::{DataFusionError, Result}; -use datafusion::functions_nested::map::map; -use datafusion::logical_expr::{ColumnarValue, Volatility}; +use datafusion::common::cast::as_int32_array; +use datafusion::error::Result; +use datafusion::functions::core::expr_ext::FieldAccessor; +use datafusion::logical_expr::Volatility; use datafusion::physical_plan::Accumulator; use datafusion::prelude::*; use std::collections::HashMap; @@ -18,8 +17,13 @@ use std::sync::Arc; /// Column name for distances in the Shortest Paths algorithm pub const DISTANCES: &str = "distances"; +/// Internal accumulator type for maintaining the shortest path distances from landmarks. +/// +/// This accumulator keeps track of the minimum distances from each vertex to a set of landmark vertices. +/// The distances are stored in a HashMap where keys are landmark IDs and values are minimum distances. #[derive(Debug)] struct DistancesMap { + /// Maps landmark vertex IDs to their current minimum distances distances: HashMap, } @@ -31,44 +35,36 @@ impl DistancesMap { } } +/// Implementation of DataFusion's Accumulator trait for DistancesMap. +/// +/// The accumulator maintains and updates minimum distances from vertices to landmarks: +/// 1. Update batch processes with new distance values: +/// - Examines each landmark's distances from incoming struct arrays +/// - Updates the stored distance if the incoming value is smaller +/// 2. Evaluate converts the current state to ScalarValue +/// 3. State converts distances to struct array format with: +/// - Fields for each landmark ID +/// - Arrays containing current minimum distances +/// 4. Merge batch combines multiple states using the same logic as update +/// impl Accumulator for DistancesMap { - // Internal state: Map -- mapping from landmarks to distances; - // Update logic is the same as merge: for all the keys taking the minimal value; - // Result is the same as state and input rows; - // - // Transform multiple Map into a single Map with minimal values per key. fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { if values.is_empty() { return Ok(()); } - let array_of_maps = &values[0]; - (0..array_of_maps.len()).try_for_each(|index| { - let new_map = ScalarValue::try_from_array(array_of_maps, index)?; - - if let ScalarValue::Map(value) = new_map { - let keys = value.keys().as_any().downcast_ref::().unwrap(); - let values = value - .values() - .as_any() - .downcast_ref::() - .unwrap(); - for (k, v) in keys.iter().zip(values.iter()) { - match (k, v) { - (Some(k), Some(v)) => { - if v < self.distances[&k] { - self.distances.insert(k, v); - } - } - _ => { - return Err(DataFusionError::Plan( - "Invalid map entry in DistancesMap accumulator".to_string(), - )); - } - } - } + let array_of_structs = as_struct_array(&values[0]); + for key in self.distances.clone().keys() { + let array_for_key = array_of_structs.column_by_name(&key.to_string()); + let min_distance_from_incoming = array_for_key + .map(|array| min(as_int32_array(array).ok()?)) + .unwrap_or(Some(i32::MAX)) + .unwrap_or(i32::MAX); + + if min_distance_from_incoming < self.distances[key] { + self.distances.insert(*key, min_distance_from_incoming); } - Ok(()) - }) + } + Ok(()) } fn evaluate(&mut self) -> Result { @@ -80,29 +76,24 @@ impl Accumulator for DistancesMap { } fn state(&mut self) -> Result> { - let l_marks_builder = Int64Builder::with_capacity(self.distances.len()); - let distances_builder = Int32Builder::with_capacity(self.distances.len()); - let mut map_builder = MapBuilder::new( - Some(MapFieldNames { - entry: "entries".to_string(), - key: "key".to_string(), - value: "value".to_string(), - }), - l_marks_builder, - distances_builder, + let mut sorted_keys = self.distances.keys().clone().collect::>(); + sorted_keys.sort(); + let fields = Fields::from( + sorted_keys + .iter() + .map(|k| Field::new(k.to_string(), DataType::Int32, true)) + .collect::>(), ); + let arrays = sorted_keys + .iter() + .map(|k| self.distances.get(k).unwrap()) + .map(|v| Int32Array::from(vec![*v])) + .map(|arr| Arc::new(arr) as ArrayRef) + .collect::>(); + let nulls = None; + let scalar_value = ScalarValue::Struct(Arc::new(StructArray::new(fields, arrays, nulls))); - map_builder.keys().append_array(&Int64Array::from( - self.distances.keys().map(|k| k.clone()).collect::>(), - )); - map_builder.values().append_array(&Int32Array::from( - self.distances - .values() - .map(|v| v.clone()) - .collect::>(), - )); - map_builder.append(true)?; - Ok(vec![ScalarValue::Map(Arc::new(map_builder.finish()))]) + Ok(vec![scalar_value]) } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { @@ -110,165 +101,121 @@ impl Accumulator for DistancesMap { } } -fn merge_distances_maps(args: &[ColumnarValue]) -> Result { - let arrays = ColumnarValue::values_to_arrays(args)?; - let left_map = as_map_array(&arrays[0]); - let right_map = as_map_array(&arrays[1]); - - if left_map.len() != right_map.len() { - return Err(DataFusionError::Plan(format!( - "Invalid map length in merge_distances_maps: {} != {}", - left_map.len(), - right_map.len() - ))); - } - let keys = as_int64_array(left_map.keys())?; - let left_values = as_int32_array(left_map.values())?; - let right_values = as_int32_array(right_map.values())?; - - let new_keys_builder = Int64Builder::with_capacity(left_map.len()); - let new_values_builder = Int32Builder::with_capacity(left_map.len()); - let mut builder = MapBuilder::new( - Some(MapFieldNames { - entry: "entries".to_string(), - key: "key".to_string(), - value: "value".to_string(), - }), - new_keys_builder, - new_values_builder, - ); - - for map_idx in 0..left_map.len() { - if left_values.is_null(map_idx) || right_values.is_null(map_idx) { - builder.append(false)?; - continue; - } - let left_start = left_map.value_offsets()[map_idx] as usize; - let left_end = left_map.value_offsets()[map_idx + 1] as usize; - - let right_start = right_map.value_offsets()[map_idx] as usize; - - for (pos, i) in (left_start..left_end).enumerate() { - builder.keys().append_value(keys.value(i)); - let left_value = left_values.value(i); - let right_value = right_values.value(right_start + pos); - if left_value < right_value { - builder.values().append_value(left_value); - } else { - builder.values().append_value(right_value); - } - } - builder.append(true)?; - } - - Ok(ColumnarValue::Array(Arc::new(builder.finish()))) -} - +/// Builder for configuring and running shortest paths computation from vertices to landmarks. +/// +/// This builder helps configure and execute a Pregel algorithm that computes the shortest paths +/// from all vertices in the graph to a specified set of landmark vertices. pub struct ShortestPathsBuilder<'a> { + /// Reference to the graph frame containing vertices and edges graph_frame: &'a GraphFrame, + /// Vector of vertex IDs designated as landmarks landmarks: Vec, + /// Maximum number of iterations to run the algorithm max_iterations: usize, + /// Interval at which to checkpoint the computation state checkpoint_interval: usize, } impl<'a> ShortestPathsBuilder<'a> { + /// Creates a new ShortestPathsBuilder with the specified graph and landmarks. + /// + /// # Arguments + /// * `graph_frame` - The graph frame to compute the shortest paths on + /// * `landmarks` - Vector of vertex IDs to use as landmarks pub fn new(graph_frame: &'a GraphFrame, landmarks: Vec) -> Self { + let mut sorted_landmarks = landmarks.clone(); + sorted_landmarks.sort(); Self { graph_frame, - landmarks, + landmarks: sorted_landmarks, max_iterations: i32::MAX as usize, - checkpoint_interval: 2, + checkpoint_interval: 1, } } + /// Sets the maximum number of iterations for the algorithm. + /// + /// # Arguments + /// * `max_iterations` - Maximum number of iterations to run pub fn max_iterations(mut self, max_iterations: usize) -> Self { self.max_iterations = max_iterations; self } + + /// Sets the interval at which to checkpoint computation state. + /// + /// # Arguments + /// * `checkpoint_interval` - Number of iterations between checkpoints pub fn checkpoint_interval(mut self, checkpoint_interval: usize) -> Self { self.checkpoint_interval = checkpoint_interval; self } pub async fn run(self) -> Result { - let internal_distances_data_type = DataType::Map( - Arc::new(Field::new( - "entries", - DataType::Struct(Fields::from(vec![ - Field::new("key", DataType::Int64, false), - Field::new("value", DataType::Int32, true), - ])), - false, - )), - false, - ); - const PARTICIPATING: &str = "participating"; - let landmarks_list = self - .landmarks - .iter() - .map(|lm| lit(lm.clone())) - .collect::>(); - let zero_map = map( - landmarks_list.clone(), - vec![lit(i32::MAX); self.landmarks.len()], - ); + // The data type for storing distances to landmarks; + // A struct in a form lm -> distance + let internal_distance_data_type = DataType::Struct(Fields::from( + self.landmarks + .clone() + .into_iter() + .map(|lm| Field::new(lm.to_string(), DataType::Int32, true)) + .collect::>(), + )); // For landmarks: // - distance to itself is 0 // - distance to other landmarks is infinity // For non-landmarks: // - distance to other landmarks is infinity - let init_distances = self - .landmarks - .clone() - .iter() - .fold( - when( - col(VERTEX_ID).in_list(landmarks_list.clone(), true), - zero_map.clone(), - ), - |mut builder, &landmark| { - builder.when( - col(VERTEX_ID).eq(lit(landmark)), - map( - landmarks_list.clone(), - self.landmarks - .clone() - .iter() - .map(|lm_value| { - if lm_value == &landmark { - lit(0i32) - } else { - lit(i32::MAX) - } - }) - .collect::>(), - ), - ) - }, - ) - .otherwise(zero_map.clone())?; // otherwise should be unreachable - - let update_distances = create_udf( - "_merge_two_maps", - vec![ - internal_distances_data_type.clone(), - internal_distances_data_type.clone(), - ], - internal_distances_data_type.clone(), - Volatility::Immutable, - Arc::new(merge_distances_maps), + let init_distances = named_struct( + self.landmarks + .clone() + .iter() + .flat_map(|lm| { + vec![ + lit(lm.to_string()), + when(col(VERTEX_ID).eq(lit(lm.clone())), lit(0i32)) + .otherwise(lit(i32::MAX)) + .unwrap(), + ] + }) + .collect::>(), ); + // The logic of updating distances: + // - If no message received (PREGEL_MSG is null), keep existing distances + // - Otherwise, for each landmark: + // - Compare current distance with received distance + // - Keep the minimum of the two values + let update_distances = + when(col(PREGEL_MSG).is_null(), col(DISTANCES)).otherwise(named_struct( + self.landmarks + .clone() + .iter() + .flat_map(|lm| { + let left_value = col(DISTANCES).field(lm.to_string()); + let right_value = col(PREGEL_MSG).field(lm.to_string()); + vec![ + lit(lm.to_string()), + when(left_value.clone().lt_eq(right_value.clone()), left_value) + .otherwise(right_value) + .unwrap(), + ] + }) + .collect::>(), + ))?; + + const PARTICIPATING: &str = "participating"; + let landmarks_copy = Arc::new(self.landmarks.clone()); let aggregate_expr_udaf = create_udaf( "_merge_distance_maps", - vec![internal_distances_data_type.clone()], - Arc::new(internal_distances_data_type.clone()), + vec![internal_distance_data_type.clone()], + Arc::new(internal_distance_data_type.clone()), Volatility::Immutable, Arc::new(move |_| Ok(Box::new(DistancesMap::new(landmarks_copy.clone())))), - Arc::new(vec![internal_distances_data_type.clone()]), + Arc::new(vec![internal_distance_data_type.clone()]), ); // Initialize participation: only landmarks participate initially @@ -279,25 +226,39 @@ impl<'a> ShortestPathsBuilder<'a> { // Update participation condition: a vertex participates if it already participates or // if it receives a message (meaning it's a neighbor of a participating vertex) let update_participating = - col(PREGEL_MSG) - .is_not_null() - .and(map_values(col(DISTANCES)).not_eq(map_values( - update_distances.call(vec![col(DISTANCES), col(PREGEL_MSG)]), - ))); - - // Message to send: current distances map - let message_expr = pregel_src(DISTANCES); + self.landmarks + .clone() + .iter() + .fold(lit(false), |acc, &landmark| { + acc.or(col(DISTANCES) + .field(landmark.to_string()) + .gt(col(PREGEL_MSG).field(landmark.to_string()))) + }); + + // Message to send: current distances map + 1 + let message_expr = named_struct( + self.landmarks + .clone() + .iter() + .flat_map(|lm| { + let col_name = lm.to_string(); + let d_col = pregel_dst(DISTANCES).field(col_name.clone()); + vec![ + lit(col_name), + when(d_col.clone().lt(lit(i32::MAX)), d_col + lit(1i32)) + .otherwise(lit(i32::MAX)) + .unwrap(), + ] + }) + .collect(), + ); // Run Pregel algorithm let result = self .graph_frame .pregel() // Add vertex columns - .add_vertex_column( - DISTANCES, - init_distances.clone(), - update_distances.call(vec![col(DISTANCES), col(PREGEL_MSG)]), - ) + .add_vertex_column(DISTANCES, init_distances.clone(), update_distances) // Set participation condition .with_participation_column( PARTICIPATING, @@ -305,7 +266,7 @@ impl<'a> ShortestPathsBuilder<'a> { update_participating.clone(), ) // Add a message - .add_message(message_expr, crate::pregel::MessageDirection::SrcToDst) + .add_message(message_expr, crate::pregel::MessageDirection::DstToSrc) // Set aggregate expression .with_aggregate_expr(aggregate_expr_udaf.call(vec![col(PREGEL_MSG)])) // Set voting condition @@ -324,6 +285,13 @@ impl<'a> ShortestPathsBuilder<'a> { } impl GraphFrame { + /// Computes shortest paths from all vertices to a set of landmark vertices. + /// + /// # Arguments + /// * `landmarks` - Vector of vertex IDs to use as landmarks for computing the shortest paths + /// + /// # Returns + /// a Builder object to configure and execute the shortest paths computation pub fn shortest_paths(&self, landmarks: Vec) -> ShortestPathsBuilder { ShortestPathsBuilder::new(self, landmarks) } @@ -332,7 +300,7 @@ impl GraphFrame { #[cfg(test)] mod tests { use super::*; - use datafusion::arrow::array::{Int32Array, Int64Array, RecordBatch}; + use datafusion::arrow::array::{Int64Array, RecordBatch}; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::prelude::SessionContext; use std::sync::Arc; @@ -352,8 +320,8 @@ mod tests { Field::new("dst", DataType::Int64, false), ])), vec![ - Arc::new(Int64Array::from(vec![1, 2, 2, 3])), - Arc::new(Int64Array::from(vec![2, 3, 4, 4])), + Arc::new(Int64Array::from(vec![1, 2, 2, 3, 4, 4, 2, 3])), + Arc::new(Int64Array::from(vec![2, 3, 4, 4, 1, 2, 1, 2])), ], )?; let edges = ctx.read_batch(edges_data)?; @@ -363,87 +331,121 @@ mod tests { #[tokio::test] async fn test_shortest_paths_single_landmark() -> Result<()> { + let ctx = SessionContext::new(); let graph = create_small_test_graph()?; let landmarks = vec![1]; let result = graph.shortest_paths(landmarks).run().await?; - let batches = result.collect().await?; - - for batch in batches { - let ids = batch - .column_by_name(VERTEX_ID) - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(); - - for i in 0..ids.len() { - let id = ids.value(i); - let distances = batch.column(1); - match id { - 1 => assert_eq!(map_extract_value(distances, i, 1), 0), - 2 => assert_eq!(map_extract_value(distances, i, 1), 1), - 3 => assert_eq!(map_extract_value(distances, i, 1), 2), - 4 => assert_eq!(map_extract_value(distances, i, 1), 3), - _ => panic!("Unexpected vertex id"), - } - } - } + + // Create expected results + let expected_data = RecordBatch::try_new( + SchemaRef::from(Schema::new(vec![ + Field::new("expected_id", DataType::Int64, false), + Field::new( + "expected_distances", + DataType::Struct(Fields::from(vec![Field::new("1", DataType::Int32, true)])), + false, + ), + ])), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3, 4])), + Arc::new(StructArray::new( + Fields::from(vec![Field::new("1", DataType::Int32, true)]), + vec![Arc::new(Int32Array::from(vec![0, 1, 2, 1]))], + None, + )), + ], + )?; + let expected = ctx.read_batch(expected_data)?; + + // Join and compare results + let comparison = result.join( + expected, + JoinType::Inner, + &[VERTEX_ID], + &["expected_id"], + None, + )?; + + let diff = comparison + .filter( + col(DISTANCES) + .field("1") + .not_eq(col("expected_distances").field("1")), + )? + .select(vec![ + col(VERTEX_ID), + col(DISTANCES).field("1"), + col("expected_distances").field("1"), + ])?; + + assert_eq!( + diff.count().await?, + 0, + "Found differences in shortest paths" + ); Ok(()) } #[tokio::test] async fn test_shortest_paths_multiple_landmarks() -> Result<()> { + let ctx = SessionContext::new(); let graph = create_small_test_graph()?; let landmarks = vec![1, 4]; let result = graph.shortest_paths(landmarks).run().await?; - let batches = result.collect().await?; - - for batch in batches { - let ids = batch - .column_by_name(VERTEX_ID) - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(); - - for i in 0..ids.len() { - let id = ids.value(i); - let distances = batch.column(1); - match id { - 1 => { - assert_eq!(map_extract_value(distances, i, 1), 0); - assert_eq!(map_extract_value(distances, i, 4), 3); - } - 2 => { - assert_eq!(map_extract_value(distances, i, 1), 1); - assert_eq!(map_extract_value(distances, i, 4), 2); - } - 3 => { - assert_eq!(map_extract_value(distances, i, 1), 2); - assert_eq!(map_extract_value(distances, i, 4), 1); - } - 4 => { - assert_eq!(map_extract_value(distances, i, 1), 3); - assert_eq!(map_extract_value(distances, i, 4), 0); - } - _ => panic!("Unexpected vertex id"), - } - } - } - Ok(()) - } - fn map_extract_value(array: &ArrayRef, row: usize, key: i64) -> i32 { - let scalar = ScalarValue::try_from_array(array, row).unwrap(); - if let ScalarValue::Map(map) = scalar { - let keys = map.keys().as_any().downcast_ref::().unwrap(); - let values = map.values().as_any().downcast_ref::().unwrap(); - for (i, k) in keys.iter().enumerate() { - if k == Some(key) { - return values.value(i); - } - } - } - panic!("Key not found in map") + // Create expected results + let expected_data = RecordBatch::try_new( + SchemaRef::from(Schema::new(vec![ + Field::new("expected_id", DataType::Int64, false), + Field::new( + "expected_distances", + DataType::Struct(Fields::from(vec![ + Field::new("1", DataType::Int32, true), + Field::new("4", DataType::Int32, true), + ])), + false, + ), + ])), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3, 4])), + Arc::new(StructArray::new( + Fields::from(vec![ + Field::new("1", DataType::Int32, true), + Field::new("4", DataType::Int32, true), + ]), + vec![ + Arc::new(Int32Array::from(vec![0, 1, 2, 1])), + Arc::new(Int32Array::from(vec![2, 1, 1, 0])), + ], + None, + )), + ], + )?; + let expected = ctx.read_batch(expected_data)?; + + // Join and compare results + let comparison = result.join( + expected, + JoinType::Inner, + &[VERTEX_ID], + &["expected_id"], + None, + )?; + + let diff = comparison.filter( + col(DISTANCES) + .field("1") + .not_eq(col("expected_distances").field("1")) + .or(col(DISTANCES) + .field("4") + .not_eq(col("expected_distances").field("4"))), + )?; + + assert_eq!( + diff.count().await?, + 0, + "Found differences in shortest paths" + ); + Ok(()) } } From 5aaf8fe25de8c13f9a86c0b5c13d4faeef03b8d1 Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Tue, 29 Jul 2025 18:29:36 +0200 Subject: [PATCH 4/4] Update README --- README.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c93a89d..20472df 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,8 @@ processing capabilities on top of DataFusion's DataFrame API. ## About -This project aims to bring the power of GraphFrames to the Apache DataFusion ecosystem by leveraging DataFrame capabilities. +This project aims to bring the power of GraphFrames to the Apache DataFusion ecosystem by leveraging DataFrame +capabilities. It provides a similar API to Apache Spark's GraphFrames. ## Project Status @@ -20,8 +21,8 @@ and Pregel API. | Graph Abstraction | ✓ | ✓ | | Basic Statistics (degree, etc.) | ✓ | ✓ | | Pregel API | ✓ | ✓ | -| Shortest Paths | ✓ | In Progress | -| PageRank | ✓ | Planned | +| Shortest Paths | ✓ | ✓ | +| PageRank | ✓ | ✓ | | Parallel Personalized PageRank | ✓ | Planned | | Connected Components | ✓ | Planned | | Strongly Connected Components | ✓ | Planned |