From a5e02f3a8a2086e5ce9d70b79e407740017eade8 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Thu, 21 May 2026 18:24:55 +0200 Subject: [PATCH] Fix self-retracting --- CHANGELOG.md | 6 +++ crates/tako/src/internal/scheduler/mapping.rs | 6 ++- .../tako/src/internal/scheduler/taskqueue.rs | 2 +- crates/tako/src/internal/server/reactor.rs | 18 +++++++-- .../tako/src/internal/tests/test_reactor.rs | 40 +++++++++++++++++++ 5 files changed, 66 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 91c456571..a97fc0153 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Unreleased + +### Fixes + +* Fixed a rare server crash that could occur when a task is retracted and redirected back to the same worker (self-retracting). + ## v0.26.0 ### Breaking change diff --git a/crates/tako/src/internal/scheduler/mapping.rs b/crates/tako/src/internal/scheduler/mapping.rs index a74756bab..306110fc4 100644 --- a/crates/tako/src/internal/scheduler/mapping.rs +++ b/crates/tako/src/internal/scheduler/mapping.rs @@ -46,9 +46,12 @@ pub(crate) fn create_task_mapping( let task_id = tasks[task_idx]; worker_map.get_worker_mut(*w_id).insert_sn_task(task_id, rq); let task = task_map.get_task_mut(task_id); + log::debug!( + "Task={task_id} ({:?}) assigned to worker={w_id}", + task.state + ); let new_state = match &task.state { TaskRuntimeState::Waiting { .. 
} => { - log::debug!("Waiting task={} assigned to worker={}", task_id, w_id); mapping .workers .entry(*w_id) @@ -212,6 +215,7 @@ fn process_proactive_filling(core: &mut Core, mapping: &mut WorkerTaskMapping) { for worker in workers { let tasks = queue.take_tasks_for_prefill(prefill_size); for task_id in &tasks { + log::debug!("Prefiling task={task_id} to worker={}", worker.id); let task = task_map.get_task_mut(*task_id); assert!(task.is_waiting()); task.state = TaskRuntimeState::Prefilled { diff --git a/crates/tako/src/internal/scheduler/taskqueue.rs b/crates/tako/src/internal/scheduler/taskqueue.rs index bfdc8a5fb..e63a14c15 100644 --- a/crates/tako/src/internal/scheduler/taskqueue.rs +++ b/crates/tako/src/internal/scheduler/taskqueue.rs @@ -226,7 +226,7 @@ impl TaskQueue { #[inline] pub fn is_empty(&self) -> bool { - self.queue.is_empty() + self.queue.is_empty() && self.prefill.as_ref().is_none_or(|(_, t)| t.is_empty()) } #[inline] diff --git a/crates/tako/src/internal/server/reactor.rs b/crates/tako/src/internal/server/reactor.rs index 14f01a051..66f30eaa8 100644 --- a/crates/tako/src/internal/server/reactor.rs +++ b/crates/tako/src/internal/server/reactor.rs @@ -225,6 +225,7 @@ pub(crate) fn on_task_update( worker_id: WorkerId, updates: TaskUpdates, ) { + log::debug!("Task update for {worker_id}"); let mut need_scheduling = false; // This relies on the fact that when worker switching to prefill, it will send Finish, followed by Start // And this cannot happen in any other way @@ -266,6 +267,7 @@ fn task_running( worker_id: WorkerId, message: TaskRunningMsg, ) -> bool { + log::debug!("Set task={:?} to running state", message.task_id); let TaskRunningMsg { task_id, rv_id, @@ -310,10 +312,14 @@ fn task_running( assert_eq!(*w_id, worker_id); comm.ask_for_scheduling(); task.state = TaskRuntimeState::Running { worker_id, rv_id }; - let rqv = request_map.get(task.resource_rq_id); - worker_map - .get_worker_mut(worker_id) - .insert_sn_task(task_id, rqv.get(rv_id)); + 
// We have to call try_remove_redirection first and only then insert_sn_task. + // This cannot be done in the reverse order because, in rare cases, + // we may be in the process of a dummy redirection (from a worker to the same worker). + // In that case the task is already assigned to worker_id, and calling insert_sn_task would + // fail because we would be inserting an already inserted task. + // By removing redirections first, we unassign the task so that we can assign it back afterwards. + // In theory, we could optimize this special case by doing nothing, but it should be quite rare, + // so I prefer to keep the code simple. try_remove_redirection( worker_map, scheduler_state, @@ -321,6 +327,10 @@ fn task_running( task_id, task.resource_rq_id, ); + let rqv = request_map.get(task.resource_rq_id); + worker_map + .get_worker_mut(worker_id) + .insert_sn_task(task_id, rqv.get(rv_id)); (simple_worker_list.as_slice(), false) } TaskRuntimeState::RunningMultiNode(ws) => { diff --git a/crates/tako/src/internal/tests/test_reactor.rs b/crates/tako/src/internal/tests/test_reactor.rs index 31f436c92..ddb87cf26 100644 --- a/crates/tako/src/internal/tests/test_reactor.rs +++ b/crates/tako/src/internal/tests/test_reactor.rs @@ -16,6 +16,7 @@ use crate::internal::tests::utils::sorted_vec; use crate::internal::tests::utils::task::{TaskBuilder, task_running_msg}; use crate::internal::tests::utils::workflows::{submit_example_1, submit_example_3}; use crate::internal::worker::configuration::OverviewConfiguration; +use crate::internal::worker::task::RunningTask; use crate::resources::{ResourceAmount, ResourceDescriptorItem, ResourceIdMap}; use crate::tests::utils::env::{TestComm, TestEnv}; use crate::tests::utils::worker::WorkerBuilder; @@ -861,6 +862,45 @@ fn test_prefill_worker_lost() { rt.sanity_check(); } +#[test] +fn test_prefill_started_on_same_worker() { + let mut rt = TestEnv::new(); + + rt.set_scheduler_config(SchedulerConfig { + proactive_filling_reserve: 0, + proactive_filling_max: 3, + ..Default::default() + }); + 
+ let t1 = rt.new_task_default(); + let w1 = rt.new_worker(&WorkerBuilder::new(2)); + rt.schedule(); + assert!(rt.task(t1).is_assigned()); + let tasks = rt.new_tasks(2, &TaskBuilder::new()); + rt.schedule(); + let prefilled: TaskId = tasks + .iter() + .find(|t| rt.task(**t).is_prefilled()) + .copied() + .unwrap(); + let assigned: TaskId = tasks + .iter() + .find(|t| rt.task(**t).is_assigned()) + .copied() + .unwrap(); + let up1 = WorkerTaskUpdate::Finished { task_id: t1 }; + let mut comm = TestComm::new(); + on_task_update(rt.core(), &mut comm, w1, smallvec![up1]); + + rt.schedule(); + + assert!(rt.task(prefilled).is_retracting()); + + let up2 = WorkerTaskUpdate::Running(task_running_msg(prefilled)); + on_task_update(rt.core(), &mut comm, w1, smallvec![up2]); + rt.sanity_check(); +} + #[test] fn test_prefill_started() { let mut rt = TestEnv::new();