-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathDistributedLayoutAlgorithm.py
More file actions
405 lines (329 loc) · 15.7 KB
/
DistributedLayoutAlgorithm.py
File metadata and controls
405 lines (329 loc) · 15.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
#import packages
from __future__ import print_function
import pyspark
import math
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
import os
import shutil
import sys
import timeit
import networkx as nx
import matplotlib as mpl
mpl.use('Agg')
import matplotlib.pyplot as plt
from graphframes import GraphFrame
from graphframes.lib import AggregateMessages as AM
# Attractive-force displacement applied to the *source* vertex of an edge.
# @param node1: [x, y] coordinates of the source vertex
# @param node2: [x, y] coordinates of the destination vertex
# @return [dispX, dispY, 1.0] — displacement of the source toward the
#         destination (ArrayType(DoubleType()) once wrapped as a UDF)
def aDispSrc(node1, node2):
    # Fruchterman-Reingold optimal-distance constant; nNodes is a module-level
    # global assigned in the driver before this UDF is evaluated.
    K = math.sqrt(1 / nNodes)
    deltaX = node2[0] - node1[0]
    deltaY = node2[1] - node1[1]
    dist = math.sqrt(deltaX ** 2 + deltaY ** 2)
    if dist == 0:
        # Coincident vertices exert no attractive force on each other.
        return [0, 0, 0]
    # Clamp tiny separations so the unit direction stays bounded.
    dist = max(dist, 0.01)
    force = dist / K
    return [(deltaX / dist) * force, (deltaY / dist) * force, 1.0]
aDispSrc = F.udf(aDispSrc, ArrayType(DoubleType()))
# Attractive-force displacement applied to the *destination* vertex of an edge.
# Mirror image of aDispSrc: same magnitude, opposite direction.
# @param node1: [x, y] coordinates of the source vertex
# @param node2: [x, y] coordinates of the destination vertex
# @return [dispX, dispY, 1.0] — displacement of the destination toward the
#         source (ArrayType(DoubleType()) once wrapped as a UDF)
def aDispDst(node1, node2):
    # nNodes is a module-level global assigned in the driver.
    K = math.sqrt(1 / nNodes)
    deltaX = node2[0] - node1[0]
    deltaY = node2[1] - node1[1]
    dist = math.sqrt(deltaX ** 2 + deltaY ** 2)
    if dist == 0:
        # Coincident vertices exert no attractive force on each other.
        return [0, 0, 0]
    # Clamp tiny separations so the unit direction stays bounded.
    dist = max(dist, 0.01)
    force = dist / K
    # Negated relative to aDispSrc: the destination is pulled the opposite way.
    return [-(deltaX / dist) * force, -(deltaY / dist) * force, 1.0]
aDispDst = F.udf(aDispDst, ArrayType(DoubleType()))
# Associate a vertex with its nearest centroid.
# @param vertexCord: [x, y] coordinates of the vertex
# @return [centroidId, distance] for the closest centroid
centroidVertexAssociationUdf = F.udf(lambda z: centroidVertexAssociation(z), ArrayType(DoubleType()))
def centroidVertexAssociation(vertexCord):
    # centroidBroadcast is expected to be a Spark broadcast of
    # (id, [x, y]) entries — NOTE(review): it is not created anywhere in
    # this file's visible code; confirm the driver sets it before use.
    candidates = centroidBroadcast.value
    bestId = 0.0
    bestDist = 0.0
    for idx in range(len(candidates)):
        dx = vertexCord[0] - candidates[idx][1][0]
        dy = vertexCord[1] - candidates[idx][1][1]
        dist = math.sqrt(dx ** 2 + dy ** 2)
        # First candidate always seeds the minimum; later ones must beat it.
        if idx == 0 or dist < bestDist:
            bestId = candidates[idx][0]
            bestDist = dist
    return [bestId, bestDist]
# Displacement exerted on a vertex by the repulsive force of every centroid.
# @param vertexCord: [x, y] coordinates of the vertex
# @return [dispX, dispY] summed repulsive displacement (zeros when there are
#         no centroids)
def rForceCentroid(vertexCord):
    # nNodes and centroid_list are module-level globals assigned in the driver;
    # centroid_list holds the sampled centroids' [x, y] positions.
    K = math.sqrt(1 / nNodes)
    weight = 1
    totalX = 0.0
    totalY = 0.0
    for centroidXY in centroid_list:
        dx = vertexCord[0] - centroidXY[0]
        dy = vertexCord[1] - centroidXY[1]
        dist = math.sqrt(dx ** 2 + dy ** 2)
        if dist == 0:
            # A coincident centroid contributes nothing.
            continue
        # Clamp tiny separations so the inverse-square term stays bounded.
        dist = max(dist, 0.01)
        force = -(K / dist ** 2) * weight
        totalX += (dx / dist) * force
        totalY += (dy / dist) * force
    return [totalX, totalY]
rForceCentroid = F.udf(rForceCentroid, ArrayType(DoubleType()))
# Displacement exerted on a vertex by the repulsive force of the network center.
# @param vertexCord: [x, y] coordinates of the vertex
# @return [dispX, dispY] repulsive displacement away from the broadcast center
def rForceCenter(vertexCord):
    # nNodes is a module-level global; centerBroadcast is a Spark broadcast of
    # the [x, y] center, published by the driver each iteration.
    K = math.sqrt(1 / nNodes)
    weight = 1
    cx, cy = centerBroadcast.value[0], centerBroadcast.value[1]
    dx = vertexCord[0] - cx
    dy = vertexCord[1] - cy
    dist = math.sqrt(dx ** 2 + dy ** 2)
    if dist == 0:
        # Vertex sitting exactly on the center: no defined direction, no force.
        return [0, 0]
    # Clamp tiny separations so the inverse-square term stays bounded.
    dist = max(dist, 0.01)
    force = -(K / dist ** 2) * weight
    return [(dx / dist) * force, (dy / dist) * force]
rForceCenter = F.udf(rForceCenter, ArrayType(DoubleType()))
# Function to scale the vertex degree centrality.
# @param degree: degree of the vertex
# @param maxDegree: maximum degree in the graph
# @return: the scaled degree between `mi` and `ma` (0 to 5 by default)
def scale_degree(degree, maxDegree, minDegree=1, mi=0, ma=5, log=False, power=1):
    r"""Convert property map values to be more useful as a vertex size, or edge
    width. The new values are taken to be
    .. math::
        y_i = mi + (ma - mi) \left(\frac{x_i - \min(x)} {\max(x) - \min(x)}\right)^\text{power}
    If ``log=True``, :math:`x_i` is replaced with :math:`\ln(x_i)`.
    If :math:`\max(x) - \min(x)` is zero, :math:`y_i = mi`.
    """
    if log:
        # BUG FIX: ``log`` was documented but previously ignored. Scale in log
        # space to compress heavy-tailed degree distributions; requires all
        # three degree values to be positive.
        degree = math.log(degree)
        maxDegree = math.log(maxDegree)
        minDegree = math.log(minDegree)
    delta = maxDegree - minDegree
    if delta == 0:
        # Degenerate range: max == min, so the numerator is 0 for any in-range
        # degree and the result collapses to ``mi`` (avoids ZeroDivisionError).
        delta = 1
    return mi + (ma - mi) * ((degree - minDegree) / delta) ** power
# Register the scaler as a Spark SQL UDF; rebinding the name shadows the plain
# Python function defined above so DataFrame code can call it directly.
scale_degree = F.udf(scale_degree,DoubleType())
if __name__ == "__main__":
    startTime = timeit.default_timer()
    # Command-line arguments: input edge list, output directory, #iterations.
    inputPath = sys.argv[1]
    outputPath = sys.argv[2]
    numIteration = int(sys.argv[3])
    name = os.path.basename(inputPath).split(".")[0]
    # create spark context
    spark = pyspark.sql.SparkSession.builder\
        .appName(name) \
        .getOrCreate()
    sc = pyspark.SparkContext.getOrCreate()
    sqlContext = pyspark.SQLContext.getOrCreate(sc)
    sc.setLogLevel("ERROR")
    # Checkpoint directory is required by GraphFrames' aggregateMessages and
    # the DataFrame .checkpoint() calls below; removed again at the end.
    checkpintDir = outputPath + "checkpoint"
    try:
        if os.path.exists(checkpintDir):
            print("Checkpoint directory already exist")
        else:
            os.mkdir(checkpintDir)
            print("Successfully created the directory %s " % checkpintDir)
    except OSError:
        print("Creation of the directory %s failed" % checkpintDir)
    sc.setCheckpointDir(checkpintDir)
    InitialNumPartitions = sc.defaultParallelism
    # Load the input edge file (tab-separated, '#' comment lines skipped).
    edges = spark.read.csv(inputPath, sep="\t", comment='#', header=None)
    edges = edges.withColumnRenamed("_c0", "src")\
        .withColumnRenamed("_c1", "dst")
    edgesCheckpoint = edges.checkpoint()
    edgesCheckpoint.count()
    # Extract the distinct vertex ids from both endpoint columns.
    vA = edgesCheckpoint.select(F.col('src')).drop_duplicates()\
        .withColumnRenamed('src', 'id')
    vB = edgesCheckpoint.select(F.col('dst')).drop_duplicates()\
        .withColumnRenamed('dst', 'id')
    vF1 = vA.union(vB).distinct()
    nodesCheckpoint = vF1.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
    nodesCheckpoint.count()
    print("Num of nodes: {}".format(nodesCheckpoint.count()))
    print("Num of edges: {}".format(edgesCheckpoint.count()))
    independentSet = []
    graphs = dict()
    metaGraph = dict()
    metaGraphlayout = dict()
    metaEdgeCord = dict()
    completeGraphLayout = dict()
    # initialize index and graphs dictionary
    i = 0
    # create GraphFrame object using nodes and edges dataframes
    graphs[i] = GraphFrame(nodesCheckpoint, edgesCheckpoint)
    currentNodes = graphs[i].vertices
    # number of nodes in the first filtration level
    currentNodesCounts = currentNodes.count()
    currentEdges = graphs[i].edges
    currentEdgesCounts = currentEdges.count()
    # variable to stop the while loop if the selected nodes of the next level
    # of filtration is less than 3
    currentSelectedNodes = currentNodes.count()
    numOfVertices = graphs[i].vertices.count()
    i = i + 1
    k = 1
    # Initial "temperature": caps how far a vertex may move per iteration.
    t = 0.1
    vertices = nodesCheckpoint
    edges = edgesCheckpoint
    # Module-level global read by the force UDFs defined above.
    nNodes = vertices.count()
    numberOfCentroids = round(nNodes / 2)
    # Initialize every vertex with random x,y coordinates in [0, 3).
    verticeWithCord = vertices.withColumn("xy", F.array(F.rand(seed=1) * F.lit(3), F.rand(seed=0) * F.lit(3)))\
        .checkpoint()
    # cool-down amount subtracted from the temperature each iteration
    dt = t / (numIteration + 1)
    for p in range(numIteration):
        # Sample the centroids used to approximate vertex-vertex repulsion.
        centroids = verticeWithCord.sample(withReplacement=False, fraction=(numberOfCentroids / nNodes), seed=1)
        # Module-level global read by the rForceCentroid UDF.
        centroid_list = centroids.select("xy").rdd.flatMap(lambda x: x).collect()
        # calculate centroids repulsive force
        vCentroid = verticeWithCord.withColumn("dispCentroidXY", rForceCentroid("xy")).cache()
        vCentroid.count()
        # Find the center of the network as the mean of the centroid positions.
        if nNodes > 0:
            x = centroids.agg(F.avg(F.col("xy")[0]).alias("centerX")).collect()
            # BUG FIX: the y coordinate previously averaged xy[0] (the x axis)
            # a second time; it must average xy[1].
            y = centroids.agg(F.avg(F.col("xy")[1]).alias("centerY")).collect()
            center = [x[0][0], y[0][0]]
        else:
            center = [0, 0]
        centerBroadcast = sc.broadcast(center)
        # calculate center repulsive force
        vCenter = verticeWithCord.withColumn("dispCenterXY", rForceCenter("xy")).select("id", "xy", "dispCenterXY").cache()
        vCenter.count()
        centerBroadcast.unpersist()
        # Total repulsive displacement = centroid term + center term.
        newVertices = vCentroid.join(vCenter, on="id")\
            .drop(vCentroid.xy)\
            .withColumn("dispX", (F.col("dispCentroidXY")[0] + F.col("dispCenterXY")[0]))\
            .withColumn("dispY", (F.col("dispCentroidXY")[1] + F.col("dispCenterXY")[1]))\
            .cache()
        vCentroid.unpersist()
        vCenter.unpersist()
        gfA = GraphFrame(verticeWithCord, edges)
        # Messages sent to source and destination vertices to accumulate the
        # attractive-force displacement along every edge.
        msgToSrc = aDispSrc(AM.src['xy'], AM.dst['xy'])
        msgToDst = aDispDst(AM.src['xy'], AM.dst['xy'])
        # Sum all incoming messages per vertex -> [sum dx, sum dy] as aDispXY.
        aAgg = gfA.aggregateMessages(
            F.array((F.sum(AM.msg.getItem(0))).alias("x"), (F.sum(AM.msg.getItem(1))).alias("y")).alias("aDispXY"),
            sendToSrc=msgToSrc,
            sendToDst=msgToDst)
        cachedAAgg = AM.getCachedDataFrame(aAgg)
        # Total displacement = repulsive + attractive. The attractive side may
        # be missing for isolated vertices, hence left join + when/otherwise.
        # (Column reference normalized to the actual alias 'aDispXY'; the old
        # 'adispXY' spelling only worked due to Spark's case-insensitivity.)
        newVertices2 = newVertices.join((cachedAAgg), on=(newVertices['id'] == cachedAAgg['id']), how='left_outer')\
            .drop(cachedAAgg['id'])\
            .withColumn('newDispColX', F.when(cachedAAgg['aDispXY'][0].isNotNull(), (cachedAAgg['aDispXY'][0] + newVertices['dispX'])).otherwise(newVertices['dispX']))\
            .withColumn('newDispColY', F.when(cachedAAgg['aDispXY'][1].isNotNull(), (cachedAAgg['aDispXY'][1] + newVertices['dispY'])).otherwise(newVertices['dispY']))\
            .cache()
        newVertices.unpersist()
        # Move each vertex along its displacement, capped by temperature t.
        updatedVertices = newVertices2.withColumn("length", F.sqrt(F.col('newDispColX')**2 + F.col('newDispColY')**2))\
            .withColumn('newDispX', (F.col('newDispColX')/F.col('length'))*F.least(F.abs(F.col('length')), F.lit(t)))\
            .withColumn('newDispY', (F.col('newDispColY')/F.col('length'))*F.least(F.abs(F.col('length')), F.lit(t)))\
            .withColumn('newXY', F.array((F.col('xy')[0]+F.col('newDispX')), (F.col('xy')[1]+F.col('newDispY'))))\
            .drop("xy", "dispCentroidXY", "dispCenterXY", "dispX", "dispY", "aDispXY", "newDispColX", "newDispColY", "length", "newDispX", "newDispY")\
            .withColumnRenamed("newXY", "xy").checkpoint(True)
        newVertices2.unpersist()
        verticeWithCord = updatedVertices
        cachedAAgg.unpersist()
        # BUG FIX: report 1-based progress; the old call passed 0-based p plus
        # an unused second argument k to a single-placeholder format string.
        print("{} Iterations are completed".format(p + 1))
        t -= dt
    updatedV = verticeWithCord.select("id", "xy")
    graph = GraphFrame(updatedV, edges)
    VerticesInDegrees = graph.inDegrees
    VerticesOutDegrees = graph.outDegrees
    # Vertices missing from the degree frames get degree 1 via na.fill.
    veticesFinal = updatedV.join(VerticesInDegrees, on="id", how="left").na.fill(value=1).join(VerticesOutDegrees, on="id", how="left").na.fill(value=1)
    # Column index 2 is inDegree / outDegree respectively (after id, xy).
    maxInDegree = veticesFinal.orderBy(F.col("inDegree").desc()).take(1)[0][2]
    maxOutDegree = veticesFinal.orderBy(F.col("outDegree").desc()).take(1)[0][2]
    vertices_scaled_degree = veticesFinal.withColumn("scaled_inDegree", scale_degree("inDegree", F.lit(maxInDegree))).withColumn("scaled_outDegree", scale_degree("outDegree", F.lit(maxOutDegree)))
    time5 = timeit.default_timer() - startTime
    print("time taken for layout of combined levels = {}".format(time5))
    # Collect ids, positions and scaled degrees to the driver for plotting.
    vList = vertices_scaled_degree.select("id").rdd.flatMap(lambda x: x).collect()
    print("list of nodes are collected")
    vListPos = vertices_scaled_degree.select("xy").rdd.flatMap(lambda x: x).collect()
    print("list of nodes positions are collected")
    vlistDegree = vertices_scaled_degree.select("scaled_inDegree").rdd.flatMap(lambda x: x).collect()
    print("list of nodes degree are collected")
    degree = dict(zip(vList, vlistDegree))
    pos = dict(zip(vList, vListPos))
    print("dict of nodes and their positions are created")
    nxGraphLayout = nx.from_pandas_edgelist(edges.toPandas(), source="src", target="dst")
    print("networkx graph object is created")
    # Plot the networkx graph with the computed layout and save it to disk.
    a = nx.draw(nxGraphLayout, pos, node_size=vlistDegree, width=0.1)
    print("networkx graph using distribute layout is created")
    plt.title("{name}_{numIteration}_Iterations layout".format(name=name, numIteration=numIteration))
    plt.savefig("{outputPath}{name}_{numIteration}_Iterations.png".format(outputPath=outputPath, name=name, numIteration=numIteration), dpi=1000, bbox_inches='tight')
    print("graph is saved to the disk")
    print("Num of nodes: {}".format(updatedV.count()))
    print("Num of edges: {}".format(edges.count()))
    # Save the calculated positions and edges of the graph to the output path.
    updatedV.coalesce(10).write.mode("overwrite").parquet("{outputPath}{name}_{numIteration}_Iterations_updatedVertices.parquet".format(outputPath=outputPath, name=name, numIteration=numIteration))
    edges.coalesce(10).write.mode("overwrite").parquet("{outputPath}{name}_{numIteration}_Iterations_edges.parquet".format(outputPath=outputPath, name=name, numIteration=numIteration))
    print("Nodes and Edges dataframes saved to disk")
    # remove the checkpoint dir
    try:
        if os.path.exists(checkpintDir):
            shutil.rmtree(checkpintDir)
            print("Successfully deleted the directory %s " % checkpintDir)
        else:
            print("Directory does not exist: %s " % checkpintDir)
    except OSError:
        print("Directory does not exist: %s " % checkpintDir)