Conversation
Force-pushed from c910a2a to ee2d73a.
```python
# initialization routine
def init(model_parallel_sizes=[1, 1, 1, 1], model_parallel_names=["h", "w", "fin", "fout"], data_parallel_sizes=[1, -1], data_parallel_names=["ensemble", "batch"], verbose=False):
```
Will we ever configure model_parallel_sizes and data_parallel_sizes to something else?
Yes, here we will configure `comm` with values other than 1. In particular, "h" and "w" parallelism (the main focus of this code) is done using `model_parallel_sizes`.
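For example, a hypothetical call enabling 2-way decomposition in both "h" and "w" (illustrative values, not a call from the PR):

```python
# 2-way "h" and 2-way "w" model parallelism; the "batch" data-parallel
# dimension takes the rest (-1 presumably infers the remaining ranks)
init(
    model_parallel_sizes=[2, 2, 1, 1],
    model_parallel_names=["h", "w", "fin", "fout"],
    data_parallel_sizes=[1, -1],
    data_parallel_names=["ensemble", "batch"],
)
```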
Force-pushed from 7bbb0a9 to a58046c.
One overarching comment about naming: I think it is worthwhile considering naming this something other than "Spatial" because while we are only doing the H and W decomposition here, we may do other types of decomposition in the future. I think a good alternative is simply "Makani" and I would prefix "makani_" in front of all files copied from the nvidia/makani repo, e.g., makani_comm.py for https://github.com/NVIDIA/makani/blob/main/makani/utils/comm.py. And I would also specify atop the file the specific link and commit it came from, e.g., https://github.com/NVIDIA/makani/blob/dbcf2c1dc82cdbc544c81193eecd8ac4a6be337c/makani/utils/comm.py.
See how the team did this for sht_fix.py: https://github.com/ai2cm/ace/blob/main/fme/sht_fix.py
Yes, we can adapt this naming convention. @elynnwu, @mcgibbon, @oliverwm1, do you have any recommendations on how to name this class and the Makani files? I like @mahf708’s idea of prefixing the filenames with “makani” and using “MakaniTorchDistributed” as the class name.
```python
torch.backends.cudnn.benchmark = True
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
```
Potential Issue: these torch backend flags don't appear to be related to spatial parallelism.
```python
def is_available(cls) -> bool:
    """Check if torch distributed is available."""
    h_parallel_size = int(os.environ.get("H_PARALLEL_SIZE", 1))
    w_parallel_size = int(os.environ.get("W_PARALLEL_SIZE", 1))
    spatial_parallelism = False
    if (h_parallel_size > 1) or (w_parallel_size > 1):
        spatial_parallelism = True
    return torch.distributed.is_available() and spatial_parallelism
```
Suggestion: split off the `self._spatial_parallelism_enabled()` logic into a separate helper function to make this function a bit clearer. Specifically, `torch.distributed.is_available()` is a very high-level operation, while the spatial parallelism check here is very low-level. Splitting it into a helper keeps everything in `is_available` at the same level of abstraction.
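A minimal sketch of the suggested split (the class skeleton and helper name are illustrative, not the PR's final code):

```python
import os

import torch


class ModelTorchDistributed:
    @classmethod
    def _spatial_parallelism_enabled(cls) -> bool:
        """True if H or W parallelism is requested via environment variables."""
        h_parallel_size = int(os.environ.get("H_PARALLEL_SIZE", 1))
        w_parallel_size = int(os.environ.get("W_PARALLEL_SIZE", 1))
        return h_parallel_size > 1 or w_parallel_size > 1

    @classmethod
    def is_available(cls) -> bool:
        """Check if torch distributed is available and spatial parallelism is requested."""
        return torch.distributed.is_available() and cls._spatial_parallelism_enabled()
```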
```python
def reduce_mean(self, tensor: torch.Tensor) -> torch.Tensor | None:
    torch.distributed.all_reduce(tensor)
    return tensor / self.total_ranks
```
Is this correct? I would expect you need to handle spatial parallelism in these functions. Specifically, I expect you need to pass a group argument to all_reduce which tells it which spot on the globe it's in, so each process only reduces the mean in its patch. As written, I expect every rank will get the mean across all "tiles" and the spatial areas will be mixed into one.
This is done, for example, in physicsnemo, where `group=DistributedManager().group(process_group)` (not sure what `process_group` is) sets a group arg for the reduction function override for `param.grad` (link).
Unfortunately I can't really find any other uses of distributed reduction in the physicsnemo code, not sure if they have inline spatial-map metrics the way we have.
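A sketch of what the group-aware reduction being described could look like, assuming the makani-style `comm` module from this PR exposes `get_group` (illustrative, not the PR's final code):

```python
def reduce_mean(self, tensor: torch.Tensor) -> torch.Tensor:
    # reduce only across the data-parallel group, so ranks holding
    # different spatial patches do not mix their values together
    group = comm.get_group("data")  # assumes a makani-style comm module
    torch.distributed.all_reduce(tensor, group=group)
    return tensor / torch.distributed.get_world_size(group=group)
```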
I did not modify this method in the original branch. I added a couple of unit tests and the implementation works at least for the training part. I believe the only reductions involved are those that compute the loss and the metrics. This implementation should be fine in cases where spatial dimensions are not involved. If this approach is acceptable, I would like to address the remaining issue in a follow-up PR.
I would discourage using "makani" as the name, since the code is primarily from physicsnemo (not makani). Maybe we can use "Model", which is the terminology physicsnemo uses, instead of "Spatial".

The file comm.py was a direct copy from Makani. It uses routines from PhysicsNemo. @mahf708, what do you think about using 'Model' instead of 'Makani' for the class name (ModelTorchDistributed)?

Yes, any name is good for me as long as it's clear. I defer to Jeremy and Elynn, so anything they decide is good for me.
Force-pushed from d24b1c2 to 2309d5e.
```python
    return lat, lon, nlat, nlon, batch_size, input_tensor


@pytest.mark.skipif(torch.cuda.device_count() < 4, reason="requires multi-GPU machine")
```
Can you relax this criterion to 2 GPUs? We can change the test to use a 1x2 decomposition instead. I believe the GPU tests will run once you do that.
Thanks for all your help reviewing this PR. I believe I’ve addressed most of your comments, except for the remark about […]. A couple of other points: […]

Finally, I added the new unit tests under […].
```python
def local_batch_size(self, batch_size: int) -> int:
    return batch_size // comm.get_size("data")


def reduce_mean(self, tensor: torch.Tensor) -> torch.Tensor | None:
```
This code is currently incorrect. Take for example a basic case where n_lat=1, n_lon=2, and W_PARALLEL_SIZE=2, running on 4 ranks. Let's say you have a batch_size=2 array with shape [n_batch, n_lat, n_lon], where the first batch member has data [[1, -1]], and the second batch member has data [[2, -2]].
If we were not using spatial parallelism, we would have two ranks with the above data, the first rank having [[1, -1]] and the second rank having [[2, -2]], and the mean across ranks would be [[1.5, -1.5]]. This is the result we want to get regardless of whether or not we're using spatial parallelism.
If you're using spatial parallelism with 4 ranks, the ranks each have data [[1]], [[-1]], [[2]], and [[-2]]. The current code will reduce this to [[0]] on all ranks, but what we actually want is that the ranks in the "west" group reduce to [[1.5]], and that the ranks in the "east" group reduce to [[-1.5]], so that if we stitched it back to the global data we'd get [[1.5, -1.5]]. The current result of [[0, 0]] is a bug.
To fix it, we have to pass the reduction operation a keyword argument telling it what "group" this process is in. There is a helper method in physicsnemo for this, that I referenced earlier.
I think it's very important to fix this before merging those methods, so that we aren't confused by incorrect answers on main. If we just aren't using the methods yet and want to implement them later, we should have the methods raise NotImplementedError() instead of giving the answers they currently do.
@mcgibbon, I reviewed the Makani implementation and found a couple of places where the “group” parameter was passed to the reduce operation. I ended up using “data” as the group in the torch.distributed.all_reduce routine. I also added a couple of unit tests for this method:
- Test 1 uses your simple example.
- Test 2 uses random values for the input tensor.
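The expected per-group means from the reviewer's example can be checked without a distributed launcher; a single-process sketch (the helper name and rank layout are assumptions for illustration):

```python
import torch


def group_mean(local_values: list[torch.Tensor], group_ranks: list[int]) -> torch.Tensor:
    """Mean over the ranks of one data-parallel group (hypothetical helper)."""
    return torch.stack([local_values[r] for r in group_ranks]).mean(dim=0)


# per-rank patches from the reviewer's example (assumed rank layout:
# ranks 0/1 hold batch member 1 west/east, ranks 2/3 hold member 2 west/east)
local_values = [
    torch.tensor([[1.0]]), torch.tensor([[-1.0]]),
    torch.tensor([[2.0]]), torch.tensor([[-2.0]]),
]
assert torch.equal(group_mean(local_values, [0, 2]), torch.tensor([[1.5]]))   # "west" group
assert torch.equal(group_mean(local_values, [1, 3]), torch.tensor([[-1.5]]))  # "east" group
```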
```python
def get_local_rank(self) -> int:
    return self._device_id


def get_local_slices(self, crop_shape):
```
Suggestion: the code here is pretty complex and this function will be widely used - add a unit test for get_local_slices. You may need to move the logic to some kind of helper function and test just the helper, since this object is difficult to construct in a test.
Question: What is the meaning of "crop" shape? I thought this takes the full domain shape as its input argument.
The unit test for `get_local_slices` is located here.
In this test, a tensor is created and then scattered across processes; its local slices are gathered into a new tensor. The test passes if the original tensor and the gathered tensor are equal.
I renamed `crop_shape` to `tensor_shape`.
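A single-process sketch of the round-trip property this test checks (the slice helper below is hypothetical, standing in for the per-rank slices that `get_local_slices` computes):

```python
import torch


def local_slices_1d(n: int, size: int, rank: int) -> slice:
    """Evenly split n grid points across `size` ranks (hypothetical helper)."""
    base, rem = divmod(n, size)
    start = rank * base + min(rank, rem)
    return slice(start, start + base + (1 if rank < rem else 0))


full = torch.arange(8, dtype=torch.float32).reshape(2, 4)
h_size, w_size = 1, 2  # a 1x2 spatial decomposition
reassembled = torch.empty_like(full)
for h_rank in range(h_size):
    for w_rank in range(w_size):
        sl = (local_slices_1d(full.shape[0], h_size, h_rank),
              local_slices_1d(full.shape[1], w_size, w_rank))
        reassembled[sl] = full[sl]  # stands in for scatter + gather
assert torch.equal(full, reassembled)
```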
```python
    torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.MAX)
    return tensor


def gather(self, tensor: torch.Tensor) -> list[torch.Tensor] | None:
```
Suggestion: raise `NotImplementedError()` instead, or add a unit test for this and the other spatially-involved distributed methods (like `gather_irregular`).
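The first option is straightforward; a sketch:

```python
def gather(self, tensor: torch.Tensor) -> list[torch.Tensor] | None:
    # fail loudly rather than return spatially-mixed results
    raise NotImplementedError(
        "gather is not yet supported with spatial parallelism"
    )
```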
```python
    return self.world_size


def get_local_rank(self) -> int:
    return self._device_id
```
Suggestion (optional): rename `self._device_id` to `self._local_rank`, since it is a little less confusing to see/read "my device id is my local rank" than reading "my local rank is my device id". Up to you, if you think the current way is clearer.
I did not see `_local_rank` defined in the base class. Should I add it?
…ch Distributed class, but in the __init__ method, the comm object from Makani is initialized. Most of the changes required for this class have not been added yet. In addition, the comm file is also included. Finally, nvidia-physicsnemo is added to the requirements.txt file.
Co-authored-by: Jeremy McGibbon <jeremym@allenai.org>
Co-authored-by: Jeremy McGibbon <jeremym@allenai.org>
… my large PR. Only run this test if the number of GPUs is greater than four. First, invoke the single version in loss test.
…arios where spatial parallelism is not involved.
…the RuntimeError “Boolean value of Tensor with more than one value is ambiguous.” Therefore, we need to compare tensors using torch.testing.assert_close.
…est. Since both tests now run simultaneously, saving data to the local directory is no longer necessary.
…ributed directory.
Force-pushed from 456a247 to 8c1f7d6.
```python
if ModelTorchDistributed.is_available() and not force_non_distributed:
    self._distributed: DistributedBackend = ModelTorchDistributed()
elif TorchDistributed.is_available() and not force_non_distributed:
    self._distributed = TorchDistributed()
```
do you wanna preserve the type like before, i.e.

```diff
-self._distributed = TorchDistributed()
+self._distributed: DistributedBackend = TorchDistributed()
```
I tried to add the type `DistributedBackend` to this line. However, when running pre-commit I get:

```
fme/core/distributed/distributed.py:34: error: Attribute "_distributed" already defined on line 32 [no-redef]
Found 1 error in 1 file (checked 1 source file)
```
I ran a single test, and it ran fine.
For this reason, I removed the type.
@mcgibbon, any suggestions?
ah interesting linting error. @odiazib feel free to disregard this comment and all my other comments; they were very minor
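For reference, one pattern that typically satisfies mypy's no-redef check is to declare the attribute's type once, before the branch (a fragment sketch of `__init__`, not the PR's final code):

```python
# declare the type once, then assign in each branch
self._distributed: DistributedBackend
if ModelTorchDistributed.is_available() and not force_non_distributed:
    self._distributed = ModelTorchDistributed()
elif TorchDistributed.is_available() and not force_non_distributed:
    self._distributed = TorchDistributed()
```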
```python
    torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.MIN)
    return tensor


def reduce_max(self, tensor: torch.Tensor) -> torch.Tensor | None:
```
Not important, but for all reductions you could potentially do a generic all-purpose reduce op that reroutes to the correct one, or an einops-style one, e.g. with the signature:
```python
def reduce(self, tensor: torch.Tensor, op: torch.distributed.ReduceOp) -> torch.Tensor:
    # reroute to the matching torch.distributed reduction
    torch.distributed.all_reduce(tensor, op=op)
    return tensor
```
@mcgibbon, any suggestions on this? I just copied and pasted from the torch.distributed class.
Oh, if that follows a pattern already established, then please disregard my comment (sorry I should've checked)
```python
h_parallel_size = int(os.environ.get("H_PARALLEL_SIZE", 1))
w_parallel_size = int(os.environ.get("W_PARALLEL_SIZE", 1))
```
I am a tiny bit uncomfortable with changing runtime behavior with env variables, but maybe that's how it's done? I defer to Jeremy and Elynn; I've seen stuff like this in the original distributed class, so maybe that's the preferred way here.
Using environment variables was an easy way to get things started with spatial parallelism. At this point, however, I believe we can consider an alternative.
it's ok to keep them (if that's the easiest thing)
```python
# Set H_PARALLEL_SIZE back to 1.
os.environ["H_PARALLEL_SIZE"] = "1"
```
This type of clean-up is exactly why I feel a little uneasy about changing runtime behavior with env variables 👀
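One way to avoid this kind of manual cleanup in tests is pytest's `monkeypatch` fixture, which restores the environment automatically after each test; a sketch (the import path is assumed from the test-import discussion below):

```python
import torch

# assumed import path for the class under test
from fme.core.distributed.model_torch_distributed.model_torch_distributed import (
    ModelTorchDistributed,
)


def test_is_available_with_spatial_parallelism(monkeypatch):
    # monkeypatch undoes these after the test; no manual reset to "1" needed
    monkeypatch.setenv("H_PARALLEL_SIZE", "2")
    monkeypatch.setenv("W_PARALLEL_SIZE", "1")
    assert ModelTorchDistributed.is_available() == torch.distributed.is_available()
```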
Co-authored-by: Naser Mahfouz <naser.mahfouz@pnnl.gov>
Co-authored-by: Naser Mahfouz <naser.mahfouz@pnnl.gov>
Co-authored-by: Naser Mahfouz <naser.mahfouz@pnnl.gov>
…distributed.py Co-authored-by: Naser Mahfouz <naser.mahfouz@pnnl.gov>
```python
def get_local_slices(self, tensor_shape):
    return tuple(slice(None, None) for _ in tensor_shape)


def reduce_mean(self, tensor: torch.Tensor) -> torch.Tensor:
```
Possibly a bug from previous changes: `group=None` is missing here; it looks like all the `reduce_mean` calls take `group` as an input.
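A sketch of the signature fix being suggested for the non-distributed backend (the type of `group` is an assumption here):

```python
def reduce_mean(self, tensor: torch.Tensor, group=None) -> torch.Tensor:
    # single process: nothing to reduce, but accept `group` so the call
    # signature matches the distributed backends
    return tensor
```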
```python
from unittest.mock import patch

import pytest
from model_torch_distributed import ModelTorchDistributed
```
Changed to `from fme.core.distributed.model_torch_distributed.model_torch_distributed import ModelTorchDistributed` so that this import always works and to be consistent with your other test imports.
```
@@ -0,0 +1,86 @@
import logging
```
Can you remove test_reduce_mean.py? Why are there two versions of this?
add ModelTorchDistributed with tests

Changes:
- fme.core.distributed has a new ModelDistributedBackend that allows for parallelism over spatial dimensions as well as batch/data.
- torch is pinned with a minimum of 2.4.0 to use new facilities for distributed, etc.

- [x] Tests added
- [ ] If dependencies changed, "deps only" image rebuilt and "latest_deps_only_image.txt" file updated

Closes #749
Closes #842
The SpatialTorchDistributed class is copied from the TorchDistributed class. I have not added all the methods and modifications needed for spatial parallelism. So far, I have done the following:

Changes:
- symbol (e.g. `fme.core.my_function`) or script and concise description of changes or added feature. Can group multiple related symbols on a single bullet.

- Tests added
- If dependencies changed, "deps only" image rebuilt and "latest_deps_only_image.txt" file updated

Resolves # (delete if none)