This repository was archived by the owner on Apr 14, 2026. It is now read-only.
forked from Abraxas-365/langchain-rust
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathlanggraph_parallel_execution.rs
More file actions
119 lines (103 loc) · 4.37 KB
/
langgraph_parallel_execution.rs
File metadata and controls
119 lines (103 loc) · 4.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use langchain_ai_rust::langgraph::{
function_node, DurabilityMode, InMemorySaver, MessagesState, RunnableConfig, StateGraph, END,
START,
};
use langchain_ai_rust::schemas::messages::Message;
/// Parallel execution example for LangGraph
///
/// This example demonstrates:
/// 1. Creating a graph with multiple nodes that can execute in parallel
/// 2. Using super-step execution model
/// 3. Different durability modes
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create nodes that can execute in parallel
let node1 = function_node("node1", |_state: &MessagesState| async move {
use std::collections::HashMap;
let mut update = HashMap::new();
update.insert(
"messages".to_string(),
serde_json::to_value(vec![Message::new_ai_message("Message from node1")])?,
);
Ok(update)
});
let node2 = function_node("node2", |_state: &MessagesState| async move {
use std::collections::HashMap;
let mut update = HashMap::new();
update.insert(
"messages".to_string(),
serde_json::to_value(vec![Message::new_ai_message("Message from node2")])?,
);
Ok(update)
});
let node3 = function_node("node3", |_state: &MessagesState| async move {
use std::collections::HashMap;
let mut update = HashMap::new();
update.insert(
"messages".to_string(),
serde_json::to_value(vec![Message::new_ai_message("Message from node3")])?,
);
Ok(update)
});
// Build the graph
// node1 and node2 can execute in parallel (both from START)
// node3 executes after node1 and node2 complete
let mut graph = StateGraph::<MessagesState>::new();
graph.add_node("node1", node1)?;
graph.add_node("node2", node2)?;
graph.add_node("node3", node3)?;
// Both node1 and node2 start from START (parallel execution)
graph.add_edge(START, "node1");
graph.add_edge(START, "node2");
// node3 executes after both node1 and node2
graph.add_edge("node1", "node3");
graph.add_edge("node2", "node3");
graph.add_edge("node3", END);
// Create checkpointer
let checkpointer = std::sync::Arc::new(InMemorySaver::new());
// Compile with checkpointer
let compiled = graph.compile_with_persistence(Some(checkpointer.clone()), None)?;
// Example 1: Execute with Sync durability mode
println!("=== Example 1: Sync durability mode ===");
let config = RunnableConfig::with_thread_id("thread-parallel-1");
let initial_state = MessagesState::with_messages(vec![Message::new_human_message("start")]);
let final_state = compiled
.invoke_with_config_and_mode(Some(initial_state), &config, DurabilityMode::Sync)
.await?;
println!("Final messages count: {}", final_state.messages.len());
for message in &final_state.messages {
println!(
" {}: {}",
message.message_type.to_string(),
message.content
);
}
// Example 2: Execute with Async durability mode
println!("\n=== Example 2: Async durability mode ===");
let config = RunnableConfig::with_thread_id("thread-parallel-2");
let initial_state = MessagesState::with_messages(vec![Message::new_human_message("start")]);
let final_state = compiled
.invoke_with_config_and_mode(Some(initial_state), &config, DurabilityMode::Async)
.await?;
println!("Final messages count: {}", final_state.messages.len());
// Example 3: Execute with Exit durability mode
println!("\n=== Example 3: Exit durability mode ===");
let config = RunnableConfig::with_thread_id("thread-parallel-3");
let initial_state = MessagesState::with_messages(vec![Message::new_human_message("start")]);
let final_state = compiled
.invoke_with_config_and_mode(Some(initial_state), &config, DurabilityMode::Exit)
.await?;
println!("Final messages count: {}", final_state.messages.len());
// Get state history to see checkpoints
let history = compiled.get_state_history(&config).await?;
println!("\nCheckpoints created: {}", history.len());
for (i, snapshot) in history.iter().enumerate() {
println!(
" {}: step={:?}, executed_nodes={:?}",
i + 1,
snapshot.metadata.get("step"),
snapshot.metadata.get("executed_nodes")
);
}
Ok(())
}