Skip to content

Commit 15ebb8d

Browse files
author
David Buchaca
committed
refactor: group multiprocessing joblig examples into joblib
1 parent 32fef85 commit 15ebb8d

14 files changed

Lines changed: 647 additions & 0 deletions
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# Collection of parallelizable examples
2+
3+
4+
## Feature counts
5+
6+
Example building a dict that contains word counts. This example showcases three different approaches, found in the following files:
7+
8+
```
9+
feature_counts_dict_serial.py
10+
feature_counts_dict_joblib_parallel_and_reduce.py
11+
feature_counts_dict_joblib_parallel_and_reduce_in_chunks.py
12+
```
13+
14+
15+
#### Serial version (bottlenecked by the completely serial operation)
16+
```
17+
python feature_counts_dict_serial.py
18+
```
19+
20+
```
21+
num docs = 1131400
22+
23+
time overall 117.3199 sec
24+
25+
len(vocabulary.items())---> 130107
26+
(vocabulary['from'], vocabulary['gift'])---> (2267000, 6600)
27+
```
28+
29+
30+
#### Parallel version working one element at a time (bottlenecked by the reduce step)
31+
32+
```
33+
python feature_counts_dict_joblib_parallel_and_reduce.py
34+
```
35+
36+
```
37+
num docs = 1131400
38+
39+
time build vocabularies 42.0592 sec
40+
time aggregate vocabularies 35.4708 sec
41+
time overall 77.5302 sec
42+
43+
len(partial_vocabularies)---> 1131400
44+
len(vocabulary.items())---> 130107
45+
(vocabulary['from'], vocabulary['gift'])---> (2267000, 6600)
46+
```
47+
48+
49+
#### Parallel version working in minibatches
50+
51+
Note that this implementation is not bottlenecked in the parallel part, and its bottleneck in the reduce step is negligible.
52+
53+
```
54+
python feature_counts_dict_joblib_parallel_and_reduce_in_chunks.py
55+
```
56+
57+
```
58+
num docs = 1131400
59+
60+
time build vocabularies 26.7191 sec
61+
time aggregate vocabularies 0.236 sec
62+
time overall 26.9552 sec
63+
64+
len(partial_vocabularies)---> 12
65+
len(vocabulary.items())---> 130107
66+
(vocabulary['from'], vocabulary['gift'])---> (2267000, 6600)
67+
```
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import string
2+
import random
3+
import time
4+
from collections import Counter
5+
from functools import partial, reduce
6+
from collections import defaultdict
7+
from itertools import repeat
8+
9+
import sklearn
10+
from sklearn import feature_extraction, datasets
11+
from joblib import Parallel, delayed
12+
13+
from utils import timer, load_data
14+
15+
def aggregate_dicts(dicts):
    """Merge a sequence of count dictionaries into one by summing values per key.

    Args:
        dicts: Sequence (e.g. list) of mappings from key to numeric count.

    Returns:
        A new mapping holding, for every key seen in any input, the sum of
        its values. The result is a copy of the first input, so a
        ``defaultdict`` input yields a ``defaultdict`` result. An empty
        ``defaultdict(int)`` is returned when ``dicts`` is empty.
    """
    if len(dicts) == 0:
        return defaultdict(int)

    # Work on a copy so the caller's first dictionary is not modified.
    # A single-element input therefore returns a dict, not the input list.
    result = dicts[0].copy()

    for d in dicts[1:]:
        for k, v in d.items():
            # .get keeps this working when the first input is a plain dict.
            result[k] = result.get(k, 0) + v

    return result
28+
29+
30+
def build_vocabulary(sentence, doc_analyzer):
    """Count the tokens that ``doc_analyzer`` extracts from one sentence.

    Args:
        sentence: The document handed to ``doc_analyzer``.
        doc_analyzer: Callable returning an iterable of tokens for ``sentence``.

    Returns:
        A ``defaultdict(int)`` mapping each token to its number of occurrences.
    """
    token_counts = defaultdict(int)
    for token in doc_analyzer(sentence):
        token_counts[token] += 1
    return token_counts
37+
38+
39+
if __name__ == '__main__':
    # Script entry point: build one vocabulary per document with joblib
    # workers, then merge all of them in the main process.

    n_jobs = 10

    factor_multiplier = 100  # This factor ensures 1 million documents in the dataset
    sentences, _, _, _ = load_data()
    sentences = sentences * factor_multiplier
    print(f'num docs = {len(sentences)}\n')

    count_vectorizer = feature_extraction.text.CountVectorizer()
    doc_analyzer = count_vectorizer.build_analyzer()

    with timer('overall', indentation=''):
        with timer('build vocabularies', indentation=''):
            p_build_vocabulary = partial(build_vocabulary, doc_analyzer=doc_analyzer)
            # One delayed task per document; results come back as a list.
            tasks = (delayed(p_build_vocabulary)(document) for document in sentences)
            executor = Parallel(n_jobs=n_jobs)
            partial_vocabularies = executor(tasks)

        with timer('aggregate vocabularies', indentation=''):
            vocabulary = aggregate_dicts(partial_vocabularies)

    print('\nlen(partial_vocabularies)--->', len(partial_vocabularies))
    print('len(vocabulary.items())--->', len(vocabulary.items()))
    print("(vocabulary['from'], vocabulary['gift'])--->", (vocabulary['from'], vocabulary['gift']))
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import string
2+
import random
3+
import time
4+
from collections import Counter
5+
from functools import partial, reduce
6+
7+
from collections import defaultdict
8+
from itertools import repeat
9+
10+
import sklearn
11+
from sklearn import feature_extraction, datasets
12+
from joblib import Parallel, delayed
13+
14+
from utils import timer, load_data
15+
16+
def aggregate_dicts(dicts):
    """Merge a sequence of count dictionaries into one by summing values per key.

    Args:
        dicts: Sequence (e.g. list) of mappings from key to numeric count.

    Returns:
        A new mapping holding, for every key seen in any input, the sum of
        its values. The result is a copy of the first input, so a
        ``defaultdict`` input yields a ``defaultdict`` result. An empty
        ``defaultdict(int)`` is returned when ``dicts`` is empty.
    """
    if len(dicts) == 0:
        return defaultdict(int)

    # Work on a copy so the caller's first dictionary is not modified.
    # A single-element input therefore returns a dict, not the input list.
    result = dicts[0].copy()

    for d in dicts[1:]:
        for k, v in d.items():
            # .get keeps this working when the first input is a plain dict.
            result[k] = result.get(k, 0) + v

    return result
29+
30+
def build_vocabulary(sentences, doc_analyzer):
    """Count the tokens that ``doc_analyzer`` extracts from a batch of sentences.

    Args:
        sentences: Iterable of documents; each is handed to ``doc_analyzer``.
        doc_analyzer: Callable returning an iterable of tokens for a document.

    Returns:
        A ``defaultdict(int)`` mapping each token to its total number of
        occurrences across the whole batch.
    """
    token_counts = defaultdict(int)
    # Lazily flatten the batch into one token stream, in document order.
    token_stream = (
        token
        for document in sentences
        for token in doc_analyzer(document)
    )
    for token in token_stream:
        token_counts[token] += 1
    return token_counts
38+
39+
def get_batches(s, n, truncate=False):
    """Yield consecutive slices of ``s`` containing ``n`` items each.

    Args:
        s: Sliceable sequence supporting ``len`` (e.g. list or str).
        n: Batch size; must be greater than zero.
        truncate: When true, a final batch shorter than ``n`` is dropped;
            otherwise it is yielded as the last batch.

    Yields:
        Slices of ``s`` in order.

    Raises:
        ValueError: If ``n`` is not greater than zero. As this is a
            generator, the error surfaces when iteration starts.
    """
    # An explicit check survives ``python -O``; an assert would be stripped.
    if not n > 0:
        raise ValueError(f"batch size must be positive, got {n!r}")

    # Slice by offset rather than re-slicing the remainder each step, which
    # would copy the whole tail of the sequence for every batch.
    for start in range(0, len(s), n):
        batch = s[start:start + n]
        if truncate and len(batch) < n:
            return
        yield batch
46+
47+
48+
if __name__ == '__main__':
    # Script entry point: split the corpus into chunks, build one vocabulary
    # per chunk with joblib workers, then merge the few partial results.

    n_jobs = 10
    chunk_size = 100_000
    factor_multiplier = 100  # This factor ensures 1 million documents in the dataset

    sentences, _, _, _ = load_data()
    sentences = sentences * factor_multiplier

    print(f'num docs = {len(sentences)}\n')

    count_vectorizer = feature_extraction.text.CountVectorizer()
    doc_analyzer = count_vectorizer.build_analyzer()

    with timer('overall', indentation=''):
        with timer('build vocabularies', indentation=''):
            p_build_vocabulary = partial(build_vocabulary, doc_analyzer=doc_analyzer)
            # One delayed task per chunk of at most chunk_size documents.
            chunks = get_batches(sentences, chunk_size)
            tasks = (delayed(p_build_vocabulary)(chunk) for chunk in chunks)
            executor = Parallel(n_jobs=n_jobs)
            partial_vocabularies = executor(tasks)

        with timer('aggregate vocabularies', indentation=''):
            vocabulary = aggregate_dicts(partial_vocabularies)

    print('\nlen(partial_vocabularies)--->', len(partial_vocabularies))
    print('len(vocabulary.items())--->', len(vocabulary.items()))
    print("(vocabulary['from'], vocabulary['gift'])--->", (vocabulary['from'], vocabulary['gift']))
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import string
2+
import random
3+
4+
from collections import defaultdict
5+
from itertools import repeat
6+
7+
import sklearn
8+
from sklearn import feature_extraction, datasets
9+
10+
from utils import timer, load_data
11+
12+
def update_vocabulary(sentence, vocabulary, doc_analyzer):
    """Add the token counts of one sentence to ``vocabulary`` in place.

    Args:
        sentence: The document handed to ``doc_analyzer``.
        vocabulary: Mutable mapping from token to count; it is updated in
            place and nothing is returned.
        doc_analyzer: Callable returning an iterable of tokens for ``sentence``.
    """
    for token in doc_analyzer(sentence):
        vocabulary[token] = vocabulary[token] + 1
16+
17+
if __name__ == '__main__':
    # Script entry point: serial baseline that updates a single vocabulary
    # one document at a time.

    factor_multiplier = 100  # This factor ensures 1 million documents in the dataset
    sentences, _, _, _ = load_data()
    sentences = sentences * factor_multiplier
    print(f'num docs = {len(sentences)}\n')

    count_vectorizer = feature_extraction.text.CountVectorizer()
    doc_analyzer = count_vectorizer.build_analyzer()
    vocabulary = defaultdict(int)

    with timer('overall', indentation=''):
        for document in sentences:
            update_vocabulary(document, vocabulary, doc_analyzer)

    print('\nlen(vocabulary.items())--->', len(vocabulary.items()))
    print("(vocabulary['from'], vocabulary['gift'])--->", (vocabulary['from'], vocabulary['gift']))
34+
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import string
2+
import random
3+
4+
from multiprocessing import Pool
5+
from collections import defaultdict
6+
from itertools import repeat
7+
8+
import sklearn
9+
from sklearn import feature_extraction, datasets
10+
import time
11+
from functools import partial, reduce
12+
from collections import Counter
13+
14+
15+
def aggregate_dicts(dicts):
    """Merge a sequence of count dictionaries into one by summing values per key.

    Args:
        dicts: Sequence (e.g. list) of mappings from key to numeric count.

    Returns:
        A new mapping holding, for every key seen in any input, the sum of
        its values. The result is a copy of the first input, so a
        ``defaultdict`` input yields a ``defaultdict`` result. An empty
        ``defaultdict(int)`` is returned when ``dicts`` is empty.
    """
    if len(dicts) == 0:
        return defaultdict(int)

    # Work on a copy so the caller's first dictionary is not modified.
    # A single-element input therefore returns a dict, not the input list.
    result = dicts[0].copy()

    for d in dicts[1:]:
        for k, v in d.items():
            # .get keeps this working when the first input is a plain dict.
            result[k] = result.get(k, 0) + v

    return result
28+
29+
def build_vocabulary(sentence, doc_analyzer):
    """Return per-token occurrence counts for a single sentence.

    Args:
        sentence: The document handed to ``doc_analyzer``.
        doc_analyzer: Callable returning an iterable of tokens for ``sentence``.

    Returns:
        A ``defaultdict(int)`` mapping each token to its number of occurrences.
    """
    occurrences = defaultdict(int)
    for token in doc_analyzer(sentence):
        occurrences[token] = occurrences[token] + 1
    return occurrences
36+
37+
def load_data():
    """Load the 20 newsgroups train and test subsets via scikit-learn.

    Each subset is fetched once and both its documents and its labels are
    read from the same returned object.

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)`` taken from the ``data``
        and ``target`` attributes of the ``train`` and ``test`` subsets.
    """
    train = sklearn.datasets.fetch_20newsgroups(subset="train")
    test = sklearn.datasets.fetch_20newsgroups(subset="test")

    return train.data, train.target, test.data, test.target
47+
48+
49+
if __name__ == '__main__':
    # Script entry point: build one vocabulary per document with a
    # multiprocessing Pool, then merge them in the main process.

    n_jobs = 10
    chunksize = 1000

    factor_multiplier = 100  # This factor ensures 1 million documents in the dataset
    sentences, _, _, _ = load_data()
    sentences = sentences * factor_multiplier
    print(f'num docs = {len(sentences)}')

    count_vectorizer = feature_extraction.text.CountVectorizer()
    doc_analyzer = count_vectorizer.build_analyzer()

    t0 = time.time()
    p_build_vocabulary = partial(build_vocabulary, doc_analyzer=doc_analyzer)

    # The context manager releases the worker processes once map() has
    # returned; previously the pool was never closed.
    with Pool(processes=n_jobs) as pool:
        # chunksize was declared above but never handed to the pool.
        partial_vocabularies = pool.map(p_build_vocabulary, sentences, chunksize=chunksize)

    print('len(partial_vocabularies)--->', len(partial_vocabularies))
    vocabulary = aggregate_dicts(partial_vocabularies)

    print('len(vocabulary.items())--->', len(vocabulary.items()))
    print("(vocabulary['from'], vocabulary['gift'])--->", (vocabulary['from'], vocabulary['gift']))
    print(f'time taken {time.time()-t0} seconds')
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import string
2+
import random
3+
4+
from multiprocessing import Pool
5+
from multiprocessing.managers import BaseManager, DictProxy
6+
from collections import defaultdict
7+
from itertools import repeat
8+
9+
import sklearn
10+
from sklearn import feature_extraction, datasets
11+
import time
12+
13+
class MyManager(BaseManager):
    """Manager subclass used to register a shared ``defaultdict`` type."""


# Expose ``manager.defaultdict(...)``, whose instances are accessed through DictProxy.
MyManager.register(typeid='defaultdict', callable=defaultdict, proxytype=DictProxy)
17+
18+
def update_vocabulary(sentence, manager_vocabulary, doc_analyzer, lock=None):
    """Add the token counts of one sentence to a shared vocabulary in place.

    Tokens are first counted locally, so the shared mapping is touched once
    per distinct token instead of once per token occurrence.

    Args:
        sentence: The document handed to ``doc_analyzer``.
        manager_vocabulary: Mutable mapping from token to count (e.g. a
            manager proxy for a ``defaultdict(int)``); updated in place.
        doc_analyzer: Callable returning an iterable of tokens for ``sentence``.
        lock: Optional context-manager lock shared by all workers. ``+=`` on
            the mapping is a separate read followed by a write, so without a
            lock concurrent workers may overwrite each other's increments.
    """
    local_counts = defaultdict(int)
    for word in doc_analyzer(sentence):
        local_counts[word] += 1

    def _flush():
        # Apply this sentence's counts to the shared mapping.
        for word, count in local_counts.items():
            manager_vocabulary[word] += count

    if lock is None:
        _flush()
    else:
        with lock:
            _flush()
22+
23+
24+
def load_data():
    """Load the 20 newsgroups train and test subsets via scikit-learn.

    Each subset is fetched once and both its documents and its labels are
    read from the same returned object.

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)`` taken from the ``data``
        and ``target`` attributes of the ``train`` and ``test`` subsets.
    """
    train = sklearn.datasets.fetch_20newsgroups(subset="train")
    test = sklearn.datasets.fetch_20newsgroups(subset="test")

    return train.data, train.target, test.data, test.target
34+
35+
36+
if __name__ == '__main__':
    # Script entry point: Pool workers update one manager-hosted vocabulary
    # shared across processes.

    n_jobs = 10
    chunksize = 100

    factor_multiplier = 100  # This factor ensures 1 million documents in the dataset
    sentences, _, _, _ = load_data()
    sentences = sentences * factor_multiplier
    print(f'num docs = {len(sentences)}')

    count_vectorizer = feature_extraction.text.CountVectorizer()
    doc_analyzer = count_vectorizer.build_analyzer()

    t0 = time.time()
    # Context managers shut down the worker pool and the manager process
    # when the block ends; previously neither was ever released.
    # Entering the manager starts its server process.
    with Pool(processes=n_jobs) as pool, MyManager() as manager:
        manager_vocabulary = manager.defaultdict(int)

        # NOTE(review): workers update the shared dict without a lock, so
        # concurrent increments of the same word may be lost - confirm the
        # reported counts against the serial version.
        pool.starmap(
            update_vocabulary,
            zip(sentences, repeat(manager_vocabulary), repeat(doc_analyzer)),
            chunksize=chunksize,
        )
        vocabulary = manager_vocabulary

        # The proxy is only usable while the manager is alive, so the
        # results are read inside the block.
        print('len(vocabulary.items())--->', len(vocabulary.items()))
        print("(vocabulary['from'], vocabulary['gift'])--->", (vocabulary['from'], vocabulary['gift']))
        print(f'time taken {time.time()-t0} seconds')
61+

0 commit comments

Comments
 (0)