From 06a4deabe1ded1657685499b3f8cbcf2e3c1ce2f Mon Sep 17 00:00:00 2001 From: namithDev Date: Fri, 17 Apr 2026 23:20:21 -0700 Subject: [PATCH] made hepft ignore processors that went down mid-task. --- heft.py | 87 ++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 52 insertions(+), 35 deletions(-) diff --git a/heft.py b/heft.py index 521005a..1057807 100644 --- a/heft.py +++ b/heft.py @@ -38,68 +38,85 @@ def calc_heft(dag: TaskDAG, network: NetworkGraph) -> dict: def calc_hepft(dag: TaskDAG, network: NetworkGraph, dynamic_net: DynamicNetwork) -> tuple[dict, dict, dict]: - # task prioritization - ranks = dag.compute_ranks(network) + # task prioritization + ranks = dag.compute_ranks(network, dynamic_network=dynamic_net) sorted_tasks = sorted(ranks.keys(), key=lambda t: ranks[t], reverse=True) - # processor selection - proc_available_time = {proc_id: 0 for proc_id in network.processors} - schedule = {} # task_id: processor, start_time, finish_time + # processor selection + schedule = {} + proc_available = {proc_id: 0.0 for proc_id in network.processors} for task_id in sorted_tasks: task = dag.nodes[task_id] - best_proc, best_est, best_eft = None, 0.0, float('inf') + best_proc, best_est, best_eft = None, None, float('inf') - # Always consider ALL processors for proc_id in network.processors: - base_est = proc_available_time.get(proc_id, 0.0) - est = base_est + # ── compute EST ─────────────────────────────────────────────── + ready_time = 0.0 for parent_id in task.parents: - parent_proc, _, parent_finish = schedule[parent_id] - - transfer_time = max(parent_finish, est) - future_network = dynamic_net.pred_net_func(transfer_time) - + parent_proc, _, parent_eft = schedule[parent_id] data_size = dag.edges[(parent_id, task_id)] - comm = future_network.comm_cost( - parent_proc, - proc_id, - data_size, + pred_net = dynamic_net.pred_net_func(parent_eft) + comm = pred_net.comm_cost( + parent_proc, proc_id, data_size, fallback_bandwidth=network.bandwidth ) + ready_time = max(ready_time, parent_eft + comm) - est = max(est, parent_finish + comm) - - # Check processor availability at actual start time - net_at_start = dynamic_net.pred_net_func(est) - if not net_at_start.has_processor(proc_id): - continue - + est = max(ready_time, proc_available[proc_id]) eft = est + task.comp_costs[proc_id] + # ── full window check [est, eft] ─────────────────────────────── + # with perfect knowledge we know every future snapshot, so scan + # every snapshot that falls inside this task's execution window. + # if the processor is absent from ANY of them, it will fail + # mid-execution — skip it for this task. + proc_goes_down = False + + # always check the boundaries first + for boundary in [est, eft]: + snapshot = dynamic_net.pred_net_func(boundary) + if not snapshot.has_processor(proc_id): + proc_goes_down = True + break + + # then check every snapshot whose timestamp falls inside (est, eft) + if not proc_goes_down: + for t, snapshot in dynamic_net.snapshots: + if t <= est: + continue # before execution window + if t >= eft: + break # past execution window — snapshots are sorted + if not snapshot.has_processor(proc_id): + proc_goes_down = True + break # one failure inside the window is enough + + if proc_goes_down: + continue # skip for this task only — reconsidered for next task + + # ── update best ─────────────────────────────────────────────── if eft < best_eft: - best_eft = eft - best_est = est - best_proc = proc_id + best_eft, best_est, best_proc = eft, est, proc_id - # Fallback (unchanged, but now rarely triggered) + # ── fallback: all processors fail during this window ────────────── if best_proc is None: for proc_id in network.processors: - est = proc_available_time.get(proc_id, 0.0) + ready_time = 0.0 for parent_id in task.parents: - parent_proc, _, parent_finish = schedule[parent_id] + parent_proc, _, parent_eft = schedule[parent_id] data_size = dag.edges[(parent_id, task_id)] - comm = network.comm_cost(parent_proc, proc_id, data_size) - est = max(est, parent_finish + comm) + comm = network.comm_cost(parent_proc, proc_id, data_size) + ready_time = max(ready_time, parent_eft + comm) + est = max(ready_time, proc_available[proc_id]) eft = est + task.comp_costs[proc_id] if eft < best_eft: best_eft, best_est, best_proc = eft, est, proc_id - schedule[task_id] = (best_proc, best_est, best_eft) - proc_available_time[best_proc] = best_eft + schedule[task_id] = (best_proc, best_est, best_eft) + proc_available[best_proc] = best_eft return schedule