- Module Structure
- Design Decisions
- Concurrency Model
- Performance Optimizations
- Signal Handling
- Fail-Fast Mode
The parallel executor is organized into focused modules:
executor/parallel.rs- ParallelExecutor core logic (412 lines)executor/execution_strategy.rs- Task spawning and progress bars (257 lines)executor/connection_manager.rs- SSH connection setup (168 lines)executor/result_types.rs- Result types (119 lines)executor/mod.rs- Public API exports (25 lines)
- Tokio-based async execution for maximum concurrency
- Semaphore-based concurrency limiting to prevent resource exhaustion
- Progress bar visualization using
indicatif - Streaming output collection for real-time feedback
The executor uses a semaphore-based concurrency model to limit parallel execution:
let semaphore = Arc::new(Semaphore::new(max_parallel));
let tasks: Vec<JoinHandle<Result<ExecutionResult>>> = nodes
.into_iter
.map(|node| {
let permit = semaphore.clone.acquire_owned;
tokio::spawn(async move {
let _permit = permit.await;
execute_on_node(node, command).await
})
})
.collect;This approach ensures:
- Limited number of concurrent SSH connections
- Efficient resource utilization
- Prevention of connection exhaustion
- Configurable parallelism via CLI flags
- Connection reuse within same node (planned)
- Buffered I/O for output collection - Reduces syscall overhead
- Early termination on critical failures - Stops execution when failures occur
The executor supports two modes for handling Ctrl+C (SIGINT) signals during parallel execution.
- First Ctrl+C: Displays status (running/completed job counts)
- Second Ctrl+C (within 1 second): Terminates all jobs immediately with exit code 130
- Time window reset: If >1 second passes, next Ctrl+C restarts the sequence and shows status again
- Provides users visibility into execution progress before termination
- Single Ctrl+C: Immediately terminates all jobs with exit code 130
- Optimized for non-interactive environments (CI/CD, scripts)
- Compatible with pdsh
-boption for tool compatibility
- Normal completion: Exit code determined by ExitCodeStrategy (MainRank/RequireAllSuccess/etc.)
- Signal termination (Ctrl+C): Always exits with code 130 (standard SIGINT exit code)
- This ensures scripts can detect user interruption vs. command failure
Signal handling is implemented in both execution modes:
executemethod (normal/progress bar mode) - lines 172-280handle_stream_modemethod (stream mode) - lines 714-838- TUI mode has its own quit handling (q or Ctrl+C) and ignores the batch flag
Implementation is in executor/parallel.rs using tokio::select! to handle signals alongside normal execution:
loop {
tokio::select! {
_ = signal::ctrl_c => {
if self.batch {
// Batch mode: terminate immediately
eprintln!("\nReceived Ctrl+C (batch mode). Terminating all jobs...");
for handle in pending_handles.drain(..) {
handle.abort;
}
// Exit with SIGINT exit code (130)
std::process::exit(130);
} else {
// Two-stage mode: first shows status, second terminates
if !first_ctrl_c {
first_ctrl_c = true;
ctrl_c_time = Some(std::time::Instant::now);
eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate.");
// Show status
let running_count = pending_handles.len;
let completed_count = self.nodes.len - running_count;
eprintln!("Status: {} running, {} completed", running_count, completed_count);
} else {
// Second Ctrl+C: check time window
if let Some(first_time) = ctrl_c_time {
if first_time.elapsed <= Duration::from_secs(1) {
// Within time window: terminate
eprintln!("Received second Ctrl+C. Terminating all jobs...");
for handle in pending_handles.drain(..) {
handle.abort;
}
// Exit with SIGINT exit code (130)
std::process::exit(130);
} else {
// Time window expired: reset and show status again
first_ctrl_c = true;
ctrl_c_time = Some(std::time::Instant::now);
eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate.");
// Show current status
let running_count = pending_handles.len;
let completed_count = self.nodes.len - running_count;
eprintln!("Status: {} running, {} completed", running_count, completed_count);
}
}
}
}
}
// Wait for all tasks to complete
results = join_all(pending_handles.iter_mut) => {
return self.collect_results(results);
}
}
// Small sleep to avoid busy waiting
tokio::time::sleep(Duration::from_millis(50)).await;
}The batch flag is passed through the executor chain:
- CLI
--batchflag →ExecuteCommandParams.batch→ParallelExecutor.batch - Applied in both normal mode (
execute) and stream mode (handle_stream_mode) - TUI mode maintains its own quit handling and ignores this flag
The --fail-fast / -k option enables immediate termination when any node fails. This is compatible with pdsh's -k flag and useful for:
- Critical operations where partial execution is unacceptable
- Deployment scripts where all nodes must succeed
- Validation checks across clusters
Implementation uses cancellation signaling via tokio::sync::watch:
// Cancellation signaling via tokio::sync::watch
let (cancel_tx, cancel_rx) = watch::channel(false);
// Task selection with cancellation check
tokio::select! {
biased; // Prioritize cancellation check
_ = cancel_rx.changed => {
// Task cancelled due to fail-fast
return Err(anyhow!("Execution cancelled due to fail-fast"));
}
permit = semaphore.acquire => {
// Execute task normally
}
}The fail-fast mode integrates with:
--require-all-success: Both require all nodes to succeed, but fail-fast stops early--check-all-nodes: Fail-fast stops early, check-all-nodes affects final exit code--parallel N: Cancels pending tasks waiting in the semaphore queue
Related Documentation: