From aaee10835bd3d7a04e4fa009cf97c4de8c28e4cc Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Thu, 7 Aug 2025 20:15:02 +0000 Subject: [PATCH 01/14] test no ray --- inference.py | 0 test_no_ray.py | 105 +++++++++++++++++++++++++++++++++++++++++++++++++ train.py | 12 ++++++ 3 files changed, 117 insertions(+) create mode 100644 inference.py create mode 100644 test_no_ray.py create mode 100644 train.py diff --git a/inference.py b/inference.py new file mode 100644 index 00000000..e69de29b diff --git a/test_no_ray.py b/test_no_ray.py new file mode 100644 index 00000000..88789031 --- /dev/null +++ b/test_no_ray.py @@ -0,0 +1,105 @@ +import logging +import os +import tempfile +import traceback +from typing import Any, Dict, Optional +from composer.cli.launcher import _launch_processes, _monitor_processes, _cleanup_processes, _aggregate_process_returncode + +def run_distributed_training( + nproc: int, + world_size: int, + base_rank: int, + node_rank: int, + master_addr: str, + master_port: int, + training_script: str, + training_script_args: Any=None, + module_mode: bool=False, + command_mode: bool=False, + stdout: Optional[str]=None, + stderr: Optional[str]=None, + verbose: bool = False, +) -> int: + """ + Run distributed training with the given parameters. + + Args: + nproc (int): Number of processes to launch. + world_size (int): Total number of processes across all nodes. + base_rank (int): Base rank of the current node. + node_rank (int): Rank of the current node. + master_addr (str): Address of the master node. + master_port (int): Port of the master node. + module_mode (bool): Whether to use module mode. + command_mode (bool): Whether to use command mode. + stdout (Optional[str]): Stdout file format. + stderr (Optional[str]): Stderr file format. + training_script (str): Training script to run. + training_script_args (Any): Arguments for the training script. + verbose (bool): Whether to use verbose logging. + + Returns: + int: Aggregated return code from all processes. + """ + if training_script_args is None: + training_script_args = [] + + MOSAICML_PLATFORM_ENV_VAR = "MOSAICML_PLATFORM" + MOSAICML_LOG_DIR_ENV_VAR = "MOSAICML_LOG_DIR" + MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR = "MOSAICML_GPU_LOG_FILE_PREFIX" + + logger = logging.getLogger("distributed_training") + logging.basicConfig() + logger.setLevel(logging.INFO if verbose else logging.WARNING) + + processes: Dict[Any, Any] = {} + + log_tmpdir = tempfile.TemporaryDirectory() + if stdout is None: + stdout = f'{log_tmpdir.name}/rank{{rank}}.stdout.txt' + if stderr is None: + stderr = f'{log_tmpdir.name}/rank{{rank}}.stderr.txt' + + # If running on the Mosaic platform, log all gpu ranks' stderr and stdout to Mosaic platform + if ( + os.environ.get(MOSAICML_PLATFORM_ENV_VAR, 'false').lower() == 'true' + and str(os.environ.get(MOSAICML_LOG_DIR_ENV_VAR, 'false')).lower() != 'false' + and os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR, 'false').lower() != 'false' + ): + logger.info('Logging all GPU ranks to Mosaic AI Training.') + log_file_format = ( + f"{os.environ.get(MOSAICML_LOG_DIR_ENV_VAR)}/" + f"{os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR)}{{local_rank}}.txt" + ) + stdout = log_file_format + stderr = None + + try: + _launch_processes( + nproc=nproc, + world_size=world_size, + base_rank=base_rank, + node_rank=node_rank, + master_addr=master_addr, + master_port=master_port, + module_mode=module_mode, + command_mode=command_mode, + stdout_file_format=stdout, + stderr_file_format=stderr, + training_script=training_script, + training_script_args=training_script_args, + processes=processes, + ) + _monitor_processes(processes) + except Exception: + # Print the exception first, then kill the training processes, since killing + # may take up to CLEANUP_TIMEOUT seconds, and the user should know immediately + # what failed. No need to re-raise the exception, as `aggregate_process_returncode` + # will return an appropriate error code, which will cause the script to exit. + logger.error("Exception occurred during distributed training", exc_info=True) + traceback.print_exc() + print('Killing training processes') + finally: + _cleanup_processes(processes) + log_tmpdir.cleanup() + return _aggregate_process_returncode(processes) diff --git a/train.py b/train.py new file mode 100644 index 00000000..53034da6 --- /dev/null +++ b/train.py @@ -0,0 +1,12 @@ +from composer.utils import dist + +from multiprocessing.context import SpawnProcess + +def train_to_inference_communication(): + pass + +if __name__ == "__main__": + dist.initialize_dist() + print(f"Hello from rank {dist.get_rank()}") + + SpawnProcess(target=train_to_inference_communication).start() \ No newline at end of file From a4dbd0116532d1e5a78105bb91c8064ec7bbd1f7 Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Fri, 8 Aug 2025 02:27:04 +0000 Subject: [PATCH 02/14] works well --- inference.py | 0 rollout.py | 50 +++++++++++++++++++++++++++++++++++++++++++++++ test_no_ray.py | 18 +++++++++++++++++ train.py | 53 ++++++++++++++++++++++++++++++++++++++++++++------ 4 files changed, 115 insertions(+), 6 deletions(-) delete mode 100644 inference.py create mode 100644 rollout.py diff --git a/inference.py b/inference.py deleted file mode 100644 index e69de29b..00000000 diff --git a/rollout.py b/rollout.py new file mode 100644 index 00000000..359d5ac8 --- /dev/null +++ b/rollout.py @@ -0,0 +1,50 @@ +from compose_rl.algorithms.online.generation_utils.vllm_utils import init_process_group +from composer.utils import dist +import torch + +import logging + +logging.basicConfig( + # Example of format string + # 2022-06-29 11:22:26,152: rank0[822018][MainThread]: INFO: composer.trainer.trainer: Using precision Precision.FP32 + # Including the PID and thread name to help with debugging dataloader workers and callbacks that spawn background + # threads / processes + format= + f'[ROLLOUT]%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', +) + +log = logging.getLogger(__name__) +log.setLevel(logging.DEBUG) + + +MODEL_UPDATE_PORT=29600 +NUM_INFERENCE_ENGINES=1 +MAX_ITERATIONS=10 + +if __name__ == "__main__": + torch.distributed.init_process_group(backend="nccl") # 0, 1 + log.info(f"Hello from rank {dist.get_global_rank()}") + rank = dist.get_global_rank() + 1 + log.info(f"Rank {rank}") + model_update_group = None + if dist.get_global_rank() == 0: + log.info("Initializing model update process group") # 1 + model_update_group = init_process_group( + backend="nccl", + init_method=f"tcp://localhost:{MODEL_UPDATE_PORT}", + world_size=1 + NUM_INFERENCE_ENGINES, + rank=rank, + group_name="model_update_group", + ) + + # TODO: check to see if there's an update to the model weights, if there is update the weights + # to make it sync, we will wait until there is a weight update + if model_update_group is not None: + t = torch.tensor([0]).to('cuda') + dist.broadcast(group=model_update_group, src=0,tensor=t) + log.info(f"Rank {dist.get_global_rank()} all gathered {t}") + + # TODO: start generating rollouts and put it in the experience buffer + + + diff --git a/test_no_ray.py b/test_no_ray.py index 88789031..f855d15b 100644 --- a/test_no_ray.py +++ b/test_no_ray.py @@ -1,5 +1,6 @@ import logging import os +import subprocess import tempfile import traceback from typing import Any, Dict, Optional @@ -103,3 +104,20 @@ def run_distributed_training( _cleanup_processes(processes) log_tmpdir.cleanup() return _aggregate_process_returncode(processes) + + +if __name__ == "__main__": + # test on 4 gpus! + + try: + p1 = subprocess.Popen('CUDA_VISIBLE_DEVICES=0,1 composer -n 2 train.py', shell=True) + p2 = subprocess.Popen('CUDA_VISIBLE_DEVICES=2,3 composer -n 2 rollout.py', shell=True) + p1.wait() + p2.wait() + except Exception as e: + print(e) + print(traceback.format_exc()) + print('Killing training processes') + finally: + _cleanup_processes({0: p1, 1: p2}) + diff --git a/train.py b/train.py index 53034da6..5b645189 100644 --- a/train.py +++ b/train.py @@ -1,12 +1,53 @@ +from datetime import timedelta from composer.utils import dist +import torch -from multiprocessing.context import SpawnProcess +from compose_rl.algorithms.online.generation_utils.vllm_utils import init_process_group -def train_to_inference_communication(): - pass +import logging + +logging.basicConfig( + # Example of format string + # 2022-06-29 11:22:26,152: rank0[822018][MainThread]: INFO: composer.trainer.trainer: Using precision Precision.FP32 + # Including the PID and thread name to help with debugging dataloader workers and callbacks that spawn background + # threads / processes + format= + f'[TRAIN]%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', +) + +log = logging.getLogger(__name__) +log.setLevel(logging.DEBUG) + +MODEL_UPDATE_PORT=29600 +NUM_INFERENCE_ENGINES=1 +MAX_ITERATIONS=10 if __name__ == "__main__": - dist.initialize_dist() - print(f"Hello from rank {dist.get_rank()}") + # note: the smaller timeout seems to hold, doesn't matter which process gorup you set the timeout to + torch.distributed.init_process_group(backend="nccl") + log.info(f"Hello from rank {dist.get_global_rank()}") + + model_update_group = None + if dist.get_global_rank() == 0: + log.info("Initializing model update process group") + model_update_group = init_process_group( + backend="nccl", + init_method=f"tcp://localhost:{MODEL_UPDATE_PORT}", + world_size=1 + NUM_INFERENCE_ENGINES, + rank=0, + group_name="model_update_group", + ) + + + # TODO: broadcast the model weights to the inference engines + if model_update_group is not None: + t = torch.tensor([5]).to('cuda') + dist.broadcast(group=model_update_group, src=0,tensor=t) + log.info(f"Rank {dist.get_global_rank()} Broadcasted{t}") + + # TODO: get the experience buffer results from the rollout process + + # all ranks should wait until we have the experience buffer results + + # TODO: train the model - SpawnProcess(target=train_to_inference_communication).start() \ No newline at end of file From 968e0f967d162c2d48f7e05c1cbd5083dc40520a Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Fri, 8 Aug 2025 02:51:13 +0000 Subject: [PATCH 03/14] async ops --- rollout.py | 17 +++++++++++++++++ train.py | 23 +++++++++++++++++++---- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/rollout.py b/rollout.py index 359d5ac8..96ec62f3 100644 --- a/rollout.py +++ b/rollout.py @@ -18,6 +18,7 @@ MODEL_UPDATE_PORT=29600 +EXPERIENCE_BUFFER_PORT=29601 NUM_INFERENCE_ENGINES=1 MAX_ITERATIONS=10 @@ -27,6 +28,7 @@ rank = dist.get_global_rank() + 1 log.info(f"Rank {rank}") model_update_group = None + experience_buffer_group = None if dist.get_global_rank() == 0: log.info("Initializing model update process group") # 1 model_update_group = init_process_group( @@ -36,6 +38,13 @@ rank=rank, group_name="model_update_group", ) + experience_buffer_group = init_process_group( + backend="nccl", + init_method=f"tcp://localhost:{EXPERIENCE_BUFFER_PORT}", + world_size=1 + NUM_INFERENCE_ENGINES, + rank=rank, + group_name="experience_buffer_group", + ) # TODO: check to see if there's an update to the model weights, if there is update the weights # to make it sync, we will wait until there is a weight update @@ -45,6 +54,14 @@ log.info(f"Rank {dist.get_global_rank()} all gathered {t}") # TODO: start generating rollouts and put it in the experience buffer + dist.barrier() # wait until the model update is complete + + + + if experience_buffer_group is not None: + t = torch.tensor([6]).to('cuda') + torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=t, async_op=True) # don't block, send it off and continue generating rollouts + log.info(f"Rank {dist.get_global_rank()} Broadcasted experience{t}") diff --git a/train.py b/train.py index 5b645189..86a01f1b 100644 --- a/train.py +++ b/train.py @@ -19,6 +19,7 @@ log.setLevel(logging.DEBUG) MODEL_UPDATE_PORT=29600 +EXPERIENCE_BUFFER_PORT=29601 NUM_INFERENCE_ENGINES=1 MAX_ITERATIONS=10 @@ -28,6 +29,7 @@ log.info(f"Hello from rank {dist.get_global_rank()}") model_update_group = None + experience_buffer_group = None if dist.get_global_rank() == 0: log.info("Initializing model update process group") model_update_group = init_process_group( @@ -37,17 +39,30 @@ rank=0, group_name="model_update_group", ) - + experience_buffer_group = init_process_group( + backend="nccl", + init_method=f"tcp://localhost:{EXPERIENCE_BUFFER_PORT}", + world_size=1 + NUM_INFERENCE_ENGINES, + rank=0, + group_name="experience_buffer_group", + ) # TODO: broadcast the model weights to the inference engines if model_update_group is not None: t = torch.tensor([5]).to('cuda') - dist.broadcast(group=model_update_group, src=0,tensor=t) - log.info(f"Rank {dist.get_global_rank()} Broadcasted{t}") + torch.distributed.broadcast(group=model_update_group, src=0,tensor=t, async_op=True) # broadcast all the model weights + log.info(f"Rank {dist.get_global_rank()} Broadcasted model weights{t}") # TODO: get the experience buffer results from the rollout process + if experience_buffer_group is not None: + t = torch.tensor([0]).to('cuda') + torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=t) # block until the broadcast is complete + log.info(f"Rank {dist.get_global_rank()} Broadcasted experience{t}") + + # all training ranks should wait until we have the experience buffer results + dist.barrier() - # all ranks should wait until we have the experience buffer results + # distributed the experiences results to each of the training ranks # TODO: train the model From ec532c484c666d55ed23ea00e99531fc3e42b635 Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Fri, 8 Aug 2025 02:58:18 +0000 Subject: [PATCH 04/14] simplify --- rollout.py | 55 ++++++++++++++++++++------------------------------ test_no_ray.py | 2 +- 2 files changed, 23 insertions(+), 34 deletions(-) diff --git a/rollout.py b/rollout.py index 96ec62f3..2d571d10 100644 --- a/rollout.py +++ b/rollout.py @@ -23,45 +23,34 @@ MAX_ITERATIONS=10 if __name__ == "__main__": - torch.distributed.init_process_group(backend="nccl") # 0, 1 - log.info(f"Hello from rank {dist.get_global_rank()}") - rank = dist.get_global_rank() + 1 - log.info(f"Rank {rank}") - model_update_group = None - experience_buffer_group = None - if dist.get_global_rank() == 0: - log.info("Initializing model update process group") # 1 - model_update_group = init_process_group( - backend="nccl", - init_method=f"tcp://localhost:{MODEL_UPDATE_PORT}", - world_size=1 + NUM_INFERENCE_ENGINES, - rank=rank, - group_name="model_update_group", - ) - experience_buffer_group = init_process_group( - backend="nccl", - init_method=f"tcp://localhost:{EXPERIENCE_BUFFER_PORT}", - world_size=1 + NUM_INFERENCE_ENGINES, - rank=rank, - group_name="experience_buffer_group", - ) + rank = 1 # TODO: UPDATE TO SUPPORT MULTIPLE INFERENCE ENGINES + log.info("Initializing model update process group") # 1 + model_update_group = init_process_group( + backend="nccl", + init_method=f"tcp://localhost:{MODEL_UPDATE_PORT}", + world_size=1 + NUM_INFERENCE_ENGINES, + rank=rank, + group_name="model_update_group", + ) + experience_buffer_group = init_process_group( + backend="nccl", + init_method=f"tcp://localhost:{EXPERIENCE_BUFFER_PORT}", + world_size=1 + NUM_INFERENCE_ENGINES, + rank=rank, + group_name="experience_buffer_group", + ) # TODO: check to see if there's an update to the model weights, if there is update the weights # to make it sync, we will wait until there is a weight update - if model_update_group is not None: - t = torch.tensor([0]).to('cuda') - dist.broadcast(group=model_update_group, src=0,tensor=t) - log.info(f"Rank {dist.get_global_rank()} all gathered {t}") + t = torch.tensor([0]).to('cuda') + torch.distributed.broadcast(group=model_update_group, src=0,tensor=t) + log.info(f"Rank {dist.get_global_rank()} all gathered {t}") # TODO: start generating rollouts and put it in the experience buffer - dist.barrier() # wait until the model update is complete - - - if experience_buffer_group is not None: - t = torch.tensor([6]).to('cuda') - torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=t, async_op=True) # don't block, send it off and continue generating rollouts - log.info(f"Rank {dist.get_global_rank()} Broadcasted experience{t}") + t = torch.tensor([6]).to('cuda') + torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=t, async_op=True) # don't block, send it off and continue generating rollouts + log.info(f"Rank {dist.get_global_rank()} Broadcasted experience{t}") diff --git a/test_no_ray.py b/test_no_ray.py index f855d15b..71abe524 100644 --- a/test_no_ray.py +++ b/test_no_ray.py @@ -111,7 +111,7 @@ def run_distributed_training( try: p1 = subprocess.Popen('CUDA_VISIBLE_DEVICES=0,1 composer -n 2 train.py', shell=True) - p2 = subprocess.Popen('CUDA_VISIBLE_DEVICES=2,3 composer -n 2 rollout.py', shell=True) + p2 = subprocess.Popen('CUDA_VISIBLE_DEVICES=2,3 python rollout.py', shell=True) p1.wait() p2.wait() except Exception as e: From 49400b4a6dc23e5ea038f4a965fdee3af2bbef0f Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Fri, 8 Aug 2025 03:38:33 +0000 Subject: [PATCH 05/14] works with a certain number of iterations! --- rollout.py | 53 ++++++++++++++++++++++++++++++++++++++-------------- train.py | 55 +++++++++++++++++++++++++++++++++--------------------- 2 files changed, 73 insertions(+), 35 deletions(-) diff --git a/rollout.py b/rollout.py index 2d571d10..f0f784a6 100644 --- a/rollout.py +++ b/rollout.py @@ -4,6 +4,14 @@ import logging +MODEL_UPDATE_PORT=29600 +EXPERIENCE_BUFFER_PORT=29601 +NUM_INFERENCE_ENGINES=1 +MAX_ITERATIONS=2 + +# Global iteration tracker +CURRENT_ITERATION = 0 + logging.basicConfig( # Example of format string # 2022-06-29 11:22:26,152: rank0[822018][MainThread]: INFO: composer.trainer.trainer: Using precision Precision.FP32 @@ -12,17 +20,11 @@ format= f'[ROLLOUT]%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', ) - log = logging.getLogger(__name__) log.setLevel(logging.DEBUG) - -MODEL_UPDATE_PORT=29600 -EXPERIENCE_BUFFER_PORT=29601 -NUM_INFERENCE_ENGINES=1 -MAX_ITERATIONS=10 - if __name__ == "__main__": + rank = 1 # TODO: UPDATE TO SUPPORT MULTIPLE INFERENCE ENGINES log.info("Initializing model update process group") # 1 model_update_group = init_process_group( @@ -42,15 +44,38 @@ # TODO: check to see if there's an update to the model weights, if there is update the weights # to make it sync, we will wait until there is a weight update - t = torch.tensor([0]).to('cuda') - torch.distributed.broadcast(group=model_update_group, src=0,tensor=t) - log.info(f"Rank {dist.get_global_rank()} all gathered {t}") + is_ready_to_update = torch.tensor([0]).to('cuda') + is_ready_to_update_work = None + + for i in range(MAX_ITERATIONS): + log.info(f"[ITERATION {i + 1}/{MAX_ITERATIONS}] Starting iteration") + + if is_ready_to_update_work is None: + # if we haven't checked if there's an update to the model weights, run the check in the background + is_ready_to_update_work = torch.distributed.broadcast(group=model_update_group, src=0,tensor=is_ready_to_update, async_op=True) + if i == 0: + is_ready_to_update_work.wait() # We need to update the weights for the first iteration before we start generating rollouts + + if is_ready_to_update.item() == 1: + assert is_ready_to_update_work.is_completed() + log.info(f"Weights are ready to update") + + log.info("Updating the model weights") + weights = torch.tensor([i]).to('cuda') + torch.distributed.broadcast(group=model_update_group, src=0,tensor=weights) + log.info(f"Updating the weights to {weights}") + # rest the update check + is_ready_to_update = torch.tensor([0]).to('cuda') + is_ready_to_update_work = None + + + # TODO: start generating rollouts and put it in the experience buffer - # TODO: start generating rollouts and put it in the experience buffer + experience_buffer = torch.tensor([6]).to('cuda') + torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=experience_buffer, async_op=True) # don't block, send it off and continue generating rollouts + log.info(f"Rank {dist.get_global_rank()} Sent experience buffer {experience_buffer}") - t = torch.tensor([6]).to('cuda') - torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=t, async_op=True) # don't block, send it off and continue generating rollouts - log.info(f"Rank {dist.get_global_rank()} Broadcasted experience{t}") + log.info(f"Completed iteration {i + 1}/{MAX_ITERATIONS}") diff --git a/train.py b/train.py index 86a01f1b..95122f5c 100644 --- a/train.py +++ b/train.py @@ -1,4 +1,3 @@ -from datetime import timedelta from composer.utils import dist import torch @@ -6,6 +5,12 @@ import logging +MODEL_UPDATE_PORT=29600 +EXPERIENCE_BUFFER_PORT=29601 +NUM_INFERENCE_ENGINES=1 +MAX_ITERATIONS=2 + + logging.basicConfig( # Example of format string # 2022-06-29 11:22:26,152: rank0[822018][MainThread]: INFO: composer.trainer.trainer: Using precision Precision.FP32 @@ -14,20 +19,15 @@ format= f'[TRAIN]%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', ) - log = logging.getLogger(__name__) log.setLevel(logging.DEBUG) -MODEL_UPDATE_PORT=29600 -EXPERIENCE_BUFFER_PORT=29601 -NUM_INFERENCE_ENGINES=1 -MAX_ITERATIONS=10 - if __name__ == "__main__": - # note: the smaller timeout seems to hold, doesn't matter which process gorup you set the timeout to torch.distributed.init_process_group(backend="nccl") log.info(f"Hello from rank {dist.get_global_rank()}") + + model_update_group = None experience_buffer_group = None if dist.get_global_rank() == 0: @@ -48,21 +48,34 @@ ) # TODO: broadcast the model weights to the inference engines - if model_update_group is not None: - t = torch.tensor([5]).to('cuda') - torch.distributed.broadcast(group=model_update_group, src=0,tensor=t, async_op=True) # broadcast all the model weights - log.info(f"Rank {dist.get_global_rank()} Broadcasted model weights{t}") + for i in range(MAX_ITERATIONS): + # Update global iteration tracker + log.info(f"Starting iteration {i + 1}/{MAX_ITERATIONS}") + + if model_update_group is not None: + is_ready_to_update = torch.tensor([1]).to('cuda') + is_ready_to_update_work = torch.distributed.broadcast(group=model_update_group, src=0,tensor=is_ready_to_update, async_op=True) + log.info(f"Rank {dist.get_global_rank()} Broadcasted is_ready_to_update{is_ready_to_update}") + + is_ready_to_update_work.wait() # wait until the broadcast is complete (the rollout process has received the message) before we update the model weights + + # Actually broadcast the model weights + weights = torch.tensor([5]).to('cuda') + torch.distributed.broadcast(group=model_update_group, src=0,tensor=weights, async_op=True) # broadcast all the model weights + log.info(f"Rank {dist.get_global_rank()} Broadcasted model weights{weights}") # TODO: update the model weights + + # TODO: get the experience buffer results from the rollout process + experience_buffer = torch.tensor([0]).to('cuda') + if experience_buffer_group is not None: + torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=experience_buffer) # block until the broadcast is complete, need to get the new experiences + log.info(f"Rank {dist.get_global_rank()} Got experience buffer {experience_buffer}") - # TODO: get the experience buffer results from the rollout process - if experience_buffer_group is not None: - t = torch.tensor([0]).to('cuda') - torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=t) # block until the broadcast is complete - log.info(f"Rank {dist.get_global_rank()} Broadcasted experience{t}") + # all training ranks should wait until we have the experience buffer results + dist.barrier() - # all training ranks should wait until we have the experience buffer results - dist.barrier() + # distributed the experiences results to each of the training ranks - # distributed the experiences results to each of the training ranks + # TODO: train the model - # TODO: train the model + log.info(f"Completed iteration {i + 1}/{MAX_ITERATIONS}") From 52d02b5e0654e748b1eb5ee7d8e45996163fa5d2 Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Fri, 8 Aug 2025 03:42:29 +0000 Subject: [PATCH 06/14] correct training to be blocking --- rollout.py | 2 +- train.py | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/rollout.py b/rollout.py index f0f784a6..d61974b5 100644 --- a/rollout.py +++ b/rollout.py @@ -60,7 +60,7 @@ assert is_ready_to_update_work.is_completed() log.info(f"Weights are ready to update") - log.info("Updating the model weights") + log.info("Updating the model weights") # this is a blocking operation, we need to wait until the weights are updated before we can start generating rollouts weights = torch.tensor([i]).to('cuda') torch.distributed.broadcast(group=model_update_group, src=0,tensor=weights) log.info(f"Updating the weights to {weights}") diff --git a/train.py b/train.py index 95122f5c..d2062ece 100644 --- a/train.py +++ b/train.py @@ -54,17 +54,15 @@ if model_update_group is not None: is_ready_to_update = torch.tensor([1]).to('cuda') - is_ready_to_update_work = torch.distributed.broadcast(group=model_update_group, src=0,tensor=is_ready_to_update, async_op=True) + torch.distributed.broadcast(group=model_update_group, src=0,tensor=is_ready_to_update) # BLOCKING, let the other process know that we're ready to update the model weights log.info(f"Rank {dist.get_global_rank()} Broadcasted is_ready_to_update{is_ready_to_update}") - is_ready_to_update_work.wait() # wait until the broadcast is complete (the rollout process has received the message) before we update the model weights - # Actually broadcast the model weights weights = torch.tensor([5]).to('cuda') - torch.distributed.broadcast(group=model_update_group, src=0,tensor=weights, async_op=True) # broadcast all the model weights + torch.distributed.broadcast(group=model_update_group, src=0,tensor=weights) # broadcast all the model weights, BLOCKING log.info(f"Rank {dist.get_global_rank()} Broadcasted model weights{weights}") # TODO: update the model weights - # TODO: get the experience buffer results from the rollout process + # Get the experience buffer results from the rollout process experience_buffer = torch.tensor([0]).to('cuda') if experience_buffer_group is not None: torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=experience_buffer) # block until the broadcast is complete, need to get the new experiences @@ -75,7 +73,7 @@ # distributed the experiences results to each of the training ranks - # TODO: train the model + # TODO: train the model TRAINING CODE HERE log.info(f"Completed iteration {i + 1}/{MAX_ITERATIONS}") From 2240f6d66d0fb47f72f730264dc6bdc0ac3816ea Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Fri, 8 Aug 2025 04:35:52 +0000 Subject: [PATCH 07/14] wait for the last experience buffer to be received --- rollout.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/rollout.py b/rollout.py index d61974b5..4b487531 100644 --- a/rollout.py +++ b/rollout.py @@ -9,9 +9,6 @@ NUM_INFERENCE_ENGINES=1 MAX_ITERATIONS=2 -# Global iteration tracker -CURRENT_ITERATION = 0 - logging.basicConfig( # Example of format string # 2022-06-29 11:22:26,152: rank0[822018][MainThread]: INFO: composer.trainer.trainer: Using precision Precision.FP32 @@ -72,10 +69,16 @@ # TODO: start generating rollouts and put it in the experience buffer experience_buffer = torch.tensor([6]).to('cuda') - torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=experience_buffer, async_op=True) # don't block, send it off and continue generating rollouts - log.info(f"Rank {dist.get_global_rank()} Sent experience buffer {experience_buffer}") + experience_buffer_work = torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=experience_buffer, async_op=True) # don't block, send it off and continue generating rollouts + log.info(f"Sent experience buffer {experience_buffer}") log.info(f"Completed iteration {i + 1}/{MAX_ITERATIONS}") + if i == MAX_ITERATIONS - 1: + assert experience_buffer_work is not None + log.info(f"Waiting for the last experience buffer to be received") + experience_buffer_work.wait() + + From d443a1b817d6df107dfe3001a49af5fd66411732 Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Fri, 8 Aug 2025 05:11:30 +0000 Subject: [PATCH 08/14] simple prototype is done and working on single node --- rollout.py | 6 +-- test_no_ray.py | 109 ++----------------------------------------------- train.py | 13 +++--- 3 files changed, 11 insertions(+), 117 deletions(-) diff --git a/rollout.py b/rollout.py index 4b487531..39bb9add 100644 --- a/rollout.py +++ b/rollout.py @@ -32,7 +32,7 @@ group_name="model_update_group", ) experience_buffer_group = init_process_group( - backend="nccl", + backend="gloo", init_method=f"tcp://localhost:{EXPERIENCE_BUFFER_PORT}", world_size=1 + NUM_INFERENCE_ENGINES, rank=rank, @@ -58,7 +58,7 @@ log.info(f"Weights are ready to update") log.info("Updating the model weights") # this is a blocking operation, we need to wait until the weights are updated before we can start generating rollouts - weights = torch.tensor([i]).to('cuda') + weights = torch.tensor([0]).to('cuda') torch.distributed.broadcast(group=model_update_group, src=0,tensor=weights) log.info(f"Updating the weights to {weights}") # rest the update check @@ -68,7 +68,7 @@ # TODO: start generating rollouts and put it in the experience buffer - experience_buffer = torch.tensor([6]).to('cuda') + experience_buffer = torch.tensor([20+i]) experience_buffer_work = torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=experience_buffer, async_op=True) # don't block, send it off and continue generating rollouts log.info(f"Sent experience buffer {experience_buffer}") diff --git a/test_no_ray.py b/test_no_ray.py index 71abe524..2f2b0d37 100644 --- a/test_no_ray.py +++ b/test_no_ray.py @@ -1,114 +1,10 @@ -import logging -import os import subprocess -import tempfile import traceback -from typing import Any, Dict, Optional -from composer.cli.launcher import _launch_processes, _monitor_processes, _cleanup_processes, _aggregate_process_returncode - -def run_distributed_training( - nproc: int, - world_size: int, - base_rank: int, - node_rank: int, - master_addr: str, - master_port: int, - training_script: str, - training_script_args: Any=None, - module_mode: bool=False, - command_mode: bool=False, - stdout: Optional[str]=None, - stderr: Optional[str]=None, - verbose: bool = False, -) -> int: - """ - Run distributed training with the given parameters. - - Args: - nproc (int): Number of processes to launch. - world_size (int): Total number of processes across all nodes. - base_rank (int): Base rank of the current node. - node_rank (int): Rank of the current node. - master_addr (str): Address of the master node. - master_port (int): Port of the master node. - module_mode (bool): Whether to use module mode. - command_mode (bool): Whether to use command mode. - stdout (Optional[str]): Stdout file format. - stderr (Optional[str]): Stderr file format. - training_script (str): Training script to run. - training_script_args (Any): Arguments for the training script. - verbose (bool): Whether to use verbose logging. - - Returns: - int: Aggregated return code from all processes. - """ - if training_script_args is None: - training_script_args = [] - - MOSAICML_PLATFORM_ENV_VAR = "MOSAICML_PLATFORM" - MOSAICML_LOG_DIR_ENV_VAR = "MOSAICML_LOG_DIR" - MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR = "MOSAICML_GPU_LOG_FILE_PREFIX" - - logger = logging.getLogger("distributed_training") - logging.basicConfig() - logger.setLevel(logging.INFO if verbose else logging.WARNING) - - processes: Dict[Any, Any] = {} - - log_tmpdir = tempfile.TemporaryDirectory() - if stdout is None: - stdout = f'{log_tmpdir.name}/rank{{rank}}.stdout.txt' - if stderr is None: - stderr = f'{log_tmpdir.name}/rank{{rank}}.stderr.txt' - - # If running on the Mosaic platform, log all gpu ranks' stderr and stdout to Mosaic platform - if ( - os.environ.get(MOSAICML_PLATFORM_ENV_VAR, 'false').lower() == 'true' - and str(os.environ.get(MOSAICML_LOG_DIR_ENV_VAR, 'false')).lower() != 'false' - and os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR, 'false').lower() != 'false' - ): - logger.info('Logging all GPU ranks to Mosaic AI Training.') - log_file_format = ( - f"{os.environ.get(MOSAICML_LOG_DIR_ENV_VAR)}/" - f"{os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR)}{{local_rank}}.txt" - ) - stdout = log_file_format - stderr = None - - try: - _launch_processes( - nproc=nproc, - world_size=world_size, - base_rank=base_rank, - node_rank=node_rank, - master_addr=master_addr, - master_port=master_port, - module_mode=module_mode, - command_mode=command_mode, - stdout_file_format=stdout, - stderr_file_format=stderr, - training_script=training_script, - training_script_args=training_script_args, - processes=processes, - ) - _monitor_processes(processes) - except Exception: - # Print the exception first, then kill the training processes, since killing - # may take up to CLEANUP_TIMEOUT seconds, and the user should know immediately - # what failed. No need to re-raise the exception, as `aggregate_process_returncode` - # will return an appropriate error code, which will cause the script to exit. - logger.error("Exception occurred during distributed training", exc_info=True) - traceback.print_exc() - print('Killing training processes') - finally: - _cleanup_processes(processes) - log_tmpdir.cleanup() - return _aggregate_process_returncode(processes) if __name__ == "__main__": # test on 4 gpus! - + # for multinode, we should determine which command to launch on which node try: p1 = subprocess.Popen('CUDA_VISIBLE_DEVICES=0,1 composer -n 2 train.py', shell=True) p2 = subprocess.Popen('CUDA_VISIBLE_DEVICES=2,3 python rollout.py', shell=True) @@ -119,5 +15,6 @@ def run_distributed_training( print(traceback.format_exc()) print('Killing training processes') finally: - _cleanup_processes({0: p1, 1: p2}) + p1.terminate() + p2.terminate() diff --git a/train.py b/train.py index d2062ece..fd309955 100644 --- a/train.py +++ b/train.py @@ -26,8 +26,6 @@ torch.distributed.init_process_group(backend="nccl") log.info(f"Hello from rank {dist.get_global_rank()}") - - model_update_group = None experience_buffer_group = None if dist.get_global_rank() == 0: @@ -40,14 +38,13 @@ group_name="model_update_group", ) experience_buffer_group = init_process_group( - backend="nccl", + backend="gloo", init_method=f"tcp://localhost:{EXPERIENCE_BUFFER_PORT}", world_size=1 + NUM_INFERENCE_ENGINES, rank=0, group_name="experience_buffer_group", ) - # TODO: broadcast the model weights to the inference engines for i in range(MAX_ITERATIONS): # Update global iteration tracker log.info(f"Starting iteration {i + 1}/{MAX_ITERATIONS}") @@ -55,15 +52,15 @@ if model_update_group is not None: is_ready_to_update = torch.tensor([1]).to('cuda') torch.distributed.broadcast(group=model_update_group, src=0,tensor=is_ready_to_update) # BLOCKING, let the other process know that we're ready to update the model weights - log.info(f"Rank {dist.get_global_rank()} Broadcasted is_ready_to_update{is_ready_to_update}") + log.info(f"Rank {dist.get_global_rank()} Broadcasted is_ready_to_update {is_ready_to_update}") # Actually broadcast the model weights - weights = torch.tensor([5]).to('cuda') + weights = torch.tensor([10+i]).to('cuda') torch.distributed.broadcast(group=model_update_group, src=0,tensor=weights) # broadcast all the model weights, BLOCKING - log.info(f"Rank {dist.get_global_rank()} Broadcasted model weights{weights}") # TODO: update the model weights + log.info(f"Rank {dist.get_global_rank()} Broadcasted model weights {weights}") # TODO: update the model weights # Get the experience buffer results from the rollout process - experience_buffer = torch.tensor([0]).to('cuda') + experience_buffer = torch.tensor([0]) if experience_buffer_group is not None: torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=experience_buffer) # block until the broadcast is complete, need to get the new experiences log.info(f"Rank {dist.get_global_rank()} Got experience buffer {experience_buffer}") From 8d77d8ff25c8c5f2fdd632d68c9f1d203bc2a442 Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Fri, 8 Aug 2025 05:33:13 +0000 Subject: [PATCH 09/14] clean up comments --- rollout.py | 22 ++++++++++++---------- train.py | 22 +++++++++++++--------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/rollout.py b/rollout.py index 39bb9add..4f25297b 100644 --- a/rollout.py +++ b/rollout.py @@ -21,9 +21,8 @@ log.setLevel(logging.DEBUG) if __name__ == "__main__": - + # Initialize the process groups for communication between train and rollout agents rank = 1 # TODO: UPDATE TO SUPPORT MULTIPLE INFERENCE ENGINES - log.info("Initializing model update process group") # 1 model_update_group = init_process_group( backend="nccl", init_method=f"tcp://localhost:{MODEL_UPDATE_PORT}", @@ -39,35 +38,38 @@ group_name="experience_buffer_group", ) - # TODO: check to see if there's an update to the model weights, if there is update the weights - # to make it sync, we will wait until there is a weight update is_ready_to_update = torch.tensor([0]).to('cuda') is_ready_to_update_work = None for i in range(MAX_ITERATIONS): - log.info(f"[ITERATION {i + 1}/{MAX_ITERATIONS}] Starting iteration") + log.info(f"Starting iteration {i + 1}/{MAX_ITERATIONS}") if is_ready_to_update_work is None: - # if we haven't checked if there's an update to the model weights, run the check in the background + # Check to see if there's an update to the model weights available. is_ready_to_update_work = torch.distributed.broadcast(group=model_update_group, src=0,tensor=is_ready_to_update, async_op=True) + + # We always need to update on the first iteration. if i == 0: - is_ready_to_update_work.wait() # We need to update the weights for the first iteration before we start generating rollouts + is_ready_to_update_work.wait() if is_ready_to_update.item() == 1: assert is_ready_to_update_work.is_completed() log.info(f"Weights are ready to update") - log.info("Updating the model weights") # this is a blocking operation, we need to wait until the weights are updated before we can start generating rollouts + # Update the model weights + log.info("Updating the model weights") weights = torch.tensor([0]).to('cuda') torch.distributed.broadcast(group=model_update_group, src=0,tensor=weights) log.info(f"Updating the weights to {weights}") - # rest the update check + + # Reset the update check is_ready_to_update = torch.tensor([0]).to('cuda') is_ready_to_update_work = None - # TODO: start generating rollouts and put it in the experience buffer + # TODO: start generating rollouts for the experience buffer + # Send the experience buffer to the train agent. experience_buffer = torch.tensor([20+i]) experience_buffer_work = torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=experience_buffer, async_op=True) # don't block, send it off and continue generating rollouts log.info(f"Sent experience buffer {experience_buffer}") diff --git a/train.py b/train.py index fd309955..61811044 100644 --- a/train.py +++ b/train.py @@ -24,8 +24,8 @@ if __name__ == "__main__": torch.distributed.init_process_group(backend="nccl") - log.info(f"Hello from rank {dist.get_global_rank()}") + # Initialize the process groups for communication between train and rollout agents model_update_group = None experience_buffer_group = None if dist.get_global_rank() == 0: @@ -46,31 +46,35 @@ ) for i in range(MAX_ITERATIONS): - # Update global iteration tracker log.info(f"Starting iteration {i + 1}/{MAX_ITERATIONS}") if model_update_group is not None: + # Let the rollout agent know that we're ready to update the model weights is_ready_to_update = torch.tensor([1]).to('cuda') - torch.distributed.broadcast(group=model_update_group, src=0,tensor=is_ready_to_update) # BLOCKING, let the other process know that we're ready to update the model weights + torch.distributed.broadcast(group=model_update_group, src=0,tensor=is_ready_to_update) log.info(f"Rank {dist.get_global_rank()} Broadcasted is_ready_to_update {is_ready_to_update}") - # Actually broadcast the model weights + # Broadcast the model weights weights = torch.tensor([10+i]).to('cuda') - torch.distributed.broadcast(group=model_update_group, src=0,tensor=weights) # broadcast all the model weights, BLOCKING - log.info(f"Rank {dist.get_global_rank()} Broadcasted model weights {weights}") # TODO: update the model weights + torch.distributed.broadcast(group=model_update_group, src=0,tensor=weights) + log.info(f"Rank {dist.get_global_rank()} Broadcasted model weights {weights}") # Get the experience buffer results from the rollout process experience_buffer = torch.tensor([0]) if experience_buffer_group is not None: - torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=experience_buffer) # block until the broadcast is complete, need to get the new experiences + torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=experience_buffer) log.info(f"Rank {dist.get_global_rank()} Got experience buffer {experience_buffer}") # all training ranks should wait until we have the experience buffer results dist.barrier() - # distributed the experiences results to each of the training ranks + # TODO: distributed the experiences results to each of the training ranks + # TODO: train the model + + # simulate "long training!"" + import time + time.sleep(20) - # TODO: train the model TRAINING CODE HERE log.info(f"Completed iteration {i + 1}/{MAX_ITERATIONS}") From 8cdd5f877e6cd7c8025fce82a0eb0e4525f7ced0 Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Fri, 8 Aug 2025 05:34:34 +0000 Subject: [PATCH 10/14] update comments --- rollout.py | 7 ++----- train.py | 6 +----- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/rollout.py b/rollout.py index 4f25297b..70003551 100644 --- a/rollout.py +++ b/rollout.py @@ -10,10 +10,6 @@ MAX_ITERATIONS=2 logging.basicConfig( - # Example of format string - # 2022-06-29 11:22:26,152: rank0[822018][MainThread]: INFO: composer.trainer.trainer: Using precision Precision.FP32 - # Including the PID and thread name to help with debugging dataloader workers and callbacks that spawn background - # threads / processes format= f'[ROLLOUT]%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', ) @@ -70,8 +66,9 @@ # TODO: start generating rollouts for the experience buffer # Send the experience buffer to the train agent. + # We do not block here. We can continue generating rollouts while the experience buffer is being sent. experience_buffer = torch.tensor([20+i]) - experience_buffer_work = torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=experience_buffer, async_op=True) # don't block, send it off and continue generating rollouts + experience_buffer_work = torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=experience_buffer, async_op=True) log.info(f"Sent experience buffer {experience_buffer}") log.info(f"Completed iteration {i + 1}/{MAX_ITERATIONS}") diff --git a/train.py b/train.py index 61811044..3aa44866 100644 --- a/train.py +++ b/train.py @@ -12,10 +12,6 @@ logging.basicConfig( - # Example of format string - # 2022-06-29 11:22:26,152: rank0[822018][MainThread]: INFO: composer.trainer.trainer: Using precision Precision.FP32 - # Including the PID and thread name to help with debugging dataloader workers and callbacks that spawn background - # threads / processes format= f'[TRAIN]%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', ) @@ -71,7 +67,7 @@ # TODO: distributed the experiences results to each of the training ranks # TODO: train the model - # simulate "long training!"" + # simulate "long" training! import time time.sleep(20) From b407f00bf065d2bb5c1857f65848c55cdaaac5fe Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Fri, 8 Aug 2025 05:46:50 +0000 Subject: [PATCH 11/14] Add MAX_ASYNC_STEPS --- rollout.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/rollout.py b/rollout.py index 70003551..becc7eaa 100644 --- a/rollout.py +++ b/rollout.py @@ -9,6 +9,9 @@ NUM_INFERENCE_ENGINES=1 MAX_ITERATIONS=2 +# How off-policy we can be. Set it to 0 to be fully synchronous and on-policy. +MAX_ASYNC_STEP=0 + logging.basicConfig( format= f'[ROLLOUT]%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', @@ -36,6 +39,8 @@ is_ready_to_update = torch.tensor([0]).to('cuda') is_ready_to_update_work = None + last_update_iteration = 0 + for i in range(MAX_ITERATIONS): log.info(f"Starting iteration {i + 1}/{MAX_ITERATIONS}") @@ -45,7 +50,7 @@ is_ready_to_update_work = torch.distributed.broadcast(group=model_update_group, src=0,tensor=is_ready_to_update, async_op=True) # We always need to update on the first iteration. - if i == 0: + if i == 0 or i - last_update_iteration > MAX_ASYNC_STEP: is_ready_to_update_work.wait() if is_ready_to_update.item() == 1: @@ -61,7 +66,7 @@ # Reset the update check is_ready_to_update = torch.tensor([0]).to('cuda') is_ready_to_update_work = None - + last_update_iteration = i # TODO: start generating rollouts for the experience buffer From 2b17f081db81f6fc313539d84672c0223780eb76 Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Fri, 8 Aug 2025 05:53:25 +0000 Subject: [PATCH 12/14] add comments --- test_no_ray.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test_no_ray.py b/test_no_ray.py index 2f2b0d37..08f7e1c0 100644 --- a/test_no_ray.py +++ b/test_no_ray.py @@ -6,7 +6,10 @@ # test on 4 gpus! # for multinode, we should determine which command to launch on which node try: + # Launch the train agent with multiple processes for distributed training p1 = subprocess.Popen('CUDA_VISIBLE_DEVICES=0,1 composer -n 2 train.py', shell=True) + + # Launch the rollout agent with a single process for vllm p2 = subprocess.Popen('CUDA_VISIBLE_DEVICES=2,3 python rollout.py', shell=True) p1.wait() p2.wait() From 2fa1b53229fea56594652d2c3a2dfda04148d71f Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Fri, 8 Aug 2025 06:22:36 +0000 Subject: [PATCH 13/14] update --- rollout.py | 7 ++++--- test_no_ray.py | 2 +- train.py | 11 ++++++----- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/rollout.py b/rollout.py index becc7eaa..471fb6ab 100644 --- a/rollout.py +++ b/rollout.py @@ -9,12 +9,12 @@ NUM_INFERENCE_ENGINES=1 MAX_ITERATIONS=2 -# How off-policy we can be. Set it to 0 to be fully synchronous and on-policy. -MAX_ASYNC_STEP=0 +# Set it to 0 to be fully synchronous and on-policy. +MAX_ASYNC_STEP=1 logging.basicConfig( format= - f'[ROLLOUT]%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', + f'[ROLLOUT] %(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', ) log = logging.getLogger(__name__) log.setLevel(logging.DEBUG) @@ -69,6 +69,7 @@ last_update_iteration = i # TODO: start generating rollouts for the experience buffer + log.info("Generating rollouts!") # Send the experience buffer to the train agent. # We do not block here. We can continue generating rollouts while the experience buffer is being sent. diff --git a/test_no_ray.py b/test_no_ray.py index 08f7e1c0..09236e97 100644 --- a/test_no_ray.py +++ b/test_no_ray.py @@ -8,7 +8,7 @@ try: # Launch the train agent with multiple processes for distributed training p1 = subprocess.Popen('CUDA_VISIBLE_DEVICES=0,1 composer -n 2 train.py', shell=True) - + # Launch the rollout agent with a single process for vllm p2 = subprocess.Popen('CUDA_VISIBLE_DEVICES=2,3 python rollout.py', shell=True) p1.wait() diff --git a/train.py b/train.py index 3aa44866..7c179ffd 100644 --- a/train.py +++ b/train.py @@ -13,7 +13,7 @@ logging.basicConfig( format= - f'[TRAIN]%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', + f'[TRAIN] %(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', ) log = logging.getLogger(__name__) log.setLevel(logging.DEBUG) @@ -48,18 +48,18 @@ # Let the rollout agent know that we're ready to update the model weights is_ready_to_update = torch.tensor([1]).to('cuda') torch.distributed.broadcast(group=model_update_group, src=0,tensor=is_ready_to_update) - log.info(f"Rank {dist.get_global_rank()} Broadcasted is_ready_to_update {is_ready_to_update}") + log.info(f"Broadcasted is_ready_to_update {is_ready_to_update}") # Broadcast the model weights weights = torch.tensor([10+i]).to('cuda') torch.distributed.broadcast(group=model_update_group, src=0,tensor=weights) - log.info(f"Rank {dist.get_global_rank()} Broadcasted model weights {weights}") + log.info(f"Broadcasted model weights {weights}") # Get the experience buffer results from the rollout process experience_buffer = torch.tensor([0]) if experience_buffer_group is not None: torch.distributed.broadcast(group=experience_buffer_group, src=1,tensor=experience_buffer) - log.info(f"Rank {dist.get_global_rank()} Got experience buffer {experience_buffer}") + log.info(f"Got experience buffer {experience_buffer}") # all training ranks should wait until we have the experience buffer results dist.barrier() @@ -68,8 +68,9 @@ # TODO: train the model # simulate "long" training! + log.info("Training!") import time - time.sleep(20) + time.sleep(5) log.info(f"Completed iteration {i + 1}/{MAX_ITERATIONS}") From bfc387729624feeb35d369911a88de3194916209 Mon Sep 17 00:00:00 2001 From: Irene Dea Date: Fri, 8 Aug 2025 18:57:42 +0000 Subject: [PATCH 14/14] add comment --- train.py | 1 + 1 file changed, 1 insertion(+) diff --git a/train.py b/train.py index 7c179ffd..835ce2e1 100644 --- a/train.py +++ b/train.py @@ -44,6 +44,7 @@ for i in range(MAX_ITERATIONS): log.info(f"Starting iteration {i + 1}/{MAX_ITERATIONS}") + # TODO: We shouldn't broadcast if the rollouts are done! if model_update_group is not None: # Let the rollout agent know that we're ready to update the model weights is_ready_to_update = torch.tensor([1]).to('cuda')