
Add GPU & Distributed Training Support #22

Open

JoeHolt wants to merge 9 commits into master from exp-pr

Conversation

@JoeHolt (Collaborator) commented Apr 5, 2021

No description provided.

@JoeHolt changed the title from "Copied over to new PR" to "Add GPU & Distributed Training Support" on Apr 5, 2021
@JoeHolt (Collaborator, Author) commented Apr 5, 2021

I believe the only relevant file from my other PR was _dist.py, so I moved it over into this cleaner PR.

@JoeHolt (Collaborator, Author) commented Apr 5, 2021

When we last met, you asked that I update the API of DaskClassifier to look like this:

DaskClassifier(..., batch_size="geodamp", batch_size__latency=60, batch_size__factor=5)

I have a few questions:

  • How should I implement the latency? Should I start tracking the current epoch internally within the class? I.e., every time run_single_epoch is called, increment the internal epoch count by 1.
  • How can I implement the specific dampening algorithm? I.e., what is the easiest way to make _dist.py use GeoDamp vs. PadaDamp?

@stsievert (Owner) commented:

> relevant file from my other PR

Does tests/test_dist_damping.py need to be moved over from #15 too?

> How should I implement the latency delay?

Something like this:

def initialize(self, X_train):
    self._meta = {..., "num_examples": 0, "len_dataset": len(X_train)}

def _partial_fit(self, X, y):
    self._meta["num_examples"] += len(X)
    _epochs = self._meta["num_examples"] / self._meta["len_dataset"]
    ...
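The epoch-tracking idea above can be sketched as a runnable toy. The class and attribute names here are illustrative only, not the actual adadamp API:

```python
# Minimal sketch of the fractional-epoch bookkeeping described above.
# EpochTracker is a hypothetical stand-in for the DaskBaseDamper class.
class EpochTracker:
    def initialize(self, X_train):
        # Record the dataset size up front so epochs can be derived later,
        # without needing X_train in scope during partial_fit.
        self._meta = {"num_examples": 0, "len_dataset": len(X_train)}

    def _partial_fit(self, X, y=None):
        self._meta["num_examples"] += len(X)
        # Fractional number of epochs seen so far.
        return self._meta["num_examples"] / self._meta["len_dataset"]

tracker = EpochTracker()
tracker.initialize(list(range(100)))
print(tracker._partial_fit(list(range(25))))  # 0.25
print(tracker._partial_fit(list(range(25))))  # 0.5
```

With this, "epochs" is just a derived quantity, so there is no separate counter to keep in sync with run_single_epoch.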

> I.e., what is the easiest way to make _dist.py use GeoDamp vs. PadaDamp?

I think the easiest way is:

from typing import Any, Dict

class GeoDamp:
    def __init__(self, delay=60, factor=5, initial=128):
        self.delay = delay
        self.factor = factor
        self.initial = initial

    def damp(self, meta: Dict[str, Any]) -> int:
        # meta is DaskBaseDamper._meta
        epochs = meta["epochs"]
        return self.initial * (self.factor ** (epochs // self.delay))

This will require some parsing of the arguments. DaskClassifier(batch_size="geodamp", batch_size__delay=70) should turn into DaskClassifier(batch_size=GeoDamp(delay=70)).

@JoeHolt (Collaborator, Author) commented Apr 14, 2021

I implemented the new API based on your description but had some questions:

  • It seems to make more sense to set the batch-size arguments during a call to partial_fit rather than fit. Does that make sense?
  • How can I adapt the classes in damping.py to work with _dist.py? Those classes seem to need many more __init__ arguments than the simple example we used.
  • Do I need to update the test files as well?

@stsievert (Owner) left a comment:

I'd put the dampers (BaseDamper, Damper, ...) in a new file, _dampers.py.

The current tests should pass. I'd make a new test to ensure that GeoDamp works as expected. I think test_damping.py has some code that will be useful.

device: str = "cpu",
grads_per_worker: int=128,
max_epochs: int=20,
grads_per_worker=32,
@stsievert (Owner):

Suggested change:

- grads_per_worker=32,
+ grads_per_worker: int = 128,

adadamp/_dist.py Outdated
self.worker_max_batch_size = worker_max_batch_size
self.min_workers = min_workers
self.max_workers = max_workers
self.n_workers_ = min_workers
@stsievert (Owner):

This should be set in initialize.

The Scikit-learn API says that parameters ending in underscores should only be set when fit is called (https://scikit-learn.org/stable/glossary.html#term-attributes)
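An illustrative sketch of that convention (the Estimator class and its parameters are made up for the example): __init__ only stores hyperparameters verbatim, while derived attributes with a trailing underscore are set during fitting:

```python
# Hypothetical estimator following the scikit-learn attribute convention.
class Estimator:
    def __init__(self, min_workers=1):
        # __init__ stores hyperparameters as-is; no derived state here.
        self.min_workers = min_workers

    def fit(self, X):
        # Derived/fitted state gets the trailing underscore, set at fit time.
        self.n_workers_ = self.min_workers
        return self

est = Estimator(min_workers=4).fit([0, 1])
print(est.n_workers_)  # 4
```

This also keeps get_params/set_params round-trips working, since scikit-learn clones estimators by re-calling __init__ with the stored hyperparameters.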

adadamp/_dist.py Outdated
args = (X, y) if y is not None else (X,)
return TensorDataset(*args)

def get_batch_size(self, batch_size, kwargs: Dict[str, Any]) -> BaseDamper:
@stsievert (Owner):

Suggested change:

- def get_batch_size(self, batch_size, kwargs: Dict[str, Any]) -> BaseDamper:
+ def _get_damper(self, batch_size, kwargs: Dict[str, Any]) -> BaseDamper:

adadamp/_dist.py Outdated
self.batch_size_ = self.batch_size

if not isinstance(self.batch_size_, BaseDamper):
raise ValueError("BatchSize not subclass of BaseDamper")
@stsievert (Owner):

This chunk of code should go in initialize.

adadamp/_dist.py Outdated
return self

def _run_single_epoch(self, X, y=None, **fit_params):
def run_single_epoch(self, X, y=None, **fit_params):
@stsievert (Owner):

Suggested change:

- def run_single_epoch(self, X, y=None, **fit_params):
+ def _run_single_epoch(self, X, y=None, **fit_params):

)
from .dampers import (
SimpleBaseDamper,
SimpleGeoDamp
@stsievert (Owner):

I see – there's already BaseDamper and GeoDamp. It might be okay to remove this from __init__.py. Users can always do from adadamp.dampers import GeoDamp.

adadamp/_dist.py Outdated
from torch.utils.data import Dataset, IterableDataset, TensorDataset
from torch.nn.modules.loss import _Loss as Loss
from torch.utils.data import Dataset, IterableDataset, TensorDataset, DataLoader
from adadamp.adadamp.dampers import SimpleBaseDamper, SimpleGeoDamp
@stsievert (Owner):

Suggested change:

- from adadamp.adadamp.dampers import SimpleBaseDamper, SimpleGeoDamp
+ from .dampers import SimpleBaseDamper, SimpleGeoDamp

@stsievert (Owner):

It helps to run pip install -e . in the root of this repo.

@JoeHolt (Collaborator, Author):

What does this do?

@stsievert (Owner):

pip install -e /path/to/adadamp installs the AdaDamp package. It looks like from adadamp.adadamp.dampers import ... is not using the adadamp package; instead, it looks like it's using a relative path.

It might be cleaner to do from adadamp.dampers import SimpleBaseDamper, SimpleGeoDamp. That's what from .dampers import is doing (but relative, not absolute). Here's more detail: https://realpython.com/absolute-vs-relative-python-imports/

adadamp/_dist.py Outdated
}
self._initialized = True

if isinstance(self.batch_size, str):
@stsievert (Owner):

Suggested change:

+ if not isinstance(self.batch_size, (str, int, np.integer, SimpleBaseDamper)):
+     raise ValueError("self.batch_size needs to be ...")
...
+ elif isinstance(self.batch_size, (int, np.integer)):
...
- if not isinstance(self.batch_size_, SimpleBaseDamper):
-     raise ValueError("self.batch_size is not ...")
