diff --git a/core/config.py b/core/config.py index d641630..c088dd1 100644 --- a/core/config.py +++ b/core/config.py @@ -1,14 +1,14 @@ """ -------- Worker Machines Parameters -------- """ -GPU_MEMORY_SIZE = 14000000 # in KB, 15BG for Tesla T4 +GPU_MEMORY_SIZE = 24000000 # in KB, 24GB for NVIDIA A30 -TOTAL_NUM_OF_WORKERS = 140 +TOTAL_NUM_OF_WORKERS = 50 """ -------- Workload Parameters -------- """ -TOTAL_NUM_OF_JOBS = 1000 +TOTAL_NUM_OF_JOBS = 10000 # The interval between two consecutive job creation events at each external client -DEFAULT_CREATION_INTERVAL_PERCLIENT = 100 # ms. +DEFAULT_CREATION_INTERVAL_PERCLIENT = 0.2 # ms. WORKLOAD_DISTRIBUTION = "POISON" # UNIFORM | POISON | GAMMA @@ -20,4 +20,4 @@ PLACEMENT_INFORMATION_STALENESS = 1 # in ms -RESCHEDULE_THREASHOLD = 1.5 \ No newline at end of file +RESCHEDULE_THREASHOLD = 1.5 diff --git a/core/events.py b/core/events.py index abef882..c815a90 100644 --- a/core/events.py +++ b/core/events.py @@ -150,20 +150,21 @@ def run(self, current_time): def to_string(self): return "[Intermediate Results Arrival]: worker:" + str(self.worker.worker_id) + ", prev_task_id:" + str(self.prev_task.task_id) + ", cur_task_id:" + str(self.cur_task.task_id) +class BatchEndEvent(Event): + """ Event to signify that a BATCH has been performed by the WORKER. """ -class TaskEndEvent(Event): - """ Event to signify that a TASK has been performed by the WORKER. """ - - def __init__(self, worker, job_id=-1, task_id=-1): + def __init__(self, worker, model, job_ids=[], task_type=(-1, -1)): self.worker = worker - self.job_id = job_id # integer representing the job_id - self.task_id = task_id # integer representing the task_id + self.model = model + self.job_ids = job_ids # integers representing the job_ids + self.task_type = task_type # (workflow_id, task_id) def run(self, current_time): - return self.worker.free_slot(current_time) + return self.worker.free_slot(current_time, self.model, self.task_type) def to_string(self): - return "[Task End (Job {} - Task {}) at Worker {}] ===".format(self.job_id, self.task_id, self.worker.worker_id) + jobs = ",".join([str(id) for id in self.job_ids]) + return f"[Batch End (Task {self.task_type}, Jobs {jobs}) at Worker {self.worker.worker_id}]" # for PER_JOB scheduler @@ -199,7 +200,6 @@ def run(self, current_time): def to_string(self): return "[Job End] ===" - class EventOrders: """ Used so that the Simulation keeps track of the priority queue order diff --git a/core/job.py b/core/job.py index c5b6ec3..434c397 100644 --- a/core/job.py +++ b/core/job.py @@ -81,10 +81,15 @@ def job_generate_from_workflow(self): current_task = Task(self.id, # ID of the associated unique Job task_cfg["TASK_INDEX"], # taskID + (self.job_type_id, task_cfg["TASK_INDEX"]), # task type task_cfg["EXECUTION_TIME"], required_model_for_task, task_cfg["INPUT_SIZE"], - task_cfg["OUTPUT_SIZE"]) + task_cfg["OUTPUT_SIZE"], + task_cfg["MAX_BATCH_SIZE"], + task_cfg["MAX_WAIT_TIME"], + task_cfg["BATCH_SIZES"], + task_cfg["BATCH_EXEC_TIME"]) self.tasks.append(current_task) diff --git a/core/simulation.py b/core/simulation.py index 8aca6bd..d9b538d 100644 --- a/core/simulation.py +++ b/core/simulation.py @@ -3,7 +3,6 @@ is referenced from Sparrow: https://github.com/radlab/sparrow ''' -import imp import numpy as np from matplotlib import pyplot as plt from core.config import * @@ -44,6 +43,9 @@ def __init__( # Tracking measurements self.result_to_export = pd.DataFrame() self.tasks_logging_times = pd.DataFrame() + self.event_log = pd.DataFrame(columns=["time", "event"]) + self.batch_exec_log = pd.DataFrame(columns=["time", "worker_id", "workflow_id", "task_id", "batch_size", "model_exec_time", "batch_exec_time", "job_ids"]) + print("---- SIMULATION : " + self.simulation_name + "----") self.produce_breakdown = produce_breakdown @@ -104,8 +106,8 @@ def run_finish(self, last_time, by_job_type=False): def produce_time_breakdown_results(self, completed_jobs): dataframe = pd.DataFrame(columns=["job_id", "load_info_staleness", "placement_info_staleness", "req_inter_arrival_delay", - "workflow_type", "scheduler_type", "slowdown", "response_time"]) - dataframe_tasks_log = pd.DataFrame(columns=["workflow_type", "task_id", "time_to_buffer", "dependency_wait_time", + "workflow_type", "job_create_time", "scheduler_type", "slowdown", "response_time"]) + dataframe_tasks_log = pd.DataFrame(columns=["workflow_type", "task_id", "task_arrival_time", "task_start_exec_time", "time_to_buffer", "dependency_wait_time", "time_spent_in_queue", "model_fetching_time", "execution_time"]) for index, completed_job in enumerate(completed_jobs): @@ -120,7 +122,7 @@ def produce_time_breakdown_results(self, completed_jobs): if "JOB_CREATION_INTERVAL" in WORKFLOW_LIST[completed_job.job_type_id]: job_creation_interval = WORKFLOW_LIST[completed_job.job_type_id]["JOB_CREATION_INTERVAL"] dataframe.loc[index] = [index, LOAD_INFORMATION_STALENESS, PLACEMENT_INFORMATION_STALENESS, job_creation_interval, completed_job.job_type_id, - self.simulation_name, slowdown, response_time] + completed_job.create_time, self.simulation_name, slowdown, response_time] task_index = 0 for job in completed_jobs: @@ -141,9 +143,10 @@ def produce_time_breakdown_results(self, completed_jobs): assert model_fetching_time >= 0 assert execution_time >= 0 - dataframe_tasks_log.loc[task_index] = [job.job_type_id, task.task_id, time_to_buffer, - dependency_wait_time, time_spent_in_queue, model_fetching_time, execution_time] + dataframe_tasks_log.loc[task_index] = [job.job_type_id, task.task_id, task.log.task_arrival_at_worker_buffer_timestamp, + task.log.task_execution_start_timestamp,time_to_buffer, dependency_wait_time, + time_spent_in_queue, model_fetching_time, execution_time] task_index += 1 self.tasks_logging_times = dataframe_tasks_log - self.result_to_export = dataframe \ No newline at end of file + self.result_to_export = dataframe diff --git a/core/task.py b/core/task.py index f97fdce..61c478a 100644 --- a/core/task.py +++ b/core/task.py @@ -2,9 +2,12 @@ class Task(object): - def __init__(self, job_id, task_id, task_exec_duration, required_model, input_size, result_size): + def __init__(self, job_id, task_id, task_type, task_exec_duration, + required_model, input_size, result_size, max_batch_size, + max_wait_time, batch_sizes, batch_exec_time): self.job_id = job_id # id of the job the task belongs to self.task_id = task_id # id of the task itself + self.task_type = task_type # (workflow_id, task_id) # the time it takes to execute the task self.task_exec_duration = task_exec_duration # required model_id to execute the task. None if it is a computation task that doesn't involve ML model @@ -12,6 +15,10 @@ def __init__(self, job_id, task_id, task_exec_duration, required_model, input_si # task input size to model. self.input_size = input_size self.result_size = result_size # output size + self.max_batch_size = max_batch_size + self.max_wait_time = max_wait_time + self.batch_sizes = batch_sizes + self.batch_exec_time = batch_exec_time # list of Tasks (inputs) that this task requires ( list will be appended as the job generated) self.required_task_ids = [] # list of task ids self.next_task_ids = [] # list of task ids diff --git a/core/workflow.py b/core/workflow.py index 4d5fd9f..2f470a0 100644 --- a/core/workflow.py +++ b/core/workflow.py @@ -3,179 +3,156 @@ WORKFLOW_LIST = [ {"JOB_TYPE": 0, # ID of the type of workflow (dependency graph) - "JOB_NAME": "translation", + "JOB_NAME": "textvision", # the minimum amount of time necessary to execute the whole job - "BEST_EXEC_TIME": 1365, - "TASKS": [{"MODEL_NAME": "OPT", - "MODEL_ID": 0, + "BEST_EXEC_TIME": 51.7, + "TASKS": [{"MODEL_NAME": "", + "MODEL_ID": -1, "TASK_INDEX": 0, - "PREV_TASK_INDEX": [], - "NEXT_TASK_INDEX": [1,2,3], - "MODEL_SIZE": 5720000, # in kB - "INPUT_SIZE": 1, - "OUTPUT_SIZE": 2, # in kB - "EXECUTION_TIME": 561 # avg time, in ms + "PREV_TASK_INDEX": [], + "NEXT_TASK_INDEX": [1, 2], + "MODEL_SIZE": 0, # in KB + "INPUT_SIZE": 1, + "OUTPUT_SIZE": 1, + "EXECUTION_TIME": 1, # in ms + "MAX_BATCH_SIZE": 128, + "MAX_WAIT_TIME": 1, # ms + "BATCH_SIZES": [1, 2, 4, 8, 16, 32, 64, 128], + "BATCH_EXEC_TIME": [1, 1, 1, 1, 1, 1, 1, 1] }, - {"MODEL_NAME": "marian", - "MODEL_ID": 1, + {"MODEL_NAME": "text_encoder", + "MODEL_ID": 0, "TASK_INDEX": 1, "PREV_TASK_INDEX": [0], - "NEXT_TASK_INDEX": [4], - "MODEL_SIZE": 800000, # in kB - "INPUT_SIZE": 2, - "OUTPUT_SIZE": 2, - "EXECUTION_TIME": 441 # in ms + "NEXT_TASK_INDEX": [3], + "MODEL_SIZE": 5677000, # in kB + "INPUT_SIZE": 1, + "OUTPUT_SIZE": 2, # in kB + "EXECUTION_TIME": 10, # avg time, in ms + "MAX_BATCH_SIZE": 128, + "MAX_WAIT_TIME": 1, # ms + "BATCH_SIZES": [1, 4, 8, 16, 32, 64, 128], + "BATCH_EXEC_TIME": [10, 10, 11, 12, 15, 20, 31] }, - {"MODEL_NAME": "mt5", - "MODEL_ID": 2, + {"MODEL_NAME": "vision_encoder", + "MODEL_ID": 1, "TASK_INDEX": 2, "PREV_TASK_INDEX": [0], - "NEXT_TASK_INDEX": [4], - "MODEL_SIZE": 2000000, # in KB - "INPUT_SIZE": 2, - "OUTPUT_SIZE": 2, - "EXECUTION_TIME": 778 # in ms + "NEXT_TASK_INDEX": [3], + "MODEL_SIZE": 11655000, # in kB + "INPUT_SIZE": 10000, + "OUTPUT_SIZE": 100, + "EXECUTION_TIME": 31, # in ms + "MAX_BATCH_SIZE": 8, + "MAX_WAIT_TIME": 1, # ms + "BATCH_SIZES": [1, 4, 8], + "BATCH_EXEC_TIME": [31, 98, 183] }, - {"MODEL_NAME": "mt5", + {"MODEL_NAME": "flmr", "MODEL_ID": 2, "TASK_INDEX": 3, - "PREV_TASK_INDEX": [0], + "PREV_TASK_INDEX": [1,2], "NEXT_TASK_INDEX": [4], - "MODEL_SIZE": 2000000, # in KB - "INPUT_SIZE": 2, - "OUTPUT_SIZE": 2, - "EXECUTION_TIME": 803 # in ms + "MODEL_SIZE": 854000, # in KB + "INPUT_SIZE": 102, + "OUTPUT_SIZE": 5, + "EXECUTION_TIME": 1.7, # in ms + "MAX_BATCH_SIZE": 32, + "MAX_WAIT_TIME": 1, # ms + "BATCH_SIZES": [1, 2, 4, 8, 16, 32], + "BATCH_EXEC_TIME": [1.7, 1.9, 1.9, 2, 2.6, 3.1] }, - {"MODEL_NAME": "", - "MODEL_ID": -1, + {"MODEL_NAME": "search", + "MODEL_ID": 3, "TASK_INDEX": 4, - "PREV_TASK_INDEX": [1,2,3], + "PREV_TASK_INDEX": [3], "NEXT_TASK_INDEX": [], - "MODEL_SIZE": 0, # in KB - "INPUT_SIZE": 2, + "MODEL_SIZE": 777000, # in KB + "INPUT_SIZE": 5, "OUTPUT_SIZE": 2, - "EXECUTION_TIME": 1 # in ms - }, + "EXECUTION_TIME": 18, # in ms + "MAX_BATCH_SIZE": 16, + "MAX_WAIT_TIME": 1, # ms + "BATCH_SIZES": [1, 4, 8, 16], + "BATCH_EXEC_TIME": [18, 64, 114, 209] + } ] }, {"JOB_TYPE": 1, - "JOB_NAME": "question_answer", + "JOB_NAME": "tts", # the minimum amount of time necessary to execute the whole job - "BEST_EXEC_TIME": 587, - "TASKS": [{"MODEL_NAME": "OPT", - "MODEL_ID": 0, - "TASK_INDEX": 0, - "PREV_TASK_INDEX": [], - "NEXT_TASK_INDEX": [1], - "MODEL_SIZE": 5720000, # in kB - "INPUT_SIZE": 1, - "OUTPUT_SIZE": 2, # in kB - "EXECUTION_TIME": 560 # avg time, in ms - }, - {"MODEL_NAME": "NLI", - "MODEL_ID": 3, - "TASK_INDEX": 1, - "PREV_TASK_INDEX": [0], - "NEXT_TASK_INDEX": [], - "MODEL_SIZE": 2140000, # in kB - "INPUT_SIZE": 1, - "OUTPUT_SIZE": 1, - "EXECUTION_TIME": 27 # in ms - } - ] - }, - - {"JOB_TYPE": 2, # ID of the type of workflow (dependency graph) - "JOB_NAME": "img_to_sound", - "BEST_EXEC_TIME": 359.2, - "TASKS": [{"MODEL_NAME": "vit", + "BEST_EXEC_TIME": 308.4, + "TASKS": [{"MODEL_NAME": "audio_det", "MODEL_ID": 4, "TASK_INDEX": 0, "PREV_TASK_INDEX": [], - "NEXT_TASK_INDEX": [1,2], - "MODEL_SIZE": 1700000, # in kB - "INPUT_SIZE": 3000, # 224 x 224 x 3 shape, assuming 64 bits representation - "OUTPUT_SIZE": 20, - "EXECUTION_TIME": 283 # avg time, in ms + "NEXT_TASK_INDEX": [1], + "MODEL_SIZE": 10525000, # in kB + "INPUT_SIZE": 10000, + "OUTPUT_SIZE": 2, # in kB + "EXECUTION_TIME": 66, # avg time, in ms + "MAX_BATCH_SIZE": 16, + "MAX_WAIT_TIME": 1, # ms + "BATCH_SIZES": [1, 2, 4, 8, 16], + "BATCH_EXEC_TIME": [66, 68, 70, 76, 127] }, - {"MODEL_NAME": "NLI", - "MODEL_ID": 3, + {"MODEL_NAME": "text_encoder_2", + "MODEL_ID": 5, "TASK_INDEX": 1, "PREV_TASK_INDEX": [0], - "NEXT_TASK_INDEX": [3], - "MODEL_SIZE": 2140000, # in kB - "INPUT_SIZE": 20, # 299×299, assuming 64 bits representation - "OUTPUT_SIZE": 10, - "EXECUTION_TIME": 26 # in ms + "NEXT_TASK_INDEX": [2], + "MODEL_SIZE": 427000, # in kB + "INPUT_SIZE": 2, + "OUTPUT_SIZE": 4, + "EXECUTION_TIME": 17, # in ms + "MAX_BATCH_SIZE": 64, + "MAX_WAIT_TIME": 1, # ms + "BATCH_SIZES": [1, 2, 4, 8, 16, 32, 64], + "BATCH_EXEC_TIME": [17, 18, 18, 19, 19, 20, 22] }, - {"MODEL_NAME": "txt2speech", - "MODEL_ID": 5, + {"MODEL_NAME": "faiss_search", + "MODEL_ID": 6, "TASK_INDEX": 2, - "PREV_TASK_INDEX": [0], - "NEXT_TASK_INDEX": [3], - "MODEL_SIZE": 2700000, # in kB - "INPUT_SIZE": 20, - "OUTPUT_SIZE": 3000, - "EXECUTION_TIME": 76 # in ms + "PREV_TASK_INDEX": [1], + "NEXT_TASK_INDEX": [3,4], + "MODEL_SIZE": 783000, # in kB + "INPUT_SIZE": 4, + "OUTPUT_SIZE": 2, + "EXECUTION_TIME": 0.4, # in ms + "MAX_BATCH_SIZE": 256, + "MAX_WAIT_TIME": 1, # ms + "BATCH_SIZES": [1, 2, 4, 8, 16, 32, 64, 128, 256], + "BATCH_EXEC_TIME": [0.4, 0.4, 0.4, 0.5, 0.5, 0.6, 0.8, 1.1, 1.6] }, - {"MODEL_NAME": "aggregate", - "MODEL_ID": -1, + {"MODEL_NAME": "text_check", + "MODEL_ID": 7, "TASK_INDEX": 3, - "PREV_TASK_INDEX": [1,2], - "NEXT_TASK_INDEX": [], - "MODEL_SIZE": -1, # in kB - "INPUT_SIZE": 3000, - "OUTPUT_SIZE": 3000, - "EXECUTION_TIME": 0.2 # in ms - } - ] - }, - - {"JOB_TYPE": 3, # ID of the type of workflow (dependency graph) - "JOB_NAME": "ImageObjDetect", - "BEST_EXEC_TIME": 282.6, - "TASKS": [{"MODEL_NAME": "entry", - "MODEL_ID": -1, - "TASK_INDEX": 0, - "PREV_TASK_INDEX": [], - "NEXT_TASK_INDEX": [1,2], - "MODEL_SIZE": -1, # in kB - "INPUT_SIZE": 3000, - "OUTPUT_SIZE": 3000, - "EXECUTION_TIME": 0.6 # avg time, in ms + "PREV_TASK_INDEX": [2], + "NEXT_TASK_INDEX": [4], + "MODEL_SIZE": 7383000, # in kB + "INPUT_SIZE": 2, + "OUTPUT_SIZE": 2, + "EXECUTION_TIME": 17, # in ms + "MAX_BATCH_SIZE": 4, + "MAX_WAIT_TIME": 1, # ms + "BATCH_SIZES": [1, 2, 4], + "BATCH_EXEC_TIME": [17, 25, 45] }, - {"MODEL_NAME": "DETR", + {"MODEL_NAME": "text_to_speech", "MODEL_ID": 8, - "TASK_INDEX": 1, - "PREV_TASK_INDEX": [0], - "NEXT_TASK_INDEX": [3], - "MODEL_SIZE": 1800000, # in kB - "INPUT_SIZE": 3000, # 299×299, assuming 64 bits representation - "OUTPUT_SIZE": 3000, # - "EXECUTION_TIME": 178 # in ms - }, - {"MODEL_NAME": "Depth", - "MODEL_ID": 9, - "TASK_INDEX": 2, - "PREV_TASK_INDEX": [0], - "NEXT_TASK_INDEX": [3], - "MODEL_SIZE": 3900000, # in kB - "INPUT_SIZE": 3000, - "OUTPUT_SIZE": 3000, - "EXECUTION_TIME": 147 # in ms - }, - {"MODEL_NAME": "Aggregate", - "MODEL_ID": -1, - "TASK_INDEX": 3, - "PREV_TASK_INDEX": [1,2], + "TASK_INDEX": 4, + "PREV_TASK_INDEX": [2,3], "NEXT_TASK_INDEX": [], - "MODEL_SIZE": -1, # in kB - "INPUT_SIZE": 3000, - "OUTPUT_SIZE": 3000, - "EXECUTION_TIME": 104 # in ms + "MODEL_SIZE": 783000, # in kB + "INPUT_SIZE": 4, + "OUTPUT_SIZE": 10000, + "EXECUTION_TIME": 208, # in ms + "MAX_BATCH_SIZE": 1, + "MAX_WAIT_TIME": 1, # ms + "BATCH_SIZES": [1], + "BATCH_EXEC_TIME": [208] } ] - }, - + } ] diff --git a/experiments/parse_results.py b/experiments/parse_results.py new file mode 100644 index 0000000..e2923a3 --- /dev/null +++ b/experiments/parse_results.py @@ -0,0 +1,185 @@ +import sys +import os +import pandas as pd +import matplotlib.pyplot as plt + +from core.workflow import * +from functools import reduce + + +# TODO: verify units +def plot_response_time_vs_arrival_time(job_df, out_path): + plt.figure(figsize=(10, 6)) + + job_types = set(job_df["workflow_type"]) + job_names = { job_type: list(filter(lambda job: job["JOB_TYPE"]==job_type, WORKFLOW_LIST))[0]["JOB_NAME"] + for job_type in job_types } + + fst_job_create_time = job_df["job_create_time"][0] + for jt in job_types: + job_create_times = job_df[job_df["workflow_type"] == jt]["job_create_time"] - fst_job_create_time + job_response_times = job_df[job_df["workflow_type"] == jt]["response_time"] + + plt.scatter( + job_create_times, + job_response_times, + label=f"Workflow {jt}: {job_names[jt]}", + s=4 + ) + + plt.xlabel("Job arrival time (ms since start)") + plt.ylabel("Response time (ms)") + plt.title("Response Time vs. Arrival Time by Job Type") + + plt.legend() + plt.savefig(os.path.join(out_path, "response_vs_arrival.png")) + + +def plot_batch_size_vs_batch_start(event_df, out_path): + batch_start_events = event_df[event_df["event"].str.contains("Batch Start")] + + task_types = set(batch_start_events["event"].str.extract(r"Task \(([0-9]+, [0-9]+)\)")[0]) + model_names = { + task_type: list(filter( + lambda task: task["TASK_INDEX"]==int(task_type.split(", ")[1]), + list(filter(lambda job: job["JOB_TYPE"]==int(task_type.split(",")[0]), WORKFLOW_LIST))[0]["TASKS"] + ))[0]["MODEL_NAME"] for task_type in task_types } + + for task_type in task_types: + type_details = [int(item) for item in task_type.split(", ")] # [workflow_id, task_id] + + fig = plt.figure(figsize=(10, 6)) + + batch_start_events_for_type = batch_start_events[batch_start_events["event"].str.contains(f"Task \({task_type}\)")] + batch_sizes = batch_start_events_for_type["event"].str.extract(r"Jobs ([0-9|,]+)")[0].str.count(f'[0-9]+') + + plt.scatter( + batch_start_events_for_type["time"], + batch_sizes, + label=f"Workflow {type_details[0]}, Task ID {type_details[1]}: Model {model_names[task_type]}", + s=4 + ) + + plt.xlabel("Batch exec start time (ms since start)") + plt.ylabel("Batch size") + plt.title("Batch Size vs. Time by Model") + + plt.legend() + plt.savefig(os.path.join(out_path, f"wf_{type_details[0]}_task_{type_details[1]}_batch_size_vs_time.png")) + + +def plot_batch_size_bar_chart(event_df, out_path): + batch_start_events = event_df[event_df["event"].str.contains("Batch Start")] + + task_types = set(batch_start_events["event"].str.extract(r"Task \(([0-9]+, [0-9]+)\)")[0]) + task_details = { + task_type: list(filter( + lambda task: task["TASK_INDEX"]==int(task_type.split(", ")[1]), + list(filter(lambda job: job["JOB_TYPE"]==int(task_type.split(",")[0]), WORKFLOW_LIST))[0]["TASKS"] + ))[0] for task_type in task_types } + + for task_type in task_types: + type_details = [int(item) for item in task_type.split(", ")] # [workflow_id, task_id] + + fig = plt.figure(figsize=(8, 6)) + + batch_start_events_for_type = batch_start_events[batch_start_events["event"].str.contains(f"Task \({task_type}\)")] + batch_size_events = batch_start_events_for_type["event"].str.extract(r"Jobs ([0-9|,]+)")[0].str.count(f'[0-9]+') + batch_size_counts = list(map(lambda size: (batch_size_events == size).sum(), + task_details[task_type]["BATCH_SIZES"])) + + plt.bar( + range(len(task_details[task_type]["BATCH_SIZES"])), + batch_size_counts + ) + + plt.xticks(range(len(task_details[task_type]["BATCH_SIZES"])), task_details[task_type]["BATCH_SIZES"]) + plt.xlabel("Batch sizes") + plt.ylabel("Number of batches") + plt.title(f"Batch size distribution for {task_details[task_type]["MODEL_NAME"]} Model") + + plt.savefig(os.path.join(out_path, f"wf_{type_details[0]}_task_{type_details[1]}_batch_size_dist.png")) + + +def gen_per_task_stats(task_df, out_path): + job_types = set(task_df["workflow_type"]) + task_types_per_job = list(map( + lambda jt: set(task_df[task_df["workflow_type"] == jt]["task_id"]), + job_types + )) + + task_stat_types = ["arrival_at_worker_to_exec_start_time", "arrival_at_worker_to_enqueue_time", + "enqueue_to_exec_start_time", "model_fetching_time"] + task_stats = reduce( + lambda acc, t: acc + [f"mean_{t}", f"median_{t}", f"p99_{t}"], + task_stat_types, + [] + ) + task_stat_df = pd.DataFrame(columns=["job_type", "task_type"] + task_stats) + + for i, jt in enumerate(job_types): + for task_type in task_types_per_job[i]: + task_df_row_i = len(task_stat_df) + task_stat_df.loc[task_df_row_i] = {"job_type": jt, "task_type": task_type} + + task_set = task_df[(task_df["workflow_type"] == jt) + & (task_df["task_id"] == task_type)] + + task_stat_data = { + "arrival_at_worker_to_exec_start_time": task_set["task_start_exec_time"] - task_set["task_arrival_time"], + "arrival_at_worker_to_enqueue_time": task_set["dependency_wait_time"], + "enqueue_to_exec_start_time": task_set["time_spent_in_queue"], + "model_fetching_time": task_set["model_fetching_time"] + } + for stat in task_stat_types: + task_stat_df.loc[task_df_row_i, f"mean_{stat}"] = task_stat_data[stat].mean() + task_stat_df.loc[task_df_row_i, f"median_{stat}"] = task_stat_data[stat].median() + task_stat_df.loc[task_df_row_i, f"p99_{stat}"] = task_stat_data[stat].quantile(0.99) + + task_stat_df.to_csv(os.path.join(out_path, "per_task_avgs.csv")) + + +def gen_stats(job_df, event_df): + print(f"Mean response time: {job_df["response_time"].mean()}, Max: {job_df["response_time"].max()}") + # print(f"TPUT: {len(job_df) / event_df.loc[len(events_df)-1]["time"]}") + + +def plot_model_loading_histogram(model_df, out_path): + fig = plt.figure(figsize=(8, 6)) + + plt.hist(model_df[model_df["placed_or_evicted"] == "placed"]["start_time"], bins=15, edgecolor='black') + + plt.xlabel("Time") + plt.ylabel("Number of models loaded") + plt.title(f"Model Loading Over Time") + + plt.savefig(os.path.join(out_path, f"model_loading_hist.png")) + + +def plot_model_eviction_histogram(model_df, out_path): + fig = plt.figure(figsize=(8, 6)) + + plt.hist(model_df[model_df["placed_or_evicted"] == "evicted"]["start_time"], bins=15, edgecolor='black') + + plt.xlabel("Time") + plt.ylabel("Number of models evicted") + plt.title(f"Model Eviction Over Time") + + plt.savefig(os.path.join(out_path, f"model_eviction_hist.png")) + + +results_dir_path = sys.argv[1] # results/ +out_path = sys.argv[2] if len(sys.argv) > 2 else "parsed_results" + +os.makedirs(out_path, exist_ok=True) + +job_df = pd.read_csv(os.path.join(results_dir_path, "job_breakdown.csv")) +# task_df = pd.read_csv(os.path.join(results_dir_path, "loadDelay_1_placementDelay_1.csv")) +events_df = pd.read_csv(os.path.join(results_dir_path, 'events_by_time.csv')) +model_df = pd.read_csv(os.path.join(results_dir_path, "model_history_log.csv")) + +plot_model_loading_histogram(model_df, out_path) +plot_model_eviction_histogram(model_df, out_path) +plot_batch_size_bar_chart(events_df, out_path) +plot_batch_size_vs_batch_start(events_df, out_path) +plot_response_time_vs_arrival_time(job_df, out_path) diff --git a/experiments/run_experiments.py b/experiments/run_experiments.py index 7630b43..26c1c5d 100644 --- a/experiments/run_experiments.py +++ b/experiments/run_experiments.py @@ -12,7 +12,7 @@ # experiment_schedulers options: centralheft | decentralheft | hashtask experiment_schedulers = [] -plotting_job_type_list = [0, 1, 2, 3] +plotting_job_type_list = [0] # plotting_job_type_list = [2,3] np.random.seed(42) @@ -48,19 +48,31 @@ # 2. Run and collect data if "centralheft" in experiment_schedulers: sim = Simulation_central(simulation_name="centralheft", job_split="PER_TASK", - num_workers=TOTAL_NUM_OF_WORKERS, job_types_list=plotting_job_type_list) + num_workers=TOTAL_NUM_OF_WORKERS, job_types_list=plotting_job_type_list, + produce_breakdown=True) sim.run() + event_log = sim.event_log + event_log.to_csv(OUTPUT_FILE_NAMES["centralheft"] + "events_by_time.csv") + # result_to_export = sim.result_to_export + result_to_export = sim.result_to_export + result_to_export.to_csv(OUTPUT_FILE_NAMES["centralheft"] + "job_breakdown.csv") + tasks_logging_times = sim.tasks_logging_times tasks_logging_times.to_csv(OUTPUT_FILE_NAMES["centralheft"] + "loadDelay_" + str( LOAD_INFORMATION_STALENESS) + "_placementDelay_" + str(PLACEMENT_INFORMATION_STALENESS) + ".csv") + if "hashtask" in experiment_schedulers: OUTPUT_FILENAME = "hashtask" sim = Simulation_central(simulation_name="hashtask", job_split="PER_TASK", - num_workers=TOTAL_NUM_OF_WORKERS, job_types_list=plotting_job_type_list) + num_workers=TOTAL_NUM_OF_WORKERS, job_types_list=plotting_job_type_list, + produce_breakdown=True) sim.run() + + event_log = sim.event_log + event_log.to_csv(OUTPUT_FILE_NAMES["hashtask"] + "events_by_time.csv") tasks_logging_times = sim.tasks_logging_times tasks_logging_times.to_csv(OUTPUT_FILE_NAMES["hashtask"] + "loadDelay_" + str( @@ -80,6 +92,21 @@ # dataframe = sim.result_to_export # dataframe.to_csv(OUTPUT_FILE_NAMES["decentralheft"] + "loadDelay_" + str( # LOAD_INFORMATION_STALENESS) + "_placementDelay_" + str(PLACEMENT_INFORMATION_STALENESS) + ".csv") + + event_log = sim.event_log + event_log.to_csv(OUTPUT_FILE_NAMES["decentralheft"] + "events_by_time.csv") + + result_to_export = sim.result_to_export + result_to_export.to_csv(OUTPUT_FILE_NAMES["decentralheft"] + "job_breakdown.csv") + tasks_logging_times = sim.tasks_logging_times tasks_logging_times.to_csv(OUTPUT_FILE_NAMES["decentralheft"] + "loadDelay_" + str( LOAD_INFORMATION_STALENESS) + "_placementDelay_" + str(PLACEMENT_INFORMATION_STALENESS) + ".csv") + + sim.batch_exec_log.to_csv(OUTPUT_FILE_NAMES["decentralheft"] + "batch_log.csv") + + worker_model_histories = pd.concat(list(map(lambda w: w.model_history_log, sim.workers)), + keys=list(map(lambda w: w.worker_id, sim.workers)), + names=['worker_id']).reset_index(level='worker_id') + worker_model_histories = worker_model_histories.sort_values(by="start_time") + worker_model_histories.to_csv(OUTPUT_FILE_NAMES["decentralheft"] + "model_history_log.csv") diff --git a/schedulers/algo/nav_heft_algo.py b/schedulers/algo/nav_heft_algo.py index eb54589..51dac17 100644 --- a/schedulers/algo/nav_heft_algo.py +++ b/schedulers/algo/nav_heft_algo.py @@ -83,21 +83,24 @@ def nav_heft_job_plan(job, worker_list, current_time, initial_worker_id=None, co workers[worker.worker_id] = worker sorted_tasks = ranking_tasks(job) workers_to_select = [w.worker_id for w in worker_list] - workers_EAT = {} # worker_id -> earliest_available_time + workers_EAT = {} # worker_id -> (task_type -> earliest_available_time) workers_available_memory = {} # worker_id -> available_memory # 1. initialize the earliest available time and memory for each worker for worker_id in workers_to_select: - cur_worker_waittime = 0 - if consider_load: - cur_worker_waittime = workers[worker_id].get_task_queue_waittime(current_time, \ - info_staleness=LOAD_INFORMATION_STALENESS, \ - requiring_worker_id=initial_worker_id) - workers_EAT[worker_id] = current_time + cur_worker_waittime + workers_EAT[worker_id] = { + task_id: current_time + (workers[worker_id].get_task_queue_waittime( + current_time, + (job.job_type_id, task_id), + info_staleness=LOAD_INFORMATION_STALENESS, + requiring_worker_id=initial_worker_id) if consider_load else 0) + for task_id in sorted_tasks + } + available_memory = GPU_MEMORY_SIZE if consider_cache: - available_memory = workers[worker_id].used_GPUmemory(current_time, \ - info_staleness=PLACEMENT_INFORMATION_STALENESS, \ - requiring_worker_id=initial_worker_id) + available_memory = workers[worker_id].GPU_state.available_memory(current_time) + # info_staleness=PLACEMENT_INFORMATION_STALENESS, \ + # requiring_worker_id=initial_worker_id) workers_available_memory[worker_id] = available_memory # Select the best worker for each task based on their ranking from high to low @@ -109,7 +112,7 @@ def nav_heft_job_plan(job, worker_list, current_time, initial_worker_id=None, co fetching_model_size = 0 for cur_worker_id in workers_to_select: # 2.0 consider the current worker queue wait time to determine its earliest start time - cur_earliest_start_time = workers_EAT[cur_worker_id] + cur_earliest_start_time = workers_EAT[cur_worker_id][task_id] # 2.1 calculate the inputs arrival time inputs_arrival_time = 0 if cur_task.task_id == 0 and initial_worker_id is not None and cur_worker_id != initial_worker_id: @@ -125,13 +128,12 @@ def nav_heft_job_plan(job, worker_list, current_time, initial_worker_id=None, co model_fetch_time = 0 cur_fetching_model_size = 0 if consider_cache: - models_in_cur_worker = workers[cur_worker_id].get_model_history(current_time, \ - info_staleness=PLACEMENT_INFORMATION_STALENESS, \ - requiring_workerid= initial_worker_id) - if cur_task.model is not None and cur_task.model not in models_in_cur_worker: + # TODO: info staleness + if cur_task.model is not None and \ + not workers[cur_worker_id].GPU_state.does_have_idle_copy(cur_task.model, current_time): model_fetch_time = SameMachineCPUtoGPU_delay(cur_task.model.model_size) cur_fetching_model_size = cur_task.model.model_size - if workers_available_memory[cur_worker_id] + cur_task.model.model_size > GPU_MEMORY_SIZE: + if not workers[cur_worker_id].GPU_state.can_fetch_model(cur_task.model, current_time): # double model fetch time due to the overhead from model_eviction model_fetch_time += model_fetch_time cur_earliest_start_time += model_fetch_time @@ -142,7 +144,7 @@ def nav_heft_job_plan(job, worker_list, current_time, initial_worker_id=None, co fetching_model_size = cur_fetching_model_size # 3. pick the worker with ealiest start time cur_task_finish_time = earliest_start_time + job.tasks[task_id].task_exec_duration - workers_EAT[selected_worker_id] = cur_task_finish_time + workers_EAT[selected_worker_id][task_id] = cur_task_finish_time allocated_tasks_info[task_id] = (selected_worker_id, cur_task_finish_time) if workers_available_memory[selected_worker_id] >= fetching_model_size: workers_available_memory[selected_worker_id] -= fetching_model_size @@ -155,6 +157,7 @@ def nav_heft_job_plan(job, worker_list, current_time, initial_worker_id=None, co def nav_heft_task_adjustment(job, task_id, workers, current_time, local_worker_id, allocated_worker_id) -> int: # 1. check assigned worker wait_time to decide if need to adjust assigned worker cur_wait_time = workers[allocated_worker_id].get_task_queue_waittime(current_time, \ + (job.job_type_id, task_id), \ info_staleness=LOAD_INFORMATION_STALENESS, \ requiring_worker_id=local_worker_id) cur_task = job.tasks[task_id] @@ -167,6 +170,7 @@ def nav_heft_task_adjustment(job, task_id, workers, current_time, local_worker_i earliest_start_time = float('inf') for cur_worker in workers: wait_time = cur_worker.get_task_queue_waittime(current_time, \ + (job.job_type_id, task_id), \ info_staleness=LOAD_INFORMATION_STALENESS, \ requiring_worker_id=local_worker_id) cur_earliest_start_time = current_time + wait_time diff --git a/schedulers/centralized/simulation_central.py b/schedulers/centralized/simulation_central.py index 13def63..5f75adc 100644 --- a/schedulers/centralized/simulation_central.py +++ b/schedulers/centralized/simulation_central.py @@ -49,9 +49,14 @@ def run(self): self.event_queue.put(EventOrders( external_client_id * job_create_interval, \ JobCreationAtExternalClient(self, external_client_id))) + last_time = 0 while self.remaining_jobs > 0: cur_event = self.event_queue.get() + + if type(cur_event.event) != WorkerWakeUpEvent or cur_event.event.will_run(cur_event.current_time): + self.event_log.loc[len(self.event_log)] = [cur_event.current_time, cur_event.to_string()] + assert cur_event.current_time >= last_time last_time = cur_event.current_time new_events = cur_event.event.run(cur_event.current_time) diff --git a/schedulers/decentralized/simulation_decentral.py b/schedulers/decentralized/simulation_decentral.py index b716d7e..104c537 100644 --- a/schedulers/decentralized/simulation_decentral.py +++ b/schedulers/decentralized/simulation_decentral.py @@ -1,3 +1,5 @@ +import pandas as pd + from queue import PriorityQueue from core.simulation import * @@ -48,12 +50,16 @@ def run(self): last_time = 0 while self.remaining_jobs > 0: cur_event = self.event_queue.get() + + print(cur_event.to_string()) + print(f"Jobs left: {self.remaining_jobs}") + + self.event_log.loc[len(self.event_log)] = [cur_event.current_time, cur_event.to_string()] + assert cur_event.current_time >= last_time last_time = cur_event.current_time new_events = cur_event.event.run(cur_event.current_time) for new_event in new_events: last_time = cur_event.current_time self.event_queue.put(new_event) - self.run_finish(last_time, by_job_type=True) - - + self.run_finish(last_time, by_job_type=True) \ No newline at end of file diff --git a/set_env.sh b/set_env.sh new file mode 100755 index 0000000..2437e4e --- /dev/null +++ b/set_env.sh @@ -0,0 +1,2 @@ +export SIMULATION_DIR=$(pwd) +export PYTHONPATH="${PYTHONPATH}:${SIMULATION_DIR}" diff --git a/workers/model_state.py b/workers/model_state.py new file mode 100644 index 0000000..cfdd764 --- /dev/null +++ b/workers/model_state.py @@ -0,0 +1,233 @@ +import copy + +from core.config import * +from core.model import Model + + +class ModelState: + PLACED = 0 + PRE_FETCH = 1 # reserved for a model that will be fetched + IN_FETCH = 2 + IN_EVICT = 3 + + def __init__(self, model: Model, state: int, is_reserved_for_batch=True, size=0): + self.model = model + self.size = size if size > 0 else model.model_size + self.state = state + self.is_reserved_for_batch = is_reserved_for_batch + + def __eq__(self, value): + return type(value) == ModelState and self.model == value.model and self.state == value.state + + def __str__(self): + return f"<[{self._state_to_str()}] [{'NOT ' if not self.is_reserved_for_batch else ''}IN USE] Model ID: {self.model.model_id if self.model else -1}>" + + def __repr__(self): + return self.__str__() + + def _state_to_str(self): + if self.state == self.PLACED: return "Placed" + elif self.state == self.IN_FETCH: return "Fetching" + elif self.state == self.IN_EVICT: return "Evicting" + elif self.state == self.PRE_FETCH: return "Reserved" + + +class GPUState(object): + def __init__(self): + # sorted (asc) list of GPU states [(time, [model states])] + self._model_states = [] + + def reserved_memory(self, time: float) -> float: + """ + Returns the total GPU memory that is currently in use, either + for currently placed models, models that are being fetched, + models that are being evicted, or models that will be fetched. + """ + return sum(state.size for state in self.state_at(time)) + + def available_memory(self, time: float) -> float: + """ + Returns total GPU memory that is not reserved (see reserved_memory). + """ + return GPU_MEMORY_SIZE - self.reserved_memory(time) + + def can_fetch_model(self, model: Model, time: float) -> bool: + """ + Returns True if a new copy of [model] can be fetched to the + GPU as is with no evictions. + """ + return self.available_memory(time) >= model.model_size + + def can_fetch_model_on_eviction(self, model: Model, time: float) -> bool: + """ + Return True if a new copy of [model] can be fetched to the GPU + upon evicting some number of placed models not in use. + """ + # cannot use space occupied by models currently being fetched/evicted or used + return (self.available_memory(time) + \ + sum(state.size for state in self.state_at(time) + if state.state == ModelState.PLACED and not state.is_reserved_for_batch)) >= model.model_size + + def _insert_state_marker(self, marker_time: float, at_marker_modify, post_marker_modify): + """ + Internal helper to update states at exactly [marker_time] with + [at_marker_modify: (time, old_states) -> new_states] and states + after [marker_time] with + [post_marker_modify: (time, old_states) -> new_states]. + """ + did_add_marker = False + for i in range(len(self._model_states)-1, -1, -1): + (timestamp, states) = self._model_states[i] + if timestamp == marker_time: + at_marker_modify(timestamp, states) + did_add_marker = True + elif timestamp < marker_time: + if not did_add_marker: + state_copy = copy.deepcopy(states) + at_marker_modify(timestamp, state_copy) + self._model_states.insert(i+1, (marker_time, state_copy)) + did_add_marker = True + return + else: + post_marker_modify(timestamp, states) + + if not did_add_marker: + states = [] + at_marker_modify(marker_time, states) + self._model_states.insert(0, (marker_time, states)) + + def fetch_model(self, model: Model, start_time: float, fetch_time: float): + """ + Fetches a new copy of [model] to the GPU if there is enough available + memory without additional evictions. + """ + assert(model != None) + assert(self.can_fetch_model(model, start_time)) + + fetch_end_time = start_time + fetch_time + + if len(self._model_states) == 0: + # mark when fetch begins and ends + self._model_states.append((start_time, [ModelState(model, ModelState.IN_FETCH)])) + self._model_states.append((fetch_end_time, [ModelState(model, ModelState.PLACED)])) + return + + # add fetch end marker + self._insert_state_marker(fetch_end_time, + lambda _, states: states.append(ModelState(model, ModelState.PLACED)), + lambda _, states: states.append(ModelState(model, ModelState.PLACED))) + + # add fetch start marker + self._insert_state_marker(start_time, + lambda _, states: states.append(ModelState(model, ModelState.IN_FETCH)), + lambda t, states: states.append(ModelState(model, ModelState.IN_FETCH)) if t < fetch_end_time else None) + + + def evict_model(self, model: Model, start_time: float, evict_time: float, reserve_until=-1): + """ + Evicts [model] starting at [start_time] in [evict_time] time. + Reserves evicted space until [reserve_until]. This prevents other models + from being loaded in space that may be intended to fetch a specific model. + Does not reserve if [reserve_until] < 0. + """ + assert(model in self.placed_models(start_time)) + + eviction_end_time = start_time + evict_time + + # remove model from all later timestamps + def _remove_model(timestamp, states): + for state in states: + if state.state == ModelState.PLACED and state.model == model: + states.remove(state) + return + assert(False) + + self._insert_state_marker(eviction_end_time, _remove_model, _remove_model) + + # def _begin_model_eviction(timestamp, states): + # for state in states: + # if state.state == ModelState.PLACED and state.model == model and not state.is_reserved_for_batch: + # state.state = ModelState.IN_EVICT + # return + # assert(False) # should not happen: no model exists to evict + + # add eviction start marker + # self._insert_state_marker(start_time, _begin_model_eviction, + # lambda t, states: _begin_model_eviction(t, states) if t < eviction_end_time else None) + + # if reserve_until >= 0: + # self.reserve_model_space(model, model.model_size, eviction_end_time, reserve_until) + + def reserve_model_space(self, model: Model, size: float, start_time: float, end_time: float): + """ + Reserves [size] extra space for [model] from [start_time] to [end_time]. + Used during evictions when additional space must be reserved in addition + to space from evicted or currently evicting models for when [model] is + fetched. Prevents other models from being fetched in space made for + [model]. + """ + assert(size > 0) + + # mark reservation start + self._insert_state_marker(start_time, + lambda _, states: states.append(ModelState(model, ModelState.PRE_FETCH, size=size)), + lambda t, states: states.append(ModelState(model, ModelState.PRE_FETCH, size=size)) if t < end_time else None) + + def _remove_reservation(timestamp, states): + for state in states: + if state.model == model and state.state == ModelState.PRE_FETCH and state.size == size: + states.remove(state) + return + + # mark reservation end + self._insert_state_marker(end_time, _remove_reservation, lambda _, states: None) + + def state_at(self, time: float) -> list[ModelState]: + for (timestamp, states) in self._model_states[::-1]: + if timestamp <= time: + return states + return [] + + def placed_models(self, time: float) -> list[Model]: + return [state.model for state in self.state_at(time) if state.state == ModelState.PLACED] + + def placed_model_states(self, time: float) -> list[ModelState]: + states = self.state_at(time) + if len(states) == 0: + return [] + return [state for state in states if state.state == ModelState.PLACED] + + def does_have_idle_copy(self, model: Model, time: float) -> bool: + return any(state.model == model and not state.is_reserved_for_batch for state in self.placed_model_states(time)) + + def reserve_idle_copy(self, model: Model, time: float): + """ + If there is an idle copy of [model], reserve it to execute a batch + starting from [time]. When execution finishes, a call to + [release_busy_copy] is required. + """ + assert(self.does_have_idle_copy(model, time)) + + def _occupy_one_copy(timestamp, states): + for j, state in enumerate(states): + if state.model == model and \ + state.state == ModelState.PLACED and \ + not state.is_reserved_for_batch: + states[j].is_reserved_for_batch = True + return + assert(False) # should not reach! (no idle copies) + + # reserve 1 idle copy from start to exec end + self._insert_state_marker(time, _occupy_one_copy, _occupy_one_copy) + + def release_busy_copy(self, model: Model, time: float): + """ + Releases a previously occupied/reserved copy of [model] at [time]. + """ + def _release_one_copy(timestamp, states): + for i, state in enumerate(states): + if state.model == model and state.state == ModelState.PLACED and state.is_reserved_for_batch: + states[i].is_reserved_for_batch = False + return + + self._insert_state_marker(time, _release_one_copy, _release_one_copy) diff --git a/workers/taskworker.py b/workers/taskworker.py index 73df922..9cc350e 100644 --- a/workers/taskworker.py +++ b/workers/taskworker.py @@ -12,8 +12,9 @@ def __init__(self, simulation, num_free_slots, worker_id): # {task_obj1:[(preq_task_id0,arrival_time0), (preq_taks_id0, arrival_time1), ...], task2:[( ...],} self.waiting_tasks_buffer = defaultdict(lambda: []) # keep track of the queue information at time: [ (time1,[task0,task1,]), (time2,[task1,...]),...] - self.queue_history = [] + self.queue_history = {} self.involved = False + self.max_wait_times = {} def add_task(self, current_time, task): """ @@ -21,13 +22,40 @@ def add_task(self, current_time, task): """ # Update when the task is sent to the worker assert (task.log.task_placed_on_worker_queue_timestamp <= current_time) - self.add_task_to_queue_history(task, current_time) - return self.maybe_start_task(current_time) + self.add_task_to_queue_history(task, current_time) # Update when the task is sent to the worker - def free_slot(self, current_time): - """ Frees a slot on the worker and attempts to launch another task in that slot. """ - self.num_free_slots += 1 - get_task_events = self.maybe_start_task(current_time) + # Initialize max wait time + if task.task_type not in self.max_wait_times or self.max_wait_times[task.task_type] < 0: + self.max_wait_times[task.task_type] = current_time + task.max_wait_time + + return self.maybe_start_batch(current_time, task.task_type) + + def get_next_models(self, lookahead_count: int, current_time: float, info_staleness=0): + if lookahead_count <= 0: + return [] + + next_models = [] + task_types_by_arrival, task_queues = self.get_sorted_task_types(current_time) + for task_type in task_types_by_arrival: + next_model = task_queues[task_type][0].model + if next_model != None and next_model not in next_models: + next_models.append(next_model) + if len(next_models) == lookahead_count: + return next_models + + return next_models + + def free_slot(self, current_time, model, task_type): + """ Attempts to launch another task. """ + if model != None: + self.GPU_state.release_busy_copy(model, current_time) + + get_task_events = [] + task_types, task_queues = self.get_sorted_task_types(current_time) + for task_type in task_types: + batch_end_events = self._maybe_start_batch(task_queues[task_type], current_time) + get_task_events += batch_end_events + return get_task_events # --------------------------- DECENTRALIZED WORKER SCHEDULING ---------------------- @@ -63,39 +91,166 @@ def schedule_job_heft(self, current_time, job): # --------------------------- TASK EXECUTION ---------------------- - def maybe_start_task(self, current_time): - task_end_events = [] - task_list = self.get_queue_history(current_time, info_staleness=0) - # print(task_list) - queued_tasks = queue.Queue() - [queued_tasks.put(task) for task in task_list] - while (not queued_tasks.empty()) and self.num_free_slots > 0: - task = queued_tasks.get() - if (current_time >= task.log.task_placed_on_worker_queue_timestamp): - # if self.worker_id == 2: - # print("time{}, exec_task {}. job_start_time: {}, job_type: {} ".format(current_time, task, self.simulation.jobs[task.job_id].create_time, self.simulation.jobs[task.job_id].job_type_id)) - task_end_events, task_end_time = self.task_execute( - task, current_time) - self.rm_task_in_queue_history(task, current_time) - break - return task_end_events + _CAN_RUN_NOW = 0 + _CAN_RUN_ON_EVICT = 1 + _CANNOT_RUN = 2 + + def can_run_task(self, current_time: float, model: Model, info_staleness=0) -> int: + """ + Returns _CAN_RUN_NOW if model None, or model is on GPU and not currently in use. + Returns _CAN_RUN_ON_EVICT if model can be loaded onto the GPU upon evicting + unused models. + Returns _CANNOT_RUN otherwise. + """ + if model == None or self.GPU_state.does_have_idle_copy(model, current_time): + return self._CAN_RUN_NOW + + # cannot load additional copies of the same model + if any(map(lambda s: s.model == model, self.GPU_state.state_at(current_time))): + return self._CANNOT_RUN + + if self.GPU_state.can_fetch_model(model, current_time): + return self._CAN_RUN_NOW + + if self.GPU_state.can_fetch_model_on_eviction(model, current_time): + return self._CAN_RUN_ON_EVICT + + return self._CANNOT_RUN + + def get_sorted_task_types(self, current_time, info_staleness=0) -> tuple[list[tuple[int, int]], dict[tuple[int, int], list[Task]]]: + """ + Returns a list of all task_types with at least 1 task queued on this + worker in order of when they are scheduled to execute (e.g. task queue + at index 0 is the next to be executed when a slot opens up on the worker) + in addition to a map of all task_types to their task queues. + """ + task_types = self.queue_history.keys() + task_queues = { task_type: self.get_queue_history(current_time, task_type, info_staleness) for task_type in task_types } + + types_to_preempt = sorted(filter( + lambda task_type: len(task_queues[task_type]) > 0 and \ + self.max_wait_times[task_type] >= 0 and self.max_wait_times[task_type] <= current_time, task_types), + key=lambda task_type: self.max_wait_times[task_type]) + types_by_arrival = sorted(filter(lambda task_type: len(task_queues[task_type]) > 0 and \ + self.max_wait_times[task_type] > current_time, task_types), + key=lambda task_type: task_queues[task_type][0].log.task_placed_on_worker_queue_timestamp) + + return (types_to_preempt + types_by_arrival), task_queues + + def _maybe_start_batch(self, task_queue: list[Task], current_time: float) -> list[EventOrders]: + """ + Attempts to start a batch drawn from [task_queue]. If there is not + enough GPU memory or the [task_queue] is empty, does nothing. If a + batch is started, updates task type's next wake up to max_wait_time + + earliest remaining task's arrival. + """ + # only wake up if existing tasks to avoid congestion since + # empty queue will wake up on next task enqueue + if len(task_queue) == 0: + return [] + + batch = [] + batch_end_events = [] + + can_run = self.can_run_task(current_time, task_queue[0].model) + if can_run == self._CAN_RUN_ON_EVICT: + current_time += self.evict_models_from_GPU_until( + current_time, task_queue[0].model.model_size, self.LOOKAHEAD_EVICTION) + + if can_run == self._CAN_RUN_NOW or can_run == self._CAN_RUN_ON_EVICT: + queued_tasks = queue.Queue() + [queued_tasks.put(task) for task in task_queue] + + # form largest batch < max_batch_size possible + while (not queued_tasks.empty()) and len(batch) < task_queue[0].max_batch_size: + task = queued_tasks.get() + if (current_time >= task.log.task_placed_on_worker_queue_timestamp): + batch.append(task) + + if len(batch) > 0: + batch_end_events, task_end_time = self.batch_execute(batch, current_time) + for task in batch: # rm all tasks in batch + self.rm_task_in_queue_history(task, current_time) + + # if successfully launched batch, reset max wait time + if not queued_tasks.empty(): + earliest_remaining_arrival = -1 + while not queued_tasks.empty(): + task = queued_tasks.get() + if earliest_remaining_arrival < 0 or \ + task.log.task_placed_on_worker_queue_timestamp < earliest_remaining_arrival: + earliest_remaining_arrival = task.log.task_placed_on_worker_queue_timestamp + self.max_wait_times[batch[0].task_type] = earliest_remaining_arrival + batch[0].max_wait_time + else: + self.max_wait_times[batch[0].task_type] = -1 + + return batch_end_events + + def maybe_start_batch(self, current_time: float, task_type: tuple[int, int]): + """ + Attempts to launch a batch of [task_type]. Does nothing if there are no + tasks of [task_type] queued. + """ + task_queue = self.get_queue_history(current_time, task_type, info_staleness=0) + return self._maybe_start_batch(task_queue, current_time) + + def batch_execute(self, tasks, current_time): + """ + Fetches a new copy or reserves an idle copy of any required GPU models + and executes the batch [tasks]. Returns a list containing the + BatchEndEvent and the batch execution end time. + """ + assert(len(tasks) > 0) # cannot launch empty batch - def task_execute(self, task, current_time): self.involved = True - self.num_free_slots -= 1 - model_fetch_time = self.fetch_model(task.model, current_time) - task_end_time = current_time + model_fetch_time + task.task_exec_duration - events = self.send_result_to_next_workers( - task_end_time, task) - task_end_events = events - task_end_events.append(EventOrders(task_end_time, TaskEndEvent( - self, job_id=task.job_id, task_id=task.task_id))) - self.simulation.add_job_completion_time( - task.job_id, task.task_id, task_end_time) - # task log tracking - task.log.task_front_queue_timestamp = current_time - task.log.task_execution_start_timestamp = current_time + model_fetch_time - task.log.task_execution_end_timestamp = task_end_time + + batch_index = 0 + for i, batch_size in enumerate(sorted(tasks[0].batch_sizes)): + if len(tasks) <= batch_size: # choose smallest batch size > len(tasks) + batch_index = i + break + + model_fetch_time = 0 + if tasks[0].model != None: + if self.GPU_state.does_have_idle_copy(tasks[0].model, current_time): + self.GPU_state.reserve_idle_copy(tasks[0].model, current_time) + else: + model_fetch_time = self.fetch_model(tasks[0].model, current_time) + + task_end_time = current_time + model_fetch_time + tasks[0].batch_exec_time[batch_index] + task_end_events = [] + + job_ids = [] # for logging + + for task in tasks: + events = self.send_result_to_next_workers( + task_end_time, task) + task_end_events += events + + self.simulation.add_job_completion_time( + task.job_id, task.task_id, task_end_time) + + job_ids.append(task.job_id) + + # task log tracking + task.log.task_front_queue_timestamp = current_time + task.log.task_execution_start_timestamp = current_time + model_fetch_time + task.log.task_execution_end_timestamp = task_end_time + + self.simulation.batch_exec_log.loc[len(self.simulation.batch_exec_log)] = { + "time": current_time, + "worker_id": self.worker_id, + "workflow_id": tasks[0].task_type[0], + "task_id": tasks[0].task_id, + "batch_size": len(tasks), + "model_exec_time": tasks[0].batch_exec_time[batch_index], + "batch_exec_time": model_fetch_time + tasks[0].batch_exec_time[batch_index], + "job_ids": job_ids + } + + task_end_events.append(EventOrders(task_end_time, BatchEndEvent( + self, tasks[0].model, job_ids=job_ids, task_type=tasks[0].task_type + ))) return task_end_events, task_end_time # --------------------------- Subsequent TASK Transfer -------------------- @@ -152,24 +307,25 @@ def receive_intermediate_result(self, current_time, prev_task, cur_task) -> list # ------------------------- queue history update helper functions --------------- def add_task_to_queue_history(self, task, current_time): - last_index = len(self.queue_history) - 1 - # 0. base case - if last_index == -1: - self.queue_history.append((current_time, [task])) + # 0. Base case (first entry) + if task.task_type not in self.queue_history: + self.queue_history[task.task_type] = [(current_time, [task])] return + # 1. Find the time_stamp place to add this queue information + last_index = len(self.queue_history[task.task_type]) - 1 while last_index >= 0: - if self.queue_history[last_index][0] == current_time: - if task not in self.queue_history[last_index][1]: - self.queue_history[last_index][1].append(task) + if self.queue_history[task.task_type][last_index][0] == current_time: + if task not in self.queue_history[task.task_type][last_index][1]: + self.queue_history[task.task_type][last_index][1].append(task) break - if self.queue_history[last_index][0] < current_time: + if self.queue_history[task.task_type][last_index][0] < current_time: # print("2") - if task not in self.queue_history[last_index][1]: - next_queue = self.queue_history[last_index][1].copy() + if task not in self.queue_history[task.task_type][last_index][1]: + next_queue = self.queue_history[task.task_type][last_index][1].copy() next_queue.append(task) last_index += 1 - self.queue_history.insert( + self.queue_history[task.task_type].insert( last_index, (current_time, next_queue) ) break @@ -177,47 +333,54 @@ def add_task_to_queue_history(self, task, current_time): last_index -= 1 # 2. added the task to all the subsequent timestamp tuples - while last_index < len(self.queue_history): - if task not in self.queue_history[last_index][1]: - self.queue_history[last_index][1].append(task) + while last_index < len(self.queue_history[task.task_type]): + if task not in self.queue_history[task.task_type][last_index][1]: + self.queue_history[task.task_type][last_index][1].append(task) last_index += 1 def rm_task_in_queue_history(self, task, current_time): - last_index = len(self.queue_history) - 1 # 0. base case: shouldn't happen - if last_index == -1: + if task.task_type not in self.queue_history: AssertionError("rm model cached location to an empty list") return + + last_index = len(self.queue_history[task.task_type]) - 1 + # 1. find the place to add this remove_event to the tuple list while last_index >= 0: - if self.queue_history[last_index][0] == current_time: - if task in self.queue_history[last_index][1]: - self.queue_history[last_index][1].remove(task) + if self.queue_history[task.task_type][last_index][0] == current_time: + if task in self.queue_history[task.task_type][last_index][1]: + self.queue_history[task.task_type][last_index][1].remove(task) break - if self.queue_history[last_index][0] < current_time: - if task in self.queue_history[last_index][1]: - next_tasks_in_queue = self.queue_history[last_index][1].copy() + if self.queue_history[task.task_type][last_index][0] < current_time: + if task in self.queue_history[task.task_type][last_index][1]: + next_tasks_in_queue = self.queue_history[task.task_type][last_index][1].copy() next_tasks_in_queue.remove(task) last_index = last_index + 1 - self.queue_history.insert( + self.queue_history[task.task_type].insert( last_index, (current_time, next_tasks_in_queue) ) break last_index -= 1 # go to prev time # 2. remove the task from all the subsequent tuple - while last_index < len(self.queue_history): - if task in self.queue_history[last_index]: - self.queue_history[last_index][1].remove(task) + while last_index < len(self.queue_history[task.task_type]): + if task in self.queue_history[task.task_type][last_index]: + self.queue_history[task.task_type][last_index][1].remove(task) last_index += 1 # do this for the remaining element after - def get_queue_history(self, current_time, info_staleness=0) -> list: - return self.get_history(self.queue_history, current_time, info_staleness) + def get_queue_history(self, current_time, task_type, info_staleness=0) -> list: + return self.get_history(self.queue_history[task_type], current_time, info_staleness) - def get_task_queue_waittime(self, current_time, info_staleness=0, requiring_worker_id=None): + def get_task_queue_waittime(self, current_time, task_type, info_staleness=0, requiring_worker_id=None): if requiring_worker_id != None and requiring_worker_id != self.worker_id: info_staleness = 0 - queueing_tasks = self.get_queue_history(current_time, info_staleness) - waittime = 0 - for task in queueing_tasks: - waittime += task.task_exec_duration - return waittime + + task_types, task_queues = self.get_sorted_task_types(current_time, info_staleness=info_staleness) + + wait_time = 0 + for queued_task_type in task_types: + for task in task_queues[queued_task_type]: + wait_time += task.task_exec_duration + if queued_task_type == task_type: + return wait_time + return wait_time diff --git a/workers/worker.py b/workers/worker.py index 88cb292..5c1a9f7 100644 --- a/workers/worker.py +++ b/workers/worker.py @@ -2,7 +2,10 @@ from core.config import * from core.network import * from core.config import * -import sys + +import pandas as pd + +from workers.model_state import * class Worker(object): @@ -12,11 +15,13 @@ def __init__(self, simulation, num_free_slots, worker_id): self.worker_id = worker_id self.simulation = simulation self.num_free_slots = num_free_slots + self.current_batch = [] # track the currently executing batch (if any) self.GPU_memory_models = [] - # Keep track of the list of models sitting in GPU memory at time: - # {time-> list of model objects} : [ (time1,[model0,model1,]), (time2,[model1,...]),...] - self.GPU_memory_models_history = [] - + self.GPU_state = GPUState() + + self.model_history_log = pd.DataFrame(columns=["start_time", "end_time", + "model_id", "placed_or_evicted"]) + def __hash__(self): return hash(self.worker_id) @@ -46,123 +51,86 @@ def initial_model_placement(self, model): return 1 return 1 - def used_GPUmemory(self, current_time, info_staleness=0, requiring_worker_id=None) -> int: - """ - Helper function for local GPU memory usage check - """ - if requiring_worker_id == self.worker_id: - info_staleness = 0 - models = self.get_model_history(current_time, info_staleness) - return sum(m.model_size for m in models) - # ---------- LOCAL MEMORY MANAGEMENT AND RETRIEVE ----------""" def fetch_model(self, model, current_time): - """ - Return: model transfer time required to execute the Task - Every "task" requires one "model" to be executed correctly - add this information to 2 histories: - 1. model_history on worker - 2. cache_history on metadata_service - """ - if model is None: - return 0 - # First check if the model is stored locally: either on GPU, or systemRAM(home node) - w_models = self.get_model_history(current_time, info_staleness=0) - # case1: if it is in local GPU already - if model in w_models: + if model == None or self.GPU_state.does_have_idle_copy(model, current_time): return 0 + fetch_time = 0 fetch_time = SameMachineCPUtoGPU_delay(model.model_size) + self.simulation.metadata_service.add_model_cached_location( model, self.worker_id, current_time + fetch_time) - self.add_model_to_memory_history(model, current_time + fetch_time) - eviction_time = self.evict_model_from_GPU(current_time + fetch_time) - return fetch_time + eviction_time + self.GPU_state.fetch_model(model, current_time, fetch_time) + + self.model_history_log.loc[len(self.model_history_log)] = { + "start_time": current_time, + "end_time": current_time + fetch_time, + "model_id": model.model_id, + "placed_or_evicted": "placed" + } + + return fetch_time + + # NOTE: REQUIRED OVERRIDE + def get_next_models(self, lookahead_count: int, current_time: float, info_staleness=0): + """ + Returns a list of up to lookahead_count models in order of when they are + expected to be executed. + """ + return [] + + LOOKAHEAD_EVICTION = 0 + FCFS_EVICTION = 1 - def evict_model_from_GPU(self, current_time): + def evict_models_from_GPU_until(self, current_time: float, min_required_memory: int, policy: int) -> float: """ - Do nothing if current cached models didn't exceed the GPU memory - remove this information to 2 histories: - 1. model_history on worker - 2. cache_history on metadata_service + Evicts models from GPU according to FCFS or lookahead eviction policy until at least + min_required_memory space is available. Returns time taken to execute model + evictions. 0 if min_required_memory could not be created. + Assumes batches run in earliest task arrival order. """ - models_in_GPU = self.get_model_history(current_time, info_staleness=0) - models_total_size = 0 - for model in models_in_GPU: - models_total_size += model.model_size - eviction_index = 0 - eviction_duration = 0 - while(models_total_size > GPU_MEMORY_SIZE): - rm_model = models_in_GPU[eviction_index] - self.simulation.metadata_service.rm_model_cached_location( - rm_model, self.worker_id, current_time) - self.rm_model_in_memory_history(rm_model, current_time) - models_total_size -= rm_model.model_size - eviction_index += 1 - eviction_duration += SameMachineGPUtoCPU_delay(rm_model.model_size) - return eviction_duration + curr_memory = self.GPU_state.available_memory(current_time) + + placed_model_states = self.GPU_state.placed_model_states(current_time) + if policy == self.LOOKAHEAD_EVICTION: + next_models = self.get_next_models(3, current_time) + placed_model_states = sorted( + placed_model_states, + key=lambda m: next_models.index(m.model) if m.model in next_models else len(next_models), + reverse=True + ) + + models_to_evict = [] + for state in placed_model_states: + if not state.is_reserved_for_batch: + curr_memory += state.model.model_size + models_to_evict.append(state.model) + if curr_memory >= min_required_memory: + # model_evict_times = list(map(lambda m: SameMachineGPUtoCPU_delay(m.model_size), models_to_evict)) + # eviction_duration = max(model_evict_times) + # full_eviction_end = current_time + eviction_duration + + # must reserve space to prevent other models from loading in space created here + # extra_to_reserve = min_required_memory - sum(m.model_size for m in models_to_evict) + # if extra_to_reserve > 0: + # self.GPU_state.reserve_model_space(None, extra_to_reserve, current_time, full_eviction_end) + + for i in range(len(models_to_evict)): + self.simulation.metadata_service.rm_model_cached_location( + models_to_evict[i], self.worker_id, current_time) + self.GPU_state.evict_model(models_to_evict[i], current_time, 0) + + self.model_history_log.loc[len(self.model_history_log)] = { + "start_time": current_time, + "end_time": current_time , + "model_id": models_to_evict[i].model_id, + "placed_or_evicted": "evicted" + } + return 0 + return 0 # ------------------------- cached model history update helper functions --------------- - def add_model_to_memory_history(self, model, current_time): - assert (model.model_size <= GPU_MEMORY_SIZE) - last_index = len(self.GPU_memory_models_history) - 1 - # 0. base case - if last_index == -1: - self.GPU_memory_models_history.append((current_time, [model])) - return - # 1. Find the time_stamp place to add this queue information - while last_index >= 0: - if self.GPU_memory_models_history[last_index][0] == current_time: - if model not in self.GPU_memory_models_history[last_index][1]: - self.GPU_memory_models_history[last_index][1].append(model) - break - if self.GPU_memory_models_history[last_index][0] < current_time: - if model not in self.GPU_memory_models_history[last_index][1]: - next_queue = self.GPU_memory_models_history[last_index][1].copy( - ) - next_queue.append(model) - last_index += 1 - self.GPU_memory_models_history.insert( - last_index, (current_time, next_queue) - ) - break - # check the previous entry - last_index -= 1 - # 2. added the worker_id to all the subsequent timestamp tuples - while last_index < len(self.GPU_memory_models_history): - if model not in self.GPU_memory_models_history[last_index][1]: - self.GPU_memory_models_history[last_index][1].append(model) - last_index += 1 - - def rm_model_in_memory_history(self, model, current_time): - last_index = len(self.GPU_memory_models_history) - 1 - # 0. base case: shouldn't happen - if last_index == -1: - AssertionError("rm model cached location to an empty list") - return - # 1. find the place to add this remove_event to the tuple list - while last_index >= 0: - if self.GPU_memory_models_history[last_index][0] == current_time: - if model in self.GPU_memory_models_history[last_index][1]: - self.GPU_memory_models_history[last_index][1].remove(model) - break - if self.GPU_memory_models_history[last_index][0] < current_time: - if model in self.GPU_memory_models_history[last_index][1]: - next_tasks_in_memory = self.GPU_memory_models_history[last_index][1].copy( - ) - next_tasks_in_memory.remove(model) - last_index = last_index + 1 - self.GPU_memory_models_history.insert( - last_index, (current_time, next_tasks_in_memory) - ) - break - last_index -= 1 # go to prev time - # 2. remove the task from all the subsequent tuple - while last_index < len(self.GPU_memory_models_history): - if model in self.GPU_memory_models_history[last_index]: - self.GPU_memory_models_history[last_index][1].remove(model) - last_index += 1 # do this for the remaining element after - def get_history(self, history, current_time, info_staleness) -> list: delayed_time = current_time - info_staleness last_index = len(history) - 1 @@ -171,10 +139,3 @@ def get_history(self, history, current_time, info_staleness) -> list: return history[last_index][1].copy() last_index -= 1 # check the previous one return [] - - def get_model_history(self, current_time, info_staleness=0, requiring_workerid= None) -> list: - if requiring_workerid == self.worker_id: - info_staleness = 0 - return self.get_history(self.GPU_memory_models_history, current_time, info_staleness) - -