Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

### Fixes

* Fixed a rare server crash when self retracting occurs.

## v0.26.0

### Breaking change
Expand Down
6 changes: 5 additions & 1 deletion crates/tako/src/internal/scheduler/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ pub(crate) fn create_task_mapping(
let task_id = tasks[task_idx];
worker_map.get_worker_mut(*w_id).insert_sn_task(task_id, rq);
let task = task_map.get_task_mut(task_id);
log::debug!(
"Task={task_id} ({:?}) assigned to worker={w_id}",
task.state
);
let new_state = match &task.state {
TaskRuntimeState::Waiting { .. } => {
log::debug!("Waiting task={} assigned to worker={}", task_id, w_id);
mapping
.workers
.entry(*w_id)
Expand Down Expand Up @@ -212,6 +215,7 @@ fn process_proactive_filling(core: &mut Core, mapping: &mut WorkerTaskMapping) {
for worker in workers {
let tasks = queue.take_tasks_for_prefill(prefill_size);
for task_id in &tasks {
log::debug!("Prefiling task={task_id} to worker={}", worker.id);
let task = task_map.get_task_mut(*task_id);
assert!(task.is_waiting());
task.state = TaskRuntimeState::Prefilled {
Expand Down
2 changes: 1 addition & 1 deletion crates/tako/src/internal/scheduler/taskqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl TaskQueue {

#[inline]
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
self.queue.is_empty() && self.prefill.as_ref().is_none_or(|(_, t)| t.is_empty())
}

#[inline]
Expand Down
18 changes: 14 additions & 4 deletions crates/tako/src/internal/server/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ pub(crate) fn on_task_update(
worker_id: WorkerId,
updates: TaskUpdates,
) {
log::debug!("Task update for {worker_id}");
let mut need_scheduling = false;
// This relies on the fact that when worker switching to prefill, it will send Finish, followed by Start
// And this cannot happen in any other way
Expand Down Expand Up @@ -266,6 +267,7 @@ fn task_running(
worker_id: WorkerId,
message: TaskRunningMsg,
) -> bool {
log::debug!("Set task={:?} to running state", message.task_id);
let TaskRunningMsg {
task_id,
rv_id,
Expand Down Expand Up @@ -310,17 +312,25 @@ fn task_running(
assert_eq!(*w_id, worker_id);
comm.ask_for_scheduling();
task.state = TaskRuntimeState::Running { worker_id, rv_id };
let rqv = request_map.get(task.resource_rq_id);
worker_map
.get_worker_mut(worker_id)
.insert_sn_task(task_id, rqv.get(rv_id));
// We have to call first try_remove_redirection and then insert_sn_task
// This cannot be done in reverse order because in rare cases
// we may be in a process of a dummy redirection (from a worker to the same worker).
// So the task is already assigned to worker_id, and calling insert_sn_task will
// fail because we are inserting already inserted task.
// By removing redirections first, we unassign the task so we can later assign it back
// In theory, we could optimize this special case by doing nothing, but it should be quite rare
// So I prefer to keep the code simple.
try_remove_redirection(
worker_map,
scheduler_state,
request_map,
task_id,
task.resource_rq_id,
);
let rqv = request_map.get(task.resource_rq_id);
worker_map
.get_worker_mut(worker_id)
.insert_sn_task(task_id, rqv.get(rv_id));
(simple_worker_list.as_slice(), false)
}
TaskRuntimeState::RunningMultiNode(ws) => {
Expand Down
40 changes: 40 additions & 0 deletions crates/tako/src/internal/tests/test_reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::internal::tests::utils::sorted_vec;
use crate::internal::tests::utils::task::{TaskBuilder, task_running_msg};
use crate::internal::tests::utils::workflows::{submit_example_1, submit_example_3};
use crate::internal::worker::configuration::OverviewConfiguration;
use crate::internal::worker::task::RunningTask;
use crate::resources::{ResourceAmount, ResourceDescriptorItem, ResourceIdMap};
use crate::tests::utils::env::{TestComm, TestEnv};
use crate::tests::utils::worker::WorkerBuilder;
Expand Down Expand Up @@ -861,6 +862,45 @@ fn test_prefill_worker_lost() {
rt.sanity_check();
}

#[test]
fn test_prefill_started_on_same_worker() {
let mut rt = TestEnv::new();

rt.set_scheduler_config(SchedulerConfig {
proactive_filling_reserve: 0,
proactive_filling_max: 3,
..Default::default()
});

let t1 = rt.new_task_default();
let w1 = rt.new_worker(&WorkerBuilder::new(2));
rt.schedule();
assert!(rt.task(t1).is_assigned());
let tasks = rt.new_tasks(2, &TaskBuilder::new());
rt.schedule();
let prefilled: TaskId = tasks
.iter()
.find(|t| rt.task(**t).is_prefilled())
.copied()
.unwrap();
let assigned: TaskId = tasks
.iter()
.find(|t| rt.task(**t).is_assigned())
.copied()
.unwrap();
let up1 = WorkerTaskUpdate::Finished { task_id: t1 };
let mut comm = TestComm::new();
on_task_update(rt.core(), &mut comm, w1, smallvec![up1]);

rt.schedule();

assert!(rt.task(prefilled).is_retracting());

let up2 = WorkerTaskUpdate::Running(task_running_msg(prefilled));
on_task_update(rt.core(), &mut comm, w1, smallvec![up2]);
rt.sanity_check();
}

#[test]
fn test_prefill_started() {
let mut rt = TestEnv::new();
Expand Down
Loading