From 176d30dba6c623af6539c3cd70f1a4853bc3ef7e Mon Sep 17 00:00:00 2001 From: Richard Whitcomb Date: Sun, 24 Jul 2016 14:38:16 -0400 Subject: [PATCH 1/4] Experimental Graph API for Scalding --- .../com/twitter/scalding/graph/Edge.scala | 18 ++ .../twitter/scalding/graph/EdgeTriplet.scala | 18 ++ .../com/twitter/scalding/graph/Graph.scala | 222 ++++++++++++++++++ .../twitter/scalding/graph/Neighbors.scala | 25 ++ .../com/twitter/scalding/graph/Vertex.scala | 20 ++ .../twitter/scalding/graph/GraphTest.scala | 75 ++++++ 6 files changed, 378 insertions(+) create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/graph/Edge.scala create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/graph/EdgeTriplet.scala create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/graph/Neighbors.scala create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/graph/Vertex.scala create mode 100644 scalding-core/src/test/scala/com/twitter/scalding/graph/GraphTest.scala diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/Edge.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/Edge.scala new file mode 100644 index 0000000000..a8ce45ffe2 --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/graph/Edge.scala @@ -0,0 +1,18 @@ +/* +Copyright 2012 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding.graph + +case class Edge[T: Ordering, S](source: T, dest: T, attr: S) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/EdgeTriplet.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/EdgeTriplet.scala new file mode 100644 index 0000000000..1dafb8f5fb --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/graph/EdgeTriplet.scala @@ -0,0 +1,18 @@ +/* +Copyright 2012 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding.graph + +case class EdgeTriplet[T: Ordering, S, Q](source: Vertex[T, Q], dest: Vertex[T, Q], edge: Edge[T, S]) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala new file mode 100644 index 0000000000..77e30331ca --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala @@ -0,0 +1,222 @@ +/* +Copyright 2012 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding.graph + +import com.twitter.scalding.TypedPipe + +import scala.reflect.ClassTag + +class Graph[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: TypedPipe[Vertex[T, Q]]) { + def edges: TypedPipe[Edge[T, S]] = inputEdges + def vertices: TypedPipe[Vertex[T, Q]] = inputVertices + + /** + * Returns a TypedPipe of edges with joined Vertex attributes. + */ + def triplets: TypedPipe[EdgeTriplet[T, S, Q]] = + edges + .groupBy(_.source) + .join(vertices.groupBy(_.id)) + .toTypedPipe + .map{ case (source, (edge, packet)) => (edge.dest, (edge, packet)) } + .join(vertices.groupBy(_.id)) + .values + .map{ case ((edge, sourceVertex), destVertex) => EdgeTriplet(sourceVertex, destVertex, edge) } + + /** + * Left join vertices with the graph vertices and generate a new Graph + */ + def leftJoinVertices[U, VD2](other: TypedPipe[Vertex[T, U]])(mapFunc: (T, Q, Option[U]) => VD2): Graph[T, S, VD2] = { + val newVertices = vertices + .groupBy(_.id) + .leftJoin(other.groupBy(_.id)) + .toTypedPipe + .map{ case (id, (vertex, data)) => Vertex(id, mapFunc(id, vertex.attr, data.map(_.attr))) } + + new GraphUnfilteredEdges(edges, newVertices) + } + + /** + * Inner join vertices with the graph vertices and generate a new Graph + */ + def joinVertices[U, VD2](other: TypedPipe[Vertex[T, U]])(mapFunc: (T, Q, U) => VD2): Graph[T, S, VD2] = { + val newVertices = vertices + .groupBy(_.id) + .join(other.groupBy(_.id)) + .toTypedPipe + .map{ case (id, (vertex, data)) => Vertex(id, mapFunc(id, vertex.attr, data.attr)) } + + new Graph[T, S, VD2](edges, newVertices) + } + + def mapVertices[A](map: Vertex[T, Q] => A): Graph[T, S, A] = + new Graph[T, S, A](edges, vertices.map{ vertex => Vertex(vertex.id, map(vertex)) }) + + def mapEdges[A](map: Edge[T, S] => A): Graph[T, A, Q] = + new Graph[T, A, Q](edges.map{ edge => edge.copy(attr = map(edge)) }, vertices) + + def mapTriplets[A, B](map: EdgeTriplet[T, S, Q] => EdgeTriplet[T, A, B]): Graph[T, A, B] = { + val newTriplets = triplets.map(map) + + new Graph[T, A, B]( + newTriplets.map(_.edge), + newTriplets.flatMap(trip => List(trip.source, trip.dest)).distinct(Ordering.by(_.id))) + } + + /** + * Collect only neighbor ids. + * Optionally sort by Vertex + */ + def collectNeighborIds(sortNeighbors: Boolean = true): TypedPipe[Vertex[T, Neighbors[T]]] = + edges + .map{ edge => (edge.source, edge.dest) } + .group + .mapGroup{ + case (vert, nbrs) => + val nbrsArray: Vertex[T, Neighbors[T]] = if (sortNeighbors) { + Vertex(vert, SortedNeighbors(nbrs.toArray.sorted)) + } else { + Vertex(vert, UnsortedNeighbors(nbrs.toArray)) + } + + Iterator.single(nbrsArray) + } + .values + + /** + * Collect all Neighbors of an Vertex. + * Optionally sort by Vertex + */ + def collectNeighbors(sortNeighbors: Boolean = true): TypedPipe[Vertex[T, Neighbors[Vertex[T, Q]]]] = + edges + .map{ edge => (edge.dest, edge.source) } + .join(vertices.groupBy(_.id)) + .toTypedPipe + .map{ case (dest, (source, vertex)) => (source, (dest, vertex)) } + .group + .mapGroup{ + case (id, vertexes) => + val nbrsArray: Vertex[T, Neighbors[Vertex[T, Q]]] = if (sortNeighbors) { + Vertex(id, SortedNeighbors(vertexes.toArray.sortBy(_._1).map(_._2))) + } else { + Vertex(id, UnsortedNeighbors(vertexes.toArray.map(_._2))) + } + + Iterator.single(nbrsArray) + } + .values + + /** + * Returns each Vertex with all out going edges. + * Optionally sort the edges + */ + def collectEdges(sortNeighbors: Boolean = true): TypedPipe[Vertex[T, Neighbors[Edge[T, S]]]] = + edges + .map{ edge => (edge.source, edge) } + .group + .mapGroup{ + case (id, edgeList) => + val nbrsArray: Vertex[T, Neighbors[Edge[T, S]]] = if (sortNeighbors) { + Vertex(id, SortedNeighbors(edgeList.toArray.sortBy(_.dest))) + } else { + Vertex(id, UnsortedNeighbors(edgeList.toArray.sortBy(_.dest))) + } + + Iterator.single(nbrsArray) + } + .values + + /** + * Filter the graph by the Edge and Vertex filters. + */ + def subgraph(epred: EdgeTriplet[T, S, Q] => Boolean = _ => true, vpred: Vertex[T, Q] => Boolean = _ => true): Graph[T, S, Q] = { + val newTriplets = triplets.filter(epred) + + new Graph[T, S, Q]( + newTriplets.map(_.edge), + newTriplets.flatMap(trip => List(trip.source, trip.dest)).distinct(Ordering.by(_.id)).filter(vpred)) + } + + /** + * The current graph is filtered to only include the edges and vertices from the other graph. + * The attribute of the other graph does not matter, the current attributes are kept. + */ + def mask[A, B](other: Graph[T, A, B]): Graph[T, S, Q] = { + val fEdges = edges + .map{ e => ((e.dest, e.source), e) } + .group + .join(other.edges.map{ e => ((e.dest, e.source), ()) }.group) + .toTypedPipe + .map{ case (_, (e, _)) => e } + + val fVertices = vertices + .groupBy(_.id) + .join(other.vertices.groupBy(_.id)) + .toTypedPipe + .map{ case (_, (v, _)) => v } + + new Graph[T, S, Q](fEdges, fVertices) + } +} + +/** + * Sometimes working just on deges is required, in those cases we don't also want to + * take the computational hit of filtering vertices by the updated edges. In those cases + * you get return this subgraph that will only filter the vertices when necessary + */ +class GraphUnfilteredVertices[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: TypedPipe[Vertex[T, Q]]) + extends Graph(inputEdges, inputVertices) { + override def vertices = { + val graphVertices = edges.flatMap(e => List(e.source, e.dest)).distinct + inputVertices + .groupBy(_.id) + .join(graphVertices.asKeys) + .values + .map(_._1) + } +} + +/** + * Sometimes working just on vertices is required, in those cases we don't also want to + * take the computational hit of filtering edges by the updated vertices. In those cases + * you get return this subgraph that will only filter the edges when necessary + */ +class GraphUnfilteredEdges[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: TypedPipe[Vertex[T, Q]]) + extends Graph(inputEdges, inputVertices) { + + override def edges = + inputEdges + .groupBy(_.source) + .join(vertices.map(_.id).asKeys) + .values + .map(_._1) + .groupBy(_.dest) + .join(vertices.map(_.id).asKeys) + .values + .map(_._1) + +} + +object Graph { + def fromEdges[T: Ordering: ClassTag, S](edges: TypedPipe[Edge[T, S]]): Graph[T, S, Unit] = { + val vertices = edges + .flatMap(e => List(e.source, e.dest)) + .distinct + .map(vertex => Vertex(vertex, ())) + + new Graph(edges, vertices) + } +} diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/Neighbors.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/Neighbors.scala new file mode 100644 index 0000000000..70385a45a0 --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/graph/Neighbors.scala @@ -0,0 +1,25 @@ +/* +Copyright 2012 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding.graph + +sealed trait Neighbors[T] { + def neighbors: Array[T] +} + +case class SortedNeighbors[T](val neighbors: Array[T]) extends Neighbors[T] + +case class UnsortedNeighbors[T](val neighbors: Array[T]) extends Neighbors[T] + diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/Vertex.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/Vertex.scala new file mode 100644 index 0000000000..98c8019c15 --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/graph/Vertex.scala @@ -0,0 +1,20 @@ +/* +Copyright 2012 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding.graph + +case class Vertex[T, S](id: T, attr: S)(implicit ord: Ordering[T]) extends Ordered[Vertex[T, _]] { + def compare(that: Vertex[T, _]) = ord.compare(id, that.id) +} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/graph/GraphTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/graph/GraphTest.scala new file mode 100644 index 0000000000..3b857c394a --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/graph/GraphTest.scala @@ -0,0 +1,75 @@ +/* +Copyright 2012 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding.graph + +import com.twitter.scalding.TypedPipeChecker +import com.twitter.scalding.typed.TypedPipe +import org.scalatest.{ Matchers, WordSpec } + +class GraphTest extends WordSpec with Matchers { + + val graphPipe = List( + (4L, 1L), + (2L, 4L), + (2L, 1L), + (5L, 2L), + (5L, 3L), + (5L, 6L), + (3L, 6L), + (3L, 2L), + (1L, 2L)) + + val vertices = graphPipe.flatMap{ case (s, d) => List(s, d) }.distinct.map(s => Vertex(s, ())) + val edges = graphPipe.map{ case (s, d) => Edge(s, d, ()) } + + "A Graph" should { + val graph = new Graph(TypedPipe.from(edges), TypedPipe.from(vertices)) + "map vertices" in { + val updatedGraph = graph.mapVertices{ case (Vertex(id, _)) => id } + val mapped = TypedPipeChecker.inMemoryToList(updatedGraph.vertices) + mapped.foreach{ case (Vertex(id, attr)) => assert(id == attr) } + } + + "map edges" in { + val updatedGraph = graph.mapEdges{ case (Edge(source, dest, _)) => source } + val mapped = TypedPipeChecker.inMemoryToList(updatedGraph.edges) + mapped.foreach{ case (Edge(source, dest, attr)) => assert(source == attr) } + } + + "join vertices" in { + val newVertices = List(Vertex(1L, ()), Vertex(3L, ())) + val updatedGraph = graph.joinVertices(TypedPipe.from(newVertices)){ case (id, _, _) => id } + val mapped = TypedPipeChecker.inMemoryToList(updatedGraph.vertices) + mapped.foreach{ case (Vertex(id, attr)) => assert(id == attr) } + } + + "collect neighbors sorted" in { + val neighbors = TypedPipeChecker.inMemoryToList(graph.collectNeighbors(true)) + + val vertex = neighbors.find(_.id == 2L) + assert(vertex.isDefined, "Found the vertex") + assert(vertex.get.attr.neighbors.map(_.id) === Array(1L, 4L)) + } + + "collect neighbors ids sorted" in { + val neighbors = TypedPipeChecker.inMemoryToList(graph.collectNeighborIds(true)) + + val vertex = neighbors.find(_.id == 2L) + assert(vertex.isDefined, "Found the vertex") + assert(vertex.get.attr.neighbors === Array(1L, 4L)) + } + } +} From 6670dfa49725c9c347202403f424a6fc714394b7 Mon Sep 17 00:00:00 2001 From: Richard Whitcomb Date: Sun, 24 Jul 2016 21:32:03 -0400 Subject: [PATCH 2/4] Add missing types to Graph subclasses which is breaking 2.10 build --- .../src/main/scala/com/twitter/scalding/graph/Graph.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala index 77e30331ca..c3f3c3a997 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala @@ -178,7 +178,7 @@ class Graph[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inpu * you get return this subgraph that will only filter the vertices when necessary */ class GraphUnfilteredVertices[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: TypedPipe[Vertex[T, Q]]) - extends Graph(inputEdges, inputVertices) { + extends Graph[T, S, Q](inputEdges, inputVertices) { override def vertices = { val graphVertices = edges.flatMap(e => List(e.source, e.dest)).distinct inputVertices @@ -195,7 +195,7 @@ class GraphUnfilteredVertices[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe * you get return this subgraph that will only filter the edges when necessary */ class GraphUnfilteredEdges[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: TypedPipe[Vertex[T, Q]]) - extends Graph(inputEdges, inputVertices) { + extends Graph[T, S, Q](inputEdges, inputVertices) { override def edges = inputEdges From 54a446c388747e10576b5f478c189a41d1fb4b95 Mon Sep 17 00:00:00 2001 From: Richard Whitcomb Date: Mon, 25 Jul 2016 16:18:19 -0400 Subject: [PATCH 3/4] Add exact cosine example --- .../com/twitter/scalding/graph/Graph.scala | 35 ++++++---- .../twitter/scalding/graph/Neighbors.scala | 69 ++++++++++++++++++- .../scalding/graph/VertexSimilarity.scala | 34 +++++++++ .../twitter/scalding/graph/GraphTest.scala | 4 +- .../scalding/graph/VertexSimilarityTest.scala | 44 ++++++++++++ 5 files changed, 170 insertions(+), 16 deletions(-) create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/graph/VertexSimilarity.scala create mode 100644 scalding-core/src/test/scala/com/twitter/scalding/graph/VertexSimilarityTest.scala diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala index c3f3c3a997..6736aed301 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala @@ -19,7 +19,7 @@ import com.twitter.scalding.TypedPipe import scala.reflect.ClassTag -class Graph[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: TypedPipe[Vertex[T, Q]]) { +class Graph[T: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: TypedPipe[Vertex[T, Q]])(implicit ord: Ordering[T]) { def edges: TypedPipe[Edge[T, S]] = inputEdges def vertices: TypedPipe[Vertex[T, Q]] = inputVertices @@ -80,14 +80,14 @@ class Graph[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inpu * Collect only neighbor ids. * Optionally sort by Vertex */ - def collectNeighborIds(sortNeighbors: Boolean = true): TypedPipe[Vertex[T, Neighbors[T]]] = - edges - .map{ edge => (edge.source, edge.dest) } + def collectNeighborIds(sortNeighbors: Boolean = true): Graph[T, S, Neighbors[T]] = { + val nbrs = edges + .map { edge => (edge.source, edge.dest) } .group - .mapGroup{ + .mapGroup { case (vert, nbrs) => val nbrsArray: Vertex[T, Neighbors[T]] = if (sortNeighbors) { - Vertex(vert, SortedNeighbors(nbrs.toArray.sorted)) + Vertex(vert, SortedNeighbors(nbrs.toArray.sorted)(ord)) } else { Vertex(vert, UnsortedNeighbors(nbrs.toArray)) } @@ -96,21 +96,24 @@ class Graph[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inpu } .values + Graph(edges, nbrs) + } + /** * Collect all Neighbors of an Vertex. * Optionally sort by Vertex */ - def collectNeighbors(sortNeighbors: Boolean = true): TypedPipe[Vertex[T, Neighbors[Vertex[T, Q]]]] = - edges - .map{ edge => (edge.dest, edge.source) } + def collectNeighbors(sortNeighbors: Boolean = true): Graph[T, S, Neighbors[Vertex[T, Q]]] = { + val nbrs = edges + .map { edge => (edge.dest, edge.source) } .join(vertices.groupBy(_.id)) .toTypedPipe - .map{ case (dest, (source, vertex)) => (source, (dest, vertex)) } + .map { case (dest, (source, vertex)) => (source, (dest, vertex)) } .group - .mapGroup{ + .mapGroup { case (id, vertexes) => val nbrsArray: Vertex[T, Neighbors[Vertex[T, Q]]] = if (sortNeighbors) { - Vertex(id, SortedNeighbors(vertexes.toArray.sortBy(_._1).map(_._2))) + Vertex(id, SortedNeighbors(vertexes.toArray.sortBy(_._1).map(_._2))(Ordering.by(_.id))) } else { Vertex(id, UnsortedNeighbors(vertexes.toArray.map(_._2))) } @@ -119,6 +122,9 @@ class Graph[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inpu } .values + Graph(edges, nbrs) + } + /** * Returns each Vertex with all out going edges. * Optionally sort the edges @@ -130,7 +136,7 @@ class Graph[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inpu .mapGroup{ case (id, edgeList) => val nbrsArray: Vertex[T, Neighbors[Edge[T, S]]] = if (sortNeighbors) { - Vertex(id, SortedNeighbors(edgeList.toArray.sortBy(_.dest))) + Vertex(id, SortedNeighbors(edgeList.toArray.sortBy(_.dest))(Ordering.by(_.dest))) } else { Vertex(id, UnsortedNeighbors(edgeList.toArray.sortBy(_.dest))) } @@ -219,4 +225,7 @@ object Graph { new Graph(edges, vertices) } + + def apply[T: Ordering: ClassTag, S, Q](edges: TypedPipe[Edge[T, S]], vertices: TypedPipe[Vertex[T, Q]]): Graph[T, S, Q] = + new Graph(edges, vertices) } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/Neighbors.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/Neighbors.scala index 70385a45a0..1beb78810f 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/graph/Neighbors.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/graph/Neighbors.scala @@ -19,7 +19,74 @@ sealed trait Neighbors[T] { def neighbors: Array[T] } -case class SortedNeighbors[T](val neighbors: Array[T]) extends Neighbors[T] +case class SortedNeighbors[T](val neighbors: Array[T])(implicit ord: Ordering[T]) extends Neighbors[T] { + def intersectCount(other: SortedNeighbors[T])(implicit ord: Ordering[T]): Int = { + val (size1, size2) = (neighbors.length, other.neighbors.length) + + val (max, min) = if (size1 > size2) (size1, size2) else (size2, size1) + + if ((size1 + size2) < (min * Math.log(max))) { + mergeJoin(neighbors, other.neighbors) + } else { + if (size1 > size2) { + binarySearch(neighbors, other.neighbors) + } else { + binarySearch(other.neighbors, neighbors) + } + } + } + + /** + * Compute Intersection by using merge join O(M + N) complexity + */ + private def mergeJoin(array1: Array[T], array2: Array[T])(implicit ord: Ordering[T]): Int = { + var (ix1, ix2, intersect) = (0, 0, 0) + + while (ix1 < array1.length && ix2 < array2.length) { + val comp = ord.compare(array1(ix1), array2(ix2)) + if (comp < 0) { + ix1 += 1 + } else if (comp > 0) { + ix2 += 1 + } else { + intersect += 1 + + ix1 += 1 + ix2 += 1 + } + } + + intersect + } + + /** + * Compute Intersection by using binary search O(Mlg(N)) where M is small and N is large + */ + private def binarySearch(bigArray: Array[T], smallArray: Array[T]): Int = { + var (ix2, intersect) = (0, 0) + + while (ix2 < smallArray.length) { + var left = 0 + var right = bigArray.length - 1 + while (left <= right) { + val mid = left + (right - left) / 2 + val comp = ord.compare(bigArray(mid), smallArray(ix2)) + if (comp == 0) { + intersect += 1 + left = right + 1 + } else if (comp > 0) { + right = mid - 1 + } else { + left = mid + 1 + } + } + + ix2 += 1 + } + + intersect + } +} case class UnsortedNeighbors[T](val neighbors: Array[T]) extends Neighbors[T] diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/VertexSimilarity.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/VertexSimilarity.scala new file mode 100644 index 0000000000..8c1a35f7c2 --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/graph/VertexSimilarity.scala @@ -0,0 +1,34 @@ +/* +Copyright 2012 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding.graph + +import com.twitter.scalding.mathematics.SetSimilarity + +import scala.reflect.ClassTag + +object VertexSimilarity { + /** + * Compute the exact cosine between all vertices in the graph. + */ + def exactCosine[T: Ordering: ClassTag, S, Q](graph: Graph[T, S, Q]): Graph[T, Option[Double], Unit] = + graph + .collectNeighborIds(sortNeighbors = true) + .mapTriplets{ + case (EdgeTriplet(Vertex(id1, nbrs1: SortedNeighbors[T]), Vertex(id2, nbrs2: SortedNeighbors[T]), edge)) => + val cosine = SetSimilarity(nbrs1.intersectCount(nbrs2), nbrs1.neighbors.length, nbrs2.neighbors.length).cosine + EdgeTriplet(Vertex(id1, ()), Vertex(id2, ()), Edge(id1, id2, cosine)) + } +} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/graph/GraphTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/graph/GraphTest.scala index 3b857c394a..9bb7c83fdc 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/graph/GraphTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/graph/GraphTest.scala @@ -57,7 +57,7 @@ class GraphTest extends WordSpec with Matchers { } "collect neighbors sorted" in { - val neighbors = TypedPipeChecker.inMemoryToList(graph.collectNeighbors(true)) + val neighbors = TypedPipeChecker.inMemoryToList(graph.collectNeighbors(true).vertices) val vertex = neighbors.find(_.id == 2L) assert(vertex.isDefined, "Found the vertex") @@ -65,7 +65,7 @@ class GraphTest extends WordSpec with Matchers { } "collect neighbors ids sorted" in { - val neighbors = TypedPipeChecker.inMemoryToList(graph.collectNeighborIds(true)) + val neighbors = TypedPipeChecker.inMemoryToList(graph.collectNeighborIds(true).vertices) val vertex = neighbors.find(_.id == 2L) assert(vertex.isDefined, "Found the vertex") diff --git a/scalding-core/src/test/scala/com/twitter/scalding/graph/VertexSimilarityTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/graph/VertexSimilarityTest.scala new file mode 100644 index 0000000000..cd473eb4f8 --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/graph/VertexSimilarityTest.scala @@ -0,0 +1,44 @@ +/* +Copyright 2012 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding.graph + +import com.twitter.scalding.TypedPipeChecker +import com.twitter.scalding.typed.TypedPipe +import org.scalatest.{ Matchers, WordSpec } + +class VertexSimilarityTest extends WordSpec with Matchers { + val graphPipe = List( + (4L, 1L), + (2L, 4L), + (2L, 1L), + (5L, 2L), + (5L, 3L), + (5L, 6L), + (3L, 6L), + (3L, 2L)) + + val graph = Graph.fromEdges(TypedPipe.from(graphPipe).map{ case (s, d) => Edge(s, d, ()) }) + + "Vertex Similarity" should { + "compute exact cosine" in { + val cosine = TypedPipeChecker.inMemoryToList(VertexSimilarity.exactCosine(graph).edges) + + val edge = cosine.find(e => e.source == 2L && e.dest == 4L) + + assert(edge.exists(_.attr.exists(score => math.abs(score - 0.707) < 0.001))) + } + } +} \ No newline at end of file From d8e7a43ca2ac09637d38b664a6f4be7aea6a3d9c Mon Sep 17 00:00:00 2001 From: Richard Whitcomb Date: Fri, 26 Aug 2016 20:34:00 -0400 Subject: [PATCH 4/4] Simplify review, more unit tests, address feedback --- .../com/twitter/scalding/graph/Edge.scala | 2 +- .../twitter/scalding/graph/EdgeTriplet.scala | 2 +- .../com/twitter/scalding/graph/Graph.scala | 99 ++++++++++--------- .../twitter/scalding/graph/Neighbors.scala | 78 ++------------- .../com/twitter/scalding/graph/Vertex.scala | 4 +- .../scalding/graph/VertexSimilarity.scala | 34 ------- .../twitter/scalding/graph/GraphTest.scala | 51 +++++++++- .../scalding/graph/VertexSimilarityTest.scala | 44 --------- 8 files changed, 114 insertions(+), 200 deletions(-) delete mode 100644 scalding-core/src/main/scala/com/twitter/scalding/graph/VertexSimilarity.scala delete mode 100644 scalding-core/src/test/scala/com/twitter/scalding/graph/VertexSimilarityTest.scala diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/Edge.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/Edge.scala index a8ce45ffe2..8cd58f9b81 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/graph/Edge.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/graph/Edge.scala @@ -15,4 +15,4 @@ limitations under the License. */ package com.twitter.scalding.graph -case class Edge[T: Ordering, S](source: T, dest: T, attr: S) +case class Edge[T, S](source: T, dest: T, attr: S) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/EdgeTriplet.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/EdgeTriplet.scala index 1dafb8f5fb..a2b1b9eff4 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/graph/EdgeTriplet.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/graph/EdgeTriplet.scala @@ -15,4 +15,4 @@ limitations under the License. */ package com.twitter.scalding.graph -case class EdgeTriplet[T: Ordering, S, Q](source: Vertex[T, Q], dest: Vertex[T, Q], edge: Edge[T, S]) +case class EdgeTriplet[T, S, Q](source: Vertex[T, Q], dest: Vertex[T, Q], edge: Edge[T, S]) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala index 6736aed301..c4eecf258a 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/graph/Graph.scala @@ -19,7 +19,14 @@ import com.twitter.scalding.TypedPipe import scala.reflect.ClassTag -class Graph[T: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: TypedPipe[Vertex[T, Q]])(implicit ord: Ordering[T]) { +/** + * General Graph Object that works on both Vertices and Edges. + * Graph supports extra data on both edges and vertices. + * + * @param inputEdges Directed Edges that make up the graph. + * @param inputVertices Vertices of the graph. + */ +class Graph[T: Ordering, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: TypedPipe[Vertex[T, Q]]) { def edges: TypedPipe[Edge[T, S]] = inputEdges def vertices: TypedPipe[Vertex[T, Q]] = inputVertices @@ -62,6 +69,18 @@ class Graph[T: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: new Graph[T, S, VD2](edges, newVertices) } + /** + * Filtered the vertices of the graph. Defer the edge filtering until necessary. + */ + def filterVertices(filter: Vertex[T, Q] => Boolean) = + new GraphUnfilteredEdges[T, S, Q](edges, vertices.filter(filter)) + + /** + * Filtered the edges of the graph. Defer the vertex filtering until necessary. + */ + def filterEdges(filter: Edge[T, S] => Boolean) = + new GraphUnfilteredVertices[T, S, Q](edges.filter(filter), vertices) + def mapVertices[A](map: Vertex[T, Q] => A): Graph[T, S, A] = new Graph[T, S, A](edges, vertices.map{ vertex => Vertex(vertex.id, map(vertex)) }) @@ -77,22 +96,15 @@ class Graph[T: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: } /** - * Collect only neighbor ids. - * Optionally sort by Vertex + * For all vertices collect their neighbors storing only ids. */ - def collectNeighborIds(sortNeighbors: Boolean = true): Graph[T, S, Neighbors[T]] = { + def collectNeighborIds(implicit ct: ClassTag[T]): Graph[T, S, UnsortedNeighbors[T]] = { val nbrs = edges - .map { edge => (edge.source, edge.dest) } + .map{ edge => (edge.source, edge.dest) } .group - .mapGroup { - case (vert, nbrs) => - val nbrsArray: Vertex[T, Neighbors[T]] = if (sortNeighbors) { - Vertex(vert, SortedNeighbors(nbrs.toArray.sorted)(ord)) - } else { - Vertex(vert, UnsortedNeighbors(nbrs.toArray)) - } - - Iterator.single(nbrsArray) + .mapGroup{ + case (vert, neighbors) => + Iterator.single(Vertex(vert, UnsortedNeighbors(neighbors.toArray))) } .values @@ -100,25 +112,18 @@ class Graph[T: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: } /** - * Collect all Neighbors of an Vertex. - * Optionally sort by Vertex + * For all vertices collect their neighbors. */ - def collectNeighbors(sortNeighbors: Boolean = true): Graph[T, S, Neighbors[Vertex[T, Q]]] = { + def collectNeighbors(implicit ct: ClassTag[T]): Graph[T, S, UnsortedNeighbors[Vertex[T, Q]]] = { val nbrs = edges - .map { edge => (edge.dest, edge.source) } + .map{ edge => (edge.dest, edge.source) } .join(vertices.groupBy(_.id)) .toTypedPipe - .map { case (dest, (source, vertex)) => (source, (dest, vertex)) } + .map{ case (dest, (source, vertex)) => (source, (dest, vertex)) } .group - .mapGroup { + .mapGroup{ case (id, vertexes) => - val nbrsArray: Vertex[T, Neighbors[Vertex[T, Q]]] = if (sortNeighbors) { - Vertex(id, SortedNeighbors(vertexes.toArray.sortBy(_._1).map(_._2))(Ordering.by(_.id))) - } else { - Vertex(id, UnsortedNeighbors(vertexes.toArray.map(_._2))) - } - - Iterator.single(nbrsArray) + Iterator.single(Vertex(id, UnsortedNeighbors(vertexes.toArray.map(_._2)))) } .values @@ -129,31 +134,31 @@ class Graph[T: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: * Returns each Vertex with all out going edges. * Optionally sort the edges */ - def collectEdges(sortNeighbors: Boolean = true): TypedPipe[Vertex[T, Neighbors[Edge[T, S]]]] = - edges + def collectEdges(implicit ct: ClassTag[T]): Graph[T, S, UnsortedNeighbors[Edge[T, S]]] = { + val vertices = edges .map{ edge => (edge.source, edge) } .group .mapGroup{ case (id, edgeList) => - val nbrsArray: Vertex[T, Neighbors[Edge[T, S]]] = if (sortNeighbors) { - Vertex(id, SortedNeighbors(edgeList.toArray.sortBy(_.dest))(Ordering.by(_.dest))) - } else { - Vertex(id, UnsortedNeighbors(edgeList.toArray.sortBy(_.dest))) - } - - Iterator.single(nbrsArray) + Iterator.single(Vertex(id, UnsortedNeighbors(edgeList.toArray.sortBy(_.dest)))) } .values + Graph(edges, vertices) + } + /** * Filter the graph by the Edge and Vertex filters. */ def subgraph(epred: EdgeTriplet[T, S, Q] => Boolean = _ => true, vpred: Vertex[T, Q] => Boolean = _ => true): Graph[T, S, Q] = { val newTriplets = triplets.filter(epred) - new Graph[T, S, Q]( + new GraphUnfilteredEdges[T, S, Q]( newTriplets.map(_.edge), - newTriplets.flatMap(trip => List(trip.source, trip.dest)).distinct(Ordering.by(_.id)).filter(vpred)) + newTriplets + .flatMap(trip => List(trip.source, trip.dest)) + .filter(vpred) + .distinct(Ordering.by(_.id))) } /** @@ -181,9 +186,9 @@ class Graph[T: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: /** * Sometimes working just on deges is required, in those cases we don't also want to * take the computational hit of filtering vertices by the updated edges. In those cases - * you get return this subgraph that will only filter the vertices when necessary + * you can return this subgraph that will only filter the vertices when necessary */ -class GraphUnfilteredVertices[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: TypedPipe[Vertex[T, Q]]) +class GraphUnfilteredVertices[T: Ordering, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: TypedPipe[Vertex[T, Q]]) extends Graph[T, S, Q](inputEdges, inputVertices) { override def vertices = { val graphVertices = edges.flatMap(e => List(e.source, e.dest)).distinct @@ -198,9 +203,9 @@ class GraphUnfilteredVertices[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe /** * Sometimes working just on vertices is required, in those cases we don't also want to * take the computational hit of filtering edges by the updated vertices. In those cases - * you get return this subgraph that will only filter the edges when necessary + * you can return this subgraph that will only filter the edges when necessary */ -class GraphUnfilteredEdges[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: TypedPipe[Vertex[T, Q]]) +class GraphUnfilteredEdges[T: Ordering, S, Q](inputEdges: TypedPipe[Edge[T, S]], inputVertices: TypedPipe[Vertex[T, Q]]) extends Graph[T, S, Q](inputEdges, inputVertices) { override def edges = @@ -217,7 +222,10 @@ class GraphUnfilteredEdges[T: Ordering: ClassTag, S, Q](inputEdges: TypedPipe[Ed } object Graph { - def fromEdges[T: Ordering: ClassTag, S](edges: TypedPipe[Edge[T, S]]): Graph[T, S, Unit] = { + /** + * Generate a graph from a set of Directed Edges. + */ + def fromEdges[T: Ordering, S](edges: TypedPipe[Edge[T, S]]): Graph[T, S, Unit] = { val vertices = edges .flatMap(e => List(e.source, e.dest)) .distinct @@ -226,6 +234,9 @@ object Graph { new Graph(edges, vertices) } - def apply[T: Ordering: ClassTag, S, Q](edges: TypedPipe[Edge[T, S]], vertices: TypedPipe[Vertex[T, Q]]): Graph[T, S, Q] = + /** + * Generate a graph from a set of Directed Edges and Vertices + */ + def apply[T: Ordering, S, Q](edges: TypedPipe[Edge[T, S]], vertices: TypedPipe[Vertex[T, Q]]): Graph[T, S, Q] = new Graph(edges, vertices) } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/Neighbors.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/Neighbors.scala index 1beb78810f..427a313d54 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/graph/Neighbors.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/graph/Neighbors.scala @@ -19,74 +19,14 @@ sealed trait Neighbors[T] { def neighbors: Array[T] } -case class SortedNeighbors[T](val neighbors: Array[T])(implicit ord: Ordering[T]) extends Neighbors[T] { - def intersectCount(other: SortedNeighbors[T])(implicit ord: Ordering[T]): Int = { - val (size1, size2) = (neighbors.length, other.neighbors.length) - - val (max, min) = if (size1 > size2) (size1, size2) else (size2, size1) - - if ((size1 + size2) < (min * Math.log(max))) { - mergeJoin(neighbors, other.neighbors) - } else { - if (size1 > size2) { - binarySearch(neighbors, other.neighbors) - } else { - binarySearch(other.neighbors, neighbors) - } - } - } - - /** - * Compute Intersection by using merge join O(M + N) complexity - */ - private def mergeJoin(array1: Array[T], array2: Array[T])(implicit ord: Ordering[T]): Int = { - var (ix1, ix2, intersect) = (0, 0, 0) - - while (ix1 < array1.length && ix2 < array2.length) { - val comp = ord.compare(array1(ix1), array2(ix2)) - if (comp < 0) { - ix1 += 1 - } else if (comp > 0) { - ix2 += 1 - } else { - intersect += 1 - - ix1 += 1 - ix2 += 1 - } - } - - intersect - } - - /** - * Compute Intersection by using binary search O(Mlg(N)) where M is small and N is large - */ - private def binarySearch(bigArray: Array[T], smallArray: Array[T]): Int = { - var (ix2, intersect) = (0, 0) - - while (ix2 < smallArray.length) { - var left = 0 - var right = bigArray.length - 1 - while (left <= right) { - val mid = left + (right - left) / 2 - val comp = ord.compare(bigArray(mid), smallArray(ix2)) - if (comp == 0) { - intersect += 1 - left = right + 1 - } else if (comp > 0) { - right = mid - 1 - } else { - left = mid + 1 - } - } - - ix2 += 1 - } - - intersect - } +/** + * List of Sorted Neighbors backed by an Array + */ +case class SortedNeighbors[T](val neighbors: Array[T]) extends Neighbors[T] +/** + * Unsorted List of Neighbors backed by an Array + */ +case class UnsortedNeighbors[T](val neighbors: Array[T]) extends Neighbors[T] { + def toSorted(implicit ord: Ordering[T]): SortedNeighbors[T] = SortedNeighbors(neighbors.sorted) } -case class UnsortedNeighbors[T](val neighbors: Array[T]) extends Neighbors[T] - diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/Vertex.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/Vertex.scala index 98c8019c15..043ee30aed 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/graph/Vertex.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/graph/Vertex.scala @@ -15,6 +15,4 @@ limitations under the License. */ package com.twitter.scalding.graph -case class Vertex[T, S](id: T, attr: S)(implicit ord: Ordering[T]) extends Ordered[Vertex[T, _]] { - def compare(that: Vertex[T, _]) = ord.compare(id, that.id) -} +case class Vertex[T, S](id: T, attr: S) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/graph/VertexSimilarity.scala b/scalding-core/src/main/scala/com/twitter/scalding/graph/VertexSimilarity.scala deleted file mode 100644 index 8c1a35f7c2..0000000000 --- a/scalding-core/src/main/scala/com/twitter/scalding/graph/VertexSimilarity.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* -Copyright 2012 Twitter, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package com.twitter.scalding.graph - -import com.twitter.scalding.mathematics.SetSimilarity - -import scala.reflect.ClassTag - -object VertexSimilarity { - /** - * Compute the exact cosine between all vertices in the graph. - */ - def exactCosine[T: Ordering: ClassTag, S, Q](graph: Graph[T, S, Q]): Graph[T, Option[Double], Unit] = - graph - .collectNeighborIds(sortNeighbors = true) - .mapTriplets{ - case (EdgeTriplet(Vertex(id1, nbrs1: SortedNeighbors[T]), Vertex(id2, nbrs2: SortedNeighbors[T]), edge)) => - val cosine = SetSimilarity(nbrs1.intersectCount(nbrs2), nbrs1.neighbors.length, nbrs2.neighbors.length).cosine - EdgeTriplet(Vertex(id1, ()), Vertex(id2, ()), Edge(id1, id2, cosine)) - } -} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/graph/GraphTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/graph/GraphTest.scala index 9bb7c83fdc..933892c0d4 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/graph/GraphTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/graph/GraphTest.scala @@ -20,6 +20,7 @@ import com.twitter.scalding.typed.TypedPipe import org.scalatest.{ Matchers, WordSpec } class GraphTest extends WordSpec with Matchers { + import TypedPipeChecker._ val graphPipe = List( (4L, 1L), @@ -33,10 +34,14 @@ class GraphTest extends WordSpec with Matchers { (1L, 2L)) val vertices = graphPipe.flatMap{ case (s, d) => List(s, d) }.distinct.map(s => Vertex(s, ())) + + implicit val ord = Ordering.by[Vertex[Long, Unit], Long](_.id) + val edges = graphPipe.map{ case (s, d) => Edge(s, d, ()) } "A Graph" should { val graph = new Graph(TypedPipe.from(edges), TypedPipe.from(vertices)) + "map vertices" in { val updatedGraph = graph.mapVertices{ case (Vertex(id, _)) => id } val mapped = TypedPipeChecker.inMemoryToList(updatedGraph.vertices) @@ -45,19 +50,27 @@ class GraphTest extends WordSpec with Matchers { "map edges" in { val updatedGraph = graph.mapEdges{ case (Edge(source, dest, _)) => source } - val mapped = TypedPipeChecker.inMemoryToList(updatedGraph.edges) + val mapped = updatedGraph.edges.inMemoryToList mapped.foreach{ case (Edge(source, dest, attr)) => assert(source == attr) } } "join vertices" in { val newVertices = List(Vertex(1L, ()), Vertex(3L, ())) val updatedGraph = graph.joinVertices(TypedPipe.from(newVertices)){ case (id, _, _) => id } - val mapped = TypedPipeChecker.inMemoryToList(updatedGraph.vertices) + val mapped = updatedGraph.vertices.inMemoryToList mapped.foreach{ case (Vertex(id, attr)) => assert(id == attr) } } + "collect edges" in { + val neighbors = graph.collectEdges.vertices.inMemoryToList + + val vertex = neighbors.find(_.id == 5L) + assert(vertex.isDefined, "Found the vertex") + assert(vertex.get.attr.neighbors.map(_.dest) === Array(2L, 3L, 6L)) + } + "collect neighbors sorted" in { - val neighbors = TypedPipeChecker.inMemoryToList(graph.collectNeighbors(true).vertices) + val neighbors = graph.collectNeighbors.mapVertices(_.attr.toSorted).vertices.inMemoryToList val vertex = neighbors.find(_.id == 2L) assert(vertex.isDefined, "Found the vertex") @@ -65,11 +78,41 @@ class GraphTest extends WordSpec with Matchers { } "collect neighbors ids sorted" in { - val neighbors = TypedPipeChecker.inMemoryToList(graph.collectNeighborIds(true).vertices) + val neighbors = graph.collectNeighborIds.mapVertices(_.attr.toSorted).vertices.inMemoryToList val vertex = neighbors.find(_.id == 2L) assert(vertex.isDefined, "Found the vertex") assert(vertex.get.attr.neighbors === Array(1L, 4L)) } + + "subgraph" in { + def filterEdges(triplet: EdgeTriplet[Long, Unit, Unit]) = triplet.edge.source == 5L + def vpred(vertex: Vertex[Long, Unit]) = (vertex.id == 5L) || (vertex.id == 2L) + + val subgraph = graph.subgraph(filterEdges, vpred).edges.inMemoryToList + + assert(subgraph.size == 1, "Only one edge") + assert(subgraph.head == Edge(5L, 2L, ()), "Only one edge") + } + + "triplets" in { + val triplets = graph.triplets.inMemoryToList + val triplet = triplets.find(_.edge.source == 4L) + assert(triplet.isDefined, "Found the triplet") + assert(triplet.get === EdgeTriplet(Vertex(4L, ()), Vertex(1L, ()), Edge(4L, 1L, ()))) + } + + "mask" in { + val filteredGraph = graph.filterEdges(_.source == 5L) + val masked = graph.mask(filteredGraph).edges.inMemoryToList.sortBy(_.dest) + + val expectedEdges = graphPipe + .filter{ case (s, _) => s == 5L } + .sortBy(_._2) + .map{ case (s, d) => Edge(s, d, ()) } + + assert(masked.size == 3) + assert(masked === expectedEdges) + } } } diff --git a/scalding-core/src/test/scala/com/twitter/scalding/graph/VertexSimilarityTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/graph/VertexSimilarityTest.scala deleted file mode 100644 index cd473eb4f8..0000000000 --- a/scalding-core/src/test/scala/com/twitter/scalding/graph/VertexSimilarityTest.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* -Copyright 2012 Twitter, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package com.twitter.scalding.graph - -import com.twitter.scalding.TypedPipeChecker -import com.twitter.scalding.typed.TypedPipe -import org.scalatest.{ Matchers, WordSpec } - -class VertexSimilarityTest extends WordSpec with Matchers { - val graphPipe = List( - (4L, 1L), - (2L, 4L), - (2L, 1L), - (5L, 2L), - (5L, 3L), - (5L, 6L), - (3L, 6L), - (3L, 2L)) - - val graph = Graph.fromEdges(TypedPipe.from(graphPipe).map{ case (s, d) => Edge(s, d, ()) }) - - "Vertex Similarity" should { - "compute exact cosine" in { - val cosine = TypedPipeChecker.inMemoryToList(VertexSimilarity.exactCosine(graph).edges) - - val edge = cosine.find(e => e.source == 2L && e.dest == 4L) - - assert(edge.exists(_.attr.exists(score => math.abs(score - 0.707) < 0.001))) - } - } -} \ No newline at end of file