Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
4c35e87
wip batching implementation
az275 Jun 3, 2025
ff389c6
batching config
az275 Jun 3, 2025
18741b6
worker wake up after max wait time
tamitakada Jun 4, 2025
04fefb9
wake up event
tamitakada Jun 4, 2025
aa7eb1c
event logging
tamitakada Jun 4, 2025
d4ae0f1
batching
tamitakada Jun 4, 2025
b9dcf02
comments
tamitakada Jun 5, 2025
7eba498
wake up enqueue bug fix and comments
tamitakada Jun 5, 2025
cc2b1df
produce breakdown
tamitakada Jun 5, 2025
2e65671
start batch whenever possible
tamitakada Jun 6, 2025
c813826
separate tasks for each task type; updated queue wait time estimates
tamitakada Jun 6, 2025
5de0c95
per task type wait times
tamitakada Jun 6, 2025
c3a8e43
task type bug fix
tamitakada Jun 6, 2025
a1449ec
task type fix
tamitakada Jun 6, 2025
14b857d
event logging fix
tamitakada Jun 6, 2025
f8e40de
env vars setup script
tamitakada May 20, 2025
e497a17
dummy wf
tamitakada Jun 6, 2025
22dbc00
merge
tamitakada Jun 10, 2025
6243924
real workflow
tamitakada Jun 10, 2025
05c3c88
lookahead model eviction
tamitakada Jun 10, 2025
f9b8408
eviction policy fix
tamitakada Jun 11, 2025
ae44ac7
concurrent batch execution according to available GPU memory
tamitakada Jun 11, 2025
c85596f
add model data to track models in use
tamitakada Jun 11, 2025
3d5d72c
can fit helper fix
tamitakada Jun 12, 2025
00dc474
merge
tamitakada Jun 12, 2025
2c916e3
model logging & eviction fix
tamitakada Jun 12, 2025
b04b800
merge: job logging
tamitakada Jun 6, 2025
086256e
merge fix
tamitakada Jun 6, 2025
11a9557
merge: job logging
tamitakada May 20, 2025
e5b5516
plot model loading
tamitakada Jun 12, 2025
8547b8c
fetch fix and policy choice
tamitakada Jun 12, 2025
50fe476
policy choice
tamitakada Jun 12, 2025
adf3c70
fixed loading plot; added eviction plot
tamitakada Jun 12, 2025
70c0489
gpu state refactor for accurate allocation handling
tamitakada Jun 17, 2025
84e1c0d
cache logic update for gpu state
tamitakada Jun 17, 2025
86566d1
remove batch start
tamitakada Jun 17, 2025
9af0327
reserving GPU space for model fetching upon eviction
tamitakada Jun 17, 2025
6b79f44
removed wake up
tamitakada Jun 17, 2025
0120870
to str fix
tamitakada Jun 17, 2025
26076cb
cannot load duplicates
tamitakada Jun 18, 2025
f268dc2
no eviction time
tamitakada Jun 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions core/config.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
""" -------- Worker Machines Parameters -------- """
GPU_MEMORY_SIZE = 14000000 # in KB, 15GB for Tesla T4
GPU_MEMORY_SIZE = 24000000 # in KB, 24GB for NVIDIA A30

TOTAL_NUM_OF_WORKERS = 140
TOTAL_NUM_OF_WORKERS = 50


""" -------- Workload Parameters -------- """
TOTAL_NUM_OF_JOBS = 1000
TOTAL_NUM_OF_JOBS = 10000

# The interval between two consecutive job creation events at each external client
DEFAULT_CREATION_INTERVAL_PERCLIENT = 100 # ms.
DEFAULT_CREATION_INTERVAL_PERCLIENT = 0.2 # ms.

WORKLOAD_DISTRIBUTION = "POISON" # UNIFORM | POISON | GAMMA

Expand All @@ -20,4 +20,4 @@

PLACEMENT_INFORMATION_STALENESS = 1 # in ms

RESCHEDULE_THREASHOLD = 1.5
RESCHEDULE_THREASHOLD = 1.5
18 changes: 9 additions & 9 deletions core/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,20 +150,21 @@ def run(self, current_time):
def to_string(self):
return "[Intermediate Results Arrival]: worker:" + str(self.worker.worker_id) + ", prev_task_id:" + str(self.prev_task.task_id) + ", cur_task_id:" + str(self.cur_task.task_id)

class BatchEndEvent(Event):
""" Event to signify that a BATCH has been performed by the WORKER. """

class TaskEndEvent(Event):
""" Event to signify that a TASK has been performed by the WORKER. """

def __init__(self, worker, job_id=-1, task_id=-1):
def __init__(self, worker, model, job_ids=[], task_type=(-1, -1)):
self.worker = worker
self.job_id = job_id # integer representing the job_id
self.task_id = task_id # integer representing the task_id
self.model = model
self.job_ids = job_ids # integers representing the job_ids
self.task_type = task_type # (workflow_id, task_id)

def run(self, current_time):
return self.worker.free_slot(current_time)
return self.worker.free_slot(current_time, self.model, self.task_type)

def to_string(self):
return "[Task End (Job {} - Task {}) at Worker {}] ===".format(self.job_id, self.task_id, self.worker.worker_id)
jobs = ",".join([str(id) for id in self.job_ids])
return f"[Batch End (Task {self.task_type}, Jobs {jobs}) at Worker {self.worker.worker_id}]"


# for PER_JOB scheduler
Expand Down Expand Up @@ -199,7 +200,6 @@ def run(self, current_time):
def to_string(self):
return "[Job End] ==="


class EventOrders:
"""
Used so that the Simulation keeps track of the priority queue order
Expand Down
7 changes: 6 additions & 1 deletion core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,15 @@ def job_generate_from_workflow(self):

current_task = Task(self.id, # ID of the associated unique Job
task_cfg["TASK_INDEX"], # taskID
(self.job_type_id, task_cfg["TASK_INDEX"]), # task type
task_cfg["EXECUTION_TIME"],
required_model_for_task,
task_cfg["INPUT_SIZE"],
task_cfg["OUTPUT_SIZE"])
task_cfg["OUTPUT_SIZE"],
task_cfg["MAX_BATCH_SIZE"],
task_cfg["MAX_WAIT_TIME"],
task_cfg["BATCH_SIZES"],
task_cfg["BATCH_EXEC_TIME"])

self.tasks.append(current_task)

Expand Down
17 changes: 10 additions & 7 deletions core/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
is referenced from Sparrow: https://github.com/radlab/sparrow
'''

import imp
import numpy as np
from matplotlib import pyplot as plt
from core.config import *
Expand Down Expand Up @@ -44,6 +43,9 @@ def __init__(
# Tracking measurements
self.result_to_export = pd.DataFrame()
self.tasks_logging_times = pd.DataFrame()
self.event_log = pd.DataFrame(columns=["time", "event"])
self.batch_exec_log = pd.DataFrame(columns=["time", "worker_id", "workflow_id", "task_id", "batch_size", "model_exec_time", "batch_exec_time", "job_ids"])

print("---- SIMULATION : " + self.simulation_name + "----")
self.produce_breakdown = produce_breakdown

Expand Down Expand Up @@ -104,8 +106,8 @@ def run_finish(self, last_time, by_job_type=False):
def produce_time_breakdown_results(self, completed_jobs):

dataframe = pd.DataFrame(columns=["job_id", "load_info_staleness", "placement_info_staleness", "req_inter_arrival_delay",
"workflow_type", "scheduler_type", "slowdown", "response_time"])
dataframe_tasks_log = pd.DataFrame(columns=["workflow_type", "task_id", "time_to_buffer", "dependency_wait_time",
"workflow_type", "job_create_time", "scheduler_type", "slowdown", "response_time"])
dataframe_tasks_log = pd.DataFrame(columns=["workflow_type", "task_id", "task_arrival_time", "task_start_exec_time", "time_to_buffer", "dependency_wait_time",
"time_spent_in_queue", "model_fetching_time", "execution_time"])

for index, completed_job in enumerate(completed_jobs):
Expand All @@ -120,7 +122,7 @@ def produce_time_breakdown_results(self, completed_jobs):
if "JOB_CREATION_INTERVAL" in WORKFLOW_LIST[completed_job.job_type_id]:
job_creation_interval = WORKFLOW_LIST[completed_job.job_type_id]["JOB_CREATION_INTERVAL"]
dataframe.loc[index] = [index, LOAD_INFORMATION_STALENESS, PLACEMENT_INFORMATION_STALENESS, job_creation_interval, completed_job.job_type_id,
self.simulation_name, slowdown, response_time]
completed_job.create_time, self.simulation_name, slowdown, response_time]

task_index = 0
for job in completed_jobs:
Expand All @@ -141,9 +143,10 @@ def produce_time_breakdown_results(self, completed_jobs):
assert model_fetching_time >= 0
assert execution_time >= 0

dataframe_tasks_log.loc[task_index] = [job.job_type_id, task.task_id, time_to_buffer,
dependency_wait_time, time_spent_in_queue, model_fetching_time, execution_time]
dataframe_tasks_log.loc[task_index] = [job.job_type_id, task.task_id, task.log.task_arrival_at_worker_buffer_timestamp,
task.log.task_execution_start_timestamp,time_to_buffer, dependency_wait_time,
time_spent_in_queue, model_fetching_time, execution_time]
task_index += 1

self.tasks_logging_times = dataframe_tasks_log
self.result_to_export = dataframe
self.result_to_export = dataframe
9 changes: 8 additions & 1 deletion core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,23 @@


class Task(object):
def __init__(self, job_id, task_id, task_exec_duration, required_model, input_size, result_size):
def __init__(self, job_id, task_id, task_type, task_exec_duration,
required_model, input_size, result_size, max_batch_size,
max_wait_time, batch_sizes, batch_exec_time):
self.job_id = job_id # id of the job the task belongs to
self.task_id = task_id # id of the task itself
self.task_type = task_type # (workflow_id, task_id)
# the time it takes to execute the task
self.task_exec_duration = task_exec_duration
# required model_id to execute the task. None if it is a computation task that doesn't involve ML model
self.model = required_model
# task input size to model.
self.input_size = input_size
self.result_size = result_size # output size
self.max_batch_size = max_batch_size
self.max_wait_time = max_wait_time
self.batch_sizes = batch_sizes
self.batch_exec_time = batch_exec_time
# list of Tasks (inputs) that this task requires ( list will be appended as the job generated)
self.required_task_ids = [] # list of task ids
self.next_task_ids = [] # list of task ids
Expand Down
Loading