Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ pub struct SimpleSpec {
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub worker_timeout_s: u64,

/// Maximum time (seconds) an action can stay in Executing state without
/// any worker update before being timed out and re-queued.
/// This applies regardless of worker keepalive status, catching cases
/// where a worker is alive (sending keepalives) but stuck on a specific
/// action. Set to 0 to disable (relies only on worker_timeout_s).
///
/// Default: 0 (disabled)
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub max_action_executing_timeout_s: u64,

/// If a job returns an internal error or times out this many times when
/// attempting to run on a worker the scheduler will return the last error
/// to the client. Jobs will be retried and this configuration is to help
Expand Down
1 change: 1 addition & 0 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ impl SimpleScheduler {
max_job_retries,
Duration::from_secs(worker_timeout_s),
Duration::from_secs(client_action_timeout_s),
Duration::from_secs(spec.max_action_executing_timeout_s),
awaited_action_db,
now_fn,
Some(worker_registry.clone()),
Expand Down
18 changes: 13 additions & 5 deletions nativelink-scheduler/src/simple_scheduler_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ where
/// if it is not being processed by any worker.
client_action_timeout: Duration,

/// Maximum time an action can stay in Executing state without any worker
/// update, regardless of worker keepalive status. Duration::ZERO disables.
max_executing_timeout: Duration,

// A lock to ensure only one timeout operation is running at a time
// on this service.
timeout_operation_mux: Mutex<()>,
Expand Down Expand Up @@ -325,6 +329,7 @@ where
max_job_retries: usize,
no_event_action_timeout: Duration,
client_action_timeout: Duration,
max_executing_timeout: Duration,
action_db: T,
now_fn: NowFn,
worker_registry: Option<SharedWorkerRegistry>,
Expand All @@ -334,6 +339,7 @@ where
max_job_retries,
no_event_action_timeout,
client_action_timeout,
max_executing_timeout,
timeout_operation_mux: Mutex::new(()),
weak_self: weak_self.clone(),
now_fn,
Expand Down Expand Up @@ -361,6 +367,12 @@ where
};

if registry_alive {
if self.max_executing_timeout > Duration::ZERO {
let last_update = awaited_action.last_worker_updated_timestamp();
if let Ok(elapsed) = now.duration_since(last_update) {
return elapsed > self.max_executing_timeout;
}
}
return false;
}

Expand All @@ -369,11 +381,7 @@ where
.checked_add(self.no_event_action_timeout)
.unwrap_or(now);

if worker_should_update_before >= now {
return false;
}

true
worker_should_update_before < now
}

async fn apply_filter_predicate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async fn drops_missing_actions() -> Result<(), Error> {
0,
Duration::from_secs(10),
Duration::from_secs(10),
Duration::ZERO,
awaited_action_db,
SystemTime::now,
None,
Expand Down
Loading