Skip to content

Commit 0cfee90

Browse files
author
David Buchaca
committed
feat: add vocab building with threads
1 parent 15ebb8d commit 0cfee90

19 files changed

Lines changed: 373 additions & 647 deletions
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import string
2+
import time
3+
from collections import defaultdict
4+
from functools import partial
5+
from multiprocessing.dummy import Pool as ThreadPool
6+
7+
import sklearn
8+
from sklearn import feature_extraction, datasets
9+
10+
class timer:
    """Context manager that prints how long its ``with`` block took.

    Args:
        name: Label inserted into the printed message.
        indentation: Prefix prepended to the printed message.

    The message has the form ``time <name> <seconds> sec`` with the duration
    rounded to four decimals.
    """

    def __init__(self, name='', indentation=''):
        # ``start`` keeps the wall-clock construction time for any caller that
        # reads it; the printed duration uses the monotonic counter below.
        self.start = time.time()
        self._t0 = time.perf_counter()
        self.name = name
        self.indentation = indentation

    def __enter__(self):
        # Restart the clock so only the body of the ``with`` block is measured,
        # and return the instance so ``with timer(...) as t`` binds the timer.
        self._t0 = time.perf_counter()
        return self

    def __exit__(self, *args):
        elapsed = time.perf_counter() - self._t0
        print(self.indentation + f'time {self.name} {round(elapsed, 4)} sec')
22+
23+
24+
def load_data():
    """Load the 20 newsgroups train and test splits.

    Each split is fetched exactly once; the previous version loaded the
    dataset five times (one result was never used).

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)`` taken from the ``data``
        and ``target`` attributes of the objects returned by
        ``fetch_20newsgroups`` (raw post texts and their integer labels, per
        the scikit-learn documentation).
    """
    train = sklearn.datasets.fetch_20newsgroups(subset="train")
    test = sklearn.datasets.fetch_20newsgroups(subset="test")
    return train.data, train.target, test.data, test.target
34+
35+
36+
37+
# Translation table that deletes every character in ``string.punctuation``.
# Built once at import time instead of once per tokenized document.
_PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)


# Basic tokenizer: lowercases, strips punctuation, and splits on whitespace
def simple_tokenizer(text):
    """Return the lowercase, punctuation-free, whitespace-separated tokens.

    Args:
        text: Input string.

    Returns:
        List of tokens; empty when ``text`` has no non-punctuation content.
    """
    return text.lower().translate(_PUNCTUATION_TABLE).split()
41+
42+
def aggregate_dicts(dicts):
    """Sum several key -> count mappings into a single ``defaultdict(int)``.

    Args:
        dicts: Iterable of mappings whose values can be added together.

    Returns:
        A new ``defaultdict(int)`` holding, for every key, the sum of its
        values across ``dicts``. An empty input yields an empty mapping.

    The inputs are never modified: the result is accumulated into a fresh
    mapping rather than into the first element, which also makes plain
    ``dict`` inputs work.
    """
    totals = defaultdict(int)
    for counts in dicts:
        for key, value in counts.items():
            totals[key] += value
    return totals
51+
52+
def build_vocabulary(sentence):
    """Count how often each token of ``simple_tokenizer`` occurs in one text.

    Args:
        sentence: The text to tokenize.

    Returns:
        ``defaultdict(int)`` mapping each token to its number of occurrences.
    """
    counts = defaultdict(int)
    for token in simple_tokenizer(sentence):
        counts[token] += 1
    return counts
58+
59+
if __name__ == '__main__':
    n_jobs = 10
    # The 20 newsgroups train split has 11,314 posts (per the scikit-learn
    # docs), so repeating it 20 times gives roughly 226k documents.
    factor_multiplier = 20

    sentences, _, _, _ = load_data()
    sentences = sentences * factor_multiplier
    print(f'num docs = {len(sentences)}\n')

    with timer('overall', indentation=''):
        # One partial vocabulary per document, computed on a thread pool.
        with timer('build vocabularies', indentation=''):
            with ThreadPool(n_jobs) as pool:
                partial_vocabularies = pool.map(build_vocabulary, sentences)

        with timer('aggregate vocabularies', indentation=''):
            vocabulary = aggregate_dicts(partial_vocabularies)

    print('\nlen(partial_vocabularies)--->', len(partial_vocabularies))
    print('len(vocabulary.items())--->', len(vocabulary))
    # ``.get`` avoids inserting missing words into the defaultdict as a side
    # effect of merely printing them.
    print("(vocabulary['from'], vocabulary['gift'])--->", (vocabulary.get('from', 0), vocabulary.get('gift', 0)))
78+
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import string
2+
import time
3+
from collections import defaultdict
4+
from threading import Lock
5+
from multiprocessing.dummy import Pool as ThreadPool
6+
7+
import sklearn
8+
from sklearn import datasets
9+
10+
11+
class timer:
    """Context manager that prints how long its ``with`` block took.

    Args:
        name: Label inserted into the printed message.
        indentation: Prefix prepended to the printed message.

    The message has the form ``time <name> <seconds> sec`` with the duration
    rounded to four decimals.
    """

    def __init__(self, name='', indentation=''):
        # ``start`` keeps the wall-clock construction time for any caller that
        # reads it; the printed duration uses the monotonic counter below.
        self.start = time.time()
        self._t0 = time.perf_counter()
        self.name = name
        self.indentation = indentation

    def __enter__(self):
        # Restart the clock so only the body of the ``with`` block is measured,
        # and return the instance so ``with timer(...) as t`` binds the timer.
        self._t0 = time.perf_counter()
        return self

    def __exit__(self, *args):
        elapsed = time.perf_counter() - self._t0
        print(self.indentation + f'time {self.name} {round(elapsed, 4)} sec')
22+
23+
24+
def load_data():
    """Load the 20 newsgroups train and test splits.

    Each split is fetched exactly once instead of once per returned value.

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)`` taken from the ``data``
        and ``target`` attributes of the objects returned by
        ``fetch_20newsgroups`` (raw post texts and their integer labels, per
        the scikit-learn documentation).
    """
    train = datasets.fetch_20newsgroups(subset="train")
    test = datasets.fetch_20newsgroups(subset="test")
    return train.data, train.target, test.data, test.target
30+
31+
32+
# Translation table that deletes every character in ``string.punctuation``.
# Built once at import time instead of once per tokenized document.
_PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)


# Basic tokenizer: lowercases, strips punctuation, and splits on whitespace
def simple_tokenizer(text):
    """Return the lowercase, punctuation-free, whitespace-separated tokens.

    Args:
        text: Input string.

    Returns:
        List of tokens; empty when ``text`` has no non-punctuation content.
    """
    return text.lower().translate(_PUNCTUATION_TABLE).split()
36+
37+
38+
# State shared by every worker thread: the global word counts and the lock
# that guards updates to them.
vocab_lock = Lock()
shared_vocabulary = defaultdict(int)


def update_shared_vocabulary(sentence):
    """Tokenize one text and add its word counts to ``shared_vocabulary``.

    Tokenization happens before the lock is taken; ``vocab_lock`` is held only
    while the shared dictionary is incremented.

    Args:
        sentence: The text to tokenize and count.
    """
    tokens = simple_tokenizer(sentence)
    with vocab_lock:
        for token in tokens:
            shared_vocabulary[token] += 1
48+
49+
50+
if __name__ == '__main__':
    n_jobs = 10
    # The 20 newsgroups train split has 11,314 posts (per the scikit-learn
    # docs), so repeating it 20 times gives roughly 226k documents.
    factor_multiplier = 20

    sentences, _, _, _ = load_data()
    sentences = sentences * factor_multiplier
    print(f'num docs = {len(sentences)}\n')

    with timer('overall', indentation=''):
        with timer('build vocabularies (shared dict)', indentation=''):
            # Every thread writes into the single lock-protected dictionary,
            # so the map results themselves are not needed.
            with ThreadPool(n_jobs) as pool:
                pool.map(update_shared_vocabulary, sentences)

    print('\nlen(vocabulary.items())--->', len(shared_vocabulary))
    # ``.get`` avoids inserting missing words into the defaultdict as a side
    # effect of merely printing them.
    print("(vocabulary['from'], vocabulary['gift'])--->", (shared_vocabulary.get('from', 0), shared_vocabulary.get('gift', 0)))
65+
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import string
2+
import time
3+
from collections import defaultdict
4+
from multiprocessing import Pool, Manager, Lock
5+
6+
import sklearn
7+
from sklearn import datasets
8+
9+
10+
class timer:
    """Context manager that prints how long its ``with`` block took.

    Args:
        name: Label inserted into the printed message.
        indentation: Prefix prepended to the printed message.

    The message has the form ``time <name> <seconds> sec`` with the duration
    rounded to four decimals.
    """

    def __init__(self, name='', indentation=''):
        # ``start`` keeps the wall-clock construction time for any caller that
        # reads it; the printed duration uses the monotonic counter below.
        self.start = time.time()
        self._t0 = time.perf_counter()
        self.name = name
        self.indentation = indentation

    def __enter__(self):
        # Restart the clock so only the body of the ``with`` block is measured.
        self._t0 = time.perf_counter()
        return self

    def __exit__(self, *args):
        elapsed = time.perf_counter() - self._t0
        print(self.indentation + f'time {self.name} {round(elapsed, 4)} sec')
21+
22+
23+
def load_data():
    """Load the 20 newsgroups train and test splits.

    Each split is fetched exactly once instead of once per returned value.

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)`` taken from the ``data``
        and ``target`` attributes of the objects returned by
        ``fetch_20newsgroups`` (raw post texts and their integer labels, per
        the scikit-learn documentation).
    """
    train = datasets.fetch_20newsgroups(subset="train")
    test = datasets.fetch_20newsgroups(subset="test")
    return train.data, train.target, test.data, test.target
29+
30+
31+
# Translation table that deletes every character in ``string.punctuation``.
# Built once per process at import time instead of once per document.
_PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)


# Tokenizer: lowercase, remove punctuation, split
def simple_tokenizer(text):
    """Return the lowercase, punctuation-free, whitespace-separated tokens.

    Args:
        text: Input string.

    Returns:
        List of tokens; empty when ``text`` has no non-punctuation content.
    """
    return text.lower().translate(_PUNCTUATION_TABLE).split()
35+
36+
37+
# This function is called by each process
def update_shared_vocabulary_old(sentence, shared_dict, lock):
    """Legacy name kept for compatibility; same as ``update_shared_vocabulary``.

    The body used to be a line-for-line copy of ``update_shared_vocabulary``.
    It now forwards to that function so the counting logic exists only once.

    Args:
        sentence: The text to tokenize and count.
        shared_dict: Mapping that accumulates word counts; must support
            ``get`` and item assignment.
        lock: Context manager held while ``shared_dict`` is updated.
    """
    update_shared_vocabulary(sentence, shared_dict, lock)
46+
47+
48+
def update_shared_vocabulary(sentence, shared_dict, lock):
    """Count the words of one text and fold the counts into ``shared_dict``.

    Args:
        sentence: The text to tokenize and count.
        shared_dict: Mapping that accumulates word counts; must support
            ``get`` and item assignment.
        lock: Context manager held while ``shared_dict`` is updated.
    """
    # Tally this document privately first, so no lock is needed yet.
    tallies = defaultdict(int)
    for token in simple_tokenizer(sentence):
        tallies[token] += 1

    # Take the lock once per document and merge the private tallies.
    with lock:
        for token, occurrences in tallies.items():
            previous = shared_dict.get(token, 0)
            shared_dict[token] = previous + occurrences
58+
59+
def init_process(shared_dict_, lock_):
    """Store the shared dictionary and lock in this module's globals.

    Passed to ``Pool`` as ``initializer`` so that ``wrapper`` can find both
    objects under the global names ``shared_dict`` and ``lock``.

    Args:
        shared_dict_: Object to bind to the global ``shared_dict``.
        lock_: Object to bind to the global ``lock``.
    """
    global shared_dict, lock
    shared_dict, lock = shared_dict_, lock_
64+
65+
66+
def wrapper(sentence):
    """Single-argument adapter around ``update_shared_vocabulary``.

    Supplies the module-level ``shared_dict`` and ``lock`` globals (bound by
    ``init_process``) so the function can be used with ``pool.map``.

    Args:
        sentence: The text to tokenize and count.
    """
    update_shared_vocabulary(sentence, shared_dict=shared_dict, lock=lock)
68+
69+
70+
if __name__ == '__main__':
    n_jobs = 8  # or os.cpu_count()
    # The 20 newsgroups train split has 11,314 posts (per the scikit-learn
    # docs), so repeating it 20 times gives roughly 226k documents.
    factor_multiplier = 20

    sentences, _, _, _ = load_data()
    sentences = sentences * factor_multiplier
    print(f'num docs = {len(sentences)}\n')

    with timer('overall', indentation=''):
        with Manager() as manager:
            # Manager-backed objects that are handed to every worker process
            # through the pool initializer.
            shared_dict = manager.dict()
            lock = manager.Lock()

            with timer('build vocabularies (multiprocessing)', indentation=''):
                with Pool(processes=n_jobs, initializer=init_process, initargs=(shared_dict, lock)) as pool:
                    pool.map(wrapper, sentences)

            # Copy into a normal dict while the manager is still open.
            vocabulary = dict(shared_dict)

    print('\nlen(vocabulary.items())--->', len(vocabulary))
    print("(vocabulary['from'], vocabulary['gift'])--->", (vocabulary.get('from', 0), vocabulary.get('gift', 0)))
91+
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import string
2+
import time
3+
from collections import defaultdict
4+
from multiprocessing import Pool, cpu_count
5+
from sklearn import datasets
6+
7+
class timer:
    """Context manager that prints how long its ``with`` block took.

    Args:
        name: Label inserted into the printed message.
        indentation: Prefix prepended to the printed message.

    The message has the form ``time <name> <seconds> sec`` with the duration
    rounded to four decimals.
    """

    def __init__(self, name='', indentation=''):
        # ``start`` keeps the wall-clock construction time for any caller that
        # reads it; the printed duration uses the monotonic counter below.
        self.start = time.time()
        self._t0 = time.perf_counter()
        self.name = name
        self.indentation = indentation

    def __enter__(self):
        # Restart the clock so only the body of the ``with`` block is measured.
        self._t0 = time.perf_counter()
        return self

    def __exit__(self, *args):
        elapsed = time.perf_counter() - self._t0
        print(self.indentation + f'time {self.name} {round(elapsed, 4)} sec')
18+
19+
20+
def load_data():
    """Load the 20 newsgroups train and test splits.

    Each split is fetched exactly once instead of once per returned value.

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)`` taken from the ``data``
        and ``target`` attributes of the objects returned by
        ``fetch_20newsgroups`` (raw post texts and their integer labels, per
        the scikit-learn documentation).
    """
    train = datasets.fetch_20newsgroups(subset="train")
    test = datasets.fetch_20newsgroups(subset="test")
    return train.data, train.target, test.data, test.target
26+
27+
# Translation table that deletes every character in ``string.punctuation``.
# Built once per process at import time instead of once per document.
_PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)


# Basic tokenizer: lowercases, strips punctuation, and splits on whitespace
def simple_tokenizer(text):
    """Return the lowercase, punctuation-free, whitespace-separated tokens.

    Args:
        text: Input string.

    Returns:
        List of tokens; empty when ``text`` has no non-punctuation content.
    """
    return text.lower().translate(_PUNCTUATION_TABLE).split()
31+
32+
def aggregate_dicts(dicts):
    """Sum several key -> count mappings into a single ``defaultdict(int)``.

    Args:
        dicts: Iterable of mappings whose values can be added together.

    Returns:
        A new ``defaultdict(int)`` holding, for every key, the sum of its
        values across ``dicts``. An empty input yields an empty mapping.

    The inputs are never modified: the result is accumulated into a fresh
    mapping rather than into the first element, which also makes plain
    ``dict`` inputs work.
    """
    totals = defaultdict(int)
    for counts in dicts:
        for key, value in counts.items():
            totals[key] += value
    return totals
41+
42+
def build_vocabulary(sentence):
    """Count how often each token of ``simple_tokenizer`` occurs in one text.

    Args:
        sentence: The text to tokenize.

    Returns:
        ``defaultdict(int)`` mapping each token to its number of occurrences.
    """
    frequencies = defaultdict(int)
    for token in simple_tokenizer(sentence):
        frequencies[token] += 1
    return frequencies
48+
49+
if __name__ == '__main__':
    n_jobs = 10  # Set based on available cores or workload
    # The 20 newsgroups train split has 11,314 posts (per the scikit-learn
    # docs), so repeating it 20 times gives roughly 226k documents.
    factor_multiplier = 20

    sentences, _, _, _ = load_data()
    sentences = sentences * factor_multiplier
    print(f'num docs = {len(sentences)}\n')

    with timer('overall', indentation=''):
        # One partial vocabulary per document, computed in worker processes.
        with timer('build vocabularies (multiprocessing)', indentation=''):
            with Pool(processes=n_jobs) as pool:
                partial_vocabularies = pool.map(build_vocabulary, sentences)

        with timer('aggregate vocabularies', indentation=''):
            vocabulary = aggregate_dicts(partial_vocabularies)

    print('\nlen(partial_vocabularies)--->', len(partial_vocabularies))
    print('len(vocabulary.items())--->', len(vocabulary))
    # ``.get`` avoids inserting missing words into the defaultdict as a side
    # effect of merely printing them.
    print("(vocabulary['from'], vocabulary['gift'])--->", (vocabulary.get('from', 0), vocabulary.get('gift', 0)))
68+
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import string
2+
import time
3+
from collections import defaultdict
4+
from threading import Lock
5+
from multiprocessing.dummy import Pool as ThreadPool
6+
7+
import sklearn
8+
from sklearn import datasets
9+
from collections import Counter
10+
11+
12+
class timer:
    """Context manager that prints how long its ``with`` block took.

    Args:
        name: Label inserted into the printed message.
        indentation: Prefix prepended to the printed message.

    The message has the form ``time <name> <seconds> sec`` with the duration
    rounded to four decimals.
    """

    def __init__(self, name='', indentation=''):
        # ``start`` keeps the wall-clock construction time for any caller that
        # reads it; the printed duration uses the monotonic counter below.
        self.start = time.time()
        self._t0 = time.perf_counter()
        self.name = name
        self.indentation = indentation

    def __enter__(self):
        # Restart the clock so only the body of the ``with`` block is measured,
        # and return the instance so ``with timer(...) as t`` binds the timer.
        self._t0 = time.perf_counter()
        return self

    def __exit__(self, *args):
        elapsed = time.perf_counter() - self._t0
        print(self.indentation + f'time {self.name} {round(elapsed, 4)} sec')
23+
24+
25+
def load_data():
    """Load the 20 newsgroups train and test splits.

    Each split is fetched exactly once instead of once per returned value.

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)`` taken from the ``data``
        and ``target`` attributes of the objects returned by
        ``fetch_20newsgroups`` (raw post texts and their integer labels, per
        the scikit-learn documentation).
    """
    train = datasets.fetch_20newsgroups(subset="train")
    test = datasets.fetch_20newsgroups(subset="test")
    return train.data, train.target, test.data, test.target
31+
32+
33+
# Translation table that deletes every character in ``string.punctuation``.
# Built once at import time instead of once per tokenized document.
_PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)


# Basic tokenizer: lowercases, strips punctuation, and splits on whitespace
def simple_tokenizer(text):
    """Return the lowercase, punctuation-free, whitespace-separated tokens.

    Args:
        text: Input string.

    Returns:
        List of tokens; empty when ``text`` has no non-punctuation content.
    """
    return text.lower().translate(_PUNCTUATION_TABLE).split()
37+
38+
39+
def build_local_vocabulary(sentences_chunk):
    """Count tokens over a whole chunk of texts.

    Args:
        sentences_chunk: Iterable of texts handled by one worker.

    Returns:
        ``Counter`` mapping each token produced by ``simple_tokenizer`` to its
        total number of occurrences in the chunk.
    """
    return Counter(
        token
        for sentence in sentences_chunk
        for token in simple_tokenizer(sentence)
    )
45+
46+
if __name__ == '__main__':
    from math import ceil

    n_jobs = 10
    # The 20 newsgroups train split has 11,314 posts (per the scikit-learn
    # docs), so repeating it 20 times gives roughly 226k documents.
    factor_multiplier = 20

    sentences, _, _, _ = load_data()
    sentences = sentences * factor_multiplier
    print(f'num docs = {len(sentences)}\n')

    # One contiguous chunk per worker. The lower bound of 1 keeps ``range``
    # from receiving a zero step when the corpus is empty.
    chunk_size = max(1, ceil(len(sentences) / n_jobs))
    chunks = [sentences[i:i + chunk_size] for i in range(0, len(sentences), chunk_size)]

    # NOTE(review): the original indentation was lost in the diff view; the
    # merge is assumed to sit inside 'overall' but outside the build timer.
    with timer('overall'):
        with timer('build vocabularies (local merge)'):
            with ThreadPool(n_jobs) as pool:
                local_vocabularies = pool.map(build_local_vocabulary, chunks)

        # Merge all local vocabularies into one
        merged_vocab = Counter()
        for vocab in local_vocabularies:
            merged_vocab.update(vocab)

    print('\nlen(vocabulary.items())--->', len(merged_vocab))
    print("(vocabulary['from'], vocabulary['gift'])--->", (merged_vocab['from'], merged_vocab['gift']))
71+

0 commit comments

Comments
 (0)