-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMPBlockScheduler.py
More file actions
158 lines (133 loc) · 5.33 KB
/
MPBlockScheduler.py
File metadata and controls
158 lines (133 loc) · 5.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import multiprocessing
import queue
import time
import os
import random
from multiprocessing import Process, Queue, Lock, Manager
from collections import defaultdict
from SGDRecommender import ExplicitMF
class SubSample:
    """Couples a block's (row, col) position in the scheduler grid with
    the training samples that fall inside that block."""

    def __init__(self, block_pos: tuple, samples) -> None:
        # block_pos: (row, col) index of this block in the grid.
        # samples:   payload of samples belonging to the block.
        self.block_pos, self.samples = block_pos, samples
class BlockScheduler:
    """Schedules (row, col) blocks of a square grid for parallel workers.

    Invariant: a row or column index appears in ``unused_rows`` /
    ``unused_cols`` only while no worker holds a block in it, so no two
    workers ever touch the same row or column at once.  Each block must be
    processed ``iters`` times; fully-finished rows/cols are retired into
    ``completed_rows`` / ``completed_cols``.
    """

    def __init__(self, grid, iters) -> None:
        """
        Args:
            grid:  square 2-D sequence; ``grid[i][j]`` is the sample
                   payload for block (i, j).
            iters: number of times every block must be processed.
        """
        self.grid = grid
        self.width = len(grid)
        self.n_threads = self.width - 1
        # update_counter[i][j] == how many times block (i, j) has finished.
        # (Comprehension replaces the original temp-list copy loop.)
        self.update_counter = [[0] * self.width for _ in range(self.width)]
        # Rows/cols currently available for checkout by a worker.
        self.unused_rows, self.unused_cols = [*range(self.width)], [*range(self.width)]
        # Rows/cols in which every block has reached `iters` updates.
        self.completed_rows, self.completed_cols = set(), set()
        self.iters = iters
        print(self.update_counter)

    def check_completion(self):
        """Return True once every row and every column is fully processed."""
        return (len(self.completed_rows) == self.width
                and len(self.completed_cols) == self.width)

    def get_next(self, completed=None):
        """Return the least-updated available block as a SubSample.

        Args:
            completed: optional (row, col) a worker just finished; it is
                       checked back in before the next block is chosen.

        Returns:
            SubSample for the selected block, or None when no row/col pair
            is both free and still below the iteration target.
        """
        print("Unused rows", self.unused_rows, "Unused cols: ", self.unused_cols)
        print('\n'.join([''.join(['{:4}'.format(item) for item in row])
                         for row in self.update_counter]))
        if completed:
            self.completed_chunk(completed)
        if not len(self.unused_cols):
            return None
        # Scan free (row, col) pairs for the one with the fewest updates.
        # (Renamed from `min`, which shadowed the builtin.)
        best_count = 10 ** 10
        best_idx = (-1, -1)
        for i in self.unused_rows:
            for j in self.unused_cols:
                if self.update_counter[i][j] < best_count and self.update_counter[i][j] < self.iters:
                    best_count = self.update_counter[i][j]
                    best_idx = (i, j)
        if best_idx == (-1, -1):
            # Every free pair already hit the iteration target.
            return None
        # Check the chosen row/col out so no other worker can take them.
        self.unused_rows.remove(best_idx[0])
        self.unused_cols.remove(best_idx[1])
        return SubSample(best_idx, self.grid[best_idx[0]][best_idx[1]])

    def completed_chunk(self, idx):
        """Record completion of block ``idx`` and release (or retire) its
        row and column.

        Raises:
            Exception: if the row or column was never checked out.
        """
        row, col = idx[0], idx[1]
        if row in self.unused_rows or col in self.unused_cols:
            raise Exception("Desync occurred, completed chunk was still in unused chunks")
        self.update_counter[row][col] += 1
        # Count how many blocks in this row / column reached the target.
        r_count = sum(1 for i in range(self.width)
                      if self.update_counter[row][i] == self.iters)
        c_count = sum(1 for i in range(self.width)
                      if self.update_counter[i][col] == self.iters)
        # Retire a fully-finished column/row, otherwise make it free again.
        if c_count == self.width:
            self.completed_cols.add(col)
        else:
            self.unused_cols.append(col)
        if r_count == self.width:
            self.completed_rows.add(row)
        else:
            self.unused_rows.append(row)
# The consumer function takes data off of the Queue
def consumer(in_queue, out_queue, lock, model: ExplicitMF):
    """Worker loop: pull SubSample blocks from ``in_queue``, apply them to
    the shared model, and report finished positions on ``out_queue``.

    Terminates (returning 0) when the "000" poison pill is received.
    The lock serializes console output and model updates across workers.
    """
    with lock:
        print('Starting consumer => {}'.format(os.getpid()))
    while True:
        # Blocks until the queue has data.
        block = in_queue.get()
        if block == "000":
            # Poison pill: shut this worker down.
            return 0
        if not block:
            # Spurious empty item — keep waiting.
            continue
        with lock:
            model.expirimental_setter(block.block_pos[0], block.block_pos[1])
        out_queue.put(block.block_pos)
# if __name__ == '__main__':
# # Some lists with our favorite characters
# # Create the Queue object
# m = multiprocessing.Manager()
# in_queue = m.Queue()
# out_queue = m.Queue()
# scheduler = BlockScheduler(multiprocessing.cpu_count()+2,6)
# # Create a lock object to synchronize resource access
# lock = Lock()
# consumers = []
# # Create consumer processes
# for i in range(multiprocessing.cpu_count()):
# p = Process(target=consumer, args=(in_queue,out_queue, lock))
# consumers.append(p)
# in_queue.put(scheduler.get_next())
# for c in consumers:
# c.start()
# #scheduler will handle putting and retreiving items from queues
# done = False
# while not done:
# updated_block = out_queue.get(timeout=20)
# next = scheduler.get_next(completed=updated_block)
# done = scheduler.check_completion()
# if done:
# print("Done!")
# for i in range(multiprocessing.cpu_count()):
# in_queue.put("000")
# break
# in_queue.put(next)
# # Like threading, we have a join() method that synchronizes our program
# for c in consumers:
# c.join()
# print('Parent process exiting...')